"""Media player platform for Emby Media Player integration.""" from __future__ import annotations import logging from datetime import datetime from typing import Any from homeassistant.components.media_player import ( BrowseMedia, MediaPlayerDeviceClass, MediaPlayerEntity, MediaPlayerEntityFeature, MediaPlayerState, MediaType, ) from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.util import slugify from homeassistant.util.dt import utcnow from . import EmbyConfigEntry, EmbyRuntimeData from .browse_media import async_browse_media from .const import ( ATTR_CLIENT_NAME, ATTR_DEVICE_ID, ATTR_DEVICE_NAME, ATTR_ITEM_ID, ATTR_ITEM_TYPE, ATTR_SESSION_ID, ATTR_USER_NAME, DOMAIN, ITEM_TYPE_AUDIO, ITEM_TYPE_EPISODE, ITEM_TYPE_MOVIE, MEDIA_TYPE_AUDIO, MEDIA_TYPE_VIDEO, TICKS_PER_SECOND, ) from .coordinator import EmbyCoordinator, EmbySession _LOGGER = logging.getLogger(__name__) # Supported features for Emby media player SUPPORTED_FEATURES = ( MediaPlayerEntityFeature.PAUSE | MediaPlayerEntityFeature.PLAY | MediaPlayerEntityFeature.STOP | MediaPlayerEntityFeature.SEEK | MediaPlayerEntityFeature.VOLUME_SET | MediaPlayerEntityFeature.VOLUME_MUTE | MediaPlayerEntityFeature.PREVIOUS_TRACK | MediaPlayerEntityFeature.NEXT_TRACK | MediaPlayerEntityFeature.PLAY_MEDIA | MediaPlayerEntityFeature.BROWSE_MEDIA ) async def async_setup_entry( hass: HomeAssistant, entry: EmbyConfigEntry, async_add_entities: AddEntitiesCallback, ) -> None: """Set up Emby media player entities.""" runtime_data: EmbyRuntimeData = entry.runtime_data coordinator = runtime_data.coordinator # Track which sessions we've already created entities for tracked_sessions: set[str] = set() @callback def async_update_entities() -> None: """Add new entities for new sessions.""" if coordinator.data is None: return current_sessions = set(coordinator.data.keys()) new_sessions = current_sessions - tracked_sessions if new_sessions: new_entities = [ EmbyMediaPlayer(coordinator, entry, session_id) for session_id in new_sessions ] async_add_entities(new_entities) tracked_sessions.update(new_sessions) _LOGGER.debug("Added %d new Emby media player entities", len(new_entities)) # Register listener for coordinator updates entry.async_on_unload(coordinator.async_add_listener(async_update_entities)) # Add entities for existing sessions async_update_entities() class EmbyMediaPlayer(CoordinatorEntity[EmbyCoordinator], MediaPlayerEntity): """Representation of an Emby media player.""" _attr_has_entity_name = True _attr_device_class = MediaPlayerDeviceClass.TV _attr_supported_features = SUPPORTED_FEATURES def __init__( self, coordinator: EmbyCoordinator, entry: ConfigEntry, session_id: str, ) -> None: """Initialize the Emby media player.""" super().__init__(coordinator) self._entry = entry self._session_id = session_id self._last_position_update: datetime | None = None # Get initial session info for naming session = self._session device_name = session.device_name if session else "Unknown" client_name = session.client_name if session else "Unknown" # Set unique ID and entity ID self._attr_unique_id = f"{entry.entry_id}_{session_id}" self._attr_name = f"{device_name} ({client_name})" @property def _session(self) -> EmbySession | None: """Get current session data.""" if self.coordinator.data is None: return None return self.coordinator.data.get(self._session_id) @property def _runtime_data(self) -> EmbyRuntimeData: """Get runtime data.""" return self._entry.runtime_data @property def available(self) -> bool: """Return True if entity is available.""" return self.coordinator.last_update_success and self._session is not None @property def device_info(self) -> DeviceInfo: """Return device info.""" session = self._session device_name = session.device_name if session else "Unknown Device" client_name = session.client_name if session else "Unknown" return DeviceInfo( identifiers={(DOMAIN, self._session_id)}, name=f"{device_name}", manufacturer="Emby", model=client_name, sw_version=session.app_version if session else None, entry_type=DeviceEntryType.SERVICE, ) @property def state(self) -> MediaPlayerState: """Return the state of the player.""" session = self._session if session is None: return MediaPlayerState.OFF if session.is_playing: return MediaPlayerState.PLAYING if session.is_paused: return MediaPlayerState.PAUSED return MediaPlayerState.IDLE @property def volume_level(self) -> float | None: """Return volume level (0.0-1.0).""" session = self._session if session and session.play_state: return session.play_state.volume_level / 100 return None @property def is_volume_muted(self) -> bool | None: """Return True if volume is muted.""" session = self._session if session and session.play_state: return session.play_state.is_muted return None @property def media_content_id(self) -> str | None: """Return the content ID of current playing media.""" session = self._session if session and session.now_playing: return session.now_playing.item_id return None @property def media_content_type(self) -> MediaType | str | None: """Return the content type of current playing media.""" session = self._session if session and session.now_playing: media_type = session.now_playing.media_type if media_type == MEDIA_TYPE_AUDIO: return MediaType.MUSIC if media_type == MEDIA_TYPE_VIDEO: item_type = session.now_playing.item_type if item_type == ITEM_TYPE_MOVIE: return MediaType.MOVIE if item_type == ITEM_TYPE_EPISODE: return MediaType.TVSHOW return MediaType.VIDEO return None @property def media_title(self) -> str | None: """Return the title of current playing media.""" session = self._session if session and session.now_playing: np = session.now_playing # For TV episodes, include series and episode info if np.item_type == ITEM_TYPE_EPISODE and np.series_name: season = f"S{np.parent_index_number:02d}" if np.parent_index_number else "" episode = f"E{np.index_number:02d}" if np.index_number else "" return f"{np.series_name} {season}{episode} - {np.name}" return np.name return None @property def media_artist(self) -> str | None: """Return the artist of current playing media.""" session = self._session if session and session.now_playing: return session.now_playing.artist return None @property def media_album_name(self) -> str | None: """Return the album name of current playing media.""" session = self._session if session and session.now_playing: return session.now_playing.album return None @property def media_album_artist(self) -> str | None: """Return the album artist of current playing media.""" session = self._session if session and session.now_playing: return session.now_playing.album_artist return None @property def media_series_title(self) -> str | None: """Return the series title for TV shows.""" session = self._session if session and session.now_playing: return session.now_playing.series_name return None @property def media_season(self) -> str | None: """Return the season for TV shows.""" session = self._session if session and session.now_playing and session.now_playing.parent_index_number: return str(session.now_playing.parent_index_number) return None @property def media_episode(self) -> str | None: """Return the episode for TV shows.""" session = self._session if session and session.now_playing and session.now_playing.index_number: return str(session.now_playing.index_number) return None @property def media_duration(self) -> int | None: """Return the duration of current playing media in seconds.""" session = self._session if session and session.now_playing: return int(session.now_playing.duration_seconds) return None @property def media_position(self) -> int | None: """Return the position of current playing media in seconds.""" session = self._session if session and session.play_state: return int(session.play_state.position_seconds) return None @property def media_position_updated_at(self) -> datetime | None: """Return when position was last updated.""" session = self._session if session and session.play_state and session.now_playing: return utcnow() return None @property def media_image_url(self) -> str | None: """Return the image URL of current playing media.""" session = self._session if session and session.now_playing: np = session.now_playing item_id = np.primary_image_item_id or np.item_id if item_id: return self._runtime_data.api.get_image_url( item_id, image_type="Primary", max_width=500, max_height=500, ) return None @property def extra_state_attributes(self) -> dict[str, Any]: """Return extra state attributes.""" session = self._session if session is None: return {} attrs = { ATTR_SESSION_ID: session.session_id, ATTR_DEVICE_ID: session.device_id, ATTR_DEVICE_NAME: session.device_name, ATTR_CLIENT_NAME: session.client_name, ATTR_USER_NAME: session.user_name, } if session.now_playing: attrs[ATTR_ITEM_ID] = session.now_playing.item_id attrs[ATTR_ITEM_TYPE] = session.now_playing.item_type return attrs # ------------------------------------------------------------------------- # Playback Control Methods # ------------------------------------------------------------------------- async def async_media_play(self) -> None: """Resume playback.""" await self._runtime_data.api.play(self._session_id) await self.coordinator.async_request_refresh() async def async_media_pause(self) -> None: """Pause playback.""" await self._runtime_data.api.pause(self._session_id) await self.coordinator.async_request_refresh() async def async_media_stop(self) -> None: """Stop playback.""" await self._runtime_data.api.stop(self._session_id) await self.coordinator.async_request_refresh() async def async_media_next_track(self) -> None: """Skip to next track.""" await self._runtime_data.api.next_track(self._session_id) await self.coordinator.async_request_refresh() async def async_media_previous_track(self) -> None: """Skip to previous track.""" await self._runtime_data.api.previous_track(self._session_id) await self.coordinator.async_request_refresh() async def async_media_seek(self, position: float) -> None: """Seek to position.""" position_ticks = int(position * TICKS_PER_SECOND) await self._runtime_data.api.seek(self._session_id, position_ticks) await self.coordinator.async_request_refresh() async def async_set_volume_level(self, volume: float) -> None: """Set volume level (0.0-1.0).""" volume_percent = int(volume * 100) await self._runtime_data.api.set_volume(self._session_id, volume_percent) await self.coordinator.async_request_refresh() async def async_mute_volume(self, mute: bool) -> None: """Mute or unmute.""" if mute: await self._runtime_data.api.mute(self._session_id) else: await self._runtime_data.api.unmute(self._session_id) await self.coordinator.async_request_refresh() # ------------------------------------------------------------------------- # Media Browsing & Playing # ------------------------------------------------------------------------- async def async_play_media( self, media_type: MediaType | str, media_id: str, **kwargs: Any, ) -> None: """Play a piece of media.""" await self._runtime_data.api.play_media( self._session_id, item_ids=[media_id], ) await self.coordinator.async_request_refresh() async def async_browse_media( self, media_content_type: MediaType | str | None = None, media_content_id: str | None = None, ) -> BrowseMedia: """Browse media.""" return await async_browse_media( self.hass, self._runtime_data.api, self._runtime_data.user_id, media_content_type, media_content_id, )