Compare commits

...

4 Commits

Author SHA1 Message Date
alexei.dolgolyov 68040173c6 Merge feature/android-webcam-capture: Android on-device webcam capture
Camera2 + ImageReader (CameraBridge) -> push RGB frames -> AndroidCameraEngine,
reusing the MediaProjection capture-engine selection path so webcams surface as
selectable displays on Android TV. On-demand camera lifecycle, conditional camera
FGS type (granted-only), single-camera ownership lock. Per-phase + final + security
reviews pass; 14 files; new isolated tests; zero new Python/Gradle deps.
assembleDebug + 1883 pytest + ruff all green.
2026-06-02 13:46:59 +03:00
alexei.dolgolyov 4bf3fe65db feat(android): on-device webcam capture via Camera2 (AndroidCameraEngine)
Add on-device webcam capture to the experimental Android-TV build. Desktop
captures webcams via OpenCV (no Chaquopy/Android wheel); this adds a push-based
AndroidCameraEngine that plugs into the same selection path desktop uses
(capture template engine_type="android_camera" + display_index, HAS_OWN_DISPLAYS).

A Kotlin CameraBridge (Camera2) enumerates cameras and opens them on demand —
only while a capture source is active, driven Python->Kotlin via a guarded jclass
singleton (BleBridge pattern) — converts each frame YUV_420_888->RGB, and pushes
RGB bytes into a module-level queue mirroring mediaprojection_engine.py. Cameras
surface as selectable displays like the desktop OpenCV engine; the data-driven
capture-template UI is unchanged. No new Python deps; no new Gradle deps
(Camera2 is in-platform).

Engine: ENGINE_PRIORITY=0 (never auto-selected over MediaProjection=100; explicit
engine_type only). Single-camera ownership is serialized with a lock + ref-count
(same-camera streams attach, different-camera refused, last release stops),
mirroring the desktop CameraEngine guard.

Permission: CAMERA requested at capture-start, gated on FEATURE_CAMERA_ANY so
camera-less TV boxes never prompt; graceful degradation when denied. The service
is promoted with the camera FGS type (+ FOREGROUND_SERVICE_CAMERA) only when
CAMERA is already granted, so backgrounded capture keeps working without risking
a failed startForeground on camera-less boxes (camera can't ride the
MediaProjection token the way audio playback capture does).

Reviewed via multi-agent adversarial pass (13 findings -> 4 fixed: device leak on
session-failure, multi-stream collision, camera FGS type, i18n key; 9 refuted).

Tests: 18 new desktop-CI tests (no device needed); full suite 1883 passed.
Verified: assembleDebug BUILD SUCCESSFUL, ruff clean.

Docs: ANDROID-REVIEW/android-webcam-capture-plan.md (design), updated
android-missing-functionality.md + README feature table + en/ru/zh locales.
2026-06-02 13:36:23 +03:00
alexei.dolgolyov 34db5de8c3 Merge feature/android-notification-capture: Android on-device notification capture
NotificationListenerService -> push_notification() -> _AndroidBackend path so OS
notifications drive the existing color-strip LED effects on Android TV at app-name
parity. Per-phase + final + security reviews all pass; 11 new isolated tests; zero
new Python deps. Android assembleDebug pending device/CI verification.
2026-06-02 11:47:29 +03:00
alexei.dolgolyov 0be3f833df feat(android): on-device OS notification capture (NotificationListenerService)
Add an Android backend to os_notification_listener.py so notifications on the
experimental Android-TV build drive the existing NotificationColorStripSource
LED effects (flash/pulse/sweep, per-app colors + sounds) at app-name parity
with the Windows/Linux backends.

A Kotlin NotificationListenerService forwards the posting app's display label
across the Chaquopy JNI boundary into a new push-based _AndroidBackend +
module-level push_notification() receiver; the existing color-strip pipeline,
per-app colors/filters, and history endpoint are reused unchanged.

- Python: _AndroidBackend (probed first), push_notification() receiver,
  _LinuxBackend.probe() hardened with is_linux() to exclude Android (which
  also reports platform.system() == "Linux").
- Android: LedGrabNotificationListener NLS — serial single-thread executor,
  full crash isolation around Python.getInstance(), label-only forwarding
  (never notification title/body), ongoing/group-summary/self-package noise
  filtering. Manifest service exported + gated by
  BIND_NOTIFICATION_LISTENER_SERVICE (no new uses-permission).
- UX: prompt-once notification-access + manual "Grant notification access"
  button wired into the D-pad focus chain (computed from visible controls);
  en/ru/zh strings.
- Tests: 11 isolated unit tests — module-global + tmp_path history isolation,
  push routing contract, callback-exception swallowing, None app-name, and a
  desktop-regression lock on backend selection order.
- Docs: README OS-support Android column (notification + audio cells),
  ANDROID-REVIEW status flipped to Implemented.

Zero new Python deps; no build.gradle.kts / Chaquopy pip changes.
2026-06-02 11:47:13 +03:00
21 changed files with 2011 additions and 39 deletions
+47 -19
View File
@@ -45,8 +45,8 @@ Python receiver engine mirroring that pattern.**
| LED transports (network/USB-serial/BLE) | ✅ | ✅ (USB via Android driver, BLE via Android bridge) | No |
| System metrics | psutil | ✅ CPU/RAM/battery/thermal via `/proc`, `/sys` (`AndroidMetricsProvider`) | No |
| **Audio capture** | WASAPI / Sounddevice | ❌ no PortAudio | **Yes** |
| **Notification capture** | WinRT / D-Bus | ❌ listener only Win/Linux | **Yes** |
| Webcam capture | OpenCV | ❌ no OpenCV wheel | Yes (niche) |
| Notification capture | WinRT / D-Bus | ✅ NotificationListenerService → `push_notification()` | No (implemented) |
| Webcam capture | OpenCV | ✅ Camera2 + on-demand bridge (`AndroidCameraEngine`) | No (implemented) |
| GPU monitoring | NVML | ❌ no NVIDIA GPU | Marginal |
| Capture from *another* Android phone | scrcpy/ADB | ❌ | Skip (redundant) |
| Automation: window/process conditions | Windows ctypes | ❌ sandboxed | Partial |
@@ -70,7 +70,7 @@ Python receiver engine mirroring that pattern.**
media and the device's own audio. Root mode (no MediaProjection) → mic-only.
- 📄 **See `android-audio-capture-plan.md`** for the full implementation plan.
### 🔔 Notification capture — **FEASIBLE, HIGH VALUE** (planned)
### 🔔 Notification capture — **IMPLEMENTED** (shipped)
- **Android is the *best* platform for this:** `NotificationListenerService` is the native,
event-push mechanism (no polling).
@@ -82,16 +82,42 @@ Python receiver engine mirroring that pattern.**
- **Permission:** user enables "Notification access" in Settings (`ACTION_NOTIFICATION_LISTENER_SETTINGS`);
no runtime-permission popup.
- **Effort:** moderate. **Value:** high.
- 📄 **Plan approved & detailed** — see `C:\Users\Alexei\.claude\plans\deep-enchanting-muffin.md`
(app-name parity; prompt-once permission UX).
- **Implemented** on branch `feature/android-notification-capture`: a push-based
`_AndroidBackend` + module-level `push_notification()` in `os_notification_listener.py`,
a Kotlin `LedGrabNotificationListener` (NLS), and prompt-once permission UX. App-name
parity — only the resolved app label crosses the JNI boundary, never the notification
title/body. ⚠️ App labels can differ across OSes (Windows `display_name` / Linux D-Bus
`app_name` / Android `getApplicationLabel`), so desktop-configured per-app colors/filters
may need re-matching on Android.
### 📷 Webcam capture — **FEASIBLE, LOW VALUE**
### 📷 Webcam capture — **IMPLEMENTED** ✅ (shipped)
- **Blocker** is `opencv-python-headless` (no Chaquopy cp311 wheel) — but capture doesn't
*need* OpenCV. Use **CameraX / Camera2** + `ImageReader` in Kotlin and push frames through
the same bridge as MediaProjection into a new `CameraBridgeEngine`.
- **Effort:** moderate. **Value:** low — TVs rarely have cameras; USB-UVC webcams need extra
device handling. Recommend deferring unless a concrete use case appears.
- **Blocker** was `opencv-python-headless` (no Chaquopy cp311 wheel) — but capture doesn't
*need* OpenCV. Implemented with **Camera2** + `ImageReader` in Kotlin pushing RGB frames
through the same bridge as MediaProjection into a new `AndroidCameraEngine`.
- **Path:** a Kotlin `CameraBridge` singleton (Camera2) enumerates cameras and **opens the
camera on demand** (only while a capture source is active — driven Python→Kotlin via the
`BleBridge`/`UsbSerialBridge` pattern), converts each frame YUV_420_888→RGB, and pushes it
into a push-based `AndroidCameraEngine` (`core/capture_engines/android_camera_engine.py`)
that mirrors `mediaprojection_engine.py`. Cameras surface as selectable "displays" exactly
like the desktop OpenCV `CameraEngine`; the data-driven capture-template UI (engine list +
`resolution` config + display picker) needs **no changes**. **No new Python deps; no new
Gradle deps** (Camera2 is in-platform).
- **Permission:** `CAMERA` requested at capture-start, gated on `FEATURE_CAMERA_ANY` so
camera-less TV boxes never see the prompt; graceful degradation when denied. The service is
promoted with the `camera` FGS type (+ `FOREGROUND_SERVICE_CAMERA`) **only when CAMERA is
already granted**, so backgrounded capture keeps working without risking a failed service
start on camera-less boxes. (Unlike audio playback capture, the camera can't ride the
MediaProjection token, so it needs its own FGS type to survive backgrounding.)
- **Effort:** moderate. **Value:** low (TVs rarely have cameras), but the implementation reuses
existing infrastructure end-to-end. **Priority `0`** so it's never auto-selected over
MediaProjection — chosen explicitly via `engine_type="android_camera"`.
- ⚠️ **MVP scope / limitations:** webcam capture works **while LedGrab capture is running**
(no camera-only server path on Android); one camera active at a time; `"auto"` picks a
balanced output size (not the sensor max) to keep per-frame YUV→RGB cheap; USB-UVC webcams
appear only if the device routes them through Camera2 (varies by box); no frame-rotation
correction.
- 📄 **See `android-webcam-capture-plan.md`** for the full implementation notes.
### 🎮 GPU monitoring — **MARGINAL, SKIP FOR NOW**
@@ -128,18 +154,20 @@ Python receiver engine mirroring that pattern.**
| Priority | Feature | Effort | Value | New Python deps | Status |
| -------- | ------- | ------ | ----- | --------------- | ------ |
| 1 | Notification capture | Moderate | High | None | **Plan approved** |
| 2 | Audio capture | Moderate | High | None | **Plan written** (this folder) |
| 3 | Automation: foreground-app condition | Moderate | Moderate | None | Idea |
| 4 | Webcam capture (CameraX) | Moderate | Low | None | Idea |
| 1 | Notification capture | Moderate | High | None | **✅ Implemented** |
| 2 | Audio capture | Moderate | High | None | **✅ Implemented** |
| 4 | Webcam capture (Camera2) | Moderate | Low | None | **✅ Implemented** |
| 3 | Automation: foreground-app condition | Moderate | Moderate | None | Idea (only remaining) |
| — | GPU load (vendor sysfs) | LowMed | Low | None | Not recommended |
| — | Capture from another phone | — | — | — | Won't do |
| — | Multi-display / monitor names | Low | Low | None | Not recommended |
**Recommended order:** ship notifications → ship audio → reassess. Both reuse existing
infrastructure (bridge pattern, the MediaProjection consent token, the audio/notification
pipelines) and add **zero** Python dependencies, so neither risks the Chaquopy
`--no-deps` build constraint documented in `CLAUDE.md`.
**Status:** notifications, audio, **and webcam** are all shipped — each reuses existing
infrastructure (bridge pattern, the MediaProjection consent token / process-global
`Python.getInstance()`, the capture/audio/notification pipelines) and adds **zero** Python
dependencies, so none risks the Chaquopy `--no-deps` build constraint documented in
`CLAUDE.md`. The only remaining idea is the **foreground-app automation condition** (moderate
value); GPU load, another-phone capture, and multi-display remain not-recommended / won't-do.
## Cross-cutting notes
@@ -0,0 +1,168 @@
# Plan: Android on-device webcam capture
> Status: **implemented** on branch `feature/android-webcam-capture`. Last updated 2026-06-02.
## Context
LedGrab captures webcams on desktop through OpenCV (`cv2.VideoCapture`) in
`server/src/ledgrab/core/capture_engines/camera_engine.py`. On the **experimental Android-TV
build**, `opencv-python-headless` has no Chaquopy cp311 wheel, so the camera engine never
loads and cameras are unusable on-device.
Android doesn't need OpenCV to capture a camera: the platform exposes **Camera2**
(`android.hardware.camera2`), and the codebase already has the bridge shape to plug a Kotlin
capture source into a push-based Python engine. This feature adds an on-device camera engine
so a USB/integrated camera can drive ambient lighting, at parity with how the desktop OpenCV
camera engine feeds the pipeline.
The design mirrors the working screen-capture bridge
(`mediaprojection_engine.py``ScreenCapture.kt`) and the just-shipped audio engine
(`android_audio_engine.py``AudioCapture.kt`). **No new Python dependencies** (numpy already
bundled) and **no new Gradle dependencies** (Camera2 is in-platform) → no Chaquopy /
`build.gradle.kts` changes.
## Approach
A new **push-based** capture engine registered in the existing `EngineRegistry`, plus a Kotlin
`CameraBridge` that opens the camera **on demand**:
```
[capture source acquired] → AndroidCameraCaptureStream.initialize()
→ android_camera_engine.start_camera(index, w, h) [guarded jclass]
→ CameraBridge.startCamera(index, w, h) [Camera2 open + session]
→ onImageAvailable → YUV_420_888→RGB (stride-aware) → push_frame(rgbBytes, w, h)
→ android_camera_engine [module-level queue] → AndroidCameraCaptureStream.capture_frame()
→ ScreenCaptureLiveStream → processing pipeline [unchanged]
[capture source released] → AndroidCameraCaptureStream.cleanup()
→ android_camera_engine.stop_camera() → CameraBridge.stopCamera() [releases the camera]
```
The camera is **only open while a camera source is active** — the camera-in-use indicator and
battery cost are bounded to actual use, unlike always-on screen/audio capture. This on-demand
control reuses the synchronous Python→Kotlin singleton pattern of `BleBridge`/`UsbSerialBridge`.
## Selection path (why nothing downstream changes)
Webcams on desktop are a `ScreenCapturePictureSource` (`stream_type="raw"`) bound to a capture
template whose `engine_type="camera"` + a `display_index`. `live_stream_manager`
`_create_screen_capture_live_stream` reads `engine_type` from the template and calls
`EngineRegistry.create_stream(engine_type, display_index, config)`. Android adds
`engine_type="android_camera"` — the **same path**. The frontend
(`static/js/features/streams-capture-templates.ts`) is fully data-driven: the engine list,
the `resolution` config dropdown (keyed by field name), and the camera picker
(`/config/displays?engine_type=android_camera`, since `HAS_OWN_DISPLAYS=True`) all work with
no frontend changes.
## Part A — Python (`core/capture_engines/android_camera_engine.py`)
Mirrors `mediaprojection_engine.py` (module-level `queue.Queue` + `push_frame` + `_last_frame`
fallback + drop-oldest) and the desktop `CameraEngine` shape (cameras as displays,
`resolution` config).
- `_camera_bridge()` — lazy, `is_android()`-guarded `from java import jclass;
jclass("com.ledgrab.android.CameraBridge").INSTANCE`. **Never imported at module load** (this
module imports on desktop CI). Mirrors `core/devices/android_ble_transport.py`.
- `list_cameras()` → parses `CameraBridge.listCameras()` JSON into
`[{"index","name","facing"}]`; `_enumerate_cameras()` caches it (30 s TTL).
- `push_frame(rgb_bytes, w, h)` → `np.frombuffer(...uint8)` reshape **`(h, w, 3)`** (RGB, 3
B/px — NOT the RGBA `(h,w,4)` of the screen engine) → `.copy()` → drop-oldest enqueue. A
short/malformed buffer is dropped, never reshape-crashes.
- `start_camera(index, w, h) -> bool` / `stop_camera(index)` → guarded bridge calls.
- `AndroidCameraEngine`: `ENGINE_TYPE="android_camera"`, `ENGINE_PRIORITY=0` (never
auto-selected over MediaProjection=100 — explicit `engine_type` only), `HAS_OWN_DISPLAYS=True`,
`is_available()=is_android() and ≥1 enumerated camera`, `get_config_choices()` exposes
`resolution` (same presets as desktop).
- `AndroidCameraCaptureStream`: `initialize()` parses `resolution` → `start_camera(...)` (raises
if it returns False), drains stale frames; `capture_frame()` pops queue / returns `_last_frame`;
`cleanup()` → `stop_camera(...)`.
Registered in `capture_engines/__init__.py` behind a guarded import (mirrors the
mediaprojection block).
## Part B — Android (`CameraBridge.kt`)
`object CameraBridge` (mirrors `BleBridge`):
- `init(context)` — from `LedGrabApp.onCreate` (context only, no camera opened).
- `listCameras(): String` — JSON array from `CameraManager.cameraIdList` + `LENS_FACING`
(front/back/external). No CAMERA permission needed.
- `startCamera(index, width, height): Boolean` — checks CAMERA permission; resolves cameraId;
picks the supported YUV size closest to the request (balanced default ≤1280×720 for "auto");
opens device + capture session on a private `HandlerThread`, blocking until configured
(`runBlocking { withTimeout { ... } }` over `suspendCancellableCoroutine`-wrapped Camera2
callbacks); sets a repeating preview request. Returns false (no throw across JNI) on
permission/range/configure failure. Closes any prior camera first.
- `onImageAvailable` → paced (≈20 fps) → stride-aware **YUV_420_888→RGB** (BT.601 fixed-point,
reused plane + RGB buffers) → push to the cached `android_camera_engine` module handle.
- `stopCamera()` — stops repeating, closes session/device/reader, idempotent.
## Part C — Wiring + permission + manifest
- `LedGrabApp.kt` — `CameraBridge.init(this)` next to `BleBridge.init`.
- `MainActivity.kt` — `ensureCameraPermission()` (mirror `ensureAudioPermission`): request
`CAMERA` iff `hasSystemFeature(FEATURE_CAMERA_ANY)`; called from both `startCaptureService`
(MediaProjection path) and `startRootCaptureService` (root path). Fire-and-forget.
- `AndroidManifest.xml` — `<uses-permission CAMERA>` + `<uses-feature camera.any required=false>`
+ `<uses-permission FOREGROUND_SERVICE_CAMERA>`, and `camera` added to the `CaptureService`
`foregroundServiceType` union (`mediaProjection|specialUse|camera`).
- `CaptureService.onStartCommand` — on API 34+, OR `FOREGROUND_SERVICE_TYPE_CAMERA` into the
promotion type **only when CAMERA is already granted**. Unlike audio playback capture (which
rides the MediaProjection token under the mediaProjection type), the camera has no such
coupling, so without its own FGS type Android 14+ revokes camera access once the app is
backgrounded. The conditional guard avoids a failed `startForeground` (which would kill the
whole service) on a camera-less / not-yet-granted box. If CAMERA is granted later, the camera
type takes effect on the next Start.
- No `proguard-rules.pro` change — the blanket `-keep class com.ledgrab.android.** { *; }`
already covers `CameraBridge`, and R8/minify is disabled.
## What does NOT change
- **Frontend / API** — data-driven engine list, config, and display picker.
- **`build.gradle.kts` / Chaquopy pip block** — no new Python or Gradle packages.
- **Processing pipeline** — `ScreenCaptureLiveStream`, filters, color-strip sources unchanged.
## Files
**Create**
- `server/src/ledgrab/core/capture_engines/android_camera_engine.py`
- `android/app/src/main/java/com/ledgrab/android/CameraBridge.kt`
- `server/tests/core/test_android_camera_engine.py`
**Modify**
- `server/src/ledgrab/core/capture_engines/__init__.py` — guarded import + registration.
- `android/app/src/main/java/com/ledgrab/android/LedGrabApp.kt` — `CameraBridge.init`.
- `android/app/src/main/java/com/ledgrab/android/MainActivity.kt` — `ensureCameraPermission`.
- `android/app/src/main/AndroidManifest.xml` — `CAMERA` + `camera.any`.
## Tests (Python — desktop CI, no device)
`server/tests/core/test_android_camera_engine.py`: push→capture round-trips RGB `(h,w,3)`;
drop-oldest when full; `_last_frame` fallback on empty; short-buffer never crashes;
`initialize()` opens with parsed/auto resolution and raises on open-failure / off-Android;
`cleanup()` closes once (idempotent); `is_available()` gating (android + cameras); display
enumeration; priority 0 never beats MediaProjection; create-via-registry yields a pushed frame.
## Verification
1. **Python:** `py -3.13 -m pytest tests/core/test_android_camera_engine.py --no-cov -q`, then
the full suite (1880 passed, 2 skipped; 15 new).
2. **Lint:** `ruff check src/ tests/ --fix` — clean.
3. **Android build:** `./gradlew :app:assembleDebug` — BUILD SUCCESSFUL.
4. **On device (manual):** install APK → Start capture → grant CAMERA → create a capture
template with engine `android_camera` + a camera display + a ScreenCapture source bound to
a strip → confirm LEDs react to the camera feed and the camera indicator only lights while
the source is active.
## Risks / notes
- **MVP scope:** webcam works **while LedGrab capture is running** (the Python server only runs
inside `CaptureService`; there is no camera-only start path on Android).
- **One camera at a time:** `startCamera` closes any previously-open camera first.
- **`"auto"` resolution** picks a balanced output size (~720p), not the sensor max, to keep the
per-frame YUV→RGB conversion cheap on low-end TV boxes.
- **USB-UVC webcams** appear only if the device exposes them through Camera2 (`LENS_FACING_EXTERNAL`),
which varies by box; an explicit UVC library would be a separate, larger effort.
- **No frame-rotation correction** — sensor orientation is not applied (ambient color sampling
is largely orientation-tolerant); could be added later.
- **CAMERA denied** → the engine reports no usable camera and capture proceeds without it.
+11 -10
View File
@@ -105,16 +105,17 @@ LedGrab runs as a desktop / server application:
### Feature support by OS
| Feature | Windows | Linux / macOS |
| ------- | ------- | ------------- |
| Screen capture | DXCam, BetterCam, WGC, MSS | MSS |
| Webcam capture | OpenCV (DirectShow) | OpenCV (V4L2) |
| Audio capture | WASAPI, Sounddevice | Sounddevice (PulseAudio/PipeWire) |
| GPU monitoring | NVIDIA (nvidia-ml-py) | NVIDIA (nvidia-ml-py) |
| Capture from Android phone | scrcpy (ADB) | scrcpy (ADB) |
| Notification capture | WinRT | dbus (Linux) |
| Monitor names | Friendly names (WMI) | Generic ("Display 0") |
| Automation: window/process conditions | Supported | Partial |
| Feature | Windows | Linux / macOS | Android TV (experimental) |
| ------- | ------- | ------------- | ------------------------- |
| Screen capture | DXCam, BetterCam, WGC, MSS | MSS | MediaProjection; root `screenrecord` (rooted devices) |
| Webcam capture | OpenCV (DirectShow) | OpenCV (V4L2) | Camera2 (on-demand, while capture is running) |
| Audio capture | WASAPI, Sounddevice | Sounddevice (PulseAudio/PipeWire) | AudioPlaybackCapture (API 29+) |
| GPU monitoring | NVIDIA (nvidia-ml-py) | NVIDIA (nvidia-ml-py) | — (CPU/RAM/battery/thermal via `/proc`) |
| Capture from Android phone | scrcpy (ADB) | scrcpy (ADB) | — (captures its own screen instead) |
| Notification capture | WinRT | dbus (Linux) | NotificationListenerService |
| Monitor names | Friendly names (WMI) | Generic ("Display 0") | Single built-in display |
| LED transports | Network, USB-serial, BLE | Network, USB-serial, BLE | Network, USB-serial (Android driver), BLE (Android bridge) |
| Automation: window/process conditions | Supported | Partial | — |
## Requirements
+45 -1
View File
@@ -35,6 +35,13 @@
<uses-permission android:name="android.permission.FOREGROUND_SERVICE" />
<uses-permission android:name="android.permission.FOREGROUND_SERVICE_MEDIA_PROJECTION" />
<uses-permission android:name="android.permission.FOREGROUND_SERVICE_SPECIAL_USE" />
<!-- FOREGROUND_SERVICE_CAMERA (API 34+): required to keep camera access while
the app is backgrounded during on-device webcam capture. The service is
promoted with the `camera` FGS type ONLY when CAMERA is already granted
(see CaptureService.onStartCommand) — unlike audio playback capture (which
rides the MediaProjection token under the mediaProjection type), the camera
has no such coupling and needs its own FGS type to survive backgrounding. -->
<uses-permission android:name="android.permission.FOREGROUND_SERVICE_CAMERA" />
<!-- POST_NOTIFICATIONS for Android 13+ foreground service notification -->
<uses-permission android:name="android.permission.POST_NOTIFICATIONS" />
@@ -47,6 +54,17 @@
only be required if the mic-fallback path ran inside the service). -->
<uses-permission android:name="android.permission.RECORD_AUDIO" />
<!-- CAMERA for on-device webcam capture (Camera2). Runtime "dangerous"
permission, requested in MainActivity gated on FEATURE_CAMERA_ANY so
camera-less TV boxes never see the prompt; capture degrades gracefully
when denied. The camera is opened ON DEMAND (only while a camera
capture source is active). To keep capturing after the app is
backgrounded, the service is promoted with the `camera` FGS type
(FOREGROUND_SERVICE_CAMERA above) — but only when CAMERA is already
granted, so a camera-less / not-yet-granted box never risks a failed
service start. -->
<uses-permission android:name="android.permission.CAMERA" />
<!-- Autostart on boot — BootReceiver spawns CaptureService in root
mode so capture resumes without the user touching the remote. -->
<uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />
@@ -71,6 +89,15 @@
android:name="android.hardware.usb.host"
android:required="false" />
<!-- Camera hardware — for on-device webcam capture. required=false so
camera-less TV boxes (the common case) still install; the camera
engine simply reports no displays on such devices. camera.any covers
built-in (front/back) and external/USB-UVC cameras the platform
routes through Camera2. -->
<uses-feature
android:name="android.hardware.camera.any"
android:required="false" />
<application
android:name=".LedGrabApp"
android:allowBackup="false"
@@ -103,13 +130,30 @@
PROPERTY_SPECIAL_USE_FGS_SUBTYPE rationale below. -->
<service
android:name=".CaptureService"
android:foregroundServiceType="mediaProjection|specialUse"
android:foregroundServiceType="mediaProjection|specialUse|camera"
android:exported="false">
<property
android:name="android.app.PROPERTY_SPECIAL_USE_FGS_SUBTYPE"
android:value="Root-mode screen capture for ambient LED sync. Uses /system/bin/screenrecord on rooted devices to avoid MediaProjection's persistent capture indicator overlay, which is required for the always-on ambient-lighting use case." />
</service>
<!-- Notification capture — a NotificationListenerService bound by
system_server. exported="true" is REQUIRED here (the system binds
it cross-process) and intentionally diverges from CaptureService
(exported="false"); access is gated by the system-held
BIND_NOTIFICATION_LISTENER_SERVICE permission, so no new
<uses-permission> is needed. The user grants access via
Settings > Notification access (opened from MainActivity). -->
<service
android:name=".LedGrabNotificationListener"
android:label="@string/notification_listener_label"
android:exported="true"
android:permission="android.permission.BIND_NOTIFICATION_LISTENER_SERVICE">
<intent-filter>
<action android:name="android.service.notification.NotificationListenerService" />
</intent-filter>
</service>
<!-- Autostart — fires on device boot (and package replace).
On rooted devices, launches CaptureService directly so capture
resumes without the user tapping Start. Unrooted devices are
@@ -0,0 +1,411 @@
package com.ledgrab.android
import android.Manifest
import android.annotation.SuppressLint
import android.content.Context
import android.content.pm.PackageManager
import android.graphics.ImageFormat
import android.hardware.camera2.CameraCaptureSession
import android.hardware.camera2.CameraCharacteristics
import android.hardware.camera2.CameraDevice
import android.hardware.camera2.CameraManager
import android.media.Image
import android.media.ImageReader
import android.os.Handler
import android.os.HandlerThread
import android.os.SystemClock
import android.util.Log
import android.util.Size
import android.view.Surface
import com.chaquo.python.PyObject
import com.chaquo.python.Python
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeout
import org.json.JSONArray
import org.json.JSONObject
/**
* Android camera bridge exposed to the Python server via Chaquopy.
*
* Wraps the Camera2 API into synchronous, blocking calls that can be
* invoked from a Python thread (Chaquopy proxy threads are real OS
* threads). The physical camera is opened **on demand** — Python's
* `android_camera_engine` calls [startCamera] when a capture stream
* initializes and [stopCamera] when it cleans up, so the camera-in-use
* indicator and battery cost are limited to actual use.
*
* Each captured frame is converted YUV_420_888 → RGB and pushed to the
* Python engine's `push_frame`, mirroring how [ScreenCapture] feeds
* `mediaprojection_engine`. Camera2 callbacks run on a private
* [HandlerThread] so they never touch the main looper.
*
* Python callers access the singleton via
* `jclass("com.ledgrab.android.CameraBridge").INSTANCE` — see
* `server/src/ledgrab/core/capture_engines/android_camera_engine.py`.
*/
object CameraBridge {
private const val TAG = "CameraBridge"
private const val ENGINE_MODULE = "ledgrab.core.capture_engines.android_camera_engine"
private const val OPEN_TIMEOUT_MS = 8_000L
private const val MAX_IMAGES = 2
private const val TARGET_FPS = 20
// "auto" capture size — balanced for ambient LED sampling (the LED
// pipeline downscales anyway), kept modest so the per-frame YUV→RGB
// conversion stays cheap on low-end TV boxes.
private const val DEFAULT_W = 1280
private const val DEFAULT_H = 720
private const val BYTES_PER_RGB = 3
@Volatile private var appContext: Context? = null
// Dedicated looper thread so Camera2 callbacks don't land on main.
private val camThread = HandlerThread("LedGrab-Camera").also { it.start() }
private val camHandler = Handler(camThread.looper)
// Active session state — guarded by [lock]. One camera at a time.
private val lock = Any()
private var cameraDevice: CameraDevice? = null
private var captureSession: CameraCaptureSession? = null
private var imageReader: ImageReader? = null
@Volatile private var running = false
private var activeIndex = -1
// Cached Python engine module handle for the per-frame push fast path.
@Volatile private var engineModule: PyObject? = null
// Reusable conversion buffers — sized once per session (output size is
// fixed for the session), reused to avoid per-frame GC churn on TV boxes.
private var rgbBuffer: ByteArray? = null
private var yBuf: ByteArray? = null
private var uBuf: ByteArray? = null
private var vBuf: ByteArray? = null
// Monotonic frame pacing (mirrors ScreenCapture's accumulator).
private val frameIntervalNanos = 1_000_000_000L / TARGET_FPS.coerceAtLeast(1)
private var nextFrameNanos = 0L
/** Called once from [LedGrabApp.onCreate] to bind the application context. */
@JvmStatic
fun init(context: Context) {
appContext = context.applicationContext
}
/**
* Enumerate cameras as a JSON array string the Python engine parses:
* `[{"index":0,"name":"Back camera","facing":"back","cameraId":"0"}, ...]`
*
* Indices are stable (positional in [CameraManager.cameraIdList]) so
* Python's `display_index` maps 1:1 to [startCamera]'s `index`.
* Enumeration needs no CAMERA permission. Returns `[]` on any error.
*/
@JvmStatic
fun listCameras(): String {
val arr = JSONArray()
val ctx = appContext
if (ctx == null) {
Log.w(TAG, "listCameras: context not bound (init not called)")
return arr.toString()
}
try {
val mgr = ctx.getSystemService(Context.CAMERA_SERVICE) as CameraManager
mgr.cameraIdList.forEachIndexed { idx, id ->
val facing = facingOf(mgr, id)
val name = when (facing) {
"front" -> "Front camera"
"back" -> "Back camera"
"external" -> "External camera $idx"
else -> "Camera $idx"
}
arr.put(
JSONObject()
.put("index", idx)
.put("name", name)
.put("facing", facing)
.put("cameraId", id),
)
}
} catch (e: Exception) {
Log.w(TAG, "listCameras failed: ${e.message}")
}
return arr.toString()
}
/**
* Open camera [index] and start streaming RGB frames to Python.
* Blocks until the capture session is configured (or fails/times out).
*
* Returns false — without throwing across the JNI boundary — when the
* CAMERA permission is missing, the index is out of range, or the
* device/session fails to configure. Closes any previously-open camera
* first (one active at a time).
*/
@SuppressLint("MissingPermission")
@JvmStatic
fun startCamera(index: Int, width: Int, height: Int): Boolean {
synchronized(lock) {
closeLocked()
val ctx = appContext ?: run {
Log.w(TAG, "startCamera: context not bound")
return false
}
if (ctx.checkSelfPermission(Manifest.permission.CAMERA)
!= PackageManager.PERMISSION_GRANTED
) {
Log.w(TAG, "startCamera: CAMERA permission not granted")
return false
}
val mgr = ctx.getSystemService(Context.CAMERA_SERVICE) as CameraManager
val ids = try {
mgr.cameraIdList
} catch (e: Exception) {
Log.w(TAG, "startCamera: cameraIdList failed: ${e.message}")
return false
}
if (index < 0 || index >= ids.size) {
Log.w(TAG, "startCamera: index $index out of range (${ids.size} cameras)")
return false
}
val cameraId = ids[index]
val size = chooseSize(mgr, cameraId, width, height) ?: run {
Log.w(TAG, "startCamera: no YUV output sizes for camera $index")
return false
}
val reader = ImageReader.newInstance(
size.width, size.height, ImageFormat.YUV_420_888, MAX_IMAGES,
)
// Size the conversion buffers once for this session.
rgbBuffer = ByteArray(size.width * size.height * BYTES_PER_RGB)
yBuf = null; uBuf = null; vBuf = null
nextFrameNanos = SystemClock.elapsedRealtimeNanos()
reader.setOnImageAvailableListener({ r -> onFrame(r) }, camHandler)
return try {
runBlocking {
withTimeout(OPEN_TIMEOUT_MS) {
// Publish each resource to its field as soon as it exists so
// closeLocked() (in the catch) can release it if a LATER step
// throws. Assigning only after setRepeatingRequest succeeds
// would orphan the opened CameraDevice on a createSession /
// setRepeatingRequest failure (camera stuck on; subsequent
// opens fail with CAMERA_IN_USE).
imageReader = reader
val device = openCamera(mgr, cameraId)
cameraDevice = device
val session = createSession(device, reader.surface)
captureSession = session
val request = device.createCaptureRequest(CameraDevice.TEMPLATE_PREVIEW)
.apply { addTarget(reader.surface) }
.build()
session.setRepeatingRequest(request, null, camHandler)
activeIndex = index
running = true
Log.i(TAG, "Camera $index opened (${size.width}x${size.height} @ ${TARGET_FPS}fps)")
true
}
}
} catch (e: Exception) {
Log.e(TAG, "startCamera($index) failed: ${e.message}")
// imageReader/cameraDevice/captureSession are now whatever got
// assigned before the failure — closeLocked releases each exactly
// once (idempotent, runCatching-wrapped).
closeLocked()
false
}
}
}
/** Stop streaming and release the camera. Idempotent; safe if not started. */
@JvmStatic
fun stopCamera() {
synchronized(lock) { closeLocked() }
Log.i(TAG, "Camera stopped")
}
// ── internals ────────────────────────────────────────────────────────
private fun facingOf(mgr: CameraManager, id: String): String =
when (mgr.getCameraCharacteristics(id).get(CameraCharacteristics.LENS_FACING)) {
CameraCharacteristics.LENS_FACING_FRONT -> "front"
CameraCharacteristics.LENS_FACING_BACK -> "back"
CameraCharacteristics.LENS_FACING_EXTERNAL -> "external"
else -> "unknown"
}
/** Pick the supported YUV size closest in area to the request (or the
* balanced default for `auto`/0). */
private fun chooseSize(mgr: CameraManager, cameraId: String, reqW: Int, reqH: Int): Size? {
val map = mgr.getCameraCharacteristics(cameraId)
.get(CameraCharacteristics.SCALER_STREAM_CONFIGURATION_MAP) ?: return null
val sizes = map.getOutputSizes(ImageFormat.YUV_420_888)
if (sizes == null || sizes.isEmpty()) return null
val targetArea = (if (reqW > 0) reqW else DEFAULT_W).toLong() *
(if (reqH > 0) reqH else DEFAULT_H)
return sizes.minByOrNull { kotlin.math.abs(it.width.toLong() * it.height - targetArea) }
}
@SuppressLint("MissingPermission")
private suspend fun openCamera(mgr: CameraManager, cameraId: String): CameraDevice =
suspendCancellableCoroutine { cont ->
mgr.openCamera(cameraId, object : CameraDevice.StateCallback() {
override fun onOpened(device: CameraDevice) {
if (cont.isActive) cont.resume(device) else device.close()
}
override fun onDisconnected(device: CameraDevice) {
device.close()
if (cont.isActive) cont.resumeWithException(IllegalStateException("camera disconnected"))
}
override fun onError(device: CameraDevice, error: Int) {
device.close()
if (cont.isActive) cont.resumeWithException(IllegalStateException("camera error $error"))
}
}, camHandler)
}
@Suppress("DEPRECATION")
private suspend fun createSession(device: CameraDevice, surface: Surface): CameraCaptureSession =
suspendCancellableCoroutine { cont ->
// createCaptureSession(List, callback, handler) is deprecated at
// API 30 but is the correct API down to minSdk 24 (the
// SessionConfiguration overload is API 28+).
device.createCaptureSession(
listOf(surface),
object : CameraCaptureSession.StateCallback() {
override fun onConfigured(session: CameraCaptureSession) {
if (cont.isActive) cont.resume(session)
}
override fun onConfigureFailed(session: CameraCaptureSession) {
if (cont.isActive) cont.resumeWithException(IllegalStateException("session configure failed"))
}
},
camHandler,
)
}
/** ImageReader callback — paced, converts YUV→RGB, pushes to Python. */
private fun onFrame(reader: ImageReader) {
if (!running) {
runCatching { reader.acquireLatestImage()?.close() }
return
}
val now = SystemClock.elapsedRealtimeNanos()
if (now < nextFrameNanos) {
runCatching { reader.acquireLatestImage()?.close() }
return
}
val image = runCatching { reader.acquireLatestImage() }.getOrNull() ?: return
try {
val w = image.width
val h = image.height
val out = ensureRgbBuffer(w * h * BYTES_PER_RGB)
yuv420ToRgb(image, out, w, h)
pushFrame(out, w, h)
nextFrameNanos += frameIntervalNanos
if (now - nextFrameNanos > frameIntervalNanos * 4) {
nextFrameNanos = now + frameIntervalNanos
}
} catch (e: Exception) {
Log.w(TAG, "frame processing error: ${e.message}")
} finally {
runCatching { image.close() }
}
}
private fun ensureRgbBuffer(size: Int): ByteArray {
val buf = rgbBuffer
if (buf != null && buf.size == size) return buf
return ByteArray(size).also { rgbBuffer = it }
}
/**
* Stride-aware YUV_420_888 → packed RGB (3 bytes/px) using BT.601
* fixed-point coefficients. Handles both planar and semi-planar
* (NV21-like, pixelStride 2) chroma layouts via the plane strides.
*/
private fun yuv420ToRgb(image: Image, out: ByteArray, width: Int, height: Int) {
val planes = image.planes
val yPlane = planes[0]
val uPlane = planes[1]
val vPlane = planes[2]
val yRowStride = yPlane.rowStride
val yPixStride = yPlane.pixelStride
val uRowStride = uPlane.rowStride
val uPixStride = uPlane.pixelStride
val vRowStride = vPlane.rowStride
val vPixStride = vPlane.pixelStride
// Copy each plane to a reusable array for fast indexed access
// (ByteBuffer absolute-get per pixel is far slower).
val yByteBuf = yPlane.buffer
val uByteBuf = uPlane.buffer
val vByteBuf = vPlane.buffer
val yArr = ensurePlane(yBuf, yByteBuf.remaining()).also { yBuf = it }
val uArr = ensurePlane(uBuf, uByteBuf.remaining()).also { uBuf = it }
val vArr = ensurePlane(vBuf, vByteBuf.remaining()).also { vBuf = it }
yByteBuf.get(yArr, 0, yArr.size)
uByteBuf.get(uArr, 0, uArr.size)
vByteBuf.get(vArr, 0, vArr.size)
var o = 0
for (row in 0 until height) {
val yRowBase = row * yRowStride
val uvRow = row shr 1
val uRowBase = uvRow * uRowStride
val vRowBase = uvRow * vRowStride
for (col in 0 until width) {
val y = (yArr[yRowBase + col * yPixStride].toInt() and 0xFF)
val uvCol = col shr 1
val u = (uArr[uRowBase + uvCol * uPixStride].toInt() and 0xFF) - 128
val v = (vArr[vRowBase + uvCol * vPixStride].toInt() and 0xFF) - 128
// BT.601 full-range, fixed-point (<<16).
var r = y + ((91881 * v) shr 16)
var g = y - ((22554 * u + 46802 * v) shr 16)
var b = y + ((116130 * u) shr 16)
if (r < 0) r = 0 else if (r > 255) r = 255
if (g < 0) g = 0 else if (g > 255) g = 255
if (b < 0) b = 0 else if (b > 255) b = 255
out[o++] = r.toByte()
out[o++] = g.toByte()
out[o++] = b.toByte()
}
}
}
/** Return [cached] if it already fits [n] bytes, else a fresh array. */
private fun ensurePlane(cached: ByteArray?, n: Int): ByteArray =
if (cached != null && cached.size == n) cached else ByteArray(n)
private fun pushFrame(rgb: ByteArray, width: Int, height: Int) {
val module = engineModule ?: runCatching {
Python.getInstance().getModule(ENGINE_MODULE)
}.getOrNull()?.also { engineModule = it } ?: return
try {
module.callAttr("push_frame", rgb, width, height)
} catch (e: Exception) {
Log.w(TAG, "push_frame failed: ${e.message}")
}
}
/** Tear down the active session. Caller holds [lock]. */
private fun closeLocked() {
running = false
activeIndex = -1
runCatching { imageReader?.setOnImageAvailableListener(null, null) }
runCatching { captureSession?.stopRepeating() }
runCatching { captureSession?.close() }
captureSession = null
runCatching { cameraDevice?.close() }
cameraDevice = null
runCatching { imageReader?.close() }
imageReader = null
}
}
@@ -113,11 +113,25 @@ class CaptureService : Service() {
val url = "http://$localIp:$SERVER_PORT"
try {
val type = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
if (useRoot) {
var t = if (useRoot) {
ServiceInfo.FOREGROUND_SERVICE_TYPE_SPECIAL_USE
} else {
ServiceInfo.FOREGROUND_SERVICE_TYPE_MEDIA_PROJECTION
}
// On-demand webcam capture opens the camera from this service.
// To retain camera access once the app is backgrounded (the
// always-on ambient-lighting case), API 34+ requires the camera
// FGS type. Add it ONLY when CAMERA is already granted — promoting
// with the camera type without the runtime permission throws and
// would kill the whole service on the (common) camera-less or
// not-yet-granted box. If CAMERA is granted later, it takes effect
// on the next Start (matches the audio/permission UX).
if (checkSelfPermission(Manifest.permission.CAMERA) ==
PackageManager.PERMISSION_GRANTED
) {
t = t or ServiceInfo.FOREGROUND_SERVICE_TYPE_CAMERA
}
t
} else {
0
}
@@ -51,6 +51,9 @@ class LedGrabApp : Application() {
// Bind application context for the BLE bridge so Python can
// scan and connect to BLE LED controllers.
BleBridge.init(this)
// Bind application context for the camera bridge so Python can
// enumerate cameras and open them on demand (webcam capture).
CameraBridge.init(this)
// Pre-warm the API key on a background thread. First-launch
// generation does a SharedPreferences.commit() (synchronous
@@ -0,0 +1,97 @@
package com.ledgrab.android
import android.app.Notification
import android.service.notification.NotificationListenerService
import android.service.notification.StatusBarNotification
import android.util.Log
import com.chaquo.python.Python
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
/**
* Captures posted OS notifications and forwards the posting app's display
* label to the Python notification pipeline, where the existing
* `NotificationColorStripSource` fires its one-shot LED effect.
*
* Direction is Kotlin -> Python via the process-global Chaquopy instance
* (NOT a per-[CaptureService] [PythonBridge]): `system_server` binds this
* service independently of [CaptureService], so it resolves Python itself.
* The Python receiver (`os_notification_listener.push_notification`) is a
* no-op whenever the server/listener isn't running, so a notification
* arriving before — or after — a capture session is safely ignored.
*/
class LedGrabNotificationListener : NotificationListenerService() {
// Serial executor: the Python receiver does a (non-concurrency-safe) history
// disk write and may play a sound, so pushes must not overlap. Off the main
// looper to keep the system service responsive.
private val pushExecutor = Executors.newSingleThreadExecutor()
// packageName -> resolved human-readable label. Matches the app_name the
// Windows/Linux backends pass, so per-app colors/filters keep working.
// Naturally bounded by the number of notification-posting apps (tens) and
// cleared with the process — no eviction needed.
private val labelCache = ConcurrentHashMap<String, String>()
override fun onNotificationPosted(sbn: StatusBarNotification?) {
val notification = sbn ?: return
// The Python server (and thus the listener) only exists during a capture
// session. isRunning is a coarse early-out — the authoritative gate is the
// Python receiver's None-check — but it avoids needless JNI churn here.
if (!CaptureService.isRunning) return
// Filter notifications that should never drive an effect:
// - ongoing (media transport, downloads): not user-facing "alerts"
// - group summaries: duplicate their child notifications
// - our own foreground-service notification: would self-trigger
if (notification.isOngoing) return
if ((notification.notification.flags and Notification.FLAG_GROUP_SUMMARY) != 0) return
if (notification.packageName == packageName) return
val label = resolveAppLabel(notification.packageName)
pushExecutor.execute {
try {
Python.getInstance()
.getModule(PY_MODULE)
.callAttr("push_notification", label)
} catch (t: Throwable) {
// Never crash a system-bound service. Python.getInstance() throws
// IllegalStateException if Python.start() hasn't run (e.g. the
// service was bound at boot before the app process initialized).
// Log at debug — the label is potentially sensitive on a shared TV.
Log.d(TAG, "push_notification failed: ${t.message}")
}
}
}
/** Resolve (and cache) a package's human-readable label; fall back to the package name. */
private fun resolveAppLabel(pkg: String): String {
labelCache[pkg]?.let { return it }
val resolved = runCatching {
val info = packageManager.getApplicationInfo(pkg, 0)
packageManager.getApplicationLabel(info).toString()
}.getOrDefault(pkg)
labelCache[pkg] = resolved
return resolved
}
override fun onListenerConnected() {
Log.i(TAG, "Notification listener connected")
}
override fun onListenerDisconnected() {
Log.i(TAG, "Notification listener disconnected")
}
override fun onDestroy() {
pushExecutor.shutdown()
super.onDestroy()
}
companion object {
private const val TAG = "LedGrabNotifListener"
private const val PY_MODULE = "ledgrab.core.processing.os_notification_listener"
}
}
@@ -24,6 +24,7 @@ import android.widget.ImageView
import android.widget.LinearLayout
import android.widget.ScrollView
import android.widget.TextView
import androidx.core.app.NotificationManagerCompat
import androidx.core.content.ContextCompat
import androidx.core.splashscreen.SplashScreen.Companion.installSplashScreen
import com.google.zxing.BarcodeFormat
@@ -54,7 +55,10 @@ class MainActivity : Activity() {
private const val REQUEST_MEDIA_PROJECTION = 1001
private const val REQUEST_POST_NOTIFICATIONS = 1002
private const val REQUEST_RECORD_AUDIO = 1003
private const val REQUEST_CAMERA = 1004
private const val QR_SIZE_PX = 560
private const val NOTIF_PREFS = "ledgrab_notif"
private const val KEY_NOTIF_ACCESS_PROMPTED = "notif_access_prompted"
}
// Stopped-state views (always inflated).
@@ -64,6 +68,7 @@ class MainActivity : Activity() {
private lateinit var versionText: TextView
private lateinit var autostartCheck: CheckBox
private lateinit var autostartPrefs: AutostartPrefs
private lateinit var grantNotificationButton: Button
// Running-state views (lazy-inflated via ViewStub).
private lateinit var runningPanelStub: ViewStub
@@ -107,6 +112,7 @@ class MainActivity : Activity() {
toggleButton = findViewById(R.id.toggle_button)
versionText = findViewById(R.id.version_text)
autostartCheck = findViewById(R.id.autostart_check)
grantNotificationButton = findViewById(R.id.grant_notification_button)
val versionName = packageManager.getPackageInfo(packageName, 0).versionName
versionText.text = getString(R.string.version_prefix, versionName ?: "?")
@@ -127,8 +133,10 @@ class MainActivity : Activity() {
autostartCheck.visibility = View.GONE
}
grantNotificationButton.setOnClickListener { openNotificationListenerSettings() }
toggleButton.setOnClickListener { startCapture() }
updateNotificationAccessUi()
updateUI()
}
@@ -149,12 +157,16 @@ class MainActivity : Activity() {
override fun onResume() {
super.onResume()
if (!::stoppedPanel.isInitialized) return
// Restart the pulse if we returned to the foreground while the
// service is still running. The running panel's view may have
// been recreated; ensureRunningPanelInflated already keys off
// the field reference.
if (CaptureService.isRunning && ::stoppedPanel.isInitialized) {
// service is still running. The running panel's view may have been
// recreated; ensureRunningPanelInflated already keys off the field
// reference. When stopped, refresh the notification-access button —
// the user may have just granted/revoked access in Settings.
if (CaptureService.isRunning) {
updateUI()
} else {
updateNotificationAccessUi()
}
}
@@ -197,6 +209,8 @@ class MainActivity : Activity() {
private fun startRootCaptureService() {
ensureNotificationPermission()
ensureNotificationListenerAccess()
ensureCameraPermission()
ContextCompat.startForegroundService(this, CaptureService.createRootIntent(this))
updateUI()
}
@@ -216,7 +230,9 @@ class MainActivity : Activity() {
private fun startCaptureService(resultCode: Int, resultData: Intent) {
ensureNotificationPermission()
ensureNotificationListenerAccess()
ensureAudioPermission()
ensureCameraPermission()
val intent = CaptureService.createIntent(this, resultCode, resultData)
ContextCompat.startForegroundService(this, intent)
updateUI()
@@ -493,4 +509,83 @@ class MainActivity : Activity() {
)
}
}
/**
* Request CAMERA so the capture service can open the device camera for
* on-device webcam capture. Fire-and-forget, like [ensureAudioPermission]:
* capture still works without it (just no camera engine), so we don't block
* on the result. Gated on actual camera hardware via FEATURE_CAMERA_ANY so
* camera-less TV boxes (the common case) never see the prompt. The camera
* is opened on demand only while a camera source is active — granting this
* does not keep the camera on. If first granted here, the camera engine
* becomes available on the next Start.
*/
private fun ensureCameraPermission() {
if (!packageManager.hasSystemFeature(PackageManager.FEATURE_CAMERA_ANY)) return
if (checkSelfPermission(Manifest.permission.CAMERA)
!= PackageManager.PERMISSION_GRANTED
) {
@Suppress("DEPRECATION")
requestPermissions(
arrayOf(Manifest.permission.CAMERA),
REQUEST_CAMERA,
)
}
}
/** Whether the user has granted notification-listener access to this app. */
private fun isNotificationAccessGranted(): Boolean =
NotificationManagerCompat.getEnabledListenerPackages(this).contains(packageName)
/** Open the system Notification-access screen (manual affordance / re-grant). */
private fun openNotificationListenerSettings() {
runCatching {
startActivity(Intent(Settings.ACTION_NOTIFICATION_LISTENER_SETTINGS))
}.onFailure { Log.w(TAG, "Notification-access settings unavailable: ${it.message}") }
}
/**
* Prompt-once-then-remember: the first time capture starts without
* notification-listener access, open the settings screen so the user can
* grant it — then never nag again (the manual "Grant notification access"
* button stays available). Fire-and-forget like [ensureNotificationPermission].
*/
private fun ensureNotificationListenerAccess() {
if (isNotificationAccessGranted()) return
val prefs = getSharedPreferences(NOTIF_PREFS, MODE_PRIVATE)
if (prefs.getBoolean(KEY_NOTIF_ACCESS_PROMPTED, false)) return
prefs.edit().putBoolean(KEY_NOTIF_ACCESS_PROMPTED, true).apply()
openNotificationListenerSettings()
}
/**
* Show the "Grant notification access" button only while access is missing,
* then re-wire the D-pad focus chain. Called on create and on resume
* (access can change in Settings while we're backgrounded).
*/
private fun updateNotificationAccessUi() {
if (!::grantNotificationButton.isInitialized) return
grantNotificationButton.visibility =
if (isNotificationAccessGranted()) View.GONE else View.VISIBLE
wireStoppedFocusChain()
}
/**
* Link the visible stopped-panel controls into a single up/down D-pad chain.
* Both optional controls (the grant-access button and the root-only autostart
* checkbox) may be GONE, so the chain is computed from whatever is visible —
* a static nextFocus pointing at a GONE view would strand the focus on a TV
* remote.
*/
private fun wireStoppedFocusChain() {
val chain = listOfNotNull(
toggleButton,
grantNotificationButton.takeIf { it.visibility == View.VISIBLE },
autostartCheck.takeIf { it.visibility == View.VISIBLE },
)
chain.forEachIndexed { i, view ->
view.nextFocusUpId = (chain.getOrNull(i - 1) ?: view).id
view.nextFocusDownId = (chain.getOrNull(i + 1) ?: view).id
}
}
}
@@ -66,6 +66,21 @@
android:focusableInTouchMode="true"
android:nextFocusDown="@+id/autostart_check" />
<!-- Shown only while notification-listener access is missing. The D-pad
focus chain is wired at runtime (wireStoppedFocusChain) because this
button and the autostart checkbox are both conditionally visible. -->
<Button
android:id="@+id/grant_notification_button"
style="@style/Widget.LedGrab.Button.Secondary"
android:layout_width="320dp"
android:layout_height="56dp"
android:layout_marginTop="20dp"
android:text="@string/btn_grant_notification_access"
android:textSize="18sp"
android:focusable="true"
android:focusableInTouchMode="true"
android:visibility="gone" />
<CheckBox
android:id="@+id/autostart_check"
android:layout_width="wrap_content"
@@ -25,4 +25,6 @@
<string name="notification_channel_description">Отображается, пока LedGrab захватывает экран.</string>
<string name="notification_title">LedGrab работает</string>
<string name="notification_text">Веб-интерфейс: %1$s</string>
<string name="notification_listener_label">Захват уведомлений LedGrab</string>
<string name="btn_grant_notification_access">Разрешить доступ к уведомлениям</string>
</resources>
@@ -25,4 +25,6 @@
<string name="notification_channel_description">LedGrab 捕获屏幕时显示。</string>
<string name="notification_title">LedGrab 运行中</string>
<string name="notification_text">Web界面:%1$s</string>
<string name="notification_listener_label">LedGrab 通知捕获</string>
<string name="btn_grant_notification_access">授予通知访问权限</string>
</resources>
@@ -25,4 +25,6 @@
<string name="notification_channel_description">Shows while LedGrab is capturing the screen.</string>
<string name="notification_title">LedGrab Running</string>
<string name="notification_text">Web UI: %1$s</string>
<string name="notification_listener_label">LedGrab notification capture</string>
<string name="btn_grant_notification_access">Grant notification access</string>
</resources>
@@ -86,6 +86,18 @@ try:
except ImportError:
_has_mediaprojection = False
# ── Android camera/webcam (Camera2 via Chaquopy bridge) ─────────────
try:
from ledgrab.core.capture_engines.android_camera_engine import (
AndroidCameraEngine,
AndroidCameraCaptureStream,
)
_has_android_camera = True
except ImportError:
_has_android_camera = False
# ── Android root screenrecord (rooted Magisk devices) ───────────────
try:
@@ -120,6 +132,8 @@ if _has_camera:
EngineRegistry.register(CameraEngine)
if _has_mediaprojection:
EngineRegistry.register(MediaProjectionEngine)
if _has_android_camera:
EngineRegistry.register(AndroidCameraEngine)
if _has_root_screenrecord:
EngineRegistry.register(RootScreenrecordEngine)
EngineRegistry.register(DemoCaptureEngine)
@@ -152,5 +166,7 @@ if _has_camera:
__all__ += ["CameraEngine", "CameraCaptureStream"]
if _has_mediaprojection:
__all__ += ["MediaProjectionEngine", "MediaProjectionCaptureStream"]
if _has_android_camera:
__all__ += ["AndroidCameraEngine", "AndroidCameraCaptureStream"]
if _has_root_screenrecord:
__all__ += ["RootScreenrecordEngine", "RootScreenrecordCaptureStream"]
@@ -0,0 +1,430 @@
"""Android camera (webcam) capture engine.
Receives camera frames pushed from Kotlin (via Chaquopy) through a
module-level frame queue. The Kotlin :class:`CameraBridge` opens a
camera with the Camera2 API, converts each frame to RGB, and calls
:func:`push_frame` with raw RGB bytes.
The physical camera is opened **on demand** — only while a capture
stream is active. :meth:`AndroidCameraCaptureStream.initialize` calls
:func:`start_camera` (which signals the Kotlin bridge to open the
camera) and :meth:`cleanup` calls :func:`stop_camera`. This keeps the
camera-in-use indicator and battery cost limited to actual use, unlike
the always-on screen/audio capture.
Mirrors the screen-capture bridge
(``core/capture_engines/mediaprojection_engine.py``): a module-level
queue plus push/last-frame fallback/drop-oldest, consumed through the
standard :class:`CaptureEngine` / :class:`CaptureStream` interface so
the live-stream and processing pipelines work unchanged. Cameras are
exposed as selectable "displays" exactly like the desktop OpenCV
:class:`CameraEngine`.
This engine is only available when running inside the LedGrab Android
app (``is_android()``) with at least one camera the Kotlin bridge can
enumerate. All Java interop is lazy + guarded so this module imports
cleanly on desktop CI.
"""
import json
import queue
import threading
import time
from typing import Any, Dict, List, Optional
import numpy as np
from ledgrab.core.capture_engines.base import (
CaptureEngine,
CaptureStream,
DisplayInfo,
ScreenCapture,
)
from ledgrab.utils import get_logger
from ledgrab.utils.platform import is_android
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Frame queue — the bridge between Kotlin and Python
# ---------------------------------------------------------------------------
_frame_queue: "queue.Queue[ScreenCapture]" = queue.Queue(maxsize=2)
_active = False
_active_index = 0
_frames_received = 0
# Single-camera ownership. The Kotlin bridge supports exactly one open camera
# at a time (it closes any prior camera on a new open), and all streams share
# the one module-level frame queue. So the engine serializes ownership the way
# the desktop CameraEngine does with its _camera_lock/_active_cv2_indices: the
# first stream to initialize() owns the camera; a second stream on the SAME
# camera attaches (ref-counted); a second stream on a DIFFERENT camera is
# refused. Only the last owner to clean up actually stops the camera. Without
# this, two concurrent android_camera sources on different displays would make
# the second open silently steal the first's frames, and either stream's
# cleanup would drain the shared queue out from under the other.
_state_lock = threading.Lock()
_owner_index: int | None = None # display_index that currently owns the camera
_owner_refs = 0 # number of streams attached to the active camera
# Camera2 delivers frames continuously, but cache the last one so a
# brief consumer stall still has something to read (mirrors
# mediaprojection_engine's _last_frame).
_last_frame: Optional["ScreenCapture"] = None
# Enumeration cache. is_available() is polled by the engine registry,
# so the (cheap but non-free) Camera2 enumeration is cached briefly —
# matching the desktop CameraEngine's 30 s TTL.
_cam_cache: List[Dict[str, Any]] | None = None
_cam_cache_time: float = 0.0
_CAM_CACHE_TTL = 30.0 # seconds
# Resolution presets shown in the UI. Identical to the desktop
# CameraEngine set so the data-driven capture-template config UI
# (keyed by the "resolution" field name) renders the same dropdown.
# "auto" lets the Kotlin bridge pick a balanced output size.
_RESOLUTION_CHOICES: List[str] = [
"auto",
"640x480",
"1280x720",
"1920x1080",
"2560x1440",
"3840x2160",
]
def _parse_resolution(value: Any) -> tuple[int, int] | None:
"""Parse a 'WxH' string into (width, height). None for 'auto'/invalid."""
if not isinstance(value, str):
return None
s = value.strip().lower()
if s in ("", "auto"):
return None
parts = s.replace("×", "x").split("x")
if len(parts) != 2:
return None
try:
w, h = int(parts[0]), int(parts[1])
except ValueError:
return None
if w <= 0 or h <= 0:
return None
return w, h
# ---------------------------------------------------------------------------
# Kotlin CameraBridge interop — lazy + guarded (never at import time)
# ---------------------------------------------------------------------------
def _camera_bridge():
"""Return the Kotlin ``CameraBridge`` singleton, or None off-Android.
The ``from java import jclass`` import only resolves inside the
Chaquopy runtime, so it must never run at module import time (this
module is imported on desktop CI too). Mirrors
``core/devices/android_ble_transport.py``.
"""
if not is_android():
return None
try:
from java import jclass # type: ignore[import-not-found]
except ImportError as exc:
logger.debug("Chaquopy java interop not available: %s", exc)
return None
try:
return jclass("com.ledgrab.android.CameraBridge").INSTANCE
except Exception as exc: # pragma: no cover - Android-only path
logger.debug("CameraBridge singleton unavailable: %s", exc)
return None
def list_cameras() -> List[Dict[str, Any]]:
"""Enumerate cameras via the Kotlin bridge.
Returns a list of ``{"index": int, "name": str, "facing": str}``
dicts in stable enumeration order, or ``[]`` off-Android / on error
/ when the device has no cameras or CAMERA enumeration fails.
Monkeypatched in tests to inject a fake list without Android.
"""
bridge = _camera_bridge()
if bridge is None:
return []
try:
raw = bridge.listCameras() # JSON array string
except Exception as exc: # pragma: no cover - Android-only path
logger.warning("CameraBridge.listCameras failed: %s", exc)
return []
try:
parsed = json.loads(str(raw))
except (ValueError, TypeError) as exc: # pragma: no cover
logger.warning("CameraBridge.listCameras returned invalid JSON: %s", exc)
return []
cameras: List[Dict[str, Any]] = []
for i, entry in enumerate(parsed if isinstance(parsed, list) else []):
if not isinstance(entry, dict):
continue
cameras.append(
{
"index": int(entry.get("index", i)),
"name": str(entry.get("name") or f"Camera {i}"),
"facing": str(entry.get("facing") or "unknown"),
}
)
return cameras
def _enumerate_cameras() -> List[Dict[str, Any]]:
"""Cached camera enumeration (TTL ``_CAM_CACHE_TTL``)."""
global _cam_cache, _cam_cache_time
now = time.monotonic()
if _cam_cache is not None and (now - _cam_cache_time) < _CAM_CACHE_TTL:
return _cam_cache
_cam_cache = list_cameras()
_cam_cache_time = now
return _cam_cache
def start_camera(index: int, width: int, height: int) -> bool:
"""Signal the Kotlin bridge to open camera ``index`` (on demand).
``width``/``height`` are the requested capture size (0 => let the
bridge pick a balanced default). Returns True if the camera began
streaming. False off-Android, when the bridge is unavailable, or
when the open failed (e.g. CAMERA permission denied, camera in use).
Monkeypatched in tests.
"""
bridge = _camera_bridge()
if bridge is None:
return False
try:
return bool(bridge.startCamera(index, width, height))
except Exception as exc: # pragma: no cover - Android-only path
logger.warning("CameraBridge.startCamera(%d) failed: %s", index, exc)
return False
def stop_camera(index: int) -> None:
"""Signal the Kotlin bridge to close the active camera. No-op off-Android."""
bridge = _camera_bridge()
if bridge is None:
return
try:
bridge.stopCamera()
except Exception as exc: # pragma: no cover - Android-only path
logger.debug("CameraBridge.stopCamera failed: %s", exc)
def push_frame(rgb_bytes: bytes, width: int, height: int) -> None:
"""Push one RGB frame from Kotlin into the capture pipeline.
Called from ``CameraBridge`` on its capture thread. The byte buffer
is interpreted as tightly-packed RGB (``width * height * 3`` bytes,
3 bytes/pixel — NOT RGBA). The buffer is copied out so Kotlin may
reuse its backing array; the oldest queued frame is dropped if the
consumer is slow.
"""
global _frames_received, _last_frame
expected = width * height * 3
if expected <= 0:
return
arr = np.frombuffer(rgb_bytes, dtype=np.uint8)
if arr.size < expected:
# Short/malformed buffer — drop rather than reshape-crash.
return
# Copy out of the read-only frombuffer view (and off any reusable
# Kotlin buffer) so the queued frame owns its memory. Mirrors
# mediaprojection_engine.push_frame's .copy().
rgb = arr[:expected].reshape((height, width, 3)).copy()
frame = ScreenCapture(
image=rgb,
width=width,
height=height,
display_index=_active_index,
)
_last_frame = frame
_frames_received += 1
if _frames_received == 1 or _frames_received % 100 == 0:
logger.info("Android camera: received %d frames", _frames_received)
# Drop oldest frame if queue is full (non-blocking).
try:
_frame_queue.put_nowait(frame)
except queue.Full:
try:
_frame_queue.get_nowait()
except queue.Empty:
pass
try:
_frame_queue.put_nowait(frame)
except queue.Full:
pass
def shutdown() -> None:
"""Deactivate the engine. Called when the Android app stops."""
global _active
_active = False
logger.info("Android camera engine shut down")
def _drain_queue() -> None:
"""Discard any queued frames (stale frames from a prior session)."""
global _last_frame
while not _frame_queue.empty():
try:
_frame_queue.get_nowait()
except queue.Empty:
break
_last_frame = None
# ---------------------------------------------------------------------------
# CaptureStream
# ---------------------------------------------------------------------------
class AndroidCameraCaptureStream(CaptureStream):
"""Reads camera frames pushed by Kotlin from the module-level queue.
Opening the physical camera is on demand: :meth:`initialize` asks
the Kotlin bridge to open the camera bound to ``display_index`` and
:meth:`cleanup` asks it to close.
"""
def initialize(self) -> None:
if self._initialized:
return
if not is_android():
raise RuntimeError(
"Android camera engine not available. "
"This engine is only usable inside the Android app."
)
parsed = _parse_resolution(self.config.get("resolution", "auto"))
target_w, target_h = parsed if parsed is not None else (0, 0)
global _active, _active_index, _owner_index, _owner_refs
with _state_lock:
if _owner_index is not None and _owner_index != self.display_index:
# Another camera is already streaming — the bridge can only
# drive one at a time, so refuse rather than silently stealing
# the active camera's frames (mirrors the desktop CameraEngine's
# "already in use by another stream").
raise RuntimeError(
f"Android camera {_owner_index} is already in use by another "
f"capture; only one camera can stream at a time"
)
if _owner_index == self.display_index:
# Same camera already open — attach to it (ref-counted).
_owner_refs += 1
self._initialized = True
logger.info(
"Android camera capture stream attached (camera=%d, refs=%d)",
self.display_index,
_owner_refs,
)
return
# No camera open — open this one. Drain stale frames first so the
# first captured frame is actually current.
_drain_queue()
if not start_camera(self.display_index, target_w, target_h):
raise RuntimeError(
f"Failed to open Android camera {self.display_index} "
f"(CAMERA permission denied, camera in use, or unavailable)"
)
_owner_index = self.display_index
_owner_refs = 1
_active = True
_active_index = self.display_index
self._initialized = True
logger.info("Android camera capture stream initialized (camera=%d)", self.display_index)
def capture_frame(self) -> ScreenCapture | None:
if not self._initialized:
self.initialize()
# Prefer a fresh frame; fall back to the last one on a brief stall.
try:
return _frame_queue.get(timeout=0.1)
except queue.Empty:
return _last_frame
def cleanup(self) -> None:
if self._initialized:
global _active, _owner_index, _owner_refs
with _state_lock:
_owner_refs -= 1
if _owner_refs <= 0:
# Last owner released — actually stop the camera.
stop_camera(self.display_index)
_owner_index = None
_owner_refs = 0
_active = False
_drain_queue()
self._initialized = False
logger.info("Android camera capture stream cleaned up (camera=%d)", self.display_index)
else:
self._initialized = False
# ---------------------------------------------------------------------------
# CaptureEngine
# ---------------------------------------------------------------------------
class AndroidCameraEngine(CaptureEngine):
"""Android camera/webcam capture engine (Camera2 via Kotlin bridge).
Only available inside the LedGrab Android app with at least one
enumerable camera. Each camera is exposed as a selectable
"display", mirroring the desktop OpenCV :class:`CameraEngine`.
Selected explicitly via ``engine_type="android_camera"`` in a
capture template — never auto-selected (priority 0, below
MediaProjection's 100).
"""
ENGINE_TYPE = "android_camera"
ENGINE_PRIORITY = 0 # never auto-selected over MediaProjection (100); explicit only
HAS_OWN_DISPLAYS = True
@classmethod
def is_available(cls) -> bool:
return is_android() and len(_enumerate_cameras()) > 0
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {"resolution": "auto"}
@classmethod
def get_config_choices(cls) -> Dict[str, List[str]]:
return {"resolution": list(_RESOLUTION_CHOICES)}
@classmethod
def get_available_displays(cls) -> List[DisplayInfo]:
displays: List[DisplayInfo] = []
for cam in _enumerate_cameras():
idx = cam["index"]
displays.append(
DisplayInfo(
index=idx,
name=cam["name"],
width=0,
height=0,
x=idx * 500,
y=0,
is_primary=(idx == 0),
refresh_rate=30,
)
)
return displays
@classmethod
def create_stream(
cls, display_index: int, config: Dict[str, Any]
) -> AndroidCameraCaptureStream:
merged = {**cls.get_default_config(), **config}
return AndroidCameraCaptureStream(display_index, merged)
@@ -8,6 +8,8 @@ Supported platforms:
- **Windows**: polls toast notifications via winrt UserNotificationListener
(falls back to winsdk if winrt packages are not installed)
- **Linux**: monitors org.freedesktop.Notifications via D-Bus (dbus-next)
- **Android**: receives notifications pushed from a Kotlin NotificationListenerService
via Chaquopy (push-based; see push_notification() and _AndroidBackend)
"""
import asyncio
@@ -17,9 +19,10 @@ import platform
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Set
from typing import Callable, Dict, List, Optional, Set
from ledgrab.utils import get_logger
from ledgrab.utils.platform import is_linux
logger = get_logger(__name__)
@@ -30,15 +33,71 @@ _HISTORY_MAX = 50
# Module-level singleton for dependency access
_instance: Optional["OsNotificationListener"] = None
# Push target for the Android backend — set by _AndroidBackend.start(), read by
# push_notification(). None when the Android backend isn't running (desktop / server down).
_android_target: Callable[[str | None], None] | None = None
def get_os_notification_listener() -> Optional["OsNotificationListener"]:
"""Return the global OsNotificationListener instance (or None)."""
return _instance
def push_notification(app_name: str | None) -> None:
"""Receive an Android notification pushed from Kotlin via Chaquopy.
Called by the LedGrabNotificationListener service through
``Python.getInstance().getModule(...).callAttr("push_notification", label)``.
Routes the posting app's display label into the active listener's
``_on_new_notification`` handler. No-op when the Android backend isn't running,
so a notification arriving before the server is ready (or on desktop) is safely
ignored.
"""
# Snapshot into a local first: stop() may null _android_target concurrently, but an
# in-flight push then still completes against the prior callback. Do NOT collapse this
# into `if _android_target is not None: _android_target(...)` — that reintroduces a
# TOCTOU None-deref race.
cb = _android_target
if cb is None:
return
try:
cb(app_name)
except Exception as exc: # never let a JNI-side call crash the bound service
logger.warning("push_notification callback error: %s", exc)
# ── Platform backends ──────────────────────────────────────────────────
class _AndroidBackend:
"""Push-based backend — notifications arrive from Kotlin via push_notification().
Unlike the Windows/Linux backends (which poll or eavesdrop on a thread), Android
notifications are delivered by a Kotlin NotificationListenerService across the
Chaquopy JNI boundary into the module-level push_notification() receiver, so
start()/stop() simply register/clear the receiver target.
"""
def __init__(self, on_notification):
self._on_notification = on_notification
@staticmethod
def probe() -> bool:
"""Return True when running on Android (Chaquopy)."""
from ledgrab.utils.platform import is_android
return is_android()
def start(self) -> None:
global _android_target
_android_target = self._on_notification
logger.info("OS notification listener: Android backend active")
def stop(self) -> None:
global _android_target
_android_target = None
def _import_winrt_notifications():
"""Try to import WinRT notification APIs: winrt first, then winsdk fallback.
@@ -193,7 +252,9 @@ class _LinuxBackend:
@staticmethod
def probe() -> bool:
"""Return True if this backend can run on the current system."""
if platform.system() != "Linux":
# is_linux() excludes Android, which also reports platform.system() == "Linux"
# but has no D-Bus session — defense-in-depth beyond probe ordering.
if not is_linux():
return False
try:
import dbus_next # noqa: F401
@@ -312,8 +373,9 @@ class OsNotificationListener:
global _instance
_instance = self
# Try platform backends in order
for backend_cls in (_WindowsBackend, _LinuxBackend):
# Try platform backends in order (Android first — it reports platform.system()
# == "Linux", so probing it ahead of _LinuxBackend is the robust ordering).
for backend_cls in (_AndroidBackend, _WindowsBackend, _LinuxBackend):
if backend_cls.probe():
self._backend = backend_cls(on_notification=self._on_new_notification)
self._backend.start()
@@ -103,6 +103,7 @@
"templates.engine.wgc.desc": "Windows Graphics Capture",
"templates.engine.demo.desc": "Animated test pattern (demo mode)",
"templates.engine.mediaprojection.desc": "Native Android screen capture",
"templates.engine.android_camera.desc": "On-device camera capture (Camera2)",
"templates.config": "Configuration",
"templates.config.show": "Show configuration",
"templates.config.none": "No additional configuration",
@@ -158,6 +158,7 @@
"templates.engine.wgc.desc": "Windows Graphics Capture",
"templates.engine.demo.desc": "Тестовый анимированный шаблон (демо)",
"templates.engine.mediaprojection.desc": "Нативный захват экрана Android",
"templates.engine.android_camera.desc": "Захват камеры устройства (Camera2)",
"templates.config": "Конфигурация",
"templates.config.show": "Показать конфигурацию",
"templates.config.none": "Нет дополнительных настроек",
@@ -156,6 +156,7 @@
"templates.engine.wgc.desc": "Windows图形捕获",
"templates.engine.demo.desc": "动画测试图案(演示模式)",
"templates.engine.mediaprojection.desc": "原生Android屏幕捕获",
"templates.engine.android_camera.desc": "设备摄像头捕获 (Camera2)",
"templates.config": "配置",
"templates.config.show": "显示配置",
"templates.config.none": "无额外配置",
@@ -0,0 +1,237 @@
"""Tests for the Android push-based notification backend.
These run on desktop CI (no Android device needed): ``is_android`` is
monkeypatched and the app label is pushed directly into the module-level
``push_notification`` receiver, exactly as the Kotlin
``NotificationListenerService`` would across the Chaquopy bridge.
Isolation (critical): the listener keeps process-global state
(``_android_target``, ``_instance``) and persists history to a hardcoded
``data/notification_history.json``. Every test resets those globals and
repoints ``_HISTORY_FILE`` to ``tmp_path`` so the suite never leaks state
between tests or clobbers the real repo data file.
"""
from datetime import datetime, timezone
import pytest
import ledgrab.core.processing.os_notification_listener as nl
from ledgrab.storage.color_strip_source import NotificationColorStripSource
PLATFORM_MOD = "ledgrab.utils.platform"
# ---------------------------------------------------------------------------
# Test doubles
# ---------------------------------------------------------------------------
class _FakeStream:
"""Stub NotificationColorStripStream — records fire() calls."""
def __init__(self, accept: bool = True):
self._accept = accept
self.fired_with: list = []
def fire(self, app_name=None) -> bool:
self.fired_with.append(app_name)
return self._accept
class _FakeStore:
def __init__(self, sources):
self._sources = sources
def get_all_sources(self):
return list(self._sources)
class _FakeStreamManager:
def __init__(self, streams):
self._streams = streams
def get_streams_by_source_id(self, source_id):
return list(self._streams)
def _notif_source(
*, source_id: str = "css_test", os_listener: bool = True
) -> NotificationColorStripSource:
now = datetime.now(timezone.utc)
return NotificationColorStripSource.create_from_kwargs(
id=source_id,
name="Test Notification Source",
source_type="notification",
created_at=now,
updated_at=now,
os_listener=os_listener,
)
# ---------------------------------------------------------------------------
# Fixtures — module-global + disk isolation
# ---------------------------------------------------------------------------
@pytest.fixture
def nl_mod(monkeypatch, tmp_path):
"""Reset module globals and repoint the history file to tmp_path.
``monkeypatch.setattr`` auto-restores originals on teardown, so even though
``start()``/``stop()`` rebind ``_android_target`` and ``_instance`` during a
test, the globals are returned to their pre-test values afterward — no
cross-test leakage and no write to the real repo ``data/`` dir.
"""
monkeypatch.setattr(nl, "_android_target", None)
monkeypatch.setattr(nl, "_instance", None)
monkeypatch.setattr(nl, "_HISTORY_FILE", tmp_path / "notification_history.json")
return nl
# ---------------------------------------------------------------------------
# _AndroidBackend.probe()
# ---------------------------------------------------------------------------
def test_probe_true_on_android(nl_mod, monkeypatch):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
assert nl_mod._AndroidBackend.probe() is True
def test_probe_false_on_desktop(nl_mod, monkeypatch):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: False)
assert nl_mod._AndroidBackend.probe() is False
# ---------------------------------------------------------------------------
# push_notification() routing contract
# ---------------------------------------------------------------------------
def test_push_is_noop_before_start(nl_mod):
# _android_target is None → no callback, no exception.
nl_mod.push_notification("Telegram") # must not raise
def test_push_routes_after_start_and_stops_after_stop(nl_mod):
received: list = []
backend = nl_mod._AndroidBackend(on_notification=received.append)
backend.start()
nl_mod.push_notification("Telegram")
assert received == ["Telegram"]
backend.stop()
nl_mod.push_notification("Signal") # no-op after stop
assert received == ["Telegram"]
def test_push_swallows_callback_exception(nl_mod):
def boom(_app):
raise RuntimeError("callback exploded")
nl_mod._AndroidBackend(on_notification=boom).start()
# JNI entry point must never propagate — would crash the bound service.
nl_mod.push_notification("X")
# ---------------------------------------------------------------------------
# Integration — start() selects Android, push fires the stream + records history
# ---------------------------------------------------------------------------
def test_android_selected_push_fires_stream_and_records_history(nl_mod, monkeypatch, tmp_path):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
stream = _FakeStream(accept=True)
listener = nl_mod.OsNotificationListener(
_FakeStore([_notif_source(os_listener=True)]),
_FakeStreamManager([stream]),
)
listener.start()
assert listener.available is True # flips True on backend selection, not on push
nl_mod.push_notification("Telegram")
assert stream.fired_with == ["Telegram"]
assert listener.recent_history[0]["app"] == "Telegram"
assert listener.recent_history[0]["fired"] == 1
# history written under tmp_path — never the repo data/ dir
assert nl_mod._HISTORY_FILE.exists()
assert nl_mod._HISTORY_FILE.parent == tmp_path
listener.stop()
def test_push_with_none_app_name_is_recorded(nl_mod, monkeypatch):
# The Windows (_extract_app_name) and Linux D-Bus paths can yield None;
# the Android path falls back to the package name, but None must still be
# handled end-to-end without raising.
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
stream = _FakeStream(accept=True)
listener = nl_mod.OsNotificationListener(
_FakeStore([_notif_source(os_listener=True)]),
_FakeStreamManager([stream]),
)
listener.start()
nl_mod.push_notification(None)
assert stream.fired_with == [None]
assert listener.recent_history[0]["app"] is None
listener.stop()
def test_get_os_notification_listener_tracks_started_instance(nl_mod, monkeypatch):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
assert nl_mod.get_os_notification_listener() is None
listener = nl_mod.OsNotificationListener(_FakeStore([]), _FakeStreamManager([]))
listener.start()
assert nl_mod.get_os_notification_listener() is listener
listener.stop()
def test_source_with_os_listener_off_does_not_fire(nl_mod, monkeypatch):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
stream = _FakeStream()
listener = nl_mod.OsNotificationListener(
_FakeStore([_notif_source(os_listener=False)]),
_FakeStreamManager([stream]),
)
listener.start()
nl_mod.push_notification("Telegram")
assert stream.fired_with == [] # os_listener=False → skipped
listener.stop()
# ---------------------------------------------------------------------------
# Desktop regression — the probe-order change must not alter desktop selection
# ---------------------------------------------------------------------------
def test_android_probe_false_on_real_desktop(nl_mod, monkeypatch):
# With is_android() False, the new first-in-tuple backend must not be selectable.
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: False)
assert nl_mod._AndroidBackend.probe() is False
def test_desktop_selection_unchanged_windows_wins(nl_mod, monkeypatch):
# Deterministically control probes and stub start() so no real polling thread spawns.
# Order under test is (_AndroidBackend, _WindowsBackend, _LinuxBackend): Android skipped,
# Windows is the first True → it must be the selected backend, exactly as before.
monkeypatch.setattr(nl_mod._AndroidBackend, "probe", staticmethod(lambda: False))
monkeypatch.setattr(nl_mod._WindowsBackend, "probe", staticmethod(lambda: True))
monkeypatch.setattr(nl_mod._LinuxBackend, "probe", staticmethod(lambda: False))
started: list = []
monkeypatch.setattr(nl_mod._WindowsBackend, "start", lambda self: started.append("win"))
listener = nl_mod.OsNotificationListener(_FakeStore([]), _FakeStreamManager([]))
listener.start()
assert listener.available is True
assert isinstance(listener._backend, nl_mod._WindowsBackend)
assert started == ["win"]
@@ -0,0 +1,342 @@
"""Tests for the Android camera (webcam) capture engine.
These run on desktop CI (no Android device needed): ``is_android`` and the
Kotlin-bridge hooks (``list_cameras`` / ``start_camera`` / ``stop_camera``)
are monkeypatched, and RGB frames are pushed directly into the module-level
queue, exactly as the Kotlin ``CameraBridge`` would.
"""
import queue
import numpy as np
import pytest
# Importing the package triggers auto-registration of AndroidCameraEngine.
import ledgrab.core.capture_engines # noqa: F401
from ledgrab.core.capture_engines import android_camera_engine as eng
from ledgrab.core.capture_engines.factory import EngineRegistry
ENGINE_MOD = "ledgrab.core.capture_engines.android_camera_engine"
W = 16
H = 8
_FAKE_CAMERAS = [
{"index": 0, "name": "Back camera", "facing": "back"},
{"index": 1, "name": "Front camera", "facing": "front"},
]
# ---------------------------------------------------------------------------
# Helpers / fixtures
# ---------------------------------------------------------------------------
def _drain() -> None:
while not eng._frame_queue.empty():
try:
eng._frame_queue.get_nowait()
except queue.Empty:
break
def _frame(marker: int = 0, w: int = W, h: int = H) -> bytes:
"""A tightly-packed RGB frame whose first pixel's R channel is ``marker``."""
arr = np.zeros((h, w, 3), dtype=np.uint8)
arr[0, 0, 0] = marker
return arr.tobytes()
@pytest.fixture
def reset_engine():
"""Reset module-global engine state; snapshot/restore the registry.
The engine keeps its queue + caches in module globals and the registry
is a class-level singleton — both must be restored so this test file
never disturbs the desktop engines other tests rely on.
"""
saved_engines = dict(EngineRegistry._engines)
eng.shutdown()
_drain()
eng._frames_received = 0
eng._active = False
eng._active_index = 0
eng._last_frame = None
eng._cam_cache = None
eng._cam_cache_time = 0.0
eng._owner_index = None
eng._owner_refs = 0
yield eng
eng.shutdown()
_drain()
eng._cam_cache = None
eng._cam_cache_time = 0.0
eng._owner_index = None
eng._owner_refs = 0
EngineRegistry._engines.clear()
EngineRegistry._engines.update(saved_engines)
@pytest.fixture
def on_android(monkeypatch, reset_engine):
"""Engine fixture with ``is_android`` True, demo mode off, fake cameras,
and the open/close hooks stubbed to succeed (recording calls)."""
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr("ledgrab.core.capture_engines.factory.is_demo_mode", lambda: False)
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: list(_FAKE_CAMERAS))
calls = {"start": [], "stop": []}
monkeypatch.setattr(
f"{ENGINE_MOD}.start_camera",
lambda index, w, h: calls["start"].append((index, w, h)) or True,
)
monkeypatch.setattr(
f"{ENGINE_MOD}.stop_camera",
lambda index: calls["stop"].append(index),
)
reset_engine.calls = calls
return reset_engine
# ---------------------------------------------------------------------------
# Queue / push contract
# ---------------------------------------------------------------------------
def test_push_frame_round_trips_rgb(on_android):
# Arrange
stream = eng.AndroidCameraEngine.create_stream(0, {})
stream.initialize()
# Act
eng.push_frame(_frame(marker=42), W, H)
got = stream.capture_frame()
# Assert
assert got is not None
assert got.image.shape == (H, W, 3)
assert got.image.dtype == np.uint8
assert int(got.image[0, 0, 0]) == 42
assert got.width == W and got.height == H
def test_queue_drops_oldest_when_full(reset_engine):
# Arrange
maxsize = eng._frame_queue.maxsize # 2
# Act — push more frames than the queue holds, each tagged 0..N-1
total = maxsize + 3
for i in range(total):
eng.push_frame(_frame(marker=i), W, H)
drained = []
while True:
try:
drained.append(eng._frame_queue.get_nowait())
except queue.Empty:
break
# Assert — only the newest `maxsize` frames survived, oldest dropped
assert len(drained) == maxsize
markers = [int(f.image[0, 0, 0]) for f in drained]
assert markers == list(range(total - maxsize, total))
def test_capture_frame_falls_back_to_last_frame_when_empty(on_android):
# Arrange
stream = eng.AndroidCameraEngine.create_stream(0, {})
stream.initialize()
eng.push_frame(_frame(marker=7), W, H)
# Act — first read drains the queue; second read finds it empty
first = stream.capture_frame()
second = stream.capture_frame()
# Assert — the static-frame fallback returns the cached last frame
assert first is not None
assert second is not None
assert int(second.image[0, 0, 0]) == 7
def test_push_frame_short_buffer_does_not_crash(reset_engine):
# A buffer shorter than width*height*3 must be dropped, not reshape-crash.
eng.push_frame(b"\x01\x02\x03", W, H) # far too short
assert eng._frame_queue.empty()
assert eng._last_frame is None
# ---------------------------------------------------------------------------
# On-demand open/close lifecycle
# ---------------------------------------------------------------------------
def test_initialize_opens_camera_with_parsed_resolution(on_android):
stream = eng.AndroidCameraEngine.create_stream(1, {"resolution": "1280x720"})
stream.initialize()
assert on_android.calls["start"] == [(1, 1280, 720)]
def test_initialize_auto_resolution_requests_zero(on_android):
stream = eng.AndroidCameraEngine.create_stream(0, {"resolution": "auto"})
stream.initialize()
assert on_android.calls["start"] == [(0, 0, 0)]
def test_cleanup_closes_camera_once(on_android):
stream = eng.AndroidCameraEngine.create_stream(0, {})
stream.initialize()
stream.cleanup()
assert on_android.calls["stop"] == [0]
# Idempotent — a second cleanup does not re-signal the bridge.
stream.cleanup()
assert on_android.calls["stop"] == [0]
def test_second_camera_index_is_refused(on_android):
# First stream owns camera 0.
s0 = eng.AndroidCameraEngine.create_stream(0, {})
s0.initialize()
# A stream on a DIFFERENT camera must be refused (one camera at a time),
# not silently steal camera 0's stream.
s1 = eng.AndroidCameraEngine.create_stream(1, {})
with pytest.raises(RuntimeError):
s1.initialize()
# Only the first open reached the bridge.
assert on_android.calls["start"] == [(0, 0, 0)]
def test_same_camera_attaches_and_refcounts(on_android):
# Two streams on the SAME camera share one physical open (ref-counted).
a = eng.AndroidCameraEngine.create_stream(0, {})
b = eng.AndroidCameraEngine.create_stream(0, {})
a.initialize()
b.initialize()
assert on_android.calls["start"] == [(0, 0, 0)] # opened once
# First release must NOT stop the camera (the other stream is still live).
a.cleanup()
assert on_android.calls["stop"] == []
# Last release stops it exactly once.
b.cleanup()
assert on_android.calls["stop"] == [0]
def test_camera_freed_after_release_allows_other_index(on_android):
# After fully releasing camera 0, a different camera can be opened.
s0 = eng.AndroidCameraEngine.create_stream(0, {})
s0.initialize()
s0.cleanup()
s1 = eng.AndroidCameraEngine.create_stream(1, {})
s1.initialize() # must not raise
assert on_android.calls["start"] == [(0, 0, 0), (1, 0, 0)]
def test_initialize_raises_when_open_fails(monkeypatch, reset_engine):
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr(f"{ENGINE_MOD}.start_camera", lambda index, w, h: False)
stream = eng.AndroidCameraEngine.create_stream(0, {})
with pytest.raises(RuntimeError):
stream.initialize()
def test_initialize_raises_off_android(monkeypatch, reset_engine):
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: False)
stream = eng.AndroidCameraEngine.create_stream(0, {})
with pytest.raises(RuntimeError):
stream.initialize()
# ---------------------------------------------------------------------------
# Availability / enumeration (platform-gated)
# ---------------------------------------------------------------------------
def test_is_available_requires_android_and_cameras(monkeypatch, reset_engine):
# Off-Android → unavailable regardless of cameras.
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: False)
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: list(_FAKE_CAMERAS))
assert eng.AndroidCameraEngine.is_available() is False
# On-Android but no cameras → unavailable.
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: [])
eng._cam_cache = None # bust the enumeration cache
assert eng.AndroidCameraEngine.is_available() is False
# On-Android with ≥1 camera → available.
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: list(_FAKE_CAMERAS))
eng._cam_cache = None
assert eng.AndroidCameraEngine.is_available() is True
def test_get_available_displays_maps_cameras(on_android):
displays = eng.AndroidCameraEngine.get_available_displays()
assert len(displays) == 2
assert displays[0].index == 0 and displays[0].name == "Back camera"
assert displays[0].is_primary is True
assert displays[1].index == 1 and displays[1].name == "Front camera"
assert displays[1].is_primary is False
def test_config_choices_expose_resolution(reset_engine):
choices = eng.AndroidCameraEngine.get_config_choices()
assert "resolution" in choices
assert "auto" in choices["resolution"]
assert "1920x1080" in choices["resolution"]
# ---------------------------------------------------------------------------
# Registry integration
# ---------------------------------------------------------------------------
def test_engine_registers_with_expected_type_and_priority():
# Auto-registration ran on import; the engine is in the registry.
assert "android_camera" in EngineRegistry.get_all_engines()
assert eng.AndroidCameraEngine.ENGINE_PRIORITY == 0
assert eng.AndroidCameraEngine.HAS_OWN_DISPLAYS is True
def test_does_not_beat_mediaprojection_by_priority(monkeypatch, reset_engine):
"""Priority 0 must never let the camera win the best-engine race over
MediaProjection (100) on Android."""
from ledgrab.core.capture_engines import mediaprojection_engine as mp
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: list(_FAKE_CAMERAS))
monkeypatch.setattr("ledgrab.core.capture_engines.factory.is_demo_mode", lambda: False)
eng._cam_cache = None
# Controlled registry: just the two engines whose priority race we assert.
EngineRegistry._engines.clear()
EngineRegistry.register(mp.MediaProjectionEngine)
EngineRegistry.register(eng.AndroidCameraEngine)
mp.configure(640, 480) # make MediaProjection available
try:
best = EngineRegistry.get_best_available_engine()
assert best == "mediaprojection"
assert best != "android_camera"
finally:
mp.shutdown()
while not mp._frame_queue.empty():
try:
mp._frame_queue.get_nowait()
except queue.Empty:
break
def test_stream_via_registry_yields_pushed_frame(on_android):
# Arrange — register cleanly (fixture restores afterward).
stream = EngineRegistry.create_stream("android_camera", 0, {})
stream.initialize()
# Act
eng.push_frame(_frame(marker=99), W, H)
got = stream.capture_frame()
# Assert
assert got is not None
assert int(got.image[0, 0, 0]) == 99
assert got.display_index == 0