fix(utils): commit url_scheme + net_classify dependencies
The DDP commit (8f1140a) added imports of infer_http_scheme into
api/routes/devices.py but missed bringing in the module itself --
url_scheme.py and its net_classify.py dependency were in the working
tree as untracked files only. On a clean checkout the FastAPI app
fails to start with ModuleNotFoundError.
Caught by the pre-merge code review. The 1358 passing tests only
worked because the local working tree happens to have the files.
This commit adds:
- ledgrab.utils.url_scheme: infer_http_scheme() for LAN-vs-public WLED
URL scheme inference
- ledgrab.utils.net_classify: HostCategory enum + classify_ip() +
is_blocked_for_ssrf() + is_local_for_http_default() + is_loopback().
Single source of truth for IP categorisation used by safe_source
(SSRF), url_scheme (LAN), and auth (loopback exemption).
- 107 unit tests (test_url_scheme.py + test_net_classify.py).
net_classify.is_blocked_for_ssrf is the primitive the device-driver
validate_device methods will use in the next commit to close HIGH #4
from the review.
This commit is contained in:
@@ -0,0 +1,136 @@
|
||||
"""Single source of truth for IP / hostname categorisation.
|
||||
|
||||
Three modules used to encode their own copy of "is this address local"
|
||||
logic with subtly different policies:
|
||||
|
||||
* :mod:`ledgrab.utils.safe_source` — *blocks* private/loopback/link-local/
|
||||
reserved/multicast hosts for SSRF protection.
|
||||
* :mod:`ledgrab.utils.url_scheme` — *prefers* ``http://`` for the same
|
||||
set (private/loopback/link-local) when inferring schemes for WLED.
|
||||
* :mod:`ledgrab.api.auth` — *exempts* loopback hosts from API-key
|
||||
enforcement.
|
||||
|
||||
When those three implementations drift, the resulting inconsistencies are
|
||||
security-relevant (a host the SSRF guard treats as public could be the
|
||||
same host the auth gate treats as loopback). Centralise the IP-category
|
||||
predicate here; the callers express their *policy* on top of the shared
|
||||
enum so future audits only need to look in one place.
|
||||
|
||||
The module is intentionally tiny and pure — no I/O, no logging — so it
|
||||
can be imported anywhere without dragging dependencies in.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import ipaddress
|
||||
|
||||
|
||||
class HostCategory(str, enum.Enum):
|
||||
"""Mutually-exclusive categorisation of a host literal.
|
||||
|
||||
Use as ``str`` for cheap comparison / logging; downstream callers map
|
||||
the enum to their own policy (block, prefer-http, allow-anon, …).
|
||||
"""
|
||||
|
||||
LOOPBACK = "loopback" # 127.0.0.0/8, ::1
|
||||
PRIVATE = "private" # RFC1918 + ULA
|
||||
LINK_LOCAL = "link_local" # 169.254.0.0/16, fe80::/10
|
||||
RESERVED = "reserved" # IANA-reserved ranges
|
||||
MULTICAST = "multicast" # 224.0.0.0/4, ff00::/8
|
||||
UNSPECIFIED = "unspecified" # 0.0.0.0, ::
|
||||
PUBLIC = "public" # routable on the public internet
|
||||
UNPARSEABLE = "unparseable" # not a literal IP — caller decides
|
||||
|
||||
|
||||
def classify_ip(host: str) -> HostCategory:
|
||||
"""Categorise an IP literal *host*.
|
||||
|
||||
Falls back to :data:`HostCategory.UNPARSEABLE` for anything that isn't
|
||||
a literal address (hostnames, bare labels, garbage). The category is
|
||||
derived strictly from Python's :mod:`ipaddress` module, so any future
|
||||
upstream change to the well-known-address registry propagates
|
||||
automatically.
|
||||
"""
|
||||
try:
|
||||
addr = ipaddress.ip_address(host)
|
||||
except ValueError:
|
||||
return HostCategory.UNPARSEABLE
|
||||
|
||||
# Order matters — the categories below are not strictly disjoint in
|
||||
# Python's predicates (e.g. ``127.0.0.1`` is both ``is_private`` and
|
||||
# ``is_loopback``), so we test the most specific labels first.
|
||||
if addr.is_loopback:
|
||||
return HostCategory.LOOPBACK
|
||||
if addr.is_link_local:
|
||||
return HostCategory.LINK_LOCAL
|
||||
if addr.is_unspecified:
|
||||
return HostCategory.UNSPECIFIED
|
||||
if addr.is_multicast:
|
||||
return HostCategory.MULTICAST
|
||||
if addr.is_private:
|
||||
return HostCategory.PRIVATE
|
||||
if addr.is_reserved:
|
||||
return HostCategory.RESERVED
|
||||
return HostCategory.PUBLIC
|
||||
|
||||
|
||||
_NON_PUBLIC_FOR_SSRF: frozenset[HostCategory] = frozenset(
|
||||
{
|
||||
HostCategory.LOOPBACK,
|
||||
HostCategory.PRIVATE,
|
||||
HostCategory.LINK_LOCAL,
|
||||
HostCategory.RESERVED,
|
||||
HostCategory.MULTICAST,
|
||||
HostCategory.UNSPECIFIED,
|
||||
HostCategory.UNPARSEABLE,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def is_blocked_for_ssrf(host: str) -> bool:
|
||||
"""Return True when *host* must NOT be reached from outbound fetches.
|
||||
|
||||
Used by :mod:`ledgrab.utils.safe_source` to enforce SSRF protection.
|
||||
The reverse policy lives in :func:`is_local_for_http_default`.
|
||||
"""
|
||||
return classify_ip(host) in _NON_PUBLIC_FOR_SSRF
|
||||
|
||||
|
||||
_LOCAL_FOR_HTTP_DEFAULT: frozenset[HostCategory] = frozenset(
|
||||
{
|
||||
HostCategory.LOOPBACK,
|
||||
HostCategory.PRIVATE,
|
||||
HostCategory.LINK_LOCAL,
|
||||
HostCategory.UNSPECIFIED,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def is_local_for_http_default(host: str) -> bool:
|
||||
"""Return True when *host* should default to ``http://`` for LAN devices.
|
||||
|
||||
Used by :mod:`ledgrab.utils.url_scheme`. Returns False on
|
||||
:data:`HostCategory.UNPARSEABLE` — callers (e.g. ``url_scheme``) layer
|
||||
their own mDNS / bare-label heuristics on top.
|
||||
"""
|
||||
cat = classify_ip(host)
|
||||
return cat in _LOCAL_FOR_HTTP_DEFAULT
|
||||
|
||||
|
||||
def is_loopback(host: str) -> bool:
|
||||
"""Return True when *host* is a loopback literal.
|
||||
|
||||
Used by :mod:`ledgrab.api.auth` for the "unauthenticated loopback"
|
||||
policy. Also accepts the textual placeholders ``localhost`` and
|
||||
Starlette's TestClient marker ``testclient``.
|
||||
"""
|
||||
if not host:
|
||||
return False
|
||||
h = host.strip().lower()
|
||||
if h.startswith("[") and h.endswith("]"):
|
||||
h = h[1:-1]
|
||||
h = h.split("%", 1)[0] # strip IPv6 zone id
|
||||
if h in {"localhost", "testclient"}:
|
||||
return True
|
||||
return classify_ip(h) is HostCategory.LOOPBACK
|
||||
@@ -0,0 +1,150 @@
|
||||
"""Helpers for inferring URL schemes from user-supplied device addresses.
|
||||
|
||||
When a user types a bare host (``192.168.1.42``, ``wled-desk.local``, or just
|
||||
``wled``) we need to pick an HTTP scheme on their behalf. Local-network
|
||||
targets are addressed over ``http://`` and public hostnames over ``https://``,
|
||||
matching how WLED is actually reachable in practice.
|
||||
|
||||
Intended for HTTP-style local devices (e.g. WLED). Do not call this helper
|
||||
for protocols that aren't HTTP — serial, OpenRGB, MQTT, etc. each carry
|
||||
their own scheme and would be silently passed through unchanged but the
|
||||
helper would still strip whitespace and react to user input in ways those
|
||||
callers don't want.
|
||||
|
||||
This module shares its host-classification surface with
|
||||
:mod:`ledgrab.utils.safe_source`, which applies the *opposite* policy
|
||||
(blocking private/loopback/link-local for SSRF protection). Keep the
|
||||
predicates in sync if either side is updated.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
import re
|
||||
|
||||
from ledgrab.utils.net_classify import is_local_for_http_default
|
||||
|
||||
|
||||
# RFC 3986 scheme grammar: ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ) followed
|
||||
# by ":". Used to recognise URIs like ``data:...``, ``javascript:...``,
|
||||
# ``mailto:...`` that omit the ``//`` authority component — we must not
|
||||
# coerce those into ``http(s)://data:...``.
|
||||
_URI_SCHEME_RE = re.compile(r"^([A-Za-z][A-Za-z0-9+\-.]*):(.*)$")
|
||||
|
||||
|
||||
_LOCAL_HOSTNAME_SUFFIXES: tuple[str, ...] = (
|
||||
".local",
|
||||
".lan",
|
||||
".home",
|
||||
".internal",
|
||||
)
|
||||
|
||||
# Characters that must never appear inside a bare host (C0 + DEL control
|
||||
# characters, plus the userinfo separator ``@``). Anything containing these
|
||||
# is treated as unparseable so the downstream validator reports a clean
|
||||
# error instead of httpx silently chewing on a malformed string.
|
||||
_FORBIDDEN_HOST_CHARS: frozenset[str] = frozenset(chr(c) for c in range(0x00, 0x20)) | frozenset(
|
||||
{chr(0x7F), "@"}
|
||||
)
|
||||
|
||||
|
||||
def _extract_hostname(host_part: str) -> str:
|
||||
"""Pull just the hostname out of ``host[:port]`` (or ``[ipv6]:port``).
|
||||
|
||||
Returns the empty string when *host_part* contains forbidden characters
|
||||
so the caller can route the input untouched and let the next layer
|
||||
surface a precise validation error.
|
||||
"""
|
||||
if any(ch in _FORBIDDEN_HOST_CHARS for ch in host_part):
|
||||
return ""
|
||||
if host_part.startswith("["):
|
||||
end = host_part.find("]")
|
||||
if end != -1:
|
||||
return host_part[1:end]
|
||||
return host_part[1:]
|
||||
# Bracketless IPv6 literal (``fe80::1``) — has multiple ``:`` so the
|
||||
# ``host:port`` shortcut below would misparse it. Probe ipaddress first.
|
||||
try:
|
||||
ipaddress.ip_address(host_part)
|
||||
return host_part
|
||||
except ValueError:
|
||||
pass
|
||||
if host_part.count(":") == 1:
|
||||
return host_part.split(":", 1)[0]
|
||||
return host_part
|
||||
|
||||
|
||||
def _is_local_host(host: str) -> bool:
|
||||
"""Return True when *host* refers to a private/loopback/link-local target.
|
||||
|
||||
A dotless label (``wled``, ``kitchen-strip``) is treated as local because
|
||||
those map to mDNS / NetBIOS resolution on a LAN. This is a deliberately
|
||||
permissive heuristic appropriate for a LAN-only LED controller, not a
|
||||
general-purpose host classifier.
|
||||
|
||||
IP-literal classification is delegated to
|
||||
:func:`ledgrab.utils.net_classify.is_local_for_http_default` so this
|
||||
module can't drift away from the SSRF / auth predicates that share the
|
||||
same address taxonomy.
|
||||
"""
|
||||
host = host.strip().lower()
|
||||
if not host:
|
||||
return False
|
||||
if host == "localhost":
|
||||
return True
|
||||
if host.endswith(_LOCAL_HOSTNAME_SUFFIXES):
|
||||
return True
|
||||
try:
|
||||
ipaddress.ip_address(host)
|
||||
except ValueError:
|
||||
# Bare label with no dot ⇒ treat as a local mDNS-style hostname.
|
||||
return "." not in host
|
||||
return is_local_for_http_default(host)
|
||||
|
||||
|
||||
def infer_http_scheme(url: str | None) -> str | None:
|
||||
"""Return *url* with an inferred ``http(s)://`` scheme when none is present.
|
||||
|
||||
Local hosts (private/loopback/link-local IPs, ``localhost``, ``.local``
|
||||
style hostnames, and bare single-label names) get ``http://``; everything
|
||||
else gets ``https://``. Inputs that already carry a scheme — including
|
||||
non-HTTP ones like ``ws://`` or ``openrgb://`` — are returned unchanged
|
||||
apart from surrounding whitespace.
|
||||
|
||||
Callers may pass ``None`` (e.g. an absent ``url`` field on a partial
|
||||
``DeviceUpdate``); the value round-trips untouched so the request
|
||||
validator can complain with the right error.
|
||||
"""
|
||||
if not url:
|
||||
return url
|
||||
trimmed = url.strip()
|
||||
if not trimmed:
|
||||
return trimmed
|
||||
if trimmed.lower().startswith(("http://", "https://")):
|
||||
return trimmed
|
||||
# Any other explicit scheme with ``//`` authority: leave the caller in
|
||||
# control (ws://, openrgb://, file://, gopher://, …).
|
||||
if "://" in trimmed:
|
||||
return trimmed
|
||||
host_part, _, _rest = trimmed.partition("/")
|
||||
# Authority-less URI schemes (``data:``, ``javascript:``, ``mailto:``):
|
||||
# the part after the ``:`` is not a port number. We must NOT coerce
|
||||
# these into ``http://data:...`` — that would produce an attacker-
|
||||
# controlled fetch URL. ``localhost:8080`` / ``192.168.1.1:8080`` still
|
||||
# work because the second group is all digits (a port), not a path.
|
||||
# IPv6 literals (e.g. ``fe80::1``) syntactically resemble ``scheme:rest``
|
||||
# — probe ipaddress first to disambiguate before applying the regex.
|
||||
try:
|
||||
ipaddress.ip_address(host_part)
|
||||
except ValueError:
|
||||
scheme_match = _URI_SCHEME_RE.match(host_part)
|
||||
if scheme_match and not scheme_match.group(2).isdigit():
|
||||
return trimmed
|
||||
hostname = _extract_hostname(host_part)
|
||||
if not hostname:
|
||||
# Forbidden characters or a host we can't safely classify — return
|
||||
# the input unchanged so the downstream validator can surface a
|
||||
# precise error rather than guessing a scheme.
|
||||
return trimmed
|
||||
scheme = "http" if _is_local_host(hostname) else "https"
|
||||
return f"{scheme}://{trimmed}"
|
||||
Reference in New Issue
Block a user