Files
media-player-server/media_server/services/browser_service.py
alexei.dolgolyov 98a33bca54 Tabbed UI, browse caching, and bottom mini player
- Convert stacked sections to tabbed interface (Player, Browser, Actions, Scripts, Callbacks) with localStorage persistence
- Add in-memory directory listing cache (5-min TTL) with nocache bypass for refresh
- Defer stat()/duration calls to paginated items only for faster browse
- Move mini player from top to bottom with footer padding fix
- Always show scrollbar to prevent layout shift between tabs
- Add tab localization keys (en/ru)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-09 02:34:29 +03:00

274 lines
9.1 KiB
Python

"""Browser service for media file browsing and path validation."""
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Optional
from ..config import settings
# mutagen is an optional dependency; without it durations are reported as None.
try:
    from mutagen import File as MutagenFile
    HAS_MUTAGEN = True
except ImportError:
    # Keep the name bound so a stray reference can never raise NameError.
    MutagenFile = None
    HAS_MUTAGEN = False

logger = logging.getLogger(__name__)

# Directory listing cache: {resolved_path_str: (timestamp, all_items)}
# The timestamp is a time.monotonic() reading taken when the listing was built.
_dir_cache: dict[str, tuple[float, list[dict]]] = {}
DIR_CACHE_TTL = 300  # 5 minutes

# Media file extensions (lower-case, including the leading dot)
AUDIO_EXTENSIONS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".aac", ".wma", ".opus"}
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".webm", ".flv"}
MEDIA_EXTENSIONS = AUDIO_EXTENSIONS | VIDEO_EXTENSIONS
class BrowserService:
    """Service for browsing media files with path validation.

    All methods are static. Directory listings are memoised in the
    module-level ``_dir_cache`` for ``DIR_CACHE_TTL`` seconds.
    """

    @staticmethod
    def validate_path(folder_id: str, requested_path: str) -> Path:
        """Validate and resolve a path within an allowed folder.

        Args:
            folder_id: ID of the configured media folder.
            requested_path: Path to validate. Leading slashes/backslashes are
                stripped, so the path is always interpreted relative to the
                folder root. Empty (or None) means the folder root itself.

        Returns:
            Resolved absolute Path object.

        Raises:
            ValueError: If folder_id invalid or path traversal attempted.
            FileNotFoundError: If path does not exist.
        """
        # Get folder config
        if folder_id not in settings.media_folders:
            raise ValueError(f"Media folder '{folder_id}' not configured")
        folder_config = settings.media_folders[folder_id]
        if not folder_config.enabled:
            raise ValueError(f"Media folder '{folder_id}' is disabled")

        # Get base path
        base_path = Path(folder_config.path).resolve()
        if not base_path.exists():
            raise FileNotFoundError(f"Media folder path does not exist: {base_path}")
        if not base_path.is_dir():
            raise ValueError(f"Media folder path is not a directory: {base_path}")

        # A leading separator means "relative to the folder root", never the
        # filesystem root. lstrip is a no-op when there is nothing to strip.
        requested_path = (requested_path or "").lstrip("/\\")

        # Build and resolve full path (resolve() collapses ".." and symlinks)
        full_path = (base_path / requested_path).resolve() if requested_path else base_path

        # Security check: the *resolved* path must still live under base_path
        try:
            full_path.relative_to(base_path)
        except ValueError:
            logger.warning(
                "Path traversal attempt detected: %s (resolved to %s, base: %s)",
                requested_path,
                full_path,
                base_path,
            )
            # The relative_to() failure is an implementation detail; hide it.
            raise ValueError("Path traversal not allowed") from None

        # Check if path exists
        if not full_path.exists():
            raise FileNotFoundError(f"Path does not exist: {requested_path}")
        return full_path

    @staticmethod
    def is_media_file(path: Path) -> bool:
        """Check if a file is a media file based on extension.

        Args:
            path: Path to check.

        Returns:
            True if file is a media file, False otherwise.
        """
        return path.suffix.lower() in MEDIA_EXTENSIONS

    @staticmethod
    def get_file_type(path: Path) -> str:
        """Get the file type (folder, audio, video, other).

        Args:
            path: Path to check.

        Returns:
            File type string: "folder", "audio", "video", or "other".
        """
        if path.is_dir():
            return "folder"
        suffix = path.suffix.lower()
        if suffix in AUDIO_EXTENSIONS:
            return "audio"
        if suffix in VIDEO_EXTENSIONS:
            return "video"
        return "other"

    @staticmethod
    def get_duration(file_path: Path) -> Optional[float]:
        """Get duration of a media file in seconds.

        This is best-effort: any failure while reading the file yields None.

        Args:
            file_path: Path to the media file.

        Returns:
            Duration in seconds rounded to 2 decimals, or None if unavailable
            (mutagen missing, unsupported/corrupt file, no length info).
        """
        if not HAS_MUTAGEN:
            return None
        try:
            media = MutagenFile(str(file_path))
            length = getattr(getattr(media, "info", None), "length", None)
            return None if length is None else round(length, 2)
        except Exception:
            # Deliberately broad: a single unreadable file must not break a
            # whole listing. Leave a trace at debug level instead of
            # swallowing the error silently.
            logger.debug("Could not read duration for %s", file_path, exc_info=True)
            return None

    @staticmethod
    def _client_path(target: Path, base_path: Path) -> str:
        """Format *target* as a "/"-rooted, forward-slash path relative to *base_path*."""
        try:
            relative = target.relative_to(base_path).as_posix()
        except ValueError:
            # Not under base_path (should not happen after validation).
            return "/"
        return "/" if relative == "." else "/" + relative

    @staticmethod
    def _list_directory(directory: Path, nocache: bool) -> list[dict]:
        """Return the sorted, stat-free listing of *directory*, using the cache.

        The returned list is the cached object itself; callers must copy
        entries before adding per-request fields to them.
        """
        cache_key = str(directory)
        now = time.monotonic()

        if not nocache:
            cached = _dir_cache.get(cache_key)
            if cached is not None:
                cached_at, cached_items = cached
                if now - cached_at < DIR_CACHE_TTL:
                    return cached_items
                # Expired entry: drop it and fall through to a fresh scan.
                _dir_cache.pop(cache_key, None)

        listing = []
        # scandir exposes the entry type from the directory read itself, so
        # classifying entries normally needs no extra stat() per file.
        with os.scandir(directory) as entries:
            for entry in entries:
                name = entry.name
                if name.startswith("."):
                    continue
                try:
                    is_dir = entry.is_dir()
                except OSError:
                    # Broken entry (e.g. looping symlink): classify by extension.
                    is_dir = False
                if is_dir:
                    file_type = "folder"
                else:
                    suffix = os.path.splitext(name)[1].lower()
                    if suffix in AUDIO_EXTENSIONS:
                        file_type = "audio"
                    elif suffix in VIDEO_EXTENSIONS:
                        file_type = "video"
                    else:
                        # Non-media files are not shown in the browser.
                        continue
                listing.append({
                    "name": name,
                    "type": file_type,
                    "is_media": file_type in ("audio", "video"),
                })

        # Folders first, then case-insensitive by name.
        listing.sort(key=lambda entry: (entry["type"] != "folder", entry["name"].lower()))
        # A nocache request still refreshes the cache for later readers.
        _dir_cache[cache_key] = (now, listing)
        return listing

    @staticmethod
    def browse_directory(
        folder_id: str,
        path: str = "",
        offset: int = 0,
        limit: int = 100,
        nocache: bool = False,
    ) -> dict:
        """Browse a directory and return items with metadata.

        Args:
            folder_id: ID of the configured media folder.
            path: Path to browse (relative to folder root).
            offset: Pagination offset (default: 0). Negative values are
                treated as 0.
            limit: Maximum items to return (default: 100). Negative values
                are treated as 0.
            nocache: If True, bypass the cached listing and re-read the
                directory (the fresh listing replaces the cached one).

        Returns:
            Dictionary with:
            - current_path: Current path (relative to folder root)
            - parent_path: Parent path (None if at root)
            - items: List of file/folder items, each with name, type,
              is_media, size, modified and duration
            - total: Total number of items
            - offset: Offset actually applied
            - limit: Limit actually applied
            - folder_id: Folder ID

        Raises:
            ValueError: If path validation fails, the path is not a
                directory, or the directory cannot be read.
            FileNotFoundError: If path does not exist.
        """
        # Validate path
        full_path = BrowserService.validate_path(folder_id, path)

        # Check if it's a directory
        if not full_path.is_dir():
            raise ValueError(f"Path is not a directory: {path}")

        # Get base path for relative path calculation
        base_path = Path(settings.media_folders[folder_id].path).resolve()
        current_path = BrowserService._client_path(full_path, base_path)
        if full_path == base_path:
            parent_path = None
        else:
            parent_path = BrowserService._client_path(full_path.parent, base_path)

        # A negative offset/limit would otherwise slice from the end of the list.
        offset = max(offset, 0)
        limit = max(limit, 0)

        # List directory contents (with caching)
        try:
            all_items = BrowserService._list_directory(full_path, nocache)
        except PermissionError as err:
            raise ValueError(f"Permission denied accessing path: {path}") from err

        # Apply pagination. Copy each entry so the per-request fields added
        # below never leak into (or alias) the shared cache.
        total = len(all_items)
        items = [dict(entry) for entry in all_items[offset:offset + limit]]

        # Fetch stat + duration only for items on the current page
        for item in items:
            item_path = full_path / item["name"]
            try:
                stat_result = item_path.stat()
                item["size"] = stat_result.st_size if item["type"] != "folder" else None
                item["modified"] = datetime.fromtimestamp(stat_result.st_mtime).isoformat()
            except OSError:
                # File vanished or is unreadable since the listing was built.
                item["size"] = None
                item["modified"] = None
            if item["is_media"]:
                item["duration"] = BrowserService.get_duration(item_path)
            else:
                item["duration"] = None

        return {
            "folder_id": folder_id,
            "current_path": current_path,
            "parent_path": parent_path,
            "items": items,
            "total": total,
            "offset": offset,
            "limit": limit,
        }