Rename profiles to automations across backend and frontend

Rename the "profiles" entity to "automations" throughout the entire
codebase for clarity. Updates Python models, storage, API routes/schemas,
engine, frontend JS modules, HTML templates, CSS classes, i18n keys
(en/ru/zh), dashboard, tutorials, and command palette.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 18:01:39 +03:00
parent da3e53e1f1
commit 21248e2dc9
39 changed files with 1180 additions and 1179 deletions
@@ -0,0 +1 @@
"""Automation engine — condition evaluation and scene activation."""
@@ -0,0 +1,433 @@
"""Automation engine — background loop that evaluates conditions and activates scenes."""
import asyncio
import re
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set
from wled_controller.core.automations.platform_detector import PlatformDetector
from wled_controller.storage.automation import (
AlwaysCondition,
ApplicationCondition,
Automation,
Condition,
DisplayStateCondition,
MQTTCondition,
SystemIdleCondition,
TimeOfDayCondition,
)
from wled_controller.storage.automation_store import AutomationStore
from wled_controller.storage.scene_preset import ScenePreset
from wled_controller.utils import get_logger
logger = get_logger(__name__)
class AutomationEngine:
"""Evaluates automation conditions and activates/deactivates scene presets."""
def __init__(
self,
automation_store: AutomationStore,
processor_manager,
poll_interval: float = 1.0,
mqtt_service=None,
scene_preset_store=None,
target_store=None,
device_store=None,
):
self._store = automation_store
self._manager = processor_manager
self._poll_interval = poll_interval
self._detector = PlatformDetector()
self._mqtt_service = mqtt_service
self._scene_preset_store = scene_preset_store
self._target_store = target_store
self._device_store = device_store
self._task: Optional[asyncio.Task] = None
self._eval_lock = asyncio.Lock()
# Runtime state (not persisted)
# automation_id → True when automation is currently active
self._active_automations: Dict[str, bool] = {}
# automation_id → snapshot captured before activation (for "revert" mode)
self._pre_activation_snapshots: Dict[str, ScenePreset] = {}
# automation_id → datetime of last activation / deactivation
self._last_activated: Dict[str, datetime] = {}
self._last_deactivated: Dict[str, datetime] = {}
async def start(self) -> None:
if self._task is not None:
return
self._task = asyncio.create_task(self._poll_loop())
logger.info("Automation engine started")
async def stop(self) -> None:
if self._task is None:
return
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
# Deactivate all automations
for automation_id in list(self._active_automations.keys()):
await self._deactivate_automation(automation_id)
logger.info("Automation engine stopped")
async def _poll_loop(self) -> None:
try:
while True:
try:
await self._evaluate_all()
except Exception as e:
logger.error(f"Automation evaluation error: {e}", exc_info=True)
await asyncio.sleep(self._poll_interval)
except asyncio.CancelledError:
pass
async def _evaluate_all(self) -> None:
async with self._eval_lock:
await self._evaluate_all_locked()
def _detect_all_sync(
self, needs_running: bool, needs_topmost: bool, needs_fullscreen: bool,
needs_idle: bool, needs_display_state: bool,
) -> tuple:
"""Run all platform detection in a single thread call.
Batching detection calls into one executor submission reduces
event-loop wake-ups, minimising asyncio.sleep() jitter in
latency-sensitive processing loops.
"""
running_procs = self._detector._get_running_processes_sync() if needs_running else set()
if needs_topmost:
topmost_proc, topmost_fullscreen = self._detector._get_topmost_process_sync()
else:
topmost_proc, topmost_fullscreen = None, False
fullscreen_procs = self._detector._get_fullscreen_processes_sync() if needs_fullscreen else set()
idle_seconds = self._detector._get_idle_seconds_sync() if needs_idle else None
display_state = self._detector._get_display_power_state_sync() if needs_display_state else None
return running_procs, topmost_proc, topmost_fullscreen, fullscreen_procs, idle_seconds, display_state
async def _evaluate_all_locked(self) -> None:
automations = self._store.get_all_automations()
if not automations:
# No automations — deactivate any stale state
for aid in list(self._active_automations.keys()):
await self._deactivate_automation(aid)
return
# Determine which detection methods are actually needed
match_types_used: set = set()
needs_idle = False
needs_display_state = False
for a in automations:
if a.enabled:
for c in a.conditions:
if isinstance(c, ApplicationCondition):
match_types_used.add(c.match_type)
elif isinstance(c, SystemIdleCondition):
needs_idle = True
elif isinstance(c, DisplayStateCondition):
needs_display_state = True
needs_running = "running" in match_types_used
needs_topmost = bool(match_types_used & {"topmost", "topmost_fullscreen"})
needs_fullscreen = "fullscreen" in match_types_used
# Single executor call for all platform detection
loop = asyncio.get_event_loop()
(running_procs, topmost_proc, topmost_fullscreen,
fullscreen_procs, idle_seconds, display_state) = (
await loop.run_in_executor(
None, self._detect_all_sync,
needs_running, needs_topmost, needs_fullscreen,
needs_idle, needs_display_state,
)
)
active_automation_ids = set()
for automation in automations:
should_be_active = (
automation.enabled
and (len(automation.conditions) == 0
or self._evaluate_conditions(
automation, running_procs, topmost_proc, topmost_fullscreen,
fullscreen_procs, idle_seconds, display_state))
)
is_active = automation.id in self._active_automations
if should_be_active and not is_active:
await self._activate_automation(automation)
active_automation_ids.add(automation.id)
elif should_be_active and is_active:
active_automation_ids.add(automation.id)
elif not should_be_active and is_active:
await self._deactivate_automation(automation.id)
# Deactivate automations that were removed from store while active
for aid in list(self._active_automations.keys()):
if aid not in active_automation_ids:
await self._deactivate_automation(aid)
def _evaluate_conditions(
self, automation: Automation, running_procs: Set[str],
topmost_proc: Optional[str], topmost_fullscreen: bool,
fullscreen_procs: Set[str],
idle_seconds: Optional[float], display_state: Optional[str],
) -> bool:
results = [
self._evaluate_condition(
c, running_procs, topmost_proc, topmost_fullscreen,
fullscreen_procs, idle_seconds, display_state,
)
for c in automation.conditions
]
if automation.condition_logic == "and":
return all(results)
return any(results) # "or" is default
def _evaluate_condition(
self, condition: Condition, running_procs: Set[str],
topmost_proc: Optional[str], topmost_fullscreen: bool,
fullscreen_procs: Set[str],
idle_seconds: Optional[float], display_state: Optional[str],
) -> bool:
if isinstance(condition, AlwaysCondition):
return True
if isinstance(condition, ApplicationCondition):
return self._evaluate_app_condition(condition, running_procs, topmost_proc, topmost_fullscreen, fullscreen_procs)
if isinstance(condition, TimeOfDayCondition):
return self._evaluate_time_of_day(condition)
if isinstance(condition, SystemIdleCondition):
return self._evaluate_idle(condition, idle_seconds)
if isinstance(condition, DisplayStateCondition):
return self._evaluate_display_state(condition, display_state)
if isinstance(condition, MQTTCondition):
return self._evaluate_mqtt(condition)
return False
@staticmethod
def _evaluate_time_of_day(condition: TimeOfDayCondition) -> bool:
now = datetime.now()
current = now.hour * 60 + now.minute
parts_s = condition.start_time.split(":")
parts_e = condition.end_time.split(":")
start = int(parts_s[0]) * 60 + int(parts_s[1])
end = int(parts_e[0]) * 60 + int(parts_e[1])
if start <= end:
return start <= current <= end
# Overnight range (e.g. 22:00 → 06:00)
return current >= start or current <= end
@staticmethod
def _evaluate_idle(condition: SystemIdleCondition, idle_seconds: Optional[float]) -> bool:
if idle_seconds is None:
return False
is_idle = idle_seconds >= (condition.idle_minutes * 60)
return is_idle if condition.when_idle else not is_idle
@staticmethod
def _evaluate_display_state(condition: DisplayStateCondition, display_state: Optional[str]) -> bool:
if display_state is None:
return False
return display_state == condition.state
def _evaluate_mqtt(self, condition: MQTTCondition) -> bool:
if self._mqtt_service is None or not self._mqtt_service.is_connected:
return False
value = self._mqtt_service.get_last_value(condition.topic)
if value is None:
return False
if condition.match_mode == "exact":
return value == condition.payload
if condition.match_mode == "contains":
return condition.payload in value
if condition.match_mode == "regex":
try:
return bool(re.search(condition.payload, value))
except re.error:
return False
return False
def _evaluate_app_condition(
self,
condition: ApplicationCondition,
running_procs: Set[str],
topmost_proc: Optional[str],
topmost_fullscreen: bool,
fullscreen_procs: Set[str],
) -> bool:
if not condition.apps:
return False
apps_lower = [a.lower() for a in condition.apps]
if condition.match_type == "fullscreen":
return any(app in fullscreen_procs for app in apps_lower)
if condition.match_type == "topmost_fullscreen":
if topmost_proc is None or not topmost_fullscreen:
return False
return any(app == topmost_proc for app in apps_lower)
if condition.match_type == "topmost":
if topmost_proc is None:
return False
return any(app == topmost_proc for app in apps_lower)
# Default: "running"
return any(app in running_procs for app in apps_lower)
async def _activate_automation(self, automation: Automation) -> None:
if not automation.scene_preset_id:
# No scene configured — just mark active (conditions matched but nothing to do)
self._active_automations[automation.id] = True
self._last_activated[automation.id] = datetime.now(timezone.utc)
self._fire_event(automation.id, "activated")
logger.info(f"Automation '{automation.name}' activated (no scene configured)")
return
if not self._scene_preset_store or not self._target_store or not self._device_store:
logger.warning(f"Automation '{automation.name}' matched but scene stores not available")
return
# Load the scene preset
try:
preset = self._scene_preset_store.get_preset(automation.scene_preset_id)
except ValueError:
logger.warning(f"Automation '{automation.name}': scene preset {automation.scene_preset_id} not found")
return
# For "revert" mode, capture current state before activating
if automation.deactivation_mode == "revert":
from wled_controller.core.scenes.scene_activator import capture_current_snapshot
targets, devices, automations = capture_current_snapshot(
self._target_store, self._device_store, self._store, self._manager,
)
self._pre_activation_snapshots[automation.id] = ScenePreset(
id=f"_revert_{automation.id}",
name=f"Pre-activation snapshot for {automation.name}",
targets=targets,
devices=devices,
profiles=automations,
)
# Apply the scene
from wled_controller.core.scenes.scene_activator import apply_scene_state
status, errors = await apply_scene_state(
preset, self._target_store, self._device_store, self._store,
self, self._manager, skip_automations=True,
)
self._active_automations[automation.id] = True
self._last_activated[automation.id] = datetime.now(timezone.utc)
self._fire_event(automation.id, "activated")
if errors:
logger.warning(f"Automation '{automation.name}' activated with errors: {errors}")
else:
logger.info(f"Automation '{automation.name}' activated (scene '{preset.name}' applied)")
async def _deactivate_automation(self, automation_id: str) -> None:
was_active = self._active_automations.pop(automation_id, False)
if not was_active:
return
# Look up the automation for deactivation settings
try:
automation = self._store.get_automation(automation_id)
except ValueError:
automation = None
deactivation_mode = automation.deactivation_mode if automation else "none"
if deactivation_mode == "revert":
snapshot = self._pre_activation_snapshots.pop(automation_id, None)
if snapshot and self._target_store and self._device_store:
from wled_controller.core.scenes.scene_activator import apply_scene_state
status, errors = await apply_scene_state(
snapshot, self._target_store, self._device_store, self._store,
self, self._manager, skip_automations=True,
)
if errors:
logger.warning(f"Automation {automation_id} revert errors: {errors}")
else:
logger.info(f"Automation {automation_id} deactivated (reverted to previous state)")
else:
logger.warning(f"Automation {automation_id}: no snapshot available for revert")
elif deactivation_mode == "fallback_scene":
fallback_id = automation.deactivation_scene_preset_id if automation else None
if fallback_id and self._scene_preset_store and self._target_store and self._device_store:
try:
fallback = self._scene_preset_store.get_preset(fallback_id)
from wled_controller.core.scenes.scene_activator import apply_scene_state
status, errors = await apply_scene_state(
fallback, self._target_store, self._device_store, self._store,
self, self._manager, skip_automations=True,
)
if errors:
logger.warning(f"Automation {automation_id} fallback errors: {errors}")
else:
logger.info(f"Automation {automation_id} deactivated (fallback scene '{fallback.name}' applied)")
except ValueError:
logger.warning(f"Automation {automation_id}: fallback scene {fallback_id} not found")
else:
logger.info(f"Automation {automation_id} deactivated (no fallback scene configured)")
else:
# "none" mode — just clear active state
logger.info(f"Automation {automation_id} deactivated")
self._last_deactivated[automation_id] = datetime.now(timezone.utc)
self._fire_event(automation_id, "deactivated")
# Clean up any leftover snapshot
self._pre_activation_snapshots.pop(automation_id, None)
def _fire_event(self, automation_id: str, action: str) -> None:
try:
self._manager._fire_event({
"type": "automation_state_changed",
"automation_id": automation_id,
"action": action,
})
except Exception:
pass
# ===== Public query methods (used by API) =====
def get_automation_state(self, automation_id: str) -> dict:
"""Get runtime state of a single automation."""
is_active = automation_id in self._active_automations
return {
"is_active": is_active,
"last_activated_at": self._last_activated.get(automation_id),
"last_deactivated_at": self._last_deactivated.get(automation_id),
}
def get_all_automation_states(self) -> Dict[str, dict]:
"""Get runtime states of all automations."""
result = {}
for automation in self._store.get_all_automations():
result[automation.id] = self.get_automation_state(automation.id)
return result
async def trigger_evaluate(self) -> None:
"""Run a single evaluation cycle immediately (used after enabling an automation)."""
try:
await self._evaluate_all()
except Exception as e:
logger.error(f"Immediate automation evaluation error: {e}", exc_info=True)
async def deactivate_if_active(self, automation_id: str) -> None:
"""Deactivate an automation immediately (used when disabling/deleting)."""
if automation_id in self._active_automations:
await self._deactivate_automation(automation_id)
@@ -0,0 +1,410 @@
"""Platform-specific process and window detection.
Windows: uses wmi for process listing, ctypes for foreground window detection.
Non-Windows: graceful degradation (returns empty results).
"""
import asyncio
import ctypes
import ctypes.wintypes
import os
import sys
import threading
from typing import Optional, Set
from wled_controller.utils import get_logger
logger = get_logger(__name__)
_IS_WINDOWS = sys.platform == "win32"
class PlatformDetector:
"""Detect running processes and the foreground window's process."""
def __init__(self) -> None:
self._display_on: bool = True
self._display_listener_started = False
if _IS_WINDOWS:
t = threading.Thread(target=self._display_power_listener, daemon=True)
t.start()
# ---- Display power state (event-driven) ----
def _display_power_listener(self) -> None:
"""Background thread: hidden window that receives display power events."""
try:
user32 = ctypes.windll.user32
WNDPROC = ctypes.WINFUNCTYPE(
ctypes.c_long,
ctypes.wintypes.HWND,
ctypes.c_uint,
ctypes.wintypes.WPARAM,
ctypes.wintypes.LPARAM,
)
WM_POWERBROADCAST = 0x0218
PBT_POWERSETTINGCHANGE = 0x8013
class POWERBROADCAST_SETTING(ctypes.Structure):
_fields_ = [
("PowerSetting", ctypes.c_ubyte * 16), # GUID
("DataLength", ctypes.wintypes.DWORD),
("Data", ctypes.c_ubyte * 1),
]
# GUID_CONSOLE_DISPLAY_STATE = {6FE69556-704A-47A0-8F24-C28D936FDA47}
GUID_CONSOLE_DISPLAY_STATE = (ctypes.c_ubyte * 16)(
0x56, 0x95, 0xE6, 0x6F, 0x4A, 0x70, 0xA0, 0x47,
0x8F, 0x24, 0xC2, 0x8D, 0x93, 0x6F, 0xDA, 0x47,
)
def wnd_proc(hwnd, msg, wparam, lparam):
if msg == WM_POWERBROADCAST and wparam == PBT_POWERSETTINGCHANGE:
try:
setting = ctypes.cast(
lparam, ctypes.POINTER(POWERBROADCAST_SETTING)
).contents
# Data: 0=off, 1=on, 2=dimmed (treat dimmed as on)
value = setting.Data[0]
self._display_on = value != 0
except Exception:
pass
return 0
return user32.DefWindowProcW(hwnd, msg, wparam, lparam)
wnd_proc_cb = WNDPROC(wnd_proc)
# Register window class
class WNDCLASSEXW(ctypes.Structure):
_fields_ = [
("cbSize", ctypes.c_uint),
("style", ctypes.c_uint),
("lpfnWndProc", WNDPROC),
("cbClsExtra", ctypes.c_int),
("cbWndExtra", ctypes.c_int),
("hInstance", ctypes.wintypes.HINSTANCE),
("hIcon", ctypes.wintypes.HICON),
("hCursor", ctypes.wintypes.HANDLE),
("hbrBackground", ctypes.wintypes.HBRUSH),
("lpszMenuName", ctypes.wintypes.LPCWSTR),
("lpszClassName", ctypes.wintypes.LPCWSTR),
("hIconSm", ctypes.wintypes.HICON),
]
wc = WNDCLASSEXW()
wc.cbSize = ctypes.sizeof(WNDCLASSEXW)
wc.lpfnWndProc = wnd_proc_cb
wc.lpszClassName = "LedGrabDisplayMonitor"
wc.hInstance = ctypes.windll.kernel32.GetModuleHandleW(None)
atom = user32.RegisterClassExW(ctypes.byref(wc))
if not atom:
logger.warning("Failed to register display monitor window class")
return
HWND_MESSAGE = ctypes.wintypes.HWND(-3)
hwnd = user32.CreateWindowExW(
0, wc.lpszClassName, "LedGrab Display Monitor",
0, 0, 0, 0, 0, HWND_MESSAGE, None, wc.hInstance, None,
)
if not hwnd:
logger.warning("Failed to create display monitor hidden window")
return
# Register for display power notifications
user32.RegisterPowerSettingNotification(
hwnd, ctypes.byref(GUID_CONSOLE_DISPLAY_STATE), 0
)
self._display_listener_started = True
logger.debug("Display power listener started")
# Message pump
msg = ctypes.wintypes.MSG()
while user32.GetMessageW(ctypes.byref(msg), None, 0, 0) > 0:
user32.TranslateMessage(ctypes.byref(msg))
user32.DispatchMessageW(ctypes.byref(msg))
except Exception as e:
logger.error(f"Display power listener failed: {e}")
def _get_display_power_state_sync(self) -> Optional[str]:
"""Get display power state: 'on' or 'off'. Returns None if unavailable."""
if not _IS_WINDOWS:
return None
return "on" if self._display_on else "off"
# ---- System idle detection ----
def _get_idle_seconds_sync(self) -> Optional[float]:
"""Get system idle time in seconds (keyboard/mouse inactivity).
Returns None if detection is unavailable.
"""
if not _IS_WINDOWS:
return None
try:
class LASTINPUTINFO(ctypes.Structure):
_fields_ = [
("cbSize", ctypes.c_uint),
("dwTime", ctypes.c_uint),
]
lii = LASTINPUTINFO()
lii.cbSize = ctypes.sizeof(LASTINPUTINFO)
if not ctypes.windll.user32.GetLastInputInfo(ctypes.byref(lii)):
return None
millis = ctypes.windll.kernel32.GetTickCount() - lii.dwTime
return millis / 1000.0
except Exception as e:
logger.error(f"Failed to get idle time: {e}")
return None
# ---- Process detection ----
def _get_running_processes_sync(self) -> Set[str]:
"""Get set of lowercase process names via Win32 EnumProcesses.
Uses PROCESS_QUERY_LIMITED_INFORMATION + QueryFullProcessImageNameW
which is ~300x faster than WMI (~8ms vs ~3s). System services
running under protected accounts are not visible, but all
user-facing applications are covered.
"""
if not _IS_WINDOWS:
return set()
try:
psapi = ctypes.windll.psapi
kernel32 = ctypes.windll.kernel32
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
# Enumerate all PIDs
pid_array = (ctypes.wintypes.DWORD * 2048)()
cb_needed = ctypes.wintypes.DWORD()
psapi.EnumProcesses(
ctypes.byref(pid_array), ctypes.sizeof(pid_array),
ctypes.byref(cb_needed),
)
n_pids = cb_needed.value // ctypes.sizeof(ctypes.wintypes.DWORD)
procs: Set[str] = set()
name_buf = ctypes.create_unicode_buffer(512)
for i in range(n_pids):
pid = pid_array[i]
if pid == 0:
continue
handle = kernel32.OpenProcess(
PROCESS_QUERY_LIMITED_INFORMATION, False, pid,
)
if not handle:
continue
try:
buf_size = ctypes.wintypes.DWORD(512)
if kernel32.QueryFullProcessImageNameW(
handle, 0, name_buf, ctypes.byref(buf_size),
):
procs.add(os.path.basename(name_buf.value).lower())
finally:
kernel32.CloseHandle(handle)
return procs
except Exception as e:
logger.error(f"Failed to enumerate processes: {e}")
return set()
def _get_topmost_process_sync(self) -> tuple:
"""Get (process_name, is_fullscreen) of the foreground window.
Returns (None, False) when detection fails.
Blocking — call via executor.
"""
if not _IS_WINDOWS:
return (None, False)
try:
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32
psapi = ctypes.windll.psapi
hwnd = user32.GetForegroundWindow()
if not hwnd:
return (None, False)
pid = ctypes.wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
if not pid.value:
return (None, False)
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010
handle = kernel32.OpenProcess(
PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid.value
)
if not handle:
return (None, False)
proc_name = None
try:
buf = ctypes.create_unicode_buffer(512)
psapi.GetModuleFileNameExW(handle, None, buf, 512)
full_path = buf.value
if full_path:
proc_name = os.path.basename(full_path).lower()
finally:
kernel32.CloseHandle(handle)
if proc_name is None:
return (None, False)
# Check if the foreground window covers its entire monitor
is_fullscreen = self._is_window_fullscreen(user32, hwnd)
return (proc_name, is_fullscreen)
except Exception as e:
logger.error(f"Failed to get foreground process: {e}")
return (None, False)
@staticmethod
def _is_window_fullscreen(user32, hwnd) -> bool:
"""Check whether *hwnd* covers its monitor completely."""
try:
# Get window rectangle
win_rect = ctypes.wintypes.RECT()
user32.GetWindowRect(hwnd, ctypes.byref(win_rect))
# Get the monitor this window is on
MONITOR_DEFAULTTONEAREST = 2
hmon = user32.MonitorFromWindow(hwnd, MONITOR_DEFAULTTONEAREST)
if not hmon:
return False
# MONITORINFO struct
class MONITORINFO(ctypes.Structure):
_fields_ = [
("cbSize", ctypes.wintypes.DWORD),
("rcMonitor", ctypes.wintypes.RECT),
("rcWork", ctypes.wintypes.RECT),
("dwFlags", ctypes.wintypes.DWORD),
]
mi = MONITORINFO()
mi.cbSize = ctypes.sizeof(MONITORINFO)
if not user32.GetMonitorInfoW(hmon, ctypes.byref(mi)):
return False
mr = mi.rcMonitor
return (
win_rect.left <= mr.left
and win_rect.top <= mr.top
and win_rect.right >= mr.right
and win_rect.bottom >= mr.bottom
)
except Exception:
return False
def _get_fullscreen_processes_sync(self) -> Set[str]:
"""Get set of lowercase process names that have a fullscreen window.
Enumerates all top-level windows and checks each for fullscreen.
Returns process names (lowercase) whose window covers an entire monitor.
"""
if not _IS_WINDOWS:
return set()
try:
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32
psapi = ctypes.windll.psapi
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010
fullscreen_procs: Set[str] = set()
# Callback receives (hwnd, lparam); return True to continue enumeration
WNDENUMPROC = ctypes.WINFUNCTYPE(
ctypes.wintypes.BOOL,
ctypes.wintypes.HWND,
ctypes.wintypes.LPARAM,
)
# Skip the desktop and shell windows (always cover the full monitor)
desktop_hwnd = user32.GetDesktopWindow()
shell_hwnd = user32.GetShellWindow()
# Get shell process PID to filter all explorer desktop windows
shell_pid = 0
if shell_hwnd:
_spid = ctypes.wintypes.DWORD()
user32.GetWindowThreadProcessId(shell_hwnd, ctypes.byref(_spid))
shell_pid = _spid.value
GWL_EXSTYLE = -20
WS_EX_TOOLWINDOW = 0x00000080
WS_EX_NOACTIVATE = 0x08000000
def _enum_callback(hwnd, _lparam):
# Skip invisible windows
if not user32.IsWindowVisible(hwnd):
return True
if hwnd == desktop_hwnd:
return True
# Skip tool/overlay/non-activatable windows (system UI, input hosts)
ex_style = user32.GetWindowLongW(hwnd, GWL_EXSTYLE)
if ex_style & (WS_EX_TOOLWINDOW | WS_EX_NOACTIVATE):
return True
# Quick fullscreen check before expensive process name lookup
if not self._is_window_fullscreen(user32, hwnd):
return True
# Get PID; skip shell process windows (desktop, taskbar)
pid = ctypes.wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
if not pid.value or pid.value == shell_pid:
return True
handle = kernel32.OpenProcess(
PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid.value
)
if not handle:
return True
try:
buf = ctypes.create_unicode_buffer(512)
psapi.GetModuleFileNameExW(handle, None, buf, 512)
full_path = buf.value
if full_path:
fullscreen_procs.add(os.path.basename(full_path).lower())
finally:
kernel32.CloseHandle(handle)
return True
callback = WNDENUMPROC(_enum_callback)
user32.EnumWindows(callback, 0)
return fullscreen_procs
except Exception as e:
logger.error(f"Failed to enumerate fullscreen windows: {e}")
return set()
async def get_running_processes(self) -> Set[str]:
"""Get set of lowercase process names (async-safe)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._get_running_processes_sync)
async def get_topmost_process(self) -> tuple:
"""Get (process_name, is_fullscreen) of the foreground window (async-safe)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._get_topmost_process_sync)
async def get_fullscreen_processes(self) -> Set[str]:
"""Get set of process names that have a fullscreen window (async-safe)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._get_fullscreen_processes_sync)