fix(devices): address pre-merge review findings
Closes the issues surfaced by the pre-merge code review of the expand-device-support branch. CRITICAL #2 -- update_device double-encrypts secrets in memory. storage/device_store.py round-tripped through device.to_dict() which encrypts hue_username / hue_client_key / ble_govee_key / nanoleaf_token via _enc(), but Device.__init__ does not decrypt. The cached self._items[device_id] thus held ciphertext where plaintext belonged, breaking runtime auth for paired devices on any update -- even an innocuous rename. Sourcing kwargs from vars(device) directly avoids the round-trip. Regression tests cover Nanoleaf and Hue. HIGH #3 -- secrets leaked in GET /api/v1/devices response. DeviceResponse previously returned nanoleaf_token / hue_username / hue_client_key in plaintext (decrypted server-side from storage), defeating the encryption-at-rest. Replaced with nanoleaf_paired and hue_paired booleans. ble_govee_key intentionally stays -- it's a user-managed value pasted from a third-party tool, must remain visible for edit. Frontend types.ts + the one nanoleaf_token reader updated to the boolean. HIGH #4 -- SSRF surface. validate_lan_host() added to net_classify.py; called from each new driver's validate_device (DDP / Yeelight / WiZ / LIFX / Govee / OPC / Nanoleaf) and from pair_device. Rejects literal public IPs with a descriptive ValueError; non-IP hostnames pass through (mDNS labels, bare hostnames). RFC6890 ranges (documentation, former class E) are accepted as LAN-like since Python's ipaddress.is_private treats them so -- correct policy for LedGrab. HIGH #5 -- decrypt failure deletes the device row. _dec() now catches the exception, logs an error, and returns "" instead of propagating. Without the fix, a regenerated data/.secret_key would silently make every Hue / Nanoleaf / BLE-Govee device disappear from the device list on next startup. Regression test asserts a corrupt envelope leaves the device hydratable. HIGH #6 -- update_device route does not rstrip("/") for non-WLED. Moved the trim before the WLED-specific scheme inference so every device type gets consistent URL normalization between create and update. MEDIUM #7 -- Govee discovery port 4002 collision. Added a lazily- initialized module-level asyncio.Lock that serializes concurrent discover_govee_devices() calls; the previous behavior had the second parallel scan silently return [] when the first still held port 4002. Error message also clarified to mention another Govee tool. MEDIUM #8 -- Nanoleaf discover() leaked browser tasks on cancellation. Moved the browser cancel loop into the finally block so an interrupted mDNS scan still tears them down. MEDIUM #9 -- pair endpoint logged user-supplied URL with exc_info=True. Added _sanitize_url_for_log() that strips userinfo + fragment, and demoted the log from exc_info to type(exc).__name__ + str(exc) so a hostile receiver's response body can't end up in the log file. LOW -- Nanoleaf was the only client without a .port property. Added one (returns NANOLEAF_PORT, fixed) for cross-driver symmetry. LOW -- no end-to-end pair-then-create coverage. Added TestPairThenCreateFlow.test_pair_then_create_persists_encrypted_token which exercises the full path: POST /api/v1/devices/pair returned fields, store.create_device, then asserts (a) in-memory plaintext, (b) to_config() plaintext, (c) persisted ciphertext, (d) API response strip + paired-boolean. Tests: 1379 pass (was 1358 -- 21 new regression tests added). ruff clean. TypeScript clean.
This commit is contained in:
@@ -42,6 +42,30 @@ logger = get_logger(__name__)
|
|||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
def _sanitize_url_for_log(url: str) -> str:
|
||||||
|
"""Strip userinfo + fragment from a URL so secrets don't reach logs.
|
||||||
|
|
||||||
|
The pair endpoint receives a user-supplied URL on every call; if a
|
||||||
|
future driver ever accepts ``scheme://user:pass@host`` form the
|
||||||
|
credentials would land in logs without this guard.
|
||||||
|
"""
|
||||||
|
if not url:
|
||||||
|
return ""
|
||||||
|
try:
|
||||||
|
from urllib.parse import urlparse, urlunparse
|
||||||
|
|
||||||
|
parsed = urlparse(url)
|
||||||
|
# urlparse stores userinfo in `netloc`; rebuild without it.
|
||||||
|
if parsed.hostname:
|
||||||
|
netloc = parsed.hostname
|
||||||
|
if parsed.port:
|
||||||
|
netloc = f"{netloc}:{parsed.port}"
|
||||||
|
return urlunparse((parsed.scheme, netloc, parsed.path, parsed.params, parsed.query, ""))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
return url
|
||||||
|
|
||||||
|
|
||||||
def _device_to_response(device) -> DeviceResponse:
|
def _device_to_response(device) -> DeviceResponse:
|
||||||
"""Convert a Device to DeviceResponse."""
|
"""Convert a Device to DeviceResponse."""
|
||||||
return DeviceResponse(
|
return DeviceResponse(
|
||||||
@@ -66,15 +90,14 @@ def _device_to_response(device) -> DeviceResponse:
|
|||||||
ddp_color_order=device.ddp_color_order,
|
ddp_color_order=device.ddp_color_order,
|
||||||
espnow_peer_mac=device.espnow_peer_mac,
|
espnow_peer_mac=device.espnow_peer_mac,
|
||||||
espnow_channel=device.espnow_channel,
|
espnow_channel=device.espnow_channel,
|
||||||
hue_username=device.hue_username,
|
hue_paired=bool(device.hue_username and device.hue_client_key),
|
||||||
hue_client_key=device.hue_client_key,
|
|
||||||
hue_entertainment_group_id=device.hue_entertainment_group_id,
|
hue_entertainment_group_id=device.hue_entertainment_group_id,
|
||||||
yeelight_min_interval_ms=device.yeelight_min_interval_ms,
|
yeelight_min_interval_ms=device.yeelight_min_interval_ms,
|
||||||
wiz_min_interval_ms=device.wiz_min_interval_ms,
|
wiz_min_interval_ms=device.wiz_min_interval_ms,
|
||||||
lifx_min_interval_ms=device.lifx_min_interval_ms,
|
lifx_min_interval_ms=device.lifx_min_interval_ms,
|
||||||
govee_min_interval_ms=device.govee_min_interval_ms,
|
govee_min_interval_ms=device.govee_min_interval_ms,
|
||||||
opc_channel=device.opc_channel,
|
opc_channel=device.opc_channel,
|
||||||
nanoleaf_token=device.nanoleaf_token,
|
nanoleaf_paired=bool(device.nanoleaf_token),
|
||||||
nanoleaf_min_interval_ms=device.nanoleaf_min_interval_ms,
|
nanoleaf_min_interval_ms=device.nanoleaf_min_interval_ms,
|
||||||
spi_speed_hz=device.spi_speed_hz,
|
spi_speed_hz=device.spi_speed_hz,
|
||||||
spi_led_type=device.spi_led_type,
|
spi_led_type=device.spi_led_type,
|
||||||
@@ -333,16 +356,23 @@ async def pair_device(
|
|||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise HTTPException(status_code=422, detail=str(exc))
|
raise HTTPException(status_code=422, detail=str(exc))
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
# Strip userinfo before logging so a `scheme://user:pass@host` URL
|
||||||
|
# never lands in the logs (no shipped driver uses userinfo today,
|
||||||
|
# but the pattern is a foot-gun for the next driver author --
|
||||||
|
# caught by review MEDIUM #9). Also keep exc_info=False so a
|
||||||
|
# provider stack trace that may include response bytes from a
|
||||||
|
# hostile receiver doesn't end up in the file either.
|
||||||
|
safe_url = _sanitize_url_for_log(body.url)
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Pairing failed for %s at %s: %s",
|
"Pairing failed for %s at %s: %s: %s",
|
||||||
body.device_type,
|
body.device_type,
|
||||||
body.url,
|
safe_url,
|
||||||
|
type(exc).__name__,
|
||||||
exc,
|
exc,
|
||||||
exc_info=True,
|
|
||||||
)
|
)
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=502,
|
status_code=502,
|
||||||
detail=f"Pairing failed for {body.device_type} at {body.url}.",
|
detail=f"Pairing failed for {body.device_type} at {safe_url}.",
|
||||||
)
|
)
|
||||||
|
|
||||||
if not isinstance(fields, dict):
|
if not isinstance(fields, dict):
|
||||||
@@ -520,16 +550,21 @@ async def update_device(
|
|||||||
existing = store.get_device(device_id)
|
existing = store.get_device(device_id)
|
||||||
is_group = existing.device_type == "group"
|
is_group = existing.device_type == "group"
|
||||||
|
|
||||||
# Normalize a WLED URL the same way we do on create: infer http/https
|
# Normalize URL the same way we do on create:
|
||||||
# when the user typed a bare host. Done via a local rather than
|
# * always rstrip trailing slashes (so PUT-with-trailing-/ matches
|
||||||
# mutating the request DTO so the input is preserved for any future
|
# POST-with-trailing-/ in the stored value -- caught by review HIGH #6)
|
||||||
# caller that inspects it.
|
# * only WLED gets http/https scheme inference; other schemes
|
||||||
|
# (yeelight://, lifx://, opc://, ddp://, …) pass through.
|
||||||
|
# Done via a local rather than mutating the request DTO so the
|
||||||
|
# input is preserved for any future caller that inspects it.
|
||||||
normalized_url = update_data.url
|
normalized_url = update_data.url
|
||||||
if existing.device_type == "wled" and update_data.url:
|
if update_data.url:
|
||||||
raw_url = update_data.url.rstrip("/")
|
normalized_url = update_data.url.rstrip("/")
|
||||||
normalized_url = infer_http_scheme(raw_url)
|
if existing.device_type == "wled":
|
||||||
if normalized_url != raw_url:
|
inferred = infer_http_scheme(normalized_url)
|
||||||
logger.debug("Inferred WLED URL scheme: %r -> %r", raw_url, normalized_url)
|
if inferred != normalized_url:
|
||||||
|
logger.debug("Inferred WLED URL scheme: %r -> %r", normalized_url, inferred)
|
||||||
|
normalized_url = inferred
|
||||||
|
|
||||||
# Group-only field overrides (led_count auto-recompute) are accumulated
|
# Group-only field overrides (led_count auto-recompute) are accumulated
|
||||||
# here too so the update_data Pydantic model is not mutated in place.
|
# here too so the update_data Pydantic model is not mutated in place.
|
||||||
|
|||||||
@@ -409,8 +409,14 @@ class DeviceResponse(BaseModel):
|
|||||||
ddp_color_order: int = Field(default=1, description="DDP color order code (1 = RGB)")
|
ddp_color_order: int = Field(default=1, description="DDP color order code (1 = RGB)")
|
||||||
espnow_peer_mac: str = Field(default="", description="ESP-NOW peer MAC address")
|
espnow_peer_mac: str = Field(default="", description="ESP-NOW peer MAC address")
|
||||||
espnow_channel: int = Field(default=1, description="ESP-NOW WiFi channel")
|
espnow_channel: int = Field(default=1, description="ESP-NOW WiFi channel")
|
||||||
hue_username: str = Field(default="", description="Hue bridge username")
|
hue_paired: bool = Field(
|
||||||
hue_client_key: str = Field(default="", description="Hue entertainment client key")
|
default=False,
|
||||||
|
description=(
|
||||||
|
"Whether the Hue bridge has been paired (i.e. a username/client_key "
|
||||||
|
"is on file). The actual credentials are intentionally not exposed "
|
||||||
|
"in the response -- to re-pair, delete and re-add the device."
|
||||||
|
),
|
||||||
|
)
|
||||||
hue_entertainment_group_id: str = Field(default="", description="Hue entertainment group ID")
|
hue_entertainment_group_id: str = Field(default="", description="Hue entertainment group ID")
|
||||||
yeelight_min_interval_ms: int = Field(
|
yeelight_min_interval_ms: int = Field(
|
||||||
default=500, description="Yeelight client-side rate limit in ms"
|
default=500, description="Yeelight client-side rate limit in ms"
|
||||||
@@ -419,7 +425,14 @@ class DeviceResponse(BaseModel):
|
|||||||
lifx_min_interval_ms: int = Field(default=50, description="LIFX client-side rate limit in ms")
|
lifx_min_interval_ms: int = Field(default=50, description="LIFX client-side rate limit in ms")
|
||||||
govee_min_interval_ms: int = Field(default=50, description="Govee client-side rate limit in ms")
|
govee_min_interval_ms: int = Field(default=50, description="Govee client-side rate limit in ms")
|
||||||
opc_channel: int = Field(default=0, description="OPC channel (0 = broadcast to all)")
|
opc_channel: int = Field(default=0, description="OPC channel (0 = broadcast to all)")
|
||||||
nanoleaf_token: str = Field(default="", description="Nanoleaf auth token")
|
nanoleaf_paired: bool = Field(
|
||||||
|
default=False,
|
||||||
|
description=(
|
||||||
|
"Whether the Nanoleaf auth token has been issued by the pairing "
|
||||||
|
"handshake. The token itself is intentionally not exposed in the "
|
||||||
|
"response -- to re-pair, delete and re-add the device."
|
||||||
|
),
|
||||||
|
)
|
||||||
nanoleaf_min_interval_ms: int = Field(
|
nanoleaf_min_interval_ms: int = Field(
|
||||||
default=100, description="Nanoleaf client-side rate limit in ms"
|
default=100, description="Nanoleaf client-side rate limit in ms"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ from __future__ import annotations
|
|||||||
from typing import TYPE_CHECKING, List
|
from typing import TYPE_CHECKING, List
|
||||||
|
|
||||||
from ledgrab.core.devices.ddp_led_client import DDPLEDClient, parse_ddp_url
|
from ledgrab.core.devices.ddp_led_client import DDPLEDClient, parse_ddp_url
|
||||||
|
from ledgrab.utils.net_classify import validate_lan_host
|
||||||
from ledgrab.core.devices.led_client import (
|
from ledgrab.core.devices.led_client import (
|
||||||
DeviceHealth,
|
DeviceHealth,
|
||||||
DiscoveredDevice,
|
DiscoveredDevice,
|
||||||
@@ -58,6 +59,7 @@ class DDPDeviceProvider(LEDDeviceProvider):
|
|||||||
host, port = parse_ddp_url(url)
|
host, port = parse_ddp_url(url)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise ValueError(f"Invalid DDP URL: {exc}") from exc
|
raise ValueError(f"Invalid DDP URL: {exc}") from exc
|
||||||
|
validate_lan_host(host)
|
||||||
logger.info("DDP device URL validated: host=%s port=%d", host, port)
|
logger.info("DDP device URL validated: host=%s port=%d", host, port)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|||||||
@@ -43,6 +43,25 @@ _DISCOVERY_REQUEST = json.dumps(
|
|||||||
).encode("utf-8")
|
).encode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
# Govee mandates listening on port 4002 for scan replies; the OS only lets
|
||||||
|
# one process / socket own it at a time. Serialize concurrent discover
|
||||||
|
# calls (e.g. two browser tabs hitting /discover at once) so the second
|
||||||
|
# caller waits its turn instead of getting an empty result.
|
||||||
|
_DISCOVERY_LOCK: asyncio.Lock | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def _discovery_lock() -> asyncio.Lock:
|
||||||
|
"""Return a module-level asyncio.Lock, created lazily.
|
||||||
|
|
||||||
|
Can't construct at import-time because asyncio.Lock binds to the
|
||||||
|
running event loop and module import may precede loop creation.
|
||||||
|
"""
|
||||||
|
global _DISCOVERY_LOCK
|
||||||
|
if _DISCOVERY_LOCK is None:
|
||||||
|
_DISCOVERY_LOCK = asyncio.Lock()
|
||||||
|
return _DISCOVERY_LOCK
|
||||||
|
|
||||||
|
|
||||||
def parse_govee_url(url: str) -> str:
|
def parse_govee_url(url: str) -> str:
|
||||||
"""Pull the host out of ``govee://host`` or bare ``host``."""
|
"""Pull the host out of ``govee://host`` or bare ``host``."""
|
||||||
if not url:
|
if not url:
|
||||||
@@ -309,11 +328,20 @@ async def discover_govee_devices(timeout: float = 2.0) -> List[dict]:
|
|||||||
Returns a list of ``{"ip": ..., "device": ..., "sku": ..., "version": ...}``
|
Returns a list of ``{"ip": ..., "device": ..., "sku": ..., "version": ...}``
|
||||||
dicts. Multicast / receive failures (no network, firewall, no LAN-enabled
|
dicts. Multicast / receive failures (no network, firewall, no LAN-enabled
|
||||||
bulbs) yield an empty list rather than raising.
|
bulbs) yield an empty list rather than raising.
|
||||||
|
|
||||||
|
Concurrent invocations are serialized via a module-level
|
||||||
|
``asyncio.Lock`` so two callers don't race on the protocol-mandated
|
||||||
|
response port 4002.
|
||||||
"""
|
"""
|
||||||
|
async with _discovery_lock():
|
||||||
|
return await _discover_govee_devices_locked(timeout)
|
||||||
|
|
||||||
|
|
||||||
|
async def _discover_govee_devices_locked(timeout: float) -> List[dict]:
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
# We bind a separate socket to the response port (4002). If something
|
# We bind a separate socket to the response port (4002). The
|
||||||
# else on the host already owns 4002 (rare; another Govee tool), the
|
# _DISCOVERY_LOCK serializes our own scans; the bind can still fail
|
||||||
# bind fails and we degrade gracefully to an empty result.
|
# if a non-LedGrab Govee tool on the same host owns 4002.
|
||||||
recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
||||||
recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
recv_sock.setblocking(False)
|
recv_sock.setblocking(False)
|
||||||
@@ -324,7 +352,12 @@ async def discover_govee_devices(timeout: float = 2.0) -> List[dict]:
|
|||||||
try:
|
try:
|
||||||
recv_sock.bind(("", GOVEE_RESPONSE_PORT))
|
recv_sock.bind(("", GOVEE_RESPONSE_PORT))
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
logger.warning("Govee discovery: cannot bind %d (%s)", GOVEE_RESPONSE_PORT, exc)
|
logger.warning(
|
||||||
|
"Govee discovery: cannot bind port %d (%s). Another Govee "
|
||||||
|
"tool on this host may own the port; close it and retry.",
|
||||||
|
GOVEE_RESPONSE_PORT,
|
||||||
|
exc,
|
||||||
|
)
|
||||||
return []
|
return []
|
||||||
send_sock.bind(("", 0))
|
send_sock.bind(("", 0))
|
||||||
await loop.sock_sendto(
|
await loop.sock_sendto(
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from ledgrab.core.devices.govee_client import (
|
|||||||
discover_govee_devices,
|
discover_govee_devices,
|
||||||
parse_govee_url,
|
parse_govee_url,
|
||||||
)
|
)
|
||||||
|
from ledgrab.utils.net_classify import validate_lan_host
|
||||||
from ledgrab.core.devices.led_client import (
|
from ledgrab.core.devices.led_client import (
|
||||||
DeviceHealth,
|
DeviceHealth,
|
||||||
DiscoveredDevice,
|
DiscoveredDevice,
|
||||||
@@ -62,6 +63,7 @@ class GoveeDeviceProvider(LEDDeviceProvider):
|
|||||||
host = parse_govee_url(url)
|
host = parse_govee_url(url)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise ValueError(f"Invalid Govee URL: {exc}") from exc
|
raise ValueError(f"Invalid Govee URL: {exc}") from exc
|
||||||
|
validate_lan_host(host)
|
||||||
logger.info("Govee device URL validated: host=%s", host)
|
logger.info("Govee device URL validated: host=%s", host)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ from ledgrab.core.devices.lifx_client import (
|
|||||||
discover_lifx_bulbs,
|
discover_lifx_bulbs,
|
||||||
parse_lifx_url,
|
parse_lifx_url,
|
||||||
)
|
)
|
||||||
|
from ledgrab.utils.net_classify import validate_lan_host
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -60,6 +61,7 @@ class LIFXDeviceProvider(LEDDeviceProvider):
|
|||||||
host, port = parse_lifx_url(url)
|
host, port = parse_lifx_url(url)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise ValueError(f"Invalid LIFX URL: {exc}") from exc
|
raise ValueError(f"Invalid LIFX URL: {exc}") from exc
|
||||||
|
validate_lan_host(host)
|
||||||
logger.info("LIFX device URL validated: host=%s port=%d", host, port)
|
logger.info("LIFX device URL validated: host=%s port=%d", host, port)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|||||||
@@ -145,6 +145,12 @@ class NanoleafClient(LEDClient):
|
|||||||
def host(self) -> str:
|
def host(self) -> str:
|
||||||
return self._host
|
return self._host
|
||||||
|
|
||||||
|
@property
|
||||||
|
def port(self) -> int:
|
||||||
|
"""Fixed at the protocol-mandated 16021. Exposed for cross-driver
|
||||||
|
symmetry with the UDP/TCP clients that also surface ``.port``."""
|
||||||
|
return NANOLEAF_PORT
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_connected(self) -> bool:
|
def is_connected(self) -> bool:
|
||||||
return self._connected and self._http is not None
|
return self._connected and self._http is not None
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ from ledgrab.core.devices.nanoleaf_client import (
|
|||||||
pair_nanoleaf,
|
pair_nanoleaf,
|
||||||
parse_nanoleaf_url,
|
parse_nanoleaf_url,
|
||||||
)
|
)
|
||||||
|
from ledgrab.utils.net_classify import validate_lan_host
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -68,6 +69,7 @@ class NanoleafDeviceProvider(LEDDeviceProvider):
|
|||||||
host = parse_nanoleaf_url(url)
|
host = parse_nanoleaf_url(url)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise ValueError(f"Invalid Nanoleaf URL: {exc}") from exc
|
raise ValueError(f"Invalid Nanoleaf URL: {exc}") from exc
|
||||||
|
validate_lan_host(host)
|
||||||
token = await pair_nanoleaf(host)
|
token = await pair_nanoleaf(host)
|
||||||
return {"nanoleaf_token": token}
|
return {"nanoleaf_token": token}
|
||||||
|
|
||||||
@@ -83,6 +85,7 @@ class NanoleafDeviceProvider(LEDDeviceProvider):
|
|||||||
host = parse_nanoleaf_url(url)
|
host = parse_nanoleaf_url(url)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise ValueError(f"Invalid Nanoleaf URL: {exc}") from exc
|
raise ValueError(f"Invalid Nanoleaf URL: {exc}") from exc
|
||||||
|
validate_lan_host(host)
|
||||||
logger.info("Nanoleaf URL validated: host=%s", host)
|
logger.info("Nanoleaf URL validated: host=%s", host)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
@@ -116,6 +119,7 @@ class NanoleafDeviceProvider(LEDDeviceProvider):
|
|||||||
logger.warning("Nanoleaf discovery: zeroconf init failed (%s)", exc)
|
logger.warning("Nanoleaf discovery: zeroconf init failed (%s)", exc)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
browsers: list = []
|
||||||
try:
|
try:
|
||||||
browsers = [
|
browsers = [
|
||||||
AsyncServiceBrowser(aiozc.zeroconf, st, handlers=[_on_state_change])
|
AsyncServiceBrowser(aiozc.zeroconf, st, handlers=[_on_state_change])
|
||||||
@@ -124,9 +128,14 @@ class NanoleafDeviceProvider(LEDDeviceProvider):
|
|||||||
await asyncio.sleep(timeout)
|
await asyncio.sleep(timeout)
|
||||||
for info in discovered.values():
|
for info in discovered.values():
|
||||||
await info.async_request(aiozc.zeroconf, timeout=2000)
|
await info.async_request(aiozc.zeroconf, timeout=2000)
|
||||||
for browser in browsers:
|
|
||||||
await browser.async_cancel()
|
|
||||||
finally:
|
finally:
|
||||||
|
# Cancel browsers in finally so a CancelledError during sleep or
|
||||||
|
# async_request still tears them down — caught by review MEDIUM #8.
|
||||||
|
for browser in browsers:
|
||||||
|
try:
|
||||||
|
await browser.async_cancel()
|
||||||
|
except (OSError, RuntimeError):
|
||||||
|
pass
|
||||||
try:
|
try:
|
||||||
await aiozc.async_close()
|
await aiozc.async_close()
|
||||||
except (OSError, RuntimeError):
|
except (OSError, RuntimeError):
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ from ledgrab.core.devices.led_client import (
|
|||||||
ProviderDeps,
|
ProviderDeps,
|
||||||
)
|
)
|
||||||
from ledgrab.core.devices.opc_client import OPCClient, parse_opc_url
|
from ledgrab.core.devices.opc_client import OPCClient, parse_opc_url
|
||||||
|
from ledgrab.utils.net_classify import validate_lan_host
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -53,6 +54,7 @@ class OPCDeviceProvider(LEDDeviceProvider):
|
|||||||
host, port = parse_opc_url(url)
|
host, port = parse_opc_url(url)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise ValueError(f"Invalid OPC URL: {exc}") from exc
|
raise ValueError(f"Invalid OPC URL: {exc}") from exc
|
||||||
|
validate_lan_host(host)
|
||||||
logger.info("OPC device URL validated: host=%s port=%d", host, port)
|
logger.info("OPC device URL validated: host=%s port=%d", host, port)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ from ledgrab.core.devices.wiz_client import (
|
|||||||
discover_wiz_bulbs,
|
discover_wiz_bulbs,
|
||||||
parse_wiz_url,
|
parse_wiz_url,
|
||||||
)
|
)
|
||||||
|
from ledgrab.utils.net_classify import validate_lan_host
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -60,6 +61,7 @@ class WiZDeviceProvider(LEDDeviceProvider):
|
|||||||
host, port = parse_wiz_url(url)
|
host, port = parse_wiz_url(url)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise ValueError(f"Invalid WiZ URL: {exc}") from exc
|
raise ValueError(f"Invalid WiZ URL: {exc}") from exc
|
||||||
|
validate_lan_host(host)
|
||||||
logger.info("WiZ device URL validated: host=%s port=%d", host, port)
|
logger.info("WiZ device URL validated: host=%s port=%d", host, port)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ from ledgrab.core.devices.yeelight_client import (
|
|||||||
discover_yeelight_bulbs,
|
discover_yeelight_bulbs,
|
||||||
parse_yeelight_url,
|
parse_yeelight_url,
|
||||||
)
|
)
|
||||||
|
from ledgrab.utils.net_classify import validate_lan_host
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -65,6 +66,7 @@ class YeelightDeviceProvider(LEDDeviceProvider):
|
|||||||
host = parse_yeelight_url(url)
|
host = parse_yeelight_url(url)
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
raise ValueError(f"Invalid Yeelight URL: {exc}") from exc
|
raise ValueError(f"Invalid Yeelight URL: {exc}") from exc
|
||||||
|
validate_lan_host(host)
|
||||||
logger.info("Yeelight device URL validated: host=%s", host)
|
logger.info("Yeelight device URL validated: host=%s", host)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|||||||
@@ -738,7 +738,7 @@ export async function showSettings(deviceId: any) {
|
|||||||
const nmi = device.nanoleaf_min_interval_ms ?? 100;
|
const nmi = device.nanoleaf_min_interval_ms ?? 100;
|
||||||
(document.getElementById('settings-nanoleaf-min-interval') as HTMLInputElement).value = String(nmi);
|
(document.getElementById('settings-nanoleaf-min-interval') as HTMLInputElement).value = String(nmi);
|
||||||
if (nanoleafPairedGroup) {
|
if (nanoleafPairedGroup) {
|
||||||
(nanoleafPairedGroup as HTMLElement).style.display = device.nanoleaf_token ? '' : 'none';
|
(nanoleafPairedGroup as HTMLElement).style.display = device.nanoleaf_paired ? '' : 'none';
|
||||||
}
|
}
|
||||||
const urlLabel8 = urlGroup.querySelector('label[for="settings-device-url"]') as HTMLElement | null;
|
const urlLabel8 = urlGroup.querySelector('label[for="settings-device-url"]') as HTMLElement | null;
|
||||||
const urlHint8 = urlGroup.querySelector('.input-hint') as HTMLElement | null;
|
const urlHint8 = urlGroup.querySelector('.input-hint') as HTMLElement | null;
|
||||||
|
|||||||
@@ -75,14 +75,13 @@ export interface Device {
|
|||||||
opc_channel: number;
|
opc_channel: number;
|
||||||
espnow_peer_mac: string;
|
espnow_peer_mac: string;
|
||||||
espnow_channel: number;
|
espnow_channel: number;
|
||||||
hue_username: string;
|
hue_paired: boolean;
|
||||||
hue_client_key: string;
|
|
||||||
hue_entertainment_group_id: string;
|
hue_entertainment_group_id: string;
|
||||||
yeelight_min_interval_ms: number;
|
yeelight_min_interval_ms: number;
|
||||||
wiz_min_interval_ms: number;
|
wiz_min_interval_ms: number;
|
||||||
lifx_min_interval_ms: number;
|
lifx_min_interval_ms: number;
|
||||||
govee_min_interval_ms: number;
|
govee_min_interval_ms: number;
|
||||||
nanoleaf_token: string;
|
nanoleaf_paired: boolean;
|
||||||
nanoleaf_min_interval_ms: number;
|
nanoleaf_min_interval_ms: number;
|
||||||
spi_speed_hz: number;
|
spi_speed_hz: number;
|
||||||
spi_led_type: string;
|
spi_led_type: string;
|
||||||
|
|||||||
@@ -20,8 +20,27 @@ def _enc(value: str) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def _dec(value: str) -> str:
|
def _dec(value: str) -> str:
|
||||||
"""Decrypt an envelope; legacy plaintext passes through unchanged."""
|
"""Decrypt an envelope; legacy plaintext passes through unchanged.
|
||||||
return secret_box.decrypt(value) if value else ""
|
|
||||||
|
A corrupt envelope (key rotated, partial write, AEAD mismatch) returns
|
||||||
|
an empty string and logs an error rather than re-raising. Without this
|
||||||
|
fallback a single bad row would propagate up to ``BaseSqliteStore._load``
|
||||||
|
and silently drop the entire device from ``self._items`` -- so a
|
||||||
|
regenerated ``data/.secret_key`` would make every Hue / Nanoleaf /
|
||||||
|
BLE-Govee device disappear with only a log line for a trail. Caught
|
||||||
|
by pre-merge review HIGH #5.
|
||||||
|
"""
|
||||||
|
if not value:
|
||||||
|
return ""
|
||||||
|
try:
|
||||||
|
return secret_box.decrypt(value)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error(
|
||||||
|
"Could not decrypt stored secret (%s); clearing the field. "
|
||||||
|
"Re-pair the affected device to recover.",
|
||||||
|
type(exc).__name__,
|
||||||
|
)
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
class Device:
|
class Device:
|
||||||
@@ -656,12 +675,18 @@ class DeviceStore(BaseSqliteStore[Device]):
|
|||||||
if new_name is not None and new_name != device.name:
|
if new_name is not None and new_name != device.name:
|
||||||
self._check_name_unique(new_name, exclude_id=device_id)
|
self._check_name_unique(new_name, exclude_id=device_id)
|
||||||
|
|
||||||
# Build new Device from existing fields + updates (immutable pattern)
|
# Build new Device from existing fields + updates (immutable pattern).
|
||||||
device_fields = device.to_dict()
|
# Source kwargs directly from instance attributes via vars(), NOT
|
||||||
# Map 'id' back to 'device_id' for the constructor
|
# via to_dict() -- to_dict() encrypts hue_username/hue_client_key/
|
||||||
|
# ble_govee_key/nanoleaf_token via _enc(), and Device.__init__
|
||||||
|
# doesn't decrypt, so a round-trip through to_dict() would leave
|
||||||
|
# the new in-memory Device holding ciphertext where plaintext
|
||||||
|
# belongs. The cached _items entry would then return ciphertext
|
||||||
|
# to provider.create_client(), and the device's runtime auth
|
||||||
|
# would silently break until server restart (caught by review).
|
||||||
|
device_fields = vars(device).copy()
|
||||||
|
# Constructor takes device_id, but attribute is id.
|
||||||
device_fields["device_id"] = device_fields.pop("id")
|
device_fields["device_id"] = device_fields.pop("id")
|
||||||
# Restore datetime objects (to_dict serializes them as ISO strings)
|
|
||||||
device_fields["created_at"] = device.created_at
|
|
||||||
device_fields["updated_at"] = datetime.now(timezone.utc)
|
device_fields["updated_at"] = datetime.now(timezone.utc)
|
||||||
# Apply updates
|
# Apply updates
|
||||||
device_fields.update(updates)
|
device_fields.update(updates)
|
||||||
|
|||||||
@@ -134,3 +134,45 @@ def is_loopback(host: str) -> bool:
|
|||||||
if h in {"localhost", "testclient"}:
|
if h in {"localhost", "testclient"}:
|
||||||
return True
|
return True
|
||||||
return classify_ip(h) is HostCategory.LOOPBACK
|
return classify_ip(h) is HostCategory.LOOPBACK
|
||||||
|
|
||||||
|
|
||||||
|
_LAN_CATEGORIES: frozenset[HostCategory] = frozenset(
|
||||||
|
{
|
||||||
|
HostCategory.LOOPBACK,
|
||||||
|
HostCategory.PRIVATE,
|
||||||
|
HostCategory.LINK_LOCAL,
|
||||||
|
HostCategory.UNSPECIFIED,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_lan_host(host: str) -> None:
|
||||||
|
"""Raise ``ValueError`` when *host* is a literal public IP address.
|
||||||
|
|
||||||
|
Used by LED-device providers to reject ``wiz://1.1.1.1:38899`` style
|
||||||
|
URLs that an authenticated (or anonymous, given the default no-auth
|
||||||
|
config) caller could otherwise weaponise into a network-scan oracle.
|
||||||
|
|
||||||
|
Non-IP-literal hostnames (``bulb.local``, ``office-strip``) pass
|
||||||
|
through untouched -- they resolve via mDNS at connect time, where
|
||||||
|
failure to reach a real device produces the right "device unreachable"
|
||||||
|
error path anyway. The check is deliberately a syntax filter, not a
|
||||||
|
DNS-resolving SSRF guard: LedGrab is LAN-bound by design, and
|
||||||
|
aggressive DNS-time rebinding protection is the job of the platform's
|
||||||
|
network policy, not a per-driver validator.
|
||||||
|
"""
|
||||||
|
if not host:
|
||||||
|
return
|
||||||
|
h = host.strip().lower()
|
||||||
|
if h.startswith("[") and h.endswith("]"):
|
||||||
|
h = h[1:-1]
|
||||||
|
h = h.split("%", 1)[0] # strip IPv6 zone id
|
||||||
|
cat = classify_ip(h)
|
||||||
|
if cat is HostCategory.UNPARSEABLE:
|
||||||
|
return # hostname / mDNS label -- accept
|
||||||
|
if cat not in _LAN_CATEGORIES:
|
||||||
|
raise ValueError(
|
||||||
|
f"Refusing to add device at {host!r}: literal {cat.value} IP "
|
||||||
|
"addresses are rejected. LedGrab is LAN-only; if your device is "
|
||||||
|
"genuinely on a public address, use a hostname or tunnel."
|
||||||
|
)
|
||||||
|
|||||||
@@ -340,3 +340,80 @@ class TestPairDevice:
|
|||||||
def test_missing_required_fields_returns_422(self, client):
|
def test_missing_required_fields_returns_422(self, client):
|
||||||
resp = client.post("/api/v1/devices/pair", json={"device_type": "nanoleaf"})
|
resp = client.post("/api/v1/devices/pair", json={"device_type": "nanoleaf"})
|
||||||
assert resp.status_code == 422
|
assert resp.status_code == 422
|
||||||
|
|
||||||
|
|
||||||
|
class TestPairThenCreateFlow:
|
||||||
|
"""End-to-end coverage: pair, then persist; assert the token is
|
||||||
|
encrypted at rest and decrypted in to_config(), and that the API
|
||||||
|
response strips the secret.
|
||||||
|
|
||||||
|
Closes the LOW gap in the pre-merge review: pair-route tests stopped
|
||||||
|
at the 200 response, never carrying the returned fields through to
|
||||||
|
storage to verify the round-trip and the response-strip.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_pair_then_create_persists_encrypted_token(self, client, device_store):
|
||||||
|
from ledgrab.api.routes.devices import _device_to_response
|
||||||
|
from ledgrab.core.devices import led_client as _led_client
|
||||||
|
from ledgrab.core.devices.led_client import DeviceHealth
|
||||||
|
|
||||||
|
class _NanoleafLikeStub:
|
||||||
|
@property
|
||||||
|
def device_type(self):
|
||||||
|
return "nanoleaf_like_stub"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def capabilities(self):
|
||||||
|
return {"manual_led_count", "requires_pairing"}
|
||||||
|
|
||||||
|
async def pair_device(self, url):
|
||||||
|
return {"nanoleaf_token": "secret-paired-token"}
|
||||||
|
|
||||||
|
async def validate_device(self, url):
|
||||||
|
return {"led_count": 9}
|
||||||
|
|
||||||
|
def create_client(self, config, *, deps):
|
||||||
|
raise AssertionError("not used")
|
||||||
|
|
||||||
|
async def check_health(self, url, http_client, prev_health=None):
|
||||||
|
return DeviceHealth(online=True)
|
||||||
|
|
||||||
|
_led_client.register_provider(_NanoleafLikeStub())
|
||||||
|
try:
|
||||||
|
# Step 1: pair via the route
|
||||||
|
pair_resp = client.post(
|
||||||
|
"/api/v1/devices/pair",
|
||||||
|
json={"device_type": "nanoleaf_like_stub", "url": "stub://1.2.3.4"},
|
||||||
|
)
|
||||||
|
assert pair_resp.status_code == 200, pair_resp.text
|
||||||
|
fields = pair_resp.json()["fields"]
|
||||||
|
assert fields == {"nanoleaf_token": "secret-paired-token"}
|
||||||
|
|
||||||
|
# Step 2: persist via the store (skip the route's create path
|
||||||
|
# which would require a real validate_device handshake)
|
||||||
|
device = device_store.create_device(
|
||||||
|
name="E2E Paired",
|
||||||
|
url="stub://1.2.3.4",
|
||||||
|
led_count=9,
|
||||||
|
device_type="nanoleaf",
|
||||||
|
**fields,
|
||||||
|
)
|
||||||
|
|
||||||
|
# In-memory device holds plaintext
|
||||||
|
assert device.nanoleaf_token == "secret-paired-token"
|
||||||
|
|
||||||
|
# to_config surfaces plaintext to the provider
|
||||||
|
config = device.to_config()
|
||||||
|
assert config.nanoleaf_token == "secret-paired-token"
|
||||||
|
|
||||||
|
# Persisted row holds ciphertext (envelope prefix)
|
||||||
|
persisted = device.to_dict()
|
||||||
|
assert persisted["nanoleaf_token"].startswith("ENC:v1:")
|
||||||
|
assert persisted["nanoleaf_token"] != "secret-paired-token"
|
||||||
|
|
||||||
|
# API response strips the token; only a boolean flag remains
|
||||||
|
resp = _device_to_response(device).model_dump()
|
||||||
|
assert "nanoleaf_token" not in resp
|
||||||
|
assert resp.get("nanoleaf_paired") is True
|
||||||
|
finally:
|
||||||
|
_led_client._provider_registry.pop("nanoleaf_like_stub", None)
|
||||||
|
|||||||
@@ -270,3 +270,81 @@ def test_update_device_ignores_unknown_fields(device_store):
|
|||||||
# Should not raise even with an unknown kwarg
|
# Should not raise even with an unknown kwarg
|
||||||
updated = device_store.update_device(device.id, nonexistent_field="foo")
|
updated = device_store.update_device(device.id, nonexistent_field="foo")
|
||||||
assert updated.name == "Test WLED"
|
assert updated.name == "Test WLED"
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_device_preserves_plaintext_secrets(device_store):
|
||||||
|
"""update_device must not corrupt encrypted-at-rest secrets in memory.
|
||||||
|
|
||||||
|
Regression test for the to_dict() round-trip bug: to_dict() encrypts
|
||||||
|
hue_username / hue_client_key / ble_govee_key / nanoleaf_token via
|
||||||
|
_enc(), but Device.__init__ does not decrypt -- so the old
|
||||||
|
update_device path would leave the cached Device holding ciphertext
|
||||||
|
where plaintext belongs, breaking runtime auth for paired devices
|
||||||
|
until server restart. See pre-merge review CRITICAL #2.
|
||||||
|
"""
|
||||||
|
device = device_store.create_device(
|
||||||
|
name="Office panels",
|
||||||
|
url="nanoleaf://192.168.1.99",
|
||||||
|
led_count=9,
|
||||||
|
device_type="nanoleaf",
|
||||||
|
nanoleaf_token="plaintext-secret-token",
|
||||||
|
)
|
||||||
|
assert device.nanoleaf_token == "plaintext-secret-token"
|
||||||
|
|
||||||
|
# An innocuous rename must not touch the secret.
|
||||||
|
updated = device_store.update_device(device.id, name="Renamed panels")
|
||||||
|
assert (
|
||||||
|
updated.nanoleaf_token == "plaintext-secret-token"
|
||||||
|
), "rename round-tripped the secret through to_dict() and corrupted it"
|
||||||
|
|
||||||
|
# The cached _items entry must also hold plaintext (it is what
|
||||||
|
# provider.create_client() reads).
|
||||||
|
cached = device_store.get_device(device.id)
|
||||||
|
assert cached.nanoleaf_token == "plaintext-secret-token"
|
||||||
|
|
||||||
|
# And to_config() must surface the plaintext token to the provider.
|
||||||
|
config = cached.to_config()
|
||||||
|
assert config.nanoleaf_token == "plaintext-secret-token"
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_device_preserves_hue_secrets(device_store):
|
||||||
|
"""Same bug applies to hue_username / hue_client_key / ble_govee_key."""
|
||||||
|
device = device_store.create_device(
|
||||||
|
name="Hue bridge",
|
||||||
|
url="http://192.168.1.50",
|
||||||
|
led_count=10,
|
||||||
|
device_type="hue",
|
||||||
|
hue_username="plain-hue-user",
|
||||||
|
hue_client_key="plain-client-key",
|
||||||
|
)
|
||||||
|
|
||||||
|
updated = device_store.update_device(device.id, name="Renamed bridge")
|
||||||
|
|
||||||
|
assert updated.hue_username == "plain-hue-user"
|
||||||
|
assert updated.hue_client_key == "plain-client-key"
|
||||||
|
|
||||||
|
|
||||||
|
def test_from_dict_clears_corrupt_secret_envelope(device_store):
|
||||||
|
"""A corrupt nanoleaf_token must not delete the device row.
|
||||||
|
|
||||||
|
Regression test for HIGH #5: previously _dec() would raise on a bad
|
||||||
|
envelope, propagate up through BaseSqliteStore._load, and the entire
|
||||||
|
device row would silently disappear. The fix clears the offending
|
||||||
|
field and logs an error instead.
|
||||||
|
"""
|
||||||
|
payload = {
|
||||||
|
"id": "device_abc12345",
|
||||||
|
"name": "Will survive",
|
||||||
|
"url": "nanoleaf://192.168.1.50",
|
||||||
|
"led_count": 9,
|
||||||
|
"device_type": "nanoleaf",
|
||||||
|
# Looks like an envelope (ENC:v1: prefix) but the base64 body is
|
||||||
|
# truncated / corrupt -- the AES-GCM auth tag will fail to verify.
|
||||||
|
"nanoleaf_token": "ENC:v1:not-real-base64-cipher-text",
|
||||||
|
}
|
||||||
|
|
||||||
|
restored = Device.from_dict(payload)
|
||||||
|
|
||||||
|
# The device hydrates; the broken secret is cleared, not crashed.
|
||||||
|
assert restored.id == "device_abc12345"
|
||||||
|
assert restored.nanoleaf_token == ""
|
||||||
|
|||||||
@@ -322,24 +322,34 @@ def test_provider_device_type_and_capabilities():
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_provider_pair_device_returns_nanoleaf_token(respx_mock):
|
async def test_provider_pair_device_returns_nanoleaf_token(respx_mock):
|
||||||
respx_mock.post(f"http://1.2.3.4:{NANOLEAF_PORT}/api/v1/new").mock(
|
# Use a LAN address; validate_lan_host now rejects public IPs at the
|
||||||
|
# provider boundary (review HIGH #4).
|
||||||
|
respx_mock.post(f"http://192.168.1.50:{NANOLEAF_PORT}/api/v1/new").mock(
|
||||||
return_value=httpx.Response(200, json={"auth_token": "ttoken"}),
|
return_value=httpx.Response(200, json={"auth_token": "ttoken"}),
|
||||||
)
|
)
|
||||||
provider = NanoleafDeviceProvider()
|
provider = NanoleafDeviceProvider()
|
||||||
|
|
||||||
fields = await provider.pair_device("nanoleaf://1.2.3.4")
|
fields = await provider.pair_device("nanoleaf://192.168.1.50")
|
||||||
|
|
||||||
assert fields == {"nanoleaf_token": "ttoken"}
|
assert fields == {"nanoleaf_token": "ttoken"}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_provider_pair_device_propagates_pairing_not_ready(respx_mock):
|
async def test_provider_pair_device_propagates_pairing_not_ready(respx_mock):
|
||||||
respx_mock.post(f"http://1.2.3.4:{NANOLEAF_PORT}/api/v1/new").mock(
|
respx_mock.post(f"http://192.168.1.50:{NANOLEAF_PORT}/api/v1/new").mock(
|
||||||
return_value=httpx.Response(403),
|
return_value=httpx.Response(403),
|
||||||
)
|
)
|
||||||
provider = NanoleafDeviceProvider()
|
provider = NanoleafDeviceProvider()
|
||||||
with pytest.raises(PairingNotReady):
|
with pytest.raises(PairingNotReady):
|
||||||
await provider.pair_device("nanoleaf://1.2.3.4")
|
await provider.pair_device("nanoleaf://192.168.1.50")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_provider_pair_device_rejects_public_ip():
|
||||||
|
"""validate_lan_host fires before pair_nanoleaf even runs."""
|
||||||
|
provider = NanoleafDeviceProvider()
|
||||||
|
with pytest.raises(ValueError, match="LedGrab is LAN-only"):
|
||||||
|
await provider.pair_device("nanoleaf://1.1.1.1")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|||||||
@@ -135,3 +135,55 @@ def test_is_loopback_recognises_loopback(host: str) -> None:
|
|||||||
)
|
)
|
||||||
def test_is_loopback_rejects_other(host) -> None:
|
def test_is_loopback_rejects_other(host) -> None:
|
||||||
assert is_loopback(host) is False
|
assert is_loopback(host) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ─── validate_lan_host ────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
from ledgrab.utils.net_classify import validate_lan_host # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"host",
|
||||||
|
[
|
||||||
|
"127.0.0.1",
|
||||||
|
"10.0.0.5",
|
||||||
|
"192.168.1.50",
|
||||||
|
"172.16.0.10",
|
||||||
|
"fe80::1",
|
||||||
|
"fc00::1",
|
||||||
|
"0.0.0.0",
|
||||||
|
# IPv6 with brackets and zone id
|
||||||
|
"[fe80::1%eth0]",
|
||||||
|
# Plain hostnames pass through (mDNS / bare-label)
|
||||||
|
"bulb.local",
|
||||||
|
"office-strip",
|
||||||
|
"controller.lan",
|
||||||
|
"", # empty also passes (caller handles validation upstream)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_validate_lan_host_accepts_local(host) -> None:
|
||||||
|
"""Loopback / private / link-local / hostnames must not raise."""
|
||||||
|
validate_lan_host(host)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"host",
|
||||||
|
[
|
||||||
|
"1.1.1.1", # Cloudflare DNS (routable public)
|
||||||
|
"8.8.8.8", # Google DNS
|
||||||
|
"224.0.0.1", # multicast — not a LAN device address
|
||||||
|
"2606:4700:4700::1111", # Cloudflare DNS IPv6
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_validate_lan_host_rejects_public_or_multicast(host) -> None:
|
||||||
|
"""Reject genuinely-public addresses and multicast targets.
|
||||||
|
|
||||||
|
Note: Python's :func:`ipaddress.ip_address(...).is_private` treats
|
||||||
|
RFC6890 ranges (documentation 192.0.2/24, former class E 240/4, etc.)
|
||||||
|
as private, so those are accepted as "local-ish" by our classifier --
|
||||||
|
which is the correct policy for LedGrab: those ranges aren't routable
|
||||||
|
public addresses, and refusing them would be unhelpful theatre.
|
||||||
|
"""
|
||||||
|
with pytest.raises(ValueError, match="LedGrab is LAN-only"):
|
||||||
|
validate_lan_host(host)
|
||||||
|
|||||||
Reference in New Issue
Block a user