feat(assistant): стриминг ответов Квантика (фича 1/6)

Ответ модели «печатается» вживую через SSE поверх POST (fetch-stream,
не EventSource). Бэкенд: callLLMStream (stream:true, парсинг SSE upstream) +
callLLMStreamFailover (failover только до первого куска) + endpoint
POST /assistant/ask/stream (события meta|delta|done; быстрые пути FAQ/кэш/мета
отдаются одним done). buildAskMessages выделен из askModel (DRY).
Клиент: LS.assistantAskStream (fetch-stream + парсер SSE). Виджет: send()
стримит дельты как plain-текст с CSS-кареткой, на done — KaTeX-рендер,
источники, ссылки, оценка. Фоллбэк на sendNonStream (старый путь) если
стриминг недоступен/упал до первого куска. Cache-Control: no-transform
отключает буферизацию compression.

Проверено против живого шлюза: 24 дельты, первый текст ~1.3с, 100% русский.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Maxim Dolgolyov
2026-06-24 14:50:11 +03:00
parent 5b4d9324a4
commit 089f93b8ee
4 changed files with 240 additions and 3 deletions
+125 -2
View File
@@ -453,6 +453,64 @@ async function callLLMFailover(messages, maxTokens) {
return last;
}
/* Потоковый вызов OpenAI-совместимого chat/completions (stream:true).
* onDelta(piece) — на каждый кусок текста. Возвращает { text, any, error }. */
async function callLLMStream(messages, maxTokens, cfg, onDelta) {
if (typeof fetch !== 'function' || !cfg.on) return { text: null, any: false, error: 'off' };
const ctrl = new AbortController();
const timer = setTimeout(() => ctrl.abort(), 60000); // стриминг длиннее обычного
try {
const r = await fetch(cfg.url, {
method: 'POST',
headers: Object.assign({ 'Content-Type': 'application/json' }, cfg.key ? { Authorization: `Bearer ${cfg.key}` } : {}),
body: JSON.stringify({ model: cfg.model, temperature: 0.3, max_tokens: maxTokens || 1200, messages, stream: true }),
signal: ctrl.signal,
});
if (!r.ok) return { text: null, any: false, error: r.status === 429 ? 'rate_limit' : 'http', status: r.status };
if (!r.body) return { text: null, any: false, error: 'empty' };
const dec = new TextDecoder();
let buf = '', full = '', any = false;
for await (const chunk of r.body) {
buf += dec.decode(chunk, { stream: true });
let nl;
while ((nl = buf.indexOf('\n')) >= 0) {
const line = buf.slice(0, nl).trim(); buf = buf.slice(nl + 1);
if (!line.startsWith('data:')) continue;
const data = line.slice(5).trim();
if (data === '[DONE]') return { text: full || null, any, error: full ? null : 'empty' };
try {
const j = JSON.parse(data);
const d = j.choices && j.choices[0] && j.choices[0].delta;
const piece = d && d.content;
if (piece) { full += piece; any = true; onDelta(piece); }
} catch (e) { /* частичный/служебный кусок — пропускаем */ }
}
}
return { text: full || null, any, error: full ? null : 'empty' };
} catch (e) { return { text: null, any: false, error: e.name === 'AbortError' ? 'timeout' : 'network' }; }
finally { clearTimeout(timer); }
}
/* Стриминг с перебором провайдеров. Failover возможен ТОЛЬКО до первого куска;
* как только клиенту ушёл текст (any) — остаёмся на этом провайдере. */
async function callLLMStreamFailover(messages, maxTokens, onDelta) {
const cfgs = providersOrdered();
if (!cfgs.length) return { text: null, error: 'off' };
let firstErr = null;
for (let i = 0; i < cfgs.length; i++) {
const res = await callLLMStream(messages, maxTokens, cfgs[i], onDelta);
if (i === 0) firstErr = res.error;
if (res.text) {
if (i === 0) _clearFailover(); else _recordFailover(cfgs[0], cfgs[i], firstErr);
return res;
}
if (res.any) return res; // часть уже улетела клиенту — переключиться нельзя
if (!_RETRYABLE[res.error]) break;
}
if (_RETRYABLE[firstErr]) _recordFailover(cfgs[0], null, firstErr);
return { text: null, error: firstErr || 'error' };
}
/* Тест-пинг для админки: подробный статус (status/ошибка/пример ответа). */
async function pingLLM(override) {
const cfg = override || llmConfig();
@@ -498,7 +556,8 @@ const META_RE = new RegExp('(' + _SELF + '[\\sа-яёa-z0-9,?!.-]{0,25}' + _TERM
'|на\\s+ч[её]м\\s+ты\\s+(?:работа|сдела|постро|основ)|кто\\s+тебя\\s+(?:сделал|создал|обуч|разработ|написал)|систем[а-яё]*\\s+промпт|what\\s+model\\s+are\\s+you|which\\s+(?:ai\\s+)?model|your\\s+system\\s+prompt)', 'i');
const META_ANSWER = 'Я — Квантик, помощник LearnSpace. Помогаю с учёбой и навигацией по платформе. Давай вернёмся к делу — что объяснить или подсказать?';
async function askModel(q, hits, context, history, role, mode, mem) {
// Сборка messages+cap для модели — общая для обычного и стримингового ответа.
function buildAskMessages(q, hits, context, history, role, mode, mem) {
const ref = hits.map((h, i) => `${i + 1}. ${h.q}\n${h.a}${h.url ? ` (раздел: ${h.url})` : ''}`).join('\n') || '(пусто)';
const user = (context ? `Контекст (опирайся на него, если относится к вопросу):\n${context}\n\n` : '') +
`Справка по платформе:\n${ref}\n\nВопрос: ${q}`;
@@ -518,6 +577,11 @@ async function askModel(q, hits, context, history, role, mode, mem) {
msgs.push({ role: 'user', content: user });
// подсказка короткая; ответ/проверка — длиннее, чтобы пошаговое решение с формулами не обрезалось на середине
const cap = mode === 'hint' ? 320 : (mode === 'check' ? 900 : 1200);
return { msgs, cap };
}
async function askModel(q, hits, context, history, role, mode, mem) {
const { msgs, cap } = buildAskMessages(q, hits, context, history, role, mode, mem);
return callLLMFailover(msgs, cap);
}
@@ -574,6 +638,65 @@ async function ask(req, res) {
res.json({ source: 'faq', answer: null, answers: faqJson, sources: [] });
}
/* ── POST /api/assistant/ask/stream ── то же, что ask, но ответ модели стримится
* по SSE (event: meta|delta|done). Быстрые пути (FAQ/кэш/мета) отдаются одним done. */
async function askStream(req, res) {
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // не буферизовать за прокси
if (res.flushHeaders) res.flushHeaders();
const sse = (event, data) => { try { res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`); } catch (e) {} };
const q = String((req.body && req.body.q) || '').trim().slice(0, 500);
if (!q || q.length < 2) { sse('done', { source: 'faq', answer: null, answers: [] }); return res.end(); }
if (META_RE.test(q)) { sse('delta', { t: META_ANSWER }); sse('done', { source: 'model', answers: [], sources: [] }); return res.end(); }
const pageCtx = String((req.body && req.body.context) || '').slice(0, 4000);
const mode = ['hint', 'check'].includes(req.body && req.body.mode) ? req.body.mode : 'answer';
let history = (req.body && req.body.history);
history = Array.isArray(history) ? history.slice(-6) : [];
const hits = searchFaq(q, 3);
const faqJson = hits.map(h => ({ id: h.id, q: h.q, a: h.a, url: h.url || null }));
sse('meta', { answers: faqJson });
if (!providersOrdered().length) { bumpUsage('faq'); sse('done', { source: 'faq', answer: null, answers: faqJson, sources: [] }); return res.end(); }
const rag = ragContext(q);
const mem = _memoryBlock(req.user.id);
const cacheable = mode === 'answer' && !pageCtx && !history.length && !mem;
const qhash = q.toLowerCase().replace(/\s+/g, ' ').trim();
if (cacheable) {
try {
const c = db.prepare("SELECT answer FROM assistant_cache WHERE qhash = ? AND created_at > datetime('now','-7 days')").get(qhash);
if (c) { bumpUsage('cache_hits'); sse('delta', { t: c.answer }); sse('done', { source: 'model', answers: faqJson, sources: rag.sources, cached: true }); return res.end(); }
} catch (e) {}
}
if (rag.sources && rag.sources.length) sse('meta', { sources: rag.sources });
let context = pageCtx;
if (rag.text) context = (context ? context + '\n\n' : '') + 'Из учебников:\n' + rag.text;
const { msgs, cap } = buildAskMessages(q, hits, context, history, req.user && req.user.role, mode, mem);
let full = '';
let r = { text: null, error: 'network' };
try { r = await callLLMStreamFailover(msgs, cap, (piece) => { full += piece; sse('delta', { t: piece }); }); }
catch (e) { r = { text: null, error: 'network' }; }
const answer = (r && r.text) || full;
if (answer) {
bumpUsage('model_calls');
if (cacheable) { try { db.prepare("INSERT OR REPLACE INTO assistant_cache (qhash, answer, created_at) VALUES (?, ?, datetime('now'))").run(qhash, answer); } catch (e) {} }
if (_setting('assistant_memory') !== '0' && (mode === 'check' || history.length >= 4)) _extractMemory(req.user.id, q, answer);
sse('done', { source: 'model', answers: faqJson, sources: rag.sources });
return res.end();
}
bumpUsage('faq');
if (r && r.error === 'rate_limit') sse('done', { source: 'limit', answer: 'Сейчас слишком много запросов к ИИ за короткое время — подожди минутку и спроси снова. Память диалога не потеряется.', answers: faqJson, sources: [] });
else if (r && (r.error === 'timeout' || r.error === 'network' || r.error === 'http')) sse('done', { source: 'error', answer: 'Не получилось обратиться к ИИ. Попробуй ещё раз чуть позже.', answers: faqJson, sources: [] });
else sse('done', { source: 'faq', answer: null, answers: faqJson, sources: [] });
res.end();
}
/* ── POST /api/assistant/feedback { rating, q? } ── лайк/дизлайк ответа ── */
function feedback(req, res) {
const rating = (req.body && req.body.rating) === 1 ? 1 : ((req.body && req.body.rating) === -1 ? -1 : 0);
@@ -623,4 +746,4 @@ async function flashcardsFromText(req, res) {
res.json({ title, cards });
}
module.exports = { getContext, markSeen, dismiss, setSettings, ask, flashcardsFromText, feedback, getMemory, clearMemory, getStudentProfile, llmConfig, pingLLM, clearFailover: _clearFailover, callLLMFailover };
module.exports = { getContext, markSeen, dismiss, setSettings, ask, askStream, flashcardsFromText, feedback, getMemory, clearMemory, getStudentProfile, llmConfig, pingLLM, clearFailover: _clearFailover, callLLMFailover };