|
|
from http_storage import LocalStorage
|
|
|
from gpu_chip import GPUChip
|
|
|
from typing import Dict, Any, List, Optional
|
|
|
import time
|
|
|
import numpy as np
|
|
|
import os
|
|
|
from vram.ram_controller import RAMController
|
|
|
from config import get_db_url
|
|
|
|
|
|
class MultiGPUSystem:
    """Coordinates a set of GPUChip instances backed by a shared remote storage.

    Tracks NVLink topology, memory allocation, power and compute state in a
    single ``system_state`` dict, and persists snapshots of it via storage.
    """

    def __init__(self, num_gpus: int = 8, db_url: Optional[str] = None):
        """Create the multi-GPU system and persist its initial state.

        Args:
            num_gpus: Number of GPUChip instances to create (default 8).
            db_url: Storage connection URL; falls back to get_db_url().

        Raises:
            RuntimeError: If the storage connection cannot be established
                within 30 seconds.
        """
        self.storage = LocalStorage(db_url=db_url or get_db_url())
        if not self.storage.wait_for_connection(timeout=30):
            raise RuntimeError("Could not initialize remote storage connection")

        # BUG FIX: the original built this list twice back to back,
        # instantiating every GPUChip (and whatever state each registers
        # in storage) twice. Create each GPU exactly once.
        self.gpus = [GPUChip(i, storage=self.storage) for i in range(num_gpus)]

        self.system_state = {
            "num_gpus": num_gpus,
            "nvlink_state": {
                # Full-mesh topology: one link per unordered GPU pair.
                "connections": self._init_nvlink_topology(num_gpus),
                "active_transfers": {}
            },
            "global_memory_state": {
                # Virtual capacity is modeled as unlimited; physical VRAM
                # is tracked separately below.
                "total_vram_gb": float('inf'),
                "allocated_vram_gb": 0,
                "virtual_vram_gb": 0,
                "allocation_map": {},
                # NOTE(review): 84 GB per GPU appears to be the modeled card
                # size -- confirm this constant is intentional.
                "physical_vram_gb": num_gpus * 84
            },
            "power_state": {
                "total_watts": 0,
                "gpu_watts": [0] * num_gpus,
                "efficiency_metrics": {}
            },
            "compute_state": {
                "active_operations": {},
                "completed_operations": {},
                "gpu_loads": [0] * num_gpus
            }
        }
        # Persist the initial snapshot so the system is observable immediately.
        self.store_system_state()
|
|
|
|
|
|
def _init_nvlink_topology(self, num_gpus: int) -> Dict[str, Any]:
|
|
|
"""Initialize NVLink connection topology"""
|
|
|
topology = {}
|
|
|
for i in range(num_gpus):
|
|
|
for j in range(i + 1, num_gpus):
|
|
|
link_id = f"nvlink_{i}_{j}"
|
|
|
topology[link_id] = {
|
|
|
"gpu_a": i,
|
|
|
"gpu_b": j,
|
|
|
"bandwidth_gbps": 300,
|
|
|
"active": True
|
|
|
}
|
|
|
return topology
|
|
|
|
|
|
def store_system_state(self):
|
|
|
"""Store system state in remote storage"""
|
|
|
|
|
|
state_id = f"system_state_{time.time_ns()}"
|
|
|
|
|
|
self.storage.store_state("multi_gpu_system", state_id, {
|
|
|
"state": self.system_state,
|
|
|
"metadata": {
|
|
|
"timestamp": time.time_ns(),
|
|
|
"num_gpus": len(self.gpus),
|
|
|
"state_type": "system_state"
|
|
|
}
|
|
|
})
|
|
|
|
|
|
def allocate_distributed(self, size: int) -> List[str]:
|
|
|
"""Allocate memory with unlimited capacity using optimized distribution"""
|
|
|
block_ids = []
|
|
|
allocation_size = size
|
|
|
|
|
|
|
|
|
for i in range(len(self.gpus)):
|
|
|
block_id = f"block_{time.time_ns()}_{i}"
|
|
|
allocation_size_per_gpu = allocation_size // (len(self.gpus) - i)
|
|
|
allocation_size -= allocation_size_per_gpu
|
|
|
|
|
|
|
|
|
self.system_state["global_memory_state"]["allocation_map"][block_id] = {
|
|
|
"size": allocation_size_per_gpu,
|
|
|
"gpu_id": i,
|
|
|
"timestamp": time.time_ns(),
|
|
|
"access_count": 0,
|
|
|
"last_access": time.time_ns(),
|
|
|
"virtual_addr": f"vaddr_{block_id}"
|
|
|
}
|
|
|
|
|
|
|
|
|
self.storage.store_state(
|
|
|
"gpu_allocation",
|
|
|
block_id,
|
|
|
{
|
|
|
"size": allocation_size_per_gpu,
|
|
|
"gpu": i,
|
|
|
"metadata": {
|
|
|
"creation_time": time.time_ns(),
|
|
|
"allocation_type": "distributed",
|
|
|
"virtual_mapping": True,
|
|
|
"device_id": f"gpu_{i}"
|
|
|
}
|
|
|
}
|
|
|
)
|
|
|
|
|
|
block_ids.append(block_id)
|
|
|
|
|
|
total_gb = size / (1024 * 1024 * 1024)
|
|
|
self.system_state["global_memory_state"]["allocated_vram_gb"] += total_gb
|
|
|
self.system_state["global_memory_state"]["virtual_vram_gb"] += total_gb
|
|
|
self.store_system_state()
|
|
|
|
|
|
return block_ids
|
|
|
|
|
|
def transfer_between_gpus(self, src_gpu: int, dst_gpu: int, data_id: str):
|
|
|
"""Transfer data between GPUs using NVLink with local storage at electron speed"""
|
|
|
if not (0 <= src_gpu < len(self.gpus) and 0 <= dst_gpu < len(self.gpus)):
|
|
|
raise ValueError("Invalid GPU indices")
|
|
|
|
|
|
link_id = f"nvlink_{min(src_gpu, dst_gpu)}_{max(src_gpu, dst_gpu)}"
|
|
|
transfer_id = f"transfer_{time.time_ns()}"
|
|
|
|
|
|
|
|
|
self.system_state["nvlink_state"]["active_transfers"][transfer_id] = {
|
|
|
"source_gpu": src_gpu,
|
|
|
"dest_gpu": dst_gpu,
|
|
|
"data_id": data_id,
|
|
|
"start_time": time.time_ns(),
|
|
|
"transfer_type": "nvlink",
|
|
|
"bandwidth_gbps": 300,
|
|
|
"status": "in_progress"
|
|
|
}
|
|
|
|
|
|
|
|
|
data = self.storage.load_tensor(data_id)
|
|
|
if data is not None:
|
|
|
new_block_id = f"block_{time.time_ns()}"
|
|
|
|
|
|
|
|
|
self.storage.store_tensor(
|
|
|
new_block_id,
|
|
|
data,
|
|
|
model_size=data.nbytes if hasattr(data, 'nbytes') else len(data)
|
|
|
)
|
|
|
|
|
|
|
|
|
if data_id in self.system_state["global_memory_state"]["allocation_map"]:
|
|
|
size = self.system_state["global_memory_state"]["allocation_map"][data_id]["size"]
|
|
|
self.system_state["global_memory_state"]["allocation_map"][new_block_id] = {
|
|
|
"size": size,
|
|
|
"gpu_id": dst_gpu,
|
|
|
"timestamp": time.time_ns(),
|
|
|
"transfer_history": [{
|
|
|
"from_gpu": src_gpu,
|
|
|
"to_gpu": dst_gpu,
|
|
|
"time": time.time_ns(),
|
|
|
"nvlink_id": link_id,
|
|
|
"transfer_type": "nvlink_direct"
|
|
|
}]
|
|
|
}
|
|
|
|
|
|
|
|
|
self.storage.store_state(
|
|
|
"gpu_transfer",
|
|
|
transfer_id,
|
|
|
{
|
|
|
"status": "completed",
|
|
|
"source_gpu": src_gpu,
|
|
|
"dest_gpu": dst_gpu,
|
|
|
"data_id": data_id,
|
|
|
"new_block_id": new_block_id,
|
|
|
"size_bytes": data.nbytes if hasattr(data, 'nbytes') else len(data),
|
|
|
"start_time": time.time_ns(),
|
|
|
"end_time": time.time_ns(),
|
|
|
"nvlink_id": link_id
|
|
|
}
|
|
|
)
|
|
|
|
|
|
|
|
|
self.system_state["nvlink_state"]["active_transfers"][transfer_id].update({
|
|
|
"completed": True,
|
|
|
"end_time": time.time_ns(),
|
|
|
"new_block_id": new_block_id,
|
|
|
"status": "completed",
|
|
|
"transfer_size_bytes": data.nbytes if hasattr(data, 'nbytes') else len(data)
|
|
|
})
|
|
|
|
|
|
self.store_system_state()
|
|
|
return new_block_id
|
|
|
|
|
|
return None
|
|
|
|
|
|
def schedule_distributed_compute(self, compute_graph: Dict[str, Any]):
|
|
|
"""Schedule computation across multiple GPUs using intelligent distribution"""
|
|
|
from gpu_parallel_distributor import GPUParallelDistributor
|
|
|
|
|
|
|
|
|
distributor = GPUParallelDistributor(num_gpus=len(self.gpus))
|
|
|
scheduled_ops = []
|
|
|
|
|
|
|
|
|
for op in compute_graph["operations"]:
|
|
|
distributed_chunks = distributor.distribute_operation(op)
|
|
|
|
|
|
for chunk in distributed_chunks:
|
|
|
gpu_id = chunk["gpu_id"]
|
|
|
|
|
|
warp_id = self.gpus[gpu_id].schedule_compute(
|
|
|
sm_index=hash(str(chunk)) % self.gpus[gpu_id].chip_state["num_sms"],
|
|
|
warp_state=chunk
|
|
|
)
|
|
|
scheduled_ops.append({
|
|
|
"op": chunk,
|
|
|
"gpu": gpu_id,
|
|
|
"warp_id": warp_id
|
|
|
})
|
|
|
|
|
|
|
|
|
schedule_id = f"schedule_{time.time_ns()}"
|
|
|
self.storage.store_state(
|
|
|
"compute_schedule",
|
|
|
schedule_id,
|
|
|
{
|
|
|
"operations": scheduled_ops,
|
|
|
"metadata": {
|
|
|
"timestamp": time.time_ns(),
|
|
|
"num_gpus": len(self.gpus),
|
|
|
"schedule_type": "distributed",
|
|
|
"gpu_utilization": {
|
|
|
f"gpu_{i}": len([op for op in scheduled_ops if op["gpu"] == i])
|
|
|
for i in range(len(self.gpus))
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
)
|
|
|
|
|
|
return scheduled_ops
|
|
|
|
|
|
def synchronize(self):
|
|
|
"""Synchronize all GPUs with local barrier at electron speed"""
|
|
|
sync_point = time.time_ns()
|
|
|
|
|
|
|
|
|
self.system_state["sync_state"] = {
|
|
|
"sync_point": sync_point,
|
|
|
"start_time": time.time_ns(),
|
|
|
"status": "in_progress",
|
|
|
"gpu_status": {}
|
|
|
}
|
|
|
|
|
|
|
|
|
for i, gpu in enumerate(self.gpus):
|
|
|
gpu.chip_state["sync_point"] = sync_point
|
|
|
gpu.store_chip_state()
|
|
|
|
|
|
|
|
|
self.system_state["sync_state"]["gpu_status"][i] = {
|
|
|
"reached_barrier": True,
|
|
|
"timestamp": time.time_ns(),
|
|
|
"state": "synced"
|
|
|
}
|
|
|
|
|
|
|
|
|
self.system_state["sync_state"]["status"] = "completed"
|
|
|
self.system_state["sync_state"]["completion_time"] = time.time_ns()
|
|
|
self.system_state["last_sync"] = sync_point
|
|
|
|
|
|
|
|
|
self.store_system_state()
|
|
|
|
|
|
    def get_system_stats(self) -> Dict[str, Any]:
        """Get comprehensive system statistics with enhanced metrics from remote storage.

        Combines three sources: aggregate SQL queries against the storage
        backend's connection, per-GPU stats from each GPUChip, and the
        in-memory ``system_state`` counters.

        Returns:
            Nested dict with "num_gpus", "memory_state", "gpus", "nvlink",
            "power", "compute" and "storage" sections.
        """
        # Aggregate allocation figures across all blocks.
        # NOTE(review): raw SQL against self.storage.conn assumes a
        # `vram_blocks` table with size/device_id/created_at/last_accessed
        # columns -- confirm against the LocalStorage schema.
        block_stats = self.storage.conn.execute("""
            SELECT
                COUNT(*) as total_blocks,
                COALESCE(SUM(size), 0) as total_size,
                COUNT(DISTINCT device_id) as active_devices,
                MIN(created_at) as oldest_block,
                MAX(last_accessed) as latest_access
            FROM vram_blocks
        """).fetchone()

        # Per-device usage: one row per non-null device_id.
        gpu_memory = self.storage.conn.execute("""
            SELECT
                device_id,
                COUNT(*) as block_count,
                COALESCE(SUM(size), 0) as used_memory,
                COUNT(CASE WHEN is_pinned THEN 1 END) as pinned_blocks
            FROM vram_blocks
            WHERE device_id IS NOT NULL
            GROUP BY device_id
        """).fetchall()

        # Completed GPU transfers from the last hour.
        # NOTE(review): `metadata->>'size'` extracts TEXT, so SUM over it
        # relies on the backend's implicit coercion; `INTERVAL 1 HOUR`
        # (no quotes) is DuckDB-style syntax -- verify both against the
        # actual database engine behind LocalStorage.
        transfer_stats = self.storage.conn.execute("""
            SELECT
                COUNT(*) as transfer_count,
                COALESCE(SUM(metadata->>'size'), 0) as total_transferred
            FROM states
            WHERE name = 'gpu_transfer'
            AND metadata->>'status' = 'completed'
            AND created_at >= CURRENT_TIMESTAMP - INTERVAL 1 HOUR
        """).fetchone()

        stats = {
            "num_gpus": len(self.gpus),
            "memory_state": {
                "physical_vram_gb": self.system_state["global_memory_state"]["physical_vram_gb"],
                "allocated_physical_gb": self.system_state["global_memory_state"]["allocated_vram_gb"],
                "virtual_vram_gb": self.system_state["global_memory_state"]["virtual_vram_gb"],
                # Virtual capacity is modeled as unlimited.
                "total_available_gb": float('inf'),
                # Positional columns from the block_stats aggregate above.
                "allocation_count": block_stats[0],
                "total_allocated_bytes": block_stats[1],
                "active_devices": block_stats[2],
                "oldest_allocation": block_stats[3],
                "latest_access": block_stats[4],
                "per_gpu_usage": {
                    # row = (device_id, block_count, used_memory, pinned_blocks)
                    row[0]: {
                        "blocks": row[1],
                        "bytes_used": row[2],
                        "pinned_blocks": row[3]
                    } for row in gpu_memory
                }
            },
            "gpus": [gpu.get_stats() for gpu in self.gpus],
            "nvlink": {
                "active_connections": sum(1 for conn in self.system_state["nvlink_state"]["connections"].values() if conn["active"]),
                "active_transfers": len(self.system_state["nvlink_state"]["active_transfers"]),
                # NOTE(review): 300 Gbps per GPU converted to Tbps; a full
                # mesh has more links than GPUs -- confirm this is the
                # intended aggregate figure.
                "total_bandwidth_tbps": len(self.gpus) * 300 / 1000,
                "transfer_history": self.system_state["nvlink_state"]["active_transfers"],
                "hourly_transfers": transfer_stats[0],
                "hourly_bytes_transferred": transfer_stats[1]
            },
            "power": {
                "total_watts": sum(gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus),
                "per_gpu_watts": [gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus],
                "efficiency_metrics": self.system_state["power_state"]["efficiency_metrics"]
            },
            "compute": {
                "active_operations": len(self.system_state["compute_state"]["active_operations"]),
                "completed_operations": len(self.system_state["compute_state"]["completed_operations"]),
                "gpu_loads": self.system_state["compute_state"]["gpu_loads"]
            },
            "storage": {
                "path": self.storage.base_path,
                # NOTE(review): assumes storage.base_path is a listable local
                # directory containing virtual_block* files -- confirm.
                "virtual_blocks": len([k for k in os.listdir(self.storage.base_path) if k.startswith("virtual_block")]),
                "total_stored_tensors": len(self.system_state["global_memory_state"]["allocation_map"])
            }
        }
        return stats
|
|
|
|
|
|
|