refactor(devices): route ESP-NOW client through SerialTransport
Build Android APK / build-android (push) Failing after 1m39s
Lint & Test / test (push) Successful in 4m54s

Drops the direct pyserial imports from espnow_client/espnow_provider
in favor of open_transport/list_serial_ports/port_exists. The gateway
protocol is write-only, so no read() extension was needed. ESP-NOW
gateways are now reachable via usb:VID:PID URLs on Android.
This commit is contained in:
2026-04-14 19:15:08 +03:00
parent 580bd692e6
commit 928d626620
3 changed files with 85 additions and 66 deletions
@@ -8,6 +8,11 @@ from typing import List, Optional, Tuple, Union
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
from ledgrab.core.devices.serial_transport import (
open_transport,
parse_serial_url,
port_exists,
)
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -15,6 +20,7 @@ logger = get_logger(__name__)
# Gateway serial protocol constants
FRAME_START = 0xEE
FRAME_END = 0xEF
DEFAULT_ESPNOW_BAUD = 921600
def _mac_str_to_bytes(mac: str) -> bytes:
@@ -53,14 +59,14 @@ class ESPNowClient(LEDClient):
self,
url: str = "",
led_count: int = 0,
baud_rate: int = 921600,
baud_rate: Optional[int] = None,
espnow_peer_mac: str = "FF:FF:FF:FF:FF:FF",
espnow_channel: int = 1,
**kwargs,
):
self._port = url
self._port, url_baud = parse_serial_url(url)
self._baud_rate = baud_rate or url_baud or DEFAULT_ESPNOW_BAUD
self._led_count = led_count
self._baud_rate = baud_rate
self._peer_mac = _mac_str_to_bytes(espnow_peer_mac)
self._channel = espnow_channel
self._serial = None
@@ -69,15 +75,12 @@ class ESPNowClient(LEDClient):
async def connect(self) -> bool:
try:
import serial as pyserial
except ImportError:
raise RuntimeError("pyserial is required for ESP-NOW devices: pip install pyserial")
self._serial = open_transport(self._port, baud_rate=self._baud_rate, timeout=1)
await asyncio.to_thread(self._serial.open)
except Exception as e:
logger.error("Failed to open ESP-NOW gateway %s: %s", self._port, e)
raise RuntimeError(f"Failed to open ESP-NOW gateway {self._port}: {e}")
loop = asyncio.get_event_loop()
self._serial = await loop.run_in_executor(
None,
lambda: pyserial.Serial(self._port, self._baud_rate, timeout=1),
)
self._connected = True
logger.info(
"ESP-NOW client connected: port=%s baud=%d peer=%s channel=%d leds=%d",
@@ -90,9 +93,8 @@ class ESPNowClient(LEDClient):
return True
async def close(self) -> None:
if self._serial and self._serial.is_open:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._serial.close)
if self._serial is not None and self._serial.is_open:
await asyncio.to_thread(self._serial.close)
self._serial = None
self._connected = False
logger.info("ESP-NOW client closed: port=%s", self._port)
@@ -150,14 +152,21 @@ class ESPNowClient(LEDClient):
http_client,
prev_health: Optional[DeviceHealth] = None,
) -> DeviceHealth:
"""Check if the serial port is available."""
"""Check if the serial port is available without opening it."""
port, _baud = parse_serial_url(url)
try:
import serial as pyserial
s = pyserial.Serial(url, timeout=0.1)
s.close()
if port_exists(port):
return DeviceHealth(
online=True,
latency_ms=0.0,
last_checked=datetime.now(timezone.utc),
device_name=prev_health.device_name if prev_health else None,
device_led_count=prev_health.device_led_count if prev_health else None,
)
return DeviceHealth(
online=True, latency_ms=0.0, last_checked=datetime.now(timezone.utc)
online=False,
last_checked=datetime.now(timezone.utc),
error=f"Serial port {port} not found",
)
except Exception as e:
return DeviceHealth(
@@ -2,24 +2,33 @@
from typing import List
from ledgrab.core.devices.espnow_client import ESPNowClient
from ledgrab.core.devices.led_client import (
DeviceHealth,
DiscoveredDevice,
LEDClient,
LEDDeviceProvider,
)
from ledgrab.core.devices.espnow_client import ESPNowClient
from ledgrab.core.devices.serial_transport import (
list_serial_ports,
parse_serial_url,
port_exists,
)
from ledgrab.utils import get_logger
logger = get_logger(__name__)
# Description fragments that commonly indicate an ESP32 gateway's USB bridge.
_ESP_DESC_FRAGMENTS = ("cp210", "ch340", "esp", "ftdi", "usb serial")
class ESPNowDeviceProvider(LEDDeviceProvider):
"""Provider for ESP-NOW LED devices via serial ESP32 gateway.
URL = serial port of the gateway ESP32 (e.g. COM3, /dev/ttyUSB0).
Each device represents one remote ESP32 peer identified by MAC address.
Multiple devices can share the same gateway (serial port).
URL = serial port of the gateway ESP32 (COM3, /dev/ttyUSB0, or
usb:VID:PID on Android). Each device represents one remote ESP32
peer identified by MAC address. Multiple devices can share the same
gateway (serial port).
"""
@property
@@ -34,7 +43,7 @@ class ESPNowDeviceProvider(LEDDeviceProvider):
return ESPNowClient(
url,
led_count=kwargs.get("led_count", 0),
baud_rate=kwargs.get("baud_rate", 921600),
baud_rate=kwargs.get("baud_rate"),
espnow_peer_mac=kwargs.get("espnow_peer_mac", "FF:FF:FF:FF:FF:FF"),
espnow_channel=kwargs.get("espnow_channel", 1),
)
@@ -44,43 +53,45 @@ class ESPNowDeviceProvider(LEDDeviceProvider):
async def validate_device(self, url: str) -> dict:
"""Validate serial port is accessible. LED count is manual."""
port, _baud = parse_serial_url(url)
try:
import serial as pyserial
s = pyserial.Serial(url, timeout=0.5)
s.close()
except ImportError:
raise ValueError("pyserial is required for ESP-NOW devices: pip install pyserial")
if not port_exists(port):
available = ", ".join(p.device for p in list_serial_ports()) or "none"
raise ValueError(f"Serial port {port} not found. Available ports: {available}")
except ValueError:
raise
except Exception as e:
raise ValueError(f"Cannot open serial port {url}: {e}")
raise ValueError(f"Failed to enumerate serial ports: {e}")
return {}
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover available serial ports that could be ESP32 gateways."""
try:
import serial.tools.list_ports
"""Discover serial ports that could be ESP32 gateways.
ports = serial.tools.list_ports.comports()
results = []
Filters the host's serial-port list by description fragments
typical of ESP32 USB bridges (CP210x, CH340, FTDI, generic
"USB Serial"). On Android, the USB bridge already reports
descriptive names, so the same filter applies.
"""
try:
ports = list_serial_ports()
results: List[DiscoveredDevice] = []
for port in ports:
# Look for ESP32 USB descriptors
desc = (port.description or "").lower()
vid = port.vid or 0
# Common ESP32 USB VIDs: Espressif (0x303A), Silicon Labs CP210x (0x10C4),
# FTDI (0x0403), WCH CH340 (0x1A86)
esp_vids = {0x303A, 0x10C4, 0x0403, 0x1A86}
if vid in esp_vids or "cp210" in desc or "ch340" in desc or "esp" in desc:
results.append(
DiscoveredDevice(
name=f"ESP-NOW Gateway ({port.description})",
url=port.device,
device_type="espnow",
ip="",
mac="",
led_count=None,
version=None,
)
if not any(frag in desc for frag in _ESP_DESC_FRAGMENTS):
continue
results.append(
DiscoveredDevice(
name=f"ESP-NOW Gateway ({port.description})",
url=port.device,
device_type="espnow",
ip="",
mac="",
led_count=None,
version=None,
)
)
logger.info("ESP-NOW gateway scan found %d candidate port(s)", len(results))
return results
except ImportError:
except Exception as e:
logger.error("ESP-NOW gateway discovery failed: %s", e)
return []