Files
media-player-mixed/media_server/services/linux_media.py
alexei.dolgolyov 67a89e8349 Initial commit: Media server and Home Assistant integration
- FastAPI server for Windows media control via WinRT/SMTC
- Home Assistant custom integration with media player entity
- Script button entities for system commands
- Position tracking with grace period for track skip handling
- Server availability detection in HA entity

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 13:08:40 +03:00

296 lines
9.9 KiB
Python

"""Linux media controller using MPRIS D-Bus interface."""
import asyncio
import logging
import os
import re
import subprocess
from typing import Optional, Any

from ..models import MediaState, MediaStatus
from .media_controller import MediaController
logger = logging.getLogger(__name__)
# Linux-specific imports. dbus-python is optional at import time: when it is
# missing, DBUS_AVAILABLE records that fact and a warning is logged instead of
# letting the ImportError propagate.
try:
    import dbus
    from dbus.mainloop.glib import DBusGMainLoop
except ImportError:
    DBUS_AVAILABLE = False
    logger.warning("D-Bus libraries not available")
else:
    DBUS_AVAILABLE = True
class LinuxMediaController(MediaController):
    """Media controller for Linux using MPRIS D-Bus interface.

    Playback commands and track information go through an MPRIS player found
    on the D-Bus session bus; system volume and mute are read and changed with
    the ``pactl`` command-line tool.
    """

    MPRIS_PATH = "/org/mpris/MediaPlayer2"
    MPRIS_INTERFACE = "org.mpris.MediaPlayer2.Player"
    MPRIS_PREFIX = "org.mpris.MediaPlayer2."
    PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties"

    # Upper bound, in seconds, for a single pactl invocation.
    _PACTL_TIMEOUT = 5
    # First percentage in pactl output such as "65536 / 100% / 0.00 dB".
    _VOLUME_PERCENT_RE = re.compile(r"(\d+)%")

    def __init__(self):
        """Connect to the D-Bus session bus.

        Raises:
            RuntimeError: If the dbus-python package could not be imported.
        """
        if not DBUS_AVAILABLE:
            raise RuntimeError(
                "Linux media control requires dbus-python package. "
                "Install with: sudo apt-get install python3-dbus"
            )
        DBusGMainLoop(set_as_default=True)
        self._bus = dbus.SessionBus()

    # ------------------------------------------------------------------
    # D-Bus helpers
    # ------------------------------------------------------------------

    def _get_active_player(self) -> Optional[str]:
        """Find an active MPRIS media player on the bus.

        Returns:
            The bus name of the first player whose ``PlaybackStatus`` is
            ``"Playing"``; otherwise the first MPRIS name listed on the bus;
            ``None`` when no MPRIS name is present or listing names fails.
        """
        try:
            mpris_players = [
                name
                for name in self._bus.list_names()
                if name.startswith(self.MPRIS_PREFIX)
            ]
        except Exception as e:
            logger.error("Failed to get active player: %s", e)
            return None

        if not mpris_players:
            return None

        # Prefer players that are currently playing. _get_property returns
        # None (and logs at debug level) for a player that cannot be queried,
        # so an unresponsive player is simply skipped.
        for player in mpris_players:
            if self._get_property(player, "PlaybackStatus") == "Playing":
                return player

        # Return the first available player
        return mpris_players[0]

    def _get_player_interface(self, player_name: str):
        """Get the MPRIS player interface for the given bus name."""
        proxy = self._bus.get_object(player_name, self.MPRIS_PATH)
        return dbus.Interface(proxy, self.MPRIS_INTERFACE)

    def _get_properties_interface(self, player_name: str):
        """Get the D-Bus properties interface for the given bus name."""
        proxy = self._bus.get_object(player_name, self.MPRIS_PATH)
        return dbus.Interface(proxy, self.PROPERTIES_INTERFACE)

    def _get_property(self, player_name: str, property_name: str) -> Any:
        """Get a property of the MPRIS player interface.

        Returns:
            The property value, or ``None`` if the lookup raised; the failure
            is logged at debug level.
        """
        try:
            props = self._get_properties_interface(player_name)
            return props.Get(self.MPRIS_INTERFACE, property_name)
        except Exception as e:
            logger.debug("Failed to get property %s: %s", property_name, e)
            return None

    def _invoke_player_method(self, method_name: str, action: str) -> bool:
        """Call a no-argument MPRIS player method on the active player.

        Args:
            method_name: Name of the MPRIS method to call (e.g. ``"Play"``).
            action: Human-readable verb used in the error log message.

        Returns:
            ``True`` if the call completed without raising, ``False`` when no
            player is available or the call failed.
        """
        player_name = self._get_active_player()
        if player_name is None:
            return False
        try:
            player = self._get_player_interface(player_name)
            getattr(player, method_name)()
            return True
        except Exception as e:
            logger.error("Failed to %s: %s", action, e)
            return False

    # ------------------------------------------------------------------
    # pactl helpers (PulseAudio/PipeWire)
    # ------------------------------------------------------------------

    def _run_pactl(self, *args: str) -> "subprocess.CompletedProcess":
        """Run ``pactl`` with the given arguments and capture its text output.

        The command runs with ``LC_ALL=C`` so the output is not translated;
        the mute check below looks for the English word "yes".

        Raises:
            subprocess.TimeoutExpired: If pactl does not finish in time.
            OSError: If pactl cannot be started.
        """
        return subprocess.run(
            ["pactl", *args],
            capture_output=True,
            text=True,
            timeout=self._PACTL_TIMEOUT,
            env={**os.environ, "LC_ALL": "C"},
        )

    def _get_volume_pulseaudio(self) -> tuple[int, bool]:
        """Get volume using pactl (PulseAudio/PipeWire).

        Returns:
            ``(volume_percent, muted)``. Falls back to ``100`` for the volume
            and ``False`` for mute when the respective pactl call fails or its
            output cannot be parsed; ``(100, False)`` if pactl raises.
        """
        try:
            volume = 100
            result = self._run_pactl("get-sink-volume", "@DEFAULT_SINK@")
            if result.returncode == 0:
                # Output looks like
                # "Volume: front-left: 65536 / 100% / 0.00 dB"; use the first
                # percentage that appears.
                match = self._VOLUME_PERCENT_RE.search(result.stdout)
                if match:
                    volume = int(match.group(1))

            result = self._run_pactl("get-sink-mute", "@DEFAULT_SINK@")
            muted = result.returncode == 0 and "yes" in result.stdout.lower()
            return volume, muted
        except Exception as e:
            logger.error("Failed to get volume via pactl: %s", e)
            return 100, False

    def _set_volume_pulseaudio(self, volume: int) -> bool:
        """Set volume using pactl.

        Args:
            volume: Target volume in percent. Negative values are clamped to
                0 because pactl interprets a leading "-" as a relative change.

        Returns:
            ``True`` if pactl exited with status 0, ``False`` otherwise.
        """
        try:
            level = max(0, int(volume))
            result = self._run_pactl(
                "set-sink-volume", "@DEFAULT_SINK@", f"{level}%"
            )
            return result.returncode == 0
        except Exception as e:
            logger.error("Failed to set volume: %s", e)
            return False

    def _toggle_mute_pulseaudio(self) -> bool:
        """Toggle mute using pactl, returns new mute state.

        Returns ``False`` when the toggle command fails or raises.
        """
        try:
            result = self._run_pactl("set-sink-mute", "@DEFAULT_SINK@", "toggle")
            if result.returncode == 0:
                _, muted = self._get_volume_pulseaudio()
                return muted
            return False
        except Exception as e:
            logger.error("Failed to toggle mute: %s", e)
            return False

    # ------------------------------------------------------------------
    # MediaController API
    # ------------------------------------------------------------------

    async def get_status(self) -> MediaStatus:
        """Get current media playback status.

        Returns:
            A ``MediaStatus`` carrying system volume and mute, and, when an
            MPRIS player is found, its playback state, track metadata,
            position and source name. Without a player the state is ``IDLE``.
        """
        status = MediaStatus()

        # pactl runs in a worker thread so that waiting on the subprocess
        # (up to the timeout per call) does not stall the event loop.
        volume, muted = await asyncio.to_thread(self._get_volume_pulseaudio)
        status.volume = volume
        status.muted = muted

        player_name = self._get_active_player()
        if player_name is None:
            status.state = MediaState.IDLE
            return status

        playback_status = self._get_property(player_name, "PlaybackStatus")
        if playback_status == "Playing":
            status.state = MediaState.PLAYING
        elif playback_status == "Paused":
            status.state = MediaState.PAUSED
        elif playback_status == "Stopped":
            status.state = MediaState.STOPPED
        else:
            status.state = MediaState.IDLE

        metadata = self._get_property(player_name, "Metadata")
        if metadata:
            status.title = str(metadata.get("xesam:title", "")) or None
            artists = metadata.get("xesam:artist", [])
            if artists:
                # A sequence value contributes its first entry; anything else
                # is used as a single artist name.
                if isinstance(artists, (list, tuple)):
                    status.artist = str(artists[0])
                else:
                    status.artist = str(artists)
            status.album = str(metadata.get("xesam:album", "")) or None
            status.album_art_url = str(metadata.get("mpris:artUrl", "")) or None
            # Duration in microseconds
            length = metadata.get("mpris:length", 0)
            if length:
                status.duration = int(length) / 1_000_000

        # Position is reported in microseconds
        position = self._get_property(player_name, "Position")
        if position is not None:
            status.position = int(position) / 1_000_000

        # Source name is the bus name without the MPRIS prefix
        status.source = player_name.replace(self.MPRIS_PREFIX, "")
        return status

    async def play(self) -> bool:
        """Resume playback."""
        return self._invoke_player_method("Play", "play")

    async def pause(self) -> bool:
        """Pause playback."""
        return self._invoke_player_method("Pause", "pause")

    async def stop(self) -> bool:
        """Stop playback."""
        return self._invoke_player_method("Stop", "stop")

    async def next_track(self) -> bool:
        """Skip to next track."""
        return self._invoke_player_method("Next", "skip next")

    async def previous_track(self) -> bool:
        """Go to previous track."""
        return self._invoke_player_method("Previous", "skip previous")

    async def set_volume(self, volume: int) -> bool:
        """Set system volume (percent); runs pactl in a worker thread."""
        return await asyncio.to_thread(self._set_volume_pulseaudio, volume)

    async def toggle_mute(self) -> bool:
        """Toggle mute state; runs pactl in a worker thread.

        Returns the new mute state, or ``False`` if the toggle failed.
        """
        return await asyncio.to_thread(self._toggle_mute_pulseaudio)

    async def seek(self, position: float) -> bool:
        """Seek to position in seconds.

        Returns:
            ``True`` if ``SetPosition`` was sent without raising, ``False``
            when no player is available, its metadata cannot be read, or the
            call failed.
        """
        player_name = self._get_active_player()
        if player_name is None:
            return False
        try:
            player = self._get_player_interface(player_name)
            metadata = self._get_property(player_name, "Metadata")
            if metadata is None:
                # SetPosition needs the current track id, which lives in the
                # metadata; without it there is nothing meaningful to send.
                logger.error("Failed to seek: could not read player metadata")
                return False
            # MPRIS expects position in microseconds
            player.SetPosition(
                metadata.get("mpris:trackid", "/"),
                int(position * 1_000_000),
            )
            return True
        except Exception as e:
            logger.error("Failed to seek: %s", e)
            return False