feat(notify-bridge): phase 8 - integration and wiring
Wire all components into a working application:
- Scheduler service: APScheduler loads enabled trackers, polls at intervals
- Watcher service: orchestrates poll -> detect -> notify flow
- Eagerly loads DB data, then creates aiohttp session for provider
- Saves tracker state after each poll
- Logs events to EventLog table
- Dispatches notifications to targets with template rendering
- Manual trigger endpoint: POST /api/trackers/{id}/trigger
- Scheduler starts on app lifespan startup
- Full end-to-end flow verified: server starts cleanly
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,66 @@
|
||||
"""APScheduler-based polling scheduler for trackers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
_scheduler: AsyncIOScheduler | None = None
|
||||
|
||||
|
||||
def get_scheduler() -> AsyncIOScheduler:
|
||||
global _scheduler
|
||||
if _scheduler is None:
|
||||
_scheduler = AsyncIOScheduler()
|
||||
return _scheduler
|
||||
|
||||
|
||||
async def start_scheduler() -> None:
|
||||
scheduler = get_scheduler()
|
||||
if not scheduler.running:
|
||||
scheduler.start()
|
||||
_LOGGER.info("Scheduler started")
|
||||
|
||||
await _load_tracker_jobs()
|
||||
|
||||
|
||||
async def _load_tracker_jobs() -> None:
|
||||
"""Load enabled trackers and schedule polling jobs."""
|
||||
from sqlmodel import select
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
from ..database.engine import get_engine
|
||||
from ..database.models import Tracker
|
||||
|
||||
engine = get_engine()
|
||||
scheduler = get_scheduler()
|
||||
|
||||
async with AsyncSession(engine) as session:
|
||||
result = await session.exec(select(Tracker).where(Tracker.enabled == True))
|
||||
trackers = result.all()
|
||||
|
||||
for tracker in trackers:
|
||||
job_id = f"tracker_{tracker.id}"
|
||||
if scheduler.get_job(job_id):
|
||||
continue
|
||||
|
||||
scheduler.add_job(
|
||||
_poll_tracker,
|
||||
"interval",
|
||||
seconds=tracker.scan_interval,
|
||||
id=job_id,
|
||||
args=[tracker.id],
|
||||
replace_existing=True,
|
||||
)
|
||||
_LOGGER.info("Scheduled tracker %d (%s) every %ds", tracker.id, tracker.name, tracker.scan_interval)
|
||||
|
||||
|
||||
async def _poll_tracker(tracker_id: int) -> None:
|
||||
"""Poll a tracker for changes."""
|
||||
from .watcher import check_tracker
|
||||
try:
|
||||
await check_tracker(tracker_id)
|
||||
except Exception as e:
|
||||
_LOGGER.error("Error polling tracker %d: %s", tracker_id, e)
|
||||
Reference in New Issue
Block a user