Files
alexei.dolgolyov 6ae0ed1787
Release / release (push) Successful in 2s
chore: release v0.2.0
Production-readiness pass: security hardening, performance improvements,
new services (send_message, set_repeat, refresh_library), diagnostics,
reauth flow, image proxy, per-instance device IDs, exponential WS
reconnect backoff, ID validation, stale device cleanup, and supporting
integration plumbing. Three rounds of independent code review applied.

See RELEASE_NOTES.md for the full changelog.
2026-05-26 13:16:36 +03:00

206 lines
6.3 KiB
Python

"""Services for the Emby Media Player integration."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import voluptuous as vol
from homeassistant.components.media_player import DOMAIN as MEDIA_PLAYER_DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers import entity_registry as er
from .api import EmbyApiError
from .const import (
ATTR_HEADER,
ATTR_MESSAGE,
ATTR_REPEAT_MODE,
ATTR_TIMEOUT_MS,
DOMAIN,
REPEAT_MODE_ALL,
REPEAT_MODE_NONE,
REPEAT_MODE_ONE,
)
if TYPE_CHECKING:
from . import EmbyRuntimeData
from .api import EmbyApiClient
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Names under which the services are registered in this integration's domain.
SERVICE_SEND_MESSAGE = "send_message"
SERVICE_SET_REPEAT = "set_repeat"
SERVICE_REFRESH_LIBRARY = "refresh_library"
# Values accepted for ATTR_REPEAT_MODE by the set_repeat service schema.
_REPEAT_MODES = (REPEAT_MODE_NONE, REPEAT_MODE_ONE, REPEAT_MODE_ALL)
# send_message: target entities and message text are mandatory; the header
# and the timeout are optional.
_SEND_MESSAGE_SCHEMA = vol.Schema(
    {
        vol.Required(ATTR_ENTITY_ID): cv.entity_ids,
        vol.Required(ATTR_MESSAGE): cv.string,
        vol.Optional(ATTR_HEADER): cv.string,
        # When supplied, the timeout must be a positive integer in 100..60000.
        vol.Optional(ATTR_TIMEOUT_MS): vol.All(
            cv.positive_int,
            vol.Range(min=100, max=60_000),
        ),
    }
)

# set_repeat: target entities plus one of the values in _REPEAT_MODES.
_SET_REPEAT_SCHEMA = vol.Schema(
    {
        vol.Required(ATTR_ENTITY_ID): cv.entity_ids,
        vol.Required(ATTR_REPEAT_MODE): vol.In(_REPEAT_MODES),
    }
)

# refresh_library: the entity list is optional.
_REFRESH_LIBRARY_SCHEMA = vol.Schema(
    {
        vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
    }
)
def _resolve_sessions(
    hass: HomeAssistant, entity_ids: list[str]
) -> list[tuple["EmbyRuntimeData", str]]:
    """Translate entity IDs into ``(runtime_data, session_id)`` pairs.

    Each entity is looked up in the entity registry, its config entry is
    fetched, and the session ID is taken from the part of the entity's
    unique ID that follows the ``"{entry_id}_"`` prefix.

    Args:
        hass: The Home Assistant instance.
        entity_ids: Entity IDs to resolve, processed in order.

    Returns:
        One pair per entity ID, in input order.

    Raises:
        ServiceValidationError: An entity is not in the registry, or is not
            a media player entity belonging to this integration.
        HomeAssistantError: The entity's config entry is missing or not
            loaded, or its unique ID does not start with the entry prefix.
    """
    registry = er.async_get(hass)
    pairs: list[tuple[EmbyRuntimeData, str]] = []

    for entity_id in entity_ids:
        reg_entry = registry.async_get(entity_id)
        is_emby_player = (
            reg_entry is not None
            and reg_entry.platform == DOMAIN
            and reg_entry.domain == MEDIA_PLAYER_DOMAIN
        )
        if not is_emby_player:
            raise ServiceValidationError(
                f"{entity_id} is not an Emby media player"
            )

        owner = hass.config_entries.async_get_entry(reg_entry.config_entry_id)
        if owner is None or owner.state is not ConfigEntryState.LOADED:
            raise HomeAssistantError(
                f"Emby integration for {entity_id} is not loaded"
            )

        # The unique ID has the form "{entry_id}_{session_id}".
        expected_prefix = f"{owner.entry_id}_"
        unique_id = reg_entry.unique_id or ""
        if not unique_id.startswith(expected_prefix):
            raise HomeAssistantError(
                f"Cannot resolve session for {entity_id}"
            )

        # The prefix is known to be present, so slicing it off yields the
        # session ID.
        session_id = unique_id[len(expected_prefix):]
        pairs.append((owner.runtime_data, session_id))

    return pairs
def _resolve_apis(
    hass: HomeAssistant, entity_ids: list[str] | None
) -> list["EmbyApiClient"]:
    """Resolve unique API clients for the supplied entities, or all entries.

    Args:
        hass: The Home Assistant instance.
        entity_ids: Entities whose API clients are wanted. When ``None`` or
            empty, the clients of every loaded config entry of this
            integration are used instead.

    Returns:
        The API client objects in first-seen order, each object appearing
        at most once.

    Raises:
        ServiceValidationError: Propagated from ``_resolve_sessions`` when
            an entity is not a media player of this integration.
        HomeAssistantError: Propagated from ``_resolve_sessions`` when an
            entity's config entry is not loaded or its session cannot be
            resolved.
    """
    if entity_ids:
        candidates = [rd.api for rd, _ in _resolve_sessions(hass, entity_ids)]
    else:
        candidates = [
            entry.runtime_data.api
            for entry in hass.config_entries.async_entries(DOMAIN)
            if entry.state is ConfigEntryState.LOADED
        ]
    # Multiple entities can share one client object; keying by identity keeps
    # a single reference to each while preserving first-seen order.
    return list({id(api): api for api in candidates}.values())
async def async_setup_services(hass: HomeAssistant) -> None:
    """Register the ``send_message``, ``set_repeat`` and ``refresh_library`` services.

    Returns immediately, registering nothing, when ``send_message`` is
    already registered in this integration's domain, so repeated calls are
    harmless.

    Args:
        hass: The Home Assistant instance to register the services on.
    """
    if hass.services.has_service(DOMAIN, SERVICE_SEND_MESSAGE):
        return

    def _raise_on_failures(summary: str, failures: list[str]) -> None:
        """Raise one HomeAssistantError listing every collected failure."""
        if failures:
            raise HomeAssistantError(summary + "; ".join(failures))

    async def _send_message(call: ServiceCall) -> None:
        """Send the message to each targeted session, then report failures."""
        # All targets are resolved before any request is sent.
        sessions = _resolve_sessions(hass, call.data[ATTR_ENTITY_ID])
        text = call.data[ATTR_MESSAGE]
        header = call.data.get(ATTR_HEADER)
        timeout_ms = call.data.get(ATTR_TIMEOUT_MS)

        failures: list[str] = []
        for runtime_data, session_id in sessions:
            try:
                await runtime_data.api.display_message(
                    session_id, text, header=header, timeout_ms=timeout_ms
                )
            except EmbyApiError as err:
                # Keep going so one failing session does not block the rest.
                failures.append(f"{session_id}: {err}")
        _raise_on_failures("Failed to send message to: ", failures)

    async def _set_repeat(call: ServiceCall) -> None:
        """Apply the repeat mode to each targeted session, then report failures."""
        sessions = _resolve_sessions(hass, call.data[ATTR_ENTITY_ID])
        repeat_mode = call.data[ATTR_REPEAT_MODE]

        failures: list[str] = []
        for runtime_data, session_id in sessions:
            try:
                await runtime_data.api.set_repeat_mode(session_id, repeat_mode)
            except EmbyApiError as err:
                failures.append(f"{session_id}: {err}")
        _raise_on_failures("Failed to set repeat mode on: ", failures)

    async def _refresh_library(call: ServiceCall) -> None:
        """Refresh the library once per distinct API client, then report failures."""
        clients = _resolve_apis(hass, call.data.get(ATTR_ENTITY_ID))
        # Multiple entities can share one server: keep each client object
        # once, in first-seen order.
        distinct = {id(client): client for client in clients}

        failures: list[str] = []
        for client in distinct.values():
            try:
                await client.refresh_library()
            except EmbyApiError as err:
                failures.append(str(err))
        _raise_on_failures(
            "Failed to refresh library on one or more servers: ", failures
        )

    registrations = (
        (SERVICE_SEND_MESSAGE, _send_message, _SEND_MESSAGE_SCHEMA),
        (SERVICE_SET_REPEAT, _set_repeat, _SET_REPEAT_SCHEMA),
        (SERVICE_REFRESH_LIBRARY, _refresh_library, _REFRESH_LIBRARY_SCHEMA),
    )
    for service_name, handler, schema in registrations:
        hass.services.async_register(
            DOMAIN, service_name, handler, schema=schema
        )
def async_unload_services(hass: HomeAssistant) -> None:
    """Unregister the services of this integration that are currently registered.

    Services that are not registered are skipped, so calling this more than
    once is harmless.

    Args:
        hass: The Home Assistant instance to remove the services from.
    """
    registry = hass.services
    service_names = (
        SERVICE_SEND_MESSAGE,
        SERVICE_SET_REPEAT,
        SERVICE_REFRESH_LIBRARY,
    )
    for service_name in service_names:
        if not registry.has_service(DOMAIN, service_name):
            continue
        registry.async_remove(DOMAIN, service_name)