#!/usr/bin/env python3
"""External system server running on your own machine."""

from __future__ import annotations

import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any

import requests
import websockets

logging.basicConfig(level=logging.INFO, format="%(asctime)s - external: %(message)s")
logger = logging.getLogger(__name__)

_CFG_PATH = Path(__file__).with_name("model_configs.json")
_HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
_HF_BASE = os.getenv("HF_BASE_URL", "https://api-inference.huggingface.co/models").rstrip("/")

# Model key used when the client does not name one.
_DEFAULT_MODEL = "kimi_k2_5_q8"

# NOTE(review): 0.0.0.0 accepts connections from every network interface and
# the handler performs no authentication — confirm this exposure is intended.
_WS_HOST = "0.0.0.0"
_WS_PORT = 8765

# Config "type" values, grouped by how _route_specialist answers them.
_TEXT_TYPES = frozenset({"large_language_model", "text_analysis"})
_TREND_TYPES = frozenset({"trend_analysis", "social_trends", "news_trends"})
_MEDIA_TYPES = frozenset(
    {
        "video_generation",
        "video_enhancement",
        "unrestricted_video",
        "image_generation",
        "image_animation",
        "speech_recognition",
        "text_to_speech",
    }
)

# (keywords, delegate key) pairs, in the order delegates are reported.
_DELEGATION_RULES: tuple[tuple[tuple[str, ...], str], ...] = (
    (("image", "photo", "picture", "flux"), "flux_klein"),
    (("animate", "animation"), "animate_diff"),
    (("video", "clip", "short", "reel", "tiktok"), "minimax_video"),
    (("upscale", "enhance video", "ltx"), "ltx_video"),
    (("wan", "motion synthesis"), "wan_animate"),
    (("trends", "google trends", "trend"), "google_trends"),
    (("reddit",), "reddit_trends"),
    (("news", "headline", "breaking"), "news_trends"),
)


def _load_model_configs() -> dict[str, Any]:
    """Read ``model_configs.json`` from the directory containing this file.

    Returns:
        The parsed JSON object, or an empty dict when the file is missing,
        unreadable, not valid JSON, or its top level is not an object. The
        reason is logged instead of being silently discarded.
    """
    try:
        loaded = json.loads(_CFG_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        logger.warning("Model config %s not found; using empty config", _CFG_PATH)
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not load model config %s: %s", _CFG_PATH, exc)
        return {}
    if not isinstance(loaded, dict):
        logger.warning("Model config %s is not a JSON object; ignoring it", _CFG_PATH)
        return {}
    return loaded


MODEL_CONFIGS = _load_model_configs()


def _coerce_text(value: Any) -> Any:
    """Stringify a truthy non-string JSON value.

    Strings and falsy values are returned untouched, so ``run_model`` still
    applies its own empty-prompt and default-model handling to them.
    """
    if value and not isinstance(value, str):
        return str(value)
    return value


def _error_response(command: Any, model: Any, detail: str) -> str:
    """Serialize a ``success: false`` reply with the same keys as a normal reply."""
    return json.dumps(
        {
            "type": "response",
            "command": command,
            "success": False,
            "response": detail,
            "data": {
                "model": model,
                "message": "External system could not process command",
            },
        }
    )


async def handle_client(websocket):
    """Serve one websocket connection: one JSON request in, one JSON reply out.

    Each message is expected to be a JSON object with optional ``prompt``,
    ``model`` and ``command`` keys. Malformed messages get an error reply
    instead of terminating the connection.
    """
    loop = asyncio.get_running_loop()
    async for message in websocket:
        try:
            data = json.loads(message)
        except (TypeError, ValueError) as exc:
            logger.warning("Discarding malformed message: %s", exc)
            await websocket.send(
                _error_response("model_inference", None, f"Invalid JSON: {exc}")
            )
            continue
        if not isinstance(data, dict):
            logger.warning("Discarding non-object message: %r", data)
            await websocket.send(
                _error_response(
                    "model_inference", None, "Invalid request: expected a JSON object"
                )
            )
            continue

        logger.info("Received: %s", data)
        prompt = _coerce_text(data.get("prompt", ""))
        model = _coerce_text(data.get("model", "kimi_k2_5_q8"))
        command = data.get("command", "model_inference")

        try:
            # run_model does blocking HTTP (requests, up to 90 s); run it in a
            # worker thread so other connections keep being served meanwhile.
            response_text = await loop.run_in_executor(None, run_model, prompt, model)
        except Exception as exc:  # boundary: report to the client, keep serving
            logger.exception("run_model failed")
            await websocket.send(
                _error_response(command, model, f"Model execution failed: {exc}")
            )
            continue

        response = {
            "type": "response",
            "command": command,
            "success": True,
            "response": response_text,
            "data": {
                "model": model,
                "message": "External system processed command",
            },
        }
        await websocket.send(json.dumps(response))


def _hf_headers() -> dict[str, str]:
    """Build request headers, adding a bearer token when ``HF_TOKEN`` is set."""
    headers = {"Content-Type": "application/json"}
    if _HF_TOKEN:
        headers["Authorization"] = f"Bearer {_HF_TOKEN}"
    return headers


def _call_hf_text(model_repo: str, prompt: str) -> str:
    """Call the HF Inference API text generation endpoint.

    Args:
        model_repo: Repository path appended to the configured base URL.
        prompt: Text sent as the ``inputs`` field.

    Returns:
        The stripped generated text; may be empty.

    Raises:
        requests.RequestException: On transport errors, timeouts, or a
            non-2xx status.
        ValueError: If the response body is not valid JSON.
    """
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": 512,
            "temperature": 0.7,
            "return_full_text": False,
        },
    }
    resp = requests.post(
        f"{_HF_BASE}/{model_repo}",
        headers=_hf_headers(),
        json=payload,
        timeout=90,
    )
    resp.raise_for_status()
    data = resp.json()

    if isinstance(data, list) and data:
        first = data[0]
        if isinstance(first, dict):
            return (first.get("generated_text") or "").strip()
        # Unexpected element shape: surface it as text rather than crashing.
        return str(first).strip()
    if isinstance(data, dict):
        return (data.get("generated_text") or data.get("text") or str(data)).strip()
    return str(data).strip()


def _route_specialist(model_key: str, prompt: str) -> str:
    """Produce the reply for one specialist model.

    Text-capable types trigger a real HF call; trend and media/speech types
    are only acknowledged with a status string. Failures of the HF call are
    reported in the returned text rather than raised.
    """
    cfg = MODEL_CONFIGS.get(model_key, {})
    if not isinstance(cfg, dict):
        # A malformed config entry is treated the same as a missing one.
        cfg = {}
    model_name = cfg.get("name", model_key)
    model_type = cfg.get("type", "unknown")

    # Text-capable specialists: real HF call.
    if model_type in _TEXT_TYPES:
        try:
            out = _call_hf_text(model_name, prompt)
        except Exception as exc:  # best effort: the failure becomes the reply
            return f"[{model_key}] text call failed: {exc}"
        return out or f"[{model_key}] No text returned."

    # Trend/API specialists: acknowledged only; no request is made here.
    if model_type in _TREND_TYPES:
        return (
            f"[{model_key}] trend pipeline acknowledged. "
            f"Endpoint configured: {model_name}"
        )

    # Media/speech specialists are acknowledged in this bridge runtime.
    if model_type in _MEDIA_TYPES:
        return (
            f"[{model_key}] delegation accepted ({model_type}). "
            "Bridge runtime received request."
        )

    return f"[{model_key}] unsupported type: {model_type}"


def _select_delegations(prompt: str) -> list[str]:
    """Pick specialist model keys whose keywords occur in ``prompt``.

    Matching is a case-insensitive substring test, not a whole-word test, so
    for example "wan" also matches "want". Results follow the fixed order of
    the rule table and contain no duplicates.
    """
    lowered = prompt.lower()
    return [
        delegate
        for keywords, delegate in _DELEGATION_RULES
        if any(keyword in lowered for keyword in keywords)
    ]


def run_model(prompt: str, model: str) -> str:
    """Answer ``prompt`` with ``model`` and return the reply text.

    Kimi is the conversational conductor. It delegates non-coding media/trend
    tasks to specialist models and returns a unified response. Any other
    model key is routed directly to that specialist.

    Args:
        prompt: User text; an empty prompt yields a readiness message.
        model: Model key; a falsy value selects the Kimi conductor.
    """
    model_name = model or _DEFAULT_MODEL
    if not prompt:
        return f"[{model_name}] Ready."

    # Voice utility delegates.
    if model_name == "whisper_speech":
        return f"[whisper_speech] Transcribed: {prompt}"
    if model_name == "granite_speech":
        return "[granite_speech] Spoken output queued."

    # Direct call to specialist if explicitly requested.
    if model_name != "kimi_k2_5_q8":
        return _route_specialist(model_name, prompt)

    # Kimi controls conversation and coding.
    kimi_cfg = MODEL_CONFIGS.get("kimi_k2_5_q8", {})
    if not isinstance(kimi_cfg, dict):
        kimi_cfg = {}
    kimi_repo = kimi_cfg.get("name", "huihui-ai/Huihui-Kimi-K2.5-BF16-abliterated-GGUF")
    try:
        primary = _call_hf_text(kimi_repo, prompt)
    except Exception as exc:  # best effort: fall back to echoing the prompt
        logger.warning("Kimi call failed, using conductor fallback: %s", exc)
        primary = f"[kimi_k2_5_q8] conductor fallback: {prompt} (reason: {exc})"
    else:
        if not primary:
            primary = "[kimi_k2_5_q8] No text returned."

    delegates = _select_delegations(prompt)
    if not delegates:
        return primary

    delegated_out = [f"- {key}: {_route_specialist(key, prompt)}" for key in delegates]
    return primary + "\n\nDelegations:\n" + "\n".join(delegated_out)


async def main() -> None:
    """Start the websocket server and serve until the process is stopped."""
    async with websockets.serve(handle_client, _WS_HOST, _WS_PORT):
        logger.info("External system server started on ws://%s:%s", _WS_HOST, _WS_PORT)
        # Awaiting a future that is never resolved keeps the server alive.
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())