from http_storage import HTTPGPUStorage
from gpu_chip import GPUChip
from typing import Dict, Any, List
import time


class MultiGPUSystem:
    def __init__(self, num_gpus: int = 8, storage=None):
        # Fall back to a fresh HTTP-backed store when none is supplied
        self.storage = storage if storage is not None else HTTPGPUStorage()
        if not self.storage.wait_for_connection():
            raise RuntimeError("Could not connect to GPU storage server")
        # Initialize GPUs with shared storage
        self.gpus = [GPUChip(i, storage=self.storage) for i in range(num_gpus)]
        # Initialize system state
        self.system_state = {
            "num_gpus": num_gpus,
            "nvlink_state": {
                "connections": self._init_nvlink_topology(num_gpus),
                "active_transfers": {}
            },
            "global_memory_state": {
                "total_vram_gb": num_gpus * 24,  # assuming 24 GB of VRAM per GPU
                "allocated_vram_gb": 0
            },
            "power_state": {
                "total_watts": 0,
                "gpu_watts": [0] * num_gpus
            }
        }
        self.store_system_state()

    def _init_nvlink_topology(self, num_gpus: int) -> Dict[str, Any]:
        """Initialize the NVLink connection topology as a fully connected mesh."""
        topology = {}
        for i in range(num_gpus):
            for j in range(i + 1, num_gpus):
                link_id = f"nvlink_{i}_{j}"
                topology[link_id] = {
                    "gpu_a": i,
                    "gpu_b": j,
                    "bandwidth_gbps": 300,  # modeled NVLink 4.0 link speed
                    "active": True
                }
        return topology
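
    # With a full mesh, n GPUs yield n*(n-1)/2 links; e.g. num_gpus=3 produces
    # "nvlink_0_1", "nvlink_0_2" and "nvlink_1_2".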

    def store_system_state(self):
        """Store system state in HTTP storage"""
        self.storage.store_state("multi_gpu_system", "state", self.system_state)

    def allocate_distributed(self, size: int) -> List[str]:
        """Allocate `size` bytes split evenly across all GPUs."""
        # Integer division: any remainder smaller than num_gpus bytes is dropped
        size_per_gpu = size // len(self.gpus)
        block_ids = []
        for gpu in self.gpus:
            block_ids.append(gpu.allocate_memory(size_per_gpu))
        self.system_state["global_memory_state"]["allocated_vram_gb"] += size / (1024 ** 3)
        self.store_system_state()
        return block_ids
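
    # Example: on an 8-GPU system, allocate_distributed(8 * 1024 ** 3) hands
    # each GPU a 1 GiB block and raises allocated_vram_gb by 8.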

    def transfer_between_gpus(self, src_gpu: int, dst_gpu: int, data_id: str):
        """Transfer data between GPUs using NVLink simulation via HTTP"""
        if not (0 <= src_gpu < len(self.gpus) and 0 <= dst_gpu < len(self.gpus)):
            raise ValueError("Invalid GPU indices")
        link_id = f"nvlink_{min(src_gpu, dst_gpu)}_{max(src_gpu, dst_gpu)}"
        if link_id not in self.system_state["nvlink_state"]["connections"]:
            raise ValueError("No NVLink connection between specified GPUs")
        # Record the transfer up front so it is visible in stored state while in flight
        transfer_id = f"transfer_{time.time_ns()}"
        transfer = {
            "source_gpu": src_gpu,
            "dest_gpu": dst_gpu,
            "data_id": data_id,
            "start_time": time.time_ns()
        }
        self.system_state["nvlink_state"]["active_transfers"][transfer_id] = transfer
        self.store_system_state()
        # Use the HTTP storage transfer method to move the block
        new_block_id = self.storage.transfer_between_chips(src_gpu, dst_gpu, data_id)
        if new_block_id:
            # Mark the transfer as completed and record the destination block
            transfer["completed"] = True
            transfer["end_time"] = time.time_ns()
            transfer["new_data_id"] = new_block_id
            self.store_system_state()
            return new_block_id
        return None

    def schedule_distributed_compute(self, compute_graph: Dict[str, Any]):
        """Schedule computation across multiple GPUs"""
        # Simple round-robin scheduling for now
        scheduled_ops = []
        for i, op in enumerate(compute_graph["operations"]):
            gpu_index = i % len(self.gpus)
            warp_id = self.gpus[gpu_index].schedule_compute(
                sm_index=i % self.gpus[gpu_index].chip_state["num_sms"],
                warp_state=op
            )
            scheduled_ops.append({
                "op": op,
                "gpu": gpu_index,
                "warp_id": warp_id
            })
        # Store the scheduling decision for later inspection
        self.storage.store_state(
            "compute_schedule",
            f"schedule_{time.time_ns()}",
            {"operations": scheduled_ops}
        )
        return scheduled_ops
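
    # Round-robin example: with 2 GPUs and 5 operations, ops 0, 2 and 4 land on
    # GPU 0 and ops 1 and 3 on GPU 1; SM indices cycle the same way per chip.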

    def synchronize(self):
        """Synchronize all GPUs using an HTTP barrier"""
        sync_point = f"sync_{time.time_ns()}"
        # Create the synchronization barrier
        if not self.storage.create_sync_barrier(sync_point, len(self.gpus)):
            raise RuntimeError("Failed to create synchronization barrier")
        # Each GPU reaches the barrier (sequentially here; a real implementation
        # would do this in parallel)
        for gpu in self.gpus:
            gpu.chip_state["sync_point"] = sync_point
            gpu.store_chip_state()
        # Wait at the barrier
        while not self.storage.wait_sync_barrier(sync_point):
            time.sleep(0.01)  # brief delay between polls
        self.system_state["last_sync"] = sync_point
        self.store_system_state()
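
    # Barrier semantics as used above: create_sync_barrier(name, n) arms a
    # barrier for n participants, and wait_sync_barrier(name) is polled until
    # it reports that all n have checked in.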

    def get_system_stats(self) -> Dict[str, Any]:
        """Get comprehensive system statistics"""
        stats = {
            "num_gpus": len(self.gpus),
            "total_vram_gb": self.system_state["global_memory_state"]["total_vram_gb"],
            "allocated_vram_gb": self.system_state["global_memory_state"]["allocated_vram_gb"],
            "gpus": [gpu.get_stats() for gpu in self.gpus],
            "nvlink": {
                "active_connections": sum(
                    1 for conn in self.system_state["nvlink_state"]["connections"].values()
                    if conn["active"]
                ),
                "active_transfers": len(self.system_state["nvlink_state"]["active_transfers"])
            },
            "power": {
                "total_watts": sum(gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus),
                "per_gpu_watts": [gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus]
            },
            "connection_status": self.storage.get_connection_status()
        }
        return stats
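

# Minimal usage sketch. Assumes the http_storage server is running and that
# GPUChip exposes allocate_memory/schedule_compute/get_stats as used above;
# the compute-graph shape below is a hypothetical example.
if __name__ == "__main__":
    system = MultiGPUSystem(num_gpus=4)

    # Spread a 4 GiB allocation evenly across the four GPUs (1 GiB each)
    blocks = system.allocate_distributed(4 * 1024 ** 3)

    # Move the first block from GPU 0 to GPU 1 over the simulated NVLink
    moved = system.transfer_between_gpus(0, 1, blocks[0])

    # Round-robin a toy two-op graph across the GPUs
    system.schedule_distributed_compute(
        {"operations": [{"op": "matmul"}, {"op": "relu"}]}
    )

    # Barrier across all GPUs, then dump stats
    system.synchronize()
    print(system.get_system_stats())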