"""Watcher service — orchestrates poll -> detect -> notify flow.""" from __future__ import annotations import logging from typing import Any import aiohttp from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from notify_bridge_core.models.events import ServiceEvent from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig from notify_bridge_core.notifications.telegram.cache import TelegramFileCache from notify_bridge_core.storage import JsonFileBackend from ..database.engine import get_engine from ..database.models import ( EventLog, NotificationTracker, NotificationTrackerState, ServiceProvider, ) from .dispatch_helpers import event_allowed_by_config, load_link_data _LOGGER = logging.getLogger(__name__) # Module-level Telegram file caches — shared across dispatches for reuse _url_cache: TelegramFileCache | None = None _asset_cache: TelegramFileCache | None = None async def _get_telegram_caches() -> tuple[TelegramFileCache | None, TelegramFileCache | None]: """Lazily initialize shared Telegram file caches using NOTIFY_BRIDGE_DATA_DIR.""" global _url_cache, _asset_cache if _url_cache is not None: return _url_cache, _asset_cache import os from pathlib import Path data_dir = os.environ.get("NOTIFY_BRIDGE_DATA_DIR") if not data_dir: return None, None cache_dir = Path(data_dir) / "cache" _url_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_url_cache.json")) _asset_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_asset_cache.json")) await _url_cache.async_load() await _asset_cache.async_load() _LOGGER.info("Initialized Telegram file caches in %s", cache_dir) return _url_cache, _asset_cache async def check_tracker(tracker_id: int) -> dict[str, Any]: """Poll a tracker's provider for changes and dispatch notifications.""" engine = get_engine() # Load all DB data eagerly before entering aiohttp context async with AsyncSession(engine) as session: tracker = await session.get(NotificationTracker, tracker_id) if not tracker or not tracker.enabled: return {"status": "skipped", "reason": "disabled or not found"} provider = await session.get(ServiceProvider, tracker.provider_id) if not provider: return {"status": "error", "reason": "provider not found"} # Load tracker state result = await session.exec( select(NotificationTrackerState).where(NotificationTrackerState.tracker_id == tracker_id) ) states = result.all() state_dict: dict[str, Any] = {} for s in states: state_dict[s.collection_id] = { "name": s.collection_name or "", "asset_ids": s.asset_ids, "pending_asset_ids": s.pending_asset_ids, "shared": bool(s.shared), } # Load tracker-target links link_data = await load_link_data(session, tracker_id, check_quiet_hours=True) # Snapshot the data we need provider_type = provider.type provider_config = dict(provider.config) provider_name = provider.name tracker_name = tracker.name tracker_filters = dict(tracker.filters) if tracker.filters else {} collection_ids = list(tracker.collection_ids or []) # Now create aiohttp session and poll events: list[ServiceEvent] = [] new_state: dict[str, Any] = {} if provider_type == "immich": from notify_bridge_core.providers.immich import ImmichServiceProvider async with aiohttp.ClientSession() as http_session: immich = ImmichServiceProvider( http_session, provider_config.get("url", ""), provider_config.get("api_key", ""), provider_config.get("external_domain"), provider_name, ) connected = await immich.connect() if not connected: return {"status": "error", "reason": "failed to connect to provider"} events, new_state = await immich.poll(collection_ids, state_dict) elif provider_type == "gitea": # Gitea is webhook-based — events arrive via /api/webhooks/gitea endpoint. # The scheduler still calls check_tracker but there's nothing to poll. return {"status": "ok", "events_detected": 0, "collections_checked": 0} elif provider_type == "planka": # Planka is webhook-based — events arrive via /api/webhooks/planka endpoint. return {"status": "ok", "events_detected": 0, "collections_checked": 0} elif provider_type == "scheduler": from notify_bridge_core.providers.scheduler import SchedulerServiceProvider custom_vars = tracker_filters.get("custom_variables", {}) sched = SchedulerServiceProvider( name=provider_name, tracker_name=tracker_name, custom_variables=custom_vars, ) events, new_state = await sched.poll(collection_ids, state_dict) else: return {"status": "error", "reason": f"unsupported provider type: {provider_type}"} # Save updated state and log events async with AsyncSession(engine) as session: for cid, cstate in new_state.items(): existing = None for s in states: if s.collection_id == cid: existing = s break if existing: existing.asset_ids = cstate.get("asset_ids", []) existing.pending_asset_ids = cstate.get("pending_asset_ids", []) existing.collection_name = cstate.get("name", "") existing.shared = cstate.get("shared", False) session.add(existing) else: new_ts = NotificationTrackerState( tracker_id=tracker_id, collection_id=cid, collection_name=cstate.get("name", ""), shared=cstate.get("shared", False), asset_ids=cstate.get("asset_ids", []), pending_asset_ids=cstate.get("pending_asset_ids", []), ) session.add(new_ts) for event in events: assets_count = event.added_count or event.removed_count or 0 log = EventLog( tracker_id=tracker_id, tracker_name=tracker.name, provider_id=provider.id, provider_name=provider_name, event_type=event.event_type.value, collection_id=event.collection_id, collection_name=event.collection_name, assets_count=assets_count, details={ "added_count": event.added_count, "removed_count": event.removed_count, "provider_type": event.provider_type.value, }, ) session.add(log) await session.commit() # Dispatch notifications — per-link config resolution # Filter out empty events (e.g. assets_added with 0 added) events = [ e for e in events if not (e.event_type.value == "assets_added" and e.added_count == 0) and not (e.event_type.value == "assets_removed" and e.removed_count == 0) ] _LOGGER.info( "Tracker %d: %d events after filter, %d links", tracker_id, len(events), len(link_data), ) if events and link_data: url_cache, asset_cache = await _get_telegram_caches() dispatcher = NotificationDispatcher(url_cache=url_cache, asset_cache=asset_cache) for event in events: _LOGGER.info( "Dispatching event %s for %s (added=%d removed=%d)", event.event_type.value, event.collection_name, event.added_count, event.removed_count, ) target_configs = [] for ld in link_data: # Apply per-link event filtering from tracking config tc = ld["tracking_config"] if tc and not event_allowed_by_config(event, tc): _LOGGER.info(" Skipped by tracking config filter") continue tmpl = ld["template_config"] target_configs.append(TargetConfig( type=ld["target_type"], config=ld["target_config"], template_slots=ld["template_slots"], date_format=tmpl.date_format if tmpl else "%d.%m.%Y, %H:%M UTC", date_only_format=tmpl.date_only_format if tmpl and tmpl.date_only_format else "%d.%m.%Y", provider_api_key=provider_config.get("api_key"), provider_internal_url=provider_config.get("url", ""), provider_external_url=provider_config.get("external_domain", ""), receivers=ld["receivers"], )) if target_configs: results = await dispatcher.dispatch(event, target_configs) for r in results: if r.get("success"): _LOGGER.info(" Notification sent successfully") else: _LOGGER.error(" Notification failed: %s", r.get("error", "unknown")) return { "status": "ok", "events_detected": len(events), "collections_checked": len(collection_ids), }