"""Data migrations for schema changes. Handles converting legacy JSON-array relationships to proper junction tables, and the Phase 1 entity refactor (tracker → notification_tracker, etc.). """ import json import logging from typing import Any from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncEngine logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _IDENT_RE = __import__("re").compile(r"^[A-Za-z_][A-Za-z0-9_]*$") def _assert_ident(ident: str, kind: str = "identifier") -> str: """Guard against SQL injection in dynamically interpolated identifiers. All table/column names flow through here before being embedded into f-strings, so attacker-controlled values cannot break out even if they reach this layer. """ if not isinstance(ident, str) or not _IDENT_RE.match(ident): raise ValueError(f"Unsafe {kind}: {ident!r}") return ident async def _has_column(conn, table: str, column: str) -> bool: """Check if a column exists in a SQLite table.""" _assert_ident(table, "table") cols = await conn.run_sync( lambda sync_conn: [ row[1] for row in sync_conn.execute( text(f"PRAGMA table_info('{table}')") ).fetchall() ] ) return column in cols async def _has_table(conn, table: str) -> bool: """Check if a table exists in the SQLite database.""" result = await conn.run_sync( lambda sync_conn: sync_conn.execute( text( "SELECT name FROM sqlite_master " "WHERE type='table' AND name=:name" ), {"name": table}, ).fetchone() ) return result is not None # --------------------------------------------------------------------------- # Legacy schema migrations (pre-Phase 1) # --------------------------------------------------------------------------- async def migrate_schema(engine: AsyncEngine) -> None: """Add missing columns to existing tables (SQLite ALTER TABLE ADD COLUMN).""" async with engine.begin() as conn: # --- Tracker table (may still be named "tracker" or already renamed) --- tracker_table = "notification_tracker" if await _has_table(conn, "notification_tracker") else "tracker" if await _has_table(conn, tracker_table): # NULL default = adaptive polling disabled for existing trackers. # Operators who want the old back-off behavior can set a positive # value per tracker from the UI. if not await _has_column(conn, tracker_table, "adaptive_max_skip"): await conn.execute( text(f"ALTER TABLE {tracker_table} ADD COLUMN adaptive_max_skip INTEGER") ) logger.info("Added adaptive_max_skip column to %s table", tracker_table) # Add enriched fields to event_log if missing if await _has_table(conn, "event_log"): for col, sql in [ ("tracker_name", "ALTER TABLE event_log ADD COLUMN tracker_name TEXT DEFAULT ''"), ("provider_id", "ALTER TABLE event_log ADD COLUMN provider_id INTEGER"), ("provider_name", "ALTER TABLE event_log ADD COLUMN provider_name TEXT DEFAULT ''"), ("assets_count", "ALTER TABLE event_log ADD COLUMN assets_count INTEGER DEFAULT 0"), ("user_id", "ALTER TABLE event_log ADD COLUMN user_id INTEGER"), ("action_id", "ALTER TABLE event_log ADD COLUMN action_id INTEGER"), ("action_name", "ALTER TABLE event_log ADD COLUMN action_name TEXT DEFAULT ''"), ]: if not await _has_column(conn, "event_log", col): await conn.execute(text(sql)) logger.info("Added %s column to event_log table", col) # Explicit indexes on the dashboard-query columns. SQLModel's # ``index=True`` is emitted by ``create_all`` on *new* installs, # but ALTER TABLE ADD COLUMN doesn't create them on upgrades — # so the first boot after upgrade would leave these unindexed # and status.py ``WHERE user_id=...`` would table-scan. The # indexes are redundant-but-safe once create_all also runs. for idx_name, col in [ ("ix_event_log_user_id", "user_id"), ("ix_event_log_action_id", "action_id"), ("ix_event_log_provider_id", "provider_id"), ]: await conn.execute( text(f"CREATE INDEX IF NOT EXISTS {idx_name} ON event_log ({col})") ) # Backfill user_id from notification_tracker for legacy rows. # Safe to run repeatedly: only touches rows where user_id is still NULL. await conn.execute(text(""" UPDATE event_log SET user_id = ( SELECT user_id FROM notification_tracker WHERE notification_tracker.id = event_log.notification_tracker_id ) WHERE event_log.user_id IS NULL AND event_log.notification_tracker_id IS NOT NULL """)) # Add commands_config to telegram_bot if missing if await _has_table(conn, "telegram_bot"): if not await _has_column(conn, "telegram_bot", "commands_config"): await conn.execute( text("ALTER TABLE telegram_bot ADD COLUMN commands_config TEXT DEFAULT '{}'") ) logger.info("Added commands_config column to telegram_bot table") # Add webhook_path_id to telegram_bot if missing if not await _has_column(conn, "telegram_bot", "webhook_path_id"): await conn.execute( text("ALTER TABLE telegram_bot ADD COLUMN webhook_path_id TEXT DEFAULT ''") ) logger.info("Added webhook_path_id column to telegram_bot table") # Backfill existing bots with unique IDs import uuid bots = (await conn.execute(text("SELECT id FROM telegram_bot"))).fetchall() for bot in bots: await conn.execute( text("UPDATE telegram_bot SET webhook_path_id = :wid WHERE id = :bid"), {"wid": uuid.uuid4().hex, "bid": bot[0]}, ) if bots: logger.info("Backfilled webhook_path_id for %d existing bots", len(bots)) # Add update_mode to telegram_bot if missing. # Existing bots pre-date this feature and were implicitly polling; # preserve that behavior. New bots default to "none" via the # SQLModel field default on fresh schemas. if not await _has_column(conn, "telegram_bot", "update_mode"): await conn.execute( text("ALTER TABLE telegram_bot ADD COLUMN update_mode TEXT DEFAULT 'polling'") ) logger.info("Added update_mode column to telegram_bot table") # Add command_template_config_id to command_config if missing if await _has_table(conn, "command_config"): if not await _has_column(conn, "command_config", "command_template_config_id"): await conn.execute( text("ALTER TABLE command_config ADD COLUMN command_template_config_id INTEGER") ) logger.info("Added command_template_config_id column to command_config table") # Add allowed_album_ids (per-chat album scope) to command_tracker_listener if await _has_table(conn, "command_tracker_listener"): if not await _has_column(conn, "command_tracker_listener", "allowed_album_ids"): await conn.execute( text("ALTER TABLE command_tracker_listener ADD COLUMN allowed_album_ids TEXT") ) logger.info("Added allowed_album_ids column to command_tracker_listener table") # Add date_only_format to template_config if missing if await _has_table(conn, "template_config"): if not await _has_column(conn, "template_config", "date_only_format"): await conn.execute( text("ALTER TABLE template_config ADD COLUMN date_only_format TEXT DEFAULT '%d.%m.%Y'") ) logger.info("Added date_only_format column to template_config table") # Add memory_source to tracking_config if missing if await _has_table(conn, "tracking_config"): if not await _has_column(conn, "tracking_config", "memory_source"): await conn.execute( text("ALTER TABLE tracking_config ADD COLUMN memory_source TEXT DEFAULT 'albums'") ) logger.info("Added memory_source column to tracking_config table") # Add filters JSON column to notification_tracker if missing if await _has_table(conn, tracker_table): if not await _has_column(conn, tracker_table, "filters"): await conn.execute( text(f"ALTER TABLE {tracker_table} ADD COLUMN filters TEXT DEFAULT '{{}}'") ) logger.info("Added filters column to %s table", tracker_table) # Drop legacy batch_duration column from notification_tracker. # The field was removed from the SQLModel class but the column still # exists as NOT NULL in older DBs, so INSERTs from the new code fail # with "NOT NULL constraint failed: notification_tracker.batch_duration". if await _has_table(conn, tracker_table): if await _has_column(conn, tracker_table, "batch_duration"): _assert_ident(tracker_table, "table") await conn.execute( text(f"ALTER TABLE {tracker_table} DROP COLUMN batch_duration") ) logger.info( "Dropped legacy batch_duration column from %s table", tracker_table, ) # Add Gitea tracking flags to tracking_config if missing if await _has_table(conn, "tracking_config"): gitea_flags = [ ("track_push", "INTEGER DEFAULT 1"), ("track_issue_opened", "INTEGER DEFAULT 1"), ("track_issue_closed", "INTEGER DEFAULT 1"), ("track_issue_commented", "INTEGER DEFAULT 0"), ("track_pr_opened", "INTEGER DEFAULT 1"), ("track_pr_closed", "INTEGER DEFAULT 1"), ("track_pr_merged", "INTEGER DEFAULT 1"), ("track_pr_commented", "INTEGER DEFAULT 0"), ("track_release_published", "INTEGER DEFAULT 1"), ("track_scheduled_message", "INTEGER DEFAULT 1"), ] for col_name, col_type in gitea_flags: if not await _has_column(conn, "tracking_config", col_name): await conn.execute( text(f"ALTER TABLE tracking_config ADD COLUMN {col_name} {col_type}") ) logger.info("Added %s column to tracking_config table", col_name) # Add Planka tracking flags to tracking_config if missing if await _has_table(conn, "tracking_config"): planka_flags = [ ("track_card_created", "INTEGER DEFAULT 1"), ("track_card_updated", "INTEGER DEFAULT 0"), ("track_card_moved", "INTEGER DEFAULT 1"), ("track_card_deleted", "INTEGER DEFAULT 0"), ("track_card_commented", "INTEGER DEFAULT 1"), ("track_comment_updated", "INTEGER DEFAULT 0"), ("track_board_created", "INTEGER DEFAULT 1"), ("track_board_updated", "INTEGER DEFAULT 0"), ("track_board_deleted", "INTEGER DEFAULT 1"), ("track_list_created", "INTEGER DEFAULT 0"), ("track_list_updated", "INTEGER DEFAULT 0"), ("track_list_deleted", "INTEGER DEFAULT 0"), ("track_attachment_created", "INTEGER DEFAULT 1"), ("track_card_label_added", "INTEGER DEFAULT 0"), ("track_task_completed", "INTEGER DEFAULT 1"), ] for col_name, col_type in planka_flags: if not await _has_column(conn, "tracking_config", col_name): await conn.execute( text(f"ALTER TABLE tracking_config ADD COLUMN {col_name} {col_type}") ) logger.info("Added %s column to tracking_config table", col_name) # Add NUT (UPS) tracking flags to tracking_config if missing if await _has_table(conn, "tracking_config"): nut_flags = [ ("track_ups_online", "INTEGER DEFAULT 1"), ("track_ups_on_battery", "INTEGER DEFAULT 1"), ("track_ups_low_battery", "INTEGER DEFAULT 1"), ("track_ups_battery_restored", "INTEGER DEFAULT 1"), ("track_ups_comms_lost", "INTEGER DEFAULT 1"), ("track_ups_comms_restored", "INTEGER DEFAULT 1"), ("track_ups_replace_battery", "INTEGER DEFAULT 1"), ("track_ups_overload", "INTEGER DEFAULT 1"), ] for col_name, col_type in nut_flags: if not await _has_column(conn, "tracking_config", col_name): await conn.execute( text(f"ALTER TABLE tracking_config ADD COLUMN {col_name} {col_type}") ) logger.info("Added %s column to tracking_config table", col_name) # Add Generic Webhook tracking flag to tracking_config if missing if await _has_table(conn, "tracking_config"): if not await _has_column(conn, "tracking_config", "track_webhook_received"): await conn.execute( text("ALTER TABLE tracking_config ADD COLUMN track_webhook_received INTEGER DEFAULT 1") ) logger.info("Added track_webhook_received column to tracking_config table") # Add quiet hours to tracking_config if missing. # Start/end are nullable HH:MM strings; quiet_hours_enabled gates them. if await _has_table(conn, "tracking_config"): if not await _has_column(conn, "tracking_config", "quiet_hours_enabled"): await conn.execute( text("ALTER TABLE tracking_config ADD COLUMN quiet_hours_enabled INTEGER DEFAULT 0") ) logger.info("Added quiet_hours_enabled column to tracking_config table") for col_name in ("quiet_hours_start", "quiet_hours_end"): if not await _has_column(conn, "tracking_config", col_name): await conn.execute( text(f"ALTER TABLE tracking_config ADD COLUMN {col_name} TEXT") ) logger.info("Added %s column to tracking_config table", col_name) # Drop legacy template content columns from template_config # (template content moved to template_slot child rows) if await _has_table(conn, "template_config"): legacy_cols = [ "message_assets_added", "message_assets_removed", "message_collection_renamed", "message_collection_deleted", "message_sharing_changed", "periodic_summary_message", "scheduled_assets_message", "memory_mode_message", ] for col_name in legacy_cols: if await _has_column(conn, "template_config", col_name): await conn.execute( text(f"ALTER TABLE template_config DROP COLUMN {col_name}") ) logger.info("Dropped legacy column %s from template_config", col_name) # Add collection_name and shared to tracker_state if missing state_table = "notification_tracker_state" if await _has_table(conn, "notification_tracker_state") else "tracker_state" if await _has_table(conn, state_table): if not await _has_column(conn, state_table, "collection_name"): await conn.execute( text(f"ALTER TABLE {state_table} ADD COLUMN collection_name TEXT DEFAULT ''") ) logger.info("Added collection_name column to %s table", state_table) if not await _has_column(conn, state_table, "shared"): await conn.execute( text(f"ALTER TABLE {state_table} ADD COLUMN shared INTEGER DEFAULT 0") ) logger.info("Added shared column to %s table", state_table) # meta_fingerprint — small JSON blob captured from the provider's # cheap meta probe. An empty default means "unknown, do a full # fetch next tick" so existing rows don't wrongly skip detection. if not await _has_column(conn, state_table, "meta_fingerprint"): await conn.execute( text(f"ALTER TABLE {state_table} ADD COLUMN meta_fingerprint TEXT DEFAULT '{{}}'") ) logger.info("Added meta_fingerprint column to %s table", state_table) # Add language_code to telegram_chat if missing if await _has_table(conn, "telegram_chat"): if not await _has_column(conn, "telegram_chat", "language_code"): await conn.execute( text("ALTER TABLE telegram_chat ADD COLUMN language_code TEXT DEFAULT ''") ) logger.info("Added language_code column to telegram_chat table") # Add language_override to telegram_chat if missing if not await _has_column(conn, "telegram_chat", "language_override"): await conn.execute( text("ALTER TABLE telegram_chat ADD COLUMN language_override TEXT DEFAULT ''") ) logger.info("Added language_override column to telegram_chat table") # Add locale to target_receiver if missing if await _has_table(conn, "target_receiver"): if not await _has_column(conn, "target_receiver", "locale"): await conn.execute( text("ALTER TABLE target_receiver ADD COLUMN locale TEXT DEFAULT ''") ) logger.info("Added locale column to target_receiver table") # Add commands_enabled to telegram_chat if missing (default disabled) if not await _has_column(conn, "telegram_chat", "commands_enabled"): await conn.execute( text("ALTER TABLE telegram_chat ADD COLUMN commands_enabled INTEGER DEFAULT 0") ) logger.info("Added commands_enabled column to telegram_chat table") # Add webhook_token to service_provider if missing if await _has_table(conn, "service_provider"): if not await _has_column(conn, "service_provider", "webhook_token"): await conn.execute( text("ALTER TABLE service_provider ADD COLUMN webhook_token TEXT DEFAULT ''") ) logger.info("Added webhook_token column to service_provider table") # Backfill existing providers with unique tokens import uuid providers = (await conn.execute(text("SELECT id FROM service_provider"))).fetchall() for row in providers: await conn.execute( text("UPDATE service_provider SET webhook_token = :tok WHERE id = :pid"), {"tok": uuid.uuid4().hex, "pid": row[0]}, ) if providers: logger.info("Backfilled webhook_token for %d existing providers", len(providers)) # --------------------------------------------------------------------------- # Legacy tracker_target migration (pre-Phase 1) # --------------------------------------------------------------------------- async def migrate_tracker_targets(engine: AsyncEngine) -> None: """Migrate legacy Tracker.target_ids JSON arrays to TrackerTarget rows. Also migrates: - Tracker.tracking_config_id → TrackerTarget.tracking_config_id - Tracker.quiet_hours_* → TrackerTarget.quiet_hours_* - NotificationTarget.template_config_id → TrackerTarget.template_config_id - TelegramBot.commands_config → TrackerTarget.commands_config (for telegram targets) Idempotent: skips if legacy columns don't exist or data already migrated. """ async with engine.begin() as conn: # Determine which table name exists (pre- or post-rename) if await _has_table(conn, "tracker"): tracker_table = "tracker" tt_table = "tracker_target" tracker_id_col = "tracker_id" elif await _has_table(conn, "notification_tracker"): tracker_table = "notification_tracker" tt_table = "notification_tracker_target" tracker_id_col = "notification_tracker_id" else: logger.debug("No tracker table found — skipping migration") return # Check if legacy target_ids column exists if not await _has_column(conn, tracker_table, "target_ids"): logger.debug("No legacy target_ids column found — skipping migration") return # Check if junction table already has data if await _has_table(conn, tt_table): tt_count = ( await conn.execute(text(f"SELECT COUNT(*) FROM {tt_table}")) ).scalar() if tt_count and tt_count > 0: logger.debug( "%s table already has %d rows — skipping migration", tt_table, tt_count, ) return # Load legacy data trackers = ( await conn.execute( text( f"SELECT id, target_ids, tracking_config_id, " f"quiet_hours_start, quiet_hours_end FROM {tracker_table}" ) ) ).fetchall() if not trackers: logger.debug("No trackers to migrate") return # Load template_config_id from targets (legacy field) target_template_map: dict[int, int | None] = {} if await _has_column(conn, "notification_target", "template_config_id"): targets = ( await conn.execute( text("SELECT id, template_config_id FROM notification_target") ) ).fetchall() for t in targets: target_template_map[t[0]] = t[1] # Load commands_config from telegram_bots (legacy field) bot_commands_map: dict[int, str | None] = {} if await _has_column(conn, "telegram_bot", "commands_config"): bots = ( await conn.execute( text("SELECT id, commands_config FROM telegram_bot") ) ).fetchall() for b in bots: bot_commands_map[b[0]] = b[1] # Build target → bot mapping for commands_config migration target_bot_map: dict[int, int] = {} if bot_commands_map: tgt_rows = ( await conn.execute( text("SELECT id, config FROM notification_target WHERE type='telegram'") ) ).fetchall() for tgt in tgt_rows: try: cfg = json.loads(tgt[1]) if isinstance(tgt[1], str) else tgt[1] if cfg and "bot_token" in cfg: for bot_id, _ in bot_commands_map.items(): bot_token_row = ( await conn.execute( text("SELECT token FROM telegram_bot WHERE id=:bid"), {"bid": bot_id}, ) ).fetchone() if bot_token_row and bot_token_row[0] == cfg.get("bot_token"): target_bot_map[tgt[0]] = bot_id except Exception: logger.warning( "Failed to match bot token for target %s", tgt[0], exc_info=True, ) # Create junction rows migrated = 0 for tracker in trackers: tracker_id = tracker[0] raw_target_ids = tracker[1] tracking_config_id = tracker[2] quiet_hours_start = tracker[3] quiet_hours_end = tracker[4] if isinstance(raw_target_ids, str): try: target_ids = json.loads(raw_target_ids) except (json.JSONDecodeError, TypeError): target_ids = [] elif isinstance(raw_target_ids, list): target_ids = raw_target_ids else: target_ids = [] for target_id in target_ids: template_config_id = target_template_map.get(target_id) commands_config = None if target_id in target_bot_map: bot_id = target_bot_map[target_id] raw_cmd = bot_commands_map.get(bot_id) if raw_cmd: commands_config = ( raw_cmd if isinstance(raw_cmd, str) else json.dumps(raw_cmd) ) await conn.execute( text( f"INSERT INTO {tt_table} " f"({tracker_id_col}, target_id, tracking_config_id, " f"template_config_id, enabled, quiet_hours_start, " f"quiet_hours_end, commands_config) " f"VALUES (:tid, :tgtid, :tcid, :tmplid, 1, :qhs, :qhe, :cmd)" ), { "tid": tracker_id, "tgtid": target_id, "tcid": tracking_config_id, "tmplid": template_config_id, "qhs": quiet_hours_start, "qhe": quiet_hours_end, "cmd": commands_config, }, ) migrated += 1 logger.info("Migrated %d tracker-target links", migrated) # --------------------------------------------------------------------------- # Phase 1: Entity refactor migration # --------------------------------------------------------------------------- async def migrate_entity_refactor(engine: AsyncEngine) -> None: """Phase 1 entity refactor — rename tables, add columns, create new tables. Fully idempotent: every operation checks preconditions before acting. """ async with engine.begin() as conn: # ------------------------------------------------------------------ # 1. Rename table: tracker → notification_tracker # ------------------------------------------------------------------ if await _has_table(conn, "tracker") and not await _has_table(conn, "notification_tracker"): await conn.execute(text("ALTER TABLE tracker RENAME TO notification_tracker")) logger.info("Renamed table tracker → notification_tracker") # ------------------------------------------------------------------ # 2. Rename table: tracker_target → notification_tracker_target # and rename column tracker_id → notification_tracker_id # ------------------------------------------------------------------ if await _has_table(conn, "tracker_target") and not await _has_table(conn, "notification_tracker_target"): # SQLite doesn't support RENAME COLUMN in older versions, so we # recreate the table with the new column name. await conn.execute(text( "CREATE TABLE notification_tracker_target (" " id INTEGER PRIMARY KEY," " notification_tracker_id INTEGER REFERENCES notification_tracker(id)," " target_id INTEGER REFERENCES notification_target(id)," " tracking_config_id INTEGER REFERENCES tracking_config(id)," " template_config_id INTEGER REFERENCES template_config(id)," " enabled INTEGER DEFAULT 1," " quiet_hours_start TEXT," " quiet_hours_end TEXT," " commands_config TEXT," " created_at TIMESTAMP" ")" )) await conn.execute(text( "INSERT INTO notification_tracker_target " "(id, notification_tracker_id, target_id, tracking_config_id, " "template_config_id, enabled, quiet_hours_start, quiet_hours_end, " "commands_config, created_at) " "SELECT id, tracker_id, target_id, tracking_config_id, " "template_config_id, enabled, quiet_hours_start, quiet_hours_end, " "commands_config, created_at " "FROM tracker_target" )) await conn.execute(text("DROP TABLE tracker_target")) logger.info("Renamed table tracker_target → notification_tracker_target (with column rename tracker_id → notification_tracker_id)") # ------------------------------------------------------------------ # 3. Rename table: tracker_state → notification_tracker_state # and rename column tracker_id → notification_tracker_id # ------------------------------------------------------------------ if await _has_table(conn, "tracker_state") and not await _has_table(conn, "notification_tracker_state"): await conn.execute(text( "CREATE TABLE notification_tracker_state (" " id INTEGER PRIMARY KEY," " notification_tracker_id INTEGER REFERENCES notification_tracker(id)," " collection_id TEXT," " collection_name TEXT DEFAULT ''," " shared INTEGER DEFAULT 0," " asset_ids TEXT," " pending_asset_ids TEXT," " last_updated TIMESTAMP" ")" )) await conn.execute(text( "INSERT INTO notification_tracker_state " "(id, notification_tracker_id, collection_id, collection_name, " "shared, asset_ids, pending_asset_ids, last_updated) " "SELECT id, tracker_id, collection_id, collection_name, " "shared, asset_ids, pending_asset_ids, last_updated " "FROM tracker_state" )) await conn.execute(text("DROP TABLE tracker_state")) logger.info("Renamed table tracker_state → notification_tracker_state (with column rename tracker_id → notification_tracker_id)") # ------------------------------------------------------------------ # 4. Add chat_action column to notification_target # ------------------------------------------------------------------ if await _has_table(conn, "notification_target"): if not await _has_column(conn, "notification_target", "chat_action"): await conn.execute( text("ALTER TABLE notification_target ADD COLUMN chat_action TEXT") ) logger.info("Added chat_action column to notification_target table") # ------------------------------------------------------------------ # 5. Rename tracker_id → notification_tracker_id in event_log # ------------------------------------------------------------------ if await _has_table(conn, "event_log"): if await _has_column(conn, "event_log", "tracker_id") and not await _has_column(conn, "event_log", "notification_tracker_id"): # Recreate event_log with renamed column await conn.execute(text( "CREATE TABLE event_log_new (" " id INTEGER PRIMARY KEY," " notification_tracker_id INTEGER REFERENCES notification_tracker(id)," " tracker_name TEXT DEFAULT ''," " provider_id INTEGER," " provider_name TEXT DEFAULT ''," " event_type TEXT," " collection_id TEXT," " collection_name TEXT," " assets_count INTEGER DEFAULT 0," " details TEXT," " created_at TIMESTAMP" ")" )) await conn.execute(text( "INSERT INTO event_log_new " "(id, notification_tracker_id, tracker_name, provider_id, " "provider_name, event_type, collection_id, collection_name, " "assets_count, details, created_at) " "SELECT id, tracker_id, tracker_name, provider_id, " "provider_name, event_type, collection_id, collection_name, " "assets_count, details, created_at " "FROM event_log" )) await conn.execute(text("DROP TABLE event_log")) await conn.execute(text("ALTER TABLE event_log_new RENAME TO event_log")) logger.info("Renamed column tracker_id → notification_tracker_id in event_log") # ------------------------------------------------------------------ # 6. Create command_config table # ------------------------------------------------------------------ if not await _has_table(conn, "command_config"): await conn.execute(text( "CREATE TABLE command_config (" " id INTEGER PRIMARY KEY," " user_id INTEGER NOT NULL REFERENCES user(id)," " provider_type TEXT NOT NULL," " name TEXT NOT NULL," " icon TEXT DEFAULT ''," " enabled_commands TEXT DEFAULT '[]'," " locale TEXT DEFAULT 'en'," " response_mode TEXT DEFAULT 'media'," " default_count INTEGER DEFAULT 5," " rate_limits TEXT DEFAULT '{}'," " created_at TIMESTAMP" ")" )) logger.info("Created command_config table") else: # Backfill locale column for tables created before locale was on the model if not await _has_column(conn, "command_config", "locale"): await conn.execute( text("ALTER TABLE command_config ADD COLUMN locale TEXT DEFAULT 'en'") ) logger.info("Added locale column to command_config table") # ------------------------------------------------------------------ # 7. Create command_tracker table # ------------------------------------------------------------------ if not await _has_table(conn, "command_tracker"): await conn.execute(text( "CREATE TABLE command_tracker (" " id INTEGER PRIMARY KEY," " user_id INTEGER NOT NULL REFERENCES user(id)," " provider_id INTEGER NOT NULL REFERENCES service_provider(id)," " command_config_id INTEGER NOT NULL REFERENCES command_config(id)," " name TEXT NOT NULL," " icon TEXT DEFAULT ''," " enabled INTEGER DEFAULT 1," " created_at TIMESTAMP" ")" )) logger.info("Created command_tracker table") # ------------------------------------------------------------------ # 8. Create command_tracker_listener table # ------------------------------------------------------------------ if not await _has_table(conn, "command_tracker_listener"): await conn.execute(text( "CREATE TABLE command_tracker_listener (" " id INTEGER PRIMARY KEY," " command_tracker_id INTEGER NOT NULL REFERENCES command_tracker(id)," " listener_type TEXT NOT NULL," " listener_id INTEGER NOT NULL," " created_at TIMESTAMP," " UNIQUE(command_tracker_id, listener_type, listener_id)" ")" )) logger.info("Created command_tracker_listener table") # ------------------------------------------------------------------ # 9. Migrate TelegramBot.commands_config → CommandConfig rows # ------------------------------------------------------------------ if await _has_table(conn, "telegram_bot") and await _has_column(conn, "telegram_bot", "commands_config"): # Only migrate if command_config table is empty (idempotent) cc_count = (await conn.execute(text("SELECT COUNT(*) FROM command_config"))).scalar() if cc_count == 0: bots = (await conn.execute(text( "SELECT id, user_id, commands_config FROM telegram_bot" ))).fetchall() migrated = 0 for bot in bots: bot_id, user_id, raw_config = bot[0], bot[1], bot[2] if not raw_config: continue try: cfg = json.loads(raw_config) if isinstance(raw_config, str) else raw_config except (json.JSONDecodeError, TypeError): continue # Skip empty/default configs if not cfg or cfg == {}: continue # Extract fields from legacy commands_config enabled_commands = json.dumps(cfg.get("enabled_commands", [])) locale = cfg.get("locale", "en") response_mode = cfg.get("response_mode", "media") default_count = cfg.get("default_count", 5) rate_limits = json.dumps(cfg.get("rate_limits", {})) provider_type = cfg.get("provider_type", "immich") await conn.execute( text( "INSERT INTO command_config " "(user_id, provider_type, name, enabled_commands, locale, " "response_mode, default_count, rate_limits, created_at) " "VALUES (:uid, :pt, :name, :ec, :locale, :rm, :dc, :rl, CURRENT_TIMESTAMP)" ), { "uid": user_id, "pt": provider_type, "name": f"Bot #{bot_id} Commands", "ec": enabled_commands, "locale": locale, "rm": response_mode, "dc": default_count, "rl": rate_limits, }, ) migrated += 1 if migrated: logger.info("Migrated %d bot commands_config → command_config rows", migrated) # NOTE: We intentionally do NOT drop commands_config from telegram_bot # or notification_tracker_target. SQLite doesn't support DROP COLUMN in # all versions, and SQLModel will simply ignore columns not defined on # the model class. The columns will remain in the DB but are unused. # --------------------------------------------------------------------------- # Template slot migration # --------------------------------------------------------------------------- # Old column names that existed on template_config before the slot refactor _LEGACY_TEMPLATE_COLUMNS = [ "message_assets_added", "message_assets_removed", "message_collection_renamed", "message_collection_deleted", "message_sharing_changed", "periodic_summary_message", "scheduled_assets_message", "memory_mode_message", ] async def migrate_template_slots(engine: AsyncEngine) -> None: """Migrate legacy TemplateConfig column-based templates to TemplateSlot rows. Reads the old per-column template values via raw SQL (since they're no longer on the SQLModel class) and inserts them as TemplateSlot rows. Idempotent: skips if template_slot table already has data or legacy columns don't exist. """ async with engine.begin() as conn: if not await _has_table(conn, "template_config"): return # Check if the legacy columns still exist in the DB has_legacy = await _has_column(conn, "template_config", "message_assets_added") if not has_legacy: logger.debug("No legacy template columns found — skipping slot migration") return # Check if template_slot table exists and already has data if await _has_table(conn, "template_slot"): slot_count = (await conn.execute(text("SELECT COUNT(*) FROM template_slot"))).scalar() if slot_count and slot_count > 0: logger.debug("template_slot table already has %d rows — skipping migration", slot_count) return # Create template_slot table if it doesn't exist yet # (SQLModel.metadata.create_all may have already created it, but be safe) if not await _has_table(conn, "template_slot"): await conn.execute(text( "CREATE TABLE template_slot (" " id INTEGER PRIMARY KEY," " config_id INTEGER NOT NULL REFERENCES template_config(id)," " slot_name TEXT NOT NULL," " template TEXT DEFAULT ''," " UNIQUE(config_id, slot_name)" ")" )) logger.info("Created template_slot table") # Read all template configs with their legacy column values col_list = ", ".join(_LEGACY_TEMPLATE_COLUMNS) rows = (await conn.execute( text(f"SELECT id, {col_list} FROM template_config") )).fetchall() migrated = 0 for row in rows: config_id = row[0] for i, col_name in enumerate(_LEGACY_TEMPLATE_COLUMNS): template_text = row[i + 1] or "" if template_text.strip(): await conn.execute( text( "INSERT INTO template_slot (config_id, slot_name, template) " "VALUES (:cid, :sn, :tmpl)" ), {"cid": config_id, "sn": col_name, "tmpl": template_text}, ) migrated += 1 if migrated: logger.info("Migrated %d template slots from legacy columns", migrated) # --------------------------------------------------------------------------- # Target receiver migration # --------------------------------------------------------------------------- async def migrate_target_receivers(engine: AsyncEngine) -> None: """Migrate single chat_id/url from NotificationTarget.config to TargetReceiver rows. For each existing target that has a chat_id or url in its config JSON and no receivers yet, creates a TargetReceiver row. Idempotent: skips targets that already have receivers. """ async with engine.begin() as conn: if not await _has_table(conn, "notification_target"): return # Create target_receiver table if it doesn't exist yet if not await _has_table(conn, "target_receiver"): await conn.execute(text( "CREATE TABLE target_receiver (" " id INTEGER PRIMARY KEY," " target_id INTEGER NOT NULL REFERENCES notification_target(id)," " name TEXT DEFAULT ''," " config TEXT DEFAULT '{}'," " receiver_key TEXT DEFAULT ''," " enabled INTEGER DEFAULT 1," " created_at TIMESTAMP," " UNIQUE(target_id, receiver_key)" ")" )) logger.info("Created target_receiver table") # Check if any receivers already exist if await _has_table(conn, "target_receiver"): recv_count = (await conn.execute(text("SELECT COUNT(*) FROM target_receiver"))).scalar() if recv_count and recv_count > 0: logger.debug("target_receiver already has %d rows — skipping migration", recv_count) return # Read all targets targets = (await conn.execute( text("SELECT id, type, config FROM notification_target") )).fetchall() migrated = 0 for row in targets: target_id, target_type, raw_config = row[0], row[1], row[2] try: cfg = json.loads(raw_config) if isinstance(raw_config, str) else (raw_config or {}) except (json.JSONDecodeError, TypeError): cfg = {} receiver_key = "" receiver_config = {} receiver_name = "" if target_type == "telegram": chat_id = cfg.get("chat_id", "") if chat_id: receiver_key = str(chat_id) receiver_config = {"chat_id": str(chat_id)} receiver_name = f"Chat {chat_id}" elif target_type == "webhook": url = cfg.get("url", "") if url: receiver_key = url receiver_config = {"url": url, "headers": cfg.get("headers", {})} receiver_name = url[:50] if receiver_key: await conn.execute( text( "INSERT INTO target_receiver (target_id, name, config, receiver_key, enabled, created_at) " "VALUES (:tid, :name, :cfg, :rk, 1, CURRENT_TIMESTAMP)" ), { "tid": target_id, "name": receiver_name, "cfg": json.dumps(receiver_config), "rk": receiver_key, }, ) migrated += 1 if migrated: logger.info("Migrated %d target receivers from legacy config", migrated) async def migrate_receivers_from_config(engine: AsyncEngine) -> None: """Extract delivery endpoint fields from target.config into TargetReceiver rows. For each NotificationTarget that still has a delivery field (chat_id, url, webhook_url, email, topic, room_id) in its config JSON: 1. Create a TargetReceiver row (if one with the same key doesn't exist) 2. Remove the delivery field(s) from the config JSON Idempotent: checks for existing receiver before creating; only strips fields that are still present in config. """ # Mapping: target_type -> (delivery field in config, receiver config builder) _DELIVERY_FIELDS: dict[str, dict[str, str]] = { "telegram": {"chat_id": "chat_id"}, "webhook": {"url": "url"}, "email": {"email": "email"}, "discord": {"webhook_url": "webhook_url"}, "slack": {"webhook_url": "webhook_url"}, "ntfy": {"topic": "topic"}, "matrix": {"room_id": "room_id"}, } async with engine.begin() as conn: if not await _has_table(conn, "notification_target"): return if not await _has_table(conn, "target_receiver"): return targets = (await conn.execute( text("SELECT id, type, config FROM notification_target") )).fetchall() created = 0 cleaned = 0 for row in targets: target_id, target_type, raw_config = row[0], row[1], row[2] try: cfg = json.loads(raw_config) if isinstance(raw_config, str) else (raw_config or {}) except (json.JSONDecodeError, TypeError): cfg = {} field_map = _DELIVERY_FIELDS.get(target_type, {}) if not field_map: continue # Check if any delivery field is present in config delivery_field = list(field_map.keys())[0] # e.g. "chat_id", "url" delivery_value = cfg.get(delivery_field) if not delivery_value: continue # Build receiver config receiver_config: dict[str, Any] = {delivery_field: delivery_value} # For webhook, also move headers to receiver config if target_type == "webhook" and "headers" in cfg: receiver_config["headers"] = cfg["headers"] receiver_key = str(delivery_value) # Check if receiver already exists existing = (await conn.execute( text( "SELECT id FROM target_receiver " "WHERE target_id = :tid AND receiver_key = :rk" ), {"tid": target_id, "rk": receiver_key}, )).fetchone() if not existing: # Derive a name for the receiver if target_type == "telegram": name = f"Chat {delivery_value}" elif target_type == "webhook": name = str(delivery_value)[:50] elif target_type == "email": name = str(delivery_value) else: name = str(delivery_value)[:50] await conn.execute( text( "INSERT INTO target_receiver " "(target_id, name, config, receiver_key, enabled, created_at) " "VALUES (:tid, :name, :cfg, :rk, 1, CURRENT_TIMESTAMP)" ), { "tid": target_id, "name": name, "cfg": json.dumps(receiver_config), "rk": receiver_key, }, ) created += 1 # Remove delivery fields from config new_cfg = dict(cfg) new_cfg.pop(delivery_field, None) # For webhook, also remove headers (moved to receiver) if target_type == "webhook": new_cfg.pop("headers", None) if new_cfg != cfg: await conn.execute( text( "UPDATE notification_target SET config = :cfg WHERE id = :tid" ), {"cfg": json.dumps(new_cfg), "tid": target_id}, ) cleaned += 1 if created: logger.info("Created %d receiver rows from target config delivery fields", created) if cleaned: logger.info("Cleaned delivery fields from %d target configs", cleaned) async def migrate_template_locale(engine: AsyncEngine) -> None: """Add locale column to template_config and command_template_config. Backfill locale from name: "(RU)" -> "ru", else "en" for system-owned rows. """ async with engine.begin() as conn: for table in ("template_config", "command_template_config"): if await _has_column(conn, table, "locale"): continue logger.info("Adding locale column to %s", table) await conn.execute(text(f"ALTER TABLE {table} ADD COLUMN locale TEXT DEFAULT ''")) # Backfill system-owned rows await conn.execute(text( f"UPDATE {table} SET locale = 'ru' WHERE user_id = 0 AND name LIKE '%(RU)%'" )) await conn.execute(text( f"UPDATE {table} SET locale = 'en' WHERE user_id = 0 AND locale = ''" )) async def migrate_command_slot_locale(engine: AsyncEngine) -> None: """Add locale column to command_template_slot and merge system EN/RU configs. 1. Recreate command_template_slot with locale column and new unique constraint 2. Backfill locale from parent config's locale (or 'en') 3. Merge "Default Commands (RU)" slots into "Default Commands (EN)" with locale='ru' 4. Rename merged config, update references, delete orphan RU config """ async with engine.begin() as conn: if not await _has_table(conn, "command_template_slot"): return # Skip if locale column already exists (idempotent) if await _has_column(conn, "command_template_slot", "locale"): return logger.info("Adding locale column to command_template_slot and merging system configs") # Step 1: Recreate table with locale column and new unique constraint await conn.execute(text( "CREATE TABLE command_template_slot_new (" " id INTEGER PRIMARY KEY," " config_id INTEGER NOT NULL REFERENCES command_template_config(id)," " slot_name TEXT NOT NULL," " locale TEXT NOT NULL DEFAULT 'en'," " template TEXT DEFAULT ''," " UNIQUE(config_id, slot_name, locale)" ")" )) # Step 2: Copy existing data, deriving locale from parent config await conn.execute(text( "INSERT INTO command_template_slot_new (id, config_id, slot_name, locale, template) " "SELECT s.id, s.config_id, s.slot_name, " " CASE WHEN c.locale != '' THEN c.locale ELSE 'en' END, " " s.template " "FROM command_template_slot s " "LEFT JOIN command_template_config c ON s.config_id = c.id" )) await conn.execute(text("DROP TABLE command_template_slot")) await conn.execute(text( "ALTER TABLE command_template_slot_new RENAME TO command_template_slot" )) # Step 3: Merge system EN/RU configs into one # Find the system EN and RU config IDs en_row = (await conn.execute(text( "SELECT id FROM command_template_config " "WHERE user_id = 0 AND (locale = 'en' OR name LIKE '%(EN)%') " "LIMIT 1" ))).fetchone() ru_row = (await conn.execute(text( "SELECT id FROM command_template_config " "WHERE user_id = 0 AND (locale = 'ru' OR name LIKE '%(RU)%') " "LIMIT 1" ))).fetchone() if en_row and ru_row and en_row[0] != ru_row[0]: en_id, ru_id = en_row[0], ru_row[0] # Move RU slots to the EN config (they already have locale='ru') await conn.execute(text( "UPDATE command_template_slot SET config_id = :en_id " "WHERE config_id = :ru_id" ), {"en_id": en_id, "ru_id": ru_id}) # Update any command_config references from RU to EN if await _has_table(conn, "command_config"): await conn.execute(text( "UPDATE command_config SET command_template_config_id = :en_id " "WHERE command_template_config_id = :ru_id" ), {"en_id": en_id, "ru_id": ru_id}) # Delete the orphan RU config await conn.execute(text( "DELETE FROM command_template_config WHERE id = :ru_id" ), {"ru_id": ru_id}) # Rename the merged config await conn.execute(text( "UPDATE command_template_config SET name = 'Default Commands', " "description = 'Default Immich command templates', locale = '' " "WHERE id = :en_id" ), {"en_id": en_id}) logger.info( "Merged system command template configs (EN=%d, RU=%d) into single config %d", en_id, ru_id, en_id, ) async def migrate_notification_slot_locale(engine: AsyncEngine) -> None: """Add locale column to template_slot and merge system EN/RU configs per provider. 1. Recreate template_slot with locale column and new unique constraint 2. Backfill locale from parent config's locale (or 'en') 3. For each provider: merge "Default X (RU)" slots into "Default X (EN)" with locale='ru' 4. Rename merged configs, update references, delete orphan RU configs """ async with engine.begin() as conn: if not await _has_table(conn, "template_slot"): return # Skip if locale column already exists (idempotent) if await _has_column(conn, "template_slot", "locale"): return logger.info("Adding locale column to template_slot and merging system configs") # Step 1: Recreate table with locale column and new unique constraint await conn.execute(text( "CREATE TABLE template_slot_new (" " id INTEGER PRIMARY KEY," " config_id INTEGER NOT NULL REFERENCES template_config(id)," " slot_name TEXT NOT NULL," " locale TEXT NOT NULL DEFAULT 'en'," " template TEXT DEFAULT ''," " UNIQUE(config_id, slot_name, locale)" ")" )) # Step 2: Copy existing data, deriving locale from parent config await conn.execute(text( "INSERT INTO template_slot_new (id, config_id, slot_name, locale, template) " "SELECT s.id, s.config_id, s.slot_name, " " CASE WHEN c.locale != '' THEN c.locale ELSE 'en' END, " " s.template " "FROM template_slot s " "LEFT JOIN template_config c ON s.config_id = c.id" )) await conn.execute(text("DROP TABLE template_slot")) await conn.execute(text( "ALTER TABLE template_slot_new RENAME TO template_slot" )) # Step 3: Merge system EN/RU configs per provider type providers = (await conn.execute(text( "SELECT DISTINCT provider_type FROM template_config WHERE user_id = 0" ))).fetchall() for (provider_type,) in providers: en_row = (await conn.execute(text( "SELECT id FROM template_config " "WHERE user_id = 0 AND provider_type = :pt " " AND (locale = 'en' OR name LIKE '%(EN)%') " "LIMIT 1" ), {"pt": provider_type})).fetchone() ru_row = (await conn.execute(text( "SELECT id FROM template_config " "WHERE user_id = 0 AND provider_type = :pt " " AND (locale = 'ru' OR name LIKE '%(RU)%') " "LIMIT 1" ), {"pt": provider_type})).fetchone() if en_row and ru_row and en_row[0] != ru_row[0]: en_id, ru_id = en_row[0], ru_row[0] # Move RU slots to the EN config (they already have locale='ru') await conn.execute(text( "UPDATE template_slot SET config_id = :en_id " "WHERE config_id = :ru_id" ), {"en_id": en_id, "ru_id": ru_id}) # Update notification_tracker_target references from RU to EN if await _has_table(conn, "notification_tracker_target"): await conn.execute(text( "UPDATE notification_tracker_target SET template_config_id = :en_id " "WHERE template_config_id = :ru_id" ), {"en_id": en_id, "ru_id": ru_id}) # Delete the orphan RU config await conn.execute(text( "DELETE FROM template_config WHERE id = :ru_id" ), {"ru_id": ru_id}) # Rename the merged config (strip locale suffix) label = provider_type.capitalize() await conn.execute(text( "UPDATE template_config SET name = :name, " "description = :desc, locale = '' " "WHERE id = :en_id" ), {"name": f"Default {label}", "desc": f"Default {label} templates", "en_id": en_id}) logger.info( "Merged system notification template configs for %s (EN=%d, RU=%d) into %d", provider_type, en_id, ru_id, en_id, ) async def migrate_user_token_version(engine: AsyncEngine) -> None: """Add token_version column to user for JWT revocation on password change.""" async with engine.begin() as conn: if not await _has_table(conn, "user"): return if not await _has_column(conn, "user", "token_version"): await conn.execute( text("ALTER TABLE user ADD COLUMN token_version INTEGER NOT NULL DEFAULT 1") ) logger.info("Added token_version column to user table") # --------------------------------------------------------------------------- # Performance indexes — covers every FK / owner column the list endpoints # and the webhook hot-path filter on. All use CREATE INDEX IF NOT EXISTS so # they are safe to re-run on every boot. # --------------------------------------------------------------------------- _INDEXES: list[tuple[str, str, str]] = [ # (index_name, table, columns) ("ix_service_provider_user_id", "service_provider", "user_id"), ("ix_telegram_bot_user_id", "telegram_bot", "user_id"), ("ix_matrix_bot_user_id", "matrix_bot", "user_id"), ("ix_email_bot_user_id", "email_bot", "user_id"), ("ix_telegram_chat_bot_id", "telegram_chat", "bot_id"), ("ix_tracking_config_user_id", "tracking_config", "user_id"), ("ix_tracking_config_provider_type", "tracking_config", "provider_type"), ("ix_notification_target_user_id", "notification_target", "user_id"), ("ix_notification_target_type", "notification_target", "type"), ("ix_notification_tracker_user_id", "notification_tracker", "user_id"), ("ix_notification_tracker_provider_id", "notification_tracker", "provider_id"), # Composite for the webhook hot path: WHERE provider_id = ? AND enabled = true ( "ix_notification_tracker_provider_enabled", "notification_tracker", "provider_id, enabled", ), ("ix_command_config_user_id", "command_config", "user_id"), ("ix_command_template_config_user_id", "command_template_config", "user_id"), ("ix_command_tracker_user_id", "command_tracker", "user_id"), ("ix_command_tracker_provider_id", "command_tracker", "provider_id"), ("ix_action_user_id", "action", "user_id"), ("ix_action_provider_id", "action", "provider_id"), # Dashboard: SELECT event_log WHERE user_id = ? ORDER BY created_at DESC ("ix_event_log_user_created", "event_log", "user_id, created_at DESC"), ("ix_event_log_provider_id", "event_log", "provider_id"), ("ix_event_log_notification_tracker_id", "event_log", "notification_tracker_id"), ("ix_event_log_action_id", "event_log", "action_id"), # Webhook log hot path: WHERE provider_id = ? ORDER BY created_at DESC ( "ix_webhook_payload_log_provider_created", "webhook_payload_log", "provider_id, created_at DESC", ), # Notification tracker join tables ( "ix_notification_tracker_target_notification_tracker_id", "notification_tracker_target", "notification_tracker_id", ), ( "ix_notification_tracker_target_target_id", "notification_tracker_target", "target_id", ), ("ix_target_receiver_target_id", "target_receiver", "target_id"), ("ix_template_slot_config_id", "template_slot", "config_id"), ("ix_command_template_slot_config_id", "command_template_slot", "config_id"), ("ix_action_rule_action_id", "action_rule", "action_id"), ("ix_action_execution_action_started", "action_execution", "action_id, started_at DESC"), ] async def migrate_performance_indexes(engine: AsyncEngine) -> None: """Create missing performance indexes on hot query paths. Every index is created with IF NOT EXISTS so the migration is safe to replay on every boot. We only create the index when the table exists — early boots before other migrations land would otherwise raise. """ async with engine.begin() as conn: for name, table, columns in _INDEXES: _assert_ident(name, "index") _assert_ident(table, "table") # Columns list is a trusted literal constructed above — never user input. if not await _has_table(conn, table): continue try: await conn.execute( text(f"CREATE INDEX IF NOT EXISTS {name} ON {table} ({columns})") ) except Exception: # pragma: no cover — log and continue logger.warning( "Failed to create index %s on %s(%s)", name, table, columns, exc_info=True, ) async def migrate_chat_action_to_column(engine: AsyncEngine) -> None: """Move ``chat_action`` from ``config`` JSON to the dedicated column. Earlier versions of the frontend stored ``chat_action`` inside ``notification_target.config``; the dedicated ``chat_action`` column was rarely set or held a stale default. The dispatcher's resolver overrode the config value with the (stale) column, so a user's UI choice silently had no effect on outgoing chat actions. This backfill takes the config value as authoritative (it's what the UI was writing) and copies it to the column, then strips it from config so the column becomes the single source of truth. Idempotent: a second run finds nothing to migrate. """ async with engine.begin() as conn: if not await _has_table(conn, "notification_target"): return if not await _has_column(conn, "notification_target", "chat_action"): return # Copy config["chat_action"] → column where present. await conn.execute(text( "UPDATE notification_target " "SET chat_action = json_extract(config, '$.chat_action') " "WHERE json_extract(config, '$.chat_action') IS NOT NULL" )) # Strip the legacy key so the column is unambiguous going forward. await conn.execute(text( "UPDATE notification_target " "SET config = json_remove(config, '$.chat_action') " "WHERE json_extract(config, '$.chat_action') IS NOT NULL" )) logger.info("Migrated chat_action from config JSON to column where present") # --------------------------------------------------------------------------- # Schema version tracking — lightweight alternative to Alembic while the # hand-rolled idempotent migrations remain the source of truth. Gives # operators a single-row answer to "what schema is this DB at" and lets # future upgrades short-circuit migrations that already ran. # --------------------------------------------------------------------------- CURRENT_SCHEMA_VERSION = 1 async def migrate_schema_version(engine: AsyncEngine) -> None: """Create schema_version table and bump it to CURRENT_SCHEMA_VERSION.""" async with engine.begin() as conn: await conn.execute( text( "CREATE TABLE IF NOT EXISTS schema_version (" " id INTEGER PRIMARY KEY CHECK (id = 1)," " version INTEGER NOT NULL," " applied_at TEXT NOT NULL" ")" ) ) row = await conn.run_sync( lambda sc: sc.execute( text("SELECT version FROM schema_version WHERE id = 1") ).fetchone() ) from datetime import datetime, timezone now = datetime.now(timezone.utc).isoformat() if row is None: await conn.execute( text( "INSERT INTO schema_version (id, version, applied_at) " "VALUES (1, :v, :t)" ), {"v": CURRENT_SCHEMA_VERSION, "t": now}, ) logger.info("Initialized schema_version at %d", CURRENT_SCHEMA_VERSION) elif int(row[0]) < CURRENT_SCHEMA_VERSION: await conn.execute( text( "UPDATE schema_version SET version = :v, applied_at = :t " "WHERE id = 1" ), {"v": CURRENT_SCHEMA_VERSION, "t": now}, ) logger.info( "Bumped schema_version from %s to %d", row[0], CURRENT_SCHEMA_VERSION, )