- Fix CORS: set allow_credentials=False (token auth, not cookies) - Add threading.Lock for position cache thread safety - Add shutdown_executor() for clean ThreadPoolExecutor cleanup - Dedicated ThreadPoolExecutors for script/callback execution - Fix Mutagen file handle leaks with try/finally close - Reduce idle WebSocket polling (0.5s → 2.0s when no clients) - Add :focus-visible styles for playback control buttons - Add aria-label to icon-only header buttons - Dynamic album art alt text for screen readers - Persist MDI icon cache to localStorage Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
700 lines
29 KiB
Python
700 lines
29 KiB
Python
"""Windows media controller using WinRT APIs."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import threading
|
|
import time as _time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from typing import Optional, Any
|
|
|
|
from ..models import MediaState, MediaStatus
|
|
from .media_controller import MediaController
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Thread pool for WinRT operations (they don't play well with asyncio)
|
|
_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="winrt")
|
|
|
|
# Global storage for current album art (as bytes)
|
|
_current_album_art_bytes: bytes | None = None
|
|
|
|
# Lock protecting _position_cache and _track_skip_pending from concurrent access
|
|
_position_lock = threading.Lock()
|
|
|
|
# Global storage for position tracking
|
|
_position_cache = {
|
|
"track_id": "",
|
|
"base_position": 0.0,
|
|
"base_time": 0.0,
|
|
"is_playing": False,
|
|
"duration": 0.0,
|
|
}
|
|
# Flag to force position to 0 after track skip (until title changes)
|
|
_track_skip_pending = {
|
|
"active": False,
|
|
"old_title": "",
|
|
"skip_time": 0.0,
|
|
"grace_until": 0.0, # After title changes, ignore stale SMTC positions
|
|
"stale_pos": -999, # The stale SMTC position we're ignoring
|
|
}
|
|
|
|
|
|
def get_current_album_art() -> bytes | None:
    """Return the most recently cached album art.

    Returns:
        The raw image bytes held in the module-level
        ``_current_album_art_bytes``, or ``None`` if nothing has been
        stored yet.
    """
    artwork = _current_album_art_bytes
    return artwork
|
|
|
|
# Windows-specific imports: the WinRT media-session bindings are optional at
# import time; WINSDK_AVAILABLE records whether they could be loaded.
try:
    from winsdk.windows.media.control import (
        GlobalSystemMediaTransportControlsSessionManager as MediaManager,
        GlobalSystemMediaTransportControlsSessionPlaybackStatus as PlaybackStatus,
    )
except ImportError:
    WINSDK_AVAILABLE = False
    logger.warning("winsdk not available")
else:
    WINSDK_AVAILABLE = True
|
|
|
|
# Volume control imports
PYCAW_AVAILABLE = False
# Cached IAudioEndpointVolume pointer and the device name it was created for.
_volume_control = None
_configured_device_name: str | None = None

try:
    from ctypes import cast, POINTER
    from comtypes import CLSCTX_ALL, CoInitialize, CoUninitialize
    from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume

    import warnings
    # Suppress pycaw warnings about missing device properties
    warnings.filterwarnings("ignore", category=UserWarning, module="pycaw")

    def _get_all_audio_devices() -> list[dict[str, str]]:
        """Get list of all audio output devices.

        Returns:
            A list of ``{"id": ..., "name": ...}`` dicts, one per device that
            has a friendly name and an id starting with ``"{0.0.0"``.
            Enumeration errors are logged and whatever was collected so far
            is returned (possibly an empty list).
        """
        devices = []
        try:
            # Use pycaw's GetAllDevices which handles property retrieval
            all_devices = AudioUtilities.GetAllDevices()
            for device in all_devices:
                # Only include render (output) devices with valid names
                # Render devices have IDs starting with {0.0.0
                if device.FriendlyName and device.id and device.id.startswith("{0.0.0"):
                    devices.append({
                        "id": device.id,
                        "name": device.FriendlyName,
                    })
        except Exception as e:
            logger.error(f"Error enumerating audio devices: {e}")
        return devices

    def _find_device_by_name(device_name: str):
        """Find an audio device by its friendly name (partial match).

        The match is a case-insensitive substring test against each
        device's ``FriendlyName``; the first match wins.

        Args:
            device_name: Text to look for in the device's friendly name.

        Returns:
            The AudioDevice wrapper for the matched device, or ``None`` when
            nothing matches or enumeration fails (the error is logged).
        """
        try:
            # Lower-case the needle once instead of once per device.
            wanted = device_name.lower()
            # Get all devices and find matching one
            all_devices = AudioUtilities.GetAllDevices()
            for device in all_devices:
                if device.FriendlyName and wanted in device.FriendlyName.lower():
                    logger.info(f"Found audio device: {device.FriendlyName}")
                    return device
        except Exception as e:
            logger.error(f"Error finding device by name: {e}")
        return None

    def _init_volume_control(device_name: str | None = None):
        """Initialize volume control interface.

        The resulting interface is cached at module level together with the
        device name it was created for; a repeated call with the same name
        returns the cached interface.

        Args:
            device_name: Name of the audio device to control (partial match).
                If None, uses the default audio device.

        Returns:
            A pointer to ``IAudioEndpointVolume`` for the selected device, or
            ``None`` if the device could not be activated (the failure is
            logged).
        """
        global _volume_control, _configured_device_name
        if _volume_control is not None and device_name == _configured_device_name:
            return _volume_control

        # Drop the previous interface before re-configuring. Otherwise a failed
        # initialisation would leave the OLD device's interface cached under
        # the NEW device name, and the next call would return it as a hit.
        _volume_control = None
        _configured_device_name = device_name

        try:
            if device_name:
                # Find specific device by name
                device = _find_device_by_name(device_name)
                if device is None:
                    logger.warning(f"Audio device '{device_name}' not found, using default")
                    device = AudioUtilities.GetSpeakers()
            else:
                # Use default device
                device = AudioUtilities.GetSpeakers()

            # The object may expose Activate directly or wrap the raw device
            # in a ``_dev`` attribute; handle both shapes.
            if hasattr(device, 'Activate'):
                interface = device.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
            elif hasattr(device, '_dev'):
                interface = device._dev.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
            else:
                logger.warning("Could not activate audio device")
                return None

            _volume_control = cast(interface, POINTER(IAudioEndpointVolume))
            return _volume_control
        except Exception as e:
            logger.error(f"Volume control init error: {e}")
            return None

    PYCAW_AVAILABLE = True

except ImportError as e:
    logger.warning(f"pycaw not available: {e}")

    # Fallbacks with the same signatures so callers work without pycaw.

    def _get_all_audio_devices() -> list[dict[str, str]]:
        """Return no devices because pycaw is unavailable."""
        return []

    def _find_device_by_name(device_name: str):
        """Return ``None`` because pycaw is unavailable."""
        return None

    def _init_volume_control(device_name: str | None = None):
        """Return ``None`` because pycaw is unavailable."""
        return None

# Only the WinRT bindings are required; volume control is optional.
WINDOWS_AVAILABLE = WINSDK_AVAILABLE
|
|
|
|
|
|
def _sync_get_media_status() -> dict[str, Any]:
    """Synchronously get media status (runs in thread pool).

    Creates a private asyncio event loop for the calling thread, queries the
    media session chosen by ``_find_best_session`` and collects playback
    state, metadata, timeline and thumbnail.

    The reported position is not simply the SMTC value: between SMTC updates
    it is interpolated from a cached baseline (``_position_cache``), and
    after a track skip (``_track_skip_pending``) SMTC positions are ignored
    until they look trustworthy again. Both dicts are accessed under
    ``_position_lock``.

    Returns:
        A dict with keys ``state`` ("idle", "playing", "paused" or
        "stopped"), ``title``, ``artist``, ``album``, ``duration``,
        ``position`` and ``source``; ``album_art_url`` is added when a
        thumbnail was read. Values other than ``state`` stay ``None`` when
        unavailable. Exceptions are logged and whatever was filled in so far
        is returned.
    """
    import asyncio

    result = {
        "state": "idle",
        "title": None,
        "artist": None,
        "album": None,
        "duration": None,
        "position": None,
        "source": None,
    }

    try:
        # Create a new event loop for this thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            # Get media session manager
            manager = loop.run_until_complete(MediaManager.request_async())
            if manager is None:
                return result

            session = _find_best_session(manager, loop)
            if session is None:
                return result

            # Get playback status
            playback_info = session.get_playback_info()
            if playback_info:
                status = playback_info.playback_status
                if status == PlaybackStatus.PLAYING:
                    result["state"] = "playing"
                elif status == PlaybackStatus.PAUSED:
                    result["state"] = "paused"
                elif status == PlaybackStatus.STOPPED:
                    result["state"] = "stopped"

            # Get media properties FIRST (needed for track ID)
            media_props = loop.run_until_complete(
                session.try_get_media_properties_async()
            )
            if media_props:
                # Empty strings are normalised to None.
                result["title"] = media_props.title or None
                result["artist"] = media_props.artist or None
                result["album"] = media_props.album_title or None

            # Get timeline
            timeline = session.get_timeline_properties()
            if timeline:
                try:
                    # end_time and position are datetime.timedelta objects
                    end_time = timeline.end_time
                    position = timeline.position

                    # Get duration
                    if hasattr(end_time, 'total_seconds'):
                        duration = end_time.total_seconds()
                        # Sanity check: duration should be positive and reasonable (< 24 hours)
                        if 0 < duration < 86400:
                            result["duration"] = duration

                    # Get position from SMTC and interpolate for smooth updates
                    if hasattr(position, 'total_seconds'):
                        smtc_pos = position.total_seconds()
                        current_time = _time.time()
                        is_playing = result["state"] == "playing"
                        # "title" is always present in result, so this yields None
                        # (not '') when the session reported no title.
                        current_title = result.get('title', '')

                        with _position_lock:
                            # Check if track skip is pending and title changed
                            skip_just_completed = False
                            if _track_skip_pending["active"]:
                                if current_title and current_title != _track_skip_pending["old_title"]:
                                    # Title changed - clear the skip flag and start grace period
                                    _track_skip_pending["active"] = False
                                    _track_skip_pending["old_title"] = ""
                                    _track_skip_pending["grace_until"] = current_time + 300.0  # Long grace period
                                    _track_skip_pending["stale_pos"] = -999  # Reset stale position tracking
                                    skip_just_completed = True
                                    # Reset position cache for new track
                                    new_track_id = f"{current_title}:{result.get('artist', '')}:{result.get('duration', 0)}"
                                    _position_cache["track_id"] = new_track_id
                                    _position_cache["base_position"] = 0.0
                                    _position_cache["base_time"] = current_time
                                    _position_cache["last_smtc_pos"] = -999  # Force fresh start
                                    _position_cache["is_playing"] = is_playing
                                    logger.debug(f"Track skip complete, new title: {current_title}, grace until: {_track_skip_pending['grace_until']}")
                                elif current_time - _track_skip_pending["skip_time"] > 5.0:
                                    # Timeout after 5 seconds
                                    _track_skip_pending["active"] = False
                                    logger.debug("Track skip timeout")

                            # Check if we're in grace period (after skip, ignore high SMTC positions)
                            in_grace_period = current_time < _track_skip_pending.get("grace_until", 0)

                            # If track skip is pending or just completed, use cached/reset position
                            if _track_skip_pending["active"]:
                                # Skip requested but title not changed yet: pin position to 0
                                # and keep re-basing so interpolation starts when it completes.
                                pos = 0.0
                                _position_cache["base_position"] = 0.0
                                _position_cache["base_time"] = current_time
                                _position_cache["is_playing"] = is_playing
                            elif skip_just_completed:
                                # Just completed skip - interpolate from 0
                                if is_playing:
                                    elapsed = current_time - _position_cache["base_time"]
                                    pos = elapsed
                                else:
                                    pos = 0.0
                            elif in_grace_period:
                                # Grace period after track skip
                                # SMTC position is stale (from old track) and won't update until seek/pause
                                # We interpolate from 0 and only trust SMTC when it changes or reports low value

                                # Calculate interpolated position from start of new track
                                if is_playing:
                                    elapsed = current_time - _position_cache.get("base_time", current_time)
                                    interpolated_pos = _position_cache.get("base_position", 0.0) + elapsed
                                else:
                                    interpolated_pos = _position_cache.get("base_position", 0.0)

                                # Get the stale position we've been tracking
                                stale_pos = _track_skip_pending.get("stale_pos", -999)

                                # Detect if SMTC position changed significantly from the stale value (user seeked)
                                smtc_changed = stale_pos >= 0 and abs(smtc_pos - stale_pos) > 3.0

                                # Trust SMTC if:
                                # 1. It reports a low position (indicating new track started)
                                # 2. It changed from the stale value (user seeked)
                                if smtc_pos < 10.0 or smtc_changed:
                                    # SMTC is now trustworthy
                                    _position_cache["base_position"] = smtc_pos
                                    _position_cache["base_time"] = current_time
                                    _position_cache["last_smtc_pos"] = smtc_pos
                                    _position_cache["is_playing"] = is_playing
                                    pos = smtc_pos
                                    # End the grace period.
                                    _track_skip_pending["grace_until"] = 0
                                    _track_skip_pending["stale_pos"] = -999
                                    logger.debug(f"Grace period: accepting SMTC pos {smtc_pos} (low={smtc_pos < 10}, changed={smtc_changed})")
                                else:
                                    # SMTC is stale - keep interpolating
                                    pos = interpolated_pos
                                    # Record the stale position for change detection
                                    if stale_pos < 0:
                                        _track_skip_pending["stale_pos"] = smtc_pos
                                    # NOTE(review): nesting reconstructed from a whitespace-stripped
                                    # source; the grace-period refresh below is assumed to run on
                                    # every stale poll, not only when stale_pos is first recorded.
                                    # Keep grace period active indefinitely while SMTC is stale
                                    _track_skip_pending["grace_until"] = current_time + 300.0
                                    logger.debug(f"Grace period: SMTC stale ({smtc_pos}), using interpolated {interpolated_pos}")
                            else:
                                # Normal position tracking
                                # Create track ID from title + artist + duration
                                track_id = f"{current_title}:{result.get('artist', '')}:{result.get('duration', 0)}"

                                # Detect if SMTC position changed (new track, seek, or state change)
                                smtc_pos_changed = abs(smtc_pos - _position_cache.get("last_smtc_pos", -999)) > 0.5
                                track_changed = track_id != _position_cache.get("track_id", "")

                                if smtc_pos_changed or track_changed:
                                    # SMTC updated - store new baseline
                                    _position_cache["track_id"] = track_id
                                    _position_cache["last_smtc_pos"] = smtc_pos
                                    _position_cache["base_position"] = smtc_pos
                                    _position_cache["base_time"] = current_time
                                    _position_cache["is_playing"] = is_playing
                                    pos = smtc_pos
                                elif is_playing:
                                    # Interpolate position based on elapsed time
                                    elapsed = current_time - _position_cache.get("base_time", current_time)
                                    pos = _position_cache.get("base_position", smtc_pos) + elapsed
                                else:
                                    # Paused - use base position
                                    pos = _position_cache.get("base_position", smtc_pos)

                                # NOTE(review): nesting reconstructed from a whitespace-stripped
                                # source; this re-basing is assumed to belong to the
                                # normal-tracking branch only - confirm against the original file.
                                # Update playing state
                                if _position_cache.get("is_playing") != is_playing:
                                    # On resume, re-base from the position just computed;
                                    # on pause, keep the stored base position.
                                    _position_cache["base_position"] = pos if is_playing else _position_cache.get("base_position", smtc_pos)
                                    _position_cache["base_time"] = current_time
                                    _position_cache["is_playing"] = is_playing

                        # Sanity check: position should be non-negative and <= duration
                        if pos >= 0:
                            if result["duration"] and pos <= result["duration"]:
                                result["position"] = pos
                            elif result["duration"] and pos > result["duration"]:
                                # Interpolation overshot the end of the track: clamp.
                                result["position"] = result["duration"]
                            elif not result["duration"]:
                                result["position"] = pos

                    logger.debug(f"Timeline: duration={result['duration']}, position={result['position']}")
                except Exception as e:
                    logger.debug(f"Timeline parse error: {e}")

            # Try to get album art (requires media_props)
            if media_props:
                try:
                    thumbnail = media_props.thumbnail
                    if thumbnail:
                        stream = loop.run_until_complete(thumbnail.open_read_async())
                        if stream:
                            size = stream.size
                            if size > 0 and size < 10 * 1024 * 1024:  # Max 10MB
                                from winsdk.windows.storage.streams import DataReader
                                reader = DataReader(stream)
                                loop.run_until_complete(reader.load_async(size))
                                buffer = bytearray(size)
                                reader.read_bytes(buffer)
                                # NOTE(review): reader/stream are closed only on this
                                # success path; an exception above, or a size outside
                                # the accepted range, leaves them unclosed.
                                reader.close()
                                stream.close()

                                # The cached bytes are only ever replaced here; this
                                # function never clears them when a track has no thumbnail.
                                global _current_album_art_bytes
                                _current_album_art_bytes = bytes(buffer)
                                result["album_art_url"] = "/api/media/artwork"
                except Exception as e:
                    logger.debug(f"Failed to get album art: {e}")

            result["source"] = session.source_app_user_model_id

        finally:
            loop.close()

    except Exception as e:
        logger.error(f"Error getting media status: {e}")

    return result
|
|
|
|
|
|
def _find_best_session(manager, loop):
    """Find the best media session to control.

    Selection order:
      1. the manager's current session, if there is one;
      2. otherwise the first session whose playback status is PLAYING;
      3. otherwise the first session in the manager's list.

    Args:
        manager: Session manager exposing ``get_current_session()`` and
            ``get_sessions()``.
        loop: Unused; kept so existing callers keep working.

    Returns:
        The chosen session, or ``None`` when the manager has no sessions.
    """
    # First try the current session
    session = manager.get_current_session()
    sessions = manager.get_sessions()

    # Log all available sessions for debugging. This runs on every status
    # poll, so skip the per-session get_playback_info() calls entirely
    # unless debug output is actually enabled.
    if sessions and logger.isEnabledFor(logging.DEBUG):
        logger.debug(f"Total sessions available: {sessions.size}")
        for i in range(sessions.size):
            s = sessions.get_at(i)
            if s:
                playback_info = s.get_playback_info()
                status_name = str(playback_info.playback_status) if playback_info else "unknown"
                logger.debug(f" Session {i}: {s.source_app_user_model_id} - status: {status_name}")

    # Nothing to search: either we already have a session or there are none.
    if session is not None or not sessions or sessions.size <= 0:
        return session

    # No current session: prefer one that is actively playing.
    for i in range(sessions.size):
        s = sessions.get_at(i)
        if s:
            playback_info = s.get_playback_info()
            if playback_info and playback_info.playback_status == PlaybackStatus.PLAYING:
                return s

    # If no playing session found, use the first available one
    return sessions.get_at(0)
|
|
|
|
|
|
def _sync_media_command(command: str) -> bool:
    """Execute one transport command against the best media session.

    Runs synchronously on a private event loop, so it is meant to be called
    from a worker thread.

    Args:
        command: One of "play", "pause", "stop", "next" or "previous".

    Returns:
        The result of the session's ``try_*_async`` call, or ``False`` when
        there is no manager or session, the command is not recognised, or an
        exception occurred (which is logged).
    """
    import asyncio

    # Supported commands and the session method that carries each one out.
    method_by_command = {
        "play": "try_play_async",
        "pause": "try_pause_async",
        "stop": "try_stop_async",
        "next": "try_skip_next_async",
        "previous": "try_skip_previous_async",
    }

    try:
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)

        try:
            manager = event_loop.run_until_complete(MediaManager.request_async())
            target = None if manager is None else _find_best_session(manager, event_loop)
            if target is None:
                return False

            method_name = method_by_command.get(command)
            if method_name is None:
                return False

            operation = getattr(target, method_name)()
            return event_loop.run_until_complete(operation)
        finally:
            event_loop.close()

    except Exception as e:
        logger.error(f"Error executing media command {command}: {e}")
        return False
|
|
|
|
|
|
def _sync_seek(position: float) -> bool:
    """Ask the best media session to jump to ``position``.

    Runs synchronously on a private event loop, so it is meant to be called
    from a worker thread.

    Args:
        position: Target playback position in seconds.

    Returns:
        The result of ``try_change_playback_position_async``, or ``False``
        when there is no manager or session, or an exception occurred (which
        is logged).
    """
    import asyncio

    try:
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)

        try:
            manager = event_loop.run_until_complete(MediaManager.request_async())
            if manager is None:
                return False

            target = _find_best_session(manager, event_loop)
            if target is None:
                return False

            # Seconds -> ticks, at 10,000,000 ticks per second (100 ns each).
            ticks = int(position * 10_000_000)
            seek_operation = target.try_change_playback_position_async(ticks)
            return event_loop.run_until_complete(seek_operation)
        finally:
            event_loop.close()

    except Exception as e:
        logger.error(f"Error seeking: {e}")
        return False
|
|
|
|
|
|
def shutdown_executor() -> None:
    """Stop the shared WinRT thread pool without blocking.

    The executor is told to shut down with ``wait=False``, so this call
    returns immediately instead of waiting for running work to finish.
    """
    block_until_done = False
    _executor.shutdown(wait=block_until_done)
|
|
|
|
|
|
class WindowsMediaController(MediaController):
    """Media controller for Windows using WinRT and pycaw.

    Media-session work (status, transport commands, seek) is executed by the
    module-level ``_executor`` thread pool with a 5 second timeout. Volume
    and mute go through the audio endpoint interface obtained from
    ``_init_volume_control``.
    """

    def __init__(self, audio_device: str | None = None):
        """Initialize the Windows media controller.

        Args:
            audio_device: Name of the audio device to control (partial match).
                If None, uses the default audio device.

        Raises:
            RuntimeError: If ``WINDOWS_AVAILABLE`` is false.
        """
        if not WINDOWS_AVAILABLE:
            raise RuntimeError(
                "Windows media control requires winsdk, pycaw, and comtypes packages"
            )
        self._volume_interface = None
        self._volume_init_attempted = False
        self._audio_device = audio_device

    def _get_volume_interface(self):
        """Get the audio endpoint volume interface.

        Initialisation is attempted only once per controller; the outcome
        (an interface or ``None``) is reused by every later call.
        """
        if not self._volume_init_attempted:
            self._volume_init_attempted = True
            self._volume_interface = _init_volume_control(self._audio_device)
            if self._volume_interface:
                device_info = f" (device: {self._audio_device})" if self._audio_device else " (default device)"
                logger.info(f"Volume control initialized successfully{device_info}")
            else:
                logger.warning("Volume control not available")
        return self._volume_interface

    @staticmethod
    def get_audio_devices() -> list[dict[str, str]]:
        """Get list of available audio output devices."""
        return _get_all_audio_devices()

    async def _call_in_winrt_pool(self, func, *args):
        """Run ``func(*args)`` on the WinRT thread pool with a 5 s timeout.

        Raises:
            asyncio.TimeoutError: If the call does not finish in time.
        """
        loop = asyncio.get_running_loop()
        return await asyncio.wait_for(
            loop.run_in_executor(_executor, func, *args),
            timeout=5.0
        )

    async def get_status(self) -> MediaStatus:
        """Get current media playback status.

        Returns:
            A ``MediaStatus`` with volume/mute (when volume control is
            available) and the media fields reported by
            ``_sync_get_media_status``. On timeout or error the state is
            ``MediaState.IDLE`` and the media fields keep their defaults.
        """
        status = MediaStatus()

        # Get volume info (synchronous, fast)
        volume_if = self._get_volume_interface()
        if volume_if:
            try:
                volume_scalar = volume_if.GetMasterVolumeLevelScalar()
                # Round rather than truncate: a float scalar for 29% can come
                # back as 0.28999..., which int() alone would report as 28.
                status.volume = int(round(volume_scalar * 100))
                status.muted = bool(volume_if.GetMute())
            except Exception as e:
                logger.debug(f"Failed to get volume: {e}")

        # Get media info in thread pool (avoids asyncio/WinRT issues)
        try:
            media_info = await self._call_in_winrt_pool(_sync_get_media_status)

            state_map = {
                "playing": MediaState.PLAYING,
                "paused": MediaState.PAUSED,
                "stopped": MediaState.STOPPED,
                "idle": MediaState.IDLE,
            }
            status.state = state_map.get(media_info.get("state", "idle"), MediaState.IDLE)
            status.title = media_info.get("title")
            status.artist = media_info.get("artist")
            status.album = media_info.get("album")
            status.album_art_url = media_info.get("album_art_url")
            status.duration = media_info.get("duration")
            status.position = media_info.get("position")
            status.source = media_info.get("source")

        except asyncio.TimeoutError:
            logger.warning("Media status request timed out")
            status.state = MediaState.IDLE
        except Exception as e:
            logger.error(f"Error getting media status: {e}")
            status.state = MediaState.IDLE

        return status

    async def _run_command(self, command: str) -> bool:
        """Run a media command in the thread pool.

        Returns:
            The command's result, or ``False`` on timeout or error.
        """
        try:
            return await self._call_in_winrt_pool(_sync_media_command, command)
        except asyncio.TimeoutError:
            logger.warning(f"Media command {command} timed out")
            return False
        except Exception as e:
            logger.error(f"Error running media command {command}: {e}")
            return False

    async def _skip_track(self, command: str) -> bool:
        """Run a skip command ("next" or "previous") and arm skip tracking.

        The current title is captured first so the status poller can tell
        when the track has actually changed; until then it reports
        position 0 instead of the previous track's position.
        """
        # Get current title before skipping
        try:
            status = await self.get_status()
            old_title = status.title or ""
        except Exception:
            old_title = ""

        result = await self._run_command(command)
        if result:
            with _position_lock:
                _track_skip_pending["active"] = True
                _track_skip_pending["old_title"] = old_title
                _track_skip_pending["skip_time"] = _time.time()
            logger.debug(f"Track skip initiated, old title: {old_title}")
        return result

    async def play(self) -> bool:
        """Resume playback."""
        return await self._run_command("play")

    async def pause(self) -> bool:
        """Pause playback."""
        return await self._run_command("pause")

    async def stop(self) -> bool:
        """Stop playback."""
        return await self._run_command("stop")

    async def next_track(self) -> bool:
        """Skip to next track."""
        return await self._skip_track("next")

    async def previous_track(self) -> bool:
        """Go to previous track."""
        return await self._skip_track("previous")

    async def set_volume(self, volume: int) -> bool:
        """Set system volume.

        Args:
            volume: Target volume in percent; values outside 0-100 are
                clamped to that range.

        Returns:
            True if the volume was set, False if volume control is
            unavailable or the call failed.
        """
        volume_if = self._get_volume_interface()
        if volume_if is None:
            return False
        try:
            # Keep the scalar inside the 0.0-1.0 range instead of failing on
            # out-of-range input.
            scalar = max(0, min(100, volume)) / 100.0
            volume_if.SetMasterVolumeLevelScalar(scalar, None)
            return True
        except Exception as e:
            logger.error(f"Failed to set volume: {e}")
            return False

    async def toggle_mute(self) -> bool:
        """Toggle mute state.

        Returns:
            The NEW mute state (True if now muted). False is also returned
            when volume control is unavailable or the call failed, so a
            False result alone does not distinguish "unmuted" from "failed".
        """
        volume_if = self._get_volume_interface()
        if volume_if is None:
            return False
        try:
            current_mute = bool(volume_if.GetMute())
            volume_if.SetMute(not current_mute, None)
            return not current_mute
        except Exception as e:
            logger.error(f"Failed to toggle mute: {e}")
            return False

    async def seek(self, position: float) -> bool:
        """Seek to position in seconds.

        Returns:
            The result reported by ``_sync_seek``, or False on timeout or
            error.
        """
        try:
            return await self._call_in_winrt_pool(_sync_seek, position)
        except asyncio.TimeoutError:
            logger.warning("Seek command timed out")
            return False
        except Exception as e:
            logger.error(f"Failed to seek: {e}")
            return False

    async def open_file(self, file_path: str) -> bool:
        """Open a media file with the default system player (Windows).

        Uses os.startfile() to open the file with the default application.
        The call runs in the event loop's default executor so it does not
        block the loop.

        Args:
            file_path: Absolute path to the media file

        Returns:
            True if successful, False otherwise
        """
        try:
            import os
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, os.startfile, file_path)
            logger.info(f"Opened file with default player: {file_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to open file {file_path}: {e}")
            return False
|