import itertools
import os
import time
import zlib
from collections import Counter
from typing import Any, Dict, List, Optional

import numpy as np

from http_storage import LocalStorage
from gpu_chip import GPUChip
from vram.ram_controller import RAMController
from config import get_db_url


class MultiGPUSystem:
    """Simulated multi-GPU system whose bookkeeping is persisted to a shared storage backend.

    Every ``GPUChip`` receives the same ``LocalStorage`` instance. System-wide
    bookkeeping (NVLink topology, memory allocations, power, compute, sync)
    lives in ``self.system_state`` and is snapshotted to storage by
    :meth:`store_system_state`.
    """

    # Bandwidth recorded for every NVLink connection and transfer
    # (the original code labels 300 as "NVLink 4.0 speed").
    NVLINK_BANDWIDTH_GBPS = 300
    # Physical VRAM per GPU; only used for the informational "physical_vram_gb" figure.
    PHYSICAL_VRAM_PER_GPU_GB = 84

    def __init__(self, num_gpus: int = 8, db_url: Optional[str] = None):
        """Connect to storage, create the GPUs and persist the initial system state.

        Args:
            num_gpus: Number of ``GPUChip`` instances to create (ids ``0..num_gpus-1``).
            db_url: Storage connection URL; falls back to ``config.get_db_url()``.

        Raises:
            RuntimeError: If ``wait_for_connection`` does not succeed within 30 seconds.
        """
        self.storage = LocalStorage(db_url=db_url or get_db_url())
        if not self.storage.wait_for_connection(timeout=30):
            raise RuntimeError("Could not initialize remote storage connection")

        # All GPUs share the single storage handle created above.
        self.gpus = [GPUChip(i, storage=self.storage) for i in range(num_gpus)]

        # Total VRAM is tracked as unlimited; physical VRAM is kept for reference only.
        self.system_state = {
            "num_gpus": num_gpus,
            "nvlink_state": {
                "connections": self._init_nvlink_topology(num_gpus),
                "active_transfers": {},
            },
            "global_memory_state": {
                "total_vram_gb": float('inf'),
                "allocated_vram_gb": 0,
                "virtual_vram_gb": 0,
                "allocation_map": {},  # block_id -> allocation record
                "physical_vram_gb": num_gpus * self.PHYSICAL_VRAM_PER_GPU_GB,
            },
            "power_state": {
                "total_watts": 0,
                "gpu_watts": [0] * num_gpus,
                "efficiency_metrics": {},
            },
            "compute_state": {
                "active_operations": {},
                "completed_operations": {},
                "gpu_loads": [0] * num_gpus,
            },
        }
        self.store_system_state()

    def _init_nvlink_topology(self, num_gpus: int) -> Dict[str, Any]:
        """Build a fully connected NVLink topology: one active link per unordered GPU pair.

        Link ids have the form ``nvlink_{a}_{b}`` with ``a < b``; this is the
        same form ``transfer_between_gpus`` uses to name the link it records.
        """
        return {
            f"nvlink_{a}_{b}": {
                "gpu_a": a,
                "gpu_b": b,
                "bandwidth_gbps": self.NVLINK_BANDWIDTH_GBPS,
                "active": True,
            }
            for a, b in itertools.combinations(range(num_gpus), 2)
        }

    def store_system_state(self):
        """Persist a snapshot of ``system_state`` under a fresh timestamped id."""
        # One timestamp for both the id and the metadata so they always agree.
        now = time.time_ns()
        self.storage.store_state("multi_gpu_system", f"system_state_{now}", {
            "state": self.system_state,
            "metadata": {
                "timestamp": now,
                "num_gpus": len(self.gpus),
                "state_type": "system_state",
            },
        })

    def allocate_distributed(self, size: int) -> List[str]:
        """Split ``size`` bytes across all GPUs and record one block per GPU.

        Each GPU receives ``remaining // gpus_left``, so the chunk sizes always
        sum to exactly ``size`` and any remainder lands on the later GPUs.
        Capacity is never checked; total VRAM is tracked as unlimited.

        Args:
            size: Total allocation size; the GB counters treat it as bytes
                (divided by ``1024 ** 3``).

        Returns:
            The created block ids, ordered by GPU index.

        Raises:
            ValueError: If ``size`` is negative.
        """
        if size < 0:
            raise ValueError(f"Allocation size must be non-negative, got {size}")

        memory_state = self.system_state["global_memory_state"]
        allocation_map = memory_state["allocation_map"]
        num_gpus = len(self.gpus)
        remaining = size
        block_ids = []

        for gpu_id in range(num_gpus):
            now = time.time_ns()
            block_id = f"block_{now}_{gpu_id}"
            # Share what is left evenly among the GPUs not yet served.
            chunk = remaining // (num_gpus - gpu_id)
            remaining -= chunk

            allocation_map[block_id] = {
                "size": chunk,
                "gpu_id": gpu_id,
                "timestamp": now,
                "access_count": 0,
                "last_access": now,
                "virtual_addr": f"vaddr_{block_id}",
            }
            self.storage.store_state("gpu_allocation", block_id, {
                "size": chunk,
                "gpu": gpu_id,
                "metadata": {
                    "creation_time": now,
                    "allocation_type": "distributed",
                    "virtual_mapping": True,
                    "device_id": f"gpu_{gpu_id}",
                },
            })
            block_ids.append(block_id)

        total_gb = size / (1024 * 1024 * 1024)
        memory_state["allocated_vram_gb"] += total_gb
        memory_state["virtual_vram_gb"] += total_gb
        self.store_system_state()
        return block_ids

    def transfer_between_gpus(self, src_gpu: int, dst_gpu: int, data_id: str):
        """Simulate an NVLink transfer by copying a stored tensor to a new block id.

        The tensor ``data_id`` is loaded from storage and re-stored under a new
        block id. If ``data_id`` is tracked in the allocation map, the copy is
        registered there as belonging to ``dst_gpu``. The transfer is tracked
        in ``system_state["nvlink_state"]["active_transfers"]`` and, on
        success, also stored as a ``"gpu_transfer"`` state.

        Args:
            src_gpu: Index of the source GPU.
            dst_gpu: Index of the destination GPU.
            data_id: Storage id of the tensor to copy.

        Returns:
            The new block id, or ``None`` if ``load_tensor`` returned ``None``
            (the transfer entry is then marked ``"failed"``).

        Raises:
            ValueError: If either GPU index is out of range.
        """
        num_gpus = len(self.gpus)
        if not (0 <= src_gpu < num_gpus and 0 <= dst_gpu < num_gpus):
            raise ValueError("Invalid GPU indices")

        link_id = f"nvlink_{min(src_gpu, dst_gpu)}_{max(src_gpu, dst_gpu)}"
        start_time = time.time_ns()
        transfer_id = f"transfer_{start_time}"
        transfer = {
            "source_gpu": src_gpu,
            "dest_gpu": dst_gpu,
            "data_id": data_id,
            "start_time": start_time,
            "transfer_type": "nvlink",
            "bandwidth_gbps": self.NVLINK_BANDWIDTH_GBPS,
            "status": "in_progress",
        }
        self.system_state["nvlink_state"]["active_transfers"][transfer_id] = transfer

        data = self.storage.load_tensor(data_id)
        if data is None:
            # Close out the entry so it is not reported as in progress forever.
            transfer.update({
                "completed": False,
                "end_time": time.time_ns(),
                "status": "failed",
            })
            self.store_system_state()
            return None

        size_bytes = data.nbytes if hasattr(data, 'nbytes') else len(data)
        new_block_id = f"block_{time.time_ns()}"
        self.storage.store_tensor(new_block_id, data, model_size=size_bytes)

        allocation_map = self.system_state["global_memory_state"]["allocation_map"]
        if data_id in allocation_map:
            now = time.time_ns()
            allocation_map[new_block_id] = {
                "size": allocation_map[data_id]["size"],
                "gpu_id": dst_gpu,
                "timestamp": now,
                "transfer_history": [{
                    "from_gpu": src_gpu,
                    "to_gpu": dst_gpu,
                    "time": now,
                    "nvlink_id": link_id,
                    "transfer_type": "nvlink_direct",
                }],
            }

        end_time = time.time_ns()
        self.storage.store_state("gpu_transfer", transfer_id, {
            "status": "completed",
            "source_gpu": src_gpu,
            "dest_gpu": dst_gpu,
            "data_id": data_id,
            "new_block_id": new_block_id,
            "size_bytes": size_bytes,
            # The moment the transfer was registered, not when it finished.
            "start_time": start_time,
            "end_time": end_time,
            "nvlink_id": link_id,
        })
        transfer.update({
            "completed": True,
            "end_time": end_time,
            "new_block_id": new_block_id,
            "status": "completed",
            "transfer_size_bytes": size_bytes,
        })
        self.store_system_state()
        return new_block_id

    def schedule_distributed_compute(self, compute_graph: Dict[str, Any]):
        """Split each operation of ``compute_graph`` into chunks and schedule them on GPUs.

        Each entry of ``compute_graph["operations"]`` is split by
        ``GPUParallelDistributor.distribute_operation`` into chunks. Each chunk
        carries a ``"gpu_id"`` and is passed to that GPU's ``schedule_compute``.
        The SM index comes from a CRC32 of ``str(chunk)``. Unlike the built-in
        ``hash`` of a ``str``, which is salted per process, this gives the same
        placement on every run for chunks whose ``str()`` is stable.

        Returns:
            One ``{"op", "gpu", "warp_id"}`` record per scheduled chunk.

        Raises:
            KeyError: If ``compute_graph`` has no ``"operations"`` entry.
        """
        from gpu_parallel_distributor import GPUParallelDistributor

        distributor = GPUParallelDistributor(num_gpus=len(self.gpus))
        scheduled_ops = []

        for op in compute_graph["operations"]:
            for chunk in distributor.distribute_operation(op):
                gpu_id = chunk["gpu_id"]
                gpu = self.gpus[gpu_id]
                sm_index = zlib.crc32(str(chunk).encode("utf-8")) % gpu.chip_state["num_sms"]
                warp_id = gpu.schedule_compute(sm_index=sm_index, warp_state=chunk)
                scheduled_ops.append({
                    "op": chunk,
                    "gpu": gpu_id,
                    "warp_id": warp_id,
                })

        # Count chunks per GPU in one pass; Counter yields 0 for idle GPUs.
        chunks_per_gpu = Counter(entry["gpu"] for entry in scheduled_ops)
        schedule_id = f"schedule_{time.time_ns()}"
        self.storage.store_state("compute_schedule", schedule_id, {
            "operations": scheduled_ops,
            "metadata": {
                "timestamp": time.time_ns(),
                "num_gpus": len(self.gpus),
                "schedule_type": "distributed",
                "gpu_utilization": {
                    f"gpu_{i}": chunks_per_gpu[i] for i in range(len(self.gpus))
                },
            },
        })
        return scheduled_ops

    def synchronize(self):
        """Stamp a common sync point on every GPU and record the barrier in system state.

        Each GPU's ``chip_state["sync_point"]`` is set and persisted via
        ``store_chip_state()``. GPUs are processed sequentially in index order.
        """
        sync_point = time.time_ns()
        sync_state = {
            "sync_point": sync_point,
            "start_time": time.time_ns(),
            "status": "in_progress",
            "gpu_status": {},
        }
        self.system_state["sync_state"] = sync_state

        for i, gpu in enumerate(self.gpus):
            gpu.chip_state["sync_point"] = sync_point
            gpu.store_chip_state()
            sync_state["gpu_status"][i] = {
                "reached_barrier": True,
                "timestamp": time.time_ns(),
                "state": "synced",
            }

        sync_state["status"] = "completed"
        sync_state["completion_time"] = time.time_ns()
        self.system_state["last_sync"] = sync_point
        self.store_system_state()

    def get_system_stats(self) -> Dict[str, Any]:
        """Collect memory, NVLink, power, compute and storage statistics.

        Combines the in-memory ``system_state`` with aggregate queries against
        the ``vram_blocks`` and ``states`` tables via ``self.storage.conn``,
        per-GPU ``get_stats()`` results, and a listing of
        ``self.storage.base_path``.
        """
        conn = self.storage.conn

        block_stats = conn.execute("""
            SELECT
                COUNT(*) as total_blocks,
                COALESCE(SUM(size), 0) as total_size,
                COUNT(DISTINCT device_id) as active_devices,
                MIN(created_at) as oldest_block,
                MAX(last_accessed) as latest_access
            FROM vram_blocks
        """).fetchone()

        gpu_memory = conn.execute("""
            SELECT
                device_id,
                COUNT(*) as block_count,
                COALESCE(SUM(size), 0) as used_memory,
                COUNT(CASE WHEN is_pinned THEN 1 END) as pinned_blocks
            FROM vram_blocks
            WHERE device_id IS NOT NULL
            GROUP BY device_id
        """).fetchall()

        # transfer_between_gpus stores the byte count under "size_bytes" (there
        # is no "size" key), and ->> yields text, so cast before summing.
        # NOTE(review): assumes store_state keeps the payload in the `metadata`
        # column, as the existing status filter implies — confirm in LocalStorage.
        transfer_stats = conn.execute("""
            SELECT
                COUNT(*) as transfer_count,
                COALESCE(SUM(CAST(metadata->>'size_bytes' AS BIGINT)), 0) as total_transferred
            FROM states
            WHERE name = 'gpu_transfer'
              AND metadata->>'status' = 'completed'
              AND created_at >= CURRENT_TIMESTAMP - INTERVAL 1 HOUR
        """).fetchone()

        memory_state = self.system_state["global_memory_state"]
        nvlink_state = self.system_state["nvlink_state"]
        compute_state = self.system_state["compute_state"]
        per_gpu_watts = [gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus]

        return {
            "num_gpus": len(self.gpus),
            "memory_state": {
                "physical_vram_gb": memory_state["physical_vram_gb"],
                "allocated_physical_gb": memory_state["allocated_vram_gb"],
                "virtual_vram_gb": memory_state["virtual_vram_gb"],
                "total_available_gb": float('inf'),
                "allocation_count": block_stats[0],
                "total_allocated_bytes": block_stats[1],
                "active_devices": block_stats[2],
                "oldest_allocation": block_stats[3],
                "latest_access": block_stats[4],
                "per_gpu_usage": {
                    row[0]: {
                        "blocks": row[1],
                        "bytes_used": row[2],
                        "pinned_blocks": row[3],
                    }
                    for row in gpu_memory
                },
            },
            "gpus": [gpu.get_stats() for gpu in self.gpus],
            "nvlink": {
                "active_connections": sum(
                    1 for link in nvlink_state["connections"].values() if link["active"]
                ),
                "active_transfers": len(nvlink_state["active_transfers"]),
                # NOTE(review): counts one link's bandwidth per GPU rather than per
                # link in the fully connected topology — confirm intended.
                "total_bandwidth_tbps": len(self.gpus) * self.NVLINK_BANDWIDTH_GBPS / 1000,
                # Copy so callers cannot mutate the live tracking dict.
                "transfer_history": dict(nvlink_state["active_transfers"]),
                "hourly_transfers": transfer_stats[0],
                "hourly_bytes_transferred": transfer_stats[1],
            },
            "power": {
                "total_watts": sum(per_gpu_watts),
                "per_gpu_watts": per_gpu_watts,
                "efficiency_metrics": self.system_state["power_state"]["efficiency_metrics"],
            },
            "compute": {
                "active_operations": len(compute_state["active_operations"]),
                "completed_operations": len(compute_state["completed_operations"]),
                "gpu_loads": compute_state["gpu_loads"],
            },
            "storage": {
                "path": self.storage.base_path,
                "virtual_blocks": sum(
                    1 for name in os.listdir(self.storage.base_path)
                    if name.startswith("virtual_block")
                ),
                "total_stored_tensors": len(memory_state["allocation_map"]),
            },
        }