feat: refactor MQTT from global config to multi-instance entity model
Lint & Test / test (push) Successful in 1m32s

MQTT broker connections are now managed as entities (like HA sources)
instead of a single global config. Each MQTTSource gets its own
runtime with auto-reconnect, ref-counted via MQTTManager.

Backend:
- MQTTSource dataclass + MQTTSourceStore (SQLite)
- MQTTRuntime (per-broker connection, refactored from MQTTService)
- MQTTManager (ref-counted pool, same pattern as HAManager)
- CRUD API at /api/v1/mqtt/sources + test + status endpoints
- MQTTRule gains mqtt_source_id field for source selection
- Automation engine acquires/releases MQTT runtimes automatically
- Legacy MQTTService kept for backward compat during transition

Frontend:
- MQTT source cards in Streams > Integrations tab
- Create/edit modal with test button
- Dashboard integration cards with health-dot indicators
- Removed MQTT tab from settings modal
This commit is contained in:
2026-03-31 18:02:19 +03:00
parent e7c9a568dc
commit c59107c7c7
26 changed files with 1636 additions and 124 deletions
@@ -26,6 +26,7 @@ from .routes.weather_sources import router as weather_sources_router
from .routes.update import router as update_router
from .routes.assets import router as assets_router
from .routes.home_assistant import router as home_assistant_router
from .routes.mqtt import router as mqtt_router
from .routes.game_integration import router as game_integration_router
router = APIRouter()
@@ -53,6 +54,7 @@ router.include_router(weather_sources_router)
router.include_router(update_router)
router.include_router(assets_router)
router.include_router(home_assistant_router)
router.include_router(mqtt_router)
router.include_router(game_integration_router)
__all__ = ["router"]
@@ -35,6 +35,8 @@ from wled_controller.storage.home_assistant_store import HomeAssistantStore
from wled_controller.core.home_assistant.ha_manager import HomeAssistantManager
from wled_controller.storage.game_integration_store import GameIntegrationStore
from wled_controller.core.game_integration.event_bus import GameEventBus
from wled_controller.storage.mqtt_source_store import MQTTSourceStore
from wled_controller.core.mqtt.mqtt_manager import MQTTManager
T = TypeVar("T")
@@ -153,6 +155,14 @@ def get_game_event_bus() -> GameEventBus:
return _get("game_event_bus", "Game event bus")
def get_mqtt_store() -> MQTTSourceStore:
return _get("mqtt_store", "MQTT source store")
def get_mqtt_manager() -> MQTTManager:
return _get("mqtt_manager", "MQTT manager")
def get_database() -> Database:
return _get("database", "Database")
@@ -215,6 +225,8 @@ def init_dependencies(
ha_manager: HomeAssistantManager | None = None,
game_integration_store: GameIntegrationStore | None = None,
game_event_bus: GameEventBus | None = None,
mqtt_store: MQTTSourceStore | None = None,
mqtt_manager: MQTTManager | None = None,
):
"""Initialize global dependencies."""
_deps.update(
@@ -246,5 +258,7 @@ def init_dependencies(
"ha_manager": ha_manager,
"game_integration_store": game_integration_store,
"game_event_bus": game_event_bus,
"mqtt_store": mqtt_store,
"mqtt_manager": mqtt_manager,
}
)
@@ -60,6 +60,7 @@ def _rule_from_schema(s: RuleSchema) -> Rule:
state=s.state or "on",
),
"mqtt": lambda: MQTTRule(
mqtt_source_id=s.mqtt_source_id or "",
topic=s.topic or "",
payload=s.payload or "",
match_mode=s.match_mode or "exact",
@@ -0,0 +1,235 @@
"""MQTT source routes: CRUD + test + status."""
import asyncio
import aiomqtt
from fastapi import APIRouter, Depends, HTTPException
from wled_controller.api.auth import AuthRequired
from wled_controller.api.dependencies import (
fire_entity_event,
get_mqtt_manager,
get_mqtt_store,
)
from wled_controller.api.schemas.mqtt import (
MQTTConnectionStatus,
MQTTSourceCreate,
MQTTSourceListResponse,
MQTTSourceResponse,
MQTTSourceUpdate,
MQTTStatusResponse,
MQTTTestResponse,
)
from wled_controller.core.mqtt.mqtt_manager import MQTTManager
from wled_controller.storage.base_store import EntityNotFoundError
from wled_controller.storage.mqtt_source import MQTTSource
from wled_controller.storage.mqtt_source_store import MQTTSourceStore
from wled_controller.utils import get_logger
logger = get_logger(__name__)
router = APIRouter()
def _to_response(source: MQTTSource, manager: MQTTManager) -> MQTTSourceResponse:
runtime = manager.get_runtime(source.id)
return MQTTSourceResponse(
id=source.id,
name=source.name,
broker_host=source.broker_host,
broker_port=source.broker_port,
username=source.username,
password_set=bool(source.password),
client_id=source.client_id,
base_topic=source.base_topic,
connected=runtime.is_connected if runtime else False,
description=source.description,
tags=source.tags,
created_at=source.created_at,
updated_at=source.updated_at,
)
@router.get(
"/api/v1/mqtt/sources",
response_model=MQTTSourceListResponse,
tags=["MQTT"],
)
async def list_mqtt_sources(
_auth: AuthRequired,
store: MQTTSourceStore = Depends(get_mqtt_store),
manager: MQTTManager = Depends(get_mqtt_manager),
):
sources = store.get_all_sources()
return MQTTSourceListResponse(
sources=[_to_response(s, manager) for s in sources],
count=len(sources),
)
@router.post(
"/api/v1/mqtt/sources",
response_model=MQTTSourceResponse,
status_code=201,
tags=["MQTT"],
)
async def create_mqtt_source(
data: MQTTSourceCreate,
_auth: AuthRequired,
store: MQTTSourceStore = Depends(get_mqtt_store),
manager: MQTTManager = Depends(get_mqtt_manager),
):
try:
source = store.create_source(
name=data.name,
broker_host=data.broker_host,
broker_port=data.broker_port,
username=data.username,
password=data.password,
client_id=data.client_id,
base_topic=data.base_topic,
description=data.description,
tags=data.tags,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
fire_entity_event("mqtt_source", "created", source.id)
return _to_response(source, manager)
@router.get(
"/api/v1/mqtt/sources/{source_id}",
response_model=MQTTSourceResponse,
tags=["MQTT"],
)
async def get_mqtt_source(
source_id: str,
_auth: AuthRequired,
store: MQTTSourceStore = Depends(get_mqtt_store),
manager: MQTTManager = Depends(get_mqtt_manager),
):
try:
source = store.get_source(source_id)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail=f"MQTT source {source_id} not found")
return _to_response(source, manager)
@router.put(
"/api/v1/mqtt/sources/{source_id}",
response_model=MQTTSourceResponse,
tags=["MQTT"],
)
async def update_mqtt_source(
source_id: str,
data: MQTTSourceUpdate,
_auth: AuthRequired,
store: MQTTSourceStore = Depends(get_mqtt_store),
manager: MQTTManager = Depends(get_mqtt_manager),
):
try:
source = store.update_source(
source_id,
name=data.name,
broker_host=data.broker_host,
broker_port=data.broker_port,
username=data.username,
password=data.password,
client_id=data.client_id,
base_topic=data.base_topic,
description=data.description,
tags=data.tags,
)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail=f"MQTT source {source_id} not found")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
await manager.update_source(source_id)
fire_entity_event("mqtt_source", "updated", source.id)
return _to_response(source, manager)
@router.delete("/api/v1/mqtt/sources/{source_id}", status_code=204, tags=["MQTT"])
async def delete_mqtt_source(
source_id: str,
_auth: AuthRequired,
store: MQTTSourceStore = Depends(get_mqtt_store),
manager: MQTTManager = Depends(get_mqtt_manager),
):
try:
store.delete_source(source_id)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail=f"MQTT source {source_id} not found")
# Release any active runtime
await manager.release(source_id)
fire_entity_event("mqtt_source", "deleted", source_id)
@router.post(
"/api/v1/mqtt/sources/{source_id}/test",
response_model=MQTTTestResponse,
tags=["MQTT"],
)
async def test_mqtt_source(
source_id: str,
_auth: AuthRequired,
store: MQTTSourceStore = Depends(get_mqtt_store),
):
"""Test connection to an MQTT broker."""
try:
source = store.get_source(source_id)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail=f"MQTT source {source_id} not found")
try:
async with aiomqtt.Client(
hostname=source.broker_host,
port=source.broker_port,
username=source.username or None,
password=source.password or None,
identifier=f"{source.client_id}-test",
timeout=10.0,
):
return MQTTTestResponse(success=True)
except asyncio.TimeoutError:
return MQTTTestResponse(success=False, error="Connection timed out")
except Exception as e:
return MQTTTestResponse(success=False, error=str(e))
@router.get(
"/api/v1/mqtt/status",
response_model=MQTTStatusResponse,
tags=["MQTT"],
)
async def get_mqtt_status(
_auth: AuthRequired,
store: MQTTSourceStore = Depends(get_mqtt_store),
manager: MQTTManager = Depends(get_mqtt_manager),
):
"""Get overall MQTT integration status (for dashboard indicators)."""
all_sources = store.get_all_sources()
statuses = manager.get_all_sources_status()
status_map = {s["source_id"]: s for s in statuses}
connections = []
connected_count = 0
for source in all_sources:
status = status_map.get(source.id)
connected = status["connected"] if status else False
if connected:
connected_count += 1
connections.append(
MQTTConnectionStatus(
source_id=source.id,
name=source.name,
connected=connected,
broker=f"{source.broker_host}:{source.broker_port}",
)
)
return MQTTStatusResponse(
connections=connections,
total_sources=len(all_sources),
connected_count=connected_count,
)
@@ -28,6 +28,7 @@ class RuleSchema(BaseModel):
# Display state rule fields
state: Optional[str] = Field(None, description="'on' or 'off' (for display_state rule)")
# MQTT rule fields
mqtt_source_id: Optional[str] = Field(None, description="MQTT source ID (for mqtt rule)")
topic: Optional[str] = Field(None, description="MQTT topic to watch (for mqtt rule)")
payload: Optional[str] = Field(None, description="Expected payload value (for mqtt rule)")
match_mode: Optional[str] = Field(
@@ -0,0 +1,83 @@
"""MQTT source schemas (CRUD + test + status)."""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
class MQTTSourceCreate(BaseModel):
"""Request to create an MQTT source."""
name: str = Field(description="Source name", min_length=1, max_length=100)
broker_host: str = Field(description="MQTT broker hostname or IP", min_length=1)
broker_port: int = Field(default=1883, description="MQTT broker port", ge=1, le=65535)
username: str = Field(default="", description="Broker username (optional)")
password: str = Field(default="", description="Broker password (optional)")
client_id: str = Field(default="ledgrab", description="MQTT client ID")
base_topic: str = Field(default="ledgrab", description="Base topic prefix")
description: Optional[str] = Field(None, description="Optional description", max_length=500)
tags: List[str] = Field(default_factory=list, description="User-defined tags")
class MQTTSourceUpdate(BaseModel):
"""Request to update an MQTT source."""
name: Optional[str] = Field(None, description="Source name", min_length=1, max_length=100)
broker_host: Optional[str] = Field(None, description="MQTT broker hostname or IP", min_length=1)
broker_port: Optional[int] = Field(None, description="MQTT broker port", ge=1, le=65535)
username: Optional[str] = Field(None, description="Broker username")
password: Optional[str] = Field(None, description="Broker password")
client_id: Optional[str] = Field(None, description="MQTT client ID")
base_topic: Optional[str] = Field(None, description="Base topic prefix")
description: Optional[str] = Field(None, description="Optional description", max_length=500)
tags: Optional[List[str]] = None
class MQTTSourceResponse(BaseModel):
"""MQTT source response."""
id: str = Field(description="Source ID")
name: str = Field(description="Source name")
broker_host: str = Field(description="Broker hostname or IP")
broker_port: int = Field(description="Broker port")
username: str = Field(default="", description="Broker username")
password_set: bool = Field(default=False, description="Whether a password is configured")
client_id: str = Field(description="MQTT client ID")
base_topic: str = Field(description="Base topic prefix")
connected: bool = Field(default=False, description="Whether the broker connection is active")
description: Optional[str] = Field(None, description="Description")
tags: List[str] = Field(default_factory=list, description="User-defined tags")
created_at: datetime = Field(description="Creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
class MQTTSourceListResponse(BaseModel):
"""List of MQTT sources."""
sources: List[MQTTSourceResponse] = Field(description="List of MQTT sources")
count: int = Field(description="Number of sources")
class MQTTTestResponse(BaseModel):
"""Connection test result."""
success: bool = Field(description="Whether broker connection succeeded")
error: Optional[str] = Field(None, description="Error message if connection failed")
class MQTTConnectionStatus(BaseModel):
"""Connection status for dashboard indicators."""
source_id: str
name: str
connected: bool
broker: str
class MQTTStatusResponse(BaseModel):
"""Overall MQTT integration status for dashboard."""
connections: List[MQTTConnectionStatus]
total_sources: int
connected_count: int