from __future__ import annotations from contextlib import asynccontextmanager from html import escape from mimetypes import guess_type from pathlib import Path import re from threading import Event from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, HTMLResponse, Response from api import accounts, ai, image_tasks, register, shop, system from api.errors import install_exception_handlers from api.support import resolve_web_asset, start_limited_account_watcher from services.backup_service import backup_service from services.config import config STATIC_CACHE_CONTROL = "public, max-age=31536000, immutable" ASSET_CACHE_CONTROL = "public, max-age=86400" HTML_CACHE_CONTROL = "no-cache" COMPRESSIBLE_SUFFIXES = {".css", ".html", ".js", ".json", ".map", ".svg", ".txt", ".xml"} TITLE_RE = re.compile(r".*?", re.IGNORECASE | re.DOTALL) ICON_LINK_RE = re.compile(r"]*\brel=[\"'][^\"']*(?:icon|apple-touch-icon)[^\"']*[\"'][^>]*>", re.IGNORECASE) def _web_cache_headers(full_path: str, asset: Path) -> dict[str, str]: clean_path = full_path.strip("/") suffix = asset.suffix.lower() headers: dict[str, str] = {} if clean_path.startswith("_next/static/"): headers["Cache-Control"] = STATIC_CACHE_CONTROL elif suffix in {".html", ".txt"}: headers["Cache-Control"] = HTML_CACHE_CONTROL else: headers["Cache-Control"] = ASSET_CACHE_CONTROL return headers def _select_web_asset(request: Request, full_path: str, asset: Path) -> tuple[Path, dict[str, str], str | None]: headers = _web_cache_headers(full_path, asset) media_type = guess_type(str(asset))[0] accept_encoding = str(request.headers.get("accept-encoding") or "").lower() gzip_asset = asset.with_name(f"{asset.name}.gz") is_compressible = asset.suffix.lower() in COMPRESSIBLE_SUFFIXES if is_compressible: headers = dict(headers) headers["Vary"] = "Accept-Encoding" if "gzip" in accept_encoding and is_compressible and gzip_asset.is_file(): headers = dict(headers) headers["Content-Encoding"] = "gzip" return gzip_asset, headers, media_type return asset, headers, media_type def _is_api_like_path(full_path: str) -> bool: clean_path = full_path.strip("/") return clean_path.startswith(("api/", "auth/", "v1/")) def _guess_icon_type(href: str) -> str: clean_href = href.split("?", 1)[0].split("#", 1)[0].lower() if clean_href.endswith(".svg"): return "image/svg+xml" if clean_href.endswith((".jpg", ".jpeg")): return "image/jpeg" if clean_href.endswith(".webp"): return "image/webp" if clean_href.endswith(".ico"): return "image/x-icon" return "image/png" def _inject_branding_head(html_text: str) -> str: branding = config.get_branding() title = escape(str(branding.get("site_title") or "ChatGPT 号池管理"), quote=False) raw_icon_href = str(branding.get("site_logo_url") or "/favicon.ico") icon_href = escape(raw_icon_href, quote=True) icon_type = escape(_guess_icon_type(raw_icon_href), quote=True) icon_markup = ( f'' f'' ) next_html = TITLE_RE.sub(f"{title}", html_text, count=1) if next_html == html_text and "" in next_html: next_html = next_html.replace("", f"{title}", 1) next_html = ICON_LINK_RE.sub("", next_html) if "" in next_html: next_html = next_html.replace("", f"{icon_markup}", 1) return next_html def _web_response(full_path: str, request: Request, asset: Path) -> Response: if asset.suffix.lower() == ".html": html_text = asset.read_text(encoding="utf-8") return HTMLResponse( _inject_branding_head(html_text), headers=_web_cache_headers(full_path, asset), ) selected_asset, headers, media_type = _select_web_asset(request, full_path, asset) return FileResponse(selected_asset, headers=headers, media_type=media_type) def create_app() -> FastAPI: app_version = config.app_version @asynccontextmanager async def lifespan(_: FastAPI): stop_event = Event() thread = start_limited_account_watcher(stop_event) backup_service.start() config.cleanup_old_images() try: yield finally: stop_event.set() thread.join(timeout=1) backup_service.stop() app = FastAPI(title="chatgpt2api", version=app_version, lifespan=lifespan) install_exception_handlers(app) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=False, allow_methods=["*"], allow_headers=["*"], ) app.include_router(ai.create_router()) app.include_router(accounts.create_router()) app.include_router(image_tasks.create_router()) app.include_router(register.create_router()) app.include_router(shop.create_router()) app.include_router(system.create_router(app_version)) @app.get("/{full_path:path}", include_in_schema=False) async def serve_web(full_path: str, request: Request): asset = resolve_web_asset(full_path) if asset is not None: return _web_response(full_path, request, asset) if full_path.strip("/").startswith("_next/"): raise HTTPException(status_code=404, detail="Not Found") fallback = resolve_web_asset("") if fallback is None: raise HTTPException(status_code=404, detail="Not Found") return _web_response("", request, fallback) @app.head("/{full_path:path}", include_in_schema=False) async def serve_web_head(full_path: str, request: Request): if _is_api_like_path(full_path): raise HTTPException(status_code=405, detail="Method Not Allowed") asset = resolve_web_asset(full_path) if asset is not None: selected_asset, headers, media_type = _select_web_asset(request, full_path, asset) return FileResponse(selected_asset, headers=headers, media_type=media_type) if full_path.strip("/").startswith("_next/"): raise HTTPException(status_code=404, detail="Not Found") fallback = resolve_web_asset("") if fallback is None: raise HTTPException(status_code=404, detail="Not Found") selected_asset, headers, media_type = _select_web_asset(request, "", fallback) return FileResponse(selected_asset, headers=headers, media_type=media_type) return app