feat: wire tracking-config display filters + per-tracker adaptive polling

Display filters (Immich tracking config):
- favorites_only drops events with no favorited new assets, or filters
  added_assets to favorites only
- assets_order_by/assets_order sort the rendered list
  (date / name / rating / random / none)
- max_assets_to_show caps rendered+attached media (default 5 -> 10)
- include_tags strips people from event extras and tags from each asset
- include_asset_details strips city/country/state/lat/lon/is_favorite/
  rating/description; load-bearing fields (thumbhash, file_size,
  playback_size, cache keys) preserved
- New apply_tracking_display_filters helper in dispatch_helpers; wired
  into watcher, webhooks, scheduled/periodic/memory, and manual
  test-dispatch
- Targets sharing a TrackingConfig dispatch together; targets with
  different TCs each see their own shaped event

Adaptive polling:
- Replace NotificationTracker.batch_duration with adaptive_max_skip
- Per-tracker opt-in: NULL/0 disables back-off (every tick runs);
  positive N caps the skip factor at (N-1)-in-N after long idle
- Scheduler caches the cap in module state for the tick fast-path
- Migration adds the new column; API schemas/responses, frontend types,
  i18n, and the tracker form updated to match
This commit is contained in:
2026-04-24 21:12:10 +03:00
parent 187b889c45
commit ab621b6abc
19 changed files with 367 additions and 72 deletions
@@ -116,7 +116,6 @@ class NotificationTrackerData(BaseModel):
collection_ids: list[str] = []
filters: dict[str, Any] = {}
scan_interval: int = 60
batch_duration: int = 0
default_tracking_config_id: int | None = None
default_template_config_id: int | None = None
enabled: bool = True
@@ -294,7 +294,6 @@ async def export_backup(
id=nt.id, provider_id=nt.provider_id, name=nt.name,
icon=nt.icon, collection_ids=nt.collection_ids,
filters=nt.filters, scan_interval=nt.scan_interval,
batch_duration=nt.batch_duration,
default_tracking_config_id=nt.default_tracking_config_id,
default_template_config_id=nt.default_template_config_id,
enabled=nt.enabled, targets=targets,
@@ -733,7 +732,6 @@ async def import_backup(
user_id=user_id, provider_id=provider_id,
name=name, icon=nt.icon, collection_ids=nt.collection_ids,
filters=nt.filters, scan_interval=nt.scan_interval,
batch_duration=nt.batch_duration,
default_tracking_config_id=_map_id(id_map, "tracking_configs", nt.default_tracking_config_id),
default_template_config_id=_map_id(id_map, "template_configs", nt.default_template_config_id),
enabled=nt.enabled,
@@ -2,15 +2,18 @@
from __future__ import annotations
import dataclasses
import logging
import random
from datetime import datetime, time, timezone
from typing import Any
from typing import Any, Callable
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from notify_bridge_core.models.events import ServiceEvent
from notify_bridge_core.models.media import MediaAsset
from notify_bridge_core.notifications.receiver import Receiver, build_receiver
from ..database.models import (
@@ -137,6 +140,143 @@ def event_allowed_by_config(
return flag_map.get(event_type, True)
# --- Display-time filters driven by TrackingConfig -------------------------
#
# These transform a ServiceEvent so the dispatched notification reflects the
# user's per-tracker "asset display" preferences. Event-tracking flags (which
# events fire at all) live in ``event_allowed_by_config`` above; the filters
# here only reshape an already-allowed event.
# Asset.extra keys stripped when ``include_asset_details=False``. These are
# the enrichment fields the default templates render as prose (city/country,
# ⭐ rating, ❤️ favorite). ``thumbhash``/``file_size``/``playback_size``/
# ``owner_id``/``cache_key`` stay — they are load-bearing for media send and
# caching, not user-facing prose.
_ASSET_DETAIL_KEYS: tuple[str, ...] = (
"city", "country", "state",
"latitude", "longitude",
"is_favorite", "rating",
)
def _sort_key_for(order_by: str) -> Callable[[MediaAsset], Any] | None:
if order_by == "date":
return lambda a: a.created_at
if order_by == "name":
return lambda a: a.filename.lower()
if order_by == "rating":
# None ratings sort last regardless of direction.
return lambda a: (
a.extra.get("rating") is None,
a.extra.get("rating") or 0,
)
return None
def _sort_assets(
assets: list[MediaAsset],
order_by: str,
order: str,
) -> list[MediaAsset]:
"""Sort MediaAssets by the configured key/direction.
``order_by="none"`` preserves the input order (the provider's own
ordering, usually detection order). ``"random"`` shuffles in place
on a copy so repeated renders of the same event aren't identical.
"""
if order_by in ("none", "") or len(assets) < 2:
return list(assets)
if order_by == "random":
shuffled = list(assets)
random.shuffle(shuffled)
return shuffled
key_fn = _sort_key_for(order_by)
if key_fn is None:
return list(assets)
return sorted(assets, key=key_fn, reverse=(order == "descending"))
def _transform_asset(
asset: MediaAsset,
*,
strip_details: bool,
strip_tags: bool,
) -> MediaAsset:
"""Return a copy of ``asset`` with details and/or tags removed."""
new_extra = asset.extra
new_description = asset.description
new_tags = asset.tags
if strip_details:
new_extra = {k: v for k, v in asset.extra.items() if k not in _ASSET_DETAIL_KEYS}
new_description = None
if strip_tags:
new_tags = []
return dataclasses.replace(
asset,
description=new_description,
tags=list(new_tags) if new_tags is not asset.tags else asset.tags,
extra=new_extra,
)
def apply_tracking_display_filters(
event: ServiceEvent,
tc: TrackingConfig | None,
) -> ServiceEvent | None:
"""Apply per-tracker display preferences to an already-allowed event.
Semantics:
* ``notify_favorites_only`` + ``assets_order_by`` + ``max_assets_to_show``
only apply to ``ASSETS_ADDED`` events — the album-change path. Scheduled
/ periodic / memory events have their own limits and ordering
(``scheduled_limit``, ``scheduled_order_by``, etc.), so reapplying the
album-change cap would wrongly truncate them.
* ``include_tags`` and ``include_asset_details`` apply to every event
that carries assets, since they control rendering irrespective of
how the assets were selected.
Returns:
A new ``ServiceEvent`` with filters applied, or ``None`` if the event
should be dropped entirely (``notify_favorites_only=True`` and none of
the added assets are favorites).
"""
if tc is None:
return event
assets = list(event.added_assets)
new_added_count = event.added_count
is_change_event = event.event_type.value == "assets_added"
if is_change_event:
if tc.notify_favorites_only:
assets = [a for a in assets if a.extra.get("is_favorite")]
new_added_count = len(assets)
if not assets:
return None
assets = _sort_assets(assets, tc.assets_order_by, tc.assets_order)
if tc.max_assets_to_show >= 0:
assets = assets[: tc.max_assets_to_show]
strip_details = not tc.include_asset_details
strip_tags = not tc.include_tags
if (strip_details or strip_tags) and assets:
assets = [
_transform_asset(a, strip_details=strip_details, strip_tags=strip_tags)
for a in assets
]
new_extra = event.extra
if strip_tags and "people" in event.extra:
new_extra = {k: v for k, v in event.extra.items() if k != "people"}
return dataclasses.replace(
event,
added_assets=assets,
added_count=new_added_count,
extra=new_extra,
)
async def _resolve_target(
session: AsyncSession,
target: NotificationTarget,
@@ -166,11 +166,25 @@ async def dispatch_test_notification(
}
# Dispatch each event to the same target (per-album fan-out sends N messages).
# Apply display filters so the test notification matches production behavior
# for ``favorites_only``, ``include_tags``, ``include_asset_details``, etc.
from .dispatch_helpers import apply_tracking_display_filters
url_cache, asset_cache = await _get_telegram_caches()
dispatcher = NotificationDispatcher(url_cache=url_cache, asset_cache=asset_cache)
all_results: list[dict[str, Any]] = []
for event in events:
results = await dispatcher.dispatch(event, [target_cfg])
shaped_event = apply_tracking_display_filters(event, tracking_config)
if shaped_event is None:
all_results.append({
"success": False,
"error": (
"Event suppressed by tracking config (favorites_only is on "
"but no added assets are favorites)."
),
})
continue
results = await dispatcher.dispatch(shaped_event, [target_cfg])
if results:
all_results.append(results[0])
@@ -42,6 +42,7 @@ from ..database.models import (
TrackingConfig,
)
from .dispatch_helpers import (
apply_tracking_display_filters,
event_allowed_by_config,
get_app_timezone,
load_link_data,
@@ -251,7 +252,9 @@ async def dispatch_scheduled_for_tracker(
# event_allowed_by_config, which inspects event timestamp. Per-event
# rebuilding also lets a per-link override disable one kind while
# keeping others live.
target_configs: list[TargetConfig] = []
# Group target configs by TrackingConfig identity so each unique TC
# gets its own ``apply_tracking_display_filters`` pass before dispatch.
groups: dict[int, tuple[TrackingConfig | None, list[TargetConfig]]] = {}
async with AsyncSession(engine) as session:
for ld in link_data:
tc = ld["tracking_config"] or default_tc
@@ -275,7 +278,7 @@ async def dispatch_scheduled_for_tracker(
locale_map = {s.locale: s.template for s in slot_rows}
template_slots = {EventType.SCHEDULED_MESSAGE.value: locale_map}
target_configs.append(TargetConfig(
target_cfg = TargetConfig(
type=ld["target_type"],
config=ld["target_config"],
template_slots=template_slots,
@@ -287,20 +290,36 @@ async def dispatch_scheduled_for_tracker(
provider_internal_url=provider_config.get("url", ""),
provider_external_url=provider_config.get("external_domain", ""),
receivers=ld["receivers"],
))
)
key = id(tc) if tc is not None else 0
if key not in groups:
groups[key] = (tc, [])
groups[key][1].append(target_cfg)
if not target_configs:
if not groups:
_LOGGER.info(
"Scheduled %s for tracker %d (collection=%r): no targets after filtering",
kind, tracker_id, event.collection_name,
)
continue
total_targets = sum(len(tg[1]) for tg in groups.values())
_LOGGER.info(
"Dispatching scheduled %s for tracker %d (collection=%r) to %d link(s)",
kind, tracker_id, event.collection_name, len(target_configs),
"Dispatching scheduled %s for tracker %d (collection=%r) to %d link(s) across %d group(s)",
kind, tracker_id, event.collection_name, total_targets, len(groups),
)
results = await dispatcher.dispatch(event, target_configs)
results: list = []
dispatched_any = False
for tc, target_configs in groups.values():
if not target_configs:
continue
shaped_event = apply_tracking_display_filters(event, tc)
if shaped_event is None:
continue
results.extend(await dispatcher.dispatch(shaped_event, target_configs))
dispatched_any = True
if not dispatched_any:
continue
any_sent = True
successes = sum(1 for r in results if isinstance(r, dict) and r.get("success"))
@@ -322,7 +341,7 @@ async def dispatch_scheduled_for_tracker(
"timezone": app_tz,
"collection_mode": collection_mode,
"status": "sent",
"targets_dispatched": len(target_configs),
"targets_dispatched": total_targets,
"targets_succeeded": successes,
},
))
@@ -49,21 +49,44 @@ _scheduler: AsyncIOScheduler | None = None
# than one tick — but the steady-state HTTP cost for a fleet of idle
# trackers drops by ~75%.
#
# Opt-in per tracker via the ``adaptive_max_skip`` column:
# * NULL or 0 → adaptive polling disabled, every tick runs (default)
# * 2 → skip at most 1-in-2 ticks after long idle
# * 3, 4, ... → up to (N-1)-in-N skipping
# Thresholds are intentionally conservative: a tracker polling every 30 s
# needs 5 min of silence before we halve its effective rate, and 15 min
# before we quarter it. Any caller can disable adaptive behavior by passing
# ``adaptive=False`` in the tracker filters dict (checked in ``_poll_tracker``).
# before we quarter it.
# ---------------------------------------------------------------------------
_ADAPTIVE_HALVE_THRESHOLD = 10 # consecutive empty ticks → 1-in-2
_ADAPTIVE_QUARTER_THRESHOLD = 30 # consecutive empty ticks → 1-in-4
_ADAPTIVE_MAX_SKIP = 4 # hard cap on skip factor
# Per-tracker adaptive state, keyed by tracker_id. Rebuilt on process
# restart — a short warmup period is fine and avoids persisting what is
# effectively a performance heuristic.
_adaptive_state: dict[int, dict[str, int]] = {}
# Per-tracker cap on the skip factor, mirrored from the DB column at
# schedule time. Absence of an entry (or 0) means adaptive polling is off
# for that tracker — ``_adaptive_should_skip`` returns False immediately.
_adaptive_max_skip: dict[int, int] = {}
def set_adaptive_max_skip(tracker_id: int, max_skip: int | None) -> None:
"""Register/clear the adaptive cap for a tracker.
Called by the scheduling helpers so the tick-fast-path in
``_adaptive_should_skip`` doesn't need to re-query the DB. Values ≤ 1
disable back-off for the tracker — every scheduled tick runs.
"""
if max_skip and max_skip > 1:
_adaptive_max_skip[tracker_id] = int(max_skip)
else:
_adaptive_max_skip.pop(tracker_id, None)
# Opting in/out mid-session should drop any prior counters so the
# new behavior applies from the next tick, not N ticks later.
_adaptive_state.pop(tracker_id, None)
def _compute_jitter(interval_seconds: int) -> int:
"""Return a jitter bound (in seconds) suitable for an IntervalTrigger.
@@ -387,9 +410,11 @@ async def _load_tracker_jobs() -> None:
replace_existing=True,
max_instances=1,
)
set_adaptive_max_skip(tracker.id, tracker.adaptive_max_skip)
_LOGGER.info(
"Scheduled tracker %d (%s) every %ds (jitter ±%ds)",
"Scheduled tracker %d (%s) every %ds (jitter ±%ds, adaptive_max_skip=%s)",
tracker.id, tracker.name, tracker.scan_interval, jitter,
tracker.adaptive_max_skip,
)
@@ -429,14 +454,21 @@ async def schedule_tracker(
tracker_id: int,
interval: int,
cron_expression: str | None = None,
adaptive_max_skip: int | None = None,
) -> None:
"""Add or update a scheduler job for a tracker."""
"""Add or update a scheduler job for a tracker.
``adaptive_max_skip`` mirrors the DB column and is registered with the
adaptive module-state so tick-time skip decisions don't re-query the DB.
Pass ``None`` or ``0`` to disable back-off for the tracker.
"""
scheduler = get_scheduler()
job_id = f"tracker_{tracker_id}"
# A reschedule typically follows a config edit or enable/disable flip —
# drop adaptive back-off so the first tick after the change runs promptly.
reset_adaptive_state(tracker_id)
set_adaptive_max_skip(tracker_id, adaptive_max_skip)
# Remove existing job first to allow trigger type changes
if scheduler.get_job(job_id):
@@ -461,7 +493,8 @@ async def schedule_tracker(
replace_existing=True,
)
_LOGGER.info(
"Scheduled tracker %d every %ds (jitter ±%ds)", tracker_id, interval, jitter,
"Scheduled tracker %d every %ds (jitter ±%ds, adaptive_max_skip=%s)",
tracker_id, interval, jitter, adaptive_max_skip,
)
@@ -470,6 +503,7 @@ async def unschedule_tracker(tracker_id: int) -> None:
scheduler = get_scheduler()
job_id = f"tracker_{tracker_id}"
reset_adaptive_state(tracker_id)
_adaptive_max_skip.pop(tracker_id, None)
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
_LOGGER.info("Unscheduled tracker %d", tracker_id)
@@ -478,10 +512,12 @@ async def unschedule_tracker(tracker_id: int) -> None:
def _adaptive_should_skip(tracker_id: int) -> bool:
"""Return True when the adaptive heuristic says to skip this tick.
Run-length skip: if we're in 1-in-K mode, skip (K-1) ticks between each
real poll. Stateless about the *current* tick counter except for the
``tick_counter`` we bump here.
Short-circuits to False for trackers without a registered cap (adaptive
off). Otherwise: if we're in 1-in-K mode, skip (K-1) ticks between each
real poll.
"""
if tracker_id not in _adaptive_max_skip:
return False
state = _adaptive_state.get(tracker_id)
if not state:
return False
@@ -494,7 +530,14 @@ def _adaptive_should_skip(tracker_id: int) -> bool:
def _adaptive_update(tracker_id: int, events_detected: int) -> None:
"""Update the adaptive counter after a real tick ran."""
"""Update the adaptive counter after a real tick ran.
No-op when the tracker has adaptive polling disabled — otherwise we'd
build up empty counters for trackers that will never use them.
"""
cap = _adaptive_max_skip.get(tracker_id)
if not cap or cap <= 1:
return
state = _adaptive_state.setdefault(
tracker_id, {"empty_count": 0, "skip_every": 1, "tick_counter": 0}
)
@@ -510,20 +553,22 @@ def _adaptive_update(tracker_id: int, events_detected: int) -> None:
return
state["empty_count"] = state.get("empty_count", 0) + 1
target_quarter = min(cap, 4)
if (
state["empty_count"] >= _ADAPTIVE_QUARTER_THRESHOLD
and state["skip_every"] < _ADAPTIVE_MAX_SKIP
and state["skip_every"] < target_quarter
):
state["skip_every"] = _ADAPTIVE_MAX_SKIP
state["skip_every"] = target_quarter
_LOGGER.info(
"Adaptive polling: tracker %d idle for %d ticks, skipping 3 of 4",
"Adaptive polling: tracker %d idle for %d ticks, skipping %d of %d",
tracker_id, state["empty_count"],
target_quarter - 1, target_quarter,
)
elif (
state["empty_count"] >= _ADAPTIVE_HALVE_THRESHOLD
and state["skip_every"] < 2
and state["skip_every"] < min(cap, 2)
):
state["skip_every"] = 2
state["skip_every"] = min(cap, 2)
_LOGGER.info(
"Adaptive polling: tracker %d idle for %d ticks, skipping every other",
tracker_id, state["empty_count"],
@@ -535,7 +580,8 @@ def reset_adaptive_state(tracker_id: int) -> None:
Used by API callers that make changes requiring the tracker to run
promptly on the next scheduled tick (enable/disable, config edits,
manual "check now" actions).
manual "check now" actions). Does NOT clear the configured cap — use
``set_adaptive_max_skip(..., None)`` for that.
"""
_adaptive_state.pop(tracker_id, None)
@@ -22,6 +22,7 @@ from ..database.models import (
ServiceProvider,
)
from .dispatch_helpers import (
apply_tracking_display_filters,
event_allowed_by_config,
get_app_timezone,
load_link_data,
@@ -382,16 +383,18 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
event.event_type.value, event.collection_name,
event.added_count, event.removed_count,
)
target_configs = []
# Group targets by tracking-config identity so each unique TC
# gets one event-transform pass; targets sharing a TC dispatch
# together (preserves the gather-fan-out inside the dispatcher).
groups: dict[int, tuple[Any, list[TargetConfig]]] = {}
for ld in link_data:
# Apply per-link event filtering from tracking config
tc = ld["tracking_config"]
if tc and not event_allowed_by_config(event, tc, app_tz):
_LOGGER.info(" Skipped by tracking config filter")
continue
tmpl = ld["template_config"]
target_configs.append(TargetConfig(
target_cfg = TargetConfig(
type=ld["target_type"],
config=ld["target_config"],
template_slots=ld["template_slots"],
@@ -401,10 +404,22 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
provider_internal_url=provider_config.get("url", ""),
provider_external_url=provider_config.get("external_domain", ""),
receivers=ld["receivers"],
))
)
key = id(tc) if tc is not None else 0
if key not in groups:
groups[key] = (tc, [])
groups[key][1].append(target_cfg)
if target_configs:
results = await dispatcher.dispatch(event, target_configs)
for tc, target_configs in groups.values():
if not target_configs:
continue
shaped_event = apply_tracking_display_filters(event, tc)
if shaped_event is None:
_LOGGER.info(
" Event suppressed by display filters (favorites_only)",
)
continue
results = await dispatcher.dispatch(shaped_event, target_configs)
for r in results:
if r.get("success"):
_LOGGER.info(" Notification sent successfully")