feat: generic webhook provider with JSONPath payload extraction
Add a new "webhook" provider type that accepts arbitrary HTTP POST payloads, extracts template variables via user-defined JSONPath mappings, and dispatches notifications through the existing pipeline. Supports three auth modes (HMAC-SHA256, Bearer token, none), bounded JSONPath cache, and 1MB payload limit. Full stack: core provider + event parser, API endpoint, DB migration, capabilities, seeds, default templates (EN/RU), frontend descriptor, i18n.
This commit is contained in:
@@ -251,6 +251,8 @@ async def get_template_variables(
|
||||
"variables": {**scheduled_vars, "assets": "List of asset dicts (use {% for asset in assets %})"},
|
||||
"asset_fields": asset_fields,
|
||||
},
|
||||
# --- Generic Webhook slots ---
|
||||
**_webhook_variables(),
|
||||
# --- Gitea slots ---
|
||||
**_gitea_variables(),
|
||||
# --- Planka slots ---
|
||||
@@ -271,6 +273,23 @@ async def get_template_variables(
|
||||
}
|
||||
|
||||
|
||||
def _webhook_variables() -> dict:
|
||||
return {
|
||||
"message_webhook_received": {
|
||||
"description": "Incoming webhook event notification",
|
||||
"variables": {
|
||||
"service_name": "Provider instance name",
|
||||
"event_type_raw": "Raw event type from payload (or 'webhook_received')",
|
||||
"collection_name": "Collection extracted from payload via collection_path (or empty)",
|
||||
"source_ip": "IP address of the webhook sender",
|
||||
"raw_payload": "Full JSON payload as dict (use raw_payload.field or raw_payload | tojson)",
|
||||
"timestamp": "When the webhook was received",
|
||||
"target_type": "Target type: 'telegram' or 'webhook'",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _gitea_variables() -> dict:
|
||||
common = {
|
||||
"sender": "Username who triggered the event",
|
||||
|
||||
@@ -15,6 +15,7 @@ from notify_bridge_core.models.events import ServiceEvent
|
||||
from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig
|
||||
from notify_bridge_core.providers.gitea.event_parser import parse_webhook as parse_gitea_webhook
|
||||
from notify_bridge_core.providers.planka.event_parser import parse_webhook as parse_planka_webhook
|
||||
from notify_bridge_core.providers.webhook.event_parser import parse_webhook as parse_generic_webhook
|
||||
|
||||
from ..database.engine import get_engine
|
||||
from ..database.models import (
|
||||
@@ -289,6 +290,148 @@ async def planka_webhook(provider_id: int, request: Request):
|
||||
return {"ok": True, "dispatched": dispatched}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Generic Webhook endpoint
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _verify_generic_webhook_auth(
|
||||
config: dict[str, Any],
|
||||
request: Request,
|
||||
raw_body: bytes,
|
||||
) -> bool:
|
||||
"""Verify authentication for a generic webhook based on configured auth_mode."""
|
||||
auth_mode = config.get("auth_mode", "none")
|
||||
|
||||
if auth_mode == "none":
|
||||
return True
|
||||
|
||||
secret = config.get("webhook_secret", "")
|
||||
if not secret:
|
||||
return False
|
||||
|
||||
if auth_mode == "hmac_sha256":
|
||||
# Support common signature headers
|
||||
signature = (
|
||||
request.headers.get("X-Hub-Signature-256", "")
|
||||
or request.headers.get("X-Webhook-Signature", "")
|
||||
or request.headers.get("X-Signature-256", "")
|
||||
)
|
||||
# Strip "sha256=" prefix if present (GitHub-style)
|
||||
if signature.startswith("sha256="):
|
||||
signature = signature[7:]
|
||||
if not signature:
|
||||
return False
|
||||
expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
|
||||
return hmac.compare_digest(expected, signature)
|
||||
|
||||
if auth_mode == "bearer_token":
|
||||
auth_header = request.headers.get("Authorization", "")
|
||||
if auth_header.startswith("Bearer "):
|
||||
token = auth_header[7:]
|
||||
return hmac.compare_digest(token, secret)
|
||||
return False
|
||||
|
||||
return False
|
||||
|
||||
|
||||
@router.post("/webhook/{provider_id}")
|
||||
async def generic_webhook(provider_id: int, request: Request):
|
||||
"""Receive a generic webhook, extract variables via JSONPath, and dispatch notifications."""
|
||||
engine = get_engine()
|
||||
|
||||
# --- Load provider and validate auth ---
|
||||
async with AsyncSession(engine) as session:
|
||||
provider = await session.get(ServiceProvider, provider_id)
|
||||
if not provider or provider.type != "webhook":
|
||||
raise HTTPException(status_code=404, detail="Provider not found")
|
||||
|
||||
provider_config = provider.config or {}
|
||||
provider_name = provider.name
|
||||
|
||||
raw_body = await request.body()
|
||||
|
||||
# Enforce payload size limit BEFORE parsing JSON
|
||||
if len(raw_body) > 1_000_000:
|
||||
raise HTTPException(status_code=413, detail="Payload too large (max 1 MB)")
|
||||
|
||||
if not _verify_generic_webhook_auth(provider_config, request, raw_body):
|
||||
raise HTTPException(status_code=403, detail="Authentication failed")
|
||||
|
||||
# Parse JSON payload
|
||||
try:
|
||||
payload = await request.json()
|
||||
except Exception:
|
||||
raise HTTPException(status_code=400, detail="Invalid JSON")
|
||||
|
||||
# Parse via JSONPath mappings
|
||||
req_headers = dict(request.headers)
|
||||
event = parse_generic_webhook(payload, provider_name, provider_config, headers=req_headers)
|
||||
if event is None:
|
||||
return {"ok": True, "skipped": "parse failed"}
|
||||
|
||||
# Inject source IP
|
||||
source_ip = request.client.host if request.client else ""
|
||||
event.extra["source_ip"] = source_ip
|
||||
|
||||
# --- Find trackers for this provider and dispatch ---
|
||||
dispatched = 0
|
||||
async with AsyncSession(engine) as session:
|
||||
tracker_result = await session.exec(
|
||||
select(NotificationTracker).where(
|
||||
NotificationTracker.provider_id == provider_id,
|
||||
NotificationTracker.enabled == True,
|
||||
)
|
||||
)
|
||||
trackers = tracker_result.all()
|
||||
|
||||
for tracker in trackers:
|
||||
filters = tracker.filters or {}
|
||||
if not _passes_filters(event, filters):
|
||||
_LOGGER.debug(
|
||||
"Event filtered out for tracker %d (%s)", tracker.id, tracker.name
|
||||
)
|
||||
continue
|
||||
|
||||
link_data = await load_link_data(session, tracker.id)
|
||||
if not link_data:
|
||||
continue
|
||||
|
||||
# Log event
|
||||
session.add(EventLog(
|
||||
tracker_id=tracker.id,
|
||||
tracker_name=tracker.name,
|
||||
provider_id=provider_id,
|
||||
provider_name=provider_name,
|
||||
event_type=event.event_type.value,
|
||||
collection_id=event.collection_id,
|
||||
collection_name=event.collection_name,
|
||||
assets_count=0,
|
||||
details={
|
||||
"provider_type": "webhook",
|
||||
"event_type_raw": event.extra.get("event_type_raw", ""),
|
||||
"source_ip": source_ip,
|
||||
},
|
||||
))
|
||||
|
||||
# Dispatch to targets
|
||||
dispatcher = NotificationDispatcher()
|
||||
target_configs = _build_target_configs(event, link_data, provider_config)
|
||||
if target_configs:
|
||||
results = await dispatcher.dispatch(event, target_configs)
|
||||
for r in results:
|
||||
if r.get("success"):
|
||||
dispatched += 1
|
||||
else:
|
||||
_LOGGER.error(
|
||||
"Notification failed for tracker %d: %s",
|
||||
tracker.id, r.get("error", "unknown"),
|
||||
)
|
||||
|
||||
await session.commit()
|
||||
|
||||
return {"ok": True, "dispatched": dispatched}
|
||||
|
||||
|
||||
def _build_target_configs(
|
||||
event: ServiceEvent,
|
||||
link_data: list[dict[str, Any]],
|
||||
|
||||
@@ -204,6 +204,14 @@ async def migrate_schema(engine: AsyncEngine) -> None:
|
||||
)
|
||||
logger.info("Added %s column to tracking_config table", col_name)
|
||||
|
||||
# Add Generic Webhook tracking flag to tracking_config if missing
|
||||
if await _has_table(conn, "tracking_config"):
|
||||
if not await _has_column(conn, "tracking_config", "track_webhook_received"):
|
||||
await conn.execute(
|
||||
text("ALTER TABLE tracking_config ADD COLUMN track_webhook_received INTEGER DEFAULT 1")
|
||||
)
|
||||
logger.info("Added track_webhook_received column to tracking_config table")
|
||||
|
||||
# Drop legacy template content columns from template_config
|
||||
# (template content moved to template_slot child rows)
|
||||
if await _has_table(conn, "template_config"):
|
||||
|
||||
@@ -160,6 +160,9 @@ class TrackingConfig(SQLModel, table=True):
|
||||
track_ups_replace_battery: bool = Field(default=True)
|
||||
track_ups_overload: bool = Field(default=True)
|
||||
|
||||
# Generic Webhook event tracking
|
||||
track_webhook_received: bool = Field(default=True)
|
||||
|
||||
# Immich asset display
|
||||
track_images: bool = Field(default=True)
|
||||
track_videos: bool = Field(default=True)
|
||||
|
||||
@@ -153,6 +153,7 @@ async def _seed_default_templates() -> None:
|
||||
await _seed_provider_template(session, "scheduler", "Scheduler")
|
||||
await _seed_provider_template(session, "nut", "NUT")
|
||||
await _seed_provider_template(session, "google_photos", "Google Photos")
|
||||
await _seed_provider_template(session, "webhook", "Generic Webhook")
|
||||
await session.commit()
|
||||
|
||||
|
||||
@@ -179,6 +180,9 @@ async def _seed_default_command_templates() -> None:
|
||||
await _seed_provider_command_template(
|
||||
session, "google_photos", "Default Google Photos Commands", "Default Google Photos command templates",
|
||||
)
|
||||
await _seed_provider_command_template(
|
||||
session, "webhook", "Default Webhook Commands", "Default Generic Webhook command templates",
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
|
||||
@@ -229,6 +233,11 @@ async def _seed_default_tracking_configs() -> None:
|
||||
"name": "Default Scheduler",
|
||||
"track_scheduled_message": True,
|
||||
},
|
||||
{
|
||||
"provider_type": "webhook",
|
||||
"name": "Default Webhook",
|
||||
"track_webhook_received": True,
|
||||
},
|
||||
{
|
||||
"provider_type": "nut",
|
||||
"name": "Default NUT",
|
||||
@@ -309,6 +318,14 @@ async def _seed_default_command_configs() -> None:
|
||||
"default_count": 5,
|
||||
"rate_limits": {"api": 15, "default": 10},
|
||||
},
|
||||
{
|
||||
"provider_type": "webhook",
|
||||
"name": "Default Webhook",
|
||||
"enabled_commands": ["help", "status"],
|
||||
"response_mode": "text",
|
||||
"default_count": 5,
|
||||
"rate_limits": {"default": 10},
|
||||
},
|
||||
{
|
||||
"provider_type": "google_photos",
|
||||
"name": "Default Google Photos",
|
||||
|
||||
@@ -84,6 +84,8 @@ def event_allowed_by_config(event: ServiceEvent, tc: TrackingConfig) -> bool:
|
||||
"task_completed": tc.track_task_completed,
|
||||
# Scheduler events
|
||||
"scheduled_message": tc.track_scheduled_message,
|
||||
# Generic Webhook events
|
||||
"webhook_received": tc.track_webhook_received,
|
||||
# NUT (UPS) events
|
||||
"ups_online": tc.track_ups_online,
|
||||
"ups_on_battery": tc.track_ups_on_battery,
|
||||
|
||||
@@ -189,4 +189,8 @@ _SAMPLE_CONTEXT = {
|
||||
"custom_vars": {"team": "Engineering", "message": "Time for standup!"},
|
||||
"team": "Engineering",
|
||||
"message": "Time for standup!",
|
||||
# Generic Webhook variables (for webhook provider templates)
|
||||
"raw_payload": {"action": "opened", "issue": {"title": "Bug report", "number": 1}, "sender": {"login": "user1"}},
|
||||
"event_type_raw": "webhook_received",
|
||||
"source_ip": "192.168.1.100",
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user