284 lines
10 KiB
Python
284 lines
10 KiB
Python
"""Data coordinator for Emby Media Player integration."""
|
|
|
|
from __future__ import annotations

import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any

from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed

from .api import EmbyApiClient, EmbyApiError
from .const import (
    DOMAIN,
    TICKS_PER_SECOND,
    WS_MESSAGE_PLAYBACK_PROGRESS,
    WS_MESSAGE_PLAYBACK_START,
    WS_MESSAGE_PLAYBACK_STOP,
    WS_MESSAGE_SESSIONS,
)
from .websocket import EmbyWebSocket
|
|
|
|
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class EmbyNowPlaying:
    """Currently playing media information.

    Only the four identifying fields are required; every other field has a
    default so an instance can be built from a sparse payload.
    """

    item_id: str
    name: str
    media_type: str  # Audio, Video
    item_type: str  # Movie, Episode, Audio, etc.
    artist: str | None = None
    album: str | None = None
    album_artist: str | None = None
    series_name: str | None = None
    season_name: str | None = None
    index_number: int | None = None  # Episode number
    parent_index_number: int | None = None  # Season number
    duration_ticks: int = 0  # Converted to seconds by ``duration_seconds``
    primary_image_tag: str | None = None
    primary_image_item_id: str | None = None
    # Mutable defaults go through default_factory so instances never share a list.
    backdrop_image_tags: list[str] = field(default_factory=list)
    genres: list[str] = field(default_factory=list)
    production_year: int | None = None
    overview: str | None = None

    @property
    def duration_seconds(self) -> float:
        """Return the duration in seconds, or ``0.0`` when it is unknown."""
        # A falsy tick count (0, or a None that slipped in) means "unknown".
        # Return a float literal so the result type matches the annotation.
        if not self.duration_ticks:
            return 0.0
        return self.duration_ticks / TICKS_PER_SECOND
|
|
|
|
|
|
@dataclass
class EmbyPlayState:
    """Playback state information.

    Every field has a default, so ``EmbyPlayState()`` describes an unpaused,
    unmuted player at position zero.
    """

    is_paused: bool = False
    is_muted: bool = False
    volume_level: int = 100  # 0-100
    position_ticks: int = 0  # Converted to seconds by ``position_seconds``
    can_seek: bool = True
    repeat_mode: str = "RepeatNone"
    shuffle_mode: str = "Sorted"
    play_method: str | None = None  # DirectPlay, DirectStream, Transcode

    @property
    def position_seconds(self) -> float:
        """Return the playback position in seconds, or ``0.0`` at the start."""
        # A falsy tick count (0, or a None that slipped in) maps to 0.0.
        # Return a float literal so the result type matches the annotation.
        if not self.position_ticks:
            return 0.0
        return self.position_ticks / TICKS_PER_SECOND
|
|
|
|
|
|
@dataclass
class EmbySession:
    """Represents an Emby client session.

    The three ``is_*`` properties derive a coarse playback status from
    ``now_playing`` and ``play_state``.
    """

    session_id: str
    device_id: str
    device_name: str
    client_name: str
    app_version: str | None = None
    user_id: str | None = None
    user_name: str | None = None
    supports_remote_control: bool = True
    now_playing: EmbyNowPlaying | None = None
    play_state: EmbyPlayState | None = None
    # Mutable defaults go through default_factory so instances never share a list.
    playable_media_types: list[str] = field(default_factory=list)
    supported_commands: list[str] = field(default_factory=list)

    @property
    def is_playing(self) -> bool:
        """Return True if media is loaded and playback is not paused."""
        # Without both an item and a play state there is nothing "playing".
        if self.now_playing is None or self.play_state is None:
            return False
        return not self.play_state.is_paused

    @property
    def is_paused(self) -> bool:
        """Return True if media is loaded and playback is paused."""
        if self.now_playing is None or self.play_state is None:
            return False
        return self.play_state.is_paused

    @property
    def is_idle(self) -> bool:
        """Return True if the session has no media loaded."""
        return self.now_playing is None
|
|
|
|
|
|
class EmbyCoordinator(DataUpdateCoordinator[dict[str, EmbySession]]):
    """Coordinator for Emby data with WebSocket + polling fallback.

    The coordinator's data is a mapping of Emby session id to ``EmbySession``.
    It is refreshed either by polling ``api.get_sessions()`` or by session
    lists pushed over the WebSocket.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        api: EmbyApiClient,
        websocket: EmbyWebSocket,
        scan_interval: int,
    ) -> None:
        """Initialize the coordinator.

        Args:
            hass: Home Assistant instance.
            api: REST client used for polling sessions.
            websocket: WebSocket client used for pushed updates.
            scan_interval: Polling interval in seconds.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=scan_interval),
        )
        self.api = api
        self._websocket = websocket
        self._ws_connected = False
        # Handle returned by ``websocket.add_callback``; calling it unregisters
        # ``_handle_ws_message``. None while no callback is registered.
        self._remove_ws_callback: Callable[[], None] | None = None

    async def async_setup(self) -> None:
        """Set up the coordinator with WebSocket connection.

        On a successful connect, subscribes to session updates and registers
        the message handler. Otherwise only logs a warning; no callback is
        registered.
        """
        if await self._websocket.connect():
            await self._websocket.subscribe_to_sessions()
            self._remove_ws_callback = self._websocket.add_callback(
                self._handle_ws_message
            )
            self._ws_connected = True
            _LOGGER.info("Emby WebSocket connected, using real-time updates")
        else:
            _LOGGER.warning(
                "Emby WebSocket connection failed, using polling fallback"
            )

    @callback
    def _handle_ws_message(self, message_type: str, data: Any) -> None:
        """Handle incoming WebSocket message.

        A sessions message carrying a list replaces the coordinator data
        directly. Playback start/stop/progress messages request a refresh.
        Any other message type is ignored.
        """
        _LOGGER.debug("Handling WebSocket message: %s", message_type)

        if message_type == WS_MESSAGE_SESSIONS:
            # Full session list received
            if isinstance(data, list):
                self.async_set_updated_data(self._parse_sessions(data))
            else:
                _LOGGER.debug(
                    "Ignoring %s message with non-list payload (%s)",
                    message_type,
                    type(data).__name__,
                )
            return

        if message_type in (
            WS_MESSAGE_PLAYBACK_START,
            WS_MESSAGE_PLAYBACK_STOP,
            WS_MESSAGE_PLAYBACK_PROGRESS,
        ):
            # Individual session update - trigger a refresh to get full state.
            # We could optimize this by updating only the affected session,
            # but a full refresh ensures consistency.
            self.hass.async_create_task(self.async_request_refresh())

    async def _async_update_data(self) -> dict[str, EmbySession]:
        """Fetch sessions from Emby API (polling fallback).

        Raises:
            UpdateFailed: If the API client raises ``EmbyApiError``.
        """
        try:
            sessions_data = await self.api.get_sessions()
        except EmbyApiError as err:
            raise UpdateFailed(f"Error fetching Emby sessions: {err}") from err
        return self._parse_sessions(sessions_data)

    def _parse_sessions(
        self, sessions_data: list[dict[str, Any]]
    ) -> dict[str, EmbySession]:
        """Parse session data into EmbySession objects.

        Args:
            sessions_data: Raw session dicts as received from the server.

        Returns:
            Mapping of session id to ``EmbySession``. Entries that are not
            dicts, lack remote-control support, or have no ``Id`` are skipped.
        """
        sessions: dict[str, EmbySession] = {}

        for session_data in sessions_data:
            # WebSocket payloads are only checked to be a list, so guard
            # against malformed elements instead of raising in the callback.
            if not isinstance(session_data, dict):
                continue

            # Only include sessions that support remote control
            if not session_data.get("SupportsRemoteControl", False):
                continue

            session_id = session_data.get("Id")
            if not session_id:
                continue

            now_playing_data = session_data.get("NowPlayingItem")
            now_playing = (
                self._parse_now_playing(now_playing_data)
                if now_playing_data
                else None
            )

            play_state_data = session_data.get("PlayState")
            play_state = (
                self._parse_play_state(play_state_data)
                if play_state_data
                else None
            )

            sessions[session_id] = EmbySession(
                session_id=session_id,
                device_id=session_data.get("DeviceId", ""),
                device_name=session_data.get("DeviceName", "Unknown Device"),
                client_name=session_data.get("Client", "Unknown Client"),
                app_version=session_data.get("ApplicationVersion"),
                user_id=session_data.get("UserId"),
                user_name=session_data.get("UserName"),
                supports_remote_control=session_data.get(
                    "SupportsRemoteControl", True
                ),
                now_playing=now_playing,
                play_state=play_state,
                # ``or []`` also covers a key that is present but null.
                playable_media_types=session_data.get("PlayableMediaTypes") or [],
                supported_commands=session_data.get("SupportedCommands") or [],
            )

        return sessions

    def _parse_now_playing(self, data: dict[str, Any]) -> EmbyNowPlaying:
        """Parse now playing item data.

        Args:
            data: The raw ``NowPlayingItem`` dict of a session.

        Returns:
            The populated ``EmbyNowPlaying``.
        """
        # Join all artists into one string; fall back to the album artist.
        artists = data.get("Artists") or []
        artist = ", ".join(artists) if artists else data.get("AlbumArtist")

        # Get the image item ID (for series/seasons, might be different from item ID)
        image_item_id = data.get("Id")
        if data.get("SeriesId"):
            image_item_id = data.get("SeriesId")
        elif data.get("ParentId") and data.get("Type") == "Audio":
            image_item_id = data.get("ParentId")  # Use album ID for music

        return EmbyNowPlaying(
            item_id=data.get("Id", ""),
            name=data.get("Name", ""),
            media_type=data.get("MediaType", ""),
            item_type=data.get("Type", ""),
            artist=artist,
            album=data.get("Album"),
            album_artist=data.get("AlbumArtist"),
            series_name=data.get("SeriesName"),
            season_name=data.get("SeasonName"),
            index_number=data.get("IndexNumber"),
            parent_index_number=data.get("ParentIndexNumber"),
            # ``or`` keeps the int / list field types when the key is null.
            duration_ticks=data.get("RunTimeTicks") or 0,
            primary_image_tag=data.get("PrimaryImageTag"),
            primary_image_item_id=image_item_id,
            backdrop_image_tags=data.get("BackdropImageTags") or [],
            genres=data.get("Genres") or [],
            production_year=data.get("ProductionYear"),
            overview=data.get("Overview"),
        )

    def _parse_play_state(self, data: dict[str, Any]) -> EmbyPlayState:
        """Parse play state data.

        Args:
            data: The raw ``PlayState`` dict of a session.

        Returns:
            The populated ``EmbyPlayState``; missing keys use the same
            defaults as the dataclass.
        """
        return EmbyPlayState(
            is_paused=data.get("IsPaused", False),
            is_muted=data.get("IsMuted", False),
            volume_level=data.get("VolumeLevel", 100),
            # ``or 0`` keeps the field an int when the key is present but null.
            position_ticks=data.get("PositionTicks") or 0,
            can_seek=data.get("CanSeek", True),
            repeat_mode=data.get("RepeatMode", "RepeatNone"),
            shuffle_mode=data.get("ShuffleMode", "Sorted"),
            play_method=data.get("PlayMethod"),
        )

    def update_scan_interval(self, interval: int) -> None:
        """Update the polling scan interval.

        Args:
            interval: New polling interval in seconds.
        """
        self.update_interval = timedelta(seconds=interval)
        _LOGGER.debug("Updated scan interval to %d seconds", interval)

    async def async_shutdown(self) -> None:
        """Shut down the coordinator.

        Unregisters the WebSocket callback, closes the WebSocket and then
        runs the base-class shutdown.
        """
        if self._remove_ws_callback is not None:
            self._remove_ws_callback()
            # Drop the handle so a repeated shutdown does not call it again.
            self._remove_ws_callback = None
        self._ws_connected = False

        try:
            await self._websocket.close()
        finally:
            # This method overrides the base-class one, so delegate explicitly
            # or the base coordinator's own cleanup would never run.
            await super().async_shutdown()
|