feat: reduce build size — replace Pillow with cv2, refactor build scripts
All checks were successful
Build Release / create-release (push) Successful in 1s
Build Release / build-docker (push) Successful in 42s
Lint & Test / test (push) Successful in 2m50s
Build Release / build-windows (push) Successful in 3m27s
Build Release / build-linux (push) Successful in 1m59s

- Create utils/image_codec.py with cv2-based image helpers
- Replace PIL usage across all routes, filters, and engines with cv2
- Move Pillow from core deps to [tray] optional in pyproject.toml
- Extract shared build logic into build-common.sh (detect_version, cleanup, etc.)
- Strip unused NumPy/PIL/zeroconf/debug files in build scripts
This commit is contained in:
2026-03-25 14:18:16 +03:00
parent 7da5084337
commit 7939322a7f
18 changed files with 444 additions and 377 deletions

View File

@@ -28,7 +28,6 @@ dependencies = [
"httpx>=0.27.2",
"packaging>=23.0",
"mss>=9.0.2",
"Pillow>=10.4.0",
"numpy>=2.1.3",
"pydantic>=2.9.2",
"pydantic-settings>=2.6.0",
@@ -46,6 +45,7 @@ dependencies = [
"sounddevice>=0.5",
"aiomqtt>=2.0.0",
"openrgb-python>=0.2.15",
"opencv-python-headless>=4.8.0",
]
[project.optional-dependencies]
@@ -57,9 +57,11 @@ dev = [
"black>=24.0.0",
"ruff>=0.6.0",
"opencv-python-headless>=4.8.0",
"Pillow>=10.4.0",
]
camera = [
"opencv-python-headless>=4.8.0",
# opencv-python-headless is now a core dependency (used for image encoding)
# camera extra kept for backwards compatibility
]
# OS notification capture (winrt packages are ~2.5MB total vs winsdk's ~35MB)
notifications = [
@@ -78,6 +80,7 @@ perf = [
]
tray = [
"pystray>=0.19.0; sys_platform == 'win32'",
"Pillow>=10.4.0; sys_platform == 'win32'",
]
[project.urls]

View File

@@ -1,18 +1,21 @@
"""Shared helpers for WebSocket-based capture preview endpoints."""
import asyncio
import base64
import io
import threading
import time
from typing import Callable, Optional
import numpy as np
from PIL import Image
from starlette.websockets import WebSocket
from wled_controller.core.filters import FilterRegistry, ImagePool
from wled_controller.utils import get_logger
from wled_controller.utils.image_codec import (
encode_jpeg,
encode_jpeg_data_uri,
resize_down,
thumbnail,
)
logger = get_logger(__name__)
@@ -32,47 +35,35 @@ def authenticate_ws_token(token: str) -> bool:
return verify_ws_token(token)
def _encode_jpeg(pil_image: Image.Image, quality: int = 85) -> str:
"""Encode a PIL image as a JPEG base64 data URI."""
buf = io.BytesIO()
pil_image.save(buf, format="JPEG", quality=quality)
buf.seek(0)
b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
return f"data:image/jpeg;base64,{b64}"
def _encode_jpeg(image: np.ndarray, quality: int = 85) -> str:
"""Encode a numpy RGB image as a JPEG base64 data URI."""
return encode_jpeg_data_uri(image, quality)
def encode_preview_frame(image: np.ndarray, max_width: int = None, quality: int = 80) -> bytes:
"""Encode a numpy RGB image to JPEG bytes, optionally downscaling."""
pil_img = Image.fromarray(image)
if max_width and image.shape[1] > max_width:
scale = max_width / image.shape[1]
new_h = int(image.shape[0] * scale)
pil_img = pil_img.resize((max_width, new_h), Image.LANCZOS)
buf = io.BytesIO()
pil_img.save(buf, format="JPEG", quality=quality)
return buf.getvalue()
if max_width:
image = resize_down(image, max_width)
return encode_jpeg(image, quality)
def _make_thumbnail(pil_image: Image.Image, max_width: int) -> Image.Image:
def _make_thumbnail(image: np.ndarray, max_width: int) -> np.ndarray:
"""Create a thumbnail copy of the image, preserving aspect ratio."""
thumb = pil_image.copy()
aspect = pil_image.height / pil_image.width
thumb.thumbnail((max_width, int(max_width * aspect)), Image.Resampling.LANCZOS)
return thumb
return thumbnail(image, max_width)
def _apply_pp_filters(pil_image: Image.Image, flat_filters: list) -> Image.Image:
"""Apply postprocessing filter instances to a PIL image."""
def _apply_pp_filters(image: np.ndarray, flat_filters: list) -> np.ndarray:
"""Apply postprocessing filter instances to a numpy image."""
if not flat_filters:
return pil_image
return image
pool = ImagePool()
arr = np.array(pil_image)
arr = image
for fi in flat_filters:
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
result = f.process_image(arr, pool)
if result is not None:
arr = result
return Image.fromarray(arr)
return arr
async def stream_capture_test(
@@ -98,7 +89,7 @@ async def stream_capture_test(
thumb_width = preview_width or PREVIEW_MAX_WIDTH
# Shared state between capture thread and async loop
latest_frame = None # PIL Image (converted from numpy)
latest_frame = None # numpy RGB array
frame_count = 0
total_capture_time = 0.0
stop_event = threading.Event()
@@ -121,9 +112,8 @@ async def stream_capture_test(
continue
total_capture_time += t1 - t0
frame_count += 1
# Convert numpy -> PIL once in the capture thread
if isinstance(capture.image, np.ndarray):
latest_frame = Image.fromarray(capture.image)
latest_frame = capture.image
else:
latest_frame = capture.image
except Exception as e:
@@ -202,7 +192,7 @@ async def stream_capture_test(
if pp_filters:
final_frame = _apply_pp_filters(final_frame, pp_filters)
w, h = final_frame.size
h, w = final_frame.shape[:2]
full_uri = _encode_jpeg(final_frame, FINAL_JPEG_QUALITY)
thumb = _make_thumbnail(final_frame, FINAL_THUMBNAIL_WIDTH)

View File

@@ -1,7 +1,6 @@
"""Color strip source routes: CRUD, calibration test, preview, and API input push."""
import asyncio
import io as _io
import json as _json
import time as _time
import uuid as _uuid
@@ -989,7 +988,8 @@ async def test_color_strip_ws(
try:
frame = _frame_live.get_latest_frame()
if frame is not None and frame.image is not None:
from PIL import Image as _PIL_Image
from wled_controller.utils.image_codec import encode_jpeg
import cv2 as _cv2
img = frame.image
# Ensure 3-channel RGB (some engines may produce BGRA)
if img.ndim == 3 and img.shape[2] == 4:
@@ -1008,13 +1008,9 @@ async def test_color_strip_ws(
if scale < 1.0:
new_w = max(1, int(w * scale))
new_h = max(1, int(h * scale))
pil = _PIL_Image.fromarray(img).resize((new_w, new_h), _PIL_Image.LANCZOS)
else:
pil = _PIL_Image.fromarray(img)
buf = _io.BytesIO()
pil.save(buf, format='JPEG', quality=70)
img = _cv2.resize(img, (new_w, new_h), interpolation=_cv2.INTER_AREA)
# Wire format: [0xFD] [jpeg_bytes]
await websocket.send_bytes(b'\xfd' + buf.getvalue())
await websocket.send_bytes(b'\xfd' + encode_jpeg(img, quality=70))
except Exception as e:
logger.warning(f"JPEG frame preview error: {e}")

View File

@@ -4,13 +4,10 @@ Extracted from output_targets.py to keep files under 800 lines.
"""
import asyncio
import base64
import io
import time
import numpy as np
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
from PIL import Image
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
@@ -133,19 +130,21 @@ async def test_kc_target(
raw_stream = chain["raw_stream"]
from wled_controller.utils.image_codec import load_image_bytes, load_image_file
if isinstance(raw_stream, StaticImagePictureSource):
source = raw_stream.image_source
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(source)
resp.raise_for_status()
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
image = load_image_bytes(resp.content)
else:
from pathlib import Path
path = Path(source)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
pil_image = Image.open(path).convert("RGB")
image = load_image_file(path)
elif isinstance(raw_stream, ScreenCapturePictureSource):
try:
@@ -186,17 +185,15 @@ async def test_kc_target(
if screen_capture is None:
raise RuntimeError("No frame captured")
if isinstance(screen_capture.image, np.ndarray):
pil_image = Image.fromarray(screen_capture.image)
else:
if not isinstance(screen_capture.image, np.ndarray):
raise ValueError("Unexpected image format from engine")
image = screen_capture.image
else:
raise HTTPException(status_code=400, detail="Unsupported picture source type")
# 3b. Apply postprocessing filters (if the picture source has a filter chain)
pp_template_ids = chain.get("postprocessing_template_ids", [])
if pp_template_ids and pp_template_store:
img_array = np.array(pil_image)
image_pool = ImagePool()
for pp_id in pp_template_ids:
try:
@@ -208,15 +205,14 @@ async def test_kc_target(
for fi in flat_filters:
try:
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
result = f.process_image(img_array, image_pool)
result = f.process_image(image, image_pool)
if result is not None:
img_array = result
image = result
except ValueError:
logger.warning(f"KC test: unknown filter '{fi.filter_id}', skipping")
pil_image = Image.fromarray(img_array)
# 4. Extract colors from each rectangle
img_array = np.array(pil_image)
img_array = image
h, w = img_array.shape[:2]
calc_fns = {
@@ -250,11 +246,8 @@ async def test_kc_target(
))
# 5. Encode frame as base64 JPEG
full_buffer = io.BytesIO()
pil_image.save(full_buffer, format='JPEG', quality=90)
full_buffer.seek(0)
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
image_data_uri = f"data:image/jpeg;base64,{full_b64}"
from wled_controller.utils.image_codec import encode_jpeg_data_uri
image_data_uri = encode_jpeg_data_uri(image, quality=90)
return KCTestResponse(
image=image_data_uri,
@@ -411,8 +404,11 @@ async def test_kc_target_ws(
continue
prev_frame_ref = capture
pil_image = Image.fromarray(capture.image) if isinstance(capture.image, np.ndarray) else None
if pil_image is None:
if not isinstance(capture.image, np.ndarray):
await asyncio.sleep(frame_interval)
continue
cur_image = capture.image
if cur_image is None:
await asyncio.sleep(frame_interval)
continue
@@ -420,7 +416,6 @@ async def test_kc_target_ws(
chain = source_store_inst.resolve_stream_chain(target.picture_source_id)
pp_template_ids = chain.get("postprocessing_template_ids", [])
if pp_template_ids and pp_template_store_inst:
img_array = np.array(pil_image)
image_pool = ImagePool()
for pp_id in pp_template_ids:
try:
@@ -431,15 +426,14 @@ async def test_kc_target_ws(
for fi in flat_filters:
try:
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
result = f.process_image(img_array, image_pool)
result = f.process_image(cur_image, image_pool)
if result is not None:
img_array = result
cur_image = result
except ValueError:
pass
pil_image = Image.fromarray(img_array)
# Extract colors
img_array = np.array(pil_image)
img_array = cur_image
h, w = img_array.shape[:2]
result_rects = []
@@ -466,18 +460,13 @@ async def test_kc_target_ws(
})
# Encode frame as JPEG
if preview_width and pil_image.width > preview_width:
ratio = preview_width / pil_image.width
thumb = pil_image.resize((preview_width, int(pil_image.height * ratio)), Image.LANCZOS)
else:
thumb = pil_image
buf = io.BytesIO()
thumb.save(buf, format="JPEG", quality=85)
b64 = base64.b64encode(buf.getvalue()).decode()
from wled_controller.utils.image_codec import encode_jpeg_data_uri, resize_down
frame_to_encode = resize_down(cur_image, preview_width) if preview_width else cur_image
frame_uri = encode_jpeg_data_uri(frame_to_encode, quality=85)
await websocket.send_text(_json.dumps({
"type": "frame",
"image": f"data:image/jpeg;base64,{b64}",
"image": frame_uri,
"rectangles": result_rects,
"pattern_template_name": pattern_tmpl.name,
"interpolation_mode": settings.interpolation_mode,

View File

@@ -1,13 +1,10 @@
"""Picture source routes."""
import asyncio
import base64
import io
import time
import httpx
import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
from fastapi.responses import Response
@@ -115,16 +112,20 @@ async def validate_image(
img_bytes = path
def _process_image(src):
pil_image = Image.open(io.BytesIO(src) if isinstance(src, bytes) else src)
pil_image = pil_image.convert("RGB")
width, height = pil_image.size
thumb = pil_image.copy()
thumb.thumbnail((320, 320), Image.Resampling.LANCZOS)
buf = io.BytesIO()
thumb.save(buf, format="JPEG", quality=80)
buf.seek(0)
preview = f"data:image/jpeg;base64,{base64.b64encode(buf.getvalue()).decode()}"
return width, height, preview
from wled_controller.utils.image_codec import (
encode_jpeg_data_uri,
load_image_bytes,
load_image_file,
thumbnail as make_thumbnail,
)
if isinstance(src, bytes):
image = load_image_bytes(src)
else:
image = load_image_file(src)
h, w = image.shape[:2]
thumb = make_thumbnail(image, 320)
preview = encode_jpeg_data_uri(thumb, quality=80)
return w, h, preview
width, height, preview = await asyncio.to_thread(_process_image, img_bytes)
@@ -161,11 +162,12 @@ async def get_full_image(
img_bytes = path
def _encode_full(src):
pil_image = Image.open(io.BytesIO(src) if isinstance(src, bytes) else src)
pil_image = pil_image.convert("RGB")
buf = io.BytesIO()
pil_image.save(buf, format="JPEG", quality=90)
return buf.getvalue()
from wled_controller.utils.image_codec import encode_jpeg, load_image_bytes, load_image_file
if isinstance(src, bytes):
image = load_image_bytes(src)
else:
image = load_image_file(src)
return encode_jpeg(image, quality=90)
jpeg_bytes = await asyncio.to_thread(_encode_full, img_bytes)
return Response(content=jpeg_bytes, media_type="image/jpeg")
@@ -333,13 +335,9 @@ async def get_video_thumbnail(
store: PictureSourceStore = Depends(get_picture_source_store),
):
"""Get a thumbnail for a video picture source (first frame)."""
import base64
from io import BytesIO
from PIL import Image
from wled_controller.core.processing.video_stream import extract_thumbnail
from wled_controller.storage.picture_source import VideoCaptureSource
from wled_controller.utils.image_codec import encode_jpeg_data_uri, resize_down
try:
source = store.get_stream(stream_id)
@@ -352,18 +350,12 @@ async def get_video_thumbnail(
if frame is None:
raise HTTPException(status_code=404, detail="Could not extract thumbnail")
# Encode as JPEG
pil_img = Image.fromarray(frame)
# Resize to max 320px wide for thumbnail
if pil_img.width > 320:
ratio = 320 / pil_img.width
pil_img = pil_img.resize((320, int(pil_img.height * ratio)), Image.LANCZOS)
frame = resize_down(frame, 320)
h, w = frame.shape[:2]
data_uri = encode_jpeg_data_uri(frame, quality=80)
buf = BytesIO()
pil_img.save(buf, format="JPEG", quality=80)
b64 = base64.b64encode(buf.getvalue()).decode()
return {"thumbnail": f"data:image/jpeg;base64,{b64}", "width": pil_img.width, "height": pil_img.height}
return {"thumbnail": data_uri, "width": w, "height": h}
except HTTPException:
raise
@@ -408,16 +400,18 @@ async def test_picture_source(
source = raw_stream.image_source
start_time = time.perf_counter()
from wled_controller.utils.image_codec import load_image_bytes, load_image_file
if source.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(source)
resp.raise_for_status()
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
image = load_image_bytes(resp.content)
else:
path = Path(source)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
pil_image = await asyncio.to_thread(lambda: Image.open(path).convert("RGB"))
image = await asyncio.to_thread(load_image_file, path)
actual_duration = time.perf_counter() - start_time
frame_count = 1
@@ -479,12 +473,13 @@ async def test_picture_source(
if last_frame is None:
raise RuntimeError("No frames captured during test")
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
if not isinstance(last_frame.image, np.ndarray):
raise ValueError("Unexpected image format from engine")
image = last_frame.image
# Create thumbnail + encode (CPU-bound — run in thread)
from wled_controller.utils.image_codec import encode_jpeg_data_uri, thumbnail as make_thumbnail
pp_template_ids = chain["postprocessing_template_ids"]
flat_filters = None
if pp_template_ids:
@@ -494,45 +489,33 @@ async def test_picture_source(
except ValueError:
logger.warning(f"PP template {pp_template_ids[0]} not found, skipping postprocessing preview")
def _create_thumbnails_and_encode(pil_img, filters):
thumbnail_w = 640
aspect_ratio = pil_img.height / pil_img.width
thumbnail_h = int(thumbnail_w * aspect_ratio)
thumb = pil_img.copy()
thumb.thumbnail((thumbnail_w, thumbnail_h), Image.Resampling.LANCZOS)
def _create_thumbnails_and_encode(img, filters):
thumb = make_thumbnail(img, 640)
if filters:
pool = ImagePool()
def apply_filters(img):
arr = np.array(img)
def apply_filters(arr):
for fi in filters:
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
result = f.process_image(arr, pool)
if result is not None:
arr = result
return Image.fromarray(arr)
return arr
thumb = apply_filters(thumb)
pil_img = apply_filters(pil_img)
img = apply_filters(img)
img_buffer = io.BytesIO()
thumb.save(img_buffer, format='JPEG', quality=85)
thumb_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
thumb_uri = encode_jpeg_data_uri(thumb, quality=85)
full_uri = encode_jpeg_data_uri(img, quality=90)
th, tw = thumb.shape[:2]
return tw, th, thumb_uri, full_uri
full_buffer = io.BytesIO()
pil_img.save(full_buffer, format='JPEG', quality=90)
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
return thumbnail_w, thumbnail_h, thumb_b64, full_b64
thumbnail_width, thumbnail_height, thumbnail_b64, full_b64 = await asyncio.to_thread(
_create_thumbnails_and_encode, pil_image, flat_filters
thumbnail_width, thumbnail_height, thumbnail_data_uri, full_data_uri = await asyncio.to_thread(
_create_thumbnails_and_encode, image, flat_filters
)
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
width, height = pil_image.size
height, width = image.shape[:2]
return TemplateTestResponse(
full_capture=CaptureImage(
@@ -635,15 +618,11 @@ async def test_picture_source_ws(
def _encode_video_frame(image, pw):
"""Encode numpy RGB image as JPEG base64 data URI."""
from PIL import Image as PILImage
pil = PILImage.fromarray(image)
if pw and pil.width > pw:
ratio = pw / pil.width
pil = pil.resize((pw, int(pil.height * ratio)), PILImage.LANCZOS)
buf = io.BytesIO()
pil.save(buf, format="JPEG", quality=80)
b64 = base64.b64encode(buf.getvalue()).decode()
return f"data:image/jpeg;base64,{b64}", pil.width, pil.height
from wled_controller.utils.image_codec import encode_jpeg_data_uri, resize_down
if pw:
image = resize_down(image, pw)
h, w = image.shape[:2]
return encode_jpeg_data_uri(image, quality=80), w, h
try:
await asyncio.get_event_loop().run_in_executor(None, video_stream.start)

View File

@@ -1,12 +1,9 @@
"""Postprocessing template routes."""
import base64
import io
import time
import httpx
import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
from wled_controller.api.auth import AuthRequired
@@ -198,6 +195,13 @@ async def test_pp_template(
raw_stream = chain["raw_stream"]
from wled_controller.utils.image_codec import (
encode_jpeg_data_uri,
load_image_bytes,
load_image_file,
thumbnail as make_thumbnail,
)
if isinstance(raw_stream, StaticImagePictureSource):
# Static image: load directly
from pathlib import Path
@@ -209,12 +213,12 @@ async def test_pp_template(
async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
resp = await client.get(source)
resp.raise_for_status()
pil_image = Image.open(io.BytesIO(resp.content)).convert("RGB")
image = load_image_bytes(resp.content)
else:
path = Path(source)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Image file not found: {source}")
pil_image = Image.open(path).convert("RGB")
image = load_image_file(path)
actual_duration = time.perf_counter() - start_time
frame_count = 1
@@ -268,53 +272,37 @@ async def test_pp_template(
if last_frame is None:
raise RuntimeError("No frames captured during test")
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
if not isinstance(last_frame.image, np.ndarray):
raise ValueError("Unexpected image format from engine")
image = last_frame.image
# Create thumbnail
thumbnail_width = 640
aspect_ratio = pil_image.height / pil_image.width
thumbnail_height = int(thumbnail_width * aspect_ratio)
thumbnail = pil_image.copy()
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
thumb = make_thumbnail(image, 640)
# Apply postprocessing filters (expand filter_template references)
flat_filters = pp_store.resolve_filter_instances(pp_template.filters)
if flat_filters:
pool = ImagePool()
def apply_filters(img):
arr = np.array(img)
def apply_filters(arr):
for fi in flat_filters:
f = FilterRegistry.create_instance(fi.filter_id, fi.options)
result = f.process_image(arr, pool)
if result is not None:
arr = result
return Image.fromarray(arr)
return arr
thumbnail = apply_filters(thumbnail)
pil_image = apply_filters(pil_image)
thumb = apply_filters(thumb)
image = apply_filters(image)
# Encode thumbnail
img_buffer = io.BytesIO()
thumbnail.save(img_buffer, format='JPEG', quality=85)
img_buffer.seek(0)
thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
# Encode full-resolution image
full_buffer = io.BytesIO()
pil_image.save(full_buffer, format='JPEG', quality=90)
full_buffer.seek(0)
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
# Encode as JPEG
thumbnail_data_uri = encode_jpeg_data_uri(thumb, quality=85)
full_data_uri = encode_jpeg_data_uri(image, quality=90)
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
width, height = pil_image.size
thumb_w, thumb_h = thumbnail.size
height, width = image.shape[:2]
thumb_h, thumb_w = thumb.shape[:2]
return TemplateTestResponse(
full_capture=CaptureImage(

View File

@@ -1,11 +1,8 @@
"""Capture template, engine, and filter routes."""
import base64
import io
import time
import numpy as np
from PIL import Image
from fastapi import APIRouter, HTTPException, Depends, Query, WebSocket, WebSocketDisconnect
from wled_controller.api.auth import AuthRequired
@@ -320,38 +317,28 @@ def test_template(
if last_frame is None:
raise RuntimeError("No frames captured during test")
# Convert numpy array to PIL Image
if isinstance(last_frame.image, np.ndarray):
pil_image = Image.fromarray(last_frame.image)
else:
if not isinstance(last_frame.image, np.ndarray):
raise ValueError("Unexpected image format from engine")
image = last_frame.image
from wled_controller.utils.image_codec import (
encode_jpeg_data_uri,
thumbnail as make_thumbnail,
)
# Create thumbnail (640px wide, maintain aspect ratio)
thumbnail_width = 640
aspect_ratio = pil_image.height / pil_image.width
thumbnail_height = int(thumbnail_width * aspect_ratio)
thumbnail = pil_image.copy()
thumbnail.thumbnail((thumbnail_width, thumbnail_height), Image.Resampling.LANCZOS)
thumb = make_thumbnail(image, 640)
thumb_h, thumb_w = thumb.shape[:2]
# Encode thumbnail as JPEG
img_buffer = io.BytesIO()
thumbnail.save(img_buffer, format='JPEG', quality=85)
img_buffer.seek(0)
thumbnail_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
thumbnail_data_uri = f"data:image/jpeg;base64,{thumbnail_b64}"
# Encode full-resolution image as JPEG
full_buffer = io.BytesIO()
pil_image.save(full_buffer, format='JPEG', quality=90)
full_buffer.seek(0)
full_b64 = base64.b64encode(full_buffer.getvalue()).decode('utf-8')
full_data_uri = f"data:image/jpeg;base64,{full_b64}"
# Encode as JPEG
thumbnail_data_uri = encode_jpeg_data_uri(thumb, quality=85)
full_data_uri = encode_jpeg_data_uri(image, quality=90)
# Calculate metrics
actual_fps = frame_count / actual_duration if actual_duration > 0 else 0
avg_capture_time_ms = (total_capture_time / frame_count * 1000) if frame_count > 0 else 0
width, height = pil_image.size
height, width = image.shape[:2]
return TemplateTestResponse(
full_capture=CaptureImage(
@@ -359,8 +346,8 @@ def test_template(
full_image=full_data_uri,
width=width,
height=height,
thumbnail_width=thumbnail_width,
thumbnail_height=thumbnail_height,
thumbnail_width=thumb_w,
thumbnail_height=thumb_h,
),
border_extraction=None,
performance=PerformanceMetrics(

View File

@@ -12,7 +12,6 @@ Prerequisites (system binaries, NOT Python packages):
- adb (bundled with scrcpy, or Android SDK Platform-Tools)
"""
import io
import os
import re
import shutil
@@ -22,7 +21,8 @@ import time
from typing import Any, Dict, List, Optional
import numpy as np
from PIL import Image
from wled_controller.utils.image_codec import load_image_bytes
from wled_controller.core.capture_engines.base import (
CaptureEngine,
@@ -144,8 +144,7 @@ def _screencap_once(adb: str, serial: str) -> Optional[np.ndarray]:
if result.returncode != 0 or len(result.stdout) < 100:
return None
img = Image.open(io.BytesIO(result.stdout))
return np.asarray(img.convert("RGB"))
return load_image_bytes(result.stdout)
except Exception as e:
logger.debug(f"screencap failed for {serial}: {e}")
return None

View File

@@ -2,8 +2,8 @@
from typing import List, Optional
import cv2
import numpy as np
from PIL import Image
from wled_controller.core.filters.base import FilterOptionDef, PostprocessingFilter
from wled_controller.core.filters.image_pool import ImagePool
@@ -44,8 +44,7 @@ class DownscalerFilter(PostprocessingFilter):
if new_h == h and new_w == w:
return None
pil_img = Image.fromarray(image)
downscaled = np.array(pil_img.resize((new_w, new_h), Image.LANCZOS))
downscaled = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
result = image_pool.acquire(new_h, new_w, image.shape[2] if image.ndim == 3 else 3)
np.copyto(result, downscaled)

View File

@@ -2,8 +2,8 @@
from typing import List, Optional
import cv2
import numpy as np
from PIL import Image
from wled_controller.core.filters.base import FilterOptionDef, PostprocessingFilter
from wled_controller.core.filters.image_pool import ImagePool
@@ -42,9 +42,8 @@ class PixelateFilter(PostprocessingFilter):
# vectorized C++ instead of per-block Python loop
small_w = max(1, w // block_size)
small_h = max(1, h // block_size)
pil_img = Image.fromarray(image)
small = pil_img.resize((small_w, small_h), Image.LANCZOS)
pixelated = np.array(small.resize((w, h), Image.NEAREST))
small = cv2.resize(image, (small_w, small_h), interpolation=cv2.INTER_AREA)
pixelated = cv2.resize(small, (w, h), interpolation=cv2.INTER_NEAREST)
np.copyto(image, pixelated)
return None

View File

@@ -9,8 +9,8 @@ import time
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple
import cv2
import numpy as np
from PIL import Image
from wled_controller.core.processing.live_stream import LiveStream
from wled_controller.core.capture.screen_capture import (
@@ -46,8 +46,7 @@ def _process_kc_frame(capture, rect_names, rect_bounds, calc_fn, prev_colors_arr
t0 = time.perf_counter()
# Downsample to working resolution — 144x fewer pixels at 1080p
pil_img = Image.fromarray(capture.image)
small = np.array(pil_img.resize(KC_WORK_SIZE, Image.LANCZOS))
small = cv2.resize(capture.image, KC_WORK_SIZE, interpolation=cv2.INTER_AREA)
# Extract colors for each rectangle from the small image
n = len(rect_names)

View File

@@ -311,20 +311,16 @@ class LiveStreamManager:
This is acceptable because acquire() (the only caller chain) is always
invoked from background worker threads, never from the async event loop.
"""
from io import BytesIO
from pathlib import Path
from PIL import Image
from wled_controller.utils.image_codec import load_image_bytes, load_image_file
if image_source.startswith(("http://", "https://")):
response = httpx.get(image_source, timeout=15.0, follow_redirects=True)
response.raise_for_status()
pil_image = Image.open(BytesIO(response.content))
return load_image_bytes(response.content)
else:
path = Path(image_source)
if not path.exists():
raise FileNotFoundError(f"Image file not found: {image_source}")
pil_image = Image.open(path)
pil_image = pil_image.convert("RGB")
return np.array(pil_image)
return load_image_file(path)

View File

@@ -7,17 +7,16 @@ from pathlib import Path
from tkinter import messagebox
from typing import Callable
from PIL import Image
try:
import pystray
from PIL import Image
PYSTRAY_AVAILABLE = True
except ImportError:
PYSTRAY_AVAILABLE = False
def _load_icon(icon_path: Path) -> Image.Image:
def _load_icon(icon_path: Path) -> "Image.Image":
"""Load tray icon from PNG, with a solid-color fallback."""
if icon_path.exists():
return Image.open(icon_path)

View File

@@ -0,0 +1,91 @@
"""Image encoding/decoding/resizing utilities using OpenCV.
Replaces PIL/Pillow for JPEG encoding, image loading, and resizing operations.
All functions work with numpy RGB arrays (H, W, 3) uint8.
"""
import base64
from pathlib import Path
from typing import Tuple, Union
import cv2
import numpy as np
def encode_jpeg(image: np.ndarray, quality: int = 85) -> bytes:
    """Compress an RGB uint8 array to JPEG and return the raw bytes.

    Args:
        image: (H, W, 3) uint8 array in RGB channel order.
        quality: JPEG quality, 0-100 (higher = larger / better).

    Raises:
        RuntimeError: if OpenCV fails to encode the frame.
    """
    # OpenCV works in BGR order, so swap channels before encoding.
    params = [cv2.IMWRITE_JPEG_QUALITY, quality]
    success, encoded = cv2.imencode(".jpg", cv2.cvtColor(image, cv2.COLOR_RGB2BGR), params)
    if not success:
        raise RuntimeError("JPEG encoding failed")
    return encoded.tobytes()
def encode_jpeg_data_uri(image: np.ndarray, quality: int = 85) -> str:
    """Encode an RGB numpy array as a base64 ``data:image/jpeg`` URI."""
    payload = base64.b64encode(encode_jpeg(image, quality)).decode("utf-8")
    return f"data:image/jpeg;base64,{payload}"
def resize_image(image: np.ndarray, width: int, height: int) -> np.ndarray:
    """Resize *image* to exactly (width, height).

    The interpolation mode is picked from the pixel-count ratio:
    INTER_AREA when the target area is smaller (better quality and
    faster for downscaling), INTER_LANCZOS4 otherwise (upscaling).
    """
    src_h, src_w = image.shape[:2]
    if width * height < src_w * src_h:
        interp = cv2.INTER_AREA
    else:
        interp = cv2.INTER_LANCZOS4
    return cv2.resize(image, (width, height), interpolation=interp)
def thumbnail(image: np.ndarray, max_width: int) -> np.ndarray:
    """Return a copy of *image* scaled to fit within *max_width* pixels wide.

    Aspect ratio is preserved; INTER_AREA is used (optimal for downscale).
    Images already at or below max_width come back as an independent copy.
    """
    h, w = image.shape[:2]
    if w <= max_width:
        # Nothing to shrink — hand back a copy so callers may mutate freely.
        return image.copy()
    scale = max_width / w
    target_h = max(1, int(h * scale))
    return cv2.resize(image, (max_width, target_h), interpolation=cv2.INTER_AREA)
def resize_down(image: np.ndarray, max_width: int) -> np.ndarray:
    """Shrink *image* to *max_width* if it is wider; otherwise return it as-is.

    Unlike ``thumbnail`` this never copies when no resize is needed.
    Uses INTER_AREA (optimal for downscaling).
    """
    h, w = image.shape[:2]
    if w > max_width:
        scale = max_width / w
        target_h = max(1, int(h * scale))
        return cv2.resize(image, (max_width, target_h), interpolation=cv2.INTER_AREA)
    return image
def load_image_file(path: Union[str, Path]) -> np.ndarray:
    """Read an image from disk and return it as an RGB (H, W, 3) uint8 array.

    Raises:
        FileNotFoundError: when the file is missing or cv2 cannot read it.
    """
    path = str(path)
    bgr_frame = cv2.imread(path, cv2.IMREAD_COLOR)
    if bgr_frame is None:
        # cv2.imread signals failure with None rather than raising.
        raise FileNotFoundError(f"Cannot load image: {path}")
    return cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
def load_image_bytes(data: bytes) -> np.ndarray:
    """Decode in-memory encoded image bytes (JPEG, PNG, ...) to an RGB array.

    Raises:
        ValueError: when the bytes are not a decodable image.
    """
    raw = np.frombuffer(data, dtype=np.uint8)
    bgr_frame = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if bgr_frame is None:
        # cv2.imdecode signals failure with None rather than raising.
        raise ValueError("Cannot decode image data")
    return cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
def image_size(image: np.ndarray) -> Tuple[int, int]:
    """Return the (width, height) of an (H, W[, C]) image array."""
    h, w = image.shape[:2]
    return w, h