"""Data coordinator for Emby Media Player integration.""" from __future__ import annotations import logging from dataclasses import dataclass, field from datetime import timedelta from typing import Any from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .api import EmbyApiClient, EmbyApiError from .const import ( DOMAIN, TICKS_PER_SECOND, WS_MESSAGE_PLAYBACK_PROGRESS, WS_MESSAGE_PLAYBACK_START, WS_MESSAGE_PLAYBACK_STOP, WS_MESSAGE_SESSIONS, ) from .websocket import EmbyWebSocket _LOGGER = logging.getLogger(__name__) @dataclass class EmbyNowPlaying: """Currently playing media information.""" item_id: str name: str media_type: str # Audio, Video item_type: str # Movie, Episode, Audio, etc. artist: str | None = None album: str | None = None album_artist: str | None = None series_name: str | None = None season_name: str | None = None index_number: int | None = None # Episode number parent_index_number: int | None = None # Season number duration_ticks: int = 0 primary_image_tag: str | None = None primary_image_item_id: str | None = None backdrop_image_tags: list[str] = field(default_factory=list) genres: list[str] = field(default_factory=list) production_year: int | None = None overview: str | None = None @property def duration_seconds(self) -> float: """Get duration in seconds.""" return self.duration_ticks / TICKS_PER_SECOND if self.duration_ticks else 0 @dataclass class EmbyPlayState: """Playback state information.""" is_paused: bool = False is_muted: bool = False volume_level: int = 100 # 0-100 position_ticks: int = 0 can_seek: bool = True repeat_mode: str = "RepeatNone" shuffle_mode: str = "Sorted" play_method: str | None = None # DirectPlay, DirectStream, Transcode @property def position_seconds(self) -> float: """Get position in seconds.""" return self.position_ticks / TICKS_PER_SECOND if self.position_ticks else 0 @dataclass class EmbySession: """Represents an Emby client session.""" session_id: str device_id: str device_name: str client_name: str app_version: str | None = None user_id: str | None = None user_name: str | None = None supports_remote_control: bool = True now_playing: EmbyNowPlaying | None = None play_state: EmbyPlayState | None = None playable_media_types: list[str] = field(default_factory=list) supported_commands: list[str] = field(default_factory=list) @property def is_playing(self) -> bool: """Return True if media is currently playing (not paused).""" return ( self.now_playing is not None and self.play_state is not None and not self.play_state.is_paused ) @property def is_paused(self) -> bool: """Return True if media is paused.""" return ( self.now_playing is not None and self.play_state is not None and self.play_state.is_paused ) @property def is_idle(self) -> bool: """Return True if session is idle (no media playing).""" return self.now_playing is None class EmbyCoordinator(DataUpdateCoordinator[dict[str, EmbySession]]): """Coordinator for Emby data with WebSocket + polling fallback.""" def __init__( self, hass: HomeAssistant, api: EmbyApiClient, websocket: EmbyWebSocket, scan_interval: int, ) -> None: """Initialize the coordinator.""" super().__init__( hass, _LOGGER, name=DOMAIN, update_interval=timedelta(seconds=scan_interval), ) self.api = api self._websocket = websocket self._ws_connected = False self._remove_ws_callback: callable | None = None async def async_setup(self) -> None: """Set up the coordinator with WebSocket connection.""" # Try to establish WebSocket connection if await self._websocket.connect(): await self._websocket.subscribe_to_sessions() self._remove_ws_callback = self._websocket.add_callback( self._handle_ws_message ) self._ws_connected = True _LOGGER.info("Emby WebSocket connected, using real-time updates") else: _LOGGER.warning( "Emby WebSocket connection failed, using polling fallback" ) @callback def _handle_ws_message(self, message_type: str, data: Any) -> None: """Handle incoming WebSocket message.""" _LOGGER.debug("Handling WebSocket message: %s", message_type) if message_type == WS_MESSAGE_SESSIONS: # Full session list received if isinstance(data, list): sessions = self._parse_sessions(data) self.async_set_updated_data(sessions) elif message_type in ( WS_MESSAGE_PLAYBACK_START, WS_MESSAGE_PLAYBACK_STOP, WS_MESSAGE_PLAYBACK_PROGRESS, ): # Individual session update - trigger a refresh to get full state # We could optimize this by updating only the affected session, # but a full refresh ensures consistency self.hass.async_create_task(self.async_request_refresh()) async def _async_update_data(self) -> dict[str, EmbySession]: """Fetch sessions from Emby API (polling fallback).""" try: sessions_data = await self.api.get_sessions() return self._parse_sessions(sessions_data) except EmbyApiError as err: raise UpdateFailed(f"Error fetching Emby sessions: {err}") from err def _parse_sessions(self, sessions_data: list[dict[str, Any]]) -> dict[str, EmbySession]: """Parse session data into EmbySession objects.""" sessions: dict[str, EmbySession] = {} for session_data in sessions_data: # Only include sessions that support remote control if not session_data.get("SupportsRemoteControl", False): continue session_id = session_data.get("Id") if not session_id: continue # Parse now playing item now_playing = None now_playing_data = session_data.get("NowPlayingItem") if now_playing_data: now_playing = self._parse_now_playing(now_playing_data) # Parse play state play_state = None play_state_data = session_data.get("PlayState") if play_state_data: play_state = self._parse_play_state(play_state_data) session = EmbySession( session_id=session_id, device_id=session_data.get("DeviceId", ""), device_name=session_data.get("DeviceName", "Unknown Device"), client_name=session_data.get("Client", "Unknown Client"), app_version=session_data.get("ApplicationVersion"), user_id=session_data.get("UserId"), user_name=session_data.get("UserName"), supports_remote_control=session_data.get("SupportsRemoteControl", True), now_playing=now_playing, play_state=play_state, playable_media_types=session_data.get("PlayableMediaTypes", []), supported_commands=session_data.get("SupportedCommands", []), ) sessions[session_id] = session return sessions def _parse_now_playing(self, data: dict[str, Any]) -> EmbyNowPlaying: """Parse now playing item data.""" # Get artists as string artists = data.get("Artists", []) artist = ", ".join(artists) if artists else data.get("AlbumArtist") # Get the image item ID (for series/seasons, might be different from item ID) image_item_id = data.get("Id") if data.get("SeriesId"): image_item_id = data.get("SeriesId") elif data.get("ParentId") and data.get("Type") == "Audio": image_item_id = data.get("ParentId") # Use album ID for music return EmbyNowPlaying( item_id=data.get("Id", ""), name=data.get("Name", ""), media_type=data.get("MediaType", ""), item_type=data.get("Type", ""), artist=artist, album=data.get("Album"), album_artist=data.get("AlbumArtist"), series_name=data.get("SeriesName"), season_name=data.get("SeasonName"), index_number=data.get("IndexNumber"), parent_index_number=data.get("ParentIndexNumber"), duration_ticks=data.get("RunTimeTicks", 0), primary_image_tag=data.get("PrimaryImageTag"), primary_image_item_id=image_item_id, backdrop_image_tags=data.get("BackdropImageTags", []), genres=data.get("Genres", []), production_year=data.get("ProductionYear"), overview=data.get("Overview"), ) def _parse_play_state(self, data: dict[str, Any]) -> EmbyPlayState: """Parse play state data.""" return EmbyPlayState( is_paused=data.get("IsPaused", False), is_muted=data.get("IsMuted", False), volume_level=data.get("VolumeLevel", 100), position_ticks=data.get("PositionTicks", 0), can_seek=data.get("CanSeek", True), repeat_mode=data.get("RepeatMode", "RepeatNone"), shuffle_mode=data.get("ShuffleMode", "Sorted"), play_method=data.get("PlayMethod"), ) def update_scan_interval(self, interval: int) -> None: """Update the polling scan interval.""" self.update_interval = timedelta(seconds=interval) _LOGGER.debug("Updated scan interval to %d seconds", interval) async def async_shutdown(self) -> None: """Shut down the coordinator.""" if self._remove_ws_callback: self._remove_ws_callback() await self._websocket.close()