Phase 1: Foundation — backend auth, frontend shell, Docker setup

Backend (FastAPI):
- App factory with async SQLAlchemy 2.0 + PostgreSQL
- Alembic migration for users and sessions tables
- JWT auth (access + refresh tokens, bcrypt passwords)
- Auth endpoints: register, login, refresh, logout, me
- Admin seed script, role-based access deps

Frontend (React + TypeScript):
- Vite + Tailwind CSS + shadcn/ui theme (health-oriented palette)
- i18n with English and Russian translations
- Zustand auth/UI stores with localStorage persistence
- Axios client with automatic token refresh on 401
- Login/register pages, protected routing
- App layout: collapsible sidebar, header with theme/language toggles
- Dashboard with placeholder stats

Infrastructure:
- Docker Compose (postgres, backend, frontend, nginx)
- Nginx reverse proxy with WebSocket support
- Dev override with hot reload

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 12:25:02 +03:00
parent 5bdc296172
commit 7c752cae6b
75 changed files with 7706 additions and 2 deletions

14
backend/Dockerfile Normal file
View File

@@ -0,0 +1,14 @@
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc libpq-dev \
&& rm -rf /var/lib/apt/lists/*
COPY pyproject.toml .
RUN pip install --no-cache-dir .
COPY . .
CMD ["sh", "-c", "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"]

36
backend/alembic.ini Normal file
View File

@@ -0,0 +1,36 @@
[alembic]
script_location = alembic
sqlalchemy.url = driver://user:pass@localhost/dbname
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

46
backend/alembic/env.py Normal file
View File

@@ -0,0 +1,46 @@
import asyncio
from logging.config import fileConfig
from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine
from app.config import settings
from app.database import Base
from app.models import User, Session # noqa: F401 — ensure models are registered
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = settings.DATABASE_URL
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online() -> None:
connectable = create_async_engine(settings.DATABASE_URL)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
if context.is_offline_mode():
run_migrations_offline()
else:
asyncio.run(run_migrations_online())

View File

@@ -0,0 +1,26 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,53 @@
"""Create users and sessions tables
Revision ID: 001
Revises:
Create Date: 2026-03-19
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision: str = "001"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"users",
sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
sa.Column("email", sa.String(255), unique=True, nullable=False, index=True),
sa.Column("username", sa.String(100), unique=True, nullable=False, index=True),
sa.Column("hashed_password", sa.String(255), nullable=False),
sa.Column("full_name", sa.String(255), nullable=True),
sa.Column("role", sa.String(20), nullable=False, server_default="user"),
sa.Column("is_active", sa.Boolean, nullable=False, server_default=sa.text("true")),
sa.Column("max_chats", sa.Integer, nullable=False, server_default=sa.text("10")),
sa.Column("oauth_provider", sa.String(50), nullable=True),
sa.Column("oauth_provider_id", sa.String(255), nullable=True),
sa.Column("telegram_chat_id", sa.BigInteger, nullable=True),
sa.Column("avatar_url", sa.String(500), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_table(
"sessions",
sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
sa.Column("user_id", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True),
sa.Column("refresh_token_hash", sa.String(255), nullable=False, index=True),
sa.Column("device_info", sa.String(500), nullable=True),
sa.Column("ip_address", sa.String(45), nullable=True),
sa.Column("expires_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
def downgrade() -> None:
op.drop_table("sessions")
op.drop_table("users")

0
backend/app/__init__.py Normal file
View File

View File

67
backend/app/api/deps.py Normal file
View File

@@ -0,0 +1,67 @@
import uuid
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import decode_access_token
from app.database import get_db
from app.models.user import User
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login", auto_error=False)
async def get_current_user(
token: Annotated[str | None, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
payload = decode_access_token(token)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
try:
uid = uuid.UUID(user_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
result = await db.execute(select(User).where(User.id == uid))
user = result.scalar_one_or_none()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def require_admin(
user: Annotated[User, Depends(get_current_user)],
) -> User:
if user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required",
)
return user

View File

View File

@@ -0,0 +1,70 @@
from typing import Annotated
from fastapi import APIRouter, Depends, Request, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_user
from app.database import get_db
from app.models.user import User
from app.schemas.auth import (
AuthResponse,
LoginRequest,
RefreshRequest,
TokenResponse,
UserResponse,
RegisterRequest,
)
from app.services import auth_service
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/register", response_model=AuthResponse, status_code=status.HTTP_201_CREATED)
async def register(
data: RegisterRequest,
request: Request,
db: Annotated[AsyncSession, Depends(get_db)],
):
return await auth_service.register_user(
db,
data,
ip_address=request.client.host if request.client else None,
device_info=request.headers.get("user-agent"),
)
@router.post("/login", response_model=AuthResponse)
async def login(
data: LoginRequest,
request: Request,
db: Annotated[AsyncSession, Depends(get_db)],
):
return await auth_service.login_user(
db,
email=data.email,
password=data.password,
remember_me=data.remember_me,
ip_address=request.client.host if request.client else None,
device_info=request.headers.get("user-agent"),
)
@router.post("/refresh", response_model=TokenResponse)
async def refresh(
data: RefreshRequest,
db: Annotated[AsyncSession, Depends(get_db)],
):
return await auth_service.refresh_tokens(db, data.refresh_token)
@router.post("/logout", status_code=status.HTTP_204_NO_CONTENT)
async def logout(
data: RefreshRequest,
db: Annotated[AsyncSession, Depends(get_db)],
):
await auth_service.logout_user(db, data.refresh_token)
@router.get("/me", response_model=UserResponse)
async def me(user: Annotated[User, Depends(get_current_user)]):
return UserResponse.model_validate(user)

View File

@@ -0,0 +1,12 @@
from fastapi import APIRouter
from app.api.v1.auth import router as auth_router
api_v1_router = APIRouter(prefix="/api/v1")
api_v1_router.include_router(auth_router)
@api_v1_router.get("/health")
async def health():
return {"status": "ok"}

22
backend/app/config.py Normal file
View File

@@ -0,0 +1,22 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str = "postgresql+asyncpg://ai_assistant:changeme@postgres:5432/ai_assistant"
SECRET_KEY: str = "changeme_secret_key_at_least_32_chars_long"
ENVIRONMENT: str = "development"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
REFRESH_TOKEN_EXPIRE_DAYS: int = 30
REFRESH_TOKEN_EXPIRE_HOURS: int = 24
BACKEND_CORS_ORIGINS: list[str] = ["http://localhost", "http://localhost:3000"]
FIRST_ADMIN_EMAIL: str = "admin@example.com"
FIRST_ADMIN_USERNAME: str = "admin"
FIRST_ADMIN_PASSWORD: str = "changeme_admin_password"
model_config = {"env_file": ".env", "extra": "ignore"}
settings = Settings()

View File

View File

@@ -0,0 +1,49 @@
import hashlib
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(user_id: uuid.UUID, role: str) -> str:
now = datetime.now(timezone.utc)
expire = now + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": str(user_id),
"role": role,
"exp": expire,
"iat": now,
"jti": str(uuid.uuid4()),
}
return jwt.encode(payload, settings.SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str) -> dict:
try:
return jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
return {}
def generate_refresh_token() -> str:
return secrets.token_hex(64)
def hash_refresh_token(token: str) -> str:
return hashlib.sha256(token.encode()).hexdigest()

32
backend/app/database.py Normal file
View File

@@ -0,0 +1,32 @@
import uuid
from collections.abc import AsyncGenerator
from datetime import datetime, timezone
from sqlalchemy import DateTime, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from app.config import settings
engine = create_async_engine(settings.DATABASE_URL, echo=settings.ENVIRONMENT == "development")
async_session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise

40
backend/app/main.py Normal file
View File

@@ -0,0 +1,40 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
@asynccontextmanager
async def lifespan(app: FastAPI):
from alembic import command
from alembic.config import Config
alembic_cfg = Config("alembic.ini")
command.upgrade(alembic_cfg, "head")
yield
def create_app() -> FastAPI:
app = FastAPI(
title="AI Assistant API",
version="0.1.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.BACKEND_CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
from app.api.v1.router import api_v1_router
app.include_router(api_v1_router)
return app
app = create_app()

View File

@@ -0,0 +1,4 @@
from app.models.user import User
from app.models.session import Session
__all__ = ["User", "Session"]

View File

@@ -0,0 +1,22 @@
import uuid
from datetime import datetime
from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class Session(Base):
__tablename__ = "sessions"
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
refresh_token_hash: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
device_info: Mapped[str | None] = mapped_column(String(500), nullable=True)
ip_address: Mapped[str | None] = mapped_column(String(45), nullable=True)
expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
user: Mapped["User"] = relationship(back_populates="sessions") # noqa: F821

View File

@@ -0,0 +1,27 @@
from datetime import datetime
from sqlalchemy import Boolean, DateTime, Integer, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class User(Base):
__tablename__ = "users"
email: Mapped[str] = mapped_column(String(255), unique=True, index=True, nullable=False)
username: Mapped[str] = mapped_column(String(100), unique=True, index=True, nullable=False)
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
full_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
role: Mapped[str] = mapped_column(String(20), nullable=False, default="user")
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
max_chats: Mapped[int] = mapped_column(Integer, nullable=False, default=10)
oauth_provider: Mapped[str | None] = mapped_column(String(50), nullable=True)
oauth_provider_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
telegram_chat_id: Mapped[int | None] = mapped_column(nullable=True)
avatar_url: Mapped[str | None] = mapped_column(String(500), nullable=True)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
)
sessions: Mapped[list["Session"]] = relationship(back_populates="user", cascade="all, delete-orphan") # noqa: F821

View File

View File

@@ -0,0 +1,46 @@
import uuid
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field
class RegisterRequest(BaseModel):
email: EmailStr
username: str = Field(min_length=3, max_length=50, pattern=r"^[a-zA-Z0-9_-]+$")
password: str = Field(min_length=8, max_length=128)
full_name: str | None = Field(default=None, max_length=255)
class LoginRequest(BaseModel):
email: EmailStr
password: str
remember_me: bool = False
class RefreshRequest(BaseModel):
refresh_token: str
class UserResponse(BaseModel):
id: uuid.UUID
email: str
username: str
full_name: str | None
role: str
is_active: bool
created_at: datetime
model_config = {"from_attributes": True}
class AuthResponse(BaseModel):
user: UserResponse
access_token: str
refresh_token: str
token_type: str = "bearer"
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"

View File

@@ -0,0 +1,5 @@
from pydantic import BaseModel
class MessageResponse(BaseModel):
message: str

View File

View File

@@ -0,0 +1,162 @@
import uuid
from datetime import datetime, timedelta, timezone
from fastapi import HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.core.security import (
create_access_token,
generate_refresh_token,
hash_password,
hash_refresh_token,
verify_password,
)
from app.models.session import Session
from app.models.user import User
from app.schemas.auth import AuthResponse, RegisterRequest, TokenResponse, UserResponse
async def _create_session(
db: AsyncSession,
user: User,
remember_me: bool,
ip_address: str | None = None,
device_info: str | None = None,
) -> tuple[str, str]:
access_token = create_access_token(user.id, user.role)
refresh_token = generate_refresh_token()
if remember_me:
expires_at = datetime.now(timezone.utc) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
else:
expires_at = datetime.now(timezone.utc) + timedelta(hours=settings.REFRESH_TOKEN_EXPIRE_HOURS)
session = Session(
user_id=user.id,
refresh_token_hash=hash_refresh_token(refresh_token),
device_info=device_info,
ip_address=ip_address,
expires_at=expires_at,
)
db.add(session)
await db.flush()
return access_token, refresh_token
async def register_user(
db: AsyncSession,
data: RegisterRequest,
ip_address: str | None = None,
device_info: str | None = None,
) -> AuthResponse:
existing = await db.execute(
select(User).where((User.email == data.email) | (User.username == data.username))
)
if existing.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User with this email or username already exists",
)
user = User(
email=data.email,
username=data.username,
hashed_password=hash_password(data.password),
full_name=data.full_name,
)
db.add(user)
await db.flush()
access_token, refresh_token = await _create_session(db, user, remember_me=False, ip_address=ip_address, device_info=device_info)
return AuthResponse(
user=UserResponse.model_validate(user),
access_token=access_token,
refresh_token=refresh_token,
)
async def login_user(
db: AsyncSession,
email: str,
password: str,
remember_me: bool = False,
ip_address: str | None = None,
device_info: str | None = None,
) -> AuthResponse:
result = await db.execute(select(User).where(User.email == email))
user = result.scalar_one_or_none()
if not user or not verify_password(password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password",
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account is deactivated",
)
access_token, refresh_token = await _create_session(db, user, remember_me, ip_address, device_info)
return AuthResponse(
user=UserResponse.model_validate(user),
access_token=access_token,
refresh_token=refresh_token,
)
async def refresh_tokens(db: AsyncSession, refresh_token: str) -> TokenResponse:
token_hash = hash_refresh_token(refresh_token)
result = await db.execute(
select(Session).where(Session.refresh_token_hash == token_hash)
)
session = result.scalar_one_or_none()
if not session:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
)
if session.expires_at < datetime.now(timezone.utc):
await db.delete(session)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token expired",
)
result = await db.execute(select(User).where(User.id == session.user_id))
user = result.scalar_one_or_none()
if not user or not user.is_active:
await db.delete(session)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive",
)
# Rotate refresh token
new_refresh_token = generate_refresh_token()
session.refresh_token_hash = hash_refresh_token(new_refresh_token)
new_access_token = create_access_token(user.id, user.role)
return TokenResponse(
access_token=new_access_token,
refresh_token=new_refresh_token,
)
async def logout_user(db: AsyncSession, refresh_token: str) -> None:
token_hash = hash_refresh_token(refresh_token)
result = await db.execute(
select(Session).where(Session.refresh_token_hash == token_hash)
)
session = result.scalar_one_or_none()
if session:
await db.delete(session)

33
backend/pyproject.toml Normal file
View File

@@ -0,0 +1,33 @@
[project]
name = "ai-assistant-backend"
version = "0.1.0"
description = "Personal AI Assistant - Backend"
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.115.0",
"uvicorn[standard]>=0.30.0",
"sqlalchemy[asyncio]>=2.0.30",
"asyncpg>=0.30.0",
"alembic>=1.13.0",
"pydantic[email]>=2.9.0",
"pydantic-settings>=2.5.0",
"passlib[bcrypt]>=1.7.4",
"python-jose[cryptography]>=3.3.0",
"python-multipart>=0.0.9",
"httpx>=0.27.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.24.0",
"httpx>=0.27.0",
]
[build-system]
requires = ["setuptools>=69.0"]
build-backend = "setuptools.build_meta"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

View File

View File

@@ -0,0 +1,35 @@
"""Create the initial admin user if it doesn't already exist."""
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.core.security import hash_password
from app.database import async_session_factory
from app.models.user import User
async def seed_admin() -> None:
async with async_session_factory() as db:
result = await db.execute(select(User).where(User.email == settings.FIRST_ADMIN_EMAIL))
existing = result.scalar_one_or_none()
if existing:
print(f"Admin user already exists: {existing.email}")
return
admin = User(
email=settings.FIRST_ADMIN_EMAIL,
username=settings.FIRST_ADMIN_USERNAME,
hashed_password=hash_password(settings.FIRST_ADMIN_PASSWORD),
full_name="Administrator",
role="admin",
)
db.add(admin)
await db.commit()
print(f"Admin user created: {admin.email}")
if __name__ == "__main__":
asyncio.run(seed_admin())

View File

61
backend/tests/conftest.py Normal file
View File

@@ -0,0 +1,61 @@
import asyncio
import uuid
from collections.abc import AsyncGenerator
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.database import Base, get_db
from app.main import create_app
# Use a unique in-memory-like approach: create a separate test DB schema
TEST_DATABASE_URL = "postgresql+asyncpg://ai_assistant:changeme@localhost:5432/ai_assistant_test"
@pytest.fixture(scope="session")
def event_loop():
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
async def test_engine():
engine = create_async_engine(TEST_DATABASE_URL)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
yield engine
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
@pytest.fixture
async def db_session(test_engine) -> AsyncGenerator[AsyncSession, None]:
session_factory = async_sessionmaker(test_engine, class_=AsyncSession, expire_on_commit=False)
async with session_factory() as session:
yield session
await session.rollback()
@pytest.fixture
async def client(test_engine) -> AsyncGenerator[AsyncClient, None]:
session_factory = async_sessionmaker(test_engine, class_=AsyncSession, expire_on_commit=False)
async def override_get_db():
async with session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
app = create_app()
app.dependency_overrides[get_db] = override_get_db
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac

135
backend/tests/test_auth.py Normal file
View File

@@ -0,0 +1,135 @@
import pytest
from httpx import AsyncClient
@pytest.fixture
async def registered_user(client: AsyncClient):
resp = await client.post("/api/v1/auth/register", json={
"email": "test@example.com",
"username": "testuser",
"password": "testpass123",
"full_name": "Test User",
})
assert resp.status_code == 201
return resp.json()
async def test_health(client: AsyncClient):
resp = await client.get("/api/v1/health")
assert resp.status_code == 200
assert resp.json() == {"status": "ok"}
async def test_register_success(client: AsyncClient):
resp = await client.post("/api/v1/auth/register", json={
"email": "new@example.com",
"username": "newuser",
"password": "password123",
})
assert resp.status_code == 201
data = resp.json()
assert data["user"]["email"] == "new@example.com"
assert data["user"]["username"] == "newuser"
assert data["user"]["role"] == "user"
assert "access_token" in data
assert "refresh_token" in data
async def test_register_duplicate_email(client: AsyncClient, registered_user):
resp = await client.post("/api/v1/auth/register", json={
"email": "test@example.com",
"username": "different",
"password": "password123",
})
assert resp.status_code == 409
async def test_register_short_password(client: AsyncClient):
resp = await client.post("/api/v1/auth/register", json={
"email": "short@example.com",
"username": "shortpw",
"password": "short",
})
assert resp.status_code == 422
async def test_login_success(client: AsyncClient, registered_user):
resp = await client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "testpass123",
})
assert resp.status_code == 200
data = resp.json()
assert data["user"]["email"] == "test@example.com"
assert "access_token" in data
assert "refresh_token" in data
async def test_login_invalid_credentials(client: AsyncClient, registered_user):
resp = await client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "wrongpassword",
})
assert resp.status_code == 401
async def test_login_nonexistent_user(client: AsyncClient):
resp = await client.post("/api/v1/auth/login", json={
"email": "nobody@example.com",
"password": "password123",
})
assert resp.status_code == 401
async def test_me_authenticated(client: AsyncClient, registered_user):
token = registered_user["access_token"]
resp = await client.get("/api/v1/auth/me", headers={
"Authorization": f"Bearer {token}",
})
assert resp.status_code == 200
assert resp.json()["email"] == "test@example.com"
async def test_me_unauthenticated(client: AsyncClient):
resp = await client.get("/api/v1/auth/me")
assert resp.status_code == 401
async def test_me_invalid_token(client: AsyncClient):
resp = await client.get("/api/v1/auth/me", headers={
"Authorization": "Bearer invalid_token_here",
})
assert resp.status_code == 401
async def test_refresh_token(client: AsyncClient, registered_user):
refresh = registered_user["refresh_token"]
resp = await client.post("/api/v1/auth/refresh", json={
"refresh_token": refresh,
})
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["refresh_token"] != refresh # rotated
async def test_refresh_invalid_token(client: AsyncClient):
resp = await client.post("/api/v1/auth/refresh", json={
"refresh_token": "invalid_token",
})
assert resp.status_code == 401
async def test_logout(client: AsyncClient, registered_user):
refresh = registered_user["refresh_token"]
resp = await client.post("/api/v1/auth/logout", json={
"refresh_token": refresh,
})
assert resp.status_code == 204
# Refresh should fail after logout
resp = await client.post("/api/v1/auth/refresh", json={
"refresh_token": refresh,
})
assert resp.status_code == 401