Spaces:
Paused
Paused
| from __future__ import annotations | |
| from pathlib import Path | |
| from threading import Event, Thread | |
| from fastapi import HTTPException, Request | |
| from services.account_service import account_service | |
| from services.auth_service import auth_service | |
| from services.config import config | |
| BASE_DIR = Path(__file__).resolve().parents[1] | |
| WEB_DIST_DIR = BASE_DIR / "web_dist" | |
| def extract_bearer_token(authorization: str | None) -> str: | |
| scheme, _, value = str(authorization or "").partition(" ") | |
| if scheme.lower() != "bearer" or not value.strip(): | |
| return "" | |
| return value.strip() | |
| def _legacy_admin_identity(token: str) -> dict[str, object] | None: | |
| auth_key = str(config.auth_key or "").strip() | |
| if auth_key and token == auth_key: | |
| return {"id": "admin", "name": "管理员", "role": "admin"} | |
| return None | |
| def require_identity(authorization: str | None) -> dict[str, object]: | |
| token = extract_bearer_token(authorization) | |
| identity = _legacy_admin_identity(token) or auth_service.authenticate(token) | |
| if identity is None: | |
| raise HTTPException(status_code=401, detail={"error": "密钥无效或已失效,请重新登录"}) | |
| return identity | |
| def require_auth_key(authorization: str | None) -> None: | |
| require_identity(authorization) | |
| def require_admin(authorization: str | None) -> dict[str, object]: | |
| identity = require_identity(authorization) | |
| if identity.get("role") != "admin": | |
| raise HTTPException(status_code=403, detail={"error": "需要管理员权限才能执行这个操作"}) | |
| return identity | |
| def resolve_image_base_url(request: Request) -> str: | |
| return config.base_url or f"{request.url.scheme}://{request.headers.get('host', request.url.netloc)}" | |
| def raise_image_quota_error(exc: Exception) -> None: | |
| message = str(exc) | |
| if "no available image quota" in message.lower(): | |
| raise HTTPException(status_code=429, detail={"error": "no available image quota"}) from exc | |
| raise HTTPException(status_code=502, detail={"error": message}) from exc | |
| def sanitize_cpa_pool(pool: dict | None) -> dict | None: | |
| if not isinstance(pool, dict): | |
| return None | |
| return {key: value for key, value in pool.items() if key != "secret_key"} | |
| def sanitize_cpa_pools(pools: list[dict]) -> list[dict]: | |
| return [sanitized for pool in pools if (sanitized := sanitize_cpa_pool(pool)) is not None] | |
| def sanitize_sub2api_server(server: dict | None) -> dict | None: | |
| if not isinstance(server, dict): | |
| return None | |
| sanitized = {key: value for key, value in server.items() if key not in {"password", "api_key"}} | |
| sanitized["has_api_key"] = bool(str(server.get("api_key") or "").strip()) | |
| return sanitized | |
| def sanitize_sub2api_servers(servers: list[dict]) -> list[dict]: | |
| return [sanitized for server in servers if (sanitized := sanitize_sub2api_server(server)) is not None] | |
| def start_limited_account_watcher(stop_event: Event) -> Thread: | |
| interval_seconds = config.refresh_account_interval_minute * 60 | |
| def worker() -> None: | |
| while not stop_event.is_set(): | |
| try: | |
| limited_tokens = account_service.list_limited_tokens() | |
| if limited_tokens: | |
| print(f"[account-limited-watcher] checking {len(limited_tokens)} limited accounts") | |
| account_service.refresh_accounts(limited_tokens) | |
| except Exception as exc: | |
| print(f"[account-limited-watcher] fail {exc}") | |
| stop_event.wait(interval_seconds) | |
| thread = Thread(target=worker, name="limited-account-watcher", daemon=True) | |
| thread.start() | |
| return thread | |
| def resolve_web_asset(requested_path: str) -> Path | None: | |
| if not WEB_DIST_DIR.exists(): | |
| return None | |
| clean_path = requested_path.strip("/") | |
| base_dir = WEB_DIST_DIR.resolve() | |
| candidates = [base_dir / "index.html"] if not clean_path else [ | |
| base_dir / Path(clean_path), | |
| base_dir / clean_path / "index.html", | |
| base_dir / f"{clean_path}.html", | |
| ] | |
| for candidate in candidates: | |
| try: | |
| candidate.resolve().relative_to(base_dir) | |
| except ValueError: | |
| continue | |
| if candidate.is_file(): | |
| return candidate | |
| return None | |