feat(http-endpoints): introduce HTTP endpoint output target stack

New output kind that POSTs the current strip frame to a user-configured
HTTP endpoint, alongside WLED / MQTT / Hue. Stack mirrors the existing
output-target shape end-to-end: storage model + store, FastAPI router +
Pydantic schemas, JS feature module + modal template, router wiring in
api/__init__.py and the modal include in index.html. Tests cover both
the routes and the store.
This commit is contained in:
2026-05-23 00:47:31 +03:00
parent 06273ba2bc
commit d6cc80074d
10 changed files with 1825 additions and 0 deletions
+2
View File
@@ -27,6 +27,7 @@ from .routes.update import router as update_router
from .routes.assets import router as assets_router
from .routes.home_assistant import router as home_assistant_router
from .routes.mqtt import router as mqtt_router
from .routes.http_endpoints import router as http_endpoints_router
from .routes.game_integration import router as game_integration_router
from .routes.audio_processing_templates import router as audio_processing_templates_router
from .routes.audio_filters import router as audio_filters_router
@@ -59,6 +60,7 @@ router.include_router(update_router)
router.include_router(assets_router)
router.include_router(home_assistant_router)
router.include_router(mqtt_router)
router.include_router(http_endpoints_router)
router.include_router(game_integration_router)
router.include_router(audio_processing_templates_router)
router.include_router(audio_filters_router)
@@ -0,0 +1,270 @@
"""HTTP endpoint routes: CRUD + one-shot test."""
import json
from fastapi import APIRouter, Depends, HTTPException
from ledgrab.api.auth import AuthRequired
from ledgrab.api.dependencies import (
fire_entity_event,
get_http_endpoint_store,
)
from ledgrab.api.schemas.http_endpoints import (
HTTPEndpointCreate,
HTTPEndpointListResponse,
HTTPEndpointResponse,
HTTPEndpointUpdate,
HTTPTestRequest,
HTTPTestResponse,
)
from ledgrab.storage.base_store import EntityNotFoundError
from ledgrab.storage.http_endpoint import HTTPEndpoint
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
from ledgrab.utils import get_logger
from ledgrab.utils.safe_source import safe_request_bounded, validate_polling_url
logger = get_logger(__name__)
router = APIRouter()
def _warn_if_plaintext_token(url: str, auth_token: str, *, action: str) -> None:
"""Log a warning when an auth token would be sent over plaintext http://."""
if auth_token and url.lower().startswith("http://"):
logger.warning(
"HTTP endpoint %s: auth_token will be sent over plaintext http:// to %s. "
"Anyone on the network path can read it. Consider https:// if the "
"target supports TLS.",
action,
url,
)
def _to_response(endpoint: HTTPEndpoint) -> HTTPEndpointResponse:
return HTTPEndpointResponse(
id=endpoint.id,
name=endpoint.name,
url=endpoint.url,
method=endpoint.method,
auth_token_set=bool(endpoint.auth_token),
headers=dict(endpoint.headers),
timeout_s=endpoint.timeout_s,
description=endpoint.description,
tags=endpoint.tags,
icon=getattr(endpoint, "icon", "") or "",
icon_color=getattr(endpoint, "icon_color", "") or "",
created_at=endpoint.created_at,
updated_at=endpoint.updated_at,
)
@router.get(
"/api/v1/http/endpoints",
response_model=HTTPEndpointListResponse,
tags=["HTTP"],
)
async def list_http_endpoints(
_auth: AuthRequired,
store: HTTPEndpointStore = Depends(get_http_endpoint_store),
):
endpoints = store.get_all_endpoints()
return HTTPEndpointListResponse(
endpoints=[_to_response(e) for e in endpoints],
count=len(endpoints),
)
@router.post(
"/api/v1/http/endpoints",
response_model=HTTPEndpointResponse,
status_code=201,
tags=["HTTP"],
)
async def create_http_endpoint(
data: HTTPEndpointCreate,
_auth: AuthRequired,
store: HTTPEndpointStore = Depends(get_http_endpoint_store),
):
validate_polling_url(data.url)
_warn_if_plaintext_token(data.url, data.auth_token, action="create")
try:
endpoint = store.create_endpoint(
name=data.name,
url=data.url,
method=data.method,
auth_token=data.auth_token,
headers=data.headers,
timeout_s=data.timeout_s,
description=data.description,
tags=data.tags,
icon=data.icon,
icon_color=data.icon_color,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
fire_entity_event("http_endpoint", "created", endpoint.id)
return _to_response(endpoint)
@router.get(
"/api/v1/http/endpoints/{endpoint_id}",
response_model=HTTPEndpointResponse,
tags=["HTTP"],
)
async def get_http_endpoint(
endpoint_id: str,
_auth: AuthRequired,
store: HTTPEndpointStore = Depends(get_http_endpoint_store),
):
try:
endpoint = store.get_endpoint(endpoint_id)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail=f"HTTP endpoint {endpoint_id} not found")
return _to_response(endpoint)
@router.put(
"/api/v1/http/endpoints/{endpoint_id}",
response_model=HTTPEndpointResponse,
tags=["HTTP"],
)
async def update_http_endpoint(
endpoint_id: str,
data: HTTPEndpointUpdate,
_auth: AuthRequired,
store: HTTPEndpointStore = Depends(get_http_endpoint_store),
):
if data.url is not None:
validate_polling_url(data.url)
final_url = data.url
final_token = data.auth_token
if final_url is None or final_token is None:
try:
existing = store.get_endpoint(endpoint_id)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail=f"HTTP endpoint {endpoint_id} not found")
if final_url is None:
final_url = existing.url
if final_token is None:
final_token = existing.auth_token
_warn_if_plaintext_token(final_url, final_token, action="update")
try:
endpoint = store.update_endpoint(
endpoint_id,
name=data.name,
url=data.url,
method=data.method,
auth_token=data.auth_token,
headers=data.headers,
timeout_s=data.timeout_s,
description=data.description,
tags=data.tags,
icon=data.icon,
icon_color=data.icon_color,
)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail=f"HTTP endpoint {endpoint_id} not found")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
fire_entity_event("http_endpoint", "updated", endpoint.id)
return _to_response(endpoint)
@router.delete(
"/api/v1/http/endpoints/{endpoint_id}",
status_code=204,
tags=["HTTP"],
)
async def delete_http_endpoint(
endpoint_id: str,
_auth: AuthRequired,
store: HTTPEndpointStore = Depends(get_http_endpoint_store),
):
try:
store.delete_endpoint(endpoint_id)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail=f"HTTP endpoint {endpoint_id} not found")
fire_entity_event("http_endpoint", "deleted", endpoint_id)
async def _run_http_test(
method: str,
url: str,
headers: dict[str, str],
timeout_s: float,
) -> HTTPTestResponse:
"""Shared one-shot fetch + response shaping for both test endpoints."""
try:
status, body_bytes, error = await safe_request_bounded(
method, url, headers=headers, timeout=timeout_s
)
except HTTPException:
raise
except Exception as exc:
return HTTPTestResponse(success=False, error=f"Unexpected error: {type(exc).__name__}")
if error and status == 0:
return HTTPTestResponse(success=False, error=error)
try:
body_text = body_bytes.decode("utf-8")
except UnicodeDecodeError:
body_text = body_bytes.decode("utf-8", errors="replace")
try:
body_json = json.loads(body_text) if body_text else None
except (json.JSONDecodeError, ValueError):
body_json = None
preview = body_text[:500] if body_text else None
is_success = 200 <= status < 300
return HTTPTestResponse(
success=is_success,
status_code=status,
body_preview=preview,
body_json=body_json,
error=None if is_success else f"HTTP {status}",
)
@router.post(
"/api/v1/http/endpoints/test",
response_model=HTTPTestResponse,
tags=["HTTP"],
)
async def test_http_endpoint(
data: HTTPTestRequest,
_auth: AuthRequired,
):
"""One-shot fetch to validate URL + auth before saving."""
headers = dict(data.headers)
if data.auth_token and not any(k.lower() == "authorization" for k in headers):
headers["Authorization"] = f"Bearer {data.auth_token}"
return await _run_http_test(data.method, data.url, headers, data.timeout_s)
@router.post(
"/api/v1/http/endpoints/{endpoint_id}/test",
response_model=HTTPTestResponse,
tags=["HTTP"],
)
async def test_saved_http_endpoint(
endpoint_id: str,
_auth: AuthRequired,
store: HTTPEndpointStore = Depends(get_http_endpoint_store),
):
"""Run the stored endpoint configuration (URL + auth + headers + timeout).
Useful for the "test" button on the endpoint card: avoids the user
having to open the editor and re-enter the auth token (which is
never returned to the client).
"""
try:
endpoint = store.get_endpoint(endpoint_id)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail=f"HTTP endpoint {endpoint_id} not found")
return await _run_http_test(
endpoint.method,
endpoint.url,
endpoint.build_request_headers(),
endpoint.timeout_s,
)
@@ -0,0 +1,166 @@
"""HTTP endpoint schemas (CRUD + one-shot test)."""
import re
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional
from urllib.parse import urlparse
from pydantic import BaseModel, Field, field_validator
# RFC 7230 token chars for header names + reject any control character in values.
_HEADER_NAME_RE = re.compile(r"^[A-Za-z0-9!#$%&'*+\-.^_`|~]+$")
_HEADER_CONTROL_CHARS_RE = re.compile(r"[\x00-\x1f\x7f]")
def _validate_headers(value: Dict[str, str]) -> Dict[str, str]:
"""Reject header names/values that could enable CRLF injection."""
if value is None:
return {}
cleaned: Dict[str, str] = {}
for name, raw in value.items():
if not isinstance(name, str) or not isinstance(raw, str):
raise ValueError(f"Header name/value must be strings: {name!r}")
if not _HEADER_NAME_RE.match(name):
raise ValueError(f"Invalid HTTP header name: {name!r}")
if _HEADER_CONTROL_CHARS_RE.search(raw):
raise ValueError(
f"Invalid HTTP header value for {name!r} (contains control characters)"
)
cleaned[name] = raw
return cleaned
def _validate_url(value: str) -> str:
"""Reject URLs that embed ``user:pass@`` so credentials can't leak
into server logs (e.g. via the plaintext-token warning helper).
Schemes + IP-block checks are enforced later by
:func:`ledgrab.utils.safe_source.validate_polling_url`.
"""
if not value:
return value
parsed = urlparse(value)
if parsed.username is not None or parsed.password is not None:
raise ValueError(
"URL must not embed credentials (user:pass@). "
"Use the auth_token field or a custom Authorization header instead."
)
return value
class HTTPEndpointCreate(BaseModel):
"""Request to create an HTTP endpoint."""
name: str = Field(min_length=1, max_length=100)
url: str = Field(min_length=1, description="http or https URL")
method: Literal["GET", "HEAD"] = "GET"
auth_token: str = Field(
default="",
description=(
"Optional bearer token — sent as 'Authorization: Bearer <token>'. "
"Add a custom Authorization entry in `headers` to override."
),
)
headers: Dict[str, str] = Field(default_factory=dict)
timeout_s: float = Field(default=10.0, gt=0)
description: Optional[str] = Field(None, max_length=500)
tags: List[str] = Field(default_factory=list)
icon: Optional[str] = Field(None, max_length=64)
icon_color: Optional[str] = Field(None, max_length=32)
@field_validator("headers")
@classmethod
def _check_headers(cls, value):
return _validate_headers(value)
@field_validator("url")
@classmethod
def _check_url(cls, value):
return _validate_url(value)
class HTTPEndpointUpdate(BaseModel):
"""Request to update an HTTP endpoint.
All fields optional — ``None`` keeps the existing value. Sending an
empty string for ``auth_token`` CLEARS the stored token; omit the
field (or send ``null``) to keep it.
"""
name: Optional[str] = Field(None, min_length=1, max_length=100)
url: Optional[str] = Field(None, min_length=1)
method: Optional[Literal["GET", "HEAD"]] = None
auth_token: Optional[str] = Field(None, description="null = keep existing; '' = clear.")
headers: Optional[Dict[str, str]] = None
timeout_s: Optional[float] = Field(None, gt=0)
description: Optional[str] = Field(None, max_length=500)
tags: Optional[List[str]] = None
icon: Optional[str] = Field(None, max_length=64)
icon_color: Optional[str] = Field(None, max_length=32)
@field_validator("headers")
@classmethod
def _check_headers(cls, value):
if value is None:
return None
return _validate_headers(value)
@field_validator("url")
@classmethod
def _check_url(cls, value):
if value is None:
return None
return _validate_url(value)
class HTTPEndpointResponse(BaseModel):
"""HTTP endpoint response. Note: ``auth_token`` is NEVER returned —
use ``auth_token_set`` to know whether one is configured."""
id: str
name: str
url: str
method: str
auth_token_set: bool = False
headers: Dict[str, str] = Field(default_factory=dict)
timeout_s: float
description: Optional[str] = None
tags: List[str] = Field(default_factory=list)
icon: Optional[str] = Field(None, max_length=64)
icon_color: Optional[str] = Field(None, max_length=32)
created_at: datetime
updated_at: datetime
class HTTPEndpointListResponse(BaseModel):
endpoints: List[HTTPEndpointResponse]
count: int
class HTTPTestRequest(BaseModel):
"""One-shot test request to validate URL + auth before saving."""
url: str
method: Literal["GET", "HEAD"] = "GET"
auth_token: str = ""
headers: Dict[str, str] = Field(default_factory=dict)
timeout_s: float = Field(default=10.0, gt=0)
@field_validator("headers")
@classmethod
def _check_headers(cls, value):
return _validate_headers(value)
@field_validator("url")
@classmethod
def _check_url(cls, value):
return _validate_url(value)
class HTTPTestResponse(BaseModel):
success: bool
status_code: Optional[int] = None
body_preview: Optional[str] = Field(None, description="First 500 chars of the body")
body_json: Any = None
error: Optional[str] = None
@@ -0,0 +1,618 @@
/**
* HTTP Endpoints — CRUD, test, cards.
*
* An HTTP endpoint is a connection definition only (URL + auth +
* headers + timeout). Polling cadence is owned by HTTPValueSource —
* one endpoint can back many value sources at different intervals.
*
* Structurally mirrors `home-assistant-sources.ts` (HA-source CRUD UI):
* register icon adapter → modal subclass with dirty-check → save/edit/
* clone/delete handlers → card builder → event delegation.
*/
import {
_cachedHTTPEndpoints, httpEndpointsCache,
} from '../core/state.ts';
import { fetchWithAuth, escapeHtml } from '../core/api.ts';
import { t } from '../core/i18n.ts';
import { Modal } from '../core/modal.ts';
import { showToast, showConfirm } from '../core/ui.ts';
import {
ICON_EDIT, ICON_TRASH, ICON_EYE, ICON_EYE_OFF,
ICON_TEST, ICON_OK, ICON_WARNING,
} from '../core/icons.ts';
import * as P from '../core/icon-paths.ts';
import { wrapCard } from '../core/card-colors.ts';
import type { ModCardOpts, ModChipOpts, LedState } from '../core/mod-card.ts';
import { TagInput, renderTagChips } from '../core/tag-input.ts';
import { makeCardIconFields } from '../core/card-icon.ts';
import { registerIconEntityType, makeSimpleIconAdapter } from './icon-picker.ts';
import { IconSelect } from '../core/icon-select.ts';
import type { HTTPEndpoint, HTTPEndpointWritePayload, HTTPMethod, HTTPTestResponse } from '../types.ts';
registerIconEntityType('http_endpoint', makeSimpleIconAdapter<HTTPEndpoint>({
cache: httpEndpointsCache,
endpointPrefix: '/http/endpoints',
reload: async () => {
if (typeof (window as any).loadIntegrations === 'function') {
await (window as any).loadIntegrations();
}
},
typeLabelKey: 'device.icon.entity.http_endpoint',
typeLabelFallback: 'HTTP endpoint',
cardSelectors: (id) => [
`[data-card-section="http-endpoints"] [data-id="${CSS.escape(id)}"]`,
],
}));
const ICON_HTTP = `<svg class="icon" viewBox="0 0 24 24">${P.globe}</svg>`;
// ── Modal ─────────────────────────────────────────────────────
let _httpTagsInput: TagInput | null = null;
let _httpMethodIconSelect: IconSelect | null = null;
/** In-memory headers; mirrored into hidden snapshot field. */
let _httpHeaders: Array<{ name: string; value: string }> = [];
class HTTPEndpointModal extends Modal {
constructor() { super('http-endpoint-modal'); }
onForceClose() {
if (_httpTagsInput) { _httpTagsInput.destroy(); _httpTagsInput = null; }
if (_httpMethodIconSelect) { _httpMethodIconSelect.destroy(); _httpMethodIconSelect = null; }
_httpHeaders = [];
const out = document.getElementById('http-endpoint-test-output');
if (out) { out.innerHTML = ''; out.style.display = 'none'; }
}
snapshotValues() {
// Headers are compared as a sorted snapshot so the dirty-check is
// immune to a backend serialization that emits keys in a different
// order than the user originally entered them (false-positive
// "discard changes?" on reopen of multi-header endpoints).
const headersSnapshot = [..._httpHeaders]
.sort((a, b) => a.name.localeCompare(b.name));
return {
name: (document.getElementById('http-endpoint-name') as HTMLInputElement).value,
url: (document.getElementById('http-endpoint-url') as HTMLInputElement).value,
method: (document.getElementById('http-endpoint-method') as HTMLSelectElement).value,
auth_token: (document.getElementById('http-endpoint-auth-token') as HTMLInputElement).value,
timeout_s: (document.getElementById('http-endpoint-timeout') as HTMLInputElement).value,
description: (document.getElementById('http-endpoint-description') as HTMLInputElement).value,
headers: JSON.stringify(headersSnapshot),
tags: JSON.stringify(_httpTagsInput ? _httpTagsInput.getValue() : []),
};
}
}
const httpEndpointModal = new HTTPEndpointModal();
// ── Method IconSelect (GET / HEAD) ────────────────────────────
function _ensureMethodIconSelect() {
const sel = document.getElementById('http-endpoint-method') as HTMLSelectElement | null;
if (!sel) return;
const items = [
{
value: 'GET',
icon: `<svg class="icon" viewBox="0 0 24 24">${P.download}</svg>`,
label: 'GET',
desc: t('http_endpoint.method.get.desc'),
},
{
value: 'HEAD',
icon: `<svg class="icon" viewBox="0 0 24 24">${P.listChecks}</svg>`,
label: 'HEAD',
desc: t('http_endpoint.method.head.desc'),
},
];
if (_httpMethodIconSelect) { _httpMethodIconSelect.updateItems(items); return; }
_httpMethodIconSelect = new IconSelect({ target: sel, items, columns: 2 });
}
// ── Headers list (key/value rows) ─────────────────────────────
// Visual model mirrors `.group-child-row` (cards.css): bordered rows on
// `--bg-color` with a subtle hover ring, inputs share the same height
// and rounded corners, trash button on the right. Empty state matches
// the `.pp-filter-empty` dashed-border pattern.
function _renderHeaderRows() {
const list = document.getElementById('http-endpoint-headers-list');
if (!list) return;
if (_httpHeaders.length === 0) {
list.innerHTML = `<div class="http-headers-empty">${escapeHtml(t('http_endpoint.headers.empty'))}</div>`;
return;
}
list.innerHTML = _httpHeaders.map((h, idx) => `
<div class="http-header-row" data-idx="${idx}">
<span class="http-header-index" aria-hidden="true">${idx + 1}</span>
<div class="http-header-fields">
<input type="text" class="http-header-name" placeholder="${escapeHtml(t('http_endpoint.headers.name_placeholder'))}" value="${escapeHtml(h.name)}" spellcheck="false" autocomplete="off">
<input type="text" class="http-header-value" placeholder="${escapeHtml(t('http_endpoint.headers.value_placeholder'))}" value="${escapeHtml(h.value)}" spellcheck="false" autocomplete="off">
</div>
<button type="button" class="btn btn-icon btn-secondary http-header-remove" title="${escapeHtml(t('common.delete'))}" aria-label="${escapeHtml(t('common.delete'))}">${ICON_TRASH}</button>
</div>
`).join('');
list.querySelectorAll<HTMLInputElement>('.http-header-name').forEach((inp) => {
inp.addEventListener('input', () => {
const row = inp.closest<HTMLElement>('.http-header-row')!;
const idx = parseInt(row.dataset.idx || '0', 10);
if (_httpHeaders[idx]) _httpHeaders[idx].name = inp.value;
});
});
list.querySelectorAll<HTMLInputElement>('.http-header-value').forEach((inp) => {
inp.addEventListener('input', () => {
const row = inp.closest<HTMLElement>('.http-header-row')!;
const idx = parseInt(row.dataset.idx || '0', 10);
if (_httpHeaders[idx]) _httpHeaders[idx].value = inp.value;
});
});
list.querySelectorAll<HTMLButtonElement>('.http-header-remove').forEach((btn) => {
btn.addEventListener('click', () => {
const row = btn.closest<HTMLElement>('.http-header-row')!;
const idx = parseInt(row.dataset.idx || '0', 10);
_httpHeaders.splice(idx, 1);
_renderHeaderRows();
});
});
}
export function addHTTPEndpointHeader() {
_httpHeaders.push({ name: '', value: '' });
_renderHeaderRows();
// Focus the newest row so the user can immediately start typing.
const list = document.getElementById('http-endpoint-headers-list');
if (list) {
const names = list.querySelectorAll<HTMLInputElement>('.http-header-name');
names[names.length - 1]?.focus();
}
}
// ── Show / Close ──────────────────────────────────────────────
export async function showHTTPEndpointModal(editData: HTTPEndpoint | null = null): Promise<void> {
const isEdit = !!editData;
const titleKey = isEdit ? 'http_endpoint.edit' : 'http_endpoint.add';
document.getElementById('http-endpoint-modal-title')!.innerHTML = `${ICON_HTTP} ${t(titleKey)}`;
(document.getElementById('http-endpoint-id') as HTMLInputElement).value = editData?.id || '';
(document.getElementById('http-endpoint-error') as HTMLElement).style.display = 'none';
const nameInput = document.getElementById('http-endpoint-name') as HTMLInputElement;
const urlInput = document.getElementById('http-endpoint-url') as HTMLInputElement;
const methodSel = document.getElementById('http-endpoint-method') as HTMLSelectElement;
const tokenInput = document.getElementById('http-endpoint-auth-token') as HTMLInputElement;
const timeoutInput = document.getElementById('http-endpoint-timeout') as HTMLInputElement;
const descInput = document.getElementById('http-endpoint-description') as HTMLInputElement;
nameInput.value = editData?.name || '';
urlInput.value = editData?.url || '';
methodSel.value = editData?.method || 'GET';
tokenInput.value = ''; // never expose stored token
tokenInput.type = 'password';
timeoutInput.value = String(editData?.timeout_s ?? 10);
descInput.value = editData?.description || '';
// Headers: snapshot from data into our editable buffer
_httpHeaders = editData?.headers
? Object.entries(editData.headers).map(([name, value]) => ({ name, value }))
: [];
_renderHeaderRows();
_ensureMethodIconSelect();
if (_httpMethodIconSelect) _httpMethodIconSelect.setValue(editData?.method || 'GET');
// Show "leave blank to keep" hint only when editing an endpoint
// that already has a token configured.
const tokenHint = document.getElementById('http-endpoint-token-hint');
if (tokenHint) tokenHint.style.display = (isEdit && editData?.auth_token_set) ? '' : 'none';
// Reveal password toggle
const revealBtn = document.getElementById('http-endpoint-token-reveal');
if (revealBtn) revealBtn.innerHTML = ICON_EYE;
// Inject icon into the inline Test button
const testBtnIcon = document.querySelector('#http-endpoint-test-btn .http-endpoint-test-btn-icon');
if (testBtnIcon) testBtnIcon.innerHTML = ICON_TEST;
// Reset test output
const out = document.getElementById('http-endpoint-test-output');
if (out) { out.innerHTML = ''; out.style.display = 'none'; }
// Tags
if (_httpTagsInput) { _httpTagsInput.destroy(); _httpTagsInput = null; }
_httpTagsInput = new TagInput(
document.getElementById('http-endpoint-tags-container'),
{ placeholder: t('tags.placeholder') }
);
_httpTagsInput.setValue(isEdit ? (editData?.tags || []) : []);
httpEndpointModal.open();
httpEndpointModal.snapshot();
}
export async function closeHTTPEndpointModal(): Promise<void> {
await httpEndpointModal.close();
}
export function toggleHTTPEndpointTokenVisibility() {
const inp = document.getElementById('http-endpoint-auth-token') as HTMLInputElement;
const btn = document.getElementById('http-endpoint-token-reveal');
if (!inp || !btn) return;
if (inp.type === 'password') {
inp.type = 'text';
btn.innerHTML = ICON_EYE_OFF;
} else {
inp.type = 'password';
btn.innerHTML = ICON_EYE;
}
}
// ── Header collection (commits live <input> values to buffer) ─
function _collectHeaders(): Record<string, string> {
// Pull current input values to be safe (in case of paste / IME).
const list = document.getElementById('http-endpoint-headers-list');
if (list) {
list.querySelectorAll<HTMLElement>('.http-header-row').forEach((row) => {
const idx = parseInt(row.dataset.idx || '0', 10);
const name = (row.querySelector('.http-header-name') as HTMLInputElement)?.value || '';
const value = (row.querySelector('.http-header-value') as HTMLInputElement)?.value || '';
if (_httpHeaders[idx]) { _httpHeaders[idx].name = name; _httpHeaders[idx].value = value; }
});
}
const out: Record<string, string> = {};
for (const h of _httpHeaders) {
const name = h.name.trim();
if (!name) continue;
out[name] = h.value;
}
return out;
}
// ── Save ──────────────────────────────────────────────────────
export async function saveHTTPEndpoint(): Promise<void> {
const id = (document.getElementById('http-endpoint-id') as HTMLInputElement).value;
if (httpEndpointModal.closeIfPristine(id)) return;
const name = (document.getElementById('http-endpoint-name') as HTMLInputElement).value.trim();
const url = (document.getElementById('http-endpoint-url') as HTMLInputElement).value.trim();
const method = (document.getElementById('http-endpoint-method') as HTMLSelectElement).value as HTTPMethod;
const token = (document.getElementById('http-endpoint-auth-token') as HTMLInputElement).value;
const timeoutRaw = (document.getElementById('http-endpoint-timeout') as HTMLInputElement).value;
const description = (document.getElementById('http-endpoint-description') as HTMLInputElement).value.trim() || undefined;
const headers = _collectHeaders();
const timeout_s = parseFloat(timeoutRaw);
if (!name) {
httpEndpointModal.showError(t('http_endpoint.error.name_required'));
return;
}
if (!url) {
httpEndpointModal.showError(t('http_endpoint.error.url_required'));
return;
}
if (isNaN(timeout_s) || timeout_s <= 0) {
httpEndpointModal.showError(t('http_endpoint.error.timeout_invalid'));
return;
}
const payload: HTTPEndpointWritePayload = {
name, url, method, headers, timeout_s, description,
tags: _httpTagsInput ? _httpTagsInput.getValue() : [],
};
// Auth token semantics (per backend schema):
// POST — empty string means "no token"
// PUT new — non-empty replaces; empty string CLEARS; omit to KEEP existing.
// For PUT we omit the field unless the user typed something so we don't
// accidentally clear a previously-configured token.
if (!id) {
payload.auth_token = token;
} else if (token) {
payload.auth_token = token;
}
try {
const method = id ? 'PUT' : 'POST';
const url = id ? `/http/endpoints/${id}` : '/http/endpoints';
const resp = await fetchWithAuth(url, {
method,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
if (!resp.ok) {
const err = await resp.json().catch(() => ({}));
throw new Error(err.detail || `HTTP ${resp.status}`);
}
showToast(t(id ? 'http_endpoint.updated' : 'http_endpoint.created'), 'success');
httpEndpointModal.forceClose();
httpEndpointsCache.invalidate();
if (typeof (window as any).loadIntegrations === 'function') await (window as any).loadIntegrations();
} catch (e: any) {
if (e.isAuth) return;
httpEndpointModal.showError(e.message);
}
}
// ── Edit / Clone / Delete ─────────────────────────────────────
export async function editHTTPEndpoint(endpointId: string): Promise<void> {
try {
const resp = await fetchWithAuth(`/http/endpoints/${endpointId}`);
if (!resp.ok) throw new Error(t('http_endpoint.error.load'));
const data: HTTPEndpoint = await resp.json();
await showHTTPEndpointModal(data);
} catch (e: any) {
if (e.isAuth) return;
showToast(e.message, 'error');
}
}
export async function cloneHTTPEndpoint(endpointId: string): Promise<void> {
try {
const resp = await fetchWithAuth(`/http/endpoints/${endpointId}`);
if (!resp.ok) throw new Error(t('http_endpoint.error.load'));
const data = await resp.json();
delete data.id;
data.name = data.name + ' (copy)';
// Cloning never reveals the token — user must re-enter if needed.
data.auth_token_set = false;
await showHTTPEndpointModal(data);
} catch (e: any) {
if (e.isAuth) return;
showToast(e.message, 'error');
}
}
export async function deleteHTTPEndpoint(endpointId: string): Promise<void> {
const confirmed = await showConfirm(t('http_endpoint.delete.confirm'));
if (!confirmed) return;
try {
const resp = await fetchWithAuth(`/http/endpoints/${endpointId}`, { method: 'DELETE' });
if (!resp.ok) {
const err = await resp.json().catch(() => ({}));
throw new Error(err.detail || `HTTP ${resp.status}`);
}
showToast(t('http_endpoint.deleted'), 'success');
httpEndpointsCache.invalidate();
if (typeof (window as any).loadIntegrations === 'function') await (window as any).loadIntegrations();
} catch (e: any) {
if (e.isAuth) return;
showToast(e.message, 'error');
}
}
// ── Test (in-form, pre-save) ──────────────────────────────────
/** Builds a `HTTPTestRequest` from the live form values and renders
* the response inline. Works for both new and edit modes. */
export async function testHTTPEndpoint(): Promise<void> {
const url = (document.getElementById('http-endpoint-url') as HTMLInputElement).value.trim();
const method = (document.getElementById('http-endpoint-method') as HTMLSelectElement).value as HTTPMethod;
const token = (document.getElementById('http-endpoint-auth-token') as HTMLInputElement).value;
const timeoutRaw = (document.getElementById('http-endpoint-timeout') as HTMLInputElement).value;
const headers = _collectHeaders();
const timeout_s = parseFloat(timeoutRaw);
const out = document.getElementById('http-endpoint-test-output');
if (!out) return;
// Render validation errors inline next to the Test button — the
// modal-level banner at the top of the form is invisible from here.
const renderValidationFail = (msg: string): void => {
out.style.display = '';
out.innerHTML = `<div class="http-test-result http-test-fail">
<span class="http-test-badge http-test-badge-fail">${escapeHtml(t('http_endpoint.test.failed'))}</span>
<code class="http-test-error">${escapeHtml(msg)}</code>
</div>`;
};
if (!url) {
renderValidationFail(t('http_endpoint.error.url_required'));
return;
}
if (isNaN(timeout_s) || timeout_s <= 0) {
renderValidationFail(t('http_endpoint.error.timeout_invalid'));
return;
}
const testBtn = document.getElementById('http-endpoint-test-btn');
if (testBtn) testBtn.classList.add('loading');
out.style.display = '';
out.innerHTML = `<div class="http-test-pending">
<span class="http-test-pending-spinner" aria-hidden="true"></span>
<span>${escapeHtml(t('http_endpoint.test.pending'))}</span>
</div>`;
try {
const resp = await fetchWithAuth('/http/endpoints/test', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ url, method, auth_token: token, headers, timeout_s }),
});
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
const data: HTTPTestResponse = await resp.json();
_renderTestResult(out, data);
} catch (e: any) {
if (e.isAuth) return;
out.innerHTML = _renderTestErrorHtml(e.message);
} finally {
if (testBtn) testBtn.classList.remove('loading');
}
}
function _renderTestErrorHtml(message: string): string {
return `<div class="http-test-result http-test-fail">
<div class="http-test-line">
<span class="http-test-badge http-test-badge-fail">${ICON_WARNING}<span>${escapeHtml(t('http_endpoint.test.failed'))}</span></span>
</div>
<code class="http-test-error">${escapeHtml(message)}</code>
</div>`;
}
function _renderTestResult(out: HTMLElement, data: HTTPTestResponse) {
const statusClass = data.success ? 'http-test-ok' : 'http-test-fail';
const badgeClass = data.success ? 'http-test-badge-ok' : 'http-test-badge-fail';
const badgeIcon = data.success ? ICON_OK : ICON_WARNING;
const badgeText = data.success
? t('http_endpoint.test.success')
: t('http_endpoint.test.failed');
const statusLine = data.status_code != null
? `<span class="http-test-status" title="HTTP ${data.status_code}">${data.status_code}</span>`
: '';
const errLine = data.error && !data.success
? `<code class="http-test-error">${escapeHtml(data.error)}</code>`
: '';
// Show JSON body when available — easier to copy a json_path from than raw text.
let bodyHtml = '';
let bodyLabel = '';
if (data.body_json != null) {
let pretty: string;
try { pretty = JSON.stringify(data.body_json, null, 2); }
catch { pretty = String(data.body_json); }
bodyLabel = `<div class="http-test-body-label">${escapeHtml(t('http_endpoint.test.body.json'))}</div>`;
bodyHtml = `<pre class="http-test-body">${escapeHtml(pretty)}</pre>`;
} else if (data.body_preview) {
bodyLabel = `<div class="http-test-body-label">${escapeHtml(t('http_endpoint.test.body.text'))}</div>`;
bodyHtml = `<pre class="http-test-body">${escapeHtml(data.body_preview)}</pre>`;
}
out.innerHTML = `<div class="http-test-result ${statusClass}">
<div class="http-test-line">
<span class="http-test-badge ${badgeClass}">${badgeIcon}<span>${escapeHtml(badgeText)}</span></span>
${statusLine}
</div>
${errLine}
${bodyLabel}
${bodyHtml}
</div>`;
}
// ── Card-level test (uses stored config, no form needed) ──────
async function _testHTTPEndpointFromCard(endpointId: string): Promise<void> {
try {
const resp = await fetchWithAuth(`/http/endpoints/${endpointId}/test`, { method: 'POST' });
if (!resp.ok) {
const err = await resp.json().catch(() => ({}));
throw new Error(err.detail || `HTTP ${resp.status}`);
}
const data: HTTPTestResponse = await resp.json();
if (data.success) {
const status = data.status_code != null ? ` (${data.status_code})` : '';
showToast(`${t('http_endpoint.test.success')}${status}`, 'success');
} else {
const detail = data.error || `HTTP ${data.status_code ?? '?'}`;
showToast(`${t('http_endpoint.test.failed')}: ${detail}`, 'error');
}
} catch (e: any) {
if (e.isAuth) return;
showToast(`${t('http_endpoint.test.failed')}: ${e.message}`, 'error');
}
}
// ── Card rendering ────────────────────────────────────────────
export function createHTTPEndpointCard(endpoint: HTTPEndpoint) {
const hasAuth = !!endpoint.auth_token_set;
const headerCount = Object.keys(endpoint.headers || {}).length;
const leds: LedState[] = ['on'];
const chips: ModChipOpts[] = [
{
icon: `<svg class="icon" viewBox="0 0 24 24">${P.globe}</svg>`,
text: endpoint.url,
title: endpoint.url,
},
{
icon: `<svg class="icon" viewBox="0 0 24 24">${P.refreshCw}</svg>`,
text: endpoint.method,
},
];
if (hasAuth) {
chips.push({
icon: `<svg class="icon" viewBox="0 0 24 24">${P.lock}</svg>`,
text: t('http_endpoint.auth.set'),
});
}
if (headerCount > 0) {
chips.push({
icon: `<svg class="icon" viewBox="0 0 24 24">${P.listChecks}</svg>`,
text: t('http_endpoint.headers.count').replace('{n}', String(headerCount)),
});
}
const mod: ModCardOpts = {
head: {
badge: { text: 'HTTP · ENDPOINT' },
name: endpoint.name,
metaHtml: escapeHtml(endpoint.url),
leds,
...makeCardIconFields('http_endpoint', endpoint.id, endpoint),
menu: {
duplicateOnclick: `cloneHTTPEndpoint('${endpoint.id}')`,
hideOnclick: `toggleCardHidden('http-endpoints','${endpoint.id}')`,
deleteOnclick: `deleteHTTPEndpoint('${endpoint.id}')`,
},
},
body: {
desc: endpoint.description || undefined,
chips,
},
foot: {
patchState: 'idle',
patchLabel: `${endpoint.method} · ${Math.round(endpoint.timeout_s)}s`,
iconActions: [
{ icon: ICON_TEST, onclick: '', title: t('http_endpoint.test'), dataAttrs: { 'data-action': 'test' } },
{ icon: ICON_EDIT, onclick: '', title: t('common.edit'), dataAttrs: { 'data-action': 'edit' } },
],
},
running: false,
};
const cardHtml = wrapCard({ type: 'template-card', dataAttr: 'data-id', id: endpoint.id, mod });
const tagsHtml = renderTagChips(endpoint.tags);
return tagsHtml ? cardHtml.replace(/<\/div>\s*$/, `${tagsHtml}</div>`) : cardHtml;
}
// ── Event delegation ──────────────────────────────────────────
const _httpEndpointActions: Record<string, (id: string) => void> = {
clone: cloneHTTPEndpoint,
edit: editHTTPEndpoint,
test: _testHTTPEndpointFromCard,
};
export function initHTTPEndpointDelegation(container: HTMLElement): void {
container.addEventListener('click', (e: MouseEvent) => {
const btn = (e.target as HTMLElement).closest<HTMLElement>('[data-action]');
if (!btn) return;
const section = btn.closest<HTMLElement>('[data-card-section="http-endpoints"]');
if (!section) return;
const card = btn.closest<HTMLElement>('[data-id]');
if (!card) return;
const action = btn.dataset.action;
const id = card.getAttribute('data-id');
if (!action || !id) return;
const handler = _httpEndpointActions[action];
if (handler) {
e.stopPropagation();
handler(id);
}
});
}
// ── Expose to global scope for HTML onclick handlers ──────────
window.showHTTPEndpointModal = showHTTPEndpointModal;
window.closeHTTPEndpointModal = closeHTTPEndpointModal;
window.saveHTTPEndpoint = saveHTTPEndpoint;
window.editHTTPEndpoint = editHTTPEndpoint;
window.cloneHTTPEndpoint = cloneHTTPEndpoint;
window.deleteHTTPEndpoint = deleteHTTPEndpoint;
window.testHTTPEndpoint = testHTTPEndpoint;
window.addHTTPEndpointHeader = addHTTPEndpointHeader;
window.toggleHTTPEndpointTokenVisibility = toggleHTTPEndpointTokenVisibility;
+126
View File
@@ -0,0 +1,126 @@
"""HTTP endpoint data model.
An ``HTTPEndpoint`` is a *connection definition*: where to fetch (URL),
how to authenticate (bearer token), what headers to send, and how long
to wait. It owns nothing about *what* to extract or *how often* to poll
— those concerns live on consumers (HTTPValueSource for periodic polling
+ extraction; potentially other consumers in future).
Mirrors the MQTT/HA source pattern (storage/mqtt_source.py,
storage/home_assistant_source.py).
"""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional
from ledgrab.utils import secret_box
def _parse_common(data: dict) -> dict:
"""Extract common fields from a dict, parsing timestamps."""
created = data.get("created_at", "")
updated = data.get("updated_at", "")
return {
"id": data["id"],
"name": data["name"],
"created_at": (
datetime.fromisoformat(created)
if isinstance(created, str) and created
else datetime.now(timezone.utc)
),
"updated_at": (
datetime.fromisoformat(updated)
if isinstance(updated, str) and updated
else datetime.now(timezone.utc)
),
"description": data.get("description"),
"tags": data.get("tags") or [],
}
@dataclass
class HTTPEndpoint:
"""HTTP endpoint connection configuration."""
id: str
name: str
created_at: datetime
updated_at: datetime
url: str = ""
method: str = "GET" # "GET" | "HEAD"
auth_token: str = "" # convenience: becomes Authorization: Bearer <token>
headers: Dict[str, str] = field(default_factory=dict)
timeout_s: float = 10.0
description: Optional[str] = None
tags: List[str] = field(default_factory=list)
icon: str = ""
icon_color: str = ""
def __post_init__(self) -> None:
# Invariant: ``self.auth_token`` is always plaintext at runtime.
# If a caller constructed this from a raw dict that still holds the
# encrypted envelope, decrypt now so ``build_request_headers``
# doesn't accidentally send ``Authorization: Bearer <envelope>``.
if self.auth_token and secret_box.is_encrypted(self.auth_token):
try:
self.auth_token = secret_box.decrypt(self.auth_token)
except Exception:
self.auth_token = ""
@property
def plaintext_token(self) -> str:
return secret_box.decrypt(self.auth_token) if self.auth_token else ""
def build_request_headers(self) -> Dict[str, str]:
"""Compose the headers actually sent on a fetch.
Order: explicit ``headers`` first, ``Authorization`` derived from
``auth_token`` only if the caller did not already supply one.
The check is case-insensitive so a user-supplied ``authorization``
(lower) wins over the bearer-token shortcut.
"""
result: Dict[str, str] = dict(self.headers)
already_has_auth = any(k.lower() == "authorization" for k in result)
if self.auth_token and not already_has_auth:
result["Authorization"] = f"Bearer {self.auth_token}"
return result
def to_dict(self) -> dict:
# Always persist auth_token in encrypted envelope form. If the field
# already contains an envelope, encrypt() is a no-op.
stored_token = secret_box.encrypt(self.auth_token) if self.auth_token else ""
d = {
"id": self.id,
"name": self.name,
"url": self.url,
"method": self.method,
"auth_token": stored_token,
"headers": dict(self.headers),
"timeout_s": self.timeout_s,
"description": self.description,
"tags": self.tags,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}
if self.icon:
d["icon"] = self.icon
if self.icon_color:
d["icon_color"] = self.icon_color
return d
@staticmethod
def from_dict(data: dict) -> "HTTPEndpoint":
common = _parse_common(data)
raw_token = data.get("auth_token", "")
token = secret_box.decrypt(raw_token) if raw_token else ""
return HTTPEndpoint(
**common,
url=data.get("url", ""),
method=data.get("method", "GET"),
auth_token=token,
headers=dict(data.get("headers") or {}),
timeout_s=float(data.get("timeout_s", 10.0)),
icon=data.get("icon", ""),
icon_color=data.get("icon_color", ""),
)
@@ -0,0 +1,147 @@
"""HTTP endpoint storage using SQLite. Mirrors MQTTSourceStore."""
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional
from ledgrab.storage.base_sqlite_store import BaseSqliteStore
from ledgrab.storage.database import Database
from ledgrab.storage.http_endpoint import HTTPEndpoint
from ledgrab.utils import get_logger, secret_box
logger = get_logger(__name__)
class HTTPEndpointStore(BaseSqliteStore[HTTPEndpoint]):
"""Persistent storage for HTTP endpoint connection definitions."""
_table_name = "http_endpoints"
_entity_name = "HTTP endpoint"
def __init__(self, db: Database):
super().__init__(db, HTTPEndpoint.from_dict)
self._migrate_plaintext_tokens()
def _migrate_plaintext_tokens(self) -> None:
"""Encrypt any stored auth tokens still in plaintext at rest."""
migrated = 0
try:
rows = self._db.load_all(self._table_name)
except Exception as exc:
logger.error("Could not inspect rows for HTTP token migration: %s", exc)
return
for row in rows:
sid = row.get("id")
raw = row.get("auth_token", "") or ""
if not sid or not raw:
continue
if secret_box.is_encrypted(raw):
continue
endpoint = self._items.get(sid)
if endpoint is None:
continue
try:
self._save_item(sid, endpoint)
migrated += 1
except Exception as exc:
logger.error("Failed to migrate HTTP token for %s: %s", sid, exc)
if migrated:
logger.warning(
"MIGRATION: encrypted %d plaintext HTTP auth token(s) at rest.",
migrated,
)
# Backward-compatible aliases (mirror MQTT store)
get_all_endpoints = BaseSqliteStore.get_all
get_endpoint = BaseSqliteStore.get
delete_endpoint = BaseSqliteStore.delete
def create_endpoint(
self,
name: str,
url: str,
method: str = "GET",
auth_token: str = "",
headers: Optional[Dict[str, str]] = None,
timeout_s: float = 10.0,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
icon: Optional[str] = None,
icon_color: Optional[str] = None,
) -> HTTPEndpoint:
if not url:
raise ValueError("url is required")
if method not in ("GET", "HEAD"):
raise ValueError(f"Unsupported method: {method!r}. Use GET or HEAD.")
if timeout_s <= 0:
raise ValueError("timeout_s must be > 0")
self._check_name_unique(name)
eid = f"htep_{uuid.uuid4().hex[:8]}"
now = datetime.now(timezone.utc)
endpoint = HTTPEndpoint(
id=eid,
name=name,
created_at=now,
updated_at=now,
url=url,
method=method,
auth_token=auth_token,
headers=dict(headers or {}),
timeout_s=timeout_s,
description=description,
tags=tags or [],
icon=icon or "",
icon_color=icon_color or "",
)
self._items[eid] = endpoint
self._save_item(eid, endpoint)
logger.info(f"Created HTTP endpoint: {name} ({eid})")
return endpoint
def update_endpoint(
self,
endpoint_id: str,
name: Optional[str] = None,
url: Optional[str] = None,
method: Optional[str] = None,
auth_token: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
timeout_s: Optional[float] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
icon: Optional[str] = None,
icon_color: Optional[str] = None,
) -> HTTPEndpoint:
existing = self.get(endpoint_id)
if name is not None and name != existing.name:
self._check_name_unique(name)
if method is not None and method not in ("GET", "HEAD"):
raise ValueError(f"Unsupported method: {method!r}. Use GET or HEAD.")
if timeout_s is not None and timeout_s <= 0:
raise ValueError("timeout_s must be > 0")
updated = HTTPEndpoint(
id=existing.id,
name=name if name is not None else existing.name,
created_at=existing.created_at,
updated_at=datetime.now(timezone.utc),
url=url if url is not None else existing.url,
method=method if method is not None else existing.method,
auth_token=auth_token if auth_token is not None else existing.auth_token,
headers=dict(headers) if headers is not None else dict(existing.headers),
timeout_s=timeout_s if timeout_s is not None else existing.timeout_s,
description=description if description is not None else existing.description,
tags=tags if tags is not None else existing.tags,
icon=icon if icon is not None else existing.icon,
icon_color=icon_color if icon_color is not None else existing.icon_color,
)
self._items[endpoint_id] = updated
self._save_item(endpoint_id, updated)
logger.info(f"Updated HTTP endpoint: {updated.name} ({endpoint_id})")
return updated
+1
View File
@@ -256,6 +256,7 @@
{% include 'modals/weather-source-editor.html' %}
{% include 'modals/ha-source-editor.html' %}
{% include 'modals/mqtt-source-editor.html' %}
{% include 'modals/http-endpoint-editor.html' %}
{% include 'modals/ha-light-editor.html' %}
{% include 'modals/z2m-light-editor.html' %}
{% include 'modals/asset-upload.html' %}
@@ -0,0 +1,142 @@
<!-- HTTP Endpoint Editor Modal — sectioned rack-panel layout
matching the settings-modal-redesign vocabulary. Channels: signal
(identity), cyan (request: URL/method/auth/timeout), amber
(headers), notes (description). Test button lives in the request
section and renders results inline. -->
<div id="http-endpoint-modal" class="modal" role="dialog" aria-modal="true" aria-labelledby="http-endpoint-modal-title">
<div class="modal-content">
<div class="modal-header">
<h2 id="http-endpoint-modal-title" data-i18n="http_endpoint.add">Add HTTP Endpoint</h2>
<button class="modal-close-btn" onclick="closeHTTPEndpointModal()" data-i18n-aria-label="aria.close">&#x2715;</button>
</div>
<div class="modal-body">
<form id="http-endpoint-form" onsubmit="return false;">
<input type="hidden" id="http-endpoint-id">
<div id="http-endpoint-error" class="error-message" style="display: none;"></div>
<!-- ── 01 · IDENTITY ───────────────────────────────── -->
<section class="ds-section" data-ds-key="identity" data-ch="signal">
<div class="ds-section-header">
<span class="ds-section-dot" aria-hidden="true"></span>
<span class="ds-section-title" data-i18n="settings.section.identity">Identity</span>
<span class="ds-section-index" aria-hidden="true">01</span>
</div>
<div class="ds-section-body">
<div class="form-group ds-name-group">
<div class="label-row">
<label for="http-endpoint-name" data-i18n="http_endpoint.name">Name:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="http_endpoint.name.hint">A descriptive name for this endpoint</small>
<input type="text" id="http-endpoint-name" data-i18n-placeholder="http_endpoint.name.placeholder" placeholder="Plex now-playing" required>
<div id="http-endpoint-tags-container"></div>
</div>
</div>
</section>
<!-- ── 02 · REQUEST ────────────────────────────────── -->
<section class="ds-section" data-ds-key="request" data-ch="cyan">
<div class="ds-section-header">
<span class="ds-section-dot" aria-hidden="true"></span>
<span class="ds-section-title" data-i18n="http_endpoint.section.request">Request</span>
<span class="ds-section-index" aria-hidden="true">02</span>
</div>
<div class="ds-section-body">
<div class="form-group">
<div class="label-row">
<label for="http-endpoint-url" data-i18n="http_endpoint.url">URL:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="http_endpoint.url.hint">Full http(s) URL to poll. Local addresses are allowed.</small>
<input type="text" id="http-endpoint-url" placeholder="https://192.168.1.100/api/status" required>
</div>
<div class="form-group">
<div class="label-row">
<label for="http-endpoint-method" data-i18n="http_endpoint.method">Method:</label>
</div>
<select id="http-endpoint-method">
<option value="GET">GET</option>
<option value="HEAD">HEAD</option>
</select>
</div>
<div class="form-group">
<div class="label-row">
<label for="http-endpoint-auth-token" data-i18n="http_endpoint.auth_token">Auth Token (optional):</label>
<button type="button" class="btn btn-icon btn-secondary" id="http-endpoint-token-reveal" onclick="toggleHTTPEndpointTokenVisibility()" data-i18n-title="http_endpoint.auth_token.reveal" title="Show/hide"></button>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="http_endpoint.auth_token.hint">Sent as 'Authorization: Bearer &lt;token&gt;'. Add a custom Authorization header to override.</small>
<small id="http-endpoint-token-hint" class="input-hint" style="display:none" data-i18n="http_endpoint.auth_token.edit_hint">Leave blank to keep the current token</small>
<input type="password" id="http-endpoint-auth-token" placeholder="Bearer token" autocomplete="new-password">
</div>
<div class="form-group">
<div class="label-row">
<label for="http-endpoint-timeout" data-i18n="http_endpoint.timeout">Timeout (s):</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="http_endpoint.timeout.hint">Maximum seconds to wait for a single request.</small>
<input type="number" id="http-endpoint-timeout" min="0.5" step="0.5" value="10">
</div>
<div class="form-group http-endpoint-test-group">
<button type="button" class="btn btn-secondary http-endpoint-test-btn" id="http-endpoint-test-btn" onclick="testHTTPEndpoint()">
<span class="http-endpoint-test-btn-icon" aria-hidden="true"></span>
<span data-i18n="http_endpoint.test">Test request</span>
</button>
<div id="http-endpoint-test-output" class="http-test-output" style="display:none"></div>
</div>
</div>
</section>
<!-- ── 03 · HEADERS ────────────────────────────────── -->
<section class="ds-section" data-ds-key="headers" data-ch="amber">
<div class="ds-section-header">
<span class="ds-section-dot" aria-hidden="true"></span>
<span class="ds-section-title" data-i18n="http_endpoint.section.headers">Headers</span>
<span class="ds-section-index" aria-hidden="true">03</span>
</div>
<div class="ds-section-body">
<div class="form-group">
<div class="label-row">
<label data-i18n="http_endpoint.headers">Custom Headers:</label>
<button type="button" class="hint-toggle" onclick="toggleHint(this)" title="?" data-i18n-aria-label="aria.hint">?</button>
</div>
<small class="input-hint" style="display:none" data-i18n="http_endpoint.headers.hint">Optional request headers (e.g. X-API-Key, Accept).</small>
<div id="http-endpoint-headers-list" class="http-headers-list"></div>
<button type="button" class="btn btn-secondary btn-add-header" onclick="addHTTPEndpointHeader()">
<span class="btn-add-header-icon" aria-hidden="true">+</span>
<span data-i18n="http_endpoint.headers.add">Add header</span>
</button>
</div>
</div>
</section>
<!-- ── 04 · NOTES ──────────────────────────────────── -->
<section class="ds-section" data-ds-key="notes" data-ch="amber">
<div class="ds-section-header">
<span class="ds-section-dot" aria-hidden="true"></span>
<span class="ds-section-title" data-i18n="settings.section.notes">Notes</span>
<span class="ds-section-index" aria-hidden="true">04</span>
</div>
<div class="ds-section-body">
<div class="form-group">
<div class="label-row">
<label for="http-endpoint-description" data-i18n="http_endpoint.description">Description (optional):</label>
</div>
<input type="text" id="http-endpoint-description" placeholder="">
</div>
</div>
</section>
</form>
</div>
<div class="modal-footer">
<button class="btn btn-icon btn-secondary" onclick="closeHTTPEndpointModal()" title="Cancel" data-i18n-title="settings.button.cancel" data-i18n-aria-label="aria.cancel">&#x2715;</button>
<button class="btn btn-icon btn-primary" onclick="saveHTTPEndpoint()" title="Save" data-i18n-title="settings.button.save" data-i18n-aria-label="aria.save">&#x2713;</button>
</div>
</div>
</div>
@@ -0,0 +1,185 @@
"""Tests for HTTP endpoint routes — CRUD, no-leak, /test, LAN policy."""
from unittest.mock import MagicMock
import httpx
import pytest
import respx
from fastapi import FastAPI
from fastapi.testclient import TestClient
from ledgrab.api import dependencies as deps
from ledgrab.api.routes.http_endpoints import router
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
@pytest.fixture
def _route_db(tmp_path):
from ledgrab.storage.database import Database
db = Database(tmp_path / "test.db")
yield db
db.close()
@pytest.fixture
def http_store(_route_db):
return HTTPEndpointStore(_route_db)
@pytest.fixture
def client(http_store):
app = FastAPI()
app.include_router(router)
from ledgrab.api.auth import verify_api_key
app.dependency_overrides[verify_api_key] = lambda: "test-user"
app.dependency_overrides[deps.get_http_endpoint_store] = lambda: http_store
# Stub fire_entity_event's processor_manager
deps._deps["processor_manager"] = MagicMock()
return TestClient(app, raise_server_exceptions=False)
# ---------------------------------------------------------------------------
# CRUD shape + no-leak guarantee
# ---------------------------------------------------------------------------
class TestCRUDRoutes:
def test_create_and_list(self, client):
r = client.post(
"/api/v1/http/endpoints",
json={
"name": "Plex",
"url": "https://example.com/status/sessions",
"auth_token": "very-secret",
"headers": {"Accept": "application/json"},
},
)
assert r.status_code == 201
body = r.json()
assert body["id"].startswith("htep_")
assert body["auth_token_set"] is True
assert "very-secret" not in r.text
lst = client.get("/api/v1/http/endpoints").json()
assert lst["count"] == 1
def test_response_never_exposes_auth_token(self, client):
"""Defence-in-depth: scan every CRUD response for the token string."""
token = "uniq-token-12345-no-collisions"
post = client.post(
"/api/v1/http/endpoints",
json={
"name": "x",
"url": "https://example.com",
"auth_token": token,
},
)
assert post.status_code == 201
endpoint_id = post.json()["id"]
get = client.get(f"/api/v1/http/endpoints/{endpoint_id}")
put = client.put(f"/api/v1/http/endpoints/{endpoint_id}", json={"name": "renamed"})
lst = client.get("/api/v1/http/endpoints")
for resp in (post, get, put, lst):
assert token not in resp.text, f"token leaked in {resp.url}"
def test_method_allowlist_at_schema_layer(self, client):
r = client.post(
"/api/v1/http/endpoints",
json={"name": "x", "url": "https://example.com", "method": "POST"},
)
assert r.status_code == 422
def test_crlf_in_header_rejected(self, client):
r = client.post(
"/api/v1/http/endpoints",
json={
"name": "x",
"url": "https://example.com",
"headers": {"X-Inject": "value\r\nX-Smuggle: yes"},
},
)
assert r.status_code == 422
def test_invalid_header_name_rejected(self, client):
r = client.post(
"/api/v1/http/endpoints",
json={
"name": "x",
"url": "https://example.com",
"headers": {"Bad Name With Space": "v"},
},
)
assert r.status_code == 422
# ---------------------------------------------------------------------------
# /test one-shot endpoint + LAN policy
# ---------------------------------------------------------------------------
class TestOneShotEndpoint:
@respx.mock
def test_test_success(self, client):
respx.get("https://example.com/status").mock(
return_value=httpx.Response(200, json={"ok": True})
)
r = client.post(
"/api/v1/http/endpoints/test",
json={"url": "https://example.com/status"},
)
assert r.status_code == 200
body = r.json()
assert body["success"] is True
assert body["status_code"] == 200
assert body["body_json"] == {"ok": True}
@respx.mock
def test_test_sends_auth_header(self, client):
route = respx.get("https://example.com/secure").mock(
return_value=httpx.Response(200, json={})
)
client.post(
"/api/v1/http/endpoints/test",
json={
"url": "https://example.com/secure",
"auth_token": "tok-abc",
},
)
sent = route.calls.last.request
assert sent.headers.get("authorization") == "Bearer tok-abc"
def test_test_rejects_loopback_url(self, client):
# Loopback is always blocked, even with the relaxed polling policy.
r = client.post(
"/api/v1/http/endpoints/test",
json={"url": "http://127.0.0.1:9000/"},
)
assert r.status_code == 400
def test_test_rejects_metadata_link_local(self, client):
# AWS/GCP metadata at 169.254.169.254 stays blocked.
r = client.post(
"/api/v1/http/endpoints/test",
json={"url": "http://169.254.169.254/latest/meta-data/"},
)
assert r.status_code == 400
@respx.mock
def test_test_allows_private_lan_ip(self, client):
"""Relaxed polling policy MUST permit private IPs — primary use case."""
respx.get("http://192.168.1.100/status").mock(
return_value=httpx.Response(200, json={"ok": True})
)
r = client.post(
"/api/v1/http/endpoints/test",
json={"url": "http://192.168.1.100/status"},
)
assert r.status_code == 200
assert r.json()["success"] is True
@@ -0,0 +1,168 @@
"""Tests for HTTPEndpointStore — CRUD + auth_token encryption + header policy."""
import pytest
from ledgrab.storage.database import Database
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
from ledgrab.utils import secret_box
@pytest.fixture
def http_store(tmp_path):
db = Database(tmp_path / "test.db")
store = HTTPEndpointStore(db)
yield store
db.close()
class TestCRUD:
def test_create_basic(self, http_store):
e = http_store.create_endpoint(
name="Plex",
url="https://plex.example.com/status/sessions",
)
assert e.id.startswith("htep_")
assert e.url == "https://plex.example.com/status/sessions"
assert e.method == "GET"
assert e.timeout_s == 10.0
assert e.headers == {}
assert e.auth_token == ""
def test_create_with_auth_and_headers(self, http_store):
e = http_store.create_endpoint(
name="Plex",
url="https://plex.example.com/status/sessions",
auth_token="my-secret-token",
headers={"Accept": "application/json"},
timeout_s=5.0,
)
assert e.auth_token == "my-secret-token"
assert e.headers == {"Accept": "application/json"}
assert e.timeout_s == 5.0
def test_create_requires_url(self, http_store):
with pytest.raises(ValueError, match="url is required"):
http_store.create_endpoint(name="x", url="")
def test_create_rejects_unsupported_method(self, http_store):
with pytest.raises(ValueError, match="Unsupported method"):
http_store.create_endpoint(name="x", url="https://example.com", method="POST")
def test_create_rejects_zero_timeout(self, http_store):
with pytest.raises(ValueError, match="timeout_s"):
http_store.create_endpoint(name="x", url="https://example.com", timeout_s=0)
def test_unique_name(self, http_store):
http_store.create_endpoint(name="dup", url="https://a")
with pytest.raises(ValueError, match="already exists"):
http_store.create_endpoint(name="dup", url="https://b")
def test_update(self, http_store):
e = http_store.create_endpoint(name="Plex", url="https://a")
updated = http_store.update_endpoint(
e.id,
url="https://b",
auth_token="new-token",
)
assert updated.url == "https://b"
assert updated.auth_token == "new-token"
assert updated.created_at == e.created_at
assert updated.updated_at > e.updated_at
def test_update_with_empty_token_clears(self, tmp_path):
"""Sending ``auth_token=""`` MUST clear the stored token (the
endpoint route distinguishes None=keep from ""=clear; the store
layer is the source of truth for the on-disk state)."""
db = Database(tmp_path / "test.db")
store = HTTPEndpointStore(db)
e = store.create_endpoint(name="x", url="https://a", auth_token="initial-secret")
assert e.auth_token == "initial-secret"
cleared = store.update_endpoint(e.id, auth_token="")
assert cleared.auth_token == ""
# Re-open the DB to confirm the change is persisted, not just in
# the in-memory cache.
db.close()
db2 = Database(tmp_path / "test.db")
store2 = HTTPEndpointStore(db2)
reloaded = store2.get_endpoint(e.id)
assert reloaded.auth_token == ""
db2.close()
def test_delete(self, http_store):
e = http_store.create_endpoint(name="x", url="https://a")
http_store.delete_endpoint(e.id)
from ledgrab.storage.base_store import EntityNotFoundError
with pytest.raises(EntityNotFoundError):
http_store.get_endpoint(e.id)
class TestTokenEncryption:
"""auth_token must be encrypted at rest, decrypted on load."""
def test_token_encrypted_at_rest(self, tmp_path):
db = Database(tmp_path / "test.db")
store = HTTPEndpointStore(db)
store.create_endpoint(name="x", url="https://a", auth_token="plaintext-secret")
rows = db.load_all("http_endpoints")
assert len(rows) == 1
raw = rows[0]["auth_token"]
assert raw != "plaintext-secret"
assert secret_box.is_encrypted(raw)
db.close()
def test_token_decrypted_on_load(self, tmp_path):
db_path = tmp_path / "test.db"
db = Database(db_path)
store = HTTPEndpointStore(db)
eid = store.create_endpoint(name="x", url="https://a", auth_token="round-trip").id
db.close()
db2 = Database(db_path)
store2 = HTTPEndpointStore(db2)
loaded = store2.get_endpoint(eid)
assert loaded.auth_token == "round-trip"
db2.close()
class TestHeadersAndAuth:
def test_build_request_headers_with_token(self, http_store):
e = http_store.create_endpoint(
name="x",
url="https://a",
auth_token="abc",
headers={"Accept": "application/json"},
)
h = e.build_request_headers()
assert h["Authorization"] == "Bearer abc"
assert h["Accept"] == "application/json"
def test_custom_authorization_header_wins(self, http_store):
e = http_store.create_endpoint(
name="x",
url="https://a",
auth_token="bearer-token",
headers={"Authorization": "Basic dXNlcjpwYXNz"},
)
h = e.build_request_headers()
assert h["Authorization"] == "Basic dXNlcjpwYXNz"
def test_case_insensitive_authorization_check(self, http_store):
"""Lowercase 'authorization' supplied by the user must win too."""
e = http_store.create_endpoint(
name="x",
url="https://a",
auth_token="should-not-be-sent",
headers={"authorization": "Basic xyz"},
)
h = e.build_request_headers()
# No second Authorization key — user's lowercase variant is preserved as-is
assert h.get("authorization") == "Basic xyz"
assert "Authorization" not in h
def test_no_auth_no_header(self, http_store):
e = http_store.create_endpoint(name="x", url="https://a")
assert "Authorization" not in e.build_request_headers()