Files
notify-bridge/packages/server/src/notify_bridge_server/api/trackers.py
T
alexei.dolgolyov 9eec21a5b2 feat: port full CRUD API routes and frontend pages from Immich Watcher
Backend API (38 routes):
- providers: full CRUD + test connection + list collections + API key masking
- trackers: full CRUD + trigger + history + test-periodic/memory
- tracking-configs: full CRUD with Pydantic models, provider_type filter
- template-configs: full CRUD + preview + preview-raw with two-pass validation
- targets: full CRUD + test notification + config masking
- telegram-bots: full CRUD + chat discovery + token endpoint
- users: full admin CRUD + password reset + self-delete protection
- status: dashboard endpoint with providers/trackers/targets/events counts

Frontend pages updated:
- Dashboard with animated stat cards and event timeline
- Providers with proper components, delete confirm, snackbar
- Trackers/targets/tracking-configs/template-configs/telegram-bots/users
  all use PageHeader, Card, Loading, MdiIcon with correct i18n keys

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 00:49:40 +03:00

203 lines
6.4 KiB
Python

"""Tracker management API routes."""
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..auth.dependencies import get_current_user
from ..database.engine import get_session
from ..database.models import EventLog, NotificationTarget, ServiceProvider, Tracker, User
# Every route declared in this module is served under /api/trackers and
# carries the "trackers" tag.
router = APIRouter(prefix="/api/trackers", tags=["trackers"])
class TrackerCreate(BaseModel):
    """Request body accepted by ``create_tracker``.

    ``create_tracker`` checks ``provider_id`` against the caller's providers
    and then passes every field straight to the ``Tracker`` constructor via
    ``model_dump()``, so the field names here must match the model's columns.
    """

    # Provider the tracker belongs to; must be owned by the calling user.
    provider_id: int
    name: str
    icon: str = ""
    # presumably ids of the provider's collections to watch — confirm
    collection_ids: list[str] = []
    # Ids of NotificationTarget rows (the test endpoints load them by id).
    target_ids: list[int] = []
    tracking_config_id: int | None = None
    # NOTE(review): unit is not visible in this file (presumably seconds) and
    # no lower bound is enforced — confirm against the scheduler.
    scan_interval: int = 60
    enabled: bool = True
    # NOTE(review): free-form strings; the expected time format is not
    # enforced here — confirm against the consumer.
    quiet_hours_start: str | None = None
    quiet_hours_end: str | None = None
class TrackerUpdate(BaseModel):
    """Request body accepted by ``update_tracker`` (partial update).

    Every field defaults to ``None``. ``update_tracker`` dumps the body with
    ``exclude_unset=True``, so fields the client omits are left untouched on
    the stored tracker. There is no ``provider_id`` here, so a tracker's
    provider cannot be changed through this model.
    """

    name: str | None = None
    icon: str | None = None
    collection_ids: list[str] | None = None
    target_ids: list[int] | None = None
    tracking_config_id: int | None = None
    scan_interval: int | None = None
    enabled: bool | None = None
    quiet_hours_start: str | None = None
    quiet_hours_end: str | None = None
@router.get("")
async def list_trackers(
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Return every tracker owned by the authenticated user."""
    # Scope the query to the caller so other users' trackers never appear.
    owned_by_caller = select(Tracker).where(Tracker.user_id == user.id)
    rows = await session.exec(owned_by_caller)
    trackers = rows.all()
    return [_tracker_response(tracker) for tracker in trackers]
@router.post("", status_code=status.HTTP_201_CREATED)
async def create_tracker(
    body: TrackerCreate,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Create a tracker owned by the authenticated user.

    The referenced provider and every referenced notification target must
    exist and belong to the caller.

    Raises:
        HTTPException: 404 if the provider, or any id in ``target_ids``, is
            missing or owned by a different user. A foreign resource is
            reported as "not found" so its existence is not disclosed.
    """
    provider = await session.get(ServiceProvider, body.provider_id)
    if not provider or provider.user_id != user.id:
        raise HTTPException(status_code=404, detail="Provider not found")

    # Without this check a caller could attach another user's targets to
    # their own tracker and have notifications delivered to them.
    # NOTE(review): relies on NotificationTarget having a user_id column like
    # Tracker and ServiceProvider — confirm against the model.
    for target_id in body.target_ids:
        target = await session.get(NotificationTarget, target_id)
        if not target or target.user_id != user.id:
            raise HTTPException(status_code=404, detail="Target not found")

    # NOTE(review): tracking_config_id is stored without an ownership check;
    # its model is not imported in this module — verify where it is consumed.
    tracker = Tracker(user_id=user.id, **body.model_dump())
    session.add(tracker)
    await session.commit()
    # Reload so database-generated values (e.g. the id) are populated.
    await session.refresh(tracker)
    return _tracker_response(tracker)
@router.get("/{tracker_id}")
async def get_tracker(
    tracker_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Return a single tracker owned by the authenticated user.

    Responds with 404 (raised by ``_get_user_tracker``) when the tracker does
    not exist or belongs to someone else.
    """
    tracker = await _get_user_tracker(session, tracker_id, user.id)
    return _tracker_response(tracker)
@router.put("/{tracker_id}")
async def update_tracker(
    tracker_id: int,
    body: TrackerUpdate,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to one of the caller's trackers.

    Only fields present in the request body are considered. An explicit
    ``null`` clears the value for the fields ``TrackerCreate`` declares as
    nullable (``tracking_config_id``, ``quiet_hours_start``,
    ``quiet_hours_end``); for every other field an explicit ``null`` is
    ignored rather than written to the row.

    Raises:
        HTTPException: 404 if the tracker, or any id in a supplied
            ``target_ids``, is missing or owned by a different user.
    """
    # Fields for which "set to null" is a meaningful request.
    nullable_fields = {"tracking_config_id", "quiet_hours_start", "quiet_hours_end"}

    tracker = await _get_user_tracker(session, tracker_id, user.id)

    # exclude_unset drops omitted fields but keeps explicit nulls; writing
    # None into e.g. target_ids would later break iteration over that column.
    changes = {
        field: value
        for field, value in body.model_dump(exclude_unset=True).items()
        if value is not None or field in nullable_fields
    }

    # Validate before mutating so a rejected request leaves the tracker as-is.
    # NOTE(review): relies on NotificationTarget having a user_id column like
    # Tracker and ServiceProvider — confirm against the model.
    for target_id in changes.get("target_ids", []):
        target = await session.get(NotificationTarget, target_id)
        if not target or target.user_id != user.id:
            raise HTTPException(status_code=404, detail="Target not found")

    for field, value in changes.items():
        setattr(tracker, field, value)
    session.add(tracker)
    await session.commit()
    await session.refresh(tracker)
    return _tracker_response(tracker)
@router.delete("/{tracker_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_tracker(
    tracker_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Delete one of the caller's trackers; responds with 204 and no body.

    Responds with 404 (raised by ``_get_user_tracker``) when the tracker does
    not exist or belongs to someone else.
    """
    # The lookup doubles as the ownership check.
    owned_tracker = await _get_user_tracker(session, tracker_id, user.id)

    await session.delete(owned_tracker)
    await session.commit()
@router.post("/{tracker_id}/trigger")
async def trigger_tracker(
    tracker_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Run an immediate check of one of the caller's trackers.

    Returns a dict with ``triggered`` set to ``True`` and ``result`` holding
    whatever ``check_tracker`` returned.
    """
    owned_tracker = await _get_user_tracker(session, tracker_id, user.id)

    # Imported at call time rather than module load (presumably to avoid an
    # import cycle with the services package — confirm).
    from ..services.watcher import check_tracker

    return {"triggered": True, "result": await check_tracker(owned_tracker.id)}
async def _send_test_to_targets(
    session: AsyncSession, tracker: Tracker, user_id: int
) -> list[dict]:
    """Send the notifier's test message to each of *tracker*'s targets.

    Targets that no longer exist, or that belong to a different user, are
    skipped silently.

    Returns:
        One dict per notified target: the target's name under ``"target"``
        merged with whatever ``send_test_notification`` returned for it.
    """
    # Imported at call time, as the original endpoints did (presumably to
    # avoid an import cycle with the services package — confirm).
    from ..services.notifier import send_test_notification

    results = []
    # `or []` tolerates a tracker whose target_ids column holds null.
    for target_id in tracker.target_ids or []:
        target = await session.get(NotificationTarget, target_id)
        # Never deliver to, or reveal the name of, another user's target.
        # NOTE(review): relies on NotificationTarget having a user_id column
        # like Tracker and ServiceProvider — confirm against the model.
        if not target or target.user_id != user_id:
            continue
        outcome = await send_test_notification(target)
        results.append({"target": target.name, **outcome})
    return results


@router.post("/{tracker_id}/test-periodic")
async def test_periodic(
    tracker_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Send a test notification to the tracker's targets (periodic summary).

    The message sent is the notifier's generic test notification; only the
    ``test`` label in the response identifies the periodic-summary mode.
    """
    tracker = await _get_user_tracker(session, tracker_id, user.id)
    results = await _send_test_to_targets(session, tracker, user.id)
    return {"test": "periodic_summary", "results": results}


@router.post("/{tracker_id}/test-memory")
async def test_memory(
    tracker_id: int,
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Send a test notification to the tracker's targets (memory mode).

    The message sent is the notifier's generic test notification; only the
    ``test`` label in the response identifies the memory/on-this-day mode.
    """
    tracker = await _get_user_tracker(session, tracker_id, user.id)
    results = await _send_test_to_targets(session, tracker, user.id)
    return {"test": "memory_mode", "results": results}
@router.get("/{tracker_id}/history")
async def tracker_history(
    tracker_id: int,
    limit: int = Query(default=20, ge=1, le=500),
    user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
):
    """Return the most recent event-log entries for one of the caller's trackers.

    Entries are ordered newest first and capped at ``limit`` (1-500,
    default 20). Responds with 404 when the tracker does not exist or
    belongs to someone else.
    """

    def as_entry(event: EventLog) -> dict:
        """Flatten one EventLog row into a JSON-friendly dict."""
        return {
            "id": event.id,
            "event_type": event.event_type,
            "collection_id": event.collection_id,
            "collection_name": event.collection_name,
            "details": event.details,
            "created_at": event.created_at.isoformat(),
        }

    # Called only for its ownership check; the tracker itself is not needed.
    await _get_user_tracker(session, tracker_id, user.id)

    newest_first = (
        select(EventLog)
        .where(EventLog.tracker_id == tracker_id)
        .order_by(EventLog.created_at.desc())
        .limit(limit)
    )
    rows = await session.exec(newest_first)
    return [as_entry(event) for event in rows.all()]
def _tracker_response(t: Tracker) -> dict:
    """Build the JSON-serialisable representation of a tracker.

    Most attributes are copied through unchanged; ``created_at`` is rendered
    as an ISO-8601 string. The tracker's ``user_id`` is not included.
    """
    # Attributes exposed verbatim, in response order.
    passthrough = (
        "id",
        "name",
        "icon",
        "provider_id",
        "collection_ids",
        "target_ids",
        "tracking_config_id",
        "scan_interval",
        "enabled",
        "quiet_hours_start",
        "quiet_hours_end",
    )
    payload = {attr: getattr(t, attr) for attr in passthrough}
    payload["created_at"] = t.created_at.isoformat()
    return payload
async def _get_user_tracker(
    session: AsyncSession, tracker_id: int, user_id: int
) -> Tracker:
    """Load a tracker by primary key, enforcing that *user_id* owns it.

    Raises:
        HTTPException: 404 when no such tracker exists or it belongs to a
            different user. Both cases give the same response, so a caller
            cannot tell whether a foreign tracker id exists.
    """
    candidate = await session.get(Tracker, tracker_id)
    if candidate and candidate.user_id == user_id:
        return candidate
    raise HTTPException(status_code=404, detail="Tracker not found")