FastAPI REST API server for controlling system-wide media playback on Windows, Linux, macOS, and Android. Features: - Play/Pause/Stop/Next/Previous track controls - Volume control and mute - Seek within tracks - Current track info (title, artist, album, artwork) - WebSocket real-time status updates - Script execution API - Token-based authentication - Cross-platform support Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
297 lines
10 KiB
Python
297 lines
10 KiB
Python
"""macOS media controller using AppleScript and system commands."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import subprocess
|
|
import json
|
|
from typing import Optional
|
|
|
|
from ..models import MediaState, MediaStatus
|
|
from .media_controller import MediaController
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MacOSMediaController(MediaController):
|
|
"""Media controller for macOS using osascript and system commands."""
|
|
|
|
def _run_osascript(self, script: str) -> Optional[str]:
|
|
"""Run an AppleScript and return the output."""
|
|
try:
|
|
result = subprocess.run(
|
|
["osascript", "-e", script],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5,
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"osascript error: {e}")
|
|
return None
|
|
|
|
def _get_active_app(self) -> Optional[str]:
|
|
"""Get the currently active media application."""
|
|
# Check common media apps in order of preference
|
|
apps = ["Spotify", "Music", "TV", "VLC", "QuickTime Player"]
|
|
|
|
for app in apps:
|
|
script = f'''
|
|
tell application "System Events"
|
|
if exists (processes where name is "{app}") then
|
|
return "{app}"
|
|
end if
|
|
end tell
|
|
return ""
|
|
'''
|
|
result = self._run_osascript(script)
|
|
if result:
|
|
return result
|
|
|
|
return None
|
|
|
|
def _get_spotify_info(self) -> dict:
|
|
"""Get playback info from Spotify."""
|
|
script = '''
|
|
tell application "Spotify"
|
|
if player state is playing then
|
|
set currentState to "playing"
|
|
else if player state is paused then
|
|
set currentState to "paused"
|
|
else
|
|
set currentState to "stopped"
|
|
end if
|
|
|
|
try
|
|
set trackName to name of current track
|
|
set artistName to artist of current track
|
|
set albumName to album of current track
|
|
set trackDuration to duration of current track
|
|
set trackPosition to player position
|
|
set artUrl to artwork url of current track
|
|
on error
|
|
set trackName to ""
|
|
set artistName to ""
|
|
set albumName to ""
|
|
set trackDuration to 0
|
|
set trackPosition to 0
|
|
set artUrl to ""
|
|
end try
|
|
|
|
return currentState & "|" & trackName & "|" & artistName & "|" & albumName & "|" & trackDuration & "|" & trackPosition & "|" & artUrl
|
|
end tell
|
|
'''
|
|
result = self._run_osascript(script)
|
|
if result:
|
|
parts = result.split("|")
|
|
if len(parts) >= 7:
|
|
return {
|
|
"state": parts[0],
|
|
"title": parts[1] or None,
|
|
"artist": parts[2] or None,
|
|
"album": parts[3] or None,
|
|
"duration": float(parts[4]) / 1000 if parts[4] else None, # ms to seconds
|
|
"position": float(parts[5]) if parts[5] else None,
|
|
"art_url": parts[6] or None,
|
|
}
|
|
return {}
|
|
|
|
def _get_music_info(self) -> dict:
|
|
"""Get playback info from Apple Music."""
|
|
script = '''
|
|
tell application "Music"
|
|
if player state is playing then
|
|
set currentState to "playing"
|
|
else if player state is paused then
|
|
set currentState to "paused"
|
|
else
|
|
set currentState to "stopped"
|
|
end if
|
|
|
|
try
|
|
set trackName to name of current track
|
|
set artistName to artist of current track
|
|
set albumName to album of current track
|
|
set trackDuration to duration of current track
|
|
set trackPosition to player position
|
|
on error
|
|
set trackName to ""
|
|
set artistName to ""
|
|
set albumName to ""
|
|
set trackDuration to 0
|
|
set trackPosition to 0
|
|
end try
|
|
|
|
return currentState & "|" & trackName & "|" & artistName & "|" & albumName & "|" & trackDuration & "|" & trackPosition
|
|
end tell
|
|
'''
|
|
result = self._run_osascript(script)
|
|
if result:
|
|
parts = result.split("|")
|
|
if len(parts) >= 6:
|
|
return {
|
|
"state": parts[0],
|
|
"title": parts[1] or None,
|
|
"artist": parts[2] or None,
|
|
"album": parts[3] or None,
|
|
"duration": float(parts[4]) if parts[4] else None,
|
|
"position": float(parts[5]) if parts[5] else None,
|
|
}
|
|
return {}
|
|
|
|
def _get_volume(self) -> tuple[int, bool]:
|
|
"""Get system volume and mute state."""
|
|
try:
|
|
# Get volume level
|
|
result = self._run_osascript("output volume of (get volume settings)")
|
|
volume = int(result) if result else 100
|
|
|
|
# Get mute state
|
|
result = self._run_osascript("output muted of (get volume settings)")
|
|
muted = result == "true"
|
|
|
|
return volume, muted
|
|
except Exception as e:
|
|
logger.error(f"Failed to get volume: {e}")
|
|
return 100, False
|
|
|
|
async def get_status(self) -> MediaStatus:
|
|
"""Get current media playback status."""
|
|
status = MediaStatus()
|
|
|
|
# Get system volume
|
|
volume, muted = self._get_volume()
|
|
status.volume = volume
|
|
status.muted = muted
|
|
|
|
# Try to get info from active media app
|
|
active_app = self._get_active_app()
|
|
if active_app is None:
|
|
status.state = MediaState.IDLE
|
|
return status
|
|
|
|
status.source = active_app
|
|
|
|
if active_app == "Spotify":
|
|
info = self._get_spotify_info()
|
|
elif active_app == "Music":
|
|
info = self._get_music_info()
|
|
else:
|
|
info = {}
|
|
|
|
if info:
|
|
state = info.get("state", "stopped")
|
|
if state == "playing":
|
|
status.state = MediaState.PLAYING
|
|
elif state == "paused":
|
|
status.state = MediaState.PAUSED
|
|
else:
|
|
status.state = MediaState.STOPPED
|
|
|
|
status.title = info.get("title")
|
|
status.artist = info.get("artist")
|
|
status.album = info.get("album")
|
|
status.duration = info.get("duration")
|
|
status.position = info.get("position")
|
|
status.album_art_url = info.get("art_url")
|
|
else:
|
|
status.state = MediaState.IDLE
|
|
|
|
return status
|
|
|
|
async def play(self) -> bool:
|
|
"""Resume playback using media key simulation."""
|
|
# Use system media key
|
|
script = '''
|
|
tell application "System Events"
|
|
key code 16 using {command down, option down}
|
|
end tell
|
|
'''
|
|
# Fallback: try specific app
|
|
active_app = self._get_active_app()
|
|
if active_app == "Spotify":
|
|
self._run_osascript('tell application "Spotify" to play')
|
|
return True
|
|
elif active_app == "Music":
|
|
self._run_osascript('tell application "Music" to play')
|
|
return True
|
|
|
|
# Use media key simulation
|
|
result = subprocess.run(
|
|
["osascript", "-e", 'tell application "System Events" to key code 49'],
|
|
capture_output=True,
|
|
)
|
|
return result.returncode == 0
|
|
|
|
async def pause(self) -> bool:
|
|
"""Pause playback."""
|
|
active_app = self._get_active_app()
|
|
if active_app == "Spotify":
|
|
self._run_osascript('tell application "Spotify" to pause')
|
|
return True
|
|
elif active_app == "Music":
|
|
self._run_osascript('tell application "Music" to pause')
|
|
return True
|
|
return False
|
|
|
|
async def stop(self) -> bool:
|
|
"""Stop playback."""
|
|
active_app = self._get_active_app()
|
|
if active_app == "Spotify":
|
|
self._run_osascript('tell application "Spotify" to pause')
|
|
return True
|
|
elif active_app == "Music":
|
|
self._run_osascript('tell application "Music" to stop')
|
|
return True
|
|
return False
|
|
|
|
async def next_track(self) -> bool:
|
|
"""Skip to next track."""
|
|
active_app = self._get_active_app()
|
|
if active_app == "Spotify":
|
|
self._run_osascript('tell application "Spotify" to next track')
|
|
return True
|
|
elif active_app == "Music":
|
|
self._run_osascript('tell application "Music" to next track')
|
|
return True
|
|
return False
|
|
|
|
async def previous_track(self) -> bool:
|
|
"""Go to previous track."""
|
|
active_app = self._get_active_app()
|
|
if active_app == "Spotify":
|
|
self._run_osascript('tell application "Spotify" to previous track')
|
|
return True
|
|
elif active_app == "Music":
|
|
self._run_osascript('tell application "Music" to previous track')
|
|
return True
|
|
return False
|
|
|
|
async def set_volume(self, volume: int) -> bool:
|
|
"""Set system volume."""
|
|
result = self._run_osascript(f"set volume output volume {volume}")
|
|
return result is not None or True # osascript returns empty on success
|
|
|
|
async def toggle_mute(self) -> bool:
|
|
"""Toggle mute state."""
|
|
_, current_mute = self._get_volume()
|
|
new_mute = not current_mute
|
|
self._run_osascript(f"set volume output muted {str(new_mute).lower()}")
|
|
return new_mute
|
|
|
|
async def seek(self, position: float) -> bool:
|
|
"""Seek to position in seconds."""
|
|
active_app = self._get_active_app()
|
|
if active_app == "Spotify":
|
|
self._run_osascript(
|
|
f'tell application "Spotify" to set player position to {position}'
|
|
)
|
|
return True
|
|
elif active_app == "Music":
|
|
self._run_osascript(
|
|
f'tell application "Music" to set player position to {position}'
|
|
)
|
|
return True
|
|
return False
|