Fix Phase 5 review issues: mark_all_read, datetime, WS payload, logging

Review fixes:
- mark_all_read now filters status IN (sent, delivered), preventing
  pending notifications from being incorrectly marked as read
- schedule_notification tool: force UTC on naive datetimes, reject
  past dates (send immediately instead)
- WebSocket notification payload now includes all fields matching
  NotificationResponse (user_id, channel, status, scheduled_at, etc.)
- Centralized _serialize_notification helper in notification_sender
- notification_sender: per-notification error handling with rollback
- health_review: moved import json to module top level

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 14:14:08 +03:00
parent ada7e82961
commit 04e3ae8319
4 changed files with 44 additions and 29 deletions

View File

@@ -129,10 +129,14 @@ async def _execute_tool(
return json.dumps({"entries": items, "count": len(items)}) return json.dumps({"entries": items, "count": len(items)})
elif tool_name == "schedule_notification": elif tool_name == "schedule_notification":
from datetime import datetime from datetime import datetime, timezone as tz
scheduled_at = None scheduled_at = None
if tool_input.get("scheduled_at"): if tool_input.get("scheduled_at"):
scheduled_at = datetime.fromisoformat(tool_input["scheduled_at"]) scheduled_at = datetime.fromisoformat(tool_input["scheduled_at"])
if scheduled_at.tzinfo is None:
scheduled_at = scheduled_at.replace(tzinfo=tz.utc)
if scheduled_at <= datetime.now(tz.utc):
scheduled_at = None # Past date → send immediately
notif = await create_notification( notif = await create_notification(
db, user_id, db, user_id,
@@ -145,15 +149,10 @@ async def _execute_tool(
# Push immediately if not scheduled # Push immediately if not scheduled
if not scheduled_at: if not scheduled_at:
from app.workers.notification_sender import _serialize_notification
await manager.send_to_user(user_id, { await manager.send_to_user(user_id, {
"type": "new_notification", "type": "new_notification",
"notification": { "notification": _serialize_notification(notif),
"id": str(notif.id),
"title": notif.title,
"body": notif.body,
"type": notif.type,
"created_at": notif.created_at.isoformat(),
},
}) })
return json.dumps({ return json.dumps({

View File

@@ -76,6 +76,7 @@ async def mark_all_read(db: AsyncSession, user_id: uuid.UUID) -> int:
.where( .where(
Notification.user_id == user_id, Notification.user_id == user_id,
Notification.read_at.is_(None), Notification.read_at.is_(None),
Notification.status.in_(["sent", "delivered"]),
) )
.values(status="read", read_at=datetime.now(timezone.utc)) .values(status="read", read_at=datetime.now(timezone.utc))
) )

View File

@@ -1,5 +1,6 @@
"""Daily job: proactive health review for all users with health data.""" """Daily job: proactive health review for all users with health data."""
import asyncio import asyncio
import json
import logging import logging
from anthropic import AsyncAnthropic from anthropic import AsyncAnthropic
@@ -48,7 +49,6 @@ async def _review_user(user: User):
messages=[{"role": "user", "content": f"User health profile:\n{memory_text}"}], messages=[{"role": "user", "content": f"User health profile:\n{memory_text}"}],
) )
import json
try: try:
text = response.content[0].text.strip() text = response.content[0].text.strip()
# Extract JSON from response # Extract JSON from response
@@ -71,13 +71,8 @@ async def _review_user(user: User):
) )
await db.commit() await db.commit()
from app.workers.notification_sender import _serialize_notification
await manager.send_to_user(user.id, { await manager.send_to_user(user.id, {
"type": "new_notification", "type": "new_notification",
"notification": { "notification": _serialize_notification(notif),
"id": str(notif.id),
"title": notif.title,
"body": notif.body,
"type": notif.type,
"created_at": notif.created_at.isoformat(),
},
}) })

View File

@@ -1,24 +1,44 @@
"""Periodic job: send pending scheduled notifications via WebSocket.""" """Periodic job: send pending scheduled notifications via WebSocket."""
import logging
from app.database import async_session_factory from app.database import async_session_factory
from app.services.notification_service import get_pending_scheduled, mark_as_sent from app.services.notification_service import get_pending_scheduled, mark_as_sent
from app.services.ws_manager import manager from app.services.ws_manager import manager
logger = logging.getLogger(__name__)
def _serialize_notification(notif) -> dict:
return {
"id": str(notif.id),
"user_id": str(notif.user_id),
"title": notif.title,
"body": notif.body,
"type": notif.type,
"channel": notif.channel,
"status": "sent",
"scheduled_at": notif.scheduled_at.isoformat() if notif.scheduled_at else None,
"sent_at": notif.sent_at.isoformat() if notif.sent_at else None,
"read_at": None,
"metadata": notif.metadata_,
"created_at": notif.created_at.isoformat(),
}
async def send_pending_notifications(): async def send_pending_notifications():
async with async_session_factory() as db: async with async_session_factory() as db:
pending = await get_pending_scheduled(db) pending = await get_pending_scheduled(db)
for notif in pending: for notif in pending:
await mark_as_sent(db, notif.id) try:
await db.commit() notif.status = "sent"
from datetime import datetime, timezone
notif.sent_at = datetime.now(timezone.utc)
await db.commit()
# Push via WebSocket await manager.send_to_user(notif.user_id, {
await manager.send_to_user(notif.user_id, { "type": "new_notification",
"type": "new_notification", "notification": _serialize_notification(notif),
"notification": { })
"id": str(notif.id), except Exception:
"title": notif.title, logger.exception(f"Failed to send notification {notif.id}")
"body": notif.body, await db.rollback()
"type": notif.type,
"created_at": notif.created_at.isoformat(),
},
})