"""Gitea-specific bot command handler.""" from __future__ import annotations import asyncio import logging from collections.abc import Callable, Coroutine from typing import Any from ..database.models import ( CommandConfig, CommandTracker, ServiceProvider, TelegramBot, ) from ..services import make_gitea_provider from ..services.http_session import get_http_session from .base import CommandResponse, ProviderCommandHandler from .command_utils import get_last_event_str, get_tracked_collection_ids, get_trackers_for_provider from .handler import _render_cmd_template _LOGGER = logging.getLogger(__name__) _GITEA_COMMANDS = {"status", "repos", "issues", "prs", "commits"} def _get_tracked_repos( provider: ServiceProvider, trackers: list, ) -> list[tuple[ServiceProvider, str, str]]: """Get (provider, owner, repo) tuples from tracked collection_ids.""" if not provider.config.get("api_token"): return [] collection_ids = get_tracked_collection_ids(provider, trackers) repos: list[tuple[ServiceProvider, str, str]] = [] for full_name in collection_ids: parts = full_name.split("/", 1) if len(parts) == 2: repos.append((provider, parts[0], parts[1])) return repos # --------------------------------------------------------------------------- # Command dispatch table # --------------------------------------------------------------------------- _TEXT_COMMANDS: dict[str, Callable[..., Coroutine[Any, Any, dict[str, Any]]]] = {} def _text_cmd(fn: Callable[..., Coroutine[Any, Any, dict[str, Any]]]) -> Callable[..., Coroutine[Any, Any, dict[str, Any]]]: """Register a function in the text command dispatch table.""" name = fn.__name__.removeprefix("_cmd_") _TEXT_COMMANDS[name] = fn return fn class GiteaCommandHandler(ProviderCommandHandler): """Handles Gitea-specific bot commands.""" provider_type = "gitea" def get_provider_commands(self) -> set[str]: return _GITEA_COMMANDS def get_rate_categories(self) -> dict[str, str]: return { "repos": "api", "issues": "api", "prs": "api", "commits": "api", } async def handle( self, cmd: str, args: str, count: int, locale: str, response_mode: str, provider: ServiceProvider, cmd_templates: dict[str, dict[str, str]], bot: TelegramBot, tracker: CommandTracker, config: CommandConfig, ) -> CommandResponse | None: fn = _TEXT_COMMANDS.get(cmd) if fn is None: return None ctx = await fn(provider, count) return CommandResponse(text=_render_cmd_template(cmd_templates, cmd, locale, ctx)) @_text_cmd async def _cmd_status(provider: ServiceProvider, count: int) -> dict[str, Any]: trackers = await get_trackers_for_provider(provider.id) tracked_repos = _get_tracked_repos(provider, trackers) # Get server version server_version = "unknown" if provider.config.get("api_token"): http = await get_http_session() gitea = make_gitea_provider(http, provider) version = await gitea.client.get_server_version() if version: server_version = version tracker_ids = [t.id for t in trackers] last_str = await get_last_event_str(tracker_ids) return { "repos_count": len(tracked_repos), "server_version": server_version, "last_event": last_str, } @_text_cmd async def _cmd_repos(provider: ServiceProvider, count: int) -> dict[str, Any]: trackers = await get_trackers_for_provider(provider.id) tracked_repos = _get_tracked_repos(provider, trackers) repos_data: list[dict[str, Any]] = [] http = await get_http_session() async def _fetch_repo(prov: ServiceProvider, owner: str, repo: str) -> dict[str, Any]: gitea = make_gitea_provider(http, prov) # Use direct get_repo endpoint instead of listing all repos r = await gitea.client.get_repo(owner, repo) if r: return { "full_name": r.get("full_name", ""), "description": r.get("description", ""), "stars": r.get("stars_count", 0), "url": r.get("html_url", ""), } return { "full_name": f"{owner}/{repo}", "description": "", "stars": 0, "url": "", } tasks = [_fetch_repo(prov, owner, repo) for prov, owner, repo in tracked_repos] results = await asyncio.gather(*tasks, return_exceptions=True) for (prov, owner, repo), result in zip(tracked_repos, results): if isinstance(result, Exception): _LOGGER.warning("Failed to fetch repo %s/%s: %s", owner, repo, result) repos_data.append({ "full_name": f"{owner}/{repo}", "description": "?", "stars": 0, "url": "", }) else: repos_data.append(result) return {"repos": repos_data} @_text_cmd async def _cmd_issues(provider: ServiceProvider, count: int) -> dict[str, Any]: trackers = await get_trackers_for_provider(provider.id) tracked_repos = _get_tracked_repos(provider, trackers) all_issues: list[dict[str, Any]] = [] http = await get_http_session() async def _fetch_issues(prov: ServiceProvider, owner: str, repo: str) -> list[dict[str, Any]]: gitea = make_gitea_provider(http, prov) return await gitea.client.get_repo_issues(owner, repo, limit=count) tasks = [_fetch_issues(prov, owner, repo) for prov, owner, repo in tracked_repos] results = await asyncio.gather(*tasks, return_exceptions=True) for (prov, owner, repo), result in zip(tracked_repos, results): if isinstance(result, Exception): _LOGGER.warning("Failed to fetch issues for %s/%s: %s", owner, repo, result) continue for issue in result: all_issues.append({ "repo": f"{owner}/{repo}", "number": issue.get("number", 0), "title": issue.get("title", ""), "url": issue.get("html_url", ""), "user": issue.get("user", {}).get("login", ""), "state": issue.get("state", ""), }) all_issues.sort(key=lambda i: i.get("number", 0), reverse=True) return {"issues": all_issues[:count]} @_text_cmd async def _cmd_prs(provider: ServiceProvider, count: int) -> dict[str, Any]: trackers = await get_trackers_for_provider(provider.id) tracked_repos = _get_tracked_repos(provider, trackers) all_prs: list[dict[str, Any]] = [] http = await get_http_session() async def _fetch_prs(prov: ServiceProvider, owner: str, repo: str) -> list[dict[str, Any]]: gitea = make_gitea_provider(http, prov) return await gitea.client.get_repo_pulls(owner, repo, limit=count) tasks = [_fetch_prs(prov, owner, repo) for prov, owner, repo in tracked_repos] results = await asyncio.gather(*tasks, return_exceptions=True) for (prov, owner, repo), result in zip(tracked_repos, results): if isinstance(result, Exception): _LOGGER.warning("Failed to fetch PRs for %s/%s: %s", owner, repo, result) continue for pr in result: all_prs.append({ "repo": f"{owner}/{repo}", "number": pr.get("number", 0), "title": pr.get("title", ""), "url": pr.get("html_url", ""), "user": pr.get("user", {}).get("login", ""), "state": pr.get("state", ""), }) all_prs.sort(key=lambda p: p.get("number", 0), reverse=True) return {"prs": all_prs[:count]} @_text_cmd async def _cmd_commits(provider: ServiceProvider, count: int) -> dict[str, Any]: trackers = await get_trackers_for_provider(provider.id) tracked_repos = _get_tracked_repos(provider, trackers) all_commits: list[dict[str, Any]] = [] http = await get_http_session() async def _fetch_commits(prov: ServiceProvider, owner: str, repo: str) -> list[dict[str, Any]]: gitea = make_gitea_provider(http, prov) return await gitea.client.get_repo_commits(owner, repo, limit=count) tasks = [_fetch_commits(prov, owner, repo) for prov, owner, repo in tracked_repos] results = await asyncio.gather(*tasks, return_exceptions=True) for (prov, owner, repo), result in zip(tracked_repos, results): if isinstance(result, Exception): _LOGGER.warning("Failed to fetch commits for %s/%s: %s", owner, repo, result) continue for c in result: commit_data = c.get("commit", {}) all_commits.append({ "repo": f"{owner}/{repo}", "short_id": c.get("sha", "")[:7], "message": commit_data.get("message", "").split("\n")[0][:80], "author": commit_data.get("author", {}).get("name", ""), "url": c.get("html_url", ""), }) return {"commits": all_commits[:count]}