From 928d626620383255b102657059e062b7f754b90f Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Tue, 14 Apr 2026 19:15:08 +0300 Subject: [PATCH] refactor(devices): route ESP-NOW client through SerialTransport Drops the direct pyserial imports from espnow_client/espnow_provider in favor of open_transport/list_serial_ports/port_exists. The gateway protocol is write-only, so no read() extension was needed. ESP-NOW gateways are now reachable via usb:VID:PID URLs on Android. --- TODO.md | 23 +++--- .../src/ledgrab/core/devices/espnow_client.py | 49 +++++++----- .../ledgrab/core/devices/espnow_provider.py | 79 +++++++++++-------- 3 files changed, 85 insertions(+), 66 deletions(-) diff --git a/TODO.md b/TODO.md index 6db4d31..6aa358a 100644 --- a/TODO.md +++ b/TODO.md @@ -34,19 +34,18 @@ Build the Android APK automatically on push/tag. ## Android Root Capture (No Permission Dialog, No System Indicator) -MediaProjection shows a mandatory system overlay/indicator while capturing — unavoidable on stock Android. Many cheap Android TV boxes ship pre-rooted, so an alternative root-only path would give much better UX. +MediaProjection shows a mandatory system overlay/indicator while capturing — unavoidable on stock Android. Many cheap Android TV boxes ship pre-rooted, so an alternative root-only path gives much better UX. -- [ ] Detect root at runtime: check for `su` binary, `Superuser.apk`, etc. -- [ ] Implement `SurfaceControlCaptureEngine` (new capture engine) using hidden `SurfaceControl.screenshot()` API via reflection - - No permission dialog - - No system capture indicator - - Direct bitmap output (no encoder/decoder roundtrip) -- [ ] Engine priority: higher than MediaProjection when root detected -- [ ] Fallback chain: `SurfaceControl` (root) → `MediaProjection` (stock) → `adb screencap` (last resort) -- [ ] Handle Android version differences in `SurfaceControl` API surface (renamed/moved across API 29, 30, 33) -- [ ] Alternative: shell out to `screenrecord --output-format=h264 -` as root (same H.264 decode as scrcpy_client_engine, but local instead of remote ADB) +- [x] Root detection — `Root.kt` checks common `su` binary paths and, on demand, runs `su -c id` to actually prove UID 0. First call triggers Magisk's grant dialog; grant is cached per session. Exposed to Python via Chaquopy. +- [x] `RootScreenrecord.kt` — spawns `su -c screenrecord --output-format=h264 --size=WxH -`, feeds the H.264 stdout through a MediaCodec decoder whose output Surface is wired into an ImageReader (RGBA_8888, row-stride-aware). Decoded frames reach the Python pipeline via `PythonBridge.pushRootFrame`. +- [x] Python-side `RootScreenrecordEngine` (`core/capture_engines/root_screenrecord_engine.py`) mirrors `MediaProjectionEngine` with `ENGINE_PRIORITY=110` (> MediaProjection's 100) so the factory picks it automatically when available. +- [x] `MainActivity` tries `Root.requestGrant()` before launching the MediaProjection consent flow — on rooted devices the consent dialog is skipped entirely. `CaptureService` has a `createRootIntent()` entry point that bypasses the MediaProjection path. +- [x] Fallback: if `Root.requestGrant()` returns false (no root, user denied, or `su` timeout) the existing MediaProjection flow runs unchanged. +- [ ] Real-hardware test pending — need to verify on the user's Magisk'd TV box that: (1) grant dialog appears once, (2) frames actually flow through MediaCodec without the Android 14 capture indicator showing, (3) stop/start cycle terminates the `su` process cleanly. +- [WONTDO] `SurfaceControl.screenshot()` via reflection — renamed/moved across API 28/29/30/33, hidden-API blocklist varies by release, even rooted apps hit it; days of maintenance for a marginal latency win over the screenrecord path. Not worth it. +- [WONTDO] `adb screencap` fallback — full-PNG-per-frame pipeline is slower than MediaProjection, no value as a last resort. -Known projects using this approach for reference: scrcpy-hidden-api, shizuku, commercial scrcpy-derived apps. +Known projects using the screenrecord approach for reference: scrcpy (over ADB), scrcpy-hidden-api, shizuku. ## Android USB Serial Support @@ -60,7 +59,7 @@ Drive USB LED controllers (APA102, WS2812) connected directly to the Android TV - [ ] Real-device test pending — no USB-serial hardware on dev machine. Need to verify on a TV box with CH340, CP2102, or FTDI adapter. - [ ] Document supported USB LED controllers in README (once real-device test passes). - [ ] Optional: auto-launch the app when a known USB-serial adapter is plugged in (intent-filter on `USB_DEVICE_ATTACHED` + `res/xml/device_filter.xml`). Skipped in v1 — users can just open LedGrab and hit "Discover". -- [ ] ESP-NOW client (`espnow_client.py` / `espnow_provider.py`) still imports `pyserial` directly and needs bidirectional reads — separate refactor to extend the transport with `read()` if ESP-NOW-via-USB on Android is needed. +- [x] ESP-NOW client (`espnow_client.py` / `espnow_provider.py`) now routes through `SerialTransport` — `open_transport()` for the gateway serial link, `list_serial_ports()` + `port_exists()` for discovery/validation. Works transparently with `usb:VID:PID` URLs on Android. (Gateway protocol is write-only, so no `read()` extension was needed after all.) ## Performance Metrics Abstraction diff --git a/server/src/ledgrab/core/devices/espnow_client.py b/server/src/ledgrab/core/devices/espnow_client.py index 6e22ee9..b812385 100644 --- a/server/src/ledgrab/core/devices/espnow_client.py +++ b/server/src/ledgrab/core/devices/espnow_client.py @@ -8,6 +8,11 @@ from typing import List, Optional, Tuple, Union import numpy as np from ledgrab.core.devices.led_client import DeviceHealth, LEDClient +from ledgrab.core.devices.serial_transport import ( + open_transport, + parse_serial_url, + port_exists, +) from ledgrab.utils import get_logger logger = get_logger(__name__) @@ -15,6 +20,7 @@ logger = get_logger(__name__) # Gateway serial protocol constants FRAME_START = 0xEE FRAME_END = 0xEF +DEFAULT_ESPNOW_BAUD = 921600 def _mac_str_to_bytes(mac: str) -> bytes: @@ -53,14 +59,14 @@ class ESPNowClient(LEDClient): self, url: str = "", led_count: int = 0, - baud_rate: int = 921600, + baud_rate: Optional[int] = None, espnow_peer_mac: str = "FF:FF:FF:FF:FF:FF", espnow_channel: int = 1, **kwargs, ): - self._port = url + self._port, url_baud = parse_serial_url(url) + self._baud_rate = baud_rate or url_baud or DEFAULT_ESPNOW_BAUD self._led_count = led_count - self._baud_rate = baud_rate self._peer_mac = _mac_str_to_bytes(espnow_peer_mac) self._channel = espnow_channel self._serial = None @@ -69,15 +75,12 @@ class ESPNowClient(LEDClient): async def connect(self) -> bool: try: - import serial as pyserial - except ImportError: - raise RuntimeError("pyserial is required for ESP-NOW devices: pip install pyserial") + self._serial = open_transport(self._port, baud_rate=self._baud_rate, timeout=1) + await asyncio.to_thread(self._serial.open) + except Exception as e: + logger.error("Failed to open ESP-NOW gateway %s: %s", self._port, e) + raise RuntimeError(f"Failed to open ESP-NOW gateway {self._port}: {e}") - loop = asyncio.get_event_loop() - self._serial = await loop.run_in_executor( - None, - lambda: pyserial.Serial(self._port, self._baud_rate, timeout=1), - ) self._connected = True logger.info( "ESP-NOW client connected: port=%s baud=%d peer=%s channel=%d leds=%d", @@ -90,9 +93,8 @@ class ESPNowClient(LEDClient): return True async def close(self) -> None: - if self._serial and self._serial.is_open: - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, self._serial.close) + if self._serial is not None and self._serial.is_open: + await asyncio.to_thread(self._serial.close) self._serial = None self._connected = False logger.info("ESP-NOW client closed: port=%s", self._port) @@ -150,14 +152,21 @@ class ESPNowClient(LEDClient): http_client, prev_health: Optional[DeviceHealth] = None, ) -> DeviceHealth: - """Check if the serial port is available.""" + """Check if the serial port is available without opening it.""" + port, _baud = parse_serial_url(url) try: - import serial as pyserial - - s = pyserial.Serial(url, timeout=0.1) - s.close() + if port_exists(port): + return DeviceHealth( + online=True, + latency_ms=0.0, + last_checked=datetime.now(timezone.utc), + device_name=prev_health.device_name if prev_health else None, + device_led_count=prev_health.device_led_count if prev_health else None, + ) return DeviceHealth( - online=True, latency_ms=0.0, last_checked=datetime.now(timezone.utc) + online=False, + last_checked=datetime.now(timezone.utc), + error=f"Serial port {port} not found", ) except Exception as e: return DeviceHealth( diff --git a/server/src/ledgrab/core/devices/espnow_provider.py b/server/src/ledgrab/core/devices/espnow_provider.py index 49446cd..cfe770f 100644 --- a/server/src/ledgrab/core/devices/espnow_provider.py +++ b/server/src/ledgrab/core/devices/espnow_provider.py @@ -2,24 +2,33 @@ from typing import List +from ledgrab.core.devices.espnow_client import ESPNowClient from ledgrab.core.devices.led_client import ( DeviceHealth, DiscoveredDevice, LEDClient, LEDDeviceProvider, ) -from ledgrab.core.devices.espnow_client import ESPNowClient +from ledgrab.core.devices.serial_transport import ( + list_serial_ports, + parse_serial_url, + port_exists, +) from ledgrab.utils import get_logger logger = get_logger(__name__) +# Description fragments that commonly indicate an ESP32 gateway's USB bridge. +_ESP_DESC_FRAGMENTS = ("cp210", "ch340", "esp", "ftdi", "usb serial") + class ESPNowDeviceProvider(LEDDeviceProvider): """Provider for ESP-NOW LED devices via serial ESP32 gateway. - URL = serial port of the gateway ESP32 (e.g. COM3, /dev/ttyUSB0). - Each device represents one remote ESP32 peer identified by MAC address. - Multiple devices can share the same gateway (serial port). + URL = serial port of the gateway ESP32 (COM3, /dev/ttyUSB0, or + usb:VID:PID on Android). Each device represents one remote ESP32 + peer identified by MAC address. Multiple devices can share the same + gateway (serial port). """ @property @@ -34,7 +43,7 @@ class ESPNowDeviceProvider(LEDDeviceProvider): return ESPNowClient( url, led_count=kwargs.get("led_count", 0), - baud_rate=kwargs.get("baud_rate", 921600), + baud_rate=kwargs.get("baud_rate"), espnow_peer_mac=kwargs.get("espnow_peer_mac", "FF:FF:FF:FF:FF:FF"), espnow_channel=kwargs.get("espnow_channel", 1), ) @@ -44,43 +53,45 @@ class ESPNowDeviceProvider(LEDDeviceProvider): async def validate_device(self, url: str) -> dict: """Validate serial port is accessible. LED count is manual.""" + port, _baud = parse_serial_url(url) try: - import serial as pyserial - - s = pyserial.Serial(url, timeout=0.5) - s.close() - except ImportError: - raise ValueError("pyserial is required for ESP-NOW devices: pip install pyserial") + if not port_exists(port): + available = ", ".join(p.device for p in list_serial_ports()) or "none" + raise ValueError(f"Serial port {port} not found. Available ports: {available}") + except ValueError: + raise except Exception as e: - raise ValueError(f"Cannot open serial port {url}: {e}") + raise ValueError(f"Failed to enumerate serial ports: {e}") return {} async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]: - """Discover available serial ports that could be ESP32 gateways.""" - try: - import serial.tools.list_ports + """Discover serial ports that could be ESP32 gateways. - ports = serial.tools.list_ports.comports() - results = [] + Filters the host's serial-port list by description fragments + typical of ESP32 USB bridges (CP210x, CH340, FTDI, generic + "USB Serial"). On Android, the USB bridge already reports + descriptive names, so the same filter applies. + """ + try: + ports = list_serial_ports() + results: List[DiscoveredDevice] = [] for port in ports: - # Look for ESP32 USB descriptors desc = (port.description or "").lower() - vid = port.vid or 0 - # Common ESP32 USB VIDs: Espressif (0x303A), Silicon Labs CP210x (0x10C4), - # FTDI (0x0403), WCH CH340 (0x1A86) - esp_vids = {0x303A, 0x10C4, 0x0403, 0x1A86} - if vid in esp_vids or "cp210" in desc or "ch340" in desc or "esp" in desc: - results.append( - DiscoveredDevice( - name=f"ESP-NOW Gateway ({port.description})", - url=port.device, - device_type="espnow", - ip="", - mac="", - led_count=None, - version=None, - ) + if not any(frag in desc for frag in _ESP_DESC_FRAGMENTS): + continue + results.append( + DiscoveredDevice( + name=f"ESP-NOW Gateway ({port.description})", + url=port.device, + device_type="espnow", + ip="", + mac="", + led_count=None, + version=None, ) + ) + logger.info("ESP-NOW gateway scan found %d candidate port(s)", len(results)) return results - except ImportError: + except Exception as e: + logger.error("ESP-NOW gateway discovery failed: %s", e) return []