From 018bedf9f67badb789522ad03195cf59c1122435 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Fri, 20 Feb 2026 17:16:10 +0300 Subject: [PATCH] Overlay: fix 404, crash on repeat, missing edge test colors, device reset on stop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Target overlay works without active processing: route pre-loads calibration and display info from the CSS store, passes to processor as fallback - Fix server crash on repeated overlay: replace per-window tk.Tk() with single persistent hidden root; each overlay is a Toplevel child dispatched via root.after() — eliminates Tcl interpreter crashes on Windows - Fix edge test colors not lighting up: always call set_test_mode regardless of processing state (was guarded by 'not proc.is_running'); pass calibration so _send_test_pixels knows which LEDs map to which edges - Fix device reset on overlay stop: keep idle serial client cached after clearing test mode; start_processing() already closes it before connecting Co-Authored-By: Claude Sonnet 4.6 --- .../api/routes/color_strip_sources.py | 92 ++- .../api/routes/picture_targets.py | 27 +- .../api/schemas/color_strip_sources.py | 1 + .../core/capture/screen_overlay.py | 593 +++++------------- .../core/processing/processor_manager.py | 36 +- .../core/processing/wled_target_processor.py | 38 +- .../static/js/features/color-strips.js | 38 ++ 7 files changed, 349 insertions(+), 476 deletions(-) diff --git a/server/src/wled_controller/api/routes/color_strip_sources.py b/server/src/wled_controller/api/routes/color_strip_sources.py index 6140696..ec29a97 100644 --- a/server/src/wled_controller/api/routes/color_strip_sources.py +++ b/server/src/wled_controller/api/routes/color_strip_sources.py @@ -5,6 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException from wled_controller.api.auth import AuthRequired from wled_controller.api.dependencies import ( get_color_strip_store, + get_picture_source_store, get_picture_target_store, get_processor_manager, ) @@ -23,9 +24,12 @@ from wled_controller.core.capture.calibration import ( calibration_from_dict, calibration_to_dict, ) +from wled_controller.core.capture.screen_capture import get_available_displays from wled_controller.core.processing.processor_manager import ProcessorManager from wled_controller.storage.color_strip_source import PictureColorStripSource from wled_controller.storage.color_strip_store import ColorStripStore +from wled_controller.storage.picture_source import ProcessedPictureSource, ScreenCapturePictureSource +from wled_controller.storage.picture_source_store import PictureSourceStore from wled_controller.storage.picture_target_store import PictureTargetStore from wled_controller.utils import get_logger @@ -34,7 +38,7 @@ logger = get_logger(__name__) router = APIRouter() -def _css_to_response(source) -> ColorStripSourceResponse: +def _css_to_response(source, overlay_active: bool = False) -> ColorStripSourceResponse: """Convert a ColorStripSource to a ColorStripSourceResponse.""" calibration = None if isinstance(source, PictureColorStripSource) and source.calibration: @@ -54,21 +58,38 @@ def _css_to_response(source) -> ColorStripSourceResponse: led_count=getattr(source, "led_count", 0), calibration=calibration, description=source.description, + overlay_active=overlay_active, created_at=source.created_at, updated_at=source.updated_at, ) +def _resolve_display_index(picture_source_id: str, picture_source_store: PictureSourceStore, depth: int = 0) -> int: + """Resolve display index from a picture source, following processed source chains.""" + if not picture_source_id or depth > 5: + return 0 + try: + ps = picture_source_store.get_source(picture_source_id) + except Exception: + return 0 + if isinstance(ps, ScreenCapturePictureSource): + return ps.display_index + if isinstance(ps, ProcessedPictureSource): + return _resolve_display_index(ps.source_stream_id, picture_source_store, depth + 1) + return 0 + + # ===== CRUD ENDPOINTS ===== @router.get("/api/v1/color-strip-sources", response_model=ColorStripSourceListResponse, tags=["Color Strip Sources"]) async def list_color_strip_sources( _auth: AuthRequired, store: ColorStripStore = Depends(get_color_strip_store), + manager: ProcessorManager = Depends(get_processor_manager), ): """List all color strip sources.""" sources = store.get_all_sources() - responses = [_css_to_response(s) for s in sources] + responses = [_css_to_response(s, manager.is_css_overlay_active(s.id)) for s in sources] return ColorStripSourceListResponse(sources=responses, count=len(responses)) @@ -112,11 +133,12 @@ async def get_color_strip_source( source_id: str, _auth: AuthRequired, store: ColorStripStore = Depends(get_color_strip_store), + manager: ProcessorManager = Depends(get_processor_manager), ): """Get a color strip source by ID.""" try: source = store.get_source(source_id) - return _css_to_response(source) + return _css_to_response(source, manager.is_css_overlay_active(source_id)) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) @@ -259,3 +281,67 @@ async def test_css_calibration( except Exception as e: logger.error(f"Failed to set CSS calibration test mode: {e}") raise HTTPException(status_code=500, detail=str(e)) + + +# ===== OVERLAY VISUALIZATION ===== + +@router.post("/api/v1/color-strip-sources/{source_id}/overlay/start", tags=["Color Strip Sources"]) +async def start_css_overlay( + source_id: str, + _auth: AuthRequired, + store: ColorStripStore = Depends(get_color_strip_store), + picture_source_store: PictureSourceStore = Depends(get_picture_source_store), + manager: ProcessorManager = Depends(get_processor_manager), +): + """Start screen overlay visualization for a color strip source.""" + try: + source = store.get_source(source_id) + if not isinstance(source, PictureColorStripSource): + raise HTTPException(status_code=400, detail="Overlay only supported for picture color strip sources") + if not source.calibration: + raise HTTPException(status_code=400, detail="Color strip source has no calibration configured") + + display_index = _resolve_display_index(source.picture_source_id, picture_source_store) + displays = get_available_displays() + if not displays: + raise HTTPException(status_code=409, detail="No displays available") + display_index = min(display_index, len(displays) - 1) + display_info = displays[display_index] + + await manager.start_css_overlay(source_id, display_info, source.calibration, source.name) + return {"status": "started", "source_id": source_id} + + except HTTPException: + raise + except RuntimeError as e: + raise HTTPException(status_code=409, detail=str(e)) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + logger.error(f"Failed to start CSS overlay: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/api/v1/color-strip-sources/{source_id}/overlay/stop", tags=["Color Strip Sources"]) +async def stop_css_overlay( + source_id: str, + _auth: AuthRequired, + manager: ProcessorManager = Depends(get_processor_manager), +): + """Stop screen overlay visualization for a color strip source.""" + try: + await manager.stop_css_overlay(source_id) + return {"status": "stopped", "source_id": source_id} + except Exception as e: + logger.error(f"Failed to stop CSS overlay: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/api/v1/color-strip-sources/{source_id}/overlay/status", tags=["Color Strip Sources"]) +async def get_css_overlay_status( + source_id: str, + _auth: AuthRequired, + manager: ProcessorManager = Depends(get_processor_manager), +): + """Check if overlay is active for a color strip source.""" + return {"source_id": source_id, "active": manager.is_css_overlay_active(source_id)} diff --git a/server/src/wled_controller/api/routes/picture_targets.py b/server/src/wled_controller/api/routes/picture_targets.py index c1b175f..e989d1e 100644 --- a/server/src/wled_controller/api/routes/picture_targets.py +++ b/server/src/wled_controller/api/routes/picture_targets.py @@ -11,6 +11,7 @@ from PIL import Image from wled_controller.api.auth import AuthRequired from wled_controller.api.dependencies import ( + get_color_strip_store, get_device_store, get_pattern_template_store, get_picture_source_store, @@ -40,7 +41,10 @@ from wled_controller.core.capture.screen_capture import ( calculate_average_color, calculate_dominant_color, calculate_median_color, + get_available_displays, ) +from wled_controller.storage.color_strip_store import ColorStripStore +from wled_controller.storage.color_strip_source import PictureColorStripSource from wled_controller.storage import DeviceStore from wled_controller.storage.pattern_template_store import PatternTemplateStore from wled_controller.storage.picture_source import ScreenCapturePictureSource, StaticImagePictureSource @@ -679,6 +683,8 @@ async def start_target_overlay( _auth: AuthRequired, manager: ProcessorManager = Depends(get_processor_manager), target_store: PictureTargetStore = Depends(get_picture_target_store), + color_strip_store: ColorStripStore = Depends(get_color_strip_store), + picture_source_store: PictureSourceStore = Depends(get_picture_source_store), ): """Start screen overlay visualization for a target. @@ -694,7 +700,26 @@ async def start_target_overlay( if not target: raise ValueError(f"Target {target_id} not found") - await manager.start_overlay(target_id, target.name) + # Pre-load calibration and display info from the CSS store so the overlay + # can start even when processing is not currently running. + calibration = None + display_info = None + if isinstance(target, WledPictureTarget) and target.color_strip_source_id: + try: + css = color_strip_store.get_source(target.color_strip_source_id) + if isinstance(css, PictureColorStripSource) and css.calibration: + calibration = css.calibration + # Resolve the display this CSS is capturing + from wled_controller.api.routes.color_strip_sources import _resolve_display_index + display_index = _resolve_display_index(css.picture_source_id, picture_source_store) + displays = get_available_displays() + if displays: + display_index = min(display_index, len(displays) - 1) + display_info = displays[display_index] + except Exception as e: + logger.warning(f"Could not pre-load CSS calibration for overlay on {target_id}: {e}") + + await manager.start_overlay(target_id, target.name, calibration=calibration, display_info=display_info) return {"status": "started", "target_id": target_id} except ValueError as e: diff --git a/server/src/wled_controller/api/schemas/color_strip_sources.py b/server/src/wled_controller/api/schemas/color_strip_sources.py index 7afb5ce..3165b92 100644 --- a/server/src/wled_controller/api/schemas/color_strip_sources.py +++ b/server/src/wled_controller/api/schemas/color_strip_sources.py @@ -57,6 +57,7 @@ class ColorStripSourceResponse(BaseModel): led_count: int = Field(0, description="Total LED count (0 = auto from calibration)") calibration: Optional[Calibration] = Field(None, description="LED calibration") description: Optional[str] = Field(None, description="Description") + overlay_active: bool = Field(False, description="Whether the screen overlay is currently active") created_at: datetime = Field(description="Creation timestamp") updated_at: datetime = Field(description="Last update timestamp") diff --git a/server/src/wled_controller/core/capture/screen_overlay.py b/server/src/wled_controller/core/capture/screen_overlay.py index 2457ac3..c548ffb 100644 --- a/server/src/wled_controller/core/capture/screen_overlay.py +++ b/server/src/wled_controller/core/capture/screen_overlay.py @@ -6,7 +6,7 @@ import sys import threading import time import tkinter as tk -from typing import Dict, List, Tuple, Optional +from typing import Dict, List, Optional, Tuple from wled_controller.core.capture.calibration import CalibrationConfig from wled_controller.core.capture_engines.base import DisplayInfo @@ -14,12 +14,16 @@ from wled_controller.core.capture_engines.base import DisplayInfo logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# OverlayWindow — one Toplevel per target, no own thread/mainloop +# --------------------------------------------------------------------------- + class OverlayWindow: """Transparent overlay window for LED calibration visualization. - Runs in a separate thread with its own Tkinter event loop. - Draws border sampling zones, LED position markers, pixel mapping ranges, - and calibration info text. + Does NOT own a Tcl interpreter or event loop — it is a Toplevel child of + the shared root managed by OverlayManager. All methods must be called + from the Tkinter thread. """ def __init__( @@ -27,176 +31,93 @@ class OverlayWindow: display_info: DisplayInfo, calibration: CalibrationConfig, target_id: str, - target_name: str = None + target_name: str = None, ): - """Initialize overlay for a specific display and calibration. - - Args: - display_info: Display geometry (x, y, width, height) - calibration: LED calibration configuration - target_id: Target ID for logging - target_name: Target friendly name for display - """ self.display_info = display_info self.calibration = calibration self.target_id = target_id self.target_name = target_name or target_id - self.root: Optional[tk.Tk] = None - self.canvas: Optional[tk.Canvas] = None + self._window: Optional[tk.Toplevel] = None + self._canvas: Optional[tk.Canvas] = None self.running = False - self._thread: Optional[threading.Thread] = None - self._stop_event = threading.Event() - self._after_id: Optional[str] = None - def start(self) -> None: - """Start overlay in background thread.""" - if self._thread and self._thread.is_alive(): - raise RuntimeError("Overlay already running") + # ----- Lifecycle (must run in Tk thread) ----- - self._stop_event.clear() - self._thread = threading.Thread( - target=self._run_tkinter_loop, - name=f"Overlay-{self.target_id}", - daemon=True - ) - self._thread.start() + def start(self, root: tk.Tk) -> None: + """Create and show the overlay Toplevel (runs in Tk thread).""" + self._window = tk.Toplevel(root) + self._setup_window() + self._draw_visualization() + self.running = True + logger.info(f"Overlay window started for {self.target_id}") def stop(self) -> None: - """Stop overlay and clean up thread.""" - self._stop_event.set() - # Wait for the tkinter thread to fully exit - if self._thread and self._thread.is_alive(): - self._thread.join(timeout=3.0) - # Give extra time for Tcl interpreter cleanup - import time - time.sleep(0.2) + """Destroy the overlay Toplevel (runs in Tk thread).""" + if self._window is not None: + try: + self._window.destroy() + except Exception as e: + logger.debug(f"Cleanup error for {self.target_id}: {e}") + self._window = None + self._canvas = None + self.running = False + logger.info(f"Overlay window stopped for {self.target_id}") - def _run_tkinter_loop(self) -> None: - """Tkinter event loop (runs in background thread).""" - try: - self.root = tk.Tk() - self._setup_window() - self._draw_visualization() - - # Check stop event periodically - def check_stop(): - if self._stop_event.is_set(): - # Cancel the pending after callback before quitting - if self._after_id: - try: - self.root.after_cancel(self._after_id) - except Exception: - pass - self.root.quit() # Exit mainloop cleanly - else: - self._after_id = self.root.after(100, check_stop) - - self._after_id = self.root.after(100, check_stop) - self.running = True - logger.info(f"Overlay window started for {self.target_id}") - self.root.mainloop() - except Exception as e: - logger.error(f"Overlay error for {self.target_id}: {e}", exc_info=True) - finally: - self.running = False - # Clean up the window properly - if self.root: - try: - # Cancel any remaining after callbacks - if self._after_id: - self.root.after_cancel(self._after_id) - self._after_id = None - # Destroy window and clean up Tcl interpreter - self.root.destroy() - self.root = None - except Exception as e: - logger.debug(f"Cleanup error: {e}") - logger.info(f"Overlay window stopped for {self.target_id}") + # ----- Window setup ----- def _setup_window(self) -> None: - """Configure transparent, frameless, always-on-top window.""" - # Position at display coordinates - geometry = f"{self.display_info.width}x{self.display_info.height}" - geometry += f"+{self.display_info.x}+{self.display_info.y}" - self.root.geometry(geometry) + win = self._window + geometry = ( + f"{self.display_info.width}x{self.display_info.height}" + f"+{self.display_info.x}+{self.display_info.y}" + ) + win.geometry(geometry) + win.overrideredirect(True) + win.attributes("-topmost", True) - # Remove window decorations - self.root.overrideredirect(True) - - # Transparent background and always on top - self.root.attributes('-topmost', True) - - # Create canvas for drawing - self.canvas = tk.Canvas( - self.root, + self._canvas = tk.Canvas( + win, width=self.display_info.width, height=self.display_info.height, highlightthickness=0, - bg='black' + bg="black", ) - self.canvas.pack() + self._canvas.pack() - # Make canvas itself transparent - self.root.wm_attributes('-transparentcolor', 'black') - self.root.attributes('-alpha', 0.85) # Semi-transparent overlay + win.wm_attributes("-transparentcolor", "black") + win.attributes("-alpha", 0.85) - # Windows-specific: make click-through - if sys.platform == 'win32': + if sys.platform == "win32": try: import ctypes - # Get window handle - hwnd = ctypes.windll.user32.GetParent(self.root.winfo_id()) - # Set WS_EX_LAYERED | WS_EX_TRANSPARENT extended styles + hwnd = ctypes.windll.user32.GetParent(win.winfo_id()) style = ctypes.windll.user32.GetWindowLongW(hwnd, -20) # GWL_EXSTYLE style |= 0x80000 | 0x20 # WS_EX_LAYERED | WS_EX_TRANSPARENT ctypes.windll.user32.SetWindowLongW(hwnd, -20, style) except Exception as e: logger.warning(f"Could not set click-through: {e}") + # ----- Drawing ----- + def _draw_visualization(self) -> None: - """Draw all visualization elements on canvas.""" w = self.display_info.width h = self.display_info.height bw = self.calibration.border_width - - # 1. Border sampling zones (colored semi-transparent rectangles) self._draw_border_zones(w, h, bw) - - # 2. LED axes with tick marks self._draw_led_axes(w, h, bw) - - # 3. Calibration info text overlay self._draw_info_text(w, h) def _draw_border_zones(self, w: int, h: int, bw: int) -> None: - """Draw colored rectangles showing border sampling zones.""" - # Top zone - self.canvas.create_rectangle( - 0, 0, w, bw, - fill='#FF0000', stipple='gray25', outline='#FF0000', width=2 - ) - # Right zone - self.canvas.create_rectangle( - w - bw, 0, w, h, - fill='#00FF00', stipple='gray25', outline='#00FF00', width=2 - ) - # Bottom zone - self.canvas.create_rectangle( - 0, h - bw, w, h, - fill='#0000FF', stipple='gray25', outline='#0000FF', width=2 - ) - # Left zone - self.canvas.create_rectangle( - 0, 0, bw, h, - fill='#FFFF00', stipple='gray25', outline='#FFFF00', width=2 - ) + c = self._canvas + c.create_rectangle(0, 0, w, bw, fill="#FF0000", stipple="gray25", outline="#FF0000", width=2) + c.create_rectangle(w - bw, 0, w, h, fill="#00FF00", stipple="gray25", outline="#00FF00", width=2) + c.create_rectangle(0, h - bw, w, h, fill="#0000FF", stipple="gray25", outline="#0000FF", width=2) + c.create_rectangle(0, 0, bw, h, fill="#FFFF00", stipple="gray25", outline="#FFFF00", width=2) def _draw_led_axes(self, w: int, h: int, bw: int) -> None: - """Draw axes with tick marks showing LED positions.""" segments = self.calibration.segments total_leds = self.calibration.get_total_leds() - # Determine tick interval based on total LEDs if total_leds <= 50: tick_interval = 5 elif total_leds <= 100: @@ -210,333 +131,81 @@ class OverlayWindow: for seg in segments: edge = seg.edge count = seg.led_count - reverse = seg.reverse - - # Get span for this edge span_start, span_end = self.calibration.get_edge_span(edge) - # Draw axis line and tick marks for this edge - if edge == 'top': + if edge == "top": edge_len = w start_px = span_start * edge_len end_px = span_end * edge_len y_axis = bw / 2 - - # Draw axis line - self.canvas.create_line( - start_px, y_axis, end_px, y_axis, - fill='white', width=3 - ) - - # Draw tick marks + self._canvas.create_line(start_px, y_axis, end_px, y_axis, fill="white", width=3) for i in range(count): - display_index = (led_index + self.calibration.offset) % total_leds - - if i == 0 or i == count - 1 or display_index % tick_interval == 0: + display_idx = (led_index + self.calibration.offset) % total_leds + if i == 0 or i == count - 1 or display_idx % tick_interval == 0: frac = i / count if count > 1 else 0.5 x = start_px + frac * (end_px - start_px) - - # Tick mark - tick_len = 15 if display_index % tick_interval == 0 else 10 - self.canvas.create_line( - x, y_axis - tick_len, x, y_axis + tick_len, - fill='white', width=2 - ) - - # Label with background (positioned INSIDE screen) + tick_len = 15 if display_idx % tick_interval == 0 else 10 + self._canvas.create_line(x, y_axis - tick_len, x, y_axis + tick_len, fill="white", width=2) label_y = bw + 20 - self.canvas.create_rectangle( - x - 30, label_y - 12, x + 30, label_y + 12, - fill='white', outline='black', width=2 - ) - self.canvas.create_text( - x, label_y, - text=str(display_index), - fill='black', - font=('Arial', 13, 'bold') - ) - + self._canvas.create_rectangle(x - 30, label_y - 12, x + 30, label_y + 12, fill="white", outline="black", width=2) + self._canvas.create_text(x, label_y, text=str(display_idx), fill="black", font=("Arial", 13, "bold")) led_index += 1 - elif edge == 'bottom': + elif edge == "bottom": edge_len = w start_px = span_start * edge_len end_px = span_end * edge_len y_axis = h - bw / 2 - - # Draw axis line - self.canvas.create_line( - start_px, y_axis, end_px, y_axis, - fill='white', width=3 - ) - - # Draw tick marks + self._canvas.create_line(start_px, y_axis, end_px, y_axis, fill="white", width=3) for i in range(count): - display_index = (led_index + self.calibration.offset) % total_leds - - if i == 0 or i == count - 1 or display_index % tick_interval == 0: + display_idx = (led_index + self.calibration.offset) % total_leds + if i == 0 or i == count - 1 or display_idx % tick_interval == 0: frac = i / count if count > 1 else 0.5 x = start_px + frac * (end_px - start_px) - - # Tick mark - tick_len = 15 if display_index % tick_interval == 0 else 10 - self.canvas.create_line( - x, y_axis - tick_len, x, y_axis + tick_len, - fill='white', width=2 - ) - - # Label with background (positioned INSIDE screen) + tick_len = 15 if display_idx % tick_interval == 0 else 10 + self._canvas.create_line(x, y_axis - tick_len, x, y_axis + tick_len, fill="white", width=2) label_y = h - bw - 20 - self.canvas.create_rectangle( - x - 30, label_y - 12, x + 30, label_y + 12, - fill='white', outline='black', width=2 - ) - self.canvas.create_text( - x, label_y, - text=str(display_index), - fill='black', - font=('Arial', 13, 'bold') - ) - + self._canvas.create_rectangle(x - 30, label_y - 12, x + 30, label_y + 12, fill="white", outline="black", width=2) + self._canvas.create_text(x, label_y, text=str(display_idx), fill="black", font=("Arial", 13, "bold")) led_index += 1 - elif edge == 'left': + elif edge == "left": edge_len = h start_px = span_start * edge_len end_px = span_end * edge_len x_axis = bw / 2 - - # Draw axis line - self.canvas.create_line( - x_axis, start_px, x_axis, end_px, - fill='white', width=3 - ) - - # Draw tick marks + self._canvas.create_line(x_axis, start_px, x_axis, end_px, fill="white", width=3) for i in range(count): - display_index = (led_index + self.calibration.offset) % total_leds - - if i == 0 or i == count - 1 or display_index % tick_interval == 0: + display_idx = (led_index + self.calibration.offset) % total_leds + if i == 0 or i == count - 1 or display_idx % tick_interval == 0: frac = i / count if count > 1 else 0.5 y = start_px + frac * (end_px - start_px) - - # Tick mark - tick_len = 15 if display_index % tick_interval == 0 else 10 - self.canvas.create_line( - x_axis - tick_len, y, x_axis + tick_len, y, - fill='white', width=2 - ) - - # Label with background (positioned INSIDE screen) + tick_len = 15 if display_idx % tick_interval == 0 else 10 + self._canvas.create_line(x_axis - tick_len, y, x_axis + tick_len, y, fill="white", width=2) label_x = bw + 40 - self.canvas.create_rectangle( - label_x - 30, y - 12, label_x + 30, y + 12, - fill='white', outline='black', width=2 - ) - self.canvas.create_text( - label_x, y, - text=str(display_index), - fill='black', - font=('Arial', 13, 'bold') - ) - + self._canvas.create_rectangle(label_x - 30, y - 12, label_x + 30, y + 12, fill="white", outline="black", width=2) + self._canvas.create_text(label_x, y, text=str(display_idx), fill="black", font=("Arial", 13, "bold")) led_index += 1 - elif edge == 'right': + elif edge == "right": edge_len = h start_px = span_start * edge_len end_px = span_end * edge_len x_axis = w - bw / 2 - - # Draw axis line - self.canvas.create_line( - x_axis, start_px, x_axis, end_px, - fill='white', width=3 - ) - - # Draw tick marks + self._canvas.create_line(x_axis, start_px, x_axis, end_px, fill="white", width=3) for i in range(count): - display_index = (led_index + self.calibration.offset) % total_leds - - if i == 0 or i == count - 1 or display_index % tick_interval == 0: + display_idx = (led_index + self.calibration.offset) % total_leds + if i == 0 or i == count - 1 or display_idx % tick_interval == 0: frac = i / count if count > 1 else 0.5 y = start_px + frac * (end_px - start_px) - - # Tick mark - tick_len = 15 if display_index % tick_interval == 0 else 10 - self.canvas.create_line( - x_axis - tick_len, y, x_axis + tick_len, y, - fill='white', width=2 - ) - - # Label with background (positioned INSIDE screen) + tick_len = 15 if display_idx % tick_interval == 0 else 10 + self._canvas.create_line(x_axis - tick_len, y, x_axis + tick_len, y, fill="white", width=2) label_x = w - bw - 40 - self.canvas.create_rectangle( - label_x - 30, y - 12, label_x + 30, y + 12, - fill='white', outline='black', width=2 - ) - self.canvas.create_text( - label_x, y, - text=str(display_index), - fill='black', - font=('Arial', 13, 'bold') - ) - + self._canvas.create_rectangle(label_x - 30, y - 12, label_x + 30, y + 12, fill="white", outline="black", width=2) + self._canvas.create_text(label_x, y, text=str(display_idx), fill="black", font=("Arial", 13, "bold")) led_index += 1 - def _calculate_led_positions( - self, - segment, - w: int, h: int, bw: int - ) -> List[Tuple[float, float]]: - """Calculate (x, y) positions for each LED in a segment.""" - edge = segment.edge - count = segment.led_count - reverse = segment.reverse - - # Get span for this edge - span_start, span_end = self.calibration.get_edge_span(edge) - - positions = [] - - if edge == 'top': - edge_len = w - start_px = span_start * edge_len - end_px = span_end * edge_len - y = bw / 2 # Middle of border zone - - for i in range(count): - frac = i / count if count > 1 else 0.5 - x = start_px + frac * (end_px - start_px) - positions.append((x, y)) - - elif edge == 'bottom': - edge_len = w - start_px = span_start * edge_len - end_px = span_end * edge_len - y = h - bw / 2 - - for i in range(count): - frac = i / count if count > 1 else 0.5 - x = start_px + frac * (end_px - start_px) - positions.append((x, y)) - - elif edge == 'left': - edge_len = h - start_px = span_start * edge_len - end_px = span_end * edge_len - x = bw / 2 - - for i in range(count): - frac = i / count if count > 1 else 0.5 - y = start_px + frac * (end_px - start_px) - positions.append((x, y)) - - elif edge == 'right': - edge_len = h - start_px = span_start * edge_len - end_px = span_end * edge_len - x = w - bw / 2 - - for i in range(count): - frac = i / count if count > 1 else 0.5 - y = start_px + frac * (end_px - start_px) - positions.append((x, y)) - - if reverse: - positions.reverse() - - return positions - - def _draw_mapping_ranges(self, w: int, h: int, bw: int) -> None: - """Draw colored segments showing pixel ranges for each LED.""" - segments = self.calibration.segments - - # Generate distinct colors for each LED using HSV - total_leds = self.calibration.get_total_leds() - colors = [] - for i in range(total_leds): - hue = (i / total_leds) * 360 - colors.append(self._hsv_to_rgb(hue, 0.7, 0.9)) - - led_index = 0 - for seg in segments: - ranges = self._calculate_pixel_ranges(seg, w, h) - - for i, (x1, y1, x2, y2) in enumerate(ranges): - color = colors[led_index % len(colors)] - - # Draw line with LED's color - self.canvas.create_line( - x1, y1, x2, y2, - fill=color, width=3, capstyle='round' - ) - - led_index += 1 - - def _calculate_pixel_ranges( - self, - segment, - w: int, h: int - ) -> List[Tuple[float, float, float, float]]: - """Calculate pixel range boundaries for each LED in segment. - - Returns list of (x1, y1, x2, y2) line coordinates. - """ - edge = segment.edge - count = segment.led_count - span_start, span_end = self.calibration.get_edge_span(edge) - - ranges = [] - - if edge == 'top': - edge_len = w - start_px = span_start * edge_len - end_px = span_end * edge_len - segment_len = (end_px - start_px) / count if count > 0 else 0 - - for i in range(count): - x1 = start_px + i * segment_len - x2 = start_px + (i + 1) * segment_len - ranges.append((x1, 0, x2, 0)) - - elif edge == 'bottom': - edge_len = w - start_px = span_start * edge_len - end_px = span_end * edge_len - segment_len = (end_px - start_px) / count if count > 0 else 0 - - for i in range(count): - x1 = start_px + i * segment_len - x2 = start_px + (i + 1) * segment_len - ranges.append((x1, h, x2, h)) - - elif edge == 'left': - edge_len = h - start_px = span_start * edge_len - end_px = span_end * edge_len - segment_len = (end_px - start_px) / count if count > 0 else 0 - - for i in range(count): - y1 = start_px + i * segment_len - y2 = start_px + (i + 1) * segment_len - ranges.append((0, y1, 0, y2)) - - elif edge == 'right': - edge_len = h - start_px = span_start * edge_len - end_px = span_end * edge_len - segment_len = (end_px - start_px) / count if count > 0 else 0 - - for i in range(count): - y1 = start_px + i * segment_len - y2 = start_px + (i + 1) * segment_len - ranges.append((w, y1, w, y2)) - - return ranges - def _draw_info_text(self, w: int, h: int) -> None: - """Draw calibration info text overlay.""" info_lines = [ f"Target: {self.target_name}", f"Total LEDs: {self.calibration.get_total_leds()}", @@ -550,91 +219,121 @@ class OverlayWindow: f"Bottom: {self.calibration.leds_bottom} LEDs", f"Left: {self.calibration.leds_left} LEDs", ] - - # Draw background box with better readability text_x = 30 text_y = 30 line_height = 28 padding = 20 box_width = 320 box_height = len(info_lines) * line_height + padding * 2 - - # Solid dark background with border - self.canvas.create_rectangle( + self._canvas.create_rectangle( text_x - padding, text_y - padding, text_x + box_width, text_y + box_height, - fill='#1a1a1a', outline='white', width=3 + fill="#1a1a1a", outline="white", width=3, ) - - # Draw text lines with better spacing for i, line in enumerate(info_lines): - self.canvas.create_text( + self._canvas.create_text( text_x, text_y + i * line_height, - text=line, - anchor='nw', - fill='yellow', - font=('Arial', 14, 'bold') + text=line, anchor="nw", fill="yellow", font=("Arial", 14, "bold"), ) @staticmethod def _hsv_to_rgb(h: float, s: float, v: float) -> str: - """Convert HSV to hex RGB color string.""" r, g, b = colorsys.hsv_to_rgb(h / 360, s, v) return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}" +# --------------------------------------------------------------------------- +# OverlayManager — single persistent Tk root, all ops via root.after() +# --------------------------------------------------------------------------- + class OverlayManager: - """Manages overlay windows across multiple targets.""" + """Manages overlay windows. Uses one persistent hidden Tk root so that + Toplevel windows can be created/destroyed repeatedly without crashing + the Tcl interpreter.""" def __init__(self): self._overlays: Dict[str, OverlayWindow] = {} self._lock = threading.Lock() + self._tk_root: Optional[tk.Tk] = None + self._tk_thread: Optional[threading.Thread] = None + self._tk_ready = threading.Event() + self._start_tk_thread() + + # ----- Internal: persistent Tk thread ----- + + def _start_tk_thread(self) -> None: + def _run(): + try: + self._tk_root = tk.Tk() + self._tk_root.withdraw() # invisible root — never shown + self._tk_ready.set() + self._tk_root.mainloop() + except Exception as e: + logger.error(f"Tkinter thread crashed: {e}", exc_info=True) + self._tk_ready.set() # unblock waiters even on failure + + self._tk_thread = threading.Thread( + target=_run, name="TkOverlayRoot", daemon=True + ) + self._tk_thread.start() + self._tk_ready.wait(timeout=5.0) + + def _run_in_tk(self, fn) -> None: + """Schedule *fn* on the Tk thread and block until it completes.""" + if self._tk_root is None: + raise RuntimeError("Tkinter root not available") + done = threading.Event() + exc_box: List[Optional[BaseException]] = [None] + + def wrapper(): + try: + fn() + except Exception as e: + exc_box[0] = e + finally: + done.set() + + self._tk_root.after(0, wrapper) + done.wait(timeout=5.0) + if exc_box[0] is not None: + raise exc_box[0] + + # ----- Public API ----- def start_overlay( self, target_id: str, display_info: DisplayInfo, calibration: CalibrationConfig, - target_name: str = None + target_name: str = None, ) -> None: - """Start overlay for a target.""" with self._lock: if target_id in self._overlays: raise RuntimeError(f"Overlay already running for {target_id}") overlay = OverlayWindow(display_info, calibration, target_id, target_name) - overlay.start() - - # Wait for overlay to initialize - timeout = 3.0 - start = time.time() - while not overlay.running and time.time() - start < timeout: - time.sleep(0.1) + self._run_in_tk(lambda: overlay.start(self._tk_root)) if not overlay.running: - overlay.stop() - raise RuntimeError("Overlay failed to start within timeout") + raise RuntimeError(f"Overlay failed to start for {target_id}") self._overlays[target_id] = overlay logger.info(f"Started overlay for target {target_id}") def stop_overlay(self, target_id: str) -> None: - """Stop overlay for a target.""" with self._lock: overlay = self._overlays.pop(target_id, None) if overlay: - overlay.stop() + self._run_in_tk(overlay.stop) logger.info(f"Stopped overlay for target {target_id}") def is_running(self, target_id: str) -> bool: - """Check if overlay is running for a target.""" with self._lock: return target_id in self._overlays def stop_all(self) -> None: - """Stop all overlays.""" with self._lock: for target_id in list(self._overlays.keys()): overlay = self._overlays.pop(target_id) - overlay.stop() + self._run_in_tk(overlay.stop) logger.info("Stopped all overlays") diff --git a/server/src/wled_controller/core/processing/processor_manager.py b/server/src/wled_controller/core/processing/processor_manager.py index 5864861..0cd5e25 100644 --- a/server/src/wled_controller/core/processing/processor_manager.py +++ b/server/src/wled_controller/core/processing/processor_manager.py @@ -435,16 +435,18 @@ class ProcessorManager: "left": [255, 255, 0], } - async def start_overlay(self, target_id: str, target_name: str = None) -> None: + async def start_overlay(self, target_id: str, target_name: str = None, calibration=None, display_info=None) -> None: proc = self._get_processor(target_id) if not proc.supports_overlay(): raise ValueError(f"Target {target_id} does not support overlays") - await proc.start_overlay(target_name) + await proc.start_overlay(target_name, calibration=calibration, display_info=display_info) - # Light up device LEDs with edge test colors while overlay is visible - if proc.device_id is not None and not proc.is_running: + # Light up device LEDs with edge test colors while overlay is visible. + # Always do this regardless of whether processing is running — the processing + # loop pauses itself when test_mode_active is set. + if proc.device_id is not None: try: - await self.set_test_mode(proc.device_id, self._OVERLAY_EDGE_COLORS) + await self.set_test_mode(proc.device_id, self._OVERLAY_EDGE_COLORS, calibration) except Exception as e: logger.warning(f"Failed to set edge test for overlay on {proc.device_id}: {e}") @@ -452,8 +454,8 @@ class ProcessorManager: proc = self._get_processor(target_id) await proc.stop_overlay() - # Clear device LEDs when overlay is dismissed - if proc.device_id is not None and not proc.is_running: + # Clear device LEDs when overlay is dismissed. + if proc.device_id is not None: try: await self.set_test_mode(proc.device_id, {}) except Exception as e: @@ -462,6 +464,23 @@ class ProcessorManager: def is_overlay_active(self, target_id: str) -> bool: return self._get_processor(target_id).is_overlay_active() + # ===== CSS OVERLAY (direct, no target processor required) ===== + + async def start_css_overlay(self, css_id: str, display_info, calibration, css_name: str = None) -> None: + await asyncio.to_thread( + self._overlay_manager.start_overlay, + css_id, display_info, calibration, css_name, + ) + + async def stop_css_overlay(self, css_id: str) -> None: + await asyncio.to_thread( + self._overlay_manager.stop_overlay, + css_id, + ) + + def is_css_overlay_active(self, css_id: str) -> bool: + return self._overlay_manager.is_running(css_id) + # ===== WEBSOCKET (delegates to processor) ===== def add_kc_ws_client(self, target_id: str, ws) -> None: @@ -508,7 +527,8 @@ class ProcessorManager: ds.test_mode_edges = {} ds.test_calibration = None await self._send_clear_pixels(device_id) - await self._close_idle_client(device_id) + # Keep idle client open — serial reconnect causes device reset. + # start_processing() closes it before connecting its own client. async def _get_idle_client(self, device_id: str): """Get or create a cached idle LED client for a device. diff --git a/server/src/wled_controller/core/processing/wled_target_processor.py b/server/src/wled_controller/core/processing/wled_target_processor.py index 8daa9cd..a6cc88a 100644 --- a/server/src/wled_controller/core/processing/wled_target_processor.py +++ b/server/src/wled_controller/core/processing/wled_target_processor.py @@ -268,30 +268,34 @@ class WledTargetProcessor(TargetProcessor): def supports_overlay(self) -> bool: return True - async def start_overlay(self, target_name: Optional[str] = None) -> None: + async def start_overlay(self, target_name: Optional[str] = None, calibration=None, display_info=None) -> None: if self._overlay_active: raise RuntimeError(f"Overlay already active for {self._target_id}") - # Calibration comes from the active color strip stream - if self._color_strip_stream is None: - raise ValueError( - f"Cannot start overlay for {self._target_id}: no color strip stream active. " - f"Start processing first." - ) + if calibration is None or display_info is None: + # Calibration comes from the active color strip stream + if self._color_strip_stream is None: + raise ValueError( + f"Cannot start overlay for {self._target_id}: no color strip stream active " + f"and no calibration provided." + ) - calibration = self._color_strip_stream.calibration - display_index = self._resolved_display_index - if display_index is None: - display_index = self._color_strip_stream.display_index + if calibration is None: + calibration = self._color_strip_stream.calibration - if display_index is None or display_index < 0: - raise ValueError(f"Invalid display index {display_index} for overlay") + if display_info is None: + display_index = self._resolved_display_index + if display_index is None: + display_index = self._color_strip_stream.display_index - displays = get_available_displays() - if display_index >= len(displays): - raise ValueError(f"Invalid display index {display_index}") + if display_index is None or display_index < 0: + raise ValueError(f"Invalid display index {display_index} for overlay") - display_info = displays[display_index] + displays = get_available_displays() + if display_index >= len(displays): + raise ValueError(f"Invalid display index {display_index}") + + display_info = displays[display_index] await asyncio.to_thread( self._ctx.overlay_manager.start_overlay, diff --git a/server/src/wled_controller/static/js/features/color-strips.js b/server/src/wled_controller/static/js/features/color-strips.js index 949f5bd..2a14833 100644 --- a/server/src/wled_controller/static/js/features/color-strips.js +++ b/server/src/wled_controller/static/js/features/color-strips.js @@ -222,3 +222,41 @@ export async function deleteColorStrip(cssId) { showToast('Failed to delete color strip source', 'error'); } } + +/* ── Overlay ──────────────────────────────────────────────────── */ + +export async function startCSSOverlay(cssId) { + try { + const response = await fetchWithAuth(`/color-strip-sources/${cssId}/overlay/start`, { + method: 'POST', + }); + if (response.ok) { + showToast(t('overlay.started'), 'success'); + if (window.loadTargetsTab) window.loadTargetsTab(); + } else { + const error = await response.json(); + showToast(t('overlay.error.start') + ': ' + error.detail, 'error'); + } + } catch (error) { + if (error.isAuth) return; + showToast(t('overlay.error.start'), 'error'); + } +} + +export async function stopCSSOverlay(cssId) { + try { + const response = await fetchWithAuth(`/color-strip-sources/${cssId}/overlay/stop`, { + method: 'POST', + }); + if (response.ok) { + showToast(t('overlay.stopped'), 'success'); + if (window.loadTargetsTab) window.loadTargetsTab(); + } else { + const error = await response.json(); + showToast(t('overlay.error.stop') + ': ' + error.detail, 'error'); + } + } catch (error) { + if (error.isAuth) return; + showToast(t('overlay.error.stop'), 'error'); + } +}