# external_server.py — source: dtgfdhgd repo, commit dcb4ddb by Jibbalit
# ("Wire model bridge to real routing and delegation.")
#!/usr/bin/env python3
"""External system server running on your own machine."""
from __future__ import annotations
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any
import requests
import websockets
# Tag every log line "external" so bridge output is easy to grep in mixed logs.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - external: %(message)s")
logger = logging.getLogger(__name__)
# Model registry JSON sits next to this file; parsed by _load_model_configs().
_CFG_PATH = Path(__file__).with_name("model_configs.json")
# Optional HF credentials/endpoint overrides; an empty token means unauthenticated calls.
_HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
_HF_BASE = os.getenv("HF_BASE_URL", "https://api-inference.huggingface.co/models").rstrip("/")
def _load_model_configs() -> dict[str, Any]:
try:
return json.loads(_CFG_PATH.read_text(encoding="utf-8"))
except Exception:
return {}
# Loaded once at import time; restart the server to pick up registry edits.
MODEL_CONFIGS = _load_model_configs()
async def handle_client(websocket):
    """Serve one websocket client: read JSON requests, reply with JSON responses.

    Each message is expected to be a JSON object with optional "prompt",
    "model", and "command" keys. Malformed JSON now gets an error reply
    instead of an unhandled exception tearing down the connection, and
    inference is offloaded to a worker thread so the blocking HTTP call
    inside run_model() no longer stalls the event loop.
    """
    async for message in websocket:
        try:
            data = json.loads(message)
        except json.JSONDecodeError as exc:
            logger.warning("Dropping malformed message: %s", exc)
            await websocket.send(json.dumps({
                "type": "response",
                "success": False,
                "error": f"invalid JSON: {exc}",
            }))
            continue
        logger.info("Received: %s", data)
        prompt = data.get("prompt", "")
        model = data.get("model", "kimi_k2_5_q8")
        # run_model does blocking HTTP via requests; keep the loop responsive.
        response_text = await asyncio.to_thread(run_model, prompt=prompt, model=model)
        response = {
            "type": "response",
            "command": data.get("command", "model_inference"),
            "success": True,
            "response": response_text,
            "data": {
                "model": model,
                "message": "External system processed command",
            },
        }
        await websocket.send(json.dumps(response))
def _hf_headers() -> dict[str, str]:
    """Build HTTP headers for HF API calls; auth is added only when a token is set."""
    if _HF_TOKEN:
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {_HF_TOKEN}",
        }
    return {"Content-Type": "application/json"}
def _call_hf_text(model_repo: str, prompt: str) -> str:
    """POST *prompt* to the HF Inference API for *model_repo*.

    Returns the generated plain text (possibly ""); raises on HTTP or
    network errors via requests.
    """
    body = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": 512,
            "temperature": 0.7,
            "return_full_text": False,
        },
    }
    reply = requests.post(
        f"{_HF_BASE}/{model_repo}",
        headers=_hf_headers(),
        json=body,
        timeout=90,
    )
    reply.raise_for_status()
    parsed = reply.json()
    # The API returns either a list of generation dicts or a single dict.
    if isinstance(parsed, list) and parsed:
        return (parsed[0].get("generated_text") or "").strip()
    if isinstance(parsed, dict):
        text = parsed.get("generated_text") or parsed.get("text") or str(parsed)
        return text.strip()
    return str(parsed).strip()
def _route_specialist(model_key: str, prompt: str) -> str:
    """Dispatch *prompt* to the specialist registered under *model_key*.

    Text-capable specialists get a real HF call; trend and media
    specialists are only acknowledged by this bridge runtime. Unknown
    types produce an "unsupported type" message.
    """
    entry = MODEL_CONFIGS.get(model_key, {})
    model_name = entry.get("name", model_key)
    model_type = entry.get("type", "unknown")
    text_types = {"large_language_model", "text_analysis"}
    trend_types = {"trend_analysis", "social_trends", "news_trends"}
    media_types = {
        "video_generation",
        "video_enhancement",
        "unrestricted_video",
        "image_generation",
        "image_animation",
        "speech_recognition",
        "text_to_speech",
    }
    # Text-capable specialists: real HF call.
    if model_type in text_types:
        try:
            generated = _call_hf_text(model_name, prompt)
        except Exception as exc:
            return f"[{model_key}] text call failed: {exc}"
        return generated or f"[{model_key}] No text returned."
    # Trend/API specialists: lightweight status acknowledgement only.
    if model_type in trend_types:
        return (
            f"[{model_key}] trend pipeline acknowledged. "
            f"Endpoint configured: {model_name}"
        )
    # Media/speech specialists are acknowledged in this bridge runtime.
    if model_type in media_types:
        return (
            f"[{model_key}] delegation accepted ({model_type}). "
            "Bridge runtime received request."
        )
    return f"[{model_key}] unsupported type: {model_type}"
def _select_delegations(prompt: str) -> list[str]:
p = prompt.lower()
delegates: list[str] = []
if any(k in p for k in ["image", "photo", "picture", "flux"]):
delegates.append("flux_klein")
if any(k in p for k in ["animate", "animation"]):
delegates.append("animate_diff")
if any(k in p for k in ["video", "clip", "short", "reel", "tiktok"]):
delegates.append("minimax_video")
if any(k in p for k in ["upscale", "enhance video", "ltx"]):
delegates.append("ltx_video")
if any(k in p for k in ["wan", "motion synthesis"]):
delegates.append("wan_animate")
if any(k in p for k in ["trends", "google trends", "trend"]):
delegates.append("google_trends")
if any(k in p for k in ["reddit"]):
delegates.append("reddit_trends")
if any(k in p for k in ["news", "headline", "breaking"]):
delegates.append("news_trends")
return list(dict.fromkeys(delegates))
def run_model(prompt: str, model: str) -> str:
    """Produce a response for *prompt* using *model*.

    Kimi is the conversational conductor: it answers directly and
    delegates non-coding media/trend tasks to specialist models,
    returning a unified response. Any other model key is routed straight
    to the matching specialist.
    """
    active = model if model else "kimi_k2_5_q8"
    if not prompt:
        return f"[{active}] Ready."
    # Voice utility delegates.
    if active == "whisper_speech":
        return f"[whisper_speech] Transcribed: {prompt}"
    if active == "granite_speech":
        return "[granite_speech] Spoken output queued."
    # Direct call to specialist if explicitly requested.
    if active != "kimi_k2_5_q8":
        return _route_specialist(active, prompt)
    # Kimi controls conversation and coding.
    kimi_cfg = MODEL_CONFIGS.get("kimi_k2_5_q8", {})
    kimi_repo = kimi_cfg.get("name", "huihui-ai/Huihui-Kimi-K2.5-BF16-abliterated-GGUF")
    try:
        primary = _call_hf_text(kimi_repo, prompt) or "[kimi_k2_5_q8] No text returned."
    except Exception as exc:
        primary = f"[kimi_k2_5_q8] conductor fallback: {prompt} (reason: {exc})"
    delegates = _select_delegations(prompt)
    if not delegates:
        return primary
    reports = [f"- {key}: {_route_specialist(key, prompt)}" for key in delegates]
    return primary + "\n\nDelegations:\n" + "\n".join(reports)
async def main() -> None:
    """Run the websocket bridge server on ws://0.0.0.0:8765 until killed."""
    async with websockets.serve(handle_client, "0.0.0.0", 8765):
        logger.info("External system server started on ws://0.0.0.0:8765")
        # Await a future that never completes: keep serving forever.
        await asyncio.Future()
if __name__ == "__main__":
    asyncio.run(main())