Overlay: fix 404, crash on repeat, missing edge test colors, device reset on stop

- Target overlay works without active processing: route pre-loads calibration
  and display info from the CSS store, passes to processor as fallback
- Fix server crash on repeated overlay: replace per-window tk.Tk() with single
  persistent hidden root; each overlay is a Toplevel child dispatched via
  root.after() — eliminates Tcl interpreter crashes on Windows
- Fix edge test colors not lighting up: always call set_test_mode regardless
  of processing state (was guarded by 'not proc.is_running'); pass calibration
  so _send_test_pixels knows which LEDs map to which edges
- Fix device reset on overlay stop: keep idle serial client cached after
  clearing test mode; start_processing() already closes it before connecting

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-20 17:16:10 +03:00
parent a3aeafef13
commit 018bedf9f6
7 changed files with 349 additions and 476 deletions

View File

@@ -5,6 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
get_color_strip_store,
get_picture_source_store,
get_picture_target_store,
get_processor_manager,
)
@@ -23,9 +24,12 @@ from wled_controller.core.capture.calibration import (
calibration_from_dict,
calibration_to_dict,
)
from wled_controller.core.capture.screen_capture import get_available_displays
from wled_controller.core.processing.processor_manager import ProcessorManager
from wled_controller.storage.color_strip_source import PictureColorStripSource
from wled_controller.storage.color_strip_store import ColorStripStore
from wled_controller.storage.picture_source import ProcessedPictureSource, ScreenCapturePictureSource
from wled_controller.storage.picture_source_store import PictureSourceStore
from wled_controller.storage.picture_target_store import PictureTargetStore
from wled_controller.utils import get_logger
@@ -34,7 +38,7 @@ logger = get_logger(__name__)
router = APIRouter()
def _css_to_response(source) -> ColorStripSourceResponse:
def _css_to_response(source, overlay_active: bool = False) -> ColorStripSourceResponse:
"""Convert a ColorStripSource to a ColorStripSourceResponse."""
calibration = None
if isinstance(source, PictureColorStripSource) and source.calibration:
@@ -54,21 +58,38 @@ def _css_to_response(source) -> ColorStripSourceResponse:
led_count=getattr(source, "led_count", 0),
calibration=calibration,
description=source.description,
overlay_active=overlay_active,
created_at=source.created_at,
updated_at=source.updated_at,
)
def _resolve_display_index(picture_source_id: str, picture_source_store: PictureSourceStore, depth: int = 0) -> int:
"""Resolve display index from a picture source, following processed source chains."""
if not picture_source_id or depth > 5:
return 0
try:
ps = picture_source_store.get_source(picture_source_id)
except Exception:
return 0
if isinstance(ps, ScreenCapturePictureSource):
return ps.display_index
if isinstance(ps, ProcessedPictureSource):
return _resolve_display_index(ps.source_stream_id, picture_source_store, depth + 1)
return 0
# ===== CRUD ENDPOINTS =====
@router.get("/api/v1/color-strip-sources", response_model=ColorStripSourceListResponse, tags=["Color Strip Sources"])
async def list_color_strip_sources(
_auth: AuthRequired,
store: ColorStripStore = Depends(get_color_strip_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""List all color strip sources."""
sources = store.get_all_sources()
responses = [_css_to_response(s) for s in sources]
responses = [_css_to_response(s, manager.is_css_overlay_active(s.id)) for s in sources]
return ColorStripSourceListResponse(sources=responses, count=len(responses))
@@ -112,11 +133,12 @@ async def get_color_strip_source(
source_id: str,
_auth: AuthRequired,
store: ColorStripStore = Depends(get_color_strip_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Get a color strip source by ID."""
try:
source = store.get_source(source_id)
return _css_to_response(source)
return _css_to_response(source, manager.is_css_overlay_active(source_id))
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@@ -259,3 +281,67 @@ async def test_css_calibration(
except Exception as e:
logger.error(f"Failed to set CSS calibration test mode: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ===== OVERLAY VISUALIZATION =====
@router.post("/api/v1/color-strip-sources/{source_id}/overlay/start", tags=["Color Strip Sources"])
async def start_css_overlay(
source_id: str,
_auth: AuthRequired,
store: ColorStripStore = Depends(get_color_strip_store),
picture_source_store: PictureSourceStore = Depends(get_picture_source_store),
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Start screen overlay visualization for a color strip source."""
try:
source = store.get_source(source_id)
if not isinstance(source, PictureColorStripSource):
raise HTTPException(status_code=400, detail="Overlay only supported for picture color strip sources")
if not source.calibration:
raise HTTPException(status_code=400, detail="Color strip source has no calibration configured")
display_index = _resolve_display_index(source.picture_source_id, picture_source_store)
displays = get_available_displays()
if not displays:
raise HTTPException(status_code=409, detail="No displays available")
display_index = min(display_index, len(displays) - 1)
display_info = displays[display_index]
await manager.start_css_overlay(source_id, display_info, source.calibration, source.name)
return {"status": "started", "source_id": source_id}
except HTTPException:
raise
except RuntimeError as e:
raise HTTPException(status_code=409, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Failed to start CSS overlay: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/color-strip-sources/{source_id}/overlay/stop", tags=["Color Strip Sources"])
async def stop_css_overlay(
source_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Stop screen overlay visualization for a color strip source."""
try:
await manager.stop_css_overlay(source_id)
return {"status": "stopped", "source_id": source_id}
except Exception as e:
logger.error(f"Failed to stop CSS overlay: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/color-strip-sources/{source_id}/overlay/status", tags=["Color Strip Sources"])
async def get_css_overlay_status(
source_id: str,
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
):
"""Check if overlay is active for a color strip source."""
return {"source_id": source_id, "active": manager.is_css_overlay_active(source_id)}

View File

@@ -11,6 +11,7 @@ from PIL import Image
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
get_color_strip_store,
get_device_store,
get_pattern_template_store,
get_picture_source_store,
@@ -40,7 +41,10 @@ from wled_controller.core.capture.screen_capture import (
calculate_average_color,
calculate_dominant_color,
calculate_median_color,
get_available_displays,
)
from wled_controller.storage.color_strip_store import ColorStripStore
from wled_controller.storage.color_strip_source import PictureColorStripSource
from wled_controller.storage import DeviceStore
from wled_controller.storage.pattern_template_store import PatternTemplateStore
from wled_controller.storage.picture_source import ScreenCapturePictureSource, StaticImagePictureSource
@@ -679,6 +683,8 @@ async def start_target_overlay(
_auth: AuthRequired,
manager: ProcessorManager = Depends(get_processor_manager),
target_store: PictureTargetStore = Depends(get_picture_target_store),
color_strip_store: ColorStripStore = Depends(get_color_strip_store),
picture_source_store: PictureSourceStore = Depends(get_picture_source_store),
):
"""Start screen overlay visualization for a target.
@@ -694,7 +700,26 @@ async def start_target_overlay(
if not target:
raise ValueError(f"Target {target_id} not found")
await manager.start_overlay(target_id, target.name)
# Pre-load calibration and display info from the CSS store so the overlay
# can start even when processing is not currently running.
calibration = None
display_info = None
if isinstance(target, WledPictureTarget) and target.color_strip_source_id:
try:
css = color_strip_store.get_source(target.color_strip_source_id)
if isinstance(css, PictureColorStripSource) and css.calibration:
calibration = css.calibration
# Resolve the display this CSS is capturing
from wled_controller.api.routes.color_strip_sources import _resolve_display_index
display_index = _resolve_display_index(css.picture_source_id, picture_source_store)
displays = get_available_displays()
if displays:
display_index = min(display_index, len(displays) - 1)
display_info = displays[display_index]
except Exception as e:
logger.warning(f"Could not pre-load CSS calibration for overlay on {target_id}: {e}")
await manager.start_overlay(target_id, target.name, calibration=calibration, display_info=display_info)
return {"status": "started", "target_id": target_id}
except ValueError as e:

View File

@@ -57,6 +57,7 @@ class ColorStripSourceResponse(BaseModel):
led_count: int = Field(0, description="Total LED count (0 = auto from calibration)")
calibration: Optional[Calibration] = Field(None, description="LED calibration")
description: Optional[str] = Field(None, description="Description")
overlay_active: bool = Field(False, description="Whether the screen overlay is currently active")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")

View File

@@ -6,7 +6,7 @@ import sys
import threading
import time
import tkinter as tk
from typing import Dict, List, Tuple, Optional
from typing import Dict, List, Optional, Tuple
from wled_controller.core.capture.calibration import CalibrationConfig
from wled_controller.core.capture_engines.base import DisplayInfo
@@ -14,12 +14,16 @@ from wled_controller.core.capture_engines.base import DisplayInfo
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# OverlayWindow — one Toplevel per target, no own thread/mainloop
# ---------------------------------------------------------------------------
class OverlayWindow:
"""Transparent overlay window for LED calibration visualization.
Runs in a separate thread with its own Tkinter event loop.
Draws border sampling zones, LED position markers, pixel mapping ranges,
and calibration info text.
Does NOT own a Tcl interpreter or event loop — it is a Toplevel child of
the shared root managed by OverlayManager. All methods must be called
from the Tkinter thread.
"""
def __init__(
@@ -27,176 +31,93 @@ class OverlayWindow:
display_info: DisplayInfo,
calibration: CalibrationConfig,
target_id: str,
target_name: str = None
target_name: str = None,
):
"""Initialize overlay for a specific display and calibration.
Args:
display_info: Display geometry (x, y, width, height)
calibration: LED calibration configuration
target_id: Target ID for logging
target_name: Target friendly name for display
"""
self.display_info = display_info
self.calibration = calibration
self.target_id = target_id
self.target_name = target_name or target_id
self.root: Optional[tk.Tk] = None
self.canvas: Optional[tk.Canvas] = None
self._window: Optional[tk.Toplevel] = None
self._canvas: Optional[tk.Canvas] = None
self.running = False
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._after_id: Optional[str] = None
def start(self) -> None:
"""Start overlay in background thread."""
if self._thread and self._thread.is_alive():
raise RuntimeError("Overlay already running")
# ----- Lifecycle (must run in Tk thread) -----
self._stop_event.clear()
self._thread = threading.Thread(
target=self._run_tkinter_loop,
name=f"Overlay-{self.target_id}",
daemon=True
)
self._thread.start()
def stop(self) -> None:
"""Stop overlay and clean up thread."""
self._stop_event.set()
# Wait for the tkinter thread to fully exit
if self._thread and self._thread.is_alive():
self._thread.join(timeout=3.0)
# Give extra time for Tcl interpreter cleanup
import time
time.sleep(0.2)
def _run_tkinter_loop(self) -> None:
"""Tkinter event loop (runs in background thread)."""
try:
self.root = tk.Tk()
def start(self, root: tk.Tk) -> None:
"""Create and show the overlay Toplevel (runs in Tk thread)."""
self._window = tk.Toplevel(root)
self._setup_window()
self._draw_visualization()
# Check stop event periodically
def check_stop():
if self._stop_event.is_set():
# Cancel the pending after callback before quitting
if self._after_id:
try:
self.root.after_cancel(self._after_id)
except Exception:
pass
self.root.quit() # Exit mainloop cleanly
else:
self._after_id = self.root.after(100, check_stop)
self._after_id = self.root.after(100, check_stop)
self.running = True
logger.info(f"Overlay window started for {self.target_id}")
self.root.mainloop()
except Exception as e:
logger.error(f"Overlay error for {self.target_id}: {e}", exc_info=True)
finally:
self.running = False
# Clean up the window properly
if self.root:
def stop(self) -> None:
"""Destroy the overlay Toplevel (runs in Tk thread)."""
if self._window is not None:
try:
# Cancel any remaining after callbacks
if self._after_id:
self.root.after_cancel(self._after_id)
self._after_id = None
# Destroy window and clean up Tcl interpreter
self.root.destroy()
self.root = None
self._window.destroy()
except Exception as e:
logger.debug(f"Cleanup error: {e}")
logger.debug(f"Cleanup error for {self.target_id}: {e}")
self._window = None
self._canvas = None
self.running = False
logger.info(f"Overlay window stopped for {self.target_id}")
# ----- Window setup -----
def _setup_window(self) -> None:
"""Configure transparent, frameless, always-on-top window."""
# Position at display coordinates
geometry = f"{self.display_info.width}x{self.display_info.height}"
geometry += f"+{self.display_info.x}+{self.display_info.y}"
self.root.geometry(geometry)
win = self._window
geometry = (
f"{self.display_info.width}x{self.display_info.height}"
f"+{self.display_info.x}+{self.display_info.y}"
)
win.geometry(geometry)
win.overrideredirect(True)
win.attributes("-topmost", True)
# Remove window decorations
self.root.overrideredirect(True)
# Transparent background and always on top
self.root.attributes('-topmost', True)
# Create canvas for drawing
self.canvas = tk.Canvas(
self.root,
self._canvas = tk.Canvas(
win,
width=self.display_info.width,
height=self.display_info.height,
highlightthickness=0,
bg='black'
bg="black",
)
self.canvas.pack()
self._canvas.pack()
# Make canvas itself transparent
self.root.wm_attributes('-transparentcolor', 'black')
self.root.attributes('-alpha', 0.85) # Semi-transparent overlay
win.wm_attributes("-transparentcolor", "black")
win.attributes("-alpha", 0.85)
# Windows-specific: make click-through
if sys.platform == 'win32':
if sys.platform == "win32":
try:
import ctypes
# Get window handle
hwnd = ctypes.windll.user32.GetParent(self.root.winfo_id())
# Set WS_EX_LAYERED | WS_EX_TRANSPARENT extended styles
hwnd = ctypes.windll.user32.GetParent(win.winfo_id())
style = ctypes.windll.user32.GetWindowLongW(hwnd, -20) # GWL_EXSTYLE
style |= 0x80000 | 0x20 # WS_EX_LAYERED | WS_EX_TRANSPARENT
ctypes.windll.user32.SetWindowLongW(hwnd, -20, style)
except Exception as e:
logger.warning(f"Could not set click-through: {e}")
# ----- Drawing -----
def _draw_visualization(self) -> None:
"""Draw all visualization elements on canvas."""
w = self.display_info.width
h = self.display_info.height
bw = self.calibration.border_width
# 1. Border sampling zones (colored semi-transparent rectangles)
self._draw_border_zones(w, h, bw)
# 2. LED axes with tick marks
self._draw_led_axes(w, h, bw)
# 3. Calibration info text overlay
self._draw_info_text(w, h)
def _draw_border_zones(self, w: int, h: int, bw: int) -> None:
"""Draw colored rectangles showing border sampling zones."""
# Top zone
self.canvas.create_rectangle(
0, 0, w, bw,
fill='#FF0000', stipple='gray25', outline='#FF0000', width=2
)
# Right zone
self.canvas.create_rectangle(
w - bw, 0, w, h,
fill='#00FF00', stipple='gray25', outline='#00FF00', width=2
)
# Bottom zone
self.canvas.create_rectangle(
0, h - bw, w, h,
fill='#0000FF', stipple='gray25', outline='#0000FF', width=2
)
# Left zone
self.canvas.create_rectangle(
0, 0, bw, h,
fill='#FFFF00', stipple='gray25', outline='#FFFF00', width=2
)
c = self._canvas
c.create_rectangle(0, 0, w, bw, fill="#FF0000", stipple="gray25", outline="#FF0000", width=2)
c.create_rectangle(w - bw, 0, w, h, fill="#00FF00", stipple="gray25", outline="#00FF00", width=2)
c.create_rectangle(0, h - bw, w, h, fill="#0000FF", stipple="gray25", outline="#0000FF", width=2)
c.create_rectangle(0, 0, bw, h, fill="#FFFF00", stipple="gray25", outline="#FFFF00", width=2)
def _draw_led_axes(self, w: int, h: int, bw: int) -> None:
"""Draw axes with tick marks showing LED positions."""
segments = self.calibration.segments
total_leds = self.calibration.get_total_leds()
# Determine tick interval based on total LEDs
if total_leds <= 50:
tick_interval = 5
elif total_leds <= 100:
@@ -210,333 +131,81 @@ class OverlayWindow:
for seg in segments:
edge = seg.edge
count = seg.led_count
reverse = seg.reverse
# Get span for this edge
span_start, span_end = self.calibration.get_edge_span(edge)
# Draw axis line and tick marks for this edge
if edge == 'top':
if edge == "top":
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
y_axis = bw / 2
# Draw axis line
self.canvas.create_line(
start_px, y_axis, end_px, y_axis,
fill='white', width=3
)
# Draw tick marks
self._canvas.create_line(start_px, y_axis, end_px, y_axis, fill="white", width=3)
for i in range(count):
display_index = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_index % tick_interval == 0:
display_idx = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_idx % tick_interval == 0:
frac = i / count if count > 1 else 0.5
x = start_px + frac * (end_px - start_px)
# Tick mark
tick_len = 15 if display_index % tick_interval == 0 else 10
self.canvas.create_line(
x, y_axis - tick_len, x, y_axis + tick_len,
fill='white', width=2
)
# Label with background (positioned INSIDE screen)
tick_len = 15 if display_idx % tick_interval == 0 else 10
self._canvas.create_line(x, y_axis - tick_len, x, y_axis + tick_len, fill="white", width=2)
label_y = bw + 20
self.canvas.create_rectangle(
x - 30, label_y - 12, x + 30, label_y + 12,
fill='white', outline='black', width=2
)
self.canvas.create_text(
x, label_y,
text=str(display_index),
fill='black',
font=('Arial', 13, 'bold')
)
self._canvas.create_rectangle(x - 30, label_y - 12, x + 30, label_y + 12, fill="white", outline="black", width=2)
self._canvas.create_text(x, label_y, text=str(display_idx), fill="black", font=("Arial", 13, "bold"))
led_index += 1
elif edge == 'bottom':
elif edge == "bottom":
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
y_axis = h - bw / 2
# Draw axis line
self.canvas.create_line(
start_px, y_axis, end_px, y_axis,
fill='white', width=3
)
# Draw tick marks
self._canvas.create_line(start_px, y_axis, end_px, y_axis, fill="white", width=3)
for i in range(count):
display_index = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_index % tick_interval == 0:
display_idx = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_idx % tick_interval == 0:
frac = i / count if count > 1 else 0.5
x = start_px + frac * (end_px - start_px)
# Tick mark
tick_len = 15 if display_index % tick_interval == 0 else 10
self.canvas.create_line(
x, y_axis - tick_len, x, y_axis + tick_len,
fill='white', width=2
)
# Label with background (positioned INSIDE screen)
tick_len = 15 if display_idx % tick_interval == 0 else 10
self._canvas.create_line(x, y_axis - tick_len, x, y_axis + tick_len, fill="white", width=2)
label_y = h - bw - 20
self.canvas.create_rectangle(
x - 30, label_y - 12, x + 30, label_y + 12,
fill='white', outline='black', width=2
)
self.canvas.create_text(
x, label_y,
text=str(display_index),
fill='black',
font=('Arial', 13, 'bold')
)
self._canvas.create_rectangle(x - 30, label_y - 12, x + 30, label_y + 12, fill="white", outline="black", width=2)
self._canvas.create_text(x, label_y, text=str(display_idx), fill="black", font=("Arial", 13, "bold"))
led_index += 1
elif edge == 'left':
elif edge == "left":
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
x_axis = bw / 2
# Draw axis line
self.canvas.create_line(
x_axis, start_px, x_axis, end_px,
fill='white', width=3
)
# Draw tick marks
self._canvas.create_line(x_axis, start_px, x_axis, end_px, fill="white", width=3)
for i in range(count):
display_index = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_index % tick_interval == 0:
display_idx = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_idx % tick_interval == 0:
frac = i / count if count > 1 else 0.5
y = start_px + frac * (end_px - start_px)
# Tick mark
tick_len = 15 if display_index % tick_interval == 0 else 10
self.canvas.create_line(
x_axis - tick_len, y, x_axis + tick_len, y,
fill='white', width=2
)
# Label with background (positioned INSIDE screen)
tick_len = 15 if display_idx % tick_interval == 0 else 10
self._canvas.create_line(x_axis - tick_len, y, x_axis + tick_len, y, fill="white", width=2)
label_x = bw + 40
self.canvas.create_rectangle(
label_x - 30, y - 12, label_x + 30, y + 12,
fill='white', outline='black', width=2
)
self.canvas.create_text(
label_x, y,
text=str(display_index),
fill='black',
font=('Arial', 13, 'bold')
)
self._canvas.create_rectangle(label_x - 30, y - 12, label_x + 30, y + 12, fill="white", outline="black", width=2)
self._canvas.create_text(label_x, y, text=str(display_idx), fill="black", font=("Arial", 13, "bold"))
led_index += 1
elif edge == 'right':
elif edge == "right":
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
x_axis = w - bw / 2
# Draw axis line
self.canvas.create_line(
x_axis, start_px, x_axis, end_px,
fill='white', width=3
)
# Draw tick marks
self._canvas.create_line(x_axis, start_px, x_axis, end_px, fill="white", width=3)
for i in range(count):
display_index = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_index % tick_interval == 0:
display_idx = (led_index + self.calibration.offset) % total_leds
if i == 0 or i == count - 1 or display_idx % tick_interval == 0:
frac = i / count if count > 1 else 0.5
y = start_px + frac * (end_px - start_px)
# Tick mark
tick_len = 15 if display_index % tick_interval == 0 else 10
self.canvas.create_line(
x_axis - tick_len, y, x_axis + tick_len, y,
fill='white', width=2
)
# Label with background (positioned INSIDE screen)
tick_len = 15 if display_idx % tick_interval == 0 else 10
self._canvas.create_line(x_axis - tick_len, y, x_axis + tick_len, y, fill="white", width=2)
label_x = w - bw - 40
self.canvas.create_rectangle(
label_x - 30, y - 12, label_x + 30, y + 12,
fill='white', outline='black', width=2
)
self.canvas.create_text(
label_x, y,
text=str(display_index),
fill='black',
font=('Arial', 13, 'bold')
)
self._canvas.create_rectangle(label_x - 30, y - 12, label_x + 30, y + 12, fill="white", outline="black", width=2)
self._canvas.create_text(label_x, y, text=str(display_idx), fill="black", font=("Arial", 13, "bold"))
led_index += 1
def _calculate_led_positions(
self,
segment,
w: int, h: int, bw: int
) -> List[Tuple[float, float]]:
"""Calculate (x, y) positions for each LED in a segment."""
edge = segment.edge
count = segment.led_count
reverse = segment.reverse
# Get span for this edge
span_start, span_end = self.calibration.get_edge_span(edge)
positions = []
if edge == 'top':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
y = bw / 2 # Middle of border zone
for i in range(count):
frac = i / count if count > 1 else 0.5
x = start_px + frac * (end_px - start_px)
positions.append((x, y))
elif edge == 'bottom':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
y = h - bw / 2
for i in range(count):
frac = i / count if count > 1 else 0.5
x = start_px + frac * (end_px - start_px)
positions.append((x, y))
elif edge == 'left':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
x = bw / 2
for i in range(count):
frac = i / count if count > 1 else 0.5
y = start_px + frac * (end_px - start_px)
positions.append((x, y))
elif edge == 'right':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
x = w - bw / 2
for i in range(count):
frac = i / count if count > 1 else 0.5
y = start_px + frac * (end_px - start_px)
positions.append((x, y))
if reverse:
positions.reverse()
return positions
def _draw_mapping_ranges(self, w: int, h: int, bw: int) -> None:
"""Draw colored segments showing pixel ranges for each LED."""
segments = self.calibration.segments
# Generate distinct colors for each LED using HSV
total_leds = self.calibration.get_total_leds()
colors = []
for i in range(total_leds):
hue = (i / total_leds) * 360
colors.append(self._hsv_to_rgb(hue, 0.7, 0.9))
led_index = 0
for seg in segments:
ranges = self._calculate_pixel_ranges(seg, w, h)
for i, (x1, y1, x2, y2) in enumerate(ranges):
color = colors[led_index % len(colors)]
# Draw line with LED's color
self.canvas.create_line(
x1, y1, x2, y2,
fill=color, width=3, capstyle='round'
)
led_index += 1
def _calculate_pixel_ranges(
self,
segment,
w: int, h: int
) -> List[Tuple[float, float, float, float]]:
"""Calculate pixel range boundaries for each LED in segment.
Returns list of (x1, y1, x2, y2) line coordinates.
"""
edge = segment.edge
count = segment.led_count
span_start, span_end = self.calibration.get_edge_span(edge)
ranges = []
if edge == 'top':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
segment_len = (end_px - start_px) / count if count > 0 else 0
for i in range(count):
x1 = start_px + i * segment_len
x2 = start_px + (i + 1) * segment_len
ranges.append((x1, 0, x2, 0))
elif edge == 'bottom':
edge_len = w
start_px = span_start * edge_len
end_px = span_end * edge_len
segment_len = (end_px - start_px) / count if count > 0 else 0
for i in range(count):
x1 = start_px + i * segment_len
x2 = start_px + (i + 1) * segment_len
ranges.append((x1, h, x2, h))
elif edge == 'left':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
segment_len = (end_px - start_px) / count if count > 0 else 0
for i in range(count):
y1 = start_px + i * segment_len
y2 = start_px + (i + 1) * segment_len
ranges.append((0, y1, 0, y2))
elif edge == 'right':
edge_len = h
start_px = span_start * edge_len
end_px = span_end * edge_len
segment_len = (end_px - start_px) / count if count > 0 else 0
for i in range(count):
y1 = start_px + i * segment_len
y2 = start_px + (i + 1) * segment_len
ranges.append((w, y1, w, y2))
return ranges
def _draw_info_text(self, w: int, h: int) -> None:
"""Draw calibration info text overlay."""
info_lines = [
f"Target: {self.target_name}",
f"Total LEDs: {self.calibration.get_total_leds()}",
@@ -550,91 +219,121 @@ class OverlayWindow:
f"Bottom: {self.calibration.leds_bottom} LEDs",
f"Left: {self.calibration.leds_left} LEDs",
]
# Draw background box with better readability
text_x = 30
text_y = 30
line_height = 28
padding = 20
box_width = 320
box_height = len(info_lines) * line_height + padding * 2
# Solid dark background with border
self.canvas.create_rectangle(
self._canvas.create_rectangle(
text_x - padding, text_y - padding,
text_x + box_width, text_y + box_height,
fill='#1a1a1a', outline='white', width=3
fill="#1a1a1a", outline="white", width=3,
)
# Draw text lines with better spacing
for i, line in enumerate(info_lines):
self.canvas.create_text(
self._canvas.create_text(
text_x, text_y + i * line_height,
text=line,
anchor='nw',
fill='yellow',
font=('Arial', 14, 'bold')
text=line, anchor="nw", fill="yellow", font=("Arial", 14, "bold"),
)
@staticmethod
def _hsv_to_rgb(h: float, s: float, v: float) -> str:
"""Convert HSV to hex RGB color string."""
r, g, b = colorsys.hsv_to_rgb(h / 360, s, v)
return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}"
# ---------------------------------------------------------------------------
# OverlayManager — single persistent Tk root, all ops via root.after()
# ---------------------------------------------------------------------------
class OverlayManager:
"""Manages overlay windows across multiple targets."""
"""Manages overlay windows. Uses one persistent hidden Tk root so that
Toplevel windows can be created/destroyed repeatedly without crashing
the Tcl interpreter."""
def __init__(self):
self._overlays: Dict[str, OverlayWindow] = {}
self._lock = threading.Lock()
self._tk_root: Optional[tk.Tk] = None
self._tk_thread: Optional[threading.Thread] = None
self._tk_ready = threading.Event()
self._start_tk_thread()
# ----- Internal: persistent Tk thread -----
def _start_tk_thread(self) -> None:
def _run():
try:
self._tk_root = tk.Tk()
self._tk_root.withdraw() # invisible root — never shown
self._tk_ready.set()
self._tk_root.mainloop()
except Exception as e:
logger.error(f"Tkinter thread crashed: {e}", exc_info=True)
self._tk_ready.set() # unblock waiters even on failure
self._tk_thread = threading.Thread(
target=_run, name="TkOverlayRoot", daemon=True
)
self._tk_thread.start()
self._tk_ready.wait(timeout=5.0)
def _run_in_tk(self, fn) -> None:
"""Schedule *fn* on the Tk thread and block until it completes."""
if self._tk_root is None:
raise RuntimeError("Tkinter root not available")
done = threading.Event()
exc_box: List[Optional[BaseException]] = [None]
def wrapper():
try:
fn()
except Exception as e:
exc_box[0] = e
finally:
done.set()
self._tk_root.after(0, wrapper)
done.wait(timeout=5.0)
if exc_box[0] is not None:
raise exc_box[0]
# ----- Public API -----
def start_overlay(
self,
target_id: str,
display_info: DisplayInfo,
calibration: CalibrationConfig,
target_name: str = None
target_name: str = None,
) -> None:
"""Start overlay for a target."""
with self._lock:
if target_id in self._overlays:
raise RuntimeError(f"Overlay already running for {target_id}")
overlay = OverlayWindow(display_info, calibration, target_id, target_name)
overlay.start()
# Wait for overlay to initialize
timeout = 3.0
start = time.time()
while not overlay.running and time.time() - start < timeout:
time.sleep(0.1)
self._run_in_tk(lambda: overlay.start(self._tk_root))
if not overlay.running:
overlay.stop()
raise RuntimeError("Overlay failed to start within timeout")
raise RuntimeError(f"Overlay failed to start for {target_id}")
self._overlays[target_id] = overlay
logger.info(f"Started overlay for target {target_id}")
def stop_overlay(self, target_id: str) -> None:
"""Stop overlay for a target."""
with self._lock:
overlay = self._overlays.pop(target_id, None)
if overlay:
overlay.stop()
self._run_in_tk(overlay.stop)
logger.info(f"Stopped overlay for target {target_id}")
def is_running(self, target_id: str) -> bool:
"""Check if overlay is running for a target."""
with self._lock:
return target_id in self._overlays
def stop_all(self) -> None:
"""Stop all overlays."""
with self._lock:
for target_id in list(self._overlays.keys()):
overlay = self._overlays.pop(target_id)
overlay.stop()
self._run_in_tk(overlay.stop)
logger.info("Stopped all overlays")

View File

@@ -435,16 +435,18 @@ class ProcessorManager:
"left": [255, 255, 0],
}
async def start_overlay(self, target_id: str, target_name: str = None) -> None:
async def start_overlay(self, target_id: str, target_name: str = None, calibration=None, display_info=None) -> None:
proc = self._get_processor(target_id)
if not proc.supports_overlay():
raise ValueError(f"Target {target_id} does not support overlays")
await proc.start_overlay(target_name)
await proc.start_overlay(target_name, calibration=calibration, display_info=display_info)
# Light up device LEDs with edge test colors while overlay is visible
if proc.device_id is not None and not proc.is_running:
# Light up device LEDs with edge test colors while overlay is visible.
# Always do this regardless of whether processing is running — the processing
# loop pauses itself when test_mode_active is set.
if proc.device_id is not None:
try:
await self.set_test_mode(proc.device_id, self._OVERLAY_EDGE_COLORS)
await self.set_test_mode(proc.device_id, self._OVERLAY_EDGE_COLORS, calibration)
except Exception as e:
logger.warning(f"Failed to set edge test for overlay on {proc.device_id}: {e}")
@@ -452,8 +454,8 @@ class ProcessorManager:
proc = self._get_processor(target_id)
await proc.stop_overlay()
# Clear device LEDs when overlay is dismissed
if proc.device_id is not None and not proc.is_running:
# Clear device LEDs when overlay is dismissed.
if proc.device_id is not None:
try:
await self.set_test_mode(proc.device_id, {})
except Exception as e:
@@ -462,6 +464,23 @@ class ProcessorManager:
def is_overlay_active(self, target_id: str) -> bool:
return self._get_processor(target_id).is_overlay_active()
# ===== CSS OVERLAY (direct, no target processor required) =====
async def start_css_overlay(self, css_id: str, display_info, calibration, css_name: str = None) -> None:
await asyncio.to_thread(
self._overlay_manager.start_overlay,
css_id, display_info, calibration, css_name,
)
async def stop_css_overlay(self, css_id: str) -> None:
await asyncio.to_thread(
self._overlay_manager.stop_overlay,
css_id,
)
def is_css_overlay_active(self, css_id: str) -> bool:
return self._overlay_manager.is_running(css_id)
# ===== WEBSOCKET (delegates to processor) =====
def add_kc_ws_client(self, target_id: str, ws) -> None:
@@ -508,7 +527,8 @@ class ProcessorManager:
ds.test_mode_edges = {}
ds.test_calibration = None
await self._send_clear_pixels(device_id)
await self._close_idle_client(device_id)
# Keep idle client open — serial reconnect causes device reset.
# start_processing() closes it before connecting its own client.
async def _get_idle_client(self, device_id: str):
"""Get or create a cached idle LED client for a device.

View File

@@ -268,18 +268,22 @@ class WledTargetProcessor(TargetProcessor):
def supports_overlay(self) -> bool:
return True
async def start_overlay(self, target_name: Optional[str] = None) -> None:
async def start_overlay(self, target_name: Optional[str] = None, calibration=None, display_info=None) -> None:
if self._overlay_active:
raise RuntimeError(f"Overlay already active for {self._target_id}")
if calibration is None or display_info is None:
# Calibration comes from the active color strip stream
if self._color_strip_stream is None:
raise ValueError(
f"Cannot start overlay for {self._target_id}: no color strip stream active. "
f"Start processing first."
f"Cannot start overlay for {self._target_id}: no color strip stream active "
f"and no calibration provided."
)
if calibration is None:
calibration = self._color_strip_stream.calibration
if display_info is None:
display_index = self._resolved_display_index
if display_index is None:
display_index = self._color_strip_stream.display_index

View File

@@ -222,3 +222,41 @@ export async function deleteColorStrip(cssId) {
showToast('Failed to delete color strip source', 'error');
}
}
/* ── Overlay ──────────────────────────────────────────────────── */
export async function startCSSOverlay(cssId) {
try {
const response = await fetchWithAuth(`/color-strip-sources/${cssId}/overlay/start`, {
method: 'POST',
});
if (response.ok) {
showToast(t('overlay.started'), 'success');
if (window.loadTargetsTab) window.loadTargetsTab();
} else {
const error = await response.json();
showToast(t('overlay.error.start') + ': ' + error.detail, 'error');
}
} catch (error) {
if (error.isAuth) return;
showToast(t('overlay.error.start'), 'error');
}
}
export async function stopCSSOverlay(cssId) {
try {
const response = await fetchWithAuth(`/color-strip-sources/${cssId}/overlay/stop`, {
method: 'POST',
});
if (response.ok) {
showToast(t('overlay.stopped'), 'success');
if (window.loadTargetsTab) window.loadTargetsTab();
} else {
const error = await response.json();
showToast(t('overlay.error.stop') + ': ' + error.detail, 'error');
}
} catch (error) {
if (error.isAuth) return;
showToast(t('overlay.error.stop'), 'error');
}
}