Files
haos-hacs-integration-media…/custom_components/remote_media_player/__init__.py
T
alexei.dolgolyov 8e8acccbb2
Release / release (push) Successful in 3s
chore: release v0.3.2
2026-05-18 13:14:03 +03:00

315 lines
10 KiB
Python

"""The Remote Media Player integration."""
from __future__ import annotations
import logging
from typing import Any
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_AREA_ID, ATTR_DEVICE_ID, ATTR_ENTITY_ID, Platform
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers import entity_registry as er
from .api_client import MediaServerClient, MediaServerError
from .const import (
ATTR_FILE_PATH,
ATTR_SCRIPT_NAME,
ATTR_SCRIPT_PARAMS,
CONF_HOST,
CONF_PORT,
CONF_TOKEN,
DOMAIN,
SERVICE_EXECUTE_SCRIPT,
SERVICE_PLAY_MEDIA_FILE,
)
from .display_coordinator import DisplayCoordinator
from .foreground_coordinator import ForegroundCoordinator
_LOGGER = logging.getLogger(__name__)
PLATFORMS: list[Platform] = [
Platform.MEDIA_PLAYER,
Platform.BUTTON,
Platform.NUMBER,
Platform.SWITCH,
Platform.SENSOR,
Platform.BINARY_SENSOR,
Platform.SELECT,
]
# Target-selector fields injected by HA when `target:` is declared in services.yaml.
# Listed explicitly so the voluptuous schema does not strip them.
_TARGET_FIELDS = {
vol.Optional(ATTR_DEVICE_ID): vol.Any(cv.string, [cv.string]),
vol.Optional(ATTR_ENTITY_ID): vol.Any(cv.string, [cv.string]),
vol.Optional(ATTR_AREA_ID): vol.Any(cv.string, [cv.string]),
}
# Service schema for execute_script
SERVICE_EXECUTE_SCRIPT_SCHEMA = vol.Schema(
{
vol.Required(ATTR_SCRIPT_NAME): cv.string,
vol.Optional(ATTR_SCRIPT_PARAMS, default={}): dict,
**_TARGET_FIELDS,
}
)
# Service schema for play_media_file
SERVICE_PLAY_MEDIA_FILE_SCHEMA = vol.Schema(
{
vol.Required(ATTR_FILE_PATH): cv.string,
**_TARGET_FIELDS,
}
)
def _as_list(value: Any) -> list[str]:
"""Normalize a target field to a list of IDs (HA passes str or list)."""
if value is None:
return []
if isinstance(value, str):
return [value]
return list(value)
def _resolve_entry_ids(hass: HomeAssistant, call: ServiceCall) -> list[str]:
"""Resolve target selectors in a ServiceCall to config entry IDs.
Returns the entry IDs of Remote Media Player hubs that match the target.
If no target is provided, returns all configured entries (legacy fan-out).
Targets that don't resolve to any of our entries are skipped with a warning.
"""
device_ids = set(_as_list(call.data.get(ATTR_DEVICE_ID)))
entity_ids = set(_as_list(call.data.get(ATTR_ENTITY_ID)))
area_ids = set(_as_list(call.data.get(ATTR_AREA_ID)))
domain_entries: set[str] = set(hass.data.get(DOMAIN, {}).keys())
if not (device_ids or entity_ids or area_ids):
return list(domain_entries)
dev_reg = dr.async_get(hass)
ent_reg = er.async_get(hass)
# Expand area_id -> device_ids (devices located in that area).
if area_ids:
for device in dev_reg.devices.values():
if device.area_id in area_ids:
device_ids.add(device.id)
matched: set[str] = set()
for device_id in device_ids:
device = dev_reg.async_get(device_id)
if device is None:
continue
for entry_id in device.config_entries:
if entry_id in domain_entries:
matched.add(entry_id)
for entity_id in entity_ids:
entity = ent_reg.async_get(entity_id)
if entity is None or entity.config_entry_id is None:
continue
if entity.config_entry_id in domain_entries:
matched.add(entity.config_entry_id)
if not matched:
_LOGGER.warning(
"Service call targeted device(s)/entity(ies)/area(s) %s but no "
"Remote Media Player hubs matched — nothing will be executed",
{
"device_id": sorted(device_ids),
"entity_id": sorted(entity_ids),
"area_id": sorted(area_ids),
},
)
return list(matched)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Remote Media Player from a config entry.
Args:
hass: Home Assistant instance
entry: Config entry
Returns:
True if setup was successful
"""
_LOGGER.debug("Setting up Remote Media Player: %s", entry.entry_id)
# Create API client
client = MediaServerClient(
host=entry.data[CONF_HOST],
port=entry.data[CONF_PORT],
token=entry.data[CONF_TOKEN],
)
# Verify connection
if not await client.check_connection():
_LOGGER.error("Failed to connect to Media Server")
await client.close()
return False
# Create the shared display coordinator BEFORE platform setup so each
# display platform's async_setup_entry can register against the same
# data source instead of polling /api/display/monitors on its own.
display_coordinator = DisplayCoordinator(hass, client)
try:
await display_coordinator.async_config_entry_first_refresh()
except Exception as err: # noqa: BLE001 - first refresh wraps its own errors
_LOGGER.warning("Initial display monitor fetch failed, will retry: %s", err)
# Foreground coordinator — shared by sensor + binary_sensor platforms and
# nudged by the media-player WebSocket receiver when it gets a push.
foreground_coordinator = ForegroundCoordinator(hass, client)
try:
await foreground_coordinator.async_config_entry_first_refresh()
except Exception as err: # noqa: BLE001
_LOGGER.warning("Initial foreground fetch failed, will retry: %s", err)
# Store client in hass.data
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = {
"client": client,
"display_coordinator": display_coordinator,
"foreground_coordinator": foreground_coordinator,
}
# Register services if not already registered
if not hass.services.has_service(DOMAIN, SERVICE_EXECUTE_SCRIPT):
async def async_execute_script(call: ServiceCall) -> dict[str, Any]:
"""Execute a script on the targeted media server hubs."""
script_name = call.data[ATTR_SCRIPT_NAME]
script_params = call.data.get(ATTR_SCRIPT_PARAMS, {})
target_entries = _resolve_entry_ids(hass, call)
_LOGGER.debug(
"Executing script '%s' with params %s on entries: %s",
script_name,
script_params,
target_entries,
)
results: dict[str, Any] = {}
for entry_id in target_entries:
data = hass.data[DOMAIN].get(entry_id)
if data is None:
continue
client: MediaServerClient = data["client"]
try:
result = await client.execute_script(script_name, script_params)
results[entry_id] = result
_LOGGER.info(
"Script '%s' executed on %s: success=%s",
script_name,
entry_id,
result.get("success", False),
)
except MediaServerError as err:
_LOGGER.error(
"Failed to execute script '%s' on %s: %s",
script_name,
entry_id,
err,
)
results[entry_id] = {"success": False, "error": str(err)}
return results
hass.services.async_register(
DOMAIN,
SERVICE_EXECUTE_SCRIPT,
async_execute_script,
schema=SERVICE_EXECUTE_SCRIPT_SCHEMA,
)
# Register play_media_file service if not already registered
if not hass.services.has_service(DOMAIN, SERVICE_PLAY_MEDIA_FILE):
async def async_play_media_file(call: ServiceCall) -> None:
"""Handle play_media_file service call."""
file_path = call.data[ATTR_FILE_PATH]
target_entries = _resolve_entry_ids(hass, call)
_LOGGER.debug(
"Service play_media_file called with path '%s' on entries: %s",
file_path,
target_entries,
)
for entry_id in target_entries:
data = hass.data[DOMAIN].get(entry_id)
if data is None:
continue
client: MediaServerClient = data["client"]
try:
await client.play_media_file(file_path)
_LOGGER.info("Started playback of %s on %s", file_path, entry_id)
except MediaServerError as err:
_LOGGER.error("Failed to play %s on %s: %s", file_path, entry_id, err)
hass.services.async_register(
DOMAIN,
SERVICE_PLAY_MEDIA_FILE,
async_play_media_file,
schema=SERVICE_PLAY_MEDIA_FILE_SCHEMA,
)
# Forward setup to platforms
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Register update listener for options
entry.async_on_unload(entry.add_update_listener(async_update_options))
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry.
Args:
hass: Home Assistant instance
entry: Config entry
Returns:
True if unload was successful
"""
_LOGGER.debug("Unloading Remote Media Player: %s", entry.entry_id)
# Unload platforms
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
# Close client and remove data
data = hass.data[DOMAIN].pop(entry.entry_id)
# Shutdown coordinator (WebSocket cleanup)
if "coordinator" in data:
await data["coordinator"].async_shutdown()
# Close HTTP client
await data["client"].close()
# Remove services if this was the last entry
if not hass.data[DOMAIN]:
hass.services.async_remove(DOMAIN, SERVICE_EXECUTE_SCRIPT)
hass.services.async_remove(DOMAIN, SERVICE_PLAY_MEDIA_FILE)
return unload_ok
async def async_update_options(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update.
Args:
hass: Home Assistant instance
entry: Config entry
"""
_LOGGER.debug("Options updated for: %s", entry.entry_id)
await hass.config_entries.async_reload(entry.entry_id)