nacho commited on
Commit
d14a146
·
1 Parent(s): 569153a

fix: SSE keepalive comments every 5s to prevent AstrBot ReadTimeout

Browse files
Files changed (1) hide show
  1. main.py +87 -78
main.py CHANGED
@@ -114,90 +114,99 @@ def verify_api_key(authorization: Optional[str] = Header(None)) -> str:
114
 
115
 
116
  async def _stream_chat_response(
117
- browser,
118
- prompt: str,
119
- model: str,
120
- has_tools: bool,
121
- manager,
122
- account,
123
  ):
124
- """Shared async generator for streaming chat completions with optional tool call detection."""
125
  chunk_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
126
- try:
127
- is_tool_call = False
128
- not_tool_call = False
129
- content_buffer = ""
130
-
131
- async for chunk_data in browser.stream_message(prompt, timeout=120, model=model):
132
- chunk_type = chunk_data.get("type", "content")
133
- chunk_text = chunk_data.get("chunk", "")
134
-
135
- if chunk_type == "thinking":
136
- delta = {"reasoning_content": chunk_text}
137
- else:
138
- if has_tools and not is_tool_call and not not_tool_call:
139
- content_buffer += chunk_text
140
- if len(content_buffer) < 12:
141
- if not "<tool_call>".startswith(content_buffer):
142
- not_tool_call = True
143
- delta = {"content": content_buffer}
 
 
 
144
  else:
145
- continue
 
 
 
 
 
 
 
 
146
  else:
147
- if content_buffer.startswith("<tool_call>"):
148
- is_tool_call = True
149
- continue
150
- else:
151
- not_tool_call = True
152
- delta = {"content": content_buffer}
153
- elif has_tools and is_tool_call:
154
- content_buffer += chunk_text
155
- continue
156
- else:
157
- delta = {"content": chunk_text}
158
-
159
- data = {
160
- "id": chunk_id,
161
- "object": "chat.completion.chunk",
162
- "created": int(time.time()),
163
- "model": model,
164
- "choices": [{"index": 0, "delta": delta, "finish_reason": None}],
165
- }
166
- yield f"data: {json.dumps(data)}\n\n"
167
-
168
- if is_tool_call:
169
- m = re.search(r'<tool_call>(.*?)</tool_call>', content_buffer, re.DOTALL)
170
- if m:
171
- try:
172
- tcall = json.loads(m.group(1))
173
- delta = {
174
- "tool_calls": [{
175
- "index": 0,
176
- "id": f"call_{uuid.uuid4().hex[:8]}",
177
- "type": "function",
178
- "function": {
179
- "name": tcall.get("name", ""),
180
- "arguments": json.dumps(tcall.get("arguments", {}))
181
- }
182
- }]
183
- }
184
- data = {
185
- "id": chunk_id,
186
- "object": "chat.completion.chunk",
187
- "created": int(time.time()),
188
- "model": model,
189
- "choices": [{"index": 0, "delta": delta, "finish_reason": "tool_calls"}]
190
- }
191
- yield f"data: {json.dumps(data)}\n\n"
192
- except Exception as e:
193
- logger.error("Failed to parse tool call: %s", e)
194
 
195
- yield f"data: {json.dumps({'id': chunk_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"
196
- yield "data: [DONE]\n\n"
197
- except Exception as e:
198
- yield f"data: {json.dumps({'error': {'message': str(e)}})}\n\n"
 
 
 
 
199
  finally:
200
- await manager.release(account)
 
201
 
202
 
203
  @app.get("/v1/models")
 
114
 
115
 
116
  async def _stream_chat_response(
117
+ browser, prompt, model, has_tools, manager, account,
 
 
 
 
 
118
  ):
 
119
  chunk_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
120
+ q = asyncio.Queue()
121
+
122
+ async def _producer():
123
+ try:
124
+ is_tool_call = False
125
+ not_tool_call = False
126
+ content_buffer = ""
127
+ async for chunk_data in browser.stream_message(prompt, timeout=120, model=model):
128
+ chunk_type = chunk_data.get("type", "content")
129
+ chunk_text = chunk_data.get("chunk", "")
130
+ if chunk_type == "thinking":
131
+ delta = {"reasoning_content": chunk_text}
132
+ else:
133
+ if has_tools and not is_tool_call and not not_tool_call:
134
+ content_buffer += chunk_text
135
+ if len(content_buffer) < 12:
136
+ if not "<tool_call>".startswith(content_buffer):
137
+ not_tool_call = True
138
+ delta = {"content": content_buffer}
139
+ else:
140
+ continue
141
  else:
142
+ if content_buffer.startswith("<tool_call>"):
143
+ is_tool_call = True
144
+ continue
145
+ else:
146
+ not_tool_call = True
147
+ delta = {"content": content_buffer}
148
+ elif has_tools and is_tool_call:
149
+ content_buffer += chunk_text
150
+ continue
151
  else:
152
+ delta = {"content": chunk_text}
153
+ data = {
154
+ "id": chunk_id, "object": "chat.completion.chunk",
155
+ "created": int(time.time()), "model": model,
156
+ "choices": [{"index": 0, "delta": delta, "finish_reason": None}],
157
+ }
158
+ await q.put(f"data: {json.dumps(data)}\n\n")
159
+ if is_tool_call:
160
+ m = re.search(r'<tool_call>(.*?)</tool_call>', content_buffer, re.DOTALL)
161
+ if m:
162
+ try:
163
+ tcall = json.loads(m.group(1))
164
+ delta = {
165
+ "tool_calls": [{
166
+ "index": 0,
167
+ "id": f"call_{uuid.uuid4().hex[:8]}",
168
+ "type": "function",
169
+ "function": {
170
+ "name": tcall.get("name", ""),
171
+ "arguments": json.dumps(tcall.get("arguments", {}))
172
+ }
173
+ }]
174
+ }
175
+ data = {
176
+ "id": chunk_id, "object": "chat.completion.chunk",
177
+ "created": int(time.time()), "model": model,
178
+ "choices": [{"index": 0, "delta": delta, "finish_reason": "tool_calls"}]
179
+ }
180
+ await q.put(f"data: {json.dumps(data)}\n\n")
181
+ except Exception as e:
182
+ logger.error("Failed to parse tool call: %s", e)
183
+ await q.put(f"data: {json.dumps({'id': chunk_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n")
184
+ await q.put("data: [DONE]\n\n")
185
+ except Exception as e:
186
+ await q.put(f"data: {json.dumps({'error': {'message': str(e)}})}\n\n")
187
+ finally:
188
+ await q.put(None)
189
+ await manager.release(account)
190
+
191
+ async def _keepalive():
192
+ while True:
193
+ await asyncio.sleep(5)
194
+ try:
195
+ q.put_nowait(": keepalive\n\n")
196
+ except asyncio.QueueFull:
197
+ pass
 
198
 
199
+ pt = asyncio.create_task(_producer())
200
+ kt = asyncio.create_task(_keepalive())
201
+ try:
202
+ while True:
203
+ item = await q.get()
204
+ if item is None:
205
+ break
206
+ yield item
207
  finally:
208
+ kt.cancel()
209
+ pt.cancel()
210
 
211
 
212
  @app.get("/v1/models")