feat: webhook payload history — store and display recent incoming payloads

Backend:
- WebhookPayloadLog model (provider_id, method, headers, body, status, extracted_fields, error_message)
- Auto-log payloads in generic_webhook() with matched/unmatched/error status
- Auto-prune beyond max_stored_payloads per provider
- Header filtering (only Content-Type, User-Agent, X-* stored; no Authorization)
- GET/DELETE /api/providers/{id}/webhook-logs endpoints
- store_payloads + max_stored_payloads in WebhookProviderConfig

Frontend:
- WebhookPayloadHistory.svelte — expandable log viewer with status badges, JSON body, headers, extracted fields
- payloadHistory flag on webhook provider descriptor
- max_stored_payloads config field (0 = disabled)
- Password confirmation field on change password modal
- i18n keys for webhook logs (en + ru)
This commit is contained in:
2026-03-28 13:54:54 +03:00
parent c41182ffd0
commit 6113a0039c
13 changed files with 459 additions and 5 deletions
@@ -99,6 +99,8 @@ class WebhookProviderConfig(BaseModel):
payload_mappings: list[PayloadMapping] = []
event_type_path: str | None = None
collection_path: str | None = None
store_payloads: bool = True
max_stored_payloads: int = 20 # 1-100
_PROVIDER_CONFIG_MODELS: dict[str, type[BaseModel]] = {
@@ -0,0 +1,76 @@
"""Webhook payload log API routes."""
from __future__ import annotations
import logging
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import delete as sa_delete
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import ServiceProvider, User, WebhookPayloadLog
from .helpers import get_owned_entity
_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/providers", tags=["webhook-logs"])
@router.get("/{provider_id}/webhook-logs")
async def list_webhook_logs(
provider_id: int,
limit: int = 20,
offset: int = 0,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
"""List recent webhook payload logs for a provider."""
provider = await get_owned_entity(
session, ServiceProvider, provider_id, user.id,
not_found_msg="Provider not found",
)
if provider.type != "webhook":
raise HTTPException(status_code=400, detail="Not a webhook provider")
result = await session.exec(
select(WebhookPayloadLog)
.where(WebhookPayloadLog.provider_id == provider_id)
.order_by(WebhookPayloadLog.created_at.desc())
.offset(offset)
.limit(min(limit, 100))
)
return [
{
"id": log.id,
"provider_id": log.provider_id,
"method": log.method,
"headers": log.headers,
"body": log.body,
"status": log.status,
"extracted_fields": log.extracted_fields,
"error_message": log.error_message,
"created_at": log.created_at.isoformat() if log.created_at else None,
}
for log in result.all()
]
@router.delete("/{provider_id}/webhook-logs", status_code=204)
async def clear_webhook_logs(
provider_id: int,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
"""Clear all webhook payload logs for a provider."""
await get_owned_entity(
session, ServiceProvider, provider_id, user.id,
not_found_msg="Provider not found",
)
await session.execute(
sa_delete(WebhookPayloadLog)
.where(WebhookPayloadLog.provider_id == provider_id)
)
await session.commit()
@@ -18,10 +18,13 @@ from notify_bridge_core.providers.planka.event_parser import parse_webhook as pa
from notify_bridge_core.providers.webhook.event_parser import parse_webhook as parse_generic_webhook
from ..database.engine import get_engine
from sqlalchemy import delete as sa_delete, func
from ..database.models import (
EventLog,
NotificationTracker,
ServiceProvider,
WebhookPayloadLog,
)
from ..services.dispatch_helpers import event_allowed_by_config, load_link_data
@@ -334,6 +337,62 @@ def _verify_generic_webhook_auth(
return False
def _filter_headers(raw_headers: dict[str, str]) -> dict[str, str]:
"""Keep only safe headers for logging (no Authorization)."""
safe: dict[str, str] = {}
for k, v in raw_headers.items():
kl = k.lower()
if kl in ("content-type", "user-agent") or kl.startswith("x-"):
safe[k] = v
return safe
async def _save_webhook_log(
session: AsyncSession,
provider_id: int,
method: str,
headers: dict[str, str],
body: dict[str, Any] | str,
status: str,
extracted_fields: dict[str, Any] | None = None,
error_message: str = "",
max_count: int = 20,
) -> None:
"""Insert a webhook payload log entry and prune old ones."""
try:
body_json = body if isinstance(body, dict) else {}
session.add(WebhookPayloadLog(
provider_id=provider_id,
method=method,
headers=headers,
body=body_json,
status=status,
extracted_fields=extracted_fields or {},
error_message=error_message,
))
await session.flush()
count_result = await session.exec(
select(func.count(WebhookPayloadLog.id))
.where(WebhookPayloadLog.provider_id == provider_id)
)
total = count_result.one()
if total > max_count:
oldest = await session.exec(
select(WebhookPayloadLog.id)
.where(WebhookPayloadLog.provider_id == provider_id)
.order_by(WebhookPayloadLog.created_at.asc())
.limit(total - max_count)
)
ids_to_delete = list(oldest.all())
if ids_to_delete:
await session.execute(
sa_delete(WebhookPayloadLog)
.where(WebhookPayloadLog.id.in_(ids_to_delete))
)
except Exception:
_LOGGER.warning("Failed to save webhook payload log for provider %d", provider_id, exc_info=True)
@router.post("/webhook/{provider_id}")
async def generic_webhook(provider_id: int, request: Request):
"""Receive a generic webhook, extract variables via JSONPath, and dispatch notifications."""
@@ -348,6 +407,9 @@ async def generic_webhook(provider_id: int, request: Request):
provider_config = provider.config or {}
provider_name = provider.name
store_payloads = provider_config.get("store_payloads", True)
max_stored = min(max(int(provider_config.get("max_stored_payloads", 20)), 1), 100)
raw_body = await request.body()
# Enforce payload size limit BEFORE parsing JSON
@@ -357,16 +419,32 @@ async def generic_webhook(provider_id: int, request: Request):
if not _verify_generic_webhook_auth(provider_config, request, raw_body):
raise HTTPException(status_code=403, detail="Authentication failed")
safe_headers = _filter_headers(dict(request.headers))
# Parse JSON payload
try:
payload = await request.json()
except Exception:
if store_payloads:
async with AsyncSession(get_engine()) as log_session:
await _save_webhook_log(
log_session, provider_id, request.method, safe_headers,
{}, "error", error_message="Invalid JSON", max_count=max_stored,
)
await log_session.commit()
raise HTTPException(status_code=400, detail="Invalid JSON")
# Parse via JSONPath mappings
req_headers = dict(request.headers)
event = parse_generic_webhook(payload, provider_name, provider_config, headers=req_headers)
if event is None:
if store_payloads:
async with AsyncSession(get_engine()) as log_session:
await _save_webhook_log(
log_session, provider_id, request.method, safe_headers,
payload, "unmatched", max_count=max_stored,
)
await log_session.commit()
return {"ok": True, "skipped": "parse failed"}
# Inject source IP
@@ -427,6 +505,15 @@ async def generic_webhook(provider_id: int, request: Request):
tracker.id, r.get("error", "unknown"),
)
# Log matched payload
if store_payloads:
await _save_webhook_log(
session, provider_id, request.method, safe_headers,
payload, "matched" if dispatched > 0 else "unmatched",
extracted_fields=dict(event.extra),
max_count=max_stored,
)
await session.commit()
return {"ok": True, "dispatched": dispatched}