File size: 17,302 Bytes
b687da5
 
 
0053c86
 
b687da5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85c8196
 
 
 
 
 
 
 
 
 
 
b687da5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
504d810
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b687da5
 
 
 
 
 
 
 
 
 
 
 
990adae
b687da5
56d3b84
504d810
990adae
b687da5
 
 
 
 
 
 
 
 
 
 
 
 
504d810
 
 
 
 
91e31f2
b687da5
990adae
 
 
 
 
 
 
 
 
 
 
 
504d810
990adae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0053c86
990adae
 
 
 
 
 
 
 
 
 
 
fc1374e
990adae
 
 
 
0053c86
990adae
 
b687da5
f5fea6c
990adae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d86f2f1
 
 
 
 
 
 
 
 
 
 
 
504d810
b687da5
504d810
 
 
 
b687da5
 
85c8196
 
 
 
 
 
 
 
 
 
 
 
 
b687da5
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
import os
import json
import uuid
import asyncio # <-- Added import
import time # Ensure time is imported for timestamps
from typing import List, Optional, Literal, Dict, Any, AsyncGenerator

import httpx
import dotenv
from fastapi import FastAPI, HTTPException, Request, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

# Load environment variables from .env file
dotenv.load_dotenv()

# --- Configuration ---
# Upstream bearer token for Flowith; absence is only warned about here and
# rejected per-request with a 500 in the completion endpoint.
FLOWITH_AUTH_TOKEN = os.getenv("FLOWITH_AUTH_TOKEN")
FLOWITH_API_URL = "https://edge.flo.ing/completion?mode=general"
MODELS_FILE_PATH = "models.json"  # OpenAI-model-id -> Flowith-model-name mapping
API_KEY = os.getenv("API_KEY", "123456") # Read API Key
# NOTE(review): the "123456" fallback means auth is effectively a known default
# unless API_KEY is set — confirm this is intentional for deployments.

# --- Security Scheme ---
# Bearer-token extractor used by the verify_api_key dependency below.
security = HTTPBearer()

if not FLOWITH_AUTH_TOKEN:
    # In a real app, you might want to log this and exit,
    # but for simplicity, we'll raise an error if accessed.
    print("Warning: FLOWITH_AUTH_TOKEN environment variable not set.")
    # raise ValueError("FLOWITH_AUTH_TOKEN environment variable is required.") # Or handle differently

# --- Load Model Mappings ---
# Maps OpenAI-style model IDs (keys) to Flowith model names (values).
# Failures are deliberately non-fatal: the proxy still starts, but with an
# empty mapping every chat request then fails with a 400.
try:
    # Explicit encoding avoids platform-dependent decoding of models.json
    # (the locale default is not UTF-8 everywhere, notably on Windows).
    with open(MODELS_FILE_PATH, 'r', encoding='utf-8') as f:
        model_mappings = json.load(f)
except FileNotFoundError:
    print(f"Error: Models file not found at {MODELS_FILE_PATH}")
    model_mappings = {} # Or raise an error / exit
except json.JSONDecodeError:
    print(f"Error: Invalid JSON in models file: {MODELS_FILE_PATH}")
    model_mappings = {} # Or raise an error / exit

# --- Pydantic Models ---
class OpenAIMessage(BaseModel):
    """A single chat message in the OpenAI request schema."""
    # 'system' messages are converted or dropped before forwarding to Flowith.
    role: Literal["system", "user", "assistant"]
    content: str

class OpenAIRequest(BaseModel):
    """Body of an OpenAI-style /v1/chat/completions request."""
    model: str
    messages: List[OpenAIMessage]
    # Client-side streaming preference; the upstream Flowith call behaves the
    # same either way, only the response framing differs.
    stream: Optional[bool] = False
    # Add other potential OpenAI fields if needed, e.g., temperature, max_tokens
    # temperature: Optional[float] = None
    # max_tokens: Optional[int] = None

class FlowithMessage(BaseModel):
    """A single chat message in Flowith's schema."""
    role: Literal["user", "assistant"] # Flowith uses 'user' and 'assistant'
    content: str

class FlowithRequest(BaseModel):
    """Payload forwarded to the Flowith completion endpoint."""
    model: str
    messages: List[FlowithMessage]
    stream: bool
    nodeId: str # UUID for Flowith


# --- OpenAI Models Endpoint Models ---
class ModelCard(BaseModel):
    """One entry in the OpenAI-style /v1/models listing."""
    id: str
    object: str = "model"
    # owned_by: str = "user" # Optional: Add other fields if needed

class ModelList(BaseModel):
    """OpenAI-style container for the available model cards."""
    object: str = "list"
    data: List[ModelCard]

# --- FastAPI App ---
# Single application instance; endpoints are registered via decorators below.
app = FastAPI(
    title="OpenAI to Flowith Proxy",
    description="Translates OpenAI-compatible chat completion requests to Flowith's format.",
)

# --- Helper for Streaming ---
async def stream_flowith_response(flowith_stream: httpx.Response) -> AsyncGenerator[str, None]:
    """Relay a streaming Flowith HTTP response as decoded text chunks.

    Assumes the upstream bytes are UTF-8 text in a format the client can
    consume as-is; if Flowith changes its wire format, adjust the decoding.
    """
    async for raw_chunk in flowith_stream.aiter_bytes():
        text_chunk = raw_chunk.decode('utf-8')
        yield text_chunk

# --- Security Dependency ---
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the caller's Bearer token against the configured API_KEY.

    Returns:
        The validated API key string (the raw credential).

    Raises:
        HTTPException: 401 when the credential is missing, not a Bearer
            token, or does not match API_KEY.
    """
    import hmac  # Local import, kept from the original; only used here.

    # Validate presence and scheme BEFORE dereferencing credentials.credentials.
    # The original computed hmac.compare_digest(credentials.credentials, ...)
    # first, which would raise AttributeError (-> 500) on a missing credential
    # instead of the intended 401.
    if not credentials or credentials.scheme != "Bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API Key",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Constant-time comparison to prevent timing attacks on the key.
    if not hmac.compare_digest(credentials.credentials, API_KEY):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API Key",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return credentials.credentials # Return the key or True if successful

# --- API Endpoint ---
@app.post("/v1/chat/completions")
async def chat_completions(
    request: OpenAIRequest,
    http_request: Request,
    api_key: str = Depends(verify_api_key) # Add API Key dependency
):
    """
    Accept an OpenAI-compatible chat completion request and forward it to Flowith.

    Non-streaming clients receive a single OpenAI-style JSON completion built
    from Flowith's plain-text reply. Streaming clients receive SSE: empty
    keep-alive deltas are emitted while the upstream request is in flight,
    then the full reply is re-chunked as OpenAI-style delta chunks.

    BUG FIX vs. previous revision: for streaming clients the upstream Flowith
    request used to be issued TWICE (once eagerly, once again inside the
    stream generator's background task). It now runs exactly once per request.

    Raises:
        HTTPException: 400 for unknown models, 500 for missing upstream token
            or unexpected errors, 503 when Flowith is unreachable
            (non-streaming path), or Flowith's own error status code.
    """
    from fastapi.responses import JSONResponse

    if not FLOWITH_AUTH_TOKEN:
         raise HTTPException(status_code=500, detail="Server configuration error: Flowith auth token not set.")

    # 1. Map the requested OpenAI-style model to Flowith's model name.
    flowith_model_name = model_mappings.get(request.model)
    if not flowith_model_name:
        raise HTTPException(
            status_code=400,
            detail=f"Model '{request.model}' not found in mappings. Available: {list(model_mappings.keys())}"
        )

    # 2. Flowith requires a fresh node UUID per request.
    node_id = str(uuid.uuid4())

    # 3. Normalize messages: Flowith only accepts 'user' and 'assistant' roles.
    processed_messages: List[FlowithMessage] = []
    # For Claude/Gemini requests (judged by the *requested* model name), fold
    # system prompts into user messages instead of dropping them.
    is_claude_or_gemini_requested = "claude" in request.model.lower() or "gemini" in request.model.lower()
    for msg in request.messages:
        original_role = msg.role.lower()
        new_role = original_role
        if is_claude_or_gemini_requested and original_role == "system":
            new_role = "user"
        elif original_role not in {"user", "assistant", "system"}:
            new_role = "user"
            print(f"Warning: Converting non-standard role '{original_role}' to 'user' for model {request.model}")
        # Messages still carrying an unsupported role (e.g. 'system' for
        # non-Claude/Gemini models) are silently dropped, as before.
        if new_role in ["user", "assistant"]:
            processed_messages.append(FlowithMessage(role=new_role, content=msg.content))

    # 4. Build the upstream payload once. Flowith is always asked with
    # stream=True; the client's own stream preference only changes framing.
    flowith_payload = FlowithRequest(
        model=flowith_model_name,
        messages=processed_messages,
        stream=True,
        nodeId=node_id,
    )
    payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8')

    # 5. Headers exactly matching the curl -H flags originally captured.
    headers = {
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6,ja;q=0.5',
        'authorization': FLOWITH_AUTH_TOKEN, # Send only the token, no "Bearer " prefix
        'content-type': 'application/json',
        'responsetype': 'stream', # Restore this header
        'origin': 'https://flowith.net',
        'priority': 'u=1, i',
        'referer': 'https://flowith.net/',
        'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'cross-site',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }

    # 6. Non-streaming client: one blocking upstream request, wrapped as an
    # OpenAI-compatible completion object.
    if not request.stream:
        async with httpx.AsyncClient(timeout=600.0) as client:
            try:
                response = await client.post(
                    FLOWITH_API_URL,
                    content=payload_bytes,
                    headers=headers,
                )
            except httpx.RequestError as exc:
                print(f"Error requesting Flowith: {exc}")
                raise HTTPException(status_code=503, detail=f"Error connecting to Flowith service: {exc}")
            except Exception as exc:
                print(f"Unexpected error during Flowith request/processing: {exc}")
                import traceback
                traceback.print_exc()
                raise HTTPException(status_code=500, detail=f"Internal server error: {exc}")

        if response.status_code != 200:
            try:
                detail_msg = f"Flowith API Error ({response.status_code}): {response.text}"
            except Exception:
                detail_msg = f"Flowith API Error ({response.status_code})"
            raise HTTPException(status_code=response.status_code, detail=detail_msg)

        return JSONResponse(content={
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model, # Echo the model the client asked for
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response.text # Flowith replies with plain text
                },
                "finish_reason": "stop" # Assume stop
            }],
            # "usage": {...} # Usage stats are not available from Flowith
        })

    # 7. Streaming client: fetch upstream in a background task while emitting
    # keep-alive chunks, then re-chunk the full reply as SSE deltas.
    async def stream_generator():
        response_ready_event = asyncio.Event()
        fetch_task = None
        flowith_text_local = None  # Filled by the background fetch on success
        error_occurred = None      # Filled by the background fetch on failure
        sse_model_name = request.model

        async def fetch_and_process(event):
            # Fetch the full upstream reply; stash the text or the error,
            # then ALWAYS signal the streaming loop via `event`.
            nonlocal flowith_text_local, error_occurred
            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    response = await client.post(
                        FLOWITH_API_URL, headers=headers, content=payload_bytes
                    )
                response.raise_for_status() # 4xx/5xx -> exception -> error chunk
                flowith_text_local = response.text
            except Exception as e:
                error_occurred = e # Surfaced to the client as an error chunk
            finally:
                event.set() # Signal completion or failure

        # Start the single upstream fetch in the background.
        fetch_task = asyncio.create_task(fetch_and_process(response_ready_event))

        try:
            # Emit empty keep-alive deltas so the connection stays warm while
            # the upstream request is in flight.
            while not response_ready_event.is_set():
                keep_alive_data = {"id": "chatcmpl-keepalive", "object": "chat.completion.chunk", "created": int(time.time()), "model": sse_model_name, "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]}
                yield f"data: {json.dumps(keep_alive_data)}\n\n"
                try:
                    await asyncio.wait_for(response_ready_event.wait(), timeout=3.0)
                except asyncio.TimeoutError:
                    pass # Not ready yet; loop and send another keep-alive

            # Belt-and-braces: pick up an exception the task itself recorded.
            if fetch_task.done() and fetch_task.exception():
                error_occurred = fetch_task.exception()

            if error_occurred:
                error_content = f"Error processing request: {type(error_occurred).__name__}"
                error_chunk = {"id": f"chatcmpl-error-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": sse_model_name, "choices": [{"delta": {"content": error_content}, "index": 0, "finish_reason": "error"}]}
                yield f"data: {json.dumps(error_chunk)}\n\n"
            elif flowith_text_local is not None:
                # Re-chunk the full text into fixed-size OpenAI-style deltas.
                chunk_id = f"chatcmpl-{uuid.uuid4()}"
                chunk_size = 20
                for i in range(0, len(flowith_text_local), chunk_size):
                    chunk = {
                        "id": chunk_id,
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": sse_model_name,
                        "choices": [{"index": 0, "delta": {"content": flowith_text_local[i:i + chunk_size]}, "finish_reason": None}]
                    }
                    yield f"data: {json.dumps(chunk)}\n\n"

                # Terminal chunk with finish_reason 'stop'.
                final_chunk = {
                    "id": chunk_id,
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": sse_model_name,
                    "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
                }
                yield f"data: {json.dumps(final_chunk)}\n\n"

        except asyncio.CancelledError:
            raise # Client disconnected; propagate so FastAPI cleans up
        finally:
            # Cancel the background fetch if the generator exits early.
            if fetch_task and not fetch_task.done():
                fetch_task.cancel()
            # Always terminate the SSE stream, even on failure/cancellation.
            yield "data: [DONE]\n\n"

    return StreamingResponse(stream_generator(), media_type="text/event-stream")


# --- Models Endpoint ---
@app.get("/v1/models", response_model=ModelList)
async def list_models(api_key: str = Depends(verify_api_key)): # Protect with existing auth
    """
    Expose every configured model mapping in OpenAI's /v1/models list format.
    """
    cards = [ModelCard(id=mapped_id) for mapped_id in model_mappings]
    return ModelList(data=cards)


# --- Optional: Add a root endpoint for health check ---
@app.get("/")
async def root():
    """Liveness probe: confirms the proxy process is up."""
    status_payload = {"message": "OpenAI to Flowith Proxy is running"}
    return status_payload

# --- To run locally (for development) ---
# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)