""" NEXUS — Model Router & Inference Gateway OpenAI-compatible API with intelligent routing across 3 providers. Providers (priority order): 1. ki_fusion — ki-fusion-labs.de/v1 (primary, LM Studio via PHP) 2. hf_api — HuggingFace Serverless Inference (HF_TOKEN env) 3. local_cpu — transformers, Qwen2.5-0.5B on CPU (always-on fallback) Routing decisions based on: task_type : simple / reasoning / planning / code / vision / embedding complexity : 1-10 score from message analysis cost_mode : cheap / balanced / best provider : explicit override MCP tools: nexus_chat, nexus_route_info, nexus_stats, nexus_models, nexus_health """ import os, uuid, json, asyncio, time, re, math, logging from pathlib import Path from datetime import datetime, timezone from typing import Optional, AsyncGenerator import httpx from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse, HTMLResponse, StreamingResponse logging.basicConfig(level=logging.INFO) log = logging.getLogger("nexus") BASE = Path(__file__).parent STATS_FILE = BASE / "stats.json" # ── Env ─────────────────────────────────────────────────────────── KF_BASE = os.environ.get("KI_FUSION_URL", "https://ki-fusion-labs.de/v1") KF_KEY = os.environ.get("KI_FUSION_KEY", "") HF_TOKEN = os.environ.get("HF_TOKEN", "") HF_BASE = "https://api-inference.huggingface.co/models" HF_OAI = "https://api-inference.huggingface.co/v1" # ── Model catalogue ─────────────────────────────────────────────── # ki_fusion uses whatever LM Studio has loaded — model name is configurable KF_DEFAULT_MODEL = os.environ.get("KF_MODEL", "lm-studio") HF_MODELS = { "simple": "Qwen/Qwen2.5-7B-Instruct", "reasoning": "meta-llama/Llama-3.1-8B-Instruct", "planning": "mistralai/Mistral-7B-Instruct-v0.3", "code": "Qwen/Qwen2.5-Coder-7B-Instruct", "vision": "Qwen/Qwen2.5-VL-7B-Instruct", "fast": "Qwen/Qwen2.5-0.5B-Instruct", } LOCAL_MODEL_ID = os.environ.get("LOCAL_MODEL", "Qwen/Qwen2.5-0.5B-Instruct") # ── Stats 
# ── Stats ─────────────────────────────────────────────────────────
def load_stats():
    """Load persisted routing stats from STATS_FILE, or build a fresh structure.

    A corrupt or unreadable stats file falls back to the default structure
    instead of crashing startup.
    """
    if STATS_FILE.exists():
        try:
            return json.loads(STATS_FILE.read_text())
        # FIX: was a bare `except:` — that also swallowed SystemExit and
        # KeyboardInterrupt. Only I/O and parse errors are recoverable here.
        except (OSError, json.JSONDecodeError):
            pass
    return {
        "total_requests": 0,
        "by_provider": {"ki_fusion": {"ok": 0, "fail": 0, "total_ms": 0, "tokens": 0},
                        "hf_api":    {"ok": 0, "fail": 0, "total_ms": 0, "tokens": 0},
                        "local_cpu": {"ok": 0, "fail": 0, "total_ms": 0, "tokens": 0}},
        "by_task": {"simple": 0, "reasoning": 0, "planning": 0, "code": 0,
                    "vision": 0, "embedding": 0, "unknown": 0},
        "by_cost": {"cheap": 0, "balanced": 0, "best": 0},
        "recent": [],  # last 20 routing decisions
    }

def save_stats(s):
    """Persist the stats dict to disk as pretty-printed JSON."""
    STATS_FILE.write_text(json.dumps(s, indent=2))

STATS = load_stats()

def record(provider, task, cost_mode, ok, ms, tokens, model, reason):
    """Record one routing decision into STATS and persist it.

    provider/task/cost_mode index the counter tables; ok/ms/tokens are the
    outcome of the call; model/reason are kept in the rolling `recent` log.
    """
    STATS["total_requests"] += 1
    # FIX: route_inference records "none" when no provider was tried —
    # indexing by_provider directly raised KeyError for unseeded names.
    p = STATS["by_provider"].setdefault(
        provider, {"ok": 0, "fail": 0, "total_ms": 0, "tokens": 0})
    if ok:
        p["ok"] += 1
    else:
        p["fail"] += 1
    p["total_ms"] += ms
    p["tokens"] += tokens
    STATS["by_task"][task] = STATS["by_task"].get(task, 0) + 1
    STATS["by_cost"][cost_mode] = STATS["by_cost"].get(cost_mode, 0) + 1
    entry = {
        "id": uuid.uuid4().hex[:8], "ts": int(time.time()),
        "provider": provider, "model": model, "task": task,
        "cost_mode": cost_mode, "ok": ok, "ms": ms, "tokens": tokens,
        "reason": reason
    }
    # Newest first, capped at 20 entries.
    STATS["recent"] = ([entry] + STATS["recent"])[:20]
    save_stats(STATS)
[r"\bembed\b",r"\bvector\b",r"\bsimilarity\b",r"\bsemantic\b",r"\bencod"], "simple": [], # fallthrough } def classify_task(messages: list) -> str: text = " ".join( m.get("content","") if isinstance(m.get("content"), str) else " ".join(c.get("text","") for c in m.get("content",[]) if isinstance(c,dict)) for m in messages ).lower() # Vision: check for image content blocks for m in messages: if isinstance(m.get("content"), list): for c in m["content"]: if isinstance(c, dict) and c.get("type") == "image_url": return "vision" for task, patterns in TASK_PATTERNS.items(): if task == "simple": continue for p in patterns: if re.search(p, text): return task return "simple" def score_complexity(messages: list) -> int: text = " ".join( m.get("content","") if isinstance(m.get("content"), str) else "" for m in messages ) score = 1 words = len(text.split()) if words > 50: score += 1 if words > 150: score += 1 if words > 400: score += 2 # Multi-step indicators if re.search(r"\bstep\s*\d|first.*then.*finally|\d+\.\s+", text.lower()): score += 1 # Technical density tech_words = ["algorithm","optimization","architecture","implement","integrate", "distributed","concurrent","neural","transformer","gradient","latency"] hits = sum(1 for w in tech_words if w in text.lower()) score += min(hits, 3) # Question count score += min(text.count("?"), 2) return min(score, 10) # ── Provider health ─────────────────────────────────────────────── provider_health = {"ki_fusion": True, "hf_api": bool(HF_TOKEN), "local_cpu": True} # Real model ID discovered from LM Studio at startup (or falls back to KF_DEFAULT_MODEL) _kf_actual_model: str = KF_DEFAULT_MODEL async def probe_ki_fusion() -> bool: """Test ki_fusion with a minimal chat completion — /v1/models may not be available. Uses KF_MODEL env var as the model name (set it to your actual loaded model id). 
""" global _kf_actual_model _kf_actual_model = KF_DEFAULT_MODEL # always use configured name, no discovery try: headers = {"Content-Type": "application/json"} if KF_KEY: headers["Authorization"] = f"Bearer {KF_KEY}" payload = { "model": KF_DEFAULT_MODEL, "messages": [{"role": "user", "content": "ping"}], "max_tokens": 1, "temperature": 0.0, } async with httpx.AsyncClient(timeout=httpx.Timeout(None, connect=6.0, read=15.0), verify=False) as c: r = await c.post(f"{KF_BASE}/chat/completions/", headers=headers, json=payload) if r.status_code < 400: log.info(f"[NEXUS] ki_fusion online ✓ model={_kf_actual_model} url={KF_BASE}") provider_health["ki_fusion"] = True return True else: log.warning(f"[NEXUS] ki_fusion probe HTTP {r.status_code}: {r.text[:120]}") provider_health["ki_fusion"] = False return False except Exception as e: log.warning(f"[NEXUS] ki_fusion probe failed: {e}") provider_health["ki_fusion"] = False return False async def ki_fusion_watchdog(): """Background task: probe ki_fusion every 30s to auto-recover after outages.""" while True: await asyncio.sleep(30) was_ok = provider_health["ki_fusion"] now_ok = await probe_ki_fusion() if not was_ok and now_ok: log.info("[NEXUS] ki_fusion recovered — back online") elif was_ok and not now_ok: log.warning("[NEXUS] ki_fusion went offline") # ── Local CPU model (lazy) ──────────────────────────────────────── _local_pipe = None _local_loading = False def get_local_pipe(): global _local_pipe, _local_loading if _local_pipe is not None: return _local_pipe if _local_loading: return None _local_loading = True try: from transformers import pipeline log.info(f"Loading local model {LOCAL_MODEL_ID} on CPU...") _local_pipe = pipeline( "text-generation", model=LOCAL_MODEL_ID, device="cpu", torch_dtype="auto", max_new_tokens=512, ) log.info("Local model loaded.") except Exception as e: log.warning(f"Local model load failed: {e}") _local_pipe = None provider_health["local_cpu"] = False finally: _local_loading = False return 
# Pre-warm in background
async def warm_local():
    """Warm the local pipeline ~5s after startup without blocking the event loop."""
    await asyncio.sleep(5)
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, get_local_pipe)

# ── Router ────────────────────────────────────────────────────────
def select_provider_and_model(task: str, complexity: int, cost_mode: str,
                              force_provider: str = "") -> tuple[str, str, str]:
    """Returns (provider, model, reason)"""
    ki_up = provider_health["ki_fusion"]
    hf_up = provider_health["hf_api"]
    local_up = provider_health["local_cpu"]

    # Explicit override (honored only when that provider is healthy).
    if force_provider and provider_health.get(force_provider, False):
        if force_provider == "local_cpu":
            chosen = LOCAL_MODEL_ID
        elif force_provider == "ki_fusion":
            chosen = _kf_actual_model
        else:
            chosen = HF_MODELS.get(task, HF_MODELS["simple"])
        return force_provider, chosen, f"explicit override to {force_provider}"

    # Vision always -> HF (vision models)
    if task == "vision":
        if hf_up:
            return "hf_api", HF_MODELS["vision"], "vision task -> HF Qwen2.5-VL"
        if ki_up:
            return "ki_fusion", _kf_actual_model, "vision fallback -> ki_fusion"

    # Embedding -> HF
    if task == "embedding" and hf_up:
        return "hf_api", "sentence-transformers/all-MiniLM-L6-v2", "embedding -> HF sentence-transformers"

    # Cost mode: cheap -> prefer HF or local for small simple prompts
    if cost_mode == "cheap" and task == "simple" and complexity <= 4:
        if hf_up:
            return "hf_api", HF_MODELS["fast"], f"cheap+simple(c={complexity}) -> HF fast"
        if local_up:
            return "local_cpu", LOCAL_MODEL_ID, f"cheap+simple -> local CPU"

    # Cost mode: best -> ki_fusion first (your own GPU)
    if cost_mode == "best" and ki_up:
        return "ki_fusion", _kf_actual_model, f"best mode -> ki_fusion (LM Studio)"

    # Balanced routing by task + complexity
    if task == "planning" and complexity >= 6 and ki_up:
        return "ki_fusion", _kf_actual_model, f"planning+complex(c={complexity}) -> ki_fusion"
    if task == "code":
        if ki_up:
            return "ki_fusion", _kf_actual_model, f"code task -> ki_fusion (LM Studio)"
        if hf_up:
            return "hf_api", HF_MODELS["code"], "code -> HF Qwen2.5-Coder"
    if task == "reasoning" and complexity >= 7 and ki_up:
        return "ki_fusion", _kf_actual_model, f"hard reasoning(c={complexity}) -> ki_fusion"

    # Default balanced: HF for most tasks (free tier, good quality)
    if hf_up:
        hf_model = HF_MODELS.get(task, HF_MODELS["simple"])
        return "hf_api", hf_model, f"{task}(c={complexity}) -> HF {hf_model.split('/')[-1]}"

    # Fallback: ki_fusion
    if ki_up:
        return "ki_fusion", _kf_actual_model, f"fallback -> ki_fusion"

    # Last resort: local CPU
    return "local_cpu", LOCAL_MODEL_ID, "last resort -> local CPU"
# ── Inference calls ───────────────────────────────────────────────
async def call_ki_fusion(messages, model, max_tokens=1024, temperature=0.7, stream=False):
    """Async generator: chat completion against the ki_fusion backend.

    Yields raw SSE byte chunks when stream=True, otherwise a single parsed
    response dict. Raises httpx errors on connect/HTTP failure — the caller
    (route_inference) treats any exception as "try the next provider".
    """
    headers = {"Content-Type": "application/json"}
    if KF_KEY:
        headers["Authorization"] = f"Bearer {KF_KEY}"
    payload = {"model": model, "messages": messages, "max_tokens": max_tokens,
               "temperature": temperature, "stream": stream}
    # verify=False: ki-fusion-labs.de SSL cert may be expired (self-hosted).
    # Fast-fail connect: 6s tells us immediately if your server is off.
    timeout = httpx.Timeout(None, connect=6.0, read=90.0, write=10.0, pool=5.0)
    async with httpx.AsyncClient(timeout=timeout, verify=False) as client:
        if stream:
            async with client.stream("POST", f"{KF_BASE}/chat/completions/",
                                     headers=headers, json=payload) as resp:
                resp.raise_for_status()
                async for chunk in resp.aiter_bytes():
                    yield chunk
        else:
            r = await client.post(f"{KF_BASE}/chat/completions/", headers=headers, json=payload)
            r.raise_for_status()
            yield r.json()

async def call_hf_api(messages, model, max_tokens=1024, temperature=0.7, stream=False):
    """Async generator: chat completion against HF's OpenAI-compatible endpoint.

    Same yield contract as call_ki_fusion. Raises RuntimeError when no
    HF_TOKEN is configured, httpx errors on transport/HTTP failure.
    """
    if not HF_TOKEN:
        # FIX: was a generic `raise Exception(...)` — use a specific type.
        # Still caught by route_inference's broad `except Exception` fallback.
        raise RuntimeError("HF_TOKEN not set")
    headers = {"Authorization": f"Bearer {HF_TOKEN}", "Content-Type": "application/json"}
    # HF OpenAI-compatible endpoint
    payload = {"model": model, "messages": messages, "max_tokens": max_tokens,
               "temperature": temperature, "stream": stream}
    url = f"{HF_OAI}/chat/completions"
    async with httpx.AsyncClient(timeout=90) as client:
        if stream:
            async with client.stream("POST", url, headers=headers, json=payload) as resp:
                resp.raise_for_status()
                async for chunk in resp.aiter_bytes():
                    yield chunk
        else:
            r = await client.post(url, headers=headers, json=payload)
            r.raise_for_status()
            yield r.json()
async def call_local_cpu(messages, model, max_tokens=512, temperature=0.7, stream=False):
    """Async generator: run the chat locally via the transformers pipeline.

    Always yields exactly one OpenAI-style response dict (streaming is not
    supported — the `stream` flag is accepted for signature parity only).
    The `model` argument is likewise ignored; LOCAL_MODEL_ID is what the
    pipeline actually loaded. Raises when the pipeline cannot be loaded.
    """
    loop = asyncio.get_event_loop()
    # Bug fix: if model is still loading (_local_loading=True), wait up to 90s
    # instead of failing immediately. This is the guaranteed last-resort provider.
    waited = 0
    while _local_loading and waited < 90:
        log.info(f"[local_cpu] Model still loading, waiting… ({waited}s)")
        await asyncio.sleep(3)
        waited += 3
    # If not loaded yet, trigger a load attempt now (synchronously in thread)
    if not _local_pipe and not _local_loading:
        log.info("[local_cpu] Triggering model load now (first request)")
        await loop.run_in_executor(None, get_local_pipe)

    def _run():
        # Runs in a worker thread — the pipeline call is CPU-bound and blocking.
        pipe = get_local_pipe()
        if not pipe:
            raise Exception("Local model not available — transformers load failed. Check logs for OOM or missing dependencies.")
        # Build prompt from messages; non-string content (vision blocks) is dropped.
        chat_messages = [{"role": m.get("role", "user"),
                          "content": m.get("content", "") if isinstance(m.get("content"), str) else ""}
                         for m in messages]
        # temperature=0 would crash sampling, so it is floored at 0.01 while
        # do_sample=False already makes generation greedy in that case.
        result = pipe(chat_messages, max_new_tokens=max_tokens,
                      do_sample=temperature > 0, temperature=max(temperature, 0.01),
                      pad_token_id=pipe.tokenizer.eos_token_id)
        if result and result[0]:
            generated = result[0].get("generated_text", "")
            if isinstance(generated, list):
                # Chat format: last message is the new assistant response
                last = generated[-1] if generated else {}
                content = last.get("content", "") if isinstance(last, dict) else str(last)
            else:
                content = str(generated)
            # Strip prompt echo
            prompt_text = " ".join(m.get("content", "") for m in messages if isinstance(m.get("content"), str))
            if content.startswith(prompt_text):
                content = content[len(prompt_text):].strip()
            return content
        return ""

    content = await loop.run_in_executor(None, _run)
    # Synthesize an OpenAI-shaped response; prompt_tokens is not tracked and
    # completion_tokens is approximated by whitespace word count.
    response = {
        "id": f"local-{uuid.uuid4().hex[:8]}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": LOCAL_MODEL_ID,
        "choices": [{"index": 0, "message": {"role": "assistant", "content": content},
                     "finish_reason": "stop"}],
        "usage": {"prompt_tokens": 0, "completion_tokens": len(content.split()),
                  "total_tokens": len(content.split())}
    }
    yield response
# ── Core route function ───────────────────────────────────────────
async def route_inference(messages: list, max_tokens: int = 1024, temperature: float = 0.7,
                          cost_mode: str = "balanced", force_provider: str = "",
                          force_model: str = "", stream: bool = False):
    """Classify, route and execute a chat completion with provider fallback.

    Returns the provider's response dict annotated with a "_nexus" metadata
    key; for stream=True returns {"_stream": True, "_gen": <async gen>,
    "_meta": {...}} instead. Raises HTTPException(503) when every provider
    in the fallback chain fails.
    """
    task = classify_task(messages)
    complexity = score_complexity(messages)
    provider, model, reason = select_provider_and_model(task, complexity, cost_mode, force_provider)
    if force_model:
        model = force_model
    t0 = time.time()
    tokens = 0
    ok = True
    tried = []
    providers_to_try = [provider]
    # Build fallback chain: ki_fusion -> hf_api can be skipped if health=False,
    # but local_cpu is ALWAYS added last — it's the guaranteed offline fallback.
    for fb in ["ki_fusion", "hf_api"]:
        if fb not in providers_to_try and provider_health.get(fb, True):
            providers_to_try.append(fb)
    # local_cpu: always last, always tried — never skip it
    if "local_cpu" not in providers_to_try:
        providers_to_try.append("local_cpu")
    last_err = None
    for p in providers_to_try:
        tried.append(p)
        try:
            # The routed model only applies to its own provider; fallbacks get
            # their provider-appropriate model instead.
            fb_model = model
            if p == "ki_fusion":
                caller = call_ki_fusion
            elif p == "hf_api":
                caller = call_hf_api; fb_model = HF_MODELS.get(task, HF_MODELS["simple"])
            else:
                caller = call_local_cpu; fb_model = LOCAL_MODEL_ID
            if p != provider:
                reason += f" | fallback to {p}"
            if stream:
                # NOTE(review): success is recorded before the stream body is
                # consumed — a mid-stream failure still counts as "ok".
                async def _stream_gen():
                    async for chunk in caller(messages, fb_model, max_tokens, temperature, stream=True):
                        yield chunk
                ms = int((time.time() - t0) * 1000)
                record(p, task, cost_mode, True, ms, 0, fb_model, reason)
                return {"_stream": True, "_gen": _stream_gen(),
                        "_meta": {"provider": p, "model": fb_model, "task": task,
                                  "complexity": complexity, "reason": reason}}
            # Non-streaming: callers are async generators yielding one response.
            result = None
            async for r in caller(messages, fb_model, max_tokens, temperature, stream=False):
                result = r
                break
            ms = int((time.time() - t0) * 1000)
            if isinstance(result, dict):
                tokens = result.get("usage", {}).get("total_tokens", 0)
                result.setdefault("_nexus", {})
                result["_nexus"] = {"provider": p, "model": fb_model, "task": task,
                                    "complexity": complexity, "reason": reason,
                                    "latency_ms": ms, "fallback_chain": tried}
            record(p, task, cost_mode, True, ms, tokens, fb_model, reason)
            return result
        except Exception as e:
            last_err = str(e)
            log.error(f"[NEXUS] Provider '{p}' FAILED: {last_err}")
            # Mark unhealthy — watchdog will re-probe every 30s and restore when live again
            if p != "local_cpu":
                provider_health[p] = False
    ok = False
    ms = int((time.time() - t0) * 1000)
    record(tried[-1] if tried else "none", task, cost_mode, False, ms, 0, model, reason)
    raise HTTPException(503, f"All providers failed. Last error: {last_err}")
# ── FastAPI ───────────────────────────────────────────────────────
app = FastAPI(title="NEXUS Model Router")

@app.on_event("startup")
async def startup():
    """Kick off background tasks: local-model warmup and ki_fusion probing."""
    asyncio.create_task(warm_local())
    # Probe ki_fusion immediately — discover actual model, set health state
    asyncio.create_task(probe_ki_fusion())
    # Keep probing every 30s so recovery after outage is automatic
    asyncio.create_task(ki_fusion_watchdog())

def jresp(data, status=200):
    """Shortcut for a JSON response with an explicit status code."""
    return JSONResponse(content=data, status_code=status)

# ── OpenAI-compatible endpoints ───────────────────────────────────
@app.post("/v1/chat/completions")
async def oai_chat(request: Request):
    """OpenAI-compatible chat endpoint with NEXUS routing extensions.

    Extensions: "cost_mode" and "provider" body keys; the "model" field also
    accepts provider names and the virtual "nexus-*" ids from /v1/models.
    """
    body = await request.json()
    messages = body.get("messages", [])
    max_tokens = body.get("max_tokens", 1024)
    temperature = body.get("temperature", 0.7)
    stream = body.get("stream", False)
    cost_mode = body.get("cost_mode", "balanced")  # nexus extension
    force_prov = body.get("provider", "")          # nexus extension
    force_model = body.get("model", "")
    # Detect if model is actually a provider name
    if force_model in ("ki_fusion", "hf_api", "local_cpu"):
        force_prov = force_model; force_model = ""
    # FIX: the virtual ids advertised by /v1/models ("nexus-auto",
    # "nexus-cheap", "nexus-best") used to be forwarded verbatim as a model
    # name to the backend, which then failed. Map them to routing modes.
    elif force_model == "nexus-auto":
        force_model = ""
    elif force_model in ("nexus-cheap", "nexus-best"):
        cost_mode = force_model.removeprefix("nexus-")
        force_model = ""
    result = await route_inference(messages, max_tokens, temperature,
                                   cost_mode, force_prov, force_model, stream)
    if isinstance(result, dict) and result.get("_stream"):
        return StreamingResponse(result["_gen"], media_type="text/event-stream",
                                 headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
                                          "X-Nexus-Provider": result["_meta"]["provider"],
                                          "X-Nexus-Task": result["_meta"]["task"]})
    # Add headers for transparency
    # HTTP headers must be latin-1 — strip any unicode (e.g. -> arrows in reason strings)
    def h(v):
        return str(v).encode('latin-1', errors='replace').decode('latin-1')
    return JSONResponse(content=result, headers={
        "X-Nexus-Provider": h(result.get("_nexus", {}).get("provider", "")),
        "X-Nexus-Task": h(result.get("_nexus", {}).get("task", "")),
        "X-Nexus-Reason": h(result.get("_nexus", {}).get("reason", "")[:120]),
    })
@app.get("/v1/models")
async def oai_models():
    """List the virtual routing ids plus every concrete backend model."""
    catalogue = [
        {"id": "nexus-auto", "object": "model", "owned_by": "nexus", "description": "Auto-routed"},
        {"id": "nexus-cheap", "object": "model", "owned_by": "nexus", "description": "Cost-optimized routing"},
        {"id": "nexus-best", "object": "model", "owned_by": "nexus", "description": "Best-quality routing"},
        {"id": "ki_fusion", "object": "model", "owned_by": "ki-fusion-labs", "description": f"Primary LM Studio ({_kf_actual_model})"},
        {"id": _kf_actual_model, "object": "model", "owned_by": "ki-fusion-labs", "description": "Ki-Fusion loaded model"},
        {"id": "hf_api", "object": "model", "owned_by": "huggingface", "description": "HF Serverless Inference"},
        {"id": "local_cpu", "object": "model", "owned_by": "local", "description": f"Local CPU: {LOCAL_MODEL_ID}"},
    ]
    catalogue.extend(
        {"id": model_id, "object": "model", "owned_by": "huggingface", "description": f"HF {kind} model"}
        for kind, model_id in HF_MODELS.items())
    return jresp({"object": "list", "data": catalogue})

# ── Nexus-specific API ────────────────────────────────────────────
@app.post("/api/route")
async def api_route(request: Request):
    """Route with full metadata returned"""
    body = await request.json()
    # A bare "prompt" string is wrapped as a single user message.
    messages = body.get("messages", [{"role": "user", "content": body.get("prompt", "")}])
    routed = await route_inference(
        messages,
        body.get("max_tokens", 512),
        body.get("temperature", 0.7),
        body.get("cost_mode", "balanced"),
        body.get("provider", ""),
    )
    return jresp(routed)
@app.post("/api/classify")
async def api_classify(request: Request):
    """Dry-run the classifier + router for a prompt, without inference."""
    body = await request.json()
    messages = body.get("messages", [{"role": "user", "content": body.get("prompt", "")}])
    task = classify_task(messages)
    complexity = score_complexity(messages)
    provider, model, reason = select_provider_and_model(task, complexity, body.get("cost_mode", "balanced"))
    return jresp({"task": task, "complexity": complexity,
                  "selected_provider": provider, "selected_model": model, "reason": reason})

@app.get("/api/stats")
async def api_stats():
    """Return routing stats enriched with derived per-provider metrics.

    FIX: the original used a shallow STATS.copy(), so the derived keys
    ("total", "success_rate", "avg_ms") were written into the live
    per-provider dicts and persisted to disk by the next record() call.
    A JSON round-trip fully detaches the response from STATS.
    """
    s = json.loads(json.dumps(STATS))
    # Compute avg latencies
    for p, d in s["by_provider"].items():
        total = d["ok"] + d["fail"]
        d["total"] = total
        d["success_rate"] = round(d["ok"] / total * 100, 1) if total else 0
        d["avg_ms"] = round(d["total_ms"] / d["ok"], 0) if d["ok"] else 0
    return jresp(s)

@app.get("/api/health")
async def api_health():
    """Live-check all three providers and update provider_health in place."""
    checks = {}
    # ki_fusion: ping via chat/completions (v1/models may not be available on PHP proxy)
    try:
        headers = {"Content-Type": "application/json"}
        if KF_KEY:
            headers["Authorization"] = f"Bearer {KF_KEY}"
        payload = {"model": _kf_actual_model, "messages": [{"role": "user", "content": "ping"}],
                   "max_tokens": 1, "temperature": 0.0}
        # NOTE(review): no trailing slash here, unlike probe_ki_fusion's
        # chat/completions/ — presumably the proxy accepts both; confirm.
        async with httpx.AsyncClient(timeout=httpx.Timeout(None, connect=6.0, read=8.0), verify=False) as c:
            r = await c.post(f"{KF_BASE}/chat/completions", headers=headers, json=payload)
            checks["ki_fusion"] = {
                "ok": r.status_code < 400,
                "http_status": r.status_code,
                "model": _kf_actual_model,
                "url": KF_BASE,
                "error": r.text[:120] if r.status_code >= 400 else None,
            }
            provider_health["ki_fusion"] = r.status_code < 400
    except Exception as e:
        checks["ki_fusion"] = {"ok": False, "error": str(e)[:120], "model": _kf_actual_model}
        provider_health["ki_fusion"] = False
    # HF: token presence is the only cheap check available
    checks["hf_api"] = {"ok": bool(HF_TOKEN),
                        "status": "token configured" if HF_TOKEN else "HF_TOKEN not set"}
    # Local
    checks["local_cpu"] = {"ok": _local_pipe is not None,
                           "status": "loaded" if _local_pipe else ("loading" if _local_loading else "not loaded")}
    return jresp(checks)
@app.post("/api/providers/{provider}/toggle")
async def toggle_provider(provider: str, request: Request):
    """Manually enable/disable a provider.

    Body may carry {"enabled": bool}; without it the current state is
    inverted. 404 for unknown provider names.
    """
    if provider not in provider_health:
        raise HTTPException(404)
    body = await request.json()
    provider_health[provider] = body.get("enabled", not provider_health[provider])
    return jresp({"provider": provider, "enabled": provider_health[provider]})

# ── MCP ───────────────────────────────────────────────────────────
# Tool manifest returned by the MCP tools/list method; schemas mirror the
# keyword arguments accepted by mcp_call below.
MCP_TOOLS = [
    {"name": "nexus_chat",
     "description": "Send a chat completion through the NEXUS router. Auto-selects best provider.",
     "inputSchema": {"type": "object", "required": ["messages"], "properties": {
         "messages": {"type": "array", "items": {"type": "object"}},
         "max_tokens": {"type": "integer", "default": 1024},
         "temperature": {"type": "number", "default": 0.7},
         "cost_mode": {"type": "string", "enum": ["cheap", "balanced", "best"], "default": "balanced"},
         "provider": {"type": "string", "enum": ["", "ki_fusion", "hf_api", "local_cpu"]},
     }}},
    {"name": "nexus_route_info",
     "description": "Predict routing for a prompt without running inference.",
     "inputSchema": {"type": "object", "required": ["prompt"], "properties": {
         "prompt": {"type": "string"},
         "cost_mode": {"type": "string", "default": "balanced"},
     }}},
    {"name": "nexus_stats",
     "description": "Get routing statistics and provider performance.",
     "inputSchema": {"type": "object", "properties": {}}},
    {"name": "nexus_models",
     "description": "List all available models and providers.",
     "inputSchema": {"type": "object", "properties": {}}},
    {"name": "nexus_health",
     "description": "Check provider health and availability.",
     "inputSchema": {"type": "object", "properties": {}}},
]
async def mcp_call(name, args):
    """Dispatch one MCP tool invocation by name; always returns a JSON string."""
    if name == "nexus_chat":
        chat_result = await route_inference(
            args["messages"],
            args.get("max_tokens", 1024),
            args.get("temperature", 0.7),
            args.get("cost_mode", "balanced"),
            args.get("provider", ""))
        return json.dumps(chat_result)
    if name == "nexus_route_info":
        msgs = [{"role": "user", "content": args["prompt"]}]
        task = classify_task(msgs)
        comp = score_complexity(msgs)
        prov, mdl, why = select_provider_and_model(task, comp, args.get("cost_mode", "balanced"))
        return json.dumps({"task": task, "complexity": comp,
                           "provider": prov, "model": mdl, "reason": why})
    if name == "nexus_stats":
        return json.dumps(STATS)
    if name == "nexus_models":
        return json.dumps({"hf_models": HF_MODELS, "local_model": LOCAL_MODEL_ID,
                           "ki_fusion_model": _kf_actual_model})
    if name == "nexus_health":
        return json.dumps(provider_health)
    return json.dumps({"error": f"unknown: {name}"})

@app.get("/mcp/sse")
async def mcp_sse():
    """SSE channel: announce server info/tools, then keep-alive pings every 25s."""
    async def stream():
        init = {"jsonrpc": "2.0", "method": "notifications/initialized",
                "params": {"serverInfo": {"name": "nexus-router", "version": "1.0"},
                           "capabilities": {"tools": {}}}}
        yield f"data: {json.dumps(init)}\n\n"
        await asyncio.sleep(0.1)
        yield f"data: {json.dumps({'jsonrpc':'2.0','method':'notifications/tools/list_changed','params':{}})}\n\n"
        while True:
            await asyncio.sleep(25)
            yield f"data: {json.dumps({'jsonrpc':'2.0','method':'ping'})}\n\n"
    return StreamingResponse(stream(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})

@app.post("/mcp")
async def mcp_rpc(request: Request):
    """Minimal JSON-RPC endpoint for MCP: initialize, tools/list, tools/call."""
    body = await request.json()
    method = body.get("method", "")
    req_id = body.get("id", 1)
    if method == "initialize":
        return jresp({"jsonrpc": "2.0", "id": req_id, "result": {
            "serverInfo": {"name": "nexus-router", "version": "1.0"},
            "capabilities": {"tools": {}}}})
    if method == "tools/list":
        return jresp({"jsonrpc": "2.0", "id": req_id, "result": {"tools": MCP_TOOLS}})
    if method == "tools/call":
        params = body.get("params", {})
        text = await mcp_call(params.get("name", ""), params.get("arguments", {}))
        return jresp({"jsonrpc": "2.0", "id": req_id,
                      "result": {"content": [{"type": "text", "text": text}]}})
    return jresp({"jsonrpc": "2.0", "id": req_id,
                  "error": {"code": -32601, "message": "not found"}})

# ── SPA ───────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def ui():
    """Serve the embedded single-page dashboard."""
    return HTMLResponse(content=SPA, media_type="text/html; charset=utf-8")
nexus_chat | POST /v1/chat/completions | GET /mcp/sse