Compare commits
2 Commits
v0.2.1
...
v0.2.1-alp
| Author | SHA1 | Date | |
|---|---|---|---|
| f4da47ca2b | |||
| 7939322a7f |
@@ -15,7 +15,7 @@ jobs:
|
||||
- name: Create Gitea release
|
||||
id: create
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
GITEA_TOKEN: ${{ secrets.DEPLOY_TOKEN }}
|
||||
run: |
|
||||
TAG="${{ gitea.ref_name }}"
|
||||
BASE_URL="${{ gitea.server_url }}/api/v1/repos/${{ gitea.repository }}"
|
||||
@@ -126,7 +126,7 @@ jobs:
|
||||
|
||||
- name: Attach assets to release
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
GITEA_TOKEN: ${{ secrets.DEPLOY_TOKEN }}
|
||||
run: |
|
||||
RELEASE_ID="${{ needs.create-release.outputs.release_id }}"
|
||||
BASE_URL="${{ gitea.server_url }}/api/v1/repos/${{ gitea.repository }}"
|
||||
@@ -196,7 +196,7 @@ jobs:
|
||||
|
||||
- name: Attach tarball to release
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
GITEA_TOKEN: ${{ secrets.DEPLOY_TOKEN }}
|
||||
run: |
|
||||
RELEASE_ID="${{ needs.create-release.outputs.release_id }}"
|
||||
BASE_URL="${{ gitea.server_url }}/api/v1/repos/${{ gitea.repository }}"
|
||||
@@ -248,7 +248,7 @@ jobs:
|
||||
id: docker-login
|
||||
continue-on-error: true
|
||||
run: |
|
||||
echo "${{ secrets.GITEA_TOKEN }}" | docker login \
|
||||
echo "${{ secrets.DEPLOY_TOKEN }}" | docker login \
|
||||
"${{ steps.meta.outputs.server_host }}" \
|
||||
-u "${{ gitea.actor }}" --password-stdin
|
||||
|
||||
|
||||
55
TODO.md
55
TODO.md
@@ -1,26 +1,37 @@
|
||||
# Auto-Update Phase 1: Check & Notify
|
||||
# Build Size Reduction
|
||||
|
||||
## Backend
|
||||
- [ ] Add `packaging` to pyproject.toml dependencies
|
||||
- [ ] Create `core/update/__init__.py`
|
||||
- [ ] Create `core/update/release_provider.py` — ABC + data models
|
||||
- [ ] Create `core/update/gitea_provider.py` — Gitea REST API implementation
|
||||
- [ ] Create `core/update/version_check.py` — semver normalization + comparison
|
||||
- [ ] Create `core/update/update_service.py` — background service + state machine
|
||||
- [ ] Create `api/schemas/update.py` — Pydantic request/response models
|
||||
- [ ] Create `api/routes/update.py` — REST endpoints
|
||||
- [ ] Wire into `api/__init__.py`, `dependencies.py`, `main.py`
|
||||
## Phase 1: Quick Wins (build scripts)
|
||||
|
||||
## Frontend
|
||||
- [ ] Add update banner HTML to `index.html`
|
||||
- [ ] Add Updates tab to `settings.html`
|
||||
- [ ] Add `has-update` CSS styles for version badge in `layout.css`
|
||||
- [ ] Add update banner CSS styles in `components.css`
|
||||
- [ ] Create `features/update.ts` — update check/settings/banner logic
|
||||
- [ ] Wire exports in `app.ts`
|
||||
- [ ] Add i18n keys to `en.json`, `ru.json`, `zh.json`
|
||||
- [x] Strip unused NumPy submodules (polynomial, linalg, ma, lib, distutils)
|
||||
- [x] Strip debug symbols from .pyd/.dll/.so files
|
||||
- [x] Remove zeroconf service database
|
||||
- [x] Remove .py source from site-packages after compiling to .pyc
|
||||
- [x] Strip unused PIL image plugins (keep JPEG/PNG/ICO/BMP for tray)
|
||||
|
||||
## Phase 2: Replace Pillow with cv2
|
||||
|
||||
- [x] Create `utils/image_codec.py` with cv2-based image helpers
|
||||
- [x] Replace PIL in `_preview_helpers.py`
|
||||
- [x] Replace PIL in `picture_sources.py`
|
||||
- [x] Replace PIL in `color_strip_sources.py`
|
||||
- [x] Replace PIL in `templates.py`
|
||||
- [x] Replace PIL in `postprocessing.py`
|
||||
- [x] Replace PIL in `output_targets_keycolors.py`
|
||||
- [x] Replace PIL in `kc_target_processor.py`
|
||||
- [x] Replace PIL in `pixelate.py` filter
|
||||
- [x] Replace PIL in `downscaler.py` filter
|
||||
- [x] Replace PIL in `scrcpy_engine.py`
|
||||
- [x] Replace PIL in `live_stream_manager.py`
|
||||
- [x] Move Pillow from core deps to [tray] optional in pyproject.toml
|
||||
- [x] Make PIL import conditional in `tray.py`
|
||||
- [x] Move opencv-python-headless to core dependencies
|
||||
|
||||
## Phase 4: OpenCV stripping (build scripts)
|
||||
|
||||
- [x] Strip ffmpeg DLL, Haar cascades, dev files (already existed)
|
||||
- [x] Strip typing stubs (already existed)
|
||||
|
||||
## Verification
|
||||
- [ ] Lint check: `ruff check src/ tests/ --fix`
|
||||
- [ ] TypeScript check: `npx tsc --noEmit && npm run build`
|
||||
- [ ] Tests pass: `py -3.13 -m pytest tests/ --no-cov -q`
|
||||
|
||||
- [x] Lint: `ruff check src/ tests/ --fix`
|
||||
- [x] Tests: 341 passed
|
||||
|
||||
140
build-common.sh
Normal file
140
build-common.sh
Normal file
@@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Shared build functions for LedGrab distribution packaging.
|
||||
# Sourced by build-dist.sh (Linux) and build-dist-windows.sh (Windows).
|
||||
#
|
||||
# Expected variables set by the caller before sourcing:
|
||||
# SCRIPT_DIR, BUILD_DIR, DIST_DIR, SERVER_DIR, APP_DIR
|
||||
|
||||
# ── Version detection ────────────────────────────────────────
|
||||
|
||||
detect_version() {
|
||||
# Usage: detect_version [explicit_version]
|
||||
local version="${1:-}"
|
||||
|
||||
if [ -z "$version" ]; then
|
||||
version=$(git describe --tags --exact-match 2>/dev/null || true)
|
||||
fi
|
||||
if [ -z "$version" ]; then
|
||||
version="${GITEA_REF_NAME:-${GITHUB_REF_NAME:-}}"
|
||||
fi
|
||||
if [ -z "$version" ]; then
|
||||
version=$(grep -oP '^version\s*=\s*"\K[^"]+' "$SERVER_DIR/pyproject.toml" 2>/dev/null || echo "0.0.0")
|
||||
fi
|
||||
|
||||
VERSION_CLEAN="${version#v}"
|
||||
|
||||
# Stamp the resolved version into pyproject.toml so that
|
||||
# importlib.metadata reads the correct value at runtime.
|
||||
sed -i "s/^version = .*/version = \"${VERSION_CLEAN}\"/" "$SERVER_DIR/pyproject.toml"
|
||||
}
|
||||
|
||||
# ── Clean previous build ─────────────────────────────────────
|
||||
|
||||
clean_dist() {
|
||||
if [ -d "$DIST_DIR" ]; then
|
||||
echo " Cleaning previous build..."
|
||||
rm -rf "$DIST_DIR"
|
||||
fi
|
||||
mkdir -p "$DIST_DIR"
|
||||
}
|
||||
|
||||
# ── Build frontend ───────────────────────────────────────────
|
||||
|
||||
build_frontend() {
|
||||
echo " Building frontend bundle..."
|
||||
(cd "$SERVER_DIR" && npm ci --loglevel error && npm run build) 2>&1 | {
|
||||
grep -v 'RemoteException' || true
|
||||
}
|
||||
}
|
||||
|
||||
# ── Copy application files ───────────────────────────────────
|
||||
|
||||
copy_app_files() {
|
||||
echo " Copying application files..."
|
||||
mkdir -p "$APP_DIR"
|
||||
|
||||
cp -r "$SERVER_DIR/src" "$APP_DIR/src"
|
||||
cp -r "$SERVER_DIR/config" "$APP_DIR/config"
|
||||
mkdir -p "$DIST_DIR/data" "$DIST_DIR/logs"
|
||||
|
||||
# Clean up source maps and __pycache__
|
||||
find "$APP_DIR" -name "*.map" -delete 2>/dev/null || true
|
||||
find "$APP_DIR" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
|
||||
}
|
||||
|
||||
# ── Site-packages cleanup ────────────────────────────────────
|
||||
#
|
||||
# Strips tests, type stubs, unused submodules, and debug symbols
|
||||
# from the installed site-packages directory.
|
||||
#
|
||||
# Args:
|
||||
# $1 — path to site-packages directory
|
||||
# $2 — native extension suffix: "pyd" (Windows) or "so" (Linux)
|
||||
# $3 — native lib suffix for OpenCV ffmpeg: "dll" or "so"
|
||||
|
||||
cleanup_site_packages() {
|
||||
local sp_dir="$1"
|
||||
local ext_suffix="${2:-so}"
|
||||
local lib_suffix="${3:-so}"
|
||||
|
||||
echo " Cleaning up site-packages to reduce size..."
|
||||
|
||||
# ── Generic cleanup ──────────────────────────────────────
|
||||
find "$sp_dir" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$sp_dir" -type d -name tests -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$sp_dir" -type d -name test -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$sp_dir" -type d -name "*.dist-info" -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$sp_dir" -name "*.pyi" -delete 2>/dev/null || true
|
||||
|
||||
# ── pip / setuptools (not needed at runtime) ─────────────
|
||||
rm -rf "$sp_dir"/pip "$sp_dir"/pip-* 2>/dev/null || true
|
||||
rm -rf "$sp_dir"/setuptools "$sp_dir"/setuptools-* "$sp_dir"/pkg_resources 2>/dev/null || true
|
||||
rm -rf "$sp_dir"/_distutils_hack 2>/dev/null || true
|
||||
|
||||
# ── OpenCV ───────────────────────────────────────────────
|
||||
local cv2_dir="$sp_dir/cv2"
|
||||
if [ -d "$cv2_dir" ]; then
|
||||
# Remove ffmpeg (28 MB on Windows), Haar cascades, dev files
|
||||
rm -f "$cv2_dir"/opencv_videoio_ffmpeg*."$lib_suffix" 2>/dev/null || true
|
||||
rm -rf "$cv2_dir/data" "$cv2_dir/gapi" "$cv2_dir/misc" "$cv2_dir/utils" 2>/dev/null || true
|
||||
rm -rf "$cv2_dir/typing_stubs" "$cv2_dir/typing" 2>/dev/null || true
|
||||
fi
|
||||
|
||||
# ── NumPy ────────────────────────────────────────────────
|
||||
# Remove unused submodules (only core, fft, random are used)
|
||||
for mod in polynomial linalg ma lib distutils f2py typing _pyinstaller; do
|
||||
rm -rf "$sp_dir/numpy/$mod" 2>/dev/null || true
|
||||
done
|
||||
rm -rf "$sp_dir/numpy/tests" "$sp_dir/numpy/*/tests" 2>/dev/null || true
|
||||
|
||||
# ── Pillow (only used for system tray icon) ──────────────
|
||||
rm -rf "$sp_dir/PIL/tests" 2>/dev/null || true
|
||||
# Remove unused image format plugins (keep JPEG, PNG, ICO, BMP)
|
||||
for plugin in Eps Gif Tiff Webp Psd Pcx Xbm Xpm Dds Ftex Gbr Grib \
|
||||
Icns Im Imt Iptc McIrdas Mpo Msp Pcd Pixar Ppm Sgi \
|
||||
Spider Sun Tga Wal Wmf; do
|
||||
rm -f "$sp_dir/PIL/${plugin}ImagePlugin.py" 2>/dev/null || true
|
||||
rm -f "$sp_dir/PIL/${plugin}ImagePlugin.pyc" 2>/dev/null || true
|
||||
done
|
||||
|
||||
# ── zeroconf ─────────────────────────────────────────────
|
||||
rm -rf "$sp_dir/zeroconf/_services" 2>/dev/null || true
|
||||
|
||||
# ── Strip debug symbols ──────────────────────────────────
|
||||
if command -v strip &>/dev/null; then
|
||||
echo " Stripping debug symbols from .$ext_suffix files..."
|
||||
find "$sp_dir" -name "*.$ext_suffix" -exec strip --strip-debug {} \; 2>/dev/null || true
|
||||
fi
|
||||
|
||||
# ── Remove .py source (keep .pyc bytecode) ───────────────
|
||||
echo " Removing .py source from site-packages (keeping .pyc)..."
|
||||
find "$sp_dir" -name "*.py" ! -name "__init__.py" -delete 2>/dev/null || true
|
||||
|
||||
# ── Remove wled_controller if pip-installed ───────────────
|
||||
rm -rf "$sp_dir"/wled_controller* "$sp_dir"/wled*.dist-info 2>/dev/null || true
|
||||
|
||||
local cleaned_size
|
||||
cleaned_size=$(du -sh "$sp_dir" | cut -f1)
|
||||
echo " Site-packages after cleanup: $cleaned_size"
|
||||
}
|
||||
@@ -22,27 +22,13 @@ PYTHON_DIR="$DIST_DIR/python"
|
||||
APP_DIR="$DIST_DIR/app"
|
||||
PYTHON_VERSION="${PYTHON_VERSION:-3.11.9}"
|
||||
|
||||
source "$SCRIPT_DIR/build-common.sh"
|
||||
|
||||
# ── Version detection ────────────────────────────────────────
|
||||
|
||||
VERSION="${1:-}"
|
||||
|
||||
if [ -z "$VERSION" ]; then
|
||||
VERSION=$(git describe --tags --exact-match 2>/dev/null || true)
|
||||
fi
|
||||
if [ -z "$VERSION" ]; then
|
||||
VERSION="${GITEA_REF_NAME:-${GITHUB_REF_NAME:-}}"
|
||||
fi
|
||||
if [ -z "$VERSION" ]; then
|
||||
VERSION=$(grep -oP '^version\s*=\s*"\K[^"]+' "$SERVER_DIR/pyproject.toml" 2>/dev/null || echo "0.0.0")
|
||||
fi
|
||||
|
||||
VERSION_CLEAN="${VERSION#v}"
|
||||
detect_version "${1:-}"
|
||||
ZIP_NAME="LedGrab-v${VERSION_CLEAN}-win-x64.zip"
|
||||
|
||||
# Stamp the resolved version into pyproject.toml so that
|
||||
# importlib.metadata reads the correct value at runtime.
|
||||
sed -i "s/^version = .*/version = \"${VERSION_CLEAN}\"/" "$SERVER_DIR/pyproject.toml"
|
||||
|
||||
echo "=== Cross-building LedGrab v${VERSION_CLEAN} (Windows from Linux) ==="
|
||||
echo " Embedded Python: $PYTHON_VERSION"
|
||||
echo " Output: build/$ZIP_NAME"
|
||||
@@ -50,11 +36,8 @@ echo ""
|
||||
|
||||
# ── Clean ────────────────────────────────────────────────────
|
||||
|
||||
if [ -d "$DIST_DIR" ]; then
|
||||
echo "[1/9] Cleaning previous build..."
|
||||
rm -rf "$DIST_DIR"
|
||||
fi
|
||||
mkdir -p "$DIST_DIR"
|
||||
echo "[1/9] Cleaning..."
|
||||
clean_dist
|
||||
|
||||
# ── Download Windows embedded Python ─────────────────────────
|
||||
|
||||
@@ -195,15 +178,11 @@ WHEEL_DIR="$BUILD_DIR/win-wheels"
|
||||
mkdir -p "$WHEEL_DIR"
|
||||
|
||||
# Core dependencies (cross-platform, should have win_amd64 wheels)
|
||||
# We parse pyproject.toml deps and download win_amd64 wheels.
|
||||
# For packages that are pure Python, --only-binary will fail,
|
||||
# so we fall back to allowing source for those.
|
||||
DEPS=(
|
||||
"fastapi>=0.115.0"
|
||||
"uvicorn[standard]>=0.32.0"
|
||||
"httpx>=0.27.2"
|
||||
"mss>=9.0.2"
|
||||
"Pillow>=10.4.0"
|
||||
"numpy>=2.1.3"
|
||||
"pydantic>=2.9.2"
|
||||
"pydantic-settings>=2.6.0"
|
||||
@@ -220,7 +199,6 @@ DEPS=(
|
||||
"sounddevice>=0.5"
|
||||
"aiomqtt>=2.0.0"
|
||||
"openrgb-python>=0.2.15"
|
||||
# camera extra
|
||||
"opencv-python-headless>=4.8.0"
|
||||
)
|
||||
|
||||
@@ -232,8 +210,9 @@ WIN_DEPS=(
|
||||
"winrt-Windows.Foundation>=3.0.0"
|
||||
"winrt-Windows.Foundation.Collections>=3.0.0"
|
||||
"winrt-Windows.ApplicationModel>=3.0.0"
|
||||
# System tray
|
||||
# System tray (Pillow needed by pystray for tray icon)
|
||||
"pystray>=0.19.0"
|
||||
"Pillow>=10.4.0"
|
||||
)
|
||||
|
||||
# Download cross-platform deps (prefer binary, allow source for pure Python)
|
||||
@@ -286,73 +265,26 @@ for sdist in "$WHEEL_DIR"/*.tar.gz; do
|
||||
done
|
||||
|
||||
# ── Reduce package size ────────────────────────────────────────
|
||||
echo " Cleaning up to reduce size..."
|
||||
|
||||
# Remove caches, tests, docs, type stubs
|
||||
find "$SITE_PACKAGES" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$SITE_PACKAGES" -type d -name tests -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$SITE_PACKAGES" -type d -name test -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$SITE_PACKAGES" -type d -name "*.dist-info" -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$SITE_PACKAGES" -name "*.pyi" -delete 2>/dev/null || true
|
||||
cleanup_site_packages "$SITE_PACKAGES" "pyd" "dll"
|
||||
|
||||
# Remove pip and setuptools (not needed at runtime)
|
||||
rm -rf "$SITE_PACKAGES"/pip "$SITE_PACKAGES"/pip-* 2>/dev/null || true
|
||||
rm -rf "$SITE_PACKAGES"/setuptools "$SITE_PACKAGES"/setuptools-* "$SITE_PACKAGES"/pkg_resources 2>/dev/null || true
|
||||
rm -rf "$SITE_PACKAGES"/_distutils_hack 2>/dev/null || true
|
||||
|
||||
# Remove pythonwin GUI IDE and help file (ships with pywin32 but not needed)
|
||||
# Windows-specific cleanup
|
||||
rm -rf "$SITE_PACKAGES"/pythonwin 2>/dev/null || true
|
||||
rm -f "$SITE_PACKAGES"/PyWin32.chm 2>/dev/null || true
|
||||
|
||||
# OpenCV: remove ffmpeg DLL (28MB, only for video file I/O, not camera),
|
||||
# Haar cascades (2.6MB), and misc dev files
|
||||
CV2_DIR="$SITE_PACKAGES/cv2"
|
||||
if [ -d "$CV2_DIR" ]; then
|
||||
rm -f "$CV2_DIR"/opencv_videoio_ffmpeg*.dll 2>/dev/null || true
|
||||
rm -rf "$CV2_DIR/data" "$CV2_DIR/gapi" "$CV2_DIR/misc" "$CV2_DIR/utils" 2>/dev/null || true
|
||||
rm -rf "$CV2_DIR/typing_stubs" "$CV2_DIR/typing" 2>/dev/null || true
|
||||
fi
|
||||
|
||||
# numpy: remove tests, f2py, typing stubs
|
||||
rm -rf "$SITE_PACKAGES/numpy/tests" "$SITE_PACKAGES/numpy/*/tests" 2>/dev/null || true
|
||||
rm -rf "$SITE_PACKAGES/numpy/f2py" 2>/dev/null || true
|
||||
rm -rf "$SITE_PACKAGES/numpy/typing" 2>/dev/null || true
|
||||
rm -rf "$SITE_PACKAGES/numpy/_pyinstaller" 2>/dev/null || true
|
||||
|
||||
# Pillow: remove unused image plugins' test data
|
||||
rm -rf "$SITE_PACKAGES/PIL/tests" 2>/dev/null || true
|
||||
|
||||
# winrt: remove type stubs
|
||||
find "$SITE_PACKAGES/winrt" -name "*.pyi" -delete 2>/dev/null || true
|
||||
|
||||
# Remove wled_controller if it got installed
|
||||
rm -rf "$SITE_PACKAGES"/wled_controller* "$SITE_PACKAGES"/wled*.dist-info 2>/dev/null || true
|
||||
|
||||
CLEANED_SIZE=$(du -sh "$SITE_PACKAGES" | cut -f1)
|
||||
echo " Site-packages after cleanup: $CLEANED_SIZE"
|
||||
|
||||
WHEEL_COUNT=$(ls "$WHEEL_DIR"/*.whl 2>/dev/null | wc -l)
|
||||
echo " Installed $WHEEL_COUNT packages"
|
||||
|
||||
# ── Build frontend ───────────────────────────────────────────
|
||||
|
||||
echo "[7/9] Building frontend bundle..."
|
||||
(cd "$SERVER_DIR" && npm ci --loglevel error && npm run build) 2>&1 | {
|
||||
grep -v 'RemoteException' || true
|
||||
}
|
||||
echo "[7/9] Building frontend..."
|
||||
build_frontend
|
||||
|
||||
# ── Copy application files ───────────────────────────────────
|
||||
|
||||
echo "[8/9] Copying application files..."
|
||||
mkdir -p "$APP_DIR"
|
||||
|
||||
cp -r "$SERVER_DIR/src" "$APP_DIR/src"
|
||||
cp -r "$SERVER_DIR/config" "$APP_DIR/config"
|
||||
mkdir -p "$DIST_DIR/data" "$DIST_DIR/logs"
|
||||
|
||||
# Clean up source maps and __pycache__
|
||||
find "$APP_DIR" -name "*.map" -delete 2>/dev/null || true
|
||||
find "$APP_DIR" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
|
||||
copy_app_files
|
||||
|
||||
# Pre-compile Python bytecode for faster startup
|
||||
echo " Pre-compiling Python bytecode..."
|
||||
|
||||
@@ -17,38 +17,21 @@ SERVER_DIR="$SCRIPT_DIR/server"
|
||||
VENV_DIR="$DIST_DIR/venv"
|
||||
APP_DIR="$DIST_DIR/app"
|
||||
|
||||
source "$SCRIPT_DIR/build-common.sh"
|
||||
|
||||
# ── Version detection ────────────────────────────────────────
|
||||
|
||||
VERSION="${1:-}"
|
||||
|
||||
if [ -z "$VERSION" ]; then
|
||||
VERSION=$(git describe --tags --exact-match 2>/dev/null || true)
|
||||
fi
|
||||
if [ -z "$VERSION" ]; then
|
||||
VERSION="${GITEA_REF_NAME:-${GITHUB_REF_NAME:-}}"
|
||||
fi
|
||||
if [ -z "$VERSION" ]; then
|
||||
VERSION=$(grep -oP '^version\s*=\s*"\K[^"]+' "$SERVER_DIR/pyproject.toml" 2>/dev/null || echo "0.0.0")
|
||||
fi
|
||||
|
||||
VERSION_CLEAN="${VERSION#v}"
|
||||
detect_version "${1:-}"
|
||||
TAR_NAME="LedGrab-v${VERSION_CLEAN}-linux-x64.tar.gz"
|
||||
|
||||
# Stamp the resolved version into pyproject.toml so that
|
||||
# importlib.metadata reads the correct value at runtime.
|
||||
sed -i "s/^version = .*/version = \"${VERSION_CLEAN}\"/" "$SERVER_DIR/pyproject.toml"
|
||||
|
||||
echo "=== Building LedGrab v${VERSION_CLEAN} (Linux) ==="
|
||||
echo " Output: build/$TAR_NAME"
|
||||
echo ""
|
||||
|
||||
# ── Clean ────────────────────────────────────────────────────
|
||||
|
||||
if [ -d "$DIST_DIR" ]; then
|
||||
echo "[1/7] Cleaning previous build..."
|
||||
rm -rf "$DIST_DIR"
|
||||
fi
|
||||
mkdir -p "$DIST_DIR"
|
||||
echo "[1/7] Cleaning..."
|
||||
clean_dist
|
||||
|
||||
# ── Create virtualenv ────────────────────────────────────────
|
||||
|
||||
@@ -60,38 +43,25 @@ pip install --upgrade pip --quiet
|
||||
# ── Install dependencies ─────────────────────────────────────
|
||||
|
||||
echo "[3/7] Installing dependencies..."
|
||||
pip install --quiet "${SERVER_DIR}[camera,notifications]" 2>&1 | {
|
||||
pip install --quiet "${SERVER_DIR}[notifications]" 2>&1 | {
|
||||
grep -i 'error\|failed' || true
|
||||
}
|
||||
|
||||
# Remove the installed wled_controller package (PYTHONPATH handles app code)
|
||||
SITE_PACKAGES="$VENV_DIR/lib/python*/site-packages"
|
||||
rm -rf $SITE_PACKAGES/wled_controller* $SITE_PACKAGES/wled*.dist-info 2>/dev/null || true
|
||||
# Resolve site-packages path (glob expand)
|
||||
SITE_PACKAGES=$(echo "$VENV_DIR"/lib/python*/site-packages)
|
||||
|
||||
# Clean up caches
|
||||
find "$VENV_DIR" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$VENV_DIR" -type d -name tests -exec rm -rf {} + 2>/dev/null || true
|
||||
find "$VENV_DIR" -type d -name test -exec rm -rf {} + 2>/dev/null || true
|
||||
# Clean up with shared function
|
||||
cleanup_site_packages "$SITE_PACKAGES" "so" "so"
|
||||
|
||||
# ── Build frontend ───────────────────────────────────────────
|
||||
|
||||
echo "[4/7] Building frontend bundle..."
|
||||
(cd "$SERVER_DIR" && npm ci --loglevel error && npm run build) 2>&1 | {
|
||||
grep -v 'RemoteException' || true
|
||||
}
|
||||
echo "[4/7] Building frontend..."
|
||||
build_frontend
|
||||
|
||||
# ── Copy application files ───────────────────────────────────
|
||||
|
||||
echo "[5/7] Copying application files..."
|
||||
mkdir -p "$APP_DIR"
|
||||
|
||||
cp -r "$SERVER_DIR/src" "$APP_DIR/src"
|
||||
cp -r "$SERVER_DIR/config" "$APP_DIR/config"
|
||||
mkdir -p "$DIST_DIR/data" "$DIST_DIR/logs"
|
||||
|
||||
# Clean up source maps and __pycache__
|
||||
find "$APP_DIR" -name "*.map" -delete 2>/dev/null || true
|
||||
find "$APP_DIR" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
|
||||
copy_app_files
|
||||
|
||||
# ── Create launcher ──────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -28,7 +28,6 @@ dependencies = [
|
||||
"httpx>=0.27.2",
|
||||
"packaging>=23.0",
|
||||
"mss>=9.0.2",
|
||||
"Pillow>=10.4.0",
|
||||
"numpy>=2.1.3",
|
||||
"pydantic>=2.9.2",
|
||||
"pydantic-settings>=2.6.0",
|
||||
@@ -46,6 +45,7 @@ dependencies = [
|
||||
"sounddevice>=0.5",
|
||||
"aiomqtt>=2.0.0",
|
||||
"openrgb-python>=0.2.15",
|
||||
"opencv-python-headless>=4.8.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
@@ -57,9 +57,11 @@ dev = [
|
||||
"black>=24.0.0",
|
||||
"ruff>=0.6.0",
|
||||
"opencv-python-headless>=4.8.0",
|
||||
"Pillow>=10.4.0",
|
||||
]
|
||||
camera = [
|
||||
"opencv-python-headless>=4.8.0",
|
||||
# opencv-python-headless is now a core dependency (used for image encoding)
|
||||
# camera extra kept for backwards compatibility
|
||||
]
|
||||
# OS notification capture (winrt packages are ~2.5MB total vs winsdk's ~35MB)
|
||||
notifications = [
|
||||
@@ -78,6 +80,7 @@ perf = [
|
||||
]
|
||||
tray = [
|
||||
"pystray>=0.19.0; sys_platform == 'win32'",
|
||||
"Pillow>=10.4.0; sys_platform == 'win32'",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
@@ -1,18 +1,21 @@
|
||||
"""Shared helpers for WebSocket-based capture preview endpoints."""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import io
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable, Optional
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from starlette.websockets import WebSocket
|
||||
|
||||
from wled_controller.core.filters import FilterRegistry, ImagePool
|
||||
from wled_controller.utils import get_logger
|
||||
from wled_controller.utils.image_codec import (
|
||||
encode_jpeg,
|
||||
encode_jpeg_data_uri,
|
||||
resize_down,
|
||||
thumbnail,
|
||||
)
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -32,47 +35,35 @@ def authenticate_ws_token(token: str) -> bool:
|
||||
return verify_ws_token(token)
|
||||
|
||||
|
||||
def _encode_jpeg(pil_image: Image.Image, quality: int = 85) -> str:
|
||||
"""Encode a PIL image as a JPEG base64 data URI."""
|
||||
buf = io.BytesIO()
|
||||
pil_image.save(buf, format="JPEG", quality=quality)
|
||||
buf.seek(0)
|
||||
b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
|
||||
return f"data:image/jpeg;base64,{b64}"
|
||||
def _encode_jpeg(image: np.ndarray, quality: int = 85) -> str:
|
||||
"""Encode a numpy RGB image as a JPEG base64 data URI."""
|
||||
return encode_jpeg_data_uri(image, quality)
|
||||
|
||||
|
||||
def encode_preview_frame(image: np.ndarray, max_width: int = None, quality: int = 80) -> bytes:
|
||||
"""Encode a numpy RGB image to JPEG bytes, optionally downscaling."""
|
||||
pil_img = Image.fromarray(image)
|
||||
if max_width and image.shape[1] > max_width:
|
||||
scale = max_width / image.shape[1]
|
||||
new_h = int(image.shape[0] * scale)
|
||||
pil_img = pil_img.resize((max_width, new_h), Image.LANCZOS)
|
||||
buf = io.BytesIO()
|
||||
pil_img.save(buf, format="JPEG", quality=quality)
|
||||
return buf.getvalue()
|
||||
if max_width:
|
||||
image = resize_down(image, max_width)
|
||||
return encode_jpeg(image, quality)
|
||||
|
||||
|
||||
def _make_thumbnail(pil_image: Image.Image, max_width: int) -> Image.Image:
|
||||
def _make_thumbnail(image: np.ndarray, max_width: int) -> np.ndarray:
|
||||
"""Create a thumbnail copy of the image, preserving aspect ratio."""
|
||||
thumb = pil_image.copy()
|
||||
aspect = pil_image.height / pil_image.width
|
||||
thumb.thumbnail((max_width, int(max_width * aspect)), Image.Resampling.LANCZOS)
|
||||
return thumb
|
||||
return thumbnail(image, max_width)
|
||||
|
||||
|
||||
def _apply_pp_filters(pil_image: Image.Image, flat_filters: list) -> Image.Image:
|
||||
"""Apply postprocessing filter instances to a PIL image."""
|
||||
def _apply_pp_filters(image: np.ndarray, flat_filters: list) -> np.ndarray:
|
||||
"""Apply postprocessing filter instances to a numpy image."""
|
||||
if not flat_filters:
|
||||
return pil_image
|
||||
return image
|
||||
pool = ImagePool()
|
||||
arr = np.array(pil_image)
|
||||
arr = image
|
||||
for fi in flat_filters:
|
||||
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
|
||||
result = f.process_image(arr, pool)
|
||||
if result is not None:
|
||||
arr = result
|
||||
return Image.fromarray(arr)
|
||||
return arr
|
||||
|
||||
|
||||
async def stream_capture_test(
|
||||
@@ -98,7 +89,7 @@ async def stream_capture_test(
|
||||
thumb_width = preview_width or PREVIEW_MAX_WIDTH
|
||||
|
||||
# Shared state between capture thread and async loop
|
||||
latest_frame = None # PIL Image (converted from numpy)
|
||||
latest_frame = None # numpy RGB array
|
||||
frame_count = 0
|
||||
total_capture_time = 0.0
|
||||
stop_event = threading.Event()
|
||||
@@ -121,9 +112,8 @@ async def stream_capture_test(
|
||||
continue
|
||||
total_capture_time += t1 - t0
|
||||
frame_count += 1
|
||||
# Convert numpy -> PIL once in the capture thread
|
||||
if isinstance(capture.image, np.ndarray):
|
||||
latest_frame = Image.fromarray(capture.image)
|
||||
latest_frame = capture.image
|
||||
else:
|
||||
latest_frame = capture.image
|
||||
except Exception as e:
|
||||
@@ -202,7 +192,7 @@ async def stream_capture_test(
|
||||
if pp_filters:
|
||||
final_frame = _apply_pp_filters(final_frame, pp_filters)
|
||||
|
||||
w, h = final_frame.size
|
||||
h, w = final_frame.shape[:2]
|
||||
|
||||
full_uri = _encode_jpeg(final_frame, FINAL_JPEG_QUALITY)
|
||||
thumb = _make_thumbnail(final_frame, FINAL_THUMBNAIL_WIDTH)
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
"""Color strip source routes: CRUD, calibration test, preview, and API input push."""
|
||||
|
||||
import asyncio
|
||||
import io as _io
|
||||
import json as _json
|
||||
import time as _time
|
||||
import uuid as _uuid
|
||||
@@ -989,7 +988,8 @@ async def test_color_strip_ws(
|
||||
try:
|
||||
frame = _frame_live.get_latest_frame()
|
||||
if frame is not None and frame.image is not None:
|
||||
from PIL import Image as _PIL_Image
|
||||
from wled_controller.utils.image_codec import encode_jpeg
|
||||
import cv2 as _cv2
|
||||
img = frame.image
|
||||
# Ensure 3-channel RGB (some engines may produce BGRA)
|
||||
if img.ndim == 3 and img.shape[2] == 4:
|
||||
@@ -1008,13 +1008,9 @@ async def test_color_strip_ws(
|
||||
if scale < 1.0:
|
||||
new_w = max(1, int(w * scale))
|
||||
new_h = max(1, int(h * scale))
|
||||
pil = _PIL_Image.fromarray(img).resize((new_w, new_h), _PIL_Image.LANCZOS)
|
||||
else:
|
||||
pil = _PIL_Image.fromarray(img)
|
||||
buf = _io.BytesIO()
|
||||
pil.save(buf, format='JPEG', quality=70)
|
||||
img = _cv2.resize(img, (new_w, new_h), interpolation=_cv2.INTER_AREA)
|
||||
# Wire format: [0xFD] [jpeg_bytes]
|
||||
await websocket.send_bytes(b'\xfd' + buf.getvalue())
|
||||
await websocket.send_bytes(b'\xfd' + encode_jpeg(img, quality=70))
|
||||
except Exception as e:
|
||||
logger.warning(f"JPEG frame preview error: {e}")
|
||||
|
||||
|
||||
@@ -4,13 +4,10 @@ Extracted from output_targets.py to keep files under 800 lines.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import io
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
|
||||
from PIL import Image
|
||||
|
||||
from wled_controller.api.auth import AuthRequired
|
||||
from wled_controller.api.dependencies import (
|
||||
@@ -133,19 +130,21 @@ async def test_kc_target(
|
||||
|
||||
raw_stream = chain["raw_stream"]
|
||||
|
||||
from wled_controller.utils.image_codec import load_image_bytes, load_image_file
|
||||
|
||||
if isinstance(raw_stream, StaticImagePictureSource):
|
||||
source = raw_stream.image_source
|
||||
if source.startswith(("http://", "https://")):
|
||||
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
|
||||
resp = await client.get(source)
|
||||
resp.raise_for_status()
|
||||
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
|
||||
image = load_image_bytes(resp.content)
|
||||
else:
|
||||
from pathlib import Path
|
||||
path = Path(source)
|
||||
if not path.exists():
|
||||
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
|
||||
pil_image = Image.open(path).convert("RGB")
|
||||
image = load_image_file(path)
|
||||
|
||||
elif isinstance(raw_stream, ScreenCapturePictureSource):
|
||||
try:
|
||||
@@ -186,17 +185,15 @@ async def test_kc_target(
|
||||
if screen_capture is None:
|
||||
raise RuntimeError("No frame captured")
|
||||
|
||||
if isinstance(screen_capture.image, np.ndarray):
|
||||
pil_image = Image.fromarray(screen_capture.image)
|
||||
else:
|
||||
if not isinstance(screen_capture.image, np.ndarray):
|
||||
raise ValueError("Unexpected image format from engine")
|
||||
image = screen_capture.image
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail="Unsupported picture source type")
|
||||
|
||||
# 3b. Apply postprocessing filters (if the picture source has a filter chain)
|
||||
pp_template_ids = chain.get("postprocessing_template_ids", [])
|
||||
if pp_template_ids and pp_template_store:
|
||||
img_array = np.array(pil_image)
|
||||
image_pool = ImagePool()
|
||||
for pp_id in pp_template_ids:
|
||||
try:
|
||||
@@ -208,15 +205,14 @@ async def test_kc_target(
|
||||
for fi in flat_filters:
|
||||
try:
|
||||
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
|
||||
result = f.process_image(img_array, image_pool)
|
||||
result = f.process_image(image, image_pool)
|
||||
if result is not None:
|
||||
img_array = result
|
||||
image = result
|
||||
except ValueError:
|
||||
logger.warning(f"KC test: unknown filter '{fi.filter_id}', skipping")
|
||||
pil_image = Image.fromarray(img_array)
|
||||
|
||||
# 4. Extract colors from each rectangle
|
||||
img_array = np.array(pil_image)
|
||||
img_array = image
|
||||
h, w = img_array.shape[:2]
|
||||
|
||||
calc_fns = {
|
||||
@@ -250,11 +246,8 @@ async def test_kc_target(
|
||||
))
|
||||
|
||||
# 5. Encode frame as base64 JPEG
|
||||
full_buffer = io.BytesIO()
|
||||
pil_image.save(full_buffer, format='JPEG', quality=90)
|
||||
full_buffer.seek(0)
|
||||
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
|
||||
image_data_uri = f"data:image/jpeg;base64,{full_b64}"
|
||||
from wled_controller.utils.image_codec import encode_jpeg_data_uri
|
||||
image_data_uri = encode_jpeg_data_uri(image, quality=90)
|
||||
|
||||
return KCTestResponse(
|
||||
image=image_data_uri,
|
||||
@@ -411,8 +404,11 @@ async def test_kc_target_ws(
|
||||
continue
|
||||
prev_frame_ref = capture
|
||||
|
||||
pil_image = Image.fromarray(capture.image) if isinstance(capture.image, np.ndarray) else None
|
||||
if pil_image is None:
|
||||
if not isinstance(capture.image, np.ndarray):
|
||||
await asyncio.sleep(frame_interval)
|
||||
continue
|
||||
cur_image = capture.image
|
||||
if cur_image is None:
|
||||
await asyncio.sleep(frame_interval)
|
||||
continue
|
||||
|
||||
@@ -420,7 +416,6 @@ async def test_kc_target_ws(
|
||||
chain = source_store_inst.resolve_stream_chain(target.picture_source_id)
|
||||
pp_template_ids = chain.get("postprocessing_template_ids", [])
|
||||
if pp_template_ids and pp_template_store_inst:
|
||||
img_array = np.array(pil_image)
|
||||
image_pool = ImagePool()
|
||||
for pp_id in pp_template_ids:
|
||||
try:
|
||||
@@ -431,15 +426,14 @@ async def test_kc_target_ws(
|
||||
for fi in flat_filters:
|
||||
try:
|
||||
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
|
||||
result = f.process_image(img_array, image_pool)
|
||||
result = f.process_image(cur_image, image_pool)
|
||||
if result is not None:
|
||||
img_array = result
|
||||
cur_image = result
|
||||
except ValueError:
|
||||
pass
|
||||
pil_image = Image.fromarray(img_array)
|
||||
|
||||
# Extract colors
|
||||
img_array = np.array(pil_image)
|
||||
img_array = cur_image
|
||||
h, w = img_array.shape[:2]
|
||||
|
||||
result_rects = []
|
||||
@@ -466,18 +460,13 @@ async def test_kc_target_ws(
|
||||
})
|
||||
|
||||
# Encode frame as JPEG
|
||||
if preview_width and pil_image.width > preview_width:
|
||||
ratio = preview_width / pil_image.width
|
||||
thumb = pil_image.resize((preview_width, int(pil_image.height * ratio)), Image.LANCZOS)
|
||||
else:
|
||||
thumb = pil_image
|
||||
buf = io.BytesIO()
|
||||
thumb.save(buf, format="JPEG", quality=85)
|
||||
b64 = base64.b64encode(buf.getvalue()).decode()
|
||||
from wled_controller.utils.image_codec import encode_jpeg_data_uri, resize_down
|
||||
frame_to_encode = resize_down(cur_image, preview_width) if preview_width else cur_image
|
||||
frame_uri = encode_jpeg_data_uri(frame_to_encode, quality=85)
|
||||
|
||||
await websocket.send_text(_json.dumps({
|
||||
"type": "frame",
|
||||
"image": f"data:image/jpeg;base64,{b64}",
|
||||
"image": frame_uri,
|
||||
"rectangles": result_rects,
|
||||
"pattern_template_name": pattern_tmpl.name,
|
||||
"interpolation_mode": settings.interpolation_mode,
|
||||
|
||||
@@ -1,13 +1,10 @@
|
||||
"""Picture source routes."""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import io
|
||||
import time
|
||||
|
||||
import httpx
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
|
||||
from fastapi.responses import Response
|
||||
|
||||
@@ -115,16 +112,20 @@ async def validate_image(
|
||||
img_bytes = path
|
||||
|
||||
def _process_image(src):
|
||||
pil_image = Image.open(io.BytesIO(src) if isinstance(src, bytes) else src)
|
||||
pil_image = pil_image.convert("RGB")
|
||||
width, height = pil_image.size
|
||||
thumb = pil_image.copy()
|
||||
thumb.thumbnail((320, 320), Image.Resampling.LANCZOS)
|
||||
buf = io.BytesIO()
|
||||
thumb.save(buf, format="JPEG", quality=80)
|
||||
buf.seek(0)
|
||||
preview = f"data:image/jpeg;base64,{base64.b64encode(buf.getvalue()).decode()}"
|
||||
return width, height, preview
|
||||
from wled_controller.utils.image_codec import (
|
||||
encode_jpeg_data_uri,
|
||||
load_image_bytes,
|
||||
load_image_file,
|
||||
thumbnail as make_thumbnail,
|
||||
)
|
||||
if isinstance(src, bytes):
|
||||
image = load_image_bytes(src)
|
||||
else:
|
||||
image = load_image_file(src)
|
||||
h, w = image.shape[:2]
|
||||
thumb = make_thumbnail(image, 320)
|
||||
preview = encode_jpeg_data_uri(thumb, quality=80)
|
||||
return w, h, preview
|
||||
|
||||
width, height, preview = await asyncio.to_thread(_process_image, img_bytes)
|
||||
|
||||
@@ -161,11 +162,12 @@ async def get_full_image(
|
||||
img_bytes = path
|
||||
|
||||
def _encode_full(src):
|
||||
pil_image = Image.open(io.BytesIO(src) if isinstance(src, bytes) else src)
|
||||
pil_image = pil_image.convert("RGB")
|
||||
buf = io.BytesIO()
|
||||
pil_image.save(buf, format="JPEG", quality=90)
|
||||
return buf.getvalue()
|
||||
from wled_controller.utils.image_codec import encode_jpeg, load_image_bytes, load_image_file
|
||||
if isinstance(src, bytes):
|
||||
image = load_image_bytes(src)
|
||||
else:
|
||||
image = load_image_file(src)
|
||||
return encode_jpeg(image, quality=90)
|
||||
|
||||
jpeg_bytes = await asyncio.to_thread(_encode_full, img_bytes)
|
||||
return Response(content=jpeg_bytes, media_type="image/jpeg")
|
||||
@@ -333,13 +335,9 @@ async def get_video_thumbnail(
|
||||
store: PictureSourceStore = Depends(get_picture_source_store),
|
||||
):
|
||||
"""Get a thumbnail for a video picture source (first frame)."""
|
||||
import base64
|
||||
from io import BytesIO
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from wled_controller.core.processing.video_stream import extract_thumbnail
|
||||
from wled_controller.storage.picture_source import VideoCaptureSource
|
||||
from wled_controller.utils.image_codec import encode_jpeg_data_uri, resize_down
|
||||
|
||||
try:
|
||||
source = store.get_stream(stream_id)
|
||||
@@ -352,18 +350,12 @@ async def get_video_thumbnail(
|
||||
if frame is None:
|
||||
raise HTTPException(status_code=404, detail="Could not extract thumbnail")
|
||||
|
||||
# Encode as JPEG
|
||||
pil_img = Image.fromarray(frame)
|
||||
# Resize to max 320px wide for thumbnail
|
||||
if pil_img.width > 320:
|
||||
ratio = 320 / pil_img.width
|
||||
pil_img = pil_img.resize((320, int(pil_img.height * ratio)), Image.LANCZOS)
|
||||
frame = resize_down(frame, 320)
|
||||
h, w = frame.shape[:2]
|
||||
data_uri = encode_jpeg_data_uri(frame, quality=80)
|
||||
|
||||
buf = BytesIO()
|
||||
pil_img.save(buf, format="JPEG", quality=80)
|
||||
b64 = base64.b64encode(buf.getvalue()).decode()
|
||||
|
||||
return {"thumbnail": f"data:image/jpeg;base64,{b64}", "width": pil_img.width, "height": pil_img.height}
|
||||
return {"thumbnail": data_uri, "width": w, "height": h}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
@@ -408,16 +400,18 @@ async def test_picture_source(
|
||||
source = raw_stream.image_source
|
||||
start_time = time.perf_counter()
|
||||
|
||||
from wled_controller.utils.image_codec import load_image_bytes, load_image_file
|
||||
|
||||
if source.startswith(("http://", "https://")):
|
||||
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
|
||||
resp = await client.get(source)
|
||||
resp.raise_for_status()
|
||||
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
|
||||
image = load_image_bytes(resp.content)
|
||||
else:
|
||||
path = Path(source)
|
||||
if not path.exists():
|
||||
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
|
||||
pil_image = await asyncio.to_thread(lambda: Image.open(path).convert("RGB"))
|
||||
image = await asyncio.to_thread(load_image_file, path)
|
||||
|
||||
actual_duration = time.perf_counter() - start_time
|
||||
frame_count = 1
|
||||
@@ -479,12 +473,13 @@ async def test_picture_source(
|
||||
if last_frame is None:
|
||||
raise RuntimeError("No frames captured during test")
|
||||
|
||||
if isinstance(last_frame.image, np.ndarray):
|
||||
pil_image = Image.fromarray(last_frame.image)
|
||||
else:
|
||||
if not isinstance(last_frame.image, np.ndarray):
|
||||
raise ValueError("Unexpected image format from engine")
|
||||
image = last_frame.image
|
||||
|
||||
# Create thumbnail + encode (CPU-bound — run in thread)
|
||||
from wled_controller.utils.image_codec import encode_jpeg_data_uri, thumbnail as make_thumbnail
|
||||
|
||||
pp_template_ids = chain["postprocessing_template_ids"]
|
||||
flat_filters = None
|
||||
if pp_template_ids:
|
||||
@@ -494,45 +489,33 @@ async def test_picture_source(
|
||||
except ValueError:
|
||||
logger.warning(f"PP template {pp_template_ids[0]} not found, skipping postprocessing preview")
|
||||
|
||||
def _create_thumbnails_and_encode(pil_img, filters):
|
||||
thumbnail_w = 640
|
||||
aspect_ratio = pil_img.height / pil_img.width
|
||||
thumbnail_h = int(thumbnail_w * aspect_ratio)
|
||||
thumb = pil_img.copy()
|
||||
thumb.thumbnail((thumbnail_w, thumbnail_h), Image.Resampling.LANCZOS)
|
||||
def _create_thumbnails_and_encode(img, filters):
|
||||
thumb = make_thumbnail(img, 640)
|
||||
|
||||
if filters:
|
||||
pool = ImagePool()
|
||||
def apply_filters(img):
|
||||
arr = np.array(img)
|
||||
def apply_filters(arr):
|
||||
for fi in filters:
|
||||
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
|
||||
result = f.process_image(arr, pool)
|
||||
if result is not None:
|
||||
arr = result
|
||||
return Image.fromarray(arr)
|
||||
return arr
|
||||
thumb = apply_filters(thumb)
|
||||
pil_img = apply_filters(pil_img)
|
||||
img = apply_filters(img)
|
||||
|
||||
img_buffer = io.BytesIO()
|
||||
thumb.save(img_buffer, format='JPEG', quality=85)
|
||||
thumb_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
|
||||
thumb_uri = encode_jpeg_data_uri(thumb, quality=85)
|
||||
full_uri = encode_jpeg_data_uri(img, quality=90)
|
||||
th, tw = thumb.shape[:2]
|
||||
return tw, th, thumb_uri, full_uri
|
||||
|
||||
full_buffer = io.BytesIO()
|
||||
pil_img.save(full_buffer, format='JPEG', quality=90)
|
||||
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
|
||||
|
||||
return thumbnail_w, thumbnail_h, thumb_b64, full_b64
|
||||
|
||||
thumbnail_width, thumbnail_height, thumbnail_b64, full_b64 = await asyncio.to_thread(
|
||||
_create_thumbnails_and_encode, pil_image, flat_filters
|
||||
thumbnail_width, thumbnail_height, thumbnail_data_uri, full_data_uri = await asyncio.to_thread(
|
||||
_create_thumbnails_and_encode, image, flat_filters
|
||||
)
|
||||
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
|
||||
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
|
||||
|
||||
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
|
||||
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
|
||||
width, height = pil_image.size
|
||||
height, width = image.shape[:2]
|
||||
|
||||
return TemplateTestResponse(
|
||||
full_capture=CaptureImage(
|
||||
@@ -635,15 +618,11 @@ async def test_picture_source_ws(
|
||||
|
||||
def _encode_video_frame(image, pw):
|
||||
"""Encode numpy RGB image as JPEG base64 data URI."""
|
||||
from PIL import Image as PILImage
|
||||
pil = PILImage.fromarray(image)
|
||||
if pw and pil.width > pw:
|
||||
ratio = pw / pil.width
|
||||
pil = pil.resize((pw, int(pil.height * ratio)), PILImage.LANCZOS)
|
||||
buf = io.BytesIO()
|
||||
pil.save(buf, format="JPEG", quality=80)
|
||||
b64 = base64.b64encode(buf.getvalue()).decode()
|
||||
return f"data:image/jpeg;base64,{b64}", pil.width, pil.height
|
||||
from wled_controller.utils.image_codec import encode_jpeg_data_uri, resize_down
|
||||
if pw:
|
||||
image = resize_down(image, pw)
|
||||
h, w = image.shape[:2]
|
||||
return encode_jpeg_data_uri(image, quality=80), w, h
|
||||
|
||||
try:
|
||||
await asyncio.get_event_loop().run_in_executor(None, video_stream.start)
|
||||
|
||||
@@ -1,12 +1,9 @@
|
||||
"""Postprocessing template routes."""
|
||||
|
||||
import base64
|
||||
import io
|
||||
import time
|
||||
|
||||
import httpx
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
|
||||
|
||||
from wled_controller.api.auth import AuthRequired
|
||||
@@ -198,6 +195,13 @@ async def test_pp_template(
|
||||
|
||||
raw_stream = chain["raw_stream"]
|
||||
|
||||
from wled_controller.utils.image_codec import (
|
||||
encode_jpeg_data_uri,
|
||||
load_image_bytes,
|
||||
load_image_file,
|
||||
thumbnail as make_thumbnail,
|
||||
)
|
||||
|
||||
if isinstance(raw_stream, StaticImagePictureSource):
|
||||
# Static image: load directly
|
||||
from pathlib import Path
|
||||
@@ -209,12 +213,12 @@ async def test_pp_template(
|
||||
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
|
||||
resp = await client.get(source)
|
||||
resp.raise_for_status()
|
||||
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
|
||||
image = load_image_bytes(resp.content)
|
||||
else:
|
||||
path = Path(source)
|
||||
if not path.exists():
|
||||
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
|
||||
pil_image = Image.open(path).convert("RGB")
|
||||
image = load_image_file(path)
|
||||
|
||||
actual_duration = time.perf_counter() - start_time
|
||||
frame_count = 1
|
||||
@@ -268,53 +272,37 @@ async def test_pp_template(
|
||||
if last_frame is None:
|
||||
raise RuntimeError("No frames captured during test")
|
||||
|
||||
if isinstance(last_frame.image, np.ndarray):
|
||||
pil_image = Image.fromarray(last_frame.image)
|
||||
else:
|
||||
if not isinstance(last_frame.image, np.ndarray):
|
||||
raise ValueError("Unexpected image format from engine")
|
||||
image = last_frame.image
|
||||
|
||||
# Create thumbnail
|
||||
thumbnail_width = 640
|
||||
aspect_ratio = pil_image.height / pil_image.width
|
||||
thumbnail_height = int(thumbnail_width * aspect_ratio)
|
||||
thumbnail = pil_image.copy()
|
||||
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
|
||||
thumb = make_thumbnail(image, 640)
|
||||
|
||||
# Apply postprocessing filters (expand filter_template references)
|
||||
flat_filters = pp_store.resolve_filter_instances(pp_template.filters)
|
||||
if flat_filters:
|
||||
pool = ImagePool()
|
||||
|
||||
def apply_filters(img):
|
||||
arr = np.array(img)
|
||||
def apply_filters(arr):
|
||||
for fi in flat_filters:
|
||||
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
|
||||
result = f.process_image(arr, pool)
|
||||
if result is not None:
|
||||
arr = result
|
||||
return Image.fromarray(arr)
|
||||
return arr
|
||||
|
||||
thumbnail = apply_filters(thumbnail)
|
||||
pil_image = apply_filters(pil_image)
|
||||
thumb = apply_filters(thumb)
|
||||
image = apply_filters(image)
|
||||
|
||||
# Encode thumbnail
|
||||
img_buffer = io.BytesIO()
|
||||
thumbnail.save(img_buffer, format='JPEG', quality=85)
|
||||
img_buffer.seek(0)
|
||||
thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
|
||||
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
|
||||
|
||||
# Encode full-resolution image
|
||||
full_buffer = io.BytesIO()
|
||||
pil_image.save(full_buffer, format='JPEG', quality=90)
|
||||
full_buffer.seek(0)
|
||||
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
|
||||
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
|
||||
# Encode as JPEG
|
||||
thumbnail_data_uri = encode_jpeg_data_uri(thumb, quality=85)
|
||||
full_data_uri = encode_jpeg_data_uri(image, quality=90)
|
||||
|
||||
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
|
||||
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
|
||||
width, height = pil_image.size
|
||||
thumb_w, thumb_h = thumbnail.size
|
||||
height, width = image.shape[:2]
|
||||
thumb_h, thumb_w = thumb.shape[:2]
|
||||
|
||||
return TemplateTestResponse(
|
||||
full_capture=CaptureImage(
|
||||
|
||||
@@ -1,11 +1,8 @@
|
||||
"""Capture template, engine, and filter routes."""
|
||||
|
||||
import base64
|
||||
import io
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
|
||||
|
||||
from wled_controller.api.auth import AuthRequired
|
||||
@@ -320,38 +317,28 @@ def test_template(
|
||||
if last_frame is None:
|
||||
raise RuntimeError("No frames captured during test")
|
||||
|
||||
# Convert numpy array to PIL Image
|
||||
if isinstance(last_frame.image, np.ndarray):
|
||||
pil_image = Image.fromarray(last_frame.image)
|
||||
else:
|
||||
if not isinstance(last_frame.image, np.ndarray):
|
||||
raise ValueError("Unexpected image format from engine")
|
||||
image = last_frame.image
|
||||
|
||||
from wled_controller.utils.image_codec import (
|
||||
encode_jpeg_data_uri,
|
||||
thumbnail as make_thumbnail,
|
||||
)
|
||||
|
||||
# Create thumbnail (640px wide, maintain aspect ratio)
|
||||
thumbnail_width = 640
|
||||
aspect_ratio = pil_image.height / pil_image.width
|
||||
thumbnail_height = int(thumbnail_width * aspect_ratio)
|
||||
thumbnail = pil_image.copy()
|
||||
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
|
||||
thumb = make_thumbnail(image, 640)
|
||||
thumb_h, thumb_w = thumb.shape[:2]
|
||||
|
||||
# Encode thumbnail as JPEG
|
||||
img_buffer = io.BytesIO()
|
||||
thumbnail.save(img_buffer, format='JPEG', quality=85)
|
||||
img_buffer.seek(0)
|
||||
thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
|
||||
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
|
||||
|
||||
# Encode full-resolution image as JPEG
|
||||
full_buffer = io.BytesIO()
|
||||
pil_image.save(full_buffer, format='JPEG', quality=90)
|
||||
full_buffer.seek(0)
|
||||
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
|
||||
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
|
||||
# Encode as JPEG
|
||||
thumbnail_data_uri = encode_jpeg_data_uri(thumb, quality=85)
|
||||
full_data_uri = encode_jpeg_data_uri(image, quality=90)
|
||||
|
||||
# Calculate metrics
|
||||
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
|
||||
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
|
||||
|
||||
width, height = pil_image.size
|
||||
height, width = image.shape[:2]
|
||||
|
||||
return TemplateTestResponse(
|
||||
full_capture=CaptureImage(
|
||||
@@ -359,8 +346,8 @@ def test_template(
|
||||
full_image=full_data_uri,
|
||||
width=width,
|
||||
height=height,
|
||||
thumbnail_width=thumbnail_width,
|
||||
thumbnail_height=thumbnail_height,
|
||||
thumbnail_width=thumb_w,
|
||||
thumbnail_height=thumb_h,
|
||||
),
|
||||
border_extraction=None,
|
||||
performance=PerformanceMetrics(
|
||||
|
||||
@@ -12,7 +12,6 @@ Prerequisites (system binaries, NOT Python packages):
|
||||
- adb (bundled with scrcpy, or Android SDK Platform-Tools)
|
||||
"""
|
||||
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
@@ -22,7 +21,8 @@ import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from wled_controller.utils.image_codec import load_image_bytes
|
||||
|
||||
from wled_controller.core.capture_engines.base import (
|
||||
CaptureEngine,
|
||||
@@ -144,8 +144,7 @@ def _screencap_once(adb: str, serial: str) -> Optional[np.ndarray]:
|
||||
if result.returncode != 0 or len(result.stdout) < 100:
|
||||
return None
|
||||
|
||||
img = Image.open(io.BytesIO(result.stdout))
|
||||
return np.asarray(img.convert("RGB"))
|
||||
return load_image_bytes(result.stdout)
|
||||
except Exception as e:
|
||||
logger.debug(f"screencap failed for {serial}: {e}")
|
||||
return None
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from wled_controller.core.filters.base import FilterOptionDef, PostprocessingFilter
|
||||
from wled_controller.core.filters.image_pool import ImagePool
|
||||
@@ -44,8 +44,7 @@ class DownscalerFilter(PostprocessingFilter):
|
||||
if new_h == h and new_w == w:
|
||||
return None
|
||||
|
||||
pil_img = Image.fromarray(image)
|
||||
downscaled = np.array(pil_img.resize((new_w, new_h), Image.LANCZOS))
|
||||
downscaled = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
||||
|
||||
result = image_pool.acquire(new_h, new_w, image.shape[2] if image.ndim == 3 else 3)
|
||||
np.copyto(result, downscaled)
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from wled_controller.core.filters.base import FilterOptionDef, PostprocessingFilter
|
||||
from wled_controller.core.filters.image_pool import ImagePool
|
||||
@@ -42,9 +42,8 @@ class PixelateFilter(PostprocessingFilter):
|
||||
# vectorized C++ instead of per-block Python loop
|
||||
small_w = max(1, w // block_size)
|
||||
small_h = max(1, h // block_size)
|
||||
pil_img = Image.fromarray(image)
|
||||
small = pil_img.resize((small_w, small_h), Image.LANCZOS)
|
||||
pixelated = np.array(small.resize((w, h), Image.NEAREST))
|
||||
small = cv2.resize(image, (small_w, small_h), interpolation=cv2.INTER_AREA)
|
||||
pixelated = cv2.resize(small, (w, h), interpolation=cv2.INTER_NEAREST)
|
||||
np.copyto(image, pixelated)
|
||||
|
||||
return None
|
||||
|
||||
@@ -9,8 +9,8 @@ import time
|
||||
from datetime import datetime, timezone
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from wled_controller.core.processing.live_stream import LiveStream
|
||||
from wled_controller.core.capture.screen_capture import (
|
||||
@@ -46,8 +46,7 @@ def _process_kc_frame(capture, rect_names, rect_bounds, calc_fn, prev_colors_arr
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Downsample to working resolution — 144x fewer pixels at 1080p
|
||||
pil_img = Image.fromarray(capture.image)
|
||||
small = np.array(pil_img.resize(KC_WORK_SIZE, Image.LANCZOS))
|
||||
small = cv2.resize(capture.image, KC_WORK_SIZE, interpolation=cv2.INTER_AREA)
|
||||
|
||||
# Extract colors for each rectangle from the small image
|
||||
n = len(rect_names)
|
||||
|
||||
@@ -311,20 +311,16 @@ class LiveStreamManager:
|
||||
This is acceptable because acquire() (the only caller chain) is always
|
||||
invoked from background worker threads, never from the async event loop.
|
||||
"""
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
|
||||
from PIL import Image
|
||||
from wled_controller.utils.image_codec import load_image_bytes, load_image_file
|
||||
|
||||
if image_source.startswith(("http://", "https://")):
|
||||
response = httpx.get(image_source, timeout=15.0, follow_redirects=True)
|
||||
response.raise_for_status()
|
||||
pil_image = Image.open(BytesIO(response.content))
|
||||
return load_image_bytes(response.content)
|
||||
else:
|
||||
path = Path(image_source)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Image file not found: {image_source}")
|
||||
pil_image = Image.open(path)
|
||||
|
||||
pil_image = pil_image.convert("RGB")
|
||||
return np.array(pil_image)
|
||||
return load_image_file(path)
|
||||
|
||||
@@ -7,17 +7,16 @@ from pathlib import Path
|
||||
from tkinter import messagebox
|
||||
from typing import Callable
|
||||
|
||||
from PIL import Image
|
||||
|
||||
try:
|
||||
import pystray
|
||||
from PIL import Image
|
||||
|
||||
PYSTRAY_AVAILABLE = True
|
||||
except ImportError:
|
||||
PYSTRAY_AVAILABLE = False
|
||||
|
||||
|
||||
def _load_icon(icon_path: Path) -> Image.Image:
|
||||
def _load_icon(icon_path: Path) -> "Image.Image":
|
||||
"""Load tray icon from PNG, with a solid-color fallback."""
|
||||
if icon_path.exists():
|
||||
return Image.open(icon_path)
|
||||
|
||||
91
server/src/wled_controller/utils/image_codec.py
Normal file
91
server/src/wled_controller/utils/image_codec.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""Image encoding/decoding/resizing utilities using OpenCV.
|
||||
|
||||
Replaces PIL/Pillow for JPEG encoding, image loading, and resizing operations.
|
||||
All functions work with numpy RGB arrays (H, W, 3) uint8.
|
||||
"""
|
||||
|
||||
import base64
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Union
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
def encode_jpeg(image: np.ndarray, quality: int = 85) -> bytes:
|
||||
"""Encode an RGB numpy array as JPEG bytes."""
|
||||
bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
|
||||
ok, buf = cv2.imencode(".jpg", bgr, [cv2.IMWRITE_JPEG_QUALITY, quality])
|
||||
if not ok:
|
||||
raise RuntimeError("JPEG encoding failed")
|
||||
return buf.tobytes()
|
||||
|
||||
|
||||
def encode_jpeg_data_uri(image: np.ndarray, quality: int = 85) -> str:
|
||||
"""Encode an RGB numpy array as a JPEG base64 data URI."""
|
||||
raw = encode_jpeg(image, quality)
|
||||
b64 = base64.b64encode(raw).decode("utf-8")
|
||||
return f"data:image/jpeg;base64,{b64}"
|
||||
|
||||
|
||||
def resize_image(image: np.ndarray, width: int, height: int) -> np.ndarray:
|
||||
"""Resize an image to exact dimensions.
|
||||
|
||||
Uses INTER_AREA for downscaling (better quality, faster) and
|
||||
INTER_LANCZOS4 for upscaling.
|
||||
"""
|
||||
h, w = image.shape[:2]
|
||||
shrinking = (width * height) < (w * h)
|
||||
interp = cv2.INTER_AREA if shrinking else cv2.INTER_LANCZOS4
|
||||
return cv2.resize(image, (width, height), interpolation=interp)
|
||||
|
||||
|
||||
def thumbnail(image: np.ndarray, max_width: int) -> np.ndarray:
|
||||
"""Create a thumbnail that fits within max_width, preserving aspect ratio.
|
||||
|
||||
Uses INTER_AREA (optimal for downscaling).
|
||||
"""
|
||||
h, w = image.shape[:2]
|
||||
if w <= max_width:
|
||||
return image.copy()
|
||||
scale = max_width / w
|
||||
new_w = max_width
|
||||
new_h = max(1, int(h * scale))
|
||||
return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
||||
|
||||
|
||||
def resize_down(image: np.ndarray, max_width: int) -> np.ndarray:
|
||||
"""Downscale if wider than max_width; return as-is otherwise.
|
||||
|
||||
Uses INTER_AREA (optimal for downscaling).
|
||||
"""
|
||||
h, w = image.shape[:2]
|
||||
if w <= max_width:
|
||||
return image
|
||||
scale = max_width / w
|
||||
new_w = max_width
|
||||
new_h = max(1, int(h * scale))
|
||||
return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
||||
|
||||
|
||||
def load_image_file(path: Union[str, Path]) -> np.ndarray:
|
||||
"""Load an image file and return as RGB numpy array."""
|
||||
path = str(path)
|
||||
bgr = cv2.imread(path, cv2.IMREAD_COLOR)
|
||||
if bgr is None:
|
||||
raise FileNotFoundError(f"Cannot load image: {path}")
|
||||
return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
|
||||
|
||||
|
||||
def load_image_bytes(data: bytes) -> np.ndarray:
|
||||
"""Decode image bytes (JPEG, PNG, etc.) and return as RGB numpy array."""
|
||||
arr = np.frombuffer(data, dtype=np.uint8)
|
||||
bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
|
||||
if bgr is None:
|
||||
raise ValueError("Cannot decode image data")
|
||||
return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
|
||||
|
||||
|
||||
def image_size(image: np.ndarray) -> Tuple[int, int]:
|
||||
"""Return (width, height) of an image array."""
|
||||
return image.shape[1], image.shape[0]
|
||||
Reference in New Issue
Block a user