Compare commits

...

10 Commits

Author SHA1 Message Date
alexei.dolgolyov 02e2ea37f3 fix(scenes): sync brightness value-source change to live processor
apply_scene_state computed brightness_changed = "brightness" in changed, but
the change dict only ever uses the key "brightness_value_source_id", so the
branch was dead and a running target's brightness source was never live-synced
on scene activation (it only took effect after a restart). Check the correct
key.
2026-06-04 20:46:26 +03:00
alexei.dolgolyov fdc9201660 fix(api): remove broken legacy /system/mqtt/settings route
The GET/PUT /api/v1/system/mqtt/settings handlers read cfg.mqtt.*, but the
single-broker MQTTConfig block was removed in the multi-broker refactor, so any
call raised AttributeError. Brokers are now first-class MQTTSource entities
managed via the mqtt.py router, and the frontend no longer calls this endpoint.
Remove the dead handlers, the _load_mqtt_settings helper, the now-unused
get_config import, and the orphaned MQTTSettings{Request,Response} schemas.
2026-06-04 20:46:24 +03:00
alexei.dolgolyov 5686ae5468 fix(security): remove active weak default API key from shipped config
default_config.yaml shipped api_keys.dev: "development-key-change-in-production"
uncommitted/active, while the surrounding comment claimed it had been removed.
On a non-loopback bind this is a publicly-known credential granting full LAN
access. Restore the documented secure default (empty api_keys -> loopback-only
anonymous, LAN rejected) and leave a commented example instead.
2026-06-04 20:46:13 +03:00
alexei.dolgolyov 9960f15a1b docs(android): remove ANDROID-REVIEW planning/review docs
The Android feature-gap assessment and per-feature design docs have served
their purpose — notification, audio, webcam, and the foreground-app automation
condition are all implemented and merged, so no gaps remain to track. The
implementation is documented in the code, commit messages, and git history; the
review docs are now obsolete. No committed files referenced them (only the
local-only plans/ archives, left as point-in-time records).
2026-06-02 15:05:11 +03:00
alexei.dolgolyov 397a53ed1c Merge feature/android-foreground-app-automation: Android foreground-app automation condition
Foreground-app -> scene automation on the Android-TV build via a Kotlin
ForegroundAppBridge (UsageStatsManager) bridged into PlatformDetector ahead of the
Windows-only ctypes path; LauncherApps-backed app picker (/system/installed-apps) +
platform signal (/system/info); PACKAGE_USAGE_STATS special-access UX (on-device
button + web-UI banner, graceful degradation). Reuses the existing automation engine
unchanged; zero new deps. assembleDebug + 1897 pytest + ruff + tsc + build green;
independent final + security reviews pass.
2026-06-02 14:57:45 +03:00
alexei.dolgolyov 1c1bbe2551 feat(android): foreground-app automation condition
Make the existing Application automation rule (foreground app -> activate
scene) work on the Android-TV build. A Kotlin ForegroundAppBridge reads the
foreground app via UsageStatsManager and lists launchable apps via LauncherApps;
PlatformDetector bridges it in (ahead of the Windows-only ctypes guard) so the
existing AutomationEngine / ApplicationRule / storage / deactivation modes are
unchanged. New /system/installed-apps + /system/info endpoints feed an app picker
that stores package names (vs process names on desktop); on Android the editor
hides the match-type selector since the foreground app is the only obtainable
signal. PACKAGE_USAGE_STATS is granted via an on-device button + a web-UI banner
(no blanket prompt at capture start); detection degrades gracefully until granted.

Zero new Python/Gradle deps (UsageStatsManager + LauncherApps are in-platform;
matching only string-compares the package name, so no QUERY_ALL_PACKAGES).
assembleDebug + 1897 pytest + ruff + tsc + npm build all green; independent final
review (0 blockers) + security review (no critical issues).
2026-06-02 14:57:29 +03:00
alexei.dolgolyov 68040173c6 Merge feature/android-webcam-capture: Android on-device webcam capture
Camera2 + ImageReader (CameraBridge) -> push RGB frames -> AndroidCameraEngine,
reusing the MediaProjection capture-engine selection path so webcams surface as
selectable displays on Android TV. On-demand camera lifecycle, conditional camera
FGS type (granted-only), single-camera ownership lock. Per-phase + final + security
reviews pass; 14 files; new isolated tests; zero new Python/Gradle deps.
assembleDebug + 1883 pytest + ruff all green.
2026-06-02 13:46:59 +03:00
alexei.dolgolyov 4bf3fe65db feat(android): on-device webcam capture via Camera2 (AndroidCameraEngine)
Add on-device webcam capture to the experimental Android-TV build. Desktop
captures webcams via OpenCV (no Chaquopy/Android wheel); this adds a push-based
AndroidCameraEngine that plugs into the same selection path desktop uses
(capture template engine_type="android_camera" + display_index, HAS_OWN_DISPLAYS).

A Kotlin CameraBridge (Camera2) enumerates cameras and opens them on demand —
only while a capture source is active, driven Python->Kotlin via a guarded jclass
singleton (BleBridge pattern) — converts each frame YUV_420_888->RGB, and pushes
RGB bytes into a module-level queue mirroring mediaprojection_engine.py. Cameras
surface as selectable displays like the desktop OpenCV engine; the data-driven
capture-template UI is unchanged. No new Python deps; no new Gradle deps
(Camera2 is in-platform).

Engine: ENGINE_PRIORITY=0 (never auto-selected over MediaProjection=100; explicit
engine_type only). Single-camera ownership is serialized with a lock + ref-count
(same-camera streams attach, different-camera refused, last release stops),
mirroring the desktop CameraEngine guard.

Permission: CAMERA requested at capture-start, gated on FEATURE_CAMERA_ANY so
camera-less TV boxes never prompt; graceful degradation when denied. The service
is promoted with the camera FGS type (+ FOREGROUND_SERVICE_CAMERA) only when
CAMERA is already granted, so backgrounded capture keeps working without risking
a failed startForeground on camera-less boxes (camera can't ride the
MediaProjection token the way audio playback capture does).

Reviewed via multi-agent adversarial pass (13 findings -> 4 fixed: device leak on
session-failure, multi-stream collision, camera FGS type, i18n key; 9 refuted).

Tests: 18 new desktop-CI tests (no device needed); full suite 1883 passed.
Verified: assembleDebug BUILD SUCCESSFUL, ruff clean.

Docs: ANDROID-REVIEW/android-webcam-capture-plan.md (design), updated
android-missing-functionality.md + README feature table + en/ru/zh locales.
2026-06-02 13:36:23 +03:00
alexei.dolgolyov 34db5de8c3 Merge feature/android-notification-capture: Android on-device notification capture
NotificationListenerService -> push_notification() -> _AndroidBackend path so OS
notifications drive the existing color-strip LED effects on Android TV at app-name
parity. Per-phase + final + security reviews all pass; 11 new isolated tests; zero
new Python deps. Android assembleDebug pending device/CI verification.
2026-06-02 11:47:29 +03:00
alexei.dolgolyov 0be3f833df feat(android): on-device OS notification capture (NotificationListenerService)
Add an Android backend to os_notification_listener.py so notifications on the
experimental Android-TV build drive the existing NotificationColorStripSource
LED effects (flash/pulse/sweep, per-app colors + sounds) at app-name parity
with the Windows/Linux backends.

A Kotlin NotificationListenerService forwards the posting app's display label
across the Chaquopy JNI boundary into a new push-based _AndroidBackend +
module-level push_notification() receiver; the existing color-strip pipeline,
per-app colors/filters, and history endpoint are reused unchanged.

- Python: _AndroidBackend (probed first), push_notification() receiver,
  _LinuxBackend.probe() hardened with is_linux() to exclude Android (which
  also reports platform.system() == "Linux").
- Android: LedGrabNotificationListener NLS — serial single-thread executor,
  full crash isolation around Python.getInstance(), label-only forwarding
  (never notification title/body), ongoing/group-summary/self-package noise
  filtering. Manifest service exported + gated by
  BIND_NOTIFICATION_LISTENER_SERVICE (no new uses-permission).
- UX: prompt-once notification-access + manual "Grant notification access"
  button wired into the D-pad focus chain (computed from visible controls);
  en/ru/zh strings.
- Tests: 11 isolated unit tests — module-global + tmp_path history isolation,
  push routing contract, callback-exception swallowing, None app-name, and a
  desktop-regression lock on backend selection order.
- Docs: README OS-support Android column (notification + audio cells),
  ANDROID-REVIEW status flipped to Implemented.

Zero new Python deps; no build.gradle.kts / Chaquopy pip changes.
2026-06-02 11:47:13 +03:00
35 changed files with 2714 additions and 638 deletions
@@ -1,308 +0,0 @@
# Plan: Android on-device audio capture
> Status: proposed plan (not yet approved). No code changes. Last updated 2026-06-01.
## Context
LedGrab's audio-reactive features (music analyzer, audio value sources, band filters)
depend on capturing an audio stream and running it through `AudioAnalyzer`
(`server/src/ledgrab/core/audio/analysis.py`). On desktop this is fed by **WASAPI**
(Windows) or **Sounddevice/PortAudio** (cross-platform). On the **experimental
Android-TV build** neither is available — `sounddevice` has no Chaquopy wheel and PortAudio
isn't bundled — so `core/audio/__init__.py` registers only `DemoAudioEngine`, and
audio-reactive lighting is effectively dead on Android.
Android does not need PortAudio: the platform exposes **`AudioPlaybackCapture`** (API 29+),
which captures system playback audio and **takes a `MediaProjection` token — the very token
the app already obtains for screen capture** (`ScreenCapture(projection, …)`). This plan adds
a push-based Android audio engine so the TV box can drive sound-reactive lighting from its own
media playback, at parity with how desktop audio feeds the analyzer.
The design mirrors the working screen-capture bridge
(`mediaprojection_engine.py``ScreenCapture.kt``PythonBridge`) and the existing audio
engine abstraction (`AudioCaptureEngine` / `AudioCaptureStreamBase` /
`AudioEngineRegistry`). **No new Python dependencies** (`numpy` is already bundled) → no
Chaquopy / `build.gradle.kts` `pip {}` changes.
---
## Approach
A new **push-based** audio engine registered in the existing `AudioEngineRegistry`:
- **Python:** `AndroidAudioEngine` + `AndroidAudioCaptureStream` mirroring `SounddeviceEngine`,
but `read_chunk()` pops PCM from a module-level queue that **Kotlin fills** (mirror of
`mediaprojection_engine.push_frame`). High `ENGINE_PRIORITY` so
`AudioEngineRegistry.get_best_available_engine()` selects it on Android. The existing
`ManagedAudioStream` capture loop and `AudioAnalyzer` consume `read_chunk()` unchanged.
- **Android:** an `AudioCapture` helper using `AudioRecord` + `AudioPlaybackCaptureConfiguration`
(reusing `CaptureService`'s `MediaProjection`), pushing float32 PCM to Python. Mic
(`AudioSource.MIC`) fallback. Wired into `CaptureService` next to `ScreenCapture`.
```
[media playback] → AudioRecord (AudioPlaybackCapture, reuses MediaProjection)
→ AudioCapture.kt → PythonBridge.pushAudio(pcmFloat32, frames, channels)
→ android_audio_engine.push_samples() [module-level queue]
→ AndroidAudioCaptureStream.read_chunk() → ManagedAudioStream → AudioAnalyzer [unchanged]
```
---
## Part A — Python (server)
**New file: `server/src/ledgrab/core/audio/android_audio_engine.py`** — mirror
`mediaprojection_engine.py` (queue + configure + push) and `sounddevice_engine.py` (engine/stream shape):
```python
import queue
import numpy as np
from typing import Any, Dict, List
from ledgrab.core.audio.base import AudioCaptureEngine, AudioCaptureStreamBase, AudioDeviceInfo
from ledgrab.utils import get_logger
logger = get_logger(__name__)
_pcm_queue: "queue.Queue[np.ndarray]" = queue.Queue(maxsize=8)
_sample_rate = 48000
_channels = 2
_chunk_size = 1024
_active = False
def configure(sample_rate: int, channels: int, chunk_size: int) -> None:
"""Called from Kotlin before audio frames start flowing. Drains stale PCM."""
global _sample_rate, _channels, _chunk_size, _active
while not _pcm_queue.empty():
try: _pcm_queue.get_nowait()
except queue.Empty: break
_sample_rate, _channels, _chunk_size = sample_rate, channels, chunk_size
_active = True
def push_samples(pcm_float32: bytes) -> None:
"""Push one interleaved float32 PCM chunk from Kotlin. Drops oldest if full."""
samples = np.frombuffer(pcm_float32, dtype=np.float32)
try:
_pcm_queue.put_nowait(samples)
except queue.Full:
try: _pcm_queue.get_nowait()
except queue.Empty: pass
try: _pcm_queue.put_nowait(samples)
except queue.Full: pass
def shutdown() -> None:
global _active
_active = False
class AndroidAudioCaptureStream(AudioCaptureStreamBase):
@property
def channels(self) -> int: return _channels
@property
def sample_rate(self) -> int: return _sample_rate
@property
def chunk_size(self) -> int: return _chunk_size
def initialize(self) -> None:
if not _active:
raise RuntimeError("Android audio engine not configured (only valid in-app).")
self._initialized = True
def cleanup(self) -> None:
self._initialized = False
def read_chunk(self) -> np.ndarray | None:
try:
return _pcm_queue.get(timeout=0.1) # 1-D float32 interleaved
except queue.Empty:
return None
class AndroidAudioEngine(AudioCaptureEngine):
ENGINE_TYPE = "android_playback"
ENGINE_PRIORITY = 100 # highest on Android (demo is lower)
@classmethod
def is_available(cls) -> bool:
from ledgrab.utils.platform import is_android
return is_android() and _active
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {"sample_rate": _sample_rate, "channels": _channels, "chunk_size": _chunk_size}
@classmethod
def enumerate_devices(cls) -> List[AudioDeviceInfo]:
if not cls.is_available(): return []
return [AudioDeviceInfo(index=0, name="Android playback (system audio)",
is_input=True, is_loopback=True,
channels=_channels, default_samplerate=float(_sample_rate))]
@classmethod
def create_stream(cls, device_index, is_loopback, config) -> AndroidAudioCaptureStream:
return AndroidAudioCaptureStream(device_index, is_loopback, {**cls.get_default_config(), **config})
```
**Modify `server/src/ledgrab/core/audio/__init__.py`** — register behind a guarded import,
matching the existing `_has_wasapi` / `_has_sounddevice` pattern:
```python
try:
from ledgrab.core.audio.android_audio_engine import AndroidAudioEngine
_has_android_audio = True
except ImportError:
_has_android_audio = False
...
if _has_android_audio:
AudioEngineRegistry.register(AndroidAudioEngine)
```
**Reused, unchanged:** `AudioEngineRegistry.get_best_available_engine()` (picks by priority),
`ManagedAudioStream._capture_loop()` (`audio_capture.py`), `AudioAnalyzer`, the audio value
sources, and the device-enumeration endpoints. The Android engine appears as one loopback
device named "Android playback (system audio)".
---
## Part B — Android (Kotlin + manifest)
**New file: `android/app/src/main/java/com/ledgrab/android/AudioCapture.kt`**
Mirrors `ScreenCapture.kt`, taking the same `MediaProjection`:
```kotlin
class AudioCapture(
private val projection: MediaProjection,
private val bridge: PythonBridge,
private val sampleRate: Int = 48000,
private val channels: Int = 2,
private val chunkFrames: Int = 1024,
)
```
- `start()` (API 29+, MediaProjection mode):
- Build `AudioPlaybackCaptureConfiguration(projection)` adding usages
`USAGE_MEDIA`, `USAGE_GAME`, `USAGE_UNKNOWN` (the capturable set).
- `AudioRecord.Builder().setAudioPlaybackCaptureConfig(cfg)` with
`AudioFormat(ENCODING_PCM_FLOAT, sampleRate, CHANNEL_IN_STEREO)`.
- On a dedicated `HandlerThread`, loop `audioRecord.read(floatBuf, …, READ_BLOCKING)`
wrap into a little-endian float32 `ByteArray` (reusable buffer, like `ScreenCapture`'s
`frameBuffer`) → `bridge.pushAudio(bytes, framesRead, channels)`.
- `stop()`: stop/release `AudioRecord`, quit the thread.
- **Mic fallback** (`startMic()`): `AudioSource.MIC` for root mode (no MediaProjection) or
API < 29. Used only when playback capture is unavailable.
**Modify `android/app/src/main/java/com/ledgrab/android/PythonBridge.kt`** — add the audio
push path (same shape as `pushFrame`, with a cached PyObject handle):
```kotlin
@Volatile private var androidAudioEngine: PyObject? = null
fun configureAudio(sampleRate: Int, channels: Int, chunkFrames: Int) {
val engine = Python.getInstance().getModule("ledgrab.core.audio.android_audio_engine")
engine.callAttr("configure", sampleRate, channels, chunkFrames)
androidAudioEngine = engine
}
fun pushAudio(pcmFloat32: ByteArray, frames: Int, channels: Int) {
if (!running) return
androidAudioEngine?.let {
try { it.callAttr("push_samples", pcmFloat32) }
catch (e: Exception) { Log.w(TAG, "pushAudio failed: ${e.message}") }
}
}
```
**Modify `android/app/src/main/java/com/ledgrab/android/CaptureService.kt`** — in the
MediaProjection start path (where `ScreenCapture` is created with the projection), if
`RECORD_AUDIO` is granted and API ≥ 29, also `bridge.configureAudio(...)` and start an
`AudioCapture(projection, bridge)`. Stop/release it in `onDestroy` alongside `ScreenCapture`.
Root path → optional mic fallback (or skip; see Risks).
**Modify `android/app/src/main/AndroidManifest.xml`:**
```xml
<uses-permission android:name="android.permission.RECORD_AUDIO" />
<!-- For mic-mode foreground capture on API 34+ (playback capture is covered by the
existing mediaProjection FGS type): -->
<uses-permission android:name="android.permission.FOREGROUND_SERVICE_MICROPHONE" />
```
The existing `CaptureService` already declares `foregroundServiceType="mediaProjection|specialUse"`
and holds `FOREGROUND_SERVICE_MEDIA_PROJECTION`; add `microphone` to the type only if mic
fallback is implemented.
**Modify `MainActivity.kt`** — request `RECORD_AUDIO` at runtime alongside the existing
`ensureNotificationPermission()` (POST_NOTIFICATIONS) flow, before starting capture. Capture
proceeds without audio if denied (graceful degradation).
---
## Orchestration decision (the main trade-off)
Desktop starts audio capture **on demand** when an audio-reactive source is acquired
(`AudioCaptureManager.acquire`). On Android, PCM only flows if Kotlin has set up `AudioRecord`.
- **MVP (recommended):** start `AudioCapture` when `CaptureService` starts (if `RECORD_AUDIO`
granted + MediaProjection mode + API ≥ 29) and push continuously; the bounded queue drops
frames when no audio source consumes them. Simplest; modest extra CPU.
- **Future optimization:** on-demand start/stop signaled Python→Kotlin (Chaquopy can call
Kotlin, as `BleBridge`/`UsbSerialBridge` show) so `AudioRecord` runs only while an
audio-reactive source is active. Defer unless CPU/battery on low-end boxes warrants it.
---
## What does NOT change
- **Frontend / API** — audio engine + device selection, the music analyzer UI, and audio value
sources are engine-agnostic; the Android engine shows up via the existing device enumeration.
- **`build.gradle.kts` / Chaquopy pip block** — no new Python packages.
- **Audio analysis pipeline** — `AudioAnalyzer`, band filters, `ManagedAudioStream` untouched.
---
## Files
**Create**
- `server/src/ledgrab/core/audio/android_audio_engine.py`
- `android/app/src/main/java/com/ledgrab/android/AudioCapture.kt`
- `server/tests/core/audio/test_android_audio_engine.py`
**Modify**
- `server/src/ledgrab/core/audio/__init__.py` — guarded import + registry registration.
- `android/app/src/main/java/com/ledgrab/android/PythonBridge.kt``configureAudio` + `pushAudio`.
- `android/app/src/main/java/com/ledgrab/android/CaptureService.kt` — start/stop `AudioCapture`.
- `android/app/src/main/java/com/ledgrab/android/MainActivity.kt` — request `RECORD_AUDIO`.
- `android/app/src/main/AndroidManifest.xml``RECORD_AUDIO` (+ mic FGS if mic fallback).
---
## Tests (Python — run on desktop CI, no Android device needed)
New `server/tests/core/audio/test_android_audio_engine.py`:
- `configure()` then `push_samples()``read_chunk()` returns the same float32 samples;
queue drops oldest when full (push > maxsize).
- `AndroidAudioEngine.is_available()` is `False` until `configure()` and only on Android
(monkeypatch `ledgrab.utils.platform.is_android`); `True` after.
- `enumerate_devices()` returns exactly one loopback device when active, `[]` otherwise.
- Integration: with `is_android()` patched true + `configure()`, `get_best_available_engine()`
returns `"android_playback"` (priority beats demo), and a stream created via
`AudioEngineRegistry.create_stream("android_playback", 0, True, {})` yields pushed chunks.
- Registry isolation: use `AudioEngineRegistry.clear_registry()` / re-register in fixtures so
desktop engines aren't disturbed.
## Verification
1. **Python:** `py -3.13 -m pytest tests/core/audio/test_android_audio_engine.py --no-cov -q`
(from `server/`), then the full suite.
2. **Lint:** `ruff check src/ tests/ --fix` (from `server/`).
3. **Android build:** `./gradlew :app:assembleDebug` (from `android/`).
4. **On device/emulator (manual):** install APK → grant `RECORD_AUDIO` + screen-capture consent
→ start capture → play non-DRM media (e.g. a local video / YouTube web) → create an
audio-reactive value source bound to a strip → confirm the LEDs react to the audio, and the
Android playback device appears in audio device enumeration.
## Risks / notes
- **DRM opt-out:** Netflix/Disney+/etc. set audio as non-capturable; `AudioPlaybackCapture`
yields silence for them. Works for non-DRM media and the device's own audio. Document in UI.
- **API 29 minimum** for playback capture (minSdk is 24). API 2428 and root mode (no
MediaProjection) → mic fallback only, or audio unsupported. Gate cleanly + log.
- **`RECORD_AUDIO`** is a runtime "dangerous" permission — must be requested; capture must
degrade gracefully when denied.
- **Format:** request `ENCODING_PCM_FLOAT` so Kotlin pushes float32 matching
`read_chunk()`'s contract (1-D interleaved float32, length = frames × channels). If a device
rejects float, capture 16-bit PCM and convert (`/32768.0`) before pushing.
- **Latency/CPU:** small `chunkFrames` (e.g. 1024 @ 48 kHz ≈ 21 ms) keeps reactivity tight;
continuous capture (MVP) adds modest CPU on low-end boxes — see the orchestration trade-off.
- **R8/ProGuard:** minify is disabled and the Python module is resolved by string from Kotlin;
no new keep-rules needed.
@@ -1,153 +0,0 @@
# Android (TV) — Missing Functionality Assessment
> Status: review/feasibility document. No code changes. Last updated 2026-06-01.
## Context
LedGrab ships an **experimental on-device Android-TV build**: a Kotlin shell that
embeds the Python FastAPI server via **Chaquopy**, with Kotlin↔Python **bridges**
(`PythonBridge`, `BleBridge`, `UsbSerialBridge`). Several desktop features are
unavailable on this build because their Python backends rely on native libraries
that have no Android/Chaquopy wheels (`mss`, `dxcam`, `sounddevice`/PortAudio,
`opencv`, `nvidia-ml-py`, `winrt`, `dbus-next`), or on OS facilities Android
sandboxes differently.
The README "Feature support by OS" table now carries an Android column reflecting
this. This document assesses **whether each missing feature can be added**, how, and
whether it's worth it.
### The enabling pattern (why most of this is feasible)
Every desktop capability that's "missing" on Android is missing only because of a
*native dependency*, not because the capability is impossible. Android exposes the
same capability through a platform API, and the codebase already has the bridge
shape to plug it in:
> **Bridge pattern:** a Kotlin component captures an event/buffer → pushes it across
> the Chaquopy JNI boundary into a **module-level receiver** in a small Python engine
> → an existing engine/stream consumes it unchanged.
Reference implementation: `server/src/ledgrab/core/capture_engines/mediaprojection_engine.py`
(`configure()` + `push_frame()` + a bounded `queue.Queue`) ↔
`android/app/src/main/java/com/ledgrab/android/ScreenCapture.kt`
`PythonBridge.pushFrame()`. Screen capture already works on Android this exact way.
So for most missing features the work is: **add a Kotlin capture source + a thin
Python receiver engine mirroring that pattern.**
---
## Current Android capability matrix
| Feature | Desktop | Android (TV) today | Missing? |
| ------- | ------- | ------------------ | -------- |
| Screen capture | DXCam/WGC/MSS | ✅ MediaProjection + root `screenrecord` | No |
| LED transports (network/USB-serial/BLE) | ✅ | ✅ (USB via Android driver, BLE via Android bridge) | No |
| System metrics | psutil | ✅ CPU/RAM/battery/thermal via `/proc`, `/sys` (`AndroidMetricsProvider`) | No |
| **Audio capture** | WASAPI / Sounddevice | ❌ no PortAudio | **Yes** |
| **Notification capture** | WinRT / D-Bus | ❌ listener only Win/Linux | **Yes** |
| Webcam capture | OpenCV | ❌ no OpenCV wheel | Yes (niche) |
| GPU monitoring | NVML | ❌ no NVIDIA GPU | Marginal |
| Capture from *another* Android phone | scrcpy/ADB | ❌ | Skip (redundant) |
| Automation: window/process conditions | Windows ctypes | ❌ sandboxed | Partial |
| Monitor names / multi-display | WMI / generic | Single built-in display | Low value |
---
## Per-feature feasibility
### 🔊 Audio capture — **FEASIBLE, HIGH VALUE** ⭐ (detailed plan exists)
- **Blocker:** only `sounddevice`/PortAudio is missing — not the capability.
- **Android path:** `AudioPlaybackCapture` (API 29+) captures system playback audio and
**takes a `MediaProjection` token — which the app already obtains for screen capture.**
Kotlin `AudioRecord` → push PCM (float32) → a new push-based `AndroidAudioEngine`
mirroring `mediaprojection_engine.py`, registered in `core/audio/__init__.py`, feeding
the existing `AudioAnalyzer` unchanged. Mic (`AudioSource.MIC`) is the fallback.
- **Effort:** moderate. **Value:** high — music/sound-reactive lighting is a flagship use
on a TV box. **No new Python deps.**
- ⚠️ DRM-protected apps (Netflix etc.) opt out of playback capture; works for non-DRM
media and the device's own audio. Root mode (no MediaProjection) → mic-only.
- 📄 **See `android-audio-capture-plan.md`** for the full implementation plan.
### 🔔 Notification capture — **FEASIBLE, HIGH VALUE** ⭐ (planned)
- **Android is the *best* platform for this:** `NotificationListenerService` is the native,
event-push mechanism (no polling).
- **Path:** a `NotificationListenerService` resolves the posting app's display label and
pushes it via a module-level `push_notification()` into the existing
`os_notification_listener.py` pipeline (a new push-based `_AndroidBackend` alongside
`_WindowsBackend`/`_LinuxBackend`). Existing `NotificationColorStripSource` filters,
per-app colors/sounds, and the history endpoint all work unchanged. **No new Python deps.**
- **Permission:** user enables "Notification access" in Settings (`ACTION_NOTIFICATION_LISTENER_SETTINGS`);
no runtime-permission popup.
- **Effort:** moderate. **Value:** high.
- 📄 **Plan approved & detailed** — see `C:\Users\Alexei\.claude\plans\deep-enchanting-muffin.md`
(app-name parity; prompt-once permission UX).
### 📷 Webcam capture — **FEASIBLE, LOW VALUE**
- **Blocker** is `opencv-python-headless` (no Chaquopy cp311 wheel) — but capture doesn't
*need* OpenCV. Use **CameraX / Camera2** + `ImageReader` in Kotlin and push frames through
the same bridge as MediaProjection into a new `CameraBridgeEngine`.
- **Effort:** moderate. **Value:** low — TVs rarely have cameras; USB-UVC webcams need extra
device handling. Recommend deferring unless a concrete use case appears.
### 🎮 GPU monitoring — **MARGINAL, SKIP FOR NOW**
- NVML is desktop-NVIDIA only. Android GPU load lives in **vendor-specific sysfs**
(Adreno `/sys/class/kgsl/kgsl-3d0/gpubusy`, Mali `/sys/class/devfreq/*.mali/...`),
inconsistent and often root-only.
- CPU/RAM/battery/thermal are **already** covered by `AndroidMetricsProvider`. A best-effort
GPU-load reader could be added to that provider, but reliability is poor and value is low.
### 🪟 Automation: window/process conditions — **PARTIAL**
- Android forbids full window/process enumeration (`getRunningTasks` restricted since API 21+).
- **Obtainable:** the *current foreground app package* via `UsageStatsManager` (needs the
`PACKAGE_USAGE_STATS` special access) or an `AccessibilityService`.
- So "when <app> is in the foreground → scene X" is feasible (mirrors
`automations/platform_detector.py`, which currently returns empty off-Windows); full
window-title matching is **not**. **Effort:** moderate. **Value:** moderate (per-app scenes
on a TV box).
### 📱 Capture from *another* Android phone (scrcpy/ADB) — **SKIP**
- Impractical and redundant: no `adb` binary in Chaquopy, TV boxes can't reliably host an
adb server, and the device already captures its **own** screen via MediaProjection.
### 🖥️ Monitor names / multi-display — **LOW VALUE**
- `DisplayManager` can report a better display name and enumerate secondary (HDMI) displays,
but MediaProjection captures the default display; capturing a secondary display is more
involved and rarely useful on a single-screen box.
---
## Prioritization
| Priority | Feature | Effort | Value | New Python deps | Status |
| -------- | ------- | ------ | ----- | --------------- | ------ |
| 1 | Notification capture | Moderate | High | None | **Plan approved** |
| 2 | Audio capture | Moderate | High | None | **Plan written** (this folder) |
| 3 | Automation: foreground-app condition | Moderate | Moderate | None | Idea |
| 4 | Webcam capture (CameraX) | Moderate | Low | None | Idea |
| — | GPU load (vendor sysfs) | LowMed | Low | None | Not recommended |
| — | Capture from another phone | — | — | — | Won't do |
| — | Multi-display / monitor names | Low | Low | None | Not recommended |
**Recommended order:** ship notifications → ship audio → reassess. Both reuse existing
infrastructure (bridge pattern, the MediaProjection consent token, the audio/notification
pipelines) and add **zero** Python dependencies, so neither risks the Chaquopy
`--no-deps` build constraint documented in `CLAUDE.md`.
## Cross-cutting notes
- **No `build.gradle.kts` / Chaquopy pip impact** for notifications or audio — both use Android
platform APIs (Kotlin) + stdlib/`numpy` (already bundled) on the Python side.
- **Per-instance `PythonBridge`:** `PythonBridge` is created per `CaptureService` instance, so
system-bound services (e.g. a `NotificationListenerService`) call Python via the
process-global `Python.getInstance()` rather than borrowing that bridge.
- **Permissions are the recurring friction**, not the capture: audio needs `RECORD_AUDIO` +
(for playback capture) a MediaProjection token; notifications need the "Notification access"
settings toggle; foreground-app automation needs `PACKAGE_USAGE_STATS`.
+11 -10
View File
@@ -105,16 +105,17 @@ LedGrab runs as a desktop / server application:
### Feature support by OS
| Feature | Windows | Linux / macOS |
| ------- | ------- | ------------- |
| Screen capture | DXCam, BetterCam, WGC, MSS | MSS |
| Webcam capture | OpenCV (DirectShow) | OpenCV (V4L2) |
| Audio capture | WASAPI, Sounddevice | Sounddevice (PulseAudio/PipeWire) |
| GPU monitoring | NVIDIA (nvidia-ml-py) | NVIDIA (nvidia-ml-py) |
| Capture from Android phone | scrcpy (ADB) | scrcpy (ADB) |
| Notification capture | WinRT | dbus (Linux) |
| Monitor names | Friendly names (WMI) | Generic ("Display 0") |
| Automation: window/process conditions | Supported | Partial |
| Feature | Windows | Linux / macOS | Android TV (experimental) |
| ------- | ------- | ------------- | ------------------------- |
| Screen capture | DXCam, BetterCam, WGC, MSS | MSS | MediaProjection; root `screenrecord` (rooted devices) |
| Webcam capture | OpenCV (DirectShow) | OpenCV (V4L2) | Camera2 (on-demand, while capture is running) |
| Audio capture | WASAPI, Sounddevice | Sounddevice (PulseAudio/PipeWire) | AudioPlaybackCapture (API 29+) |
| GPU monitoring | NVIDIA (nvidia-ml-py) | NVIDIA (nvidia-ml-py) | — (CPU/RAM/battery/thermal via `/proc`) |
| Capture from Android phone | scrcpy (ADB) | scrcpy (ADB) | — (captures its own screen instead) |
| Notification capture | WinRT | dbus (Linux) | NotificationListenerService |
| Monitor names | Friendly names (WMI) | Generic ("Display 0") | Single built-in display |
| LED transports | Network, USB-serial, BLE | Network, USB-serial, BLE | Network, USB-serial (Android driver), BLE (Android bridge) |
| Automation: window/process conditions | Supported | Partial | Foreground-app condition (UsageStatsManager) |
## Requirements
+57 -1
View File
@@ -35,6 +35,13 @@
<uses-permission android:name="android.permission.FOREGROUND_SERVICE" />
<uses-permission android:name="android.permission.FOREGROUND_SERVICE_MEDIA_PROJECTION" />
<uses-permission android:name="android.permission.FOREGROUND_SERVICE_SPECIAL_USE" />
<!-- FOREGROUND_SERVICE_CAMERA (API 34+): required to keep camera access while
the app is backgrounded during on-device webcam capture. The service is
promoted with the `camera` FGS type ONLY when CAMERA is already granted
(see CaptureService.onStartCommand) — unlike audio playback capture (which
rides the MediaProjection token under the mediaProjection type), the camera
has no such coupling and needs its own FGS type to survive backgrounding. -->
<uses-permission android:name="android.permission.FOREGROUND_SERVICE_CAMERA" />
<!-- POST_NOTIFICATIONS for Android 13+ foreground service notification -->
<uses-permission android:name="android.permission.POST_NOTIFICATIONS" />
@@ -47,6 +54,29 @@
only be required if the mic-fallback path ran inside the service). -->
<uses-permission android:name="android.permission.RECORD_AUDIO" />
<!-- CAMERA for on-device webcam capture (Camera2). Runtime "dangerous"
permission, requested in MainActivity gated on FEATURE_CAMERA_ANY so
camera-less TV boxes never see the prompt; capture degrades gracefully
when denied. The camera is opened ON DEMAND (only while a camera
capture source is active). To keep capturing after the app is
backgrounded, the service is promoted with the `camera` FGS type
(FOREGROUND_SERVICE_CAMERA above) — but only when CAMERA is already
granted, so a camera-less / not-yet-granted box never risks a failed
service start. -->
<uses-permission android:name="android.permission.CAMERA" />
<!-- PACKAGE_USAGE_STATS — read the foreground app for the "Application"
automation rule (foreground app -> activate scene) via UsageStatsManager.
A special-access permission: it can't be granted at runtime; the user
toggles it under Settings > Usage access (opened from MainActivity).
tools:ignore="ProtectedPermissions" silences the build warning that this
is a system/signature-level permission — it is honoured as a user-grantable
special access. NO QUERY_ALL_PACKAGES is needed: matching only compares the
foreground package NAME, and the app picker uses LauncherApps. -->
<uses-permission
android:name="android.permission.PACKAGE_USAGE_STATS"
tools:ignore="ProtectedPermissions" />
<!-- Autostart on boot — BootReceiver spawns CaptureService in root
mode so capture resumes without the user touching the remote. -->
<uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />
@@ -71,6 +101,15 @@
android:name="android.hardware.usb.host"
android:required="false" />
<!-- Camera hardware — for on-device webcam capture. required=false so
camera-less TV boxes (the common case) still install; the camera
engine simply reports no displays on such devices. camera.any covers
built-in (front/back) and external/USB-UVC cameras the platform
routes through Camera2. -->
<uses-feature
android:name="android.hardware.camera.any"
android:required="false" />
<application
android:name=".LedGrabApp"
android:allowBackup="false"
@@ -103,13 +142,30 @@
PROPERTY_SPECIAL_USE_FGS_SUBTYPE rationale below. -->
<service
android:name=".CaptureService"
android:foregroundServiceType="mediaProjection|specialUse"
android:foregroundServiceType="mediaProjection|specialUse|camera"
android:exported="false">
<property
android:name="android.app.PROPERTY_SPECIAL_USE_FGS_SUBTYPE"
android:value="Root-mode screen capture for ambient LED sync. Uses /system/bin/screenrecord on rooted devices to avoid MediaProjection's persistent capture indicator overlay, which is required for the always-on ambient-lighting use case." />
</service>
<!-- Notification capture — a NotificationListenerService bound by
system_server. exported="true" is REQUIRED here (the system binds
it cross-process) and intentionally diverges from CaptureService
(exported="false"); access is gated by the system-held
BIND_NOTIFICATION_LISTENER_SERVICE permission, so no new
<uses-permission> is needed. The user grants access via
Settings > Notification access (opened from MainActivity). -->
<service
android:name=".LedGrabNotificationListener"
android:label="@string/notification_listener_label"
android:exported="true"
android:permission="android.permission.BIND_NOTIFICATION_LISTENER_SERVICE">
<intent-filter>
<action android:name="android.service.notification.NotificationListenerService" />
</intent-filter>
</service>
<!-- Autostart — fires on device boot (and package replace).
On rooted devices, launches CaptureService directly so capture
resumes without the user tapping Start. Unrooted devices are
@@ -0,0 +1,411 @@
package com.ledgrab.android
import android.Manifest
import android.annotation.SuppressLint
import android.content.Context
import android.content.pm.PackageManager
import android.graphics.ImageFormat
import android.hardware.camera2.CameraCaptureSession
import android.hardware.camera2.CameraCharacteristics
import android.hardware.camera2.CameraDevice
import android.hardware.camera2.CameraManager
import android.media.Image
import android.media.ImageReader
import android.os.Handler
import android.os.HandlerThread
import android.os.SystemClock
import android.util.Log
import android.util.Size
import android.view.Surface
import com.chaquo.python.PyObject
import com.chaquo.python.Python
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeout
import org.json.JSONArray
import org.json.JSONObject
/**
* Android camera bridge exposed to the Python server via Chaquopy.
*
* Wraps the Camera2 API into synchronous, blocking calls that can be
* invoked from a Python thread (Chaquopy proxy threads are real OS
* threads). The physical camera is opened **on demand** — Python's
* `android_camera_engine` calls [startCamera] when a capture stream
* initializes and [stopCamera] when it cleans up, so the camera-in-use
* indicator and battery cost are limited to actual use.
*
* Each captured frame is converted YUV_420_888 → RGB and pushed to the
* Python engine's `push_frame`, mirroring how [ScreenCapture] feeds
* `mediaprojection_engine`. Camera2 callbacks run on a private
* [HandlerThread] so they never touch the main looper.
*
* Python callers access the singleton via
* `jclass("com.ledgrab.android.CameraBridge").INSTANCE` — see
* `server/src/ledgrab/core/capture_engines/android_camera_engine.py`.
*/
object CameraBridge {
private const val TAG = "CameraBridge"
private const val ENGINE_MODULE = "ledgrab.core.capture_engines.android_camera_engine"
private const val OPEN_TIMEOUT_MS = 8_000L
private const val MAX_IMAGES = 2
private const val TARGET_FPS = 20
// "auto" capture size — balanced for ambient LED sampling (the LED
// pipeline downscales anyway), kept modest so the per-frame YUV→RGB
// conversion stays cheap on low-end TV boxes.
private const val DEFAULT_W = 1280
private const val DEFAULT_H = 720
private const val BYTES_PER_RGB = 3
@Volatile private var appContext: Context? = null
// Dedicated looper thread so Camera2 callbacks don't land on main.
private val camThread = HandlerThread("LedGrab-Camera").also { it.start() }
private val camHandler = Handler(camThread.looper)
// Active session state — guarded by [lock]. One camera at a time.
private val lock = Any()
private var cameraDevice: CameraDevice? = null
private var captureSession: CameraCaptureSession? = null
private var imageReader: ImageReader? = null
@Volatile private var running = false
private var activeIndex = -1
// Cached Python engine module handle for the per-frame push fast path.
@Volatile private var engineModule: PyObject? = null
// Reusable conversion buffers — sized once per session (output size is
// fixed for the session), reused to avoid per-frame GC churn on TV boxes.
private var rgbBuffer: ByteArray? = null
private var yBuf: ByteArray? = null
private var uBuf: ByteArray? = null
private var vBuf: ByteArray? = null
// Monotonic frame pacing (mirrors ScreenCapture's accumulator).
private val frameIntervalNanos = 1_000_000_000L / TARGET_FPS.coerceAtLeast(1)
private var nextFrameNanos = 0L
/** Called once from [LedGrabApp.onCreate] to bind the application context. */
@JvmStatic
fun init(context: Context) {
appContext = context.applicationContext
}
/**
* Enumerate cameras as a JSON array string the Python engine parses:
* `[{"index":0,"name":"Back camera","facing":"back","cameraId":"0"}, ...]`
*
* Indices are stable (positional in [CameraManager.cameraIdList]) so
* Python's `display_index` maps 1:1 to [startCamera]'s `index`.
* Enumeration needs no CAMERA permission. Returns `[]` on any error.
*/
@JvmStatic
fun listCameras(): String {
val arr = JSONArray()
val ctx = appContext
if (ctx == null) {
Log.w(TAG, "listCameras: context not bound (init not called)")
return arr.toString()
}
try {
val mgr = ctx.getSystemService(Context.CAMERA_SERVICE) as CameraManager
mgr.cameraIdList.forEachIndexed { idx, id ->
val facing = facingOf(mgr, id)
val name = when (facing) {
"front" -> "Front camera"
"back" -> "Back camera"
"external" -> "External camera $idx"
else -> "Camera $idx"
}
arr.put(
JSONObject()
.put("index", idx)
.put("name", name)
.put("facing", facing)
.put("cameraId", id),
)
}
} catch (e: Exception) {
Log.w(TAG, "listCameras failed: ${e.message}")
}
return arr.toString()
}
/**
* Open camera [index] and start streaming RGB frames to Python.
* Blocks until the capture session is configured (or fails/times out).
*
* Returns false — without throwing across the JNI boundary — when the
* CAMERA permission is missing, the index is out of range, or the
* device/session fails to configure. Closes any previously-open camera
* first (one active at a time).
*/
@SuppressLint("MissingPermission")
@JvmStatic
fun startCamera(index: Int, width: Int, height: Int): Boolean {
synchronized(lock) {
closeLocked()
val ctx = appContext ?: run {
Log.w(TAG, "startCamera: context not bound")
return false
}
if (ctx.checkSelfPermission(Manifest.permission.CAMERA)
!= PackageManager.PERMISSION_GRANTED
) {
Log.w(TAG, "startCamera: CAMERA permission not granted")
return false
}
val mgr = ctx.getSystemService(Context.CAMERA_SERVICE) as CameraManager
val ids = try {
mgr.cameraIdList
} catch (e: Exception) {
Log.w(TAG, "startCamera: cameraIdList failed: ${e.message}")
return false
}
if (index < 0 || index >= ids.size) {
Log.w(TAG, "startCamera: index $index out of range (${ids.size} cameras)")
return false
}
val cameraId = ids[index]
val size = chooseSize(mgr, cameraId, width, height) ?: run {
Log.w(TAG, "startCamera: no YUV output sizes for camera $index")
return false
}
val reader = ImageReader.newInstance(
size.width, size.height, ImageFormat.YUV_420_888, MAX_IMAGES,
)
// Size the conversion buffers once for this session.
rgbBuffer = ByteArray(size.width * size.height * BYTES_PER_RGB)
yBuf = null; uBuf = null; vBuf = null
nextFrameNanos = SystemClock.elapsedRealtimeNanos()
reader.setOnImageAvailableListener({ r -> onFrame(r) }, camHandler)
return try {
runBlocking {
withTimeout(OPEN_TIMEOUT_MS) {
// Publish each resource to its field as soon as it exists so
// closeLocked() (in the catch) can release it if a LATER step
// throws. Assigning only after setRepeatingRequest succeeds
// would orphan the opened CameraDevice on a createSession /
// setRepeatingRequest failure (camera stuck on; subsequent
// opens fail with CAMERA_IN_USE).
imageReader = reader
val device = openCamera(mgr, cameraId)
cameraDevice = device
val session = createSession(device, reader.surface)
captureSession = session
val request = device.createCaptureRequest(CameraDevice.TEMPLATE_PREVIEW)
.apply { addTarget(reader.surface) }
.build()
session.setRepeatingRequest(request, null, camHandler)
activeIndex = index
running = true
Log.i(TAG, "Camera $index opened (${size.width}x${size.height} @ ${TARGET_FPS}fps)")
true
}
}
} catch (e: Exception) {
Log.e(TAG, "startCamera($index) failed: ${e.message}")
// imageReader/cameraDevice/captureSession are now whatever got
// assigned before the failure — closeLocked releases each exactly
// once (idempotent, runCatching-wrapped).
closeLocked()
false
}
}
}
/** Stop streaming and release the camera. Idempotent; safe if not started. */
@JvmStatic
fun stopCamera() {
synchronized(lock) { closeLocked() }
Log.i(TAG, "Camera stopped")
}
// ── internals ────────────────────────────────────────────────────────
private fun facingOf(mgr: CameraManager, id: String): String =
when (mgr.getCameraCharacteristics(id).get(CameraCharacteristics.LENS_FACING)) {
CameraCharacteristics.LENS_FACING_FRONT -> "front"
CameraCharacteristics.LENS_FACING_BACK -> "back"
CameraCharacteristics.LENS_FACING_EXTERNAL -> "external"
else -> "unknown"
}
/** Pick the supported YUV size closest in area to the request (or the
* balanced default for `auto`/0). */
private fun chooseSize(mgr: CameraManager, cameraId: String, reqW: Int, reqH: Int): Size? {
val map = mgr.getCameraCharacteristics(cameraId)
.get(CameraCharacteristics.SCALER_STREAM_CONFIGURATION_MAP) ?: return null
val sizes = map.getOutputSizes(ImageFormat.YUV_420_888)
if (sizes == null || sizes.isEmpty()) return null
val targetArea = (if (reqW > 0) reqW else DEFAULT_W).toLong() *
(if (reqH > 0) reqH else DEFAULT_H)
return sizes.minByOrNull { kotlin.math.abs(it.width.toLong() * it.height - targetArea) }
}
@SuppressLint("MissingPermission")
private suspend fun openCamera(mgr: CameraManager, cameraId: String): CameraDevice =
suspendCancellableCoroutine { cont ->
mgr.openCamera(cameraId, object : CameraDevice.StateCallback() {
override fun onOpened(device: CameraDevice) {
if (cont.isActive) cont.resume(device) else device.close()
}
override fun onDisconnected(device: CameraDevice) {
device.close()
if (cont.isActive) cont.resumeWithException(IllegalStateException("camera disconnected"))
}
override fun onError(device: CameraDevice, error: Int) {
device.close()
if (cont.isActive) cont.resumeWithException(IllegalStateException("camera error $error"))
}
}, camHandler)
}
@Suppress("DEPRECATION")
private suspend fun createSession(device: CameraDevice, surface: Surface): CameraCaptureSession =
suspendCancellableCoroutine { cont ->
// createCaptureSession(List, callback, handler) is deprecated at
// API 30 but is the correct API down to minSdk 24 (the
// SessionConfiguration overload is API 28+).
device.createCaptureSession(
listOf(surface),
object : CameraCaptureSession.StateCallback() {
override fun onConfigured(session: CameraCaptureSession) {
if (cont.isActive) cont.resume(session)
}
override fun onConfigureFailed(session: CameraCaptureSession) {
if (cont.isActive) cont.resumeWithException(IllegalStateException("session configure failed"))
}
},
camHandler,
)
}
/** ImageReader callback — paced, converts YUV→RGB, pushes to Python. */
private fun onFrame(reader: ImageReader) {
if (!running) {
runCatching { reader.acquireLatestImage()?.close() }
return
}
val now = SystemClock.elapsedRealtimeNanos()
if (now < nextFrameNanos) {
runCatching { reader.acquireLatestImage()?.close() }
return
}
val image = runCatching { reader.acquireLatestImage() }.getOrNull() ?: return
try {
val w = image.width
val h = image.height
val out = ensureRgbBuffer(w * h * BYTES_PER_RGB)
yuv420ToRgb(image, out, w, h)
pushFrame(out, w, h)
nextFrameNanos += frameIntervalNanos
if (now - nextFrameNanos > frameIntervalNanos * 4) {
nextFrameNanos = now + frameIntervalNanos
}
} catch (e: Exception) {
Log.w(TAG, "frame processing error: ${e.message}")
} finally {
runCatching { image.close() }
}
}
private fun ensureRgbBuffer(size: Int): ByteArray {
val buf = rgbBuffer
if (buf != null && buf.size == size) return buf
return ByteArray(size).also { rgbBuffer = it }
}
/**
* Stride-aware YUV_420_888 → packed RGB (3 bytes/px) using BT.601
* fixed-point coefficients. Handles both planar and semi-planar
* (NV21-like, pixelStride 2) chroma layouts via the plane strides.
*/
private fun yuv420ToRgb(image: Image, out: ByteArray, width: Int, height: Int) {
val planes = image.planes
val yPlane = planes[0]
val uPlane = planes[1]
val vPlane = planes[2]
val yRowStride = yPlane.rowStride
val yPixStride = yPlane.pixelStride
val uRowStride = uPlane.rowStride
val uPixStride = uPlane.pixelStride
val vRowStride = vPlane.rowStride
val vPixStride = vPlane.pixelStride
// Copy each plane to a reusable array for fast indexed access
// (ByteBuffer absolute-get per pixel is far slower).
val yByteBuf = yPlane.buffer
val uByteBuf = uPlane.buffer
val vByteBuf = vPlane.buffer
val yArr = ensurePlane(yBuf, yByteBuf.remaining()).also { yBuf = it }
val uArr = ensurePlane(uBuf, uByteBuf.remaining()).also { uBuf = it }
val vArr = ensurePlane(vBuf, vByteBuf.remaining()).also { vBuf = it }
yByteBuf.get(yArr, 0, yArr.size)
uByteBuf.get(uArr, 0, uArr.size)
vByteBuf.get(vArr, 0, vArr.size)
var o = 0
for (row in 0 until height) {
val yRowBase = row * yRowStride
val uvRow = row shr 1
val uRowBase = uvRow * uRowStride
val vRowBase = uvRow * vRowStride
for (col in 0 until width) {
val y = (yArr[yRowBase + col * yPixStride].toInt() and 0xFF)
val uvCol = col shr 1
val u = (uArr[uRowBase + uvCol * uPixStride].toInt() and 0xFF) - 128
val v = (vArr[vRowBase + uvCol * vPixStride].toInt() and 0xFF) - 128
// BT.601 full-range, fixed-point (<<16).
var r = y + ((91881 * v) shr 16)
var g = y - ((22554 * u + 46802 * v) shr 16)
var b = y + ((116130 * u) shr 16)
if (r < 0) r = 0 else if (r > 255) r = 255
if (g < 0) g = 0 else if (g > 255) g = 255
if (b < 0) b = 0 else if (b > 255) b = 255
out[o++] = r.toByte()
out[o++] = g.toByte()
out[o++] = b.toByte()
}
}
}
/** Return [cached] if it already fits [n] bytes, else a fresh array. */
private fun ensurePlane(cached: ByteArray?, n: Int): ByteArray =
if (cached != null && cached.size == n) cached else ByteArray(n)
private fun pushFrame(rgb: ByteArray, width: Int, height: Int) {
val module = engineModule ?: runCatching {
Python.getInstance().getModule(ENGINE_MODULE)
}.getOrNull()?.also { engineModule = it } ?: return
try {
module.callAttr("push_frame", rgb, width, height)
} catch (e: Exception) {
Log.w(TAG, "push_frame failed: ${e.message}")
}
}
/** Tear down the active session. Caller holds [lock]. */
private fun closeLocked() {
running = false
activeIndex = -1
runCatching { imageReader?.setOnImageAvailableListener(null, null) }
runCatching { captureSession?.stopRepeating() }
runCatching { captureSession?.close() }
captureSession = null
runCatching { cameraDevice?.close() }
cameraDevice = null
runCatching { imageReader?.close() }
imageReader = null
}
}
@@ -113,11 +113,25 @@ class CaptureService : Service() {
val url = "http://$localIp:$SERVER_PORT"
try {
val type = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
if (useRoot) {
var t = if (useRoot) {
ServiceInfo.FOREGROUND_SERVICE_TYPE_SPECIAL_USE
} else {
ServiceInfo.FOREGROUND_SERVICE_TYPE_MEDIA_PROJECTION
}
// On-demand webcam capture opens the camera from this service.
// To retain camera access once the app is backgrounded (the
// always-on ambient-lighting case), API 34+ requires the camera
// FGS type. Add it ONLY when CAMERA is already granted — promoting
// with the camera type without the runtime permission throws and
// would kill the whole service on the (common) camera-less or
// not-yet-granted box. If CAMERA is granted later, it takes effect
// on the next Start (matches the audio/permission UX).
if (checkSelfPermission(Manifest.permission.CAMERA) ==
PackageManager.PERMISSION_GRANTED
) {
t = t or ServiceInfo.FOREGROUND_SERVICE_TYPE_CAMERA
}
t
} else {
0
}
@@ -0,0 +1,154 @@
package com.ledgrab.android
import android.app.AppOpsManager
import android.app.usage.UsageEvents
import android.app.usage.UsageStatsManager
import android.content.Context
import android.content.pm.LauncherApps
import android.os.Build
import android.os.Process
import android.util.Log
import org.json.JSONArray
import org.json.JSONObject
/**
* Foreground-app + installed-app bridge exposed to the Python server via Chaquopy.
*
* Backs the Android implementation of the "Application" automation rule
* (foreground app -> activate scene). Desktop detects the foreground process via
* Win32 ctypes in ``platform_detector.py``; Android has no such API, so this
* bridge wraps two in-platform services into synchronous calls a Python thread
* can invoke (Chaquopy proxy threads are real OS threads):
*
* - [getForegroundPackage] via [UsageStatsManager] (needs PACKAGE_USAGE_STATS,
* a special-access permission granted from Settings — see MainActivity).
* - [listLaunchableApps] via [LauncherApps] for the automation editor's app
* picker (no QUERY_ALL_PACKAGES needed — getActivityList is the sanctioned
* launchable-app enumeration API).
* - [hasUsageAccess] so the server / UI can detect the missing grant.
*
* Detection only ever string-compares the foreground *package name*, so no label
* resolution / package visibility is required at match time.
*
* Python callers access the singleton via
* `jclass("com.ledgrab.android.ForegroundAppBridge").INSTANCE` — see
* `server/src/ledgrab/core/automations/platform_detector.py`.
*/
object ForegroundAppBridge {
private const val TAG = "ForegroundAppBridge"
// Trailing window for queryEvents. queryEvents reports discrete foreground
// transitions (not "current app"), and events can lag a few seconds, so we
// look back far enough to reliably catch the latest MOVE_TO_FOREGROUND while
// staying recent enough not to report a stale app on the ~1s automation tick.
private const val WINDOW_MS = 10_000L
@Volatile private var appContext: Context? = null
/** Called once from [LedGrabApp.onCreate] to bind the application context. */
@JvmStatic
fun init(context: Context) {
appContext = context.applicationContext
}
/**
* Package name of the most recently foregrounded app, or null when none is
* found in the trailing window, Usage Access is not granted, or on any error.
* Never throws across the JNI boundary.
*/
@JvmStatic
fun getForegroundPackage(): String? {
val ctx = appContext ?: run {
Log.w(TAG, "getForegroundPackage: context not bound (init not called)")
return null
}
return try {
val usm = ctx.getSystemService(Context.USAGE_STATS_SERVICE) as? UsageStatsManager
?: return null
val end = System.currentTimeMillis()
val events = usm.queryEvents(end - WINDOW_MS, end)
val event = UsageEvents.Event()
var latestPkg: String? = null
var latestTs = Long.MIN_VALUE
while (events.hasNextEvent()) {
events.getNextEvent(event)
// ACTIVITY_RESUMED (API 29+) shares the value of the legacy
// MOVE_TO_FOREGROUND constant, so the single check covers both.
// >= (not >) so that on an exact-timestamp tie the later-iterated
// event wins — events arrive chronologically, so that is the most
// recent foreground transition.
if (event.eventType == UsageEvents.Event.MOVE_TO_FOREGROUND &&
event.timeStamp >= latestTs
) {
latestTs = event.timeStamp
latestPkg = event.packageName
}
}
latestPkg
} catch (e: Exception) {
// SecurityException when access is missing, plus any service error.
Log.w(TAG, "getForegroundPackage failed: ${e.message}")
null
}
}
/** Whether the user has granted Usage Access (PACKAGE_USAGE_STATS) to this app. */
@JvmStatic
fun hasUsageAccess(): Boolean {
val ctx = appContext ?: return false
return try {
val appOps = ctx.getSystemService(Context.APP_OPS_SERVICE) as? AppOpsManager
?: return false
val mode = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
appOps.unsafeCheckOpNoThrow(
AppOpsManager.OPSTR_GET_USAGE_STATS, Process.myUid(), ctx.packageName,
)
} else {
@Suppress("DEPRECATION")
appOps.checkOpNoThrow(
AppOpsManager.OPSTR_GET_USAGE_STATS, Process.myUid(), ctx.packageName,
)
}
mode == AppOpsManager.MODE_ALLOWED
} catch (e: Exception) {
Log.w(TAG, "hasUsageAccess failed: ${e.message}")
false
}
}
/**
* Launchable apps as a JSON array string the Python server parses:
* `[{"package":"com.netflix.mediaclient","label":"Netflix"}, ...]`
*
* Uses [LauncherApps.getActivityList] (launcher + leanback launchables) —
* no QUERY_ALL_PACKAGES. De-duplicated by package, sorted by label.
* Returns `[]` on any error.
*/
@JvmStatic
fun listLaunchableApps(): String {
val arr = JSONArray()
val ctx = appContext ?: run {
Log.w(TAG, "listLaunchableApps: context not bound (init not called)")
return arr.toString()
}
try {
val launcher = ctx.getSystemService(Context.LAUNCHER_APPS_SERVICE) as? LauncherApps
?: return arr.toString()
val seen = HashSet<String>()
val items = ArrayList<Pair<String, String>>()
for (info in launcher.getActivityList(null, Process.myUserHandle())) {
val pkg = info.applicationInfo?.packageName ?: continue
if (!seen.add(pkg)) continue
val label = info.label?.toString().takeUnless { it.isNullOrBlank() } ?: pkg
items.add(pkg to label)
}
items.sortBy { it.second.lowercase() }
for ((pkg, label) in items) {
arr.put(JSONObject().put("package", pkg).put("label", label))
}
} catch (e: Exception) {
Log.w(TAG, "listLaunchableApps failed: ${e.message}")
}
return arr.toString()
}
}
@@ -51,6 +51,13 @@ class LedGrabApp : Application() {
// Bind application context for the BLE bridge so Python can
// scan and connect to BLE LED controllers.
BleBridge.init(this)
// Bind application context for the camera bridge so Python can
// enumerate cameras and open them on demand (webcam capture).
CameraBridge.init(this)
// Bind application context for the foreground-app bridge so Python can
// detect the foreground app (Application automation rule) and list
// launchable apps for the editor's picker.
ForegroundAppBridge.init(this)
// Pre-warm the API key on a background thread. First-launch
// generation does a SharedPreferences.commit() (synchronous
@@ -0,0 +1,97 @@
package com.ledgrab.android
import android.app.Notification
import android.service.notification.NotificationListenerService
import android.service.notification.StatusBarNotification
import android.util.Log
import com.chaquo.python.Python
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
/**
* Captures posted OS notifications and forwards the posting app's display
* label to the Python notification pipeline, where the existing
* `NotificationColorStripSource` fires its one-shot LED effect.
*
* Direction is Kotlin -> Python via the process-global Chaquopy instance
* (NOT a per-[CaptureService] [PythonBridge]): `system_server` binds this
* service independently of [CaptureService], so it resolves Python itself.
* The Python receiver (`os_notification_listener.push_notification`) is a
* no-op whenever the server/listener isn't running, so a notification
* arriving before — or after — a capture session is safely ignored.
*/
class LedGrabNotificationListener : NotificationListenerService() {
// Serial executor: the Python receiver does a (non-concurrency-safe) history
// disk write and may play a sound, so pushes must not overlap. Off the main
// looper to keep the system service responsive.
private val pushExecutor = Executors.newSingleThreadExecutor()
// packageName -> resolved human-readable label. Matches the app_name the
// Windows/Linux backends pass, so per-app colors/filters keep working.
// Naturally bounded by the number of notification-posting apps (tens) and
// cleared with the process — no eviction needed.
private val labelCache = ConcurrentHashMap<String, String>()
override fun onNotificationPosted(sbn: StatusBarNotification?) {
val notification = sbn ?: return
// The Python server (and thus the listener) only exists during a capture
// session. isRunning is a coarse early-out — the authoritative gate is the
// Python receiver's None-check — but it avoids needless JNI churn here.
if (!CaptureService.isRunning) return
// Filter notifications that should never drive an effect:
// - ongoing (media transport, downloads): not user-facing "alerts"
// - group summaries: duplicate their child notifications
// - our own foreground-service notification: would self-trigger
if (notification.isOngoing) return
if ((notification.notification.flags and Notification.FLAG_GROUP_SUMMARY) != 0) return
if (notification.packageName == packageName) return
val label = resolveAppLabel(notification.packageName)
pushExecutor.execute {
try {
Python.getInstance()
.getModule(PY_MODULE)
.callAttr("push_notification", label)
} catch (t: Throwable) {
// Never crash a system-bound service. Python.getInstance() throws
// IllegalStateException if Python.start() hasn't run (e.g. the
// service was bound at boot before the app process initialized).
// Log at debug — the label is potentially sensitive on a shared TV.
Log.d(TAG, "push_notification failed: ${t.message}")
}
}
}
/** Resolve (and cache) a package's human-readable label; fall back to the package name. */
private fun resolveAppLabel(pkg: String): String {
labelCache[pkg]?.let { return it }
val resolved = runCatching {
val info = packageManager.getApplicationInfo(pkg, 0)
packageManager.getApplicationLabel(info).toString()
}.getOrDefault(pkg)
labelCache[pkg] = resolved
return resolved
}
override fun onListenerConnected() {
Log.i(TAG, "Notification listener connected")
}
override fun onListenerDisconnected() {
Log.i(TAG, "Notification listener disconnected")
}
override fun onDestroy() {
pushExecutor.shutdown()
super.onDestroy()
}
companion object {
private const val TAG = "LedGrabNotifListener"
private const val PY_MODULE = "ledgrab.core.processing.os_notification_listener"
}
}
@@ -24,6 +24,7 @@ import android.widget.ImageView
import android.widget.LinearLayout
import android.widget.ScrollView
import android.widget.TextView
import androidx.core.app.NotificationManagerCompat
import androidx.core.content.ContextCompat
import androidx.core.splashscreen.SplashScreen.Companion.installSplashScreen
import com.google.zxing.BarcodeFormat
@@ -54,7 +55,10 @@ class MainActivity : Activity() {
private const val REQUEST_MEDIA_PROJECTION = 1001
private const val REQUEST_POST_NOTIFICATIONS = 1002
private const val REQUEST_RECORD_AUDIO = 1003
private const val REQUEST_CAMERA = 1004
private const val QR_SIZE_PX = 560
private const val NOTIF_PREFS = "ledgrab_notif"
private const val KEY_NOTIF_ACCESS_PROMPTED = "notif_access_prompted"
}
// Stopped-state views (always inflated).
@@ -64,6 +68,8 @@ class MainActivity : Activity() {
private lateinit var versionText: TextView
private lateinit var autostartCheck: CheckBox
private lateinit var autostartPrefs: AutostartPrefs
private lateinit var grantNotificationButton: Button
private lateinit var grantUsageAccessButton: Button
// Running-state views (lazy-inflated via ViewStub).
private lateinit var runningPanelStub: ViewStub
@@ -107,6 +113,8 @@ class MainActivity : Activity() {
toggleButton = findViewById(R.id.toggle_button)
versionText = findViewById(R.id.version_text)
autostartCheck = findViewById(R.id.autostart_check)
grantNotificationButton = findViewById(R.id.grant_notification_button)
grantUsageAccessButton = findViewById(R.id.grant_usage_access_button)
val versionName = packageManager.getPackageInfo(packageName, 0).versionName
versionText.text = getString(R.string.version_prefix, versionName ?: "?")
@@ -127,8 +135,11 @@ class MainActivity : Activity() {
autostartCheck.visibility = View.GONE
}
grantNotificationButton.setOnClickListener { openNotificationListenerSettings() }
grantUsageAccessButton.setOnClickListener { openUsageAccessSettings() }
toggleButton.setOnClickListener { startCapture() }
updateStoppedPermissionButtons()
updateUI()
}
@@ -149,12 +160,16 @@ class MainActivity : Activity() {
override fun onResume() {
super.onResume()
if (!::stoppedPanel.isInitialized) return
// Restart the pulse if we returned to the foreground while the
// service is still running. The running panel's view may have
// been recreated; ensureRunningPanelInflated already keys off
// the field reference.
if (CaptureService.isRunning && ::stoppedPanel.isInitialized) {
// service is still running. The running panel's view may have been
// recreated; ensureRunningPanelInflated already keys off the field
// reference. When stopped, refresh the notification-access button —
// the user may have just granted/revoked access in Settings.
if (CaptureService.isRunning) {
updateUI()
} else {
updateStoppedPermissionButtons()
}
}
@@ -197,6 +212,8 @@ class MainActivity : Activity() {
private fun startRootCaptureService() {
ensureNotificationPermission()
ensureNotificationListenerAccess()
ensureCameraPermission()
ContextCompat.startForegroundService(this, CaptureService.createRootIntent(this))
updateUI()
}
@@ -216,7 +233,9 @@ class MainActivity : Activity() {
private fun startCaptureService(resultCode: Int, resultData: Intent) {
ensureNotificationPermission()
ensureNotificationListenerAccess()
ensureAudioPermission()
ensureCameraPermission()
val intent = CaptureService.createIntent(this, resultCode, resultData)
ContextCompat.startForegroundService(this, intent)
updateUI()
@@ -493,4 +512,108 @@ class MainActivity : Activity() {
)
}
}
/**
* Request CAMERA so the capture service can open the device camera for
* on-device webcam capture. Fire-and-forget, like [ensureAudioPermission]:
* capture still works without it (just no camera engine), so we don't block
* on the result. Gated on actual camera hardware via FEATURE_CAMERA_ANY so
* camera-less TV boxes (the common case) never see the prompt. The camera
* is opened on demand only while a camera source is active — granting this
* does not keep the camera on. If first granted here, the camera engine
* becomes available on the next Start.
*/
private fun ensureCameraPermission() {
if (!packageManager.hasSystemFeature(PackageManager.FEATURE_CAMERA_ANY)) return
if (checkSelfPermission(Manifest.permission.CAMERA)
!= PackageManager.PERMISSION_GRANTED
) {
@Suppress("DEPRECATION")
requestPermissions(
arrayOf(Manifest.permission.CAMERA),
REQUEST_CAMERA,
)
}
}
/** Whether the user has granted notification-listener access to this app. */
private fun isNotificationAccessGranted(): Boolean =
NotificationManagerCompat.getEnabledListenerPackages(this).contains(packageName)
/** Open the system Notification-access screen (manual affordance / re-grant). */
private fun openNotificationListenerSettings() {
runCatching {
startActivity(Intent(Settings.ACTION_NOTIFICATION_LISTENER_SETTINGS))
}.onFailure { Log.w(TAG, "Notification-access settings unavailable: ${it.message}") }
}
/**
* Whether Usage Access (PACKAGE_USAGE_STATS) is granted — needed by the
* foreground-app automation rule. Delegates to the bridge's AppOps check.
*/
private fun isUsageAccessGranted(): Boolean = ForegroundAppBridge.hasUsageAccess()
/**
* Open the system Usage-Access screen so the user can grant LedGrab access
* for the foreground-app automation rule. Falls back to the generic Settings
* screen on TV-box OEM builds that strip the dedicated intent.
*/
private fun openUsageAccessSettings() {
runCatching {
startActivity(Intent(Settings.ACTION_USAGE_ACCESS_SETTINGS))
}.onFailure {
Log.w(TAG, "Usage-access settings unavailable: ${it.message}")
runCatching { startActivity(Intent(Settings.ACTION_SETTINGS)) }
}
}
/**
* Prompt-once-then-remember: the first time capture starts without
* notification-listener access, open the settings screen so the user can
* grant it — then never nag again (the manual "Grant notification access"
* button stays available). Fire-and-forget like [ensureNotificationPermission].
*/
private fun ensureNotificationListenerAccess() {
if (isNotificationAccessGranted()) return
val prefs = getSharedPreferences(NOTIF_PREFS, MODE_PRIVATE)
if (prefs.getBoolean(KEY_NOTIF_ACCESS_PROMPTED, false)) return
prefs.edit().putBoolean(KEY_NOTIF_ACCESS_PROMPTED, true).apply()
openNotificationListenerSettings()
}
/**
* Show each "Grant <permission> access" button only while that access is
* missing, then re-wire the D-pad focus chain. Called on create and on resume
* (access can change in Settings while we're backgrounded). The usage-access
* button is a passive affordance (no auto-prompt at capture start) — the
* primary guidance is the web-UI banner when an Android app rule needs it.
*/
private fun updateStoppedPermissionButtons() {
if (!::grantNotificationButton.isInitialized) return
grantNotificationButton.visibility =
if (isNotificationAccessGranted()) View.GONE else View.VISIBLE
grantUsageAccessButton.visibility =
if (isUsageAccessGranted()) View.GONE else View.VISIBLE
wireStoppedFocusChain()
}
/**
* Link the visible stopped-panel controls into a single up/down D-pad chain.
* The optional controls (the grant-access buttons and the root-only autostart
* checkbox) may be GONE, so the chain is computed from whatever is visible —
* a static nextFocus pointing at a GONE view would strand the focus on a TV
* remote.
*/
private fun wireStoppedFocusChain() {
val chain = listOfNotNull(
toggleButton,
grantNotificationButton.takeIf { it.visibility == View.VISIBLE },
grantUsageAccessButton.takeIf { it.visibility == View.VISIBLE },
autostartCheck.takeIf { it.visibility == View.VISIBLE },
)
chain.forEachIndexed { i, view ->
view.nextFocusUpId = (chain.getOrNull(i - 1) ?: view).id
view.nextFocusDownId = (chain.getOrNull(i + 1) ?: view).id
}
}
}
@@ -66,6 +66,36 @@
android:focusableInTouchMode="true"
android:nextFocusDown="@+id/autostart_check" />
<!-- Shown only while notification-listener access is missing. The D-pad
focus chain is wired at runtime (wireStoppedFocusChain) because this
button and the autostart checkbox are both conditionally visible. -->
<Button
android:id="@+id/grant_notification_button"
style="@style/Widget.LedGrab.Button.Secondary"
android:layout_width="320dp"
android:layout_height="56dp"
android:layout_marginTop="20dp"
android:text="@string/btn_grant_notification_access"
android:textSize="18sp"
android:focusable="true"
android:focusableInTouchMode="true"
android:visibility="gone" />
<!-- Shown only while Usage Access is missing (needed by the foreground-app
automation rule). Like the grant-notification button, its D-pad focus
chain is wired at runtime (wireStoppedFocusChain). -->
<Button
android:id="@+id/grant_usage_access_button"
style="@style/Widget.LedGrab.Button.Secondary"
android:layout_width="320dp"
android:layout_height="56dp"
android:layout_marginTop="20dp"
android:text="@string/btn_grant_usage_access"
android:textSize="18sp"
android:focusable="true"
android:focusableInTouchMode="true"
android:visibility="gone" />
<CheckBox
android:id="@+id/autostart_check"
android:layout_width="wrap_content"
@@ -25,4 +25,7 @@
<string name="notification_channel_description">Отображается, пока LedGrab захватывает экран.</string>
<string name="notification_title">LedGrab работает</string>
<string name="notification_text">Веб-интерфейс: %1$s</string>
<string name="notification_listener_label">Захват уведомлений LedGrab</string>
<string name="btn_grant_notification_access">Разрешить доступ к уведомлениям</string>
<string name="btn_grant_usage_access">Разрешить доступ к статистике использования</string>
</resources>
@@ -25,4 +25,7 @@
<string name="notification_channel_description">LedGrab 捕获屏幕时显示。</string>
<string name="notification_title">LedGrab 运行中</string>
<string name="notification_text">Web界面:%1$s</string>
<string name="notification_listener_label">LedGrab 通知捕获</string>
<string name="btn_grant_notification_access">授予通知访问权限</string>
<string name="btn_grant_usage_access">授予使用情况访问权限</string>
</resources>
@@ -25,4 +25,7 @@
<string name="notification_channel_description">Shows while LedGrab is capturing the screen.</string>
<string name="notification_title">LedGrab Running</string>
<string name="notification_text">Web UI: %1$s</string>
<string name="notification_listener_label">LedGrab notification capture</string>
<string name="btn_grant_notification_access">Grant notification access</string>
<string name="btn_grant_usage_access">Grant usage access</string>
</resources>
+5 -5
View File
@@ -15,11 +15,11 @@ auth:
# - LAN requests are REJECTED with 401 (security default)
# To enable LAN access, uncomment the example below and replace the value
# with a secret you generated yourself (e.g. `openssl rand -hex 32`).
# The previous default `dev: "development-key-change-in-production"` has
# been removed — it shipped as a publicly-known token and any deployment
# that still uses it grants full LAN access to anyone on the network.
api_keys:
dev: "development-key-change-in-production"
# Do NOT ship a hard-coded key here — a publicly-known token grants full
# LAN access to anyone on the network.
api_keys: {}
# api_keys:
# my-client: "replace-with-output-of-openssl-rand-hex-32"
# Storage paths default to ./data relative to the server's working directory.
# Set LEDGRAB_DATA_DIR in the environment to point at a different data root
+49
View File
@@ -39,8 +39,11 @@ from ledgrab.api.schemas.system import (
DisplayListResponse,
GpuInfo,
HealthResponse,
InstalledAppItem,
InstalledAppsResponse,
PerformanceResponse,
ProcessListResponse,
SystemInfoResponse,
VersionResponse,
)
from ledgrab.config import get_config, is_demo_mode
@@ -278,6 +281,52 @@ async def get_running_processes(_: AuthRequired):
raise HTTPException(status_code=500, detail="Internal server error")
@router.get(
"/api/v1/system/installed-apps",
response_model=InstalledAppsResponse,
tags=["Config"],
)
def get_installed_apps(_: AuthRequired):
"""List launchable apps for the application-rule app picker (Android only).
Returns launchable apps (package + human label) on Android, where the
foreground-app automation rule matches package names. Returns an empty list
on desktop, where the process picker (``/system/processes``) is used instead.
Sync ``def`` so FastAPI runs the (potentially blocking) bridge call in a
thread pool.
"""
from ledgrab.core.automations import platform_detector as pd
try:
apps = pd.list_installed_apps()
items = [InstalledAppItem(package=a["package"], label=a["label"]) for a in apps]
return InstalledAppsResponse(apps=items, count=len(items))
except Exception as e:
logger.error("Failed to list installed apps: %s", e, exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/api/v1/system/info", response_model=SystemInfoResponse, tags=["Info"])
def get_system_info(_: AuthRequired):
"""Platform capability signal for the automation editor.
Tells the frontend whether the server is on Android (so the application-rule
editor uses the launchable-app picker + package matching and surfaces the
Usage-Access banner) vs desktop (process picker + process names), and whether
Usage Access is currently granted. Sync ``def`` so the bridge call runs in a
thread pool.
"""
from ledgrab.core.automations import platform_detector as pd
from ledgrab.utils.platform import is_android
android = is_android()
return SystemInfoResponse(
is_android=android,
app_match_kind="package" if android else "process",
usage_access_granted=(pd.has_usage_access() if android else True),
)
@router.get(
"/api/v1/system/performance",
response_model=PerformanceResponse,
@@ -1,4 +1,4 @@
"""System routes: MQTT, external URL, ADB, logs WebSocket, log level.
"""System routes: external URL, shutdown action, ADB, logs WebSocket, log level.
Extracted from system.py to keep files under 800 lines.
"""
@@ -17,13 +17,10 @@ from ledgrab.api.schemas.system import (
ExternalUrlResponse,
LogLevelRequest,
LogLevelResponse,
MQTTSettingsRequest,
MQTTSettingsResponse,
ShutdownAction,
ShutdownActionRequest,
ShutdownActionResponse,
)
from ledgrab.config import get_config
from ledgrab.storage.database import Database
from ledgrab.utils import get_logger
@@ -32,85 +29,6 @@ logger = get_logger(__name__)
router = APIRouter()
# ---------------------------------------------------------------------------
# MQTT settings
# ---------------------------------------------------------------------------
def _load_mqtt_settings(db: Database) -> dict:
"""Load MQTT settings: YAML config defaults overridden by DB settings."""
cfg = get_config()
defaults = {
"enabled": cfg.mqtt.enabled,
"broker_host": cfg.mqtt.broker_host,
"broker_port": cfg.mqtt.broker_port,
"username": cfg.mqtt.username,
"password": cfg.mqtt.password,
"client_id": cfg.mqtt.client_id,
"base_topic": cfg.mqtt.base_topic,
}
overrides = db.get_setting("mqtt")
if overrides:
defaults.update(overrides)
return defaults
@router.get(
"/api/v1/system/mqtt/settings",
response_model=MQTTSettingsResponse,
tags=["System"],
)
async def get_mqtt_settings(_: AuthRequired, db: Database = Depends(get_database)):
"""Get current MQTT broker settings. Password is masked."""
s = _load_mqtt_settings(db)
return MQTTSettingsResponse(
enabled=s["enabled"],
broker_host=s["broker_host"],
broker_port=s["broker_port"],
username=s["username"],
password_set=bool(s.get("password")),
client_id=s["client_id"],
base_topic=s["base_topic"],
)
@router.put(
"/api/v1/system/mqtt/settings",
response_model=MQTTSettingsResponse,
tags=["System"],
)
async def update_mqtt_settings(
_: AuthRequired, body: MQTTSettingsRequest, db: Database = Depends(get_database)
):
"""Update MQTT broker settings. If password is empty string, the existing password is preserved."""
current = _load_mqtt_settings(db)
# If caller sends an empty password, keep the existing one
password = body.password if body.password else current.get("password", "")
new_settings = {
"enabled": body.enabled,
"broker_host": body.broker_host,
"broker_port": body.broker_port,
"username": body.username,
"password": password,
"client_id": body.client_id,
"base_topic": body.base_topic,
}
db.set_setting("mqtt", new_settings)
logger.info("MQTT settings updated")
return MQTTSettingsResponse(
enabled=new_settings["enabled"],
broker_host=new_settings["broker_host"],
broker_port=new_settings["broker_port"],
username=new_settings["username"],
password_set=bool(new_settings["password"]),
client_id=new_settings["client_id"],
base_topic=new_settings["base_topic"],
)
# ---------------------------------------------------------------------------
# External URL setting
# ---------------------------------------------------------------------------
+14 -2
View File
@@ -11,9 +11,21 @@ class RuleSchema(BaseModel):
rule_type: str = Field(description="Rule type discriminator (e.g. 'application')")
# Application rule fields
apps: List[str] | None = Field(None, description="Process names (for application rule)")
apps: List[str] | None = Field(
None,
description=(
"App identifiers for the application rule. Platform-specific and not "
"portable: process names on Windows (e.g. 'chrome.exe'), package names "
"on Android (e.g. 'com.android.chrome'). Matched case-insensitively."
),
)
match_type: str | None = Field(
None, description="'running' or 'topmost' (for application rule)"
None,
description=(
"'running', 'topmost', 'fullscreen', or 'topmost_fullscreen' (application "
"rule). On Android only the foreground app is detectable, so all values "
"behave as 'foreground'."
),
)
# Time-of-day rule fields
start_time: str | None = Field(None, description="Start time HH:MM (for time_of_day rule)")
+36 -29
View File
@@ -68,6 +68,42 @@ class ProcessListResponse(BaseModel):
count: int = Field(description="Number of unique processes")
class InstalledAppItem(BaseModel):
"""A launchable Android app, for the automation app picker."""
package: str = Field(description="Android package name, e.g. 'com.netflix.mediaclient'")
label: str = Field(description="Human-readable app label, e.g. 'Netflix'")
class InstalledAppsResponse(BaseModel):
"""Launchable apps for the application-rule picker (Android only; empty elsewhere)."""
apps: List[InstalledAppItem] = Field(description="Launchable apps, sorted by label")
count: int = Field(description="Number of apps")
class SystemInfoResponse(BaseModel):
"""Platform capability signal for the frontend (automation editor).
Lets the application-rule editor choose the right app source and matching
semantics per platform, and surface the Usage-Access permission state.
"""
is_android: bool = Field(description="True when the server runs on Android (Chaquopy)")
app_match_kind: Literal["process", "package"] = Field(
description=(
"What ApplicationRule.apps values represent: 'process' names on desktop, "
"'package' names on Android."
)
)
usage_access_granted: bool = Field(
description=(
"Android: whether PACKAGE_USAGE_STATS (Usage Access) is granted, gating "
"foreground-app detection. Always True (not applicable) off-Android."
)
)
class GpuInfo(BaseModel):
"""GPU performance information."""
@@ -158,35 +194,6 @@ class BackupListResponse(BaseModel):
count: int
# ─── MQTT schemas ──────────────────────────────────────────────
class MQTTSettingsResponse(BaseModel):
"""MQTT broker settings response (password is masked)."""
enabled: bool = Field(description="Whether MQTT is enabled")
broker_host: str = Field(description="MQTT broker hostname or IP")
broker_port: int = Field(ge=1, le=65535, description="MQTT broker port")
username: str = Field(description="MQTT username (empty = anonymous)")
password_set: bool = Field(description="Whether a password is configured")
client_id: str = Field(description="MQTT client ID")
base_topic: str = Field(description="Base topic prefix")
class MQTTSettingsRequest(BaseModel):
"""MQTT broker settings update request."""
enabled: bool = Field(description="Whether MQTT is enabled")
broker_host: str = Field(description="MQTT broker hostname or IP")
broker_port: int = Field(ge=1, le=65535, description="MQTT broker port")
username: str = Field(default="", description="MQTT username (empty = anonymous)")
password: str = Field(
default="", description="MQTT password (empty = keep existing if omitted)"
)
client_id: str = Field(default="ledgrab", description="MQTT client ID")
base_topic: str = Field(default="ledgrab", description="Base topic prefix")
# ─── External URL schema ───────────────────────────────────────
@@ -6,12 +6,14 @@ Non-Windows: graceful degradation (returns empty results).
import asyncio
import ctypes
import json
import os
import sys
import threading
from typing import Set
from ledgrab.utils import get_logger
from ledgrab.utils.platform import is_android
logger = get_logger(__name__)
@@ -21,6 +23,105 @@ if _IS_WINDOWS:
import ctypes.wintypes
# ---------------------------------------------------------------------------
# Android ForegroundAppBridge interop — lazy + guarded (never at import time)
# ---------------------------------------------------------------------------
# Android reports ``sys.platform == "linux"`` so ``_IS_WINDOWS`` is False there;
# the foreground app is read via the Kotlin ``ForegroundAppBridge`` (UsageStats)
# instead of Win32 ctypes. These module-level wrappers are the monkeypatch
# surface used by tests (mirrors ``android_camera_engine``) — patch the module
# function, not the live ``jclass`` object.
# Emit the "Usage Access not granted" warning only once per process so the ~1s
# automation poll loop doesn't spam the log while access is missing.
_warned_no_usage_access = False
def _foreground_bridge():
"""Return the Kotlin ``ForegroundAppBridge`` singleton, or None off-Android.
The ``from java import jclass`` import only resolves inside the Chaquopy
runtime, so it must never run at module import time (this module is imported
on desktop CI too). Mirrors ``android_camera_engine._camera_bridge()``.
"""
if not is_android():
return None
try:
from java import jclass # type: ignore[import-not-found]
except ImportError as exc:
logger.debug("Chaquopy java interop not available: %s", exc)
return None
try:
return jclass("com.ledgrab.android.ForegroundAppBridge").INSTANCE
except Exception as exc: # pragma: no cover - Android-only path
logger.debug("ForegroundAppBridge singleton unavailable: %s", exc)
return None
def has_usage_access() -> bool:
"""Whether Usage Access (PACKAGE_USAGE_STATS) is granted. False off-Android."""
bridge = _foreground_bridge()
if bridge is None:
return False
try:
return bool(bridge.hasUsageAccess())
except Exception as exc: # pragma: no cover - Android-only path
logger.debug("ForegroundAppBridge.hasUsageAccess failed: %s", exc)
return False
def get_foreground_package() -> str | None:
"""Current foreground app package via the Kotlin bridge, or None.
None off-Android, when the bridge is unavailable, when Usage Access is
missing, or when no foreground event is found in the trailing window.
Monkeypatched in tests.
"""
bridge = _foreground_bridge()
if bridge is None:
return None
try:
pkg = bridge.getForegroundPackage()
except Exception as exc: # pragma: no cover - Android-only path
logger.warning("ForegroundAppBridge.getForegroundPackage failed: %s", exc)
return None
if pkg is None:
return None
s = str(pkg).strip()
return s or None
def list_installed_apps() -> list[dict]:
"""Launchable apps via the Kotlin bridge: ``[{"package": .., "label": ..}]``.
Returns ``[]`` off-Android, when the bridge is unavailable, on error, or on
invalid JSON. Sorted by label (the bridge sorts; order is preserved here).
Monkeypatched in tests.
"""
bridge = _foreground_bridge()
if bridge is None:
return []
try:
raw = bridge.listLaunchableApps() # JSON array string
except Exception as exc: # pragma: no cover - Android-only path
logger.warning("ForegroundAppBridge.listLaunchableApps failed: %s", exc)
return []
try:
parsed = json.loads(str(raw))
except (ValueError, TypeError) as exc: # pragma: no cover - Android-only path
logger.warning("ForegroundAppBridge.listLaunchableApps returned invalid JSON: %s", exc)
return []
apps: list[dict] = []
for entry in parsed if isinstance(parsed, list) else []:
if not isinstance(entry, dict):
continue
pkg = entry.get("package")
if not pkg:
continue
apps.append({"package": str(pkg), "label": str(entry.get("label") or pkg)})
return apps
class PlatformDetector:
"""Detect running processes and the foreground window's process."""
@@ -215,6 +316,31 @@ class PlatformDetector:
# ---- Process detection ----
def _get_android_foreground(self) -> tuple:
"""(package_lowercased, True) for the foreground app on Android.
Returns ``(None, False)`` when Usage Access is not granted (warned once)
or no foreground app is found. ``is_fullscreen`` is reported True because
a foreground TV app effectively covers the screen — so an Android rule's
``topmost``/``topmost_fullscreen``/``fullscreen`` match types all behave
as "this app is in front". Delegates to the module-level bridge wrappers
(the monkeypatch surface used by tests).
"""
global _warned_no_usage_access
if not has_usage_access():
if not _warned_no_usage_access:
logger.warning(
"Android 'Application' automation rules need Usage Access "
"(Settings > Usage access). Foreground-app rules will not match "
"until it is granted."
)
_warned_no_usage_access = True
return (None, False)
pkg = get_foreground_package()
if not pkg:
return (None, False)
return (pkg.lower(), True)
def _get_running_processes_sync(self) -> Set[str]:
"""Get set of lowercase process names via Win32 EnumProcesses.
@@ -222,7 +348,14 @@ class PlatformDetector:
which is ~300x faster than WMI (~8ms vs ~3s). System services
running under protected accounts are not visible, but all
user-facing applications are covered.
On Android there is no process enumeration API (getRunningTasks is
restricted); the foreground app is reported as the sole "running" entry
as a best-effort so ``match_type="running"`` rules still work.
"""
if is_android():
pkg, _ = self._get_android_foreground()
return {pkg} if pkg else set()
if not _IS_WINDOWS:
return set()
@@ -276,9 +409,13 @@ class PlatformDetector:
def _get_topmost_process_sync(self) -> tuple:
"""Get (process_name, is_fullscreen) of the foreground window.
Returns (None, False) when detection fails.
On Android the "foreground window" is the foreground app package (read
via the Kotlin ForegroundAppBridge); see ``_get_android_foreground``.
Returns (None, False) when detection fails / Usage Access is missing.
Blocking — call via executor.
"""
if is_android():
return self._get_android_foreground()
if not _IS_WINDOWS:
return (None, False)
@@ -369,7 +506,13 @@ class PlatformDetector:
Enumerates all top-level windows and checks each for fullscreen.
Returns process names (lowercase) whose window covers an entire monitor.
On Android the foreground app is treated as fullscreen, so it is the
sole entry (best-effort, mirrors ``_get_running_processes_sync``).
"""
if is_android():
pkg, _ = self._get_android_foreground()
return {pkg} if pkg else set()
if not _IS_WINDOWS:
return set()
@@ -86,6 +86,18 @@ try:
except ImportError:
_has_mediaprojection = False
# ── Android camera/webcam (Camera2 via Chaquopy bridge) ─────────────
try:
from ledgrab.core.capture_engines.android_camera_engine import (
AndroidCameraEngine,
AndroidCameraCaptureStream,
)
_has_android_camera = True
except ImportError:
_has_android_camera = False
# ── Android root screenrecord (rooted Magisk devices) ───────────────
try:
@@ -120,6 +132,8 @@ if _has_camera:
EngineRegistry.register(CameraEngine)
if _has_mediaprojection:
EngineRegistry.register(MediaProjectionEngine)
if _has_android_camera:
EngineRegistry.register(AndroidCameraEngine)
if _has_root_screenrecord:
EngineRegistry.register(RootScreenrecordEngine)
EngineRegistry.register(DemoCaptureEngine)
@@ -152,5 +166,7 @@ if _has_camera:
__all__ += ["CameraEngine", "CameraCaptureStream"]
if _has_mediaprojection:
__all__ += ["MediaProjectionEngine", "MediaProjectionCaptureStream"]
if _has_android_camera:
__all__ += ["AndroidCameraEngine", "AndroidCameraCaptureStream"]
if _has_root_screenrecord:
__all__ += ["RootScreenrecordEngine", "RootScreenrecordCaptureStream"]
@@ -0,0 +1,430 @@
"""Android camera (webcam) capture engine.
Receives camera frames pushed from Kotlin (via Chaquopy) through a
module-level frame queue. The Kotlin :class:`CameraBridge` opens a
camera with the Camera2 API, converts each frame to RGB, and calls
:func:`push_frame` with raw RGB bytes.
The physical camera is opened **on demand** — only while a capture
stream is active. :meth:`AndroidCameraCaptureStream.initialize` calls
:func:`start_camera` (which signals the Kotlin bridge to open the
camera) and :meth:`cleanup` calls :func:`stop_camera`. This keeps the
camera-in-use indicator and battery cost limited to actual use, unlike
the always-on screen/audio capture.
Mirrors the screen-capture bridge
(``core/capture_engines/mediaprojection_engine.py``): a module-level
queue plus push/last-frame fallback/drop-oldest, consumed through the
standard :class:`CaptureEngine` / :class:`CaptureStream` interface so
the live-stream and processing pipelines work unchanged. Cameras are
exposed as selectable "displays" exactly like the desktop OpenCV
:class:`CameraEngine`.
This engine is only available when running inside the LedGrab Android
app (``is_android()``) with at least one camera the Kotlin bridge can
enumerate. All Java interop is lazy + guarded so this module imports
cleanly on desktop CI.
"""
import json
import queue
import threading
import time
from typing import Any, Dict, List, Optional
import numpy as np
from ledgrab.core.capture_engines.base import (
CaptureEngine,
CaptureStream,
DisplayInfo,
ScreenCapture,
)
from ledgrab.utils import get_logger
from ledgrab.utils.platform import is_android
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Frame queue — the bridge between Kotlin and Python
# ---------------------------------------------------------------------------
_frame_queue: "queue.Queue[ScreenCapture]" = queue.Queue(maxsize=2)
_active = False
_active_index = 0
_frames_received = 0
# Single-camera ownership. The Kotlin bridge supports exactly one open camera
# at a time (it closes any prior camera on a new open), and all streams share
# the one module-level frame queue. So the engine serializes ownership the way
# the desktop CameraEngine does with its _camera_lock/_active_cv2_indices: the
# first stream to initialize() owns the camera; a second stream on the SAME
# camera attaches (ref-counted); a second stream on a DIFFERENT camera is
# refused. Only the last owner to clean up actually stops the camera. Without
# this, two concurrent android_camera sources on different displays would make
# the second open silently steal the first's frames, and either stream's
# cleanup would drain the shared queue out from under the other.
_state_lock = threading.Lock()
_owner_index: int | None = None # display_index that currently owns the camera
_owner_refs = 0 # number of streams attached to the active camera
# Camera2 delivers frames continuously, but cache the last one so a
# brief consumer stall still has something to read (mirrors
# mediaprojection_engine's _last_frame).
_last_frame: Optional["ScreenCapture"] = None
# Enumeration cache. is_available() is polled by the engine registry,
# so the (cheap but non-free) Camera2 enumeration is cached briefly —
# matching the desktop CameraEngine's 30 s TTL.
_cam_cache: List[Dict[str, Any]] | None = None
_cam_cache_time: float = 0.0
_CAM_CACHE_TTL = 30.0 # seconds
# Resolution presets shown in the UI. Identical to the desktop
# CameraEngine set so the data-driven capture-template config UI
# (keyed by the "resolution" field name) renders the same dropdown.
# "auto" lets the Kotlin bridge pick a balanced output size.
_RESOLUTION_CHOICES: List[str] = [
"auto",
"640x480",
"1280x720",
"1920x1080",
"2560x1440",
"3840x2160",
]
def _parse_resolution(value: Any) -> tuple[int, int] | None:
"""Parse a 'WxH' string into (width, height). None for 'auto'/invalid."""
if not isinstance(value, str):
return None
s = value.strip().lower()
if s in ("", "auto"):
return None
parts = s.replace("×", "x").split("x")
if len(parts) != 2:
return None
try:
w, h = int(parts[0]), int(parts[1])
except ValueError:
return None
if w <= 0 or h <= 0:
return None
return w, h
# ---------------------------------------------------------------------------
# Kotlin CameraBridge interop — lazy + guarded (never at import time)
# ---------------------------------------------------------------------------
def _camera_bridge():
"""Return the Kotlin ``CameraBridge`` singleton, or None off-Android.
The ``from java import jclass`` import only resolves inside the
Chaquopy runtime, so it must never run at module import time (this
module is imported on desktop CI too). Mirrors
``core/devices/android_ble_transport.py``.
"""
if not is_android():
return None
try:
from java import jclass # type: ignore[import-not-found]
except ImportError as exc:
logger.debug("Chaquopy java interop not available: %s", exc)
return None
try:
return jclass("com.ledgrab.android.CameraBridge").INSTANCE
except Exception as exc: # pragma: no cover - Android-only path
logger.debug("CameraBridge singleton unavailable: %s", exc)
return None
def list_cameras() -> List[Dict[str, Any]]:
"""Enumerate cameras via the Kotlin bridge.
Returns a list of ``{"index": int, "name": str, "facing": str}``
dicts in stable enumeration order, or ``[]`` off-Android / on error
/ when the device has no cameras or CAMERA enumeration fails.
Monkeypatched in tests to inject a fake list without Android.
"""
bridge = _camera_bridge()
if bridge is None:
return []
try:
raw = bridge.listCameras() # JSON array string
except Exception as exc: # pragma: no cover - Android-only path
logger.warning("CameraBridge.listCameras failed: %s", exc)
return []
try:
parsed = json.loads(str(raw))
except (ValueError, TypeError) as exc: # pragma: no cover
logger.warning("CameraBridge.listCameras returned invalid JSON: %s", exc)
return []
cameras: List[Dict[str, Any]] = []
for i, entry in enumerate(parsed if isinstance(parsed, list) else []):
if not isinstance(entry, dict):
continue
cameras.append(
{
"index": int(entry.get("index", i)),
"name": str(entry.get("name") or f"Camera {i}"),
"facing": str(entry.get("facing") or "unknown"),
}
)
return cameras
def _enumerate_cameras() -> List[Dict[str, Any]]:
"""Cached camera enumeration (TTL ``_CAM_CACHE_TTL``)."""
global _cam_cache, _cam_cache_time
now = time.monotonic()
if _cam_cache is not None and (now - _cam_cache_time) < _CAM_CACHE_TTL:
return _cam_cache
_cam_cache = list_cameras()
_cam_cache_time = now
return _cam_cache
def start_camera(index: int, width: int, height: int) -> bool:
"""Signal the Kotlin bridge to open camera ``index`` (on demand).
``width``/``height`` are the requested capture size (0 => let the
bridge pick a balanced default). Returns True if the camera began
streaming. False off-Android, when the bridge is unavailable, or
when the open failed (e.g. CAMERA permission denied, camera in use).
Monkeypatched in tests.
"""
bridge = _camera_bridge()
if bridge is None:
return False
try:
return bool(bridge.startCamera(index, width, height))
except Exception as exc: # pragma: no cover - Android-only path
logger.warning("CameraBridge.startCamera(%d) failed: %s", index, exc)
return False
def stop_camera(index: int) -> None:
"""Signal the Kotlin bridge to close the active camera. No-op off-Android."""
bridge = _camera_bridge()
if bridge is None:
return
try:
bridge.stopCamera()
except Exception as exc: # pragma: no cover - Android-only path
logger.debug("CameraBridge.stopCamera failed: %s", exc)
def push_frame(rgb_bytes: bytes, width: int, height: int) -> None:
"""Push one RGB frame from Kotlin into the capture pipeline.
Called from ``CameraBridge`` on its capture thread. The byte buffer
is interpreted as tightly-packed RGB (``width * height * 3`` bytes,
3 bytes/pixel — NOT RGBA). The buffer is copied out so Kotlin may
reuse its backing array; the oldest queued frame is dropped if the
consumer is slow.
"""
global _frames_received, _last_frame
expected = width * height * 3
if expected <= 0:
return
arr = np.frombuffer(rgb_bytes, dtype=np.uint8)
if arr.size < expected:
# Short/malformed buffer — drop rather than reshape-crash.
return
# Copy out of the read-only frombuffer view (and off any reusable
# Kotlin buffer) so the queued frame owns its memory. Mirrors
# mediaprojection_engine.push_frame's .copy().
rgb = arr[:expected].reshape((height, width, 3)).copy()
frame = ScreenCapture(
image=rgb,
width=width,
height=height,
display_index=_active_index,
)
_last_frame = frame
_frames_received += 1
if _frames_received == 1 or _frames_received % 100 == 0:
logger.info("Android camera: received %d frames", _frames_received)
# Drop oldest frame if queue is full (non-blocking).
try:
_frame_queue.put_nowait(frame)
except queue.Full:
try:
_frame_queue.get_nowait()
except queue.Empty:
pass
try:
_frame_queue.put_nowait(frame)
except queue.Full:
pass
def shutdown() -> None:
"""Deactivate the engine. Called when the Android app stops."""
global _active
_active = False
logger.info("Android camera engine shut down")
def _drain_queue() -> None:
"""Discard any queued frames (stale frames from a prior session)."""
global _last_frame
while not _frame_queue.empty():
try:
_frame_queue.get_nowait()
except queue.Empty:
break
_last_frame = None
# ---------------------------------------------------------------------------
# CaptureStream
# ---------------------------------------------------------------------------
class AndroidCameraCaptureStream(CaptureStream):
"""Reads camera frames pushed by Kotlin from the module-level queue.
Opening the physical camera is on demand: :meth:`initialize` asks
the Kotlin bridge to open the camera bound to ``display_index`` and
:meth:`cleanup` asks it to close.
"""
def initialize(self) -> None:
if self._initialized:
return
if not is_android():
raise RuntimeError(
"Android camera engine not available. "
"This engine is only usable inside the Android app."
)
parsed = _parse_resolution(self.config.get("resolution", "auto"))
target_w, target_h = parsed if parsed is not None else (0, 0)
global _active, _active_index, _owner_index, _owner_refs
with _state_lock:
if _owner_index is not None and _owner_index != self.display_index:
# Another camera is already streaming — the bridge can only
# drive one at a time, so refuse rather than silently stealing
# the active camera's frames (mirrors the desktop CameraEngine's
# "already in use by another stream").
raise RuntimeError(
f"Android camera {_owner_index} is already in use by another "
f"capture; only one camera can stream at a time"
)
if _owner_index == self.display_index:
# Same camera already open — attach to it (ref-counted).
_owner_refs += 1
self._initialized = True
logger.info(
"Android camera capture stream attached (camera=%d, refs=%d)",
self.display_index,
_owner_refs,
)
return
# No camera open — open this one. Drain stale frames first so the
# first captured frame is actually current.
_drain_queue()
if not start_camera(self.display_index, target_w, target_h):
raise RuntimeError(
f"Failed to open Android camera {self.display_index} "
f"(CAMERA permission denied, camera in use, or unavailable)"
)
_owner_index = self.display_index
_owner_refs = 1
_active = True
_active_index = self.display_index
self._initialized = True
logger.info("Android camera capture stream initialized (camera=%d)", self.display_index)
def capture_frame(self) -> ScreenCapture | None:
if not self._initialized:
self.initialize()
# Prefer a fresh frame; fall back to the last one on a brief stall.
try:
return _frame_queue.get(timeout=0.1)
except queue.Empty:
return _last_frame
def cleanup(self) -> None:
if self._initialized:
global _active, _owner_index, _owner_refs
with _state_lock:
_owner_refs -= 1
if _owner_refs <= 0:
# Last owner released — actually stop the camera.
stop_camera(self.display_index)
_owner_index = None
_owner_refs = 0
_active = False
_drain_queue()
self._initialized = False
logger.info("Android camera capture stream cleaned up (camera=%d)", self.display_index)
else:
self._initialized = False
# ---------------------------------------------------------------------------
# CaptureEngine
# ---------------------------------------------------------------------------
class AndroidCameraEngine(CaptureEngine):
"""Android camera/webcam capture engine (Camera2 via Kotlin bridge).
Only available inside the LedGrab Android app with at least one
enumerable camera. Each camera is exposed as a selectable
"display", mirroring the desktop OpenCV :class:`CameraEngine`.
Selected explicitly via ``engine_type="android_camera"`` in a
capture template — never auto-selected (priority 0, below
MediaProjection's 100).
"""
ENGINE_TYPE = "android_camera"
ENGINE_PRIORITY = 0 # never auto-selected over MediaProjection (100); explicit only
HAS_OWN_DISPLAYS = True
@classmethod
def is_available(cls) -> bool:
return is_android() and len(_enumerate_cameras()) > 0
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {"resolution": "auto"}
@classmethod
def get_config_choices(cls) -> Dict[str, List[str]]:
return {"resolution": list(_RESOLUTION_CHOICES)}
@classmethod
def get_available_displays(cls) -> List[DisplayInfo]:
displays: List[DisplayInfo] = []
for cam in _enumerate_cameras():
idx = cam["index"]
displays.append(
DisplayInfo(
index=idx,
name=cam["name"],
width=0,
height=0,
x=idx * 500,
y=0,
is_primary=(idx == 0),
refresh_rate=30,
)
)
return displays
@classmethod
def create_stream(
cls, display_index: int, config: Dict[str, Any]
) -> AndroidCameraCaptureStream:
merged = {**cls.get_default_config(), **config}
return AndroidCameraCaptureStream(display_index, merged)
@@ -8,6 +8,8 @@ Supported platforms:
- **Windows**: polls toast notifications via winrt UserNotificationListener
(falls back to winsdk if winrt packages are not installed)
- **Linux**: monitors org.freedesktop.Notifications via D-Bus (dbus-next)
- **Android**: receives notifications pushed from a Kotlin NotificationListenerService
via Chaquopy (push-based; see push_notification() and _AndroidBackend)
"""
import asyncio
@@ -17,9 +19,10 @@ import platform
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Set
from typing import Callable, Dict, List, Optional, Set
from ledgrab.utils import get_logger
from ledgrab.utils.platform import is_linux
logger = get_logger(__name__)
@@ -30,15 +33,71 @@ _HISTORY_MAX = 50
# Module-level singleton for dependency access
_instance: Optional["OsNotificationListener"] = None
# Push target for the Android backend — set by _AndroidBackend.start(), read by
# push_notification(). None when the Android backend isn't running (desktop / server down).
_android_target: Callable[[str | None], None] | None = None
def get_os_notification_listener() -> Optional["OsNotificationListener"]:
"""Return the global OsNotificationListener instance (or None)."""
return _instance
def push_notification(app_name: str | None) -> None:
"""Receive an Android notification pushed from Kotlin via Chaquopy.
Called by the LedGrabNotificationListener service through
``Python.getInstance().getModule(...).callAttr("push_notification", label)``.
Routes the posting app's display label into the active listener's
``_on_new_notification`` handler. No-op when the Android backend isn't running,
so a notification arriving before the server is ready (or on desktop) is safely
ignored.
"""
# Snapshot into a local first: stop() may null _android_target concurrently, but an
# in-flight push then still completes against the prior callback. Do NOT collapse this
# into `if _android_target is not None: _android_target(...)` — that reintroduces a
# TOCTOU None-deref race.
cb = _android_target
if cb is None:
return
try:
cb(app_name)
except Exception as exc: # never let a JNI-side call crash the bound service
logger.warning("push_notification callback error: %s", exc)
# ── Platform backends ──────────────────────────────────────────────────
class _AndroidBackend:
"""Push-based backend — notifications arrive from Kotlin via push_notification().
Unlike the Windows/Linux backends (which poll or eavesdrop on a thread), Android
notifications are delivered by a Kotlin NotificationListenerService across the
Chaquopy JNI boundary into the module-level push_notification() receiver, so
start()/stop() simply register/clear the receiver target.
"""
def __init__(self, on_notification):
self._on_notification = on_notification
@staticmethod
def probe() -> bool:
"""Return True when running on Android (Chaquopy)."""
from ledgrab.utils.platform import is_android
return is_android()
def start(self) -> None:
global _android_target
_android_target = self._on_notification
logger.info("OS notification listener: Android backend active")
def stop(self) -> None:
global _android_target
_android_target = None
def _import_winrt_notifications():
"""Try to import WinRT notification APIs: winrt first, then winsdk fallback.
@@ -193,7 +252,9 @@ class _LinuxBackend:
@staticmethod
def probe() -> bool:
"""Return True if this backend can run on the current system."""
if platform.system() != "Linux":
# is_linux() excludes Android, which also reports platform.system() == "Linux"
# but has no D-Bus session — defense-in-depth beyond probe ordering.
if not is_linux():
return False
try:
import dbus_next # noqa: F401
@@ -312,8 +373,9 @@ class OsNotificationListener:
global _instance
_instance = self
# Try platform backends in order
for backend_cls in (_WindowsBackend, _LinuxBackend):
# Try platform backends in order (Android first — it reports platform.system()
# == "Linux", so probing it ahead of _LinuxBackend is the robust ordering).
for backend_cls in (_AndroidBackend, _WindowsBackend, _LinuxBackend):
if backend_cls.probe():
self._backend = backend_cls(on_notification=self._on_new_notification)
self._backend.start()
@@ -93,7 +93,7 @@ async def apply_scene_state(
proc = processor_manager.get_processor(ts.target_id)
if proc and proc.is_running:
css_changed = "color_strip_source_id" in changed
brightness_changed = "brightness" in changed
brightness_changed = "brightness_value_source_id" in changed
settings_changed = "fps" in changed
if css_changed:
target.sync_with_manager(
@@ -74,6 +74,19 @@
font-size: 0.85rem;
}
/* Android-only: shown in the application rule when Usage Access is missing,
so the foreground-app rule can't fire until the user grants it on the TV. */
.rule-usage-warning {
margin-bottom: 10px;
padding: 8px 12px;
border-radius: 6px;
font-size: 0.85rem;
line-height: 1.35;
color: var(--warning-color, #ff9800);
background: color-mix(in srgb, var(--warning-color, #ff9800) 12%, transparent);
border: 1px solid color-mix(in srgb, var(--warning-color, #ff9800) 35%, transparent);
}
.btn-remove-rule {
background: none;
border: none;
@@ -2,13 +2,19 @@
* Command-palette style name picker — reusable UI for browsing a list of
* names fetched from any API endpoint. Mirrors the EntityPalette pattern.
*
* Two concrete pickers are exported:
* Three concrete pickers are exported:
*
* - **ProcessPalette** — picks from running OS processes (`/system/processes`)
* - **NotificationAppPalette** — picks from OS notification history apps
* - **AppPalette** — picks from Android launchable apps (`/system/installed-apps`),
* displaying the human label but inserting the package name
*
* Both support single-select (returns one value) and multi-select (appends to
* a textarea).
* Items may be plain strings (display == stored value) or `{ value, label }`
* pairs (display the label, store the value — used by AppPalette so the rule
* stores the package name while the user sees "Netflix").
*
* All support single-select (returns one value) and multi-select (appends the
* value to a textarea).
*
* Usage:
*
@@ -29,8 +35,16 @@ import { ICON_SEARCH } from './icons.ts';
/* ─── types ────────────────────────────────────────────────── */
interface PaletteItem {
name: string;
/** An item with a display label distinct from its stored value. */
interface AppItem {
value: string;
label: string;
}
/** Raw items a fetcher may return: bare strings or labelled pairs. */
type RawItem = string | AppItem;
interface PaletteEntry extends AppItem {
added: boolean;
}
@@ -44,7 +58,9 @@ interface PickMultiOpts {
placeholder?: string;
}
type FetchItemsFn = () => Promise<string[]>;
type FetchItemsFn = () => Promise<RawItem[]>;
const DEFAULT_EMPTY_KEY = 'automations.condition.application.no_processes';
/* ─── generic NamePalette (shared logic) ───────────────────── */
@@ -53,19 +69,21 @@ class NamePalette {
private _input: HTMLInputElement;
private _list: HTMLDivElement;
private _fetchItems: FetchItemsFn;
private _emptyKey: string;
private _resolveSingle: ((v: string | undefined) => void) | null = null;
private _multiTextarea: HTMLTextAreaElement | null = null;
private _items: string[] = [];
private _items: AppItem[] = [];
private _existing: Set<string> = new Set();
private _filtered: PaletteItem[] = [];
private _filtered: PaletteEntry[] = [];
private _highlightIdx = 0;
private _currentValue: string | undefined;
private _isMulti = false;
constructor(fetchItems: FetchItemsFn) {
constructor(fetchItems: FetchItemsFn, emptyKey: string = DEFAULT_EMPTY_KEY) {
this._fetchItems = fetchItems;
this._emptyKey = emptyKey;
this._overlay = document.createElement('div');
this._overlay.className = 'entity-palette-overlay process-palette-overlay';
@@ -107,14 +125,20 @@ class NamePalette {
this._isMulti = true;
this._multiTextarea = opts.textarea;
this._resolveSingle = resolve as any;
this._existing = new Set(
opts.textarea.value.split('\n').map(s => s.trim().toLowerCase()).filter(Boolean),
);
this._existing = this._textareaValues(opts.textarea);
this._currentValue = undefined;
this._open(opts.placeholder);
});
}
private _textareaValues(ta: HTMLTextAreaElement): Set<string> {
return new Set(ta.value.split('\n').map(s => s.trim().toLowerCase()).filter(Boolean));
}
private _normalize(raw: RawItem[]): AppItem[] {
return raw.map(r => (typeof r === 'string' ? { value: r, label: r } : r));
}
private async _open(placeholder?: string) {
this._input.placeholder = placeholder || '';
this._input.value = '';
@@ -123,15 +147,13 @@ class NamePalette {
requestAnimationFrame(() => this._input.focus());
try {
this._items = await this._fetchItems();
this._items = this._normalize(await this._fetchItems());
} catch {
this._items = [];
}
if (this._isMulti) {
this._existing = new Set(
this._multiTextarea!.value.split('\n').map(s => s.trim().toLowerCase()).filter(Boolean),
);
this._existing = this._textareaValues(this._multiTextarea!);
}
this._filter();
@@ -142,14 +164,11 @@ class NamePalette {
private _filter() {
const q = this._input.value.toLowerCase().trim();
this._filtered = this._items
.filter(p => !q || p.toLowerCase().includes(q))
.map(p => ({
name: p,
added: this._existing.has(p.toLowerCase()),
}));
.filter(p => !q || p.label.toLowerCase().includes(q) || p.value.toLowerCase().includes(q))
.map(p => ({ ...p, added: this._existing.has(p.value.toLowerCase()) }));
this._highlightIdx = this._filtered.findIndex(
i => i.name.toLowerCase() === (this._currentValue || '').toLowerCase(),
i => i.value.toLowerCase() === (this._currentValue || '').toLowerCase(),
);
if (this._highlightIdx === -1) this._highlightIdx = 0;
this._render();
@@ -158,9 +177,7 @@ class NamePalette {
private _render() {
if (this._filtered.length === 0) {
this._list.innerHTML = `<div class="entity-palette-empty">${
this._items.length === 0
? t('automations.condition.application.no_processes')
: '—'
this._items.length === 0 ? t(this._emptyKey) : '—'
}</div>`;
return;
}
@@ -170,12 +187,21 @@ class NamePalette {
'entity-palette-item',
i === this._highlightIdx ? 'ep-highlight' : '',
item.added ? 'ep-current' : '',
item.name.toLowerCase() === (this._currentValue || '').toLowerCase() ? 'ep-current' : '',
item.value.toLowerCase() === (this._currentValue || '').toLowerCase() ? 'ep-current' : '',
].filter(Boolean).join(' ');
// When the label differs from the stored value (e.g. "Netflix" vs
// "com.netflix.mediaclient"), show the value as a secondary line so
// users can see exactly what gets matched. Otherwise fall back to the
// ✓ added-marker.
const showValue = item.label !== item.value;
const trailing = showValue
? `<span class="ep-item-desc">${escapeHtml(item.value)}</span>`
: (item.added ? '<span class="ep-item-desc">✓</span>' : '');
return `<div class="${cls}" data-idx="${i}">
<span class="ep-item-label">${escapeHtml(item.name)}</span>
${item.added ? '<span class="ep-item-desc">\u2713</span>' : ''}
<span class="ep-item-label">${escapeHtml(item.label)}</span>
${trailing}
</div>`;
}).join('');
@@ -192,19 +218,19 @@ class NamePalette {
/* ── selection ──────────────────────────────────────────── */
private _selectItem(item: PaletteItem) {
private _selectItem(item: PaletteEntry) {
if (this._isMulti) {
if (!item.added) {
const ta = this._multiTextarea!;
const cur = ta.value.trim();
ta.value = cur ? cur + '\n' + item.name : item.name;
this._existing.add(item.name.toLowerCase());
ta.value = cur ? cur + '\n' + item.value : item.value;
this._existing.add(item.value.toLowerCase());
item.added = true;
this._render();
}
} else {
this._overlay.classList.remove('open');
if (this._resolveSingle) this._resolveSingle(item.name);
if (this._resolveSingle) this._resolveSingle(item.value);
this._resolveSingle = null;
}
}
@@ -269,6 +295,17 @@ async function _fetchNotificationApps(): Promise<string[]> {
return Array.from(seen.values()).sort((a, b) => a.localeCompare(b));
}
async function _fetchInstalledApps(): Promise<AppItem[]> {
try {
const data = await apiGet<{ apps?: Array<{ package: string; label: string }> }>(
'/system/installed-apps',
);
return (data.apps || []).map(a => ({ value: a.package, label: a.label || a.package }));
} catch {
return [];
}
}
/* ─── ProcessPalette (running processes) ───────────────────── */
let _processInst: NamePalette | null = null;
@@ -301,6 +338,22 @@ export class NotificationAppPalette {
}
}
/* ─── AppPalette (Android launchable apps) ─────────────────── */
let _appInst: NamePalette | null = null;
export class AppPalette {
static pick(opts: PickOpts): Promise<string | undefined> {
if (!_appInst) _appInst = new NamePalette(_fetchInstalledApps, 'automations.rule.application.no_apps');
return _appInst.pickSingle(opts);
}
static pickMulti(opts: PickMultiOpts): Promise<void> {
if (!_appInst) _appInst = new NamePalette(_fetchInstalledApps, 'automations.rule.application.no_apps');
return _appInst.pickMulti(opts);
}
}
/* ─── drop-in replacement for the old attachProcessPicker ─── */
/**
@@ -334,3 +387,19 @@ export function attachNotificationAppPicker(containerEl: HTMLElement, textareaEl
});
});
}
/**
* Wire up a `.btn-browse-apps` button to open the Android launchable-app palette
* (multi-select, feeding package names into a textarea while showing labels).
*/
export function attachAppPicker(containerEl: HTMLElement, textareaEl: HTMLTextAreaElement): void {
const browseBtn = containerEl.querySelector('.btn-browse-apps');
if (!browseBtn) return;
browseBtn.addEventListener('click', () => {
AppPalette.pickMulti({
textarea: textareaEl,
placeholder: t('automations.rule.application.search_apps') || 'Filter apps…',
});
});
}
@@ -29,7 +29,7 @@ import { getBaseOrigin } from './settings.ts';
import { IconSelect } from '../core/icon-select.ts';
import { EntitySelect } from '../core/entity-palette.ts';
import { enhanceMiniSelects } from '../core/mini-select.ts';
import { attachProcessPicker } from '../core/process-picker.ts';
import { attachProcessPicker, attachAppPicker } from '../core/process-picker.ts';
import { TreeNav } from '../core/tree-nav.ts';
import { csScenes, createSceneCard, initScenePresetDelegation } from './scene-presets.ts';
import type { Automation, RuleType } from '../types.ts';
@@ -215,6 +215,28 @@ document.addEventListener('server:automation_state_changed', () => {
if (apiKey && isActiveTab('automations')) loadAutomations();
});
/** Platform capability signal from `/system/info` — drives the application-rule
* editor (process picker + match types on desktop vs. app picker + foreground-only
* on Android) and the Usage-Access banner. Fetched once and cached. */
interface PlatformInfo {
is_android: boolean;
app_match_kind: 'process' | 'package';
usage_access_granted: boolean;
}
let _platformInfo: PlatformInfo | null = null;
async function ensurePlatformInfo(): Promise<PlatformInfo> {
if (_platformInfo) return _platformInfo;
try {
_platformInfo = await apiGet<PlatformInfo>('/system/info');
} catch {
// Default to desktop semantics if the signal can't be fetched.
_platformInfo = { is_android: false, app_match_kind: 'process', usage_access_granted: true };
}
return _platformInfo;
}
export async function loadAutomations() {
if (_automationsLoading) return;
set_automationsLoading(true);
@@ -222,6 +244,10 @@ export async function loadAutomations() {
if (!container) { set_automationsLoading(false); return; }
if (!csAutomations.isMounted()) setTabRefreshing('automations-content', true);
// Prime the platform signal so the editor renders the right app source +
// match semantics without an async hop when a rule row is expanded.
void ensurePlatformInfo();
try {
const [automations, scenes] = await Promise.all([
automationsCacheObj.fetch(),
@@ -559,6 +585,11 @@ export async function openAutomationEditor(automationId?: any, cloneData?: any)
errorEl.style.display = 'none';
ruleList!.innerHTML = '';
// Ensure the platform signal is loaded before rendering rule rows so the
// application rule picks the right app source + match semantics. The
// automations tab primes this, but the graph editor opens this directly.
await ensurePlatformInfo();
_ensureRuleLogicIconSelect();
_ensureDeactivationModeIconSelect();
@@ -1129,6 +1160,33 @@ function _renderWebhookFields(container: HTMLElement, data: any): void {
function _renderApplicationFields(container: HTMLElement, data: any): void {
const appsValue = (data.apps || []).join('\n');
// On Android there is exactly one obtainable signal — the foreground app —
// so the match-type selector is hidden (match_type is forced to "topmost" by
// the collector) and the app list comes from launchable apps (package names)
// rather than running processes (process names).
if (_platformInfo?.is_android) {
const banner = _platformInfo.usage_access_granted
? ''
: `<div class="rule-usage-warning">${t('automations.rule.application.usage_access_required')}</div>`;
container.innerHTML = `
<div class="rule-fields">
${banner}
<div class="rule-field">
<div class="rule-apps-header">
<label>${t('automations.rule.application.apps')}</label>
<button type="button" class="btn btn-icon btn-secondary btn-browse-apps" title="${t('automations.rule.application.browse')}">${ICON_SEARCH}</button>
</div>
<textarea class="rule-apps" rows="3" placeholder="com.netflix.mediaclient&#10;com.android.chrome">${escapeHtml(appsValue)}</textarea>
<small class="rule-hint-desc">${t('automations.rule.application.apps.hint_android')}</small>
</div>
</div>
`;
const textarea = container.querySelector('.rule-apps') as HTMLTextAreaElement;
attachAppPicker(container, textarea);
return;
}
const matchType = data.match_type || 'running';
container.innerHTML = `
<div class="rule-fields">
@@ -1299,7 +1357,10 @@ const RULE_COLLECTORS: Record<RuleType, RuleCollector> = {
return r;
},
application: (row) => {
const matchType = (row.querySelector('.rule-match-type') as HTMLSelectElement).value;
// On Android the match-type selector is hidden (only the foreground app is
// detectable), so default to "topmost" when the select isn't present.
const matchSel = row.querySelector('.rule-match-type') as HTMLSelectElement | null;
const matchType = matchSel ? matchSel.value : 'topmost';
const appsText = (row.querySelector('.rule-apps') as HTMLTextAreaElement).value.trim();
const apps = appsText ? appsText.split('\n').map(a => a.trim()).filter(Boolean) : [];
return { rule_type: 'application', apps, match_type: matchType };
@@ -103,6 +103,7 @@
"templates.engine.wgc.desc": "Windows Graphics Capture",
"templates.engine.demo.desc": "Animated test pattern (demo mode)",
"templates.engine.mediaprojection.desc": "Native Android screen capture",
"templates.engine.android_camera.desc": "On-device camera capture (Camera2)",
"templates.config": "Configuration",
"templates.config.show": "Show configuration",
"templates.config.none": "No additional configuration",
@@ -1225,6 +1226,10 @@
"automations.rule.application.match_type.topmost_fullscreen.desc": "Foreground + fullscreen",
"automations.rule.application.match_type.fullscreen": "Fullscreen",
"automations.rule.application.match_type.fullscreen.desc": "Any fullscreen app",
"automations.rule.application.apps.hint_android": "Package names, one per line (e.g. com.netflix.mediaclient)",
"automations.rule.application.search_apps": "Filter apps...",
"automations.rule.application.no_apps": "No apps found",
"automations.rule.application.usage_access_required": "Needs Usage Access. On your LedGrab TV, open the app and tap 'Grant usage access'.",
"automations.rule.time_of_day": "Time of Day",
"automations.rule.time_of_day.desc": "Time range",
"automations.rule.time_of_day.start_time": "Start Time:",
@@ -158,6 +158,7 @@
"templates.engine.wgc.desc": "Windows Graphics Capture",
"templates.engine.demo.desc": "Тестовый анимированный шаблон (демо)",
"templates.engine.mediaprojection.desc": "Нативный захват экрана Android",
"templates.engine.android_camera.desc": "Захват камеры устройства (Camera2)",
"templates.config": "Конфигурация",
"templates.config.show": "Показать конфигурацию",
"templates.config.none": "Нет дополнительных настроек",
@@ -1259,6 +1260,10 @@
"automations.rule.application.match_type.topmost_fullscreen.desc": "В фокусе + полный экран",
"automations.rule.application.match_type.fullscreen": "Полный экран",
"automations.rule.application.match_type.fullscreen.desc": "Любое полноэкранное",
"automations.rule.application.apps.hint_android": "Имена пакетов, по одному в строке (напр. com.netflix.mediaclient)",
"automations.rule.application.search_apps": "Поиск приложений...",
"automations.rule.application.no_apps": "Приложения не найдены",
"automations.rule.application.usage_access_required": "Требуется доступ к статистике использования. Откройте LedGrab на телевизоре и нажмите «Разрешить доступ к статистике использования».",
"automations.rule.time_of_day": "Время суток",
"automations.rule.time_of_day.desc": "Диапазон времени",
"automations.rule.time_of_day.start_time": "Время начала:",
@@ -156,6 +156,7 @@
"templates.engine.wgc.desc": "Windows图形捕获",
"templates.engine.demo.desc": "动画测试图案(演示模式)",
"templates.engine.mediaprojection.desc": "原生Android屏幕捕获",
"templates.engine.android_camera.desc": "设备摄像头捕获 (Camera2)",
"templates.config": "配置",
"templates.config.show": "显示配置",
"templates.config.none": "无额外配置",
@@ -1255,6 +1256,10 @@
"automations.rule.application.match_type.topmost_fullscreen.desc": "前台 + 全屏",
"automations.rule.application.match_type.fullscreen": "全屏",
"automations.rule.application.match_type.fullscreen.desc": "任意全屏应用",
"automations.rule.application.apps.hint_android": "包名,每行一个(例如 com.netflix.mediaclient",
"automations.rule.application.search_apps": "筛选应用…",
"automations.rule.application.no_apps": "未找到应用",
"automations.rule.application.usage_access_required": "需要使用情况访问权限。在您的 LedGrab 电视上打开应用并点按「授予使用情况访问权限」。",
"automations.rule.time_of_day": "时段",
"automations.rule.time_of_day.desc": "时间范围",
"automations.rule.time_of_day.start_time": "开始时间:",
+15 -2
View File
@@ -30,11 +30,24 @@ class Rule:
@dataclass
class ApplicationRule(Rule):
"""Activate when specified applications are running or topmost."""
"""Activate when specified applications are running or topmost.
``apps`` values are platform-specific and NOT portable across OSes:
on Windows they are **process names** (e.g. ``chrome.exe``); on Android
they are **package names** (e.g. ``com.android.chrome``). Matching is
exact and case-insensitive. The automation editor sources values from the
right place per platform (running processes on desktop, launchable apps on
Android), so a rule authored on one OS will simply not match on another.
``match_type`` is honoured on Windows for all four values below. On Android
only the foreground app is obtainable, so every match type collapses to
"this app is in the foreground" and the editor hides the selector.
"""
rule_type: str = "application"
apps: List[str] = field(default_factory=list)
match_type: str = "running" # "running" | "topmost"
# "running" | "topmost" | "fullscreen" | "topmost_fullscreen"
match_type: str = "running"
def to_dict(self) -> dict:
d = super().to_dict()
@@ -92,3 +92,57 @@ class TestRootEndpoint:
resp = client.get("/")
assert resp.status_code == 200
assert "text/html" in resp.headers["content-type"]
class TestInstalledAppsEndpoint:
def test_requires_auth(self, client):
resp = client.get("/api/v1/system/installed-apps")
assert resp.status_code == 401
def test_empty_off_android(self, client):
"""Desktop test host: is_android() is False, so the bridge wrapper
short-circuits to an empty list."""
resp = client.get("/api/v1/system/installed-apps", headers=_auth_headers())
assert resp.status_code == 200
assert resp.json() == {"apps": [], "count": 0}
def test_returns_apps_when_available(self, client, monkeypatch):
from ledgrab.core.automations import platform_detector as pd
monkeypatch.setattr(
pd,
"list_installed_apps",
lambda: [{"package": "com.netflix.mediaclient", "label": "Netflix"}],
)
resp = client.get("/api/v1/system/installed-apps", headers=_auth_headers())
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 1
assert data["apps"][0] == {"package": "com.netflix.mediaclient", "label": "Netflix"}
class TestSystemInfoEndpoint:
def test_requires_auth(self, client):
resp = client.get("/api/v1/system/info")
assert resp.status_code == 401
def test_desktop_signal(self, client):
resp = client.get("/api/v1/system/info", headers=_auth_headers())
assert resp.status_code == 200
data = resp.json()
assert data["is_android"] is False
assert data["app_match_kind"] == "process"
assert data["usage_access_granted"] is True
def test_android_signal(self, client, monkeypatch):
import ledgrab.utils.platform as plat
from ledgrab.core.automations import platform_detector as pd
monkeypatch.setattr(plat, "is_android", lambda: True)
monkeypatch.setattr(pd, "has_usage_access", lambda: False)
resp = client.get("/api/v1/system/info", headers=_auth_headers())
assert resp.status_code == 200
data = resp.json()
assert data["is_android"] is True
assert data["app_match_kind"] == "package"
assert data["usage_access_granted"] is False
@@ -0,0 +1,237 @@
"""Tests for the Android push-based notification backend.
These run on desktop CI (no Android device needed): ``is_android`` is
monkeypatched and the app label is pushed directly into the module-level
``push_notification`` receiver, exactly as the Kotlin
``NotificationListenerService`` would across the Chaquopy bridge.
Isolation (critical): the listener keeps process-global state
(``_android_target``, ``_instance``) and persists history to a hardcoded
``data/notification_history.json``. Every test resets those globals and
repoints ``_HISTORY_FILE`` to ``tmp_path`` so the suite never leaks state
between tests or clobbers the real repo data file.
"""
from datetime import datetime, timezone
import pytest
import ledgrab.core.processing.os_notification_listener as nl
from ledgrab.storage.color_strip_source import NotificationColorStripSource
PLATFORM_MOD = "ledgrab.utils.platform"
# ---------------------------------------------------------------------------
# Test doubles
# ---------------------------------------------------------------------------
class _FakeStream:
"""Stub NotificationColorStripStream — records fire() calls."""
def __init__(self, accept: bool = True):
self._accept = accept
self.fired_with: list = []
def fire(self, app_name=None) -> bool:
self.fired_with.append(app_name)
return self._accept
class _FakeStore:
def __init__(self, sources):
self._sources = sources
def get_all_sources(self):
return list(self._sources)
class _FakeStreamManager:
def __init__(self, streams):
self._streams = streams
def get_streams_by_source_id(self, source_id):
return list(self._streams)
def _notif_source(
*, source_id: str = "css_test", os_listener: bool = True
) -> NotificationColorStripSource:
now = datetime.now(timezone.utc)
return NotificationColorStripSource.create_from_kwargs(
id=source_id,
name="Test Notification Source",
source_type="notification",
created_at=now,
updated_at=now,
os_listener=os_listener,
)
# ---------------------------------------------------------------------------
# Fixtures — module-global + disk isolation
# ---------------------------------------------------------------------------
@pytest.fixture
def nl_mod(monkeypatch, tmp_path):
"""Reset module globals and repoint the history file to tmp_path.
``monkeypatch.setattr`` auto-restores originals on teardown, so even though
``start()``/``stop()`` rebind ``_android_target`` and ``_instance`` during a
test, the globals are returned to their pre-test values afterward — no
cross-test leakage and no write to the real repo ``data/`` dir.
"""
monkeypatch.setattr(nl, "_android_target", None)
monkeypatch.setattr(nl, "_instance", None)
monkeypatch.setattr(nl, "_HISTORY_FILE", tmp_path / "notification_history.json")
return nl
# ---------------------------------------------------------------------------
# _AndroidBackend.probe()
# ---------------------------------------------------------------------------
def test_probe_true_on_android(nl_mod, monkeypatch):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
assert nl_mod._AndroidBackend.probe() is True
def test_probe_false_on_desktop(nl_mod, monkeypatch):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: False)
assert nl_mod._AndroidBackend.probe() is False
# ---------------------------------------------------------------------------
# push_notification() routing contract
# ---------------------------------------------------------------------------
def test_push_is_noop_before_start(nl_mod):
# _android_target is None → no callback, no exception.
nl_mod.push_notification("Telegram") # must not raise
def test_push_routes_after_start_and_stops_after_stop(nl_mod):
received: list = []
backend = nl_mod._AndroidBackend(on_notification=received.append)
backend.start()
nl_mod.push_notification("Telegram")
assert received == ["Telegram"]
backend.stop()
nl_mod.push_notification("Signal") # no-op after stop
assert received == ["Telegram"]
def test_push_swallows_callback_exception(nl_mod):
def boom(_app):
raise RuntimeError("callback exploded")
nl_mod._AndroidBackend(on_notification=boom).start()
# JNI entry point must never propagate — would crash the bound service.
nl_mod.push_notification("X")
# ---------------------------------------------------------------------------
# Integration — start() selects Android, push fires the stream + records history
# ---------------------------------------------------------------------------
def test_android_selected_push_fires_stream_and_records_history(nl_mod, monkeypatch, tmp_path):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
stream = _FakeStream(accept=True)
listener = nl_mod.OsNotificationListener(
_FakeStore([_notif_source(os_listener=True)]),
_FakeStreamManager([stream]),
)
listener.start()
assert listener.available is True # flips True on backend selection, not on push
nl_mod.push_notification("Telegram")
assert stream.fired_with == ["Telegram"]
assert listener.recent_history[0]["app"] == "Telegram"
assert listener.recent_history[0]["fired"] == 1
# history written under tmp_path — never the repo data/ dir
assert nl_mod._HISTORY_FILE.exists()
assert nl_mod._HISTORY_FILE.parent == tmp_path
listener.stop()
def test_push_with_none_app_name_is_recorded(nl_mod, monkeypatch):
# The Windows (_extract_app_name) and Linux D-Bus paths can yield None;
# the Android path falls back to the package name, but None must still be
# handled end-to-end without raising.
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
stream = _FakeStream(accept=True)
listener = nl_mod.OsNotificationListener(
_FakeStore([_notif_source(os_listener=True)]),
_FakeStreamManager([stream]),
)
listener.start()
nl_mod.push_notification(None)
assert stream.fired_with == [None]
assert listener.recent_history[0]["app"] is None
listener.stop()
def test_get_os_notification_listener_tracks_started_instance(nl_mod, monkeypatch):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
assert nl_mod.get_os_notification_listener() is None
listener = nl_mod.OsNotificationListener(_FakeStore([]), _FakeStreamManager([]))
listener.start()
assert nl_mod.get_os_notification_listener() is listener
listener.stop()
def test_source_with_os_listener_off_does_not_fire(nl_mod, monkeypatch):
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: True)
stream = _FakeStream()
listener = nl_mod.OsNotificationListener(
_FakeStore([_notif_source(os_listener=False)]),
_FakeStreamManager([stream]),
)
listener.start()
nl_mod.push_notification("Telegram")
assert stream.fired_with == [] # os_listener=False → skipped
listener.stop()
# ---------------------------------------------------------------------------
# Desktop regression — the probe-order change must not alter desktop selection
# ---------------------------------------------------------------------------
def test_android_probe_false_on_real_desktop(nl_mod, monkeypatch):
# With is_android() False, the new first-in-tuple backend must not be selectable.
monkeypatch.setattr(f"{PLATFORM_MOD}.is_android", lambda: False)
assert nl_mod._AndroidBackend.probe() is False
def test_desktop_selection_unchanged_windows_wins(nl_mod, monkeypatch):
# Deterministically control probes and stub start() so no real polling thread spawns.
# Order under test is (_AndroidBackend, _WindowsBackend, _LinuxBackend): Android skipped,
# Windows is the first True → it must be the selected backend, exactly as before.
monkeypatch.setattr(nl_mod._AndroidBackend, "probe", staticmethod(lambda: False))
monkeypatch.setattr(nl_mod._WindowsBackend, "probe", staticmethod(lambda: True))
monkeypatch.setattr(nl_mod._LinuxBackend, "probe", staticmethod(lambda: False))
started: list = []
monkeypatch.setattr(nl_mod._WindowsBackend, "start", lambda self: started.append("win"))
listener = nl_mod.OsNotificationListener(_FakeStore([]), _FakeStreamManager([]))
listener.start()
assert listener.available is True
assert isinstance(listener._backend, nl_mod._WindowsBackend)
assert started == ["win"]
@@ -0,0 +1,342 @@
"""Tests for the Android camera (webcam) capture engine.
These run on desktop CI (no Android device needed): ``is_android`` and the
Kotlin-bridge hooks (``list_cameras`` / ``start_camera`` / ``stop_camera``)
are monkeypatched, and RGB frames are pushed directly into the module-level
queue, exactly as the Kotlin ``CameraBridge`` would.
"""
import queue
import numpy as np
import pytest
# Importing the package triggers auto-registration of AndroidCameraEngine.
import ledgrab.core.capture_engines # noqa: F401
from ledgrab.core.capture_engines import android_camera_engine as eng
from ledgrab.core.capture_engines.factory import EngineRegistry
ENGINE_MOD = "ledgrab.core.capture_engines.android_camera_engine"
W = 16
H = 8
_FAKE_CAMERAS = [
{"index": 0, "name": "Back camera", "facing": "back"},
{"index": 1, "name": "Front camera", "facing": "front"},
]
# ---------------------------------------------------------------------------
# Helpers / fixtures
# ---------------------------------------------------------------------------
def _drain() -> None:
while not eng._frame_queue.empty():
try:
eng._frame_queue.get_nowait()
except queue.Empty:
break
def _frame(marker: int = 0, w: int = W, h: int = H) -> bytes:
"""A tightly-packed RGB frame whose first pixel's R channel is ``marker``."""
arr = np.zeros((h, w, 3), dtype=np.uint8)
arr[0, 0, 0] = marker
return arr.tobytes()
@pytest.fixture
def reset_engine():
"""Reset module-global engine state; snapshot/restore the registry.
The engine keeps its queue + caches in module globals and the registry
is a class-level singleton — both must be restored so this test file
never disturbs the desktop engines other tests rely on.
"""
saved_engines = dict(EngineRegistry._engines)
eng.shutdown()
_drain()
eng._frames_received = 0
eng._active = False
eng._active_index = 0
eng._last_frame = None
eng._cam_cache = None
eng._cam_cache_time = 0.0
eng._owner_index = None
eng._owner_refs = 0
yield eng
eng.shutdown()
_drain()
eng._cam_cache = None
eng._cam_cache_time = 0.0
eng._owner_index = None
eng._owner_refs = 0
EngineRegistry._engines.clear()
EngineRegistry._engines.update(saved_engines)
@pytest.fixture
def on_android(monkeypatch, reset_engine):
"""Engine fixture with ``is_android`` True, demo mode off, fake cameras,
and the open/close hooks stubbed to succeed (recording calls)."""
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr("ledgrab.core.capture_engines.factory.is_demo_mode", lambda: False)
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: list(_FAKE_CAMERAS))
calls = {"start": [], "stop": []}
monkeypatch.setattr(
f"{ENGINE_MOD}.start_camera",
lambda index, w, h: calls["start"].append((index, w, h)) or True,
)
monkeypatch.setattr(
f"{ENGINE_MOD}.stop_camera",
lambda index: calls["stop"].append(index),
)
reset_engine.calls = calls
return reset_engine
# ---------------------------------------------------------------------------
# Queue / push contract
# ---------------------------------------------------------------------------
def test_push_frame_round_trips_rgb(on_android):
# Arrange
stream = eng.AndroidCameraEngine.create_stream(0, {})
stream.initialize()
# Act
eng.push_frame(_frame(marker=42), W, H)
got = stream.capture_frame()
# Assert
assert got is not None
assert got.image.shape == (H, W, 3)
assert got.image.dtype == np.uint8
assert int(got.image[0, 0, 0]) == 42
assert got.width == W and got.height == H
def test_queue_drops_oldest_when_full(reset_engine):
# Arrange
maxsize = eng._frame_queue.maxsize # 2
# Act — push more frames than the queue holds, each tagged 0..N-1
total = maxsize + 3
for i in range(total):
eng.push_frame(_frame(marker=i), W, H)
drained = []
while True:
try:
drained.append(eng._frame_queue.get_nowait())
except queue.Empty:
break
# Assert — only the newest `maxsize` frames survived, oldest dropped
assert len(drained) == maxsize
markers = [int(f.image[0, 0, 0]) for f in drained]
assert markers == list(range(total - maxsize, total))
def test_capture_frame_falls_back_to_last_frame_when_empty(on_android):
# Arrange
stream = eng.AndroidCameraEngine.create_stream(0, {})
stream.initialize()
eng.push_frame(_frame(marker=7), W, H)
# Act — first read drains the queue; second read finds it empty
first = stream.capture_frame()
second = stream.capture_frame()
# Assert — the static-frame fallback returns the cached last frame
assert first is not None
assert second is not None
assert int(second.image[0, 0, 0]) == 7
def test_push_frame_short_buffer_does_not_crash(reset_engine):
# A buffer shorter than width*height*3 must be dropped, not reshape-crash.
eng.push_frame(b"\x01\x02\x03", W, H) # far too short
assert eng._frame_queue.empty()
assert eng._last_frame is None
# ---------------------------------------------------------------------------
# On-demand open/close lifecycle
# ---------------------------------------------------------------------------
def test_initialize_opens_camera_with_parsed_resolution(on_android):
stream = eng.AndroidCameraEngine.create_stream(1, {"resolution": "1280x720"})
stream.initialize()
assert on_android.calls["start"] == [(1, 1280, 720)]
def test_initialize_auto_resolution_requests_zero(on_android):
stream = eng.AndroidCameraEngine.create_stream(0, {"resolution": "auto"})
stream.initialize()
assert on_android.calls["start"] == [(0, 0, 0)]
def test_cleanup_closes_camera_once(on_android):
stream = eng.AndroidCameraEngine.create_stream(0, {})
stream.initialize()
stream.cleanup()
assert on_android.calls["stop"] == [0]
# Idempotent — a second cleanup does not re-signal the bridge.
stream.cleanup()
assert on_android.calls["stop"] == [0]
def test_second_camera_index_is_refused(on_android):
# First stream owns camera 0.
s0 = eng.AndroidCameraEngine.create_stream(0, {})
s0.initialize()
# A stream on a DIFFERENT camera must be refused (one camera at a time),
# not silently steal camera 0's stream.
s1 = eng.AndroidCameraEngine.create_stream(1, {})
with pytest.raises(RuntimeError):
s1.initialize()
# Only the first open reached the bridge.
assert on_android.calls["start"] == [(0, 0, 0)]
def test_same_camera_attaches_and_refcounts(on_android):
# Two streams on the SAME camera share one physical open (ref-counted).
a = eng.AndroidCameraEngine.create_stream(0, {})
b = eng.AndroidCameraEngine.create_stream(0, {})
a.initialize()
b.initialize()
assert on_android.calls["start"] == [(0, 0, 0)] # opened once
# First release must NOT stop the camera (the other stream is still live).
a.cleanup()
assert on_android.calls["stop"] == []
# Last release stops it exactly once.
b.cleanup()
assert on_android.calls["stop"] == [0]
def test_camera_freed_after_release_allows_other_index(on_android):
# After fully releasing camera 0, a different camera can be opened.
s0 = eng.AndroidCameraEngine.create_stream(0, {})
s0.initialize()
s0.cleanup()
s1 = eng.AndroidCameraEngine.create_stream(1, {})
s1.initialize() # must not raise
assert on_android.calls["start"] == [(0, 0, 0), (1, 0, 0)]
def test_initialize_raises_when_open_fails(monkeypatch, reset_engine):
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr(f"{ENGINE_MOD}.start_camera", lambda index, w, h: False)
stream = eng.AndroidCameraEngine.create_stream(0, {})
with pytest.raises(RuntimeError):
stream.initialize()
def test_initialize_raises_off_android(monkeypatch, reset_engine):
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: False)
stream = eng.AndroidCameraEngine.create_stream(0, {})
with pytest.raises(RuntimeError):
stream.initialize()
# ---------------------------------------------------------------------------
# Availability / enumeration (platform-gated)
# ---------------------------------------------------------------------------
def test_is_available_requires_android_and_cameras(monkeypatch, reset_engine):
# Off-Android → unavailable regardless of cameras.
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: False)
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: list(_FAKE_CAMERAS))
assert eng.AndroidCameraEngine.is_available() is False
# On-Android but no cameras → unavailable.
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: [])
eng._cam_cache = None # bust the enumeration cache
assert eng.AndroidCameraEngine.is_available() is False
# On-Android with ≥1 camera → available.
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: list(_FAKE_CAMERAS))
eng._cam_cache = None
assert eng.AndroidCameraEngine.is_available() is True
def test_get_available_displays_maps_cameras(on_android):
displays = eng.AndroidCameraEngine.get_available_displays()
assert len(displays) == 2
assert displays[0].index == 0 and displays[0].name == "Back camera"
assert displays[0].is_primary is True
assert displays[1].index == 1 and displays[1].name == "Front camera"
assert displays[1].is_primary is False
def test_config_choices_expose_resolution(reset_engine):
choices = eng.AndroidCameraEngine.get_config_choices()
assert "resolution" in choices
assert "auto" in choices["resolution"]
assert "1920x1080" in choices["resolution"]
# ---------------------------------------------------------------------------
# Registry integration
# ---------------------------------------------------------------------------
def test_engine_registers_with_expected_type_and_priority():
# Auto-registration ran on import; the engine is in the registry.
assert "android_camera" in EngineRegistry.get_all_engines()
assert eng.AndroidCameraEngine.ENGINE_PRIORITY == 0
assert eng.AndroidCameraEngine.HAS_OWN_DISPLAYS is True
def test_does_not_beat_mediaprojection_by_priority(monkeypatch, reset_engine):
"""Priority 0 must never let the camera win the best-engine race over
MediaProjection (100) on Android."""
from ledgrab.core.capture_engines import mediaprojection_engine as mp
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr(f"{ENGINE_MOD}.list_cameras", lambda: list(_FAKE_CAMERAS))
monkeypatch.setattr("ledgrab.core.capture_engines.factory.is_demo_mode", lambda: False)
eng._cam_cache = None
# Controlled registry: just the two engines whose priority race we assert.
EngineRegistry._engines.clear()
EngineRegistry.register(mp.MediaProjectionEngine)
EngineRegistry.register(eng.AndroidCameraEngine)
mp.configure(640, 480) # make MediaProjection available
try:
best = EngineRegistry.get_best_available_engine()
assert best == "mediaprojection"
assert best != "android_camera"
finally:
mp.shutdown()
while not mp._frame_queue.empty():
try:
mp._frame_queue.get_nowait()
except queue.Empty:
break
def test_stream_via_registry_yields_pushed_frame(on_android):
# Arrange — register cleanly (fixture restores afterward).
stream = EngineRegistry.create_stream("android_camera", 0, {})
stream.initialize()
# Act
eng.push_frame(_frame(marker=99), W, H)
got = stream.capture_frame()
# Assert
assert got is not None
assert int(got.image[0, 0, 0]) == 99
assert got.display_index == 0
@@ -0,0 +1,194 @@
"""Tests for Android foreground-app detection in PlatformDetector.
These run on desktop CI (no Android device needed): ``is_android`` and the
Kotlin-bridge wrappers (``has_usage_access`` / ``get_foreground_package``) are
monkeypatched, exactly as the Kotlin ``ForegroundAppBridge`` would drive them on
device. The critical invariant under test is that the Android branch runs *ahead
of* the import-time ``_IS_WINDOWS`` guard, and that the Windows/desktop paths are
left untouched.
"""
import pytest
from ledgrab.core.automations import platform_detector as pd
from ledgrab.core.automations.platform_detector import PlatformDetector
@pytest.fixture
def detector(monkeypatch):
"""A PlatformDetector with the Windows display-power listener stubbed out.
``__init__`` otherwise spawns a thread that registers a global window class +
runs a ctypes message pump — irrelevant here and noisy when many detectors are
constructed in one process.
"""
monkeypatch.setattr(PlatformDetector, "_display_power_listener", lambda self: None)
return PlatformDetector()
@pytest.fixture(autouse=True)
def _reset_warn():
"""Reset the process-global warn-once flag around every test."""
pd._warned_no_usage_access = False
yield
pd._warned_no_usage_access = False
# ---------------------------------------------------------------------------
# topmost (foreground) detection
# ---------------------------------------------------------------------------
def test_topmost_android_returns_lowercased_foreground_package(detector, monkeypatch):
monkeypatch.setattr(pd, "is_android", lambda: True)
monkeypatch.setattr(pd, "has_usage_access", lambda: True)
monkeypatch.setattr(pd, "get_foreground_package", lambda: "com.Netflix.MediaClient")
assert detector._get_topmost_process_sync() == ("com.netflix.mediaclient", True)
def test_topmost_android_no_access_returns_none_and_warns_once(detector, monkeypatch):
monkeypatch.setattr(pd, "is_android", lambda: True)
monkeypatch.setattr(pd, "has_usage_access", lambda: False)
fg_calls = []
monkeypatch.setattr(pd, "get_foreground_package", lambda: fg_calls.append(1) or "x")
warns = []
monkeypatch.setattr(pd.logger, "warning", lambda *a, **k: warns.append(a))
assert detector._get_topmost_process_sync() == (None, False)
assert detector._get_topmost_process_sync() == (None, False)
# Foreground is never queried when access is missing; warned exactly once.
assert fg_calls == []
assert len(warns) == 1
assert pd._warned_no_usage_access is True
def test_topmost_android_no_foreground_event_returns_none(detector, monkeypatch):
monkeypatch.setattr(pd, "is_android", lambda: True)
monkeypatch.setattr(pd, "has_usage_access", lambda: True)
monkeypatch.setattr(pd, "get_foreground_package", lambda: None)
assert detector._get_topmost_process_sync() == (None, False)
def test_android_branch_precedes_windows_guard(detector, monkeypatch):
"""Even with _IS_WINDOWS True, is_android() must win.
Proves the Android branch sits ahead of the ``if not _IS_WINDOWS`` early
return and never falls through to the Win32 ctypes path (the plan-review
critical gap: a naive wiring would no-op behind the Windows guard).
"""
monkeypatch.setattr(pd, "_IS_WINDOWS", True)
monkeypatch.setattr(pd, "is_android", lambda: True)
monkeypatch.setattr(pd, "has_usage_access", lambda: True)
monkeypatch.setattr(pd, "get_foreground_package", lambda: "com.App.X")
assert detector._get_topmost_process_sync() == ("com.app.x", True)
# ---------------------------------------------------------------------------
# running / fullscreen best-effort (foreground app as the sole entry)
# ---------------------------------------------------------------------------
def test_running_and_fullscreen_android_return_foreground_set(detector, monkeypatch):
monkeypatch.setattr(pd, "is_android", lambda: True)
monkeypatch.setattr(pd, "has_usage_access", lambda: True)
monkeypatch.setattr(pd, "get_foreground_package", lambda: "com.App.Y")
assert detector._get_running_processes_sync() == {"com.app.y"}
assert detector._get_fullscreen_processes_sync() == {"com.app.y"}
def test_running_and_fullscreen_android_empty_without_access(detector, monkeypatch):
monkeypatch.setattr(pd, "is_android", lambda: True)
monkeypatch.setattr(pd, "has_usage_access", lambda: False)
monkeypatch.setattr(pd, "get_foreground_package", lambda: "x")
assert detector._get_running_processes_sync() == set()
assert detector._get_fullscreen_processes_sync() == set()
# ---------------------------------------------------------------------------
# desktop paths untouched
# ---------------------------------------------------------------------------
def test_non_android_non_windows_skips_bridge(detector, monkeypatch):
"""Desktop Linux/mac: no Android branch, no Win32 path, empty results, and
the bridge wrappers are never consulted."""
monkeypatch.setattr(pd, "_IS_WINDOWS", False)
monkeypatch.setattr(pd, "is_android", lambda: False)
calls = []
monkeypatch.setattr(pd, "get_foreground_package", lambda: calls.append("fg"))
monkeypatch.setattr(pd, "has_usage_access", lambda: calls.append("acc") or True)
assert detector._get_topmost_process_sync() == (None, False)
assert detector._get_running_processes_sync() == set()
assert detector._get_fullscreen_processes_sync() == set()
assert calls == []
def test_wrappers_return_safe_defaults_off_android(monkeypatch):
"""is_android() False short-circuits the bridge accessor to None, so the
public wrappers return safe defaults without any java interop."""
monkeypatch.setattr(pd, "is_android", lambda: False)
assert pd._foreground_bridge() is None
assert pd.has_usage_access() is False
assert pd.get_foreground_package() is None
assert pd.list_installed_apps() == []
# ---------------------------------------------------------------------------
# bridge-response parsing wrappers (fed via a fake bridge object)
# ---------------------------------------------------------------------------
class _FakeBridge:
"""Stand-in for the Kotlin ForegroundAppBridge singleton."""
def __init__(self, fg=None, apps_json=None):
self._fg = fg
self._apps_json = apps_json
def getForegroundPackage(self):
return self._fg
def listLaunchableApps(self):
return self._apps_json
def test_get_foreground_package_strips_whitespace(monkeypatch):
# Stripped but NOT lowercased — the caller (_get_android_foreground) lowercases.
monkeypatch.setattr(pd, "_foreground_bridge", lambda: _FakeBridge(fg=" com.App.X "))
assert pd.get_foreground_package() == "com.App.X"
def test_get_foreground_package_blank_returns_none(monkeypatch):
monkeypatch.setattr(pd, "_foreground_bridge", lambda: _FakeBridge(fg=" "))
assert pd.get_foreground_package() is None
def test_list_installed_apps_parses_and_filters(monkeypatch):
import json
payload = json.dumps(
[
{"package": "com.a", "label": "A"},
{"package": "com.b", "label": ""}, # blank label -> falls back to package
{"label": "no package"}, # skipped: no package
"not a dict", # skipped: not an object
]
)
monkeypatch.setattr(pd, "_foreground_bridge", lambda: _FakeBridge(apps_json=payload))
assert pd.list_installed_apps() == [
{"package": "com.a", "label": "A"},
{"package": "com.b", "label": "com.b"},
]
def test_list_installed_apps_invalid_json_returns_empty(monkeypatch):
monkeypatch.setattr(pd, "_foreground_bridge", lambda: _FakeBridge(apps_json="not json{"))
assert pd.list_installed_apps() == []