Clear project — starting fresh from spec

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dianaka123
2026-02-24 14:36:47 +03:00
parent 6fe452d4dc
commit 9eb68695e9
91 changed files with 310 additions and 13106 deletions

View File

@@ -1,54 +0,0 @@
import hashlib
import uuid
from datetime import datetime, timedelta, timezone
import jwt
from passlib.context import CryptContext
from app.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(user_id: str, role: str, status: str) -> str:
expire = datetime.now(timezone.utc) + timedelta(
minutes=settings.access_token_expire_minutes
)
payload = {
"sub": user_id,
"role": role,
"status": status,
"exp": expire,
"type": "access",
}
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
def create_refresh_token() -> tuple[str, str, datetime]:
"""Returns (raw_token, hashed_token, expires_at)."""
raw = str(uuid.uuid4())
hashed = hashlib.sha256(raw.encode()).hexdigest()
expires_at = datetime.now(timezone.utc) + timedelta(
days=settings.refresh_token_expire_days
)
return raw, hashed, expires_at
def decode_access_token(token: str) -> dict:
"""Raises jwt.InvalidTokenError on failure."""
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
if payload.get("type") != "access":
raise jwt.InvalidTokenError("Not an access token")
return payload
def hash_token(raw: str) -> str:
return hashlib.sha256(raw.encode()).hexdigest()

View File

@@ -1,226 +0,0 @@
"""
Instagram Graph API polling service.
Setup requirements:
1. Convert organizer's Instagram to Business/Creator account and link to a Facebook Page.
2. Create a Facebook App at developers.facebook.com.
3. Add Instagram Graph API product with permissions: instagram_basic, pages_read_engagement.
4. Generate a long-lived User Access Token (valid 60 days) and set INSTAGRAM_ACCESS_TOKEN in .env.
5. Find your Instagram numeric user ID and set INSTAGRAM_USER_ID in .env.
The scheduler runs every INSTAGRAM_POLL_INTERVAL seconds (default: 1800 = 30 min).
Token is refreshed weekly to prevent expiry.
"""
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_session_factory
from app.models.championship import Championship
logger = logging.getLogger(__name__)
GRAPH_BASE = "https://graph.facebook.com/v21.0"
# Russian month names → month number
RU_MONTHS = {
"января": 1, "февраля": 2, "марта": 3, "апреля": 4,
"мая": 5, "июня": 6, "июля": 7, "августа": 8,
"сентября": 9, "октября": 10, "ноября": 11, "декабря": 12,
}
LOCATION_PREFIXES = ["место:", "адрес:", "location:", "venue:", "зал:", "address:"]
DATE_PATTERNS = [
# 15 марта 2025
(
r"\b(\d{1,2})\s+("
+ "|".join(RU_MONTHS.keys())
+ r")\s+(\d{4})\b",
"ru",
),
# 15.03.2025
(r"\b(\d{1,2})\.(\d{2})\.(\d{4})\b", "dot"),
# March 15 2025 or March 15, 2025
(
r"\b(January|February|March|April|May|June|July|August|September|October|November|December)"
r"\s+(\d{1,2}),?\s+(\d{4})\b",
"en",
),
]
EN_MONTHS = {
"january": 1, "february": 2, "march": 3, "april": 4,
"may": 5, "june": 6, "july": 7, "august": 8,
"september": 9, "october": 10, "november": 11, "december": 12,
}
@dataclass
class ParsedChampionship:
title: str
description: Optional[str]
location: Optional[str]
event_date: Optional[datetime]
raw_caption_text: str
image_url: Optional[str]
def parse_caption(text: str, image_url: str | None = None) -> ParsedChampionship:
lines = [line.strip() for line in text.strip().splitlines() if line.strip()]
title = lines[0] if lines else "Untitled Championship"
description = "\n".join(lines[1:]) if len(lines) > 1 else None
location = None
for line in lines:
lower = line.lower()
for prefix in LOCATION_PREFIXES:
if lower.startswith(prefix):
location = line[len(prefix):].strip()
break
event_date = _extract_date(text)
return ParsedChampionship(
title=title,
description=description,
location=location,
event_date=event_date,
raw_caption_text=text,
image_url=image_url,
)
def _extract_date(text: str) -> Optional[datetime]:
for pattern, fmt in DATE_PATTERNS:
m = re.search(pattern, text, re.IGNORECASE)
if not m:
continue
try:
if fmt == "ru":
day, month_name, year = int(m.group(1)), m.group(2).lower(), int(m.group(3))
month = RU_MONTHS.get(month_name)
if month:
return datetime(year, month, day, tzinfo=timezone.utc)
elif fmt == "dot":
day, month, year = int(m.group(1)), int(m.group(2)), int(m.group(3))
return datetime(year, month, day, tzinfo=timezone.utc)
elif fmt == "en":
month_name, day, year = m.group(1).lower(), int(m.group(2)), int(m.group(3))
month = EN_MONTHS.get(month_name)
if month:
return datetime(year, month, day, tzinfo=timezone.utc)
except ValueError:
continue
return None
async def _upsert_championship(
session: AsyncSession,
instagram_media_id: str,
parsed: ParsedChampionship,
) -> Championship:
result = await session.execute(
select(Championship).where(
Championship.instagram_media_id == instagram_media_id
)
)
champ = result.scalar_one_or_none()
if champ:
champ.title = parsed.title
champ.description = parsed.description
champ.location = parsed.location
champ.event_date = parsed.event_date
champ.raw_caption_text = parsed.raw_caption_text
champ.image_url = parsed.image_url
else:
champ = Championship(
title=parsed.title,
description=parsed.description,
location=parsed.location,
event_date=parsed.event_date,
status="draft",
source="instagram",
instagram_media_id=instagram_media_id,
raw_caption_text=parsed.raw_caption_text,
image_url=parsed.image_url,
)
session.add(champ)
await session.commit()
return champ
async def poll_instagram() -> None:
"""Fetch recent posts from the monitored Instagram account and sync championships."""
if not settings.instagram_user_id or not settings.instagram_access_token:
logger.warning("Instagram credentials not configured — skipping poll")
return
url = (
f"{GRAPH_BASE}/{settings.instagram_user_id}/media"
f"?fields=id,caption,media_url,timestamp"
f"&access_token={settings.instagram_access_token}"
)
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.get(url)
response.raise_for_status()
data = response.json()
except Exception as exc:
logger.error("Instagram API request failed: %s", exc)
return
posts = data.get("data", [])
logger.info("Instagram poll: fetched %d posts", len(posts))
async with get_session_factory()() as session:
for post in posts:
media_id = post.get("id")
caption = post.get("caption", "")
image_url = post.get("media_url")
if not caption:
continue
try:
parsed = parse_caption(caption, image_url)
await _upsert_championship(session, media_id, parsed)
logger.info("Synced championship from Instagram post %s: %s", media_id, parsed.title)
except Exception as exc:
logger.error("Failed to sync Instagram post %s: %s", media_id, exc)
async def refresh_instagram_token() -> None:
"""Refresh the long-lived Instagram token before it expires (run weekly)."""
if not settings.instagram_access_token:
return
url = (
f"{GRAPH_BASE}/oauth/access_token"
f"?grant_type=ig_refresh_token"
f"&access_token={settings.instagram_access_token}"
)
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
response.raise_for_status()
new_token = response.json().get("access_token")
if new_token:
# In a production setup, persist the new token to the DB or secrets manager.
# For now, log it so it can be manually updated in .env.
logger.warning(
"Instagram token refreshed. Update INSTAGRAM_ACCESS_TOKEN in .env:\n%s",
new_token,
)
except Exception as exc:
logger.error("Failed to refresh Instagram token: %s", exc)

View File

@@ -1,44 +0,0 @@
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.notification_log import NotificationLog
from app.models.user import User
EXPO_PUSH_URL = "https://exp.host/--/api/v2/push/send"
async def send_push_notification(
db: AsyncSession,
user: User,
title: str,
body: str,
notif_type: str,
registration_id: str | None = None,
) -> None:
delivery_status = "skipped"
if user.expo_push_token:
payload = {
"to": user.expo_push_token,
"title": title,
"body": body,
"data": {"type": notif_type, "registration_id": registration_id},
"sound": "default",
}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(EXPO_PUSH_URL, json=payload)
delivery_status = "sent" if response.status_code == 200 else "failed"
except Exception:
delivery_status = "failed"
log = NotificationLog(
user_id=user.id,
registration_id=registration_id,
type=notif_type,
title=title,
body=body,
delivery_status=delivery_status,
)
db.add(log)
await db.commit()

View File

@@ -1,49 +0,0 @@
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from app.crud import crud_championship, crud_participant, crud_registration, crud_user
from app.models.participant_list import ParticipantList
from app.models.user import User
from app.services import notification_service
async def publish_participant_list(
db: AsyncSession,
championship_id: uuid.UUID,
organizer: User,
) -> ParticipantList:
pl = await crud_participant.get_by_championship(db, championship_id)
if not pl:
raise ValueError("Participant list not found — create it first")
pl = await crud_participant.publish(db, pl)
championship = await crud_championship.get(db, championship_id)
registrations = await crud_registration.list_by_championship(db, championship_id)
for reg in registrations:
user = await crud_user.get(db, reg.user_id)
if not user:
continue
if reg.status == "accepted":
title = "Congratulations!"
body = f"You've been accepted to {championship.title}!"
elif reg.status == "rejected":
title = "Application Update"
body = f"Unfortunately, your application to {championship.title} was not accepted this time."
else:
title = "Application Update"
body = f"You are on the waitlist for {championship.title}."
await notification_service.send_push_notification(
db=db,
user=user,
title=title,
body=body,
notif_type=reg.status,
registration_id=str(reg.id),
)
return pl