Add LEDDeviceProvider abstraction and standby capability flag
Consolidate all device-type-specific logic into LEDDeviceProvider ABC with provider registry. WLEDDeviceProvider handles client creation, health checks, validation, mDNS discovery, and brightness control. Routes now delegate to providers instead of using if/else type checks. Add standby_required capability and expose device capabilities in API. Target editor conditionally shows standby interval based on selected device's capabilities. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,113 +0,0 @@
|
||||
"""Network discovery for LED devices (mDNS/DNS-SD)."""
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional
|
||||
|
||||
import httpx
|
||||
from zeroconf import ServiceStateChange, Zeroconf
|
||||
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf
|
||||
|
||||
from wled_controller.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
WLED_MDNS_TYPE = "_wled._tcp.local."
|
||||
DEFAULT_SCAN_TIMEOUT = 3.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class DiscoveredDevice:
|
||||
"""A device found via network discovery."""
|
||||
|
||||
name: str
|
||||
url: str
|
||||
device_type: str
|
||||
ip: str
|
||||
mac: str
|
||||
led_count: Optional[int]
|
||||
version: Optional[str]
|
||||
|
||||
|
||||
async def _enrich_wled_device(
|
||||
url: str, fallback_name: str
|
||||
) -> tuple[str, Optional[str], Optional[int], str]:
|
||||
"""Probe a WLED device's /json/info to get name, version, LED count, MAC."""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=2) as client:
|
||||
resp = await client.get(f"{url}/json/info")
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return (
|
||||
data.get("name", fallback_name),
|
||||
data.get("ver"),
|
||||
data.get("leds", {}).get("count"),
|
||||
data.get("mac", ""),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not fetch WLED info from {url}: {e}")
|
||||
return fallback_name, None, None, ""
|
||||
|
||||
|
||||
async def discover_wled_devices(
|
||||
timeout: float = DEFAULT_SCAN_TIMEOUT,
|
||||
) -> List[DiscoveredDevice]:
|
||||
"""Scan the local network for WLED devices via mDNS."""
|
||||
discovered: dict[str, AsyncServiceInfo] = {}
|
||||
|
||||
def on_state_change(**kwargs):
|
||||
service_type = kwargs.get("service_type", "")
|
||||
name = kwargs.get("name", "")
|
||||
state_change = kwargs.get("state_change")
|
||||
if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated):
|
||||
discovered[name] = AsyncServiceInfo(service_type, name)
|
||||
|
||||
aiozc = AsyncZeroconf()
|
||||
browser = AsyncServiceBrowser(
|
||||
aiozc.zeroconf, WLED_MDNS_TYPE, handlers=[on_state_change]
|
||||
)
|
||||
|
||||
await asyncio.sleep(timeout)
|
||||
|
||||
# Resolve all discovered services
|
||||
for info in discovered.values():
|
||||
await info.async_request(aiozc.zeroconf, timeout=2000)
|
||||
|
||||
await browser.async_cancel()
|
||||
|
||||
# Build raw list with IPs, then enrich in parallel
|
||||
raw: list[tuple[str, str, str]] = [] # (service_name, ip, url)
|
||||
for name, info in discovered.items():
|
||||
addrs = info.parsed_addresses()
|
||||
if not addrs:
|
||||
continue
|
||||
ip = addrs[0]
|
||||
port = info.port or 80
|
||||
url = f"http://{ip}" if port == 80 else f"http://{ip}:{port}"
|
||||
service_name = name.replace(f".{WLED_MDNS_TYPE}", "")
|
||||
raw.append((service_name, ip, url))
|
||||
|
||||
# Enrich all devices in parallel
|
||||
enrichment = await asyncio.gather(
|
||||
*[_enrich_wled_device(url, sname) for sname, _, url in raw]
|
||||
)
|
||||
|
||||
results: List[DiscoveredDevice] = []
|
||||
for (service_name, ip, url), (wled_name, version, led_count, mac) in zip(
|
||||
raw, enrichment
|
||||
):
|
||||
results.append(
|
||||
DiscoveredDevice(
|
||||
name=wled_name,
|
||||
url=url,
|
||||
device_type="wled",
|
||||
ip=ip,
|
||||
mac=mac,
|
||||
led_count=led_count,
|
||||
version=version,
|
||||
)
|
||||
)
|
||||
|
||||
await aiozc.async_close()
|
||||
logger.info(f"mDNS scan found {len(results)} WLED device(s)")
|
||||
return results
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Abstract base class for LED device communication clients."""
|
||||
"""Abstract base class for LED device communication clients and provider registry."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
@@ -22,6 +22,19 @@ class DeviceHealth:
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class DiscoveredDevice:
|
||||
"""A device found via network discovery."""
|
||||
|
||||
name: str
|
||||
url: str
|
||||
device_type: str
|
||||
ip: str
|
||||
mac: str
|
||||
led_count: Optional[int]
|
||||
version: Optional[str]
|
||||
|
||||
|
||||
class LEDClient(ABC):
|
||||
"""Abstract base for LED device communication.
|
||||
|
||||
@@ -128,36 +141,97 @@ class LEDClient(ABC):
|
||||
await self.close()
|
||||
|
||||
|
||||
# Per-device-type capability sets.
|
||||
# Used by API routes to gate type-specific features (e.g. brightness control).
|
||||
DEVICE_TYPE_CAPABILITIES = {
|
||||
"wled": {"brightness_control"},
|
||||
}
|
||||
# ===== LED DEVICE PROVIDER =====
|
||||
|
||||
class LEDDeviceProvider(ABC):
|
||||
"""Encapsulates everything about a specific LED device type.
|
||||
|
||||
Implement one subclass per device type (WLED, etc.) and register it
|
||||
via register_provider(). All type-specific dispatch (client creation,
|
||||
health checks, discovery, validation, brightness) goes through the provider.
|
||||
"""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def device_type(self) -> str:
|
||||
"""Type identifier string, e.g. 'wled'."""
|
||||
...
|
||||
|
||||
@property
|
||||
def capabilities(self) -> set:
|
||||
"""Capability set for this device type. Override to add capabilities."""
|
||||
return set()
|
||||
|
||||
@abstractmethod
|
||||
def create_client(self, url: str, **kwargs) -> LEDClient:
|
||||
"""Create a connected-ready LEDClient for this device type."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
|
||||
"""Check device health without a full client connection."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
async def validate_device(self, url: str) -> dict:
|
||||
"""Validate a device URL before adding it.
|
||||
|
||||
Returns:
|
||||
dict with at least {'led_count': int}
|
||||
|
||||
Raises:
|
||||
Exception on validation failure (caller converts to HTTP error).
|
||||
"""
|
||||
...
|
||||
|
||||
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
|
||||
"""Discover devices on the network.
|
||||
|
||||
Override in providers that support discovery. Default: empty list.
|
||||
"""
|
||||
return []
|
||||
|
||||
async def get_brightness(self, url: str) -> int:
|
||||
"""Get device brightness (0-255). Override if capabilities include brightness_control."""
|
||||
raise NotImplementedError
|
||||
|
||||
async def set_brightness(self, url: str, brightness: int) -> None:
|
||||
"""Set device brightness (0-255). Override if capabilities include brightness_control."""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def get_device_capabilities(device_type: str) -> set:
|
||||
"""Return the capability set for a device type."""
|
||||
return DEVICE_TYPE_CAPABILITIES.get(device_type, set())
|
||||
# ===== PROVIDER REGISTRY =====
|
||||
|
||||
_provider_registry: Dict[str, LEDDeviceProvider] = {}
|
||||
|
||||
|
||||
def create_led_client(device_type: str, url: str, **kwargs) -> LEDClient:
|
||||
"""Factory: create the right LEDClient subclass for a device type.
|
||||
def register_provider(provider: LEDDeviceProvider) -> None:
|
||||
"""Register a device provider."""
|
||||
_provider_registry[provider.device_type] = provider
|
||||
|
||||
Args:
|
||||
device_type: Device type identifier (e.g. "wled")
|
||||
url: Device URL
|
||||
**kwargs: Passed to the client constructor
|
||||
|
||||
Returns:
|
||||
LEDClient instance
|
||||
def get_provider(device_type: str) -> LEDDeviceProvider:
|
||||
"""Get the provider for a device type.
|
||||
|
||||
Raises:
|
||||
ValueError: If device_type is unknown
|
||||
ValueError: If device_type is unknown.
|
||||
"""
|
||||
if device_type == "wled":
|
||||
from wled_controller.core.wled_client import WLEDClient
|
||||
return WLEDClient(url, **kwargs)
|
||||
raise ValueError(f"Unknown LED device type: {device_type}")
|
||||
provider = _provider_registry.get(device_type)
|
||||
if not provider:
|
||||
raise ValueError(f"Unknown LED device type: {device_type}")
|
||||
return provider
|
||||
|
||||
|
||||
def get_all_providers() -> Dict[str, LEDDeviceProvider]:
|
||||
"""Return all registered providers."""
|
||||
return dict(_provider_registry)
|
||||
|
||||
|
||||
# ===== FACTORY FUNCTIONS (delegate to providers) =====
|
||||
|
||||
def create_led_client(device_type: str, url: str, **kwargs) -> LEDClient:
|
||||
"""Factory: create the right LEDClient subclass for a device type."""
|
||||
return get_provider(device_type).create_client(url, **kwargs)
|
||||
|
||||
|
||||
async def check_device_health(
|
||||
@@ -166,15 +240,23 @@ async def check_device_health(
|
||||
http_client,
|
||||
prev_health: Optional[DeviceHealth] = None,
|
||||
) -> DeviceHealth:
|
||||
"""Factory: dispatch health check to the right LEDClient subclass.
|
||||
"""Factory: dispatch health check to the right provider."""
|
||||
return await get_provider(device_type).check_health(url, http_client, prev_health)
|
||||
|
||||
Args:
|
||||
device_type: Device type identifier
|
||||
url: Device URL
|
||||
http_client: Shared httpx.AsyncClient
|
||||
prev_health: Previous health result
|
||||
"""
|
||||
if device_type == "wled":
|
||||
from wled_controller.core.wled_client import WLEDClient
|
||||
return await WLEDClient.check_health(url, http_client, prev_health)
|
||||
return DeviceHealth(online=True, last_checked=datetime.utcnow())
|
||||
|
||||
def get_device_capabilities(device_type: str) -> set:
|
||||
"""Return the capability set for a device type."""
|
||||
try:
|
||||
return get_provider(device_type).capabilities
|
||||
except ValueError:
|
||||
return set()
|
||||
|
||||
|
||||
# ===== AUTO-REGISTER BUILT-IN PROVIDERS =====
|
||||
|
||||
def _register_builtin_providers():
|
||||
from wled_controller.core.wled_provider import WLEDDeviceProvider
|
||||
register_provider(WLEDDeviceProvider())
|
||||
|
||||
|
||||
_register_builtin_providers()
|
||||
|
||||
170
server/src/wled_controller/core/wled_provider.py
Normal file
170
server/src/wled_controller/core/wled_provider.py
Normal file
@@ -0,0 +1,170 @@
|
||||
"""WLED device provider — consolidates all WLED-specific dispatch logic."""
|
||||
|
||||
import asyncio
|
||||
from typing import List, Optional
|
||||
|
||||
import httpx
|
||||
from zeroconf import ServiceStateChange
|
||||
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf
|
||||
|
||||
from wled_controller.core.led_client import (
|
||||
DeviceHealth,
|
||||
DiscoveredDevice,
|
||||
LEDClient,
|
||||
LEDDeviceProvider,
|
||||
)
|
||||
from wled_controller.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
WLED_MDNS_TYPE = "_wled._tcp.local."
|
||||
DEFAULT_SCAN_TIMEOUT = 3.0
|
||||
|
||||
|
||||
class WLEDDeviceProvider(LEDDeviceProvider):
|
||||
"""Provider for WLED LED controllers."""
|
||||
|
||||
@property
|
||||
def device_type(self) -> str:
|
||||
return "wled"
|
||||
|
||||
@property
|
||||
def capabilities(self) -> set:
|
||||
return {"brightness_control", "standby_required"}
|
||||
|
||||
def create_client(self, url: str, **kwargs) -> LEDClient:
|
||||
from wled_controller.core.wled_client import WLEDClient
|
||||
return WLEDClient(url, **kwargs)
|
||||
|
||||
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
|
||||
from wled_controller.core.wled_client import WLEDClient
|
||||
return await WLEDClient.check_health(url, http_client, prev_health)
|
||||
|
||||
async def validate_device(self, url: str) -> dict:
|
||||
"""Validate a WLED device URL by probing /json/info.
|
||||
|
||||
Returns:
|
||||
dict with 'led_count' key.
|
||||
|
||||
Raises:
|
||||
httpx.ConnectError: Device unreachable.
|
||||
httpx.TimeoutException: Connection timed out.
|
||||
ValueError: Invalid LED count.
|
||||
"""
|
||||
url = url.rstrip("/")
|
||||
async with httpx.AsyncClient(timeout=5) as client:
|
||||
response = await client.get(f"{url}/json/info")
|
||||
response.raise_for_status()
|
||||
wled_info = response.json()
|
||||
led_count = wled_info.get("leds", {}).get("count")
|
||||
if not led_count or led_count < 1:
|
||||
raise ValueError(
|
||||
f"WLED device at {url} reported invalid LED count: {led_count}"
|
||||
)
|
||||
logger.info(
|
||||
f"WLED device reachable: {wled_info.get('name', 'Unknown')} "
|
||||
f"v{wled_info.get('ver', '?')} ({led_count} LEDs)"
|
||||
)
|
||||
return {"led_count": led_count}
|
||||
|
||||
# ===== DISCOVERY =====
|
||||
|
||||
async def discover(self, timeout: float = DEFAULT_SCAN_TIMEOUT) -> List[DiscoveredDevice]:
|
||||
"""Scan the local network for WLED devices via mDNS."""
|
||||
discovered: dict[str, AsyncServiceInfo] = {}
|
||||
|
||||
def on_state_change(**kwargs):
|
||||
service_type = kwargs.get("service_type", "")
|
||||
name = kwargs.get("name", "")
|
||||
state_change = kwargs.get("state_change")
|
||||
if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated):
|
||||
discovered[name] = AsyncServiceInfo(service_type, name)
|
||||
|
||||
aiozc = AsyncZeroconf()
|
||||
browser = AsyncServiceBrowser(
|
||||
aiozc.zeroconf, WLED_MDNS_TYPE, handlers=[on_state_change]
|
||||
)
|
||||
|
||||
await asyncio.sleep(timeout)
|
||||
|
||||
# Resolve all discovered services
|
||||
for info in discovered.values():
|
||||
await info.async_request(aiozc.zeroconf, timeout=2000)
|
||||
|
||||
await browser.async_cancel()
|
||||
|
||||
# Build raw list with IPs, then enrich in parallel
|
||||
raw: list[tuple[str, str, str]] = [] # (service_name, ip, url)
|
||||
for name, info in discovered.items():
|
||||
addrs = info.parsed_addresses()
|
||||
if not addrs:
|
||||
continue
|
||||
ip = addrs[0]
|
||||
port = info.port or 80
|
||||
url = f"http://{ip}" if port == 80 else f"http://{ip}:{port}"
|
||||
service_name = name.replace(f".{WLED_MDNS_TYPE}", "")
|
||||
raw.append((service_name, ip, url))
|
||||
|
||||
# Enrich all devices in parallel
|
||||
enrichment = await asyncio.gather(
|
||||
*[self._enrich_device(url, sname) for sname, _, url in raw]
|
||||
)
|
||||
|
||||
results: List[DiscoveredDevice] = []
|
||||
for (service_name, ip, url), (wled_name, version, led_count, mac) in zip(
|
||||
raw, enrichment
|
||||
):
|
||||
results.append(
|
||||
DiscoveredDevice(
|
||||
name=wled_name,
|
||||
url=url,
|
||||
device_type="wled",
|
||||
ip=ip,
|
||||
mac=mac,
|
||||
led_count=led_count,
|
||||
version=version,
|
||||
)
|
||||
)
|
||||
|
||||
await aiozc.async_close()
|
||||
logger.info(f"mDNS scan found {len(results)} WLED device(s)")
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
async def _enrich_device(
|
||||
url: str, fallback_name: str
|
||||
) -> tuple[str, Optional[str], Optional[int], str]:
|
||||
"""Probe a WLED device's /json/info to get name, version, LED count, MAC."""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=2) as client:
|
||||
resp = await client.get(f"{url}/json/info")
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return (
|
||||
data.get("name", fallback_name),
|
||||
data.get("ver"),
|
||||
data.get("leds", {}).get("count"),
|
||||
data.get("mac", ""),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not fetch WLED info from {url}: {e}")
|
||||
return fallback_name, None, None, ""
|
||||
|
||||
# ===== BRIGHTNESS =====
|
||||
|
||||
async def get_brightness(self, url: str) -> int:
|
||||
url = url.rstrip("/")
|
||||
async with httpx.AsyncClient(timeout=5.0) as http_client:
|
||||
resp = await http_client.get(f"{url}/json/state")
|
||||
resp.raise_for_status()
|
||||
state = resp.json()
|
||||
return state.get("bri", 255)
|
||||
|
||||
async def set_brightness(self, url: str, brightness: int) -> None:
|
||||
url = url.rstrip("/")
|
||||
async with httpx.AsyncClient(timeout=5.0) as http_client:
|
||||
resp = await http_client.post(
|
||||
f"{url}/json/state",
|
||||
json={"bri": brightness},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
Reference in New Issue
Block a user