feat: Android TV app embedding Python server via Chaquopy
Lint & Test / test (push) Successful in 2m10s

Adds a native Android TV application that runs the full LedGrab Python
server in-process via Chaquopy. Captures the TV box screen using the
MediaProjection API and exposes the existing web UI on the device's
local network — users configure via phone/tablet browser.

Android (new /android/ module):
- Kotlin shell: MainActivity, CaptureService (foreground service),
  ScreenCapture (MediaProjection + ImageReader), PythonBridge (Chaquopy).
- Polished Leanback-themed UI with QR code for easy web UI access.
- AGP 8.9 + Chaquopy 17 + Gradle 8.11 (avoids the AGP 8.7 thread-lock bug).
- Pre-built pydantic-core wheels for arm64-v8a, x86_64, x86 cross-compiled
  with maturin + Android NDK, linked against Chaquopy's libpython3.11.so.

Python server platform guards:
- New utils/platform.py with is_android()/is_windows()/is_linux() helpers.
- Guard every top-level import of desktop-only packages (mss, psutil,
  sounddevice, pyserial, PyAudioWPatch, etc.) with try/except ImportError.
- Android-incompatible calls gated with None-checks so the server runs on
  reduced capabilities on Android (no CPU/RAM metrics, no mss displays).
- utils/image_codec.py gains a Pillow fallback for resize + JPEG encode
  when cv2 is unavailable; all internal cv2.resize callers migrated.
- New android_entry.py start_server/stop_server invoked from Kotlin.
- get_displays API falls back to best available engine when mss fails.

New capture engines:
- MediaProjectionEngine: receives RGBA frames pushed from Kotlin through
  a thread-safe queue; caches last frame for static-screen previews.
- ScrcpyClientEngine: optional H.264 streaming via scrcpy-client library
  (priority 10, overrides the ADB-screencap engine when installed).

Frontend:
- Tab loaders previously required an apiKey; now correctly treat
  "auth disabled" as authenticated (Android has no auth by default).
- Re-trigger the active tab's loader after loadServerInfo resolves
  authRequired, since initTabs runs earlier.
- Add i18n keys for the demo / mediaprojection / scrcpy_client engines.

Docs:
- TODO.md: follow-ups for multi-ABI wheel rebuilds, CI pipeline, USB
  serial LED controllers, root-only capture, perf metrics abstraction.
- CLAUDE.md: Android dependency sync policy (pip --exclude doesn't exist).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-14 03:11:43 +03:00
parent a0b65e3fcb
commit 8574424fb7
56 changed files with 2443 additions and 126 deletions
+6 -2
View File
@@ -2,11 +2,15 @@
from importlib.metadata import version, PackageNotFoundError
# Fallback version — kept in sync with pyproject.toml.
# Used when the package isn't pip-installed (e.g. embedded via Chaquopy
# on Android, where the source is included directly via source sets).
_FALLBACK_VERSION = "0.3.0"
try:
__version__ = version("ledgrab")
except PackageNotFoundError:
# Running from source without pip install (e.g. dev, embedded Python)
__version__ = "0.0.0-dev"
__version__ = _FALLBACK_VERSION
__author__ = "Alexei Dolgolyov"
__email__ = "dolgolyov.alexei@gmail.com"
+84
View File
@@ -0,0 +1,84 @@
"""Android entry point for LedGrab.
Called from Kotlin via Chaquopy to start/stop the FastAPI server
inside an Android application. Sets up Android-specific paths
(app-private storage) before importing the main application.
"""
import asyncio
import os
import threading
from typing import Optional
_server_thread: Optional[threading.Thread] = None
_shutdown_event: Optional[asyncio.Event] = None
_loop: Optional[asyncio.AbstractEventLoop] = None
def start_server(data_dir: str, port: int = 8080) -> None:
"""Start the LedGrab uvicorn server.
Called from Kotlin's ``PythonBridge.startServer()``. This function
blocks until ``stop_server()`` is called, so Kotlin should invoke it
on a background thread.
Args:
data_dir: Android app-private files directory
(e.g. ``/data/data/com.ledgrab.android/files``).
port: HTTP port for the web UI / API.
"""
# ── Configure paths before any LedGrab imports ──────────────
os.makedirs(os.path.join(data_dir, "data"), exist_ok=True)
os.makedirs(os.path.join(data_dir, "data", "assets"), exist_ok=True)
os.makedirs(os.path.join(data_dir, "config"), exist_ok=True)
# Change working directory to app-private storage so relative
# paths (e.g. "config/default_config.yaml") don't hit system dirs
os.chdir(data_dir)
os.environ["LEDGRAB_STORAGE__DATABASE_FILE"] = os.path.join(data_dir, "data", "ledgrab.db")
os.environ["LEDGRAB_ASSETS__ASSETS_DIR"] = os.path.join(data_dir, "data", "assets")
os.environ["LEDGRAB_SERVER__HOST"] = "0.0.0.0"
os.environ["LEDGRAB_SERVER__PORT"] = str(port)
# ── Now safe to import LedGrab ──────────────────────────────
import uvicorn # noqa: E402
from ledgrab.utils import setup_logging, get_logger # noqa: E402
setup_logging()
logger = get_logger(__name__)
logger.info("LedGrab Android: starting server on port %d", port)
logger.info("Data directory: %s", data_dir)
from ledgrab.config import get_config # noqa: E402
config = get_config()
uv_config = uvicorn.Config(
"ledgrab.main:app",
host=config.server.host,
port=port,
log_level=config.server.log_level.lower(),
# No uvloop/httptools on Android — use pure-Python asyncio
loop="asyncio",
)
server = uvicorn.Server(uv_config)
global _shutdown_event, _loop
_loop = asyncio.new_event_loop()
asyncio.set_event_loop(_loop)
_shutdown_event = asyncio.Event()
logger.info("LedGrab Android: server starting")
_loop.run_until_complete(server.serve())
logger.info("LedGrab Android: server stopped")
def stop_server() -> None:
"""Signal the uvicorn server to shut down gracefully.
Called from Kotlin's ``PythonBridge.stopServer()``.
"""
if _shutdown_event is not None and _loop is not None:
_loop.call_soon_threadsafe(_shutdown_event.set)
@@ -1643,8 +1643,7 @@ async def test_color_strip_ws(
try:
frame = _frame_live.get_latest_frame()
if frame is not None and frame.image is not None:
from ledgrab.utils.image_codec import encode_jpeg
import cv2 as _cv2
from ledgrab.utils.image_codec import encode_jpeg, resize_image
img = frame.image
# Ensure 3-channel RGB (some engines may produce BGRA)
@@ -1668,9 +1667,7 @@ async def test_color_strip_ws(
if scale < 1.0:
new_w = max(1, int(w * scale))
new_h = max(1, int(h * scale))
img = _cv2.resize(
img, (new_w, new_h), interpolation=_cv2.INTER_AREA
)
img = resize_image(img, new_w, new_h)
# Wire format: [0xFD] [jpeg_bytes]
await websocket.send_bytes(b"\xfd" + encode_jpeg(img, quality=70))
except Exception as e:
+38 -5
View File
@@ -12,7 +12,11 @@ from typing import Optional
import os
import psutil
try:
import psutil
except ImportError:
psutil = None # type: ignore[assignment]
from fastapi import APIRouter, Depends, HTTPException, Query
from ledgrab import __version__, REPO_URL, DONATE_URL
@@ -54,9 +58,12 @@ from ledgrab.api.routes.system_settings import load_external_url # noqa: F401
logger = get_logger(__name__)
# Prime psutil CPU counters (first call always returns 0.0)
psutil.cpu_percent(interval=None)
_process = psutil.Process(os.getpid())
_process.cpu_percent(interval=None) # prime process-level counter
if psutil is not None:
psutil.cpu_percent(interval=None)
_process = psutil.Process(os.getpid())
_process.cpu_percent(interval=None) # prime process-level counter
else:
_process = None # type: ignore[assignment]
# GPU monitoring (initialized once in utils.gpu, shared with metrics_history)
from ledgrab.utils.gpu import ( # noqa: E402
@@ -200,7 +207,18 @@ async def get_displays(
else:
display_dataclasses = await asyncio.to_thread(get_available_displays)
else:
display_dataclasses = await asyncio.to_thread(get_available_displays)
# Try the mss-based detection first (desktop default).
# If mss is unavailable (e.g. on Android), fall back to the
# best registered engine that has its own display list.
try:
display_dataclasses = await asyncio.to_thread(get_available_displays)
except RuntimeError:
display_dataclasses = []
if not display_dataclasses:
best = EngineRegistry.get_best_available_engine()
if best:
engine_cls = EngineRegistry.get_engine(best)
display_dataclasses = await asyncio.to_thread(engine_cls.get_available_displays)
# Convert dataclass DisplayInfo to Pydantic DisplayInfo
displays = [
@@ -264,6 +282,21 @@ def get_system_performance(_: AuthRequired):
and NVML calls are blocking and would stall the event loop if run
in an ``async def`` handler.
"""
if psutil is None or _process is None:
# psutil unavailable on this platform (e.g. Android)
from datetime import datetime, timezone
return PerformanceResponse(
timestamp=datetime.now(timezone.utc),
cpu_name=_cpu_name,
cpu_percent=0.0,
ram_used_mb=0.0,
ram_total_mb=0.0,
ram_percent=0.0,
app_cpu_percent=0.0,
app_ram_mb=0.0,
gpu=None,
)
mem = psutil.virtual_memory()
# App-level metrics
+7 -4
View File
@@ -169,10 +169,13 @@ class Config(BaseSettings):
if demo_path.exists():
return cls.from_yaml(demo_path)
# Try default location
default_path = Path("config/default_config.yaml")
if default_path.exists():
return cls.from_yaml(default_path)
# Try default location (guard against permission errors on Android)
try:
default_path = Path("config/default_config.yaml")
if default_path.exists():
return cls.from_yaml(default_path)
except (PermissionError, OSError):
pass
# Use defaults
return cls()
+35 -9
View File
@@ -1,4 +1,9 @@
"""Audio capture engine abstraction layer."""
"""Audio capture engine abstraction layer.
Audio engines with native dependencies (WASAPI, PortAudio) are imported
inside try/except blocks so the package loads cleanly on platforms where
those libraries are unavailable (e.g. Android via Chaquopy).
"""
from ledgrab.core.audio.base import (
AudioCaptureEngine,
@@ -13,13 +18,33 @@ from ledgrab.core.audio.analysis import (
DEFAULT_SAMPLE_RATE,
DEFAULT_CHUNK_SIZE,
)
from ledgrab.core.audio.wasapi_engine import WasapiEngine, WasapiCaptureStream
from ledgrab.core.audio.sounddevice_engine import SounddeviceEngine, SounddeviceCaptureStream
# ── Platform-specific audio engines ─────────────────────────────────
try:
from ledgrab.core.audio.wasapi_engine import WasapiEngine, WasapiCaptureStream
_has_wasapi = True
except ImportError:
_has_wasapi = False
try:
from ledgrab.core.audio.sounddevice_engine import (
SounddeviceEngine,
SounddeviceCaptureStream,
)
_has_sounddevice = True
except ImportError:
_has_sounddevice = False
from ledgrab.core.audio.demo_engine import DemoAudioEngine, DemoAudioCaptureStream
# Auto-register available engines
AudioEngineRegistry.register(WasapiEngine)
AudioEngineRegistry.register(SounddeviceEngine)
if _has_wasapi:
AudioEngineRegistry.register(WasapiEngine)
if _has_sounddevice:
AudioEngineRegistry.register(SounddeviceEngine)
AudioEngineRegistry.register(DemoAudioEngine)
__all__ = [
@@ -32,10 +57,11 @@ __all__ = [
"NUM_BANDS",
"DEFAULT_SAMPLE_RATE",
"DEFAULT_CHUNK_SIZE",
"WasapiEngine",
"WasapiCaptureStream",
"SounddeviceEngine",
"SounddeviceCaptureStream",
"DemoAudioEngine",
"DemoAudioCaptureStream",
]
if _has_wasapi:
__all__ += ["WasapiEngine", "WasapiCaptureStream"]
if _has_sounddevice:
__all__ += ["SounddeviceEngine", "SounddeviceCaptureStream"]
@@ -6,7 +6,6 @@ Non-Windows: graceful degradation (returns empty results).
import asyncio
import ctypes
import ctypes.wintypes
import os
import sys
import threading
@@ -18,6 +17,9 @@ logger = get_logger(__name__)
_IS_WINDOWS = sys.platform == "win32"
if _IS_WINDOWS:
import ctypes.wintypes
class PlatformDetector:
"""Detect running processes and the foreground window's process."""
@@ -3,9 +3,13 @@
from dataclasses import dataclass
from typing import List
import mss
import numpy as np
try:
import mss
except ImportError:
mss = None # type: ignore[assignment]
from ledgrab.utils import get_logger, get_monitor_names, get_monitor_refresh_rates
logger = get_logger(__name__)
@@ -54,6 +58,8 @@ def get_available_displays() -> List[DisplayInfo]:
Raises:
RuntimeError: If unable to detect displays
"""
if mss is None:
return []
try:
# Get friendly monitor names (Windows only, falls back to generic names)
monitor_names = get_monitor_names()
@@ -105,6 +111,8 @@ def capture_display(display_index: int = 0) -> ScreenCapture:
ValueError: If display_index is invalid
RuntimeError: If screen capture fails
"""
if mss is None:
raise RuntimeError("mss library not available on this platform")
try:
with mss.mss() as sct:
# mss monitors[0] is the combined screen, monitors[1+] are individual displays
@@ -1,4 +1,9 @@
"""Screen capture engine abstraction layer."""
"""Screen capture engine abstraction layer.
Engines with native/platform-specific dependencies are imported inside
try/except blocks so the package loads cleanly on any platform (including
Android via Chaquopy where desktop capture libraries are unavailable).
"""
from ledgrab.core.capture_engines.base import (
CaptureEngine,
@@ -7,14 +12,61 @@ from ledgrab.core.capture_engines.base import (
ScreenCapture,
)
from ledgrab.core.capture_engines.factory import EngineRegistry
from ledgrab.core.capture_engines.mss_engine import MSSEngine, MSSCaptureStream
from ledgrab.core.capture_engines.dxcam_engine import DXcamEngine, DXcamCaptureStream
from ledgrab.core.capture_engines.bettercam_engine import BetterCamEngine, BetterCamCaptureStream
from ledgrab.core.capture_engines.wgc_engine import WGCEngine, WGCCaptureStream
from ledgrab.core.capture_engines.scrcpy_engine import ScrcpyEngine, ScrcpyCaptureStream
from ledgrab.core.capture_engines.demo_engine import DemoCaptureEngine, DemoCaptureStream
# Camera engine requires OpenCV — optional dependency
# ── Desktop capture engines (platform-specific native deps) ──────────
try:
from ledgrab.core.capture_engines.mss_engine import MSSEngine, MSSCaptureStream
_has_mss = True
except ImportError:
_has_mss = False
try:
from ledgrab.core.capture_engines.dxcam_engine import DXcamEngine, DXcamCaptureStream
_has_dxcam = True
except ImportError:
_has_dxcam = False
try:
from ledgrab.core.capture_engines.bettercam_engine import (
BetterCamEngine,
BetterCamCaptureStream,
)
_has_bettercam = True
except ImportError:
_has_bettercam = False
try:
from ledgrab.core.capture_engines.wgc_engine import WGCEngine, WGCCaptureStream
_has_wgc = True
except ImportError:
_has_wgc = False
# ── ADB-based Android capture ───────────────────────────────────────
try:
from ledgrab.core.capture_engines.scrcpy_engine import ScrcpyEngine, ScrcpyCaptureStream
_has_scrcpy = True
except ImportError:
_has_scrcpy = False
try:
from ledgrab.core.capture_engines.scrcpy_client_engine import (
ScrcpyClientEngine,
ScrcpyClientCaptureStream,
)
_has_scrcpy_client = True
except ImportError:
_has_scrcpy_client = False
# ── Camera (OpenCV) ─────────────────────────────────────────────────
try:
from ledgrab.core.capture_engines.camera_engine import CameraEngine, CameraCaptureStream
@@ -22,35 +74,67 @@ try:
except ImportError:
_has_camera = False
# Auto-register available engines
EngineRegistry.register(MSSEngine)
EngineRegistry.register(DXcamEngine)
EngineRegistry.register(BetterCamEngine)
EngineRegistry.register(WGCEngine)
EngineRegistry.register(ScrcpyEngine)
# ── Android MediaProjection (Chaquopy bridge) ───────────────────────
try:
from ledgrab.core.capture_engines.mediaprojection_engine import (
MediaProjectionEngine,
MediaProjectionCaptureStream,
)
_has_mediaprojection = True
except ImportError:
_has_mediaprojection = False
# ── Demo / always available ─────────────────────────────────────────
from ledgrab.core.capture_engines.demo_engine import DemoCaptureEngine, DemoCaptureStream
# ── Auto-register available engines ─────────────────────────────────
if _has_mss:
EngineRegistry.register(MSSEngine)
if _has_dxcam:
EngineRegistry.register(DXcamEngine)
if _has_bettercam:
EngineRegistry.register(BetterCamEngine)
if _has_wgc:
EngineRegistry.register(WGCEngine)
if _has_scrcpy:
EngineRegistry.register(ScrcpyEngine)
if _has_scrcpy_client:
EngineRegistry.register(ScrcpyClientEngine)
if _has_camera:
EngineRegistry.register(CameraEngine)
if _has_mediaprojection:
EngineRegistry.register(MediaProjectionEngine)
EngineRegistry.register(DemoCaptureEngine)
# ── Public API ──────────────────────────────────────────────────────
__all__ = [
"CaptureEngine",
"CaptureStream",
"DisplayInfo",
"ScreenCapture",
"EngineRegistry",
"MSSEngine",
"MSSCaptureStream",
"DXcamEngine",
"DXcamCaptureStream",
"BetterCamEngine",
"BetterCamCaptureStream",
"WGCEngine",
"WGCCaptureStream",
"ScrcpyEngine",
"ScrcpyCaptureStream",
"DemoCaptureEngine",
"DemoCaptureStream",
]
if _has_mss:
__all__ += ["MSSEngine", "MSSCaptureStream"]
if _has_dxcam:
__all__ += ["DXcamEngine", "DXcamCaptureStream"]
if _has_bettercam:
__all__ += ["BetterCamEngine", "BetterCamCaptureStream"]
if _has_wgc:
__all__ += ["WGCEngine", "WGCCaptureStream"]
if _has_scrcpy:
__all__ += ["ScrcpyEngine", "ScrcpyCaptureStream"]
if _has_scrcpy_client:
__all__ += ["ScrcpyClientEngine", "ScrcpyClientCaptureStream"]
if _has_camera:
__all__ += ["CameraEngine", "CameraCaptureStream"]
if _has_mediaprojection:
__all__ += ["MediaProjectionEngine", "MediaProjectionCaptureStream"]
@@ -0,0 +1,201 @@
"""Android MediaProjection capture engine.
Receives screen frames pushed from Kotlin (via Chaquopy) through a
module-level frame queue. The Kotlin layer captures the screen using
the ``MediaProjection`` API and calls :func:`push_frame` with raw RGBA
bytes for each frame.
This engine is only available when running inside an Android app that
has set up the frame queue.
"""
import queue
from typing import Any, Dict, List, Optional
import numpy as np
from ledgrab.core.capture_engines.base import (
CaptureEngine,
CaptureStream,
DisplayInfo,
ScreenCapture,
)
from ledgrab.utils import get_logger
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Frame queue — the bridge between Kotlin and Python
# ---------------------------------------------------------------------------
_frame_queue: queue.Queue["ScreenCapture"] = queue.Queue(maxsize=2)
_display_info: Optional[DisplayInfo] = None
_active = False
_frames_received = 0
_frames_consumed = 0
# MediaProjection only fires onImageAvailable when the screen changes.
# Cache the last frame so preview stays usable on static screens.
_last_frame: Optional["ScreenCapture"] = None
def configure(width: int, height: int) -> None:
"""Set display dimensions. Called from Kotlin before server start.
Drains any stale frames from a previous capture session so the
first frame after restart is actually current.
"""
global _display_info, _active, _last_frame, _frames_received
# Drain the queue — frames from a previous capture session are stale
while not _frame_queue.empty():
try:
_frame_queue.get_nowait()
except queue.Empty:
break
_last_frame = None
_frames_received = 0
_display_info = DisplayInfo(
index=0,
name="Android TV Screen",
width=width,
height=height,
x=0,
y=0,
is_primary=True,
refresh_rate=60,
)
_active = True
logger.info("MediaProjection engine configured: %dx%d", width, height)
def push_frame(rgba_bytes: bytes, width: int, height: int) -> None:
"""Push a captured frame from Kotlin into the Python pipeline.
Called from Kotlin's ``PythonBridge.pushFrame()`` on the capture
thread. The RGBA byte buffer is converted to an RGB NumPy array
and placed on the queue. If the queue is full (Python consumer is
slow), the oldest frame is dropped.
Args:
rgba_bytes: Raw RGBA pixel data (width * height * 4 bytes).
width: Frame width in pixels.
height: Frame height in pixels.
"""
global _frames_received
_frames_received += 1
if _frames_received == 1 or _frames_received % 100 == 0:
logger.info("MediaProjection: received %d frames", _frames_received)
# RGBA → RGB (drop alpha channel)
rgba = np.frombuffer(rgba_bytes, dtype=np.uint8).reshape((height, width, 4))
rgb = rgba[:, :, :3].copy()
frame = ScreenCapture(
image=rgb,
width=width,
height=height,
display_index=0,
)
global _last_frame
_last_frame = frame
# Drop oldest frame if queue is full (non-blocking)
try:
_frame_queue.put_nowait(frame)
except queue.Full:
try:
_frame_queue.get_nowait()
except queue.Empty:
pass
try:
_frame_queue.put_nowait(frame)
except queue.Full:
pass
def shutdown() -> None:
"""Deactivate the engine. Called when the Android app stops."""
global _active
_active = False
# ---------------------------------------------------------------------------
# CaptureStream
# ---------------------------------------------------------------------------
class MediaProjectionCaptureStream(CaptureStream):
"""Reads frames pushed by Kotlin from the module-level queue."""
def __init__(self, display_index: int, config: Dict[str, Any]):
super().__init__(display_index, config)
def initialize(self) -> None:
if self._initialized:
return
if not _active:
raise RuntimeError(
"MediaProjection engine not configured. "
"This engine is only available inside the Android app."
)
self._initialized = True
logger.info("MediaProjection capture stream initialized")
def capture_frame(self) -> Optional[ScreenCapture]:
if not self._initialized:
self.initialize()
# Prefer fresh frames from the queue; fall back to the last
# received frame when the screen is static (MediaProjection
# only emits frames on actual content changes).
try:
return _frame_queue.get(timeout=0.1)
except queue.Empty:
return _last_frame
def cleanup(self) -> None:
# Drain the queue
while not _frame_queue.empty():
try:
_frame_queue.get_nowait()
except queue.Empty:
break
self._initialized = False
logger.info("MediaProjection capture stream cleaned up")
# ---------------------------------------------------------------------------
# CaptureEngine
# ---------------------------------------------------------------------------
class MediaProjectionEngine(CaptureEngine):
"""Android MediaProjection capture engine.
Only available when running inside the LedGrab Android app.
The Kotlin layer calls :func:`configure` at startup and
:func:`push_frame` for each captured frame.
"""
ENGINE_TYPE = "mediaprojection"
ENGINE_PRIORITY = 100 # Highest priority on Android
HAS_OWN_DISPLAYS = True
@classmethod
def is_available(cls) -> bool:
return _active and _display_info is not None
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {}
@classmethod
def get_available_displays(cls) -> List[DisplayInfo]:
if _display_info is not None:
return [_display_info]
return []
@classmethod
def create_stream(
cls, display_index: int, config: Dict[str, Any]
) -> MediaProjectionCaptureStream:
return MediaProjectionCaptureStream(display_index, config)
@@ -0,0 +1,279 @@
"""High-performance Android screen capture via the scrcpy protocol.
Uses the ``scrcpy-client`` library to stream H.264/H.265 video from an
Android device over ADB. Frames are decoded by PyAV and delivered as
NumPy arrays at up to 60 FPS — a major upgrade over the ``adb screencap``
polling approach in :mod:`scrcpy_engine` (~1-2 FPS).
Prerequisites (pip packages):
- scrcpy-client>=0.5.0 (pulls in ``av``, ``adbutils``, ``numpy``)
The library pushes a small (~35 KB) ``scrcpy-server.jar`` to the device,
launches it via ``app_process``, and opens an ADB socket for the encoded
video stream. No APK installation, no root.
"""
import threading
from typing import Any, Dict, List, Optional
import numpy as np
from ledgrab.core.capture_engines.base import (
CaptureEngine,
CaptureStream,
DisplayInfo,
ScreenCapture,
)
from ledgrab.utils import get_logger
logger = get_logger(__name__)
try:
import scrcpy
from adbutils import adb as adb_client
_HAS_SCRCPY_CLIENT = True
except ImportError:
_HAS_SCRCPY_CLIENT = False
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _list_devices() -> List[Dict[str, Any]]:
"""Enumerate connected ADB devices via adbutils."""
if not _HAS_SCRCPY_CLIENT:
return []
devices = []
try:
for dev in adb_client.device_list():
serial = dev.serial
# Query model name
try:
model = dev.prop.model or serial
except Exception:
model = serial
# Query screen resolution
width, height = 1920, 1080
try:
wm_output = dev.shell("wm size")
for line in wm_output.strip().splitlines():
if "x" in line:
parts = line.split()[-1].split("x")
width, height = int(parts[0]), int(parts[1])
break
except Exception:
pass
devices.append(
{
"serial": serial,
"model": model,
"width": width,
"height": height,
}
)
except Exception as e:
logger.debug("Failed to enumerate ADB devices: %s", e)
return devices
# ---------------------------------------------------------------------------
# CaptureStream
# ---------------------------------------------------------------------------
class ScrcpyClientCaptureStream(CaptureStream):
"""H.264/H.265 video stream from an Android device via scrcpy protocol.
The ``scrcpy.Client`` runs in a background thread. On each decoded
frame it invokes our callback, which stores the latest frame.
``capture_frame()`` returns it without blocking.
"""
def __init__(self, display_index: int, config: Dict[str, Any]):
super().__init__(display_index, config)
self._client: Optional["scrcpy.Client"] = None
self._latest_frame: Optional[ScreenCapture] = None
self._frame_lock = threading.Lock()
self._frame_event = threading.Event()
self._client_thread: Optional[threading.Thread] = None
self._device_serial: Optional[str] = None
def initialize(self) -> None:
if self._initialized:
return
devices = _list_devices()
if not devices:
raise RuntimeError(
"No ADB devices found. Connect a device via USB or " "'adb connect <ip>' for WiFi."
)
if self.display_index >= len(devices):
raise RuntimeError(
f"Device index {self.display_index} out of range "
f"(found {len(devices)} device(s))"
)
device = devices[self.display_index]
self._device_serial = device["serial"]
max_fps = self.config.get("max_fps", 30)
max_size = self.config.get("max_size", 480)
bitrate = self.config.get("bitrate", 2_000_000)
logger.info(
"scrcpy-client: connecting to %s (%s, %dx%d) — " "max_fps=%d, max_size=%d, bitrate=%d",
self._device_serial,
device["model"],
device["width"],
device["height"],
max_fps,
max_size,
bitrate,
)
self._client = scrcpy.Client(
device=self._device_serial,
max_fps=max_fps,
max_size=max_size,
bitrate=bitrate,
)
self._client.add_listener(scrcpy.EVENT_FRAME, self._on_frame)
# scrcpy.Client.start() blocks, so run in a thread
self._client_thread = threading.Thread(
target=self._run_client, daemon=True, name="scrcpy-client"
)
self._client_thread.start()
# Wait for first frame (with timeout)
if not self._frame_event.wait(timeout=10.0):
logger.warning(
"scrcpy-client: no frame received within 10s from %s",
self._device_serial,
)
self._initialized = True
logger.info("scrcpy-client: stream initialized for %s", self._device_serial)
def _run_client(self) -> None:
"""Start the scrcpy client (blocking)."""
try:
self._client.start(threaded=False)
except Exception as e:
logger.error("scrcpy-client: error for %s: %s", self._device_serial, e)
def _on_frame(self, frame: np.ndarray) -> None:
"""Callback invoked by scrcpy-client for each decoded frame.
The frame arrives as a BGR numpy array; we convert to RGB to
match the ``ScreenCapture`` convention.
"""
if frame is None:
return
# BGR → RGB
rgb = frame[:, :, ::-1].copy()
h, w = rgb.shape[:2]
with self._frame_lock:
self._latest_frame = ScreenCapture(
image=rgb,
width=w,
height=h,
display_index=self.display_index,
)
self._frame_event.set()
def capture_frame(self) -> Optional[ScreenCapture]:
if not self._initialized:
self.initialize()
if not self._frame_event.is_set():
return None
with self._frame_lock:
frame = self._latest_frame
self._frame_event.clear()
return frame
def cleanup(self) -> None:
if self._client is not None:
try:
self._client.stop()
except Exception as e:
logger.debug("scrcpy-client: stop error: %s", e)
if self._client_thread is not None and self._client_thread.is_alive():
self._client_thread.join(timeout=5)
self._client = None
self._client_thread = None
self._latest_frame = None
self._frame_event.clear()
self._initialized = False
logger.info("scrcpy-client: stream cleaned up (%s)", self._device_serial)
# ---------------------------------------------------------------------------
# CaptureEngine
# ---------------------------------------------------------------------------
class ScrcpyClientEngine(CaptureEngine):
"""High-performance Android capture via scrcpy H.264 streaming.
Requires the ``scrcpy-client`` pip package (optional dependency).
When available, this engine is preferred over :class:`ScrcpyEngine`
(which falls back to ``adb screencap`` at ~1-2 FPS).
Prerequisites:
- ``pip install scrcpy-client`` (pulls in PyAV, adbutils)
- adb on PATH (for device connection)
- USB debugging enabled on Android device
"""
ENGINE_TYPE = "scrcpy_client"
ENGINE_PRIORITY = 10 # Higher than ScrcpyEngine (5)
HAS_OWN_DISPLAYS = True
@classmethod
def is_available(cls) -> bool:
return _HAS_SCRCPY_CLIENT
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {
"max_fps": 30,
"max_size": 480,
"bitrate": 2_000_000,
}
@classmethod
def get_available_displays(cls) -> List[DisplayInfo]:
devices = _list_devices()
displays = []
for idx, device in enumerate(devices):
displays.append(
DisplayInfo(
index=idx,
name=f"{device['model']} ({device['serial']})",
width=device["width"],
height=device["height"],
x=idx * 500,
y=0,
is_primary=(idx == 0),
refresh_rate=60,
)
)
logger.debug("scrcpy-client: detected %d Android device(s)", len(displays))
return displays
@classmethod
def create_stream(cls, display_index: int, config: Dict[str, Any]) -> ScrcpyClientCaptureStream:
return ScrcpyClientCaptureStream(display_index, config)
@@ -2,10 +2,10 @@
from typing import List, Optional
import cv2
import numpy as np
from ledgrab.core.filters.base import FilterOptionDef, PostprocessingFilter
from ledgrab.utils.image_codec import resize_image
from ledgrab.core.filters.image_pool import ImagePool
from ledgrab.core.filters.registry import FilterRegistry
@@ -44,7 +44,7 @@ class DownscalerFilter(PostprocessingFilter):
if new_h == h and new_w == w:
return None
downscaled = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
downscaled = resize_image(image, new_w, new_h)
result = image_pool.acquire(new_h, new_w, image.shape[2] if image.ndim == 3 else 3)
np.copyto(result, downscaled)
+3 -3
View File
@@ -2,10 +2,10 @@
from typing import List, Optional
import cv2
import numpy as np
from ledgrab.core.filters.base import FilterOptionDef, PostprocessingFilter
from ledgrab.utils.image_codec import resize_image
from ledgrab.core.filters.image_pool import ImagePool
from ledgrab.core.filters.registry import FilterRegistry
@@ -42,8 +42,8 @@ class PixelateFilter(PostprocessingFilter):
# vectorized C++ instead of per-block Python loop
small_w = max(1, w // block_size)
small_h = max(1, h // block_size)
small = cv2.resize(image, (small_w, small_h), interpolation=cv2.INTER_AREA)
pixelated = cv2.resize(small, (w, h), interpolation=cv2.INTER_NEAREST)
small = resize_image(image, small_w, small_h)
pixelated = resize_image(small, w, h)
np.copyto(image, pixelated)
return None
@@ -10,9 +10,10 @@ import threading
import time
from typing import TYPE_CHECKING, List, Optional
import cv2
import numpy as np
from ledgrab.utils.image_codec import resize_image
from ledgrab.core.capture.screen_capture import (
calculate_average_color,
calculate_dominant_color,
@@ -150,7 +151,7 @@ class KeyColorsColorStripStream(ColorStripStream):
calc_fn = _CALC_FNS.get(src.interpolation_mode, calculate_average_color)
# Downsample
small = cv2.resize(capture.image, KC_WORK_SIZE, interpolation=cv2.INTER_AREA)
small = resize_image(capture.image, KC_WORK_SIZE[0], KC_WORK_SIZE[1])
# Extract colors per rectangle
n = len(self._rect_names)
@@ -6,7 +6,10 @@ from collections import deque
from datetime import datetime, timezone
from typing import Dict, Optional
import psutil
try:
import psutil
except ImportError:
psutil = None # type: ignore[assignment]
from ledgrab.utils import get_logger
from ledgrab.utils.gpu import (
@@ -21,8 +24,11 @@ MAX_SAMPLES = 120 # ~2 minutes at 1-second interval
SAMPLE_INTERVAL = 1.0 # seconds
_process = psutil.Process(os.getpid())
_process.cpu_percent(interval=None) # prime process-level counter
if psutil is not None:
_process = psutil.Process(os.getpid())
_process.cpu_percent(interval=None) # prime process-level counter
else:
_process = None # type: ignore[assignment]
def _collect_system_snapshot() -> dict:
@@ -30,6 +36,21 @@ def _collect_system_snapshot() -> dict:
Returns a dict suitable for direct JSON serialization.
"""
if psutil is None or _process is None:
# psutil unavailable (e.g. Android) — return zeroed snapshot
return {
"t": datetime.now(timezone.utc).isoformat(),
"cpu": 0.0,
"ram_pct": 0.0,
"ram_used": 0.0,
"ram_total": 0.0,
"app_cpu": 0.0,
"app_ram": 0.0,
"gpu_util": None,
"gpu_temp": None,
"app_gpu_mem": None,
}
mem = psutil.virtual_memory()
proc_mem = _process.memory_info()
snapshot = {
+8
View File
@@ -198,6 +198,7 @@ import {
// Layer 6: tabs, navigation, command palette, settings
import { switchTab, initTabs, startAutoRefresh, handlePopState } from './features/tabs.ts';
import { callTabLoader } from './core/tab-registry.ts';
import { navigateToCard } from './core/navigation.ts';
import { openCommandPalette, closeCommandPalette, initCommandPalette } from './core/command-palette.ts';
import {
@@ -761,6 +762,13 @@ document.addEventListener('DOMContentLoaded', async () => {
loadDisplays();
loadTargetsTab();
// Trigger the active tab's loader — initTabs() ran before authRequired
// was known, so its conditional loader call may have been skipped.
const activeTab = localStorage.getItem('activeTab') || 'dashboard';
if (activeTab !== 'targets') {
callTabLoader(activeTab);
}
// Start global events WebSocket and auto-refresh
startEventsWS();
startEntityEventListeners();
@@ -2,7 +2,7 @@
* Tab switching — switchTab, initTabs, startAutoRefresh, hash routing.
*/
import { apiKey, refreshInterval, setRefreshInterval, dashboardPollInterval } from '../core/state.ts';
import { apiKey, authRequired, refreshInterval, setRefreshInterval, dashboardPollInterval } from '../core/state.ts';
import { getActiveSubTab, setActiveSubTab, callTabLoader, callSubTabSwitcher, getSubTabConfig, getTabConfig } from '../core/tab-registry.ts';
/** Parse location.hash into {tab, subTab}. */
@@ -48,9 +48,12 @@ export function switchTab(name: string, { updateHash = true, skipLoad = false }:
_setHash(name, getActiveSubTab(name));
}
// Authenticated when either auth is disabled or we have an apiKey
const isAuthed = !authRequired || !!apiKey;
if (name === 'dashboard') {
// Use window.* to avoid circular imports with feature modules
if (!skipLoad && apiKey) callTabLoader(name);
if (!skipLoad && isAuthed) callTabLoader(name);
} else {
if (typeof window.stopPerfPolling === 'function') window.stopPerfPolling();
if (typeof window.stopUptimeTimer === 'function') window.stopUptimeTimer();
@@ -59,7 +62,7 @@ export function switchTab(name: string, { updateHash = true, skipLoad = false }:
if (typeof window.disconnectAllKCWebSockets === 'function') window.disconnectAllKCWebSockets();
if (typeof window.disconnectAllLedPreviewWS === 'function') window.disconnectAllLedPreviewWS();
}
if (!apiKey || skipLoad) return;
if (!isAuthed || skipLoad) return;
callTabLoader(name);
}
}
@@ -97,7 +100,8 @@ export function startAutoRefresh(): void {
}
setRefreshInterval(setInterval(() => {
if (!apiKey || document.hidden) return;
const isAuthed = !authRequired || !!apiKey;
if (!isAuthed || document.hidden) return;
const activeTab = localStorage.getItem('activeTab') || 'dashboard';
const cfg = getTabConfig(activeTab);
if (!cfg?.autoRefresh) return;
+4 -1
View File
@@ -80,8 +80,11 @@
"templates.engine.dxcam.desc": "DirectX, low latency",
"templates.engine.bettercam.desc": "DirectX, high performance",
"templates.engine.camera.desc": "USB/IP camera capture",
"templates.engine.scrcpy.desc": "Android screen mirror",
"templates.engine.scrcpy.desc": "Android screen mirror (adb screencap)",
"templates.engine.scrcpy_client.desc": "Android H.264 stream, high FPS",
"templates.engine.wgc.desc": "Windows Graphics Capture",
"templates.engine.demo.desc": "Animated test pattern (demo mode)",
"templates.engine.mediaprojection.desc": "Native Android screen capture",
"templates.config": "Configuration",
"templates.config.show": "Show configuration",
"templates.config.none": "No additional configuration",
+4 -1
View File
@@ -84,8 +84,11 @@
"templates.engine.dxcam.desc": "DirectX, низкая задержка",
"templates.engine.bettercam.desc": "DirectX, высокая производительность",
"templates.engine.camera.desc": "Захват USB/IP камеры",
"templates.engine.scrcpy.desc": "Зеркалирование экрана Android",
"templates.engine.scrcpy.desc": "Зеркалирование экрана Android (adb screencap)",
"templates.engine.scrcpy_client.desc": "Android H.264 поток, высокая частота",
"templates.engine.wgc.desc": "Windows Graphics Capture",
"templates.engine.demo.desc": "Тестовый анимированный шаблон (демо)",
"templates.engine.mediaprojection.desc": "Нативный захват экрана Android",
"templates.config": "Конфигурация",
"templates.config.show": "Показать конфигурацию",
"templates.config.none": "Нет дополнительных настроек",
+4 -1
View File
@@ -84,8 +84,11 @@
"templates.engine.dxcam.desc": "DirectX,低延迟",
"templates.engine.bettercam.desc": "DirectX,高性能",
"templates.engine.camera.desc": "USB/IP摄像头捕获",
"templates.engine.scrcpy.desc": "Android屏幕镜像",
"templates.engine.scrcpy.desc": "Android屏幕镜像 (adb screencap)",
"templates.engine.scrcpy_client.desc": "Android H.264流,高帧率",
"templates.engine.wgc.desc": "Windows图形捕获",
"templates.engine.demo.desc": "动画测试图案(演示模式)",
"templates.engine.mediaprojection.desc": "原生Android屏幕捕获",
"templates.config": "配置",
"templates.config.show": "显示配置",
"templates.config.none": "无额外配置",
+102 -36
View File
@@ -1,24 +1,62 @@
"""Image encoding/decoding/resizing utilities using OpenCV.
"""Image encoding/decoding/resizing utilities.
Uses OpenCV (cv2) when available for best performance, falls back to
Pillow on platforms where cv2 is unavailable (e.g. Android via Chaquopy).
Replaces PIL/Pillow for JPEG encoding, image loading, and resizing operations.
All functions work with numpy RGB arrays (H, W, 3) uint8.
"""
import base64
import io
from pathlib import Path
from typing import Tuple, Union
import cv2
import numpy as np
try:
import cv2
_HAS_CV2 = True
except ImportError:
_HAS_CV2 = False
try:
from PIL import Image
_HAS_PIL = True
except ImportError:
_HAS_PIL = False
def _require_backend() -> None:
if not _HAS_CV2 and not _HAS_PIL:
raise ImportError(
"Neither opencv-python-headless nor Pillow is installed. "
"At least one is required for image operations."
)
# ---------------------------------------------------------------------------
# Encode
# ---------------------------------------------------------------------------
def encode_jpeg(image: np.ndarray, quality: int = 85) -> bytes:
"""Encode an RGB numpy array as JPEG bytes."""
bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
ok, buf = cv2.imencode(".jpg", bgr, [cv2.IMWRITE_JPEG_QUALITY, quality])
if not ok:
raise RuntimeError("JPEG encoding failed")
return buf.tobytes()
if _HAS_CV2:
bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
ok, buf = cv2.imencode(".jpg", bgr, [cv2.IMWRITE_JPEG_QUALITY, quality])
if not ok:
raise RuntimeError("JPEG encoding failed")
return buf.tobytes()
if _HAS_PIL:
img = Image.fromarray(image, "RGB")
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=quality)
return buf.getvalue()
_require_backend()
def encode_jpeg_data_uri(image: np.ndarray, quality: int = 85) -> str:
@@ -28,62 +66,90 @@ def encode_jpeg_data_uri(image: np.ndarray, quality: int = 85) -> str:
return f"data:image/jpeg;base64,{b64}"
def resize_image(image: np.ndarray, width: int, height: int) -> np.ndarray:
"""Resize an image to exact dimensions.
# ---------------------------------------------------------------------------
# Resize
# ---------------------------------------------------------------------------
Uses INTER_AREA for downscaling (better quality, faster) and
INTER_LANCZOS4 for upscaling.
"""
h, w = image.shape[:2]
shrinking = (width * height) < (w * h)
interp = cv2.INTER_AREA if shrinking else cv2.INTER_LANCZOS4
return cv2.resize(image, (width, height), interpolation=interp)
def resize_image(image: np.ndarray, width: int, height: int) -> np.ndarray:
"""Resize an image to exact dimensions."""
if _HAS_CV2:
h, w = image.shape[:2]
shrinking = (width * height) < (w * h)
interp = cv2.INTER_AREA if shrinking else cv2.INTER_LANCZOS4
return cv2.resize(image, (width, height), interpolation=interp)
if _HAS_PIL:
img = Image.fromarray(image, "RGB")
resized = img.resize((width, height), Image.LANCZOS)
return np.asarray(resized)
_require_backend()
def thumbnail(image: np.ndarray, max_width: int) -> np.ndarray:
"""Create a thumbnail that fits within max_width, preserving aspect ratio.
Uses INTER_AREA (optimal for downscaling).
"""
"""Create a thumbnail that fits within max_width, preserving aspect ratio."""
h, w = image.shape[:2]
if w <= max_width:
return image.copy()
scale = max_width / w
new_w = max_width
new_h = max(1, int(h * scale))
return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
return resize_image(image, new_w, new_h)
def resize_down(image: np.ndarray, max_width: int) -> np.ndarray:
"""Downscale if wider than max_width; return as-is otherwise.
Uses INTER_AREA (optimal for downscaling).
"""
"""Downscale if wider than max_width; return as-is otherwise."""
h, w = image.shape[:2]
if w <= max_width:
return image
scale = max_width / w
new_w = max_width
new_h = max(1, int(h * scale))
return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
return resize_image(image, new_w, new_h)
# ---------------------------------------------------------------------------
# Load
# ---------------------------------------------------------------------------
def load_image_file(path: Union[str, Path]) -> np.ndarray:
"""Load an image file and return as RGB numpy array."""
path = str(path)
bgr = cv2.imread(path, cv2.IMREAD_COLOR)
if bgr is None:
raise FileNotFoundError(f"Cannot load image: {path}")
return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
if _HAS_CV2:
bgr = cv2.imread(path, cv2.IMREAD_COLOR)
if bgr is None:
raise FileNotFoundError(f"Cannot load image: {path}")
return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
if _HAS_PIL:
img = Image.open(path).convert("RGB")
return np.asarray(img)
_require_backend()
def load_image_bytes(data: bytes) -> np.ndarray:
"""Decode image bytes (JPEG, PNG, etc.) and return as RGB numpy array."""
arr = np.frombuffer(data, dtype=np.uint8)
bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if bgr is None:
raise ValueError("Cannot decode image data")
return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
if _HAS_CV2:
arr = np.frombuffer(data, dtype=np.uint8)
bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if bgr is None:
raise ValueError("Cannot decode image data")
return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
if _HAS_PIL:
img = Image.open(io.BytesIO(data)).convert("RGB")
return np.asarray(img)
_require_backend()
# ---------------------------------------------------------------------------
# Info
# ---------------------------------------------------------------------------
def image_size(image: np.ndarray) -> Tuple[int, int]:
+37
View File
@@ -0,0 +1,37 @@
"""Platform detection utilities.
Centralizes platform checks so the rest of the codebase can use
``is_android()``, ``is_windows()``, etc. instead of ad-hoc
``sys.platform`` comparisons.
Android reports ``sys.platform == "linux"``, so a dedicated check is
needed to distinguish it from desktop Linux.
"""
import os
import sys
_is_android: bool | None = None
def is_android() -> bool:
"""Return True when running inside an Android environment (Chaquopy)."""
global _is_android
if _is_android is None:
_is_android = hasattr(sys, "getandroidapilevel") or bool(os.environ.get("ANDROID_ROOT"))
return _is_android
def is_windows() -> bool:
"""Return True on Windows."""
return sys.platform == "win32"
def is_linux() -> bool:
"""Return True on desktop Linux (excludes Android)."""
return sys.platform == "linux" and not is_android()
def is_macos() -> bool:
"""Return True on macOS."""
return sys.platform == "darwin"