Files
tiny-forge/web/src/lib/sse.ts
T
alexei.dolgolyov 791cd4d6af
Build / build (push) Successful in 12m20s
feat: rename Docker Watcher to Tinyforge
Rebrand the project as Tinyforge to reflect its evolution from a Docker
container watcher into a self-hosted mini CI/deployment platform.

Rename covers: Go module path, Docker labels, DB/config filenames,
JWT issuer, Dockerfile binary, docker-compose, CI workflows, frontend
i18n, README with static sites docs, and all code comments.
2026-04-12 21:30:39 +03:00

211 lines
5.7 KiB
TypeScript

/**
* SSE client helper with auto-reconnect and exponential backoff.
*
* Provides type-safe event handling for Tinyforge's real-time
* event streams (deploy logs and instance status changes).
*/
import { getAuthToken } from './auth';
// ── Types ──────────────────────────────────────────────────────────
export type SSEEventType = 'deploy_log' | 'instance_status' | 'deploy_status' | 'event_log';
export interface SSEEvent<T = unknown> {
type: SSEEventType;
payload: T;
}
export interface DeployLogPayload {
deploy_id: string;
message: string;
level: 'info' | 'warn' | 'error';
}
export interface InstanceStatusPayload {
instance_id: string;
project_id: string;
stage_id: string;
status: string;
}
export interface DeployStatusPayload {
deploy_id: string;
project_id: string;
stage_id: string;
image_tag: string;
status: string;
error?: string;
}
export interface EventLogSSEPayload {
id: number;
source: string;
severity: string;
message: string;
metadata: string;
created_at: string;
}
type SSEPayload = DeployLogPayload | InstanceStatusPayload | DeployStatusPayload | EventLogSSEPayload;
export interface SSEOptions {
/** Called for each SSE event received. */
onEvent: (event: SSEEvent<SSEPayload>) => void;
/** Called when the connection is established. */
onOpen?: () => void;
/** Called when the connection is lost. Receives the retry attempt number. */
onError?: (attempt: number) => void;
/** Called when reconnection is given up (max retries exceeded). */
onGiveUp?: () => void;
/** Maximum number of reconnect attempts. 0 = infinite. Default: 0 */
maxRetries?: number;
/** Initial backoff delay in ms. Default: 1000 */
initialDelay?: number;
/** Maximum backoff delay in ms. Default: 30000 */
maxDelay?: number;
}
// ── SSE Connection ─────────────────────────────────────────────────
export interface SSEConnection {
/** Close the connection and stop reconnecting. */
close: () => void;
}
/**
* Creates an SSE connection to the given URL with auto-reconnect.
*
* Uses exponential backoff with jitter for reconnection.
* Returns an object with a `close` method to cleanly shut down.
*/
export function connectSSE(url: string, options: SSEOptions): SSEConnection {
const {
onEvent,
onOpen,
onError,
onGiveUp,
maxRetries = 0,
initialDelay = 1000,
maxDelay = 30000
} = options;
let eventSource: EventSource | null = null;
let retryCount = 0;
let retryTimeout: ReturnType<typeof setTimeout> | null = null;
let closed = false;
function connect(): void {
if (closed) return;
// Append auth token as query param (EventSource doesn't support custom headers).
const token = getAuthToken();
const authUrl = token ? `${url}${url.includes('?') ? '&' : '?'}token=${encodeURIComponent(token)}` : url;
eventSource = new EventSource(authUrl);
eventSource.onopen = () => {
retryCount = 0;
onOpen?.();
};
eventSource.onmessage = (messageEvent: MessageEvent) => {
try {
const parsed: SSEEvent<SSEPayload> = JSON.parse(messageEvent.data);
onEvent(parsed);
} catch {
// Ignore malformed events.
}
};
eventSource.onerror = () => {
eventSource?.close();
eventSource = null;
if (closed) return;
retryCount++;
onError?.(retryCount);
if (maxRetries > 0 && retryCount > maxRetries) {
onGiveUp?.();
return;
}
// Exponential backoff with jitter.
const delay = Math.min(initialDelay * Math.pow(2, retryCount - 1), maxDelay);
const jitter = delay * 0.2 * Math.random();
const totalDelay = delay + jitter;
retryTimeout = setTimeout(connect, totalDelay);
};
}
connect();
return {
close() {
closed = true;
if (retryTimeout !== null) {
clearTimeout(retryTimeout);
retryTimeout = null;
}
eventSource?.close();
eventSource = null;
}
};
}
// ── Convenience Factories ──────────────────────────────────────────
/**
* Connect to deploy log SSE stream for a specific deploy.
* Streams existing logs first, then real-time updates.
*/
export function connectDeployLogs(
deployId: string,
callbacks: {
onLog: (log: DeployLogPayload) => void;
onStatus?: (status: DeployStatusPayload) => void;
onOpen?: () => void;
onError?: (attempt: number) => void;
}
): SSEConnection {
return connectSSE(`/api/deploys/${deployId}/logs`, {
onEvent(event) {
if (event.type === 'deploy_log') {
callbacks.onLog(event.payload as DeployLogPayload);
} else if (event.type === 'deploy_status') {
callbacks.onStatus?.(event.payload as DeployStatusPayload);
}
},
onOpen: callbacks.onOpen,
onError: callbacks.onError
});
}
/**
* Connect to the global events SSE stream.
* Receives instance status changes and deploy status updates.
*/
export function connectGlobalEvents(callbacks: {
onInstanceStatus?: (payload: InstanceStatusPayload) => void;
onDeployStatus?: (payload: DeployStatusPayload) => void;
onEventLog?: (payload: EventLogSSEPayload) => void;
onOpen?: () => void;
onError?: (attempt: number) => void;
}): SSEConnection {
return connectSSE('/api/events', {
onEvent(event) {
if (event.type === 'instance_status') {
callbacks.onInstanceStatus?.(event.payload as InstanceStatusPayload);
} else if (event.type === 'deploy_status') {
callbacks.onDeployStatus?.(event.payload as DeployStatusPayload);
} else if (event.type === 'event_log') {
callbacks.onEventLog?.(event.payload as EventLogSSEPayload);
}
},
onOpen: callbacks.onOpen,
onError: callbacks.onError
});
}