Add profile conditions, scene presets, MQTT integration, and Scenes tab
Feature 1 — Profile Conditions: time-of-day, system idle (Win32 GetLastInputInfo), and display state (GUID_CONSOLE_DISPLAY_STATE) condition types for automatic profile activation. Feature 2 — Scene Presets: snapshot/restore system that captures target running states, device brightness, and profile enables. Server-side capture with 5-step activation order. Dedicated Scenes tab with CardSection-based card grid, command palette integration, and dashboard quick-activate section. Feature 3 — MQTT Integration: MQTTService singleton with aiomqtt, MQTTLEDClient device provider for pixel output, MQTT profile condition type with topic/payload matching, and frontend support for MQTT device type and condition editor. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
0
server/src/wled_controller/core/mqtt/__init__.py
Normal file
0
server/src/wled_controller/core/mqtt/__init__.py
Normal file
176
server/src/wled_controller/core/mqtt/mqtt_service.py
Normal file
176
server/src/wled_controller/core/mqtt/mqtt_service.py
Normal file
@@ -0,0 +1,176 @@
|
||||
"""Singleton async MQTT service — shared broker connection for all features."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Callable, Dict, Optional, Set
|
||||
|
||||
import aiomqtt
|
||||
|
||||
from wled_controller.config import MQTTConfig
|
||||
from wled_controller.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class MQTTService:
|
||||
"""Manages a persistent MQTT broker connection.
|
||||
|
||||
Features:
|
||||
- Publish messages (retained or transient)
|
||||
- Subscribe to topics with callback dispatch
|
||||
- Topic value cache for synchronous reads (profile condition evaluation)
|
||||
- Auto-reconnect loop
|
||||
- Birth / will messages for online status
|
||||
"""
|
||||
|
||||
def __init__(self, config: MQTTConfig):
|
||||
self._config = config
|
||||
self._client: Optional[aiomqtt.Client] = None
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._connected = False
|
||||
|
||||
# Subscription management
|
||||
self._subscriptions: Dict[str, Set[Callable]] = {} # topic -> set of callbacks
|
||||
self._topic_cache: Dict[str, str] = {} # topic -> last payload string
|
||||
|
||||
# Pending publishes queued while disconnected
|
||||
self._publish_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self._connected
|
||||
|
||||
@property
|
||||
def is_enabled(self) -> bool:
|
||||
return self._config.enabled
|
||||
|
||||
async def start(self) -> None:
|
||||
if not self._config.enabled:
|
||||
logger.info("MQTT service disabled in configuration")
|
||||
return
|
||||
if self._task is not None:
|
||||
return
|
||||
self._task = asyncio.create_task(self._connection_loop())
|
||||
logger.info(f"MQTT service starting — broker {self._config.broker_host}:{self._config.broker_port}")
|
||||
|
||||
async def stop(self) -> None:
|
||||
if self._task is None:
|
||||
return
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
self._connected = False
|
||||
logger.info("MQTT service stopped")
|
||||
|
||||
async def publish(self, topic: str, payload: str, retain: bool = False, qos: int = 0) -> None:
|
||||
"""Publish a message. If disconnected, queue for later delivery."""
|
||||
if not self._config.enabled:
|
||||
return
|
||||
if self._connected and self._client:
|
||||
try:
|
||||
await self._client.publish(topic, payload, retain=retain, qos=qos)
|
||||
return
|
||||
except Exception as e:
|
||||
logger.warning(f"MQTT publish failed ({topic}): {e}")
|
||||
# Queue for retry
|
||||
try:
|
||||
self._publish_queue.put_nowait((topic, payload, retain, qos))
|
||||
except asyncio.QueueFull:
|
||||
pass
|
||||
|
||||
async def subscribe(self, topic: str, callback: Callable) -> None:
|
||||
"""Subscribe to a topic. Callback receives (topic: str, payload: str)."""
|
||||
if topic not in self._subscriptions:
|
||||
self._subscriptions[topic] = set()
|
||||
self._subscriptions[topic].add(callback)
|
||||
|
||||
# Subscribe on the live client if connected
|
||||
if self._connected and self._client:
|
||||
try:
|
||||
await self._client.subscribe(topic)
|
||||
except Exception as e:
|
||||
logger.warning(f"MQTT subscribe failed ({topic}): {e}")
|
||||
|
||||
def get_last_value(self, topic: str) -> Optional[str]:
|
||||
"""Get cached last value for a topic (synchronous — for profile evaluation)."""
|
||||
return self._topic_cache.get(topic)
|
||||
|
||||
async def _connection_loop(self) -> None:
|
||||
"""Persistent connection loop with auto-reconnect."""
|
||||
base_topic = self._config.base_topic
|
||||
will_topic = f"{base_topic}/status"
|
||||
will_payload = "offline"
|
||||
|
||||
while True:
|
||||
try:
|
||||
async with aiomqtt.Client(
|
||||
hostname=self._config.broker_host,
|
||||
port=self._config.broker_port,
|
||||
username=self._config.username or None,
|
||||
password=self._config.password or None,
|
||||
identifier=self._config.client_id,
|
||||
will=aiomqtt.Will(
|
||||
topic=will_topic,
|
||||
payload=will_payload,
|
||||
retain=True,
|
||||
),
|
||||
) as client:
|
||||
self._client = client
|
||||
self._connected = True
|
||||
logger.info("MQTT connected to broker")
|
||||
|
||||
# Publish birth message
|
||||
await client.publish(will_topic, "online", retain=True)
|
||||
|
||||
# Re-subscribe to all registered topics
|
||||
for topic in self._subscriptions:
|
||||
await client.subscribe(topic)
|
||||
|
||||
# Drain pending publishes
|
||||
while not self._publish_queue.empty():
|
||||
try:
|
||||
t, p, r, q = self._publish_queue.get_nowait()
|
||||
await client.publish(t, p, retain=r, qos=q)
|
||||
except Exception:
|
||||
break
|
||||
|
||||
# Message receive loop
|
||||
async for msg in client.messages:
|
||||
topic_str = str(msg.topic)
|
||||
payload_str = msg.payload.decode("utf-8", errors="replace") if msg.payload else ""
|
||||
self._topic_cache[topic_str] = payload_str
|
||||
|
||||
# Dispatch to callbacks
|
||||
for sub_topic, callbacks in self._subscriptions.items():
|
||||
if aiomqtt.Topic(sub_topic).matches(msg.topic):
|
||||
for cb in callbacks:
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(cb):
|
||||
asyncio.create_task(cb(topic_str, payload_str))
|
||||
else:
|
||||
cb(topic_str, payload_str)
|
||||
except Exception as e:
|
||||
logger.error(f"MQTT callback error ({topic_str}): {e}")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
self._connected = False
|
||||
self._client = None
|
||||
logger.warning(f"MQTT connection lost: {e}. Reconnecting in 5s...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
# ===== State exposure helpers =====
|
||||
|
||||
async def publish_target_state(self, target_id: str, state: dict) -> None:
|
||||
"""Publish target state to MQTT (called from event handler)."""
|
||||
topic = f"{self._config.base_topic}/target/{target_id}/state"
|
||||
await self.publish(topic, json.dumps(state), retain=True)
|
||||
|
||||
async def publish_profile_state(self, profile_id: str, action: str) -> None:
|
||||
"""Publish profile state change to MQTT."""
|
||||
topic = f"{self._config.base_topic}/profile/{profile_id}/state"
|
||||
await self.publish(topic, json.dumps({"action": action}), retain=True)
|
||||
Reference in New Issue
Block a user