Files
media-player-server/media_server/services/display_service.py
T
alexei.dolgolyov d1f621f0b4
Lint & Test / test (push) Successful in 10s
fix(displays): verify DDC/CI writes and trust capability string for picture mode
DDC/CI writes are fire-and-forget at the protocol level: a successful send
does not mean the monitor honored the value. Many monitors (LG ultrawides
in particular) silently drop writes for VCP codes whose registers exist
but whose feature isn't really implemented in firmware.

- New _verify_after_set helper polls readback after every DDC/CI write and
  reports {success: false} when the monitor didn't apply the value. Wired
  into set_contrast, set_input_source, set_color_preset, set_picture_mode.
  Input source uses a longer settle window since switching can briefly
  disrupt the DDC/CI link.

- Picture mode (VCP 0xDC) now requires the capability string to declare
  supported codes under cmds[0xDC]. Without that declaration we treat the
  feature as unsupported even when reads succeed - the LG case where reads
  return a stuck value and every write is silently ignored.
2026-05-15 14:45:40 +03:00

653 lines
23 KiB
Python

"""Display brightness, power, contrast, input-source and color-preset control."""
import ctypes
import ctypes.wintypes
import logging
import platform
import struct
import time
from dataclasses import dataclass, field
# Module-level logger used by every helper in this file.
logger = logging.getLogger(__name__)

# VCP 0xDC "Display Application" — picture / scene mode.
# Vendors deviate from the MCCS spec, but these labels match the standard
# meanings and cover what most monitors report through their capability
# string. Codes missing from this table are rendered as "Mode <n>" by the
# code that builds the option list and the current-value label.
# NOTE(review): 0x00 and 0x09 both map to "Default", so labels are not
# unique — identify a mode by its code, never by its label. TODO confirm the
# 0x09 label (and the table as a whole) against the MCCS definition of 0xDC.
PICTURE_MODE_VCP = 0xDC
PICTURE_MODE_LABELS: dict[int, str] = {
    0x00: "Default",
    0x01: "Standalone",
    0x02: "Mixed",
    0x03: "Productivity",
    0x04: "Movie",
    0x05: "Game",
    0x06: "Sports",
    0x07: "Professional",
    0x08: "Standard",
    0x09: "Default",
    0x0A: "Movie (Reduced Effects)",
    0x0B: "Movie (Enhanced)",
    0x0C: "User 1",
    0x0D: "User 2",
    0x0E: "User 3",
}

# Holders for the optional third-party modules. They start as None and are
# filled in by `_load_sbc()` / `_load_monitorcontrol()` on first successful
# import; they stay None while the corresponding package is not installed.
_sbc = None
_monitorcontrol = None
def _load_sbc():
    """Return the ``screen_brightness_control`` module, importing it lazily.

    A successful import is remembered in the module-level ``_sbc``. When the
    package is missing, a warning is logged and ``None`` is returned; nothing
    is remembered in that case, so the next call attempts the import again.
    """
    global _sbc
    if _sbc is not None:
        return _sbc
    try:
        import screen_brightness_control
    except ImportError:
        logger.warning("screen_brightness_control not installed - brightness control unavailable")
    else:
        _sbc = screen_brightness_control
    return _sbc
def _load_monitorcontrol():
    """Return the ``monitorcontrol`` module, importing it lazily.

    A successful import is remembered in the module-level ``_monitorcontrol``.
    When the package is missing, a warning is logged and ``None`` is returned;
    nothing is remembered in that case, so the next call retries the import.
    """
    global _monitorcontrol
    if _monitorcontrol is not None:
        return _monitorcontrol
    try:
        import monitorcontrol as loaded
    except ImportError:
        logger.warning("monitorcontrol not installed - DDC/CI control unavailable")
    else:
        _monitorcontrol = loaded
    return _monitorcontrol
def _parse_edid_resolution(edid_hex: str) -> str | None:
"""Parse resolution from EDID hex string (first detailed timing descriptor)."""
try:
edid = bytes.fromhex(edid_hex)
if len(edid) < 58:
return None
dtd = edid[54:]
pixel_clock = struct.unpack('<H', dtd[0:2])[0]
if pixel_clock == 0:
return None
h_active = dtd[2] | ((dtd[4] >> 4) << 8)
v_active = dtd[5] | ((dtd[7] >> 4) << 8)
return f"{h_active}x{v_active}"
except Exception:
return None
@dataclass
class MonitorInfo:
id: int
name: str
brightness: int | None
power_supported: bool
power_on: bool = True
model: str = ""
manufacturer: str = ""
resolution: str | None = None
is_primary: bool = False
contrast: int | None = None
contrast_supported: bool = False
input_source: str | None = None
available_input_sources: list[str] = field(default_factory=list)
input_source_supported: bool = False
color_preset: str | None = None
available_color_presets: list[str] = field(default_factory=list)
color_preset_supported: bool = False
picture_mode: str | None = None
picture_mode_code: int | None = None
available_picture_modes: list[dict] = field(default_factory=list)
picture_mode_supported: bool = False
def to_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"brightness": self.brightness,
"power_supported": self.power_supported,
"power_on": self.power_on,
"model": self.model,
"manufacturer": self.manufacturer,
"resolution": self.resolution,
"is_primary": self.is_primary,
"contrast": self.contrast,
"contrast_supported": self.contrast_supported,
"input_source": self.input_source,
"available_input_sources": self.available_input_sources,
"input_source_supported": self.input_source_supported,
"color_preset": self.color_preset,
"available_color_presets": self.available_color_presets,
"color_preset_supported": self.color_preset_supported,
"picture_mode": self.picture_mode,
"picture_mode_code": self.picture_mode_code,
"available_picture_modes": self.available_picture_modes,
"picture_mode_supported": self.picture_mode_supported,
}
def _detect_primary_resolution() -> str | None:
"""Detect the primary display resolution via Windows API."""
if platform.system() != "Windows":
return None
try:
class MONITORINFO(ctypes.Structure):
_fields_ = [
("cbSize", ctypes.wintypes.DWORD),
("rcMonitor", ctypes.wintypes.RECT),
("rcWork", ctypes.wintypes.RECT),
("dwFlags", ctypes.wintypes.DWORD),
]
MONITORINFOF_PRIMARY = 1
primary_res = None
def callback(hmon, hdc, rect, data):
nonlocal primary_res
mi = MONITORINFO()
mi.cbSize = ctypes.sizeof(mi)
ctypes.windll.user32.GetMonitorInfoW(hmon, ctypes.byref(mi))
if mi.dwFlags & MONITORINFOF_PRIMARY:
w = mi.rcMonitor.right - mi.rcMonitor.left
h = mi.rcMonitor.bottom - mi.rcMonitor.top
primary_res = f"{w}x{h}"
return True
MONITORENUMPROC = ctypes.WINFUNCTYPE(
ctypes.c_int,
ctypes.wintypes.HMONITOR,
ctypes.wintypes.HDC,
ctypes.POINTER(ctypes.wintypes.RECT),
ctypes.wintypes.LPARAM,
)
ctypes.windll.user32.EnumDisplayMonitors(
None, None, MONITORENUMPROC(callback), 0
)
return primary_res
except Exception:
return None
def _mark_primary(monitors: list[MonitorInfo]) -> None:
    """Flag exactly one entry of *monitors* as the primary display.

    The first monitor whose ``resolution`` equals the detected primary
    resolution is chosen. When detection yields nothing, or no monitor
    matches, the first monitor in the list is chosen instead. An empty list
    is left untouched.
    """
    if not monitors:
        return

    # Start from the fallback and replace it only when a match is found.
    chosen = monitors[0]
    primary_res = _detect_primary_resolution()
    if primary_res:
        chosen = next(
            (candidate for candidate in monitors if candidate.resolution == primary_res),
            chosen,
        )
    chosen.is_primary = True
# Short TTL cache of the assembled monitor list (full response).
# `_monitor_cache` holds the list last returned by `list_monitors()` (None
# when empty or invalidated), `_cache_time` is the timestamp recorded when it
# was filled, and `_CACHE_TTL` is how long that list is served without
# re-probing. `_invalidate_cache()` resets `_monitor_cache` only.
_monitor_cache: list[MonitorInfo] | None = None
_cache_time: float = 0
_CACHE_TTL = 5.0  # seconds

# Per-monitor cache of static capabilities (option lists + support flags),
# keyed by the monitor's index in the enumeration.
# DDC/CI capability discovery is the slow part — it only changes when a
# monitor is replaced or rewired, so we probe it once per monitor and reuse
# it across refreshes. Cleared on explicit `rediscover` or when the monitor
# count changes (cheap stale-detection for hot-plug events).
# `_static_cache_monitor_count` remembers the monitor count the cache was
# built for; -1 means "never populated".
_static_cache: dict[int, dict] = {}
_static_cache_monitor_count: int = -1
def _enum_name(value, enum_cls=None) -> str | None:
"""Best-effort name for an enum or raw int returned by monitorcontrol.
monitorcontrol's getters sometimes hand back raw ints when the monitor
reports a value the library wraps incompletely. Re-map those through the
matching enum class so HA selects still receive symbolic option names.
"""
if value is None:
return None
name = getattr(value, "name", None)
if name:
return name
if enum_cls is not None:
try:
return enum_cls(value).name
except (ValueError, KeyError):
pass
return str(value)
def _probe_static_open(mon, mc, monitor_id: int) -> dict:
    """Probe per-monitor static capabilities.

    Must be called inside an open `with mon:` DDC/CI context. Reads each
    feature once to confirm the monitor responds, and enumerates option
    lists from the capability string. Heavy: this is what the cache is for.

    Args:
        mon: Open DDC/CI monitor handle.
        mc: The imported ``monitorcontrol`` module, or ``None``.
        monitor_id: Monitor index; used only in log messages.

    Returns:
        Dict holding the ``*_supported`` flags and ``available_*`` option
        lists for contrast, input source, color preset and picture mode.
        A feature whose probe fails keeps its ``False`` / ``[]`` default.
    """
    static = {
        "contrast_supported": False,
        "input_source_supported": False,
        "available_input_sources": [],
        "color_preset_supported": False,
        "available_color_presets": [],
        "picture_mode_supported": False,
        "available_picture_modes": [],
    }

    try:
        caps = mon.get_vcp_capabilities() or {}
    except Exception as e:
        caps = {}
        logger.debug("Monitor %d: get_vcp_capabilities failed: %s", monitor_id, e)

    try:
        mon.get_contrast()
        static["contrast_supported"] = True
    except Exception as e:
        logger.debug("Monitor %d: contrast unsupported: %s", monitor_id, e)

    try:
        mon.get_input_source()
        static["input_source_supported"] = True
    except Exception as e:
        logger.debug("Monitor %d: input_source unsupported: %s", monitor_id, e)

    inputs = caps.get("inputs") or []
    input_enum = mc.InputSource if mc else None
    static["available_input_sources"] = [
        n for n in (_enum_name(s, input_enum) for s in inputs) if n is not None
    ]

    try:
        mon.get_color_preset()
        static["color_preset_supported"] = True
        if mc is not None:
            # Lists every preset the library knows about; this is not
            # filtered by what the monitor declares.
            static["available_color_presets"] = [p.name for p in mc.ColorPreset]
    except Exception as e:
        logger.debug("Monitor %d: color_preset unsupported: %s", monitor_id, e)

    # Picture / scene mode (VCP 0xDC). Trickier than color preset because
    # many monitors (LG ultrawides included) respond to READS but silently
    # drop every WRITE - they implement the register but not the feature.
    # The capability string is the most reliable signal: a monitor that
    # really implements picture mode declares its supported codes for 0xDC.
    # If no codes are declared, treat the feature as unsupported to avoid
    # exposing a non-functional select.
    #
    # The parsed capabilities are searched under "vcp" first and "cmds"
    # second. NOTE(review): monitorcontrol's parser appears to file per-code
    # value lists under "vcp" (the same table it derives "inputs" from) and
    # to keep "cmds" as the flat command list, while its docstring example
    # shows values under "cmds" - confirm against the installed version.
    declared = None
    for section in ("vcp", "cmds"):
        table = caps.get(section)
        if isinstance(table, dict) and table.get(PICTURE_MODE_VCP):
            declared = table[PICTURE_MODE_VCP]
            break

    if declared:
        try:
            mon.vcp.get_vcp_feature(PICTURE_MODE_VCP)
            static["picture_mode_supported"] = True
            static["available_picture_modes"] = [
                {"code": c, "label": PICTURE_MODE_LABELS.get(c, f"Mode {c}")}
                for c in sorted(declared)
            ]
        except Exception as e:
            logger.debug("Monitor %d: picture_mode declared but unreadable: %s", monitor_id, e)
    else:
        logger.debug("Monitor %d: picture_mode (VCP 0xDC) not declared in capability string", monitor_id)

    return static
def _probe_dynamic_open(mon, mc, monitor_id: int, static: dict) -> dict:
    """Read the current values of the features flagged as supported.

    Must be called inside an open `with mon:` context. Reads are skipped for
    features that *static* does not flag as supported, so the warm path only
    touches features the monitor actually responds to. Power mode is always
    read. Each read is independent: a failure is logged at debug level and
    leaves that value at its default.

    Args:
        mon: Open DDC/CI monitor handle.
        mc: The imported ``monitorcontrol`` module, or ``None``.
        monitor_id: Monitor index; used only in log messages.
        static: Capability dict for this monitor (``*_supported`` flags).

    Returns:
        Dict with ``power_on`` (default ``True``) and ``contrast``,
        ``input_source``, ``color_preset``, ``picture_mode``,
        ``picture_mode_code`` (default ``None``).
    """
    power_on = True
    contrast = None
    input_source = None
    color_preset = None
    picture_mode = None
    picture_mode_code = None

    try:
        power_on = mon.get_power_mode() == mc.PowerMode.on
    except Exception as exc:
        logger.debug("Monitor %d: power readback failed: %s", monitor_id, exc)

    if static.get("contrast_supported"):
        try:
            contrast = mon.get_contrast()
        except Exception as exc:
            logger.debug("Monitor %d: contrast readback failed: %s", monitor_id, exc)

    if static.get("input_source_supported"):
        try:
            raw_source = mon.get_input_source()
            input_source = _enum_name(raw_source, mc.InputSource if mc else None)
        except Exception as exc:
            logger.debug("Monitor %d: input_source readback failed: %s", monitor_id, exc)

    if static.get("color_preset_supported"):
        try:
            raw_preset = mon.get_color_preset()
            color_preset = _enum_name(raw_preset, mc.ColorPreset if mc else None)
        except Exception as exc:
            logger.debug("Monitor %d: color_preset readback failed: %s", monitor_id, exc)

    if static.get("picture_mode_supported"):
        try:
            # The raw VCP read yields (current, maximum); only current is used.
            mode_code, _maximum = mon.vcp.get_vcp_feature(PICTURE_MODE_VCP)
            picture_mode_code = mode_code
            picture_mode = PICTURE_MODE_LABELS.get(mode_code, f"Mode {mode_code}")
        except Exception as exc:
            logger.debug("Monitor %d: picture_mode readback failed: %s", monitor_id, exc)

    return {
        "power_on": power_on,
        "contrast": contrast,
        "input_source": input_source,
        "color_preset": color_preset,
        "picture_mode": picture_mode,
        "picture_mode_code": picture_mode_code,
    }
def list_monitors(force_refresh: bool = False, rediscover: bool = False) -> list[MonitorInfo]:
    """List all connected monitors with their current state.

    Args:
        force_refresh: bypass the short TTL response cache.
        rediscover: also drop the per-monitor static capability cache, so the
            next probe re-runs DDC/CI capability discovery. Use after hot-plug
            or when a monitor's reported capabilities change.

    Returns:
        One ``MonitorInfo`` per monitor reported by screen_brightness_control.
        Empty when that package is unavailable. When enumeration raises, the
        error is logged and whatever was collected so far is returned (and
        cached for the TTL).
    """
    global _monitor_cache, _cache_time, _static_cache_monitor_count

    # The TTL is measured on the monotonic clock so wall-clock adjustments
    # cannot make a stale response look fresh (or a fresh one look expired).
    if (
        not force_refresh
        and not rediscover
        and _monitor_cache is not None
        and (time.monotonic() - _cache_time) < _CACHE_TTL
    ):
        return _monitor_cache

    sbc = _load_sbc()
    if sbc is None:
        return []

    monitors: list[MonitorInfo] = []
    try:
        info_list = sbc.list_monitors_info()
        brightnesses = sbc.get_brightness()

        # Invalidate the static cache on explicit rediscover OR on topology
        # change (hot-plug / disconnect). Both indicate the cached probe is
        # potentially stale.
        if rediscover or len(info_list) != _static_cache_monitor_count:
            _static_cache.clear()
            _static_cache_monitor_count = len(info_list)

        mc = _load_monitorcontrol()
        ddc_monitors = []
        if mc:
            try:
                ddc_monitors = mc.get_monitors()
            except Exception as e:
                # Best effort: continue without DDC/CI details, but leave a
                # trace so the missing data can be diagnosed.
                logger.debug("DDC/CI monitor enumeration failed: %s", e)

        for i, info in enumerate(info_list):
            name = info.get("name", f"Monitor {i}")
            model = info.get("model", "")
            manufacturer = info.get("manufacturer", "")
            brightness = brightnesses[i] if i < len(brightnesses) else None

            # VCP method monitors support DDC/CI power control
            method = str(info.get("method", "")).lower()
            power_supported = "vcp" in method

            # Parse resolution from EDID
            edid = info.get("edid", "")
            resolution = _parse_edid_resolution(edid) if edid else None

            static: dict = {}
            dynamic: dict = {}
            # Open the DDC handle ONCE; do static probe (if needed) + dynamic
            # readback inside the same context. Opening the handle is the
            # expensive part — keep both phases under one open.
            # NOTE(review): assumes both libraries enumerate monitors in the
            # same order, since index i is used for both lists.
            if power_supported and i < len(ddc_monitors):
                try:
                    with ddc_monitors[i] as mon:
                        if i not in _static_cache:
                            _static_cache[i] = _probe_static_open(mon, mc, i)
                        static = _static_cache[i]
                        dynamic = _probe_dynamic_open(mon, mc, i, static)
                except Exception as e:
                    logger.debug("Monitor %d: DDC/CI session failed: %s", i, e)
                    static = _static_cache.get(i, {})

            # Option lists are copied so that a caller mutating the returned
            # MonitorInfo cannot corrupt the long-lived static cache.
            monitors.append(MonitorInfo(
                id=i,
                name=name,
                brightness=brightness,
                power_supported=power_supported,
                power_on=dynamic.get("power_on", True),
                model=model,
                manufacturer=manufacturer,
                resolution=resolution,
                contrast=dynamic.get("contrast"),
                contrast_supported=static.get("contrast_supported", False),
                input_source=dynamic.get("input_source"),
                available_input_sources=list(static.get("available_input_sources", [])),
                input_source_supported=static.get("input_source_supported", False),
                color_preset=dynamic.get("color_preset"),
                available_color_presets=list(static.get("available_color_presets", [])),
                color_preset_supported=static.get("color_preset_supported", False),
                picture_mode=dynamic.get("picture_mode"),
                picture_mode_code=dynamic.get("picture_mode_code"),
                available_picture_modes=list(static.get("available_picture_modes", [])),
                picture_mode_supported=static.get("picture_mode_supported", False),
            ))
    except Exception as e:
        logger.error("Failed to enumerate monitors: %s", e)

    _mark_primary(monitors)
    _monitor_cache = monitors
    _cache_time = time.monotonic()
    return monitors
def get_brightness(monitor_id: int) -> int | None:
    """Return the brightness of one monitor.

    Args:
        monitor_id: Display index passed to screen_brightness_control.

    Returns:
        The brightness value. When the library answers with a list, its
        first element is returned (``None`` for an empty list). ``None`` is
        also returned when the library is unavailable or the read raises.
    """
    sbc = _load_sbc()
    if sbc is None:
        return None

    try:
        reading = sbc.get_brightness(display=monitor_id)
    except Exception as e:
        logger.error("Failed to get brightness for monitor %d: %s", monitor_id, e)
        return None

    if not isinstance(reading, list):
        return reading
    return reading[0] if reading else None
def set_brightness(monitor_id: int, value: int) -> bool:
    """Set the brightness of one monitor.

    Args:
        monitor_id: Display index passed to screen_brightness_control.
        value: Requested brightness; clamped into 0-100 before it is sent.

    Returns:
        ``True`` when the library call completed (the cached monitor list is
        dropped in that case); ``False`` when the library is unavailable or
        the call raised.
    """
    sbc = _load_sbc()
    if sbc is None:
        return False

    clamped = max(0, min(100, value))
    try:
        sbc.set_brightness(clamped, display=monitor_id)
    except Exception as e:
        logger.error("Failed to set brightness for monitor %d: %s", monitor_id, e)
        return False

    _invalidate_cache()
    return True
def set_power(monitor_id: int, on: bool) -> bool:
    """Turn a monitor on or off via DDC/CI.

    Args:
        monitor_id: Index into the DDC/CI monitor list.
        on: ``True`` selects power mode ``on``; ``False`` selects
            ``off_soft``.

    Returns:
        ``True`` when the command was sent (the cached monitor list is
        dropped); ``False`` when monitorcontrol is unavailable, the id is out
        of range, or the DDC/CI call raised. The write is not read back.
    """
    mc = _load_monitorcontrol()
    if mc is None:
        logger.error("monitorcontrol not available for power control")
        return False
    try:
        ddc_monitors = mc.get_monitors()
        # Negative ids must be rejected explicitly: list indexing would
        # otherwise wrap around and address a different monitor.
        if not 0 <= monitor_id < len(ddc_monitors):
            logger.error("Monitor %d not found in DDC/CI monitors", monitor_id)
            return False
        with ddc_monitors[monitor_id] as monitor:
            if on:
                monitor.set_power_mode(mc.PowerMode.on)
            else:
                monitor.set_power_mode(mc.PowerMode.off_soft)
        _invalidate_cache()
        return True
    except Exception as e:
        logger.error("Failed to set power for monitor %d: %s", monitor_id, e)
        return False
def _verify_after_set(getter, expected, *, retries: int = 3, delay: float = 0.1) -> bool:
    """Poll a DDC/CI getter to confirm the monitor actually applied a write.

    DDC/CI writes are fire-and-forget at the protocol level: a successful
    send does not mean the monitor honored the value. Many monitors silently
    drop writes for codes their firmware doesn't really implement (LG's
    ColorPreset / Picture Mode are common offenders). Without this check the
    API would report `success: true` while the monitor sat unchanged.

    Both the objects themselves and their `.value` forms are compared, so an
    enum on one side and a raw int on the other still count as a match.

    Args:
        getter: Zero-argument callable that reads the current value.
        expected: Value that was written (enum member or raw value).
        retries: Maximum number of reads.
        delay: Seconds slept before every read.

    Returns:
        ``True`` as soon as a read matches; ``False`` when none of the
        *retries* reads matched. A read that raises counts as a failed
        attempt.
    """
    wanted_raw = getattr(expected, "value", expected)

    def agrees(observed) -> bool:
        observed_raw = getattr(observed, "value", observed)
        return observed == expected or observed_raw == wanted_raw

    for _attempt in range(retries):
        # Give the monitor time to settle before reading back.
        time.sleep(delay)
        try:
            observed = getter()
        except Exception:
            continue
        if agrees(observed):
            return True
    return False
def set_contrast(monitor_id: int, value: int) -> bool:
    """Set contrast for a specific monitor (0-100) via DDC/CI.

    Args:
        monitor_id: Index into the DDC/CI monitor list.
        value: Requested contrast; clamped into 0-100 before it is sent.

    Returns:
        ``True`` only when the write was sent AND a readback confirmed the
        monitor applied it (the cached monitor list is then dropped).
        ``False`` when monitorcontrol is unavailable, the id is out of range,
        the readback never matched, or the DDC/CI call raised.
    """
    mc = _load_monitorcontrol()
    if mc is None:
        return False
    value = max(0, min(100, value))
    try:
        ddc_monitors = mc.get_monitors()
        # Negative ids must be rejected explicitly: list indexing would
        # otherwise wrap around and address a different monitor.
        if not 0 <= monitor_id < len(ddc_monitors):
            logger.error("Monitor %d not found in DDC/CI monitors", monitor_id)
            return False
        with ddc_monitors[monitor_id] as monitor:
            monitor.set_contrast(value)
            # The readback needs the handle, so it stays inside the context.
            if not _verify_after_set(monitor.get_contrast, value):
                logger.warning("Monitor %d: contrast %d not applied", monitor_id, value)
                return False
        _invalidate_cache()
        return True
    except Exception as e:
        logger.error("Failed to set contrast for monitor %d: %s", monitor_id, e)
        return False
def set_input_source(monitor_id: int, source: str) -> bool:
    """Set the DDC/CI input source by enum name (e.g. 'HDMI1', 'DP1').

    Args:
        monitor_id: Index into the DDC/CI monitor list.
        source: Member name of monitorcontrol's ``InputSource`` enum.

    Returns:
        ``True`` only when the write was sent AND a readback confirmed the
        switch (the cached monitor list is then dropped). ``False`` when
        monitorcontrol is unavailable, *source* is not a known name, the id
        is out of range, the readback never matched, or the call raised.
    """
    mc = _load_monitorcontrol()
    if mc is None:
        return False
    try:
        target = mc.InputSource[source]
    except KeyError:
        logger.error("Unknown input source: %s", source)
        return False
    try:
        ddc_monitors = mc.get_monitors()
        # Negative ids must be rejected explicitly: list indexing would
        # otherwise wrap around and address a different monitor.
        if not 0 <= monitor_id < len(ddc_monitors):
            logger.error("Monitor %d not found in DDC/CI monitors", monitor_id)
            return False
        with ddc_monitors[monitor_id] as monitor:
            monitor.set_input_source(target)
            # Source switches can briefly disrupt the DDC/CI link; allow a
            # longer settle window before declaring failure.
            if not _verify_after_set(monitor.get_input_source, target, retries=5, delay=0.2):
                logger.warning("Monitor %d: input source %s not applied", monitor_id, source)
                return False
        _invalidate_cache()
        return True
    except Exception as e:
        logger.error("Failed to set input source for monitor %d: %s", monitor_id, e)
        return False
def set_color_preset(monitor_id: int, preset: str) -> bool:
    """Set the DDC/CI color preset by enum name (e.g. 'COLOR_TEMP_6500K').

    Args:
        monitor_id: Index into the DDC/CI monitor list.
        preset: Member name of monitorcontrol's ``ColorPreset`` enum.

    Returns:
        ``True`` only when the write was sent AND a readback confirmed the
        monitor applied it (the cached monitor list is then dropped).
        ``False`` when monitorcontrol is unavailable, *preset* is not a known
        name, the id is out of range, the readback never matched, or the
        call raised.
    """
    mc = _load_monitorcontrol()
    if mc is None:
        return False
    try:
        target = mc.ColorPreset[preset]
    except KeyError:
        logger.error("Unknown color preset: %s", preset)
        return False
    try:
        ddc_monitors = mc.get_monitors()
        # Negative ids must be rejected explicitly: list indexing would
        # otherwise wrap around and address a different monitor.
        if not 0 <= monitor_id < len(ddc_monitors):
            logger.error("Monitor %d not found in DDC/CI monitors", monitor_id)
            return False
        with ddc_monitors[monitor_id] as monitor:
            monitor.set_color_preset(target)
            # The readback needs the handle, so it stays inside the context.
            if not _verify_after_set(monitor.get_color_preset, target):
                logger.warning(
                    "Monitor %d: color preset %s not applied (monitor silently rejected)",
                    monitor_id, preset,
                )
                return False
        _invalidate_cache()
        return True
    except Exception as e:
        logger.error("Failed to set color preset for monitor %d: %s", monitor_id, e)
        return False
def set_picture_mode(monitor_id: int, code: int) -> bool:
    """Set the DDC/CI picture/scene mode (VCP 0xDC) by raw code.

    Args:
        monitor_id: Index into the DDC/CI monitor list.
        code: Raw picture-mode value; must lie in 0-255.

    Returns:
        ``True`` only when the write was sent AND a readback confirmed the
        monitor applied it (the cached monitor list is then dropped).
        ``False`` when monitorcontrol is unavailable, *code* or the id is out
        of range, the readback never matched, or the call raised.
    """
    mc = _load_monitorcontrol()
    if mc is None:
        return False
    if not 0 <= code <= 255:
        logger.error("Picture mode code %d out of range", code)
        return False
    try:
        ddc_monitors = mc.get_monitors()
        # Negative ids must be rejected explicitly: list indexing would
        # otherwise wrap around and address a different monitor.
        if not 0 <= monitor_id < len(ddc_monitors):
            logger.error("Monitor %d not found in DDC/CI monitors", monitor_id)
            return False
        with ddc_monitors[monitor_id] as monitor:
            monitor.vcp.set_vcp_feature(PICTURE_MODE_VCP, code)

            # Raw VCP read returns (current, maximum) — only compare current.
            def _read_picture_mode():
                current, _ = monitor.vcp.get_vcp_feature(PICTURE_MODE_VCP)
                return current

            if not _verify_after_set(_read_picture_mode, code):
                logger.warning(
                    "Monitor %d: picture mode code %d not applied (monitor silently rejected)",
                    monitor_id, code,
                )
                return False
        _invalidate_cache()
        return True
    except Exception as e:
        logger.error("Failed to set picture mode for monitor %d: %s", monitor_id, e)
        return False
def _invalidate_cache() -> None:
    """Drop the cached monitor list so the next listing re-reads the hardware.

    Only the short-lived response cache is cleared; the per-monitor static
    capability cache is left untouched.
    """
    global _monitor_cache
    _monitor_cache = None