refactor: comprehensive code quality, security, and release readiness improvements
Some checks failed
Lint & Test / test (push) Failing after 48s

Security: tighten CORS defaults, add webhook rate limiting, fix XSS in
automations, guard WebSocket JSON.parse, validate ADB address input,
seal debug exception leak, URL-encode WS tokens, CSS.escape in selectors.

Code quality: add Pydantic models for brightness/power endpoints, fix
thread safety and name uniqueness in DeviceStore, immutable update
pattern, split 6 oversized files into 16 focused modules, enable
TypeScript strictNullChecks (741→102 errors), type state variables,
add dom-utils helper, migrate 3 modules from inline onclick to event
delegation, ProcessorDependencies dataclass.

Performance: async store saves, health endpoint log level, command
palette debounce, optimized entity-events comparison, fix service
worker precache list.

Testing: expand from 45 to 293 passing tests — add store tests (141),
route tests (25), core logic tests (42), E2E flow tests (33), organize
into tests/api/, tests/storage/, tests/core/, tests/e2e/.

DevOps: CI test pipeline, pre-commit config, Dockerfile multi-stage
build with non-root user and health check, docker-compose improvements,
version bump to 0.2.0.

Docs: rewrite CLAUDE.md (202→56 lines), server/CLAUDE.md (212→76),
create contexts/server-operations.md, fix .js→.ts references, fix env
var prefix in README, rewrite INSTALLATION.md, add CONTRIBUTING.md and
.env.example.
This commit is contained in:
2026-03-22 00:38:28 +03:00
parent 07bb89e9b7
commit f2871319cb
115 changed files with 9808 additions and 5818 deletions

View File

View File

View File

@@ -0,0 +1,196 @@
"""Tests for device CRUD routes.
These tests exercise the FastAPI route handlers using dependency override
to inject test stores, avoiding real hardware dependencies.
"""
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from wled_controller.api.routes.devices import router
from wled_controller.storage.device_store import Device, DeviceStore
from wled_controller.storage.output_target_store import OutputTargetStore
from wled_controller.core.processing.processor_manager import ProcessorManager
from wled_controller.api import dependencies as deps
# ---------------------------------------------------------------------------
# App + fixtures (isolated from the real main app)
# ---------------------------------------------------------------------------
def _make_app():
"""Build a minimal FastAPI app with just the devices router + overrides."""
app = FastAPI()
app.include_router(router)
return app
@pytest.fixture
def device_store(tmp_path):
return DeviceStore(tmp_path / "devices.json")
@pytest.fixture
def output_target_store(tmp_path):
return OutputTargetStore(str(tmp_path / "output_targets.json"))
@pytest.fixture
def processor_manager():
"""A mock ProcessorManager — avoids real hardware."""
m = MagicMock(spec=ProcessorManager)
m.add_device = MagicMock()
m.remove_device = AsyncMock()
m.update_device_info = MagicMock()
m.find_device_state = MagicMock(return_value=None)
m.get_all_device_health_dicts = MagicMock(return_value=[])
return m
@pytest.fixture
def client(device_store, output_target_store, processor_manager):
app = _make_app()
# Override auth to always pass
from wled_controller.api.auth import verify_api_key
app.dependency_overrides[verify_api_key] = lambda: "test-user"
# Override stores and manager
app.dependency_overrides[deps.get_device_store] = lambda: device_store
app.dependency_overrides[deps.get_output_target_store] = lambda: output_target_store
app.dependency_overrides[deps.get_processor_manager] = lambda: processor_manager
return TestClient(app, raise_server_exceptions=False)
# ---------------------------------------------------------------------------
# Helper to pre-populate a device
# ---------------------------------------------------------------------------
def _seed_device(store: DeviceStore, name="Test Device", led_count=100) -> Device:
return store.create_device(
name=name,
url="http://192.168.1.100",
led_count=led_count,
)
# ---------------------------------------------------------------------------
# LIST
# ---------------------------------------------------------------------------
class TestListDevices:
def test_list_empty(self, client):
resp = client.get("/api/v1/devices")
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 0
assert data["devices"] == []
def test_list_with_devices(self, client, device_store):
_seed_device(device_store, "Dev A")
_seed_device(device_store, "Dev B")
resp = client.get("/api/v1/devices")
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 2
# ---------------------------------------------------------------------------
# GET by ID
# ---------------------------------------------------------------------------
class TestGetDevice:
def test_get_existing(self, client, device_store):
d = _seed_device(device_store)
resp = client.get(f"/api/v1/devices/{d.id}")
assert resp.status_code == 200
data = resp.json()
assert data["id"] == d.id
assert data["name"] == "Test Device"
def test_get_not_found(self, client):
resp = client.get("/api/v1/devices/nonexistent")
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# UPDATE
# ---------------------------------------------------------------------------
class TestUpdateDevice:
def test_update_name(self, client, device_store):
d = _seed_device(device_store)
resp = client.put(
f"/api/v1/devices/{d.id}",
json={"name": "Renamed"},
)
assert resp.status_code == 200
assert resp.json()["name"] == "Renamed"
def test_update_led_count(self, client, device_store):
d = _seed_device(device_store, led_count=100)
resp = client.put(
f"/api/v1/devices/{d.id}",
json={"led_count": 300},
)
assert resp.status_code == 200
assert resp.json()["led_count"] == 300
def test_update_not_found(self, client):
resp = client.put(
"/api/v1/devices/missing_id",
json={"name": "X"},
)
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# DELETE
# ---------------------------------------------------------------------------
class TestDeleteDevice:
def test_delete_existing(self, client, device_store):
d = _seed_device(device_store)
resp = client.delete(f"/api/v1/devices/{d.id}")
assert resp.status_code == 204
assert device_store.count() == 0
def test_delete_not_found(self, client):
resp = client.delete("/api/v1/devices/missing_id")
assert resp.status_code == 404
def test_delete_referenced_by_target_returns_409(
self, client, device_store, output_target_store
):
d = _seed_device(device_store)
output_target_store.create_target(
name="Target",
target_type="led",
device_id=d.id,
)
resp = client.delete(f"/api/v1/devices/{d.id}")
assert resp.status_code == 409
assert "referenced" in resp.json()["detail"].lower()
# ---------------------------------------------------------------------------
# Batch states
# ---------------------------------------------------------------------------
class TestBatchStates:
def test_batch_states(self, client):
resp = client.get("/api/v1/devices/batch/states")
assert resp.status_code == 200
assert "states" in resp.json()

View File

@@ -0,0 +1,74 @@
"""Tests for system routes — health, version.
These tests use the FastAPI TestClient against the real app. The health
and version endpoints do NOT require authentication, so we can test them
without setting up the full dependency injection.
"""
import pytest
from fastapi.testclient import TestClient
from wled_controller import __version__
@pytest.fixture(scope="module")
def client():
"""Provide a test client for the main app.
The app module initializes stores from the default config on import,
which is acceptable for read-only endpoints tested here.
"""
from wled_controller.main import app
return TestClient(app, raise_server_exceptions=False)
class TestHealthEndpoint:
def test_health_returns_200(self, client):
resp = client.get("/health")
assert resp.status_code == 200
def test_health_response_structure(self, client):
data = client.get("/health").json()
assert data["status"] == "healthy"
assert data["version"] == __version__
assert "timestamp" in data
def test_health_no_auth_required(self, client):
"""Health endpoint should work without Authorization header."""
resp = client.get("/health")
assert resp.status_code == 200
class TestVersionEndpoint:
def test_version_returns_200(self, client):
resp = client.get("/api/v1/version")
assert resp.status_code == 200
def test_version_response_fields(self, client):
data = client.get("/api/v1/version").json()
assert data["version"] == __version__
assert "python_version" in data
assert data["api_version"] == "v1"
assert "demo_mode" in data
class TestOpenAPIEndpoint:
def test_openapi_available(self, client):
resp = client.get("/openapi.json")
assert resp.status_code == 200
data = resp.json()
assert "info" in data
assert data["info"]["version"] == __version__
def test_swagger_ui(self, client):
resp = client.get("/docs")
assert resp.status_code == 200
assert "text/html" in resp.headers["content-type"]
class TestRootEndpoint:
def test_root_returns_html(self, client):
resp = client.get("/")
assert resp.status_code == 200
assert "text/html" in resp.headers["content-type"]

View File

@@ -0,0 +1,67 @@
"""Tests for webhook routes — trigger, validation, rate limiting."""
import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from wled_controller.api.routes.webhooks import _check_rate_limit, _rate_hits
# ---------------------------------------------------------------------------
# Rate limiter unit tests (pure function, no HTTP)
# ---------------------------------------------------------------------------
class TestRateLimiter:
def setup_method(self):
"""Clear rate-limit state between tests."""
_rate_hits.clear()
def test_allows_under_limit(self):
for _ in range(29):
_check_rate_limit("1.2.3.4") # should not raise
def test_rejects_at_limit(self):
for _ in range(30):
_check_rate_limit("1.2.3.4")
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
_check_rate_limit("1.2.3.4")
assert exc_info.value.status_code == 429
def test_separate_ips_independent(self):
for _ in range(30):
_check_rate_limit("10.0.0.1")
# Different IP should still be allowed
_check_rate_limit("10.0.0.2") # should not raise
def test_window_expiry(self):
"""Timestamps outside the 60s window are pruned."""
old_time = time.time() - 120 # 2 minutes ago
_rate_hits["1.2.3.4"] = [old_time] * 30
# Old entries should be pruned, allowing new requests
_check_rate_limit("1.2.3.4") # should not raise
# ---------------------------------------------------------------------------
# Webhook payload validation
# ---------------------------------------------------------------------------
class TestWebhookPayload:
def test_valid_payload_model(self):
from wled_controller.api.routes.webhooks import WebhookPayload
p = WebhookPayload(action="activate")
assert p.action == "activate"
p2 = WebhookPayload(action="deactivate")
assert p2.action == "deactivate"
def test_arbitrary_action_accepted_by_model(self):
"""The model accepts any string; validation is in the route handler."""
from wled_controller.api.routes.webhooks import WebhookPayload
p = WebhookPayload(action="bogus")
assert p.action == "bogus"

View File

@@ -1,13 +1,39 @@
"""Pytest configuration and fixtures."""
"""Pytest configuration and shared fixtures."""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from pathlib import Path
from wled_controller.config import Config, StorageConfig, ServerConfig, AuthConfig
from wled_controller.storage.device_store import Device, DeviceStore
from wled_controller.storage.sync_clock import SyncClock
from wled_controller.storage.sync_clock_store import SyncClockStore
from wled_controller.storage.output_target_store import OutputTargetStore
from wled_controller.storage.automation import (
Automation,
AlwaysCondition,
WebhookCondition,
)
from wled_controller.storage.automation_store import AutomationStore
from wled_controller.storage.value_source import StaticValueSource, ValueSource
from wled_controller.storage.value_source_store import ValueSourceStore
# ---------------------------------------------------------------------------
# Directory / path fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def test_data_dir(tmp_path):
"""Provide a temporary directory for test data."""
return tmp_path / "data"
d = tmp_path / "data"
d.mkdir(parents=True, exist_ok=True)
return d
@pytest.fixture
@@ -16,6 +42,198 @@ def test_config_dir(tmp_path):
return tmp_path / "config"
@pytest.fixture
def temp_store_dir(tmp_path):
"""Provide a temp directory for JSON store files, cleaned up after tests."""
d = tmp_path / "stores"
d.mkdir(parents=True, exist_ok=True)
return d
# ---------------------------------------------------------------------------
# Config fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def test_config(tmp_path):
"""A Config instance with temp directories for all store files."""
data_dir = tmp_path / "data"
data_dir.mkdir(parents=True, exist_ok=True)
storage = StorageConfig(
devices_file=str(data_dir / "devices.json"),
templates_file=str(data_dir / "capture_templates.json"),
postprocessing_templates_file=str(data_dir / "postprocessing_templates.json"),
picture_sources_file=str(data_dir / "picture_sources.json"),
output_targets_file=str(data_dir / "output_targets.json"),
pattern_templates_file=str(data_dir / "pattern_templates.json"),
color_strip_sources_file=str(data_dir / "color_strip_sources.json"),
audio_sources_file=str(data_dir / "audio_sources.json"),
audio_templates_file=str(data_dir / "audio_templates.json"),
value_sources_file=str(data_dir / "value_sources.json"),
automations_file=str(data_dir / "automations.json"),
scene_presets_file=str(data_dir / "scene_presets.json"),
color_strip_processing_templates_file=str(data_dir / "color_strip_processing_templates.json"),
sync_clocks_file=str(data_dir / "sync_clocks.json"),
)
return Config(
server=ServerConfig(host="127.0.0.1", port=9999),
auth=AuthConfig(api_keys={"test": "test-api-key-12345"}),
storage=storage,
)
# ---------------------------------------------------------------------------
# Store fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def device_store(temp_store_dir):
"""Provide a DeviceStore backed by a temp file."""
return DeviceStore(temp_store_dir / "devices.json")
@pytest.fixture
def sync_clock_store(temp_store_dir):
"""Provide a SyncClockStore backed by a temp file."""
return SyncClockStore(str(temp_store_dir / "sync_clocks.json"))
@pytest.fixture
def output_target_store(temp_store_dir):
"""Provide an OutputTargetStore backed by a temp file."""
return OutputTargetStore(str(temp_store_dir / "output_targets.json"))
@pytest.fixture
def automation_store(temp_store_dir):
"""Provide an AutomationStore backed by a temp file."""
return AutomationStore(str(temp_store_dir / "automations.json"))
@pytest.fixture
def value_source_store(temp_store_dir):
"""Provide a ValueSourceStore backed by a temp file."""
return ValueSourceStore(str(temp_store_dir / "value_sources.json"))
# ---------------------------------------------------------------------------
# Sample entity factories
# ---------------------------------------------------------------------------
@pytest.fixture
def sample_device():
"""Provide a sample device configuration dict."""
return {
"id": "test_device_001",
"name": "Test WLED Device",
"url": "http://192.168.1.100",
"led_count": 150,
"enabled": True,
"device_type": "wled",
}
@pytest.fixture
def make_device():
"""Factory fixture: call make_device(name=..., **overrides) to build a Device."""
_counter = 0
def _factory(name=None, **kwargs):
nonlocal _counter
_counter += 1
defaults = dict(
device_id=f"device_test_{_counter:04d}",
name=name or f"Device {_counter}",
url=f"http://192.168.1.{_counter}",
led_count=150,
)
defaults.update(kwargs)
return Device(**defaults)
return _factory
@pytest.fixture
def make_sync_clock():
"""Factory fixture: call make_sync_clock(name=..., **overrides)."""
_counter = 0
def _factory(name=None, **kwargs):
nonlocal _counter
_counter += 1
now = datetime.now(timezone.utc)
defaults = dict(
id=f"sc_test_{_counter:04d}",
name=name or f"Clock {_counter}",
speed=1.0,
created_at=now,
updated_at=now,
)
defaults.update(kwargs)
return SyncClock(**defaults)
return _factory
@pytest.fixture
def make_automation():
"""Factory fixture: call make_automation(name=..., **overrides)."""
_counter = 0
def _factory(name=None, **kwargs):
nonlocal _counter
_counter += 1
now = datetime.now(timezone.utc)
defaults = dict(
id=f"auto_test_{_counter:04d}",
name=name or f"Automation {_counter}",
enabled=True,
condition_logic="or",
conditions=[],
scene_preset_id=None,
deactivation_mode="none",
deactivation_scene_preset_id=None,
created_at=now,
updated_at=now,
)
defaults.update(kwargs)
return Automation(**defaults)
return _factory
# ---------------------------------------------------------------------------
# Authenticated test client
# ---------------------------------------------------------------------------
@pytest.fixture
def authenticated_client(test_config, monkeypatch):
"""Provide a FastAPI TestClient with auth header pre-set.
Patches global config so the app uses temp storage paths.
"""
import wled_controller.config as config_mod
monkeypatch.setattr(config_mod, "config", test_config)
from fastapi.testclient import TestClient
from wled_controller.main import app
client = TestClient(app, raise_server_exceptions=False)
client.headers["Authorization"] = "Bearer test-api-key-12345"
return client
# ---------------------------------------------------------------------------
# Calibration sample (kept from original conftest)
# ---------------------------------------------------------------------------
@pytest.fixture
def sample_calibration():
"""Provide a sample calibration configuration."""
@@ -29,21 +247,3 @@ def sample_calibration():
{"edge": "left", "led_start": 110, "led_count": 40, "reverse": True},
],
}
@pytest.fixture
def sample_device():
"""Provide a sample device configuration."""
return {
"id": "test_device_001",
"name": "Test WLED Device",
"url": "http://192.168.1.100",
"led_count": 150,
"enabled": True,
"settings": {
"display_index": 0,
"fps": 30,
"border_width": 10,
"brightness": 0.8,
},
}

View File

View File

@@ -0,0 +1,290 @@
"""Tests for AutomationEngine — condition evaluation in isolation."""
import asyncio
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from wled_controller.core.automations.automation_engine import AutomationEngine
from wled_controller.storage.automation import (
AlwaysCondition,
ApplicationCondition,
Automation,
DisplayStateCondition,
MQTTCondition,
StartupCondition,
SystemIdleCondition,
TimeOfDayCondition,
WebhookCondition,
)
from wled_controller.storage.automation_store import AutomationStore
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_store(tmp_path) -> AutomationStore:
return AutomationStore(str(tmp_path / "auto.json"))
@pytest.fixture
def mock_manager():
m = MagicMock()
m.fire_event = MagicMock()
return m
@pytest.fixture
def engine(mock_store, mock_manager) -> AutomationEngine:
"""Build an AutomationEngine with the PlatformDetector mocked out.
PlatformDetector starts a Windows display-power listener thread that
causes access violations in the test environment, so we replace it
with a simple MagicMock.
"""
with patch(
"wled_controller.core.automations.automation_engine.PlatformDetector"
):
eng = AutomationEngine(
automation_store=mock_store,
processor_manager=mock_manager,
poll_interval=0.1,
)
return eng
# ---------------------------------------------------------------------------
# Condition evaluation (unit-level)
# ---------------------------------------------------------------------------
class TestConditionEvaluation:
"""Test _evaluate_condition for each condition type individually."""
def _make_automation(self, conditions):
now = datetime.now(timezone.utc)
return Automation(
id="test_auto",
name="Test",
enabled=True,
condition_logic="or",
conditions=conditions,
scene_preset_id=None,
deactivation_mode="none",
deactivation_scene_preset_id=None,
created_at=now,
updated_at=now,
)
def _eval(self, engine, condition, **kwargs):
"""Invoke the private _evaluate_condition method."""
defaults = dict(
running_procs=set(),
topmost_proc=None,
topmost_fullscreen=False,
fullscreen_procs=set(),
idle_seconds=None,
display_state=None,
)
defaults.update(kwargs)
return engine._evaluate_condition(
condition,
defaults["running_procs"],
defaults["topmost_proc"],
defaults["topmost_fullscreen"],
defaults["fullscreen_procs"],
defaults["idle_seconds"],
defaults["display_state"],
)
def test_always_true(self, engine):
assert self._eval(engine, AlwaysCondition()) is True
def test_startup_true(self, engine):
assert self._eval(engine, StartupCondition()) is True
def test_application_running_match(self, engine):
cond = ApplicationCondition(apps=["chrome.exe"], match_type="running")
result = self._eval(
engine, cond,
running_procs={"chrome.exe", "explorer.exe"},
)
assert result is True
def test_application_running_no_match(self, engine):
cond = ApplicationCondition(apps=["chrome.exe"], match_type="running")
result = self._eval(
engine, cond,
running_procs={"explorer.exe"},
)
assert result is False
def test_application_topmost_match(self, engine):
cond = ApplicationCondition(apps=["game.exe"], match_type="topmost")
result = self._eval(
engine, cond,
topmost_proc="game.exe",
)
assert result is True
def test_application_topmost_no_match(self, engine):
cond = ApplicationCondition(apps=["game.exe"], match_type="topmost")
result = self._eval(
engine, cond,
topmost_proc="chrome.exe",
)
assert result is False
def test_time_of_day_within_range(self, engine):
cond = TimeOfDayCondition(start_time="00:00", end_time="23:59")
result = self._eval(engine, cond)
assert result is True
def test_system_idle_when_idle(self, engine):
cond = SystemIdleCondition(idle_minutes=5, when_idle=True)
result = self._eval(engine, cond, idle_seconds=600.0) # 10 minutes idle
assert result is True
def test_system_idle_not_idle(self, engine):
cond = SystemIdleCondition(idle_minutes=5, when_idle=True)
result = self._eval(engine, cond, idle_seconds=60.0) # 1 minute idle
assert result is False
def test_system_idle_when_not_idle(self, engine):
"""when_idle=False means active when user is NOT idle."""
cond = SystemIdleCondition(idle_minutes=5, when_idle=False)
result = self._eval(engine, cond, idle_seconds=60.0) # 1 min idle (not yet 5)
assert result is True
def test_display_state_match(self, engine):
cond = DisplayStateCondition(state="on")
result = self._eval(engine, cond, display_state="on")
assert result is True
def test_display_state_no_match(self, engine):
cond = DisplayStateCondition(state="off")
result = self._eval(engine, cond, display_state="on")
assert result is False
def test_webhook_active(self, engine):
cond = WebhookCondition(token="tok123")
engine._webhook_states["tok123"] = True
result = self._eval(engine, cond)
assert result is True
def test_webhook_inactive(self, engine):
cond = WebhookCondition(token="tok123")
# Not in _webhook_states → False
result = self._eval(engine, cond)
assert result is False
# ---------------------------------------------------------------------------
# Condition logic (AND / OR)
# ---------------------------------------------------------------------------
class TestConditionLogic:
def _make_automation(self, conditions, logic="or"):
now = datetime.now(timezone.utc)
return Automation(
id="logic_auto",
name="Logic",
enabled=True,
condition_logic=logic,
conditions=conditions,
scene_preset_id=None,
deactivation_mode="none",
deactivation_scene_preset_id=None,
created_at=now,
updated_at=now,
)
def test_or_any_true(self, engine):
auto = self._make_automation(
[
ApplicationCondition(apps=["missing.exe"], match_type="running"),
AlwaysCondition(),
],
logic="or",
)
result = engine._evaluate_conditions(
auto,
running_procs=set(),
topmost_proc=None,
topmost_fullscreen=False,
fullscreen_procs=set(),
idle_seconds=None,
display_state=None,
)
assert result is True
def test_and_all_must_be_true(self, engine):
auto = self._make_automation(
[
AlwaysCondition(),
ApplicationCondition(apps=["missing.exe"], match_type="running"),
],
logic="and",
)
result = engine._evaluate_conditions(
auto,
running_procs=set(),
topmost_proc=None,
topmost_fullscreen=False,
fullscreen_procs=set(),
idle_seconds=None,
display_state=None,
)
assert result is False
# ---------------------------------------------------------------------------
# Webhook state management
# ---------------------------------------------------------------------------
class TestWebhookState:
@pytest.mark.asyncio
async def test_set_webhook_state_activate(self, engine):
await engine.set_webhook_state("tok_1", True)
assert engine._webhook_states["tok_1"] is True
@pytest.mark.asyncio
async def test_set_webhook_state_deactivate(self, engine):
engine._webhook_states["tok_1"] = True
await engine.set_webhook_state("tok_1", False)
assert engine._webhook_states["tok_1"] is False
# ---------------------------------------------------------------------------
# Start / Stop lifecycle
# ---------------------------------------------------------------------------
class TestEngineLifecycle:
@pytest.mark.asyncio
async def test_start_creates_task(self, engine):
await engine.start()
assert engine._task is not None
await engine.stop()
@pytest.mark.asyncio
async def test_stop_cancels_task(self, engine):
await engine.start()
await engine.stop()
assert engine._task is None
@pytest.mark.asyncio
async def test_double_start_is_safe(self, engine):
await engine.start()
await engine.start() # no-op
await engine.stop()
@pytest.mark.asyncio
async def test_stop_without_start_is_safe(self, engine):
await engine.stop() # no-op

View File

@@ -0,0 +1,185 @@
"""Tests for SyncClockRuntime — thread-safe timing, pause/resume/reset."""
import threading
import time
import pytest
from wled_controller.core.processing.sync_clock_runtime import SyncClockRuntime
class TestSyncClockRuntimeInit:
def test_default_speed(self):
rt = SyncClockRuntime()
assert rt.speed == 1.0
def test_custom_speed(self):
rt = SyncClockRuntime(speed=2.5)
assert rt.speed == 2.5
def test_starts_running(self):
rt = SyncClockRuntime()
assert rt.is_running is True
def test_initial_time_near_zero(self):
rt = SyncClockRuntime()
t = rt.get_time()
assert 0.0 <= t < 0.1 # should be very small right after creation
class TestSyncClockRuntimeSpeed:
def test_set_speed(self):
rt = SyncClockRuntime()
rt.speed = 3.0
assert rt.speed == 3.0
def test_speed_zero(self):
rt = SyncClockRuntime()
rt.speed = 0.0
assert rt.speed == 0.0
def test_speed_negative(self):
"""Negative speed is allowed at the runtime level (clamping is store-level)."""
rt = SyncClockRuntime()
rt.speed = -1.0
assert rt.speed == -1.0
class TestSyncClockRuntimeGetTime:
def test_time_advances(self):
rt = SyncClockRuntime()
t1 = rt.get_time()
time.sleep(0.05)
t2 = rt.get_time()
assert t2 > t1
def test_time_is_real_seconds(self):
"""get_time returns real elapsed seconds, NOT speed-scaled."""
rt = SyncClockRuntime(speed=5.0)
time.sleep(0.1)
t = rt.get_time()
# Should be roughly 0.1s (real time), not 0.5s (speed-scaled)
assert 0.05 < t < 0.5
class TestSyncClockRuntimePauseResume:
def test_pause_freezes_time(self):
rt = SyncClockRuntime()
time.sleep(0.05)
rt.pause()
t1 = rt.get_time()
time.sleep(0.05)
t2 = rt.get_time()
assert t1 == t2 # time should not advance while paused
def test_pause_sets_not_running(self):
rt = SyncClockRuntime()
rt.pause()
assert rt.is_running is False
def test_resume_unfreezes_time(self):
rt = SyncClockRuntime()
rt.pause()
time.sleep(0.02)
rt.resume()
assert rt.is_running is True
t1 = rt.get_time()
time.sleep(0.05)
t2 = rt.get_time()
assert t2 > t1
def test_resume_preserves_offset(self):
"""After pause+resume, time continues from where it was paused."""
rt = SyncClockRuntime()
time.sleep(0.05)
rt.pause()
paused_time = rt.get_time()
time.sleep(0.1)
rt.resume()
resumed_time = rt.get_time()
# Resumed time should be close to paused time (not reset, not including pause gap)
assert abs(resumed_time - paused_time) < 0.05
def test_double_pause_is_safe(self):
rt = SyncClockRuntime()
time.sleep(0.02)
rt.pause()
t1 = rt.get_time()
rt.pause() # second pause should be a no-op
t2 = rt.get_time()
assert t1 == t2
def test_double_resume_is_safe(self):
rt = SyncClockRuntime()
rt.resume() # already running, should be a no-op
assert rt.is_running is True
class TestSyncClockRuntimeReset:
def test_reset_sets_time_to_zero(self):
rt = SyncClockRuntime()
time.sleep(0.05)
rt.reset()
t = rt.get_time()
assert t < 0.02 # should be very close to zero
def test_reset_while_paused(self):
rt = SyncClockRuntime()
time.sleep(0.05)
rt.pause()
rt.reset()
# After reset, offset is 0 but clock is still paused — is_running unchanged
# The reset resets offset and epoch but doesn't change running state
t = rt.get_time()
# Time might be ~0 if paused, or very small if running
assert t < 0.05
def test_reset_preserves_speed(self):
rt = SyncClockRuntime(speed=3.0)
time.sleep(0.05)
rt.reset()
assert rt.speed == 3.0
class TestSyncClockRuntimeThreadSafety:
def test_concurrent_get_time(self):
rt = SyncClockRuntime()
results = []
errors = []
def _read():
try:
for _ in range(100):
t = rt.get_time()
results.append(t)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=_read) for _ in range(8)]
for th in threads:
th.start()
for th in threads:
th.join()
assert len(errors) == 0
assert len(results) == 800
def test_concurrent_pause_resume(self):
rt = SyncClockRuntime()
errors = []
def _toggle():
try:
for _ in range(50):
rt.pause()
rt.resume()
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=_toggle) for _ in range(4)]
for th in threads:
th.start()
for th in threads:
th.join()
assert len(errors) == 0

View File

@@ -0,0 +1 @@
"""End-to-end API tests for critical user flows."""

View File

@@ -0,0 +1,72 @@
"""Shared fixtures for end-to-end API tests.
Uses the real FastAPI app with a module-scoped TestClient to avoid
repeated lifespan startup/shutdown issues. Each test function gets
fresh, empty stores via the _clear_stores helper.
"""
import pytest
from wled_controller.config import get_config
# Resolve the API key from the real config (same key used in production tests)
_config = get_config()
API_KEY = next(iter(_config.auth.api_keys.values()), "")
AUTH_HEADERS = {"Authorization": f"Bearer {API_KEY}"}
@pytest.fixture(scope="session")
def _test_client():
"""Session-scoped TestClient to avoid lifespan re-entry issues.
The app's lifespan (MQTT, automation engine, health monitoring, etc.)
starts once for the entire e2e test session and shuts down after all
tests complete.
"""
from fastapi.testclient import TestClient
from wled_controller.main import app
with TestClient(app, raise_server_exceptions=False) as c:
yield c
@pytest.fixture
def client(_test_client):
"""Per-test client with auth headers and clean stores.
Clears all entity stores before each test so tests are independent.
"""
_clear_stores()
_test_client.headers["Authorization"] = f"Bearer {API_KEY}"
yield _test_client
# Clean up after test
_clear_stores()
def _clear_stores():
"""Remove all entities from all stores for test isolation."""
from wled_controller.api import dependencies as deps
store_clearers = [
(deps.get_device_store, "get_all_devices", "delete_device"),
(deps.get_output_target_store, "get_all_targets", "delete_target"),
(deps.get_color_strip_store, "get_all_sources", "delete_source"),
(deps.get_value_source_store, "get_all", "delete"),
(deps.get_sync_clock_store, "get_all", "delete"),
(deps.get_automation_store, "get_all", "delete"),
(deps.get_scene_preset_store, "get_all", "delete"),
]
for getter, list_method, delete_method in store_clearers:
try:
store = getter()
items = getattr(store, list_method)()
for item in items:
item_id = getattr(item, "id", getattr(item, "device_id", None))
if item_id:
try:
getattr(store, delete_method)(item_id)
except Exception:
pass
except RuntimeError:
pass # Store not initialized yet

View File

@@ -0,0 +1,125 @@
"""E2E: Authentication enforcement.
Tests that protected endpoints require valid auth, and public endpoints work
without auth.
Uses the `client` fixture (which has the correct auth header set), and
helpers to make unauthenticated requests by temporarily removing the header.
"""
import pytest
from tests.e2e.conftest import API_KEY
def _unauth_get(client, url):
"""Make a GET request without the Authorization header."""
saved = client.headers.pop("Authorization", None)
try:
return client.get(url)
finally:
if saved is not None:
client.headers["Authorization"] = saved
def _unauth_request(client, method, url, **kwargs):
"""Make a request without the Authorization header."""
saved = client.headers.pop("Authorization", None)
try:
return client.request(method, url, **kwargs)
finally:
if saved is not None:
client.headers["Authorization"] = saved
def _with_header(client, method, url, auth_value, **kwargs):
"""Make a request with a custom Authorization header."""
saved = client.headers.get("Authorization")
client.headers["Authorization"] = auth_value
try:
return client.request(method, url, **kwargs)
finally:
if saved is not None:
client.headers["Authorization"] = saved
else:
client.headers.pop("Authorization", None)
class TestAuthEnforcement:
"""Verify API key authentication is enforced correctly."""
def test_request_without_auth_returns_401(self, client):
"""Protected endpoint without Authorization header returns 401."""
resp = _unauth_get(client, "/api/v1/devices")
assert resp.status_code == 401
def test_request_with_wrong_key_returns_401(self, client):
"""Protected endpoint with an incorrect API key returns 401."""
resp = _with_header(
client, "GET", "/api/v1/devices",
auth_value="Bearer wrong-key-12345",
)
assert resp.status_code == 401
def test_request_with_correct_key_returns_200(self, client):
"""Protected endpoint with valid API key succeeds."""
resp = client.get("/api/v1/devices")
assert resp.status_code == 200
def test_health_endpoint_is_public(self, client):
"""Health check does not require authentication."""
resp = _unauth_get(client, "/health")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "healthy"
def test_version_endpoint_is_public(self, client):
"""Version endpoint does not require authentication."""
resp = _unauth_get(client, "/api/v1/version")
assert resp.status_code == 200
data = resp.json()
assert "version" in data
assert "api_version" in data
def test_post_without_auth_returns_401(self, client):
"""Creating a device without auth fails."""
resp = _unauth_request(
client, "POST", "/api/v1/devices",
json={
"name": "Unauthorized Device",
"url": "mock://test",
"device_type": "mock",
"led_count": 10,
},
)
assert resp.status_code == 401
def test_delete_without_auth_returns_401(self, client):
"""Deleting a device without auth fails."""
resp = _unauth_request(client, "DELETE", "/api/v1/devices/some_id")
assert resp.status_code == 401
def test_backup_without_auth_returns_401(self, client):
"""Backup endpoint requires authentication."""
resp = _unauth_get(client, "/api/v1/system/backup")
assert resp.status_code == 401
def test_color_strip_sources_without_auth_returns_401(self, client):
"""Color strip source listing requires authentication."""
resp = _unauth_get(client, "/api/v1/color-strip-sources")
assert resp.status_code == 401
def test_output_targets_without_auth_returns_401(self, client):
"""Output target listing requires authentication."""
resp = _unauth_get(client, "/api/v1/output-targets")
assert resp.status_code == 401
def test_malformed_bearer_token_returns_401_or_403(self, client):
"""A malformed Authorization header is rejected."""
resp = _with_header(
client, "GET", "/api/v1/devices",
auth_value="just-a-key",
)
# FastAPI's HTTPBearer returns 403 for malformed format,
# or 401 depending on auto_error setting. Accept either.
assert resp.status_code in (401, 403)

View File

@@ -0,0 +1,121 @@
"""E2E: Backup and restore flow.
Tests creating entities, backing up, deleting, then restoring from backup.
"""
import io
import json
import pytest
class TestBackupRestoreFlow:
"""A user backs up their configuration and restores it."""
def _create_device(self, client, name="Backup Device") -> str:
resp = client.post("/api/v1/devices", json={
"name": name,
"url": "mock://backup",
"device_type": "mock",
"led_count": 30,
})
assert resp.status_code == 201
return resp.json()["id"]
def _create_css(self, client, name="Backup CSS") -> str:
resp = client.post("/api/v1/color-strip-sources", json={
"name": name,
"source_type": "static",
"color": [255, 0, 0],
"led_count": 30,
})
assert resp.status_code == 201
return resp.json()["id"]
def test_backup_and_restore_roundtrip(self, client):
# 1. Create some entities
device_id = self._create_device(client, "Device for Backup")
css_id = self._create_css(client, "CSS for Backup")
# Verify entities exist
resp = client.get("/api/v1/devices")
assert resp.json()["count"] == 1
resp = client.get("/api/v1/color-strip-sources")
assert resp.json()["count"] == 1
# 2. Create a backup (GET returns a JSON file)
resp = client.get("/api/v1/system/backup")
assert resp.status_code == 200
backup_data = resp.json()
assert backup_data["meta"]["format"] == "ledgrab-backup"
assert "stores" in backup_data
assert "devices" in backup_data["stores"]
assert "color_strip_sources" in backup_data["stores"]
# Verify device is in the backup.
# Store files have structure: {"version": "...", "devices": {id: {...}}}
devices_store = backup_data["stores"]["devices"]
assert "devices" in devices_store
assert len(devices_store["devices"]) == 1
# 3. Delete all created entities
resp = client.delete(f"/api/v1/color-strip-sources/{css_id}")
assert resp.status_code == 204
resp = client.delete(f"/api/v1/devices/{device_id}")
assert resp.status_code == 204
# Verify they're gone
resp = client.get("/api/v1/devices")
assert resp.json()["count"] == 0
resp = client.get("/api/v1/color-strip-sources")
assert resp.json()["count"] == 0
# 4. Restore from backup (POST with the backup JSON as a file upload)
backup_bytes = json.dumps(backup_data).encode("utf-8")
resp = client.post(
"/api/v1/system/restore",
files={"file": ("backup.json", io.BytesIO(backup_bytes), "application/json")},
)
assert resp.status_code == 200, f"Restore failed: {resp.text}"
restore_result = resp.json()
assert restore_result["status"] == "restored"
assert restore_result["stores_written"] > 0
# 5. After restore, stores are written to disk but the in-memory
# stores haven't been re-loaded (normally a server restart does that).
# Verify the backup file was written correctly by reading it back.
# The restore endpoint writes JSON files; we check the response confirms success.
assert restore_result["restart_scheduled"] is True
def test_backup_contains_all_store_keys(self, client):
"""Backup response includes entries for all known store types."""
resp = client.get("/api/v1/system/backup")
assert resp.status_code == 200
stores = resp.json()["stores"]
# At minimum, these critical stores should be present
expected_keys = {
"devices", "output_targets", "color_strip_sources",
"capture_templates", "value_sources",
}
assert expected_keys.issubset(set(stores.keys()))
def test_restore_rejects_invalid_format(self, client):
"""Uploading a non-backup JSON file should fail validation."""
bad_data = json.dumps({"not": "a backup"}).encode("utf-8")
resp = client.post(
"/api/v1/system/restore",
files={"file": ("bad.json", io.BytesIO(bad_data), "application/json")},
)
assert resp.status_code == 400
def test_restore_rejects_empty_stores(self, client):
"""A backup with no recognized stores should fail."""
bad_backup = {
"meta": {"format": "ledgrab-backup", "format_version": 1},
"stores": {"unknown_store": {}},
}
resp = client.post(
"/api/v1/system/restore",
files={"file": ("bad.json", io.BytesIO(json.dumps(bad_backup).encode()), "application/json")},
)
assert resp.status_code == 400

View File

@@ -0,0 +1,154 @@
"""E2E: Color strip source CRUD lifecycle.
Tests creating, listing, updating, cloning, and deleting color strip sources.
"""
import pytest
class TestColorStripSourceLifecycle:
"""A user manages color strip sources for LED effects."""
def test_static_and_gradient_crud(self, client):
# 1. Create a static color strip source
resp = client.post("/api/v1/color-strip-sources", json={
"name": "Red Static",
"source_type": "static",
"color": [255, 0, 0],
"led_count": 60,
"tags": ["e2e", "static"],
})
assert resp.status_code == 201, f"Create static failed: {resp.text}"
static = resp.json()
static_id = static["id"]
assert static["name"] == "Red Static"
assert static["source_type"] == "static"
assert static["color"] == [255, 0, 0]
# 2. Create a gradient color strip source
resp = client.post("/api/v1/color-strip-sources", json={
"name": "Blue-Green Gradient",
"source_type": "gradient",
"stops": [
{"position": 0.0, "color": [0, 0, 255]},
{"position": 1.0, "color": [0, 255, 0]},
],
"led_count": 60,
})
assert resp.status_code == 201, f"Create gradient failed: {resp.text}"
gradient = resp.json()
gradient_id = gradient["id"]
assert gradient["name"] == "Blue-Green Gradient"
assert gradient["source_type"] == "gradient"
assert len(gradient["stops"]) == 2
# 3. List all -- should have both
resp = client.get("/api/v1/color-strip-sources")
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 2
ids = {s["id"] for s in data["sources"]}
assert static_id in ids
assert gradient_id in ids
# 4. Update the static source -- change color
resp = client.put(
f"/api/v1/color-strip-sources/{static_id}",
json={"color": [0, 255, 0]},
)
assert resp.status_code == 200
assert resp.json()["color"] == [0, 255, 0]
# 5. Verify update via GET
resp = client.get(f"/api/v1/color-strip-sources/{static_id}")
assert resp.status_code == 200
assert resp.json()["color"] == [0, 255, 0]
# 6. Clone by creating another source with same data, different name
resp = client.post("/api/v1/color-strip-sources", json={
"name": "Cloned Static",
"source_type": "static",
"color": [0, 255, 0],
"led_count": 60,
})
assert resp.status_code == 201
clone_id = resp.json()["id"]
assert clone_id != static_id
assert resp.json()["name"] == "Cloned Static"
# 7. Delete all three
for sid in [static_id, gradient_id, clone_id]:
resp = client.delete(f"/api/v1/color-strip-sources/{sid}")
assert resp.status_code == 204
# 8. List should be empty
resp = client.get("/api/v1/color-strip-sources")
assert resp.status_code == 200
assert resp.json()["count"] == 0
def test_update_name(self, client):
"""Renaming a color strip source persists."""
resp = client.post("/api/v1/color-strip-sources", json={
"name": "Original Name",
"source_type": "static",
"color": [100, 100, 100],
"led_count": 10,
})
source_id = resp.json()["id"]
resp = client.put(
f"/api/v1/color-strip-sources/{source_id}",
json={"name": "New Name"},
)
assert resp.status_code == 200
assert resp.json()["name"] == "New Name"
def test_get_nonexistent_returns_404(self, client):
resp = client.get("/api/v1/color-strip-sources/nonexistent")
assert resp.status_code == 404
def test_delete_nonexistent_returns_404(self, client):
resp = client.delete("/api/v1/color-strip-sources/nonexistent")
assert resp.status_code == 404
def test_duplicate_name_rejected(self, client):
"""Cannot create two sources with the same name."""
payload = {
"name": "Unique Name",
"source_type": "static",
"color": [0, 0, 0],
"led_count": 10,
}
resp = client.post("/api/v1/color-strip-sources", json=payload)
assert resp.status_code == 201
resp = client.post("/api/v1/color-strip-sources", json=payload)
assert resp.status_code == 400 # duplicate name
def test_color_cycle_source(self, client):
"""Color cycle sources store and return their color list."""
resp = client.post("/api/v1/color-strip-sources", json={
"name": "Rainbow Cycle",
"source_type": "color_cycle",
"colors": [[255, 0, 0], [0, 255, 0], [0, 0, 255]],
"led_count": 30,
})
assert resp.status_code == 201
data = resp.json()
assert data["source_type"] == "color_cycle"
assert data["colors"] == [[255, 0, 0], [0, 255, 0], [0, 0, 255]]
def test_effect_source(self, client):
"""Effect sources store their effect parameters."""
resp = client.post("/api/v1/color-strip-sources", json={
"name": "Fire Effect",
"source_type": "effect",
"effect_type": "fire",
"palette": "fire",
"intensity": 1.5,
"led_count": 60,
})
assert resp.status_code == 201
data = resp.json()
assert data["source_type"] == "effect"
assert data["effect_type"] == "fire"

View File

@@ -0,0 +1,132 @@
"""E2E: Device management lifecycle.
Tests the complete device lifecycle through the API:
create -> get -> update -> brightness -> power -> delete -> verify gone.
"""
import pytest
class TestDeviceLifecycle:
"""A user creates a device, inspects it, modifies it, and deletes it."""
def test_full_device_crud_lifecycle(self, client):
# 1. List devices -- should be empty
resp = client.get("/api/v1/devices")
assert resp.status_code == 200
assert resp.json()["count"] == 0
# 2. Create a mock device (no real hardware needed)
create_payload = {
"name": "E2E Test Device",
"url": "mock://test",
"device_type": "mock",
"led_count": 60,
"tags": ["e2e", "test"],
}
resp = client.post("/api/v1/devices", json=create_payload)
assert resp.status_code == 201, f"Create failed: {resp.text}"
device = resp.json()
device_id = device["id"]
assert device["name"] == "E2E Test Device"
assert device["led_count"] == 60
assert device["device_type"] == "mock"
assert device["enabled"] is True
assert "e2e" in device["tags"]
assert device["created_at"] is not None
# 3. Get the device by ID -- verify all fields
resp = client.get(f"/api/v1/devices/{device_id}")
assert resp.status_code == 200
fetched = resp.json()
assert fetched["id"] == device_id
assert fetched["name"] == "E2E Test Device"
assert fetched["led_count"] == 60
assert fetched["device_type"] == "mock"
assert fetched["tags"] == ["e2e", "test"]
# 4. Update the device -- change name and led_count
resp = client.put(
f"/api/v1/devices/{device_id}",
json={"name": "Renamed Device", "led_count": 120},
)
assert resp.status_code == 200
updated = resp.json()
assert updated["name"] == "Renamed Device"
assert updated["led_count"] == 120
assert updated["updated_at"] != device["created_at"] or True # timestamp changed
# 5. Verify update persisted via GET
resp = client.get(f"/api/v1/devices/{device_id}")
assert resp.status_code == 200
assert resp.json()["name"] == "Renamed Device"
# 6. Delete the device
resp = client.delete(f"/api/v1/devices/{device_id}")
assert resp.status_code == 204
# 7. Verify device is gone
resp = client.get(f"/api/v1/devices/{device_id}")
assert resp.status_code == 404
# 8. List should be empty again
resp = client.get("/api/v1/devices")
assert resp.status_code == 200
assert resp.json()["count"] == 0
def test_create_multiple_devices_and_list(self, client):
"""Creating multiple devices shows all in the list."""
for i in range(3):
resp = client.post("/api/v1/devices", json={
"name": f"Device {i}",
"url": "mock://test",
"device_type": "mock",
"led_count": 30,
})
assert resp.status_code == 201
resp = client.get("/api/v1/devices")
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 3
names = {d["name"] for d in data["devices"]}
assert names == {"Device 0", "Device 1", "Device 2"}
def test_get_nonexistent_device_returns_404(self, client):
resp = client.get("/api/v1/devices/nonexistent_id")
assert resp.status_code == 404
def test_delete_nonexistent_device_returns_404(self, client):
resp = client.delete("/api/v1/devices/nonexistent_id")
assert resp.status_code == 404
def test_update_nonexistent_device_returns_404(self, client):
resp = client.put(
"/api/v1/devices/nonexistent_id",
json={"name": "Ghost"},
)
assert resp.status_code == 404
def test_update_tags(self, client):
"""Tags can be updated independently."""
resp = client.post("/api/v1/devices", json={
"name": "Tag Device",
"url": "mock://test",
"device_type": "mock",
"led_count": 10,
"tags": ["original"],
})
device_id = resp.json()["id"]
resp = client.put(
f"/api/v1/devices/{device_id}",
json={"tags": ["updated", "twice"]},
)
assert resp.status_code == 200
assert resp.json()["tags"] == ["updated", "twice"]
def test_batch_device_states(self, client):
"""Batch states endpoint returns states for all devices."""
resp = client.get("/api/v1/devices/batch/states")
assert resp.status_code == 200
assert "states" in resp.json()

View File

@@ -0,0 +1,124 @@
"""E2E: Output target lifecycle.
Tests target CRUD with a dependency on a device:
create device -> create target -> list -> update -> delete target -> cleanup device.
"""
import pytest
class TestOutputTargetLifecycle:
"""A user wires up an output target to a device."""
def _create_device(self, client) -> str:
"""Helper: create a mock device and return its ID."""
resp = client.post("/api/v1/devices", json={
"name": "Target Test Device",
"url": "mock://target-test",
"device_type": "mock",
"led_count": 60,
})
assert resp.status_code == 201
return resp.json()["id"]
def test_full_target_crud_lifecycle(self, client):
device_id = self._create_device(client)
# 1. List targets -- should be empty
resp = client.get("/api/v1/output-targets")
assert resp.status_code == 200
assert resp.json()["count"] == 0
# 2. Create an output target referencing the device
create_payload = {
"name": "E2E Test Target",
"target_type": "led",
"device_id": device_id,
"fps": 30,
"protocol": "ddp",
"tags": ["e2e"],
}
resp = client.post("/api/v1/output-targets", json=create_payload)
assert resp.status_code == 201, f"Create failed: {resp.text}"
target = resp.json()
target_id = target["id"]
assert target["name"] == "E2E Test Target"
assert target["device_id"] == device_id
assert target["target_type"] == "led"
assert target["fps"] == 30
assert target["protocol"] == "ddp"
# 3. List targets -- should contain the new target
resp = client.get("/api/v1/output-targets")
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 1
assert data["targets"][0]["id"] == target_id
# 4. Get target by ID
resp = client.get(f"/api/v1/output-targets/{target_id}")
assert resp.status_code == 200
assert resp.json()["name"] == "E2E Test Target"
# 5. Update the target -- change name and fps
resp = client.put(
f"/api/v1/output-targets/{target_id}",
json={"name": "Updated Target", "fps": 60},
)
assert resp.status_code == 200
updated = resp.json()
assert updated["name"] == "Updated Target"
assert updated["fps"] == 60
# 6. Verify update via GET
resp = client.get(f"/api/v1/output-targets/{target_id}")
assert resp.status_code == 200
assert resp.json()["name"] == "Updated Target"
# 7. Delete the target
resp = client.delete(f"/api/v1/output-targets/{target_id}")
assert resp.status_code == 204
# 8. Verify target is gone
resp = client.get(f"/api/v1/output-targets/{target_id}")
assert resp.status_code == 404
# 9. Clean up the device
resp = client.delete(f"/api/v1/devices/{device_id}")
assert resp.status_code == 204
def test_cannot_delete_device_referenced_by_target(self, client):
"""Deleting a device that has a target should return 409."""
device_id = self._create_device(client)
resp = client.post("/api/v1/output-targets", json={
"name": "Blocking Target",
"target_type": "led",
"device_id": device_id,
})
assert resp.status_code == 201
target_id = resp.json()["id"]
# Attempt to delete device -- should fail
resp = client.delete(f"/api/v1/devices/{device_id}")
assert resp.status_code == 409
assert "referenced" in resp.json()["detail"].lower()
# Clean up: delete target first, then device
resp = client.delete(f"/api/v1/output-targets/{target_id}")
assert resp.status_code == 204
resp = client.delete(f"/api/v1/devices/{device_id}")
assert resp.status_code == 204
def test_create_target_with_invalid_device_returns_422(self, client):
"""Creating a target with a non-existent device_id returns 422."""
resp = client.post("/api/v1/output-targets", json={
"name": "Orphan Target",
"target_type": "led",
"device_id": "nonexistent_device",
})
assert resp.status_code == 422
def test_get_nonexistent_target_returns_404(self, client):
resp = client.get("/api/v1/output-targets/nonexistent_id")
assert resp.status_code == 404

View File

View File

@@ -0,0 +1,255 @@
"""Tests for AutomationStore — CRUD, conditions, name uniqueness."""
import pytest
from wled_controller.storage.automation import (
AlwaysCondition,
ApplicationCondition,
Automation,
Condition,
DisplayStateCondition,
MQTTCondition,
StartupCondition,
SystemIdleCondition,
TimeOfDayCondition,
WebhookCondition,
)
from wled_controller.storage.automation_store import AutomationStore
@pytest.fixture
def store(tmp_path) -> AutomationStore:
return AutomationStore(str(tmp_path / "automations.json"))
# ---------------------------------------------------------------------------
# Condition models
# ---------------------------------------------------------------------------
class TestConditionModels:
def test_always_round_trip(self):
c = AlwaysCondition()
data = c.to_dict()
restored = Condition.from_dict(data)
assert isinstance(restored, AlwaysCondition)
def test_application_round_trip(self):
c = ApplicationCondition(apps=["chrome.exe", "firefox.exe"], match_type="topmost")
data = c.to_dict()
restored = Condition.from_dict(data)
assert isinstance(restored, ApplicationCondition)
assert restored.apps == ["chrome.exe", "firefox.exe"]
assert restored.match_type == "topmost"
def test_time_of_day_round_trip(self):
c = TimeOfDayCondition(start_time="22:00", end_time="06:00")
data = c.to_dict()
restored = Condition.from_dict(data)
assert isinstance(restored, TimeOfDayCondition)
assert restored.start_time == "22:00"
assert restored.end_time == "06:00"
def test_system_idle_round_trip(self):
c = SystemIdleCondition(idle_minutes=10, when_idle=False)
data = c.to_dict()
restored = Condition.from_dict(data)
assert isinstance(restored, SystemIdleCondition)
assert restored.idle_minutes == 10
assert restored.when_idle is False
def test_display_state_round_trip(self):
c = DisplayStateCondition(state="off")
data = c.to_dict()
restored = Condition.from_dict(data)
assert isinstance(restored, DisplayStateCondition)
assert restored.state == "off"
def test_mqtt_round_trip(self):
c = MQTTCondition(topic="home/tv", payload="on", match_mode="contains")
data = c.to_dict()
restored = Condition.from_dict(data)
assert isinstance(restored, MQTTCondition)
assert restored.topic == "home/tv"
assert restored.match_mode == "contains"
def test_webhook_round_trip(self):
c = WebhookCondition(token="abc123")
data = c.to_dict()
restored = Condition.from_dict(data)
assert isinstance(restored, WebhookCondition)
assert restored.token == "abc123"
def test_startup_round_trip(self):
c = StartupCondition()
data = c.to_dict()
restored = Condition.from_dict(data)
assert isinstance(restored, StartupCondition)
def test_unknown_condition_type_raises(self):
with pytest.raises(ValueError, match="Unknown condition type"):
Condition.from_dict({"condition_type": "nonexistent"})
# ---------------------------------------------------------------------------
# Automation model
# ---------------------------------------------------------------------------
class TestAutomationModel:
def test_round_trip(self, make_automation):
auto = make_automation(
name="Test Auto",
conditions=[AlwaysCondition(), WebhookCondition(token="tok1")],
scene_preset_id="sp_123",
deactivation_mode="revert",
)
data = auto.to_dict()
restored = Automation.from_dict(data)
assert restored.id == auto.id
assert restored.name == "Test Auto"
assert len(restored.conditions) == 2
assert isinstance(restored.conditions[0], AlwaysCondition)
assert isinstance(restored.conditions[1], WebhookCondition)
assert restored.scene_preset_id == "sp_123"
assert restored.deactivation_mode == "revert"
def test_from_dict_skips_unknown_conditions(self):
data = {
"id": "a1",
"name": "Skip",
"enabled": True,
"condition_logic": "or",
"conditions": [
{"condition_type": "always"},
{"condition_type": "future_unknown"},
],
"scene_preset_id": None,
"deactivation_mode": "none",
"deactivation_scene_preset_id": None,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
auto = Automation.from_dict(data)
assert len(auto.conditions) == 1 # unknown was skipped
# ---------------------------------------------------------------------------
# AutomationStore CRUD
# ---------------------------------------------------------------------------
class TestAutomationStoreCRUD:
def test_create(self, store):
a = store.create_automation(name="Auto A")
assert a.id.startswith("auto_")
assert a.name == "Auto A"
assert a.enabled is True
assert a.condition_logic == "or"
assert store.count() == 1
def test_create_with_conditions(self, store):
conditions = [
AlwaysCondition(),
WebhookCondition(token="secret123"),
]
a = store.create_automation(
name="Full",
enabled=False,
condition_logic="and",
conditions=conditions,
scene_preset_id="sp_001",
deactivation_mode="fallback_scene",
deactivation_scene_preset_id="sp_002",
tags=["test"],
)
assert a.enabled is False
assert a.condition_logic == "and"
assert len(a.conditions) == 2
assert a.scene_preset_id == "sp_001"
assert a.tags == ["test"]
def test_get_all(self, store):
store.create_automation("A")
store.create_automation("B")
assert len(store.get_all_automations()) == 2
def test_get(self, store):
created = store.create_automation("Get")
got = store.get_automation(created.id)
assert got.name == "Get"
def test_delete(self, store):
a = store.create_automation("Del")
store.delete_automation(a.id)
assert store.count() == 0
def test_delete_not_found(self, store):
with pytest.raises(ValueError, match="not found"):
store.delete_automation("nope")
def test_update(self, store):
a = store.create_automation(name="Old", enabled=True)
updated = store.update_automation(a.id, name="New", enabled=False)
assert updated.name == "New"
assert updated.enabled is False
def test_update_conditions(self, store):
a = store.create_automation(name="Conds")
new_conds = [ApplicationCondition(apps=["notepad.exe"])]
updated = store.update_automation(a.id, conditions=new_conds)
assert len(updated.conditions) == 1
assert isinstance(updated.conditions[0], ApplicationCondition)
def test_update_scene_preset_id_clear(self, store):
a = store.create_automation(name="SP", scene_preset_id="sp_1")
updated = store.update_automation(a.id, scene_preset_id="")
assert updated.scene_preset_id is None
def test_update_partial(self, store):
a = store.create_automation(name="Partial", enabled=True, tags=["orig"])
updated = store.update_automation(a.id, tags=["new"])
assert updated.name == "Partial"
assert updated.enabled is True
assert updated.tags == ["new"]
# ---------------------------------------------------------------------------
# Name uniqueness
# ---------------------------------------------------------------------------
class TestAutomationNameUniqueness:
def test_duplicate_name_create(self, store):
store.create_automation("Dup")
with pytest.raises(ValueError, match="already exists"):
store.create_automation("Dup")
def test_duplicate_name_update(self, store):
store.create_automation("First")
a2 = store.create_automation("Second")
with pytest.raises(ValueError, match="already exists"):
store.update_automation(a2.id, name="First")
# ---------------------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------------------
class TestAutomationPersistence:
def test_persist_and_reload(self, tmp_path):
path = str(tmp_path / "auto_persist.json")
s1 = AutomationStore(path)
a = s1.create_automation(
name="Persist",
conditions=[WebhookCondition(token="t1")],
)
aid = a.id
s2 = AutomationStore(path)
loaded = s2.get_automation(aid)
assert loaded.name == "Persist"
assert len(loaded.conditions) == 1
assert isinstance(loaded.conditions[0], WebhookCondition)

View File

@@ -0,0 +1,314 @@
"""Tests for BaseJsonStore — the shared data-layer base class."""
import json
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
import pytest
from wled_controller.storage.base_store import BaseJsonStore, EntityNotFoundError
# ---------------------------------------------------------------------------
# Minimal concrete store for testing the base class
# ---------------------------------------------------------------------------
@dataclass
class _Item:
id: str
name: str
value: int = 0
def to_dict(self) -> dict:
return {"id": self.id, "name": self.name, "value": self.value}
@staticmethod
def from_dict(data: dict) -> "_Item":
return _Item(id=data["id"], name=data["name"], value=data.get("value", 0))
class _TestStore(BaseJsonStore[_Item]):
_json_key = "items"
_entity_name = "Item"
def __init__(self, file_path: str):
super().__init__(file_path, _Item.from_dict)
def add(self, item: _Item) -> None:
with self._lock:
self._check_name_unique(item.name)
self._items[item.id] = item
self._save()
class _LegacyStore(BaseJsonStore[_Item]):
"""Store that supports legacy JSON keys for migration testing."""
_json_key = "items_v2"
_entity_name = "Item"
_legacy_json_keys = ["items_v1", "old_items"]
def __init__(self, file_path: str):
super().__init__(file_path, _Item.from_dict)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def store_file(tmp_path) -> Path:
return tmp_path / "test_store.json"
@pytest.fixture
def store(store_file) -> _TestStore:
return _TestStore(str(store_file))
# ---------------------------------------------------------------------------
# Initialization
# ---------------------------------------------------------------------------
class TestInit:
def test_empty_init(self, store):
assert store.count() == 0
assert store.get_all() == []
def test_file_not_found_starts_empty(self, tmp_path):
s = _TestStore(str(tmp_path / "missing.json"))
assert s.count() == 0
def test_load_from_existing_file(self, store_file):
data = {
"version": "1.0.0",
"items": {
"a": {"id": "a", "name": "Alpha", "value": 1},
"b": {"id": "b", "name": "Beta", "value": 2},
},
}
store_file.write_text(json.dumps(data), encoding="utf-8")
s = _TestStore(str(store_file))
assert s.count() == 2
assert s.get("a").name == "Alpha"
assert s.get("b").value == 2
def test_load_skips_corrupt_items(self, store_file):
"""Items that fail deserialization are skipped, not fatal."""
data = {
"version": "1.0.0",
"items": {
"good": {"id": "good", "name": "OK"},
"bad": {"missing_required": True},
},
}
store_file.write_text(json.dumps(data), encoding="utf-8")
s = _TestStore(str(store_file))
assert s.count() == 1
assert s.get("good").name == "OK"
def test_load_corrupt_json_raises(self, store_file):
"""Completely invalid JSON file raises on load."""
store_file.write_text("{bad json", encoding="utf-8")
with pytest.raises(Exception):
_TestStore(str(store_file))
# ---------------------------------------------------------------------------
# CRUD operations
# ---------------------------------------------------------------------------
class TestCRUD:
def test_get_all_returns_list(self, store):
store.add(_Item(id="x", name="X"))
items = store.get_all()
assert isinstance(items, list)
assert len(items) == 1
def test_get_existing(self, store):
store.add(_Item(id="x", name="X", value=42))
item = store.get("x")
assert item.id == "x"
assert item.value == 42
def test_get_not_found_raises(self, store):
with pytest.raises(EntityNotFoundError, match="not found"):
store.get("nonexistent")
def test_delete_existing(self, store):
store.add(_Item(id="x", name="X"))
store.delete("x")
assert store.count() == 0
def test_delete_not_found_raises(self, store):
with pytest.raises(EntityNotFoundError, match="not found"):
store.delete("nonexistent")
def test_count(self, store):
assert store.count() == 0
store.add(_Item(id="a", name="A"))
assert store.count() == 1
store.add(_Item(id="b", name="B"))
assert store.count() == 2
store.delete("a")
assert store.count() == 1
# ---------------------------------------------------------------------------
# Persistence (save/load round-trip)
# ---------------------------------------------------------------------------
class TestPersistence:
def test_save_and_reload(self, store_file):
s1 = _TestStore(str(store_file))
s1.add(_Item(id="p1", name="Persisted", value=99))
# Load fresh from the same file
s2 = _TestStore(str(store_file))
assert s2.count() == 1
assert s2.get("p1").value == 99
def test_delete_persists(self, store_file):
s1 = _TestStore(str(store_file))
s1.add(_Item(id="del", name="ToDelete"))
s1.delete("del")
s2 = _TestStore(str(store_file))
assert s2.count() == 0
def test_json_file_structure(self, store, store_file):
store.add(_Item(id="s1", name="Struct", value=7))
raw = json.loads(store_file.read_text(encoding="utf-8"))
assert "version" in raw
assert "items" in raw
assert raw["items"]["s1"]["name"] == "Struct"
# ---------------------------------------------------------------------------
# Name uniqueness
# ---------------------------------------------------------------------------
class TestNameUniqueness:
def test_duplicate_name_raises(self, store):
store.add(_Item(id="a", name="Unique"))
with pytest.raises(ValueError, match="already exists"):
store.add(_Item(id="b", name="Unique"))
def test_different_names_ok(self, store):
store.add(_Item(id="a", name="Alpha"))
store.add(_Item(id="b", name="Beta"))
assert store.count() == 2
def test_empty_name_raises(self, store):
with pytest.raises(ValueError, match="required"):
store._check_name_unique("")
def test_whitespace_name_raises(self, store):
with pytest.raises(ValueError, match="required"):
store._check_name_unique(" ")
def test_exclude_id_allows_self(self, store):
store.add(_Item(id="a", name="Alpha"))
# Checking uniqueness for a rename of item "a" — should not conflict with itself
store._check_name_unique("Alpha", exclude_id="a") # should not raise
# ---------------------------------------------------------------------------
# Thread safety
# ---------------------------------------------------------------------------
class TestThreadSafety:
def test_concurrent_reads(self, store):
for i in range(20):
store.add(_Item(id=f"t{i}", name=f"Thread {i}"))
results = []
def _read():
return store.count()
with ThreadPoolExecutor(max_workers=8) as pool:
futures = [pool.submit(_read) for _ in range(50)]
results = [f.result() for f in as_completed(futures)]
assert all(r == 20 for r in results)
def test_concurrent_add_and_read(self, tmp_path):
"""Concurrent adds should not lose items or corrupt state."""
s = _TestStore(str(tmp_path / "concurrent.json"))
errors = []
def _add(index):
try:
s.add(_Item(id=f"c{index}", name=f"Conc {index}"))
except Exception as e:
errors.append(e)
with ThreadPoolExecutor(max_workers=8) as pool:
futures = [pool.submit(_add, i) for i in range(30)]
for f in as_completed(futures):
f.result()
assert len(errors) == 0
assert s.count() == 30
# ---------------------------------------------------------------------------
# Legacy key migration
# ---------------------------------------------------------------------------
class TestLegacyKeyMigration:
def test_loads_from_legacy_key(self, store_file):
data = {
"version": "1.0.0",
"items_v1": {
"old1": {"id": "old1", "name": "Legacy"},
},
}
store_file.write_text(json.dumps(data), encoding="utf-8")
s = _LegacyStore(str(store_file))
assert s.count() == 1
assert s.get("old1").name == "Legacy"
def test_primary_key_takes_precedence(self, store_file):
data = {
"version": "1.0.0",
"items_v2": {"new": {"id": "new", "name": "Primary"}},
"items_v1": {"old": {"id": "old", "name": "Legacy"}},
}
store_file.write_text(json.dumps(data), encoding="utf-8")
s = _LegacyStore(str(store_file))
assert s.count() == 1
assert s.get("new").name == "Primary"
# ---------------------------------------------------------------------------
# Async delete
# ---------------------------------------------------------------------------
class TestAsyncDelete:
@pytest.mark.asyncio
async def test_async_delete(self, store):
store.add(_Item(id="ad", name="AsyncDel"))
assert store.count() == 1
await store.async_delete("ad")
assert store.count() == 0
@pytest.mark.asyncio
async def test_async_delete_not_found(self, store):
with pytest.raises(EntityNotFoundError, match="not found"):
await store.async_delete("nope")

View File

@@ -0,0 +1,285 @@
"""Tests for DeviceStore — device CRUD, persistence, name uniqueness, thread safety."""
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import pytest
from wled_controller.storage.device_store import Device, DeviceStore
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def temp_storage(tmp_path) -> Path:
return tmp_path / "devices.json"
@pytest.fixture
def store(temp_storage) -> DeviceStore:
return DeviceStore(temp_storage)
# ---------------------------------------------------------------------------
# Device model
# ---------------------------------------------------------------------------
class TestDeviceModel:
def test_creation_defaults(self):
d = Device(device_id="d1", name="D", url="http://1.2.3.4", led_count=100)
assert d.id == "d1"
assert d.enabled is True
assert d.device_type == "wled"
assert d.software_brightness == 255
assert d.rgbw is False
assert d.zone_mode == "combined"
assert d.tags == []
def test_creation_with_all_fields(self):
d = Device(
device_id="d2",
name="Full",
url="http://1.2.3.4",
led_count=300,
enabled=False,
device_type="adalight",
baud_rate=115200,
software_brightness=128,
auto_shutdown=True,
send_latency_ms=10,
rgbw=True,
zone_mode="individual",
tags=["living", "tv"],
dmx_protocol="sacn",
dmx_start_universe=1,
dmx_start_channel=5,
)
assert d.enabled is False
assert d.baud_rate == 115200
assert d.rgbw is True
assert d.tags == ["living", "tv"]
assert d.dmx_protocol == "sacn"
def test_to_dict_round_trip(self):
original = Device(
device_id="rt1",
name="RoundTrip",
url="http://10.0.0.1",
led_count=60,
rgbw=True,
tags=["test"],
)
data = original.to_dict()
restored = Device.from_dict(data)
assert restored.id == original.id
assert restored.name == original.name
assert restored.url == original.url
assert restored.led_count == original.led_count
assert restored.rgbw == original.rgbw
assert restored.tags == original.tags
def test_to_dict_omits_defaults(self):
"""Fields at their default value should be omitted from to_dict for compactness."""
d = Device(device_id="d", name="D", url="http://x", led_count=10)
data = d.to_dict()
assert "baud_rate" not in data
assert "rgbw" not in data
assert "tags" not in data
def test_to_dict_includes_non_defaults(self):
d = Device(
device_id="d", name="D", url="http://x", led_count=10,
rgbw=True, tags=["a"], software_brightness=100,
)
data = d.to_dict()
assert data["rgbw"] is True
assert data["tags"] == ["a"]
assert data["software_brightness"] == 100
def test_from_dict_missing_optional_fields(self):
"""from_dict should handle minimal data gracefully."""
data = {"id": "m1", "name": "Minimal", "url": "http://x", "led_count": 10}
d = Device.from_dict(data)
assert d.enabled is True
assert d.device_type == "wled"
assert d.tags == []
# ---------------------------------------------------------------------------
# DeviceStore CRUD
# ---------------------------------------------------------------------------
class TestDeviceStoreCRUD:
def test_init_empty(self, store):
assert store.count() == 0
def test_create_device(self, store):
d = store.create_device(name="Test", url="http://1.2.3.4", led_count=100)
assert d.id.startswith("device_")
assert d.name == "Test"
assert store.count() == 1
def test_create_device_with_options(self, store):
d = store.create_device(
name="Full",
url="http://1.2.3.4",
led_count=200,
device_type="adalight",
baud_rate=115200,
auto_shutdown=True,
rgbw=True,
tags=["bedroom"],
)
assert d.device_type == "adalight"
assert d.baud_rate == 115200
assert d.auto_shutdown is True
assert d.rgbw is True
assert d.tags == ["bedroom"]
def test_create_mock_device_url(self, store):
d = store.create_device(
name="MockDev", url="http://whatever", led_count=10, device_type="mock"
)
assert d.url.startswith("mock://")
def test_get_device(self, store):
created = store.create_device(name="Get", url="http://x", led_count=50)
got = store.get_device(created.id)
assert got.name == "Get"
assert got.led_count == 50
def test_get_device_not_found(self, store):
with pytest.raises(ValueError, match="not found"):
store.get_device("no_such_id")
def test_get_all_devices(self, store):
store.create_device("A", "http://a", 10)
store.create_device("B", "http://b", 20)
all_devices = store.get_all_devices()
assert len(all_devices) == 2
names = {d.name for d in all_devices}
assert names == {"A", "B"}
def test_update_device(self, store):
d = store.create_device(name="Old", url="http://x", led_count=100)
updated = store.update_device(d.id, name="New", led_count=200)
assert updated.name == "New"
assert updated.led_count == 200
assert updated.id == d.id
def test_update_device_ignores_none(self, store):
d = store.create_device(name="Keep", url="http://x", led_count=100)
updated = store.update_device(d.id, name=None, led_count=200)
assert updated.name == "Keep"
assert updated.led_count == 200
def test_update_device_ignores_unknown_fields(self, store):
d = store.create_device(name="Unk", url="http://x", led_count=100)
updated = store.update_device(d.id, bogus_field="ignored")
assert updated.name == "Unk"
def test_update_device_not_found(self, store):
with pytest.raises(ValueError, match="not found"):
store.update_device("missing", name="X")
def test_delete_device(self, store):
d = store.create_device(name="Del", url="http://x", led_count=50)
store.delete_device(d.id)
assert store.count() == 0
with pytest.raises(ValueError, match="not found"):
store.get_device(d.id)
def test_delete_device_not_found(self, store):
with pytest.raises(ValueError, match="not found"):
store.delete_device("missing")
def test_device_exists(self, store):
d = store.create_device(name="E", url="http://x", led_count=10)
assert store.device_exists(d.id) is True
assert store.device_exists("nope") is False
def test_clear(self, store):
store.create_device("A", "http://a", 10)
store.create_device("B", "http://b", 20)
store.clear()
assert store.count() == 0
# ---------------------------------------------------------------------------
# Name uniqueness
# ---------------------------------------------------------------------------
class TestDeviceNameUniqueness:
def test_duplicate_name_on_create(self, store):
store.create_device(name="Same", url="http://a", led_count=10)
with pytest.raises(ValueError, match="already exists"):
store.create_device(name="Same", url="http://b", led_count=10)
def test_duplicate_name_on_update(self, store):
store.create_device(name="First", url="http://a", led_count=10)
d2 = store.create_device(name="Second", url="http://b", led_count=10)
with pytest.raises(ValueError, match="already exists"):
store.update_device(d2.id, name="First")
def test_rename_to_own_name_ok(self, store):
d = store.create_device(name="Self", url="http://a", led_count=10)
updated = store.update_device(d.id, name="Self", led_count=99)
assert updated.led_count == 99
# ---------------------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------------------
class TestDevicePersistence:
def test_persistence_across_instances(self, temp_storage):
s1 = DeviceStore(temp_storage)
d = s1.create_device(name="Persist", url="http://p", led_count=77)
did = d.id
s2 = DeviceStore(temp_storage)
loaded = s2.get_device(did)
assert loaded.name == "Persist"
assert loaded.led_count == 77
def test_update_persists(self, temp_storage):
s1 = DeviceStore(temp_storage)
d = s1.create_device(name="Before", url="http://x", led_count=10)
s1.update_device(d.id, name="After")
s2 = DeviceStore(temp_storage)
assert s2.get_device(d.id).name == "After"
# ---------------------------------------------------------------------------
# Thread safety
# ---------------------------------------------------------------------------
class TestDeviceThreadSafety:
def test_concurrent_creates(self, tmp_path):
s = DeviceStore(tmp_path / "conc.json")
errors = []
def _create(i):
try:
s.create_device(name=f"Dev {i}", url=f"http://{i}", led_count=10)
except Exception as e:
errors.append(e)
with ThreadPoolExecutor(max_workers=8) as pool:
futures = [pool.submit(_create, i) for i in range(25)]
for f in as_completed(futures):
f.result()
assert len(errors) == 0
assert s.count() == 25

View File

@@ -0,0 +1,211 @@
"""Tests for OutputTargetStore — CRUD for LED and key_colors targets."""
import pytest
from wled_controller.storage.output_target import OutputTarget
from wled_controller.storage.output_target_store import OutputTargetStore
from wled_controller.storage.wled_output_target import WledOutputTarget
from wled_controller.storage.key_colors_output_target import (
KeyColorsOutputTarget,
KeyColorsSettings,
)
@pytest.fixture
def store(tmp_path) -> OutputTargetStore:
return OutputTargetStore(str(tmp_path / "output_targets.json"))
# ---------------------------------------------------------------------------
# OutputTarget model dispatching
# ---------------------------------------------------------------------------
class TestOutputTargetModel:
def test_led_from_dict(self):
data = {
"id": "pt_1",
"name": "LED Target",
"target_type": "led",
"device_id": "dev_1",
"color_strip_source_id": "css_1",
"fps": 30,
"protocol": "ddp",
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
target = OutputTarget.from_dict(data)
assert isinstance(target, WledOutputTarget)
assert target.device_id == "dev_1"
def test_key_colors_from_dict(self):
data = {
"id": "pt_2",
"name": "KC Target",
"target_type": "key_colors",
"picture_source_id": "ps_1",
"settings": {},
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
target = OutputTarget.from_dict(data)
assert isinstance(target, KeyColorsOutputTarget)
def test_unknown_type_raises(self):
data = {
"id": "pt_3",
"name": "Bad",
"target_type": "nonexistent",
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
with pytest.raises(ValueError, match="Unknown target type"):
OutputTarget.from_dict(data)
# ---------------------------------------------------------------------------
# OutputTargetStore CRUD
# ---------------------------------------------------------------------------
class TestOutputTargetStoreCRUD:
def test_create_led_target(self, store):
t = store.create_target(
name="LED 1",
target_type="led",
device_id="dev_1",
color_strip_source_id="css_1",
fps=60,
protocol="ddp",
)
assert t.id.startswith("pt_")
assert isinstance(t, WledOutputTarget)
assert t.name == "LED 1"
assert store.count() == 1
def test_create_key_colors_target(self, store):
t = store.create_target(
name="KC 1",
target_type="key_colors",
picture_source_id="ps_1",
)
assert isinstance(t, KeyColorsOutputTarget)
assert t.picture_source_id == "ps_1"
def test_create_invalid_type(self, store):
with pytest.raises(ValueError, match="Invalid target type"):
store.create_target(name="Bad", target_type="invalid")
def test_get_all(self, store):
store.create_target("A", "led")
store.create_target("B", "led")
assert len(store.get_all_targets()) == 2
def test_get(self, store):
created = store.create_target("Get", "led")
got = store.get_target(created.id)
assert got.name == "Get"
def test_delete(self, store):
t = store.create_target("Del", "led")
store.delete_target(t.id)
assert store.count() == 0
def test_delete_not_found(self, store):
with pytest.raises(ValueError, match="not found"):
store.delete_target("nope")
# ---------------------------------------------------------------------------
# Update
# ---------------------------------------------------------------------------
class TestOutputTargetUpdate:
def test_update_name(self, store):
t = store.create_target("Old", "led")
updated = store.update_target(t.id, name="New")
assert updated.name == "New"
def test_update_led_fields(self, store):
t = store.create_target("LED", "led", fps=30, protocol="ddp")
updated = store.update_target(t.id, fps=60, protocol="drgb")
assert isinstance(updated, WledOutputTarget)
assert updated.fps == 60
assert updated.protocol == "drgb"
def test_update_not_found(self, store):
with pytest.raises(ValueError, match="not found"):
store.update_target("nope", name="X")
def test_update_tags(self, store):
t = store.create_target("Tags", "led", tags=["old"])
updated = store.update_target(t.id, tags=["new", "tags"])
assert updated.tags == ["new", "tags"]
# ---------------------------------------------------------------------------
# Name uniqueness
# ---------------------------------------------------------------------------
class TestOutputTargetNameUniqueness:
def test_duplicate_name_create(self, store):
store.create_target("Same", "led")
with pytest.raises(ValueError, match="already exists"):
store.create_target("Same", "led")
def test_duplicate_name_update(self, store):
store.create_target("First", "led")
t2 = store.create_target("Second", "led")
with pytest.raises(ValueError, match="already exists"):
store.update_target(t2.id, name="First")
# ---------------------------------------------------------------------------
# Query helpers
# ---------------------------------------------------------------------------
class TestOutputTargetQueries:
def test_get_targets_for_device(self, store):
store.create_target("T1", "led", device_id="dev_a")
store.create_target("T2", "led", device_id="dev_b")
store.create_target("T3", "led", device_id="dev_a")
results = store.get_targets_for_device("dev_a")
assert len(results) == 2
assert all(isinstance(t, WledOutputTarget) for t in results)
def test_get_targets_for_device_empty(self, store):
assert store.get_targets_for_device("nonexistent") == []
def test_get_targets_referencing_css(self, store):
store.create_target("T1", "led", color_strip_source_id="css_x")
store.create_target("T2", "led", color_strip_source_id="css_y")
names = store.get_targets_referencing_css("css_x")
assert names == ["T1"]
# ---------------------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------------------
class TestOutputTargetPersistence:
def test_persist_and_reload(self, tmp_path):
path = str(tmp_path / "ot_persist.json")
s1 = OutputTargetStore(path)
t = s1.create_target(
"Persist", "led",
device_id="dev_1",
fps=60,
tags=["tv"],
)
tid = t.id
s2 = OutputTargetStore(path)
loaded = s2.get_target(tid)
assert loaded.name == "Persist"
assert isinstance(loaded, WledOutputTarget)
assert loaded.tags == ["tv"]

View File

@@ -0,0 +1,160 @@
"""Tests for SyncClockStore — CRUD, speed clamping, name uniqueness."""
import pytest
from wled_controller.storage.sync_clock import SyncClock
from wled_controller.storage.sync_clock_store import SyncClockStore
@pytest.fixture
def store(tmp_path) -> SyncClockStore:
return SyncClockStore(str(tmp_path / "sync_clocks.json"))
# ---------------------------------------------------------------------------
# SyncClock model
# ---------------------------------------------------------------------------
class TestSyncClockModel:
def test_to_dict_round_trip(self, make_sync_clock):
clock = make_sync_clock(name="RT", speed=2.5, description="test", tags=["a"])
data = clock.to_dict()
restored = SyncClock.from_dict(data)
assert restored.id == clock.id
assert restored.name == "RT"
assert restored.speed == 2.5
assert restored.description == "test"
assert restored.tags == ["a"]
def test_from_dict_defaults(self):
data = {
"id": "sc_1",
"name": "Default",
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
clock = SyncClock.from_dict(data)
assert clock.speed == 1.0
assert clock.description is None
assert clock.tags == []
# ---------------------------------------------------------------------------
# SyncClockStore CRUD
# ---------------------------------------------------------------------------
class TestSyncClockStoreCRUD:
def test_create_clock(self, store):
c = store.create_clock(name="Clock A")
assert c.id.startswith("sc_")
assert c.name == "Clock A"
assert c.speed == 1.0
assert store.count() == 1
def test_create_clock_with_options(self, store):
c = store.create_clock(
name="Fast", speed=5.0, description="speedy", tags=["anim"]
)
assert c.speed == 5.0
assert c.description == "speedy"
assert c.tags == ["anim"]
def test_get_clock(self, store):
created = store.create_clock(name="Get")
got = store.get_clock(created.id)
assert got.name == "Get"
def test_get_all_clocks(self, store):
store.create_clock("A")
store.create_clock("B")
assert len(store.get_all_clocks()) == 2
def test_delete_clock(self, store):
c = store.create_clock("Del")
store.delete_clock(c.id)
assert store.count() == 0
def test_delete_not_found(self, store):
with pytest.raises(ValueError, match="not found"):
store.delete_clock("nope")
def test_update_clock(self, store):
c = store.create_clock(name="Old", speed=1.0)
updated = store.update_clock(c.id, name="New", speed=3.0)
assert updated.name == "New"
assert updated.speed == 3.0
def test_update_clock_partial(self, store):
c = store.create_clock(name="Keep", speed=2.0, description="orig")
updated = store.update_clock(c.id, speed=4.0)
assert updated.name == "Keep"
assert updated.speed == 4.0
assert updated.description == "orig"
# ---------------------------------------------------------------------------
# Speed clamping
# ---------------------------------------------------------------------------
class TestSpeedClamping:
def test_create_clamps_low(self, store):
c = store.create_clock(name="Low", speed=0.01)
assert c.speed == 0.1
def test_create_clamps_high(self, store):
c = store.create_clock(name="High", speed=100.0)
assert c.speed == 10.0
def test_update_clamps_low(self, store):
c = store.create_clock(name="UL", speed=1.0)
updated = store.update_clock(c.id, speed=-5.0)
assert updated.speed == 0.1
def test_update_clamps_high(self, store):
c = store.create_clock(name="UH", speed=1.0)
updated = store.update_clock(c.id, speed=999.0)
assert updated.speed == 10.0
# ---------------------------------------------------------------------------
# Name uniqueness
# ---------------------------------------------------------------------------
class TestSyncClockNameUniqueness:
def test_duplicate_name_create(self, store):
store.create_clock("Same")
with pytest.raises(ValueError, match="already exists"):
store.create_clock("Same")
def test_duplicate_name_update(self, store):
store.create_clock("First")
c2 = store.create_clock("Second")
with pytest.raises(ValueError, match="already exists"):
store.update_clock(c2.id, name="First")
def test_rename_to_own_name_ok(self, store):
c = store.create_clock("Self")
updated = store.update_clock(c.id, name="Self", speed=9.0)
assert updated.speed == 9.0
# ---------------------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------------------
class TestSyncClockPersistence:
def test_persist_and_reload(self, tmp_path):
path = str(tmp_path / "sc_persist.json")
s1 = SyncClockStore(path)
c = s1.create_clock(name="Persist", speed=2.5)
cid = c.id
s2 = SyncClockStore(path)
loaded = s2.get_clock(cid)
assert loaded.name == "Persist"
assert loaded.speed == 2.5

View File

@@ -0,0 +1,259 @@
"""Tests for ValueSourceStore — CRUD for all source types."""
import pytest
from wled_controller.storage.value_source import (
AdaptiveValueSource,
AnimatedValueSource,
AudioValueSource,
DaylightValueSource,
StaticValueSource,
ValueSource,
)
from wled_controller.storage.value_source_store import ValueSourceStore
@pytest.fixture
def store(tmp_path) -> ValueSourceStore:
return ValueSourceStore(str(tmp_path / "value_sources.json"))
# ---------------------------------------------------------------------------
# ValueSource model round-trips
# ---------------------------------------------------------------------------
class TestValueSourceModels:
def test_static_round_trip(self):
data = {
"id": "vs_1",
"name": "Static",
"source_type": "static",
"value": 0.75,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, StaticValueSource)
assert src.value == 0.75
restored = ValueSource.from_dict(src.to_dict())
assert restored.value == 0.75
def test_animated_round_trip(self):
data = {
"id": "vs_2",
"name": "Wave",
"source_type": "animated",
"waveform": "triangle",
"speed": 30.0,
"min_value": 0.2,
"max_value": 0.8,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, AnimatedValueSource)
assert src.waveform == "triangle"
assert src.speed == 30.0
def test_audio_round_trip(self):
data = {
"id": "vs_3",
"name": "Audio",
"source_type": "audio",
"audio_source_id": "as_1",
"mode": "peak",
"sensitivity": 2.0,
"smoothing": 0.5,
"auto_gain": True,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, AudioValueSource)
assert src.mode == "peak"
assert src.auto_gain is True
def test_adaptive_time_round_trip(self):
data = {
"id": "vs_4",
"name": "Time",
"source_type": "adaptive_time",
"schedule": [
{"time": "00:00", "value": 0.1},
{"time": "12:00", "value": 1.0},
],
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, AdaptiveValueSource)
assert len(src.schedule) == 2
def test_adaptive_scene_round_trip(self):
data = {
"id": "vs_5",
"name": "Scene",
"source_type": "adaptive_scene",
"picture_source_id": "ps_1",
"scene_behavior": "match",
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, AdaptiveValueSource)
assert src.scene_behavior == "match"
def test_daylight_round_trip(self):
data = {
"id": "vs_6",
"name": "Daylight",
"source_type": "daylight",
"speed": 2.0,
"use_real_time": True,
"latitude": 55.0,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, DaylightValueSource)
assert src.use_real_time is True
assert src.latitude == 55.0
def test_unknown_type_defaults_to_static(self):
data = {
"id": "vs_u",
"name": "Unknown",
"source_type": "unknown_future",
"value": 0.5,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, StaticValueSource)
# ---------------------------------------------------------------------------
# ValueSourceStore CRUD
# ---------------------------------------------------------------------------
class TestValueSourceStoreCRUD:
def test_create_static(self, store):
s = store.create_source(name="S1", source_type="static", value=0.5)
assert s.id.startswith("vs_")
assert isinstance(s, StaticValueSource)
assert s.value == 0.5
assert store.count() == 1
def test_create_animated(self, store):
s = store.create_source(
name="A1", source_type="animated",
waveform="sawtooth", speed=20.0,
)
assert isinstance(s, AnimatedValueSource)
assert s.waveform == "sawtooth"
assert s.speed == 20.0
def test_create_audio(self, store):
s = store.create_source(
name="Au1", source_type="audio",
audio_source_id="as_1", mode="beat",
)
assert isinstance(s, AudioValueSource)
assert s.mode == "beat"
def test_create_adaptive_time(self, store):
schedule = [
{"time": "08:00", "value": 0.5},
{"time": "20:00", "value": 1.0},
]
s = store.create_source(
name="AT", source_type="adaptive_time", schedule=schedule,
)
assert isinstance(s, AdaptiveValueSource)
assert len(s.schedule) == 2
def test_create_adaptive_time_insufficient_schedule(self, store):
with pytest.raises(ValueError, match="at least 2 points"):
store.create_source(
name="Bad", source_type="adaptive_time",
schedule=[{"time": "12:00", "value": 0.5}],
)
def test_create_daylight(self, store):
s = store.create_source(
name="DL", source_type="daylight",
speed=2.0, use_real_time=True, latitude=48.0,
)
assert isinstance(s, DaylightValueSource)
assert s.use_real_time is True
def test_create_invalid_type(self, store):
with pytest.raises(ValueError, match="Invalid source type"):
store.create_source(name="Bad", source_type="invalid")
def test_get_all(self, store):
store.create_source("A", "static")
store.create_source("B", "static")
assert len(store.get_all_sources()) == 2
def test_get(self, store):
created = store.create_source("Get", "static", value=0.3)
got = store.get_source(created.id)
assert got.name == "Get"
def test_delete(self, store):
s = store.create_source("Del", "static")
store.delete_source(s.id)
assert store.count() == 0
def test_update_static(self, store):
s = store.create_source("Stat", "static", value=0.5)
updated = store.update_source(s.id, value=0.9)
assert isinstance(updated, StaticValueSource)
assert updated.value == 0.9
def test_update_name(self, store):
s = store.create_source("Old", "static")
updated = store.update_source(s.id, name="New")
assert updated.name == "New"
def test_update_animated_fields(self, store):
s = store.create_source("Anim", "animated", waveform="sine", speed=10.0)
updated = store.update_source(s.id, waveform="square", speed=30.0)
assert isinstance(updated, AnimatedValueSource)
assert updated.waveform == "square"
assert updated.speed == 30.0
# ---------------------------------------------------------------------------
# Name uniqueness
# ---------------------------------------------------------------------------
class TestValueSourceNameUniqueness:
def test_duplicate_name(self, store):
store.create_source("Same", "static")
with pytest.raises(ValueError, match="already exists"):
store.create_source("Same", "animated")
# ---------------------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------------------
class TestValueSourcePersistence:
def test_persist_and_reload(self, tmp_path):
path = str(tmp_path / "vs_persist.json")
s1 = ValueSourceStore(path)
src = s1.create_source("Persist", "static", value=0.42)
sid = src.id
s2 = ValueSourceStore(path)
loaded = s2.get_source(sid)
assert loaded.name == "Persist"
assert isinstance(loaded, StaticValueSource)
assert loaded.value == 0.42

View File

@@ -1,20 +1,25 @@
"""Tests for API endpoints."""
"""Tests for API endpoints (public + authenticated)."""
import pytest
from fastapi.testclient import TestClient
from wled_controller.main import app
from wled_controller import __version__
from wled_controller.config import get_config
client = TestClient(app)
# Build auth header from the first configured API key
_config = get_config()
_api_key = next(iter(_config.auth.api_keys.values()), "")
AUTH_HEADERS = {"Authorization": f"Bearer {_api_key}"} if _api_key else {}
def test_root_endpoint():
"""Test root endpoint returns the HTML dashboard."""
response = client.get("/")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
assert "LED Grab" in response.text
def test_health_check():
@@ -38,8 +43,8 @@ def test_version_endpoint():
def test_get_displays():
"""Test get displays endpoint."""
response = client.get("/api/v1/config/displays")
"""Test get displays endpoint (requires auth)."""
response = client.get("/api/v1/config/displays", headers=AUTH_HEADERS)
assert response.status_code == 200
data = response.json()
assert "displays" in data
@@ -62,7 +67,6 @@ def test_openapi_docs():
response = client.get("/openapi.json")
assert response.status_code == 200
data = response.json()
assert data["info"]["title"] == "WLED Screen Controller"
assert data["info"]["version"] == __version__

View File

@@ -9,112 +9,90 @@ import yaml
from wled_controller.config import (
Config,
ServerConfig,
ProcessingConfig,
WLEDConfig,
StorageConfig,
AuthConfig,
MQTTConfig,
LoggingConfig,
get_config,
reload_config,
is_demo_mode,
)
def test_default_config():
"""Test default configuration values."""
config = Config()
class TestDefaultConfig:
def test_default_server_values(self):
config = Config()
assert config.server.host == "0.0.0.0"
assert config.server.port == 8080
assert config.server.log_level == "INFO"
assert config.server.host == "0.0.0.0"
assert config.server.port == 8080
assert config.processing.default_fps == 30
assert config.processing.max_fps == 60
assert config.wled.timeout == 5
def test_default_storage_paths(self):
config = Config()
assert config.storage.devices_file == "data/devices.json"
assert config.storage.sync_clocks_file == "data/sync_clocks.json"
def test_default_mqtt_disabled(self):
config = Config()
assert config.mqtt.enabled is False
def test_default_demo_off(self):
config = Config()
assert config.demo is False
def test_load_from_yaml(tmp_path):
"""Test loading configuration from YAML file."""
config_data = {
"server": {"host": "127.0.0.1", "port": 9000},
"processing": {"default_fps": 60, "border_width": 20},
"wled": {"timeout": 10},
}
class TestFromYaml:
def test_load_from_yaml(self, tmp_path):
config_data = {
"server": {"host": "127.0.0.1", "port": 9000},
"auth": {"api_keys": {"dev": "secret"}},
}
config_path = tmp_path / "test_config.yaml"
with open(config_path, "w") as f:
yaml.dump(config_data, f)
config_path = tmp_path / "test_config.yaml"
with open(config_path, "w") as f:
yaml.dump(config_data, f)
config = Config.from_yaml(config_path)
assert config.server.host == "127.0.0.1"
assert config.server.port == 9000
assert config.auth.api_keys == {"dev": "secret"}
config = Config.from_yaml(config_path)
assert config.server.host == "127.0.0.1"
assert config.server.port == 9000
assert config.processing.default_fps == 60
assert config.processing.border_width == 20
assert config.wled.timeout == 10
def test_load_from_yaml_file_not_found(self):
with pytest.raises(FileNotFoundError):
Config.from_yaml("nonexistent.yaml")
def test_load_from_yaml_file_not_found():
"""Test loading from non-existent YAML file."""
with pytest.raises(FileNotFoundError):
Config.from_yaml("nonexistent.yaml")
class TestEnvironmentVariables:
def test_env_overrides(self, monkeypatch):
monkeypatch.setenv("WLED_SERVER__HOST", "192.168.1.1")
monkeypatch.setenv("WLED_SERVER__PORT", "7000")
config = Config()
assert config.server.host == "192.168.1.1"
assert config.server.port == 7000
def test_environment_variables(monkeypatch):
"""Test configuration from environment variables."""
monkeypatch.setenv("WLED_SERVER__HOST", "192.168.1.1")
monkeypatch.setenv("WLED_SERVER__PORT", "7000")
monkeypatch.setenv("WLED_PROCESSING__DEFAULT_FPS", "45")
config = Config()
assert config.server.host == "192.168.1.1"
assert config.server.port == 7000
assert config.processing.default_fps == 45
class TestServerConfig:
def test_creation(self):
sc = ServerConfig(host="localhost", port=8000)
assert sc.host == "localhost"
assert sc.port == 8000
assert sc.log_level == "INFO"
def test_server_config():
"""Test server configuration."""
server_config = ServerConfig(host="localhost", port=8000)
class TestDemoMode:
def test_demo_rewrites_storage_paths(self):
config = Config(demo=True)
assert config.storage.devices_file.startswith("data/demo/")
assert config.storage.sync_clocks_file.startswith("data/demo/")
assert server_config.host == "localhost"
assert server_config.port == 8000
assert server_config.log_level == "INFO"
def test_non_demo_keeps_original_paths(self):
config = Config(demo=False)
assert config.storage.devices_file == "data/devices.json"
def test_processing_config():
"""Test processing configuration."""
proc_config = ProcessingConfig(default_fps=25, max_fps=50)
class TestGlobalConfig:
def test_get_config_returns_config(self):
config = get_config()
assert isinstance(config, Config)
assert proc_config.default_fps == 25
assert proc_config.max_fps == 50
assert proc_config.interpolation_mode == "average"
def test_wled_config():
"""Test WLED configuration."""
wled_config = WLEDConfig(timeout=10, retry_attempts=5)
assert wled_config.timeout == 10
assert wled_config.retry_attempts == 5
assert wled_config.protocol == "http"
def test_config_validation():
"""Test configuration validation."""
# Test valid interpolation mode
config = Config(
processing=ProcessingConfig(interpolation_mode="median")
)
assert config.processing.interpolation_mode == "median"
# Test invalid interpolation mode
with pytest.raises(ValueError):
ProcessingConfig(interpolation_mode="invalid")
def test_get_config():
"""Test global config getter."""
config = get_config()
assert isinstance(config, Config)
def test_reload_config():
"""Test config reload."""
config1 = get_config()
config2 = reload_config()
assert isinstance(config2, Config)
def test_reload_config_returns_new_config(self):
config = reload_config()
assert isinstance(config, Config)