Files
alexei.dolgolyov 0eb899afb9 feat: harden notification stack and switch logging selectors to icon grid
Notifications:
- Add shared http_base, redact, and SSRF hardening modules
- Refactor dispatcher, queue, receiver and per-provider clients
  (telegram, discord, email, matrix, ntfy, slack, webhook) to use
  the shared base, with bounded queue and redacted error logs
- Tests for ssrf, redact, http_base, queue bounds, dispatcher
  aggregation, telegram media partition, email and matrix clients

Frontend:
- Settings: log level / log format selectors now use IconGridSelect
  with per-option icons and i18n descriptions
- Minor providers page and entity-cache store updates

Tooling:
- Document code-review-graph MCP usage in CLAUDE.md
- Ignore .code-review-graph/, register .mcp.json
2026-05-07 13:53:26 +03:00

85 lines
2.3 KiB
Python

"""NotificationQueue bound + concurrency regression tests."""
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from notify_bridge_core.notifications.queue import (
DEFAULT_MAX_QUEUE_SIZE,
NotificationQueue,
)
class _MemBackend:
    """Storage stub for tests that keeps the payload in a plain attribute.

    Provides async ``load``/``save``/``remove`` methods. The payload is held
    by reference: nothing is copied, validated or written to disk.
    """

    def __init__(self) -> None:
        # ``None`` means nothing has been saved yet (or it was removed).
        self._data: dict[str, Any] | None = None

    async def save(self, data: dict[str, Any]) -> None:
        """Remember *data* as the current payload."""
        self._data = data

    async def load(self) -> dict[str, Any] | None:
        """Return the current payload, or ``None`` when there is none."""
        return self._data

    async def remove(self) -> None:
        """Forget the current payload."""
        self._data = None
@pytest.mark.asyncio
async def test_load_with_garbage_falls_back_to_empty() -> None:
    """A stored ``"queue"`` value that is not a list loads as an empty queue."""
    corrupt_payload: dict[str, Any] = {"queue": "not-a-list"}
    storage = _MemBackend()
    # Seed the stub directly so the queue meets the malformed payload on load.
    storage._data = corrupt_payload

    queue = NotificationQueue(storage)
    await queue.async_load()

    restored = queue.get_all()
    assert restored == []
@pytest.mark.asyncio
async def test_enqueue_caps_at_max_size() -> None:
    """Enqueueing past ``max_size`` keeps only the newest ``max_size`` entries."""
    capacity = 3
    total_enqueued = 10

    queue = NotificationQueue(_MemBackend(), max_size=capacity)
    await queue.async_load()

    for index in range(total_enqueued):
        await queue.async_enqueue({"i": index})

    retained = queue.get_all()
    assert len(retained) == capacity

    # Oldest entries are dropped first, so only the last three (i=7..9) remain.
    expected = list(range(total_enqueued - capacity, total_enqueued))
    retained_indices = [entry["params"]["i"] for entry in retained]
    assert retained_indices == expected
@pytest.mark.asyncio
async def test_get_all_returns_deep_copy() -> None:
    """Mutating a ``get_all()`` result must not leak back into the queue."""
    queue = NotificationQueue(_MemBackend(), max_size=10)
    await queue.async_load()
    await queue.async_enqueue({"key": "value"})

    # Tamper with a nested dict inside the first snapshot...
    tampered = queue.get_all()
    tampered[0]["params"]["key"] = "MUTATED"

    # ...then check that a fresh snapshot still carries the original value.
    fresh = queue.get_all()
    assert fresh[0]["params"]["key"] == "value"
@pytest.mark.asyncio
async def test_concurrent_enqueue_and_clear_no_corruption() -> None:
    """Interleaved enqueue/clear calls must leave the queue consistent.

    A producer enqueues ``{"i": 0}`` .. ``{"i": 49}`` while a second task
    clears the queue ten times. Whatever survives must be a list of distinct
    entries, still in enqueue order, that the producer actually submitted.
    """
    total = 50
    backend = _MemBackend()
    q = NotificationQueue(backend, max_size=DEFAULT_MAX_QUEUE_SIZE)
    await q.async_load()

    async def producer() -> None:
        for i in range(total):
            await q.async_enqueue({"i": i})
            # Yield to the event loop on every iteration. The in-memory
            # backend's coroutines never suspend, so without this the
            # producer may finish before the clearer runs even once and the
            # two tasks would not interleave at all.
            await asyncio.sleep(0)

    async def clearer() -> None:
        for _ in range(10):
            await asyncio.sleep(0)
            await q.async_clear()

    # gather() propagates the first exception raised by either task, so a
    # race-induced "dictionary changed size during iteration" fails the test.
    await asyncio.gather(producer(), clearer())

    items = q.get_all()
    assert isinstance(items, list)
    assert len(items) <= total

    # Survivors are whatever was enqueued after the last clear: they must be
    # unique, in ascending enqueue order, and within the produced range.
    survivors = [it["params"]["i"] for it in items]
    assert survivors == sorted(set(survivors))
    assert all(0 <= i < total for i in survivors)