0eb899afb9
Notifications: - Add shared http_base, redact, and SSRF hardening modules - Refactor dispatcher, queue, receiver and per-provider clients (telegram, discord, email, matrix, ntfy, slack, webhook) to use the shared base, with bounded queue and redacted error logs - Tests for ssrf, redact, http_base, queue bounds, dispatcher aggregation, telegram media partition, email and matrix clients Frontend: - Settings: log level / log format selectors now use IconGridSelect with per-option icons and i18n descriptions - Minor providers page and entity-cache store updates Tooling: - Document code-review-graph MCP usage in CLAUDE.md - Ignore .code-review-graph/, register .mcp.json
85 lines
2.3 KiB
Python
85 lines
2.3 KiB
Python
"""NotificationQueue bound + concurrency regression tests."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from notify_bridge_core.notifications.queue import (
|
|
DEFAULT_MAX_QUEUE_SIZE,
|
|
NotificationQueue,
|
|
)
|
|
|
|
|
|
class _MemBackend:
|
|
"""In-memory storage backend stub for tests."""
|
|
|
|
def __init__(self) -> None:
|
|
self._data: dict[str, Any] | None = None
|
|
|
|
async def load(self) -> dict[str, Any] | None:
|
|
return self._data
|
|
|
|
async def save(self, data: dict[str, Any]) -> None:
|
|
self._data = data
|
|
|
|
async def remove(self) -> None:
|
|
self._data = None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_load_with_garbage_falls_back_to_empty() -> None:
    """A stored ``queue`` value that is not a list must load as an empty queue."""
    # Arrange: seed the stub directly with a malformed payload ("queue" is a
    # string rather than a list).
    storage = _MemBackend()
    storage._data = {"queue": "not-a-list"}  # type: ignore[assignment]
    queue = NotificationQueue(storage)

    # Act
    await queue.async_load()

    # Assert: the bad payload is ignored rather than surfaced.
    loaded_items = queue.get_all()
    assert loaded_items == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_enqueue_caps_at_max_size() -> None:
    """Enqueuing beyond ``max_size`` keeps only the newest ``max_size`` items."""
    limit = 3
    queue = NotificationQueue(_MemBackend(), max_size=limit)
    await queue.async_load()

    # Push well past the limit: ten items into a queue bounded at three.
    for number in range(10):
        await queue.async_enqueue({"i": number})

    kept = queue.get_all()
    assert len(kept) == limit

    # Oldest entries are dropped first, so only the last three (i=7..9) remain,
    # still in insertion order.
    kept_numbers = [entry["params"]["i"] for entry in kept]
    assert kept_numbers == list(range(7, 10))
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_all_returns_deep_copy() -> None:
    """Mutating a ``get_all()`` snapshot must not leak into the queue's state."""
    queue = NotificationQueue(_MemBackend(), max_size=10)
    await queue.async_load()
    await queue.async_enqueue({"key": "value"})

    # Tamper with a nested value inside the first snapshot...
    first_snapshot = queue.get_all()
    first_snapshot[0]["params"]["key"] = "MUTATED"

    # ...then verify that a fresh snapshot still carries the original value.
    fresh_params = queue.get_all()[0]["params"]
    assert fresh_params["key"] == "value"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_concurrent_enqueue_and_clear_no_corruption() -> None:
    """Interleaved enqueue and clear calls must not corrupt the queue.

    A producer task enqueues ``total`` items while a second task repeatedly
    clears the queue. The test passes when neither task raises and whatever
    survives is a bounded, strictly increasing run of the produced items.

    The producer yields to the event loop after every enqueue. With the
    in-memory backend ``async_enqueue`` may complete without ever suspending;
    in that case the producer would run to completion before the clearer got
    its first turn and the two tasks would never actually interleave.
    """
    total = 50
    backend = _MemBackend()
    q = NotificationQueue(backend, max_size=DEFAULT_MAX_QUEUE_SIZE)
    await q.async_load()

    async def producer() -> None:
        for i in range(total):
            await q.async_enqueue({"i": i})
            # Explicit yield point so the clearer is scheduled between
            # enqueues regardless of whether async_enqueue suspends.
            await asyncio.sleep(0)

    async def clearer() -> None:
        for _ in range(10):
            await asyncio.sleep(0)
            await q.async_clear()

    await asyncio.gather(producer(), clearer())
    # No exceptions = no race-induced "dictionary changed size during iteration".
    items = q.get_all()
    assert isinstance(items, list)
    # Clearing can only remove items, never invent them.
    assert len(items) <= total
    # A single producer enqueues in ascending order, so the survivors must be
    # strictly increasing: no duplicates and no reordering.
    indices = [it["params"]["i"] for it in items]
    assert all(earlier < later for earlier, later in zip(indices, indices[1:]))
|