"""Browser service for media file browsing and path validation.""" import logging import os from datetime import datetime from pathlib import Path from typing import Optional from ..config import settings try: from mutagen import File as MutagenFile HAS_MUTAGEN = True except ImportError: HAS_MUTAGEN = False logger = logging.getLogger(__name__) # Media file extensions AUDIO_EXTENSIONS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".aac", ".wma", ".opus"} VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".webm", ".flv"} MEDIA_EXTENSIONS = AUDIO_EXTENSIONS | VIDEO_EXTENSIONS class BrowserService: """Service for browsing media files with path validation.""" @staticmethod def validate_path(folder_id: str, requested_path: str) -> Path: """Validate and resolve a path within an allowed folder. Args: folder_id: ID of the configured media folder. requested_path: Path to validate (relative to folder root or absolute). Returns: Resolved absolute Path object. Raises: ValueError: If folder_id invalid or path traversal attempted. FileNotFoundError: If path does not exist. """ # Get folder config if folder_id not in settings.media_folders: raise ValueError(f"Media folder '{folder_id}' not configured") folder_config = settings.media_folders[folder_id] if not folder_config.enabled: raise ValueError(f"Media folder '{folder_id}' is disabled") # Get base path base_path = Path(folder_config.path).resolve() if not base_path.exists(): raise FileNotFoundError(f"Media folder path does not exist: {base_path}") if not base_path.is_dir(): raise ValueError(f"Media folder path is not a directory: {base_path}") # Handle relative vs absolute paths if requested_path.startswith("/") or requested_path.startswith("\\"): # Relative to folder root (remove leading slash) requested_path = requested_path.lstrip("/\\") # Build and resolve full path if requested_path: full_path = (base_path / requested_path).resolve() else: full_path = base_path # Security check: Ensure resolved path is within base path try: full_path.relative_to(base_path) except ValueError: logger.warning( f"Path traversal attempt detected: {requested_path} " f"(resolved to {full_path}, base: {base_path})" ) raise ValueError("Path traversal not allowed") # Check if path exists if not full_path.exists(): raise FileNotFoundError(f"Path does not exist: {requested_path}") return full_path @staticmethod def is_media_file(path: Path) -> bool: """Check if a file is a media file based on extension. Args: path: Path to check. Returns: True if file is a media file, False otherwise. """ return path.suffix.lower() in MEDIA_EXTENSIONS @staticmethod def get_file_type(path: Path) -> str: """Get the file type (folder, audio, video, other). Args: path: Path to check. Returns: File type string: "folder", "audio", "video", or "other". """ if path.is_dir(): return "folder" suffix = path.suffix.lower() if suffix in AUDIO_EXTENSIONS: return "audio" elif suffix in VIDEO_EXTENSIONS: return "video" else: return "other" @staticmethod def get_duration(file_path: Path) -> Optional[float]: """Get duration of a media file in seconds (header-only read). Args: file_path: Path to the media file. Returns: Duration in seconds, or None if unavailable. """ if not HAS_MUTAGEN: return None try: audio = MutagenFile(str(file_path)) if audio is not None and hasattr(audio, "info") and hasattr(audio.info, "length"): return round(audio.info.length, 2) except Exception: pass return None @staticmethod def browse_directory( folder_id: str, path: str = "", offset: int = 0, limit: int = 100, ) -> dict: """Browse a directory and return items with metadata. Args: folder_id: ID of the configured media folder. path: Path to browse (relative to folder root). offset: Pagination offset (default: 0). limit: Maximum items to return (default: 100). Returns: Dictionary with: - current_path: Current path (relative to folder root) - parent_path: Parent path (None if at root) - items: List of file/folder items - total: Total number of items - offset: Current offset - limit: Current limit - folder_id: Folder ID Raises: ValueError: If path validation fails. FileNotFoundError: If path does not exist. """ # Validate path full_path = BrowserService.validate_path(folder_id, path) # Get base path for relative path calculation folder_config = settings.media_folders[folder_id] base_path = Path(folder_config.path).resolve() # Check if it's a directory if not full_path.is_dir(): raise ValueError(f"Path is not a directory: {path}") # Calculate relative path try: relative_path = full_path.relative_to(base_path) current_path = "/" + str(relative_path).replace("\\", "/") if str(relative_path) != "." else "/" except ValueError: current_path = "/" # Calculate parent path if full_path == base_path: parent_path = None else: parent_relative = full_path.parent.relative_to(base_path) parent_path = "/" + str(parent_relative).replace("\\", "/") if str(parent_relative) != "." else "/" # List directory contents try: all_items = [] for item in full_path.iterdir(): # Skip hidden files (starting with .) if item.name.startswith("."): continue # Get file type file_type = BrowserService.get_file_type(item) # Skip non-media files (but include folders) if file_type == "other" and not item.is_dir(): continue # Get file info try: stat = item.stat() size = stat.st_size if item.is_file() else None modified = datetime.fromtimestamp(stat.st_mtime).isoformat() except (OSError, PermissionError): size = None modified = None all_items.append({ "name": item.name, "type": file_type, "size": size, "modified": modified, "is_media": file_type in ("audio", "video"), }) # Sort: folders first, then by name all_items.sort(key=lambda x: (x["type"] != "folder", x["name"].lower())) # Apply pagination total = len(all_items) items = all_items[offset:offset + limit] # Extract duration for media files in the current page for item in items: if item["is_media"]: item_path = full_path / item["name"] item["duration"] = BrowserService.get_duration(item_path) else: item["duration"] = None return { "folder_id": folder_id, "current_path": current_path, "parent_path": parent_path, "items": items, "total": total, "offset": offset, "limit": limit, } except PermissionError: raise ValueError(f"Permission denied accessing path: {path}")