"""Media browser for Emby Media Player integration.""" from __future__ import annotations import logging from typing import Any from homeassistant.components.media_player import ( BrowseError, BrowseMedia, MediaClass, MediaType, ) from homeassistant.core import HomeAssistant from .api import EmbyApiClient, EmbyApiError from .const import ( ITEM_TYPE_AUDIO, ITEM_TYPE_COLLECTION_FOLDER, ITEM_TYPE_EPISODE, ITEM_TYPE_FOLDER, ITEM_TYPE_MOVIE, ITEM_TYPE_MUSIC_ALBUM, ITEM_TYPE_MUSIC_ARTIST, ITEM_TYPE_PLAYLIST, ITEM_TYPE_SEASON, ITEM_TYPE_SERIES, ITEM_TYPE_USER_VIEW, ) _LOGGER = logging.getLogger(__name__) # Default page size when no pagination params provided DEFAULT_PAGE_SIZE = 100 # Maximum items per browse call (HA UI also has limits) MAX_PAGE_SIZE = 200 ITEM_TYPE_TO_MEDIA_CLASS: dict[str, MediaClass] = { ITEM_TYPE_MOVIE: MediaClass.MOVIE, ITEM_TYPE_SERIES: MediaClass.TV_SHOW, ITEM_TYPE_SEASON: MediaClass.SEASON, ITEM_TYPE_EPISODE: MediaClass.EPISODE, ITEM_TYPE_AUDIO: MediaClass.TRACK, ITEM_TYPE_MUSIC_ALBUM: MediaClass.ALBUM, ITEM_TYPE_MUSIC_ARTIST: MediaClass.ARTIST, ITEM_TYPE_PLAYLIST: MediaClass.PLAYLIST, ITEM_TYPE_FOLDER: MediaClass.DIRECTORY, ITEM_TYPE_COLLECTION_FOLDER: MediaClass.DIRECTORY, ITEM_TYPE_USER_VIEW: MediaClass.DIRECTORY, } ITEM_TYPE_TO_MEDIA_TYPE: dict[str, MediaType | str] = { ITEM_TYPE_MOVIE: MediaType.MOVIE, ITEM_TYPE_SERIES: MediaType.TVSHOW, ITEM_TYPE_SEASON: MediaType.SEASON, ITEM_TYPE_EPISODE: MediaType.EPISODE, ITEM_TYPE_AUDIO: MediaType.TRACK, ITEM_TYPE_MUSIC_ALBUM: MediaType.ALBUM, ITEM_TYPE_MUSIC_ARTIST: MediaType.ARTIST, ITEM_TYPE_PLAYLIST: MediaType.PLAYLIST, } PLAYABLE_ITEM_TYPES = frozenset( {ITEM_TYPE_MOVIE, ITEM_TYPE_EPISODE, ITEM_TYPE_AUDIO} ) EXPANDABLE_ITEM_TYPES = frozenset( { ITEM_TYPE_SERIES, ITEM_TYPE_SEASON, ITEM_TYPE_MUSIC_ALBUM, ITEM_TYPE_MUSIC_ARTIST, ITEM_TYPE_PLAYLIST, ITEM_TYPE_FOLDER, ITEM_TYPE_COLLECTION_FOLDER, ITEM_TYPE_USER_VIEW, } ) async def async_browse_media( hass: HomeAssistant, api: EmbyApiClient, user_id: str, media_content_type: MediaType | str | None, media_content_id: str | None, ) -> BrowseMedia: """Browse Emby media library.""" try: if not media_content_id: return await _build_root_browse(api, user_id) return await _build_item_browse(api, user_id, media_content_id) except BrowseError: raise except EmbyApiError as err: _LOGGER.warning("Failed to browse Emby library: %s", err) raise BrowseError(f"Failed to browse Emby library: {err}") from err except Exception as err: # noqa: BLE001 - convert any leak into BrowseError _LOGGER.exception("Unexpected error while browsing Emby library") raise BrowseError(f"Unexpected error: {err}") from err async def _build_root_browse(api: EmbyApiClient, user_id: str) -> BrowseMedia: """Build root browse media structure (library views).""" views = await api.get_views(user_id) if not isinstance(views, list): views = [] children: list[BrowseMedia] = [] for view in views: if not isinstance(view, dict): continue item_id = view.get("Id") if not item_id: continue name = view.get("Name", "Unknown") collection_type = view.get("CollectionType", "") if collection_type == "movies": media_class = MediaClass.MOVIE elif collection_type == "tvshows": media_class = MediaClass.TV_SHOW elif collection_type == "music": media_class = MediaClass.MUSIC else: media_class = MediaClass.DIRECTORY children.append( BrowseMedia( media_class=media_class, media_content_id=item_id, media_content_type=MediaType.CHANNELS, title=name, can_play=False, can_expand=True, ) ) return BrowseMedia( media_class=MediaClass.DIRECTORY, media_content_id="", media_content_type=MediaType.CHANNELS, title="Emby Library", can_play=False, can_expand=True, children=children, ) async def _build_item_browse( api: EmbyApiClient, user_id: str, item_id: str ) -> BrowseMedia: """Build browse media structure for a specific item.""" item = await api.get_item(user_id, item_id) item_type = item.get("Type", "") item_name = item.get("Name", "Unknown") children_data = await api.get_items( user_id=user_id, parent_id=item_id, limit=MAX_PAGE_SIZE, fields=["PrimaryImageAspectRatio"], ) raw_children = ( children_data.get("Items", []) if isinstance(children_data, dict) else [] ) children: list[BrowseMedia] = [] for child in raw_children: if not isinstance(child, dict): continue child_media = _build_browse_media_item(child) if child_media: children.append(child_media) media_class = ITEM_TYPE_TO_MEDIA_CLASS.get(item_type, MediaClass.DIRECTORY) media_type = ITEM_TYPE_TO_MEDIA_TYPE.get(item_type, MediaType.CHANNELS) return BrowseMedia( media_class=media_class, media_content_id=item_id, media_content_type=media_type, title=item_name, can_play=item_type in PLAYABLE_ITEM_TYPES, can_expand=True, children=children, ) def _build_browse_media_item(item: dict[str, Any]) -> BrowseMedia | None: """Build a BrowseMedia item from Emby item data.""" item_id = item.get("Id") if not item_id: return None item_type = item.get("Type", "") name = item.get("Name", "Unknown") if item_type == ITEM_TYPE_EPISODE: season_num = item.get("ParentIndexNumber") episode_num = item.get("IndexNumber") if season_num is not None and episode_num is not None: name = f"S{season_num:02d}E{episode_num:02d} - {name}" elif episode_num is not None: name = f"E{episode_num:02d} - {name}" if item_type == ITEM_TYPE_AUDIO: track_num = item.get("IndexNumber") artists = item.get("Artists", []) if track_num is not None: name = f"{track_num}. {name}" if artists: name = f"{name} - {', '.join(artists)}" media_class = ITEM_TYPE_TO_MEDIA_CLASS.get(item_type, MediaClass.VIDEO) media_type = ITEM_TYPE_TO_MEDIA_TYPE.get(item_type, MediaType.VIDEO) return BrowseMedia( media_class=media_class, media_content_id=item_id, media_content_type=media_type, title=name, can_play=item_type in PLAYABLE_ITEM_TYPES, can_expand=item_type in EXPANDABLE_ITEM_TYPES, )