import time
from typing import Any, Dict, List, Optional

import numpy as np

from http_storage import HTTPGPUStorage
from gpu_chip import GPUChip


class MultiGPUSystem:
    """Simulated multi-GPU system backed by a shared HTTP storage object.

    The system owns one ``GPUChip`` per simulated GPU, all sharing the same
    storage instance, and keeps a ``system_state`` dictionary (NVLink
    topology, transfer records, memory and power bookkeeping) that is
    persisted through ``storage.store_state`` whenever it changes.
    """

    # Assumed VRAM capacity of every simulated GPU, in GB.
    VRAM_PER_GPU_GB = 24
    # Bandwidth recorded for every simulated link (NVLink 4.0 speed).
    NVLINK_BANDWIDTH_GBPS = 300

    def __init__(self, num_gpus: int = 8, storage=None):
        """Create the GPUs, build the initial system state and persist it.

        Args:
            num_gpus: Number of ``GPUChip`` instances to create.
            storage: Storage backend shared by all GPUs. When ``None`` a new
                ``HTTPGPUStorage`` is created and its connection awaited.

        Raises:
            RuntimeError: If a newly created storage backend reports that it
                could not connect.
        """
        self.storage = storage
        if self.storage is None:
            self.storage = HTTPGPUStorage()
            if not self.storage.wait_for_connection():
                raise RuntimeError("Could not connect to GPU storage server")

        # Initialize GPUs with shared storage
        self.gpus = [GPUChip(i, storage=self.storage) for i in range(num_gpus)]

        # Initialize system state
        self.system_state = {
            "num_gpus": num_gpus,
            "nvlink_state": {
                "connections": self._init_nvlink_topology(num_gpus),
                "active_transfers": {},
            },
            "global_memory_state": {
                "total_vram_gb": num_gpus * self.VRAM_PER_GPU_GB,
                "allocated_vram_gb": 0,
            },
            "power_state": {
                "total_watts": 0,
                "gpu_watts": [0] * num_gpus,
            },
        }
        self.store_system_state()

    def _init_nvlink_topology(self, num_gpus: int) -> Dict[str, Any]:
        """Build a fully connected NVLink topology.

        Args:
            num_gpus: Number of GPUs to connect.

        Returns:
            A dict keyed by ``"nvlink_<a>_<b>"`` with ``a < b``, one entry per
            unordered GPU pair, each describing the two endpoints, the link
            bandwidth and an ``"active"`` flag (initially ``True``).
        """
        topology = {}
        for gpu_a in range(num_gpus):
            for gpu_b in range(gpu_a + 1, num_gpus):
                topology[f"nvlink_{gpu_a}_{gpu_b}"] = {
                    "gpu_a": gpu_a,
                    "gpu_b": gpu_b,
                    "bandwidth_gbps": self.NVLINK_BANDWIDTH_GBPS,
                    "active": True,
                }
        return topology

    def store_system_state(self):
        """Persist ``system_state`` in storage under ``multi_gpu_system/state``."""
        self.storage.store_state("multi_gpu_system", "state", self.system_state)

    def allocate_distributed(self, size: int) -> List[str]:
        """Allocate ``size`` bytes spread as evenly as possible over all GPUs.

        When ``size`` is not divisible by the GPU count, the first
        ``size % num_gpus`` GPUs each receive one extra byte so that the
        per-GPU requests add up to exactly ``size`` (which is also the amount
        added to the allocated-VRAM counter).

        Args:
            size: Total number of bytes to allocate; must be non-negative.

        Returns:
            The block ids returned by each GPU's ``allocate_memory``, in GPU
            order.

        Raises:
            ValueError: If ``size`` is negative.
            RuntimeError: If the system has no GPUs.
        """
        if size < 0:
            raise ValueError("Allocation size must be non-negative")
        if not self.gpus:
            raise RuntimeError("No GPUs available for allocation")

        base_share, remainder = divmod(size, len(self.gpus))
        block_ids = [
            gpu.allocate_memory(base_share + (1 if index < remainder else 0))
            for index, gpu in enumerate(self.gpus)
        ]

        memory_state = self.system_state["global_memory_state"]
        memory_state["allocated_vram_gb"] += size / (1024 * 1024 * 1024)
        self.store_system_state()
        return block_ids

    def transfer_between_gpus(
        self, src_gpu: int, dst_gpu: int, data_id: str
    ) -> Optional[str]:
        """Transfer data between two GPUs using the storage transfer API.

        A transfer record is written to
        ``system_state["nvlink_state"]["active_transfers"]`` before the
        transfer starts and is updated (and persisted) once the storage call
        returns: successful transfers get ``"completed"``, ``"end_time"`` and
        ``"new_data_id"``; unsuccessful ones get ``"failed"`` and
        ``"end_time"`` so they are not mistaken for in-flight transfers.

        Args:
            src_gpu: Index of the source GPU.
            dst_gpu: Index of the destination GPU.
            data_id: Identifier of the data to transfer.

        Returns:
            The new block id reported by the storage backend, or ``None`` when
            the backend returned a falsy result.

        Raises:
            ValueError: If either index is out of range, or no link exists
                between the two GPUs (this includes ``src_gpu == dst_gpu``).
        """
        gpu_count = len(self.gpus)
        if not (0 <= src_gpu < gpu_count and 0 <= dst_gpu < gpu_count):
            raise ValueError("Invalid GPU indices")

        nvlink_state = self.system_state["nvlink_state"]
        # Links are keyed with the smaller GPU index first.
        link_id = f"nvlink_{min(src_gpu, dst_gpu)}_{max(src_gpu, dst_gpu)}"
        if link_id not in nvlink_state["connections"]:
            raise ValueError("No NVLink connection between specified GPUs")

        # Start transfer using HTTP API; one timestamp serves as both the id
        # suffix and the recorded start time.
        started_ns = time.time_ns()
        transfer_id = f"transfer_{started_ns}"
        record = {
            "source_gpu": src_gpu,
            "dest_gpu": dst_gpu,
            "data_id": data_id,
            "start_time": started_ns,
        }
        nvlink_state["active_transfers"][transfer_id] = record
        self.store_system_state()

        # Use HTTP storage transfer method
        new_block_id = self.storage.transfer_between_chips(src_gpu, dst_gpu, data_id)

        if new_block_id:
            record["completed"] = True
            record["end_time"] = time.time_ns()
            record["new_data_id"] = new_block_id
            self.store_system_state()
            return new_block_id

        record["failed"] = True
        record["end_time"] = time.time_ns()
        self.store_system_state()
        return None

    def schedule_distributed_compute(self, compute_graph: Dict[str, Any]):
        """Schedule the graph's operations across the GPUs round-robin.

        Operation ``i`` goes to GPU ``i % num_gpus``. Within a GPU, successive
        operations are placed on successive SMs (wrapping at the GPU's
        ``chip_state["num_sms"]``). The SM index is derived from the per-GPU
        operation ordinal rather than from ``i`` directly, because ``i %
        num_sms`` would restrict each GPU to a strided subset of its SMs
        whenever the GPU count and SM count share a common factor.

        Args:
            compute_graph: Dict with an ``"operations"`` sequence; each
                element is passed to ``GPUChip.schedule_compute`` as
                ``warp_state``.

        Returns:
            A list of ``{"op", "gpu", "warp_id"}`` dicts, one per operation.
            The same list is stored under ``compute_schedule`` in storage.
        """
        gpu_count = len(self.gpus)
        scheduled_ops = []
        for i, op in enumerate(compute_graph["operations"]):
            gpu_index = i % gpu_count
            gpu = self.gpus[gpu_index]
            # i // gpu_count is the ordinal of this op on its GPU.
            sm_index = (i // gpu_count) % gpu.chip_state["num_sms"]
            warp_id = gpu.schedule_compute(sm_index=sm_index, warp_state=op)
            scheduled_ops.append({"op": op, "gpu": gpu_index, "warp_id": warp_id})

        # Store scheduling decision
        self.storage.store_state(
            "compute_schedule",
            f"schedule_{time.time_ns()}",
            {"operations": scheduled_ops},
        )
        return scheduled_ops

    def synchronize(self, timeout: Optional[float] = None, poll_interval: float = 0.01):
        """Synchronize all GPUs using a storage-side barrier.

        Args:
            timeout: Maximum total number of seconds to wait for the barrier.
                ``None`` (the default) waits indefinitely.
            poll_interval: Seconds to sleep between barrier polls.

        Raises:
            RuntimeError: If the barrier could not be created.
            TimeoutError: If ``timeout`` elapses before the barrier releases.
        """
        sync_point = f"sync_{time.time_ns()}"

        # Create synchronization barrier
        if not self.storage.create_sync_barrier(sync_point, len(self.gpus)):
            raise RuntimeError("Failed to create synchronization barrier")

        deadline = None if timeout is None else time.monotonic() + timeout

        # Each GPU reaches the barrier
        for gpu in self.gpus:
            gpu.chip_state["sync_point"] = sync_point
            gpu.store_chip_state()

            # Wait at barrier (in real implementation, this would be done in parallel)
            # NOTE(review): this wait is performed per GPU, inside the loop;
            # that presumably relies on each wait_sync_barrier call counting
            # as one arrival - confirm against the storage implementation.
            while not self.storage.wait_sync_barrier(sync_point):
                if deadline is not None and time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"Timed out waiting for synchronization barrier {sync_point}"
                    )
                time.sleep(poll_interval)  # Brief delay

        self.system_state["last_sync"] = sync_point
        self.store_system_state()

    def get_system_stats(self) -> Dict[str, Any]:
        """Get comprehensive system statistics.

        Returns:
            A dict with GPU count, VRAM totals, per-GPU stats, NVLink
            counters, power figures read from each GPU's ``chip_state``, and
            the storage connection status. ``nvlink["active_transfers"]``
            counts only transfer records not yet marked completed or failed;
            ``nvlink["completed_transfers"]`` counts the completed ones.
        """
        memory_state = self.system_state["global_memory_state"]
        nvlink_state = self.system_state["nvlink_state"]
        transfers = list(nvlink_state["active_transfers"].values())
        per_gpu_watts = [
            gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus
        ]

        return {
            "num_gpus": len(self.gpus),
            "total_vram_gb": memory_state["total_vram_gb"],
            "allocated_vram_gb": memory_state["allocated_vram_gb"],
            "gpus": [gpu.get_stats() for gpu in self.gpus],
            "nvlink": {
                "active_connections": sum(
                    1 for conn in nvlink_state["connections"].values() if conn["active"]
                ),
                "active_transfers": sum(
                    1
                    for record in transfers
                    if not (record.get("completed") or record.get("failed"))
                ),
                "completed_transfers": sum(
                    1 for record in transfers if record.get("completed")
                ),
            },
            "power": {
                "total_watts": sum(per_gpu_watts),
                "per_gpu_watts": per_gpu_watts,
            },
            "connection_status": self.storage.get_connection_status(),
        }