Add EntitySelect/IconSelect UI improvements across modals
- Portal IconSelect popups to document.body with position:fixed to prevent clipping by modal overflow-y:auto - Replace custom scene selectors in automation editor with EntitySelect command-palette pickers (main scene + fallback scene) - Add IconSelect grid for automation deactivation mode (none/revert/fallback) - Add IconSelect grid for automation condition type and match type - Replace mapped zone source dropdowns with EntitySelect pickers - Replace scene target selector with EntityPalette.pick() pattern - Remove effect palette preview bar from CSS editor - Remove sensitivity badge from audio color strip source cards - Clean up unused scene-selector CSS and scene-target-add-row CSS - Add locale keys for all new UI elements across en/ru/zh Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
"""Calibration system for mapping screen pixels to LED positions."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Literal, Tuple
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Literal, Optional, Set, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
@@ -51,6 +51,19 @@ class CalibrationSegment:
|
||||
reverse: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class CalibrationLine:
|
||||
"""One LED line in advanced calibration — references one picture source edge."""
|
||||
|
||||
picture_source_id: str
|
||||
edge: Literal["top", "right", "bottom", "left"]
|
||||
led_count: int
|
||||
span_start: float = 0.0
|
||||
span_end: float = 1.0
|
||||
reverse: bool = False
|
||||
border_width: int = 10
|
||||
|
||||
|
||||
@dataclass
|
||||
class CalibrationConfig:
|
||||
"""Complete calibration configuration.
|
||||
@@ -59,8 +72,14 @@ class CalibrationConfig:
|
||||
are derived at runtime via the `segments` property.
|
||||
"""
|
||||
|
||||
layout: Literal["clockwise", "counterclockwise"]
|
||||
start_position: Literal["top_left", "top_right", "bottom_left", "bottom_right"]
|
||||
# Mode: "simple" = 4-edge model (backward compat), "advanced" = generic line list
|
||||
mode: Literal["simple", "advanced"] = "simple"
|
||||
# Advanced mode: ordered list of CalibrationLine objects (ignored in simple mode)
|
||||
lines: List[CalibrationLine] = field(default_factory=list)
|
||||
|
||||
# Simple mode fields (also used as defaults for CalibrationConfig constructor)
|
||||
layout: Literal["clockwise", "counterclockwise"] = "clockwise"
|
||||
start_position: Literal["top_left", "top_right", "bottom_left", "bottom_right"] = "bottom_left"
|
||||
offset: int = 0
|
||||
leds_top: int = 0
|
||||
leds_right: int = 0
|
||||
@@ -135,6 +154,20 @@ class CalibrationConfig:
|
||||
Raises:
|
||||
ValueError: If configuration is invalid
|
||||
"""
|
||||
if self.mode == "advanced":
|
||||
if not self.lines:
|
||||
raise ValueError("Advanced calibration must have at least one line")
|
||||
for i, line in enumerate(self.lines):
|
||||
if line.led_count <= 0:
|
||||
raise ValueError(f"Line {i}: LED count must be positive, got {line.led_count}")
|
||||
if not (0.0 <= line.span_start <= 1.0) or not (0.0 <= line.span_end <= 1.0):
|
||||
raise ValueError(f"Line {i}: span must be in [0.0, 1.0]")
|
||||
if line.span_end <= line.span_start:
|
||||
raise ValueError(f"Line {i}: span_end must be greater than span_start")
|
||||
if line.border_width < 1:
|
||||
raise ValueError(f"Line {i}: border_width must be at least 1")
|
||||
return True
|
||||
|
||||
total = self.get_total_leds()
|
||||
if total <= 0:
|
||||
raise ValueError("Calibration must have at least one LED")
|
||||
@@ -154,9 +187,26 @@ class CalibrationConfig:
|
||||
return True
|
||||
|
||||
def get_total_leds(self) -> int:
|
||||
"""Get total number of LEDs across all edges."""
|
||||
"""Get total number of LEDs across all edges/lines."""
|
||||
if self.mode == "advanced":
|
||||
return sum(line.led_count for line in self.lines)
|
||||
return self.leds_top + self.leds_right + self.leds_bottom + self.leds_left
|
||||
|
||||
def get_required_picture_source_ids(self) -> List[str]:
|
||||
"""Get deduplicated list of picture source IDs referenced by lines.
|
||||
|
||||
Returns empty list for simple mode (the stream provides the source).
|
||||
"""
|
||||
if self.mode != "advanced":
|
||||
return []
|
||||
seen: Set[str] = set()
|
||||
result: List[str] = []
|
||||
for line in self.lines:
|
||||
if line.picture_source_id not in seen:
|
||||
seen.add(line.picture_source_id)
|
||||
result.append(line.picture_source_id)
|
||||
return result
|
||||
|
||||
def get_segment_for_edge(self, edge: str) -> CalibrationSegment | None:
|
||||
"""Get segment configuration for a specific edge."""
|
||||
for seg in self.segments:
|
||||
@@ -408,6 +458,216 @@ class PixelMapper:
|
||||
return led_colors
|
||||
|
||||
|
||||
class AdvancedPixelMapper:
|
||||
"""Maps multi-source screen pixels to LED colors for advanced calibration.
|
||||
|
||||
Each CalibrationLine references a picture source and an edge, with its own
|
||||
span and border_width. Frames from multiple sources are passed in as a dict.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
calibration: CalibrationConfig,
|
||||
interpolation_mode: Literal["average", "median", "dominant"] = "average",
|
||||
):
|
||||
self.calibration = calibration
|
||||
self.interpolation_mode = interpolation_mode
|
||||
calibration.validate()
|
||||
|
||||
if interpolation_mode == "average":
|
||||
self._calc_color = calculate_average_color
|
||||
elif interpolation_mode == "median":
|
||||
self._calc_color = calculate_median_color
|
||||
elif interpolation_mode == "dominant":
|
||||
self._calc_color = calculate_dominant_color
|
||||
else:
|
||||
raise ValueError(f"Invalid interpolation mode: {interpolation_mode}")
|
||||
|
||||
total_leds = calibration.get_total_leds()
|
||||
self._total_leds = total_leds
|
||||
self._led_buf = np.zeros((total_leds, 3), dtype=np.uint8)
|
||||
self._use_fast_avg = interpolation_mode == "average"
|
||||
|
||||
# Build segment-like metadata from lines (led_start for each line)
|
||||
offset = calibration.offset % total_leds if total_leds > 0 else 0
|
||||
self._line_indices: List[np.ndarray] = []
|
||||
led_start = 0
|
||||
for line in calibration.lines:
|
||||
indices = np.arange(led_start, led_start + line.led_count)
|
||||
if line.reverse:
|
||||
indices = indices[::-1]
|
||||
if offset > 0:
|
||||
indices = (indices + offset) % total_leds
|
||||
self._line_indices.append(indices)
|
||||
led_start += line.led_count
|
||||
|
||||
# Skip arrays (same logic as PixelMapper)
|
||||
skip_start = calibration.skip_leds_start
|
||||
skip_end = calibration.skip_leds_end
|
||||
self._skip_start = skip_start
|
||||
self._skip_end = skip_end
|
||||
self._active_count = max(0, total_leds - skip_start - skip_end)
|
||||
if 0 < self._active_count < total_leds:
|
||||
self._skip_src = np.linspace(0, total_leds - 1, self._active_count)
|
||||
self._skip_x = np.arange(total_leds, dtype=np.float64)
|
||||
self._skip_float = np.empty((total_leds, 3), dtype=np.float64)
|
||||
self._skip_resampled = np.empty((self._active_count, 3), dtype=np.uint8)
|
||||
else:
|
||||
self._skip_src = self._skip_x = self._skip_float = self._skip_resampled = None
|
||||
|
||||
# Per-line edge cache (keyed by line index to avoid collision)
|
||||
self._edge_cache: Dict[int, tuple] = {}
|
||||
|
||||
logger.info(
|
||||
f"Initialized advanced pixel mapper with {total_leds} LEDs, "
|
||||
f"{len(calibration.lines)} lines, {interpolation_mode} interpolation"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _extract_edge_strip(
|
||||
frame: np.ndarray, edge: str, border_width: int,
|
||||
span_start: float, span_end: float,
|
||||
) -> np.ndarray:
|
||||
"""Extract a border strip from a frame for the given edge and span."""
|
||||
h, w = frame.shape[:2]
|
||||
bw = min(border_width, h // 4, w // 4)
|
||||
|
||||
if edge == "top":
|
||||
strip = frame[:bw, :, :]
|
||||
elif edge == "bottom":
|
||||
strip = frame[-bw:, :, :]
|
||||
elif edge == "right":
|
||||
strip = frame[:, -bw:, :]
|
||||
else: # left
|
||||
strip = frame[:, :bw, :]
|
||||
|
||||
# Apply span
|
||||
if span_start > 0.0 or span_end < 1.0:
|
||||
if edge in ("top", "bottom"):
|
||||
total_w = strip.shape[1]
|
||||
s, e = int(span_start * total_w), int(span_end * total_w)
|
||||
strip = strip[:, s:e, :]
|
||||
else:
|
||||
total_h = strip.shape[0]
|
||||
s, e = int(span_start * total_h), int(span_end * total_h)
|
||||
strip = strip[s:e, :, :]
|
||||
|
||||
return strip
|
||||
|
||||
def _map_edge_average(
|
||||
self, edge_pixels: np.ndarray, edge_name: str, led_count: int,
|
||||
cache_key: int,
|
||||
) -> np.ndarray:
|
||||
"""Vectorized average-color mapping (same algo as PixelMapper)."""
|
||||
if edge_name in ("top", "bottom"):
|
||||
axis = 0
|
||||
edge_len = edge_pixels.shape[1]
|
||||
else:
|
||||
axis = 1
|
||||
edge_len = edge_pixels.shape[0]
|
||||
|
||||
cache = self._edge_cache.get(cache_key)
|
||||
if cache is None or cache[0] != edge_len:
|
||||
step = edge_len / led_count
|
||||
boundaries = (np.arange(led_count + 1, dtype=np.float64) * step).astype(np.int64)
|
||||
boundaries[1:] = np.maximum(boundaries[1:], boundaries[:-1] + 1)
|
||||
np.minimum(boundaries, edge_len, out=boundaries)
|
||||
starts = boundaries[:-1]
|
||||
ends = boundaries[1:]
|
||||
lengths = (ends - starts).reshape(-1, 1).astype(np.float64)
|
||||
cumsum_buf = np.empty((edge_len + 1, 3), dtype=np.float64)
|
||||
edge_1d_buf = np.empty((edge_len, 3), dtype=np.float64)
|
||||
cache = (edge_len, starts, ends, lengths, cumsum_buf, edge_1d_buf)
|
||||
self._edge_cache[cache_key] = cache
|
||||
|
||||
_, starts, ends, lengths, cumsum_buf, edge_1d_buf = cache
|
||||
np.mean(edge_pixels, axis=axis, out=edge_1d_buf)
|
||||
cumsum_buf[0] = 0
|
||||
np.cumsum(edge_1d_buf, axis=0, out=cumsum_buf[1:])
|
||||
segment_sums = cumsum_buf[ends] - cumsum_buf[starts]
|
||||
return np.clip(segment_sums / lengths, 0, 255).astype(np.uint8)
|
||||
|
||||
def _map_edge_fallback(
|
||||
self, edge_pixels: np.ndarray, edge_name: str, led_count: int,
|
||||
) -> np.ndarray:
|
||||
"""Per-LED color mapping for median/dominant modes."""
|
||||
if edge_name in ("top", "bottom"):
|
||||
edge_len = edge_pixels.shape[1]
|
||||
else:
|
||||
edge_len = edge_pixels.shape[0]
|
||||
|
||||
step = edge_len / led_count
|
||||
result = np.empty((led_count, 3), dtype=np.uint8)
|
||||
for i in range(led_count):
|
||||
start = int(i * step)
|
||||
end = max(start + 1, int((i + 1) * step))
|
||||
end = min(end, edge_len)
|
||||
if edge_name in ("top", "bottom"):
|
||||
segment = edge_pixels[:, start:end, :]
|
||||
else:
|
||||
segment = edge_pixels[start:end, :, :]
|
||||
result[i] = self._calc_color(segment)
|
||||
return result
|
||||
|
||||
def map_lines_to_leds(self, frames: Dict[str, np.ndarray]) -> np.ndarray:
|
||||
"""Map multi-source frames to LED colors using calibration lines.
|
||||
|
||||
Args:
|
||||
frames: Dict mapping picture_source_id to captured frame (H, W, 3) uint8
|
||||
|
||||
Returns:
|
||||
numpy array of shape (total_leds, 3), dtype uint8
|
||||
"""
|
||||
led_array = self._led_buf
|
||||
led_array[:] = 0
|
||||
|
||||
for i, line in enumerate(self.calibration.lines):
|
||||
frame = frames.get(line.picture_source_id)
|
||||
if frame is None:
|
||||
continue
|
||||
|
||||
edge_pixels = self._extract_edge_strip(
|
||||
frame, line.edge, line.border_width,
|
||||
line.span_start, line.span_end,
|
||||
)
|
||||
|
||||
if self._use_fast_avg:
|
||||
colors = self._map_edge_average(
|
||||
edge_pixels, line.edge, line.led_count, cache_key=i,
|
||||
)
|
||||
else:
|
||||
colors = self._map_edge_fallback(
|
||||
edge_pixels, line.edge, line.led_count,
|
||||
)
|
||||
|
||||
led_array[self._line_indices[i]] = colors
|
||||
|
||||
# Phase 3: Physical skip (same as PixelMapper)
|
||||
if self._skip_src is not None:
|
||||
np.copyto(self._skip_float, led_array, casting='unsafe')
|
||||
for ch in range(3):
|
||||
self._skip_resampled[:, ch] = np.round(
|
||||
np.interp(self._skip_src, self._skip_x, self._skip_float[:, ch])
|
||||
).astype(np.uint8)
|
||||
led_array[:] = 0
|
||||
end_idx = self._total_leds - self._skip_end
|
||||
led_array[self._skip_start:end_idx] = self._skip_resampled
|
||||
elif self._active_count <= 0:
|
||||
led_array[:] = 0
|
||||
|
||||
return led_array
|
||||
|
||||
|
||||
def create_pixel_mapper(
|
||||
calibration: CalibrationConfig,
|
||||
interpolation_mode: str = "average",
|
||||
):
|
||||
"""Factory: create the right mapper for the calibration mode."""
|
||||
if calibration.mode == "advanced":
|
||||
return AdvancedPixelMapper(calibration, interpolation_mode)
|
||||
return PixelMapper(calibration, interpolation_mode)
|
||||
|
||||
|
||||
def create_default_calibration(led_count: int) -> CalibrationConfig:
|
||||
"""Create a default calibration for a rectangular screen.
|
||||
|
||||
@@ -464,7 +724,35 @@ def calibration_from_dict(data: dict) -> CalibrationConfig:
|
||||
ValueError: If data is invalid
|
||||
"""
|
||||
try:
|
||||
mode = data.get("mode", "simple")
|
||||
|
||||
if mode == "advanced":
|
||||
lines_data = data.get("lines", [])
|
||||
lines = [
|
||||
CalibrationLine(
|
||||
picture_source_id=ld["picture_source_id"],
|
||||
edge=ld["edge"],
|
||||
led_count=ld["led_count"],
|
||||
span_start=ld.get("span_start", 0.0),
|
||||
span_end=ld.get("span_end", 1.0),
|
||||
reverse=ld.get("reverse", False),
|
||||
border_width=ld.get("border_width", 10),
|
||||
)
|
||||
for ld in lines_data
|
||||
]
|
||||
config = CalibrationConfig(
|
||||
mode="advanced",
|
||||
lines=lines,
|
||||
offset=data.get("offset", 0),
|
||||
skip_leds_start=data.get("skip_leds_start", 0),
|
||||
skip_leds_end=data.get("skip_leds_end", 0),
|
||||
)
|
||||
config.validate()
|
||||
return config
|
||||
|
||||
# Simple mode (backward compat — missing "mode" key defaults here)
|
||||
config = CalibrationConfig(
|
||||
mode="simple",
|
||||
layout=data["layout"],
|
||||
start_position=data["start_position"],
|
||||
offset=data.get("offset", 0),
|
||||
@@ -505,7 +793,33 @@ def calibration_to_dict(config: CalibrationConfig) -> dict:
|
||||
Returns:
|
||||
Dictionary representation
|
||||
"""
|
||||
if config.mode == "advanced":
|
||||
result: dict = {
|
||||
"mode": "advanced",
|
||||
"lines": [
|
||||
{
|
||||
"picture_source_id": line.picture_source_id,
|
||||
"edge": line.edge,
|
||||
"led_count": line.led_count,
|
||||
"span_start": line.span_start,
|
||||
"span_end": line.span_end,
|
||||
"reverse": line.reverse,
|
||||
"border_width": line.border_width,
|
||||
}
|
||||
for line in config.lines
|
||||
],
|
||||
}
|
||||
if config.offset != 0:
|
||||
result["offset"] = config.offset
|
||||
if config.skip_leds_start > 0:
|
||||
result["skip_leds_start"] = config.skip_leds_start
|
||||
if config.skip_leds_end > 0:
|
||||
result["skip_leds_end"] = config.skip_leds_end
|
||||
return result
|
||||
|
||||
# Simple mode
|
||||
result = {
|
||||
"mode": "simple",
|
||||
"layout": config.layout,
|
||||
"start_position": config.start_position,
|
||||
"offset": config.offset,
|
||||
|
||||
@@ -19,7 +19,7 @@ from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
from wled_controller.core.capture.calibration import CalibrationConfig, PixelMapper
|
||||
from wled_controller.core.capture.calibration import CalibrationConfig, PixelMapper, AdvancedPixelMapper, create_pixel_mapper
|
||||
from wled_controller.core.capture.screen_capture import extract_border_pixels
|
||||
from wled_controller.core.processing.live_stream import LiveStream
|
||||
from wled_controller.utils import get_logger
|
||||
@@ -151,15 +151,22 @@ class PictureColorStripStream(ColorStripStream):
|
||||
restarting the thread (except when the underlying LiveStream changes).
|
||||
"""
|
||||
|
||||
def __init__(self, live_stream: LiveStream, source):
|
||||
def __init__(self, live_stream, source):
|
||||
"""
|
||||
Args:
|
||||
live_stream: Acquired LiveStream (lifecycle managed by ColorStripStreamManager)
|
||||
live_stream: Acquired LiveStream or Dict[str, LiveStream] for advanced mode.
|
||||
Lifecycle managed by ColorStripStreamManager.
|
||||
source: PictureColorStripSource config
|
||||
"""
|
||||
from wled_controller.storage.color_strip_source import PictureColorStripSource
|
||||
|
||||
self._live_stream = live_stream
|
||||
# Support both single LiveStream and dict of streams (advanced mode)
|
||||
if isinstance(live_stream, dict):
|
||||
self._live_streams = live_stream
|
||||
self._live_stream = next(iter(live_stream.values()))
|
||||
else:
|
||||
self._live_streams = {}
|
||||
self._live_stream = live_stream
|
||||
self._fps: int = 30 # internal capture rate (send FPS is on the target)
|
||||
self._smoothing: float = source.smoothing
|
||||
self._brightness: float = source.brightness
|
||||
@@ -167,7 +174,7 @@ class PictureColorStripStream(ColorStripStream):
|
||||
self._gamma: float = source.gamma
|
||||
self._interpolation_mode: str = source.interpolation_mode
|
||||
self._calibration: CalibrationConfig = source.calibration
|
||||
self._pixel_mapper = PixelMapper(
|
||||
self._pixel_mapper = create_pixel_mapper(
|
||||
self._calibration, interpolation_mode=self._interpolation_mode
|
||||
)
|
||||
cal_leds = self._calibration.get_total_leds()
|
||||
@@ -201,6 +208,8 @@ class PictureColorStripStream(ColorStripStream):
|
||||
|
||||
@property
|
||||
def display_index(self) -> Optional[int]:
|
||||
if self._live_streams:
|
||||
return None # multi-source, ambiguous
|
||||
return self._live_stream.display_index
|
||||
|
||||
@property
|
||||
@@ -255,9 +264,9 @@ class PictureColorStripStream(ColorStripStream):
|
||||
|
||||
PixelMapper is rebuilt atomically if calibration or interpolation_mode changed.
|
||||
"""
|
||||
from wled_controller.storage.color_strip_source import PictureColorStripSource
|
||||
from wled_controller.storage.color_strip_source import PictureColorStripSource, AdvancedPictureColorStripSource
|
||||
|
||||
if not isinstance(source, PictureColorStripSource):
|
||||
if not isinstance(source, (PictureColorStripSource, AdvancedPictureColorStripSource)):
|
||||
return
|
||||
|
||||
self._smoothing = source.smoothing
|
||||
@@ -277,7 +286,7 @@ class PictureColorStripStream(ColorStripStream):
|
||||
self._calibration = source.calibration
|
||||
cal_leds = source.calibration.get_total_leds()
|
||||
self._led_count = source.led_count if source.led_count > 0 else cal_leds
|
||||
self._pixel_mapper = PixelMapper(
|
||||
self._pixel_mapper = create_pixel_mapper(
|
||||
source.calibration, interpolation_mode=source.interpolation_mode
|
||||
)
|
||||
self._previous_colors = None # Reset smoothing history on calibration change
|
||||
@@ -375,10 +384,21 @@ class PictureColorStripStream(ColorStripStream):
|
||||
t0 = time.perf_counter()
|
||||
|
||||
calibration = self._calibration
|
||||
border_pixels = extract_border_pixels(frame, calibration.border_width)
|
||||
t1 = time.perf_counter()
|
||||
mapper = self._pixel_mapper
|
||||
|
||||
led_colors = self._pixel_mapper.map_border_to_leds(border_pixels)
|
||||
if isinstance(mapper, AdvancedPixelMapper):
|
||||
# Advanced mode: gather frames from all live streams
|
||||
frames_dict = {}
|
||||
for ps_id, ls in self._live_streams.items():
|
||||
f = ls.get_latest_frame()
|
||||
if f is not None:
|
||||
frames_dict[ps_id] = f
|
||||
t1 = time.perf_counter()
|
||||
led_colors = mapper.map_lines_to_leds(frames_dict)
|
||||
else:
|
||||
border_pixels = extract_border_pixels(frame, calibration.border_width)
|
||||
t1 = time.perf_counter()
|
||||
led_colors = mapper.map_border_to_leds(border_pixels)
|
||||
t2 = time.perf_counter()
|
||||
|
||||
# Ensure scratch pool is sized for this frame
|
||||
|
||||
@@ -42,12 +42,14 @@ class _ColorStripEntry:
|
||||
|
||||
stream: ColorStripStream
|
||||
ref_count: int
|
||||
# ID of the picture source whose LiveStream we acquired (for release)
|
||||
picture_source_id: str
|
||||
# IDs of picture sources whose LiveStreams we acquired (for release)
|
||||
picture_source_ids: list = None
|
||||
# Per-consumer target FPS values (target_id → fps)
|
||||
target_fps: Dict[str, int] = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.picture_source_ids is None:
|
||||
self.picture_source_ids = []
|
||||
if self.target_fps is None:
|
||||
self.target_fps = {}
|
||||
|
||||
@@ -155,7 +157,7 @@ class ColorStripStreamManager:
|
||||
css_stream.start()
|
||||
key = f"{css_id}:{consumer_id}" if consumer_id else css_id
|
||||
self._streams[key] = _ColorStripEntry(
|
||||
stream=css_stream, ref_count=1, picture_source_id="",
|
||||
stream=css_stream, ref_count=1, picture_source_ids=[],
|
||||
)
|
||||
logger.info(f"Created {source.source_type} stream {key}")
|
||||
return css_stream
|
||||
@@ -167,26 +169,47 @@ class ColorStripStreamManager:
|
||||
logger.info(f"Reusing stream {css_id} (ref_count={entry.ref_count})")
|
||||
return entry.stream
|
||||
|
||||
# Create new picture stream — needs a LiveStream from the capture pipeline
|
||||
from wled_controller.storage.color_strip_source import PictureColorStripSource
|
||||
if not isinstance(source, PictureColorStripSource):
|
||||
# Create new picture stream — needs LiveStream(s) from the capture pipeline
|
||||
from wled_controller.storage.color_strip_source import PictureColorStripSource, AdvancedPictureColorStripSource
|
||||
if not isinstance(source, (PictureColorStripSource, AdvancedPictureColorStripSource)):
|
||||
raise ValueError(
|
||||
f"Unsupported sharable source type '{source.source_type}' for {css_id}"
|
||||
)
|
||||
|
||||
if not source.picture_source_id:
|
||||
raise ValueError(
|
||||
f"Color strip source {css_id} has no picture_source_id assigned"
|
||||
)
|
||||
|
||||
# Acquire the underlying live stream (ref-counted)
|
||||
live_stream = self._live_stream_manager.acquire(source.picture_source_id)
|
||||
# Determine required picture sources based on calibration mode
|
||||
required_ps_ids = source.calibration.get_required_picture_source_ids()
|
||||
if not required_ps_ids:
|
||||
# Simple mode: use the CSS source's single picture_source_id
|
||||
ps_id = getattr(source, "picture_source_id", "")
|
||||
if not ps_id:
|
||||
raise ValueError(
|
||||
f"Color strip source {css_id} has no picture_source_id assigned"
|
||||
)
|
||||
required_ps_ids = [ps_id]
|
||||
|
||||
# Acquire all required live streams (with rollback on failure)
|
||||
acquired = {}
|
||||
try:
|
||||
css_stream = PictureColorStripStream(live_stream, source)
|
||||
for ps_id in required_ps_ids:
|
||||
acquired[ps_id] = self._live_stream_manager.acquire(ps_id)
|
||||
except Exception as e:
|
||||
for ps_id in acquired:
|
||||
self._live_stream_manager.release(ps_id)
|
||||
raise ValueError(
|
||||
f"Failed to acquire live streams for source {css_id}: {e}"
|
||||
) from e
|
||||
|
||||
# Create stream (single LiveStream for simple, dict for advanced)
|
||||
try:
|
||||
if len(acquired) == 1 and source.calibration.mode == "simple":
|
||||
live_arg = next(iter(acquired.values()))
|
||||
else:
|
||||
live_arg = acquired
|
||||
css_stream = PictureColorStripStream(live_arg, source)
|
||||
css_stream.start()
|
||||
except Exception as e:
|
||||
self._live_stream_manager.release(source.picture_source_id)
|
||||
for ps_id in acquired:
|
||||
self._live_stream_manager.release(ps_id)
|
||||
raise RuntimeError(
|
||||
f"Failed to start color strip stream for source {css_id}: {e}"
|
||||
) from e
|
||||
@@ -194,7 +217,7 @@ class ColorStripStreamManager:
|
||||
self._streams[css_id] = _ColorStripEntry(
|
||||
stream=css_stream,
|
||||
ref_count=1,
|
||||
picture_source_id=source.picture_source_id,
|
||||
picture_source_ids=list(acquired.keys()),
|
||||
)
|
||||
|
||||
logger.info(f"Created picture color strip stream {css_id}")
|
||||
@@ -229,13 +252,13 @@ class ColorStripStreamManager:
|
||||
source_id = key.split(":")[0] if ":" in key else key
|
||||
self._release_clock(source_id, entry.stream)
|
||||
|
||||
picture_source_id = entry.picture_source_id
|
||||
picture_source_ids = entry.picture_source_ids
|
||||
del self._streams[key]
|
||||
logger.info(f"Removed color strip stream {key}")
|
||||
|
||||
# Release the underlying live stream (not needed for static sources)
|
||||
if picture_source_id:
|
||||
self._live_stream_manager.release(picture_source_id)
|
||||
# Release all underlying live streams
|
||||
for ps_id in picture_source_ids:
|
||||
self._live_stream_manager.release(ps_id)
|
||||
|
||||
def update_source(self, css_id: str, new_source) -> None:
|
||||
"""Hot-update processing params on all running streams for a source.
|
||||
@@ -280,15 +303,19 @@ class ColorStripStreamManager:
|
||||
source_id = key.split(":")[0] if ":" in key else key
|
||||
self._release_clock(source_id, entry.stream)
|
||||
|
||||
# Track picture_source_id change for future reference counting
|
||||
from wled_controller.storage.color_strip_source import PictureColorStripSource
|
||||
if isinstance(new_source, PictureColorStripSource):
|
||||
# Track picture source changes for future reference counting
|
||||
from wled_controller.storage.color_strip_source import PictureColorStripSource, AdvancedPictureColorStripSource
|
||||
if isinstance(new_source, (PictureColorStripSource, AdvancedPictureColorStripSource)):
|
||||
new_ps_ids = new_source.calibration.get_required_picture_source_ids()
|
||||
if not new_ps_ids:
|
||||
ps_id = getattr(new_source, "picture_source_id", "")
|
||||
new_ps_ids = [ps_id] if ps_id else []
|
||||
for key in matching_keys:
|
||||
entry = self._streams[key]
|
||||
if new_source.picture_source_id != entry.picture_source_id:
|
||||
if set(new_ps_ids) != set(entry.picture_source_ids):
|
||||
logger.info(
|
||||
f"CSS {css_id}: picture_source_id changed — "
|
||||
f"restart target to use new source"
|
||||
f"CSS {css_id}: picture source set changed — "
|
||||
f"restart target to use new sources"
|
||||
)
|
||||
break
|
||||
|
||||
|
||||
Reference in New Issue
Block a user