From 7fcb8dd346488b1f4448afe5e725a0a90588c3b6 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Tue, 14 Apr 2026 16:34:09 +0300 Subject: [PATCH] feat(devices): Android USB-serial support for Adalight/AmbiLED controllers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds end-to-end support for driving USB-connected Adalight / AmbiLED LED controllers from Android TV boxes. Android's security model blocks direct USB access from Python, so writes route through a Kotlin UsbSerialBridge singleton via Chaquopy. Python side: - New SerialTransport Protocol (serial_transport.py) with open / write / flush / close. Desktop uses PySerialTransport (wraps pyserial), Android uses AndroidSerialTransport (wraps the Kotlin bridge). - list_serial_ports() factory returns desktop COM ports on desktop, USB devices on Android — callers don't branch. - URL scheme extended: existing COM3[:baud] and /dev/ttyUSB0[:baud] unchanged; new usb:VID:PID[:serial][@baud] for Android (@ is the baud separator since : is already used between VID and PID). - AdalightClient and SerialDeviceProvider refactored to go through the transport — no more direct pyserial imports in hot paths. - 17 new unit tests cover URL parsing, PySerial transport, factory selection, platform-branching discovery. Full suite 750 passing. Kotlin side: - UsbSerialBridge.kt singleton uses com.hoho.android.usbserial (mik3y) which ships drivers for CH340, CP2102, FTDI, Prolific, and CDC-ACM (Arduino). Exposes listDevices, open, write, close via @JvmStatic for Chaquopy. First open() attempt without permission triggers the system USB permission dialog; next call succeeds once user grants. - usb-serial-for-android is distributed via JitPack — added that repo in settings.gradle.kts and the dependency in app/build.gradle.kts. - AndroidManifest declares uses-feature android.hardware.usb.host (required=false so non-USB-host phones still install). - LedGrabApp.onCreate calls UsbSerialBridge.init(this) so the bridge resolves the UsbManager without needing an Activity ref. Verified: ./gradlew compileDebugKotlin succeeds; off-Android import of android_serial_transport works. Real-hardware smoke test on a TV box with a CH340/CP2102/FTDI adapter still pending. ESP-NOW (espnow_client / espnow_provider) still imports pyserial directly because it needs bidirectional reads — separate refactor to extend the transport with read() if that path ever needs Android USB support. --- TODO.md | 22 +- android/app/build.gradle.kts | 3 + android/app/src/main/AndroidManifest.xml | 6 + .../java/com/ledgrab/android/LedGrabApp.kt | 4 + .../com/ledgrab/android/UsbSerialBridge.kt | 203 ++++++++++++++++++ android/settings.gradle.kts | 2 + .../ledgrab/core/devices/adalight_client.py | 68 ++---- .../core/devices/android_serial_transport.py | 159 ++++++++++++++ .../ledgrab/core/devices/serial_provider.py | 43 ++-- .../ledgrab/core/devices/serial_transport.py | 174 +++++++++++++++ server/tests/test_serial_transport.py | 153 +++++++++++++ 11 files changed, 753 insertions(+), 84 deletions(-) create mode 100644 android/app/src/main/java/com/ledgrab/android/UsbSerialBridge.kt create mode 100644 server/src/ledgrab/core/devices/android_serial_transport.py create mode 100644 server/src/ledgrab/core/devices/serial_transport.py create mode 100644 server/tests/test_serial_transport.py diff --git a/TODO.md b/TODO.md index 4584cf8..6db4d31 100644 --- a/TODO.md +++ b/TODO.md @@ -52,19 +52,15 @@ Known projects using this approach for reference: scrcpy-hidden-api, shizuku, co Drive USB LED controllers (APA102, WS2812) connected directly to the Android TV box via USB-to-serial adapters. -- [ ] Add [usb-serial-for-android](https://github.com/mik3y/usb-serial-for-android) dependency to `android/app/build.gradle.kts` -- [ ] Create Kotlin `UsbSerialBridge` class that: - - Enumerates USB serial devices via Android USB Host API - - Requests user permission for USB device access - - Opens a serial connection (baud rate configurable) - - Exposes a write method callable from Python via Chaquopy -- [ ] Create Python `AndroidSerialProvider` in `server/src/ledgrab/core/devices/` that: - - Replaces `pyserial` on Android (which can't access USB ports) - - Calls `UsbSerialBridge` via Chaquopy to send LED data - - Registers as an alternative serial transport when `is_android()` is True -- [ ] Add USB device permission dialog to `MainActivity` (auto-triggered on device connect) -- [ ] Test with common USB-to-serial chips: CH340, CP2102, FTDI -- [ ] Document supported USB LED controllers in README +- [x] Added `com.github.mik3y:usb-serial-for-android:3.8.1` (via JitPack) to `android/app/build.gradle.kts`. +- [x] Kotlin `UsbSerialBridge` singleton (`android/app/src/main/java/com/ledgrab/android/UsbSerialBridge.kt`) — exposes `listDevices()`, `open(vid, pid, serial, baud)`, `write(handle, ByteArray)`, `close(handle)`. Permission request fires automatically from `open()` when the user hasn't granted access yet. Handles are opaque integers, port map is synchronized, so Python threads can share one bridge. +- [x] Python `AndroidSerialTransport` in `server/src/ledgrab/core/devices/android_serial_transport.py` drives the bridge through Chaquopy. `SerialTransport` Protocol + `PySerialTransport` + `list_serial_ports()` factory live in `serial_transport.py`; `AdalightClient` and `SerialDeviceProvider` now go through the abstraction instead of importing `pyserial` directly. +- [x] URL scheme extended: `usb:VID:PID[:serial][@baud]` on Android alongside the existing `COM3[:baud]` / `/dev/ttyUSB0[:baud]` desktop paths. +- [x] App initializes the bridge on startup (`LedGrabApp.onCreate` → `UsbSerialBridge.init(this)`); manifest declares `uses-feature android.hardware.usb.host`. +- [ ] Real-device test pending — no USB-serial hardware on dev machine. Need to verify on a TV box with CH340, CP2102, or FTDI adapter. +- [ ] Document supported USB LED controllers in README (once real-device test passes). +- [ ] Optional: auto-launch the app when a known USB-serial adapter is plugged in (intent-filter on `USB_DEVICE_ATTACHED` + `res/xml/device_filter.xml`). Skipped in v1 — users can just open LedGrab and hit "Discover". +- [ ] ESP-NOW client (`espnow_client.py` / `espnow_provider.py`) still imports `pyserial` directly and needs bidirectional reads — separate refactor to extend the transport with `read()` if ESP-NOW-via-USB on Android is needed. ## Performance Metrics Abstraction diff --git a/android/app/build.gradle.kts b/android/app/build.gradle.kts index b2be883..cfec31a 100644 --- a/android/app/build.gradle.kts +++ b/android/app/build.gradle.kts @@ -116,4 +116,7 @@ dependencies { implementation("androidx.lifecycle:lifecycle-service:2.8.7") // QR code generation for displaying server URL on TV implementation("com.google.zxing:core:3.5.3") + // USB-serial drivers (CH340, CP2102, FTDI, Prolific, CDC-ACM) for + // driving Adalight/AmbiLED controllers plugged into Android TV boxes. + implementation("com.github.mik3y:usb-serial-for-android:3.8.1") } diff --git a/android/app/src/main/AndroidManifest.xml b/android/app/src/main/AndroidManifest.xml index 4e81310..485b4bd 100644 --- a/android/app/src/main/AndroidManifest.xml +++ b/android/app/src/main/AndroidManifest.xml @@ -21,6 +21,12 @@ android:name="android.hardware.touchscreen" android:required="false" /> + + + () + + /** Called once from [LedGrabApp.onCreate] so we can resolve services. */ + @JvmStatic + fun init(context: Context) { + val app = context.applicationContext + appContext = app + + val filter = IntentFilter(ACTION_USB_PERMISSION) + val receiver = object : BroadcastReceiver() { + override fun onReceive(ctx: Context, intent: Intent) { + // We just log; the next open() call checks hasPermission() again. + val granted = intent.getBooleanExtra( + UsbManager.EXTRA_PERMISSION_GRANTED, + false, + ) + Log.i(TAG, "USB permission broadcast: granted=$granted") + } + } + // Android 14 requires RECEIVER_NOT_EXPORTED for non-system broadcasts. + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) { + app.registerReceiver(receiver, filter, Context.RECEIVER_NOT_EXPORTED) + } else { + @Suppress("UnspecifiedRegisterReceiverFlag") + app.registerReceiver(receiver, filter) + } + } + + private fun ctx(): Context = + appContext ?: error("UsbSerialBridge.init() not called — app context unavailable") + + private fun safeSerial(driver: UsbSerialDriver): String = + try { + driver.device.serialNumber ?: "" + } catch (_: SecurityException) { + // Reading the serial requires USB permission on API 29+. + "" + } + + /** + * Enumerate attached USB-serial devices. + * + * Each entry is `"VID|PID|serial|description"` with VID/PID as + * 4-char lowercase hex. Pipe is used as the separator so device + * descriptions containing colons (common on FTDI strings) don't + * confuse the Python parser. + */ + @JvmStatic + fun listDevices(): List { + val manager = ctx().getSystemService(Context.USB_SERVICE) as UsbManager + val drivers = UsbSerialProber.getDefaultProber().findAllDrivers(manager) + return drivers.map { driver -> + val dev = driver.device + val vid = "%04x".format(dev.vendorId) + val pid = "%04x".format(dev.productId) + val serial = safeSerial(driver) + val description = buildString { + append(dev.manufacturerName ?: "USB") + val product = dev.productName + if (!product.isNullOrBlank()) { + append(' ') + append(product) + } + }.trim().ifEmpty { "USB $vid:$pid" } + "$vid|$pid|$serial|$description" + } + } + + /** + * Open the first matching USB-serial device. Returns a non-negative + * opaque handle on success, -1 on failure (device not found, user + * denied permission, or driver error). Failures also trigger an + * async permission-request dialog when applicable — subsequent + * open() calls will succeed once the user grants. + */ + @JvmStatic + fun open(vendorId: Int, productId: Int, serial: String, baud: Int): Int { + val context = ctx() + val manager = context.getSystemService(Context.USB_SERVICE) as UsbManager + val drivers = UsbSerialProber.getDefaultProber().findAllDrivers(manager) + val driver = drivers.firstOrNull { d -> + val dev = d.device + dev.vendorId == vendorId && + dev.productId == productId && + (serial.isEmpty() || safeSerial(d) == serial) + } + if (driver == null) { + Log.w(TAG, "No matching device for $vendorId:$productId:$serial") + return -1 + } + + if (!manager.hasPermission(driver.device)) { + Log.w(TAG, "USB permission not yet granted for ${driver.device.deviceName}") + requestPermission(context, manager, driver) + return -1 + } + + val connection = manager.openDevice(driver.device) + if (connection == null) { + Log.w(TAG, "openDevice returned null for ${driver.device.deviceName}") + return -1 + } + val port = driver.ports.firstOrNull() + if (port == null) { + connection.close() + Log.w(TAG, "Driver reports no ports for ${driver.device.deviceName}") + return -1 + } + + try { + port.open(connection) + port.setParameters( + baud, + 8, + UsbSerialPort.STOPBITS_1, + UsbSerialPort.PARITY_NONE, + ) + } catch (e: Exception) { + Log.e(TAG, "Failed to configure serial port", e) + runCatching { port.close() } + return -1 + } + + val handle = handleSeq.getAndIncrement() + synchronized(openPorts) { openPorts[handle] = port } + Log.i( + TAG, + "Opened USB serial ${driver.device.deviceName} baud=$baud handle=$handle", + ) + return handle + } + + /** Write bytes to the previously-opened handle. Throws if invalid. */ + @JvmStatic + fun write(handle: Int, data: ByteArray) { + val port = synchronized(openPorts) { openPorts[handle] } + ?: throw IllegalStateException("Invalid handle $handle") + // 1s write timeout matches the old pyserial `timeout=1` behavior. + port.write(data, 1_000) + } + + /** Close a previously-opened handle. Silently ignores unknown handles. */ + @JvmStatic + fun close(handle: Int) { + val port = synchronized(openPorts) { openPorts.remove(handle) } ?: return + runCatching { port.close() } + .onFailure { Log.w(TAG, "close($handle): ${it.message}") } + } + + private fun requestPermission( + context: Context, + manager: UsbManager, + driver: UsbSerialDriver, + ) { + val flags = + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) { + PendingIntent.FLAG_IMMUTABLE + } else { + 0 + } + val intent = Intent(ACTION_USB_PERMISSION).apply { + setPackage(context.packageName) + } + val pending = PendingIntent.getBroadcast(context, 0, intent, flags) + manager.requestPermission(driver.device, pending) + } +} diff --git a/android/settings.gradle.kts b/android/settings.gradle.kts index d22e677..d02007d 100644 --- a/android/settings.gradle.kts +++ b/android/settings.gradle.kts @@ -11,6 +11,8 @@ dependencyResolutionManagement { repositories { google() mavenCentral() + // usb-serial-for-android (mik3y) is distributed via JitPack + maven { url = uri("https://jitpack.io") } } } diff --git a/server/src/ledgrab/core/devices/adalight_client.py b/server/src/ledgrab/core/devices/adalight_client.py index 7141478..cca63bc 100644 --- a/server/src/ledgrab/core/devices/adalight_client.py +++ b/server/src/ledgrab/core/devices/adalight_client.py @@ -7,40 +7,21 @@ from typing import Optional, Tuple import numpy as np from ledgrab.core.devices.led_client import DeviceHealth, LEDClient +from ledgrab.core.devices.serial_transport import ( + open_transport, + parse_serial_url, + port_exists, +) from ledgrab.utils import get_logger logger = get_logger(__name__) -DEFAULT_BAUD_RATE = 115200 ARDUINO_RESET_DELAY = 2.0 # seconds to wait after opening serial for Arduino bootloader def parse_adalight_url(url: str) -> Tuple[str, int]: - """Parse an Adalight URL into (port, baud_rate). - - Formats: - "COM3" -> ("COM3", 115200) - "COM3:230400" -> ("COM3", 230400) - "/dev/ttyUSB0" -> ("/dev/ttyUSB0", 115200) - """ - url = url.strip() - if ":" in url and not url.startswith("/"): - # Windows COM port with baud: "COM3:230400" - parts = url.rsplit(":", 1) - try: - baud = int(parts[1]) - return parts[0], baud - except ValueError: - pass - elif ":" in url and url.startswith("/"): - # Unix path with baud: "/dev/ttyUSB0:230400" - parts = url.rsplit(":", 1) - try: - baud = int(parts[1]) - return parts[0], baud - except ValueError: - pass - return url, DEFAULT_BAUD_RATE + """Backwards-compatible alias for :func:`parse_serial_url`.""" + return parse_serial_url(url) def _build_adalight_header(led_count: int) -> bytes: @@ -81,13 +62,12 @@ class AdalightClient(LEDClient): async def connect(self) -> bool: """Open serial port and wait for Arduino reset.""" - import serial - try: - self._serial = await asyncio.to_thread( - serial.Serial, port=self._port, baudrate=self._baud_rate, timeout=1 - ) - # Wait for Arduino to finish bootloader reset (non-blocking) + self._serial = open_transport(self._port, baud_rate=self._baud_rate, timeout=1) + await asyncio.to_thread(self._serial.open) + # Wait for Arduino to finish bootloader reset (non-blocking). + # USB-to-TTL adapters without DTR don't reset, but the delay + # is harmless on those — keeps the path uniform. await asyncio.sleep(ARDUINO_RESET_DELAY) self._connected = True logger.info( @@ -122,7 +102,7 @@ class AdalightClient(LEDClient): f"led_count={self._led_count}" ) self._connected = False - if self._serial and self._serial.is_open: + if self._serial is not None: try: self._serial.close() except Exception as e: @@ -182,18 +162,13 @@ class AdalightClient(LEDClient): ) -> DeviceHealth: """Check if the serial port exists without opening it. - Enumerates COM ports to avoid exclusive-access conflicts on Windows. + Enumerates COM ports (or USB devices on Android) to avoid + exclusive-access conflicts on Windows. """ port, _baud = parse_adalight_url(url) try: - import serial.tools.list_ports - - available_ports = [p.device for p in serial.tools.list_ports.comports()] - port_upper = port.upper() - found = any(p.upper() == port_upper for p in available_ports) - - if found: + if port_exists(port): return DeviceHealth( online=True, latency_ms=0.0, @@ -202,12 +177,11 @@ class AdalightClient(LEDClient): device_version=None, device_led_count=prev_health.device_led_count if prev_health else None, ) - else: - return DeviceHealth( - online=False, - last_checked=datetime.now(timezone.utc), - error=f"Serial port {port} not found", - ) + return DeviceHealth( + online=False, + last_checked=datetime.now(timezone.utc), + error=f"Serial port {port} not found", + ) except Exception as e: return DeviceHealth( online=False, diff --git a/server/src/ledgrab/core/devices/android_serial_transport.py b/server/src/ledgrab/core/devices/android_serial_transport.py new file mode 100644 index 0000000..d0f6775 --- /dev/null +++ b/server/src/ledgrab/core/devices/android_serial_transport.py @@ -0,0 +1,159 @@ +"""Android USB-serial transport backed by the Kotlin ``UsbSerialBridge``. + +Calls into Java land through Chaquopy; this module only loads on +Android. URL format: ``usb:VID:PID`` or ``usb:VID:PID:serial`` (with an +optional ``@baud`` suffix). +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import List, Optional, Tuple + +from ledgrab.utils import get_logger +from ledgrab.utils.platform import is_android + +from .serial_transport import SerialPortInfo + +logger = get_logger(__name__) + + +def _bridge(): + """Return the singleton Kotlin ``UsbSerialBridge`` instance, or raise. + + The bridge exposes static methods ``listDevices()``, ``open(...)``, + ``write(...)``, ``close(...)``, etc. — see ``UsbSerialBridge.kt``. + """ + if not is_android(): + raise RuntimeError("AndroidSerialTransport is only usable on Android") + try: + from java import jclass # type: ignore[import-not-found] + except ImportError as e: + raise RuntimeError("Chaquopy java interop not available") from e + + return jclass("com.ledgrab.android.UsbSerialBridge").INSTANCE + + +@dataclass(frozen=True) +class _UsbAddress: + vendor_id: int + product_id: int + serial: Optional[str] + + @classmethod + def parse(cls, device: str) -> "_UsbAddress": + if not device.startswith("usb:"): + raise ValueError(f"Not a USB device URL: {device!r}") + body = device[len("usb:") :] + parts = body.split(":") + if len(parts) < 2: + raise ValueError( + f"USB URL must be 'usb:VID:PID' or 'usb:VID:PID:serial' (got {device!r})" + ) + try: + vid = int(parts[0], 16) + pid = int(parts[1], 16) + except ValueError as e: + raise ValueError(f"VID/PID must be hex: {device!r}") from e + serial = parts[2] if len(parts) >= 3 and parts[2] else None + return cls(vid, pid, serial) + + +def _format_url(vid: int, pid: int, serial: Optional[str]) -> str: + base = f"usb:{vid:04x}:{pid:04x}" + return f"{base}:{serial}" if serial else base + + +def list_android_usb_devices() -> List[SerialPortInfo]: + """Return USB-serial devices currently attached to the Android host.""" + try: + bridge = _bridge() + except Exception as e: + logger.debug("UsbSerialBridge unavailable: %s", e) + return [] + + out: List[SerialPortInfo] = [] + try: + # listDevices() returns a Java List of "vid:pid:serial:description" + # tuples — split here to keep the Kotlin DTO trivial. + for entry in bridge.listDevices(): + entry_s = str(entry) + parts = entry_s.split("|", 3) + if len(parts) < 4: + continue + vid_str, pid_str, serial, description = parts + try: + vid = int(vid_str, 16) + pid = int(pid_str, 16) + except ValueError: + continue + url = _format_url(vid, pid, serial or None) + out.append(SerialPortInfo(device=url, description=description or url)) + except Exception as e: + logger.warning("UsbSerialBridge.listDevices failed: %s", e) + return out + + +class AndroidSerialTransport: + """Serial transport that pipes writes through the Kotlin USB bridge.""" + + def __init__(self, device: str, baud_rate: int) -> None: + self._url = device + self._addr = _UsbAddress.parse(device) + self._baud_rate = baud_rate + self._handle: Optional[int] = None # opaque token from the bridge + + @property + def is_open(self) -> bool: + return self._handle is not None + + def open(self) -> None: + bridge = _bridge() + # open() returns a non-negative int handle on success, or -1 if the + # device wasn't found / permission was denied. + handle = int( + bridge.open( + self._addr.vendor_id, + self._addr.product_id, + self._addr.serial or "", + self._baud_rate, + ) + ) + if handle < 0: + raise RuntimeError( + f"Failed to open USB serial device {self._url} " + f"(handle={handle}; user may have denied permission)" + ) + self._handle = handle + logger.info("Android USB serial opened: %s @ %d baud", self._url, self._baud_rate) + + def write(self, data: bytes) -> None: + if self._handle is None: + raise RuntimeError(f"USB serial {self._url} is not open") + bridge = _bridge() + # Chaquopy auto-marshals bytes → Java byte[]. + bridge.write(self._handle, data) + + def flush(self) -> None: + # USB endpoints are unbuffered from this side — nothing to flush. + return + + def close(self) -> None: + if self._handle is None: + return + try: + _bridge().close(self._handle) + except Exception as e: + logger.warning("Error closing USB serial %s: %s", self._url, e) + finally: + self._handle = None + + +__all__ = [ + "AndroidSerialTransport", + "list_android_usb_devices", +] + + +def _used() -> Tuple[type, ...]: # pragma: no cover — re-export marker + return (AndroidSerialTransport,) diff --git a/server/src/ledgrab/core/devices/serial_provider.py b/server/src/ledgrab/core/devices/serial_provider.py index 72efdab..60a660f 100644 --- a/server/src/ledgrab/core/devices/serial_provider.py +++ b/server/src/ledgrab/core/devices/serial_provider.py @@ -14,6 +14,10 @@ from ledgrab.core.devices.led_client import ( DiscoveredDevice, LEDDeviceProvider, ) +from ledgrab.core.devices.serial_transport import ( + list_serial_ports, + port_exists, +) from ledgrab.utils import get_logger logger = get_logger(__name__) @@ -47,15 +51,9 @@ class SerialDeviceProvider(LEDDeviceProvider): port, _baud = parse_adalight_url(url) try: - import serial.tools.list_ports - - available_ports = [p.device for p in serial.tools.list_ports.comports()] - port_upper = port.upper() - if not any(p.upper() == port_upper for p in available_ports): - raise ValueError( - f"Serial port {port} not found. " - f"Available ports: {', '.join(available_ports) or 'none'}" - ) + if not port_exists(port): + available = ", ".join(p.device for p in list_serial_ports()) or "none" + raise ValueError(f"Serial port {port} not found. Available ports: {available}") except ValueError: raise except Exception as e: @@ -67,22 +65,19 @@ class SerialDeviceProvider(LEDDeviceProvider): async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]: """Discover serial ports that could be LED devices.""" try: - import serial.tools.list_ports - - ports = serial.tools.list_ports.comports() - results = [] - for port_info in ports: - results.append( - DiscoveredDevice( - name=port_info.description or port_info.device, - url=port_info.device, - device_type=self.device_type, - ip=port_info.device, - mac="", - led_count=None, - version=None, - ) + ports = list_serial_ports() + results = [ + DiscoveredDevice( + name=port_info.description, + url=port_info.device, + device_type=self.device_type, + ip=port_info.device, + mac="", + led_count=None, + version=None, ) + for port_info in ports + ] logger.info(f"{self.device_type} serial port scan found {len(results)} port(s)") return results except Exception as e: diff --git a/server/src/ledgrab/core/devices/serial_transport.py b/server/src/ledgrab/core/devices/serial_transport.py new file mode 100644 index 0000000..e247225 --- /dev/null +++ b/server/src/ledgrab/core/devices/serial_transport.py @@ -0,0 +1,174 @@ +"""Serial transport abstraction for LED clients. + +Wraps the platform-specific way to open a serial line so callers +(AdalightClient, etc.) don't import ``pyserial`` directly. The primary +motivation is Android: under Chaquopy, ``pyserial`` exists but cannot +touch USB ports, so we route writes through a Kotlin +``UsbSerialBridge`` instead. + +URL format +---------- +- ``"COM3"`` or ``"COM3:115200"`` — desktop COM port (Windows) +- ``"/dev/ttyUSB0"`` or ``"/dev/ttyUSB0:115200"`` — desktop tty +- ``"usb:VID:PID"`` or ``"usb:VID:PID:serial@115200"`` — Android USB device + +Selection happens in :func:`open_transport`: anything starting with +``usb:`` goes through :class:`AndroidSerialTransport`, everything else +through :class:`PySerialTransport`. :func:`list_serial_ports` returns +desktop COM ports on desktop, USB devices on Android. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import List, Optional, Protocol + +from ledgrab.utils import get_logger +from ledgrab.utils.platform import is_android + +logger = get_logger(__name__) + +DEFAULT_BAUD_RATE = 115200 + + +@dataclass(frozen=True) +class SerialPortInfo: + """A discovered serial port, regardless of source platform.""" + + device: str # URL-like identifier passed back to open_transport() + description: str # Human-friendly name for the UI + + +class SerialTransport(Protocol): + """Minimal serial line — open, write bytes, close. No reads (LED out only).""" + + @property + def is_open(self) -> bool: ... + def open(self) -> None: ... + def write(self, data: bytes) -> None: ... + def flush(self) -> None: ... + def close(self) -> None: ... + + +def parse_serial_url(url: str) -> tuple[str, int]: + """Parse a serial URL into (device, baud_rate). + + Recognized formats:: + + "COM3" -> ("COM3", 115200) + "COM3:230400" -> ("COM3", 230400) + "/dev/ttyUSB0" -> ("/dev/ttyUSB0", 115200) + "/dev/ttyUSB0:230400"-> ("/dev/ttyUSB0", 230400) + "usb:1a86:7523" -> ("usb:1a86:7523", 115200) + "usb:1a86:7523@250k" -> raises (only ints accepted for baud) + "usb:1a86:7523@230400" -> ("usb:1a86:7523", 230400) + + USB URLs use ``@`` to separate baud because the colon is already + the field separator between vendor and product IDs. + """ + raw = url.strip() + if raw.startswith("usb:"): + if "@" in raw: + device, _, baud_str = raw.rpartition("@") + try: + return device, int(baud_str) + except ValueError: + pass + return raw, DEFAULT_BAUD_RATE + + # Desktop COM/tty paths use a trailing :BAUD suffix. + if ":" in raw: + head, _, tail = raw.rpartition(":") + try: + return head, int(tail) + except ValueError: + pass + return raw, DEFAULT_BAUD_RATE + + +def list_serial_ports() -> List[SerialPortInfo]: + """Enumerate serial ports available on this host (or USB devices on Android).""" + if is_android(): + try: + from ledgrab.core.devices.android_serial_transport import ( + list_android_usb_devices, + ) + + return list_android_usb_devices() + except Exception as e: + logger.warning("Android USB enumeration failed: %s", e) + return [] + + try: + import serial.tools.list_ports + except ImportError: + logger.warning("pyserial not available — serial enumeration disabled") + return [] + + return [ + SerialPortInfo(device=p.device, description=p.description or p.device) + for p in serial.tools.list_ports.comports() + ] + + +def port_exists(device: str) -> bool: + """Return True if a port with this device id is currently visible.""" + target = device.upper() + return any(p.device.upper() == target for p in list_serial_ports()) + + +def open_transport( + url: str, + baud_rate: Optional[int] = None, + timeout: float = 1.0, +) -> SerialTransport: + """Construct an unopened transport for ``url``. Caller invokes ``open()``.""" + device, parsed_baud = parse_serial_url(url) + effective_baud = baud_rate or parsed_baud + + if device.startswith("usb:"): + from ledgrab.core.devices.android_serial_transport import ( + AndroidSerialTransport, + ) + + return AndroidSerialTransport(device, effective_baud) + + return PySerialTransport(device, effective_baud, timeout) + + +class PySerialTransport: + """Default serial transport backed by ``pyserial`` on desktop hosts.""" + + def __init__(self, device: str, baud_rate: int, timeout: float = 1.0) -> None: + self._device = device + self._baud_rate = baud_rate + self._timeout = timeout + self._serial = None + + @property + def is_open(self) -> bool: + return self._serial is not None and self._serial.is_open + + def open(self) -> None: + import serial # imported here so Android (no real pyserial) doesn't fail at import + + self._serial = serial.Serial( + port=self._device, baudrate=self._baud_rate, timeout=self._timeout + ) + + def write(self, data: bytes) -> None: + if self._serial is None: + raise RuntimeError(f"Serial port {self._device} is not open") + self._serial.write(data) + + def flush(self) -> None: + if self._serial is not None: + self._serial.flush() + + def close(self) -> None: + if self._serial is not None and self._serial.is_open: + try: + self._serial.close() + except Exception as e: + logger.warning("Error closing %s: %s", self._device, e) + self._serial = None diff --git a/server/tests/test_serial_transport.py b/server/tests/test_serial_transport.py new file mode 100644 index 0000000..1398cc0 --- /dev/null +++ b/server/tests/test_serial_transport.py @@ -0,0 +1,153 @@ +"""Tests for the serial transport abstraction.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from ledgrab.core.devices.serial_transport import ( + DEFAULT_BAUD_RATE, + PySerialTransport, + SerialPortInfo, + list_serial_ports, + open_transport, + parse_serial_url, + port_exists, +) + + +# ── URL parsing ──────────────────────────────────────────────────── + + +@pytest.mark.parametrize( + "url,expected", + [ + ("COM3", ("COM3", DEFAULT_BAUD_RATE)), + ("COM3:230400", ("COM3", 230400)), + ("/dev/ttyUSB0", ("/dev/ttyUSB0", DEFAULT_BAUD_RATE)), + ("/dev/ttyUSB0:230400", ("/dev/ttyUSB0", 230400)), + # USB URLs use @ for baud since `:` separates VID:PID:serial + ("usb:1a86:7523", ("usb:1a86:7523", DEFAULT_BAUD_RATE)), + ("usb:1a86:7523:AB01", ("usb:1a86:7523:AB01", DEFAULT_BAUD_RATE)), + ("usb:1a86:7523@230400", ("usb:1a86:7523", 230400)), + ("usb:1a86:7523:AB01@500000", ("usb:1a86:7523:AB01", 500000)), + ], +) +def test_parse_serial_url(url, expected): + assert parse_serial_url(url) == expected + + +def test_parse_serial_url_strips_whitespace(): + assert parse_serial_url(" COM3 ") == ("COM3", DEFAULT_BAUD_RATE) + + +# ── PySerialTransport ────────────────────────────────────────────── + + +def test_pyserial_transport_open_and_write(monkeypatch): + fake_serial_module = MagicMock() + fake_handle = MagicMock(is_open=True) + fake_serial_module.Serial.return_value = fake_handle + + monkeypatch.setitem(__import__("sys").modules, "serial", fake_serial_module) + + t = PySerialTransport("COM3", 115200) + assert not t.is_open + t.open() + fake_serial_module.Serial.assert_called_once_with(port="COM3", baudrate=115200, timeout=1.0) + assert t.is_open + + t.write(b"hello") + fake_handle.write.assert_called_once_with(b"hello") + + t.flush() + fake_handle.flush.assert_called_once() + + t.close() + fake_handle.close.assert_called_once() + + +def test_pyserial_transport_write_before_open_raises(): + t = PySerialTransport("COM3", 115200) + with pytest.raises(RuntimeError, match="not open"): + t.write(b"x") + + +# ── Factory ──────────────────────────────────────────────────────── + + +def test_open_transport_picks_pyserial_for_com_url(): + t = open_transport("COM3:230400") + assert isinstance(t, PySerialTransport) + assert t._device == "COM3" + assert t._baud_rate == 230400 + + +def test_open_transport_explicit_baud_overrides_url(): + t = open_transport("COM3:230400", baud_rate=500000) + assert t._baud_rate == 500000 + + +def test_open_transport_picks_android_for_usb_url(monkeypatch): + """usb: URLs should route to the Android transport even off-Android.""" + # Importing AndroidSerialTransport itself works on any host; only the + # bridge call inside .open() fails when not on Android. + t = open_transport("usb:1a86:7523@230400") + assert type(t).__name__ == "AndroidSerialTransport" + assert t._baud_rate == 230400 + + +# ── Discovery ────────────────────────────────────────────────────── + + +def test_list_serial_ports_uses_pyserial_on_desktop(monkeypatch): + monkeypatch.setattr("ledgrab.core.devices.serial_transport.is_android", lambda: False) + + fake_module = MagicMock() + fake_module.tools.list_ports.comports.return_value = [ + MagicMock(device="COM3", description="USB Serial CH340"), + MagicMock(device="COM4", description=None), + ] + with patch.dict( + "sys.modules", + { + "serial": fake_module, + "serial.tools": fake_module.tools, + "serial.tools.list_ports": fake_module.tools.list_ports, + }, + ): + ports = list_serial_ports() + + assert len(ports) == 2 + assert ports[0] == SerialPortInfo(device="COM3", description="USB Serial CH340") + # Falls back to device id when description is None + assert ports[1].description == "COM4" + + +def test_list_serial_ports_routes_to_android_when_on_android(monkeypatch): + monkeypatch.setattr("ledgrab.core.devices.serial_transport.is_android", lambda: True) + + fake_devices = [SerialPortInfo(device="usb:1a86:7523", description="CH340 LED")] + fake_mod = MagicMock() + fake_mod.list_android_usb_devices.return_value = fake_devices + monkeypatch.setitem( + __import__("sys").modules, + "ledgrab.core.devices.android_serial_transport", + fake_mod, + ) + + assert list_serial_ports() == fake_devices + + +def test_port_exists_is_case_insensitive(monkeypatch): + fake_ports = [ + SerialPortInfo(device="COM3", description="x"), + SerialPortInfo(device="/dev/ttyUSB0", description="y"), + ] + monkeypatch.setattr( + "ledgrab.core.devices.serial_transport.list_serial_ports", lambda: fake_ports + ) + assert port_exists("com3") is True + assert port_exists("/dev/ttyusb0") is True + assert port_exists("COM99") is False