Files
notify-bridge/packages/server/src/notify_bridge_server/database/migrations.py
T
alexei.dolgolyov fe38d20b96 perf(immich): skip full album fetch on idle ticks; delta-fetch for active ones
Optimizes polling for large Immich albums (tested path targets ~200k
assets). Combined impact on idle albums drops per-tick cost from ~150 MB
fetch to ~few hundred bytes; active albums fetch O(changes) instead of
O(library).

Core changes
- ImmichAlbumMeta + get_album_meta() using ?withoutAssets=true as a
  cheap change-detection probe.
- poll() fast-path: skip full fetch when meta fingerprint matches and
  no pending assets are outstanding.
- poll() delta-path: search/metadata with updatedAfter when fingerprint
  changed, falling back to full fetch on count decrease or mixed
  add+remove that delta can't reconcile.
- asyncio.gather over meta probes so a 20-album tracker pays one
  round-trip of latency instead of 20.
- Event payload cap (50 added / 200 removed) so a bulk import can't
  explode a Jinja template or exceed Telegram's message limits.
- Module-level users cache (1h TTL, sha256-keyed) shared across
  providers on the same Immich server.
- Tick-scoped shared-links cache via new
  get_all_shared_links_by_album() — one /api/shared-links request per
  tick instead of one per changed album.

Server changes
- meta_fingerprint JSON column on NotificationTrackerState + migration.
- watcher skips the asset_ids DB rewrite when the fingerprint didn't
  change, avoiding ~8 MB JSON writes on idle ticks for huge albums.
- Adaptive polling: after 10 empty ticks skip 1-in-2, after 30 skip
  1-in-4, reset on first detected change; resets on schedule changes.
- APScheduler jitter (interval/4, capped at 30s) to smooth thundering-
  herd bursts when many trackers share the same scan_interval.
2026-04-22 18:55:26 +03:00

1285 lines
59 KiB
Python

"""Data migrations for schema changes.
Handles converting legacy JSON-array relationships to proper junction tables,
and the Phase 1 entity refactor (tracker → notification_tracker, etc.).
"""
import json
import logging
from typing import Any
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_IDENT_RE = __import__("re").compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
def _assert_ident(ident: str, kind: str = "identifier") -> str:
"""Guard against SQL injection in dynamically interpolated identifiers.
All table/column names flow through here before being embedded into f-strings,
so attacker-controlled values cannot break out even if they reach this layer.
"""
if not isinstance(ident, str) or not _IDENT_RE.match(ident):
raise ValueError(f"Unsafe {kind}: {ident!r}")
return ident
async def _has_column(conn, table: str, column: str) -> bool:
"""Check if a column exists in a SQLite table."""
_assert_ident(table, "table")
cols = await conn.run_sync(
lambda sync_conn: [
row[1]
for row in sync_conn.execute(
text(f"PRAGMA table_info('{table}')")
).fetchall()
]
)
return column in cols
async def _has_table(conn, table: str) -> bool:
"""Check if a table exists in the SQLite database."""
result = await conn.run_sync(
lambda sync_conn: sync_conn.execute(
text(
"SELECT name FROM sqlite_master "
"WHERE type='table' AND name=:name"
),
{"name": table},
).fetchone()
)
return result is not None
# ---------------------------------------------------------------------------
# Legacy schema migrations (pre-Phase 1)
# ---------------------------------------------------------------------------
async def migrate_schema(engine: AsyncEngine) -> None:
"""Add missing columns to existing tables (SQLite ALTER TABLE ADD COLUMN)."""
async with engine.begin() as conn:
# --- Tracker table (may still be named "tracker" or already renamed) ---
tracker_table = "notification_tracker" if await _has_table(conn, "notification_tracker") else "tracker"
if await _has_table(conn, tracker_table):
if not await _has_column(conn, tracker_table, "batch_duration"):
await conn.execute(
text(f"ALTER TABLE {tracker_table} ADD COLUMN batch_duration INTEGER DEFAULT 0")
)
logger.info("Added batch_duration column to %s table", tracker_table)
# Add enriched fields to event_log if missing
if await _has_table(conn, "event_log"):
for col, sql in [
("tracker_name", "ALTER TABLE event_log ADD COLUMN tracker_name TEXT DEFAULT ''"),
("provider_id", "ALTER TABLE event_log ADD COLUMN provider_id INTEGER"),
("provider_name", "ALTER TABLE event_log ADD COLUMN provider_name TEXT DEFAULT ''"),
("assets_count", "ALTER TABLE event_log ADD COLUMN assets_count INTEGER DEFAULT 0"),
("user_id", "ALTER TABLE event_log ADD COLUMN user_id INTEGER"),
("action_id", "ALTER TABLE event_log ADD COLUMN action_id INTEGER"),
("action_name", "ALTER TABLE event_log ADD COLUMN action_name TEXT DEFAULT ''"),
]:
if not await _has_column(conn, "event_log", col):
await conn.execute(text(sql))
logger.info("Added %s column to event_log table", col)
# Explicit indexes on the dashboard-query columns. SQLModel's
# ``index=True`` is emitted by ``create_all`` on *new* installs,
# but ALTER TABLE ADD COLUMN doesn't create them on upgrades —
# so the first boot after upgrade would leave these unindexed
# and status.py ``WHERE user_id=...`` would table-scan. The
# indexes are redundant-but-safe once create_all also runs.
for idx_name, col in [
("ix_event_log_user_id", "user_id"),
("ix_event_log_action_id", "action_id"),
("ix_event_log_provider_id", "provider_id"),
]:
await conn.execute(
text(f"CREATE INDEX IF NOT EXISTS {idx_name} ON event_log ({col})")
)
# Backfill user_id from notification_tracker for legacy rows.
# Safe to run repeatedly: only touches rows where user_id is still NULL.
await conn.execute(text("""
UPDATE event_log
SET user_id = (
SELECT user_id FROM notification_tracker
WHERE notification_tracker.id = event_log.notification_tracker_id
)
WHERE event_log.user_id IS NULL
AND event_log.notification_tracker_id IS NOT NULL
"""))
# Add commands_config to telegram_bot if missing
if await _has_table(conn, "telegram_bot"):
if not await _has_column(conn, "telegram_bot", "commands_config"):
await conn.execute(
text("ALTER TABLE telegram_bot ADD COLUMN commands_config TEXT DEFAULT '{}'")
)
logger.info("Added commands_config column to telegram_bot table")
# Add webhook_path_id to telegram_bot if missing
if not await _has_column(conn, "telegram_bot", "webhook_path_id"):
await conn.execute(
text("ALTER TABLE telegram_bot ADD COLUMN webhook_path_id TEXT DEFAULT ''")
)
logger.info("Added webhook_path_id column to telegram_bot table")
# Backfill existing bots with unique IDs
import uuid
bots = (await conn.execute(text("SELECT id FROM telegram_bot"))).fetchall()
for bot in bots:
await conn.execute(
text("UPDATE telegram_bot SET webhook_path_id = :wid WHERE id = :bid"),
{"wid": uuid.uuid4().hex, "bid": bot[0]},
)
if bots:
logger.info("Backfilled webhook_path_id for %d existing bots", len(bots))
# Add update_mode to telegram_bot if missing
if not await _has_column(conn, "telegram_bot", "update_mode"):
await conn.execute(
text("ALTER TABLE telegram_bot ADD COLUMN update_mode TEXT DEFAULT 'polling'")
)
logger.info("Added update_mode column to telegram_bot table")
# Add command_template_config_id to command_config if missing
if await _has_table(conn, "command_config"):
if not await _has_column(conn, "command_config", "command_template_config_id"):
await conn.execute(
text("ALTER TABLE command_config ADD COLUMN command_template_config_id INTEGER")
)
logger.info("Added command_template_config_id column to command_config table")
# Add allowed_album_ids (per-chat album scope) to command_tracker_listener
if await _has_table(conn, "command_tracker_listener"):
if not await _has_column(conn, "command_tracker_listener", "allowed_album_ids"):
await conn.execute(
text("ALTER TABLE command_tracker_listener ADD COLUMN allowed_album_ids TEXT")
)
logger.info("Added allowed_album_ids column to command_tracker_listener table")
# Add date_only_format to template_config if missing
if await _has_table(conn, "template_config"):
if not await _has_column(conn, "template_config", "date_only_format"):
await conn.execute(
text("ALTER TABLE template_config ADD COLUMN date_only_format TEXT DEFAULT '%d.%m.%Y'")
)
logger.info("Added date_only_format column to template_config table")
# Add memory_source to tracking_config if missing
if await _has_table(conn, "tracking_config"):
if not await _has_column(conn, "tracking_config", "memory_source"):
await conn.execute(
text("ALTER TABLE tracking_config ADD COLUMN memory_source TEXT DEFAULT 'albums'")
)
logger.info("Added memory_source column to tracking_config table")
# Add filters JSON column to notification_tracker if missing
if await _has_table(conn, tracker_table):
if not await _has_column(conn, tracker_table, "filters"):
await conn.execute(
text(f"ALTER TABLE {tracker_table} ADD COLUMN filters TEXT DEFAULT '{{}}'")
)
logger.info("Added filters column to %s table", tracker_table)
# Add Gitea tracking flags to tracking_config if missing
if await _has_table(conn, "tracking_config"):
gitea_flags = [
("track_push", "INTEGER DEFAULT 1"),
("track_issue_opened", "INTEGER DEFAULT 1"),
("track_issue_closed", "INTEGER DEFAULT 1"),
("track_issue_commented", "INTEGER DEFAULT 0"),
("track_pr_opened", "INTEGER DEFAULT 1"),
("track_pr_closed", "INTEGER DEFAULT 1"),
("track_pr_merged", "INTEGER DEFAULT 1"),
("track_pr_commented", "INTEGER DEFAULT 0"),
("track_release_published", "INTEGER DEFAULT 1"),
("track_scheduled_message", "INTEGER DEFAULT 1"),
]
for col_name, col_type in gitea_flags:
if not await _has_column(conn, "tracking_config", col_name):
await conn.execute(
text(f"ALTER TABLE tracking_config ADD COLUMN {col_name} {col_type}")
)
logger.info("Added %s column to tracking_config table", col_name)
# Add Planka tracking flags to tracking_config if missing
if await _has_table(conn, "tracking_config"):
planka_flags = [
("track_card_created", "INTEGER DEFAULT 1"),
("track_card_updated", "INTEGER DEFAULT 0"),
("track_card_moved", "INTEGER DEFAULT 1"),
("track_card_deleted", "INTEGER DEFAULT 0"),
("track_card_commented", "INTEGER DEFAULT 1"),
("track_comment_updated", "INTEGER DEFAULT 0"),
("track_board_created", "INTEGER DEFAULT 1"),
("track_board_updated", "INTEGER DEFAULT 0"),
("track_board_deleted", "INTEGER DEFAULT 1"),
("track_list_created", "INTEGER DEFAULT 0"),
("track_list_updated", "INTEGER DEFAULT 0"),
("track_list_deleted", "INTEGER DEFAULT 0"),
("track_attachment_created", "INTEGER DEFAULT 1"),
("track_card_label_added", "INTEGER DEFAULT 0"),
("track_task_completed", "INTEGER DEFAULT 1"),
]
for col_name, col_type in planka_flags:
if not await _has_column(conn, "tracking_config", col_name):
await conn.execute(
text(f"ALTER TABLE tracking_config ADD COLUMN {col_name} {col_type}")
)
logger.info("Added %s column to tracking_config table", col_name)
# Add NUT (UPS) tracking flags to tracking_config if missing
if await _has_table(conn, "tracking_config"):
nut_flags = [
("track_ups_online", "INTEGER DEFAULT 1"),
("track_ups_on_battery", "INTEGER DEFAULT 1"),
("track_ups_low_battery", "INTEGER DEFAULT 1"),
("track_ups_battery_restored", "INTEGER DEFAULT 1"),
("track_ups_comms_lost", "INTEGER DEFAULT 1"),
("track_ups_comms_restored", "INTEGER DEFAULT 1"),
("track_ups_replace_battery", "INTEGER DEFAULT 1"),
("track_ups_overload", "INTEGER DEFAULT 1"),
]
for col_name, col_type in nut_flags:
if not await _has_column(conn, "tracking_config", col_name):
await conn.execute(
text(f"ALTER TABLE tracking_config ADD COLUMN {col_name} {col_type}")
)
logger.info("Added %s column to tracking_config table", col_name)
# Add Generic Webhook tracking flag to tracking_config if missing
if await _has_table(conn, "tracking_config"):
if not await _has_column(conn, "tracking_config", "track_webhook_received"):
await conn.execute(
text("ALTER TABLE tracking_config ADD COLUMN track_webhook_received INTEGER DEFAULT 1")
)
logger.info("Added track_webhook_received column to tracking_config table")
# Add quiet hours to tracking_config if missing.
# Start/end are nullable HH:MM strings; quiet_hours_enabled gates them.
if await _has_table(conn, "tracking_config"):
if not await _has_column(conn, "tracking_config", "quiet_hours_enabled"):
await conn.execute(
text("ALTER TABLE tracking_config ADD COLUMN quiet_hours_enabled INTEGER DEFAULT 0")
)
logger.info("Added quiet_hours_enabled column to tracking_config table")
for col_name in ("quiet_hours_start", "quiet_hours_end"):
if not await _has_column(conn, "tracking_config", col_name):
await conn.execute(
text(f"ALTER TABLE tracking_config ADD COLUMN {col_name} TEXT")
)
logger.info("Added %s column to tracking_config table", col_name)
# Drop legacy template content columns from template_config
# (template content moved to template_slot child rows)
if await _has_table(conn, "template_config"):
legacy_cols = [
"message_assets_added", "message_assets_removed",
"message_collection_renamed", "message_collection_deleted",
"message_sharing_changed", "periodic_summary_message",
"scheduled_assets_message", "memory_mode_message",
]
for col_name in legacy_cols:
if await _has_column(conn, "template_config", col_name):
await conn.execute(
text(f"ALTER TABLE template_config DROP COLUMN {col_name}")
)
logger.info("Dropped legacy column %s from template_config", col_name)
# Add collection_name and shared to tracker_state if missing
state_table = "notification_tracker_state" if await _has_table(conn, "notification_tracker_state") else "tracker_state"
if await _has_table(conn, state_table):
if not await _has_column(conn, state_table, "collection_name"):
await conn.execute(
text(f"ALTER TABLE {state_table} ADD COLUMN collection_name TEXT DEFAULT ''")
)
logger.info("Added collection_name column to %s table", state_table)
if not await _has_column(conn, state_table, "shared"):
await conn.execute(
text(f"ALTER TABLE {state_table} ADD COLUMN shared INTEGER DEFAULT 0")
)
logger.info("Added shared column to %s table", state_table)
# meta_fingerprint — small JSON blob captured from the provider's
# cheap meta probe. An empty default means "unknown, do a full
# fetch next tick" so existing rows don't wrongly skip detection.
if not await _has_column(conn, state_table, "meta_fingerprint"):
await conn.execute(
text(f"ALTER TABLE {state_table} ADD COLUMN meta_fingerprint TEXT DEFAULT '{{}}'")
)
logger.info("Added meta_fingerprint column to %s table", state_table)
# Add language_code to telegram_chat if missing
if await _has_table(conn, "telegram_chat"):
if not await _has_column(conn, "telegram_chat", "language_code"):
await conn.execute(
text("ALTER TABLE telegram_chat ADD COLUMN language_code TEXT DEFAULT ''")
)
logger.info("Added language_code column to telegram_chat table")
# Add language_override to telegram_chat if missing
if not await _has_column(conn, "telegram_chat", "language_override"):
await conn.execute(
text("ALTER TABLE telegram_chat ADD COLUMN language_override TEXT DEFAULT ''")
)
logger.info("Added language_override column to telegram_chat table")
# Add locale to target_receiver if missing
if await _has_table(conn, "target_receiver"):
if not await _has_column(conn, "target_receiver", "locale"):
await conn.execute(
text("ALTER TABLE target_receiver ADD COLUMN locale TEXT DEFAULT ''")
)
logger.info("Added locale column to target_receiver table")
# Add commands_enabled to telegram_chat if missing (default disabled)
if not await _has_column(conn, "telegram_chat", "commands_enabled"):
await conn.execute(
text("ALTER TABLE telegram_chat ADD COLUMN commands_enabled INTEGER DEFAULT 0")
)
logger.info("Added commands_enabled column to telegram_chat table")
# Add webhook_token to service_provider if missing
if await _has_table(conn, "service_provider"):
if not await _has_column(conn, "service_provider", "webhook_token"):
await conn.execute(
text("ALTER TABLE service_provider ADD COLUMN webhook_token TEXT DEFAULT ''")
)
logger.info("Added webhook_token column to service_provider table")
# Backfill existing providers with unique tokens
import uuid
providers = (await conn.execute(text("SELECT id FROM service_provider"))).fetchall()
for row in providers:
await conn.execute(
text("UPDATE service_provider SET webhook_token = :tok WHERE id = :pid"),
{"tok": uuid.uuid4().hex, "pid": row[0]},
)
if providers:
logger.info("Backfilled webhook_token for %d existing providers", len(providers))
# ---------------------------------------------------------------------------
# Legacy tracker_target migration (pre-Phase 1)
# ---------------------------------------------------------------------------
async def migrate_tracker_targets(engine: AsyncEngine) -> None:
"""Migrate legacy Tracker.target_ids JSON arrays to TrackerTarget rows.
Also migrates:
- Tracker.tracking_config_id → TrackerTarget.tracking_config_id
- Tracker.quiet_hours_* → TrackerTarget.quiet_hours_*
- NotificationTarget.template_config_id → TrackerTarget.template_config_id
- TelegramBot.commands_config → TrackerTarget.commands_config (for telegram targets)
Idempotent: skips if legacy columns don't exist or data already migrated.
"""
async with engine.begin() as conn:
# Determine which table name exists (pre- or post-rename)
if await _has_table(conn, "tracker"):
tracker_table = "tracker"
tt_table = "tracker_target"
tracker_id_col = "tracker_id"
elif await _has_table(conn, "notification_tracker"):
tracker_table = "notification_tracker"
tt_table = "notification_tracker_target"
tracker_id_col = "notification_tracker_id"
else:
logger.debug("No tracker table found — skipping migration")
return
# Check if legacy target_ids column exists
if not await _has_column(conn, tracker_table, "target_ids"):
logger.debug("No legacy target_ids column found — skipping migration")
return
# Check if junction table already has data
if await _has_table(conn, tt_table):
tt_count = (
await conn.execute(text(f"SELECT COUNT(*) FROM {tt_table}"))
).scalar()
if tt_count and tt_count > 0:
logger.debug(
"%s table already has %d rows — skipping migration",
tt_table, tt_count,
)
return
# Load legacy data
trackers = (
await conn.execute(
text(
f"SELECT id, target_ids, tracking_config_id, "
f"quiet_hours_start, quiet_hours_end FROM {tracker_table}"
)
)
).fetchall()
if not trackers:
logger.debug("No trackers to migrate")
return
# Load template_config_id from targets (legacy field)
target_template_map: dict[int, int | None] = {}
if await _has_column(conn, "notification_target", "template_config_id"):
targets = (
await conn.execute(
text("SELECT id, template_config_id FROM notification_target")
)
).fetchall()
for t in targets:
target_template_map[t[0]] = t[1]
# Load commands_config from telegram_bots (legacy field)
bot_commands_map: dict[int, str | None] = {}
if await _has_column(conn, "telegram_bot", "commands_config"):
bots = (
await conn.execute(
text("SELECT id, commands_config FROM telegram_bot")
)
).fetchall()
for b in bots:
bot_commands_map[b[0]] = b[1]
# Build target → bot mapping for commands_config migration
target_bot_map: dict[int, int] = {}
if bot_commands_map:
tgt_rows = (
await conn.execute(
text("SELECT id, config FROM notification_target WHERE type='telegram'")
)
).fetchall()
for tgt in tgt_rows:
try:
cfg = json.loads(tgt[1]) if isinstance(tgt[1], str) else tgt[1]
if cfg and "bot_token" in cfg:
for bot_id, _ in bot_commands_map.items():
bot_token_row = (
await conn.execute(
text("SELECT token FROM telegram_bot WHERE id=:bid"),
{"bid": bot_id},
)
).fetchone()
if bot_token_row and bot_token_row[0] == cfg.get("bot_token"):
target_bot_map[tgt[0]] = bot_id
except Exception:
logger.warning(
"Failed to match bot token for target %s", tgt[0],
exc_info=True,
)
# Create junction rows
migrated = 0
for tracker in trackers:
tracker_id = tracker[0]
raw_target_ids = tracker[1]
tracking_config_id = tracker[2]
quiet_hours_start = tracker[3]
quiet_hours_end = tracker[4]
if isinstance(raw_target_ids, str):
try:
target_ids = json.loads(raw_target_ids)
except (json.JSONDecodeError, TypeError):
target_ids = []
elif isinstance(raw_target_ids, list):
target_ids = raw_target_ids
else:
target_ids = []
for target_id in target_ids:
template_config_id = target_template_map.get(target_id)
commands_config = None
if target_id in target_bot_map:
bot_id = target_bot_map[target_id]
raw_cmd = bot_commands_map.get(bot_id)
if raw_cmd:
commands_config = (
raw_cmd if isinstance(raw_cmd, str) else json.dumps(raw_cmd)
)
await conn.execute(
text(
f"INSERT INTO {tt_table} "
f"({tracker_id_col}, target_id, tracking_config_id, "
f"template_config_id, enabled, quiet_hours_start, "
f"quiet_hours_end, commands_config) "
f"VALUES (:tid, :tgtid, :tcid, :tmplid, 1, :qhs, :qhe, :cmd)"
),
{
"tid": tracker_id,
"tgtid": target_id,
"tcid": tracking_config_id,
"tmplid": template_config_id,
"qhs": quiet_hours_start,
"qhe": quiet_hours_end,
"cmd": commands_config,
},
)
migrated += 1
logger.info("Migrated %d tracker-target links", migrated)
# ---------------------------------------------------------------------------
# Phase 1: Entity refactor migration
# ---------------------------------------------------------------------------
async def migrate_entity_refactor(engine: AsyncEngine) -> None:
"""Phase 1 entity refactor — rename tables, add columns, create new tables.
Fully idempotent: every operation checks preconditions before acting.
"""
async with engine.begin() as conn:
# ------------------------------------------------------------------
# 1. Rename table: tracker → notification_tracker
# ------------------------------------------------------------------
if await _has_table(conn, "tracker") and not await _has_table(conn, "notification_tracker"):
await conn.execute(text("ALTER TABLE tracker RENAME TO notification_tracker"))
logger.info("Renamed table tracker → notification_tracker")
# ------------------------------------------------------------------
# 2. Rename table: tracker_target → notification_tracker_target
# and rename column tracker_id → notification_tracker_id
# ------------------------------------------------------------------
if await _has_table(conn, "tracker_target") and not await _has_table(conn, "notification_tracker_target"):
# SQLite doesn't support RENAME COLUMN in older versions, so we
# recreate the table with the new column name.
await conn.execute(text(
"CREATE TABLE notification_tracker_target ("
" id INTEGER PRIMARY KEY,"
" notification_tracker_id INTEGER REFERENCES notification_tracker(id),"
" target_id INTEGER REFERENCES notification_target(id),"
" tracking_config_id INTEGER REFERENCES tracking_config(id),"
" template_config_id INTEGER REFERENCES template_config(id),"
" enabled INTEGER DEFAULT 1,"
" quiet_hours_start TEXT,"
" quiet_hours_end TEXT,"
" commands_config TEXT,"
" created_at TIMESTAMP"
")"
))
await conn.execute(text(
"INSERT INTO notification_tracker_target "
"(id, notification_tracker_id, target_id, tracking_config_id, "
"template_config_id, enabled, quiet_hours_start, quiet_hours_end, "
"commands_config, created_at) "
"SELECT id, tracker_id, target_id, tracking_config_id, "
"template_config_id, enabled, quiet_hours_start, quiet_hours_end, "
"commands_config, created_at "
"FROM tracker_target"
))
await conn.execute(text("DROP TABLE tracker_target"))
logger.info("Renamed table tracker_target → notification_tracker_target (with column rename tracker_id → notification_tracker_id)")
# ------------------------------------------------------------------
# 3. Rename table: tracker_state → notification_tracker_state
# and rename column tracker_id → notification_tracker_id
# ------------------------------------------------------------------
if await _has_table(conn, "tracker_state") and not await _has_table(conn, "notification_tracker_state"):
await conn.execute(text(
"CREATE TABLE notification_tracker_state ("
" id INTEGER PRIMARY KEY,"
" notification_tracker_id INTEGER REFERENCES notification_tracker(id),"
" collection_id TEXT,"
" collection_name TEXT DEFAULT '',"
" shared INTEGER DEFAULT 0,"
" asset_ids TEXT,"
" pending_asset_ids TEXT,"
" last_updated TIMESTAMP"
")"
))
await conn.execute(text(
"INSERT INTO notification_tracker_state "
"(id, notification_tracker_id, collection_id, collection_name, "
"shared, asset_ids, pending_asset_ids, last_updated) "
"SELECT id, tracker_id, collection_id, collection_name, "
"shared, asset_ids, pending_asset_ids, last_updated "
"FROM tracker_state"
))
await conn.execute(text("DROP TABLE tracker_state"))
logger.info("Renamed table tracker_state → notification_tracker_state (with column rename tracker_id → notification_tracker_id)")
# ------------------------------------------------------------------
# 4. Add chat_action column to notification_target
# ------------------------------------------------------------------
if await _has_table(conn, "notification_target"):
if not await _has_column(conn, "notification_target", "chat_action"):
await conn.execute(
text("ALTER TABLE notification_target ADD COLUMN chat_action TEXT")
)
logger.info("Added chat_action column to notification_target table")
# ------------------------------------------------------------------
# 5. Rename tracker_id → notification_tracker_id in event_log
# ------------------------------------------------------------------
if await _has_table(conn, "event_log"):
if await _has_column(conn, "event_log", "tracker_id") and not await _has_column(conn, "event_log", "notification_tracker_id"):
# Recreate event_log with renamed column
await conn.execute(text(
"CREATE TABLE event_log_new ("
" id INTEGER PRIMARY KEY,"
" notification_tracker_id INTEGER REFERENCES notification_tracker(id),"
" tracker_name TEXT DEFAULT '',"
" provider_id INTEGER,"
" provider_name TEXT DEFAULT '',"
" event_type TEXT,"
" collection_id TEXT,"
" collection_name TEXT,"
" assets_count INTEGER DEFAULT 0,"
" details TEXT,"
" created_at TIMESTAMP"
")"
))
await conn.execute(text(
"INSERT INTO event_log_new "
"(id, notification_tracker_id, tracker_name, provider_id, "
"provider_name, event_type, collection_id, collection_name, "
"assets_count, details, created_at) "
"SELECT id, tracker_id, tracker_name, provider_id, "
"provider_name, event_type, collection_id, collection_name, "
"assets_count, details, created_at "
"FROM event_log"
))
await conn.execute(text("DROP TABLE event_log"))
await conn.execute(text("ALTER TABLE event_log_new RENAME TO event_log"))
logger.info("Renamed column tracker_id → notification_tracker_id in event_log")
# ------------------------------------------------------------------
# 6. Create command_config table
# ------------------------------------------------------------------
if not await _has_table(conn, "command_config"):
await conn.execute(text(
"CREATE TABLE command_config ("
" id INTEGER PRIMARY KEY,"
" user_id INTEGER NOT NULL REFERENCES user(id),"
" provider_type TEXT NOT NULL,"
" name TEXT NOT NULL,"
" icon TEXT DEFAULT '',"
" enabled_commands TEXT DEFAULT '[]',"
" locale TEXT DEFAULT 'en',"
" response_mode TEXT DEFAULT 'media',"
" default_count INTEGER DEFAULT 5,"
" rate_limits TEXT DEFAULT '{}',"
" created_at TIMESTAMP"
")"
))
logger.info("Created command_config table")
else:
# Backfill locale column for tables created before locale was on the model
if not await _has_column(conn, "command_config", "locale"):
await conn.execute(
text("ALTER TABLE command_config ADD COLUMN locale TEXT DEFAULT 'en'")
)
logger.info("Added locale column to command_config table")
# ------------------------------------------------------------------
# 7. Create command_tracker table
# ------------------------------------------------------------------
if not await _has_table(conn, "command_tracker"):
await conn.execute(text(
"CREATE TABLE command_tracker ("
" id INTEGER PRIMARY KEY,"
" user_id INTEGER NOT NULL REFERENCES user(id),"
" provider_id INTEGER NOT NULL REFERENCES service_provider(id),"
" command_config_id INTEGER NOT NULL REFERENCES command_config(id),"
" name TEXT NOT NULL,"
" icon TEXT DEFAULT '',"
" enabled INTEGER DEFAULT 1,"
" created_at TIMESTAMP"
")"
))
logger.info("Created command_tracker table")
# ------------------------------------------------------------------
# 8. Create command_tracker_listener table
# ------------------------------------------------------------------
if not await _has_table(conn, "command_tracker_listener"):
await conn.execute(text(
"CREATE TABLE command_tracker_listener ("
" id INTEGER PRIMARY KEY,"
" command_tracker_id INTEGER NOT NULL REFERENCES command_tracker(id),"
" listener_type TEXT NOT NULL,"
" listener_id INTEGER NOT NULL,"
" created_at TIMESTAMP,"
" UNIQUE(command_tracker_id, listener_type, listener_id)"
")"
))
logger.info("Created command_tracker_listener table")
# ------------------------------------------------------------------
# 9. Migrate TelegramBot.commands_config → CommandConfig rows
# ------------------------------------------------------------------
if await _has_table(conn, "telegram_bot") and await _has_column(conn, "telegram_bot", "commands_config"):
# Only migrate if command_config table is empty (idempotent)
cc_count = (await conn.execute(text("SELECT COUNT(*) FROM command_config"))).scalar()
if cc_count == 0:
bots = (await conn.execute(text(
"SELECT id, user_id, commands_config FROM telegram_bot"
))).fetchall()
migrated = 0
for bot in bots:
bot_id, user_id, raw_config = bot[0], bot[1], bot[2]
if not raw_config:
continue
try:
cfg = json.loads(raw_config) if isinstance(raw_config, str) else raw_config
except (json.JSONDecodeError, TypeError):
continue
# Skip empty/default configs
if not cfg or cfg == {}:
continue
# Extract fields from legacy commands_config
enabled_commands = json.dumps(cfg.get("enabled_commands", []))
locale = cfg.get("locale", "en")
response_mode = cfg.get("response_mode", "media")
default_count = cfg.get("default_count", 5)
rate_limits = json.dumps(cfg.get("rate_limits", {}))
provider_type = cfg.get("provider_type", "immich")
await conn.execute(
text(
"INSERT INTO command_config "
"(user_id, provider_type, name, enabled_commands, locale, "
"response_mode, default_count, rate_limits, created_at) "
"VALUES (:uid, :pt, :name, :ec, :locale, :rm, :dc, :rl, CURRENT_TIMESTAMP)"
),
{
"uid": user_id,
"pt": provider_type,
"name": f"Bot #{bot_id} Commands",
"ec": enabled_commands,
"locale": locale,
"rm": response_mode,
"dc": default_count,
"rl": rate_limits,
},
)
migrated += 1
if migrated:
logger.info("Migrated %d bot commands_config → command_config rows", migrated)
# NOTE: We intentionally do NOT drop commands_config from telegram_bot
# or notification_tracker_target. SQLite doesn't support DROP COLUMN in
# all versions, and SQLModel will simply ignore columns not defined on
# the model class. The columns will remain in the DB but are unused.
# ---------------------------------------------------------------------------
# Template slot migration
# ---------------------------------------------------------------------------
# Old column names that existed on template_config before the slot refactor
_LEGACY_TEMPLATE_COLUMNS = [
"message_assets_added",
"message_assets_removed",
"message_collection_renamed",
"message_collection_deleted",
"message_sharing_changed",
"periodic_summary_message",
"scheduled_assets_message",
"memory_mode_message",
]
async def migrate_template_slots(engine: AsyncEngine) -> None:
"""Migrate legacy TemplateConfig column-based templates to TemplateSlot rows.
Reads the old per-column template values via raw SQL (since they're no longer
on the SQLModel class) and inserts them as TemplateSlot rows.
Idempotent: skips if template_slot table already has data or legacy columns
don't exist.
"""
async with engine.begin() as conn:
if not await _has_table(conn, "template_config"):
return
# Check if the legacy columns still exist in the DB
has_legacy = await _has_column(conn, "template_config", "message_assets_added")
if not has_legacy:
logger.debug("No legacy template columns found — skipping slot migration")
return
# Check if template_slot table exists and already has data
if await _has_table(conn, "template_slot"):
slot_count = (await conn.execute(text("SELECT COUNT(*) FROM template_slot"))).scalar()
if slot_count and slot_count > 0:
logger.debug("template_slot table already has %d rows — skipping migration", slot_count)
return
# Create template_slot table if it doesn't exist yet
# (SQLModel.metadata.create_all may have already created it, but be safe)
if not await _has_table(conn, "template_slot"):
await conn.execute(text(
"CREATE TABLE template_slot ("
" id INTEGER PRIMARY KEY,"
" config_id INTEGER NOT NULL REFERENCES template_config(id),"
" slot_name TEXT NOT NULL,"
" template TEXT DEFAULT '',"
" UNIQUE(config_id, slot_name)"
")"
))
logger.info("Created template_slot table")
# Read all template configs with their legacy column values
col_list = ", ".join(_LEGACY_TEMPLATE_COLUMNS)
rows = (await conn.execute(
text(f"SELECT id, {col_list} FROM template_config")
)).fetchall()
migrated = 0
for row in rows:
config_id = row[0]
for i, col_name in enumerate(_LEGACY_TEMPLATE_COLUMNS):
template_text = row[i + 1] or ""
if template_text.strip():
await conn.execute(
text(
"INSERT INTO template_slot (config_id, slot_name, template) "
"VALUES (:cid, :sn, :tmpl)"
),
{"cid": config_id, "sn": col_name, "tmpl": template_text},
)
migrated += 1
if migrated:
logger.info("Migrated %d template slots from legacy columns", migrated)
# ---------------------------------------------------------------------------
# Target receiver migration
# ---------------------------------------------------------------------------
async def migrate_target_receivers(engine: AsyncEngine) -> None:
"""Migrate single chat_id/url from NotificationTarget.config to TargetReceiver rows.
For each existing target that has a chat_id or url in its config JSON and
no receivers yet, creates a TargetReceiver row.
Idempotent: skips targets that already have receivers.
"""
async with engine.begin() as conn:
if not await _has_table(conn, "notification_target"):
return
# Create target_receiver table if it doesn't exist yet
if not await _has_table(conn, "target_receiver"):
await conn.execute(text(
"CREATE TABLE target_receiver ("
" id INTEGER PRIMARY KEY,"
" target_id INTEGER NOT NULL REFERENCES notification_target(id),"
" name TEXT DEFAULT '',"
" config TEXT DEFAULT '{}',"
" receiver_key TEXT DEFAULT '',"
" enabled INTEGER DEFAULT 1,"
" created_at TIMESTAMP,"
" UNIQUE(target_id, receiver_key)"
")"
))
logger.info("Created target_receiver table")
# Check if any receivers already exist
if await _has_table(conn, "target_receiver"):
recv_count = (await conn.execute(text("SELECT COUNT(*) FROM target_receiver"))).scalar()
if recv_count and recv_count > 0:
logger.debug("target_receiver already has %d rows — skipping migration", recv_count)
return
# Read all targets
targets = (await conn.execute(
text("SELECT id, type, config FROM notification_target")
)).fetchall()
migrated = 0
for row in targets:
target_id, target_type, raw_config = row[0], row[1], row[2]
try:
cfg = json.loads(raw_config) if isinstance(raw_config, str) else (raw_config or {})
except (json.JSONDecodeError, TypeError):
cfg = {}
receiver_key = ""
receiver_config = {}
receiver_name = ""
if target_type == "telegram":
chat_id = cfg.get("chat_id", "")
if chat_id:
receiver_key = str(chat_id)
receiver_config = {"chat_id": str(chat_id)}
receiver_name = f"Chat {chat_id}"
elif target_type == "webhook":
url = cfg.get("url", "")
if url:
receiver_key = url
receiver_config = {"url": url, "headers": cfg.get("headers", {})}
receiver_name = url[:50]
if receiver_key:
await conn.execute(
text(
"INSERT INTO target_receiver (target_id, name, config, receiver_key, enabled, created_at) "
"VALUES (:tid, :name, :cfg, :rk, 1, CURRENT_TIMESTAMP)"
),
{
"tid": target_id,
"name": receiver_name,
"cfg": json.dumps(receiver_config),
"rk": receiver_key,
},
)
migrated += 1
if migrated:
logger.info("Migrated %d target receivers from legacy config", migrated)
async def migrate_receivers_from_config(engine: AsyncEngine) -> None:
"""Extract delivery endpoint fields from target.config into TargetReceiver rows.
For each NotificationTarget that still has a delivery field (chat_id, url,
webhook_url, email, topic, room_id) in its config JSON:
1. Create a TargetReceiver row (if one with the same key doesn't exist)
2. Remove the delivery field(s) from the config JSON
Idempotent: checks for existing receiver before creating; only strips fields
that are still present in config.
"""
# Mapping: target_type -> (delivery field in config, receiver config builder)
_DELIVERY_FIELDS: dict[str, dict[str, str]] = {
"telegram": {"chat_id": "chat_id"},
"webhook": {"url": "url"},
"email": {"email": "email"},
"discord": {"webhook_url": "webhook_url"},
"slack": {"webhook_url": "webhook_url"},
"ntfy": {"topic": "topic"},
"matrix": {"room_id": "room_id"},
}
async with engine.begin() as conn:
if not await _has_table(conn, "notification_target"):
return
if not await _has_table(conn, "target_receiver"):
return
targets = (await conn.execute(
text("SELECT id, type, config FROM notification_target")
)).fetchall()
created = 0
cleaned = 0
for row in targets:
target_id, target_type, raw_config = row[0], row[1], row[2]
try:
cfg = json.loads(raw_config) if isinstance(raw_config, str) else (raw_config or {})
except (json.JSONDecodeError, TypeError):
cfg = {}
field_map = _DELIVERY_FIELDS.get(target_type, {})
if not field_map:
continue
# Check if any delivery field is present in config
delivery_field = list(field_map.keys())[0] # e.g. "chat_id", "url"
delivery_value = cfg.get(delivery_field)
if not delivery_value:
continue
# Build receiver config
receiver_config: dict[str, Any] = {delivery_field: delivery_value}
# For webhook, also move headers to receiver config
if target_type == "webhook" and "headers" in cfg:
receiver_config["headers"] = cfg["headers"]
receiver_key = str(delivery_value)
# Check if receiver already exists
existing = (await conn.execute(
text(
"SELECT id FROM target_receiver "
"WHERE target_id = :tid AND receiver_key = :rk"
),
{"tid": target_id, "rk": receiver_key},
)).fetchone()
if not existing:
# Derive a name for the receiver
if target_type == "telegram":
name = f"Chat {delivery_value}"
elif target_type == "webhook":
name = str(delivery_value)[:50]
elif target_type == "email":
name = str(delivery_value)
else:
name = str(delivery_value)[:50]
await conn.execute(
text(
"INSERT INTO target_receiver "
"(target_id, name, config, receiver_key, enabled, created_at) "
"VALUES (:tid, :name, :cfg, :rk, 1, CURRENT_TIMESTAMP)"
),
{
"tid": target_id,
"name": name,
"cfg": json.dumps(receiver_config),
"rk": receiver_key,
},
)
created += 1
# Remove delivery fields from config
new_cfg = dict(cfg)
new_cfg.pop(delivery_field, None)
# For webhook, also remove headers (moved to receiver)
if target_type == "webhook":
new_cfg.pop("headers", None)
if new_cfg != cfg:
await conn.execute(
text(
"UPDATE notification_target SET config = :cfg WHERE id = :tid"
),
{"cfg": json.dumps(new_cfg), "tid": target_id},
)
cleaned += 1
if created:
logger.info("Created %d receiver rows from target config delivery fields", created)
if cleaned:
logger.info("Cleaned delivery fields from %d target configs", cleaned)
async def migrate_template_locale(engine: AsyncEngine) -> None:
"""Add locale column to template_config and command_template_config.
Backfill locale from name: "(RU)" -> "ru", else "en" for system-owned rows.
"""
async with engine.begin() as conn:
for table in ("template_config", "command_template_config"):
if await _has_column(conn, table, "locale"):
continue
logger.info("Adding locale column to %s", table)
await conn.execute(text(f"ALTER TABLE {table} ADD COLUMN locale TEXT DEFAULT ''"))
# Backfill system-owned rows
await conn.execute(text(
f"UPDATE {table} SET locale = 'ru' WHERE user_id = 0 AND name LIKE '%(RU)%'"
))
await conn.execute(text(
f"UPDATE {table} SET locale = 'en' WHERE user_id = 0 AND locale = ''"
))
async def migrate_command_slot_locale(engine: AsyncEngine) -> None:
"""Add locale column to command_template_slot and merge system EN/RU configs.
1. Recreate command_template_slot with locale column and new unique constraint
2. Backfill locale from parent config's locale (or 'en')
3. Merge "Default Commands (RU)" slots into "Default Commands (EN)" with locale='ru'
4. Rename merged config, update references, delete orphan RU config
"""
async with engine.begin() as conn:
if not await _has_table(conn, "command_template_slot"):
return
# Skip if locale column already exists (idempotent)
if await _has_column(conn, "command_template_slot", "locale"):
return
logger.info("Adding locale column to command_template_slot and merging system configs")
# Step 1: Recreate table with locale column and new unique constraint
await conn.execute(text(
"CREATE TABLE command_template_slot_new ("
" id INTEGER PRIMARY KEY,"
" config_id INTEGER NOT NULL REFERENCES command_template_config(id),"
" slot_name TEXT NOT NULL,"
" locale TEXT NOT NULL DEFAULT 'en',"
" template TEXT DEFAULT '',"
" UNIQUE(config_id, slot_name, locale)"
")"
))
# Step 2: Copy existing data, deriving locale from parent config
await conn.execute(text(
"INSERT INTO command_template_slot_new (id, config_id, slot_name, locale, template) "
"SELECT s.id, s.config_id, s.slot_name, "
" CASE WHEN c.locale != '' THEN c.locale ELSE 'en' END, "
" s.template "
"FROM command_template_slot s "
"LEFT JOIN command_template_config c ON s.config_id = c.id"
))
await conn.execute(text("DROP TABLE command_template_slot"))
await conn.execute(text(
"ALTER TABLE command_template_slot_new RENAME TO command_template_slot"
))
# Step 3: Merge system EN/RU configs into one
# Find the system EN and RU config IDs
en_row = (await conn.execute(text(
"SELECT id FROM command_template_config "
"WHERE user_id = 0 AND (locale = 'en' OR name LIKE '%(EN)%') "
"LIMIT 1"
))).fetchone()
ru_row = (await conn.execute(text(
"SELECT id FROM command_template_config "
"WHERE user_id = 0 AND (locale = 'ru' OR name LIKE '%(RU)%') "
"LIMIT 1"
))).fetchone()
if en_row and ru_row and en_row[0] != ru_row[0]:
en_id, ru_id = en_row[0], ru_row[0]
# Move RU slots to the EN config (they already have locale='ru')
await conn.execute(text(
"UPDATE command_template_slot SET config_id = :en_id "
"WHERE config_id = :ru_id"
), {"en_id": en_id, "ru_id": ru_id})
# Update any command_config references from RU to EN
if await _has_table(conn, "command_config"):
await conn.execute(text(
"UPDATE command_config SET command_template_config_id = :en_id "
"WHERE command_template_config_id = :ru_id"
), {"en_id": en_id, "ru_id": ru_id})
# Delete the orphan RU config
await conn.execute(text(
"DELETE FROM command_template_config WHERE id = :ru_id"
), {"ru_id": ru_id})
# Rename the merged config
await conn.execute(text(
"UPDATE command_template_config SET name = 'Default Commands', "
"description = 'Default Immich command templates', locale = '' "
"WHERE id = :en_id"
), {"en_id": en_id})
logger.info(
"Merged system command template configs (EN=%d, RU=%d) into single config %d",
en_id, ru_id, en_id,
)
async def migrate_notification_slot_locale(engine: AsyncEngine) -> None:
"""Add locale column to template_slot and merge system EN/RU configs per provider.
1. Recreate template_slot with locale column and new unique constraint
2. Backfill locale from parent config's locale (or 'en')
3. For each provider: merge "Default X (RU)" slots into "Default X (EN)" with locale='ru'
4. Rename merged configs, update references, delete orphan RU configs
"""
async with engine.begin() as conn:
if not await _has_table(conn, "template_slot"):
return
# Skip if locale column already exists (idempotent)
if await _has_column(conn, "template_slot", "locale"):
return
logger.info("Adding locale column to template_slot and merging system configs")
# Step 1: Recreate table with locale column and new unique constraint
await conn.execute(text(
"CREATE TABLE template_slot_new ("
" id INTEGER PRIMARY KEY,"
" config_id INTEGER NOT NULL REFERENCES template_config(id),"
" slot_name TEXT NOT NULL,"
" locale TEXT NOT NULL DEFAULT 'en',"
" template TEXT DEFAULT '',"
" UNIQUE(config_id, slot_name, locale)"
")"
))
# Step 2: Copy existing data, deriving locale from parent config
await conn.execute(text(
"INSERT INTO template_slot_new (id, config_id, slot_name, locale, template) "
"SELECT s.id, s.config_id, s.slot_name, "
" CASE WHEN c.locale != '' THEN c.locale ELSE 'en' END, "
" s.template "
"FROM template_slot s "
"LEFT JOIN template_config c ON s.config_id = c.id"
))
await conn.execute(text("DROP TABLE template_slot"))
await conn.execute(text(
"ALTER TABLE template_slot_new RENAME TO template_slot"
))
# Step 3: Merge system EN/RU configs per provider type
providers = (await conn.execute(text(
"SELECT DISTINCT provider_type FROM template_config WHERE user_id = 0"
))).fetchall()
for (provider_type,) in providers:
en_row = (await conn.execute(text(
"SELECT id FROM template_config "
"WHERE user_id = 0 AND provider_type = :pt "
" AND (locale = 'en' OR name LIKE '%(EN)%') "
"LIMIT 1"
), {"pt": provider_type})).fetchone()
ru_row = (await conn.execute(text(
"SELECT id FROM template_config "
"WHERE user_id = 0 AND provider_type = :pt "
" AND (locale = 'ru' OR name LIKE '%(RU)%') "
"LIMIT 1"
), {"pt": provider_type})).fetchone()
if en_row and ru_row and en_row[0] != ru_row[0]:
en_id, ru_id = en_row[0], ru_row[0]
# Move RU slots to the EN config (they already have locale='ru')
await conn.execute(text(
"UPDATE template_slot SET config_id = :en_id "
"WHERE config_id = :ru_id"
), {"en_id": en_id, "ru_id": ru_id})
# Update notification_tracker_target references from RU to EN
if await _has_table(conn, "notification_tracker_target"):
await conn.execute(text(
"UPDATE notification_tracker_target SET template_config_id = :en_id "
"WHERE template_config_id = :ru_id"
), {"en_id": en_id, "ru_id": ru_id})
# Delete the orphan RU config
await conn.execute(text(
"DELETE FROM template_config WHERE id = :ru_id"
), {"ru_id": ru_id})
# Rename the merged config (strip locale suffix)
label = provider_type.capitalize()
await conn.execute(text(
"UPDATE template_config SET name = :name, "
"description = :desc, locale = '' "
"WHERE id = :en_id"
), {"name": f"Default {label}", "desc": f"Default {label} templates", "en_id": en_id})
logger.info(
"Merged system notification template configs for %s (EN=%d, RU=%d) into %d",
provider_type, en_id, ru_id, en_id,
)
async def migrate_user_token_version(engine: AsyncEngine) -> None:
"""Add token_version column to user for JWT revocation on password change."""
async with engine.begin() as conn:
if not await _has_table(conn, "user"):
return
if not await _has_column(conn, "user", "token_version"):
await conn.execute(
text("ALTER TABLE user ADD COLUMN token_version INTEGER NOT NULL DEFAULT 1")
)
logger.info("Added token_version column to user table")