eeab9b2a26
Resolves the ruff I001 warning introduced by 61cdce9.
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
515 lines
19 KiB
Python
515 lines
19 KiB
Python
"""Foreground (topmost) window/process tracking.

Reports the process that currently owns the foreground window, plus useful
metadata (window title, executable path, monitor index, whether the window
covers a full monitor, process start time).

All probes happen behind a short TTL cache so the WebSocket status poll and
per-entity HA polls don't pay the OS call cost on every tick.

Windows uses the Win32 API via ``ctypes`` (no extra dependency) and falls back
gracefully when individual probes fail. Linux/macOS implementations are
best-effort and return ``available=False`` when the required tooling is
missing, so the rest of the stack keeps working.
"""
|
|
|
|
from __future__ import annotations

import logging
import platform
import threading
import time
from collections.abc import Callable
from dataclasses import asdict, dataclass, field
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_CACHE_TTL = 0.5 # seconds — fast enough for WebSocket broadcast loop
|
|
|
|
|
|
@dataclass(frozen=True)
class ForegroundInfo:
    """Snapshot of the foreground window/process.

    Instances are immutable (``frozen=True``). Only ``available`` is
    required; every other field defaults to "unknown" (``None``/``False``) so
    each platform probe fills in just what it can determine.
    """

    # False when the probe could not run at all (missing tooling, unsupported
    # platform, crash); ``error`` then carries the reason.
    available: bool
    pid: int | None = None
    process_name: str | None = None
    executable_path: str | None = None
    window_title: str | None = None
    # Native window identifier (HWND on Windows, X11 window id on Linux).
    window_handle: int | None = None
    is_fullscreen: bool = False
    is_minimized: bool = False
    # Zero-based position of the window's monitor in display enumeration order.
    monitor_id: int | None = None
    # Rectangles as {"left", "top", "right", "bottom", "width", "height"}.
    monitor_geometry: dict[str, int] | None = None
    window_geometry: dict[str, int] | None = None
    # Process start time as Unix epoch seconds.
    started_at: float | None = None
    # Resolved lazily, per instance, through the module-level ``platform``
    # import (the lambda defers the lookup until the instance is created).
    platform: str = field(default_factory=lambda: platform.system())
    error: str | None = None
    # Populated only when the foreground process is a recognised web
    # browser. ``browser_page_title`` is derived from the window title
    # (suffix stripped); ``browser_url`` requires UIA to succeed.
    is_browser: bool = False
    browser_url: str | None = None
    browser_page_title: str | None = None

    def to_dict(self) -> dict:
        """Return the snapshot as a plain ``dict`` (via ``dataclasses.asdict``)."""
        return asdict(self)
|
|
|
|
|
|
# Pre-built "probe unavailable" snapshot: ``available=False`` with every other
# field left at its default.
_UNAVAILABLE = ForegroundInfo(available=False)
|
|
|
|
|
|
class _Cache:
|
|
"""Single-slot TTL cache shared across callers."""
|
|
|
|
def __init__(self) -> None:
|
|
self._lock = threading.Lock()
|
|
self._value: ForegroundInfo | None = None
|
|
self._fetched_at: float = 0.0
|
|
|
|
def get(self, ttl: float, fetch) -> ForegroundInfo:
|
|
with self._lock:
|
|
now = time.monotonic()
|
|
if self._value is not None and (now - self._fetched_at) < ttl:
|
|
return self._value
|
|
# Fetch outside the lock — OS calls can take tens of ms.
|
|
value = fetch()
|
|
with self._lock:
|
|
self._value = value
|
|
self._fetched_at = time.monotonic()
|
|
return value
|
|
|
|
def invalidate(self) -> None:
|
|
with self._lock:
|
|
self._value = None
|
|
self._fetched_at = 0.0
|
|
|
|
|
|
# Module-wide cache instance used by get_foreground_info() and reset_cache().
_cache = _Cache()
|
|
|
|
|
|
def _probe_windows() -> ForegroundInfo:
    """Probe foreground window state on Windows via Win32 API.

    Collects, in order: the foreground HWND, its owning PID, the window
    title, the minimized flag, the window rectangle, the monitor under the
    window (geometry, fullscreen heuristic, enumeration index) and finally the
    owning process's image path and creation time.

    Returns:
        ``ForegroundInfo(available=True, ...)``. When there is no foreground
        window, ``error`` is set and every other field keeps its default.
        Monitor and process details that cannot be obtained are left as
        ``None``.
    """
    import ctypes
    import ctypes.wintypes as wt
    import os

    user32 = ctypes.WinDLL("user32", use_last_error=True)
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    psapi = ctypes.WinDLL("psapi", use_last_error=True)

    # CRITICAL: declare argtypes/restype on every Win32 call that returns a
    # HANDLE/HWND/HMONITOR. ctypes defaults to `c_int` (32-bit) which
    # silently truncates 64-bit pointer values on x64 — that corrupts the
    # handle so `CloseHandle()` can either fail or close the wrong kernel
    # object, and pointer-equality comparisons (monitor index lookup) miss.
    user32.GetForegroundWindow.restype = wt.HWND
    user32.GetWindowThreadProcessId.argtypes = [wt.HWND, ctypes.POINTER(wt.DWORD)]
    user32.GetWindowThreadProcessId.restype = wt.DWORD
    user32.GetWindowTextLengthW.argtypes = [wt.HWND]
    user32.GetWindowTextLengthW.restype = ctypes.c_int
    user32.GetWindowTextW.argtypes = [wt.HWND, wt.LPWSTR, ctypes.c_int]
    user32.GetWindowTextW.restype = ctypes.c_int
    user32.IsIconic.argtypes = [wt.HWND]
    user32.IsIconic.restype = wt.BOOL
    user32.GetWindowRect.argtypes = [wt.HWND, ctypes.POINTER(wt.RECT)]
    user32.GetWindowRect.restype = wt.BOOL
    user32.MonitorFromWindow.argtypes = [wt.HWND, wt.DWORD]
    user32.MonitorFromWindow.restype = wt.HMONITOR
    user32.GetMonitorInfoW.argtypes = [wt.HMONITOR, ctypes.c_void_p]
    user32.GetMonitorInfoW.restype = wt.BOOL

    kernel32.OpenProcess.argtypes = [wt.DWORD, wt.BOOL, wt.DWORD]
    kernel32.OpenProcess.restype = wt.HANDLE
    kernel32.CloseHandle.argtypes = [wt.HANDLE]
    kernel32.CloseHandle.restype = wt.BOOL
    kernel32.QueryFullProcessImageNameW.argtypes = [
        wt.HANDLE, wt.DWORD, wt.LPWSTR, ctypes.POINTER(wt.DWORD)
    ]
    kernel32.QueryFullProcessImageNameW.restype = wt.BOOL
    kernel32.GetProcessTimes.argtypes = [
        wt.HANDLE,
        ctypes.POINTER(wt.FILETIME),
        ctypes.POINTER(wt.FILETIME),
        ctypes.POINTER(wt.FILETIME),
        ctypes.POINTER(wt.FILETIME),
    ]
    kernel32.GetProcessTimes.restype = wt.BOOL

    psapi.GetModuleFileNameExW.argtypes = [wt.HANDLE, wt.HMODULE, wt.LPWSTR, wt.DWORD]
    psapi.GetModuleFileNameExW.restype = wt.DWORD

    hwnd = user32.GetForegroundWindow()
    if not hwnd:
        return ForegroundInfo(available=True, error="no foreground window")

    # PID + window thread.
    pid = wt.DWORD(0)
    user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
    pid_val = int(pid.value) if pid.value else None

    # Window title — Unicode. The buffer is sized from the reported length
    # plus one slot for the terminating NUL.
    length = user32.GetWindowTextLengthW(hwnd)
    title_buf = ctypes.create_unicode_buffer(length + 1)
    user32.GetWindowTextW(hwnd, title_buf, length + 1)
    window_title = title_buf.value or None

    # Minimized flag.
    is_minimized = bool(user32.IsIconic(hwnd))

    # Window rect (screen coords).
    rect = wt.RECT()
    window_geometry: dict[str, int] | None = None
    if user32.GetWindowRect(hwnd, ctypes.byref(rect)):
        window_geometry = {
            "left": int(rect.left),
            "top": int(rect.top),
            "right": int(rect.right),
            "bottom": int(rect.bottom),
            "width": int(rect.right - rect.left),
            "height": int(rect.bottom - rect.top),
        }

    # Monitor under the window + its geometry.
    monitor_geometry: dict[str, int] | None = None
    monitor_id: int | None = None
    is_fullscreen = False
    try:
        MONITOR_DEFAULTTONEAREST = 2

        class MONITORINFO(ctypes.Structure):
            _fields_ = [
                ("cbSize", wt.DWORD),
                ("rcMonitor", wt.RECT),
                ("rcWork", wt.RECT),
                ("dwFlags", wt.DWORD),
            ]

        hmon = user32.MonitorFromWindow(hwnd, MONITOR_DEFAULTTONEAREST)
        if hmon:
            mi = MONITORINFO()
            mi.cbSize = ctypes.sizeof(mi)
            if user32.GetMonitorInfoW(hmon, ctypes.byref(mi)):
                monitor_geometry = {
                    "left": int(mi.rcMonitor.left),
                    "top": int(mi.rcMonitor.top),
                    "right": int(mi.rcMonitor.right),
                    "bottom": int(mi.rcMonitor.bottom),
                    "width": int(mi.rcMonitor.right - mi.rcMonitor.left),
                    "height": int(mi.rcMonitor.bottom - mi.rcMonitor.top),
                }
                # Fullscreen heuristic: window rect equals monitor rect AND
                # not minimized. Many media players (VLC, browser fullscreen)
                # set themselves to exactly the monitor bounds.
                if window_geometry and not is_minimized:
                    is_fullscreen = (
                        window_geometry["left"] == monitor_geometry["left"]
                        and window_geometry["top"] == monitor_geometry["top"]
                        and window_geometry["right"] == monitor_geometry["right"]
                        and window_geometry["bottom"] == monitor_geometry["bottom"]
                    )

            # Resolve monitor index by enumerating displays in order. Coerce
            # both the foreground hmon and the per-enum hmon to int so the
            # equality compare uses 64-bit values consistently regardless of
            # how ctypes represents the handle internally.
            try:
                indexed: list[int] = []

                def _cb(hm, _hdc, _rect, _data):
                    indexed.append(int(hm) if hm else 0)
                    return True  # keep enumerating

                MONITORENUMPROC = ctypes.WINFUNCTYPE(
                    ctypes.c_int,
                    wt.HMONITOR,
                    wt.HDC,
                    ctypes.POINTER(wt.RECT),
                    wt.LPARAM,
                )
                user32.EnumDisplayMonitors.argtypes = [
                    wt.HDC, ctypes.POINTER(wt.RECT), MONITORENUMPROC, wt.LPARAM
                ]
                user32.EnumDisplayMonitors.restype = wt.BOOL
                user32.EnumDisplayMonitors(None, None, MONITORENUMPROC(_cb), 0)
                target = int(hmon) if hmon else 0
                if target and target in indexed:
                    monitor_id = indexed.index(target)
            except Exception as e:
                logger.debug("Monitor index resolution failed: %s", e)
    except Exception as e:
        logger.debug("Monitor info probe failed: %s", e)

    # Process executable path + start time.
    executable_path: str | None = None
    process_name: str | None = None
    started_at: float | None = None
    if pid_val:
        PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
        h_proc = kernel32.OpenProcess(
            PROCESS_QUERY_LIMITED_INFORMATION, False, pid_val
        )
        if h_proc:
            try:
                # Image filename — full path. QueryFullProcessImageNameW works
                # across 32/64-bit boundaries, unlike GetModuleFileNameExW.
                buf = ctypes.create_unicode_buffer(1024)
                size = wt.DWORD(len(buf))
                if kernel32.QueryFullProcessImageNameW(
                    h_proc, 0, buf, ctypes.byref(size)
                ):
                    executable_path = buf.value or None
                else:
                    # Fallback via psapi. Return value is the length copied
                    # into the buffer (0 on failure); ignoring it would leave
                    # `executable_path` as an empty string from the freshly
                    # allocated buffer instead of None.
                    written = psapi.GetModuleFileNameExW(h_proc, None, buf, len(buf))
                    if written:
                        executable_path = buf.value or None
                    else:
                        logger.debug(
                            "QueryFullProcessImageNameW + psapi fallback both "
                            "failed for pid=%s (err=%d)",
                            pid_val,
                            ctypes.get_last_error(),
                        )

                if executable_path:
                    process_name = os.path.basename(executable_path)

                # Process creation time (FILETIME, 100ns ticks since 1601).
                creation = wt.FILETIME()
                exit_t = wt.FILETIME()
                kernel_t = wt.FILETIME()
                user_t = wt.FILETIME()
                if kernel32.GetProcessTimes(
                    h_proc,
                    ctypes.byref(creation),
                    ctypes.byref(exit_t),
                    ctypes.byref(kernel_t),
                    ctypes.byref(user_t),
                ):
                    ticks = (creation.dwHighDateTime << 32) | creation.dwLowDateTime
                    # Convert to Unix epoch seconds (1601-01-01 → 1970-01-01).
                    if ticks:
                        started_at = (ticks - 116444736000000000) / 10_000_000
            finally:
                # Always release the process handle, even if a probe raised.
                kernel32.CloseHandle(h_proc)

    return ForegroundInfo(
        available=True,
        pid=pid_val,
        process_name=process_name,
        executable_path=executable_path,
        window_title=window_title,
        window_handle=int(hwnd) if hwnd else None,
        is_fullscreen=is_fullscreen,
        is_minimized=is_minimized,
        monitor_id=monitor_id,
        monitor_geometry=monitor_geometry,
        window_geometry=window_geometry,
        started_at=started_at,
    )
|
|
|
|
|
|
def _probe_macos() -> ForegroundInfo:
    """Best-effort probe on macOS via AppKit (PyObjC).

    Returns ``available=False`` when PyObjC is not installed — we don't take
    a hard dependency on it because the typical macOS install path uses pip
    + the standalone wheel.

    Returns:
        A snapshot with pid, localized app name, bundle path, launch time and
        (when one can be found) a window title. ``available=False`` with
        ``error`` set if the imports fail or the probe raises.
    """
    try:
        from AppKit import NSWorkspace  # type: ignore
        from Quartz import (  # type: ignore
            CGWindowListCopyWindowInfo,
            kCGNullWindowID,
            kCGWindowListOptionOnScreenOnly,
        )
    except Exception:
        return ForegroundInfo(available=False, error="AppKit/Quartz not available")

    try:
        ws = NSWorkspace.sharedWorkspace()
        app = ws.frontmostApplication()
        if app is None:
            return ForegroundInfo(available=True, error="no frontmost app")

        pid = int(app.processIdentifier())
        process_name = str(app.localizedName() or "")
        # NOTE: this is the application *bundle* path, not the inner binary.
        bundle_url = app.bundleURL()
        executable_path = str(bundle_url.path()) if bundle_url else None
        started_at = None
        launch_date = app.launchDate()
        if launch_date is not None:
            started_at = float(launch_date.timeIntervalSince1970())

        # Window title — frontmost on-screen window owned by this PID.
        # The first window of the PID that reports a non-empty name wins.
        window_title: str | None = None
        try:
            windows = CGWindowListCopyWindowInfo(
                kCGWindowListOptionOnScreenOnly, kCGNullWindowID
            )
            for w in windows or []:
                if int(w.get("kCGWindowOwnerPID", -1)) == pid:
                    name = w.get("kCGWindowName")
                    if name:
                        window_title = str(name)
                        break
        except Exception as e:
            # A missing title is not fatal; keep the rest of the snapshot.
            logger.debug("CGWindowListCopyWindowInfo failed: %s", e)

        return ForegroundInfo(
            available=True,
            pid=pid,
            process_name=process_name,
            executable_path=executable_path,
            window_title=window_title,
            started_at=started_at,
        )
    except Exception as e:
        logger.debug("macOS foreground probe failed: %s", e)
        return ForegroundInfo(available=False, error=str(e))
|
|
|
|
|
|
def _probe_linux() -> ForegroundInfo:
    """Best-effort probe on Linux via Xlib (X11 only).

    Wayland sessions intentionally hide window/process info from unprivileged
    clients, so this returns ``available=False`` on Wayland. The caller still
    gets a structured response and can render "unavailable" in the UI.

    Returns:
        A snapshot built from ``_NET_ACTIVE_WINDOW`` / ``_NET_WM_PID`` /
        ``_NET_WM_NAME`` plus ``/proc/<pid>`` lookups. ``available=False`` with
        ``error`` set on Wayland, when python-xlib is missing, or when the
        probe raises.
    """
    import os

    if os.environ.get("WAYLAND_DISPLAY"):
        return ForegroundInfo(
            available=False, error="Wayland session — foreground probe unavailable"
        )

    try:
        from Xlib import X, display  # type: ignore  # noqa: F401
    except Exception:
        return ForegroundInfo(available=False, error="python-xlib not installed")

    # The probe runs on every cache miss, so the X connection must be closed
    # explicitly; otherwise each call would leave a client connection open
    # until the garbage collector happens to reclaim it.
    d = None
    try:
        d = display.Display()
        root = d.screen().root
        NET_ACTIVE_WINDOW = d.intern_atom("_NET_ACTIVE_WINDOW")
        NET_WM_PID = d.intern_atom("_NET_WM_PID")
        NET_WM_NAME = d.intern_atom("_NET_WM_NAME")
        UTF8_STRING = d.intern_atom("UTF8_STRING")

        active = root.get_full_property(NET_ACTIVE_WINDOW, X.AnyPropertyType)
        if not active or not active.value:
            return ForegroundInfo(available=True, error="no active window")
        win_id = int(active.value[0])
        win = d.create_resource_object("window", win_id)

        pid_prop = win.get_full_property(NET_WM_PID, X.AnyPropertyType)
        pid_val = int(pid_prop.value[0]) if pid_prop and pid_prop.value else None

        name_prop = win.get_full_property(NET_WM_NAME, UTF8_STRING)
        window_title = (
            name_prop.value.decode("utf-8", "replace") if name_prop and name_prop.value else None
        )

        process_name: str | None = None
        executable_path: str | None = None
        started_at: float | None = None
        if pid_val:
            try:
                exe = os.readlink(f"/proc/{pid_val}/exe")
                executable_path = exe
                process_name = os.path.basename(exe)
            except OSError as e:
                logger.debug("readlink /proc/%d/exe failed: %s", pid_val, e)
            try:
                # ctime of the /proc/<pid> directory is used as the start time.
                started_at = os.stat(f"/proc/{pid_val}").st_ctime
            except OSError as e:
                logger.debug("stat /proc/%d failed: %s", pid_val, e)

        return ForegroundInfo(
            available=True,
            pid=pid_val,
            process_name=process_name,
            executable_path=executable_path,
            window_title=window_title,
            window_handle=win_id,
            started_at=started_at,
        )
    except Exception as e:
        logger.debug("Linux foreground probe failed: %s", e)
        return ForegroundInfo(available=False, error=str(e))
    finally:
        if d is not None:
            try:
                d.close()
            except Exception as e:
                # Closing is cleanup only; never let it mask the probe result.
                logger.debug("Closing X display failed: %s", e)
|
|
|
|
|
|
def _enrich_browser(info: ForegroundInfo) -> ForegroundInfo:
    """If ``info`` describes a focused browser, attach URL + page title.

    The UIA lookup is wrapped in its own try/except so a failure here can't
    take down the rest of the foreground probe.

    Args:
        info: snapshot produced by one of the platform probes.

    Returns:
        ``info`` unchanged when the browser helper module cannot be imported,
        the snapshot is unavailable, the process is not a browser, or the
        page lookup raises; otherwise a copy with ``is_browser=True`` and the
        browser URL / page title filled in.
    """
    from dataclasses import replace

    try:
        from . import browser_url_service as bus
    except Exception as e:
        logger.debug("browser_url_service unavailable: %s", e)
        return info

    if not info.available or not bus.is_browser_process(info.process_name):
        return info

    try:
        page = bus.get_browser_page(
            hwnd=info.window_handle,
            process_name=info.process_name,
            window_title=info.window_title,
        )
    except Exception as e:
        logger.debug("Browser URL enrichment failed: %s", e)
        return info

    # ``dataclasses.replace`` keeps the frozen-dataclass contract.
    return replace(
        info,
        is_browser=True,
        browser_url=page.url,
        browser_page_title=page.page_title,
    )
|
|
|
|
|
|
def _probe() -> ForegroundInfo:
    """Run the probe matching the current OS and add browser details.

    Returns:
        The platform probe's snapshot passed through ``_enrich_browser``.
        ``available=False`` with ``error`` set for an unsupported platform or
        when anything in the probe/enrichment chain raises.
    """
    system = platform.system()
    try:
        if system == "Windows":
            info = _probe_windows()
        elif system == "Darwin":
            info = _probe_macos()
        elif system == "Linux":
            info = _probe_linux()
        else:
            return ForegroundInfo(
                available=False, error=f"unsupported platform: {system}"
            )
        return _enrich_browser(info)
    except Exception as e:
        # Last line of defence: report the crash instead of propagating it.
        logger.warning("Foreground probe crashed: %s", e)
        return ForegroundInfo(available=False, error=str(e))
|
|
|
|
|
|
def get_foreground_info(force_refresh: bool = False) -> ForegroundInfo:
    """Return the current foreground window/process snapshot.

    Args:
        force_refresh: bypass the short TTL cache. WebSocket broadcast loop
            should leave this False; the REST endpoint accepts ?refresh=1
            for callers that want a fresh probe.

    Returns:
        The cached snapshot when it is younger than ``_CACHE_TTL`` seconds,
        otherwise the result of a new probe.
    """
    if force_refresh:
        # Emptying the slot guarantees the lookup below misses and re-probes.
        _cache.invalidate()
    return _cache.get(_CACHE_TTL, _probe)
|
|
|
|
|
|
def reset_cache() -> None:
    """Reset the cache. Useful in tests.

    The next ``get_foreground_info`` call will run a fresh probe.
    """
    _cache.invalidate()
|