Files
notify-bridge/packages/server/src/notify_bridge_server/main.py
T
alexei.dolgolyov 7cbb02b1ef
Build and Test / test-backend (push) Successful in 2m38s
Build and Test / test-frontend (push) Successful in 9m44s
Build and Test / build-image (push) Failing after 17m9s
feat(db): pre-migration SQLite snapshots via VACUUM INTO
Take a consistent, atomic copy of the DB at lifespan startup BEFORE
migrations run, so a botched future upgrade is recoverable by restoring
a single file instead of a data-loss incident.

Uses SQLite's VACUUM INTO — safe under WAL, cannot tear against
concurrent writes. Best-effort: failures are logged, never raised —
the main DB remains the source of truth.

Configurable via NOTIFY_BRIDGE_PRE_MIGRATE_SNAPSHOT_KEEP (default 5;
0 disables). Snapshots land in ``data_dir/backups/pre-migrate-<ts>.db``
and all but the N newest are pruned on each boot.
2026-04-23 19:53:15 +03:00

299 lines
12 KiB
Python

"""Notify Bridge Server — FastAPI application entry point."""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from .config import settings as _log_cfg
from .logging_setup import setup_logging
# Boot logging from env-based config. DB-backed AppSetting rows (``log_level`` /
# ``log_levels``) override the levels after migrations — see the lifespan
# block below. ``log_format`` is only ever taken from the env-based config
# here; changing it requires a restart.
setup_logging(
    # The ``debug`` flag takes precedence over the configured level.
    level="DEBUG" if _log_cfg.debug else _log_cfg.log_level,
    fmt=_log_cfg.log_format,
    per_module_levels=_log_cfg.log_levels,
)
# Module logger, created once logging has been configured above.
_LOGGER = logging.getLogger(__name__)
from .database.engine import init_db
from .database.models import * # noqa: F401,F403 — ensure all models registered
from .auth.routes import router as auth_router
from .api.providers import router as providers_router
from .api.notification_trackers import router as notification_trackers_router
from .api.notification_tracker_targets import router as notification_tracker_targets_router
from .api.tracking_configs import router as tracking_configs_router
from .api.template_configs import router as template_configs_router
from .api.targets import router as targets_router
from .api.target_receivers import router as target_receivers_router
from .api.telegram_bots import router as telegram_bots_router
from .api.email_bots import router as email_bots_router
from .api.matrix_bots import router as matrix_bots_router
from .api.users import router as users_router
from .api.status import router as status_router
from .api.template_vars import router as template_vars_router
from .api.app_settings import router as app_settings_router
from .api.command_configs import router as command_configs_router
from .api.command_trackers import router as command_trackers_router
from .api.command_template_configs import router as command_template_configs_router
from .api.actions import router as actions_router
from .api.action_rules import router as action_rules_router
from .api.action_types import router as action_types_router
from .commands.webhook import router as webhook_router, set_webhook_secret
from .api.webhooks import router as webhooks_router
from .api.webhook_logs import router as webhook_logs_router
from .api.backup import router as backup_router
# Readiness flag — set to True at the end of the lifespan startup sequence
# (right after the scheduler has been started) and reset to False as soon as
# shutdown begins. Exposed via /api/ready for orchestrators.
_READY: bool = False
async def _run_migrations(engine):
    """Apply every schema/data migration to *engine* in its fixed order.

    The migrations are written to be idempotent, so they run on every boot.
    """
    from .database.migrations import (
        migrate_schema,
        migrate_tracker_targets,
        migrate_entity_refactor,
        migrate_template_slots,
        migrate_target_receivers,
        migrate_template_locale,
        migrate_receivers_from_config,
        migrate_command_slot_locale,
        migrate_notification_slot_locale,
        migrate_user_token_version,
        migrate_performance_indexes,
        migrate_schema_version,
    )

    await migrate_schema(engine)
    await migrate_tracker_targets(engine)
    await migrate_entity_refactor(engine)
    await migrate_template_slots(engine)
    await migrate_target_receivers(engine)
    await migrate_template_locale(engine)
    await migrate_receivers_from_config(engine)
    await migrate_command_slot_locale(engine)
    await migrate_notification_slot_locale(engine)
    await migrate_user_token_version(engine)
    await migrate_performance_indexes(engine)
    await migrate_schema_version(engine)


async def _apply_db_log_settings(engine):
    """Override the env-based log levels with the DB-backed settings.

    Best-effort: any failure is logged and the env-based levels stay in
    effect. ``log_format`` is not touched here — changing it means swapping
    the handler formatter entirely, which still needs a restart.
    """
    try:
        from sqlmodel.ext.asyncio.session import AsyncSession
        from .api.app_settings import get_setting
        from .logging_setup import apply_log_levels

        async with AsyncSession(engine) as session:
            db_level = await get_setting(session, "log_level")
            db_levels = await get_setting(session, "log_levels")
        apply_log_levels(level=db_level or None, per_module_levels=db_levels)
        _LOGGER.info(
            "Logging initialized: level=%s overrides=%r format=%s",
            db_level or _log_cfg.log_level, db_levels or _log_cfg.log_levels,
            _log_cfg.log_format,
        )
    except Exception:  # pragma: no cover — never let logging setup abort boot
        _LOGGER.exception("Failed to apply DB-backed log settings; keeping env-based levels")


async def _configure_webhook_secret(engine):
    """Load ``telegram_webhook_secret`` from the DB and hand it to the webhook module.

    An empty/missing setting is passed on as ``None`` (the webhook module is
    expected to fall back to the env var in that case).
    """
    from sqlmodel.ext.asyncio.session import AsyncSession
    from .api.app_settings import get_setting

    async with AsyncSession(engine) as session:
        secret = await get_setting(session, "telegram_webhook_secret")
    set_webhook_secret(secret or None)


async def _shutdown_services():
    """Stop the scheduler, then close the shared HTTP session and the DB engine.

    The scheduler goes FIRST so in-flight jobs finish before their HTTP
    session is closed. Each step runs even when an earlier one raises, so a
    failing scheduler shutdown cannot leak the session or the engine.
    """
    from .services.scheduler import get_scheduler

    try:
        scheduler = get_scheduler()
        if scheduler.running:
            scheduler.shutdown(wait=True)
    finally:
        try:
            from .services.http_session import close_http_session
            await close_http_session()
        finally:
            from .database.engine import dispose_engine
            await dispose_engine()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: full startup sequence, then graceful shutdown.

    Startup order: init DB → pre-migration snapshot → migrations → seeds →
    DB-backed log levels → pending restore → webhook secret → scheduler.
    ``_READY`` flips to True only after the scheduler has started.

    Shutdown runs in a ``finally`` so it also happens when the server throws
    an exception or a cancellation into the generator at the ``yield``.
    """
    global _READY
    await init_db()

    from .database.engine import get_engine
    from .database.snapshot import snapshot_and_prune

    engine = get_engine()
    # Take a consistent DB snapshot BEFORE migrations run, so operators can
    # roll back a bad upgrade by restoring one file. Best-effort — failures
    # are logged, not raised.
    await snapshot_and_prune(
        engine,
        _log_cfg.data_dir / "backups",
        keep=_log_cfg.pre_migrate_snapshot_keep,
    )
    await _run_migrations(engine)

    from .database.seeds import seed_all
    await seed_all()

    await _apply_db_log_settings(engine)

    # Apply any pending restore staged via /api/backup/prepare-restore
    from .services.pending_restore import apply_pending_restore_if_any
    await apply_pending_restore_if_any()

    # Configure webhook secret from DB setting (falls back to env var)
    await _configure_webhook_secret(engine)

    from .services.scheduler import start_scheduler
    await start_scheduler()
    _READY = True
    try:
        yield
    finally:
        # Report "not ready" before tearing anything down.
        _READY = False
        await _shutdown_services()
# Resolve the application version from the installed distribution metadata;
# the same value is reported by /api/health and /api/ready.
try:
    from importlib.metadata import version as _pkg_version
    _APP_VERSION = _pkg_version("notify-bridge-server")
except Exception:  # pragma: no cover — editable install edge cases
    # Fall back to a placeholder so the app can still be constructed.
    _APP_VERSION = "0.0.0+unknown"
# The ASGI application; startup and shutdown are driven by ``lifespan``.
app = FastAPI(title="Notify Bridge", version=_APP_VERSION, lifespan=lifespan)
# --- Security headers ---
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request as StarletteRequest
from starlette.responses import Response as StarletteResponse
# Default Content-Security-Policy. SecurityHeadersMiddleware applies it with
# ``setdefault``, so a response that already sets the header keeps its own.
_CSP = (
    # Anything not listed below may only load from the app's own origin.
    "default-src 'self'; "
    # Images: same origin, data:/blob: URIs and any https source.
    "img-src 'self' data: blob: https:; "
    # Inline styles are permitted; scripts are same-origin only.
    "style-src 'self' 'unsafe-inline'; "
    "script-src 'self'; "
    "connect-src 'self'; "
    "font-src 'self' data:; "
    "base-uri 'self'; "
    "form-action 'self'; "
    # The app may not be embedded in a frame on any origin.
    "frame-ancestors 'none'"
)
def _is_https_request(request):
    """Return True when *request* was made over HTTPS.

    Two signals are accepted:

    * the request's own scheme is ``https`` (direct TLS, or a trusted proxy
      header already applied by the server), or
    * the first entry of ``X-Forwarded-Proto`` is ``https``. The header is
      compared case-insensitively and may be a comma-separated chain when
      several proxies are involved; the first entry is the client-facing hop.
    """
    if request.url.scheme == "https":
        return True
    forwarded_proto = request.headers.get("x-forwarded-proto", "")
    client_hop = forwarded_proto.split(",", 1)[0].strip().lower()
    return client_hop == "https"


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Attach baseline security headers to every response.

    The fixed headers are always overwritten; ``Content-Security-Policy`` and
    ``Strict-Transport-Security`` use ``setdefault`` so a handler that set
    its own value wins.
    """

    async def dispatch(self, request: StarletteRequest, call_next):
        """Run the downstream handler, then decorate its response with headers."""
        response: StarletteResponse = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        # NOTE(review): current browser guidance treats this header as legacy
        # and suggests "0" when a CSP is in place — value kept unchanged here.
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers.setdefault("Content-Security-Policy", _CSP)
        # HSTS only makes sense over HTTPS; set it when the request arrived
        # over TLS, either directly or via an edge that terminates TLS and
        # forwards X-Forwarded-Proto=https.
        if _is_https_request(request):
            response.headers.setdefault(
                "Strict-Transport-Security",
                "max-age=31536000; includeSubDomains",
            )
        return response
# Install the security-headers middleware defined above.
# NOTE(review): Starlette runs the most recently added middleware outermost —
# confirm the SecurityHeaders → SlowAPI → CORS registration order is intended.
app.add_middleware(SecurityHeadersMiddleware)
# --- Rate limiting ---
from .auth.routes import limiter
# Expose the auth routes' limiter on app state and answer RateLimitExceeded
# with slowapi's stock handler.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
# --- CORS ---
from .config import settings as _cfg
# ``cors_allowed_origins`` is a comma-separated string; entries are trimmed
# and blank ones are dropped.
_origins = [o.strip() for o in _cfg.cors_allowed_origins.split(",") if o.strip()]
# Credentials, all methods and all headers are allowed for the listed origins.
# NOTE(review): a "*" entry combined with allow_credentials=True would be
# unsafe — verify the configured value never contains it.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Register routes — static paths before parameterized. The routers are
# included in exactly the order listed in this tuple.
for _api_router in (
    auth_router,
    template_vars_router,
    providers_router,
    notification_trackers_router,
    notification_tracker_targets_router,
    tracking_configs_router,
    template_configs_router,
    targets_router,
    target_receivers_router,
    telegram_bots_router,
    email_bots_router,
    matrix_bots_router,
    users_router,
    status_router,
    app_settings_router,
    action_types_router,
    action_rules_router,
    actions_router,
    command_configs_router,
    command_trackers_router,
    command_template_configs_router,
    webhook_router,
    webhooks_router,
    webhook_logs_router,
    backup_router,
):
    app.include_router(_api_router)
# Keep the module namespace free of the loop variable.
del _api_router
@app.get("/api/health")
async def health():
    """Liveness probe: the process is up and answering requests.

    Does no work beyond building the payload, so it stays cheap; it reports
    the application version alongside a constant ``ok`` status.
    """
    payload = {"status": "ok", "version": _APP_VERSION}
    return payload
@app.get("/api/ready")
async def ready():
    """Readiness probe for orchestrators (Docker, Kubernetes).

    Answers 503 with ``{"status": "starting"}`` until the lifespan startup
    sequence has finished, and the ready payload with the application
    version afterwards.
    """
    if _READY:
        return {"status": "ready", "version": _APP_VERSION}

    from starlette.responses import JSONResponse

    not_ready_body = {"status": "starting"}
    return JSONResponse(not_ready_body, status_code=503)
# --- Serve frontend static files (production) ---
# Must come AFTER all API routes so /api/* takes priority
from pathlib import Path
if _cfg.static_dir and Path(_cfg.static_dir).is_dir():
    from fastapi.staticfiles import StaticFiles
    from starlette.responses import FileResponse
    from starlette.exceptions import HTTPException as StarletteHTTPException
    _static_dir = Path(_cfg.static_dir)

    class SPAStaticFiles(StaticFiles):
        """StaticFiles with an index.html fallback for client-side (SPA) routes.

        When the regular static lookup raises a 404 for a path that does not
        start with ``api/``, index.html is served instead so deep links such
        as /settings hydrate the SPA. Every other error is re-raised as is.

        NOTE(review): the fallback applies to ANY non-API 404, so a request
        for a missing asset file also receives index.html rather than a 404.
        """

        async def get_response(self, path: str, scope):
            """Serve *path* from the static dir, falling back to index.html."""
            try:
                return await super().get_response(path, scope)
            except StarletteHTTPException as exc:
                # Only plain "not found" on non-API paths gets the SPA shell.
                if exc.status_code != 404 or path.startswith("api/"):
                    raise
                spa_entry = _static_dir / "index.html"
                return FileResponse(spa_entry)

    # Mounted at the root, after all API routes have been registered.
    app.mount("/", SPAStaticFiles(directory=_cfg.static_dir, html=True), name="frontend")
def run():
    """Serve ``app`` with uvicorn using host, port and proxy settings from config."""
    import uvicorn

    # Forwarded headers are trusted from loopback only unless configured.
    trusted_proxies = _cfg.forwarded_allow_ips or "127.0.0.1"
    # NOTE(review): access logging is switched off when debug is on —
    # confirm this is intended.
    emit_access_log = not _cfg.debug
    uvicorn.run(
        app,
        host=_cfg.host,
        port=_cfg.port,
        proxy_headers=True,
        forwarded_allow_ips=trusted_proxies,
        timeout_graceful_shutdown=_cfg.graceful_shutdown_seconds,
        access_log=emit_access_log,
    )