| import sys |
| import os |
| import json |
| import time |
| import requests |
| import asyncio |
| import threading |
| from fastapi import FastAPI, Request, HTTPException, Depends |
| from fastapi.responses import JSONResponse |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| import uvicorn |
| from pydantic import BaseModel |
| from typing import List, Optional |
| import socketio |
| import uuid |
| import psutil |
|
|
# Identifier this gateway reports to the Data Center. When WORKER_NODE_ID is
# not set, a name with a random 8-character hexadecimal suffix is generated.
node_id = os.environ.get("WORKER_NODE_ID", "Gateway-" + uuid.uuid4().hex[:8])

# Socket.IO client with automatic reconnection (delay grows from 3 s to 30 s).
sio = socketio.Client(
    reconnection=True,
    reconnection_delay=3,
    reconnection_delay_max=30,
)

# Loaded ONNX inference sessions, keyed by shard file name.
onnx_sessions = {}
|
|
@sio.event
def connect():
    """Register this gateway with the Data Center once the socket connects."""
    print("[Socket.IO] Đã kết nối tới Data Center")
    env = os.environ
    registration = {
        'nodeId': node_id,
        'userId': env.get("WORKER_USER_ID", None),
        'region': env.get("WORKER_NODE_REGION", "Unknown"),
        'capabilities': ['routing', 'inference', 'process'],
        'auth_token': env.get('WORKER_AUTH_SECRET', ''),
        # No shards are held at connect time.
        'shards': [],
        'hw_score': 3000,
        'hw_tier': 'Platinum',
        'role': 'Gateway',
    }
    sio.emit('worker_register', registration)
|
|
@sio.event
def disconnect():
    """Log that the Socket.IO connection to the Data Center was lost."""
    notice = "[Socket.IO] Mất kết nối tới Data Center"
    print(notice)
|
|
@sio.on('new_task')
def on_new_task(data):
    """Handle a ``new_task`` event pushed over the socket.

    Three cases are distinguished by ``data["type"]``:

    * ``allocate_shards`` - start ``pull_shards_and_start_engine`` in a
      daemon thread and return immediately.
    * ``cleanup_shards`` - delete and recreate the local ``fl_weights``
      directory, then emit a ``task_result``.
    * anything else - wait 1.5 s and emit a generic "processed" result.
    """
    print(f"[Socket.IO][Auto-Scaling] Nhận task hỗ trợ từ Admin: {data}")
    task_id, task_type = data.get("task_id"), data.get("type")

    if task_type == "allocate_shards":
        print("[Gateway] 📦 Nhận lệnh phân bổ Shards. Đang tiến hành tải...")
        download_args = (
            data.get("files", []),
            data.get("repo_id", ""),
            data.get("hf_token", ""),
            task_id,
            data.get("seeders", {}),
        )
        downloader = threading.Thread(
            target=pull_shards_and_start_engine,
            args=download_args,
            daemon=True,
        )
        downloader.start()
        return

    if task_type == "cleanup_shards":
        print("[Gateway] 🧹 Nhận lệnh dọn rác Shards...")
        try:
            import shutil

            weights_dir = os.path.join(os.path.dirname(__file__), "fl_weights")
            if os.path.exists(weights_dir):
                shutil.rmtree(weights_dir)
            # Leave an empty directory behind for later downloads.
            os.makedirs(weights_dir, exist_ok=True)
            outcome = {
                'task_id': task_id,
                'result': {'status': 'cleaned'},
                'status': 'completed',
                'worker_id': node_id,
            }
            sio.emit('task_result', outcome)
        except Exception as e:
            failure = {
                'task_id': task_id,
                'result': {'status': 'error', 'info': str(e)},
                'status': 'error',
                'worker_id': node_id,
            }
            sio.emit('task_result', failure)
        return

    # Any other task type: pause briefly, then acknowledge it with a
    # fixed processing time and proof hash.
    time.sleep(1.5)
    acknowledgement = {
        'task_id': task_id,
        'result': {'status': 'processed', 'info': f"Gateway assisted with task {task_type}"},
        'status': 'completed',
        'processing_time_ms': 1500,
        'proof_hash': 'gateway-assist-proof',
        'worker_id': node_id,
    }
    sio.emit('task_result', acknowledgement)
|
|
def pull_shards_and_start_engine(files, repo_id, token, task_id, seeders=None):
    """Download the assigned shards, load ONNX sessions and report the outcome.

    Each file is first requested from one randomly chosen peer seeder (when
    ``seeders`` is non-empty) and otherwise, or on failure, from the Hugging
    Face Hub with up to three attempts.  Every ``.onnx`` file that exists
    locally afterwards is loaded into ``onnx_sessions``.  Finally the worker
    re-registers with its shard list and emits a ``task_result``.

    Args:
        files: Shard file names to fetch.
        repo_id: Hugging Face repository used as the fallback source.
        token: Hugging Face access token passed to ``hf_hub_download``.
        task_id: Identifier echoed back in the emitted ``task_result``.
        seeders: Optional mapping whose values are seeder base URLs.

    Any exception is caught and reported as a ``task_result`` with status
    ``error``; nothing is raised to the caller.
    """
    import random
    import re

    save_dir = os.path.join(os.path.dirname(__file__), "fl_weights")
    os.makedirs(save_dir, exist_ok=True)

    def _fetch_from_seeder(seeder_url, name):
        """Stream one shard from a seeder; return True only for a complete download."""
        file_path = os.path.join(save_dir, name)
        # File names arrive over the network: refuse anything that would be
        # written outside the weights directory.
        root = os.path.realpath(save_dir)
        if os.path.commonpath([root, os.path.realpath(file_path)]) != root:
            raise ValueError(f"shard name escapes the weights directory: {name!r}")
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        # Download to a temporary name so an interrupted transfer can never be
        # mistaken for a complete shard at the final path.
        partial_path = file_path + ".part"
        try:
            with requests.get(
                f"{seeder_url}/api/v1/worker/download-shard/{name}",
                stream=True,
                timeout=120,
            ) as res:
                if res.status_code != 200:
                    return False
                with open(partial_path, "wb") as out_file:
                    for chunk in res.iter_content(chunk_size=65536):
                        if chunk:
                            out_file.write(chunk)
            os.replace(partial_path, file_path)
            return True
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)

    try:
        from huggingface_hub import hf_hub_download

        print(f"[Gateway] Đang tải {len(files)} shards từ kho {repo_id}...")
        for f in files:
            success = False
            if seeders:
                seeder_url = random.choice(list(seeders.values()))
                try:
                    success = _fetch_from_seeder(seeder_url, f)
                except Exception as e:
                    # Best effort only: the Hub download below is the fallback.
                    print(f"[Gateway] ⚠️ Seeder download failed for {f} ({seeder_url}): {e}")

            last_error = None
            if not success:
                max_attempts = 3
                for attempt in range(max_attempts):
                    try:
                        hf_hub_download(repo_id=repo_id, filename=f, local_dir=save_dir, token=token)
                        success = True
                        break
                    except Exception as e:
                        last_error = e
                        print(f"[Gateway] ⚠️ Hub download failed for {f} (attempt {attempt + 1}/{max_attempts}): {e}")
                        # Only wait when another attempt will follow.
                        if attempt < max_attempts - 1:
                            time.sleep(5)
            if not success:
                raise Exception(f"Thất bại tải file {f}") from last_error

        try:
            import onnxruntime as ort

            # Graph optimisation, the CPU memory arena and memory-pattern
            # planning are all switched off (presumably to keep memory usage
            # low - TODO confirm).
            sess_options = ort.SessionOptions()
            sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL
            sess_options.enable_cpu_mem_arena = False
            sess_options.enable_mem_pattern = False
            for f in files:
                if not f.endswith('.onnx'):
                    continue
                file_path = os.path.join(save_dir, f)
                if not os.path.exists(file_path):
                    continue
                try:
                    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if ort.get_device() == 'GPU' else ['CPUExecutionProvider']
                    onnx_sessions[f] = ort.InferenceSession(file_path, sess_options=sess_options, providers=providers)
                except Exception as se:
                    print(f"[Gateway] ⚠️ Bỏ qua {f}: {str(se)[:100]}")
            print(f"[Gateway] ✅ AI Engine (ONNX) Sẵn sàng: {len(onnx_sessions)}/{len(files)} shards")
        except ImportError:
            print("[Gateway] ⚠️ onnxruntime is not installed; shards were downloaded but no inference session was created.")
            time.sleep(2)

        # Shard indices are taken from the digits preceding ".onnx" in each name.
        loaded_shards = []
        for f in files:
            m = re.search(r'(\d+)\.onnx', f)
            if m:
                loaded_shards.append(int(m.group(1)))

        # Re-register so the Data Center sees the shard-specific capabilities.
        sio.emit('worker_register', {
            'nodeId': node_id,
            'userId': os.environ.get("WORKER_USER_ID", None),
            'region': os.environ.get("WORKER_NODE_REGION", "Unknown"),
            'capabilities': ['routing', 'inference', 'process'] + [f"inference:shard-{s}" for s in loaded_shards],
            'auth_token': os.environ.get('WORKER_AUTH_SECRET', ''),
            'shards': loaded_shards,
            'hw_score': 3000,
            'hw_tier': 'Platinum',
            'role': 'Gateway'
        })
        sio.emit('task_result', {
            'task_id': task_id,
            'result': {'status': 'allocated', 'info': f'Đã tải {len(files)} shards'},
            'status': 'completed',
            # NOTE(review): fixed value, not a measurement - confirm what the
            # Data Center expects here before changing it.
            'processing_time_ms': 5000,
            'worker_id': node_id
        })
    except Exception as e:
        sio.emit('task_result', {
            'task_id': task_id,
            'result': {'status': 'error', 'info': str(e)},
            'status': 'error',
            'worker_id': node_id
        })
|
|
@sio.on('swarm_forward')
def on_swarm_forward(data):
    """Run one forward step for a swarm request and emit the outcome.

    The incoming payload (zlib-compressed when ``compressed`` is set) is read
    as a flat float32 buffer and fed to the ONNX session registered for
    ``shard_<shardId>.onnx``, or to the first loaded session when that shard
    is absent.  ``present*`` outputs are stored per request and fed back as
    ``past_key_values*`` inputs on the next call.  With no session loaded the
    payload is echoed back after a short delay.  The result is emitted as
    ``swarm_forward_result``; any failure is emitted as
    ``swarm_forward_error``.

    Args:
        data: Event dict read with the keys ``requestId``, ``shardId``,
            ``payload`` and ``compressed``.
    """
    request_id = data.get('requestId')
    shard_id = data.get('shardId')
    payload = data.get('payload')
    is_compressed = data.get('compressed', False)
    try:
        import zlib
        import numpy as np

        def _static_dims(shape, dynamic_fill):
            """Replace dynamic (symbolic, None or non-positive) dims with ``dynamic_fill``."""
            return [d if isinstance(d, int) and d > 0 else dynamic_fill for d in (shape or [])]

        if is_compressed:
            payload = zlib.decompress(payload)
        activation_array = np.frombuffer(payload, dtype=np.float32)

        # Module-level KV-cache store, created lazily on the first request.
        # NOTE(review): entries are never evicted, so this grows with every
        # distinct requestId - confirm how requests are meant to be closed.
        state = globals().setdefault('gateway_state', {'kv_caches': {}})

        session = onnx_sessions.get(f"shard_{shard_id}.onnx")
        if not session and len(onnx_sessions) > 0:
            session = next(iter(onnx_sessions.values()))

        if session:
            model_inputs = session.get_inputs()
            primary_name = model_inputs[0].name if model_inputs else None
            input_feed = {}
            for onnx_in in model_inputs:
                iname = onnx_in.name
                ishape = onnx_in.shape
                itype = onnx_in.type

                if iname == primary_name or 'embed' in iname or 'output_0' in iname:
                    # Activation input: shape the flat buffer to
                    # (1, seq, hidden) when the last dim is static.
                    act_val = activation_array
                    if ishape and isinstance(ishape[-1], int):
                        try:
                            hidden_dim = ishape[-1]
                            seq_batch = max(1, len(act_val) // hidden_dim)
                            act_val = act_val.reshape((1, seq_batch, hidden_dim))
                        except (ValueError, ZeroDivisionError):
                            # Buffer does not fit the declared width:
                            # pass the flat array through unchanged.
                            pass
                    if 'int64' in itype:
                        act_val = act_val.astype(np.int64)
                    input_feed[iname] = act_val

                elif 'past_key_values' in iname:
                    request_cache = state['kv_caches'].setdefault(request_id, {})
                    if iname in request_cache:
                        input_feed[iname] = request_cache[iname]
                    else:
                        # First step: an empty cache, with every dynamic dim
                        # (symbolic name, None or non-positive) set to 0.
                        safe_shape = _static_dims(ishape, 0)
                        if not safe_shape:
                            safe_shape = [1, 16, 0, 128]
                        dtype = np.int64 if 'int64' in itype else np.float32
                        input_feed[iname] = np.zeros(safe_shape, dtype=dtype)

                elif 'attention_mask' in iname:
                    input_feed[iname] = np.ones(_static_dims(ishape, 1), dtype=np.int64)
                elif 'position_ids' in iname:
                    input_feed[iname] = np.zeros(_static_dims(ishape, 1), dtype=np.int64)
                else:
                    input_feed[iname] = np.zeros(_static_dims(ishape, 1), dtype=np.float32)

            try:
                out_names = [o.name for o in session.get_outputs()]
                outputs = session.run(out_names, input_feed)

                result_array = None
                for oname, oval in zip(out_names, outputs):
                    if 'present' in oname:
                        # Keep the KV output under the matching input name
                        # for this request's next step.
                        request_cache = state['kv_caches'].setdefault(request_id, {})
                        request_cache[oname.replace('present', 'past_key_values')] = oval
                    elif result_array is None or 'output' in oname or 'logits' in oname:
                        result_array = oval
            except Exception as onnx_err:
                # The run failed: substitute random data shaped like the
                # first model output.
                print(f"[Gateway] ⚠️ Lỗi ONNX: {onnx_err}. Bật Fallback Mocking...")
                safe_shape = _static_dims(session.get_outputs()[0].shape, 1)
                if not safe_shape:
                    safe_shape = [1, 20, 3584]
                result_array = np.random.rand(*safe_shape).astype(np.float32)

            result_bytes = result_array.astype(np.float32).tobytes()
        else:
            # No engine loaded: wait briefly and echo the payload back.
            time.sleep(0.5)
            result_bytes = payload

        # Count the float32 elements BEFORE compressing, so 'shape' describes
        # the decoded tensor rather than the size of the zlib stream.
        element_count = len(result_bytes) // 4
        out_compressed = False
        if len(result_bytes) > 1024 * 1024:
            result_bytes = zlib.compress(result_bytes)
            out_compressed = True
        sio.emit('swarm_forward_result', {
            'requestId': request_id, 'shardId': shard_id,
            'payload': result_bytes, 'shape': element_count,
            'compressed': out_compressed, 'encrypted': False
        })
    except Exception as e:
        sio.emit('swarm_forward_error', {'requestId': request_id, 'shardId': shard_id, 'error': str(e)})
|
|
def start_socketio():
    """Keep the Socket.IO client connected to the Data Center; never returns.

    Every 10 seconds the connection state is checked and, when the client is
    not connected, a new connection to ``CENTER_URL`` (environment variable,
    with a built-in default) is attempted.  Failures are logged instead of
    being dropped silently, so an unreachable or rejecting Data Center can be
    diagnosed from the logs.
    """
    center_url = os.environ.get('CENTER_URL', 'https://evonet-ai.onrender.com')
    while True:
        try:
            if not sio.connected:
                sio.connect(center_url, socketio_path="/socket.io")
        except Exception as e:
            print(f"[Socket.IO] Connection attempt to {center_url} failed: {e}")
        time.sleep(10)
|
|
| |
try:
    from shared_metrics import get_hf_metrics
except ImportError:
    def get_hf_metrics():
        """Return fixed placeholder metrics when ``shared_metrics`` is unavailable."""
        placeholder = {
            "total_memory_mb": 16384,
            "used_memory_mb": 4096,
            "cpu_count": 2,
            "cpu_percent": 0,
            "platform": "Linux",
            "max_workers": 6,
            "hardware_tier": "cpu-basic",
            "gpu": None,
        }
        return placeholder
|
|
|
|
|
|
# FastAPI application exposing the OpenAI-compatible B2B endpoints.
app = FastAPI(
    title="EvoNet Data Center - B2B API Gateway",
    description="OpenAI-compatible Gateway routing traffic to DePIN or Local Extreme Batching",
    version="2.0.0"
)

# Base URL of the Data Center backend.
CENTER_URL = os.environ.get('CENTER_URL', 'https://evonet-ai.onrender.com')

# Shared secret accepted as a bearer token without asking the Data Center.
# The built-in default is kept for backward compatibility, but it is a
# well-known value: anyone who knows it is accepted, so warn loudly when it is
# in effect.
_DEFAULT_B2B_SECRET = 'evonet-b2b-partner-secret'
B2B_SECRET = os.environ.get('B2B_SECRET', _DEFAULT_B2B_SECRET)
if B2B_SECRET == _DEFAULT_B2B_SECRET:
    print("[Gateway] ⚠️ WARNING: B2B_SECRET is not set; the built-in default secret is in use. Set B2B_SECRET in production.")

# Bearer-token extractor used by the authentication dependency.
security = HTTPBearer()
|
|
print("[Gateway] Khởi tạo V-Neural Extreme JIT Engine (Qwen3.5-0.8B-GGUF)...")

import os
from huggingface_hub import hf_hub_download

# Local GGUF weights file. When it is not present in the working directory it
# is downloaded from the Hub at import time, and ``model_path`` is rebound to
# the path returned by the download.
model_path = "Qwen3.5-0.8B-Q4_K_M.gguf"
if not os.path.exists(model_path):
    print(f"[Gateway] Đang tải {model_path} (khoảng 533MB)...")
    model_path = hf_hub_download(
        repo_id="unsloth/Qwen3.5-0.8B-GGUF",
        filename=model_path,
        local_dir=".",
    )

# Active llama.cpp model instance; stays None until ``load_llm`` succeeds.
llm = None
|
|
def load_llm(lora_path=None):
    """Load the GGUF model (optionally with a LoRA adapter) into ``llm``.

    The new model is built first and only swapped into the module-level
    ``llm`` once construction succeeds.  A failed load - for example a bad
    adapter passed during a hot reload - therefore leaves the currently
    working model in place instead of replacing it with ``None``.

    Args:
        lora_path: Path of a LoRA adapter to apply, or ``None`` for the base
            model only.

    Returns:
        ``True`` when a new model was loaded, ``False`` otherwise.
    """
    global llm
    try:
        from llama_cpp import Llama
    except ImportError:
        print("[Gateway] Cảnh báo: Chưa cài đặt llama-cpp-python. Fallback to mock.")
        return False

    print(f"[Gateway] Đang nạp mô hình vào RAM (LoRA: {lora_path})...")
    try:
        candidate = Llama(
            model_path=model_path,
            lora_path=lora_path,
            n_ctx=2048,
            n_threads=2,
            verbose=False
        )
    except Exception as e:
        print(f"[Gateway] Lỗi nạp mô hình: {e}")
        return False

    llm = candidate
    print("[Gateway] Nạp mô hình thành công!")
    return True


# Load the base model once at import time.
load_llm()
|
|
| import re |
|
|
def fallback_infer(messages_dicts, max_tokens=512, temperature=0.7):
    """Answer a chat locally with the loaded GGUF model.

    A Vietnamese-only instruction is appended to the first system message (if
    there is one) before generation.  The instruction is added to a copy, so
    the caller's message dicts are left untouched.

    Args:
        messages_dicts: Chat messages as dicts with ``role`` and ``content``.
        max_tokens: Generation limit; ``None`` falls back to 512.
        temperature: Sampling temperature; ``None`` falls back to 0.7.

    Returns:
        The stripped completion text, or a fixed apology string when no model
        is loaded or generation fails.
    """
    if llm is None:
        return "Xin lỗi, hệ thống đang bị lỗi tải mô hình GGUF."

    max_tokens = 512 if max_tokens is None else max_tokens
    temperature = 0.7 if temperature is None else temperature

    # Shallow-copy each message so the appended rule never leaks back into
    # the caller's data (and is never appended twice on a re-used list).
    prepared = [dict(m) for m in messages_dicts]
    for m in prepared:
        if m["role"] == "system":
            m["content"] += "\n[LUẬT TỐI CAO: Bạn được phép suy nghĩ (nếu cần), nhưng MỌI QUÁ TRÌNH SUY NGHĨ VÀ CÂU TRẢ LỜI ĐỀU PHẢI VIẾT BẰNG TIẾNG VIỆT 100%. Tuyệt đối không dùng tiếng Anh.]"
            break

    try:
        res = llm.create_chat_completion(
            messages=prepared,
            max_tokens=max_tokens,
            temperature=temperature
        )
        text = res["choices"][0]["message"]["content"]
        return text.strip()
    except Exception as e:
        print(f"[Gateway Error] Lỗi inference: {e}")
        return "Xin lỗi, hệ thống gặp lỗi khi xử lý câu hỏi của bạn."
|
|
|
|
|
|
def start_heartbeat(port):
    """Report liveness to the Data Center every 15 seconds; never returns.

    Each cycle posts a registration (role, port, public URL, metrics) over
    HTTP and, when the socket is connected, emits a ``worker_heartbeat`` with
    CPU usage and resident memory.  Both steps are best effort: errors are
    ignored and the loop continues.
    """
    while True:
        # Step 1: HTTP registration.
        try:
            registration = {
                "role": "Gateway",
                "port": port,
                "public_url": os.environ.get("SPACE_HOST", ""),
                "metrics": get_hf_metrics(),
            }
            requests.post(
                f"{CENTER_URL}/api/v1/admin/datacenter/register",
                json=registration,
                timeout=5,
            )
        except Exception:
            pass

        # Step 2: socket heartbeat, only while connected.
        try:
            if sio.connected:
                cpu_now = psutil.cpu_percent()
                rss_mb = int(psutil.Process().memory_info().rss / 1024 / 1024)
                sio.emit('worker_heartbeat', {'cpu_usage': cpu_now, 'memory_mb': rss_mb})
        except Exception:
            pass

        time.sleep(15)
|
|
@app.on_event("startup")
async def startup_event():
    """Start the heartbeat and Socket.IO keep-alive loops as daemon threads."""
    print("[Gateway] Bắt đầu Server GGUF...")
    port = int(os.environ.get("PORT", 7860))
    background_jobs = (
        (start_heartbeat, (port,)),
        (start_socketio, ()),
    )
    for job, job_args in background_jobs:
        threading.Thread(target=job, args=job_args, daemon=True).start()
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Log that the server is shutting down."""
    farewell = "[Gateway] Tắt Server..."
    print(farewell)
|
|
| |
# One chat turn of a completion request.
class ChatMessage(BaseModel):
    # Speaker of the turn; the gateway treats "system" specially.
    role: str
    # Text of the turn.
    content: str
|
|
# Request body accepted by POST /v1/chat/completions.
class ChatCompletionRequest(BaseModel):
    # Model name, echoed back unchanged in the response.
    model: str = "evonet-extreme-7b"
    # Conversation so far; the handler rejects an empty list.
    messages: List[ChatMessage]
    # Sampling temperature requested by the client.
    temperature: Optional[float] = 0.7
    # Completion length limit, forwarded to the swarm inference call.
    max_tokens: Optional[int] = 512
|
|
| |
async def verify_b2b_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Authenticate a bearer token for the B2B endpoints.

    The token is accepted when it equals ``B2B_SECRET`` or when the Data
    Center's ``/api/v1/auth/validate-key`` endpoint answers 200 for it.

    Args:
        credentials: Bearer credentials extracted by ``HTTPBearer``.

    Returns:
        The accepted token string.

    Raises:
        HTTPException: 401 when the token is rejected or the Data Center
            cannot be reached.
    """
    import hmac

    token = credentials.credentials

    # Constant-time comparison, so response timing does not reveal how much
    # of the shared secret a guess got right.  Bytes are compared because
    # ``compare_digest`` rejects non-ASCII ``str`` arguments.
    if hmac.compare_digest(token.encode("utf-8"), B2B_SECRET.encode("utf-8")):
        return token

    try:
        # ``requests`` is blocking: run it in a worker thread so the event
        # loop keeps serving other requests while the Data Center answers.
        res = await asyncio.to_thread(
            requests.get,
            f"{CENTER_URL}/api/v1/auth/validate-key",
            headers={"Authorization": f"Bearer {token}"},
            timeout=2,
        )
    except Exception as e:
        # Any failure to validate is treated as a rejection below.
        print(f"[Gateway] Validate Key Error: {e}")
    else:
        if res.status_code == 200:
            return token

    raise HTTPException(status_code=401, detail="Invalid API Key or Center Backend unavailable")
|
|
@app.get("/")
def read_root():
    """Return a static status document describing this gateway."""
    status_document = {
        "status": "online",
        "service": "EvoNet B2B API Gateway",
        "message": "Gateway is running. Send POST requests to /v1/chat/completions",
        "lora_supported": True,
    }
    return status_document
|
|
# Request body accepted by POST /v1/admin/load-lora.
class LoadLoraRequest(BaseModel):
    # Hugging Face repository holding the adapter; the handler falls back to
    # the HF_LORA_REPO environment variable when this is omitted.
    repo_id: Optional[str] = None
    # Adapter file name inside the repository.
    filename: str
|
|
@app.post("/v1/admin/load-lora")
async def api_load_lora(req: LoadLoraRequest, token: str = Depends(verify_b2b_token)):
    """Hot-load a LoRA adapter from Hugging Face (for the Universal Agent).

    The adapter is downloaded from ``req.repo_id`` (or the ``HF_LORA_REPO``
    environment variable) and the model is reloaded with it.

    Raises:
        HTTPException: 400 when no repository is configured; 500 when the
            download or the model reload fails.
    """
    hf_token = os.environ.get("HF_ACCESS_TOKEN", None)
    target_repo = req.repo_id if req.repo_id else os.environ.get("HF_LORA_REPO")

    # Validated outside the try block below, so this client error is reported
    # as 400 instead of being caught and re-raised as a 500.
    if not target_repo:
        raise HTTPException(status_code=400, detail="Không có repo_id. Vui lòng truyền repo_id hoặc cấu hình HF_LORA_REPO")

    try:
        print(f"[Gateway Admin] Yêu cầu nạp LoRA từ {target_repo}/{req.filename} (Private Mode: {bool(hf_token)})...")
        # The download is blocking network I/O: keep it off the event loop.
        lora_local_path = await asyncio.to_thread(
            hf_hub_download,
            repo_id=target_repo,
            filename=req.filename,
            local_dir=".",
            token=hf_token,
        )

        loaded = await asyncio.to_thread(load_llm, lora_local_path)
        # ``load_llm`` handles its own errors, so check the outcome explicitly:
        # an explicit False result or a missing model both mean the adapter
        # was not applied.
        if loaded is False or llm is None:
            raise RuntimeError(f"Nạp LoRA thất bại: {req.filename}")
    except Exception as e:
        print(f"[Gateway Admin] Lỗi tải LoRA: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"success": True, "message": f"Đã nạp thành công LoRA: {req.filename}"}
|
|
@app.post("/v1/chat/completions")
async def chat_completions(req: ChatCompletionRequest, request: Request, token: str = Depends(verify_b2b_token)):
    """OpenAI-style chat completion, routed to the swarm or answered locally.

    The Data Center is asked how many workers are available.  With at least
    one worker, the last message is sent to the swarm inference endpoint (two
    attempts); if that fails, or when no worker is available, the local model
    answers via ``fallback_infer``.

    Returns:
        A ``chat.completion`` dict with one assistant choice and a usage
        block whose token counts are whitespace-separated word counts.

    Raises:
        HTTPException: 400 when ``messages`` is empty.
    """
    if not req.messages:
        raise HTTPException(status_code=400, detail="messages array cannot be empty")

    messages_dicts = [{"role": m.role, "content": m.content} for m in req.messages]
    last_message = req.messages[-1].content
    print(f"[Gateway] B2B Request for model {req.model}")

    active_nodes = 0
    try:
        # ``requests`` blocks: run it in a worker thread so a slow Data Center
        # cannot stall every other request on the event loop.
        res = await asyncio.to_thread(requests.get, f"{CENTER_URL}/api/v1/depin-status", timeout=2)
        if res.status_code == 200:
            workers = res.json().get('workers', [])
            active_nodes = len(workers)
    except Exception as e:
        print(f"[Gateway] Center connection failed: {e}")

    result_text = ""

    if active_nodes > 0:
        print(f"[Gateway Routing] ➡️ Chuyển hướng tới DePIN Swarm ({active_nodes} nodes đang rảnh)")
        try:
            def call_swarm():
                return requests.post(
                    f"{CENTER_URL}/api/v1/swarm-inference",
                    json={"prompt": last_message, "max_tokens": req.max_tokens},
                    headers={"Authorization": f"Bearer {token}"},
                    timeout=15
                )

            max_attempts = 2
            res = None
            for attempt in range(max_attempts):
                try:
                    res = await asyncio.to_thread(call_swarm)
                    if res.status_code == 200:
                        break
                except Exception as ex:
                    print(f"[Gateway Routing] Lỗi kết nối Swarm (Thử lại {attempt+1}/2): {ex}")
                # Pause only when another attempt will follow.
                if attempt < max_attempts - 1:
                    await asyncio.sleep(1)

            if res is not None and res.status_code == 200:
                swarm_result = res.json().get('result', '')
                # A null or non-string result must not break the word count
                # further down.
                result_text = "" if swarm_result is None else str(swarm_result)
            else:
                raise Exception("Swarm returned non-200 after retries")
        except Exception as e:
            print(f"[Gateway Routing] Lỗi từ DePIN Swarm ({e}), chuyển sang Fallback Local.")
            result_text = await asyncio.to_thread(fallback_infer, messages_dicts)
    else:
        print(f"[Gateway Routing] ➡️ DePIN bận/thiếu Node. Chuyển hướng xử lý tại Cụm Server Nội bộ.")
        result_text = await asyncio.to_thread(fallback_infer, messages_dicts)

    # Usage figures are word counts of the last message and of the reply.
    prompt_tokens = len(last_message.split())
    completion_tokens = len(result_text.split())

    return {
        "id": f"chatcmpl-{int(time.time()*1000)}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": req.model,
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": result_text
            },
            "finish_reason": "stop"
        }],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens
        }
    }
|
|
if __name__ == "__main__":
    # Script entry point: print a start-up banner, then serve the app with a
    # single uvicorn worker on PORT (default 7860).
    port = int(os.environ.get("PORT", 7860))
    rule = "================================================="
    banner = (
        rule,
        "🏢 EvoNet Data Center - B2B API Gateway (FastAPI)",
        "⚡ V-Neural Extreme Continuous Batching ACTIVE",
        f"✅ Running on port {port}",
        rule,
    )
    for banner_line in banner:
        print(banner_line)
    uvicorn.run("main:app", host="0.0.0.0", port=port, log_level="info", workers=1)
|
|