AdarshJi committed on
Commit
baf1e81
·
verified ·
1 Parent(s): 16d6929

Update server.py

Browse files
Files changed (1) hide show
  1. server.py +74 -162
server.py CHANGED
@@ -3,16 +3,10 @@ from typing import List, Dict, Any, AsyncGenerator, Optional
3
  import re
4
  import orjson
5
  import httpx
6
- import logging
7
- import asyncio
8
  from fastapi import FastAPI, Request, HTTPException
9
- from fastapi.responses import StreamingResponse, JSONResponse
 
10
 
11
- # -------------------------
12
- # Logging
13
- # -------------------------
14
- logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
15
- logger = logging.getLogger("chat-proxy")
16
 
17
 
18
 
@@ -284,48 +278,55 @@ def LLMCM() -> list:
284
  return R
285
 
286
 
 
 
 
287
  try:
288
- MODEL_NAMES = {"GROQ": GROQM(), "LLMC": LLMCM()}
289
  except Exception:
290
  MODEL_NAMES = {"GROQ": "GROQ-FALLBACK", "LLMC": "LLMC-FALLBACK"}
291
 
292
- # -------------------------
293
- # Configuration (conservative defaults)
294
- # -------------------------
295
  class Config:
296
  DEFAULT_PROVIDER = "GROQ"
297
  DEFAULT_MODEL = "llama-3.3-70b-versatile"
298
  DEFAULT_MAX_TOKENS = 512
299
  DEFAULT_TEMPERATURE = 0.7
300
  CHUNK_SIZE = 1000
301
- MAX_CONNECTIONS = 50 # lowered to reduce memory pressure
302
- HTTP2 = False # disable unless you need it
303
  TIMEOUT = 30.0
304
- STREAM_BATCH_BYTES = 0 # 0 = stream as tokens arrive
305
 
306
- # -------------------------
307
- # Provider templates
308
- # -------------------------
309
  PROVIDERS: Dict[str, Dict[str, Any]] = {
310
  "GROQ": {
311
  "AUTH": True,
312
  "BASE_URL": "https://api.groq.com/openai/v1/chat/completions",
313
  "DEFAULT_MODEL": "qwen/qwen3-32b",
314
  "HEADERS": {"Authorization": "Bearer {API}", "Content-Type": "application/json"},
315
- "PAYLOAD": {"model": "{model}", "messages": "{messages}", "temperature": "{temperature}", "max_tokens": "{max_tokens}", "stop": None, "stream": "{stream}"},
 
 
 
 
 
 
 
316
  },
317
  "LLMC": {
318
  "AUTH": True,
319
  "BASE_URL": "https://llmchat.in/inference/stream?model={model}",
320
  "DEFAULT_MODEL": "@cf/meta/llama-3.1-8b-instruct",
321
- "HEADERS": {"Content-Type": "application/json", "Accept": "*/*", "Origin": "https://llmchat.in", "Referer": "https://llmchat.in/"},
 
 
 
 
 
322
  "PAYLOAD": {"messages": "{messages}", "stream": "{stream}"},
323
  },
324
  }
325
 
326
- # -------------------------
327
- # Template helpers
328
- # -------------------------
329
  _placeholder_re = re.compile(r"\{(.*?)\}")
330
 
331
  def apply_values_to_template(template: Any, values: Dict[str, Any]) -> Any:
@@ -333,7 +334,6 @@ def apply_values_to_template(template: Any, values: Dict[str, Any]) -> Any:
333
  m = _placeholder_re.fullmatch(template.strip())
334
  if m:
335
  return values.get(m.group(1), template)
336
- # create safe stringified values for formatting
337
  str_values = {
338
  k: (v if isinstance(v, str) else (orjson.dumps(v).decode("utf-8") if not isinstance(v, (int, float, bool, type(None))) else v))
339
  for k, v in values.items()
@@ -360,9 +360,6 @@ def build_values_from_request(req: "ChatRequest") -> Dict[str, Any]:
360
  "stream": req.stream,
361
  }
362
 
363
- # -------------------------
364
- # Request dataclass
365
- # -------------------------
366
  @dataclass
367
  class ChatRequest:
368
  api_key: str
@@ -388,15 +385,20 @@ class ChatRequest:
388
  messages = [messages]
389
  return ChatRequest(api_key=api_key, messages=messages, model=model, provider=provider, max_tokens=max_tokens, temperature=temperature, stream=stream)
390
 
391
- # -------------------------
392
- # Upstream Async client
393
- # -------------------------
394
  class AsyncUpstreamClient:
395
  def __init__(self):
396
  limits = httpx.Limits(max_connections=Config.MAX_CONNECTIONS)
397
- # create client lazily with conservative settings
398
  self._client = httpx.AsyncClient(timeout=Config.TIMEOUT, limits=limits, http2=Config.HTTP2)
399
 
 
 
 
 
 
 
 
 
 
400
  async def close(self):
401
  await self._client.aclose()
402
 
@@ -431,19 +433,12 @@ class AsyncUpstreamClient:
431
  if not chunk:
432
  continue
433
  buf += chunk
434
- # parse events separated by double-newline
435
  while b"\n\n" in buf:
436
  event, buf = buf.split(b"\n\n", 1)
437
  for line in event.splitlines():
438
  if not line:
439
  continue
440
- payload_bytes = None
441
- if line.startswith(b"data:"):
442
- payload_bytes = line[len(b"data:"):].strip()
443
- elif b"data: " in line:
444
- payload_bytes = line[line.find(b"data: ") + 6:].strip()
445
- else:
446
- payload_bytes = line.strip()
447
  if payload_bytes == b"[DONE]":
448
  if reasoning_open:
449
  yield nd({"response": "\n</think>\n"})
@@ -453,21 +448,23 @@ class AsyncUpstreamClient:
453
  try:
454
  parsed = orjson.loads(payload_bytes)
455
  except Exception:
456
- # not JSON — pass through text
457
  try:
458
  txt = payload_bytes.decode("utf-8", errors="ignore")
459
- if txt and txt.strip():
460
- yield nd({"response": txt})
461
  except Exception:
462
  pass
 
 
463
  continue
464
-
465
- # handle nested response content
466
  if isinstance(parsed, dict) and "response" in parsed:
467
  resp_val = parsed.get("response")
468
  if resp_val is None:
469
  continue
470
- # nested JSON string?
 
 
 
 
471
  inner = None
472
  try:
473
  if isinstance(resp_val, str):
@@ -481,8 +478,6 @@ class AsyncUpstreamClient:
481
  else:
482
  yield nd({"response": str(resp_val)})
483
  continue
484
-
485
- # handle choices/delta format
486
  if isinstance(parsed, dict) and parsed.get("choices"):
487
  try:
488
  c0 = parsed["choices"][0]
@@ -512,11 +507,8 @@ class AsyncUpstreamClient:
512
  continue
513
  except Exception:
514
  pass
515
-
516
  if isinstance(parsed, dict) and self._is_metadata_blob(parsed):
517
  continue
518
-
519
- # fallback: stream parsed object
520
  try:
521
  yield nd({"response": parsed})
522
  except Exception:
@@ -524,19 +516,11 @@ class AsyncUpstreamClient:
524
  yield nd({"response": str(parsed)})
525
  except Exception:
526
  continue
527
-
528
- # handle leftover buffer at exit
529
  if buf:
530
  for line in buf.splitlines():
531
  if not line:
532
  continue
533
- payload_bytes = None
534
- if line.startswith(b"data:"):
535
- payload_bytes = line[len(b"data:"):].strip()
536
- elif b"data: " in line:
537
- payload_bytes = line[line.find(b"data: ") + 6:].strip()
538
- else:
539
- payload_bytes = line.strip()
540
  if payload_bytes == b"[DONE]":
541
  if reasoning_open:
542
  yield orjson.dumps({"response": "\n</think>\n"}) + b"\n"
@@ -556,6 +540,11 @@ class AsyncUpstreamClient:
556
  resp_val = parsed.get("response")
557
  if resp_val is None:
558
  continue
 
 
 
 
 
559
  inner = None
560
  try:
561
  if isinstance(resp_val, str):
@@ -605,9 +594,6 @@ class AsyncUpstreamClient:
605
  except Exception:
606
  continue
607
 
608
- # -------------------------
609
- # ChatService
610
- # -------------------------
611
  class ChatService:
612
  def __init__(self, client: Optional[AsyncUpstreamClient] = None):
613
  self.client = client or AsyncUpstreamClient()
@@ -621,139 +607,65 @@ class ChatService:
621
  if not values.get("model"):
622
  values["model"] = prov.get("DEFAULT_MODEL") or Config.DEFAULT_MODEL
623
  url = apply_values_to_template(prov.get("BASE_URL", ""), values)
624
- headers = {k: (v if isinstance(v, str) else str(v)) for k, v in (self.client._client.headers.items() if hasattr(self.client, "_client") else {})}
625
- # properly prepare headers using provider template
626
- headers = self.client._client.headers.copy() if hasattr(self.client, "_client") else {}
627
- headers.update(self.client._client.headers if hasattr(self.client, "_client") else {})
628
- headers = self.client._client.headers.copy() if hasattr(self.client, "_client") else {}
629
- # use the provider header template (fill placeholders)
630
- headers = self.client._client.headers.copy() if hasattr(self.client, "_client") else {}
631
- headers = {}
632
- for k, v in prov.get("HEADERS", {}).items():
633
- f = apply_values_to_template(v, values)
634
- if f is None:
635
- continue
636
- headers[k] = f if isinstance(f, str) else str(f)
637
  payload = apply_values_to_template(prov.get("PAYLOAD", {}), values)
638
  return {"url": url, "headers": headers, "payload": payload}
639
 
640
  async def generate(self, req: ChatRequest) -> str:
641
  data = self.build_request_for_provider(req)
642
- try:
643
- result = await self.client.post_json(data["url"], data["headers"], data["payload"])
644
- except Exception as e:
645
- logger.exception("Upstream generate error: %s", e)
646
- raise
647
  try:
648
  return result["choices"][0]["message"]["content"]
649
  except Exception:
650
  if isinstance(result, dict) and "response" in result:
651
  return result["response"]
652
- try:
653
- return orjson.dumps(result).decode("utf-8")
654
- except Exception:
655
- return str(result)
656
 
657
  async def generate_stream(self, req: ChatRequest) -> AsyncGenerator[bytes, None]:
658
  data = self.build_request_for_provider(req)
659
  async for token_bytes in self.client.stream_post(data["url"], data["headers"], data["payload"]):
660
  yield token_bytes
661
 
662
- # -------------------------
663
- # FastAPI app and lifecycle
664
- # -------------------------
665
  app = FastAPI(title="High-speed Chat Proxy")
666
- service: Optional[ChatService] = None
667
-
668
- @app.on_event("startup")
669
- async def startup_event():
670
- global service
671
- logger.info("Starting application and initializing upstream client")
672
- try:
673
- upstream_client = AsyncUpstreamClient()
674
- service = ChatService(client=upstream_client)
675
- logger.info("Upstream client initialized")
676
- except Exception as e:
677
- logger.exception("Failed to initialize upstream client: %s", e)
678
- # Let the exception bubble so the server fails fast and logs the problem.
679
- raise
680
 
681
  @app.on_event("shutdown")
682
  async def shutdown_event():
683
- global service
684
- logger.info("Shutting down application and closing upstream client")
685
- if service and getattr(service, "client", None):
686
- try:
687
- await service.client.close()
688
- logger.info("Upstream client closed")
689
- except Exception:
690
- logger.exception("Error closing upstream client")
691
-
692
- # -------------------------
693
- # Endpoints
694
- # -------------------------
695
- @app.get("/")
696
- async def root():
697
- return {"service": "High-speed Chat Proxy", "status": "running"}
698
-
699
- @app.get("/health")
700
- async def health():
701
- return JSONResponse({"status": "ok"})
702
-
703
- @app.get("/v1/models")
704
- async def models():
705
- return {"models": MODEL_NAMES, "default_model": Config.DEFAULT_MODEL}
706
 
707
  @app.post("/v1/chat/completions")
708
  async def completions(request: Request):
709
- try:
710
- body = await request.json()
711
- except Exception:
712
- raise HTTPException(status_code=400, detail="Invalid JSON body")
713
  req = ChatRequest.from_dict(body)
714
  if not req.api_key or not req.messages:
715
  raise HTTPException(status_code=400, detail="api_key and messages required")
716
 
717
- if service is None:
718
- raise HTTPException(status_code=503, detail="service not ready")
719
-
720
  async def streamer():
721
- # streaming path
722
  if req.stream:
723
  buf = bytearray()
724
  threshold = Config.STREAM_BATCH_BYTES
725
- try:
726
- async for chunk_bytes in service.generate_stream(req):
727
- if not chunk_bytes:
728
- continue
729
- buf.extend(chunk_bytes)
730
- if threshold > 0 and len(buf) >= threshold:
731
- yield bytes(buf)
732
- buf.clear()
733
- if buf:
734
  yield bytes(buf)
735
- except Exception as e:
736
- logger.exception("Stream error: %s", e)
737
- # yield an error as JSON so client sees it
738
- try:
739
- yield orjson.dumps({"error": "upstream stream error", "detail": str(e)} ) + b"\n"
740
- except Exception:
741
- pass
742
  else:
743
- # non-streaming: call generate and return NDJSON
744
- try:
745
- text = await service.generate(req)
746
- yield orjson.dumps({"response": text}) + b"\n"
747
- except Exception as e:
748
- logger.exception("Generate error: %s", e)
749
- yield orjson.dumps({"error": "upstream generate error", "detail": str(e)}) + b"\n"
750
 
751
  return StreamingResponse(streamer(), media_type="application/x-ndjson", headers={"Cache-Control": "no-cache"})
752
 
753
- # -------------------------
754
- # If run directly (use single process)
755
- # -------------------------
756
- if __name__ == "__main__":
757
- import uvicorn
758
- # IMPORTANT: run with a single process in platforms like Spaces. Do NOT use --workers there.
759
- uvicorn.run("server:app", host="0.0.0.0", port=8000, loop="asyncio", log_level="info", access_log=True)
 
3
  import re
4
  import orjson
5
  import httpx
 
 
6
  from fastapi import FastAPI, Request, HTTPException
7
+ from fastapi.responses import StreamingResponse
8
+
9
 
 
 
 
 
 
10
 
11
 
12
 
 
278
  return R
279
 
280
 
281
+
282
+
283
+
284
  try:
285
+ MODEL_NAMES = {"GROQ" : GROQM() , "LLMC" : LLMCM()}
286
  except Exception:
287
  MODEL_NAMES = {"GROQ": "GROQ-FALLBACK", "LLMC": "LLMC-FALLBACK"}
288
 
289
+
 
 
290
  class Config:
291
  DEFAULT_PROVIDER = "GROQ"
292
  DEFAULT_MODEL = "llama-3.3-70b-versatile"
293
  DEFAULT_MAX_TOKENS = 512
294
  DEFAULT_TEMPERATURE = 0.7
295
  CHUNK_SIZE = 1000
296
+ MAX_CONNECTIONS = 200
297
+ HTTP2 = True
298
  TIMEOUT = 30.0
299
+ STREAM_BATCH_BYTES = 0
300
 
 
 
 
301
  PROVIDERS: Dict[str, Dict[str, Any]] = {
302
  "GROQ": {
303
  "AUTH": True,
304
  "BASE_URL": "https://api.groq.com/openai/v1/chat/completions",
305
  "DEFAULT_MODEL": "qwen/qwen3-32b",
306
  "HEADERS": {"Authorization": "Bearer {API}", "Content-Type": "application/json"},
307
+ "PAYLOAD": {
308
+ "model": "{model}",
309
+ "messages": "{messages}",
310
+ "temperature": "{temperature}",
311
+ "max_tokens": "{max_tokens}",
312
+ "stop": None,
313
+ "stream": "{stream}",
314
+ },
315
  },
316
  "LLMC": {
317
  "AUTH": True,
318
  "BASE_URL": "https://llmchat.in/inference/stream?model={model}",
319
  "DEFAULT_MODEL": "@cf/meta/llama-3.1-8b-instruct",
320
+ "HEADERS": {
321
+ "Content-Type": "application/json",
322
+ "Accept": "*/*",
323
+ "Origin": "https://llmchat.in",
324
+ "Referer": "https://llmchat.in/",
325
+ },
326
  "PAYLOAD": {"messages": "{messages}", "stream": "{stream}"},
327
  },
328
  }
329
 
 
 
 
330
  _placeholder_re = re.compile(r"\{(.*?)\}")
331
 
332
  def apply_values_to_template(template: Any, values: Dict[str, Any]) -> Any:
 
334
  m = _placeholder_re.fullmatch(template.strip())
335
  if m:
336
  return values.get(m.group(1), template)
 
337
  str_values = {
338
  k: (v if isinstance(v, str) else (orjson.dumps(v).decode("utf-8") if not isinstance(v, (int, float, bool, type(None))) else v))
339
  for k, v in values.items()
 
360
  "stream": req.stream,
361
  }
362
 
 
 
 
363
  @dataclass
364
  class ChatRequest:
365
  api_key: str
 
385
  messages = [messages]
386
  return ChatRequest(api_key=api_key, messages=messages, model=model, provider=provider, max_tokens=max_tokens, temperature=temperature, stream=stream)
387
 
 
 
 
388
  class AsyncUpstreamClient:
389
  def __init__(self):
390
  limits = httpx.Limits(max_connections=Config.MAX_CONNECTIONS)
 
391
  self._client = httpx.AsyncClient(timeout=Config.TIMEOUT, limits=limits, http2=Config.HTTP2)
392
 
393
+ def _prepare_headers(self, headers_template: Dict[str, str], values: Dict[str, Any]) -> Dict[str, str]:
394
+ headers = {}
395
+ for k, v in headers_template.items():
396
+ f = apply_values_to_template(v, values)
397
+ if f is None:
398
+ continue
399
+ headers[k] = f if isinstance(f, str) else str(f)
400
+ return headers
401
+
402
  async def close(self):
403
  await self._client.aclose()
404
 
 
433
  if not chunk:
434
  continue
435
  buf += chunk
 
436
  while b"\n\n" in buf:
437
  event, buf = buf.split(b"\n\n", 1)
438
  for line in event.splitlines():
439
  if not line:
440
  continue
441
+ payload_bytes = line[len(b"data:"):].strip() if line.startswith(b"data:") else (line[line.find(b"data: ") + 6:].strip() if b"data: " in line else line.strip())
 
 
 
 
 
 
442
  if payload_bytes == b"[DONE]":
443
  if reasoning_open:
444
  yield nd({"response": "\n</think>\n"})
 
448
  try:
449
  parsed = orjson.loads(payload_bytes)
450
  except Exception:
451
+ txt = None
452
  try:
453
  txt = payload_bytes.decode("utf-8", errors="ignore")
 
 
454
  except Exception:
455
  pass
456
+ if txt and txt.strip():
457
+ yield nd({"response": txt})
458
  continue
 
 
459
  if isinstance(parsed, dict) and "response" in parsed:
460
  resp_val = parsed.get("response")
461
  if resp_val is None:
462
  continue
463
+ if isinstance(resp_val, (bytes, bytearray)):
464
+ try:
465
+ resp_val = resp_val.decode("utf-8", errors="ignore")
466
+ except Exception:
467
+ continue
468
  inner = None
469
  try:
470
  if isinstance(resp_val, str):
 
478
  else:
479
  yield nd({"response": str(resp_val)})
480
  continue
 
 
481
  if isinstance(parsed, dict) and parsed.get("choices"):
482
  try:
483
  c0 = parsed["choices"][0]
 
507
  continue
508
  except Exception:
509
  pass
 
510
  if isinstance(parsed, dict) and self._is_metadata_blob(parsed):
511
  continue
 
 
512
  try:
513
  yield nd({"response": parsed})
514
  except Exception:
 
516
  yield nd({"response": str(parsed)})
517
  except Exception:
518
  continue
 
 
519
  if buf:
520
  for line in buf.splitlines():
521
  if not line:
522
  continue
523
+ payload_bytes = line[len(b"data:"):].strip() if line.startswith(b"data:") else (line[line.find(b"data: ") + 6:].strip() if b"data: " in line else line.strip())
 
 
 
 
 
 
524
  if payload_bytes == b"[DONE]":
525
  if reasoning_open:
526
  yield orjson.dumps({"response": "\n</think>\n"}) + b"\n"
 
540
  resp_val = parsed.get("response")
541
  if resp_val is None:
542
  continue
543
+ if isinstance(resp_val, (bytes, bytearray)):
544
+ try:
545
+ resp_val = resp_val.decode("utf-8", errors="ignore")
546
+ except Exception:
547
+ continue
548
  inner = None
549
  try:
550
  if isinstance(resp_val, str):
 
594
  except Exception:
595
  continue
596
 
 
 
 
597
  class ChatService:
598
  def __init__(self, client: Optional[AsyncUpstreamClient] = None):
599
  self.client = client or AsyncUpstreamClient()
 
607
  if not values.get("model"):
608
  values["model"] = prov.get("DEFAULT_MODEL") or Config.DEFAULT_MODEL
609
  url = apply_values_to_template(prov.get("BASE_URL", ""), values)
610
+ headers = self.client._prepare_headers(prov.get("HEADERS", {}), values)
 
 
 
 
 
 
 
 
 
 
 
 
611
  payload = apply_values_to_template(prov.get("PAYLOAD", {}), values)
612
  return {"url": url, "headers": headers, "payload": payload}
613
 
614
  async def generate(self, req: ChatRequest) -> str:
615
  data = self.build_request_for_provider(req)
616
+ result = await self.client.post_json(data["url"], data["headers"], data["payload"])
 
 
 
 
617
  try:
618
  return result["choices"][0]["message"]["content"]
619
  except Exception:
620
  if isinstance(result, dict) and "response" in result:
621
  return result["response"]
622
+ return orjson.dumps(result).decode("utf-8")
 
 
 
623
 
624
  async def generate_stream(self, req: ChatRequest) -> AsyncGenerator[bytes, None]:
625
  data = self.build_request_for_provider(req)
626
  async for token_bytes in self.client.stream_post(data["url"], data["headers"], data["payload"]):
627
  yield token_bytes
628
 
 
 
 
629
  app = FastAPI(title="High-speed Chat Proxy")
630
+ service = ChatService()
 
 
 
 
 
 
 
 
 
 
 
 
 
631
 
632
  @app.on_event("shutdown")
633
  async def shutdown_event():
634
+ try:
635
+ await service.client.close()
636
+ except Exception:
637
+ pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
638
 
639
  @app.post("/v1/chat/completions")
640
  async def completions(request: Request):
641
+ body = await request.json()
 
 
 
642
  req = ChatRequest.from_dict(body)
643
  if not req.api_key or not req.messages:
644
  raise HTTPException(status_code=400, detail="api_key and messages required")
645
 
 
 
 
646
  async def streamer():
 
647
  if req.stream:
648
  buf = bytearray()
649
  threshold = Config.STREAM_BATCH_BYTES
650
+ async for chunk_bytes in service.generate_stream(req):
651
+ if not chunk_bytes:
652
+ continue
653
+ buf.extend(chunk_bytes)
654
+ if len(buf) >= threshold:
 
 
 
 
655
  yield bytes(buf)
656
+ buf.clear()
657
+ if buf:
658
+ yield bytes(buf)
 
 
 
 
659
  else:
660
+ text = await service.generate(req)
661
+ yield orjson.dumps({"response": text}) + b"\n"
 
 
 
 
 
662
 
663
  return StreamingResponse(streamer(), media_type="application/x-ndjson", headers={"Cache-Control": "no-cache"})
664
 
665
+ @app.get("/v1/models")
666
+ async def models():
667
+ return {"models": MODEL_NAMES, "default_model": Config.DEFAULT_MODEL}
668
+
669
+ @app.get("/")
670
+ async def root():
671
+ return {"service": "High-speed Chat Proxy", "status": "running"}