refactor(devices): route ESP-NOW client through SerialTransport
Build Android APK / build-android (push) Failing after 1m39s
Lint & Test / test (push) Successful in 4m54s

Drops the direct pyserial imports from espnow_client/espnow_provider
in favor of open_transport/list_serial_ports/port_exists. The gateway
protocol is write-only, so no read() extension was needed. ESP-NOW
gateways are now reachable via usb:VID:PID URLs on Android.
This commit is contained in:
2026-04-14 19:15:08 +03:00
parent 580bd692e6
commit 928d626620
3 changed files with 85 additions and 66 deletions
+11 -12
View File
@@ -34,19 +34,18 @@ Build the Android APK automatically on push/tag.
## Android Root Capture (No Permission Dialog, No System Indicator)
MediaProjection shows a mandatory system overlay/indicator while capturing — unavoidable on stock Android. Many cheap Android TV boxes ship pre-rooted, so an alternative root-only path would give much better UX.
MediaProjection shows a mandatory system overlay/indicator while capturing — unavoidable on stock Android. Many cheap Android TV boxes ship pre-rooted, so an alternative root-only path gives much better UX.
- [ ] Detect root at runtime: check for `su` binary, `Superuser.apk`, etc.
- [ ] Implement `SurfaceControlCaptureEngine` (new capture engine) using hidden `SurfaceControl.screenshot()` API via reflection
- No permission dialog
- No system capture indicator
- Direct bitmap output (no encoder/decoder roundtrip)
- [ ] Engine priority: higher than MediaProjection when root detected
- [ ] Fallback chain: `SurfaceControl` (root) → `MediaProjection` (stock) → `adb screencap` (last resort)
- [ ] Handle Android version differences in `SurfaceControl` API surface (renamed/moved across API 29, 30, 33)
- [ ] Alternative: shell out to `screenrecord --output-format=h264 -` as root (same H.264 decode as scrcpy_client_engine, but local instead of remote ADB)
- [x] Root detection — `Root.kt` checks common `su` binary paths and, on demand, runs `su -c id` to actually prove UID 0. First call triggers Magisk's grant dialog; grant is cached per session. Exposed to Python via Chaquopy.
- [x] `RootScreenrecord.kt` — spawns `su -c screenrecord --output-format=h264 --size=WxH -`, feeds the H.264 stdout through a MediaCodec decoder whose output Surface is wired into an ImageReader (RGBA_8888, row-stride-aware). Decoded frames reach the Python pipeline via `PythonBridge.pushRootFrame`.
- [x] Python-side `RootScreenrecordEngine` (`core/capture_engines/root_screenrecord_engine.py`) mirrors `MediaProjectionEngine` with `ENGINE_PRIORITY=110` (> MediaProjection's 100) so the factory picks it automatically when available.
- [x] `MainActivity` tries `Root.requestGrant()` before launching the MediaProjection consent flow — on rooted devices the consent dialog is skipped entirely. `CaptureService` has a `createRootIntent()` entry point that bypasses the MediaProjection path.
- [x] Fallback: if `Root.requestGrant()` returns false (no root, user denied, or `su` timeout) the existing MediaProjection flow runs unchanged.
- [ ] Real-hardware test pending — need to verify on the user's Magisk'd TV box that: (1) grant dialog appears once, (2) frames actually flow through MediaCodec without the Android 14 capture indicator showing, (3) stop/start cycle terminates the `su` process cleanly.
- [WONTDO] `SurfaceControl.screenshot()` via reflection — renamed/moved across API 28/29/30/33, hidden-API blocklist varies by release, even rooted apps hit it; days of maintenance for a marginal latency win over the screenrecord path. Not worth it.
- [WONTDO] `adb screencap` fallback — full-PNG-per-frame pipeline is slower than MediaProjection, no value as a last resort.
Known projects using this approach for reference: scrcpy-hidden-api, shizuku, commercial scrcpy-derived apps.
Known projects using the screenrecord approach for reference: scrcpy (over ADB), scrcpy-hidden-api, shizuku.
## Android USB Serial Support
@@ -60,7 +59,7 @@ Drive USB LED controllers (APA102, WS2812) connected directly to the Android TV
- [ ] Real-device test pending — no USB-serial hardware on dev machine. Need to verify on a TV box with CH340, CP2102, or FTDI adapter.
- [ ] Document supported USB LED controllers in README (once real-device test passes).
- [ ] Optional: auto-launch the app when a known USB-serial adapter is plugged in (intent-filter on `USB_DEVICE_ATTACHED` + `res/xml/device_filter.xml`). Skipped in v1 — users can just open LedGrab and hit "Discover".
- [ ] ESP-NOW client (`espnow_client.py` / `espnow_provider.py`) still imports `pyserial` directly and needs bidirectional reads — separate refactor to extend the transport with `read()` if ESP-NOW-via-USB on Android is needed.
- [x] ESP-NOW client (`espnow_client.py` / `espnow_provider.py`) now routes through `SerialTransport``open_transport()` for the gateway serial link, `list_serial_ports()` + `port_exists()` for discovery/validation. Works transparently with `usb:VID:PID` URLs on Android. (Gateway protocol is write-only, so no `read()` extension was needed after all.)
## Performance Metrics Abstraction
@@ -8,6 +8,11 @@ from typing import List, Optional, Tuple, Union
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
from ledgrab.core.devices.serial_transport import (
open_transport,
parse_serial_url,
port_exists,
)
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -15,6 +20,7 @@ logger = get_logger(__name__)
# Gateway serial protocol constants
FRAME_START = 0xEE
FRAME_END = 0xEF
DEFAULT_ESPNOW_BAUD = 921600
def _mac_str_to_bytes(mac: str) -> bytes:
@@ -53,14 +59,14 @@ class ESPNowClient(LEDClient):
self,
url: str = "",
led_count: int = 0,
baud_rate: int = 921600,
baud_rate: Optional[int] = None,
espnow_peer_mac: str = "FF:FF:FF:FF:FF:FF",
espnow_channel: int = 1,
**kwargs,
):
self._port = url
self._port, url_baud = parse_serial_url(url)
self._baud_rate = baud_rate or url_baud or DEFAULT_ESPNOW_BAUD
self._led_count = led_count
self._baud_rate = baud_rate
self._peer_mac = _mac_str_to_bytes(espnow_peer_mac)
self._channel = espnow_channel
self._serial = None
@@ -69,15 +75,12 @@ class ESPNowClient(LEDClient):
async def connect(self) -> bool:
try:
import serial as pyserial
except ImportError:
raise RuntimeError("pyserial is required for ESP-NOW devices: pip install pyserial")
self._serial = open_transport(self._port, baud_rate=self._baud_rate, timeout=1)
await asyncio.to_thread(self._serial.open)
except Exception as e:
logger.error("Failed to open ESP-NOW gateway %s: %s", self._port, e)
raise RuntimeError(f"Failed to open ESP-NOW gateway {self._port}: {e}")
loop = asyncio.get_event_loop()
self._serial = await loop.run_in_executor(
None,
lambda: pyserial.Serial(self._port, self._baud_rate, timeout=1),
)
self._connected = True
logger.info(
"ESP-NOW client connected: port=%s baud=%d peer=%s channel=%d leds=%d",
@@ -90,9 +93,8 @@ class ESPNowClient(LEDClient):
return True
async def close(self) -> None:
if self._serial and self._serial.is_open:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._serial.close)
if self._serial is not None and self._serial.is_open:
await asyncio.to_thread(self._serial.close)
self._serial = None
self._connected = False
logger.info("ESP-NOW client closed: port=%s", self._port)
@@ -150,14 +152,21 @@ class ESPNowClient(LEDClient):
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check if the serial port is available."""
"""Check if the serial port is available without opening it."""
port, _baud = parse_serial_url(url)
try:
import serial as pyserial
s = pyserial.Serial(url, timeout=0.1)
s.close()
if port_exists(port):
return DeviceHealth(
online=True,
latency_ms=0.0,
last_checked=datetime.now(timezone.utc),
device_name=prev_health.device_name if prev_health else None,
device_led_count=prev_health.device_led_count if prev_health else None,
)
return DeviceHealth(
online=True, latency_ms=0.0, last_checked=datetime.now(timezone.utc)
online=False,
last_checked=datetime.now(timezone.utc),
error=f"Serial port {port} not found",
)
except Exception as e:
return DeviceHealth(
@@ -2,24 +2,33 @@
from typing import List
from ledgrab.core.devices.espnow_client import ESPNowClient
from ledgrab.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from ledgrab.core.devices.espnow_client import ESPNowClient
from ledgrab.core.devices.serial_transport import (
list_serial_ports,
parse_serial_url,
port_exists,
)
from ledgrab.utils import get_logger
logger = get_logger(__name__)
# Description fragments that commonly indicate an ESP32 gateway's USB bridge.
_ESP_DESC_FRAGMENTS = ("cp210", "ch340", "esp", "ftdi", "usb serial")
class ESPNowDeviceProvider(LEDDeviceProvider):
"""Provider for ESP-NOW LED devices via serial ESP32 gateway.
URL = serial port of the gateway ESP32 (e.g. COM3, /dev/ttyUSB0).
Each device represents one remote ESP32 peer identified by MAC address.
Multiple devices can share the same gateway (serial port).
URL = serial port of the gateway ESP32 (COM3, /dev/ttyUSB0, or
usb:VID:PID on Android). Each device represents one remote ESP32
peer identified by MAC address. Multiple devices can share the same
gateway (serial port).
"""
@property
@@ -34,7 +43,7 @@ class ESPNowDeviceProvider(LEDDeviceProvider):
return ESPNowClient(
url,
led_count=kwargs.get("led_count", 0),
baud_rate=kwargs.get("baud_rate", 921600),
baud_rate=kwargs.get("baud_rate"),
espnow_peer_mac=kwargs.get("espnow_peer_mac", "FF:FF:FF:FF:FF:FF"),
espnow_channel=kwargs.get("espnow_channel", 1),
)
@@ -44,43 +53,45 @@ class ESPNowDeviceProvider(LEDDeviceProvider):
async def validate_device(self, url: str) -> dict:
"""Validate serial port is accessible. LED count is manual."""
port, _baud = parse_serial_url(url)
try:
import serial as pyserial
s = pyserial.Serial(url, timeout=0.5)
s.close()
except ImportError:
raise ValueError("pyserial is required for ESP-NOW devices: pip install pyserial")
if not port_exists(port):
available = ", ".join(p.device for p in list_serial_ports()) or "none"
raise ValueError(f"Serial port {port} not found. Available ports: {available}")
except ValueError:
raise
except Exception as e:
raise ValueError(f"Cannot open serial port {url}: {e}")
raise ValueError(f"Failed to enumerate serial ports: {e}")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover available serial ports that could be ESP32 gateways."""
try:
import serial.tools.list_ports
"""Discover serial ports that could be ESP32 gateways.
ports = serial.tools.list_ports.comports()
results = []
Filters the host's serial-port list by description fragments
typical of ESP32 USB bridges (CP210x, CH340, FTDI, generic
"USB Serial"). On Android, the USB bridge already reports
descriptive names, so the same filter applies.
"""
try:
ports = list_serial_ports()
results: List[DiscoveredDevice] = []
for port in ports:
# Look for ESP32 USB descriptors
desc = (port.description or "").lower()
vid = port.vid or 0
# Common ESP32 USB VIDs: Espressif (0x303A), Silicon Labs CP210x (0x10C4),
# FTDI (0x0403), WCH CH340 (0x1A86)
esp_vids = {0x303A, 0x10C4, 0x0403, 0x1A86}
if vid in esp_vids or "cp210" in desc or "ch340" in desc or "esp" in desc:
results.append(
DiscoveredDevice(
name=f"ESP-NOW Gateway ({port.description})",
url=port.device,
device_type="espnow",
ip="",
mac="",
led_count=None,
version=None,
)
if not any(frag in desc for frag in _ESP_DESC_FRAGMENTS):
continue
results.append(
DiscoveredDevice(
name=f"ESP-NOW Gateway ({port.description})",
url=port.device,
device_type="espnow",
ip="",
mac="",
led_count=None,
version=None,
)
)
logger.info("ESP-NOW gateway scan found %d candidate port(s)", len(results))
return results
except ImportError:
except Exception as e:
logger.error("ESP-NOW gateway discovery failed: %s", e)
return []