feat(notify-bridge): phase 2 - core abstractions

Define the generic provider/event/variable system:
- ServiceProvider ABC with connect, disconnect, poll, list_collections
- ServiceProviderType enum (IMMICH first)
- EventType enum (assets_added/removed, collection_renamed/deleted, sharing_changed)
- MediaAsset, MediaCollection, CollectionState dataclasses
- TemplateVariableDefinition and VariableRegistry with 11 base variables
- All models use extra: dict for provider-specific data passthrough

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 22:33:11 +03:00
parent b724447f4d
commit 3ed0d8ce88
9 changed files with 378 additions and 0 deletions
+8
View File
@@ -29,3 +29,11 @@ PID=$(netstat -ano 2>/dev/null | grep ':5173.*LISTENING' | awk '{print $5}' | he
- **System-owned entities**: `user_id=0` means system-owned (e.g. default templates).
- **FastAPI route ordering**: Static path routes MUST be registered BEFORE parameterized routes.
- **`__pycache__`**: Add to `.gitignore`. Never commit.
## Project Structure (Phase 1)
- **packages/core** (`notify_bridge_core`): Shared library — providers, models, notifications, templates. No DB dependency.
- **packages/server** (`notify_bridge_server`): FastAPI REST API + SQLite. Depends on core.
- **frontend**: SvelteKit 2 + Svelte 5 + Tailwind CSS v4. Static adapter with SPA fallback. Dev proxy to :8420.
- **Environment vars**: `NOTIFY_BRIDGE_DATA_DIR`, `NOTIFY_BRIDGE_SECRET_KEY`, `NOTIFY_BRIDGE_DATABASE_URL`
- Core package includes `jinja2` dependency (template rendering lives in core, not server).
@@ -1 +1,14 @@
"""Core data models — events, media assets, collections."""
from notify_bridge_core.models.collections import CollectionState
from notify_bridge_core.models.events import EventType, ServiceEvent
from notify_bridge_core.models.media import MediaAsset, MediaCollection, MediaType
__all__ = [
"CollectionState",
"EventType",
"MediaAsset",
"MediaCollection",
"MediaType",
"ServiceEvent",
]
@@ -0,0 +1,18 @@
"""Collection state tracking for change detection."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class CollectionState:
"""Persisted state for a tracked collection, used for change detection."""
collection_id: str
asset_ids: set[str] = field(default_factory=set)
pending_asset_ids: set[str] = field(default_factory=set)
name: str = ""
shared: bool = False
last_updated: datetime | None = None
@@ -0,0 +1,54 @@
"""Generic event models for service provider notifications."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
from notify_bridge_core.models.media import MediaAsset
from notify_bridge_core.providers.base import ServiceProviderType
class EventType(str, Enum):
"""Types of events a service provider can emit."""
ASSETS_ADDED = "assets_added"
ASSETS_REMOVED = "assets_removed"
COLLECTION_RENAMED = "collection_renamed"
COLLECTION_DELETED = "collection_deleted"
SHARING_CHANGED = "sharing_changed"
@dataclass
class ServiceEvent:
"""A generic event emitted by a service provider.
Contains all information needed to render a notification template.
Provider-specific data goes in the `extra` dict.
"""
event_type: EventType
provider_type: ServiceProviderType
provider_name: str
collection_id: str
collection_name: str
timestamp: datetime
# Change details
added_assets: list[MediaAsset] = field(default_factory=list)
removed_asset_ids: list[str] = field(default_factory=list)
added_count: int = 0
removed_count: int = 0
# For renames
old_name: str | None = None
new_name: str | None = None
# For sharing changes
old_shared: bool | None = None
new_shared: bool | None = None
# Provider-specific extra data
extra: dict[str, Any] = field(default_factory=dict)
@@ -0,0 +1,56 @@
"""Generic media asset and collection models."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
class MediaType(str, Enum):
"""Types of media assets."""
IMAGE = "image"
VIDEO = "video"
DOCUMENT = "document"
@dataclass
class MediaAsset:
"""A generic media asset from a service provider.
Common fields are top-level. Provider-specific data goes in `extra`.
"""
id: str
type: MediaType
filename: str
created_at: datetime
# Common optional fields
owner_name: str | None = None
description: str | None = None
tags: list[str] = field(default_factory=list)
thumbnail_url: str | None = None
full_url: str | None = None
# Provider-specific extras (e.g., rating, location, people for Immich)
extra: dict[str, Any] = field(default_factory=dict)
@dataclass
class MediaCollection:
"""A generic collection of media assets (album, folder, etc.).
Provider-specific data goes in `extra`.
"""
id: str
name: str
asset_count: int = 0
assets: dict[str, MediaAsset] = field(default_factory=dict)
asset_ids: set[str] = field(default_factory=set)
# Provider-specific extras
extra: dict[str, Any] = field(default_factory=dict)
@@ -1 +1,5 @@
"""Service provider abstractions and implementations."""
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
__all__ = ["ServiceProvider", "ServiceProviderType"]
@@ -0,0 +1,76 @@
"""Service provider abstraction — base class and types."""
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.templates.variables import TemplateVariableDefinition
class ServiceProviderType(str, Enum):
"""Supported service provider types."""
IMMICH = "immich"
class ServiceProvider(ABC):
"""Abstract base class for service providers.
A service provider connects to an external service (e.g., Immich photo server)
and can poll for changes, producing generic ServiceEvent objects.
"""
provider_type: ServiceProviderType
@abstractmethod
async def connect(self) -> bool:
"""Connect to the service and verify connectivity.
Returns True if connection is successful.
"""
@abstractmethod
async def disconnect(self) -> None:
"""Disconnect and clean up resources."""
@abstractmethod
async def poll(
self,
collection_ids: list[str],
tracker_state: dict[str, Any],
) -> tuple[list[ServiceEvent], dict[str, Any]]:
"""Poll for changes in the specified collections.
Args:
collection_ids: IDs of collections to check.
tracker_state: Previous state dict (opaque to caller, managed by provider).
Returns:
Tuple of (list of events detected, updated state dict).
"""
@abstractmethod
def get_available_variables(self) -> list[TemplateVariableDefinition]:
"""Return the template variables this provider makes available."""
@abstractmethod
def get_provider_config_schema(self) -> dict[str, Any]:
"""Return JSON schema for this provider's configuration fields."""
@abstractmethod
async def list_collections(self) -> list[dict[str, Any]]:
"""List available collections from the service.
Returns a list of dicts with at least 'id' and 'name' keys.
"""
@abstractmethod
async def test_connection(self) -> dict[str, Any]:
"""Test the connection and return status info.
Returns a dict with 'ok' (bool) and optional 'message' (str).
"""
@@ -1 +1,15 @@
"""Template system — rendering, variables, validation."""
from notify_bridge_core.templates.variables import (
BASE_VARIABLES,
TemplateVariableDefinition,
VariableRegistry,
registry,
)
__all__ = [
"BASE_VARIABLES",
"TemplateVariableDefinition",
"VariableRegistry",
"registry",
]
@@ -0,0 +1,135 @@
"""Template variable system — definitions and registry."""
from __future__ import annotations
from dataclasses import dataclass, field
from notify_bridge_core.providers.base import ServiceProviderType
@dataclass
class TemplateVariableDefinition:
"""Definition of a variable available in notification templates."""
name: str
type: str # "string", "int", "list", "datetime", "bool"
description: str
example: str
provider_type: ServiceProviderType | None = None # None = base variable
# Base variables available to all providers
BASE_VARIABLES: list[TemplateVariableDefinition] = [
TemplateVariableDefinition(
name="event_type",
type="string",
description="Event type identifier (e.g., 'assets_added')",
example="assets_added",
),
TemplateVariableDefinition(
name="timestamp",
type="datetime",
description="When the event occurred",
example="2026-03-19 14:30:00",
),
TemplateVariableDefinition(
name="service_name",
type="string",
description="Name of the service provider instance",
example="My Immich Server",
),
TemplateVariableDefinition(
name="service_type",
type="string",
description="Type of service provider",
example="immich",
),
TemplateVariableDefinition(
name="collection_name",
type="string",
description="Name of the collection (album, folder, etc.)",
example="Vacation 2026",
),
TemplateVariableDefinition(
name="collection_id",
type="string",
description="Unique identifier of the collection",
example="abc-123-def",
),
TemplateVariableDefinition(
name="added_count",
type="int",
description="Number of assets added",
example="5",
),
TemplateVariableDefinition(
name="removed_count",
type="int",
description="Number of assets removed",
example="2",
),
TemplateVariableDefinition(
name="assets",
type="list",
description="List of added media assets with details",
example="[{id, type, filename, ...}]",
),
TemplateVariableDefinition(
name="old_name",
type="string",
description="Previous collection name (for rename events)",
example="Old Album Name",
),
TemplateVariableDefinition(
name="new_name",
type="string",
description="New collection name (for rename events)",
example="New Album Name",
),
]
@dataclass
class VariableRegistry:
"""Registry of template variables, organized by provider type.
Maintains base variables (available to all providers) and
provider-specific variables. Queries return the union of both.
"""
_provider_vars: dict[ServiceProviderType, list[TemplateVariableDefinition]] = field(
default_factory=dict
)
def register_provider_variables(
self,
provider_type: ServiceProviderType,
variables: list[TemplateVariableDefinition],
) -> None:
"""Register variables for a specific provider type."""
self._provider_vars[provider_type] = variables
def get_variables(
self, provider_type: ServiceProviderType
) -> list[TemplateVariableDefinition]:
"""Get all variables available for a provider type (base + provider-specific)."""
provider_vars = self._provider_vars.get(provider_type, [])
return BASE_VARIABLES + provider_vars
def get_base_variables(self) -> list[TemplateVariableDefinition]:
"""Get base variables available to all providers."""
return list(BASE_VARIABLES)
def get_provider_specific_variables(
self, provider_type: ServiceProviderType
) -> list[TemplateVariableDefinition]:
"""Get only the provider-specific variables (excluding base)."""
return list(self._provider_vars.get(provider_type, []))
def get_variable_names(self, provider_type: ServiceProviderType) -> set[str]:
"""Get the set of all variable names available for a provider type."""
return {v.name for v in self.get_variables(provider_type)}
# Global registry instance
registry = VariableRegistry()