Add ADB-based Android screen capture engine with display picker integration
New scrcpy/ADB capture engine that captures Android device screens over ADB using screencap polling. Supports USB and WiFi ADB connections with device auto-discovery. Engine-aware display picker shows Android devices when scrcpy engine is selected, with inline ADB connect form for WiFi devices. Key changes: - New scrcpy_engine.py using adb screencap polling (~1-2 FPS over WiFi) - Engine-aware GET /config/displays?engine_type= API - ADB connect/disconnect API endpoints (POST /adb/connect, /adb/disconnect) - Display picker supports engine-specific device lists - Stream/test modals pass engine type to display picker - Test template handler changed to sync def to prevent event loop blocking - Restart script merges registry PATH for newly-installed tools - All engines (including unavailable) shown in engine list with status flag Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -11,12 +11,14 @@ from wled_controller.core.capture_engines.mss_engine import MSSEngine, MSSCaptur
|
||||
from wled_controller.core.capture_engines.dxcam_engine import DXcamEngine, DXcamCaptureStream
|
||||
from wled_controller.core.capture_engines.bettercam_engine import BetterCamEngine, BetterCamCaptureStream
|
||||
from wled_controller.core.capture_engines.wgc_engine import WGCEngine, WGCCaptureStream
|
||||
from wled_controller.core.capture_engines.scrcpy_engine import ScrcpyEngine, ScrcpyCaptureStream
|
||||
|
||||
# Auto-register available engines
|
||||
EngineRegistry.register(MSSEngine)
|
||||
EngineRegistry.register(DXcamEngine)
|
||||
EngineRegistry.register(BetterCamEngine)
|
||||
EngineRegistry.register(WGCEngine)
|
||||
EngineRegistry.register(ScrcpyEngine)
|
||||
|
||||
__all__ = [
|
||||
"CaptureEngine",
|
||||
@@ -32,4 +34,6 @@ __all__ = [
|
||||
"BetterCamCaptureStream",
|
||||
"WGCEngine",
|
||||
"WGCCaptureStream",
|
||||
"ScrcpyEngine",
|
||||
"ScrcpyCaptureStream",
|
||||
]
|
||||
|
||||
350
server/src/wled_controller/core/capture_engines/scrcpy_engine.py
Normal file
350
server/src/wled_controller/core/capture_engines/scrcpy_engine.py
Normal file
@@ -0,0 +1,350 @@
|
||||
"""ADB-based Android screen capture engine.
|
||||
|
||||
Captures Android device screens over ADB (USB or WiFi) by polling
|
||||
``adb exec-out screencap -p`` in a background thread. Each poll returns
|
||||
a PNG screenshot that is decoded into a NumPy RGB array.
|
||||
|
||||
Typical throughput is ~1-2 FPS over WiFi, depending on the device. For
|
||||
LED ambient lighting this is usually sufficient because WLED controllers
|
||||
apply smooth colour transitions between updates.
|
||||
|
||||
Prerequisites (system binaries, NOT Python packages):
|
||||
- adb (bundled with scrcpy, or Android SDK Platform-Tools)
|
||||
"""
|
||||
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from wled_controller.core.capture_engines.base import (
|
||||
CaptureEngine,
|
||||
CaptureStream,
|
||||
DisplayInfo,
|
||||
ScreenCapture,
|
||||
)
|
||||
from wled_controller.utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ADB_TIMEOUT = 10 # seconds for ADB commands
|
||||
|
||||
|
||||
def _find_adb() -> str:
|
||||
"""Return the path to the ``adb`` binary.
|
||||
|
||||
Prefers the copy bundled with scrcpy (avoids ADB version mismatches),
|
||||
falls back to whatever ``adb`` is on PATH.
|
||||
"""
|
||||
# Check for scrcpy-bundled adb next to scrcpy on PATH
|
||||
scrcpy_path = shutil.which("scrcpy")
|
||||
if scrcpy_path:
|
||||
bundled = os.path.join(os.path.dirname(scrcpy_path), "adb.exe")
|
||||
if os.path.isfile(bundled):
|
||||
return bundled
|
||||
bundled_unix = os.path.join(os.path.dirname(scrcpy_path), "adb")
|
||||
if os.path.isfile(bundled_unix):
|
||||
return bundled_unix
|
||||
|
||||
# Fall back to system adb
|
||||
system_adb = shutil.which("adb")
|
||||
if system_adb:
|
||||
return system_adb
|
||||
|
||||
return "adb" # last resort — will fail with FileNotFoundError
|
||||
|
||||
|
||||
_adb_path: Optional[str] = None
|
||||
|
||||
|
||||
def _get_adb() -> str:
|
||||
"""Cached lookup of the adb binary path."""
|
||||
global _adb_path
|
||||
if _adb_path is None:
|
||||
_adb_path = _find_adb()
|
||||
logger.debug(f"Using adb: {_adb_path}")
|
||||
return _adb_path
|
||||
|
||||
|
||||
def _list_adb_devices() -> List[Dict[str, Any]]:
|
||||
"""Enumerate connected ADB devices with metadata.
|
||||
|
||||
Returns a list of dicts: {serial, model, width, height}.
|
||||
"""
|
||||
adb = _get_adb()
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[adb, "devices", "-l"],
|
||||
capture_output=True, text=True, timeout=_ADB_TIMEOUT,
|
||||
)
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
return []
|
||||
|
||||
devices: List[Dict[str, Any]] = []
|
||||
for line in result.stdout.strip().splitlines()[1:]: # skip header
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
parts = line.split()
|
||||
# Second token is the state: "device", "offline", "unauthorized", etc.
|
||||
if len(parts) < 2 or parts[1] != "device":
|
||||
continue
|
||||
|
||||
serial = parts[0]
|
||||
|
||||
# Extract model name from '-l' output (e.g. model:Pixel_6)
|
||||
model = serial
|
||||
for part in parts:
|
||||
if part.startswith("model:"):
|
||||
model = part.split(":", 1)[1].replace("_", " ")
|
||||
break
|
||||
|
||||
# Query screen resolution
|
||||
width, height = 1920, 1080 # sensible defaults
|
||||
try:
|
||||
size_result = subprocess.run(
|
||||
[adb, "-s", serial, "shell", "wm", "size"],
|
||||
capture_output=True, text=True, timeout=_ADB_TIMEOUT,
|
||||
)
|
||||
match = re.search(r"(\d+)x(\d+)", size_result.stdout)
|
||||
if match:
|
||||
width, height = int(match.group(1)), int(match.group(2))
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
pass
|
||||
|
||||
devices.append({
|
||||
"serial": serial,
|
||||
"model": model,
|
||||
"width": width,
|
||||
"height": height,
|
||||
})
|
||||
|
||||
return devices
|
||||
|
||||
|
||||
def _screencap_once(adb: str, serial: str) -> Optional[np.ndarray]:
|
||||
"""Capture a single PNG screenshot and return it as an RGB NumPy array."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[adb, "-s", serial, "exec-out", "screencap", "-p"],
|
||||
capture_output=True, timeout=_ADB_TIMEOUT,
|
||||
)
|
||||
if result.returncode != 0 or len(result.stdout) < 100:
|
||||
return None
|
||||
|
||||
img = Image.open(io.BytesIO(result.stdout))
|
||||
return np.asarray(img.convert("RGB"))
|
||||
except Exception as e:
|
||||
logger.debug(f"screencap failed for {serial}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CaptureStream
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class ScrcpyCaptureStream(CaptureStream):
|
||||
"""ADB screencap-based capture stream for a specific Android device.
|
||||
|
||||
A background thread repeatedly calls ``adb exec-out screencap -p``
|
||||
and stores the latest decoded frame. ``capture_frame()`` returns it
|
||||
without blocking.
|
||||
"""
|
||||
|
||||
def __init__(self, display_index: int, config: Dict[str, Any]):
|
||||
super().__init__(display_index, config)
|
||||
self._capture_thread: Optional[threading.Thread] = None
|
||||
self._latest_frame: Optional[ScreenCapture] = None
|
||||
self._frame_lock = threading.Lock()
|
||||
self._frame_event = threading.Event()
|
||||
self._running = False
|
||||
self._device_serial: Optional[str] = None
|
||||
|
||||
def initialize(self) -> None:
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# ── Resolve device serial from display index ──
|
||||
devices = _list_adb_devices()
|
||||
if not devices:
|
||||
raise RuntimeError(
|
||||
"No ADB devices found. Connect a device via USB or use "
|
||||
"'adb connect <ip>' for WiFi."
|
||||
)
|
||||
if self.display_index >= len(devices):
|
||||
raise RuntimeError(
|
||||
f"Device index {self.display_index} out of range "
|
||||
f"(found {len(devices)} device(s))"
|
||||
)
|
||||
|
||||
device = devices[self.display_index]
|
||||
self._device_serial = device["serial"]
|
||||
logger.info(
|
||||
f"ADB screencap: initializing capture for device "
|
||||
f"{self._device_serial} ({device['model']}, "
|
||||
f"{device['width']}x{device['height']})"
|
||||
)
|
||||
|
||||
# ── Verify screencap works with a test capture ──
|
||||
test_frame = _screencap_once(_get_adb(), self._device_serial)
|
||||
if test_frame is None:
|
||||
raise RuntimeError(
|
||||
f"ADB screencap failed for device {self._device_serial}. "
|
||||
"Ensure the device is connected and USB debugging is enabled."
|
||||
)
|
||||
|
||||
h, w = test_frame.shape[:2]
|
||||
with self._frame_lock:
|
||||
self._latest_frame = ScreenCapture(
|
||||
image=test_frame,
|
||||
width=w,
|
||||
height=h,
|
||||
display_index=self.display_index,
|
||||
)
|
||||
self._frame_event.set()
|
||||
|
||||
# ── Start background polling thread ──
|
||||
self._running = True
|
||||
self._capture_thread = threading.Thread(
|
||||
target=self._capture_loop, daemon=True, name="adb-screencap"
|
||||
)
|
||||
self._capture_thread.start()
|
||||
|
||||
self._initialized = True
|
||||
logger.info(
|
||||
f"ADB screencap stream initialized "
|
||||
f"(device={self._device_serial}, first frame {w}x{h})"
|
||||
)
|
||||
|
||||
def _capture_loop(self) -> None:
|
||||
"""Background thread: poll ``adb screencap`` and store latest frame."""
|
||||
adb = _get_adb()
|
||||
serial = self._device_serial
|
||||
poll_interval = self.config.get("poll_interval", 0.0)
|
||||
|
||||
while self._running:
|
||||
rgb = _screencap_once(adb, serial)
|
||||
if rgb is None:
|
||||
if self._running:
|
||||
time.sleep(1.0) # back off on failure
|
||||
continue
|
||||
|
||||
h, w = rgb.shape[:2]
|
||||
with self._frame_lock:
|
||||
self._latest_frame = ScreenCapture(
|
||||
image=rgb,
|
||||
width=w,
|
||||
height=h,
|
||||
display_index=self.display_index,
|
||||
)
|
||||
self._frame_event.set()
|
||||
|
||||
if poll_interval > 0:
|
||||
time.sleep(poll_interval)
|
||||
|
||||
def capture_frame(self) -> Optional[ScreenCapture]:
|
||||
if not self._initialized:
|
||||
self.initialize()
|
||||
|
||||
if not self._frame_event.is_set():
|
||||
return None
|
||||
|
||||
with self._frame_lock:
|
||||
if self._latest_frame is None:
|
||||
return None
|
||||
frame = self._latest_frame
|
||||
self._frame_event.clear()
|
||||
|
||||
return frame
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self._running = False
|
||||
|
||||
if self._capture_thread and self._capture_thread.is_alive():
|
||||
self._capture_thread.join(timeout=_ADB_TIMEOUT + 2)
|
||||
self._capture_thread = None
|
||||
|
||||
self._frame_event.clear()
|
||||
self._latest_frame = None
|
||||
self._initialized = False
|
||||
logger.info(
|
||||
f"ADB screencap stream cleaned up "
|
||||
f"(device={self._device_serial})"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CaptureEngine
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class ScrcpyEngine(CaptureEngine):
|
||||
"""ADB-based Android screen capture engine.
|
||||
|
||||
Captures Android device screens over ADB using ``screencap``.
|
||||
Supports both USB and WiFi (TCP/IP) ADB connections.
|
||||
scrcpy is optional (used only for device management); only ``adb``
|
||||
is required for capture.
|
||||
|
||||
Prerequisites:
|
||||
- adb on PATH (bundled with scrcpy, or standalone)
|
||||
- USB debugging enabled on Android device
|
||||
"""
|
||||
|
||||
ENGINE_TYPE = "scrcpy"
|
||||
ENGINE_PRIORITY = 5
|
||||
|
||||
@classmethod
|
||||
def is_available(cls) -> bool:
|
||||
adb = _get_adb()
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[adb, "version"],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
return result.returncode == 0
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def get_default_config(cls) -> Dict[str, Any]:
|
||||
return {
|
||||
"poll_interval": 0.0,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_available_displays(cls) -> List[DisplayInfo]:
|
||||
devices = _list_adb_devices()
|
||||
|
||||
displays = []
|
||||
for idx, device in enumerate(devices):
|
||||
displays.append(DisplayInfo(
|
||||
index=idx,
|
||||
name=f"{device['model']} ({device['serial']})",
|
||||
width=device["width"],
|
||||
height=device["height"],
|
||||
x=idx * 500, # spread horizontally for visual picker
|
||||
y=0,
|
||||
is_primary=(idx == 0),
|
||||
refresh_rate=60,
|
||||
))
|
||||
|
||||
logger.debug(f"ADB detected {len(displays)} Android device(s)")
|
||||
return displays
|
||||
|
||||
@classmethod
|
||||
def create_stream(
|
||||
cls, display_index: int, config: Dict[str, Any]
|
||||
) -> ScrcpyCaptureStream:
|
||||
return ScrcpyCaptureStream(display_index, config)
|
||||
Reference in New Issue
Block a user