"""Log broadcaster: in-memory ring buffer + WebSocket fan-out for live log tailing.""" import asyncio import logging from collections import deque from typing import Set # Maximum number of log records kept in memory _BACKLOG_SIZE = 500 # A simple text formatter used by the broadcast handler _formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s — %(message)s") class LogBroadcaster: """Singleton that buffers recent log lines and fans them out to WS clients.""" def __init__(self, maxlen: int = _BACKLOG_SIZE) -> None: self._backlog: deque[str] = deque(maxlen=maxlen) self._clients: Set[asyncio.Queue] = set() # The async event loop where send_to_clients() is scheduled. # Set lazily on the first WS connection (inside the async context). self._loop: asyncio.AbstractEventLoop | None = None # ------------------------------------------------------------------ # Called from the logging.Handler (any thread) # ------------------------------------------------------------------ def emit(self, line: str) -> None: """Append *line* to the backlog and notify all connected WS clients. Safe to call from any thread — it schedules the async notification on the server's event loop without blocking. """ self._backlog.append(line) if self._clients and self._loop is not None: try: self._loop.call_soon_threadsafe(self._enqueue_line, line) except RuntimeError: # Loop closed / not running — silently drop pass def _enqueue_line(self, line: str) -> None: """Push *line* onto every connected client's queue (called from the event loop).""" dead: Set[asyncio.Queue] = set() for q in self._clients: try: q.put_nowait(line) except asyncio.QueueFull: dead.add(q) self._clients -= dead # ------------------------------------------------------------------ # Called from the async WS handler # ------------------------------------------------------------------ def subscribe(self) -> "asyncio.Queue[str]": """Register a new WS client and return its private line queue.""" if self._loop is None: try: self._loop = asyncio.get_running_loop() except RuntimeError: pass q: asyncio.Queue[str] = asyncio.Queue(maxsize=200) self._clients.add(q) return q def unsubscribe(self, q: "asyncio.Queue[str]") -> None: """Remove a WS client queue.""" self._clients.discard(q) def get_backlog(self) -> list[str]: """Return a snapshot of the current backlog (oldest → newest).""" return list(self._backlog) def ensure_loop(self) -> None: """Capture the running event loop if not already stored. Call this once from an async context (e.g. the WS endpoint) so that thread-safe scheduling works correctly even before the first client connects. """ if self._loop is None: try: self._loop = asyncio.get_running_loop() except RuntimeError: pass # Module-level singleton broadcaster = LogBroadcaster() class BroadcastLogHandler(logging.Handler): """A logging.Handler that pushes formatted records into the LogBroadcaster.""" def __init__(self) -> None: super().__init__() self.setFormatter(_formatter) def emit(self, record: logging.LogRecord) -> None: # type: ignore[override] try: line = self.format(record) broadcaster.emit(line) except Exception: self.handleError(record) def install_broadcast_handler() -> None: """Attach BroadcastLogHandler to the root logger (idempotent).""" root = logging.getLogger() # Avoid double-installation on hot reload / re-import for h in root.handlers: if isinstance(h, BroadcastLogHandler): return handler = BroadcastLogHandler() handler.setLevel(logging.DEBUG) root.addHandler(handler)