"""Browser service for media file browsing and path validation.""" import logging import os import stat as stat_module import time from datetime import datetime from pathlib import Path from typing import Optional from ..config import settings # Directory listing cache: {resolved_path_str: (timestamp, all_items)} _dir_cache: dict[str, tuple[float, list[dict]]] = {} DIR_CACHE_TTL = 300 # 5 minutes # Media info cache: {(file_path_str, mtime): {duration, bitrate, title}} _media_info_cache: dict[tuple[str, float], dict] = {} _MEDIA_INFO_CACHE_MAX = 5000 try: from mutagen import File as MutagenFile HAS_MUTAGEN = True except ImportError: HAS_MUTAGEN = False logger = logging.getLogger(__name__) # Media file extensions AUDIO_EXTENSIONS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".aac", ".wma", ".opus"} VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".webm", ".flv"} MEDIA_EXTENSIONS = AUDIO_EXTENSIONS | VIDEO_EXTENSIONS class BrowserService: """Service for browsing media files with path validation.""" @staticmethod def validate_path(folder_id: str, requested_path: str) -> Path: """Validate and resolve a path within an allowed folder. Args: folder_id: ID of the configured media folder. requested_path: Path to validate (relative to folder root or absolute). Returns: Resolved absolute Path object. Raises: ValueError: If folder_id invalid or path traversal attempted. FileNotFoundError: If path does not exist. """ # Get folder config if folder_id not in settings.media_folders: raise ValueError(f"Media folder '{folder_id}' not configured") folder_config = settings.media_folders[folder_id] if not folder_config.enabled: raise ValueError(f"Media folder '{folder_id}' is disabled") # Get base path base_path = Path(folder_config.path).resolve() if not base_path.exists(): raise FileNotFoundError(f"Media folder path does not exist: {base_path}") if not base_path.is_dir(): raise ValueError(f"Media folder path is not a directory: {base_path}") # Handle relative vs absolute paths if requested_path.startswith("/") or requested_path.startswith("\\"): # Relative to folder root (remove leading slash) requested_path = requested_path.lstrip("/\\") # Build and resolve full path if requested_path: full_path = (base_path / requested_path).resolve() else: full_path = base_path # Security check: Ensure resolved path is within base path try: full_path.relative_to(base_path) except ValueError: logger.warning( f"Path traversal attempt detected: {requested_path} " f"(resolved to {full_path}, base: {base_path})" ) raise ValueError("Path traversal not allowed") # Check if path exists if not full_path.exists(): raise FileNotFoundError(f"Path does not exist: {requested_path}") return full_path @staticmethod def is_media_file(path: Path) -> bool: """Check if a file is a media file based on extension. Args: path: Path to check. Returns: True if file is a media file, False otherwise. """ return path.suffix.lower() in MEDIA_EXTENSIONS @staticmethod def get_file_type(path: Path) -> str: """Get the file type (folder, audio, video, other). Args: path: Path to check. Returns: File type string: "folder", "audio", "video", or "other". """ if path.is_dir(): return "folder" suffix = path.suffix.lower() if suffix in AUDIO_EXTENSIONS: return "audio" elif suffix in VIDEO_EXTENSIONS: return "video" else: return "other" @staticmethod def get_media_info(file_path: Path, mtime: float | None = None) -> dict: """Get duration, bitrate, and title of a media file (header-only read). Results are cached by (path, mtime) to avoid re-reading unchanged files. Args: file_path: Path to the media file. mtime: File modification time (avoids an extra stat call). Returns: Dict with 'duration' (float or None), 'bitrate' (int or None), and 'title' (str or None). """ result = {"duration": None, "bitrate": None, "title": None} if not HAS_MUTAGEN: return result # Use mtime-based cache to skip mutagen reads for unchanged files if mtime is None: try: mtime = file_path.stat().st_mtime except (OSError, PermissionError): pass if mtime is not None: cache_key = (str(file_path), mtime) cached = _media_info_cache.get(cache_key) if cached is not None: return cached try: audio = MutagenFile(str(file_path), easy=True) if audio is not None and hasattr(audio, "info"): if hasattr(audio.info, "length"): result["duration"] = round(audio.info.length, 2) if hasattr(audio.info, "bitrate") and audio.info.bitrate: result["bitrate"] = audio.info.bitrate if audio is not None and hasattr(audio, "tags") and audio.tags: tags = audio.tags title = None artist = None if "title" in tags: title = tags["title"][0] if isinstance(tags["title"], list) else tags["title"] if "artist" in tags: artist = tags["artist"][0] if isinstance(tags["artist"], list) else tags["artist"] if artist and title: result["title"] = f"{artist} \u2013 {title}" elif title: result["title"] = title except Exception: pass # Cache result (evict oldest entries if cache is full) if mtime is not None: if len(_media_info_cache) >= _MEDIA_INFO_CACHE_MAX: # Remove oldest ~20% of entries to_remove = list(_media_info_cache.keys())[:_MEDIA_INFO_CACHE_MAX // 5] for k in to_remove: del _media_info_cache[k] _media_info_cache[(str(file_path), mtime)] = result return result @staticmethod def browse_directory( folder_id: str, path: str = "", offset: int = 0, limit: int = 100, nocache: bool = False, ) -> dict: """Browse a directory and return items with metadata. Args: folder_id: ID of the configured media folder. path: Path to browse (relative to folder root). offset: Pagination offset (default: 0). limit: Maximum items to return (default: 100). Returns: Dictionary with: - current_path: Current path (relative to folder root) - parent_path: Parent path (None if at root) - items: List of file/folder items - total: Total number of items - offset: Current offset - limit: Current limit - folder_id: Folder ID Raises: ValueError: If path validation fails. FileNotFoundError: If path does not exist. """ # Validate path full_path = BrowserService.validate_path(folder_id, path) # Get base path for relative path calculation folder_config = settings.media_folders[folder_id] base_path = Path(folder_config.path).resolve() # Check if it's a directory if not full_path.is_dir(): raise ValueError(f"Path is not a directory: {path}") # Calculate relative path try: relative_path = full_path.relative_to(base_path) current_path = "/" + str(relative_path).replace("\\", "/") if str(relative_path) != "." else "/" except ValueError: current_path = "/" # Calculate parent path if full_path == base_path: parent_path = None else: parent_relative = full_path.parent.relative_to(base_path) parent_path = "/" + str(parent_relative).replace("\\", "/") if str(parent_relative) != "." else "/" # List directory contents (with caching) try: cache_key = str(full_path) now = time.monotonic() # Check cache if not nocache and cache_key in _dir_cache: cached_time, cached_items = _dir_cache[cache_key] if now - cached_time < DIR_CACHE_TTL: all_items = cached_items else: del _dir_cache[cache_key] all_items = None else: all_items = None # Enumerate directory if not cached if all_items is None: all_items = [] for item in full_path.iterdir(): if item.name.startswith("."): continue # Single stat() call per item — reuse for type check and metadata try: st = item.stat() except (OSError, PermissionError): continue if stat_module.S_ISDIR(st.st_mode): file_type = "folder" else: suffix = item.suffix.lower() if suffix in AUDIO_EXTENSIONS: file_type = "audio" elif suffix in VIDEO_EXTENSIONS: file_type = "video" else: continue all_items.append({ "name": item.name, "type": file_type, "is_media": file_type in ("audio", "video"), "_size": st.st_size, "_mtime": st.st_mtime, }) all_items.sort(key=lambda x: (x["type"] != "folder", x["name"].lower())) _dir_cache[cache_key] = (now, all_items) # Apply pagination total = len(all_items) items = all_items[offset:offset + limit] # Enrich items on the current page with metadata for item in items: item["size"] = item["_size"] if item["type"] != "folder" else None item["modified"] = datetime.fromtimestamp(item["_mtime"]).isoformat() if item["is_media"]: item_path = full_path / item["name"] info = BrowserService.get_media_info(item_path, item["_mtime"]) item["duration"] = info["duration"] item["bitrate"] = info["bitrate"] item["title"] = info["title"] else: item["duration"] = None item["bitrate"] = None item["title"] = None return { "folder_id": folder_id, "current_path": current_path, "parent_path": parent_path, "items": items, "total": total, "offset": offset, "limit": limit, } except PermissionError: raise ValueError(f"Permission denied accessing path: {path}")