"""API client for communicating with the Media Server.""" from __future__ import annotations import asyncio import hashlib import logging import uuid from collections.abc import Callable from typing import Any import aiohttp from aiohttp import ClientError, ClientResponseError from .const import ( API_HEALTH, API_STATUS, API_PLAY, API_PAUSE, API_STOP, API_NEXT, API_PREVIOUS, API_VOLUME, API_MUTE, API_SEEK, API_TURN_ON, API_TURN_OFF, API_TOGGLE, API_SCRIPTS_LIST, API_SCRIPTS_EXECUTE, API_BROWSER_FOLDERS, API_BROWSER_BROWSE, API_BROWSER_PLAY, API_DISPLAY_MONITORS, API_FOREGROUND, API_DISPLAY_BRIGHTNESS, API_DISPLAY_POWER, API_DISPLAY_CONTRAST, API_DISPLAY_INPUT_SOURCE, API_DISPLAY_COLOR_PRESET, API_DISPLAY_PICTURE_MODE, ) _LOGGER = logging.getLogger(__name__) class MediaServerError(Exception): """Base exception for Media Server errors.""" class MediaServerConnectionError(MediaServerError): """Exception for connection errors.""" class MediaServerAuthError(MediaServerError): """Exception for authentication errors.""" class MediaServerRateLimitError(MediaServerError): """Raised when the server replies with HTTP 429. The media server's in-process token-bucket limiter (v0.3.0+) returns 429 with a ``Retry-After`` header — capture it so callers can back off. """ def __init__(self, message: str, retry_after: float | None = None) -> None: super().__init__(message) self.retry_after = retry_after class MediaServerClient: """Client for the Media Server REST API.""" def __init__( self, host: str, port: int, token: str, session: aiohttp.ClientSession | None = None, use_ssl: bool = False, verify_ssl: bool = True, ) -> None: """Initialize the client. Args: host: Server hostname or IP address port: Server port token: API authentication token session: Optional aiohttp session (will create one if not provided) use_ssl: If True, talk HTTPS instead of HTTP. The media server v0.3.0+ supports ``ssl_certfile`` / ``ssl_keyfile`` in ``config.yaml``. verify_ssl: If False, skip TLS certificate verification (only needed for self-signed certs on a trusted LAN). """ self._host = host self._port = int(port) # Ensure port is an integer self._token = token self._session = session self._own_session = session is None self._use_ssl = use_ssl # aiohttp accepts ``ssl=False`` to disable verification; ``None`` keeps # the default verifying SSLContext. self._ssl: bool | None = False if (use_ssl and not verify_ssl) else None scheme = "https" if use_ssl else "http" self._base_url = f"{scheme}://{host}:{self._port}" async def _ensure_session(self) -> aiohttp.ClientSession: """Ensure we have an aiohttp session.""" if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() self._own_session = True return self._session async def close(self) -> None: """Close the client session.""" if self._own_session and self._session and not self._session.closed: await self._session.close() def _get_headers(self) -> dict[str, str]: """Get headers for API requests. When no token is configured the media server runs in anonymous mode (``auth.auth_enabled()`` returns False), so we omit the Authorization header entirely rather than sending ``Bearer `` with an empty value. Every request carries a per-call ``X-Request-ID`` that the media server echoes back into its log lines (audit log + access log) so a problem in HA can be correlated to the matching server-side entry. The id is a UUID4 hex (32 chars) which fits the server's ``[A-Za-z0-9._-]{1,128}`` allow-list and is therefore preserved verbatim instead of being replaced by a fresh server-side id. """ headers = { "Content-Type": "application/json", "X-Request-ID": uuid.uuid4().hex, } if self._token: headers["Authorization"] = f"Bearer {self._token}" return headers async def _request( self, method: str, endpoint: str, json_data: dict | None = None, auth_required: bool = True, ) -> dict[str, Any]: """Make an API request. Args: method: HTTP method (GET, POST, etc.) endpoint: API endpoint path json_data: Optional JSON body data auth_required: Whether to include authentication header Returns: Response data as dictionary Raises: MediaServerConnectionError: On connection errors MediaServerAuthError: On authentication errors MediaServerError: On other errors """ session = await self._ensure_session() url = f"{self._base_url}{endpoint}" # Always send X-Request-ID, even on unauthenticated calls — it's the # observability hook, not an auth token, and the health endpoint # benefits from being log-correlated just like every other. headers = self._get_headers() if auth_required else { "Content-Type": "application/json", "X-Request-ID": uuid.uuid4().hex, } try: timeout = aiohttp.ClientTimeout(total=10) async with session.request( method, url, headers=headers, json=json_data, timeout=timeout, ssl=self._ssl, ) as response: if response.status == 401: raise MediaServerAuthError("Invalid API token") if response.status == 403: raise MediaServerAuthError("Access forbidden") if response.status == 429: retry_after_raw = response.headers.get("Retry-After", "") try: retry_after = float(retry_after_raw) if retry_after_raw else None except ValueError: retry_after = None raise MediaServerRateLimitError( f"Rate limited by server (retry after {retry_after}s)", retry_after=retry_after, ) response.raise_for_status() return await response.json() except aiohttp.ClientConnectorError as err: raise MediaServerConnectionError( f"Cannot connect to server at {self._base_url}: {err}" ) from err except ClientResponseError as err: raise MediaServerError(f"API error: {err.status} {err.message}") from err except ClientError as err: raise MediaServerConnectionError(f"Connection error: {err}") from err async def check_connection(self) -> bool: """Check if the server is reachable and token is valid. Returns: True if connection is successful """ try: # First check health (no auth) await self._request("GET", API_HEALTH, auth_required=False) # Then check auth by getting status await self._request("GET", API_STATUS) return True except MediaServerError: return False async def get_health(self) -> dict[str, Any]: """Get server health status (no authentication required). Returns: Health status data """ return await self._request("GET", API_HEALTH, auth_required=False) async def get_status(self) -> dict[str, Any]: """Get current media playback status. Returns: Media status data including state, title, artist, volume, etc. """ data = await self._request("GET", API_STATUS) # Convert relative album_art_url to absolute URL with cache-buster # (and token only when auth is enabled on the server side). if data.get("album_art_url") and data["album_art_url"].startswith("/"): # Add track info hash to force HA to re-fetch when track changes import hashlib track_id = f"{data.get('title', '')}-{data.get('artist', '')}" track_hash = hashlib.md5(track_id.encode()).hexdigest()[:8] token_param = f"token={self._token}&" if self._token else "" data["album_art_url"] = ( f"{self._base_url}{data['album_art_url']}?{token_param}t={track_hash}" ) return data async def play(self) -> dict[str, Any]: """Resume or start playback. Returns: Response data """ return await self._request("POST", API_PLAY) async def pause(self) -> dict[str, Any]: """Pause playback. Returns: Response data """ return await self._request("POST", API_PAUSE) async def stop(self) -> dict[str, Any]: """Stop playback. Returns: Response data """ return await self._request("POST", API_STOP) async def next_track(self) -> dict[str, Any]: """Skip to next track. Returns: Response data """ return await self._request("POST", API_NEXT) async def previous_track(self) -> dict[str, Any]: """Go to previous track. Returns: Response data """ return await self._request("POST", API_PREVIOUS) async def set_volume(self, volume: int) -> dict[str, Any]: """Set the volume level. Args: volume: Volume level (0-100) Returns: Response data """ return await self._request("POST", API_VOLUME, {"volume": volume}) async def toggle_mute(self) -> dict[str, Any]: """Toggle mute state. Returns: Response data with new mute state """ return await self._request("POST", API_MUTE) async def seek(self, position: float) -> dict[str, Any]: """Seek to a position in the current track. Args: position: Position in seconds Returns: Response data """ return await self._request("POST", API_SEEK, {"position": position}) async def turn_on(self) -> dict[str, Any]: """Send turn on command. Returns: Response data """ return await self._request("POST", API_TURN_ON) async def turn_off(self) -> dict[str, Any]: """Send turn off command. Returns: Response data """ return await self._request("POST", API_TURN_OFF) async def toggle(self) -> dict[str, Any]: """Send toggle command. Returns: Response data """ return await self._request("POST", API_TOGGLE) async def list_scripts(self) -> list[dict[str, Any]]: """List available scripts on the server. Returns: List of scripts with name, description, and timeout """ return await self._request("GET", API_SCRIPTS_LIST) async def execute_script( self, script_name: str, params: dict[str, str | int | float | bool] | None = None, ) -> dict[str, Any]: """Execute a script on the server. The server (v0.3.0+) rate-limits ``/api/scripts/execute`` at 10/min per peer. If we hit 429 we wait for ``Retry-After`` (capped at 30 s) and retry once — enough for a brief HA-side burst without masking a real sustained overload, which falls through as ``MediaServerRateLimitError``. Args: script_name: Name of the script to execute params: Optional named parameters (validated against script schema) Returns: Execution result with success, exit_code, stdout, stderr """ endpoint = f"{API_SCRIPTS_EXECUTE}/{script_name}" json_data = {"params": params or {}} try: return await self._request("POST", endpoint, json_data) except MediaServerRateLimitError as err: wait = min(err.retry_after or 5.0, 30.0) _LOGGER.warning( "execute_script(%s) rate-limited, retrying after %.1fs", script_name, wait, ) await asyncio.sleep(wait) return await self._request("POST", endpoint, json_data) async def get_media_folders(self) -> dict[str, dict[str, Any]]: """Get configured media folders. Returns: Dictionary of folders with folder_id as key and folder config as value """ response = await self._request("GET", API_BROWSER_FOLDERS) # Server >= c9ee41a wraps the result as {"folders": {...}, "management_enabled": bool}. # Older servers returned the flat folder dict directly. if isinstance(response, dict) and "folders" in response and isinstance(response["folders"], dict): return response["folders"] return response async def browse_folder( self, folder_id: str, path: str = "", offset: int = 0, limit: int = 100 ) -> dict[str, Any]: """Browse a media folder. Args: folder_id: ID of the folder to browse path: Path within the folder (empty for root) offset: Pagination offset limit: Number of items to return Returns: Dictionary with current_path, parent_path, items, total, offset, limit """ params = { "folder_id": folder_id, "path": path, "offset": offset, "limit": limit, } query_string = "&".join(f"{k}={v}" for k, v in params.items()) endpoint = f"{API_BROWSER_BROWSE}?{query_string}" return await self._request("GET", endpoint) async def play_media_file(self, file_path: str) -> dict[str, Any]: """Play a media file by absolute path. Args: file_path: Absolute path to the media file Returns: Response data with success status """ return await self._request("POST", API_BROWSER_PLAY, {"path": file_path}) async def get_display_monitors(self) -> list[dict[str, Any]]: """Get list of connected monitors with brightness, power, DDC/CI state. Uses the server's short TTL cache so per-entity polling does not pay the full DDC/CI probe cost on every call. """ return await self._request("GET", API_DISPLAY_MONITORS) async def set_display_brightness(self, monitor_id: int, brightness: int) -> dict[str, Any]: """Set brightness for a specific monitor. Args: monitor_id: Monitor index brightness: Brightness level (0-100) Returns: Response data with success status """ return await self._request( "POST", f"{API_DISPLAY_BRIGHTNESS}/{monitor_id}", {"brightness": brightness} ) async def set_display_power(self, monitor_id: int, on: bool) -> dict[str, Any]: """Set power state for a specific monitor. Args: monitor_id: Monitor index on: True to turn on, False to turn off Returns: Response data with success status """ return await self._request( "POST", f"{API_DISPLAY_POWER}/{monitor_id}", {"on": on} ) async def set_display_contrast(self, monitor_id: int, contrast: int) -> dict[str, Any]: """Set DDC/CI contrast for a specific monitor (0-100).""" return await self._request( "POST", f"{API_DISPLAY_CONTRAST}/{monitor_id}", {"contrast": contrast} ) async def set_display_input_source(self, monitor_id: int, source: str) -> dict[str, Any]: """Switch a monitor's DDC/CI input source by enum name (e.g. 'HDMI1').""" return await self._request( "POST", f"{API_DISPLAY_INPUT_SOURCE}/{monitor_id}", {"source": source} ) async def set_display_color_preset(self, monitor_id: int, preset: str) -> dict[str, Any]: """Apply a DDC/CI color preset by enum name (e.g. 'COLOR_TEMP_6500K').""" return await self._request( "POST", f"{API_DISPLAY_COLOR_PRESET}/{monitor_id}", {"preset": preset} ) async def set_display_picture_mode(self, monitor_id: int, code: int) -> dict[str, Any]: """Apply a DDC/CI picture/scene mode (VCP 0xDC) by raw code.""" return await self._request( "POST", f"{API_DISPLAY_PICTURE_MODE}/{monitor_id}", {"code": code} ) async def get_foreground(self) -> dict[str, Any]: """Get the foreground window/process snapshot. Returns the structured payload described in the media server's ``ForegroundInfo`` dataclass: process name, window title, fullscreen flag, owning monitor, geometry, and process start time. """ return await self._request("GET", API_FOREGROUND) class MediaServerWebSocket: """WebSocket client for real-time media status updates.""" def __init__( self, host: str, port: int, token: str, on_status_update: Callable[[dict[str, Any]], None], on_disconnect: Callable[[], None] | None = None, on_scripts_changed: Callable[[], None] | None = None, on_foreground_update: Callable[[dict[str, Any]], None] | None = None, use_ssl: bool = False, verify_ssl: bool = True, ) -> None: """Initialize the WebSocket client. Args: host: Server hostname or IP port: Server port token: API authentication token on_status_update: Callback when status update received on_disconnect: Callback when connection lost on_scripts_changed: Callback when scripts have changed on_foreground_update: Callback when foreground process changes use_ssl: If True, talk WSS instead of WS. verify_ssl: If False, skip TLS certificate verification. """ self._host = host self._port = int(port) self._token = token self._on_status_update = on_status_update self._on_disconnect = on_disconnect self._on_scripts_changed = on_scripts_changed self._on_foreground_update = on_foreground_update self._use_ssl = use_ssl self._ssl: bool | None = False if (use_ssl and not verify_ssl) else None # The server's WS endpoint accepts an unauthenticated connection when # api_tokens is empty (see media.py:websocket_endpoint), so we only # append ?token=... when one was configured. Pre-0.3.0 servers only # know the query path; 0.3.0+ servers prefer the ``Sec-WebSocket-Protocol`` # subprotocol (which keeps the token out of URLs / Referer / logs) but # still accept the query as a documented back-compat fallback. We send # both so the integration works against either server version. token_query = f"?token={token}" if token else "" scheme = "wss" if use_ssl else "ws" self._ws_url = f"{scheme}://{host}:{self._port}/api/media/ws{token_query}" self._session: aiohttp.ClientSession | None = None self._ws: aiohttp.ClientWebSocketResponse | None = None self._receive_task: asyncio.Task | None = None self._running = False async def connect(self) -> bool: """Establish WebSocket connection. Returns: True if connection successful """ try: if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() ws_kwargs: dict[str, Any] = { "heartbeat": 30, "timeout": aiohttp.ClientTimeout(total=10), } if self._token: # Subprotocol-based auth (preferred by media server v0.3.0+). # aiohttp negotiates this header; if the server doesn't echo # it back (older versions), aiohttp still completes the # handshake — at which point the ?token= query in the URL # takes over. Safe across both server generations. ws_kwargs["protocols"] = [f"media-server.token.{self._token}"] if self._ssl is not None: ws_kwargs["ssl"] = self._ssl self._ws = await self._session.ws_connect(self._ws_url, **ws_kwargs) self._running = True # Start receive loop self._receive_task = asyncio.create_task(self._receive_loop()) _LOGGER.info("WebSocket connected to %s:%s", self._host, self._port) return True except Exception as err: _LOGGER.warning("WebSocket connection failed: %s", err) return False async def disconnect(self) -> None: """Close WebSocket connection.""" self._running = False if self._receive_task: self._receive_task.cancel() try: await self._receive_task except asyncio.CancelledError: pass self._receive_task = None if self._ws and not self._ws.closed: await self._ws.close() self._ws = None if self._session and not self._session.closed: await self._session.close() self._session = None _LOGGER.debug("WebSocket disconnected") async def _receive_loop(self) -> None: """Background loop to receive WebSocket messages.""" while self._running and self._ws and not self._ws.closed: try: msg = await self._ws.receive(timeout=60) if msg.type == aiohttp.WSMsgType.TEXT: data = msg.json() msg_type = data.get("type") if msg_type in ("status", "status_update"): status_data = data.get("data", {}) # Convert album art URL to absolute if ( status_data.get("album_art_url") and status_data["album_art_url"].startswith("/") ): track_id = f"{status_data.get('title', '')}-{status_data.get('artist', '')}" track_hash = hashlib.md5(track_id.encode()).hexdigest()[:8] token_param = f"token={self._token}&" if self._token else "" http_scheme = "https" if self._use_ssl else "http" status_data["album_art_url"] = ( f"{http_scheme}://{self._host}:{self._port}" f"{status_data['album_art_url']}?{token_param}t={track_hash}" ) self._on_status_update(status_data) elif msg_type == "scripts_changed": _LOGGER.info("Scripts changed notification received") if self._on_scripts_changed: self._on_scripts_changed() elif msg_type in ("foreground", "foreground_update"): if self._on_foreground_update: self._on_foreground_update(data.get("data", {})) elif msg_type == "pong": _LOGGER.debug("Received pong") elif msg.type == aiohttp.WSMsgType.CLOSED: _LOGGER.warning("WebSocket closed by server") break elif msg.type == aiohttp.WSMsgType.ERROR: _LOGGER.error("WebSocket error: %s", self._ws.exception()) break except asyncio.TimeoutError: # Send ping to keep connection alive if self._ws and not self._ws.closed: try: await self._ws.send_json({"type": "ping"}) except Exception: break except asyncio.CancelledError: break except Exception as err: _LOGGER.error("WebSocket receive error: %s", err) break # Connection lost, notify callback if self._on_disconnect: self._on_disconnect() @property def is_connected(self) -> bool: """Return True if WebSocket is connected.""" return self._ws is not None and not self._ws.closed