Files
media-player-server/media_server/services/macos_media.py
T
alexei.dolgolyov ddf4a6cb29
Lint & Test / test (push) Successful in 20s
Lint & Test / linux-smoke (push) Failing after 34s
feat: production-ready Linux & macOS support
- Add `linux` (dbus-python, PyGObject, python-xlib) and `macos`
  (pyobjc) extras to pyproject.toml with sys_platform markers; move
  cross-platform screen-brightness-control + monitorcontrol to base deps.
- build-dist-linux.sh: install `.[linux]`, pkg-config pre-flight for
  dbus-1/glib-2.0, emit a systemd unit with DBUS_SESSION_BUS_ADDRESS +
  XDG_RUNTIME_DIR + ReadWritePaths for ~/.config and ~/.cache so MPRIS
  works and audit-log / thumbnail writes aren't blocked by ProtectHome.
- New build-dist-macos.sh + per-user LaunchAgent installer producing
  MediaServer-vX.Y-macos-{arm64,x86_64}.tar.gz.
- Templated media-server.service updated to match the dist layout with
  proper session-bus env vars and a writable state-dir grant.
- install_linux.sh: drop dead requirements.txt path; install via
  `pip install ".[linux]"` and pre-create the writable state dirs.
- Cross-platform album artwork: abstract MediaController.get_album_art()
  with Linux (mpris:artUrl, file:// + http(s)://) and macOS (Spotify URL)
  impls; routes/media artwork endpoint now awaits the controller.
- LinuxMediaController connects to the session bus lazily — failure no
  longer crashes lifespan startup; MPRIS calls return idle until the bus
  is reachable. Logged once at INFO with a hint about
  `loginctl enable-linger`.
- Startup preflight on Linux warns if DBUS_SESSION_BUS_ADDRESS or
  XDG_RUNTIME_DIR is unset and informs the user when Wayland disables
  the foreground probe.
- /api/media/visualizer/status now reports a per-OS unavailable_reason.
- tray._confirm guarded against ctypes.windll on non-Windows.
- config.example.yaml: per-OS commented script examples; on_turn_off
  default is now a no-op echo (used to silently fail off Windows).
- README: replace stale `pip install -r requirements.txt` instructions
  with the new extras; add systemd lingering doc + troubleshooting
  section; add macOS LaunchAgent section.
- CI: new linux-smoke job (installs `.[linux]`, boots the server under
  dbus-run-session, asserts /api/health). Release workflow gains
  apt-deps step for the Linux build and a best-effort macOS build job.
2026-05-26 12:17:30 +03:00

378 lines
14 KiB
Python

"""macOS media controller using AppleScript and system commands."""
import asyncio
import logging
import subprocess
import threading
from typing import Optional
from ..models import MediaState, MediaStatus
from .media_controller import MediaController
logger = logging.getLogger(__name__)
# Cap remote artwork downloads (Spotify's artwork url is http(s)://).
_MAX_ART_BYTES = 8 * 1024 * 1024
_ART_FETCH_TIMEOUT = 5.0 # seconds
class MacOSMediaController(MediaController):
"""Media controller for macOS using osascript and system commands."""
def __init__(self) -> None:
# Cached art bytes keyed by the active art URL.
self._art_lock = threading.Lock()
self._art_url: Optional[str] = None
self._art_bytes: Optional[bytes] = None
def _run_osascript(self, script: str) -> Optional[str]:
"""Run an AppleScript and return the output."""
try:
result = subprocess.run(
["osascript", "-e", script],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
return result.stdout.strip()
return None
except Exception as e:
logger.error(f"osascript error: {e}")
return None
def _get_active_app(self) -> Optional[str]:
"""Get the currently active media application."""
# Check common media apps in order of preference
apps = ["Spotify", "Music", "TV", "VLC", "QuickTime Player"]
for app in apps:
script = f'''
tell application "System Events"
if exists (processes where name is "{app}") then
return "{app}"
end if
end tell
return ""
'''
result = self._run_osascript(script)
if result:
return result
return None
def _get_spotify_info(self) -> dict:
"""Get playback info from Spotify."""
script = '''
tell application "Spotify"
if player state is playing then
set currentState to "playing"
else if player state is paused then
set currentState to "paused"
else
set currentState to "stopped"
end if
try
set trackName to name of current track
set artistName to artist of current track
set albumName to album of current track
set trackDuration to duration of current track
set trackPosition to player position
set artUrl to artwork url of current track
on error
set trackName to ""
set artistName to ""
set albumName to ""
set trackDuration to 0
set trackPosition to 0
set artUrl to ""
end try
return currentState & "|" & trackName & "|" & artistName & "|" & albumName & "|" & trackDuration & "|" & trackPosition & "|" & artUrl
end tell
'''
result = self._run_osascript(script)
if result:
parts = result.split("|")
if len(parts) >= 7:
return {
"state": parts[0],
"title": parts[1] or None,
"artist": parts[2] or None,
"album": parts[3] or None,
"duration": float(parts[4]) / 1000 if parts[4] else None, # ms to seconds
"position": float(parts[5]) if parts[5] else None,
"art_url": parts[6] or None,
}
return {}
def _get_music_info(self) -> dict:
"""Get playback info from Apple Music."""
script = '''
tell application "Music"
if player state is playing then
set currentState to "playing"
else if player state is paused then
set currentState to "paused"
else
set currentState to "stopped"
end if
try
set trackName to name of current track
set artistName to artist of current track
set albumName to album of current track
set trackDuration to duration of current track
set trackPosition to player position
on error
set trackName to ""
set artistName to ""
set albumName to ""
set trackDuration to 0
set trackPosition to 0
end try
return currentState & "|" & trackName & "|" & artistName & "|" & albumName & "|" & trackDuration & "|" & trackPosition
end tell
'''
result = self._run_osascript(script)
if result:
parts = result.split("|")
if len(parts) >= 6:
return {
"state": parts[0],
"title": parts[1] or None,
"artist": parts[2] or None,
"album": parts[3] or None,
"duration": float(parts[4]) if parts[4] else None,
"position": float(parts[5]) if parts[5] else None,
}
return {}
def _get_volume(self) -> tuple[int, bool]:
"""Get system volume and mute state."""
try:
# Get volume level
result = self._run_osascript("output volume of (get volume settings)")
volume = int(result) if result else 100
# Get mute state
result = self._run_osascript("output muted of (get volume settings)")
muted = result == "true"
return volume, muted
except Exception as e:
logger.error(f"Failed to get volume: {e}")
return 100, False
async def get_status(self) -> MediaStatus:
"""Get current media playback status."""
status = MediaStatus()
# Get system volume
volume, muted = self._get_volume()
status.volume = volume
status.muted = muted
# Try to get info from active media app
active_app = self._get_active_app()
if active_app is None:
status.state = MediaState.IDLE
return status
status.source = active_app
if active_app == "Spotify":
info = self._get_spotify_info()
elif active_app == "Music":
info = self._get_music_info()
else:
info = {}
if info:
state = info.get("state", "stopped")
if state == "playing":
status.state = MediaState.PLAYING
elif state == "paused":
status.state = MediaState.PAUSED
else:
status.state = MediaState.STOPPED
status.title = info.get("title")
status.artist = info.get("artist")
status.album = info.get("album")
status.duration = info.get("duration")
status.position = info.get("position")
art_url = info.get("art_url")
status.album_art_url = art_url
# Track changes invalidate the cached image bytes — actual
# fetch happens lazily in get_album_art().
with self._art_lock:
if art_url != self._art_url:
self._art_url = art_url
self._art_bytes = None
else:
status.state = MediaState.IDLE
return status
def _fetch_art_sync(self, url: str) -> Optional[bytes]:
"""Resolve a Spotify/Music art URL (http(s)://) to bytes.
File-scheme URLs aren't expected on macOS (AppleScript apps return
artwork as remote URLs), so only http(s) is supported.
"""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
except ValueError:
return None
if parsed.scheme.lower() not in ("http", "https"):
return None
import urllib.error
import urllib.request
req = urllib.request.Request(url, headers={"User-Agent": "media-server/0.x"})
try:
with urllib.request.urlopen(req, timeout=_ART_FETCH_TIMEOUT) as resp:
return resp.read(_MAX_ART_BYTES + 1)[:_MAX_ART_BYTES] or None
except (urllib.error.URLError, urllib.error.HTTPError, OSError) as e:
logger.debug("Could not fetch macOS art %s: %s", url, e)
return None
async def get_album_art(self) -> Optional[bytes]:
"""Return cached art bytes, fetching once per track URL."""
with self._art_lock:
url = self._art_url
cached = self._art_bytes
if cached is not None:
return cached
if not url:
return None
data = await asyncio.to_thread(self._fetch_art_sync, url)
with self._art_lock:
if url == self._art_url:
self._art_bytes = data
return data
async def play(self) -> bool:
"""Resume playback using media key simulation."""
# Use system media key
# Fallback: try specific app
active_app = self._get_active_app()
if active_app == "Spotify":
self._run_osascript('tell application "Spotify" to play')
return True
elif active_app == "Music":
self._run_osascript('tell application "Music" to play')
return True
# Use media key simulation
result = subprocess.run(
["osascript", "-e", 'tell application "System Events" to key code 49'],
capture_output=True,
)
return result.returncode == 0
async def pause(self) -> bool:
"""Pause playback."""
active_app = self._get_active_app()
if active_app == "Spotify":
self._run_osascript('tell application "Spotify" to pause')
return True
elif active_app == "Music":
self._run_osascript('tell application "Music" to pause')
return True
return False
async def stop(self) -> bool:
"""Stop playback."""
active_app = self._get_active_app()
if active_app == "Spotify":
self._run_osascript('tell application "Spotify" to pause')
return True
elif active_app == "Music":
self._run_osascript('tell application "Music" to stop')
return True
return False
async def next_track(self) -> bool:
"""Skip to next track."""
active_app = self._get_active_app()
if active_app == "Spotify":
self._run_osascript('tell application "Spotify" to next track')
return True
elif active_app == "Music":
self._run_osascript('tell application "Music" to next track')
return True
return False
async def previous_track(self) -> bool:
"""Go to previous track."""
active_app = self._get_active_app()
if active_app == "Spotify":
self._run_osascript('tell application "Spotify" to previous track')
return True
elif active_app == "Music":
self._run_osascript('tell application "Music" to previous track')
return True
return False
async def set_volume(self, volume: int) -> bool:
"""Set system volume."""
# osascript returns empty string on success and None on failure (the
# _run_osascript helper catches subprocess errors). The previous
# `result is not None or True` always returned True regardless of
# outcome — surface real failures so the route can return 503.
result = self._run_osascript(f"set volume output volume {int(volume)}")
return result is not None
async def toggle_mute(self) -> bool:
"""Toggle mute state."""
_, current_mute = self._get_volume()
new_mute = not current_mute
self._run_osascript(f"set volume output muted {str(new_mute).lower()}")
return new_mute
async def seek(self, position: float) -> bool:
"""Seek to position in seconds."""
active_app = self._get_active_app()
if active_app == "Spotify":
self._run_osascript(
f'tell application "Spotify" to set player position to {position}'
)
return True
elif active_app == "Music":
self._run_osascript(
f'tell application "Music" to set player position to {position}'
)
return True
return False
async def open_file(self, file_path: str) -> bool:
"""Open a media file with the default system player (macOS).
Uses the 'open' command to open the file with the default application.
Args:
file_path: Absolute path to the media file
Returns:
True if successful, False otherwise
"""
try:
process = await asyncio.create_subprocess_exec(
'open', file_path,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await process.wait()
logger.info(f"Opened file with default player: {file_path}")
return True
except Exception as e:
logger.error(f"Failed to open file {file_path}: {e}")
return False