| """ |
| API 与配置页鉴权。 |
| |
| - auth.api_key: 保护 /{type}/v1/* |
| - auth.config_secret: 保护 /config 与 /api/config、/api/types |
| |
| 全局鉴权设置优先级:数据库 > 环境变量回退 > YAML > 默认值。 |
| config_secret 在文件模式下会回写为带前缀的 PBKDF2 哈希;环境变量回退模式下仅在内存中哈希。 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import base64 |
| import hashlib |
| import hmac |
| import os |
| import re |
| import secrets |
| import time |
| from dataclasses import dataclass, field |
| from functools import lru_cache |
| from typing import Any, Literal |
|
|
| from fastapi import HTTPException, Request, status |
|
|
| from core.config.repository import ( |
| APP_SETTING_AUTH_API_KEY, |
| APP_SETTING_AUTH_CONFIG_SECRET_HASH, |
| ConfigRepository, |
| ) |
| from core.config.settings import ( |
| get, |
| get_config_path, |
| has_env_override, |
| load_config, |
| reset_cache, |
| ) |
|
|
| API_AUTH_REALM = "Bearer" |
| CONFIG_SECRET_PREFIX = "web2api_pbkdf2_sha256" |
| CONFIG_SECRET_ITERATIONS = 600_000 |
| ADMIN_SESSION_COOKIE = "web2api_admin_session" |
| DEFAULT_ADMIN_SESSION_TTL_SECONDS = 7 * 24 * 60 * 60 |
| DEFAULT_ADMIN_LOGIN_MAX_FAILURES = 5 |
| DEFAULT_ADMIN_LOGIN_LOCK_SECONDS = 10 * 60 |
| AuthSource = Literal["env", "db", "yaml", "default"] |
|
|
|
|
| @dataclass(frozen=True) |
| class EffectiveAuthSettings: |
| api_key_text: str |
| api_key_source: AuthSource |
| config_secret_hash: str |
| config_secret_source: AuthSource |
|
|
| @property |
| def api_keys(self) -> list[str]: |
| return parse_api_keys(self.api_key_text) |
|
|
| @property |
| def api_key_env_managed(self) -> bool: |
| return False |
|
|
| @property |
| def config_secret_env_managed(self) -> bool: |
| return False |
|
|
| @property |
| def config_login_enabled(self) -> bool: |
| return bool(self.config_secret_hash) |
|
|
|
|
| def parse_api_keys(raw: Any) -> list[str]: |
| if isinstance(raw, list): |
| return [str(item).strip() for item in raw if str(item).strip()] |
| if raw is None: |
| return [] |
| text = str(raw).replace("\n", ",") |
| return [part.strip() for part in text.split(",") if part.strip()] |
|
|
|
|
| def normalize_api_key_text(raw: Any) -> str: |
| if isinstance(raw, list): |
| return "\n".join(str(item).strip() for item in raw if str(item).strip()) |
| return str(raw or "").strip() |
|
|
|
|
| def _yaml_auth_config() -> dict[str, Any]: |
| auth_cfg = load_config().get("auth") or {} |
| return auth_cfg if isinstance(auth_cfg, dict) else {} |
|
|
|
|
| def _normalize_config_secret_hash(value: Any) -> str: |
| secret = str(value or "").strip() |
| if not secret: |
| return "" |
| return secret if _is_hashed_config_secret(secret) else hash_config_secret(secret) |
|
|
|
|
| @lru_cache(maxsize=1) |
| def _hosted_config_secret_hash() -> str: |
| return _normalize_config_secret_hash(get("auth", "config_secret", "")) |
|
|
|
|
| def build_effective_auth_settings( |
| repo: ConfigRepository | None = None, |
| ) -> EffectiveAuthSettings: |
| stored = repo.load_app_settings() if repo is not None else {} |
| yaml_auth = _yaml_auth_config() |
|
|
| if APP_SETTING_AUTH_API_KEY in stored: |
| api_key_text = normalize_api_key_text(stored.get(APP_SETTING_AUTH_API_KEY, "")) |
| api_key_source: AuthSource = "db" |
| elif has_env_override("auth", "api_key"): |
| api_key_text = normalize_api_key_text(get("auth", "api_key", "")) |
| api_key_source = "env" |
| elif "api_key" in yaml_auth: |
| api_key_text = normalize_api_key_text(yaml_auth.get("api_key", "")) |
| api_key_source = "yaml" |
| else: |
| api_key_text = "" |
| api_key_source = "default" |
|
|
| if APP_SETTING_AUTH_CONFIG_SECRET_HASH in stored: |
| config_secret_hash = _normalize_config_secret_hash( |
| stored.get(APP_SETTING_AUTH_CONFIG_SECRET_HASH, "") |
| ) |
| config_secret_source: AuthSource = "db" |
| elif has_env_override("auth", "config_secret"): |
| config_secret_hash = _hosted_config_secret_hash() |
| config_secret_source = "env" |
| elif "config_secret" in yaml_auth: |
| config_secret_hash = _normalize_config_secret_hash(yaml_auth.get("config_secret", "")) |
| config_secret_source = "yaml" |
| else: |
| config_secret_hash = "" |
| config_secret_source = "default" |
|
|
| return EffectiveAuthSettings( |
| api_key_text=api_key_text, |
| api_key_source=api_key_source, |
| config_secret_hash=config_secret_hash, |
| config_secret_source=config_secret_source, |
| ) |
|
|
|
|
| def refresh_runtime_auth_settings(app: Any) -> EffectiveAuthSettings: |
| repo = getattr(app.state, "config_repo", None) |
| settings = build_effective_auth_settings(repo) |
| app.state.auth_settings = settings |
| return settings |
|
|
|
|
| def get_effective_auth_settings(request: Request | None = None) -> EffectiveAuthSettings: |
| if request is not None: |
| settings = getattr(request.app.state, "auth_settings", None) |
| if isinstance(settings, EffectiveAuthSettings): |
| return settings |
| repo = getattr(request.app.state, "config_repo", None) |
| return build_effective_auth_settings(repo) |
| return build_effective_auth_settings() |
|
|
|
|
| def configured_api_keys(repo: ConfigRepository | None = None) -> list[str]: |
| return build_effective_auth_settings(repo).api_keys |
|
|
|
|
| def _extract_request_api_key(request: Request) -> str: |
| key = (request.headers.get("x-api-key") or "").strip() |
| if key: |
| return key |
| authorization = (request.headers.get("authorization") or "").strip() |
| if authorization.lower().startswith("bearer "): |
| return authorization[7:].strip() |
| return "" |
|
|
|
|
| def require_api_key(request: Request) -> None: |
| expected_keys = get_effective_auth_settings(request).api_keys |
| if not expected_keys: |
| return |
| provided = _extract_request_api_key(request) |
| if provided: |
| for expected in expected_keys: |
| if secrets.compare_digest(provided, expected): |
| return |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Unauthorized. Provide a valid API key.", |
| headers={"WWW-Authenticate": API_AUTH_REALM}, |
| ) |
|
|
|
|
| def _is_hashed_config_secret(value: str) -> bool: |
| return value.startswith(f"{CONFIG_SECRET_PREFIX}$") |
|
|
|
|
| def configured_config_secret_hash(repo: ConfigRepository | None = None) -> str: |
| return build_effective_auth_settings(repo).config_secret_hash |
|
|
|
|
| def config_login_enabled(request: Request | None = None) -> bool: |
| return get_effective_auth_settings(request).config_login_enabled |
|
|
|
|
| def configured_config_login_max_failures() -> int: |
| raw = get("auth", "config_login_max_failures", DEFAULT_ADMIN_LOGIN_MAX_FAILURES) |
| try: |
| return max(1, int(raw)) |
| except Exception: |
| return DEFAULT_ADMIN_LOGIN_MAX_FAILURES |
|
|
|
|
| def configured_config_login_lock_seconds() -> int: |
| raw = get("auth", "config_login_lock_seconds", DEFAULT_ADMIN_LOGIN_LOCK_SECONDS) |
| try: |
| return max(1, int(raw)) |
| except Exception: |
| return DEFAULT_ADMIN_LOGIN_LOCK_SECONDS |
|
|
|
|
| def hash_config_secret(secret: str) -> str: |
| salt = os.urandom(16) |
| digest = hashlib.pbkdf2_hmac( |
| "sha256", |
| secret.encode("utf-8"), |
| salt, |
| CONFIG_SECRET_ITERATIONS, |
| ) |
| return ( |
| f"{CONFIG_SECRET_PREFIX}" |
| f"${CONFIG_SECRET_ITERATIONS}" |
| f"${base64.urlsafe_b64encode(salt).decode('ascii')}" |
| f"${base64.urlsafe_b64encode(digest).decode('ascii')}" |
| ) |
|
|
|
|
| def verify_config_secret(secret: str, encoded: str) -> bool: |
| try: |
| prefix, iterations_s, salt_b64, digest_b64 = encoded.split("$", 3) |
| except ValueError: |
| return False |
| if prefix != CONFIG_SECRET_PREFIX: |
| return False |
| try: |
| iterations = int(iterations_s) |
| salt = base64.urlsafe_b64decode(salt_b64.encode("ascii")) |
| expected = base64.urlsafe_b64decode(digest_b64.encode("ascii")) |
| except Exception: |
| return False |
| actual = hashlib.pbkdf2_hmac( |
| "sha256", |
| secret.encode("utf-8"), |
| salt, |
| iterations, |
| ) |
| return hmac.compare_digest(actual, expected) |
|
|
|
|
| def ensure_config_secret_hashed(repo: ConfigRepository | None = None) -> None: |
| if has_env_override("auth", "config_secret"): |
| _hosted_config_secret_hash() |
| return |
| if repo is not None and repo.get_app_setting(APP_SETTING_AUTH_CONFIG_SECRET_HASH) is not None: |
| return |
| cfg = load_config() |
| auth_cfg = cfg.get("auth") |
| if not isinstance(auth_cfg, dict): |
| return |
| raw_value = auth_cfg.get("config_secret") |
| secret = str(raw_value or "").strip() |
| if not secret or _is_hashed_config_secret(secret): |
| return |
| encoded = hash_config_secret(secret) |
| config_path = get_config_path() |
| if not config_path.exists(): |
| return |
| original = config_path.read_text(encoding="utf-8") |
| pattern = re.compile(r"^([ \t]*)config_secret\s*:\s*.*$", re.MULTILINE) |
| replacement = None |
| for line in original.splitlines(): |
| match = pattern.match(line) |
| if match: |
| replacement = f"{match.group(1)}config_secret: '{encoded}'" |
| break |
| updated: str |
| if replacement is not None: |
| updated, count = pattern.subn(replacement, original, count=1) |
| if count != 1: |
| return |
| else: |
| auth_pattern = re.compile(r"^auth\s*:\s*$", re.MULTILINE) |
| match = auth_pattern.search(original) |
| if match: |
| insert_at = match.end() |
| updated = ( |
| original[:insert_at] |
| + "\n" |
| + f" config_secret: '{encoded}'" |
| + original[insert_at:] |
| ) |
| else: |
| suffix = "" if original.endswith("\n") or not original else "\n" |
| updated = ( |
| original |
| + suffix |
| + "auth:\n" |
| + f" config_secret: '{encoded}'\n" |
| ) |
| tmp_path = config_path.with_suffix(config_path.suffix + ".tmp") |
| tmp_path.write_text(updated, encoding="utf-8") |
| tmp_path.replace(config_path) |
| reset_cache() |
| load_config() |
|
|
|
|
| @dataclass |
| class AdminSessionStore: |
| ttl_seconds: int = DEFAULT_ADMIN_SESSION_TTL_SECONDS |
| _sessions: dict[str, float] = field(default_factory=dict) |
|
|
| def create(self) -> str: |
| token = secrets.token_urlsafe(32) |
| self._sessions[token] = time.time() + self.ttl_seconds |
| return token |
|
|
| def is_valid(self, token: str) -> bool: |
| if not token: |
| return False |
| self.cleanup() |
| expires_at = self._sessions.get(token) |
| if expires_at is None: |
| return False |
| if expires_at < time.time(): |
| self._sessions.pop(token, None) |
| return False |
| return True |
|
|
| def revoke(self, token: str) -> None: |
| if token: |
| self._sessions.pop(token, None) |
|
|
| def cleanup(self) -> None: |
| now = time.time() |
| expired = [token for token, expires_at in self._sessions.items() if expires_at < now] |
| for token in expired: |
| self._sessions.pop(token, None) |
|
|
|
|
| @dataclass |
| class LoginAttemptState: |
| failures: int = 0 |
| locked_until: float = 0.0 |
| last_seen: float = 0.0 |
|
|
|
|
| @dataclass |
| class AdminLoginAttemptStore: |
| max_failures: int = DEFAULT_ADMIN_LOGIN_MAX_FAILURES |
| lock_seconds: int = DEFAULT_ADMIN_LOGIN_LOCK_SECONDS |
| _attempts: dict[str, LoginAttemptState] = field(default_factory=dict) |
|
|
| def is_locked(self, client_ip: str) -> int: |
| self.cleanup() |
| state = self._attempts.get(client_ip) |
| if state is None: |
| return 0 |
| remaining = int(state.locked_until - time.time()) |
| if remaining <= 0: |
| return 0 |
| return remaining |
|
|
| def record_failure(self, client_ip: str) -> int: |
| now = time.time() |
| state = self._attempts.setdefault(client_ip, LoginAttemptState()) |
| if state.locked_until > now: |
| state.last_seen = now |
| return int(state.locked_until - now) |
| state.failures += 1 |
| state.last_seen = now |
| if state.failures >= self.max_failures: |
| state.failures = 0 |
| state.locked_until = now + self.lock_seconds |
| return self.lock_seconds |
| return 0 |
|
|
| def record_success(self, client_ip: str) -> None: |
| self._attempts.pop(client_ip, None) |
|
|
| def cleanup(self) -> None: |
| now = time.time() |
| stale_before = now - max(self.lock_seconds * 2, 3600) |
| expired = [ |
| ip |
| for ip, state in self._attempts.items() |
| if state.locked_until <= now and state.last_seen < stale_before |
| ] |
| for ip in expired: |
| self._attempts.pop(ip, None) |
|
|
|
|
| def _admin_store(request: Request) -> AdminSessionStore: |
| store = getattr(request.app.state, "admin_sessions", None) |
| if store is None: |
| raise HTTPException(status_code=503, detail="Admin session store is unavailable") |
| return store |
|
|
|
|
| def _admin_login_attempt_store(request: Request) -> AdminLoginAttemptStore: |
| store = getattr(request.app.state, "admin_login_attempts", None) |
| if store is None: |
| raise HTTPException(status_code=503, detail="Login rate limiter is unavailable") |
| return store |
|
|
|
|
| def client_ip_of(request: Request) -> str: |
| client = getattr(request, "client", None) |
| host = getattr(client, "host", None) |
| return str(host or "unknown") |
|
|
|
|
| def check_admin_login_rate_limit(request: Request) -> None: |
| remaining = _admin_login_attempt_store(request).is_locked(client_ip_of(request)) |
| if remaining > 0: |
| raise HTTPException( |
| status_code=status.HTTP_429_TOO_MANY_REQUESTS, |
| detail=f"Too many failed login attempts. Try again in {remaining} seconds.", |
| ) |
|
|
|
|
| def record_admin_login_failure(request: Request) -> int: |
| return _admin_login_attempt_store(request).record_failure(client_ip_of(request)) |
|
|
|
|
| def record_admin_login_success(request: Request) -> None: |
| _admin_login_attempt_store(request).record_success(client_ip_of(request)) |
|
|
|
|
| def admin_logged_in(request: Request) -> bool: |
| if not config_login_enabled(request): |
| return False |
| token = (request.cookies.get(ADMIN_SESSION_COOKIE) or "").strip() |
| return _admin_store(request).is_valid(token) |
|
|
|
|
| def require_config_login_enabled(request: Request | None = None) -> None: |
| if not config_login_enabled(request): |
| raise HTTPException(status_code=404, detail="Config dashboard is disabled") |
|
|
|
|
| def require_config_login(request: Request) -> None: |
| require_config_login_enabled(request) |
| if admin_logged_in(request): |
| return |
| raise HTTPException(status_code=401, detail="Please sign in to access the config dashboard") |
|
|