| |
| """External system server running on your own machine.""" |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import logging |
| import os |
| from pathlib import Path |
| from typing import Any |
|
|
| import requests |
| import websockets |
|
|
# Log with a timestamp and an "external:" tag so bridge messages are easy to
# distinguish from other processes sharing the console.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - external: %(message)s")
logger = logging.getLogger(__name__)

# Model configuration file lives next to this module.
_CFG_PATH = Path(__file__).with_name("model_configs.json")
# Hugging Face API token; empty string means unauthenticated requests.
_HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
# Base URL of the HF Inference API; trailing slash stripped so URL joins are clean.
_HF_BASE = os.getenv("HF_BASE_URL", "https://api-inference.huggingface.co/models").rstrip("/")
|
|
|
|
def _load_model_configs() -> dict[str, Any]:
    """Load model configurations from ``model_configs.json``.

    Returns:
        The parsed configuration mapping, or an empty dict when the file is
        missing, unreadable, or contains invalid JSON (startup is best-effort).
    """
    try:
        return json.loads(_CFG_PATH.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        # Narrowed from a bare ``except Exception`` so genuine programming
        # errors surface; an absent or corrupt config file is the expected
        # failure and is logged rather than silently ignored.
        logger.warning("Could not load model configs from %s: %s", _CFG_PATH, exc)
        return {}
|
|
|
|
| MODEL_CONFIGS = _load_model_configs() |
|
|
|
|
async def handle_client(websocket):
    """Serve one websocket client.

    For each incoming JSON message, run the requested model and reply with a
    JSON response envelope. Malformed JSON is answered with an error payload
    instead of crashing the connection, and the blocking model call is pushed
    to a worker thread so the event loop stays responsive for other clients.
    """
    loop = asyncio.get_running_loop()
    async for message in websocket:
        try:
            data = json.loads(message)
        except json.JSONDecodeError as exc:
            # Previously a bad frame raised out of the handler and dropped
            # the connection; report the problem to the client instead.
            await websocket.send(json.dumps({
                "type": "response",
                "success": False,
                "error": f"invalid JSON: {exc}",
            }))
            continue
        logger.info("Received: %s", data)
        prompt = data.get("prompt", "")
        model = data.get("model", "kimi_k2_5_q8")

        # run_model performs blocking HTTP I/O (requests); offload it so the
        # event loop is not stalled while the inference call is in flight.
        response_text = await loop.run_in_executor(
            None, lambda: run_model(prompt=prompt, model=model)
        )
        response = {
            "type": "response",
            "command": data.get("command", "model_inference"),
            "success": True,
            "response": response_text,
            "data": {
                "model": model,
                "message": "External system processed command",
            },
        }
        await websocket.send(json.dumps(response))
|
|
|
|
def _hf_headers() -> dict[str, str]:
    """Build HTTP headers for HF Inference API calls, adding auth when a token is set."""
    if _HF_TOKEN:
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {_HF_TOKEN}",
        }
    return {"Content-Type": "application/json"}
|
|
|
|
def _call_hf_text(model_repo: str, prompt: str) -> str:
    """
    POST *prompt* to the HF Inference API text-generation endpoint for
    *model_repo* and return the generated text, stripped of whitespace.

    Raises on HTTP errors (via ``raise_for_status``).
    """
    body = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": 512,
            "temperature": 0.7,
            "return_full_text": False,
        },
    }
    reply = requests.post(
        f"{_HF_BASE}/{model_repo}",
        headers=_hf_headers(),
        json=body,
        timeout=90,
    )
    reply.raise_for_status()
    parsed = reply.json()
    # The API returns either a list of generation dicts or a single dict;
    # anything else is stringified as a last resort.
    if isinstance(parsed, list) and parsed:
        return (parsed[0].get("generated_text") or "").strip()
    if isinstance(parsed, dict):
        text = parsed.get("generated_text") or parsed.get("text") or str(parsed)
        return text.strip()
    return str(parsed).strip()
|
|
|
|
def _route_specialist(model_key: str, prompt: str) -> str:
    """Dispatch *prompt* to the specialist identified by *model_key*.

    Text-capable models are invoked through the HF text endpoint; trend and
    media models only acknowledge the delegation. Unknown types produce an
    explanatory string rather than an exception.
    """
    cfg = MODEL_CONFIGS.get(model_key, {})
    model_name = cfg.get("name", model_key)
    model_type = cfg.get("type", "unknown")

    text_types = {"large_language_model", "text_analysis"}
    trend_types = {"trend_analysis", "social_trends", "news_trends"}
    media_types = {
        "video_generation",
        "video_enhancement",
        "unrestricted_video",
        "image_generation",
        "image_animation",
        "speech_recognition",
        "text_to_speech",
    }

    if model_type in text_types:
        # Real inference call; failures are reported inline, never raised.
        try:
            generated = _call_hf_text(model_name, prompt)
        except Exception as exc:
            return f"[{model_key}] text call failed: {exc}"
        return generated if generated else f"[{model_key}] No text returned."

    if model_type in trend_types:
        return (
            f"[{model_key}] trend pipeline acknowledged. "
            f"Endpoint configured: {model_name}"
        )

    if model_type in media_types:
        return (
            f"[{model_key}] delegation accepted ({model_type}). "
            "Bridge runtime received request."
        )

    return f"[{model_key}] unsupported type: {model_type}"
|
|
|
|
| def _select_delegations(prompt: str) -> list[str]: |
| p = prompt.lower() |
| delegates: list[str] = [] |
| if any(k in p for k in ["image", "photo", "picture", "flux"]): |
| delegates.append("flux_klein") |
| if any(k in p for k in ["animate", "animation"]): |
| delegates.append("animate_diff") |
| if any(k in p for k in ["video", "clip", "short", "reel", "tiktok"]): |
| delegates.append("minimax_video") |
| if any(k in p for k in ["upscale", "enhance video", "ltx"]): |
| delegates.append("ltx_video") |
| if any(k in p for k in ["wan", "motion synthesis"]): |
| delegates.append("wan_animate") |
| if any(k in p for k in ["trends", "google trends", "trend"]): |
| delegates.append("google_trends") |
| if any(k in p for k in ["reddit"]): |
| delegates.append("reddit_trends") |
| if any(k in p for k in ["news", "headline", "breaking"]): |
| delegates.append("news_trends") |
| return list(dict.fromkeys(delegates)) |
|
|
|
|
def run_model(prompt: str, model: str) -> str:
    """
    Kimi is the conversational conductor. It delegates non-coding media/trend tasks
    to specialist models and returns a unified response.
    """
    model_name = model or "kimi_k2_5_q8"
    if not prompt:
        return f"[{model_name}] Ready."

    # Speech models have canned, local-only behavior.
    if model_name == "whisper_speech":
        return f"[whisper_speech] Transcribed: {prompt}"
    if model_name == "granite_speech":
        return "[granite_speech] Spoken output queued."

    # Anything other than the conductor routes straight to its specialist.
    if model_name != "kimi_k2_5_q8":
        return _route_specialist(model_name, prompt)

    # Conductor path: produce the primary answer with Kimi, falling back to
    # an echo-style message when the remote call fails.
    kimi_cfg = MODEL_CONFIGS.get("kimi_k2_5_q8", {})
    kimi_repo = kimi_cfg.get("name", "huihui-ai/Huihui-Kimi-K2.5-BF16-abliterated-GGUF")
    try:
        primary = _call_hf_text(kimi_repo, prompt) or "[kimi_k2_5_q8] No text returned."
    except Exception as exc:
        primary = f"[kimi_k2_5_q8] conductor fallback: {prompt} (reason: {exc})"

    delegates = _select_delegations(prompt)
    if not delegates:
        return primary

    delegation_lines = [
        f"- {key}: {_route_specialist(key, prompt)}" for key in delegates
    ]
    return primary + "\n\nDelegations:\n" + "\n".join(delegation_lines)
|
|
|
|
async def main() -> None:
    """Start the websocket server on all interfaces, port 8765, and serve forever."""
    async with websockets.serve(handle_client, "0.0.0.0", 8765):
        logger.info("External system server started on ws://0.0.0.0:8765")
        # Await a future that never completes: the server runs in the
        # background until the task is cancelled (e.g. Ctrl-C).
        await asyncio.Future()
|
|
|
|
# Script entry point: run the async websocket bridge server.
if __name__ == "__main__":
    asyncio.run(main())
|
|