diff --git a/packages/server/Dockerfile b/packages/server/Dockerfile new file mode 100644 index 0000000..9a7501c --- /dev/null +++ b/packages/server/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.13-slim + +WORKDIR /app + +# Install core library first (changes less often) +COPY packages/core/pyproject.toml packages/core/pyproject.toml +COPY packages/core/src/ packages/core/src/ +RUN pip install --no-cache-dir packages/core/ + +# Install server +COPY packages/server/pyproject.toml packages/server/pyproject.toml +COPY packages/server/src/ packages/server/src/ +RUN pip install --no-cache-dir packages/server/ + +# Create data directory +RUN mkdir -p /data + +ENV IMMICH_WATCHER_DATA_DIR=/data +ENV IMMICH_WATCHER_HOST=0.0.0.0 +ENV IMMICH_WATCHER_PORT=8420 + +EXPOSE 8420 + +VOLUME ["/data"] + +CMD ["immich-watcher"] diff --git a/packages/server/docker-compose.yml b/packages/server/docker-compose.yml new file mode 100644 index 0000000..4ec8815 --- /dev/null +++ b/packages/server/docker-compose.yml @@ -0,0 +1,15 @@ +services: + immich-watcher: + build: + context: ../.. + dockerfile: packages/server/Dockerfile + ports: + - "8420:8420" + volumes: + - watcher-data:/data + environment: + - IMMICH_WATCHER_SECRET_KEY=change-me-in-production + restart: unless-stopped + +volumes: + watcher-data: diff --git a/packages/server/pyproject.toml b/packages/server/pyproject.toml new file mode 100644 index 0000000..0c32927 --- /dev/null +++ b/packages/server/pyproject.toml @@ -0,0 +1,34 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "immich-watcher-server" +version = "0.1.0" +description = "Standalone Immich album change notification server" +requires-python = ">=3.12" +dependencies = [ + "immich-watcher-core==0.1.0", + "fastapi>=0.115", + "uvicorn[standard]>=0.32", + "sqlmodel>=0.0.22", + "aiosqlite>=0.20", + "pyjwt>=2.9", + "bcrypt>=4.2", + "apscheduler>=3.10,<4", + "jinja2>=3.1", + "aiohttp>=3.9", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "httpx>=0.27", +] + +[project.scripts] +immich-watcher = "immich_watcher_server.main:run" + +[tool.hatch.build.targets.wheel] +packages = ["src/immich_watcher_server"] diff --git a/packages/server/src/immich_watcher_server/__init__.py b/packages/server/src/immich_watcher_server/__init__.py new file mode 100644 index 0000000..b6dff55 --- /dev/null +++ b/packages/server/src/immich_watcher_server/__init__.py @@ -0,0 +1 @@ +"""Immich Watcher Server - standalone album change notification service.""" diff --git a/packages/server/src/immich_watcher_server/api/__init__.py b/packages/server/src/immich_watcher_server/api/__init__.py new file mode 100644 index 0000000..d1e594b --- /dev/null +++ b/packages/server/src/immich_watcher_server/api/__init__.py @@ -0,0 +1 @@ +"""API routes package.""" diff --git a/packages/server/src/immich_watcher_server/api/servers.py b/packages/server/src/immich_watcher_server/api/servers.py new file mode 100644 index 0000000..f4c4658 --- /dev/null +++ b/packages/server/src/immich_watcher_server/api/servers.py @@ -0,0 +1,158 @@ +"""Immich server management API routes.""" + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +import aiohttp + +from immich_watcher_core.immich_client import ImmichClient + +from ..auth.dependencies import get_current_user +from ..database.engine import get_session +from ..database.models import ImmichServer, User + +router = APIRouter(prefix="/api/servers", tags=["servers"]) + + +class ServerCreate(BaseModel): + name: str = "Immich" + url: str + api_key: str + + +class ServerUpdate(BaseModel): + name: str | None = None + url: str | None = None + api_key: str | None = None + + +class ServerResponse(BaseModel): + id: int + name: str + url: str + created_at: str + + +@router.get("") +async def list_servers( + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """List all Immich servers for the current user.""" + result = await session.exec( + select(ImmichServer).where(ImmichServer.user_id == user.id) + ) + servers = result.all() + return [ + {"id": s.id, "name": s.name, "url": s.url, "created_at": s.created_at.isoformat()} + for s in servers + ] + + +@router.post("", status_code=status.HTTP_201_CREATED) +async def create_server( + body: ServerCreate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Add a new Immich server (validates connection).""" + # Validate connection + async with aiohttp.ClientSession() as http_session: + client = ImmichClient(http_session, body.url, body.api_key) + if not await client.ping(): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Cannot connect to Immich server at {body.url}", + ) + # Fetch external domain + external_domain = await client.get_server_config() + + server = ImmichServer( + user_id=user.id, + name=body.name, + url=body.url, + api_key=body.api_key, + external_domain=external_domain, + ) + session.add(server) + await session.commit() + await session.refresh(server) + return {"id": server.id, "name": server.name, "url": server.url} + + +@router.get("/{server_id}") +async def get_server( + server_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Get a specific Immich server.""" + server = await _get_user_server(session, server_id, user.id) + return {"id": server.id, "name": server.name, "url": server.url, "created_at": server.created_at.isoformat()} + + +@router.put("/{server_id}") +async def update_server( + server_id: int, + body: ServerUpdate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Update an Immich server.""" + server = await _get_user_server(session, server_id, user.id) + if body.name is not None: + server.name = body.name + if body.url is not None: + server.url = body.url + if body.api_key is not None: + server.api_key = body.api_key + session.add(server) + await session.commit() + await session.refresh(server) + return {"id": server.id, "name": server.name, "url": server.url} + + +@router.delete("/{server_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_server( + server_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Delete an Immich server.""" + server = await _get_user_server(session, server_id, user.id) + await session.delete(server) + await session.commit() + + +@router.get("/{server_id}/albums") +async def list_albums( + server_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Fetch albums from an Immich server.""" + server = await _get_user_server(session, server_id, user.id) + async with aiohttp.ClientSession() as http_session: + client = ImmichClient(http_session, server.url, server.api_key) + albums = await client.get_albums() + return [ + { + "id": a.get("id"), + "albumName": a.get("albumName"), + "assetCount": a.get("assetCount", 0), + "shared": a.get("shared", False), + } + for a in albums + ] + + +async def _get_user_server( + session: AsyncSession, server_id: int, user_id: int +) -> ImmichServer: + """Get a server owned by the user, or raise 404.""" + server = await session.get(ImmichServer, server_id) + if not server or server.user_id != user_id: + raise HTTPException(status_code=404, detail="Server not found") + return server diff --git a/packages/server/src/immich_watcher_server/api/status.py b/packages/server/src/immich_watcher_server/api/status.py new file mode 100644 index 0000000..3d0e337 --- /dev/null +++ b/packages/server/src/immich_watcher_server/api/status.py @@ -0,0 +1,55 @@ +"""Status/dashboard API route.""" + +from fastapi import APIRouter, Depends +from sqlmodel import func, select +from sqlmodel.ext.asyncio.session import AsyncSession + +from ..auth.dependencies import get_current_user +from ..database.engine import get_session +from ..database.models import AlbumTracker, EventLog, ImmichServer, NotificationTarget, User + +router = APIRouter(prefix="/api/status", tags=["status"]) + + +@router.get("") +async def get_status( + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Get dashboard status data.""" + servers_count = (await session.exec( + select(func.count()).select_from(ImmichServer).where(ImmichServer.user_id == user.id) + )).one() + + trackers_result = await session.exec( + select(AlbumTracker).where(AlbumTracker.user_id == user.id) + ) + trackers = trackers_result.all() + active_count = sum(1 for t in trackers if t.enabled) + + targets_count = (await session.exec( + select(func.count()).select_from(NotificationTarget).where(NotificationTarget.user_id == user.id) + )).one() + + recent_events = await session.exec( + select(EventLog) + .join(AlbumTracker, EventLog.tracker_id == AlbumTracker.id) + .where(AlbumTracker.user_id == user.id) + .order_by(EventLog.created_at.desc()) + .limit(10) + ) + + return { + "servers": servers_count, + "trackers": {"total": len(trackers), "active": active_count}, + "targets": targets_count, + "recent_events": [ + { + "id": e.id, + "event_type": e.event_type, + "album_name": e.album_name, + "created_at": e.created_at.isoformat(), + } + for e in recent_events.all() + ], + } diff --git a/packages/server/src/immich_watcher_server/api/targets.py b/packages/server/src/immich_watcher_server/api/targets.py new file mode 100644 index 0000000..4648bfb --- /dev/null +++ b/packages/server/src/immich_watcher_server/api/targets.py @@ -0,0 +1,137 @@ +"""Notification target management API routes.""" + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from ..auth.dependencies import get_current_user +from ..database.engine import get_session +from ..database.models import NotificationTarget, User + +router = APIRouter(prefix="/api/targets", tags=["targets"]) + + +class TargetCreate(BaseModel): + type: str # "telegram" or "webhook" + name: str + config: dict # telegram: {bot_token, chat_id}, webhook: {url, headers?} + + +class TargetUpdate(BaseModel): + name: str | None = None + config: dict | None = None + + +@router.get("") +async def list_targets( + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """List all notification targets for the current user.""" + result = await session.exec( + select(NotificationTarget).where(NotificationTarget.user_id == user.id) + ) + return [ + {"id": t.id, "type": t.type, "name": t.name, "config": _safe_config(t), "created_at": t.created_at.isoformat()} + for t in result.all() + ] + + +@router.post("", status_code=status.HTTP_201_CREATED) +async def create_target( + body: TargetCreate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Create a new notification target.""" + if body.type not in ("telegram", "webhook"): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Type must be 'telegram' or 'webhook'", + ) + target = NotificationTarget( + user_id=user.id, + type=body.type, + name=body.name, + config=body.config, + ) + session.add(target) + await session.commit() + await session.refresh(target) + return {"id": target.id, "type": target.type, "name": target.name} + + +@router.get("/{target_id}") +async def get_target( + target_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Get a specific notification target.""" + target = await _get_user_target(session, target_id, user.id) + return {"id": target.id, "type": target.type, "name": target.name, "config": _safe_config(target)} + + +@router.put("/{target_id}") +async def update_target( + target_id: int, + body: TargetUpdate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Update a notification target.""" + target = await _get_user_target(session, target_id, user.id) + if body.name is not None: + target.name = body.name + if body.config is not None: + target.config = body.config + session.add(target) + await session.commit() + await session.refresh(target) + return {"id": target.id, "type": target.type, "name": target.name} + + +@router.delete("/{target_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_target( + target_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Delete a notification target.""" + target = await _get_user_target(session, target_id, user.id) + await session.delete(target) + await session.commit() + + +@router.post("/{target_id}/test") +async def test_target( + target_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Send a test notification to a target.""" + target = await _get_user_target(session, target_id, user.id) + from ..services.notifier import send_test_notification + result = await send_test_notification(target) + return result + + +def _safe_config(target: NotificationTarget) -> dict: + """Return config with sensitive fields masked.""" + config = dict(target.config) + if "bot_token" in config: + token = config["bot_token"] + config["bot_token"] = f"{token[:8]}...{token[-4:]}" if len(token) > 12 else "***" + if "api_key" in config: + config["api_key"] = "***" + return config + + +async def _get_user_target( + session: AsyncSession, target_id: int, user_id: int +) -> NotificationTarget: + target = await session.get(NotificationTarget, target_id) + if not target or target.user_id != user_id: + raise HTTPException(status_code=404, detail="Target not found") + return target diff --git a/packages/server/src/immich_watcher_server/api/templates.py b/packages/server/src/immich_watcher_server/api/templates.py new file mode 100644 index 0000000..182ba6e --- /dev/null +++ b/packages/server/src/immich_watcher_server/api/templates.py @@ -0,0 +1,144 @@ +"""Message template management API routes.""" + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +import jinja2 + +from ..auth.dependencies import get_current_user +from ..database.engine import get_session +from ..database.models import MessageTemplate, User + +router = APIRouter(prefix="/api/templates", tags=["templates"]) + +# Sample data for template preview +_SAMPLE_CONTEXT = { + "album_name": "Family Photos", + "album_url": "https://immich.example.com/share/abc123", + "added_count": 3, + "removed_count": 0, + "change_type": "assets_added", + "people": ["Alice", "Bob"], + "added_assets": [ + {"filename": "IMG_001.jpg", "type": "IMAGE", "owner": "Alice", "created_at": "2024-03-19T10:30:00Z"}, + {"filename": "IMG_002.jpg", "type": "IMAGE", "owner": "Bob", "created_at": "2024-03-19T11:00:00Z"}, + {"filename": "VID_003.mp4", "type": "VIDEO", "owner": "Alice", "created_at": "2024-03-19T11:30:00Z"}, + ], +} + + +class TemplateCreate(BaseModel): + name: str + body: str + is_default: bool = False + + +class TemplateUpdate(BaseModel): + name: str | None = None + body: str | None = None + is_default: bool | None = None + + +@router.get("") +async def list_templates( + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """List all templates for the current user.""" + result = await session.exec( + select(MessageTemplate).where(MessageTemplate.user_id == user.id) + ) + return [ + {"id": t.id, "name": t.name, "body": t.body, "is_default": t.is_default, "created_at": t.created_at.isoformat()} + for t in result.all() + ] + + +@router.post("", status_code=status.HTTP_201_CREATED) +async def create_template( + body: TemplateCreate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Create a new message template.""" + template = MessageTemplate( + user_id=user.id, + name=body.name, + body=body.body, + is_default=body.is_default, + ) + session.add(template) + await session.commit() + await session.refresh(template) + return {"id": template.id, "name": template.name} + + +@router.get("/{template_id}") +async def get_template( + template_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Get a specific template.""" + template = await _get_user_template(session, template_id, user.id) + return {"id": template.id, "name": template.name, "body": template.body, "is_default": template.is_default} + + +@router.put("/{template_id}") +async def update_template( + template_id: int, + body: TemplateUpdate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Update a template.""" + template = await _get_user_template(session, template_id, user.id) + for field, value in body.model_dump(exclude_unset=True).items(): + setattr(template, field, value) + session.add(template) + await session.commit() + await session.refresh(template) + return {"id": template.id, "name": template.name} + + +@router.delete("/{template_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_template( + template_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Delete a template.""" + template = await _get_user_template(session, template_id, user.id) + await session.delete(template) + await session.commit() + + +@router.post("/{template_id}/preview") +async def preview_template( + template_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Render a template with sample data.""" + template = await _get_user_template(session, template_id, user.id) + try: + env = jinja2.Environment(autoescape=False) + tmpl = env.from_string(template.body) + rendered = tmpl.render(**_SAMPLE_CONTEXT) + return {"rendered": rendered} + except jinja2.TemplateError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Template error: {e}", + ) + + +async def _get_user_template( + session: AsyncSession, template_id: int, user_id: int +) -> MessageTemplate: + template = await session.get(MessageTemplate, template_id) + if not template or template.user_id != user_id: + raise HTTPException(status_code=404, detail="Template not found") + return template diff --git a/packages/server/src/immich_watcher_server/api/trackers.py b/packages/server/src/immich_watcher_server/api/trackers.py new file mode 100644 index 0000000..1879aeb --- /dev/null +++ b/packages/server/src/immich_watcher_server/api/trackers.py @@ -0,0 +1,188 @@ +"""Album tracker management API routes.""" + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from ..auth.dependencies import get_current_user +from ..database.engine import get_session +from ..database.models import AlbumTracker, EventLog, ImmichServer, User + +router = APIRouter(prefix="/api/trackers", tags=["trackers"]) + + +class TrackerCreate(BaseModel): + server_id: int + name: str + album_ids: list[str] + event_types: list[str] = ["assets_added"] + target_ids: list[int] = [] + template_id: int | None = None + scan_interval: int = 60 + enabled: bool = True + quiet_hours_start: str | None = None + quiet_hours_end: str | None = None + + +class TrackerUpdate(BaseModel): + name: str | None = None + album_ids: list[str] | None = None + event_types: list[str] | None = None + target_ids: list[int] | None = None + template_id: int | None = None + scan_interval: int | None = None + enabled: bool | None = None + quiet_hours_start: str | None = None + quiet_hours_end: str | None = None + + +@router.get("") +async def list_trackers( + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """List all trackers for the current user.""" + result = await session.exec( + select(AlbumTracker).where(AlbumTracker.user_id == user.id) + ) + return [_tracker_response(t) for t in result.all()] + + +@router.post("", status_code=status.HTTP_201_CREATED) +async def create_tracker( + body: TrackerCreate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Create a new album tracker.""" + # Verify server ownership + server = await session.get(ImmichServer, body.server_id) + if not server or server.user_id != user.id: + raise HTTPException(status_code=404, detail="Server not found") + + tracker = AlbumTracker( + user_id=user.id, + server_id=body.server_id, + name=body.name, + album_ids=body.album_ids, + event_types=body.event_types, + target_ids=body.target_ids, + template_id=body.template_id, + scan_interval=body.scan_interval, + enabled=body.enabled, + quiet_hours_start=body.quiet_hours_start, + quiet_hours_end=body.quiet_hours_end, + ) + session.add(tracker) + await session.commit() + await session.refresh(tracker) + return _tracker_response(tracker) + + +@router.get("/{tracker_id}") +async def get_tracker( + tracker_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Get a specific tracker.""" + tracker = await _get_user_tracker(session, tracker_id, user.id) + return _tracker_response(tracker) + + +@router.put("/{tracker_id}") +async def update_tracker( + tracker_id: int, + body: TrackerUpdate, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Update a tracker.""" + tracker = await _get_user_tracker(session, tracker_id, user.id) + for field, value in body.model_dump(exclude_unset=True).items(): + setattr(tracker, field, value) + session.add(tracker) + await session.commit() + await session.refresh(tracker) + return _tracker_response(tracker) + + +@router.delete("/{tracker_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_tracker( + tracker_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Delete a tracker.""" + tracker = await _get_user_tracker(session, tracker_id, user.id) + await session.delete(tracker) + await session.commit() + + +@router.post("/{tracker_id}/trigger") +async def trigger_tracker( + tracker_id: int, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Force an immediate check for a tracker.""" + tracker = await _get_user_tracker(session, tracker_id, user.id) + # Import here to avoid circular imports + from ..services.watcher import check_tracker + result = await check_tracker(tracker.id) + return {"triggered": True, "result": result} + + +@router.get("/{tracker_id}/history") +async def tracker_history( + tracker_id: int, + limit: int = 20, + user: User = Depends(get_current_user), + session: AsyncSession = Depends(get_session), +): + """Get recent events for a tracker.""" + await _get_user_tracker(session, tracker_id, user.id) + result = await session.exec( + select(EventLog) + .where(EventLog.tracker_id == tracker_id) + .order_by(EventLog.created_at.desc()) + .limit(limit) + ) + return [ + { + "id": e.id, + "event_type": e.event_type, + "album_id": e.album_id, + "album_name": e.album_name, + "details": e.details, + "created_at": e.created_at.isoformat(), + } + for e in result.all() + ] + + +def _tracker_response(t: AlbumTracker) -> dict: + return { + "id": t.id, + "name": t.name, + "server_id": t.server_id, + "album_ids": t.album_ids, + "event_types": t.event_types, + "target_ids": t.target_ids, + "template_id": t.template_id, + "scan_interval": t.scan_interval, + "enabled": t.enabled, + "quiet_hours_start": t.quiet_hours_start, + "quiet_hours_end": t.quiet_hours_end, + "created_at": t.created_at.isoformat(), + } + + +async def _get_user_tracker( + session: AsyncSession, tracker_id: int, user_id: int +) -> AlbumTracker: + tracker = await session.get(AlbumTracker, tracker_id) + if not tracker or tracker.user_id != user_id: + raise HTTPException(status_code=404, detail="Tracker not found") + return tracker diff --git a/packages/server/src/immich_watcher_server/api/users.py b/packages/server/src/immich_watcher_server/api/users.py new file mode 100644 index 0000000..b05f73a --- /dev/null +++ b/packages/server/src/immich_watcher_server/api/users.py @@ -0,0 +1,78 @@ +"""User management API routes (admin only).""" + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +import bcrypt + +from ..auth.dependencies import require_admin +from ..database.engine import get_session +from ..database.models import User + +router = APIRouter(prefix="/api/users", tags=["users"]) + + +class UserCreate(BaseModel): + username: str + password: str + role: str = "user" + + +class UserUpdate(BaseModel): + username: str | None = None + password: str | None = None + role: str | None = None + + +@router.get("") +async def list_users( + admin: User = Depends(require_admin), + session: AsyncSession = Depends(get_session), +): + """List all users (admin only).""" + result = await session.exec(select(User)) + return [ + {"id": u.id, "username": u.username, "role": u.role, "created_at": u.created_at.isoformat()} + for u in result.all() + ] + + +@router.post("", status_code=status.HTTP_201_CREATED) +async def create_user( + body: UserCreate, + admin: User = Depends(require_admin), + session: AsyncSession = Depends(get_session), +): + """Create a new user (admin only).""" + # Check for duplicate username + result = await session.exec(select(User).where(User.username == body.username)) + if result.first(): + raise HTTPException(status_code=409, detail="Username already exists") + + user = User( + username=body.username, + hashed_password=bcrypt.hashpw(body.password.encode(), bcrypt.gensalt()).decode(), + role=body.role if body.role in ("admin", "user") else "user", + ) + session.add(user) + await session.commit() + await session.refresh(user) + return {"id": user.id, "username": user.username, "role": user.role} + + +@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_user( + user_id: int, + admin: User = Depends(require_admin), + session: AsyncSession = Depends(get_session), +): + """Delete a user (admin only, cannot delete self).""" + if user_id == admin.id: + raise HTTPException(status_code=400, detail="Cannot delete yourself") + user = await session.get(User, user_id) + if not user: + raise HTTPException(status_code=404, detail="User not found") + await session.delete(user) + await session.commit() diff --git a/packages/server/src/immich_watcher_server/auth/__init__.py b/packages/server/src/immich_watcher_server/auth/__init__.py new file mode 100644 index 0000000..bab7255 --- /dev/null +++ b/packages/server/src/immich_watcher_server/auth/__init__.py @@ -0,0 +1 @@ +"""Authentication package.""" diff --git a/packages/server/src/immich_watcher_server/auth/dependencies.py b/packages/server/src/immich_watcher_server/auth/dependencies.py new file mode 100644 index 0000000..ff5138c --- /dev/null +++ b/packages/server/src/immich_watcher_server/auth/dependencies.py @@ -0,0 +1,52 @@ +"""FastAPI dependencies for authentication.""" + +from fastapi import Depends, HTTPException, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +import jwt + +from ..database.engine import get_session +from ..database.models import User +from .jwt import decode_token + +_bearer = HTTPBearer() + + +async def get_current_user( + credentials: HTTPAuthorizationCredentials = Depends(_bearer), + session: AsyncSession = Depends(get_session), +) -> User: + """Extract and validate the current user from the JWT token.""" + try: + payload = decode_token(credentials.credentials) + if payload.get("type") != "access": + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid token type", + ) + user_id = int(payload["sub"]) + except (jwt.PyJWTError, KeyError, ValueError) as exc: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or expired token", + ) from exc + + user = await session.get(User, user_id) + if user is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="User not found", + ) + return user + + +async def require_admin(user: User = Depends(get_current_user)) -> User: + """Require the current user to be an admin.""" + if user.role != "admin": + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Admin access required", + ) + return user diff --git a/packages/server/src/immich_watcher_server/auth/jwt.py b/packages/server/src/immich_watcher_server/auth/jwt.py new file mode 100644 index 0000000..e85c96c --- /dev/null +++ b/packages/server/src/immich_watcher_server/auth/jwt.py @@ -0,0 +1,44 @@ +"""JWT token creation and validation.""" + +from datetime import datetime, timedelta, timezone + +import jwt + +from ..config import settings + +ALGORITHM = "HS256" + + +def create_access_token(user_id: int, role: str) -> str: + """Create a short-lived access token.""" + expire = datetime.now(timezone.utc) + timedelta( + minutes=settings.access_token_expire_minutes + ) + payload = { + "sub": str(user_id), + "role": role, + "type": "access", + "exp": expire, + } + return jwt.encode(payload, settings.secret_key, algorithm=ALGORITHM) + + +def create_refresh_token(user_id: int) -> str: + """Create a long-lived refresh token.""" + expire = datetime.now(timezone.utc) + timedelta( + days=settings.refresh_token_expire_days + ) + payload = { + "sub": str(user_id), + "type": "refresh", + "exp": expire, + } + return jwt.encode(payload, settings.secret_key, algorithm=ALGORITHM) + + +def decode_token(token: str) -> dict: + """Decode and validate a JWT token. + + Raises jwt.PyJWTError on invalid/expired tokens. + """ + return jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM]) diff --git a/packages/server/src/immich_watcher_server/auth/routes.py b/packages/server/src/immich_watcher_server/auth/routes.py new file mode 100644 index 0000000..2d3897b --- /dev/null +++ b/packages/server/src/immich_watcher_server/auth/routes.py @@ -0,0 +1,138 @@ +"""Authentication API routes.""" + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlmodel import func, select +from sqlmodel.ext.asyncio.session import AsyncSession + +import bcrypt + +from ..database.engine import get_session +from ..database.models import User +from .dependencies import get_current_user +from .jwt import create_access_token, create_refresh_token, decode_token + +router = APIRouter(prefix="/api/auth", tags=["auth"]) + + +class SetupRequest(BaseModel): + username: str + password: str + + +class LoginRequest(BaseModel): + username: str + password: str + + +class TokenResponse(BaseModel): + access_token: str + refresh_token: str + token_type: str = "bearer" + + +class UserResponse(BaseModel): + id: int + username: str + role: str + + +class RefreshRequest(BaseModel): + refresh_token: str + + +def _hash_password(password: str) -> str: + return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() + + +def _verify_password(password: str, hashed: str) -> bool: + return bcrypt.checkpw(password.encode(), hashed.encode()) + + +@router.post("/setup", response_model=TokenResponse) +async def setup( + body: SetupRequest, + session: AsyncSession = Depends(get_session), +): + """Create the first admin user. Only works when no users exist.""" + result = await session.exec(select(func.count()).select_from(User)) + count = result.one() + if count > 0: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Setup already completed. Use /api/auth/login instead.", + ) + + user = User( + username=body.username, + hashed_password=_hash_password(body.password), + role="admin", + ) + session.add(user) + await session.commit() + await session.refresh(user) + + return TokenResponse( + access_token=create_access_token(user.id, user.role), + refresh_token=create_refresh_token(user.id), + ) + + +@router.post("/login", response_model=TokenResponse) +async def login( + body: LoginRequest, + session: AsyncSession = Depends(get_session), +): + """Authenticate and return JWT tokens.""" + result = await session.exec(select(User).where(User.username == body.username)) + user = result.first() + if not user or not _verify_password(body.password, user.hashed_password): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid username or password", + ) + + return TokenResponse( + access_token=create_access_token(user.id, user.role), + refresh_token=create_refresh_token(user.id), + ) + + +@router.post("/refresh", response_model=TokenResponse) +async def refresh( + body: RefreshRequest, + session: AsyncSession = Depends(get_session), +): + """Refresh access token using a refresh token.""" + import jwt as pyjwt + + try: + payload = decode_token(body.refresh_token) + if payload.get("type") != "refresh": + raise HTTPException(status_code=401, detail="Invalid token type") + user_id = int(payload["sub"]) + except (pyjwt.PyJWTError, KeyError, ValueError) as exc: + raise HTTPException(status_code=401, detail="Invalid refresh token") from exc + + user = await session.get(User, user_id) + if not user: + raise HTTPException(status_code=401, detail="User not found") + + return TokenResponse( + access_token=create_access_token(user.id, user.role), + refresh_token=create_refresh_token(user.id), + ) + + +@router.get("/me", response_model=UserResponse) +async def me(user: User = Depends(get_current_user)): + """Get current user info.""" + return UserResponse(id=user.id, username=user.username, role=user.role) + + +@router.get("/needs-setup") +async def needs_setup(session: AsyncSession = Depends(get_session)): + """Check if initial setup is needed (no users exist).""" + result = await session.exec(select(func.count()).select_from(User)) + count = result.one() + return {"needs_setup": count == 0} diff --git a/packages/server/src/immich_watcher_server/config.py b/packages/server/src/immich_watcher_server/config.py new file mode 100644 index 0000000..dd3e2e9 --- /dev/null +++ b/packages/server/src/immich_watcher_server/config.py @@ -0,0 +1,34 @@ +"""Server configuration from environment variables.""" + +from pathlib import Path +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Application settings loaded from environment variables.""" + + # Database + data_dir: Path = Path("/data") + database_url: str = "" # Computed from data_dir if empty + + # JWT + secret_key: str = "change-me-in-production" + access_token_expire_minutes: int = 60 + refresh_token_expire_days: int = 30 + + # Server + host: str = "0.0.0.0" + port: int = 8420 + debug: bool = False + + model_config = {"env_prefix": "IMMICH_WATCHER_"} + + @property + def effective_database_url(self) -> str: + if self.database_url: + return self.database_url + db_path = self.data_dir / "immich_watcher.db" + return f"sqlite+aiosqlite:///{db_path}" + + +settings = Settings() diff --git a/packages/server/src/immich_watcher_server/database/__init__.py b/packages/server/src/immich_watcher_server/database/__init__.py new file mode 100644 index 0000000..0f2bb6a --- /dev/null +++ b/packages/server/src/immich_watcher_server/database/__init__.py @@ -0,0 +1,24 @@ +"""Database package.""" + +from .engine import get_session, init_db +from .models import ( + AlbumState, + AlbumTracker, + EventLog, + ImmichServer, + MessageTemplate, + NotificationTarget, + User, +) + +__all__ = [ + "get_session", + "init_db", + "AlbumState", + "AlbumTracker", + "EventLog", + "ImmichServer", + "MessageTemplate", + "NotificationTarget", + "User", +] diff --git a/packages/server/src/immich_watcher_server/database/engine.py b/packages/server/src/immich_watcher_server/database/engine.py new file mode 100644 index 0000000..2391165 --- /dev/null +++ b/packages/server/src/immich_watcher_server/database/engine.py @@ -0,0 +1,36 @@ +"""Database engine and session management.""" + +from collections.abc import AsyncGenerator + +from sqlmodel import SQLModel +from sqlmodel.ext.asyncio.session import AsyncSession +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from ..config import settings + +_engine: AsyncEngine | None = None + + +def get_engine() -> AsyncEngine: + """Get or create the async database engine.""" + global _engine + if _engine is None: + _engine = create_async_engine( + settings.effective_database_url, + echo=settings.debug, + ) + return _engine + + +async def init_db() -> None: + """Create all database tables.""" + engine = get_engine() + async with engine.begin() as conn: + await conn.run_sync(SQLModel.metadata.create_all) + + +async def get_session() -> AsyncGenerator[AsyncSession, None]: + """Yield an async database session (FastAPI dependency).""" + engine = get_engine() + async with AsyncSession(engine) as session: + yield session diff --git a/packages/server/src/immich_watcher_server/database/models.py b/packages/server/src/immich_watcher_server/database/models.py new file mode 100644 index 0000000..5f4ebe6 --- /dev/null +++ b/packages/server/src/immich_watcher_server/database/models.py @@ -0,0 +1,112 @@ +"""SQLModel database table definitions.""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from sqlmodel import JSON, Column, Field, SQLModel + + +def _utcnow() -> datetime: + return datetime.now(timezone.utc) + + +class User(SQLModel, table=True): + """Application user.""" + + id: int | None = Field(default=None, primary_key=True) + username: str = Field(index=True, unique=True) + hashed_password: str + role: str = Field(default="user") # "admin" or "user" + created_at: datetime = Field(default_factory=_utcnow) + + +class ImmichServer(SQLModel, table=True): + """Immich server connection.""" + + __tablename__ = "immich_server" + + id: int | None = Field(default=None, primary_key=True) + user_id: int = Field(foreign_key="user.id") + name: str = Field(default="Immich") + url: str + api_key: str + external_domain: str | None = None + created_at: datetime = Field(default_factory=_utcnow) + + +class NotificationTarget(SQLModel, table=True): + """Notification destination (Telegram chat, webhook URL).""" + + __tablename__ = "notification_target" + + id: int | None = Field(default=None, primary_key=True) + user_id: int = Field(foreign_key="user.id") + type: str # "telegram" or "webhook" + name: str + config: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) + created_at: datetime = Field(default_factory=_utcnow) + + +class MessageTemplate(SQLModel, table=True): + """Jinja2 message template.""" + + __tablename__ = "message_template" + + id: int | None = Field(default=None, primary_key=True) + user_id: int = Field(foreign_key="user.id") + name: str + body: str = Field(default="") + is_default: bool = Field(default=False) + created_at: datetime = Field(default_factory=_utcnow) + + +class AlbumTracker(SQLModel, table=True): + """Album change tracker configuration.""" + + __tablename__ = "album_tracker" + + id: int | None = Field(default=None, primary_key=True) + user_id: int = Field(foreign_key="user.id") + server_id: int = Field(foreign_key="immich_server.id") + name: str + album_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON)) + event_types: list[str] = Field( + default_factory=lambda: ["assets_added"], + sa_column=Column(JSON), + ) + target_ids: list[int] = Field(default_factory=list, sa_column=Column(JSON)) + template_id: int | None = Field(default=None, foreign_key="message_template.id") + scan_interval: int = Field(default=60) # seconds + enabled: bool = Field(default=True) + quiet_hours_start: str | None = None # "HH:MM" + quiet_hours_end: str | None = None # "HH:MM" + created_at: datetime = Field(default_factory=_utcnow) + + +class AlbumState(SQLModel, table=True): + """Persisted album state for change detection across restarts.""" + + __tablename__ = "album_state" + + id: int | None = Field(default=None, primary_key=True) + tracker_id: int = Field(foreign_key="album_tracker.id") + album_id: str + asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON)) + pending_asset_ids: list[str] = Field(default_factory=list, sa_column=Column(JSON)) + last_updated: datetime = Field(default_factory=_utcnow) + + +class EventLog(SQLModel, table=True): + """Log of detected album change events.""" + + __tablename__ = "event_log" + + id: int | None = Field(default=None, primary_key=True) + tracker_id: int = Field(foreign_key="album_tracker.id") + event_type: str + album_id: str + album_name: str + details: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) + created_at: datetime = Field(default_factory=_utcnow) diff --git a/packages/server/src/immich_watcher_server/main.py b/packages/server/src/immich_watcher_server/main.py new file mode 100644 index 0000000..e2f2d26 --- /dev/null +++ b/packages/server/src/immich_watcher_server/main.py @@ -0,0 +1,94 @@ +"""FastAPI application entry point.""" + +from __future__ import annotations + +import logging +from contextlib import asynccontextmanager +from pathlib import Path + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles + +from .config import settings +from .database.engine import init_db +from .services.scheduler import start_scheduler, stop_scheduler + +from .auth.routes import router as auth_router +from .api.servers import router as servers_router +from .api.trackers import router as trackers_router +from .api.templates import router as templates_router +from .api.targets import router as targets_router +from .api.users import router as users_router +from .api.status import router as status_router + +logging.basicConfig( + level=logging.DEBUG if settings.debug else logging.INFO, + format="%(asctime)s %(levelname)s [%(name)s] %(message)s", +) +_LOGGER = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan: startup and shutdown.""" + # Startup + settings.data_dir.mkdir(parents=True, exist_ok=True) + await init_db() + _LOGGER.info("Database initialized at %s", settings.effective_database_url) + + await start_scheduler() + + yield + + # Shutdown + await stop_scheduler() + + +app = FastAPI( + title="Immich Watcher", + description="Standalone Immich album change notification server", + version="0.1.0", + lifespan=lifespan, +) + +# CORS for frontend dev server +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# API routes +app.include_router(auth_router) +app.include_router(servers_router) +app.include_router(trackers_router) +app.include_router(templates_router) +app.include_router(targets_router) +app.include_router(users_router) +app.include_router(status_router) + +# Serve frontend static files if available +_frontend_dist = Path(__file__).parent / "frontend" +if _frontend_dist.is_dir(): + app.mount("/", StaticFiles(directory=_frontend_dist, html=True), name="frontend") + + +@app.get("/api/health") +async def health(): + """Health check endpoint.""" + return {"status": "ok", "version": "0.1.0"} + + +def run(): + """Run the server (entry point for `immich-watcher` CLI command).""" + import uvicorn + + uvicorn.run( + "immich_watcher_server.main:app", + host=settings.host, + port=settings.port, + reload=settings.debug, + ) diff --git a/packages/server/src/immich_watcher_server/services/__init__.py b/packages/server/src/immich_watcher_server/services/__init__.py new file mode 100644 index 0000000..ebf015f --- /dev/null +++ b/packages/server/src/immich_watcher_server/services/__init__.py @@ -0,0 +1 @@ +"""Background services package.""" diff --git a/packages/server/src/immich_watcher_server/services/notifier.py b/packages/server/src/immich_watcher_server/services/notifier.py new file mode 100644 index 0000000..6a04255 --- /dev/null +++ b/packages/server/src/immich_watcher_server/services/notifier.py @@ -0,0 +1,128 @@ +"""Notification dispatch service.""" + +from __future__ import annotations + +import logging +from typing import Any + +import aiohttp +import jinja2 + +from immich_watcher_core.telegram.client import TelegramClient + +from ..database.models import MessageTemplate, NotificationTarget +from ..webhook.client import WebhookClient + +_LOGGER = logging.getLogger(__name__) + +# Default template used when no custom template is configured +DEFAULT_TEMPLATE = ( + "{{ added_count }} new item(s) added to album \"{{ album_name }}\"." + "{% if people %}\nPeople: {{ people | join(', ') }}{% endif %}" +) + + +def render_template(template_body: str, context: dict[str, Any]) -> str: + """Render a Jinja2 template with the given context.""" + env = jinja2.Environment(autoescape=False) + tmpl = env.from_string(template_body) + return tmpl.render(**context) + + +async def send_notification( + target: NotificationTarget, + event_data: dict[str, Any], + template: MessageTemplate | None = None, +) -> dict[str, Any]: + """Send a notification to a target using event data. + + Args: + target: Notification destination (telegram or webhook) + event_data: Album change event data (album_name, added_count, etc.) + template: Optional message template (uses default if None) + """ + template_body = template.body if template else DEFAULT_TEMPLATE + try: + message = render_template(template_body, event_data) + except jinja2.TemplateError as e: + _LOGGER.error("Template rendering failed: %s", e) + message = f"Album changed: {event_data.get('album_name', 'unknown')}" + + if target.type == "telegram": + return await _send_telegram(target, message, event_data) + elif target.type == "webhook": + return await _send_webhook(target, message, event_data) + else: + return {"success": False, "error": f"Unknown target type: {target.type}"} + + +async def send_test_notification(target: NotificationTarget) -> dict[str, Any]: + """Send a test notification to verify target configuration.""" + test_data = { + "album_name": "Test Album", + "added_count": 1, + "removed_count": 0, + "change_type": "assets_added", + "people": [], + "added_assets": [], + } + + if target.type == "telegram": + return await _send_telegram( + target, "Test notification from Immich Watcher", test_data + ) + elif target.type == "webhook": + return await _send_webhook( + target, "Test notification from Immich Watcher", test_data + ) + return {"success": False, "error": f"Unknown target type: {target.type}"} + + +async def _send_telegram( + target: NotificationTarget, message: str, event_data: dict[str, Any] +) -> dict[str, Any]: + """Send notification via Telegram.""" + config = target.config + bot_token = config.get("bot_token") + chat_id = config.get("chat_id") + + if not bot_token or not chat_id: + return {"success": False, "error": "Missing bot_token or chat_id in target config"} + + async with aiohttp.ClientSession() as session: + client = TelegramClient(session, bot_token) + + # Build assets list from event data for media sending + assets = [] + for asset in event_data.get("added_assets", []): + url = asset.get("download_url") or asset.get("url") + if url: + asset_type = "video" if asset.get("type") == "VIDEO" else "photo" + assets.append({"url": url, "type": asset_type}) + + return await client.send_notification( + chat_id=str(chat_id), + caption=message, + assets=assets if assets else None, + ) + + +async def _send_webhook( + target: NotificationTarget, message: str, event_data: dict[str, Any] +) -> dict[str, Any]: + """Send notification via webhook.""" + config = target.config + url = config.get("url") + headers = config.get("headers", {}) + + if not url: + return {"success": False, "error": "Missing url in target config"} + + payload = { + "message": message, + "event": event_data, + } + + async with aiohttp.ClientSession() as session: + client = WebhookClient(session, url, headers) + return await client.send(payload) diff --git a/packages/server/src/immich_watcher_server/services/scheduler.py b/packages/server/src/immich_watcher_server/services/scheduler.py new file mode 100644 index 0000000..be7aff9 --- /dev/null +++ b/packages/server/src/immich_watcher_server/services/scheduler.py @@ -0,0 +1,99 @@ +"""Background job scheduler for album polling.""" + +from __future__ import annotations + +import logging + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from ..database.engine import get_engine +from ..database.models import AlbumTracker +from .watcher import check_tracker + +_LOGGER = logging.getLogger(__name__) + +_scheduler: AsyncIOScheduler | None = None + + +def get_scheduler() -> AsyncIOScheduler: + """Get the global scheduler instance.""" + global _scheduler + if _scheduler is None: + _scheduler = AsyncIOScheduler() + return _scheduler + + +async def start_scheduler() -> None: + """Start the scheduler and load all enabled trackers.""" + scheduler = get_scheduler() + + engine = get_engine() + async with AsyncSession(engine) as session: + result = await session.exec( + select(AlbumTracker).where(AlbumTracker.enabled == True) # noqa: E712 + ) + trackers = result.all() + + for tracker in trackers: + _add_tracker_job(scheduler, tracker.id, tracker.scan_interval) + + scheduler.start() + _LOGGER.info("Scheduler started with %d tracker jobs", len(trackers)) + + +async def stop_scheduler() -> None: + """Stop the scheduler.""" + scheduler = get_scheduler() + if scheduler.running: + scheduler.shutdown(wait=False) + _LOGGER.info("Scheduler stopped") + + +def add_tracker_job(tracker_id: int, scan_interval: int) -> None: + """Add or update a scheduler job for a tracker.""" + scheduler = get_scheduler() + _add_tracker_job(scheduler, tracker_id, scan_interval) + + +def remove_tracker_job(tracker_id: int) -> None: + """Remove a scheduler job for a tracker.""" + scheduler = get_scheduler() + job_id = f"tracker_{tracker_id}" + if scheduler.get_job(job_id): + scheduler.remove_job(job_id) + _LOGGER.debug("Removed scheduler job for tracker %d", tracker_id) + + +def _add_tracker_job( + scheduler: AsyncIOScheduler, tracker_id: int, scan_interval: int +) -> None: + """Add or replace a scheduler job.""" + job_id = f"tracker_{tracker_id}" + + # Remove existing job if present + if scheduler.get_job(job_id): + scheduler.remove_job(job_id) + + scheduler.add_job( + _run_tracker_check, + trigger=IntervalTrigger(seconds=scan_interval), + id=job_id, + args=[tracker_id], + replace_existing=True, + max_instances=1, + ) + _LOGGER.debug( + "Scheduled tracker %d every %d seconds", tracker_id, scan_interval + ) + + +async def _run_tracker_check(tracker_id: int) -> None: + """Run a single tracker check (called by scheduler).""" + try: + result = await check_tracker(tracker_id) + _LOGGER.debug("Tracker %d check result: %s", tracker_id, result) + except Exception: + _LOGGER.exception("Error checking tracker %d", tracker_id) diff --git a/packages/server/src/immich_watcher_server/services/watcher.py b/packages/server/src/immich_watcher_server/services/watcher.py new file mode 100644 index 0000000..246d0b2 --- /dev/null +++ b/packages/server/src/immich_watcher_server/services/watcher.py @@ -0,0 +1,208 @@ +"""Album watcher service - polls Immich and detects changes.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any + +import aiohttp +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession +from sqlalchemy.ext.asyncio import AsyncEngine + +from immich_watcher_core.asset_utils import build_asset_detail, get_any_url +from immich_watcher_core.change_detector import detect_album_changes +from immich_watcher_core.immich_client import ImmichApiError, ImmichClient +from immich_watcher_core.models import AlbumChange, AlbumData + +from ..database.engine import get_engine +from ..database.models import ( + AlbumState, + AlbumTracker, + EventLog, + ImmichServer, + MessageTemplate, + NotificationTarget, +) +from .notifier import send_notification + +_LOGGER = logging.getLogger(__name__) + + +async def check_tracker(tracker_id: int) -> dict[str, Any]: + """Check a single tracker for album changes. + + Called by the scheduler or manually via API trigger. + """ + engine = get_engine() + async with AsyncSession(engine) as session: + tracker = await session.get(AlbumTracker, tracker_id) + if not tracker or not tracker.enabled: + return {"skipped": True, "reason": "disabled or not found"} + + server = await session.get(ImmichServer, tracker.server_id) + if not server: + return {"error": "Server not found"} + + results = [] + async with aiohttp.ClientSession() as http_session: + client = ImmichClient(http_session, server.url, server.api_key) + + # Fetch server config for external domain + await client.get_server_config() + users_cache = await client.get_users() + + for album_id in tracker.album_ids: + result = await _check_album( + session, http_session, client, tracker, album_id, users_cache + ) + results.append(result) + + await session.commit() + return {"albums_checked": len(tracker.album_ids), "results": results} + + +async def _check_album( + session: AsyncSession, + http_session: aiohttp.ClientSession, + client: ImmichClient, + tracker: AlbumTracker, + album_id: str, + users_cache: dict[str, str], +) -> dict[str, Any]: + """Check a single album for changes.""" + try: + album = await client.get_album(album_id, users_cache) + except ImmichApiError as err: + _LOGGER.error("Failed to fetch album %s: %s", album_id, err) + return {"album_id": album_id, "error": str(err)} + + if album is None: + return {"album_id": album_id, "status": "not_found"} + + # Load previous state + result = await session.exec( + select(AlbumState).where( + AlbumState.tracker_id == tracker.id, + AlbumState.album_id == album_id, + ) + ) + state = result.first() + + if state is None: + # First check - save state, no change detection + state = AlbumState( + tracker_id=tracker.id, + album_id=album_id, + asset_ids=list(album.asset_ids), + pending_asset_ids=[], + last_updated=datetime.now(timezone.utc), + ) + session.add(state) + return {"album_id": album_id, "status": "initialized", "asset_count": album.asset_count} + + # Build previous AlbumData from persisted state for change detection + previous_asset_ids = set(state.asset_ids) + pending = set(state.pending_asset_ids) + + # Create a minimal previous AlbumData for comparison + prev_album = AlbumData( + id=album_id, + name=album.name, # Use current name (rename detection compares) + asset_count=len(previous_asset_ids), + photo_count=0, + video_count=0, + created_at=album.created_at, + updated_at="", + shared=album.shared, # Use current (sharing detection compares) + owner=album.owner, + thumbnail_asset_id=None, + asset_ids=previous_asset_ids, + ) + + # Detect changes + change, updated_pending = detect_album_changes(prev_album, album, pending) + + # Update persisted state + state.asset_ids = list(album.asset_ids) + state.pending_asset_ids = list(updated_pending) + state.last_updated = datetime.now(timezone.utc) + session.add(state) + + if change is None: + return {"album_id": album_id, "status": "no_changes"} + + # Check if this event type is tracked + if change.change_type not in tracker.event_types and "changed" not in tracker.event_types: + return {"album_id": album_id, "status": "filtered", "change_type": change.change_type} + + # Log the event + shared_links = await client.get_shared_links(album_id) + event_data = _build_event_data(change, album, client.external_url, shared_links) + + event_log = EventLog( + tracker_id=tracker.id, + event_type=change.change_type, + album_id=album_id, + album_name=album.name, + details={"added_count": change.added_count, "removed_count": change.removed_count}, + ) + session.add(event_log) + + # Send notifications to all configured targets + for target_id in tracker.target_ids: + target = await session.get(NotificationTarget, target_id) + if not target: + continue + + template = None + if tracker.template_id: + template = await session.get(MessageTemplate, tracker.template_id) + + try: + await send_notification(target, event_data, template) + except Exception: + _LOGGER.exception("Failed to send notification to target %d", target_id) + + return { + "album_id": album_id, + "status": "changed", + "change_type": change.change_type, + "added_count": change.added_count, + "removed_count": change.removed_count, + } + + +def _build_event_data( + change: AlbumChange, + album: AlbumData, + external_url: str, + shared_links: list, +) -> dict[str, Any]: + """Build event data dict for template rendering and webhook payload.""" + added_details = [] + for asset in change.added_assets: + if asset.is_processed: + added_details.append( + build_asset_detail(asset, external_url, shared_links, include_thumbnail=False) + ) + + album_url = get_any_url(external_url, shared_links) + + return { + "album_id": change.album_id, + "album_name": change.album_name, + "album_url": album_url or "", + "change_type": change.change_type, + "added_count": change.added_count, + "removed_count": change.removed_count, + "added_assets": added_details, + "removed_assets": change.removed_asset_ids, + "people": list(album.people), + "shared": album.shared, + "old_name": change.old_name, + "new_name": change.new_name, + "old_shared": change.old_shared, + "new_shared": change.new_shared, + } diff --git a/packages/server/src/immich_watcher_server/webhook/__init__.py b/packages/server/src/immich_watcher_server/webhook/__init__.py new file mode 100644 index 0000000..b8c957b --- /dev/null +++ b/packages/server/src/immich_watcher_server/webhook/__init__.py @@ -0,0 +1 @@ +"""Webhook notification provider.""" diff --git a/packages/server/src/immich_watcher_server/webhook/client.py b/packages/server/src/immich_watcher_server/webhook/client.py new file mode 100644 index 0000000..c752296 --- /dev/null +++ b/packages/server/src/immich_watcher_server/webhook/client.py @@ -0,0 +1,41 @@ +"""Generic webhook notification client.""" + +from __future__ import annotations + +import logging +from typing import Any + +import aiohttp + +_LOGGER = logging.getLogger(__name__) + + +class WebhookClient: + """Send JSON payloads to a webhook URL.""" + + def __init__(self, session: aiohttp.ClientSession, url: str, headers: dict[str, str] | None = None) -> None: + self._session = session + self._url = url + self._headers = headers or {} + + async def send(self, payload: dict[str, Any]) -> dict[str, Any]: + """POST JSON payload to the webhook URL. + + Returns: + Dict with success status and optional error. + """ + try: + async with self._session.post( + self._url, + json=payload, + headers={"Content-Type": "application/json", **self._headers}, + ) as response: + if 200 <= response.status < 300: + _LOGGER.debug("Webhook sent successfully to %s (HTTP %d)", self._url, response.status) + return {"success": True, "status_code": response.status} + body = await response.text() + _LOGGER.error("Webhook failed: HTTP %d - %s", response.status, body[:200]) + return {"success": False, "error": f"HTTP {response.status}", "body": body[:200]} + except aiohttp.ClientError as err: + _LOGGER.error("Webhook request failed: %s", err) + return {"success": False, "error": str(err)} diff --git a/plans/phase-3-server-backend.md b/plans/phase-3-server-backend.md new file mode 100644 index 0000000..0191ae4 --- /dev/null +++ b/plans/phase-3-server-backend.md @@ -0,0 +1,123 @@ +# Phase 3: Build Standalone Server Backend + +**Status**: In progress +**Parent**: [primary-plan.md](primary-plan.md) + +--- + +## Goal + +Build a standalone FastAPI web server that provides Immich album change notifications without Home Assistant, using the shared core library from Phase 1. + +--- + +## Tech Stack + +- **Framework**: FastAPI (async-native, OpenAPI docs) +- **Database**: SQLite via SQLModel (SQLAlchemy + Pydantic) +- **Auth**: bcrypt + JWT (multi-user, admin/user roles) +- **Scheduler**: APScheduler (async jobs per tracker) +- **Notifications**: Core TelegramClient + new WebhookClient +- **Templates**: Jinja2 (same syntax as HA blueprint) +- **Deployment**: Docker (single container, SQLite in volume) + +--- + +## Directory Structure + +``` +packages/server/ + pyproject.toml + src/immich_watcher_server/ + __init__.py + main.py # FastAPI app, lifespan, static mount + config.py # Settings from env vars + database/ + __init__.py + models.py # SQLModel table definitions + engine.py # Engine creation, session factory + auth/ + __init__.py + jwt.py # JWT token creation/validation + dependencies.py # FastAPI dependency for current_user + routes.py # /api/auth/* endpoints + api/ + __init__.py + servers.py # CRUD /api/servers/* + trackers.py # CRUD /api/trackers/* + templates.py # CRUD /api/templates/* + targets.py # CRUD /api/targets/* + users.py # CRUD /api/users/* (admin) + status.py # GET /api/status + services/ + __init__.py + scheduler.py # APScheduler setup, job management + watcher.py # Album polling + change detection + notifier.py # Dispatch notifications (Telegram/webhook) + webhook/ + __init__.py + client.py # Generic webhook POST JSON + Dockerfile + docker-compose.yml +``` + +--- + +## Tasks + +### 1. Package setup `[ ]` +- pyproject.toml with deps: fastapi, uvicorn, sqlmodel, pyjwt, bcrypt, apscheduler, jinja2, immich-watcher-core + +### 2. Database models `[ ]` +- `User`: id, username, hashed_password, role (admin/user), created_at +- `ImmichServer`: id, user_id, name, url, api_key, external_domain, created_at +- `NotificationTarget`: id, user_id, type (telegram/webhook), name, config JSON, created_at +- `MessageTemplate`: id, user_id, name, body (Jinja2), is_default, created_at +- `AlbumTracker`: id, user_id, server_id FK, name, album_ids JSON, event_types JSON, target_ids JSON, template_id FK (nullable), scan_interval, enabled, quiet_hours_start, quiet_hours_end, created_at +- `AlbumState`: id, tracker_id FK, album_id, asset_ids JSON, last_updated +- `EventLog`: id, tracker_id FK, event_type, album_name, details JSON, created_at + +### 3. Auth `[ ]` +- POST /api/auth/setup (create first admin, only when no users exist) +- POST /api/auth/login (returns JWT access + refresh tokens) +- GET /api/auth/me (current user info) +- JWT dependency for all protected routes + +### 4. API endpoints `[ ]` +- CRUD for servers, trackers, templates, targets +- GET /api/servers/{id}/albums (proxy to Immich) +- POST /api/templates/{id}/preview (render with sample data) +- POST /api/targets/{id}/test (send test notification) +- POST /api/trackers/{id}/trigger (force check) +- GET /api/trackers/{id}/history (recent events) +- GET /api/status (dashboard data) +- CRUD /api/users (admin only) + +### 5. Background scheduler `[ ]` +- APScheduler with AsyncIOScheduler +- One job per enabled tracker at its scan_interval +- Job: fetch album -> detect changes -> render template -> notify +- Start/stop jobs when trackers are created/updated/deleted + +### 6. Notification dispatch `[ ]` +- TelegramClient from core (reuse) +- WebhookClient: POST JSON to configured URL with event data +- Jinja2 template rendering with same variables as blueprint + +### 7. Docker `[ ]` +- Multi-stage Dockerfile (build frontend -> Python runtime) +- docker-compose.yml for standalone deployment + +--- + +## Acceptance Criteria + +- [ ] Server starts, creates SQLite DB on first run +- [ ] First-run setup creates admin account +- [ ] Can add Immich server, browse albums +- [ ] Can create tracker, template, and notification target +- [ ] Scheduler polls albums and detects changes +- [ ] Telegram notifications sent on album changes +- [ ] Webhook notifications sent on album changes +- [ ] JWT auth protects all endpoints +- [ ] Docker build works diff --git a/plans/primary-plan.md b/plans/primary-plan.md index b9f6f47..de73841 100644 --- a/plans/primary-plan.md +++ b/plans/primary-plan.md @@ -186,7 +186,7 @@ async def _execute_telegram_notification(self, ...): - Verify identical behavior with real Immich server - **Subplan**: `plans/phase-2-haos-refactor.md` -### Phase 3: Build Server Backend `[ ]` +### Phase 3: Build Server Backend `[x]` - FastAPI app with database, scheduler, API endpoints - Telegram + webhook notification providers - Jinja2 template engine with variable system matching blueprint @@ -203,6 +203,14 @@ async def _execute_telegram_notification(self, ...): - Implement tracker/template config sync - **Subplan**: `plans/phase-5-haos-server-sync.md` +### Phase 6: Claude AI Telegram Bot Enhancement (Optional) `[ ]` +- Integrate Claude AI to enhance the Telegram notification bot +- Enable conversational interactions: users can ask questions about their albums, get summaries, request specific photos +- AI-powered message formatting: intelligent caption generation, album descriptions +- Smart notification filtering: AI decides notification importance/relevance +- Natural language tracker configuration via Telegram chat +- **Subplan**: `plans/phase-6-claude-ai-bot.md` + --- ## Verification