Files
media-player-server/media_server/services/browser_service.py
alexei.dolgolyov 84b985e6df Backend optimizations, frontend optimizations, and UI design improvements
Backend optimizations:
- GZip middleware for compressed responses
- Concurrent WebSocket broadcast
- Skip status polling when no clients connected
- Deduplicated token validation with caching
- Fire-and-forget HA state callbacks
- Single stat() per browser item
- Metadata caching (LRU)
- M3U playlist optimization
- Autostart setup (Task Scheduler + hidden VBS launcher)

Frontend code optimizations:
- Fix thumbnail blob URL memory leak
- Fix WebSocket ping interval leak on reconnect
- Skip artwork re-fetch when same track playing
- Deduplicate volume slider logic
- Extract magic numbers into named constants
- Standardize error handling with toast notifications
- Cache play/pause SVG constants
- Loading state management for async buttons
- Request deduplication for rapid clicks
- Cache 30+ DOM element references
- Deduplicate volume updates over WebSocket

Frontend design improvements:
- Progress bar seek thumb and hover expansion
- Custom themed scrollbars
- Toast notification accent border strips
- Keyboard focus-visible states
- Album art ambient glow effect
- Animated sliding tab indicator
- Mini-player top progress line
- Empty state SVG illustrations
- Responsive tablet breakpoint (601-900px)
- Horizontal player layout on wide screens (>900px)
- Glassmorphism mini-player with backdrop blur
- Vinyl spin animation (toggleable)
- Table horizontal scroll on narrow screens

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 20:38:35 +03:00

330 lines
12 KiB
Python

"""Browser service for media file browsing and path validation."""
import logging
import os
import stat as stat_module
import time
from datetime import datetime
from pathlib import Path
from typing import Optional
from ..config import settings
logger = logging.getLogger(__name__)

# mutagen is an optional dependency; HAS_MUTAGEN records whether the import
# succeeded so callers can skip tag/duration extraction when it did not.
try:
    from mutagen import File as MutagenFile
except ImportError:
    HAS_MUTAGEN = False
else:
    HAS_MUTAGEN = True

# Recognised media file extensions (lower-case, including the leading dot).
AUDIO_EXTENSIONS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".aac", ".wma", ".opus"}
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".webm", ".flv"}
MEDIA_EXTENSIONS = AUDIO_EXTENSIONS.union(VIDEO_EXTENSIONS)

# Directory listing cache, keyed by the resolved directory path string.
# Each value is (timestamp, all_items); entries older than DIR_CACHE_TTL
# seconds are treated as stale.
DIR_CACHE_TTL = 300  # 5 minutes
_dir_cache: dict[str, tuple[float, list[dict]]] = {}

# Media info cache, keyed by (file_path_str, mtime).
# Each value is a dict with the keys duration, bitrate and title.
_MEDIA_INFO_CACHE_MAX = 5000
_media_info_cache: dict[tuple[str, float], dict] = {}
class BrowserService:
    """Service for browsing media files with path validation.

    All methods are static. Cached state lives in the module-level
    ``_dir_cache`` and ``_media_info_cache`` dictionaries.
    """

    @staticmethod
    def validate_path(folder_id: str, requested_path: str) -> Path:
        """Validate and resolve a path within an allowed folder.

        Args:
            folder_id: ID of the configured media folder.
            requested_path: Path to validate. Leading forward or backward
                slashes are stripped, so the path is interpreted relative to
                the folder root.

        Returns:
            Resolved absolute Path object.

        Raises:
            ValueError: If folder_id is unknown or disabled, the folder root
                is not a directory, or the resolved path escapes the root.
            FileNotFoundError: If the folder root or the path does not exist.
        """
        if folder_id not in settings.media_folders:
            raise ValueError(f"Media folder '{folder_id}' not configured")
        folder_config = settings.media_folders[folder_id]
        if not folder_config.enabled:
            raise ValueError(f"Media folder '{folder_id}' is disabled")

        base_path = Path(folder_config.path).resolve()
        if not base_path.exists():
            raise FileNotFoundError(f"Media folder path does not exist: {base_path}")
        if not base_path.is_dir():
            raise ValueError(f"Media folder path is not a directory: {base_path}")

        # Drop leading separators so "/sub/dir" means "<root>/sub/dir".
        requested_path = requested_path.lstrip("/\\")
        full_path = (base_path / requested_path).resolve() if requested_path else base_path

        # Security check: the resolved path (after ".." and symlink
        # resolution) must still be located under the folder root.
        try:
            full_path.relative_to(base_path)
        except ValueError:
            logger.warning(
                "Path traversal attempt detected: %s (resolved to %s, base: %s)",
                requested_path,
                full_path,
                base_path,
            )
            # Suppress the internal relative_to() error; only the generic
            # message below is meant for callers.
            raise ValueError("Path traversal not allowed") from None

        if not full_path.exists():
            raise FileNotFoundError(f"Path does not exist: {requested_path}")
        return full_path

    @staticmethod
    def is_media_file(path: Path) -> bool:
        """Check if a file is a media file based on extension.

        Args:
            path: Path to check.

        Returns:
            True if the (case-insensitive) suffix is a known audio or video
            extension, False otherwise.
        """
        return path.suffix.lower() in MEDIA_EXTENSIONS

    @staticmethod
    def get_file_type(path: Path) -> str:
        """Get the file type (folder, audio, video, other).

        Args:
            path: Path to check.

        Returns:
            File type string: "folder", "audio", "video", or "other".
        """
        if path.is_dir():
            return "folder"
        suffix = path.suffix.lower()
        if suffix in AUDIO_EXTENSIONS:
            return "audio"
        if suffix in VIDEO_EXTENSIONS:
            return "video"
        return "other"

    @staticmethod
    def get_media_info(file_path: Path, mtime: float | None = None) -> dict:
        """Get duration, bitrate, and title of a media file.

        Results are cached by (path, mtime) to avoid re-reading unchanged
        files. Extraction is best-effort: any parsing failure yields whatever
        fields were collected so far (possibly all None).

        Args:
            file_path: Path to the media file.
            mtime: File modification time (avoids an extra stat call).

        Returns:
            A new dict with 'duration' (float or None), 'bitrate' (int or
            None), and 'title' (str or None). The dict is a copy, so callers
            may modify it without affecting the cache.
        """
        result = {"duration": None, "bitrate": None, "title": None}
        if not HAS_MUTAGEN:
            return result

        if mtime is None:
            try:
                mtime = file_path.stat().st_mtime
            except OSError:
                # Without an mtime the result simply is not cached.
                pass

        cache_key = None
        if mtime is not None:
            cache_key = (str(file_path), mtime)
            cached = _media_info_cache.get(cache_key)
            if cached is not None:
                return dict(cached)

        try:
            audio = MutagenFile(str(file_path), easy=True)
            if audio is not None and hasattr(audio, "info"):
                if hasattr(audio.info, "length"):
                    result["duration"] = round(audio.info.length, 2)
                if hasattr(audio.info, "bitrate") and audio.info.bitrate:
                    result["bitrate"] = audio.info.bitrate
            if audio is not None and hasattr(audio, "tags") and audio.tags:
                tags = audio.tags

                def first_value(key):
                    # Tag values may be a list of strings or a bare value.
                    if key not in tags:
                        return None
                    value = tags[key]
                    return value[0] if isinstance(value, list) else value

                title = first_value("title")
                artist = first_value("artist")
                if artist and title:
                    result["title"] = f"{artist} \u2013 {title}"
                elif title:
                    result["title"] = title
        except Exception:
            # Deliberately broad: unreadable or corrupt files must not break
            # directory browsing. Keep a trace for troubleshooting.
            logger.debug("Could not read media info for %s", file_path, exc_info=True)

        if cache_key is not None:
            if len(_media_info_cache) >= _MEDIA_INFO_CACHE_MAX:
                # Dicts preserve insertion order, so the first keys are the
                # oldest inserted ones; drop roughly 20% of the capacity.
                for stale_key in list(_media_info_cache)[: _MEDIA_INFO_CACHE_MAX // 5]:
                    _media_info_cache.pop(stale_key, None)
            _media_info_cache[cache_key] = result
            return dict(result)
        return result

    @staticmethod
    def _to_client_path(relative: Path) -> str:
        """Format a folder-relative path as a rooted, '/'-separated string."""
        text = str(relative)
        if text == ".":
            return "/"
        return "/" + text.replace("\\", "/")

    @staticmethod
    def _scan_directory(directory: Path) -> list[dict]:
        """List folders and media files in *directory*, folders first.

        Hidden entries (names starting with '.'), entries that cannot be
        stat()ed, and non-media files are skipped. Each returned dict holds
        'name', 'type', 'is_media', '_size' and '_mtime'.
        """
        entries = []
        for item in directory.iterdir():
            if item.name.startswith("."):
                continue
            # Single stat() call per item, reused for type check and metadata.
            try:
                st = item.stat()
            except OSError:
                continue
            if stat_module.S_ISDIR(st.st_mode):
                file_type = "folder"
            else:
                suffix = item.suffix.lower()
                if suffix in AUDIO_EXTENSIONS:
                    file_type = "audio"
                elif suffix in VIDEO_EXTENSIONS:
                    file_type = "video"
                else:
                    continue
            entries.append({
                "name": item.name,
                "type": file_type,
                "is_media": file_type in ("audio", "video"),
                "_size": st.st_size,
                "_mtime": st.st_mtime,
            })
        entries.sort(key=lambda entry: (entry["type"] != "folder", entry["name"].lower()))
        return entries

    @staticmethod
    def browse_directory(
        folder_id: str,
        path: str = "",
        offset: int = 0,
        limit: int = 100,
        nocache: bool = False,
    ) -> dict:
        """Browse a directory and return items with metadata.

        Args:
            folder_id: ID of the configured media folder.
            path: Path to browse (relative to folder root).
            offset: Pagination offset (default: 0). Negative values are
                treated as 0.
            limit: Maximum items to return (default: 100). Negative values
                are treated as 0.
            nocache: If True, ignore any cached listing for this directory,
                rescan it, and refresh the cache entry.

        Returns:
            Dictionary with:
            - folder_id: Folder ID
            - current_path: Current path (relative to folder root)
            - parent_path: Parent path (None if at root)
            - items: List of file/folder items on the requested page
            - total: Total number of items in the directory
            - offset: Effective offset
            - limit: Effective limit

        Raises:
            ValueError: If path validation fails, the path is not a
                directory, or the directory cannot be read.
            FileNotFoundError: If path does not exist.
        """
        full_path = BrowserService.validate_path(folder_id, path)
        base_path = Path(settings.media_folders[folder_id].path).resolve()

        if not full_path.is_dir():
            raise ValueError(f"Path is not a directory: {path}")

        # Negative values would otherwise slice from the end of the listing.
        offset = max(offset, 0)
        limit = max(limit, 0)

        try:
            current_path = BrowserService._to_client_path(full_path.relative_to(base_path))
        except ValueError:
            current_path = "/"

        if full_path == base_path:
            parent_path = None
        else:
            parent_path = BrowserService._to_client_path(
                full_path.parent.relative_to(base_path)
            )

        # Look up the cached listing unless the caller asked to bypass it.
        cache_key = str(full_path)
        now = time.monotonic()
        all_items = None
        if not nocache:
            cached_entry = _dir_cache.get(cache_key)
            if cached_entry is not None:
                cached_time, cached_items = cached_entry
                if now - cached_time < DIR_CACHE_TTL:
                    all_items = cached_items
                else:
                    del _dir_cache[cache_key]

        if all_items is None:
            try:
                all_items = BrowserService._scan_directory(full_path)
            except PermissionError as exc:
                raise ValueError(f"Permission denied accessing path: {path}") from exc
            _dir_cache[cache_key] = (now, all_items)

        total = len(all_items)

        # Enrich only the requested page. Work on copies so the dicts stored
        # in _dir_cache are never mutated or shared with the caller.
        items = []
        for cached_item in all_items[offset:offset + limit]:
            item = dict(cached_item)
            item["size"] = item["_size"] if item["type"] != "folder" else None
            item["modified"] = datetime.fromtimestamp(item["_mtime"]).isoformat()
            if item["is_media"]:
                info = BrowserService.get_media_info(full_path / item["name"], item["_mtime"])
                item["duration"] = info["duration"]
                item["bitrate"] = info["bitrate"]
                item["title"] = info["title"]
            else:
                item["duration"] = None
                item["bitrate"] = None
                item["title"] = None
            items.append(item)

        return {
            "folder_id": folder_id,
            "current_path": current_path,
            "parent_path": parent_path,
            "items": items,
            "total": total,
            "offset": offset,
            "limit": limit,
        }