import json
import logging
import uuid
from collections.abc import AsyncGenerator

from anthropic import AsyncAnthropic
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.models.chat import Chat
from app.models.message import Message
from app.models.skill import Skill
from app.services.context_service import (
    DEFAULT_SYSTEM_PROMPT,
    get_primary_context,
    get_personal_context,
)
from app.services.chat_service import get_chat, save_message

logger = logging.getLogger(__name__)

client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)

# Placeholder title that marks a chat as not yet named.
_DEFAULT_CHAT_TITLE = "New Chat"
# Auto-generated titles longer than this are cut and suffixed with "...".
_TITLE_MAX_CHARS = 40


async def assemble_context(
    db: AsyncSession, chat_id: uuid.UUID, user_id: uuid.UUID, user_message: str
) -> tuple[str, list[dict]]:
    """Assemble the system prompt and message list for the Claude API.

    The system prompt is built from, in order: the primary context (or
    ``DEFAULT_SYSTEM_PROMPT`` when it is missing or blank), the user's
    personal context (when non-blank), and the system prompt of the chat's
    skill (when the chat has one and it is active). Parts are joined with a
    blank line.

    The message list is the chat's stored user/assistant messages ordered by
    ``created_at`` ascending, ending with ``user_message``.

    Args:
        db: Session used for all lookups.
        chat_id: Chat whose history and skill are loaded.
        user_id: Passed to ``get_personal_context`` and ``get_chat``.
        user_message: The message the model should answer.

    Returns:
        ``(system_prompt, messages)`` where each message is a dict with
        ``"role"`` and ``"content"`` keys.
    """
    system_parts = []

    # 1. Primary context
    ctx = await get_primary_context(db)
    system_parts.append(
        ctx.content if ctx and ctx.content.strip() else DEFAULT_SYSTEM_PROMPT
    )

    # 2. Personal context
    personal_ctx = await get_personal_context(db, user_id)
    if personal_ctx and personal_ctx.content.strip():
        system_parts.append(f"---\nUser Context:\n{personal_ctx.content}")

    # 3. Active skill system prompt
    chat = await get_chat(db, chat_id, user_id)
    if chat.skill_id:
        result = await db.execute(select(Skill).where(Skill.id == chat.skill_id))
        skill = result.scalar_one_or_none()
        if skill and skill.is_active:
            system_parts.append(
                f"---\nSpecialist Role ({skill.name}):\n{skill.system_prompt}"
            )

    system_prompt = "\n\n".join(system_parts)

    # 4. Conversation history
    result = await db.execute(
        select(Message)
        .where(Message.chat_id == chat_id, Message.role.in_(["user", "assistant"]))
        .order_by(Message.created_at.asc())
    )
    history = result.scalars().all()
    messages = [{"role": msg.role, "content": msg.content} for msg in history]

    # 5. Current user message
    # stream_ai_response() saves and commits the user message before calling
    # this function, so the history query above is expected to already end
    # with it. Appending unconditionally would send the same turn twice;
    # only append when the history does not already end with this message.
    last = messages[-1] if messages else None
    already_present = (
        last is not None
        and last["role"] == "user"
        and last["content"] == user_message
    )
    if not already_present:
        messages.append({"role": "user", "content": user_message})

    return system_prompt, messages


def _sse_event(event: str, data: dict) -> str:
    """Format one Server-Sent Events frame.

    ``data`` is serialized with ``json.dumps`` using its default compact
    form, so the payload occupies a single ``data:`` line.
    """
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def _derive_title(content: str) -> str:
    """Return a short chat title from the first non-empty line of ``content``.

    Returns an empty string when ``content`` has no non-blank line.
    """
    first_line = next(
        (line.strip() for line in content.splitlines() if line.strip()), ""
    )
    if len(first_line) > _TITLE_MAX_CHARS:
        return first_line[:_TITLE_MAX_CHARS] + "..."
    return first_line


async def stream_ai_response(
    db: AsyncSession, chat_id: uuid.UUID, user_id: uuid.UUID, user_message: str
) -> AsyncGenerator[str, None]:
    """Stream the AI response for ``user_message`` as SSE frames.

    Emits, in order: one ``message_start``, zero or more ``content_delta``
    frames (one per streamed text chunk), and one ``message_end`` carrying
    the saved message id and token metadata. If anything inside the
    streaming phase raises, a single ``error`` frame is emitted instead of
    ``message_end``.

    Side effects: persists the user message up front, persists the assistant
    message once streaming completes, and replaces the placeholder chat title
    after the first assistant reply.

    Exceptions raised by the initial ``get_chat`` / ``save_message`` /
    ``commit`` calls are not caught here and propagate to the caller.
    """
    # Verify ownership (get_chat receives both chat_id and user_id).
    chat = await get_chat(db, chat_id, user_id)

    # Save user message
    await save_message(db, chat_id, "user", user_message)
    await db.commit()

    try:
        # Assemble context
        system_prompt, messages = await assemble_context(
            db, chat_id, user_id, user_message
        )

        # Stream from Claude. Chunks are collected in a list and joined once
        # instead of growing a string with += on every delta.
        chunks: list[str] = []
        # NOTE(review): this id is only sent in message_start; message_end
        # reports the database id of the saved message, which differs.
        assistant_msg_id = str(uuid.uuid4())
        yield _sse_event("message_start", {"message_id": assistant_msg_id})

        async with client.messages.stream(
            model=settings.CLAUDE_MODEL,
            max_tokens=4096,
            system=system_prompt,
            messages=messages,
        ) as stream:
            async for text in stream.text_stream:
                chunks.append(text)
                yield _sse_event("content_delta", {"delta": text})

            # Get final message for metadata
            final_message = await stream.get_final_message()

        full_content = "".join(chunks)
        metadata = {
            "model": final_message.model,
            "input_tokens": final_message.usage.input_tokens,
            "output_tokens": final_message.usage.output_tokens,
        }

        # Save assistant message
        saved_msg = await save_message(
            db, chat_id, "assistant", full_content, metadata
        )
        await db.commit()

        # Update chat title if first exchange. The title check is done first
        # so already-named chats skip the query, and the query counts rows in
        # the database rather than loading every assistant message.
        if chat.title == _DEFAULT_CHAT_TITLE:
            count_result = await db.execute(
                select(func.count())
                .select_from(Message)
                .where(Message.chat_id == chat_id, Message.role == "assistant")
            )
            if count_result.scalar_one() == 1:
                # Auto-generate title from the first few words; keep the
                # placeholder if the reply has no usable text.
                title = _derive_title(full_content)
                if title:
                    chat.title = title
                    await db.commit()

        yield _sse_event(
            "message_end",
            {
                "message_id": str(saved_msg.id),
                "metadata": metadata,
            },
        )
    except Exception as e:
        # Top-level boundary of the stream: record the failure, leave the
        # session usable, and tell the client.
        logger.exception("Streaming AI response failed for chat %s", chat_id)
        try:
            await db.rollback()
        except Exception:
            logger.exception("Rollback failed for chat %s", chat_id)
        # NOTE(review): the raw exception text is sent to the client, as in
        # the original; confirm this is acceptable to expose.
        yield _sse_event("error", {"detail": str(e)})