Files
PoleDanceApp/backend/app/services/auth_service.py
Dianaka123 789d2bf0a6 Full app rebuild: FastAPI backend + React Native mobile with auth, championships, admin
Backend (FastAPI + SQLAlchemy + SQLite):
- JWT auth with access/refresh tokens, bcrypt password hashing
- User model with member/organizer/admin roles, auto-approve members
- Championship, Registration, ParticipantList, Notification models
- Alembic async migrations, seed data with test users
- Registration endpoint returns tokens for members, pending for organizers
- /registrations/my returns championship title/date/location via eager loading
- Admin endpoints: list users, approve/reject organizers

Mobile (React Native + Expo + TypeScript):
- Zustand auth store, Axios client with token refresh interceptor
- Role-based registration (Member vs Organizer) with contextual form labels
- Tab navigation with Ionicons, safe area headers, admin tab for admin role
- Championships list with status badges, detail screen with registration progress
- My Registrations with championship title, progress bar, and tap-to-navigate
- Admin panel with pending/all filter, approve/reject with confirmation
- Profile screen with role badge, Ionicons info rows, sign out
- Password visibility toggle (Ionicons), keyboard flow hints (returnKeyType)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 22:46:50 +03:00

82 lines
2.6 KiB
Python

import hashlib
import uuid
from datetime import UTC, datetime, timedelta
import bcrypt
from jose import JWTError, jwt
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.models.user import RefreshToken
def hash_password(password: str) -> str:
    """Hash a plaintext password with bcrypt using a freshly generated salt.

    The password is encoded to bytes, hashed, and the resulting bcrypt hash
    is decoded back to ``str`` before being returned.
    """
    secret = password.encode()
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(secret, salt)
    return digest.decode()
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain: Candidate password to check.
        hashed: Stored bcrypt hash, as a string.

    Returns:
        ``True`` when ``plain`` matches ``hashed``; ``False`` when it does
        not, or when the inputs cannot be checked at all.
    """
    try:
        return bcrypt.checkpw(plain.encode(), hashed.encode())
    except ValueError:
        # bcrypt raises ValueError when the stored hash is malformed
        # ("Invalid salt"); some bcrypt releases also reject over-long
        # passwords the same way. Neither case can be a successful match, so
        # report a failed verification rather than letting the error escape
        # from a function that promises a bool.
        return False
def create_access_token(user_id: uuid.UUID) -> str:
    """Build a signed JWT access token for ``user_id``.

    The payload carries the user id as the string ``sub`` claim and an
    ``exp`` claim set ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes after
    the current UTC time. The token is signed with ``settings.SECRET_KEY``
    using ``settings.ALGORITHM``.
    """
    expires_at = datetime.now(UTC) + timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    claims = {"sub": str(user_id), "exp": expires_at}
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def decode_access_token(token: str) -> dict | None:
    """Decode a JWT access token with the configured key and algorithm.

    Returns the decoded claims, or ``None`` when ``jwt.decode`` raises
    ``JWTError``.
    """
    try:
        claims = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError:
        return None
    return claims
def _hash_token(token: str) -> str:
return hashlib.sha256(token.encode()).hexdigest()
async def create_refresh_token(db: AsyncSession, user_id: uuid.UUID) -> str:
    """Issue a new refresh token for ``user_id`` and persist its hash.

    A random UUID4 string is generated as the raw token. Only its
    ``_hash_token`` digest is stored, together with an expiry
    ``settings.REFRESH_TOKEN_EXPIRE_DAYS`` days after the current UTC time.
    The session is committed and the raw token is returned to the caller.
    """
    token = str(uuid.uuid4())
    valid_until = datetime.now(UTC) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    db.add(
        RefreshToken(
            user_id=user_id,
            token_hash=_hash_token(token),
            expires_at=valid_until,
        )
    )
    await db.commit()
    return token
async def rotate_refresh_token(db: AsyncSession, raw_token: str) -> tuple[str, uuid.UUID] | None:
    """Validate old token, revoke it, issue a new one.

    Args:
        db: Async session used to look up, revoke and insert token rows.
        raw_token: The raw refresh token presented by the client.

    Returns:
        ``(new_raw, user_id)`` on success, or ``None`` when no unrevoked,
        unexpired token matches the hash of ``raw_token``.
    """
    from sqlalchemy import select

    token_hash = _hash_token(raw_token)
    result = await db.execute(
        select(RefreshToken).where(
            RefreshToken.token_hash == token_hash,
            RefreshToken.revoked.is_(False),
            RefreshToken.expires_at > datetime.now(UTC),
        )
    )
    record = result.scalar_one_or_none()
    if record is None:
        return None

    # Read the owner id while the row is still loaded. After commit() the
    # session may expire the instance's attributes (the default
    # expire_on_commit behaviour), and reloading one on access is implicit
    # I/O, which an AsyncSession cannot perform from plain attribute access.
    user_id = record.user_id

    record.revoked = True
    await db.flush()

    new_raw = str(uuid.uuid4())
    expires_at = datetime.now(UTC) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    new_record = RefreshToken(user_id=user_id, token_hash=_hash_token(new_raw), expires_at=expires_at)
    db.add(new_record)
    await db.commit()
    return new_raw, user_id
async def revoke_refresh_token(db: AsyncSession, raw_token: str) -> None:
    """Mark the refresh token matching ``raw_token`` as revoked.

    The token is looked up by its ``_hash_token`` digest. When a row is
    found, its ``revoked`` flag is set and the session is committed; when no
    row matches, nothing is changed.
    """
    from sqlalchemy import select

    lookup = select(RefreshToken).where(
        RefreshToken.token_hash == _hash_token(raw_token)
    )
    found = (await db.execute(lookup)).scalar_one_or_none()
    if not found:
        return
    found.revoked = True
    await db.commit()