Add OS notification listener for Windows toast capture

Polls Windows notifications via winsdk UserNotificationListener API
and fires matching notification CSS streams with os_listener=True.
Includes history API endpoint for tracking captured notifications.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-10 21:57:29 +03:00
parent 42280f094a
commit bf212c86ec
3 changed files with 208 additions and 0 deletions

View File

@@ -570,6 +570,19 @@ async def notify_source(
} }
@router.get("/api/v1/color-strip-sources/os-notifications/history", tags=["Color Strip Sources"])
async def os_notification_history(_auth: AuthRequired):
"""Return recent OS notification capture history (newest first)."""
from wled_controller.core.processing.os_notification_listener import get_os_notification_listener
listener = get_os_notification_listener()
if listener is None:
return {"available": False, "history": []}
return {
"available": listener._available,
"history": listener.recent_history,
}
@router.websocket("/api/v1/color-strip-sources/{source_id}/ws") @router.websocket("/api/v1/color-strip-sources/{source_id}/ws")
async def css_api_input_ws( async def css_api_input_ws(
websocket: WebSocket, websocket: WebSocket,

View File

@@ -0,0 +1,181 @@
"""OS-level notification listener for Windows.
Polls Windows toast notifications via the UserNotificationListener API
(winsdk) and fires matching NotificationColorStripStream instances when
new notifications appear. Sources with os_listener=True are monitored.
Runs in a background thread with ~500ms polling interval.
"""
import asyncio
import collections
import platform
import threading
import time
from typing import Dict, List, Optional, Set
from wled_controller.utils import get_logger
logger = get_logger(__name__)
_POLL_INTERVAL = 0.5 # seconds between polls
# Module-level singleton for dependency access
_instance: Optional["OsNotificationListener"] = None
def get_os_notification_listener() -> Optional["OsNotificationListener"]:
"""Return the global OsNotificationListener instance (or None)."""
return _instance
class OsNotificationListener:
"""Monitors OS notifications and fires notification CSS streams."""
def __init__(self, color_strip_store, color_strip_stream_manager):
self._css_store = color_strip_store
self._css_stream_manager = color_strip_stream_manager
self._running = False
self._thread: Optional[threading.Thread] = None
self._seen_ids: Set[int] = set()
self._available = False
# Recent notification history (thread-safe deque, newest first)
self._history: collections.deque = collections.deque(maxlen=50)
def start(self) -> None:
global _instance
_instance = self
if self._running:
return
if platform.system() != "Windows":
logger.info("OS notification listener: skipped (not Windows)")
return
# Check winsdk availability
try:
from winsdk.windows.ui.notifications.management import (
UserNotificationListener,
UserNotificationListenerAccessStatus,
)
listener = UserNotificationListener.current
status = listener.get_access_status()
if status != UserNotificationListenerAccessStatus.ALLOWED:
logger.warning(
f"OS notification listener: access denied (status={status}). "
"Enable notification access in Windows Settings > Privacy > Notifications."
)
return
self._available = True
except ImportError:
logger.info("OS notification listener: winsdk not installed, skipping")
return
except Exception as e:
logger.warning(f"OS notification listener: init error: {e}")
return
self._running = True
self._thread = threading.Thread(
target=self._poll_loop,
name="os-notif-listener",
daemon=True,
)
self._thread.start()
logger.info("OS notification listener started")
def stop(self) -> None:
self._running = False
if self._thread:
self._thread.join(timeout=3.0)
self._thread = None
logger.info("OS notification listener stopped")
def _poll_loop(self) -> None:
"""Background polling loop."""
from winsdk.windows.ui.notifications.management import UserNotificationListener
from winsdk.windows.ui.notifications import NotificationKinds
listener = UserNotificationListener.current
loop = asyncio.new_event_loop()
async def _get_notifications():
return await listener.get_notifications_async(NotificationKinds.TOAST)
# Seed with current notifications so we don't fire on startup
try:
initial = loop.run_until_complete(_get_notifications())
for n in initial:
self._seen_ids.add(n.id)
logger.debug(f"OS notification listener: seeded {len(self._seen_ids)} existing notifications")
except Exception as e:
logger.warning(f"OS notification listener: seed error: {e}")
try:
while self._running:
try:
notifications = loop.run_until_complete(_get_notifications())
current_ids = set()
for n in notifications:
current_ids.add(n.id)
if n.id not in self._seen_ids:
self._seen_ids.add(n.id)
self._on_new_notification(n)
# Prune seen IDs that are no longer in the notification center
self._seen_ids &= current_ids
except Exception as e:
logger.debug(f"OS notification listener poll error: {e}")
time.sleep(_POLL_INTERVAL)
finally:
loop.close()
self._running = False
@property
def recent_history(self) -> List[Dict]:
"""Return recent notification history (newest first)."""
return list(self._history)
def _on_new_notification(self, notification) -> None:
"""Handle a new OS notification — fire matching streams."""
try:
app_name = notification.app_info.display_info.display_name
except (OSError, AttributeError):
try:
app_name = notification.app_info.app_user_model_id
except (OSError, AttributeError):
app_name = None
# Find all notification sources with os_listener=True
from wled_controller.storage.color_strip_source import NotificationColorStripSource
fired = 0
filtered = 0
try:
sources = self._css_store.get_all_sources()
except Exception:
sources = []
for source in sources:
if not isinstance(source, NotificationColorStripSource):
continue
if not source.os_listener:
continue
streams = self._css_stream_manager.get_streams_by_source_id(source.id)
for stream in streams:
if hasattr(stream, "fire"):
if stream.fire(app_name=app_name):
fired += 1
else:
filtered += 1
entry = {
"app": app_name,
"time": time.time(),
"fired": fired,
"filtered": filtered,
}
self._history.appendleft(entry)
logger.info(f"OS notification captured: app={app_name!r}, fired={fired}, filtered={filtered}")

View File

@@ -35,6 +35,7 @@ from wled_controller.core.automations.automation_engine import AutomationEngine
from wled_controller.core.mqtt.mqtt_service import MQTTService from wled_controller.core.mqtt.mqtt_service import MQTTService
from wled_controller.core.devices.mqtt_client import set_mqtt_service from wled_controller.core.devices.mqtt_client import set_mqtt_service
from wled_controller.core.backup.auto_backup import AutoBackupEngine from wled_controller.core.backup.auto_backup import AutoBackupEngine
from wled_controller.core.processing.os_notification_listener import OsNotificationListener
from wled_controller.api.routes.system import STORE_MAP from wled_controller.api.routes.system import STORE_MAP
from wled_controller.utils import setup_logging, get_logger from wled_controller.utils import setup_logging, get_logger
@@ -188,6 +189,13 @@ async def lifespan(app: FastAPI):
# Start auto-backup engine (periodic configuration backups) # Start auto-backup engine (periodic configuration backups)
await auto_backup_engine.start() await auto_backup_engine.start()
# Start OS notification listener (Windows toast → notification CSS streams)
os_notif_listener = OsNotificationListener(
color_strip_store=color_strip_store,
color_strip_stream_manager=processor_manager.color_strip_stream_manager,
)
os_notif_listener.start()
yield yield
# Shutdown # Shutdown
@@ -206,6 +214,12 @@ async def lifespan(app: FastAPI):
except Exception as e: except Exception as e:
logger.error(f"Error stopping automation engine: {e}") logger.error(f"Error stopping automation engine: {e}")
# Stop OS notification listener
try:
os_notif_listener.stop()
except Exception as e:
logger.error(f"Error stopping OS notification listener: {e}")
# Stop all processing # Stop all processing
try: try:
await processor_manager.stop_all() await processor_manager.stop_all()