0eb899afb9
Notifications: - Add shared http_base, redact, and SSRF hardening modules - Refactor dispatcher, queue, receiver and per-provider clients (telegram, discord, email, matrix, ntfy, slack, webhook) to use the shared base, with bounded queue and redacted error logs - Tests for ssrf, redact, http_base, queue bounds, dispatcher aggregation, telegram media partition, email and matrix clients Frontend: - Settings: log level / log format selectors now use IconGridSelect with per-option icons and i18n descriptions - Minor providers page and entity-cache store updates Tooling: - Document code-review-graph MCP usage in CLAUDE.md - Ignore .code-review-graph/, register .mcp.json
74 lines
2.5 KiB
Python
74 lines
2.5 KiB
Python
"""SSRF hardening regression tests.

Covers cases the original guard missed: IPv4-mapped IPv6, CGNAT,
trailing-dot hostnames, IPv6 zone identifiers, and the safe-host repr
used in error messages.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from notify_bridge_core.notifications.ssrf import (
|
|
UnsafeURLError,
|
|
PinnedResolver,
|
|
avalidate_outbound_url_full,
|
|
validate_outbound_url,
|
|
)
|
|
|
|
|
|
class TestBlockedRanges:
    """Literal-address URLs that the synchronous validator must refuse."""

    @pytest.mark.parametrize(
        "url",
        (
            "http://[::ffff:127.0.0.1]/",  # IPv4-mapped IPv6 → loopback
            "http://[::ffff:10.0.0.1]/",  # IPv4-mapped IPv6 → RFC1918
            "http://100.64.0.1/",  # CGNAT (RFC 6598)
            "http://0.0.0.0/",  # unspecified
        ),
    )
    def test_rejects_extra_ranges(self, url: str) -> None:
        """Each parametrized URL must make ``validate_outbound_url`` raise.

        Args:
            url: One of the address-literal URLs listed in the decorator.
        """
        expect_unsafe = pytest.raises(UnsafeURLError)
        with expect_unsafe:
            validate_outbound_url(url)
|
|
|
|
|
|
class TestHostnameNormalization:
    """Host and scheme spellings that differ only in form must still be refused."""

    def test_strips_trailing_dot(self) -> None:
        """``http://localhost./`` must raise ``UnsafeURLError``."""
        # The trailing dot is expected to be normalized away, so the name
        # is handled as plain ``localhost`` (loopback) and gets blocked.
        dotted_url = "http://localhost./"
        with pytest.raises(UnsafeURLError):
            validate_outbound_url(dotted_url)

    def test_rejects_bad_scheme_uppercase(self) -> None:
        """An upper-case ``FILE:`` scheme must raise ``UnsafeURLError``."""
        shouting_file_url = "FILE:///etc/passwd"
        with pytest.raises(UnsafeURLError):
            validate_outbound_url(shouting_file_url)
|
|
|
|
|
|
class TestErrorMessages:
    """The text of ``UnsafeURLError`` stays short for oversized hosts."""

    def test_error_does_not_leak_long_hosts(self) -> None:
        """A URL with a 1024-character label yields an error under 256 chars."""
        oversized_url = "http://" + "a" * 1024 + ".invalid/"
        with pytest.raises(UnsafeURLError) as ei:
            validate_outbound_url(oversized_url)

        # Checked after the ``with`` block: anything placed after the raising
        # call inside the block would never run.
        # Truncated to 64 chars in the error string.
        message = str(ei.value)
        assert len(message) < 256
|
|
|
|
|
|
class TestPinnedResolverSync:
    """Synchronous check of the mapping held by ``PinnedResolver``."""

    def test_pin_returns_pinned_ip(self) -> None:
        """The host → IP pair given to the constructor is readable via ``_map``."""
        pins = {"example.com": "93.184.216.34"}
        resolver = PinnedResolver(pins)

        # Only the dict lookup is exercised here — full resolve runs in
        # async tests.
        stored_ip = resolver._map["example.com"]  # type: ignore[attr-defined]
        assert stored_ip == "93.184.216.34"
|
|
|
|
|
|
class TestAsyncFullValidator:
    """Exercises ``avalidate_outbound_url_full`` with literal-IP URLs."""

    @pytest.mark.asyncio
    async def test_returns_resolved_ip(self) -> None:
        """A public literal IP comes back as both ``ip`` and ``host``."""
        # Literal IP — no DNS lookup; we still get back a ValidatedURL.
        expected = "8.8.8.8"
        validated = await avalidate_outbound_url_full("http://8.8.8.8/")
        assert validated.ip == expected
        assert validated.host == expected

    @pytest.mark.asyncio
    async def test_rejects_blocked_literal(self) -> None:
        """An IPv4-mapped IPv6 loopback literal must raise ``UnsafeURLError``."""
        mapped_loopback_url = "http://[::ffff:127.0.0.1]/"
        with pytest.raises(UnsafeURLError):
            await avalidate_outbound_url_full(mapped_loopback_url)
|