| """ |
| Request-scoped context for enforcing per-user data isolation. |
| |
| This module intentionally contains no FastAPI/DB imports to avoid circular dependencies. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from contextvars import ContextVar |
| from typing import Optional |
| from urllib.parse import urlsplit |
|
|
| |
| USER_SCOPE_ALL = -1 |
|
|
| |
| current_user_id: ContextVar[Optional[int]] = ContextVar("landppt_current_user_id", default=None) |
|
|
| |
| current_base_url: ContextVar[Optional[str]] = ContextVar("landppt_current_base_url", default=None) |
|
|
|
|
| def _strip_default_port(host: str, scheme: str) -> str: |
| """Strip default port from a host string.""" |
| raw_host = (host or "").strip() |
| normalized_scheme = (scheme or "").strip().lower() |
| if not raw_host: |
| return raw_host |
|
|
| try: |
| parsed = urlsplit(f"{normalized_scheme or 'http'}://{raw_host}") |
| hostname = parsed.hostname or raw_host |
| port = parsed.port |
| if port is None: |
| return raw_host |
| if (normalized_scheme == "http" and port == 80) or (normalized_scheme == "https" and port == 443): |
| if ":" in hostname and not hostname.startswith("["): |
| return f"[{hostname}]" |
| return hostname |
| except Exception: |
| return raw_host |
|
|
| return raw_host |
|
|
|
|
| def _normalize_base_url_candidate(raw_url: Optional[str], default_scheme: str = "https") -> Optional[str]: |
| """Normalize a request-derived base URL candidate to scheme://host[:port].""" |
| value = (str(raw_url).strip() if raw_url is not None else "") |
| if not value: |
| return None |
|
|
| if value.startswith("//"): |
| value = f"{default_scheme}:{value}" |
| elif "://" not in value: |
| value = f"{default_scheme}://{value.lstrip('/')}" |
|
|
| parsed = urlsplit(value) |
| scheme = (parsed.scheme or default_scheme or "https").strip().lower() |
| host = (parsed.netloc or "").strip() |
| if not host: |
| return None |
|
|
| if "," in host: |
| host = host.split(",", 1)[0].strip() |
| host = _strip_default_port(host, scheme) |
| if not host: |
| return None |
| return f"{scheme}://{host}" |
|
|
|
|
| def resolve_request_base_url(request) -> Optional[str]: |
| """ |
| Resolve the externally reachable base URL for the active request. |
| |
| Uses proxy/origin headers first so reverse-proxy deployments do not fall back |
| to internal container hosts. |
| """ |
| if request is None: |
| return None |
|
|
| candidates = [] |
|
|
| def add_candidate(raw_url: Optional[str], *, default_scheme: str = "https") -> None: |
| normalized = _normalize_base_url_candidate(raw_url, default_scheme=default_scheme) |
| if normalized and normalized not in candidates: |
| candidates.append(normalized) |
|
|
| try: |
| headers = request.headers |
| request_scheme = (getattr(request.url, "scheme", None) or "https").strip().lower() |
|
|
| add_candidate(headers.get("origin"), default_scheme=request_scheme) |
|
|
| referer = headers.get("referer") |
| if referer: |
| try: |
| referer_parts = urlsplit(referer) |
| add_candidate(f"{referer_parts.scheme}://{referer_parts.netloc}", default_scheme=request_scheme) |
| except Exception: |
| pass |
|
|
| forwarded_host = (headers.get("x-forwarded-host") or "").strip() |
| forwarded_proto = (headers.get("x-forwarded-proto") or request_scheme).strip().lower() |
| forwarded_port = (headers.get("x-forwarded-port") or "").strip() |
| if "," in forwarded_host: |
| forwarded_host = forwarded_host.split(",", 1)[0].strip() |
| if "," in forwarded_proto: |
| forwarded_proto = forwarded_proto.split(",", 1)[0].strip() |
| if forwarded_host and forwarded_port and ":" not in forwarded_host: |
| forwarded_host = f"{forwarded_host}:{forwarded_port}" |
| add_candidate(forwarded_host, default_scheme=forwarded_proto or request_scheme) |
|
|
| host = (headers.get("host") or getattr(request.url, "netloc", "") or "").strip() |
| add_candidate(host, default_scheme=request_scheme) |
|
|
| if getattr(request, "base_url", None): |
| add_candidate(str(request.base_url), default_scheme=request_scheme) |
| except Exception: |
| return None |
|
|
| return candidates[0] if candidates else None |
|
|