"""Linux media controller using MPRIS D-Bus interface.""" import asyncio import logging import subprocess from typing import Any, Optional from ..models import MediaState, MediaStatus from .media_controller import MediaController logger = logging.getLogger(__name__) # Linux-specific imports try: import dbus from dbus.mainloop.glib import DBusGMainLoop DBUS_AVAILABLE = True except ImportError: DBUS_AVAILABLE = False logger.warning("D-Bus libraries not available") class LinuxMediaController(MediaController): """Media controller for Linux using MPRIS D-Bus interface.""" MPRIS_PATH = "/org/mpris/MediaPlayer2" MPRIS_INTERFACE = "org.mpris.MediaPlayer2.Player" MPRIS_PREFIX = "org.mpris.MediaPlayer2." PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties" def __init__(self): if not DBUS_AVAILABLE: raise RuntimeError( "Linux media control requires dbus-python package. " "Install with: sudo apt-get install python3-dbus" ) DBusGMainLoop(set_as_default=True) self._bus = dbus.SessionBus() def _get_active_player(self) -> Optional[str]: """Find an active MPRIS media player on the bus.""" try: bus_names = self._bus.list_names() mpris_players = [ name for name in bus_names if name.startswith(self.MPRIS_PREFIX) ] if not mpris_players: return None # Prefer players that are currently playing for player in mpris_players: try: proxy = self._bus.get_object(player, self.MPRIS_PATH) props = dbus.Interface(proxy, self.PROPERTIES_INTERFACE) status = props.Get(self.MPRIS_INTERFACE, "PlaybackStatus") if status == "Playing": return player except Exception: continue # Return the first available player return mpris_players[0] except Exception as e: logger.error(f"Failed to get active player: {e}") return None def _get_player_interface(self, player_name: str): """Get the MPRIS player interface.""" proxy = self._bus.get_object(player_name, self.MPRIS_PATH) return dbus.Interface(proxy, self.MPRIS_INTERFACE) def _get_properties_interface(self, player_name: str): """Get the properties interface for a player.""" proxy = self._bus.get_object(player_name, self.MPRIS_PATH) return dbus.Interface(proxy, self.PROPERTIES_INTERFACE) def _get_property(self, player_name: str, property_name: str) -> Any: """Get a property from the player.""" try: props = self._get_properties_interface(player_name) return props.Get(self.MPRIS_INTERFACE, property_name) except Exception as e: logger.debug(f"Failed to get property {property_name}: {e}") return None def _get_volume_pulseaudio(self) -> tuple[int, bool]: """Get volume using pactl (PulseAudio/PipeWire).""" try: # Get default sink volume result = subprocess.run( ["pactl", "get-sink-volume", "@DEFAULT_SINK@"], capture_output=True, text=True, timeout=5, ) if result.returncode == 0: # Parse volume from output like "Volume: front-left: 65536 / 100% / 0.00 dB" for part in result.stdout.split("/"): if "%" in part: volume = int(part.strip().rstrip("%")) break else: volume = 100 else: volume = 100 # Get mute status result = subprocess.run( ["pactl", "get-sink-mute", "@DEFAULT_SINK@"], capture_output=True, text=True, timeout=5, ) muted = "yes" in result.stdout.lower() if result.returncode == 0 else False return volume, muted except Exception as e: logger.error(f"Failed to get volume via pactl: {e}") return 100, False def _set_volume_pulseaudio(self, volume: int) -> bool: """Set volume using pactl.""" try: result = subprocess.run( ["pactl", "set-sink-volume", "@DEFAULT_SINK@", f"{volume}%"], capture_output=True, timeout=5, ) return result.returncode == 0 except Exception as e: logger.error(f"Failed to set volume: {e}") return False def _toggle_mute_pulseaudio(self) -> bool: """Toggle mute using pactl, returns new mute state.""" try: result = subprocess.run( ["pactl", "set-sink-mute", "@DEFAULT_SINK@", "toggle"], capture_output=True, timeout=5, ) if result.returncode == 0: _, muted = self._get_volume_pulseaudio() return muted return False except Exception as e: logger.error(f"Failed to toggle mute: {e}") return False def _sync_get_status(self) -> MediaStatus: """Synchronous status read (called from a worker thread).""" status = MediaStatus() volume, muted = self._get_volume_pulseaudio() status.volume = volume status.muted = muted player_name = self._get_active_player() if player_name is None: status.state = MediaState.IDLE return status playback_status = self._get_property(player_name, "PlaybackStatus") if playback_status == "Playing": status.state = MediaState.PLAYING elif playback_status == "Paused": status.state = MediaState.PAUSED elif playback_status == "Stopped": status.state = MediaState.STOPPED else: status.state = MediaState.IDLE metadata = self._get_property(player_name, "Metadata") if metadata: status.title = str(metadata.get("xesam:title", "")) or None artists = metadata.get("xesam:artist", []) if artists: status.artist = str(artists[0]) if isinstance(artists, list) else str(artists) status.album = str(metadata.get("xesam:album", "")) or None status.album_art_url = str(metadata.get("mpris:artUrl", "")) or None length = metadata.get("mpris:length", 0) if length: status.duration = int(length) / 1_000_000 position = self._get_property(player_name, "Position") if position is not None: status.position = int(position) / 1_000_000 status.source = player_name.replace(self.MPRIS_PREFIX, "") return status async def get_status(self) -> MediaStatus: """Get current media playback status (off the event loop).""" # pactl + DBus calls each take 5-100ms on a Pi and would block every # other coroutine on the server. Run them in a worker thread. return await asyncio.to_thread(self._sync_get_status) def _call_player(self, method_name: str) -> bool: player_name = self._get_active_player() if player_name is None: return False try: player = self._get_player_interface(player_name) getattr(player, method_name)() return True except Exception as e: logger.error(f"Failed to call player.{method_name}: {e}") return False async def play(self) -> bool: return await asyncio.to_thread(self._call_player, "Play") async def pause(self) -> bool: return await asyncio.to_thread(self._call_player, "Pause") async def stop(self) -> bool: return await asyncio.to_thread(self._call_player, "Stop") async def next_track(self) -> bool: return await asyncio.to_thread(self._call_player, "Next") async def previous_track(self) -> bool: return await asyncio.to_thread(self._call_player, "Previous") async def set_volume(self, volume: int) -> bool: return await asyncio.to_thread(self._set_volume_pulseaudio, volume) async def toggle_mute(self) -> bool: return await asyncio.to_thread(self._toggle_mute_pulseaudio) def _sync_seek(self, position: float) -> bool: player_name = self._get_active_player() if player_name is None: return False try: player = self._get_player_interface(player_name) player.SetPosition( self._get_property(player_name, "Metadata").get("mpris:trackid", "/"), int(position * 1_000_000), ) return True except Exception as e: logger.error(f"Failed to seek: {e}") return False async def seek(self, position: float) -> bool: return await asyncio.to_thread(self._sync_seek, position) async def open_file(self, file_path: str) -> bool: """Open a media file with the default system player (Linux). Uses xdg-open to open the file with the default application. Args: file_path: Absolute path to the media file Returns: True if successful, False otherwise """ try: process = await asyncio.create_subprocess_exec( 'xdg-open', file_path, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL ) await process.wait() logger.info(f"Opened file with default player: {file_path}") return True except Exception as e: logger.error(f"Failed to open file {file_path}: {e}") return False