feat: game integration system

Receive real-time events from games (CS2, Dota 2, LoL, etc.) and drive
LED effects through the existing color strip and value source pipelines.

Core:
- GameEventBus (thread-safe pub/sub) with standardized 23-type event vocabulary
- GameAdapter ABC + AdapterRegistry + MappingAdapter (YAML-driven)
- Built-in adapters: CS2 GSI, Dota 2 GSI, LoL Live Client, Generic Webhook
- Community YAML adapters: Minecraft, Valorant, Rocket League
- GameEventColorStripStream with 5 effects (flash/pulse/sweep/color_shift/breathing)
- GameEventValueSource with EMA smoothing and timeout
- 4 built-in effect presets (FPS Combat, MOBA Health, Racing, Generic Alert)
- Auto-setup for Valve GSI games (Steam path detection, cfg file writing)
- Demo capture engine exposed to non-demo mode

Frontend:
- Game tab in Streams tree navigation with integration cards
- Game integration editor modal with adapter picker, config fields, event mappings
- game_event source type in CSS and ValueSource editors
- Setup instructions overlay (markdown rendered)
- Live event monitor and connection test

API:
- Full CRUD for game integrations
- Event ingestion endpoint (adapter-level auth)
- Adapter metadata, presets, auto-setup, status/diagnostics endpoints
This commit is contained in:
2026-03-31 13:17:52 +03:00
parent b6713be390
commit 492bdb95e3
87 changed files with 12170 additions and 912 deletions
+135
View File
@@ -0,0 +1,135 @@
"""Tests for AdapterRegistry — register, get, duplicates, clear."""
from typing import Any
import pytest
from wled_controller.core.game_integration.adapter_registry import AdapterRegistry
from wled_controller.core.game_integration.base_adapter import GameAdapter
from wled_controller.core.game_integration.events import GameEvent
class FakeAdapter(GameAdapter):
ADAPTER_TYPE = "fake_game"
DISPLAY_NAME = "Fake Game"
GAME_NAME = "FakeGame"
SUPPORTED_EVENTS = ["health", "kill"]
@classmethod
def parse_payload(
cls,
payload: dict[str, Any],
adapter_config: dict[str, Any],
prev_state: dict[str, Any],
) -> tuple[list[GameEvent], dict[str, Any]]:
return [], prev_state
@classmethod
def validate_auth(
cls,
headers: dict[str, str],
payload: dict[str, Any],
adapter_config: dict[str, Any],
) -> bool:
return True
class AnotherAdapter(GameAdapter):
ADAPTER_TYPE = "another_game"
DISPLAY_NAME = "Another Game"
GAME_NAME = "AnotherGame"
SUPPORTED_EVENTS = ["mana"]
@classmethod
def parse_payload(
cls,
payload: dict[str, Any],
adapter_config: dict[str, Any],
prev_state: dict[str, Any],
) -> tuple[list[GameEvent], dict[str, Any]]:
return [], prev_state
@classmethod
def validate_auth(
cls,
headers: dict[str, str],
payload: dict[str, Any],
adapter_config: dict[str, Any],
) -> bool:
return True
@pytest.fixture(autouse=True)
def _clean_registry():
"""Ensure a clean registry for each test."""
AdapterRegistry.clear_registry()
yield
AdapterRegistry.clear_registry()
class TestRegister:
def test_register_and_get(self) -> None:
AdapterRegistry.register(FakeAdapter)
assert AdapterRegistry.get_adapter("fake_game") is FakeAdapter
def test_register_multiple(self) -> None:
AdapterRegistry.register(FakeAdapter)
AdapterRegistry.register(AnotherAdapter)
assert AdapterRegistry.get_adapter("fake_game") is FakeAdapter
assert AdapterRegistry.get_adapter("another_game") is AnotherAdapter
def test_register_duplicate_overwrites(self) -> None:
AdapterRegistry.register(FakeAdapter)
# Re-registering the same type should overwrite without error
AdapterRegistry.register(FakeAdapter)
assert AdapterRegistry.get_adapter("fake_game") is FakeAdapter
def test_register_non_subclass_raises(self) -> None:
with pytest.raises(ValueError, match="must be a subclass"):
AdapterRegistry.register(str) # type: ignore[arg-type]
def test_register_base_type_raises(self) -> None:
with pytest.raises(ValueError, match="reserved type"):
AdapterRegistry.register(GameAdapter) # type: ignore[arg-type]
class TestGetAdapter:
def test_get_unknown_raises(self) -> None:
with pytest.raises(ValueError, match="Unknown adapter type"):
AdapterRegistry.get_adapter("nonexistent")
def test_get_unknown_shows_available(self) -> None:
AdapterRegistry.register(FakeAdapter)
with pytest.raises(ValueError, match="fake_game"):
AdapterRegistry.get_adapter("nonexistent")
class TestGetAll:
def test_get_all_returns_copy(self) -> None:
AdapterRegistry.register(FakeAdapter)
all_adapters = AdapterRegistry.get_all_adapters()
assert "fake_game" in all_adapters
# Mutating the copy should not affect the registry
all_adapters.pop("fake_game")
assert AdapterRegistry.get_adapter("fake_game") is FakeAdapter
def test_get_available_adapters_metadata(self) -> None:
AdapterRegistry.register(FakeAdapter)
available = AdapterRegistry.get_available_adapters()
assert len(available) == 1
meta = available[0]
assert meta["adapter_type"] == "fake_game"
assert meta["display_name"] == "Fake Game"
assert meta["game_name"] == "FakeGame"
assert meta["supported_events"] == ["health", "kill"]
class TestClear:
def test_clear_removes_all(self) -> None:
AdapterRegistry.register(FakeAdapter)
AdapterRegistry.register(AnotherAdapter)
AdapterRegistry.clear_registry()
assert AdapterRegistry.get_all_adapters() == {}
+58 -60
View File
@@ -1,4 +1,4 @@
"""Tests for AutomationEngine — condition evaluation in isolation."""
"""Tests for AutomationEngine — rule evaluation in isolation."""
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
@@ -7,14 +7,13 @@ import pytest
from wled_controller.core.automations.automation_engine import AutomationEngine
from wled_controller.storage.automation import (
AlwaysCondition,
ApplicationCondition,
ApplicationRule,
Automation,
DisplayStateCondition,
StartupCondition,
SystemIdleCondition,
TimeOfDayCondition,
WebhookCondition,
DisplayStateRule,
StartupRule,
SystemIdleRule,
TimeOfDayRule,
WebhookRule,
)
from wled_controller.storage.automation_store import AutomationStore
@@ -44,9 +43,7 @@ def engine(mock_store, mock_manager) -> AutomationEngine:
causes access violations in the test environment, so we replace it
with a simple MagicMock.
"""
with patch(
"wled_controller.core.automations.automation_engine.PlatformDetector"
):
with patch("wled_controller.core.automations.automation_engine.PlatformDetector"):
eng = AutomationEngine(
automation_store=mock_store,
processor_manager=mock_manager,
@@ -56,21 +53,21 @@ def engine(mock_store, mock_manager) -> AutomationEngine:
# ---------------------------------------------------------------------------
# Condition evaluation (unit-level)
# Rule evaluation (unit-level)
# ---------------------------------------------------------------------------
class TestConditionEvaluation:
"""Test _evaluate_condition for each condition type individually."""
class TestRuleEvaluation:
"""Test _evaluate_rule for each rule type individually."""
def _make_automation(self, conditions):
def _make_automation(self, rules):
now = datetime.now(timezone.utc)
return Automation(
id="test_auto",
name="Test",
enabled=True,
condition_logic="or",
conditions=conditions,
rule_logic="or",
rules=rules,
scene_preset_id=None,
deactivation_mode="none",
deactivation_scene_preset_id=None,
@@ -78,8 +75,8 @@ class TestConditionEvaluation:
updated_at=now,
)
def _eval(self, engine, condition, **kwargs):
"""Invoke the private _evaluate_condition method."""
def _eval(self, engine, rule, **kwargs):
"""Invoke the private _evaluate_rule method."""
defaults = dict(
running_procs=set(),
topmost_proc=None,
@@ -89,8 +86,8 @@ class TestConditionEvaluation:
display_state=None,
)
defaults.update(kwargs)
return engine._evaluate_condition(
condition,
return engine._evaluate_rule(
rule,
defaults["running_procs"],
defaults["topmost_proc"],
defaults["topmost_fullscreen"],
@@ -99,102 +96,103 @@ class TestConditionEvaluation:
defaults["display_state"],
)
def test_always_true(self, engine):
assert self._eval(engine, AlwaysCondition()) is True
def test_startup_true(self, engine):
assert self._eval(engine, StartupCondition()) is True
assert self._eval(engine, StartupRule()) is True
def test_application_running_match(self, engine):
cond = ApplicationCondition(apps=["chrome.exe"], match_type="running")
rule = ApplicationRule(apps=["chrome.exe"], match_type="running")
result = self._eval(
engine, cond,
engine,
rule,
running_procs={"chrome.exe", "explorer.exe"},
)
assert result is True
def test_application_running_no_match(self, engine):
cond = ApplicationCondition(apps=["chrome.exe"], match_type="running")
rule = ApplicationRule(apps=["chrome.exe"], match_type="running")
result = self._eval(
engine, cond,
engine,
rule,
running_procs={"explorer.exe"},
)
assert result is False
def test_application_topmost_match(self, engine):
cond = ApplicationCondition(apps=["game.exe"], match_type="topmost")
rule = ApplicationRule(apps=["game.exe"], match_type="topmost")
result = self._eval(
engine, cond,
engine,
rule,
topmost_proc="game.exe",
)
assert result is True
def test_application_topmost_no_match(self, engine):
cond = ApplicationCondition(apps=["game.exe"], match_type="topmost")
rule = ApplicationRule(apps=["game.exe"], match_type="topmost")
result = self._eval(
engine, cond,
engine,
rule,
topmost_proc="chrome.exe",
)
assert result is False
def test_time_of_day_within_range(self, engine):
cond = TimeOfDayCondition(start_time="00:00", end_time="23:59")
result = self._eval(engine, cond)
rule = TimeOfDayRule(start_time="00:00", end_time="23:59")
result = self._eval(engine, rule)
assert result is True
def test_system_idle_when_idle(self, engine):
cond = SystemIdleCondition(idle_minutes=5, when_idle=True)
result = self._eval(engine, cond, idle_seconds=600.0) # 10 minutes idle
rule = SystemIdleRule(idle_minutes=5, when_idle=True)
result = self._eval(engine, rule, idle_seconds=600.0) # 10 minutes idle
assert result is True
def test_system_idle_not_idle(self, engine):
cond = SystemIdleCondition(idle_minutes=5, when_idle=True)
result = self._eval(engine, cond, idle_seconds=60.0) # 1 minute idle
rule = SystemIdleRule(idle_minutes=5, when_idle=True)
result = self._eval(engine, rule, idle_seconds=60.0) # 1 minute idle
assert result is False
def test_system_idle_when_not_idle(self, engine):
"""when_idle=False means active when user is NOT idle."""
cond = SystemIdleCondition(idle_minutes=5, when_idle=False)
result = self._eval(engine, cond, idle_seconds=60.0) # 1 min idle (not yet 5)
rule = SystemIdleRule(idle_minutes=5, when_idle=False)
result = self._eval(engine, rule, idle_seconds=60.0) # 1 min idle (not yet 5)
assert result is True
def test_display_state_match(self, engine):
cond = DisplayStateCondition(state="on")
result = self._eval(engine, cond, display_state="on")
rule = DisplayStateRule(state="on")
result = self._eval(engine, rule, display_state="on")
assert result is True
def test_display_state_no_match(self, engine):
cond = DisplayStateCondition(state="off")
result = self._eval(engine, cond, display_state="on")
rule = DisplayStateRule(state="off")
result = self._eval(engine, rule, display_state="on")
assert result is False
def test_webhook_active(self, engine):
cond = WebhookCondition(token="tok123")
rule = WebhookRule(token="tok123")
engine._webhook_states["tok123"] = True
result = self._eval(engine, cond)
result = self._eval(engine, rule)
assert result is True
def test_webhook_inactive(self, engine):
cond = WebhookCondition(token="tok123")
rule = WebhookRule(token="tok123")
# Not in _webhook_states → False
result = self._eval(engine, cond)
result = self._eval(engine, rule)
assert result is False
# ---------------------------------------------------------------------------
# Condition logic (AND / OR)
# Rule logic (AND / OR)
# ---------------------------------------------------------------------------
class TestConditionLogic:
def _make_automation(self, conditions, logic="or"):
class TestRuleLogic:
def _make_automation(self, rules, logic="or"):
now = datetime.now(timezone.utc)
return Automation(
id="logic_auto",
name="Logic",
enabled=True,
condition_logic=logic,
conditions=conditions,
rule_logic=logic,
rules=rules,
scene_preset_id=None,
deactivation_mode="none",
deactivation_scene_preset_id=None,
@@ -205,12 +203,12 @@ class TestConditionLogic:
def test_or_any_true(self, engine):
auto = self._make_automation(
[
ApplicationCondition(apps=["missing.exe"], match_type="running"),
AlwaysCondition(),
ApplicationRule(apps=["missing.exe"], match_type="running"),
StartupRule(),
],
logic="or",
)
result = engine._evaluate_conditions(
result = engine._evaluate_rules(
auto,
running_procs=set(),
topmost_proc=None,
@@ -224,12 +222,12 @@ class TestConditionLogic:
def test_and_all_must_be_true(self, engine):
auto = self._make_automation(
[
AlwaysCondition(),
ApplicationCondition(apps=["missing.exe"], match_type="running"),
StartupRule(),
ApplicationRule(apps=["missing.exe"], match_type="running"),
],
logic="and",
)
result = engine._evaluate_conditions(
result = engine._evaluate_rules(
auto,
running_procs=set(),
topmost_proc=None,
@@ -0,0 +1,282 @@
"""Tests for community adapter YAML loader."""
import textwrap
from pathlib import Path
import pytest
from wled_controller.core.game_integration.community_loader import (
clear_community_adapters,
get_community_adapter,
get_community_adapter_info,
get_community_adapters,
load_community_adapters,
register_community_adapters,
)
@pytest.fixture(autouse=True)
def _clean_registry() -> None:
"""Clear community adapters before and after each test."""
clear_community_adapters()
yield # type: ignore[misc]
clear_community_adapters()
def _write_yaml(directory: Path, name: str, content: str) -> Path:
"""Write a YAML file to a directory."""
path = directory / name
path.write_text(textwrap.dedent(content))
return path
class TestLoadCommunityAdapters:
def test_load_from_directory(self, tmp_path: Path) -> None:
_write_yaml(
tmp_path,
"test_game.yaml",
"""\
name: test_game
game: Test Game
protocol: webhook
mappings:
- source_path: player.health
event: health
min: 0
max: 100
""",
)
adapters = load_community_adapters(tmp_path)
assert "community_test_game" in adapters
assert adapters["community_test_game"].game == "Test Game"
def test_load_multiple_files(self, tmp_path: Path) -> None:
_write_yaml(
tmp_path,
"game_a.yaml",
"""\
name: game_a
game: Game A
protocol: webhook
mappings:
- source_path: hp
event: health
""",
)
_write_yaml(
tmp_path,
"game_b.yaml",
"""\
name: game_b
game: Game B
protocol: webhook
mappings:
- source_path: mp
event: mana
""",
)
adapters = load_community_adapters(tmp_path)
assert len(adapters) == 2
assert "community_game_a" in adapters
assert "community_game_b" in adapters
def test_load_nonexistent_directory(self, tmp_path: Path) -> None:
nonexistent = tmp_path / "does_not_exist"
adapters = load_community_adapters(nonexistent)
assert adapters == {}
def test_skip_invalid_yaml(self, tmp_path: Path) -> None:
_write_yaml(
tmp_path,
"valid.yaml",
"""\
name: valid
game: Valid Game
protocol: webhook
mappings:
- source_path: hp
event: health
""",
)
_write_yaml(
tmp_path,
"invalid.yaml",
"""\
name: invalid
protocol: webhook
mappings: []
""",
)
adapters = load_community_adapters(tmp_path)
assert len(adapters) == 1
assert "community_valid" in adapters
def test_load_yml_extension(self, tmp_path: Path) -> None:
_write_yaml(
tmp_path,
"game.yml",
"""\
name: yml_game
game: YML Game
protocol: webhook
mappings:
- source_path: hp
event: health
""",
)
adapters = load_community_adapters(tmp_path)
assert "community_game" in adapters
def test_empty_directory(self, tmp_path: Path) -> None:
adapters = load_community_adapters(tmp_path)
assert adapters == {}
class TestRegisterCommunityAdapters:
def test_register_and_retrieve(self, tmp_path: Path) -> None:
_write_yaml(
tmp_path,
"test.yaml",
"""\
name: test
game: Test
protocol: webhook
mappings:
- source_path: hp
event: health
""",
)
count = register_community_adapters(tmp_path)
assert count == 1
adapters = get_community_adapters()
assert "community_test" in adapters
def test_get_single_adapter(self, tmp_path: Path) -> None:
_write_yaml(
tmp_path,
"test.yaml",
"""\
name: test
game: Test
protocol: webhook
mappings:
- source_path: hp
event: health
""",
)
register_community_adapters(tmp_path)
adapter = get_community_adapter("community_test")
assert adapter is not None
assert adapter.name == "test"
def test_get_nonexistent_adapter(self) -> None:
adapter = get_community_adapter("community_nonexistent")
assert adapter is None
def test_get_adapter_info(self, tmp_path: Path) -> None:
_write_yaml(
tmp_path,
"my_game.yaml",
"""\
name: My Game Adapter
game: My Game
protocol: webhook
mappings:
- source_path: hp
event: health
- source_path: kills
event: kill
""",
)
register_community_adapters(tmp_path)
info = get_community_adapter_info()
assert len(info) == 1
assert info[0]["adapter_type"] == "community_my_game"
assert info[0]["display_name"] == "My Game Adapter"
assert info[0]["game_name"] == "My Game"
assert info[0]["source"] == "community"
assert "health" in info[0]["supported_events"]
assert "kill" in info[0]["supported_events"]
class TestBuiltInYamlFiles:
"""Test that the shipped YAML adapter files load correctly."""
def test_load_bundled_adapters(self) -> None:
"""Load the built-in game_adapters directory."""
bundled_dir = (
Path(__file__).parent.parent.parent
/ "src"
/ "wled_controller"
/ "data"
/ "game_adapters"
)
if not bundled_dir.exists():
pytest.skip("Bundled adapter directory not found")
adapters = load_community_adapters(bundled_dir)
# We ship minecraft, valorant, rocket_league
assert len(adapters) >= 3
assert "community_minecraft" in adapters
assert "community_valorant" in adapters
assert "community_rocket_league" in adapters
def test_minecraft_adapter_parses(self) -> None:
"""Test that the Minecraft community adapter can parse a payload."""
bundled_dir = (
Path(__file__).parent.parent.parent
/ "src"
/ "wled_controller"
/ "data"
/ "game_adapters"
)
adapters = load_community_adapters(bundled_dir)
mc = adapters.get("community_minecraft")
if mc is None:
pytest.skip("Minecraft adapter not found")
payload = {
"player": {
"health": 15,
"armor": 10,
"food_level": 18,
"experience_level": 30,
},
"stats": {"kills": 5},
}
events, _ = mc.parse_payload(payload, {"adapter_id": "mc_test"}, {})
types = {e.event_type for e in events}
assert "health" in types
assert "armor" in types
def test_rocket_league_adapter_parses(self) -> None:
"""Test that the Rocket League community adapter can parse a payload."""
bundled_dir = (
Path(__file__).parent.parent.parent
/ "src"
/ "wled_controller"
/ "data"
/ "game_adapters"
)
adapters = load_community_adapters(bundled_dir)
rl = adapters.get("community_rocket_league")
if rl is None:
pytest.skip("Rocket League adapter not found")
payload = {
"player": {"boost": 50, "speed": 1150},
"match": {"goals_scored": 2, "goals_conceded": 1, "time_remaining": 150},
"team": {"score_blue": 2, "score_orange": 1},
}
events, _ = rl.parse_payload(payload, {"adapter_id": "rl_test"}, {})
types = {e.event_type for e in events}
assert "energy" in types
assert "speed" in types
+279
View File
@@ -0,0 +1,279 @@
"""Tests for CS2 Game State Integration adapter."""
import pytest
from wled_controller.core.game_integration.adapters.cs2_adapter import CS2Adapter
# ── Realistic CS2 GSI payload samples ────────────────────────────────────
def _make_cs2_payload(
*,
health: int = 100,
armor: int = 100,
money: int = 800,
kills: int = 0,
deaths: int = 0,
round_phase: str | None = None,
bomb: str | None = None,
flashed: int = 0,
team: str = "CT",
ammo_clip: int | None = None,
ammo_clip_max: int | None = None,
auth_token: str | None = None,
) -> dict:
"""Build a realistic CS2 GSI payload."""
payload: dict = {
"player": {
"steamid": "76561198012345678",
"name": "TestPlayer",
"team": team,
"state": {
"health": health,
"armor": armor,
"helmet": True,
"flashed": flashed,
"smoked": 0,
"burning": 0,
"money": money,
"round_kills": 0,
"round_killhs": 0,
"equip_value": 4400,
},
"match_stats": {
"kills": kills,
"assists": 0,
"deaths": deaths,
"mvps": 0,
"score": kills * 2,
},
"weapons": {},
},
}
if ammo_clip is not None and ammo_clip_max is not None:
payload["player"]["weapons"]["weapon_0"] = {
"name": "weapon_ak47",
"paintkit": "default",
"type": "Rifle",
"ammo_clip": ammo_clip,
"ammo_clip_max": ammo_clip_max,
"ammo_reserve": 90,
"state": "active",
}
if round_phase is not None:
payload["round"] = {"phase": round_phase}
if bomb is not None:
payload.setdefault("round", {})["bomb"] = bomb
if auth_token is not None:
payload["auth"] = {"token": auth_token}
return payload
# ── Health/Armor/Money tests ─────────────────────────────────────────────
class TestCS2ContinuousEvents:
def test_health_normalized(self) -> None:
payload = _make_cs2_payload(health=75)
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "cs2_test"}, {})
health_events = [e for e in events if e.event_type == "health"]
assert len(health_events) == 1
assert health_events[0].value == pytest.approx(0.75)
def test_health_zero(self) -> None:
payload = _make_cs2_payload(health=0)
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "cs2_test"}, {})
health_events = [e for e in events if e.event_type == "health"]
assert health_events[0].value == 0.0
def test_armor_normalized(self) -> None:
payload = _make_cs2_payload(armor=50)
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "cs2_test"}, {})
armor_events = [e for e in events if e.event_type == "armor"]
assert len(armor_events) == 1
assert armor_events[0].value == pytest.approx(0.5)
def test_money_normalized(self) -> None:
payload = _make_cs2_payload(money=8000)
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "cs2_test"}, {})
gold_events = [e for e in events if e.event_type == "gold"]
assert len(gold_events) == 1
assert gold_events[0].value == pytest.approx(8000.0 / 16000.0)
def test_ammo_normalized(self) -> None:
payload = _make_cs2_payload(ammo_clip=15, ammo_clip_max=30)
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "cs2_test"}, {})
ammo_events = [e for e in events if e.event_type == "ammo"]
assert len(ammo_events) == 1
assert ammo_events[0].value == pytest.approx(0.5)
def test_adapter_id_passed_through(self) -> None:
payload = _make_cs2_payload(health=100)
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "my_cs2"}, {})
assert all(e.adapter_id == "my_cs2" for e in events)
# ── Kill/Death diff detection tests ──────────────────────────────────────
class TestCS2DiffDetection:
def test_kill_detected_on_increase(self) -> None:
payload1 = _make_cs2_payload(kills=3)
_, state1 = CS2Adapter.parse_payload(payload1, {"adapter_id": "t"}, {})
payload2 = _make_cs2_payload(kills=4)
events, _ = CS2Adapter.parse_payload(payload2, {"adapter_id": "t"}, state1)
kill_events = [e for e in events if e.event_type == "kill"]
assert len(kill_events) == 1
def test_no_kill_on_first_payload(self) -> None:
payload = _make_cs2_payload(kills=5)
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, {})
kill_events = [e for e in events if e.event_type == "kill"]
assert len(kill_events) == 0
def test_multiple_kills_detected(self) -> None:
payload1 = _make_cs2_payload(kills=2)
_, state1 = CS2Adapter.parse_payload(payload1, {"adapter_id": "t"}, {})
payload2 = _make_cs2_payload(kills=5)
events, _ = CS2Adapter.parse_payload(payload2, {"adapter_id": "t"}, state1)
kill_events = [e for e in events if e.event_type == "kill"]
assert len(kill_events) == 3
def test_death_detected(self) -> None:
payload1 = _make_cs2_payload(deaths=0)
_, state1 = CS2Adapter.parse_payload(payload1, {"adapter_id": "t"}, {})
payload2 = _make_cs2_payload(deaths=1)
events, _ = CS2Adapter.parse_payload(payload2, {"adapter_id": "t"}, state1)
death_events = [e for e in events if e.event_type == "death"]
assert len(death_events) == 1
def test_no_death_on_same_count(self) -> None:
payload1 = _make_cs2_payload(deaths=2)
_, state1 = CS2Adapter.parse_payload(payload1, {"adapter_id": "t"}, {})
payload2 = _make_cs2_payload(deaths=2)
events, _ = CS2Adapter.parse_payload(payload2, {"adapter_id": "t"}, state1)
death_events = [e for e in events if e.event_type == "death"]
assert len(death_events) == 0
# ── Round/Bomb trigger tests ────────────────────────────────────────────
class TestCS2Triggers:
def test_round_start(self) -> None:
payload = _make_cs2_payload(round_phase="live")
events, state = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, {})
rs = [e for e in events if e.event_type == "round_start"]
assert len(rs) == 1
assert state["round_phase"] == "live"
def test_round_end(self) -> None:
prev = {"round_phase": "live"}
payload = _make_cs2_payload(round_phase="over")
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, prev)
re = [e for e in events if e.event_type == "round_end"]
assert len(re) == 1
def test_no_round_event_on_same_phase(self) -> None:
prev = {"round_phase": "live"}
payload = _make_cs2_payload(round_phase="live")
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, prev)
round_events = [e for e in events if e.event_type in ("round_start", "round_end")]
assert len(round_events) == 0
def test_bomb_planted(self) -> None:
payload = _make_cs2_payload(round_phase="live", bomb="planted")
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, {})
bomb = [e for e in events if e.event_type == "objective_captured"]
assert len(bomb) == 1
def test_bomb_defused(self) -> None:
prev = {"bomb": "planted"}
payload = _make_cs2_payload(round_phase="live", bomb="defused")
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, prev)
defuse = [e for e in events if e.event_type == "objective_lost"]
assert len(defuse) == 1
def test_flashbang_detected(self) -> None:
payload = _make_cs2_payload(flashed=200)
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, {"flashed": 0})
flash = [e for e in events if e.event_type == "blinded"]
assert len(flash) == 1
assert flash[0].value == pytest.approx(200.0 / 255.0)
def test_no_flashbang_when_already_flashed(self) -> None:
prev = {"flashed": 200}
payload = _make_cs2_payload(flashed=150)
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, prev)
flash = [e for e in events if e.event_type == "blinded"]
# Already flashed (prev > 0), so no new flash event
assert len(flash) == 0
def test_team_ct(self) -> None:
payload = _make_cs2_payload(team="CT")
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, {})
team = [e for e in events if e.event_type == "team_a"]
assert len(team) == 1
def test_team_t(self) -> None:
payload = _make_cs2_payload(team="T")
events, _ = CS2Adapter.parse_payload(payload, {"adapter_id": "t"}, {})
team = [e for e in events if e.event_type == "team_b"]
assert len(team) == 1
# ── Auth validation tests ───────────────────────────────────────────────
class TestCS2Auth:
def test_auth_valid(self) -> None:
payload = _make_cs2_payload(auth_token="mysecret")
result = CS2Adapter.validate_auth({}, payload, {"auth_token": "mysecret"})
assert result is True
def test_auth_invalid(self) -> None:
payload = _make_cs2_payload(auth_token="wrong")
result = CS2Adapter.validate_auth({}, payload, {"auth_token": "mysecret"})
assert result is False
def test_auth_no_token_configured_accepts_all(self) -> None:
payload = _make_cs2_payload()
result = CS2Adapter.validate_auth({}, payload, {})
assert result is True
def test_auth_missing_payload_token(self) -> None:
payload = _make_cs2_payload() # no auth in payload
result = CS2Adapter.validate_auth({}, payload, {"auth_token": "mysecret"})
assert result is False
# ── Metadata tests ──────────────────────────────────────────────────────
class TestCS2Metadata:
def test_adapter_type(self) -> None:
assert CS2Adapter.ADAPTER_TYPE == "cs2"
def test_supported_events(self) -> None:
assert "health" in CS2Adapter.SUPPORTED_EVENTS
assert "kill" in CS2Adapter.SUPPORTED_EVENTS
assert "death" in CS2Adapter.SUPPORTED_EVENTS
def test_config_schema(self) -> None:
schema = CS2Adapter.get_config_schema()
assert "auth_token" in schema["properties"]
def test_setup_instructions(self) -> None:
instructions = CS2Adapter.get_setup_instructions()
assert "gamestate_integration" in instructions
assert "CS2" in instructions
+204
View File
@@ -0,0 +1,204 @@
"""Tests for Dota 2 Game State Integration adapter."""
import pytest
from wled_controller.core.game_integration.adapters.dota2_adapter import Dota2Adapter
def _make_dota2_payload(
*,
health: int = 1000,
max_health: int = 1000,
mana: float = 500.0,
max_mana: float = 500.0,
gold: int = 625,
kills: int = 0,
deaths: int = 0,
game_state: str | None = None,
auth_token: str | None = None,
) -> dict:
"""Build a realistic Dota 2 GSI payload."""
payload: dict = {
"hero": {
"xpos": -6624,
"ypos": -6592,
"id": 1,
"name": "npc_dota_hero_antimage",
"level": 12,
"alive": True,
"respawn_seconds": 0,
"buyback_cost": 874,
"buyback_cooldown": 0,
"health": health,
"max_health": max_health,
"health_percent": int(100.0 * health / max_health) if max_health else 0,
"mana": mana,
"max_mana": max_mana,
"mana_percent": int(100.0 * mana / max_mana) if max_mana else 0,
},
"player": {
"steamid": "76561198012345678",
"name": "TestPlayer",
"activity": "playing",
"kills": kills,
"deaths": deaths,
"assists": 0,
"last_hits": 120,
"denies": 10,
"kill_streak": 0,
"gold": gold,
"gold_reliable": 200,
"gold_unreliable": gold - 200,
"gpm": 450,
"xpm": 520,
},
}
if game_state is not None:
payload["map"] = {
"name": "start",
"matchid": "12345",
"game_time": 1200,
"clock_time": 1200,
"daytime": True,
"game_state": game_state,
"win_team": "none",
}
if auth_token is not None:
payload["auth"] = {"token": auth_token}
return payload
class TestDota2ContinuousEvents:
def test_health_normalized(self) -> None:
payload = _make_dota2_payload(health=750, max_health=1500)
events, _ = Dota2Adapter.parse_payload(payload, {"adapter_id": "d2"}, {})
hp = [e for e in events if e.event_type == "health"]
assert len(hp) == 1
assert hp[0].value == pytest.approx(0.5)
def test_mana_normalized(self) -> None:
payload = _make_dota2_payload(mana=300.0, max_mana=600.0)
events, _ = Dota2Adapter.parse_payload(payload, {"adapter_id": "d2"}, {})
mp = [e for e in events if e.event_type == "mana"]
assert len(mp) == 1
assert mp[0].value == pytest.approx(0.5)
def test_gold_normalized(self) -> None:
payload = _make_dota2_payload(gold=5000)
events, _ = Dota2Adapter.parse_payload(payload, {"adapter_id": "d2"}, {})
g = [e for e in events if e.event_type == "gold"]
assert len(g) == 1
assert g[0].value == pytest.approx(5000.0 / 99999.0)
def test_gold_custom_max(self) -> None:
payload = _make_dota2_payload(gold=5000)
events, _ = Dota2Adapter.parse_payload(
payload,
{"adapter_id": "d2", "max_gold": 10000},
{},
)
g = [e for e in events if e.event_type == "gold"]
assert g[0].value == pytest.approx(0.5)
class TestDota2DiffDetection:
def test_kill_detected(self) -> None:
p1 = _make_dota2_payload(kills=3)
_, s1 = Dota2Adapter.parse_payload(p1, {"adapter_id": "d2"}, {})
p2 = _make_dota2_payload(kills=4)
events, _ = Dota2Adapter.parse_payload(p2, {"adapter_id": "d2"}, s1)
kills = [e for e in events if e.event_type == "kill"]
assert len(kills) == 1
def test_no_kill_on_first_payload(self) -> None:
p = _make_dota2_payload(kills=5)
events, _ = Dota2Adapter.parse_payload(p, {"adapter_id": "d2"}, {})
kills = [e for e in events if e.event_type == "kill"]
assert len(kills) == 0
def test_death_detected(self) -> None:
p1 = _make_dota2_payload(deaths=1)
_, s1 = Dota2Adapter.parse_payload(p1, {"adapter_id": "d2"}, {})
p2 = _make_dota2_payload(deaths=2)
events, _ = Dota2Adapter.parse_payload(p2, {"adapter_id": "d2"}, s1)
deaths = [e for e in events if e.event_type == "death"]
assert len(deaths) == 1
def test_multiple_kills(self) -> None:
p1 = _make_dota2_payload(kills=0)
_, s1 = Dota2Adapter.parse_payload(p1, {"adapter_id": "d2"}, {})
p2 = _make_dota2_payload(kills=3)
events, _ = Dota2Adapter.parse_payload(p2, {"adapter_id": "d2"}, s1)
kills = [e for e in events if e.event_type == "kill"]
assert len(kills) == 3
class TestDota2MatchFlow:
def test_match_start_pre_game(self) -> None:
p = _make_dota2_payload(game_state="DOTA_GAMERULES_STATE_PRE_GAME")
events, _ = Dota2Adapter.parse_payload(p, {"adapter_id": "d2"}, {})
ms = [e for e in events if e.event_type == "match_start"]
assert len(ms) == 1
def test_match_start_in_progress(self) -> None:
prev = {"game_state": "DOTA_GAMERULES_STATE_PRE_GAME"}
p = _make_dota2_payload(game_state="DOTA_GAMERULES_STATE_GAME_IN_PROGRESS")
events, _ = Dota2Adapter.parse_payload(p, {"adapter_id": "d2"}, prev)
ms = [e for e in events if e.event_type == "match_start"]
assert len(ms) == 1
def test_match_end(self) -> None:
prev = {"game_state": "DOTA_GAMERULES_STATE_GAME_IN_PROGRESS"}
p = _make_dota2_payload(game_state="DOTA_GAMERULES_STATE_POST_GAME")
events, _ = Dota2Adapter.parse_payload(p, {"adapter_id": "d2"}, prev)
me = [e for e in events if e.event_type == "match_end"]
assert len(me) == 1
def test_no_event_on_same_state(self) -> None:
prev = {"game_state": "DOTA_GAMERULES_STATE_GAME_IN_PROGRESS"}
p = _make_dota2_payload(game_state="DOTA_GAMERULES_STATE_GAME_IN_PROGRESS")
events, _ = Dota2Adapter.parse_payload(p, {"adapter_id": "d2"}, prev)
flow = [e for e in events if e.event_type in ("match_start", "match_end")]
assert len(flow) == 0
class TestDota2Auth:
def test_auth_valid(self) -> None:
p = _make_dota2_payload(auth_token="secret")
result = Dota2Adapter.validate_auth({}, p, {"auth_token": "secret"})
assert result is True
def test_auth_invalid(self) -> None:
p = _make_dota2_payload(auth_token="wrong")
result = Dota2Adapter.validate_auth({}, p, {"auth_token": "secret"})
assert result is False
def test_auth_no_config_accepts_all(self) -> None:
p = _make_dota2_payload()
result = Dota2Adapter.validate_auth({}, p, {})
assert result is True
class TestDota2Metadata:
def test_adapter_type(self) -> None:
assert Dota2Adapter.ADAPTER_TYPE == "dota2"
def test_supported_events(self) -> None:
assert "health" in Dota2Adapter.SUPPORTED_EVENTS
assert "mana" in Dota2Adapter.SUPPORTED_EVENTS
assert "gold" in Dota2Adapter.SUPPORTED_EVENTS
def test_config_schema(self) -> None:
schema = Dota2Adapter.get_config_schema()
assert "auth_token" in schema["properties"]
assert "max_gold" in schema["properties"]
def test_setup_instructions(self) -> None:
instructions = Dota2Adapter.get_setup_instructions()
assert "Dota 2" in instructions
+239
View File
@@ -0,0 +1,239 @@
"""Tests for GameEventBus — publish, subscribe, unsubscribe, thread safety."""
import threading
from wled_controller.core.game_integration.event_bus import GameEventBus
from wled_controller.core.game_integration.events import GameEvent
def _make_event(event_type: str = "health", value: float = 0.5) -> GameEvent:
return GameEvent(
adapter_id="test_adapter",
event_type=event_type,
value=value,
raw_data={"test": True},
)
class TestPublishSubscribe:
"""Basic publish/subscribe behavior."""
def test_subscribe_receives_matching_events(self) -> None:
bus = GameEventBus()
received: list[GameEvent] = []
bus.subscribe("health", received.append)
event = _make_event("health", 0.8)
bus.publish(event)
assert len(received) == 1
assert received[0] is event
def test_subscribe_ignores_non_matching_events(self) -> None:
bus = GameEventBus()
received: list[GameEvent] = []
bus.subscribe("health", received.append)
bus.publish(_make_event("kill"))
assert len(received) == 0
def test_subscribe_all_receives_every_event(self) -> None:
bus = GameEventBus()
received: list[GameEvent] = []
bus.subscribe_all(received.append)
bus.publish(_make_event("health"))
bus.publish(_make_event("kill"))
bus.publish(_make_event("death"))
assert len(received) == 3
def test_multiple_subscribers_same_type(self) -> None:
bus = GameEventBus()
received_a: list[GameEvent] = []
received_b: list[GameEvent] = []
bus.subscribe("kill", received_a.append)
bus.subscribe("kill", received_b.append)
bus.publish(_make_event("kill"))
assert len(received_a) == 1
assert len(received_b) == 1
def test_type_and_wildcard_both_receive(self) -> None:
bus = GameEventBus()
type_received: list[GameEvent] = []
wild_received: list[GameEvent] = []
bus.subscribe("health", type_received.append)
bus.subscribe_all(wild_received.append)
bus.publish(_make_event("health"))
assert len(type_received) == 1
assert len(wild_received) == 1
class TestUnsubscribe:
"""Unsubscribe behavior."""
def test_unsubscribe_stops_delivery(self) -> None:
bus = GameEventBus()
received: list[GameEvent] = []
sub_id = bus.subscribe("health", received.append)
bus.publish(_make_event("health"))
assert len(received) == 1
result = bus.unsubscribe(sub_id)
assert result is True
bus.publish(_make_event("health"))
assert len(received) == 1 # No new events
def test_unsubscribe_wildcard(self) -> None:
bus = GameEventBus()
received: list[GameEvent] = []
sub_id = bus.subscribe_all(received.append)
bus.publish(_make_event("health"))
assert len(received) == 1
bus.unsubscribe(sub_id)
bus.publish(_make_event("health"))
assert len(received) == 1
def test_unsubscribe_unknown_id_returns_false(self) -> None:
bus = GameEventBus()
assert bus.unsubscribe("nonexistent_id") is False
class TestRecentEvents:
"""Recent events deque."""
def test_get_recent_events_returns_published(self) -> None:
bus = GameEventBus()
bus.publish(_make_event("health", 0.5))
bus.publish(_make_event("kill", 1.0))
recent = bus.get_recent_events()
assert len(recent) == 2
assert recent[0].event_type == "health"
assert recent[1].event_type == "kill"
def test_get_recent_events_respects_limit(self) -> None:
bus = GameEventBus()
for i in range(10):
bus.publish(_make_event("health", i / 10.0))
recent = bus.get_recent_events(limit=3)
assert len(recent) == 3
def test_recent_events_bounded_by_maxlen(self) -> None:
bus = GameEventBus(recent_maxlen=5)
for i in range(10):
bus.publish(_make_event("health", i / 10.0))
recent = bus.get_recent_events(limit=100)
assert len(recent) == 5
class TestStats:
"""Event statistics."""
def test_stats_counts_per_type(self) -> None:
bus = GameEventBus()
bus.publish(_make_event("health"))
bus.publish(_make_event("health"))
bus.publish(_make_event("kill"))
stats = bus.get_stats()
assert stats["event_counts"]["health"] == 2
assert stats["event_counts"]["kill"] == 1
def test_stats_last_timestamp(self) -> None:
bus = GameEventBus()
assert bus.get_stats()["last_event_timestamp"] is None
event = _make_event("health")
bus.publish(event)
stats = bus.get_stats()
assert stats["last_event_timestamp"] == event.timestamp
class TestThreadSafety:
"""Concurrent publish/subscribe must not crash."""
def test_concurrent_publish(self) -> None:
bus = GameEventBus()
received: list[GameEvent] = []
lock = threading.Lock()
def safe_append(event: GameEvent) -> None:
with lock:
received.append(event)
bus.subscribe_all(safe_append)
num_threads = 10
events_per_thread = 50
threads = []
def publisher(thread_id: int) -> None:
for i in range(events_per_thread):
bus.publish(_make_event("health", i / events_per_thread))
for t_id in range(num_threads):
t = threading.Thread(target=publisher, args=(t_id,))
threads.append(t)
t.start()
for t in threads:
t.join(timeout=10)
assert len(received) == num_threads * events_per_thread
def test_concurrent_subscribe_unsubscribe(self) -> None:
"""Subscribe and unsubscribe from multiple threads simultaneously."""
bus = GameEventBus()
sub_ids: list[str] = []
lock = threading.Lock()
def subscriber() -> None:
sid = bus.subscribe("health", lambda e: None)
with lock:
sub_ids.append(sid)
threads = [threading.Thread(target=subscriber) for _ in range(20)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
assert len(sub_ids) == 20
# Unsubscribe all
for sid in sub_ids:
assert bus.unsubscribe(sid) is True
class TestCallbackError:
"""Callback errors must not crash the bus."""
def test_error_in_callback_does_not_block_others(self) -> None:
bus = GameEventBus()
received: list[GameEvent] = []
def bad_callback(event: GameEvent) -> None:
raise RuntimeError("boom")
bus.subscribe("health", bad_callback)
bus.subscribe("health", received.append)
# Should not raise
bus.publish(_make_event("health"))
# Second subscriber still receives the event
assert len(received) == 1
+492
View File
@@ -0,0 +1,492 @@
"""Tests for GameEventColorStripSource and GameEventColorStripStream."""
import time
from datetime import datetime, timezone
import numpy as np
from wled_controller.core.game_integration.event_bus import GameEventBus
from wled_controller.core.game_integration.events import GameEvent
from wled_controller.core.processing.game_event_stream import GameEventColorStripStream
from wled_controller.storage.bindable import BindableColor
from wled_controller.storage.color_strip_source import (
ColorStripSource,
GameEventColorStripSource,
)
# ── Helpers ──────────────────────────────────────────────────────────
def _make_source(**overrides) -> GameEventColorStripSource:
"""Create a GameEventColorStripSource with sensible defaults."""
defaults = dict(
id="css_test01",
name="Test Game Event Source",
source_type="game_event",
created_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
game_integration_id="gi_abc12345",
idle_color=BindableColor([0, 0, 0]),
event_mappings=[
{
"event_type": "kill",
"effect": "flash",
"color": [255, 0, 0],
"duration_ms": 500,
"intensity": 1.0,
"priority": 1,
},
{
"event_type": "death",
"effect": "pulse",
"color": [0, 0, 255],
"duration_ms": 800,
"intensity": 0.8,
"priority": 2,
},
],
led_count=10,
)
defaults.update(overrides)
return GameEventColorStripSource(**defaults)
def _make_event(event_type: str = "kill", value: float = 1.0) -> GameEvent:
return GameEvent(
adapter_id="test_adapter",
event_type=event_type,
value=value,
)
# ── GameEventColorStripSource serialization tests ────────────────────
class TestGameEventColorStripSourceSerialization:
def test_to_dict_roundtrip(self):
source = _make_source()
d = source.to_dict()
assert d["source_type"] == "game_event"
assert d["game_integration_id"] == "gi_abc12345"
assert d["idle_color"] == [0, 0, 0]
assert len(d["event_mappings"]) == 2
assert d["led_count"] == 10
restored = GameEventColorStripSource.from_dict(d)
assert restored.id == source.id
assert restored.name == source.name
assert restored.source_type == "game_event"
assert restored.game_integration_id == "gi_abc12345"
assert restored.idle_color.color == [0, 0, 0]
assert len(restored.event_mappings) == 2
assert restored.led_count == 10
def test_from_dict_defaults(self):
data = {
"id": "css_min01",
"name": "Minimal",
"source_type": "game_event",
"created_at": "2026-01-01T00:00:00+00:00",
"updated_at": "2026-01-01T00:00:00+00:00",
}
source = GameEventColorStripSource.from_dict(data)
assert source.game_integration_id == ""
assert source.idle_color.color == [0, 0, 0]
assert source.event_mappings == []
assert source.led_count == 0
def test_factory_dispatch(self):
"""ColorStripSource.from_dict dispatches to GameEventColorStripSource."""
data = {
"id": "css_fac01",
"name": "Factory Test",
"source_type": "game_event",
"created_at": "2026-01-01T00:00:00+00:00",
"updated_at": "2026-01-01T00:00:00+00:00",
"game_integration_id": "gi_xyz",
}
source = ColorStripSource.from_dict(data)
assert isinstance(source, GameEventColorStripSource)
assert source.game_integration_id == "gi_xyz"
def test_create_from_kwargs(self):
source = GameEventColorStripSource.create_from_kwargs(
id="css_kw01",
name="KW Test",
source_type="game_event",
created_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
game_integration_id="gi_123",
idle_color=[10, 20, 30],
event_mappings=[{"event_type": "kill", "effect": "flash"}],
led_count=20,
)
assert source.game_integration_id == "gi_123"
assert source.idle_color.color == [10, 20, 30]
assert len(source.event_mappings) == 1
assert source.led_count == 20
def test_create_instance_factory(self):
source = ColorStripSource.create_instance(
source_type="game_event",
id="css_ci01",
name="CI Test",
created_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
game_integration_id="gi_ci",
)
assert isinstance(source, GameEventColorStripSource)
def test_sharable_is_false(self):
source = _make_source()
assert source.sharable is False
class TestGameEventColorStripSourceUpdate:
def test_apply_update_game_integration_id(self):
source = _make_source()
source.apply_update(game_integration_id="gi_new")
assert source.game_integration_id == "gi_new"
def test_apply_update_idle_color(self):
source = _make_source()
source.apply_update(idle_color=[100, 100, 100])
assert source.idle_color.color == [100, 100, 100]
def test_apply_update_event_mappings(self):
source = _make_source()
new_mappings = [{"event_type": "health", "effect": "breathing"}]
source.apply_update(event_mappings=new_mappings)
assert len(source.event_mappings) == 1
assert source.event_mappings[0]["event_type"] == "health"
def test_apply_update_led_count(self):
source = _make_source()
source.apply_update(led_count=50)
assert source.led_count == 50
def test_apply_update_ignores_none(self):
source = _make_source()
original_id = source.game_integration_id
source.apply_update(game_integration_id=None)
assert source.game_integration_id == original_id
# ── GameEventColorStripStream tests ──────────────────────────────────
class TestGameEventColorStripStreamLifecycle:
def test_start_stop(self):
source = _make_source()
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
assert stream._running is True
assert stream._thread is not None
stream.stop()
assert stream._running is False
assert stream._thread is None
# Subscriptions should be cleaned up
assert len(stream._subscription_ids) == 0
def test_double_start_is_safe(self):
source = _make_source()
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
thread1 = stream._thread
stream.start() # Should not create a second thread
assert stream._thread is thread1
stream.stop()
def test_subscriptions_registered(self):
source = _make_source()
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
# Should have subscribed to "kill" and "death"
assert len(stream._subscription_ids) == 2
stream.stop()
def test_no_event_bus_still_starts(self):
"""Stream should start even without an EventBus (just no events)."""
source = _make_source()
stream = GameEventColorStripStream(source, event_bus=None)
stream.start()
assert stream._running is True
stream.stop()
class TestGameEventColorStripStreamRendering:
def test_idle_outputs_idle_color(self):
source = _make_source(idle_color=BindableColor([50, 100, 150]))
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
# Wait for at least one render frame
time.sleep(0.1)
colors = stream.get_latest_colors()
assert colors is not None
assert colors.shape == (10, 3)
# Should be idle color
assert colors[0, 0] == 50
assert colors[0, 1] == 100
assert colors[0, 2] == 150
stream.stop()
def test_event_triggers_effect(self):
source = _make_source(
idle_color=BindableColor([0, 0, 0]),
event_mappings=[
{
"event_type": "kill",
"effect": "flash",
"color": [255, 0, 0],
"duration_ms": 2000,
"intensity": 1.0,
"priority": 1,
}
],
)
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
time.sleep(0.05)
# Fire a kill event
bus.publish(_make_event("kill"))
time.sleep(0.1)
colors = stream.get_latest_colors()
assert colors is not None
# Should have non-zero red (flash effect in progress)
assert colors[0, 0] > 0
stream.stop()
def test_unmatched_event_ignored(self):
source = _make_source(
idle_color=BindableColor([10, 10, 10]),
event_mappings=[
{
"event_type": "kill",
"effect": "flash",
"color": [255, 0, 0],
"duration_ms": 500,
"intensity": 1.0,
"priority": 1,
}
],
)
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
time.sleep(0.05)
# Fire an event type not in mappings
bus.publish(_make_event("health", value=0.5))
time.sleep(0.1)
colors = stream.get_latest_colors()
# Should still be idle color (no matching mapping)
assert colors[0, 0] == 10
assert colors[0, 1] == 10
assert colors[0, 2] == 10
stream.stop()
def test_priority_based_layering(self):
"""Higher priority effects override lower ones."""
source = _make_source(
idle_color=BindableColor([0, 0, 0]),
event_mappings=[
{
"event_type": "kill",
"effect": "flash",
"color": [255, 0, 0],
"duration_ms": 3000,
"intensity": 1.0,
"priority": 1,
},
{
"event_type": "death",
"effect": "flash",
"color": [0, 0, 255],
"duration_ms": 3000,
"intensity": 1.0,
"priority": 5,
},
],
)
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
time.sleep(0.05)
# Fire low-priority kill event
bus.publish(_make_event("kill"))
time.sleep(0.05)
# Fire high-priority death event — should override
bus.publish(_make_event("death"))
time.sleep(0.1)
colors = stream.get_latest_colors()
# Should be blue (death effect), not red (kill effect)
assert colors[0, 2] > colors[0, 0]
stream.stop()
def test_effect_returns_to_idle(self):
"""After effect completes, output returns to idle_color."""
source = _make_source(
idle_color=BindableColor([42, 42, 42]),
event_mappings=[
{
"event_type": "kill",
"effect": "flash",
"color": [255, 0, 0],
"duration_ms": 50, # Very short effect
"intensity": 1.0,
"priority": 1,
}
],
)
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
time.sleep(0.05)
bus.publish(_make_event("kill"))
# Wait for effect to complete
time.sleep(0.2)
colors = stream.get_latest_colors()
# Should be back to idle color
assert colors[0, 0] == 42
assert colors[0, 1] == 42
assert colors[0, 2] == 42
stream.stop()
class TestGameEventColorStripStreamEffects:
"""Test individual effect types produce non-zero output at mid-progress."""
def _fire_and_sample(self, effect: str, color: list) -> np.ndarray:
source = _make_source(
idle_color=BindableColor([0, 0, 0]),
event_mappings=[
{
"event_type": "kill",
"effect": effect,
"color": color,
"duration_ms": 2000,
"intensity": 1.0,
"priority": 1,
}
],
)
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
time.sleep(0.05)
bus.publish(_make_event("kill"))
# Sample in the first quarter of the effect
time.sleep(0.15)
colors = stream.get_latest_colors()
stream.stop()
return colors
def test_flash_produces_output(self):
colors = self._fire_and_sample("flash", [255, 0, 0])
assert colors[0, 0] > 0
def test_pulse_produces_output(self):
colors = self._fire_and_sample("pulse", [0, 255, 0])
assert colors[0, 1] > 0
def test_sweep_produces_output(self):
colors = self._fire_and_sample("sweep", [0, 0, 255])
# At least some LEDs should have non-zero blue
assert np.any(colors[:, 2] > 0)
def test_color_shift_produces_output(self):
colors = self._fire_and_sample("color_shift", [255, 128, 0])
# Should have some non-zero pixels
assert np.any(colors > 0)
def test_breathing_produces_output(self):
colors = self._fire_and_sample("breathing", [128, 0, 255])
# Breathing may start dim, but should have some output by 0.15s into a 2s effect
assert np.any(colors > 0)
class TestGameEventColorStripStreamConfigure:
def test_auto_size(self):
source = _make_source(led_count=0) # auto-size
stream = GameEventColorStripStream(source)
assert stream.led_count == 1 # minimum
stream.configure(60)
assert stream.led_count == 60
def test_fixed_size_not_overridden(self):
source = _make_source(led_count=30)
stream = GameEventColorStripStream(source)
assert stream.led_count == 30
stream.configure(60)
# Should NOT change because auto_size is False
assert stream.led_count == 30
class TestGameEventColorStripStreamUpdate:
def test_update_source_hot_reloads(self):
source = _make_source(
event_mappings=[
{
"event_type": "kill",
"effect": "flash",
"color": [255, 0, 0],
"duration_ms": 500,
"intensity": 1.0,
"priority": 1,
}
],
)
bus = GameEventBus()
stream = GameEventColorStripStream(source, event_bus=bus)
stream.start()
# Update with new mappings
updated_source = _make_source(
event_mappings=[
{
"event_type": "health",
"effect": "breathing",
"color": [0, 255, 0],
"duration_ms": 1000,
"intensity": 0.5,
"priority": 0,
}
],
)
stream.update_source(updated_source)
assert "health" in stream._mapping_lookup
assert "kill" not in stream._mapping_lookup
stream.stop()
@@ -0,0 +1,500 @@
"""Tests for GameEventValueSource (storage model) and GameEventValueStream (runtime)."""
import time
import threading
import pytest
from wled_controller.core.game_integration.event_bus import GameEventBus
from wled_controller.core.game_integration.events import GameEvent
from wled_controller.core.value_sources.game_event_value_source import GameEventValueStream
from wled_controller.storage.value_source import (
GameEventValueSource,
ValueSource,
)
# ---------------------------------------------------------------------------
# GameEventValueSource model tests (Task 5)
# ---------------------------------------------------------------------------
class TestGameEventValueSourceModel:
"""Serialization round-trips and defaults for the storage dataclass."""
def test_round_trip(self):
data = {
"id": "vs_game1",
"name": "Health Monitor",
"source_type": "game_event",
"game_integration_id": "gi_abc123",
"event_type": "health",
"min_game_value": 0.0,
"max_game_value": 100.0,
"smoothing": 0.3,
"default_value": 0.5,
"timeout": 10.0,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, GameEventValueSource)
assert src.game_integration_id == "gi_abc123"
assert src.event_type == "health"
assert src.min_game_value == 0.0
assert src.max_game_value == 100.0
assert src.smoothing == 0.3
assert src.default_value == 0.5
assert src.timeout == 10.0
# Round-trip through to_dict -> from_dict
restored = ValueSource.from_dict(src.to_dict())
assert isinstance(restored, GameEventValueSource)
assert restored.game_integration_id == "gi_abc123"
assert restored.event_type == "health"
assert restored.smoothing == 0.3
assert restored.default_value == 0.5
assert restored.timeout == 10.0
def test_defaults(self):
data = {
"id": "vs_game2",
"name": "Default Test",
"source_type": "game_event",
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, GameEventValueSource)
assert src.game_integration_id == ""
assert src.event_type == "health"
assert src.min_game_value == 0.0
assert src.max_game_value == 100.0
assert src.smoothing == 0.0
assert src.default_value == 0.5
assert src.timeout == 5.0
def test_to_dict_includes_all_fields(self):
data = {
"id": "vs_game3",
"name": "Full",
"source_type": "game_event",
"game_integration_id": "gi_xyz",
"event_type": "ammo",
"min_game_value": 0.0,
"max_game_value": 30.0,
"smoothing": 0.5,
"default_value": 0.0,
"timeout": 3.0,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
d = src.to_dict()
assert d["source_type"] == "game_event"
assert d["game_integration_id"] == "gi_xyz"
assert d["event_type"] == "ammo"
assert d["max_game_value"] == 30.0
assert d["smoothing"] == 0.5
assert d["default_value"] == 0.0
assert d["timeout"] == 3.0
def test_from_dict_with_zero_values(self):
"""Ensure 0.0 values are preserved, not replaced by defaults."""
data = {
"id": "vs_game4",
"name": "Zeros",
"source_type": "game_event",
"min_game_value": 0.0,
"max_game_value": 0.0,
"smoothing": 0.0,
"default_value": 0.0,
"timeout": 0.0,
"created_at": "2025-01-01T00:00:00+00:00",
"updated_at": "2025-01-01T00:00:00+00:00",
}
src = ValueSource.from_dict(data)
assert isinstance(src, GameEventValueSource)
assert src.max_game_value == 0.0
assert src.default_value == 0.0
assert src.timeout == 0.0
# ---------------------------------------------------------------------------
# GameEventValueStream runtime tests (Task 6)
# ---------------------------------------------------------------------------
def _make_event(event_type: str, value: float) -> GameEvent:
return GameEvent(
adapter_id="test_adapter",
event_type=event_type,
value=value,
)
class TestGameEventValueStreamNormalization:
"""Tests for value normalization (min/max mapping)."""
def test_mid_value(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 50.0))
assert stream.get_value() == pytest.approx(0.5)
stream.stop()
def test_min_value(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 0.0))
assert stream.get_value() == pytest.approx(0.0)
stream.stop()
def test_max_value(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 100.0))
assert stream.get_value() == pytest.approx(1.0)
stream.stop()
def test_clamp_above_max(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 150.0))
assert stream.get_value() == pytest.approx(1.0)
stream.stop()
def test_clamp_below_min(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", -10.0))
assert stream.get_value() == pytest.approx(0.0)
stream.stop()
def test_custom_range(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="ammo",
min_game_value=10.0,
max_game_value=60.0,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("ammo", 35.0))
assert stream.get_value() == pytest.approx(0.5)
stream.stop()
def test_zero_range_returns_midpoint(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=50.0,
max_game_value=50.0,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 50.0))
assert stream.get_value() == pytest.approx(0.5)
stream.stop()
class TestGameEventValueStreamSmoothing:
"""Tests for EMA smoothing behavior."""
def test_no_smoothing(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
smoothing=0.0,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 100.0))
assert stream.get_value() == pytest.approx(1.0)
bus.publish(_make_event("health", 0.0))
assert stream.get_value() == pytest.approx(0.0)
stream.stop()
def test_smoothing_dampens_change(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
smoothing=0.8,
event_bus=bus,
)
stream.start()
# First event: no smoothing applied (no previous value)
bus.publish(_make_event("health", 100.0))
assert stream.get_value() == pytest.approx(1.0)
# Second event: smoothing kicks in
# alpha = 1 - 0.8 = 0.2
# smoothed = 0.2 * 0.0 + 0.8 * 1.0 = 0.8
bus.publish(_make_event("health", 0.0))
assert stream.get_value() == pytest.approx(0.8)
# Third event: continues smoothing
# smoothed = 0.2 * 0.0 + 0.8 * 0.8 = 0.64
bus.publish(_make_event("health", 0.0))
assert stream.get_value() == pytest.approx(0.64)
stream.stop()
def test_smoothing_converges(self):
"""With many events at the same value, output should approach that value."""
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
smoothing=0.5,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 100.0))
for _ in range(20):
bus.publish(_make_event("health", 50.0))
assert stream.get_value() == pytest.approx(0.5, abs=0.01)
stream.stop()
class TestGameEventValueStreamTimeout:
"""Tests for timeout / revert to default."""
def test_default_before_any_event(self):
stream = GameEventValueStream(
event_type="health",
default_value=0.5,
event_bus=GameEventBus(),
)
stream.start()
assert stream.get_value() == pytest.approx(0.5)
stream.stop()
def test_timeout_reverts_to_default(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
default_value=0.5,
timeout=0.05, # 50ms timeout for fast test
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 80.0))
assert stream.get_value() == pytest.approx(0.8)
# Wait for timeout
time.sleep(0.1)
assert stream.get_value() == pytest.approx(0.5)
stream.stop()
def test_no_timeout_when_zero(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
default_value=0.5,
timeout=0.0,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 80.0))
time.sleep(0.05)
# With timeout=0.0, should never revert
assert stream.get_value() == pytest.approx(0.8)
stream.stop()
def test_event_resets_timeout(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
default_value=0.5,
timeout=0.1,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 80.0))
time.sleep(0.06)
# Send another event before timeout
bus.publish(_make_event("health", 80.0))
time.sleep(0.06)
# Should still be live (timeout reset)
assert stream.get_value() == pytest.approx(0.8)
stream.stop()
class TestGameEventValueStreamLifecycle:
"""Tests for start/stop and EventBus subscription management."""
def test_stop_unsubscribes(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
default_value=0.5,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("health", 100.0))
assert stream.get_value() == pytest.approx(1.0)
stream.stop()
# After stop, events should not affect the stream
assert stream.get_value() == pytest.approx(0.5)
def test_ignores_other_event_types(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
default_value=0.5,
event_bus=bus,
)
stream.start()
bus.publish(_make_event("ammo", 100.0))
# Should still be at default since event_type doesn't match
assert stream.get_value() == pytest.approx(0.5)
stream.stop()
def test_no_event_bus(self):
"""Stream without event bus should return default value."""
stream = GameEventValueStream(
event_type="health",
default_value=0.7,
event_bus=None,
)
stream.start()
assert stream.get_value() == pytest.approx(0.7)
stream.stop()
def test_get_color_raises(self):
stream = GameEventValueStream(
event_type="health",
event_bus=None,
)
with pytest.raises(NotImplementedError):
stream.get_color()
class TestGameEventValueStreamThreadSafety:
"""Tests for concurrent access from multiple threads."""
def test_concurrent_publish_and_read(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
default_value=0.5,
smoothing=0.0,
timeout=5.0,
event_bus=bus,
)
stream.start()
errors = []
def publisher():
try:
for i in range(100):
bus.publish(_make_event("health", float(i)))
except Exception as e:
errors.append(e)
def reader():
try:
for _ in range(100):
val = stream.get_value()
assert 0.0 <= val <= 1.0
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=publisher),
threading.Thread(target=reader),
threading.Thread(target=reader),
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=5.0)
stream.stop()
assert len(errors) == 0, f"Thread errors: {errors}"
class TestGameEventValueStreamHotUpdate:
"""Tests for update_source hot-update functionality."""
def test_update_source_changes_params(self):
bus = GameEventBus()
stream = GameEventValueStream(
event_type="health",
min_game_value=0.0,
max_game_value=100.0,
default_value=0.5,
timeout=5.0,
event_bus=bus,
)
stream.start()
# Create an updated source config
source = GameEventValueSource(
id="vs_test",
name="Test",
source_type="game_event",
min_game_value=0.0,
max_game_value=200.0,
smoothing=0.5,
default_value=0.3,
timeout=10.0,
created_at="2025-01-01T00:00:00+00:00",
updated_at="2025-01-01T00:00:00+00:00",
)
stream.update_source(source)
# Verify hot-updated params take effect
bus.publish(_make_event("health", 100.0))
# With max_game_value=200, 100 normalizes to 0.5
assert stream.get_value() == pytest.approx(0.5)
stream.stop()
+75
View File
@@ -0,0 +1,75 @@
"""Tests for built-in effect presets."""
import pytest
from wled_controller.core.game_integration.presets import (
EffectPreset,
get_all_presets,
get_preset,
)
from wled_controller.storage.game_integration import EventMapping
class TestPresetData:
"""Verify preset data structure and contents."""
def test_get_all_presets_returns_list(self):
presets = get_all_presets()
assert isinstance(presets, list)
assert len(presets) >= 4
def test_each_preset_has_required_fields(self):
for preset in get_all_presets():
assert isinstance(preset, EffectPreset)
assert preset.key
assert preset.name
assert preset.description
assert len(preset.target_game_types) > 0
assert len(preset.event_mappings) > 0
def test_each_mapping_is_valid(self):
for preset in get_all_presets():
for mapping in preset.event_mappings:
assert isinstance(mapping, EventMapping)
assert mapping.event_type
assert mapping.effect
assert len(mapping.color) == 3
assert all(0 <= c <= 255 for c in mapping.color)
assert mapping.duration_ms > 0
assert 0.0 <= mapping.intensity <= 1.0
assert mapping.priority >= 0
def test_get_preset_by_key(self):
assert get_preset("fps_combat") is not None
assert get_preset("moba_health") is not None
assert get_preset("racing") is not None
assert get_preset("generic_alert") is not None
def test_get_preset_unknown_returns_none(self):
assert get_preset("nonexistent") is None
def test_fps_combat_has_expected_events(self):
preset = get_preset("fps_combat")
assert preset is not None
event_types = {m.event_type for m in preset.event_mappings}
assert "health" in event_types
assert "kill" in event_types
assert "death" in event_types
def test_moba_health_has_expected_events(self):
preset = get_preset("moba_health")
assert preset is not None
event_types = {m.event_type for m in preset.event_mappings}
assert "health" in event_types
assert "mana" in event_types
def test_presets_are_frozen(self):
preset = get_preset("fps_combat")
assert preset is not None
with pytest.raises(AttributeError):
preset.name = "Changed" # type: ignore[misc]
def test_preset_keys_unique(self):
presets = get_all_presets()
keys = [p.key for p in presets]
assert len(keys) == len(set(keys))
+47
View File
@@ -0,0 +1,47 @@
"""Tests verifying game integration wiring in ProcessorDependencies.
Ensures that GameEventBus is properly threaded through to
ColorStripStreamManager and ValueStreamManager.
"""
from wled_controller.core.game_integration.event_bus import GameEventBus
from wled_controller.core.processing.processor_manager import (
ProcessorDependencies,
ProcessorManager,
)
class TestGameEventBusWiring:
"""Verify that game_event_bus propagates through ProcessorManager."""
def test_processor_dependencies_accepts_game_event_bus(self):
bus = GameEventBus()
deps = ProcessorDependencies(game_event_bus=bus)
assert deps.game_event_bus is bus
def test_css_stream_manager_receives_bus(self):
bus = GameEventBus()
deps = ProcessorDependencies(game_event_bus=bus)
pm = ProcessorManager(deps)
css_mgr = pm._color_strip_stream_manager
assert css_mgr._game_event_bus is bus
def test_value_stream_manager_receives_bus(self):
"""ValueStreamManager is only created when value_source_store is provided."""
from unittest.mock import MagicMock
bus = GameEventBus()
mock_vs_store = MagicMock()
deps = ProcessorDependencies(
game_event_bus=bus,
value_source_store=mock_vs_store,
)
pm = ProcessorManager(deps)
vs_mgr = pm._value_stream_manager
assert vs_mgr is not None
assert vs_mgr._event_bus is bus
def test_without_bus_defaults_to_none(self):
deps = ProcessorDependencies()
pm = ProcessorManager(deps)
assert pm._color_strip_stream_manager._game_event_bus is None
@@ -0,0 +1,162 @@
"""Tests for Generic Webhook adapter."""
import pytest
from wled_controller.core.game_integration.adapters.generic_webhook_adapter import (
GenericWebhookAdapter,
)
class TestGenericWebhookParsing:
def test_basic_mapping(self) -> None:
config = {
"adapter_id": "webhook_test",
"mappings": [
{
"source_path": "player.health",
"event": "health",
"min": 0,
"max": 100,
"trigger": "on_value",
},
],
}
payload = {"player": {"health": 75}}
events, _ = GenericWebhookAdapter.parse_payload(payload, config, {})
assert len(events) == 1
assert events[0].event_type == "health"
assert events[0].value == pytest.approx(0.75)
def test_multiple_mappings(self) -> None:
config = {
"adapter_id": "webhook_test",
"mappings": [
{
"source_path": "hp",
"event": "health",
"min": 0,
"max": 100,
"trigger": "on_value",
},
{"source_path": "mp", "event": "mana", "min": 0, "max": 200, "trigger": "on_value"},
],
}
payload = {"hp": 50, "mp": 100}
events, _ = GenericWebhookAdapter.parse_payload(payload, config, {})
assert len(events) == 2
types = {e.event_type for e in events}
assert types == {"health", "mana"}
def test_empty_mappings(self) -> None:
config = {"adapter_id": "test", "mappings": []}
events, state = GenericWebhookAdapter.parse_payload({"hp": 50}, config, {})
assert len(events) == 0
def test_no_mappings_key(self) -> None:
config = {"adapter_id": "test"}
events, _ = GenericWebhookAdapter.parse_payload({"hp": 50}, config, {})
assert len(events) == 0
def test_diff_based_trigger(self) -> None:
config = {
"adapter_id": "test",
"mappings": [
{
"source_path": "kills",
"event": "kill",
"min": 0,
"max": 50,
"trigger": "on_increase",
},
],
}
# First call — establish baseline
_, state1 = GenericWebhookAdapter.parse_payload({"kills": 5}, config, {})
# Increase — should emit
events2, state2 = GenericWebhookAdapter.parse_payload({"kills": 6}, config, state1)
assert len(events2) == 1
assert events2[0].event_type == "kill"
# Same — should not emit
events3, _ = GenericWebhookAdapter.parse_payload({"kills": 6}, config, state2)
assert len(events3) == 0
def test_nested_json_path(self) -> None:
config = {
"adapter_id": "test",
"mappings": [
{
"source_path": "a.b.c",
"event": "health",
"min": 0,
"max": 10,
"trigger": "on_value",
},
],
}
payload = {"a": {"b": {"c": 5}}}
events, _ = GenericWebhookAdapter.parse_payload(payload, config, {})
assert len(events) == 1
assert events[0].value == pytest.approx(0.5)
class TestGenericWebhookAuth:
def test_no_auth_configured(self) -> None:
result = GenericWebhookAdapter.validate_auth({}, {}, {})
assert result is True
def test_bearer_auth_valid(self) -> None:
result = GenericWebhookAdapter.validate_auth(
{"Authorization": "Bearer secret123"},
{},
{"auth_token": "secret123"},
)
assert result is True
def test_bearer_auth_invalid(self) -> None:
result = GenericWebhookAdapter.validate_auth(
{"Authorization": "Bearer wrong"},
{},
{"auth_token": "secret123"},
)
assert result is False
def test_raw_token_auth(self) -> None:
result = GenericWebhookAdapter.validate_auth(
{"Authorization": "secret123"},
{},
{"auth_token": "secret123"},
)
assert result is True
def test_custom_header(self) -> None:
result = GenericWebhookAdapter.validate_auth(
{"X-Custom": "mytoken"},
{},
{"auth_token": "mytoken", "auth_header": "X-Custom"},
)
assert result is True
def test_missing_header(self) -> None:
result = GenericWebhookAdapter.validate_auth(
{},
{},
{"auth_token": "secret"},
)
assert result is False
class TestGenericWebhookMetadata:
def test_adapter_type(self) -> None:
assert GenericWebhookAdapter.ADAPTER_TYPE == "generic_webhook"
def test_config_schema(self) -> None:
schema = GenericWebhookAdapter.get_config_schema()
assert "auth_token" in schema["properties"]
assert "mappings" in schema["properties"]
assert "auth_header" in schema["properties"]
def test_setup_instructions(self) -> None:
instructions = GenericWebhookAdapter.get_setup_instructions()
assert "Webhook" in instructions
+183
View File
@@ -0,0 +1,183 @@
"""Tests for League of Legends Live Client Data API adapter."""
import threading
import pytest
from wled_controller.core.game_integration.adapters.lol_adapter import (
LoLAdapter,
LoLPoller,
)
def _make_lol_payload(
*,
current_health: float = 1000.0,
max_health: float = 1000.0,
resource_value: float = 500.0,
resource_max: float = 500.0,
level: int = 10,
summoner_name: str = "TestPlayer",
current_gold: float | None = None,
) -> dict:
"""Build a realistic LoL Live Client Data payload."""
payload: dict = {
"activePlayer": {
"summonerName": summoner_name,
"level": level,
"currentHealth": current_health,
"championStats": {
"currentHealth": current_health,
"maxHealth": max_health,
"resourceValue": resource_value,
"resourceMax": resource_max,
"attackDamage": 80.0,
"abilityPower": 0.0,
"armor": 60.0,
"magicResist": 40.0,
"moveSpeed": 345.0,
},
},
"allPlayers": [
{
"summonerName": summoner_name,
"championName": "Jinx",
"team": "ORDER",
"scores": {"kills": 5, "deaths": 2, "assists": 7},
},
],
"gameData": {
"gameMode": "CLASSIC",
"gameTime": 1200.5,
"mapName": "Map11",
"mapNumber": 11,
"mapTerrain": "Default",
},
}
if current_gold is not None:
payload["allPlayers"][0]["currentGold"] = current_gold
return payload
class TestLoLContinuousEvents:
def test_health_normalized(self) -> None:
payload = _make_lol_payload(current_health=500.0, max_health=1000.0)
events, _ = LoLAdapter.parse_payload(payload, {"adapter_id": "lol"}, {})
hp = [e for e in events if e.event_type == "health"]
assert len(hp) == 1
assert hp[0].value == pytest.approx(0.5)
def test_mana_normalized(self) -> None:
payload = _make_lol_payload(resource_value=300.0, resource_max=600.0)
events, _ = LoLAdapter.parse_payload(payload, {"adapter_id": "lol"}, {})
mp = [e for e in events if e.event_type == "mana"]
assert len(mp) == 1
assert mp[0].value == pytest.approx(0.5)
def test_level_normalized(self) -> None:
payload = _make_lol_payload(level=9)
events, _ = LoLAdapter.parse_payload(payload, {"adapter_id": "lol"}, {})
lvl = [e for e in events if e.event_type == "speed"]
assert len(lvl) == 1
assert lvl[0].value == pytest.approx(9.0 / 18.0)
def test_level_max(self) -> None:
payload = _make_lol_payload(level=18)
events, _ = LoLAdapter.parse_payload(payload, {"adapter_id": "lol"}, {})
lvl = [e for e in events if e.event_type == "speed"]
assert lvl[0].value == pytest.approx(1.0)
def test_gold(self) -> None:
payload = _make_lol_payload(current_gold=15000.0)
events, _ = LoLAdapter.parse_payload(payload, {"adapter_id": "lol"}, {})
gold = [e for e in events if e.event_type == "gold"]
assert len(gold) == 1
assert gold[0].value == pytest.approx(15000.0 / 30000.0)
class TestLoLDeathRespawn:
def test_death_detected_when_health_drops_to_zero(self) -> None:
prev = {"alive": True}
payload = _make_lol_payload(current_health=0.0, max_health=1000.0)
events, state = LoLAdapter.parse_payload(payload, {"adapter_id": "lol"}, prev)
deaths = [e for e in events if e.event_type == "death"]
assert len(deaths) == 1
assert state["alive"] is False
def test_no_death_when_already_dead(self) -> None:
prev = {"alive": False}
payload = _make_lol_payload(current_health=0.0, max_health=1000.0)
events, _ = LoLAdapter.parse_payload(payload, {"adapter_id": "lol"}, prev)
deaths = [e for e in events if e.event_type == "death"]
assert len(deaths) == 0
def test_respawn_detected(self) -> None:
prev = {"alive": False}
payload = _make_lol_payload(current_health=800.0, max_health=1000.0)
events, state = LoLAdapter.parse_payload(payload, {"adapter_id": "lol"}, prev)
respawns = [e for e in events if e.event_type == "objective_progress"]
assert len(respawns) == 1
assert state["alive"] is True
def test_no_respawn_when_already_alive(self) -> None:
prev = {"alive": True}
payload = _make_lol_payload(current_health=800.0, max_health=1000.0)
events, _ = LoLAdapter.parse_payload(payload, {"adapter_id": "lol"}, prev)
respawns = [e for e in events if e.event_type == "objective_progress"]
assert len(respawns) == 0
class TestLoLAuth:
def test_always_accepts(self) -> None:
assert LoLAdapter.validate_auth({}, {}, {}) is True
assert LoLAdapter.validate_auth({"X-Custom": "val"}, {}, {"auth_token": "x"}) is True
class TestLoLMetadata:
def test_adapter_type(self) -> None:
assert LoLAdapter.ADAPTER_TYPE == "lol"
def test_supported_events(self) -> None:
assert "health" in LoLAdapter.SUPPORTED_EVENTS
assert "mana" in LoLAdapter.SUPPORTED_EVENTS
assert "death" in LoLAdapter.SUPPORTED_EVENTS
def test_config_schema(self) -> None:
schema = LoLAdapter.get_config_schema()
assert "poll_interval_ms" in schema["properties"]
def test_setup_instructions(self) -> None:
instructions = LoLAdapter.get_setup_instructions()
assert "League of Legends" in instructions
class TestLoLPoller:
def test_start_stop(self) -> None:
"""Test that the poller starts and stops cleanly."""
called = threading.Event()
def callback(data: dict) -> None:
called.set()
poller = LoLPoller({"poll_interval_ms": 100}, callback)
assert not poller.is_running
poller.start()
assert poller.is_running
poller.stop()
assert not poller.is_running
def test_double_start_no_crash(self) -> None:
"""Starting twice should not create duplicate threads."""
poller = LoLPoller({"poll_interval_ms": 1000}, lambda d: None)
poller.start()
poller.start() # should warn but not crash
poller.stop()
def test_stop_without_start(self) -> None:
"""Stopping without starting should not crash."""
poller = LoLPoller({"poll_interval_ms": 1000}, lambda d: None)
poller.stop() # no-op
+469
View File
@@ -0,0 +1,469 @@
"""Tests for MappingAdapter — YAML parsing, payload translation, validation."""
import textwrap
from pathlib import Path
import pytest
from wled_controller.core.game_integration.mapping_adapter import (
MappingAdapter,
load_adapter_from_yaml,
validate_adapter_yaml,
)
# ── YAML validation tests ───────────────────────────────────────────────
class TestValidateAdapterYaml:
def test_valid_minimal(self) -> None:
data = {
"name": "test_adapter",
"game": "TestGame",
"protocol": "webhook",
"mappings": [
{"source_path": "player.health", "event": "health"},
],
}
errors = validate_adapter_yaml(data)
assert errors == []
def test_missing_name(self) -> None:
data = {
"game": "TestGame",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "health"}],
}
errors = validate_adapter_yaml(data)
assert any("name" in e for e in errors)
def test_missing_game(self) -> None:
data = {
"name": "test",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "health"}],
}
errors = validate_adapter_yaml(data)
assert any("game" in e for e in errors)
def test_invalid_protocol(self) -> None:
data = {
"name": "test",
"game": "TestGame",
"protocol": "invalid",
"mappings": [{"source_path": "x", "event": "health"}],
}
errors = validate_adapter_yaml(data)
assert any("protocol" in e for e in errors)
def test_empty_mappings(self) -> None:
data = {
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [],
}
errors = validate_adapter_yaml(data)
assert any("mappings" in e for e in errors)
def test_unknown_event_type(self) -> None:
data = {
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "nonexistent_event"}],
}
errors = validate_adapter_yaml(data)
assert any("unknown event type" in e for e in errors)
def test_invalid_trigger_mode(self) -> None:
data = {
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [
{"source_path": "x", "event": "health", "trigger": "bad_mode"},
],
}
errors = validate_adapter_yaml(data)
assert any("trigger mode" in e for e in errors)
def test_non_numeric_min_max(self) -> None:
data = {
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [
{"source_path": "x", "event": "health", "min": "not_a_number"},
],
}
errors = validate_adapter_yaml(data)
assert any("'min' must be numeric" in e for e in errors)
# ── MappingAdapter payload parsing tests ─────────────────────────────────
class TestMappingAdapterParsePayload:
def _make_adapter(self, mappings: list[dict]) -> MappingAdapter:
return MappingAdapter(
{
"name": "test_adapter",
"game": "TestGame",
"protocol": "webhook",
"mappings": mappings,
}
)
def test_continuous_value_normalization(self) -> None:
adapter = self._make_adapter(
[
{
"source_path": "player.health",
"event": "health",
"min": 0,
"max": 100,
"trigger": "on_value",
},
]
)
payload = {"player": {"health": 75}}
events, state = adapter.parse_payload(payload, {"adapter_id": "test"}, {})
assert len(events) == 1
assert events[0].event_type == "health"
assert events[0].value == pytest.approx(0.75)
assert events[0].adapter_id == "test"
def test_value_clamped_to_0_1(self) -> None:
adapter = self._make_adapter(
[
{
"source_path": "val",
"event": "health",
"min": 0,
"max": 100,
"trigger": "on_value",
},
]
)
events_over, _ = adapter.parse_payload({"val": 150}, {"adapter_id": "t"}, {})
assert events_over[0].value == 1.0
events_under, _ = adapter.parse_payload({"val": -50}, {"adapter_id": "t"}, {})
assert events_under[0].value == 0.0
def test_on_change_trigger(self) -> None:
adapter = self._make_adapter(
[
{
"source_path": "kills",
"event": "kill",
"min": 0,
"max": 50,
"trigger": "on_change",
},
]
)
# First call: no prev_state, should emit
events1, state1 = adapter.parse_payload({"kills": 5}, {"adapter_id": "t"}, {})
assert len(events1) == 1
# Same value: should NOT emit
events2, state2 = adapter.parse_payload({"kills": 5}, {"adapter_id": "t"}, state1)
assert len(events2) == 0
# Changed value: should emit
events3, _ = adapter.parse_payload({"kills": 6}, {"adapter_id": "t"}, state2)
assert len(events3) == 1
def test_on_increase_trigger(self) -> None:
adapter = self._make_adapter(
[
{
"source_path": "kills",
"event": "kill",
"min": 0,
"max": 50,
"trigger": "on_increase",
},
]
)
# First call: prev_state empty, should NOT emit (no baseline)
events1, state1 = adapter.parse_payload({"kills": 5}, {"adapter_id": "t"}, {})
assert len(events1) == 0
# Decrease: should NOT emit
events2, state2 = adapter.parse_payload({"kills": 3}, {"adapter_id": "t"}, state1)
assert len(events2) == 0
# Increase: should emit
events3, _ = adapter.parse_payload({"kills": 7}, {"adapter_id": "t"}, state2)
assert len(events3) == 1
def test_on_decrease_trigger(self) -> None:
adapter = self._make_adapter(
[
{
"source_path": "hp",
"event": "damage_taken",
"min": 0,
"max": 100,
"trigger": "on_decrease",
},
]
)
_, state1 = adapter.parse_payload({"hp": 100}, {"adapter_id": "t"}, {})
events, _ = adapter.parse_payload({"hp": 80}, {"adapter_id": "t"}, state1)
assert len(events) == 1
assert events[0].event_type == "damage_taken"
def test_missing_path_skipped(self) -> None:
adapter = self._make_adapter(
[
{"source_path": "player.health", "event": "health", "trigger": "on_value"},
]
)
events, _ = adapter.parse_payload({"player": {}}, {"adapter_id": "t"}, {})
assert len(events) == 0
def test_non_numeric_value_emits_trigger(self) -> None:
adapter = self._make_adapter(
[
{"source_path": "status", "event": "buffed", "trigger": "on_value"},
]
)
events, _ = adapter.parse_payload({"status": "active"}, {"adapter_id": "t"}, {})
assert len(events) == 1
assert events[0].value == 1.0
def test_nested_json_path(self) -> None:
adapter = self._make_adapter(
[
{
"source_path": "a.b.c.d",
"event": "health",
"min": 0,
"max": 10,
"trigger": "on_value",
},
]
)
payload = {"a": {"b": {"c": {"d": 5}}}}
events, _ = adapter.parse_payload(payload, {"adapter_id": "t"}, {})
assert len(events) == 1
assert events[0].value == pytest.approx(0.5)
def test_multiple_mappings(self) -> None:
adapter = self._make_adapter(
[
{
"source_path": "hp",
"event": "health",
"min": 0,
"max": 100,
"trigger": "on_value",
},
{"source_path": "mp", "event": "mana", "min": 0, "max": 200, "trigger": "on_value"},
]
)
events, _ = adapter.parse_payload(
{"hp": 50, "mp": 100},
{"adapter_id": "t"},
{},
)
assert len(events) == 2
types = {e.event_type for e in events}
assert types == {"health", "mana"}
# ── Auth validation tests ────────────────────────────────────────────────
class TestMappingAdapterAuth:
def test_no_auth_accepts_all(self) -> None:
adapter = MappingAdapter(
{
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "health"}],
}
)
assert adapter.validate_auth({}, {}, {}) is True
def test_header_auth_valid(self) -> None:
adapter = MappingAdapter(
{
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "health"}],
"auth": {"type": "header", "header": "X-Auth-Token"},
}
)
result = adapter.validate_auth(
{"X-Auth-Token": "secret123"},
{},
{"auth_token": "secret123"},
)
assert result is True
def test_header_auth_invalid(self) -> None:
adapter = MappingAdapter(
{
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "health"}],
"auth": {"type": "header", "header": "X-Auth-Token"},
}
)
result = adapter.validate_auth(
{"X-Auth-Token": "wrong"},
{},
{"auth_token": "secret123"},
)
assert result is False
# ── YAML file loading tests ──────────────────────────────────────────────
class TestLoadAdapterFromYaml:
def test_load_valid_yaml(self, tmp_path: Path) -> None:
yaml_content = textwrap.dedent(
"""\
name: cs2_gsi
game: "Counter-Strike 2"
protocol: webhook
mappings:
- source_path: player.state.health
event: health
min: 0
max: 100
- source_path: player.state.armor
event: armor
min: 0
max: 100
auth:
type: header
header: X-GSI-Auth
"""
)
yaml_file = tmp_path / "cs2.yaml"
yaml_file.write_text(yaml_content)
adapter = load_adapter_from_yaml(yaml_file)
assert adapter.name == "cs2_gsi"
assert adapter.game == "Counter-Strike 2"
assert adapter.protocol == "webhook"
assert "health" in adapter.supported_events
assert "armor" in adapter.supported_events
def test_load_nonexistent_file_raises(self) -> None:
with pytest.raises(FileNotFoundError):
load_adapter_from_yaml("/nonexistent/path.yaml")
def test_load_invalid_yaml_raises(self, tmp_path: Path) -> None:
yaml_file = tmp_path / "bad.yaml"
yaml_file.write_text("name: test\n") # Missing required fields
with pytest.raises(ValueError, match="Invalid adapter YAML"):
load_adapter_from_yaml(yaml_file)
def test_load_non_dict_yaml_raises(self, tmp_path: Path) -> None:
yaml_file = tmp_path / "list.yaml"
yaml_file.write_text("- item1\n- item2\n")
with pytest.raises(ValueError, match="must be a dict"):
load_adapter_from_yaml(yaml_file)
def test_loaded_adapter_parses_payload(self, tmp_path: Path) -> None:
yaml_content = textwrap.dedent(
"""\
name: test_game
game: TestGame
protocol: webhook
mappings:
- source_path: hp
event: health
min: 0
max: 100
trigger: on_value
"""
)
yaml_file = tmp_path / "test.yaml"
yaml_file.write_text(yaml_content)
adapter = load_adapter_from_yaml(yaml_file)
events, _ = adapter.parse_payload(
{"hp": 60},
{"adapter_id": "loaded_test"},
{},
)
assert len(events) == 1
assert events[0].value == pytest.approx(0.6)
# ── Properties and metadata tests ────────────────────────────────────────
class TestMappingAdapterMetadata:
def test_config_schema_with_auth(self) -> None:
adapter = MappingAdapter(
{
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "health"}],
"auth": {"type": "header", "header": "X-Token"},
}
)
schema = adapter.get_config_schema()
assert "auth_token" in schema["properties"]
def test_config_schema_without_auth(self) -> None:
adapter = MappingAdapter(
{
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "health"}],
}
)
schema = adapter.get_config_schema()
assert schema["properties"] == {}
def test_setup_instructions_from_yaml(self) -> None:
adapter = MappingAdapter(
{
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "health"}],
"setup_instructions": "# Step 1\nDo this.",
}
)
assert "Step 1" in adapter.get_setup_instructions()
def test_setup_instructions_default(self) -> None:
adapter = MappingAdapter(
{
"name": "test",
"game": "TestGame",
"protocol": "webhook",
"mappings": [{"source_path": "x", "event": "health"}],
}
)
instructions = adapter.get_setup_instructions()
assert "TestGame" in instructions