fix(shutdown): survive PC restart with WAL fsync + Win32 session-end guard

Two bugs caused user data ('G502' target's color-strip ref, etc.) to
revert after PC restart while persisting fine across normal app
restarts:

1. SQLite was in WAL mode with synchronous=NORMAL and Database.close()
   was never called. On graceful Python exit the sqlite3 finalizer
   checkpoints the WAL, but on an unclean PC shutdown (power loss,
   forced reboot, or Windows force-terminating pythonw.exe) the WAL
   stayed in OS cache, never reached disk, and the next boot rolled the
   DB back to the last checkpoint -- losing recent edits.

2. Nothing handled WM_QUERYENDSESSION / WM_ENDSESSION, so on PC
   shutdown Windows force-killed pythonw.exe after ~5s and the FastAPI
   lifespan never ran. The 'stop_targets' setting was silently ignored
   and devices were left at their last frame.

Changes:
- Database: PRAGMA synchronous=FULL + wal_autocheckpoint=100, plus an
  explicit wal_checkpoint(TRUNCATE) inside Database.close().
- New utils/win_shutdown.py: hidden top-level window in a daemon thread
  with a ctypes WindowProc that catches WM_QUERYENDSESSION (calls
  ShutdownBlockReasonCreate to extend Windows' 5s hung-app timeout up
  to the ~20s GUI ceiling), fires the shutdown callback, then waits in
  WM_ENDSESSION on a completion event before returning. Also raises
  the process shutdown priority via SetProcessShutdownParameters. All
  Win32 argtypes/restypes are bound once at import to avoid LPARAM
  overflow on x64.
- New shutdown_state.py: leaf module owning the cross-thread Event so
  __main__ does not import the heavy ledgrab.main at startup.
- main.py lifespan: per-step asyncio.wait_for budgets (8s for
  processor_manager.stop_all, 1.5s each for HA/MQTT, etc.) so a hung
  device cannot starve the DB checkpoint, then db.close() and
  shutdown_complete.set() always run.
- __main__.py: install the Windows shutdown guard before tray start;
  install SIGINT/SIGTERM/SIGBREAK handlers only on the tray path
  (uvicorn overwrites them on no-tray); raise server_thread.join to 20s.
- Tests cover WM_QUERYENDSESSION (fires callback, returns TRUE,
  idempotent), WM_ENDSESSION (waits on event, times out cleanly,
  cancel-path returns instantly), signal handler installation, and
  that main and shutdown_state share the same Event instance.
This commit is contained in:
2026-05-22 21:43:41 +03:00
parent e4bf58da19
commit e24f9d33cc
7 changed files with 876 additions and 42 deletions
+76 -2
View File
@@ -6,6 +6,7 @@ shows a system-tray icon with **Show UI** / **Exit** actions.
import asyncio import asyncio
import os import os
import signal
import socket import socket
import sys import sys
import threading import threading
@@ -42,6 +43,8 @@ from ledgrab.config import get_config # noqa: E402
from ledgrab.server_ref import set_server, set_tray # noqa: E402 from ledgrab.server_ref import set_server, set_tray # noqa: E402
from ledgrab.tray import PYSTRAY_AVAILABLE, TrayManager # noqa: E402 from ledgrab.tray import PYSTRAY_AVAILABLE, TrayManager # noqa: E402
from ledgrab.utils import setup_logging, get_logger # noqa: E402 from ledgrab.utils import setup_logging, get_logger # noqa: E402
from ledgrab.utils.platform import is_windows # noqa: E402
from ledgrab.utils.win_shutdown import WindowsShutdownGuard # noqa: E402
setup_logging() setup_logging()
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -117,10 +120,22 @@ def main() -> None:
server = uvicorn.Server(uv_config) server = uvicorn.Server(uv_config)
set_server(server) set_server(server)
# Wire the OS-shutdown safety net. The lifespan in ``ledgrab.main`` signals
# ``shutdown_complete`` once it has stopped targets and checkpointed the
# DB; the Windows guard waits on that event before letting the OS finish
# ending the session. Without this, the entire shutdown lifespan never
# runs on PC reboot — devices stay on and the SQLite WAL is lost.
guard = _install_os_shutdown_guard(server)
use_tray = PYSTRAY_AVAILABLE and (sys.platform == "win32" or _force_tray()) use_tray = PYSTRAY_AVAILABLE and (sys.platform == "win32" or _force_tray())
if use_tray: if use_tray:
logger.info("Starting with system tray icon") logger.info("Starting with system tray icon")
# Install signal handlers BEFORE starting the uvicorn thread so a
# SIGINT/SIGBREAK during startup still triggers a clean shutdown.
# We do NOT install them on the no-tray path because uvicorn's
# ``server.run()`` overwrites SIGINT/SIGTERM with its own handlers.
_install_signal_handlers(server)
# Uvicorn in a background thread # Uvicorn in a background thread
server_thread = threading.Thread( server_thread = threading.Thread(
@@ -147,12 +162,20 @@ def main() -> None:
set_tray(tray) set_tray(tray)
tray.run() tray.run()
# Tray exited — wait for server to finish its graceful shutdown # Tray exited — wait for server to finish its graceful shutdown.
server_thread.join(timeout=10) # Use a longer join than the lifespan's own ~18 s budget so we don't
# cut the DB checkpoint short on a slow disk.
server_thread.join(timeout=20)
if guard is not None:
guard.stop()
else: else:
if not PYSTRAY_AVAILABLE: if not PYSTRAY_AVAILABLE:
logger.info("System tray not available (install pystray for tray support)") logger.info("System tray not available (install pystray for tray support)")
try:
server.run() server.run()
finally:
if guard is not None:
guard.stop()
def _request_shutdown(server: uvicorn.Server) -> None: def _request_shutdown(server: uvicorn.Server) -> None:
@@ -160,6 +183,57 @@ def _request_shutdown(server: uvicorn.Server) -> None:
server.should_exit = True server.should_exit = True
def _install_os_shutdown_guard(server: uvicorn.Server) -> "WindowsShutdownGuard | None":
    """Set up the Windows session-end guard for *server*.

    On non-Windows platforms nothing is installed and ``None`` is returned.
    On Windows the guard object is returned whether or not it managed to
    start, so the caller can always ``stop()`` it on normal exit.
    """
    if not is_windows():
        return None

    # Imported lazily and from the leaf module on purpose: pulling in
    # ``ledgrab.shutdown_state`` does NOT drag in ``ledgrab.main`` and its
    # global stores. uvicorn loads ``main`` later, via the import string
    # ``"ledgrab.main:app"``, once it starts serving.
    from ledgrab.shutdown_state import shutdown_complete

    def _trigger_shutdown() -> None:
        _request_shutdown(server)

    guard = WindowsShutdownGuard(
        on_shutdown=_trigger_shutdown,
        shutdown_complete=shutdown_complete,
    )
    started = guard.start()
    if not started:
        logger.warning("Windows shutdown guard failed to start")
    else:
        logger.info("Windows shutdown guard installed")
    return guard
def _install_signal_handlers(server: uvicorn.Server) -> None:
    """Route terminal/admin stop signals to a graceful shutdown of *server*.

    On the no-tray path ``server.run()`` executes on the main thread and
    uvicorn installs its own SIGINT/SIGTERM handlers. On the tray path
    uvicorn lives in a background thread and skips that step, so the
    handlers are registered here instead. SIGBREAK is Windows-specific
    (Ctrl-Break and some service-stop scenarios); signal names the running
    platform does not define are skipped.
    """

    def _handler(signum, frame):  # noqa: ANN001 - signal handler signature
        logger.warning("Signal %s received — requesting shutdown", signum)
        _request_shutdown(server)

    for name in ("SIGINT", "SIGTERM", "SIGBREAK"):
        signum = getattr(signal, name, None)
        if signum is not None:
            try:
                signal.signal(signum, _handler)
            except (ValueError, OSError) as exc:
                # ValueError: not on main thread; OSError: signal not supported here.
                logger.debug("Could not install handler for %s: %s", name, exc)
def _force_tray() -> bool: def _force_tray() -> bool:
"""Allow forcing tray on non-Windows via LEDGRAB_TRAY=1.""" """Allow forcing tray on non-Windows via LEDGRAB_TRAY=1."""
import os import os
+108 -36
View File
@@ -1,8 +1,10 @@
"""FastAPI application entry point.""" """FastAPI application entry point."""
import asyncio
import sys import sys
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path from pathlib import Path
from typing import Awaitable
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
@@ -50,6 +52,7 @@ import ledgrab.core.game_integration.adapters # noqa: F401 — register built-i
from ledgrab.core.game_integration.community_loader import register_community_adapters from ledgrab.core.game_integration.community_loader import register_community_adapters
from ledgrab.core.mqtt.mqtt_manager import MQTTManager from ledgrab.core.mqtt.mqtt_manager import MQTTManager
from ledgrab.storage.mqtt_source_store import MQTTSourceStore from ledgrab.storage.mqtt_source_store import MQTTSourceStore
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
from ledgrab.storage.audio_processing_template_store import AudioProcessingTemplateStore from ledgrab.storage.audio_processing_template_store import AudioProcessingTemplateStore
from ledgrab.storage.pattern_template_store import PatternTemplateStore from ledgrab.storage.pattern_template_store import PatternTemplateStore
import ledgrab.core.audio.filters # noqa: F401 — trigger audio filter auto-registration import ledgrab.core.audio.filters # noqa: F401 — trigger audio filter auto-registration
@@ -69,6 +72,10 @@ logger = get_logger(__name__)
# Get configuration # Get configuration
config = get_config() config = get_config()
# The shutdown-complete signal is owned by a leaf module so ``__main__``
# can import it without dragging in this module's heavy global state.
from ledgrab.shutdown_state import shutdown_complete # noqa: E402
def _migrate_legacy_data_location() -> None: def _migrate_legacy_data_location() -> None:
"""Rescue data from pre-rename cwd-relative paths. """Rescue data from pre-rename cwd-relative paths.
@@ -166,6 +173,7 @@ ha_store = HomeAssistantStore(db)
ha_manager = HomeAssistantManager(ha_store) ha_manager = HomeAssistantManager(ha_store)
mqtt_source_store = MQTTSourceStore(db) mqtt_source_store = MQTTSourceStore(db)
mqtt_manager = MQTTManager(mqtt_source_store) mqtt_manager = MQTTManager(mqtt_source_store)
http_endpoint_store = HTTPEndpointStore(db)
audio_processing_template_store = AudioProcessingTemplateStore(db) audio_processing_template_store = AudioProcessingTemplateStore(db)
game_integration_store = GameIntegrationStore(db) game_integration_store = GameIntegrationStore(db)
pattern_template_store = PatternTemplateStore(db) pattern_template_store = PatternTemplateStore(db)
@@ -191,6 +199,7 @@ processor_manager = ProcessorManager(
mqtt_manager=mqtt_manager, mqtt_manager=mqtt_manager,
game_event_bus=game_event_bus, game_event_bus=game_event_bus,
audio_processing_template_store=audio_processing_template_store, audio_processing_template_store=audio_processing_template_store,
http_endpoint_store=http_endpoint_store,
) )
) )
@@ -247,7 +256,9 @@ async def lifespan(app: FastAPI):
except Exception as e: except Exception as e:
logger.error("Legacy MQTT migration failed: %s", e) logger.error("Legacy MQTT migration failed: %s", e)
# Create automation engine (needs processor_manager + MQTT manager + stores for scene activation) # Create automation engine. HTTPPollRule evaluation reads from a
# ValueStream produced by the ValueStreamManager (which lives inside
# the processor manager), so the engine needs that handle.
automation_engine = AutomationEngine( automation_engine = AutomationEngine(
automation_store, automation_store,
processor_manager, processor_manager,
@@ -256,6 +267,8 @@ async def lifespan(app: FastAPI):
device_store=device_store, device_store=device_store,
ha_manager=ha_manager, ha_manager=ha_manager,
mqtt_manager=mqtt_manager, mqtt_manager=mqtt_manager,
value_stream_manager=processor_manager.value_stream_manager,
value_source_store=value_source_store,
) )
# Create auto-backup engine — derive paths from database location so that # Create auto-backup engine — derive paths from database location so that
@@ -309,6 +322,7 @@ async def lifespan(app: FastAPI):
game_event_bus=game_event_bus, game_event_bus=game_event_bus,
mqtt_store=mqtt_source_store, mqtt_store=mqtt_source_store,
mqtt_manager=mqtt_manager, mqtt_manager=mqtt_manager,
http_endpoint_store=http_endpoint_store,
audio_processing_template_store=audio_processing_template_store, audio_processing_template_store=audio_processing_template_store,
pattern_template_store=pattern_template_store, pattern_template_store=pattern_template_store,
) )
@@ -385,28 +399,39 @@ async def lifespan(app: FastAPI):
yield yield
# Shutdown # Shutdown
#
# Each step has a strict time budget. Windows gives a GUI app with a
# shutdown-block-reason set ~20 s before it force-terminates the
# process; if any single step stalls (network call to a dead WLED, a
# zombie MQTT broker), we MUST keep moving so the steps that actually
# protect the user's state — device restore frames and the DB
# checkpoint — still get to run.
logger.info("Shutting down LED Grab") logger.info("Shutting down LED Grab")
# Persist all stores to disk before stopping anything. async def _bounded(label: str, coro: Awaitable, timeout: float) -> None:
# This ensures in-memory data survives force-kills and restarts try:
# where no CRUD happened during the session. await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
logger.error("Shutdown step '%s' exceeded %.1fs — moving on", label, timeout)
except Exception as e:
logger.error("Shutdown step '%s' raised: %s", label, e)
# Legacy hook — SQLite stores are write-through so this only logs.
# Durability comes from PRAGMA synchronous=FULL + the explicit
# wal_checkpoint(TRUNCATE) in Database.close() at the end of this block.
try:
_save_all_stores() _save_all_stores()
except Exception as e:
logger.error(f"Error persisting stores: {e}")
# Stop automation engine first so it can no longer activate scenes that # Stop automation engine first so it can no longer activate scenes that
# would talk to processors mid-shutdown. # would talk to processors mid-shutdown.
try: await _bounded("automation_engine.stop", automation_engine.stop(), timeout=1.5)
await automation_engine.stop()
logger.info("Stopped automation engine")
except Exception as e:
logger.error(f"Error stopping automation engine: {e}")
# Stop discovery watcher and OS notification listener so they stop # Stop discovery watcher and OS notification listener so they stop
# firing events into a shutting-down processor manager. # firing events into a shutting-down processor manager.
if discovery_watcher is not None: if discovery_watcher is not None:
try: await _bounded("discovery_watcher.stop", discovery_watcher.stop(), timeout=1.0)
await discovery_watcher.stop()
except Exception as e:
logger.error(f"Error stopping discovery watcher: {e}")
try: try:
os_notif_listener.stop() os_notif_listener.stop()
@@ -432,22 +457,18 @@ async def lifespan(app: FastAPI):
action = "stop_targets" action = "stop_targets"
logger.info("Shutdown action: %s", action) logger.info("Shutdown action: %s", action)
try: # This is the step that *implements* the user's stop_targets setting.
await processor_manager.stop_all(restore_devices=action != "nothing") # Give it the largest slice of the budget.
await _bounded(
"processor_manager.stop_all",
processor_manager.stop_all(restore_devices=action != "nothing"),
timeout=8.0,
)
logger.info("Stopped all processors") logger.info("Stopped all processors")
except Exception as e:
logger.error(f"Error stopping processors: {e}")
# Now safe to tear down the connections that processors depended on. # Now safe to tear down the connections that processors depended on.
try: await _bounded("ha_manager.shutdown", ha_manager.shutdown(), timeout=1.5)
await ha_manager.shutdown() await _bounded("mqtt_manager.shutdown", mqtt_manager.shutdown(), timeout=1.5)
except Exception as e:
logger.error(f"Error stopping Home Assistant manager: {e}")
try:
await mqtt_manager.shutdown()
except Exception as e:
logger.error(f"Error stopping MQTT manager: {e}")
# Independent services — order doesn't matter relative to processors. # Independent services — order doesn't matter relative to processors.
try: try:
@@ -455,26 +476,37 @@ async def lifespan(app: FastAPI):
except Exception as e: except Exception as e:
logger.error(f"Error stopping weather manager: {e}") logger.error(f"Error stopping weather manager: {e}")
await _bounded("update_service.stop", update_service.stop(), timeout=0.5)
await _bounded("auto_backup_engine.stop", auto_backup_engine.stop(), timeout=0.5)
# Close the DB last so it runs a TRUNCATE checkpoint, flushing the WAL
# into the main file. Without this, writes can survive a graceful app
# restart (Python finalizer checkpoints on GC) but be lost on a later
# unclean PC shutdown — the symptom users see as "my fix reverted after
# rebooting the PC."
try: try:
await update_service.stop() db.close()
except Exception as e: except Exception as e:
logger.error(f"Error stopping update checker: {e}") logger.error(f"Error closing database: {e}")
try: # Tell any external supervisor (Windows shutdown guard, tray) that
await auto_backup_engine.stop() # cleanup is done so Windows can finish ending the session promptly.
except Exception as e: shutdown_complete.set()
logger.error(f"Error stopping auto-backup engine: {e}") logger.info("Shutdown complete")
# Create FastAPI application # Create FastAPI application. The built-in ``/docs``, ``/redoc``, and
# ``/openapi.json`` routes are disabled here so they can be re-added below
# with an :data:`AuthRequired` dependency — exposing the full OpenAPI surface
# (route paths + parameter schemas) without auth is information disclosure.
app = FastAPI( app = FastAPI(
title="LED Grab", title="LED Grab",
description="Control WLED devices based on screen content for ambient lighting", description="Control WLED devices based on screen content for ambient lighting",
version=__version__, version=__version__,
lifespan=lifespan, lifespan=lifespan,
docs_url="/docs", docs_url=None,
redoc_url="/redoc", redoc_url=None,
openapi_url="/openapi.json", openapi_url=None,
) )
# Configure CORS # Configure CORS
@@ -521,6 +553,46 @@ async def _no_cache_static(request: Request, call_next):
return await call_next(request) return await call_next(request)
# Middleware: baseline security headers on every response. CSP is intentionally
# omitted here because the UI uses inline event handlers / templates and a
# wrong CSP value would break the app; the other three headers are universally
# safe defaults and close several common browser-side attack vectors.
@app.middleware("http")
async def _security_headers(request: Request, call_next):
    """Add baseline security headers to the response for *request*.

    Each header is applied with ``setdefault`` so a value already set by a
    downstream handler is left untouched.
    """
    baseline_headers = (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
        ("Permissions-Policy", "geolocation=(), microphone=(), camera=(), payment=()"),
    )
    resp = await call_next(request)
    for header_name, header_value in baseline_headers:
        resp.headers.setdefault(header_name, header_value)
    return resp
# ── Auth-gated OpenAPI surface ────────────────────────────────────────────
# Re-add the docs endpoints we disabled above, now protected by the same
# Bearer auth as the rest of the API. When auth is unconfigured, loopback
# clients still get in anonymously (per ``verify_api_key`` policy).
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html # noqa: E402
from ledgrab.api.auth import AuthRequired # noqa: E402
@app.get("/openapi.json", include_in_schema=False)
async def _openapi(_auth: AuthRequired):
    """Return the app's generated OpenAPI schema as JSON.

    The ``_auth`` parameter exists only to attach the ``AuthRequired``
    dependency to the route; its value is not used.
    """
    return JSONResponse(app.openapi())
@app.get("/docs", include_in_schema=False)
async def _swagger_docs(_auth: AuthRequired):
    """Return the Swagger UI page, pointed at ``/openapi.json``.

    The ``_auth`` parameter exists only to attach the ``AuthRequired``
    dependency to the route; its value is not used.
    """
    return get_swagger_ui_html(openapi_url="/openapi.json", title=f"{app.title} — API docs")
@app.get("/redoc", include_in_schema=False)
async def _redoc_docs(_auth: AuthRequired):
    """Return the ReDoc page, pointed at ``/openapi.json``.

    The ``_auth`` parameter exists only to attach the ``AuthRequired``
    dependency to the route; its value is not used.
    """
    return get_redoc_html(openapi_url="/openapi.json", title=f"{app.title} — API docs")
# Mount static files # Mount static files
static_path = Path(__file__).parent / "static" static_path = Path(__file__).parent / "static"
if static_path.exists(): if static_path.exists():
+18
View File
@@ -0,0 +1,18 @@
"""Cross-thread shutdown completion signal.
This module is intentionally tiny so importing it does not pull in the
heavy global state (Database, stores, processor manager) instantiated at
import time by ``ledgrab.main``. ``__main__`` imports it on the main
thread before uvicorn loads ``ledgrab.main`` in its event-loop thread;
both ends share the same ``threading.Event`` instance.
The lifespan in ``ledgrab.main`` calls ``shutdown_complete.set()`` at the
very end of its teardown sequence (after stopping targets, flushing
stores, and checkpointing the DB). External supervisors — the Windows
OS-shutdown guard and the tray's "Shutdown" handler — wait on it so
they release Windows / unblock only once cleanup is genuinely done.
"""
import threading
# Single module-level Event shared by every importer. Per the module
# docstring, the lifespan teardown sets it and external supervisors wait on it.
shutdown_complete: threading.Event = threading.Event()
+19 -1
View File
@@ -57,6 +57,7 @@ _ENTITY_TABLES = [
"assets", "assets",
"home_assistant_sources", "home_assistant_sources",
"mqtt_sources", "mqtt_sources",
"http_endpoints",
"game_integrations", "game_integrations",
"audio_processing_templates", "audio_processing_templates",
"pattern_templates", "pattern_templates",
@@ -88,6 +89,14 @@ class Database:
) )
self._conn.row_factory = sqlite3.Row self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA journal_mode=WAL")
# synchronous=FULL fsyncs the WAL on every commit. Without it, writes
# can be lost on an unclean PC shutdown (power loss, forced reboot):
# the WAL stays in OS cache, never reaches disk, and the next startup
# rolls back to the last checkpoint — silently losing recent edits.
self._conn.execute("PRAGMA synchronous=FULL")
# Auto-checkpoint the WAL into the main DB every N pages so the
# window of unsynced data stays small even if close() is skipped.
self._conn.execute("PRAGMA wal_autocheckpoint=100")
self._conn.execute("PRAGMA busy_timeout=5000") self._conn.execute("PRAGMA busy_timeout=5000")
self._lock = threading.RLock() self._lock = threading.RLock()
@@ -336,7 +345,16 @@ class Database:
# -- Lifecycle ----------------------------------------------------------- # -- Lifecycle -----------------------------------------------------------
def close(self) -> None: def close(self) -> None:
"""Close the database connection.""" """Close the database connection.
Runs a TRUNCATE checkpoint before closing so the WAL is fully merged
into the main DB file. This protects against data loss if the OS
loses the WAL between graceful app shutdown and a later PC shutdown.
"""
with self._lock: with self._lock:
try:
self._conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
except sqlite3.Error as e:
logger.warning("WAL checkpoint on close failed: %s", e)
self._conn.close() self._conn.close()
logger.info("Database connection closed") logger.info("Database connection closed")
+417
View File
@@ -0,0 +1,417 @@
"""Windows OS-shutdown handler.
Without this, ``pythonw.exe -m ledgrab`` is force-terminated by Windows at
PC shutdown/restart/logoff: the FastAPI ``lifespan`` shutdown hook never
runs, so the ``stop_targets`` setting is silently ignored and the SQLite
WAL is never checkpointed (the user's most recent edits roll back on the
next boot).
How it works
------------
The fix is a hidden, top-level window in a daemon thread:
* Top-level — message-only windows (``HWND_MESSAGE``) do **not** receive
shutdown broadcasts. We need a real top-level window. We make it
invisible by never calling ``ShowWindow`` and by using
``WS_EX_TOOLWINDOW`` (no taskbar entry, no Alt-Tab).
* ``WM_QUERYENDSESSION`` arrives first. We immediately call
``ShutdownBlockReasonCreate`` so Windows shows our reason on the
shutdown UI **and** extends its hung-app timeout (default ~5 s) up to
the GUI ceiling (~20 s). We then trigger the caller's shutdown
callback (which sets ``uvicorn.Server.should_exit``).
* ``WM_ENDSESSION`` arrives second. We wait on a ``threading.Event``
that the FastAPI lifespan sets when it is fully torn down (including
the DB checkpoint), then destroy the block reason and return.
* ``SetProcessShutdownParameters`` raises our shutdown priority so we
are notified before non-system apps.
This module is a no-op on non-Windows platforms.
"""
from __future__ import annotations
import ctypes
import logging
import threading
from ctypes import wintypes
from typing import Callable, Optional
from ledgrab.utils.platform import is_windows
logger = logging.getLogger(__name__)
# -- Win32 constants ---------------------------------------------------------
# Window messages handled explicitly by the guard's WindowProc (plus WM_CLOSE,
# which ``stop()`` posts to tear the window down).
_WM_QUERYENDSESSION = 0x0011
_WM_ENDSESSION = 0x0016
_WM_CLOSE = 0x0010
_WM_DESTROY = 0x0002
# Window-class and window style bits used when registering/creating the
# hidden window.
_CS_HREDRAW = 0x0002
_CS_VREDRAW = 0x0001
_WS_OVERLAPPED = 0x00000000
_WS_EX_TOOLWINDOW = 0x00000080
_WS_EX_NOACTIVATE = 0x08000000
# SetProcessShutdownParameters: higher value = notified earlier. The default
# for user apps is 0x280. We use 0x300 so we shut down before normal apps but
# still inside the user-app range (0x100–0x3FF).
_SHUTDOWN_PRIORITY = 0x300
# Bound how long we wait inside WM_ENDSESSION for the lifespan to finish.
# Windows' GUI-app ceiling (with a shutdown block reason set) is 20 s; leave
# a safety margin so we always destroy the block reason and return cleanly.
_SHUTDOWN_WAIT_SECONDS = 18.0
# -- Win32 typedefs ----------------------------------------------------------
# LRESULT / LPARAM are LONG_PTR (signed, pointer-sized: 32-bit on x86, 64-bit
# on x64). wintypes.LPARAM is already c_ssize_t, but there is no LRESULT in
# wintypes — we define it here so the WNDPROC return value is the right size.
LRESULT = ctypes.c_ssize_t
# -- WindowProc signature ----------------------------------------------------
# Callback prototype: LRESULT WindowProc(HWND, UINT msg, WPARAM, LPARAM).
_WNDPROC = ctypes.WINFUNCTYPE(
    LRESULT,
    wintypes.HWND,
    wintypes.UINT,
    wintypes.WPARAM,
    wintypes.LPARAM,
)
class _WNDCLASS(ctypes.Structure):
    """ctypes layout of the window-class record passed to ``RegisterClassW``."""

    # Field order defines the binary layout handed to Win32 — do not reorder.
    _fields_ = [
        ("style", wintypes.UINT),
        ("lpfnWndProc", _WNDPROC),
        ("cbClsExtra", ctypes.c_int),
        ("cbWndExtra", ctypes.c_int),
        ("hInstance", wintypes.HINSTANCE),
        ("hIcon", wintypes.HICON),
        ("hCursor", wintypes.HANDLE),
        ("hbrBackground", wintypes.HBRUSH),
        ("lpszMenuName", wintypes.LPCWSTR),
        ("lpszClassName", wintypes.LPCWSTR),
    ]
class _MSG(ctypes.Structure):
    """ctypes layout of the message record filled in by ``GetMessageW``.

    The cursor position is declared as two separate ``LONG`` fields
    (``pt_x`` / ``pt_y``) rather than as a nested point structure.
    """

    # Field order defines the binary layout handed to Win32 — do not reorder.
    _fields_ = [
        ("hwnd", wintypes.HWND),
        ("message", wintypes.UINT),
        ("wParam", wintypes.WPARAM),
        ("lParam", wintypes.LPARAM),
        ("time", wintypes.DWORD),
        ("pt_x", wintypes.LONG),
        ("pt_y", wintypes.LONG),
    ]
def _bind_winapi() -> None:
    """Attach explicit ``argtypes``/``restype`` to every Win32 call we make.

    Without a declared prototype ctypes converts integer arguments as a
    32-bit ``c_int``. A 64-bit LPARAM that Windows hands to our WindowProc
    then cannot be passed on to ``DefWindowProcW``: the call fails with
    ``OverflowError: int too long to convert`` and the message is dropped.
    Declaring the prototypes once at import is safe and idempotent.
    """
    user32 = ctypes.windll.user32
    kernel32 = ctypes.windll.kernel32

    msg_ptr = ctypes.POINTER(_MSG)
    # (library, function name, argtypes, restype) — one row per function.
    prototypes = (
        (
            user32,
            "DefWindowProcW",
            [wintypes.HWND, wintypes.UINT, wintypes.WPARAM, wintypes.LPARAM],
            LRESULT,
        ),
        (
            user32,
            "GetMessageW",
            [msg_ptr, wintypes.HWND, wintypes.UINT, wintypes.UINT],
            wintypes.BOOL,
        ),
        (user32, "TranslateMessage", [msg_ptr], wintypes.BOOL),
        (user32, "DispatchMessageW", [msg_ptr], LRESULT),
        (
            user32,
            "PostMessageW",
            [wintypes.HWND, wintypes.UINT, wintypes.WPARAM, wintypes.LPARAM],
            wintypes.BOOL,
        ),
        (user32, "PostQuitMessage", [ctypes.c_int], None),
        (user32, "RegisterClassW", [ctypes.POINTER(_WNDCLASS)], wintypes.ATOM),
        (
            user32,
            "CreateWindowExW",
            [
                wintypes.DWORD,
                wintypes.LPCWSTR,
                wintypes.LPCWSTR,
                wintypes.DWORD,
                ctypes.c_int,
                ctypes.c_int,
                ctypes.c_int,
                ctypes.c_int,
                wintypes.HWND,
                wintypes.HMENU,
                wintypes.HINSTANCE,
                wintypes.LPVOID,
            ],
            wintypes.HWND,
        ),
        (
            user32,
            "ShutdownBlockReasonCreate",
            [wintypes.HWND, wintypes.LPCWSTR],
            wintypes.BOOL,
        ),
        (user32, "ShutdownBlockReasonDestroy", [wintypes.HWND], wintypes.BOOL),
        (kernel32, "GetModuleHandleW", [wintypes.LPCWSTR], wintypes.HMODULE),
        (
            kernel32,
            "SetProcessShutdownParameters",
            [wintypes.DWORD, wintypes.DWORD],
            wintypes.BOOL,
        ),
    )
    for library, func_name, arg_types, result_type in prototypes:
        func = getattr(library, func_name)
        func.argtypes = arg_types
        func.restype = result_type
# Bind the Win32 prototypes once, at import time. Skipped on other platforms,
# where ``ctypes.windll`` does not exist.
if is_windows():
    _bind_winapi()
class WindowsShutdownGuard:
"""Catches Windows session-end notifications and runs a cleanup callback.
Usage::
complete = threading.Event()
guard = WindowsShutdownGuard(
on_shutdown=lambda: server.should_exit_set_true(),
shutdown_complete=complete,
)
guard.start()
... # app runs
# When your cleanup is finished, signal so Windows can finish ending the session.
complete.set()
"""
def __init__(
self,
on_shutdown: Callable[[], None],
shutdown_complete: threading.Event,
app_name: str = "LED Grab",
wait_seconds: float = _SHUTDOWN_WAIT_SECONDS,
) -> None:
self._on_shutdown = on_shutdown
self._shutdown_complete = shutdown_complete
self._app_name = app_name
self._wait_seconds = wait_seconds
self._hwnd: Optional[wintypes.HWND] = None
self._thread: Optional[threading.Thread] = None
self._ready = threading.Event()
self._fired = False # idempotency: only trigger on_shutdown once
# We must keep a strong reference to the WindowProc callback for the
# entire lifetime of the window — Win32 stores a raw pointer and
# ctypes will free it the moment the Python object is GC'd, which
# would crash inside the message pump.
self._wndproc_ref = _WNDPROC(self._wndproc)
# -- public API ----------------------------------------------------------
def start(self) -> bool:
"""Start the shutdown-guard thread. Returns False on non-Windows."""
if not is_windows():
return False
try:
self._raise_shutdown_priority()
except Exception as e: # pragma: no cover - best-effort
logger.warning("SetProcessShutdownParameters failed: %s", e)
self._thread = threading.Thread(
target=self._run,
name="windows-shutdown-guard",
daemon=True,
)
self._thread.start()
# Wait briefly so callers can rely on the window existing before they
# return from start(). If the thread fails to create the window we
# still return — the failure is logged from inside the thread.
self._ready.wait(timeout=2.0)
return self._hwnd is not None
def stop(self) -> None:
"""Tear down the hidden window. Safe to call from any thread."""
hwnd = self._hwnd
if hwnd is None:
return
try:
ctypes.windll.user32.PostMessageW(hwnd, _WM_CLOSE, 0, 0)
except Exception: # pragma: no cover - shutdown best-effort
pass
# -- internals -----------------------------------------------------------
@staticmethod
def _raise_shutdown_priority() -> None:
kernel32 = ctypes.windll.kernel32
if not kernel32.SetProcessShutdownParameters(_SHUTDOWN_PRIORITY, 0):
# Failure is benign — the WM_ENDSESSION path still works without
# an elevated priority, we'd just be notified slightly later in
# the shutdown ordering. Don't even warn.
err = ctypes.get_last_error()
logger.debug("SetProcessShutdownParameters returned 0 (err=%d)", err)
def _run(self) -> None:
try:
self._create_window()
except Exception:
logger.exception("Failed to create Windows shutdown-guard window")
self._ready.set()
return
self._ready.set()
self._pump_messages()
def _create_window(self) -> None:
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32
h_instance = kernel32.GetModuleHandleW(None)
class_name = f"LedGrabShutdownGuard_{id(self)}"
wc = _WNDCLASS()
wc.style = _CS_HREDRAW | _CS_VREDRAW
wc.lpfnWndProc = self._wndproc_ref
wc.cbClsExtra = 0
wc.cbWndExtra = 0
wc.hInstance = h_instance
wc.hIcon = None
wc.hCursor = None
wc.hbrBackground = None
wc.lpszMenuName = None
wc.lpszClassName = class_name
atom = user32.RegisterClassW(ctypes.byref(wc))
if not atom:
err = ctypes.get_last_error()
raise OSError(f"RegisterClassW failed (err={err})")
# Top-level (parent=NULL), but invisible and off-screen, no taskbar
# entry, and not focusable. Crucially NOT HWND_MESSAGE — message-only
# windows do not receive shutdown broadcasts.
hwnd = user32.CreateWindowExW(
_WS_EX_TOOLWINDOW | _WS_EX_NOACTIVATE,
class_name,
self._app_name,
_WS_OVERLAPPED,
-32000, # off-screen x
-32000, # off-screen y
0, # width
0, # height
None, # parent
None, # menu
h_instance,
None, # lpParam
)
if not hwnd:
err = ctypes.get_last_error()
raise OSError(f"CreateWindowExW failed (err={err})")
self._hwnd = hwnd
logger.info("Windows shutdown guard ready (hwnd=%s)", hwnd)
def _pump_messages(self) -> None:
    """Run a ``GetMessageW`` loop until WM_QUIT or an error.

    Each retrieved message is translated and dispatched. A return of 0
    (WM_QUIT) ends the loop quietly; a return of -1 is logged as an error
    and also ends it.
    """
    user32 = ctypes.windll.user32
    message = _MSG()
    message_ref = ctypes.byref(message)

    pumping = True
    while pumping:
        status = user32.GetMessageW(message_ref, None, 0, 0)
        if status == 0:  # WM_QUIT
            pumping = False
        elif status == -1:  # error
            logger.error("GetMessageW returned -1; exiting shutdown guard")
            pumping = False
        else:
            user32.TranslateMessage(message_ref)
            user32.DispatchMessageW(message_ref)
# -- WindowProc ----------------------------------------------------------
def _wndproc(
    self,
    hwnd: wintypes.HWND,
    msg: wintypes.UINT,
    wparam: wintypes.WPARAM,
    lparam: wintypes.LPARAM,
) -> int:
    """Window procedure for the hidden guard window.

    * ``WM_QUERYENDSESSION``: starts shutdown via ``_begin_shutdown`` and
      returns 1 (TRUE) so the session is allowed to end.
    * ``WM_ENDSESSION``: when ``wparam`` is truthy, blocks in
      ``_wait_for_cleanup`` before returning 0; when it is 0 the session
      end was cancelled and nothing is waited for.
    * ``WM_DESTROY``: posts WM_QUIT and returns 0.
    * Anything else is forwarded to ``DefWindowProcW``.

    Returns:
        The message result handed back to Windows.
    """
    user32 = ctypes.windll.user32

    if msg == _WM_QUERYENDSESSION:
        logger.warning("WM_QUERYENDSESSION received (lParam=0x%x) — beginning shutdown", lparam)
        self._begin_shutdown(hwnd)
        return 1  # TRUE — allow the session to end

    if msg == _WM_ENDSESSION:
        logger.warning("WM_ENDSESSION received (wParam=%s)", wparam)
        # wParam=0 means the session was cancelled; nothing to clean up.
        if wparam:
            # If the session is ending but WM_QUERYENDSESSION never reached
            # us, the shutdown callback has not run yet and the wait below
            # could only time out. _begin_shutdown is a no-op when shutdown
            # has already been started, so calling it here is safe.
            self._begin_shutdown(hwnd)
            self._wait_for_cleanup(hwnd)
        return 0

    if msg == _WM_DESTROY:
        user32.PostQuitMessage(0)
        return 0

    return user32.DefWindowProcW(hwnd, msg, wparam, lparam)
def _begin_shutdown(self, hwnd: wintypes.HWND) -> None:
    """Register a shutdown block reason and invoke the shutdown callback.

    Runs at most once per guard: later calls return immediately because
    ``self._fired`` is already set. Both steps are best-effort; a failure
    in either is logged and never propagated.

    Args:
        hwnd: Handle of the guard window the block reason is attached to.
    """
    if self._fired:
        return
    self._fired = True

    reason = f"{self._app_name} is stopping LED targets and saving settings…"
    try:
        created = ctypes.windll.user32.ShutdownBlockReasonCreate(hwnd, reason)
        if not created:
            # The API reports failure through its BOOL return value rather
            # than by raising, so surface it instead of ignoring it.
            logger.warning(
                "ShutdownBlockReasonCreate returned 0 (err=%d)",
                ctypes.GetLastError(),
            )
    except Exception as e:  # pragma: no cover - best-effort
        logger.warning("ShutdownBlockReasonCreate failed: %s", e)

    try:
        self._on_shutdown()
    except Exception:
        logger.exception("on_shutdown callback raised")
def _wait_for_cleanup(self, hwnd: wintypes.HWND) -> None:
    """Block until cleanup is signalled or the wait budget runs out.

    Waits on ``self._shutdown_complete`` for at most ``self._wait_seconds``
    and logs an error if the event was not set in time. The shutdown block
    reason on ``hwnd`` is destroyed afterwards in every case, best-effort.

    Args:
        hwnd: Handle of the guard window whose block reason is removed.
    """
    budget = self._wait_seconds
    try:
        if not self._shutdown_complete.wait(timeout=budget):
            logger.error(
                "Shutdown cleanup did not complete within %.0fs — proceeding anyway",
                budget,
            )
    finally:
        try:
            ctypes.windll.user32.ShutdownBlockReasonDestroy(hwnd)
        except Exception:  # pragma: no cover - best-effort
            pass
__all__ = ["WindowsShutdownGuard"]
+67
View File
@@ -0,0 +1,67 @@
"""Tests for the ``__main__`` entry-point helpers.
These cover the bits that aren't exercised by the FastAPI test client —
the signal-handler install path and the shutdown-state plumbing — so a
regression in the launcher can't silently break the user's
"stop targets on PC shutdown" guarantee.
"""
from __future__ import annotations
import signal
import threading
from types import SimpleNamespace
from ledgrab.__main__ import _install_signal_handlers, _request_shutdown
def test_request_shutdown_sets_should_exit() -> None:
    """``_request_shutdown`` must flip ``should_exit`` on the server object."""
    fake_server = SimpleNamespace(should_exit=False)

    _request_shutdown(fake_server)

    assert fake_server.should_exit is True
def test_install_signal_handlers_installs_for_known_signals() -> None:
    """Tray path runs uvicorn on a background thread, so our handlers must
    actually survive — verify each catchable signal is replaced.

    The handlers for every signal this test may touch are snapshotted up
    front and restored afterwards, so none of them leak into later tests.
    """
    server = SimpleNamespace(should_exit=False)
    signal_names = ("SIGINT", "SIGTERM", "SIGBREAK")
    # Snapshot ALL signals that _install_signal_handlers may replace, not
    # just SIGINT; otherwise SIGTERM/SIGBREAK stay hijacked after the test.
    previous = {
        name: signal.getsignal(getattr(signal, name))
        for name in signal_names
        if hasattr(signal, name)
    }
    try:
        _install_signal_handlers(server)
        for name in previous:
            current = signal.getsignal(getattr(signal, name))
            # The handler is our local closure — its qualname starts with the function it's defined in.
            assert callable(current), f"{name} handler should be installed"
            assert getattr(current, "__qualname__", "").startswith(
                "_install_signal_handlers"
            ), f"{name} should be replaced by our handler, got {current!r}"
    finally:
        # Restore original handlers so the rest of the test suite isn't poisoned.
        for name, handler in previous.items():
            if handler is None:
                # getsignal() returns None for handlers not installed from
                # Python; signal.signal() would reject that value.
                continue
            signal.signal(getattr(signal, name), handler)
def test_shutdown_state_is_shared_threading_event() -> None:
    """The leaf module's completion flag must be a ``threading.Event`` and,
    when ``ledgrab.main`` can be imported, ``main`` must expose that very
    same object rather than a fresh Event of its own.
    """
    from ledgrab.shutdown_state import shutdown_complete as leaf_event

    assert isinstance(leaf_event, threading.Event)

    # If main.py is importable, confirm it re-exports the same object.
    try:
        from ledgrab.main import shutdown_complete as reexported_event
    except Exception:
        # main.py needs full app state — fine to skip on a bare test run.
        return
    else:
        assert (
            reexported_event is leaf_event
        ), "main.py must re-export the same Event, not create a new one"
+168
View File
@@ -0,0 +1,168 @@
"""Tests for the Windows shutdown guard.
The guard is a no-op outside Windows, so the cross-platform tests just
check that ``start()`` returns ``False`` and never touches Win32.
On Windows we exercise the full WM_QUERYENDSESSION → WM_ENDSESSION
sequence end-to-end by ``SendMessage``-ing the hidden window directly:
the guard should fire the callback synchronously, then block in
WM_ENDSESSION until the completion event is signalled.
"""
from __future__ import annotations
import sys
import threading
import time
import pytest
from ledgrab.utils.win_shutdown import WindowsShutdownGuard
IS_WINDOWS = sys.platform == "win32"
@pytest.mark.skipif(IS_WINDOWS, reason="Non-Windows behaviour")
def test_start_returns_false_off_windows() -> None:
    """Off Windows, ``start()`` must return ``False``."""
    completion = threading.Event()
    guard = WindowsShutdownGuard(on_shutdown=lambda: None, shutdown_complete=completion)

    started = guard.start()

    assert started is False
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
def test_start_creates_hidden_window() -> None:
    """On Windows, ``start()`` returns ``True`` and records a window handle."""
    completion = threading.Event()
    guard = WindowsShutdownGuard(on_shutdown=lambda: None, shutdown_complete=completion)
    try:
        started = guard.start()
        assert started is True
        assert guard._hwnd is not None
    finally:
        guard.stop()
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
def test_query_endsession_fires_callback_and_returns_true() -> None:
    """WM_QUERYENDSESSION must run the callback once and answer TRUE (1)."""
    import ctypes

    wm_query_end_session = 0x0011
    calls: list[str] = []

    def record_call() -> None:
        calls.append("cb")

    guard = WindowsShutdownGuard(
        on_shutdown=record_call,
        shutdown_complete=threading.Event(),
        wait_seconds=0.5,
    )
    try:
        assert guard.start() is True
        send_message = ctypes.windll.user32.SendMessageW
        reply = send_message(guard._hwnd, wm_query_end_session, 0, 0)
        assert reply == 1, "WM_QUERYENDSESSION must return TRUE so Windows ends the session"
        assert calls == ["cb"], "shutdown callback should fire exactly once on WM_QUERYENDSESSION"
    finally:
        guard.stop()
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
def test_query_endsession_is_idempotent() -> None:
    """A repeated WM_QUERYENDSESSION must not invoke the callback again."""
    import ctypes

    wm_query_end_session = 0x0011
    calls: list[str] = []

    def record_call() -> None:
        calls.append("cb")

    guard = WindowsShutdownGuard(
        on_shutdown=record_call,
        shutdown_complete=threading.Event(),
        wait_seconds=0.5,
    )
    try:
        assert guard.start() is True
        send_message = ctypes.windll.user32.SendMessageW
        # Deliver the same message twice; only the first may fire the callback.
        for _ in range(2):
            send_message(guard._hwnd, wm_query_end_session, 0, 0)
        assert calls == ["cb"]
    finally:
        guard.stop()
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
def test_endsession_waits_for_completion_event() -> None:
    """WM_ENDSESSION (wParam=1) must block until the completion event is set.

    The event is set from a background timer after 0.2s, well inside the
    2.0s wait budget, so the send should return shortly after that.
    """
    import ctypes

    wm_end_session = 0x0016
    done = threading.Event()
    guard = WindowsShutdownGuard(
        on_shutdown=lambda: None,
        shutdown_complete=done,
        wait_seconds=2.0,
    )
    try:
        assert guard.start() is True

        # Signal completion from another thread after a short delay.
        releaser = threading.Timer(0.2, done.set)
        releaser.daemon = True
        releaser.start()

        started_at = time.monotonic()
        reply = ctypes.windll.user32.SendMessageW(guard._hwnd, wm_end_session, 1, 0)
        elapsed = time.monotonic() - started_at

        assert reply == 0
        waited_for_event = 0.15 < elapsed < 1.0
        assert waited_for_event, f"WM_ENDSESSION should wait for completion, took {elapsed:.2f}s"
    finally:
        guard.stop()
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
def test_endsession_gives_up_after_timeout() -> None:
    """WM_ENDSESSION must return after roughly ``wait_seconds`` even when the
    completion event is never set."""
    import ctypes

    wm_end_session = 0x0016
    never_set = threading.Event()
    guard = WindowsShutdownGuard(
        on_shutdown=lambda: None,
        shutdown_complete=never_set,
        wait_seconds=0.3,
    )
    try:
        assert guard.start() is True

        started_at = time.monotonic()
        reply = ctypes.windll.user32.SendMessageW(guard._hwnd, wm_end_session, 1, 0)
        elapsed = time.monotonic() - started_at

        assert reply == 0
        timed_out_on_schedule = 0.25 < elapsed < 1.0
        assert (
            timed_out_on_schedule
        ), f"WM_ENDSESSION must time out near wait_seconds, took {elapsed:.2f}s"
    finally:
        guard.stop()
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
def test_endsession_with_cancel_does_not_wait() -> None:
    """WM_ENDSESSION with wParam=0 (session end cancelled) must return at once,
    without waiting on the completion event."""
    import ctypes

    wm_end_session = 0x0016
    never_set = threading.Event()
    guard = WindowsShutdownGuard(
        on_shutdown=lambda: None,
        shutdown_complete=never_set,
        wait_seconds=5.0,
    )
    try:
        assert guard.start() is True

        started_at = time.monotonic()
        reply = ctypes.windll.user32.SendMessageW(guard._hwnd, wm_end_session, 0, 0)
        elapsed = time.monotonic() - started_at

        assert reply == 0
        assert elapsed < 0.2, f"WM_ENDSESSION with wParam=0 should be instant, took {elapsed:.2f}s"
    finally:
        guard.stop()