| from __future__ import annotations |
|
|
| import os |
| import secrets |
| from collections.abc import Iterable |
| from typing import Any |
|
|
| from fastapi import HTTPException, Request, status |
|
|
| SENSITIVE_ENV_SUFFIXES = ("KEY", "TOKEN", "SECRET", "PASSWORD", "PASS", "CREDENTIAL", "CREDENTIALS") |
|
|
|
|
| def authenticate(request: Request) -> None: |
| if not request.session.get("authenticated"): |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Login required") |
|
|
|
|
| def constant_time_equal(a: str, b: str) -> bool: |
| return secrets.compare_digest(a.encode("utf-8"), b.encode("utf-8")) |
|
|
|
|
| def known_secret_values(extra_values: Iterable[str | None] = ()) -> list[str]: |
| values: list[str] = [] |
| for key, value in os.environ.items(): |
| upper = key.upper() |
| if value and len(value) >= 4 and any(upper.endswith(suffix) or suffix in upper for suffix in SENSITIVE_ENV_SUFFIXES): |
| values.append(value) |
| for value in extra_values: |
| if value and len(value) >= 4: |
| values.append(value) |
| return sorted(set(values), key=len, reverse=True) |
|
|
|
|
| def redact_text(text: str | None, extra_values: Iterable[str | None] = ()) -> str: |
| if text is None: |
| return "" |
| safe = str(text) |
| for value in known_secret_values(extra_values): |
| safe = safe.replace(value, "[REDACTED]") |
| return safe |
|
|
|
|
| def redact_obj(value: Any) -> Any: |
| if isinstance(value, str): |
| return redact_text(value) |
| if isinstance(value, list): |
| return [redact_obj(item) for item in value] |
| if isinstance(value, dict): |
| return {key: redact_obj(item) for key, item in value.items()} |
| return value |
|
|