From 08814e9ae247f5cd745a3466251348441723e8ab Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Thu, 19 Mar 2026 23:55:15 +0300 Subject: [PATCH] feat(notify-bridge): phase 8 - integration and wiring Wire all components into a working application: - Scheduler service: APScheduler loads enabled trackers, polls at intervals - Watcher service: orchestrates poll -> detect -> notify flow - Eagerly loads DB data, then creates aiohttp session for provider - Saves tracker state after each poll - Logs events to EventLog table - Dispatches notifications to targets with template rendering - Manual trigger endpoint: POST /api/trackers/{id}/trigger - Scheduler starts on app lifespan startup - Full end-to-end flow verified: server starts cleanly Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/notify_bridge_server/api/trackers.py | 15 ++ .../server/src/notify_bridge_server/main.py | 2 + .../services/scheduler.py | 66 +++++++ .../notify_bridge_server/services/watcher.py | 166 ++++++++++++++++++ 4 files changed, 249 insertions(+) create mode 100644 packages/server/src/notify_bridge_server/services/scheduler.py create mode 100644 packages/server/src/notify_bridge_server/services/watcher.py diff --git a/packages/server/src/notify_bridge_server/api/trackers.py b/packages/server/src/notify_bridge_server/api/trackers.py index a32f9cc..519554c 100644 --- a/packages/server/src/notify_bridge_server/api/trackers.py +++ b/packages/server/src/notify_bridge_server/api/trackers.py @@ -102,3 +102,18 @@ async def delete_tracker( raise HTTPException(status_code=404, detail="Tracker not found") await session.delete(tracker) await session.commit() + + +@router.post("/{tracker_id}/trigger") +async def trigger_tracker( + tracker_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + tracker = await session.get(Tracker, tracker_id) + if not tracker or tracker.user_id != user.id: + raise HTTPException(status_code=404, detail="Tracker not found") + + from ..services.watcher import check_tracker + result = await check_tracker(tracker_id) + return result diff --git a/packages/server/src/notify_bridge_server/main.py b/packages/server/src/notify_bridge_server/main.py index c650dbb..7c496c9 100644 --- a/packages/server/src/notify_bridge_server/main.py +++ b/packages/server/src/notify_bridge_server/main.py @@ -20,6 +20,8 @@ from .api.template_vars import router as template_vars_router async def lifespan(app: FastAPI): await init_db() await _seed_default_templates() + from .services.scheduler import start_scheduler + await start_scheduler() yield diff --git a/packages/server/src/notify_bridge_server/services/scheduler.py b/packages/server/src/notify_bridge_server/services/scheduler.py new file mode 100644 index 0000000..3a6727b --- /dev/null +++ b/packages/server/src/notify_bridge_server/services/scheduler.py @@ -0,0 +1,66 @@ +"""APScheduler-based polling scheduler for trackers.""" + +from __future__ import annotations + +import logging + +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +_LOGGER = logging.getLogger(__name__) + +_scheduler: AsyncIOScheduler | None = None + + +def get_scheduler() -> AsyncIOScheduler: + global _scheduler + if _scheduler is None: + _scheduler = AsyncIOScheduler() + return _scheduler + + +async def start_scheduler() -> None: + scheduler = get_scheduler() + if not scheduler.running: + scheduler.start() + _LOGGER.info("Scheduler started") + + await _load_tracker_jobs() + + +async def _load_tracker_jobs() -> None: + """Load enabled trackers and schedule polling jobs.""" + from sqlmodel import select + from sqlmodel.ext.asyncio.session import AsyncSession + from ..database.engine import get_engine + from ..database.models import Tracker + + engine = get_engine() + scheduler = get_scheduler() + + async with AsyncSession(engine) as session: + result = await session.exec(select(Tracker).where(Tracker.enabled == True)) + trackers = result.all() + + for tracker in trackers: + job_id = f"tracker_{tracker.id}" + if scheduler.get_job(job_id): + continue + + scheduler.add_job( + _poll_tracker, + "interval", + seconds=tracker.scan_interval, + id=job_id, + args=[tracker.id], + replace_existing=True, + ) + _LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval) + + +async def _poll_tracker(tracker_id: int) -> None: + """Poll a tracker for changes.""" + from .watcher import check_tracker + try: + await check_tracker(tracker_id) + except Exception as e: + _LOGGER.error("Error polling tracker %d: %s", tracker_id, e) diff --git a/packages/server/src/notify_bridge_server/services/watcher.py b/packages/server/src/notify_bridge_server/services/watcher.py new file mode 100644 index 0000000..0317cad --- /dev/null +++ b/packages/server/src/notify_bridge_server/services/watcher.py @@ -0,0 +1,166 @@ +"""Watcher service — orchestrates poll -> detect -> notify flow.""" + +from __future__ import annotations + +import json +import logging +from typing import Any + +import aiohttp +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from notify_bridge_core.models.events import ServiceEvent +from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig +from notify_bridge_core.providers.immich import ImmichServiceProvider + +from ..database.engine import get_engine +from ..database.models import ( + EventLog, + NotificationTarget, + ServiceProvider, + TemplateConfig, + Tracker, + TrackerState, +) + +_LOGGER = logging.getLogger(__name__) + + +async def check_tracker(tracker_id: int) -> dict[str, Any]: + """Poll a tracker's provider for changes and dispatch notifications.""" + engine = get_engine() + + # Load all DB data eagerly before entering aiohttp context + async with AsyncSession(engine) as session: + tracker = await session.get(Tracker, tracker_id) + if not tracker or not tracker.enabled: + return {"status": "skipped", "reason": "disabled or not found"} + + provider = await session.get(ServiceProvider, tracker.provider_id) + if not provider: + return {"status": "error", "reason": "provider not found"} + + # Load tracker state + result = await session.exec( + select(TrackerState).where(TrackerState.tracker_id == tracker_id) + ) + states = result.all() + state_dict: dict[str, Any] = {} + for s in states: + state_dict[s.collection_id] = { + "name": "", + "asset_ids": s.asset_ids, + "pending_asset_ids": s.pending_asset_ids, + "shared": False, + } + + # Load targets + targets_db: list[NotificationTarget] = [] + for tid in (tracker.target_ids or []): + t = await session.get(NotificationTarget, tid) + if t: + targets_db.append(t) + + # Load template configs for targets + template_configs: dict[int, TemplateConfig | None] = {} + for t in targets_db: + if t.template_config_id: + tc = await session.get(TemplateConfig, t.template_config_id) + template_configs[t.id] = tc + else: + template_configs[t.id] = None + + # Snapshot the data we need + provider_type = provider.type + provider_config = dict(provider.config) + provider_name = provider.name + collection_ids = list(tracker.collection_ids or []) + + # Now create aiohttp session and poll + events: list[ServiceEvent] = [] + new_state: dict[str, Any] = {} + + if provider_type == "immich": + async with aiohttp.ClientSession() as http_session: + immich = ImmichServiceProvider( + http_session, + provider_config.get("url", ""), + provider_config.get("api_key", ""), + provider_config.get("external_domain"), + provider_name, + ) + connected = await immich.connect() + if not connected: + return {"status": "error", "reason": "failed to connect to provider"} + + events, new_state = await immich.poll(collection_ids, state_dict) + else: + return {"status": "error", "reason": f"unsupported provider type: {provider_type}"} + + # Save updated state and log events + async with AsyncSession(engine) as session: + for cid, cstate in new_state.items(): + existing = None + for s in states: + if s.collection_id == cid: + existing = s + break + + if existing: + existing.asset_ids = cstate.get("asset_ids", []) + existing.pending_asset_ids = cstate.get("pending_asset_ids", []) + session.add(existing) + else: + new_ts = TrackerState( + tracker_id=tracker_id, + collection_id=cid, + asset_ids=cstate.get("asset_ids", []), + pending_asset_ids=cstate.get("pending_asset_ids", []), + ) + session.add(new_ts) + + for event in events: + log = EventLog( + tracker_id=tracker_id, + event_type=event.event_type.value, + collection_id=event.collection_id, + collection_name=event.collection_name, + details={ + "added_count": event.added_count, + "removed_count": event.removed_count, + "provider_type": event.provider_type.value, + }, + ) + session.add(log) + + await session.commit() + + # Dispatch notifications + if events and targets_db: + dispatcher = NotificationDispatcher() + for event in events: + target_configs = [] + for t in targets_db: + tc = template_configs.get(t.id) + slots = None + if tc: + slots = { + "assets_added": tc.message_assets_added, + "assets_removed": tc.message_assets_removed, + "collection_renamed": tc.message_collection_renamed, + "collection_deleted": tc.message_collection_deleted, + "sharing_changed": tc.message_sharing_changed, + } + target_configs.append(TargetConfig( + type=t.type, + config=t.config, + template_slots=slots, + )) + await dispatcher.dispatch(event, target_configs) + + return { + "status": "ok", + "events_detected": len(events), + "collections_checked": len(collection_ids), + }