"""Display brightness, power, contrast, input-source and color-preset control.""" import ctypes import ctypes.wintypes import logging import platform import struct import time from dataclasses import dataclass, field logger = logging.getLogger(__name__) # VCP 0xDC "Display Application" — picture / scene mode. # Vendors deviate from the MCCS spec, but these labels match the standard # meanings and cover what most monitors report through their capability # string. Unknown codes fall back to "Mode ". PICTURE_MODE_VCP = 0xDC PICTURE_MODE_LABELS: dict[int, str] = { 0x00: "Default", 0x01: "Standalone", 0x02: "Mixed", 0x03: "Productivity", 0x04: "Movie", 0x05: "Game", 0x06: "Sports", 0x07: "Professional", 0x08: "Standard", 0x09: "Default", 0x0A: "Movie (Reduced Effects)", 0x0B: "Movie (Enhanced)", 0x0C: "User 1", 0x0D: "User 2", 0x0E: "User 3", } _sbc = None _monitorcontrol = None def _load_sbc(): global _sbc if _sbc is None: try: import screen_brightness_control as sbc _sbc = sbc except ImportError: logger.warning("screen_brightness_control not installed - brightness control unavailable") return _sbc def _load_monitorcontrol(): global _monitorcontrol if _monitorcontrol is None: try: import monitorcontrol _monitorcontrol = monitorcontrol except ImportError: logger.warning("monitorcontrol not installed - DDC/CI control unavailable") return _monitorcontrol def _parse_edid_resolution(edid_hex: str) -> str | None: """Parse resolution from EDID hex string (first detailed timing descriptor).""" try: edid = bytes.fromhex(edid_hex) if len(edid) < 58: return None dtd = edid[54:] pixel_clock = struct.unpack('> 4) << 8) v_active = dtd[5] | ((dtd[7] >> 4) << 8) return f"{h_active}x{v_active}" except Exception: return None @dataclass class MonitorInfo: id: int name: str brightness: int | None power_supported: bool power_on: bool = True model: str = "" manufacturer: str = "" resolution: str | None = None is_primary: bool = False contrast: int | None = None contrast_supported: bool = False input_source: str | None = None available_input_sources: list[str] = field(default_factory=list) input_source_supported: bool = False color_preset: str | None = None available_color_presets: list[str] = field(default_factory=list) color_preset_supported: bool = False picture_mode: str | None = None picture_mode_code: int | None = None available_picture_modes: list[dict] = field(default_factory=list) picture_mode_supported: bool = False def to_dict(self) -> dict: return { "id": self.id, "name": self.name, "brightness": self.brightness, "power_supported": self.power_supported, "power_on": self.power_on, "model": self.model, "manufacturer": self.manufacturer, "resolution": self.resolution, "is_primary": self.is_primary, "contrast": self.contrast, "contrast_supported": self.contrast_supported, "input_source": self.input_source, "available_input_sources": self.available_input_sources, "input_source_supported": self.input_source_supported, "color_preset": self.color_preset, "available_color_presets": self.available_color_presets, "color_preset_supported": self.color_preset_supported, "picture_mode": self.picture_mode, "picture_mode_code": self.picture_mode_code, "available_picture_modes": self.available_picture_modes, "picture_mode_supported": self.picture_mode_supported, } def _detect_primary_resolution() -> str | None: """Detect the primary display resolution via Windows API.""" if platform.system() != "Windows": return None try: class MONITORINFO(ctypes.Structure): _fields_ = [ ("cbSize", ctypes.wintypes.DWORD), ("rcMonitor", ctypes.wintypes.RECT), ("rcWork", ctypes.wintypes.RECT), ("dwFlags", ctypes.wintypes.DWORD), ] MONITORINFOF_PRIMARY = 1 primary_res = None def callback(hmon, hdc, rect, data): nonlocal primary_res mi = MONITORINFO() mi.cbSize = ctypes.sizeof(mi) ctypes.windll.user32.GetMonitorInfoW(hmon, ctypes.byref(mi)) if mi.dwFlags & MONITORINFOF_PRIMARY: w = mi.rcMonitor.right - mi.rcMonitor.left h = mi.rcMonitor.bottom - mi.rcMonitor.top primary_res = f"{w}x{h}" return True MONITORENUMPROC = ctypes.WINFUNCTYPE( ctypes.c_int, ctypes.wintypes.HMONITOR, ctypes.wintypes.HDC, ctypes.POINTER(ctypes.wintypes.RECT), ctypes.wintypes.LPARAM, ) ctypes.windll.user32.EnumDisplayMonitors( None, None, MONITORENUMPROC(callback), 0 ) return primary_res except Exception: return None def _mark_primary(monitors: list[MonitorInfo]) -> None: """Mark the primary display in the monitor list.""" if not monitors: return primary_res = _detect_primary_resolution() if primary_res: for m in monitors: if m.resolution == primary_res: m.is_primary = True return # Fallback: mark first monitor as primary monitors[0].is_primary = True # Short TTL cache of the assembled monitor list (full response). _monitor_cache: list[MonitorInfo] | None = None _cache_time: float = 0 _CACHE_TTL = 5.0 # seconds # Per-monitor cache of static capabilities (option lists + support flags). # DDC/CI capability discovery is the slow part — it only changes when a # monitor is replaced or rewired, so we probe it once per monitor and reuse # it across refreshes. Keyed by a stable identity tuple # (manufacturer, model, edid_hash) so that hot-plug swaps where the new # topology has the same number of monitors but different devices still # refresh the cache for the new monitor instead of serving stale capabilities. _static_cache: dict[tuple, dict] = {} def _enum_name(value, enum_cls=None) -> str | None: """Best-effort name for an enum or raw int returned by monitorcontrol. monitorcontrol's getters sometimes hand back raw ints when the monitor reports a value the library wraps incompletely. Re-map those through the matching enum class so HA selects still receive symbolic option names. """ if value is None: return None name = getattr(value, "name", None) if name: return name if enum_cls is not None: try: return enum_cls(value).name except (ValueError, KeyError): pass return str(value) def _probe_static_open(mon, mc, monitor_id: int) -> dict: """Probe per-monitor static capabilities. Must be called inside an open `with mon:` DDC/CI context. Tries each feature once to confirm the monitor responds, and enumerates option lists from the capability string. Heavy: this is what the cache is for. """ static = { "contrast_supported": False, "input_source_supported": False, "available_input_sources": [], "color_preset_supported": False, "available_color_presets": [], "picture_mode_supported": False, "available_picture_modes": [], } try: caps = mon.get_vcp_capabilities() or {} except Exception as e: caps = {} logger.debug("Monitor %d: get_vcp_capabilities failed: %s", monitor_id, e) try: mon.get_contrast() static["contrast_supported"] = True except Exception as e: logger.debug("Monitor %d: contrast unsupported: %s", monitor_id, e) try: mon.get_input_source() static["input_source_supported"] = True except Exception as e: logger.debug("Monitor %d: input_source unsupported: %s", monitor_id, e) inputs = caps.get("inputs") or [] input_enum = mc.InputSource if mc else None static["available_input_sources"] = [ n for n in (_enum_name(s, input_enum) for s in inputs) if n is not None ] try: mon.get_color_preset() static["color_preset_supported"] = True if mc is not None: static["available_color_presets"] = [p.name for p in mc.ColorPreset] except Exception as e: logger.debug("Monitor %d: color_preset unsupported: %s", monitor_id, e) # Picture / scene mode (VCP 0xDC). Trickier than color preset because # many monitors (LG ultrawides included) respond to READS but silently # drop every WRITE - they implement the register but not the feature. # The capability string is the most reliable signal: a monitor that # really implements picture mode declares its supported codes under # cmds[0xDC]. If 0xDC isn't declared, treat the feature as unsupported # to avoid exposing a non-functional select. cmds = caps.get("cmds") or {} declared = cmds.get(PICTURE_MODE_VCP) if declared: try: mon.vcp.get_vcp_feature(PICTURE_MODE_VCP) static["picture_mode_supported"] = True static["available_picture_modes"] = [ {"code": c, "label": PICTURE_MODE_LABELS.get(c, f"Mode {c}")} for c in sorted(declared) ] except Exception as e: logger.debug("Monitor %d: picture_mode declared but unreadable: %s", monitor_id, e) else: logger.debug("Monitor %d: picture_mode (VCP 0xDC) not declared in capability string", monitor_id) return static def _probe_dynamic_open(mon, mc, monitor_id: int, static: dict) -> dict: """Read current values for features known to be supported. Must be called inside an open `with mon:` context. Skips reads for unsupported features (saves one I2C roundtrip each), so the warm path only touches features the monitor actually responds to. """ dynamic = { "power_on": True, "contrast": None, "input_source": None, "color_preset": None, "picture_mode": None, "picture_mode_code": None, } try: dynamic["power_on"] = mon.get_power_mode() == mc.PowerMode.on except Exception as e: logger.debug("Monitor %d: power readback failed: %s", monitor_id, e) if static.get("contrast_supported"): try: dynamic["contrast"] = mon.get_contrast() except Exception as e: logger.debug("Monitor %d: contrast readback failed: %s", monitor_id, e) if static.get("input_source_supported"): try: src = mon.get_input_source() dynamic["input_source"] = _enum_name(src, mc.InputSource if mc else None) except Exception as e: logger.debug("Monitor %d: input_source readback failed: %s", monitor_id, e) if static.get("color_preset_supported"): try: preset = mon.get_color_preset() dynamic["color_preset"] = _enum_name(preset, mc.ColorPreset if mc else None) except Exception as e: logger.debug("Monitor %d: color_preset readback failed: %s", monitor_id, e) if static.get("picture_mode_supported"): try: current, _maximum = mon.vcp.get_vcp_feature(PICTURE_MODE_VCP) dynamic["picture_mode_code"] = current dynamic["picture_mode"] = PICTURE_MODE_LABELS.get(current, f"Mode {current}") except Exception as e: logger.debug("Monitor %d: picture_mode readback failed: %s", monitor_id, e) return dynamic def list_monitors(force_refresh: bool = False, rediscover: bool = False) -> list[MonitorInfo]: """List all connected monitors with their current state. Args: force_refresh: bypass the short TTL response cache. rediscover: also drop the per-monitor static capability cache, so the next probe re-runs DDC/CI capability discovery. Use after hot-plug or when a monitor's reported capabilities change. """ global _monitor_cache, _cache_time if ( not force_refresh and not rediscover and _monitor_cache is not None and (time.time() - _cache_time) < _CACHE_TTL ): return _monitor_cache sbc = _load_sbc() if sbc is None: return [] monitors = [] try: info_list = sbc.list_monitors_info() brightnesses = sbc.get_brightness() # Explicit rediscover wipes the whole cache; otherwise rely on stable # per-monitor keys (manufacturer|model|edid_hash) so a hot-plug swap # invalidates the entry for the missing monitor automatically. if rediscover: _static_cache.clear() mc = _load_monitorcontrol() ddc_monitors = [] if mc: try: ddc_monitors = mc.get_monitors() except Exception: pass import hashlib seen_keys: set[tuple] = set() for i, info in enumerate(info_list): name = info.get("name", f"Monitor {i}") model = info.get("model", "") manufacturer = info.get("manufacturer", "") brightness = brightnesses[i] if i < len(brightnesses) else None # VCP method monitors support DDC/CI power control method = str(info.get("method", "")).lower() power_supported = "vcp" in method # Parse resolution from EDID edid = info.get("edid", "") resolution = _parse_edid_resolution(edid) if edid else None # Stable cache key — EDID hash is unique per physical monitor. # Fall back to (manufacturer, model, serial-ish) when EDID is # missing, then to the legacy index as a last resort. if edid: edid_hash = hashlib.blake2b( edid.encode("utf-8") if isinstance(edid, str) else bytes(edid), digest_size=8, ).hexdigest() cache_key: tuple = ("edid", edid_hash) elif manufacturer or model: cache_key = ("mm", manufacturer, model, name) else: cache_key = ("idx", i) seen_keys.add(cache_key) static: dict = {} dynamic: dict = {} # Open the DDC handle ONCE; do static probe (if needed) + dynamic # readback inside the same context. Opening the handle is the # expensive part — keep both phases under one open. if power_supported and i < len(ddc_monitors): try: with ddc_monitors[i] as mon: if cache_key not in _static_cache: _static_cache[cache_key] = _probe_static_open(mon, mc, i) static = _static_cache[cache_key] dynamic = _probe_dynamic_open(mon, mc, i, static) except Exception as e: logger.debug("Monitor %d: DDC/CI session failed: %s", i, e) static = _static_cache.get(cache_key, {}) monitors.append(MonitorInfo( id=i, name=name, brightness=brightness, power_supported=power_supported, power_on=dynamic.get("power_on", True), model=model, manufacturer=manufacturer, resolution=resolution, contrast=dynamic.get("contrast"), contrast_supported=static.get("contrast_supported", False), input_source=dynamic.get("input_source"), available_input_sources=static.get("available_input_sources", []), input_source_supported=static.get("input_source_supported", False), color_preset=dynamic.get("color_preset"), available_color_presets=static.get("available_color_presets", []), color_preset_supported=static.get("color_preset_supported", False), picture_mode=dynamic.get("picture_mode"), picture_mode_code=dynamic.get("picture_mode_code"), available_picture_modes=static.get("available_picture_modes", []), picture_mode_supported=static.get("picture_mode_supported", False), )) # Evict cache entries for monitors that disappeared from this scan so # the next hot-plug of a different monitor with the same identity # tuple (e.g. same model) doesn't hit a stale entry first. for stale_key in list(_static_cache.keys()): if stale_key not in seen_keys: _static_cache.pop(stale_key, None) except Exception as e: logger.error("Failed to enumerate monitors: %s", e) _mark_primary(monitors) _monitor_cache = monitors _cache_time = time.time() return monitors def get_brightness(monitor_id: int) -> int | None: """Get brightness for a specific monitor.""" sbc = _load_sbc() if sbc is None: return None try: result = sbc.get_brightness(display=monitor_id) if isinstance(result, list): return result[0] if result else None return result except Exception as e: logger.error("Failed to get brightness for monitor %d: %s", monitor_id, e) return None def set_brightness(monitor_id: int, value: int) -> bool: """Set brightness for a specific monitor (0-100).""" sbc = _load_sbc() if sbc is None: return False value = max(0, min(100, value)) try: sbc.set_brightness(value, display=monitor_id) _invalidate_cache() return True except Exception as e: logger.error("Failed to set brightness for monitor %d: %s", monitor_id, e) return False def set_power(monitor_id: int, on: bool) -> bool: """Turn a monitor on or off via DDC/CI.""" mc = _load_monitorcontrol() if mc is None: logger.error("monitorcontrol not available for power control") return False try: ddc_monitors = mc.get_monitors() if monitor_id >= len(ddc_monitors): logger.error("Monitor %d not found in DDC/CI monitors", monitor_id) return False with ddc_monitors[monitor_id] as monitor: if on: monitor.set_power_mode(mc.PowerMode.on) else: monitor.set_power_mode(mc.PowerMode.off_soft) _invalidate_cache() return True except Exception as e: logger.error("Failed to set power for monitor %d: %s", monitor_id, e) return False def _verify_after_set(getter, expected, *, retries: int = 3, delay: float = 0.1) -> bool: """Poll a DDC/CI getter to confirm the monitor actually applied a write. DDC/CI writes are fire-and-forget at the protocol level: a successful send does not mean the monitor honored the value. Many monitors silently drop writes for codes their firmware doesn't really implement (LG's ColorPreset / Picture Mode are common offenders). Without this check the API would report `success: true` while the monitor sat unchanged. Compares both raw and `.value` forms so enum/int mismatches don't flag a spurious failure. """ expected_int = getattr(expected, "value", expected) for _ in range(retries): time.sleep(delay) try: actual = getter() except Exception: continue actual_int = getattr(actual, "value", actual) if actual == expected or actual_int == expected_int: return True return False def set_contrast(monitor_id: int, value: int) -> bool: """Set contrast for a specific monitor (0-100) via DDC/CI.""" mc = _load_monitorcontrol() if mc is None: return False value = max(0, min(100, value)) try: ddc_monitors = mc.get_monitors() if monitor_id >= len(ddc_monitors): return False with ddc_monitors[monitor_id] as monitor: monitor.set_contrast(value) if not _verify_after_set(monitor.get_contrast, value): logger.warning("Monitor %d: contrast %d not applied", monitor_id, value) return False _invalidate_cache() return True except Exception as e: logger.error("Failed to set contrast for monitor %d: %s", monitor_id, e) return False def set_input_source(monitor_id: int, source: str) -> bool: """Set the DDC/CI input source by enum name (e.g. 'HDMI1', 'DP1').""" mc = _load_monitorcontrol() if mc is None: return False try: target = mc.InputSource[source] except KeyError: logger.error("Unknown input source: %s", source) return False try: ddc_monitors = mc.get_monitors() if monitor_id >= len(ddc_monitors): return False with ddc_monitors[monitor_id] as monitor: monitor.set_input_source(target) # Source switches can briefly disrupt the DDC/CI link; allow a # longer settle window before declaring failure. if not _verify_after_set(monitor.get_input_source, target, retries=5, delay=0.2): logger.warning("Monitor %d: input source %s not applied", monitor_id, source) return False _invalidate_cache() return True except Exception as e: logger.error("Failed to set input source for monitor %d: %s", monitor_id, e) return False def set_color_preset(monitor_id: int, preset: str) -> bool: """Set the DDC/CI color preset by enum name (e.g. 'COLOR_TEMP_6500K').""" mc = _load_monitorcontrol() if mc is None: return False try: target = mc.ColorPreset[preset] except KeyError: logger.error("Unknown color preset: %s", preset) return False try: ddc_monitors = mc.get_monitors() if monitor_id >= len(ddc_monitors): return False with ddc_monitors[monitor_id] as monitor: monitor.set_color_preset(target) if not _verify_after_set(monitor.get_color_preset, target): logger.warning( "Monitor %d: color preset %s not applied (monitor silently rejected)", monitor_id, preset, ) return False _invalidate_cache() return True except Exception as e: logger.error("Failed to set color preset for monitor %d: %s", monitor_id, e) return False def set_picture_mode(monitor_id: int, code: int) -> bool: """Set the DDC/CI picture/scene mode (VCP 0xDC) by raw code.""" mc = _load_monitorcontrol() if mc is None: return False if not 0 <= code <= 255: logger.error("Picture mode code %d out of range", code) return False try: ddc_monitors = mc.get_monitors() if monitor_id >= len(ddc_monitors): return False with ddc_monitors[monitor_id] as monitor: monitor.vcp.set_vcp_feature(PICTURE_MODE_VCP, code) # Raw VCP read returns (current, maximum) — only compare current. def _read_picture_mode(): current, _ = monitor.vcp.get_vcp_feature(PICTURE_MODE_VCP) return current if not _verify_after_set(_read_picture_mode, code): logger.warning( "Monitor %d: picture mode code %d not applied (monitor silently rejected)", monitor_id, code, ) return False _invalidate_cache() return True except Exception as e: logger.error("Failed to set picture mode for monitor %d: %s", monitor_id, e) return False def _invalidate_cache() -> None: global _monitor_cache _monitor_cache = None