Add Linux D-Bus notification listener support
Refactor OS notification listener into platform backends: - Windows: winsdk toast polling (unchanged behavior) - Linux: dbus-next monitoring of org.freedesktop.Notifications Add [notifications] optional dependency group in pyproject.toml. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -18,7 +18,7 @@ RUN apt-get update && apt-get install -y \
|
||||
COPY pyproject.toml .
|
||||
COPY src/ ./src/
|
||||
COPY config/ ./config/
|
||||
RUN pip install --no-cache-dir .
|
||||
RUN pip install --no-cache-dir ".[notifications]"
|
||||
|
||||
# Create directories for data and logs
|
||||
RUN mkdir -p /app/data /app/logs
|
||||
|
||||
@@ -60,6 +60,11 @@ dev = [
|
||||
camera = [
|
||||
"opencv-python-headless>=4.8.0",
|
||||
]
|
||||
# OS notification capture
|
||||
notifications = [
|
||||
"winsdk>=1.0.0b10; sys_platform == 'win32'",
|
||||
"dbus-next>=0.2.3; sys_platform == 'linux'",
|
||||
]
|
||||
# High-performance screen capture engines (Windows only)
|
||||
perf = [
|
||||
"dxcam>=0.0.5; sys_platform == 'win32'",
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
"""OS-level notification listener for Windows.
|
||||
"""OS-level notification listener — cross-platform.
|
||||
|
||||
Polls Windows toast notifications via the UserNotificationListener API
|
||||
(winsdk) and fires matching NotificationColorStripStream instances when
|
||||
new notifications appear. Sources with os_listener=True are monitored.
|
||||
Monitors desktop notifications and fires matching NotificationColorStripStream
|
||||
instances when new notifications appear. Sources with os_listener=True are
|
||||
monitored.
|
||||
|
||||
Runs in a background thread with ~500ms polling interval.
|
||||
Supported platforms:
|
||||
- **Windows**: polls toast notifications via winsdk UserNotificationListener
|
||||
- **Linux**: monitors org.freedesktop.Notifications via D-Bus (dbus-next)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@@ -18,7 +20,7 @@ from wled_controller.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
_POLL_INTERVAL = 0.5 # seconds between polls
|
||||
_POLL_INTERVAL = 0.5 # seconds between polls (Windows only)
|
||||
|
||||
# Module-level singleton for dependency access
|
||||
_instance: Optional["OsNotificationListener"] = None
|
||||
@@ -29,29 +31,22 @@ def get_os_notification_listener() -> Optional["OsNotificationListener"]:
|
||||
return _instance
|
||||
|
||||
|
||||
class OsNotificationListener:
|
||||
"""Monitors OS notifications and fires notification CSS streams."""
|
||||
# ── Platform backends ──────────────────────────────────────────────────
|
||||
|
||||
def __init__(self, color_strip_store, color_strip_stream_manager):
|
||||
self._css_store = color_strip_store
|
||||
self._css_stream_manager = color_strip_stream_manager
|
||||
class _WindowsBackend:
|
||||
"""Polls Windows toast notifications via winsdk."""
|
||||
|
||||
def __init__(self, on_notification):
|
||||
self._on_notification = on_notification
|
||||
self._running = False
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
self._seen_ids: Set[int] = set()
|
||||
self._available = False
|
||||
# Recent notification history (thread-safe deque, newest first)
|
||||
self._history: collections.deque = collections.deque(maxlen=50)
|
||||
|
||||
def start(self) -> None:
|
||||
global _instance
|
||||
_instance = self
|
||||
if self._running:
|
||||
return
|
||||
@staticmethod
|
||||
def probe() -> bool:
|
||||
"""Return True if this backend can run on the current system."""
|
||||
if platform.system() != "Windows":
|
||||
logger.info("OS notification listener: skipped (not Windows)")
|
||||
return
|
||||
|
||||
# Check winsdk availability
|
||||
return False
|
||||
try:
|
||||
from winsdk.windows.ui.notifications.management import (
|
||||
UserNotificationListener,
|
||||
@@ -64,15 +59,16 @@ class OsNotificationListener:
|
||||
f"OS notification listener: access denied (status={status}). "
|
||||
"Enable notification access in Windows Settings > Privacy > Notifications."
|
||||
)
|
||||
return
|
||||
self._available = True
|
||||
return False
|
||||
return True
|
||||
except ImportError:
|
||||
logger.info("OS notification listener: winsdk not installed, skipping")
|
||||
return
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.warning(f"OS notification listener: init error: {e}")
|
||||
return
|
||||
logger.warning(f"OS notification listener: Windows init error: {e}")
|
||||
return False
|
||||
|
||||
def start(self) -> None:
|
||||
self._running = True
|
||||
self._thread = threading.Thread(
|
||||
target=self._poll_loop,
|
||||
@@ -80,17 +76,14 @@ class OsNotificationListener:
|
||||
daemon=True,
|
||||
)
|
||||
self._thread.start()
|
||||
logger.info("OS notification listener started")
|
||||
|
||||
def stop(self) -> None:
|
||||
self._running = False
|
||||
if self._thread:
|
||||
self._thread.join(timeout=3.0)
|
||||
self._thread = None
|
||||
logger.info("OS notification listener stopped")
|
||||
|
||||
def _poll_loop(self) -> None:
|
||||
"""Background polling loop."""
|
||||
from winsdk.windows.ui.notifications.management import UserNotificationListener
|
||||
from winsdk.windows.ui.notifications import NotificationKinds
|
||||
|
||||
@@ -118,7 +111,8 @@ class OsNotificationListener:
|
||||
current_ids.add(n.id)
|
||||
if n.id not in self._seen_ids:
|
||||
self._seen_ids.add(n.id)
|
||||
self._on_new_notification(n)
|
||||
app_name = self._extract_app_name(n)
|
||||
self._on_notification(app_name)
|
||||
|
||||
# Prune seen IDs that are no longer in the notification center
|
||||
self._seen_ids &= current_ids
|
||||
@@ -131,22 +125,167 @@ class OsNotificationListener:
|
||||
loop.close()
|
||||
self._running = False
|
||||
|
||||
@staticmethod
|
||||
def _extract_app_name(notification) -> Optional[str]:
|
||||
try:
|
||||
return notification.app_info.display_info.display_name
|
||||
except (OSError, AttributeError):
|
||||
try:
|
||||
return notification.app_info.app_user_model_id
|
||||
except (OSError, AttributeError):
|
||||
return None
|
||||
|
||||
|
||||
class _LinuxBackend:
|
||||
"""Monitors D-Bus org.freedesktop.Notifications for new notifications."""
|
||||
|
||||
def __init__(self, on_notification):
|
||||
self._on_notification = on_notification
|
||||
self._running = False
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
|
||||
@staticmethod
|
||||
def probe() -> bool:
|
||||
"""Return True if this backend can run on the current system."""
|
||||
if platform.system() != "Linux":
|
||||
return False
|
||||
try:
|
||||
import dbus_next # noqa: F401
|
||||
return True
|
||||
except ImportError:
|
||||
logger.info("OS notification listener: dbus-next not installed, skipping")
|
||||
return False
|
||||
|
||||
def start(self) -> None:
|
||||
self._running = True
|
||||
self._thread = threading.Thread(
|
||||
target=self._run_loop,
|
||||
name="os-notif-listener",
|
||||
daemon=True,
|
||||
)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
self._running = False
|
||||
if self._thread:
|
||||
self._thread.join(timeout=3.0)
|
||||
self._thread = None
|
||||
|
||||
def _run_loop(self) -> None:
|
||||
"""Run the async D-Bus monitor in a dedicated event loop."""
|
||||
loop = asyncio.new_event_loop()
|
||||
try:
|
||||
loop.run_until_complete(self._monitor())
|
||||
except Exception as e:
|
||||
if self._running:
|
||||
logger.error(f"OS notification listener D-Bus error: {e}")
|
||||
finally:
|
||||
loop.close()
|
||||
self._running = False
|
||||
|
||||
async def _monitor(self) -> None:
|
||||
from dbus_next.aio import MessageBus
|
||||
from dbus_next import BusType, Message, MessageType
|
||||
|
||||
bus = await MessageBus(bus_type=BusType.SESSION).connect()
|
||||
|
||||
# Use BecomeMonitor to eavesdrop on Notify method calls.
|
||||
# This sees calls from apps to the notification daemon with the app_name.
|
||||
rule = (
|
||||
"type='method_call',"
|
||||
"interface='org.freedesktop.Notifications',"
|
||||
"member='Notify'"
|
||||
)
|
||||
try:
|
||||
msg = Message(
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus.Monitoring",
|
||||
member="BecomeMonitor",
|
||||
signature="asu",
|
||||
body=[[rule], 0],
|
||||
)
|
||||
await bus.call(msg)
|
||||
except Exception:
|
||||
# Fallback: AddMatch (works on most desktop sessions without special permissions)
|
||||
try:
|
||||
msg = Message(
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="AddMatch",
|
||||
signature="s",
|
||||
body=[rule],
|
||||
)
|
||||
await bus.call(msg)
|
||||
except Exception as e:
|
||||
logger.warning(f"OS notification listener: D-Bus match failed: {e}")
|
||||
|
||||
def _on_message(msg):
|
||||
if msg.message_type != MessageType.METHOD_CALL:
|
||||
return
|
||||
if msg.member != "Notify":
|
||||
return
|
||||
try:
|
||||
# Notify args: app_name, replaces_id, app_icon, summary, body, ...
|
||||
args = msg.body
|
||||
app_name = args[0] if args else None
|
||||
self._on_notification(app_name)
|
||||
except Exception as e:
|
||||
logger.debug(f"OS notification listener: D-Bus parse error: {e}")
|
||||
|
||||
bus.add_message_handler(_on_message)
|
||||
|
||||
logger.info("OS notification listener: D-Bus monitor active")
|
||||
|
||||
# Keep running until stopped
|
||||
while self._running:
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
bus.disconnect()
|
||||
|
||||
|
||||
# ── Main listener class ───────────────────────────────────────────────
|
||||
|
||||
class OsNotificationListener:
|
||||
"""Monitors OS notifications and fires notification CSS streams."""
|
||||
|
||||
def __init__(self, color_strip_store, color_strip_stream_manager):
|
||||
self._css_store = color_strip_store
|
||||
self._css_stream_manager = color_strip_stream_manager
|
||||
self._available = False
|
||||
self._backend = None
|
||||
# Recent notification history (thread-safe deque, newest first)
|
||||
self._history: collections.deque = collections.deque(maxlen=50)
|
||||
|
||||
def start(self) -> None:
|
||||
global _instance
|
||||
_instance = self
|
||||
|
||||
# Try platform backends in order
|
||||
for backend_cls in (_WindowsBackend, _LinuxBackend):
|
||||
if backend_cls.probe():
|
||||
self._backend = backend_cls(on_notification=self._on_new_notification)
|
||||
self._backend.start()
|
||||
self._available = True
|
||||
logger.info(f"OS notification listener started ({backend_cls.__name__})")
|
||||
return
|
||||
|
||||
logger.info("OS notification listener: no supported platform backend available")
|
||||
|
||||
def stop(self) -> None:
|
||||
if self._backend:
|
||||
self._backend.stop()
|
||||
self._backend = None
|
||||
logger.info("OS notification listener stopped")
|
||||
|
||||
@property
|
||||
def recent_history(self) -> List[Dict]:
|
||||
"""Return recent notification history (newest first)."""
|
||||
return list(self._history)
|
||||
|
||||
def _on_new_notification(self, notification) -> None:
|
||||
def _on_new_notification(self, app_name: Optional[str]) -> None:
|
||||
"""Handle a new OS notification — fire matching streams."""
|
||||
try:
|
||||
app_name = notification.app_info.display_info.display_name
|
||||
except (OSError, AttributeError):
|
||||
try:
|
||||
app_name = notification.app_info.app_user_model_id
|
||||
except (OSError, AttributeError):
|
||||
app_name = None
|
||||
|
||||
# Find all notification sources with os_listener=True
|
||||
from wled_controller.storage.color_strip_source import NotificationColorStripSource
|
||||
|
||||
fired = 0
|
||||
|
||||
Reference in New Issue
Block a user