ppt-web / src /landppt /auth /request_context.py
26fwyzpz6f-max
Clean deploy without binary files
6aecb2e
Raw
History Blame Contribute Delete
4.49 kB
"""
Request-scoped context for enforcing per-user data isolation.
This module intentionally contains no FastAPI/DB imports to avoid circular dependencies.
"""
from __future__ import annotations
from contextvars import ContextVar
from typing import Optional
from urllib.parse import urlsplit
# Sentinel value: pass this as user_id to explicitly disable user scoping (admin/system paths).
# NOTE(review): the code that interprets this sentinel lives outside this module.
USER_SCOPE_ALL = -1

# The current authenticated user's ID for the active request/task.
# Reads as None until a value has been set in the current context.
current_user_id: ContextVar[Optional[int]] = ContextVar("landppt_current_user_id", default=None)

# The current request's externally reachable base URL, when available.
# Reads as None until a value has been set in the current context.
current_base_url: ContextVar[Optional[str]] = ContextVar("landppt_current_base_url", default=None)
def _strip_default_port(host: str, scheme: str) -> str:
"""Strip default port from a host string."""
raw_host = (host or "").strip()
normalized_scheme = (scheme or "").strip().lower()
if not raw_host:
return raw_host
try:
parsed = urlsplit(f"{normalized_scheme or 'http'}://{raw_host}")
hostname = parsed.hostname or raw_host
port = parsed.port
if port is None:
return raw_host
if (normalized_scheme == "http" and port == 80) or (normalized_scheme == "https" and port == 443):
if ":" in hostname and not hostname.startswith("["):
return f"[{hostname}]"
return hostname
except Exception:
return raw_host
return raw_host
def _normalize_base_url_candidate(raw_url: Optional[str], default_scheme: str = "https") -> Optional[str]:
"""Normalize a request-derived base URL candidate to scheme://host[:port]."""
value = (str(raw_url).strip() if raw_url is not None else "")
if not value:
return None
if value.startswith("//"):
value = f"{default_scheme}:{value}"
elif "://" not in value:
value = f"{default_scheme}://{value.lstrip('/')}"
parsed = urlsplit(value)
scheme = (parsed.scheme or default_scheme or "https").strip().lower()
host = (parsed.netloc or "").strip()
if not host:
return None
if "," in host:
host = host.split(",", 1)[0].strip()
host = _strip_default_port(host, scheme)
if not host:
return None
return f"{scheme}://{host}"
def resolve_request_base_url(request) -> Optional[str]:
    """
    Resolve the externally reachable base URL for the active request.

    Uses proxy/origin headers first so reverse-proxy deployments do not fall back
    to internal container hosts. Candidates are collected in this order and the
    first one that normalizes successfully is returned:

    1. ``Origin`` (the opaque value ``null`` is ignored)
    2. ``Referer`` (scheme and authority only)
    3. ``X-Forwarded-Host`` combined with ``X-Forwarded-Proto`` / ``X-Forwarded-Port``
    4. ``Host`` (or ``request.url.netloc`` when the header is absent)
    5. ``request.base_url``

    Args:
        request: Request-like object exposing ``headers`` (with ``.get``), ``url``
            and optionally ``base_url``. ``None`` is accepted.

    Returns:
        A normalized ``scheme://host[:port]`` string, or ``None`` when ``request``
        is ``None``, no candidate is usable, or reading the request fails.
    """
    if request is None:
        return None

    candidates = []

    def add_candidate(raw_url: Optional[str], *, default_scheme: str = "https") -> None:
        try:
            normalized = _normalize_base_url_candidate(raw_url, default_scheme=default_scheme)
        except ValueError:
            # One unparseable header must not discard the remaining candidates.
            return
        if normalized and normalized not in candidates:
            candidates.append(normalized)

    def first_list_item(raw_value: Optional[str]) -> str:
        # Forwarding headers may hold a comma-separated list; keep the first entry.
        return (raw_value or "").split(",", 1)[0].strip()

    try:
        headers = request.headers
        request_scheme = (getattr(request.url, "scheme", None) or "https").strip().lower()

        # Browsers send the literal "null" for opaque origins; it is not a host name.
        origin = (headers.get("origin") or "").strip()
        if origin.lower() != "null":
            add_candidate(origin, default_scheme=request_scheme)

        referer = headers.get("referer")
        if referer:
            try:
                referer_parts = urlsplit(referer)
            except ValueError:
                # Malformed Referer: ignore it and continue with the other sources.
                referer_parts = None
            if referer_parts is not None:
                add_candidate(f"{referer_parts.scheme}://{referer_parts.netloc}", default_scheme=request_scheme)

        forwarded_host = first_list_item(headers.get("x-forwarded-host"))
        forwarded_proto = first_list_item(headers.get("x-forwarded-proto")).lower() or request_scheme
        forwarded_port = first_list_item(headers.get("x-forwarded-port"))

        # A bracketed IPv6 literal always contains ":", so look for "]:" to detect its port.
        if forwarded_host.startswith("["):
            host_has_port = "]:" in forwarded_host
        else:
            host_has_port = ":" in forwarded_host
        if forwarded_host and forwarded_port and not host_has_port:
            forwarded_host = f"{forwarded_host}:{forwarded_port}"
        add_candidate(forwarded_host, default_scheme=forwarded_proto)

        host = (headers.get("host") or getattr(request.url, "netloc", "") or "").strip()
        add_candidate(host, default_scheme=request_scheme)

        if getattr(request, "base_url", None):
            add_candidate(str(request.base_url), default_scheme=request_scheme)
    except Exception:
        # Deliberate best-effort boundary: a request object that cannot be read
        # yields "unknown" rather than an error for the caller.
        return None

    return candidates[0] if candidates else None