"""Notification target management API routes.""" import logging from fastapi import APIRouter, Depends, HTTPException, Query, status from pydantic import BaseModel from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from typing import Any from ..auth.dependencies import get_current_user from ..database.engine import get_session from ..database.models import NotificationTarget, NotificationTrackerTarget, TelegramBot, TelegramChat, User from ..services.notifier import send_test_notification _LOGGER = logging.getLogger(__name__) router = APIRouter(prefix="/api/targets", tags=["targets"]) class TargetCreate(BaseModel): type: str # "telegram" or "webhook" name: str icon: str = "" config: dict[str, Any] = {} chat_action: str | None = None class TargetUpdate(BaseModel): name: str | None = None icon: str | None = None config: dict[str, Any] | None = None chat_action: str | None = None @router.get("") async def list_targets( user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """List all notification targets for the current user.""" result = await session.exec( select(NotificationTarget).where(NotificationTarget.user_id == user.id) ) targets = result.all() # Resolve chat names for telegram targets chat_names: dict[str, str] = {} for tgt in targets: if tgt.type == "telegram" and tgt.config.get("chat_id"): bot_id = tgt.config.get("bot_id") chat_id = str(tgt.config["chat_id"]) if bot_id: chat_result = await session.exec( select(TelegramChat).where( TelegramChat.bot_id == bot_id, TelegramChat.chat_id == chat_id, ) ) chat = chat_result.first() if chat: chat_names[f"{bot_id}_{chat_id}"] = chat.title or chat.username or "" return [_target_response(t, chat_names) for t in targets] @router.post("", status_code=status.HTTP_201_CREATED) async def create_target( body: TargetCreate, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Create a new notification target.""" if body.type not in ("telegram", "webhook"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Type must be 'telegram' or 'webhook'", ) target = NotificationTarget( user_id=user.id, type=body.type, name=body.name, icon=body.icon, config=body.config, chat_action=body.chat_action, ) session.add(target) await session.commit() await session.refresh(target) return {"id": target.id, "type": target.type, "name": target.name} @router.get("/{target_id}") async def get_target( target_id: int, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Get a specific notification target.""" target = await _get_user_target(session, target_id, user.id) return _target_response(target) @router.put("/{target_id}") async def update_target( target_id: int, body: TargetUpdate, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Update a notification target.""" target = await _get_user_target(session, target_id, user.id) for field, value in body.model_dump(exclude_unset=True).items(): setattr(target, field, value) session.add(target) await session.commit() await session.refresh(target) return {"id": target.id, "type": target.type, "name": target.name} @router.delete("/{target_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_target( target_id: int, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Delete a notification target and its tracker links.""" target = await _get_user_target(session, target_id, user.id) # Delete associated tracker-target links result = await session.exec( select(NotificationTrackerTarget).where(NotificationTrackerTarget.target_id == target_id) ) for tt in result.all(): await session.delete(tt) await session.delete(target) await session.commit() @router.post("/{target_id}/test") async def test_target( target_id: int, locale: str = Query("en"), user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Send a test notification to a target.""" target = await _get_user_target(session, target_id, user.id) result = await send_test_notification(target, locale=locale) return result def _target_response(target: NotificationTarget, chat_names: dict[str, str] | None = None) -> dict: resp = { "id": target.id, "type": target.type, "name": target.name, "icon": target.icon, "config": _safe_config(target), "chat_action": target.chat_action, "created_at": target.created_at.isoformat(), } # Attach resolved chat name for telegram targets if target.type == "telegram" and chat_names: bot_id = target.config.get("bot_id") chat_id = str(target.config.get("chat_id", "")) key = f"{bot_id}_{chat_id}" if key in chat_names: resp["chat_name"] = chat_names[key] return resp def _safe_config(target: NotificationTarget) -> dict: """Return config with sensitive fields masked.""" config = dict(target.config) if "bot_token" in config: token = config["bot_token"] config["bot_token"] = f"{token[:8]}...{token[-4:]}" if len(token) > 12 else "***" if "api_key" in config: config["api_key"] = "***" return config async def _get_user_target( session: AsyncSession, target_id: int, user_id: int ) -> NotificationTarget: target = await session.get(NotificationTarget, target_id) if not target or target.user_id != user_id: raise HTTPException(status_code=404, detail="Target not found") return target