proxy / app /main.py
kaka25zai's picture
Upload 3 files
0174493 verified
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse, Response
import httpx # 使用 httpx 进行异步请求,或者也可以用 requests
import io
# Base URL of the Civitai API (v1).
CIVITAI_API_BASE_URL = "https://civitai.com/api/v1"
# Reusable HTTP client shared by the endpoints (recommended in production for better performance).
client = httpx.AsyncClient(follow_redirects=True) # follow redirects automatically
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan hook.

    Nothing is done at startup; once the application shuts down the shared
    module-level HTTP client is closed.
    """
    # Startup phase: nothing to initialise.
    yield
    # Shutdown phase: close the shared HTTP client.
    await client.aclose()
# The ASGI application; `lifespan` closes the shared HTTP client on shutdown.
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def read_root():
    """Return a static JSON message confirming that the proxy is running."""
    status_payload = {"message": "Civitai Proxy API is running!"}
    return status_payload
# 保留一些扩展功能的代理端点
@app.get("/proxy/image")
async def get_civitai_image(image_url: str):
    """Fetch an image from the Civitai image host and relay it (kept for compatibility).

    Args:
        image_url: Absolute ``https`` URL whose host is exactly
            ``image.civitai.com``.

    Returns:
        A response carrying the upstream bytes and the upstream content type
        (``application/octet-stream`` when the upstream sends none).

    Raises:
        HTTPException: 400 when ``image_url`` is missing or not a Civitai
            image URL; the upstream status code when the image server
            answers with an error; 503 when the request itself fails; 500
            for any other error.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    # Compare the parsed scheme/host rather than a string prefix: a prefix
    # test also accepts "https://image.civitai.com.evil.example/..." and
    # "https://image.civitai.com@evil.example/...", which would let callers
    # point this proxy at arbitrary hosts.
    try:
        parsed = urlsplit(image_url or "")
        is_civitai_image = parsed.scheme == "https" and parsed.hostname == "image.civitai.com"
    except ValueError:
        # urlsplit rejects some malformed URLs (e.g. broken IPv6 literals).
        is_civitai_image = False
    if not is_civitai_image:
        raise HTTPException(status_code=400, detail="Invalid or missing image_url parameter. Must be a Civitai image URL.")

    try:
        print(f"Proxying image request to {image_url}")
        # Longer timeout than the JSON endpoints because images can be large.
        response = await client.get(image_url, timeout=30.0)
        response.raise_for_status()
        content_type = response.headers.get("content-type", "application/octet-stream")
        # The body is already fully buffered, so send it in one piece.
        # Iterating a BytesIO would split the binary payload at every
        # newline byte and stream it as many tiny chunks.
        return Response(content=response.content, media_type=content_type)
    except httpx.HTTPStatusError as exc:
        print(f"Error proxying image request: {exc.response.status_code} - {exc.response.text}")
        raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text)
    except httpx.RequestError as exc:
        print(f"RequestError while proxying image: {exc}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai image server - {str(exc)}")
    except Exception as e:
        print(f"Unexpected error while proxying image: {e}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
@app.get("/civitai-image")
async def get_civitai_image_alt(image_url: str):
    """Alias of ``/proxy/image`` kept for clients such as the LoRA manager plugin.

    Args:
        image_url: Absolute ``https`` URL whose host is exactly
            ``image.civitai.com``.

    Returns:
        A response carrying the upstream bytes and the upstream content type
        (``application/octet-stream`` when the upstream sends none).

    Raises:
        HTTPException: 400 when ``image_url`` is missing or not a Civitai
            image URL; the upstream status code when the image server
            answers with an error; 503 when the request itself fails; 500
            for any other error.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    # Compare the parsed scheme/host rather than a string prefix: a prefix
    # test also accepts "https://image.civitai.com.evil.example/..." and
    # "https://image.civitai.com@evil.example/...", which would let callers
    # point this proxy at arbitrary hosts.
    try:
        parsed = urlsplit(image_url or "")
        is_civitai_image = parsed.scheme == "https" and parsed.hostname == "image.civitai.com"
    except ValueError:
        # urlsplit rejects some malformed URLs (e.g. broken IPv6 literals).
        is_civitai_image = False
    if not is_civitai_image:
        raise HTTPException(status_code=400, detail="Invalid or missing image_url parameter. Must be a Civitai image URL.")

    try:
        print(f"Proxying image request (alt endpoint) to {image_url}")
        # Longer timeout than the JSON endpoints because images can be large.
        response = await client.get(image_url, timeout=30.0)
        response.raise_for_status()
        content_type = response.headers.get("content-type", "application/octet-stream")
        # The body is already fully buffered, so send it in one piece.
        # Iterating a BytesIO would split the binary payload at every
        # newline byte and stream it as many tiny chunks.
        return Response(content=response.content, media_type=content_type)
    except httpx.HTTPStatusError as exc:
        print(f"Error proxying image request (alt endpoint): {exc.response.status_code} - {exc.response.text}")
        raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text)
    except httpx.RequestError as exc:
        print(f"RequestError while proxying image (alt endpoint): {exc}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai image server - {str(exc)}")
    except Exception as e:
        print(f"Unexpected error while proxying image (alt endpoint): {e}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
@app.get("/proxy/download-url")
async def get_civitai_download_url(download_url: str):
    """Resolve one redirect hop of a Civitai download URL (kept for compatibility).

    A ``HEAD`` request is sent without following redirects. If the answer is
    a redirect, its ``Location`` header is reported as the final URL;
    otherwise the original URL is reported unchanged.

    Args:
        download_url: Absolute ``https`` URL whose host is exactly
            ``civitai.com``.

    Returns:
        ``{"original_url": ..., "final_url": ...}``.

    Raises:
        HTTPException: 400 for a missing/foreign URL; 500 when a redirect
            carries no ``Location`` header or on unexpected errors; 503 when
            the request itself fails.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    # Compare the parsed scheme/host rather than a string prefix: a prefix
    # test also accepts "https://civitai.com.evil.example/..." and
    # "https://civitai.com@evil.example/...".
    try:
        parsed = urlsplit(download_url or "")
        is_civitai = parsed.scheme == "https" and parsed.hostname == "civitai.com"
    except ValueError:
        # urlsplit rejects some malformed URLs (e.g. broken IPv6 literals).
        is_civitai = False
    if not is_civitai:
        raise HTTPException(status_code=400, detail="Invalid or missing download_url parameter. Must be a Civitai download URL.")

    try:
        print(f"Resolving download URL: {download_url}")
        # A dedicated client with redirects disabled so the Location header
        # can be inspected instead of being followed.
        async with httpx.AsyncClient(follow_redirects=False) as temp_client:
            response = await temp_client.head(download_url, timeout=20.0)

        if response.status_code in (301, 302, 303, 307, 308):
            final_url = response.headers.get("location")
            if not final_url:
                raise HTTPException(status_code=500, detail="Redirect location header missing")
            print(f"Redirected to: {final_url}")
            return {"original_url": download_url, "final_url": final_url}
        return {"original_url": download_url, "final_url": download_url}
    except HTTPException:
        # Let the deliberate error above through untouched instead of having
        # the generic handler below re-wrap its detail message.
        raise
    # No HTTPStatusError handler: raise_for_status() is never called here.
    except httpx.RequestError as exc:
        print(f"RequestError while resolving download URL: {exc}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}")
    except Exception as e:
        print(f"Unexpected error while resolving download URL: {e}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
@app.get("/proxy/download-by-version/{version_id}")
async def get_download_url_by_version(version_id: int, token: str = None):
    """Resolve one redirect hop of the download URL for a model version.

    Args:
        version_id: Civitai model version id used to build the download URL.
        token: Optional API token, appended as the ``token`` query parameter.

    Returns:
        ``{"version_id": ..., "original_url": ..., "final_url": ...}`` where
        ``final_url`` is the redirect target, or the original URL when the
        upstream does not redirect.

    Raises:
        HTTPException: 500 when a redirect carries no ``Location`` header or
            on unexpected errors; 503 when the request itself fails.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlencode

    base_url = f"https://civitai.com/api/download/models/{version_id}"
    download_url = base_url
    if token:
        # URL-encode the token so characters such as "&" or "#" cannot
        # corrupt the query string.
        encoded_query = urlencode({"token": token})
        download_url = f"{base_url}?{encoded_query}"

    try:
        # Log the token-free URL only, so the credential stays out of the logs.
        print(f"Resolving download URL for version {version_id}: {base_url}")
        # A dedicated client with redirects disabled so the Location header
        # can be inspected instead of being followed.
        async with httpx.AsyncClient(follow_redirects=False) as temp_client:
            response = await temp_client.head(download_url, timeout=20.0)

        if response.status_code in (301, 302, 303, 307, 308):
            final_url = response.headers.get("location")
            if not final_url:
                raise HTTPException(status_code=500, detail="Redirect location header missing")
            print(f"Redirected to: {final_url}")
            return {"version_id": version_id, "original_url": download_url, "final_url": final_url}
        return {"version_id": version_id, "original_url": download_url, "final_url": download_url}
    except HTTPException:
        # Let the deliberate error above through untouched instead of having
        # the generic handler below re-wrap its detail message.
        raise
    # No HTTPStatusError handler: raise_for_status() is never called here.
    except httpx.RequestError as exc:
        print(f"RequestError while resolving download URL for version {version_id}: {exc}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}")
    except Exception as e:
        print(f"Unexpected error while resolving download URL for version {version_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
@app.get("/proxy/model-with-download/{model_id}")
async def get_civitai_model_with_download(model_id: int, model_version_id: int = None):
    """Fetch model (or model-version) info and annotate its files with resolved download URLs.

    When ``model_version_id`` is given, that model version is fetched and
    ``model_id`` is not used; otherwise the model itself is fetched and the
    files of its first listed version are processed.

    Every file that has a ``downloadUrl`` gains a ``resolvedDownloadUrl``
    key: the ``Location`` target when a ``HEAD`` request is redirected,
    otherwise the original ``downloadUrl``. If resolving fails, the original
    URL is used and the error text is stored under ``resolutionError``.

    Args:
        model_id: Civitai model id.
        model_version_id: Optional model version id; takes precedence.

    Returns:
        The upstream JSON document with the file entries annotated in place.

    Raises:
        HTTPException: the upstream status code on an upstream error; 503
            when the request itself fails; 500 for any other error.
    """
    redirect_statuses = (301, 302, 303, 307, 308)

    if model_version_id:
        target_url = f"{CIVITAI_API_BASE_URL}/model-versions/{model_version_id}"
    else:
        target_url = f"{CIVITAI_API_BASE_URL}/models/{model_id}"

    try:
        print(f"Fetching model info from: {target_url}")
        response = await client.get(target_url, timeout=20.0)
        response.raise_for_status()
        model_data = response.json()

        # Use the first version when the document lists any; otherwise treat
        # the document itself as the version.
        if "modelVersions" in model_data and model_data["modelVersions"]:
            version = model_data["modelVersions"][0]
        else:
            version = model_data

        if "files" in version and version["files"]:
            processed_files = []
            # One redirect-free client for all files instead of a new client
            # (and connection pool) per file.
            async with httpx.AsyncClient(follow_redirects=False) as temp_client:
                for file in version["files"]:
                    if "downloadUrl" in file:
                        download_url = file["downloadUrl"]
                        try:
                            head_response = await temp_client.head(download_url, timeout=20.0)
                            final_url = None
                            if head_response.status_code in redirect_statuses:
                                final_url = head_response.headers.get("location")
                            # Always set the key so consumers can rely on it;
                            # fall back to the original URL when there is no
                            # usable redirect target.
                            file["resolvedDownloadUrl"] = final_url or download_url
                            if final_url:
                                print(f"Resolved download URL for {file.get('name', 'unknown')}: {final_url}")
                        except Exception as e:
                            # Best effort: one failing file must not fail the
                            # whole response.
                            print(f"Error resolving download URL for file {file.get('name', 'unknown')}: {str(e)}")
                            file["resolvedDownloadUrl"] = download_url
                            file["resolutionError"] = str(e)
                    # Files without a downloadUrl are kept unchanged.
                    processed_files.append(file)
            # Write back to the object the files were read from. Indexing
            # modelVersions[0] here would raise IndexError when
            # "modelVersions" is present but empty.
            version["files"] = processed_files
        return model_data
    except httpx.HTTPStatusError as exc:
        print(f"Error fetching model info: {exc.response.status_code} - {exc.response.text}")
        raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text)
    except httpx.RequestError as exc:
        print(f"RequestError while fetching model info: {exc}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}")
    except Exception as e:
        print(f"Unexpected error while fetching model info: {e}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
# 标准 Civitai API v1 端点
@app.get("/api/v1/creators")
async def get_creators(
    limit: int = 20,
    page: int = 1,
    query: str = None
):
    """Proxy the creator listing.

    ``limit`` and ``page`` are always forwarded; ``query`` only when it is
    non-empty. Returns the upstream JSON. Upstream error statuses are
    re-raised with the same status code, request failures become 503 and
    anything else becomes 500.
    """
    upstream_url = f"{CIVITAI_API_BASE_URL}/creators"
    query_args = {"limit": limit, "page": page, **({"query": query} if query else {})}
    try:
        print(f"Proxying creators request to {upstream_url} with params {query_args}")
        upstream = await client.get(upstream_url, params=query_args, timeout=20.0)
        upstream.raise_for_status()
        return upstream.json()
    except httpx.HTTPStatusError as status_err:
        failed = status_err.response
        print(f"Error proxying creators request: {failed.status_code} - {failed.text}")
        raise HTTPException(status_code=failed.status_code, detail=failed.text)
    except httpx.RequestError as request_err:
        print(f"RequestError while proxying creators: {request_err}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(request_err)}")
    except Exception as err:
        print(f"Unexpected error while proxying creators: {err}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(err)}")
@app.get("/api/v1/images")
async def get_images(
    limit: int = 100,
    postId: int = None,
    modelId: int = None,
    modelVersionId: int = None,
    username: str = None,
    nsfw: str = None,
    sort: str = None,
    period: str = None,
    page: int = None
):
    """Proxy the image listing.

    ``limit`` is always forwarded. The integer filters (``postId``,
    ``modelId``, ``modelVersionId``, ``page``) are forwarded whenever they
    are not ``None``; the string filters only when non-empty. Returns the
    upstream JSON. Upstream error statuses are re-raised with the same
    status code, request failures become 503 and anything else becomes 500.
    """
    upstream_url = f"{CIVITAI_API_BASE_URL}/images"
    # (name, value, forward?) in the order the parameters are sent.
    optional_args = (
        ("postId", postId, postId is not None),
        ("modelId", modelId, modelId is not None),
        ("modelVersionId", modelVersionId, modelVersionId is not None),
        ("username", username, bool(username)),
        ("nsfw", nsfw, bool(nsfw)),
        ("sort", sort, bool(sort)),
        ("period", period, bool(period)),
        ("page", page, page is not None),
    )
    query_args = {"limit": limit}
    query_args.update({name: value for name, value, wanted in optional_args if wanted})
    try:
        print(f"Proxying images request to {upstream_url} with params {query_args}")
        upstream = await client.get(upstream_url, params=query_args, timeout=20.0)
        upstream.raise_for_status()
        return upstream.json()
    except httpx.HTTPStatusError as status_err:
        failed = status_err.response
        print(f"Error proxying images request: {failed.status_code} - {failed.text}")
        raise HTTPException(status_code=failed.status_code, detail=failed.text)
    except httpx.RequestError as request_err:
        print(f"RequestError while proxying images: {request_err}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(request_err)}")
    except Exception as err:
        print(f"Unexpected error while proxying images: {err}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(err)}")
@app.get("/api/v1/models")
async def get_models(
    limit: int = 100,
    page: int = 1,
    query: str = None,
    tag: str = None,
    username: str = None,
    types: str = None,
    sort: str = None,
    period: str = None,
    rating: int = None,
    favorites: bool = None,
    hidden: bool = None,
    primaryFileOnly: bool = None,
    allowNoCredit: bool = None,
    allowDerivatives: bool = None,
    allowDifferentLicenses: bool = None,
    allowCommercialUse: str = None,
    nsfw: bool = None,
    supportsGeneration: bool = None,
    cursor: str = None
):
    """Proxy the model listing.

    ``limit`` is always forwarded. String filters are forwarded when
    non-empty; ``rating`` and the boolean flags whenever they are not
    ``None``. ``page`` is forwarded except when it still has its default
    value of 1 and the request uses ``query`` or ``cursor``.

    Returns:
        The upstream JSON document.

    Raises:
        HTTPException: the upstream status code on an upstream error; 503
            when the request itself fails; 500 for any other error.
    """
    target_url = f"{CIVITAI_API_BASE_URL}/models"
    params = {"limit": limit}

    # NOTE(review): Civitai is understood to reject `page` combined with a
    # `query` search and to expect cursor-based pagination there - confirm
    # against the API reference. Forwarding the implicit default page=1
    # would make every search fail, so it is left out when `query` or
    # `cursor` is used. An explicit page other than 1 is still forwarded so
    # the upstream can report the conflict itself.
    if not (page == 1 and (query or cursor)):
        params["page"] = page

    # (name, value, forward?) in the order the parameters are sent.
    optional_params = (
        ("query", query, bool(query)),
        ("tag", tag, bool(tag)),
        ("username", username, bool(username)),
        ("types", types, bool(types)),
        ("sort", sort, bool(sort)),
        ("period", period, bool(period)),
        ("rating", rating, rating is not None),
        ("favorites", favorites, favorites is not None),
        ("hidden", hidden, hidden is not None),
        ("primaryFileOnly", primaryFileOnly, primaryFileOnly is not None),
        ("allowNoCredit", allowNoCredit, allowNoCredit is not None),
        ("allowDerivatives", allowDerivatives, allowDerivatives is not None),
        ("allowDifferentLicenses", allowDifferentLicenses, allowDifferentLicenses is not None),
        ("allowCommercialUse", allowCommercialUse, bool(allowCommercialUse)),
        ("nsfw", nsfw, nsfw is not None),
        ("supportsGeneration", supportsGeneration, supportsGeneration is not None),
        ("cursor", cursor, bool(cursor)),
    )
    params.update({name: value for name, value, wanted in optional_params if wanted})

    try:
        print(f"Proxying models request to {target_url} with params {params}")
        response = await client.get(target_url, params=params, timeout=20.0)
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as exc:
        print(f"Error proxying models request: {exc.response.status_code} - {exc.response.text}")
        raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text)
    except httpx.RequestError as exc:
        print(f"RequestError while proxying models: {exc}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}")
    except Exception as e:
        print(f"Unexpected error while proxying models: {e}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
@app.get("/api/v1/models/{model_id}")
async def get_model(model_id: int):
    """Proxy the details of a single model.

    Returns the upstream JSON. Upstream error statuses are re-raised with
    the same status code, request failures become 503 and anything else
    becomes 500.
    """
    upstream_url = f"{CIVITAI_API_BASE_URL}/models/{model_id}"
    try:
        print(f"Proxying model request to {upstream_url}")
        upstream = await client.get(upstream_url, timeout=20.0)
        upstream.raise_for_status()
        return upstream.json()
    except httpx.HTTPStatusError as status_err:
        failed = status_err.response
        print(f"Error proxying model request: {failed.status_code} - {failed.text}")
        raise HTTPException(status_code=failed.status_code, detail=failed.text)
    except httpx.RequestError as request_err:
        print(f"RequestError while proxying model: {request_err}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(request_err)}")
    except Exception as err:
        print(f"Unexpected error while proxying model: {err}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(err)}")
@app.get("/api/v1/model-versions/{model_version_id}")
async def get_model_version(model_version_id: int):
    """Proxy the details of a single model version.

    Returns the upstream JSON. Upstream error statuses are re-raised with
    the same status code, request failures become 503 and anything else
    becomes 500.
    """
    upstream_url = f"{CIVITAI_API_BASE_URL}/model-versions/{model_version_id}"
    try:
        print(f"Proxying model version request to {upstream_url}")
        upstream = await client.get(upstream_url, timeout=20.0)
        upstream.raise_for_status()
        return upstream.json()
    except httpx.HTTPStatusError as status_err:
        failed = status_err.response
        print(f"Error proxying model version request: {failed.status_code} - {failed.text}")
        raise HTTPException(status_code=failed.status_code, detail=failed.text)
    except httpx.RequestError as request_err:
        print(f"RequestError while proxying model version: {request_err}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(request_err)}")
    except Exception as err:
        print(f"Unexpected error while proxying model version: {err}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(err)}")
@app.get("/api/v1/model-versions/by-hash/{model_hash}")
async def get_model_version_by_hash(model_hash: str):
    """Proxy the lookup of a model version by file hash.

    Args:
        model_hash: Hash string taken from the request path.

    Returns:
        The upstream JSON document.

    Raises:
        HTTPException: the upstream status code on an upstream error; 503
            when the request itself fails; 500 for any other error.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    # The path parameter arrives URL-decoded, so a value such as
    # "abc%3Fx=1" would otherwise inject "?x=1" into the upstream URL.
    # Percent-encode it so it stays a single path segment.
    target_url = f"{CIVITAI_API_BASE_URL}/model-versions/by-hash/{quote(model_hash, safe='')}"
    try:
        print(f"Proxying model version by hash request to {target_url}")
        response = await client.get(target_url, timeout=20.0)
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as exc:
        print(f"Error proxying model version by hash request: {exc.response.status_code} - {exc.response.text}")
        raise HTTPException(status_code=exc.response.status_code, detail=exc.response.text)
    except httpx.RequestError as exc:
        print(f"RequestError while proxying model version by hash: {exc}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}")
    except Exception as e:
        print(f"Unexpected error while proxying model version by hash: {e}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
@app.get("/api/v1/tags")
async def get_tags(
    limit: int = 20,
    page: int = 1,
    query: str = None
):
    """Proxy the tag listing.

    ``limit`` and ``page`` are always forwarded; ``query`` only when it is
    non-empty. Returns the upstream JSON. Upstream error statuses are
    re-raised with the same status code, request failures become 503 and
    anything else becomes 500.
    """
    upstream_url = f"{CIVITAI_API_BASE_URL}/tags"
    query_args = {"limit": limit, "page": page, **({"query": query} if query else {})}
    try:
        print(f"Proxying tags request to {upstream_url} with params {query_args}")
        upstream = await client.get(upstream_url, params=query_args, timeout=20.0)
        upstream.raise_for_status()
        return upstream.json()
    except httpx.HTTPStatusError as status_err:
        failed = status_err.response
        print(f"Error proxying tags request: {failed.status_code} - {failed.text}")
        raise HTTPException(status_code=failed.status_code, detail=failed.text)
    except httpx.RequestError as request_err:
        print(f"RequestError while proxying tags: {request_err}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(request_err)}")
    except Exception as err:
        print(f"Unexpected error while proxying tags: {err}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(err)}")
# 下载端点
@app.get("/api/download/models/{model_version_id}")
async def download_model(model_version_id: int, type: str = None, format: str = None, token: str = None):
    """Stream a model file download through the proxy.

    Args:
        model_version_id: Civitai model version id to download.
        type: Optional ``type`` query parameter, forwarded when non-empty.
        format: Optional ``format`` query parameter, forwarded when non-empty.
        token: Optional API token, forwarded as the ``token`` query parameter.

    Returns:
        A ``StreamingResponse`` relaying the upstream bytes and headers
        (minus hop-by-hop headers).

    Raises:
        HTTPException: the upstream status code on an upstream error; 503
            when the request itself fails; 500 for any other error.
    """
    # Same download URL as the /proxy/download-by-version endpoint in this
    # file: the download route sits under /api/download, not under the
    # /api/v1 prefix held by CIVITAI_API_BASE_URL.
    target_url = f"https://civitai.com/api/download/models/{model_version_id}"
    params = {
        key: value
        for key, value in (("type", type), ("format", format), ("token", token))
        if value
    }
    # Never write the API token to the logs.
    loggable_params = {key: ("***" if key == "token" else value) for key, value in params.items()}
    # Hop-by-hop headers describe the upstream connection, not ours.
    dropped_headers = {"connection", "keep-alive", "transfer-encoding"}

    # The client and the upstream response must outlive this function: the
    # body is only pulled while the StreamingResponse is being sent. Wrapping
    # them in `async with` here would close the stream before the first chunk
    # is read, so they are closed explicitly instead.
    download_client = httpx.AsyncClient(follow_redirects=True)
    upstream = None

    async def release():
        """Close the upstream response (if any) and the dedicated client."""
        if upstream is not None:
            await upstream.aclose()
        await download_client.aclose()

    try:
        print(f"Proxying download request to {target_url} with params {loggable_params}")
        request = download_client.build_request("GET", target_url, params=params, timeout=60.0)
        upstream = await download_client.send(request, stream=True)
        try:
            upstream.raise_for_status()
        except httpx.HTTPStatusError:
            # A streamed body must be read before `.text` can be used in the
            # error handler below.
            await upstream.aread()
            raise
    except httpx.HTTPStatusError as exc:
        status_code, detail = exc.response.status_code, exc.response.text
        await release()
        print(f"Error proxying download request: {status_code} - {detail}")
        raise HTTPException(status_code=status_code, detail=detail)
    except httpx.RequestError as exc:
        await release()
        print(f"RequestError while proxying download: {exc}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}")
    except Exception as e:
        await release()
        print(f"Unexpected error while proxying download: {e}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")

    async def generate():
        """Yield the upstream body, then release the upstream resources."""
        try:
            # Raw (undecoded) bytes keep the upstream Content-Length and
            # Content-Encoding headers valid for the relayed body.
            async for chunk in upstream.aiter_raw():
                yield chunk
        finally:
            await release()

    headers = {
        name: value
        for name, value in upstream.headers.items()
        if name.lower() not in dropped_headers
    }
    return StreamingResponse(
        generate(),
        status_code=upstream.status_code,
        headers=headers,
        media_type=headers.get("content-type", "application/octet-stream"),
    )
# 通用代理端点:完全保持与Civitai API一致的路径
@app.api_route("/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
async def proxy_civitai_api(path: str, request: Request):
    """Generic pass-through proxy that mirrors the Civitai API path layout.

    Args:
        path: Path below the Civitai API base URL.
        request: Incoming request; its method, query string, headers and
            (for POST/PUT/PATCH) body are forwarded.

    Returns:
        A response with the upstream status code, body and headers.

    Raises:
        HTTPException: 503 when the upstream request fails; 500 for any
            other error.
    """
    target_url = f"{CIVITAI_API_BASE_URL}/{path}"

    # Keep repeated keys (e.g. ?types=A&types=B); dict() would keep only the
    # last value of each key.
    params = request.query_params.multi_items()

    # Forward the body verbatim together with its original Content-Type
    # rather than guessing json -> form -> raw and re-encoding it. That
    # guessing dropped non-JSON, non-form bodies and changed how the payload
    # was encoded.
    body = None
    if request.method in ("POST", "PUT", "PATCH"):
        body = await request.body()

    # Drop headers that describe the client<->proxy connection. Also drop
    # Accept-Encoding so httpx only negotiates encodings it can decode; the
    # decoded body is what gets returned below.
    headers_to_remove = {"host", "connection", "content-length", "content-md5", "accept-encoding"}
    headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() not in headers_to_remove
    }

    try:
        print(f"Proxying {request.method} request to {target_url} with params {dict(request.query_params)}")
        response = await client.request(
            method=request.method,
            url=target_url,
            params=params,
            headers=headers,
            content=body,
            timeout=30.0,
        )

        # `response.content` is already decoded, so the upstream encoding
        # and length headers no longer describe it. Remove them, along with
        # hop-by-hop headers, and let the Response compute its own length.
        stale_headers = {"content-encoding", "content-length", "transfer-encoding", "connection", "keep-alive"}
        response_headers = {
            name: value
            for name, value in response.headers.items()
            if name.lower() not in stale_headers
        }
        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=response_headers,
            media_type=response_headers.get("content-type", "application/json"),
        )
    # No HTTPStatusError handler: raise_for_status() is never called, so
    # upstream error statuses are relayed as-is by the Response above.
    except httpx.RequestError as exc:
        print(f"RequestError while proxying: {exc}")
        raise HTTPException(status_code=503, detail=f"Service Unavailable: Could not connect to Civitai - {str(exc)}")
    except Exception as e:
        print(f"Unexpected error while proxying: {e}")
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")