Files
haos-hacs-integration-media…/custom_components/emby_player/coordinator.py
alexei.dolgolyov 5b2c653104
All checks were successful
Validate / Hassfest (push) Successful in 3s
Revert "Reduce unnecessary state refreshes to prevent UI dialog closure"
This reverts commit 8419b0de8c.
2026-02-03 05:20:02 +03:00

284 lines
10 KiB
Python

"""Data coordinator for Emby Media Player integration."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .api import EmbyApiClient, EmbyApiError
from .const import (
DOMAIN,
TICKS_PER_SECOND,
WS_MESSAGE_PLAYBACK_PROGRESS,
WS_MESSAGE_PLAYBACK_START,
WS_MESSAGE_PLAYBACK_STOP,
WS_MESSAGE_SESSIONS,
)
from .websocket import EmbyWebSocket
_LOGGER = logging.getLogger(__name__)
@dataclass
class EmbyNowPlaying:
"""Currently playing media information."""
item_id: str
name: str
media_type: str # Audio, Video
item_type: str # Movie, Episode, Audio, etc.
artist: str | None = None
album: str | None = None
album_artist: str | None = None
series_name: str | None = None
season_name: str | None = None
index_number: int | None = None # Episode number
parent_index_number: int | None = None # Season number
duration_ticks: int = 0
primary_image_tag: str | None = None
primary_image_item_id: str | None = None
backdrop_image_tags: list[str] = field(default_factory=list)
genres: list[str] = field(default_factory=list)
production_year: int | None = None
overview: str | None = None
@property
def duration_seconds(self) -> float:
"""Get duration in seconds."""
return self.duration_ticks / TICKS_PER_SECOND if self.duration_ticks else 0
@dataclass
class EmbyPlayState:
"""Playback state information."""
is_paused: bool = False
is_muted: bool = False
volume_level: int = 100 # 0-100
position_ticks: int = 0
can_seek: bool = True
repeat_mode: str = "RepeatNone"
shuffle_mode: str = "Sorted"
play_method: str | None = None # DirectPlay, DirectStream, Transcode
@property
def position_seconds(self) -> float:
"""Get position in seconds."""
return self.position_ticks / TICKS_PER_SECOND if self.position_ticks else 0
@dataclass
class EmbySession:
"""Represents an Emby client session."""
session_id: str
device_id: str
device_name: str
client_name: str
app_version: str | None = None
user_id: str | None = None
user_name: str | None = None
supports_remote_control: bool = True
now_playing: EmbyNowPlaying | None = None
play_state: EmbyPlayState | None = None
playable_media_types: list[str] = field(default_factory=list)
supported_commands: list[str] = field(default_factory=list)
@property
def is_playing(self) -> bool:
"""Return True if media is currently playing (not paused)."""
return (
self.now_playing is not None
and self.play_state is not None
and not self.play_state.is_paused
)
@property
def is_paused(self) -> bool:
"""Return True if media is paused."""
return (
self.now_playing is not None
and self.play_state is not None
and self.play_state.is_paused
)
@property
def is_idle(self) -> bool:
"""Return True if session is idle (no media playing)."""
return self.now_playing is None
class EmbyCoordinator(DataUpdateCoordinator[dict[str, EmbySession]]):
"""Coordinator for Emby data with WebSocket + polling fallback."""
def __init__(
self,
hass: HomeAssistant,
api: EmbyApiClient,
websocket: EmbyWebSocket,
scan_interval: int,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=scan_interval),
)
self.api = api
self._websocket = websocket
self._ws_connected = False
self._remove_ws_callback: callable | None = None
async def async_setup(self) -> None:
"""Set up the coordinator with WebSocket connection."""
# Try to establish WebSocket connection
if await self._websocket.connect():
await self._websocket.subscribe_to_sessions()
self._remove_ws_callback = self._websocket.add_callback(
self._handle_ws_message
)
self._ws_connected = True
_LOGGER.info("Emby WebSocket connected, using real-time updates")
else:
_LOGGER.warning(
"Emby WebSocket connection failed, using polling fallback"
)
@callback
def _handle_ws_message(self, message_type: str, data: Any) -> None:
"""Handle incoming WebSocket message."""
_LOGGER.debug("Handling WebSocket message: %s", message_type)
if message_type == WS_MESSAGE_SESSIONS:
# Full session list received
if isinstance(data, list):
sessions = self._parse_sessions(data)
self.async_set_updated_data(sessions)
elif message_type in (
WS_MESSAGE_PLAYBACK_START,
WS_MESSAGE_PLAYBACK_STOP,
WS_MESSAGE_PLAYBACK_PROGRESS,
):
# Individual session update - trigger a refresh to get full state
# We could optimize this by updating only the affected session,
# but a full refresh ensures consistency
self.hass.async_create_task(self.async_request_refresh())
async def _async_update_data(self) -> dict[str, EmbySession]:
"""Fetch sessions from Emby API (polling fallback)."""
try:
sessions_data = await self.api.get_sessions()
return self._parse_sessions(sessions_data)
except EmbyApiError as err:
raise UpdateFailed(f"Error fetching Emby sessions: {err}") from err
def _parse_sessions(self, sessions_data: list[dict[str, Any]]) -> dict[str, EmbySession]:
"""Parse session data into EmbySession objects."""
sessions: dict[str, EmbySession] = {}
for session_data in sessions_data:
# Only include sessions that support remote control
if not session_data.get("SupportsRemoteControl", False):
continue
session_id = session_data.get("Id")
if not session_id:
continue
# Parse now playing item
now_playing = None
now_playing_data = session_data.get("NowPlayingItem")
if now_playing_data:
now_playing = self._parse_now_playing(now_playing_data)
# Parse play state
play_state = None
play_state_data = session_data.get("PlayState")
if play_state_data:
play_state = self._parse_play_state(play_state_data)
session = EmbySession(
session_id=session_id,
device_id=session_data.get("DeviceId", ""),
device_name=session_data.get("DeviceName", "Unknown Device"),
client_name=session_data.get("Client", "Unknown Client"),
app_version=session_data.get("ApplicationVersion"),
user_id=session_data.get("UserId"),
user_name=session_data.get("UserName"),
supports_remote_control=session_data.get("SupportsRemoteControl", True),
now_playing=now_playing,
play_state=play_state,
playable_media_types=session_data.get("PlayableMediaTypes", []),
supported_commands=session_data.get("SupportedCommands", []),
)
sessions[session_id] = session
return sessions
def _parse_now_playing(self, data: dict[str, Any]) -> EmbyNowPlaying:
"""Parse now playing item data."""
# Get artists as string
artists = data.get("Artists", [])
artist = ", ".join(artists) if artists else data.get("AlbumArtist")
# Get the image item ID (for series/seasons, might be different from item ID)
image_item_id = data.get("Id")
if data.get("SeriesId"):
image_item_id = data.get("SeriesId")
elif data.get("ParentId") and data.get("Type") == "Audio":
image_item_id = data.get("ParentId") # Use album ID for music
return EmbyNowPlaying(
item_id=data.get("Id", ""),
name=data.get("Name", ""),
media_type=data.get("MediaType", ""),
item_type=data.get("Type", ""),
artist=artist,
album=data.get("Album"),
album_artist=data.get("AlbumArtist"),
series_name=data.get("SeriesName"),
season_name=data.get("SeasonName"),
index_number=data.get("IndexNumber"),
parent_index_number=data.get("ParentIndexNumber"),
duration_ticks=data.get("RunTimeTicks", 0),
primary_image_tag=data.get("PrimaryImageTag"),
primary_image_item_id=image_item_id,
backdrop_image_tags=data.get("BackdropImageTags", []),
genres=data.get("Genres", []),
production_year=data.get("ProductionYear"),
overview=data.get("Overview"),
)
def _parse_play_state(self, data: dict[str, Any]) -> EmbyPlayState:
"""Parse play state data."""
return EmbyPlayState(
is_paused=data.get("IsPaused", False),
is_muted=data.get("IsMuted", False),
volume_level=data.get("VolumeLevel", 100),
position_ticks=data.get("PositionTicks", 0),
can_seek=data.get("CanSeek", True),
repeat_mode=data.get("RepeatMode", "RepeatNone"),
shuffle_mode=data.get("ShuffleMode", "Sorted"),
play_method=data.get("PlayMethod"),
)
def update_scan_interval(self, interval: int) -> None:
"""Update the polling scan interval."""
self.update_interval = timedelta(seconds=interval)
_LOGGER.debug("Updated scan interval to %d seconds", interval)
async def async_shutdown(self) -> None:
"""Shut down the coordinator."""
if self._remove_ws_callback:
self._remove_ws_callback()
await self._websocket.close()