# INV / multi_gpu_system_http.py
from http_storage import LocalStorage
from gpu_chip import GPUChip
from typing import Dict, Any, List, Optional
import time
import numpy as np
import os
from vram.ram_controller import RAMController
from config import get_db_url
class MultiGPUSystem:
    """Simulated multi-GPU system backed by a shared remote storage layer."""

    def __init__(self, num_gpus: int = 8, db_url: Optional[str] = None):
        # Initialize with remote storage using config
        self.storage = LocalStorage(db_url=db_url or get_db_url())
        if not self.storage.wait_for_connection(timeout=30):
            raise RuntimeError("Could not initialize remote storage connection")
        # Initialize GPUs with shared storage
        self.gpus = [GPUChip(i, storage=self.storage) for i in range(num_gpus)]
        # Initialize system state with unlimited memory and enhanced tracking
        self.system_state = {
            "num_gpus": num_gpus,
            "nvlink_state": {
                "connections": self._init_nvlink_topology(num_gpus),
                "active_transfers": {}
            },
            "global_memory_state": {
                "total_vram_gb": float('inf'),  # Unlimited total VRAM
                "allocated_vram_gb": 0,
                "virtual_vram_gb": 0,
                "allocation_map": {},  # Tracks all allocations
                "physical_vram_gb": num_gpus * 84  # Track physical VRAM for reference
            },
            "power_state": {
                "total_watts": 0,
                "gpu_watts": [0] * num_gpus,
                "efficiency_metrics": {}
            },
            "compute_state": {
                "active_operations": {},
                "completed_operations": {},
                "gpu_loads": [0] * num_gpus
            }
        }
        self.store_system_state()
    def _init_nvlink_topology(self, num_gpus: int) -> Dict[str, Any]:
        """Initialize NVLink connection topology"""
        topology = {}
        for i in range(num_gpus):
            for j in range(i + 1, num_gpus):
                link_id = f"nvlink_{i}_{j}"
                topology[link_id] = {
                    "gpu_a": i,
                    "gpu_b": j,
                    "bandwidth_gbps": 300,  # NVLink 4.0 speed
                    "active": True
                }
        return topology
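
    # Worked example: every GPU pair gets one link, so the topology is a
    # full mesh with n * (n - 1) / 2 entries. With the default num_gpus=8
    # that is 8 * 7 / 2 = 28 links, keyed "nvlink_0_1" through "nvlink_6_7".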
    def store_system_state(self):
        """Store system state in remote storage"""
        # Store system state with parent tracking
        state_id = f"system_state_{time.time_ns()}"
        self.storage.store_state("multi_gpu_system", state_id, {
            "state": self.system_state,
            "metadata": {
                "timestamp": time.time_ns(),
                "num_gpus": len(self.gpus),
                "state_type": "system_state"
            }
        })
    def allocate_distributed(self, size: int) -> List[str]:
        """Allocate `size` bytes of virtual memory, spread evenly across all GPUs."""
        block_ids = []
        allocation_size = size
        # Spread the allocation across GPUs: dividing the remaining bytes by
        # the number of remaining GPUs distributes any integer remainder
        # without losing bytes to truncation.
        for i in range(len(self.gpus)):
            block_id = f"block_{time.time_ns()}_{i}"
            allocation_size_per_gpu = allocation_size // (len(self.gpus) - i)
            allocation_size -= allocation_size_per_gpu
            # Record allocation with enhanced metadata
            self.system_state["global_memory_state"]["allocation_map"][block_id] = {
                "size": allocation_size_per_gpu,
                "gpu_id": i,
                "timestamp": time.time_ns(),
                "access_count": 0,
                "last_access": time.time_ns(),
                "virtual_addr": f"vaddr_{block_id}"
            }
            # Store allocation info in remote storage with proper metadata
            self.storage.store_state(
                "gpu_allocation",
                block_id,
                {
                    "size": allocation_size_per_gpu,
                    "gpu": i,
                    "metadata": {
                        "creation_time": time.time_ns(),
                        "allocation_type": "distributed",
                        "virtual_mapping": True,
                        "device_id": f"gpu_{i}"
                    }
                }
            )
            block_ids.append(block_id)
        total_gb = size / (1024 * 1024 * 1024)
        self.system_state["global_memory_state"]["allocated_vram_gb"] += total_gb
        self.system_state["global_memory_state"]["virtual_vram_gb"] += total_gb
        self.store_system_state()
        return block_ids
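
    # Worked example: allocate_distributed(10) on 8 GPUs yields per-GPU
    # sizes 1, 1, 1, 1, 1, 1, 2, 2. Each step takes floor(remaining / GPUs
    # left), so the remainder is absorbed by the final allocations and the
    # sizes always sum back to the requested total.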
    def transfer_between_gpus(self, src_gpu: int, dst_gpu: int, data_id: str):
        """Transfer data between GPUs over NVLink, staged through remote storage."""
        if not (0 <= src_gpu < len(self.gpus) and 0 <= dst_gpu < len(self.gpus)):
            raise ValueError("Invalid GPU indices")
        link_id = f"nvlink_{min(src_gpu, dst_gpu)}_{max(src_gpu, dst_gpu)}"
        transfer_id = f"transfer_{time.time_ns()}"
        # Start transfer with enhanced tracking
        self.system_state["nvlink_state"]["active_transfers"][transfer_id] = {
            "source_gpu": src_gpu,
            "dest_gpu": dst_gpu,
            "data_id": data_id,
            "start_time": time.time_ns(),
            "transfer_type": "nvlink",
            "bandwidth_gbps": 300,  # NVLink 4.0
            "status": "in_progress"
        }
        # Stage the tensor through remote storage
        data = self.storage.load_tensor(data_id)
        if data is not None:
            new_block_id = f"block_{time.time_ns()}"
            size_bytes = data.nbytes if hasattr(data, 'nbytes') else len(data)
            # Store tensor with GPU-specific metadata
            self.storage.store_tensor(new_block_id, data, model_size=size_bytes)
            # Update allocation map with transfer metadata
            if data_id in self.system_state["global_memory_state"]["allocation_map"]:
                size = self.system_state["global_memory_state"]["allocation_map"][data_id]["size"]
                self.system_state["global_memory_state"]["allocation_map"][new_block_id] = {
                    "size": size,
                    "gpu_id": dst_gpu,
                    "timestamp": time.time_ns(),
                    "transfer_history": [{
                        "from_gpu": src_gpu,
                        "to_gpu": dst_gpu,
                        "time": time.time_ns(),
                        "nvlink_id": link_id,
                        "transfer_type": "nvlink_direct"
                    }]
                }
            # Store transfer state in database
            self.storage.store_state(
                "gpu_transfer",
                transfer_id,
                {
                    "status": "completed",
                    "source_gpu": src_gpu,
                    "dest_gpu": dst_gpu,
                    "data_id": data_id,
                    "new_block_id": new_block_id,
                    "size_bytes": size_bytes,
                    "start_time": time.time_ns(),
                    "end_time": time.time_ns(),
                    "nvlink_id": link_id
                }
            )
            # Update system state
            self.system_state["nvlink_state"]["active_transfers"][transfer_id].update({
                "completed": True,
                "end_time": time.time_ns(),
                "new_block_id": new_block_id,
                "status": "completed",
                "transfer_size_bytes": size_bytes
            })
            self.store_system_state()
            return new_block_id
        # Tensor missing from storage: mark the transfer failed rather than
        # leaving it "in_progress" forever
        self.system_state["nvlink_state"]["active_transfers"][transfer_id]["status"] = "failed"
        self.store_system_state()
        return None
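
    # Usage sketch (the block id here is hypothetical): copying a tensor
    # from GPU 0 to GPU 2 records the hop under link "nvlink_0_2" and
    # returns the new block id, or None if the tensor is not in storage:
    #     new_id = system.transfer_between_gpus(0, 2, "block_123")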
    def schedule_distributed_compute(self, compute_graph: Dict[str, Any]):
        """Schedule computation across multiple GPUs using intelligent distribution"""
        # Deferred import: the distributor is only needed for scheduling
        from gpu_parallel_distributor import GPUParallelDistributor
        # Initialize parallel distributor
        distributor = GPUParallelDistributor(num_gpus=len(self.gpus))
        scheduled_ops = []
        # Distribute each operation optimally across GPUs
        for op in compute_graph["operations"]:
            distributed_chunks = distributor.distribute_operation(op)
            for chunk in distributed_chunks:
                gpu_id = chunk["gpu_id"]
                # Pick an SM deterministically by hashing the chunk contents
                warp_id = self.gpus[gpu_id].schedule_compute(
                    sm_index=hash(str(chunk)) % self.gpus[gpu_id].chip_state["num_sms"],
                    warp_state=chunk
                )
                scheduled_ops.append({
                    "op": chunk,
                    "gpu": gpu_id,
                    "warp_id": warp_id
                })
        # Store scheduling decision with metadata
        schedule_id = f"schedule_{time.time_ns()}"
        self.storage.store_state(
            "compute_schedule",
            schedule_id,
            {
                "operations": scheduled_ops,
                "metadata": {
                    "timestamp": time.time_ns(),
                    "num_gpus": len(self.gpus),
                    "schedule_type": "distributed",
                    "gpu_utilization": {
                        f"gpu_{i}": len([op for op in scheduled_ops if op["gpu"] == i])
                        for i in range(len(self.gpus))
                    }
                }
            }
        )
        return scheduled_ops
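
    # Sketch of the expected compute_graph shape: only the "operations" key
    # is read here, and each chunk returned by the distributor must carry a
    # "gpu_id". The per-operation fields below are assumptions:
    #     compute_graph = {
    #         "operations": [
    #             {"op": "matmul", "inputs": ["block_a", "block_b"]}
    #         ]
    #     }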
    def synchronize(self):
        """Synchronize all GPUs at a shared barrier."""
        sync_point = time.time_ns()
        # Record sync start in system state
        self.system_state["sync_state"] = {
            "sync_point": sync_point,
            "start_time": time.time_ns(),
            "status": "in_progress",
            "gpu_status": {}
        }
        # Each GPU records the barrier and persists its chip state
        for i, gpu in enumerate(self.gpus):
            gpu.chip_state["sync_point"] = sync_point
            gpu.store_chip_state()
            # Record individual GPU sync status
            self.system_state["sync_state"]["gpu_status"][i] = {
                "reached_barrier": True,
                "timestamp": time.time_ns(),
                "state": "synced"
            }
        # Update system state with sync completion
        self.system_state["sync_state"]["status"] = "completed"
        self.system_state["sync_state"]["completion_time"] = time.time_ns()
        self.system_state["last_sync"] = sync_point
        # Store final state
        self.store_system_state()
    def get_system_stats(self) -> Dict[str, Any]:
        """Get comprehensive system statistics with enhanced metrics from remote storage"""
        # Get block statistics from database
        block_stats = self.storage.conn.execute("""
            SELECT
                COUNT(*) as total_blocks,
                COALESCE(SUM(size), 0) as total_size,
                COUNT(DISTINCT device_id) as active_devices,
                MIN(created_at) as oldest_block,
                MAX(last_accessed) as latest_access
            FROM vram_blocks
        """).fetchone()
        # Get per-GPU memory usage
        gpu_memory = self.storage.conn.execute("""
            SELECT
                device_id,
                COUNT(*) as block_count,
                COALESCE(SUM(size), 0) as used_memory,
                COUNT(CASE WHEN is_pinned THEN 1 END) as pinned_blocks
            FROM vram_blocks
            WHERE device_id IS NOT NULL
            GROUP BY device_id
        """).fetchall()
        # Get transfer statistics for the last hour; transfer records store
        # their payload size under "size_bytes", and the JSON text value must
        # be cast before it can be summed
        transfer_stats = self.storage.conn.execute("""
            SELECT
                COUNT(*) as transfer_count,
                COALESCE(SUM((metadata->>'size_bytes')::BIGINT), 0) as total_transferred
            FROM states
            WHERE name = 'gpu_transfer'
            AND metadata->>'status' = 'completed'
            AND created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
        """).fetchone()
        stats = {
            "num_gpus": len(self.gpus),
            "memory_state": {
                "physical_vram_gb": self.system_state["global_memory_state"]["physical_vram_gb"],
                "allocated_physical_gb": self.system_state["global_memory_state"]["allocated_vram_gb"],
                "virtual_vram_gb": self.system_state["global_memory_state"]["virtual_vram_gb"],
                "total_available_gb": float('inf'),
                "allocation_count": block_stats[0],
                "total_allocated_bytes": block_stats[1],
                "active_devices": block_stats[2],
                "oldest_allocation": block_stats[3],
                "latest_access": block_stats[4],
                "per_gpu_usage": {
                    row[0]: {
                        "blocks": row[1],
                        "bytes_used": row[2],
                        "pinned_blocks": row[3]
                    } for row in gpu_memory
                }
            },
            "gpus": [gpu.get_stats() for gpu in self.gpus],
            "nvlink": {
                "active_connections": sum(
                    1 for conn in self.system_state["nvlink_state"]["connections"].values()
                    if conn["active"]
                ),
                "active_transfers": len(self.system_state["nvlink_state"]["active_transfers"]),
                # Aggregate bandwidth over every active link in the mesh
                "total_bandwidth_tbps": sum(
                    conn["bandwidth_gbps"]
                    for conn in self.system_state["nvlink_state"]["connections"].values()
                    if conn["active"]
                ) / 1000,
                "transfer_history": self.system_state["nvlink_state"]["active_transfers"],
                "hourly_transfers": transfer_stats[0],
                "hourly_bytes_transferred": transfer_stats[1]
            },
            "power": {
                "total_watts": sum(gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus),
                "per_gpu_watts": [gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus],
                "efficiency_metrics": self.system_state["power_state"]["efficiency_metrics"]
            },
            "compute": {
                "active_operations": len(self.system_state["compute_state"]["active_operations"]),
                "completed_operations": len(self.system_state["compute_state"]["completed_operations"]),
                "gpu_loads": self.system_state["compute_state"]["gpu_loads"]
            },
            "storage": {
                "path": self.storage.base_path,
                "virtual_blocks": len([
                    k for k in os.listdir(self.storage.base_path)
                    if k.startswith("virtual_block")
                ]),
                "total_stored_tensors": len(self.system_state["global_memory_state"]["allocation_map"])
            }
        }
        return stats
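

# Minimal smoke-test sketch, assuming LocalStorage and GPUChip behave as they
# are used above (wait_for_connection, store_state, get_stats, and so on).
# The numbers are illustrative.
if __name__ == "__main__":
    system = MultiGPUSystem(num_gpus=4)

    # Spread a 1 GiB allocation evenly across the four GPUs
    blocks = system.allocate_distributed(1024 ** 3)
    print(f"allocated {len(blocks)} blocks: {blocks}")

    # Barrier across all GPUs, then report aggregate statistics
    system.synchronize()
    stats = system.get_system_stats()
    print(f"physical VRAM: {stats['memory_state']['physical_vram_gb']} GB")
    print(f"active NVLink connections: {stats['nvlink']['active_connections']}")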