"""API client for communicating with the Media Server.""" from __future__ import annotations import asyncio import hashlib import logging from collections.abc import Callable from typing import Any import aiohttp from aiohttp import ClientError, ClientResponseError from .const import ( API_HEALTH, API_STATUS, API_PLAY, API_PAUSE, API_STOP, API_NEXT, API_PREVIOUS, API_VOLUME, API_MUTE, API_SEEK, API_TURN_ON, API_TURN_OFF, API_TOGGLE, API_SCRIPTS_LIST, API_SCRIPTS_EXECUTE, ) _LOGGER = logging.getLogger(__name__) class MediaServerError(Exception): """Base exception for Media Server errors.""" class MediaServerConnectionError(MediaServerError): """Exception for connection errors.""" class MediaServerAuthError(MediaServerError): """Exception for authentication errors.""" class MediaServerClient: """Client for the Media Server REST API.""" def __init__( self, host: str, port: int, token: str, session: aiohttp.ClientSession | None = None, ) -> None: """Initialize the client. Args: host: Server hostname or IP address port: Server port token: API authentication token session: Optional aiohttp session (will create one if not provided) """ self._host = host self._port = int(port) # Ensure port is an integer self._token = token self._session = session self._own_session = session is None self._base_url = f"http://{host}:{self._port}" async def _ensure_session(self) -> aiohttp.ClientSession: """Ensure we have an aiohttp session.""" if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() self._own_session = True return self._session async def close(self) -> None: """Close the client session.""" if self._own_session and self._session and not self._session.closed: await self._session.close() def _get_headers(self) -> dict[str, str]: """Get headers for API requests.""" return { "Authorization": f"Bearer {self._token}", "Content-Type": "application/json", } async def _request( self, method: str, endpoint: str, json_data: dict | None = None, auth_required: bool = True, ) -> dict[str, Any]: """Make an API request. Args: method: HTTP method (GET, POST, etc.) endpoint: API endpoint path json_data: Optional JSON body data auth_required: Whether to include authentication header Returns: Response data as dictionary Raises: MediaServerConnectionError: On connection errors MediaServerAuthError: On authentication errors MediaServerError: On other errors """ session = await self._ensure_session() url = f"{self._base_url}{endpoint}" headers = self._get_headers() if auth_required else {} try: timeout = aiohttp.ClientTimeout(total=10) async with session.request( method, url, headers=headers, json=json_data, timeout=timeout ) as response: if response.status == 401: raise MediaServerAuthError("Invalid API token") if response.status == 403: raise MediaServerAuthError("Access forbidden") response.raise_for_status() return await response.json() except aiohttp.ClientConnectorError as err: raise MediaServerConnectionError( f"Cannot connect to server at {self._base_url}: {err}" ) from err except ClientResponseError as err: raise MediaServerError(f"API error: {err.status} {err.message}") from err except ClientError as err: raise MediaServerConnectionError(f"Connection error: {err}") from err async def check_connection(self) -> bool: """Check if the server is reachable and token is valid. Returns: True if connection is successful """ try: # First check health (no auth) await self._request("GET", API_HEALTH, auth_required=False) # Then check auth by getting status await self._request("GET", API_STATUS) return True except MediaServerError: return False async def get_health(self) -> dict[str, Any]: """Get server health status (no authentication required). Returns: Health status data """ return await self._request("GET", API_HEALTH, auth_required=False) async def get_status(self) -> dict[str, Any]: """Get current media playback status. Returns: Media status data including state, title, artist, volume, etc. """ data = await self._request("GET", API_STATUS) # Convert relative album_art_url to absolute URL with token and cache-buster if data.get("album_art_url") and data["album_art_url"].startswith("/"): # Add track info hash to force HA to re-fetch when track changes import hashlib track_id = f"{data.get('title', '')}-{data.get('artist', '')}" track_hash = hashlib.md5(track_id.encode()).hexdigest()[:8] data["album_art_url"] = f"{self._base_url}{data['album_art_url']}?token={self._token}&t={track_hash}" return data async def play(self) -> dict[str, Any]: """Resume or start playback. Returns: Response data """ return await self._request("POST", API_PLAY) async def pause(self) -> dict[str, Any]: """Pause playback. Returns: Response data """ return await self._request("POST", API_PAUSE) async def stop(self) -> dict[str, Any]: """Stop playback. Returns: Response data """ return await self._request("POST", API_STOP) async def next_track(self) -> dict[str, Any]: """Skip to next track. Returns: Response data """ return await self._request("POST", API_NEXT) async def previous_track(self) -> dict[str, Any]: """Go to previous track. Returns: Response data """ return await self._request("POST", API_PREVIOUS) async def set_volume(self, volume: int) -> dict[str, Any]: """Set the volume level. Args: volume: Volume level (0-100) Returns: Response data """ return await self._request("POST", API_VOLUME, {"volume": volume}) async def toggle_mute(self) -> dict[str, Any]: """Toggle mute state. Returns: Response data with new mute state """ return await self._request("POST", API_MUTE) async def seek(self, position: float) -> dict[str, Any]: """Seek to a position in the current track. Args: position: Position in seconds Returns: Response data """ return await self._request("POST", API_SEEK, {"position": position}) async def turn_on(self) -> dict[str, Any]: """Send turn on command. Returns: Response data """ return await self._request("POST", API_TURN_ON) async def turn_off(self) -> dict[str, Any]: """Send turn off command. Returns: Response data """ return await self._request("POST", API_TURN_OFF) async def toggle(self) -> dict[str, Any]: """Send toggle command. Returns: Response data """ return await self._request("POST", API_TOGGLE) async def list_scripts(self) -> list[dict[str, Any]]: """List available scripts on the server. Returns: List of scripts with name, description, and timeout """ return await self._request("GET", API_SCRIPTS_LIST) async def execute_script( self, script_name: str, args: list[str] | None = None ) -> dict[str, Any]: """Execute a script on the server. Args: script_name: Name of the script to execute args: Optional list of arguments to pass to the script Returns: Execution result with success, exit_code, stdout, stderr """ endpoint = f"{API_SCRIPTS_EXECUTE}/{script_name}" json_data = {"args": args or []} return await self._request("POST", endpoint, json_data) class MediaServerWebSocket: """WebSocket client for real-time media status updates.""" def __init__( self, host: str, port: int, token: str, on_status_update: Callable[[dict[str, Any]], None], on_disconnect: Callable[[], None] | None = None, ) -> None: """Initialize the WebSocket client. Args: host: Server hostname or IP port: Server port token: API authentication token on_status_update: Callback when status update received on_disconnect: Callback when connection lost """ self._host = host self._port = int(port) self._token = token self._on_status_update = on_status_update self._on_disconnect = on_disconnect self._ws_url = f"ws://{host}:{self._port}/api/media/ws?token={token}" self._session: aiohttp.ClientSession | None = None self._ws: aiohttp.ClientWebSocketResponse | None = None self._receive_task: asyncio.Task | None = None self._running = False async def connect(self) -> bool: """Establish WebSocket connection. Returns: True if connection successful """ try: if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() self._ws = await self._session.ws_connect( self._ws_url, heartbeat=30, timeout=aiohttp.ClientTimeout(total=10), ) self._running = True # Start receive loop self._receive_task = asyncio.create_task(self._receive_loop()) _LOGGER.info("WebSocket connected to %s:%s", self._host, self._port) return True except Exception as err: _LOGGER.warning("WebSocket connection failed: %s", err) return False async def disconnect(self) -> None: """Close WebSocket connection.""" self._running = False if self._receive_task: self._receive_task.cancel() try: await self._receive_task except asyncio.CancelledError: pass self._receive_task = None if self._ws and not self._ws.closed: await self._ws.close() self._ws = None if self._session and not self._session.closed: await self._session.close() self._session = None _LOGGER.debug("WebSocket disconnected") async def _receive_loop(self) -> None: """Background loop to receive WebSocket messages.""" while self._running and self._ws and not self._ws.closed: try: msg = await self._ws.receive(timeout=60) if msg.type == aiohttp.WSMsgType.TEXT: data = msg.json() msg_type = data.get("type") if msg_type in ("status", "status_update"): status_data = data.get("data", {}) # Convert album art URL to absolute if ( status_data.get("album_art_url") and status_data["album_art_url"].startswith("/") ): track_id = f"{status_data.get('title', '')}-{status_data.get('artist', '')}" track_hash = hashlib.md5(track_id.encode()).hexdigest()[:8] status_data["album_art_url"] = ( f"http://{self._host}:{self._port}" f"{status_data['album_art_url']}?token={self._token}&t={track_hash}" ) self._on_status_update(status_data) elif msg_type == "pong": _LOGGER.debug("Received pong") elif msg.type == aiohttp.WSMsgType.CLOSED: _LOGGER.warning("WebSocket closed by server") break elif msg.type == aiohttp.WSMsgType.ERROR: _LOGGER.error("WebSocket error: %s", self._ws.exception()) break except asyncio.TimeoutError: # Send ping to keep connection alive if self._ws and not self._ws.closed: try: await self._ws.send_json({"type": "ping"}) except Exception: break except asyncio.CancelledError: break except Exception as err: _LOGGER.error("WebSocket receive error: %s", err) break # Connection lost, notify callback if self._on_disconnect: self._on_disconnect() @property def is_connected(self) -> bool: """Return True if WebSocket is connected.""" return self._ws is not None and not self._ws.closed