diff --git a/android/app/src/main/AndroidManifest.xml b/android/app/src/main/AndroidManifest.xml
index 0adf48c..bb74f08 100644
--- a/android/app/src/main/AndroidManifest.xml
+++ b/android/app/src/main/AndroidManifest.xml
@@ -39,6 +39,14 @@
+
+
+
diff --git a/android/app/src/main/java/com/ledgrab/android/AudioCapture.kt b/android/app/src/main/java/com/ledgrab/android/AudioCapture.kt
new file mode 100644
index 0000000..6911257
--- /dev/null
+++ b/android/app/src/main/java/com/ledgrab/android/AudioCapture.kt
@@ -0,0 +1,234 @@
+package com.ledgrab.android
+
+import android.annotation.SuppressLint
+import android.media.AudioAttributes
+import android.media.AudioFormat
+import android.media.AudioPlaybackCaptureConfiguration
+import android.media.AudioRecord
+import android.media.MediaRecorder
+import android.media.projection.MediaProjection
+import android.os.Build
+import android.util.Log
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+
+/**
+ * Captures audio with [AudioRecord] and pushes interleaved float32 PCM to
+ * the LedGrab Python server via [PythonBridge], where the
+ * `android_audio_engine` feeds it into the unchanged audio-analysis
+ * pipeline.
+ *
+ * Two sources:
+ * - [start] — system playback capture via `AudioPlaybackCapture` (API 29+),
+ * reusing the same [MediaProjection] token the app already holds for
+ * screen capture. This is the primary path on the consent flow.
+ * - [startMic] — microphone fallback (`AudioSource.MIC`) for paths with no
+ * MediaProjection (root mode) or API < 29.
+ *
+ * Mirrors [ScreenCapture]'s shape: a dedicated capture thread, a single
+ * reusable cross-JNI buffer (no per-block allocation → no GC churn on
+ * low-end TV boxes), and graceful teardown in [stop].
+ *
+ * The capture format is negotiated by [AudioRecord]; the **actual**
+ * channel count and sample rate are read back and forwarded to
+ * `configureAudio` so the Python analyzer's interleaving matches the bytes
+ * we push (e.g. a stereo request that the device satisfies as mono).
+ */
+class AudioCapture(
+ private val projection: MediaProjection?,
+ private val bridge: PythonBridge,
+ private val sampleRate: Int = 48000,
+ private val channels: Int = 2,
+ private val chunkFrames: Int = 1024,
+) {
+ companion object {
+ private const val TAG = "AudioCapture"
+ private const val BYTES_PER_FLOAT = 4
+ }
+
+ private var audioRecord: AudioRecord? = null
+ private var captureThread: Thread? = null
+ @Volatile private var running = false
+
+ /**
+ * Start system playback capture (API 29+). Requires the app to hold
+ * RECORD_AUDIO and a valid [projection]. Returns true if capture began.
+ */
+ @SuppressLint("MissingPermission")
+ fun start(): Boolean {
+ if (running) return true
+ if (Build.VERSION.SDK_INT < Build.VERSION_CODES.Q) {
+ Log.i(TAG, "Playback capture needs API 29+; skipping (have ${Build.VERSION.SDK_INT})")
+ return false
+ }
+ val proj = projection
+ if (proj == null) {
+ Log.i(TAG, "No MediaProjection; playback capture unavailable")
+ return false
+ }
+
+ val config = AudioPlaybackCaptureConfiguration.Builder(proj)
+ .addMatchingUsage(AudioAttributes.USAGE_MEDIA)
+ .addMatchingUsage(AudioAttributes.USAGE_GAME)
+ .addMatchingUsage(AudioAttributes.USAGE_UNKNOWN)
+ .build()
+
+ val record = try {
+ AudioRecord.Builder()
+ .setAudioFormat(audioFormat())
+ .setBufferSizeInBytes(bufferBytes())
+ .setAudioPlaybackCaptureConfig(config)
+ .build()
+ } catch (e: Exception) {
+ Log.e(TAG, "Failed to build playback AudioRecord: ${e.message}")
+ return false
+ }
+ return begin(record, "playback")
+ }
+
+ /**
+ * Start microphone capture (fallback). Works on API 24+ and needs no
+ * MediaProjection. Requires RECORD_AUDIO. Returns true if capture began.
+ *
+ * ⚠️ SECURITY/POLICY: currently UNWIRED (no caller). Microphone capture is
+ * a materially different posture than playback capture — it records real
+ * room audio (bystander voices). Before wiring this into [CaptureService]:
+ * - add FOREGROUND_SERVICE_MICROPHONE permission + the `microphone` FGS
+ * type (on API 34+ the service is killed without it), and
+ * - add the Play Store privacy disclosure for microphone use,
+ * - re-trigger a security review.
+ * Do NOT call this from inside the foreground service without the above.
+ */
+ @SuppressLint("MissingPermission")
+ fun startMic(): Boolean {
+ if (running) return true
+ val record = try {
+ AudioRecord.Builder()
+ .setAudioSource(MediaRecorder.AudioSource.MIC)
+ .setAudioFormat(audioFormat())
+ .setBufferSizeInBytes(bufferBytes())
+ .build()
+ } catch (e: Exception) {
+ Log.e(TAG, "Failed to build mic AudioRecord: ${e.message}")
+ return false
+ }
+ return begin(record, "mic")
+ }
+
+ /** Stop capturing and release all resources. Idempotent. */
+ fun stop() {
+ running = false
+ // AudioRecord.stop() unblocks a pending READ_BLOCKING read within
+ // milliseconds, so the loop sees running=false and returns well inside
+ // the 500ms join window — release() below won't race a live read.
+ // (Mirrors ScreenCapture's bounded join.)
+ runCatching { audioRecord?.stop() }
+ captureThread?.let { runCatching { it.join(500) } }
+ captureThread = null
+ runCatching { audioRecord?.release() }
+ audioRecord = null
+ runCatching { bridge.shutdownAudio() }
+ Log.i(TAG, "Audio capture stopped")
+ }
+
+ // ── internals ──────────────────────────────────────────────────────
+
+ private fun begin(record: AudioRecord, mode: String): Boolean {
+ if (record.state != AudioRecord.STATE_INITIALIZED) {
+ Log.e(TAG, "AudioRecord ($mode) failed to initialize")
+ runCatching { record.release() }
+ return false
+ }
+ val actualChannels = record.channelCount.coerceAtLeast(1)
+ val actualRate = record.sampleRate
+
+ // Confirm recording actually started before reporting success —
+ // startRecording() can throw (exclusive-capture contention) or
+ // leave the record in a non-recording state, in which case read()
+ // would only ever return errors.
+ val started = runCatching { record.startRecording() }.isSuccess &&
+ record.recordingState == AudioRecord.RECORDSTATE_RECORDING
+ if (!started) {
+ Log.e(TAG, "AudioRecord ($mode) failed to start recording")
+ runCatching { record.release() }
+ return false
+ }
+
+ // Recording confirmed — tell Python the real negotiated format
+ // before frames flow, so the analyzer's channel/sample-rate match
+ // the interleaving we push.
+ bridge.configureAudio(actualRate, actualChannels, chunkFrames)
+
+ audioRecord = record
+ running = true
+ captureThread = Thread(
+ { captureLoop(record, actualChannels) },
+ "LedGrab-AudioCapture",
+ ).also { it.start() }
+ Log.i(TAG, "Audio capture started ($mode, sr=$actualRate ch=$actualChannels chunk=$chunkFrames)")
+ return true
+ }
+
+ /**
+ * Blocking read loop. Accumulates into fixed `chunkFrames * channels`
+ * float blocks and pushes only COMPLETE blocks — [AudioRecord.read]
+ * returns a variable count, so partial reads are stitched here rather
+ * than handed to Python as ragged chunks (the analyzer requires
+ * whole-frame, ≤ chunk-size blocks).
+ */
+ private fun captureLoop(record: AudioRecord, actualChannels: Int) {
+ val blockFloats = chunkFrames * actualChannels
+ val floatBuf = FloatArray(blockFloats)
+ // Reusable little-endian byte buffer — Python copies on push, so the
+ // same backing array is safe to overwrite next block. Default
+ // ByteBuffer order is BIG_ENDIAN, which would corrupt every sample;
+ // LITTLE_ENDIAN matches numpy's native float32 on all Android ABIs.
+ val byteBuf = ByteArray(blockFloats * BYTES_PER_FLOAT)
+ val floatView = ByteBuffer.wrap(byteBuf).order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer()
+
+ var filled = 0
+ while (running) {
+ val n = record.read(floatBuf, filled, blockFloats - filled, AudioRecord.READ_BLOCKING)
+ if (n < 0) {
+ if (running) {
+ // A negative read (e.g. ERROR_DEAD_OBJECT after an audio-route
+ // change, ERROR_INVALID_OPERATION) means this AudioRecord is
+ // finished. Deactivate the Python engine so is_available() stops
+ // advertising a dead stream and the audio-reactive consumer isn't
+ // left polling an empty queue forever. We're on the capture thread,
+ // so we can't call stop() (it would self-join) — just flip running
+ // and shut the engine down; onDestroy's stop() releases the record.
+ Log.w(TAG, "AudioRecord.read error: $n — stopping audio capture")
+ running = false
+ runCatching { bridge.shutdownAudio() }
+ }
+ break
+ }
+ filled += n
+ if (filled < blockFloats) continue
+
+ floatView.clear()
+ floatView.put(floatBuf, 0, blockFloats)
+ bridge.pushAudio(byteBuf)
+ filled = 0
+ }
+ }
+
+ private fun channelMask(): Int =
+ if (channels >= 2) AudioFormat.CHANNEL_IN_STEREO else AudioFormat.CHANNEL_IN_MONO
+
+ private fun audioFormat(): AudioFormat =
+ AudioFormat.Builder()
+ .setEncoding(AudioFormat.ENCODING_PCM_FLOAT)
+ .setSampleRate(sampleRate)
+ .setChannelMask(channelMask())
+ .build()
+
+ private fun bufferBytes(): Int {
+ val minBuf = AudioRecord.getMinBufferSize(sampleRate, channelMask(), AudioFormat.ENCODING_PCM_FLOAT)
+ // A few blocks of headroom so a slow consumer doesn't overrun the
+ // hardware buffer between reads.
+ val want = chunkFrames * channels * BYTES_PER_FLOAT * 4
+ return if (minBuf > 0) maxOf(minBuf, want) else want
+ }
+}
diff --git a/android/app/src/main/java/com/ledgrab/android/CaptureService.kt b/android/app/src/main/java/com/ledgrab/android/CaptureService.kt
index 079c586..0eb14e3 100644
--- a/android/app/src/main/java/com/ledgrab/android/CaptureService.kt
+++ b/android/app/src/main/java/com/ledgrab/android/CaptureService.kt
@@ -4,9 +4,11 @@ import android.app.Notification
import android.app.NotificationChannel
import android.app.NotificationManager
import android.app.PendingIntent
+import android.Manifest
import android.app.Service
import android.content.Context
import android.content.Intent
+import android.content.pm.PackageManager
import android.content.pm.ServiceInfo
import android.media.projection.MediaProjection
import android.media.projection.MediaProjectionManager
@@ -85,6 +87,7 @@ class CaptureService : Service() {
private var bridge: PythonBridge? = null
private var screenCapture: ScreenCapture? = null
private var rootCapture: RootScreenrecord? = null
+ private var audioCapture: AudioCapture? = null
private var mediaProjection: MediaProjection? = null
// Service-scoped coroutine scope for the root-capture watchdog.
@@ -338,6 +341,25 @@ class CaptureService : Service() {
onProjectionStopped = { stopSelf() },
).also { it.start() }
+ // Reuse the same projection to capture system playback audio so
+ // audio-reactive lighting works on-device (API 29+, RECORD_AUDIO
+ // granted). Best-effort: screen capture and the server keep running
+ // if audio is unavailable. Started AFTER ScreenCapture so the
+ // projection's callback is already registered.
+ if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q &&
+ checkSelfPermission(Manifest.permission.RECORD_AUDIO) ==
+ PackageManager.PERMISSION_GRANTED
+ ) {
+ audioCapture = AudioCapture(projection, newBridge).also { ac ->
+ if (!ac.start()) {
+ Log.i(TAG, "Playback audio capture unavailable — continuing without audio")
+ audioCapture = null
+ }
+ }
+ } else {
+ Log.i(TAG, "RECORD_AUDIO not granted or API < 29 — audio-reactive capture disabled")
+ }
+
Log.i(TAG, "LedGrab service started (MediaProjection) — web UI at $url")
}
@@ -351,6 +373,10 @@ class CaptureService : Service() {
screenCapture?.stop()
screenCapture = null
+ // Stop audio before the server: stop() calls bridge.shutdownAudio().
+ audioCapture?.stop()
+ audioCapture = null
+
rootCapture?.stop()
rootCapture = null
diff --git a/android/app/src/main/java/com/ledgrab/android/MainActivity.kt b/android/app/src/main/java/com/ledgrab/android/MainActivity.kt
index c9a92f2..8d47a99 100644
--- a/android/app/src/main/java/com/ledgrab/android/MainActivity.kt
+++ b/android/app/src/main/java/com/ledgrab/android/MainActivity.kt
@@ -53,6 +53,7 @@ class MainActivity : Activity() {
private const val SERVER_PORT = 8080
private const val REQUEST_MEDIA_PROJECTION = 1001
private const val REQUEST_POST_NOTIFICATIONS = 1002
+ private const val REQUEST_RECORD_AUDIO = 1003
private const val QR_SIZE_PX = 560
}
@@ -215,6 +216,7 @@ class MainActivity : Activity() {
private fun startCaptureService(resultCode: Int, resultData: Intent) {
ensureNotificationPermission()
+ ensureAudioPermission()
val intent = CaptureService.createIntent(this, resultCode, resultData)
ContextCompat.startForegroundService(this, intent)
updateUI()
@@ -471,4 +473,24 @@ class MainActivity : Activity() {
}
}
}
+
+ /**
+ * Request RECORD_AUDIO (API 29+) so the capture service can capture
+ * system playback audio for audio-reactive lighting. Fire-and-forget,
+ * like [ensureNotificationPermission]: capture still works without it
+ * (just no audio), so we don't block on the result. If first granted
+ * here, audio becomes available on the next Start.
+ */
+ private fun ensureAudioPermission() {
+ if (Build.VERSION.SDK_INT < Build.VERSION_CODES.Q) return
+ if (checkSelfPermission(Manifest.permission.RECORD_AUDIO)
+ != PackageManager.PERMISSION_GRANTED
+ ) {
+ @Suppress("DEPRECATION")
+ requestPermissions(
+ arrayOf(Manifest.permission.RECORD_AUDIO),
+ REQUEST_RECORD_AUDIO,
+ )
+ }
+ }
}
diff --git a/android/app/src/main/java/com/ledgrab/android/PythonBridge.kt b/android/app/src/main/java/com/ledgrab/android/PythonBridge.kt
index 8843b6e..618cade 100644
--- a/android/app/src/main/java/com/ledgrab/android/PythonBridge.kt
+++ b/android/app/src/main/java/com/ledgrab/android/PythonBridge.kt
@@ -28,6 +28,7 @@ class PythonBridge(private val context: Context) {
// single-writer/single-reader pattern we have here.
@Volatile private var mediaProjectionEngine: PyObject? = null
@Volatile private var rootEngine: PyObject? = null
+ @Volatile private var androidAudioEngine: PyObject? = null
/**
* Configure the MediaProjection engine with screen dimensions.
@@ -53,6 +54,49 @@ class PythonBridge(private val context: Context) {
Log.i(TAG, "Root screenrecord engine configured: ${width}x${height}")
}
+ /**
+ * Configure the Android playback-capture audio engine with the format
+ * actually negotiated by [AudioCapture]'s `AudioRecord`. Must be called
+ * before [pushAudio]. Caches the module handle for the per-block fast
+ * path (same pattern as [configureCapture]).
+ */
+ fun configureAudio(sampleRate: Int, channels: Int, chunkFrames: Int) {
+ val py = Python.getInstance()
+ val engine = py.getModule("ledgrab.core.audio.android_audio_engine")
+ engine.callAttr("configure", sampleRate, channels, chunkFrames)
+ androidAudioEngine = engine
+ Log.i(TAG, "Android audio engine configured: sr=$sampleRate ch=$channels chunk=$chunkFrames")
+ }
+
+ /**
+ * Push one interleaved little-endian float32 PCM block to the Python
+ * audio engine. Called from [AudioCapture]'s capture thread. The byte
+ * array crosses the JNI boundary; Python copies it on receipt, so the
+ * caller may reuse the same buffer for the next block.
+ */
+ fun pushAudio(pcmFloat32: ByteArray) {
+ if (!running) return
+ val engine = androidAudioEngine ?: return
+ try {
+ engine.callAttr("push_samples", pcmFloat32)
+ } catch (e: Exception) {
+ Log.w(TAG, "Failed to push audio: ${e.message}")
+ }
+ }
+
+ /**
+ * Deactivate the Python audio engine. Called from [AudioCapture.stop].
+ */
+ fun shutdownAudio() {
+ val engine = androidAudioEngine ?: return
+ try {
+ engine.callAttr("shutdown")
+ } catch (e: Exception) {
+ Log.w(TAG, "Failed to shut down audio engine: ${e.message}")
+ }
+ androidAudioEngine = null
+ }
+
/**
* Start the LedGrab FastAPI server on a background thread.
*
diff --git a/server/src/ledgrab/core/audio/__init__.py b/server/src/ledgrab/core/audio/__init__.py
index b9f059e..20f4475 100644
--- a/server/src/ledgrab/core/audio/__init__.py
+++ b/server/src/ledgrab/core/audio/__init__.py
@@ -38,6 +38,19 @@ try:
except ImportError:
_has_sounddevice = False
+# Android playback-capture engine — pure Python (numpy only), but the
+# guard keeps the registration pattern uniform and tolerant of any future
+# import-time dependency.
+try:
+ from ledgrab.core.audio.android_audio_engine import (
+ AndroidAudioEngine,
+ AndroidAudioCaptureStream,
+ )
+
+ _has_android_audio = True
+except ImportError:
+ _has_android_audio = False
+
from ledgrab.core.audio.demo_engine import DemoAudioEngine, DemoAudioCaptureStream
# Auto-register available engines
@@ -45,6 +58,8 @@ if _has_wasapi:
AudioEngineRegistry.register(WasapiEngine)
if _has_sounddevice:
AudioEngineRegistry.register(SounddeviceEngine)
+if _has_android_audio:
+ AudioEngineRegistry.register(AndroidAudioEngine)
AudioEngineRegistry.register(DemoAudioEngine)
__all__ = [
@@ -65,3 +80,5 @@ if _has_wasapi:
__all__ += ["WasapiEngine", "WasapiCaptureStream"]
if _has_sounddevice:
__all__ += ["SounddeviceEngine", "SounddeviceCaptureStream"]
+if _has_android_audio:
+ __all__ += ["AndroidAudioEngine", "AndroidAudioCaptureStream"]
diff --git a/server/src/ledgrab/core/audio/android_audio_engine.py b/server/src/ledgrab/core/audio/android_audio_engine.py
new file mode 100644
index 0000000..c617013
--- /dev/null
+++ b/server/src/ledgrab/core/audio/android_audio_engine.py
@@ -0,0 +1,229 @@
+"""Android playback-capture audio engine.
+
+Receives PCM pushed from Kotlin (via Chaquopy) through a module-level
+sample queue. The Kotlin layer captures system playback audio with
+``AudioRecord`` + ``AudioPlaybackCaptureConfiguration`` (reusing the
+app's ``MediaProjection`` token) and calls :func:`push_samples` with
+interleaved float32 PCM for each fixed-size block.
+
+Mirrors the screen-capture bridge
+(``core/capture_engines/mediaprojection_engine.py``): a module-level
+queue plus ``configure`` / ``push_samples`` / ``shutdown`` filled by
+Kotlin, consumed through the standard :class:`AudioCaptureStreamBase`
+interface so :class:`~ledgrab.core.audio.audio_capture.ManagedAudioStream`
+and :class:`~ledgrab.core.audio.analysis.AudioAnalyzer` work unchanged.
+
+This engine is only available when running inside the LedGrab Android
+app, which has set up the sample queue via :func:`configure`.
+"""
+
+import queue
+from typing import Any, Dict, List
+
+import numpy as np
+
+from ledgrab.core.audio.base import (
+ AudioCaptureEngine,
+ AudioCaptureStreamBase,
+ AudioDeviceInfo,
+)
+from ledgrab.utils import get_logger
+from ledgrab.utils.platform import is_android
+
+logger = get_logger(__name__)
+
+# ---------------------------------------------------------------------------
+# Sample queue — the bridge between Kotlin and Python
+# ---------------------------------------------------------------------------
+
+_pcm_queue: "queue.Queue[np.ndarray]" = queue.Queue(maxsize=8)
+_sample_rate = 48000
+_channels = 2
+_chunk_size = 1024
+_active = False
+_frames_received = 0
+
+
+def configure(sample_rate: int, channels: int, chunk_size: int) -> None:
+ """Set the stream format. Called from Kotlin before frames flow.
+
+ Drains any stale PCM from a previous capture session so the first
+ chunk after a restart is actually current. ``channels`` /
+ ``sample_rate`` should be the values the Kotlin ``AudioRecord``
+ actually negotiated (which can differ from the requested values,
+ e.g. a stereo request that falls back to mono) — the analyzer keys
+ off these, so they must match the interleaving of pushed samples.
+ """
+ global _sample_rate, _channels, _chunk_size, _active, _frames_received
+ while not _pcm_queue.empty():
+ try:
+ _pcm_queue.get_nowait()
+ except queue.Empty:
+ break
+ _sample_rate = sample_rate
+ _channels = max(1, channels)
+ _chunk_size = max(1, chunk_size)
+ _frames_received = 0
+ _active = True
+ logger.info(
+ "Android audio engine configured: sr=%d channels=%d chunk=%d",
+ _sample_rate,
+ _channels,
+ _chunk_size,
+ )
+
+
+def push_samples(pcm_float32: bytes) -> None:
+ """Push one interleaved float32 PCM block from Kotlin.
+
+ The byte buffer is interpreted as native-endian float32 (Kotlin
+ packs little-endian; all Android ABIs are little-endian). Drops the
+ oldest queued block if the consumer is slow (non-blocking).
+
+ Defensive framing: the downstream :class:`AudioAnalyzer` reshapes to
+ ``(-1, channels)`` and copies into ``chunk_size``-sized scratch
+ buffers, so it raises on a block whose length is not a whole number
+ of frames or that exceeds ``chunk_size`` frames. We trim to a whole
+ multiple of ``_channels`` and clamp to ``_chunk_size`` frames so a
+ malformed push can never crash the capture thread.
+ """
+ global _frames_received
+ # np.frombuffer raises if the length isn't a whole number of float32s.
+ # Kotlin always pushes complete blocks, but guard so a malformed buffer is
+ # dropped here rather than surfacing as an exception across the JNI bridge.
+ if len(pcm_float32) % 4 != 0:
+ return
+ samples = np.frombuffer(pcm_float32, dtype=np.float32)
+
+ # Trim to whole frames, then clamp to chunk_size frames.
+ frames = len(samples) // _channels
+ if frames <= 0:
+ return
+ frames = min(frames, _chunk_size)
+ usable = frames * _channels
+
+ # Copy out of the read-only frombuffer view so the queued block owns its
+ # memory. This lets the Kotlin side push from a reusable buffer (low GC on
+ # low-end TV boxes) without the not-yet-consumed queued block aliasing
+ # bytes Kotlin is about to overwrite. Mirrors mediaprojection_engine's
+ # push_frame .copy().
+ block = samples[:usable].copy()
+
+ _frames_received += 1
+ if _frames_received == 1 or _frames_received % 100 == 0:
+ logger.info("Android audio: received %d blocks", _frames_received)
+
+ try:
+ _pcm_queue.put_nowait(block)
+ except queue.Full:
+ try:
+ _pcm_queue.get_nowait()
+ except queue.Empty:
+ pass
+ try:
+ _pcm_queue.put_nowait(block)
+ except queue.Full:
+ pass
+
+
+def shutdown() -> None:
+ """Deactivate the engine. Called when the Android app stops audio."""
+ global _active
+ _active = False
+ logger.info("Android audio engine shut down")
+
+
+# ---------------------------------------------------------------------------
+# CaptureStream
+# ---------------------------------------------------------------------------
+
+
+class AndroidAudioCaptureStream(AudioCaptureStreamBase):
+ """Reads PCM blocks pushed by Kotlin from the module-level queue."""
+
+ @property
+ def channels(self) -> int:
+ return _channels
+
+ @property
+ def sample_rate(self) -> int:
+ return _sample_rate
+
+ @property
+ def chunk_size(self) -> int:
+ return _chunk_size
+
+ def initialize(self) -> None:
+ if self._initialized:
+ return
+ if not _active:
+ raise RuntimeError(
+ "Android audio engine not configured. "
+ "This engine is only available inside the Android app."
+ )
+ self._initialized = True
+ logger.info("Android audio capture stream initialized")
+
+ def cleanup(self) -> None:
+ self._initialized = False
+ logger.info("Android audio capture stream cleaned up")
+
+ def read_chunk(self) -> np.ndarray | None:
+ try:
+ return _pcm_queue.get(timeout=0.1) # 1-D float32 interleaved
+ except queue.Empty:
+ return None
+
+
+# ---------------------------------------------------------------------------
+# CaptureEngine
+# ---------------------------------------------------------------------------
+
+
+class AndroidAudioEngine(AudioCaptureEngine):
+ """Android playback-capture audio engine.
+
+ Only available when running inside the LedGrab Android app, which
+ calls :func:`configure` once audio capture is set up. Exposes a
+ single loopback "device" representing the system audio mix.
+ """
+
+ ENGINE_TYPE = "android_playback"
+ ENGINE_PRIORITY = 100 # highest on a real Android device (demo only wins in demo mode)
+
+ @classmethod
+ def is_available(cls) -> bool:
+ return is_android() and _active
+
+ @classmethod
+ def get_default_config(cls) -> Dict[str, Any]:
+ return {
+ "sample_rate": _sample_rate,
+ "channels": _channels,
+ "chunk_size": _chunk_size,
+ }
+
+ @classmethod
+ def enumerate_devices(cls) -> List[AudioDeviceInfo]:
+ if not cls.is_available():
+ return []
+ return [
+ AudioDeviceInfo(
+ index=0,
+ name="Android playback (system audio)",
+ is_input=True,
+ is_loopback=True,
+ channels=_channels,
+ default_samplerate=float(_sample_rate),
+ )
+ ]
+
+ @classmethod
+ def create_stream(
+ cls,
+ device_index: int,
+ is_loopback: bool,
+ config: Dict[str, Any],
+ ) -> AndroidAudioCaptureStream:
+ merged = {**cls.get_default_config(), **config}
+ return AndroidAudioCaptureStream(device_index, is_loopback, merged)
diff --git a/server/tests/core/test_android_audio_engine.py b/server/tests/core/test_android_audio_engine.py
new file mode 100644
index 0000000..0696df7
--- /dev/null
+++ b/server/tests/core/test_android_audio_engine.py
@@ -0,0 +1,253 @@
+"""Tests for the Android playback-capture audio engine.
+
+These run on desktop CI (no Android device needed): ``is_android`` is
+monkeypatched and PCM is pushed directly into the module-level queue,
+exactly as the Kotlin bridge would.
+"""
+
+import queue
+
+import numpy as np
+import pytest
+
+# Importing the package triggers auto-registration of AndroidAudioEngine.
+import ledgrab.core.audio # noqa: F401
+from ledgrab.core.audio import android_audio_engine as eng
+from ledgrab.core.audio.analysis import AudioAnalysis, AudioAnalyzer
+from ledgrab.core.audio.audio_capture import AudioCaptureManager
+from ledgrab.core.audio.factory import AudioEngineRegistry
+
+ENGINE_MOD = "ledgrab.core.audio.android_audio_engine"
+SAMPLE_RATE = 48000
+CHANNELS = 2
+CHUNK = 1024
+
+
+# ---------------------------------------------------------------------------
+# Helpers / fixtures
+# ---------------------------------------------------------------------------
+
+
+def _drain() -> None:
+ while not eng._pcm_queue.empty():
+ try:
+ eng._pcm_queue.get_nowait()
+ except queue.Empty:
+ break
+
+
+def _block(marker: float = 0.0, frames: int = CHUNK, channels: int = CHANNELS) -> np.ndarray:
+ """A float32 interleaved block whose first sample is ``marker``."""
+ data = np.zeros(frames * channels, dtype=np.float32)
+ data[0] = marker
+ return data
+
+
+@pytest.fixture
+def reset_engine():
+ """Reset module-global engine state; snapshot/restore the registry.
+
+ The engine keeps its queue + format in module globals and the registry
+ is a class-level singleton — both must be restored so this test file
+ never disturbs the desktop engines other tests rely on.
+ """
+ saved_engines = dict(AudioEngineRegistry._engines)
+ eng.shutdown()
+ _drain()
+ eng._sample_rate = SAMPLE_RATE
+ eng._channels = CHANNELS
+ eng._chunk_size = CHUNK
+ eng._frames_received = 0
+
+ yield eng
+
+ eng.shutdown()
+ _drain()
+ AudioEngineRegistry._engines.clear()
+ AudioEngineRegistry._engines.update(saved_engines)
+
+
+@pytest.fixture
+def on_android(monkeypatch, reset_engine):
+ """Engine fixture with ``is_android`` forced True and demo mode off."""
+ monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
+ monkeypatch.setattr("ledgrab.core.audio.factory.is_demo_mode", lambda: False)
+ return reset_engine
+
+
+# ---------------------------------------------------------------------------
+# Queue / push contract
+# ---------------------------------------------------------------------------
+
+
+def test_configure_then_push_round_trips_samples(reset_engine):
+ # Arrange
+ eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
+ samples = np.arange(CHUNK * CHANNELS, dtype=np.float32)
+
+ # Act
+ eng.push_samples(samples.tobytes())
+ stream = eng.AndroidAudioEngine.create_stream(0, True, {})
+ stream.initialize()
+ got = stream.read_chunk()
+
+ # Assert
+ assert got is not None
+ np.testing.assert_array_equal(got, samples)
+
+
+def test_queue_drops_oldest_when_full(reset_engine):
+ # Arrange
+ eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
+ maxsize = eng._pcm_queue.maxsize # 8
+
+ # Act — push more blocks than the queue can hold, each tagged 0..N-1
+ total = maxsize + 2
+ for i in range(total):
+ eng.push_samples(_block(marker=float(i)).tobytes())
+
+ drained = []
+ while True:
+ try:
+ drained.append(eng._pcm_queue.get_nowait())
+ except queue.Empty:
+ break
+
+ # Assert — only the newest `maxsize` blocks survived, oldest dropped
+ assert len(drained) == maxsize
+ markers = [int(b[0]) for b in drained]
+ assert markers == list(range(total - maxsize, total))
+
+
+def test_initialize_raises_when_not_configured(reset_engine):
+ # Arrange — fixture left the engine inactive
+ stream = eng.AndroidAudioEngine.create_stream(0, True, {})
+
+ # Act / Assert
+ with pytest.raises(RuntimeError):
+ stream.initialize()
+
+
+def test_read_chunk_returns_none_when_empty(reset_engine):
+ eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
+ stream = eng.AndroidAudioEngine.create_stream(0, True, {})
+ stream.initialize()
+ assert stream.read_chunk() is None
+
+
+# ---------------------------------------------------------------------------
+# Availability / enumeration (platform-gated)
+# ---------------------------------------------------------------------------
+
+
+def test_is_available_requires_android_and_active(monkeypatch, reset_engine):
+ # Not configured yet → inactive → unavailable even on Android.
+ monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: True)
+ assert eng.AndroidAudioEngine.is_available() is False
+
+ # Configured → active + Android → available.
+ eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
+ assert eng.AndroidAudioEngine.is_available() is True
+
+ # Active but not on Android → unavailable.
+ monkeypatch.setattr(f"{ENGINE_MOD}.is_android", lambda: False)
+ assert eng.AndroidAudioEngine.is_available() is False
+
+
+def test_enumerate_devices(on_android):
+ # Inactive → no devices.
+ assert eng.AndroidAudioEngine.enumerate_devices() == []
+
+ # Active → exactly one loopback device.
+ eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
+ devices = eng.AndroidAudioEngine.enumerate_devices()
+ assert len(devices) == 1
+ dev = devices[0]
+ assert dev.is_loopback is True
+ assert dev.is_input is True
+ assert "Android playback" in dev.name
+ assert dev.channels == CHANNELS
+
+
+# ---------------------------------------------------------------------------
+# Regression guard — the analyzer must never crash on a malformed block
+# (over-length or non-frame-divisible). This is the on-device failure the
+# plan review surfaced; the desktop suite must catch it.
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.parametrize(
+ "raw_floats",
+ [
+ (CHUNK + 100) * CHANNELS, # over-length (more frames than chunk_size)
+ CHUNK * CHANNELS + 1, # not a whole number of stereo frames
+ 3, # tiny + odd
+ CHUNK * CHANNELS, # exact (control)
+ ],
+)
+def test_pushed_block_never_crashes_analyzer(reset_engine, raw_floats):
+ # Arrange
+ eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
+ pcm = np.random.default_rng(0).standard_normal(raw_floats).astype(np.float32)
+ analyzer = AudioAnalyzer(sample_rate=SAMPLE_RATE, chunk_size=CHUNK)
+ stream = eng.AndroidAudioEngine.create_stream(0, True, {})
+ stream.initialize()
+
+ # Act
+ eng.push_samples(pcm.tobytes())
+ chunk = stream.read_chunk()
+
+ # Assert — chunk is a safe shape and analyze() does not raise.
+ assert chunk is not None
+ assert len(chunk) % CHANNELS == 0
+ assert len(chunk) <= CHUNK * CHANNELS
+ analysis = analyzer.analyze(chunk, CHANNELS)
+ assert isinstance(analysis, AudioAnalysis)
+
+
+# ---------------------------------------------------------------------------
+# Registry integration
+# ---------------------------------------------------------------------------
+
+
+def test_best_available_engine_is_android_when_active(on_android):
+ # Arrange
+ eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
+
+ # Act
+ best = AudioEngineRegistry.get_best_available_engine()
+
+ # Assert — priority 100 beats every desktop engine; demo only wins in demo mode.
+ assert best == "android_playback"
+
+
+def test_stream_via_registry_yields_pushed_chunk(on_android):
+ # Arrange
+ eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
+ samples = np.linspace(-1.0, 1.0, CHUNK * CHANNELS, dtype=np.float32)
+
+ # Act
+ stream = AudioEngineRegistry.create_stream("android_playback", 0, True, {})
+ stream.initialize()
+ eng.push_samples(samples.tobytes())
+ got = stream.read_chunk()
+
+ # Assert
+ assert stream.channels == CHANNELS
+ assert stream.sample_rate == SAMPLE_RATE
+ assert stream.chunk_size == CHUNK
+ np.testing.assert_array_equal(got, samples)
+
+
+def test_device_surfaces_through_capture_manager(on_android):
+ # Arrange
+ eng.configure(SAMPLE_RATE, CHANNELS, CHUNK)
+
+ # Act
+ devices = AudioCaptureManager.enumerate_devices()
+
+ # Assert — the Android device is enumerated and tagged with its engine.
+ android = [d for d in devices if d["engine_type"] == "android_playback"]
+ assert len(android) == 1
+ assert android[0]["name"] == "Android playback (system audio)"
+ assert android[0]["is_loopback"] is True