245 lines
7.6 KiB
Python
245 lines
7.6 KiB
Python
"""Emby WebSocket client for real-time updates."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from collections.abc import Callable
|
|
from typing import Any
|
|
|
|
import aiohttp
|
|
|
|
from .const import (
|
|
DEVICE_ID,
|
|
DEVICE_NAME,
|
|
DEVICE_VERSION,
|
|
WEBSOCKET_PATH,
|
|
WS_MESSAGE_PLAYBACK_PROGRESS,
|
|
WS_MESSAGE_PLAYBACK_START,
|
|
WS_MESSAGE_PLAYBACK_STOP,
|
|
WS_MESSAGE_SESSIONS,
|
|
WS_MESSAGE_SESSIONS_START,
|
|
WS_MESSAGE_SESSIONS_STOP,
|
|
)
|
|
|
|
# Logger for this module, named after the module itself.
_LOGGER = logging.getLogger(__name__)

# Incoming WebSocket "MessageType" values that are passed on to registered
# callbacks; messages of any other type are only logged at debug level.
TRACKED_MESSAGE_TYPES = {
    WS_MESSAGE_SESSIONS,
    WS_MESSAGE_PLAYBACK_START,
    WS_MESSAGE_PLAYBACK_STOP,
    WS_MESSAGE_PLAYBACK_PROGRESS,
}
|
|
|
|
|
|
class EmbyWebSocket:
    """WebSocket client for real-time Emby updates.

    The client connects to ``ws(s)://<host>:<port>`` + ``WEBSOCKET_PATH``,
    runs a background listener task, and forwards every message whose
    ``MessageType`` is in ``TRACKED_MESSAGE_TYPES`` to the registered
    callbacks. When the connection drops while the client is still running,
    it retries every ``_reconnect_interval`` seconds until it is connected
    again or ``disconnect()`` is called.
    """

    def __init__(
        self,
        host: str,
        port: int,
        api_key: str,
        ssl: bool = False,
        session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Initialize the WebSocket client.

        Args:
            host: Host name or address of the Emby server.
            port: Port of the Emby server.
            api_key: API key sent as the ``api_key`` query parameter.
            ssl: Use ``wss`` instead of ``ws`` when True.
            session: Optional aiohttp session to reuse. When omitted, a
                session is created on demand and closed by ``close()``.
        """
        self._host = host
        self._port = port
        self._api_key = api_key
        self._ssl = ssl
        self._session = session
        # Only a session created by this object is closed in close().
        self._owns_session = session is None

        protocol = "wss" if ssl else "ws"
        self._url = f"{protocol}://{host}:{port}{WEBSOCKET_PATH}"

        self._ws: aiohttp.ClientWebSocketResponse | None = None
        self._callbacks: list[Callable[[str, Any], None]] = []
        self._listen_task: asyncio.Task | None = None
        # A reference is kept so the task cannot be garbage-collected while
        # it sleeps and so disconnect() can cancel it.
        self._reconnect_task: asyncio.Task | None = None
        self._running = False
        self._reconnect_interval = 30  # seconds

    @property
    def connected(self) -> bool:
        """Return True if connected to WebSocket."""
        return self._ws is not None and not self._ws.closed

    async def _ensure_session(self) -> aiohttp.ClientSession:
        """Return a usable aiohttp session, creating one if necessary."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
            self._owns_session = True
        return self._session

    async def connect(self) -> bool:
        """Connect to Emby WebSocket and start the listener task.

        Returns:
            True when already connected or when the connection was
            established; False when connecting failed (the error is logged).
        """
        if self.connected:
            return True

        session = await self._ensure_session()

        # The API key and device id are passed as query parameters.
        params = {
            "api_key": self._api_key,
            "deviceId": DEVICE_ID,
        }

        try:
            self._ws = await session.ws_connect(
                self._url,
                params=params,
                heartbeat=30,
                # NOTE(review): ws_connect's ``timeout`` appears to be the
                # WebSocket close timeout (float / ClientWSTimeout), not a
                # connect timeout taking ClientTimeout - confirm against the
                # aiohttp version in use.
                timeout=aiohttp.ClientTimeout(total=10),
            )
            self._running = True
            _LOGGER.debug("Connected to Emby WebSocket at %s", self._url)

            # Start listening for messages
            self._listen_task = asyncio.create_task(self._listen())

            return True

        except aiohttp.ClientError as err:
            _LOGGER.warning("Failed to connect to Emby WebSocket: %s", err)
            return False
        except Exception as err:
            _LOGGER.exception("Unexpected error connecting to WebSocket: %s", err)
            return False

    async def _listen(self) -> None:
        """Receive messages until the socket closes, errors, or is cancelled.

        On exit the socket this listener was serving is closed and, if the
        client is still marked as running, a reconnection is scheduled.
        """
        # Work on a local reference: self._ws may be replaced by a later
        # connect() while this coroutine is still unwinding.
        ws = self._ws
        if ws is None:
            return

        try:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                    except json.JSONDecodeError:
                        _LOGGER.warning("Invalid JSON received: %s", msg.data)
                        continue
                    await self._handle_message(data)

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    _LOGGER.error("WebSocket error: %s", ws.exception())
                    break

                elif msg.type in (
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    _LOGGER.debug("WebSocket connection closed")
                    break

        except asyncio.CancelledError:
            _LOGGER.debug("WebSocket listener cancelled")
        except Exception as err:
            _LOGGER.exception("Error in WebSocket listener: %s", err)
        finally:
            # Only clear the attribute if it still refers to our socket.
            if self._ws is ws:
                self._ws = None
            # Close here: once self._ws is cleared, disconnect() can no
            # longer see this socket, so it would otherwise leak.
            await self._close_ws(ws)

            # Attempt reconnection if still running
            if self._running:
                _LOGGER.info(
                    "WebSocket disconnected, will reconnect in %d seconds",
                    self._reconnect_interval,
                )
                self._schedule_reconnect()

    def _schedule_reconnect(self) -> None:
        """Start the reconnect task unless one is already in progress."""
        task = self._reconnect_task
        if task is not None and not task.done():
            return
        self._reconnect_task = asyncio.create_task(self._reconnect())

    async def _reconnect(self) -> None:
        """Retry connecting until connected or the client is stopped.

        Each attempt is preceded by a ``_reconnect_interval`` second sleep.
        After a successful connect the session subscription is re-sent.
        """
        while self._running and not self.connected:
            await asyncio.sleep(self._reconnect_interval)

            if not self._running or self.connected:
                # State changed while sleeping; the loop condition decides.
                continue

            _LOGGER.debug("Attempting WebSocket reconnection...")
            if await self.connect():
                await self.subscribe_to_sessions()

    async def _handle_message(self, message: dict[str, Any]) -> None:
        """Dispatch one decoded WebSocket message to the callbacks.

        Args:
            message: Decoded JSON payload. Only its ``MessageType`` and
                ``Data`` keys are read. Payloads that are not JSON objects
                are ignored.
        """
        if not isinstance(message, dict):
            # Valid JSON that is not an object has no MessageType; ignoring
            # it keeps the listener loop alive.
            _LOGGER.debug("Ignoring non-object WebSocket message: %r", message)
            return

        msg_type = message.get("MessageType", "")
        data = message.get("Data")

        _LOGGER.debug("Received WebSocket message: %s", msg_type)

        if msg_type in TRACKED_MESSAGE_TYPES:
            # Iterate over a snapshot so a callback may unregister itself
            # (or others) without disturbing this loop.
            for callback in tuple(self._callbacks):
                try:
                    callback(msg_type, data)
                except Exception:
                    # One failing callback must not prevent the others.
                    _LOGGER.exception("Error in WebSocket callback")

    async def subscribe_to_sessions(self) -> None:
        """Subscribe to session updates."""
        if not self.connected:
            _LOGGER.warning("Cannot subscribe: WebSocket not connected")
            return

        # Request session updates every 1500ms
        await self._send_message(WS_MESSAGE_SESSIONS_START, "0,1500")
        _LOGGER.debug("Subscribed to session updates")

    async def unsubscribe_from_sessions(self) -> None:
        """Unsubscribe from session updates (no-op when not connected)."""
        if self.connected:
            await self._send_message(WS_MESSAGE_SESSIONS_STOP, "")

    async def _send_message(self, message_type: str, data: Any) -> None:
        """Send a ``{"MessageType": ..., "Data": ...}`` JSON message.

        Does nothing when there is no open socket. Send failures are logged
        as warnings and not raised.
        """
        if not self._ws or self._ws.closed:
            return

        message = {
            "MessageType": message_type,
            "Data": data,
        }

        try:
            await self._ws.send_json(message)
        except Exception as err:
            _LOGGER.warning("Failed to send WebSocket message: %s", err)

    def add_callback(self, callback: Callable[[str, Any], None]) -> Callable[[], None]:
        """Add a callback for WebSocket messages.

        Args:
            callback: Called as ``callback(message_type, data)`` for every
                tracked message.

        Returns:
            A function that removes the callback; calling it more than once
            is harmless.
        """
        self._callbacks.append(callback)

        def remove_callback() -> None:
            if callback in self._callbacks:
                self._callbacks.remove(callback)

        return remove_callback

    @staticmethod
    async def _cancel_task(task: asyncio.Task | None) -> None:
        """Cancel a background task and wait until it has finished."""
        if task is None or task.done() or task is asyncio.current_task():
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    @staticmethod
    async def _close_ws(ws: aiohttp.ClientWebSocketResponse | None) -> None:
        """Close a socket if it is still open; failures are only logged."""
        if ws is None or ws.closed:
            return
        try:
            await ws.close()
        except Exception as err:
            # Teardown is best-effort: a failing close must not abort it.
            _LOGGER.debug("Error closing WebSocket: %s", err)

    async def disconnect(self) -> None:
        """Disconnect from WebSocket.

        Stops reconnection, cancels the background tasks and closes the
        socket. Safe to call when not connected.
        """
        # Cleared first so neither the listener nor the reconnect loop
        # starts a new connection while we tear down.
        self._running = False

        await self._cancel_task(self._reconnect_task)
        self._reconnect_task = None
        await self._cancel_task(self._listen_task)

        # The listener normally closes its own socket on exit; this covers
        # the case where no listener was running for the current socket.
        ws, self._ws = self._ws, None
        await self._close_ws(ws)

        _LOGGER.debug("Disconnected from Emby WebSocket")

    async def close(self) -> None:
        """Close the WebSocket and, if this object created it, the session."""
        await self.disconnect()

        if self._owns_session and self._session and not self._session.closed:
            await self._session.close()