Polymorphism Phase 2 + remove unused gamma/saturation fields

ProcessorManager: replace all isinstance checks with property-based
dispatch via base TargetProcessor (device_id, led_client,
get_display_index, update_device, update_calibration).

Remove gamma/saturation from ProcessingSettings, ColorCorrection
schema, serialization, and migration — these were never used in the
processing pipeline and are handled by postprocessing template filters.
Delete dead apply_color_correction() function.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-20 02:34:03 +03:00
parent 99f47fdbf9
commit c4e0257389
11 changed files with 57 additions and 135 deletions

View File

@@ -201,12 +201,11 @@ Get processing settings.
{
"display_index": 0,
"fps": 30,
"border_width": 10,
"color_correction": {
"gamma": 2.2,
"saturation": 1.0,
"brightness": 1.0
}
"brightness": 1.0,
"smoothing": 0.3,
"interpolation_mode": "average",
"standby_interval": 1.0,
"state_check_interval": 30
}
```
@@ -219,12 +218,7 @@ Update processing settings.
{
"display_index": 1,
"fps": 60,
"border_width": 15,
"color_correction": {
"gamma": 2.4,
"saturation": 1.2,
"brightness": 0.8
}
"brightness": 0.8
}
```

View File

@@ -63,7 +63,7 @@ router = APIRouter()
def _settings_to_core(schema: ProcessingSettingsSchema) -> ProcessingSettings:
"""Convert schema ProcessingSettings to core ProcessingSettings."""
settings = ProcessingSettings(
return ProcessingSettings(
display_index=schema.display_index,
fps=schema.fps,
interpolation_mode=schema.interpolation_mode,
@@ -72,17 +72,10 @@ def _settings_to_core(schema: ProcessingSettingsSchema) -> ProcessingSettings:
standby_interval=schema.standby_interval,
state_check_interval=schema.state_check_interval,
)
if schema.color_correction:
settings.gamma = schema.color_correction.gamma
settings.saturation = schema.color_correction.saturation
# color_correction.brightness maps to settings.brightness
settings.brightness = schema.color_correction.brightness
return settings
def _settings_to_schema(settings: ProcessingSettings) -> ProcessingSettingsSchema:
"""Convert core ProcessingSettings to schema ProcessingSettings."""
from wled_controller.api.schemas.picture_targets import ColorCorrection
return ProcessingSettingsSchema(
display_index=settings.display_index,
fps=settings.fps,
@@ -91,11 +84,6 @@ def _settings_to_schema(settings: ProcessingSettings) -> ProcessingSettingsSchem
smoothing=settings.smoothing,
standby_interval=settings.standby_interval,
state_check_interval=settings.state_check_interval,
color_correction=ColorCorrection(
gamma=settings.gamma,
saturation=settings.saturation,
brightness=settings.brightness,
),
)
@@ -432,23 +420,11 @@ async def update_target_settings(
fps=settings.fps if 'fps' in sent else existing.fps,
interpolation_mode=settings.interpolation_mode if 'interpolation_mode' in sent else existing.interpolation_mode,
brightness=settings.brightness if 'brightness' in sent else existing.brightness,
gamma=existing.gamma,
saturation=existing.saturation,
smoothing=settings.smoothing if 'smoothing' in sent else existing.smoothing,
standby_interval=settings.standby_interval if 'standby_interval' in sent else existing.standby_interval,
state_check_interval=settings.state_check_interval if 'state_check_interval' in sent else existing.state_check_interval,
)
# Apply color_correction fields if explicitly sent
if 'color_correction' in sent and settings.color_correction:
cc_sent = settings.color_correction.model_fields_set
if 'brightness' in cc_sent:
new_settings.brightness = settings.color_correction.brightness
if 'gamma' in cc_sent:
new_settings.gamma = settings.color_correction.gamma
if 'saturation' in cc_sent:
new_settings.saturation = settings.color_correction.saturation
# Update in store
target_store.update_target(target_id, settings=new_settings)

View File

@@ -24,7 +24,6 @@ from .devices import (
DeviceUpdate,
)
from .picture_targets import (
ColorCorrection,
PictureTargetCreate,
PictureTargetListResponse,
PictureTargetResponse,
@@ -90,7 +89,6 @@ __all__ = [
"DeviceResponse",
"DeviceStateResponse",
"DeviceUpdate",
"ColorCorrection",
"PictureTargetCreate",
"PictureTargetListResponse",
"PictureTargetResponse",

View File

@@ -8,14 +8,6 @@ from pydantic import BaseModel, Field
from wled_controller.core.processing.processing_settings import DEFAULT_STATE_CHECK_INTERVAL
class ColorCorrection(BaseModel):
"""Color correction settings."""
gamma: float = Field(default=2.2, description="Gamma correction", ge=0.1, le=5.0)
saturation: float = Field(default=1.0, description="Saturation multiplier", ge=0.0, le=2.0)
brightness: float = Field(default=1.0, description="Brightness multiplier", ge=0.0, le=1.0)
class ProcessingSettings(BaseModel):
"""Processing settings for a picture target."""
@@ -29,10 +21,6 @@ class ProcessingSettings(BaseModel):
default=DEFAULT_STATE_CHECK_INTERVAL, ge=5, le=600,
description="Seconds between WLED health checks"
)
color_correction: Optional[ColorCorrection] = Field(
default_factory=ColorCorrection,
description="Color correction settings"
)
class KeyColorRectangleSchema(BaseModel):

View File

@@ -8,56 +8,6 @@ from wled_controller.utils import get_logger
logger = get_logger(__name__)
def apply_color_correction(
colors: List[Tuple[int, int, int]],
gamma: float = 2.2,
saturation: float = 1.0,
brightness: float = 1.0,
) -> List[Tuple[int, int, int]]:
"""Apply color correction to LED colors.
Args:
colors: List of (R, G, B) tuples
gamma: Gamma correction factor (default 2.2)
saturation: Saturation multiplier (0.0-2.0)
brightness: Brightness multiplier (0.0-1.0)
Returns:
Corrected list of (R, G, B) tuples
"""
if not colors:
return colors
# Convert to numpy array for efficient processing
colors_array = np.array(colors, dtype=np.float32) / 255.0
# Apply brightness
if brightness != 1.0:
colors_array *= brightness
# Apply saturation
if saturation != 1.0:
# Convert RGB to HSV-like saturation adjustment
# Calculate luminance (grayscale)
luminance = np.dot(colors_array, [0.299, 0.587, 0.114])
luminance = luminance[:, np.newaxis] # Reshape for broadcasting
# Blend between grayscale and color based on saturation
colors_array = luminance + (colors_array - luminance) * saturation
# Apply gamma correction
if gamma != 1.0:
colors_array = np.power(colors_array, 1.0 / gamma)
# Clamp to valid range and convert back to integers
colors_array = np.clip(colors_array * 255.0, 0, 255).astype(np.uint8)
# Convert back to list of tuples
corrected_colors = [tuple(color) for color in colors_array]
return corrected_colors
def smooth_colors(
current_colors: List[Tuple[int, int, int]],
previous_colors: List[Tuple[int, int, int]],

View File

@@ -12,8 +12,6 @@ class ProcessingSettings:
display_index: int = 0
fps: int = 30
brightness: float = 1.0
gamma: float = 2.2
saturation: float = 1.0
smoothing: float = 0.3
interpolation_mode: str = "average"
standby_interval: float = 1.0 # seconds between keepalive sends when screen is static

View File

@@ -191,7 +191,7 @@ class ProcessorManager:
# Check if any processor is using this device
for proc in self._processors.values():
if isinstance(proc, WledTargetProcessor) and proc.device_id == device_id:
if proc.device_id == device_id:
raise RuntimeError(
f"Cannot remove device {device_id}: target {proc.target_id} is using it"
)
@@ -217,7 +217,7 @@ class ProcessorManager:
def update_calibration(self, device_id: str, calibration: CalibrationConfig):
"""Update calibration for a device.
Also propagates to any WledTargetProcessor using this device.
Also propagates to any target processor using this device.
"""
if device_id not in self._devices:
raise ValueError(f"Device {device_id} not found")
@@ -233,9 +233,9 @@ class ProcessorManager:
ds.calibration = calibration
# Propagate to active WLED processors
# Propagate to active processors using this device
for proc in self._processors.values():
if isinstance(proc, WledTargetProcessor) and proc.device_id == device_id:
if proc.device_id == device_id:
proc.update_calibration(calibration)
logger.info(f"Updated calibration for device {device_id}")
@@ -358,10 +358,8 @@ class ProcessorManager:
proc.update_source(picture_source_id)
def update_target_device(self, target_id: str, device_id: str):
"""Update the device for a WLED target."""
"""Update the device for a target."""
proc = self._get_processor(target_id)
if not isinstance(proc, WledTargetProcessor):
raise ValueError(f"Target {target_id} is not a WLED target")
if device_id not in self._devices:
raise ValueError(f"Device {device_id} not registered")
proc.update_device(device_id)
@@ -370,12 +368,11 @@ class ProcessorManager:
"""Start processing for a target (any type)."""
proc = self._get_processor(target_id)
# Enforce one-target-per-device for WLED targets
if isinstance(proc, WledTargetProcessor):
# Enforce one-target-per-device for device-aware targets
if proc.device_id is not None:
for other_id, other in self._processors.items():
if (
other_id != target_id
and isinstance(other, WledTargetProcessor)
and other.device_id == proc.device_id
and other.is_running
):
@@ -383,8 +380,7 @@ class ProcessorManager:
f"Device {proc.device_id} is already being processed by target {other_id}"
)
# Close cached idle client — processor creates its own connection
if isinstance(proc, WledTargetProcessor):
# Close cached idle client — processor creates its own connection
await self._close_idle_client(proc.device_id)
await proc.start()
@@ -400,7 +396,7 @@ class ProcessorManager:
await proc.stop()
# Auto-shutdown device if applicable
if isinstance(proc, WledTargetProcessor):
if proc.device_id is not None:
await self._restore_device_idle_state(proc.device_id)
def get_target_state(self, target_id: str) -> dict:
@@ -411,8 +407,8 @@ class ProcessorManager:
proc = self._get_processor(target_id)
state = proc.get_state()
# Merge device health for WLED targets
if isinstance(proc, WledTargetProcessor) and proc.device_id in self._devices:
# Merge device health for device-aware targets
if proc.device_id is not None and proc.device_id in self._devices:
h = self._devices[proc.device_id].health
state.update({
"device_online": h.online,
@@ -440,14 +436,14 @@ class ProcessorManager:
def is_device_processing(self, device_id: str) -> bool:
"""Check if any target is processing for a device."""
for proc in self._processors.values():
if isinstance(proc, WledTargetProcessor) and proc.device_id == device_id and proc.is_running:
if proc.device_id == device_id and proc.is_running:
return True
return False
def get_processing_target_for_device(self, device_id: str) -> Optional[str]:
"""Get the target_id that is currently processing for a device."""
for proc in self._processors.values():
if isinstance(proc, WledTargetProcessor) and proc.device_id == device_id and proc.is_running:
if proc.device_id == device_id and proc.is_running:
return proc.target_id
return None
@@ -468,7 +464,7 @@ class ProcessorManager:
await proc.start_overlay(target_name)
# Light up device LEDs with edge test colors while overlay is visible
if isinstance(proc, WledTargetProcessor) and not proc.is_running:
if proc.device_id is not None and not proc.is_running:
try:
await self.set_test_mode(proc.device_id, self._OVERLAY_EDGE_COLORS)
except Exception as e:
@@ -479,7 +475,7 @@ class ProcessorManager:
await proc.stop_overlay()
# Clear device LEDs when overlay is dismissed
if isinstance(proc, WledTargetProcessor) and not proc.is_running:
if proc.device_id is not None and not proc.is_running:
try:
await self.set_test_mode(proc.device_id, {})
except Exception as e:
@@ -596,9 +592,9 @@ class ProcessorManager:
logger.error(f"Failed to clear pixels for {device_id}: {e}")
def _find_active_led_client(self, device_id: str):
"""Find an active LED client for a device (from a running WLED processor)."""
"""Find an active LED client for a device (from a running processor)."""
for proc in self._processors.values():
if isinstance(proc, WledTargetProcessor) and proc.device_id == device_id and proc.is_running and proc.led_client:
if proc.device_id == device_id and proc.is_running and proc.led_client:
return proc.led_client
return None
@@ -607,14 +603,14 @@ class ProcessorManager:
def is_display_locked(self, display_index: int) -> bool:
"""Check if a display is currently being captured by any target."""
for proc in self._processors.values():
if isinstance(proc, WledTargetProcessor) and proc.is_running and proc.settings.display_index == display_index:
if proc.is_running and proc.get_display_index() == display_index:
return True
return False
def get_display_lock_info(self, display_index: int) -> Optional[str]:
"""Get the device ID that is currently capturing from a display."""
for proc in self._processors.values():
if isinstance(proc, WledTargetProcessor) and proc.is_running and proc.settings.display_index == display_index:
if proc.is_running and proc.get_display_index() == display_index:
return proc.device_id
return None
@@ -749,9 +745,8 @@ class ProcessorManager:
def _device_is_processing(self, device_id: str) -> bool:
"""Check if any target is actively streaming to this device."""
return any(
isinstance(p, WledTargetProcessor) and p.is_running
p.device_id == device_id and p.is_running
for p in self._processors.values()
if isinstance(p, WledTargetProcessor) and p.device_id == device_id
)
async def _health_check_loop(self, device_id: str):

View File

@@ -157,6 +157,30 @@ class TargetProcessor(ABC):
"""Update the picture source ID."""
self._picture_source_id = picture_source_id
def update_device(self, device_id: str) -> None:
"""Update device association. Raises for targets without devices."""
raise ValueError(f"Target {self._target_id} does not support device assignment")
def update_calibration(self, calibration) -> None:
"""Update calibration. No-op for targets without devices."""
pass
# ----- Device / display info (overridden by device-aware subclasses) -----
@property
def device_id(self) -> Optional[str]:
"""Device ID this processor streams to, or None."""
return None
@property
def led_client(self):
"""Active LED client, or None."""
return None
def get_display_index(self) -> Optional[int]:
"""Display index being captured, or None."""
return None
# ----- State / Metrics reporting -----
@abstractmethod

View File

@@ -261,6 +261,12 @@ class WledTargetProcessor(TargetProcessor):
interpolation_mode=self._settings.interpolation_mode,
)
def get_display_index(self) -> Optional[int]:
"""Display index being captured."""
if self._resolved_display_index is not None:
return self._resolved_display_index
return self._settings.display_index
# ----- State / Metrics -----
def get_state(self) -> dict:

View File

@@ -79,10 +79,7 @@ def _migrate_devices_to_targets():
settings = ProcessingSettings(
display_index=legacy_settings.get("display_index", 0),
fps=legacy_settings.get("fps", 30),
border_width=legacy_settings.get("border_width", 10),
brightness=legacy_settings.get("brightness", 1.0),
gamma=legacy_settings.get("gamma", 2.2),
saturation=legacy_settings.get("saturation", 1.0),
smoothing=legacy_settings.get("smoothing", 0.3),
interpolation_mode=legacy_settings.get("interpolation_mode", "average"),
state_check_interval=legacy_settings.get("state_check_interval", DEFAULT_STATE_CHECK_INTERVAL),

View File

@@ -59,8 +59,6 @@ class WledPictureTarget(PictureTarget):
"display_index": self.settings.display_index,
"fps": self.settings.fps,
"brightness": self.settings.brightness,
"gamma": self.settings.gamma,
"saturation": self.settings.saturation,
"smoothing": self.settings.smoothing,
"interpolation_mode": self.settings.interpolation_mode,
"standby_interval": self.settings.standby_interval,
@@ -78,8 +76,6 @@ class WledPictureTarget(PictureTarget):
display_index=settings_data.get("display_index", 0),
fps=settings_data.get("fps", 30),
brightness=settings_data.get("brightness", 1.0),
gamma=settings_data.get("gamma", 2.2),
saturation=settings_data.get("saturation", 1.0),
smoothing=settings_data.get("smoothing", 0.3),
interpolation_mode=settings_data.get("interpolation_mode", "average"),
standby_interval=settings_data.get("standby_interval", 1.0),