feat(devices): Android USB-serial support for Adalight/AmbiLED controllers
Adds end-to-end support for driving USB-connected Adalight / AmbiLED LED controllers from Android TV boxes. Android's security model blocks direct USB access from Python, so writes route through a Kotlin UsbSerialBridge singleton via Chaquopy. Python side: - New SerialTransport Protocol (serial_transport.py) with open / write / flush / close. Desktop uses PySerialTransport (wraps pyserial), Android uses AndroidSerialTransport (wraps the Kotlin bridge). - list_serial_ports() factory returns desktop COM ports on desktop, USB devices on Android — callers don't branch. - URL scheme extended: existing COM3[:baud] and /dev/ttyUSB0[:baud] unchanged; new usb:VID:PID[:serial][@baud] for Android (@ is the baud separator since : is already used between VID and PID). - AdalightClient and SerialDeviceProvider refactored to go through the transport — no more direct pyserial imports in hot paths. - 17 new unit tests cover URL parsing, PySerial transport, factory selection, platform-branching discovery. Full suite 750 passing. Kotlin side: - UsbSerialBridge.kt singleton uses com.hoho.android.usbserial (mik3y) which ships drivers for CH340, CP2102, FTDI, Prolific, and CDC-ACM (Arduino). Exposes listDevices, open, write, close via @JvmStatic for Chaquopy. First open() attempt without permission triggers the system USB permission dialog; next call succeeds once user grants. - usb-serial-for-android is distributed via JitPack — added that repo in settings.gradle.kts and the dependency in app/build.gradle.kts. - AndroidManifest declares uses-feature android.hardware.usb.host (required=false so non-USB-host phones still install). - LedGrabApp.onCreate calls UsbSerialBridge.init(this) so the bridge resolves the UsbManager without needing an Activity ref. Verified: ./gradlew compileDebugKotlin succeeds; off-Android import of android_serial_transport works. Real-hardware smoke test on a TV box with a CH340/CP2102/FTDI adapter still pending. ESP-NOW (espnow_client / espnow_provider) still imports pyserial directly because it needs bidirectional reads — separate refactor to extend the transport with read() if that path ever needs Android USB support.
This commit is contained in:
@@ -52,19 +52,15 @@ Known projects using this approach for reference: scrcpy-hidden-api, shizuku, co
|
|||||||
|
|
||||||
Drive USB LED controllers (APA102, WS2812) connected directly to the Android TV box via USB-to-serial adapters.
|
Drive USB LED controllers (APA102, WS2812) connected directly to the Android TV box via USB-to-serial adapters.
|
||||||
|
|
||||||
- [ ] Add [usb-serial-for-android](https://github.com/mik3y/usb-serial-for-android) dependency to `android/app/build.gradle.kts`
|
- [x] Added `com.github.mik3y:usb-serial-for-android:3.8.1` (via JitPack) to `android/app/build.gradle.kts`.
|
||||||
- [ ] Create Kotlin `UsbSerialBridge` class that:
|
- [x] Kotlin `UsbSerialBridge` singleton (`android/app/src/main/java/com/ledgrab/android/UsbSerialBridge.kt`) — exposes `listDevices()`, `open(vid, pid, serial, baud)`, `write(handle, ByteArray)`, `close(handle)`. Permission request fires automatically from `open()` when the user hasn't granted access yet. Handles are opaque integers, port map is synchronized, so Python threads can share one bridge.
|
||||||
- Enumerates USB serial devices via Android USB Host API
|
- [x] Python `AndroidSerialTransport` in `server/src/ledgrab/core/devices/android_serial_transport.py` drives the bridge through Chaquopy. `SerialTransport` Protocol + `PySerialTransport` + `list_serial_ports()` factory live in `serial_transport.py`; `AdalightClient` and `SerialDeviceProvider` now go through the abstraction instead of importing `pyserial` directly.
|
||||||
- Requests user permission for USB device access
|
- [x] URL scheme extended: `usb:VID:PID[:serial][@baud]` on Android alongside the existing `COM3[:baud]` / `/dev/ttyUSB0[:baud]` desktop paths.
|
||||||
- Opens a serial connection (baud rate configurable)
|
- [x] App initializes the bridge on startup (`LedGrabApp.onCreate` → `UsbSerialBridge.init(this)`); manifest declares `uses-feature android.hardware.usb.host`.
|
||||||
- Exposes a write method callable from Python via Chaquopy
|
- [ ] Real-device test pending — no USB-serial hardware on dev machine. Need to verify on a TV box with CH340, CP2102, or FTDI adapter.
|
||||||
- [ ] Create Python `AndroidSerialProvider` in `server/src/ledgrab/core/devices/` that:
|
- [ ] Document supported USB LED controllers in README (once real-device test passes).
|
||||||
- Replaces `pyserial` on Android (which can't access USB ports)
|
- [ ] Optional: auto-launch the app when a known USB-serial adapter is plugged in (intent-filter on `USB_DEVICE_ATTACHED` + `res/xml/device_filter.xml`). Skipped in v1 — users can just open LedGrab and hit "Discover".
|
||||||
- Calls `UsbSerialBridge` via Chaquopy to send LED data
|
- [ ] ESP-NOW client (`espnow_client.py` / `espnow_provider.py`) still imports `pyserial` directly and needs bidirectional reads — separate refactor to extend the transport with `read()` if ESP-NOW-via-USB on Android is needed.
|
||||||
- Registers as an alternative serial transport when `is_android()` is True
|
|
||||||
- [ ] Add USB device permission dialog to `MainActivity` (auto-triggered on device connect)
|
|
||||||
- [ ] Test with common USB-to-serial chips: CH340, CP2102, FTDI
|
|
||||||
- [ ] Document supported USB LED controllers in README
|
|
||||||
|
|
||||||
## Performance Metrics Abstraction
|
## Performance Metrics Abstraction
|
||||||
|
|
||||||
|
|||||||
@@ -116,4 +116,7 @@ dependencies {
|
|||||||
implementation("androidx.lifecycle:lifecycle-service:2.8.7")
|
implementation("androidx.lifecycle:lifecycle-service:2.8.7")
|
||||||
// QR code generation for displaying server URL on TV
|
// QR code generation for displaying server URL on TV
|
||||||
implementation("com.google.zxing:core:3.5.3")
|
implementation("com.google.zxing:core:3.5.3")
|
||||||
|
// USB-serial drivers (CH340, CP2102, FTDI, Prolific, CDC-ACM) for
|
||||||
|
// driving Adalight/AmbiLED controllers plugged into Android TV boxes.
|
||||||
|
implementation("com.github.mik3y:usb-serial-for-android:3.8.1")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,12 @@
|
|||||||
android:name="android.hardware.touchscreen"
|
android:name="android.hardware.touchscreen"
|
||||||
android:required="false" />
|
android:required="false" />
|
||||||
|
|
||||||
|
<!-- USB host — for USB-to-TTL adapters driving Adalight/AmbiLED
|
||||||
|
controllers. required=false so phones without USB host still install. -->
|
||||||
|
<uses-feature
|
||||||
|
android:name="android.hardware.usb.host"
|
||||||
|
android:required="false" />
|
||||||
|
|
||||||
<application
|
<application
|
||||||
android:name=".LedGrabApp"
|
android:name=".LedGrabApp"
|
||||||
android:allowBackup="true"
|
android:allowBackup="true"
|
||||||
|
|||||||
@@ -18,5 +18,9 @@ class LedGrabApp : Application() {
|
|||||||
if (!Python.isStarted()) {
|
if (!Python.isStarted()) {
|
||||||
Python.start(AndroidPlatform(this))
|
Python.start(AndroidPlatform(this))
|
||||||
}
|
}
|
||||||
|
// Bind application context for the USB-serial bridge so Python
|
||||||
|
// can enumerate and open USB-to-TTL adapters without needing
|
||||||
|
// an Activity reference.
|
||||||
|
UsbSerialBridge.init(this)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,203 @@
|
|||||||
|
package com.ledgrab.android
|
||||||
|
|
||||||
|
import android.app.PendingIntent
|
||||||
|
import android.content.BroadcastReceiver
|
||||||
|
import android.content.Context
|
||||||
|
import android.content.Intent
|
||||||
|
import android.content.IntentFilter
|
||||||
|
import android.hardware.usb.UsbManager
|
||||||
|
import android.os.Build
|
||||||
|
import android.util.Log
|
||||||
|
import com.hoho.android.usbserial.driver.UsbSerialDriver
|
||||||
|
import com.hoho.android.usbserial.driver.UsbSerialPort
|
||||||
|
import com.hoho.android.usbserial.driver.UsbSerialProber
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
|
/**
|
||||||
|
* USB-serial bridge exposed to the Python server via Chaquopy.
|
||||||
|
*
|
||||||
|
* Uses the `usb-serial-for-android` library (mik3y) which ships drivers
|
||||||
|
* for the common USB-to-TTL chips (CH340, CP2102, FTDI, Prolific, and
|
||||||
|
* CDC-ACM) found on Arduino boards and Adalight/AmbiLED controllers.
|
||||||
|
*
|
||||||
|
* Python callers access the singleton instance via
|
||||||
|
* `UsbSerialBridge.INSTANCE.listDevices()` etc. — see
|
||||||
|
* `server/src/ledgrab/core/devices/android_serial_transport.py`.
|
||||||
|
*
|
||||||
|
* The bridge holds no Context of its own; [init] must be called once
|
||||||
|
* from [LedGrabApp.onCreate] to bind the application context.
|
||||||
|
*/
|
||||||
|
object UsbSerialBridge {
|
||||||
|
private const val TAG = "UsbSerialBridge"
|
||||||
|
private const val ACTION_USB_PERMISSION = "com.ledgrab.android.USB_PERMISSION"
|
||||||
|
|
||||||
|
@Volatile private var appContext: Context? = null
|
||||||
|
|
||||||
|
private val handleSeq = AtomicInteger(1)
|
||||||
|
private val openPorts = HashMap<Int, UsbSerialPort>()
|
||||||
|
|
||||||
|
/** Called once from [LedGrabApp.onCreate] so we can resolve services. */
|
||||||
|
@JvmStatic
|
||||||
|
fun init(context: Context) {
|
||||||
|
val app = context.applicationContext
|
||||||
|
appContext = app
|
||||||
|
|
||||||
|
val filter = IntentFilter(ACTION_USB_PERMISSION)
|
||||||
|
val receiver = object : BroadcastReceiver() {
|
||||||
|
override fun onReceive(ctx: Context, intent: Intent) {
|
||||||
|
// We just log; the next open() call checks hasPermission() again.
|
||||||
|
val granted = intent.getBooleanExtra(
|
||||||
|
UsbManager.EXTRA_PERMISSION_GRANTED,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
Log.i(TAG, "USB permission broadcast: granted=$granted")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Android 14 requires RECEIVER_NOT_EXPORTED for non-system broadcasts.
|
||||||
|
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
|
||||||
|
app.registerReceiver(receiver, filter, Context.RECEIVER_NOT_EXPORTED)
|
||||||
|
} else {
|
||||||
|
@Suppress("UnspecifiedRegisterReceiverFlag")
|
||||||
|
app.registerReceiver(receiver, filter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun ctx(): Context =
|
||||||
|
appContext ?: error("UsbSerialBridge.init() not called — app context unavailable")
|
||||||
|
|
||||||
|
private fun safeSerial(driver: UsbSerialDriver): String =
|
||||||
|
try {
|
||||||
|
driver.device.serialNumber ?: ""
|
||||||
|
} catch (_: SecurityException) {
|
||||||
|
// Reading the serial requires USB permission on API 29+.
|
||||||
|
""
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enumerate attached USB-serial devices.
|
||||||
|
*
|
||||||
|
* Each entry is `"VID|PID|serial|description"` with VID/PID as
|
||||||
|
* 4-char lowercase hex. Pipe is used as the separator so device
|
||||||
|
* descriptions containing colons (common on FTDI strings) don't
|
||||||
|
* confuse the Python parser.
|
||||||
|
*/
|
||||||
|
@JvmStatic
|
||||||
|
fun listDevices(): List<String> {
|
||||||
|
val manager = ctx().getSystemService(Context.USB_SERVICE) as UsbManager
|
||||||
|
val drivers = UsbSerialProber.getDefaultProber().findAllDrivers(manager)
|
||||||
|
return drivers.map { driver ->
|
||||||
|
val dev = driver.device
|
||||||
|
val vid = "%04x".format(dev.vendorId)
|
||||||
|
val pid = "%04x".format(dev.productId)
|
||||||
|
val serial = safeSerial(driver)
|
||||||
|
val description = buildString {
|
||||||
|
append(dev.manufacturerName ?: "USB")
|
||||||
|
val product = dev.productName
|
||||||
|
if (!product.isNullOrBlank()) {
|
||||||
|
append(' ')
|
||||||
|
append(product)
|
||||||
|
}
|
||||||
|
}.trim().ifEmpty { "USB $vid:$pid" }
|
||||||
|
"$vid|$pid|$serial|$description"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open the first matching USB-serial device. Returns a non-negative
|
||||||
|
* opaque handle on success, -1 on failure (device not found, user
|
||||||
|
* denied permission, or driver error). Failures also trigger an
|
||||||
|
* async permission-request dialog when applicable — subsequent
|
||||||
|
* open() calls will succeed once the user grants.
|
||||||
|
*/
|
||||||
|
@JvmStatic
|
||||||
|
fun open(vendorId: Int, productId: Int, serial: String, baud: Int): Int {
|
||||||
|
val context = ctx()
|
||||||
|
val manager = context.getSystemService(Context.USB_SERVICE) as UsbManager
|
||||||
|
val drivers = UsbSerialProber.getDefaultProber().findAllDrivers(manager)
|
||||||
|
val driver = drivers.firstOrNull { d ->
|
||||||
|
val dev = d.device
|
||||||
|
dev.vendorId == vendorId &&
|
||||||
|
dev.productId == productId &&
|
||||||
|
(serial.isEmpty() || safeSerial(d) == serial)
|
||||||
|
}
|
||||||
|
if (driver == null) {
|
||||||
|
Log.w(TAG, "No matching device for $vendorId:$productId:$serial")
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!manager.hasPermission(driver.device)) {
|
||||||
|
Log.w(TAG, "USB permission not yet granted for ${driver.device.deviceName}")
|
||||||
|
requestPermission(context, manager, driver)
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
val connection = manager.openDevice(driver.device)
|
||||||
|
if (connection == null) {
|
||||||
|
Log.w(TAG, "openDevice returned null for ${driver.device.deviceName}")
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
val port = driver.ports.firstOrNull()
|
||||||
|
if (port == null) {
|
||||||
|
connection.close()
|
||||||
|
Log.w(TAG, "Driver reports no ports for ${driver.device.deviceName}")
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
port.open(connection)
|
||||||
|
port.setParameters(
|
||||||
|
baud,
|
||||||
|
8,
|
||||||
|
UsbSerialPort.STOPBITS_1,
|
||||||
|
UsbSerialPort.PARITY_NONE,
|
||||||
|
)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
Log.e(TAG, "Failed to configure serial port", e)
|
||||||
|
runCatching { port.close() }
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
val handle = handleSeq.getAndIncrement()
|
||||||
|
synchronized(openPorts) { openPorts[handle] = port }
|
||||||
|
Log.i(
|
||||||
|
TAG,
|
||||||
|
"Opened USB serial ${driver.device.deviceName} baud=$baud handle=$handle",
|
||||||
|
)
|
||||||
|
return handle
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Write bytes to the previously-opened handle. Throws if invalid. */
|
||||||
|
@JvmStatic
|
||||||
|
fun write(handle: Int, data: ByteArray) {
|
||||||
|
val port = synchronized(openPorts) { openPorts[handle] }
|
||||||
|
?: throw IllegalStateException("Invalid handle $handle")
|
||||||
|
// 1s write timeout matches the old pyserial `timeout=1` behavior.
|
||||||
|
port.write(data, 1_000)
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Close a previously-opened handle. Silently ignores unknown handles. */
|
||||||
|
@JvmStatic
|
||||||
|
fun close(handle: Int) {
|
||||||
|
val port = synchronized(openPorts) { openPorts.remove(handle) } ?: return
|
||||||
|
runCatching { port.close() }
|
||||||
|
.onFailure { Log.w(TAG, "close($handle): ${it.message}") }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun requestPermission(
|
||||||
|
context: Context,
|
||||||
|
manager: UsbManager,
|
||||||
|
driver: UsbSerialDriver,
|
||||||
|
) {
|
||||||
|
val flags =
|
||||||
|
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
|
||||||
|
PendingIntent.FLAG_IMMUTABLE
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
val intent = Intent(ACTION_USB_PERMISSION).apply {
|
||||||
|
setPackage(context.packageName)
|
||||||
|
}
|
||||||
|
val pending = PendingIntent.getBroadcast(context, 0, intent, flags)
|
||||||
|
manager.requestPermission(driver.device, pending)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -11,6 +11,8 @@ dependencyResolutionManagement {
|
|||||||
repositories {
|
repositories {
|
||||||
google()
|
google()
|
||||||
mavenCentral()
|
mavenCentral()
|
||||||
|
// usb-serial-for-android (mik3y) is distributed via JitPack
|
||||||
|
maven { url = uri("https://jitpack.io") }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,40 +7,21 @@ from typing import Optional, Tuple
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
|
from ledgrab.core.devices.led_client import DeviceHealth, LEDClient
|
||||||
|
from ledgrab.core.devices.serial_transport import (
|
||||||
|
open_transport,
|
||||||
|
parse_serial_url,
|
||||||
|
port_exists,
|
||||||
|
)
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
DEFAULT_BAUD_RATE = 115200
|
|
||||||
ARDUINO_RESET_DELAY = 2.0 # seconds to wait after opening serial for Arduino bootloader
|
ARDUINO_RESET_DELAY = 2.0 # seconds to wait after opening serial for Arduino bootloader
|
||||||
|
|
||||||
|
|
||||||
def parse_adalight_url(url: str) -> Tuple[str, int]:
|
def parse_adalight_url(url: str) -> Tuple[str, int]:
|
||||||
"""Parse an Adalight URL into (port, baud_rate).
|
"""Backwards-compatible alias for :func:`parse_serial_url`."""
|
||||||
|
return parse_serial_url(url)
|
||||||
Formats:
|
|
||||||
"COM3" -> ("COM3", 115200)
|
|
||||||
"COM3:230400" -> ("COM3", 230400)
|
|
||||||
"/dev/ttyUSB0" -> ("/dev/ttyUSB0", 115200)
|
|
||||||
"""
|
|
||||||
url = url.strip()
|
|
||||||
if ":" in url and not url.startswith("/"):
|
|
||||||
# Windows COM port with baud: "COM3:230400"
|
|
||||||
parts = url.rsplit(":", 1)
|
|
||||||
try:
|
|
||||||
baud = int(parts[1])
|
|
||||||
return parts[0], baud
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
elif ":" in url and url.startswith("/"):
|
|
||||||
# Unix path with baud: "/dev/ttyUSB0:230400"
|
|
||||||
parts = url.rsplit(":", 1)
|
|
||||||
try:
|
|
||||||
baud = int(parts[1])
|
|
||||||
return parts[0], baud
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
return url, DEFAULT_BAUD_RATE
|
|
||||||
|
|
||||||
|
|
||||||
def _build_adalight_header(led_count: int) -> bytes:
|
def _build_adalight_header(led_count: int) -> bytes:
|
||||||
@@ -81,13 +62,12 @@ class AdalightClient(LEDClient):
|
|||||||
|
|
||||||
async def connect(self) -> bool:
|
async def connect(self) -> bool:
|
||||||
"""Open serial port and wait for Arduino reset."""
|
"""Open serial port and wait for Arduino reset."""
|
||||||
import serial
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._serial = await asyncio.to_thread(
|
self._serial = open_transport(self._port, baud_rate=self._baud_rate, timeout=1)
|
||||||
serial.Serial, port=self._port, baudrate=self._baud_rate, timeout=1
|
await asyncio.to_thread(self._serial.open)
|
||||||
)
|
# Wait for Arduino to finish bootloader reset (non-blocking).
|
||||||
# Wait for Arduino to finish bootloader reset (non-blocking)
|
# USB-to-TTL adapters without DTR don't reset, but the delay
|
||||||
|
# is harmless on those — keeps the path uniform.
|
||||||
await asyncio.sleep(ARDUINO_RESET_DELAY)
|
await asyncio.sleep(ARDUINO_RESET_DELAY)
|
||||||
self._connected = True
|
self._connected = True
|
||||||
logger.info(
|
logger.info(
|
||||||
@@ -122,7 +102,7 @@ class AdalightClient(LEDClient):
|
|||||||
f"led_count={self._led_count}"
|
f"led_count={self._led_count}"
|
||||||
)
|
)
|
||||||
self._connected = False
|
self._connected = False
|
||||||
if self._serial and self._serial.is_open:
|
if self._serial is not None:
|
||||||
try:
|
try:
|
||||||
self._serial.close()
|
self._serial.close()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -182,18 +162,13 @@ class AdalightClient(LEDClient):
|
|||||||
) -> DeviceHealth:
|
) -> DeviceHealth:
|
||||||
"""Check if the serial port exists without opening it.
|
"""Check if the serial port exists without opening it.
|
||||||
|
|
||||||
Enumerates COM ports to avoid exclusive-access conflicts on Windows.
|
Enumerates COM ports (or USB devices on Android) to avoid
|
||||||
|
exclusive-access conflicts on Windows.
|
||||||
"""
|
"""
|
||||||
port, _baud = parse_adalight_url(url)
|
port, _baud = parse_adalight_url(url)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import serial.tools.list_ports
|
if port_exists(port):
|
||||||
|
|
||||||
available_ports = [p.device for p in serial.tools.list_ports.comports()]
|
|
||||||
port_upper = port.upper()
|
|
||||||
found = any(p.upper() == port_upper for p in available_ports)
|
|
||||||
|
|
||||||
if found:
|
|
||||||
return DeviceHealth(
|
return DeviceHealth(
|
||||||
online=True,
|
online=True,
|
||||||
latency_ms=0.0,
|
latency_ms=0.0,
|
||||||
@@ -202,12 +177,11 @@ class AdalightClient(LEDClient):
|
|||||||
device_version=None,
|
device_version=None,
|
||||||
device_led_count=prev_health.device_led_count if prev_health else None,
|
device_led_count=prev_health.device_led_count if prev_health else None,
|
||||||
)
|
)
|
||||||
else:
|
return DeviceHealth(
|
||||||
return DeviceHealth(
|
online=False,
|
||||||
online=False,
|
last_checked=datetime.now(timezone.utc),
|
||||||
last_checked=datetime.now(timezone.utc),
|
error=f"Serial port {port} not found",
|
||||||
error=f"Serial port {port} not found",
|
)
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return DeviceHealth(
|
return DeviceHealth(
|
||||||
online=False,
|
online=False,
|
||||||
|
|||||||
@@ -0,0 +1,159 @@
|
|||||||
|
"""Android USB-serial transport backed by the Kotlin ``UsbSerialBridge``.
|
||||||
|
|
||||||
|
Calls into Java land through Chaquopy; this module only loads on
|
||||||
|
Android. URL format: ``usb:VID:PID`` or ``usb:VID:PID:serial`` (with an
|
||||||
|
optional ``@baud`` suffix).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
|
from ledgrab.utils import get_logger
|
||||||
|
from ledgrab.utils.platform import is_android
|
||||||
|
|
||||||
|
from .serial_transport import SerialPortInfo
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _bridge():
|
||||||
|
"""Return the singleton Kotlin ``UsbSerialBridge`` instance, or raise.
|
||||||
|
|
||||||
|
The bridge exposes static methods ``listDevices()``, ``open(...)``,
|
||||||
|
``write(...)``, ``close(...)``, etc. — see ``UsbSerialBridge.kt``.
|
||||||
|
"""
|
||||||
|
if not is_android():
|
||||||
|
raise RuntimeError("AndroidSerialTransport is only usable on Android")
|
||||||
|
try:
|
||||||
|
from java import jclass # type: ignore[import-not-found]
|
||||||
|
except ImportError as e:
|
||||||
|
raise RuntimeError("Chaquopy java interop not available") from e
|
||||||
|
|
||||||
|
return jclass("com.ledgrab.android.UsbSerialBridge").INSTANCE
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class _UsbAddress:
|
||||||
|
vendor_id: int
|
||||||
|
product_id: int
|
||||||
|
serial: Optional[str]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def parse(cls, device: str) -> "_UsbAddress":
|
||||||
|
if not device.startswith("usb:"):
|
||||||
|
raise ValueError(f"Not a USB device URL: {device!r}")
|
||||||
|
body = device[len("usb:") :]
|
||||||
|
parts = body.split(":")
|
||||||
|
if len(parts) < 2:
|
||||||
|
raise ValueError(
|
||||||
|
f"USB URL must be 'usb:VID:PID' or 'usb:VID:PID:serial' (got {device!r})"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
vid = int(parts[0], 16)
|
||||||
|
pid = int(parts[1], 16)
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(f"VID/PID must be hex: {device!r}") from e
|
||||||
|
serial = parts[2] if len(parts) >= 3 and parts[2] else None
|
||||||
|
return cls(vid, pid, serial)
|
||||||
|
|
||||||
|
|
||||||
|
def _format_url(vid: int, pid: int, serial: Optional[str]) -> str:
|
||||||
|
base = f"usb:{vid:04x}:{pid:04x}"
|
||||||
|
return f"{base}:{serial}" if serial else base
|
||||||
|
|
||||||
|
|
||||||
|
def list_android_usb_devices() -> List[SerialPortInfo]:
|
||||||
|
"""Return USB-serial devices currently attached to the Android host."""
|
||||||
|
try:
|
||||||
|
bridge = _bridge()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("UsbSerialBridge unavailable: %s", e)
|
||||||
|
return []
|
||||||
|
|
||||||
|
out: List[SerialPortInfo] = []
|
||||||
|
try:
|
||||||
|
# listDevices() returns a Java List<String> of "vid:pid:serial:description"
|
||||||
|
# tuples — split here to keep the Kotlin DTO trivial.
|
||||||
|
for entry in bridge.listDevices():
|
||||||
|
entry_s = str(entry)
|
||||||
|
parts = entry_s.split("|", 3)
|
||||||
|
if len(parts) < 4:
|
||||||
|
continue
|
||||||
|
vid_str, pid_str, serial, description = parts
|
||||||
|
try:
|
||||||
|
vid = int(vid_str, 16)
|
||||||
|
pid = int(pid_str, 16)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
url = _format_url(vid, pid, serial or None)
|
||||||
|
out.append(SerialPortInfo(device=url, description=description or url))
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("UsbSerialBridge.listDevices failed: %s", e)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
class AndroidSerialTransport:
|
||||||
|
"""Serial transport that pipes writes through the Kotlin USB bridge."""
|
||||||
|
|
||||||
|
def __init__(self, device: str, baud_rate: int) -> None:
|
||||||
|
self._url = device
|
||||||
|
self._addr = _UsbAddress.parse(device)
|
||||||
|
self._baud_rate = baud_rate
|
||||||
|
self._handle: Optional[int] = None # opaque token from the bridge
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_open(self) -> bool:
|
||||||
|
return self._handle is not None
|
||||||
|
|
||||||
|
def open(self) -> None:
|
||||||
|
bridge = _bridge()
|
||||||
|
# open() returns a non-negative int handle on success, or -1 if the
|
||||||
|
# device wasn't found / permission was denied.
|
||||||
|
handle = int(
|
||||||
|
bridge.open(
|
||||||
|
self._addr.vendor_id,
|
||||||
|
self._addr.product_id,
|
||||||
|
self._addr.serial or "",
|
||||||
|
self._baud_rate,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if handle < 0:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Failed to open USB serial device {self._url} "
|
||||||
|
f"(handle={handle}; user may have denied permission)"
|
||||||
|
)
|
||||||
|
self._handle = handle
|
||||||
|
logger.info("Android USB serial opened: %s @ %d baud", self._url, self._baud_rate)
|
||||||
|
|
||||||
|
def write(self, data: bytes) -> None:
|
||||||
|
if self._handle is None:
|
||||||
|
raise RuntimeError(f"USB serial {self._url} is not open")
|
||||||
|
bridge = _bridge()
|
||||||
|
# Chaquopy auto-marshals bytes → Java byte[].
|
||||||
|
bridge.write(self._handle, data)
|
||||||
|
|
||||||
|
def flush(self) -> None:
|
||||||
|
# USB endpoints are unbuffered from this side — nothing to flush.
|
||||||
|
return
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
if self._handle is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
_bridge().close(self._handle)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Error closing USB serial %s: %s", self._url, e)
|
||||||
|
finally:
|
||||||
|
self._handle = None
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"AndroidSerialTransport",
|
||||||
|
"list_android_usb_devices",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _used() -> Tuple[type, ...]: # pragma: no cover — re-export marker
|
||||||
|
return (AndroidSerialTransport,)
|
||||||
@@ -14,6 +14,10 @@ from ledgrab.core.devices.led_client import (
|
|||||||
DiscoveredDevice,
|
DiscoveredDevice,
|
||||||
LEDDeviceProvider,
|
LEDDeviceProvider,
|
||||||
)
|
)
|
||||||
|
from ledgrab.core.devices.serial_transport import (
|
||||||
|
list_serial_ports,
|
||||||
|
port_exists,
|
||||||
|
)
|
||||||
from ledgrab.utils import get_logger
|
from ledgrab.utils import get_logger
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
@@ -47,15 +51,9 @@ class SerialDeviceProvider(LEDDeviceProvider):
|
|||||||
port, _baud = parse_adalight_url(url)
|
port, _baud = parse_adalight_url(url)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import serial.tools.list_ports
|
if not port_exists(port):
|
||||||
|
available = ", ".join(p.device for p in list_serial_ports()) or "none"
|
||||||
available_ports = [p.device for p in serial.tools.list_ports.comports()]
|
raise ValueError(f"Serial port {port} not found. Available ports: {available}")
|
||||||
port_upper = port.upper()
|
|
||||||
if not any(p.upper() == port_upper for p in available_ports):
|
|
||||||
raise ValueError(
|
|
||||||
f"Serial port {port} not found. "
|
|
||||||
f"Available ports: {', '.join(available_ports) or 'none'}"
|
|
||||||
)
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -67,22 +65,19 @@ class SerialDeviceProvider(LEDDeviceProvider):
|
|||||||
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
|
async def discover(self, timeout: float = 3.0) -> List[DiscoveredDevice]:
|
||||||
"""Discover serial ports that could be LED devices."""
|
"""Discover serial ports that could be LED devices."""
|
||||||
try:
|
try:
|
||||||
import serial.tools.list_ports
|
ports = list_serial_ports()
|
||||||
|
results = [
|
||||||
ports = serial.tools.list_ports.comports()
|
DiscoveredDevice(
|
||||||
results = []
|
name=port_info.description,
|
||||||
for port_info in ports:
|
url=port_info.device,
|
||||||
results.append(
|
device_type=self.device_type,
|
||||||
DiscoveredDevice(
|
ip=port_info.device,
|
||||||
name=port_info.description or port_info.device,
|
mac="",
|
||||||
url=port_info.device,
|
led_count=None,
|
||||||
device_type=self.device_type,
|
version=None,
|
||||||
ip=port_info.device,
|
|
||||||
mac="",
|
|
||||||
led_count=None,
|
|
||||||
version=None,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
for port_info in ports
|
||||||
|
]
|
||||||
logger.info(f"{self.device_type} serial port scan found {len(results)} port(s)")
|
logger.info(f"{self.device_type} serial port scan found {len(results)} port(s)")
|
||||||
return results
|
return results
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -0,0 +1,174 @@
|
|||||||
|
"""Serial transport abstraction for LED clients.
|
||||||
|
|
||||||
|
Wraps the platform-specific way to open a serial line so callers
|
||||||
|
(AdalightClient, etc.) don't import ``pyserial`` directly. The primary
|
||||||
|
motivation is Android: under Chaquopy, ``pyserial`` exists but cannot
|
||||||
|
touch USB ports, so we route writes through a Kotlin
|
||||||
|
``UsbSerialBridge`` instead.
|
||||||
|
|
||||||
|
URL format
|
||||||
|
----------
|
||||||
|
- ``"COM3"`` or ``"COM3:115200"`` — desktop COM port (Windows)
|
||||||
|
- ``"/dev/ttyUSB0"`` or ``"/dev/ttyUSB0:115200"`` — desktop tty
|
||||||
|
- ``"usb:VID:PID"`` or ``"usb:VID:PID:serial@115200"`` — Android USB device
|
||||||
|
|
||||||
|
Selection happens in :func:`open_transport`: anything starting with
|
||||||
|
``usb:`` goes through :class:`AndroidSerialTransport`, everything else
|
||||||
|
through :class:`PySerialTransport`. :func:`list_serial_ports` returns
|
||||||
|
desktop COM ports on desktop, USB devices on Android.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import List, Optional, Protocol
|
||||||
|
|
||||||
|
from ledgrab.utils import get_logger
|
||||||
|
from ledgrab.utils.platform import is_android
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
DEFAULT_BAUD_RATE = 115200
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class SerialPortInfo:
|
||||||
|
"""A discovered serial port, regardless of source platform."""
|
||||||
|
|
||||||
|
device: str # URL-like identifier passed back to open_transport()
|
||||||
|
description: str # Human-friendly name for the UI
|
||||||
|
|
||||||
|
|
||||||
|
class SerialTransport(Protocol):
|
||||||
|
"""Minimal serial line — open, write bytes, close. No reads (LED out only)."""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_open(self) -> bool: ...
|
||||||
|
def open(self) -> None: ...
|
||||||
|
def write(self, data: bytes) -> None: ...
|
||||||
|
def flush(self) -> None: ...
|
||||||
|
def close(self) -> None: ...
|
||||||
|
|
||||||
|
|
||||||
|
def parse_serial_url(url: str) -> tuple[str, int]:
|
||||||
|
"""Parse a serial URL into (device, baud_rate).
|
||||||
|
|
||||||
|
Recognized formats::
|
||||||
|
|
||||||
|
"COM3" -> ("COM3", 115200)
|
||||||
|
"COM3:230400" -> ("COM3", 230400)
|
||||||
|
"/dev/ttyUSB0" -> ("/dev/ttyUSB0", 115200)
|
||||||
|
"/dev/ttyUSB0:230400"-> ("/dev/ttyUSB0", 230400)
|
||||||
|
"usb:1a86:7523" -> ("usb:1a86:7523", 115200)
|
||||||
|
"usb:1a86:7523@250k" -> raises (only ints accepted for baud)
|
||||||
|
"usb:1a86:7523@230400" -> ("usb:1a86:7523", 230400)
|
||||||
|
|
||||||
|
USB URLs use ``@`` to separate baud because the colon is already
|
||||||
|
the field separator between vendor and product IDs.
|
||||||
|
"""
|
||||||
|
raw = url.strip()
|
||||||
|
if raw.startswith("usb:"):
|
||||||
|
if "@" in raw:
|
||||||
|
device, _, baud_str = raw.rpartition("@")
|
||||||
|
try:
|
||||||
|
return device, int(baud_str)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
return raw, DEFAULT_BAUD_RATE
|
||||||
|
|
||||||
|
# Desktop COM/tty paths use a trailing :BAUD suffix.
|
||||||
|
if ":" in raw:
|
||||||
|
head, _, tail = raw.rpartition(":")
|
||||||
|
try:
|
||||||
|
return head, int(tail)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
return raw, DEFAULT_BAUD_RATE
|
||||||
|
|
||||||
|
|
||||||
|
def list_serial_ports() -> List[SerialPortInfo]:
|
||||||
|
"""Enumerate serial ports available on this host (or USB devices on Android)."""
|
||||||
|
if is_android():
|
||||||
|
try:
|
||||||
|
from ledgrab.core.devices.android_serial_transport import (
|
||||||
|
list_android_usb_devices,
|
||||||
|
)
|
||||||
|
|
||||||
|
return list_android_usb_devices()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Android USB enumeration failed: %s", e)
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
import serial.tools.list_ports
|
||||||
|
except ImportError:
|
||||||
|
logger.warning("pyserial not available — serial enumeration disabled")
|
||||||
|
return []
|
||||||
|
|
||||||
|
return [
|
||||||
|
SerialPortInfo(device=p.device, description=p.description or p.device)
|
||||||
|
for p in serial.tools.list_ports.comports()
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def port_exists(device: str) -> bool:
|
||||||
|
"""Return True if a port with this device id is currently visible."""
|
||||||
|
target = device.upper()
|
||||||
|
return any(p.device.upper() == target for p in list_serial_ports())
|
||||||
|
|
||||||
|
|
||||||
|
def open_transport(
|
||||||
|
url: str,
|
||||||
|
baud_rate: Optional[int] = None,
|
||||||
|
timeout: float = 1.0,
|
||||||
|
) -> SerialTransport:
|
||||||
|
"""Construct an unopened transport for ``url``. Caller invokes ``open()``."""
|
||||||
|
device, parsed_baud = parse_serial_url(url)
|
||||||
|
effective_baud = baud_rate or parsed_baud
|
||||||
|
|
||||||
|
if device.startswith("usb:"):
|
||||||
|
from ledgrab.core.devices.android_serial_transport import (
|
||||||
|
AndroidSerialTransport,
|
||||||
|
)
|
||||||
|
|
||||||
|
return AndroidSerialTransport(device, effective_baud)
|
||||||
|
|
||||||
|
return PySerialTransport(device, effective_baud, timeout)
|
||||||
|
|
||||||
|
|
||||||
|
class PySerialTransport:
|
||||||
|
"""Default serial transport backed by ``pyserial`` on desktop hosts."""
|
||||||
|
|
||||||
|
def __init__(self, device: str, baud_rate: int, timeout: float = 1.0) -> None:
|
||||||
|
self._device = device
|
||||||
|
self._baud_rate = baud_rate
|
||||||
|
self._timeout = timeout
|
||||||
|
self._serial = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_open(self) -> bool:
|
||||||
|
return self._serial is not None and self._serial.is_open
|
||||||
|
|
||||||
|
def open(self) -> None:
|
||||||
|
import serial # imported here so Android (no real pyserial) doesn't fail at import
|
||||||
|
|
||||||
|
self._serial = serial.Serial(
|
||||||
|
port=self._device, baudrate=self._baud_rate, timeout=self._timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
def write(self, data: bytes) -> None:
|
||||||
|
if self._serial is None:
|
||||||
|
raise RuntimeError(f"Serial port {self._device} is not open")
|
||||||
|
self._serial.write(data)
|
||||||
|
|
||||||
|
def flush(self) -> None:
|
||||||
|
if self._serial is not None:
|
||||||
|
self._serial.flush()
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
if self._serial is not None and self._serial.is_open:
|
||||||
|
try:
|
||||||
|
self._serial.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Error closing %s: %s", self._device, e)
|
||||||
|
self._serial = None
|
||||||
@@ -0,0 +1,153 @@
|
|||||||
|
"""Tests for the serial transport abstraction."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from ledgrab.core.devices.serial_transport import (
|
||||||
|
DEFAULT_BAUD_RATE,
|
||||||
|
PySerialTransport,
|
||||||
|
SerialPortInfo,
|
||||||
|
list_serial_ports,
|
||||||
|
open_transport,
|
||||||
|
parse_serial_url,
|
||||||
|
port_exists,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── URL parsing ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"url,expected",
|
||||||
|
[
|
||||||
|
("COM3", ("COM3", DEFAULT_BAUD_RATE)),
|
||||||
|
("COM3:230400", ("COM3", 230400)),
|
||||||
|
("/dev/ttyUSB0", ("/dev/ttyUSB0", DEFAULT_BAUD_RATE)),
|
||||||
|
("/dev/ttyUSB0:230400", ("/dev/ttyUSB0", 230400)),
|
||||||
|
# USB URLs use @ for baud since `:` separates VID:PID:serial
|
||||||
|
("usb:1a86:7523", ("usb:1a86:7523", DEFAULT_BAUD_RATE)),
|
||||||
|
("usb:1a86:7523:AB01", ("usb:1a86:7523:AB01", DEFAULT_BAUD_RATE)),
|
||||||
|
("usb:1a86:7523@230400", ("usb:1a86:7523", 230400)),
|
||||||
|
("usb:1a86:7523:AB01@500000", ("usb:1a86:7523:AB01", 500000)),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_parse_serial_url(url, expected):
|
||||||
|
assert parse_serial_url(url) == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_serial_url_strips_whitespace():
|
||||||
|
assert parse_serial_url(" COM3 ") == ("COM3", DEFAULT_BAUD_RATE)
|
||||||
|
|
||||||
|
|
||||||
|
# ── PySerialTransport ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_pyserial_transport_open_and_write(monkeypatch):
|
||||||
|
fake_serial_module = MagicMock()
|
||||||
|
fake_handle = MagicMock(is_open=True)
|
||||||
|
fake_serial_module.Serial.return_value = fake_handle
|
||||||
|
|
||||||
|
monkeypatch.setitem(__import__("sys").modules, "serial", fake_serial_module)
|
||||||
|
|
||||||
|
t = PySerialTransport("COM3", 115200)
|
||||||
|
assert not t.is_open
|
||||||
|
t.open()
|
||||||
|
fake_serial_module.Serial.assert_called_once_with(port="COM3", baudrate=115200, timeout=1.0)
|
||||||
|
assert t.is_open
|
||||||
|
|
||||||
|
t.write(b"hello")
|
||||||
|
fake_handle.write.assert_called_once_with(b"hello")
|
||||||
|
|
||||||
|
t.flush()
|
||||||
|
fake_handle.flush.assert_called_once()
|
||||||
|
|
||||||
|
t.close()
|
||||||
|
fake_handle.close.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
def test_pyserial_transport_write_before_open_raises():
|
||||||
|
t = PySerialTransport("COM3", 115200)
|
||||||
|
with pytest.raises(RuntimeError, match="not open"):
|
||||||
|
t.write(b"x")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Factory ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_open_transport_picks_pyserial_for_com_url():
|
||||||
|
t = open_transport("COM3:230400")
|
||||||
|
assert isinstance(t, PySerialTransport)
|
||||||
|
assert t._device == "COM3"
|
||||||
|
assert t._baud_rate == 230400
|
||||||
|
|
||||||
|
|
||||||
|
def test_open_transport_explicit_baud_overrides_url():
|
||||||
|
t = open_transport("COM3:230400", baud_rate=500000)
|
||||||
|
assert t._baud_rate == 500000
|
||||||
|
|
||||||
|
|
||||||
|
def test_open_transport_picks_android_for_usb_url(monkeypatch):
|
||||||
|
"""usb: URLs should route to the Android transport even off-Android."""
|
||||||
|
# Importing AndroidSerialTransport itself works on any host; only the
|
||||||
|
# bridge call inside .open() fails when not on Android.
|
||||||
|
t = open_transport("usb:1a86:7523@230400")
|
||||||
|
assert type(t).__name__ == "AndroidSerialTransport"
|
||||||
|
assert t._baud_rate == 230400
|
||||||
|
|
||||||
|
|
||||||
|
# ── Discovery ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_serial_ports_uses_pyserial_on_desktop(monkeypatch):
|
||||||
|
monkeypatch.setattr("ledgrab.core.devices.serial_transport.is_android", lambda: False)
|
||||||
|
|
||||||
|
fake_module = MagicMock()
|
||||||
|
fake_module.tools.list_ports.comports.return_value = [
|
||||||
|
MagicMock(device="COM3", description="USB Serial CH340"),
|
||||||
|
MagicMock(device="COM4", description=None),
|
||||||
|
]
|
||||||
|
with patch.dict(
|
||||||
|
"sys.modules",
|
||||||
|
{
|
||||||
|
"serial": fake_module,
|
||||||
|
"serial.tools": fake_module.tools,
|
||||||
|
"serial.tools.list_ports": fake_module.tools.list_ports,
|
||||||
|
},
|
||||||
|
):
|
||||||
|
ports = list_serial_ports()
|
||||||
|
|
||||||
|
assert len(ports) == 2
|
||||||
|
assert ports[0] == SerialPortInfo(device="COM3", description="USB Serial CH340")
|
||||||
|
# Falls back to device id when description is None
|
||||||
|
assert ports[1].description == "COM4"
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_serial_ports_routes_to_android_when_on_android(monkeypatch):
|
||||||
|
monkeypatch.setattr("ledgrab.core.devices.serial_transport.is_android", lambda: True)
|
||||||
|
|
||||||
|
fake_devices = [SerialPortInfo(device="usb:1a86:7523", description="CH340 LED")]
|
||||||
|
fake_mod = MagicMock()
|
||||||
|
fake_mod.list_android_usb_devices.return_value = fake_devices
|
||||||
|
monkeypatch.setitem(
|
||||||
|
__import__("sys").modules,
|
||||||
|
"ledgrab.core.devices.android_serial_transport",
|
||||||
|
fake_mod,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert list_serial_ports() == fake_devices
|
||||||
|
|
||||||
|
|
||||||
|
def test_port_exists_is_case_insensitive(monkeypatch):
|
||||||
|
fake_ports = [
|
||||||
|
SerialPortInfo(device="COM3", description="x"),
|
||||||
|
SerialPortInfo(device="/dev/ttyUSB0", description="y"),
|
||||||
|
]
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"ledgrab.core.devices.serial_transport.list_serial_ports", lambda: fake_ports
|
||||||
|
)
|
||||||
|
assert port_exists("com3") is True
|
||||||
|
assert port_exists("/dev/ttyusb0") is True
|
||||||
|
assert port_exists("COM99") is False
|
||||||
Reference in New Issue
Block a user