rkihacker committed on
Commit
0f4286e
·
verified ·
1 Parent(s): c5a8085

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +88 -30
main.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import os
2
  import httpx
3
  import json
@@ -92,7 +93,7 @@ async def stream_replicate_sse(replicate_model_id: str, input_payload: dict):
92
  stream_url = prediction.get("urls", {}).get("stream")
93
  prediction_id = prediction.get("id", "stream-unknown")
94
  if not stream_url:
95
- yield json.dumps({"error": {"message": "Model did not return a stream URL."}})
96
  return
97
  except httpx.HTTPStatusError as e:
98
  error_details = e.response.text
@@ -100,7 +101,7 @@ async def stream_replicate_sse(replicate_model_id: str, input_payload: dict):
100
  error_json = e.response.json()
101
  error_details = error_json.get("detail", error_details)
102
  except json.JSONDecodeError: pass
103
- yield json.dumps({"error": {"message": f"Upstream Error: {error_details}", "type": "replicate_error"}})
104
  return
105
 
106
  try:
@@ -112,49 +113,106 @@ async def stream_replicate_sse(replicate_model_id: str, input_payload: dict):
112
  if line.startswith("event:"):
113
  current_event = line[len("event:"):].strip()
114
  elif line.startswith("data:"):
115
- # --- START OF DEFINITIVE FIX ---
116
- # Previous logic was flawed and removed critical whitespace,
117
- # causing both spacing issues and silent failures.
118
- # This new logic is simple, robust, and correct.
119
 
120
- # 1. Get the entire payload after "data:"
121
- raw_payload = line[len("data:"):]
 
 
 
 
 
 
 
 
122
 
123
- # 2. The SSE spec allows an optional leading space. Remove it if it exists.
124
- # This prevents parsing errors without destroying content.
125
- payload = raw_payload.lstrip(" ")
126
-
127
  if current_event == "output":
128
- if not payload: # Skip if the payload is now empty after lstrip
129
  continue
130
 
131
  content_token = ""
132
  try:
133
- # This handles JSON-encoded strings like "\" Hello\""
134
- content_token = json.loads(payload)
135
  except (json.JSONDecodeError, TypeError):
136
- # This handles plain text tokens
137
- content_token = payload
138
 
139
- # Yield the token. It can now correctly be a single space " ".
140
  chunk = {
141
- "id": prediction_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": replicate_model_id,
142
- "choices": [{"index": 0, "delta": {"content": content_token}, "finish_reason": None}]
 
 
 
 
 
 
 
 
 
 
143
  }
144
- yield json.dumps(chunk)
145
- # --- END OF DEFINITIVE FIX ---
146
  elif current_event == "done":
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
  break
148
  except httpx.ReadTimeout:
149
- yield json.dumps({"error": {"message": "Stream timed out.", "type": "timeout_error"}})
150
  return
151
 
152
- final_chunk = {
153
- "id": prediction_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": replicate_model_id,
154
- "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
155
- }
156
- yield json.dumps(final_chunk)
157
- yield "[DONE]"
158
 
159
  # --- Endpoints ---
160
  @app.get("/v1/models")
@@ -186,4 +244,4 @@ async def create_chat_completion(request: OpenAIChatCompletionRequest):
186
  "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
187
  }
188
  except httpx.HTTPStatusError as e:
189
- raise HTTPException(status_code=e.response.status_code, detail=f"Error from Replicate API: {e.response.text}")
 
1
+
2
  import os
3
  import httpx
4
  import json
 
93
  stream_url = prediction.get("urls", {}).get("stream")
94
  prediction_id = prediction.get("id", "stream-unknown")
95
  if not stream_url:
96
+ yield f"data: {json.dumps({'error': {'message': 'Model did not return a stream URL.'}})}\n\n"
97
  return
98
  except httpx.HTTPStatusError as e:
99
  error_details = e.response.text
 
101
  error_json = e.response.json()
102
  error_details = error_json.get("detail", error_details)
103
  except json.JSONDecodeError: pass
104
+ yield f"data: {json.dumps({'error': {'message': f'Upstream Error: {error_details}', 'type': 'replicate_error'}})}\n\n"
105
  return
106
 
107
  try:
 
113
  if line.startswith("event:"):
114
  current_event = line[len("event:"):].strip()
115
  elif line.startswith("data:"):
116
+ # FIXED: Preserve all whitespace including leading/trailing spaces
117
+ raw_data = line[5:] # Remove "data:" prefix
 
 
118
 
119
+ # Skip empty data lines; there is no payload to forward
120
+ if not raw_data:
121
+ continue
122
+
123
+ # Remove only the optional single space after data: if present
124
+ # This is per SSE spec and preserves actual content spaces
125
+ if raw_data.startswith(" "):
126
+ data_content = raw_data[1:] # Remove the first space only
127
+ else:
128
+ data_content = raw_data
129
 
 
 
 
 
130
  if current_event == "output":
131
+ if not data_content:
132
  continue
133
 
134
  content_token = ""
135
  try:
136
+ # Handle JSON-encoded strings properly (including spaces)
137
+ content_token = json.loads(data_content)
138
  except (json.JSONDecodeError, TypeError):
139
+ # Handle plain text tokens (preserve as-is)
140
+ content_token = data_content
141
 
142
+ # Build the chat.completion.chunk payload carrying this token
143
  chunk = {
144
+ "choices": [{
145
+ "delta": {"content": content_token},
146
+ "finish_reason": None,
147
+ "index": 0,
148
+ "logprobs": None,
149
+ "native_finish_reason": None
150
+ }],
151
+ "created": int(time.time()),
152
+ "id": f"gen-{int(time.time())}-{prediction_id[-12:]}", # Format like your example
153
+ "model": replicate_model_id,
154
+ "object": "chat.completion.chunk",
155
+ "provider": "Anthropic" if "anthropic" in replicate_model_id else "Replicate"
156
  }
157
+ yield f"data: {json.dumps(chunk)}\n\n"
158
+
159
  elif current_event == "done":
160
+ # Send usage chunk before done
161
+ usage_chunk = {
162
+ "choices": [{
163
+ "delta": {},
164
+ "finish_reason": None,
165
+ "index": 0,
166
+ "logprobs": None,
167
+ "native_finish_reason": None
168
+ }],
169
+ "created": int(time.time()),
170
+ "id": f"gen-{int(time.time())}-{prediction_id[-12:]}",
171
+ "model": replicate_model_id,
172
+ "object": "chat.completion.chunk",
173
+ "provider": "Anthropic" if "anthropic" in replicate_model_id else "Replicate",
174
+ "usage": {
175
+ "cache_discount": 0,
176
+ "completion_tokens": 0,
177
+ "completion_tokens_details": {"image_tokens": 0, "reasoning_tokens": 0},
178
+ "cost": 0,
179
+ "cost_details": {
180
+ "upstream_inference_completions_cost": 0,
181
+ "upstream_inference_cost": None,
182
+ "upstream_inference_prompt_cost": 0
183
+ },
184
+ "input_tokens": 0,
185
+ "is_byok": False,
186
+ "prompt_tokens": 0,
187
+ "prompt_tokens_details": {"audio_tokens": 0, "cached_tokens": 0},
188
+ "total_tokens": 0
189
+ }
190
+ }
191
+ yield f"data: {json.dumps(usage_chunk)}\n\n"
192
+
193
+ # Send final chunk with stop reason
194
+ final_chunk = {
195
+ "choices": [{
196
+ "delta": {},
197
+ "finish_reason": "stop",
198
+ "index": 0,
199
+ "logprobs": None,
200
+ "native_finish_reason": "end_turn"
201
+ }],
202
+ "created": int(time.time()),
203
+ "id": f"gen-{int(time.time())}-{prediction_id[-12:]}",
204
+ "model": replicate_model_id,
205
+ "object": "chat.completion.chunk",
206
+ "provider": "Anthropic" if "anthropic" in replicate_model_id else "Replicate"
207
+ }
208
+ yield f"data: {json.dumps(final_chunk)}\n\n"
209
  break
210
  except httpx.ReadTimeout:
211
+ yield f"data: {json.dumps({'error': {'message': 'Stream timed out.', 'type': 'timeout_error'}})}\n\n"
212
  return
213
 
214
+ # Send [DONE] event
215
+ yield "data: [DONE]\n\n"
 
 
 
 
216
 
217
  # --- Endpoints ---
218
  @app.get("/v1/models")
 
244
  "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
245
  }
246
  except httpx.HTTPStatusError as e:
247
+ raise HTTPException(status_code=e.response.status_code, detail=f"Error from Replicate API: {e.response.text}")