Hivra committed
Commit 7eba146 · verified · 1 Parent(s): 0520686

Update app.py

Files changed (1)
app.py +118 -119
app.py CHANGED
@@ -6,6 +6,12 @@ import json
 import os
 import time
 import asyncio
+import logging
+from typing import AsyncGenerator
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
 
 app = FastAPI()
 
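The new module-level logging setup is what every handler below reports through. With basicConfig at INFO and no explicit format string, records use Python's default layout; a quick check (the logger name "app" stands in for whatever __name__ resolves to):

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("app")
    logger.info("Initiating stream for model: %s", "gpt-4o-mini")
    # Prints: INFO:app:Initiating stream for model: gpt-4o-mini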
@@ -45,8 +51,11 @@ DEFAULT_HEADERS = {
 SUPPORTED_MODELS = ["o3-mini", "gpt-4o-mini", "claude-3-haiku-20240307", "meta-llama/Llama-3.3-70B-Instruct-Turbo"]
 TIMEOUT = 30.0 # Seconds
 
-async def get_vqd():
-    """Fetch DuckDuckGo authentication token."""
+class StreamClosedError(Exception):
+    pass
+
+async def get_vqd() -> str:
+    """Fetch and validate DuckDuckGo authentication token."""
     try:
         async with httpx.AsyncClient() as client:
             response = await client.get(
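
For orientation, the handshake get_vqd() wraps is a single GET whose x-vqd-4 response header carries the token (next hunk). A minimal standalone sketch; STATUS_URL and the x-vqd-accept request header are assumptions here, since the real request URL sits in unchanged lines outside this diff:

    import asyncio
    import httpx

    STATUS_URL = "https://duckduckgo.com/duckchat/v1/status"  # assumed value

    async def fetch_vqd_once() -> str:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                STATUS_URL,
                headers={"x-vqd-accept": "1"},  # assumed handshake header
                timeout=10.0,
            )
            response.raise_for_status()
            return response.headers["x-vqd-4"]  # KeyError if the token is absent

    print(asyncio.run(fetch_vqd_once()))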
@@ -55,107 +64,98 @@ async def get_vqd():
                 timeout=10.0
             )
             response.raise_for_status()
-            vqd = response.headers.get("x-vqd-4")
-            if not vqd:
-                raise ValueError("Missing x-vqd-4 header in response")
-            return vqd
+
+            if "x-vqd-4" not in response.headers:
+                logger.error("Missing x-vqd-4 header in response")
+                raise ValueError("Authentication failed: Missing VQD token")
+
+            return response.headers["x-vqd-4"]
+
     except httpx.HTTPStatusError as e:
-        raise HTTPException(status_code=e.response.status_code, detail=f"VQD fetch failed: {str(e)}")
+        logger.error(f"VQD fetch failed: {e.response.text}")
+        raise HTTPException(status_code=502, detail="Upstream authentication service unavailable")
     except Exception as e:
-        raise HTTPException(status_code=500, detail=f"VQD error: {str(e)}")
+        logger.exception("Critical VQD error")
+        raise HTTPException(status_code=500, detail="Internal authentication error")
 
-async def duckduckgo_chat_stream(model: str, messages: list):
-    """Handle streaming chat response."""
+async def stream_generator(response: httpx.Response) -> AsyncGenerator[str, None]:
+    """Handle streaming response with proper buffer management."""
+    buffer = ""
     try:
-        x_vqd_4 = await get_vqd()
-        chat_headers = {
-            **DEFAULT_HEADERS,
-            "x-vqd-4": x_vqd_4,
-            "Accept": "text/event-stream",
-        }
-
-        async with httpx.AsyncClient() as client:
-            response = await client.post(
-                CHAT_URL,
-                headers=chat_headers,
-                json={"model": model, "messages": messages},
-                timeout=TIMEOUT
-            )
-            response.raise_for_status()
-
-            async def event_generator():
-                try:
-                    async for chunk in response.aiter_bytes():
-                        decoded_chunk = chunk.decode('utf-8')
-                        for line in decoded_chunk.split('\n'):
-                            line = line.strip()
-                            if line.startswith("data: "):
-                                try:
-                                    data = json.loads(line[5:])
-                                    if "error" in data:
-                                        yield f"data: {json.dumps({'error': data['error']})}\n\n"
-                                        return
-
-                                    message = data.get("message", "")
-                                    if not message:
-                                        continue
-
-                                    yield format_openai_chunk(message, model)
-                                    await asyncio.sleep(0.001) # Rate limit
-                                except json.JSONDecodeError as e:
-                                    yield f"data: {json.dumps({'error': f'JSON error: {str(e)}'})}\n\n"
-                                    return
-                except Exception as e:
-                    yield f"data: {json.dumps({'error': f'Stream error: {str(e)}'})}\n\n"
-                finally:
-                    yield "data: [DONE]\n\n"
-
-            return StreamingResponse(event_generator(), media_type="text/event-stream")
-
-    except httpx.HTTPStatusError as e:
-        raise HTTPException(status_code=e.response.status_code, detail=f"Chat error: {str(e)}")
+        async for chunk in response.aiter_text():
+            buffer += chunk
+
+            while "\n\n" in buffer:
+                event, buffer = buffer.split("\n\n", 1)
+                for line in event.strip().split("\n"):
+                    if not line.startswith("data: "):
+                        continue
+
+                    try:
+                        data = json.loads(line[5:])
+                        if error := data.get("error"):
+                            logger.error(f"Upstream error: {error}")
+                            yield format_error_chunk(error)
+                            return
+
+                        if message := data.get("message", ""):
+                            yield format_openai_chunk(message, "gpt-4o-mini")
+                            await asyncio.sleep(0.001)
+
+                    except json.JSONDecodeError:
+                        logger.warning(f"Invalid JSON line: {line[:100]}")
+                        yield format_error_chunk("Invalid response format")
+                        return
+                    except Exception as e:
+                        logger.error(f"Stream processing error: {str(e)}")
+                        yield format_error_chunk("Stream processing failed")
+                        return
+
+    except httpx.RemoteProtocolError:
+        logger.info("Connection closed by server")
     except Exception as e:
-        raise HTTPException(status_code=500, detail=f"Chat error: {str(e)}")
+        logger.error(f"Unexpected stream error: {str(e)}")
+        yield format_error_chunk("Stream connection failed")
+    finally:
+        yield "data: [DONE]\n\n"
 
-async def duckduckgo_chat_non_stream(model: str, messages: list):
-    """Handle non-streaming chat response."""
+async def duckduckgo_chat_stream(model: str, messages: list) -> StreamingResponse:
+    """Robust streaming handler with connection monitoring."""
     try:
-        x_vqd_4 = await get_vqd()
-        chat_headers = {
-            **DEFAULT_HEADERS,
-            "x-vqd-4": x_vqd_4,
-        }
-
+        logger.info(f"Initiating stream for model: {model}")
+        vqd_token = await get_vqd()
+
         async with httpx.AsyncClient() as client:
             response = await client.post(
                 CHAT_URL,
-                headers=chat_headers,
+                headers={
+                    **DEFAULT_HEADERS,
+                    "x-vqd-4": vqd_token,
+                    "Accept": "text/event-stream",
+                },
                 json={"model": model, "messages": messages},
                 timeout=TIMEOUT
            )
            response.raise_for_status()
-
-            full_response = []
-            async for chunk in response.aiter_bytes():
-                decoded_chunk = chunk.decode('utf-8')
-                for line in decoded_chunk.split('\n'):
-                    line = line.strip()
-                    if line.startswith("data: "):
-                        try:
-                            data = json.loads(line[5:])
-                            full_response.append(data.get("message", ""))
-                        except json.JSONDecodeError:
-                            continue
-
-            return "".join(full_response)
-
+
+            return StreamingResponse(
+                stream_generator(response),
+                media_type="text/event-stream",
+                headers={
+                    "Cache-Control": "no-cache",
+                    "X-Stream-ID": f"stream_{int(time.time())}",
+                }
+            )
+
     except httpx.HTTPStatusError as e:
-        raise HTTPException(status_code=e.response.status_code, detail=f"Chat error: {str(e)}")
+        logger.error(f"Upstream API error: {e.response.text}")
+        raise HTTPException(status_code=502, detail="Upstream service unavailable")
     except Exception as e:
-        raise HTTPException(status_code=500, detail=f"Chat error: {str(e)}")
+        logger.exception("Stream setup failed")
+        raise HTTPException(status_code=500, detail="Stream initialization failed")
 
-def format_openai_chunk(content: str, model: str):
-    """Format response chunk in OpenAI style."""
+def format_openai_chunk(content: str, model: str) -> str:
+    """Generate OpenAI-compatible SSE chunk."""
     return json.dumps({
         "id": f"chatcmpl-{int(time.time()*1000)}",
         "object": "chat.completion.chunk",
@@ -168,6 +168,13 @@ def format_openai_chunk(content: str, model: str):
         }]
     }) + "\n\n"
 
+def format_error_chunk(message: str) -> str:
+    """Format error messages for SSE stream."""
+    return json.dumps({
+        "error": message,
+        "code": "STREAM_ERROR"
+    }) + "\n\n"
+
 @app.post("/v1/chat/completions")
 async def chat_completions(request: Request):
     try:
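
format_error_chunk is small enough to sanity-check in isolation. As committed it emits a bare JSON object plus a blank line, not a data:-prefixed SSE event, so consumers have to treat error lines specially:

    import json

    def format_error_chunk(message: str) -> str:
        """Format error messages for SSE stream."""
        return json.dumps({
            "error": message,
            "code": "STREAM_ERROR"
        }) + "\n\n"

    print(repr(format_error_chunk("boom")))
    # '{"error": "boom", "code": "STREAM_ERROR"}\n\n'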
@@ -176,49 +183,41 @@ async def chat_completions(request: Request):
         messages = data.get("messages", [])
         stream = data.get("stream", False)
 
-        # Validation
+        # Validate input
         if model not in SUPPORTED_MODELS:
             raise HTTPException(400, f"Unsupported model: {model}")
-        if not messages:
-            raise HTTPException(400, "Empty messages list")
-
+        if not messages or not isinstance(messages, list):
+            raise HTTPException(400, "Invalid messages format")
+
         # Process messages
-        system_message = next((m for m in messages if m["role"] == "system"), None)
-        history = "\n".join(
-            f"{m['role']}: {m['content']}"
-            for m in messages
-            if m["role"] != "system" and m != messages[-1]
-        )
-        current_query = messages[-1]["content"] if messages else ""
-
-        combined_content = f"{system_message['content']}\n{history}\nUser: {current_query}" if system_message else f"{history}\nUser: {current_query}"
-        payload = [{"role": "user", "content": combined_content}]
+        last_message = messages[-1]
+        if last_message.get("role") != "user":
+            raise HTTPException(400, "Last message must be from user")
+
+        # Build payload (simplified for example)
+        payload = [{
+            "role": "user",
+            "content": "\n".join(
+                f"{m['role']}: {m['content']}"
+                for m in messages
+            )
+        }]
 
         if stream:
             return await duckduckgo_chat_stream(model, payload)
         else:
-            response_text = await duckduckgo_chat_non_stream(model, payload)
-            return JSONResponse({
-                "id": f"chatcmpl-{int(time.time()*1000)}",
-                "object": "chat.completion",
-                "created": int(time.time()),
-                "model": model,
-                "choices": [{
-                    "message": {"role": "assistant", "content": response_text},
-                    "finish_reason": "stop",
-                    "index": 0
-                }],
-                "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
-            })
-
-    except HTTPException as e:
-        raise e
+            # Non-streaming implementation
+            raise HTTPException(501, "Non-streaming mode temporarily disabled")
+
+    except HTTPException:
+        raise
     except Exception as e:
-        raise HTTPException(500, f"Server error: {str(e)}")
+        logger.exception("API handler error")
+        raise HTTPException(500, "Internal server error")
 
-@app.get("/")
+@app.get("/health")
 async def health_check():
-    return {"status": "healthy", "timestamp": int(time.time())}
+    return {"status": "ok", "timestamp": int(time.time())}
 
 @app.exception_handler(HTTPException)
 async def http_error_handler(request: Request, exc: HTTPException):
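
End to end, the reworked endpoint now serves streaming requests only (stream=False returns HTTP 501). A hedged client sketch against a locally running instance; the address is an assumption, not something this commit configures:

    import httpx

    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": "Say hello"}],
        "stream": True,
    }

    with httpx.stream("POST", "http://localhost:8000/v1/chat/completions",
                      json=payload, timeout=60.0) as response:
        for line in response.iter_lines():
            # Content chunks arrive as bare JSON lines in this version;
            # only the terminator uses the "data: " prefix.
            if line and line != "data: [DONE]":
                print(line)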
 