"""Service-layer helpers for creating, querying and updating notifications."""

import uuid
from datetime import datetime, timezone

from fastapi import HTTPException, status
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.notification import Notification


async def create_notification(
    db: AsyncSession,
    user_id: uuid.UUID,
    title: str,
    body: str,
    type: str = "info",
    channel: str = "in_app",
    scheduled_at: datetime | None = None,
    metadata: dict | None = None,
) -> Notification:
    """Create a notification and flush it to the session.

    When ``scheduled_at`` is given the notification is stored with status
    ``"pending"`` and no ``sent_at``; otherwise it is stored as ``"sent"``
    with ``sent_at`` set to the current UTC time.

    The session is flushed but not committed here.

    Args:
        db: Active async session.
        user_id: Owner of the notification.
        title: Notification title.
        body: Notification body text.
        type: Notification type label (defaults to ``"info"``).
        channel: Delivery channel label (defaults to ``"in_app"``).
        scheduled_at: Optional time at which the notification is due.
        metadata: Optional extra data, stored on the model's ``metadata_``
            attribute.

    Returns:
        The newly added ``Notification`` instance.
    """
    is_scheduled = scheduled_at is not None
    notif = Notification(
        user_id=user_id,
        title=title,
        body=body,
        type=type,
        channel=channel,
        status="pending" if is_scheduled else "sent",
        scheduled_at=scheduled_at,
        sent_at=None if is_scheduled else datetime.now(timezone.utc),
        metadata_=metadata,
    )
    db.add(notif)
    await db.flush()
    return notif


async def get_user_notifications(
    db: AsyncSession,
    user_id: uuid.UUID,
    status_filter: str | None = None,
    limit: int = 50,
    offset: int = 0,
) -> list[Notification]:
    """Return one page of a user's notifications, newest first.

    Args:
        db: Active async session.
        user_id: Owner whose notifications are listed.
        status_filter: When non-empty, only notifications with exactly this
            status are returned.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip before the first returned row.

    Returns:
        Notifications ordered by ``created_at`` descending.
    """
    stmt = select(Notification).where(Notification.user_id == user_id)
    if status_filter:
        stmt = stmt.where(Notification.status == status_filter)
    stmt = stmt.order_by(Notification.created_at.desc()).limit(limit).offset(offset)
    result = await db.execute(stmt)
    return list(result.scalars().all())


async def get_unread_count(db: AsyncSession, user_id: uuid.UUID) -> int:
    """Count a user's unread notifications.

    A notification counts as unread when its status is ``"sent"`` or
    ``"delivered"`` and its ``read_at`` is NULL.
    """
    result = await db.scalar(
        select(func.count())
        .select_from(Notification)
        .where(
            Notification.user_id == user_id,
            Notification.status.in_(["sent", "delivered"]),
            Notification.read_at.is_(None),
        )
    )
    return result or 0


async def mark_as_read(
    db: AsyncSession, notification_id: uuid.UUID, user_id: uuid.UUID
) -> Notification:
    """Mark one of a user's notifications as read.

    The lookup is scoped to ``user_id``, so a notification owned by a
    different user is reported as not found.

    Args:
        db: Active async session.
        notification_id: Id of the notification to mark.
        user_id: Owner the notification must belong to.

    Returns:
        The updated ``Notification``.

    Raises:
        HTTPException: 404 when no matching notification exists.
    """
    result = await db.execute(
        select(Notification).where(
            Notification.id == notification_id,
            Notification.user_id == user_id,
        )
    )
    notif = result.scalar_one_or_none()
    if not notif:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Notification not found",
        )

    notif.status = "read"
    # Keep the first read timestamp: a repeated call must not move read_at
    # forward. mark_all_read likewise only touches rows whose read_at is NULL.
    if notif.read_at is None:
        notif.read_at = datetime.now(timezone.utc)
    await db.flush()
    return notif


async def mark_all_read(db: AsyncSession, user_id: uuid.UUID) -> int:
    """Mark every unread notification of a user as read with one UPDATE.

    Only rows with status ``"sent"`` or ``"delivered"`` and a NULL
    ``read_at`` are updated.

    Returns:
        The ``rowcount`` reported for the UPDATE statement.
    """
    result = await db.execute(
        update(Notification)
        .where(
            Notification.user_id == user_id,
            Notification.read_at.is_(None),
            Notification.status.in_(["sent", "delivered"]),
        )
        .values(status="read", read_at=datetime.now(timezone.utc))
    )
    return result.rowcount


async def get_pending_scheduled(db: AsyncSession) -> list[Notification]:
    """Return pending notifications whose ``scheduled_at`` is now or earlier.

    The comparison uses the current UTC time; no ordering is applied.
    """
    now = datetime.now(timezone.utc)
    result = await db.execute(
        select(Notification).where(
            Notification.status == "pending",
            Notification.scheduled_at <= now,
        )
    )
    return list(result.scalars().all())


async def mark_as_sent(db: AsyncSession, notification_id: uuid.UUID) -> None:
    """Set a notification's status to ``"sent"`` and stamp ``sent_at``.

    An unknown ``notification_id`` is ignored without raising.
    """
    result = await db.execute(
        select(Notification).where(Notification.id == notification_id)
    )
    notif = result.scalar_one_or_none()
    if notif:
        notif.status = "sent"
        notif.sent_at = datetime.now(timezone.utc)
        await db.flush()