"""Emby REST API client.""" from __future__ import annotations import logging from typing import Any import aiohttp from .const import ( COMMAND_MUTE, COMMAND_SET_VOLUME, COMMAND_UNMUTE, DEFAULT_PORT, DEVICE_ID, DEVICE_NAME, DEVICE_VERSION, ENDPOINT_ITEMS, ENDPOINT_SESSIONS, ENDPOINT_SYSTEM_INFO, ENDPOINT_USERS, PLAY_COMMAND_PLAY_NOW, PLAYBACK_COMMAND_NEXT_TRACK, PLAYBACK_COMMAND_PAUSE, PLAYBACK_COMMAND_PREVIOUS_TRACK, PLAYBACK_COMMAND_SEEK, PLAYBACK_COMMAND_STOP, PLAYBACK_COMMAND_UNPAUSE, ) _LOGGER = logging.getLogger(__name__) class EmbyApiError(Exception): """Base exception for Emby API errors.""" class EmbyConnectionError(EmbyApiError): """Exception for connection errors.""" class EmbyAuthenticationError(EmbyApiError): """Exception for authentication errors.""" class EmbyApiClient: """Emby REST API client.""" def __init__( self, host: str, api_key: str, port: int = DEFAULT_PORT, ssl: bool = False, session: aiohttp.ClientSession | None = None, ) -> None: """Initialize the Emby API client.""" self._host = host self._port = port self._api_key = api_key self._ssl = ssl self._session = session self._owns_session = session is None protocol = "https" if ssl else "http" self._base_url = f"{protocol}://{host}:{port}" @property def base_url(self) -> str: """Return the base URL.""" return self._base_url async def _ensure_session(self) -> aiohttp.ClientSession: """Ensure an aiohttp session exists.""" if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() self._owns_session = True return self._session async def close(self) -> None: """Close the aiohttp session if we own it.""" if self._owns_session and self._session and not self._session.closed: await self._session.close() def _get_headers(self) -> dict[str, str]: """Get headers for API requests.""" return { "X-Emby-Token": self._api_key, "X-Emby-Client": DEVICE_NAME, "X-Emby-Device-Name": DEVICE_NAME, "X-Emby-Device-Id": DEVICE_ID, "X-Emby-Client-Version": DEVICE_VERSION, "Content-Type": "application/json", "Accept": "application/json", } async def _request( self, method: str, endpoint: str, params: dict[str, Any] | None = None, data: dict[str, Any] | None = None, ) -> Any: """Make an API request.""" session = await self._ensure_session() url = f"{self._base_url}{endpoint}" _LOGGER.debug("Making %s request to %s", method, url) try: async with session.request( method, url, headers=self._get_headers(), params=params, json=data, timeout=aiohttp.ClientTimeout(total=15), ssl=False if not self._ssl else None, # Disable SSL verification if not using SSL ) as response: _LOGGER.debug("Response status: %s", response.status) if response.status == 401: raise EmbyAuthenticationError("Invalid API key") if response.status == 403: raise EmbyAuthenticationError("Access forbidden") if response.status >= 400: text = await response.text() _LOGGER.error("API error %s: %s", response.status, text) raise EmbyApiError(f"API error {response.status}: {text}") content_type = response.headers.get("Content-Type", "") if "application/json" in content_type: return await response.json() return await response.text() except aiohttp.ClientError as err: _LOGGER.error("Connection error to %s: %s", url, err) raise EmbyConnectionError(f"Connection error: {err}") from err except TimeoutError as err: _LOGGER.error("Timeout connecting to %s", url) raise EmbyConnectionError(f"Connection timeout: {err}") from err async def _get( self, endpoint: str, params: dict[str, Any] | None = None ) -> Any: """Make a GET request.""" return await self._request("GET", endpoint, params=params) async def _post( self, endpoint: str, params: dict[str, Any] | None = None, data: dict[str, Any] | None = None, ) -> Any: """Make a POST request.""" return await self._request("POST", endpoint, params=params, data=data) # ------------------------------------------------------------------------- # Authentication & System # ------------------------------------------------------------------------- async def test_connection(self) -> dict[str, Any]: """Test the connection to the Emby server. Tries both /emby/System/Info and /System/Info endpoints. Returns server info if successful. """ # Try with /emby prefix first (standard Emby) try: _LOGGER.debug("Trying connection with /emby prefix") return await self._get(ENDPOINT_SYSTEM_INFO) except (EmbyConnectionError, EmbyApiError) as err: _LOGGER.debug("Connection with /emby prefix failed: %s", err) # Try without /emby prefix (some Emby configurations) try: _LOGGER.debug("Trying connection without /emby prefix") return await self._get("/System/Info") except (EmbyConnectionError, EmbyApiError) as err: _LOGGER.debug("Connection without /emby prefix failed: %s", err) raise EmbyConnectionError( f"Cannot connect to Emby server at {self._base_url}. " "Please verify the host, port, and that the server is running." ) from err async def get_server_info(self) -> dict[str, Any]: """Get server information.""" return await self._get(ENDPOINT_SYSTEM_INFO) async def get_users(self) -> list[dict[str, Any]]: """Get list of users.""" return await self._get(ENDPOINT_USERS) # ------------------------------------------------------------------------- # Sessions # ------------------------------------------------------------------------- async def get_sessions(self) -> list[dict[str, Any]]: """Get all active sessions.""" return await self._get(ENDPOINT_SESSIONS) async def get_controllable_sessions( self, user_id: str | None = None ) -> list[dict[str, Any]]: """Get sessions that can be remotely controlled.""" params = {} if user_id: params["ControllableByUserId"] = user_id sessions = await self._get(ENDPOINT_SESSIONS, params=params) return [s for s in sessions if s.get("SupportsRemoteControl")] # ------------------------------------------------------------------------- # Playback Control # ------------------------------------------------------------------------- async def play_media( self, session_id: str, item_ids: list[str], play_command: str = PLAY_COMMAND_PLAY_NOW, start_position_ticks: int = 0, ) -> None: """Send play command to a session.""" endpoint = f"{ENDPOINT_SESSIONS}/{session_id}/Playing" params = { "ItemIds": ",".join(item_ids), "PlayCommand": play_command, } if start_position_ticks > 0: params["StartPositionTicks"] = start_position_ticks _LOGGER.debug( "Sending play_media: endpoint=%s, session_id=%s, item_ids=%s, command=%s", endpoint, session_id, item_ids, play_command, ) await self._post(endpoint, params=params) async def _playback_command(self, session_id: str, command: str) -> None: """Send a playback command to a session.""" endpoint = f"{ENDPOINT_SESSIONS}/{session_id}/Playing/{command}" await self._post(endpoint) async def play(self, session_id: str) -> None: """Resume playback.""" await self._playback_command(session_id, PLAYBACK_COMMAND_UNPAUSE) async def pause(self, session_id: str) -> None: """Pause playback.""" await self._playback_command(session_id, PLAYBACK_COMMAND_PAUSE) async def stop(self, session_id: str) -> None: """Stop playback.""" await self._playback_command(session_id, PLAYBACK_COMMAND_STOP) async def next_track(self, session_id: str) -> None: """Skip to next track.""" await self._playback_command(session_id, PLAYBACK_COMMAND_NEXT_TRACK) async def previous_track(self, session_id: str) -> None: """Skip to previous track.""" await self._playback_command(session_id, PLAYBACK_COMMAND_PREVIOUS_TRACK) async def seek(self, session_id: str, position_ticks: int) -> None: """Seek to a position.""" endpoint = f"{ENDPOINT_SESSIONS}/{session_id}/Playing/{PLAYBACK_COMMAND_SEEK}" await self._post(endpoint, params={"SeekPositionTicks": position_ticks}) # ------------------------------------------------------------------------- # Volume Control # ------------------------------------------------------------------------- async def _send_command( self, session_id: str, command: str, arguments: dict[str, Any] | None = None ) -> None: """Send a general command to a session.""" endpoint = f"{ENDPOINT_SESSIONS}/{session_id}/Command" data: dict[str, Any] = {"Name": command} if arguments: # Emby expects arguments as strings data["Arguments"] = {k: str(v) for k, v in arguments.items()} await self._post(endpoint, data=data) async def set_volume(self, session_id: str, volume: int) -> None: """Set volume level (0-100).""" volume = max(0, min(100, volume)) await self._send_command(session_id, COMMAND_SET_VOLUME, {"Volume": volume}) async def mute(self, session_id: str) -> None: """Mute the session.""" await self._send_command(session_id, COMMAND_MUTE) async def unmute(self, session_id: str) -> None: """Unmute the session.""" await self._send_command(session_id, COMMAND_UNMUTE) # ------------------------------------------------------------------------- # Library Browsing # ------------------------------------------------------------------------- async def get_views(self, user_id: str) -> list[dict[str, Any]]: """Get user's library views (top-level folders).""" endpoint = f"{ENDPOINT_USERS}/{user_id}/Views" result = await self._get(endpoint) return result.get("Items", []) async def get_items( self, user_id: str, parent_id: str | None = None, include_item_types: list[str] | None = None, recursive: bool = False, sort_by: str = "SortName", sort_order: str = "Ascending", start_index: int = 0, limit: int = 100, search_term: str | None = None, fields: list[str] | None = None, ) -> dict[str, Any]: """Get items from the library.""" endpoint = f"{ENDPOINT_USERS}/{user_id}/Items" params: dict[str, Any] = { "SortBy": sort_by, "SortOrder": sort_order, "StartIndex": start_index, "Limit": limit, "Recursive": str(recursive).lower(), } if parent_id: params["ParentId"] = parent_id if include_item_types: params["IncludeItemTypes"] = ",".join(include_item_types) if search_term: params["SearchTerm"] = search_term if fields: params["Fields"] = ",".join(fields) else: params["Fields"] = "PrimaryImageAspectRatio,BasicSyncInfo" return await self._get(endpoint, params=params) async def get_item(self, user_id: str, item_id: str) -> dict[str, Any]: """Get a single item by ID.""" endpoint = f"{ENDPOINT_USERS}/{user_id}/Items/{item_id}" return await self._get(endpoint) async def get_artists( self, user_id: str, parent_id: str | None = None, start_index: int = 0, limit: int = 100, ) -> dict[str, Any]: """Get artists.""" endpoint = "/emby/Artists" params: dict[str, Any] = { "UserId": user_id, "StartIndex": start_index, "Limit": limit, "SortBy": "SortName", "SortOrder": "Ascending", } if parent_id: params["ParentId"] = parent_id return await self._get(endpoint, params=params) def get_image_url( self, item_id: str, image_type: str = "Primary", max_width: int | None = None, max_height: int | None = None, ) -> str: """Get the URL for an item's image.""" url = f"{self._base_url}{ENDPOINT_ITEMS}/{item_id}/Images/{image_type}" params = [] if max_width: params.append(f"maxWidth={max_width}") if max_height: params.append(f"maxHeight={max_height}") params.append(f"api_key={self._api_key}") if params: url += "?" + "&".join(params) return url