feat(notify-bridge): phase 8 - integration and wiring
Wire all components into a working application:
- Scheduler service: APScheduler loads enabled trackers, polls at intervals
- Watcher service: orchestrates poll -> detect -> notify flow
- Eagerly loads DB data, then creates aiohttp session for provider
- Saves tracker state after each poll
- Logs events to EventLog table
- Dispatches notifications to targets with template rendering
- Manual trigger endpoint: POST /api/trackers/{id}/trigger
- Scheduler starts on app lifespan startup
- Full end-to-end flow verified: server starts cleanly
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -102,3 +102,18 @@ async def delete_tracker(
|
||||
raise HTTPException(status_code=404, detail="Tracker not found")
|
||||
await session.delete(tracker)
|
||||
await session.commit()
|
||||
|
||||
|
||||
@router.post("/{tracker_id}/trigger")
|
||||
async def trigger_tracker(
|
||||
tracker_id: int,
|
||||
user: User = Depends(get_current_user),
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
tracker = await session.get(Tracker, tracker_id)
|
||||
if not tracker or tracker.user_id != user.id:
|
||||
raise HTTPException(status_code=404, detail="Tracker not found")
|
||||
|
||||
from ..services.watcher import check_tracker
|
||||
result = await check_tracker(tracker_id)
|
||||
return result
|
||||
|
||||
@@ -20,6 +20,8 @@ from .api.template_vars import router as template_vars_router
|
||||
async def lifespan(app: FastAPI):
|
||||
await init_db()
|
||||
await _seed_default_templates()
|
||||
from .services.scheduler import start_scheduler
|
||||
await start_scheduler()
|
||||
yield
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
"""APScheduler-based polling scheduler for trackers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
_scheduler: AsyncIOScheduler | None = None
|
||||
|
||||
|
||||
def get_scheduler() -> AsyncIOScheduler:
|
||||
global _scheduler
|
||||
if _scheduler is None:
|
||||
_scheduler = AsyncIOScheduler()
|
||||
return _scheduler
|
||||
|
||||
|
||||
async def start_scheduler() -> None:
|
||||
scheduler = get_scheduler()
|
||||
if not scheduler.running:
|
||||
scheduler.start()
|
||||
_LOGGER.info("Scheduler started")
|
||||
|
||||
await _load_tracker_jobs()
|
||||
|
||||
|
||||
async def _load_tracker_jobs() -> None:
|
||||
"""Load enabled trackers and schedule polling jobs."""
|
||||
from sqlmodel import select
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
from ..database.engine import get_engine
|
||||
from ..database.models import Tracker
|
||||
|
||||
engine = get_engine()
|
||||
scheduler = get_scheduler()
|
||||
|
||||
async with AsyncSession(engine) as session:
|
||||
result = await session.exec(select(Tracker).where(Tracker.enabled == True))
|
||||
trackers = result.all()
|
||||
|
||||
for tracker in trackers:
|
||||
job_id = f"tracker_{tracker.id}"
|
||||
if scheduler.get_job(job_id):
|
||||
continue
|
||||
|
||||
scheduler.add_job(
|
||||
_poll_tracker,
|
||||
"interval",
|
||||
seconds=tracker.scan_interval,
|
||||
id=job_id,
|
||||
args=[tracker.id],
|
||||
replace_existing=True,
|
||||
)
|
||||
_LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval)
|
||||
|
||||
|
||||
async def _poll_tracker(tracker_id: int) -> None:
|
||||
"""Poll a tracker for changes."""
|
||||
from .watcher import check_tracker
|
||||
try:
|
||||
await check_tracker(tracker_id)
|
||||
except Exception as e:
|
||||
_LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
|
||||
@@ -0,0 +1,166 @@
|
||||
"""Watcher service — orchestrates poll -> detect -> notify flow."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from sqlmodel import select
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
from notify_bridge_core.models.events import ServiceEvent
|
||||
from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig
|
||||
from notify_bridge_core.providers.immich import ImmichServiceProvider
|
||||
|
||||
from ..database.engine import get_engine
|
||||
from ..database.models import (
|
||||
EventLog,
|
||||
NotificationTarget,
|
||||
ServiceProvider,
|
||||
TemplateConfig,
|
||||
Tracker,
|
||||
TrackerState,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
||||
"""Poll a tracker's provider for changes and dispatch notifications."""
|
||||
engine = get_engine()
|
||||
|
||||
# Load all DB data eagerly before entering aiohttp context
|
||||
async with AsyncSession(engine) as session:
|
||||
tracker = await session.get(Tracker, tracker_id)
|
||||
if not tracker or not tracker.enabled:
|
||||
return {"status": "skipped", "reason": "disabled or not found"}
|
||||
|
||||
provider = await session.get(ServiceProvider, tracker.provider_id)
|
||||
if not provider:
|
||||
return {"status": "error", "reason": "provider not found"}
|
||||
|
||||
# Load tracker state
|
||||
result = await session.exec(
|
||||
select(TrackerState).where(TrackerState.tracker_id == tracker_id)
|
||||
)
|
||||
states = result.all()
|
||||
state_dict: dict[str, Any] = {}
|
||||
for s in states:
|
||||
state_dict[s.collection_id] = {
|
||||
"name": "",
|
||||
"asset_ids": s.asset_ids,
|
||||
"pending_asset_ids": s.pending_asset_ids,
|
||||
"shared": False,
|
||||
}
|
||||
|
||||
# Load targets
|
||||
targets_db: list[NotificationTarget] = []
|
||||
for tid in (tracker.target_ids or []):
|
||||
t = await session.get(NotificationTarget, tid)
|
||||
if t:
|
||||
targets_db.append(t)
|
||||
|
||||
# Load template configs for targets
|
||||
template_configs: dict[int, TemplateConfig | None] = {}
|
||||
for t in targets_db:
|
||||
if t.template_config_id:
|
||||
tc = await session.get(TemplateConfig, t.template_config_id)
|
||||
template_configs[t.id] = tc
|
||||
else:
|
||||
template_configs[t.id] = None
|
||||
|
||||
# Snapshot the data we need
|
||||
provider_type = provider.type
|
||||
provider_config = dict(provider.config)
|
||||
provider_name = provider.name
|
||||
collection_ids = list(tracker.collection_ids or [])
|
||||
|
||||
# Now create aiohttp session and poll
|
||||
events: list[ServiceEvent] = []
|
||||
new_state: dict[str, Any] = {}
|
||||
|
||||
if provider_type == "immich":
|
||||
async with aiohttp.ClientSession() as http_session:
|
||||
immich = ImmichServiceProvider(
|
||||
http_session,
|
||||
provider_config.get("url", ""),
|
||||
provider_config.get("api_key", ""),
|
||||
provider_config.get("external_domain"),
|
||||
provider_name,
|
||||
)
|
||||
connected = await immich.connect()
|
||||
if not connected:
|
||||
return {"status": "error", "reason": "failed to connect to provider"}
|
||||
|
||||
events, new_state = await immich.poll(collection_ids, state_dict)
|
||||
else:
|
||||
return {"status": "error", "reason": f"unsupported provider type: {provider_type}"}
|
||||
|
||||
# Save updated state and log events
|
||||
async with AsyncSession(engine) as session:
|
||||
for cid, cstate in new_state.items():
|
||||
existing = None
|
||||
for s in states:
|
||||
if s.collection_id == cid:
|
||||
existing = s
|
||||
break
|
||||
|
||||
if existing:
|
||||
existing.asset_ids = cstate.get("asset_ids", [])
|
||||
existing.pending_asset_ids = cstate.get("pending_asset_ids", [])
|
||||
session.add(existing)
|
||||
else:
|
||||
new_ts = TrackerState(
|
||||
tracker_id=tracker_id,
|
||||
collection_id=cid,
|
||||
asset_ids=cstate.get("asset_ids", []),
|
||||
pending_asset_ids=cstate.get("pending_asset_ids", []),
|
||||
)
|
||||
session.add(new_ts)
|
||||
|
||||
for event in events:
|
||||
log = EventLog(
|
||||
tracker_id=tracker_id,
|
||||
event_type=event.event_type.value,
|
||||
collection_id=event.collection_id,
|
||||
collection_name=event.collection_name,
|
||||
details={
|
||||
"added_count": event.added_count,
|
||||
"removed_count": event.removed_count,
|
||||
"provider_type": event.provider_type.value,
|
||||
},
|
||||
)
|
||||
session.add(log)
|
||||
|
||||
await session.commit()
|
||||
|
||||
# Dispatch notifications
|
||||
if events and targets_db:
|
||||
dispatcher = NotificationDispatcher()
|
||||
for event in events:
|
||||
target_configs = []
|
||||
for t in targets_db:
|
||||
tc = template_configs.get(t.id)
|
||||
slots = None
|
||||
if tc:
|
||||
slots = {
|
||||
"assets_added": tc.message_assets_added,
|
||||
"assets_removed": tc.message_assets_removed,
|
||||
"collection_renamed": tc.message_collection_renamed,
|
||||
"collection_deleted": tc.message_collection_deleted,
|
||||
"sharing_changed": tc.message_sharing_changed,
|
||||
}
|
||||
target_configs.append(TargetConfig(
|
||||
type=t.type,
|
||||
config=t.config,
|
||||
template_slots=slots,
|
||||
))
|
||||
await dispatcher.dispatch(event, target_configs)
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"events_detected": len(events),
|
||||
"collections_checked": len(collection_ids),
|
||||
}
|
||||
Reference in New Issue
Block a user