Files
notify-bridge/packages/server/src/notify_bridge_server/commands/nut_handler.py
T
alexei.dolgolyov 3b76a09759 feat(commands): per-chat album scope derived from notification routing
The "per-chat album scope" feature stored on CommandTrackerListener was
really per-bot: listener_id = bot.id, and every chat that bot served
shared the same scope.  Commands like /albums, /random, /status,
/events leaked the full provider catalog into chats that were never
wired up to receive notifications from those trackers.

New model: the album scope for /commands in a given chat is derived
from the notification-routing graph.  For a (provider, bot, chat_id)
triple we walk TargetReceiver (chat_id match, enabled) →
NotificationTarget (telegram or broadcast parent) →
NotificationTrackerTarget → NotificationTracker (provider match) and
union their collection_ids.  That's the natural "what does this chat
get notifications about" set, and it becomes the command scope.

- New helper: command_utils.resolve_chat_album_scope(provider_id,
  bot_id, chat_id) -> set[str].  Empty set is the default for chats
  with no routing — commands return nothing rather than leaking the
  provider's catalog.
- Dispatcher computes the scope per (tracker, bot, chat) and threads
  it through handler.handle(..., allowed_album_ids=...).  Explicit
  CommandTrackerListener.allowed_album_ids override, when set, still
  wins verbatim (kept as an escape hatch for users who want a divergent
  scope for a whole bot).
- /status, /albums, /events, and all /_cmd_immich-routed commands
  (/random, /search, /find, /latest, /memory, /summary, /favorites,
  /place, /person) now intersect with the resolved scope.
- UI scope modal relabeled: it's an explicit *override for this bot*,
  not a per-chat setting.  Default is "derive from notification
  routing", which matches what users already configured elsewhere.

Also:
- /search, /find, /person, /place — _enrich_assets return value was
  discarded, dropping public_url enrichment.  Assign the return value.
- search_smart / search_metadata — consolidated into _search_items
  helper that logs non-200 responses and transport errors instead of
  silently returning [].  Makes "always no results" bugs actually
  diagnosable.  Also accepts the alternate {"assets": [...]} flat-list
  shape from older Immich versions.
- Immich search error bodies go through _redact_body so credentials
  echoed by authenticating proxies don't land in server logs.
2026-04-22 03:20:51 +03:00

124 lines
4.3 KiB
Python

"""NUT (UPS)-specific bot command handler."""
from __future__ import annotations
import logging
from collections.abc import Callable, Coroutine
from typing import Any
from ..database.models import CommandConfig, CommandTracker, ServiceProvider, TelegramBot
from ..services import make_nut_provider
from .base import CommandResponse, ProviderCommandHandler
from .handler import _render_cmd_template
_LOGGER = logging.getLogger(__name__)
_NUT_COMMANDS = {"status", "devices", "battery"}
# ---------------------------------------------------------------------------
# Command dispatch table
# ---------------------------------------------------------------------------
_TEXT_COMMANDS: dict[str, Callable[..., Coroutine[Any, Any, dict[str, Any]]]] = {}
def _text_cmd(fn: Callable[..., Coroutine[Any, Any, dict[str, Any]]]) -> Callable[..., Coroutine[Any, Any, dict[str, Any]]]:
"""Register a function in the text command dispatch table."""
name = fn.__name__.removeprefix("_cmd_")
_TEXT_COMMANDS[name] = fn
return fn
class NutCommandHandler(ProviderCommandHandler):
"""Handles NUT-specific bot commands."""
provider_type = "nut"
def get_provider_commands(self) -> set[str]:
return _NUT_COMMANDS
def get_rate_categories(self) -> dict[str, str]:
return {"devices": "api", "battery": "api", "status": "api"}
async def handle(
self,
cmd: str,
args: str,
count: int,
locale: str,
response_mode: str,
provider: ServiceProvider,
cmd_templates: dict[str, dict[str, str]],
bot: TelegramBot,
tracker: CommandTracker,
config: CommandConfig,
*,
listener: Any = None,
allowed_album_ids: set[str] | None = None, # noqa: ARG002 — unused (NUT has no album model)
page: int = 1,
) -> CommandResponse | None:
fn = _TEXT_COMMANDS.get(cmd)
if fn is None:
return None
ctx = await fn(provider, count)
return CommandResponse(text=_render_cmd_template(cmd_templates, cmd, locale, ctx))
async def _query_ups(
provider: ServiceProvider,
) -> list[dict[str, Any]]:
"""Connect to a NUT provider and query UPS data."""
from notify_bridge_core.providers.nut.models import NutUpsData
results: list[dict[str, Any]] = []
nut = make_nut_provider(provider)
try:
client = nut._make_client()
await client.connect()
try:
devices = await client.list_ups()
for dev in devices:
variables = await client.list_var(dev.name)
data = NutUpsData.from_variables(dev.name, variables)
results.append({
"name": data.name,
"description": data.description,
"model": data.model,
"manufacturer": data.manufacturer,
"status": data.status,
"battery_charge": int(data.battery_charge) if data.battery_charge is not None else None,
"battery_runtime": data.battery_runtime_formatted,
"ups_load": int(data.ups_load) if data.ups_load is not None else None,
"input_voltage": str(data.input_voltage) if data.input_voltage is not None else None,
"output_voltage": str(data.output_voltage) if data.output_voltage is not None else None,
})
finally:
await client.disconnect()
except Exception as exc:
_LOGGER.warning("Failed to query NUT provider %s: %s", provider.name, exc)
return results
@_text_cmd
async def _cmd_status(provider: ServiceProvider, count: int) -> dict[str, Any]:
devices = await _query_ups(provider)
return {"devices": devices}
@_text_cmd
async def _cmd_devices(provider: ServiceProvider, count: int) -> dict[str, Any]:
devices: list[dict[str, Any]] = []
nut = make_nut_provider(provider)
try:
device_list = await nut.list_collections()
devices.extend(device_list)
except Exception as exc:
_LOGGER.warning("Failed to list devices from %s: %s", provider.name, exc)
return {"devices": devices}
@_text_cmd
async def _cmd_battery(provider: ServiceProvider, count: int) -> dict[str, Any]:
devices = await _query_ups(provider)
return {"devices": devices}