li / api /app.py
xiaolongmr
Add configurable branding and login sessions
519f85d
Raw
History Blame Contribute Delete
6.83 kB
from __future__ import annotations
from contextlib import asynccontextmanager
from html import escape
from mimetypes import guess_type
from pathlib import Path
import re
from threading import Event
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, Response
from api import accounts, ai, image_tasks, register, shop, system
from api.errors import install_exception_handlers
from api.support import resolve_web_asset, start_limited_account_watcher
from services.backup_service import backup_service
from services.config import config
STATIC_CACHE_CONTROL = "public, max-age=31536000, immutable"
ASSET_CACHE_CONTROL = "public, max-age=86400"
HTML_CACHE_CONTROL = "no-cache"
COMPRESSIBLE_SUFFIXES = {".css", ".html", ".js", ".json", ".map", ".svg", ".txt", ".xml"}
TITLE_RE = re.compile(r"<title>.*?</title>", re.IGNORECASE | re.DOTALL)
ICON_LINK_RE = re.compile(r"<link\b[^>]*\brel=[\"'][^\"']*(?:icon|apple-touch-icon)[^\"']*[\"'][^>]*>", re.IGNORECASE)
def _web_cache_headers(full_path: str, asset: Path) -> dict[str, str]:
clean_path = full_path.strip("/")
suffix = asset.suffix.lower()
headers: dict[str, str] = {}
if clean_path.startswith("_next/static/"):
headers["Cache-Control"] = STATIC_CACHE_CONTROL
elif suffix in {".html", ".txt"}:
headers["Cache-Control"] = HTML_CACHE_CONTROL
else:
headers["Cache-Control"] = ASSET_CACHE_CONTROL
return headers
def _select_web_asset(request: Request, full_path: str, asset: Path) -> tuple[Path, dict[str, str], str | None]:
headers = _web_cache_headers(full_path, asset)
media_type = guess_type(str(asset))[0]
accept_encoding = str(request.headers.get("accept-encoding") or "").lower()
gzip_asset = asset.with_name(f"{asset.name}.gz")
is_compressible = asset.suffix.lower() in COMPRESSIBLE_SUFFIXES
if is_compressible:
headers = dict(headers)
headers["Vary"] = "Accept-Encoding"
if "gzip" in accept_encoding and is_compressible and gzip_asset.is_file():
headers = dict(headers)
headers["Content-Encoding"] = "gzip"
return gzip_asset, headers, media_type
return asset, headers, media_type
def _is_api_like_path(full_path: str) -> bool:
clean_path = full_path.strip("/")
return clean_path.startswith(("api/", "auth/", "v1/"))
def _guess_icon_type(href: str) -> str:
clean_href = href.split("?", 1)[0].split("#", 1)[0].lower()
if clean_href.endswith(".svg"):
return "image/svg+xml"
if clean_href.endswith((".jpg", ".jpeg")):
return "image/jpeg"
if clean_href.endswith(".webp"):
return "image/webp"
if clean_href.endswith(".ico"):
return "image/x-icon"
return "image/png"
def _inject_branding_head(html_text: str) -> str:
branding = config.get_branding()
title = escape(str(branding.get("site_title") or "ChatGPT 号池管理"), quote=False)
raw_icon_href = str(branding.get("site_logo_url") or "/favicon.ico")
icon_href = escape(raw_icon_href, quote=True)
icon_type = escape(_guess_icon_type(raw_icon_href), quote=True)
icon_markup = (
f'<link rel="icon" href="{icon_href}" type="{icon_type}" data-branding-icon="favicon">'
f'<link rel="apple-touch-icon" href="{icon_href}" data-branding-icon="apple-touch-icon">'
)
next_html = TITLE_RE.sub(f"<title>{title}</title>", html_text, count=1)
if next_html == html_text and "</head>" in next_html:
next_html = next_html.replace("</head>", f"<title>{title}</title></head>", 1)
next_html = ICON_LINK_RE.sub("", next_html)
if "</head>" in next_html:
next_html = next_html.replace("</head>", f"{icon_markup}</head>", 1)
return next_html
def _web_response(full_path: str, request: Request, asset: Path) -> Response:
if asset.suffix.lower() == ".html":
html_text = asset.read_text(encoding="utf-8")
return HTMLResponse(
_inject_branding_head(html_text),
headers=_web_cache_headers(full_path, asset),
)
selected_asset, headers, media_type = _select_web_asset(request, full_path, asset)
return FileResponse(selected_asset, headers=headers, media_type=media_type)
def create_app() -> FastAPI:
app_version = config.app_version
@asynccontextmanager
async def lifespan(_: FastAPI):
stop_event = Event()
thread = start_limited_account_watcher(stop_event)
backup_service.start()
config.cleanup_old_images()
try:
yield
finally:
stop_event.set()
thread.join(timeout=1)
backup_service.stop()
app = FastAPI(title="chatgpt2api", version=app_version, lifespan=lifespan)
install_exception_handlers(app)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(ai.create_router())
app.include_router(accounts.create_router())
app.include_router(image_tasks.create_router())
app.include_router(register.create_router())
app.include_router(shop.create_router())
app.include_router(system.create_router(app_version))
@app.get("/{full_path:path}", include_in_schema=False)
async def serve_web(full_path: str, request: Request):
asset = resolve_web_asset(full_path)
if asset is not None:
return _web_response(full_path, request, asset)
if full_path.strip("/").startswith("_next/"):
raise HTTPException(status_code=404, detail="Not Found")
fallback = resolve_web_asset("")
if fallback is None:
raise HTTPException(status_code=404, detail="Not Found")
return _web_response("", request, fallback)
@app.head("/{full_path:path}", include_in_schema=False)
async def serve_web_head(full_path: str, request: Request):
if _is_api_like_path(full_path):
raise HTTPException(status_code=405, detail="Method Not Allowed")
asset = resolve_web_asset(full_path)
if asset is not None:
selected_asset, headers, media_type = _select_web_asset(request, full_path, asset)
return FileResponse(selected_asset, headers=headers, media_type=media_type)
if full_path.strip("/").startswith("_next/"):
raise HTTPException(status_code=404, detail="Not Found")
fallback = resolve_web_asset("")
if fallback is None:
raise HTTPException(status_code=404, detail="Not Found")
selected_asset, headers, media_type = _select_web_asset(request, "", fallback)
return FileResponse(selected_asset, headers=headers, media_type=media_type)
return app