LisaMegaWatts committed on
Commit
21fcfa2
·
verified ·
1 Parent(s): 6fe84c4

True token-by-token SSE streaming via thread + queue

Browse files
Files changed (1) hide show
  1. server.py +47 -17
server.py CHANGED
@@ -3,10 +3,14 @@
3
  SymbioGPT-10M base model with Grammar Expert LoRA adapter merged at startup.
4
  The LoRA was discovered via evolutionary search on CoLA (grammar acceptability).
5
  Downloads base checkpoint + LoRA weights from HuggingFace on first run.
 
 
6
  """
7
  import json as json_mod
8
  import math
9
  import os
 
 
10
  import time
11
  import uuid
12
 
@@ -167,12 +171,14 @@ print(f" Merged {n_merged} LoRA weight pairs (rank={LORA_RANK}, alpha={LORA_ALP
167
 
168
  model.eval()
169
  n_params = sum(p.numel() for p in model.parameters())
170
- print(f" Model ready: {n_params/1e6:.1f}M params (base) + LoRA merged")
171
 
172
  # ═══════════════════════════════════════════════════════════════════
173
  # Generation
174
  # ═══════════════════════════════════════════════════════════════════
175
 
 
 
176
 
177
  @torch.no_grad()
178
  def generate(
@@ -181,8 +187,10 @@ def generate(
181
  temperature: float = 0.8,
182
  top_k: int = 40,
183
  top_p: float = 1.0,
184
- on_token=None,
185
  ) -> str:
 
 
186
  tokens = tokenizer.encode(prompt)
187
  if not tokens:
188
  tokens = [0]
@@ -216,8 +224,11 @@ def generate(
216
  generated_ids.append(next_id)
217
  idx = torch.cat([idx, torch.tensor([[next_id]])], dim=1)
218
 
219
- if on_token is not None:
220
- on_token(tokenizer.decode([next_id]))
 
 
 
221
 
222
  return tokenizer.decode(generated_ids)
223
 
@@ -250,7 +261,7 @@ def extract_prompt(messages):
250
  def health():
251
  return {
252
  "name": "SymbioGPT-GrammarExpert",
253
- "version": "1.0.0",
254
  "description": "SymbioGPT-10M + Grammar Expert LoRA (evolved on CoLA)",
255
  "architecture": "4-organelle decoder (CausalConv + Monarch + LongConv + Attention) "
256
  "+ OrganelleGate + LoRA (rank=8, attn+ffn)",
@@ -305,6 +316,7 @@ async def chat_completions(request: Request):
305
 
306
  if stream:
307
  def sse_stream():
 
308
  initial = {
309
  "id": completion_id,
310
  "object": "chat.completion.chunk",
@@ -314,26 +326,41 @@ async def chat_completions(request: Request):
314
  }
315
  yield f"data: {json_mod.dumps(initial)}\n\n"
316
 
317
- token_count = 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
318
 
319
- def on_token(token_str):
320
- nonlocal token_count
 
 
 
 
321
  token_count += 1
322
-
323
- text = generate(prompt_text, max_tokens=max_tokens, temperature=temperature,
324
- top_k=top_k_val, top_p=top_p_val, on_token=on_token)
325
-
326
- for word in text.split(" "):
327
- chunk_text = word + " " if word else ""
328
  chunk = {
329
  "id": completion_id,
330
  "object": "chat.completion.chunk",
331
  "created": created,
332
  "model": MODEL_ID,
333
- "choices": [{"index": 0, "delta": {"content": chunk_text}, "finish_reason": None}],
334
  }
335
  yield f"data: {json_mod.dumps(chunk)}\n\n"
336
 
 
 
 
337
  finish = {
338
  "id": completion_id,
339
  "object": "chat.completion.chunk",
@@ -342,8 +369,8 @@ async def chat_completions(request: Request):
342
  "choices": [{"index": 0, "delta": {}, "finish_reason": "length" if token_count >= max_tokens else "stop"}],
343
  "usage": {
344
  "prompt_tokens": prompt_tokens,
345
- "completion_tokens": max_tokens,
346
- "total_tokens": prompt_tokens + max_tokens,
347
  },
348
  }
349
  yield f"data: {json_mod.dumps(finish)}\n\n"
@@ -377,4 +404,7 @@ async def chat_completions(request: Request):
377
 
378
  if __name__ == "__main__":
379
  print(f"\nSymbioGPT-GrammarExpert server starting on 0.0.0.0:{PORT} ...")
 
 
 
380
  uvicorn.run(app, host="0.0.0.0", port=PORT)
 
3
  SymbioGPT-10M base model with Grammar Expert LoRA adapter merged at startup.
4
  The LoRA was discovered via evolutionary search on CoLA (grammar acceptability).
5
  Downloads base checkpoint + LoRA weights from HuggingFace on first run.
6
+
7
+ True token-by-token SSE streaming via background thread + queue.
8
  """
9
  import json as json_mod
10
  import math
11
  import os
12
+ import queue
13
+ import threading
14
  import time
15
  import uuid
16
 
 
171
 
172
  model.eval()
173
  n_params = sum(p.numel() for p in model.parameters())
174
+ print(f" Model ready: {n_params/1e6:.1f}M params (base + LoRA merged)")
175
 
176
  # ═══════════════════════════════════════════════════════════════════
177
  # Generation
178
  # ═══════════════════════════════════════════════════════════════════
179
 
180
+ _SENTINEL = object() # marks end of generation
181
+
182
 
183
  @torch.no_grad()
184
  def generate(
 
187
  temperature: float = 0.8,
188
  top_k: int = 40,
189
  top_p: float = 1.0,
190
+ token_queue: queue.Queue = None,
191
  ) -> str:
192
+ """Generate text. If token_queue is provided, pushes each token string
193
+ to the queue as it's generated for true streaming."""
194
  tokens = tokenizer.encode(prompt)
195
  if not tokens:
196
  tokens = [0]
 
224
  generated_ids.append(next_id)
225
  idx = torch.cat([idx, torch.tensor([[next_id]])], dim=1)
226
 
227
+ if token_queue is not None:
228
+ token_queue.put(tokenizer.decode([next_id]))
229
+
230
+ if token_queue is not None:
231
+ token_queue.put(_SENTINEL)
232
 
233
  return tokenizer.decode(generated_ids)
234
 
 
261
  def health():
262
  return {
263
  "name": "SymbioGPT-GrammarExpert",
264
+ "version": "1.1.0",
265
  "description": "SymbioGPT-10M + Grammar Expert LoRA (evolved on CoLA)",
266
  "architecture": "4-organelle decoder (CausalConv + Monarch + LongConv + Attention) "
267
  "+ OrganelleGate + LoRA (rank=8, attn+ffn)",
 
316
 
317
  if stream:
318
  def sse_stream():
319
+ # Initial chunk with role
320
  initial = {
321
  "id": completion_id,
322
  "object": "chat.completion.chunk",
 
326
  }
327
  yield f"data: {json_mod.dumps(initial)}\n\n"
328
 
329
+ # Start generation in background thread
330
+ q = queue.Queue()
331
+ gen_thread = threading.Thread(
332
+ target=generate,
333
+ kwargs={
334
+ "prompt": prompt_text,
335
+ "max_tokens": max_tokens,
336
+ "temperature": temperature,
337
+ "top_k": top_k_val,
338
+ "top_p": top_p_val,
339
+ "token_queue": q,
340
+ },
341
+ daemon=True,
342
+ )
343
+ gen_thread.start()
344
 
345
+ # Stream tokens as they arrive
346
+ token_count = 0
347
+ while True:
348
+ tok = q.get()
349
+ if tok is _SENTINEL:
350
+ break
351
  token_count += 1
 
 
 
 
 
 
352
  chunk = {
353
  "id": completion_id,
354
  "object": "chat.completion.chunk",
355
  "created": created,
356
  "model": MODEL_ID,
357
+ "choices": [{"index": 0, "delta": {"content": tok}, "finish_reason": None}],
358
  }
359
  yield f"data: {json_mod.dumps(chunk)}\n\n"
360
 
361
+ gen_thread.join(timeout=5.0)
362
+
363
+ # Final chunk
364
  finish = {
365
  "id": completion_id,
366
  "object": "chat.completion.chunk",
 
369
  "choices": [{"index": 0, "delta": {}, "finish_reason": "length" if token_count >= max_tokens else "stop"}],
370
  "usage": {
371
  "prompt_tokens": prompt_tokens,
372
+ "completion_tokens": token_count,
373
+ "total_tokens": prompt_tokens + token_count,
374
  },
375
  }
376
  yield f"data: {json_mod.dumps(finish)}\n\n"
 
404
 
405
  if __name__ == "__main__":
406
  print(f"\nSymbioGPT-GrammarExpert server starting on 0.0.0.0:{PORT} ...")
407
+ print(f" GET http://localhost:{PORT}/")
408
+ print(f" GET http://localhost:{PORT}/v1/models")
409
+ print(f" POST http://localhost:{PORT}/v1/chat/completions")
410
  uvicorn.run(app, host="0.0.0.0", port=PORT)