Chris4K committed on
Commit
fdd48fc
·
verified ·
1 Parent(s): c17cc08

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +31 -11
main.py CHANGED
@@ -253,7 +253,10 @@ async def call_ki_fusion(messages, model, max_tokens=1024, temperature=0.7, stre
253
  if KF_KEY: headers["Authorization"] = f"Bearer {KF_KEY}"
254
  payload = {"model": model, "messages": messages,
255
  "max_tokens": max_tokens, "temperature": temperature, "stream": stream}
256
- async with httpx.AsyncClient(timeout=60) as client:
 
 
 
257
  if stream:
258
  async with client.stream("POST", f"{KF_BASE}/chat/completions",
259
  headers=headers, json=payload) as resp:
@@ -287,10 +290,22 @@ async def call_hf_api(messages, model, max_tokens=1024, temperature=0.7, stream=
287
 
288
  async def call_local_cpu(messages, model, max_tokens=512, temperature=0.7, stream=False):
289
  loop = asyncio.get_event_loop()
 
 
 
 
 
 
 
 
 
 
 
 
290
  def _run():
291
  pipe = get_local_pipe()
292
  if not pipe:
293
- raise Exception("Local model not available")
294
  # Build prompt from messages
295
  chat_messages = [{"role": m.get("role","user"),
296
  "content": m.get("content","") if isinstance(m.get("content"), str) else ""}
@@ -339,10 +354,14 @@ async def route_inference(messages: list, max_tokens: int = 1024, temperature: f
339
  tried = []
340
 
341
  providers_to_try = [provider]
342
- # Build fallback chain
343
- for fb in ["ki_fusion","hf_api","local_cpu"]:
 
344
  if fb not in providers_to_try and provider_health.get(fb, True):
345
  providers_to_try.append(fb)
 
 
 
346
 
347
  last_err = None
348
  for p in providers_to_try:
@@ -357,11 +376,9 @@ async def route_inference(messages: list, max_tokens: int = 1024, temperature: f
357
  reason += f" | fallback to {p}"
358
 
359
  if stream:
360
- # Streaming: yield raw SSE bytes
361
  async def _stream_gen():
362
  async for chunk in caller(messages, fb_model, max_tokens, temperature, stream=True):
363
  yield chunk
364
- # Return a special marker with the generator
365
  ms = int((time.time()-t0)*1000)
366
  record(p, task, cost_mode, True, ms, 0, fb_model, reason)
367
  return {
@@ -379,7 +396,6 @@ async def route_inference(messages: list, max_tokens: int = 1024, temperature: f
379
  ms = int((time.time()-t0)*1000)
380
  if isinstance(result, dict):
381
  tokens = result.get("usage",{}).get("total_tokens", 0)
382
- # Inject routing metadata into response
383
  result.setdefault("_nexus", {})
384
  result["_nexus"] = {"provider":p,"model":fb_model,"task":task,
385
  "complexity":complexity,"reason":reason,
@@ -389,10 +405,14 @@ async def route_inference(messages: list, max_tokens: int = 1024, temperature: f
389
 
390
  except Exception as e:
391
  last_err = str(e)
392
- log.warning(f"Provider {p} failed: {e}")
393
- provider_health[p] = False
 
 
 
 
 
394
  ok = False
395
- asyncio.get_event_loop().call_later(60, lambda pr=p: provider_health.update({pr: True}))
396
 
397
  ms = int((time.time()-t0)*1000)
398
  record(tried[-1] if tried else "none", task, cost_mode, False, ms, 0, model, reason)
@@ -1149,4 +1169,4 @@ loadLog();
1149
  setInterval(function(){loadLog();},8000);
1150
  </script>
1151
  </body>
1152
- </html>"""
 
253
  if KF_KEY: headers["Authorization"] = f"Bearer {KF_KEY}"
254
  payload = {"model": model, "messages": messages,
255
  "max_tokens": max_tokens, "temperature": temperature, "stream": stream}
256
+ # Fast-fail connect: 6s tells us immediately if your server is off.
257
+ # Read stays at 90s to handle long inference when server IS on.
258
+ timeout = httpx.Timeout(connect=6.0, read=90.0, write=10.0, pool=5.0)
259
+ async with httpx.AsyncClient(timeout=timeout) as client:
260
  if stream:
261
  async with client.stream("POST", f"{KF_BASE}/chat/completions",
262
  headers=headers, json=payload) as resp:
 
290
 
291
  async def call_local_cpu(messages, model, max_tokens=512, temperature=0.7, stream=False):
292
  loop = asyncio.get_event_loop()
293
+ # Bug fix: if model is still loading (_local_loading=True), wait up to 90s
294
+ # instead of failing immediately. This is the guaranteed last-resort provider.
295
+ waited = 0
296
+ while _local_loading and waited < 90:
297
+ log.info(f"[local_cpu] Model still loading, waiting… ({waited}s)")
298
+ await asyncio.sleep(3)
299
+ waited += 3
300
+ # If not loaded yet, trigger a load attempt now (synchronously in thread)
301
+ if not _local_pipe and not _local_loading:
302
+ log.info("[local_cpu] Triggering model load now (first request)")
303
+ await loop.run_in_executor(None, get_local_pipe)
304
+
305
  def _run():
306
  pipe = get_local_pipe()
307
  if not pipe:
308
+ raise Exception("Local model not available — transformers load failed. Check logs for OOM or missing dependencies.")
309
  # Build prompt from messages
310
  chat_messages = [{"role": m.get("role","user"),
311
  "content": m.get("content","") if isinstance(m.get("content"), str) else ""}
 
354
  tried = []
355
 
356
  providers_to_try = [provider]
357
+ # Build fallback chain: ki_fusion → hf_api can be skipped if health=False,
358
+ # but local_cpu is ALWAYS added last — it's the guaranteed offline fallback.
359
+ for fb in ["ki_fusion", "hf_api"]:
360
  if fb not in providers_to_try and provider_health.get(fb, True):
361
  providers_to_try.append(fb)
362
+ # local_cpu: always last, always tried — never skip it
363
+ if "local_cpu" not in providers_to_try:
364
+ providers_to_try.append("local_cpu")
365
 
366
  last_err = None
367
  for p in providers_to_try:
 
376
  reason += f" | fallback to {p}"
377
 
378
  if stream:
 
379
  async def _stream_gen():
380
  async for chunk in caller(messages, fb_model, max_tokens, temperature, stream=True):
381
  yield chunk
 
382
  ms = int((time.time()-t0)*1000)
383
  record(p, task, cost_mode, True, ms, 0, fb_model, reason)
384
  return {
 
396
  ms = int((time.time()-t0)*1000)
397
  if isinstance(result, dict):
398
  tokens = result.get("usage",{}).get("total_tokens", 0)
 
399
  result.setdefault("_nexus", {})
400
  result["_nexus"] = {"provider":p,"model":fb_model,"task":task,
401
  "complexity":complexity,"reason":reason,
 
405
 
406
  except Exception as e:
407
  last_err = str(e)
408
+ # Log full error so it appears in HF logs — key diagnostic info
409
+ log.error(f"[NEXUS] Provider '{p}' FAILED: {last_err}")
410
+ # Don't permanently disable local_cpu — it's the guaranteed fallback.
411
+ # Disabling it means ALL subsequent requests fail until 60s restore.
412
+ if p != "local_cpu":
413
+ provider_health[p] = False
414
+ asyncio.get_event_loop().call_later(60, lambda pr=p: provider_health.update({pr: True}))
415
  ok = False
 
416
 
417
  ms = int((time.time()-t0)*1000)
418
  record(tried[-1] if tried else "none", task, cost_mode, False, ms, 0, model, reason)
 
1169
  setInterval(function(){loadLog();},8000);
1170
  </script>
1171
  </body>
1172
+ </html>"""