feat(value-sources): add sandboxed-Jinja template combinator

A new `template` value source evaluates a hardened, sandboxed Jinja
expression over the live values of other value sources — the system's
first float combinator.

Backend:
- Shared engine (utils/template_expr.py): ImmutableSandboxedEnvironment with
  filters/tests and auto-injected globals stripped; only min/max/abs/round/
  clamp exposed; rejects **, string/collection-literal repetition, attribute
  access and non-global calls; NaN/inf-safe result coercion.
- TemplateValueSource model + TemplateValueStream runtime: compile-once,
  primitives-only eval context, raw[name] exposure, eval_interval throttle,
  ref-counted input acquire/release, rename-safe hot-update.
- Validation: unbound-variable + reserved-name rejection, reference
  cycle/depth guards (depth-only at create, full cycle at update), runtime
  acquire() depth backstop, and delete referential-integrity.
- API: Create/Update/Response schemas + discriminated unions, _RESPONSE_MAP,
  and an advisory POST /value-sources/validate-template endpoint.
- Demo seed: a static source plus a template combinator example.

Frontend:
- Editor modal section: repeatable inputs list (EntitySelect rows), a
  zero-dependency Jinja syntax highlighter, a hints/reference panel, and a
  debounced live validator that gates Save (stale-response-safe).
- Graph editor: read-only template node with one edge per input.
- i18n (en/ru/zh), icon, and card rendering.

Tests: engine, stream, factory/cycle, validate endpoint, and demo seed.
This commit is contained in:
2026-06-01 18:53:56 +03:00
parent 12b40e6071
commit 6de61b965e
30 changed files with 2805 additions and 12 deletions
+7 -2
View File
@@ -461,17 +461,22 @@ Reusable audio filter chains.
## Value sources ## Value sources
Dynamic data inputs (brightness and other parameters): static, animated, audio, adaptive, color, sensor, HTTP, and Home Assistant. Dynamic data inputs (brightness and other parameters): static, animated, audio, adaptive, color, sensor, HTTP, Home Assistant, and `template` — a sandboxed-Jinja **combinator** that evaluates an expression over the live values of other value sources.
| Method | Path | Description | | Method | Path | Description |
| ------ | ---- | ----------- | | ------ | ---- | ----------- |
| GET | `/api/v1/value-sources` | List all value sources (optional `source_type`). | | GET | `/api/v1/value-sources` | List all value sources (optional `source_type`). |
| POST | `/api/v1/value-sources` | Create a value source (discriminated by `source_type`). | | POST | `/api/v1/value-sources` | Create a value source (discriminated by `source_type`). |
| POST | `/api/v1/value-sources/validate-template` | Validate a template expression + inputs (advisory; always `200` with `{valid, error, errors, warnings, variables}`). |
| GET | `/api/v1/value-sources/{source_id}` | Get a value source by ID. | | GET | `/api/v1/value-sources/{source_id}` | Get a value source by ID. |
| PUT | `/api/v1/value-sources/{source_id}` | Update a value source; hot-reloads running streams. | | PUT | `/api/v1/value-sources/{source_id}` | Update a value source; hot-reloads running streams. |
| DELETE | `/api/v1/value-sources/{source_id}` | Delete a value source (`409` if referenced). | | DELETE | `/api/v1/value-sources/{source_id}` | Delete a value source (`400` if referenced by a target or another value source). |
| WS | `/api/v1/value-sources/{source_id}/test/ws` | Real-time value output stream (~20 Hz). | | WS | `/api/v1/value-sources/{source_id}/test/ws` | Real-time value output stream (~20 Hz). |
### Template value source (`source_type: "template"`)
A `float` combinator. Fields: `template` (a Jinja *expression*), `inputs` (`[{name, value_source_id}]` bindings to other value sources), `default_value` (fallback in `[0,1]` on any error), and `eval_interval` (optional re-eval throttle in seconds; `0`/null = every poll). At runtime each input is exposed by its `name` (the source's normalized `0..1` value) plus `raw[name]` (its un-normalized value, where available). Globals: `min`, `max`, `abs`, `round`, `clamp(x, lo=0, hi=1)`. The expression runs in a hardened `ImmutableSandboxedEnvironment` (no statements/blocks, filters, attribute access, `**`, or string repetition); results are coerced, NaN/inf-rejected, and clamped to `[0,1]`. Reference cycles and over-deep nesting are rejected at save time. For time-of-day logic, bind an `adaptive_time` or `daylight` source as an input.
## Weather sources ## Weather sources
Weather data providers feeding weather-driven value sources. Weather data providers feeding weather-driven value sources.
@@ -4,6 +4,7 @@ import asyncio
from typing import Annotated from typing import Annotated
from fastapi import APIRouter, Body, Depends, HTTPException, Query, WebSocket, WebSocketDisconnect from fastapi import APIRouter, Body, Depends, HTTPException, Query, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, Field
from ledgrab.api.auth import AuthRequired from ledgrab.api.auth import AuthRequired
from ledgrab.api.dependencies import ( from ledgrab.api.dependencies import (
@@ -27,6 +28,8 @@ from ledgrab.api.schemas.value_sources import (
StaticColorValueSourceResponse, StaticColorValueSourceResponse,
StaticValueSourceResponse, StaticValueSourceResponse,
SystemMetricsValueSourceResponse, SystemMetricsValueSourceResponse,
TemplateInput,
TemplateValueSourceResponse,
ValueSourceCreate, ValueSourceCreate,
ValueSourceListResponse, ValueSourceListResponse,
ValueSourceResponse, ValueSourceResponse,
@@ -46,6 +49,7 @@ from ledgrab.storage.value_source import (
StaticColorValueSource, StaticColorValueSource,
StaticValueSource, StaticValueSource,
SystemMetricsValueSource, SystemMetricsValueSource,
TemplateValueSource,
ValueSource, ValueSource,
) )
from ledgrab.storage.value_source_store import ValueSourceStore from ledgrab.storage.value_source_store import ValueSourceStore
@@ -231,6 +235,22 @@ _RESPONSE_MAP = {
max_value=s.max_value, max_value=s.max_value,
smoothing=s.smoothing, smoothing=s.smoothing,
), ),
TemplateValueSource: lambda s: TemplateValueSourceResponse(
id=s.id,
name=s.name,
description=s.description,
tags=s.tags,
icon=getattr(s, "icon", "") or "",
icon_color=getattr(s, "icon_color", "") or "",
created_at=s.created_at,
updated_at=s.updated_at,
template=s.template,
inputs=[
TemplateInput(name=i["name"], value_source_id=i["value_source_id"]) for i in s.inputs
],
default_value=s.default_value,
eval_interval=s.eval_interval,
),
} }
@@ -395,6 +415,13 @@ async def delete_value_source(
if getattr(target, "brightness_value_source_id", "") == source_id: if getattr(target, "brightness_value_source_id", "") == source_id:
raise ValueError(f"Cannot delete: referenced by target '{target.name}'") raise ValueError(f"Cannot delete: referenced by target '{target.name}'")
# Check if any other value source (template / gradient_map) references it.
referencing = store.find_referencing_sources(source_id)
if referencing:
raise ValueError(
"Cannot delete: referenced by value source(s) " + ", ".join(referencing)
)
store.delete_source(source_id) store.delete_source(source_id)
fire_entity_event("value_source", "deleted", source_id) fire_entity_event("value_source", "deleted", source_id)
except EntityNotFoundError as e: except EntityNotFoundError as e:
@@ -404,6 +431,100 @@ async def delete_value_source(
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
class ValidateTemplateRequest(BaseModel):
"""Request body for the advisory template-validation endpoint."""
template: str = Field(description="Jinja2 expression to validate", max_length=2000)
inputs: list[TemplateInput] = Field(default_factory=list, description="Named input bindings")
id: str | None = Field(None, description="Source id when editing (enables cycle detection)")
@router.post("/api/v1/value-sources/validate-template", tags=["Value Sources"])
async def validate_template_value_source(
payload: ValidateTemplateRequest,
_auth: AuthRequired,
store: ValueSourceStore = Depends(get_value_source_store),
):
"""Validate a template expression + inputs without persisting anything.
Advisory: always returns HTTP 200 with ``{valid, error, errors, warnings,
variables}``. Powers the live editor validator (which must run before a
source exists), reusing the exact factory/store validation so the client and
server can never disagree. ``errors`` are blocking (save disabled);
``warnings`` are non-blocking (e.g. unknown/unbound inputs — create is
lenient about those).
"""
from ledgrab.utils.template_expr import (
TemplateValidationError,
extract_variables,
validate_input_name,
validate_template_expression,
)
errors: list[str] = []
warnings: list[str] = []
# 1) Expression compiles and is safe (cost-guarded).
try:
validate_template_expression(payload.template)
except TemplateValidationError as e:
errors.append(str(e))
# 2) Input names valid / unique / non-reserved (blocking).
seen: set[str] = set()
for inp in payload.inputs:
try:
validate_input_name(inp.name)
except TemplateValidationError as e:
errors.append(str(e))
continue
if inp.name in seen:
errors.append(f"duplicate input name: {inp.name}")
seen.add(inp.name)
# 3) Referenced sources exist (non-blocking warning — create is lenient).
missing = [
inp.value_source_id
for inp in payload.inputs
if inp.value_source_id and not _source_exists(store, inp.value_source_id)
]
if missing:
warnings.append("unknown value source(s): " + ", ".join(sorted(set(missing))))
# 4) Variables referenced in the expression but not bound to an input
# (blocking): at runtime they raise UndefinedError, so the template would
# silently always return default_value. This is almost always a typo, so
# flag it as an error rather than letting "valid" mislead the user.
used = set(extract_variables(payload.template))
undeclared = used - seen
if undeclared:
errors.append("unbound variable(s): " + ", ".join(sorted(undeclared)))
# 5) Cycle check when editing an existing source (blocking).
if payload.id:
child_ids = [i.value_source_id for i in payload.inputs if i.value_source_id]
try:
store.validate_nesting(payload.id, child_ids)
except ValueError as e:
errors.append(str(e))
return {
"valid": not errors,
"error": errors[0] if errors else None,
"errors": errors,
"warnings": warnings,
"variables": extract_variables(payload.template),
}
def _source_exists(store: ValueSourceStore, source_id: str) -> bool:
try:
store.get_source(source_id)
return True
except Exception:
return False
# ===== REAL-TIME VALUE SOURCE TEST WEBSOCKET ===== # ===== REAL-TIME VALUE SOURCE TEST WEBSOCKET =====
@@ -10,6 +10,17 @@ from pydantic import BaseModel, Discriminator, Field, Tag
# ===================================================================== # =====================================================================
class TemplateInput(BaseModel):
"""A single ``{name -> value_source_id}`` binding for a template source."""
name: str = Field(
description="Variable name used in the expression (valid identifier)",
min_length=1,
max_length=64,
)
value_source_id: str = Field("", description="Bound value source ID (empty = unbound)")
class _ValueSourceResponseBase(BaseModel): class _ValueSourceResponseBase(BaseModel):
"""Shared fields for all value source responses.""" """Shared fields for all value source responses."""
@@ -162,6 +173,19 @@ class HTTPValueSourceResponse(_ValueSourceResponseBase):
smoothing: float = Field(description="EMA smoothing factor (0.0-1.0)") smoothing: float = Field(description="EMA smoothing factor (0.0-1.0)")
class TemplateValueSourceResponse(_ValueSourceResponseBase):
source_type: Literal["template"] = "template"
return_type: Literal["float"] = "float"
template: str = Field(description="Jinja2 expression")
inputs: List[TemplateInput] = Field(
default_factory=list, description="Named value-source bindings"
)
default_value: float = Field(description="Fallback when the expression errors (0.0-1.0)")
eval_interval: float | None = Field(
None, description="Re-eval throttle in seconds (None/0 = every poll)"
)
ValueSourceResponse = Annotated[ ValueSourceResponse = Annotated[
Annotated[StaticValueSourceResponse, Tag("static")] Annotated[StaticValueSourceResponse, Tag("static")]
| Annotated[AnimatedValueSourceResponse, Tag("animated")] | Annotated[AnimatedValueSourceResponse, Tag("animated")]
@@ -176,7 +200,8 @@ ValueSourceResponse = Annotated[
| Annotated[GradientMapValueSourceResponse, Tag("gradient_map")] | Annotated[GradientMapValueSourceResponse, Tag("gradient_map")]
| Annotated[CSSExtractValueSourceResponse, Tag("css_extract")] | Annotated[CSSExtractValueSourceResponse, Tag("css_extract")]
| Annotated[SystemMetricsValueSourceResponse, Tag("system_metrics")] | Annotated[SystemMetricsValueSourceResponse, Tag("system_metrics")]
| Annotated[HTTPValueSourceResponse, Tag("http")], | Annotated[HTTPValueSourceResponse, Tag("http")]
| Annotated[TemplateValueSourceResponse, Tag("template")],
Discriminator("source_type"), Discriminator("source_type"),
] ]
@@ -330,6 +355,27 @@ class HTTPValueSourceCreate(_ValueSourceCreateBase):
smoothing: float = Field(0.0, description="EMA smoothing (0.0-1.0)", ge=0.0, le=1.0) smoothing: float = Field(0.0, description="EMA smoothing (0.0-1.0)", ge=0.0, le=1.0)
class TemplateValueSourceCreate(_ValueSourceCreateBase):
source_type: Literal["template"] = "template"
template: str = Field(
description=(
"Jinja2 expression (no statements/blocks). Inputs are exposed by name and via "
"raw[name]; globals: min, max, abs, round, clamp(x, lo=0, hi=1)."
),
min_length=1,
max_length=2000,
)
inputs: List[TemplateInput] = Field(
default_factory=list, description="Named value-source bindings"
)
default_value: float = Field(
0.0, description="Fallback when the expression errors (0.0-1.0)", ge=0.0, le=1.0
)
eval_interval: float | None = Field(
None, description="Re-eval throttle in seconds (None/0 = every poll)", ge=0.0
)
ValueSourceCreate = Annotated[ ValueSourceCreate = Annotated[
Annotated[StaticValueSourceCreate, Tag("static")] Annotated[StaticValueSourceCreate, Tag("static")]
| Annotated[AnimatedValueSourceCreate, Tag("animated")] | Annotated[AnimatedValueSourceCreate, Tag("animated")]
@@ -344,7 +390,8 @@ ValueSourceCreate = Annotated[
| Annotated[GradientMapValueSourceCreate, Tag("gradient_map")] | Annotated[GradientMapValueSourceCreate, Tag("gradient_map")]
| Annotated[CSSExtractValueSourceCreate, Tag("css_extract")] | Annotated[CSSExtractValueSourceCreate, Tag("css_extract")]
| Annotated[SystemMetricsValueSourceCreate, Tag("system_metrics")] | Annotated[SystemMetricsValueSourceCreate, Tag("system_metrics")]
| Annotated[HTTPValueSourceCreate, Tag("http")], | Annotated[HTTPValueSourceCreate, Tag("http")]
| Annotated[TemplateValueSourceCreate, Tag("template")],
Discriminator("source_type"), Discriminator("source_type"),
] ]
@@ -490,6 +537,20 @@ class HTTPValueSourceUpdate(_ValueSourceUpdateBase):
smoothing: float | None = Field(None, description="EMA smoothing", ge=0.0, le=1.0) smoothing: float | None = Field(None, description="EMA smoothing", ge=0.0, le=1.0)
class TemplateValueSourceUpdate(_ValueSourceUpdateBase):
source_type: Literal["template"] = "template"
template: str | None = Field(
None, description="Jinja2 expression", min_length=1, max_length=2000
)
inputs: List[TemplateInput] | None = Field(None, description="Named value-source bindings")
default_value: float | None = Field(
None, description="Fallback when the expression errors (0.0-1.0)", ge=0.0, le=1.0
)
eval_interval: float | None = Field(
None, description="Re-eval throttle in seconds (0 = every poll)", ge=0.0
)
ValueSourceUpdate = Annotated[ ValueSourceUpdate = Annotated[
Annotated[StaticValueSourceUpdate, Tag("static")] Annotated[StaticValueSourceUpdate, Tag("static")]
| Annotated[AnimatedValueSourceUpdate, Tag("animated")] | Annotated[AnimatedValueSourceUpdate, Tag("animated")]
@@ -504,7 +565,8 @@ ValueSourceUpdate = Annotated[
| Annotated[GradientMapValueSourceUpdate, Tag("gradient_map")] | Annotated[GradientMapValueSourceUpdate, Tag("gradient_map")]
| Annotated[CSSExtractValueSourceUpdate, Tag("css_extract")] | Annotated[CSSExtractValueSourceUpdate, Tag("css_extract")]
| Annotated[SystemMetricsValueSourceUpdate, Tag("system_metrics")] | Annotated[SystemMetricsValueSourceUpdate, Tag("system_metrics")]
| Annotated[HTTPValueSourceUpdate, Tag("http")], | Annotated[HTTPValueSourceUpdate, Tag("http")]
| Annotated[TemplateValueSourceUpdate, Tag("template")],
Discriminator("source_type"), Discriminator("source_type"),
] ]
+40
View File
@@ -40,6 +40,11 @@ _AS_IDS = {
"system": "as_demo0001", "system": "as_demo0001",
} }
_VS_IDS = {
"level": "vs_demo0001",
"boost": "vs_demo0002",
}
_TPL_ID = "tpl_demo0001" _TPL_ID = "tpl_demo0001"
_SCENE_ID = "scene_demo0001" _SCENE_ID = "scene_demo0001"
@@ -86,6 +91,7 @@ def seed_demo_data(db: Database) -> None:
_insert_entities(db, "picture_sources", _build_picture_sources()) _insert_entities(db, "picture_sources", _build_picture_sources())
_insert_entities(db, "color_strip_sources", _build_color_strip_sources()) _insert_entities(db, "color_strip_sources", _build_color_strip_sources())
_insert_entities(db, "audio_sources", _build_audio_sources()) _insert_entities(db, "audio_sources", _build_audio_sources())
_insert_entities(db, "value_sources", _build_value_sources())
_insert_entities(db, "scene_presets", _build_scene_presets()) _insert_entities(db, "scene_presets", _build_scene_presets())
logger.info("Demo seed data complete") logger.info("Demo seed data complete")
@@ -334,6 +340,40 @@ def _build_audio_sources() -> dict:
} }
# ── Value Sources ──────────────────────────────────────────────────
def _build_value_sources() -> dict:
"""A static float source plus a template combinator that references it,
so demo mode showcases the Jinja template value source out of the box."""
return {
_VS_IDS["level"]: {
"id": _VS_IDS["level"],
"name": "Base Level",
"source_type": "static",
"description": "A constant brightness level (demo input for the template below)",
"tags": ["demo"],
"value": 0.5,
"created_at": _NOW,
"updated_at": _NOW,
},
_VS_IDS["boost"]: {
"id": _VS_IDS["boost"],
"name": "Boosted Level (template)",
"source_type": "template",
"return_type": "float",
"description": "Jinja combinator: clamps 1.5x the Base Level into [0,1]",
"tags": ["demo"],
"template": "clamp(level * 1.5)",
"inputs": [{"name": "level", "value_source_id": _VS_IDS["level"]}],
"default_value": 0.0,
"eval_interval": None,
"created_at": _NOW,
"updated_at": _NOW,
},
}
# ── Scene Presets ────────────────────────────────────────────────── # ── Scene Presets ──────────────────────────────────────────────────
@@ -267,6 +267,20 @@ def _build_http(source, d: ValueStreamDeps):
) )
def _build_template(source, d: ValueStreamDeps):
# References other value sources via d.value_stream_manager (recursively
# acquired in start()), exactly like _build_gradient_map.
from ledgrab.core.processing.value_stream import TemplateValueStream
return TemplateValueStream(
template=source.template,
inputs=source.inputs,
default_value=source.default_value,
eval_interval=source.eval_interval,
value_stream_manager=d.value_stream_manager,
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Registry # Registry
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -290,6 +304,7 @@ STREAM_BUILDERS: dict[str, StreamBuilder] = {
"system_metrics": _build_system_metrics, "system_metrics": _build_system_metrics,
"game_event": _build_game_event, "game_event": _build_game_event,
"http": _build_http, "http": _build_http,
"template": _build_template,
} }
@@ -34,6 +34,11 @@ import numpy as np
from ledgrab.core.processing import metric_readers as _metric_readers from ledgrab.core.processing import metric_readers as _metric_readers
from ledgrab.storage.base_store import EntityNotFoundError from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.utils import get_logger from ledgrab.utils import get_logger
from ledgrab.utils.template_expr import (
TemplateValidationError,
compile_template,
finalize_result,
)
# Compiled once — used by ``_extract_simple_path`` on every poll. # Compiled once — used by ``_extract_simple_path`` on every poll.
_NAME_HEAD_RE = re.compile(r"^([^\[]*)") _NAME_HEAD_RE = re.compile(r"^([^\[]*)")
@@ -53,6 +58,12 @@ if TYPE_CHECKING:
logger = get_logger(__name__) logger = get_logger(__name__)
# Runtime cap on recursive value-stream acquisition (referencing sources like
# template / gradient_map re-enter acquire() from start()). Higher than the
# storage-level MAX_VALUE_SOURCE_DEPTH (8) so legitimate chains never trip it;
# it only fires on a cycle that bypassed storage validation.
_MAX_ACQUIRE_DEPTH = 12
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Base class # Base class
@@ -1365,6 +1376,168 @@ class GradientMapValueStream(ValueStream):
self._inner_stream = None self._inner_stream = None
# ---------------------------------------------------------------------------
# Template (Jinja expression combinator)
# ---------------------------------------------------------------------------
class TemplateValueStream(ValueStream):
"""Evaluates a hardened sandboxed-Jinja expression over the live values of
other value sources (the system's float combinator).
Acquires each referenced input stream from the manager on ``start()`` and
releases it on ``stop()`` — the same ref-counted protocol as
:class:`GradientMapValueStream`, but over a *set* of inputs. Acquisition is
tracked per unique ``value_source_id`` so two variables bound to the same
source share one ref. ``get_value()`` builds a primitives-only context
(each input's normalized ``get_value()`` plus a float-only ``raw`` dict),
evaluates the compiled expression, then coerces / NaN-guards / clamps the
result. Any error — or an uncompilable template — falls back to
``default_value``. An optional ``eval_interval`` caches the last result to
bound steady-state evaluation cost.
"""
def __init__(
self,
template: str,
inputs: List[dict],
default_value: float = 0.0,
eval_interval: float | None = None,
value_stream_manager: "ValueStreamManager" | None = None,
):
self._template = template
self._inputs = [dict(i) for i in (inputs or [])]
self._default = max(0.0, min(1.0, float(default_value)))
self._eval_interval = float(eval_interval) if eval_interval else 0.0
self._vsm = value_stream_manager
self._streams_by_id: Dict[str, ValueStream] = {} # value_source_id -> stream
self._expr = self._compile(template)
self._last_value: float = self._default
self._last_eval: float = 0.0
self._has_value = False
self._error_logged = False
@staticmethod
def _compile(template: str):
"""Compile once; return ``None`` (→ always default) on invalid template.
Creation should already have rejected invalid templates via the factory;
this is defense in depth so a bad row never crashes the engine.
"""
try:
return compile_template(template)
except TemplateValidationError as e:
logger.warning("TemplateValueStream: invalid template, using default (%s)", e)
return None
@staticmethod
def _unique_ids(inputs: List[dict]) -> set:
return {i["value_source_id"] for i in inputs if i.get("value_source_id")}
def start(self) -> None:
if not self._vsm:
return
for vs_id in self._unique_ids(self._inputs):
try:
self._streams_by_id[vs_id] = self._vsm.acquire(vs_id)
except Exception as e:
logger.warning("TemplateValueStream: failed to acquire input %s: %s", vs_id, e)
def stop(self) -> None:
if self._vsm:
for vs_id in list(self._streams_by_id):
try:
self._vsm.release(vs_id)
except Exception as e:
logger.debug("TemplateValueStream: release %s failed: %s", vs_id, e)
self._streams_by_id.clear()
self._has_value = False
def get_value(self) -> float:
if self._expr is None:
return self._default
if (
self._eval_interval > 0.0
and self._has_value
and (time.monotonic() - self._last_eval) < self._eval_interval
):
return self._last_value
try:
ctx: Dict[str, Any] = {}
raw: Dict[str, float] = {}
for inp in self._inputs:
name = inp.get("name")
vs_id = inp.get("value_source_id")
if not name or not vs_id:
continue
stream = self._streams_by_id.get(vs_id)
if stream is None:
continue
ctx[name] = float(stream.get_value())
getter = getattr(stream, "get_raw_value", None)
if getter is not None:
rv = getter()
if rv is not None:
try:
raw[name] = float(rv)
except (TypeError, ValueError):
# Non-numeric raw values never cross into the sandbox.
pass
ctx["raw"] = raw
# Globals (min/max/abs/round/clamp) resolve from SANDBOX_ENV.globals.
value = finalize_result(self._expr(**ctx), self._default)
except Exception as e:
if not self._error_logged:
logger.warning("TemplateValueStream eval error (using default): %s", e)
self._error_logged = True
value = self._default
self._last_value = value
self._last_eval = time.monotonic()
self._has_value = True
return value
def update_source(self, source: "ValueSource") -> None:
from ledgrab.storage.value_source import TemplateValueSource
if not isinstance(source, TemplateValueSource):
return
if source.template != self._template:
self._template = source.template
self._expr = self._compile(source.template)
self._error_logged = False
self._default = max(0.0, min(1.0, float(source.default_value)))
self._eval_interval = float(source.eval_interval) if source.eval_interval else 0.0
new_inputs = [dict(i) for i in (source.inputs or [])]
old_ids = set(self._streams_by_id)
new_ids = self._unique_ids(new_inputs)
if self._vsm:
# Release-before-acquire (mirrors GradientMapValueStream); safe under
# ref-counting. Unchanged ids keep their existing stream untouched.
for vs_id in old_ids - new_ids:
try:
self._vsm.release(vs_id)
except Exception as e:
logger.debug("TemplateValueStream: release %s failed: %s", vs_id, e)
self._streams_by_id.pop(vs_id, None)
for vs_id in new_ids - old_ids:
try:
self._streams_by_id[vs_id] = self._vsm.acquire(vs_id)
except Exception as e:
logger.warning("TemplateValueStream: acquire %s failed: %s", vs_id, e)
# Rebuild inputs (re-keys variable names on rename even when id unchanged,
# since get_value() maps name -> stream via value_source_id each tick).
self._inputs = new_inputs
self._has_value = False
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# CSS Extract # CSS Extract
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -1644,6 +1817,10 @@ class ValueStreamManager:
self._http_endpoint_store = http_endpoint_store self._http_endpoint_store = http_endpoint_store
self._streams: Dict[str, ValueStream] = {} # vs_id → stream self._streams: Dict[str, ValueStream] = {} # vs_id → stream
self._ref_counts: Dict[str, int] = {} # vs_id → ref count self._ref_counts: Dict[str, int] = {} # vs_id → ref count
# Recursion-depth backstop for referencing sources (template / gradient
# map). A cycle that slipped past storage validation (e.g. a hand-edited
# DB or restored backup) would otherwise overflow the stack at acquire().
self._acquire_depth = 0
# Tracks which clock_id (if any) was acquired for each stream so we # Tracks which clock_id (if any) was acquired for each stream so we
# can release/swap it without re-querying the store at teardown time. # can release/swap it without re-querying the store at teardown time.
self._stream_clock_ids: Dict[str, str] = {} # vs_id → clock_id self._stream_clock_ids: Dict[str, str] = {} # vs_id → clock_id
@@ -1659,9 +1836,29 @@ class ValueStreamManager:
logger.info(f"Shared value stream {vs_id} (refs={self._ref_counts[vs_id]})") logger.info(f"Shared value stream {vs_id} (refs={self._ref_counts[vs_id]})")
return self._streams[vs_id] return self._streams[vs_id]
if self._acquire_depth >= _MAX_ACQUIRE_DEPTH:
logger.warning(
"Value source acquire depth limit (%d) reached at %s; returning "
"static fallback (possible reference cycle)",
_MAX_ACQUIRE_DEPTH,
vs_id,
)
# The intermediate referencing streams built while descending a
# cyclic chain are not stop()'d here — but this only triggers on a
# stored cycle that storage validation already rejects (e.g. a
# hand-edited DB / corrupt restore), so those transient objects are
# simply garbage-collected. Normal graphs never reach this depth.
return StaticValueStream(0.5)
source = self._value_source_store.get_source(vs_id) source = self._value_source_store.get_source(vs_id)
stream = self._create_stream(source, vs_id) # Increment around create+start: a referencing stream (template /
stream.start() # gradient_map) re-enters acquire() from its own start().
self._acquire_depth += 1
try:
stream = self._create_stream(source, vs_id)
stream.start()
finally:
self._acquire_depth -= 1
self._streams[vs_id] = stream self._streams[vs_id] = stream
self._ref_counts[vs_id] = 1 self._ref_counts[vs_id] = 1
logger.info(f"Acquired value stream {vs_id} (type={source.source_type})") logger.info(f"Acquired value stream {vs_id} (type={source.source_type})")
@@ -298,6 +298,214 @@ select.field-invalid {
line-height: 1.3; line-height: 1.3;
} }
.field-ok-msg {
display: flex;
align-items: center;
gap: 4px;
color: var(--success-color);
font-size: 0.78rem;
margin-top: 4px;
line-height: 1.3;
}
.field-ok-msg .icon { width: 14px; height: 14px; }
.field-error-msg .icon { width: 14px; height: 14px; vertical-align: -2px; margin-right: 3px; }
.field-warn-msg {
display: block;
color: var(--text-muted);
font-size: 0.76rem;
margin-top: 4px;
line-height: 1.35;
}
/* ── Jinja expression editor ─────────────────────────────────────
A transparent <textarea> layered over a synced highlight <pre>.
Both share identical type metrics so the colour layer aligns with
the typed glyphs. The shared box rules below MUST stay in sync. */
.jinja-editor {
position: relative;
display: block;
border: 1px solid var(--border-color);
border-radius: var(--radius-sm);
background: var(--surface-2, color-mix(in srgb, var(--text-color) 6%, var(--bg-color)));
overflow: hidden;
transition: border-color 0.15s ease, box-shadow 0.15s ease;
}
.jinja-editor:focus-within {
border-color: var(--primary-color);
box-shadow: 0 0 0 2px color-mix(in srgb, var(--primary-color) 18%, transparent);
}
.jinja-editor.field-invalid {
border-color: var(--danger-color);
box-shadow: 0 0 0 2px color-mix(in srgb, var(--danger-color) 15%, transparent);
}
/* Shared metrics — applied identically to both layers. */
.jinja-hl,
.jinja-input {
margin: 0;
padding: 10px 12px;
font-family: var(--font-mono);
font-size: 0.86rem;
line-height: 1.55;
letter-spacing: 0;
tab-size: 2;
white-space: pre;
word-wrap: normal;
overflow: auto;
box-sizing: border-box;
}
.jinja-hl {
position: absolute;
inset: 0;
pointer-events: none;
color: var(--text-color);
overflow: hidden; /* scroll is mirrored from the textarea */
user-select: none;
}
.jinja-input {
position: relative;
display: block;
width: 100%;
min-height: 4.6em; /* ~3 lines */
resize: vertical;
border: none;
background: transparent;
color: transparent; /* glyphs are painted by .jinja-hl underneath */
caret-color: var(--text-color);
outline: none;
}
/* The global `textarea:focus` rule (higher specificity than `.jinja-input`)
sets an opaque background; on this overlay editor that would cover the
`.jinja-hl` highlight layer and hide the transparent glyphs on focus. Keep
the textarea fully transparent — the focus ring is drawn by the wrapper's
`.jinja-editor:focus-within`. */
.jinja-input:focus {
background: transparent;
color: transparent;
box-shadow: none;
}
.jinja-input::selection { background: color-mix(in srgb, var(--primary-color) 30%, transparent); }
/* Token palette — restrained, three accents plus muted operators. */
.jinja-hl .tok-str { color: var(--success-color); }
.jinja-hl .tok-num { color: #d19a66; }
.jinja-hl .tok-fn { color: var(--primary-color); font-weight: 600; }
.jinja-hl .tok-raw { color: #c678dd; font-style: italic; }
.jinja-hl .tok-var { color: #61afef; }
.jinja-hl .tok-op { color: var(--text-muted); }
/* ── Template-input rows ─────────────────────────────────────── */
.template-inputs-list {
display: flex;
flex-direction: column;
gap: 8px;
margin-bottom: 8px;
}
.template-input-row {
display: grid;
grid-template-columns: minmax(96px, 0.42fr) minmax(0, 1fr) auto;
align-items: center;
gap: 8px;
}
.template-input-name {
font-family: var(--font-mono);
font-size: 0.84rem;
}
.template-input-row .entity-select-trigger { width: 100%; }
.template-inputs-empty {
color: var(--text-muted);
font-size: 0.82rem;
padding: 6px 2px;
}
/* ── Expression hints panel ──────────────────────────────────── */
.jinja-hints {
margin-top: 10px;
border: 1px solid var(--border-color);
border-radius: var(--radius-sm);
background: color-mix(in srgb, var(--text-color) 3%, transparent);
}
.jinja-hints > summary {
cursor: pointer;
list-style: none;
padding: 8px 12px;
font-size: 0.8rem;
font-weight: 600;
color: var(--text-secondary);
user-select: none;
}
.jinja-hints > summary::-webkit-details-marker { display: none; }
.jinja-hints > summary::before {
content: '';
display: inline-block;
margin-right: 6px;
transition: transform 0.15s ease;
color: var(--text-muted);
}
.jinja-hints[open] > summary::before { transform: rotate(90deg); }
.jinja-hints-body {
padding: 4px 14px 12px;
font-size: 0.8rem;
line-height: 1.5;
color: var(--text-secondary);
}
.jinja-hints-body code,
.jinja-hints-body .jinja-hints-vars code {
font-family: var(--font-mono);
font-size: 0.78rem;
padding: 1px 5px;
border-radius: var(--radius-sm);
background: color-mix(in srgb, var(--text-color) 8%, transparent);
color: var(--text-color);
}
.jinja-hints-section { margin-top: 8px; }
.jinja-hints-section-title {
display: block;
font-size: 0.72rem;
text-transform: uppercase;
letter-spacing: 0.06em;
color: var(--text-muted);
margin-bottom: 3px;
}
.jinja-hints-vars {
display: flex;
flex-wrap: wrap;
gap: 5px;
}
.jinja-hints-vars .tok-var-chip {
font-family: var(--font-mono);
font-size: 0.76rem;
padding: 1px 6px;
border-radius: var(--radius-pill);
background: color-mix(in srgb, #61afef 16%, transparent);
color: #61afef;
}
.jinja-hints-examples { margin: 4px 0 0; padding-left: 0; list-style: none; }
.jinja-hints-examples li { margin: 3px 0; }
.jinja-hints-empty { color: var(--text-muted); font-size: 0.76rem; font-style: italic; }
.jinja-hints-time { color: var(--text-muted); font-size: 0.76rem; }
/* Remove browser autofill styling */ /* Remove browser autofill styling */
input:-webkit-autofill, input:-webkit-autofill,
input:-webkit-autofill:hover, input:-webkit-autofill:hover,
+2
View File
@@ -187,6 +187,7 @@ import {
addSchedulePoint, addSchedulePoint,
addAnimatedColor, removeAnimatedColor, addAnimatedColor, removeAnimatedColor,
addColorSchedulePoint, removeColorSchedulePoint, addColorSchedulePoint, removeColorSchedulePoint,
addTemplateInput,
testValueSource, closeTestValueSourceModal, testValueSource, closeTestValueSourceModal,
} from './features/value-sources.ts'; } from './features/value-sources.ts';
@@ -584,6 +585,7 @@ Object.assign(window, {
removeAnimatedColor, removeAnimatedColor,
addColorSchedulePoint, addColorSchedulePoint,
removeColorSchedulePoint, removeColorSchedulePoint,
addTemplateInput,
testValueSource, testValueSource,
closeTestValueSourceModal, closeTestValueSourceModal,
@@ -176,6 +176,11 @@ const CONNECTION_MAP: ConnectionEntry[] = [
{ targetKind: 'value_source', field: 'value_source_id', sourceKind: 'value_source', edgeType: 'value', endpoint: '/value-sources/{id}', cache: valueSourcesCache }, { targetKind: 'value_source', field: 'value_source_id', sourceKind: 'value_source', edgeType: 'value', endpoint: '/value-sources/{id}', cache: valueSourcesCache },
{ targetKind: 'value_source', field: 'gradient_id', sourceKind: 'gradient', edgeType: 'gradient', endpoint: '/value-sources/{id}', cache: valueSourcesCache }, { targetKind: 'value_source', field: 'gradient_id', sourceKind: 'gradient', edgeType: 'gradient', endpoint: '/value-sources/{id}', cache: valueSourcesCache },
{ targetKind: 'value_source', field: 'color_strip_source_id', sourceKind: 'color_strip_source', edgeType: 'colorstrip', endpoint: '/value-sources/{id}', cache: valueSourcesCache }, { targetKind: 'value_source', field: 'color_strip_source_id', sourceKind: 'color_strip_source', edgeType: 'colorstrip', endpoint: '/value-sources/{id}', cache: valueSourcesCache },
// TODO: template.inputs[] drag-wiring — template value sources reference one
// inner value source per bound input (field path inputs[<name>].value_source_id).
// These render as read-only 'value' edges in graph-layout for now; a list-aware
// CONNECTION_MAP entry (with list/index/ref slot metadata) would make them
// re-wirable from the graph the way composite layers / mapped zones are.
// Color strip sources // Color strip sources
{ targetKind: 'color_strip_source', field: 'picture_source_id', sourceKind: 'picture_source', edgeType: 'picture', endpoint: '/color-strip-sources/{id}', cache: colorStripSourcesCache }, { targetKind: 'color_strip_source', field: 'picture_source_id', sourceKind: 'picture_source', edgeType: 'picture', endpoint: '/color-strip-sources/{id}', cache: colorStripSourcesCache },
@@ -357,6 +357,16 @@ function buildGraph(e: EntitiesInput): { nodes: LayoutNode[]; edges: LayoutEdge[
// references (and drag-editable) — render them so they're visible. // references (and drag-editable) — render them so they're visible.
if (s.value_source_id) addEdge(s.value_source_id, s.id, 'value_source_id'); if (s.value_source_id) addEdge(s.value_source_id, s.id, 'value_source_id');
if (s.color_strip_source_id) addEdge(s.color_strip_source_id, s.id, 'color_strip_source_id'); if (s.color_strip_source_id) addEdge(s.color_strip_source_id, s.id, 'color_strip_source_id');
// Template value sources reference one inner value source per bound input.
// Each `inputs[].value_source_id` is a real 'value' edge; the dotted field
// path marks it non-editable (drag-wiring deferred — see graph-connections).
if (s.source_type === 'template' && Array.isArray(s.inputs)) {
s.inputs.forEach((inp: any) => {
if (inp?.value_source_id) {
addEdge(inp.value_source_id, s.id, `inputs[${inp.name}].value_source_id`);
}
});
}
} }
// Color strip source edges // Color strip source edges
@@ -106,6 +106,7 @@ const SUBTYPE_ICONS = {
value_source: { value_source: {
static: P.layoutDashboard, animated: P.refreshCw, audio: P.music, static: P.layoutDashboard, animated: P.refreshCw, audio: P.music,
adaptive_time: P.clock, adaptive_scene: P.cloudSun, daylight: P.sun, adaptive_time: P.clock, adaptive_scene: P.cloudSun, daylight: P.sun,
template: P.code,
}, },
audio_source: { capture: P.volume2, processed: P.slidersHorizontal }, audio_source: { capture: P.volume2, processed: P.slidersHorizontal },
output_target: { led: P.lightbulb, wled: P.lightbulb, ha_light: P.lightbulb }, output_target: { led: P.lightbulb, wled: P.lightbulb, ha_light: P.lightbulb },
@@ -43,6 +43,7 @@ const _valueSourceTypeIcons = {
system_metrics: _svg(P.cpu), system_metrics: _svg(P.cpu),
game_event: _svg(P.gamepad2), game_event: _svg(P.gamepad2),
http: _svg(P.globe), http: _svg(P.globe),
template: _svg(P.code),
}; };
const _audioSourceTypeIcons = { capture: _svg(P.volume2), processed: _svg(P.slidersHorizontal) }; const _audioSourceTypeIcons = { capture: _svg(P.volume2), processed: _svg(P.slidersHorizontal) };
const _deviceTypeIcons = { const _deviceTypeIcons = {
@@ -0,0 +1,191 @@
/**
* Tiny zero-dependency Jinja-expression highlighter.
*
* A transparent <textarea> is layered over a synced <pre class="jinja-hl">.
* On every input the text is re-tokenised and painted into the <pre> so the
* caret and selection stay native while the colours live underneath. The two
* layers share identical font metrics (set in CSS via --font-mono) so the
* highlight aligns pixel-perfectly with the typed glyphs.
*
* Tokenised: strings, numbers, the sandbox globals (min|max|abs|round|clamp),
* the `raw` keyword, bound input variable names (supplied live via
* getInputNames), and operators. Everything else renders as plain text.
*
* Usage:
* const ed = create({ textarea, getInputNames: () => ['audio','temp'], onChange });
* ed.refresh(); // re-paint after the input list or value changes externally
* ed.destroy();
*/
/** Globals available inside the sandboxed expression (see backend contract). */
const JINJA_GLOBALS = new Set(['min', 'max', 'abs', 'round', 'clamp']);
const JINJA_RAW = 'raw';
export interface JinjaEditorOpts {
textarea: HTMLTextAreaElement;
getInputNames: () => string[];
onChange?: (value: string) => void;
}
export interface JinjaEditorHandle {
/** Re-paint the highlight layer (e.g. after the bound-input list changed). */
refresh: () => void;
/** Detach listeners and remove the highlight overlay. */
destroy: () => void;
}
function escapeHtml(s: string): string {
return s
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;');
}
type TokenKind = 'str' | 'num' | 'fn' | 'raw' | 'var' | 'op' | 'text';
interface Token {
kind: TokenKind;
text: string;
}
/**
* Tokenise a Jinja expression. Deliberately small — this is presentational
* only; the backend is the source of truth for validity.
*/
function tokenize(src: string, inputNames: Set<string>): Token[] {
const tokens: Token[] = [];
const n = src.length;
let i = 0;
const push = (kind: TokenKind, text: string) => {
// Coalesce consecutive plain-text runs to keep the DOM tiny.
const last = tokens[tokens.length - 1];
if (kind === 'text' && last && last.kind === 'text') last.text += text;
else tokens.push({ kind, text });
};
while (i < n) {
const ch = src[i];
// Strings — single or double quoted, with simple escape passthrough.
if (ch === '"' || ch === "'") {
const quote = ch;
let j = i + 1;
while (j < n && src[j] !== quote) {
if (src[j] === '\\' && j + 1 < n) j += 2;
else j += 1;
}
j = Math.min(j + 1, n); // include closing quote if present
push('str', src.slice(i, j));
i = j;
continue;
}
// Numbers — integer / float.
if (ch >= '0' && ch <= '9') {
let j = i + 1;
while (j < n && /[0-9._]/.test(src[j])) j += 1;
push('num', src.slice(i, j));
i = j;
continue;
}
// Identifiers — globals, the `raw` keyword, bound input names, or plain.
if (/[A-Za-z_]/.test(ch)) {
let j = i + 1;
while (j < n && /[A-Za-z0-9_]/.test(src[j])) j += 1;
const word = src.slice(i, j);
if (JINJA_GLOBALS.has(word)) push('fn', word);
else if (word === JINJA_RAW) push('raw', word);
else if (inputNames.has(word)) push('var', word);
else push('text', word);
i = j;
continue;
}
// Operators / punctuation.
if ('+-*/%()[]<>=!,&|?:'.includes(ch)) {
push('op', ch);
i += 1;
continue;
}
// Whitespace and everything else.
push('text', ch);
i += 1;
}
return tokens;
}
function render(src: string, inputNames: Set<string>): string {
// A trailing newline is swallowed by <pre>; pad it so the highlight box
// keeps the same height as the textarea while typing a fresh line.
const padded = src.endsWith('\n') ? src + ' ' : src;
const html = tokenize(padded, inputNames)
.map(tok =>
tok.kind === 'text'
? escapeHtml(tok.text)
: `<span class="tok-${tok.kind}">${escapeHtml(tok.text)}</span>`,
)
.join('');
return html;
}
export function create({ textarea, getInputNames, onChange }: JinjaEditorOpts): JinjaEditorHandle {
// Wrap the textarea so the highlight layer can sit directly behind it.
const wrap = document.createElement('div');
wrap.className = 'jinja-editor';
const pre = document.createElement('pre');
pre.className = 'jinja-hl';
pre.setAttribute('aria-hidden', 'true');
const parent = textarea.parentNode;
if (parent) {
parent.insertBefore(wrap, textarea);
wrap.appendChild(pre);
wrap.appendChild(textarea);
}
textarea.classList.add('jinja-input');
textarea.spellcheck = false;
textarea.setAttribute('autocomplete', 'off');
textarea.setAttribute('autocapitalize', 'off');
textarea.setAttribute('autocorrect', 'off');
textarea.setAttribute('wrap', 'off');
const paint = () => {
pre.innerHTML = render(textarea.value, new Set(getInputNames()));
// Keep the highlight scrolled in lock-step with the textarea.
pre.scrollTop = textarea.scrollTop;
pre.scrollLeft = textarea.scrollLeft;
};
const onInput = () => {
paint();
if (onChange) onChange(textarea.value);
};
const onScroll = () => {
pre.scrollTop = textarea.scrollTop;
pre.scrollLeft = textarea.scrollLeft;
};
textarea.addEventListener('input', onInput);
textarea.addEventListener('scroll', onScroll);
paint();
return {
refresh: paint,
destroy: () => {
textarea.removeEventListener('input', onInput);
textarea.removeEventListener('scroll', onScroll);
textarea.classList.remove('jinja-input');
// Restore the textarea to its original place, drop the overlay.
if (wrap.parentNode) {
wrap.parentNode.insertBefore(textarea, wrap);
wrap.remove();
}
},
};
}
@@ -28,6 +28,7 @@ import {
ICON_LED_PREVIEW, ICON_ACTIVITY, ICON_TIMER, ICON_MOVE_VERTICAL, ICON_CLOCK, ICON_LED_PREVIEW, ICON_ACTIVITY, ICON_TIMER, ICON_MOVE_VERTICAL, ICON_CLOCK,
ICON_MUSIC, ICON_TRENDING_UP, ICON_MAP_PIN, ICON_MONITOR, ICON_REFRESH, ICON_TRASH, ICON_MUSIC, ICON_TRENDING_UP, ICON_MAP_PIN, ICON_MONITOR, ICON_REFRESH, ICON_TRASH,
ICON_HOME, ICON_RAINBOW, ICON_LINK, ICON_DROPLETS, ICON_GAMEPAD, ICON_X, ICON_HOME, ICON_RAINBOW, ICON_LINK, ICON_DROPLETS, ICON_GAMEPAD, ICON_X,
ICON_CHECK, ICON_FILE_TEXT,
} from '../core/icons.ts'; } from '../core/icons.ts';
import { wrapCard } from '../core/card-colors.ts'; import { wrapCard } from '../core/card-colors.ts';
import type { ModCardOpts, ModChipOpts } from '../core/mod-card.ts'; import type { ModCardOpts, ModChipOpts } from '../core/mod-card.ts';
@@ -56,9 +57,10 @@ registerIconEntityType('value_source', makeSimpleIconAdapter<any>({
import type { IconSelectItem } from '../core/icon-select.ts'; import type { IconSelectItem } from '../core/icon-select.ts';
import * as P from '../core/icon-paths.ts'; import * as P from '../core/icon-paths.ts';
import { EntitySelect } from '../core/entity-palette.ts'; import { EntitySelect } from '../core/entity-palette.ts';
import * as JinjaEditor from '../core/jinja-editor.ts';
import { loadPictureSources } from './streams.ts'; import { loadPictureSources } from './streams.ts';
import { hexToRgbArray, rgbArrayToHex } from './css-gradient-editor.ts'; import { hexToRgbArray, rgbArrayToHex } from './css-gradient-editor.ts';
import type { ValueSource } from '../types.ts'; import type { ValueSource, TemplateInput } from '../types.ts';
export { getValueSourceIcon }; export { getValueSourceIcon };
@@ -78,6 +80,14 @@ let _vsAnimColorClockEntitySelect: EntitySelect | null = null;
let _vsHTTPEndpointEntitySelect: EntitySelect | null = null; let _vsHTTPEndpointEntitySelect: EntitySelect | null = null;
let _vsTagsInput: TagInput | null = null; let _vsTagsInput: TagInput | null = null;
// ── Template value source editor state ──
// (the bound inputs live in the DOM rows; read them via _readTemplateInputRows)
const _vsTemplateInputSelects = new Map<HTMLElement, EntitySelect>();
let _vsTemplateEditor: JinjaEditor.JinjaEditorHandle | null = null;
let _templateValidateTimer: ReturnType<typeof setTimeout> | null = null;
let _templateValidateGen = 0;
let _vsTemplateInputUid = 0;
class ValueSourceModal extends Modal { class ValueSourceModal extends Modal {
constructor() { super('value-source-modal'); } constructor() { super('value-source-modal'); }
@@ -95,6 +105,13 @@ class ValueSourceModal extends Modal {
if (_vsAnimColorClockEntitySelect) { _vsAnimColorClockEntitySelect.destroy(); _vsAnimColorClockEntitySelect = null; } if (_vsAnimColorClockEntitySelect) { _vsAnimColorClockEntitySelect.destroy(); _vsAnimColorClockEntitySelect = null; }
if (_vsGameIntegrationEntitySelect) { _vsGameIntegrationEntitySelect.destroy(); _vsGameIntegrationEntitySelect = null; } if (_vsGameIntegrationEntitySelect) { _vsGameIntegrationEntitySelect.destroy(); _vsGameIntegrationEntitySelect = null; }
if (_vsHTTPEndpointEntitySelect) { _vsHTTPEndpointEntitySelect.destroy(); _vsHTTPEndpointEntitySelect = null; } if (_vsHTTPEndpointEntitySelect) { _vsHTTPEndpointEntitySelect.destroy(); _vsHTTPEndpointEntitySelect = null; }
// Template editor: destroy all per-row EntitySelect portals + the highlighter
// and cancel any pending validation so stale responses are ignored.
_vsTemplateInputSelects.forEach(sel => sel.destroy());
_vsTemplateInputSelects.clear();
if (_vsTemplateEditor) { _vsTemplateEditor.destroy(); _vsTemplateEditor = null; }
if (_templateValidateTimer) { clearTimeout(_templateValidateTimer); _templateValidateTimer = null; }
_templateValidateGen++;
} }
snapshotValues() { snapshotValues() {
@@ -146,6 +163,11 @@ class ValueSourceModal extends Modal {
httpMin: (document.getElementById('value-source-http-min') as HTMLInputElement | null)?.value || '', httpMin: (document.getElementById('value-source-http-min') as HTMLInputElement | null)?.value || '',
httpMax: (document.getElementById('value-source-http-max') as HTMLInputElement | null)?.value || '', httpMax: (document.getElementById('value-source-http-max') as HTMLInputElement | null)?.value || '',
httpSmoothing: (document.getElementById('value-source-http-smoothing') as HTMLInputElement | null)?.value || '', httpSmoothing: (document.getElementById('value-source-http-smoothing') as HTMLInputElement | null)?.value || '',
// Template value source
template: (document.getElementById('value-source-template-expression') as HTMLTextAreaElement | null)?.value || '',
templateInputs: JSON.stringify(_readTemplateInputRows()),
templateDefault: (document.getElementById('value-source-template-default-value') as HTMLInputElement | null)?.value || '',
templateEvalInterval: (document.getElementById('value-source-template-eval-interval') as HTMLInputElement | null)?.value || '',
}; };
} }
} }
@@ -188,7 +210,7 @@ function _autoGenerateVSName() {
/* ── Icon-grid type selector ──────────────────────────────────── */ /* ── Icon-grid type selector ──────────────────────────────────── */
const VS_FLOAT_TYPE_KEYS = ['static', 'animated', 'audio', 'adaptive_time', 'adaptive_scene', 'daylight', 'ha_entity', 'system_metrics', 'game_event', 'http']; const VS_FLOAT_TYPE_KEYS = ['static', 'animated', 'audio', 'adaptive_time', 'adaptive_scene', 'daylight', 'ha_entity', 'system_metrics', 'game_event', 'http', 'template'];
const VS_COLOR_TYPE_KEYS = ['static_color', 'animated_color', 'adaptive_time_color', 'gradient_map', 'css_extract']; const VS_COLOR_TYPE_KEYS = ['static_color', 'animated_color', 'adaptive_time_color', 'gradient_map', 'css_extract'];
const VS_TYPE_KEYS = [...VS_FLOAT_TYPE_KEYS, ...VS_COLOR_TYPE_KEYS]; const VS_TYPE_KEYS = [...VS_FLOAT_TYPE_KEYS, ...VS_COLOR_TYPE_KEYS];
@@ -633,6 +655,15 @@ export async function showValueSourceModal(editData: any, presetType: any = null
(document.getElementById('value-source-http-min') as HTMLInputElement).value = String(editData.min_value ?? 0); (document.getElementById('value-source-http-min') as HTMLInputElement).value = String(editData.min_value ?? 0);
(document.getElementById('value-source-http-max') as HTMLInputElement).value = String(editData.max_value ?? 100); (document.getElementById('value-source-http-max') as HTMLInputElement).value = String(editData.max_value ?? 100);
_setSlider('value-source-http-smoothing', editData.smoothing ?? 0); _setSlider('value-source-http-smoothing', editData.smoothing ?? 0);
} else if (editData.source_type === 'template') {
(document.getElementById('value-source-template-expression') as HTMLTextAreaElement).value = editData.template || '';
_setSlider('value-source-template-default-value', editData.default_value ?? 0.0);
(document.getElementById('value-source-template-default-value-display') as HTMLElement).textContent =
parseFloat(String(editData.default_value ?? 0.0)).toFixed(2);
(document.getElementById('value-source-template-eval-interval') as HTMLInputElement).value = String(editData.eval_interval ?? 0);
_ensureTemplateEditor();
_populateTemplateInputsUI(editData.inputs ?? []);
_runTemplateValidationDebounced();
} }
} else { } else {
(document.getElementById('value-source-name') as HTMLInputElement).value = ''; (document.getElementById('value-source-name') as HTMLInputElement).value = '';
@@ -707,6 +738,21 @@ export async function showValueSourceModal(editData: any, presetType: any = null
const httpMax = document.getElementById('value-source-http-max') as HTMLInputElement | null; const httpMax = document.getElementById('value-source-http-max') as HTMLInputElement | null;
if (httpMax) httpMax.value = '100'; if (httpMax) httpMax.value = '100';
_setSlider('value-source-http-smoothing', 0); _setSlider('value-source-http-smoothing', 0);
// Template value source defaults
const tmplExpr = document.getElementById('value-source-template-expression') as HTMLTextAreaElement | null;
if (tmplExpr) tmplExpr.value = '';
_setSlider('value-source-template-default-value', 0.0);
const tmplDefDisp = document.getElementById('value-source-template-default-value-display') as HTMLElement | null;
if (tmplDefDisp) tmplDefDisp.textContent = '0.00';
const tmplEval = document.getElementById('value-source-template-eval-interval') as HTMLInputElement | null;
if (tmplEval) tmplEval.value = '0';
if (presetType === 'template') {
_ensureTemplateEditor();
_populateTemplateInputsUI([]);
_runTemplateValidationDebounced();
} else {
_populateTemplateInputsUI([]);
}
_autoGenerateVSName(); _autoGenerateVSName();
} }
@@ -763,6 +809,19 @@ export function onValueSourceTypeChange() {
// before the integrations tab has been visited. // before the integrations tab has been visited.
httpEndpointsCache.fetch().then(() => _populateVSHTTPEndpointDropdown('')); httpEndpointsCache.fetch().then(() => _populateVSHTTPEndpointDropdown(''));
} }
const templateSec = document.getElementById('value-source-template-section') as HTMLElement | null;
if (templateSec) templateSec.style.display = type === 'template' ? '' : 'none';
if (type === 'template') {
_ensureTemplateEditor();
_runTemplateValidationDebounced();
} else {
// Leaving template type — cancel any pending/in-flight validation so a
// stale response cannot re-disable save on a non-template form, then
// restore the button.
if (_templateValidateTimer) { clearTimeout(_templateValidateTimer); _templateValidateTimer = null; }
_templateValidateGen++;
_setVSSaveEnabled(true);
}
(document.getElementById('value-source-adaptive-range-section') as HTMLElement).style.display = (document.getElementById('value-source-adaptive-range-section') as HTMLElement).style.display =
(type === 'adaptive_time' || type === 'adaptive_scene' || type === 'daylight') ? '' : 'none'; (type === 'adaptive_time' || type === 'adaptive_scene' || type === 'daylight') ? '' : 'none';
@@ -962,6 +1021,26 @@ export async function saveValueSource() {
errorEl.style.display = ''; errorEl.style.display = '';
return; return;
} }
} else if (sourceType === 'template') {
const inputs = _getTemplateInputsFromUI();
if (inputs === null) return; // toast already shown
payload.template = (document.getElementById('value-source-template-expression') as HTMLTextAreaElement).value;
payload.inputs = inputs;
payload.default_value = parseFloat((document.getElementById('value-source-template-default-value') as HTMLInputElement).value) || 0.0;
const evalRaw = (document.getElementById('value-source-template-eval-interval') as HTMLInputElement).value.trim();
payload.eval_interval = evalRaw === '' ? null : (parseFloat(evalRaw) || 0);
if (!payload.template.trim()) {
errorEl.textContent = t('value_source.template.error.invalid_expr');
errorEl.style.display = '';
return;
}
// Final server-side gate: block save when the expression is invalid.
const result = await _validateTemplateRequest(payload.template, inputs, id || undefined);
if (result && result.valid === false) {
errorEl.textContent = (result.errors && result.errors[0]) || result.error || t('value_source.template.error.invalid_expr');
errorEl.style.display = '';
return;
}
} }
try { try {
@@ -1373,6 +1452,7 @@ const VALUE_BADGE: Record<string, string> = {
gradient_map: 'VALUE · MAP', gradient_map: 'VALUE · MAP',
css_extract: 'VALUE · STRIP', css_extract: 'VALUE · STRIP',
system_metrics: 'VALUE · SYS', system_metrics: 'VALUE · SYS',
template: 'VALUE · EXPR',
}; };
function _valueSourceChipsAndExtras(src: ValueSource): { chips: ModChipOpts[]; metaText: string; extra: string } { function _valueSourceChipsAndExtras(src: ValueSource): { chips: ModChipOpts[]; metaText: string; extra: string } {
@@ -1503,6 +1583,16 @@ function _valueSourceChipsAndExtras(src: ValueSource): { chips: ModChipOpts[]; m
const metricLabel = t(`value_source.metric.${(src as any).metric}`) || (src as any).metric; const metricLabel = t(`value_source.metric.${(src as any).metric}`) || (src as any).metric;
chips.push({ icon: ICON_ACTIVITY, text: metricLabel }); chips.push({ icon: ICON_ACTIVITY, text: metricLabel });
metaText = metricLabel; metaText = metricLabel;
} else if (src.source_type === 'template') {
const inputs = (src as any).inputs || [];
const nInputs = inputs.length;
const expr = ((src as any).template || '').trim();
chips.push({
icon: ICON_FILE_TEXT,
text: `${nInputs} ${nInputs === 1 ? t('value_source.template.input_count_one') : t('value_source.template.input_count')}`,
});
if (expr) chips.push({ icon: ICON_ACTIVITY, text: expr.length > 28 ? expr.slice(0, 27) + '…' : expr });
metaText = `${t('value_source.type.template')} · ${nInputs} ${t('value_source.template.input_count')}`;
} }
return { chips, metaText, extra }; return { chips, metaText, extra };
@@ -1970,3 +2060,302 @@ function _populateCSSSourceDropdown(selectedId: string) {
}); });
} }
} }
// ── Template (Jinja expression) helpers ────────────────────────
/** Reserved identifiers an input variable may NOT shadow. */
const TEMPLATE_RESERVED_NAMES = new Set([
'min', 'max', 'abs', 'round', 'clamp', 'raw', 'range', 'dict', 'namespace',
]);
const TEMPLATE_IDENT_RE = /^[A-Za-z_][A-Za-z0-9_]*$/;
/** Lazily attach the syntax-highlighting overlay to the expression textarea. */
function _ensureTemplateEditor() {
const ta = document.getElementById('value-source-template-expression') as HTMLTextAreaElement | null;
if (!ta) return;
if (_vsTemplateEditor) { _vsTemplateEditor.refresh(); return; }
_vsTemplateEditor = JinjaEditor.create({
textarea: ta,
getInputNames: () => _readTemplateInputRows().map(i => i.name).filter(Boolean),
onChange: () => _runTemplateValidationDebounced(),
});
}
/** Bound-input names that are currently valid identifiers (for the highlighter + hints). */
function _currentTemplateVarNames(): string[] {
return _readTemplateInputRows()
.map(i => i.name)
.filter(n => TEMPLATE_IDENT_RE.test(n));
}
/**
* Append one `.template-input-row` (a variable-name field + a value-source
* EntitySelect + a remove button). Invoked from the inline add button.
*/
export function addTemplateInput(name: string = '', vsId: string = '') {
const list = document.getElementById('value-source-template-inputs-list');
if (!list) return;
const row = document.createElement('div');
row.className = 'template-input-row';
const nameInput = document.createElement('input');
nameInput.type = 'text';
nameInput.className = 'template-input-name';
nameInput.placeholder = t('value_source.template.input_name');
nameInput.value = name;
nameInput.spellcheck = false;
nameInput.autocomplete = 'off';
const select = document.createElement('select');
select.id = `value-source-template-input-${++_vsTemplateInputUid}`;
const floatSources = _templateFloatSources();
select.innerHTML = `<option value=""></option>` + floatSources.map(s =>
`<option value="${escapeHtml(s.id)}"${s.id === vsId ? ' selected' : ''}>${escapeHtml(s.name)}</option>`
).join('');
select.value = vsId || '';
const removeBtn = document.createElement('button');
removeBtn.type = 'button';
removeBtn.className = 'btn btn-icon btn-danger btn-sm';
removeBtn.innerHTML = ICON_TRASH;
removeBtn.setAttribute('aria-label', t('common.remove') || 'Remove');
row.appendChild(nameInput);
row.appendChild(select);
row.appendChild(removeBtn);
// Remove any "no inputs yet" placeholder before adding a real row.
const empty = list.querySelector('.template-inputs-empty');
if (empty) empty.remove();
list.appendChild(row);
const es = new EntitySelect({
target: select,
getItems: () => _templateFloatSources().map(s => ({
value: s.id,
label: s.name,
icon: getValueSourceIcon(s.source_type),
desc: s.source_type,
})),
placeholder: t('palette.search'),
allowNone: true,
noneLabel: '—',
onChange: () => _runTemplateValidationDebounced(),
});
_vsTemplateInputSelects.set(select, es);
nameInput.addEventListener('input', () => {
_vsTemplateEditor?.refresh();
_runTemplateValidationDebounced();
});
removeBtn.addEventListener('click', () => {
const sel = _vsTemplateInputSelects.get(select);
if (sel) { sel.destroy(); _vsTemplateInputSelects.delete(select); }
row.remove();
if (!list.querySelector('.template-input-row')) _showTemplateInputsEmpty(list);
_vsTemplateEditor?.refresh();
_renderTemplateHintVars();
_runTemplateValidationDebounced();
});
_renderTemplateHintVars();
}
/** Float value sources eligible to bind, excluding the source being edited. */
function _templateFloatSources(): ValueSource[] {
const selfId = (document.getElementById('value-source-id') as HTMLInputElement).value || undefined;
return _cachedValueSources.filter(v => v.return_type === 'float' && v.id !== selfId);
}
function _showTemplateInputsEmpty(list: HTMLElement) {
const empty = document.createElement('div');
empty.className = 'template-inputs-empty';
empty.textContent = t('value_source.template.inputs.empty');
list.appendChild(empty);
}
/** Raw row read — no validation, no toast (used by snapshot + the live validator). */
function _readTemplateInputRows(): TemplateInput[] {
const rows = document.querySelectorAll('#value-source-template-inputs-list .template-input-row');
const out: TemplateInput[] = [];
rows.forEach(row => {
const name = (row.querySelector('.template-input-name') as HTMLInputElement)?.value.trim() ?? '';
const select = row.querySelector('select') as HTMLSelectElement | null;
out.push({ name, value_source_id: select?.value || '' });
});
return out;
}
/**
* Validated read for save: every name must be a unique, non-reserved, valid
* identifier. Shows a toast and returns null on the first failure.
*/
function _getTemplateInputsFromUI(): TemplateInput[] | null {
const rows = _readTemplateInputRows();
const seen = new Set<string>();
for (const inp of rows) {
if (!inp.name) {
showToast(t('value_source.template.error.missing_input'), 'error');
return null;
}
if (!TEMPLATE_IDENT_RE.test(inp.name)) {
showToast(`${t('value_source.template.error.invalid_name')}: ${inp.name}`, 'error');
return null;
}
if (TEMPLATE_RESERVED_NAMES.has(inp.name)) {
showToast(`${t('value_source.template.error.reserved_name')}: ${inp.name}`, 'error');
return null;
}
if (seen.has(inp.name)) {
showToast(`${t('value_source.template.error.duplicate_name')}: ${inp.name}`, 'error');
return null;
}
seen.add(inp.name);
}
return rows;
}
/** Rebuild the inputs list from scratch (destroying any leaked EntitySelects). */
function _populateTemplateInputsUI(inputs: TemplateInput[]) {
_vsTemplateInputSelects.forEach(sel => sel.destroy());
_vsTemplateInputSelects.clear();
const list = document.getElementById('value-source-template-inputs-list');
if (!list) return;
list.innerHTML = '';
if (inputs.length === 0) {
_showTemplateInputsEmpty(list);
} else {
inputs.forEach(i => addTemplateInput(i.name, i.value_source_id));
}
_vsTemplateEditor?.refresh();
_renderTemplateHintVars();
}
/** Paint the live list of bound variable chips into the hints panel. */
function _renderTemplateHintVars() {
const host = document.getElementById('value-source-template-hint-vars');
if (!host) return;
const names = _currentTemplateVarNames();
if (names.length === 0) {
host.innerHTML = `<span class="jinja-hints-empty">${escapeHtml(t('value_source.template.hints.no_inputs'))}</span>`;
return;
}
host.innerHTML = names
.map(n => `<span class="tok-var-chip">${escapeHtml(n)}</span>`)
.join('');
}
// ── Live validation (debounced + generation-tagged) ────────────
interface TemplateValidateResult {
valid: boolean;
error: string | null;
errors: string[];
warnings: string[];
variables: string[];
}
/** Enable/disable the value-source modal's primary (save) button. */
function _setVSSaveEnabled(enabled: boolean) {
const btn = document.querySelector('#value-source-modal .modal-footer .btn-primary') as HTMLButtonElement | null;
if (btn) btn.disabled = !enabled;
}
function _runTemplateValidationDebounced() {
if (_vsTemplateEditor) _vsTemplateEditor.refresh();
_renderTemplateHintVars();
if (_templateValidateTimer) clearTimeout(_templateValidateTimer);
_templateValidateTimer = setTimeout(() => { void _validateTemplate(); }, 300);
}
/** POST the current expression/inputs to the validate endpoint (no UI side effects). */
async function _validateTemplateRequest(
template: string, inputs: TemplateInput[], id?: string,
): Promise<TemplateValidateResult | null> {
try {
const resp = await fetchWithAuth('/value-sources/validate-template', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ template, inputs, ...(id ? { id } : {}) }),
});
if (!resp.ok) return null;
return await resp.json() as TemplateValidateResult;
} catch {
return null;
}
}
/** Live validator: paints inline error/ok/warn state and gates the save button. */
async function _validateTemplate() {
const editor = document.querySelector('#value-source-template-section .jinja-editor') as HTMLElement | null;
const errEl = document.getElementById('value-source-template-error') as HTMLElement | null;
const okEl = document.getElementById('value-source-template-ok') as HTMLElement | null;
const warnEl = document.getElementById('value-source-template-warn') as HTMLElement | null;
const ta = document.getElementById('value-source-template-expression') as HTMLTextAreaElement | null;
if (!ta) return;
// If the user switched away from template while this was pending, do nothing.
if ((document.getElementById('value-source-type') as HTMLSelectElement).value !== 'template') {
_setVSSaveEnabled(true);
return;
}
const template = ta.value;
// Drop blank-name rows: TemplateInput.name is min_length=1 server-side, so
// posting a half-typed row would 422 and silently blank the live validator.
const inputs = _readTemplateInputRows().filter(i => i.name.trim());
const id = (document.getElementById('value-source-id') as HTMLInputElement).value || undefined;
const gen = ++_templateValidateGen;
const result = await _validateTemplateRequest(template, inputs, id);
if (gen !== _templateValidateGen) return; // a newer request superseded this one
// The user may have switched away from template during the await — never
// touch a non-template form's save state.
if ((document.getElementById('value-source-type') as HTMLSelectElement).value !== 'template') {
_setVSSaveEnabled(true);
return;
}
const setSaveEnabled = _setVSSaveEnabled;
const clearMsgs = () => {
if (errEl) { errEl.style.display = 'none'; errEl.textContent = ''; }
if (okEl) { okEl.style.display = 'none'; okEl.innerHTML = ''; }
if (warnEl) { warnEl.style.display = 'none'; warnEl.textContent = ''; }
};
if (!result) {
// Network/endpoint failure — don't block the user; clear inline state.
editor?.classList.remove('field-invalid');
clearMsgs();
setSaveEnabled(true);
return;
}
clearMsgs();
if (result.valid === false) {
editor?.classList.add('field-invalid');
if (errEl) {
errEl.innerHTML = `${ICON_X}<span></span>`;
const span = errEl.querySelector('span');
if (span) span.textContent = (result.errors && result.errors[0]) || result.error || t('value_source.template.error.invalid_expr');
errEl.style.display = '';
}
setSaveEnabled(false);
return;
}
editor?.classList.remove('field-invalid');
setSaveEnabled(true);
if (okEl && template.trim()) {
okEl.innerHTML = `${ICON_CHECK}<span></span>`;
const span = okEl.querySelector('span');
if (span) span.textContent = t('value_source.template.valid');
okEl.style.display = '';
}
if (warnEl && result.warnings && result.warnings.length > 0) {
warnEl.textContent = result.warnings.join(' · ');
warnEl.style.display = '';
}
}
+1
View File
@@ -336,6 +336,7 @@ startTargetOverlay: (...args: any[]) => any;
removeAnimatedColor: (...args: any[]) => any; removeAnimatedColor: (...args: any[]) => any;
addColorSchedulePoint: (...args: any[]) => any; addColorSchedulePoint: (...args: any[]) => any;
removeColorSchedulePoint: (...args: any[]) => any; removeColorSchedulePoint: (...args: any[]) => any;
addTemplateInput: (...args: any[]) => any;
testValueSource: (...args: any[]) => any; testValueSource: (...args: any[]) => any;
closeTestValueSourceModal: (...args: any[]) => any; closeTestValueSourceModal: (...args: any[]) => any;
+2
View File
@@ -76,6 +76,8 @@ export type {
SystemMetricsValueSource, SystemMetricsValueSource,
GameEventValueSource, GameEventValueSource,
HTTPValueSource, HTTPValueSource,
TemplateValueSource,
TemplateInput,
ValueSource, ValueSource,
ValueSourceListResponse, ValueSourceListResponse,
} from './types/value-source.ts'; } from './types/value-source.ts';
@@ -9,7 +9,12 @@ export type ValueSourceType =
| 'adaptive_time' | 'adaptive_scene' | 'daylight' | 'adaptive_time' | 'adaptive_scene' | 'daylight'
| 'static_color' | 'animated_color' | 'adaptive_time_color' | 'static_color' | 'animated_color' | 'adaptive_time_color'
| 'ha_entity' | 'gradient_map' | 'css_extract' | 'ha_entity' | 'gradient_map' | 'css_extract'
| 'system_metrics' | 'game_event' | 'http'; | 'system_metrics' | 'game_event' | 'http' | 'template';
export interface TemplateInput {
name: string;
value_source_id: string;
}
export interface SchedulePoint { export interface SchedulePoint {
time: string; time: string;
@@ -175,6 +180,15 @@ export interface HTTPValueSource extends ValueSourceBase {
smoothing: number; smoothing: number;
} }
export interface TemplateValueSource extends ValueSourceBase {
source_type: 'template';
return_type: 'float';
template: string;
inputs: TemplateInput[];
default_value: number;
eval_interval?: number | null;
}
export type ValueSource = export type ValueSource =
| StaticValueSource | StaticValueSource
| AnimatedValueSource | AnimatedValueSource
@@ -190,7 +204,8 @@ export type ValueSource =
| CSSExtractValueSource | CSSExtractValueSource
| SystemMetricsValueSource | SystemMetricsValueSource
| GameEventValueSource | GameEventValueSource
| HTTPValueSource; | HTTPValueSource
| TemplateValueSource;
export interface ValueSourceListResponse { export interface ValueSourceListResponse {
sources: ValueSource[]; sources: ValueSource[];
+33
View File
@@ -3073,6 +3073,39 @@
"value_source.http.modulator.hint": "Only used when this source drives brightness or color. Automation rules read the raw extracted value and ignore these settings.", "value_source.http.modulator.hint": "Only used when this source drives brightness or color. Automation rules read the raw extracted value and ignore these settings.",
"value_source.http.endpoint_required": "HTTP endpoint is required", "value_source.http.endpoint_required": "HTTP endpoint is required",
"value_source.http.interval_invalid": "Interval must be at least 1 second", "value_source.http.interval_invalid": "Interval must be at least 1 second",
"value_source.type.template": "Jinja Template",
"value_source.type.template.desc": "Combine bound inputs with a sandboxed Jinja expression to compute a 0-1 value.",
"value_source.template.expression": "Expression:",
"value_source.template.expression.hint": "A sandboxed Jinja expression returning a number. Bound inputs are available by name; use raw[name] for the un-normalized value.",
"value_source.template.expression.placeholder": "clamp((temp - 18) / 10, 0, 1)",
"value_source.template.inputs": "Inputs:",
"value_source.template.inputs.hint": "Bind float value sources to variable names you reference in the expression.",
"value_source.template.inputs.empty": "No inputs yet. Click + to bind a value source.",
"value_source.template.add_input": "+ Add Input",
"value_source.template.input_name": "variable name",
"value_source.template.input_count": "inputs",
"value_source.template.input_count_one": "input",
"value_source.template.default_value": "Default Value:",
"value_source.template.default_value.hint": "Output used when the expression cannot be evaluated (e.g. an input is missing).",
"value_source.template.eval_interval": "Eval Interval (s):",
"value_source.template.eval_interval.hint": "How often to re-evaluate the expression. 0 = every poll (re-evaluate as fast as the inputs update).",
"value_source.template.valid": "Expression is valid",
"value_source.template.hints.title": "Expression help",
"value_source.template.hints.inputs_title": "Bound inputs",
"value_source.template.hints.no_inputs": "No inputs bound yet",
"value_source.template.hints.globals_title": "Globals",
"value_source.template.hints.globals": "min(a, b), max(a, b), abs(x), round(x), clamp(x, lo, hi)",
"value_source.template.hints.raw_title": "Raw values",
"value_source.template.hints.raw": "raw[name] gives the un-normalized value of an input that has one.",
"value_source.template.hints.examples_title": "Examples",
"value_source.template.hints.time": "Tip: for time-of-day logic, bind an Adaptive (Time) or Daylight source as an input.",
"value_source.template.error.invalid_expr": "Expression is invalid",
"value_source.template.error.cycle": "This expression would create a dependency cycle",
"value_source.template.error.missing_input": "Every input needs a variable name",
"value_source.template.error.invalid_name": "Invalid variable name",
"value_source.template.error.reserved_name": "Reserved name cannot be used as an input",
"value_source.template.error.duplicate_name": "Duplicate input name",
"value_source.template.error.unbound": "Expression references an unbound variable",
"automations.rule.http_poll": "HTTP Poll", "automations.rule.http_poll": "HTTP Poll",
"automations.rule.http_poll.desc": "Activate when the latest extracted value from an HTTP value source matches.", "automations.rule.http_poll.desc": "Activate when the latest extracted value from an HTTP value source matches.",
"automations.rule.http_poll.hint": "Compares the latest extracted value against your input. The value source decides what gets extracted (raw body or JSON path).", "automations.rule.http_poll.hint": "Compares the latest extracted value against your input. The value source decides what gets extracted (raw body or JSON path).",
+33
View File
@@ -2755,6 +2755,39 @@
"value_source.http.modulator.hint": "Используется только когда этот источник управляет яркостью или цветом. Правила автоматизации читают извлечённое значение в исходном виде и игнорируют эти настройки.", "value_source.http.modulator.hint": "Используется только когда этот источник управляет яркостью или цветом. Правила автоматизации читают извлечённое значение в исходном виде и игнорируют эти настройки.",
"value_source.http.endpoint_required": "Требуется HTTP-эндпоинт", "value_source.http.endpoint_required": "Требуется HTTP-эндпоинт",
"value_source.http.interval_invalid": "Интервал должен быть не меньше 1 секунды", "value_source.http.interval_invalid": "Интервал должен быть не меньше 1 секунды",
"value_source.type.template": "Шаблон Jinja",
"value_source.type.template.desc": "Объедините привязанные входы в изолированном выражении Jinja для вычисления значения 0-1.",
"value_source.template.expression": "Выражение:",
"value_source.template.expression.hint": "Изолированное выражение Jinja, возвращающее число. Привязанные входы доступны по имени; используйте raw[name] для ненормализованного значения.",
"value_source.template.expression.placeholder": "clamp((temp - 18) / 10, 0, 1)",
"value_source.template.inputs": "Входы:",
"value_source.template.inputs.hint": "Привяжите числовые источники значений к именам переменных, используемым в выражении.",
"value_source.template.inputs.empty": "Пока нет входов. Нажмите +, чтобы привязать источник значений.",
"value_source.template.add_input": "+ Добавить вход",
"value_source.template.input_name": "имя переменной",
"value_source.template.input_count": "входов",
"value_source.template.input_count_one": "вход",
"value_source.template.default_value": "Значение по умолчанию:",
"value_source.template.default_value.hint": "Значение на выходе, когда выражение нельзя вычислить (например, отсутствует вход).",
"value_source.template.eval_interval": "Интервал вычисления (с):",
"value_source.template.eval_interval.hint": "Как часто пересчитывать выражение. 0 = при каждом опросе (так быстро, как обновляются входы).",
"value_source.template.valid": "Выражение корректно",
"value_source.template.hints.title": "Справка по выражению",
"value_source.template.hints.inputs_title": "Привязанные входы",
"value_source.template.hints.no_inputs": "Входы ещё не привязаны",
"value_source.template.hints.globals_title": "Глобальные функции",
"value_source.template.hints.globals": "min(a, b), max(a, b), abs(x), round(x), clamp(x, lo, hi)",
"value_source.template.hints.raw_title": "Исходные значения",
"value_source.template.hints.raw": "raw[name] даёт ненормализованное значение входа, если оно есть.",
"value_source.template.hints.examples_title": "Примеры",
"value_source.template.hints.time": "Совет: для логики времени суток привяжите источник «Адаптивный (время)» или «Дневной цикл» как вход.",
"value_source.template.error.invalid_expr": "Некорректное выражение",
"value_source.template.error.cycle": "Это выражение создало бы циклическую зависимость",
"value_source.template.error.missing_input": "Каждому входу нужно имя переменной",
"value_source.template.error.invalid_name": "Некорректное имя переменной",
"value_source.template.error.reserved_name": "Зарезервированное имя нельзя использовать как вход",
"value_source.template.error.duplicate_name": "Повторяющееся имя входа",
"value_source.template.error.unbound": "Выражение ссылается на непривязанную переменную",
"automations.rule.http_poll": "HTTP-опрос", "automations.rule.http_poll": "HTTP-опрос",
"automations.rule.http_poll.desc": "Срабатывает, когда последнее значение HTTP-источника соответствует условию.", "automations.rule.http_poll.desc": "Срабатывает, когда последнее значение HTTP-источника соответствует условию.",
"automations.rule.http_poll.hint": "Сравнивает последнее извлечённое значение с вашим вводом. Что именно извлекается (тело или JSON-путь), задаётся в источнике-значении.", "automations.rule.http_poll.hint": "Сравнивает последнее извлечённое значение с вашим вводом. Что именно извлекается (тело или JSON-путь), задаётся в источнике-значении.",
+33
View File
@@ -2749,6 +2749,39 @@
"value_source.http.modulator.hint": "仅当此源用于驱动亮度或颜色时使用。自动化规则会直接读取提取的原始值,并忽略这些设置。", "value_source.http.modulator.hint": "仅当此源用于驱动亮度或颜色时使用。自动化规则会直接读取提取的原始值,并忽略这些设置。",
"value_source.http.endpoint_required": "需要 HTTP 端点", "value_source.http.endpoint_required": "需要 HTTP 端点",
"value_source.http.interval_invalid": "间隔至少为 1 秒", "value_source.http.interval_invalid": "间隔至少为 1 秒",
"value_source.type.template": "Jinja 模板",
"value_source.type.template.desc": "使用沙盒化的 Jinja 表达式组合已绑定的输入,计算出 0-1 的值。",
"value_source.template.expression": "表达式:",
"value_source.template.expression.hint": "返回数字的沙盒化 Jinja 表达式。已绑定的输入可按名称使用;使用 raw[name] 获取未归一化的值。",
"value_source.template.expression.placeholder": "clamp((temp - 18) / 10, 0, 1)",
"value_source.template.inputs": "输入:",
"value_source.template.inputs.hint": "将浮点值源绑定到你在表达式中引用的变量名。",
"value_source.template.inputs.empty": "暂无输入。点击 + 绑定一个值源。",
"value_source.template.add_input": "+ 添加输入",
"value_source.template.input_name": "变量名",
"value_source.template.input_count": "个输入",
"value_source.template.input_count_one": "个输入",
"value_source.template.default_value": "默认值:",
"value_source.template.default_value.hint": "当表达式无法求值(例如缺少输入)时使用的输出值。",
"value_source.template.eval_interval": "求值间隔(秒):",
"value_source.template.eval_interval.hint": "重新求值表达式的频率。0 = 每次轮询(随输入更新一样快地重新求值)。",
"value_source.template.valid": "表达式有效",
"value_source.template.hints.title": "表达式帮助",
"value_source.template.hints.inputs_title": "已绑定的输入",
"value_source.template.hints.no_inputs": "尚未绑定输入",
"value_source.template.hints.globals_title": "全局函数",
"value_source.template.hints.globals": "min(a, b), max(a, b), abs(x), round(x), clamp(x, lo, hi)",
"value_source.template.hints.raw_title": "原始值",
"value_source.template.hints.raw": "raw[name] 给出某个输入的未归一化值(如果有)。",
"value_source.template.hints.examples_title": "示例",
"value_source.template.hints.time": "提示:要实现一天中的时间逻辑,请将“自适应(时间)”或“日光周期”源绑定为输入。",
"value_source.template.error.invalid_expr": "表达式无效",
"value_source.template.error.cycle": "该表达式会造成依赖循环",
"value_source.template.error.missing_input": "每个输入都需要一个变量名",
"value_source.template.error.invalid_name": "变量名无效",
"value_source.template.error.reserved_name": "保留名称不能用作输入",
"value_source.template.error.duplicate_name": "输入名称重复",
"value_source.template.error.unbound": "表达式引用了未绑定的变量",
"automations.rule.http_poll": "HTTP 轮询", "automations.rule.http_poll": "HTTP 轮询",
"automations.rule.http_poll.desc": "当 HTTP 值源的最新提取值匹配时激活。", "automations.rule.http_poll.desc": "当 HTTP 值源的最新提取值匹配时激活。",
"automations.rule.http_poll.hint": "将最新的提取值与您的输入进行比较。提取的内容(原始响应体或 JSON 路径)由值源决定。", "automations.rule.http_poll.hint": "将最新的提取值与您的输入进行比较。提取的内容(原始响应体或 JSON 路径)由值源决定。",
@@ -66,6 +66,11 @@ class ValueSource:
"use_real_time": None, "use_real_time": None,
"latitude": None, "latitude": None,
"longitude": None, "longitude": None,
# Template (Jinja expression combinator)
"template": None,
"inputs": None,
"default_value": None,
"eval_interval": None,
} }
if self.icon: if self.icon:
d["icon"] = self.icon d["icon"] = self.icon
@@ -643,6 +648,69 @@ class HTTPValueSource(ValueSource):
) )
def _coerce_float(value, fallback):
"""Best-effort float; returns ``fallback`` on None/non-numeric.
Keeps a tampered DB / buggy migration from dropping the whole row when
BaseSqliteStore's loader swallows a per-row deserialization error.
"""
try:
return float(value)
except (TypeError, ValueError):
return fallback
@dataclass
class TemplateValueSource(ValueSource):
"""Value source that evaluates a sandboxed Jinja expression over the live
values of other value sources (the system's float combinator).
``template`` is a Jinja *expression* (no statements/blocks) evaluated by the
hardened engine in :mod:`ledgrab.utils.template_expr`. Each entry in
``inputs`` binds a variable ``name`` to another value source by id; at
runtime the variable holds that source's normalized ``get_value()`` (0..1)
and ``raw[name]`` holds its un-normalized ``get_raw_value()`` (float) where
the stream exposes one. The callables ``min``/``max``/``abs``/``round``/
``clamp`` are available. The result is coerced to float, NaN/inf rejected,
and clamped to [0, 1]; any error falls back to ``default_value``.
"""
template: str = ""
inputs: List[dict] = field(default_factory=list) # [{name, value_source_id}]
default_value: float = 0.0 # fallback when the expression errors (0.0-1.0)
eval_interval: float | None = None # re-eval throttle (s); None/0 = every poll
def to_dict(self) -> dict:
d = super().to_dict()
d["template"] = self.template
d["inputs"] = [dict(i) for i in self.inputs]
d["default_value"] = self.default_value
d["eval_interval"] = self.eval_interval
d["return_type"] = "float"
return d
@classmethod
def from_dict(cls, data: dict) -> "TemplateValueSource":
common = _parse_common_fields(data)
raw_inputs = data.get("inputs") or []
inputs = [
{
"name": str(i.get("name", "")),
"value_source_id": str(i.get("value_source_id", "")),
}
for i in raw_inputs
if isinstance(i, dict)
]
return cls(
**common,
source_type="template",
template=data.get("template") or "",
inputs=inputs,
default_value=_coerce_float(data.get("default_value"), 0.0),
eval_interval=_coerce_float(data.get("eval_interval"), None),
)
# -- Source type registry -- # -- Source type registry --
# Maps source_type string to its subclass for factory dispatch. # Maps source_type string to its subclass for factory dispatch.
_VALUE_SOURCE_MAP: Dict[str, Type[ValueSource]] = { _VALUE_SOURCE_MAP: Dict[str, Type[ValueSource]] = {
@@ -661,4 +729,5 @@ _VALUE_SOURCE_MAP: Dict[str, Type[ValueSource]] = {
"system_metrics": SystemMetricsValueSource, "system_metrics": SystemMetricsValueSource,
"game_event": GameEventValueSource, "game_event": GameEventValueSource,
"http": HTTPValueSource, "http": HTTPValueSource,
"template": TemplateValueSource,
} }
@@ -40,6 +40,7 @@ from ledgrab.storage.value_source import (
StaticColorValueSource, StaticColorValueSource,
StaticValueSource, StaticValueSource,
SystemMetricsValueSource, SystemMetricsValueSource,
TemplateValueSource,
ValueSource, ValueSource,
_VALUE_SOURCE_MAP, _VALUE_SOURCE_MAP,
) )
@@ -365,6 +366,79 @@ def _build_http(
) )
def _validate_template_inputs(inputs: list | None) -> list:
"""Validate + normalize template input bindings.
Each input is ``{name, value_source_id}``; ``name`` must be a valid,
non-reserved identifier and unique. ``value_source_id`` is *not* required to
resolve (lenient, like gradient_map) — a missing/unknown id just yields a
runtime fallback to ``default_value``.
"""
from ledgrab.utils.template_expr import validate_input_name
result: list = []
seen: set = set()
for item in inputs or []:
if not isinstance(item, dict):
raise ValueError("each template input must be an object with name and value_source_id")
name = str(item.get("name", "")).strip()
vs_id = str(item.get("value_source_id", "")).strip()
validate_input_name(name) # identifier + reserved-name check (raises ValueError)
if name in seen:
raise ValueError(f"duplicate template input name: {name!r}")
seen.add(name)
result.append({"name": name, "value_source_id": vs_id})
return result
def _reject_unbound_template_vars(template: str, inputs: list) -> None:
"""Reject expression variables that aren't bound to an input.
An unbound variable raises ``UndefinedError`` at runtime, so the template
would silently always return ``default_value`` — almost always a typo. The
globals (min/max/abs/round/clamp) and ``raw`` are excluded by
``extract_variables``, so anything left over is genuinely unbound.
"""
from ledgrab.utils.template_expr import extract_variables
declared = {i["name"] for i in inputs}
undeclared = sorted(set(extract_variables(template)) - declared)
if undeclared:
raise ValueError("expression uses unbound variable(s): " + ", ".join(undeclared))
def _build_template(
*,
common: dict,
template: str | None = None,
inputs: list | None = None,
default_value: float | None = None,
eval_interval: float | None = None,
**_,
) -> ValueSource:
from ledgrab.utils.template_expr import validate_template_expression
tpl = (template or "").strip()
if not tpl:
raise ValueError("template expression is required for template type")
validate_template_expression(tpl) # raises ValueError on compile / cost-bomb
clean_inputs = _validate_template_inputs(inputs)
_reject_unbound_template_vars(tpl, clean_inputs)
dv = default_value if default_value is not None else 0.0
if not (0.0 <= dv <= 1.0):
raise ValueError("default_value must be between 0.0 and 1.0")
ei = float(eval_interval) if eval_interval is not None else None
if ei is not None and ei < 0.0:
raise ValueError("eval_interval must be >= 0")
return TemplateValueSource(
**common,
template=tpl,
inputs=clean_inputs,
default_value=dv,
eval_interval=ei,
)
CREATE_BUILDERS: Dict[str, CreateBuilder] = { CREATE_BUILDERS: Dict[str, CreateBuilder] = {
"static": _build_static, "static": _build_static,
"animated": _build_animated, "animated": _build_animated,
@@ -381,6 +455,7 @@ CREATE_BUILDERS: Dict[str, CreateBuilder] = {
"system_metrics": _build_system_metrics, "system_metrics": _build_system_metrics,
"game_event": _build_game_event, "game_event": _build_game_event,
"http": _build_http, "http": _build_http,
"template": _build_template,
} }
@@ -685,6 +760,45 @@ def _apply_http(
source.smoothing = smoothing source.smoothing = smoothing
def _apply_template(
source: TemplateValueSource,
*,
template=None,
inputs=None,
default_value=None,
eval_interval=None,
**_,
) -> None:
from ledgrab.utils.template_expr import validate_template_expression
# Compute the prospective final state and validate it BEFORE mutating, so a
# rejected update never leaves the cached object half-applied. (inputs/
# template may each change independently; unbound vars are checked against
# the combined final state.)
final_template = template.strip() if template is not None else source.template
final_inputs = _validate_template_inputs(inputs) if inputs is not None else source.inputs
if template is not None:
if not final_template:
raise ValueError("template expression cannot be empty")
validate_template_expression(final_template)
if template is not None or inputs is not None:
_reject_unbound_template_vars(final_template, final_inputs)
if template is not None:
source.template = final_template
if inputs is not None:
source.inputs = final_inputs
if default_value is not None:
if not (0.0 <= default_value <= 1.0):
raise ValueError("default_value must be between 0.0 and 1.0")
source.default_value = default_value
if eval_interval is not None:
if eval_interval < 0.0:
raise ValueError("eval_interval must be >= 0")
source.eval_interval = eval_interval
UPDATE_APPLIERS: Dict[str, UpdateApplier] = { UPDATE_APPLIERS: Dict[str, UpdateApplier] = {
"static": _apply_static, "static": _apply_static,
"animated": _apply_animated, "animated": _apply_animated,
@@ -701,6 +815,7 @@ UPDATE_APPLIERS: Dict[str, UpdateApplier] = {
"system_metrics": _apply_system_metrics, "system_metrics": _apply_system_metrics,
"game_event": _apply_game_event, "game_event": _apply_game_event,
"http": _apply_http, "http": _apply_http,
"template": _apply_template,
} }
@@ -12,7 +12,11 @@ from datetime import datetime, timezone
from ledgrab.storage.base_sqlite_store import BaseSqliteStore from ledgrab.storage.base_sqlite_store import BaseSqliteStore
from ledgrab.storage.database import Database from ledgrab.storage.database import Database
from ledgrab.storage.value_source import ValueSource from ledgrab.storage.value_source import (
GradientMapValueSource,
TemplateValueSource,
ValueSource,
)
from ledgrab.storage.value_source_factories import ( from ledgrab.storage.value_source_factories import (
apply_update as _apply_value_source_update, apply_update as _apply_value_source_update,
build_source as _build_value_source, build_source as _build_value_source,
@@ -21,6 +25,11 @@ from ledgrab.utils import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
# Storage-level cap on value-source reference nesting depth. The runtime acquire
# backstop (ValueStreamManager) uses a higher cap so legitimate chains never trip
# it. Color-strip sources cap at 4; value-source chains are flatter combinators.
MAX_VALUE_SOURCE_DEPTH = 8
class ValueSourceStore(BaseSqliteStore[ValueSource]): class ValueSourceStore(BaseSqliteStore[ValueSource]):
"""Persistent storage for value sources.""" """Persistent storage for value sources."""
@@ -67,6 +76,12 @@ class ValueSourceStore(BaseSqliteStore[ValueSource]):
**kwargs, **kwargs,
) )
# Reject over-deep reference chains (cycles are impossible at create:
# the new id is not yet referenceable by anything).
child_ids = self._child_ids_of(source)
if child_ids:
self.validate_nesting(None, child_ids)
# Name-uniqueness happens last so we never burn a uuid on a source # Name-uniqueness happens last so we never burn a uuid on a source
# we end up rejecting AND so the user-facing error precedence # we end up rejecting AND so the user-facing error precedence
# (type errors before name errors) matches the old code's order. # (type errors before name errors) matches the old code's order.
@@ -88,6 +103,13 @@ class ValueSourceStore(BaseSqliteStore[ValueSource]):
""" """
source = self.get(source_id) source = self.get(source_id)
# Cycle/depth guard FIRST — before any field mutation — so a rejection
# never leaves the cached object half-mutated. validate_nesting works
# off the prospective child ids derived from kwargs without applying them.
child_ids = self._prospective_child_ids(source, kwargs)
if child_ids:
self.validate_nesting(source_id, child_ids)
name = kwargs.pop("name", None) name = kwargs.pop("name", None)
if name is not None: if name is not None:
self._check_name_unique(name, exclude_id=source_id) self._check_name_unique(name, exclude_id=source_id)
@@ -119,3 +141,110 @@ class ValueSourceStore(BaseSqliteStore[ValueSource]):
logger.info(f"Updated value source: {source_id}") logger.info(f"Updated value source: {source_id}")
return source return source
# ── Reference graph (cycle / depth / referential integrity) ──────
#
# Value sources may reference other value sources: gradient_map via
# ``value_source_id`` and template via ``inputs[].value_source_id``. (Note:
# css_extract.color_strip_source_id points into the *color strip* store, a
# different graph, so it is intentionally not followed here.) Without a
# guard, a cycle would infinitely recurse in ValueStreamManager.acquire().
@staticmethod
def _child_ids_of(source: ValueSource) -> list[str]:
"""Value-source ids that ``source`` references (gradient_map / template)."""
if isinstance(source, TemplateValueSource):
return [
i["value_source_id"]
for i in source.inputs
if isinstance(i, dict) and i.get("value_source_id")
]
if isinstance(source, GradientMapValueSource):
return [source.value_source_id] if source.value_source_id else []
return []
def _prospective_child_ids(self, source: ValueSource, kwargs: dict) -> list[str]:
"""Child ids the source *would* reference after applying ``kwargs``.
Computed without mutating ``source`` so cycle/depth validation can run
before the update is applied (a raise must not leave a half-mutated
cached object).
"""
if isinstance(source, TemplateValueSource):
inputs = kwargs.get("inputs")
if inputs is None:
inputs = source.inputs
return [
i["value_source_id"]
for i in (inputs or [])
if isinstance(i, dict) and i.get("value_source_id")
]
if isinstance(source, GradientMapValueSource):
vs_id = kwargs.get("value_source_id")
if vs_id is None:
vs_id = source.value_source_id
return [vs_id] if vs_id else []
return []
def get_transitive_dependencies(self, source_id: str) -> set[str]:
"""All value-source ids reachable from ``source_id`` via reference edges."""
seen: set[str] = set()
stack = self._children_of_id(source_id)
while stack:
cid = stack.pop()
if cid in seen:
continue
seen.add(cid)
stack.extend(self._children_of_id(cid))
return seen
def _children_of_id(self, source_id: str) -> list[str]:
try:
src = self.get(source_id)
except Exception:
return []
return self._child_ids_of(src)
def _max_depth(self, ids: list[str], visiting: set[str]) -> int:
"""Longest reference chain length starting at any id in ``ids``."""
best = 0
for cid in ids:
if cid in visiting:
continue # cycle — handled separately; don't recurse forever
try:
src = self.get(cid)
except Exception:
depth = 1 # unresolved leaf still counts as one hop
else:
depth = 1 + self._max_depth(self._child_ids_of(src), visiting | {cid})
best = max(best, depth)
return best
def validate_nesting(self, parent_id: str | None, child_ids: list[str]) -> None:
"""Reject self-reference, circular dependencies, and over-deep nesting.
``parent_id`` is ``None`` at create time a brand-new node has no id, so
no existing source can reference it and a cycle through it is impossible;
only the depth check applies. At update time the cycle check runs too.
"""
if 1 + self._max_depth(child_ids, set()) > MAX_VALUE_SOURCE_DEPTH:
raise ValueError(
f"value source reference chain too deep (max {MAX_VALUE_SOURCE_DEPTH})"
)
if parent_id is None:
return
for cid in child_ids:
if cid == parent_id:
raise ValueError("a value source cannot reference itself")
if parent_id in self.get_transitive_dependencies(cid):
raise ValueError(f"input {cid!r} creates a circular value-source dependency")
def find_referencing_sources(self, source_id: str) -> list[str]:
"""Names of value sources that reference ``source_id`` (for delete-protection)."""
names: list[str] = []
for src in self.get_all():
if src.id == source_id:
continue
if source_id in self._child_ids_of(src):
names.append(src.name)
return names
@@ -62,6 +62,7 @@
<option value="css_extract" data-i18n="value_source.type.css_extract">Strip Extract</option> <option value="css_extract" data-i18n="value_source.type.css_extract">Strip Extract</option>
<option value="system_metrics" data-i18n="value_source.type.system_metrics">System Metrics</option> <option value="system_metrics" data-i18n="value_source.type.system_metrics">System Metrics</option>
<option value="http" data-i18n="value_source.type.http">HTTP Poll</option> <option value="http" data-i18n="value_source.type.http">HTTP Poll</option>
<option value="template" data-i18n="value_source.type.template">Jinja Template</option>
</select> </select>
</div> </div>
@@ -456,6 +457,82 @@
</div> </div>
</div> </div>
<!-- Template (Jinja expression) fields -->
<div id="value-source-template-section" style="display:none">
<div class="form-group">
<div class="label-row">
<label for="value-source-template-expression" data-i18n="value_source.template.expression">Expression:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="value_source.template.expression.hint">A sandboxed Jinja expression returning a number. Bound inputs are available by name; use raw[name] for the un-normalized value.</small>
<!-- The highlight overlay is injected around this textarea by jinja-editor.ts -->
<textarea id="value-source-template-expression" rows="3" spellcheck="false"
data-i18n-placeholder="value_source.template.expression.placeholder"
placeholder="clamp((temp - 18) / 10, 0, 1)"></textarea>
<div id="value-source-template-error" class="field-error-msg" style="display:none"></div>
<div id="value-source-template-ok" class="field-ok-msg" style="display:none"></div>
<div id="value-source-template-warn" class="field-warn-msg" style="display:none"></div>
<details class="jinja-hints">
<summary data-i18n="value_source.template.hints.title">Expression help</summary>
<div class="jinja-hints-body">
<div class="jinja-hints-section">
<span class="jinja-hints-section-title" data-i18n="value_source.template.hints.inputs_title">Bound inputs</span>
<div id="value-source-template-hint-vars" class="jinja-hints-vars"></div>
</div>
<div class="jinja-hints-section">
<span class="jinja-hints-section-title" data-i18n="value_source.template.hints.globals_title">Globals</span>
<div data-i18n="value_source.template.hints.globals">min(a, b), max(a, b), abs(x), round(x), clamp(x, lo, hi)</div>
</div>
<div class="jinja-hints-section">
<span class="jinja-hints-section-title" data-i18n="value_source.template.hints.raw_title">Raw values</span>
<div data-i18n="value_source.template.hints.raw">raw[name] gives the un-normalized value of an input that has one.</div>
</div>
<div class="jinja-hints-section">
<span class="jinja-hints-section-title" data-i18n="value_source.template.hints.examples_title">Examples</span>
<ul class="jinja-hints-examples">
<li><code>min(audio * 2, 1)</code></li>
<li><code>clamp((temp - 18) / 10, 0, 1)</code></li>
<li><code>(a + b) / 2</code></li>
</ul>
</div>
<div class="jinja-hints-section">
<small class="jinja-hints-time" data-i18n="value_source.template.hints.time">Tip: for time-of-day logic, bind an Adaptive (Time) or Daylight source as an input.</small>
</div>
</div>
</details>
</div>
<div class="form-group">
<div class="label-row">
<label data-i18n="value_source.template.inputs">Inputs:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="value_source.template.inputs.hint">Bind float value sources to variable names you reference in the expression.</small>
<div id="value-source-template-inputs-list" class="template-inputs-list"></div>
<button type="button" class="btn btn-secondary btn-sm" onclick="addTemplateInput()" data-i18n="value_source.template.add_input">+ Add Input</button>
</div>
<div class="form-group">
<div class="label-row">
<label for="value-source-template-default-value"><span data-i18n="value_source.template.default_value">Default Value:</span> <span id="value-source-template-default-value-display">0.00</span></label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="value_source.template.default_value.hint">Output used when the expression cannot be evaluated (e.g. an input is missing).</small>
<input type="range" id="value-source-template-default-value" min="0" max="1" step="0.01" value="0"
oninput="document.getElementById('value-source-template-default-value-display').textContent = parseFloat(this.value).toFixed(2)">
</div>
<div class="form-group">
<div class="label-row">
<label for="value-source-template-eval-interval" data-i18n="value_source.template.eval_interval">Eval Interval (s):</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="value_source.template.eval_interval.hint">How often to re-evaluate the expression. 0 = every poll (re-evaluate as fast as the inputs update).</small>
<input type="number" id="value-source-template-eval-interval" min="0" step="0.1" value="0">
</div>
</div>
<!-- CSS Extract fields --> <!-- CSS Extract fields -->
<div id="value-source-css-extract-section" style="display:none"> <div id="value-source-css-extract-section" style="display:none">
<div class="form-group"> <div class="form-group">
+209
View File
@@ -0,0 +1,209 @@
"""Hardened sandboxed-Jinja expression engine for template value sources.
Single source of truth for compiling, validating, and evaluating user-authored
Jinja *expressions* that combine the live values of other value sources into a
single float in [0, 1]. Imported by the storage factory (create/update
validation), the runtime ``TemplateValueStream``, and the validate-template API
route, so the client and server can never disagree about what is valid.
Security model a user-authored expression is attacker-influenceable config
that runs server-side (LAN device, shareable backups), so we layer defenses:
* :class:`~jinja2.sandbox.ImmutableSandboxedEnvironment` blocks ``__class__`` /
``mro`` traversal, mutation, and unsafe attribute/method access.
* ALL default filters and tests are stripped (``|attr``, ``|pprint``, ``|map``,
``|format`` ) and the auto-injected globals (``range``, ``dict``,
``namespace``, ``cycler``, ``lipsum``, ``joiner``) are removed none are
needed for numeric math and several are escape/DoS amplifiers.
* Only five vetted numeric callables are exposed: ``min``, ``max``, ``abs``,
``round``, ``clamp``.
* The evaluation context contains ONLY primitive floats plus a flat dict of
floats (``raw``) never any application object so even a hypothetical
sandbox escape has nothing privileged to pivot to.
* Obvious cost bombs are rejected at validate time: the ``**`` (power) operator
and string/list ``*`` repetition can hang or OOM the interpreter, which
``try/except`` cannot catch.
* The result is coerced to float, NaN/inf are rejected (they are valid floats,
not exceptions, so clamping alone would silently keep them), then clamped.
"""
from __future__ import annotations
import math
import re
from typing import Any
from jinja2 import nodes
from jinja2.exceptions import TemplateError
from jinja2.sandbox import ImmutableSandboxedEnvironment
def clamp(x: float, lo: float = 0.0, hi: float = 1.0) -> float:
"""Clamp ``x`` into ``[lo, hi]`` (defaults to the unit interval)."""
return max(lo, min(hi, x))
# The five callables a template author may use. Kept as the *only* names in the
# environment globals so no other builtin/Jinja helper is reachable.
GLOBALS: dict[str, Any] = {
"min": min,
"max": max,
"abs": abs,
"round": round,
"clamp": clamp,
}
# Input variable names that would shadow a global or the ``raw`` dict (and thus
# silently break the expression) or that Jinja auto-injects. Rejected at save
# time so the user gets a clear error instead of a template that always falls
# back to ``default_value``.
RESERVED_NAMES: frozenset[str] = frozenset(
{*GLOBALS, "raw", "range", "dict", "namespace", "cycler", "lipsum", "joiner"}
)
_IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
class TemplateValidationError(ValueError):
"""Raised when a template expression (or an input name) is invalid/unsafe.
Subclasses :class:`ValueError` so existing route handlers that map
``ValueError -> HTTP 400`` surface a clean message automatically.
"""
def _build_env() -> ImmutableSandboxedEnvironment:
env = ImmutableSandboxedEnvironment(autoescape=False)
# Strip the entire default filter/test surface — numeric expressions need
# none of them, and |attr/|pprint/|map/|format are escape/DoS amplifiers.
env.filters.clear()
env.tests.clear()
# Replace the auto-injected globals (range/dict/namespace/cycler/...) with
# only our five vetted callables.
env.globals.clear()
env.globals.update(GLOBALS)
return env
# Module-level shared environment. Compiled expressions and evaluation are
# cheap; the environment itself is immutable config built once.
SANDBOX_ENV = _build_env()
def _clean_jinja_error(exc: TemplateError) -> str:
"""A user-facing one-line message from a Jinja compile error."""
msg = str(exc) or exc.__class__.__name__
return msg.strip()
def _guard_ast(template: str) -> None:
"""Reject cost-bomb / disallowed constructs via the Jinja AST.
Called only on already-compilable single expressions, so wrapping in an
output block is safe (a ``}}`` inside a string literal stays inside it).
"""
try:
tree = SANDBOX_ENV.parse("{{ (" + template + ") }}")
except TemplateError as exc: # pragma: no cover - compile already gated this
raise TemplateValidationError(_clean_jinja_error(exc)) from exc
if next(tree.find_all(nodes.Pow), None) is not None:
raise TemplateValidationError("the '**' (power) operator is not allowed")
if next(tree.find_all((nodes.Filter, nodes.Test)), None) is not None:
raise TemplateValidationError("filters and tests are not allowed")
# Collection literals have no use in a numeric expression and enable a
# memory-bomb via repetition ([0] * 10**8 allocates gigabytes). The only
# collection access we need is raw[...] subscript, which is a Getitem, not a
# literal. Forbid list/tuple/dict literals outright.
if next(tree.find_all((nodes.List, nodes.Tuple, nodes.Dict)), None) is not None:
raise TemplateValidationError("list/tuple/dict literals are not allowed")
# Attribute access (``a.b``) has no use in numeric expressions and is the
# classic sandbox-escape vector (``__class__``, ``.format`` …). Raw values
# are read by subscript (``raw['x']``), which stays allowed.
if next(tree.find_all(nodes.Getattr), None) is not None:
raise TemplateValidationError("attribute access ('.') is not allowed")
# Only the five vetted globals may be called — blocks dict()/namespace()/
# cycler()/etc. at validate time with a clear message (they would also fail
# at runtime since the environment globals are cleared, but failing early is
# better UX and defense in depth).
for call in tree.find_all(nodes.Call):
fn = call.node
if not (isinstance(fn, nodes.Name) and fn.name in GLOBALS):
name = getattr(fn, "name", None) or fn.__class__.__name__
raise TemplateValidationError(
f"only min/max/abs/round/clamp may be called (got {name!r})"
)
# float * float is fine; reject only string/list repetition (the OOM path).
for mul in tree.find_all(nodes.Mul):
for side in (mul.left, mul.right):
if isinstance(side, nodes.Const) and isinstance(side.value, (str, list, tuple)):
raise TemplateValidationError("string/list repetition is not allowed")
def compile_template(template: str):
"""Compile ``template`` to a reusable Jinja ``Expression``.
Raises :class:`TemplateValidationError` if empty, uncompilable, or it uses a
disallowed/cost-bomb construct. Globals (min/max/abs/round/clamp) resolve
from ``SANDBOX_ENV.globals`` at call time, so the returned expression is
invoked with only the data context: ``expr(**ctx)``.
"""
if not template or not template.strip():
raise TemplateValidationError("expression is empty")
try:
expr = SANDBOX_ENV.compile_expression(template)
except TemplateError as exc:
raise TemplateValidationError(_clean_jinja_error(exc)) from exc
_guard_ast(template)
return expr
def validate_template_expression(template: str) -> None:
"""Validate ``template`` (compile + guard); raise on any problem."""
compile_template(template)
def validate_input_name(name: str) -> None:
"""Validate a single template input variable name; raise on any problem."""
if not name or not _IDENT_RE.match(name):
raise TemplateValidationError(f"input name {name!r} is not a valid identifier")
if name in RESERVED_NAMES:
raise TemplateValidationError(f"input name {name!r} is reserved")
def extract_variables(template: str) -> list[str]:
"""Return the free input variables referenced by ``template``.
Excludes the globals and ``raw`` so the validate endpoint reports only the
names the author is expected to bind. Returns ``[]`` if unparsable.
"""
from jinja2 import meta
try:
tree = SANDBOX_ENV.parse("{{ (" + template + ") }}")
except TemplateError:
return []
undeclared = meta.find_undeclared_variables(tree)
return sorted(undeclared - set(GLOBALS) - {"raw"})
def finalize_result(value: Any, default: float, lo: float = 0.0, hi: float = 1.0) -> float:
"""Coerce an evaluated result to a safe float in ``[lo, hi]``.
Non-numeric ``default``; NaN/inf ``default`` (they are valid floats, so
clamping alone would silently keep them); otherwise clamp.
"""
try:
f = float(value)
except (TypeError, ValueError, OverflowError):
# OverflowError: float() of a multi-hundred-digit int (e.g. a chained
# big-int multiply). Treated as "not a usable number" → default.
return default
if math.isnan(f) or math.isinf(f):
return default
return clamp(f, lo, hi)
@@ -0,0 +1,182 @@
"""Tests for template value source API: CRUD, validate-template, delete-protection."""
from unittest.mock import MagicMock
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from ledgrab.api import dependencies as deps
from ledgrab.api.routes.value_sources import router
from ledgrab.storage.value_source_store import ValueSourceStore
@pytest.fixture
def _route_db(tmp_path):
from ledgrab.storage.database import Database
db = Database(tmp_path / "test.db")
yield db
db.close()
@pytest.fixture
def store(_route_db):
return ValueSourceStore(_route_db)
@pytest.fixture
def client(store):
app = FastAPI()
app.include_router(router)
from ledgrab.api.auth import verify_api_key
app.dependency_overrides[verify_api_key] = lambda: "test-user"
app.dependency_overrides[deps.get_value_source_store] = lambda: store
app.dependency_overrides[deps.get_processor_manager] = lambda: MagicMock()
app.dependency_overrides[deps.get_output_target_store] = lambda: MagicMock(
get_all_targets=lambda: []
)
deps._deps["processor_manager"] = MagicMock()
return TestClient(app, raise_server_exceptions=False)
def _create(client, **over):
body = {
"source_type": "template",
"name": "Combo",
"template": "min(a * 2, 1)",
"inputs": [{"name": "a", "value_source_id": ""}],
"default_value": 0.2,
}
body.update(over)
return client.post("/api/v1/value-sources", json=body)
class TestCRUD:
def test_create_get_list_roundtrip(self, client):
r = _create(client)
assert r.status_code == 201, r.text
body = r.json()
assert body["source_type"] == "template"
assert body["return_type"] == "float"
assert body["template"] == "min(a * 2, 1)"
assert body["inputs"] == [{"name": "a", "value_source_id": ""}]
assert body["default_value"] == 0.2
sid = body["id"]
got = client.get(f"/api/v1/value-sources/{sid}").json()
assert got["template"] == "min(a * 2, 1)"
lst = client.get("/api/v1/value-sources").json()
assert any(s["id"] == sid and s["source_type"] == "template" for s in lst["sources"])
def test_update(self, client):
sid = _create(client).json()["id"]
r = client.put(
f"/api/v1/value-sources/{sid}",
json={"source_type": "template", "template": "clamp(a * 3)"},
)
assert r.status_code == 200, r.text
assert r.json()["template"] == "clamp(a * 3)"
def test_create_compile_error_returns_400(self, client):
r = _create(client, template="a +")
assert r.status_code == 400
def test_create_reserved_name_returns_400(self, client):
r = _create(client, inputs=[{"name": "min", "value_source_id": ""}])
assert r.status_code == 400
class TestDeleteProtection:
def test_delete_blocked_when_referenced(self, client):
base = client.post(
"/api/v1/value-sources",
json={"source_type": "static", "name": "Base", "value": 0.5},
).json()
_create(
client,
name="Uses",
template="b",
inputs=[{"name": "b", "value_source_id": base["id"]}],
)
r = client.delete(f"/api/v1/value-sources/{base['id']}")
assert r.status_code == 400
assert "referenced by" in r.json()["detail"]
class TestValidateEndpoint:
def _validate(self, client, **body):
return client.post("/api/v1/value-sources/validate-template", json=body)
def test_valid_expression(self, client):
r = self._validate(
client,
template="min(a, b)",
inputs=[{"name": "a", "value_source_id": ""}, {"name": "b", "value_source_id": ""}],
)
assert r.status_code == 200
data = r.json()
assert data["valid"] is True
assert set(data["variables"]) == {"a", "b"}
def test_compile_error(self, client):
r = self._validate(client, template="a +", inputs=[])
data = r.json()
assert data["valid"] is False
assert data["error"]
def test_reserved_name(self, client):
r = self._validate(
client, template="min(0,1)", inputs=[{"name": "raw", "value_source_id": ""}]
)
assert r.json()["valid"] is False
def test_missing_input_is_warning_not_error(self, client):
r = self._validate(
client, template="a", inputs=[{"name": "a", "value_source_id": "vs_nope"}]
)
data = r.json()
assert data["valid"] is True
assert data["warnings"]
def test_unbound_variable_is_error(self, client):
# Typo: expression uses 'ha_enti' but the input is named 'ha_entity'.
r = self._validate(
client, template="ha_enti", inputs=[{"name": "ha_entity", "value_source_id": ""}]
)
data = r.json()
assert data["valid"] is False
assert any("unbound" in e for e in data["errors"])
def test_cycle_detected_with_id(self, client):
t1 = _create(client, name="T1", template="clamp(0.5)", inputs=[]).json()
t2 = _create(
client,
name="T2",
template="x",
inputs=[{"name": "x", "value_source_id": t1["id"]}],
).json()
# Editing t1 to point at t2 would close a cycle.
r = self._validate(
client, template="x", inputs=[{"name": "x", "value_source_id": t2["id"]}], id=t1["id"]
)
assert r.json()["valid"] is False
class TestResponseMapCoverage:
def test_template_in_response_map(self):
from ledgrab.api.routes.value_sources import _RESPONSE_MAP
from ledgrab.storage.value_source import TemplateValueSource
assert TemplateValueSource in _RESPONSE_MAP
def test_template_in_all_unions(self):
from ledgrab.api.schemas import value_sources as sch
for union_name in ("ValueSourceResponse", "ValueSourceCreate", "ValueSourceUpdate"):
src = repr(getattr(sch, union_name))
assert "template" in src.lower() or "Template" in src
@@ -0,0 +1,231 @@
"""Tests for TemplateValueStream (the Jinja combinator runtime)."""
from collections import defaultdict
from datetime import datetime, timezone
import pytest
from ledgrab.core.processing.value_stream import (
TemplateValueStream,
ValueStreamManager,
)
from ledgrab.storage.value_source import TemplateValueSource
# --- Fakes for precise control over input values / raw -----------------------
class _FakeStream:
_NO_RAW = object()
def __init__(self, value, raw=_NO_RAW):
self._value = value
self._raw = raw
def get_value(self):
return self._value
# get_raw_value only exists when a raw value was provided
def __getattr__(self, name):
if name == "get_raw_value" and self._raw is not _FakeStream._NO_RAW:
return lambda: self._raw
raise AttributeError(name)
class _FakeVSM:
def __init__(self, streams):
self._streams = streams # id -> _FakeStream
self.refcounts = defaultdict(int)
def acquire(self, vs_id):
self.refcounts[vs_id] += 1
return self._streams[vs_id]
def release(self, vs_id):
self.refcounts[vs_id] -= 1
def _inputs(*pairs):
return [{"name": n, "value_source_id": i} for n, i in pairs]
def _make(template, inputs, streams, default_value=0.0, eval_interval=None):
vsm = _FakeVSM(streams)
stream = TemplateValueStream(
template=template,
inputs=inputs,
default_value=default_value,
eval_interval=eval_interval,
value_stream_manager=vsm,
)
stream.start()
return stream, vsm
class TestEvaluation:
def test_eval_with_inputs(self):
stream, vsm = _make("min(a * 2, 1)", _inputs(("a", "vs_a")), {"vs_a": _FakeStream(0.3)})
assert vsm.refcounts["vs_a"] == 1
assert stream.get_value() == pytest.approx(0.6)
def test_clamps_out_of_range(self):
stream, _ = _make("a * 10", _inputs(("a", "vs_a")), {"vs_a": _FakeStream(0.5)})
assert stream.get_value() == 1.0 # 5.0 clamped
def test_two_inputs(self):
stream, _ = _make(
"(a + b) / 2",
_inputs(("a", "vs_a"), ("b", "vs_b")),
{"vs_a": _FakeStream(0.2), "vs_b": _FakeStream(0.8)},
)
assert stream.get_value() == pytest.approx(0.5)
def test_shared_id_single_ref(self):
# Two variables bound to the same source share one acquisition.
stream, vsm = _make(
"min(a + b, 1)",
_inputs(("a", "vs_x"), ("b", "vs_x")),
{"vs_x": _FakeStream(0.3)},
)
assert vsm.refcounts["vs_x"] == 1
assert stream.get_value() == pytest.approx(0.6)
class TestErrorHandling:
def test_div_by_zero_returns_default(self):
stream, _ = _make(
"a / 0", _inputs(("a", "vs_a")), {"vs_a": _FakeStream(0.5)}, default_value=0.25
)
assert stream.get_value() == 0.25
def test_missing_variable_returns_default(self):
# template references 'b' but only 'a' is bound
stream, _ = _make(
"a + b", _inputs(("a", "vs_a")), {"vs_a": _FakeStream(0.5)}, default_value=0.1
)
assert stream.get_value() == 0.1
def test_nan_returns_default(self):
stream, _ = _make(
"a - a", _inputs(("a", "vs_a")), {"vs_a": _FakeStream(float("inf"))}, default_value=0.3
)
# inf - inf = nan -> default
assert stream.get_value() == 0.3
def test_invalid_template_uses_default(self):
stream, _ = _make(
"a +", _inputs(("a", "vs_a")), {"vs_a": _FakeStream(0.5)}, default_value=0.42
)
assert stream.get_value() == 0.42
class TestRawExposure:
def test_raw_present_when_stream_exposes_it(self):
stream, _ = _make(
"raw['t'] / 100",
_inputs(("t", "vs_t")),
{"vs_t": _FakeStream(0.5, raw=42.0)},
)
assert stream.get_value() == pytest.approx(0.42)
def test_raw_absent_without_getter(self):
# input stream has no get_raw_value -> raw['t'] -> None -> error -> default
stream, _ = _make(
"raw['t'] / 100",
_inputs(("t", "vs_t")),
{"vs_t": _FakeStream(0.5)},
default_value=0.2,
)
assert stream.get_value() == 0.2
def test_non_numeric_raw_is_dropped(self):
# raw value is a string -> never crosses into sandbox -> raw['t'] absent
stream, _ = _make(
"raw['t'] / 100",
_inputs(("t", "vs_t")),
{"vs_t": _FakeStream(0.5, raw="playing")},
default_value=0.15,
)
assert stream.get_value() == 0.15
class TestLifecycle:
def test_stop_releases_all(self):
stream, vsm = _make(
"min(a + b, 1)",
_inputs(("a", "vs_a"), ("b", "vs_b")),
{"vs_a": _FakeStream(0.1), "vs_b": _FakeStream(0.2)},
)
stream.stop()
assert vsm.refcounts["vs_a"] == 0
assert vsm.refcounts["vs_b"] == 0
def test_eval_interval_caches(self):
backing = _FakeStream(0.2)
stream, _ = _make("a", _inputs(("a", "vs_a")), {"vs_a": backing}, eval_interval=3600.0)
first = stream.get_value()
backing._value = 0.9 # change the live input
# Cached within the interval -> still the first value.
assert stream.get_value() == pytest.approx(first)
class TestHotUpdate:
def test_swap_input_releases_old_acquires_new(self):
stream, vsm = _make(
"a", _inputs(("a", "vs_a")), {"vs_a": _FakeStream(0.1), "vs_b": _FakeStream(0.9)}
)
assert vsm.refcounts["vs_a"] == 1
new_src = TemplateValueSource(
id="t1",
name="t",
source_type="template",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
template="a",
inputs=_inputs(("a", "vs_b")),
default_value=0.0,
)
stream.update_source(new_src)
assert vsm.refcounts["vs_a"] == 0 # old released
assert vsm.refcounts["vs_b"] == 1 # new acquired
assert stream.get_value() == pytest.approx(0.9)
def test_rename_keeps_same_source(self):
stream, vsm = _make("a", _inputs(("a", "vs_a")), {"vs_a": _FakeStream(0.7)})
renamed = TemplateValueSource(
id="t1",
name="t",
source_type="template",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
template="b", # variable renamed a -> b, same source id
inputs=_inputs(("b", "vs_a")),
default_value=0.0,
)
stream.update_source(renamed)
assert vsm.refcounts["vs_a"] == 1 # not re-acquired (unchanged id)
assert stream.get_value() == pytest.approx(0.7)
class TestAcquireDepthBackstop:
def test_self_reference_does_not_overflow(self):
"""A cycle that bypassed storage validation must not stack-overflow."""
now = datetime.now(timezone.utc)
src = TemplateValueSource(
id="vs_cycle",
name="cycle",
source_type="template",
created_at=now,
updated_at=now,
template="x",
inputs=_inputs(("x", "vs_cycle")),
default_value=0.0,
)
class _CycleStore:
def get_source(self, vs_id):
return src
manager = ValueStreamManager(value_source_store=_CycleStore())
stream = manager.acquire("vs_cycle") # must terminate, not recurse forever
assert isinstance(stream.get_value(), float)
+50
View File
@@ -0,0 +1,50 @@
"""Demo-seed regression tests (value sources, incl. the template combinator)."""
from ledgrab.core.demo_seed import seed_demo_data
from ledgrab.storage.database import Database
from ledgrab.storage.value_source import StaticValueSource, TemplateValueSource
from ledgrab.storage.value_source_store import ValueSourceStore
def _seed(tmp_path):
db = Database(tmp_path / "demo.db")
seed_demo_data(db)
return db
def test_demo_seeds_template_value_source(tmp_path):
db = _seed(tmp_path)
try:
store = ValueSourceStore(db)
by_id = {s.id: s for s in store.get_all_sources()}
base = by_id["vs_demo0001"]
boost = by_id["vs_demo0002"]
assert isinstance(base, StaticValueSource)
assert isinstance(boost, TemplateValueSource)
assert boost.template == "clamp(level * 1.5)"
assert boost.inputs == [{"name": "level", "value_source_id": "vs_demo0001"}]
# The reference graph is intact and consistent.
assert store.get_transitive_dependencies("vs_demo0002") == {"vs_demo0001"}
assert store.find_referencing_sources("vs_demo0001") == [boost.name]
finally:
db.close()
def test_demo_template_evaluates_through_manager(tmp_path):
"""The seeded template must actually evaluate over its seeded input."""
from ledgrab.core.processing.value_stream import ValueStreamManager
db = _seed(tmp_path)
try:
store = ValueSourceStore(db)
vsm = ValueStreamManager(value_source_store=store)
stream = vsm.acquire("vs_demo0002")
try:
# base level 0.5 -> clamp(0.5 * 1.5) = 0.75
assert abs(stream.get_value() - 0.75) < 1e-6
finally:
vsm.release("vs_demo0002")
finally:
db.close()
@@ -0,0 +1,231 @@
"""Tests for the template value source: model, factory, cycle/depth, refs."""
from datetime import datetime, timezone
import pytest
from ledgrab.storage.value_source import TemplateValueSource
class TestModelRoundTrip:
def _make(self, **over):
now = datetime.now(timezone.utc)
defaults = dict(
id="vs_t1",
name="Combo",
source_type="template",
created_at=now,
updated_at=now,
template="min(a * 2, 1)",
inputs=[{"name": "a", "value_source_id": "vs_a"}],
default_value=0.2,
eval_interval=1.5,
)
defaults.update(over)
return TemplateValueSource(**defaults)
def test_to_from_dict_idempotent(self):
src = self._make()
rebuilt = TemplateValueSource.from_dict(src.to_dict())
assert rebuilt.template == src.template
assert rebuilt.inputs == src.inputs
assert rebuilt.default_value == src.default_value
assert rebuilt.eval_interval == src.eval_interval
assert rebuilt.to_dict()["return_type"] == "float"
def test_old_row_deserializes_with_defaults(self):
"""A row written before template fields existed must load safely."""
now = datetime.now(timezone.utc).isoformat()
src = TemplateValueSource.from_dict(
{
"id": "vs_old",
"name": "Old",
"source_type": "template",
"created_at": now,
"updated_at": now,
}
)
assert src.template == ""
assert src.inputs == []
assert src.default_value == 0.0
assert src.eval_interval is None
def test_dirty_scalars_coerce_to_defaults(self):
"""Non-numeric stored scalars must not drop the whole row on load."""
src = TemplateValueSource.from_dict(
{
"id": "x",
"name": "n",
"source_type": "template",
"template": "a",
"default_value": "not-a-number",
"eval_interval": "bad",
}
)
assert src.default_value == 0.0
assert src.eval_interval is None
def test_inputs_normalized_from_dirty_data(self):
src = TemplateValueSource.from_dict(
{
"id": "x",
"name": "n",
"source_type": "template",
"inputs": [{"name": "a", "value_source_id": "vs_a"}, "junk", {"bad": 1}],
}
)
# non-dict entries dropped; dict entries coerced to {name, value_source_id}
assert src.inputs == [
{"name": "a", "value_source_id": "vs_a"},
{"name": "", "value_source_id": ""},
]
class TestFactoryCreate:
def test_create_valid(self, value_source_store):
src = value_source_store.create_source(
"Combo",
"template",
template="min(a * 2, 1)",
inputs=[{"name": "a", "value_source_id": ""}],
default_value=0.3,
)
assert isinstance(src, TemplateValueSource)
assert src.id.startswith("vs_")
assert src.default_value == 0.3
def test_empty_template_rejected(self, value_source_store):
with pytest.raises(ValueError):
value_source_store.create_source("X", "template", template=" ", inputs=[])
def test_compile_error_rejected(self, value_source_store):
with pytest.raises(ValueError):
value_source_store.create_source("X", "template", template="a +", inputs=[])
def test_cost_bomb_rejected(self, value_source_store):
with pytest.raises(ValueError):
value_source_store.create_source("X", "template", template="10 ** 10", inputs=[])
def test_reserved_input_name_rejected(self, value_source_store):
with pytest.raises(ValueError):
value_source_store.create_source(
"X",
"template",
template="min(0, 1)",
inputs=[{"name": "min", "value_source_id": "vs_a"}],
)
def test_duplicate_input_name_rejected(self, value_source_store):
with pytest.raises(ValueError):
value_source_store.create_source(
"X",
"template",
template="a",
inputs=[
{"name": "a", "value_source_id": "vs_a"},
{"name": "a", "value_source_id": "vs_b"},
],
)
def test_default_value_out_of_range_rejected(self, value_source_store):
with pytest.raises(ValueError):
value_source_store.create_source(
"X",
"template",
template="a",
inputs=[{"name": "a", "value_source_id": ""}],
default_value=5.0,
)
def test_unbound_variable_rejected(self, value_source_store):
# 'ha_enti' is referenced but only 'ha_entity' is bound (typo) → reject.
with pytest.raises(ValueError):
value_source_store.create_source(
"X",
"template",
template="ha_enti",
inputs=[{"name": "ha_entity", "value_source_id": ""}],
)
class TestFactoryUpdate:
def test_partial_update_template_only(self, value_source_store):
src = value_source_store.create_source(
"X",
"template",
template="a",
inputs=[{"name": "a", "value_source_id": ""}],
default_value=0.1,
)
updated = value_source_store.update_source(src.id, template="clamp(a * 3)")
assert updated.template == "clamp(a * 3)"
assert updated.default_value == 0.1 # unchanged
def test_update_invalid_template_rejected(self, value_source_store):
src = value_source_store.create_source("X", "template", template="clamp(0.5)", inputs=[])
with pytest.raises(ValueError):
value_source_store.update_source(src.id, template="a |")
class TestCycleAndDepth:
def test_self_reference_rejected(self, value_source_store):
t = value_source_store.create_source("T", "template", template="clamp(0.5)", inputs=[])
with pytest.raises(ValueError):
value_source_store.update_source(t.id, inputs=[{"name": "x", "value_source_id": t.id}])
def test_circular_reference_rejected(self, value_source_store):
t1 = value_source_store.create_source("T1", "template", template="clamp(0.5)", inputs=[])
t2 = value_source_store.create_source(
"T2",
"template",
template="x",
inputs=[{"name": "x", "value_source_id": t1.id}],
)
# t1 -> t2 -> t1 would be a cycle
with pytest.raises(ValueError):
value_source_store.update_source(
t1.id, inputs=[{"name": "x", "value_source_id": t2.id}]
)
def test_deep_chain_rejected(self, value_source_store):
prev = value_source_store.create_source("L0", "template", template="clamp(0.5)", inputs=[])
created = 1
with pytest.raises(ValueError):
for i in range(1, 12):
node = value_source_store.create_source(
f"L{i}",
"template",
template="x",
inputs=[{"name": "x", "value_source_id": prev.id}],
)
prev = node
created += 1
# Should have rejected before building an unbounded chain.
assert created <= 8
def test_get_transitive_dependencies(self, value_source_store):
leaf = value_source_store.create_source(
"leaf", "template", template="clamp(0.5)", inputs=[]
)
mid = value_source_store.create_source(
"mid", "template", template="x", inputs=[{"name": "x", "value_source_id": leaf.id}]
)
top = value_source_store.create_source(
"top", "template", template="x", inputs=[{"name": "x", "value_source_id": mid.id}]
)
deps = value_source_store.get_transitive_dependencies(top.id)
assert deps == {mid.id, leaf.id}
class TestReferencingSources:
def test_find_referencing_sources(self, value_source_store):
base = value_source_store.create_source("Base", "static", value=0.5)
tmpl = value_source_store.create_source(
"Uses",
"template",
template="b",
inputs=[{"name": "b", "value_source_id": base.id}],
)
refs = value_source_store.find_referencing_sources(base.id)
assert tmpl.name in refs
assert value_source_store.find_referencing_sources(tmpl.id) == []
+136
View File
@@ -0,0 +1,136 @@
"""Tests for the hardened sandboxed-Jinja expression engine."""
import pytest
from ledgrab.utils.template_expr import (
GLOBALS,
RESERVED_NAMES,
TemplateValidationError,
clamp,
compile_template,
extract_variables,
finalize_result,
validate_input_name,
validate_template_expression,
)
class TestCompileAndEval:
def test_basic_eval(self):
assert compile_template("min(a * 2, 1)")(a=0.3, raw={}) == pytest.approx(0.6)
def test_clamp_global(self):
assert compile_template("clamp((t - 18) / 10)")(t=22.5, raw={}) == pytest.approx(0.45)
def test_raw_subscript(self):
assert compile_template("raw['t'] / 100")(raw={"t": 42.0}) == pytest.approx(0.42)
def test_ternary_and_comparison(self):
expr = compile_template("a if a > 0.5 else b")
assert expr(a=0.8, b=0.1, raw={}) == pytest.approx(0.8)
assert expr(a=0.2, b=0.1, raw={}) == pytest.approx(0.1)
def test_all_globals_callable(self):
for tpl in ("min(a, b)", "max(a, b)", "abs(a - b)", "round(a, 1)", "clamp(a)"):
compile_template(tpl)(a=0.4, b=0.6, raw={})
class TestRejections:
@pytest.mark.parametrize(
"tpl",
[
"",
" ",
"a +", # syntax error
"10 ** 3", # power bomb
"'a' * 1000", # string repetition
"a | pprint", # filter
"a is defined", # test
"a.__class__", # attribute access
"raw['s'].format(1)", # str gadget via attribute
"dict(x=1)", # non-global call
"namespace(x=1)",
"range(3)",
"cycler(1, 2)",
"[0] * 1000000", # list-literal repetition (memory bomb)
"(1,) * 1000000", # tuple-literal repetition (memory bomb)
"[1, 2, 3]", # bare list literal
"{1: 2}", # dict literal
],
)
def test_rejected(self, tpl):
with pytest.raises(TemplateValidationError):
validate_template_expression(tpl)
@pytest.mark.parametrize(
"tpl",
[
"min(a * 2, 1)",
"(a + b) / 2",
"clamp((t - 18) / 10, 0, 1)",
"raw['x'] / 100",
"a if a > b else b",
"abs(a - b)",
],
)
def test_accepted(self, tpl):
validate_template_expression(tpl) # must not raise
class TestFinalizeResult:
def test_nan_returns_default(self):
assert finalize_result(float("nan"), 0.25) == 0.25
def test_inf_returns_default(self):
assert finalize_result(float("inf"), 0.25) == 0.25
assert finalize_result(float("-inf"), 0.25) == 0.25
def test_non_numeric_returns_default(self):
assert finalize_result("nope", 0.25) == 0.25
assert finalize_result(None, 0.25) == 0.25
def test_overflow_returns_default(self):
# float() of a multi-hundred-digit int (chained big-int multiply) raises
# OverflowError, not ValueError — must still fall back, not propagate.
assert finalize_result(10**400, 0.25) == 0.25
def test_clamps_to_unit(self):
assert finalize_result(5.0, 0.0) == 1.0
assert finalize_result(-1.0, 0.0) == 0.0
assert finalize_result(0.5, 0.0) == pytest.approx(0.5)
def test_clamp_helper(self):
assert clamp(2.0) == 1.0
assert clamp(-2.0) == 0.0
assert clamp(5.0, 0.0, 10.0) == 5.0
class TestInputNames:
@pytest.mark.parametrize("name", ["audio", "cpu_load", "_x", "Temp2"])
def test_valid(self, name):
validate_input_name(name)
@pytest.mark.parametrize("name", ["", "1bad", "has space", "a-b", "a.b"])
def test_invalid_identifier(self, name):
with pytest.raises(TemplateValidationError):
validate_input_name(name)
@pytest.mark.parametrize("name", sorted(RESERVED_NAMES))
def test_reserved(self, name):
with pytest.raises(TemplateValidationError):
validate_input_name(name)
def test_globals_are_reserved(self):
assert set(GLOBALS).issubset(RESERVED_NAMES)
assert "raw" in RESERVED_NAMES
class TestExtractVariables:
def test_excludes_globals_and_raw(self):
assert extract_variables("min(a, raw['x']) + b") == ["a", "b"]
def test_empty_for_uncompilable(self):
assert extract_variables("a +") == []
def test_constant_expression(self):
assert extract_variables("clamp(0.5)") == []