Files
notify-bridge/packages/server/src/notify_bridge_server/commands/immich/events.py
T
alexei.dolgolyov e0bae394ee feat: comprehensive code review fixes — security, performance, quality
Backend security:
- Reject Gitea webhooks when webhook_secret is empty (was silently skipping)
- Add slowapi rate limiting on login (5/min) and setup (3/min) endpoints
- Add CORS middleware with configurable origins
- Mask telegram_webhook_secret in settings API response
- Protect system-owned command template configs from regular user modification
- Increase minimum password length to 8 characters

Backend performance:
- Batch queries in _resolve_command_context (3 queries instead of 3N)
- Concurrent album fetching with asyncio.gather in immich commands
- Singleton Jinja2 SandboxedEnvironment (reuse instead of per-render creation)
- TTLCache for rate limits (bounded memory, auto-eviction)
- Optional aiohttp session reuse in send_reply/send_media_group

Backend code quality:
- Extract dispatch_helpers.py (shared link_data loading + event filtering)
- Extract database/seeds.py from main.py (490 lines → dedicated module)
- Split immich_handler.py (415 lines) into commands/immich/ subpackage
- Replace bare except blocks with logged warnings
- Add per-provider config validation (Pydantic models)
- Truncate command input to 512 chars
- Expose usage_* and desc_* slots in capabilities and variables API

Frontend security:
- CSS.escape() for user-controlled querySelector in highlight.ts
- Client-side password min 8 chars validation on setup and password change

Frontend code quality:
- Replace any types with proper interfaces across top files
- Decompose targets/+page.svelte into TargetForm + ReceiverSection
- Fix $derived.by usage, $state mutation patterns
- Add console.warn to empty catch blocks

Frontend UX:
- Auth redirect via goto() with "Redirecting..." state
- Platform-aware Ctrl/Cmd K keyboard hint
- Remove stat-card hover transform

Frontend accessibility:
- Modal: role=dialog, aria-modal, focus trap, restore focus
- EntitySelect/IconGridSelect: listbox/option roles, aria-selected/disabled
2026-03-23 01:59:51 +03:00

200 lines
7.7 KiB
Python

"""Event-related Immich bot commands: events, latest, memory, random."""
from __future__ import annotations
import asyncio
import logging
import random as rng
from datetime import datetime, timezone
from typing import Any
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ...database.engine import get_engine
from ...database.models import (
EventLog, NotificationTarget, NotificationTrackerTarget,
ServiceProvider, TelegramBot, TrackingConfig,
)
from ..handler import _get_notification_trackers_for_providers, _render_cmd_template
from .common import _format_assets
_LOGGER = logging.getLogger(__name__)
async def _cmd_events(
bot: TelegramBot, providers_map: dict[int, ServiceProvider],
count: int, locale: str,
) -> dict[str, Any]:
provider_ids = set(providers_map.keys())
trackers = await _get_notification_trackers_for_providers(provider_ids)
tracker_ids = [t.id for t in trackers]
if not tracker_ids:
return {"events": []}
engine = get_engine()
async with AsyncSession(engine) as session:
result = await session.exec(
select(EventLog)
.where(EventLog.tracker_id.in_(tracker_ids))
.order_by(EventLog.created_at.desc())
.limit(count)
)
events = result.all()
events_data = [
{"type": e.event_type, "album": e.collection_name,
"count": e.assets_count, "date": e.created_at.strftime("%m/%d %H:%M")}
for e in events
]
return {"events": events_data}
async def cmd_latest(
client: Any, all_album_ids: list[str], count: int,
locale: str, response_mode: str,
cmd_templates: dict[str, dict[str, str]],
) -> str | list[dict[str, Any]]:
"""Handle /latest command with concurrent album fetching."""
album_ids = all_album_ids[:10]
if not album_ids:
return _format_assets([], "latest", "", locale, response_mode, client, cmd_templates)
results = await asyncio.gather(
*[client.get_album(aid) for aid in album_ids],
return_exceptions=True,
)
latest_assets: list[dict[str, Any]] = []
for album_id, result in zip(album_ids, results):
if isinstance(result, Exception):
_LOGGER.warning("Failed to fetch album %s: %s", album_id, result)
continue
if result:
for aid, asset in list(result.assets.items())[:count]:
latest_assets.append({
"id": asset.id, "originalFileName": asset.filename,
"type": asset.type, "createdAt": asset.created_at,
})
latest_assets.sort(key=lambda a: a.get("createdAt", ""), reverse=True)
return _format_assets(latest_assets[:count], "latest", "", locale, response_mode, client, cmd_templates)
async def cmd_random(
client: Any, all_album_ids: list[str], count: int,
locale: str, response_mode: str,
cmd_templates: dict[str, dict[str, str]],
) -> str | list[dict[str, Any]]:
"""Handle /random command with concurrent album fetching."""
album_ids = all_album_ids[:10]
if not album_ids:
return _format_assets([], "random", "", locale, response_mode, client, cmd_templates)
results = await asyncio.gather(
*[client.get_album(aid) for aid in album_ids],
return_exceptions=True,
)
random_assets: list[dict[str, Any]] = []
for album_id, result in zip(album_ids, results):
if isinstance(result, Exception):
_LOGGER.warning("Failed to fetch album %s: %s", album_id, result)
continue
if result:
asset_list = list(result.assets.values())
sampled = rng.sample(asset_list, min(count, len(asset_list)))
for asset in sampled:
random_assets.append({
"id": asset.id, "originalFileName": asset.filename,
"type": asset.type,
})
rng.shuffle(random_assets)
return _format_assets(random_assets[:count], "random", "", locale, response_mode, client, cmd_templates)
async def _check_native_memory(bot: TelegramBot) -> bool:
"""Check if any tracker-target linked to this bot uses native memory source."""
engine = get_engine()
async with AsyncSession(engine) as session:
result = await session.exec(
select(NotificationTarget).where(
NotificationTarget.type == "telegram",
NotificationTarget.user_id == bot.user_id,
)
)
targets = result.all()
bot_target_ids = {t.id for t in targets if t.config.get("bot_token") == bot.token}
if not bot_target_ids:
return False
tt_result = await session.exec(
select(NotificationTrackerTarget).where(
NotificationTrackerTarget.target_id.in_(bot_target_ids)
)
)
for tt in tt_result.all():
if tt.tracking_config_id:
tc = await session.get(TrackingConfig, tt.tracking_config_id)
if tc and tc.memory_source == "native":
return True
return False
async def cmd_memory(
bot: TelegramBot, client: Any, all_album_ids: list[str], count: int,
locale: str, response_mode: str,
cmd_templates: dict[str, dict[str, str]],
) -> str | list[dict[str, Any]]:
"""Handle /memory command with concurrent album fetching."""
use_native = await _check_native_memory(bot)
today = datetime.now(timezone.utc)
memory_assets: list[dict[str, Any]] = []
if use_native:
memories = await client.get_memories()
tracked_ids = set(all_album_ids) if all_album_ids else None
for mem in memories:
year = mem.get("data", {}).get("year")
for raw_asset in mem.get("assets", []):
if tracked_ids:
asset_albums = raw_asset.get("albums", [])
if not any(a.get("id") in tracked_ids for a in asset_albums):
continue
memory_assets.append({
"id": raw_asset.get("id", ""),
"originalFileName": raw_asset.get("originalFileName", ""),
"type": raw_asset.get("type", "IMAGE"),
"createdAt": raw_asset.get("fileCreatedAt", raw_asset.get("createdAt", "")),
"year": year,
})
else:
album_ids = all_album_ids[:10]
if album_ids:
results = await asyncio.gather(
*[client.get_album(aid) for aid in album_ids],
return_exceptions=True,
)
month_day = (today.month, today.day)
for album_id, result in zip(album_ids, results):
if isinstance(result, Exception):
_LOGGER.warning("Failed to fetch album %s: %s", album_id, result)
continue
if result:
for aid, asset in result.assets.items():
try:
dt = datetime.fromisoformat(asset.created_at.replace("Z", "+00:00"))
if (dt.month, dt.day) == month_day and dt.year != today.year:
memory_assets.append({
"id": asset.id, "originalFileName": asset.filename,
"type": asset.type, "createdAt": asset.created_at,
"year": dt.year,
})
except (ValueError, AttributeError):
pass
memory_assets = memory_assets[:count]
if not memory_assets:
return _render_cmd_template(cmd_templates, "no_results", locale, {"command": "memory", "query": ""})
return _format_assets(memory_assets, "memory", "", locale, response_mode, client, cmd_templates)