| from __future__ import annotations |
|
|
| from contextlib import asynccontextmanager |
| from html import escape |
| from mimetypes import guess_type |
| from pathlib import Path |
| import re |
| from threading import Event |
|
|
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import FileResponse, HTMLResponse, Response |
|
|
| from api import accounts, ai, image_tasks, register, shop, system |
| from api.errors import install_exception_handlers |
| from api.support import resolve_web_asset, start_limited_account_watcher |
| from services.backup_service import backup_service |
| from services.config import config |
|
|
| STATIC_CACHE_CONTROL = "public, max-age=31536000, immutable" |
| ASSET_CACHE_CONTROL = "public, max-age=86400" |
| HTML_CACHE_CONTROL = "no-cache" |
| COMPRESSIBLE_SUFFIXES = {".css", ".html", ".js", ".json", ".map", ".svg", ".txt", ".xml"} |
| TITLE_RE = re.compile(r"<title>.*?</title>", re.IGNORECASE | re.DOTALL) |
| ICON_LINK_RE = re.compile(r"<link\b[^>]*\brel=[\"'][^\"']*(?:icon|apple-touch-icon)[^\"']*[\"'][^>]*>", re.IGNORECASE) |
|
|
|
|
| def _web_cache_headers(full_path: str, asset: Path) -> dict[str, str]: |
| clean_path = full_path.strip("/") |
| suffix = asset.suffix.lower() |
| headers: dict[str, str] = {} |
| if clean_path.startswith("_next/static/"): |
| headers["Cache-Control"] = STATIC_CACHE_CONTROL |
| elif suffix in {".html", ".txt"}: |
| headers["Cache-Control"] = HTML_CACHE_CONTROL |
| else: |
| headers["Cache-Control"] = ASSET_CACHE_CONTROL |
| return headers |
|
|
|
|
| def _select_web_asset(request: Request, full_path: str, asset: Path) -> tuple[Path, dict[str, str], str | None]: |
| headers = _web_cache_headers(full_path, asset) |
| media_type = guess_type(str(asset))[0] |
| accept_encoding = str(request.headers.get("accept-encoding") or "").lower() |
| gzip_asset = asset.with_name(f"{asset.name}.gz") |
| is_compressible = asset.suffix.lower() in COMPRESSIBLE_SUFFIXES |
| if is_compressible: |
| headers = dict(headers) |
| headers["Vary"] = "Accept-Encoding" |
| if "gzip" in accept_encoding and is_compressible and gzip_asset.is_file(): |
| headers = dict(headers) |
| headers["Content-Encoding"] = "gzip" |
| return gzip_asset, headers, media_type |
| return asset, headers, media_type |
|
|
|
|
| def _is_api_like_path(full_path: str) -> bool: |
| clean_path = full_path.strip("/") |
| return clean_path.startswith(("api/", "auth/", "v1/")) |
|
|
|
|
| def _guess_icon_type(href: str) -> str: |
| clean_href = href.split("?", 1)[0].split("#", 1)[0].lower() |
| if clean_href.endswith(".svg"): |
| return "image/svg+xml" |
| if clean_href.endswith((".jpg", ".jpeg")): |
| return "image/jpeg" |
| if clean_href.endswith(".webp"): |
| return "image/webp" |
| if clean_href.endswith(".ico"): |
| return "image/x-icon" |
| return "image/png" |
|
|
|
|
| def _inject_branding_head(html_text: str) -> str: |
| branding = config.get_branding() |
| title = escape(str(branding.get("site_title") or "ChatGPT 号池管理"), quote=False) |
| raw_icon_href = str(branding.get("site_logo_url") or "/favicon.ico") |
| icon_href = escape(raw_icon_href, quote=True) |
| icon_type = escape(_guess_icon_type(raw_icon_href), quote=True) |
| icon_markup = ( |
| f'<link rel="icon" href="{icon_href}" type="{icon_type}" data-branding-icon="favicon">' |
| f'<link rel="apple-touch-icon" href="{icon_href}" data-branding-icon="apple-touch-icon">' |
| ) |
| next_html = TITLE_RE.sub(f"<title>{title}</title>", html_text, count=1) |
| if next_html == html_text and "</head>" in next_html: |
| next_html = next_html.replace("</head>", f"<title>{title}</title></head>", 1) |
| next_html = ICON_LINK_RE.sub("", next_html) |
| if "</head>" in next_html: |
| next_html = next_html.replace("</head>", f"{icon_markup}</head>", 1) |
| return next_html |
|
|
|
|
| def _web_response(full_path: str, request: Request, asset: Path) -> Response: |
| if asset.suffix.lower() == ".html": |
| html_text = asset.read_text(encoding="utf-8") |
| return HTMLResponse( |
| _inject_branding_head(html_text), |
| headers=_web_cache_headers(full_path, asset), |
| ) |
| selected_asset, headers, media_type = _select_web_asset(request, full_path, asset) |
| return FileResponse(selected_asset, headers=headers, media_type=media_type) |
|
|
|
|
| def create_app() -> FastAPI: |
| app_version = config.app_version |
|
|
| @asynccontextmanager |
| async def lifespan(_: FastAPI): |
| stop_event = Event() |
| thread = start_limited_account_watcher(stop_event) |
| backup_service.start() |
| config.cleanup_old_images() |
| try: |
| yield |
| finally: |
| stop_event.set() |
| thread.join(timeout=1) |
| backup_service.stop() |
|
|
| app = FastAPI(title="chatgpt2api", version=app_version, lifespan=lifespan) |
| install_exception_handlers(app) |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=False, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
| app.include_router(ai.create_router()) |
| app.include_router(accounts.create_router()) |
| app.include_router(image_tasks.create_router()) |
| app.include_router(register.create_router()) |
| app.include_router(shop.create_router()) |
| app.include_router(system.create_router(app_version)) |
|
|
| @app.get("/{full_path:path}", include_in_schema=False) |
| async def serve_web(full_path: str, request: Request): |
| asset = resolve_web_asset(full_path) |
| if asset is not None: |
| return _web_response(full_path, request, asset) |
| if full_path.strip("/").startswith("_next/"): |
| raise HTTPException(status_code=404, detail="Not Found") |
| fallback = resolve_web_asset("") |
| if fallback is None: |
| raise HTTPException(status_code=404, detail="Not Found") |
| return _web_response("", request, fallback) |
|
|
| @app.head("/{full_path:path}", include_in_schema=False) |
| async def serve_web_head(full_path: str, request: Request): |
| if _is_api_like_path(full_path): |
| raise HTTPException(status_code=405, detail="Method Not Allowed") |
| asset = resolve_web_asset(full_path) |
| if asset is not None: |
| selected_asset, headers, media_type = _select_web_asset(request, full_path, asset) |
| return FileResponse(selected_asset, headers=headers, media_type=media_type) |
| if full_path.strip("/").startswith("_next/"): |
| raise HTTPException(status_code=404, detail="Not Found") |
| fallback = resolve_web_asset("") |
| if fallback is None: |
| raise HTTPException(status_code=404, detail="Not Found") |
| selected_asset, headers, media_type = _select_web_asset(request, "", fallback) |
| return FileResponse(selected_asset, headers=headers, media_type=media_type) |
|
|
| return app |
|
|