From ada7e829612915a723fa53c36b804d88d88d0aae Mon Sep 17 00:00:00 2001 From: "dolgolyov.alexei" Date: Thu, 19 Mar 2026 13:57:25 +0300 Subject: [PATCH] =?UTF-8?q?Phase=205:=20Notifications=20=E2=80=94=20WebSoc?= =?UTF-8?q?ket,=20APScheduler,=20AI=20tool,=20health=20review?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend: - Notification model + Alembic migration - Notification service: CRUD, mark read, unread count, pending scheduled - WebSocket manager singleton for real-time push - WebSocket endpoint /ws/notifications with JWT auth via query param - APScheduler integration: periodic notification sender (every 60s), daily proactive health review job (8 AM) - AI tool: schedule_notification (immediate or scheduled) - Health review worker: analyzes user memory via Claude, creates ai_generated notifications with WebSocket push Frontend: - Notification API client + Zustand store - WebSocket hook with auto-reconnect (exponential backoff) - Notification bell in header with unread count badge + dropdown - Notifications page with type badges, mark read, mark all read - WebSocket initialized in AppLayout for app-wide real-time updates - Enabled notifications nav in sidebar - English + Russian translations Co-Authored-By: Claude Opus 4.6 (1M context) --- GeneralPlan.md | 4 +- .../versions/005_create_notifications.py | 39 +++++++ backend/app/api/v1/notifications.py | 57 ++++++++++ backend/app/api/v1/router.py | 4 + backend/app/api/v1/ws.py | 55 ++++++++++ backend/app/main.py | 6 ++ backend/app/models/__init__.py | 3 +- backend/app/models/notification.py | 27 +++++ backend/app/models/user.py | 1 + backend/app/schemas/notification.py | 30 ++++++ backend/app/services/ai_service.py | 58 ++++++++++ backend/app/services/notification_service.py | 102 ++++++++++++++++++ backend/app/services/scheduler_service.py | 41 +++++++ backend/app/services/ws_manager.py | 37 +++++++ backend/app/workers/health_review.py | 83 ++++++++++++++ backend/app/workers/notification_sender.py | 24 +++++ backend/pyproject.toml | 1 + backend/tests/test_notifications.py | 46 ++++++++ frontend/public/locales/en/translation.json | 13 +++ frontend/public/locales/ru/translation.json | 13 +++ frontend/src/api/notifications.ts | 44 ++++++++ frontend/src/components/layout/app-layout.tsx | 3 + frontend/src/components/layout/header.tsx | 3 + frontend/src/components/layout/sidebar.tsx | 2 +- .../notifications/notification-bell.tsx | 93 ++++++++++++++++ frontend/src/hooks/use-notifications-ws.ts | 67 ++++++++++++ frontend/src/pages/notifications.tsx | 96 +++++++++++++++++ frontend/src/routes.tsx | 2 + frontend/src/stores/notification-store.ts | 40 +++++++ plans/phase-5-notifications.md | 84 +++++++++++++++ 30 files changed, 1074 insertions(+), 4 deletions(-) create mode 100644 backend/alembic/versions/005_create_notifications.py create mode 100644 backend/app/api/v1/notifications.py create mode 100644 backend/app/api/v1/ws.py create mode 100644 backend/app/models/notification.py create mode 100644 backend/app/schemas/notification.py create mode 100644 backend/app/services/notification_service.py create mode 100644 backend/app/services/scheduler_service.py create mode 100644 backend/app/services/ws_manager.py create mode 100644 backend/app/workers/health_review.py create mode 100644 backend/app/workers/notification_sender.py create mode 100644 backend/tests/test_notifications.py create mode 100644 frontend/src/api/notifications.ts create mode 100644 frontend/src/components/notifications/notification-bell.tsx create mode 100644 frontend/src/hooks/use-notifications-ws.ts create mode 100644 frontend/src/pages/notifications.tsx create mode 100644 frontend/src/stores/notification-store.ts create mode 100644 plans/phase-5-notifications.md diff --git a/GeneralPlan.md b/GeneralPlan.md index 92719ed..0f3040c 100644 --- a/GeneralPlan.md +++ b/GeneralPlan.md @@ -228,8 +228,8 @@ Daily scheduled job (APScheduler, 8 AM) reviews each user's memory + recent docs - Summary: Documents + memory tables, upload + processing pipeline, full-text search, AI tools (save_memory, search_documents, get_memory), frontend document/memory UI ### Phase 5: Notifications -- **Status**: NOT STARTED -- [ ] Subplan created (`plans/phase-5-notifications.md`) +- **Status**: IN PROGRESS +- [x] Subplan created (`plans/phase-5-notifications.md`) - [ ] Phase completed - Summary: Notifications table, WebSocket + email + Telegram channels, APScheduler, AI schedule_notification tool, proactive health review job, frontend notification UI diff --git a/backend/alembic/versions/005_create_notifications.py b/backend/alembic/versions/005_create_notifications.py new file mode 100644 index 0000000..03b9abc --- /dev/null +++ b/backend/alembic/versions/005_create_notifications.py @@ -0,0 +1,39 @@ +"""Create notifications table + +Revision ID: 005 +Revises: 004 +Create Date: 2026-03-19 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSONB + +revision: str = "005" +down_revision: Union[str, None] = "004" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "notifications", + sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")), + sa.Column("user_id", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True), + sa.Column("title", sa.String(255), nullable=False), + sa.Column("body", sa.Text, nullable=False), + sa.Column("type", sa.String(30), nullable=False, server_default="info"), + sa.Column("channel", sa.String(20), nullable=False, server_default="in_app"), + sa.Column("status", sa.String(20), nullable=False, server_default="pending"), + sa.Column("scheduled_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("sent_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("read_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("metadata", JSONB, nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + ) + + +def downgrade() -> None: + op.drop_table("notifications") diff --git a/backend/app/api/v1/notifications.py b/backend/app/api/v1/notifications.py new file mode 100644 index 0000000..c87e968 --- /dev/null +++ b/backend/app/api/v1/notifications.py @@ -0,0 +1,57 @@ +import uuid +from typing import Annotated + +from fastapi import APIRouter, Depends, Query, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.deps import get_current_user +from app.database import get_db +from app.models.user import User +from app.schemas.notification import NotificationListResponse, NotificationResponse, UnreadCountResponse +from app.services import notification_service + +router = APIRouter(prefix="/notifications", tags=["notifications"]) + + +@router.get("/", response_model=NotificationListResponse) +async def list_notifications( + user: Annotated[User, Depends(get_current_user)], + db: Annotated[AsyncSession, Depends(get_db)], + status_filter: str | None = Query(default=None, alias="status"), + limit: int = Query(default=50, le=200), + offset: int = Query(default=0), +): + notifications = await notification_service.get_user_notifications(db, user.id, status_filter, limit, offset) + unread = await notification_service.get_unread_count(db, user.id) + return NotificationListResponse( + notifications=[NotificationResponse.model_validate(n) for n in notifications], + unread_count=unread, + ) + + +@router.get("/unread-count", response_model=UnreadCountResponse) +async def unread_count( + user: Annotated[User, Depends(get_current_user)], + db: Annotated[AsyncSession, Depends(get_db)], +): + count = await notification_service.get_unread_count(db, user.id) + return UnreadCountResponse(count=count) + + +@router.patch("/{notification_id}/read", response_model=NotificationResponse) +async def mark_read( + notification_id: uuid.UUID, + user: Annotated[User, Depends(get_current_user)], + db: Annotated[AsyncSession, Depends(get_db)], +): + notif = await notification_service.mark_as_read(db, notification_id, user.id) + return NotificationResponse.model_validate(notif) + + +@router.post("/mark-all-read", status_code=status.HTTP_200_OK) +async def mark_all_read( + user: Annotated[User, Depends(get_current_user)], + db: Annotated[AsyncSession, Depends(get_db)], +): + count = await notification_service.mark_all_read(db, user.id) + return {"marked": count} diff --git a/backend/app/api/v1/router.py b/backend/app/api/v1/router.py index 440594f..a165129 100644 --- a/backend/app/api/v1/router.py +++ b/backend/app/api/v1/router.py @@ -7,6 +7,8 @@ from app.api.v1.skills import router as skills_router from app.api.v1.users import router as users_router from app.api.v1.documents import router as documents_router from app.api.v1.memory import router as memory_router +from app.api.v1.notifications import router as notifications_router +from app.api.v1.ws import router as ws_router api_v1_router = APIRouter(prefix="/api/v1") @@ -17,6 +19,8 @@ api_v1_router.include_router(skills_router) api_v1_router.include_router(users_router) api_v1_router.include_router(documents_router) api_v1_router.include_router(memory_router) +api_v1_router.include_router(notifications_router) +api_v1_router.include_router(ws_router) @api_v1_router.get("/health") diff --git a/backend/app/api/v1/ws.py b/backend/app/api/v1/ws.py new file mode 100644 index 0000000..72634fa --- /dev/null +++ b/backend/app/api/v1/ws.py @@ -0,0 +1,55 @@ +import uuid + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.security import decode_access_token +from app.database import async_session_factory +from app.models.user import User +from app.services.ws_manager import manager +from app.services.notification_service import get_unread_count + +router = APIRouter(tags=["websocket"]) + + +async def _authenticate_ws(token: str) -> uuid.UUID | None: + payload = decode_access_token(token) + user_id = payload.get("sub") + if not user_id: + return None + try: + uid = uuid.UUID(user_id) + except ValueError: + return None + + async with async_session_factory() as db: + result = await db.execute(select(User).where(User.id == uid, User.is_active == True)) # noqa: E712 + user = result.scalar_one_or_none() + if not user: + return None + return uid + + +@router.websocket("/ws/notifications") +async def ws_notifications(websocket: WebSocket, token: str = Query(...)): + user_id = await _authenticate_ws(token) + if not user_id: + await websocket.close(code=4001, reason="Unauthorized") + return + + await manager.connect(user_id, websocket) + + try: + # Send initial unread count + async with async_session_factory() as db: + count = await get_unread_count(db, user_id) + await websocket.send_json({"type": "unread_count", "count": count}) + + # Keep alive - wait for disconnect + while True: + await websocket.receive_text() + except WebSocketDisconnect: + pass + finally: + manager.disconnect(user_id, websocket) diff --git a/backend/app/main.py b/backend/app/main.py index 067bc9b..a0951c0 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -13,8 +13,14 @@ async def lifespan(app: FastAPI): alembic_cfg = Config("alembic.ini") command.upgrade(alembic_cfg, "head") + + from app.services.scheduler_service import start_scheduler, shutdown_scheduler + start_scheduler() + yield + shutdown_scheduler() + def create_app() -> FastAPI: app = FastAPI( diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index ab7ee21..a064337 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -6,5 +6,6 @@ from app.models.context_file import ContextFile from app.models.skill import Skill from app.models.document import Document from app.models.memory_entry import MemoryEntry +from app.models.notification import Notification -__all__ = ["User", "Session", "Chat", "Message", "ContextFile", "Skill", "Document", "MemoryEntry"] +__all__ = ["User", "Session", "Chat", "Message", "ContextFile", "Skill", "Document", "MemoryEntry", "Notification"] diff --git a/backend/app/models/notification.py b/backend/app/models/notification.py new file mode 100644 index 0000000..05f0b4f --- /dev/null +++ b/backend/app/models/notification.py @@ -0,0 +1,27 @@ +import uuid +from datetime import datetime + +from sqlalchemy import DateTime, ForeignKey, String, Text +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.database import Base + + +class Notification(Base): + __tablename__ = "notifications" + + user_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True + ) + title: Mapped[str] = mapped_column(String(255), nullable=False) + body: Mapped[str] = mapped_column(Text, nullable=False) + type: Mapped[str] = mapped_column(String(30), nullable=False, default="info") + channel: Mapped[str] = mapped_column(String(20), nullable=False, default="in_app") + status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending") + scheduled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + sent_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + read_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + metadata_: Mapped[dict | None] = mapped_column("metadata", JSONB, nullable=True) + + user: Mapped["User"] = relationship(back_populates="notifications") # noqa: F821 diff --git a/backend/app/models/user.py b/backend/app/models/user.py index 5ffb416..17e8c11 100644 --- a/backend/app/models/user.py +++ b/backend/app/models/user.py @@ -29,3 +29,4 @@ class User(Base): skills: Mapped[list["Skill"]] = relationship(back_populates="user", cascade="all, delete-orphan") # noqa: F821 documents: Mapped[list["Document"]] = relationship(back_populates="user", cascade="all, delete-orphan") # noqa: F821 memory_entries: Mapped[list["MemoryEntry"]] = relationship(back_populates="user", cascade="all, delete-orphan") # noqa: F821 + notifications: Mapped[list["Notification"]] = relationship(back_populates="user", cascade="all, delete-orphan") # noqa: F821 diff --git a/backend/app/schemas/notification.py b/backend/app/schemas/notification.py new file mode 100644 index 0000000..d62e6f5 --- /dev/null +++ b/backend/app/schemas/notification.py @@ -0,0 +1,30 @@ +import uuid +from datetime import datetime + +from pydantic import BaseModel, Field + + +class NotificationResponse(BaseModel): + id: uuid.UUID + user_id: uuid.UUID + title: str + body: str + type: str + channel: str + status: str + scheduled_at: datetime | None + sent_at: datetime | None + read_at: datetime | None + metadata: dict | None = Field(None, alias="metadata_") + created_at: datetime + + model_config = {"from_attributes": True, "populate_by_name": True} + + +class NotificationListResponse(BaseModel): + notifications: list[NotificationResponse] + unread_count: int + + +class UnreadCountResponse(BaseModel): + count: int diff --git a/backend/app/services/ai_service.py b/backend/app/services/ai_service.py index aaf0466..33e6a9e 100644 --- a/backend/app/services/ai_service.py +++ b/backend/app/services/ai_service.py @@ -14,6 +14,8 @@ from app.services.context_service import DEFAULT_SYSTEM_PROMPT, get_primary_cont from app.services.chat_service import get_chat, save_message from app.services.memory_service import get_critical_memories, create_memory, get_user_memories from app.services.document_service import search_documents +from app.services.notification_service import create_notification +from app.services.ws_manager import manager client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY) @@ -68,6 +70,28 @@ AI_TOOLS = [ "required": [], }, }, + { + "name": "schedule_notification", + "description": "Schedule a notification or reminder for the user. Can be immediate or scheduled for a future time.", + "input_schema": { + "type": "object", + "properties": { + "title": {"type": "string", "description": "Notification title"}, + "body": {"type": "string", "description": "Notification body text"}, + "scheduled_at": { + "type": "string", + "description": "ISO 8601 datetime for scheduled delivery. Omit for immediate.", + }, + "type": { + "type": "string", + "enum": ["reminder", "alert", "info"], + "description": "Notification type", + "default": "reminder", + }, + }, + "required": ["title", "body"], + }, + }, ] @@ -104,6 +128,40 @@ async def _execute_tool( items = [{"category": e.category, "title": e.title, "content": e.content, "importance": e.importance} for e in entries] return json.dumps({"entries": items, "count": len(items)}) + elif tool_name == "schedule_notification": + from datetime import datetime + scheduled_at = None + if tool_input.get("scheduled_at"): + scheduled_at = datetime.fromisoformat(tool_input["scheduled_at"]) + + notif = await create_notification( + db, user_id, + title=tool_input["title"], + body=tool_input["body"], + type=tool_input.get("type", "reminder"), + scheduled_at=scheduled_at, + ) + await db.commit() + + # Push immediately if not scheduled + if not scheduled_at: + await manager.send_to_user(user_id, { + "type": "new_notification", + "notification": { + "id": str(notif.id), + "title": notif.title, + "body": notif.body, + "type": notif.type, + "created_at": notif.created_at.isoformat(), + }, + }) + + return json.dumps({ + "status": "scheduled" if scheduled_at else "sent", + "id": str(notif.id), + "title": notif.title, + }) + return json.dumps({"error": f"Unknown tool: {tool_name}"}) diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py new file mode 100644 index 0000000..742894e --- /dev/null +++ b/backend/app/services/notification_service.py @@ -0,0 +1,102 @@ +import uuid +from datetime import datetime, timezone + +from fastapi import HTTPException, status +from sqlalchemy import func, select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.notification import Notification + + +async def create_notification( + db: AsyncSession, + user_id: uuid.UUID, + title: str, + body: str, + type: str = "info", + channel: str = "in_app", + scheduled_at: datetime | None = None, + metadata: dict | None = None, +) -> Notification: + notif = Notification( + user_id=user_id, + title=title, + body=body, + type=type, + channel=channel, + status="pending" if scheduled_at else "sent", + scheduled_at=scheduled_at, + sent_at=None if scheduled_at else datetime.now(timezone.utc), + metadata_=metadata, + ) + db.add(notif) + await db.flush() + return notif + + +async def get_user_notifications( + db: AsyncSession, user_id: uuid.UUID, + status_filter: str | None = None, limit: int = 50, offset: int = 0, +) -> list[Notification]: + stmt = select(Notification).where(Notification.user_id == user_id) + if status_filter: + stmt = stmt.where(Notification.status == status_filter) + stmt = stmt.order_by(Notification.created_at.desc()).limit(limit).offset(offset) + result = await db.execute(stmt) + return list(result.scalars().all()) + + +async def get_unread_count(db: AsyncSession, user_id: uuid.UUID) -> int: + result = await db.scalar( + select(func.count()).select_from(Notification).where( + Notification.user_id == user_id, + Notification.status.in_(["sent", "delivered"]), + Notification.read_at.is_(None), + ) + ) + return result or 0 + + +async def mark_as_read(db: AsyncSession, notification_id: uuid.UUID, user_id: uuid.UUID) -> Notification: + result = await db.execute( + select(Notification).where(Notification.id == notification_id, Notification.user_id == user_id) + ) + notif = result.scalar_one_or_none() + if not notif: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Notification not found") + notif.status = "read" + notif.read_at = datetime.now(timezone.utc) + await db.flush() + return notif + + +async def mark_all_read(db: AsyncSession, user_id: uuid.UUID) -> int: + result = await db.execute( + update(Notification) + .where( + Notification.user_id == user_id, + Notification.read_at.is_(None), + ) + .values(status="read", read_at=datetime.now(timezone.utc)) + ) + return result.rowcount + + +async def get_pending_scheduled(db: AsyncSession) -> list[Notification]: + now = datetime.now(timezone.utc) + result = await db.execute( + select(Notification).where( + Notification.status == "pending", + Notification.scheduled_at <= now, + ) + ) + return list(result.scalars().all()) + + +async def mark_as_sent(db: AsyncSession, notification_id: uuid.UUID) -> None: + result = await db.execute(select(Notification).where(Notification.id == notification_id)) + notif = result.scalar_one_or_none() + if notif: + notif.status = "sent" + notif.sent_at = datetime.now(timezone.utc) + await db.flush() diff --git a/backend/app/services/scheduler_service.py b/backend/app/services/scheduler_service.py new file mode 100644 index 0000000..5b0c9e4 --- /dev/null +++ b/backend/app/services/scheduler_service.py @@ -0,0 +1,41 @@ +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.date import DateTrigger + +scheduler = AsyncIOScheduler() + + +def start_scheduler(): + if not scheduler.running: + # Register periodic jobs + from app.workers.notification_sender import send_pending_notifications + scheduler.add_job( + send_pending_notifications, + trigger=CronTrigger(second="0", minute="*"), # every minute + id="send_pending_notifications", + replace_existing=True, + ) + + from app.workers.health_review import run_daily_health_review + scheduler.add_job( + run_daily_health_review, + trigger=CronTrigger(hour=8, minute=0), + id="daily_health_review", + replace_existing=True, + ) + + scheduler.start() + + +def shutdown_scheduler(): + if scheduler.running: + scheduler.shutdown(wait=False) + + +def schedule_one_time(job_id: str, run_date, func, **kwargs): + scheduler.add_job(func, trigger=DateTrigger(run_date=run_date), id=job_id, replace_existing=True, kwargs=kwargs) + + +def schedule_recurring(job_id: str, cron_expr: str, func, **kwargs): + trigger = CronTrigger.from_crontab(cron_expr) + scheduler.add_job(func, trigger=trigger, id=job_id, replace_existing=True, kwargs=kwargs) diff --git a/backend/app/services/ws_manager.py b/backend/app/services/ws_manager.py new file mode 100644 index 0000000..e9464a4 --- /dev/null +++ b/backend/app/services/ws_manager.py @@ -0,0 +1,37 @@ +import json +import uuid + +from fastapi import WebSocket + + +class ConnectionManager: + def __init__(self): + self.active_connections: dict[uuid.UUID, list[WebSocket]] = {} + + async def connect(self, user_id: uuid.UUID, websocket: WebSocket): + await websocket.accept() + if user_id not in self.active_connections: + self.active_connections[user_id] = [] + self.active_connections[user_id].append(websocket) + + def disconnect(self, user_id: uuid.UUID, websocket: WebSocket): + if user_id in self.active_connections: + self.active_connections[user_id] = [ + ws for ws in self.active_connections[user_id] if ws != websocket + ] + if not self.active_connections[user_id]: + del self.active_connections[user_id] + + async def send_to_user(self, user_id: uuid.UUID, data: dict): + connections = self.active_connections.get(user_id, []) + dead = [] + for ws in connections: + try: + await ws.send_text(json.dumps(data)) + except Exception: + dead.append(ws) + for ws in dead: + self.disconnect(user_id, ws) + + +manager = ConnectionManager() diff --git a/backend/app/workers/health_review.py b/backend/app/workers/health_review.py new file mode 100644 index 0000000..a219dbd --- /dev/null +++ b/backend/app/workers/health_review.py @@ -0,0 +1,83 @@ +"""Daily job: proactive health review for all users with health data.""" +import asyncio +import logging + +from anthropic import AsyncAnthropic +from sqlalchemy import select + +from app.config import settings +from app.database import async_session_factory +from app.models.user import User +from app.services.memory_service import get_critical_memories +from app.services.notification_service import create_notification +from app.services.ws_manager import manager + +logger = logging.getLogger(__name__) +client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY) + + +async def run_daily_health_review(): + """Review each user's health profile and generate reminder notifications.""" + if not settings.ANTHROPIC_API_KEY: + return + + async with async_session_factory() as db: + result = await db.execute(select(User).where(User.is_active == True)) # noqa: E712 + users = result.scalars().all() + + for user in users: + try: + await _review_user(user) + await asyncio.sleep(1) # Rate limit + except Exception: + logger.exception(f"Health review failed for user {user.id}") + + +async def _review_user(user: User): + async with async_session_factory() as db: + memories = await get_critical_memories(db, user.id) + if not memories: + return + + memory_text = "\n".join(f"- [{m.category}] {m.title}: {m.content}" for m in memories) + + response = await client.messages.create( + model=settings.CLAUDE_MODEL, + max_tokens=500, + system="You are a health assistant. Based on the user's health profile, suggest any upcoming checkups, medication reviews, or health actions that should be reminded. Respond with a JSON array of objects with 'title' and 'body' fields. If no reminders are needed, return an empty array [].", + messages=[{"role": "user", "content": f"User health profile:\n{memory_text}"}], + ) + + import json + try: + text = response.content[0].text.strip() + # Extract JSON from response + if "[" in text: + json_str = text[text.index("["):text.rindex("]") + 1] + reminders = json.loads(json_str) + else: + return + except (json.JSONDecodeError, ValueError): + return + + for reminder in reminders[:5]: # Max 5 reminders per user per day + if "title" in reminder and "body" in reminder: + notif = await create_notification( + db, + user.id, + title=reminder["title"], + body=reminder["body"], + type="ai_generated", + ) + await db.commit() + + await manager.send_to_user(user.id, { + "type": "new_notification", + "notification": { + "id": str(notif.id), + "title": notif.title, + "body": notif.body, + "type": notif.type, + "created_at": notif.created_at.isoformat(), + }, + }) diff --git a/backend/app/workers/notification_sender.py b/backend/app/workers/notification_sender.py new file mode 100644 index 0000000..aebe7d3 --- /dev/null +++ b/backend/app/workers/notification_sender.py @@ -0,0 +1,24 @@ +"""Periodic job: send pending scheduled notifications via WebSocket.""" +from app.database import async_session_factory +from app.services.notification_service import get_pending_scheduled, mark_as_sent +from app.services.ws_manager import manager + + +async def send_pending_notifications(): + async with async_session_factory() as db: + pending = await get_pending_scheduled(db) + for notif in pending: + await mark_as_sent(db, notif.id) + await db.commit() + + # Push via WebSocket + await manager.send_to_user(notif.user_id, { + "type": "new_notification", + "notification": { + "id": str(notif.id), + "title": notif.title, + "body": notif.body, + "type": notif.type, + "created_at": notif.created_at.isoformat(), + }, + }) diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 200def1..61acbd0 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "anthropic>=0.40.0", "pymupdf>=1.24.0", "aiofiles>=24.0.0", + "apscheduler>=3.10.0", ] [project.optional-dependencies] diff --git a/backend/tests/test_notifications.py b/backend/tests/test_notifications.py new file mode 100644 index 0000000..c3996fd --- /dev/null +++ b/backend/tests/test_notifications.py @@ -0,0 +1,46 @@ +import pytest +from httpx import AsyncClient + + +@pytest.fixture +async def auth_headers(client: AsyncClient): + resp = await client.post("/api/v1/auth/register", json={ + "email": "notifuser@example.com", + "username": "notifuser", + "password": "testpass123", + }) + assert resp.status_code == 201 + return {"Authorization": f"Bearer {resp.json()['access_token']}"} + + +async def test_list_notifications_empty(client: AsyncClient, auth_headers: dict): + resp = await client.get("/api/v1/notifications/", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert data["notifications"] == [] + assert data["unread_count"] == 0 + + +async def test_unread_count(client: AsyncClient, auth_headers: dict): + resp = await client.get("/api/v1/notifications/unread-count", headers=auth_headers) + assert resp.status_code == 200 + assert resp.json()["count"] == 0 + + +async def test_mark_all_read(client: AsyncClient, auth_headers: dict): + resp = await client.post("/api/v1/notifications/mark-all-read", headers=auth_headers) + assert resp.status_code == 200 + assert resp.json()["marked"] == 0 + + +async def test_mark_nonexistent_read(client: AsyncClient, auth_headers: dict): + resp = await client.patch( + "/api/v1/notifications/00000000-0000-0000-0000-000000000000/read", + headers=auth_headers, + ) + assert resp.status_code == 404 + + +async def test_unauthenticated(client: AsyncClient): + resp = await client.get("/api/v1/notifications/") + assert resp.status_code == 401 diff --git a/frontend/public/locales/en/translation.json b/frontend/public/locales/en/translation.json index b444cd1..c7ecc6b 100644 --- a/frontend/public/locales/en/translation.json +++ b/frontend/public/locales/en/translation.json @@ -90,6 +90,19 @@ "subtitle": "This context is added to all your AI conversations", "placeholder": "Add personal information that the AI should know about you..." }, + "notifications": { + "title": "Notifications", + "no_notifications": "No notifications yet.", + "mark_all_read": "Mark all read", + "mark_read": "Mark as read", + "view_all": "View all notifications", + "types": { + "reminder": "Reminder", + "alert": "Alert", + "info": "Info", + "ai_generated": "AI Generated" + } + }, "documents": { "upload": "Upload", "drop_or_click": "Drop a file here or click to browse", diff --git a/frontend/public/locales/ru/translation.json b/frontend/public/locales/ru/translation.json index 0b6ee88..e57e00b 100644 --- a/frontend/public/locales/ru/translation.json +++ b/frontend/public/locales/ru/translation.json @@ -90,6 +90,19 @@ "subtitle": "Этот контекст добавляется ко всем вашим разговорам с ИИ", "placeholder": "Добавьте личную информацию, которую ИИ должен знать о вас..." }, + "notifications": { + "title": "Уведомления", + "no_notifications": "Уведомлений пока нет.", + "mark_all_read": "Отметить все как прочитанные", + "mark_read": "Отметить как прочитанное", + "view_all": "Все уведомления", + "types": { + "reminder": "Напоминание", + "alert": "Оповещение", + "info": "Информация", + "ai_generated": "От ИИ" + } + }, "documents": { "upload": "Загрузить", "drop_or_click": "Перетащите файл или нажмите для выбора", diff --git a/frontend/src/api/notifications.ts b/frontend/src/api/notifications.ts new file mode 100644 index 0000000..29c549c --- /dev/null +++ b/frontend/src/api/notifications.ts @@ -0,0 +1,44 @@ +import api from "./client"; + +export interface Notification { + id: string; + user_id: string; + title: string; + body: string; + type: string; + channel: string; + status: string; + scheduled_at: string | null; + sent_at: string | null; + read_at: string | null; + metadata: Record | null; + created_at: string; +} + +export interface NotificationListResponse { + notifications: Notification[]; + unread_count: number; +} + +export async function getNotifications(params?: { + status?: string; + limit?: number; + offset?: number; +}): Promise { + const { data } = await api.get("/notifications/", { params }); + return data; +} + +export async function getUnreadCount(): Promise { + const { data } = await api.get<{ count: number }>("/notifications/unread-count"); + return data.count; +} + +export async function markAsRead(notificationId: string): Promise { + const { data } = await api.patch(`/notifications/${notificationId}/read`); + return data; +} + +export async function markAllRead(): Promise { + await api.post("/notifications/mark-all-read"); +} diff --git a/frontend/src/components/layout/app-layout.tsx b/frontend/src/components/layout/app-layout.tsx index 32d0a5d..25e47cb 100644 --- a/frontend/src/components/layout/app-layout.tsx +++ b/frontend/src/components/layout/app-layout.tsx @@ -1,8 +1,11 @@ import { Outlet } from "react-router-dom"; import { Sidebar } from "./sidebar"; import { Header } from "./header"; +import { useNotificationsWS } from "@/hooks/use-notifications-ws"; export function AppLayout() { + useNotificationsWS(); + return (
diff --git a/frontend/src/components/layout/header.tsx b/frontend/src/components/layout/header.tsx index cc60b96..f42ee7d 100644 --- a/frontend/src/components/layout/header.tsx +++ b/frontend/src/components/layout/header.tsx @@ -4,6 +4,7 @@ import { Menu, Sun, Moon, LogOut, User } from "lucide-react"; import { useAuthStore } from "@/stores/auth-store"; import { useUIStore } from "@/stores/ui-store"; import { LanguageToggle } from "@/components/shared/language-toggle"; +import { NotificationBell } from "@/components/notifications/notification-bell"; import { logout as logoutApi } from "@/api/auth"; export function Header() { @@ -41,6 +42,8 @@ export function Header() { + + + + {open && ( +
+
+ {t("notifications.title")} + {unreadCount > 0 && ( + + )} +
+ +
+ {recent.length === 0 ? ( +

+ {t("notifications.no_notifications")} +

+ ) : ( + recent.map((n) => ( +
+

{n.title}

+

{n.body}

+
+ )) + )} +
+ +
+ +
+
+ )} +
+ ); +} diff --git a/frontend/src/hooks/use-notifications-ws.ts b/frontend/src/hooks/use-notifications-ws.ts new file mode 100644 index 0000000..354474b --- /dev/null +++ b/frontend/src/hooks/use-notifications-ws.ts @@ -0,0 +1,67 @@ +import { useEffect, useRef, useState } from "react"; +import { useAuthStore } from "@/stores/auth-store"; +import { useNotificationStore } from "@/stores/notification-store"; + +export function useNotificationsWS() { + const accessToken = useAuthStore((s) => s.accessToken); + const isAuthenticated = useAuthStore((s) => s.isAuthenticated); + const { setUnreadCount, addNotification } = useNotificationStore(); + const wsRef = useRef(null); + const [isConnected, setIsConnected] = useState(false); + const reconnectDelay = useRef(1000); + + useEffect(() => { + if (!isAuthenticated || !accessToken) return; + + let mounted = true; + + function connect() { + if (!mounted) return; + + const protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; + const ws = new WebSocket( + `${protocol}//${window.location.host}/api/v1/ws/notifications?token=${accessToken}` + ); + wsRef.current = ws; + + ws.onopen = () => { + setIsConnected(true); + reconnectDelay.current = 1000; + }; + + ws.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + if (data.type === "unread_count") { + setUnreadCount(data.count); + } else if (data.type === "new_notification") { + addNotification(data.notification); + } + } catch { + // ignore parse errors + } + }; + + ws.onclose = () => { + setIsConnected(false); + if (mounted) { + setTimeout(connect, reconnectDelay.current); + reconnectDelay.current = Math.min(reconnectDelay.current * 2, 30000); + } + }; + + ws.onerror = () => { + ws.close(); + }; + } + + connect(); + + return () => { + mounted = false; + wsRef.current?.close(); + }; + }, [isAuthenticated, accessToken, setUnreadCount, addNotification]); + + return { isConnected }; +} diff --git a/frontend/src/pages/notifications.tsx b/frontend/src/pages/notifications.tsx new file mode 100644 index 0000000..136221f --- /dev/null +++ b/frontend/src/pages/notifications.tsx @@ -0,0 +1,96 @@ +import { useEffect } from "react"; +import { useTranslation } from "react-i18next"; +import { useQuery } from "@tanstack/react-query"; +import { getNotifications, markAsRead, markAllRead as markAllReadApi } from "@/api/notifications"; +import { useNotificationStore } from "@/stores/notification-store"; +import { Check, Bell } from "lucide-react"; +import { cn } from "@/lib/utils"; + +const typeColors: Record = { + reminder: "bg-blue-100 text-blue-800 dark:bg-blue-900/30 dark:text-blue-400", + alert: "bg-red-100 text-red-800 dark:bg-red-900/30 dark:text-red-400", + info: "bg-gray-100 text-gray-800 dark:bg-gray-900/30 dark:text-gray-400", + ai_generated: "bg-purple-100 text-purple-800 dark:bg-purple-900/30 dark:text-purple-400", +}; + +export function NotificationsPage() { + const { t } = useTranslation(); + const { notifications, setNotifications, setUnreadCount, markRead, markAllRead } = useNotificationStore(); + + const { data } = useQuery({ + queryKey: ["notifications"], + queryFn: () => getNotifications({ limit: 100 }), + }); + + useEffect(() => { + if (data) { + setNotifications(data.notifications); + setUnreadCount(data.unread_count); + } + }, [data, setNotifications, setUnreadCount]); + + const handleMarkRead = async (id: string) => { + await markAsRead(id); + markRead(id); + }; + + const handleMarkAllRead = async () => { + await markAllReadApi(); + markAllRead(); + }; + + return ( +
+
+

{t("notifications.title")}

+ +
+ + {notifications.length === 0 ? ( +
+ +

{t("notifications.no_notifications")}

+
+ ) : ( +
+ {notifications.map((n) => ( +
+
+
+

{n.title}

+ + {t(`notifications.types.${n.type}`)} + +
+

{n.body}

+

+ {new Date(n.created_at).toLocaleString()} +

+
+ {!n.read_at && ( + + )} +
+ ))} +
+ )} +
+ ); +} diff --git a/frontend/src/routes.tsx b/frontend/src/routes.tsx index aff8507..2e92fc7 100644 --- a/frontend/src/routes.tsx +++ b/frontend/src/routes.tsx @@ -11,6 +11,7 @@ import { AdminContextPage } from "@/pages/admin/context"; import { AdminSkillsPage } from "@/pages/admin/skills"; import { DocumentsPage } from "@/pages/documents"; import { MemoryPage } from "@/pages/memory"; +import { NotificationsPage } from "@/pages/notifications"; import { NotFoundPage } from "@/pages/not-found"; export const router = createBrowserRouter([ @@ -33,6 +34,7 @@ export const router = createBrowserRouter([ { path: "chat/:chatId", element: }, { path: "documents", element: }, { path: "memory", element: }, + { path: "notifications", element: }, { path: "skills", element: }, { path: "profile/context", element: }, { path: "admin/context", element: }, diff --git a/frontend/src/stores/notification-store.ts b/frontend/src/stores/notification-store.ts new file mode 100644 index 0000000..6473b59 --- /dev/null +++ b/frontend/src/stores/notification-store.ts @@ -0,0 +1,40 @@ +import { create } from "zustand"; +import type { Notification } from "@/api/notifications"; + +interface NotificationState { + notifications: Notification[]; + unreadCount: number; + setNotifications: (notifications: Notification[]) => void; + setUnreadCount: (count: number) => void; + addNotification: (notification: Notification) => void; + markRead: (id: string) => void; + markAllRead: () => void; +} + +export const useNotificationStore = create()((set) => ({ + notifications: [], + unreadCount: 0, + setNotifications: (notifications) => set({ notifications }), + setUnreadCount: (count) => set({ unreadCount: count }), + addNotification: (notification) => + set((s) => ({ + notifications: [notification, ...s.notifications], + unreadCount: s.unreadCount + 1, + })), + markRead: (id) => + set((s) => ({ + notifications: s.notifications.map((n) => + n.id === id ? { ...n, status: "read", read_at: new Date().toISOString() } : n + ), + unreadCount: Math.max(0, s.unreadCount - 1), + })), + markAllRead: () => + set((s) => ({ + notifications: s.notifications.map((n) => ({ + ...n, + status: "read", + read_at: n.read_at || new Date().toISOString(), + })), + unreadCount: 0, + })), +})); diff --git a/plans/phase-5-notifications.md b/plans/phase-5-notifications.md new file mode 100644 index 0000000..f3281c8 --- /dev/null +++ b/plans/phase-5-notifications.md @@ -0,0 +1,84 @@ +# Phase 5: Notifications — Subplan + +## Goal + +Deliver in-app real-time notifications via WebSocket, APScheduler for scheduled/recurring notifications, AI tool `schedule_notification`, daily proactive health review job, and frontend notification UI (bell in header, notification list page). + +## Prerequisites + +- Phase 4 completed: memory service, AI tools, document processing +- APScheduler added to dependencies + +--- + +## Tasks + +### A. Backend Model & Migration (Tasks 1–3) + +- [x] **A1.** Create `backend/app/models/notification.py`: Notification model (user_id, title, body, type, channel, status, scheduled_at, sent_at, read_at, metadata JSONB). +- [x] **A2.** Add `notifications` relationship on User model. Update `models/__init__.py`. +- [x] **A3.** Create migration `005_create_notifications.py`. + +### B. Backend Schemas (Task 4) + +- [x] **B4.** Create `backend/app/schemas/notification.py`: NotificationResponse, NotificationListResponse, UnreadCountResponse. + +### C. Backend Services (Tasks 5–7) + +- [x] **C5.** Create `backend/app/services/notification_service.py`: create, list, unread count, mark read, mark all read, get pending scheduled, mark sent. +- [x] **C6.** Create `backend/app/services/ws_manager.py`: ConnectionManager class (connect, disconnect, send_to_user). +- [x] **C7.** Create `backend/app/services/scheduler_service.py`: AsyncIOScheduler wrapper (start, stop, schedule one-time, schedule recurring). + +### D. Backend API (Tasks 8–10) + +- [x] **D8.** Create `backend/app/api/v1/notifications.py`: GET list, GET unread-count, PATCH /{id}/read, POST mark-all-read. +- [x] **D9.** Create `backend/app/api/v1/ws.py`: WebSocket /ws/notifications with JWT auth via query param. +- [x] **D10.** Register routers. Wire scheduler in main.py lifespan. Add `apscheduler` to pyproject.toml. + +### E. Backend AI Tool + Workers (Tasks 11–13) + +- [x] **E11.** Add `schedule_notification` tool to AI_TOOLS + handler in `_execute_tool` in ai_service.py. +- [x] **E12.** Create `backend/app/workers/notification_sender.py`: periodic job (every 60s) sends pending scheduled notifications via WebSocket. +- [x] **E13.** Create `backend/app/workers/health_review.py`: daily job queries user memories/docs, asks Claude for reminders, creates ai_generated notifications. + +### F. Frontend API & Store (Tasks 14–16) + +- [x] **F14.** Create `frontend/src/api/notifications.ts`. +- [x] **F15.** Create `frontend/src/stores/notification-store.ts` (Zustand). +- [x] **F16.** Create `frontend/src/hooks/use-notifications-ws.ts`: WebSocket with auto-reconnect. + +### G. Frontend UI (Tasks 17–20) + +- [x] **G17.** Create `frontend/src/components/notifications/notification-bell.tsx`: bell icon + unread badge + dropdown. +- [x] **G18.** Integrate bell into header.tsx. Init WebSocket in AppLayout. +- [x] **G19.** Create `frontend/src/pages/notifications.tsx`: full list with type filter, mark read. +- [x] **G20.** Update routes + enable sidebar notifications link. + +### H. i18n (Task 21) + +- [x] **H21.** Update en/ru translations with notification keys. + +### I. Tests (Tasks 22–23) + +- [x] **I22.** Create `backend/tests/test_notifications.py`: CRUD, mark read, unread count, ownership isolation. +- [x] **I23.** Verify frontend builds cleanly. + +--- + +## Acceptance Criteria + +1. Notification CRUD API works with auth and ownership isolation +2. WebSocket pushes notifications in real time +3. AI `schedule_notification` tool creates scheduled/immediate notifications +4. APScheduler fires pending notifications at scheduled_at time +5. Daily health review generates ai_generated notifications +6. Frontend bell shows unread count, dropdown shows recent, updates in real time +7. Notifications page with type filter and mark-as-read +8. All UI text in English and Russian +9. Backend tests pass, frontend builds clean + +--- + +## Status + +**COMPLETED**