"""Service provider management API routes.""" import logging from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from typing import Any import aiohttp from ..auth.dependencies import get_current_user from ..database.engine import get_session from ..database.models import ServiceProvider, User from ..services import make_immich_provider, make_gitea_provider _LOGGER = logging.getLogger(__name__) router = APIRouter(prefix="/api/providers", tags=["providers"]) class ProviderCreate(BaseModel): type: str name: str icon: str = "" config: dict[str, Any] = {} class ProviderUpdate(BaseModel): name: str | None = None icon: str | None = None config: dict[str, Any] | None = None class ProviderResponse(BaseModel): id: int type: str name: str icon: str config: dict[str, Any] created_at: str @router.get("") async def list_providers( user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """List all service providers for the current user.""" result = await session.exec( select(ServiceProvider).where(ServiceProvider.user_id == user.id) ) providers = result.all() return [_provider_response(p) for p in providers] @router.post("", status_code=status.HTTP_201_CREATED) async def create_provider( body: ProviderCreate, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Add a new service provider (validates connection for known types).""" # Validate connection for known provider types if body.type == "immich": from notify_bridge_core.providers.immich import ImmichServiceProvider config = body.config async with aiohttp.ClientSession() as http_session: immich = ImmichServiceProvider( http_session, config.get("url", ""), config.get("api_key", ""), config.get("external_domain"), body.name, ) test_result = await immich.test_connection() if not test_result.get("ok"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=test_result.get("message", f"Cannot connect to {body.type} provider"), ) # Store external_domain from server config if available if test_result.get("external_domain"): config["external_domain"] = test_result["external_domain"] elif body.type == "gitea": config = body.config # api_token is optional (webhook_secret is required, but token only for repo listing) if config.get("api_token"): async with aiohttp.ClientSession() as http_session: from notify_bridge_core.providers.gitea import GiteaServiceProvider gitea = GiteaServiceProvider( http_session, config.get("url", ""), config.get("api_token", ""), body.name, ) test_result = await gitea.test_connection() if not test_result.get("ok"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=test_result.get("message", "Cannot connect to Gitea"), ) # Scheduler: no validation needed (virtual provider) provider = ServiceProvider( user_id=user.id, type=body.type, name=body.name, icon=body.icon, config=body.config, ) session.add(provider) await session.commit() await session.refresh(provider) return _provider_response(provider) @router.get("/capabilities") async def list_provider_capabilities(): """List capabilities for all registered provider types.""" from notify_bridge_core.providers.capabilities import get_all_capabilities result = {} for pt, caps in get_all_capabilities().items(): result[pt] = { "provider_type": caps.provider_type, "display_name": caps.display_name, "notification_slots": caps.notification_slots, "command_slots": caps.command_slots, "events": caps.events, "commands": caps.commands, "supported_filters": caps.supported_filters, "webhook_based": caps.webhook_based, } return result @router.get("/capabilities/{provider_type}") async def get_provider_capabilities(provider_type: str): """Get capabilities for a provider type (events, slots, commands).""" from notify_bridge_core.providers.capabilities import get_capabilities caps = get_capabilities(provider_type) if not caps: raise HTTPException(status_code=404, detail=f"Unknown provider type: {provider_type}") return { "provider_type": caps.provider_type, "display_name": caps.display_name, "notification_slots": caps.notification_slots, "command_slots": caps.command_slots, "events": caps.events, "commands": caps.commands, "supported_filters": caps.supported_filters, "webhook_based": caps.webhook_based, } @router.get("/{provider_id}") async def get_provider( provider_id: int, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Get a specific service provider.""" provider = await _get_user_provider(session, provider_id, user.id) return _provider_response(provider) @router.put("/{provider_id}") async def update_provider( provider_id: int, body: ProviderUpdate, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Update a service provider.""" provider = await _get_user_provider(session, provider_id, user.id) if body.name is not None: provider.name = body.name if body.icon is not None: provider.icon = body.icon config_changed = body.config is not None and body.config != provider.config if body.config is not None: provider.config = body.config # Re-validate connection when config changes for known provider types if config_changed and provider.type == "immich": try: async with aiohttp.ClientSession() as http_session: immich = make_immich_provider(http_session, provider) test_result = await immich.test_connection() if not test_result.get("ok"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=test_result.get("message", f"Cannot connect to {provider.type} provider"), ) if test_result.get("external_domain"): provider.config = {**provider.config, "external_domain": test_result["external_domain"]} except aiohttp.ClientError as err: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Connection error: {err}", ) elif config_changed and provider.type == "gitea": if provider.config.get("api_token"): try: async with aiohttp.ClientSession() as http_session: gitea = make_gitea_provider(http_session, provider) test_result = await gitea.test_connection() if not test_result.get("ok"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=test_result.get("message", "Cannot connect to Gitea"), ) except aiohttp.ClientError as err: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Connection error: {err}", ) session.add(provider) await session.commit() await session.refresh(provider) return _provider_response(provider) @router.delete("/{provider_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_provider( provider_id: int, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Delete a service provider.""" from .delete_protection import check_service_provider, raise_if_used provider = await _get_user_provider(session, provider_id, user.id) raise_if_used(await check_service_provider(session, provider.id), provider.name) await session.delete(provider) await session.commit() @router.post("/{provider_id}/test") async def test_provider( provider_id: int, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Check if a service provider is reachable.""" provider = await _get_user_provider(session, provider_id, user.id) if provider.type == "immich": async with aiohttp.ClientSession() as http_session: immich = make_immich_provider(http_session, provider) return await immich.test_connection() if provider.type == "gitea": if not provider.config.get("api_token"): return {"ok": True, "message": "Gitea webhook-only mode (no API token for testing)"} async with aiohttp.ClientSession() as http_session: gitea = make_gitea_provider(http_session, provider) return await gitea.test_connection() if provider.type == "scheduler": return {"ok": True, "message": "Virtual provider — always available"} return {"ok": False, "message": f"Unknown provider type: {provider.type}"} @router.get("/{provider_id}/collections") async def list_collections( provider_id: int, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Fetch collections from a service provider.""" provider = await _get_user_provider(session, provider_id, user.id) if provider.type == "immich": async with aiohttp.ClientSession() as http_session: immich = make_immich_provider(http_session, provider) return await immich.list_collections() if provider.type == "gitea": if not provider.config.get("api_token"): return [] async with aiohttp.ClientSession() as http_session: gitea = make_gitea_provider(http_session, provider) return await gitea.list_collections() return [] @router.get("/{provider_id}/albums/{album_id}/shared-links") async def get_album_shared_links( provider_id: int, album_id: str, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Check shared links for a specific album.""" provider = await _get_user_provider(session, provider_id, user.id) if provider.type == "immich": async with aiohttp.ClientSession() as http_session: immich = make_immich_provider(http_session, provider) links = await immich.client.get_shared_links(album_id) return [ { "id": link.id, "key": link.key, "has_password": link.has_password, "is_expired": link.is_expired, "is_accessible": link.is_accessible, } for link in links ] return [] @router.post("/{provider_id}/albums/{album_id}/shared-links") async def create_album_shared_link( provider_id: int, album_id: str, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): """Auto-create a public shared link for an album.""" provider = await _get_user_provider(session, provider_id, user.id) if provider.type == "immich": async with aiohttp.ClientSession() as http_session: immich = make_immich_provider(http_session, provider) success = await immich.client.create_shared_link(album_id) if success: return {"success": True} from fastapi import HTTPException raise HTTPException(status_code=400, detail="Failed to create shared link") from fastapi import HTTPException raise HTTPException(status_code=400, detail="Provider type does not support shared links") def _provider_response(p: ServiceProvider) -> dict: """Build a safe response dict for a provider.""" config = dict(p.config) # Mask sensitive fields for secret_field in ("api_key", "api_token", "webhook_secret"): if secret_field in config: key = config[secret_field] config[secret_field] = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***" return { "id": p.id, "type": p.type, "name": p.name, "icon": p.icon, "config": config, "created_at": p.created_at.isoformat(), } async def _get_user_provider( session: AsyncSession, provider_id: int, user_id: int ) -> ServiceProvider: """Get a provider owned by the user, or raise 404.""" provider = await session.get(ServiceProvider, provider_id) if not provider or provider.user_id != user_id: raise HTTPException(status_code=404, detail="Provider not found") return provider