"""Notify Bridge Server — FastAPI application entry point.""" import logging import uuid from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from slowapi import _rate_limit_exceeded_handler from slowapi.errors import RateLimitExceeded from slowapi.middleware import SlowAPIMiddleware from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint from starlette.requests import Request as StarletteRequest from starlette.responses import Response as StarletteResponse from notify_bridge_core.log_context import bind_log_context from .config import settings as _log_cfg from .logging_setup import setup_logging # Boot logging from env-based config. DB-backed AppSetting rows (``log_level`` / # ``log_levels`` / ``log_format``) override this after migrations — see the # lifespan block below. setup_logging( level="DEBUG" if _log_cfg.debug else _log_cfg.log_level, fmt=_log_cfg.log_format, per_module_levels=_log_cfg.log_levels, ) _LOGGER = logging.getLogger(__name__) from .database.engine import init_db from .database.models import * # noqa: F401,F403 — ensure all models registered from .auth.routes import router as auth_router from .api.providers import router as providers_router from .api.notification_trackers import router as notification_trackers_router from .api.notification_tracker_targets import router as notification_tracker_targets_router from .api.tracking_configs import router as tracking_configs_router from .api.template_configs import router as template_configs_router from .api.targets import router as targets_router from .api.target_receivers import router as target_receivers_router from .api.telegram_bots import router as telegram_bots_router from .api.email_bots import router as email_bots_router from .api.matrix_bots import router as matrix_bots_router from .api.users import router as users_router from .api.status import router as status_router from .api.template_vars import router as template_vars_router from .api.app_settings import router as app_settings_router from .api.command_configs import router as command_configs_router from .api.command_trackers import router as command_trackers_router from .api.command_template_configs import router as command_template_configs_router from .api.actions import router as actions_router from .api.action_rules import router as action_rules_router from .api.action_types import router as action_types_router from .commands.webhook import router as webhook_router, set_webhook_secret from .api.webhooks import router as webhooks_router from .api.webhook_logs import router as webhook_logs_router from .api.backup import router as backup_router from .api.metrics import router as metrics_router # Readiness flag — flipped to True once the scheduler has started and the # app is fully initialized. Exposed via /api/ready for orchestrators. _READY: bool = False @asynccontextmanager async def lifespan(app: FastAPI): global _READY await init_db() # Run data migrations (idempotent) from .database.engine import get_engine from .database.migrations import ( migrate_schema, migrate_tracker_targets, migrate_entity_refactor, migrate_template_slots, migrate_target_receivers, migrate_template_locale, migrate_receivers_from_config, migrate_command_slot_locale, migrate_notification_slot_locale, migrate_user_token_version, migrate_performance_indexes, migrate_chat_action_to_column, migrate_deferred_dispatch_event_log_fk, migrate_deferred_dispatch_unique_pending, migrate_uniqueness_constraints, migrate_eventlog_provider_fk, migrate_schema_version, ) from .database.snapshot import snapshot_and_prune engine = get_engine() # Take a consistent DB snapshot BEFORE migrations run, so operators can # roll back a bad upgrade by restoring one file. Best-effort — failures # are logged, not raised. await snapshot_and_prune( engine, _log_cfg.data_dir / "backups", keep=_log_cfg.pre_migrate_snapshot_keep, ) await migrate_schema(engine) await migrate_tracker_targets(engine) await migrate_entity_refactor(engine) await migrate_template_slots(engine) await migrate_target_receivers(engine) await migrate_template_locale(engine) await migrate_receivers_from_config(engine) await migrate_command_slot_locale(engine) await migrate_notification_slot_locale(engine) await migrate_user_token_version(engine) await migrate_performance_indexes(engine) await migrate_chat_action_to_column(engine) # FK-rebuild MUST run before the unique-index creation: drop+create_all # of deferred_dispatch wipes its indexes; the next migration re-establishes # the partial unique index. await migrate_deferred_dispatch_event_log_fk(engine) await migrate_deferred_dispatch_unique_pending(engine) # Backfill missing UNIQUE indexes on webhook hot paths (deduping any # existing duplicates). Runs after performance_indexes so non-unique # support indexes are already in place. await migrate_uniqueness_constraints(engine) # Document EventLog.provider_id FK strategy on existing tables (no-op # on SQLite besides the log line; new tables get the FK from create_all). await migrate_eventlog_provider_fk(engine) await migrate_schema_version(engine) from .database.seeds import seed_all await seed_all() # Apply DB-backed logging settings (override env-based boot config). # log_format still needs a restart — changing it means swapping the # handler formatter entirely. try: from sqlmodel.ext.asyncio.session import AsyncSession as _AS_log from .api.app_settings import get_setting as _get_log_setting from .logging_setup import apply_log_levels async with _AS_log(engine) as _log_session: db_level = await _get_log_setting(_log_session, "log_level") db_levels = await _get_log_setting(_log_session, "log_levels") apply_log_levels(level=db_level or None, per_module_levels=db_levels) _LOGGER.info( "Logging initialized: level=%s overrides=%r format=%s", db_level or _log_cfg.log_level, db_levels or _log_cfg.log_levels, _log_cfg.log_format, ) except Exception: # pragma: no cover — never let logging setup abort boot _LOGGER.exception("Failed to apply DB-backed log settings; keeping env-based levels") # Apply any pending restore staged via /api/backup/prepare-restore from .services.pending_restore import apply_pending_restore_if_any await apply_pending_restore_if_any() # Configure webhook secret from DB setting (falls back to env var) from sqlmodel.ext.asyncio.session import AsyncSession as _AS from .api.app_settings import get_setting as _get_setting async with _AS(engine) as _session: _secret = await _get_setting(_session, "telegram_webhook_secret") set_webhook_secret(_secret or None) from .services.scheduler import start_scheduler, get_scheduler await start_scheduler() # Phase 1 of the Home Assistant provider: subscription-based ingest runs # outside the polling scheduler. ``start_all`` spawns one supervisor task # per enabled HA provider row. No-op when no HA providers are configured. from .services.ha_subscription import start_all as start_ha_subscriptions await start_ha_subscriptions() _READY = True yield # Graceful shutdown — cancel HA supervisors FIRST so they release their # WS connections before the shared HTTP session is closed. Then stop the # polling scheduler. Order matters: scheduler.shutdown(wait=True) drains # in-flight jobs that may also use the shared session. _READY = False from .services.ha_subscription import stop_all as stop_ha_subscriptions await stop_ha_subscriptions() # Restore the DB-configured baseline level for any temporary DEBUG # overrides before the engine is disposed — so even a forced restart # leaves the world tidy and doesn't leak DEBUG state into the next # process (which would also be wiped by setup_logging() at boot, but # being explicit about shutdown is cheaper than relying on a re-init). from .services.diagnostic_mode import revert_all as revert_diagnostics try: await revert_diagnostics() except Exception: # pragma: no cover — never block shutdown on this. _LOGGER.exception("Failed to revert diagnostic overrides during shutdown") scheduler = get_scheduler() if scheduler.running: scheduler.shutdown(wait=True) from .services.http_session import close_http_session await close_http_session() from .database.engine import dispose_engine await dispose_engine() from .version import resolve_version as _resolve_version _APP_VERSION = _resolve_version() app = FastAPI(title="Notify Bridge", version=_APP_VERSION, lifespan=lifespan) # --- Security headers --- # Bounded character set for accepted inbound X-Request-Id values. Anything # outside this is replaced with a server-generated id so a malicious header # can't smuggle CR/LF into log lines or break grep-by-field parsing. # ``:`` is intentionally excluded so an inbound value can't masquerade as a # server-minted ``disp:`` / ``req:`` id and confuse operator greps. _REQUEST_ID_MAX_LEN = 64 _REQUEST_ID_ALLOWED = set( "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_" ) def _normalize_request_id(raw: str | None) -> str: if not raw: return f"req:{uuid.uuid4().hex[:12]}" raw = raw.strip() if not raw or len(raw) > _REQUEST_ID_MAX_LEN: return f"req:{uuid.uuid4().hex[:12]}" if not all(c in _REQUEST_ID_ALLOWED for c in raw): return f"req:{uuid.uuid4().hex[:12]}" return raw class RequestContextMiddleware(BaseHTTPMiddleware): """Bind a per-request ``request_id`` ContextVar and echo it back. Reads ``X-Request-Id`` from the inbound request (so an upstream proxy with its own correlation system can propagate its id), falling back to a short random ``req:<12 hex>`` value. Always sets the same id on the response ``X-Request-Id`` header so the SPA can surface it for operator-friendly bug reports. Bound via :func:`bind_log_context` so the id appears on every log line emitted during request handling (``[req=...]``) and is picked up by :func:`notify_bridge_core.log_context.enrich_details_with_correlation` when an ``EventLog`` row is written during the same request. """ async def dispatch( self, request: StarletteRequest, call_next: RequestResponseEndpoint, ) -> StarletteResponse: req_id = _normalize_request_id(request.headers.get("x-request-id")) with bind_log_context(request_id=req_id): response: StarletteResponse = await call_next(request) response.headers["X-Request-Id"] = req_id return response _CSP = ( "default-src 'self'; " "img-src 'self' data: blob: https:; " "style-src 'self' 'unsafe-inline'; " # SvelteKit's static adapter emits an inline bootstrap