feat: roadmap batch (2026-06-19) — solar/linear-light/dither/nanoleaf + integrations

Eight roadmap features from the 2026-06-19 review, each a full vertical
(backend + tests + frontend + i18n en/ru/zh); ~67 new unit tests:

- automations: SolarRule sunrise/sunset trigger (new utils/solar.py, shared
  with the daylight cycle; window logic mirrors TimeOfDayRule)
- ci: best-effort arm64 multi-arch Docker manifest via QEMU + docker manifest
  (release.yml; amd64 path untouched, continue-on-error)
- game-integration: wire the orphaned LoLPoller via a LoLPollManager + a shared
  runtime_state module (poll lifecycle on enable/CRUD/startup/shutdown)
- ui: color-harmony gradient generator (complementary/analogous/triadic/...)
- effects: audio-reactive palette modulation (new audio_energy_tap; brightness/
  saturation modulation across all 12 procedural effects)
- capture: linear-light blending + spatio-temporal dithering, opt-in per
  calibration (new utils/linear_light.py, utils/dither.py)
- devices: Nanoleaf extControl v2 per-panel UDP streaming (per_panel mode)

Also bundles the pending 2026-06-18 production-review fixes and other
in-progress work already in the working tree (manual-trigger rule, etc.),
since they share files and could not be cleanly separated.

Gate: ruff + tsc clean; pytest 2654 passed / 2 skipped. The single failing
test (automation manual_trigger handler coverage) is a separate in-progress
item owned elsewhere, intentionally left as-is.
This commit is contained in:
2026-06-22 23:21:24 +03:00
parent 126d8f2449
commit 6745e25b20
91 changed files with 4390 additions and 540 deletions
@@ -0,0 +1,102 @@
"""Tests for the manual-trigger automation route (POST /automations/{id}/trigger).
The AutomationEngine is replaced with a lightweight fake so the route layer is
tested without driving the real evaluation loop or scene application.
"""
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from ledgrab.api import dependencies as deps
from ledgrab.api.routes.automations import router
from ledgrab.storage.automation import ManualTriggerRule
from ledgrab.storage.automation_store import AutomationStore
class FakeEngine:
"""Stand-in exposing only what the trigger route calls."""
def __init__(self, result=("triggered", [])):
self.result = result
self.calls = []
async def fire_manual_trigger(self, automation):
self.calls.append(automation.id)
return self.result
@pytest.fixture
def _route_db(tmp_path):
from ledgrab.storage.database import Database
db = Database(tmp_path / "test.db")
yield db
db.close()
@pytest.fixture
def automation_store(_route_db) -> AutomationStore:
store = AutomationStore(_route_db)
store.create_automation(
name="Manual one",
enabled=True,
rule_logic="or",
rules=[ManualTriggerRule()],
scene_preset_id=None,
)
return store
@pytest.fixture
def fake_engine():
return FakeEngine()
@pytest.fixture
def client(automation_store, fake_engine):
app = FastAPI()
app.include_router(router)
from ledgrab.api.auth import verify_api_key
app.dependency_overrides[verify_api_key] = lambda: "test-user"
app.dependency_overrides[deps.get_automation_store] = lambda: automation_store
app.dependency_overrides[deps.get_automation_engine] = lambda: fake_engine
# Routes may fire entity events through the processor manager; give it a stub.
deps._deps["processor_manager"] = None
return TestClient(app, raise_server_exceptions=False)
def _first_id(store: AutomationStore) -> str:
return store.get_all_automations()[0].id
class TestTriggerRoute:
def test_trigger_returns_status(self, client, automation_store, fake_engine):
aid = _first_id(automation_store)
resp = client.post(f"/api/v1/automations/{aid}/trigger")
assert resp.status_code == 200
assert resp.json() == {"status": "triggered", "errors": []}
assert fake_engine.calls == [aid]
def test_trigger_skipped(self, client, automation_store, fake_engine):
fake_engine.result = ("skipped", [])
aid = _first_id(automation_store)
resp = client.post(f"/api/v1/automations/{aid}/trigger")
assert resp.status_code == 200
assert resp.json()["status"] == "skipped"
def test_trigger_partial_errors(self, client, automation_store, fake_engine):
fake_engine.result = ("partial", ["dev1: timeout"])
aid = _first_id(automation_store)
resp = client.post(f"/api/v1/automations/{aid}/trigger")
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "partial"
assert body["errors"] == ["dev1: timeout"]
def test_trigger_unknown_id_404(self, client, fake_engine):
resp = client.post("/api/v1/automations/auto_ghost/trigger")
assert resp.status_code == 404
assert fake_engine.calls == []
@@ -161,10 +161,42 @@ class TestCreateIntegration:
description="My game",
tags=["fps"],
)
assert data["adapter_config"] == {"auth_token": "secret123"}
# The auth_token is a live shared secret and must NEVER be echoed back
# over the API — it is masked to "" in every response.
assert data["adapter_config"] == {"auth_token": ""}
assert data["description"] == "My game"
assert data["tags"] == ["fps"]
def test_update_with_blank_token_preserves_secret(self, client, game_store):
"""The API masks secrets, so the edit form re-submits a blank token for
an unchanged secret. The update must PRESERVE the stored secret rather
than overwrite it with the blank (otherwise a no-op edit wipes the key).
"""
created = _create_integration(client, adapter_config={"auth_token": "secret123"})
gi_id = created["id"]
resp = client.put(
f"/api/v1/game-integrations/{gi_id}",
json={"name": "Renamed", "adapter_config": {"auth_token": ""}},
)
assert resp.status_code == 200, resp.text
# The stored (decrypted) secret is unchanged despite the blank submit.
cfg = game_store.get_integration(gi_id)
assert cfg.adapter_config.get("auth_token") == "secret123"
def test_update_with_new_token_replaces_secret(self, client, game_store):
"""A non-empty token in the update is a deliberate change and is kept."""
created = _create_integration(client, adapter_config={"auth_token": "secret123"})
gi_id = created["id"]
resp = client.put(
f"/api/v1/game-integrations/{gi_id}",
json={"adapter_config": {"auth_token": "rotated456"}},
)
assert resp.status_code == 200, resp.text
assert game_store.get_integration(gi_id).adapter_config.get("auth_token") == "rotated456"
def test_create_duplicate_name(self, client):
_create_integration(client, name="Unique")
resp = client.post(
@@ -132,8 +132,18 @@ def test_prune_by_max_entries(tmp_db):
recorder = _mock_recorder()
engine = ActivityLogRetentionEngine(repo=repo, db=db, recorder=recorder)
for _ in range(10):
repo.record(_make_entry())
# Give each entry a distinct, increasing timestamp and capture insertion
# order so we can assert *which* five survive — not just the count. The
# engine settings→prune path must keep the NEWEST five (keeping the wrong
# half of an audit log would otherwise pass a count-only assertion).
from ledgrab.storage.activity_log import ActivityLogFilters
base = datetime(2026, 1, 1, tzinfo=timezone.utc)
ids = []
for i in range(10):
e = _make_entry(ts=base + timedelta(hours=i))
ids.append(e.id)
repo.record(e)
assert repo.count() == 10
@@ -141,6 +151,10 @@ def test_prune_by_max_entries(tmp_db):
engine._prune()
assert repo.count() == 5
remaining = {r.id for r in repo.query(ActivityLogFilters(), limit=20)}
# Newest five (highest seq / latest ts) survive; oldest five are pruned.
assert all(sid in remaining for sid in ids[5:])
assert all(sid not in remaining for sid in ids[:5])
def test_prune_disabled_is_noop(tmp_db):
@@ -467,13 +467,18 @@ def test_activity_logged_event_payload_shape():
def test_entry_id_format():
"""Entry IDs must be 'al_' followed by 8 hex characters."""
"""Entry IDs must be 'al_' followed by the full 32-hex uuid4.
Widened from 8 hex (32 bits) to the full 128-bit uuid4 so a collision on the
UNIQUE id column — which the best-effort recorder would silently drop — is
astronomically unlikely even against the full retention window.
"""
recorder, persisted, _ = _make_recorder()
recorder.record(category=ActivityCategory.SYSTEM, action="a", message="m")
entry_id = persisted[0].id
assert entry_id.startswith("al_"), f"id does not start with 'al_': {entry_id!r}"
suffix = entry_id[3:]
assert len(suffix) == 8, f"id suffix length is {len(suffix)}, expected 8: {entry_id!r}"
assert len(suffix) == 32, f"id suffix length is {len(suffix)}, expected 32: {entry_id!r}"
assert all(c in "0123456789abcdef" for c in suffix), f"id suffix is not hex: {suffix!r}"
+126 -1
View File
@@ -1,7 +1,7 @@
"""Tests for AutomationEngine — rule evaluation in isolation."""
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@@ -10,6 +10,7 @@ from ledgrab.storage.automation import (
ApplicationRule,
Automation,
DisplayStateRule,
ManualTriggerRule,
StartupRule,
SystemIdleRule,
TimeOfDayRule,
@@ -552,6 +553,130 @@ class TestHTTPValueStreamExtraction:
assert _extract_simple_path(body, "MediaContainer.size", "") == 2
assert _extract_simple_path(body, "MediaContainer.Metadata[0].title", "") == "Show"
# ---------------------------------------------------------------------------
# Manual trigger — one-shot apply gated by the automation's rules
# ---------------------------------------------------------------------------
class TestManualTrigger:
"""fire_manual_trigger: evaluate rules with the manual term True, then
apply the scene once (without entering the sticky active state)."""
def _make(self, rules, *, enabled=True, logic="or", scene=None, aid="auto_manual"):
now = datetime.now(timezone.utc)
return Automation(
id=aid,
name="Manual",
enabled=enabled,
rule_logic=logic,
rules=rules,
scene_preset_id=scene,
deactivation_mode="none",
deactivation_scene_preset_id=None,
created_at=now,
updated_at=now,
)
def test_handle_manual_inert_by_default(self, engine):
# Outside a manual fire the rule reads False, so a manual-only
# automation never activates from the background tick.
assert engine._handle_manual(ManualTriggerRule(), None) is False
@pytest.mark.asyncio
async def test_fires_no_scene_one_shot(self, engine):
auto = self._make([ManualTriggerRule()])
status, errors = await engine.fire_manual_trigger(auto)
assert status == "triggered"
assert errors == []
# One-shot: records last_activated but does NOT enter the sticky state
# (so the background tick has nothing to reconcile away → no bounce).
assert auto.id not in engine._active_automations
assert auto.id in engine._last_activated
# The transient flag is always cleared after the evaluation.
assert engine._manual_fire_active is False
@pytest.mark.asyncio
async def test_skipped_when_and_companion_false(self, engine):
# manual AND an unset webhook → AND fails → nothing applied.
auto = self._make([ManualTriggerRule(), WebhookRule(token="nope")], logic="and")
status, errors = await engine.fire_manual_trigger(auto)
assert status == "skipped"
assert errors == []
assert auto.id not in engine._last_activated
@pytest.mark.asyncio
async def test_fires_or_when_companion_false(self, engine):
# manual OR an unset webhook → manual alone satisfies "or".
auto = self._make([ManualTriggerRule(), WebhookRule(token="nope")], logic="or")
status, _errors = await engine.fire_manual_trigger(auto)
assert status == "triggered"
@pytest.mark.asyncio
async def test_works_when_disabled(self, engine):
# enabled gates only the background loop; a manual trigger ignores it.
auto = self._make([ManualTriggerRule()], enabled=False)
status, _errors = await engine.fire_manual_trigger(auto)
assert status == "triggered"
@pytest.mark.asyncio
async def test_manual_only_automation_inert_in_background_tick(self, engine, mock_store):
created = mock_store.create_automation(
name="manual-bg",
enabled=True,
rule_logic="or",
rules=[ManualTriggerRule()],
scene_preset_id=None,
)
await engine.trigger_evaluate()
assert created.id not in engine._active_automations
@pytest.mark.asyncio
async def test_audits_with_user_actor(self, engine):
rec = MagicMock()
with patch("ledgrab.core.activity_log.recorder.get_module_recorder", return_value=rec):
await engine.fire_manual_trigger(self._make([ManualTriggerRule()]))
rec.record.assert_called_once()
kwargs = rec.record.call_args.kwargs
assert kwargs["action"] == "automation.triggered"
# No explicit actor → recorder resolves the current user (not "system").
assert "actor" not in kwargs
@pytest.mark.asyncio
async def test_applies_scene_activated_maps_to_triggered(self, engine):
engine._scene_preset_store = MagicMock()
preset = MagicMock()
preset.name = "Scene"
engine._scene_preset_store.get_preset.return_value = preset
engine._target_store = MagicMock()
engine._device_store = MagicMock()
auto = self._make([ManualTriggerRule()], scene="scene_x")
with patch(
"ledgrab.core.scenes.scene_activator.apply_scene_state",
new=AsyncMock(return_value=("activated", [])),
) as apply_mock:
status, errors = await engine.fire_manual_trigger(auto)
apply_mock.assert_awaited_once()
assert status == "triggered"
assert errors == []
@pytest.mark.asyncio
async def test_applies_scene_partial_passthrough(self, engine):
engine._scene_preset_store = MagicMock()
preset = MagicMock()
preset.name = "Scene"
engine._scene_preset_store.get_preset.return_value = preset
engine._target_store = MagicMock()
engine._device_store = MagicMock()
auto = self._make([ManualTriggerRule()], scene="scene_x")
with patch(
"ledgrab.core.scenes.scene_activator.apply_scene_state",
new=AsyncMock(return_value=("partial", ["dev1: timeout"])),
):
status, errors = await engine.fire_manual_trigger(auto)
assert status == "partial"
assert errors == ["dev1: timeout"]
def test_chained_indices(self):
from ledgrab.core.processing.value_stream import _extract_simple_path
@@ -21,6 +21,7 @@ from ledgrab.storage.automation import (
HTTPPollRule,
MQTTRule,
Rule,
SolarRule,
StartupRule,
SystemIdleRule,
TimeOfDayRule,
@@ -31,6 +32,7 @@ EXPECTED_RULE_TYPES = {
StartupRule,
ApplicationRule,
TimeOfDayRule,
SolarRule,
SystemIdleRule,
DisplayStateRule,
MQTTRule,
@@ -13,6 +13,7 @@ Coverage targets
from __future__ import annotations
import asyncio
from unittest.mock import MagicMock, patch
import pytest
@@ -233,7 +234,7 @@ class TestAuthInstrumentation:
req = self._make_mock_request()
with pytest.raises(Exception): # HTTPException 401
verify_api_key(req, None)
asyncio.run(verify_api_key(req, None))
# At least one warning record about auth
warnings = [e for e in persisted if e.severity == ActivitySeverity.WARNING]
@@ -251,7 +252,7 @@ class TestAuthInstrumentation:
req = self._make_mock_request(client_ip="127.0.0.1")
with pytest.raises(Exception): # HTTPException 401
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
# At least one warning-level auth record
warnings = [e for e in persisted if e.severity == ActivitySeverity.WARNING]
@@ -286,7 +287,7 @@ class TestAuthInstrumentation:
req = self._make_mock_request(client_ip="192.168.1.100")
with pytest.raises(Exception): # HTTPException 401
verify_api_key(req, None)
asyncio.run(verify_api_key(req, None))
warnings = [e for e in persisted if e.severity == ActivitySeverity.WARNING]
assert len(warnings) >= 1
@@ -302,7 +303,7 @@ class TestAuthInstrumentation:
req = self._make_mock_request(client_ip="10.0.0.5")
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
auth_records = [e for e in persisted if e.category == ActivityCategory.AUTH]
assert len(auth_records) >= 1
@@ -7,6 +7,7 @@ shape, and self-referential exclusion.
from __future__ import annotations
import asyncio
import time
from unittest.mock import AsyncMock, MagicMock, patch
@@ -95,7 +96,7 @@ class TestNoSecretLeakage:
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
assert len(persisted) >= 1, "Expected at least one auth record"
for entry in persisted:
@@ -121,7 +122,7 @@ class TestNoSecretLeakage:
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, None)
asyncio.run(verify_api_key(req, None))
warnings = [e for e in persisted if e.severity == ActivitySeverity.WARNING]
assert len(warnings) >= 1
@@ -152,7 +153,7 @@ class TestNoSecretLeakage:
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
warnings = [e for e in persisted if e.severity == ActivitySeverity.WARNING]
assert len(warnings) >= 1
@@ -284,7 +285,7 @@ class TestNoSecretLeakage:
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
for entry in persisted:
for v in entry.metadata.values():
@@ -367,7 +368,7 @@ class TestBestEffortResilience:
# Should raise the HTTP 401, not the recorder RuntimeError
with pytest.raises(Exception) as exc_info:
verify_api_key(req, None) # missing creds
asyncio.run(verify_api_key(req, None)) # missing creds
# Must be an HTTPException (401), NOT the RuntimeError from the recorder
assert "RuntimeError" not in type(exc_info.value).__name__
@@ -734,7 +735,7 @@ class TestNoDuplicateRecords:
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
rejected = [e for e in persisted if e.action == "auth.rejected"]
assert len(rejected) == 1, f"Expected exactly 1 auth.rejected record, got {len(rejected)}"
@@ -768,7 +769,7 @@ class TestMetadataShape:
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
rejected = [e for e in persisted if e.action == "auth.rejected"]
assert len(rejected) >= 1
@@ -800,7 +801,7 @@ class TestMetadataShape:
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
rejected = [e for e in persisted if e.action == "auth.rejected"]
assert rejected[0].metadata["client"] == _EXPECTED_IP
@@ -1041,7 +1042,7 @@ class TestCategorySeverityContract:
cfg.auth.api_keys = {"dev": "good"}
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
self._check_all_entries(persisted)
auth_records = [e for e in persisted if e.category == "auth"]
@@ -1384,7 +1385,7 @@ class TestAuthFailureThrottle:
cfg.auth.api_keys = {"dev": "correct-key"}
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
return persisted_ref
@@ -1414,7 +1415,7 @@ class TestAuthFailureThrottle:
cfg.auth.api_keys = {"dev": "real-key"}
mock_cfg.return_value = cfg
try:
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
except Exception:
exceptions_raised += 1
@@ -1444,7 +1445,7 @@ class TestAuthFailureThrottle:
cfg.auth.api_keys = {"dev": "right"}
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
rejected = [e for e in all_persisted if e.action == "auth.rejected"]
assert len(rejected) == len(
@@ -1477,7 +1478,7 @@ class TestAuthFailureThrottle:
cfg.auth.api_keys = {"dev": "correct"}
mock_cfg.return_value = cfg
with pytest.raises(Exception):
verify_api_key(req, creds)
asyncio.run(verify_api_key(req, creds))
rejected = [e for e in all_persisted if e.action == "auth.rejected"]
assert (
+40
View File
@@ -131,3 +131,43 @@ def test_build_frame_total_length(led_count):
frame = client._build_frame(pixels, brightness=255)
assert len(frame) == 6 + led_count * 3
async def test_close_settles_before_port_close(monkeypatch):
"""close() must let the board paint the black frame before resetting it.
The black frame has to be written AND given settle time before
``serial.close()`` toggles DTR (Arduino auto-reset). If the reset wins the
race the strip latches its last lit frame and "stays on". This guards the
ordering: write → flush → sleep(settle) → close.
"""
import concurrent.futures
from unittest.mock import MagicMock
from ledgrab.core.devices import adalight_client as mod
client = _make_client(led_count=3)
events: list[str] = []
serial = MagicMock()
serial.is_open = True
serial.write.side_effect = lambda *_a, **_k: events.append("write")
serial.flush.side_effect = lambda *_a, **_k: events.append("flush")
serial.close.side_effect = lambda *_a, **_k: events.append("close")
client._serial = serial
client._connected = True
client._tx_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
async def fake_sleep(_seconds):
events.append(f"sleep:{_seconds}")
monkeypatch.setattr(mod.asyncio, "sleep", fake_sleep)
await client.close()
# Black frame is written and flushed, the board is given settle time, and
# ONLY THEN is the port closed (which resets the board).
assert events == ["write", "flush", f"sleep:{mod.BLACK_FRAME_SETTLE_DELAY}", "close"]
assert mod.BLACK_FRAME_SETTLE_DELAY > 0
+113
View File
@@ -0,0 +1,113 @@
"""Tests for spatio-temporal dithering of the final 8-bit quantization.
The load-bearing property is *temporal convergence*: over many frames, the
dithered output time-averages back to the true sub-integer value (that's what
lets the eye see a higher effective bit depth instead of banding).
"""
from __future__ import annotations
import numpy as np
from ledgrab.core.capture.calibration import calibration_from_dict, calibration_to_dict
from ledgrab.core.capture.edge_interpolation import average_edge_to_leds
from ledgrab.utils.dither import ordered_dither_quantize
class TestOrderedDither:
def test_single_frame_is_floor_or_ceil(self):
vals = np.array([[100.4, 50.0, 200.7]], dtype=np.float32)
out = ordered_dither_quantize(vals, frame_index=7)
assert out.dtype == np.uint8
assert out[0, 0] in (100, 101)
assert out[0, 1] == 50 # exact integer never moves
assert out[0, 2] in (200, 201)
def test_integers_are_stable_across_frames(self):
vals = np.array([[10.0, 20.0, 30.0]], dtype=np.float32)
for f in range(20):
out = ordered_dither_quantize(vals, frame_index=f)
assert list(out[0]) == [10, 20, 30]
def test_temporal_average_converges_to_true_value(self):
vals = np.array([[100.4, 50.9, 200.25]], dtype=np.float32)
acc = np.zeros(3, dtype=np.float64)
n = 2000
for f in range(n):
acc += ordered_dither_quantize(vals, frame_index=f)[0]
avg = acc / n
assert abs(avg[0] - 100.4) < 0.3
assert abs(avg[1] - 50.9) < 0.3
assert abs(avg[2] - 200.25) < 0.3
def test_same_threshold_across_channels_preserves_hue_steps(self):
# All three channels share the per-LED threshold, so an equal-channel
# grey never splits into a coloured pixel.
vals = np.array([[123.5, 123.5, 123.5]], dtype=np.float32)
for f in range(50):
out = ordered_dither_quantize(vals, frame_index=f)
assert out[0, 0] == out[0, 1] == out[0, 2]
def test_clips_to_byte_range(self):
vals = np.array([[-5.0, 255.9, 300.0]], dtype=np.float32)
out = ordered_dither_quantize(vals, frame_index=3)
assert out[0, 0] == 0
assert out[0, 1] == 255
assert out[0, 2] == 255
class TestEdgeReductionDither:
def _half_edge(self):
# 2 columns (black, white) → 1 LED whose mean is exactly 127.5, i.e. a
# value the plain uint8 path can only represent as 127.
e = np.zeros((2, 2, 3), dtype=np.uint8)
e[:, 1, :] = 255
return e
def test_dithered_output_varies_by_frame(self):
edge = self._half_edge()
seen = {
int(average_edge_to_leds(edge, "top", 1, {}, "k", dither=True, frame_index=f)[0, 0])
for f in range(50)
}
# The 127.5 LED straddles two codes → dither flips it across frames.
assert seen == {127, 128}
def test_temporal_average_recovers_the_half_step(self):
edge = self._half_edge()
# Plain quantization truncates 127.5 → 127; dither recovers ~127.5.
plain = int(average_edge_to_leds(edge, "top", 1, {}, "k")[0, 0])
acc = np.zeros(3, dtype=np.float64)
n = 1500
for f in range(n):
acc += average_edge_to_leds(edge, "top", 1, {}, "k", dither=True, frame_index=f)[0]
avg = acc[0] / n
assert plain == 127
assert abs(avg - 127.5) < 0.2
class TestCalibrationRoundTrip:
def test_dither_round_trips(self):
cfg = calibration_from_dict(
{
"mode": "simple",
"layout": "clockwise",
"start_position": "bottom_left",
"leds_top": 5,
"dither": True,
}
)
assert cfg.dither is True
assert calibration_to_dict(cfg).get("dither") is True
def test_dither_default_off_and_omitted(self):
cfg = calibration_from_dict(
{
"mode": "simple",
"layout": "clockwise",
"start_position": "bottom_left",
"leds_top": 5,
}
)
assert cfg.dither is False
assert "dither" not in calibration_to_dict(cfg)
+92
View File
@@ -0,0 +1,92 @@
"""Tests for linear-light blending in the per-LED reduction.
Verifies the sRGB↔linear conversion correctness and that the edge-reduction
kernel, when ``linear=True``, blends in linear light (a mid-grey from black +
white is brighter than the gamma-space mean) — plus the CalibrationConfig
round-trip of the opt-in flag.
"""
from __future__ import annotations
import numpy as np
from ledgrab.core.capture.calibration import calibration_from_dict, calibration_to_dict
from ledgrab.core.capture.edge_interpolation import average_edge_to_leds
from ledgrab.utils.linear_light import (
SRGB_TO_LINEAR_LUT,
linear_to_srgb_uint8,
srgb_to_linear,
)
class TestConversions:
def test_lut_endpoints_and_monotonic(self):
assert SRGB_TO_LINEAR_LUT.shape == (256,)
assert SRGB_TO_LINEAR_LUT[0] == 0.0
assert abs(SRGB_TO_LINEAR_LUT[255] - 1.0) < 1e-6
assert np.all(np.diff(SRGB_TO_LINEAR_LUT) > 0) # strictly increasing
def test_mid_grey_decodes_below_half(self):
# sRGB 0.5 (≈128) is ~0.214 in linear light, not 0.5.
assert 0.18 < float(SRGB_TO_LINEAR_LUT[128]) < 0.25
def test_round_trip_is_near_identity(self):
ramp = np.arange(256, dtype=np.uint8)
back = linear_to_srgb_uint8(srgb_to_linear(ramp))
assert np.max(np.abs(back.astype(int) - ramp.astype(int))) <= 1
def test_linear_mean_of_black_and_white_is_brighter(self):
lin = (srgb_to_linear(np.array([0, 255], dtype=np.uint8))).mean()
encoded = int(linear_to_srgb_uint8(np.array([lin], dtype=np.float32))[0])
assert encoded > 127 # brighter than the sRGB mean (127)
assert 180 < encoded < 195 # ~188
class TestEdgeReductionLinear:
def _edge(self):
# top edge (axis=0): shape (rows=2, width=4, 3); two black + two white cols
e = np.zeros((2, 4, 3), dtype=np.uint8)
e[:, 2:, :] = 255
return e
def test_srgb_blend_is_plain_mean(self):
out = average_edge_to_leds(self._edge(), "top", 1, {}, "k", linear=False)
assert int(out[0, 0]) == 127
def test_linear_blend_is_brighter(self):
out = average_edge_to_leds(self._edge(), "top", 1, {}, "k", linear=True)
assert int(out[0, 0]) > 127
assert 180 < int(out[0, 0]) < 195
def test_uniform_edge_unchanged_by_linear(self):
e = np.full((2, 4, 3), 200, dtype=np.uint8)
out = average_edge_to_leds(e, "top", 1, {}, "k", linear=True)
# A flat colour survives the decode→mean→encode round-trip (±1).
assert abs(int(out[0, 0]) - 200) <= 1
class TestCalibrationRoundTrip:
def test_linear_blend_round_trips_simple(self):
cfg = calibration_from_dict(
{
"mode": "simple",
"layout": "clockwise",
"start_position": "bottom_left",
"leds_top": 5,
"linear_blend": True,
}
)
assert cfg.linear_blend is True
assert calibration_to_dict(cfg).get("linear_blend") is True
def test_default_is_off_and_omitted(self):
cfg = calibration_from_dict(
{
"mode": "simple",
"layout": "clockwise",
"start_position": "bottom_left",
"leds_top": 5,
}
)
assert cfg.linear_blend is False
assert "linear_blend" not in calibration_to_dict(cfg)
+178
View File
@@ -0,0 +1,178 @@
"""Tests for the LoL poll manager + shared game-integration payload processing.
Covers the runtime wiring that was previously missing: the orphaned
``LoLPoller`` is now driven by ``LoLPollManager``, and polled payloads flow
through the same ``process_payload`` core the HTTP ingest route uses.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
import pytest
from ledgrab.core.game_integration import runtime_state
from ledgrab.core.game_integration.adapters.lol_adapter import LoLAdapter
from ledgrab.core.game_integration.lol_poll_manager import LoLPollManager
# ── Fakes ───────────────────────────────────────────────────────────────
class _FakeBus:
def __init__(self) -> None:
self.published: list[Any] = []
def publish(self, event: Any) -> None:
self.published.append(event)
@dataclass
class _FakeConfig:
id: str
enabled: bool
adapter_type: str
adapter_config: dict = field(default_factory=dict)
class _FakePoller:
"""Stand-in for LoLPoller so the manager test never spawns real threads."""
instances: list["_FakePoller"] = []
def __init__(self, adapter_config: dict, callback: Any) -> None:
self.adapter_config = adapter_config
self.callback = callback
self.started = False
self.stopped = False
_FakePoller.instances.append(self)
def start(self) -> None:
self.started = True
def stop(self) -> None:
self.stopped = True
@pytest.fixture
def fake_poller(monkeypatch):
_FakePoller.instances = []
monkeypatch.setattr("ledgrab.core.game_integration.lol_poll_manager.LoLPoller", _FakePoller)
return _FakePoller
def _lol(id: str = "lol1", enabled: bool = True, **cfg) -> _FakeConfig:
return _FakeConfig(id=id, enabled=enabled, adapter_type="lol", adapter_config=cfg)
# ── process_payload ─────────────────────────────────────────────────────
class TestProcessPayload:
def setup_method(self):
runtime_state.cleanup_state("pp_test")
def test_publishes_parsed_events_and_records_stats(self):
bus = _FakeBus()
payload = {
"activePlayer": {
"championStats": {"currentHealth": 50, "maxHealth": 100},
"summonerName": "Me",
"level": 9,
},
"allPlayers": [],
}
events = runtime_state.process_payload("pp_test", LoLAdapter, {}, payload, bus)
assert len(events) >= 1
assert any(e.event_type == "health" and abs(e.value - 0.5) < 1e-6 for e in events)
assert len(bus.published) == len(events)
assert runtime_state.get_stats("pp_test")["event_count"] == len(events)
def test_swallows_parse_errors(self):
bus = _FakeBus()
class _Boom:
ADAPTER_TYPE = "boom"
@classmethod
def parse_payload(cls, *a):
raise ValueError("bad frame")
events = runtime_state.process_payload("pp_test", _Boom, {}, {}, bus)
assert events == []
assert bus.published == []
# ── LoLPollManager lifecycle ────────────────────────────────────────────
class TestLoLPollManager:
def test_sync_starts_poller_for_enabled_lol(self, fake_poller):
mgr = LoLPollManager(_FakeBus())
mgr.sync([_lol()])
assert mgr.active_count == 1
assert fake_poller.instances[-1].started is True
def test_sync_ignores_non_lol_and_disabled(self, fake_poller):
mgr = LoLPollManager(_FakeBus())
mgr.sync(
[
_FakeConfig("cs", True, "cs2", {}),
_lol("off", enabled=False),
]
)
assert mgr.active_count == 0
assert fake_poller.instances == []
def test_sync_is_idempotent_for_unchanged_config(self, fake_poller):
mgr = LoLPollManager(_FakeBus())
mgr.sync([_lol(poll_interval_ms=500)])
first = fake_poller.instances[-1]
mgr.sync([_lol(poll_interval_ms=500)])
assert mgr.active_count == 1
assert len(fake_poller.instances) == 1 # not restarted
assert first.stopped is False
def test_sync_restarts_on_config_change(self, fake_poller):
mgr = LoLPollManager(_FakeBus())
mgr.sync([_lol(poll_interval_ms=500)])
first = fake_poller.instances[-1]
mgr.sync([_lol(poll_interval_ms=1000)])
assert first.stopped is True
assert len(fake_poller.instances) == 2
assert mgr.active_count == 1
def test_sync_stops_when_removed_or_disabled(self, fake_poller):
mgr = LoLPollManager(_FakeBus())
mgr.sync([_lol()])
poller = fake_poller.instances[-1]
mgr.sync([]) # integration gone
assert poller.stopped is True
assert mgr.active_count == 0
def test_stop_all(self, fake_poller):
mgr = LoLPollManager(_FakeBus())
mgr.sync([_lol("a"), _lol("b")])
assert mgr.active_count == 2
mgr.stop_all()
assert mgr.active_count == 0
assert all(p.stopped for p in fake_poller.instances)
def test_callback_routes_polled_data_through_process_payload(self, fake_poller):
bus = _FakeBus()
mgr = LoLPollManager(bus)
mgr.sync([_lol("cbtest")])
callback = fake_poller.instances[-1].callback
callback(
{
"activePlayer": {
"championStats": {"currentHealth": 80, "maxHealth": 100},
"summonerName": "Me",
"level": 1,
},
"allPlayers": [],
}
)
assert any(e.event_type == "health" for e in bus.published)
+89
View File
@@ -0,0 +1,89 @@
"""Tests for Nanoleaf extControl v2 per-panel streaming.
The device-side (panelLayout fetch, extControl enable, UDP send) needs a real
controller to validate; here we lock down the parts that DON'T: panel
ordering, strip→panel resampling, the exact UDP packet framing, and the
``nanoleaf_per_panel`` config round-trip through the device store.
"""
from __future__ import annotations
import struct
from ledgrab.core.devices.nanoleaf_client import (
build_extcontrol_v2_packet,
map_pixels_to_panels,
order_panels,
)
from ledgrab.storage.device_store import Device
class TestOrderPanels:
def test_sorts_by_x_then_y_and_drops_controller(self):
position = [
{"panelId": 5, "x": 100, "y": 0},
{"panelId": 3, "x": 0, "y": 0},
{"panelId": 0, "x": 50, "y": 50}, # controller / rhythm → dropped
{"panelId": 7, "x": 0, "y": 100},
]
assert order_panels(position) == [3, 7, 5]
def test_ignores_non_integer_panel_ids(self):
assert order_panels([{"panelId": "x", "x": 0, "y": 0}, {"x": 1, "y": 1}]) == []
class TestMapPixelsToPanels:
def test_nearest_neighbour_resample(self):
pixels = [[10, 10, 10], [20, 20, 20], [30, 30, 30], [40, 40, 40]]
out = map_pixels_to_panels(pixels, [1, 2])
assert out == [(1, 10, 10, 10), (2, 30, 30, 30)]
def test_empty_strip_is_black(self):
assert map_pixels_to_panels([], [9, 8]) == [(9, 0, 0, 0), (8, 0, 0, 0)]
def test_more_panels_than_pixels_repeats(self):
out = map_pixels_to_panels([[255, 0, 0]], [1, 2, 3])
assert out == [(1, 255, 0, 0), (2, 255, 0, 0), (3, 255, 0, 0)]
class TestPacket:
def test_framing_is_byte_exact(self):
panels = [(100, 10, 20, 30), (200, 40, 50, 60)]
pkt = build_extcontrol_v2_packet(panels)
assert len(pkt) == 2 + 2 * 8 # uint16 header + 8 bytes/panel
assert struct.unpack(">H", pkt[0:2])[0] == 2
pid, r, g, b, w, trans = struct.unpack(">HBBBBH", pkt[2:10])
assert (pid, r, g, b, w, trans) == (100, 10, 20, 30, 0, 1)
pid2, r2, g2, b2, w2, trans2 = struct.unpack(">HBBBBH", pkt[10:18])
assert (pid2, r2, g2, b2, w2, trans2) == (200, 40, 50, 60, 0, 1)
def test_values_are_masked_to_byte_range(self):
pkt = build_extcontrol_v2_packet([(70000, 300, -5, 256)])
pid, r, g, b, w, trans = struct.unpack(">HBBBBH", pkt[2:10])
assert pid == 70000 & 0xFFFF
assert r == 300 & 0xFF and b == 256 & 0xFF
class TestConfigRoundTrip:
def _device(self, per_panel: bool) -> Device:
return Device(
device_id="d1",
name="Shapes",
url="nanoleaf://1.2.3.4",
led_count=10,
device_type="nanoleaf",
nanoleaf_token="tok",
nanoleaf_per_panel=per_panel,
)
def test_per_panel_round_trips_through_store(self):
d = self._device(True)
assert d.to_dict().get("nanoleaf_per_panel") is True
back = Device.from_dict(d.to_dict())
assert back.nanoleaf_per_panel is True
assert back.to_config().nanoleaf_per_panel is True
def test_default_off_is_omitted(self):
d = self._device(False)
assert "nanoleaf_per_panel" not in d.to_dict()
assert d.to_config().nanoleaf_per_panel is False
+181
View File
@@ -0,0 +1,181 @@
"""Tests for audio-reactive palette modulation on procedural effects.
Covers the model round-trip, the AudioEnergyTap (resolve/acquire/energy via
fakes), and the per-frame brightness/saturation modulation applied to a
rendered effect frame.
"""
from __future__ import annotations
from datetime import datetime, timezone
import numpy as np
from ledgrab.core.processing.audio_energy_tap import AudioEnergyTap
from ledgrab.core.processing.effect_stream import EffectColorStripStream
from ledgrab.storage.color_strip_source import EffectColorStripSource
def _make_source(**overrides) -> EffectColorStripSource:
base = dict(
id="fx1",
name="fx",
source_type="effect",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
effect_type="plasma",
)
base.update(overrides)
return EffectColorStripSource.create_from_kwargs(**base)
# ── Model ───────────────────────────────────────────────────────────────
class TestModel:
def test_defaults(self):
s = _make_source()
assert s.audio_reactive is False
assert s.reactive_mode == "brightness"
assert s.reactive_audio_source_id == ""
def test_round_trip_preserves_reactive_fields(self):
s = _make_source(
audio_reactive=True,
reactive_audio_source_id="as_42",
reactive_mode="both",
reactive_intensity=0.5,
)
back = EffectColorStripSource.from_dict(s.to_dict())
assert back.audio_reactive is True
assert back.reactive_audio_source_id == "as_42"
assert back.reactive_mode == "both"
assert back.reactive_intensity.value == 0.5
def test_apply_update_changes_reactive_fields(self):
s = _make_source()
s.apply_update(audio_reactive=True, reactive_mode="saturation", reactive_intensity=0.9)
assert s.audio_reactive is True
assert s.reactive_mode == "saturation"
assert s.reactive_intensity.value == 0.9
# ── AudioEnergyTap ──────────────────────────────────────────────────────
class _Analysis:
def __init__(self, rms):
self.rms = rms
class _CaptureStream:
def __init__(self, rms=0.25):
self._rms = rms
def get_latest_analysis(self):
return _Analysis(self._rms)
class _Resolved:
device_index = 3
is_loopback = True
audio_template_id = ""
class _SourceStore:
def resolve_audio_source(self, sid):
return _Resolved()
class _Manager:
def __init__(self):
self.acquired = []
self.released = []
self.stream = _CaptureStream()
def acquire(self, device, loopback, engine_type=None, engine_config=None):
self.acquired.append((device, loopback))
return self.stream
def release(self, device, loopback, engine_type=None):
self.released.append((device, loopback))
class TestAudioEnergyTap:
def test_unavailable_without_manager(self):
tap = AudioEnergyTap(None)
assert tap.available is False
tap.start()
assert tap.energy() == 0.0
def test_configure_resolves_capture_params(self):
mgr = _Manager()
tap = AudioEnergyTap(mgr, _SourceStore())
tap.configure("as_1")
tap.start()
assert tap.active is True
assert mgr.acquired == [(3, True)]
def test_energy_smooths_rms(self):
mgr = _Manager()
tap = AudioEnergyTap(mgr, _SourceStore())
tap.configure("as_1")
tap.start()
# rms 0.25 * gain 4 = 1.0 (clamped); EMA rises toward 1.0
e1 = tap.energy()
e2 = tap.energy()
assert 0.0 < e1 < e2 <= 1.0
def test_stop_releases_and_resets(self):
mgr = _Manager()
tap = AudioEnergyTap(mgr, _SourceStore())
tap.configure("as_1")
tap.start()
tap.stop()
assert tap.active is False
assert mgr.released == [(3, True)]
assert tap.energy() == 0.0
# ── Modulation ──────────────────────────────────────────────────────────
class _FixedTap:
def __init__(self, energy):
self._e = energy
def energy(self, smoothing=0.4):
return self._e
class TestModulation:
def _stream(self, mode, intensity, energy):
src = _make_source(audio_reactive=True, reactive_mode=mode, reactive_intensity=intensity)
st = EffectColorStripStream(src)
st._audio_tap = _FixedTap(energy)
return st
def test_brightness_silence_dims_to_zero(self):
st = self._stream("brightness", 1.0, 0.0)
buf = np.array([[200, 100, 50]], dtype=np.uint8)
st._apply_audio_modulation(buf)
assert np.array_equal(buf, np.array([[0, 0, 0]], dtype=np.uint8))
def test_brightness_full_energy_preserves(self):
st = self._stream("brightness", 1.0, 1.0)
buf = np.array([[200, 100, 50]], dtype=np.uint8)
st._apply_audio_modulation(buf)
assert np.array_equal(buf, np.array([[200, 100, 50]], dtype=np.uint8))
def test_saturation_silence_desaturates_to_luminance(self):
st = self._stream("saturation", 1.0, 0.0)
buf = np.array([[200, 100, 50]], dtype=np.uint8)
st._apply_audio_modulation(buf)
# All channels collapse to the luminance value (greyscale).
assert buf[0, 0] == buf[0, 1] == buf[0, 2]
def test_zero_intensity_is_noop(self):
st = self._stream("both", 0.0, 0.0)
buf = np.array([[200, 100, 50]], dtype=np.uint8)
st._apply_audio_modulation(buf)
assert np.array_equal(buf, np.array([[200, 100, 50]], dtype=np.uint8))
+334
View File
@@ -0,0 +1,334 @@
"""Regression tests for the 2026-06-18 production-readiness review fixes.
Each test maps to a confirmed finding from the review and would fail against
the pre-fix code. Grouped by area; see the inline finding tags (#N).
"""
from __future__ import annotations
import threading
import uuid
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
from ledgrab.storage.activity_log import (
ActivityCategory,
ActivityLogEntry,
ActivityLogFilters,
ActivitySeverity,
)
from ledgrab.storage.activity_log_repository import ActivityLogRepository
from ledgrab.storage.database import Database
def _entry(*, ts: datetime | None = None, message: str = "m") -> ActivityLogEntry:
return ActivityLogEntry(
id="al_" + uuid.uuid4().hex[:8],
ts=ts or datetime.now(timezone.utc),
category=ActivityCategory.SYSTEM,
action="test.action",
severity=ActivitySeverity.INFO,
actor="system",
message=message,
)
@pytest.fixture
def repo(tmp_db: Database) -> ActivityLogRepository:
return ActivityLogRepository(tmp_db)
# ---------------------------------------------------------------------------
# #8 — activity-log entry id has full 128-bit entropy
# ---------------------------------------------------------------------------
def test_new_id_uses_full_uuid_hex():
from ledgrab.core.activity_log.recorder import _new_id
val = _new_id()
assert val.startswith("al_")
hex_part = val[3:]
assert len(hex_part) == 32 # full uuid4 hex (was 8 → collision-prone)
int(hex_part, 16) # parses as hex
# ---------------------------------------------------------------------------
# #26 — sanitize_display honours its bounded-length contract for tiny maxlen
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("maxlen", [0, 1, 2, 5])
def test_sanitize_display_respects_bound(maxlen):
from ledgrab.core.activity_log.sanitize import sanitize_display
result = sanitize_display("abcdef", maxlen=maxlen)
assert len(result) <= maxlen
def test_sanitize_display_degenerate_maxlen_returns_empty():
from ledgrab.core.activity_log.sanitize import sanitize_display
assert sanitize_display("abcdef", maxlen=0) == ""
assert sanitize_display("abcdef", maxlen=-3) == ""
# ---------------------------------------------------------------------------
# #10 — non-ASCII Bearer / WS token does not raise from compare_digest
# ---------------------------------------------------------------------------
def test_match_api_key_non_ascii_token_returns_none_not_raises():
from ledgrab.api.auth import _match_api_key
with patch("ledgrab.api.auth.get_config") as cfg:
c = MagicMock()
c.auth.api_keys = {"dev": "correct-key"}
cfg.return_value = c
# café contains a non-ASCII char; must cleanly fail to match, not raise.
assert _match_api_key("café") is None
assert _match_api_key("correct-key") == "dev"
# ---------------------------------------------------------------------------
# #15 — game adapters tolerate non-ASCII attacker-controlled tokens
# ---------------------------------------------------------------------------
def test_generic_webhook_adapter_non_ascii_header_returns_false():
from ledgrab.core.game_integration.adapters.generic_webhook_adapter import (
GenericWebhookAdapter,
)
ok = GenericWebhookAdapter.validate_auth(
{"Authorization": "Bearer café"}, {}, {"auth_token": "secret123"}
)
assert ok is False # no TypeError
def test_cs2_adapter_non_ascii_payload_token_returns_false():
from ledgrab.core.game_integration.adapters.cs2_adapter import CS2Adapter
ok = CS2Adapter.validate_auth({}, {"auth": {"token": "café"}}, {"auth_token": "secret123"})
assert ok is False # no TypeError
# ---------------------------------------------------------------------------
# #2 — async auth dependency sets the actor ContextVar visibly to the handler
# ---------------------------------------------------------------------------
def test_async_auth_dep_actor_visible_to_handler():
import asyncio
from ledgrab.api.auth import verify_api_key
# Guard against a regression back to a sync dependency (which would run the
# contextvar mutation in a throwaway threadpool context the handler can't see).
assert asyncio.iscoroutinefunction(verify_api_key)
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from ledgrab.core.activity_log.context import current_actor
app = FastAPI()
# Use Depends() as a default value rather than the ``AuthRequired`` Annotated
# alias: this module has ``from __future__ import annotations`` (stringized
# annotations), and the route is defined in a local scope FastAPI can't
# resolve the alias from — the default-value form sidesteps that entirely.
@app.get("/whoami")
async def whoami(auth=Depends(verify_api_key)):
return {"label": auth, "actor": current_actor.get()}
with patch("ledgrab.api.auth.get_config") as cfg:
c = MagicMock()
c.auth.api_keys = {"dev": "k"}
cfg.return_value = c
client = TestClient(app)
resp = client.get("/whoami", headers={"Authorization": "Bearer k"})
assert resp.status_code == 200
body = resp.json()
assert body["label"] == "dev"
# Pre-fix (sync dep) this would be "system"; the async dep makes it visible.
assert body["actor"] == "dev"
# ---------------------------------------------------------------------------
# #9 / #12 — auth-failure throttle dict survives concurrent access
# ---------------------------------------------------------------------------
def test_should_record_auth_failure_concurrent_no_exception():
from ledgrab.api import auth as auth_mod
auth_mod._auth_record_last.clear()
# Pre-fill to one below the hard cap so the eviction branch is hot.
cap = auth_mod._AUTH_THROTTLE_HARD_CAP
for i in range(cap - 1):
auth_mod._auth_record_last[f"seed-{i}"] = 0.0
errors: list[BaseException] = []
def hammer(base: int):
try:
for j in range(500):
auth_mod._should_record_auth_failure(f"{base}.{j}")
except BaseException as exc: # noqa: BLE001 - capture any escape
errors.append(exc)
threads = [threading.Thread(target=hammer, args=(t,)) for t in range(16)]
for th in threads:
th.start()
for th in threads:
th.join(timeout=30)
assert not errors, f"throttle raised under concurrency: {errors[:3]}"
assert len(auth_mod._auth_record_last) <= cap
auth_mod._auth_record_last.clear()
# ---------------------------------------------------------------------------
# #1 / #4 — since/until filter normalises naive + non-UTC-offset datetimes
# ---------------------------------------------------------------------------
def test_until_boundary_includes_entry_at_exact_instant(repo: ActivityLogRepository):
# Entry stored at exactly 12:00 UTC.
instant = datetime(2026, 6, 18, 12, 0, 0, tzinfo=timezone.utc)
repo.record(_entry(ts=instant, message="boundary"))
# A naive datetime-local value at the same wall-clock (no offset) — the
# realistic frontend path. Pre-fix this lexically excluded the row.
naive_until = datetime(2026, 6, 18, 12, 0, 0) # noqa: DTZ001 - intentional naive
page = repo.query(ActivityLogFilters(until=naive_until), limit=10)
assert len(page) == 1
def test_since_with_non_utc_offset_includes_correct_instant(repo: ActivityLogRepository):
# Stored at 13:30 UTC.
repo.record(_entry(ts=datetime(2026, 6, 18, 13, 30, tzinfo=timezone.utc), message="x"))
# since expressed in +02:00 == 13:00 UTC → row (13:30Z) must be included.
since_incl = datetime(2026, 6, 18, 15, 0, tzinfo=timezone(timedelta(hours=2)))
assert len(repo.query(ActivityLogFilters(since=since_incl), limit=10)) == 1
# since == 16:00+02:00 == 14:00 UTC → row (13:30Z) must be excluded.
since_excl = datetime(2026, 6, 18, 16, 0, tzinfo=timezone(timedelta(hours=2)))
assert len(repo.query(ActivityLogFilters(since=since_excl), limit=10)) == 0
# ---------------------------------------------------------------------------
# #21 — iter_export advances the keyset cursor correctly across batches
# ---------------------------------------------------------------------------
def test_iter_export_multi_batch_no_gaps_or_dupes(repo: ActivityLogRepository):
n = 7
for i in range(n):
repo.record(_entry(message=f"e{i}"))
# batch_size=2 over 7 rows → 4 batches, exercising cursor advancement.
exported = list(repo.iter_export(batch_size=2))
ids = [e.id for e in exported]
assert len(ids) == n
assert len(set(ids)) == n # no duplicates across batch boundaries
# Identical set to a single-batch run.
assert set(ids) == {e.id for e in repo.iter_export(batch_size=1000)}
# ---------------------------------------------------------------------------
# #13 — undecryptable secret envelope is preserved, not discarded
# ---------------------------------------------------------------------------
def test_decrypt_failure_preserves_envelope(monkeypatch):
from ledgrab.storage import game_integration as gi
envelope = "ENC:v1:undecryptable-blob"
monkeypatch.setattr(gi.secret_box, "is_encrypted", lambda v: v == envelope)
def _boom(_v):
raise ValueError("secret key missing")
monkeypatch.setattr(gi.secret_box, "decrypt", _boom)
result = gi._decrypt_adapter_config({"auth_token": envelope, "other": "x"})
# Pre-fix: result["auth_token"] == "" (data loss on the next write-through).
assert result["auth_token"] == envelope
assert result["other"] == "x"
# ---------------------------------------------------------------------------
# #22 — real-thread / real-DB concurrency on the repository
# ---------------------------------------------------------------------------
def test_repo_concurrent_writes_are_consistent(repo: ActivityLogRepository):
threads_n, per_thread = 8, 100
total = threads_n * per_thread
def worker():
for _ in range(per_thread):
repo.record(_entry())
threads = [threading.Thread(target=worker) for _ in range(threads_n)]
for th in threads:
th.start()
for th in threads:
th.join(timeout=60)
assert repo.count() == total
exported = list(repo.iter_export(batch_size=50))
assert len(exported) == total
assert len({e.id for e in exported}) == total # unique, no corruption
# ---------------------------------------------------------------------------
# #23 — CSV export strips control chars from string cells (defense-in-depth)
# ---------------------------------------------------------------------------
def test_csv_export_strips_control_chars(repo: ActivityLogRepository):
import csv
import io
from ledgrab.api.routes.activity_log import _CSV_COLUMNS, _export_csv_generator
evil = "evil\x00dev\x1b[31mred\x1b[0m\r\ninject"
repo.record(_entry(message=evil))
text = b"".join(_export_csv_generator(repo, ActivityLogFilters())).decode("utf-8")
rows = list(csv.reader(io.StringIO(text)))
assert len(rows) == 2 # header + 1 data row (newline did not split the field)
msg_cell = rows[1][_CSV_COLUMNS.index("message")]
for bad in ("\x00", "\x1b", "\r", "\n"):
assert bad not in msg_cell, f"control char {bad!r} survived into the CSV cell"
# ---------------------------------------------------------------------------
# #44 — non-serialisable metadata is dropped best-effort, never raises
# ---------------------------------------------------------------------------
def test_non_serializable_metadata_does_not_raise():
from ledgrab.core.activity_log.recorder import ActivityRecorder
persisted: list = []
repo = MagicMock()
# Route through to_row() so the real json.dumps codec runs (and raises).
repo.record.side_effect = lambda entry: persisted.append(entry.to_row())
recorder = ActivityRecorder(repo, MagicMock(), loop=None)
# Must not raise into the caller despite the un-encodable values.
recorder.record(
category=ActivityCategory.SYSTEM,
action="bad.metadata",
message="m",
metadata={"when": datetime.now(timezone.utc), "tags": {1, 2, 3}},
)
assert persisted == [], "non-serialisable entry must not persist"
+188
View File
@@ -0,0 +1,188 @@
"""Tests for SolarRule (sunrise/sunset automation trigger).
Covers the data model (round-trip + input validation), the engine's
``_evaluate_solar`` window logic (sunrise/sunset patched to fixed hours so the
test is location/clock-independent), the shared solar math, and the API
schema ↔ Rule mapping.
"""
from __future__ import annotations
from datetime import datetime
from unittest.mock import patch
from ledgrab.api.routes.automations import _rule_from_schema, _rule_to_schema
from ledgrab.api.schemas.automations import RuleSchema
from ledgrab.core.automations import automation_engine
from ledgrab.core.automations.automation_engine import AutomationEngine
from ledgrab.storage.automation import Rule, SolarRule
from ledgrab.utils.solar import compute_solar_times
# Fixed solar times the engine math is patched to: sunrise 06:00, sunset 19:00.
_SUNRISE_MIN = 6 * 60 # 360
_SUNSET_MIN = 19 * 60 # 1140
# ── Data model ──────────────────────────────────────────────────────────
class TestSolarRuleModel:
def test_defaults_are_night_window(self):
r = SolarRule()
assert r.rule_type == "solar"
assert r.start_event == "sunset"
assert r.end_event == "sunrise"
assert r.start_offset_minutes == 0 and r.end_offset_minutes == 0
def test_to_dict_from_dict_round_trip(self):
r = SolarRule(
start_event="sunrise",
start_offset_minutes=-30,
end_event="sunset",
end_offset_minutes=45,
latitude=51.5,
longitude=-0.12,
days_of_week=[5, 6],
timezone="Europe/London",
)
back = SolarRule.from_dict(r.to_dict())
assert back == r
def test_from_dict_dispatches_via_base_rule(self):
d = {"rule_type": "solar", "start_event": "sunrise"}
r = Rule.from_dict(d)
assert isinstance(r, SolarRule)
assert r.start_event == "sunrise"
def test_invalid_event_falls_back_to_default(self):
r = SolarRule.from_dict({"start_event": "noon", "end_event": ""})
assert r.start_event == "sunset"
assert r.end_event == "sunrise"
def test_offsets_are_clamped(self):
r = SolarRule.from_dict({"start_offset_minutes": 9000, "end_offset_minutes": -9000})
assert r.start_offset_minutes == 1439
assert r.end_offset_minutes == -1439
def test_offsets_handle_non_numeric(self):
r = SolarRule.from_dict({"start_offset_minutes": None, "end_offset_minutes": "x"})
assert r.start_offset_minutes == 0 and r.end_offset_minutes == 0
def test_coords_are_clamped(self):
r = SolarRule.from_dict({"latitude": 200.0, "longitude": -400.0})
assert r.latitude == 90.0
assert r.longitude == -180.0
def test_days_of_week_filtered_and_sorted(self):
r = SolarRule.from_dict({"days_of_week": [6, 0, 9, -1, "x", 3, 3]})
assert r.days_of_week == [0, 3, 6]
# ── Engine evaluation ───────────────────────────────────────────────────
def _eval_at(dt: datetime, **rule_kwargs) -> bool:
rule = SolarRule.from_dict(rule_kwargs)
with (
patch.object(automation_engine, "compute_solar_times", return_value=(6.0, 19.0)),
patch.object(automation_engine, "_now_in_tz", return_value=dt),
):
return AutomationEngine._evaluate_solar(rule)
class TestSolarEvaluation:
def test_night_window_active_after_sunset(self):
# default sunset→sunrise; 22:00 is inside the evening portion
assert _eval_at(datetime(2026, 6, 15, 22, 0)) is True
def test_night_window_active_before_sunrise(self):
# 03:00 is inside the after-midnight tail
assert _eval_at(datetime(2026, 6, 15, 3, 0)) is True
def test_night_window_inactive_at_noon(self):
assert _eval_at(datetime(2026, 6, 15, 12, 0)) is False
def test_day_window_active_at_noon(self):
assert (
_eval_at(datetime(2026, 6, 15, 12, 0), start_event="sunrise", end_event="sunset")
is True
)
def test_day_window_inactive_at_night(self):
assert (
_eval_at(datetime(2026, 6, 15, 23, 0), start_event="sunrise", end_event="sunset")
is False
)
def test_start_offset_shifts_boundary_earlier(self):
# sunset-30 = 18:30; the window opens earlier
assert _eval_at(datetime(2026, 6, 15, 18, 45), start_offset_minutes=-30) is True
assert _eval_at(datetime(2026, 6, 15, 18, 15), start_offset_minutes=-30) is False
def test_weekday_restriction_evening(self):
dt = datetime(2026, 6, 15, 22, 0) # evening portion → today's weekday
assert _eval_at(dt, days_of_week=[dt.weekday()]) is True
assert _eval_at(dt, days_of_week=[(dt.weekday() + 1) % 7]) is False
def test_weekday_restriction_overnight_tail_uses_previous_day(self):
dt = datetime(2026, 6, 15, 3, 0) # after-midnight tail → previous weekday
assert _eval_at(dt, days_of_week=[(dt.weekday() - 1) % 7]) is True
assert _eval_at(dt, days_of_week=[dt.weekday()]) is False
# ── Shared solar math ───────────────────────────────────────────────────
class TestComputeSolarTimes:
def test_midlatitude_summer_has_sane_window(self):
sunrise, sunset = compute_solar_times(50.0, 0.0, 172, 0.0) # ~solstice
assert 0.5 <= sunrise <= 11.5
assert 12.5 <= sunset <= 23.5
assert sunrise < sunset
def test_polar_winter_is_clamped_not_degenerate(self):
# High latitude, mid-winter: raw equations collapse, but the clamp
# keeps sunrise < sunset so the trigger never sees an empty window.
sunrise, sunset = compute_solar_times(80.0, 0.0, 355, 0.0)
assert sunrise <= 11.5
assert sunset >= 12.5
assert sunrise < sunset
# ── API schema mapping ──────────────────────────────────────────────────
class TestSolarSchemaMapping:
def test_schema_to_rule_and_back_preserves_fields(self):
schema = RuleSchema(
rule_type="solar",
start_event="sunrise",
start_offset_minutes=-15,
end_event="sunset",
end_offset_minutes=15,
latitude=51.5,
longitude=-0.1,
days_of_week=[5, 6],
timezone="Europe/London",
)
rule = _rule_from_schema(schema)
assert isinstance(rule, SolarRule)
assert rule.start_event == "sunrise"
assert rule.end_offset_minutes == 15
assert rule.latitude == 51.5
# Round-trip back to schema must NOT drop the solar fields.
out = _rule_to_schema(rule)
assert out.start_event == "sunrise"
assert out.end_event == "sunset"
assert out.start_offset_minutes == -15
assert out.latitude == 51.5
assert out.timezone == "Europe/London"
def test_schema_to_rule_tolerates_missing_solar_fields(self):
# A bare solar schema (all optional fields None) must still build a
# valid default night-window rule.
rule = _rule_from_schema(RuleSchema(rule_type="solar"))
assert isinstance(rule, SolarRule)
assert rule.start_event == "sunset"
assert rule.end_event == "sunrise"