chatgpt2api / api /support.py
tx1538's picture
Upload 179 files
9d7ddb9 verified
Raw
History Blame
4.26 kB
from __future__ import annotations
from pathlib import Path
from threading import Event, Thread
from fastapi import HTTPException, Request
from services.account_service import account_service
from services.auth_service import auth_service
from services.config import config
BASE_DIR = Path(__file__).resolve().parents[1]
WEB_DIST_DIR = BASE_DIR / "web_dist"
def extract_bearer_token(authorization: str | None) -> str:
scheme, _, value = str(authorization or "").partition(" ")
if scheme.lower() != "bearer" or not value.strip():
return ""
return value.strip()
def _legacy_admin_identity(token: str) -> dict[str, object] | None:
auth_key = str(config.auth_key or "").strip()
if auth_key and token == auth_key:
return {"id": "admin", "name": "管理员", "role": "admin"}
return None
def require_identity(authorization: str | None) -> dict[str, object]:
token = extract_bearer_token(authorization)
identity = _legacy_admin_identity(token) or auth_service.authenticate(token)
if identity is None:
raise HTTPException(status_code=401, detail={"error": "密钥无效或已失效,请重新登录"})
return identity
def require_auth_key(authorization: str | None) -> None:
require_identity(authorization)
def require_admin(authorization: str | None) -> dict[str, object]:
identity = require_identity(authorization)
if identity.get("role") != "admin":
raise HTTPException(status_code=403, detail={"error": "需要管理员权限才能执行这个操作"})
return identity
def resolve_image_base_url(request: Request) -> str:
return config.base_url or f"{request.url.scheme}://{request.headers.get('host', request.url.netloc)}"
def raise_image_quota_error(exc: Exception) -> None:
message = str(exc)
if "no available image quota" in message.lower():
raise HTTPException(status_code=429, detail={"error": "no available image quota"}) from exc
raise HTTPException(status_code=502, detail={"error": message}) from exc
def sanitize_cpa_pool(pool: dict | None) -> dict | None:
if not isinstance(pool, dict):
return None
return {key: value for key, value in pool.items() if key != "secret_key"}
def sanitize_cpa_pools(pools: list[dict]) -> list[dict]:
return [sanitized for pool in pools if (sanitized := sanitize_cpa_pool(pool)) is not None]
def sanitize_sub2api_server(server: dict | None) -> dict | None:
if not isinstance(server, dict):
return None
sanitized = {key: value for key, value in server.items() if key not in {"password", "api_key"}}
sanitized["has_api_key"] = bool(str(server.get("api_key") or "").strip())
return sanitized
def sanitize_sub2api_servers(servers: list[dict]) -> list[dict]:
return [sanitized for server in servers if (sanitized := sanitize_sub2api_server(server)) is not None]
def start_limited_account_watcher(stop_event: Event) -> Thread:
interval_seconds = config.refresh_account_interval_minute * 60
def worker() -> None:
while not stop_event.is_set():
try:
limited_tokens = account_service.list_limited_tokens()
if limited_tokens:
print(f"[account-limited-watcher] checking {len(limited_tokens)} limited accounts")
account_service.refresh_accounts(limited_tokens)
except Exception as exc:
print(f"[account-limited-watcher] fail {exc}")
stop_event.wait(interval_seconds)
thread = Thread(target=worker, name="limited-account-watcher", daemon=True)
thread.start()
return thread
def resolve_web_asset(requested_path: str) -> Path | None:
if not WEB_DIST_DIR.exists():
return None
clean_path = requested_path.strip("/")
base_dir = WEB_DIST_DIR.resolve()
candidates = [base_dir / "index.html"] if not clean_path else [
base_dir / Path(clean_path),
base_dir / clean_path / "index.html",
base_dir / f"{clean_path}.html",
]
for candidate in candidates:
try:
candidate.resolve().relative_to(base_dir)
except ValueError:
continue
if candidate.is_file():
return candidate
return None