feat(notify-bridge): phase 8 - integration and wiring

Wire all components into a working application:
- Scheduler service: APScheduler loads enabled trackers, polls at intervals
- Watcher service: orchestrates poll -> detect -> notify flow
  - Eagerly loads DB data, then creates aiohttp session for provider
  - Saves tracker state after each poll
  - Logs events to EventLog table
  - Dispatches notifications to targets with template rendering
- Manual trigger endpoint: POST /api/trackers/{id}/trigger
- Scheduler starts on app lifespan startup
- Full end-to-end flow verified: server starts cleanly

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 23:55:15 +03:00
parent 9dfd1b79cd
commit 08814e9ae2
4 changed files with 249 additions and 0 deletions
@@ -102,3 +102,18 @@ async def delete_tracker(
raise HTTPException(status_code=404, detail="Tracker not found") raise HTTPException(status_code=404, detail="Tracker not found")
await session.delete(tracker) await session.delete(tracker)
await session.commit() await session.commit()
@router.post("/{tracker_id}/trigger")
async def trigger_tracker(
tracker_id: int,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
tracker = await session.get(Tracker, tracker_id)
if not tracker or tracker.user_id != user.id:
raise HTTPException(status_code=404, detail="Tracker not found")
from ..services.watcher import check_tracker
result = await check_tracker(tracker_id)
return result
@@ -20,6 +20,8 @@ from .api.template_vars import router as template_vars_router
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
await init_db() await init_db()
await _seed_default_templates() await _seed_default_templates()
from .services.scheduler import start_scheduler
await start_scheduler()
yield yield
@@ -0,0 +1,66 @@
"""APScheduler-based polling scheduler for trackers."""
from __future__ import annotations
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
_LOGGER = logging.getLogger(__name__)
_scheduler: AsyncIOScheduler | None = None
def get_scheduler() -> AsyncIOScheduler:
global _scheduler
if _scheduler is None:
_scheduler = AsyncIOScheduler()
return _scheduler
async def start_scheduler() -> None:
scheduler = get_scheduler()
if not scheduler.running:
scheduler.start()
_LOGGER.info("Scheduler started")
await _load_tracker_jobs()
async def _load_tracker_jobs() -> None:
"""Load enabled trackers and schedule polling jobs."""
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..database.engine import get_engine
from ..database.models import Tracker
engine = get_engine()
scheduler = get_scheduler()
async with AsyncSession(engine) as session:
result = await session.exec(select(Tracker).where(Tracker.enabled == True))
trackers = result.all()
for tracker in trackers:
job_id = f"tracker_{tracker.id}"
if scheduler.get_job(job_id):
continue
scheduler.add_job(
_poll_tracker,
"interval",
seconds=tracker.scan_interval,
id=job_id,
args=[tracker.id],
replace_existing=True,
)
_LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval)
async def _poll_tracker(tracker_id: int) -> None:
"""Poll a tracker for changes."""
from .watcher import check_tracker
try:
await check_tracker(tracker_id)
except Exception as e:
_LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
@@ -0,0 +1,166 @@
"""Watcher service — orchestrates poll -> detect -> notify flow."""
from __future__ import annotations
import json
import logging
from typing import Any
import aiohttp
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig
from notify_bridge_core.providers.immich import ImmichServiceProvider
from ..database.engine import get_engine
from ..database.models import (
EventLog,
NotificationTarget,
ServiceProvider,
TemplateConfig,
Tracker,
TrackerState,
)
_LOGGER = logging.getLogger(__name__)
async def check_tracker(tracker_id: int) -> dict[str, Any]:
"""Poll a tracker's provider for changes and dispatch notifications."""
engine = get_engine()
# Load all DB data eagerly before entering aiohttp context
async with AsyncSession(engine) as session:
tracker = await session.get(Tracker, tracker_id)
if not tracker or not tracker.enabled:
return {"status": "skipped", "reason": "disabled or not found"}
provider = await session.get(ServiceProvider, tracker.provider_id)
if not provider:
return {"status": "error", "reason": "provider not found"}
# Load tracker state
result = await session.exec(
select(TrackerState).where(TrackerState.tracker_id == tracker_id)
)
states = result.all()
state_dict: dict[str, Any] = {}
for s in states:
state_dict[s.collection_id] = {
"name": "",
"asset_ids": s.asset_ids,
"pending_asset_ids": s.pending_asset_ids,
"shared": False,
}
# Load targets
targets_db: list[NotificationTarget] = []
for tid in (tracker.target_ids or []):
t = await session.get(NotificationTarget, tid)
if t:
targets_db.append(t)
# Load template configs for targets
template_configs: dict[int, TemplateConfig | None] = {}
for t in targets_db:
if t.template_config_id:
tc = await session.get(TemplateConfig, t.template_config_id)
template_configs[t.id] = tc
else:
template_configs[t.id] = None
# Snapshot the data we need
provider_type = provider.type
provider_config = dict(provider.config)
provider_name = provider.name
collection_ids = list(tracker.collection_ids or [])
# Now create aiohttp session and poll
events: list[ServiceEvent] = []
new_state: dict[str, Any] = {}
if provider_type == "immich":
async with aiohttp.ClientSession() as http_session:
immich = ImmichServiceProvider(
http_session,
provider_config.get("url", ""),
provider_config.get("api_key", ""),
provider_config.get("external_domain"),
provider_name,
)
connected = await immich.connect()
if not connected:
return {"status": "error", "reason": "failed to connect to provider"}
events, new_state = await immich.poll(collection_ids, state_dict)
else:
return {"status": "error", "reason": f"unsupported provider type: {provider_type}"}
# Save updated state and log events
async with AsyncSession(engine) as session:
for cid, cstate in new_state.items():
existing = None
for s in states:
if s.collection_id == cid:
existing = s
break
if existing:
existing.asset_ids = cstate.get("asset_ids", [])
existing.pending_asset_ids = cstate.get("pending_asset_ids", [])
session.add(existing)
else:
new_ts = TrackerState(
tracker_id=tracker_id,
collection_id=cid,
asset_ids=cstate.get("asset_ids", []),
pending_asset_ids=cstate.get("pending_asset_ids", []),
)
session.add(new_ts)
for event in events:
log = EventLog(
tracker_id=tracker_id,
event_type=event.event_type.value,
collection_id=event.collection_id,
collection_name=event.collection_name,
details={
"added_count": event.added_count,
"removed_count": event.removed_count,
"provider_type": event.provider_type.value,
},
)
session.add(log)
await session.commit()
# Dispatch notifications
if events and targets_db:
dispatcher = NotificationDispatcher()
for event in events:
target_configs = []
for t in targets_db:
tc = template_configs.get(t.id)
slots = None
if tc:
slots = {
"assets_added": tc.message_assets_added,
"assets_removed": tc.message_assets_removed,
"collection_renamed": tc.message_collection_renamed,
"collection_deleted": tc.message_collection_deleted,
"sharing_changed": tc.message_sharing_changed,
}
target_configs.append(TargetConfig(
type=t.type,
config=t.config,
template_slots=slots,
))
await dispatcher.dispatch(event, target_configs)
return {
"status": "ok",
"events_detected": len(events),
"collections_checked": len(collection_ids),
}