from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Request from fastapi.responses import StreamingResponse, Response import httpx # 使用 httpx 进行异步请求,或者也可以用 requests import io # Civitai API 基础 URL CIVITAI_API_BASE_URL = "https://civitai.com/api/v1" # 创建一个可复用的 HTTP 客户端 (推荐用于生产环境以优化性能) client = httpx.AsyncClient(follow_redirects=True) # 启用自动跟踪重定向 @asynccontextmanager async def lifespan(_: FastAPI): # 启动时的代码 yield # 关闭时的代码 await client.aclose() app = FastAPI(lifespan=lifespan) @app.get("/") async def read_root(): return {"message": "Civitai Proxy API is running!"} # 保留一些扩展功能的代理端点 @app.get("/proxy/image") async def get_civitai_image(image_url: str): """ 获取图像的代理端点(保留兼容性) """ if not image_url or not image_url.startswith("https://image.civitai.com"): raise HTTPException(status_code=400, detail="Invalid or missing image_url parameter. Must be a Civitai image URL.") try: print(f"Proxying image request to {image_url}") response = await client.get(image_url, timeout=30.0) # 增加图片下载超时 response.raise_for_status() # 获取内容类型 content_type = response.headers.get("content-type", "application/octet-stream") # 将图片内容作为流式响应返回 return StreamingResponse(io.BytesIO(response.content), media_type=content_type) except httpx.HTTPStatusError as exc: print(f"Error proxying image request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying image: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai image server - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying image: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/civitai-image") async def get_civitai_image_alt(image_url: str): """ 获取图像的代理端点(兼容LoRA管理插件) 这是 /proxy/image 的别名端点,用于兼容不同的客户端 """ if not image_url or not image_url.startswith("https://image.civitai.com"): raise HTTPException(status_code=400, detail="Invalid or missing image_url parameter. Must be a Civitai image URL.") try: print(f"Proxying image request (alt endpoint) to {image_url}") response = await client.get(image_url, timeout=30.0) # 增加图片下载超时 response.raise_for_status() # 获取内容类型 content_type = response.headers.get("content-type", "application/octet-stream") # 将图片内容作为流式响应返回 return StreamingResponse(io.BytesIO(response.content), media_type=content_type) except httpx.HTTPStatusError as exc: print(f"Error proxying image request (alt endpoint): {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying image (alt endpoint): {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai image server - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying image (alt endpoint): {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/proxy/download-url") async def get_civitai_download_url(download_url: str): """ 获取下载链接并处理重定向的代理端点(保留兼容性) """ if not download_url or not download_url.startswith("https://civitai.com"): raise HTTPException(status_code=400, detail="Invalid or missing download_url parameter. Must be a Civitai download URL.") try: print(f"Resolving download URL: {download_url}") async with httpx.AsyncClient(follow_redirects=False) as temp_client: response = await temp_client.head(download_url, timeout=20.0) if response.status_code in (301, 302, 303, 307, 308): final_url = response.headers.get("location") if not final_url: raise HTTPException(status_code=500, detail="Redirect location header missing") print(f"Redirected to: {final_url}") return {"original_url": download_url, "final_url": final_url} else: return {"original_url": download_url, "final_url": download_url} except httpx.HTTPStatusError as exc: print(f"Error resolving download URL: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while resolving download URL: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while resolving download URL: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/proxy/download-by-version/{version_id}") async def get_download_url_by_version(version_id: int, token: str = None): """ 通过版本ID获取下载链接并处理重定向 """ # 构建下载URL download_url = f"https://civitai.com/api/download/models/{version_id}" if token: download_url = f"{download_url}?token={token}" try: print(f"Resolving download URL for version {version_id}: {download_url}") async with httpx.AsyncClient(follow_redirects=False) as temp_client: response = await temp_client.head(download_url, timeout=20.0) if response.status_code in (301, 302, 303, 307, 308): final_url = response.headers.get("location") if not final_url: raise HTTPException(status_code=500, detail="Redirect location header missing") print(f"Redirected to: {final_url}") return {"version_id": version_id, "original_url": download_url, "final_url": final_url} else: return {"version_id": version_id, "original_url": download_url, "final_url": download_url} except httpx.HTTPStatusError as exc: print(f"Error resolving download URL for version {version_id}: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while resolving download URL for version {version_id}: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while resolving download URL for version {version_id}: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/proxy/model-with-download/{model_id}") async def get_civitai_model_with_download(model_id: int, model_version_id: int = None): """ 获取模型信息并处理下载链接的代理端点(保留兼容性) """ if model_version_id: target_url = f"{CIVITAI_API_BASE_URL}/model-versions/{model_version_id}" else: target_url = f"{CIVITAI_API_BASE_URL}/models/{model_id}" try: print(f"Fetching model info from: {target_url}") response = await client.get(target_url, timeout=20.0) response.raise_for_status() model_data = response.json() if "modelVersions" in model_data and model_data["modelVersions"]: version = model_data["modelVersions"][0] else: version = model_data if "files" in version and version["files"]: processed_files = [] for file in version["files"]: if "downloadUrl" in file: download_url = file["downloadUrl"] try: async with httpx.AsyncClient(follow_redirects=False) as temp_client: head_response = await temp_client.head(download_url, timeout=20.0) if head_response.status_code in (301, 302, 303, 307, 308): final_url = head_response.headers.get("location") if final_url: file["resolvedDownloadUrl"] = final_url print(f"Resolved download URL for {file.get('name', 'unknown')}: {final_url}") else: file["resolvedDownloadUrl"] = download_url except Exception as e: print(f"Error resolving download URL for file {file.get('name', 'unknown')}: {str(e)}") file["resolvedDownloadUrl"] = download_url file["resolutionError"] = str(e) processed_files.append(file) if "modelVersions" in model_data: model_data["modelVersions"][0]["files"] = processed_files else: model_data["files"] = processed_files return model_data except httpx.HTTPStatusError as exc: print(f"Error fetching model info: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while fetching model info: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while fetching model info: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") # 标准 Civitai API v1 端点 @app.get("/api/v1/creators") async def get_creators( limit: int = 20, page: int = 1, query: str = None ): """获取创作者列表""" target_url = f"{CIVITAI_API_BASE_URL}/creators" params = {"limit": limit, "page": page} if query: params["query"] = query try: print(f"Proxying creators request to {target_url} with params {params}") response = await client.get(target_url, params=params, timeout=20.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: print(f"Error proxying creators request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying creators: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying creators: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/api/v1/images") async def get_images( limit: int = 100, postId: int = None, modelId: int = None, modelVersionId: int = None, username: str = None, nsfw: str = None, sort: str = None, period: str = None, page: int = None ): """获取图像列表""" target_url = f"{CIVITAI_API_BASE_URL}/images" params = {"limit": limit} # 添加可选参数 if postId is not None: params["postId"] = postId if modelId is not None: params["modelId"] = modelId if modelVersionId is not None: params["modelVersionId"] = modelVersionId if username: params["username"] = username if nsfw: params["nsfw"] = nsfw if sort: params["sort"] = sort if period: params["period"] = period if page is not None: params["page"] = page try: print(f"Proxying images request to {target_url} with params {params}") response = await client.get(target_url, params=params, timeout=20.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: print(f"Error proxying images request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying images: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying images: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/api/v1/models") async def get_models( limit: int = 100, page: int = 1, query: str = None, tag: str = None, username: str = None, types: str = None, sort: str = None, period: str = None, rating: int = None, favorites: bool = None, hidden: bool = None, primaryFileOnly: bool = None, allowNoCredit: bool = None, allowDerivatives: bool = None, allowDifferentLicenses: bool = None, allowCommercialUse: str = None, nsfw: bool = None, supportsGeneration: bool = None, cursor: str = None ): """获取模型列表""" target_url = f"{CIVITAI_API_BASE_URL}/models" params = {"limit": limit, "page": page} # 添加可选参数 if query: params["query"] = query if tag: params["tag"] = tag if username: params["username"] = username if types: params["types"] = types if sort: params["sort"] = sort if period: params["period"] = period if rating is not None: params["rating"] = rating if favorites is not None: params["favorites"] = favorites if hidden is not None: params["hidden"] = hidden if primaryFileOnly is not None: params["primaryFileOnly"] = primaryFileOnly if allowNoCredit is not None: params["allowNoCredit"] = allowNoCredit if allowDerivatives is not None: params["allowDerivatives"] = allowDerivatives if allowDifferentLicenses is not None: params["allowDifferentLicenses"] = allowDifferentLicenses if allowCommercialUse: params["allowCommercialUse"] = allowCommercialUse if nsfw is not None: params["nsfw"] = nsfw if supportsGeneration is not None: params["supportsGeneration"] = supportsGeneration if cursor: params["cursor"] = cursor try: print(f"Proxying models request to {target_url} with params {params}") response = await client.get(target_url, params=params, timeout=20.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: print(f"Error proxying models request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying models: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying models: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/api/v1/models/{model_id}") async def get_model(model_id: int): """获取单个模型详情""" target_url = f"{CIVITAI_API_BASE_URL}/models/{model_id}" try: print(f"Proxying model request to {target_url}") response = await client.get(target_url, timeout=20.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: print(f"Error proxying model request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying model: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying model: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/api/v1/model-versions/{model_version_id}") async def get_model_version(model_version_id: int): """获取模型版本详情""" target_url = f"{CIVITAI_API_BASE_URL}/model-versions/{model_version_id}" try: print(f"Proxying model version request to {target_url}") response = await client.get(target_url, timeout=20.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: print(f"Error proxying model version request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying model version: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying model version: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/api/v1/model-versions/by-hash/{model_hash}") async def get_model_version_by_hash(model_hash: str): """通过哈希获取模型版本""" target_url = f"{CIVITAI_API_BASE_URL}/model-versions/by-hash/{model_hash}" try: print(f"Proxying model version by hash request to {target_url}") response = await client.get(target_url, timeout=20.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: print(f"Error proxying model version by hash request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying model version by hash: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying model version by hash: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @app.get("/api/v1/tags") async def get_tags( limit: int = 20, page: int = 1, query: str = None ): """获取标签列表""" target_url = f"{CIVITAI_API_BASE_URL}/tags" params = {"limit": limit, "page": page} if query: params["query"] = query try: print(f"Proxying tags request to {target_url} with params {params}") response = await client.get(target_url, params=params, timeout=20.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: print(f"Error proxying tags request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying tags: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying tags: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") # 下载端点 @app.get("/api/download/models/{model_version_id}") async def download_model(model_version_id: int, type: str = None, format: str = None, token: str = None): """下载模型文件""" target_url = f"{CIVITAI_API_BASE_URL}/download/models/{model_version_id}" params = {} # 添加可选参数 if type: params["type"] = type if format: params["format"] = format if token: params["token"] = token try: print(f"Proxying download request to {target_url} with params {params}") # 对于下载请求,我们需要流式传输响应 async with httpx.AsyncClient(follow_redirects=True) as download_client: async with download_client.stream("GET", target_url, params=params, timeout=60.0) as response: response.raise_for_status() # 获取响应头 headers = dict(response.headers) # 创建流式响应 async def generate(): async for chunk in response.aiter_bytes(): yield chunk return StreamingResponse( generate(), status_code=response.status_code, headers=headers, media_type=headers.get("content-type", "application/octet-stream") ) except httpx.HTTPStatusError as exc: print(f"Error proxying download request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying download: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying download: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") # 通用代理端点:完全保持与Civitai API一致的路径 @app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"]) async def proxy_civitai_api(path: str, request: Request): """ 通用代理端点,与原始Civitai API保持完全一致的路径。 参数: path: Civitai API的路径部分 返回: 来自Civitai API的响应 """ # 构建目标URL target_url = f"{CIVITAI_API_BASE_URL}/{path}" # 获取所有查询参数 params = dict(request.query_params) # 获取请求体 body = None if request.method in ["POST", "PUT", "PATCH"]: try: body = await request.json() except: try: body = await request.form() except: body = await request.body() # 获取请求头 headers = dict(request.headers) # 移除一些不应转发的头部 headers_to_remove = ["host", "connection", "content-length", "content-md5", "content-type"] for header in headers_to_remove: if header in headers: del headers[header] try: print(f"Proxying {request.method} request to {target_url} with params {params}") # 发送请求到Civitai API response = await client.request( method=request.method, url=target_url, params=params, headers=headers, json=body if isinstance(body, (dict, list)) else None, data=body if not isinstance(body, (dict, list)) else None, timeout=30.0 ) # 获取响应内容 content = response.content # 获取响应头 response_headers = dict(response.headers) # 创建FastAPI响应 return Response( content=content, status_code=response.status_code, headers=response_headers, media_type=response_headers.get("content-type", "application/json") ) except httpx.HTTPStatusError as exc: print(f"Error proxying request: {exc.response.status_code} - {exc.response.text}") raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text) except httpx.RequestError as exc: print(f"RequestError while proxying: {exc}") raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}") except Exception as e: print(f"Unexpected error while proxying: {e}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")