feat(notify-bridge): phase 8 - integration and wiring
Wire all components into a working application:
- Scheduler service: APScheduler loads enabled trackers, polls at intervals
- Watcher service: orchestrates poll -> detect -> notify flow
- Eagerly loads DB data, then creates aiohttp session for provider
- Saves tracker state after each poll
- Logs events to EventLog table
- Dispatches notifications to targets with template rendering
- Manual trigger endpoint: POST /api/trackers/{id}/trigger
- Scheduler starts on app lifespan startup
- Full end-to-end flow verified: server starts cleanly
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -102,3 +102,18 @@ async def delete_tracker(
|
|||||||
raise HTTPException(status_code=404, detail="Tracker not found")
|
raise HTTPException(status_code=404, detail="Tracker not found")
|
||||||
await session.delete(tracker)
|
await session.delete(tracker)
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/{tracker_id}/trigger")
|
||||||
|
async def trigger_tracker(
|
||||||
|
tracker_id: int,
|
||||||
|
user: User = Depends(get_current_user),
|
||||||
|
session: AsyncSession = Depends(get_session),
|
||||||
|
):
|
||||||
|
tracker = await session.get(Tracker, tracker_id)
|
||||||
|
if not tracker or tracker.user_id != user.id:
|
||||||
|
raise HTTPException(status_code=404, detail="Tracker not found")
|
||||||
|
|
||||||
|
from ..services.watcher import check_tracker
|
||||||
|
result = await check_tracker(tracker_id)
|
||||||
|
return result
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ from .api.template_vars import router as template_vars_router
|
|||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
await init_db()
|
await init_db()
|
||||||
await _seed_default_templates()
|
await _seed_default_templates()
|
||||||
|
from .services.scheduler import start_scheduler
|
||||||
|
await start_scheduler()
|
||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,66 @@
|
|||||||
|
"""APScheduler-based polling scheduler for trackers."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_scheduler: AsyncIOScheduler | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_scheduler() -> AsyncIOScheduler:
|
||||||
|
global _scheduler
|
||||||
|
if _scheduler is None:
|
||||||
|
_scheduler = AsyncIOScheduler()
|
||||||
|
return _scheduler
|
||||||
|
|
||||||
|
|
||||||
|
async def start_scheduler() -> None:
|
||||||
|
scheduler = get_scheduler()
|
||||||
|
if not scheduler.running:
|
||||||
|
scheduler.start()
|
||||||
|
_LOGGER.info("Scheduler started")
|
||||||
|
|
||||||
|
await _load_tracker_jobs()
|
||||||
|
|
||||||
|
|
||||||
|
async def _load_tracker_jobs() -> None:
|
||||||
|
"""Load enabled trackers and schedule polling jobs."""
|
||||||
|
from sqlmodel import select
|
||||||
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||||
|
from ..database.engine import get_engine
|
||||||
|
from ..database.models import Tracker
|
||||||
|
|
||||||
|
engine = get_engine()
|
||||||
|
scheduler = get_scheduler()
|
||||||
|
|
||||||
|
async with AsyncSession(engine) as session:
|
||||||
|
result = await session.exec(select(Tracker).where(Tracker.enabled == True))
|
||||||
|
trackers = result.all()
|
||||||
|
|
||||||
|
for tracker in trackers:
|
||||||
|
job_id = f"tracker_{tracker.id}"
|
||||||
|
if scheduler.get_job(job_id):
|
||||||
|
continue
|
||||||
|
|
||||||
|
scheduler.add_job(
|
||||||
|
_poll_tracker,
|
||||||
|
"interval",
|
||||||
|
seconds=tracker.scan_interval,
|
||||||
|
id=job_id,
|
||||||
|
args=[tracker.id],
|
||||||
|
replace_existing=True,
|
||||||
|
)
|
||||||
|
_LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval)
|
||||||
|
|
||||||
|
|
||||||
|
async def _poll_tracker(tracker_id: int) -> None:
|
||||||
|
"""Poll a tracker for changes."""
|
||||||
|
from .watcher import check_tracker
|
||||||
|
try:
|
||||||
|
await check_tracker(tracker_id)
|
||||||
|
except Exception as e:
|
||||||
|
_LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
|
||||||
@@ -0,0 +1,166 @@
|
|||||||
|
"""Watcher service — orchestrates poll -> detect -> notify flow."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
from sqlmodel import select
|
||||||
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||||
|
|
||||||
|
from notify_bridge_core.models.events import ServiceEvent
|
||||||
|
from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig
|
||||||
|
from notify_bridge_core.providers.immich import ImmichServiceProvider
|
||||||
|
|
||||||
|
from ..database.engine import get_engine
|
||||||
|
from ..database.models import (
|
||||||
|
EventLog,
|
||||||
|
NotificationTarget,
|
||||||
|
ServiceProvider,
|
||||||
|
TemplateConfig,
|
||||||
|
Tracker,
|
||||||
|
TrackerState,
|
||||||
|
)
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def check_tracker(tracker_id: int) -> dict[str, Any]:
|
||||||
|
"""Poll a tracker's provider for changes and dispatch notifications."""
|
||||||
|
engine = get_engine()
|
||||||
|
|
||||||
|
# Load all DB data eagerly before entering aiohttp context
|
||||||
|
async with AsyncSession(engine) as session:
|
||||||
|
tracker = await session.get(Tracker, tracker_id)
|
||||||
|
if not tracker or not tracker.enabled:
|
||||||
|
return {"status": "skipped", "reason": "disabled or not found"}
|
||||||
|
|
||||||
|
provider = await session.get(ServiceProvider, tracker.provider_id)
|
||||||
|
if not provider:
|
||||||
|
return {"status": "error", "reason": "provider not found"}
|
||||||
|
|
||||||
|
# Load tracker state
|
||||||
|
result = await session.exec(
|
||||||
|
select(TrackerState).where(TrackerState.tracker_id == tracker_id)
|
||||||
|
)
|
||||||
|
states = result.all()
|
||||||
|
state_dict: dict[str, Any] = {}
|
||||||
|
for s in states:
|
||||||
|
state_dict[s.collection_id] = {
|
||||||
|
"name": "",
|
||||||
|
"asset_ids": s.asset_ids,
|
||||||
|
"pending_asset_ids": s.pending_asset_ids,
|
||||||
|
"shared": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Load targets
|
||||||
|
targets_db: list[NotificationTarget] = []
|
||||||
|
for tid in (tracker.target_ids or []):
|
||||||
|
t = await session.get(NotificationTarget, tid)
|
||||||
|
if t:
|
||||||
|
targets_db.append(t)
|
||||||
|
|
||||||
|
# Load template configs for targets
|
||||||
|
template_configs: dict[int, TemplateConfig | None] = {}
|
||||||
|
for t in targets_db:
|
||||||
|
if t.template_config_id:
|
||||||
|
tc = await session.get(TemplateConfig, t.template_config_id)
|
||||||
|
template_configs[t.id] = tc
|
||||||
|
else:
|
||||||
|
template_configs[t.id] = None
|
||||||
|
|
||||||
|
# Snapshot the data we need
|
||||||
|
provider_type = provider.type
|
||||||
|
provider_config = dict(provider.config)
|
||||||
|
provider_name = provider.name
|
||||||
|
collection_ids = list(tracker.collection_ids or [])
|
||||||
|
|
||||||
|
# Now create aiohttp session and poll
|
||||||
|
events: list[ServiceEvent] = []
|
||||||
|
new_state: dict[str, Any] = {}
|
||||||
|
|
||||||
|
if provider_type == "immich":
|
||||||
|
async with aiohttp.ClientSession() as http_session:
|
||||||
|
immich = ImmichServiceProvider(
|
||||||
|
http_session,
|
||||||
|
provider_config.get("url", ""),
|
||||||
|
provider_config.get("api_key", ""),
|
||||||
|
provider_config.get("external_domain"),
|
||||||
|
provider_name,
|
||||||
|
)
|
||||||
|
connected = await immich.connect()
|
||||||
|
if not connected:
|
||||||
|
return {"status": "error", "reason": "failed to connect to provider"}
|
||||||
|
|
||||||
|
events, new_state = await immich.poll(collection_ids, state_dict)
|
||||||
|
else:
|
||||||
|
return {"status": "error", "reason": f"unsupported provider type: {provider_type}"}
|
||||||
|
|
||||||
|
# Save updated state and log events
|
||||||
|
async with AsyncSession(engine) as session:
|
||||||
|
for cid, cstate in new_state.items():
|
||||||
|
existing = None
|
||||||
|
for s in states:
|
||||||
|
if s.collection_id == cid:
|
||||||
|
existing = s
|
||||||
|
break
|
||||||
|
|
||||||
|
if existing:
|
||||||
|
existing.asset_ids = cstate.get("asset_ids", [])
|
||||||
|
existing.pending_asset_ids = cstate.get("pending_asset_ids", [])
|
||||||
|
session.add(existing)
|
||||||
|
else:
|
||||||
|
new_ts = TrackerState(
|
||||||
|
tracker_id=tracker_id,
|
||||||
|
collection_id=cid,
|
||||||
|
asset_ids=cstate.get("asset_ids", []),
|
||||||
|
pending_asset_ids=cstate.get("pending_asset_ids", []),
|
||||||
|
)
|
||||||
|
session.add(new_ts)
|
||||||
|
|
||||||
|
for event in events:
|
||||||
|
log = EventLog(
|
||||||
|
tracker_id=tracker_id,
|
||||||
|
event_type=event.event_type.value,
|
||||||
|
collection_id=event.collection_id,
|
||||||
|
collection_name=event.collection_name,
|
||||||
|
details={
|
||||||
|
"added_count": event.added_count,
|
||||||
|
"removed_count": event.removed_count,
|
||||||
|
"provider_type": event.provider_type.value,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
session.add(log)
|
||||||
|
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
# Dispatch notifications
|
||||||
|
if events and targets_db:
|
||||||
|
dispatcher = NotificationDispatcher()
|
||||||
|
for event in events:
|
||||||
|
target_configs = []
|
||||||
|
for t in targets_db:
|
||||||
|
tc = template_configs.get(t.id)
|
||||||
|
slots = None
|
||||||
|
if tc:
|
||||||
|
slots = {
|
||||||
|
"assets_added": tc.message_assets_added,
|
||||||
|
"assets_removed": tc.message_assets_removed,
|
||||||
|
"collection_renamed": tc.message_collection_renamed,
|
||||||
|
"collection_deleted": tc.message_collection_deleted,
|
||||||
|
"sharing_changed": tc.message_sharing_changed,
|
||||||
|
}
|
||||||
|
target_configs.append(TargetConfig(
|
||||||
|
type=t.type,
|
||||||
|
config=t.config,
|
||||||
|
template_slots=slots,
|
||||||
|
))
|
||||||
|
await dispatcher.dispatch(event, target_configs)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"events_detected": len(events),
|
||||||
|
"collections_checked": len(collection_ids),
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user