393 lines
13 KiB
Python
393 lines
13 KiB
Python
"""Emby REST API client."""
|
|
|
|
from __future__ import annotations

import asyncio
import logging
from typing import Any
from urllib.parse import urlencode

import aiohttp

from .const import (
    COMMAND_MUTE,
    COMMAND_SET_VOLUME,
    COMMAND_UNMUTE,
    DEFAULT_PORT,
    DEVICE_ID,
    DEVICE_NAME,
    DEVICE_VERSION,
    ENDPOINT_ITEMS,
    ENDPOINT_SESSIONS,
    ENDPOINT_SYSTEM_INFO,
    ENDPOINT_USERS,
    PLAY_COMMAND_PLAY_NOW,
    PLAYBACK_COMMAND_NEXT_TRACK,
    PLAYBACK_COMMAND_PAUSE,
    PLAYBACK_COMMAND_PREVIOUS_TRACK,
    PLAYBACK_COMMAND_SEEK,
    PLAYBACK_COMMAND_STOP,
    PLAYBACK_COMMAND_UNPAUSE,
)
|
|
|
|
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class EmbyApiError(Exception):
    """Root of the exception hierarchy raised by the Emby API client."""
|
|
|
|
|
|
class EmbyConnectionError(EmbyApiError):
    """Raised when the Emby server cannot be reached or a request times out."""
|
|
|
|
|
|
class EmbyAuthenticationError(EmbyApiError):
    """Raised when the server rejects a request with HTTP 401 or 403."""
|
|
|
|
|
|
class EmbyApiClient:
    """Asynchronous client for the Emby REST API.

    Wraps an ``aiohttp.ClientSession`` and exposes coroutines for system
    information, session discovery, playback and volume control, and library
    browsing. Every failure is reported as ``EmbyApiError`` or one of its
    subclasses (``EmbyConnectionError``, ``EmbyAuthenticationError``).
    """

    def __init__(
        self,
        host: str,
        api_key: str,
        port: int = DEFAULT_PORT,
        ssl: bool = False,
        session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Initialize the Emby API client.

        Args:
            host: Hostname or IP address of the Emby server.
            api_key: API key sent with every request.
            port: TCP port of the server.
            ssl: Use ``https`` instead of ``http`` when True.
            session: Optional externally managed aiohttp session. When
                omitted, the client lazily creates (and later closes) its own.
        """
        self._host = host
        self._port = port
        self._api_key = api_key
        self._ssl = ssl
        self._session = session
        # Only sessions created by this client are closed by close().
        self._owns_session = session is None

        protocol = "https" if ssl else "http"
        self._base_url = f"{protocol}://{host}:{port}"

    @property
    def base_url(self) -> str:
        """Return the base URL (scheme, host and port) of the server."""
        return self._base_url

    async def _ensure_session(self) -> aiohttp.ClientSession:
        """Return a usable aiohttp session, creating one if necessary.

        A new session is created when none exists or the current one has been
        closed; such a session is marked as owned by this client.
        """
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
            self._owns_session = True
        return self._session

    async def close(self) -> None:
        """Close the aiohttp session if this client owns it."""
        if self._owns_session and self._session and not self._session.closed:
            await self._session.close()

    def _get_headers(self) -> dict[str, str]:
        """Return the headers sent with every API request."""
        return {
            "X-Emby-Token": self._api_key,
            "X-Emby-Client": DEVICE_NAME,
            "X-Emby-Device-Name": DEVICE_NAME,
            "X-Emby-Device-Id": DEVICE_ID,
            "X-Emby-Client-Version": DEVICE_VERSION,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    async def _request(
        self,
        method: str,
        endpoint: str,
        params: dict[str, Any] | None = None,
        data: dict[str, Any] | None = None,
    ) -> Any:
        """Make an API request and return the decoded response body.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Path appended to the base URL.
            params: Optional query-string parameters.
            data: Optional payload sent as the JSON request body.

        Returns:
            The parsed JSON document when the response ``Content-Type``
            contains ``application/json``; otherwise the body as text.

        Raises:
            EmbyAuthenticationError: The server answered 401 or 403.
            EmbyApiError: The server answered with any other status >= 400.
            EmbyConnectionError: An aiohttp client error or a timeout occurred.
        """
        session = await self._ensure_session()
        url = f"{self._base_url}{endpoint}"

        _LOGGER.debug("Making %s request to %s", method, url)

        try:
            async with session.request(
                method,
                url,
                headers=self._get_headers(),
                params=params,
                json=data,
                timeout=aiohttp.ClientTimeout(total=15),
                # With HTTPS configured, None leaves certificate checking at
                # aiohttp's default. Otherwise False is passed, which skips
                # certificate validation for this request.
                ssl=None if self._ssl else False,
            ) as response:
                _LOGGER.debug("Response status: %s", response.status)

                if response.status == 401:
                    raise EmbyAuthenticationError("Invalid API key")
                if response.status == 403:
                    raise EmbyAuthenticationError("Access forbidden")
                if response.status >= 400:
                    text = await response.text()
                    _LOGGER.error("API error %s: %s", response.status, text)
                    raise EmbyApiError(f"API error {response.status}: {text}")

                content_type = response.headers.get("Content-Type", "")
                if "application/json" in content_type:
                    return await response.json()
                return await response.text()

        except aiohttp.ClientError as err:
            _LOGGER.error("Connection error to %s: %s", url, err)
            raise EmbyConnectionError(f"Connection error: {err}") from err
        # asyncio.TimeoutError only became an alias of the builtin
        # TimeoutError in Python 3.11; catching both keeps timeouts wrapped
        # on older interpreters as well.
        except (asyncio.TimeoutError, TimeoutError) as err:
            _LOGGER.error("Timeout connecting to %s", url)
            raise EmbyConnectionError(f"Connection timeout: {err}") from err

    async def _get(
        self, endpoint: str, params: dict[str, Any] | None = None
    ) -> Any:
        """Make a GET request and return the decoded response."""
        return await self._request("GET", endpoint, params=params)

    async def _post(
        self,
        endpoint: str,
        params: dict[str, Any] | None = None,
        data: dict[str, Any] | None = None,
    ) -> Any:
        """Make a POST request and return the decoded response."""
        return await self._request("POST", endpoint, params=params, data=data)

    # -------------------------------------------------------------------------
    # Authentication & System
    # -------------------------------------------------------------------------

    async def test_connection(self) -> dict[str, Any]:
        """Test the connection to the Emby server.

        Tries the ``ENDPOINT_SYSTEM_INFO`` endpoint first and then falls back
        to ``/System/Info`` (no ``/emby`` prefix).

        Returns:
            The server info returned by the first endpoint that answers.

        Raises:
            EmbyAuthenticationError: No endpoint succeeded and at least one
                rejected the API key (401/403). Reported as-is so callers can
                tell bad credentials apart from an unreachable server.
            EmbyConnectionError: Both endpoints failed for any other reason.
        """
        attempts = (
            # Standard Emby installations use the /emby prefix.
            (ENDPOINT_SYSTEM_INFO, "with"),
            # Some Emby configurations serve the API without the prefix.
            ("/System/Info", "without"),
        )
        auth_error: EmbyAuthenticationError | None = None
        last_error: EmbyApiError | None = None

        for endpoint, label in attempts:
            _LOGGER.debug("Trying connection %s /emby prefix", label)
            try:
                return await self._get(endpoint)
            except EmbyAuthenticationError as err:
                # Remember the rejection but still try the fallback endpoint.
                auth_error = auth_error or err
                last_error = err
            except EmbyApiError as err:
                last_error = err
            _LOGGER.debug(
                "Connection %s /emby prefix failed: %s", label, last_error
            )

        if auth_error is not None:
            # The server was reachable but refused the credentials; do not
            # disguise that as a connectivity problem.
            raise auth_error

        raise EmbyConnectionError(
            f"Cannot connect to Emby server at {self._base_url}. "
            "Please verify the host, port, and that the server is running."
        ) from last_error

    async def get_server_info(self) -> dict[str, Any]:
        """Get server information."""
        return await self._get(ENDPOINT_SYSTEM_INFO)

    async def get_users(self) -> list[dict[str, Any]]:
        """Get the list of users."""
        return await self._get(ENDPOINT_USERS)

    # -------------------------------------------------------------------------
    # Sessions
    # -------------------------------------------------------------------------

    async def get_sessions(self) -> list[dict[str, Any]]:
        """Get all active sessions."""
        return await self._get(ENDPOINT_SESSIONS)

    async def get_controllable_sessions(
        self, user_id: str | None = None
    ) -> list[dict[str, Any]]:
        """Get sessions that can be remotely controlled.

        Args:
            user_id: When given, sent as ``ControllableByUserId`` so the
                server filters the sessions for that user.

        Returns:
            The sessions whose ``SupportsRemoteControl`` entry is truthy.
        """
        params: dict[str, Any] = {}
        if user_id:
            params["ControllableByUserId"] = user_id

        sessions = await self._get(ENDPOINT_SESSIONS, params=params)
        return [s for s in sessions if s.get("SupportsRemoteControl")]

    # -------------------------------------------------------------------------
    # Playback Control
    # -------------------------------------------------------------------------

    async def play_media(
        self,
        session_id: str,
        item_ids: list[str],
        play_command: str = PLAY_COMMAND_PLAY_NOW,
        start_position_ticks: int = 0,
    ) -> None:
        """Send a play command to a session.

        Args:
            session_id: Target session.
            item_ids: Item IDs to play; sent comma-separated as ``ItemIds``.
            play_command: Value sent as ``PlayCommand``.
            start_position_ticks: Start offset; only sent when greater than 0.
        """
        endpoint = f"{ENDPOINT_SESSIONS}/{session_id}/Playing"
        params: dict[str, Any] = {
            "ItemIds": ",".join(item_ids),
            "PlayCommand": play_command,
        }
        if start_position_ticks > 0:
            params["StartPositionTicks"] = start_position_ticks

        _LOGGER.debug(
            "Sending play_media: endpoint=%s, session_id=%s, item_ids=%s, command=%s",
            endpoint,
            session_id,
            item_ids,
            play_command,
        )
        await self._post(endpoint, params=params)

    async def _playback_command(self, session_id: str, command: str) -> None:
        """Send a playback command to a session."""
        endpoint = f"{ENDPOINT_SESSIONS}/{session_id}/Playing/{command}"
        await self._post(endpoint)

    async def play(self, session_id: str) -> None:
        """Resume playback."""
        await self._playback_command(session_id, PLAYBACK_COMMAND_UNPAUSE)

    async def pause(self, session_id: str) -> None:
        """Pause playback."""
        await self._playback_command(session_id, PLAYBACK_COMMAND_PAUSE)

    async def stop(self, session_id: str) -> None:
        """Stop playback."""
        await self._playback_command(session_id, PLAYBACK_COMMAND_STOP)

    async def next_track(self, session_id: str) -> None:
        """Skip to the next track."""
        await self._playback_command(session_id, PLAYBACK_COMMAND_NEXT_TRACK)

    async def previous_track(self, session_id: str) -> None:
        """Skip to the previous track."""
        await self._playback_command(session_id, PLAYBACK_COMMAND_PREVIOUS_TRACK)

    async def seek(self, session_id: str, position_ticks: int) -> None:
        """Seek to a position, sent as ``SeekPositionTicks``."""
        endpoint = f"{ENDPOINT_SESSIONS}/{session_id}/Playing/{PLAYBACK_COMMAND_SEEK}"
        await self._post(endpoint, params={"SeekPositionTicks": position_ticks})

    # -------------------------------------------------------------------------
    # Volume Control
    # -------------------------------------------------------------------------

    async def _send_command(
        self, session_id: str, command: str, arguments: dict[str, Any] | None = None
    ) -> None:
        """Send a general command to a session.

        Args:
            session_id: Target session.
            command: Command name, sent as ``Name`` in the JSON body.
            arguments: Optional command arguments; values are stringified.
        """
        endpoint = f"{ENDPOINT_SESSIONS}/{session_id}/Command"
        data: dict[str, Any] = {"Name": command}
        if arguments:
            # Emby expects arguments as strings
            data["Arguments"] = {k: str(v) for k, v in arguments.items()}
        await self._post(endpoint, data=data)

    async def set_volume(self, session_id: str, volume: int) -> None:
        """Set the volume level; values are clamped to the 0-100 range."""
        volume = max(0, min(100, volume))
        await self._send_command(session_id, COMMAND_SET_VOLUME, {"Volume": volume})

    async def mute(self, session_id: str) -> None:
        """Mute the session."""
        await self._send_command(session_id, COMMAND_MUTE)

    async def unmute(self, session_id: str) -> None:
        """Unmute the session."""
        await self._send_command(session_id, COMMAND_UNMUTE)

    # -------------------------------------------------------------------------
    # Library Browsing
    # -------------------------------------------------------------------------

    async def get_views(self, user_id: str) -> list[dict[str, Any]]:
        """Get a user's library views (top-level folders).

        Returns:
            The ``Items`` list of the response, or an empty list if absent.
        """
        endpoint = f"{ENDPOINT_USERS}/{user_id}/Views"
        result = await self._get(endpoint)
        return result.get("Items", [])

    async def get_items(
        self,
        user_id: str,
        parent_id: str | None = None,
        include_item_types: list[str] | None = None,
        recursive: bool = False,
        sort_by: str = "SortName",
        sort_order: str = "Ascending",
        start_index: int = 0,
        limit: int = 100,
        search_term: str | None = None,
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        """Get items from the library.

        Args:
            user_id: User whose library is queried.
            parent_id: Optional ``ParentId`` filter.
            include_item_types: Optional item types, sent comma-separated.
            recursive: Sent as the lowercase string ``"true"``/``"false"``.
            sort_by: Value sent as ``SortBy``.
            sort_order: Value sent as ``SortOrder``.
            start_index: Value sent as ``StartIndex``.
            limit: Value sent as ``Limit``.
            search_term: Optional ``SearchTerm`` filter.
            fields: Extra fields to request; a default set is used if omitted.

        Returns:
            The decoded response of the items endpoint.
        """
        endpoint = f"{ENDPOINT_USERS}/{user_id}/Items"

        params: dict[str, Any] = {
            "SortBy": sort_by,
            "SortOrder": sort_order,
            "StartIndex": start_index,
            "Limit": limit,
            "Recursive": str(recursive).lower(),
        }

        if parent_id:
            params["ParentId"] = parent_id
        if include_item_types:
            params["IncludeItemTypes"] = ",".join(include_item_types)
        if search_term:
            params["SearchTerm"] = search_term
        if fields:
            params["Fields"] = ",".join(fields)
        else:
            params["Fields"] = "PrimaryImageAspectRatio,BasicSyncInfo"

        return await self._get(endpoint, params=params)

    async def get_item(self, user_id: str, item_id: str) -> dict[str, Any]:
        """Get a single item by ID."""
        endpoint = f"{ENDPOINT_USERS}/{user_id}/Items/{item_id}"
        return await self._get(endpoint)

    async def get_artists(
        self,
        user_id: str,
        parent_id: str | None = None,
        start_index: int = 0,
        limit: int = 100,
    ) -> dict[str, Any]:
        """Get artists, sorted ascending by ``SortName``.

        Args:
            user_id: Value sent as ``UserId``.
            parent_id: Optional ``ParentId`` filter.
            start_index: Value sent as ``StartIndex``.
            limit: Value sent as ``Limit``.
        """
        # NOTE(review): this path is a literal rather than one of the
        # ENDPOINT_* constants used elsewhere in the class.
        endpoint = "/emby/Artists"
        params: dict[str, Any] = {
            "UserId": user_id,
            "StartIndex": start_index,
            "Limit": limit,
            "SortBy": "SortName",
            "SortOrder": "Ascending",
        }
        if parent_id:
            params["ParentId"] = parent_id

        return await self._get(endpoint, params=params)

    def get_image_url(
        self,
        item_id: str,
        image_type: str = "Primary",
        max_width: int | None = None,
        max_height: int | None = None,
    ) -> str:
        """Get the URL for an item's image.

        Args:
            item_id: Item whose image is requested.
            image_type: Image type path segment.
            max_width: Optional ``maxWidth`` query value; omitted when falsy.
            max_height: Optional ``maxHeight`` query value; omitted when falsy.

        Returns:
            The absolute image URL. The API key is always included as the
            ``api_key`` query parameter, so the URL should be treated as a
            secret.
        """
        url = f"{self._base_url}{ENDPOINT_ITEMS}/{item_id}/Images/{image_type}"

        query: dict[str, Any] = {}
        if max_width:
            query["maxWidth"] = max_width
        if max_height:
            query["maxHeight"] = max_height
        query["api_key"] = self._api_key

        # urlencode percent-escapes the values so an unusual API key cannot
        # corrupt the query string.
        return f"{url}?{urlencode(query)}"
|