# FServe / multi_gpu_system_http.py
# Factor Studios
# Upload 37 files
# e9bc512 verified
from http_storage import HTTPGPUStorage
from gpu_chip import GPUChip
from typing import Dict, Any, List, Optional
import time
import numpy as np
class MultiGPUSystem:
    """Simulated multi-GPU system whose state is persisted through a storage backend.

    All ``GPUChip`` instances share one storage object. System-wide bookkeeping
    (NVLink topology, transfer records, memory and power counters) is kept in
    ``self.system_state`` and written back with ``store_system_state``.
    """

    def __init__(self, num_gpus: int = 8, storage=None, vram_per_gpu_gb: float = 24):
        """Create ``num_gpus`` simulated GPUs that share ``storage``.

        Args:
            num_gpus: Number of GPUs to create.
            storage: Storage backend shared by every GPU. When ``None``, a new
                ``HTTPGPUStorage`` is created and a connection is awaited.
            vram_per_gpu_gb: VRAM advertised per GPU. It is used only to compute
                ``total_vram_gb``. The default of 24 matches the previously
                hard-coded value.

        Raises:
            RuntimeError: If a newly created default storage cannot connect.
        """
        self.storage = storage
        if self.storage is None:
            # HTTPGPUStorage is imported at module level.
            self.storage = HTTPGPUStorage()
            if not self.storage.wait_for_connection():
                raise RuntimeError("Could not connect to GPU storage server")

        # Every GPU uses the same storage so their state is visible system-wide.
        self.gpus = [GPUChip(i, storage=self.storage) for i in range(num_gpus)]

        self.system_state = {
            "num_gpus": num_gpus,
            "nvlink_state": {
                "connections": self._init_nvlink_topology(num_gpus),
                # Keyed by transfer id. Records gain "end_time" once finished.
                "active_transfers": {},
            },
            "global_memory_state": {
                "total_vram_gb": num_gpus * vram_per_gpu_gb,
                "allocated_vram_gb": 0,
            },
            "power_state": {
                "total_watts": 0,
                "gpu_watts": [0] * num_gpus,
            },
        }
        self.store_system_state()

    @staticmethod
    def _link_id(gpu_a: int, gpu_b: int) -> str:
        """Return the canonical NVLink id ``nvlink_{lo}_{hi}`` for an unordered GPU pair."""
        lo, hi = sorted((gpu_a, gpu_b))
        return f"nvlink_{lo}_{hi}"

    def _init_nvlink_topology(self, num_gpus: int) -> Dict[str, Any]:
        """Build a fully connected NVLink topology: one active link per unordered GPU pair."""
        return {
            self._link_id(a, b): {
                "gpu_a": a,
                "gpu_b": b,
                "bandwidth_gbps": 300,  # simulated NVLink 4.0 link speed
                "active": True,
            }
            for a in range(num_gpus)
            for b in range(a + 1, num_gpus)
        }

    def store_system_state(self):
        """Persist ``self.system_state`` under ``multi_gpu_system/state`` in storage."""
        self.storage.store_state("multi_gpu_system", "state", self.system_state)

    @staticmethod
    def _split_evenly(size: int, parts: int) -> List[int]:
        """Split ``size`` into ``parts`` integer chunks that sum exactly to ``size``.

        The first ``size % parts`` chunks each receive one extra unit.
        """
        base, remainder = divmod(size, parts)
        return [base + (1 if i < remainder else 0) for i in range(parts)]

    def allocate_distributed(self, size: int) -> List[str]:
        """Allocate ``size`` bytes spread across all GPUs.

        The division remainder goes to the first GPUs, so the per-GPU
        allocations add up to exactly ``size``. That is also the amount credited
        to ``allocated_vram_gb``, in binary gigabytes (1024**3 bytes).

        Returns:
            One block id per GPU, in GPU order.

        Raises:
            ValueError: If the system has no GPUs.
        """
        if not self.gpus:
            raise ValueError("Cannot allocate memory: system has no GPUs")
        chunk_sizes = self._split_evenly(size, len(self.gpus))
        block_ids = [
            gpu.allocate_memory(chunk) for gpu, chunk in zip(self.gpus, chunk_sizes)
        ]
        self.system_state["global_memory_state"]["allocated_vram_gb"] += size / (1024 ** 3)
        self.store_system_state()
        return block_ids

    def transfer_between_gpus(self, src_gpu: int, dst_gpu: int, data_id: str) -> Optional[str]:
        """Transfer ``data_id`` from ``src_gpu`` to ``dst_gpu`` over a simulated NVLink.

        A transfer record is stored before the storage call and closed
        afterwards. It gets ``end_time`` and ``completed`` whether the transfer
        succeeds, returns a falsy id, or raises. Only closed records are
        treated as finished.

        Returns:
            The new block id on the destination GPU, or ``None`` if the storage
            transfer returned a falsy value.

        Raises:
            ValueError: If an index is out of range or no link connects the
                GPUs (including ``src_gpu == dst_gpu``).
        """
        num_gpus = len(self.gpus)
        if not (0 <= src_gpu < num_gpus and 0 <= dst_gpu < num_gpus):
            raise ValueError("Invalid GPU indices")
        link_id = self._link_id(src_gpu, dst_gpu)
        if link_id not in self.system_state["nvlink_state"]["connections"]:
            raise ValueError("No NVLink connection between specified GPUs")

        start_ns = time.time_ns()
        transfer_id = f"transfer_{start_ns}"
        record = {
            "source_gpu": src_gpu,
            "dest_gpu": dst_gpu,
            "data_id": data_id,
            "start_time": start_ns,
        }
        self.system_state["nvlink_state"]["active_transfers"][transfer_id] = record
        self.store_system_state()

        try:
            new_block_id = self.storage.transfer_between_chips(src_gpu, dst_gpu, data_id)
        except Exception:
            # Close the record so a crashed transfer is not reported as in flight forever.
            record["completed"] = False
            record["end_time"] = time.time_ns()
            self.store_system_state()
            raise

        record["completed"] = bool(new_block_id)
        record["end_time"] = time.time_ns()
        if new_block_id:
            record["new_data_id"] = new_block_id
        self.store_system_state()
        return new_block_id if new_block_id else None

    def schedule_distributed_compute(self, compute_graph: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Round-robin ``compute_graph["operations"]`` across GPUs and their SMs.

        Operation ``i`` goes to GPU ``i % num_gpus``. On that GPU it goes to SM
        ``i % chip_state["num_sms"]``. The full schedule is stored once under
        ``compute_schedule/schedule_<ns>`` and returned.
        """
        num_gpus = len(self.gpus)
        scheduled_ops = []
        for i, op in enumerate(compute_graph["operations"]):
            gpu_index = i % num_gpus
            gpu = self.gpus[gpu_index]
            warp_id = gpu.schedule_compute(
                sm_index=i % gpu.chip_state["num_sms"],
                warp_state=op,
            )
            scheduled_ops.append({"op": op, "gpu": gpu_index, "warp_id": warp_id})

        self.storage.store_state(
            "compute_schedule",
            f"schedule_{time.time_ns()}",
            {"operations": scheduled_ops},
        )
        return scheduled_ops

    def synchronize(self, timeout: Optional[float] = None, poll_interval: float = 0.01):
        """Synchronize all GPUs on a storage-backed barrier.

        Args:
            timeout: Maximum total seconds to wait at the barrier. ``None``
                (the default) waits indefinitely, as before.
            poll_interval: Seconds to sleep between barrier polls.

        Raises:
            RuntimeError: If the barrier cannot be created.
            TimeoutError: If ``timeout`` elapses before the barrier is released.
        """
        sync_point = f"sync_{time.time_ns()}"
        if not self.storage.create_sync_barrier(sync_point, len(self.gpus)):
            raise RuntimeError("Failed to create synchronization barrier")

        deadline = None if timeout is None else time.monotonic() + timeout
        for gpu in self.gpus:
            gpu.chip_state["sync_point"] = sync_point
            gpu.store_chip_state()
            # Waits are sequential here; a real system would wait in parallel.
            while not self.storage.wait_sync_barrier(sync_point):
                if deadline is not None and time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"Timed out waiting for synchronization barrier {sync_point}"
                    )
                time.sleep(poll_interval)

        self.system_state["last_sync"] = sync_point
        self.store_system_state()

    @staticmethod
    def _count_active_transfers(transfers: Dict[str, Dict[str, Any]]) -> int:
        """Count transfer records that have not finished yet (no ``end_time``)."""
        return sum(1 for record in transfers.values() if "end_time" not in record)

    def get_system_stats(self) -> Dict[str, Any]:
        """Return memory, per-GPU, NVLink, power and connection statistics.

        ``nvlink.active_transfers`` counts only transfers still in flight.
        Completed and failed records remain stored but are not counted.
        """
        nvlink_state = self.system_state["nvlink_state"]
        memory_state = self.system_state["global_memory_state"]
        # Collect per-GPU stats before reading power, matching the original evaluation order.
        gpu_stats = [gpu.get_stats() for gpu in self.gpus]
        active_connections = sum(
            1 for link in nvlink_state["connections"].values() if link["active"]
        )
        per_gpu_watts = [gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus]
        return {
            "num_gpus": len(self.gpus),
            "total_vram_gb": memory_state["total_vram_gb"],
            "allocated_vram_gb": memory_state["allocated_vram_gb"],
            "gpus": gpu_stats,
            "nvlink": {
                "active_connections": active_connections,
                "active_transfers": self._count_active_transfers(
                    nvlink_state["active_transfers"]
                ),
            },
            "power": {
                "total_watts": sum(per_gpu_watts),
                "per_gpu_watts": per_gpu_watts,
            },
            "connection_status": self.storage.get_connection_status(),
        }