"""Watcher service — orchestrates poll -> detect -> notify flow.""" from __future__ import annotations import logging from datetime import datetime, time, timezone from typing import Any import aiohttp from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from notify_bridge_core.models.events import ServiceEvent from notify_bridge_core.notifications.dispatcher import NotificationDispatcher, TargetConfig from notify_bridge_core.notifications.telegram.cache import TelegramFileCache from notify_bridge_core.storage import JsonFileBackend from ..database.engine import get_engine from ..database.models import ( EmailBot, EventLog, MatrixBot, NotificationTarget, NotificationTracker, NotificationTrackerState, NotificationTrackerTarget, ServiceProvider, TargetReceiver, TemplateConfig, TemplateSlot, TrackingConfig, ) _LOGGER = logging.getLogger(__name__) # Module-level Telegram file caches — shared across dispatches for reuse _url_cache: TelegramFileCache | None = None _asset_cache: TelegramFileCache | None = None async def _get_telegram_caches() -> tuple[TelegramFileCache | None, TelegramFileCache | None]: """Lazily initialize shared Telegram file caches using NOTIFY_BRIDGE_DATA_DIR.""" global _url_cache, _asset_cache if _url_cache is not None: return _url_cache, _asset_cache import os from pathlib import Path data_dir = os.environ.get("NOTIFY_BRIDGE_DATA_DIR") if not data_dir: return None, None cache_dir = Path(data_dir) / "cache" _url_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_url_cache.json")) _asset_cache = TelegramFileCache(JsonFileBackend(cache_dir / "telegram_asset_cache.json")) await _url_cache.async_load() await _asset_cache.async_load() _LOGGER.info("Initialized Telegram file caches in %s", cache_dir) return _url_cache, _asset_cache def _in_quiet_hours(start: str | None, end: str | None) -> bool: """Check if the current UTC time is within the quiet hours window.""" if not start or not end: return False try: now = datetime.now(timezone.utc).time() t_start = time.fromisoformat(start) t_end = time.fromisoformat(end) if t_start <= t_end: return t_start <= now <= t_end else: # Overnight window (e.g., 22:00 - 06:00) return now >= t_start or now <= t_end except (ValueError, TypeError): return False def _event_allowed_by_config(event: ServiceEvent, tc: TrackingConfig) -> bool: """Check if an event type is allowed by the tracking config's flags.""" event_type = event.event_type.value flag_map = { # Immich events "assets_added": tc.track_assets_added, "assets_removed": tc.track_assets_removed, "collection_renamed": tc.track_collection_renamed, "collection_deleted": tc.track_collection_deleted, "sharing_changed": tc.track_sharing_changed, # Gitea events "push": tc.track_push, "issue_opened": tc.track_issue_opened, "issue_closed": tc.track_issue_closed, "issue_commented": tc.track_issue_commented, "pr_opened": tc.track_pr_opened, "pr_closed": tc.track_pr_closed, "pr_merged": tc.track_pr_merged, "pr_commented": tc.track_pr_commented, "release_published": tc.track_release_published, } return flag_map.get(event_type, True) async def check_tracker(tracker_id: int) -> dict[str, Any]: """Poll a tracker's provider for changes and dispatch notifications.""" engine = get_engine() # Load all DB data eagerly before entering aiohttp context async with AsyncSession(engine) as session: tracker = await session.get(NotificationTracker, tracker_id) if not tracker or not tracker.enabled: return {"status": "skipped", "reason": "disabled or not found"} provider = await session.get(ServiceProvider, tracker.provider_id) if not provider: return {"status": "error", "reason": "provider not found"} # Load tracker state result = await session.exec( select(NotificationTrackerState).where(NotificationTrackerState.tracker_id == tracker_id) ) states = result.all() state_dict: dict[str, Any] = {} for s in states: state_dict[s.collection_id] = { "name": s.collection_name or "", "asset_ids": s.asset_ids, "pending_asset_ids": s.pending_asset_ids, "shared": bool(s.shared), } # Load tracker-target links (replaces old target_ids JSON array) tt_result = await session.exec( select(NotificationTrackerTarget).where(NotificationTrackerTarget.tracker_id == tracker_id) ) tracker_targets = tt_result.all() # For each link, load target + tracking config + template config link_data: list[dict[str, Any]] = [] for tt in tracker_targets: if not tt.enabled: continue if _in_quiet_hours(tt.quiet_hours_start, tt.quiet_hours_end): continue target = await session.get(NotificationTarget, tt.target_id) if not target: continue # Load receivers for this target recv_result = await session.exec( select(TargetReceiver).where( TargetReceiver.target_id == target.id, TargetReceiver.enabled == True, ) ) receivers = [dict(r.config) for r in recv_result.all()] tracking_config = None if tt.tracking_config_id: tracking_config = await session.get(TrackingConfig, tt.tracking_config_id) template_config = None template_slots: dict[str, str] | None = None if tt.template_config_id: template_config = await session.get(TemplateConfig, tt.template_config_id) if template_config: slot_result = await session.exec( select(TemplateSlot).where(TemplateSlot.config_id == template_config.id) ) raw_slots = {s.slot_name: s.template for s in slot_result.all()} # Map slot names to event_type values for dispatcher lookup template_slots = {} for slot_name, tmpl_text in raw_slots.items(): # Strip "message_" prefix for event-type slots event_key = slot_name.removeprefix("message_") if slot_name.startswith("message_") else slot_name template_slots[event_key] = tmpl_text target_config = dict(target.config) # Inject bot credentials for bot-backed target types if target.type == "email": email_bot_id = target.config.get("email_bot_id") if email_bot_id: email_bot = await session.get(EmailBot, email_bot_id) if email_bot: target_config["smtp"] = { "host": email_bot.smtp_host, "port": email_bot.smtp_port, "username": email_bot.smtp_username, "password": email_bot.smtp_password, "from_address": email_bot.email, "from_name": email_bot.name, "use_tls": email_bot.smtp_use_tls, } elif target.type == "matrix": matrix_bot_id = target.config.get("matrix_bot_id") if matrix_bot_id: matrix_bot = await session.get(MatrixBot, matrix_bot_id) if matrix_bot: target_config["homeserver_url"] = matrix_bot.homeserver_url target_config["access_token"] = matrix_bot.access_token link_data.append({ "target_type": target.type, "target_config": target_config, "receivers": receivers, "tracking_config": tracking_config, "template_config": template_config, "template_slots": template_slots, }) # Snapshot the data we need provider_type = provider.type provider_config = dict(provider.config) provider_name = provider.name collection_ids = list(tracker.collection_ids or []) # Now create aiohttp session and poll events: list[ServiceEvent] = [] new_state: dict[str, Any] = {} if provider_type == "immich": from notify_bridge_core.providers.immich import ImmichServiceProvider async with aiohttp.ClientSession() as http_session: immich = ImmichServiceProvider( http_session, provider_config.get("url", ""), provider_config.get("api_key", ""), provider_config.get("external_domain"), provider_name, ) connected = await immich.connect() if not connected: return {"status": "error", "reason": "failed to connect to provider"} events, new_state = await immich.poll(collection_ids, state_dict) elif provider_type == "gitea": # Gitea is webhook-based — events arrive via /api/webhooks/gitea endpoint. # The scheduler still calls check_tracker but there's nothing to poll. return {"status": "ok", "events_detected": 0, "collections_checked": 0} else: return {"status": "error", "reason": f"unsupported provider type: {provider_type}"} # Save updated state and log events async with AsyncSession(engine) as session: for cid, cstate in new_state.items(): existing = None for s in states: if s.collection_id == cid: existing = s break if existing: existing.asset_ids = cstate.get("asset_ids", []) existing.pending_asset_ids = cstate.get("pending_asset_ids", []) existing.collection_name = cstate.get("name", "") existing.shared = cstate.get("shared", False) session.add(existing) else: new_ts = NotificationTrackerState( tracker_id=tracker_id, collection_id=cid, collection_name=cstate.get("name", ""), shared=cstate.get("shared", False), asset_ids=cstate.get("asset_ids", []), pending_asset_ids=cstate.get("pending_asset_ids", []), ) session.add(new_ts) for event in events: assets_count = event.added_count or event.removed_count or 0 log = EventLog( tracker_id=tracker_id, tracker_name=tracker.name, provider_id=provider.id, provider_name=provider_name, event_type=event.event_type.value, collection_id=event.collection_id, collection_name=event.collection_name, assets_count=assets_count, details={ "added_count": event.added_count, "removed_count": event.removed_count, "provider_type": event.provider_type.value, }, ) session.add(log) await session.commit() # Dispatch notifications — per-link config resolution # Filter out empty events (e.g. assets_added with 0 added) events = [ e for e in events if not (e.event_type.value == "assets_added" and e.added_count == 0) and not (e.event_type.value == "assets_removed" and e.removed_count == 0) ] _LOGGER.info( "Tracker %d: %d events after filter, %d links", tracker_id, len(events), len(link_data), ) if events and link_data: url_cache, asset_cache = await _get_telegram_caches() dispatcher = NotificationDispatcher(url_cache=url_cache, asset_cache=asset_cache) for event in events: _LOGGER.info( "Dispatching event %s for %s (added=%d removed=%d)", event.event_type.value, event.collection_name, event.added_count, event.removed_count, ) target_configs = [] for ld in link_data: # Apply per-link event filtering from tracking config tc = ld["tracking_config"] if tc and not _event_allowed_by_config(event, tc): _LOGGER.info(" Skipped by tracking config filter") continue tmpl = ld["template_config"] target_configs.append(TargetConfig( type=ld["target_type"], config=ld["target_config"], template_slots=ld["template_slots"], date_format=tmpl.date_format if tmpl else "%d.%m.%Y, %H:%M UTC", date_only_format=tmpl.date_only_format if tmpl and tmpl.date_only_format else "%d.%m.%Y", provider_api_key=provider_config.get("api_key"), provider_internal_url=provider_config.get("url", ""), provider_external_url=provider_config.get("external_domain", ""), receivers=ld["receivers"], )) if target_configs: results = await dispatcher.dispatch(event, target_configs) for r in results: if r.get("success"): _LOGGER.info(" Notification sent successfully") else: _LOGGER.error(" Notification failed: %s", r.get("error", "unknown")) return { "status": "ok", "events_detected": len(events), "collections_checked": len(collection_ids), }