feat: provider-strict configs, slot-based templates, broadcast targets, email bots, command templates

Major architectural improvements:
- Provider-type enforcement: configs validated against provider type at assignment
- TemplateConfig migrated to slot-based pattern (TemplateSlot child table)
- Broadcast targets: TargetReceiver child table for multi-receiver dispatch
- EmailBot: first-class email sender entity with SMTP config, test connection
- CommandTemplateConfig: generic slot-based command response templates
- Provider capability registry: dynamic slot/event/command definitions per provider
- CommandTracker play/pause button matches NotificationTracker style
This commit is contained in:
2026-03-21 16:33:24 +03:00
parent 371ea70756
commit 846d480d38
27 changed files with 2355 additions and 205 deletions
@@ -0,0 +1,230 @@
"""Command template configuration CRUD API routes.
Template content is stored in CommandTemplateSlot child rows (one per slot_name).
Slot names correspond to command names (e.g. 'status', 'help', 'albums').
"""
import logging
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from jinja2.sandbox import SandboxedEnvironment
from jinja2 import TemplateSyntaxError, UndefinedError, StrictUndefined
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import CommandTemplateConfig, CommandTemplateSlot, User
_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/command-template-configs", tags=["command-template-configs"])
class CommandTemplateConfigCreate(BaseModel):
provider_type: str
name: str
description: str | None = None
icon: str | None = None
slots: dict[str, str] = {} # slot_name -> template text
class CommandTemplateConfigUpdate(BaseModel):
name: str | None = None
description: str | None = None
icon: str | None = None
slots: dict[str, str] | None = None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _load_slots(session: AsyncSession, config_id: int) -> dict[str, str]:
result = await session.exec(
select(CommandTemplateSlot).where(CommandTemplateSlot.config_id == config_id)
)
return {s.slot_name: s.template for s in result.all()}
async def _save_slots(session: AsyncSession, config_id: int, slots: dict[str, str]) -> None:
for slot_name, template_text in slots.items():
result = await session.exec(
select(CommandTemplateSlot).where(
CommandTemplateSlot.config_id == config_id,
CommandTemplateSlot.slot_name == slot_name,
)
)
existing = result.first()
if existing:
existing.template = template_text
session.add(existing)
else:
session.add(CommandTemplateSlot(
config_id=config_id,
slot_name=slot_name,
template=template_text,
))
async def _response(session: AsyncSession, c: CommandTemplateConfig) -> dict[str, Any]:
slots = await _load_slots(session, c.id)
return {
"id": c.id,
"user_id": c.user_id,
"provider_type": c.provider_type,
"name": c.name,
"description": c.description,
"icon": c.icon,
"slots": slots,
"created_at": c.created_at.isoformat(),
}
async def _get(session: AsyncSession, config_id: int, user_id: int) -> CommandTemplateConfig:
config = await session.get(CommandTemplateConfig, config_id)
if not config or (config.user_id != user_id and config.user_id != 0):
raise HTTPException(status_code=404, detail="Command template config not found")
return config
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@router.get("")
async def list_configs(
provider_type: str | None = None,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
from sqlalchemy import or_
query = select(CommandTemplateConfig).where(
or_(CommandTemplateConfig.user_id == user.id, CommandTemplateConfig.user_id == 0)
)
if provider_type:
query = query.where(CommandTemplateConfig.provider_type == provider_type)
result = await session.exec(query)
return [await _response(session, c) for c in result.all()]
@router.post("", status_code=status.HTTP_201_CREATED)
async def create_config(
body: CommandTemplateConfigCreate,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
config = CommandTemplateConfig(
user_id=user.id,
provider_type=body.provider_type,
name=body.name,
description=body.description or "",
icon=body.icon or "",
)
session.add(config)
await session.flush()
if body.slots:
await _save_slots(session, config.id, body.slots)
await session.commit()
await session.refresh(config)
return await _response(session, config)
@router.get("/{config_id}")
async def get_config(
config_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
config = await _get(session, config_id, user.id)
return await _response(session, config)
@router.put("/{config_id}")
async def update_config(
config_id: int,
body: CommandTemplateConfigUpdate,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
config = await _get(session, config_id, user.id)
for field, value in body.model_dump(exclude_unset=True, exclude={"slots"}).items():
if value is not None:
setattr(config, field, value)
session.add(config)
if body.slots is not None:
await _save_slots(session, config.id, body.slots)
await session.commit()
await session.refresh(config)
return await _response(session, config)
@router.delete("/{config_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_config(
config_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
config = await _get(session, config_id, user.id)
slot_result = await session.exec(
select(CommandTemplateSlot).where(CommandTemplateSlot.config_id == config.id)
)
for slot in slot_result.all():
await session.delete(slot)
await session.delete(config)
await session.commit()
class PreviewRequest(BaseModel):
template: str
@router.post("/preview-raw")
async def preview_raw(
body: PreviewRequest,
user: User = Depends(get_current_user),
):
"""Render arbitrary Jinja2 template text with sample command context."""
sample_ctx = {
"trackers_active": 2,
"trackers_total": 3,
"total_albums": 5,
"last_event": "2026-03-19 14:30",
"albums": [
{"name": "Family Photos", "asset_count": 142, "url": "https://example.com/albums/1"},
{"name": "Vacation 2025", "asset_count": 87, "url": "https://example.com/albums/2"},
],
"events": [
{"type": "assets_added", "album": "Family Photos", "count": 3, "date": "2026-03-19 14:30"},
{"type": "assets_removed", "album": "Vacation 2025", "count": 1, "date": "2026-03-19 12:00"},
],
"people": ["Alice", "Bob", "Charlie"],
"assets": [
{"filename": "IMG_001.jpg", "type": "IMAGE", "created_at": "2026-03-19T14:30:00"},
{"filename": "VID_002.mp4", "type": "VIDEO", "created_at": "2026-03-19T15:00:00"},
],
"search_query": "sunset",
"search_results_count": 5,
"command": "status",
"bot_name": "NotifyBridgeBot",
"locale": "en",
}
try:
env = SandboxedEnvironment(autoescape=False)
env.from_string(body.template)
except TemplateSyntaxError as e:
return {"rendered": None, "error": e.message, "error_line": e.lineno}
try:
strict_env = SandboxedEnvironment(autoescape=False, undefined=StrictUndefined)
tmpl = strict_env.from_string(body.template)
rendered = tmpl.render(**sample_ctx)
return {"rendered": rendered}
except UndefinedError as e:
return {"rendered": None, "error": str(e), "error_line": None, "error_type": "undefined"}
except Exception as e:
return {"rendered": None, "error": str(e), "error_line": None}
@@ -0,0 +1,148 @@
"""Email bot management API routes."""
import logging
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import EmailBot, User
_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/email-bots", tags=["email-bots"])
class EmailBotCreate(BaseModel):
name: str
icon: str = ""
email: str
smtp_host: str
smtp_port: int = 587
smtp_username: str = ""
smtp_password: str = ""
smtp_use_tls: bool = True
class EmailBotUpdate(BaseModel):
name: str | None = None
icon: str | None = None
email: str | None = None
smtp_host: str | None = None
smtp_port: int | None = None
smtp_username: str | None = None
smtp_password: str | None = None
smtp_use_tls: bool | None = None
@router.get("")
async def list_email_bots(
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
result = await session.exec(
select(EmailBot).where(EmailBot.user_id == user.id)
)
return [_response(b) for b in result.all()]
@router.post("", status_code=status.HTTP_201_CREATED)
async def create_email_bot(
body: EmailBotCreate,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
bot = EmailBot(user_id=user.id, **body.model_dump())
session.add(bot)
await session.commit()
await session.refresh(bot)
return _response(bot)
@router.get("/{bot_id}")
async def get_email_bot(
bot_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
bot = await _get_user_bot(session, bot_id, user.id)
return _response(bot)
@router.put("/{bot_id}")
async def update_email_bot(
bot_id: int,
body: EmailBotUpdate,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
bot = await _get_user_bot(session, bot_id, user.id)
for field, value in body.model_dump(exclude_unset=True).items():
setattr(bot, field, value)
session.add(bot)
await session.commit()
await session.refresh(bot)
return _response(bot)
@router.delete("/{bot_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_email_bot(
bot_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
bot = await _get_user_bot(session, bot_id, user.id)
await session.delete(bot)
await session.commit()
@router.post("/{bot_id}/test")
async def test_email_bot(
bot_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Send a test email to the bot's own address to verify SMTP connection."""
bot = await _get_user_bot(session, bot_id, user.id)
from notify_bridge_core.notifications.email.client import EmailClient, SmtpConfig
client = EmailClient(SmtpConfig(
host=bot.smtp_host,
port=bot.smtp_port,
username=bot.smtp_username,
password=bot.smtp_password,
from_address=bot.email,
from_name=bot.name,
use_tls=bot.smtp_use_tls,
))
result = await client.send(
to_email=bot.email,
subject="Notify Bridge — Test Connection",
body_text="This is a test email from Notify Bridge. Your SMTP settings are working correctly.",
)
return result
def _response(bot: EmailBot) -> dict:
return {
"id": bot.id,
"name": bot.name,
"icon": bot.icon,
"email": bot.email,
"smtp_host": bot.smtp_host,
"smtp_port": bot.smtp_port,
"smtp_username": bot.smtp_username,
"smtp_password": "***" if bot.smtp_password else "",
"smtp_use_tls": bot.smtp_use_tls,
"created_at": bot.created_at.isoformat(),
}
async def _get_user_bot(session: AsyncSession, bot_id: int, user_id: int) -> EmailBot:
bot = await session.get(EmailBot, bot_id)
if not bot or bot.user_id != user_id:
raise HTTPException(status_code=404, detail="Email bot not found")
return bot
@@ -16,6 +16,7 @@ from ..database.models import (
NotificationTrackerTarget,
ServiceProvider,
TemplateConfig,
TemplateSlot,
TrackingConfig,
User,
)
@@ -65,7 +66,7 @@ async def create_notification_tracker_target(
session: AsyncSession = Depends(get_session),
):
"""Link a target to a notification tracker with per-link configuration."""
await _get_user_tracker(session, tracker_id, user.id)
tracker = await _get_user_tracker(session, tracker_id, user.id)
# Validate target exists and belongs to user
target = await session.get(NotificationTarget, body.target_id)
@@ -85,15 +86,30 @@ async def create_notification_tracker_target(
detail="Target is already linked to this tracker",
)
# Validate config ownership
# Resolve tracker's provider type for config validation
provider = await session.get(ServiceProvider, tracker.provider_id)
if not provider:
raise HTTPException(status_code=404, detail="Provider not found")
# Validate config ownership + provider type match
if body.tracking_config_id:
tc = await session.get(TrackingConfig, body.tracking_config_id)
if not tc or tc.user_id != user.id:
raise HTTPException(status_code=404, detail="Tracking config not found")
if tc.provider_type != provider.type:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Tracking config provider type '{tc.provider_type}' does not match tracker provider '{provider.type}'",
)
if body.template_config_id:
tpc = await session.get(TemplateConfig, body.template_config_id)
if not tpc or (tpc.user_id != user.id and tpc.user_id != 0):
raise HTTPException(status_code=404, detail="Template config not found")
if tpc.provider_type != provider.type:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Template config provider type '{tpc.provider_type}' does not match tracker provider '{provider.type}'",
)
tt = NotificationTrackerTarget(tracker_id=tracker_id, **body.model_dump())
session.add(tt)
@@ -111,21 +127,34 @@ async def update_notification_tracker_target(
session: AsyncSession = Depends(get_session),
):
"""Update a notification tracker-target link's configuration."""
await _get_user_tracker(session, tracker_id, user.id)
tracker = await _get_user_tracker(session, tracker_id, user.id)
tt = await session.get(NotificationTrackerTarget, tracker_target_id)
if not tt or tt.tracker_id != tracker_id:
raise HTTPException(status_code=404, detail="Tracker-target link not found")
provider = await session.get(ServiceProvider, tracker.provider_id)
if not provider:
raise HTTPException(status_code=404, detail="Provider not found")
updates = body.model_dump(exclude_unset=True)
# Validate config ownership if being changed
# Validate config ownership + provider type match if being changed
if "tracking_config_id" in updates and updates["tracking_config_id"]:
tc = await session.get(TrackingConfig, updates["tracking_config_id"])
if not tc or tc.user_id != user.id:
raise HTTPException(status_code=404, detail="Tracking config not found")
if tc.provider_type != provider.type:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Tracking config provider type '{tc.provider_type}' does not match tracker provider '{provider.type}'",
)
if "template_config_id" in updates and updates["template_config_id"]:
tpc = await session.get(TemplateConfig, updates["template_config_id"])
if not tpc or (tpc.user_id != user.id and tpc.user_id != 0):
raise HTTPException(status_code=404, detail="Template config not found")
if tpc.provider_type != provider.type:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Template config provider type '{tpc.provider_type}' does not match tracker provider '{provider.type}'",
)
for field, value in updates.items():
setattr(tt, field, value)
@@ -183,15 +212,24 @@ async def test_notification_tracker_target(
# For periodic/scheduled/memory — fetch real data from provider
template_config = None
template_str = ""
if tt.template_config_id:
template_config = await session.get(TemplateConfig, tt.template_config_id)
slot_map = {
"periodic": "periodic_summary_message",
"scheduled": "scheduled_assets_message",
"memory": "memory_mode_message",
}
template_str = getattr(template_config, slot_map[test_type], "") if template_config else ""
if template_config:
slot_map = {
"periodic": "periodic_summary_message",
"scheduled": "scheduled_assets_message",
"memory": "memory_mode_message",
}
slot_name = slot_map[test_type]
slot_result = await session.exec(
select(TemplateSlot).where(
TemplateSlot.config_id == template_config.id,
TemplateSlot.slot_name == slot_name,
)
)
slot = slot_result.first()
template_str = slot.template if slot else ""
# Load provider and tracker data eagerly before aiohttp context
provider = await session.get(ServiceProvider, tracker.provider_id)
@@ -94,6 +94,40 @@ async def create_provider(
return _provider_response(provider)
@router.get("/capabilities")
async def list_provider_capabilities():
"""List capabilities for all registered provider types."""
from notify_bridge_core.providers.capabilities import get_all_capabilities
result = {}
for pt, caps in get_all_capabilities().items():
result[pt] = {
"provider_type": caps.provider_type,
"display_name": caps.display_name,
"notification_slots": caps.notification_slots,
"command_slots": caps.command_slots,
"events": caps.events,
"commands": caps.commands,
}
return result
@router.get("/capabilities/{provider_type}")
async def get_provider_capabilities(provider_type: str):
"""Get capabilities for a provider type (events, slots, commands)."""
from notify_bridge_core.providers.capabilities import get_capabilities
caps = get_capabilities(provider_type)
if not caps:
raise HTTPException(status_code=404, detail=f"Unknown provider type: {provider_type}")
return {
"provider_type": caps.provider_type,
"display_name": caps.display_name,
"notification_slots": caps.notification_slots,
"command_slots": caps.command_slots,
"events": caps.events,
"commands": caps.commands,
}
@router.get("/{provider_id}")
async def get_provider(
provider_id: int,
@@ -0,0 +1,147 @@
"""Target receiver management API routes (nested under targets)."""
import logging
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import NotificationTarget, TargetReceiver, User
_LOGGER = logging.getLogger(__name__)
router = APIRouter(prefix="/api/targets/{target_id}/receivers", tags=["target-receivers"])
class ReceiverCreate(BaseModel):
name: str = ""
config: dict[str, Any] = {}
enabled: bool = True
class ReceiverUpdate(BaseModel):
name: str | None = None
config: dict[str, Any] | None = None
enabled: bool | None = None
def _receiver_key(target_type: str, config: dict[str, Any]) -> str:
"""Derive a unique key for deduplication from receiver config."""
if target_type == "telegram":
return str(config.get("chat_id", ""))
elif target_type == "webhook":
return config.get("url", "")
elif target_type == "email":
return config.get("email", "")
return ""
@router.get("")
async def list_receivers(
target_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
target = await _get_user_target(session, target_id, user.id)
result = await session.exec(
select(TargetReceiver).where(TargetReceiver.target_id == target.id)
)
return [_response(r) for r in result.all()]
@router.post("", status_code=status.HTTP_201_CREATED)
async def create_receiver(
target_id: int,
body: ReceiverCreate,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
target = await _get_user_target(session, target_id, user.id)
key = _receiver_key(target.type, body.config)
if not key:
raise HTTPException(status_code=400, detail="Receiver config must include a delivery endpoint (chat_id, url, or email)")
# Check for duplicate
existing = await session.exec(
select(TargetReceiver).where(
TargetReceiver.target_id == target.id,
TargetReceiver.receiver_key == key,
)
)
if existing.first():
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Receiver already exists for this target")
receiver = TargetReceiver(
target_id=target.id,
name=body.name,
config=body.config,
receiver_key=key,
enabled=body.enabled,
)
session.add(receiver)
await session.commit()
await session.refresh(receiver)
return _response(receiver)
@router.put("/{receiver_id}")
async def update_receiver(
target_id: int,
receiver_id: int,
body: ReceiverUpdate,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
await _get_user_target(session, target_id, user.id)
receiver = await session.get(TargetReceiver, receiver_id)
if not receiver or receiver.target_id != target_id:
raise HTTPException(status_code=404, detail="Receiver not found")
for field, value in body.model_dump(exclude_unset=True).items():
setattr(receiver, field, value)
# Update receiver_key if config changed
if body.config is not None:
target = await session.get(NotificationTarget, target_id)
receiver.receiver_key = _receiver_key(target.type, receiver.config)
session.add(receiver)
await session.commit()
await session.refresh(receiver)
return _response(receiver)
@router.delete("/{receiver_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_receiver(
target_id: int,
receiver_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
await _get_user_target(session, target_id, user.id)
receiver = await session.get(TargetReceiver, receiver_id)
if not receiver or receiver.target_id != target_id:
raise HTTPException(status_code=404, detail="Receiver not found")
await session.delete(receiver)
await session.commit()
def _response(r: TargetReceiver) -> dict:
return {
"id": r.id,
"target_id": r.target_id,
"name": r.name,
"config": dict(r.config),
"receiver_key": r.receiver_key,
"enabled": r.enabled,
"created_at": r.created_at.isoformat(),
}
async def _get_user_target(session: AsyncSession, target_id: int, user_id: int) -> NotificationTarget:
target = await session.get(NotificationTarget, target_id)
if not target or target.user_id != user_id:
raise HTTPException(status_code=404, detail="Target not found")
return target
@@ -10,7 +10,7 @@ from typing import Any
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import NotificationTarget, NotificationTrackerTarget, TelegramBot, TelegramChat, User
from ..database.models import NotificationTarget, NotificationTrackerTarget, TargetReceiver, TelegramBot, TelegramChat, User
from ..services.notifier import send_test_notification
_LOGGER = logging.getLogger(__name__)
@@ -61,7 +61,15 @@ async def list_targets(
if chat:
chat_names[f"{bot_id}_{chat_id}"] = chat.title or chat.username or ""
return [_target_response(t, chat_names) for t in targets]
# Load receiver counts
receiver_counts: dict[int, int] = {}
for tgt in targets:
recv_result = await session.exec(
select(TargetReceiver).where(TargetReceiver.target_id == tgt.id)
)
receiver_counts[tgt.id] = len(recv_result.all())
return [_target_response(t, chat_names, receiver_counts.get(t.id, 0)) for t in targets]
@router.post("", status_code=status.HTTP_201_CREATED)
@@ -71,10 +79,10 @@ async def create_target(
session: AsyncSession = Depends(get_session),
):
"""Create a new notification target."""
if body.type not in ("telegram", "webhook"):
if body.type not in ("telegram", "webhook", "email"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Type must be 'telegram' or 'webhook'",
detail="Type must be 'telegram', 'webhook', or 'email'",
)
target = NotificationTarget(
user_id=user.id,
@@ -124,7 +132,7 @@ async def delete_target(
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
"""Delete a notification target and its tracker links."""
"""Delete a notification target, its tracker links, and receivers."""
target = await _get_user_target(session, target_id, user.id)
# Delete associated tracker-target links
result = await session.exec(
@@ -132,6 +140,12 @@ async def delete_target(
)
for tt in result.all():
await session.delete(tt)
# Delete receivers
recv_result = await session.exec(
select(TargetReceiver).where(TargetReceiver.target_id == target_id)
)
for r in recv_result.all():
await session.delete(r)
await session.delete(target)
await session.commit()
@@ -149,7 +163,7 @@ async def test_target(
return result
def _target_response(target: NotificationTarget, chat_names: dict[str, str] | None = None) -> dict:
def _target_response(target: NotificationTarget, chat_names: dict[str, str] | None = None, receiver_count: int = 0) -> dict:
resp = {
"id": target.id,
"type": target.type,
@@ -157,6 +171,7 @@ def _target_response(target: NotificationTarget, chat_names: dict[str, str] | No
"icon": target.icon,
"config": _safe_config(target),
"chat_action": target.chat_action,
"receiver_count": receiver_count,
"created_at": target.created_at.isoformat(),
}
# Attach resolved chat name for telegram targets
@@ -1,6 +1,11 @@
"""Template configuration CRUD API routes."""
"""Template configuration CRUD API routes.
Template content is stored in TemplateSlot child rows (one per slot_name).
The API exposes slots as a flat dict in create/update/response payloads.
"""
import logging
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
@@ -12,7 +17,7 @@ from jinja2 import TemplateSyntaxError, UndefinedError, StrictUndefined
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import TemplateConfig, User
from ..database.models import TemplateConfig, TemplateSlot, User
from ..services.sample_context import _SAMPLE_CONTEXT
_LOGGER = logging.getLogger(__name__)
@@ -25,21 +30,83 @@ class TemplateConfigCreate(BaseModel):
name: str
description: str | None = None
icon: str | None = None
message_assets_added: str | None = None
message_assets_removed: str | None = None
message_collection_renamed: str | None = None
message_collection_deleted: str | None = None
message_sharing_changed: str | None = None
periodic_summary_message: str | None = None
scheduled_assets_message: str | None = None
memory_mode_message: str | None = None
date_format: str | None = None
date_only_format: str | None = None
slots: dict[str, str] = {} # slot_name -> template text
TemplateConfigUpdate = TemplateConfigCreate # Same shape, all optional
class TemplateConfigUpdate(BaseModel):
name: str | None = None
description: str | None = None
icon: str | None = None
date_format: str | None = None
date_only_format: str | None = None
slots: dict[str, str] | None = None # partial update: only provided slots change
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _load_slots(session: AsyncSession, config_id: int) -> dict[str, str]:
"""Load all template slots for a config as a dict."""
result = await session.exec(
select(TemplateSlot).where(TemplateSlot.config_id == config_id)
)
return {s.slot_name: s.template for s in result.all()}
async def _save_slots(
session: AsyncSession, config_id: int, slots: dict[str, str]
) -> None:
"""Create or update template slots for a config."""
for slot_name, template_text in slots.items():
result = await session.exec(
select(TemplateSlot).where(
TemplateSlot.config_id == config_id,
TemplateSlot.slot_name == slot_name,
)
)
existing = result.first()
if existing:
existing.template = template_text
session.add(existing)
else:
session.add(TemplateSlot(
config_id=config_id,
slot_name=slot_name,
template=template_text,
))
async def _response(session: AsyncSession, c: TemplateConfig) -> dict[str, Any]:
"""Build API response dict for a TemplateConfig, including its slots."""
slots = await _load_slots(session, c.id)
return {
"id": c.id,
"user_id": c.user_id,
"provider_type": c.provider_type,
"name": c.name,
"description": c.description,
"icon": c.icon,
"date_format": c.date_format,
"date_only_format": c.date_only_format,
"slots": slots,
"created_at": c.created_at.isoformat(),
}
async def _get(session: AsyncSession, config_id: int, user_id: int) -> TemplateConfig:
config = await session.get(TemplateConfig, config_id)
if not config or (config.user_id != user_id and config.user_id != 0):
raise HTTPException(status_code=404, detail="Template config not found")
return config
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@router.get("")
async def list_configs(
provider_type: str | None = None,
@@ -53,7 +120,7 @@ async def list_configs(
if provider_type:
query = query.where(TemplateConfig.provider_type == provider_type)
result = await session.exec(query)
return [_response(c) for c in result.all()]
return [await _response(session, c) for c in result.all()]
@router.get("/variables")
@@ -180,12 +247,22 @@ async def create_config(
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
data = {k: v for k, v in body.model_dump().items() if v is not None}
config = TemplateConfig(user_id=user.id, **data)
config = TemplateConfig(
user_id=user.id,
provider_type=body.provider_type,
name=body.name,
description=body.description or "",
icon=body.icon or "",
date_format=body.date_format or "%d.%m.%Y, %H:%M UTC",
date_only_format=body.date_only_format or "%d.%m.%Y",
)
session.add(config)
await session.flush() # get config.id
if body.slots:
await _save_slots(session, config.id, body.slots)
await session.commit()
await session.refresh(config)
return _response(config)
return await _response(session, config)
@router.get("/{config_id}")
@@ -194,7 +271,8 @@ async def get_config(
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
return _response(await _get(session, config_id, user.id))
config = await _get(session, config_id, user.id)
return await _response(session, config)
@router.put("/{config_id}")
@@ -205,13 +283,15 @@ async def update_config(
session: AsyncSession = Depends(get_session),
):
config = await _get(session, config_id, user.id)
for field, value in body.model_dump(exclude_unset=True).items():
for field, value in body.model_dump(exclude_unset=True, exclude={"slots"}).items():
if value is not None:
setattr(config, field, value)
session.add(config)
if body.slots is not None:
await _save_slots(session, config.id, body.slots)
await session.commit()
await session.refresh(config)
return _response(config)
return await _response(session, config)
@router.delete("/{config_id}", status_code=status.HTTP_204_NO_CONTENT)
@@ -221,6 +301,12 @@ async def delete_config(
session: AsyncSession = Depends(get_session),
):
config = await _get(session, config_id, user.id)
# Delete child slots first
slot_result = await session.exec(
select(TemplateSlot).where(TemplateSlot.config_id == config.id)
)
for slot in slot_result.all():
await session.delete(slot)
await session.delete(config)
await session.commit()
@@ -234,9 +320,10 @@ async def preview_config(
):
"""Render a specific template slot with sample data."""
config = await _get(session, config_id, user.id)
template_body = getattr(config, slot, None)
if template_body is None:
raise HTTPException(status_code=400, detail=f"Unknown slot: {slot}")
slots = await _load_slots(session, config.id)
template_body = slots.get(slot, "")
if not template_body:
raise HTTPException(status_code=400, detail=f"Slot '{slot}' has no template")
try:
env = SandboxedEnvironment(autoescape=False)
tmpl = env.from_string(template_body)
@@ -320,17 +407,3 @@ async def preview_raw(
return {"rendered": None, "error": str(e), "error_line": None, "error_type": "undefined"}
except Exception as e:
return {"rendered": None, "error": str(e), "error_line": None}
def _response(c: TemplateConfig) -> dict:
return {k: getattr(c, k) for k in TemplateConfig.model_fields if k not in ("user_id", "created_at")} | {
"user_id": c.user_id,
"created_at": c.created_at.isoformat(),
}
async def _get(session: AsyncSession, config_id: int, user_id: int) -> TemplateConfig:
config = await session.get(TemplateConfig, config_id)
if not config or (config.user_id != user_id and config.user_id != 0):
raise HTTPException(status_code=404, detail="Template config not found")
return config
@@ -17,6 +17,8 @@ from ..database.engine import get_engine
from ..services import make_immich_provider
from ..database.models import (
CommandConfig,
CommandTemplateConfig,
CommandTemplateSlot,
CommandTracker,
CommandTrackerListener,
EventLog,
@@ -51,6 +53,23 @@ def _check_rate_limit(bot_id: int, chat_id: str, cmd: str, limits: dict[str, int
return None
def _render_cmd_template(
templates: dict[str, str], slot_name: str, context: dict[str, Any]
) -> str | None:
"""Try to render a command template. Returns None if no template or error."""
template_str = templates.get(slot_name)
if not template_str:
return None
try:
from jinja2.sandbox import SandboxedEnvironment
env = SandboxedEnvironment(autoescape=False)
tmpl = env.from_string(template_str)
return tmpl.render(**context)
except Exception as e:
_LOGGER.warning("Failed to render command template '%s': %s", slot_name, e)
return None
async def _resolve_command_context(
bot: TelegramBot,
) -> list[tuple[CommandTracker, CommandConfig, ServiceProvider]]:
@@ -87,7 +106,20 @@ async def _resolve_command_context(
continue
tuples.append((tracker, config, provider))
return tuples
# Load command template slots from the first config that has one
cmd_template_slots: dict[str, str] = {}
for _, config, _ in tuples:
if config.command_template_config_id:
slot_result = await session.exec(
select(CommandTemplateSlot).where(
CommandTemplateSlot.config_id == config.command_template_config_id
)
)
cmd_template_slots = {s.slot_name: s.template for s in slot_result.all()}
if cmd_template_slots:
break
return tuples, cmd_template_slots
def _merge_command_context(
@@ -125,10 +157,13 @@ async def handle_command(
if not cmd:
return None
ctx = await _resolve_command_context(bot)
enabled, locale, response_mode, default_count, rate_limits = _merge_command_context(ctx)
ctx_tuples, cmd_templates = await _resolve_command_context(bot)
enabled, locale, response_mode, default_count, rate_limits = _merge_command_context(ctx_tuples)
if cmd == "start":
result = _render_cmd_template(cmd_templates, "start", {"locale": locale, "bot_name": bot.name})
if result:
return result
msgs = {
"en": "Hi! I'm your Notify Bridge bot. Use /help to see available commands.",
"ru": "Привет! Я бот Notify Bridge. Используйте /help для списка команд.",
@@ -141,6 +176,9 @@ async def handle_command(
# Rate limit check
wait = _check_rate_limit(bot.id, chat_id, cmd, rate_limits)
if wait is not None:
result = _render_cmd_template(cmd_templates, "rate_limited", {"wait": wait, "locale": locale})
if result:
return result
msgs = {
"en": f"Please wait {wait}s before using this command again.",
"ru": f"Подождите {wait} сек. перед повторным использованием.",
@@ -151,7 +189,7 @@ async def handle_command(
# Build providers map from command context
providers_map: dict[int, ServiceProvider] = {}
for _, _, provider in ctx:
for _, _, provider in ctx_tuples:
providers_map[provider.id] = provider
# Dispatch
@@ -105,6 +105,14 @@ async def migrate_schema(engine: AsyncEngine) -> None:
)
logger.info("Added update_mode column to telegram_bot table")
# Add command_template_config_id to command_config if missing
if await _has_table(conn, "command_config"):
if not await _has_column(conn, "command_config", "command_template_config_id"):
await conn.execute(
text("ALTER TABLE command_config ADD COLUMN command_template_config_id INTEGER")
)
logger.info("Added command_template_config_id column to command_config table")
# Add date_only_format to template_config if missing
if await _has_table(conn, "template_config"):
if not await _has_column(conn, "template_config", "date_only_format"):
@@ -537,3 +545,171 @@ async def migrate_entity_refactor(engine: AsyncEngine) -> None:
# or notification_tracker_target. SQLite doesn't support DROP COLUMN in
# all versions, and SQLModel will simply ignore columns not defined on
# the model class. The columns will remain in the DB but are unused.
# ---------------------------------------------------------------------------
# Template slot migration
# ---------------------------------------------------------------------------
# Old column names that existed on template_config before the slot refactor
_LEGACY_TEMPLATE_COLUMNS = [
"message_assets_added",
"message_assets_removed",
"message_collection_renamed",
"message_collection_deleted",
"message_sharing_changed",
"periodic_summary_message",
"scheduled_assets_message",
"memory_mode_message",
]
async def migrate_template_slots(engine: AsyncEngine) -> None:
"""Migrate legacy TemplateConfig column-based templates to TemplateSlot rows.
Reads the old per-column template values via raw SQL (since they're no longer
on the SQLModel class) and inserts them as TemplateSlot rows.
Idempotent: skips if template_slot table already has data or legacy columns
don't exist.
"""
async with engine.begin() as conn:
if not await _has_table(conn, "template_config"):
return
# Check if the legacy columns still exist in the DB
has_legacy = await _has_column(conn, "template_config", "message_assets_added")
if not has_legacy:
logger.debug("No legacy template columns found — skipping slot migration")
return
# Check if template_slot table exists and already has data
if await _has_table(conn, "template_slot"):
slot_count = (await conn.execute(text("SELECT COUNT(*) FROM template_slot"))).scalar()
if slot_count and slot_count > 0:
logger.debug("template_slot table already has %d rows — skipping migration", slot_count)
return
# Create template_slot table if it doesn't exist yet
# (SQLModel.metadata.create_all may have already created it, but be safe)
if not await _has_table(conn, "template_slot"):
await conn.execute(text(
"CREATE TABLE template_slot ("
" id INTEGER PRIMARY KEY,"
" config_id INTEGER NOT NULL REFERENCES template_config(id),"
" slot_name TEXT NOT NULL,"
" template TEXT DEFAULT '',"
" UNIQUE(config_id, slot_name)"
")"
))
logger.info("Created template_slot table")
# Read all template configs with their legacy column values
col_list = ", ".join(_LEGACY_TEMPLATE_COLUMNS)
rows = (await conn.execute(
text(f"SELECT id, {col_list} FROM template_config")
)).fetchall()
migrated = 0
for row in rows:
config_id = row[0]
for i, col_name in enumerate(_LEGACY_TEMPLATE_COLUMNS):
template_text = row[i + 1] or ""
if template_text.strip():
await conn.execute(
text(
"INSERT INTO template_slot (config_id, slot_name, template) "
"VALUES (:cid, :sn, :tmpl)"
),
{"cid": config_id, "sn": col_name, "tmpl": template_text},
)
migrated += 1
if migrated:
logger.info("Migrated %d template slots from legacy columns", migrated)
# ---------------------------------------------------------------------------
# Target receiver migration
# ---------------------------------------------------------------------------
async def migrate_target_receivers(engine: AsyncEngine) -> None:
"""Migrate single chat_id/url from NotificationTarget.config to TargetReceiver rows.
For each existing target that has a chat_id or url in its config JSON and
no receivers yet, creates a TargetReceiver row.
Idempotent: skips targets that already have receivers.
"""
async with engine.begin() as conn:
if not await _has_table(conn, "notification_target"):
return
# Create target_receiver table if it doesn't exist yet
if not await _has_table(conn, "target_receiver"):
await conn.execute(text(
"CREATE TABLE target_receiver ("
" id INTEGER PRIMARY KEY,"
" target_id INTEGER NOT NULL REFERENCES notification_target(id),"
" name TEXT DEFAULT '',"
" config TEXT DEFAULT '{}',"
" receiver_key TEXT DEFAULT '',"
" enabled INTEGER DEFAULT 1,"
" created_at TIMESTAMP,"
" UNIQUE(target_id, receiver_key)"
")"
))
logger.info("Created target_receiver table")
# Check if any receivers already exist
if await _has_table(conn, "target_receiver"):
recv_count = (await conn.execute(text("SELECT COUNT(*) FROM target_receiver"))).scalar()
if recv_count and recv_count > 0:
logger.debug("target_receiver already has %d rows — skipping migration", recv_count)
return
# Read all targets
targets = (await conn.execute(
text("SELECT id, type, config FROM notification_target")
)).fetchall()
migrated = 0
for row in targets:
target_id, target_type, raw_config = row[0], row[1], row[2]
try:
cfg = json.loads(raw_config) if isinstance(raw_config, str) else (raw_config or {})
except (json.JSONDecodeError, TypeError):
cfg = {}
receiver_key = ""
receiver_config = {}
receiver_name = ""
if target_type == "telegram":
chat_id = cfg.get("chat_id", "")
if chat_id:
receiver_key = str(chat_id)
receiver_config = {"chat_id": str(chat_id)}
receiver_name = f"Chat {chat_id}"
elif target_type == "webhook":
url = cfg.get("url", "")
if url:
receiver_key = url
receiver_config = {"url": url, "headers": cfg.get("headers", {})}
receiver_name = url[:50]
if receiver_key:
await conn.execute(
text(
"INSERT INTO target_receiver (target_id, name, config, receiver_key, enabled, created_at) "
"VALUES (:tid, :name, :cfg, :rk, 1, CURRENT_TIMESTAMP)"
),
{
"tid": target_id,
"name": receiver_name,
"cfg": json.dumps(receiver_config),
"rk": receiver_key,
},
)
migrated += 1
if migrated:
logger.info("Migrated %d target receivers from legacy config", migrated)
@@ -6,7 +6,7 @@ from datetime import datetime, timezone
from typing import Any
from uuid import uuid4
from sqlalchemy import UniqueConstraint
from sqlalchemy import UniqueConstraint, Text
from sqlmodel import JSON, Column, Field, SQLModel
@@ -53,6 +53,24 @@ class TelegramBot(SQLModel, table=True):
created_at: datetime = Field(default_factory=_utcnow)
class EmailBot(SQLModel, table=True):
"""Email sender — SMTP connection for sending email notifications."""
__tablename__ = "email_bot"
id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(foreign_key="user.id")
name: str
icon: str = Field(default="")
email: str # From address
smtp_host: str
smtp_port: int = Field(default=587)
smtp_username: str = Field(default="")
smtp_password: str = Field(default="")
smtp_use_tls: bool = Field(default=True)
created_at: datetime = Field(default_factory=_utcnow)
class TelegramChat(SQLModel, table=True):
__tablename__ = "telegram_chat"
@@ -124,7 +142,10 @@ class TrackingConfig(SQLModel, table=True):
class TemplateConfig(SQLModel, table=True):
"""Jinja2 message templates. Tied to a provider type."""
"""Jinja2 message templates. Tied to a provider type.
Template content is stored in TemplateSlot child rows (one per slot).
"""
__tablename__ = "template_config"
@@ -135,32 +156,41 @@ class TemplateConfig(SQLModel, table=True):
description: str = Field(default="")
icon: str = Field(default="")
# Event-driven notification templates
message_assets_added: str = Field(default="")
message_assets_removed: str = Field(default="")
message_collection_renamed: str = Field(default="")
message_collection_deleted: str = Field(default="")
message_sharing_changed: str = Field(default="")
# Scheduled notification templates
periodic_summary_message: str = Field(default="")
scheduled_assets_message: str = Field(default="")
memory_mode_message: str = Field(default="")
date_format: str = Field(default="%d.%m.%Y, %H:%M UTC")
date_only_format: str = Field(default="%d.%m.%Y")
created_at: datetime = Field(default_factory=_utcnow)
class TemplateSlot(SQLModel, table=True):
"""One Jinja2 template for a specific slot within a TemplateConfig.
Slot names are provider-specific (e.g. 'message_assets_added' for Immich).
"""
__tablename__ = "template_slot"
__table_args__ = (
UniqueConstraint("config_id", "slot_name", name="uq_template_slot"),
)
id: int | None = Field(default=None, primary_key=True)
config_id: int = Field(foreign_key="template_config.id", index=True)
slot_name: str
template: str = Field(default="", sa_column=Column(Text, default=""))
class NotificationTarget(SQLModel, table=True):
"""Where to send notifications. Pure delivery endpoint."""
"""Where to send notifications. Pure delivery endpoint.
Target-level config holds connection/display settings (e.g. bot_token,
disable_url_preview). Actual delivery endpoints live in TargetReceiver rows.
"""
__tablename__ = "notification_target"
id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(foreign_key="user.id")
type: str # "telegram" or "webhook"
type: str # "telegram", "webhook", or "email"
name: str
icon: str = Field(default="")
config: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
@@ -168,6 +198,28 @@ class NotificationTarget(SQLModel, table=True):
created_at: datetime = Field(default_factory=_utcnow)
class TargetReceiver(SQLModel, table=True):
"""One delivery endpoint within a NotificationTarget (broadcast support).
For Telegram: config = {"chat_id": "12345"}
For Webhook: config = {"url": "https://...", "headers": {...}}
For Email: config = {"email": "user@example.com", "name": "..."}
"""
__tablename__ = "target_receiver"
__table_args__ = (
UniqueConstraint("target_id", "receiver_key", name="uq_target_receiver"),
)
id: int | None = Field(default=None, primary_key=True)
target_id: int = Field(foreign_key="notification_target.id", index=True)
name: str = Field(default="")
config: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
receiver_key: str = Field(default="") # dedup key (e.g. chat_id, url, email)
enabled: bool = Field(default=True)
created_at: datetime = Field(default_factory=_utcnow)
class NotificationTracker(SQLModel, table=True):
"""Watches a provider's collections for changes."""
@@ -246,9 +298,43 @@ class CommandConfig(SQLModel, table=True):
response_mode: str = Field(default="media") # "media" or "text"
default_count: int = Field(default=5)
rate_limits: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
command_template_config_id: int | None = Field(
default=None, foreign_key="command_template_config.id"
)
created_at: datetime = Field(default_factory=_utcnow)
class CommandTemplateConfig(SQLModel, table=True):
"""Jinja2 templates for command responses. Provider-specific via slots."""
__tablename__ = "command_template_config"
id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(default=0) # 0 = system-owned
provider_type: str
name: str
description: str = Field(default="")
icon: str = Field(default="")
created_at: datetime = Field(default_factory=_utcnow)
class CommandTemplateSlot(SQLModel, table=True):
"""One Jinja2 template for a specific command response slot.
Slot names match command names (e.g. 'status', 'help', 'albums').
"""
__tablename__ = "command_template_slot"
__table_args__ = (
UniqueConstraint("config_id", "slot_name", name="uq_command_template_slot"),
)
id: int | None = Field(default=None, primary_key=True)
config_id: int = Field(foreign_key="command_template_config.id", index=True)
slot_name: str
template: str = Field(default="", sa_column=Column(Text, default=""))
class CommandTracker(SQLModel, table=True):
"""Links a provider to a command config for interactive bot commands."""
@@ -20,13 +20,16 @@ from .api.notification_tracker_targets import router as notification_tracker_tar
from .api.tracking_configs import router as tracking_configs_router
from .api.template_configs import router as template_configs_router
from .api.targets import router as targets_router
from .api.target_receivers import router as target_receivers_router
from .api.telegram_bots import router as telegram_bots_router
from .api.email_bots import router as email_bots_router
from .api.users import router as users_router
from .api.status import router as status_router
from .api.template_vars import router as template_vars_router
from .api.app_settings import router as app_settings_router
from .api.command_configs import router as command_configs_router
from .api.command_trackers import router as command_trackers_router
from .api.command_template_configs import router as command_template_configs_router
from .commands.webhook import router as webhook_router, set_webhook_secret
@@ -35,11 +38,13 @@ async def lifespan(app: FastAPI):
await init_db()
# Run data migrations (idempotent)
from .database.engine import get_engine
from .database.migrations import migrate_schema, migrate_tracker_targets, migrate_entity_refactor
from .database.migrations import migrate_schema, migrate_tracker_targets, migrate_entity_refactor, migrate_template_slots, migrate_target_receivers
engine = get_engine()
await migrate_schema(engine)
await migrate_tracker_targets(engine)
await migrate_entity_refactor(engine)
await migrate_template_slots(engine)
await migrate_target_receivers(engine)
await _seed_default_templates()
# Configure webhook secret from DB setting (falls back to env var)
from sqlmodel.ext.asyncio.session import AsyncSession as _AS
@@ -63,12 +68,15 @@ app.include_router(notification_tracker_targets_router)
app.include_router(tracking_configs_router)
app.include_router(template_configs_router)
app.include_router(targets_router)
app.include_router(target_receivers_router)
app.include_router(telegram_bots_router)
app.include_router(email_bots_router)
app.include_router(users_router)
app.include_router(status_router)
app.include_router(app_settings_router)
app.include_router(command_configs_router)
app.include_router(command_trackers_router)
app.include_router(command_template_configs_router)
app.include_router(webhook_router)
@@ -78,11 +86,14 @@ async def health():
async def _seed_default_templates():
"""Seed or update default (system-owned) templates on startup."""
"""Seed or update default (system-owned) templates on startup.
Uses TemplateSlot child rows for template content.
"""
from sqlmodel import func, select
from sqlmodel.ext.asyncio.session import AsyncSession
from .database.engine import get_engine
from .database.models import TemplateConfig
from .database.models import TemplateConfig, TemplateSlot
from notify_bridge_core.templates.defaults import load_default_templates
engine = get_engine()
@@ -102,9 +113,15 @@ async def _seed_default_templates():
provider_type="immich",
name=name,
description=f"Default Immich templates ({locale.upper()})",
**slots,
)
session.add(config)
await session.flush() # get config.id
for slot_name, template_text in slots.items():
session.add(TemplateSlot(
config_id=config.id,
slot_name=slot_name,
template=template_text,
))
else:
# Update existing system-owned templates from files
result = await session.exec(
@@ -116,9 +133,24 @@ async def _seed_default_templates():
slots = load_default_templates(locale)
if not slots:
continue
for key, value in slots.items():
setattr(config, key, value)
session.add(config)
for slot_name, template_text in slots.items():
# Upsert: find existing slot or create new
slot_result = await session.exec(
select(TemplateSlot).where(
TemplateSlot.config_id == config.id,
TemplateSlot.slot_name == slot_name,
)
)
existing = slot_result.first()
if existing:
existing.template = template_text
session.add(existing)
else:
session.add(TemplateSlot(
config_id=config.id,
slot_name=slot_name,
template=template_text,
))
await session.commit()
@@ -5,7 +5,11 @@ from typing import Any
import aiohttp
from ..database.models import NotificationTarget
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..database.engine import get_engine
from ..database.models import NotificationTarget, TargetReceiver
_LOGGER = logging.getLogger(__name__)
@@ -26,52 +30,162 @@ def _get_test_message(locale: str, target_type: str) -> str:
return msgs.get(target_type, msgs.get("webhook", "Test"))
async def _load_receivers(target_id: int) -> list[dict]:
"""Load enabled receivers for a target from DB."""
engine = get_engine()
async with AsyncSession(engine) as session:
result = await session.exec(
select(TargetReceiver).where(
TargetReceiver.target_id == target_id,
TargetReceiver.enabled == True,
)
)
return [dict(r.config) for r in result.all()]
async def send_to_target(target: NotificationTarget, message: str) -> dict:
"""Send a message to a target, respecting all target config settings.
"""Send a message to a target, broadcasting to all receivers.
This is the SINGLE send path used by dispatch, test, and real-data notifications.
"""
try:
receivers = await _load_receivers(target.id)
if target.type == "telegram":
return await _send_telegram(target, message)
return await _send_telegram_broadcast(target, message, receivers)
elif target.type == "webhook":
return await _send_webhook(target, message)
return await _send_webhook_broadcast(target, message, receivers)
elif target.type == "email":
return await _send_email_broadcast(target, message, receivers)
return {"success": False, "error": f"Unknown target type: {target.type}"}
except Exception as e:
_LOGGER.error("Send failed: %s", e)
return {"success": False, "error": str(e)}
async def _send_telegram(target: NotificationTarget, message: str) -> dict:
async def _send_telegram_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
from notify_bridge_core.notifications.telegram.client import TelegramClient
bot_token = target.config.get("bot_token")
chat_id = target.config.get("chat_id")
disable_preview = target.config.get("disable_url_preview", False)
if not bot_token or not chat_id:
return {"success": False, "error": "Missing bot_token or chat_id"}
if not bot_token:
return {"success": False, "error": "Missing bot_token"}
# Fall back to legacy chat_id if no receivers
if not receivers:
chat_id = target.config.get("chat_id")
if chat_id:
receivers = [{"chat_id": str(chat_id)}]
else:
return {"success": False, "error": "No receivers configured"}
results: list[dict] = []
async with aiohttp.ClientSession() as session:
client = TelegramClient(session, bot_token)
return await client.send_message(
chat_id=str(chat_id),
text=message,
disable_web_page_preview=bool(disable_preview),
)
for recv in receivers:
chat_id = recv.get("chat_id")
if not chat_id:
continue
result = await client.send_message(
chat_id=str(chat_id),
text=message,
disable_web_page_preview=bool(disable_preview),
)
results.append(result)
successes = sum(1 for r in results if r.get("success"))
if successes == len(results) and results:
return {"success": True, "receivers": len(results)}
elif successes > 0:
return {"success": True, "receivers": len(results), "partial_failures": len(results) - successes}
elif results:
return results[0]
return {"success": False, "error": "No valid receivers"}
async def _send_webhook(target: NotificationTarget, message: str, event_type: str = "notification") -> dict:
async def _send_webhook_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
from notify_bridge_core.notifications.webhook.client import WebhookClient
url = target.config.get("url")
headers = target.config.get("headers", {})
if not url:
return {"success": False, "error": "Missing url in target config"}
# Fall back to legacy url if no receivers
if not receivers:
url = target.config.get("url")
headers = target.config.get("headers", {})
if url:
receivers = [{"url": url, "headers": headers}]
else:
return {"success": False, "error": "No receivers configured"}
results: list[dict] = []
async with aiohttp.ClientSession() as session:
client = WebhookClient(session, url, headers)
return await client.send({"message": message, "event_type": event_type})
for recv in receivers:
url = recv.get("url")
headers = recv.get("headers", {})
if not url:
continue
client = WebhookClient(session, url, headers)
results.append(await client.send({"message": message, "event_type": "notification"}))
successes = sum(1 for r in results if r.get("success"))
if successes == len(results) and results:
return {"success": True, "receivers": len(results)}
elif successes > 0:
return {"success": True, "receivers": len(results), "partial_failures": len(results) - successes}
elif results:
return results[0]
return {"success": False, "error": "No valid receivers"}
async def _send_email_broadcast(target: NotificationTarget, message: str, receivers: list[dict]) -> dict:
from notify_bridge_core.notifications.email.client import EmailClient, SmtpConfig
from ..database.models import EmailBot
email_bot_id = target.config.get("email_bot_id")
if not email_bot_id:
return {"success": False, "error": "No email bot configured for this target"}
engine = get_engine()
async with AsyncSession(engine) as session:
email_bot = await session.get(EmailBot, email_bot_id)
if not email_bot:
return {"success": False, "error": "Email bot not found"}
smtp_cfg = SmtpConfig(
host=email_bot.smtp_host,
port=email_bot.smtp_port,
username=email_bot.smtp_username,
password=email_bot.smtp_password,
from_address=email_bot.email,
from_name=email_bot.name,
use_tls=email_bot.smtp_use_tls,
)
if not smtp_cfg.host or not smtp_cfg.from_address:
return {"success": False, "error": "Email bot SMTP not configured"}
if not receivers:
return {"success": False, "error": "No email receivers configured"}
client = EmailClient(smtp_cfg)
results: list[dict] = []
for recv in receivers:
email = recv.get("email")
if not email:
continue
result = await client.send(
to_email=email,
subject="Notification from Notify Bridge",
body_text=message,
to_name=recv.get("name", ""),
)
results.append(result)
successes = sum(1 for r in results if r.get("success"))
if successes == len(results) and results:
return {"success": True, "receivers": len(results)}
elif successes > 0:
return {"success": True, "receivers": len(results), "partial_failures": len(results) - successes}
elif results:
return results[0]
return {"success": False, "error": "No valid email receivers"}
# --- Public API used by routes ---
@@ -17,13 +17,16 @@ from notify_bridge_core.storage import JsonFileBackend
from ..database.engine import get_engine
from ..database.models import (
EmailBot,
EventLog,
NotificationTarget,
NotificationTracker,
NotificationTrackerState,
NotificationTrackerTarget,
ServiceProvider,
TargetReceiver,
TemplateConfig,
TemplateSlot,
TrackingConfig,
)
@@ -129,19 +132,59 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
if not target:
continue
# Load receivers for this target
recv_result = await session.exec(
select(TargetReceiver).where(
TargetReceiver.target_id == target.id,
TargetReceiver.enabled == True,
)
)
receivers = [dict(r.config) for r in recv_result.all()]
tracking_config = None
if tt.tracking_config_id:
tracking_config = await session.get(TrackingConfig, tt.tracking_config_id)
template_config = None
template_slots: dict[str, str] | None = None
if tt.template_config_id:
template_config = await session.get(TemplateConfig, tt.template_config_id)
if template_config:
slot_result = await session.exec(
select(TemplateSlot).where(TemplateSlot.config_id == template_config.id)
)
raw_slots = {s.slot_name: s.template for s in slot_result.all()}
# Map slot names to event_type values for dispatcher lookup
template_slots = {}
for slot_name, tmpl_text in raw_slots.items():
# Strip "message_" prefix for event-type slots
event_key = slot_name.removeprefix("message_") if slot_name.startswith("message_") else slot_name
template_slots[event_key] = tmpl_text
target_config = dict(target.config)
# Inject SMTP config for email targets from EmailBot
if target.type == "email":
email_bot_id = target.config.get("email_bot_id")
if email_bot_id:
email_bot = await session.get(EmailBot, email_bot_id)
if email_bot:
target_config["smtp"] = {
"host": email_bot.smtp_host,
"port": email_bot.smtp_port,
"username": email_bot.smtp_username,
"password": email_bot.smtp_password,
"from_address": email_bot.email,
"from_name": email_bot.name,
"use_tls": email_bot.smtp_use_tls,
}
link_data.append({
"target_type": target.type,
"target_config": dict(target.config),
"target_config": target_config,
"receivers": receivers,
"tracking_config": tracking_config,
"template_config": template_config,
"template_slots": template_slots,
})
# Snapshot the data we need
@@ -249,26 +292,17 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
_LOGGER.info(" Skipped by tracking config filter")
continue
# Build template slots from template config
tmpl = ld["template_config"]
slots = None
if tmpl:
slots = {
"assets_added": tmpl.message_assets_added,
"assets_removed": tmpl.message_assets_removed,
"collection_renamed": tmpl.message_collection_renamed,
"collection_deleted": tmpl.message_collection_deleted,
"sharing_changed": tmpl.message_sharing_changed,
}
target_configs.append(TargetConfig(
type=ld["target_type"],
config=ld["target_config"],
template_slots=slots,
template_slots=ld["template_slots"],
date_format=tmpl.date_format if tmpl else "%d.%m.%Y, %H:%M UTC",
date_only_format=tmpl.date_only_format if tmpl and tmpl.date_only_format else "%d.%m.%Y",
provider_api_key=provider_config.get("api_key"),
provider_internal_url=provider_config.get("url", ""),
provider_external_url=provider_config.get("external_domain", ""),
receivers=ld["receivers"],
))
if target_configs: