From 96fd9106031d130ecc1b8451ec786dd7b4429302 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Mon, 13 Apr 2026 00:12:14 +0300 Subject: [PATCH] fix: resolve ERR_INSUFFICIENT_RESOURCES connection exhaustion - Add concurrency limiter (max 4 GET requests) to API layer, leaving slots for SSE and health checks. Write ops bypass the limiter. - Add AbortController to ContainerStats, project detail page, and dashboard to cancel in-flight requests on navigation/unmount. - Move global SSE connection from layout to events page (only consumer). - Add 30s heartbeat to SSE endpoint to detect zombie connections. - Serialize dashboard project fetches to avoid parallel burst. - Rebuild frontend in dev-server.sh so go:embed stays in sync. --- internal/api/sse.go | 9 ++ scripts/dev-server.sh | 4 + web/src/lib/api.ts | 107 +++++++++++---- web/src/lib/components/ContainerStats.svelte | 27 ++-- web/src/routes/+layout.svelte | 23 +--- web/src/routes/+page.svelte | 133 ++++++++++++++++--- web/src/routes/events/+page.svelte | 17 +-- 7 files changed, 233 insertions(+), 87 deletions(-) diff --git a/internal/api/sse.go b/internal/api/sse.go index 36b6e54..ce9085c 100644 --- a/internal/api/sse.go +++ b/internal/api/sse.go @@ -7,6 +7,7 @@ import ( "log/slog" "net/http" "strings" + "time" "github.com/go-chi/chi/v5" @@ -158,6 +159,10 @@ func (s *Server) streamEvents(w http.ResponseWriter, r *http.Request) { }) defer s.eventBus.Unsubscribe(sub) + // Periodic heartbeat so the browser detects dead connections. + heartbeat := time.NewTicker(30 * time.Second) + defer heartbeat.Stop() + ctx := r.Context() for { select { @@ -168,6 +173,10 @@ func (s *Server) streamEvents(w http.ResponseWriter, r *http.Request) { return } writeSSE(w, flusher, evt) + case <-heartbeat.C: + // SSE comment line — keeps the connection alive without triggering onmessage. + fmt.Fprintf(w, ": heartbeat\n\n") + flusher.Flush() } } } diff --git a/scripts/dev-server.sh b/scripts/dev-server.sh index 432319b..af0aa2a 100644 --- a/scripts/dev-server.sh +++ b/scripts/dev-server.sh @@ -32,6 +32,10 @@ export ENCRYPTION_KEY export ADMIN_PASSWORD="${ADMIN_PASSWORD:-admin123}" export LISTEN_ADDR="${PORT}" +# Rebuild frontend so go:embed picks up changes. +echo "Building frontend..." +(cd web && npm run build --silent) + echo "Starting Tinyforge on http://localhost:${PORT_NUM}" echo "Login: admin / ${ADMIN_PASSWORD}" exec go run ./cmd/server diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts index feced09..bdafa9f 100644 --- a/web/src/lib/api.ts +++ b/web/src/lib/api.ts @@ -43,7 +43,58 @@ class ApiError extends Error { import { getAuthToken, clearAuth } from './auth'; +// ── Concurrency limiter ──────────────────────────────────────────── +// Chrome allows only 6 concurrent HTTP/1.1 connections per host. +// Reserve slots for the persistent SSE stream and health checks by +// capping regular API requests to 4 concurrent. +const MAX_CONCURRENT = 4; +let inflight = 0; +const queue: Array<() => void> = []; + +function acquireSlot(signal?: AbortSignal | null): Promise { + if (inflight < MAX_CONCURRENT) { + inflight++; + return Promise.resolve(); + } + return new Promise((resolve, reject) => { + const entry = () => { inflight++; resolve(); }; + queue.push(entry); + + signal?.addEventListener('abort', () => { + const idx = queue.indexOf(entry); + if (idx !== -1) { + queue.splice(idx, 1); + reject(signal.reason ?? new DOMException('Aborted', 'AbortError')); + } + }, { once: true }); + }); +} + +function releaseSlot(): void { + const next = queue.shift(); + if (next) { + next(); + } else { + inflight--; + } +} + async function request(path: string, init?: RequestInit): Promise { + // Write operations (user-initiated) bypass the concurrency limiter + // so they are never blocked behind background polling. + const method = init?.method?.toUpperCase() ?? 'GET'; + if (method !== 'GET') { + return requestInner(path, init); + } + await acquireSlot(init?.signal); + try { + return await requestInner(path, init); + } finally { + releaseSlot(); + } +} + +async function requestInner(path: string, init?: RequestInit): Promise { const token = getAuthToken(); const headers: Record = { 'Content-Type': 'application/json', @@ -82,8 +133,8 @@ async function request(path: string, init?: RequestInit): Promise { return envelope.data as T; } -function get(path: string): Promise { - return request(path); +function get(path: string, signal?: AbortSignal): Promise { + return request(path, signal ? { signal } : undefined); } function post(path: string, body?: unknown): Promise { @@ -106,12 +157,12 @@ function del(path: string): Promise { // ── Projects ──────────────────────────────────────────────────────── -export function listProjects(): Promise { - return get('/api/projects'); +export function listProjects(signal?: AbortSignal): Promise { + return get('/api/projects', signal); } -export function getProject(id: string): Promise { - return get(`/api/projects/${id}`); +export function getProject(id: string, signal?: AbortSignal): Promise { + return get(`/api/projects/${id}`, signal); } export function createProject(data: Partial): Promise { @@ -142,8 +193,8 @@ export function deleteStage(projectId: string, stageId: string): Promise { // ── Instances ─────────────────────────────────────────────────────── -export function listInstances(projectId: string, stageId: string): Promise { - return get(`/api/projects/${projectId}/stages/${stageId}/instances`); +export function listInstances(projectId: string, stageId: string, signal?: AbortSignal): Promise { + return get(`/api/projects/${projectId}/stages/${stageId}/instances`, signal); } export function deployInstance( @@ -198,8 +249,8 @@ export function restartInstance( // ── Deploys ───────────────────────────────────────────────────────── -export function listDeploys(limit = 50): Promise { - return get(`/api/deploys?limit=${limit}`); +export function listDeploys(limit = 50, signal?: AbortSignal): Promise { + return get(`/api/deploys?limit=${limit}`, signal); } export function getDeployLogs(deployId: string): Promise { @@ -246,7 +297,9 @@ export function testRegistry(id: string): Promise<{ status: string }> { } export function listRegistryTags(registryId: string, image: string): Promise { - return get(`/api/registries/${registryId}/tags/${encodeURIComponent(image)}`); + // Image contains slashes (e.g. "owner/name") — don't encode them; + // the backend route uses a wildcard (/tags/*) that expects path segments. + return get(`/api/registries/${registryId}/tags/${image}`); } export function listRegistryImages(registryId: string): Promise { @@ -255,8 +308,8 @@ export function listRegistryImages(registryId: string): Promise // ── Settings ──────────────────────────────────────────────────────── -export function getSettings(): Promise { - return get('/api/settings'); +export function getSettings(signal?: AbortSignal): Promise { + return get('/api/settings', signal); } export function updateSettings(data: Partial): Promise { @@ -285,14 +338,14 @@ export function fetchContainerLogs( return get(`/api/projects/${projectId}/stages/${stageId}/instances/${instanceId}/logs?tail=${tail}`); } -export function listProjectImages(projectId: string): Promise { - return get(`/api/projects/${projectId}/images`); +export function listProjectImages(projectId: string, signal?: AbortSignal): Promise { + return get(`/api/projects/${projectId}/images`, signal); } -export function getUnusedImageStats(): Promise<{ +export function getUnusedImageStats(signal?: AbortSignal): Promise<{ total_size_mb: number; count: number; threshold_mb: number; exceeded: boolean; }> { - return get<{ total_size_mb: number; count: number; threshold_mb: number; exceeded: boolean }>('/api/docker/unused-images'); + return get<{ total_size_mb: number; count: number; threshold_mb: number; exceeded: boolean }>('/api/docker/unused-images', signal); } export function pruneImages(): Promise<{ images_removed: number; space_reclaimed_mb: number }> { @@ -580,8 +633,8 @@ export function clearAllEvents(): Promise<{ status: string; count: number }> { // ── Stale Containers ──────────────────────────────────────────────── -export function fetchStaleContainers(): Promise { - return get('/api/containers/stale'); +export function fetchStaleContainers(signal?: AbortSignal): Promise { + return get('/api/containers/stale', signal); } export function cleanupStaleContainer(id: string): Promise<{ deleted: string }> { @@ -597,10 +650,12 @@ export function bulkCleanupStaleContainers(): Promise<{ deleted: number }> { export function fetchContainerStats( projectId: string, stageId: string, - instanceId: string + instanceId: string, + signal?: AbortSignal ): Promise { return get( - `/api/projects/${projectId}/stages/${stageId}/instances/${instanceId}/stats` + `/api/projects/${projectId}/stages/${stageId}/instances/${instanceId}/stats`, + signal ); } @@ -608,8 +663,8 @@ export function fetchContainerStats( import type { StaticSite, StaticSiteSecret, FolderEntry, GitProvider, RepoInfo } from './types'; -export function listStaticSites(): Promise { - return get('/api/sites'); +export function listStaticSites(signal?: AbortSignal): Promise { + return get('/api/sites', signal); } export function getStaticSite(id: string): Promise { @@ -710,4 +765,10 @@ export function deleteStaticSiteSecret( return del<{ deleted: string }>(`/api/sites/${siteId}/secrets/${secretId}`); } +export function getStaticSiteStorage( + siteId: string +): Promise { + return get(`/api/sites/${siteId}/storage`); +} + export { ApiError }; diff --git a/web/src/lib/components/ContainerStats.svelte b/web/src/lib/components/ContainerStats.svelte index 189211a..b2a612e 100644 --- a/web/src/lib/components/ContainerStats.svelte +++ b/web/src/lib/components/ContainerStats.svelte @@ -18,24 +18,19 @@ let error = $state(false); $effect(() => { - let cancelled = false; - let inflight = false; + let controller = new AbortController(); async function load() { - if (inflight) return; // Skip if previous request still pending. - inflight = true; + // Abort any previous in-flight request before starting a new one. + controller.abort(); + controller = new AbortController(); try { - const result = await api.fetchContainerStats(projectId, stageId, instanceId); - if (!cancelled) { - stats = result; - error = false; - } - } catch { - if (!cancelled) { - error = true; - } - } finally { - inflight = false; + const result = await api.fetchContainerStats(projectId, stageId, instanceId, controller.signal); + stats = result; + error = false; + } catch (e) { + if (e instanceof DOMException && e.name === 'AbortError') return; + error = true; } } @@ -45,7 +40,7 @@ const interval = setInterval(load, 30_000); return () => { - cancelled = true; + controller.abort(); clearInterval(interval); }; }); diff --git a/web/src/routes/+layout.svelte b/web/src/routes/+layout.svelte index bc7fe9c..ea3cc9d 100644 --- a/web/src/routes/+layout.svelte +++ b/web/src/routes/+layout.svelte @@ -8,12 +8,9 @@ import LocaleSwitcher from '$lib/components/LocaleSwitcher.svelte'; import { IconDashboard, IconProjects, IconDeploy, IconEvents, IconWifi, IconSettings, IconMenu, IconX, IconLogout, IconGlobe } from '$lib/components/icons'; import { goto } from '$app/navigation'; - import { connectGlobalEvents, type SSEConnection } from '$lib/sse'; - import { instanceStatusStore } from '$lib/stores/instance-status'; import { resolvedTheme, applyTheme } from '$lib/stores/theme'; import { exchangeOidcToken, setAuthToken, clearAuth, isAuthenticated } from '$lib/auth'; import { logout as apiLogout, getHealth } from '$lib/api'; - import { publishEventLog } from '$lib/stores/event-log-bus'; import type { DockerHealth, ProxyHealth } from '$lib/types'; import { t } from '$lib/i18n'; @@ -38,7 +35,6 @@ return pathname.startsWith(href); } - let sseConnection: SSEConnection | null = null; let sidebarOpen = $state(false); let dockerHealth = $state(null); let proxyHealth = $state(null); @@ -85,26 +81,13 @@ } }); - // Start SSE and health polling when authenticated. + // Start health polling when authenticated. // Uses $effect to react to route changes (e.g., after login navigation). $effect(() => { void $page.url.pathname; - if (!isAuthenticated() || sseConnection) return; + if (!isAuthenticated() || healthInterval) return; - sseConnection = connectGlobalEvents({ - onInstanceStatus(payload) { - instanceStatusStore.update(payload); - }, - onDeployStatus(payload) { - instanceStatusStore.notifyDeploy(payload); - }, - onEventLog(payload) { - publishEventLog(payload); - } - }); - - // Poll Docker health every 30s. async function checkHealth() { try { const h = await getHealth(); @@ -121,8 +104,6 @@ }); onDestroy(() => { - sseConnection?.close(); - sseConnection = null; if (healthInterval) clearInterval(healthInterval); }); diff --git a/web/src/routes/+page.svelte b/web/src/routes/+page.svelte index e3b0465..3ef6224 100644 --- a/web/src/routes/+page.svelte +++ b/web/src/routes/+page.svelte @@ -1,11 +1,11 @@ @@ -96,7 +127,7 @@ -
+
@@ -152,6 +200,53 @@ + + {#if !loading} +
+
+

{$t('dashboard.staticSites')}

+ {#if sites.length > 0} + + {$t('dashboard.viewAllSites')} → + + {/if} +
+ + {#if sites.length === 0} +
+ +
+ {:else} + + {/if} +
+ {/if} +

{$t('dashboard.projects')}

diff --git a/web/src/routes/events/+page.svelte b/web/src/routes/events/+page.svelte index 0b5af56..6dea53d 100644 --- a/web/src/routes/events/+page.svelte +++ b/web/src/routes/events/+page.svelte @@ -8,8 +8,7 @@ import { fetchEventLog, fetchEventLogStats, clearAllEvents, deleteEvent } from '$lib/api'; import ConfirmDialog from '$lib/components/ConfirmDialog.svelte'; import { toasts } from '$lib/stores/toast'; - import { subscribeEventLog } from '$lib/stores/event-log-bus'; - import type { EventLogSSEPayload } from '$lib/sse'; + import { connectGlobalEvents, type SSEConnection, type EventLogSSEPayload } from '$lib/sse'; import type { EventLogEntry, EventLogStats } from '$lib/types'; import EventLogEntryComponent from '$lib/components/EventLogEntry.svelte'; import EventLogFilter from '$lib/components/EventLogFilter.svelte'; @@ -36,7 +35,7 @@ const PAGE_SIZE = 50; let offset = $state(0); - let unsubscribeEventLog: (() => void) | null = null; + let sseConnection: SSEConnection | null = null; let listEl: HTMLDivElement | undefined = $state(); let showClearConfirm = $state(false); @@ -199,15 +198,17 @@ loadEvents(); loadStats(); - // Subscribe to event_log events from the global SSE connection (no duplicate connection). - unsubscribeEventLog = subscribeEventLog((payload: EventLogSSEPayload) => { - handleSSEEvent(payload); + // Open SSE connection only while this page is mounted. + sseConnection = connectGlobalEvents({ + onEventLog(payload) { + handleSSEEvent(payload); + } }); }); onDestroy(() => { - unsubscribeEventLog?.(); - unsubscribeEventLog = null; + sseConnection?.close(); + sseConnection = null; });