feat: add update-available notification system
Lint & Test / test (push) Successful in 10s

- Abstract ReleaseProvider protocol for platform-agnostic version checking
- GiteaReleaseProvider implementation using stdlib urllib
- UpdateChecker service with periodic background checks and WS broadcast
- Persistent dismissible banner in Web UI when a new version is detected
- Health endpoint now returns cached update info
- Configurable via update_check_enabled and update_check_interval settings
- i18n support (EN/RU)
This commit is contained in:
2026-03-25 11:37:09 +03:00
parent 1410a8d2cb
commit 795a15cb8b
12 changed files with 357 additions and 7 deletions
@@ -0,0 +1,77 @@
"""Gitea release provider implementation."""
import json
import logging
import urllib.error
import urllib.request
from typing import Optional
from .release_provider import ReleaseInfo, ReleaseProvider
logger = logging.getLogger(__name__)
# Default repository coordinates
_DEFAULT_BASE_URL = "https://git.dolgolyov-family.by"
_DEFAULT_OWNER = "alexei.dolgolyov"
_DEFAULT_REPO = "media-player-server"
class GiteaReleaseProvider(ReleaseProvider):
"""Fetches the latest release from a Gitea repository."""
def __init__(
self,
base_url: str = _DEFAULT_BASE_URL,
owner: str = _DEFAULT_OWNER,
repo: str = _DEFAULT_REPO,
timeout: float = 10.0,
) -> None:
self._api_url = f"{base_url}/api/v1/repos/{owner}/{repo}/releases"
self._release_page_url = f"{base_url}/{owner}/{repo}/releases/tag"
self._timeout = timeout
async def get_latest_release(self) -> Optional[ReleaseInfo]:
"""Fetch the latest stable release from Gitea API.
Returns:
ReleaseInfo for the latest non-prerelease, or None on failure.
"""
import asyncio
try:
data = await asyncio.to_thread(self._fetch_releases)
except Exception as e:
logger.warning("Failed to check for updates: %s", e)
return None
if not data:
return None
# Find the first non-prerelease, non-draft release
for release in data:
if release.get("draft") or release.get("prerelease"):
continue
tag = release.get("tag_name", "")
version = tag.lstrip("v")
if not version:
continue
return ReleaseInfo(
version=version,
url=f"{self._release_page_url}/{tag}",
prerelease=False,
)
logger.debug("No stable releases found")
return None
def _fetch_releases(self) -> list[dict]:
"""Synchronous HTTP fetch of releases (run in thread)."""
url = f"{self._api_url}?limit=5"
req = urllib.request.Request(url, headers={"Accept": "application/json"})
try:
with urllib.request.urlopen(req, timeout=self._timeout) as resp:
return json.loads(resp.read().decode())
except (urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError, OSError) as e:
raise RuntimeError(f"Gitea API request failed: {e}") from e
+29
View File
@@ -0,0 +1,29 @@
"""Abstract release provider interface for version checking."""
from dataclasses import dataclass
from typing import Protocol
@dataclass(frozen=True)
class ReleaseInfo:
"""Version-provider-agnostic release metadata."""
version: str # e.g. "1.1.0" (no "v" prefix)
url: str # release page URL
prerelease: bool
class ReleaseProvider(Protocol):
"""Abstract interface for fetching the latest release.
Implement this protocol to support different hosting platforms
(Gitea, GitHub, GitLab, etc.).
"""
async def get_latest_release(self) -> ReleaseInfo | None:
"""Fetch the latest stable release.
Returns:
ReleaseInfo if a release was found, None on failure.
"""
...
+120
View File
@@ -0,0 +1,120 @@
"""Provider-agnostic update checker service."""
import asyncio
import logging
from typing import Any, Optional
from .release_provider import ReleaseProvider
from .websocket_manager import ws_manager
logger = logging.getLogger(__name__)
def _parse_version(version: str) -> tuple[int, ...]:
"""Parse a version string into a comparable tuple.
Handles versions like "1.0.0", "1.2.3", ignoring non-numeric suffixes.
"""
parts: list[int] = []
for part in version.split("."):
digits = ""
for ch in part:
if ch.isdigit():
digits += ch
else:
break
if digits:
parts.append(int(digits))
return tuple(parts)
class UpdateChecker:
"""Periodically checks for new releases using a ReleaseProvider."""
def __init__(self, provider: ReleaseProvider, current_version: str) -> None:
self._provider = provider
self._current_version = current_version
self._current_parsed = _parse_version(current_version)
self._task: Optional[asyncio.Task] = None
self._cached_update: Optional[dict[str, Any]] = None
@property
def cached_update(self) -> Optional[dict[str, Any]]:
"""Return the cached update info, or None if up-to-date."""
return self._cached_update
async def check_for_update(self) -> Optional[dict[str, Any]]:
"""Check for a newer release.
Returns:
Dict with current/latest/url if an update exists, None otherwise.
"""
release = await self._provider.get_latest_release()
if release is None:
return None
latest_parsed = _parse_version(release.version)
if latest_parsed <= self._current_parsed:
return None
return {
"current": self._current_version,
"latest": release.version,
"url": release.url,
}
async def start(self, interval: int) -> None:
"""Start periodic update checking.
Checks immediately on start, then every `interval` seconds.
"""
if self._task is not None:
return
self._task = asyncio.create_task(self._check_loop(interval))
logger.info("Update checker started (interval: %ds)", interval)
async def stop(self) -> None:
"""Stop periodic update checking."""
if self._task is not None:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
logger.info("Update checker stopped")
async def _check_loop(self, interval: int) -> None:
"""Background loop that checks for updates periodically."""
# Initial check with a small delay to let the server finish starting
await asyncio.sleep(5)
while True:
try:
update = await self.check_for_update()
if update and update != self._cached_update:
self._cached_update = update
logger.info(
"New version available: %s%s (%s)",
update["current"],
update["latest"],
update["url"],
)
await ws_manager.broadcast(
{"type": "update_available", "data": update}
)
elif update is None and self._cached_update is not None:
# Version was updated (or release removed) — clear cache
self._cached_update = None
except asyncio.CancelledError:
break
except Exception as e:
logger.warning("Update check failed: %s", e)
try:
await asyncio.sleep(interval)
except asyncio.CancelledError:
break