diff --git a/TODO.md b/TODO.md
index 4584cf8..6db4d31 100644
--- a/TODO.md
+++ b/TODO.md
@@ -52,19 +52,15 @@ Known projects using this approach for reference: scrcpy-hidden-api, shizuku, co
Drive USB LED controllers (APA102, WS2812) connected directly to the Android TV box via USB-to-serial adapters.
-- [ ] Add [usb-serial-for-android](https://github.com/mik3y/usb-serial-for-android) dependency to `android/app/build.gradle.kts`
-- [ ] Create Kotlin `UsbSerialBridge` class that:
- - Enumerates USB serial devices via Android USB Host API
- - Requests user permission for USB device access
- - Opens a serial connection (baud rate configurable)
- - Exposes a write method callable from Python via Chaquopy
-- [ ] Create Python `AndroidSerialProvider` in `server/src/ledgrab/core/devices/` that:
- - Replaces `pyserial` on Android (which can't access USB ports)
- - Calls `UsbSerialBridge` via Chaquopy to send LED data
- - Registers as an alternative serial transport when `is_android()` is True
-- [ ] Add USB device permission dialog to `MainActivity` (auto-triggered on device connect)
-- [ ] Test with common USB-to-serial chips: CH340, CP2102, FTDI
-- [ ] Document supported USB LED controllers in README
+- [x] Added `com.github.mik3y:usb-serial-for-android:3.8.1` (via JitPack) to `android/app/build.gradle.kts`.
+- [x] Kotlin `UsbSerialBridge` singleton (`android/app/src/main/java/com/ledgrab/android/UsbSerialBridge.kt`) — exposes `listDevices()`, `open(vid, pid, serial, baud)`, `write(handle, ByteArray)`, `close(handle)`. Permission request fires automatically from `open()` when the user hasn't granted access yet. Handles are opaque integers, port map is synchronized, so Python threads can share one bridge.
+- [x] Python `AndroidSerialTransport` in `server/src/ledgrab/core/devices/android_serial_transport.py` drives the bridge through Chaquopy. `SerialTransport` Protocol + `PySerialTransport` + `list_serial_ports()` factory live in `serial_transport.py`; `AdalightClient` and `SerialDeviceProvider` now go through the abstraction instead of importing `pyserial` directly.
+- [x] URL scheme extended: `usb:VID:PID[:serial][@baud]` on Android alongside the existing `COM3[:baud]` / `/dev/ttyUSB0[:baud]` desktop paths.
+- [x] App initializes the bridge on startup (`LedGrabApp.onCreate` → `UsbSerialBridge.init(this)`); manifest declares `uses-feature android.hardware.usb.host`.
+- [ ] Real-device test pending — no USB-serial hardware on dev machine. Need to verify on a TV box with CH340, CP2102, or FTDI adapter.
+- [ ] Document supported USB LED controllers in README (once real-device test passes).
+- [ ] Optional: auto-launch the app when a known USB-serial adapter is plugged in (intent-filter on `USB_DEVICE_ATTACHED` + `res/xml/device_filter.xml`). Skipped in v1 — users can just open LedGrab and hit "Discover".
+- [ ] ESP-NOW client (`espnow_client.py` / `espnow_provider.py`) still imports `pyserial` directly and needs bidirectional reads — separate refactor to extend the transport with `read()` if ESP-NOW-via-USB on Android is needed.
## Performance Metrics Abstraction
diff --git a/android/app/build.gradle.kts b/android/app/build.gradle.kts
index b2be883..cfec31a 100644
--- a/android/app/build.gradle.kts
+++ b/android/app/build.gradle.kts
@@ -116,4 +116,7 @@ dependencies {
implementation("androidx.lifecycle:lifecycle-service:2.8.7")
// QR code generation for displaying server URL on TV
implementation("com.google.zxing:core:3.5.3")
+ // USB-serial drivers (CH340, CP2102, FTDI, Prolific, CDC-ACM) for
+ // driving Adalight/AmbiLED controllers plugged into Android TV boxes.
+ implementation("com.github.mik3y:usb-serial-for-android:3.8.1")
}
diff --git a/android/app/src/main/AndroidManifest.xml b/android/app/src/main/AndroidManifest.xml
index 4e81310..485b4bd 100644
--- a/android/app/src/main/AndroidManifest.xml
+++ b/android/app/src/main/AndroidManifest.xml
@@ -21,6 +21,12 @@
android:name="android.hardware.touchscreen"
android:required="false" />
+
+
+
()
+
+ /** Called once from [LedGrabApp.onCreate] so we can resolve services. */
+ @JvmStatic
+ fun init(context: Context) {
+ val app = context.applicationContext
+ appContext = app
+
+ val filter = IntentFilter(ACTION_USB_PERMISSION)
+ val receiver = object : BroadcastReceiver() {
+ override fun onReceive(ctx: Context, intent: Intent) {
+ // We just log; the next open() call checks hasPermission() again.
+ val granted = intent.getBooleanExtra(
+ UsbManager.EXTRA_PERMISSION_GRANTED,
+ false,
+ )
+ Log.i(TAG, "USB permission broadcast: granted=$granted")
+ }
+ }
+ // Android 14 requires RECEIVER_NOT_EXPORTED for non-system broadcasts.
+ if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
+ app.registerReceiver(receiver, filter, Context.RECEIVER_NOT_EXPORTED)
+ } else {
+ @Suppress("UnspecifiedRegisterReceiverFlag")
+ app.registerReceiver(receiver, filter)
+ }
+ }
+
+ private fun ctx(): Context =
+ appContext ?: error("UsbSerialBridge.init() not called — app context unavailable")
+
+ private fun safeSerial(driver: UsbSerialDriver): String =
+ try {
+ driver.device.serialNumber ?: ""
+ } catch (_: SecurityException) {
+ // Reading the serial requires USB permission on API 29+.
+ ""
+ }
+
+ /**
+ * Enumerate attached USB-serial devices.
+ *
+ * Each entry is `"VID|PID|serial|description"` with VID/PID as
+ * 4-char lowercase hex. Pipe is used as the separator so device
+ * descriptions containing colons (common on FTDI strings) don't
+ * confuse the Python parser.
+ */
+ @JvmStatic
+ fun listDevices(): List {
+ val manager = ctx().getSystemService(Context.USB_SERVICE) as UsbManager
+ val drivers = UsbSerialProber.getDefaultProber().findAllDrivers(manager)
+ return drivers.map { driver ->
+ val dev = driver.device
+ val vid = "%04x".format(dev.vendorId)
+ val pid = "%04x".format(dev.productId)
+ val serial = safeSerial(driver)
+ val description = buildString {
+ append(dev.manufacturerName ?: "USB")
+ val product = dev.productName
+ if (!product.isNullOrBlank()) {
+ append(' ')
+ append(product)
+ }
+ }.trim().ifEmpty { "USB $vid:$pid" }
+ "$vid|$pid|$serial|$description"
+ }
+ }
+
+ /**
+ * Open the first matching USB-serial device. Returns a non-negative
+ * opaque handle on success, -1 on failure (device not found, user
+ * denied permission, or driver error). Failures also trigger an
+ * async permission-request dialog when applicable — subsequent
+ * open() calls will succeed once the user grants.
+ */
+ @JvmStatic
+ fun open(vendorId: Int, productId: Int, serial: String, baud: Int): Int {
+ val context = ctx()
+ val manager = context.getSystemService(Context.USB_SERVICE) as UsbManager
+ val drivers = UsbSerialProber.getDefaultProber().findAllDrivers(manager)
+ val driver = drivers.firstOrNull { d ->
+ val dev = d.device
+ dev.vendorId == vendorId &&
+ dev.productId == productId &&
+ (serial.isEmpty() || safeSerial(d) == serial)
+ }
+ if (driver == null) {
+ Log.w(TAG, "No matching device for $vendorId:$productId:$serial")
+ return -1
+ }
+
+ if (!manager.hasPermission(driver.device)) {
+ Log.w(TAG, "USB permission not yet granted for ${driver.device.deviceName}")
+ requestPermission(context, manager, driver)
+ return -1
+ }
+
+ val connection = manager.openDevice(driver.device)
+ if (connection == null) {
+ Log.w(TAG, "openDevice returned null for ${driver.device.deviceName}")
+ return -1
+ }
+ val port = driver.ports.firstOrNull()
+ if (port == null) {
+ connection.close()
+ Log.w(TAG, "Driver reports no ports for ${driver.device.deviceName}")
+ return -1
+ }
+
+ try {
+ port.open(connection)
+ port.setParameters(
+ baud,
+ 8,
+ UsbSerialPort.STOPBITS_1,
+ UsbSerialPort.PARITY_NONE,
+ )
+ } catch (e: Exception) {
+ Log.e(TAG, "Failed to configure serial port", e)
+ runCatching { port.close() }
+ return -1
+ }
+
+ val handle = handleSeq.getAndIncrement()
+ synchronized(openPorts) { openPorts[handle] = port }
+ Log.i(
+ TAG,
+ "Opened USB serial ${driver.device.deviceName} baud=$baud handle=$handle",
+ )
+ return handle
+ }
+
+ /** Write bytes to the previously-opened handle. Throws if invalid. */
+ @JvmStatic
+ fun write(handle: Int, data: ByteArray) {
+ val port = synchronized(openPorts) { openPorts[handle] }
+ ?: throw IllegalStateException("Invalid handle $handle")
+ // 1s write timeout matches the old pyserial `timeout=1` behavior.
+ port.write(data, 1_000)
+ }
+
+ /** Close a previously-opened handle. Silently ignores unknown handles. */
+ @JvmStatic
+ fun close(handle: Int) {
+ val port = synchronized(openPorts) { openPorts.remove(handle) } ?: return
+ runCatching { port.close() }
+ .onFailure { Log.w(TAG, "close($handle): ${it.message}") }
+ }
+
+ private fun requestPermission(
+ context: Context,
+ manager: UsbManager,
+ driver: UsbSerialDriver,
+ ) {
+ val flags =
+ if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
+ PendingIntent.FLAG_IMMUTABLE
+ } else {
+ 0
+ }
+ val intent = Intent(ACTION_USB_PERMISSION).apply {
+ setPackage(context.packageName)
+ }
+ val pending = PendingIntent.getBroadcast(context, 0, intent, flags)
+ manager.requestPermission(driver.device, pending)
+ }
+}
diff --git a/android/settings.gradle.kts b/android/settings.gradle.kts
index d22e677..d02007d 100644
--- a/android/settings.gradle.kts
+++ b/android/settings.gradle.kts
@@ -11,6 +11,8 @@ dependencyResolutionManagement {
repositories {
google()
mavenCentral()
+ // usb-serial-for-android (mik3y) is distributed via JitPack
+ maven { url = uri("https://jitpack.io") }
}
}
diff --git a/server/src/ledgrab/core/devices/adalight_client.py b/server/src/ledgrab/core/devices/adalight_client.py
index 7141478..cca63bc 100644
--- a/server/src/ledgrab/core/devices/adalight_client.py
+++ b/server/src/ledgrab/core/devices/adalight_client.py
@@ -7,40 +7,21 @@ from typing import Optional, Tuple
import numpy as np
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
+from ledgrab.core.devices.serial_transport import (
+ open_transport,
+ parse_serial_url,
+ port_exists,
+)
from ledgrab.utils import get_logger
logger = get_logger(__name__)
-DEFAULT_BAUD_RATE = 115200
ARDUINO_RESET_DELAY = 2.0 # seconds to wait after opening serial for Arduino bootloader
def parse_adalight_url(url: str) -> Tuple[str, int]:
- """Parse an Adalight URL into (port, baud_rate).
-
- Formats:
- "COM3" -> ("COM3", 115200)
- "COM3:230400" -> ("COM3", 230400)
- "/dev/ttyUSB0" -> ("/dev/ttyUSB0", 115200)
- """
- url = url.strip()
- if ":" in url and not url.startswith("/"):
- # Windows COM port with baud: "COM3:230400"
- parts = url.rsplit(":", 1)
- try:
- baud = int(parts[1])
- return parts[0], baud
- except ValueError:
- pass
- elif ":" in url and url.startswith("/"):
- # Unix path with baud: "/dev/ttyUSB0:230400"
- parts = url.rsplit(":", 1)
- try:
- baud = int(parts[1])
- return parts[0], baud
- except ValueError:
- pass
- return url, DEFAULT_BAUD_RATE
+ """Backwards-compatible alias for :func:`parse_serial_url`."""
+ return parse_serial_url(url)
def _build_adalight_header(led_count: int) -> bytes:
@@ -81,13 +62,12 @@ class AdalightClient(LEDClient):
async def connect(self) -> bool:
"""Open serial port and wait for Arduino reset."""
- import serial
-
try:
- self._serial = await asyncio.to_thread(
- serial.Serial, port=self._port, baudrate=self._baud_rate, timeout=1
- )
- # Wait for Arduino to finish bootloader reset (non-blocking)
+ self._serial = open_transport(self._port, baud_rate=self._baud_rate, timeout=1)
+ await asyncio.to_thread(self._serial.open)
+ # Wait for Arduino to finish bootloader reset (non-blocking).
+ # USB-to-TTL adapters without DTR don't reset, but the delay
+ # is harmless on those — keeps the path uniform.
await asyncio.sleep(ARDUINO_RESET_DELAY)
self._connected = True
logger.info(
@@ -122,7 +102,7 @@ class AdalightClient(LEDClient):
f"led_count={self._led_count}"
)
self._connected = False
- if self._serial and self._serial.is_open:
+ if self._serial is not None:
try:
self._serial.close()
except Exception as e:
@@ -182,18 +162,13 @@ class AdalightClient(LEDClient):
) -> DeviceHealth:
"""Check if the serial port exists without opening it.
- Enumerates COM ports to avoid exclusive-access conflicts on Windows.
+ Enumerates COM ports (or USB devices on Android) to avoid
+ exclusive-access conflicts on Windows.
"""
port, _baud = parse_adalight_url(url)
try:
- import serial.tools.list_ports
-
- available_ports = [p.device for p in serial.tools.list_ports.comports()]
- port_upper = port.upper()
- found = any(p.upper() == port_upper for p in available_ports)
-
- if found:
+ if port_exists(port):
return DeviceHealth(
online=True,
latency_ms=0.0,
@@ -202,12 +177,11 @@ class AdalightClient(LEDClient):
device_version=None,
device_led_count=prev_health.device_led_count if prev_health else None,
)
- else:
- return DeviceHealth(
- online=False,
- last_checked=datetime.now(timezone.utc),
- error=f"Serial port {port} not found",
- )
+ return DeviceHealth(
+ online=False,
+ last_checked=datetime.now(timezone.utc),
+ error=f"Serial port {port} not found",
+ )
except Exception as e:
return DeviceHealth(
online=False,
diff --git a/server/src/ledgrab/core/devices/android_serial_transport.py b/server/src/ledgrab/core/devices/android_serial_transport.py
new file mode 100644
index 0000000..d0f6775
--- /dev/null
+++ b/server/src/ledgrab/core/devices/android_serial_transport.py
@@ -0,0 +1,159 @@
+"""Android USB-serial transport backed by the Kotlin ``UsbSerialBridge``.
+
+Calls into Java land through Chaquopy; this module only loads on
+Android. URL format: ``usb:VID:PID`` or ``usb:VID:PID:serial`` (with an
+optional ``@baud`` suffix).
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import List, Optional, Tuple
+
+from ledgrab.utils import get_logger
+from ledgrab.utils.platform import is_android
+
+from .serial_transport import SerialPortInfo
+
+logger = get_logger(__name__)
+
+
+def _bridge():
+ """Return the singleton Kotlin ``UsbSerialBridge`` instance, or raise.
+
+ The bridge exposes static methods ``listDevices()``, ``open(...)``,
+ ``write(...)``, ``close(...)``, etc. — see ``UsbSerialBridge.kt``.
+ """
+ if not is_android():
+ raise RuntimeError("AndroidSerialTransport is only usable on Android")
+ try:
+ from java import jclass # type: ignore[import-not-found]
+ except ImportError as e:
+ raise RuntimeError("Chaquopy java interop not available") from e
+
+ return jclass("com.ledgrab.android.UsbSerialBridge").INSTANCE
+
+
+@dataclass(frozen=True)
+class _UsbAddress:
+ vendor_id: int
+ product_id: int
+ serial: Optional[str]
+
+ @classmethod
+ def parse(cls, device: str) -> "_UsbAddress":
+ if not device.startswith("usb:"):
+ raise ValueError(f"Not a USB device URL: {device!r}")
+ body = device[len("usb:") :]
+ parts = body.split(":")
+ if len(parts) < 2:
+ raise ValueError(
+ f"USB URL must be 'usb:VID:PID' or 'usb:VID:PID:serial' (got {device!r})"
+ )
+ try:
+ vid = int(parts[0], 16)
+ pid = int(parts[1], 16)
+ except ValueError as e:
+ raise ValueError(f"VID/PID must be hex: {device!r}") from e
+ serial = parts[2] if len(parts) >= 3 and parts[2] else None
+ return cls(vid, pid, serial)
+
+
+def _format_url(vid: int, pid: int, serial: Optional[str]) -> str:
+ base = f"usb:{vid:04x}:{pid:04x}"
+ return f"{base}:{serial}" if serial else base
+
+
+def list_android_usb_devices() -> List[SerialPortInfo]:
+ """Return USB-serial devices currently attached to the Android host."""
+ try:
+ bridge = _bridge()
+ except Exception as e:
+ logger.debug("UsbSerialBridge unavailable: %s", e)
+ return []
+
+ out: List[SerialPortInfo] = []
+ try:
+ # listDevices() returns a Java List of "vid:pid:serial:description"
+ # tuples — split here to keep the Kotlin DTO trivial.
+ for entry in bridge.listDevices():
+ entry_s = str(entry)
+ parts = entry_s.split("|", 3)
+ if len(parts) < 4:
+ continue
+ vid_str, pid_str, serial, description = parts
+ try:
+ vid = int(vid_str, 16)
+ pid = int(pid_str, 16)
+ except ValueError:
+ continue
+ url = _format_url(vid, pid, serial or None)
+ out.append(SerialPortInfo(device=url, description=description or url))
+ except Exception as e:
+ logger.warning("UsbSerialBridge.listDevices failed: %s", e)
+ return out
+
+
+class AndroidSerialTransport:
+ """Serial transport that pipes writes through the Kotlin USB bridge."""
+
+ def __init__(self, device: str, baud_rate: int) -> None:
+ self._url = device
+ self._addr = _UsbAddress.parse(device)
+ self._baud_rate = baud_rate
+ self._handle: Optional[int] = None # opaque token from the bridge
+
+ @property
+ def is_open(self) -> bool:
+ return self._handle is not None
+
+ def open(self) -> None:
+ bridge = _bridge()
+ # open() returns a non-negative int handle on success, or -1 if the
+ # device wasn't found / permission was denied.
+ handle = int(
+ bridge.open(
+ self._addr.vendor_id,
+ self._addr.product_id,
+ self._addr.serial or "",
+ self._baud_rate,
+ )
+ )
+ if handle < 0:
+ raise RuntimeError(
+ f"Failed to open USB serial device {self._url} "
+ f"(handle={handle}; user may have denied permission)"
+ )
+ self._handle = handle
+ logger.info("Android USB serial opened: %s @ %d baud", self._url, self._baud_rate)
+
+ def write(self, data: bytes) -> None:
+ if self._handle is None:
+ raise RuntimeError(f"USB serial {self._url} is not open")
+ bridge = _bridge()
+ # Chaquopy auto-marshals bytes → Java byte[].
+ bridge.write(self._handle, data)
+
+ def flush(self) -> None:
+ # USB endpoints are unbuffered from this side — nothing to flush.
+ return
+
+ def close(self) -> None:
+ if self._handle is None:
+ return
+ try:
+ _bridge().close(self._handle)
+ except Exception as e:
+ logger.warning("Error closing USB serial %s: %s", self._url, e)
+ finally:
+ self._handle = None
+
+
+__all__ = [
+ "AndroidSerialTransport",
+ "list_android_usb_devices",
+]
+
+
+def _used() -> Tuple[type, ...]: # pragma: no cover — re-export marker
+ return (AndroidSerialTransport,)
diff --git a/server/src/ledgrab/core/devices/serial_provider.py b/server/src/ledgrab/core/devices/serial_provider.py
index 72efdab..60a660f 100644
--- a/server/src/ledgrab/core/devices/serial_provider.py
+++ b/server/src/ledgrab/core/devices/serial_provider.py
@@ -14,6 +14,10 @@ from ledgrab.core.devices.led_client import (
DiscoveredDevice,
LEDDeviceProvider,
)
+from ledgrab.core.devices.serial_transport import (
+ list_serial_ports,
+ port_exists,
+)
from ledgrab.utils import get_logger
logger = get_logger(__name__)
@@ -47,15 +51,9 @@ class SerialDeviceProvider(LEDDeviceProvider):
port, _baud = parse_adalight_url(url)
try:
- import serial.tools.list_ports
-
- available_ports = [p.device for p in serial.tools.list_ports.comports()]
- port_upper = port.upper()
- if not any(p.upper() == port_upper for p in available_ports):
- raise ValueError(
- f"Serial port {port} not found. "
- f"Available ports: {', '.join(available_ports) or 'none'}"
- )
+ if not port_exists(port):
+ available = ", ".join(p.device for p in list_serial_ports()) or "none"
+ raise ValueError(f"Serial port {port} not found. Available ports: {available}")
except ValueError:
raise
except Exception as e:
@@ -67,22 +65,19 @@ class SerialDeviceProvider(LEDDeviceProvider):
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
"""Discover serial ports that could be LED devices."""
try:
- import serial.tools.list_ports
-
- ports = serial.tools.list_ports.comports()
- results = []
- for port_info in ports:
- results.append(
- DiscoveredDevice(
- name=port_info.description or port_info.device,
- url=port_info.device,
- device_type=self.device_type,
- ip=port_info.device,
- mac="",
- led_count=None,
- version=None,
- )
+ ports = list_serial_ports()
+ results = [
+ DiscoveredDevice(
+ name=port_info.description,
+ url=port_info.device,
+ device_type=self.device_type,
+ ip=port_info.device,
+ mac="",
+ led_count=None,
+ version=None,
)
+ for port_info in ports
+ ]
logger.info(f"{self.device_type} serial port scan found {len(results)} port(s)")
return results
except Exception as e:
diff --git a/server/src/ledgrab/core/devices/serial_transport.py b/server/src/ledgrab/core/devices/serial_transport.py
new file mode 100644
index 0000000..e247225
--- /dev/null
+++ b/server/src/ledgrab/core/devices/serial_transport.py
@@ -0,0 +1,174 @@
+"""Serial transport abstraction for LED clients.
+
+Wraps the platform-specific way to open a serial line so callers
+(AdalightClient, etc.) don't import ``pyserial`` directly. The primary
+motivation is Android: under Chaquopy, ``pyserial`` exists but cannot
+touch USB ports, so we route writes through a Kotlin
+``UsbSerialBridge`` instead.
+
+URL format
+----------
+- ``"COM3"`` or ``"COM3:115200"`` — desktop COM port (Windows)
+- ``"/dev/ttyUSB0"`` or ``"/dev/ttyUSB0:115200"`` — desktop tty
+- ``"usb:VID:PID"`` or ``"usb:VID:PID:serial@115200"`` — Android USB device
+
+Selection happens in :func:`open_transport`: anything starting with
+``usb:`` goes through :class:`AndroidSerialTransport`, everything else
+through :class:`PySerialTransport`. :func:`list_serial_ports` returns
+desktop COM ports on desktop, USB devices on Android.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import List, Optional, Protocol
+
+from ledgrab.utils import get_logger
+from ledgrab.utils.platform import is_android
+
+logger = get_logger(__name__)
+
+DEFAULT_BAUD_RATE = 115200
+
+
+@dataclass(frozen=True)
+class SerialPortInfo:
+ """A discovered serial port, regardless of source platform."""
+
+ device: str # URL-like identifier passed back to open_transport()
+ description: str # Human-friendly name for the UI
+
+
+class SerialTransport(Protocol):
+ """Minimal serial line — open, write bytes, close. No reads (LED out only)."""
+
+ @property
+ def is_open(self) -> bool: ...
+ def open(self) -> None: ...
+ def write(self, data: bytes) -> None: ...
+ def flush(self) -> None: ...
+ def close(self) -> None: ...
+
+
+def parse_serial_url(url: str) -> tuple[str, int]:
+ """Parse a serial URL into (device, baud_rate).
+
+ Recognized formats::
+
+ "COM3" -> ("COM3", 115200)
+ "COM3:230400" -> ("COM3", 230400)
+ "/dev/ttyUSB0" -> ("/dev/ttyUSB0", 115200)
+ "/dev/ttyUSB0:230400"-> ("/dev/ttyUSB0", 230400)
+ "usb:1a86:7523" -> ("usb:1a86:7523", 115200)
+ "usb:1a86:7523@250k" -> raises (only ints accepted for baud)
+ "usb:1a86:7523@230400" -> ("usb:1a86:7523", 230400)
+
+ USB URLs use ``@`` to separate baud because the colon is already
+ the field separator between vendor and product IDs.
+ """
+ raw = url.strip()
+ if raw.startswith("usb:"):
+ if "@" in raw:
+ device, _, baud_str = raw.rpartition("@")
+ try:
+ return device, int(baud_str)
+ except ValueError:
+ pass
+ return raw, DEFAULT_BAUD_RATE
+
+ # Desktop COM/tty paths use a trailing :BAUD suffix.
+ if ":" in raw:
+ head, _, tail = raw.rpartition(":")
+ try:
+ return head, int(tail)
+ except ValueError:
+ pass
+ return raw, DEFAULT_BAUD_RATE
+
+
+def list_serial_ports() -> List[SerialPortInfo]:
+ """Enumerate serial ports available on this host (or USB devices on Android)."""
+ if is_android():
+ try:
+ from ledgrab.core.devices.android_serial_transport import (
+ list_android_usb_devices,
+ )
+
+ return list_android_usb_devices()
+ except Exception as e:
+ logger.warning("Android USB enumeration failed: %s", e)
+ return []
+
+ try:
+ import serial.tools.list_ports
+ except ImportError:
+ logger.warning("pyserial not available — serial enumeration disabled")
+ return []
+
+ return [
+ SerialPortInfo(device=p.device, description=p.description or p.device)
+ for p in serial.tools.list_ports.comports()
+ ]
+
+
+def port_exists(device: str) -> bool:
+ """Return True if a port with this device id is currently visible."""
+ target = device.upper()
+ return any(p.device.upper() == target for p in list_serial_ports())
+
+
+def open_transport(
+ url: str,
+ baud_rate: Optional[int] = None,
+ timeout: float = 1.0,
+) -> SerialTransport:
+ """Construct an unopened transport for ``url``. Caller invokes ``open()``."""
+ device, parsed_baud = parse_serial_url(url)
+ effective_baud = baud_rate or parsed_baud
+
+ if device.startswith("usb:"):
+ from ledgrab.core.devices.android_serial_transport import (
+ AndroidSerialTransport,
+ )
+
+ return AndroidSerialTransport(device, effective_baud)
+
+ return PySerialTransport(device, effective_baud, timeout)
+
+
+class PySerialTransport:
+ """Default serial transport backed by ``pyserial`` on desktop hosts."""
+
+ def __init__(self, device: str, baud_rate: int, timeout: float = 1.0) -> None:
+ self._device = device
+ self._baud_rate = baud_rate
+ self._timeout = timeout
+ self._serial = None
+
+ @property
+ def is_open(self) -> bool:
+ return self._serial is not None and self._serial.is_open
+
+ def open(self) -> None:
+ import serial # imported here so Android (no real pyserial) doesn't fail at import
+
+ self._serial = serial.Serial(
+ port=self._device, baudrate=self._baud_rate, timeout=self._timeout
+ )
+
+ def write(self, data: bytes) -> None:
+ if self._serial is None:
+ raise RuntimeError(f"Serial port {self._device} is not open")
+ self._serial.write(data)
+
+ def flush(self) -> None:
+ if self._serial is not None:
+ self._serial.flush()
+
+ def close(self) -> None:
+ if self._serial is not None and self._serial.is_open:
+ try:
+ self._serial.close()
+ except Exception as e:
+ logger.warning("Error closing %s: %s", self._device, e)
+ self._serial = None
diff --git a/server/tests/test_serial_transport.py b/server/tests/test_serial_transport.py
new file mode 100644
index 0000000..1398cc0
--- /dev/null
+++ b/server/tests/test_serial_transport.py
@@ -0,0 +1,153 @@
+"""Tests for the serial transport abstraction."""
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from ledgrab.core.devices.serial_transport import (
+ DEFAULT_BAUD_RATE,
+ PySerialTransport,
+ SerialPortInfo,
+ list_serial_ports,
+ open_transport,
+ parse_serial_url,
+ port_exists,
+)
+
+
+# ── URL parsing ────────────────────────────────────────────────────
+
+
+@pytest.mark.parametrize(
+ "url,expected",
+ [
+ ("COM3", ("COM3", DEFAULT_BAUD_RATE)),
+ ("COM3:230400", ("COM3", 230400)),
+ ("/dev/ttyUSB0", ("/dev/ttyUSB0", DEFAULT_BAUD_RATE)),
+ ("/dev/ttyUSB0:230400", ("/dev/ttyUSB0", 230400)),
+ # USB URLs use @ for baud since `:` separates VID:PID:serial
+ ("usb:1a86:7523", ("usb:1a86:7523", DEFAULT_BAUD_RATE)),
+ ("usb:1a86:7523:AB01", ("usb:1a86:7523:AB01", DEFAULT_BAUD_RATE)),
+ ("usb:1a86:7523@230400", ("usb:1a86:7523", 230400)),
+ ("usb:1a86:7523:AB01@500000", ("usb:1a86:7523:AB01", 500000)),
+ ],
+)
+def test_parse_serial_url(url, expected):
+ assert parse_serial_url(url) == expected
+
+
+def test_parse_serial_url_strips_whitespace():
+ assert parse_serial_url(" COM3 ") == ("COM3", DEFAULT_BAUD_RATE)
+
+
+# ── PySerialTransport ──────────────────────────────────────────────
+
+
+def test_pyserial_transport_open_and_write(monkeypatch):
+ fake_serial_module = MagicMock()
+ fake_handle = MagicMock(is_open=True)
+ fake_serial_module.Serial.return_value = fake_handle
+
+ monkeypatch.setitem(__import__("sys").modules, "serial", fake_serial_module)
+
+ t = PySerialTransport("COM3", 115200)
+ assert not t.is_open
+ t.open()
+ fake_serial_module.Serial.assert_called_once_with(port="COM3", baudrate=115200, timeout=1.0)
+ assert t.is_open
+
+ t.write(b"hello")
+ fake_handle.write.assert_called_once_with(b"hello")
+
+ t.flush()
+ fake_handle.flush.assert_called_once()
+
+ t.close()
+ fake_handle.close.assert_called_once()
+
+
+def test_pyserial_transport_write_before_open_raises():
+ t = PySerialTransport("COM3", 115200)
+ with pytest.raises(RuntimeError, match="not open"):
+ t.write(b"x")
+
+
+# ── Factory ────────────────────────────────────────────────────────
+
+
+def test_open_transport_picks_pyserial_for_com_url():
+ t = open_transport("COM3:230400")
+ assert isinstance(t, PySerialTransport)
+ assert t._device == "COM3"
+ assert t._baud_rate == 230400
+
+
+def test_open_transport_explicit_baud_overrides_url():
+ t = open_transport("COM3:230400", baud_rate=500000)
+ assert t._baud_rate == 500000
+
+
+def test_open_transport_picks_android_for_usb_url(monkeypatch):
+ """usb: URLs should route to the Android transport even off-Android."""
+ # Importing AndroidSerialTransport itself works on any host; only the
+ # bridge call inside .open() fails when not on Android.
+ t = open_transport("usb:1a86:7523@230400")
+ assert type(t).__name__ == "AndroidSerialTransport"
+ assert t._baud_rate == 230400
+
+
+# ── Discovery ──────────────────────────────────────────────────────
+
+
+def test_list_serial_ports_uses_pyserial_on_desktop(monkeypatch):
+ monkeypatch.setattr("ledgrab.core.devices.serial_transport.is_android", lambda: False)
+
+ fake_module = MagicMock()
+ fake_module.tools.list_ports.comports.return_value = [
+ MagicMock(device="COM3", description="USB Serial CH340"),
+ MagicMock(device="COM4", description=None),
+ ]
+ with patch.dict(
+ "sys.modules",
+ {
+ "serial": fake_module,
+ "serial.tools": fake_module.tools,
+ "serial.tools.list_ports": fake_module.tools.list_ports,
+ },
+ ):
+ ports = list_serial_ports()
+
+ assert len(ports) == 2
+ assert ports[0] == SerialPortInfo(device="COM3", description="USB Serial CH340")
+ # Falls back to device id when description is None
+ assert ports[1].description == "COM4"
+
+
+def test_list_serial_ports_routes_to_android_when_on_android(monkeypatch):
+ monkeypatch.setattr("ledgrab.core.devices.serial_transport.is_android", lambda: True)
+
+ fake_devices = [SerialPortInfo(device="usb:1a86:7523", description="CH340 LED")]
+ fake_mod = MagicMock()
+ fake_mod.list_android_usb_devices.return_value = fake_devices
+ monkeypatch.setitem(
+ __import__("sys").modules,
+ "ledgrab.core.devices.android_serial_transport",
+ fake_mod,
+ )
+
+ assert list_serial_ports() == fake_devices
+
+
+def test_port_exists_is_case_insensitive(monkeypatch):
+ fake_ports = [
+ SerialPortInfo(device="COM3", description="x"),
+ SerialPortInfo(device="/dev/ttyUSB0", description="y"),
+ ]
+ monkeypatch.setattr(
+ "ledgrab.core.devices.serial_transport.list_serial_ports", lambda: fake_ports
+ )
+ assert port_exists("com3") is True
+ assert port_exists("/dev/ttyusb0") is True
+ assert port_exists("COM99") is False