feat: broadcast notification target + UX improvements
Add broadcast target type that fans out notifications to multiple child targets. Dispatch expands broadcast into children in load_link_data() — dispatcher stays unaware. Children can be toggled on/off via disabled_child_ids in config. Also: dashboard provider card smaller font for names, scroll-to-form on target edit, broadcast nav tab with counter, flag_modified fix for JSON column updates, CLAUDE.md nav tree docs.
This commit is contained in:
@@ -111,8 +111,11 @@ async def list_targets(
|
||||
if lang:
|
||||
chat_languages[f"{bot_id}_{chat_id}"] = lang
|
||||
|
||||
# Build lookup for broadcast child target resolution
|
||||
target_map = {t.id: t for t in targets}
|
||||
|
||||
return [
|
||||
_target_response(t, chat_names, target_receivers.get(t.id, []), chat_languages)
|
||||
_target_response(t, chat_names, target_receivers.get(t.id, []), chat_languages, target_map)
|
||||
for t in targets
|
||||
]
|
||||
|
||||
@@ -124,15 +127,24 @@ async def create_target(
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
"""Create a new notification target."""
|
||||
valid_types = ("telegram", "webhook", "email", "discord", "slack", "ntfy", "matrix")
|
||||
valid_types = ("telegram", "webhook", "email", "discord", "slack", "ntfy", "matrix", "broadcast")
|
||||
if body.type not in valid_types:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"Type must be one of: {', '.join(valid_types)}",
|
||||
)
|
||||
|
||||
# Extract delivery fields from config — they go into a TargetReceiver
|
||||
clean_config, receiver_cfg = _extract_delivery_fields(body.type, body.config)
|
||||
# Broadcast-specific validation
|
||||
if body.type == "broadcast":
|
||||
child_ids = body.config.get("child_target_ids", [])
|
||||
if not isinstance(child_ids, list):
|
||||
raise HTTPException(status_code=400, detail="child_target_ids must be a list")
|
||||
await _validate_broadcast_children(session, child_ids, user.id, exclude_target_id=None)
|
||||
clean_config = {"child_target_ids": child_ids}
|
||||
receiver_cfg: dict[str, Any] = {}
|
||||
else:
|
||||
# Extract delivery fields from config — they go into a TargetReceiver
|
||||
clean_config, receiver_cfg = _extract_delivery_fields(body.type, body.config)
|
||||
|
||||
target = NotificationTarget(
|
||||
user_id=user.id,
|
||||
@@ -190,34 +202,46 @@ async def update_target(
|
||||
|
||||
# If config is being updated, extract any delivery fields
|
||||
if "config" in updates and updates["config"] is not None:
|
||||
clean_config, receiver_cfg = _extract_delivery_fields(target.type, updates["config"])
|
||||
updates["config"] = clean_config
|
||||
if target.type == "broadcast":
|
||||
child_ids = updates["config"].get("child_target_ids", [])
|
||||
if not isinstance(child_ids, list):
|
||||
raise HTTPException(status_code=400, detail="child_target_ids must be a list")
|
||||
await _validate_broadcast_children(session, child_ids, user.id, exclude_target_id=target.id)
|
||||
disabled_ids = updates["config"].get("disabled_child_ids", [])
|
||||
updates["config"] = {"child_target_ids": child_ids, "disabled_child_ids": disabled_ids}
|
||||
else:
|
||||
clean_config, receiver_cfg = _extract_delivery_fields(target.type, updates["config"])
|
||||
updates["config"] = clean_config
|
||||
|
||||
# Update or create receiver if delivery fields were present
|
||||
if receiver_cfg:
|
||||
key = _receiver_key(target.type, receiver_cfg)
|
||||
existing_result = await session.exec(
|
||||
select(TargetReceiver).where(
|
||||
TargetReceiver.target_id == target.id,
|
||||
TargetReceiver.receiver_key == key,
|
||||
# Update or create receiver if delivery fields were present
|
||||
if receiver_cfg:
|
||||
key = _receiver_key(target.type, receiver_cfg)
|
||||
existing_result = await session.exec(
|
||||
select(TargetReceiver).where(
|
||||
TargetReceiver.target_id == target.id,
|
||||
TargetReceiver.receiver_key == key,
|
||||
)
|
||||
)
|
||||
)
|
||||
existing_recv = existing_result.first()
|
||||
if existing_recv:
|
||||
existing_recv.config = receiver_cfg
|
||||
session.add(existing_recv)
|
||||
else:
|
||||
receiver = TargetReceiver(
|
||||
target_id=target.id,
|
||||
name=target.name,
|
||||
config=receiver_cfg,
|
||||
receiver_key=key,
|
||||
enabled=True,
|
||||
)
|
||||
session.add(receiver)
|
||||
existing_recv = existing_result.first()
|
||||
if existing_recv:
|
||||
existing_recv.config = receiver_cfg
|
||||
session.add(existing_recv)
|
||||
else:
|
||||
receiver = TargetReceiver(
|
||||
target_id=target.id,
|
||||
name=target.name,
|
||||
config=receiver_cfg,
|
||||
receiver_key=key,
|
||||
enabled=True,
|
||||
)
|
||||
session.add(receiver)
|
||||
|
||||
for field_name, value in updates.items():
|
||||
setattr(target, field_name, value)
|
||||
# Force SQLAlchemy to detect JSON column change
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
if "config" in updates:
|
||||
flag_modified(target, "config")
|
||||
session.add(target)
|
||||
await session.commit()
|
||||
await session.refresh(target)
|
||||
@@ -257,11 +281,37 @@ async def test_target(
|
||||
return result
|
||||
|
||||
|
||||
async def _validate_broadcast_children(
|
||||
session: AsyncSession,
|
||||
child_ids: list[int],
|
||||
user_id: int,
|
||||
*,
|
||||
exclude_target_id: int | None = None,
|
||||
) -> None:
|
||||
"""Validate broadcast child target IDs.
|
||||
|
||||
- All IDs must exist and belong to the user
|
||||
- No broadcast targets allowed as children (prevents circular refs)
|
||||
- Cannot reference itself
|
||||
"""
|
||||
if not child_ids:
|
||||
return
|
||||
if exclude_target_id and exclude_target_id in child_ids:
|
||||
raise HTTPException(status_code=400, detail="A broadcast target cannot include itself")
|
||||
for child_id in child_ids:
|
||||
child = await session.get(NotificationTarget, child_id)
|
||||
if not child or child.user_id != user_id:
|
||||
raise HTTPException(status_code=400, detail=f"Child target {child_id} not found")
|
||||
if child.type == "broadcast":
|
||||
raise HTTPException(status_code=400, detail="A broadcast target cannot include another broadcast")
|
||||
|
||||
|
||||
def _target_response(
|
||||
target: NotificationTarget,
|
||||
chat_names: dict[str, str] | None = None,
|
||||
receivers: list[TargetReceiver] | None = None,
|
||||
chat_languages: dict[str, str] | None = None,
|
||||
target_map: dict[int, NotificationTarget] | None = None,
|
||||
) -> dict:
|
||||
recv_list = receivers or []
|
||||
resp = {
|
||||
@@ -295,6 +345,14 @@ def _target_response(
|
||||
recv_resp["chat_name"] = chat_names[key]
|
||||
if chat_languages and key in chat_languages:
|
||||
recv_resp["language_code"] = chat_languages[key]
|
||||
# Attach child target summaries for broadcast targets
|
||||
if target.type == "broadcast" and target_map:
|
||||
child_ids = target.config.get("child_target_ids", [])
|
||||
resp["child_targets"] = [
|
||||
{"id": cid, "name": target_map[cid].name, "type": target_map[cid].type, "icon": target_map[cid].icon}
|
||||
for cid in child_ids if cid in target_map
|
||||
]
|
||||
resp["receiver_count"] = len(resp["child_targets"])
|
||||
return resp
|
||||
|
||||
|
||||
|
||||
@@ -96,6 +96,88 @@ def event_allowed_by_config(event: ServiceEvent, tc: TrackingConfig) -> bool:
|
||||
return flag_map.get(event_type, True)
|
||||
|
||||
|
||||
async def _resolve_target(
|
||||
session: AsyncSession,
|
||||
target: NotificationTarget,
|
||||
) -> dict[str, Any]:
|
||||
"""Resolve a single target into dispatch-ready data (config + receivers + credentials).
|
||||
|
||||
Returns a dict with target_type, target_config, and receivers.
|
||||
Does NOT include tracking_config or template_slots — those come from the tracker link.
|
||||
"""
|
||||
# Load receivers as typed Receiver objects
|
||||
recv_result = await session.exec(
|
||||
select(TargetReceiver).where(
|
||||
TargetReceiver.target_id == target.id,
|
||||
TargetReceiver.enabled == True,
|
||||
)
|
||||
)
|
||||
recv_rows = recv_result.all()
|
||||
|
||||
# For Telegram targets, resolve locale from TelegramChat
|
||||
chat_locale_map: dict[str, str] = {}
|
||||
if target.type == "telegram":
|
||||
bot_id = target.config.get("bot_id")
|
||||
if bot_id:
|
||||
chat_ids = [str(r.config.get("chat_id", "")) for r in recv_rows if r.config.get("chat_id")]
|
||||
if chat_ids:
|
||||
chat_result = await session.exec(
|
||||
select(TelegramChat).where(
|
||||
TelegramChat.bot_id == bot_id,
|
||||
TelegramChat.chat_id.in_(chat_ids),
|
||||
)
|
||||
)
|
||||
for chat in chat_result.all():
|
||||
resolved = (
|
||||
getattr(chat, 'language_override', '') or
|
||||
getattr(chat, 'language_code', '') or ''
|
||||
)
|
||||
if resolved:
|
||||
chat_locale_map[chat.chat_id] = resolved[:2].lower()
|
||||
|
||||
receivers: list[Receiver] = []
|
||||
for r in recv_rows:
|
||||
explicit_locale = getattr(r, 'locale', '') or ''
|
||||
locale = explicit_locale
|
||||
if not locale and target.type == "telegram":
|
||||
chat_id = str(r.config.get("chat_id", ""))
|
||||
locale = chat_locale_map.get(chat_id, "")
|
||||
receivers.append(build_receiver(target.type, dict(r.config), locale))
|
||||
|
||||
target_config = dict(target.config)
|
||||
# Inject chat_action for Telegram targets
|
||||
if hasattr(target, 'chat_action') and target.chat_action:
|
||||
target_config["chat_action"] = target.chat_action
|
||||
# Inject bot credentials for bot-backed target types
|
||||
if target.type == "email":
|
||||
email_bot_id = target.config.get("email_bot_id")
|
||||
if email_bot_id:
|
||||
email_bot = await session.get(EmailBot, email_bot_id)
|
||||
if email_bot:
|
||||
target_config["smtp"] = {
|
||||
"host": email_bot.smtp_host,
|
||||
"port": email_bot.smtp_port,
|
||||
"username": email_bot.smtp_username,
|
||||
"password": email_bot.smtp_password,
|
||||
"from_address": email_bot.email,
|
||||
"from_name": email_bot.name,
|
||||
"use_tls": email_bot.smtp_use_tls,
|
||||
}
|
||||
elif target.type == "matrix":
|
||||
matrix_bot_id = target.config.get("matrix_bot_id")
|
||||
if matrix_bot_id:
|
||||
matrix_bot = await session.get(MatrixBot, matrix_bot_id)
|
||||
if matrix_bot:
|
||||
target_config["homeserver_url"] = matrix_bot.homeserver_url
|
||||
target_config["access_token"] = matrix_bot.access_token
|
||||
|
||||
return {
|
||||
"target_type": target.type,
|
||||
"target_config": target_config,
|
||||
"receivers": receivers,
|
||||
}
|
||||
|
||||
|
||||
async def load_link_data(
|
||||
session: AsyncSession,
|
||||
tracker_id: int,
|
||||
@@ -127,45 +209,7 @@ async def load_link_data(
|
||||
if not target:
|
||||
continue
|
||||
|
||||
# Load receivers as typed Receiver objects
|
||||
recv_result = await session.exec(
|
||||
select(TargetReceiver).where(
|
||||
TargetReceiver.target_id == target.id,
|
||||
TargetReceiver.enabled == True,
|
||||
)
|
||||
)
|
||||
recv_rows = recv_result.all()
|
||||
|
||||
# For Telegram targets, resolve locale from TelegramChat
|
||||
chat_locale_map: dict[str, str] = {}
|
||||
if target.type == "telegram":
|
||||
bot_id = target.config.get("bot_id")
|
||||
if bot_id:
|
||||
chat_ids = [str(r.config.get("chat_id", "")) for r in recv_rows if r.config.get("chat_id")]
|
||||
if chat_ids:
|
||||
chat_result = await session.exec(
|
||||
select(TelegramChat).where(
|
||||
TelegramChat.bot_id == bot_id,
|
||||
TelegramChat.chat_id.in_(chat_ids),
|
||||
)
|
||||
)
|
||||
for chat in chat_result.all():
|
||||
resolved = (
|
||||
getattr(chat, 'language_override', '') or
|
||||
getattr(chat, 'language_code', '') or ''
|
||||
)
|
||||
if resolved:
|
||||
chat_locale_map[chat.chat_id] = resolved[:2].lower()
|
||||
|
||||
receivers: list[Receiver] = []
|
||||
for r in recv_rows:
|
||||
explicit_locale = getattr(r, 'locale', '') or ''
|
||||
locale = explicit_locale
|
||||
if not locale and target.type == "telegram":
|
||||
chat_id = str(r.config.get("chat_id", ""))
|
||||
locale = chat_locale_map.get(chat_id, "")
|
||||
receivers.append(build_receiver(target.type, dict(r.config), locale))
|
||||
|
||||
# Load tracking config and template slots (shared across broadcast children)
|
||||
tracking_config = None
|
||||
if tt.tracking_config_id:
|
||||
tracking_config = await session.get(TrackingConfig, tt.tracking_config_id)
|
||||
@@ -184,37 +228,29 @@ async def load_link_data(
|
||||
raw_slots.setdefault(event_key, {})[s.locale] = s.template
|
||||
template_slots = raw_slots
|
||||
|
||||
target_config = dict(target.config)
|
||||
# Inject chat_action for Telegram targets
|
||||
if hasattr(target, 'chat_action') and target.chat_action:
|
||||
target_config["chat_action"] = target.chat_action
|
||||
# Inject bot credentials for bot-backed target types
|
||||
if target.type == "email":
|
||||
email_bot_id = target.config.get("email_bot_id")
|
||||
if email_bot_id:
|
||||
email_bot = await session.get(EmailBot, email_bot_id)
|
||||
if email_bot:
|
||||
target_config["smtp"] = {
|
||||
"host": email_bot.smtp_host,
|
||||
"port": email_bot.smtp_port,
|
||||
"username": email_bot.smtp_username,
|
||||
"password": email_bot.smtp_password,
|
||||
"from_address": email_bot.email,
|
||||
"from_name": email_bot.name,
|
||||
"use_tls": email_bot.smtp_use_tls,
|
||||
}
|
||||
elif target.type == "matrix":
|
||||
matrix_bot_id = target.config.get("matrix_bot_id")
|
||||
if matrix_bot_id:
|
||||
matrix_bot = await session.get(MatrixBot, matrix_bot_id)
|
||||
if matrix_bot:
|
||||
target_config["homeserver_url"] = matrix_bot.homeserver_url
|
||||
target_config["access_token"] = matrix_bot.access_token
|
||||
# Broadcast target: expand into child targets
|
||||
if target.type == "broadcast":
|
||||
child_ids = target.config.get("child_target_ids", [])
|
||||
disabled_ids = set(target.config.get("disabled_child_ids", []))
|
||||
for child_id in child_ids:
|
||||
if child_id in disabled_ids:
|
||||
continue
|
||||
child_target = await session.get(NotificationTarget, child_id)
|
||||
if not child_target or child_target.type == "broadcast":
|
||||
continue
|
||||
resolved = await _resolve_target(session, child_target)
|
||||
link_data.append({
|
||||
**resolved,
|
||||
"tracking_config": tracking_config,
|
||||
"template_config": template_config,
|
||||
"template_slots": template_slots,
|
||||
})
|
||||
continue
|
||||
|
||||
# Regular target
|
||||
resolved = await _resolve_target(session, target)
|
||||
link_data.append({
|
||||
"target_type": target.type,
|
||||
"target_config": target_config,
|
||||
"receivers": receivers,
|
||||
**resolved,
|
||||
"tracking_config": tracking_config,
|
||||
"template_config": template_config,
|
||||
"template_slots": template_slots,
|
||||
|
||||
@@ -309,11 +309,35 @@ async def send_to_receiver(target: NotificationTarget, receiver_config: dict, me
|
||||
|
||||
|
||||
async def send_test_notification(target: NotificationTarget, locale: str = "en") -> dict:
|
||||
"""Send a simple test message."""
|
||||
"""Send a simple test message. For broadcast targets, fans out to all children."""
|
||||
if target.type == "broadcast":
|
||||
return await _send_broadcast_test(target, locale)
|
||||
message = _get_test_message(locale, target.type)
|
||||
return await send_to_target(target, message)
|
||||
|
||||
|
||||
async def _send_broadcast_test(target: NotificationTarget, locale: str) -> dict:
|
||||
"""Send test notifications to all child targets of a broadcast target."""
|
||||
child_ids = target.config.get("child_target_ids", [])
|
||||
if not child_ids:
|
||||
return {"success": False, "error": "No child targets configured"}
|
||||
|
||||
disabled_ids = set(target.config.get("disabled_child_ids", []))
|
||||
engine = get_engine()
|
||||
results: list[dict] = []
|
||||
async with AsyncSession(engine) as session:
|
||||
for child_id in child_ids:
|
||||
if child_id in disabled_ids:
|
||||
continue
|
||||
child = await session.get(NotificationTarget, child_id)
|
||||
if not child or child.type == "broadcast":
|
||||
continue
|
||||
message = _get_test_message(locale, child.type)
|
||||
results.append(await send_to_target(child, message))
|
||||
|
||||
return _aggregate(results)
|
||||
|
||||
|
||||
async def send_test_template_notification(
|
||||
target: NotificationTarget, slot: str, template_str: str
|
||||
) -> dict:
|
||||
|
||||
Reference in New Issue
Block a user