Compare commits

..

6 Commits

Author SHA1 Message Date
alexei.dolgolyov 1ada5ac334 feat(automations): weekday + timezone scheduling for time-of-day rule
Extend the time-of-day condition from a bare server-local HH:MM window to a real
schedule: pick which weekdays it is active (0=Mon..6=Sun, empty = every day) and
an optional IANA timezone (empty = server local). Closes the parity gap where
even a $5 WLED chip has weekday timers.

- Overnight windows (start > end) count toward the day they START on, so the
  after-midnight tail is matched against the previous weekday.
- Timezones are resolved via zoneinfo, cached, and fall back to server-local
  with a one-time warning on an invalid name (the ~1Hz tick never log-spams).
- Backward compatible: new fields default to all-days / server-local, so
  existing automations are unchanged (no migration).
- Frontend: weekday chips + timezone input on the rule editor, day/timezone in
  the rule summary, styles + i18n (en/ru/zh).

10 unit tests (weekday filter, overnight start-day semantics, tz fallback,
round-trip, invalid-day filtering); full suite green (1936 passed).
(Geographic sunrise/sunset triggers are a natural follow-up — the daylight
value source already has the solar math to reuse.)
2026-06-04 23:54:03 +03:00
alexei.dolgolyov e18d56c838 feat(processing): built-in 'look' presets (Cinematic/Vivid/Cozy/Soft/Cool)
Seed five curated, read-only post-processing templates so a non-expert gets
instant good-looking output before discovering the filter pipeline. Each is an
opinionated chain of existing filters (auto-crop/saturation/contrast/colour-
temperature/temporal-blur) tuned for a use case (films, games, evening ambience,
low-flicker, crisp cool-white).

Mirrors the built-in-gradient pattern: adds is_builtin to PostprocessingTemplate,
seeds missing looks on store init (idempotent, additive — no migration), and
makes built-ins read-only (update/delete raise -> 400; clone to customise).
Surfaced via the existing template picker + is_builtin in the response/type.

7 unit tests (seeding, idempotency, read-only protection, round-trip); full
suite green (1926 passed). (A runtime intensity slider is a follow-up — it needs
a filter-chain parameterisation layer.)
2026-06-04 23:43:11 +03:00
alexei.dolgolyov 7728aecb4f feat(wled): native realtime UDP output (DRGB/DRGBW/DNRGB) with auto-revert
Add WLED's native realtime UDP protocol (port 21324) as a third output mode for
LED targets, alongside DDP and HTTP. For the device LedGrab drives most, this
brings three user-visible wins DDP lacks:

- Auto-revert: every packet carries a timeout byte, so if the stream stops
  (host hiccup/sleep/crash) WLED returns to its preset instead of freezing on
  the last frame.
- Correct RGBW whites: the DRGBW variant carries an explicit white channel.
- Lighter on weak Wi-Fi: raw RGB with a 2-byte header.

New WledRealtimeClient auto-selects DRGB (<=490), DRGBW (<=367), or chunked
DNRGB (>490). WLED applies its own per-bus colour order in realtime mode, so we
send plain RGB and the user's colour-order config just works. Protocol 'udp' is
threaded through WLEDConfig/provider/processor and the schema pattern; the
target editor gains a protocol option + badge + i18n (en/ru/zh).

8 unit tests for the packet builder; full suite green (1919 passed).
2026-06-04 23:34:26 +03:00
alexei.dolgolyov e28ab5a956 Merge feat/power-budget-abl: automatic brightness limiting (ABL) / power budget 2026-06-04 23:22:18 +03:00
alexei.dolgolyov 1e395fd09e Merge fix/verified-bugs: weak default key, broken MQTT route, scene brightness sync 2026-06-04 23:22:18 +03:00
alexei.dolgolyov ffee156c17 feat(targets): automatic brightness limiting (ABL) / per-LED power budget
Cap an addressable strip's estimated current draw to a PSU budget so bright/
white scenes can't brown out an under-spec'd supply (voltage sag -> red/orange
shift, flicker, controller resets) — a classic 'it's broken' first impression.

- New core/processing/power_limit.py: pure current estimate (full white over N
  LEDs draws N * mA_per_led) + a (0,1] scale to land a frame on budget.
- Applied in WledTargetProcessor._send_to_device (single choke point, every send
  path; scales into a reusable scratch buffer, never mutates shared frames).
- Two per-target fields on LED targets: max_milliamps (0 = unlimited) and
  milliamps_per_led (default 55), threaded through model/store/manager/processor/
  schema/route with hot-update via update_target_settings. Additive with safe
  defaults (no data migration needed; legacy targets read as unlimited).
- Frontend: editor fields + i18n (en/ru/zh) + LedOutputTarget type.
- Tests: 10 unit tests for the estimator/scale; full suite green (1911 passed).
2026-06-04 22:56:50 +03:00
32 changed files with 1032 additions and 30 deletions
@@ -52,6 +52,8 @@ def _rule_from_schema(s: RuleSchema) -> Rule:
"time_of_day": lambda: TimeOfDayRule(
start_time=s.start_time or "00:00",
end_time=s.end_time or "23:59",
days_of_week=s.days_of_week or [],
timezone=s.timezone or "",
),
"system_idle": lambda: SystemIdleRule(
idle_minutes=s.idle_minutes if s.idle_minutes is not None else 5,
@@ -70,6 +70,8 @@ def _led_target_to_response(target: WledOutputTarget) -> LedOutputTargetResponse
min_brightness_threshold=target.min_brightness_threshold.to_dict(),
adaptive_fps=target.adaptive_fps,
protocol=target.protocol,
max_milliamps=target.max_milliamps,
milliamps_per_led=target.milliamps_per_led,
description=target.description,
tags=target.tags,
icon=getattr(target, "icon", "") or "",
@@ -302,6 +304,8 @@ async def create_target(
min_brightness_threshold=data.min_brightness_threshold,
adaptive_fps=data.adaptive_fps,
protocol=data.protocol,
max_milliamps=data.max_milliamps,
milliamps_per_led=data.milliamps_per_led,
)
case HALightOutputTargetCreate():
if data.source_kind == "color_vs":
@@ -464,6 +468,8 @@ async def update_target(
min_brightness_threshold=data.min_brightness_threshold,
adaptive_fps=data.adaptive_fps,
protocol=data.protocol,
max_milliamps=data.max_milliamps,
milliamps_per_led=data.milliamps_per_led,
)
css_changed = data.color_strip_source_id is not None
brightness_changed = data.brightness is not None
@@ -476,6 +482,8 @@ async def update_target(
data.min_brightness_threshold,
data.adaptive_fps,
data.brightness,
data.max_milliamps,
data.milliamps_per_led,
)
)
device_changed = data.device_id is not None
@@ -51,6 +51,7 @@ def _pp_template_to_response(t) -> PostprocessingTemplateResponse:
tags=t.tags,
icon=getattr(t, "icon", "") or "",
icon_color=getattr(t, "icon_color", "") or "",
is_builtin=getattr(t, "is_builtin", False),
)
@@ -30,6 +30,14 @@ class RuleSchema(BaseModel):
# Time-of-day rule fields
start_time: str | None = Field(None, description="Start time HH:MM (for time_of_day rule)")
end_time: str | None = Field(None, description="End time HH:MM (for time_of_day rule)")
days_of_week: list[int] | None = Field(
None,
description="Active weekdays for time_of_day rule (0=Mon..6=Sun). Empty/null = every day.",
)
timezone: str | None = Field(
None,
description="IANA timezone for time_of_day rule (e.g. 'Europe/Berlin'). Empty = server local.",
)
# System idle rule fields
idle_minutes: int | None = Field(
None, description="Idle timeout in minutes (for system_idle rule)"
@@ -91,7 +91,11 @@ class LedOutputTargetResponse(_OutputTargetResponseBase):
adaptive_fps: bool = Field(
default=False, description="Auto-reduce FPS when device is unresponsive"
)
protocol: str = Field(default="ddp", description="Send protocol (ddp or http)")
protocol: str = Field(default="ddp", description="Send protocol (ddp, udp, or http)")
max_milliamps: int = Field(
default=0, description="ABL: PSU current budget in mA (0 = unlimited)"
)
milliamps_per_led: int = Field(default=55, description="ABL: full-white draw of one LED in mA")
class HALightOutputTargetResponse(_OutputTargetResponseBase):
@@ -233,8 +237,20 @@ class LedOutputTargetCreate(_OutputTargetCreateBase):
)
protocol: str = Field(
default="ddp",
pattern="^(ddp|http)$",
description="Send protocol: ddp (UDP) or http (JSON API)",
pattern="^(ddp|http|udp)$",
description="Send protocol: ddp (DDP/UDP), udp (WLED native realtime UDP), or http (JSON API)",
)
max_milliamps: int = Field(
default=0,
ge=0,
le=200000,
description="Automatic brightness limiting: PSU current budget in mA (0 = unlimited)",
)
milliamps_per_led: int = Field(
default=55,
ge=1,
le=200,
description="ABL: estimated full-white draw of a single LED, in mA",
)
@@ -370,7 +386,15 @@ class LedOutputTargetUpdate(_OutputTargetUpdateBase):
None, description="Auto-reduce FPS when device is unresponsive"
)
protocol: str | None = Field(
None, pattern="^(ddp|http)$", description="Send protocol: ddp (UDP) or http (JSON API)"
None,
pattern="^(ddp|http|udp)$",
description="Send protocol: ddp (DDP/UDP), udp (WLED native realtime UDP), or http (JSON API)",
)
max_milliamps: int | None = Field(
None, ge=0, le=200000, description="ABL: PSU current budget in mA (0 = unlimited)"
)
milliamps_per_led: int | None = Field(
None, ge=1, le=200, description="ABL: full-white draw of one LED in mA"
)
@@ -70,6 +70,7 @@ class PostprocessingTemplateResponse(BaseModel):
max_length=32,
description="Optional CSS color override for the icon. Empty/null inherits the channel accent.",
)
is_builtin: bool = Field(default=False, description="True for read-only curated 'look' presets")
class PostprocessingTemplateListResponse(BaseModel):
@@ -26,6 +26,33 @@ from ledgrab.utils import get_logger
logger = get_logger(__name__)
# Cache resolved IANA timezones (and remember invalid names) so the ~1 Hz
# automation tick neither re-parses tzdata nor log-spams on a bad name.
_TZ_CACHE: Dict[str, object] = {}
_TZ_WARNED: set = set()
def _now_in_tz(tz_name: str) -> datetime:
"""Current local time, in ``tz_name`` (IANA) if given, else the server's."""
if not tz_name:
return datetime.now()
tz = _TZ_CACHE.get(tz_name)
if tz is None:
try:
from zoneinfo import ZoneInfo
tz = ZoneInfo(tz_name)
_TZ_CACHE[tz_name] = tz
except Exception:
if tz_name not in _TZ_WARNED:
_TZ_WARNED.add(tz_name)
logger.warning(
"Invalid timezone %r for time-of-day rule; using server local time",
tz_name,
)
return datetime.now()
return datetime.now(tz)
@dataclass(frozen=True)
class _RuleEvalContext:
@@ -519,16 +546,26 @@ class AutomationEngine:
@staticmethod
def _evaluate_time_of_day(rule: TimeOfDayRule) -> bool:
now = datetime.now()
now = _now_in_tz(rule.timezone)
current = now.hour * 60 + now.minute
parts_s = rule.start_time.split(":")
parts_e = rule.end_time.split(":")
start = int(parts_s[0]) * 60 + int(parts_s[1])
end = int(parts_e[0]) * 60 + int(parts_e[1])
days = rule.days_of_week
if start <= end:
return start <= current <= end
# Overnight range (e.g. 22:00 → 06:00)
return current >= start or current <= end
if not (start <= current <= end):
return False
return not days or now.weekday() in days
# Overnight range (e.g. 22:00 → 06:00): the window belongs to its
# START day, so the after-midnight tail is matched against yesterday.
if current >= start: # evening portion — today's window
return not days or now.weekday() in days
if current <= end: # early-morning portion — yesterday's window
return not days or ((now.weekday() - 1) % 7) in days
return False
@staticmethod
def _evaluate_idle(rule: SystemIdleRule, idle_seconds: float | None) -> bool:
@@ -23,6 +23,11 @@ class BaseDeviceConfig:
class WLEDConfig(BaseDeviceConfig):
device_type: Literal["wled"] = "wled"
use_ddp: bool = False
# WLED native realtime UDP (port 21324) — mutually exclusive with use_ddp.
# realtime_timeout = seconds WLED stays in realtime after the last packet
# before reverting to its normal effect/preset (graceful auto-revert).
use_realtime: bool = False
realtime_timeout: int = 2
@dataclass(frozen=True)
+53 -9
View File
@@ -86,6 +86,8 @@ class WLEDClient(LEDClient):
retry_attempts: int = 3,
retry_delay: int = 1,
use_ddp: bool = False,
use_realtime: bool = False,
realtime_timeout: int = 2,
):
"""Initialize WLED client.
@@ -95,12 +97,17 @@ class WLEDClient(LEDClient):
retry_attempts: Number of retry attempts on failure
retry_delay: Delay between retries in seconds
use_ddp: Force DDP protocol (auto-enabled for >500 LEDs)
use_realtime: Use WLED native realtime UDP (port 21324) instead of DDP
realtime_timeout: Seconds WLED stays in realtime after the last packet
before reverting to its normal effect/preset (1-255)
"""
self.url = url.rstrip("/")
self.timeout = timeout
self.retry_attempts = retry_attempts
self.retry_delay = retry_delay
self.use_ddp = use_ddp
self.use_realtime = use_realtime
self.realtime_timeout = realtime_timeout
# Extract hostname/IP from URL for DDP
parsed = urlparse(self.url)
@@ -108,6 +115,7 @@ class WLEDClient(LEDClient):
self._client: httpx.AsyncClient | None = None
self._ddp_client: DDPClient | None = None
self._realtime_client = None # WledRealtimeClient when use_realtime
self._connected = False
self._pre_connect_state: dict | None = None
@@ -127,8 +135,9 @@ class WLEDClient(LEDClient):
# Test connection by getting device info
info = await self.get_info()
# Auto-enable DDP for large LED counts
if info.led_count > self.HTTP_MAX_LEDS and not self.use_ddp:
# Auto-enable DDP for large LED counts (unless the user explicitly
# chose native realtime UDP, which handles any size via DNRGB).
if info.led_count > self.HTTP_MAX_LEDS and not self.use_ddp and not self.use_realtime:
logger.info(
f"Device has {info.led_count} LEDs (>{self.HTTP_MAX_LEDS}), "
"auto-enabling DDP protocol"
@@ -138,8 +147,30 @@ class WLEDClient(LEDClient):
# Snapshot device state BEFORE any mutations (for auto-restore)
self._pre_connect_state = await self.snapshot_device_state()
# Create WLED native realtime UDP client if selected
if self.use_realtime:
from ledgrab.core.devices.wled_realtime_client import WledRealtimeClient
self._realtime_client = WledRealtimeClient(
self.host, rgbw=info.rgbw, timeout_secs=self.realtime_timeout
)
await self._realtime_client.connect()
try:
await self._request(
"POST",
"/json/state",
json_data={"on": True, "lor": 0, "AudioReactive": {"on": False}},
)
except Exception as e:
logger.warning(f"Could not configure device for realtime UDP: {e}")
logger.info(
"WLED native realtime UDP enabled (port 21324, %ds timeout, %s)",
self.realtime_timeout,
"RGBW" if info.rgbw else "RGB",
)
# Create DDP client if needed
if self.use_ddp:
elif self.use_ddp:
self._ddp_client = DDPClient(self.host, rgbw=False)
# Pass per-bus config so DDP client can apply per-bus color reordering
if info.buses:
@@ -191,6 +222,9 @@ class WLEDClient(LEDClient):
if self._ddp_client:
await self._ddp_client.close()
self._ddp_client = None
if self._realtime_client:
await self._realtime_client.close()
self._realtime_client = None
self._connected = False
logger.debug(f"Closed connection to {self.url}")
@@ -201,8 +235,10 @@ class WLEDClient(LEDClient):
@property
def supports_fast_send(self) -> bool:
"""True when DDP is active and ready for fire-and-forget sends."""
return self.use_ddp and self._ddp_client is not None
"""True when DDP or native realtime UDP is active (fire-and-forget)."""
return (self.use_ddp and self._ddp_client is not None) or (
self.use_realtime and self._realtime_client is not None
)
async def _request(
self,
@@ -384,7 +420,10 @@ class WLEDClient(LEDClient):
raise ValueError(f"Invalid RGB values at index {idx}: {tuple(pixel_arr[idx])}")
validated_pixels = pixel_arr.astype(np.uint8) if pixel_arr.dtype != np.uint8 else pixel_arr
# Use DDP protocol if enabled
# Native realtime UDP takes precedence, then DDP, then HTTP
if self.use_realtime and self._realtime_client:
self._realtime_client.send_pixels_numpy(validated_pixels)
return True
if self.use_ddp and self._ddp_client:
return await self._send_pixels_ddp(validated_pixels, brightness)
else:
@@ -485,8 +524,10 @@ class WLEDClient(LEDClient):
pixels: numpy array (N, 3) uint8 or list of (R, G, B) tuples
brightness: Global brightness (0-255)
"""
if not self.use_ddp or not self._ddp_client:
raise RuntimeError("send_pixels_fast requires DDP; use send_pixels for HTTP")
if not (self.use_ddp and self._ddp_client) and not (
self.use_realtime and self._realtime_client
):
raise RuntimeError("send_pixels_fast requires DDP or realtime UDP; use send_pixels")
if isinstance(pixels, np.ndarray):
pixel_array = pixels
@@ -494,7 +535,10 @@ class WLEDClient(LEDClient):
pixel_array = np.array(pixels, dtype=np.uint8)
# Note: brightness already applied by processor loop (_cached_brightness)
self._ddp_client.send_pixels_numpy(pixel_array)
if self.use_realtime and self._realtime_client:
self._realtime_client.send_pixels_numpy(pixel_array)
else:
self._ddp_client.send_pixels_numpy(pixel_array)
# ===== LEDClient abstraction methods =====
@@ -86,6 +86,8 @@ class WLEDDeviceProvider(LEDDeviceProvider):
return WLEDClient(
config.device_url,
use_ddp=config.use_ddp,
use_realtime=config.use_realtime,
realtime_timeout=config.realtime_timeout,
)
async def check_health(self, url: str, http_client, prev_health=None) -> DeviceHealth:
@@ -0,0 +1,153 @@
"""WLED native realtime UDP client (port 21324).
WLED exposes a family of "realtime" UDP protocols separate from DDP. Compared to
the DDP path this gives three user-visible wins for the device LedGrab drives
most:
* **Auto-revert** — every packet carries a *timeout* byte. If LedGrab stops
streaming (host hiccup, sleep, crash), WLED returns to its normal effect /
preset after that many seconds instead of freezing on the last frame.
* **Correct RGBW whites** — the DRGBW variant carries an explicit white channel,
so RGBW strips are driven correctly instead of leaving W uncontrolled.
* **Lighter on weak Wi-Fi** — raw RGB with a 2-byte header, no DDP framing.
Unlike the DDP path, WLED applies the configured per-bus color order itself in
realtime mode, so this sender transmits plain RGB (no manual reordering) — the
user's WLED colour-order setting just works.
Packet layout (first byte selects the protocol)::
DRGB (2): [2][timeout] + R G B per LED (<= 490 LEDs)
DRGBW (3): [3][timeout] + R G B W per LED (<= 367 LEDs)
DNRGB (4): [4][timeout][start_hi][start_lo] + R G B per LED (chunked, 489/pkt)
The ``timeout`` byte is in **seconds** (1-255). DNRGB carries a 16-bit start
index so strips larger than one packet are sent as several chunks.
Ref: https://kno.wled.ge/interfaces/udp-realtime/
"""
from __future__ import annotations
import asyncio
import numpy as np
from ledgrab.utils import get_logger
logger = get_logger(__name__)
REALTIME_PORT = 21324
# Protocol selector (first byte).
_DRGB = 2
_DRGBW = 3
_DNRGB = 4
# Per-protocol LED capacity (bounded by the ~1500-byte UDP payload).
_MAX_DRGB = 490 # 2 + 490*3 = 1472
_MAX_DRGBW = 367 # 2 + 367*4 = 1470
_MAX_DNRGB_CHUNK = 489 # 4 + 489*3 = 1471
# Default seconds WLED stays in realtime after the last packet before reverting.
DEFAULT_REALTIME_TIMEOUT = 2
def _clamp_timeout(seconds: int) -> int:
"""Clamp the realtime timeout to the on-wire 1-255 range."""
return max(1, min(255, int(seconds)))
class WledRealtimeClient:
"""Fire-and-forget UDP sender for WLED native realtime protocols."""
def __init__(
self,
host: str,
port: int = REALTIME_PORT,
rgbw: bool = False,
timeout_secs: int = DEFAULT_REALTIME_TIMEOUT,
) -> None:
self.host = host
self.port = port
self.rgbw = rgbw
self.timeout_secs = _clamp_timeout(timeout_secs)
self._transport: asyncio.DatagramTransport | None = None
self._protocol: asyncio.DatagramProtocol | None = None
# Reusable RGBW scratch (resized on demand) so the hot path doesn't
# allocate a fresh (N, 4) array per frame.
self._rgbw_buf: np.ndarray | None = None
self._rgbw_buf_n: int = 0
async def connect(self) -> bool:
"""Open the UDP datagram endpoint to the device."""
loop = asyncio.get_running_loop()
self._transport, self._protocol = await loop.create_datagram_endpoint(
asyncio.DatagramProtocol, remote_addr=(self.host, self.port)
)
logger.info(
"WLED realtime client connected to %s:%d (timeout %ds, %s)",
self.host,
self.port,
self.timeout_secs,
"RGBW" if self.rgbw else "RGB",
)
return True
async def close(self) -> None:
"""Close the datagram endpoint."""
if self._transport is not None:
self._transport.close()
self._transport = None
self._protocol = None
logger.debug("Closed WLED realtime connection to %s:%d", self.host, self.port)
@property
def is_connected(self) -> bool:
return self._transport is not None
def _ensure_rgbw_buf(self, n: int) -> np.ndarray:
"""Return an ``(n, 4)`` uint8 RGBW buffer with the white channel zeroed."""
if self._rgbw_buf is None or self._rgbw_buf_n != n:
self._rgbw_buf = np.zeros((n, 4), dtype=np.uint8)
self._rgbw_buf_n = n
return self._rgbw_buf
def build_packets(self, pixels: np.ndarray) -> list[bytes]:
"""Build the realtime UDP packet(s) for one ``(N, 3)`` uint8 RGB frame.
Exposed (and pure) for unit testing the wire format. Picks DRGBW for
RGBW strips within range, DRGB for small RGB strips, otherwise DNRGB
chunks. The white channel is sent as 0 (colour comes from the RGB LEDs).
"""
pixels = np.ascontiguousarray(pixels, dtype=np.uint8)
n = len(pixels)
t = self.timeout_secs
if n == 0:
return []
if self.rgbw and n <= _MAX_DRGBW:
buf = self._ensure_rgbw_buf(n)
buf[:, 0:3] = pixels
# white channel already zeroed and left at 0
return [bytes([_DRGBW, t]) + buf.tobytes()]
if n <= _MAX_DRGB and not self.rgbw:
return [bytes([_DRGB, t]) + pixels.tobytes()]
# DNRGB: 16-bit start index, chunked. Covers >490 RGB and >367 RGBW
# (the white channel is dropped for oversized RGBW strips).
packets: list[bytes] = []
for start in range(0, n, _MAX_DNRGB_CHUNK):
end = min(start + _MAX_DNRGB_CHUNK, n)
header = bytes([_DNRGB, t, (start >> 8) & 0xFF, start & 0xFF])
packets.append(header + pixels[start:end].tobytes())
return packets
def send_pixels_numpy(self, pixels: np.ndarray) -> bool:
"""Send one frame of ``(N, 3)`` uint8 RGB pixels (fire-and-forget)."""
if self._transport is None:
return False
for packet in self.build_packets(pixels):
self._transport.sendto(packet)
return True
@@ -0,0 +1,58 @@
"""Automatic brightness limiting (ABL) — keep a strip within a PSU current budget.
Estimates the current an addressable LED strip would draw for a frame of
already-brightness-scaled RGB bytes and, if it exceeds the configured budget,
returns a uniform scale factor to bring it back under budget. This prevents the
classic under-spec'd-PSU failure mode: a full-white scene browning out the rail
(voltage sag -> red/orange shift, flicker, controller resets) — which reads to a
new user as "this software is broken".
Model: one addressable LED at full white ``(255, 255, 255)`` draws
``milliamps_per_led`` mA, and current scales linearly with the sum of channel
values, so a frame's draw is::
estimated_ma = sum(channel_bytes) * milliamps_per_led / (255 * 3)
(``255 * 3 = 765`` channel-units == one LED at full white.) Standby/idle current
is intentionally ignored: the limiter only needs to catch the high-draw frames
that cause brownouts, and the default 55 mA/LED already carries real-world
headroom. The same convention as WLED's "maximum current" setting.
"""
from __future__ import annotations
import numpy as np
# Channel units in one LED at full white (R + G + B = 255 * 3).
_FULL_WHITE_UNITS = 765.0
# Typical full-white draw of a single WS2812/SK6812-class LED, in mA.
DEFAULT_MILLIAMPS_PER_LED = 55
def estimate_current_ma(colors: np.ndarray, milliamps_per_led: int) -> float:
"""Estimate strip draw (mA) for already-brightness-scaled RGB bytes.
``colors`` is an ``(N, 3)`` uint8 array of the values actually sent to the
strip. Full white over ``N`` LEDs returns ``N * milliamps_per_led``.
"""
if milliamps_per_led <= 0 or colors.size == 0:
return 0.0
channel_sum = float(int(colors.sum()))
return channel_sum * milliamps_per_led / _FULL_WHITE_UNITS
def power_limit_scale(colors: np.ndarray, max_milliamps: int, milliamps_per_led: int) -> float:
"""Return a scale in ``(0, 1]`` that keeps estimated draw within budget.
Returns ``1.0`` when limiting is disabled (``max_milliamps <= 0``) or the
frame is already within budget. Because current is linear in the channel
values, scaling every pixel by ``max_milliamps / estimated`` lands the frame
exactly on the budget.
"""
if max_milliamps <= 0 or milliamps_per_led <= 0:
return 1.0
estimated = estimate_current_ma(colors, milliamps_per_led)
if estimated <= max_milliamps:
return 1.0
return max_milliamps / estimated
@@ -407,6 +407,8 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
min_brightness_threshold: int = 0,
adaptive_fps: bool = False,
protocol: str = "ddp",
max_milliamps: int = 0,
milliamps_per_led: int = 55,
):
"""Register a WLED target processor."""
if target_id in self._processors:
@@ -425,6 +427,8 @@ class ProcessorManager(AutoRestartMixin, DeviceHealthMixin, DeviceTestModeMixin)
min_brightness_threshold=min_brightness_threshold,
adaptive_fps=adaptive_fps,
protocol=protocol,
max_milliamps=max_milliamps,
milliamps_per_led=milliamps_per_led,
ctx=self._build_context(),
)
self._processors[target_id] = proc
@@ -17,6 +17,7 @@ from ledgrab.core.devices.led_client import (
get_device_capabilities,
)
from ledgrab.core.capture.screen_capture import get_available_displays
from ledgrab.core.processing.power_limit import DEFAULT_MILLIAMPS_PER_LED, power_limit_scale
from ledgrab.core.processing.target_processor import (
ProcessingMetrics,
TargetContext,
@@ -62,6 +63,8 @@ class WledTargetProcessor(TargetProcessor):
min_brightness_threshold: int = 0,
adaptive_fps: bool = False,
protocol: str = "ddp",
max_milliamps: int = 0,
milliamps_per_led: int = 55,
ctx: TargetContext = None,
):
from ledgrab.storage.bindable import BindableFloat, bfloat
@@ -81,6 +84,13 @@ class WledTargetProcessor(TargetProcessor):
self._min_brightness_threshold = int(bfloat(min_brightness_threshold, 0.0))
self._adaptive_fps = adaptive_fps
self._protocol = protocol
# Automatic brightness limiting (ABL). 0 mA budget = disabled.
self._max_milliamps = max(0, int(max_milliamps or 0))
self._milliamps_per_led = max(1, int(milliamps_per_led or DEFAULT_MILLIAMPS_PER_LED))
# Reusable scratch for in-place power scaling (allocated on first use).
self._power_u16: np.ndarray | None = None
self._power_out: np.ndarray | None = None
self._power_n = 0
# Adaptive FPS / liveness probe runtime state
self._effective_fps: int = self._target_fps
@@ -146,9 +156,15 @@ class WledTargetProcessor(TargetProcessor):
from ledgrab.core.devices.device_config import WLEDConfig as _WLEDConfig
config = _dev.to_config()
# use_ddp is a target-derived protocol setting — override on WLEDConfig
# The target's protocol selects how we drive a WLED device:
# "ddp" -> DDP UDP (4048) "udp" -> WLED native realtime UDP (21324)
# "http" -> JSON API (use_ddp and use_realtime are exclusive)
if isinstance(config, _WLEDConfig):
config = _replace(config, use_ddp=(self._protocol == "ddp"))
config = _replace(
config,
use_ddp=(self._protocol == "ddp"),
use_realtime=(self._protocol == "udp"),
)
self._device_config = config
# Connect to LED device
@@ -313,6 +329,12 @@ class WledTargetProcessor(TargetProcessor):
self._adaptive_fps = settings["adaptive_fps"]
if not self._adaptive_fps:
self._effective_fps = self._target_fps
if "max_milliamps" in settings:
self._max_milliamps = max(0, int(settings["max_milliamps"] or 0))
if "milliamps_per_led" in settings:
self._milliamps_per_led = max(
1, int(settings["milliamps_per_led"] or DEFAULT_MILLIAMPS_PER_LED)
)
logger.info(f"Updated settings for target {self._target_id}")
def update_device(self, device_id: str) -> None:
@@ -787,8 +809,33 @@ class WledTargetProcessor(TargetProcessor):
np.copyto(out, blend, casting="unsafe") # float32 → uint8
return out
def _apply_power_limit(self, colors: np.ndarray) -> np.ndarray:
"""Scale ``colors`` down to stay within the PSU current budget (ABL).
Returns ``colors`` unchanged when limiting is disabled or the frame is
already within budget; otherwise returns a scaled copy in a reusable
scratch buffer (the input is never mutated — it may be a shared frame).
"""
if self._max_milliamps <= 0:
return colors
scale = power_limit_scale(colors, self._max_milliamps, self._milliamps_per_led)
if scale >= 1.0:
return colors
factor = int(scale * 256) # 0..255 fixed-point multiplier
n = len(colors)
if self._power_u16 is None or self._power_n != n:
self._power_n = n
self._power_u16 = np.empty((n, 3), dtype=np.uint16)
self._power_out = np.empty((n, 3), dtype=np.uint8)
np.copyto(self._power_u16, colors, casting="unsafe")
self._power_u16 *= factor
self._power_u16 >>= 8
np.copyto(self._power_out, self._power_u16, casting="unsafe")
return self._power_out
async def _send_to_device(self, send_colors: np.ndarray) -> float:
"""Send colors to LED device and return send time in ms."""
send_colors = self._apply_power_limit(send_colors)
t_start = time.perf_counter()
if self._led_client.supports_fast_send:
self._led_client.send_pixels_fast(send_colors)
@@ -152,6 +152,50 @@
border-left: 1px solid var(--border-color);
}
/* Weekday + timezone scheduling (time_of_day rule) */
.rule-weekday-block,
.rule-tz-block {
margin-top: 12px;
}
.rule-field-label {
display: block;
font-size: 0.65rem;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 0.06em;
color: var(--text-muted);
margin-bottom: 6px;
}
.weekday-chips {
display: flex;
flex-wrap: wrap;
gap: 6px;
}
.weekday-chip {
flex: 1 1 auto;
min-width: 40px;
padding: 6px 8px;
font-size: 0.75rem;
font-weight: 600;
border: 1px solid var(--border-color);
border-radius: 6px;
background: var(--card-bg);
color: var(--text-muted);
cursor: pointer;
transition: background 0.12s, color 0.12s, border-color 0.12s;
}
.weekday-chip:hover {
border-color: var(--primary-color);
}
.weekday-chip.active {
background: var(--primary-color);
border-color: var(--primary-color);
color: #fff;
}
.rule-tz-block input.rule-timezone {
width: 100%;
}
.time-range-label {
font-size: 0.65rem;
font-weight: 700;
@@ -340,11 +340,15 @@ const RULE_CHIP_RENDERERS: Record<RuleType, RuleChipBuilder> = {
const matchLabel = t('automations.rule.application.match_type.' + (c.match_type || 'running'));
return { icon: _icon(P.smartphone), text: `${apps} (${matchLabel})`, title: t('automations.rule.application') };
},
time_of_day: (c) => ({
icon: ICON_CLOCK,
text: `${c.start_time || '00:00'} ${c.end_time || '23:59'}`,
title: t('automations.rule.time_of_day'),
}),
time_of_day: (c) => {
const days: number[] = Array.isArray(c.days_of_week) ? c.days_of_week : [];
let text = `${c.start_time || '00:00'} ${c.end_time || '23:59'}`;
if (days.length && days.length < 7) {
text += ` · ${[...days].sort((a, b) => a - b).map((d) => t('weekday.short.' + d)).join(' ')}`;
}
if (c.timezone) text += ` · ${c.timezone}`;
return { icon: ICON_CLOCK, text, title: t('automations.rule.time_of_day') };
},
system_idle: (c) => {
const mode = c.when_idle !== false ? t('automations.rule.system_idle.when_idle') : t('automations.rule.system_idle.when_active');
return { icon: ICON_TIMER, text: `${c.idle_minutes || 5}m (${mode})`, title: t('automations.rule.system_idle') };
@@ -878,6 +882,11 @@ function _renderTimeOfDayFields(container: HTMLElement, data: any): void {
const [sh, sm] = startTime.split(':').map(Number);
const [eh, em] = endTime.split(':').map(Number);
const pad = (n: number) => String(n).padStart(2, '0');
const days: number[] = Array.isArray(data.days_of_week) ? data.days_of_week : [];
const tz: string = data.timezone || '';
const dayChips = [0, 1, 2, 3, 4, 5, 6]
.map((d) => `<button type="button" class="weekday-chip${days.includes(d) ? ' active' : ''}" data-day="${d}">${t('weekday.short.' + d)}</button>`)
.join('');
container.innerHTML = `
<div class="rule-fields">
<input type="hidden" class="rule-start-time" value="${startTime}">
@@ -901,9 +910,21 @@ function _renderTimeOfDayFields(container: HTMLElement, data: any): void {
</div>
</div>
</div>
<div class="rule-weekday-block">
<span class="rule-field-label">${t('automations.rule.time_of_day.days')}</span>
<div class="weekday-chips">${dayChips}</div>
<small class="rule-hint-desc">${t('automations.rule.time_of_day.days_hint')}</small>
</div>
<div class="rule-tz-block">
<label class="rule-field-label">${t('automations.rule.time_of_day.timezone')}</label>
<input type="text" class="rule-timezone" placeholder="${t('automations.rule.time_of_day.timezone.placeholder')}" value="${tz}">
</div>
<small class="rule-hint-desc">${t('automations.rule.time_of_day.overnight_hint')}</small>
</div>`;
_wireTimeRangePicker(container);
container.querySelectorAll('.weekday-chip').forEach((chip) => {
chip.addEventListener('click', () => chip.classList.toggle('active'));
});
}
function _renderSystemIdleFields(container: HTMLElement, data: any): void {
@@ -1314,6 +1335,9 @@ const RULE_COLLECTORS: Record<RuleType, RuleCollector> = {
rule_type: 'time_of_day',
start_time: (row.querySelector('.rule-start-time') as HTMLInputElement).value || '00:00',
end_time: (row.querySelector('.rule-end-time') as HTMLInputElement).value || '23:59',
days_of_week: Array.from(row.querySelectorAll('.weekday-chip.active'))
.map((el) => parseInt((el as HTMLElement).dataset.day || '0', 10)),
timezone: ((row.querySelector('.rule-timezone') as HTMLInputElement)?.value || '').trim(),
}),
system_idle: (row) => ({
rule_type: 'system_idle',
@@ -171,6 +171,8 @@ class TargetEditorModal extends Modal {
fps: _fpsWidget ? JSON.stringify(_fpsWidget.getValue()) : '30',
keepalive_interval: (document.getElementById('target-editor-keepalive-interval') as HTMLInputElement).value,
adaptive_fps: (document.getElementById('target-editor-adaptive-fps') as HTMLInputElement).checked,
max_milliamps: (document.getElementById('target-editor-max-milliamps') as HTMLInputElement).value,
milliamps_per_led: (document.getElementById('target-editor-ma-per-led') as HTMLInputElement).value,
tags: JSON.stringify(_targetTagsInput ? _targetTagsInput.getValue() : []),
};
}
@@ -181,8 +183,13 @@ const targetEditorModal = new TargetEditorModal();
function _protocolBadge(device: any, target: any) {
const dt = device?.device_type;
if (!dt || dt === 'wled') {
const proto = target.protocol === 'http' ? 'HTTP' : 'DDP';
return `${target.protocol === 'http' ? ICON_GLOBE : ICON_RADIO} ${proto}`;
const wledMap: Record<string, [string, string]> = {
http: [ICON_GLOBE, 'HTTP'],
udp: [ICON_RADIO, 'WLED UDP'],
ddp: [ICON_RADIO, 'DDP'],
};
const [icon, label] = wledMap[target.protocol] || wledMap.ddp;
return `${icon} ${label}`;
}
const map = {
openrgb: [ICON_PALETTE, 'OpenRGB SDK'],
@@ -311,10 +318,11 @@ function _ensureProtocolIconSelect() {
if (!sel) return;
const items = [
{ value: 'ddp', icon: _pIcon(P.radio), label: t('targets.protocol.ddp'), desc: t('targets.protocol.ddp.desc') },
{ value: 'udp', icon: _pIcon(P.radio), label: t('targets.protocol.udp'), desc: t('targets.protocol.udp.desc') },
{ value: 'http', icon: _pIcon(P.globe), label: t('targets.protocol.http'), desc: t('targets.protocol.http.desc') },
];
if (_protocolIconSelect) { _protocolIconSelect.updateItems(items); return; }
_protocolIconSelect = new IconSelect({ target: sel as HTMLSelectElement, items, columns: 2 });
_protocolIconSelect = new IconSelect({ target: sel as HTMLSelectElement, items, columns: 3 });
}
function _ensureBrightnessWidget(): BindableScalarWidget {
@@ -401,6 +409,8 @@ export async function showTargetEditor(targetId: string | null = null, cloneData
(document.getElementById('target-editor-adaptive-fps') as HTMLInputElement).checked = target.adaptive_fps ?? false;
(document.getElementById('target-editor-protocol') as HTMLSelectElement).value = target.protocol || 'ddp';
(document.getElementById('target-editor-max-milliamps') as HTMLInputElement).value = String(target.max_milliamps ?? 0);
(document.getElementById('target-editor-ma-per-led') as HTMLInputElement).value = String(target.milliamps_per_led ?? 55);
_populateCssDropdown(target.color_strip_source_id || '');
_ensureBrightnessWidget().setValue(target.brightness ?? 1.0);
@@ -419,6 +429,8 @@ export async function showTargetEditor(targetId: string | null = null, cloneData
(document.getElementById('target-editor-adaptive-fps') as HTMLInputElement).checked = cloneData.adaptive_fps ?? false;
(document.getElementById('target-editor-protocol') as HTMLSelectElement).value = cloneData.protocol || 'ddp';
(document.getElementById('target-editor-max-milliamps') as HTMLInputElement).value = String(cloneData.max_milliamps ?? 0);
(document.getElementById('target-editor-ma-per-led') as HTMLInputElement).value = String(cloneData.milliamps_per_led ?? 55);
_populateCssDropdown(cloneData.color_strip_source_id || '');
_ensureBrightnessWidget().setValue(cloneData.brightness ?? 1.0);
@@ -435,6 +447,8 @@ export async function showTargetEditor(targetId: string | null = null, cloneData
(document.getElementById('target-editor-adaptive-fps') as HTMLInputElement).checked = false;
(document.getElementById('target-editor-protocol') as HTMLSelectElement).value = 'ddp';
(document.getElementById('target-editor-max-milliamps') as HTMLInputElement).value = '0';
(document.getElementById('target-editor-ma-per-led') as HTMLInputElement).value = '55';
_populateCssDropdown('');
_ensureBrightnessWidget().setValue(1.0);
@@ -515,6 +529,8 @@ export async function saveTargetEditor() {
const adaptiveFps = (document.getElementById('target-editor-adaptive-fps') as HTMLInputElement).checked;
const protocol = (document.getElementById('target-editor-protocol') as HTMLSelectElement).value;
const maxMilliamps = Math.max(0, Math.round(Number((document.getElementById('target-editor-max-milliamps') as HTMLInputElement).value) || 0));
const milliampsPerLed = Math.max(1, Math.round(Number((document.getElementById('target-editor-ma-per-led') as HTMLInputElement).value) || 55));
const payload: any = {
name,
@@ -526,6 +542,8 @@ export async function saveTargetEditor() {
keepalive_interval: standbyInterval,
adaptive_fps: adaptiveFps,
protocol,
max_milliamps: maxMilliamps,
milliamps_per_led: milliampsPerLed,
tags: _targetTagsInput ? _targetTagsInput.getValue() : [],
};
@@ -50,6 +50,8 @@ export interface LedOutputTarget extends OutputTargetBase {
min_brightness_threshold?: BindableFloat;
adaptive_fps: boolean;
protocol: string;
max_milliamps?: number;
milliamps_per_led?: number;
}
export type HALightSourceKind = 'css' | 'color_vs';
@@ -30,6 +30,7 @@ export interface PostprocessingTemplate {
description?: string;
icon?: string;
icon_color?: string;
is_builtin?: boolean;
created_at: string;
updated_at: string;
}
+17
View File
@@ -1235,6 +1235,17 @@
"automations.rule.time_of_day.start_time": "Start Time:",
"automations.rule.time_of_day.end_time": "End Time:",
"automations.rule.time_of_day.overnight_hint": "For overnight ranges (e.g. 22:0006:00), set start time after end time.",
"automations.rule.time_of_day.days": "Active days",
"automations.rule.time_of_day.days_hint": "Leave all unselected for every day. Overnight windows count toward the day they start on.",
"automations.rule.time_of_day.timezone": "Timezone",
"automations.rule.time_of_day.timezone.placeholder": "Server local (e.g. Europe/Berlin)",
"weekday.short.0": "Mon",
"weekday.short.1": "Tue",
"weekday.short.2": "Wed",
"weekday.short.3": "Thu",
"weekday.short.4": "Fri",
"weekday.short.5": "Sat",
"weekday.short.6": "Sun",
"automations.rule.system_idle": "System Idle",
"automations.rule.system_idle.desc": "User idle/active",
"automations.rule.system_idle.idle_minutes": "Idle Timeout (minutes):",
@@ -2079,8 +2090,14 @@
"targets.adaptive_fps.hint": "Automatically reduce send rate when the device becomes unresponsive, and gradually recover when it stabilizes. Recommended for WiFi devices with weak signal.",
"targets.protocol": "Protocol:",
"targets.protocol.hint": "DDP sends pixels via fast UDP (recommended for most setups). HTTP uses the JSON API — slower but reliable, limited to ~500 LEDs.",
"targets.power_limit": "Max current (ABL):",
"targets.power_limit.hint": "Caps the strip's estimated current draw to your power-supply budget to prevent brownouts (voltage sag, color shift, flicker) on bright/white scenes. Set it to your PSU's rated current, leaving some headroom. 0 = unlimited.",
"targets.power_limit.ma_suffix": "mA (0 = unlimited)",
"targets.power_limit.per_led": "mA per LED (full white):",
"targets.protocol.ddp": "DDP (UDP)",
"targets.protocol.ddp.desc": "Fast raw UDP packets — recommended",
"targets.protocol.udp": "WLED UDP (realtime)",
"targets.protocol.udp.desc": "WLED native realtime — RGBW whites + auto-revert if the stream drops",
"targets.protocol.http": "HTTP",
"targets.protocol.http.desc": "JSON API — slower, ≤500 LEDs",
"targets.protocol.serial": "Serial",
+17
View File
@@ -1269,6 +1269,17 @@
"automations.rule.time_of_day.start_time": "Время начала:",
"automations.rule.time_of_day.end_time": "Время окончания:",
"automations.rule.time_of_day.overnight_hint": "Для ночных диапазонов (например 22:00–06:00) укажите время начала позже времени окончания.",
"automations.rule.time_of_day.days": "Активные дни",
"automations.rule.time_of_day.days_hint": "Оставьте всё невыбранным для всех дней. Ночные окна относятся ко дню, когда они начинаются.",
"automations.rule.time_of_day.timezone": "Часовой пояс",
"automations.rule.time_of_day.timezone.placeholder": "Локальное время сервера (напр. Europe/Berlin)",
"weekday.short.0": "Пн",
"weekday.short.1": "Вт",
"weekday.short.2": "Ср",
"weekday.short.3": "Чт",
"weekday.short.4": "Пт",
"weekday.short.5": "Сб",
"weekday.short.6": "Вс",
"automations.rule.system_idle": "Бездействие системы",
"automations.rule.system_idle.desc": "Бездействие/активность",
"automations.rule.system_idle.idle_minutes": "Тайм-аут бездействия (минуты):",
@@ -1939,8 +1950,14 @@
"targets.adaptive_fps.hint": "Автоматически снижает частоту отправки, когда устройство перестаёт отвечать, и постепенно восстанавливает её при стабилизации. Рекомендуется для WiFi-устройств со слабым сигналом.",
"targets.protocol": "Протокол:",
"targets.protocol.hint": "DDP отправляет пиксели по быстрому UDP (рекомендуется). HTTP использует JSON API — медленнее, но надёжнее, ограничение ~500 LED.",
"targets.power_limit": "Макс. ток (ABL):",
"targets.power_limit.hint": "Ограничивает расчётный ток ленты бюджетом блока питания, чтобы избежать просадок напряжения (сдвиг цвета, мерцание, перезагрузки) на ярких/белых сценах. Укажите номинальный ток вашего БП с запасом. 0 = без ограничения.",
"targets.power_limit.ma_suffix": "мА (0 = без ограничения)",
"targets.power_limit.per_led": "мА на светодиод (полный белый):",
"targets.protocol.ddp": "DDP (UDP)",
"targets.protocol.ddp.desc": "Быстрые UDP-пакеты — рекомендуется",
"targets.protocol.udp": "WLED UDP (realtime)",
"targets.protocol.udp.desc": "Нативный realtime WLED — корректный RGBW и авто-возврат при обрыве потока",
"targets.protocol.http": "HTTP",
"targets.protocol.http.desc": "JSON API — медленнее, ≤500 LED",
"targets.protocol.serial": "Serial",
+17
View File
@@ -1265,6 +1265,17 @@
"automations.rule.time_of_day.start_time": "开始时间:",
"automations.rule.time_of_day.end_time": "结束时间:",
"automations.rule.time_of_day.overnight_hint": "跨夜时段(如 22:00–06:00),请将开始时间设为晚于结束时间。",
"automations.rule.time_of_day.days": "生效日期",
"automations.rule.time_of_day.days_hint": "全部不选表示每天生效。跨夜时段归属于其开始的那一天。",
"automations.rule.time_of_day.timezone": "时区",
"automations.rule.time_of_day.timezone.placeholder": "服务器本地时间(如 Europe/Berlin",
"weekday.short.0": "周一",
"weekday.short.1": "周二",
"weekday.short.2": "周三",
"weekday.short.3": "周四",
"weekday.short.4": "周五",
"weekday.short.5": "周六",
"weekday.short.6": "周日",
"automations.rule.system_idle": "系统空闲",
"automations.rule.system_idle.desc": "空闲/活跃",
"automations.rule.system_idle.idle_minutes": "空闲超时(分钟):",
@@ -1935,8 +1946,14 @@
"targets.adaptive_fps.hint": "当设备无响应时自动降低发送速率,稳定后逐步恢复。推荐用于信号较弱的WiFi设备。",
"targets.protocol": "协议:",
"targets.protocol.hint": "DDP通过快速UDP发送像素(推荐)。HTTP使用JSON API——较慢但可靠,限制约500个LED。",
"targets.power_limit": "最大电流 (ABL)",
"targets.power_limit.hint": "将灯带的估算电流限制在电源预算内,以防止明亮/白色场景下的电压骤降(颜色偏移、闪烁、重启)。请设为电源的额定电流并留有余量。0 = 不限制。",
"targets.power_limit.ma_suffix": "mA0 = 不限制)",
"targets.power_limit.per_led": "每颗 LED 电流(全白):",
"targets.protocol.ddp": "DDP (UDP)",
"targets.protocol.ddp.desc": "快速UDP数据包 - 推荐",
"targets.protocol.udp": "WLED UDP(实时)",
"targets.protocol.udp.desc": "WLED 原生实时 — 正确的 RGBW 白色,断流时自动恢复",
"targets.protocol.http": "HTTP",
"targets.protocol.http.desc": "JSON API - 较慢,≤500 LED",
"targets.protocol.serial": "串口",
+15 -2
View File
@@ -65,27 +65,40 @@ class ApplicationRule(Rule):
@dataclass
class TimeOfDayRule(Rule):
"""Activate during a specific time range (server local time).
"""Activate during a specific time range.
Supports overnight ranges: if start_time > end_time, the range wraps
around midnight (e.g. 22:00 → 06:00).
around midnight (e.g. 22:00 → 06:00) — an overnight window belongs to the
day it *starts* on. ``days_of_week`` (0=Mon .. 6=Sun, empty = every day)
restricts which days the window is active. ``timezone`` is an IANA name
(e.g. "Europe/Berlin"); empty = the server's local time.
"""
rule_type: str = "time_of_day"
start_time: str = "00:00" # HH:MM
end_time: str = "23:59" # HH:MM
days_of_week: List[int] = field(default_factory=list) # 0=Mon..6=Sun; empty=all days
timezone: str = "" # IANA tz name; empty = server local time
def to_dict(self) -> dict:
d = super().to_dict()
d["start_time"] = self.start_time
d["end_time"] = self.end_time
d["days_of_week"] = self.days_of_week
d["timezone"] = self.timezone
return d
@classmethod
def from_dict(cls, data: dict) -> "TimeOfDayRule":
raw_days = data.get("days_of_week") or []
days = sorted(
{int(d) for d in raw_days if isinstance(d, (int, float)) and 0 <= int(d) <= 6}
)
return cls(
start_time=data.get("start_time", "00:00"),
end_time=data.get("end_time", "23:59"),
days_of_week=days,
timezone=data.get("timezone", "") or "",
)
@@ -95,6 +95,8 @@ class OutputTargetStore(BaseSqliteStore[OutputTarget]):
min_brightness_threshold: Any = 0,
adaptive_fps: bool = False,
protocol: str = "ddp",
max_milliamps: int = 0,
milliamps_per_led: int = 55,
description: str | None = None,
tags: List[str] | None = None,
# legacy compat
@@ -116,6 +118,8 @@ class OutputTargetStore(BaseSqliteStore[OutputTarget]):
min_brightness_threshold=BindableFloat.from_raw(min_brightness_threshold, default=0.0),
adaptive_fps=adaptive_fps,
protocol=protocol,
max_milliamps=max(0, int(max_milliamps or 0)),
milliamps_per_led=max(1, int(milliamps_per_led or 55)),
description=description,
created_at=now,
updated_at=now,
@@ -335,6 +339,8 @@ class OutputTargetStore(BaseSqliteStore[OutputTarget]):
min_brightness_threshold: Any = None,
adaptive_fps: bool | None = None,
protocol: str | None = None,
max_milliamps: int | None = None,
milliamps_per_led: int | None = None,
description: str | None = None,
tags: List[str] | None = None,
icon: str | None = None,
@@ -356,6 +362,8 @@ class OutputTargetStore(BaseSqliteStore[OutputTarget]):
min_brightness_threshold=min_brightness_threshold,
adaptive_fps=adaptive_fps,
protocol=protocol,
max_milliamps=max_milliamps,
milliamps_per_led=milliamps_per_led,
description=description,
tags=tags,
icon=icon,
@@ -20,6 +20,7 @@ class PostprocessingTemplate:
tags: List[str] = field(default_factory=list)
icon: str = ""
icon_color: str = ""
is_builtin: bool = False
def to_dict(self) -> dict:
"""Convert template to dictionary."""
@@ -31,6 +32,7 @@ class PostprocessingTemplate:
"updated_at": self.updated_at.isoformat(),
"description": self.description,
"tags": self.tags,
"is_builtin": self.is_builtin,
}
if self.icon:
d["icon"] = self.icon
@@ -61,4 +63,5 @@ class PostprocessingTemplate:
tags=data.get("tags", []),
icon=data.get("icon", "") or "",
icon_color=data.get("icon_color", "") or "",
is_builtin=data.get("is_builtin", False),
)
@@ -15,6 +15,57 @@ from ledgrab.utils import get_logger
logger = get_logger(__name__)
# Curated, read-only "look" presets — opinionated filter chains that give
# instant good-looking output before a user discovers the filter pipeline.
# Each entry: id-suffix -> (display name, description, [(filter_id, options), ...]).
# Only verified filters/option keys are used.
_BUILTIN_LOOKS: dict[str, tuple[str, str, list[tuple[str, dict]]]] = {
"cinematic": (
"Cinematic",
"Letterbox-aware, gently smoothed, mild colour boost — tuned for films.",
[
("auto_crop", {"threshold": 16, "min_bar_size": 20, "min_aspect_ratio": 1.4}),
("saturation", {"value": 1.12}),
("temporal_blur", {"strength": 0.35}),
],
),
"vivid": (
"Vivid",
"Punchy and responsive with high saturation — tuned for games.",
[
("saturation", {"value": 1.4}),
("contrast", {"value": 1.18}),
],
),
"cozy": (
"Cozy",
"Warm, dim and smooth — relaxed evening ambience.",
[
("color_correction", {"temperature": 3800}),
("brightness", {"value": 0.85}),
("saturation", {"value": 0.95}),
("temporal_blur", {"strength": 0.45}),
],
),
"soft": (
"Soft",
"Heavily smoothed and calm — minimises flicker on busy content.",
[
("temporal_blur", {"strength": 0.55}),
("saturation", {"value": 0.98}),
],
),
"cool": (
"Cool",
"Crisp, cool-white and clean — a modern, neutral look.",
[
("color_correction", {"temperature": 8000}),
("saturation", {"value": 1.1}),
],
),
}
class PostprocessingTemplateStore(BaseSqliteStore[PostprocessingTemplate]):
"""Storage for postprocessing templates.
@@ -29,11 +80,42 @@ class PostprocessingTemplateStore(BaseSqliteStore[PostprocessingTemplate]):
def __init__(self, db: Database):
super().__init__(db, PostprocessingTemplate.from_dict)
self._ensure_initial_template()
self._seed_missing_builtins()
# Backward-compatible aliases
get_all_templates = BaseSqliteStore.get_all
get_template = BaseSqliteStore.get
delete_template = BaseSqliteStore.delete
def _seed_missing_builtins(self) -> None:
"""Seed any curated built-in "look" templates not yet in the store."""
now = datetime.now(timezone.utc)
added = 0
for key, (name, description, chain) in _BUILTIN_LOOKS.items():
tid = f"pp_builtin_{key}"
if tid in self._items:
continue
template = PostprocessingTemplate(
id=tid,
name=name,
filters=[FilterInstance(fid, dict(opts)) for fid, opts in chain],
created_at=now,
updated_at=now,
description=description,
tags=["look"],
is_builtin=True,
)
self._items[tid] = template
self._save_item(tid, template)
added += 1
if added:
logger.info(f"Seeded {added} new built-in look templates")
def delete_template(self, template_id: str) -> None:
"""Delete a template. Built-in looks are read-only."""
template = self.get(template_id)
if getattr(template, "is_builtin", False):
raise ValueError("Built-in look templates cannot be deleted. Clone to customise.")
self.delete(template_id)
def _ensure_initial_template(self) -> None:
"""Auto-create a default postprocessing template if none exist."""
@@ -114,6 +196,9 @@ class PostprocessingTemplateStore(BaseSqliteStore[PostprocessingTemplate]):
) -> PostprocessingTemplate:
template = self.get(template_id)
if getattr(template, "is_builtin", False):
raise ValueError("Built-in look templates are read-only. Clone to customise.")
if name is not None:
self._check_name_unique(name, exclude_id=template_id)
template.name = name
@@ -24,6 +24,11 @@ class WledOutputTarget(OutputTarget, type_key="led"):
min_brightness_threshold: BindableFloat = field(default_factory=lambda: BindableFloat(0.0))
adaptive_fps: bool = False # auto-reduce FPS when device is unresponsive
protocol: str = "ddp" # "ddp" (UDP) or "http" (JSON API)
# Automatic brightness limiting (ABL): cap estimated strip draw to a PSU
# budget. max_milliamps <= 0 disables it. milliamps_per_led is the full-white
# draw of one LED (WS2812-class default 55 mA).
max_milliamps: int = 0
milliamps_per_led: int = 55
def register_with_manager(self, manager) -> None:
"""Register this WLED target with the processor manager."""
@@ -39,6 +44,8 @@ class WledOutputTarget(OutputTarget, type_key="led"):
min_brightness_threshold=self.min_brightness_threshold,
adaptive_fps=self.adaptive_fps,
protocol=self.protocol,
max_milliamps=self.max_milliamps,
milliamps_per_led=self.milliamps_per_led,
)
def sync_with_manager(
@@ -59,6 +66,8 @@ class WledOutputTarget(OutputTarget, type_key="led"):
"state_check_interval": self.state_check_interval,
"min_brightness_threshold": self.min_brightness_threshold,
"adaptive_fps": self.adaptive_fps,
"max_milliamps": self.max_milliamps,
"milliamps_per_led": self.milliamps_per_led,
},
)
if css_changed:
@@ -81,6 +90,8 @@ class WledOutputTarget(OutputTarget, type_key="led"):
min_brightness_threshold=None,
adaptive_fps=None,
protocol=None,
max_milliamps=None,
milliamps_per_led=None,
description=None,
tags: List[str] | None = None,
icon: str | None = None,
@@ -122,6 +133,10 @@ class WledOutputTarget(OutputTarget, type_key="led"):
self.adaptive_fps = adaptive_fps
if protocol is not None:
self.protocol = protocol
if max_milliamps is not None:
self.max_milliamps = max(0, int(max_milliamps))
if milliamps_per_led is not None:
self.milliamps_per_led = max(1, int(milliamps_per_led))
@property
def has_picture_source(self) -> bool:
@@ -139,6 +154,8 @@ class WledOutputTarget(OutputTarget, type_key="led"):
d["min_brightness_threshold"] = self.min_brightness_threshold.to_dict()
d["adaptive_fps"] = self.adaptive_fps
d["protocol"] = self.protocol
d["max_milliamps"] = self.max_milliamps
d["milliamps_per_led"] = self.milliamps_per_led
return d
@classmethod
@@ -165,6 +182,8 @@ class WledOutputTarget(OutputTarget, type_key="led"):
),
adaptive_fps=data.get("adaptive_fps", False),
protocol=data.get("protocol", "ddp"),
max_milliamps=int(data.get("max_milliamps", 0) or 0),
milliamps_per_led=int(data.get("milliamps_per_led", 55) or 55),
description=data.get("description"),
tags=data.get("tags", []),
icon=data.get("icon", ""),
@@ -123,6 +123,7 @@
<small class="input-hint" style="display:none" data-i18n="targets.protocol.hint">DDP sends pixels via fast UDP (recommended). HTTP uses the JSON API — slower but reliable, limited to ~500 LEDs.</small>
<select id="target-editor-protocol">
<option value="ddp">DDP (UDP)</option>
<option value="udp">WLED UDP (realtime)</option>
<option value="http">HTTP</option>
</select>
</div>
@@ -138,6 +139,22 @@
<small class="input-hint" style="display:none" data-i18n="targets.keepalive_interval.hint">How often to resend the last frame when the screen is static, to keep the device in live mode (0.5-5.0s)</small>
<input type="range" id="target-editor-keepalive-interval" min="0.5" max="5.0" step="0.5" value="1.0" oninput="document.getElementById('target-editor-keepalive-interval-value').textContent = this.value">
</div>
<div class="form-group" id="target-editor-power-limit-group">
<div class="label-row">
<label for="target-editor-max-milliamps" data-i18n="targets.power_limit">Max current (ABL):</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="targets.power_limit.hint">Caps the strip's estimated current draw to your power-supply budget to prevent brownouts (voltage sag, color shift, flicker) on bright/white scenes. Set it to your PSU's rated current, leaving some headroom. 0 = unlimited.</small>
<div class="label-row">
<input type="number" id="target-editor-max-milliamps" min="0" max="200000" step="100" value="0">
<span data-i18n="targets.power_limit.ma_suffix">mA (0 = unlimited)</span>
</div>
<div class="label-row">
<label for="target-editor-ma-per-led" data-i18n="targets.power_limit.per_led">mA per LED (full white):</label>
<input type="number" id="target-editor-ma-per-led" min="1" max="200" step="1" value="55">
</div>
</div>
</div>
</details>
</div>
+81
View File
@@ -0,0 +1,81 @@
"""Tests for built-in curated 'look' postprocessing templates."""
import pytest
from ledgrab.core.filters.registry import FilterRegistry
from ledgrab.storage.postprocessing_template import PostprocessingTemplate
from ledgrab.storage.postprocessing_template_store import (
_BUILTIN_LOOKS,
PostprocessingTemplateStore,
)
def test_builtins_are_seeded(tmp_db):
store = PostprocessingTemplateStore(tmp_db)
for key in _BUILTIN_LOOKS:
tpl = store.get_template(f"pp_builtin_{key}")
assert tpl.is_builtin is True
assert tpl.filters # non-empty chain
def test_builtin_filters_use_registered_ids(tmp_db):
store = PostprocessingTemplateStore(tmp_db)
for key in _BUILTIN_LOOKS:
tpl = store.get_template(f"pp_builtin_{key}")
for fi in tpl.filters:
assert FilterRegistry.is_registered(fi.filter_id), fi.filter_id
def test_seeding_is_idempotent(tmp_db):
PostprocessingTemplateStore(tmp_db)
store2 = PostprocessingTemplateStore(tmp_db)
ids = [t.id for t in store2.get_all_templates() if t.id.startswith("pp_builtin_")]
assert sorted(ids) == sorted(f"pp_builtin_{k}" for k in _BUILTIN_LOOKS)
def test_builtin_update_is_blocked(tmp_db):
store = PostprocessingTemplateStore(tmp_db)
with pytest.raises(ValueError, match="read-only"):
store.update_template("pp_builtin_vivid", name="Hacked")
def test_builtin_delete_is_blocked(tmp_db):
store = PostprocessingTemplateStore(tmp_db)
with pytest.raises(ValueError, match="cannot be deleted"):
store.delete_template("pp_builtin_vivid")
def test_user_template_still_editable_and_deletable(tmp_db):
store = PostprocessingTemplateStore(tmp_db)
tpl = store.create_template("My Look", filters=[])
assert tpl.is_builtin is False
store.update_template(tpl.id, description="changed")
store.delete_template(tpl.id)
with pytest.raises(ValueError):
store.get_template(tpl.id)
def test_is_builtin_round_trips_through_dict():
tpl = PostprocessingTemplate.from_dict(
{
"id": "pp_x",
"name": "x",
"filters": [],
"created_at": "2026-01-01T00:00:00+00:00",
"updated_at": "2026-01-01T00:00:00+00:00",
"is_builtin": True,
}
)
assert tpl.is_builtin is True
assert tpl.to_dict()["is_builtin"] is True
# legacy dict without the field defaults to False
legacy = PostprocessingTemplate.from_dict(
{
"id": "pp_y",
"name": "y",
"filters": [],
"created_at": "2026-01-01T00:00:00+00:00",
"updated_at": "2026-01-01T00:00:00+00:00",
}
)
assert legacy.is_builtin is False
+70
View File
@@ -0,0 +1,70 @@
"""Unit tests for automatic brightness limiting (ABL) current estimation."""
import numpy as np
import pytest
from ledgrab.core.processing.power_limit import (
DEFAULT_MILLIAMPS_PER_LED,
estimate_current_ma,
power_limit_scale,
)
def test_default_ma_per_led_constant():
assert DEFAULT_MILLIAMPS_PER_LED == 55
def test_full_white_draws_ma_per_led_times_count():
colors = np.full((100, 3), 255, dtype=np.uint8)
assert estimate_current_ma(colors, 55) == pytest.approx(100 * 55)
def test_black_draws_zero():
colors = np.zeros((100, 3), dtype=np.uint8)
assert estimate_current_ma(colors, 55) == 0.0
def test_half_white_is_half_current():
full = estimate_current_ma(np.full((100, 3), 255, dtype=np.uint8), 55)
half = estimate_current_ma(np.full((100, 3), 128, dtype=np.uint8), 55)
assert half == pytest.approx(full * 128 / 255, rel=1e-6)
def test_zero_ma_per_led_draws_zero():
colors = np.full((100, 3), 255, dtype=np.uint8)
assert estimate_current_ma(colors, 0) == 0.0
def test_empty_frame_is_safe():
colors = np.zeros((0, 3), dtype=np.uint8)
assert estimate_current_ma(colors, 55) == 0.0
assert power_limit_scale(colors, 1000, 55) == 1.0
def test_scale_is_one_when_disabled():
colors = np.full((100, 3), 255, dtype=np.uint8)
assert power_limit_scale(colors, 0, 55) == 1.0
assert power_limit_scale(colors, -1, 55) == 1.0
def test_scale_is_one_within_budget():
colors = np.full((100, 3), 255, dtype=np.uint8) # 5500 mA at 55 mA/LED
assert power_limit_scale(colors, 6000, 55) == 1.0
assert power_limit_scale(colors, 5500, 55) == 1.0 # exactly on budget
def test_scale_brings_full_white_to_budget():
colors = np.full((100, 3), 255, dtype=np.uint8) # 5500 mA
scale = power_limit_scale(colors, 2750, 55) # half budget
assert scale == pytest.approx(0.5, rel=1e-6)
def test_applying_scale_lands_within_budget():
colors = np.full((100, 3), 255, dtype=np.uint8) # 5500 mA
budget = 2750
scale = power_limit_scale(colors, budget, 55)
# Mirror the processor's fixed-point application (factor/256).
factor = int(scale * 256)
scaled = ((colors.astype(np.uint16) * factor) >> 8).astype(np.uint8)
# Fixed-point rounding can only ever round DOWN, so we never exceed budget.
assert estimate_current_ma(scaled, 55) <= budget
+78
View File
@@ -0,0 +1,78 @@
"""Tests for time-of-day automation scheduling (weekday + timezone + overnight)."""
import datetime as dt
from ledgrab.core.automations import automation_engine as ae
from ledgrab.core.automations.automation_engine import AutomationEngine, _now_in_tz
from ledgrab.storage.automation import TimeOfDayRule
_eval = AutomationEngine._evaluate_time_of_day
def _patch_now(monkeypatch, fixed: dt.datetime) -> None:
monkeypatch.setattr(ae, "_now_in_tz", lambda tz: fixed)
def test_within_window_every_day(monkeypatch):
_patch_now(monkeypatch, dt.datetime(2026, 6, 3, 20, 0))
assert _eval(TimeOfDayRule(start_time="18:00", end_time="23:00")) is True
def test_outside_window(monkeypatch):
_patch_now(monkeypatch, dt.datetime(2026, 6, 3, 12, 0))
assert _eval(TimeOfDayRule(start_time="18:00", end_time="23:00")) is False
def test_weekday_filter(monkeypatch):
fixed = dt.datetime(2026, 6, 3, 20, 0)
wd = fixed.weekday()
_patch_now(monkeypatch, fixed)
assert _eval(TimeOfDayRule("time_of_day", "18:00", "23:00", days_of_week=[wd])) is True
assert (
_eval(TimeOfDayRule("time_of_day", "18:00", "23:00", days_of_week=[(wd + 1) % 7])) is False
)
def test_overnight_evening_uses_today(monkeypatch):
fixed = dt.datetime(2026, 6, 3, 23, 0) # evening tail of a 22:00->06:00 window
wd = fixed.weekday()
_patch_now(monkeypatch, fixed)
assert _eval(TimeOfDayRule("time_of_day", "22:00", "06:00", days_of_week=[wd])) is True
assert (
_eval(TimeOfDayRule("time_of_day", "22:00", "06:00", days_of_week=[(wd + 1) % 7])) is False
)
def test_overnight_morning_uses_yesterday(monkeypatch):
fixed = dt.datetime(2026, 6, 3, 3, 0) # morning tail belongs to yesterday's window
today = fixed.weekday()
yesterday = (today - 1) % 7
_patch_now(monkeypatch, fixed)
assert _eval(TimeOfDayRule("time_of_day", "22:00", "06:00", days_of_week=[yesterday])) is True
assert _eval(TimeOfDayRule("time_of_day", "22:00", "06:00", days_of_week=[today])) is False
def test_from_dict_filters_invalid_days():
rule = TimeOfDayRule.from_dict({"days_of_week": [0, 7, -1, 3, 3, "x", 2.0]})
assert rule.days_of_week == [0, 2, 3]
def test_to_dict_round_trips_new_fields():
rule = TimeOfDayRule("time_of_day", "08:00", "20:00", days_of_week=[1, 2], timezone="UTC")
d = rule.to_dict()
assert d["days_of_week"] == [1, 2]
assert d["timezone"] == "UTC"
again = TimeOfDayRule.from_dict(d)
assert again.days_of_week == [1, 2] and again.timezone == "UTC"
def test_now_in_tz_invalid_falls_back_to_local():
assert _now_in_tz("Not/AZone").tzinfo is None
def test_now_in_tz_valid_is_aware():
assert _now_in_tz("UTC").tzinfo is not None
def test_now_in_tz_empty_is_local():
assert _now_in_tz("").tzinfo is None
+94
View File
@@ -0,0 +1,94 @@
"""Unit tests for the WLED native realtime UDP packet builder."""
import numpy as np
from ledgrab.core.devices.wled_realtime_client import (
DEFAULT_REALTIME_TIMEOUT,
WledRealtimeClient,
_clamp_timeout,
)
def _rgb(n: int) -> np.ndarray:
return np.arange(n * 3, dtype=np.uint8).reshape(n, 3)
def test_drgb_small_rgb_strip():
c = WledRealtimeClient("1.2.3.4", timeout_secs=2)
pixels = _rgb(10)
packets = c.build_packets(pixels)
assert len(packets) == 1
p = packets[0]
assert p[0] == 2 # DRGB
assert p[1] == 2 # timeout seconds
assert len(p) == 2 + 10 * 3
assert p[2:] == pixels.tobytes()
def test_drgbw_sets_explicit_white_zero():
c = WledRealtimeClient("1.2.3.4", rgbw=True, timeout_secs=5)
pixels = np.full((4, 3), 200, dtype=np.uint8)
packets = c.build_packets(pixels)
assert len(packets) == 1
p = packets[0]
assert p[0] == 3 # DRGBW
assert p[1] == 5
assert len(p) == 2 + 4 * 4
body = np.frombuffer(p[2:], dtype=np.uint8).reshape(4, 4)
assert (body[:, 0:3] == 200).all()
assert (body[:, 3] == 0).all() # white channel zeroed
def test_dnrgb_chunks_large_rgb_strip():
c = WledRealtimeClient("1.2.3.4", timeout_secs=3)
n = 1000 # > 490 -> DNRGB, > 489 per chunk -> 3 packets (489+489+22)
pixels = _rgb(n)
packets = c.build_packets(pixels)
assert len(packets) == 3
# Each packet starts with [4][timeout][start_hi][start_lo]
starts = []
total_leds = 0
for p in packets:
assert p[0] == 4 # DNRGB
assert p[1] == 3 # timeout
start = (p[2] << 8) | p[3]
starts.append(start)
leds = (len(p) - 4) // 3
total_leds += leds
assert starts == [0, 489, 978]
assert total_leds == n
def test_dnrgb_reassembles_to_original():
c = WledRealtimeClient("1.2.3.4", timeout_secs=1)
n = 700
pixels = _rgb(n)
out = bytearray()
for p in c.build_packets(pixels):
out += p[4:]
assert bytes(out) == pixels.tobytes()
def test_empty_frame_no_packets():
c = WledRealtimeClient("1.2.3.4")
assert c.build_packets(np.zeros((0, 3), dtype=np.uint8)) == []
def test_timeout_clamped_to_wire_range():
assert _clamp_timeout(0) == 1
assert _clamp_timeout(-5) == 1
assert _clamp_timeout(255) == 255
assert _clamp_timeout(1000) == 255
assert WledRealtimeClient("h", timeout_secs=0).timeout_secs == 1
def test_rgbw_over_capacity_falls_back_to_dnrgb():
# 400 RGBW LEDs (> 367) can't use DRGBW; falls back to DNRGB (RGB).
c = WledRealtimeClient("1.2.3.4", rgbw=True, timeout_secs=2)
packets = c.build_packets(_rgb(400))
assert all(p[0] == 4 for p in packets) # DNRGB
def test_default_timeout_constant():
assert DEFAULT_REALTIME_TIMEOUT == 2
assert WledRealtimeClient("h").timeout_secs == 2