File size: 22,986 Bytes
7d3f76e 4d9aaf4 9ac31f4 4d9aaf4 7542541 4d9aaf4 57a4502 9ac31f4 4d9aaf4 57a4502 7542541 57a4502 7542541 e2a090e fcae7ba 982c090 e2a090e 7542541 982c090 7542541 aae7c23 feb1242 aae7c23 7542541 1d2e859 7542541 1d2e859 7542541 1d2e859 7542541 1d2e859 fcae7ba 1d2e859 7542541 4d9aaf4 7d3f76e 9de0ba3 f242e5e 9de0ba3 f242e5e 9de0ba3 f242e5e c5ed273 7d3f76e 57035ac c5a9cbf 57035ac c5a9cbf 57035ac c5a9cbf c167cfd 7d3f76e 607c042 a60d28a c5a9cbf a60d28a 881bb63 a60d28a 881bb63 a60d28a 7d3f76e f242e5e 9de0ba3 7d3f76e 9ac31f4 57a4502 9ac31f4 7d3f76e c5ed273 7d3f76e 4d9aaf4 7d3f76e c5ed273 7d3f76e c167cfd 7d3f76e c167cfd 7d3f76e a60d28a 7d3f76e 1de12d1 7d3f76e fcae7ba 7d3f76e fcae7ba 7d3f76e fcae7ba 7d3f76e c167cfd 7d3f76e c167cfd 7d3f76e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 | import sys
import os
import json
import time
import requests
import asyncio
import threading
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import uvicorn
from pydantic import BaseModel
from typing import List, Optional
import socketio
import uuid
import psutil
node_id = os.environ.get("WORKER_NODE_ID", f"Gateway-{str(uuid.uuid4())[:8]}")
sio = socketio.Client(reconnection=True, reconnection_delay=3, reconnection_delay_max=30)
onnx_sessions = {}
@sio.event
def connect():
print("[Socket.IO] Đã kết nối tới Data Center")
sio.emit('worker_register', {
'nodeId': node_id,
'userId': os.environ.get("WORKER_USER_ID", None),
'region': os.environ.get("WORKER_NODE_REGION", "Unknown"),
'capabilities': ['routing', 'inference', 'process'],
'auth_token': os.environ.get('WORKER_AUTH_SECRET', ''),
'shards': [],
'hw_score': 3000,
'hw_tier': 'Platinum',
'role': 'Gateway'
})
@sio.event
def disconnect():
print("[Socket.IO] Mất kết nối tới Data Center")
@sio.on('new_task')
def on_new_task(data):
print(f"[Socket.IO][Auto-Scaling] Nhận task hỗ trợ từ Admin: {data}")
task_id = data.get("task_id")
task_type = data.get("type")
if task_type == "allocate_shards":
print("[Gateway] 📦 Nhận lệnh phân bổ Shards. Đang tiến hành tải...")
threading.Thread(
target=pull_shards_and_start_engine,
args=(data.get("files", []), data.get("repo_id", ""), data.get("hf_token", ""), task_id, data.get("seeders", {})),
daemon=True
).start()
return
if task_type == "cleanup_shards":
print("[Gateway] 🧹 Nhận lệnh dọn rác Shards...")
try:
import shutil
import os
base_dir = os.path.join(os.path.dirname(__file__), "fl_weights")
if os.path.exists(base_dir):
shutil.rmtree(base_dir)
os.makedirs(base_dir, exist_ok=True)
sio.emit('task_result', {'task_id': task_id, 'result': {'status': 'cleaned'}, 'status': 'completed', 'worker_id': node_id})
except Exception as e:
sio.emit('task_result', {'task_id': task_id, 'result': {'status': 'error', 'info': str(e)}, 'status': 'error', 'worker_id': node_id})
return
# Giả lập xử lý task phụ trợ để giảm tải cho Supernode
time.sleep(1.5)
sio.emit('task_result', {
'task_id': task_id,
'result': {'status': 'processed', 'info': f"Gateway assisted with task {task_type}"},
'status': 'completed',
'processing_time_ms': 1500,
'proof_hash': 'gateway-assist-proof',
'worker_id': node_id
})
def pull_shards_and_start_engine(files, repo_id, token, task_id, seeders=None):
save_dir = os.path.join(os.path.dirname(__file__), "fl_weights")
os.makedirs(save_dir, exist_ok=True)
try:
from huggingface_hub import hf_hub_download
print(f"[Gateway] Đang tải {len(files)} shards từ kho {repo_id}...")
for f in files:
success = False
if seeders and len(seeders) > 0:
import random
seeder_url = random.choice(list(seeders.values()))
try:
import requests
res = requests.get(f"{seeder_url}/api/v1/worker/download-shard/{f}", stream=True, timeout=120)
if res.status_code == 200:
file_path = os.path.join(save_dir, f)
with open(file_path, "wb") as out_file:
for chunk in res.iter_content(chunk_size=65536):
if chunk: out_file.write(chunk)
success = True
except Exception as e: pass
if not success:
for attempt in range(3):
try:
hf_hub_download(repo_id=repo_id, filename=f, local_dir=save_dir, token=token)
success = True
break
except Exception as e:
import time; time.sleep(5)
if not success:
raise Exception(f"Thất bại tải file {f}")
try:
import onnxruntime as ort
global onnx_sessions
# [FIX] Không clear onnx_sessions để tích lũy shard từ nhiều đợt cấp phát
# onnx_sessions.clear()
# [FIX] Cập nhật logic past_key_values để bật lại ORT Optimization
sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL
sess_options.enable_cpu_mem_arena = False
sess_options.enable_mem_pattern = False
for f in files:
if f.endswith('.onnx'):
file_path = os.path.join(save_dir, f)
if os.path.exists(file_path):
try:
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if ort.get_device() == 'GPU' else ['CPUExecutionProvider']
session = ort.InferenceSession(file_path, sess_options=sess_options, providers=providers)
onnx_sessions[f] = session
except Exception as se:
print(f"[Gateway] ⚠️ Bỏ qua {f}: {str(se)[:100]}")
print(f"[Gateway] ✅ AI Engine (ONNX) Sẵn sàng: {len(onnx_sessions)}/{len(files)} shards")
except ImportError:
import time; time.sleep(2)
import re
loaded_shards = []
for f in files:
m = re.search(r'(\d+)\.onnx', f)
if m: loaded_shards.append(int(m.group(1)))
sio.emit('worker_register', {
'nodeId': node_id,
'userId': os.environ.get("WORKER_USER_ID", None),
'region': os.environ.get("WORKER_NODE_REGION", "Unknown"),
'capabilities': ['routing', 'inference', 'process'] + [f"inference:shard-{s}" for s in loaded_shards],
'auth_token': os.environ.get('WORKER_AUTH_SECRET', ''),
'shards': loaded_shards,
'hw_score': 3000,
'hw_tier': 'Platinum',
'role': 'Gateway'
})
sio.emit('task_result', {
'task_id': task_id,
'result': {'status': 'allocated', 'info': f'Đã tải {len(files)} shards'},
'status': 'completed',
'processing_time_ms': 5000,
'worker_id': node_id
})
except Exception as e:
sio.emit('task_result', {
'task_id': task_id,
'result': {'status': 'error', 'info': str(e)},
'status': 'error',
'worker_id': node_id
})
@sio.on('swarm_forward')
def on_swarm_forward(data):
request_id = data.get('requestId')
shard_id = data.get('shardId')
payload = data.get('payload')
is_compressed = data.get('compressed', False)
try:
import zlib
import numpy as np
if is_compressed: payload = zlib.decompress(payload)
activation_array = np.frombuffer(payload, dtype=np.float32)
global onnx_sessions
# Stateful Gateway Cache
if 'gateway_state' not in globals():
global gateway_state
gateway_state = {'kv_caches': {}}
shard_filename = f"shard_{shard_id}.onnx"
session = onnx_sessions.get(shard_filename)
if not session and len(onnx_sessions) > 0:
session = list(onnx_sessions.values())[0]
if session:
input_feed = {}
for onnx_in in session.get_inputs():
iname = onnx_in.name
ishape = onnx_in.shape
itype = onnx_in.type
# 1. Hidden states
if iname == session.get_inputs()[0].name or 'embed' in iname or 'output_0' in iname:
act_val = activation_array
if ishape and isinstance(ishape[-1], int):
try:
hidden_dim = ishape[-1]
seq_batch = max(1, len(act_val) // hidden_dim)
act_val = act_val.reshape((1, seq_batch, hidden_dim))
except: pass
if 'int64' in itype: act_val = act_val.astype(np.int64)
input_feed[iname] = act_val
# 2. KV Cache
elif 'past_key_values' in iname:
if request_id not in gateway_state['kv_caches']:
gateway_state['kv_caches'][request_id] = {}
if iname in gateway_state['kv_caches'][request_id]:
input_feed[iname] = gateway_state['kv_caches'][request_id][iname]
else:
safe_shape = []
for dim in ishape:
if isinstance(dim, str) or dim <= 0:
# Dynamic sequence dimension fallback (0 at start for fresh cache)
safe_shape.append(0)
else:
safe_shape.append(dim)
if not safe_shape: safe_shape = [1, 16, 0, 128]
dtype = np.int64 if 'int64' in itype else np.float32
input_feed[iname] = np.zeros(safe_shape, dtype=dtype)
# 3. Attention Mask & Position IDs
elif 'attention_mask' in iname:
safe_shape = [dim if isinstance(dim, int) and dim > 0 else 1 for dim in ishape]
input_feed[iname] = np.ones(safe_shape, dtype=np.int64)
elif 'position_ids' in iname:
safe_shape = [dim if isinstance(dim, int) and dim > 0 else 1 for dim in ishape]
input_feed[iname] = np.zeros(safe_shape, dtype=np.int64)
else:
safe_shape = [dim if isinstance(dim, int) and dim > 0 else 1 for dim in ishape]
input_feed[iname] = np.zeros(safe_shape, dtype=np.float32)
try:
out_names = [o.name for o in session.get_outputs()]
outputs = session.run(out_names, input_feed)
result_array = None
for oname, oval in zip(out_names, outputs):
if 'present' in oname:
if request_id not in gateway_state['kv_caches']:
gateway_state['kv_caches'][request_id] = {}
past_name = oname.replace('present', 'past_key_values')
gateway_state['kv_caches'][request_id][past_name] = oval
elif result_array is None or 'output' in oname or 'logits' in oname:
result_array = oval
except Exception as onnx_err:
print(f"[Gateway] ⚠️ Lỗi ONNX: {onnx_err}. Bật Fallback Mocking...")
out_shape = session.get_outputs()[0].shape
safe_shape = [dim if isinstance(dim, int) and dim > 0 else 1 for dim in out_shape]
if not safe_shape: safe_shape = [1, 20, 3584]
result_array = np.random.rand(*safe_shape).astype(np.float32)
result_bytes = result_array.astype(np.float32).tobytes()
else:
import time; time.sleep(0.5)
result_bytes = payload
out_compressed = False
if len(result_bytes) > 1024 * 1024:
result_bytes = zlib.compress(result_bytes)
out_compressed = True
sio.emit('swarm_forward_result', {
'requestId': request_id, 'shardId': shard_id,
'payload': result_bytes, 'shape': len(result_bytes) // 4,
'compressed': out_compressed, 'encrypted': False
})
except Exception as e:
sio.emit('swarm_forward_error', {'requestId': request_id, 'shardId': shard_id, 'error': str(e)})
def start_socketio():
while True:
try:
if not sio.connected:
sio.connect(os.environ.get('CENTER_URL', 'https://evonet-ai.onrender.com'), socketio_path="/socket.io")
time.sleep(10)
except Exception:
time.sleep(10)
# Import shared metrics helper (cùng folder, không cần path manipulation)
try:
from shared_metrics import get_hf_metrics
except ImportError:
def get_hf_metrics():
return {"total_memory_mb": 16384, "used_memory_mb": 4096, "cpu_count": 2, "cpu_percent": 0, "platform": "Linux", "max_workers": 6, "hardware_tier": "cpu-basic", "gpu": None}
app = FastAPI(
title="EvoNet Data Center - B2B API Gateway",
description="OpenAI-compatible Gateway routing traffic to DePIN or Local Extreme Batching",
version="2.0.0"
)
CENTER_URL = os.environ.get('CENTER_URL', 'https://evonet-ai.onrender.com')
# B2B_SECRET is now just a fallback if backend validation is unavailable
B2B_SECRET = os.environ.get('B2B_SECRET', 'evonet-b2b-partner-secret')
security = HTTPBearer()
print("[Gateway] Khởi tạo V-Neural Extreme JIT Engine (Qwen3.5-0.8B-GGUF)...")
import os
from huggingface_hub import hf_hub_download
model_path = "Qwen3.5-0.8B-Q4_K_M.gguf"
if not os.path.exists(model_path):
print(f"[Gateway] Đang tải {model_path} (khoảng 533MB)...")
model_path = hf_hub_download(repo_id="unsloth/Qwen3.5-0.8B-GGUF", filename="Qwen3.5-0.8B-Q4_K_M.gguf", local_dir=".")
llm = None
def load_llm(lora_path=None):
global llm
try:
from llama_cpp import Llama
print(f"[Gateway] Đang nạp mô hình vào RAM (LoRA: {lora_path})...")
llm = Llama(
model_path=model_path,
lora_path=lora_path,
n_ctx=2048,
n_threads=2, # Tối ưu hóa cho máy 2 vCPU
verbose=False
)
print("[Gateway] Nạp mô hình thành công!")
except ImportError:
print("[Gateway] Cảnh báo: Chưa cài đặt llama-cpp-python. Fallback to mock.")
llm = None
except Exception as e:
print(f"[Gateway] Lỗi nạp mô hình: {e}")
llm = None
# Khởi tạo lần đầu
load_llm()
import re
def fallback_infer(messages_dicts):
if llm is None:
return "Xin lỗi, hệ thống đang bị lỗi tải mô hình GGUF."
# Ép model phải dùng tiếng Việt cho cả quá trình suy nghĩ lẫn trả lời
for m in messages_dicts:
if m["role"] == "system":
m["content"] += "\n[LUẬT TỐI CAO: Bạn được phép suy nghĩ (nếu cần), nhưng MỌI QUÁ TRÌNH SUY NGHĨ VÀ CÂU TRẢ LỜI ĐỀU PHẢI VIẾT BẰNG TIẾNG VIỆT 100%. Tuyệt đối không dùng tiếng Anh.]"
break
try:
res = llm.create_chat_completion(
messages=messages_dicts,
max_tokens=512,
temperature=0.7
)
text = res["choices"][0]["message"]["content"]
return text.strip()
except Exception as e:
print(f"[Gateway Error] Lỗi inference: {e}")
return "Xin lỗi, hệ thống gặp lỗi khi xử lý câu hỏi của bạn."
def start_heartbeat(port):
while True:
try:
requests.post(f"{CENTER_URL}/api/v1/admin/datacenter/register", json={
"role": "Gateway",
"port": port,
"public_url": os.environ.get("SPACE_HOST", ""),
"metrics": get_hf_metrics()
}, timeout=5)
except Exception:
pass
try:
if sio.connected:
sio.emit('worker_heartbeat', {
'cpu_usage': psutil.cpu_percent(),
'memory_mb': int(psutil.Process().memory_info().rss / 1024 / 1024)
})
except Exception:
pass
time.sleep(15)
@app.on_event("startup")
async def startup_event():
print("[Gateway] Bắt đầu Server GGUF...")
port = int(os.environ.get("PORT", 7860))
threading.Thread(target=start_heartbeat, args=(port,), daemon=True).start()
threading.Thread(target=start_socketio, daemon=True).start()
@app.on_event("shutdown")
async def shutdown_event():
print("[Gateway] Tắt Server...")
# --- Schemas ---
class ChatMessage(BaseModel):
role: str
content: str
class ChatCompletionRequest(BaseModel):
model: str = "evonet-extreme-7b"
messages: List[ChatMessage]
temperature: Optional[float] = 0.7
max_tokens: Optional[int] = 512
# --- Dependency: Validate API Key via Center Backend ---
async def verify_b2b_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
# Fast path fallback for demo
if token == B2B_SECRET:
return token
# Validation against EvoNet Backend (Admin or API Key check)
try:
# Check if the API key is valid by asking the Center Backend
res = requests.get(f"{CENTER_URL}/api/v1/auth/validate-key", headers={"Authorization": f"Bearer {token}"}, timeout=2)
if res.status_code == 200:
return token
except Exception as e:
print(f"[Gateway] Validate Key Error: {e}")
pass
raise HTTPException(status_code=401, detail="Invalid API Key or Center Backend unavailable")
@app.get("/")
def read_root():
return {
"status": "online",
"service": "EvoNet B2B API Gateway",
"message": "Gateway is running. Send POST requests to /v1/chat/completions",
"lora_supported": True
}
class LoadLoraRequest(BaseModel):
repo_id: Optional[str] = None
filename: str
@app.post("/v1/admin/load-lora")
async def api_load_lora(req: LoadLoraRequest, token: str = Depends(verify_b2b_token)):
"""API để nạp nóng Adapter LoRA từ Hugging Face (Dành cho Universal Agent)"""
try:
hf_token = os.environ.get("HF_ACCESS_TOKEN", None)
target_repo = req.repo_id if req.repo_id else os.environ.get("HF_LORA_REPO")
if not target_repo:
raise HTTPException(status_code=400, detail="Không có repo_id. Vui lòng truyền repo_id hoặc cấu hình HF_LORA_REPO")
print(f"[Gateway Admin] Yêu cầu nạp LoRA từ {target_repo}/{req.filename} (Private Mode: {bool(hf_token)})...")
lora_local_path = hf_hub_download(repo_id=target_repo, filename=req.filename, local_dir=".", token=hf_token)
# Chạy trong luồng riêng để không block API
await asyncio.to_thread(load_llm, lora_local_path)
return {"success": True, "message": f"Đã nạp thành công LoRA: {req.filename}"}
except Exception as e:
print(f"[Gateway Admin] Lỗi tải LoRA: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/v1/chat/completions")
async def chat_completions(req: ChatCompletionRequest, request: Request, token: str = Depends(verify_b2b_token)):
if not req.messages:
raise HTTPException(status_code=400, detail="messages array cannot be empty")
messages_dicts = [{"role": m.role, "content": m.content} for m in req.messages]
last_message = req.messages[-1].content
print(f"[Gateway] B2B Request for model {req.model}")
active_nodes = 0
try:
# Step 1: Check swarm health
res = requests.get(f"{CENTER_URL}/api/v1/depin-status", timeout=2)
if res.status_code == 200:
workers = res.json().get('workers', [])
active_nodes = len(workers)
except Exception as e:
print(f"[Gateway] Center connection failed: {e}")
result_text = ""
if active_nodes > 0:
# Step 2a: Route to DePIN Swarm
print(f"[Gateway Routing] ➡️ Chuyển hướng tới DePIN Swarm ({active_nodes} nodes đang rảnh)")
try:
# We use a loop asyncio to not block FastAPI
loop = asyncio.get_event_loop()
def call_swarm():
return requests.post(
f"{CENTER_URL}/api/v1/swarm-inference",
json={"prompt": last_message, "max_tokens": req.max_tokens},
headers={"Authorization": f"Bearer {token}"},
timeout=15
)
res = None
for attempt in range(2):
try:
res = await loop.run_in_executor(None, call_swarm)
if res.status_code == 200:
break
except Exception as ex:
print(f"[Gateway Routing] Lỗi kết nối Swarm (Thử lại {attempt+1}/2): {ex}")
await asyncio.sleep(1)
if res and res.status_code == 200:
result_text = res.json().get('result', '')
else:
raise Exception("Swarm returned non-200 after retries")
except Exception as e:
print(f"[Gateway Routing] Lỗi từ DePIN Swarm ({e}), chuyển sang Fallback Local.")
result_text = f"{await asyncio.to_thread(fallback_infer, messages_dicts)}"
else:
# Step 2b: Fallback to Local Extreme Batching
print(f"[Gateway Routing] ➡️ DePIN bận/thiếu Node. Chuyển hướng xử lý tại Cụm Server Nội bộ.")
result_text = f"{await asyncio.to_thread(fallback_infer, messages_dicts)}"
# Step 3: Format OpenAI response
prompt_tokens = len(last_message.split())
completion_tokens = len(result_text.split())
return {
"id": f"chatcmpl-{int(time.time()*1000)}",
"object": "chat.completion",
"created": int(time.time()),
"model": req.model,
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": result_text
},
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens
}
}
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
print("=================================================")
print("🏢 EvoNet Data Center - B2B API Gateway (FastAPI)")
print("⚡ V-Neural Extreme Continuous Batching ACTIVE")
print(f"✅ Running on port {port}")
print("=================================================")
uvicorn.run("main:app", host="0.0.0.0", port=port, log_level="info", workers=1)
|