fix(scheduler): honor app timezone for cron triggers and log scheduled events

CronTrigger.from_crontab was constructed without a timezone, so a cron like
'0 9 * * *' fired at 09:00 host-local instead of 09:00 in the admin-configured
timezone. Now all tracker/action cron triggers are built with the app tz, and
the setting endpoint rebuilds existing cron jobs when the tz changes (since
CronTrigger freezes its tz at construction time).

The scheduler provider also renders current_date/time/datetime/weekday in the
configured tz and exposes a new 'timezone' template variable.

EventLog entries for scheduled_message now include schedule_type,
cron_expression/interval_seconds, timezone, and fire_count, and the dashboard
shows the event type with a label/icon/color.
This commit is contained in:
2026-04-23 13:35:49 +03:00
parent 5604c733d1
commit 1024085cdd
10 changed files with 209 additions and 17 deletions
@@ -5,6 +5,7 @@ from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Any
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from notify_bridge_core.models.events import EventType, ServiceEvent
from notify_bridge_core.providers.base import ServiceProvider, ServiceProviderType
@@ -57,6 +58,13 @@ SCHEDULER_VARIABLES: list[TemplateVariableDefinition] = [
example="Monday",
provider_type=ServiceProviderType.SCHEDULER,
),
TemplateVariableDefinition(
name="timezone",
type="string",
description="IANA timezone used to compute current_date/time",
example="Europe/Warsaw",
provider_type=ServiceProviderType.SCHEDULER,
),
TemplateVariableDefinition(
name="custom_vars",
type="dict",
@@ -83,7 +91,8 @@ class SchedulerServiceProvider(ServiceProvider):
custom_variables: dict[str, str] | None = None,
date_format: str = "%d.%m.%Y",
time_format: str = "%H:%M",
datetime_format: str = "%d.%m.%Y, %H:%M UTC",
datetime_format: str = "%d.%m.%Y, %H:%M %Z",
timezone_name: str | None = None,
) -> None:
self._name = name
self._tracker_name = tracker_name
@@ -91,6 +100,18 @@ class SchedulerServiceProvider(ServiceProvider):
self._date_format = date_format
self._time_format = time_format
self._datetime_format = datetime_format
# Resolve a timezone for date/time rendering. Falls back to UTC on
# invalid IANA names so a typo in app settings doesn't break polls.
tz: ZoneInfo
if timezone_name:
try:
tz = ZoneInfo(timezone_name)
except (ZoneInfoNotFoundError, ValueError):
_LOGGER.warning("Unknown timezone %r; falling back to UTC", timezone_name)
tz = ZoneInfo("UTC")
else:
tz = ZoneInfo("UTC")
self._tz = tz
async def connect(self) -> bool:
return True # virtual provider — always connected
@@ -103,7 +124,8 @@ class SchedulerServiceProvider(ServiceProvider):
collection_ids: list[str],
tracker_state: dict[str, Any],
) -> tuple[list[ServiceEvent], dict[str, Any]]:
now = datetime.now(timezone.utc)
now_utc = datetime.now(timezone.utc)
now = now_utc.astimezone(self._tz)
# State uses {collection_id: {dict}} convention like other providers
sched_state = tracker_state.get("scheduler", {})
fire_count = sched_state.get("fire_count", 0) + 1
@@ -115,6 +137,7 @@ class SchedulerServiceProvider(ServiceProvider):
"current_time": now.strftime(self._time_format),
"current_datetime": now.strftime(self._datetime_format),
"weekday": _WEEKDAYS[now.weekday()],
"timezone": self._tz.key,
"custom_vars": dict(self._custom_variables),
}
# Flatten custom variables at top level for easy template access
@@ -224,6 +224,7 @@ def build_template_context(
ctx.setdefault("current_time", event.extra.get("current_time", ""))
ctx.setdefault("current_datetime", event.extra.get("current_datetime", ""))
ctx.setdefault("weekday", event.extra.get("weekday", ""))
ctx.setdefault("timezone", event.extra.get("timezone", "UTC"))
ctx.setdefault("custom_vars", event.extra.get("custom_vars", {}))
return ctx
@@ -94,6 +94,7 @@ async def update_settings(
old_base_url = await get_setting(session, "external_url")
old_secret = await get_setting(session, "telegram_webhook_secret")
old_cache_values = {k: await get_setting(session, k) for k in _CACHE_SETTING_KEYS}
old_timezone = await get_setting(session, "timezone")
for key in _SETTING_KEYS:
value = getattr(body, key, None)
@@ -128,6 +129,14 @@ async def update_settings(
new_base_url = await get_setting(session, "external_url")
new_secret = await get_setting(session, "telegram_webhook_secret")
new_timezone = await get_setting(session, "timezone")
# Cron triggers freeze their timezone at construction time, so a tz change
# has no effect until the jobs are rebuilt — do that here, before we
# return success, so the UI reflects the actual schedule immediately.
if new_timezone != old_timezone:
from ..services.scheduler import reschedule_cron_jobs_for_timezone_change
await reschedule_cron_jobs_for_timezone_change()
# Update webhook secret in the webhook handler module
if new_secret != old_secret:
@@ -242,6 +242,8 @@ async def get_template_variables(
"current_date": "Current date (formatted)",
"current_time": "Current time (formatted)",
"current_datetime": "Current date and time (formatted)",
"weekday": "Day of the week (Monday..Sunday)",
"timezone": "IANA timezone used for current_date/time",
},
},
}
@@ -188,6 +188,7 @@ _SAMPLE_CONTEXT = {
"current_time": "09:00",
"current_datetime": "22.03.2026, 09:00 UTC",
"weekday": "Monday",
"timezone": "UTC",
"custom_vars": {"team": "Engineering", "message": "Time for standup!"},
"team": "Engineering",
"message": "Time for standup!",
@@ -3,11 +3,39 @@
from __future__ import annotations
import logging
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from apscheduler.schedulers.asyncio import AsyncIOScheduler
_LOGGER = logging.getLogger(__name__)
def _resolve_zoneinfo(tz_name: str | None) -> ZoneInfo:
"""Resolve an IANA tz string to a ZoneInfo, falling back to UTC on any error.
Kept local to avoid importing from api/dispatch layers inside the scheduler
module (which is loaded at startup, before the API routers).
"""
if not tz_name:
return ZoneInfo("UTC")
try:
return ZoneInfo(tz_name)
except (ZoneInfoNotFoundError, ValueError):
_LOGGER.warning("Unknown timezone %r; falling back to UTC", tz_name)
return ZoneInfo("UTC")
async def _load_app_timezone() -> ZoneInfo:
"""Load the admin-configured app timezone from AppSetting (falls back to UTC)."""
from sqlmodel.ext.asyncio.session import AsyncSession
from ..api.app_settings import get_setting
from ..database.engine import get_engine
async with AsyncSession(get_engine()) as session:
tz_name = await get_setting(session, "timezone")
return _resolve_zoneinfo(tz_name)
_scheduler: AsyncIOScheduler | None = None
# ---------------------------------------------------------------------------
@@ -293,6 +321,8 @@ async def _load_tracker_jobs() -> None:
)
provider_types = {p.id: p.type for p in provider_result.all()}
tz = await _load_app_timezone()
for tracker in trackers:
job_id = f"tracker_{tracker.id}"
if scheduler.get_job(job_id):
@@ -306,7 +336,7 @@ async def _load_tracker_jobs() -> None:
cron_expr = filters.get("cron_expression", "")
if cron_expr:
try:
_add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name)
_add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
continue
except Exception as e:
_LOGGER.error(
@@ -337,10 +367,18 @@ def _add_cron_job(
tracker_id: int,
cron_expression: str,
tracker_name: str,
tz: ZoneInfo,
) -> None:
"""Add a cron-triggered job for a scheduler-type tracker."""
"""Add a cron-triggered job for a scheduler-type tracker.
``tz`` is the user-configured app timezone; without it APScheduler
interprets the crontab in the host's local timezone, which surfaces as
events firing at the "wrong" wall-clock time for operators in a non-UTC
zone (see the companion fix in ``update_settings`` which reschedules on
timezone changes).
"""
from apscheduler.triggers.cron import CronTrigger
trigger = CronTrigger.from_crontab(cron_expression)
trigger = CronTrigger.from_crontab(cron_expression, timezone=tz)
scheduler.add_job(
_poll_tracker,
trigger,
@@ -349,7 +387,10 @@ def _add_cron_job(
replace_existing=True,
max_instances=1,
)
_LOGGER.info("Scheduled tracker %d (%s) with cron: %s", tracker_id, tracker_name, cron_expression)
_LOGGER.info(
"Scheduled tracker %d (%s) with cron: %s [tz=%s]",
tracker_id, tracker_name, cron_expression, tz.key,
)
async def schedule_tracker(
@@ -371,7 +412,8 @@ async def schedule_tracker(
if cron_expression:
try:
_add_cron_job(scheduler, job_id, tracker_id, cron_expression, f"tracker-{tracker_id}")
tz = await _load_app_timezone()
_add_cron_job(scheduler, job_id, tracker_id, cron_expression, f"tracker-{tracker_id}", tz)
return
except Exception as e:
_LOGGER.error("Invalid cron for tracker %d: %s — using interval", tracker_id, e)
@@ -506,6 +548,8 @@ async def _load_action_jobs() -> None:
)
actions = result.all()
tz = await _load_app_timezone()
for action in actions:
job_id = f"action_{action.id}"
if scheduler.get_job(job_id):
@@ -514,7 +558,7 @@ async def _load_action_jobs() -> None:
if action.schedule_type == "cron" and action.schedule_cron:
try:
from apscheduler.triggers.cron import CronTrigger
trigger = CronTrigger.from_crontab(action.schedule_cron)
trigger = CronTrigger.from_crontab(action.schedule_cron, timezone=tz)
scheduler.add_job(
_run_action,
trigger,
@@ -523,8 +567,8 @@ async def _load_action_jobs() -> None:
replace_existing=True,
)
_LOGGER.info(
"Scheduled action %d (%s) with cron: %s",
action.id, action.name, action.schedule_cron,
"Scheduled action %d (%s) with cron: %s [tz=%s]",
action.id, action.name, action.schedule_cron, tz.key,
)
continue
except Exception as e:
@@ -563,7 +607,8 @@ async def schedule_action(
if schedule_type == "cron" and cron_expression:
try:
from apscheduler.triggers.cron import CronTrigger
trigger = CronTrigger.from_crontab(cron_expression)
tz = await _load_app_timezone()
trigger = CronTrigger.from_crontab(cron_expression, timezone=tz)
scheduler.add_job(
_run_action,
trigger,
@@ -571,7 +616,10 @@ async def schedule_action(
args=[action_id],
replace_existing=True,
)
_LOGGER.info("Scheduled action %d with cron: %s", action_id, cron_expression)
_LOGGER.info(
"Scheduled action %d with cron: %s [tz=%s]",
action_id, cron_expression, tz.key,
)
return
except Exception as e:
_LOGGER.error("Invalid cron for action %d: %s — using interval", action_id, e)
@@ -596,6 +644,92 @@ async def unschedule_action(action_id: int) -> None:
_LOGGER.info("Unscheduled action %d", action_id)
async def reschedule_cron_jobs_for_timezone_change() -> None:
"""Re-add every cron-triggered tracker/action job under the new app timezone.
Called by the admin settings endpoint after the ``timezone`` AppSetting is
updated. APScheduler's ``CronTrigger`` freezes its timezone at construction
time, so a timezone change has no effect on jobs already in the scheduler
— we have to rebuild those jobs. Interval-triggered jobs are tz-agnostic
and are left alone.
"""
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..database.engine import get_engine
from ..database.models import Action, NotificationTracker, ServiceProvider as ServiceProviderModel
engine = get_engine()
scheduler = get_scheduler()
tz = await _load_app_timezone()
rescheduled = 0
async with AsyncSession(engine) as session:
# Trackers with cron scheduling (scheduler provider + schedule_type=cron).
trackers = (await session.exec(
select(NotificationTracker).where(NotificationTracker.enabled == True) # noqa: E712
)).all()
provider_ids = list({t.provider_id for t in trackers})
provider_types: dict[int, str] = {}
if provider_ids:
rows = await session.exec(
select(ServiceProviderModel).where(ServiceProviderModel.id.in_(provider_ids))
)
provider_types = {p.id: p.type for p in rows.all()}
for tracker in trackers:
if provider_types.get(tracker.provider_id) != "scheduler":
continue
filters = tracker.filters or {}
if filters.get("schedule_type") != "cron":
continue
cron_expr = filters.get("cron_expression", "")
if not cron_expr:
continue
job_id = f"tracker_{tracker.id}"
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
try:
_add_cron_job(scheduler, job_id, tracker.id, cron_expr, tracker.name, tz)
rescheduled += 1
except Exception as e: # noqa: BLE001
_LOGGER.error(
"Failed to re-apply cron for tracker %d on tz change: %s",
tracker.id, e,
)
# Actions with cron schedules.
actions = (await session.exec(
select(Action).where(Action.enabled == True) # noqa: E712
)).all()
from apscheduler.triggers.cron import CronTrigger
for action in actions:
if action.schedule_type != "cron" or not action.schedule_cron:
continue
job_id = f"action_{action.id}"
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
try:
scheduler.add_job(
_run_action,
CronTrigger.from_crontab(action.schedule_cron, timezone=tz),
id=job_id,
args=[action.id],
replace_existing=True,
)
rescheduled += 1
except Exception as e: # noqa: BLE001
_LOGGER.error(
"Failed to re-apply cron for action %d on tz change: %s",
action.id, e,
)
_LOGGER.info(
"Rescheduled %d cron job(s) for new app timezone %s", rescheduled, tz.key,
)
async def _run_action(action_id: int) -> None:
"""Run an action (called by APScheduler)."""
from .action_runner import run_action
@@ -246,6 +246,7 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
name=provider_name,
tracker_name=tracker_name,
custom_variables=custom_vars,
timezone_name=app_tz,
)
events, new_state = await sched.poll(collection_ids, state_dict)
elif provider_type == "nut":
@@ -317,6 +318,26 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
for event in events:
assets_count = event.added_count or event.removed_count or 0
details: dict[str, Any] = {
"added_count": event.added_count,
"removed_count": event.removed_count,
"provider_type": event.provider_type.value,
}
# Scheduler/periodic events carry the schedule context in ``extra``
# (cron expression, interval, timezone, fire count). Surface that
# in the event log so the dashboard and audit queries can show
# *why* the event fired, not just that it did.
if event.event_type.value == "scheduled_message":
sched_type = tracker_filters.get("schedule_type", "interval")
details["schedule_type"] = sched_type
if sched_type == "cron":
details["cron_expression"] = tracker_filters.get("cron_expression", "")
else:
details["interval_seconds"] = tracker.scan_interval
details["timezone"] = app_tz
fire_count = event.extra.get("fire_count") if event.extra else None
if fire_count is not None:
details["fire_count"] = fire_count
log = EventLog(
user_id=tracker.user_id,
tracker_id=tracker_id,
@@ -327,11 +348,7 @@ async def check_tracker(tracker_id: int) -> dict[str, Any]:
collection_id=event.collection_id,
collection_name=event.collection_name,
assets_count=assets_count,
details={
"added_count": event.added_count,
"removed_count": event.removed_count,
"provider_type": event.provider_type.value,
},
details=details,
)
session.add(log)