fix: HA light target — brightness source, transition=0, dashboard type label
Lint & Test / test (push) Successful in 1m13s

- Add brightness_value_source_id to HALightOutputTarget model, to_dict,
  from_dict, update_fields, register_with_manager, API response
- Wire value stream in HALightTargetProcessor: acquire/release on
  start/stop, multiply brightness in _update_lights loop
- Fix transition=0 not saving (parseFloat("0") || 0.5 was falsy)
- Fix dashboard showing "Key Colors" for HA targets — now "Home Assistant"
- Fix dashboard FPS showing 0/2 — HA targets show target/target
- Add CSS source subtitle to HA target dashboard cards
This commit is contained in:
2026-03-28 16:03:06 +03:00
parent 3e6760f726
commit 381ee75371
18 changed files with 341 additions and 567 deletions
@@ -1,4 +1,5 @@
"""The LED Screen Controller integration."""
from __future__ import annotations
import logging
@@ -18,14 +19,12 @@ from .const import (
CONF_SERVER_URL,
CONF_API_KEY,
DEFAULT_SCAN_INTERVAL,
TARGET_TYPE_KEY_COLORS,
TARGET_TYPE_HA_LIGHT,
DATA_COORDINATOR,
DATA_WS_MANAGER,
DATA_EVENT_LISTENER,
)
from .coordinator import WLEDScreenControllerCoordinator
from .event_listener import EventStreamListener
from .ws_manager import KeyColorsWebSocketManager
_LOGGER = logging.getLogger(__name__)
@@ -56,8 +55,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
await coordinator.async_config_entry_first_refresh()
ws_manager = KeyColorsWebSocketManager(hass, server_url, api_key)
event_listener = EventStreamListener(hass, server_url, api_key, coordinator)
await event_listener.start()
@@ -68,11 +65,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
for target_id, target_data in coordinator.data["targets"].items():
info = target_data["info"]
target_type = info.get("target_type", "led")
model = (
"Key Colors Target"
if target_type == TARGET_TYPE_KEY_COLORS
else "LED Target"
)
if target_type == TARGET_TYPE_HA_LIGHT:
model = "HA Light Target"
else:
model = "LED Target"
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, target_id)},
@@ -98,9 +94,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
current_identifiers.add(scenes_identifier)
# Remove devices for targets that no longer exist
for device_entry in dr.async_entries_for_config_entry(
device_registry, entry.entry_id
):
for device_entry in dr.async_entries_for_config_entry(device_registry, entry.entry_id):
if not device_entry.identifiers & current_identifiers:
_LOGGER.info("Removing stale device: %s", device_entry.name)
device_registry.async_remove_device(device_entry.id)
@@ -109,20 +103,17 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = {
DATA_COORDINATOR: coordinator,
DATA_WS_MANAGER: ws_manager,
DATA_EVENT_LISTENER: event_listener,
}
# Track target and scene IDs to detect changes
known_target_ids = set(
coordinator.data.get("targets", {}).keys() if coordinator.data else []
)
known_target_ids = set(coordinator.data.get("targets", {}).keys() if coordinator.data else [])
known_scene_ids = set(
p["id"] for p in (coordinator.data.get("scene_presets", []) if coordinator.data else [])
)
def _on_coordinator_update() -> None:
"""Manage WS connections and detect target list changes."""
"""Detect target/scene list changes and trigger reload."""
nonlocal known_target_ids, known_scene_ids
if not coordinator.data:
@@ -130,30 +121,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
targets = coordinator.data.get("targets", {})
# Start/stop WS connections for KC targets based on processing state
for target_id, target_data in targets.items():
info = target_data.get("info", {})
state = target_data.get("state") or {}
if info.get("target_type") == TARGET_TYPE_KEY_COLORS:
if state.get("processing"):
if target_id not in ws_manager._connections:
hass.async_create_task(ws_manager.start_listening(target_id))
else:
if target_id in ws_manager._connections:
hass.async_create_task(ws_manager.stop_listening(target_id))
# Reload if target or scene list changed
current_ids = set(targets.keys())
current_scene_ids = set(
p["id"] for p in coordinator.data.get("scene_presets", [])
)
current_scene_ids = set(p["id"] for p in coordinator.data.get("scene_presets", []))
if current_ids != known_target_ids or current_scene_ids != known_scene_ids:
known_target_ids = current_ids
known_scene_ids = current_scene_ids
_LOGGER.info("Target or scene list changed, reloading integration")
hass.async_create_task(
hass.config_entries.async_reload(entry.entry_id)
)
hass.async_create_task(hass.config_entries.async_reload(entry.entry_id))
coordinator.async_add_listener(_on_coordinator_update)
@@ -167,9 +142,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
coord = entry_data.get(DATA_COORDINATOR)
if not coord or not coord.data:
continue
source_ids = {
s["id"] for s in coord.data.get("css_sources", [])
}
source_ids = {s["id"] for s in coord.data.get("css_sources", [])}
if source_id in source_ids:
await coord.push_segments(source_id, segments)
return
@@ -180,10 +153,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
DOMAIN,
"set_leds",
handle_set_leds,
schema=vol.Schema({
vol.Required("source_id"): str,
vol.Required("segments"): list,
}),
schema=vol.Schema(
{
vol.Required("source_id"): str,
vol.Required("segments"): list,
}
),
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
@@ -194,7 +169,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
entry_data = hass.data[DOMAIN][entry.entry_id]
await entry_data[DATA_WS_MANAGER].shutdown()
await entry_data[DATA_EVENT_LISTENER].shutdown()
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
@@ -15,9 +15,8 @@ WS_MAX_RECONNECT_DELAY = 60 # seconds
# Target types
TARGET_TYPE_LED = "led"
TARGET_TYPE_KEY_COLORS = "key_colors"
TARGET_TYPE_HA_LIGHT = "ha_light"
# Data keys stored in hass.data[DOMAIN][entry_id]
DATA_COORDINATOR = "coordinator"
DATA_WS_MANAGER = "ws_manager"
DATA_EVENT_LISTENER = "event_listener"
@@ -1,4 +1,5 @@
"""Data update coordinator for LED Screen Controller."""
from __future__ import annotations
import asyncio
@@ -14,7 +15,6 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, Upda
from .const import (
DOMAIN,
DEFAULT_TIMEOUT,
TARGET_TYPE_KEY_COLORS,
)
_LOGGER = logging.getLogger(__name__)
@@ -74,27 +74,12 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
state = None
metrics = None
result: dict[str, Any] = {
return target_id, {
"info": target,
"state": state,
"metrics": metrics,
}
# Fetch rectangles for key_colors targets
if target.get("target_type") == TARGET_TYPE_KEY_COLORS:
kc_settings = target.get("key_colors_settings") or {}
template_id = kc_settings.get("pattern_template_id", "")
if template_id:
result["rectangles"] = await self._fetch_rectangles(
template_id
)
else:
result["rectangles"] = []
else:
result["rectangles"] = []
return target_id, result
results = await asyncio.gather(
*(fetch_target_data(t) for t in targets_list),
return_exceptions=True,
@@ -108,13 +93,11 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
targets_data[target_id] = data
# Fetch devices, CSS sources, value sources, and scene presets in parallel
devices_data, css_sources, value_sources, scene_presets = (
await asyncio.gather(
self._fetch_devices(),
self._fetch_css_sources(),
self._fetch_value_sources(),
self._fetch_scene_presets(),
)
devices_data, css_sources, value_sources, scene_presets = await asyncio.gather(
self._fetch_devices(),
self._fetch_css_sources(),
self._fetch_value_sources(),
self._fetch_scene_presets(),
)
return {
@@ -176,23 +159,6 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
resp.raise_for_status()
return await resp.json()
async def _fetch_rectangles(self, template_id: str) -> list[dict]:
"""Fetch rectangles for a pattern template (no cache — always fresh)."""
try:
async with self.session.get(
f"{self.server_url}/api/v1/pattern-templates/{template_id}",
headers=self._auth_headers,
timeout=self._timeout,
) as resp:
resp.raise_for_status()
data = await resp.json()
return data.get("rectangles", [])
except Exception as err:
_LOGGER.warning(
"Failed to fetch pattern template %s: %s", template_id, err
)
return []
async def _fetch_devices(self) -> dict[str, dict[str, Any]]:
"""Fetch all devices with capabilities and brightness."""
try:
@@ -225,7 +191,8 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
except Exception as err:
_LOGGER.warning(
"Failed to fetch brightness for device %s: %s",
device_id, err,
device_id,
err,
)
return device_id, entry
@@ -256,7 +223,9 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
body = await resp.text()
_LOGGER.error(
"Failed to set brightness for device %s: %s %s",
device_id, resp.status, body,
device_id,
resp.status,
body,
)
resp.raise_for_status()
await self.async_request_refresh()
@@ -273,25 +242,9 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
body = await resp.text()
_LOGGER.error(
"Failed to set color for device %s: %s %s",
device_id, resp.status, body,
)
resp.raise_for_status()
await self.async_request_refresh()
async def set_kc_brightness(self, target_id: str, brightness: int) -> None:
"""Set brightness for a Key Colors target (0-255 mapped to 0.0-1.0)."""
brightness_float = round(brightness / 255, 4)
async with self.session.put(
f"{self.server_url}/api/v1/output-targets/{target_id}",
headers={**self._auth_headers, "Content-Type": "application/json"},
json={"key_colors_settings": {"brightness": brightness_float}},
timeout=self._timeout,
) as resp:
if resp.status != 200:
body = await resp.text()
_LOGGER.error(
"Failed to set KC brightness for target %s: %s %s",
target_id, resp.status, body,
device_id,
resp.status,
body,
)
resp.raise_for_status()
await self.async_request_refresh()
@@ -353,7 +306,9 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
body = await resp.text()
_LOGGER.error(
"Failed to push colors to source %s: %s %s",
source_id, resp.status, body,
source_id,
resp.status,
body,
)
resp.raise_for_status()
@@ -369,7 +324,9 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
body = await resp.text()
_LOGGER.error(
"Failed to push segments to source %s: %s %s",
source_id, resp.status, body,
source_id,
resp.status,
body,
)
resp.raise_for_status()
@@ -384,7 +341,9 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
body = await resp.text()
_LOGGER.error(
"Failed to activate scene %s: %s %s",
preset_id, resp.status, body,
preset_id,
resp.status,
body,
)
resp.raise_for_status()
await self.async_request_refresh()
@@ -401,7 +360,9 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
body = await resp.text()
_LOGGER.error(
"Failed to update source %s: %s %s",
source_id, resp.status, body,
source_id,
resp.status,
body,
)
resp.raise_for_status()
@@ -417,7 +378,9 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
body = await resp.text()
_LOGGER.error(
"Failed to update target %s: %s %s",
target_id, resp.status, body,
target_id,
resp.status,
body,
)
resp.raise_for_status()
await self.async_request_refresh()
@@ -435,7 +398,9 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
body = await resp.text()
_LOGGER.error(
"Failed to start target %s: %s %s",
target_id, resp.status, body,
target_id,
resp.status,
body,
)
resp.raise_for_status()
await self.async_request_refresh()
@@ -453,7 +418,9 @@ class WLEDScreenControllerCoordinator(DataUpdateCoordinator):
body = await resp.text()
_LOGGER.error(
"Failed to stop target %s: %s %s",
target_id, resp.status, body,
target_id,
resp.status,
body,
)
resp.raise_for_status()
await self.async_request_refresh()
@@ -1,4 +1,5 @@
"""Number platform for LED Screen Controller (device & KC target brightness)."""
"""Number platform for LED Screen Controller (device brightness & HA light settings)."""
from __future__ import annotations
import logging
@@ -10,7 +11,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, DATA_COORDINATOR, TARGET_TYPE_KEY_COLORS
from .const import DOMAIN, DATA_COORDINATOR, TARGET_TYPE_HA_LIGHT
from .coordinator import WLEDScreenControllerCoordinator
_LOGGER = logging.getLogger(__name__)
@@ -21,24 +22,24 @@ async def async_setup_entry(
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up LED Screen Controller brightness numbers."""
"""Set up LED Screen Controller number entities."""
data = hass.data[DOMAIN][entry.entry_id]
coordinator: WLEDScreenControllerCoordinator = data[DATA_COORDINATOR]
entities = []
entities: list[NumberEntity] = []
if coordinator.data and "targets" in coordinator.data:
devices = coordinator.data.get("devices") or {}
for target_id, target_data in coordinator.data["targets"].items():
info = target_data["info"]
target_type = info.get("target_type", "led")
if info.get("target_type") == TARGET_TYPE_KEY_COLORS:
# KC target — brightness lives in key_colors_settings
entities.append(
WLEDScreenControllerKCBrightness(
coordinator, target_id, entry.entry_id,
)
)
if target_type == TARGET_TYPE_HA_LIGHT:
# HA Light target — expose tunable settings
entities.append(HALightUpdateRate(coordinator, target_id, entry.entry_id))
entities.append(HALightTransition(coordinator, target_id, entry.entry_id))
entities.append(HALightMinBrightness(coordinator, target_id, entry.entry_id))
entities.append(HALightColorTolerance(coordinator, target_id, entry.entry_id))
continue
# LED target — brightness lives on the device
@@ -56,7 +57,10 @@ async def async_setup_entry(
entities.append(
WLEDScreenControllerBrightness(
coordinator, target_id, device_id, entry.entry_id,
coordinator,
target_id,
device_id,
entry.entry_id,
)
)
@@ -117,53 +121,113 @@ class WLEDScreenControllerBrightness(CoordinatorEntity, NumberEntity):
await self.coordinator.set_brightness(self._device_id, int(value))
class WLEDScreenControllerKCBrightness(CoordinatorEntity, NumberEntity):
"""Brightness control for a Key Colors target."""
# --- HA Light target number entities ---
class _HALightNumberBase(CoordinatorEntity, NumberEntity):
"""Base class for HA Light target number entities."""
_attr_has_entity_name = True
_attr_native_min_value = 0
_attr_native_max_value = 255
_attr_native_step = 1
_attr_mode = NumberMode.SLIDER
_attr_icon = "mdi:brightness-6"
def __init__(
self,
coordinator: WLEDScreenControllerCoordinator,
target_id: str,
entry_id: str,
*,
field_name: str,
) -> None:
"""Initialize the KC brightness number."""
super().__init__(coordinator)
self._target_id = target_id
self._entry_id = entry_id
self._attr_unique_id = f"{target_id}_brightness"
self._attr_translation_key = "brightness"
self._field_name = field_name
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {"identifiers": {(DOMAIN, self._target_id)}}
@property
def native_value(self) -> float | None:
"""Return the current brightness value (0-255)."""
if not self.coordinator.data:
return None
target_data = self.coordinator.data.get("targets", {}).get(self._target_id)
target_data = self._get_target_data()
if not target_data:
return None
kc_settings = target_data.get("info", {}).get("key_colors_settings") or {}
brightness_float = kc_settings.get("brightness", 1.0)
return round(brightness_float * 255)
return target_data.get("info", {}).get(self._field_name)
@property
def available(self) -> bool:
"""Return if entity is available."""
if not self.coordinator.data:
return False
return self._target_id in self.coordinator.data.get("targets", {})
return self._get_target_data() is not None
async def async_set_native_value(self, value: float) -> None:
"""Set brightness value."""
await self.coordinator.set_kc_brightness(self._target_id, int(value))
await self.coordinator.update_target(self._target_id, **{self._field_name: round(value, 2)})
def _get_target_data(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return None
return self.coordinator.data.get("targets", {}).get(self._target_id)
class HALightUpdateRate(_HALightNumberBase):
"""Update rate (Hz) for an HA Light target."""
_attr_native_min_value = 0.5
_attr_native_max_value = 5.0
_attr_native_step = 0.5
_attr_native_unit_of_measurement = "Hz"
_attr_icon = "mdi:update"
def __init__(
self, coordinator: WLEDScreenControllerCoordinator, target_id: str, entry_id: str
) -> None:
super().__init__(coordinator, target_id, entry_id, field_name="update_rate")
self._attr_unique_id = f"{target_id}_update_rate"
self._attr_translation_key = "ha_light_update_rate"
class HALightTransition(_HALightNumberBase):
"""Transition time (seconds) for an HA Light target."""
_attr_native_min_value = 0.0
_attr_native_max_value = 10.0
_attr_native_step = 0.1
_attr_native_unit_of_measurement = "s"
_attr_icon = "mdi:transition-masked"
def __init__(
self, coordinator: WLEDScreenControllerCoordinator, target_id: str, entry_id: str
) -> None:
super().__init__(coordinator, target_id, entry_id, field_name="transition")
self._attr_unique_id = f"{target_id}_transition"
self._attr_translation_key = "ha_light_transition"
class HALightMinBrightness(_HALightNumberBase):
"""Minimum brightness threshold for an HA Light target."""
_attr_native_min_value = 0
_attr_native_max_value = 255
_attr_native_step = 1
_attr_icon = "mdi:brightness-4"
def __init__(
self, coordinator: WLEDScreenControllerCoordinator, target_id: str, entry_id: str
) -> None:
super().__init__(coordinator, target_id, entry_id, field_name="min_brightness_threshold")
self._attr_unique_id = f"{target_id}_min_brightness"
self._attr_translation_key = "ha_light_min_brightness"
class HALightColorTolerance(_HALightNumberBase):
"""Color tolerance (RGB delta skip threshold) for an HA Light target."""
_attr_native_min_value = 0
_attr_native_max_value = 50
_attr_native_step = 1
_attr_icon = "mdi:palette-outline"
def __init__(
self, coordinator: WLEDScreenControllerCoordinator, target_id: str, entry_id: str
) -> None:
super().__init__(coordinator, target_id, entry_id, field_name="color_tolerance")
self._attr_unique_id = f"{target_id}_color_tolerance"
self._attr_translation_key = "ha_light_color_tolerance"
@@ -1,4 +1,5 @@
"""Select platform for LED Screen Controller (CSS source & brightness source)."""
from __future__ import annotations
import logging
@@ -10,12 +11,12 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, DATA_COORDINATOR, TARGET_TYPE_KEY_COLORS
from .const import DOMAIN, DATA_COORDINATOR, TARGET_TYPE_HA_LIGHT
from .coordinator import WLEDScreenControllerCoordinator
_LOGGER = logging.getLogger(__name__)
NONE_OPTION = "— None —"
NONE_OPTION = "\u2014 None \u2014"
async def async_setup_entry(
@@ -31,23 +32,20 @@ async def async_setup_entry(
if coordinator.data and "targets" in coordinator.data:
for target_id, target_data in coordinator.data["targets"].items():
info = target_data["info"]
target_type = info.get("target_type", "led")
# Only LED targets
if info.get("target_type") == TARGET_TYPE_KEY_COLORS:
continue
# Both LED and HA Light targets have a CSS source
entities.append(CSSSourceSelect(coordinator, target_id, entry.entry_id))
entities.append(
CSSSourceSelect(coordinator, target_id, entry.entry_id)
)
entities.append(
BrightnessSourceSelect(coordinator, target_id, entry.entry_id)
)
# Only LED targets have a brightness value source
if target_type != TARGET_TYPE_HA_LIGHT:
entities.append(BrightnessSourceSelect(coordinator, target_id, entry.entry_id))
async_add_entities(entities)
class CSSSourceSelect(CoordinatorEntity, SelectEntity):
"""Select entity for choosing a color strip source for an LED target."""
"""Select entity for choosing a color strip source for a target."""
_attr_has_entity_name = True
_attr_icon = "mdi:palette"
@@ -100,9 +98,7 @@ class CSSSourceSelect(CoordinatorEntity, SelectEntity):
if source_id is None:
_LOGGER.error("CSS source not found: %s", option)
return
await self.coordinator.update_target(
self._target_id, color_strip_source_id=source_id
)
await self.coordinator.update_target(self._target_id, color_strip_source_id=source_id)
def _name_to_id_map(self) -> dict[str, str]:
sources = (self.coordinator.data or {}).get("css_sources") or []
@@ -165,13 +161,10 @@ class BrightnessSourceSelect(CoordinatorEntity, SelectEntity):
source_id = ""
else:
name_map = {
s["name"]: s["id"]
for s in (self.coordinator.data or {}).get("value_sources") or []
s["name"]: s["id"] for s in (self.coordinator.data or {}).get("value_sources") or []
}
source_id = name_map.get(option)
if source_id is None:
_LOGGER.error("Value source not found: %s", option)
return
await self.coordinator.update_target(
self._target_id, brightness_value_source_id=source_id
)
await self.coordinator.update_target(self._target_id, brightness_value_source_id=source_id)
@@ -1,8 +1,8 @@
"""Sensor platform for LED Screen Controller."""
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import Any
from homeassistant.components.sensor import (
@@ -17,12 +17,10 @@ from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
DOMAIN,
TARGET_TYPE_KEY_COLORS,
TARGET_TYPE_HA_LIGHT,
DATA_COORDINATOR,
DATA_WS_MANAGER,
)
from .coordinator import WLEDScreenControllerCoordinator
from .ws_manager import KeyColorsWebSocketManager
_LOGGER = logging.getLogger(__name__)
@@ -35,34 +33,19 @@ async def async_setup_entry(
"""Set up LED Screen Controller sensors."""
data = hass.data[DOMAIN][entry.entry_id]
coordinator: WLEDScreenControllerCoordinator = data[DATA_COORDINATOR]
ws_manager: KeyColorsWebSocketManager = data[DATA_WS_MANAGER]
entities: list[SensorEntity] = []
if coordinator.data and "targets" in coordinator.data:
for target_id, target_data in coordinator.data["targets"].items():
entities.append(WLEDScreenControllerFPSSensor(coordinator, target_id, entry.entry_id))
entities.append(
WLEDScreenControllerFPSSensor(coordinator, target_id, entry.entry_id)
)
entities.append(
WLEDScreenControllerStatusSensor(
coordinator, target_id, entry.entry_id
)
WLEDScreenControllerStatusSensor(coordinator, target_id, entry.entry_id)
)
# Add color sensors for Key Colors targets
# Add mapped lights sensor for HA Light targets
info = target_data["info"]
if info.get("target_type") == TARGET_TYPE_KEY_COLORS:
rectangles = target_data.get("rectangles", [])
for rect in rectangles:
entities.append(
WLEDScreenControllerColorSensor(
coordinator=coordinator,
ws_manager=ws_manager,
target_id=target_id,
rectangle_name=rect["name"],
entry_id=entry.entry_id,
)
)
if info.get("target_type") == TARGET_TYPE_HA_LIGHT:
entities.append(HALightMappedLightsSensor(coordinator, target_id, entry.entry_id))
async_add_entities(entities)
@@ -177,79 +160,58 @@ class WLEDScreenControllerStatusSensor(CoordinatorEntity, SensorEntity):
return self.coordinator.data.get("targets", {}).get(self._target_id)
class WLEDScreenControllerColorSensor(CoordinatorEntity, SensorEntity):
"""Color sensor reporting the extracted screen color for a Key Colors rectangle."""
class HALightMappedLightsSensor(CoordinatorEntity, SensorEntity):
"""Sensor showing the number of mapped HA lights for an HA Light target."""
_attr_has_entity_name = True
_attr_icon = "mdi:palette"
_attr_icon = "mdi:lightbulb-group"
def __init__(
self,
coordinator: WLEDScreenControllerCoordinator,
ws_manager: KeyColorsWebSocketManager,
target_id: str,
rectangle_name: str,
entry_id: str,
) -> None:
"""Initialize the color sensor."""
"""Initialize the sensor."""
super().__init__(coordinator)
self._target_id = target_id
self._rectangle_name = rectangle_name
self._ws_manager = ws_manager
self._entry_id = entry_id
self._unregister_ws: Callable[[], None] | None = None
sanitized = rectangle_name.lower().replace(" ", "_").replace("-", "_")
self._attr_unique_id = f"{target_id}_{sanitized}_color"
self._attr_translation_key = "rectangle_color"
self._attr_translation_placeholders = {"rectangle_name": rectangle_name}
self._attr_unique_id = f"{target_id}_mapped_lights"
self._attr_translation_key = "mapped_lights"
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {"identifiers": {(DOMAIN, self._target_id)}}
async def async_added_to_hass(self) -> None:
"""Register WS callback when entity is added."""
await super().async_added_to_hass()
self._unregister_ws = self._ws_manager.register_callback(
self._target_id, self._handle_color_update
)
async def async_will_remove_from_hass(self) -> None:
"""Unregister WS callback when entity is removed."""
if self._unregister_ws:
self._unregister_ws()
self._unregister_ws = None
await super().async_will_remove_from_hass()
def _handle_color_update(self, colors: dict) -> None:
"""Handle incoming color update from WebSocket."""
if self._rectangle_name in colors:
self.async_write_ha_state()
@property
def native_value(self) -> str | None:
"""Return the hex color string (e.g. #FF8800)."""
color = self._get_color()
if color is None:
def native_value(self) -> int | None:
"""Return the number of mapped lights."""
target_data = self._get_target_data()
if not target_data:
return None
return f"#{color['r']:02X}{color['g']:02X}{color['b']:02X}"
mappings = target_data.get("info", {}).get("light_mappings", [])
return len(mappings)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return r, g, b, brightness as attributes."""
color = self._get_color()
if color is None:
"""Return light mapping details as attributes."""
target_data = self._get_target_data()
if not target_data:
return {}
r, g, b = color["r"], color["g"], color["b"]
brightness = int(0.299 * r + 0.587 * g + 0.114 * b)
mappings = target_data.get("info", {}).get("light_mappings", [])
entity_ids = [m.get("entity_id", "") for m in mappings]
return {
"r": r,
"g": g,
"b": b,
"brightness": brightness,
"rgb_color": [r, g, b],
"entity_ids": entity_ids,
"mappings": [
{
"entity_id": m.get("entity_id", ""),
"led_start": m.get("led_start", 0),
"led_end": m.get("led_end", -1),
"brightness_scale": m.get("brightness_scale", 1.0),
}
for m in mappings
],
}
@property
@@ -257,16 +219,6 @@ class WLEDScreenControllerColorSensor(CoordinatorEntity, SensorEntity):
"""Return if entity is available."""
return self._get_target_data() is not None
def _get_color(self) -> dict[str, int] | None:
"""Get the current color for this rectangle from WS manager."""
target_data = self._get_target_data()
if not target_data or not target_data.get("state"):
return None
if not target_data["state"].get("processing"):
return None
colors = self._ws_manager.get_latest_colors(self._target_id)
return colors.get(self._rectangle_name)
def _get_target_data(self) -> dict[str, Any] | None:
if not self.coordinator.data:
return None
@@ -54,13 +54,25 @@
"unavailable": "Unavailable"
}
},
"rectangle_color": {
"name": "{rectangle_name} Color"
"mapped_lights": {
"name": "Mapped Lights"
}
},
"number": {
"brightness": {
"name": "Brightness"
},
"ha_light_update_rate": {
"name": "Update Rate"
},
"ha_light_transition": {
"name": "Transition"
},
"ha_light_min_brightness": {
"name": "Min Brightness"
},
"ha_light_color_tolerance": {
"name": "Color Tolerance"
}
},
"select": {
@@ -54,13 +54,25 @@
"unavailable": "Unavailable"
}
},
"rectangle_color": {
"name": "{rectangle_name} Color"
"mapped_lights": {
"name": "Mapped Lights"
}
},
"number": {
"brightness": {
"name": "Brightness"
},
"ha_light_update_rate": {
"name": "Update Rate"
},
"ha_light_transition": {
"name": "Transition"
},
"ha_light_min_brightness": {
"name": "Min Brightness"
},
"ha_light_color_tolerance": {
"name": "Color Tolerance"
}
},
"select": {
@@ -54,13 +54,25 @@
"unavailable": "Недоступен"
}
},
"rectangle_color": {
"name": "{rectangle_name} Цвет"
"mapped_lights": {
"name": "Привязанные светильники"
}
},
"number": {
"brightness": {
"name": "Яркость"
},
"ha_light_update_rate": {
"name": "Частота обновления"
},
"ha_light_transition": {
"name": "Переход"
},
"ha_light_min_brightness": {
"name": "Мин. яркость"
},
"ha_light_color_tolerance": {
"name": "Допуск цвета"
}
},
"select": {
@@ -1,136 +0,0 @@
"""WebSocket connection manager for Key Colors target color streams."""
from __future__ import annotations
import asyncio
import contextlib
import json
import logging
from collections.abc import Callable
from typing import Any
import aiohttp
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import WS_RECONNECT_DELAY, WS_MAX_RECONNECT_DELAY
_LOGGER = logging.getLogger(__name__)
class KeyColorsWebSocketManager:
"""Manages WebSocket connections for Key Colors target color streams."""
def __init__(
self,
hass: HomeAssistant,
server_url: str,
api_key: str,
) -> None:
self._hass = hass
self._server_url = server_url
self._api_key = api_key
self._connections: dict[str, asyncio.Task] = {}
self._callbacks: dict[str, list[Callable]] = {}
self._latest_colors: dict[str, dict[str, dict[str, int]]] = {}
self._shutting_down = False
def _get_ws_url(self, target_id: str) -> str:
"""Build WebSocket URL for a target."""
ws_base = self._server_url.replace("http://", "ws://").replace(
"https://", "wss://"
)
return f"{ws_base}/api/v1/output-targets/{target_id}/ws?token={self._api_key}"
async def start_listening(self, target_id: str) -> None:
"""Start WebSocket connection for a target."""
if target_id in self._connections:
return
task = self._hass.async_create_background_task(
self._ws_loop(target_id),
f"wled_screen_controller_ws_{target_id}",
)
self._connections[target_id] = task
async def stop_listening(self, target_id: str) -> None:
"""Stop WebSocket connection for a target."""
task = self._connections.pop(target_id, None)
if task:
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
self._latest_colors.pop(target_id, None)
def register_callback(
self, target_id: str, callback: Callable
) -> Callable[[], None]:
"""Register a callback for color updates. Returns unregister function."""
self._callbacks.setdefault(target_id, []).append(callback)
def unregister() -> None:
cbs = self._callbacks.get(target_id)
if cbs and callback in cbs:
cbs.remove(callback)
return unregister
def get_latest_colors(self, target_id: str) -> dict[str, dict[str, int]]:
"""Get latest colors for a target."""
return self._latest_colors.get(target_id, {})
async def _ws_loop(self, target_id: str) -> None:
"""WebSocket connection loop with reconnection."""
delay = WS_RECONNECT_DELAY
session = async_get_clientsession(self._hass)
while not self._shutting_down:
try:
url = self._get_ws_url(target_id)
async with session.ws_connect(url) as ws:
delay = WS_RECONNECT_DELAY # reset on successful connect
_LOGGER.debug("WS connected for target %s", target_id)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
self._handle_message(target_id, msg.data)
elif msg.type in (
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.ERROR,
):
break
except asyncio.CancelledError:
raise
except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as err:
_LOGGER.debug("WS connection error for %s: %s", target_id, err)
except Exception as err:
_LOGGER.error("Unexpected WS error for %s: %s", target_id, err)
if self._shutting_down:
break
await asyncio.sleep(delay)
delay = min(delay * 2, WS_MAX_RECONNECT_DELAY)
def _handle_message(self, target_id: str, raw: str) -> None:
"""Handle incoming WebSocket message."""
try:
data = json.loads(raw)
except json.JSONDecodeError:
return
if data.get("type") != "colors_update":
return
colors: dict[str, Any] = data.get("colors", {})
self._latest_colors[target_id] = colors
for cb in self._callbacks.get(target_id, []):
try:
cb(colors)
except Exception:
_LOGGER.exception("Error in WS color callback for %s", target_id)
async def shutdown(self) -> None:
"""Stop all WebSocket connections."""
self._shutting_down = True
for target_id in list(self._connections):
await self.stop_listening(target_id)
+1
View File
@@ -47,6 +47,7 @@ dependencies = [
"openrgb-python>=0.2.15",
"opencv-python-headless>=4.8.0",
"websockets>=13.0",
"just-playback>=0.1.7",
]
[project.optional-dependencies]
@@ -62,6 +62,7 @@ def _target_to_response(target) -> OutputTargetResponse:
target_type=target.target_type,
ha_source_id=target.ha_source_id,
color_strip_source_id=target.color_strip_source_id,
brightness_value_source_id=target.brightness_value_source_id or "",
ha_light_mappings=[
HALightMappingSchema(
entity_id=m.entity_id,
@@ -26,6 +26,7 @@ class HALightTargetProcessor(TargetProcessor):
target_id: str,
ha_source_id: str,
color_strip_source_id: str = "",
brightness_value_source_id: str = "",
light_mappings: Optional[List[HALightMapping]] = None,
update_rate: float = 2.0,
transition: float = 0.5,
@@ -36,6 +37,7 @@ class HALightTargetProcessor(TargetProcessor):
super().__init__(target_id, ctx)
self._ha_source_id = ha_source_id
self._css_id = color_strip_source_id
self._brightness_vs_id = brightness_value_source_id
self._light_mappings = light_mappings or []
self._update_rate = max(0.5, min(5.0, update_rate))
self._transition = transition
@@ -45,6 +47,7 @@ class HALightTargetProcessor(TargetProcessor):
# Runtime state
self._css_stream = None
self._ha_runtime = None
self._value_stream = None # brightness value source stream
self._previous_colors: Dict[str, Tuple[int, int, int]] = {}
self._previous_on: Dict[str, bool] = {} # track on/off state per entity
self._start_time: Optional[float] = None
@@ -76,6 +79,14 @@ class HALightTargetProcessor(TargetProcessor):
except Exception as e:
logger.warning(f"HA light {self._target_id}: failed to acquire HA runtime: {e}")
# Acquire brightness value stream (if configured)
if self._brightness_vs_id and self._ctx.value_stream_manager:
try:
self._value_stream = self._ctx.value_stream_manager.acquire(self._brightness_vs_id)
except Exception as e:
logger.warning(f"HA light {self._target_id}: failed to acquire brightness VS: {e}")
self._value_stream = None
self._is_running = True
self._start_time = time.monotonic()
self._task = asyncio.create_task(self._processing_loop())
@@ -99,6 +110,14 @@ class HALightTargetProcessor(TargetProcessor):
pass
self._css_stream = None
# Release brightness value stream
if self._value_stream is not None and self._ctx.value_stream_manager:
try:
self._ctx.value_stream_manager.release(self._brightness_vs_id)
except Exception:
pass
self._value_stream = None
# Release HA runtime
if self._ha_runtime:
try:
@@ -201,6 +220,14 @@ class HALightTargetProcessor(TargetProcessor):
"""Average LED segments and call HA services for changed lights."""
led_count = len(colors)
# Get brightness multiplier from value source (1.0 if not configured)
vs_multiplier = 1.0
if self._value_stream is not None:
try:
vs_multiplier = self._value_stream.get_value()
except Exception:
vs_multiplier = 1.0
for mapping in self._light_mappings:
if not mapping.entity_id:
continue
@@ -220,9 +247,10 @@ class HALightTargetProcessor(TargetProcessor):
# Calculate brightness (0-255) from max channel
brightness = max(r, g, b)
# Apply brightness scale
if mapping.brightness_scale < 1.0:
brightness = int(brightness * mapping.brightness_scale)
# Apply brightness scale and value source multiplier
eff_scale = mapping.brightness_scale * vs_multiplier
if eff_scale < 1.0:
brightness = int(brightness * eff_scale)
# Check brightness threshold
should_be_on = (
@@ -456,6 +456,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
target_id: str,
ha_source_id: str,
color_strip_source_id: str = "",
brightness_value_source_id: str = "",
light_mappings=None,
update_rate: float = 2.0,
transition: float = 0.5,
@@ -472,6 +473,7 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
target_id=target_id,
ha_source_id=ha_source_id,
color_strip_source_id=color_strip_source_id,
brightness_value_source_id=brightness_value_source_id,
light_mappings=light_mappings or [],
update_rate=update_rate,
transition=transition,
@@ -144,9 +144,10 @@ function _updateRunningMetrics(enrichedRunning: any[]): void {
for (const target of enrichedRunning) {
const state = target.state || {};
const metrics = target.metrics || {};
const fpsCurrent = state.fps_current ?? 0;
const fpsActual = state.fps_actual != null ? state.fps_actual.toFixed(1) : '-';
const fpsTarget = state.fps_target || (target.settings || target.key_colors_settings || {}).fps || '-';
const isHA = target.target_type === 'ha_light';
const fpsTarget = state.fps_target || (target.settings || {}).fps || target.update_rate || '-';
const fpsCurrent = isHA ? fpsTarget : (state.fps_current ?? 0);
const fpsActual = isHA ? String(fpsTarget) : (state.fps_actual != null ? state.fps_actual.toFixed(1) : '-');
const errors = metrics.errors_count || 0;
// Push FPS and update chart
@@ -545,18 +546,21 @@ function renderDashboardTarget(target: any, isRunning: boolean, devicesMap: Reco
const state = target.state || {};
const metrics = target.metrics || {};
const isLed = target.target_type === 'led' || target.target_type === 'wled';
const isHALight = target.target_type === 'ha_light';
const icon = ICON_TARGET;
const typeLabel = isLed ? t('dashboard.type.led') : t('dashboard.type.kc');
const navSubTab = isLed ? 'led-targets' : 'kc-targets';
const navSection = isLed ? 'led-targets' : 'kc-targets';
const navAttr = isLed ? 'data-target-id' : 'data-kc-target-id';
const typeLabel = isLed ? t('dashboard.type.led') : isHALight ? t('ha_light.section.title') : t('dashboard.type.kc');
const navSubTab = isHALight ? 'ha-light-targets' : 'led-targets';
const navSection = isHALight ? 'ha-light-targets' : 'led-targets';
const navAttr = isHALight ? 'data-ha-target-id' : 'data-target-id';
const navOnclick = `if(!event.target.closest('button')){navigateToCard('targets','${navSubTab}','${navSection}','${navAttr}','${target.id}')}`;
let subtitleParts = [typeLabel];
if (isLed) {
const device = target.device_id ? devicesMap[target.device_id] : null;
if (device) {
subtitleParts.push((device.device_type || '').toUpperCase());
if (isLed || isHALight) {
if (isLed) {
const device = target.device_id ? devicesMap[target.device_id] : null;
if (device) {
subtitleParts.push((device.device_type || '').toUpperCase());
}
}
const cssId = target.color_strip_source_id || '';
if (cssId) {
@@ -568,9 +572,9 @@ function renderDashboardTarget(target: any, isRunning: boolean, devicesMap: Reco
}
if (isRunning) {
const fpsCurrent = state.fps_current ?? 0;
const fpsActual = state.fps_actual != null ? state.fps_actual.toFixed(1) : '-';
const fpsTarget = state.fps_target || (target.settings || target.key_colors_settings || {}).fps || '-';
const fpsTarget = state.fps_target || (target.settings || {}).fps || target.update_rate || '-';
const fpsCurrent = isHALight ? fpsTarget : (state.fps_current ?? 0);
const fpsActual = isHALight ? String(fpsTarget) : (state.fps_actual != null ? state.fps_actual.toFixed(1) : '-');
const uptime = formatUptime(metrics.uptime_seconds);
const errors = metrics.errors_count || 0;
@@ -347,7 +347,8 @@ export async function saveHALightEditor(): Promise<void> {
const haSourceId = (document.getElementById('ha-light-editor-ha-source') as HTMLSelectElement).value;
const cssSourceId = (document.getElementById('ha-light-editor-css-source') as HTMLSelectElement).value;
const updateRate = parseFloat((document.getElementById('ha-light-editor-update-rate') as HTMLInputElement).value) || 2.0;
const transition = parseFloat((document.getElementById('ha-light-editor-transition') as HTMLInputElement).value) || 0.5;
const transitionRaw = parseFloat((document.getElementById('ha-light-editor-transition') as HTMLInputElement).value);
const transition = isNaN(transitionRaw) ? 0.5 : transitionRaw;
const description = (document.getElementById('ha-light-editor-description') as HTMLInputElement).value.trim() || null;
if (!name) {
@@ -45,6 +45,7 @@ class HALightOutputTarget(OutputTarget):
ha_source_id: str = "" # references HomeAssistantSource
color_strip_source_id: str = "" # CSS providing the colors
brightness_value_source_id: str = "" # dynamic brightness multiplier
light_mappings: List[HALightMapping] = field(default_factory=list)
update_rate: float = 2.0 # Hz (calls per second, 0.5-5.0)
transition: float = 0.5 # HA transition seconds (smooth fade between colors)
@@ -58,6 +59,7 @@ class HALightOutputTarget(OutputTarget):
target_id=self.id,
ha_source_id=self.ha_source_id,
color_strip_source_id=self.color_strip_source_id,
brightness_value_source_id=self.brightness_value_source_id,
light_mappings=self.light_mappings,
update_rate=self.update_rate,
transition=self.transition,
@@ -96,6 +98,7 @@ class HALightOutputTarget(OutputTarget):
name=None,
ha_source_id=None,
color_strip_source_id=None,
brightness_value_source_id=None,
light_mappings=None,
update_rate=None,
transition=None,
@@ -113,6 +116,10 @@ class HALightOutputTarget(OutputTarget):
self.color_strip_source_id = _resolve_ref(
color_strip_source_id, self.color_strip_source_id
)
if brightness_value_source_id is not None:
self.brightness_value_source_id = _resolve_ref(
brightness_value_source_id, self.brightness_value_source_id
)
if light_mappings is not None:
self.light_mappings = light_mappings
if update_rate is not None:
@@ -128,6 +135,7 @@ class HALightOutputTarget(OutputTarget):
d = super().to_dict()
d["ha_source_id"] = self.ha_source_id
d["color_strip_source_id"] = self.color_strip_source_id
d["brightness_value_source_id"] = self.brightness_value_source_id
d["light_mappings"] = [m.to_dict() for m in self.light_mappings]
d["update_rate"] = self.update_rate
d["transition"] = self.transition
@@ -144,6 +152,7 @@ class HALightOutputTarget(OutputTarget):
target_type="ha_light",
ha_source_id=data.get("ha_source_id", ""),
color_strip_source_id=data.get("color_strip_source_id", ""),
brightness_value_source_id=data.get("brightness_value_source_id", ""),
light_mappings=mappings,
update_rate=data.get("update_rate", 2.0),
transition=data.get("transition", 0.5),
+34 -155
View File
@@ -1,14 +1,11 @@
"""Cross-platform asynchronous sound playback for notification alerts.
Windows: uses winsound.PlaySound (stdlib, no dependencies).
Linux: uses paplay (PulseAudio) or aplay (ALSA) via subprocess.
Uses just_playback (backed by miniaudio) for MP3/WAV/OGG/FLAC playback
with native volume control on all platforms.
All playback is fire-and-forget on a background thread. A new notification
sound cancels any currently playing sound to prevent overlap.
A new notification sound cancels any currently playing sound to prevent overlap.
"""
import subprocess
import sys
import threading
from pathlib import Path
@@ -16,133 +13,29 @@ from wled_controller.utils import get_logger
logger = get_logger(__name__)
# Lock + handle for cancelling previous sound
_play_lock = threading.Lock()
_current_process: subprocess.Popen | None = None
# Hold reference to SND_MEMORY buffer to prevent GC during async playback
_win_sound_buf: bytes | None = None
_lock = threading.Lock()
_playback = None # lazy-init on first use
def _scale_wav_volume(file_path: Path, volume: float) -> bytes | None:
"""Read a WAV file and return a volume-scaled WAV as bytes.
def _get_playback():
"""Lazy-init the Playback singleton (import is heavy, defer until needed)."""
global _playback
if _playback is None:
from just_playback import Playback
Uses stdlib wave + struct to scale PCM samples in memory.
Returns None on error or if the format is unsupported.
"""
import io
import struct
import wave
try:
with wave.open(str(file_path), "rb") as wf:
n_channels = wf.getnchannels()
sample_width = wf.getsampwidth()
framerate = wf.getframerate()
n_frames = wf.getnframes()
raw = wf.readframes(n_frames)
except Exception as e:
logger.debug(f"Failed to read WAV for volume scaling: {e}")
return None
if sample_width not in (1, 2):
return None # Only 8-bit and 16-bit PCM supported
# Scale samples
if sample_width == 2:
fmt = f"<{len(raw) // 2}h"
samples = struct.unpack(fmt, raw)
scaled = struct.pack(fmt, *(max(-32768, min(32767, int(s * volume))) for s in samples))
else:
# 8-bit WAV is unsigned, center at 128
samples = struct.unpack(f"{len(raw)}B", raw)
scaled = struct.pack(
f"{len(raw)}B",
*(max(0, min(255, int((s - 128) * volume + 128))) for s in samples),
)
# Write scaled WAV to memory buffer
buf = io.BytesIO()
with wave.open(buf, "wb") as out:
out.setnchannels(n_channels)
out.setsampwidth(sample_width)
out.setframerate(framerate)
out.writeframes(scaled)
return buf.getvalue()
def _play_windows(file_path: Path, volume: float) -> None:
"""Play a WAV file on Windows using winsound with volume scaling."""
import winsound
global _win_sound_buf
try:
if volume < 1.0:
wav_data = _scale_wav_volume(file_path, volume)
if wav_data:
# Keep a global reference so GC doesn't free the buffer
# while async playback is still using it
_win_sound_buf = wav_data
winsound.PlaySound(wav_data, winsound.SND_MEMORY | winsound.SND_ASYNC)
return
# Full volume or fallback: play file directly
_win_sound_buf = None
winsound.PlaySound(str(file_path), winsound.SND_FILENAME | winsound.SND_ASYNC)
except Exception as e:
logger.error(f"winsound playback failed: {e}")
def _play_linux(file_path: Path, volume: float) -> None:
"""Play a sound file on Linux using paplay or aplay."""
global _current_process
# Cancel previous sound
with _play_lock:
if _current_process is not None:
try:
_current_process.terminate()
except OSError as e:
logger.debug("Failed to terminate previous sound process: %s", e)
pass
_current_process = None
try:
# Try paplay first (PulseAudio/PipeWire) — supports volume
pa_volume = max(0, min(65536, int(volume * 65536)))
proc = subprocess.Popen(
["paplay", f"--volume={pa_volume}", str(file_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except FileNotFoundError:
try:
# Fallback to aplay (ALSA) — no volume control
proc = subprocess.Popen(
["aplay", "-q", str(file_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except FileNotFoundError:
logger.warning("Neither paplay nor aplay found — cannot play notification sound")
return
with _play_lock:
_current_process = proc
# Wait for completion
proc.wait()
with _play_lock:
if _current_process is proc:
_current_process = None
_playback = Playback()
return _playback
def play_sound_async(file_path: Path, volume: float = 1.0) -> None:
"""Play a sound file asynchronously (fire-and-forget).
"""Play a sound file (non-blocking).
Supports WAV, MP3, OGG, FLAC. A new call stops any currently playing sound.
just_playback.play() is inherently non-blocking (miniaudio backend thread).
Args:
file_path: Path to the sound file (.wav).
volume: Volume level 0.0-1.0 (best-effort, not all backends support it).
file_path: Path to the sound file.
volume: Volume level 0.0-1.0.
"""
if not file_path.exists():
logger.warning(f"Sound file not found: {file_path}")
@@ -150,37 +43,23 @@ def play_sound_async(file_path: Path, volume: float = 1.0) -> None:
volume = max(0.0, min(1.0, volume))
if sys.platform == "win32":
player = _play_windows
else:
player = _play_linux
thread = threading.Thread(
target=player,
args=(file_path, volume),
name="sound-player",
daemon=True,
)
thread.start()
with _lock:
try:
pb = _get_playback()
if pb.active:
pb.stop()
pb.load_file(str(file_path))
pb.set_volume(volume)
pb.play()
except Exception as e:
logger.error(f"Sound playback failed: {e}")
def stop_current_sound() -> None:
"""Stop any currently playing notification sound."""
if sys.platform == "win32":
try:
import winsound
winsound.PlaySound(None, winsound.SND_PURGE)
except Exception as e:
logger.debug("Failed to stop winsound playback: %s", e)
pass
else:
with _play_lock:
global _current_process
if _current_process is not None:
try:
_current_process.terminate()
except OSError as e:
logger.debug("Failed to terminate sound process: %s", e)
pass
_current_process = None
with _lock:
if _playback is not None and _playback.active:
try:
_playback.stop()
except Exception as e:
logger.debug("Failed to stop playback: %s", e)