Add media browser integration for Home Assistant

- Implement async_browse_media() to enable browsing media folders through HA Media Browser UI
- Add async_play_media() to handle file playback from media browser
- Add play_media_file service for automation support
- Add BROWSE_MEDIA and PLAY_MEDIA feature flags
- Implement media browser API client methods (get_media_folders, browse_folder, play_media_file)
- Fix path separator handling for cross-platform compatibility

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-06 22:24:08 +03:00
parent e4eeb2a97b
commit 8cbe33eb72
4 changed files with 239 additions and 0 deletions

View File

@@ -14,6 +14,7 @@ from homeassistant.helpers import config_validation as cv
from .api_client import MediaServerClient, MediaServerError from .api_client import MediaServerClient, MediaServerError
from .const import ( from .const import (
ATTR_FILE_PATH,
ATTR_SCRIPT_ARGS, ATTR_SCRIPT_ARGS,
ATTR_SCRIPT_NAME, ATTR_SCRIPT_NAME,
CONF_HOST, CONF_HOST,
@@ -21,6 +22,7 @@ from .const import (
CONF_TOKEN, CONF_TOKEN,
DOMAIN, DOMAIN,
SERVICE_EXECUTE_SCRIPT, SERVICE_EXECUTE_SCRIPT,
SERVICE_PLAY_MEDIA_FILE,
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -37,6 +39,13 @@ SERVICE_EXECUTE_SCRIPT_SCHEMA = vol.Schema(
} }
) )
# Service schema for play_media_file
SERVICE_PLAY_MEDIA_FILE_SCHEMA = vol.Schema(
{
vol.Required(ATTR_FILE_PATH): cv.string,
}
)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Remote Media Player from a config entry. """Set up Remote Media Player from a config entry.
@@ -111,6 +120,29 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
schema=SERVICE_EXECUTE_SCRIPT_SCHEMA, schema=SERVICE_EXECUTE_SCRIPT_SCHEMA,
) )
# Register play_media_file service if not already registered
if not hass.services.has_service(DOMAIN, SERVICE_PLAY_MEDIA_FILE):
async def async_play_media_file(call: ServiceCall) -> None:
"""Handle play_media_file service call."""
file_path = call.data[ATTR_FILE_PATH]
_LOGGER.debug("Service play_media_file called with path: %s", file_path)
# Execute on all configured media server instances
for entry_id, data in hass.data[DOMAIN].items():
client: MediaServerClient = data["client"]
try:
await client.play_media_file(file_path)
_LOGGER.info("Started playback of %s on %s", file_path, entry_id)
except MediaServerError as err:
_LOGGER.error("Failed to play %s on %s: %s", file_path, entry_id, err)
hass.services.async_register(
DOMAIN,
SERVICE_PLAY_MEDIA_FILE,
async_play_media_file,
schema=SERVICE_PLAY_MEDIA_FILE_SCHEMA,
)
# Forward setup to platforms # Forward setup to platforms
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
@@ -149,6 +181,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
# Remove services if this was the last entry # Remove services if this was the last entry
if not hass.data[DOMAIN]: if not hass.data[DOMAIN]:
hass.services.async_remove(DOMAIN, SERVICE_EXECUTE_SCRIPT) hass.services.async_remove(DOMAIN, SERVICE_EXECUTE_SCRIPT)
hass.services.async_remove(DOMAIN, SERVICE_PLAY_MEDIA_FILE)
return unload_ok return unload_ok

View File

@@ -27,6 +27,9 @@ from .const import (
API_TOGGLE, API_TOGGLE,
API_SCRIPTS_LIST, API_SCRIPTS_LIST,
API_SCRIPTS_EXECUTE, API_SCRIPTS_EXECUTE,
API_BROWSER_FOLDERS,
API_BROWSER_BROWSE,
API_BROWSER_PLAY,
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -296,6 +299,49 @@ class MediaServerClient:
json_data = {"args": args or []} json_data = {"args": args or []}
return await self._request("POST", endpoint, json_data) return await self._request("POST", endpoint, json_data)
async def get_media_folders(self) -> dict[str, dict[str, Any]]:
"""Get configured media folders.
Returns:
Dictionary of folders with folder_id as key and folder config as value
"""
return await self._request("GET", API_BROWSER_FOLDERS)
async def browse_folder(
self, folder_id: str, path: str = "", offset: int = 0, limit: int = 100
) -> dict[str, Any]:
"""Browse a media folder.
Args:
folder_id: ID of the folder to browse
path: Path within the folder (empty for root)
offset: Pagination offset
limit: Number of items to return
Returns:
Dictionary with current_path, parent_path, items, total, offset, limit
"""
params = {
"folder_id": folder_id,
"path": path,
"offset": offset,
"limit": limit,
}
query_string = "&".join(f"{k}={v}" for k, v in params.items())
endpoint = f"{API_BROWSER_BROWSE}?{query_string}"
return await self._request("GET", endpoint)
async def play_media_file(self, file_path: str) -> dict[str, Any]:
"""Play a media file by absolute path.
Args:
file_path: Absolute path to the media file
Returns:
Response data with success status
"""
return await self._request("POST", API_BROWSER_PLAY, {"path": file_path})
class MediaServerWebSocket: class MediaServerWebSocket:
"""WebSocket client for real-time media status updates.""" """WebSocket client for real-time media status updates."""

View File

@@ -34,10 +34,15 @@ API_TOGGLE = "/api/media/toggle"
API_SCRIPTS_LIST = "/api/scripts/list" API_SCRIPTS_LIST = "/api/scripts/list"
API_SCRIPTS_EXECUTE = "/api/scripts/execute" API_SCRIPTS_EXECUTE = "/api/scripts/execute"
API_WEBSOCKET = "/api/media/ws" API_WEBSOCKET = "/api/media/ws"
API_BROWSER_FOLDERS = "/api/browser/folders"
API_BROWSER_BROWSE = "/api/browser/browse"
API_BROWSER_PLAY = "/api/browser/play"
# Service names # Service names
SERVICE_EXECUTE_SCRIPT = "execute_script" SERVICE_EXECUTE_SCRIPT = "execute_script"
SERVICE_PLAY_MEDIA_FILE = "play_media_file"
# Service attributes # Service attributes
ATTR_SCRIPT_NAME = "script_name" ATTR_SCRIPT_NAME = "script_name"
ATTR_SCRIPT_ARGS = "args" ATTR_SCRIPT_ARGS = "args"
ATTR_FILE_PATH = "file_path"

View File

@@ -8,11 +8,16 @@ from datetime import datetime, timedelta
from typing import Any from typing import Any
from homeassistant.components.media_player import ( from homeassistant.components.media_player import (
BrowseMedia,
MediaPlayerEntity, MediaPlayerEntity,
MediaPlayerEntityFeature, MediaPlayerEntityFeature,
MediaPlayerState, MediaPlayerState,
MediaType, MediaType,
) )
from homeassistant.components.media_player.const import (
MediaClass,
)
from urllib.parse import quote, unquote
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
@@ -303,6 +308,8 @@ class RemoteMediaPlayerEntity(CoordinatorEntity[MediaPlayerCoordinator], MediaPl
| MediaPlayerEntityFeature.SEEK | MediaPlayerEntityFeature.SEEK
| MediaPlayerEntityFeature.TURN_ON | MediaPlayerEntityFeature.TURN_ON
| MediaPlayerEntityFeature.TURN_OFF | MediaPlayerEntityFeature.TURN_OFF
| MediaPlayerEntityFeature.BROWSE_MEDIA
| MediaPlayerEntityFeature.PLAY_MEDIA
) )
@property @property
@@ -489,3 +496,151 @@ class RemoteMediaPlayerEntity(CoordinatorEntity[MediaPlayerCoordinator], MediaPl
await self.coordinator.client.toggle() await self.coordinator.client.toggle()
except MediaServerError as err: except MediaServerError as err:
_LOGGER.error("Failed to toggle: %s", err) _LOGGER.error("Failed to toggle: %s", err)
# Media Browser support
@staticmethod
def _encode_media_id(folder_id: str, path: str = "") -> str:
"""Encode folder_id and path into media_content_id.
Format: folder_id|encoded_path
Root folder: folder_id|
"""
return f"{folder_id}|{quote(path, safe='')}"
@staticmethod
def _decode_media_id(media_content_id: str) -> tuple[str, str]:
"""Decode media_content_id into folder_id and path.
Returns:
Tuple of (folder_id, path)
"""
if not media_content_id or "|" not in media_content_id:
return "", ""
folder_id, encoded_path = media_content_id.split("|", 1)
path = unquote(encoded_path) if encoded_path else ""
return folder_id, path
async def async_browse_media(
self,
media_content_type: str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Implement the media browsing.
Args:
media_content_type: Type of media (unused, but required by HA)
media_content_id: ID in format "folder_id|path" or None for root
Returns:
BrowseMedia object with children
"""
_LOGGER.debug("Browse media: type=%s, id=%s", media_content_type, media_content_id)
# Root level - list all folders
if not media_content_id:
folders = await self.coordinator.client.get_media_folders()
children = [
BrowseMedia(
title=config["label"],
media_class=MediaClass.DIRECTORY,
media_content_type=MediaType.MUSIC, # All folders show as music
media_content_id=self._encode_media_id(folder_id, ""),
can_play=False,
can_expand=True,
)
for folder_id, config in folders.items()
if config.get("enabled", True)
]
return BrowseMedia(
title="Media Folders",
media_class=MediaClass.DIRECTORY,
media_content_type=MediaType.MUSIC,
media_content_id="",
can_play=False,
can_expand=True,
children=children,
)
# Browse specific folder
folder_id, path = self._decode_media_id(media_content_id)
if not folder_id:
raise ValueError("Invalid media_content_id format")
# Get folder contents from API
browse_data = await self.coordinator.client.browse_folder(folder_id, path, offset=0, limit=1000)
children = []
for item in browse_data.get("items", []):
if item["type"] == "folder":
# Subfolder
item_path = f"{path}/{item['name']}" if path else item['name']
children.append(
BrowseMedia(
title=item["name"],
media_class=MediaClass.DIRECTORY,
media_content_type=MediaType.MUSIC,
media_content_id=self._encode_media_id(folder_id, item_path),
can_play=False,
can_expand=True,
)
)
elif item.get("is_media", False):
# Media file
# Build absolute path for playback
folders = await self.coordinator.client.get_media_folders()
base_path = folders[folder_id]["path"]
file_path_in_folder = f"{path}/{item['name']}" if path else item['name']
# Handle platform path separators
separator = '\\' if '\\' in base_path else '/'
# Ensure base_path doesn't end with separator to avoid double separators
base_path_clean = base_path.rstrip('/\\')
absolute_path = f"{base_path_clean}{separator}{file_path_in_folder.replace('/', separator)}"
children.append(
BrowseMedia(
title=item["name"],
media_class=MediaClass.MUSIC,
media_content_type=MediaType.MUSIC,
media_content_id=absolute_path, # Use absolute path as ID for playback
can_play=True,
can_expand=False,
)
)
# Get current folder label
current_title = path.split("/")[-1] if path else browse_data.get("label", folder_id)
return BrowseMedia(
title=current_title,
media_class=MediaClass.DIRECTORY,
media_content_type=MediaType.MUSIC,
media_content_id=media_content_id,
can_play=False,
can_expand=True,
children=children,
)
async def async_play_media(
self, media_type: str, media_id: str, **kwargs: Any
) -> None:
"""Play a media file.
Args:
media_type: Type of media (unused)
media_id: Absolute file path to media file
**kwargs: Additional arguments (unused)
"""
_LOGGER.debug("Play media: type=%s, id=%s", media_type, media_id)
try:
# media_id is the absolute file path from browse_media
await self.coordinator.client.play_media_file(media_id)
# Request immediate status update
await self.coordinator.async_request_refresh()
except MediaServerError as err:
_LOGGER.error("Failed to play media file: %s", err)