Backend: - Chat, Message, ContextFile models + Alembic migration - Chat CRUD with per-user limit enforcement (max_chats) - SSE streaming endpoint: saves user message, streams Claude response, saves assistant message with token usage metadata - Context assembly: primary context file + conversation history - Admin context CRUD (GET/PUT with version tracking) - Anthropic SDK integration with async streaming - Chat ownership isolation (users can't access each other's chats) Frontend: - Chat page with sidebar chat list + main chat window - Real-time SSE streaming via fetch + ReadableStream - Message bubbles with Markdown rendering (react-markdown) - Auto-growing message input (Enter to send, Shift+Enter newline) - Zustand chat store for streaming state management - Admin primary context editor with unsaved changes warning - Updated routing: /chat, /chat/:chatId, /admin/context - Enabled Chat and Admin sidebar navigation - English + Russian translations for all new UI Infrastructure: - nginx: disabled proxy buffering for SSE support - Added ANTHROPIC_API_KEY and CLAUDE_MODEL to config Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
168 lines
5.7 KiB
Python
168 lines
5.7 KiB
Python
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
|
|
@pytest.fixture
async def auth_headers(client: AsyncClient):
    """Register the regular chat test user and return its bearer-token headers.

    Posts fixed credentials to the registration endpoint, requires a 201
    response, and wraps the returned ``access_token`` in an ``Authorization``
    header dict.
    """
    credentials = {
        "email": "chatuser@example.com",
        "username": "chatuser",
        "password": "testpass123",
    }
    response = await client.post("/api/v1/auth/register", json=credentials)
    assert response.status_code == 201

    access_token = response.json()["access_token"]
    return {"Authorization": f"Bearer {access_token}"}
|
|
|
|
|
|
@pytest.fixture
async def admin_headers(client: AsyncClient):
    """Register the ``admin_chat`` user and return its bearer-token headers.

    NOTE(review): despite the fixture name, nothing here promotes the user to
    an admin role -- the account is created through the ordinary registration
    endpoint only. Tests that need real admin privileges must elevate the
    user themselves (e.g. directly in the DB) until this fixture does so.
    TODO: grant the admin role once a DB/session fixture is available here.
    """
    resp = await client.post("/api/v1/auth/register", json={
        "email": "admin_chat@example.com",
        "username": "admin_chat",
        "password": "adminpass123",
    })
    assert resp.status_code == 201
    token = resp.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
@pytest.fixture
async def chat_id(client: AsyncClient, auth_headers: dict):
    """Create a chat titled "Test Chat" for the test user and return its ``id``."""
    response = await client.post(
        "/api/v1/chats/",
        json={"title": "Test Chat"},
        headers=auth_headers,
    )
    assert response.status_code == 201

    created_chat = response.json()
    return created_chat["id"]
|
|
|
|
|
|
# --- Chat CRUD ---
|
|
|
|
async def test_create_chat(client: AsyncClient, auth_headers: dict):
    """Creating a chat with an explicit title yields 201 and an unarchived chat."""
    response = await client.post(
        "/api/v1/chats/",
        json={"title": "My Chat"},
        headers=auth_headers,
    )
    assert response.status_code == 201

    created = response.json()
    assert created["title"] == "My Chat"
    assert created["is_archived"] is False
|
|
|
|
|
|
async def test_create_chat_default_title(client: AsyncClient, auth_headers: dict):
    """Creating a chat with an empty body yields 201 and the title "New Chat"."""
    empty_payload = {}
    response = await client.post(
        "/api/v1/chats/",
        json=empty_payload,
        headers=auth_headers,
    )
    assert response.status_code == 201

    returned_title = response.json()["title"]
    assert returned_title == "New Chat"
|
|
|
|
|
|
async def test_list_chats(client: AsyncClient, auth_headers: dict, chat_id: str):
    """The chat created by the ``chat_id`` fixture appears in the user's chat list."""
    response = await client.get("/api/v1/chats/", headers=auth_headers)
    assert response.status_code == 200

    listed = response.json()["chats"]
    fixture_chat_listed = any(entry["id"] == chat_id for entry in listed)
    assert fixture_chat_listed
|
|
|
|
|
|
async def test_get_chat(client: AsyncClient, auth_headers: dict, chat_id: str):
    """Fetching a chat by ID as its owner returns 200 and the same ID."""
    chat_url = f"/api/v1/chats/{chat_id}"
    response = await client.get(chat_url, headers=auth_headers)
    assert response.status_code == 200

    fetched = response.json()
    assert fetched["id"] == chat_id
|
|
|
|
|
|
async def test_update_chat_title(client: AsyncClient, auth_headers: dict, chat_id: str):
    """PATCHing a new title returns 200 and echoes the updated title."""
    chat_url = f"/api/v1/chats/{chat_id}"
    patch_body = {"title": "Updated Title"}

    response = await client.patch(chat_url, json=patch_body, headers=auth_headers)
    assert response.status_code == 200

    updated = response.json()
    assert updated["title"] == "Updated Title"
|
|
|
|
|
|
async def test_archive_chat(client: AsyncClient, auth_headers: dict, chat_id: str):
    """PATCHing ``is_archived`` to true returns 200 and an archived chat."""
    chat_url = f"/api/v1/chats/{chat_id}"
    patch_body = {"is_archived": True}

    response = await client.patch(chat_url, json=patch_body, headers=auth_headers)
    assert response.status_code == 200

    updated = response.json()
    assert updated["is_archived"] is True
|
|
|
|
|
|
async def test_delete_chat(client: AsyncClient, auth_headers: dict, chat_id: str):
    """Deleting a chat returns 204, and a follow-up GET of it returns 404."""
    chat_url = f"/api/v1/chats/{chat_id}"

    delete_response = await client.delete(chat_url, headers=auth_headers)
    assert delete_response.status_code == 204

    # The same URL must no longer resolve once the chat is gone.
    lookup_response = await client.get(chat_url, headers=auth_headers)
    assert lookup_response.status_code == 404
|
|
|
|
|
|
# --- Chat Limit ---
|
|
|
|
async def test_chat_limit(client: AsyncClient):
    """Verify that chat creation is rejected once the per-user limit is hit.

    Registers a fresh user, creates chats up to the expected default limit
    (each must return 201), then checks that one more creation attempt is
    refused with 403.
    """
    # Expected default ``max_chats`` for a newly registered user.
    # NOTE(review): this mirrors a server-side default not visible in this
    # file -- keep it in sync with the user model/config.
    default_max_chats = 10

    resp = await client.post("/api/v1/auth/register", json={
        "email": "limited@example.com",
        "username": "limiteduser",
        "password": "testpass123",
    })
    # Fail with a clear status mismatch rather than a KeyError on the token.
    assert resp.status_code == 201
    token = resp.json()["access_token"]
    headers = {"Authorization": f"Bearer {token}"}

    # Fill the quota: every creation up to the limit must succeed.
    for i in range(default_max_chats):
        resp = await client.post("/api/v1/chats/", json={"title": f"Chat {i}"}, headers=headers)
        assert resp.status_code == 201

    # One past the limit should fail.
    resp = await client.post("/api/v1/chats/", json={"title": "Over limit"}, headers=headers)
    assert resp.status_code == 403
|
|
|
|
|
|
# --- Ownership Isolation ---
|
|
|
|
async def test_cannot_access_other_users_chat(client: AsyncClient, auth_headers: dict, chat_id: str):
    """A second user requesting the first user's chat by ID gets 404.

    ``chat_id`` belongs to the user behind ``auth_headers``; a separately
    registered user then tries to read it with their own token.
    """
    # Register another user
    resp = await client.post("/api/v1/auth/register", json={
        "email": "other@example.com",
        "username": "otheruser",
        "password": "testpass123",
    })
    # Match the fixtures: surface a failed registration as a status mismatch
    # instead of a KeyError when reading the token below.
    assert resp.status_code == 201
    other_token = resp.json()["access_token"]
    other_headers = {"Authorization": f"Bearer {other_token}"}

    # Try to access first user's chat
    resp = await client.get(f"/api/v1/chats/{chat_id}", headers=other_headers)
    assert resp.status_code == 404
|
|
|
|
|
|
# --- Messages ---
|
|
|
|
async def test_get_messages_empty(client: AsyncClient, auth_headers: dict, chat_id: str):
    """A freshly created chat returns 200 and an empty ``messages`` list."""
    messages_url = f"/api/v1/chats/{chat_id}/messages"
    response = await client.get(messages_url, headers=auth_headers)
    assert response.status_code == 200

    payload = response.json()
    assert payload["messages"] == []
|
|
|
|
|
|
# --- Admin Context ---
|
|
|
|
async def test_get_context_unauthenticated(client: AsyncClient, auth_headers: dict):
    """A regular user's GET on the admin context endpoint is rejected with 403.

    NOTE(review): despite the test name, the request is sent with the
    ``auth_headers`` token of a registered (non-admin) user, not without
    credentials.
    """
    context_url = "/api/v1/admin/context"
    response = await client.get(context_url, headers=auth_headers)
    assert response.status_code == 403
|
|
|
|
|
|
async def test_admin_context_crud(client: AsyncClient):
    """Check that a non-admin user is denied on the admin context endpoints.

    Registers an ordinary user and verifies that both GET and PUT on
    ``/api/v1/admin/context`` return 403.
    """
    # Note: This tests the endpoint structure only. A full admin CRUD test
    # would require setting the user role to admin in the DB.
    resp = await client.post("/api/v1/auth/register", json={
        "email": "ctxadmin@example.com",
        "username": "ctxadmin",
        "password": "testpass123",
    })
    # Fail with a clear status mismatch rather than a KeyError on the token.
    assert resp.status_code == 201
    token = resp.json()["access_token"]
    headers = {"Authorization": f"Bearer {token}"}

    # Regular user should get 403 when reading the context...
    resp = await client.get("/api/v1/admin/context", headers=headers)
    assert resp.status_code == 403

    # ...and when trying to overwrite it.
    resp = await client.put("/api/v1/admin/context", json={"content": "test"}, headers=headers)
    assert resp.status_code == 403
|