Compare commits

...

1 Commits

Author SHA1 Message Date
7c631d09f6 Add media browser feature with UI improvements
- Refactored index.html: Split into separate HTML (309 lines), CSS (908 lines), and JS (1,286 lines) files
- Implemented media browser with folder configuration, recursive navigation, and thumbnail display
- Added metadata extraction using mutagen library (title, artist, album, duration, bitrate, codec)
- Implemented thumbnail generation and caching with SHA256 hash-based keys and LRU eviction
- Added platform-specific file playback (os.startfile on Windows, xdg-open on Linux, open on macOS)
- Implemented path validation security to prevent directory traversal attacks
- Added smooth thumbnail loading with fade-in animation and loading spinner
- Added i18n support for browser (English and Russian)
- Updated dependencies: mutagen>=1.47.0, pillow>=10.0.0
- Added comprehensive media browser documentation to README

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-06 21:31:02 +03:00
21 changed files with 4535 additions and 2203 deletions

3
.gitignore vendored
View File

@@ -46,3 +46,6 @@ logs/
# OS
.DS_Store
Thumbs.db
# Thumbnail cache
.cache/

View File

@@ -75,6 +75,8 @@ Unregister-ScheduledTask -TaskName "MediaServer" -Confirm:$false
**Best Practice:** Always restart the server immediately after committing backend changes to verify they work correctly before pushing.
**CRITICAL** Always check acccessibility of WebUI after server restart to ensure that server has started without issues
## Configuration
Copy `config.example.yaml` to `config.yaml` and customize.

View File

@@ -89,6 +89,94 @@ We welcome translations for additional languages! To contribute a new locale:
See [CLAUDE.md](CLAUDE.md#internationalization-i18n) for detailed translation guidelines.
## Media Browser
The Media Browser feature allows you to browse and play media files from configured folders directly through the Web UI.
### Browser Features
- **Folder Configuration** - Mount multiple media folders (music/video directories)
- **Recursive Navigation** - Browse through folder hierarchies with breadcrumb navigation
- **Thumbnail Display** - Automatically generated thumbnails from album art
- **Metadata Extraction** - View title, artist, album, duration, bitrate, and more
- **Remote Playback** - Play files on the PC running the media server (not in the browser)
- **Last Path Memory** - Automatically returns to your last browsed location
- **Lazy Loading** - Thumbnails load only when visible for better performance
### Configuration
Add media folders in your `config.yaml`:
```yaml
# Media folders for browser
media_folders:
music:
path: "C:\\Users\\YourUsername\\Music"
label: "My Music"
enabled: true
videos:
path: "C:\\Users\\YourUsername\\Videos"
label: "My Videos"
enabled: true
# Thumbnail size: "small" (150x150), "medium" (300x300), or "both"
thumbnail_size: "medium"
```
### How Playback Works
When you play a file from the Media Browser:
1. The file is opened using the **default system media player** on the PC running the media server
2. This is designed for **remote control scenarios** where you browse media from one device (e.g., Home Assistant dashboard, phone) but want audio to play on the PC
3. The media player must support the **Windows Media Session API** for playback tracking
### Media Player Compatibility
**⚠️ Important Limitation:** Not all media players expose their playback information to the Windows Media Session API. This means some players will open and play the file, but the Media Server UI won't show playback status, track information, or allow remote control.
**✅ Compatible Players** (work with playback tracking):
- **VLC Media Player** - Full support
- **Groove Music** (Windows 10/11 built-in) - Full support
- **Spotify** - Full support (if already running)
- **Chrome/Edge/Firefox** - Full support for web players
- **foobar2000** - Full support (with proper configuration/plugins)
**❌ Limited/No Support:**
- **Windows Media Player Classic** - Opens files but doesn't expose session info
- **Windows Media Player** (classic version) - Limited session support
**Recommendation:** Set **VLC Media Player** or **Groove Music** as your default audio player for the best experience with the Media Browser.
#### Changing Your Default Media Player (Windows)
1. Open Windows Settings → Apps → Default apps
2. Search for "Music player" or "Video player"
3. Select VLC Media Player or Groove Music
4. Files opened from Media Browser will now use the selected player
### API Endpoints
The Media Browser exposes several REST API endpoints:
| Endpoint | Method | Description |
|--------------------------|--------|-----------------------------------|
| `/api/browser/folders` | GET | List configured media folders |
| `/api/browser/browse` | GET | Browse directory contents |
| `/api/browser/metadata` | GET | Get media file metadata |
| `/api/browser/thumbnail` | GET | Get thumbnail image |
| `/api/browser/play` | POST | Open file with default player |
All endpoints require bearer token authentication.
### Security Notes
- **Path Traversal Protection** - All paths are validated to prevent directory traversal attacks
- **Folder Restrictions** - Only configured folders are accessible
- **Authentication Required** - All endpoints require a valid API token
## Requirements
- Python 3.10+

View File

@@ -10,6 +10,14 @@ from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class MediaFolderConfig(BaseModel):
"""Configuration for a media folder."""
path: str = Field(..., description="Absolute path to media folder")
label: str = Field(..., description="Human-readable display label")
enabled: bool = Field(default=True, description="Whether this folder is active")
class CallbackConfig(BaseModel):
"""Configuration for a callback script (no label/description needed)."""
@@ -77,6 +85,18 @@ class Settings(BaseSettings):
description="Callback scripts executed by integration events (on_turn_on, on_turn_off, on_toggle)",
)
# Media folders for browsing
media_folders: dict[str, MediaFolderConfig] = Field(
default_factory=dict,
description="Media folders available for browsing in the media browser",
)
# Thumbnail settings
thumbnail_size: str = Field(
default="medium",
description='Thumbnail size: "small" (150x150), "medium" (300x300), or "both"',
)
@classmethod
def load_from_yaml(cls, path: Optional[Path] = None) -> "Settings":
"""Load settings from a YAML configuration file."""

View File

@@ -8,7 +8,7 @@ from typing import Optional
import yaml
from .config import CallbackConfig, ScriptConfig, settings
from .config import CallbackConfig, MediaFolderConfig, ScriptConfig, settings
logger = logging.getLogger(__name__)
@@ -279,6 +279,114 @@ class ConfigManager:
logger.info(f"Callback '{name}' deleted from config")
def add_media_folder(self, folder_id: str, config: MediaFolderConfig) -> None:
"""Add a new media folder to config.
Args:
folder_id: Folder ID (must be unique).
config: Media folder configuration.
Raises:
ValueError: If folder already exists.
IOError: If config file cannot be written.
"""
with self._lock:
# Read YAML
if not self._config_path.exists():
data = {}
else:
with open(self._config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
# Check if folder already exists
if "media_folders" in data and folder_id in data["media_folders"]:
raise ValueError(f"Media folder '{folder_id}' already exists")
# Add folder
if "media_folders" not in data:
data["media_folders"] = {}
data["media_folders"][folder_id] = config.model_dump(exclude_none=True)
# Write YAML
self._config_path.parent.mkdir(parents=True, exist_ok=True)
with open(self._config_path, "w", encoding="utf-8") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
# Update in-memory settings
settings.media_folders[folder_id] = config
logger.info(f"Media folder '{folder_id}' added to config")
def update_media_folder(self, folder_id: str, config: MediaFolderConfig) -> None:
"""Update an existing media folder.
Args:
folder_id: Folder ID.
config: New media folder configuration.
Raises:
ValueError: If folder does not exist.
IOError: If config file cannot be written.
"""
with self._lock:
# Read YAML
if not self._config_path.exists():
raise ValueError(f"Config file not found: {self._config_path}")
with open(self._config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
# Check if folder exists
if "media_folders" not in data or folder_id not in data["media_folders"]:
raise ValueError(f"Media folder '{folder_id}' does not exist")
# Update folder
data["media_folders"][folder_id] = config.model_dump(exclude_none=True)
# Write YAML
with open(self._config_path, "w", encoding="utf-8") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
# Update in-memory settings
settings.media_folders[folder_id] = config
logger.info(f"Media folder '{folder_id}' updated in config")
def delete_media_folder(self, folder_id: str) -> None:
"""Delete a media folder from config.
Args:
folder_id: Folder ID.
Raises:
ValueError: If folder does not exist.
IOError: If config file cannot be written.
"""
with self._lock:
# Read YAML
if not self._config_path.exists():
raise ValueError(f"Config file not found: {self._config_path}")
with open(self._config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
# Check if folder exists
if "media_folders" not in data or folder_id not in data["media_folders"]:
raise ValueError(f"Media folder '{folder_id}' does not exist")
# Delete folder
del data["media_folders"][folder_id]
# Write YAML
with open(self._config_path, "w", encoding="utf-8") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
# Update in-memory settings
if folder_id in settings.media_folders:
del settings.media_folders[folder_id]
logger.info(f"Media folder '{folder_id}' deleted from config")
# Global config manager instance
config_manager = ConfigManager()

View File

@@ -15,7 +15,7 @@ from fastapi.staticfiles import StaticFiles
from . import __version__
from .auth import get_token_label, token_label_var
from .config import settings, generate_default_config, get_config_dir
from .routes import audio_router, callbacks_router, health_router, media_router, scripts_router
from .routes import audio_router, browser_router, callbacks_router, health_router, media_router, scripts_router
from .services import get_media_controller
from .services.websocket_manager import ws_manager
@@ -110,6 +110,7 @@ def create_app() -> FastAPI:
# Register routers
app.include_router(audio_router)
app.include_router(browser_router)
app.include_router(callbacks_router)
app.include_router(health_router)
app.include_router(media_router)

View File

@@ -1,9 +1,10 @@
"""API route modules."""
from .audio import router as audio_router
from .browser import router as browser_router
from .callbacks import router as callbacks_router
from .health import router as health_router
from .media import router as media_router
from .scripts import router as scripts_router
__all__ = ["audio_router", "callbacks_router", "health_router", "media_router", "scripts_router"]
__all__ = ["audio_router", "browser_router", "callbacks_router", "health_router", "media_router", "scripts_router"]

View File

@@ -0,0 +1,419 @@
"""Browser API routes for media file browsing."""
import asyncio
import logging
from pathlib import Path
from typing import Optional
from urllib.parse import unquote
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from ..auth import verify_token
from ..config import MediaFolderConfig, settings
from ..config_manager import config_manager
from ..services.browser_service import BrowserService
from ..services.metadata_service import MetadataService
from ..services.thumbnail_service import ThumbnailService
from ..services import get_media_controller
from ..services.websocket_manager import ws_manager
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/browser", tags=["browser"])
# Request/Response Models
class FolderCreateRequest(BaseModel):
"""Request model for creating a media folder."""
folder_id: str = Field(..., description="Unique folder ID")
label: str = Field(..., description="Display label")
path: str = Field(..., description="Absolute path to media folder")
enabled: bool = Field(default=True, description="Whether folder is enabled")
class FolderUpdateRequest(BaseModel):
"""Request model for updating a media folder."""
label: str = Field(..., description="Display label")
path: str = Field(..., description="Absolute path to media folder")
enabled: bool = Field(default=True, description="Whether folder is enabled")
class PlayRequest(BaseModel):
"""Request model for playing a media file."""
path: str = Field(..., description="Full path to the media file")
# Folder Management Endpoints
@router.get("/folders")
async def list_folders(_: str = Depends(verify_token)):
"""List all configured media folders.
Returns:
Dictionary of folder configurations.
"""
folders = {}
for folder_id, config in settings.media_folders.items():
folders[folder_id] = {
"id": folder_id,
"label": config.label,
"path": config.path,
"enabled": config.enabled,
}
return folders
@router.post("/folders/create")
async def create_folder(
request: FolderCreateRequest,
_: str = Depends(verify_token),
):
"""Create a new media folder configuration.
Args:
request: Folder creation request.
Returns:
Success message.
Raises:
HTTPException: If folder already exists or validation fails.
"""
try:
# Validate folder_id format (alphanumeric and underscore only)
if not request.folder_id.replace("_", "").isalnum():
raise HTTPException(
status_code=400,
detail="Folder ID must contain only alphanumeric characters and underscores",
)
# Validate path exists
path = Path(request.path)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Path does not exist: {request.path}")
if not path.is_dir():
raise HTTPException(status_code=400, detail=f"Path is not a directory: {request.path}")
# Create config
config = MediaFolderConfig(
path=request.path,
label=request.label,
enabled=request.enabled,
)
# Add to config manager
config_manager.add_media_folder(request.folder_id, config)
return {
"success": True,
"message": f"Media folder '{request.folder_id}' created successfully",
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error creating media folder: {e}")
raise HTTPException(status_code=500, detail="Failed to create media folder")
@router.put("/folders/update/{folder_id}")
async def update_folder(
folder_id: str,
request: FolderUpdateRequest,
_: str = Depends(verify_token),
):
"""Update an existing media folder configuration.
Args:
folder_id: ID of the folder to update.
request: Folder update request.
Returns:
Success message.
Raises:
HTTPException: If folder doesn't exist or validation fails.
"""
try:
# Validate path exists
path = Path(request.path)
if not path.exists():
raise HTTPException(status_code=400, detail=f"Path does not exist: {request.path}")
if not path.is_dir():
raise HTTPException(status_code=400, detail=f"Path is not a directory: {request.path}")
# Create config
config = MediaFolderConfig(
path=request.path,
label=request.label,
enabled=request.enabled,
)
# Update config manager
config_manager.update_media_folder(folder_id, config)
return {
"success": True,
"message": f"Media folder '{folder_id}' updated successfully",
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error updating media folder: {e}")
raise HTTPException(status_code=500, detail="Failed to update media folder")
@router.delete("/folders/delete/{folder_id}")
async def delete_folder(
folder_id: str,
_: str = Depends(verify_token),
):
"""Delete a media folder configuration.
Args:
folder_id: ID of the folder to delete.
Returns:
Success message.
Raises:
HTTPException: If folder doesn't exist.
"""
try:
config_manager.delete_media_folder(folder_id)
return {
"success": True,
"message": f"Media folder '{folder_id}' deleted successfully",
}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Error deleting media folder: {e}")
raise HTTPException(status_code=500, detail="Failed to delete media folder")
# Browse Endpoints
@router.get("/browse")
async def browse(
folder_id: str = Query(..., description="Media folder ID"),
path: str = Query(default="", description="Path relative to folder root"),
offset: int = Query(default=0, ge=0, description="Pagination offset"),
limit: int = Query(default=100, ge=1, le=1000, description="Pagination limit"),
_: str = Depends(verify_token),
):
"""Browse a directory and list files/folders.
Args:
folder_id: ID of the media folder.
path: Path to browse (URL-encoded, relative to folder root).
offset: Pagination offset.
limit: Maximum items to return.
Returns:
Directory listing with items and metadata.
Raises:
HTTPException: If path validation fails or directory not accessible.
"""
try:
# URL decode the path
decoded_path = unquote(path)
# Browse directory
result = BrowserService.browse_directory(
folder_id=folder_id,
path=decoded_path,
offset=offset,
limit=limit,
)
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Error browsing directory: {e}")
raise HTTPException(status_code=500, detail="Failed to browse directory")
# Metadata Endpoint
@router.get("/metadata")
async def get_metadata(
path: str = Query(..., description="Full path to media file (URL-encoded)"),
_: str = Depends(verify_token),
):
"""Get metadata for a media file.
Args:
path: Full path to the media file (URL-encoded).
Returns:
Media file metadata.
Raises:
HTTPException: If file not found or metadata extraction fails.
"""
try:
# URL decode the path
decoded_path = unquote(path)
file_path = Path(decoded_path)
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
if not file_path.is_file():
raise HTTPException(status_code=400, detail="Path is not a file")
# Extract metadata in executor (blocking operation)
loop = asyncio.get_event_loop()
metadata = await loop.run_in_executor(
None,
MetadataService.extract_metadata,
file_path,
)
return metadata
except HTTPException:
raise
except Exception as e:
logger.error(f"Error extracting metadata: {e}")
raise HTTPException(status_code=500, detail="Failed to extract metadata")
# Thumbnail Endpoint
@router.get("/thumbnail")
async def get_thumbnail(
path: str = Query(..., description="Full path to media file (URL-encoded)"),
size: str = Query(default="medium", description='Thumbnail size: "small" or "medium"'),
_: str = Depends(verify_token),
):
"""Get thumbnail for a media file.
Args:
path: Full path to the media file (URL-encoded).
size: Thumbnail size ("small" or "medium").
Returns:
JPEG image bytes.
Raises:
HTTPException: If file not found or thumbnail generation fails.
"""
try:
# URL decode the path
decoded_path = unquote(path)
file_path = Path(decoded_path)
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
if not file_path.is_file():
raise HTTPException(status_code=400, detail="Path is not a file")
# Validate size
if size not in ("small", "medium"):
size = "medium"
# Get thumbnail
thumbnail_data = await ThumbnailService.get_thumbnail(file_path, size)
if thumbnail_data is None:
raise HTTPException(status_code=404, detail="Thumbnail not available")
# Calculate ETag (hash of path + mtime)
import hashlib
stat = file_path.stat()
etag_data = f"{file_path}:{stat.st_mtime}:{size}".encode()
etag = hashlib.md5(etag_data).hexdigest()
# Return image with caching headers
return Response(
content=thumbnail_data,
media_type="image/jpeg",
headers={
"ETag": f'"{etag}"',
"Cache-Control": "public, max-age=86400", # 24 hours
},
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error generating thumbnail: {e}")
raise HTTPException(status_code=500, detail="Failed to generate thumbnail")
# Playback Endpoint
@router.post("/play")
async def play_file(
request: PlayRequest,
_: str = Depends(verify_token),
):
"""Open a media file with the default system player.
Args:
request: Play request with file path.
Returns:
Success message.
Raises:
HTTPException: If file not found or playback fails.
"""
try:
file_path = Path(request.path)
# Validate file exists
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
if not file_path.is_file():
raise HTTPException(status_code=400, detail="Path is not a file")
# Validate file is a media file
if not BrowserService.is_media_file(file_path):
raise HTTPException(status_code=400, detail="File is not a media file")
# Get media controller and open file
controller = get_media_controller()
success = await controller.open_file(str(file_path))
if not success:
raise HTTPException(status_code=500, detail="Failed to open file")
# Wait for media player to start and register with Windows Media Session API
# This allows the UI to update immediately with the new playback state
await asyncio.sleep(1.5)
# Get updated status and broadcast to all connected clients
try:
status = await controller.get_status()
status_dict = status.model_dump()
await ws_manager.broadcast({
"type": "status",
"data": status_dict
})
logger.info(f"Broadcasted status update after opening file: {file_path.name}")
except Exception as e:
logger.warning(f"Failed to broadcast status after opening file: {e}")
return {
"success": True,
"message": f"Playing {file_path.name}",
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error playing file: {e}")
raise HTTPException(status_code=500, detail="Failed to play file")

View File

@@ -0,0 +1,216 @@
"""Browser service for media file browsing and path validation."""
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional
from ..config import settings
logger = logging.getLogger(__name__)
# Media file extensions
AUDIO_EXTENSIONS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".aac", ".wma", ".opus"}
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".webm", ".flv"}
MEDIA_EXTENSIONS = AUDIO_EXTENSIONS | VIDEO_EXTENSIONS
class BrowserService:
"""Service for browsing media files with path validation."""
@staticmethod
def validate_path(folder_id: str, requested_path: str) -> Path:
"""Validate and resolve a path within an allowed folder.
Args:
folder_id: ID of the configured media folder.
requested_path: Path to validate (relative to folder root or absolute).
Returns:
Resolved absolute Path object.
Raises:
ValueError: If folder_id invalid or path traversal attempted.
FileNotFoundError: If path does not exist.
"""
# Get folder config
if folder_id not in settings.media_folders:
raise ValueError(f"Media folder '{folder_id}' not configured")
folder_config = settings.media_folders[folder_id]
if not folder_config.enabled:
raise ValueError(f"Media folder '{folder_id}' is disabled")
# Get base path
base_path = Path(folder_config.path).resolve()
if not base_path.exists():
raise FileNotFoundError(f"Media folder path does not exist: {base_path}")
if not base_path.is_dir():
raise ValueError(f"Media folder path is not a directory: {base_path}")
# Handle relative vs absolute paths
if requested_path.startswith("/") or requested_path.startswith("\\"):
# Relative to folder root (remove leading slash)
requested_path = requested_path.lstrip("/\\")
# Build and resolve full path
if requested_path:
full_path = (base_path / requested_path).resolve()
else:
full_path = base_path
# Security check: Ensure resolved path is within base path
try:
full_path.relative_to(base_path)
except ValueError:
logger.warning(
f"Path traversal attempt detected: {requested_path} "
f"(resolved to {full_path}, base: {base_path})"
)
raise ValueError("Path traversal not allowed")
# Check if path exists
if not full_path.exists():
raise FileNotFoundError(f"Path does not exist: {requested_path}")
return full_path
@staticmethod
def is_media_file(path: Path) -> bool:
"""Check if a file is a media file based on extension.
Args:
path: Path to check.
Returns:
True if file is a media file, False otherwise.
"""
return path.suffix.lower() in MEDIA_EXTENSIONS
@staticmethod
def get_file_type(path: Path) -> str:
"""Get the file type (folder, audio, video, other).
Args:
path: Path to check.
Returns:
File type string: "folder", "audio", "video", or "other".
"""
if path.is_dir():
return "folder"
suffix = path.suffix.lower()
if suffix in AUDIO_EXTENSIONS:
return "audio"
elif suffix in VIDEO_EXTENSIONS:
return "video"
else:
return "other"
@staticmethod
def browse_directory(
folder_id: str,
path: str = "",
offset: int = 0,
limit: int = 100,
) -> dict:
"""Browse a directory and return items with metadata.
Args:
folder_id: ID of the configured media folder.
path: Path to browse (relative to folder root).
offset: Pagination offset (default: 0).
limit: Maximum items to return (default: 100).
Returns:
Dictionary with:
- current_path: Current path (relative to folder root)
- parent_path: Parent path (None if at root)
- items: List of file/folder items
- total: Total number of items
- offset: Current offset
- limit: Current limit
- folder_id: Folder ID
Raises:
ValueError: If path validation fails.
FileNotFoundError: If path does not exist.
"""
# Validate path
full_path = BrowserService.validate_path(folder_id, path)
# Get base path for relative path calculation
folder_config = settings.media_folders[folder_id]
base_path = Path(folder_config.path).resolve()
# Check if it's a directory
if not full_path.is_dir():
raise ValueError(f"Path is not a directory: {path}")
# Calculate relative path
try:
relative_path = full_path.relative_to(base_path)
current_path = "/" + str(relative_path).replace("\\", "/") if str(relative_path) != "." else "/"
except ValueError:
current_path = "/"
# Calculate parent path
if full_path == base_path:
parent_path = None
else:
parent_relative = full_path.parent.relative_to(base_path)
parent_path = "/" + str(parent_relative).replace("\\", "/") if str(parent_relative) != "." else "/"
# List directory contents
try:
all_items = []
for item in full_path.iterdir():
# Skip hidden files (starting with .)
if item.name.startswith("."):
continue
# Get file type
file_type = BrowserService.get_file_type(item)
# Skip non-media files (but include folders)
if file_type == "other" and not item.is_dir():
continue
# Get file info
try:
stat = item.stat()
size = stat.st_size if item.is_file() else None
modified = datetime.fromtimestamp(stat.st_mtime).isoformat()
except (OSError, PermissionError):
size = None
modified = None
all_items.append({
"name": item.name,
"type": file_type,
"size": size,
"modified": modified,
"is_media": file_type in ("audio", "video"),
})
# Sort: folders first, then by name
all_items.sort(key=lambda x: (x["type"] != "folder", x["name"].lower()))
# Apply pagination
total = len(all_items)
items = all_items[offset:offset + limit]
return {
"folder_id": folder_id,
"current_path": current_path,
"parent_path": parent_path,
"items": items,
"total": total,
"offset": offset,
"limit": limit,
}
except PermissionError:
raise ValueError(f"Permission denied accessing path: {path}")

View File

@@ -293,3 +293,27 @@ class LinuxMediaController(MediaController):
except Exception as e:
logger.error(f"Failed to seek: {e}")
return False
async def open_file(self, file_path: str) -> bool:
"""Open a media file with the default system player (Linux).
Uses xdg-open to open the file with the default application.
Args:
file_path: Absolute path to the media file
Returns:
True if successful, False otherwise
"""
try:
process = await asyncio.create_subprocess_exec(
'xdg-open', file_path,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await process.wait()
logger.info(f"Opened file with default player: {file_path}")
return True
except Exception as e:
logger.error(f"Failed to open file {file_path}: {e}")
return False

View File

@@ -294,3 +294,27 @@ class MacOSMediaController(MediaController):
)
return True
return False
async def open_file(self, file_path: str) -> bool:
"""Open a media file with the default system player (macOS).
Uses the 'open' command to open the file with the default application.
Args:
file_path: Absolute path to the media file
Returns:
True if successful, False otherwise
"""
try:
process = await asyncio.create_subprocess_exec(
'open', file_path,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await process.wait()
logger.info(f"Opened file with default player: {file_path}")
return True
except Exception as e:
logger.error(f"Failed to open file {file_path}: {e}")
return False

View File

@@ -94,3 +94,15 @@ class MediaController(ABC):
True if successful, False otherwise
"""
pass
@abstractmethod
async def open_file(self, file_path: str) -> bool:
"""Open a media file with the default system player.
Args:
file_path: Absolute path to the media file
Returns:
True if successful, False otherwise
"""
pass

View File

@@ -0,0 +1,194 @@
"""Metadata extraction service for media files."""
import logging
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
class MetadataService:
"""Service for extracting metadata from media files."""
@staticmethod
def extract_audio_metadata(file_path: Path) -> dict:
"""Extract metadata from an audio file.
Args:
file_path: Path to the audio file.
Returns:
Dictionary with audio metadata.
"""
try:
import mutagen
from mutagen import File as MutagenFile
audio = MutagenFile(str(file_path), easy=True)
if audio is None:
return {"error": "Unable to read audio file"}
metadata = {
"type": "audio",
"filename": file_path.name,
"path": str(file_path),
}
# Extract duration
if hasattr(audio.info, "length"):
metadata["duration"] = round(audio.info.length, 2)
# Extract bitrate
if hasattr(audio.info, "bitrate"):
metadata["bitrate"] = audio.info.bitrate
# Extract sample rate
if hasattr(audio.info, "sample_rate"):
metadata["sample_rate"] = audio.info.sample_rate
elif hasattr(audio.info, "samplerate"):
metadata["sample_rate"] = audio.info.samplerate
# Extract channels
if hasattr(audio.info, "channels"):
metadata["channels"] = audio.info.channels
# Extract tags (use easy=True for consistent tag names)
if audio is not None and hasattr(audio, "tags") and audio.tags:
# Easy tags provide lists, so we take the first item
tags = audio.tags
if "title" in tags:
metadata["title"] = tags["title"][0] if isinstance(tags["title"], list) else tags["title"]
if "artist" in tags:
metadata["artist"] = tags["artist"][0] if isinstance(tags["artist"], list) else tags["artist"]
if "album" in tags:
metadata["album"] = tags["album"][0] if isinstance(tags["album"], list) else tags["album"]
if "albumartist" in tags:
metadata["album_artist"] = tags["albumartist"][0] if isinstance(tags["albumartist"], list) else tags["albumartist"]
if "date" in tags:
metadata["date"] = tags["date"][0] if isinstance(tags["date"], list) else tags["date"]
if "genre" in tags:
metadata["genre"] = tags["genre"][0] if isinstance(tags["genre"], list) else tags["genre"]
if "tracknumber" in tags:
metadata["track_number"] = tags["tracknumber"][0] if isinstance(tags["tracknumber"], list) else tags["tracknumber"]
# If no title tag, use filename
if "title" not in metadata:
metadata["title"] = file_path.stem
return metadata
except ImportError:
logger.error("mutagen library not installed, cannot extract metadata")
return {"error": "mutagen library not installed"}
except Exception as e:
logger.error(f"Error extracting audio metadata from {file_path}: {e}")
return {
"error": str(e),
"filename": file_path.name,
"title": file_path.stem,
}
@staticmethod
def extract_video_metadata(file_path: Path) -> dict:
"""Extract basic metadata from a video file.
Args:
file_path: Path to the video file.
Returns:
Dictionary with video metadata.
"""
try:
import mutagen
from mutagen import File as MutagenFile
video = MutagenFile(str(file_path))
if video is None:
return {
"type": "video",
"filename": file_path.name,
"title": file_path.stem,
}
metadata = {
"type": "video",
"filename": file_path.name,
"path": str(file_path),
}
# Extract duration
if hasattr(video.info, "length"):
metadata["duration"] = round(video.info.length, 2)
# Extract bitrate
if hasattr(video.info, "bitrate"):
metadata["bitrate"] = video.info.bitrate
# Extract video-specific properties if available
if hasattr(video.info, "width"):
metadata["width"] = video.info.width
if hasattr(video.info, "height"):
metadata["height"] = video.info.height
# Try to extract title from tags
if hasattr(video, "tags") and video.tags:
tags = video.tags
if hasattr(tags, "get"):
title = tags.get("title") or tags.get("TITLE") or tags.get("\xa9nam")
if title:
metadata["title"] = title[0] if isinstance(title, list) else str(title)
# If no title tag, use filename
if "title" not in metadata:
metadata["title"] = file_path.stem
return metadata
except ImportError:
logger.error("mutagen library not installed, cannot extract metadata")
return {
"error": "mutagen library not installed",
"type": "video",
"filename": file_path.name,
"title": file_path.stem,
}
except Exception as e:
logger.debug(f"Error extracting video metadata from {file_path}: {e}")
# Return basic metadata
return {
"type": "video",
"filename": file_path.name,
"title": file_path.stem,
}
@staticmethod
def extract_metadata(file_path: Path) -> dict:
"""Extract metadata from a media file (auto-detect type).
Args:
file_path: Path to the media file.
Returns:
Dictionary with media metadata.
"""
from .browser_service import AUDIO_EXTENSIONS, VIDEO_EXTENSIONS
suffix = file_path.suffix.lower()
if suffix in AUDIO_EXTENSIONS:
return MetadataService.extract_audio_metadata(file_path)
elif suffix in VIDEO_EXTENSIONS:
return MetadataService.extract_video_metadata(file_path)
else:
return {
"error": "Unsupported file type",
"filename": file_path.name,
}

View File

@@ -0,0 +1,359 @@
"""Thumbnail generation and caching service."""
import asyncio
import hashlib
import logging
import os
import shutil
import subprocess
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# Thumbnail sizes
THUMBNAIL_SIZES = {
"small": (150, 150),
"medium": (300, 300),
}
# Cache size limit (500MB)
CACHE_SIZE_LIMIT = 500 * 1024 * 1024 # 500MB in bytes
class ThumbnailService:
"""Service for generating and caching thumbnails."""
@staticmethod
def get_cache_dir() -> Path:
"""Get the thumbnail cache directory path.
Returns:
Path to the cache directory (project-local).
"""
# Store cache in project directory: media-server/.cache/thumbnails/
project_root = Path(__file__).parent.parent.parent
cache_dir = project_root / ".cache" / "thumbnails"
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir
@staticmethod
def get_cache_key(file_path: Path) -> str:
"""Generate cache key from file path.
Args:
file_path: Path to the media file.
Returns:
SHA256 hash of the absolute file path.
"""
absolute_path = str(file_path.resolve())
return hashlib.sha256(absolute_path.encode()).hexdigest()
@staticmethod
def get_cached_thumbnail(file_path: Path, size: str) -> Optional[bytes]:
"""Get cached thumbnail if valid.
Args:
file_path: Path to the media file.
size: Thumbnail size ("small" or "medium").
Returns:
Thumbnail bytes if cached and valid, None otherwise.
"""
cache_dir = ThumbnailService.get_cache_dir()
cache_key = ThumbnailService.get_cache_key(file_path)
cache_path = cache_dir / cache_key / f"{size}.jpg"
if not cache_path.exists():
return None
# Check if file has been modified since cache was created
try:
file_mtime = file_path.stat().st_mtime
cache_mtime = cache_path.stat().st_mtime
if file_mtime > cache_mtime:
logger.debug(f"Cache invalidated for {file_path.name} (file modified)")
return None
# Read cached thumbnail
with open(cache_path, "rb") as f:
return f.read()
except (OSError, PermissionError) as e:
logger.error(f"Error reading cached thumbnail: {e}")
return None
@staticmethod
def cache_thumbnail(file_path: Path, size: str, image_data: bytes) -> None:
"""Cache a thumbnail.
Args:
file_path: Path to the media file.
size: Thumbnail size ("small" or "medium").
image_data: Thumbnail image data (JPEG bytes).
"""
cache_dir = ThumbnailService.get_cache_dir()
cache_key = ThumbnailService.get_cache_key(file_path)
cache_folder = cache_dir / cache_key
cache_folder.mkdir(parents=True, exist_ok=True)
cache_path = cache_folder / f"{size}.jpg"
try:
with open(cache_path, "wb") as f:
f.write(image_data)
logger.debug(f"Cached thumbnail for {file_path.name} ({size})")
except (OSError, PermissionError) as e:
logger.error(f"Error caching thumbnail: {e}")
@staticmethod
def generate_audio_thumbnail(file_path: Path, size: str) -> Optional[bytes]:
"""Generate thumbnail from audio file (extract album art).
Args:
file_path: Path to the audio file.
size: Thumbnail size ("small" or "medium").
Returns:
Thumbnail bytes (JPEG) or None if no album art.
"""
try:
import mutagen
from mutagen import File as MutagenFile
from PIL import Image
from io import BytesIO
audio = MutagenFile(str(file_path))
if audio is None:
return None
# Extract album art
art_data = None
# Try different tag types for album art
if hasattr(audio, "pictures") and audio.pictures:
# FLAC, Ogg Vorbis
art_data = audio.pictures[0].data
elif hasattr(audio, "tags"):
tags = audio.tags
if tags is not None:
# MP3 (ID3)
if hasattr(tags, "getall"):
apic_frames = tags.getall("APIC")
if apic_frames:
art_data = apic_frames[0].data
# MP4/M4A
elif "covr" in tags:
art_data = bytes(tags["covr"][0])
# Try other common keys
elif "APIC:" in tags:
art_data = tags["APIC:"].data
if art_data is None:
return None
# Resize image
img = Image.open(BytesIO(art_data))
# Convert to RGB if necessary (handle RGBA, grayscale, etc.)
if img.mode not in ("RGB", "L"):
img = img.convert("RGB")
# Resize with maintaining aspect ratio and center crop
target_size = THUMBNAIL_SIZES[size]
img.thumbnail((target_size[0] * 2, target_size[1] * 2), Image.Resampling.LANCZOS)
# Center crop to square
width, height = img.size
min_dim = min(width, height)
left = (width - min_dim) // 2
top = (height - min_dim) // 2
right = left + min_dim
bottom = top + min_dim
img = img.crop((left, top, right, bottom))
# Final resize
img = img.resize(target_size, Image.Resampling.LANCZOS)
# Save as JPEG
output = BytesIO()
img.save(output, format="JPEG", quality=85, optimize=True)
return output.getvalue()
except ImportError:
logger.error("Required libraries (mutagen, Pillow) not installed")
return None
except Exception as e:
logger.debug(f"Error generating audio thumbnail for {file_path.name}: {e}")
return None
@staticmethod
async def generate_video_thumbnail(file_path: Path, size: str) -> Optional[bytes]:
"""Generate thumbnail from video file using ffmpeg.
Args:
file_path: Path to the video file.
size: Thumbnail size ("small" or "medium").
Returns:
Thumbnail bytes (JPEG) or None if ffmpeg not available.
"""
try:
from PIL import Image
from io import BytesIO
# Check if ffmpeg is available
if not shutil.which("ffmpeg"):
logger.debug("ffmpeg not available, cannot generate video thumbnail")
return None
# Extract frame at 10% duration
target_size = THUMBNAIL_SIZES[size]
# Use ffmpeg to extract a frame
cmd = [
"ffmpeg",
"-i", str(file_path),
"-vf", f"thumbnail,scale={target_size[0]}:{target_size[1]}:force_original_aspect_ratio=increase,crop={target_size[0]}:{target_size[1]}",
"-frames:v", "1",
"-f", "image2pipe",
"-vcodec", "mjpeg",
"-"
]
# Run ffmpeg with timeout
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
)
try:
stdout, _ = await asyncio.wait_for(process.communicate(), timeout=10.0)
if process.returncode == 0 and stdout:
# ffmpeg output is already JPEG, but let's ensure proper quality
img = Image.open(BytesIO(stdout))
# Convert to RGB if necessary
if img.mode != "RGB":
img = img.convert("RGB")
# Save as JPEG with consistent quality
output = BytesIO()
img.save(output, format="JPEG", quality=85, optimize=True)
return output.getvalue()
except asyncio.TimeoutError:
logger.warning(f"ffmpeg timeout for {file_path.name}")
process.kill()
await process.wait()
return None
except ImportError:
logger.error("Pillow library not installed")
return None
except Exception as e:
logger.debug(f"Error generating video thumbnail for {file_path.name}: {e}")
return None
@staticmethod
async def get_thumbnail(file_path: Path, size: str = "medium") -> Optional[bytes]:
"""Get thumbnail for a media file (from cache or generate).
Args:
file_path: Path to the media file.
size: Thumbnail size ("small" or "medium").
Returns:
Thumbnail bytes (JPEG) or None if unavailable.
"""
from .browser_service import AUDIO_EXTENSIONS, VIDEO_EXTENSIONS
# Validate size
if size not in THUMBNAIL_SIZES:
size = "medium"
# Check cache first
cached = ThumbnailService.get_cached_thumbnail(file_path, size)
if cached:
return cached
# Generate thumbnail based on file type
suffix = file_path.suffix.lower()
thumbnail_data = None
if suffix in AUDIO_EXTENSIONS:
# Audio files - run in executor (sync operation)
loop = asyncio.get_event_loop()
thumbnail_data = await loop.run_in_executor(
None,
ThumbnailService.generate_audio_thumbnail,
file_path,
size,
)
elif suffix in VIDEO_EXTENSIONS:
# Video files - already async
thumbnail_data = await ThumbnailService.generate_video_thumbnail(file_path, size)
# Cache if generated successfully
if thumbnail_data:
ThumbnailService.cache_thumbnail(file_path, size, thumbnail_data)
return thumbnail_data
@staticmethod
def cleanup_cache() -> None:
"""Clean up cache if it exceeds size limit.
Removes oldest thumbnails by access time.
"""
cache_dir = ThumbnailService.get_cache_dir()
try:
# Calculate total cache size
total_size = 0
cache_items = []
for folder in cache_dir.iterdir():
if folder.is_dir():
for file in folder.iterdir():
if file.is_file():
stat = file.stat()
total_size += stat.st_size
cache_items.append((file, stat.st_atime, stat.st_size))
# If cache is within limit, no cleanup needed
if total_size <= CACHE_SIZE_LIMIT:
return
logger.info(f"Cache size {total_size / 1024 / 1024:.2f}MB exceeds limit, cleaning up...")
# Sort by access time (oldest first)
cache_items.sort(key=lambda x: x[1])
# Remove oldest items until under limit
for file, _, size in cache_items:
if total_size <= CACHE_SIZE_LIMIT:
break
try:
file.unlink()
total_size -= size
logger.debug(f"Removed cached thumbnail: {file}")
# Remove empty parent folder
parent = file.parent
if parent != cache_dir and not any(parent.iterdir()):
parent.rmdir()
except (OSError, PermissionError) as e:
logger.error(f"Error removing cache file: {e}")
logger.info(f"Cache cleanup complete, new size: {total_size / 1024 / 1024:.2f}MB")
except Exception as e:
logger.error(f"Error during cache cleanup: {e}")

View File

@@ -666,3 +666,24 @@ class WindowsMediaController(MediaController):
except Exception as e:
logger.error(f"Failed to seek: {e}")
return False
async def open_file(self, file_path: str) -> bool:
"""Open a media file with the default system player (Windows).
Uses os.startfile() to open the file with the default application.
Args:
file_path: Absolute path to the media file
Returns:
True if successful, False otherwise
"""
try:
import os
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: os.startfile(file_path))
logger.info(f"Opened file with default player: {file_path}")
return True
except Exception as e:
logger.error(f"Failed to open file {file_path}: {e}")
return False

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -107,5 +107,29 @@
"callbacks.msg.load_failed": "Failed to load callback details",
"callbacks.msg.list_failed": "Failed to load callbacks",
"callbacks.confirm.delete": "Are you sure you want to delete the callback \"{name}\"?",
"callbacks.confirm.unsaved": "You have unsaved changes. Are you sure you want to discard them?"
"callbacks.confirm.unsaved": "You have unsaved changes. Are you sure you want to discard them?",
"browser.title": "Media Browser",
"browser.manage_folders": "Manage Folders",
"browser.select_folder": "Select a folder...",
"browser.select_folder_option": "Select a folder...",
"browser.no_folder_selected": "Select a folder to browse media files",
"browser.no_items": "No media files found in this folder",
"browser.previous": "Previous",
"browser.next": "Next",
"browser.play_success": "Playing {filename}",
"browser.play_error": "Failed to play file",
"browser.error_loading": "Error loading directory",
"browser.error_loading_folders": "Failed to load media folders",
"browser.manage_folders_hint": "Folder management coming soon! For now, edit config.yaml to add media folders.",
"browser.folder_dialog.title_add": "Add Media Folder",
"browser.folder_dialog.title_edit": "Edit Media Folder",
"browser.folder_dialog.folder_id": "Folder ID *",
"browser.folder_dialog.folder_id_help": "Alphanumeric and underscore only",
"browser.folder_dialog.label": "Label *",
"browser.folder_dialog.label_help": "Display name for this folder",
"browser.folder_dialog.path": "Path *",
"browser.folder_dialog.path_help": "Absolute path to media directory",
"browser.folder_dialog.enabled": "Enabled",
"browser.folder_dialog.cancel": "Cancel",
"browser.folder_dialog.save": "Save"
}

View File

@@ -107,5 +107,29 @@
"callbacks.msg.load_failed": "Не удалось загрузить данные обратного вызова",
"callbacks.msg.list_failed": "Не удалось загрузить обратные вызовы",
"callbacks.confirm.delete": "Вы уверены, что хотите удалить обратный вызов \"{name}\"?",
"callbacks.confirm.unsaved": "У вас есть несохраненные изменения. Вы уверены, что хотите отменить их?"
"callbacks.confirm.unsaved": "У вас есть несохраненные изменения. Вы уверены, что хотите отменить их?",
"browser.title": "Медиа Браузер",
"browser.manage_folders": "Управление папками",
"browser.select_folder": "Выберите папку...",
"browser.select_folder_option": "Выберите папку...",
"browser.no_folder_selected": "Выберите папку для просмотра медиафайлов",
"browser.no_items": "В этой папке не найдено медиафайлов",
"browser.previous": "Предыдущая",
"browser.next": "Следующая",
"browser.play_success": "Воспроизведение {filename}",
"browser.play_error": "Не удалось воспроизвести файл",
"browser.error_loading": "Ошибка загрузки каталога",
"browser.error_loading_folders": "Не удалось загрузить медиа папки",
"browser.manage_folders_hint": "Управление папками скоро появится! Пока редактируйте config.yaml для добавления медиа папок.",
"browser.folder_dialog.title_add": "Добавить медиа папку",
"browser.folder_dialog.title_edit": "Редактировать медиа папку",
"browser.folder_dialog.folder_id": "ID папки *",
"browser.folder_dialog.folder_id_help": "Только буквы, цифры и подчеркивание",
"browser.folder_dialog.label": "Метка *",
"browser.folder_dialog.label_help": "Отображаемое имя папки",
"browser.folder_dialog.path": "Путь *",
"browser.folder_dialog.path_help": "Абсолютный путь к медиа каталогу",
"browser.folder_dialog.enabled": "Включено",
"browser.folder_dialog.cancel": "Отмена",
"browser.folder_dialog.save": "Сохранить"
}

View File

@@ -30,6 +30,8 @@ dependencies = [
"pydantic>=2.0",
"pydantic-settings>=2.0",
"pyyaml>=6.0",
"mutagen>=1.47.0",
"pillow>=10.0.0",
]
[project.optional-dependencies]