feat(foreground): track topmost process + browser page title
Lint & Test / test (push) Failing after 8s
Adds cross-platform foreground-window tracking and exposes it over REST (/api/foreground) and the existing WebSocket feed.

- foreground_service.py: Windows probe via ctypes (HANDLE-correct argtypes to avoid 64-bit handle truncation); macOS via AppKit; Linux via Xlib (Wayland returns unavailable). TTL cache + per-platform fallback.
- browser_url_service.py: when foreground is a recognised browser, extract the page title from the window title (browser-name suffix stripped) and surface `is_browser` + `browser_page_title`. Optional UIA-based URL extraction behind MEDIA_SERVER_BROWSER_UIA env flag (off by default, because Chromium browsers keep their accessibility tree dormant otherwise).
- websocket_manager: poll foreground every 1s inside the existing status loop, broadcast `foreground` on connect and `foreground_update` on change. Diff only on user-visible fields to avoid geometry spam.
- WebUI: new editorial card rendered under the monitor list on the Display tab: process name, window title, fullscreen/minimized/monitor chips, browser block when applicable, exe path, PID, started-ago, geometry, platform. 16px inter-section gap matches Settings cadence.
- i18n: 25 new keys added to both en.json and ru.json.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
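For readers wiring up a client, the two message types named in the commit message could be consumed roughly as below. This is only a sketch: the `type`/`data` envelope and the `foreground` / `foreground_update` names come from the diff, while `apply_foreground_message` is a hypothetical helper, not part of this change.

```python
from __future__ import annotations

from typing import Any

# Message types taken from the commit message; everything else is illustrative.
FOREGROUND_TYPES = ("foreground", "foreground_update")


def apply_foreground_message(
    state: dict[str, Any] | None, message: dict[str, Any]
) -> dict[str, Any] | None:
    """Return the foreground snapshot after one decoded WebSocket message.

    Both message types carry a full snapshot in ``data``, so the client can
    simply replace its state; unrelated messages leave it untouched.
    """
    if message.get("type") in FOREGROUND_TYPES:
        return message.get("data")
    return state


state = apply_foreground_message(None, {"type": "foreground", "data": {"pid": 1}})
state = apply_foreground_message(state, {"type": "status", "data": {}})
print(state)
```

Treating both types as full replacements keeps the client stateless with respect to diffs, which matches the server sending whole snapshots rather than deltas.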
@@ -0,0 +1,296 @@
"""Extract page-level metadata from a focused desktop web browser.

The browser's window title is the reliable signal: most major browsers
format it as ``"<page title> - <Browser Name>"`` (Firefox separates with
an em dash), so stripping the suffix gives us the page title for free.

URL extraction was attempted via UI Automation (UIA), but Chromium-based
browsers (Chrome/Edge/Brave/Vivaldi) keep their accessibility tree dormant
unless a screen reader is active or ``--force-renderer-accessibility`` is
set, and neither is something we want to require from end users. The UIA
machinery is still here behind a feature flag in case a future caller
opts into the accessibility-flag path; by default we just return the
page title and leave ``url=None``.

Other platforms (macOS via AppleScript, Linux via AT-SPI) are out of scope
for this iteration.
"""

from __future__ import annotations

import logging
import os
import platform
import threading
from dataclasses import dataclass

logger = logging.getLogger(__name__)

# UIA URL extraction is opt-in because Chromium browsers keep their
# accessibility tree dormant unless the user starts the browser with
# ``--force-renderer-accessibility`` (or a screen reader is running).
# Without that, `FindAll` throws and we'd burn 5s per probe retrying.
# Set MEDIA_SERVER_BROWSER_UIA=1 to enable; default off.
_UIA_ENABLED = os.environ.get("MEDIA_SERVER_BROWSER_UIA", "").lower() in (
    "1", "true", "yes", "on"
)
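The flag parsing above can be exercised without touching the real environment. A minimal sketch, assuming a hypothetical helper `env_flag` that accepts any mapping (the helper and `_TRUTHY` are illustrative names, not part of the diff):

```python
from __future__ import annotations

import os
from collections.abc import Mapping

# Same truthy spellings the module accepts for MEDIA_SERVER_BROWSER_UIA.
_TRUTHY = frozenset({"1", "true", "yes", "on"})


def env_flag(name: str, environ: Mapping[str, str] | None = None) -> bool:
    """Return True when ``name`` is set to a truthy spelling (case-insensitive)."""
    env = os.environ if environ is None else environ
    return env.get(name, "").lower() in _TRUTHY


print(env_flag("MEDIA_SERVER_BROWSER_UIA", {"MEDIA_SERVER_BROWSER_UIA": "TRUE"}))
print(env_flag("MEDIA_SERVER_BROWSER_UIA", {}))
```

Note that, like the module-level expression, this does not strip whitespace, so a value such as `"1 "` counts as off.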


# Known browser executables (lowercase, .exe-stripped). Used to decide
# whether to spend the UIA query budget on this foreground process.
BROWSER_PROCESS_HINTS: frozenset[str] = frozenset({
    "chrome",
    "msedge",
    "firefox",
    "brave",
    "opera",
    "vivaldi",
    "yandex",
    "browser",  # Yandex Browser sometimes reports as browser.exe
    "arc",
    "thorium",
})


@dataclass(frozen=True)
class BrowserPageInfo:
    url: str | None = None
    page_title: str | None = None


_EMPTY = BrowserPageInfo()


def is_browser_process(process_name: str | None) -> bool:
    """Return True when ``process_name`` looks like a supported browser."""
    if not process_name:
        return False
    base = process_name.lower()
    if base.endswith(".exe"):
        base = base[:-4]
    return base in BROWSER_PROCESS_HINTS
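A standalone sketch of the same matching rule, to show which inputs it accepts. `HINTS` is a copy of the set above and `looks_like_browser` is an illustrative stand-in for `is_browser_process`; the sample names are made up.

```python
from __future__ import annotations

# Copy of the hint set from the diff, so the demo runs on its own.
HINTS = frozenset({
    "chrome", "msedge", "firefox", "brave", "opera",
    "vivaldi", "yandex", "browser", "arc", "thorium",
})


def looks_like_browser(process_name: str | None) -> bool:
    """Lowercase the name, drop a trailing ``.exe``, then do an exact lookup."""
    if not process_name:
        return False
    base = process_name.lower()
    if base.endswith(".exe"):
        base = base[:-4]
    return base in HINTS


samples = ["chrome.exe", "MSEDGE.EXE", "firefox", "Google Chrome", r"C:\Apps\chrome.exe", None]
for name in samples:
    print(repr(name), "->", looks_like_browser(name))
```

Because the lookup is exact, a display name such as `"Google Chrome"` or a full path does not match; callers need to pass the executable's base name.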


def _strip_browser_suffix(title: str | None, process_name: str | None) -> str | None:
    """Pull the page title out of the browser's window title.

    Most browsers format their window title as ``"<page> - <Browser Name>"``.
    We strip the trailing suffix so consumers get the page title alone. If
    the suffix can't be matched, return the raw title unchanged.
    """
    if not title:
        return None
    suffixes = (
        " - Google Chrome",
        " — Google Chrome",
        " - Microsoft Edge",
        " - Microsoft\u200b Edge",  # assumed intent: Edge titles embed U+200B after "Microsoft"
        " — Mozilla Firefox",
        " - Mozilla Firefox",
        " - Brave",
        " - Opera",
        " - Vivaldi",
        " - Yandex",
    )
    for s in suffixes:
        if title.endswith(s):
            return title[: -len(s)].strip() or None
    return title
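One possible variation on the suffix table, sketched under the assumption that every listed browser may use either a hyphen or an em dash as separator. `BROWSER_NAMES`, `SEPARATORS` and `strip_browser_suffix` are illustrative names; the diff itself lists explicit suffix strings instead.

```python
from __future__ import annotations

# Browser display names taken from the suffix list in the diff.
BROWSER_NAMES = (
    "Google Chrome", "Microsoft Edge", "Mozilla Firefox",
    "Brave", "Opera", "Vivaldi", "Yandex",
)
# Hyphen and em dash (U+2014) separator variants.
SEPARATORS = (" - ", " \u2014 ")


def strip_browser_suffix(title: str | None) -> str | None:
    """Return the page title with a trailing ``<sep><browser>`` removed."""
    if not title:
        return None
    for name in BROWSER_NAMES:
        for sep in SEPARATORS:
            suffix = sep + name
            if title.endswith(suffix):
                # An empty remainder (title was only the suffix) maps to None.
                return title[: -len(suffix)].strip() or None
    return title


print(strip_browser_suffix("Example Domain - Google Chrome"))
print(strip_browser_suffix("Docs \u2014 Mozilla Firefox"))
print(strip_browser_suffix("Untitled"))
```

Generating the suffixes from two small tuples avoids having to remember both separator variants for every browser, at the cost of matching a few combinations that may never occur in practice.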


# ─── UIA lookup (Windows) ───────────────────────────────────────────

# UIA control type / property constants we need. Avoiding the full
# UIAutomationClient typelib generation; those constants are stable.
_UIA_EditControlTypeId = 50004
_UIA_ControlTypePropertyId = 30003
_UIA_ValueValuePropertyId = 30045
_UIA_NamePropertyId = 30005
_UIA_ValuePatternId = 10002
_TreeScope_Descendants = 4
_PropertyConditionFlags_IgnoreCase = 1


# Lazy import + per-thread COM init.
_uia_lock = threading.Lock()
_uia_singleton = None
_uia_load_error: str | None = None
_uia_thread_local = threading.local()


def _ensure_com() -> None:
    """Initialise COM on the current thread (idempotent per thread)."""
    if getattr(_uia_thread_local, "initialised", False):
        return
    try:
        import comtypes  # type: ignore

        # COINIT_APARTMENTTHREADED is required by UIA; comtypes' default
        # CoInitializeEx already passes that flag.
        comtypes.CoInitialize()
        _uia_thread_local.initialised = True
    except Exception as e:
        logger.debug("CoInitialize failed: %s", e)


def _get_uia():
    """Return the IUIAutomation singleton, or None if unavailable."""
    global _uia_singleton, _uia_load_error
    if _uia_singleton is not None:
        return _uia_singleton
    if _uia_load_error is not None:
        return None
    with _uia_lock:
        if _uia_singleton is not None:
            return _uia_singleton
        try:
            import comtypes.client  # type: ignore

            # CLSID for CUIAutomation. Using GetActiveObject would fail,
            # so we cocreate. comtypes.client.CreateObject keeps the COM
            # plumbing tidy.
            _uia_singleton = comtypes.client.CreateObject(
                "{ff48dba4-60ef-4201-aa87-54103eef594e}",
                interface=comtypes.client.GetModule(
                    "UIAutomationCore.dll"
                ).IUIAutomation,
            )
            return _uia_singleton
        except Exception as e:
            _uia_load_error = str(e)
            logger.info("UIA unavailable; browser URL extraction disabled: %s", e)
            return None
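The lazy-singleton pattern used by `_get_uia` (check without the lock, re-check under the lock, remember a failure so it is not retried) can be sketched in isolation. `LazySingleton` and its `factory` argument are hypothetical; the sketch also re-checks the stored error under the lock, which is an optional tightening rather than something the diff does.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class LazySingleton:
    """Create a value once on first use and remember a failed attempt."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._lock = threading.Lock()
        self._instance: Any = None
        self._error: str | None = None

    def get(self) -> Any:
        # Fast path: no lock once the outcome is known.
        if self._instance is not None:
            return self._instance
        if self._error is not None:
            return None
        with self._lock:
            # Re-check both outcomes: another thread may have finished first.
            if self._instance is not None:
                return self._instance
            if self._error is not None:
                return None
            try:
                self._instance = self._factory()
            except Exception as exc:
                self._error = str(exc)
                return None
            return self._instance


calls = []


def failing_factory() -> object:
    calls.append("attempt")
    raise RuntimeError("COM not available here")


holder = LazySingleton(failing_factory)
print(holder.get(), holder.get(), len(calls))
```

Caching the failure is what keeps a host without UIA from paying the construction cost on every probe.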


def _find_address_bar_value(hwnd: int) -> str | None:
    """Walk the UIA tree under ``hwnd`` looking for the URL Edit control.

    Strategy: find every descendant Edit control, then pick the first one
    whose Name contains an address-bar hint, or (failing that) the first
    one whose value parses as a URL-ish string. Browsers expose extra Edit
    controls (search bars, find-in-page) so name matching is the reliable
    signal; the URL-ish fallback covers locale variants we haven't seen.
    """
    _ensure_com()
    uia = _get_uia()
    if uia is None:
        return None

    try:
        element = uia.ElementFromHandle(hwnd)
        if not element:
            return None

        # Build a condition matching ControlType=Edit, then enumerate.
        edit_condition = uia.CreatePropertyCondition(
            _UIA_ControlTypePropertyId, _UIA_EditControlTypeId
        )
        edits = element.FindAll(_TreeScope_Descendants, edit_condition)
        count = edits.Length if edits else 0
        if count == 0:
            return None

        # Hints (lowercase) used to identify the address bar by its Name
        # property. Covers en-US plus a few common locales / browsers.
        name_hints = (
            "address",  # Chrome/Edge: "Address and search bar"
            "адрес",  # Chrome ru: "Адресная строка и строка поиска"
            "адресная",
            "search with",  # Firefox: "Search with Google or enter address"
            "поиск или ввод",  # Firefox ru
            "url",
            "location",
        )

        # First pass: name-based match (high confidence).
        candidates: list[tuple[int, str]] = []
        for i in range(count):
            edit = edits.GetElement(i)
            try:
                name = (edit.CurrentName or "").lower()
            except Exception:
                name = ""
            try:
                value = edit.GetCurrentPropertyValue(_UIA_ValueValuePropertyId)
            except Exception:
                value = None
            if value is None:
                continue
            value_str = str(value)
            for h in name_hints:
                if h in name:
                    return value_str
            candidates.append((i, value_str))

        # Second pass: URL-ish fallback. Pick the first candidate that
        # looks like a URL; this catches browser/locale combos we haven't
        # listed above.
        for _i, v in candidates:
            lv = v.lower()
            if (
                lv.startswith("http://")
                or lv.startswith("https://")
                or lv.startswith("about:")
                or lv.startswith("chrome://")
                or lv.startswith("edge://")
                or lv.startswith("brave://")
                or lv.startswith("file://")
                or lv.startswith("ftp://")
            ):
                return v

        return None
    except Exception as e:
        logger.debug("UIA address-bar lookup failed: %s", e)
        return None
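The second-pass fallback can be written more compactly, since `str.startswith` accepts a tuple of prefixes. A minimal sketch with illustrative names (`URLISH_PREFIXES`, `looks_like_url`, `first_urlish`); the prefix list is copied from the chain of `startswith` checks above.

```python
from __future__ import annotations

from collections.abc import Iterable

# Same schemes the fallback pass checks for, in one tuple.
URLISH_PREFIXES = (
    "http://", "https://", "about:", "chrome://",
    "edge://", "brave://", "file://", "ftp://",
)


def looks_like_url(value: str) -> bool:
    """Case-insensitive prefix test against the known schemes."""
    return value.lower().startswith(URLISH_PREFIXES)


def first_urlish(values: Iterable[str]) -> str | None:
    """Return the first value that looks like a URL, preserving its casing."""
    return next((v for v in values if looks_like_url(v)), None)


print(first_urlish(["search term", "HTTPS://example.com/a", "about:blank"]))
print(first_urlish(["hello"]))
```

The original casing of the matched value is returned, as in the diff; only the comparison is lowercased.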


# ─── Per-(hwnd, title) cache ────────────────────────────────────────

_cache_lock = threading.Lock()
_cache_key: tuple[int | None, str | None] = (None, None)
_cache_value: BrowserPageInfo = _EMPTY


def get_browser_page(
    *,
    hwnd: int | None,
    process_name: str | None,
    window_title: str | None,
) -> BrowserPageInfo:
    """Return the URL + page title for the foreground browser tab, if any.

    Callers pass the already-resolved foreground HWND/title/process_name so
    this service doesn't re-walk Win32. Returns ``_EMPTY`` for non-browsers,
    non-Windows hosts, or no ``hwnd``; ``url`` is ``None`` if UIA is off or fails.
    """
    if not is_browser_process(process_name):
        return _EMPTY
    if platform.system() != "Windows":
        # macOS/Linux paths not implemented in this iteration.
        return _EMPTY
    if not hwnd:
        return _EMPTY

    global _cache_key, _cache_value
    key = (hwnd, window_title)
    with _cache_lock:
        if key == _cache_key and _cache_value is not _EMPTY:
            return _cache_value

    url = _find_address_bar_value(hwnd) if _UIA_ENABLED else None
    page_title = _strip_browser_suffix(window_title, process_name)
    info = BrowserPageInfo(url=url, page_title=page_title)

    with _cache_lock:
        _cache_key = key
        _cache_value = info
    return info


def reset_cache() -> None:
    """Reset the cache. Useful in tests."""
    global _cache_key, _cache_value
    with _cache_lock:
        _cache_key = (None, None)
        _cache_value = _EMPTY
@@ -0,0 +1,514 @@
"""Foreground (topmost) window/process tracking.

Reports the process that currently owns the foreground window, plus useful
metadata (window title, executable path, monitor index, whether the window
covers a full monitor, process start time).

All probes happen behind a short TTL cache so the WebSocket status poll and
per-entity HA polls don't pay the OS call cost on every tick.

Windows uses the Win32 API via ``ctypes`` (no extra dependency) and falls back
gracefully when individual probes fail. Linux/macOS implementations are
best-effort and return ``available=False`` when the required tooling is
missing, so the rest of the stack keeps working.
"""

from __future__ import annotations

import logging
import platform
import threading
import time
from dataclasses import asdict, dataclass, field

logger = logging.getLogger(__name__)

_CACHE_TTL = 0.5  # seconds; fast enough for WebSocket broadcast loop


@dataclass(frozen=True)
class ForegroundInfo:
    """Snapshot of the foreground window/process."""

    available: bool
    pid: int | None = None
    process_name: str | None = None
    executable_path: str | None = None
    window_title: str | None = None
    window_handle: int | None = None
    is_fullscreen: bool = False
    is_minimized: bool = False
    monitor_id: int | None = None
    monitor_geometry: dict[str, int] | None = None
    window_geometry: dict[str, int] | None = None
    started_at: float | None = None
    platform: str = field(default_factory=lambda: platform.system())
    error: str | None = None
    # Populated only when the foreground process is a recognised web
    # browser. ``browser_page_title`` is derived from the window title
    # (suffix stripped); ``browser_url`` requires UIA to succeed.
    is_browser: bool = False
    browser_url: str | None = None
    browser_page_title: str | None = None

    def to_dict(self) -> dict:
        return asdict(self)


_UNAVAILABLE = ForegroundInfo(available=False)


class _Cache:
    """Single-slot TTL cache shared across callers."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._value: ForegroundInfo | None = None
        self._fetched_at: float = 0.0

    def get(self, ttl: float, fetch) -> ForegroundInfo:
        with self._lock:
            now = time.monotonic()
            if self._value is not None and (now - self._fetched_at) < ttl:
                return self._value
        # Fetch outside the lock; OS calls can take tens of ms.
        value = fetch()
        with self._lock:
            self._value = value
            self._fetched_at = time.monotonic()
        return value

    def invalidate(self) -> None:
        with self._lock:
            self._value = None
            self._fetched_at = 0.0


_cache = _Cache()
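To see the TTL behaviour without sleeping, the same single-slot design can be sketched with an injectable clock. `TTLSlot` and its `clock` parameter are assumptions added for testability; the class in the diff always reads `time.monotonic()`.

```python
from __future__ import annotations

import threading
import time
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class TTLSlot(Generic[T]):
    """Single-slot TTL cache; ``clock`` is injectable so tests need no sleep."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._lock = threading.Lock()
        self._value: T | None = None
        self._fetched_at = 0.0
        self._clock = clock

    def get(self, ttl: float, fetch: Callable[[], T]) -> T:
        with self._lock:
            if self._value is not None and self._clock() - self._fetched_at < ttl:
                return self._value
        # Fetch outside the lock so a slow probe does not block readers.
        value = fetch()
        with self._lock:
            self._value = value
            self._fetched_at = self._clock()
        return value


now = [100.0]
fetches = []


def fetch() -> int:
    fetches.append(now[0])
    return len(fetches)


slot = TTLSlot(clock=lambda: now[0])
first = slot.get(0.5, fetch)      # cold slot: fetches
now[0] += 0.4
second = slot.get(0.5, fetch)     # still inside the TTL: cached value
now[0] += 0.2
third = slot.get(0.5, fetch)      # TTL elapsed: fetches again
print(first, second, third)
```

One consequence of fetching outside the lock is that two callers arriving right after expiry may both run `fetch`; for a cheap, idempotent probe that trade-off is usually acceptable.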


def _probe_windows() -> ForegroundInfo:
    """Probe foreground window state on Windows via Win32 API."""
    import ctypes
    import ctypes.wintypes as wt

    user32 = ctypes.WinDLL("user32", use_last_error=True)
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    psapi = ctypes.WinDLL("psapi", use_last_error=True)

    # CRITICAL: declare argtypes/restype on every Win32 call that returns a
    # HANDLE/HWND/HMONITOR. ctypes defaults to `c_int` (32-bit) which
    # silently truncates 64-bit pointer values on x64; that corrupts the
    # handle so `CloseHandle()` can either fail or close the wrong kernel
    # object, and pointer-equality comparisons (monitor index lookup) miss.
    user32.GetForegroundWindow.restype = wt.HWND
    user32.GetWindowThreadProcessId.argtypes = [wt.HWND, ctypes.POINTER(wt.DWORD)]
    user32.GetWindowThreadProcessId.restype = wt.DWORD
    user32.GetWindowTextLengthW.argtypes = [wt.HWND]
    user32.GetWindowTextLengthW.restype = ctypes.c_int
    user32.GetWindowTextW.argtypes = [wt.HWND, wt.LPWSTR, ctypes.c_int]
    user32.GetWindowTextW.restype = ctypes.c_int
    user32.IsIconic.argtypes = [wt.HWND]
    user32.IsIconic.restype = wt.BOOL
    user32.GetWindowRect.argtypes = [wt.HWND, ctypes.POINTER(wt.RECT)]
    user32.GetWindowRect.restype = wt.BOOL
    user32.MonitorFromWindow.argtypes = [wt.HWND, wt.DWORD]
    user32.MonitorFromWindow.restype = wt.HMONITOR
    user32.GetMonitorInfoW.argtypes = [wt.HMONITOR, ctypes.c_void_p]
    user32.GetMonitorInfoW.restype = wt.BOOL

    kernel32.OpenProcess.argtypes = [wt.DWORD, wt.BOOL, wt.DWORD]
    kernel32.OpenProcess.restype = wt.HANDLE
    kernel32.CloseHandle.argtypes = [wt.HANDLE]
    kernel32.CloseHandle.restype = wt.BOOL
    kernel32.QueryFullProcessImageNameW.argtypes = [
        wt.HANDLE, wt.DWORD, wt.LPWSTR, ctypes.POINTER(wt.DWORD)
    ]
    kernel32.QueryFullProcessImageNameW.restype = wt.BOOL
    kernel32.GetProcessTimes.argtypes = [
        wt.HANDLE,
        ctypes.POINTER(wt.FILETIME),
        ctypes.POINTER(wt.FILETIME),
        ctypes.POINTER(wt.FILETIME),
        ctypes.POINTER(wt.FILETIME),
    ]
    kernel32.GetProcessTimes.restype = wt.BOOL

    psapi.GetModuleFileNameExW.argtypes = [wt.HANDLE, wt.HMODULE, wt.LPWSTR, wt.DWORD]
    psapi.GetModuleFileNameExW.restype = wt.DWORD

    hwnd = user32.GetForegroundWindow()
    if not hwnd:
        return ForegroundInfo(available=True, error="no foreground window")

    # PID + window thread.
    pid = wt.DWORD(0)
    user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
    pid_val = int(pid.value) if pid.value else None

    # Window title (Unicode).
    length = user32.GetWindowTextLengthW(hwnd)
    title_buf = ctypes.create_unicode_buffer(length + 1)
    user32.GetWindowTextW(hwnd, title_buf, length + 1)
    window_title = title_buf.value or None

    # Minimized flag.
    is_minimized = bool(user32.IsIconic(hwnd))

    # Window rect (screen coords).
    rect = wt.RECT()
    window_geometry: dict[str, int] | None = None
    if user32.GetWindowRect(hwnd, ctypes.byref(rect)):
        window_geometry = {
            "left": int(rect.left),
            "top": int(rect.top),
            "right": int(rect.right),
            "bottom": int(rect.bottom),
            "width": int(rect.right - rect.left),
            "height": int(rect.bottom - rect.top),
        }

    # Monitor under the window + its geometry.
    monitor_geometry: dict[str, int] | None = None
    monitor_id: int | None = None
    is_fullscreen = False
    try:
        MONITOR_DEFAULTTONEAREST = 2

        class MONITORINFO(ctypes.Structure):
            _fields_ = [
                ("cbSize", wt.DWORD),
                ("rcMonitor", wt.RECT),
                ("rcWork", wt.RECT),
                ("dwFlags", wt.DWORD),
            ]

        hmon = user32.MonitorFromWindow(hwnd, MONITOR_DEFAULTTONEAREST)
        if hmon:
            mi = MONITORINFO()
            mi.cbSize = ctypes.sizeof(mi)
            if user32.GetMonitorInfoW(hmon, ctypes.byref(mi)):
                monitor_geometry = {
                    "left": int(mi.rcMonitor.left),
                    "top": int(mi.rcMonitor.top),
                    "right": int(mi.rcMonitor.right),
                    "bottom": int(mi.rcMonitor.bottom),
                    "width": int(mi.rcMonitor.right - mi.rcMonitor.left),
                    "height": int(mi.rcMonitor.bottom - mi.rcMonitor.top),
                }
                # Fullscreen heuristic: window rect equals monitor rect AND
                # not minimized. Many media players (VLC, browser fullscreen)
                # set themselves to exactly the monitor bounds.
                if window_geometry and not is_minimized:
                    is_fullscreen = (
                        window_geometry["left"] == monitor_geometry["left"]
                        and window_geometry["top"] == monitor_geometry["top"]
                        and window_geometry["right"] == monitor_geometry["right"]
                        and window_geometry["bottom"] == monitor_geometry["bottom"]
                    )

        # Resolve monitor index by enumerating displays in order. Coerce
        # both the foreground hmon and the per-enum hmon to int so the
        # equality compare uses 64-bit values consistently regardless of
        # how ctypes represents the handle internally.
        try:
            indexed: list[int] = []

            def _cb(hm, _hdc, _rect, _data):
                indexed.append(int(hm) if hm else 0)
                return True

            MONITORENUMPROC = ctypes.WINFUNCTYPE(
                ctypes.c_int,
                wt.HMONITOR,
                wt.HDC,
                ctypes.POINTER(wt.RECT),
                wt.LPARAM,
            )
            user32.EnumDisplayMonitors.argtypes = [
                wt.HDC, ctypes.POINTER(wt.RECT), MONITORENUMPROC, wt.LPARAM
            ]
            user32.EnumDisplayMonitors.restype = wt.BOOL
            user32.EnumDisplayMonitors(None, None, MONITORENUMPROC(_cb), 0)
            target = int(hmon) if hmon else 0
            if target and target in indexed:
                monitor_id = indexed.index(target)
        except Exception as e:
            logger.debug("Monitor index resolution failed: %s", e)
    except Exception as e:
        logger.debug("Monitor info probe failed: %s", e)

    # Process executable path + start time.
    executable_path: str | None = None
    process_name: str | None = None
    started_at: float | None = None
    if pid_val:
        PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
        h_proc = kernel32.OpenProcess(
            PROCESS_QUERY_LIMITED_INFORMATION, False, pid_val
        )
        if h_proc:
            try:
                # Image filename (full path). QueryFullProcessImageNameW works
                # across 32/64-bit boundaries, unlike GetModuleFileNameExW.
                buf = ctypes.create_unicode_buffer(1024)
                size = wt.DWORD(len(buf))
                if kernel32.QueryFullProcessImageNameW(
                    h_proc, 0, buf, ctypes.byref(size)
                ):
                    executable_path = buf.value or None
                else:
                    # Fallback via psapi. Return value is the length copied
                    # into the buffer (0 on failure); ignoring it would leave
                    # `executable_path` as an empty string from the freshly
                    # allocated buffer instead of None.
                    written = psapi.GetModuleFileNameExW(h_proc, None, buf, len(buf))
                    if written:
                        executable_path = buf.value or None
                    else:
                        logger.debug(
                            "QueryFullProcessImageNameW + psapi fallback both "
                            "failed for pid=%s (err=%d)",
                            pid_val,
                            ctypes.get_last_error(),
                        )

                if executable_path:
                    import os
                    process_name = os.path.basename(executable_path)

                # Process creation time (FILETIME, 100ns ticks since 1601).
                creation = wt.FILETIME()
                exit_t = wt.FILETIME()
                kernel_t = wt.FILETIME()
                user_t = wt.FILETIME()
                if kernel32.GetProcessTimes(
                    h_proc,
                    ctypes.byref(creation),
                    ctypes.byref(exit_t),
                    ctypes.byref(kernel_t),
                    ctypes.byref(user_t),
                ):
                    ticks = (creation.dwHighDateTime << 32) | creation.dwLowDateTime
                    # Convert to Unix epoch seconds (1601-01-01 → 1970-01-01).
                    if ticks:
                        started_at = (ticks - 116444736000000000) / 10_000_000
            finally:
                kernel32.CloseHandle(h_proc)

    return ForegroundInfo(
        available=True,
        pid=pid_val,
        process_name=process_name,
        executable_path=executable_path,
        window_title=window_title,
        window_handle=int(hwnd) if hwnd else None,
        is_fullscreen=is_fullscreen,
        is_minimized=is_minimized,
        monitor_id=monitor_id,
        monitor_geometry=monitor_geometry,
        window_geometry=window_geometry,
        started_at=started_at,
    )
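The FILETIME arithmetic in the probe is easy to check on any platform once it is pulled out of the Win32 calls. A minimal sketch; `filetime_to_unix` and the constant names are illustrative, while the numeric values are the ones used in the diff.

```python
from __future__ import annotations

# 100 ns ticks between 1601-01-01 and 1970-01-01 (11_644_473_600 s * 10**7).
EPOCH_DIFF_TICKS = 116_444_736_000_000_000
TICKS_PER_SECOND = 10_000_000


def filetime_to_unix(high: int, low: int) -> float | None:
    """Combine the two 32-bit FILETIME halves and convert to Unix seconds.

    Returns None for an all-zero FILETIME, mirroring the ``if ticks:`` guard.
    """
    ticks = (high << 32) | low
    if not ticks:
        return None
    return (ticks - EPOCH_DIFF_TICKS) / TICKS_PER_SECOND


def split_ticks(ticks: int) -> tuple[int, int]:
    """Helper for the demo: split a tick count into (high, low) halves."""
    return ticks >> 32, ticks & 0xFFFFFFFF


unix_epoch = filetime_to_unix(*split_ticks(EPOCH_DIFF_TICKS))
one_second_later = filetime_to_unix(*split_ticks(EPOCH_DIFF_TICKS + TICKS_PER_SECOND))
print(unix_epoch, one_second_later)
```

Keeping the conversion in a pure function makes the epoch offset testable without a Windows host.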


def _probe_macos() -> ForegroundInfo:
    """Best-effort probe on macOS via AppKit (PyObjC).

    Returns ``available=False`` when PyObjC is not installed; we don't take
    a hard dependency on it because the typical macOS install path uses pip
    + the standalone wheel.
    """
    try:
        from AppKit import NSWorkspace  # type: ignore
        from Quartz import (  # type: ignore
            CGWindowListCopyWindowInfo,
            kCGNullWindowID,
            kCGWindowListOptionOnScreenOnly,
        )
    except Exception:
        return ForegroundInfo(available=False, error="AppKit/Quartz not available")

    try:
        ws = NSWorkspace.sharedWorkspace()
        app = ws.frontmostApplication()
        if app is None:
            return ForegroundInfo(available=True, error="no frontmost app")

        pid = int(app.processIdentifier())
        process_name = str(app.localizedName() or "")
        bundle_url = app.bundleURL()
        executable_path = str(bundle_url.path()) if bundle_url else None
        started_at = None
        launch_date = app.launchDate()
        if launch_date is not None:
            started_at = float(launch_date.timeIntervalSince1970())

        # Window title: frontmost on-screen window owned by this PID.
        window_title: str | None = None
        try:
            windows = CGWindowListCopyWindowInfo(
                kCGWindowListOptionOnScreenOnly, kCGNullWindowID
            )
            for w in windows or []:
                if int(w.get("kCGWindowOwnerPID", -1)) == pid:
                    name = w.get("kCGWindowName")
                    if name:
                        window_title = str(name)
                        break
        except Exception as e:
            logger.debug("CGWindowListCopyWindowInfo failed: %s", e)

        return ForegroundInfo(
            available=True,
            pid=pid,
            process_name=process_name,
            executable_path=executable_path,
            window_title=window_title,
            started_at=started_at,
        )
    except Exception as e:
        logger.debug("macOS foreground probe failed: %s", e)
        return ForegroundInfo(available=False, error=str(e))


def _probe_linux() -> ForegroundInfo:
    """Best-effort probe on Linux via Xlib (X11 only).

    Wayland sessions intentionally hide window/process info from unprivileged
    clients, so this returns ``available=False`` on Wayland. The caller still
    gets a structured response and can render "unavailable" in the UI.
    """
    import os

    if os.environ.get("WAYLAND_DISPLAY"):
        return ForegroundInfo(
            available=False, error="Wayland session — foreground probe unavailable"
        )

    try:
        from Xlib import display, X  # type: ignore  # noqa: F401
    except Exception:
        return ForegroundInfo(available=False, error="python-xlib not installed")

    try:
        d = display.Display()
        root = d.screen().root
        NET_ACTIVE_WINDOW = d.intern_atom("_NET_ACTIVE_WINDOW")
        NET_WM_PID = d.intern_atom("_NET_WM_PID")
        NET_WM_NAME = d.intern_atom("_NET_WM_NAME")
        UTF8_STRING = d.intern_atom("UTF8_STRING")

        active = root.get_full_property(NET_ACTIVE_WINDOW, X.AnyPropertyType)
        if not active or not active.value:
            return ForegroundInfo(available=True, error="no active window")
        win_id = int(active.value[0])
        win = d.create_resource_object("window", win_id)

        pid_prop = win.get_full_property(NET_WM_PID, X.AnyPropertyType)
        pid_val = int(pid_prop.value[0]) if pid_prop and pid_prop.value else None

        name_prop = win.get_full_property(NET_WM_NAME, UTF8_STRING)
        window_title = (
            name_prop.value.decode("utf-8", "replace") if name_prop and name_prop.value else None
        )

        process_name: str | None = None
        executable_path: str | None = None
        started_at: float | None = None
        if pid_val:
            try:
                exe = os.readlink(f"/proc/{pid_val}/exe")
                executable_path = exe
                process_name = os.path.basename(exe)
            except OSError as e:
                logger.debug("readlink /proc/%d/exe failed: %s", pid_val, e)
            try:
                started_at = os.stat(f"/proc/{pid_val}").st_ctime
            except OSError as e:
                logger.debug("stat /proc/%d failed: %s", pid_val, e)

        return ForegroundInfo(
            available=True,
            pid=pid_val,
            process_name=process_name,
            executable_path=executable_path,
            window_title=window_title,
            window_handle=win_id,
            started_at=started_at,
        )
    except Exception as e:
        logger.debug("Linux foreground probe failed: %s", e)
        return ForegroundInfo(available=False, error=str(e))


def _enrich_browser(info: ForegroundInfo) -> ForegroundInfo:
    """If ``info`` describes a focused browser, attach URL + page title.

    The UIA lookup is wrapped in its own try/except so a failure here can't
    take down the rest of the foreground probe.
    """
    try:
        from . import browser_url_service as bus
    except Exception as e:
        logger.debug("browser_url_service unavailable: %s", e)
        return info

    if not info.available or not bus.is_browser_process(info.process_name):
        return info

    try:
        page = bus.get_browser_page(
            hwnd=info.window_handle,
            process_name=info.process_name,
            window_title=info.window_title,
        )
    except Exception as e:
        logger.debug("Browser URL enrichment failed: %s", e)
        return info

    # ``dataclasses.replace`` keeps the frozen-dataclass contract.
    from dataclasses import replace
    return replace(
        info,
        is_browser=True,
        browser_url=page.url,
        browser_page_title=page.page_title,
    )
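For reviewers unfamiliar with `dataclasses.replace`, the enrichment step can be illustrated on a cut-down stand-in. `Snapshot` is a hypothetical three-field version of `ForegroundInfo`, and the values are made up.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Snapshot:
    """Tiny stand-in for ForegroundInfo with only the fields the demo needs."""

    process_name: str | None = None
    is_browser: bool = False
    browser_page_title: str | None = None


base = Snapshot(process_name="firefox.exe")
# replace() builds a new instance; the frozen original is left untouched.
enriched = replace(base, is_browser=True, browser_page_title="Example Domain")

try:
    base.is_browser = True  # direct mutation is rejected on frozen dataclasses
except FrozenInstanceError:
    mutation_blocked = True
else:
    mutation_blocked = False

print(base, enriched, mutation_blocked)
```

Because a new object is returned, a snapshot already handed to the cache or to another thread never changes underneath its reader.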


def _probe() -> ForegroundInfo:
    system = platform.system()
    try:
        if system == "Windows":
            info = _probe_windows()
        elif system == "Darwin":
            info = _probe_macos()
        elif system == "Linux":
            info = _probe_linux()
        else:
            return ForegroundInfo(
                available=False, error=f"unsupported platform: {system}"
            )
        return _enrich_browser(info)
    except Exception as e:
        logger.warning("Foreground probe crashed: %s", e)
        return ForegroundInfo(available=False, error=str(e))


def get_foreground_info(force_refresh: bool = False) -> ForegroundInfo:
    """Return the current foreground window/process snapshot.

    Args:
        force_refresh: bypass the short TTL cache. WebSocket broadcast loop
            should leave this False; the REST endpoint accepts ?refresh=1
            for callers that want a fresh probe.
    """
    if force_refresh:
        _cache.invalidate()
    return _cache.get(_CACHE_TTL, _probe)


def reset_cache() -> None:
    """Reset the cache. Useful in tests."""
    _cache.invalidate()
@@ -19,6 +19,9 @@ class ConnectionManager:
        self._active_connections: set[WebSocket] = set()
        self._lock = asyncio.Lock()
        self._last_status: dict[str, Any] | None = None
        self._last_foreground: dict[str, Any] | None = None
        self._foreground_poll_interval: float = 1.0
        self._last_foreground_poll: float = 0.0
        self._get_status_func: Callable[[], Coroutine[Any, Any, Any]] | None = None
        self._broadcast_task: asyncio.Task | None = None
        self._poll_interval: float = 0.5  # Internal poll interval for change detection
@@ -54,6 +57,18 @@ class ConnectionManager:
        except Exception as e:
            logger.debug("Failed to send initial status: %s", e)

        # Push a fresh foreground snapshot on connect so the UI can render
        # the tile immediately instead of waiting for the next change.
        try:
            from .foreground_service import get_foreground_info

            fg = await asyncio.to_thread(get_foreground_info)
            fg_dict = fg.to_dict()
            self._last_foreground = fg_dict
            await websocket.send_json({"type": "foreground", "data": fg_dict})
        except Exception as e:
            logger.debug("Failed to send initial foreground snapshot: %s", e)

    async def disconnect(self, websocket: WebSocket) -> None:
        """Remove a WebSocket connection. Stops audio capture if last visualizer subscriber."""
        should_stop = False
@@ -115,6 +130,35 @@ class ConnectionManager:
        await self.broadcast(message)
        logger.info("Broadcast sent: links_changed")

    def foreground_changed(
        self, old: dict[str, Any] | None, new: dict[str, Any]
    ) -> bool:
        """Detect a meaningful change in the foreground process snapshot.

        The probe also returns ``window_geometry`` which jitters on every
        pixel of cursor drag; comparing the whole dict would flood clients.
        We only diff the fields a user (or HA automation) would actually act
        on. ``window_geometry``/``monitor_geometry``/``started_at`` are still
        delivered in the payload, but they don't drive broadcast cadence.
        """
        if old is None:
            return True
        diff_fields = (
            "pid",
            "process_name",
            "executable_path",
            "window_title",
            "is_fullscreen",
            "is_minimized",
            "monitor_id",
            "available",
            "error",
        )
        for f in diff_fields:
            if old.get(f) != new.get(f):
                return True
        return False
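The effect of diffing only selected fields can be shown with plain dicts. A minimal sketch: `DIFF_FIELDS` copies the tuple above, `snapshot_changed` is an illustrative free-function version of the method, and the sample payloads are invented.

```python
from __future__ import annotations

from typing import Any

# Fields copied from ``diff_fields`` in the method above.
DIFF_FIELDS = (
    "pid", "process_name", "executable_path", "window_title",
    "is_fullscreen", "is_minimized", "monitor_id", "available", "error",
)


def snapshot_changed(old: dict[str, Any] | None, new: dict[str, Any]) -> bool:
    """True when any user-visible field differs; geometry is ignored."""
    if old is None:
        return True
    return any(old.get(f) != new.get(f) for f in DIFF_FIELDS)


before = {"pid": 42, "window_title": "Inbox", "window_geometry": {"left": 0, "top": 0}}
dragged = {**before, "window_geometry": {"left": 15, "top": 8}}
retitled = {**before, "window_title": "Inbox (1)"}

print(snapshot_changed(before, dragged))   # geometry only
print(snapshot_changed(before, retitled))  # title changed
print(snapshot_changed(None, before))      # first snapshot
```

Dragging a window therefore produces no broadcast, while a title change (which also covers a browser tab switch) does.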

    async def subscribe_visualizer(self, websocket: WebSocket) -> None:
        """Subscribe a client to audio visualizer data. Starts capture on first subscriber."""
        should_start = False
@@ -314,6 +358,10 @@ class ConnectionManager:
        get_status_func: Callable[[], Coroutine[Any, Any, Any]],
    ) -> None:
        """Background loop that polls for status changes and broadcasts."""
        # Foreground tracker is imported lazily so unit tests of the WS
        # manager don't drag in platform-specific probe code.
        from .foreground_service import get_foreground_info

        while self._running:
            try:
                # Only poll if we have connected clients
@@ -340,6 +388,28 @@ class ConnectionManager:
                        # Update cached status even without broadcast
                        self._last_status = status_dict

                    # Foreground process: poll at a coarser interval than media
                    # status. Broadcasts only fire on a real change, so a quiet
                    # desktop costs nothing.
                    now = time.time()
                    if (
                        now - self._last_foreground_poll
                    ) >= self._foreground_poll_interval:
                        self._last_foreground_poll = now
                        try:
                            fg = await asyncio.to_thread(get_foreground_info)
                            fg_dict = fg.to_dict()
                            if self.foreground_changed(self._last_foreground, fg_dict):
                                self._last_foreground = fg_dict
                                await self.broadcast(
                                    {"type": "foreground_update", "data": fg_dict}
                                )
                                logger.debug("Broadcast sent: foreground change")
                            else:
                                self._last_foreground = fg_dict
                        except Exception as e:
                            logger.debug("Foreground poll failed: %s", e)

                await asyncio.sleep(self._poll_interval)

            except asyncio.CancelledError: