Add pluggable postprocessing filter system with collapsible UI

Replace hardcoded gamma/saturation/brightness fields with a flexible
filter pipeline architecture. Templates now contain an ordered list of
filter instances, each with its own options schema. Filters operate on
full images before border extraction.

- Add filter framework: base class, registry, image pool, filter instance
- Implement 6 built-in filters: brightness, saturation, gamma, downscaler, pixelate, auto crop
- Move smoothing from PP templates to device stream settings (temporal, not spatial)
- Add GET /api/v1/filters endpoint for available filter types
- Dynamic filter UI in template modal with add/remove/reorder/collapse
- Replace camera icon with display icon for screen capture streams
- Legacy migration: existing templates auto-convert flat fields to filter list

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-11 11:57:19 +03:00
parent e8cbc73161
commit ebd6cc7d7d
16 changed files with 1115 additions and 192 deletions

View File

@@ -0,0 +1,21 @@
"""Postprocessing filter system.
Provides a pluggable filter architecture for image postprocessing.
Import this package to ensure all built-in filters are registered.
"""
from wled_controller.core.filters.base import FilterOptionDef, PostprocessingFilter
from wled_controller.core.filters.filter_instance import FilterInstance
from wled_controller.core.filters.image_pool import ImagePool
from wled_controller.core.filters.registry import FilterRegistry
# Import builtin filters to trigger auto-registration
import wled_controller.core.filters.builtin # noqa: F401
__all__ = [
"FilterOptionDef",
"FilterInstance",
"FilterRegistry",
"ImagePool",
"PostprocessingFilter",
]

View File

@@ -0,0 +1,91 @@
"""Base classes for the postprocessing filter system."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import numpy as np
@dataclass
class FilterOptionDef:
    """Describes a single configurable option for a filter.

    Carries everything the UI needs to render a control: a stable key,
    a human-readable label, the value type, a default, an optional
    numeric range, and a slider step.
    """

    key: str               # stable identifier used in filter options dicts
    label: str             # human-readable name shown in the UI
    option_type: str       # "float" | "int"
    default: Any           # value used when the option is absent or invalid
    min_value: Any = None  # inclusive lower bound; None = unbounded
    max_value: Any = None  # inclusive upper bound; None = unbounded
    step: Any = None       # UI slider step; None = free-form input

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict (``option_type`` is exposed as ``type``)."""
        return {
            "key": self.key,
            "label": self.label,
            "type": self.option_type,
            "default": self.default,
            "min_value": self.min_value,
            "max_value": self.max_value,
            "step": self.step,
        }
class PostprocessingFilter(ABC):
    """Base class for all postprocessing filters.

    Each filter operates on a full image (np.ndarray H×W×3 uint8).
    Filters that preserve dimensions modify in-place and return None.
    Filters that change dimensions return a new array from the image pool.
    """

    # Subclasses must override both; the registry rejects an empty filter_id.
    filter_id: str = ""
    filter_name: str = ""

    def __init__(self, options: Dict[str, Any]):
        """Initialize filter with options validated/clamped against the schema."""
        self.options = self.validate_options(options)

    @classmethod
    @abstractmethod
    def get_options_schema(cls) -> List[FilterOptionDef]:
        """Return the list of configurable options for this filter type."""
        ...

    @abstractmethod
    def process_image(self, image: np.ndarray, image_pool: "ImagePool") -> Optional[np.ndarray]:
        """Process image.
        Args:
            image: Input image as np.ndarray (H, W, 3) dtype=uint8.
            image_pool: Shared pool for acquiring new arrays when dimensions change.
        Returns:
            None if modified in-place (same dimensions).
            New np.ndarray from image_pool if dimensions changed.
        """
        ...

    @classmethod
    def validate_options(cls, options: dict) -> dict:
        """Validate and clamp options against the schema. Returns cleaned dict.

        Unknown keys are dropped and missing keys fall back to the schema
        default. Values that cannot be coerced to the declared type (e.g.
        None or a non-numeric string from a hand-edited template) also fall
        back to the default instead of raising, so one bad option cannot
        take down the whole processing pipeline.
        """
        schema = cls.get_options_schema()
        cleaned = {}
        for opt_def in schema:
            raw = options.get(opt_def.key, opt_def.default)
            try:
                if opt_def.option_type == "float":
                    val = float(raw)
                elif opt_def.option_type == "int":
                    val = int(raw)
                else:
                    # Unknown option_type: pass the raw value through unchanged.
                    val = raw
            except (TypeError, ValueError):
                # Non-coercible user value: fall back to the safe default.
                val = opt_def.default
            # Clamp to the declared range (a None bound means unbounded).
            if opt_def.min_value is not None and val < opt_def.min_value:
                val = opt_def.min_value
            if opt_def.max_value is not None and val > opt_def.max_value:
                val = opt_def.max_value
            cleaned[opt_def.key] = val
        return cleaned

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.options})"

View File

@@ -0,0 +1,282 @@
"""Built-in postprocessing filters."""
from typing import Any, Dict, List, Optional
import numpy as np
from wled_controller.core.filters.base import FilterOptionDef, PostprocessingFilter
from wled_controller.core.filters.image_pool import ImagePool
from wled_controller.core.filters.registry import FilterRegistry
@FilterRegistry.register
class BrightnessFilter(PostprocessingFilter):
    """Multiplies every pixel by a gain factor to brighten or darken the image."""

    filter_id = "brightness"
    filter_name = "Brightness"

    @classmethod
    def get_options_schema(cls) -> List[FilterOptionDef]:
        gain_option = FilterOptionDef(
            key="value",
            label="Brightness",
            option_type="float",
            default=1.0,
            min_value=0.0,
            max_value=2.0,
            step=0.05,
        )
        return [gain_option]

    def process_image(self, image: np.ndarray, image_pool: ImagePool) -> Optional[np.ndarray]:
        gain = self.options["value"]
        # Identity gain: nothing to do.
        if gain == 1.0:
            return None
        # Scale in float space, clamp to the uint8 range, then write back.
        scaled = image.astype(np.float32)
        scaled *= gain
        np.clip(scaled, 0, 255, out=scaled)
        np.copyto(image, scaled.astype(np.uint8))
        return None
@FilterRegistry.register
class SaturationFilter(PostprocessingFilter):
    """Adjusts color saturation by blending each pixel with its own luminance."""

    filter_id = "saturation"
    filter_name = "Saturation"

    @classmethod
    def get_options_schema(cls) -> List[FilterOptionDef]:
        strength_option = FilterOptionDef(
            key="value",
            label="Saturation",
            option_type="float",
            default=1.0,
            min_value=0.0,
            max_value=2.0,
            step=0.1,
        )
        return [strength_option]

    def process_image(self, image: np.ndarray, image_pool: ImagePool) -> Optional[np.ndarray]:
        strength = self.options["value"]
        # Neutral setting: leave the image untouched.
        if strength == 1.0:
            return None
        normalized = image.astype(np.float32) / 255.0
        # Rec. 601 luma weights; trailing newaxis broadcasts per pixel.
        luma = np.dot(normalized[..., :3], [0.299, 0.587, 0.114])[..., np.newaxis]
        # strength > 1 pushes colors away from gray; < 1 pulls them toward it.
        normalized[..., :3] = luma + (normalized[..., :3] - luma) * strength
        np.clip(normalized * 255.0, 0, 255, out=normalized)
        np.copyto(image, normalized.astype(np.uint8))
        return None
@FilterRegistry.register
class GammaFilter(PostprocessingFilter):
    """Applies gamma correction (raises normalized pixels to the power 1/gamma)."""

    filter_id = "gamma"
    filter_name = "Gamma"

    @classmethod
    def get_options_schema(cls) -> List[FilterOptionDef]:
        gamma_option = FilterOptionDef(
            key="value",
            label="Gamma",
            option_type="float",
            default=2.2,
            min_value=0.1,
            max_value=5.0,
            step=0.1,
        )
        return [gamma_option]

    def process_image(self, image: np.ndarray, image_pool: ImagePool) -> Optional[np.ndarray]:
        gamma = self.options["value"]
        # gamma == 1.0 is the identity curve; skip the work entirely.
        if gamma == 1.0:
            return None
        normalized = image.astype(np.float32) / 255.0
        np.power(normalized, 1.0 / gamma, out=normalized)
        np.clip(normalized * 255.0, 0, 255, out=normalized)
        np.copyto(image, normalized.astype(np.uint8))
        return None
@FilterRegistry.register
class DownscalerFilter(PostprocessingFilter):
    """Downscales image by a factor. Returns a new image from the pool."""

    filter_id = "downscaler"
    filter_name = "Downscaler"

    @classmethod
    def get_options_schema(cls) -> List[FilterOptionDef]:
        factor_option = FilterOptionDef(
            key="factor",
            label="Scale Factor",
            option_type="float",
            default=0.5,
            min_value=0.1,
            max_value=1.0,
            step=0.05,
        )
        return [factor_option]

    def process_image(self, image: np.ndarray, image_pool: ImagePool) -> Optional[np.ndarray]:
        scale = self.options["factor"]
        # A factor at or above 1.0 means no downscaling.
        if scale >= 1.0:
            return None
        src_h, src_w = image.shape[:2]
        dst_h = max(1, int(src_h * scale))
        dst_w = max(1, int(src_w * scale))
        # Rounding may land back on the source size for factors near 1.
        if (dst_h, dst_w) == (src_h, src_w):
            return None
        # PIL's Lanczos resampling gives high-quality downscaling.
        from PIL import Image
        resized = Image.fromarray(image).resize((dst_w, dst_h), Image.Resampling.LANCZOS)
        channel_count = image.shape[2] if image.ndim == 3 else 3
        target = image_pool.acquire(dst_h, dst_w, channel_count)
        np.copyto(target, np.array(resized))
        return target
@FilterRegistry.register
class PixelateFilter(PostprocessingFilter):
    """Pixelates the image by averaging blocks of pixels.

    Block means are computed with two vectorized ``np.add.reduceat``
    passes and expanded back with ``np.repeat``. This replaces a Python
    double loop over blocks, which was far too slow for a per-frame hot
    path on large captures, while producing byte-identical results
    (exact float64 sums, same truncating uint8 cast).
    """

    filter_id = "pixelate"
    filter_name = "Pixelate"

    @classmethod
    def get_options_schema(cls) -> List[FilterOptionDef]:
        return [
            FilterOptionDef(
                key="block_size",
                label="Block Size",
                option_type="int",
                default=8,
                min_value=2,
                max_value=64,
                step=1,
            ),
        ]

    def process_image(self, image: np.ndarray, image_pool: ImagePool) -> Optional[np.ndarray]:
        block_size = self.options["block_size"]
        if block_size <= 1:
            return None
        h, w = image.shape[:2]
        if h == 0 or w == 0:
            return None
        # Block start offsets along each axis; edge blocks may be smaller.
        row_starts = np.arange(0, h, block_size)
        col_starts = np.arange(0, w, block_size)
        # Per-block pixel sums. float64 keeps sums of uint8 exact (< 2**53),
        # so sum/count below equals np.mean of each block exactly.
        sums = np.add.reduceat(
            np.add.reduceat(image.astype(np.float64), row_starts, axis=0),
            col_starts,
            axis=1,
        )
        # Per-block pixel counts, accounting for partial edge blocks.
        row_counts = np.diff(np.append(row_starts, h))
        col_counts = np.diff(np.append(col_starts, w))
        counts = np.outer(row_counts, col_counts)
        if image.ndim == 3:
            counts = counts[..., np.newaxis]
        means = (sums / counts).astype(np.uint8)  # truncating cast, as before
        # Expand each block mean back over its original block footprint.
        filled = np.repeat(np.repeat(means, row_counts, axis=0), col_counts, axis=1)
        np.copyto(image, filled)
        return None
@FilterRegistry.register
class AutoCropFilter(PostprocessingFilter):
    """Detects and crops black bars (letterboxing/pillarboxing) from the image."""
    filter_id = "auto_crop"
    filter_name = "Auto Crop"
    @classmethod
    def get_options_schema(cls) -> List[FilterOptionDef]:
        """Options: brightness threshold for "black" and minimum bar width to act on."""
        return [
            FilterOptionDef(
                key="threshold",
                label="Black Threshold",
                option_type="int",
                default=15,
                min_value=0,
                max_value=50,
                step=1,
            ),
            FilterOptionDef(
                key="min_bar_size",
                label="Min Bar Size (px)",
                option_type="int",
                default=20,
                min_value=0,
                max_value=200,
                step=5,
            ),
        ]
    def process_image(self, image: np.ndarray, image_pool: ImagePool) -> Optional[np.ndarray]:
        """Crop uniform dark borders from the image.

        A row/column is considered part of a black bar when its brightest
        channel value is <= threshold. Bars narrower than min_bar_size are
        ignored (kept as content), and if a crop would leave less than ~10%
        of either dimension the crop is abandoned entirely.

        Returns:
            None when no crop applies; otherwise a pool-backed copy of the
            cropped region (dimensions changed, per the filter contract).
        """
        threshold = self.options.get("threshold", 15)
        min_bar_size = self.options.get("min_bar_size", 20)
        h, w = image.shape[:2]
        # Minimum surviving content size: 10% of each dimension (at least 1px).
        min_h = max(1, h // 10)
        min_w = max(1, w // 10)
        # Compute max channel value per row and per column (vectorized)
        row_max = image.max(axis=(1, 2))  # shape (h,)
        col_max = image.max(axis=(0, 2))  # shape (w,)
        # Scan from top for the first non-black row.
        top = 0
        while top < h and row_max[top] <= threshold:
            top += 1
        # Scan from bottom (bottom is exclusive; stops at `top` if fully black).
        bottom = h
        while bottom > top and row_max[bottom - 1] <= threshold:
            bottom -= 1
        # Scan from left for the first non-black column.
        left = 0
        while left < w and col_max[left] <= threshold:
            left += 1
        # Scan from right (right is exclusive; stops at `left` if fully black).
        right = w
        while right > left and col_max[right - 1] <= threshold:
            right -= 1
        # Apply min_bar_size: only crop if the detected bar is large enough.
        # This runs before the content-size safety check below on purpose:
        # small bars are reset first, then the remaining crop is validated.
        if top < min_bar_size:
            top = 0
        if (h - bottom) < min_bar_size:
            bottom = h
        if left < min_bar_size:
            left = 0
        if (w - right) < min_bar_size:
            right = w
        # Safety: don't crop if remaining content is too small
        # (also handles a fully-black frame, where bottom - top == 0).
        if (bottom - top) < min_h:
            top, bottom = 0, h
        if (right - left) < min_w:
            left, right = 0, w
        # No crop needed
        if top == 0 and bottom == h and left == 0 and right == w:
            return None
        cropped_h = bottom - top
        cropped_w = right - left
        channels = image.shape[2] if image.ndim == 3 else 3
        result = image_pool.acquire(cropped_h, cropped_w, channels)
        np.copyto(result, image[top:bottom, left:right])
        return result

View File

@@ -0,0 +1,25 @@
"""FilterInstance data model for serializable filter configurations."""
from dataclasses import dataclass, field
from typing import Any, Dict
@dataclass
class FilterInstance:
    """A configured instance of a filter within a postprocessing template."""

    filter_id: str
    options: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Serialize to a plain dict; the options mapping is copied defensively."""
        serialized = {
            "filter_id": self.filter_id,
            "options": dict(self.options),
        }
        return serialized

    @classmethod
    def from_dict(cls, data: dict) -> "FilterInstance":
        """Build an instance from a dict; a missing ``options`` key defaults to empty."""
        opts = data.get("options", {})
        return cls(filter_id=data["filter_id"], options=opts)

View File

@@ -0,0 +1,41 @@
"""Reusable numpy array pool to minimize allocation churn during fast image processing."""
from collections import defaultdict
from typing import Dict, List, Tuple
import numpy as np
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class ImagePool:
    """Pool of pre-allocated numpy arrays keyed by shape.

    When a filter needs a new array (e.g. for downscaling), it acquires one
    from the pool instead of allocating. After use, arrays are released back.
    Only uint8 H×W×C arrays are pooled: anything else is silently dropped on
    release, so acquire() can always guarantee the dtype it hands out.
    """

    def __init__(self, max_per_shape: int = 4):
        """Create an empty pool that retains at most max_per_shape arrays per shape."""
        # shape -> stack of reusable arrays (LIFO so recently-used buffers stay warm)
        self._pool: Dict[Tuple[int, ...], List[np.ndarray]] = defaultdict(list)
        self._max_per_shape = max_per_shape

    def acquire(self, height: int, width: int, channels: int = 3) -> np.ndarray:
        """Get a pre-allocated uint8 array of shape (height, width, channels).

        The array's contents are undefined; the caller must overwrite it fully.
        """
        shape = (height, width, channels)
        bucket = self._pool[shape]
        if bucket:
            return bucket.pop()
        return np.empty(shape, dtype=np.uint8)

    def release(self, array: np.ndarray) -> None:
        """Return an array to the pool for reuse.

        Arrays that acquire() could never have produced (wrong dtype or rank)
        are discarded: pooling them would either leak unused buckets or hand
        a wrong-dtype buffer back to a later acquire() of the same shape.
        """
        if array.dtype != np.uint8 or array.ndim != 3:
            return
        bucket = self._pool[array.shape]
        if len(bucket) < self._max_per_shape:
            bucket.append(array)

    def clear(self) -> None:
        """Release all pooled arrays."""
        self._pool.clear()

View File

@@ -0,0 +1,53 @@
"""Filter registry for discovering and instantiating postprocessing filters."""
from typing import Dict, List, Type
from wled_controller.core.filters.base import PostprocessingFilter
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class FilterRegistry:
    """Singleton registry of all available postprocessing filter types."""

    # Class-level store shared by all call sites (module-wide singleton).
    _filters: Dict[str, Type[PostprocessingFilter]] = {}

    @classmethod
    def register(cls, filter_cls: Type[PostprocessingFilter]) -> Type[PostprocessingFilter]:
        """Register a filter class. Can be used as a decorator."""
        fid = filter_cls.filter_id
        if not fid:
            raise ValueError(f"Filter class {filter_cls.__name__} must define filter_id")
        if fid in cls._filters:
            logger.warning(f"Overwriting filter registration for '{fid}'")
        cls._filters[fid] = filter_cls
        logger.debug(f"Registered filter: {fid} ({filter_cls.__name__})")
        return filter_cls

    @classmethod
    def get(cls, filter_id: str) -> Type[PostprocessingFilter]:
        """Get a filter class by ID.
        Raises:
            ValueError: If filter_id is not registered.
        """
        try:
            return cls._filters[filter_id]
        except KeyError:
            raise ValueError(f"Unknown filter type: '{filter_id}'") from None

    @classmethod
    def get_all(cls) -> Dict[str, Type[PostprocessingFilter]]:
        """Return a shallow copy of the id -> class mapping."""
        return dict(cls._filters)

    @classmethod
    def create_instance(cls, filter_id: str, options: dict) -> PostprocessingFilter:
        """Create a filter instance from a filter_id and options dict."""
        filter_type = cls.get(filter_id)
        return filter_type(options)

    @classmethod
    def is_registered(cls, filter_id: str) -> bool:
        """Check if a filter ID is registered."""
        return filter_id in cls._filters

View File

@@ -14,7 +14,8 @@ from wled_controller.core.calibration import (
create_default_calibration,
)
from wled_controller.core.capture_engines import CaptureEngine, EngineRegistry
from wled_controller.core.pixel_processor import apply_color_correction, smooth_colors
from wled_controller.core.filters import FilterInstance, FilterRegistry, ImagePool, PostprocessingFilter
from wled_controller.core.pixel_processor import smooth_colors
from wled_controller.core.screen_capture import extract_border_pixels
from wled_controller.core.wled_client import WLEDClient
from wled_controller.utils import get_logger
@@ -106,10 +107,9 @@ class ProcessorState:
resolved_target_fps: Optional[int] = None
resolved_engine_type: Optional[str] = None
resolved_engine_config: Optional[dict] = None
resolved_gamma: Optional[float] = None
resolved_saturation: Optional[float] = None
resolved_brightness: Optional[float] = None
resolved_smoothing: Optional[float] = None
resolved_filters: Optional[List[FilterInstance]] = None
image_pool: Optional[ImagePool] = None
filter_instances: Optional[List[PostprocessingFilter]] = None
class ProcessorManager:
@@ -298,10 +298,7 @@ class ProcessorManager:
if pp_template_ids and self._pp_template_store:
try:
pp = self._pp_template_store.get_template(pp_template_ids[0])
state.resolved_gamma = pp.gamma
state.resolved_saturation = pp.saturation
state.resolved_brightness = pp.brightness
state.resolved_smoothing = pp.smoothing
state.resolved_filters = pp.filters
except ValueError:
logger.warning(f"PP template {pp_template_ids[0]} not found, using defaults")
@@ -314,13 +311,17 @@ class ProcessorManager:
except ValueError as e:
logger.warning(f"Failed to resolve stream {state.picture_stream_id}: {e}, falling back to legacy settings")
# Fallback: use legacy device settings
# Fallback: use legacy device settings (construct filters from flat fields)
state.resolved_display_index = state.settings.display_index
state.resolved_target_fps = state.settings.fps
state.resolved_gamma = state.settings.gamma
state.resolved_saturation = state.settings.saturation
state.resolved_brightness = state.settings.brightness
state.resolved_smoothing = state.settings.smoothing
legacy_filters = []
if state.settings.brightness != 1.0:
legacy_filters.append(FilterInstance("brightness", {"value": state.settings.brightness}))
if state.settings.saturation != 1.0:
legacy_filters.append(FilterInstance("saturation", {"value": state.settings.saturation}))
if state.settings.gamma != 1.0:
legacy_filters.append(FilterInstance("gamma", {"value": state.settings.gamma}))
state.resolved_filters = legacy_filters
# Resolve engine from legacy capture_template_id
if state.capture_template_id and self._capture_template_store:
@@ -457,23 +458,40 @@ class ProcessorManager:
# Use resolved values (populated by _resolve_stream_settings)
display_index = state.resolved_display_index or settings.display_index
target_fps = state.resolved_target_fps or settings.fps
gamma = state.resolved_gamma if state.resolved_gamma is not None else settings.gamma
saturation = state.resolved_saturation if state.resolved_saturation is not None else settings.saturation
pp_brightness = state.resolved_brightness if state.resolved_brightness is not None else settings.brightness
smoothing = state.resolved_smoothing if state.resolved_smoothing is not None else settings.smoothing
smoothing = settings.smoothing
# These always come from device settings (LED projection)
border_width = settings.border_width
wled_brightness = settings.brightness # WLED hardware brightness
# Instantiate filter objects once (not per-frame)
resolved_filters = state.resolved_filters or []
image_pool = ImagePool()
state.image_pool = image_pool
filter_objects = []
for fi in resolved_filters:
try:
filter_objects.append(FilterRegistry.create_instance(fi.filter_id, fi.options))
except ValueError as e:
logger.warning(f"Skipping unknown filter '{fi.filter_id}': {e}")
state.filter_instances = filter_objects
logger.info(
f"Processing loop started for {device_id} "
f"(display={display_index}, fps={target_fps})"
f"(display={display_index}, fps={target_fps}, filters={len(filter_objects)})"
)
frame_time = 1.0 / target_fps
fps_samples = []
def _apply_filters(image):
"""Apply all postprocessing filters to the captured image."""
for f in filter_objects:
result = f.process_image(image, image_pool)
if result is not None:
image = result
return image
try:
while state.is_running:
loop_start = time.time()
@@ -490,21 +508,16 @@ class ProcessorManager:
display_index
)
# Apply postprocessing filters to the full captured image
if filter_objects:
capture.image = await asyncio.to_thread(_apply_filters, capture.image)
# Extract border pixels
border_pixels = await asyncio.to_thread(extract_border_pixels, capture, border_width)
# Map to LED colors
led_colors = await asyncio.to_thread(state.pixel_mapper.map_border_to_leds, border_pixels)
# Apply color correction from postprocessing
led_colors = await asyncio.to_thread(
apply_color_correction,
led_colors,
gamma=gamma,
saturation=saturation,
brightness=pp_brightness,
)
# Apply smoothing from postprocessing
if state.previous_colors and smoothing > 0:
led_colors = await asyncio.to_thread(