"""Notify Bridge Server — FastAPI application entry point."""

import logging
from contextlib import asynccontextmanager
from typing import AsyncIterator

from fastapi import FastAPI

# Ensure app-level loggers are visible.
# NOTE: this configuration deliberately stays ahead of the package imports
# below, matching the original statement order of this module.
logging.basicConfig(level=logging.INFO)
logging.getLogger("notify_bridge_server").setLevel(logging.DEBUG)
logging.getLogger("notify_bridge_core").setLevel(logging.DEBUG)

from .database.engine import init_db
from .database.models import *  # noqa: F401,F403 — ensure all models registered
from .auth.routes import router as auth_router
from .api.providers import router as providers_router
from .api.notification_trackers import router as notification_trackers_router
from .api.notification_tracker_targets import router as notification_tracker_targets_router
from .api.tracking_configs import router as tracking_configs_router
from .api.template_configs import router as template_configs_router
from .api.targets import router as targets_router
from .api.target_receivers import router as target_receivers_router
from .api.telegram_bots import router as telegram_bots_router
from .api.email_bots import router as email_bots_router
from .api.matrix_bots import router as matrix_bots_router
from .api.users import router as users_router
from .api.status import router as status_router
from .api.template_vars import router as template_vars_router
from .api.app_settings import router as app_settings_router
from .api.command_configs import router as command_configs_router
from .api.command_trackers import router as command_trackers_router
from .api.command_template_configs import router as command_template_configs_router
from .commands.webhook import router as webhook_router, set_webhook_secret


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Run startup tasks, yield while the app serves, then stop the scheduler.

    Startup order: initialise the database, run the data migrations, seed the
    default templates, configure the Telegram webhook secret from the stored
    app setting, and start the scheduler.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    await init_db()

    # Run data migrations (idempotent)
    from .database.engine import get_engine
    from .database.migrations import (
        migrate_schema,
        migrate_tracker_targets,
        migrate_entity_refactor,
        migrate_template_slots,
        migrate_target_receivers,
        migrate_template_locale,
        migrate_receivers_from_config,
        migrate_command_slot_locale,
    )

    engine = get_engine()
    # The tuple order is the execution order; keep it as-is.
    for migrate in (
        migrate_schema,
        migrate_tracker_targets,
        migrate_entity_refactor,
        migrate_template_slots,
        migrate_target_receivers,
        migrate_template_locale,
        migrate_receivers_from_config,
        migrate_command_slot_locale,
    ):
        await migrate(engine)

    await _seed_default_templates()
    await _seed_default_command_templates()

    # Configure webhook secret from DB setting (falls back to env var)
    from sqlmodel.ext.asyncio.session import AsyncSession as _AS
    from .api.app_settings import get_setting as _get_setting

    async with _AS(engine) as _session:
        _secret = await _get_setting(_session, "telegram_webhook_secret")
        # A falsy setting (e.g. empty string) is normalised to None.
        set_webhook_secret(_secret or None)

    from .services.scheduler import start_scheduler, get_scheduler

    await start_scheduler()
    try:
        yield
    finally:
        # Graceful shutdown. The ``finally`` makes sure the scheduler is also
        # stopped when an exception or cancellation is thrown in at ``yield``.
        scheduler = get_scheduler()
        if scheduler.running:
            scheduler.shutdown()


app = FastAPI(title="Notify Bridge", version="0.1.0", lifespan=lifespan)

# Register routes — static paths before parameterized
app.include_router(auth_router)
app.include_router(template_vars_router)
app.include_router(providers_router)
app.include_router(notification_trackers_router)
app.include_router(notification_tracker_targets_router)
app.include_router(tracking_configs_router)
app.include_router(template_configs_router)
app.include_router(targets_router)
app.include_router(target_receivers_router)
app.include_router(telegram_bots_router)
app.include_router(email_bots_router)
app.include_router(matrix_bots_router)
app.include_router(users_router)
app.include_router(status_router)
app.include_router(app_settings_router)
app.include_router(command_configs_router)
app.include_router(command_trackers_router)
app.include_router(command_template_configs_router)
app.include_router(webhook_router)


@app.get("/api/health")
async def health():
    """Return a static ``{"status": "ok"}`` payload."""
    return {"status": "ok"}


async def _upsert_slot(session, slot_model, template_text, **keys):
    """Update the slot row matching *keys*, or add a new row if none exists.

    Args:
        session: Open async session; the caller is responsible for committing.
        slot_model: Slot model class to query and instantiate.
        template_text: Value to store in the row's ``template`` attribute.
        **keys: Column name → value pairs that identify the row. They are
            used both as equality filters and as constructor arguments.
    """
    from sqlmodel import select

    filters = [getattr(slot_model, column) == value for column, value in keys.items()]
    found = (await session.exec(select(slot_model).where(*filters))).first()
    if found:
        found.template = template_text
        session.add(found)
    else:
        session.add(slot_model(template=template_text, **keys))


def _system_template_values(col_names, locale):
    """Build the column → value mapping for a new system template row.

    Columns without a known value receive an empty string so that legacy
    NOT NULL columns still present in the table accept the insert.

    Args:
        col_names: Column names of ``template_config`` (without ``id``).
        locale: Locale code of the template being created.

    Returns:
        A dict keyed by column name, in the order given by *col_names*.
    """
    from datetime import datetime, timezone

    known = {
        "user_id": 0,
        "provider_type": "immich",
        "name": f"Default ({locale.upper()})",
        "description": f"Default Immich templates ({locale.upper()})",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "date_format": "%d.%m.%Y, %H:%M UTC",
        "date_only_format": "%d.%m.%Y",
        "locale": locale,
    }
    return {col: known.get(col, "") for col in col_names}


async def _insert_system_template_config(session, locale):
    """Insert a system-owned ``template_config`` row via raw SQL.

    Raw SQL is used because legacy NOT NULL columns may still exist in the
    database. The statements (``PRAGMA table_info``, ``last_insert_rowid()``)
    are SQLite-specific.

    Args:
        session: Open async session; the caller is responsible for committing.
        locale: Locale code of the template being created.

    Returns:
        The value of ``SELECT last_insert_rowid()`` after the insert.
    """
    from sqlalchemy import text

    # Get column names to build INSERT with defaults for legacy cols
    col_info = (
        await session.execute(text("PRAGMA table_info(template_config)"))
    ).fetchall()
    col_names = [c[1] for c in col_info if c[1] != "id"]

    values = _system_template_values(col_names, locale)
    # Column names come from the table's own schema, not from user input.
    cols_str = ", ".join(values)
    placeholders = ", ".join(f":{k}" for k in values)
    await session.execute(
        text(f"INSERT INTO template_config ({cols_str}) VALUES ({placeholders})"),
        values,
    )
    # Get the inserted ID
    return (await session.execute(text("SELECT last_insert_rowid()"))).scalar()


async def _seed_default_templates() -> None:
    """Seed or update default (system-owned) templates on startup.

    Uses TemplateSlot child rows for template content. System-owned configs
    are those with ``user_id == 0``. For each of the locales ``en`` and
    ``ru``, a missing config is created together with its slots, and an
    existing config has its slots overwritten with the current defaults.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from .database.engine import get_engine
    from .database.models import TemplateConfig, TemplateSlot
    from notify_bridge_core.templates.defaults import load_default_templates

    engine = get_engine()
    async with AsyncSession(engine) as session:
        # Find existing system-owned templates
        result = await session.exec(
            select(TemplateConfig).where(TemplateConfig.user_id == 0)
        )
        system_configs = result.all()
        # Configs without a locale are classified by the "(RU)" name marker.
        existing_locales = {
            (c.locale or ("ru" if "(RU)" in c.name else "en")): c
            for c in system_configs
        }

        for locale in ("en", "ru"):
            slots = load_default_templates(locale)
            if not slots:
                continue

            if locale not in existing_locales:
                # Create missing system template via raw SQL
                # (legacy NOT NULL columns may still exist in the DB)
                config_id = await _insert_system_template_config(session, locale)
                # The config was just created, so no slot lookup is needed.
                for slot_name, template_text in slots.items():
                    session.add(TemplateSlot(
                        config_id=config_id,
                        slot_name=slot_name,
                        template=template_text,
                    ))
            else:
                # Update existing system template slots
                config = existing_locales[locale]
                for slot_name, template_text in slots.items():
                    await _upsert_slot(
                        session,
                        TemplateSlot,
                        template_text,
                        config_id=config.id,
                        slot_name=slot_name,
                    )

        await session.commit()


async def _seed_default_command_templates() -> None:
    """Seed or update default command response templates on startup.

    Creates a single 'Default Commands' config with locale-aware slots
    (each slot has an EN and RU version stored as separate rows). If one or
    more system-owned configs (``user_id == 0``) already exist, the first
    one returned by the query is reused.
    """
    from sqlmodel import select
    from sqlmodel.ext.asyncio.session import AsyncSession
    from .database.engine import get_engine
    from .database.models import CommandTemplateConfig, CommandTemplateSlot
    from notify_bridge_core.templates.command_defaults import load_default_command_templates

    engine = get_engine()
    async with AsyncSession(engine) as session:
        # Find or create the system-owned config
        result = await session.exec(
            select(CommandTemplateConfig).where(CommandTemplateConfig.user_id == 0)
        )
        system_configs = result.all()

        if not system_configs:
            # First startup — create single merged config
            config = CommandTemplateConfig(
                user_id=0,
                provider_type="immich",
                name="Default Commands",
                description="Default Immich command templates",
            )
            session.add(config)
            # Flush before config.id is used for the slot rows below.
            await session.flush()
        else:
            config = system_configs[0]

        # Upsert slots for each locale
        for locale in ("en", "ru"):
            slots = load_default_command_templates(locale)
            if not slots:
                continue
            for slot_name, template_text in slots.items():
                await _upsert_slot(
                    session,
                    CommandTemplateSlot,
                    template_text,
                    config_id=config.id,
                    slot_name=slot_name,
                    locale=locale,
                )

        await session.commit()


def run(host: str = "0.0.0.0", port: int = 8420) -> None:
    """Serve the application with uvicorn.

    Args:
        host: Interface to bind; defaults to all interfaces.
        port: TCP port to listen on.
    """
    import uvicorn

    uvicorn.run(app, host=host, port=port)