Refactor core/ into logical sub-packages and split filter files

Reorganize the flat core/ directory (17 files) into three sub-packages:
- core/devices/ — LED device communication (led_client, wled/adalight clients, providers, DDP)
- core/processing/ — target processing pipeline (processor_manager, target processors, live streams, settings)
- core/capture/ — screen capture & calibration (screen_capture, calibration, pixel_processor, overlay)

Also split the monolithic filters/builtin.py (460 lines, 8 filters) into
individual files: brightness, saturation, gamma, downscaler, pixelate,
auto_crop, flip, color_correction.

Includes the ProcessorManager refactor from target-centric architecture:
ProcessorManager slimmed from ~1600 to ~490 lines with unified
_processors dict replacing duplicate _targets/_kc_targets dicts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-18 12:03:29 +03:00
parent 77dd342c4c
commit fc779eef39
50 changed files with 2740 additions and 2267 deletions
@@ -0,0 +1,25 @@
"""Screen capture and calibration."""
from wled_controller.core.capture.screen_capture import (
BorderPixels,
ScreenCapture,
capture_display,
extract_border_pixels,
get_available_displays,
)
from wled_controller.core.capture.calibration import (
CalibrationConfig,
PixelMapper,
create_default_calibration,
)
__all__ = [
"BorderPixels",
"CalibrationConfig",
"PixelMapper",
"ScreenCapture",
"capture_display",
"create_default_calibration",
"extract_border_pixels",
"get_available_displays",
]
@@ -0,0 +1,515 @@
"""Calibration system for mapping screen pixels to LED positions."""
from dataclasses import dataclass, field
from typing import Dict, List, Literal, Tuple
import numpy as np
from wled_controller.core.capture.screen_capture import (
BorderPixels,
get_edge_segments,
calculate_average_color,
calculate_median_color,
calculate_dominant_color,
)
from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Edge traversal order for each (start_position, layout) combination.
# Determines which edge comes first when walking around the screen.
EDGE_ORDER: Dict[Tuple[str, str], List[str]] = {
("bottom_left", "clockwise"): ["left", "top", "right", "bottom"],
("bottom_left", "counterclockwise"): ["bottom", "right", "top", "left"],
("bottom_right", "clockwise"): ["bottom", "left", "top", "right"],
("bottom_right", "counterclockwise"): ["right", "top", "left", "bottom"],
("top_left", "clockwise"): ["top", "right", "bottom", "left"],
("top_left", "counterclockwise"): ["left", "bottom", "right", "top"],
("top_right", "clockwise"): ["right", "bottom", "left", "top"],
("top_right", "counterclockwise"): ["top", "left", "bottom", "right"],
}
# Whether LEDs are reversed on each edge for each (start_position, layout) combination.
EDGE_REVERSE: Dict[Tuple[str, str], Dict[str, bool]] = {
("bottom_left", "clockwise"): {"left": True, "top": False, "right": False, "bottom": True},
("bottom_left", "counterclockwise"): {"bottom": False, "right": True, "top": True, "left": False},
("bottom_right", "clockwise"): {"bottom": True, "left": True, "top": False, "right": False},
("bottom_right", "counterclockwise"): {"right": True, "top": True, "left": False, "bottom": False},
("top_left", "clockwise"): {"top": False, "right": False, "bottom": True, "left": True},
("top_left", "counterclockwise"): {"left": False, "bottom": False, "right": True, "top": True},
("top_right", "clockwise"): {"right": False, "bottom": True, "left": True, "top": False},
("top_right", "counterclockwise"): {"top": True, "left": False, "bottom": False, "right": True},
}
@dataclass
class CalibrationSegment:
"""Configuration for one segment of the LED strip."""
edge: Literal["top", "right", "bottom", "left"]
led_start: int
led_count: int
reverse: bool = False
@dataclass
class CalibrationConfig:
"""Complete calibration configuration.
Stores only the core parameters. Segments (with led_start, reverse, edge order)
are derived at runtime via the `segments` property.
"""
layout: Literal["clockwise", "counterclockwise"]
start_position: Literal["top_left", "top_right", "bottom_left", "bottom_right"]
offset: int = 0
leds_top: int = 0
leds_right: int = 0
leds_bottom: int = 0
leds_left: int = 0
# Per-edge span: fraction of screen side covered by LEDs (0.01.0)
span_top_start: float = 0.0
span_top_end: float = 1.0
span_right_start: float = 0.0
span_right_end: float = 1.0
span_bottom_start: float = 0.0
span_bottom_end: float = 1.0
span_left_start: float = 0.0
span_left_end: float = 1.0
# Skip LEDs: black out N LEDs at the start/end of the strip
skip_leds_start: int = 0
skip_leds_end: int = 0
# Border width: how many pixels from the screen edge to sample
border_width: int = 10
def build_segments(self) -> List[CalibrationSegment]:
"""Derive segment list from core parameters."""
key = (self.start_position, self.layout)
edge_order = EDGE_ORDER.get(key, ["bottom", "right", "top", "left"])
reverse_map = EDGE_REVERSE.get(key, {})
led_counts = {
"top": self.leds_top,
"right": self.leds_right,
"bottom": self.leds_bottom,
"left": self.leds_left,
}
segments = []
led_start = 0
for edge in edge_order:
count = led_counts[edge]
if count > 0:
segments.append(CalibrationSegment(
edge=edge,
led_start=led_start,
led_count=count,
reverse=reverse_map.get(edge, False),
))
led_start += count
return segments
@property
def segments(self) -> List[CalibrationSegment]:
"""Get derived segment list."""
return self.build_segments()
def get_edge_span(self, edge: str) -> tuple[float, float]:
"""Get span (start, end) for a given edge."""
return (
getattr(self, f"span_{edge}_start", 0.0),
getattr(self, f"span_{edge}_end", 1.0),
)
def validate(self) -> bool:
"""Validate calibration configuration.
Returns:
True if configuration is valid
Raises:
ValueError: If configuration is invalid
"""
total = self.get_total_leds()
if total <= 0:
raise ValueError("Calibration must have at least one LED")
for edge, count in [("top", self.leds_top), ("right", self.leds_right),
("bottom", self.leds_bottom), ("left", self.leds_left)]:
if count < 0:
raise ValueError(f"LED count for {edge} must be non-negative, got {count}")
for edge in ["top", "right", "bottom", "left"]:
start, end = self.get_edge_span(edge)
if not (0.0 <= start <= 1.0) or not (0.0 <= end <= 1.0):
raise ValueError(f"Span for {edge} must be in [0.0, 1.0], got ({start}, {end})")
if end <= start:
raise ValueError(f"Span end must be greater than start for {edge}, got ({start}, {end})")
return True
def get_total_leds(self) -> int:
"""Get total number of LEDs across all edges."""
return self.leds_top + self.leds_right + self.leds_bottom + self.leds_left
def get_segment_for_edge(self, edge: str) -> CalibrationSegment | None:
"""Get segment configuration for a specific edge."""
for seg in self.segments:
if seg.edge == edge:
return seg
return None
class PixelMapper:
"""Maps screen border pixels to LED colors based on calibration."""
def __init__(
self,
calibration: CalibrationConfig,
interpolation_mode: Literal["average", "median", "dominant"] = "average",
):
"""Initialize pixel mapper.
Args:
calibration: Calibration configuration
interpolation_mode: Color calculation mode
"""
self.calibration = calibration
self.interpolation_mode = interpolation_mode
# Validate calibration
self.calibration.validate()
# Select color calculation function
if interpolation_mode == "average":
self._calc_color = calculate_average_color
elif interpolation_mode == "median":
self._calc_color = calculate_median_color
elif interpolation_mode == "dominant":
self._calc_color = calculate_dominant_color
else:
raise ValueError(f"Invalid interpolation mode: {interpolation_mode}")
logger.info(
f"Initialized pixel mapper with {self.calibration.get_total_leds()} LEDs "
f"using {interpolation_mode} interpolation"
)
def _get_edge_pixels(self, border_pixels: BorderPixels, edge_name: str) -> np.ndarray:
"""Get edge pixel array with span slicing applied."""
if edge_name == "top":
edge_pixels = border_pixels.top
elif edge_name == "right":
edge_pixels = border_pixels.right
elif edge_name == "bottom":
edge_pixels = border_pixels.bottom
else:
edge_pixels = border_pixels.left
span_start, span_end = self.calibration.get_edge_span(edge_name)
if span_start > 0.0 or span_end < 1.0:
if edge_name in ("top", "bottom"):
total_w = edge_pixels.shape[1]
s, e = int(span_start * total_w), int(span_end * total_w)
edge_pixels = edge_pixels[:, s:e, :]
else:
total_h = edge_pixels.shape[0]
s, e = int(span_start * total_h), int(span_end * total_h)
edge_pixels = edge_pixels[s:e, :, :]
return edge_pixels
def _map_edge_average(
self, edge_pixels: np.ndarray, edge_name: str, led_count: int
) -> np.ndarray:
"""Vectorized average-color mapping for one edge. Returns (led_count, 3) uint8."""
# Reduce border dimension → 1D array of shape (edge_length, 3)
if edge_name in ("top", "bottom"):
edge_1d = edge_pixels.mean(axis=0) # mean across border_width
else:
edge_1d = edge_pixels.mean(axis=1) # mean across border_width
edge_len = edge_1d.shape[0]
# Compute segment boundaries (matching get_edge_segments float stepping)
step = edge_len / led_count
boundaries = np.empty(led_count + 1, dtype=np.int64)
for i in range(led_count + 1):
boundaries[i] = int(i * step)
# Ensure each segment has at least 1 pixel
for i in range(led_count):
if boundaries[i + 1] <= boundaries[i]:
boundaries[i + 1] = boundaries[i] + 1
# Clamp all boundaries to edge_len (not just the last one)
boundaries = np.minimum(boundaries, edge_len)
# Cumulative sum for O(1) range means — no per-LED Python numpy calls
cumsum = np.zeros((edge_len + 1, 3), dtype=np.float64)
cumsum[1:] = np.cumsum(edge_1d.astype(np.float64), axis=0)
starts = boundaries[:-1]
ends = boundaries[1:]
lengths = (ends - starts).reshape(-1, 1).astype(np.float64)
segment_sums = cumsum[ends] - cumsum[starts]
return np.clip(segment_sums / lengths, 0, 255).astype(np.uint8)
def map_border_to_leds(
self,
border_pixels: BorderPixels
) -> np.ndarray:
"""Map screen border pixels to LED colors.
Args:
border_pixels: Extracted border pixels from screen
Returns:
numpy array of shape (total_leds, 3), dtype uint8
Raises:
ValueError: If border pixels don't match calibration
"""
total_leds = self.calibration.get_total_leds()
skip_start = self.calibration.skip_leds_start
skip_end = self.calibration.skip_leds_end
active_count = max(0, total_leds - skip_start - skip_end)
use_fast_avg = self.interpolation_mode == "average"
# Phase 1: Map full perimeter to total_leds positions
if use_fast_avg:
led_array = np.zeros((total_leds, 3), dtype=np.uint8)
else:
led_colors = [(0, 0, 0)] * total_leds
for edge_name in ["top", "right", "bottom", "left"]:
segment = self.calibration.get_segment_for_edge(edge_name)
if not segment:
continue
edge_pixels = self._get_edge_pixels(border_pixels, edge_name)
if use_fast_avg:
# Vectorized: compute all LED colors for this edge at once
colors = self._map_edge_average(
edge_pixels, edge_name, segment.led_count
)
led_indices = np.arange(segment.led_start, segment.led_start + segment.led_count)
if segment.reverse:
led_indices = led_indices[::-1]
led_array[led_indices] = colors
else:
# Per-LED fallback for median/dominant modes
try:
pixel_segments = get_edge_segments(
edge_pixels, segment.led_count, edge_name
)
except ValueError as e:
logger.error(f"Failed to segment {edge_name} edge: {e}")
raise
led_indices = list(range(segment.led_start, segment.led_start + segment.led_count))
if segment.reverse:
led_indices = list(reversed(led_indices))
for led_idx, pixel_segment in zip(led_indices, pixel_segments):
color = self._calc_color(pixel_segment)
led_colors[led_idx] = color
# Phase 2: Offset rotation
offset = self.calibration.offset % total_leds if total_leds > 0 else 0
if use_fast_avg:
if offset > 0:
led_array = np.roll(led_array, offset, axis=0)
# Phase 3: Physical skip — resample full perimeter to active LEDs
# Maps the entire screen to active_count positions so each active LED
# covers a proportionally larger slice of the perimeter.
if active_count > 0 and active_count < total_leds:
src = np.linspace(0, total_leds - 1, active_count)
full_f = led_array.astype(np.float64)
x = np.arange(total_leds, dtype=np.float64)
resampled = np.empty((active_count, 3), dtype=np.uint8)
for ch in range(3):
resampled[:, ch] = np.round(
np.interp(src, x, full_f[:, ch])
).astype(np.uint8)
led_array[:] = 0
end_idx = total_leds - skip_end
led_array[skip_start:end_idx] = resampled
elif active_count <= 0:
led_array[:] = 0
return led_array
else:
if offset > 0:
led_colors = led_colors[total_leds - offset:] + led_colors[:total_leds - offset]
# Phase 3: Physical skip — resample full perimeter to active LEDs
if active_count > 0 and active_count < total_leds:
arr = np.array(led_colors, dtype=np.float64)
src = np.linspace(0, total_leds - 1, active_count)
x = np.arange(total_leds, dtype=np.float64)
resampled = np.empty((active_count, 3), dtype=np.float64)
for ch in range(3):
resampled[:, ch] = np.interp(src, x, arr[:, ch])
led_colors = [(0, 0, 0)] * total_leds
for i in range(active_count):
r, g, b = resampled[i]
led_colors[skip_start + i] = (int(round(r)), int(round(g)), int(round(b)))
elif active_count <= 0:
led_colors = [(0, 0, 0)] * total_leds
return np.array(led_colors, dtype=np.uint8)
def test_calibration(self, edge: str, color: Tuple[int, int, int]) -> List[Tuple[int, int, int]]:
"""Generate test pattern to light up specific edge.
Useful for verifying calibration configuration.
Args:
edge: Edge to light up (top, right, bottom, left)
color: RGB color to use
Returns:
List of LED colors with only the specified edge lit
Raises:
ValueError: If edge is not in calibration
"""
segment = self.calibration.get_segment_for_edge(edge)
if not segment:
raise ValueError(f"Edge '{edge}' not found in calibration")
total_leds = self.calibration.get_total_leds()
led_colors = [(0, 0, 0)] * total_leds
# Light up the specified edge
led_indices = range(segment.led_start, segment.led_start + segment.led_count)
for led_idx in led_indices:
led_colors[led_idx] = color
logger.info(f"Generated test pattern for {edge} edge with color {color}")
return led_colors
def create_default_calibration(led_count: int) -> CalibrationConfig:
"""Create a default calibration for a rectangular screen.
Assumes LEDs are evenly distributed around the screen edges in clockwise order
starting from bottom-left.
Args:
led_count: Total number of LEDs
Returns:
Default calibration configuration
"""
if led_count < 4:
raise ValueError("Need at least 4 LEDs for default calibration")
# Distribute LEDs evenly across 4 edges
leds_per_edge = led_count // 4
remainder = led_count % 4
# Distribute remainder to longer edges (bottom and top)
bottom_count = leds_per_edge + (1 if remainder > 0 else 0)
right_count = leds_per_edge
top_count = leds_per_edge + (1 if remainder > 1 else 0)
left_count = leds_per_edge + (1 if remainder > 2 else 0)
config = CalibrationConfig(
layout="clockwise",
start_position="bottom_left",
leds_bottom=bottom_count,
leds_right=right_count,
leds_top=top_count,
leds_left=left_count,
)
logger.info(
f"Created default calibration for {led_count} LEDs: "
f"bottom={bottom_count}, right={right_count}, "
f"top={top_count}, left={left_count}"
)
return config
def calibration_from_dict(data: dict) -> CalibrationConfig:
"""Create calibration configuration from dictionary.
Args:
data: Dictionary with calibration data
Returns:
CalibrationConfig instance
Raises:
ValueError: If data is invalid
"""
try:
config = CalibrationConfig(
layout=data["layout"],
start_position=data["start_position"],
offset=data.get("offset", 0),
leds_top=data.get("leds_top", 0),
leds_right=data.get("leds_right", 0),
leds_bottom=data.get("leds_bottom", 0),
leds_left=data.get("leds_left", 0),
span_top_start=data.get("span_top_start", 0.0),
span_top_end=data.get("span_top_end", 1.0),
span_right_start=data.get("span_right_start", 0.0),
span_right_end=data.get("span_right_end", 1.0),
span_bottom_start=data.get("span_bottom_start", 0.0),
span_bottom_end=data.get("span_bottom_end", 1.0),
span_left_start=data.get("span_left_start", 0.0),
span_left_end=data.get("span_left_end", 1.0),
skip_leds_start=data.get("skip_leds_start", 0),
skip_leds_end=data.get("skip_leds_end", 0),
border_width=data.get("border_width", 10),
)
config.validate()
return config
except KeyError as e:
raise ValueError(f"Missing required calibration field: {e}")
except Exception as e:
if isinstance(e, ValueError):
raise
raise ValueError(f"Invalid calibration data: {e}")
def calibration_to_dict(config: CalibrationConfig) -> dict:
"""Convert calibration configuration to dictionary.
Args:
config: Calibration configuration
Returns:
Dictionary representation
"""
result = {
"layout": config.layout,
"start_position": config.start_position,
"offset": config.offset,
"leds_top": config.leds_top,
"leds_right": config.leds_right,
"leds_bottom": config.leds_bottom,
"leds_left": config.leds_left,
}
# Include span fields only when not default (full coverage)
for edge in ["top", "right", "bottom", "left"]:
start = getattr(config, f"span_{edge}_start")
end = getattr(config, f"span_{edge}_end")
if start != 0.0 or end != 1.0:
result[f"span_{edge}_start"] = start
result[f"span_{edge}_end"] = end
# Include skip fields only when non-default
if config.skip_leds_start > 0:
result["skip_leds_start"] = config.skip_leds_start
if config.skip_leds_end > 0:
result["skip_leds_end"] = config.skip_leds_end
if config.border_width != 10:
result["border_width"] = config.border_width
return result
@@ -0,0 +1,166 @@
"""Pixel processing utilities for color correction and manipulation."""
from typing import List, Tuple
import numpy as np
from wled_controller.utils import get_logger
logger = get_logger(__name__)
def apply_color_correction(
colors: List[Tuple[int, int, int]],
gamma: float = 2.2,
saturation: float = 1.0,
brightness: float = 1.0,
) -> List[Tuple[int, int, int]]:
"""Apply color correction to LED colors.
Args:
colors: List of (R, G, B) tuples
gamma: Gamma correction factor (default 2.2)
saturation: Saturation multiplier (0.0-2.0)
brightness: Brightness multiplier (0.0-1.0)
Returns:
Corrected list of (R, G, B) tuples
"""
if not colors:
return colors
# Convert to numpy array for efficient processing
colors_array = np.array(colors, dtype=np.float32) / 255.0
# Apply brightness
if brightness != 1.0:
colors_array *= brightness
# Apply saturation
if saturation != 1.0:
# Convert RGB to HSV-like saturation adjustment
# Calculate luminance (grayscale)
luminance = np.dot(colors_array, [0.299, 0.587, 0.114])
luminance = luminance[:, np.newaxis] # Reshape for broadcasting
# Blend between grayscale and color based on saturation
colors_array = luminance + (colors_array - luminance) * saturation
# Apply gamma correction
if gamma != 1.0:
colors_array = np.power(colors_array, 1.0 / gamma)
# Clamp to valid range and convert back to integers
colors_array = np.clip(colors_array * 255.0, 0, 255).astype(np.uint8)
# Convert back to list of tuples
corrected_colors = [tuple(color) for color in colors_array]
return corrected_colors
def smooth_colors(
current_colors: List[Tuple[int, int, int]],
previous_colors: List[Tuple[int, int, int]],
smoothing_factor: float = 0.5,
) -> List[Tuple[int, int, int]]:
"""Smooth color transitions between frames.
Args:
current_colors: Current frame colors
previous_colors: Previous frame colors
smoothing_factor: Smoothing amount (0.0-1.0, where 0=no smoothing, 1=full smoothing)
Returns:
Smoothed colors
"""
if not current_colors or not previous_colors:
return current_colors
if len(current_colors) != len(previous_colors):
logger.warning(
f"Color count mismatch: current={len(current_colors)}, "
f"previous={len(previous_colors)}. Skipping smoothing."
)
return current_colors
if smoothing_factor <= 0:
return current_colors
if smoothing_factor >= 1:
return previous_colors
# Convert to numpy arrays
current = np.array(current_colors, dtype=np.float32)
previous = np.array(previous_colors, dtype=np.float32)
# Blend between current and previous
smoothed = current * (1 - smoothing_factor) + previous * smoothing_factor
# Convert back to integers
smoothed = np.clip(smoothed, 0, 255).astype(np.uint8)
return [tuple(color) for color in smoothed]
def adjust_brightness_global(
colors: List[Tuple[int, int, int]],
target_brightness: int,
) -> List[Tuple[int, int, int]]:
"""Adjust colors to achieve target global brightness.
Args:
colors: List of (R, G, B) tuples
target_brightness: Target brightness (0-255)
Returns:
Adjusted colors
"""
if not colors or target_brightness == 255:
return colors
# Calculate scaling factor
scale = target_brightness / 255.0
# Scale all colors
scaled = [
(
int(r * scale),
int(g * scale),
int(b * scale),
)
for r, g, b in colors
]
return scaled
def limit_brightness(
colors: List[Tuple[int, int, int]],
max_brightness: int = 255,
) -> List[Tuple[int, int, int]]:
"""Limit maximum brightness of any color channel.
Args:
colors: List of (R, G, B) tuples
max_brightness: Maximum allowed brightness (0-255)
Returns:
Limited colors
"""
if not colors or max_brightness == 255:
return colors
limited = []
for r, g, b in colors:
# Find max channel value
max_val = max(r, g, b)
if max_val > max_brightness:
# Scale down proportionally
scale = max_brightness / max_val
r = int(r * scale)
g = int(g * scale)
b = int(b * scale)
limited.append((r, g, b))
return limited
@@ -0,0 +1,335 @@
"""Screen capture functionality using mss library."""
from dataclasses import dataclass
from typing import Dict, List
import mss
import numpy as np
from PIL import Image
from wled_controller.utils import get_logger, get_monitor_names, get_monitor_refresh_rates
logger = get_logger(__name__)
@dataclass
class DisplayInfo:
"""Information about a display/monitor."""
index: int
name: str
width: int
height: int
x: int
y: int
is_primary: bool
refresh_rate: int # in Hz
@dataclass
class ScreenCapture:
"""Captured screen image data."""
image: np.ndarray
width: int
height: int
display_index: int
@dataclass
class BorderPixels:
"""Border pixels extracted from screen edges."""
top: np.ndarray
right: np.ndarray
bottom: np.ndarray
left: np.ndarray
def get_available_displays() -> List[DisplayInfo]:
"""Get list of available displays/monitors.
Returns:
List of DisplayInfo objects for each available monitor
Raises:
RuntimeError: If unable to detect displays
"""
try:
# Get friendly monitor names (Windows only, falls back to generic names)
monitor_names = get_monitor_names()
# Get monitor refresh rates (Windows only, falls back to 60Hz)
refresh_rates = get_monitor_refresh_rates()
with mss.mss() as sct:
displays = []
# Skip the first monitor (combined virtual screen on multi-monitor setups)
for idx, monitor in enumerate(sct.monitors[1:], start=0):
# Use friendly name from WMI if available, otherwise generic name
friendly_name = monitor_names.get(idx, f"Display {idx}")
# Use detected refresh rate or default to 60Hz
refresh_rate = refresh_rates.get(idx, 60)
display_info = DisplayInfo(
index=idx,
name=friendly_name,
width=monitor["width"],
height=monitor["height"],
x=monitor["left"],
y=monitor["top"],
is_primary=(idx == 0),
refresh_rate=refresh_rate,
)
displays.append(display_info)
logger.info(f"Detected {len(displays)} display(s)")
return displays
except Exception as e:
logger.error(f"Failed to detect displays: {e}")
raise RuntimeError(f"Failed to detect displays: {e}")
def capture_display(display_index: int = 0) -> ScreenCapture:
"""Capture the specified display.
Args:
display_index: Index of the display to capture (0-based)
Returns:
ScreenCapture object containing the captured image
Raises:
ValueError: If display_index is invalid
RuntimeError: If screen capture fails
"""
try:
with mss.mss() as sct:
# mss monitors[0] is the combined screen, monitors[1+] are individual displays
monitor_index = display_index + 1
if monitor_index >= len(sct.monitors):
raise ValueError(
f"Invalid display index {display_index}. "
f"Available displays: 0-{len(sct.monitors) - 2}"
)
monitor = sct.monitors[monitor_index]
# Capture screenshot
screenshot = sct.grab(monitor)
# Convert to numpy array (RGB)
img = Image.frombytes("RGB", screenshot.size, screenshot.rgb)
img_array = np.array(img)
logger.debug(
f"Captured display {display_index}: {monitor['width']}x{monitor['height']}"
)
return ScreenCapture(
image=img_array,
width=monitor["width"],
height=monitor["height"],
display_index=display_index,
)
except ValueError:
raise
except Exception as e:
logger.error(f"Failed to capture display {display_index}: {e}")
raise RuntimeError(f"Screen capture failed: {e}")
def extract_border_pixels(
screen_capture: ScreenCapture,
border_width: int = 10
) -> BorderPixels:
"""Extract border pixels from screen capture.
Args:
screen_capture: Captured screen image
border_width: Width of the border in pixels to extract
Returns:
BorderPixels object containing pixels from each edge
Raises:
ValueError: If border_width is invalid
"""
if border_width < 1:
raise ValueError("border_width must be at least 1")
if border_width > min(screen_capture.width, screen_capture.height) // 4:
raise ValueError(
f"border_width {border_width} is too large for screen size "
f"{screen_capture.width}x{screen_capture.height}"
)
img = screen_capture.image
height, width = img.shape[:2]
# Extract border regions
# Top edge: top border_width rows, full width
top = img[:border_width, :, :]
# Bottom edge: bottom border_width rows, full width
bottom = img[-border_width:, :, :]
# Right edge: right border_width columns, full height
right = img[:, -border_width:, :]
# Left edge: left border_width columns, full height
left = img[:, :border_width, :]
logger.debug(
f"Extracted borders: top={top.shape}, right={right.shape}, "
f"bottom={bottom.shape}, left={left.shape}"
)
return BorderPixels(
top=top,
right=right,
bottom=bottom,
left=left,
)
def get_edge_segments(
edge_pixels: np.ndarray,
segment_count: int,
edge_name: str
) -> List[np.ndarray]:
"""Divide edge pixels into segments.
Args:
edge_pixels: Pixel array for one edge
segment_count: Number of segments to divide into
edge_name: Name of the edge (for orientation)
Returns:
List of pixel arrays, one per segment
Raises:
ValueError: If segment_count is invalid
"""
if segment_count < 1:
raise ValueError("segment_count must be at least 1")
# Determine the dimension to divide
# For top/bottom edges: divide along width (axis 1)
# For left/right edges: divide along height (axis 0)
if edge_name in ["top", "bottom"]:
divide_axis = 1 # Width
edge_length = edge_pixels.shape[1]
else: # left, right
divide_axis = 0 # Height
edge_length = edge_pixels.shape[0]
# Use float stepping so multiple LEDs can share pixels when
# segment_count > edge_length (e.g. after downscaling).
step = edge_length / segment_count
segments = []
for i in range(segment_count):
start = int(i * step)
end = max(start + 1, int((i + 1) * step))
# Clamp to edge bounds
end = min(end, edge_length)
if divide_axis == 1:
segment = edge_pixels[:, start:end, :]
else:
segment = edge_pixels[start:end, :, :]
segments.append(segment)
return segments
def calculate_average_color(pixels: np.ndarray) -> tuple[int, int, int]:
"""Calculate average color of a pixel region.
Args:
pixels: Pixel array (height, width, 3)
Returns:
Tuple of (R, G, B) average values
"""
if pixels.size == 0:
return (0, 0, 0)
# Calculate mean across height and width dimensions
mean_color = np.mean(pixels, axis=(0, 1))
# Convert to integers and clamp to valid range
r = int(np.clip(mean_color[0], 0, 255))
g = int(np.clip(mean_color[1], 0, 255))
b = int(np.clip(mean_color[2], 0, 255))
return (r, g, b)
def calculate_median_color(pixels: np.ndarray) -> tuple[int, int, int]:
"""Calculate median color of a pixel region.
Args:
pixels: Pixel array (height, width, 3)
Returns:
Tuple of (R, G, B) median values
"""
if pixels.size == 0:
return (0, 0, 0)
# Calculate median across height and width dimensions
median_color = np.median(pixels, axis=(0, 1))
# Convert to integers and clamp to valid range
r = int(np.clip(median_color[0], 0, 255))
g = int(np.clip(median_color[1], 0, 255))
b = int(np.clip(median_color[2], 0, 255))
return (r, g, b)
def calculate_dominant_color(pixels: np.ndarray) -> tuple[int, int, int]:
"""Calculate dominant color of a pixel region using simple clustering.
Args:
pixels: Pixel array (height, width, 3)
Returns:
Tuple of (R, G, B) dominant color values
"""
if pixels.size == 0:
return (0, 0, 0)
# Reshape to (n_pixels, 3)
pixels_reshaped = pixels.reshape(-1, 3)
# For performance, sample pixels if there are too many
max_samples = 1000
if len(pixels_reshaped) > max_samples:
indices = np.random.choice(len(pixels_reshaped), max_samples, replace=False)
pixels_reshaped = pixels_reshaped[indices]
# Simple dominant color: quantize colors and find most common
# Reduce color space to 32 levels per channel for binning
quantized = (pixels_reshaped // 8) * 8
# Find unique colors and their counts
unique_colors, counts = np.unique(quantized, axis=0, return_counts=True)
# Get the most common color
dominant_idx = np.argmax(counts)
dominant_color = unique_colors[dominant_idx]
r = int(np.clip(dominant_color[0], 0, 255))
g = int(np.clip(dominant_color[1], 0, 255))
b = int(np.clip(dominant_color[2], 0, 255))
return (r, g, b)
@@ -0,0 +1,640 @@
"""Screen overlay visualization for LED calibration testing."""
import colorsys
import logging
import sys
import threading
import time
import tkinter as tk
from typing import Dict, List, Tuple, Optional
from wled_controller.core.capture.calibration import CalibrationConfig
from wled_controller.core.capture_engines.base import DisplayInfo
logger = logging.getLogger(__name__)
class OverlayWindow:
"""Transparent overlay window for LED calibration visualization.
Runs in a separate thread with its own Tkinter event loop.
Draws border sampling zones, LED position markers, pixel mapping ranges,
and calibration info text.
"""
def __init__(
self,
display_info: DisplayInfo,
calibration: CalibrationConfig,
target_id: str,
target_name: str = None
):
"""Initialize overlay for a specific display and calibration.
Args:
display_info: Display geometry (x, y, width, height)
calibration: LED calibration configuration
target_id: Target ID for logging
target_name: Target friendly name for display
"""
self.display_info = display_info
self.calibration = calibration
self.target_id = target_id
self.target_name = target_name or target_id
self.root: Optional[tk.Tk] = None
self.canvas: Optional[tk.Canvas] = None
self.running = False
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._after_id: Optional[str] = None
def start(self) -> None:
"""Start overlay in background thread."""
if self._thread and self._thread.is_alive():
raise RuntimeError("Overlay already running")
self._stop_event.clear()
self._thread = threading.Thread(
target=self._run_tkinter_loop,
name=f"Overlay-{self.target_id}",
daemon=True
)
self._thread.start()
def stop(self) -> None:
"""Stop overlay and clean up thread."""
self._stop_event.set()
# Wait for the tkinter thread to fully exit
if self._thread and self._thread.is_alive():
self._thread.join(timeout=3.0)
# Give extra time for Tcl interpreter cleanup
import time
time.sleep(0.2)
def _run_tkinter_loop(self) -> None:
"""Tkinter event loop (runs in background thread)."""
try:
self.root = tk.Tk()
self._setup_window()
self._draw_visualization()
# Check stop event periodically
def check_stop():
if self._stop_event.is_set():
# Cancel the pending after callback before quitting
if self._after_id:
try:
self.root.after_cancel(self._after_id)
except Exception:
pass
self.root.quit() # Exit mainloop cleanly
else:
self._after_id = self.root.after(100, check_stop)
self._after_id = self.root.after(100, check_stop)
self.running = True
logger.info(f"Overlay window started for {self.target_id}")
self.root.mainloop()
except Exception as e:
logger.error(f"Overlay error for {self.target_id}: {e}", exc_info=True)
finally:
self.running = False
# Clean up the window properly
if self.root:
try:
# Cancel any remaining after callbacks
if self._after_id:
self.root.after_cancel(self._after_id)
self._after_id = None
# Destroy window and clean up Tcl interpreter
self.root.destroy()
self.root = None
except Exception as e:
logger.debug(f"Cleanup error: {e}")
logger.info(f"Overlay window stopped for {self.target_id}")
def _setup_window(self) -> None:
"""Configure transparent, frameless, always-on-top window."""
# Position at display coordinates
geometry = f"{self.display_info.width}x{self.display_info.height}"
geometry += f"+{self.display_info.x}+{self.display_info.y}"
self.root.geometry(geometry)
# Remove window decorations
self.root.overrideredirect(True)
# Transparent background and always on top
self.root.attributes('-topmost', True)
# Create canvas for drawing
self.canvas = tk.Canvas(
self.root,
width=self.display_info.width,
height=self.display_info.height,
highlightthickness=0,
bg='black'
)
self.canvas.pack()
# Make canvas itself transparent
self.root.wm_attributes('-transparentcolor', 'black')
self.root.attributes('-alpha', 0.85) # Semi-transparent overlay
# Windows-specific: make click-through
if sys.platform == 'win32':
try:
import ctypes
# Get window handle
hwnd = ctypes.windll.user32.GetParent(self.root.winfo_id())
# Set WS_EX_LAYERED | WS_EX_TRANSPARENT extended styles
style = ctypes.windll.user32.GetWindowLongW(hwnd, -20) # GWL_EXSTYLE
style |= 0x80000 | 0x20 # WS_EX_LAYERED | WS_EX_TRANSPARENT
ctypes.windll.user32.SetWindowLongW(hwnd, -20, style)
except Exception as e:
logger.warning(f"Could not set click-through: {e}")
def _draw_visualization(self) -> None:
"""Draw all visualization elements on canvas."""
w = self.display_info.width
h = self.display_info.height
bw = self.calibration.border_width
# 1. Border sampling zones (colored semi-transparent rectangles)
self._draw_border_zones(w, h, bw)
# 2. LED axes with tick marks
self._draw_led_axes(w, h, bw)
# 3. Calibration info text overlay
self._draw_info_text(w, h)
def _draw_border_zones(self, w: int, h: int, bw: int) -> None:
"""Draw colored rectangles showing border sampling zones."""
# Top zone
self.canvas.create_rectangle(
0, 0, w, bw,
fill='#FF0000', stipple='gray25', outline='#FF0000', width=2
)
# Right zone
self.canvas.create_rectangle(
w - bw, 0, w, h,
fill='#00FF00', stipple='gray25', outline='#00FF00', width=2
)
# Bottom zone
self.canvas.create_rectangle(
0, h - bw, w, h,
fill='#0000FF', stipple='gray25', outline='#0000FF', width=2
)
# Left zone
self.canvas.create_rectangle(
0, 0, bw, h,
fill='#FFFF00', stipple='gray25', outline='#FFFF00', width=2
)
def _draw_led_axes(self, w: int, h: int, bw: int) -> None:
"""Draw axes with tick marks showing LED positions."""
segments = self.calibration.segments
total_leds = self.calibration.get_total_leds()
# Determine tick interval based on total LEDs
if total_leds <= 50:
tick_interval = 5
elif total_leds <= 100:
tick_interval = 10
elif total_leds <= 200:
tick_interval = 20
else:
tick_interval = 50
led_index = 0
for seg in segments:
edge = seg.edge
count = seg.led_count
reverse = seg.reverse
# Get span for this edge
span_start, span_end = self.calibration.get_edge_span(edge)
# Draw axis line and tick marks for this edge
if edge == 'top':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
y_axis = bw / 2
# Draw axis line
self.canvas.create_line(
start_px, y_axis, end_px, y_axis,
fill='white', width=3
)
# Draw tick marks
for i in range(count):
display_index = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_index % tick_interval == 0:
frac = i / count if count > 1 else 0.5
x = start_px + frac * (end_px - start_px)
# Tick mark
tick_len = 15 if display_index % tick_interval == 0 else 10
self.canvas.create_line(
x, y_axis - tick_len, x, y_axis + tick_len,
fill='white', width=2
)
# Label with background (positioned INSIDE screen)
label_y = bw + 20
self.canvas.create_rectangle(
x - 30, label_y - 12, x + 30, label_y + 12,
fill='white', outline='black', width=2
)
self.canvas.create_text(
x, label_y,
text=str(display_index),
fill='black',
font=('Arial', 13, 'bold')
)
led_index += 1
elif edge == 'bottom':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
y_axis = h - bw / 2
# Draw axis line
self.canvas.create_line(
start_px, y_axis, end_px, y_axis,
fill='white', width=3
)
# Draw tick marks
for i in range(count):
display_index = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_index % tick_interval == 0:
frac = i / count if count > 1 else 0.5
x = start_px + frac * (end_px - start_px)
# Tick mark
tick_len = 15 if display_index % tick_interval == 0 else 10
self.canvas.create_line(
x, y_axis - tick_len, x, y_axis + tick_len,
fill='white', width=2
)
# Label with background (positioned INSIDE screen)
label_y = h - bw - 20
self.canvas.create_rectangle(
x - 30, label_y - 12, x + 30, label_y + 12,
fill='white', outline='black', width=2
)
self.canvas.create_text(
x, label_y,
text=str(display_index),
fill='black',
font=('Arial', 13, 'bold')
)
led_index += 1
elif edge == 'left':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
x_axis = bw / 2
# Draw axis line
self.canvas.create_line(
x_axis, start_px, x_axis, end_px,
fill='white', width=3
)
# Draw tick marks
for i in range(count):
display_index = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_index % tick_interval == 0:
frac = i / count if count > 1 else 0.5
y = start_px + frac * (end_px - start_px)
# Tick mark
tick_len = 15 if display_index % tick_interval == 0 else 10
self.canvas.create_line(
x_axis - tick_len, y, x_axis + tick_len, y,
fill='white', width=2
)
# Label with background (positioned INSIDE screen)
label_x = bw + 40
self.canvas.create_rectangle(
label_x - 30, y - 12, label_x + 30, y + 12,
fill='white', outline='black', width=2
)
self.canvas.create_text(
label_x, y,
text=str(display_index),
fill='black',
font=('Arial', 13, 'bold')
)
led_index += 1
elif edge == 'right':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
x_axis = w - bw / 2
# Draw axis line
self.canvas.create_line(
x_axis, start_px, x_axis, end_px,
fill='white', width=3
)
# Draw tick marks
for i in range(count):
display_index = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_index % tick_interval == 0:
frac = i / count if count > 1 else 0.5
y = start_px + frac * (end_px - start_px)
# Tick mark
tick_len = 15 if display_index % tick_interval == 0 else 10
self.canvas.create_line(
x_axis - tick_len, y, x_axis + tick_len, y,
fill='white', width=2
)
# Label with background (positioned INSIDE screen)
label_x = w - bw - 40
self.canvas.create_rectangle(
label_x - 30, y - 12, label_x + 30, y + 12,
fill='white', outline='black', width=2
)
self.canvas.create_text(
label_x, y,
text=str(display_index),
fill='black',
font=('Arial', 13, 'bold')
)
led_index += 1
def _calculate_led_positions(
self,
segment,
w: int, h: int, bw: int
) -> List[Tuple[float, float]]:
"""Calculate (x, y) positions for each LED in a segment."""
edge = segment.edge
count = segment.led_count
reverse = segment.reverse
# Get span for this edge
span_start, span_end = self.calibration.get_edge_span(edge)
positions = []
if edge == 'top':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
y = bw / 2 # Middle of border zone
for i in range(count):
frac = i / count if count > 1 else 0.5
x = start_px + frac * (end_px - start_px)
positions.append((x, y))
elif edge == 'bottom':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
y = h - bw / 2
for i in range(count):
frac = i / count if count > 1 else 0.5
x = start_px + frac * (end_px - start_px)
positions.append((x, y))
elif edge == 'left':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
x = bw / 2
for i in range(count):
frac = i / count if count > 1 else 0.5
y = start_px + frac * (end_px - start_px)
positions.append((x, y))
elif edge == 'right':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
x = w - bw / 2
for i in range(count):
frac = i / count if count > 1 else 0.5
y = start_px + frac * (end_px - start_px)
positions.append((x, y))
if reverse:
positions.reverse()
return positions
def _draw_mapping_ranges(self, w: int, h: int, bw: int) -> None:
"""Draw colored segments showing pixel ranges for each LED."""
segments = self.calibration.segments
# Generate distinct colors for each LED using HSV
total_leds = self.calibration.get_total_leds()
colors = []
for i in range(total_leds):
hue = (i / total_leds) * 360
colors.append(self._hsv_to_rgb(hue, 0.7, 0.9))
led_index = 0
for seg in segments:
ranges = self._calculate_pixel_ranges(seg, w, h)
for i, (x1, y1, x2, y2) in enumerate(ranges):
color = colors[led_index % len(colors)]
# Draw line with LED's color
self.canvas.create_line(
x1, y1, x2, y2,
fill=color, width=3, capstyle='round'
)
led_index += 1
def _calculate_pixel_ranges(
self,
segment,
w: int, h: int
) -> List[Tuple[float, float, float, float]]:
"""Calculate pixel range boundaries for each LED in segment.
Returns list of (x1, y1, x2, y2) line coordinates.
"""
edge = segment.edge
count = segment.led_count
span_start, span_end = self.calibration.get_edge_span(edge)
ranges = []
if edge == 'top':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
segment_len = (end_px - start_px) / count if count > 0 else 0
for i in range(count):
x1 = start_px + i * segment_len
x2 = start_px + (i + 1) * segment_len
ranges.append((x1, 0, x2, 0))
elif edge == 'bottom':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
segment_len = (end_px - start_px) / count if count > 0 else 0
for i in range(count):
x1 = start_px + i * segment_len
x2 = start_px + (i + 1) * segment_len
ranges.append((x1, h, x2, h))
elif edge == 'left':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
segment_len = (end_px - start_px) / count if count > 0 else 0
for i in range(count):
y1 = start_px + i * segment_len
y2 = start_px + (i + 1) * segment_len
ranges.append((0, y1, 0, y2))
elif edge == 'right':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
segment_len = (end_px - start_px) / count if count > 0 else 0
for i in range(count):
y1 = start_px + i * segment_len
y2 = start_px + (i + 1) * segment_len
ranges.append((w, y1, w, y2))
return ranges
def _draw_info_text(self, w: int, h: int) -> None:
"""Draw calibration info text overlay."""
info_lines = [
f"Target: {self.target_name}",
f"Total LEDs: {self.calibration.get_total_leds()}",
f"Start: {self.calibration.start_position.replace('_', ' ').title()}",
f"Direction: {self.calibration.layout.title()}",
f"Offset: {self.calibration.offset}",
f"Border Width: {self.calibration.border_width}px",
"",
f"Top: {self.calibration.leds_top} LEDs",
f"Right: {self.calibration.leds_right} LEDs",
f"Bottom: {self.calibration.leds_bottom} LEDs",
f"Left: {self.calibration.leds_left} LEDs",
]
# Draw background box with better readability
text_x = 30
text_y = 30
line_height = 28
padding = 20
box_width = 320
box_height = len(info_lines) * line_height + padding * 2
# Solid dark background with border
self.canvas.create_rectangle(
text_x - padding, text_y - padding,
text_x + box_width, text_y + box_height,
fill='#1a1a1a', outline='white', width=3
)
# Draw text lines with better spacing
for i, line in enumerate(info_lines):
self.canvas.create_text(
text_x, text_y + i * line_height,
text=line,
anchor='nw',
fill='yellow',
font=('Arial', 14, 'bold')
)
@staticmethod
def _hsv_to_rgb(h: float, s: float, v: float) -> str:
"""Convert HSV to hex RGB color string."""
r, g, b = colorsys.hsv_to_rgb(h / 360, s, v)
return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}"
class OverlayManager:
"""Manages overlay windows across multiple targets."""
def __init__(self):
self._overlays: Dict[str, OverlayWindow] = {}
self._lock = threading.Lock()
def start_overlay(
self,
target_id: str,
display_info: DisplayInfo,
calibration: CalibrationConfig,
target_name: str = None
) -> None:
"""Start overlay for a target."""
with self._lock:
if target_id in self._overlays:
raise RuntimeError(f"Overlay already running for {target_id}")
overlay = OverlayWindow(display_info, calibration, target_id, target_name)
overlay.start()
# Wait for overlay to initialize
timeout = 3.0
start = time.time()
while not overlay.running and time.time() - start < timeout:
time.sleep(0.1)
if not overlay.running:
overlay.stop()
raise RuntimeError("Overlay failed to start within timeout")
self._overlays[target_id] = overlay
logger.info(f"Started overlay for target {target_id}")
def stop_overlay(self, target_id: str) -> None:
"""Stop overlay for a target."""
with self._lock:
overlay = self._overlays.pop(target_id, None)
if overlay:
overlay.stop()
logger.info(f"Stopped overlay for target {target_id}")
def is_running(self, target_id: str) -> bool:
"""Check if overlay is running for a target."""
with self._lock:
return target_id in self._overlays
def stop_all(self) -> None:
"""Stop all overlays."""
with self._lock:
for target_id in list(self._overlays.keys()):
overlay = self._overlays.pop(target_id)
overlay.stop()
logger.info("Stopped all overlays")