refactor: provider-agnostic bot command system + Gitea commands

Refactored the monolithic command handler (707 lines) into a pluggable
provider-handler architecture:

- Abstract ProviderCommandHandler interface (base.py)
- Handler dispatch registry routes commands by provider type
- Extracted all Immich logic into ImmichCommandHandler
- New GiteaCommandHandler with /status, /repos, /issues, /prs, /commits
- Multi-provider routing: groups context by provider type, finds handler
- handler.py reduced to ~280 line thin orchestrator

Gitea commands:
- Extended GiteaClient with get_repo_issues, get_repo_pulls, get_repo_commits
- 30 Jinja2 command templates (15 EN + 15 RU)
- Gitea capabilities updated with 6 commands + 15 command_slots
- Default command config + command template config seeded on startup
- Rate limiting: Gitea API commands share "api" category (15s cooldown)

Also:
- Command configs API accepts "gitea" provider type
- System command configs (user_id=0) visible to all users
- Webhook URL shown on Gitea provider card and edit form
- Scan interval hidden for webhook-based providers
This commit is contained in:
2026-03-22 17:44:47 +03:00
parent 0562f78b35
commit 63437c1841
45 changed files with 1175 additions and 397 deletions
@@ -0,0 +1,252 @@
"""Gitea-specific bot command handler."""
from __future__ import annotations
import logging
from typing import Any
import aiohttp
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from ..database.engine import get_engine
from ..database.models import (
CommandConfig, CommandTracker, EventLog,
NotificationTracker, ServiceProvider, TelegramBot,
)
from ..services import make_gitea_provider
from .base import ProviderCommandHandler
from .handler import _render_cmd_template, _get_notification_trackers_for_providers
_LOGGER = logging.getLogger(__name__)
_GITEA_COMMANDS = {"status", "repos", "issues", "prs", "commits"}
class GiteaCommandHandler(ProviderCommandHandler):
"""Handles Gitea-specific bot commands."""
provider_type = "gitea"
def get_provider_commands(self) -> set[str]:
return _GITEA_COMMANDS
def get_rate_categories(self) -> dict[str, str]:
return {
"repos": "api", "issues": "api",
"prs": "api", "commits": "api",
}
async def handle(
self,
cmd: str,
args: str,
count: int,
locale: str,
response_mode: str,
providers_map: dict[int, ServiceProvider],
cmd_templates: dict[str, dict[str, str]],
bot: TelegramBot,
ctx_tuples: list[tuple[CommandTracker, CommandConfig, ServiceProvider]],
) -> str | list[dict[str, Any]] | None:
if cmd == "status":
ctx = await _cmd_status(providers_map)
return _render_cmd_template(cmd_templates, "status", locale, ctx)
if cmd == "repos":
ctx = await _cmd_repos(providers_map)
return _render_cmd_template(cmd_templates, "repos", locale, ctx)
if cmd == "issues":
ctx = await _cmd_issues(providers_map, count)
return _render_cmd_template(cmd_templates, "issues", locale, ctx)
if cmd == "prs":
ctx = await _cmd_prs(providers_map, count)
return _render_cmd_template(cmd_templates, "prs", locale, ctx)
if cmd == "commits":
ctx = await _cmd_commits(providers_map, count)
return _render_cmd_template(cmd_templates, "commits", locale, ctx)
return None
def _get_tracked_repos(
providers_map: dict[int, ServiceProvider],
trackers: list[NotificationTracker],
) -> list[tuple[ServiceProvider, str, str]]:
"""Get (provider, owner, repo) tuples from tracked collection_ids."""
repos: list[tuple[ServiceProvider, str, str]] = []
for tracker in trackers:
provider = providers_map.get(tracker.provider_id)
if not provider or provider.type != "gitea":
continue
if not provider.config.get("api_token"):
continue
for full_name in (tracker.collection_ids or []):
parts = full_name.split("/", 1)
if len(parts) == 2:
repos.append((provider, parts[0], parts[1]))
# Also check filters.collections
for tracker in trackers:
provider = providers_map.get(tracker.provider_id)
if not provider or provider.type != "gitea":
continue
if not provider.config.get("api_token"):
continue
for full_name in (tracker.filters or {}).get("collections", []):
parts = full_name.split("/", 1)
if len(parts) == 2:
entry = (provider, parts[0], parts[1])
if entry not in repos:
repos.append(entry)
return repos[:20] # Cap to prevent API hammering
async def _cmd_status(providers_map: dict[int, ServiceProvider]) -> dict[str, Any]:
provider_ids = set(providers_map.keys())
trackers = await _get_notification_trackers_for_providers(provider_ids)
tracked_repos = _get_tracked_repos(providers_map, trackers)
# Get server version from first Gitea provider with token
server_version = "unknown"
async with aiohttp.ClientSession() as http:
for provider in providers_map.values():
if provider.type == "gitea" and provider.config.get("api_token"):
gitea = make_gitea_provider(http, provider)
version = await gitea.client.get_server_version()
if version:
server_version = version
break
# Last event
engine = get_engine()
async with AsyncSession(engine) as session:
tracker_ids = [t.id for t in trackers]
if tracker_ids:
result = await session.exec(
select(EventLog)
.where(EventLog.tracker_id.in_(tracker_ids))
.order_by(EventLog.created_at.desc()).limit(1)
)
last_event = result.first()
else:
last_event = None
last_str = last_event.created_at.strftime("%Y-%m-%d %H:%M") if last_event else "-"
return {
"repos_count": len(tracked_repos),
"server_version": server_version,
"last_event": last_str,
}
async def _cmd_repos(providers_map: dict[int, ServiceProvider]) -> dict[str, Any]:
provider_ids = set(providers_map.keys())
trackers = await _get_notification_trackers_for_providers(provider_ids)
tracked_repos = _get_tracked_repos(providers_map, trackers)
repos_data: list[dict[str, Any]] = []
async with aiohttp.ClientSession() as http:
for provider, owner, repo in tracked_repos:
gitea = make_gitea_provider(http, provider)
try:
all_repos = await gitea.client.get_repos(limit=50)
for r in all_repos:
if r.get("full_name") == f"{owner}/{repo}":
repos_data.append({
"full_name": r.get("full_name", ""),
"description": r.get("description", ""),
"stars": r.get("stars_count", 0),
"url": r.get("html_url", ""),
})
break
else:
repos_data.append({
"full_name": f"{owner}/{repo}",
"description": "",
"stars": 0,
"url": "",
})
except Exception:
repos_data.append({
"full_name": f"{owner}/{repo}",
"description": "?",
"stars": 0,
"url": "",
})
return {"repos": repos_data}
async def _cmd_issues(
providers_map: dict[int, ServiceProvider], count: int,
) -> dict[str, Any]:
provider_ids = set(providers_map.keys())
trackers = await _get_notification_trackers_for_providers(provider_ids)
tracked_repos = _get_tracked_repos(providers_map, trackers)
all_issues: list[dict[str, Any]] = []
async with aiohttp.ClientSession() as http:
for provider, owner, repo in tracked_repos:
gitea = make_gitea_provider(http, provider)
issues = await gitea.client.get_repo_issues(owner, repo, limit=count)
for issue in issues:
all_issues.append({
"repo": f"{owner}/{repo}",
"number": issue.get("number", 0),
"title": issue.get("title", ""),
"url": issue.get("html_url", ""),
"user": issue.get("user", {}).get("login", ""),
"state": issue.get("state", ""),
})
all_issues.sort(key=lambda i: i.get("number", 0), reverse=True)
return {"issues": all_issues[:count]}
async def _cmd_prs(
providers_map: dict[int, ServiceProvider], count: int,
) -> dict[str, Any]:
provider_ids = set(providers_map.keys())
trackers = await _get_notification_trackers_for_providers(provider_ids)
tracked_repos = _get_tracked_repos(providers_map, trackers)
all_prs: list[dict[str, Any]] = []
async with aiohttp.ClientSession() as http:
for provider, owner, repo in tracked_repos:
gitea = make_gitea_provider(http, provider)
prs = await gitea.client.get_repo_pulls(owner, repo, limit=count)
for pr in prs:
all_prs.append({
"repo": f"{owner}/{repo}",
"number": pr.get("number", 0),
"title": pr.get("title", ""),
"url": pr.get("html_url", ""),
"user": pr.get("user", {}).get("login", ""),
"state": pr.get("state", ""),
})
all_prs.sort(key=lambda p: p.get("number", 0), reverse=True)
return {"prs": all_prs[:count]}
async def _cmd_commits(
providers_map: dict[int, ServiceProvider], count: int,
) -> dict[str, Any]:
provider_ids = set(providers_map.keys())
trackers = await _get_notification_trackers_for_providers(provider_ids)
tracked_repos = _get_tracked_repos(providers_map, trackers)
all_commits: list[dict[str, Any]] = []
async with aiohttp.ClientSession() as http:
for provider, owner, repo in tracked_repos:
gitea = make_gitea_provider(http, provider)
commits = await gitea.client.get_repo_commits(owner, repo, limit=count)
for c in commits:
commit_data = c.get("commit", {})
all_commits.append({
"repo": f"{owner}/{repo}",
"short_id": c.get("sha", "")[:7],
"message": commit_data.get("message", "").split("\n")[0][:80],
"author": commit_data.get("author", {}).get("name", ""),
"url": c.get("html_url", ""),
})
return {"commits": all_commits[:count]}