feat(audio): Android on-device system playback capture

Enable audio-reactive lighting on the Android-TV build. A push-based
AndroidAudioEngine captures system playback audio via AudioPlaybackCapture
(API 29+), reusing the existing MediaProjection token, and feeds PCM into
the unchanged AudioAnalyzer pipeline. No new Python deps; no Chaquopy/pip
changes (numpy already bundled).

- Python: android_audio_engine.py — module-level queue + configure/
  push_samples/shutdown mirroring mediaprojection_engine; AndroidAudioEngine
  (priority 100) registered behind a guarded import. push_samples copies and
  defensively trims/clamps each block so the analyzer can't crash on
  variable-length or non-frame-divisible PCM.
- Kotlin: AudioCapture.kt — AudioRecord + AudioPlaybackCaptureConfiguration,
  fixed chunk-size block framing, little-endian float32, mic fallback;
  reads back the actual negotiated channel/sample rate. PythonBridge gains
  configureAudio/pushAudio/shutdownAudio with a cached module handle.
- Wiring: CaptureService starts/stops AudioCapture in the MediaProjection
  path (gated on API>=29 + RECORD_AUDIO + live projection); MainActivity
  requests RECORD_AUDIO; manifest declares it. Degrades gracefully when
  denied; root path stays audio-less by design.
- Tests: 13 desktop-CI tests incl. an over-length/non-divisible regression
  guard that exercises the full read_chunk -> AudioAnalyzer.analyze path.
This commit is contained in:
2026-06-02 03:28:22 +03:00
parent 669ae20824
commit fd62db1720
8 changed files with 833 additions and 0 deletions
+17
View File
@@ -38,6 +38,19 @@ try:
except ImportError:
_has_sounddevice = False
# Android playback-capture engine — pure Python (numpy only), but the
# guard keeps the registration pattern uniform and tolerant of any future
# import-time dependency.
try:
from ledgrab.core.audio.android_audio_engine import (
AndroidAudioEngine,
AndroidAudioCaptureStream,
)
_has_android_audio = True
except ImportError:
_has_android_audio = False
from ledgrab.core.audio.demo_engine import DemoAudioEngine, DemoAudioCaptureStream
# Auto-register available engines
@@ -45,6 +58,8 @@ if _has_wasapi:
AudioEngineRegistry.register(WasapiEngine)
if _has_sounddevice:
AudioEngineRegistry.register(SounddeviceEngine)
if _has_android_audio:
AudioEngineRegistry.register(AndroidAudioEngine)
AudioEngineRegistry.register(DemoAudioEngine)
__all__ = [
@@ -65,3 +80,5 @@ if _has_wasapi:
__all__ += ["WasapiEngine", "WasapiCaptureStream"]
if _has_sounddevice:
__all__ += ["SounddeviceEngine", "SounddeviceCaptureStream"]
if _has_android_audio:
__all__ += ["AndroidAudioEngine", "AndroidAudioCaptureStream"]
@@ -0,0 +1,229 @@
"""Android playback-capture audio engine.
Receives PCM pushed from Kotlin (via Chaquopy) through a module-level
sample queue. The Kotlin layer captures system playback audio with
``AudioRecord`` + ``AudioPlaybackCaptureConfiguration`` (reusing the
app's ``MediaProjection`` token) and calls :func:`push_samples` with
interleaved float32 PCM for each fixed-size block.
Mirrors the screen-capture bridge
(``core/capture_engines/mediaprojection_engine.py``): a module-level
queue plus ``configure`` / ``push_samples`` / ``shutdown`` filled by
Kotlin, consumed through the standard :class:`AudioCaptureStreamBase`
interface so :class:`~ledgrab.core.audio.audio_capture.ManagedAudioStream`
and :class:`~ledgrab.core.audio.analysis.AudioAnalyzer` work unchanged.
This engine is only available when running inside the LedGrab Android
app, which has set up the sample queue via :func:`configure`.
"""
import queue
from typing import Any, Dict, List
import numpy as np
from ledgrab.core.audio.base import (
AudioCaptureEngine,
AudioCaptureStreamBase,
AudioDeviceInfo,
)
from ledgrab.utils import get_logger
from ledgrab.utils.platform import is_android
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Sample queue — the bridge between Kotlin and Python
# ---------------------------------------------------------------------------
_pcm_queue: "queue.Queue[np.ndarray]" = queue.Queue(maxsize=8)
_sample_rate = 48000
_channels = 2
_chunk_size = 1024
_active = False
_frames_received = 0
def configure(sample_rate: int, channels: int, chunk_size: int) -> None:
"""Set the stream format. Called from Kotlin before frames flow.
Drains any stale PCM from a previous capture session so the first
chunk after a restart is actually current. ``channels`` /
``sample_rate`` should be the values the Kotlin ``AudioRecord``
actually negotiated (which can differ from the requested values,
e.g. a stereo request that falls back to mono) — the analyzer keys
off these, so they must match the interleaving of pushed samples.
"""
global _sample_rate, _channels, _chunk_size, _active, _frames_received
while not _pcm_queue.empty():
try:
_pcm_queue.get_nowait()
except queue.Empty:
break
_sample_rate = sample_rate
_channels = max(1, channels)
_chunk_size = max(1, chunk_size)
_frames_received = 0
_active = True
logger.info(
"Android audio engine configured: sr=%d channels=%d chunk=%d",
_sample_rate,
_channels,
_chunk_size,
)
def push_samples(pcm_float32: bytes) -> None:
"""Push one interleaved float32 PCM block from Kotlin.
The byte buffer is interpreted as native-endian float32 (Kotlin
packs little-endian; all Android ABIs are little-endian). Drops the
oldest queued block if the consumer is slow (non-blocking).
Defensive framing: the downstream :class:`AudioAnalyzer` reshapes to
``(-1, channels)`` and copies into ``chunk_size``-sized scratch
buffers, so it raises on a block whose length is not a whole number
of frames or that exceeds ``chunk_size`` frames. We trim to a whole
multiple of ``_channels`` and clamp to ``_chunk_size`` frames so a
malformed push can never crash the capture thread.
"""
global _frames_received
# np.frombuffer raises if the length isn't a whole number of float32s.
# Kotlin always pushes complete blocks, but guard so a malformed buffer is
# dropped here rather than surfacing as an exception across the JNI bridge.
if len(pcm_float32) % 4 != 0:
return
samples = np.frombuffer(pcm_float32, dtype=np.float32)
# Trim to whole frames, then clamp to chunk_size frames.
frames = len(samples) // _channels
if frames <= 0:
return
frames = min(frames, _chunk_size)
usable = frames * _channels
# Copy out of the read-only frombuffer view so the queued block owns its
# memory. This lets the Kotlin side push from a reusable buffer (low GC on
# low-end TV boxes) without the not-yet-consumed queued block aliasing
# bytes Kotlin is about to overwrite. Mirrors mediaprojection_engine's
# push_frame .copy().
block = samples[:usable].copy()
_frames_received += 1
if _frames_received == 1 or _frames_received % 100 == 0:
logger.info("Android audio: received %d blocks", _frames_received)
try:
_pcm_queue.put_nowait(block)
except queue.Full:
try:
_pcm_queue.get_nowait()
except queue.Empty:
pass
try:
_pcm_queue.put_nowait(block)
except queue.Full:
pass
def shutdown() -> None:
"""Deactivate the engine. Called when the Android app stops audio."""
global _active
_active = False
logger.info("Android audio engine shut down")
# ---------------------------------------------------------------------------
# CaptureStream
# ---------------------------------------------------------------------------
class AndroidAudioCaptureStream(AudioCaptureStreamBase):
"""Reads PCM blocks pushed by Kotlin from the module-level queue."""
@property
def channels(self) -> int:
return _channels
@property
def sample_rate(self) -> int:
return _sample_rate
@property
def chunk_size(self) -> int:
return _chunk_size
def initialize(self) -> None:
if self._initialized:
return
if not _active:
raise RuntimeError(
"Android audio engine not configured. "
"This engine is only available inside the Android app."
)
self._initialized = True
logger.info("Android audio capture stream initialized")
def cleanup(self) -> None:
self._initialized = False
logger.info("Android audio capture stream cleaned up")
def read_chunk(self) -> np.ndarray | None:
try:
return _pcm_queue.get(timeout=0.1) # 1-D float32 interleaved
except queue.Empty:
return None
# ---------------------------------------------------------------------------
# CaptureEngine
# ---------------------------------------------------------------------------
class AndroidAudioEngine(AudioCaptureEngine):
"""Android playback-capture audio engine.
Only available when running inside the LedGrab Android app, which
calls :func:`configure` once audio capture is set up. Exposes a
single loopback "device" representing the system audio mix.
"""
ENGINE_TYPE = "android_playback"
ENGINE_PRIORITY = 100 # highest on a real Android device (demo only wins in demo mode)
@classmethod
def is_available(cls) -> bool:
return is_android() and _active
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {
"sample_rate": _sample_rate,
"channels": _channels,
"chunk_size": _chunk_size,
}
@classmethod
def enumerate_devices(cls) -> List[AudioDeviceInfo]:
if not cls.is_available():
return []
return [
AudioDeviceInfo(
index=0,
name="Android playback (system audio)",
is_input=True,
is_loopback=True,
channels=_channels,
default_samplerate=float(_sample_rate),
)
]
@classmethod
def create_stream(
cls,
device_index: int,
is_loopback: bool,
config: Dict[str, Any],
) -> AndroidAudioCaptureStream:
merged = {**cls.get_default_config(), **config}
return AndroidAudioCaptureStream(device_index, is_loopback, merged)
@@ -0,0 +1,253 @@
"""Tests for the Android playback-capture audio engine.
These run on desktop CI (no Android device needed): ``is_android`` is
monkeypatched and PCM is pushed directly into the module-level queue,
exactly as the Kotlin bridge would.
"""
import queue
import numpy as np
import pytest
# Importing the package triggers auto-registration of AndroidAudioEngine.
import ledgrab.core.audio # noqa: F401
from ledgrab.core.audio import android_audio_engine as eng
from ledgrab.core.audio.analysis import AudioAnalysis, AudioAnalyzer
from ledgrab.core.audio.audio_capture import AudioCaptureManager
from ledgrab.core.audio.factory import AudioEngineRegistry
ENGINE_MOD = "ledgrab.core.audio.android_audio_engine"
SAMPLE_RATE = 48000
CHANNELS = 2
CHUNK = 1024
# ---------------------------------------------------------------------------
# Helpers / fixtures
# ---------------------------------------------------------------------------
def _drain() -> None:
while not eng._pcm_queue.empty():
try:
eng._pcm_queue.get_nowait()
except queue.Empty:
break
def _block(marker: float = 0.0, frames: int = CHUNK, channels: int = CHANNELS) -> np.ndarray:
"""A float32 interleaved block whose first sample is ``marker``."""
data = np.zeros(frames * channels, dtype=np.float32)
data[0] = marker
return data
@pytest.fixture
def reset_engine():
"""Reset module-global engine state; snapshot/restore the registry.
The engine keeps its queue + format in module globals and the registry
is a class-level singleton — both must be restored so this test file
never disturbs the desktop engines other tests rely on.
"""
saved_engines = dict(AudioEngineRegistry._engines)
eng.shutdown()
_drain()
eng._sample_rate = SAMPLE_RATE
eng._channels = CHANNELS
eng._chunk_size = CHUNK
eng._frames_received = 0
yield eng
eng.shutdown()
_drain()
AudioEngineRegistry._engines.clear()
AudioEngineRegistry._engines.update(saved_engines)
@pytest.fixture
def on_android(monkeypatch, reset_engine):
"""Engine fixture with ``is_android`` forced True and demo mode off."""
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr("ledgrab.core.audio.factory.is_demo_mode", lambda: False)
return reset_engine
# ---------------------------------------------------------------------------
# Queue / push contract
# ---------------------------------------------------------------------------
def test_configure_then_push_round_trips_samples(reset_engine):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
samples = np.arange(CHUNK * CHANNELS, dtype=np.float32)
# Act
eng.push_samples(samples.tobytes())
stream = eng.AndroidAudioEngine.create_stream(0, True, {})
stream.initialize()
got = stream.read_chunk()
# Assert
assert got is not None
np.testing.assert_array_equal(got, samples)
def test_queue_drops_oldest_when_full(reset_engine):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
maxsize = eng._pcm_queue.maxsize # 8
# Act — push more blocks than the queue can hold, each tagged 0..N-1
total = maxsize + 2
for i in range(total):
eng.push_samples(_block(marker=float(i)).tobytes())
drained = []
while True:
try:
drained.append(eng._pcm_queue.get_nowait())
except queue.Empty:
break
# Assert — only the newest `maxsize` blocks survived, oldest dropped
assert len(drained) == maxsize
markers = [int(b[0]) for b in drained]
assert markers == list(range(total - maxsize, total))
def test_initialize_raises_when_not_configured(reset_engine):
# Arrange — fixture left the engine inactive
stream = eng.AndroidAudioEngine.create_stream(0, True, {})
# Act / Assert
with pytest.raises(RuntimeError):
stream.initialize()
def test_read_chunk_returns_none_when_empty(reset_engine):
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
stream = eng.AndroidAudioEngine.create_stream(0, True, {})
stream.initialize()
assert stream.read_chunk() is None
# ---------------------------------------------------------------------------
# Availability / enumeration (platform-gated)
# ---------------------------------------------------------------------------
def test_is_available_requires_android_and_active(monkeypatch, reset_engine):
# Not configured yet → inactive → unavailable even on Android.
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
assert eng.AndroidAudioEngine.is_available() is False
# Configured → active + Android → available.
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
assert eng.AndroidAudioEngine.is_available() is True
# Active but not on Android → unavailable.
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: False)
assert eng.AndroidAudioEngine.is_available() is False
def test_enumerate_devices(on_android):
# Inactive → no devices.
assert eng.AndroidAudioEngine.enumerate_devices() == []
# Active → exactly one loopback device.
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
devices = eng.AndroidAudioEngine.enumerate_devices()
assert len(devices) == 1
dev = devices[0]
assert dev.is_loopback is True
assert dev.is_input is True
assert "Android playback" in dev.name
assert dev.channels == CHANNELS
# ---------------------------------------------------------------------------
# Regression guard — the analyzer must never crash on a malformed block
# (over-length or non-frame-divisible). This is the on-device failure the
# plan review surfaced; the desktop suite must catch it.
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
"raw_floats",
[
(CHUNK + 100) * CHANNELS, # over-length (more frames than chunk_size)
CHUNK * CHANNELS + 1, # not a whole number of stereo frames
3, # tiny + odd
CHUNK * CHANNELS, # exact (control)
],
)
def test_pushed_block_never_crashes_analyzer(reset_engine, raw_floats):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
pcm = np.random.default_rng(0).standard_normal(raw_floats).astype(np.float32)
analyzer = AudioAnalyzer(sample_rate=SAMPLE_RATE, chunk_size=CHUNK)
stream = eng.AndroidAudioEngine.create_stream(0, True, {})
stream.initialize()
# Act
eng.push_samples(pcm.tobytes())
chunk = stream.read_chunk()
# Assert — chunk is a safe shape and analyze() does not raise.
assert chunk is not None
assert len(chunk) % CHANNELS == 0
assert len(chunk) <= CHUNK * CHANNELS
analysis = analyzer.analyze(chunk, CHANNELS)
assert isinstance(analysis, AudioAnalysis)
# ---------------------------------------------------------------------------
# Registry integration
# ---------------------------------------------------------------------------
def test_best_available_engine_is_android_when_active(on_android):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
# Act
best = AudioEngineRegistry.get_best_available_engine()
# Assert — priority 100 beats every desktop engine; demo only wins in demo mode.
assert best == "android_playback"
def test_stream_via_registry_yields_pushed_chunk(on_android):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
samples = np.linspace(-1.0, 1.0, CHUNK * CHANNELS, dtype=np.float32)
# Act
stream = AudioEngineRegistry.create_stream("android_playback", 0, True, {})
stream.initialize()
eng.push_samples(samples.tobytes())
got = stream.read_chunk()
# Assert
assert stream.channels == CHANNELS
assert stream.sample_rate == SAMPLE_RATE
assert stream.chunk_size == CHUNK
np.testing.assert_array_equal(got, samples)
def test_device_surfaces_through_capture_manager(on_android):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
# Act
devices = AudioCaptureManager.enumerate_devices()
# Assert — the Android device is enumerated and tagged with its engine.
android = [d for d in devices if d["engine_type"] == "android_playback"]
assert len(android) == 1
assert android[0]["name"] == "Android playback (system audio)"
assert android[0]["is_loopback"] is True