web2api / core /api /auth.py
ohmyapi's picture
feat: align hosted Space deployment with latest upstream
77169b4
"""
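Authentication for the API and the config dashboard.
- auth.api_key: protects /{type}/v1/*
- auth.config_secret: protects /config, /api/config and /api/types
Global auth settings are resolved in priority order: database > environment-variable fallback > YAML > default.
In file mode, config_secret is written back as a prefixed PBKDF2 hash; in environment-variable fallback mode it is only hashed in memory.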
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import os
import re
import secrets
import time
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Any, Literal
from fastapi import HTTPException, Request, status
from core.config.repository import (
APP_SETTING_AUTH_API_KEY,
APP_SETTING_AUTH_CONFIG_SECRET_HASH,
ConfigRepository,
)
from core.config.settings import (
get,
get_config_path,
has_env_override,
load_config,
reset_cache,
)
# Value sent in the WWW-Authenticate header of the 401 raised by require_api_key.
API_AUTH_REALM = "Bearer"
# Leading field of an encoded config secret; fields are separated by "$".
CONFIG_SECRET_PREFIX = "web2api_pbkdf2_sha256"
# PBKDF2 iteration count used when a new config secret hash is produced.
CONFIG_SECRET_ITERATIONS = 600_000
# Name of the cookie that carries the admin session token.
ADMIN_SESSION_COOKIE = "web2api_admin_session"
# Default admin session lifetime: 7 days, expressed in seconds.
DEFAULT_ADMIN_SESSION_TTL_SECONDS = 7 * 24 * 60 * 60
# Default number of failed logins after which a client is locked out.
DEFAULT_ADMIN_LOGIN_MAX_FAILURES = 5
# Default lockout duration: 10 minutes, expressed in seconds.
DEFAULT_ADMIN_LOGIN_LOCK_SECONDS = 10 * 60
# Label recording which layer an effective auth setting was taken from.
AuthSource = Literal["env", "db", "yaml", "default"]
@dataclass(frozen=True)
class EffectiveAuthSettings:
    """Immutable snapshot of the resolved auth settings and their origins."""

    # Raw API key text; parsed into individual keys by ``api_keys``.
    api_key_text: str
    # Layer that supplied ``api_key_text``.
    api_key_source: AuthSource
    # Encoded config secret; an empty string means the config login is off.
    config_secret_hash: str
    # Layer that supplied ``config_secret_hash``.
    config_secret_source: AuthSource

    @property
    def config_login_enabled(self) -> bool:
        """Return True when a config secret hash is present."""
        return bool(self.config_secret_hash)

    @property
    def config_secret_env_managed(self) -> bool:
        """Always False in this implementation."""
        return False

    @property
    def api_key_env_managed(self) -> bool:
        """Always False in this implementation."""
        return False

    @property
    def api_keys(self) -> list[str]:
        """Return the individual API keys parsed out of ``api_key_text``."""
        return parse_api_keys(self.api_key_text)
def parse_api_keys(raw: Any) -> list[str]:
    """Split a raw API key setting into a list of non-empty, trimmed keys.

    A list is taken item by item; ``None`` yields an empty list; anything
    else is stringified and split on commas and newlines.
    """
    if raw is None:
        return []
    if isinstance(raw, list):
        pieces = [str(entry) for entry in raw]
    else:
        # Newlines and commas are interchangeable separators.
        pieces = str(raw).replace("\n", ",").split(",")
    trimmed = (piece.strip() for piece in pieces)
    return [key for key in trimmed if key]
def normalize_api_key_text(raw: Any) -> str:
    """Return the API key setting as one trimmed string.

    A list becomes its non-empty, trimmed items joined by newlines; any
    other value is stringified (falsy values become ``""``) and trimmed.
    """
    if not isinstance(raw, list):
        return str(raw or "").strip()
    trimmed_items = (str(entry).strip() for entry in raw)
    return "\n".join(item for item in trimmed_items if item)
def _yaml_auth_config() -> dict[str, Any]:
    """Return the ``auth`` mapping from the loaded config, or ``{}``.

    A missing, falsy or non-dict ``auth`` entry is treated as empty.
    """
    section = load_config().get("auth") or {}
    if isinstance(section, dict):
        return section
    return {}
def _normalize_config_secret_hash(value: Any) -> str:
    """Return ``value`` as an encoded config secret.

    Blank input yields ``""``; an already-encoded value is returned as is;
    any other value is treated as a plain secret and hashed.
    """
    candidate = str(value or "").strip()
    if not candidate:
        return ""
    if _is_hashed_config_secret(candidate):
        return candidate
    return hash_config_secret(candidate)
@lru_cache(maxsize=1)
def _hosted_config_secret_hash() -> str:
    """Return the encoded form of the ``auth.config_secret`` setting.

    The result is memoized, so a plain secret is hashed only once and the
    same encoded value is returned on every later call.
    """
    configured = get("auth", "config_secret", "")
    return _normalize_config_secret_hash(configured)
def build_effective_auth_settings(
    repo: ConfigRepository | None = None,
) -> EffectiveAuthSettings:
    """Resolve the API key and config secret from the layered sources.

    Each setting is looked up independently, first match wins: settings
    stored in ``repo``, then an environment override, then the YAML
    ``auth`` section, then an empty default. Without ``repo`` the stored
    layer is skipped.
    """
    db_settings = {} if repo is None else repo.load_app_settings()
    yaml_section = _yaml_auth_config()

    key_text: str
    key_origin: AuthSource
    if APP_SETTING_AUTH_API_KEY in db_settings:
        key_origin = "db"
        key_text = normalize_api_key_text(db_settings.get(APP_SETTING_AUTH_API_KEY, ""))
    elif has_env_override("auth", "api_key"):
        key_origin = "env"
        key_text = normalize_api_key_text(get("auth", "api_key", ""))
    elif "api_key" in yaml_section:
        key_origin = "yaml"
        key_text = normalize_api_key_text(yaml_section.get("api_key", ""))
    else:
        key_origin = "default"
        key_text = ""

    secret_hash: str
    secret_origin: AuthSource
    if APP_SETTING_AUTH_CONFIG_SECRET_HASH in db_settings:
        secret_origin = "db"
        secret_hash = _normalize_config_secret_hash(
            db_settings.get(APP_SETTING_AUTH_CONFIG_SECRET_HASH, "")
        )
    elif has_env_override("auth", "config_secret"):
        secret_origin = "env"
        # Memoized helper: the env-provided secret is hashed only once.
        secret_hash = _hosted_config_secret_hash()
    elif "config_secret" in yaml_section:
        secret_origin = "yaml"
        secret_hash = _normalize_config_secret_hash(yaml_section.get("config_secret", ""))
    else:
        secret_origin = "default"
        secret_hash = ""

    return EffectiveAuthSettings(
        api_key_text=key_text,
        api_key_source=key_origin,
        config_secret_hash=secret_hash,
        config_secret_source=secret_origin,
    )
def refresh_runtime_auth_settings(app: Any) -> EffectiveAuthSettings:
    """Rebuild the auth settings and store them on ``app.state.auth_settings``.

    Uses ``app.state.config_repo`` when that attribute exists. Returns the
    freshly built settings.
    """
    config_repo = getattr(app.state, "config_repo", None)
    resolved = build_effective_auth_settings(config_repo)
    app.state.auth_settings = resolved
    return resolved
def get_effective_auth_settings(request: Request | None = None) -> EffectiveAuthSettings:
    """Return the auth settings that apply to ``request``.

    Prefers the snapshot stored on ``request.app.state.auth_settings``.
    If that is missing or of the wrong type, the settings are rebuilt from
    ``request.app.state.config_repo``. Without a request they are built
    with no repository.
    """
    if request is None:
        return build_effective_auth_settings()
    app_state = request.app.state
    snapshot = getattr(app_state, "auth_settings", None)
    if isinstance(snapshot, EffectiveAuthSettings):
        return snapshot
    return build_effective_auth_settings(getattr(app_state, "config_repo", None))
def configured_api_keys(repo: ConfigRepository | None = None) -> list[str]:
    """Return the list of API keys from freshly built auth settings."""
    effective = build_effective_auth_settings(repo)
    return effective.api_keys
def _extract_request_api_key(request: Request) -> str:
    """Return the API key sent with ``request``, or ``""`` if there is none.

    The ``x-api-key`` header takes precedence; otherwise the token of an
    ``authorization: Bearer <token>`` header is used (scheme matched
    case-insensitively).
    """
    direct_key = (request.headers.get("x-api-key") or "").strip()
    if direct_key:
        return direct_key
    auth_header = (request.headers.get("authorization") or "").strip()
    scheme = "bearer "
    if not auth_header.lower().startswith(scheme):
        return ""
    return auth_header[len(scheme):].strip()
def require_api_key(request: Request) -> None:
    """Reject the request unless it carries one of the configured API keys.

    When no API keys are configured the check is skipped and every request
    is allowed.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate`` header when the
            request has no key or the key matches none of the configured
            ones.
    """
    expected_keys = get_effective_auth_settings(request).api_keys
    if not expected_keys:
        return
    provided = _extract_request_api_key(request)
    if provided:
        # compare_digest raises TypeError for str arguments that contain
        # non-ASCII characters, which would turn a malformed header into a
        # 500. Comparing UTF-8 bytes keeps the constant-time comparison and
        # accepts any text.
        provided_bytes = provided.encode("utf-8")
        for expected in expected_keys:
            if secrets.compare_digest(provided_bytes, expected.encode("utf-8")):
                return
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Unauthorized. Provide a valid API key.",
        headers={"WWW-Authenticate": API_AUTH_REALM},
    )
def _is_hashed_config_secret(value: str) -> bool:
    """Return True when ``value`` starts with the encoded-secret prefix and ``$``."""
    marker = f"{CONFIG_SECRET_PREFIX}$"
    return value.startswith(marker)
def configured_config_secret_hash(repo: ConfigRepository | None = None) -> str:
    """Return the encoded config secret from freshly built auth settings."""
    effective = build_effective_auth_settings(repo)
    return effective.config_secret_hash
def config_login_enabled(request: Request | None = None) -> bool:
    """Return whether the effective settings have a config secret set."""
    effective = get_effective_auth_settings(request)
    return effective.config_login_enabled
def configured_config_login_max_failures() -> int:
    """Return ``auth.config_login_max_failures`` as an int of at least 1.

    Falls back to the default when the configured value cannot be
    converted to an int.
    """
    configured = get("auth", "config_login_max_failures", DEFAULT_ADMIN_LOGIN_MAX_FAILURES)
    try:
        parsed = int(configured)
    except Exception:
        return DEFAULT_ADMIN_LOGIN_MAX_FAILURES
    return max(1, parsed)
def configured_config_login_lock_seconds() -> int:
    """Return ``auth.config_login_lock_seconds`` as an int of at least 1.

    Falls back to the default when the configured value cannot be
    converted to an int.
    """
    configured = get("auth", "config_login_lock_seconds", DEFAULT_ADMIN_LOGIN_LOCK_SECONDS)
    try:
        parsed = int(configured)
    except Exception:
        return DEFAULT_ADMIN_LOGIN_LOCK_SECONDS
    return max(1, parsed)
def hash_config_secret(secret: str) -> str:
    """Hash ``secret`` with PBKDF2-HMAC-SHA256 and a fresh 16-byte salt.

    Returns ``<prefix>$<iterations>$<salt>$<digest>`` with salt and digest
    URL-safe base64 encoded. The salt is random, so two calls with the same
    secret give different strings.
    """
    salt = os.urandom(16)
    derived = hashlib.pbkdf2_hmac(
        "sha256", secret.encode("utf-8"), salt, CONFIG_SECRET_ITERATIONS
    )
    fields = (
        CONFIG_SECRET_PREFIX,
        str(CONFIG_SECRET_ITERATIONS),
        base64.urlsafe_b64encode(salt).decode("ascii"),
        base64.urlsafe_b64encode(derived).decode("ascii"),
    )
    return "$".join(fields)
def verify_config_secret(secret: str, encoded: str) -> bool:
    """Check a plain ``secret`` against an encoded config secret.

    ``encoded`` must have the form ``<prefix>$<iterations>$<salt>$<digest>``
    with URL-safe base64 salt and digest. Any malformed or unusable
    ``encoded`` value results in ``False`` rather than an exception.

    Returns:
        True only when the PBKDF2-HMAC-SHA256 digest of ``secret`` under the
        stored salt and iteration count equals the stored digest.
    """
    try:
        prefix, iterations_s, salt_b64, digest_b64 = encoded.split("$", 3)
    except ValueError:
        return False
    if prefix != CONFIG_SECRET_PREFIX:
        return False
    try:
        iterations = int(iterations_s)
        salt = base64.urlsafe_b64decode(salt_b64.encode("ascii"))
        expected = base64.urlsafe_b64decode(digest_b64.encode("ascii"))
    except ValueError:
        # Covers a non-numeric count, non-ASCII text and invalid base64.
        return False
    if iterations < 1:
        # pbkdf2_hmac rejects non-positive iteration counts with ValueError.
        return False
    try:
        actual = hashlib.pbkdf2_hmac(
            "sha256",
            secret.encode("utf-8"),
            salt,
            iterations,
        )
    except (ValueError, OverflowError):
        # An oversized iteration count or an unencodable secret cannot match.
        return False
    return hmac.compare_digest(actual, expected)
def ensure_config_secret_hashed(repo: ConfigRepository | None = None) -> None:
    """Replace a plain ``auth.config_secret`` in the config file with its hash.

    Nothing is written when the secret comes from an environment override
    (it is only hashed in memory), when ``repo`` already stores a config
    secret hash, when the configured secret is empty or already encoded, or
    when the config file does not exist. Otherwise the file text is edited
    in place, written through a ``.tmp`` sibling file, and the config cache
    is reset and reloaded.
    """
    if has_env_override("auth", "config_secret"):
        # Warm the memoized in-memory hash; the file is left alone.
        _hosted_config_secret_hash()
        return
    if repo is not None:
        if repo.get_app_setting(APP_SETTING_AUTH_CONFIG_SECRET_HASH) is not None:
            return

    auth_section = load_config().get("auth")
    if not isinstance(auth_section, dict):
        return
    plain = str(auth_section.get("config_secret") or "").strip()
    if not plain or _is_hashed_config_secret(plain):
        return

    hashed = hash_config_secret(plain)
    yaml_path = get_config_path()
    if not yaml_path.exists():
        return
    text = yaml_path.read_text(encoding="utf-8")

    secret_line = re.compile(r"^([ \t]*)config_secret\s*:\s*.*$", re.MULTILINE)
    # First line that is a config_secret entry; its indentation is reused.
    first_hit = next(
        (hit for hit in map(secret_line.match, text.splitlines()) if hit),
        None,
    )

    rewritten: str
    if first_hit is not None:
        new_line = f"{first_hit.group(1)}config_secret: '{hashed}'"
        rewritten, substitutions = secret_line.subn(new_line, text, count=1)
        if substitutions != 1:
            return
    else:
        auth_header = re.compile(r"^auth\s*:\s*$", re.MULTILINE).search(text)
        if auth_header:
            # Insert the entry right after the existing "auth:" header.
            cut = auth_header.end()
            rewritten = text[:cut] + "\n" + f"  config_secret: '{hashed}'" + text[cut:]
        else:
            # No auth section found in the text: append a new one.
            glue = "" if text.endswith("\n") or not text else "\n"
            rewritten = text + glue + "auth:\n" + f"  config_secret: '{hashed}'\n"

    # Write to a sibling file first, then swap it over the original.
    scratch_path = yaml_path.with_suffix(yaml_path.suffix + ".tmp")
    scratch_path.write_text(rewritten, encoding="utf-8")
    scratch_path.replace(yaml_path)
    reset_cache()
    load_config()
@dataclass
class AdminSessionStore:
    """In-memory admin session tokens, each mapped to its expiry timestamp."""

    # Lifetime given to every newly created session, in seconds.
    ttl_seconds: int = DEFAULT_ADMIN_SESSION_TTL_SECONDS
    # token -> expiry time on the ``time.time()`` clock.
    _sessions: dict[str, float] = field(default_factory=dict)

    def create(self) -> str:
        """Register a new random session token and return it."""
        new_token = secrets.token_urlsafe(32)
        self._sessions[new_token] = time.time() + self.ttl_seconds
        return new_token

    def is_valid(self, token: str) -> bool:
        """Return True when ``token`` is known and has not expired."""
        if not token:
            return False
        self.cleanup()
        deadline = self._sessions.get(token)
        if deadline is None:
            return False
        if deadline >= time.time():
            return True
        # Expired between the cleanup pass and this check.
        self._sessions.pop(token, None)
        return False

    def revoke(self, token: str) -> None:
        """Forget ``token``; unknown or empty tokens are ignored."""
        if not token:
            return
        self._sessions.pop(token, None)

    def cleanup(self) -> None:
        """Drop every session whose expiry time has passed."""
        cutoff = time.time()
        stale_tokens = [
            known for known, deadline in self._sessions.items() if deadline < cutoff
        ]
        for stale in stale_tokens:
            self._sessions.pop(stale, None)
@dataclass
class LoginAttemptState:
    """Mutable record of failed admin login attempts for one client."""

    # Failures counted so far.
    failures: int = 0
    # Time until which the client is locked out; 0.0 when never locked.
    locked_until: float = 0.0
    # Time of the most recently recorded failure.
    last_seen: float = 0.0
@dataclass
class AdminLoginAttemptStore:
    """In-memory tracker of failed admin logins, keyed by client IP.

    After ``max_failures`` failures a client is locked for ``lock_seconds``.
    """

    # Failures allowed before a lock is applied.
    max_failures: int = DEFAULT_ADMIN_LOGIN_MAX_FAILURES
    # Duration of a lock, in seconds.
    lock_seconds: int = DEFAULT_ADMIN_LOGIN_LOCK_SECONDS
    # client IP -> attempt state.
    _attempts: dict[str, LoginAttemptState] = field(default_factory=dict)

    @staticmethod
    def _seconds_remaining(locked_until: float, now: float) -> int:
        """Return the whole seconds left on a lock, rounded up; 0 if expired.

        Rounding up matters: truncating would report 0 during the final
        fraction of a second, so an active lock would look like no lock.
        """
        remaining = locked_until - now
        if remaining <= 0:
            return 0
        whole = int(remaining)
        return whole if whole == remaining else whole + 1

    def is_locked(self, client_ip: str) -> int:
        """Return the seconds left on the lock for ``client_ip``, or 0.

        A positive value is returned for as long as the lock is active.
        """
        self.cleanup()
        state = self._attempts.get(client_ip)
        if state is None:
            return 0
        return self._seconds_remaining(state.locked_until, time.time())

    def record_failure(self, client_ip: str) -> int:
        """Count a failed login for ``client_ip``.

        Returns:
            The seconds the client is locked for. This is the time left on
            an existing lock, ``lock_seconds`` when this failure triggers a
            new lock, or 0 when the client is not locked.
        """
        now = time.time()
        state = self._attempts.setdefault(client_ip, LoginAttemptState())
        state.last_seen = now
        if state.locked_until > now:
            # Already locked: report the remaining time without counting.
            return self._seconds_remaining(state.locked_until, now)
        state.failures += 1
        if state.failures >= self.max_failures:
            # Start a new lock and begin counting afresh once it ends.
            state.failures = 0
            state.locked_until = now + self.lock_seconds
            return self.lock_seconds
        return 0

    def record_success(self, client_ip: str) -> None:
        """Forget all recorded failures for ``client_ip``."""
        self._attempts.pop(client_ip, None)

    def cleanup(self) -> None:
        """Drop entries that are unlocked and have been idle for a long time.

        An entry is stale once its last failure is older than twice the
        lock duration, with a floor of one hour.
        """
        now = time.time()
        stale_before = now - max(self.lock_seconds * 2, 3600)
        expired = [
            ip
            for ip, state in self._attempts.items()
            if state.locked_until <= now and state.last_seen < stale_before
        ]
        for ip in expired:
            self._attempts.pop(ip, None)
def _admin_store(request: Request) -> AdminSessionStore:
    """Return the session store kept on ``request.app.state.admin_sessions``.

    Raises:
        HTTPException: 503 when the attribute is missing or ``None``.
    """
    sessions = getattr(request.app.state, "admin_sessions", None)
    if sessions is not None:
        return sessions
    raise HTTPException(status_code=503, detail="Admin session store is unavailable")
def _admin_login_attempt_store(request: Request) -> AdminLoginAttemptStore:
    """Return the limiter kept on ``request.app.state.admin_login_attempts``.

    Raises:
        HTTPException: 503 when the attribute is missing or ``None``.
    """
    limiter = getattr(request.app.state, "admin_login_attempts", None)
    if limiter is not None:
        return limiter
    raise HTTPException(status_code=503, detail="Login rate limiter is unavailable")
def client_ip_of(request: Request) -> str:
    """Return ``request.client.host`` as a string, or ``"unknown"``.

    ``"unknown"`` is used when the request has no client, the client has no
    host, or the host is empty.
    """
    peer = getattr(request, "client", None)
    address = getattr(peer, "host", None)
    if not address:
        return "unknown"
    return str(address)
def check_admin_login_rate_limit(request: Request) -> None:
    """Refuse the login attempt while the client's address is locked out.

    Raises:
        HTTPException: 429 carrying the remaining lock time in seconds.
    """
    limiter = _admin_login_attempt_store(request)
    remaining = limiter.is_locked(client_ip_of(request))
    if remaining <= 0:
        return
    raise HTTPException(
        status_code=status.HTTP_429_TOO_MANY_REQUESTS,
        detail=f"Too many failed login attempts. Try again in {remaining} seconds.",
    )
def record_admin_login_failure(request: Request) -> int:
    """Record a failed login for the request's client and return the limiter's result."""
    limiter = _admin_login_attempt_store(request)
    return limiter.record_failure(client_ip_of(request))
def record_admin_login_success(request: Request) -> None:
    """Clear the recorded login failures for the request's client."""
    limiter = _admin_login_attempt_store(request)
    limiter.record_success(client_ip_of(request))
def admin_logged_in(request: Request) -> bool:
    """Return True when the request carries a valid admin session cookie.

    Always False while the config login is disabled.
    """
    if not config_login_enabled(request):
        return False
    cookie_value = request.cookies.get(ADMIN_SESSION_COOKIE) or ""
    session_token = cookie_value.strip()
    return _admin_store(request).is_valid(session_token)
def require_config_login_enabled(request: Request | None = None) -> None:
    """Ensure the config login is enabled.

    Raises:
        HTTPException: 404 when the config login is disabled.
    """
    if config_login_enabled(request):
        return
    raise HTTPException(status_code=404, detail="Config dashboard is disabled")
def require_config_login(request: Request) -> None:
    """Allow the request only for a signed-in admin.

    Raises:
        HTTPException: 404 when the config login is disabled, 401 when the
            request has no valid admin session.
    """
    require_config_login_enabled(request)
    if not admin_logged_in(request):
        raise HTTPException(
            status_code=401,
            detail="Please sign in to access the config dashboard",
        )