from websocket_storage import WebSocketGPUStorage
from gpu_chip import GPUChip
from typing import Dict, Any, List, Optional
import time


class MultiGPUSystem:
    def __init__(self, num_gpus: int = 8, storage=None):
        self.storage = storage
        if self.storage is None:
            self.storage = WebSocketGPUStorage()
            if not self.storage.wait_for_connection():
                raise RuntimeError("Could not connect to GPU storage server")

        # Initialize GPUs with shared storage
        self.gpus = [GPUChip(i, storage=self.storage) for i in range(num_gpus)]

        # Initialize system state
        self.system_state = {
            "num_gpus": num_gpus,
            "nvlink_state": {
                "connections": self._init_nvlink_topology(num_gpus),
                "active_transfers": {},
            },
            "global_memory_state": {
                "total_vram_gb": num_gpus * 24,  # assuming 24 GB of VRAM per GPU
                "allocated_vram_gb": 0,
            },
            "power_state": {
                "total_watts": 0,
                "gpu_watts": [0] * num_gpus,
            },
        }
        self.store_system_state()

    def _init_nvlink_topology(self, num_gpus: int) -> Dict[str, Any]:
        """Initialize an all-to-all NVLink connection topology."""
        topology = {}
        for i in range(num_gpus):
            for j in range(i + 1, num_gpus):
                link_id = f"nvlink_{i}_{j}"
                topology[link_id] = {
                    "gpu_a": i,
                    "gpu_b": j,
                    # 300 GB/s matches NVLink 2.0 (V100); NVLink 4.0 is ~900 GB/s
                    "bandwidth_gbps": 300,
                    "active": True,
                }
        return topology

    def store_system_state(self):
        """Store system state in WebSocket storage."""
        self.storage.store_state("multi_gpu_system", "state", self.system_state)

    def allocate_distributed(self, size: int) -> List[str]:
        """Allocate memory evenly across all GPUs."""
        size_per_gpu = size // len(self.gpus)
        remainder = size % len(self.gpus)
        block_ids = []
        for i, gpu in enumerate(self.gpus):
            # Fold any remainder into the last GPU so no bytes are dropped
            chunk = size_per_gpu + (remainder if i == len(self.gpus) - 1 else 0)
            block_id = gpu.allocate_memory(chunk)
            block_ids.append(block_id)
        # size is in bytes; convert to GiB for the state record
        self.system_state["global_memory_state"]["allocated_vram_gb"] += size / (1024 ** 3)
        self.store_system_state()
        return block_ids

    def transfer_between_gpus(self, src_gpu: int, dst_gpu: int, data_id: str) -> Optional[str]:
        """Transfer data between GPUs using NVLink."""
        if not (0 <= src_gpu < len(self.gpus) and 0 <= dst_gpu < len(self.gpus)):
            raise ValueError("Invalid GPU indices")

        link_id = f"nvlink_{min(src_gpu, dst_gpu)}_{max(src_gpu, dst_gpu)}"
        if link_id not in self.system_state["nvlink_state"]["connections"]:
            raise ValueError("No NVLink connection between specified GPUs")

        # Start transfer
        transfer_id = f"transfer_{time.time_ns()}"
        self.system_state["nvlink_state"]["active_transfers"][transfer_id] = {
            "source_gpu": src_gpu,
            "dest_gpu": dst_gpu,
            "data_id": data_id,
            "start_time": time.time_ns(),
        }
        self.store_system_state()

        # Get data from source GPU
        data = self.storage.load_tensor(data_id)
        if data is not None:
            # Store in destination GPU
            new_block_id = self.gpus[dst_gpu].allocate_memory(len(data))
            self.storage.store_tensor(new_block_id, data)

            # Update transfer state
            transfer = self.system_state["nvlink_state"]["active_transfers"][transfer_id]
            transfer["completed"] = True
            transfer["end_time"] = time.time_ns()
            self.store_system_state()
            return new_block_id
        return None

    def schedule_distributed_compute(self, compute_graph: Dict[str, Any]):
        """Schedule computation across multiple GPUs."""
        # Simple round-robin scheduling for now
        scheduled_ops = []
        for i, op in enumerate(compute_graph["operations"]):
            gpu_index = i % len(self.gpus)
            warp_id = self.gpus[gpu_index].schedule_compute(
                sm_index=i % self.gpus[gpu_index].chip_state["num_sms"],
                warp_state=op,
            )
            scheduled_ops.append({
                "op": op,
                "gpu": gpu_index,
                "warp_id": warp_id,
            })

        # Store scheduling decision
        self.storage.store_state(
            "compute_schedule",
            f"schedule_{time.time_ns()}",
{"operations": scheduled_ops} ) return scheduled_ops def synchronize(self): """Synchronize all GPUs""" sync_point = f"sync_{time.time_ns()}" for i, gpu in enumerate(self.gpus): gpu.chip_state["sync_point"] = sync_point gpu.store_chip_state() self.system_state["last_sync"] = sync_point self.store_system_state() def get_system_stats(self) -> Dict[str, Any]: """Get comprehensive system statistics""" stats = { "num_gpus": len(self.gpus), "total_vram_gb": self.system_state["global_memory_state"]["total_vram_gb"], "allocated_vram_gb": self.system_state["global_memory_state"]["allocated_vram_gb"], "gpus": [gpu.get_stats() for gpu in self.gpus], "nvlink": { "active_connections": sum(1 for conn in self.system_state["nvlink_state"]["connections"].values() if conn["active"]), "active_transfers": len(self.system_state["nvlink_state"]["active_transfers"]) }, "power": { "total_watts": sum(gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus), "per_gpu_watts": [gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus] } } return stats