"""Browser service for media file browsing and path validation."""

import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

from ..config import settings

# Directory listing cache: {resolved_path_str: (timestamp, all_items)}
_dir_cache: dict[str, tuple[float, list[dict]]] = {}
DIR_CACHE_TTL = 300  # 5 minutes
# Upper bound on cached listings so the cache cannot grow without limit
# when many distinct directories are browsed.
DIR_CACHE_MAX_ENTRIES = 512

try:
    from mutagen import File as MutagenFile
    HAS_MUTAGEN = True
except ImportError:
    HAS_MUTAGEN = False

logger = logging.getLogger(__name__)

# Media file extensions
AUDIO_EXTENSIONS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".aac", ".wma", ".opus"}
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".webm", ".flv"}
MEDIA_EXTENSIONS = AUDIO_EXTENSIONS | VIDEO_EXTENSIONS


def _media_type_for_suffix(suffix: str) -> Optional[str]:
    """Return "audio" or "video" for a known media suffix, else None."""
    suffix = suffix.lower()
    if suffix in AUDIO_EXTENSIONS:
        return "audio"
    if suffix in VIDEO_EXTENSIONS:
        return "video"
    return None


def _client_path(relative: Path) -> str:
    """Format a path relative to the folder root as a "/"-rooted string."""
    text = str(relative)
    if text == ".":
        return "/"
    return "/" + text.replace("\\", "/")


def _scan_directory(directory: Path) -> list[dict]:
    """List visible sub-folders and media files of a directory, sorted.

    Entries whose name starts with "." and files that are neither audio nor
    video are skipped. Folders sort before files; names compare
    case-insensitively.
    """
    entries: list[dict] = []
    for child in directory.iterdir():
        if child.name.startswith("."):
            continue
        if child.is_dir():
            file_type = "folder"
        else:
            file_type = _media_type_for_suffix(child.suffix)
            if file_type is None:
                continue
        entries.append({
            "name": child.name,
            "type": file_type,
            "is_media": file_type in ("audio", "video"),
        })
    entries.sort(key=lambda entry: (entry["type"] != "folder", entry["name"].lower()))
    return entries


def _cache_listing(cache_key: str, now: float, listing: list[dict]) -> None:
    """Store a listing in the cache, purging expired and excess entries."""
    # Snapshot the items so deleting while scanning is safe.
    for key, (stored_at, _) in list(_dir_cache.items()):
        if now - stored_at >= DIR_CACHE_TTL:
            _dir_cache.pop(key, None)

    _dir_cache[cache_key] = (now, listing)

    # Evict the oldest listings until the cache is within its bound.
    while len(_dir_cache) > DIR_CACHE_MAX_ENTRIES:
        oldest_key = min(_dir_cache, key=lambda key: _dir_cache[key][0])
        _dir_cache.pop(oldest_key, None)


class BrowserService:
    """Service for browsing media files with path validation."""

    @staticmethod
    def validate_path(folder_id: str, requested_path: str) -> Path:
        """Validate and resolve a path within an allowed folder.

        Args:
            folder_id: ID of the configured media folder.
            requested_path: Path to validate. Leading "/" or "\\" characters
                are stripped, so the path is interpreted relative to the
                folder root.

        Returns:
            Resolved absolute Path object.

        Raises:
            ValueError: If folder_id is unknown or disabled, the folder path
                is not a directory, or the resolved path lies outside the
                folder root.
            FileNotFoundError: If the folder root or the requested path does
                not exist.
        """
        # Get folder config
        if folder_id not in settings.media_folders:
            raise ValueError(f"Media folder '{folder_id}' not configured")

        folder_config = settings.media_folders[folder_id]
        if not folder_config.enabled:
            raise ValueError(f"Media folder '{folder_id}' is disabled")

        # Get base path
        base_path = Path(folder_config.path).resolve()
        if not base_path.exists():
            raise FileNotFoundError(f"Media folder path does not exist: {base_path}")
        if not base_path.is_dir():
            raise ValueError(f"Media folder path is not a directory: {base_path}")

        # A leading slash means "relative to the folder root", not the
        # filesystem root, so strip it before joining.
        requested_path = requested_path.lstrip("/\\")

        # Build and resolve full path
        if requested_path:
            full_path = (base_path / requested_path).resolve()
        else:
            full_path = base_path

        # Security check: the resolved path (after ".." and symlink
        # resolution) must still be inside the folder root.
        try:
            full_path.relative_to(base_path)
        except ValueError:
            logger.warning(
                "Path traversal attempt detected: %s (resolved to %s, base: %s)",
                requested_path,
                full_path,
                base_path,
            )
            raise ValueError("Path traversal not allowed") from None

        # Check if path exists
        if not full_path.exists():
            raise FileNotFoundError(f"Path does not exist: {requested_path}")

        return full_path

    @staticmethod
    def is_media_file(path: Path) -> bool:
        """Check if a file is a media file based on extension.

        Args:
            path: Path to check.

        Returns:
            True if the suffix (case-insensitive) is a known audio or video
            extension, False otherwise.
        """
        return path.suffix.lower() in MEDIA_EXTENSIONS

    @staticmethod
    def get_file_type(path: Path) -> str:
        """Get the file type (folder, audio, video, other).

        Args:
            path: Path to check.

        Returns:
            File type string: "folder", "audio", "video", or "other".
        """
        if path.is_dir():
            return "folder"
        return _media_type_for_suffix(path.suffix) or "other"

    @staticmethod
    def get_media_info(file_path: Path) -> dict:
        """Get duration and bitrate of a media file using mutagen.

        This is best-effort: when mutagen is not installed or the file
        cannot be parsed, the corresponding values stay None.

        Args:
            file_path: Path to the media file.

        Returns:
            Dict with 'duration' (float or None) and 'bitrate' (int or None).
        """
        result = {"duration": None, "bitrate": None}

        if not HAS_MUTAGEN:
            return result

        try:
            audio = MutagenFile(str(file_path))
            if audio is not None and hasattr(audio, "info"):
                if hasattr(audio.info, "length"):
                    result["duration"] = round(audio.info.length, 2)
                if hasattr(audio.info, "bitrate") and audio.info.bitrate:
                    result["bitrate"] = audio.info.bitrate
        except Exception:
            # Deliberately broad: a file that cannot be parsed must not
            # break the listing. Log it so failures remain diagnosable.
            logger.debug("Could not read media info for %s", file_path, exc_info=True)

        return result

    @staticmethod
    def browse_directory(
        folder_id: str,
        path: str = "",
        offset: int = 0,
        limit: int = 100,
        nocache: bool = False,
    ) -> dict:
        """Browse a directory and return items with metadata.

        The sorted name/type listing of a directory is cached for
        DIR_CACHE_TTL seconds. Size, modification time, duration and bitrate
        are read fresh on every call, and only for the items on the
        requested page.

        Args:
            folder_id: ID of the configured media folder.
            path: Path to browse (relative to folder root).
            offset: Pagination offset (default: 0). Negative values are
                treated as 0.
            limit: Maximum items to return (default: 100). Negative values
                are treated as 0.
            nocache: If True, ignore any cached listing and rescan the
                directory (the fresh listing is still cached).

        Returns:
            Dictionary with:
            - current_path: Current path (relative to folder root)
            - parent_path: Parent path (None if at root)
            - items: List of file/folder items
            - total: Total number of items
            - offset: Current offset
            - limit: Current limit
            - folder_id: Folder ID

        Raises:
            ValueError: If path validation fails, the path is not a
                directory, or the directory cannot be read due to
                permissions.
            FileNotFoundError: If path does not exist.
        """
        # Validate path
        full_path = BrowserService.validate_path(folder_id, path)

        # Get base path for relative path calculation
        folder_config = settings.media_folders[folder_id]
        base_path = Path(folder_config.path).resolve()

        # Check if it's a directory
        if not full_path.is_dir():
            raise ValueError(f"Path is not a directory: {path}")

        # Calculate relative path
        try:
            current_path = _client_path(full_path.relative_to(base_path))
        except ValueError:
            current_path = "/"

        # Calculate parent path
        if full_path == base_path:
            parent_path = None
        else:
            parent_path = _client_path(full_path.parent.relative_to(base_path))

        # List directory contents (with caching)
        cache_key = str(full_path)
        now = time.monotonic()

        all_items = None
        if not nocache:
            cached = _dir_cache.get(cache_key)
            if cached is not None:
                cached_time, cached_items = cached
                if now - cached_time < DIR_CACHE_TTL:
                    all_items = cached_items
                else:
                    _dir_cache.pop(cache_key, None)

        # Enumerate directory if not cached
        if all_items is None:
            try:
                all_items = _scan_directory(full_path)
            except PermissionError as err:
                raise ValueError(f"Permission denied accessing path: {path}") from err
            _cache_listing(cache_key, now, all_items)

        # Apply pagination. Negative values would turn into from-the-end
        # slicing and yield arbitrary pages, so clamp them.
        offset = max(offset, 0)
        limit = max(limit, 0)
        total = len(all_items)
        # Copy each entry: the per-request metadata added below must not be
        # written into the dicts that are shared through the cache.
        items = [dict(entry) for entry in all_items[offset:offset + limit]]

        # Fetch stat + duration only for items on the current page
        for item in items:
            item_path = full_path / item["name"]
            try:
                stat = item_path.stat()
                size = stat.st_size if item["type"] != "folder" else None
                modified = datetime.fromtimestamp(stat.st_mtime).isoformat()
            except (OSError, OverflowError, ValueError):
                # Unreadable entry or an mtime outside the representable
                # range: report the entry without stat metadata.
                size = None
                modified = None
            item["size"] = size
            item["modified"] = modified

            if item["is_media"]:
                info = BrowserService.get_media_info(item_path)
                item["duration"] = info["duration"]
                item["bitrate"] = info["bitrate"]
            else:
                item["duration"] = None
                item["bitrate"] = None

        return {
            "folder_id": folder_id,
            "current_path": current_path,
            "parent_path": parent_path,
            "items": items,
            "total": total,
            "offset": offset,
            "limit": limit,
        }