fix(shutdown): survive PC restart with WAL fsync + Win32 session-end guard
Two bugs caused user data ('G502' target's color-strip ref, etc.) to
revert after PC restart while persisting fine across normal app
restarts:
1. SQLite was in WAL mode with synchronous=NORMAL and Database.close()
was never called. On graceful Python exit the sqlite3 finalizer
checkpoints the WAL, but on an unclean PC shutdown (power loss,
forced reboot, or Windows force-terminating pythonw.exe) the WAL
stayed in OS cache, never reached disk, and the next boot rolled the
DB back to the last checkpoint -- losing recent edits.
2. Nothing handled WM_QUERYENDSESSION / WM_ENDSESSION, so on PC
shutdown Windows force-killed pythonw.exe after ~5s and the FastAPI
lifespan never ran. The 'stop_targets' setting was silently ignored
and devices were left at their last frame.
Changes:
- Database: PRAGMA synchronous=FULL + wal_autocheckpoint=100, plus an
explicit wal_checkpoint(TRUNCATE) inside Database.close().
- New utils/win_shutdown.py: hidden top-level window in a daemon thread
with a ctypes WindowProc that catches WM_QUERYENDSESSION (calls
ShutdownBlockReasonCreate to extend Windows' 5s hung-app timeout up
to the ~20s GUI ceiling), fires the shutdown callback, then waits in
WM_ENDSESSION on a completion event before returning. Also raises
the process shutdown priority via SetProcessShutdownParameters. All
Win32 argtypes/restypes are bound once at import to avoid LPARAM
overflow on x64.
- New shutdown_state.py: leaf module owning the cross-thread Event so
__main__ does not import the heavy ledgrab.main at startup.
- main.py lifespan: per-step asyncio.wait_for budgets (8s for
processor_manager.stop_all, 1.5s each for HA/MQTT, etc.) so a hung
device cannot starve the DB checkpoint, then db.close() and
shutdown_complete.set() always run.
- __main__.py: install the Windows shutdown guard before tray start;
install SIGINT/SIGTERM/SIGBREAK handlers only on the tray path
(uvicorn overwrites them on no-tray); raise server_thread.join to 20s.
- Tests cover WM_QUERYENDSESSION (fires callback, returns TRUE,
idempotent), WM_ENDSESSION (waits on event, times out cleanly,
cancel-path returns instantly), signal handler installation, and
that main and shutdown_state share the same Event instance.
This commit is contained in:
@@ -6,6 +6,7 @@ shows a system-tray icon with **Show UI** / **Exit** actions.
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
@@ -42,6 +43,8 @@ from ledgrab.config import get_config # noqa: E402
|
||||
from ledgrab.server_ref import set_server, set_tray # noqa: E402
|
||||
from ledgrab.tray import PYSTRAY_AVAILABLE, TrayManager # noqa: E402
|
||||
from ledgrab.utils import setup_logging, get_logger # noqa: E402
|
||||
from ledgrab.utils.platform import is_windows # noqa: E402
|
||||
from ledgrab.utils.win_shutdown import WindowsShutdownGuard # noqa: E402
|
||||
|
||||
setup_logging()
|
||||
logger = get_logger(__name__)
|
||||
@@ -117,10 +120,22 @@ def main() -> None:
|
||||
server = uvicorn.Server(uv_config)
|
||||
set_server(server)
|
||||
|
||||
# Wire the OS-shutdown safety net. The lifespan in ``ledgrab.main`` signals
|
||||
# ``shutdown_complete`` once it has stopped targets and checkpointed the
|
||||
# DB; the Windows guard waits on that event before letting the OS finish
|
||||
# ending the session. Without this, the entire shutdown lifespan never
|
||||
# runs on PC reboot — devices stay on and the SQLite WAL is lost.
|
||||
guard = _install_os_shutdown_guard(server)
|
||||
|
||||
use_tray = PYSTRAY_AVAILABLE and (sys.platform == "win32" or _force_tray())
|
||||
|
||||
if use_tray:
|
||||
logger.info("Starting with system tray icon")
|
||||
# Install signal handlers BEFORE starting the uvicorn thread so a
|
||||
# SIGINT/SIGBREAK during startup still triggers a clean shutdown.
|
||||
# We do NOT install them on the no-tray path because uvicorn's
|
||||
# ``server.run()`` overwrites SIGINT/SIGTERM with its own handlers.
|
||||
_install_signal_handlers(server)
|
||||
|
||||
# Uvicorn in a background thread
|
||||
server_thread = threading.Thread(
|
||||
@@ -147,12 +162,20 @@ def main() -> None:
|
||||
set_tray(tray)
|
||||
tray.run()
|
||||
|
||||
# Tray exited — wait for server to finish its graceful shutdown
|
||||
server_thread.join(timeout=10)
|
||||
# Tray exited — wait for server to finish its graceful shutdown.
|
||||
# Use a longer join than the lifespan's own ~18 s budget so we don't
|
||||
# cut the DB checkpoint short on a slow disk.
|
||||
server_thread.join(timeout=20)
|
||||
if guard is not None:
|
||||
guard.stop()
|
||||
else:
|
||||
if not PYSTRAY_AVAILABLE:
|
||||
logger.info("System tray not available (install pystray for tray support)")
|
||||
server.run()
|
||||
try:
|
||||
server.run()
|
||||
finally:
|
||||
if guard is not None:
|
||||
guard.stop()
|
||||
|
||||
|
||||
def _request_shutdown(server: uvicorn.Server) -> None:
|
||||
@@ -160,6 +183,57 @@ def _request_shutdown(server: uvicorn.Server) -> None:
|
||||
server.should_exit = True
|
||||
|
||||
|
||||
def _install_os_shutdown_guard(server: uvicorn.Server) -> "WindowsShutdownGuard | None":
|
||||
"""Install the OS-shutdown safety net (Windows only).
|
||||
|
||||
Returns the guard so the caller can ``stop()`` it on normal exit, or
|
||||
``None`` on platforms where no guard is needed.
|
||||
"""
|
||||
if not is_windows():
|
||||
return None
|
||||
|
||||
# ``shutdown_state`` is a leaf module — importing it does NOT pull in
|
||||
# ``ledgrab.main`` and its global stores. uvicorn loads ``main`` lazily
|
||||
# via the import string ``"ledgrab.main:app"`` once it starts serving.
|
||||
from ledgrab.shutdown_state import shutdown_complete
|
||||
|
||||
guard = WindowsShutdownGuard(
|
||||
on_shutdown=lambda: _request_shutdown(server),
|
||||
shutdown_complete=shutdown_complete,
|
||||
)
|
||||
if guard.start():
|
||||
logger.info("Windows shutdown guard installed")
|
||||
else:
|
||||
logger.warning("Windows shutdown guard failed to start")
|
||||
return guard
|
||||
|
||||
|
||||
def _install_signal_handlers(server: uvicorn.Server) -> None:
|
||||
"""Catch terminal/admin shutdown signals and trigger graceful exit.
|
||||
|
||||
Uvicorn already installs SIGINT/SIGTERM handlers when ``server.run()``
|
||||
is called on the main thread (the no-tray path). For the tray path,
|
||||
uvicorn runs on a background thread and skips signal installation, so
|
||||
we install our own here. SIGBREAK is Windows-specific and fires on
|
||||
Ctrl-Break and in some service-stop scenarios.
|
||||
"""
|
||||
|
||||
def _handler(signum, frame): # noqa: ANN001 - signal handler signature
|
||||
logger.warning("Signal %s received — requesting shutdown", signum)
|
||||
_request_shutdown(server)
|
||||
|
||||
candidates = ["SIGINT", "SIGTERM", "SIGBREAK"]
|
||||
for name in candidates:
|
||||
sig = getattr(signal, name, None)
|
||||
if sig is None:
|
||||
continue
|
||||
try:
|
||||
signal.signal(sig, _handler)
|
||||
except (ValueError, OSError) as e:
|
||||
# ValueError: not on main thread; OSError: signal not supported here.
|
||||
logger.debug("Could not install handler for %s: %s", name, e)
|
||||
|
||||
|
||||
def _force_tray() -> bool:
|
||||
"""Allow forcing tray on non-Windows via LEDGRAB_TRAY=1."""
|
||||
import os
|
||||
|
||||
+110
-38
@@ -1,8 +1,10 @@
|
||||
"""FastAPI application entry point."""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Awaitable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
@@ -50,6 +52,7 @@ import ledgrab.core.game_integration.adapters # noqa: F401 — register built-i
|
||||
from ledgrab.core.game_integration.community_loader import register_community_adapters
|
||||
from ledgrab.core.mqtt.mqtt_manager import MQTTManager
|
||||
from ledgrab.storage.mqtt_source_store import MQTTSourceStore
|
||||
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
|
||||
from ledgrab.storage.audio_processing_template_store import AudioProcessingTemplateStore
|
||||
from ledgrab.storage.pattern_template_store import PatternTemplateStore
|
||||
import ledgrab.core.audio.filters # noqa: F401 — trigger audio filter auto-registration
|
||||
@@ -69,6 +72,10 @@ logger = get_logger(__name__)
|
||||
# Get configuration
|
||||
config = get_config()
|
||||
|
||||
# The shutdown-complete signal is owned by a leaf module so ``__main__``
|
||||
# can import it without dragging in this module's heavy global state.
|
||||
from ledgrab.shutdown_state import shutdown_complete # noqa: E402
|
||||
|
||||
|
||||
def _migrate_legacy_data_location() -> None:
|
||||
"""Rescue data from pre-rename cwd-relative paths.
|
||||
@@ -166,6 +173,7 @@ ha_store = HomeAssistantStore(db)
|
||||
ha_manager = HomeAssistantManager(ha_store)
|
||||
mqtt_source_store = MQTTSourceStore(db)
|
||||
mqtt_manager = MQTTManager(mqtt_source_store)
|
||||
http_endpoint_store = HTTPEndpointStore(db)
|
||||
audio_processing_template_store = AudioProcessingTemplateStore(db)
|
||||
game_integration_store = GameIntegrationStore(db)
|
||||
pattern_template_store = PatternTemplateStore(db)
|
||||
@@ -191,6 +199,7 @@ processor_manager = ProcessorManager(
|
||||
mqtt_manager=mqtt_manager,
|
||||
game_event_bus=game_event_bus,
|
||||
audio_processing_template_store=audio_processing_template_store,
|
||||
http_endpoint_store=http_endpoint_store,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -247,7 +256,9 @@ async def lifespan(app: FastAPI):
|
||||
except Exception as e:
|
||||
logger.error("Legacy MQTT migration failed: %s", e)
|
||||
|
||||
# Create automation engine (needs processor_manager + MQTT manager + stores for scene activation)
|
||||
# Create automation engine. HTTPPollRule evaluation reads from a
|
||||
# ValueStream produced by the ValueStreamManager (which lives inside
|
||||
# the processor manager), so the engine needs that handle.
|
||||
automation_engine = AutomationEngine(
|
||||
automation_store,
|
||||
processor_manager,
|
||||
@@ -256,6 +267,8 @@ async def lifespan(app: FastAPI):
|
||||
device_store=device_store,
|
||||
ha_manager=ha_manager,
|
||||
mqtt_manager=mqtt_manager,
|
||||
value_stream_manager=processor_manager.value_stream_manager,
|
||||
value_source_store=value_source_store,
|
||||
)
|
||||
|
||||
# Create auto-backup engine — derive paths from database location so that
|
||||
@@ -309,6 +322,7 @@ async def lifespan(app: FastAPI):
|
||||
game_event_bus=game_event_bus,
|
||||
mqtt_store=mqtt_source_store,
|
||||
mqtt_manager=mqtt_manager,
|
||||
http_endpoint_store=http_endpoint_store,
|
||||
audio_processing_template_store=audio_processing_template_store,
|
||||
pattern_template_store=pattern_template_store,
|
||||
)
|
||||
@@ -385,28 +399,39 @@ async def lifespan(app: FastAPI):
|
||||
yield
|
||||
|
||||
# Shutdown
|
||||
#
|
||||
# Each step has a strict time budget. Windows gives a GUI app with a
|
||||
# shutdown-block-reason set ~20 s before it force-terminates the
|
||||
# process; if any single step stalls (network call to a dead WLED, a
|
||||
# zombie MQTT broker), we MUST keep moving so the steps that actually
|
||||
# protect the user's state — device restore frames and the DB
|
||||
# checkpoint — still get to run.
|
||||
logger.info("Shutting down LED Grab")
|
||||
|
||||
# Persist all stores to disk before stopping anything.
|
||||
# This ensures in-memory data survives force-kills and restarts
|
||||
# where no CRUD happened during the session.
|
||||
_save_all_stores()
|
||||
async def _bounded(label: str, coro: Awaitable, timeout: float) -> None:
|
||||
try:
|
||||
await asyncio.wait_for(coro, timeout=timeout)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error("Shutdown step '%s' exceeded %.1fs — moving on", label, timeout)
|
||||
except Exception as e:
|
||||
logger.error("Shutdown step '%s' raised: %s", label, e)
|
||||
|
||||
# Legacy hook — SQLite stores are write-through so this only logs.
|
||||
# Durability comes from PRAGMA synchronous=FULL + the explicit
|
||||
# wal_checkpoint(TRUNCATE) in Database.close() at the end of this block.
|
||||
try:
|
||||
_save_all_stores()
|
||||
except Exception as e:
|
||||
logger.error(f"Error persisting stores: {e}")
|
||||
|
||||
# Stop automation engine first so it can no longer activate scenes that
|
||||
# would talk to processors mid-shutdown.
|
||||
try:
|
||||
await automation_engine.stop()
|
||||
logger.info("Stopped automation engine")
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping automation engine: {e}")
|
||||
await _bounded("automation_engine.stop", automation_engine.stop(), timeout=1.5)
|
||||
|
||||
# Stop discovery watcher and OS notification listener so they stop
|
||||
# firing events into a shutting-down processor manager.
|
||||
if discovery_watcher is not None:
|
||||
try:
|
||||
await discovery_watcher.stop()
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping discovery watcher: {e}")
|
||||
await _bounded("discovery_watcher.stop", discovery_watcher.stop(), timeout=1.0)
|
||||
|
||||
try:
|
||||
os_notif_listener.stop()
|
||||
@@ -432,22 +457,18 @@ async def lifespan(app: FastAPI):
|
||||
action = "stop_targets"
|
||||
|
||||
logger.info("Shutdown action: %s", action)
|
||||
try:
|
||||
await processor_manager.stop_all(restore_devices=action != "nothing")
|
||||
logger.info("Stopped all processors")
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping processors: {e}")
|
||||
# This is the step that *implements* the user's stop_targets setting.
|
||||
# Give it the largest slice of the budget.
|
||||
await _bounded(
|
||||
"processor_manager.stop_all",
|
||||
processor_manager.stop_all(restore_devices=action != "nothing"),
|
||||
timeout=8.0,
|
||||
)
|
||||
logger.info("Stopped all processors")
|
||||
|
||||
# Now safe to tear down the connections that processors depended on.
|
||||
try:
|
||||
await ha_manager.shutdown()
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping Home Assistant manager: {e}")
|
||||
|
||||
try:
|
||||
await mqtt_manager.shutdown()
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping MQTT manager: {e}")
|
||||
await _bounded("ha_manager.shutdown", ha_manager.shutdown(), timeout=1.5)
|
||||
await _bounded("mqtt_manager.shutdown", mqtt_manager.shutdown(), timeout=1.5)
|
||||
|
||||
# Independent services — order doesn't matter relative to processors.
|
||||
try:
|
||||
@@ -455,26 +476,37 @@ async def lifespan(app: FastAPI):
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping weather manager: {e}")
|
||||
|
||||
await _bounded("update_service.stop", update_service.stop(), timeout=0.5)
|
||||
await _bounded("auto_backup_engine.stop", auto_backup_engine.stop(), timeout=0.5)
|
||||
|
||||
# Close the DB last so it runs a TRUNCATE checkpoint, flushing the WAL
|
||||
# into the main file. Without this, writes can survive a graceful app
|
||||
# restart (Python finalizer checkpoints on GC) but be lost on a later
|
||||
# unclean PC shutdown — the symptom users see as "my fix reverted after
|
||||
# rebooting the PC."
|
||||
try:
|
||||
await update_service.stop()
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping update checker: {e}")
|
||||
logger.error(f"Error closing database: {e}")
|
||||
|
||||
try:
|
||||
await auto_backup_engine.stop()
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping auto-backup engine: {e}")
|
||||
# Tell any external supervisor (Windows shutdown guard, tray) that
|
||||
# cleanup is done so Windows can finish ending the session promptly.
|
||||
shutdown_complete.set()
|
||||
logger.info("Shutdown complete")
|
||||
|
||||
|
||||
# Create FastAPI application
|
||||
# Create FastAPI application. The built-in ``/docs``, ``/redoc``, and
|
||||
# ``/openapi.json`` routes are disabled here so they can be re-added below
|
||||
# with an :data:`AuthRequired` dependency — exposing the full OpenAPI surface
|
||||
# (route paths + parameter schemas) without auth is information disclosure.
|
||||
app = FastAPI(
|
||||
title="LED Grab",
|
||||
description="Control WLED devices based on screen content for ambient lighting",
|
||||
version=__version__,
|
||||
lifespan=lifespan,
|
||||
docs_url="/docs",
|
||||
redoc_url="/redoc",
|
||||
openapi_url="/openapi.json",
|
||||
docs_url=None,
|
||||
redoc_url=None,
|
||||
openapi_url=None,
|
||||
)
|
||||
|
||||
# Configure CORS
|
||||
@@ -521,6 +553,46 @@ async def _no_cache_static(request: Request, call_next):
|
||||
return await call_next(request)
|
||||
|
||||
|
||||
# Middleware: baseline security headers on every response. CSP is intentionally
|
||||
# omitted here because the UI uses inline event handlers / templates and a
|
||||
# wrong CSP value would break the app; the other three headers are universally
|
||||
# safe defaults and close several common browser-side attack vectors.
|
||||
@app.middleware("http")
|
||||
async def _security_headers(request: Request, call_next):
|
||||
response = await call_next(request)
|
||||
response.headers.setdefault("X-Content-Type-Options", "nosniff")
|
||||
response.headers.setdefault("X-Frame-Options", "DENY")
|
||||
response.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
|
||||
response.headers.setdefault(
|
||||
"Permissions-Policy",
|
||||
"geolocation=(), microphone=(), camera=(), payment=()",
|
||||
)
|
||||
return response
|
||||
|
||||
|
||||
# ── Auth-gated OpenAPI surface ────────────────────────────────────────────
|
||||
# Re-add the docs endpoints we disabled above, now protected by the same
|
||||
# Bearer auth as the rest of the API. When auth is unconfigured, loopback
|
||||
# clients still get in anonymously (per ``verify_api_key`` policy).
|
||||
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html # noqa: E402
|
||||
from ledgrab.api.auth import AuthRequired # noqa: E402
|
||||
|
||||
|
||||
@app.get("/openapi.json", include_in_schema=False)
|
||||
async def _openapi(_auth: AuthRequired):
|
||||
return JSONResponse(app.openapi())
|
||||
|
||||
|
||||
@app.get("/docs", include_in_schema=False)
|
||||
async def _swagger_docs(_auth: AuthRequired):
|
||||
return get_swagger_ui_html(openapi_url="/openapi.json", title=f"{app.title} — API docs")
|
||||
|
||||
|
||||
@app.get("/redoc", include_in_schema=False)
|
||||
async def _redoc_docs(_auth: AuthRequired):
|
||||
return get_redoc_html(openapi_url="/openapi.json", title=f"{app.title} — API docs")
|
||||
|
||||
|
||||
# Mount static files
|
||||
static_path = Path(__file__).parent / "static"
|
||||
if static_path.exists():
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
"""Cross-thread shutdown completion signal.
|
||||
|
||||
This module is intentionally tiny so importing it does not pull in the
|
||||
heavy global state (Database, stores, processor manager) instantiated at
|
||||
import time by ``ledgrab.main``. ``__main__`` imports it on the main
|
||||
thread before uvicorn loads ``ledgrab.main`` in its event-loop thread;
|
||||
both ends share the same ``threading.Event`` instance.
|
||||
|
||||
The lifespan in ``ledgrab.main`` calls ``shutdown_complete.set()`` at the
|
||||
very end of its teardown sequence (after stopping targets, flushing
|
||||
stores, and checkpointing the DB). External supervisors — the Windows
|
||||
OS-shutdown guard and the tray's "Shutdown" handler — wait on it so
|
||||
they release Windows / unblock only once cleanup is genuinely done.
|
||||
"""
|
||||
|
||||
import threading
|
||||
|
||||
shutdown_complete: threading.Event = threading.Event()
|
||||
@@ -57,6 +57,7 @@ _ENTITY_TABLES = [
|
||||
"assets",
|
||||
"home_assistant_sources",
|
||||
"mqtt_sources",
|
||||
"http_endpoints",
|
||||
"game_integrations",
|
||||
"audio_processing_templates",
|
||||
"pattern_templates",
|
||||
@@ -88,6 +89,14 @@ class Database:
|
||||
)
|
||||
self._conn.row_factory = sqlite3.Row
|
||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||
# synchronous=FULL fsyncs the WAL on every commit. Without it, writes
|
||||
# can be lost on an unclean PC shutdown (power loss, forced reboot):
|
||||
# the WAL stays in OS cache, never reaches disk, and the next startup
|
||||
# rolls back to the last checkpoint — silently losing recent edits.
|
||||
self._conn.execute("PRAGMA synchronous=FULL")
|
||||
# Auto-checkpoint the WAL into the main DB every N pages so the
|
||||
# window of unsynced data stays small even if close() is skipped.
|
||||
self._conn.execute("PRAGMA wal_autocheckpoint=100")
|
||||
self._conn.execute("PRAGMA busy_timeout=5000")
|
||||
self._lock = threading.RLock()
|
||||
|
||||
@@ -336,7 +345,16 @@ class Database:
|
||||
# -- Lifecycle -----------------------------------------------------------
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the database connection."""
|
||||
"""Close the database connection.
|
||||
|
||||
Runs a TRUNCATE checkpoint before closing so the WAL is fully merged
|
||||
into the main DB file. This protects against data loss if the OS
|
||||
loses the WAL between graceful app shutdown and a later PC shutdown.
|
||||
"""
|
||||
with self._lock:
|
||||
try:
|
||||
self._conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
|
||||
except sqlite3.Error as e:
|
||||
logger.warning("WAL checkpoint on close failed: %s", e)
|
||||
self._conn.close()
|
||||
logger.info("Database connection closed")
|
||||
|
||||
@@ -0,0 +1,417 @@
|
||||
"""Windows OS-shutdown handler.
|
||||
|
||||
Without this, ``pythonw.exe -m ledgrab`` is force-terminated by Windows at
|
||||
PC shutdown/restart/logoff: the FastAPI ``lifespan`` shutdown hook never
|
||||
runs, so the ``stop_targets`` setting is silently ignored and the SQLite
|
||||
WAL is never checkpointed (the user's most recent edits roll back on the
|
||||
next boot).
|
||||
|
||||
How it works
|
||||
------------
|
||||
|
||||
The fix is a hidden, top-level window in a daemon thread:
|
||||
|
||||
* Top-level — message-only windows (``HWND_MESSAGE``) do **not** receive
|
||||
shutdown broadcasts. We need a real top-level window. We make it
|
||||
invisible by never calling ``ShowWindow`` and by using
|
||||
``WS_EX_TOOLWINDOW`` (no taskbar entry, no Alt-Tab).
|
||||
* ``WM_QUERYENDSESSION`` arrives first. We immediately call
|
||||
``ShutdownBlockReasonCreate`` so Windows shows our reason on the
|
||||
shutdown UI **and** extends its hung-app timeout (default ~5 s) up to
|
||||
the GUI ceiling (~20 s). We then trigger the caller's shutdown
|
||||
callback (which sets ``uvicorn.Server.should_exit``).
|
||||
* ``WM_ENDSESSION`` arrives second. We wait on a ``threading.Event``
|
||||
that the FastAPI lifespan sets when it is fully torn down (including
|
||||
the DB checkpoint), then destroy the block reason and return.
|
||||
* ``SetProcessShutdownParameters`` raises our shutdown priority so we
|
||||
are notified before non-system apps.
|
||||
|
||||
This module is a no-op on non-Windows platforms.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ctypes
|
||||
import logging
|
||||
import threading
|
||||
from ctypes import wintypes
|
||||
from typing import Callable, Optional
|
||||
|
||||
from ledgrab.utils.platform import is_windows
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -- Win32 constants ---------------------------------------------------------
|
||||
|
||||
_WM_QUERYENDSESSION = 0x0011
|
||||
_WM_ENDSESSION = 0x0016
|
||||
_WM_CLOSE = 0x0010
|
||||
_WM_DESTROY = 0x0002
|
||||
|
||||
_CS_HREDRAW = 0x0002
|
||||
_CS_VREDRAW = 0x0001
|
||||
|
||||
_WS_OVERLAPPED = 0x00000000
|
||||
_WS_EX_TOOLWINDOW = 0x00000080
|
||||
_WS_EX_NOACTIVATE = 0x08000000
|
||||
|
||||
# SetProcessShutdownParameters: higher value = notified earlier. The default
|
||||
# for user apps is 0x280. We use 0x300 so we shut down before normal apps but
|
||||
# still inside the user-app range (0x100–0x3FF).
|
||||
_SHUTDOWN_PRIORITY = 0x300
|
||||
|
||||
# Bound how long we wait inside WM_ENDSESSION for the lifespan to finish.
|
||||
# Windows' GUI-app ceiling (with a shutdown block reason set) is 20 s; leave
|
||||
# a safety margin so we always destroy the block reason and return cleanly.
|
||||
_SHUTDOWN_WAIT_SECONDS = 18.0
|
||||
|
||||
|
||||
# -- Win32 typedefs ----------------------------------------------------------
|
||||
|
||||
# LRESULT / LPARAM are LONG_PTR (signed, pointer-sized: 32-bit on x86, 64-bit
|
||||
# on x64). wintypes.LPARAM is already c_ssize_t, but there is no LRESULT in
|
||||
# wintypes — we define it here so the WNDPROC return value is the right size.
|
||||
LRESULT = ctypes.c_ssize_t
|
||||
|
||||
|
||||
# -- WindowProc signature ----------------------------------------------------
|
||||
|
||||
_WNDPROC = ctypes.WINFUNCTYPE(
|
||||
LRESULT,
|
||||
wintypes.HWND,
|
||||
wintypes.UINT,
|
||||
wintypes.WPARAM,
|
||||
wintypes.LPARAM,
|
||||
)
|
||||
|
||||
|
||||
class _WNDCLASS(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("style", wintypes.UINT),
|
||||
("lpfnWndProc", _WNDPROC),
|
||||
("cbClsExtra", ctypes.c_int),
|
||||
("cbWndExtra", ctypes.c_int),
|
||||
("hInstance", wintypes.HINSTANCE),
|
||||
("hIcon", wintypes.HICON),
|
||||
("hCursor", wintypes.HANDLE),
|
||||
("hbrBackground", wintypes.HBRUSH),
|
||||
("lpszMenuName", wintypes.LPCWSTR),
|
||||
("lpszClassName", wintypes.LPCWSTR),
|
||||
]
|
||||
|
||||
|
||||
class _MSG(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("hwnd", wintypes.HWND),
|
||||
("message", wintypes.UINT),
|
||||
("wParam", wintypes.WPARAM),
|
||||
("lParam", wintypes.LPARAM),
|
||||
("time", wintypes.DWORD),
|
||||
("pt_x", wintypes.LONG),
|
||||
("pt_y", wintypes.LONG),
|
||||
]
|
||||
|
||||
|
||||
def _bind_winapi() -> None:
|
||||
"""Declare argtypes/restype for every Win32 function we call.
|
||||
|
||||
Without these, ctypes treats integer args as ``c_int`` (32-bit) — which
|
||||
silently overflows when Windows passes a 64-bit LPARAM into our
|
||||
WindowProc and we hand it back to ``DefWindowProcW``. The result is
|
||||
``OverflowError: int too long to convert`` and the message is dropped.
|
||||
Binding types once at import is the safe, idempotent fix.
|
||||
"""
|
||||
user32 = ctypes.windll.user32
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
|
||||
user32.DefWindowProcW.argtypes = [
|
||||
wintypes.HWND,
|
||||
wintypes.UINT,
|
||||
wintypes.WPARAM,
|
||||
wintypes.LPARAM,
|
||||
]
|
||||
user32.DefWindowProcW.restype = LRESULT
|
||||
|
||||
user32.GetMessageW.argtypes = [
|
||||
ctypes.POINTER(_MSG),
|
||||
wintypes.HWND,
|
||||
wintypes.UINT,
|
||||
wintypes.UINT,
|
||||
]
|
||||
user32.GetMessageW.restype = wintypes.BOOL
|
||||
|
||||
user32.TranslateMessage.argtypes = [ctypes.POINTER(_MSG)]
|
||||
user32.TranslateMessage.restype = wintypes.BOOL
|
||||
|
||||
user32.DispatchMessageW.argtypes = [ctypes.POINTER(_MSG)]
|
||||
user32.DispatchMessageW.restype = LRESULT
|
||||
|
||||
user32.PostMessageW.argtypes = [
|
||||
wintypes.HWND,
|
||||
wintypes.UINT,
|
||||
wintypes.WPARAM,
|
||||
wintypes.LPARAM,
|
||||
]
|
||||
user32.PostMessageW.restype = wintypes.BOOL
|
||||
|
||||
user32.PostQuitMessage.argtypes = [ctypes.c_int]
|
||||
user32.PostQuitMessage.restype = None
|
||||
|
||||
user32.RegisterClassW.argtypes = [ctypes.POINTER(_WNDCLASS)]
|
||||
user32.RegisterClassW.restype = wintypes.ATOM
|
||||
|
||||
user32.CreateWindowExW.argtypes = [
|
||||
wintypes.DWORD,
|
||||
wintypes.LPCWSTR,
|
||||
wintypes.LPCWSTR,
|
||||
wintypes.DWORD,
|
||||
ctypes.c_int,
|
||||
ctypes.c_int,
|
||||
ctypes.c_int,
|
||||
ctypes.c_int,
|
||||
wintypes.HWND,
|
||||
wintypes.HMENU,
|
||||
wintypes.HINSTANCE,
|
||||
wintypes.LPVOID,
|
||||
]
|
||||
user32.CreateWindowExW.restype = wintypes.HWND
|
||||
|
||||
user32.ShutdownBlockReasonCreate.argtypes = [wintypes.HWND, wintypes.LPCWSTR]
|
||||
user32.ShutdownBlockReasonCreate.restype = wintypes.BOOL
|
||||
|
||||
user32.ShutdownBlockReasonDestroy.argtypes = [wintypes.HWND]
|
||||
user32.ShutdownBlockReasonDestroy.restype = wintypes.BOOL
|
||||
|
||||
kernel32.GetModuleHandleW.argtypes = [wintypes.LPCWSTR]
|
||||
kernel32.GetModuleHandleW.restype = wintypes.HMODULE
|
||||
|
||||
kernel32.SetProcessShutdownParameters.argtypes = [wintypes.DWORD, wintypes.DWORD]
|
||||
kernel32.SetProcessShutdownParameters.restype = wintypes.BOOL
|
||||
|
||||
|
||||
if is_windows():
|
||||
_bind_winapi()
|
||||
|
||||
|
||||
class WindowsShutdownGuard:
|
||||
"""Catches Windows session-end notifications and runs a cleanup callback.
|
||||
|
||||
Usage::
|
||||
|
||||
complete = threading.Event()
|
||||
guard = WindowsShutdownGuard(
|
||||
on_shutdown=lambda: server.should_exit_set_true(),
|
||||
shutdown_complete=complete,
|
||||
)
|
||||
guard.start()
|
||||
... # app runs
|
||||
# When your cleanup is finished, signal so Windows can finish ending the session.
|
||||
complete.set()
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
on_shutdown: Callable[[], None],
|
||||
shutdown_complete: threading.Event,
|
||||
app_name: str = "LED Grab",
|
||||
wait_seconds: float = _SHUTDOWN_WAIT_SECONDS,
|
||||
) -> None:
|
||||
self._on_shutdown = on_shutdown
|
||||
self._shutdown_complete = shutdown_complete
|
||||
self._app_name = app_name
|
||||
self._wait_seconds = wait_seconds
|
||||
|
||||
self._hwnd: Optional[wintypes.HWND] = None
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
self._ready = threading.Event()
|
||||
self._fired = False # idempotency: only trigger on_shutdown once
|
||||
|
||||
# We must keep a strong reference to the WindowProc callback for the
|
||||
# entire lifetime of the window — Win32 stores a raw pointer and
|
||||
# ctypes will free it the moment the Python object is GC'd, which
|
||||
# would crash inside the message pump.
|
||||
self._wndproc_ref = _WNDPROC(self._wndproc)
|
||||
|
||||
# -- public API ----------------------------------------------------------
|
||||
|
||||
def start(self) -> bool:
|
||||
"""Start the shutdown-guard thread. Returns False on non-Windows."""
|
||||
if not is_windows():
|
||||
return False
|
||||
|
||||
try:
|
||||
self._raise_shutdown_priority()
|
||||
except Exception as e: # pragma: no cover - best-effort
|
||||
logger.warning("SetProcessShutdownParameters failed: %s", e)
|
||||
|
||||
self._thread = threading.Thread(
|
||||
target=self._run,
|
||||
name="windows-shutdown-guard",
|
||||
daemon=True,
|
||||
)
|
||||
self._thread.start()
|
||||
|
||||
# Wait briefly so callers can rely on the window existing before they
|
||||
# return from start(). If the thread fails to create the window we
|
||||
# still return — the failure is logged from inside the thread.
|
||||
self._ready.wait(timeout=2.0)
|
||||
return self._hwnd is not None
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Tear down the hidden window. Safe to call from any thread."""
|
||||
hwnd = self._hwnd
|
||||
if hwnd is None:
|
||||
return
|
||||
try:
|
||||
ctypes.windll.user32.PostMessageW(hwnd, _WM_CLOSE, 0, 0)
|
||||
except Exception: # pragma: no cover - shutdown best-effort
|
||||
pass
|
||||
|
||||
# -- internals -----------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _raise_shutdown_priority() -> None:
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
if not kernel32.SetProcessShutdownParameters(_SHUTDOWN_PRIORITY, 0):
|
||||
# Failure is benign — the WM_ENDSESSION path still works without
|
||||
# an elevated priority, we'd just be notified slightly later in
|
||||
# the shutdown ordering. Don't even warn.
|
||||
err = ctypes.get_last_error()
|
||||
logger.debug("SetProcessShutdownParameters returned 0 (err=%d)", err)
|
||||
|
||||
def _run(self) -> None:
|
||||
try:
|
||||
self._create_window()
|
||||
except Exception:
|
||||
logger.exception("Failed to create Windows shutdown-guard window")
|
||||
self._ready.set()
|
||||
return
|
||||
|
||||
self._ready.set()
|
||||
self._pump_messages()
|
||||
|
||||
def _create_window(self) -> None:
|
||||
user32 = ctypes.windll.user32
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
|
||||
h_instance = kernel32.GetModuleHandleW(None)
|
||||
class_name = f"LedGrabShutdownGuard_{id(self)}"
|
||||
|
||||
wc = _WNDCLASS()
|
||||
wc.style = _CS_HREDRAW | _CS_VREDRAW
|
||||
wc.lpfnWndProc = self._wndproc_ref
|
||||
wc.cbClsExtra = 0
|
||||
wc.cbWndExtra = 0
|
||||
wc.hInstance = h_instance
|
||||
wc.hIcon = None
|
||||
wc.hCursor = None
|
||||
wc.hbrBackground = None
|
||||
wc.lpszMenuName = None
|
||||
wc.lpszClassName = class_name
|
||||
|
||||
atom = user32.RegisterClassW(ctypes.byref(wc))
|
||||
if not atom:
|
||||
err = ctypes.get_last_error()
|
||||
raise OSError(f"RegisterClassW failed (err={err})")
|
||||
|
||||
# Top-level (parent=NULL), but invisible and off-screen, no taskbar
|
||||
# entry, and not focusable. Crucially NOT HWND_MESSAGE — message-only
|
||||
# windows do not receive shutdown broadcasts.
|
||||
hwnd = user32.CreateWindowExW(
|
||||
_WS_EX_TOOLWINDOW | _WS_EX_NOACTIVATE,
|
||||
class_name,
|
||||
self._app_name,
|
||||
_WS_OVERLAPPED,
|
||||
-32000, # off-screen x
|
||||
-32000, # off-screen y
|
||||
0, # width
|
||||
0, # height
|
||||
None, # parent
|
||||
None, # menu
|
||||
h_instance,
|
||||
None, # lpParam
|
||||
)
|
||||
if not hwnd:
|
||||
err = ctypes.get_last_error()
|
||||
raise OSError(f"CreateWindowExW failed (err={err})")
|
||||
|
||||
self._hwnd = hwnd
|
||||
logger.info("Windows shutdown guard ready (hwnd=%s)", hwnd)
|
||||
|
||||
def _pump_messages(self) -> None:
|
||||
user32 = ctypes.windll.user32
|
||||
msg = _MSG()
|
||||
while True:
|
||||
ret = user32.GetMessageW(ctypes.byref(msg), None, 0, 0)
|
||||
if ret == 0: # WM_QUIT
|
||||
break
|
||||
if ret == -1: # error
|
||||
logger.error("GetMessageW returned -1; exiting shutdown guard")
|
||||
break
|
||||
user32.TranslateMessage(ctypes.byref(msg))
|
||||
user32.DispatchMessageW(ctypes.byref(msg))
|
||||
|
||||
# -- WindowProc ----------------------------------------------------------
|
||||
|
||||
def _wndproc(
|
||||
self,
|
||||
hwnd: wintypes.HWND,
|
||||
msg: wintypes.UINT,
|
||||
wparam: wintypes.WPARAM,
|
||||
lparam: wintypes.LPARAM,
|
||||
) -> int:
|
||||
user32 = ctypes.windll.user32
|
||||
|
||||
if msg == _WM_QUERYENDSESSION:
|
||||
logger.warning("WM_QUERYENDSESSION received (lParam=0x%x) — beginning shutdown", lparam)
|
||||
self._begin_shutdown(hwnd)
|
||||
return 1 # TRUE — allow the session to end
|
||||
|
||||
if msg == _WM_ENDSESSION:
|
||||
logger.warning("WM_ENDSESSION received (wParam=%s)", wparam)
|
||||
# wParam=0 means the session was cancelled; nothing to clean up.
|
||||
if wparam:
|
||||
self._wait_for_cleanup(hwnd)
|
||||
return 0
|
||||
|
||||
if msg == _WM_DESTROY:
|
||||
user32.PostQuitMessage(0)
|
||||
return 0
|
||||
|
||||
return user32.DefWindowProcW(hwnd, msg, wparam, lparam)
|
||||
|
||||
def _begin_shutdown(self, hwnd: wintypes.HWND) -> None:
|
||||
if self._fired:
|
||||
return
|
||||
self._fired = True
|
||||
|
||||
try:
|
||||
ctypes.windll.user32.ShutdownBlockReasonCreate(
|
||||
hwnd,
|
||||
f"{self._app_name} is stopping LED targets and saving settings…",
|
||||
)
|
||||
except Exception as e: # pragma: no cover - best-effort
|
||||
logger.warning("ShutdownBlockReasonCreate failed: %s", e)
|
||||
|
||||
try:
|
||||
self._on_shutdown()
|
||||
except Exception:
|
||||
logger.exception("on_shutdown callback raised")
|
||||
|
||||
def _wait_for_cleanup(self, hwnd: wintypes.HWND) -> None:
|
||||
try:
|
||||
completed = self._shutdown_complete.wait(timeout=self._wait_seconds)
|
||||
if not completed:
|
||||
logger.error(
|
||||
"Shutdown cleanup did not complete within %.0fs — proceeding anyway",
|
||||
self._wait_seconds,
|
||||
)
|
||||
finally:
|
||||
try:
|
||||
ctypes.windll.user32.ShutdownBlockReasonDestroy(hwnd)
|
||||
except Exception: # pragma: no cover - best-effort
|
||||
pass
|
||||
|
||||
|
||||
__all__ = ["WindowsShutdownGuard"]
|
||||
@@ -0,0 +1,67 @@
|
||||
"""Tests for the ``__main__`` entry-point helpers.
|
||||
|
||||
These cover the bits that aren't exercised by the FastAPI test client —
|
||||
the signal-handler install path and the shutdown-state plumbing — so a
|
||||
regression in the launcher can't silently break the user's
|
||||
"stop targets on PC shutdown" guarantee.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import signal
|
||||
import threading
|
||||
from types import SimpleNamespace
|
||||
|
||||
from ledgrab.__main__ import _install_signal_handlers, _request_shutdown
|
||||
|
||||
|
||||
def test_request_shutdown_sets_should_exit() -> None:
|
||||
server = SimpleNamespace(should_exit=False)
|
||||
_request_shutdown(server)
|
||||
assert server.should_exit is True
|
||||
|
||||
|
||||
def test_install_signal_handlers_installs_for_known_signals() -> None:
|
||||
"""Tray path runs uvicorn on a background thread, so our handlers must
|
||||
actually survive — verify each catchable signal is replaced.
|
||||
"""
|
||||
server = SimpleNamespace(should_exit=False)
|
||||
previous = {
|
||||
name: signal.getsignal(getattr(signal, name))
|
||||
for name in ("SIGINT",)
|
||||
if hasattr(signal, name)
|
||||
}
|
||||
|
||||
try:
|
||||
_install_signal_handlers(server)
|
||||
for name in ("SIGINT", "SIGTERM", "SIGBREAK"):
|
||||
sig = getattr(signal, name, None)
|
||||
if sig is None:
|
||||
continue
|
||||
current = signal.getsignal(sig)
|
||||
# The handler is our local closure — its qualname starts with the function it's defined in.
|
||||
assert callable(current), f"{name} handler should be installed"
|
||||
assert getattr(current, "__qualname__", "").startswith(
|
||||
"_install_signal_handlers"
|
||||
), f"{name} should be replaced by our handler, got {current!r}"
|
||||
finally:
|
||||
# Restore original handlers so the rest of the test suite isn't poisoned.
|
||||
for name, handler in previous.items():
|
||||
signal.signal(getattr(signal, name), handler)
|
||||
|
||||
|
||||
def test_shutdown_state_is_shared_threading_event() -> None:
|
||||
"""``__main__`` and ``main`` must share the same Event instance — if a
|
||||
fresh one is constructed on either side, WM_ENDSESSION waits forever.
|
||||
"""
|
||||
from ledgrab.shutdown_state import shutdown_complete as state_event
|
||||
|
||||
assert isinstance(state_event, threading.Event)
|
||||
|
||||
# If main.py is importable, confirm it re-exports the same object.
|
||||
try:
|
||||
from ledgrab.main import shutdown_complete as main_event
|
||||
except Exception:
|
||||
return # main.py needs full app state — fine to skip on a bare test run.
|
||||
|
||||
assert main_event is state_event, "main.py must re-export the same Event, not create a new one"
|
||||
@@ -0,0 +1,168 @@
|
||||
"""Tests for the Windows shutdown guard.
|
||||
|
||||
The guard is a no-op outside Windows, so the cross-platform tests just
|
||||
check that ``start()`` returns ``False`` and never touches Win32.
|
||||
|
||||
On Windows we exercise the full WM_QUERYENDSESSION → WM_ENDSESSION
|
||||
sequence end-to-end by ``SendMessage``-ing the hidden window directly:
|
||||
the guard should fire the callback synchronously, then block in
|
||||
WM_ENDSESSION until the completion event is signalled.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.utils.win_shutdown import WindowsShutdownGuard
|
||||
|
||||
IS_WINDOWS = sys.platform == "win32"
|
||||
|
||||
|
||||
@pytest.mark.skipif(IS_WINDOWS, reason="Non-Windows behaviour")
|
||||
def test_start_returns_false_off_windows() -> None:
|
||||
guard = WindowsShutdownGuard(
|
||||
on_shutdown=lambda: None,
|
||||
shutdown_complete=threading.Event(),
|
||||
)
|
||||
assert guard.start() is False
|
||||
|
||||
|
||||
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
|
||||
def test_start_creates_hidden_window() -> None:
|
||||
guard = WindowsShutdownGuard(
|
||||
on_shutdown=lambda: None,
|
||||
shutdown_complete=threading.Event(),
|
||||
)
|
||||
try:
|
||||
assert guard.start() is True
|
||||
assert guard._hwnd is not None
|
||||
finally:
|
||||
guard.stop()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
|
||||
def test_query_endsession_fires_callback_and_returns_true() -> None:
|
||||
import ctypes
|
||||
|
||||
WM_QUERYENDSESSION = 0x0011
|
||||
|
||||
fired: list[str] = []
|
||||
complete = threading.Event()
|
||||
guard = WindowsShutdownGuard(
|
||||
on_shutdown=lambda: fired.append("cb"),
|
||||
shutdown_complete=complete,
|
||||
wait_seconds=0.5,
|
||||
)
|
||||
try:
|
||||
assert guard.start() is True
|
||||
result = ctypes.windll.user32.SendMessageW(guard._hwnd, WM_QUERYENDSESSION, 0, 0)
|
||||
assert result == 1, "WM_QUERYENDSESSION must return TRUE so Windows ends the session"
|
||||
assert fired == ["cb"], "shutdown callback should fire exactly once on WM_QUERYENDSESSION"
|
||||
finally:
|
||||
guard.stop()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
|
||||
def test_query_endsession_is_idempotent() -> None:
|
||||
"""Two WM_QUERYENDSESSION messages must not run the callback twice."""
|
||||
import ctypes
|
||||
|
||||
WM_QUERYENDSESSION = 0x0011
|
||||
|
||||
fired: list[str] = []
|
||||
guard = WindowsShutdownGuard(
|
||||
on_shutdown=lambda: fired.append("cb"),
|
||||
shutdown_complete=threading.Event(),
|
||||
wait_seconds=0.5,
|
||||
)
|
||||
try:
|
||||
assert guard.start() is True
|
||||
ctypes.windll.user32.SendMessageW(guard._hwnd, WM_QUERYENDSESSION, 0, 0)
|
||||
ctypes.windll.user32.SendMessageW(guard._hwnd, WM_QUERYENDSESSION, 0, 0)
|
||||
assert fired == ["cb"]
|
||||
finally:
|
||||
guard.stop()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
|
||||
def test_endsession_waits_for_completion_event() -> None:
|
||||
import ctypes
|
||||
|
||||
WM_ENDSESSION = 0x0016
|
||||
|
||||
complete = threading.Event()
|
||||
guard = WindowsShutdownGuard(
|
||||
on_shutdown=lambda: None,
|
||||
shutdown_complete=complete,
|
||||
wait_seconds=2.0,
|
||||
)
|
||||
try:
|
||||
assert guard.start() is True
|
||||
|
||||
def signal_after(delay: float) -> None:
|
||||
time.sleep(delay)
|
||||
complete.set()
|
||||
|
||||
threading.Thread(target=signal_after, args=(0.2,), daemon=True).start()
|
||||
t0 = time.monotonic()
|
||||
result = ctypes.windll.user32.SendMessageW(guard._hwnd, WM_ENDSESSION, 1, 0)
|
||||
elapsed = time.monotonic() - t0
|
||||
assert result == 0
|
||||
assert (
|
||||
0.15 < elapsed < 1.0
|
||||
), f"WM_ENDSESSION should wait for completion, took {elapsed:.2f}s"
|
||||
finally:
|
||||
guard.stop()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
|
||||
def test_endsession_gives_up_after_timeout() -> None:
|
||||
"""If cleanup never finishes, WM_ENDSESSION must still return — Windows
|
||||
will hard-kill us otherwise."""
|
||||
import ctypes
|
||||
|
||||
WM_ENDSESSION = 0x0016
|
||||
|
||||
guard = WindowsShutdownGuard(
|
||||
on_shutdown=lambda: None,
|
||||
shutdown_complete=threading.Event(), # never set
|
||||
wait_seconds=0.3,
|
||||
)
|
||||
try:
|
||||
assert guard.start() is True
|
||||
t0 = time.monotonic()
|
||||
result = ctypes.windll.user32.SendMessageW(guard._hwnd, WM_ENDSESSION, 1, 0)
|
||||
elapsed = time.monotonic() - t0
|
||||
assert result == 0
|
||||
assert (
|
||||
0.25 < elapsed < 1.0
|
||||
), f"WM_ENDSESSION must time out near wait_seconds, took {elapsed:.2f}s"
|
||||
finally:
|
||||
guard.stop()
|
||||
|
||||
|
||||
@pytest.mark.skipif(not IS_WINDOWS, reason="Requires Win32 user32")
|
||||
def test_endsession_with_cancel_does_not_wait() -> None:
|
||||
"""wParam=0 on WM_ENDSESSION means the session was cancelled — no cleanup needed."""
|
||||
import ctypes
|
||||
|
||||
WM_ENDSESSION = 0x0016
|
||||
|
||||
guard = WindowsShutdownGuard(
|
||||
on_shutdown=lambda: None,
|
||||
shutdown_complete=threading.Event(), # never set
|
||||
wait_seconds=5.0,
|
||||
)
|
||||
try:
|
||||
assert guard.start() is True
|
||||
t0 = time.monotonic()
|
||||
result = ctypes.windll.user32.SendMessageW(guard._hwnd, WM_ENDSESSION, 0, 0)
|
||||
elapsed = time.monotonic() - t0
|
||||
assert result == 0
|
||||
assert elapsed < 0.2, f"WM_ENDSESSION with wParam=0 should be instant, took {elapsed:.2f}s"
|
||||
finally:
|
||||
guard.stop()
|
||||
Reference in New Issue
Block a user