| from __future__ import annotations |
|
|
| from pathlib import Path |
| from threading import Event, Thread |
|
|
| from fastapi import HTTPException, Request |
|
|
| from services.account_service import account_service |
| from services.auth_service import auth_service |
| from services.config import config |
|
|
# Directory one level above the folder that contains this file.
BASE_DIR = Path(__file__).resolve().parents[1]
# Folder under BASE_DIR that resolve_web_asset() searches for web files.
WEB_DIST_DIR = BASE_DIR.joinpath("web_dist")
|
|
|
|
def extract_bearer_token(authorization: str | None) -> str:
    """Pull the token out of a ``Bearer <token>`` authorization value.

    Args:
        authorization: Raw header value, or ``None`` when the header is absent.

    Returns:
        The token with surrounding whitespace removed, or ``""`` when the
        value is missing, the scheme is not ``bearer`` (compared
        case-insensitively), or nothing but whitespace follows the scheme.
    """
    header = str(authorization or "")
    # Split on the first space only: everything after it belongs to the token.
    scheme, _separator, remainder = header.partition(" ")
    token = remainder.strip()
    if scheme.lower() == "bearer" and token:
        return token
    return ""
|
|
|
|
def _legacy_admin_identity(token: str) -> dict[str, object] | None:
    """Return the built-in admin identity when ``token`` equals ``config.auth_key``.

    Args:
        token: Bearer token presented by the caller.

    Returns:
        The fixed admin identity dict when a non-empty ``config.auth_key`` is
        configured and matches ``token`` exactly; otherwise ``None``.
    """
    import hmac

    auth_key = str(config.auth_key or "").strip()
    if not auth_key or not isinstance(token, str):
        return None
    # Compare in constant time so response timing does not reveal how much of
    # the secret key a guess got right.  Encoding to bytes is required because
    # compare_digest() rejects non-ASCII str; "surrogatepass" keeps the
    # encoding from failing on any str value.
    presented = token.encode("utf-8", "surrogatepass")
    expected = auth_key.encode("utf-8", "surrogatepass")
    if hmac.compare_digest(presented, expected):
        return {"id": "admin", "name": "管理员", "role": "admin"}
    return None
|
|
|
|
def require_identity(authorization: str | None) -> dict[str, object]:
    """Resolve the caller's identity from an authorization header value.

    The bearer token is first offered to ``_legacy_admin_identity``; only when
    that yields nothing is ``auth_service.authenticate`` consulted.

    Args:
        authorization: Raw header value, or ``None`` when absent.

    Returns:
        The identity produced by whichever lookup succeeded.

    Raises:
        HTTPException: status 401 when the resulting identity is ``None``.
    """
    bearer = extract_bearer_token(authorization)
    resolved = _legacy_admin_identity(bearer)
    if not resolved:
        resolved = auth_service.authenticate(bearer)
    if resolved is not None:
        return resolved
    raise HTTPException(status_code=401, detail={"error": "密钥无效或已失效,请重新登录"})
|
|
|
|
def require_auth_key(authorization: str | None) -> None:
    """Check that ``authorization`` maps to an identity, discarding the result.

    Delegates to ``require_identity`` and lets any exception it raises
    propagate to the caller.
    """
    require_identity(authorization)
    return None
|
|
|
|
def require_admin(authorization: str | None) -> dict[str, object]:
    """Resolve the caller's identity and insist that its role is ``admin``.

    Args:
        authorization: Raw header value, or ``None`` when absent.

    Returns:
        The identity returned by ``require_identity``.

    Raises:
        HTTPException: status 403 when the identity's ``role`` is not
            ``"admin"``; errors from ``require_identity`` propagate unchanged.
    """
    caller = require_identity(authorization)
    is_admin = caller.get("role") == "admin"
    if is_admin:
        return caller
    raise HTTPException(status_code=403, detail={"error": "需要管理员权限才能执行这个操作"})
|
|
|
|
def resolve_image_base_url(request: Request) -> str:
    """Return the base URL for image links.

    ``config.base_url`` wins when it is set.  Otherwise the URL is assembled
    from the request's scheme and its ``host`` header, falling back to the
    request URL's netloc when that header is missing.
    """
    configured = config.base_url
    if configured:
        return configured
    scheme = request.url.scheme
    host = request.headers.get("host", request.url.netloc)
    return f"{scheme}://{host}"
|
|
|
|
def raise_image_quota_error(exc: Exception) -> None:
    """Re-raise ``exc`` as an ``HTTPException``; this function never returns.

    Args:
        exc: The original failure; it is chained as the cause.

    Raises:
        HTTPException: status 429 with a fixed message when the text of
            ``exc`` mentions "no available image quota" (case-insensitive);
            otherwise status 502 carrying the text of ``exc``.
    """
    text = str(exc)
    quota_exhausted = "no available image quota" in text.lower()
    status = 429 if quota_exhausted else 502
    error = "no available image quota" if quota_exhausted else text
    raise HTTPException(status_code=status, detail={"error": error}) from exc
|
|
|
|
def sanitize_cpa_pool(pool: dict | None) -> dict | None:
    """Return a copy of ``pool`` without its ``secret_key`` entry.

    Args:
        pool: Pool record, or anything else.

    Returns:
        A new dict with every entry except ``secret_key``, or ``None`` when
        ``pool`` is not a dict.
    """
    if isinstance(pool, dict):
        visible = {}
        for field, content in pool.items():
            if field == "secret_key":
                continue
            visible[field] = content
        return visible
    return None
|
|
|
|
def sanitize_cpa_pools(pools: list[dict]) -> list[dict]:
    """Apply ``sanitize_cpa_pool`` to each entry, dropping entries it rejects.

    Args:
        pools: Pool records to clean.

    Returns:
        The sanitized records in their original order; entries for which
        ``sanitize_cpa_pool`` returns ``None`` are omitted.
    """
    cleaned = []
    for entry in pools:
        visible = sanitize_cpa_pool(entry)
        if visible is not None:
            cleaned.append(visible)
    return cleaned
|
|
|
|
def sanitize_sub2api_server(server: dict | None) -> dict | None:
    """Return a copy of ``server`` with its credentials removed.

    Args:
        server: Server record, or anything else.

    Returns:
        A new dict without the ``password`` and ``api_key`` entries, plus a
        ``has_api_key`` flag that is true when the original ``api_key`` is
        non-blank once stringified and stripped.  ``None`` when ``server`` is
        not a dict.
    """
    if not isinstance(server, dict):
        return None
    hidden_fields = {"password", "api_key"}
    visible = {}
    for field, content in server.items():
        if field not in hidden_fields:
            visible[field] = content
    # Report whether a key exists without revealing it.
    api_key_text = str(server.get("api_key") or "").strip()
    visible["has_api_key"] = bool(api_key_text)
    return visible
|
|
|
|
def sanitize_sub2api_servers(servers: list[dict]) -> list[dict]:
    """Apply ``sanitize_sub2api_server`` to each entry, dropping rejected ones.

    Args:
        servers: Server records to clean.

    Returns:
        The sanitized records in their original order; entries for which
        ``sanitize_sub2api_server`` returns ``None`` are omitted.
    """
    cleaned = []
    for entry in servers:
        visible = sanitize_sub2api_server(entry)
        if visible is not None:
            cleaned.append(visible)
    return cleaned
|
|
|
|
def start_limited_account_watcher(stop_event: Event) -> Thread:
    """Start a daemon thread that periodically refreshes limited accounts.

    On each cycle the thread asks ``account_service`` for the limited tokens
    and, when there are any, refreshes them.  Failures are printed and the
    loop carries on.  Between cycles the thread waits on ``stop_event`` for
    ``config.refresh_account_interval_minute`` minutes (read once, when this
    function is called), so setting the event ends the wait and the loop.

    Args:
        stop_event: Event that stops the watcher once set.

    Returns:
        The thread, already started.
    """
    pause_seconds = config.refresh_account_interval_minute * 60

    def refresh_limited_once() -> None:
        # One pass: look up limited tokens and refresh them if any exist.
        tokens = account_service.list_limited_tokens()
        if not tokens:
            return
        print(f"[account-limited-watcher] checking {len(tokens)} limited accounts")
        account_service.refresh_accounts(tokens)

    def watch_loop() -> None:
        while not stop_event.is_set():
            try:
                refresh_limited_once()
            except Exception as error:
                # Keep the watcher alive; a failed pass is only reported.
                print(f"[account-limited-watcher] fail {error}")
            stop_event.wait(pause_seconds)

    watcher = Thread(target=watch_loop, name="limited-account-watcher", daemon=True)
    watcher.start()
    return watcher
|
|
|
|
def resolve_web_asset(requested_path: str) -> Path | None:
    """Map a requested path to a file inside ``WEB_DIST_DIR``.

    For an empty path only ``index.html`` is tried.  Otherwise these
    candidates are tried in order: the path itself, ``<path>/index.html`` and
    ``<path>.html``.

    Args:
        requested_path: Path portion of the request; leading and trailing
            slashes are ignored.

    Returns:
        The first candidate that resolves to a location inside
        ``WEB_DIST_DIR`` and is a regular file, or ``None`` when the
        directory does not exist or no candidate qualifies.
    """
    if not WEB_DIST_DIR.exists():
        return None
    root = WEB_DIST_DIR.resolve()
    relative = requested_path.strip("/")
    if relative:
        candidates = [
            root / Path(relative),
            root / relative / "index.html",
            root / f"{relative}.html",
        ]
    else:
        candidates = [root / "index.html"]
    for candidate in candidates:
        try:
            # relative_to() raises ValueError when the resolved candidate
            # (after ".." and symlinks) lies outside the web directory.
            candidate.resolve().relative_to(root)
            if candidate.is_file():
                return candidate
        except (ValueError, OSError):
            # The path comes from the request, so an unusable one (e.g. a
            # component longer than the filesystem allows, which is_file()
            # re-raises as OSError on some Python versions) is treated as
            # "not found" instead of escaping as an unhandled error.
            continue
    return None
|
|