Spaces:
Sleeping
Sleeping
File size: 17,302 Bytes
b687da5 0053c86 b687da5 85c8196 b687da5 504d810 b687da5 990adae b687da5 56d3b84 504d810 990adae b687da5 504d810 91e31f2 b687da5 990adae 504d810 990adae 0053c86 990adae fc1374e 990adae 0053c86 990adae b687da5 f5fea6c 990adae d86f2f1 504d810 b687da5 504d810 b687da5 85c8196 b687da5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 |
import os
import json
import uuid
import asyncio # <-- Added import
import time # Ensure time is imported for timestamps
from typing import List, Optional, Literal, Dict, Any, AsyncGenerator
import httpx
import dotenv
from fastapi import FastAPI, HTTPException, Request, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
# Load environment variables from .env file
dotenv.load_dotenv()

# --- Configuration ---
FLOWITH_AUTH_TOKEN = os.getenv("FLOWITH_AUTH_TOKEN")  # upstream Flowith bearer token
FLOWITH_API_URL = "https://edge.flo.ing/completion?mode=general"
MODELS_FILE_PATH = "models.json"  # OpenAI-name -> Flowith-name mapping file
API_KEY = os.getenv("API_KEY", "123456") # Read API Key (client-facing key for this proxy)

# --- Security Scheme ---
security = HTTPBearer()

if not FLOWITH_AUTH_TOKEN:
    # In a real app, you might want to log this and exit,
    # but for simplicity, we'll raise an error if accessed.
    print("Warning: FLOWITH_AUTH_TOKEN environment variable not set.")
    # raise ValueError("FLOWITH_AUTH_TOKEN environment variable is required.") # Or handle differently

# --- Load Model Mappings ---
try:
    # JSON is UTF-8 by specification; pin the encoding so the load does not
    # depend on the platform's default locale encoding.
    with open(MODELS_FILE_PATH, 'r', encoding='utf-8') as f:
        model_mappings = json.load(f)
except FileNotFoundError:
    print(f"Error: Models file not found at {MODELS_FILE_PATH}")
    model_mappings = {} # Or raise an error / exit
except json.JSONDecodeError:
    print(f"Error: Invalid JSON in models file: {MODELS_FILE_PATH}")
    model_mappings = {} # Or raise an error / exit
# --- Pydantic Models ---
class OpenAIMessage(BaseModel):
    """A single chat message in OpenAI chat-completions format."""
    role: Literal["system", "user", "assistant"]  # OpenAI also allows 'system'
    content: str
class OpenAIRequest(BaseModel):
    """Incoming OpenAI-compatible /v1/chat/completions request body."""
    model: str  # OpenAI-style model name; translated via models.json
    messages: List[OpenAIMessage]
    stream: Optional[bool] = False  # client's streaming preference (SSE vs JSON)
    # Add other potential OpenAI fields if needed, e.g., temperature, max_tokens
    # temperature: Optional[float] = None
    # max_tokens: Optional[int] = None
class FlowithMessage(BaseModel):
    """A chat message in the format forwarded to Flowith."""
    role: Literal["user", "assistant"] # Flowith uses 'user' and 'assistant'
    content: str
class FlowithRequest(BaseModel):
    """Payload POSTed to the Flowith completion endpoint."""
    model: str  # Flowith-side model name (the mapped value, not the OpenAI one)
    messages: List[FlowithMessage]
    stream: bool
    nodeId: str # UUID for Flowith
# --- OpenAI Models Endpoint Models ---
class ModelCard(BaseModel):
id: str
object: str = "model"
# owned_by: str = "user" # Optional: Add other fields if needed
class ModelList(BaseModel):
    """OpenAI-style wrapper object returned by /v1/models."""
    object: str = "list"
    data: List[ModelCard]
# --- FastAPI App ---
# Application instance; endpoints are registered on it via decorators below.
app = FastAPI(
    title="OpenAI to Flowith Proxy",
    description="Translates OpenAI-compatible chat completion requests to Flowith's format.",
)
# --- Helper for Streaming ---
async def stream_flowith_response(flowith_stream: httpx.Response) -> AsyncGenerator[str, None]:
    """Asynchronously stream the Flowith response body as decoded text.

    Uses ``aiter_text`` so decoding is incremental and stateful: the previous
    ``aiter_bytes`` + per-chunk ``chunk.decode('utf-8')`` approach raises
    UnicodeDecodeError whenever a multi-byte UTF-8 character straddles a
    chunk boundary.

    Args:
        flowith_stream: An httpx response opened in streaming mode.

    Yields:
        Text chunks of the response body, in arrival order.
    """
    # Assuming Flowith streams data in a format compatible with OpenAI's SSE.
    # If Flowith uses a different streaming format, this needs adjustment.
    async for text_chunk in flowith_stream.aiter_text():
        yield text_chunk
# --- Security Dependency ---
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify the provided Bearer token against the configured API_KEY.

    Args:
        credentials: Parsed Authorization header supplied by the HTTPBearer
            security scheme.

    Returns:
        The validated API key string.

    Raises:
        HTTPException: 401 if the header is missing/malformed or the key is wrong.
    """
    import hmac
    # Guard BEFORE dereferencing .credentials: the original compared the
    # digest first, so a missing credential object would have raised
    # AttributeError instead of a clean 401.
    if not credentials or credentials.scheme != "Bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API Key",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Constant-time comparison to prevent timing attacks (Python 3.3+).
    if not hmac.compare_digest(credentials.credentials, API_KEY):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API Key",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return credentials.credentials # Return the key or True if successful
# --- API Endpoint ---
@app.post("/v1/chat/completions")
async def chat_completions(
    request: OpenAIRequest,
    http_request: Request,
    api_key: str = Depends(verify_api_key) # Add API Key dependency
):
    """
    Accepts OpenAI-like chat completion requests and forwards them to Flowith.

    Flow:
      1. Map the requested OpenAI model name to a Flowith model via models.json.
      2. Generate a fresh nodeId (UUID) required by Flowith.
      3. Normalize message roles down to Flowith's {'user', 'assistant'} set.
      4. POST the payload to Flowith with browser-mimicking headers.
      5. Return either a single OpenAI-style JSON completion (stream=False)
         or a simulated SSE stream of chat.completion.chunk events (stream=True).

    Raises:
        HTTPException: 500 if FLOWITH_AUTH_TOKEN is unset or on unexpected
            errors, 400 for an unmapped model, 503 on connection failures,
            or the upstream status code when Flowith returns non-200.
    """
    if not FLOWITH_AUTH_TOKEN:
        raise HTTPException(status_code=500, detail="Server configuration error: Flowith auth token not set.")
    # 1. Map the model
    flowith_model_name = model_mappings.get(request.model)
    if not flowith_model_name:
        raise HTTPException(
            status_code=400,
            detail=f"Model '{request.model}' not found in mappings. Available: {list(model_mappings.keys())}"
        )
    # 2. Generate nodeId
    node_id = str(uuid.uuid4())
    # 3. Process messages (Handle system prompt for specific models)
    processed_messages: List[FlowithMessage] = []
    # Check if the *target* Flowith model indicates Claude or Gemini
    # Adjust this logic if the check should be based on the *source* OpenAI model name
    # NOTE(review): this flag is computed but never read below; the role
    # conversion in the loop uses is_claude_or_gemini_requested instead.
    is_claude_or_gemini = "claude" in flowith_model_name.lower() or "gemini" in flowith_model_name.lower()
    for msg in request.messages:
        original_role = msg.role.lower() # Get original role
        new_role = original_role # Start with original role
        # Check for Claude/Gemini system message conversion based on *requested* model
        is_claude_or_gemini_requested = "claude" in request.model.lower() or "gemini" in request.model.lower()
        if is_claude_or_gemini_requested and original_role == "system":
            new_role = "user"
        # Check for non-standard roles (applies AFTER potential system->user conversion for C/G)
        elif original_role not in {"user", "assistant", "system"}:
            new_role = "user"
            print(f"Warning: Converting non-standard role '{original_role}' to 'user' for model {request.model}")
        # Append message with the determined new_role, but only if it's valid for Flowith
        # Flowith only accepts 'user' and 'assistant'
        # (system messages for non-Claude/Gemini models are silently dropped here)
        if new_role in ["user", "assistant"]:
            processed_messages.append(FlowithMessage(role=new_role, content=msg.content))
        # else: # Log or skip roles that are still 'system' after processing
        # print(f"Skipping message with final role '{new_role}' as it's not 'user' or 'assistant'. Original role: '{original_role}'")
    # 4. Construct Flowith Request Payload
    flowith_payload = FlowithRequest(
        model=flowith_model_name,
        messages=processed_messages,
        stream=True, # Always stream from Flowith
        nodeId=node_id,
    )
    # 5. Prepare Headers for Flowith Request
    # Headers exactly matching the curl -H flags provided
    headers = {
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6,ja;q=0.5',
        'authorization': FLOWITH_AUTH_TOKEN, # Send only the token, no "Bearer " prefix
        'content-type': 'application/json',
        'responsetype': 'stream', # Restore this header
        'origin': 'https://flowith.net',
        'priority': 'u=1, i',
        'referer': 'https://flowith.net/',
        'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'cross-site',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    # 6. Make Asynchronous Request to Flowith
    # Need time for simulated streaming chunks
    # NOTE(review): 'time' is already imported at module level; this local
    # import is redundant (harmless, but could be removed).
    import time
    # Need JSONResponse
    from fastapi.responses import JSONResponse
    async with httpx.AsyncClient(timeout=600.0) as client:
        try:
            # Serialize payload manually
            payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8')
            # Make a non-streaming POST request to Flowith
            # NOTE(review): this full upstream request runs even when the
            # client asked for stream=True; the streaming branch below then
            # issues a *second* identical request inside fetch_and_process,
            # so Flowith is hit twice per streaming call and this response
            # is discarded — consider reusing it there.
            response = await client.post(
                FLOWITH_API_URL,
                content=payload_bytes,
                headers=headers,
            )
            # Check status code after receiving the full response
            if response.status_code != 200:
                try:
                    error_detail = response.text # Use .text for non-streaming
                    detail_msg = f"Flowith API Error ({response.status_code}): {error_detail}"
                except Exception:
                    detail_msg = f"Flowith API Error ({response.status_code})"
                raise HTTPException(status_code=response.status_code, detail=detail_msg)
            # Get the plain text response directly
            flowith_text = response.text
            # 7. Handle response based on *client's* request.stream preference
            if not request.stream:
                # Client wants non-streaming: Construct OpenAI-compatible JSON from plain text
                completion_id = f"chatcmpl-{uuid.uuid4()}"
                created_timestamp = int(time.time())
                response_payload = {
                    "id": completion_id,
                    "object": "chat.completion",
                    "created": created_timestamp,
                    "model": request.model, # Use the model from the original request
                    "choices": [{
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": flowith_text # Use the plain text here
                        },
                        "finish_reason": "stop" # Assume stop
                    }],
                    # "usage": {...} # Usage stats are typically not available/meaningful here
                }
                return JSONResponse(content=response_payload)
            else:
                # Client wants streaming: Implement keep-alive while fetching, then stream response
                async def stream_generator():
                    # Event set by the background fetch when data (or an error) is ready.
                    response_ready_event = asyncio.Event()
                    fetch_task = None
                    flowith_text_local = None # Use a local variable to avoid confusion with outer scope
                    error_occurred = None
                    sse_model_name = request.model # Use the model requested by the client
                    # Coroutine to fetch data and signal completion
                    async def fetch_and_process(event):
                        nonlocal flowith_text_local, error_occurred
                        try:
                            # === Logic to prepare request (model_name, flowith_messages, etc.) ===
                            # These variables are captured from the outer scope:
                            # FLOWITH_API_URL, headers, flowith_payload
                            # The actual request is made here now, not before calling stream_generator
                            async with httpx.AsyncClient(timeout=300.0) as client:
                                payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8') # Use outer flowith_payload
                                response = await client.post(
                                    FLOWITH_API_URL, headers=headers, content=payload_bytes # Use outer headers
                                )
                                response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
                                flowith_text_local = response.text # Store in local variable
                        except Exception as e:
                            # print(f"Error fetching from Flowith: {e}") # Optional debug
                            error_occurred = e # Store error to yield later
                        finally:
                            event.set() # Signal completion or failure
                    # Start fetching in the background
                    fetch_task = asyncio.create_task(fetch_and_process(response_ready_event))
                    try:
                        # Send keep-alive chunks (empty delta) while waiting for the fetch
                        while not response_ready_event.is_set():
                            keep_alive_data = {"id": "chatcmpl-keepalive", "object": "chat.completion.chunk", "created": int(time.time()), "model": sse_model_name, "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]}
                            yield f"data: {json.dumps(keep_alive_data)}\n\n"
                            # Wait for a short period or until the event is set
                            try:
                                await asyncio.wait_for(response_ready_event.wait(), timeout=3.0)
                            except asyncio.TimeoutError:
                                pass # Timeout means event not set, continue loop
                        # Event is set, check if fetch task had an error
                        if fetch_task.done() and fetch_task.exception():
                            # If fetch_task failed before setting event (shouldn't happen with finally, but check anyway)
                            error_occurred = fetch_task.exception()
                        if error_occurred:
                            # Yield an error chunk (optional, depends on desired behavior)
                            # print(f"Yielding error chunk: {error_occurred}") # Optional debug
                            error_content = f"Error processing request: {type(error_occurred).__name__}"
                            error_chunk = {"id": f"chatcmpl-error-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": sse_model_name, "choices": [{"delta": {"content": error_content }, "index": 0, "finish_reason": "error"}]} # Use finish_reason 'error' if possible
                            yield f"data: {json.dumps(error_chunk)}\n\n"
                        elif flowith_text_local is not None:
                            # Fetch succeeded, yield content chunks
                            chunk_id = f"chatcmpl-{uuid.uuid4()}"
                            chunk_size = 20
                            full_content = flowith_text_local
                            # Slice the full text into fixed-size pieces to simulate token streaming
                            for i in range(0, len(full_content), chunk_size):
                                content_piece = full_content[i:i + chunk_size]
                                chunk = {
                                    "id": chunk_id,
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": sse_model_name,
                                    "choices": [{"index": 0, "delta": {"content": content_piece}, "finish_reason": None}]
                                }
                                yield f"data: {json.dumps(chunk)}\n\n"
                            # Yield final chunk
                            final_chunk = {
                                "id": chunk_id,
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": sse_model_name,
                                "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
                            }
                            yield f"data: {json.dumps(final_chunk)}\n\n"
                    except asyncio.CancelledError:
                        # print("Stream generator cancelled (client disconnected)") # Optional debug
                        raise # Re-raise cancellation
                    finally:
                        # Ensure fetch task is cancelled if generator exits early
                        if fetch_task and not fetch_task.done():
                            fetch_task.cancel()
                        # Send DONE message regardless of success/failure/cancellation
                        yield "data: [DONE]\n\n"
                # The return statement remains the same
                return StreamingResponse(stream_generator(), media_type="text/event-stream")
        except httpx.RequestError as exc:
            print(f"Error requesting Flowith: {exc}")
            raise HTTPException(status_code=503, detail=f"Error connecting to Flowith service: {exc}")
        except HTTPException as http_exc:
            # Re-raise HTTPExceptions (e.g., from status code check or JSON parsing)
            raise http_exc
        except Exception as exc:
            print(f"Unexpected error during Flowith request/processing: {exc}")
            import traceback
            traceback.print_exc()
            raise HTTPException(status_code=500, detail=f"Internal server error: {exc}")
# --- Models Endpoint ---
@app.get("/v1/models", response_model=ModelList)
async def list_models(api_key: str = Depends(verify_api_key)): # Protect with existing auth
    """Return the models this proxy can serve, in OpenAI /v1/models format.

    One ModelCard is produced per key of the models.json mapping; the
    Flowith-side names are never exposed to clients.
    """
    cards: List[ModelCard] = []
    for mapped_model_id in model_mappings:
        cards.append(ModelCard(id=mapped_model_id))
    return ModelList(data=cards)
# --- Optional: Add a root endpoint for health check ---
@app.get("/")
async def root():
    """Health-check endpoint confirming the proxy is up."""
    status_payload = {"message": "OpenAI to Flowith Proxy is running"}
    return status_payload
# --- To run locally (for development) ---
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000) |