feat(http-endpoints): introduce HTTP endpoint output target stack
New output kind that POSTs the current strip frame to a user-configured HTTP endpoint, alongside WLED / MQTT / Hue. Stack mirrors the existing output-target shape end-to-end: storage model + store, FastAPI router + Pydantic schemas, JS feature module + modal template, router wiring in api/__init__.py and the modal include in index.html. Tests cover both the routes and the store.
This commit is contained in:
@@ -0,0 +1,185 @@
|
||||
"""Tests for HTTP endpoint routes — CRUD, no-leak, /test, LAN policy."""
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
import respx
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from ledgrab.api import dependencies as deps
|
||||
from ledgrab.api.routes.http_endpoints import router
|
||||
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def _route_db(tmp_path):
|
||||
from ledgrab.storage.database import Database
|
||||
|
||||
db = Database(tmp_path / "test.db")
|
||||
yield db
|
||||
db.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def http_store(_route_db):
|
||||
return HTTPEndpointStore(_route_db)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(http_store):
|
||||
app = FastAPI()
|
||||
app.include_router(router)
|
||||
|
||||
from ledgrab.api.auth import verify_api_key
|
||||
|
||||
app.dependency_overrides[verify_api_key] = lambda: "test-user"
|
||||
app.dependency_overrides[deps.get_http_endpoint_store] = lambda: http_store
|
||||
|
||||
# Stub fire_entity_event's processor_manager
|
||||
deps._deps["processor_manager"] = MagicMock()
|
||||
|
||||
return TestClient(app, raise_server_exceptions=False)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CRUD shape + no-leak guarantee
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCRUDRoutes:
|
||||
def test_create_and_list(self, client):
|
||||
r = client.post(
|
||||
"/api/v1/http/endpoints",
|
||||
json={
|
||||
"name": "Plex",
|
||||
"url": "https://example.com/status/sessions",
|
||||
"auth_token": "very-secret",
|
||||
"headers": {"Accept": "application/json"},
|
||||
},
|
||||
)
|
||||
assert r.status_code == 201
|
||||
body = r.json()
|
||||
assert body["id"].startswith("htep_")
|
||||
assert body["auth_token_set"] is True
|
||||
assert "very-secret" not in r.text
|
||||
|
||||
lst = client.get("/api/v1/http/endpoints").json()
|
||||
assert lst["count"] == 1
|
||||
|
||||
def test_response_never_exposes_auth_token(self, client):
|
||||
"""Defence-in-depth: scan every CRUD response for the token string."""
|
||||
token = "uniq-token-12345-no-collisions"
|
||||
post = client.post(
|
||||
"/api/v1/http/endpoints",
|
||||
json={
|
||||
"name": "x",
|
||||
"url": "https://example.com",
|
||||
"auth_token": token,
|
||||
},
|
||||
)
|
||||
assert post.status_code == 201
|
||||
endpoint_id = post.json()["id"]
|
||||
|
||||
get = client.get(f"/api/v1/http/endpoints/{endpoint_id}")
|
||||
put = client.put(f"/api/v1/http/endpoints/{endpoint_id}", json={"name": "renamed"})
|
||||
lst = client.get("/api/v1/http/endpoints")
|
||||
|
||||
for resp in (post, get, put, lst):
|
||||
assert token not in resp.text, f"token leaked in {resp.url}"
|
||||
|
||||
def test_method_allowlist_at_schema_layer(self, client):
|
||||
r = client.post(
|
||||
"/api/v1/http/endpoints",
|
||||
json={"name": "x", "url": "https://example.com", "method": "POST"},
|
||||
)
|
||||
assert r.status_code == 422
|
||||
|
||||
def test_crlf_in_header_rejected(self, client):
|
||||
r = client.post(
|
||||
"/api/v1/http/endpoints",
|
||||
json={
|
||||
"name": "x",
|
||||
"url": "https://example.com",
|
||||
"headers": {"X-Inject": "value\r\nX-Smuggle: yes"},
|
||||
},
|
||||
)
|
||||
assert r.status_code == 422
|
||||
|
||||
def test_invalid_header_name_rejected(self, client):
|
||||
r = client.post(
|
||||
"/api/v1/http/endpoints",
|
||||
json={
|
||||
"name": "x",
|
||||
"url": "https://example.com",
|
||||
"headers": {"Bad Name With Space": "v"},
|
||||
},
|
||||
)
|
||||
assert r.status_code == 422
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /test one-shot endpoint + LAN policy
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestOneShotEndpoint:
|
||||
@respx.mock
|
||||
def test_test_success(self, client):
|
||||
respx.get("https://example.com/status").mock(
|
||||
return_value=httpx.Response(200, json={"ok": True})
|
||||
)
|
||||
r = client.post(
|
||||
"/api/v1/http/endpoints/test",
|
||||
json={"url": "https://example.com/status"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
body = r.json()
|
||||
assert body["success"] is True
|
||||
assert body["status_code"] == 200
|
||||
assert body["body_json"] == {"ok": True}
|
||||
|
||||
@respx.mock
|
||||
def test_test_sends_auth_header(self, client):
|
||||
route = respx.get("https://example.com/secure").mock(
|
||||
return_value=httpx.Response(200, json={})
|
||||
)
|
||||
client.post(
|
||||
"/api/v1/http/endpoints/test",
|
||||
json={
|
||||
"url": "https://example.com/secure",
|
||||
"auth_token": "tok-abc",
|
||||
},
|
||||
)
|
||||
sent = route.calls.last.request
|
||||
assert sent.headers.get("authorization") == "Bearer tok-abc"
|
||||
|
||||
def test_test_rejects_loopback_url(self, client):
|
||||
# Loopback is always blocked, even with the relaxed polling policy.
|
||||
r = client.post(
|
||||
"/api/v1/http/endpoints/test",
|
||||
json={"url": "http://127.0.0.1:9000/"},
|
||||
)
|
||||
assert r.status_code == 400
|
||||
|
||||
def test_test_rejects_metadata_link_local(self, client):
|
||||
# AWS/GCP metadata at 169.254.169.254 stays blocked.
|
||||
r = client.post(
|
||||
"/api/v1/http/endpoints/test",
|
||||
json={"url": "http://169.254.169.254/latest/meta-data/"},
|
||||
)
|
||||
assert r.status_code == 400
|
||||
|
||||
@respx.mock
|
||||
def test_test_allows_private_lan_ip(self, client):
|
||||
"""Relaxed polling policy MUST permit private IPs — primary use case."""
|
||||
respx.get("http://192.168.1.100/status").mock(
|
||||
return_value=httpx.Response(200, json={"ok": True})
|
||||
)
|
||||
r = client.post(
|
||||
"/api/v1/http/endpoints/test",
|
||||
json={"url": "http://192.168.1.100/status"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert r.json()["success"] is True
|
||||
@@ -0,0 +1,168 @@
|
||||
"""Tests for HTTPEndpointStore — CRUD + auth_token encryption + header policy."""
|
||||
|
||||
import pytest
|
||||
|
||||
from ledgrab.storage.database import Database
|
||||
from ledgrab.storage.http_endpoint_store import HTTPEndpointStore
|
||||
from ledgrab.utils import secret_box
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def http_store(tmp_path):
|
||||
db = Database(tmp_path / "test.db")
|
||||
store = HTTPEndpointStore(db)
|
||||
yield store
|
||||
db.close()
|
||||
|
||||
|
||||
class TestCRUD:
|
||||
def test_create_basic(self, http_store):
|
||||
e = http_store.create_endpoint(
|
||||
name="Plex",
|
||||
url="https://plex.example.com/status/sessions",
|
||||
)
|
||||
assert e.id.startswith("htep_")
|
||||
assert e.url == "https://plex.example.com/status/sessions"
|
||||
assert e.method == "GET"
|
||||
assert e.timeout_s == 10.0
|
||||
assert e.headers == {}
|
||||
assert e.auth_token == ""
|
||||
|
||||
def test_create_with_auth_and_headers(self, http_store):
|
||||
e = http_store.create_endpoint(
|
||||
name="Plex",
|
||||
url="https://plex.example.com/status/sessions",
|
||||
auth_token="my-secret-token",
|
||||
headers={"Accept": "application/json"},
|
||||
timeout_s=5.0,
|
||||
)
|
||||
assert e.auth_token == "my-secret-token"
|
||||
assert e.headers == {"Accept": "application/json"}
|
||||
assert e.timeout_s == 5.0
|
||||
|
||||
def test_create_requires_url(self, http_store):
|
||||
with pytest.raises(ValueError, match="url is required"):
|
||||
http_store.create_endpoint(name="x", url="")
|
||||
|
||||
def test_create_rejects_unsupported_method(self, http_store):
|
||||
with pytest.raises(ValueError, match="Unsupported method"):
|
||||
http_store.create_endpoint(name="x", url="https://example.com", method="POST")
|
||||
|
||||
def test_create_rejects_zero_timeout(self, http_store):
|
||||
with pytest.raises(ValueError, match="timeout_s"):
|
||||
http_store.create_endpoint(name="x", url="https://example.com", timeout_s=0)
|
||||
|
||||
def test_unique_name(self, http_store):
|
||||
http_store.create_endpoint(name="dup", url="https://a")
|
||||
with pytest.raises(ValueError, match="already exists"):
|
||||
http_store.create_endpoint(name="dup", url="https://b")
|
||||
|
||||
def test_update(self, http_store):
|
||||
e = http_store.create_endpoint(name="Plex", url="https://a")
|
||||
updated = http_store.update_endpoint(
|
||||
e.id,
|
||||
url="https://b",
|
||||
auth_token="new-token",
|
||||
)
|
||||
assert updated.url == "https://b"
|
||||
assert updated.auth_token == "new-token"
|
||||
assert updated.created_at == e.created_at
|
||||
assert updated.updated_at > e.updated_at
|
||||
|
||||
def test_update_with_empty_token_clears(self, tmp_path):
|
||||
"""Sending ``auth_token=""`` MUST clear the stored token (the
|
||||
endpoint route distinguishes None=keep from ""=clear; the store
|
||||
layer is the source of truth for the on-disk state)."""
|
||||
db = Database(tmp_path / "test.db")
|
||||
store = HTTPEndpointStore(db)
|
||||
e = store.create_endpoint(name="x", url="https://a", auth_token="initial-secret")
|
||||
assert e.auth_token == "initial-secret"
|
||||
|
||||
cleared = store.update_endpoint(e.id, auth_token="")
|
||||
assert cleared.auth_token == ""
|
||||
|
||||
# Re-open the DB to confirm the change is persisted, not just in
|
||||
# the in-memory cache.
|
||||
db.close()
|
||||
db2 = Database(tmp_path / "test.db")
|
||||
store2 = HTTPEndpointStore(db2)
|
||||
reloaded = store2.get_endpoint(e.id)
|
||||
assert reloaded.auth_token == ""
|
||||
db2.close()
|
||||
|
||||
def test_delete(self, http_store):
|
||||
e = http_store.create_endpoint(name="x", url="https://a")
|
||||
http_store.delete_endpoint(e.id)
|
||||
from ledgrab.storage.base_store import EntityNotFoundError
|
||||
|
||||
with pytest.raises(EntityNotFoundError):
|
||||
http_store.get_endpoint(e.id)
|
||||
|
||||
|
||||
class TestTokenEncryption:
|
||||
"""auth_token must be encrypted at rest, decrypted on load."""
|
||||
|
||||
def test_token_encrypted_at_rest(self, tmp_path):
|
||||
db = Database(tmp_path / "test.db")
|
||||
store = HTTPEndpointStore(db)
|
||||
store.create_endpoint(name="x", url="https://a", auth_token="plaintext-secret")
|
||||
|
||||
rows = db.load_all("http_endpoints")
|
||||
assert len(rows) == 1
|
||||
raw = rows[0]["auth_token"]
|
||||
assert raw != "plaintext-secret"
|
||||
assert secret_box.is_encrypted(raw)
|
||||
db.close()
|
||||
|
||||
def test_token_decrypted_on_load(self, tmp_path):
|
||||
db_path = tmp_path / "test.db"
|
||||
db = Database(db_path)
|
||||
store = HTTPEndpointStore(db)
|
||||
eid = store.create_endpoint(name="x", url="https://a", auth_token="round-trip").id
|
||||
db.close()
|
||||
|
||||
db2 = Database(db_path)
|
||||
store2 = HTTPEndpointStore(db2)
|
||||
loaded = store2.get_endpoint(eid)
|
||||
assert loaded.auth_token == "round-trip"
|
||||
db2.close()
|
||||
|
||||
|
||||
class TestHeadersAndAuth:
|
||||
def test_build_request_headers_with_token(self, http_store):
|
||||
e = http_store.create_endpoint(
|
||||
name="x",
|
||||
url="https://a",
|
||||
auth_token="abc",
|
||||
headers={"Accept": "application/json"},
|
||||
)
|
||||
h = e.build_request_headers()
|
||||
assert h["Authorization"] == "Bearer abc"
|
||||
assert h["Accept"] == "application/json"
|
||||
|
||||
def test_custom_authorization_header_wins(self, http_store):
|
||||
e = http_store.create_endpoint(
|
||||
name="x",
|
||||
url="https://a",
|
||||
auth_token="bearer-token",
|
||||
headers={"Authorization": "Basic dXNlcjpwYXNz"},
|
||||
)
|
||||
h = e.build_request_headers()
|
||||
assert h["Authorization"] == "Basic dXNlcjpwYXNz"
|
||||
|
||||
def test_case_insensitive_authorization_check(self, http_store):
|
||||
"""Lowercase 'authorization' supplied by the user must win too."""
|
||||
e = http_store.create_endpoint(
|
||||
name="x",
|
||||
url="https://a",
|
||||
auth_token="should-not-be-sent",
|
||||
headers={"authorization": "Basic xyz"},
|
||||
)
|
||||
h = e.build_request_headers()
|
||||
# No second Authorization key — user's lowercase variant is preserved as-is
|
||||
assert h.get("authorization") == "Basic xyz"
|
||||
assert "Authorization" not in h
|
||||
|
||||
def test_no_auth_no_header(self, http_store):
|
||||
e = http_store.create_endpoint(name="x", url="https://a")
|
||||
assert "Authorization" not in e.build_request_headers()
|
||||
Reference in New Issue
Block a user