Merge feature/android-audio-capture: Android on-device audio capture

Push-based AndroidAudioEngine (AudioPlaybackCapture, API 29+) reusing the
MediaProjection token, feeding the unchanged AudioAnalyzer. Build green
(assembleDebug), 1854+13 tests pass. Reviewed: READY/SECURE, 0 blockers.
This commit is contained in:
2026-06-02 03:28:37 +03:00
8 changed files with 833 additions and 0 deletions
+8
View File
@@ -39,6 +39,14 @@
<!-- POST_NOTIFICATIONS for Android 13+ foreground service notification -->
<uses-permission android:name="android.permission.POST_NOTIFICATIONS" />
<!-- RECORD_AUDIO for on-device system-playback capture (AudioPlaybackCapture,
API 29+) feeding audio-reactive lighting. Runtime "dangerous" permission,
requested in MainActivity; capture degrades gracefully when denied.
Playback capture runs under the existing mediaProjection FGS type, so no
FOREGROUND_SERVICE_MICROPHONE / microphone FGS type is needed (that would
only be required if the mic-fallback path ran inside the service). -->
<uses-permission android:name="android.permission.RECORD_AUDIO" />
<!-- Autostart on boot — BootReceiver spawns CaptureService in root
mode so capture resumes without the user touching the remote. -->
<uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />
@@ -0,0 +1,234 @@
package com.ledgrab.android
import android.annotation.SuppressLint
import android.media.AudioAttributes
import android.media.AudioFormat
import android.media.AudioPlaybackCaptureConfiguration
import android.media.AudioRecord
import android.media.MediaRecorder
import android.media.projection.MediaProjection
import android.os.Build
import android.util.Log
import java.nio.ByteBuffer
import java.nio.ByteOrder
/**
* Captures audio with [AudioRecord] and pushes interleaved float32 PCM to
* the LedGrab Python server via [PythonBridge], where the
* `android_audio_engine` feeds it into the unchanged audio-analysis
* pipeline.
*
* Two sources:
* - [start] — system playback capture via `AudioPlaybackCapture` (API 29+),
* reusing the same [MediaProjection] token the app already holds for
* screen capture. This is the primary path on the consent flow.
* - [startMic] — microphone fallback (`AudioSource.MIC`) for paths with no
* MediaProjection (root mode) or API < 29.
*
* Mirrors [ScreenCapture]'s shape: a dedicated capture thread, a single
* reusable cross-JNI buffer (no per-block allocation → no GC churn on
* low-end TV boxes), and graceful teardown in [stop].
*
* The capture format is negotiated by [AudioRecord]; the **actual**
* channel count and sample rate are read back and forwarded to
* `configureAudio` so the Python analyzer's interleaving matches the bytes
* we push (e.g. a stereo request that the device satisfies as mono).
*/
class AudioCapture(
private val projection: MediaProjection?,
private val bridge: PythonBridge,
private val sampleRate: Int = 48000,
private val channels: Int = 2,
private val chunkFrames: Int = 1024,
) {
companion object {
private const val TAG = "AudioCapture"
private const val BYTES_PER_FLOAT = 4
}
private var audioRecord: AudioRecord? = null
private var captureThread: Thread? = null
@Volatile private var running = false
/**
* Start system playback capture (API 29+). Requires the app to hold
* RECORD_AUDIO and a valid [projection]. Returns true if capture began.
*/
@SuppressLint("MissingPermission")
fun start(): Boolean {
if (running) return true
if (Build.VERSION.SDK_INT < Build.VERSION_CODES.Q) {
Log.i(TAG, "Playback capture needs API 29+; skipping (have ${Build.VERSION.SDK_INT})")
return false
}
val proj = projection
if (proj == null) {
Log.i(TAG, "No MediaProjection; playback capture unavailable")
return false
}
val config = AudioPlaybackCaptureConfiguration.Builder(proj)
.addMatchingUsage(AudioAttributes.USAGE_MEDIA)
.addMatchingUsage(AudioAttributes.USAGE_GAME)
.addMatchingUsage(AudioAttributes.USAGE_UNKNOWN)
.build()
val record = try {
AudioRecord.Builder()
.setAudioFormat(audioFormat())
.setBufferSizeInBytes(bufferBytes())
.setAudioPlaybackCaptureConfig(config)
.build()
} catch (e: Exception) {
Log.e(TAG, "Failed to build playback AudioRecord: ${e.message}")
return false
}
return begin(record, "playback")
}
/**
* Start microphone capture (fallback). Works on API 24+ and needs no
* MediaProjection. Requires RECORD_AUDIO. Returns true if capture began.
*
* ⚠️ SECURITY/POLICY: currently UNWIRED (no caller). Microphone capture is
* a materially different posture than playback capture — it records real
* room audio (bystander voices). Before wiring this into [CaptureService]:
* - add FOREGROUND_SERVICE_MICROPHONE permission + the `microphone` FGS
* type (on API 34+ the service is killed without it), and
* - add the Play Store privacy disclosure for microphone use,
* - re-trigger a security review.
* Do NOT call this from inside the foreground service without the above.
*/
@SuppressLint("MissingPermission")
fun startMic(): Boolean {
if (running) return true
val record = try {
AudioRecord.Builder()
.setAudioSource(MediaRecorder.AudioSource.MIC)
.setAudioFormat(audioFormat())
.setBufferSizeInBytes(bufferBytes())
.build()
} catch (e: Exception) {
Log.e(TAG, "Failed to build mic AudioRecord: ${e.message}")
return false
}
return begin(record, "mic")
}
/** Stop capturing and release all resources. Idempotent. */
fun stop() {
running = false
// AudioRecord.stop() unblocks a pending READ_BLOCKING read within
// milliseconds, so the loop sees running=false and returns well inside
// the 500ms join window — release() below won't race a live read.
// (Mirrors ScreenCapture's bounded join.)
runCatching { audioRecord?.stop() }
captureThread?.let { runCatching { it.join(500) } }
captureThread = null
runCatching { audioRecord?.release() }
audioRecord = null
runCatching { bridge.shutdownAudio() }
Log.i(TAG, "Audio capture stopped")
}
// ── internals ──────────────────────────────────────────────────────
private fun begin(record: AudioRecord, mode: String): Boolean {
if (record.state != AudioRecord.STATE_INITIALIZED) {
Log.e(TAG, "AudioRecord ($mode) failed to initialize")
runCatching { record.release() }
return false
}
val actualChannels = record.channelCount.coerceAtLeast(1)
val actualRate = record.sampleRate
// Confirm recording actually started before reporting success —
// startRecording() can throw (exclusive-capture contention) or
// leave the record in a non-recording state, in which case read()
// would only ever return errors.
val started = runCatching { record.startRecording() }.isSuccess &&
record.recordingState == AudioRecord.RECORDSTATE_RECORDING
if (!started) {
Log.e(TAG, "AudioRecord ($mode) failed to start recording")
runCatching { record.release() }
return false
}
// Recording confirmed — tell Python the real negotiated format
// before frames flow, so the analyzer's channel/sample-rate match
// the interleaving we push.
bridge.configureAudio(actualRate, actualChannels, chunkFrames)
audioRecord = record
running = true
captureThread = Thread(
{ captureLoop(record, actualChannels) },
"LedGrab-AudioCapture",
).also { it.start() }
Log.i(TAG, "Audio capture started ($mode, sr=$actualRate ch=$actualChannels chunk=$chunkFrames)")
return true
}
/**
* Blocking read loop. Accumulates into fixed `chunkFrames * channels`
* float blocks and pushes only COMPLETE blocks — [AudioRecord.read]
* returns a variable count, so partial reads are stitched here rather
* than handed to Python as ragged chunks (the analyzer requires
* whole-frame, ≤ chunk-size blocks).
*/
private fun captureLoop(record: AudioRecord, actualChannels: Int) {
val blockFloats = chunkFrames * actualChannels
val floatBuf = FloatArray(blockFloats)
// Reusable little-endian byte buffer — Python copies on push, so the
// same backing array is safe to overwrite next block. Default
// ByteBuffer order is BIG_ENDIAN, which would corrupt every sample;
// LITTLE_ENDIAN matches numpy's native float32 on all Android ABIs.
val byteBuf = ByteArray(blockFloats * BYTES_PER_FLOAT)
val floatView = ByteBuffer.wrap(byteBuf).order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer()
var filled = 0
while (running) {
val n = record.read(floatBuf, filled, blockFloats - filled, AudioRecord.READ_BLOCKING)
if (n < 0) {
if (running) {
// A negative read (e.g. ERROR_DEAD_OBJECT after an audio-route
// change, ERROR_INVALID_OPERATION) means this AudioRecord is
// finished. Deactivate the Python engine so is_available() stops
// advertising a dead stream and the audio-reactive consumer isn't
// left polling an empty queue forever. We're on the capture thread,
// so we can't call stop() (it would self-join) — just flip running
// and shut the engine down; onDestroy's stop() releases the record.
Log.w(TAG, "AudioRecord.read error: $n — stopping audio capture")
running = false
runCatching { bridge.shutdownAudio() }
}
break
}
filled += n
if (filled < blockFloats) continue
floatView.clear()
floatView.put(floatBuf, 0, blockFloats)
bridge.pushAudio(byteBuf)
filled = 0
}
}
private fun channelMask(): Int =
if (channels >= 2) AudioFormat.CHANNEL_IN_STEREO else AudioFormat.CHANNEL_IN_MONO
private fun audioFormat(): AudioFormat =
AudioFormat.Builder()
.setEncoding(AudioFormat.ENCODING_PCM_FLOAT)
.setSampleRate(sampleRate)
.setChannelMask(channelMask())
.build()
private fun bufferBytes(): Int {
val minBuf = AudioRecord.getMinBufferSize(sampleRate, channelMask(), AudioFormat.ENCODING_PCM_FLOAT)
// A few blocks of headroom so a slow consumer doesn't overrun the
// hardware buffer between reads.
val want = chunkFrames * channels * BYTES_PER_FLOAT * 4
return if (minBuf > 0) maxOf(minBuf, want) else want
}
}
@@ -4,9 +4,11 @@ import android.app.Notification
import android.app.NotificationChannel
import android.app.NotificationManager
import android.app.PendingIntent
import android.Manifest
import android.app.Service
import android.content.Context
import android.content.Intent
import android.content.pm.PackageManager
import android.content.pm.ServiceInfo
import android.media.projection.MediaProjection
import android.media.projection.MediaProjectionManager
@@ -85,6 +87,7 @@ class CaptureService : Service() {
private var bridge: PythonBridge? = null
private var screenCapture: ScreenCapture? = null
private var rootCapture: RootScreenrecord? = null
private var audioCapture: AudioCapture? = null
private var mediaProjection: MediaProjection? = null
// Service-scoped coroutine scope for the root-capture watchdog.
@@ -338,6 +341,25 @@ class CaptureService : Service() {
onProjectionStopped = { stopSelf() },
).also { it.start() }
// Reuse the same projection to capture system playback audio so
// audio-reactive lighting works on-device (API 29+, RECORD_AUDIO
// granted). Best-effort: screen capture and the server keep running
// if audio is unavailable. Started AFTER ScreenCapture so the
// projection's callback is already registered.
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q &&
checkSelfPermission(Manifest.permission.RECORD_AUDIO) ==
PackageManager.PERMISSION_GRANTED
) {
audioCapture = AudioCapture(projection, newBridge).also { ac ->
if (!ac.start()) {
Log.i(TAG, "Playback audio capture unavailable — continuing without audio")
audioCapture = null
}
}
} else {
Log.i(TAG, "RECORD_AUDIO not granted or API < 29 — audio-reactive capture disabled")
}
Log.i(TAG, "LedGrab service started (MediaProjection) — web UI at $url")
}
@@ -351,6 +373,10 @@ class CaptureService : Service() {
screenCapture?.stop()
screenCapture = null
// Stop audio before the server: stop() calls bridge.shutdownAudio().
audioCapture?.stop()
audioCapture = null
rootCapture?.stop()
rootCapture = null
@@ -53,6 +53,7 @@ class MainActivity : Activity() {
private const val SERVER_PORT = 8080
private const val REQUEST_MEDIA_PROJECTION = 1001
private const val REQUEST_POST_NOTIFICATIONS = 1002
private const val REQUEST_RECORD_AUDIO = 1003
private const val QR_SIZE_PX = 560
}
@@ -215,6 +216,7 @@ class MainActivity : Activity() {
private fun startCaptureService(resultCode: Int, resultData: Intent) {
ensureNotificationPermission()
ensureAudioPermission()
val intent = CaptureService.createIntent(this, resultCode, resultData)
ContextCompat.startForegroundService(this, intent)
updateUI()
@@ -471,4 +473,24 @@ class MainActivity : Activity() {
}
}
}
/**
* Request RECORD_AUDIO (API 29+) so the capture service can capture
* system playback audio for audio-reactive lighting. Fire-and-forget,
* like [ensureNotificationPermission]: capture still works without it
* (just no audio), so we don't block on the result. If first granted
* here, audio becomes available on the next Start.
*/
private fun ensureAudioPermission() {
if (Build.VERSION.SDK_INT < Build.VERSION_CODES.Q) return
if (checkSelfPermission(Manifest.permission.RECORD_AUDIO)
!= PackageManager.PERMISSION_GRANTED
) {
@Suppress("DEPRECATION")
requestPermissions(
arrayOf(Manifest.permission.RECORD_AUDIO),
REQUEST_RECORD_AUDIO,
)
}
}
}
@@ -28,6 +28,7 @@ class PythonBridge(private val context: Context) {
// single-writer/single-reader pattern we have here.
@Volatile private var mediaProjectionEngine: PyObject? = null
@Volatile private var rootEngine: PyObject? = null
@Volatile private var androidAudioEngine: PyObject? = null
/**
* Configure the MediaProjection engine with screen dimensions.
@@ -53,6 +54,49 @@ class PythonBridge(private val context: Context) {
Log.i(TAG, "Root screenrecord engine configured: ${width}x${height}")
}
/**
* Configure the Android playback-capture audio engine with the format
* actually negotiated by [AudioCapture]'s `AudioRecord`. Must be called
* before [pushAudio]. Caches the module handle for the per-block fast
* path (same pattern as [configureCapture]).
*/
fun configureAudio(sampleRate: Int, channels: Int, chunkFrames: Int) {
val py = Python.getInstance()
val engine = py.getModule("ledgrab.core.audio.android_audio_engine")
engine.callAttr("configure", sampleRate, channels, chunkFrames)
androidAudioEngine = engine
Log.i(TAG, "Android audio engine configured: sr=$sampleRate ch=$channels chunk=$chunkFrames")
}
/**
* Push one interleaved little-endian float32 PCM block to the Python
* audio engine. Called from [AudioCapture]'s capture thread. The byte
* array crosses the JNI boundary; Python copies it on receipt, so the
* caller may reuse the same buffer for the next block.
*/
fun pushAudio(pcmFloat32: ByteArray) {
if (!running) return
val engine = androidAudioEngine ?: return
try {
engine.callAttr("push_samples", pcmFloat32)
} catch (e: Exception) {
Log.w(TAG, "Failed to push audio: ${e.message}")
}
}
/**
* Deactivate the Python audio engine. Called from [AudioCapture.stop].
*/
fun shutdownAudio() {
val engine = androidAudioEngine ?: return
try {
engine.callAttr("shutdown")
} catch (e: Exception) {
Log.w(TAG, "Failed to shut down audio engine: ${e.message}")
}
androidAudioEngine = null
}
/**
* Start the LedGrab FastAPI server on a background thread.
*
+17
View File
@@ -38,6 +38,19 @@ try:
except ImportError:
_has_sounddevice = False
# Android playback-capture engine — pure Python (numpy only), but the
# guard keeps the registration pattern uniform and tolerant of any future
# import-time dependency.
try:
from ledgrab.core.audio.android_audio_engine import (
AndroidAudioEngine,
AndroidAudioCaptureStream,
)
_has_android_audio = True
except ImportError:
_has_android_audio = False
from ledgrab.core.audio.demo_engine import DemoAudioEngine, DemoAudioCaptureStream
# Auto-register available engines
@@ -45,6 +58,8 @@ if _has_wasapi:
AudioEngineRegistry.register(WasapiEngine)
if _has_sounddevice:
AudioEngineRegistry.register(SounddeviceEngine)
if _has_android_audio:
AudioEngineRegistry.register(AndroidAudioEngine)
AudioEngineRegistry.register(DemoAudioEngine)
__all__ = [
@@ -65,3 +80,5 @@ if _has_wasapi:
__all__ += ["WasapiEngine", "WasapiCaptureStream"]
if _has_sounddevice:
__all__ += ["SounddeviceEngine", "SounddeviceCaptureStream"]
if _has_android_audio:
__all__ += ["AndroidAudioEngine", "AndroidAudioCaptureStream"]
@@ -0,0 +1,229 @@
"""Android playback-capture audio engine.
Receives PCM pushed from Kotlin (via Chaquopy) through a module-level
sample queue. The Kotlin layer captures system playback audio with
``AudioRecord`` + ``AudioPlaybackCaptureConfiguration`` (reusing the
app's ``MediaProjection`` token) and calls :func:`push_samples` with
interleaved float32 PCM for each fixed-size block.
Mirrors the screen-capture bridge
(``core/capture_engines/mediaprojection_engine.py``): a module-level
queue plus ``configure`` / ``push_samples`` / ``shutdown`` filled by
Kotlin, consumed through the standard :class:`AudioCaptureStreamBase`
interface so :class:`~ledgrab.core.audio.audio_capture.ManagedAudioStream`
and :class:`~ledgrab.core.audio.analysis.AudioAnalyzer` work unchanged.
This engine is only available when running inside the LedGrab Android
app, which has set up the sample queue via :func:`configure`.
"""
import queue
from typing import Any, Dict, List
import numpy as np
from ledgrab.core.audio.base import (
AudioCaptureEngine,
AudioCaptureStreamBase,
AudioDeviceInfo,
)
from ledgrab.utils import get_logger
from ledgrab.utils.platform import is_android
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Sample queue — the bridge between Kotlin and Python
# ---------------------------------------------------------------------------
_pcm_queue: "queue.Queue[np.ndarray]" = queue.Queue(maxsize=8)
_sample_rate = 48000
_channels = 2
_chunk_size = 1024
_active = False
_frames_received = 0
def configure(sample_rate: int, channels: int, chunk_size: int) -> None:
"""Set the stream format. Called from Kotlin before frames flow.
Drains any stale PCM from a previous capture session so the first
chunk after a restart is actually current. ``channels`` /
``sample_rate`` should be the values the Kotlin ``AudioRecord``
actually negotiated (which can differ from the requested values,
e.g. a stereo request that falls back to mono) — the analyzer keys
off these, so they must match the interleaving of pushed samples.
"""
global _sample_rate, _channels, _chunk_size, _active, _frames_received
while not _pcm_queue.empty():
try:
_pcm_queue.get_nowait()
except queue.Empty:
break
_sample_rate = sample_rate
_channels = max(1, channels)
_chunk_size = max(1, chunk_size)
_frames_received = 0
_active = True
logger.info(
"Android audio engine configured: sr=%d channels=%d chunk=%d",
_sample_rate,
_channels,
_chunk_size,
)
def push_samples(pcm_float32: bytes) -> None:
"""Push one interleaved float32 PCM block from Kotlin.
The byte buffer is interpreted as native-endian float32 (Kotlin
packs little-endian; all Android ABIs are little-endian). Drops the
oldest queued block if the consumer is slow (non-blocking).
Defensive framing: the downstream :class:`AudioAnalyzer` reshapes to
``(-1, channels)`` and copies into ``chunk_size``-sized scratch
buffers, so it raises on a block whose length is not a whole number
of frames or that exceeds ``chunk_size`` frames. We trim to a whole
multiple of ``_channels`` and clamp to ``_chunk_size`` frames so a
malformed push can never crash the capture thread.
"""
global _frames_received
# np.frombuffer raises if the length isn't a whole number of float32s.
# Kotlin always pushes complete blocks, but guard so a malformed buffer is
# dropped here rather than surfacing as an exception across the JNI bridge.
if len(pcm_float32) % 4 != 0:
return
samples = np.frombuffer(pcm_float32, dtype=np.float32)
# Trim to whole frames, then clamp to chunk_size frames.
frames = len(samples) // _channels
if frames <= 0:
return
frames = min(frames, _chunk_size)
usable = frames * _channels
# Copy out of the read-only frombuffer view so the queued block owns its
# memory. This lets the Kotlin side push from a reusable buffer (low GC on
# low-end TV boxes) without the not-yet-consumed queued block aliasing
# bytes Kotlin is about to overwrite. Mirrors mediaprojection_engine's
# push_frame .copy().
block = samples[:usable].copy()
_frames_received += 1
if _frames_received == 1 or _frames_received % 100 == 0:
logger.info("Android audio: received %d blocks", _frames_received)
try:
_pcm_queue.put_nowait(block)
except queue.Full:
try:
_pcm_queue.get_nowait()
except queue.Empty:
pass
try:
_pcm_queue.put_nowait(block)
except queue.Full:
pass
def shutdown() -> None:
"""Deactivate the engine. Called when the Android app stops audio."""
global _active
_active = False
logger.info("Android audio engine shut down")
# ---------------------------------------------------------------------------
# CaptureStream
# ---------------------------------------------------------------------------
class AndroidAudioCaptureStream(AudioCaptureStreamBase):
"""Reads PCM blocks pushed by Kotlin from the module-level queue."""
@property
def channels(self) -> int:
return _channels
@property
def sample_rate(self) -> int:
return _sample_rate
@property
def chunk_size(self) -> int:
return _chunk_size
def initialize(self) -> None:
if self._initialized:
return
if not _active:
raise RuntimeError(
"Android audio engine not configured. "
"This engine is only available inside the Android app."
)
self._initialized = True
logger.info("Android audio capture stream initialized")
def cleanup(self) -> None:
self._initialized = False
logger.info("Android audio capture stream cleaned up")
def read_chunk(self) -> np.ndarray | None:
try:
return _pcm_queue.get(timeout=0.1) # 1-D float32 interleaved
except queue.Empty:
return None
# ---------------------------------------------------------------------------
# CaptureEngine
# ---------------------------------------------------------------------------
class AndroidAudioEngine(AudioCaptureEngine):
"""Android playback-capture audio engine.
Only available when running inside the LedGrab Android app, which
calls :func:`configure` once audio capture is set up. Exposes a
single loopback "device" representing the system audio mix.
"""
ENGINE_TYPE = "android_playback"
ENGINE_PRIORITY = 100 # highest on a real Android device (demo only wins in demo mode)
@classmethod
def is_available(cls) -> bool:
return is_android() and _active
@classmethod
def get_default_config(cls) -> Dict[str, Any]:
return {
"sample_rate": _sample_rate,
"channels": _channels,
"chunk_size": _chunk_size,
}
@classmethod
def enumerate_devices(cls) -> List[AudioDeviceInfo]:
if not cls.is_available():
return []
return [
AudioDeviceInfo(
index=0,
name="Android playback (system audio)",
is_input=True,
is_loopback=True,
channels=_channels,
default_samplerate=float(_sample_rate),
)
]
@classmethod
def create_stream(
cls,
device_index: int,
is_loopback: bool,
config: Dict[str, Any],
) -> AndroidAudioCaptureStream:
merged = {**cls.get_default_config(), **config}
return AndroidAudioCaptureStream(device_index, is_loopback, merged)
@@ -0,0 +1,253 @@
"""Tests for the Android playback-capture audio engine.
These run on desktop CI (no Android device needed): ``is_android`` is
monkeypatched and PCM is pushed directly into the module-level queue,
exactly as the Kotlin bridge would.
"""
import queue
import numpy as np
import pytest
# Importing the package triggers auto-registration of AndroidAudioEngine.
import ledgrab.core.audio # noqa: F401
from ledgrab.core.audio import android_audio_engine as eng
from ledgrab.core.audio.analysis import AudioAnalysis, AudioAnalyzer
from ledgrab.core.audio.audio_capture import AudioCaptureManager
from ledgrab.core.audio.factory import AudioEngineRegistry
ENGINE_MOD = "ledgrab.core.audio.android_audio_engine"
SAMPLE_RATE = 48000
CHANNELS = 2
CHUNK = 1024
# ---------------------------------------------------------------------------
# Helpers / fixtures
# ---------------------------------------------------------------------------
def _drain() -> None:
while not eng._pcm_queue.empty():
try:
eng._pcm_queue.get_nowait()
except queue.Empty:
break
def _block(marker: float = 0.0, frames: int = CHUNK, channels: int = CHANNELS) -> np.ndarray:
"""A float32 interleaved block whose first sample is ``marker``."""
data = np.zeros(frames * channels, dtype=np.float32)
data[0] = marker
return data
@pytest.fixture
def reset_engine():
"""Reset module-global engine state; snapshot/restore the registry.
The engine keeps its queue + format in module globals and the registry
is a class-level singleton — both must be restored so this test file
never disturbs the desktop engines other tests rely on.
"""
saved_engines = dict(AudioEngineRegistry._engines)
eng.shutdown()
_drain()
eng._sample_rate = SAMPLE_RATE
eng._channels = CHANNELS
eng._chunk_size = CHUNK
eng._frames_received = 0
yield eng
eng.shutdown()
_drain()
AudioEngineRegistry._engines.clear()
AudioEngineRegistry._engines.update(saved_engines)
@pytest.fixture
def on_android(monkeypatch, reset_engine):
"""Engine fixture with ``is_android`` forced True and demo mode off."""
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
monkeypatch.setattr("ledgrab.core.audio.factory.is_demo_mode", lambda: False)
return reset_engine
# ---------------------------------------------------------------------------
# Queue / push contract
# ---------------------------------------------------------------------------
def test_configure_then_push_round_trips_samples(reset_engine):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
samples = np.arange(CHUNK * CHANNELS, dtype=np.float32)
# Act
eng.push_samples(samples.tobytes())
stream = eng.AndroidAudioEngine.create_stream(0, True, {})
stream.initialize()
got = stream.read_chunk()
# Assert
assert got is not None
np.testing.assert_array_equal(got, samples)
def test_queue_drops_oldest_when_full(reset_engine):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
maxsize = eng._pcm_queue.maxsize # 8
# Act — push more blocks than the queue can hold, each tagged 0..N-1
total = maxsize + 2
for i in range(total):
eng.push_samples(_block(marker=float(i)).tobytes())
drained = []
while True:
try:
drained.append(eng._pcm_queue.get_nowait())
except queue.Empty:
break
# Assert — only the newest `maxsize` blocks survived, oldest dropped
assert len(drained) == maxsize
markers = [int(b[0]) for b in drained]
assert markers == list(range(total - maxsize, total))
def test_initialize_raises_when_not_configured(reset_engine):
# Arrange — fixture left the engine inactive
stream = eng.AndroidAudioEngine.create_stream(0, True, {})
# Act / Assert
with pytest.raises(RuntimeError):
stream.initialize()
def test_read_chunk_returns_none_when_empty(reset_engine):
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
stream = eng.AndroidAudioEngine.create_stream(0, True, {})
stream.initialize()
assert stream.read_chunk() is None
# ---------------------------------------------------------------------------
# Availability / enumeration (platform-gated)
# ---------------------------------------------------------------------------
def test_is_available_requires_android_and_active(monkeypatch, reset_engine):
# Not configured yet → inactive → unavailable even on Android.
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
assert eng.AndroidAudioEngine.is_available() is False
# Configured → active + Android → available.
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
assert eng.AndroidAudioEngine.is_available() is True
# Active but not on Android → unavailable.
monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: False)
assert eng.AndroidAudioEngine.is_available() is False
def test_enumerate_devices(on_android):
# Inactive → no devices.
assert eng.AndroidAudioEngine.enumerate_devices() == []
# Active → exactly one loopback device.
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
devices = eng.AndroidAudioEngine.enumerate_devices()
assert len(devices) == 1
dev = devices[0]
assert dev.is_loopback is True
assert dev.is_input is True
assert "Android playback" in dev.name
assert dev.channels == CHANNELS
# ---------------------------------------------------------------------------
# Regression guard — the analyzer must never crash on a malformed block
# (over-length or non-frame-divisible). This is the on-device failure the
# plan review surfaced; the desktop suite must catch it.
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
"raw_floats",
[
(CHUNK + 100) * CHANNELS, # over-length (more frames than chunk_size)
CHUNK * CHANNELS + 1, # not a whole number of stereo frames
3, # tiny + odd
CHUNK * CHANNELS, # exact (control)
],
)
def test_pushed_block_never_crashes_analyzer(reset_engine, raw_floats):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
pcm = np.random.default_rng(0).standard_normal(raw_floats).astype(np.float32)
analyzer = AudioAnalyzer(sample_rate=SAMPLE_RATE, chunk_size=CHUNK)
stream = eng.AndroidAudioEngine.create_stream(0, True, {})
stream.initialize()
# Act
eng.push_samples(pcm.tobytes())
chunk = stream.read_chunk()
# Assert — chunk is a safe shape and analyze() does not raise.
assert chunk is not None
assert len(chunk) % CHANNELS == 0
assert len(chunk) <= CHUNK * CHANNELS
analysis = analyzer.analyze(chunk, CHANNELS)
assert isinstance(analysis, AudioAnalysis)
# ---------------------------------------------------------------------------
# Registry integration
# ---------------------------------------------------------------------------
def test_best_available_engine_is_android_when_active(on_android):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
# Act
best = AudioEngineRegistry.get_best_available_engine()
# Assert — priority 100 beats every desktop engine; demo only wins in demo mode.
assert best == "android_playback"
def test_stream_via_registry_yields_pushed_chunk(on_android):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
samples = np.linspace(-1.0, 1.0, CHUNK * CHANNELS, dtype=np.float32)
# Act
stream = AudioEngineRegistry.create_stream("android_playback", 0, True, {})
stream.initialize()
eng.push_samples(samples.tobytes())
got = stream.read_chunk()
# Assert
assert stream.channels == CHANNELS
assert stream.sample_rate == SAMPLE_RATE
assert stream.chunk_size == CHUNK
np.testing.assert_array_equal(got, samples)
def test_device_surfaces_through_capture_manager(on_android):
# Arrange
eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
# Act
devices = AudioCaptureManager.enumerate_devices()
# Assert — the Android device is enumerated and tagged with its engine.
android = [d for d in devices if d["engine_type"] == "android_playback"]
assert len(android) == 1
assert android[0]["name"] == "Android playback (system audio)"
assert android[0]["is_loopback"] is True