|
|
try:
    import numpy as np
except ImportError:
    # numpy is required; pip.main() was removed from modern pip, so install it
    # through the current interpreter and retry the import.
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "numpy"])
    import numpy as np
|
|
|
|
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
|
import time
|
|
|
import json
|
|
|
import logging
|
|
|
import duckdb
|
|
|
from huggingface_hub import HfApi, HfFileSystem
|
|
|
from .hal.hal import HardwareAbstractionLayer, HardwareType
|
|
|
from .memory.duckdb_memory_manager import DuckDBMemoryManager
|
|
|
from .commands.command_processor import CommandProcessor
|
|
|
from .graphics.graphics_api import GraphicsAPI
|
|
|
from .memory.memory_pool import MemoryPool, SharedMemoryPool
|
|
|
from .graphics.texture_buffer_manager import (
    Texture, Buffer, Framebuffer, TextureFormat, FilterMode,
    WrapMode, BufferType, MSAASamples, AttachmentType
)
|
|
|
from .graphics.rasterizer import Rasterizer
|
|
|
|
|
|
from tensor_core import TensorCore
|
|
|
from config import get_db_url, get_hf_token_cached
|
|
|
from multi_gpu_system_http import MultiGPUSystem
|
|
|
from gpu_parallel_distributor import GPUParallelDistributor
|
|
|
from parallel_array_distributor import ParallelArrayDistributor
|
|
|
from streaming_multiprocessor import StreamingMultiprocessor
|
|
|
from electron_speed import max_switch_freq, GATE_DELAY
|
|
|
from logic_gates import LogicGate, NANDGate, ANDGate
|
|
|
from .warp import Warp
|
|
|
|
|
|
|
|
|
DEFAULT_NUM_GPUS = 8
|
|
|
DEFAULT_SMS_PER_GPU = 1500
|
|
|
DEFAULT_CORES_PER_SM = 128
|
|
|
THREADS_PER_WARP = 32
|
|
|
|
|
|
|
|
|
DEFAULT_SHARED_MEMORY_PER_SM = 32 * 1024  # 32 KiB of shared memory per SM
|
|
|
DEFAULT_REGISTERS_PER_SM = 65536
|
|
|
DEFAULT_MAX_WARPS_PER_SM = DEFAULT_CORES_PER_SM // THREADS_PER_WARP  # 128 // 32 = 4 resident warps
|
|
|
|
|
|
|
|
|
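# SM scheduling weights: an SM's score is locality * DEFAULT_LOCALITY_WEIGHT
# plus (1.0 / (active_ops + 1)) * DEFAULT_LOAD_WEIGHT; see _select_best_sm().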
DEFAULT_LOCALITY_WEIGHT = 0.7
|
|
|
DEFAULT_LOAD_WEIGHT = 0.3
|
|
|
|
|
|
class GPUError(Exception):
|
|
|
"""Base class for GPU-related errors"""
|
|
|
pass
|
|
|
|
|
|
class MemoryError(GPUError):
|
|
|
"""Memory allocation or access error"""
|
|
|
pass
|
|
|
|
|
|
class StreamError(GPUError):
|
|
|
"""Stream operation error"""
|
|
|
pass
|
|
|
|
|
|
class TensorError(GPUError):
|
|
|
"""Tensor operation error"""
|
|
|
pass
|
|
|
|
|
|
class VirtualGPUDriver:
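    """Simulated multi-GPU driver exposing memory, stream, tensor, and graphics APIs backed by DuckDB state."""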
|
|
|
DB_URL = "hf://datasets/Fred808/helium/storage.json"
|
|
|
|
|
|
def __init__(self, num_gpus: int = DEFAULT_NUM_GPUS,
|
|
|
num_sms_per_gpu: int = DEFAULT_SMS_PER_GPU,
|
|
|
cores_per_sm: int = DEFAULT_CORES_PER_SM,
|
|
|
db_url: Optional[str] = None):
|
|
|
"""Initialize GPU driver with configurable architecture parameters"""
|
|
|
|
|
|
self.num_gpus = num_gpus
|
|
|
self.num_sms_per_gpu = num_sms_per_gpu
|
|
|
self.cores_per_sm = cores_per_sm
|
|
|
self.threads_per_warp = THREADS_PER_WARP
|
|
|
|
|
|
|
|
|
self.db_url = db_url or self.DB_URL
|
|
|
self.max_retries = 3
|
|
|
self._connect_with_retries()
|
|
|
|
|
|
|
|
|
self.hal = HardwareAbstractionLayer(db_url=self.db_url)
|
|
|
self.multi_gpu_system = MultiGPUSystem(num_gpus=self.num_gpus, db_url=self.db_url)
|
|
|
self.parallel_distributor = GPUParallelDistributor(num_gpus=self.num_gpus)
|
|
|
|
|
|
|
|
|
self.streaming_multiprocessors = {}
|
|
|
|
|
|
|
|
|
self._setup_database()
|
|
|
|
|
|
|
|
|
for chip_id in range(num_gpus):
|
|
|
self.streaming_multiprocessors[chip_id] = {}
|
|
|
|
|
|
|
|
|
self._initialize_graphics_subsystem()
|
|
|
|
|
|
def _connect_with_retries(self):
|
|
|
"""Establish database connection with retry logic"""
|
|
|
for attempt in range(self.max_retries):
|
|
|
try:
|
|
|
self.conn = self._init_db_connection()
|
|
|
return
|
|
|
except Exception as e:
|
|
|
if attempt == self.max_retries - 1:
|
|
|
raise RuntimeError(f"Failed to initialize database after {self.max_retries} attempts: {str(e)}")
|
|
|
time.sleep(1)
|
|
|
|
|
|
def _init_db_connection(self) -> duckdb.DuckDBPyConnection:
|
|
|
"""Initialize database connection with HuggingFace configuration"""
|
|
|
|
|
|
        # db_url has the form "hf://datasets/<owner>/<dataset>/<file>"
        _, _, _, owner, dataset, db_file = self.db_url.split('/', 5)
        db_path = f"s3://datasets-cached/{owner}/{dataset}/{db_file}"
|
|
|
|
|
|
|
|
|
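        # The DuckDB connection reads the Hugging Face dataset file through the
        # httpfs (S3) extension; the HF token is reused as both S3 credentials below.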
conn = duckdb.connect(db_path)
|
|
|
conn.execute("INSTALL httpfs;")
|
|
|
conn.execute("LOAD httpfs;")
|
|
|
conn.execute("SET s3_endpoint='s3.us-east-1.amazonaws.com';")
|
|
|
conn.execute("SET s3_use_ssl=true;")
|
|
|
conn.execute("SET s3_url_style='path';")
|
|
|
conn.execute(f"SET s3_access_key_id='{self.HF_TOKEN}';")
|
|
|
conn.execute(f"SET s3_secret_access_key='{self.HF_TOKEN}';")
|
|
|
return conn
|
|
|
|
|
|
def _setup_database(self):
|
|
|
"""Initialize database tables"""
|
|
|
|
|
|
self.conn.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS gpu_state (
|
|
|
chip_id INTEGER PRIMARY KEY,
|
|
|
num_sms INTEGER,
|
|
|
cores_per_sm INTEGER,
|
|
|
memory_size BIGINT,
|
|
|
utilization DOUBLE,
|
|
|
temperature DOUBLE,
|
|
|
power_usage DOUBLE,
|
|
|
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
|
state_json JSON
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
|
|
|
self.conn.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS streaming_multiprocessors (
|
|
|
sm_id VARCHAR PRIMARY KEY,
|
|
|
chip_id INTEGER,
|
|
|
num_cores INTEGER,
|
|
|
active_warps INTEGER,
|
|
|
shared_memory_used BIGINT,
|
|
|
registers_used INTEGER,
|
|
|
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
|
state_json JSON
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
|
|
|
self.conn.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS warp_execution (
|
|
|
warp_id VARCHAR PRIMARY KEY,
|
|
|
sm_id VARCHAR,
|
|
|
chip_id INTEGER,
|
|
|
num_threads INTEGER,
|
|
|
program_counter BIGINT,
|
|
|
status VARCHAR,
|
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
|
updated_at TIMESTAMP,
|
|
|
state_json JSON,
|
|
|
FOREIGN KEY (sm_id) REFERENCES streaming_multiprocessors(sm_id)
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
|
|
|
self.conn.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS shared_memory_pools (
|
|
|
pool_id VARCHAR PRIMARY KEY,
|
|
|
gpu_id INTEGER,
|
|
|
size BIGINT,
|
|
|
allocated_size BIGINT,
|
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
|
last_accessed TIMESTAMP,
|
|
|
last_modified TIMESTAMP,
|
|
|
state_json JSON
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
def _initialize_graphics_subsystem(self):
|
|
|
"""Initialize graphics-related subsystems"""
|
|
|
        from .graphics.texture_buffer_manager import (
            Texture, Buffer, Framebuffer, TextureFormat, FilterMode,
            WrapMode, BufferType, MSAASamples
        )
        from .graphics.rasterizer import Rasterizer
|
|
|
|
|
|
|
|
|
self.TextureFormat = TextureFormat
|
|
|
self.FilterMode = FilterMode
|
|
|
self.WrapMode = WrapMode
|
|
|
self.BufferType = BufferType
|
|
|
self.MSAASamples = MSAASamples
|
|
|
|
|
|
|
|
|
self.texture_manager = {}
|
|
|
self.buffer_manager = {}
|
|
|
self.framebuffer_manager = {}
|
|
|
self.next_texture_id = 0
|
|
|
self.next_buffer_id = 0
|
|
|
self.next_fb_id = 0
|
|
|
|
|
|
|
|
|
self.rasterizers = {
|
|
|
chip_id: Rasterizer(self)
|
|
|
for chip_id in range(self.num_gpus)
|
|
|
}
|
|
|
|
|
|
def create_texture(self, width: int, height: int,
|
|
|
format: TextureFormat = None,
|
|
|
filter_mode: FilterMode = None,
|
|
|
wrap_mode: WrapMode = None,
|
|
|
generate_mipmaps: bool = True,
|
|
|
aniso_level: int = 1) -> int:
|
|
|
"""
|
|
|
Create a new texture with specified parameters
|
|
|
Returns texture_id
|
|
|
"""
|
|
|
format = format or self.TextureFormat.RGBA8
|
|
|
filter_mode = filter_mode or self.FilterMode.BILINEAR
|
|
|
wrap_mode = wrap_mode or self.WrapMode.REPEAT
|
|
|
|
|
|
texture = Texture(width, height, format, filter_mode,
|
|
|
wrap_mode, generate_mipmaps, aniso_level)
|
|
|
texture_id = self.next_texture_id
|
|
|
self.texture_manager[texture_id] = texture
|
|
|
self.next_texture_id += 1
|
|
|
return texture_id
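    # Usage sketch (assumes an initialized `driver` instance):
    #   tex_id = driver.create_texture(256, 256)    # RGBA8, bilinear, repeat defaults
    #   driver.upload_texture_data(tex_id, pixels)  # pixels: np.ndarray matching the texture shape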
|
|
|
|
|
|
def create_buffer(self, data: np.ndarray,
|
|
|
buffer_type: BufferType = None,
|
|
|
dynamic: bool = False,
|
|
|
map_write: bool = False) -> int:
|
|
|
"""
|
|
|
Create a new buffer with specified parameters
|
|
|
Returns buffer_id
|
|
|
"""
|
|
|
buffer_type = buffer_type or self.BufferType.VERTEX
|
|
|
buffer = Buffer(data, buffer_type, dynamic, map_write)
|
|
|
buffer_id = self.next_buffer_id
|
|
|
self.buffer_manager[buffer_id] = buffer
|
|
|
self.next_buffer_id += 1
|
|
|
return buffer_id
|
|
|
|
|
|
def create_framebuffer(self, width: int, height: int,
|
|
|
color_formats: List[TextureFormat] = None,
|
|
|
depth_format: Optional[TextureFormat] = None,
|
|
|
stencil_format: Optional[TextureFormat] = None,
|
|
|
samples: MSAASamples = None) -> int:
|
|
|
"""
|
|
|
Create a new framebuffer with specified parameters
|
|
|
Returns framebuffer_id
|
|
|
"""
|
|
|
color_formats = color_formats or [self.TextureFormat.RGBA8]
|
|
|
samples = samples or self.MSAASamples.MSAA_1X
|
|
|
|
|
|
fb = Framebuffer(width, height, color_formats,
|
|
|
depth_format, stencil_format, samples)
|
|
|
fb_id = self.next_fb_id
|
|
|
self.framebuffer_manager[fb_id] = fb
|
|
|
self.next_fb_id += 1
|
|
|
return fb_id
|
|
|
|
|
|
def upload_texture_data(self, texture_id: int, data: np.ndarray,
|
|
|
generate_mipmaps: bool = True):
|
|
|
"""Upload new data to existing texture"""
|
|
|
if texture_id not in self.texture_manager:
|
|
|
raise GPUError(f"Invalid texture ID: {texture_id}")
|
|
|
texture = self.texture_manager[texture_id]
|
|
|
texture.upload(data, generate_mipmaps)
|
|
|
|
|
|
def upload_buffer_data(self, buffer_id: int, data: np.ndarray):
|
|
|
"""Upload new data to existing buffer"""
|
|
|
if buffer_id not in self.buffer_manager:
|
|
|
raise GPUError(f"Invalid buffer ID: {buffer_id}")
|
|
|
buffer = self.buffer_manager[buffer_id]
|
|
|
buffer.upload(data)
|
|
|
|
|
|
def map_buffer(self, buffer_id: int) -> Optional[np.ndarray]:
|
|
|
"""Map buffer for CPU access"""
|
|
|
if buffer_id not in self.buffer_manager:
|
|
|
raise GPUError(f"Invalid buffer ID: {buffer_id}")
|
|
|
buffer = self.buffer_manager[buffer_id]
|
|
|
return buffer.map()
|
|
|
|
|
|
def unmap_buffer(self, buffer_id: int):
|
|
|
"""Unmap previously mapped buffer"""
|
|
|
if buffer_id not in self.buffer_manager:
|
|
|
raise GPUError(f"Invalid buffer ID: {buffer_id}")
|
|
|
buffer = self.buffer_manager[buffer_id]
|
|
|
buffer.unmap()
|
|
|
|
|
|
def draw_triangles(self, vertex_buffer_id: int,
|
|
|
index_buffer_id: Optional[int],
|
|
|
framebuffer_id: int,
|
|
|
shader_program: Dict,
|
|
|
num_vertices: int,
|
|
|
start_vertex: int = 0,
|
|
|
chip_id: int = 0):
|
|
|
"""
|
|
|
Draw triangles using specified buffers and shader program
|
|
|
|
|
|
Args:
|
|
|
vertex_buffer_id: Buffer containing vertex data
|
|
|
index_buffer_id: Optional buffer containing indices
|
|
|
framebuffer_id: Target framebuffer
|
|
|
shader_program: Dict containing vertex and fragment shader code
|
|
|
num_vertices: Number of vertices to draw
|
|
|
start_vertex: Starting vertex offset
|
|
|
chip_id: GPU to use for rendering
|
|
|
"""
|
|
|
if vertex_buffer_id not in self.buffer_manager:
|
|
|
raise GPUError(f"Invalid vertex buffer ID: {vertex_buffer_id}")
|
|
|
        if index_buffer_id is not None and index_buffer_id not in self.buffer_manager:
|
|
|
raise GPUError(f"Invalid index buffer ID: {index_buffer_id}")
|
|
|
if framebuffer_id not in self.framebuffer_manager:
|
|
|
raise GPUError(f"Invalid framebuffer ID: {framebuffer_id}")
|
|
|
|
|
|
vertex_buffer = self.buffer_manager[vertex_buffer_id]
|
|
|
        index_buffer = (self.buffer_manager[index_buffer_id]
                        if index_buffer_id is not None else None)
|
|
|
framebuffer = self.framebuffer_manager[framebuffer_id]
|
|
|
|
|
|
|
|
|
        if index_buffer is not None:
|
|
|
indices = index_buffer.data[start_vertex:start_vertex + num_vertices]
|
|
|
vertices = vertex_buffer.data[indices]
|
|
|
else:
|
|
|
vertices = vertex_buffer.data[start_vertex:start_vertex + num_vertices]
|
|
|
|
|
|
|
|
|
for i in range(0, len(vertices), 3):
|
|
|
v0, v1, v2 = vertices[i:i+3]
|
|
|
|
|
|
|
|
|
fragments = self.rasterizers[chip_id].rasterize_triangle(
|
|
|
v0, v1, v2,
|
|
|
framebuffer.width,
|
|
|
framebuffer.height,
|
|
|
framebuffer.samples
|
|
|
)
|
|
|
|
|
|
|
|
|
processed = self.rasterizers[chip_id].process_fragments(
|
|
|
fragments,
|
|
|
shader_program,
|
|
|
chip_id,
|
|
|
early_z=True,
|
|
|
hierarchical_z=True
|
|
|
)
|
|
|
|
|
|
|
|
|
passed, depth_buffer = self.rasterizers[chip_id].depth_test(
|
|
|
processed,
|
|
|
framebuffer.depth_attachment.data.tobytes()
|
|
|
if framebuffer.depth_attachment else None,
|
|
|
framebuffer.width
|
|
|
)
|
|
|
|
|
|
|
|
|
for target_idx, target in enumerate(framebuffer.color_attachments):
|
|
|
target.data = np.frombuffer(
|
|
|
self.rasterizers[chip_id].write_to_framebuffer(
|
|
|
passed,
|
|
|
target.data.tobytes(),
|
|
|
framebuffer.width,
|
|
|
blend_enable=True
|
|
|
),
|
|
|
dtype=target.data.dtype
|
|
|
).reshape(target.data.shape)
|
|
|
|
|
|
def _initialize_hal_schema(self):
|
|
|
"""Initialize HAL database schema for multi-GPU tracking"""
|
|
|
|
|
|
self.hal.conn.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS cross_gpu_operations (
|
|
|
operation_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
|
operation_type TEXT,
|
|
|
source_chip INTEGER,
|
|
|
target_chip INTEGER,
|
|
|
nvlink_path TEXT,
|
|
|
start_time TIMESTAMP,
|
|
|
end_time TIMESTAMP,
|
|
|
state_json JSON
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
|
|
|
self.hal.conn.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS memory_coherence (
|
|
|
address INTEGER,
|
|
|
chip_id INTEGER,
|
|
|
version INTEGER,
|
|
|
last_modified TIMESTAMP,
|
|
|
dirty BOOLEAN,
|
|
|
PRIMARY KEY (address, chip_id)
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
|
|
|
self.hal.conn.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS sync_points (
|
|
|
sync_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
|
operation_id INTEGER,
|
|
|
chips_involved JSON,
|
|
|
reached_by JSON,
|
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
self.hal.conn.commit()
|
|
|
|
|
|
def _initialize_memory_management(self, num_chips: int, vram_size_gb: int):
|
|
|
"""Initialize cross-GPU memory management"""
|
|
|
|
|
|
for chip_id in range(num_chips):
|
|
|
|
|
|
pool_id = f"gpu_{chip_id}_global"
|
|
|
self.memory_pools[pool_id] = MemoryPool(
|
|
|
size_bytes=vram_size_gb * 1024 * 1024 * 1024,
|
|
|
chip_id=chip_id
|
|
|
)
|
|
|
|
|
|
|
|
|
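            # One 1 GiB shared pool per unordered GPU pair, so num_chips * (num_chips - 1) / 2 pools in total.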
for other_chip in range(chip_id + 1, num_chips):
|
|
|
shared_pool_id = f"shared_pool_{chip_id}_{other_chip}"
|
|
|
self.shared_pools[shared_pool_id] = SharedMemoryPool(
|
|
|
size_bytes=1 * 1024 * 1024 * 1024,
|
|
|
gpu_a=chip_id,
|
|
|
gpu_b=other_chip
|
|
|
)
|
|
|
|
|
|
|
|
|
self.hal.conn.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS memory_transfers (
|
|
|
transfer_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
|
source_chip INTEGER,
|
|
|
target_chip INTEGER,
|
|
|
size_bytes INTEGER,
|
|
|
nvlink_path TEXT,
|
|
|
start_time TIMESTAMP,
|
|
|
end_time TIMESTAMP,
|
|
|
bandwidth_used FLOAT
|
|
|
)
|
|
|
""")
|
|
|
|
|
|
def _initialize_command_processors(self, num_chips: int):
|
|
|
"""Initialize command processors for cross-GPU operations"""
|
|
|
|
|
|
for chip_a in range(num_chips):
|
|
|
for chip_b in range(chip_a + 1, num_chips):
|
|
|
queue_id = f"xfer_{chip_a}_{chip_b}"
|
|
|
self.hal.conn.execute("""
|
|
|
INSERT INTO hardware_queues (
|
|
|
queue_id, hardware_type, chip_id, sm_id, instructions
|
|
|
) VALUES (?, ?, ?, ?, ?)
|
|
|
""", (
|
|
|
queue_id,
|
|
|
'DMA_ENGINE',
|
|
|
chip_a,
|
|
|
None,
|
|
|
json.dumps([])
|
|
|
))
|
|
|
|
|
|
|
|
|
for chip_id in range(num_chips):
|
|
|
self.hal.conn.execute("""
|
|
|
INSERT INTO hardware_queues (
|
|
|
queue_id, hardware_type, chip_id, sm_id, instructions
|
|
|
) VALUES (?, ?, ?, ?, ?)
|
|
|
""", (
|
|
|
f"compute_{chip_id}",
|
|
|
'COMPUTE_UNIT',
|
|
|
chip_id,
|
|
|
None,
|
|
|
json.dumps([])
|
|
|
))
|
|
|
|
|
|
for chip_id in range(num_chips):
|
|
|
            for sm_id in range(self.num_sms_per_gpu):
|
|
|
sm = StreamingMultiprocessor(
|
|
|
sm_id=sm_id,
|
|
|
chip_id=chip_id,
|
|
|
num_cores=self.cores_per_sm,
|
|
|
db_url=self.db_url
|
|
|
)
|
|
|
self.streaming_multiprocessors[chip_id][sm_id] = sm
|
|
|
|
|
|
|
|
|
self.hal.conn.execute("""
|
|
|
INSERT INTO hardware_states (
|
|
|
component_id, hardware_type, chip_id, sm_id, state_json
|
|
|
) VALUES (?, ?, ?, ?, ?)
|
|
|
""", (
|
|
|
f"sm_{chip_id}_{sm_id}",
|
|
|
'STREAMING_MULTIPROCESSOR',
|
|
|
chip_id,
|
|
|
sm_id,
|
|
|
json.dumps(sm.sm_state)
|
|
|
))
|
|
|
|
|
|
|
|
|
self.array_distributors = {}
|
|
|
for chip_id in range(self.num_gpus):
|
|
|
distributor = ParallelArrayDistributor(
|
|
|
num_sms=self.num_sms_per_gpu,
|
|
|
cores_per_sm=self.cores_per_sm
|
|
|
)
|
|
|
self.array_distributors[chip_id] = distributor
|
|
|
|
|
|
|
|
|
distributor.storage = self.local_storage
|
|
|
distributor.streaming_multiprocessors = self.streaming_multiprocessors[chip_id]
|
|
|
|
|
|
|
|
|
self.warps = {}
|
|
|
for chip_id in range(self.num_gpus):
|
|
|
self.warps[chip_id] = {}
|
|
|
for sm_id in range(self.num_sms_per_gpu):
|
|
|
self.warps[chip_id][sm_id] = []
|
|
|
sm = self.streaming_multiprocessors[chip_id][sm_id]
|
|
|
|
|
|
warps_per_sm = self.cores_per_sm // self.threads_per_warp
|
|
|
for warp_id in range(warps_per_sm):
|
|
|
warp = Warp(warp_id)
|
|
|
self.warps[chip_id][sm_id].append(warp)
|
|
|
|
|
|
|
|
|
sm.schedule_warp(str(warp_id), {
|
|
|
'warp_id': warp_id,
|
|
|
'status': 'initialized',
|
|
|
'priority': 0,
|
|
|
'dependencies': [],
|
|
|
'resource_requirements': {
|
|
|
'tensor_cores': 1,
|
|
|
'shared_memory': 32768
|
|
|
}
|
|
|
})
|
|
|
|
|
|
|
|
|
self.hal = HardwareAbstractionLayer()
|
|
|
self.tensor_core = TensorCore()
|
|
|
self.local_storage = LocalStorage()
|
|
|
self.memory_manager = DuckDBMemoryManager(get_db_url())
|
|
|
self.stream_manager = {}
|
|
|
|
|
|
|
|
|
self.initialized = False
|
|
|
self._sync_primitives = {}
|
|
|
self.memory_pools = {}
|
|
|
self.shared_pools = {}
|
|
|
|
|
|
|
|
|
self.gate_delay = GATE_DELAY
|
|
|
self.max_switch_freq = max_switch_freq
|
|
|
self.logic_gates = {
|
|
|
'nand': NANDGate(),
|
|
|
'and': ANDGate()
|
|
|
}
|
|
|
|
|
|
def allocate_memory(self, size_bytes, chip_id=None, key=None, tensor_shape=None, dtype=None):
|
|
|
"""
|
|
|
Allocate memory with support for tensor operations and local storage across multiple GPUs.
|
|
|
Args:
|
|
|
size_bytes: Size of memory to allocate
|
|
|
chip_id: Target GPU chip (if None, will be automatically selected)
|
|
|
key: Optional persistent storage key
|
|
|
tensor_shape: Shape for tensor allocation
|
|
|
dtype: Data type for tensor allocation
|
|
|
"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
|
|
|
try:
|
|
|
|
|
|
if chip_id is None:
|
|
|
gpu_states = self.multi_gpu_system.system_state["global_memory_state"]["allocation_map"]
|
|
|
|
|
|
available_memory = {gpu: state.get("free_memory", float('inf'))
|
|
|
for gpu, state in gpu_states.items()}
|
|
|
chip_id = max(available_memory.items(), key=lambda x: x[1])[0]
|
|
|
|
|
|
|
|
|
if chip_id >= len(self.streaming_multiprocessors):
|
|
|
raise MemoryError(f"Invalid chip_id: {chip_id}")
|
|
|
|
|
|
address = None
|
|
|
|
|
|
|
|
|
if tensor_shape is not None:
|
|
|
if dtype is None:
|
|
|
dtype = np.float32
|
|
|
|
|
|
|
|
|
array_distributor = self.array_distributors[chip_id]
|
|
|
sm_assignments = array_distributor.get_optimal_sm_assignment(tensor_shape)
|
|
|
|
|
|
|
|
|
address = self.memory_manager.allocate_tensor(
|
|
|
tensor_shape,
|
|
|
str(dtype),
|
|
|
chip_id,
|
|
|
sm_assignments=sm_assignments
|
|
|
)
|
|
|
|
|
|
if key:
|
|
|
self.local_storage.store(key, {
|
|
|
'address': address,
|
|
|
'shape': tensor_shape,
|
|
|
'dtype': str(dtype),
|
|
|
'chip_id': chip_id,
|
|
|
'sm_assignments': sm_assignments
|
|
|
})
|
|
|
|
|
|
|
|
|
self.multi_gpu_system.update_memory_allocation(
|
|
|
chip_id,
|
|
|
size_bytes,
|
|
|
tensor=True,
|
|
|
sm_assignments=sm_assignments
|
|
|
)
|
|
|
|
|
|
|
|
|
elif key is not None:
|
|
|
address = self.memory_manager.allocate_with_key(
|
|
|
size_bytes,
|
|
|
key,
|
|
|
chip_id,
|
|
|
tensor_shape=tensor_shape,
|
|
|
dtype=str(dtype) if dtype else None
|
|
|
)
|
|
|
self.local_storage.store(key, {
|
|
|
'address': address,
|
|
|
'chip_id': chip_id
|
|
|
})
|
|
|
self.multi_gpu_system.update_memory_allocation(chip_id, size_bytes)
|
|
|
|
|
|
|
|
|
else:
|
|
|
address = self.memory_manager.allocate(size_bytes, chip_id)
|
|
|
self.multi_gpu_system.update_memory_allocation(chip_id, size_bytes)
|
|
|
|
|
|
return address
|
|
|
|
|
|
except Exception as e:
|
|
|
if tensor_shape is not None:
|
|
|
raise TensorError(f"Failed to allocate tensor: {str(e)}")
|
|
|
raise MemoryError(f"Failed to allocate memory: {str(e)}")
|
|
|
|
|
|
def list_memory_keys(self):
|
|
|
"""List all string keys for persistent memory blocks"""
|
|
|
if hasattr(self.memory_manager, 'list_keys'):
|
|
|
return self.memory_manager.list_keys()
|
|
|
return []
|
|
|
|
|
|
def _calculate_data_locality(self, sm_id: int, operation_data: Dict[str, Any]) -> float:
|
|
|
"""
|
|
|
Calculate data locality score for an SM based on input data location
|
|
|
Returns score between 0 and 1 (1 = all data is local)
|
|
|
"""
|
|
|
total_data_size = 0
|
|
|
local_data_size = 0
|
|
|
|
|
|
|
|
|
for tensor_info in operation_data["inputs"]:
|
|
|
size = tensor_info.get("size", 0)
|
|
|
total_data_size += size
|
|
|
|
|
|
|
|
|
if self._is_data_local(sm_id, tensor_info["address"]):
|
|
|
local_data_size += size
|
|
|
|
|
|
return local_data_size / total_data_size if total_data_size > 0 else 0.0
|
|
|
|
|
|
def _is_data_local(self, sm_id: int, address: int) -> bool:
|
|
|
"""Check if data at address is local to the given SM"""
|
|
|
sm = self.streaming_multiprocessors[self.active_chip_id][sm_id]
|
|
|
|
|
|
|
|
|
if address in sm.sm_state["l1_cache"]:
|
|
|
return True
|
|
|
|
|
|
|
|
|
if str(address) in sm.sm_state["shared_memory"]:
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
def _select_best_sm(self, operation_data: Dict[str, Any]) -> Tuple[int, int]:
|
|
|
"""
|
|
|
Select best SM for operation based on locality and load
|
|
|
Returns (chip_id, sm_id)
|
|
|
"""
|
|
|
best_score = -1
|
|
|
best_sm = None
|
|
|
best_chip = None
|
|
|
|
|
|
|
|
|
for chip_id in range(self.num_gpus):
|
|
|
for sm_id in range(self.num_sms_per_gpu):
|
|
|
sm = self.streaming_multiprocessors[chip_id][sm_id]
|
|
|
|
|
|
|
|
|
load = len(sm.current_tensor_ops)
|
|
|
|
|
|
|
|
|
locality_score = self._calculate_data_locality(sm_id, operation_data)
|
|
|
|
|
|
|
|
|
load_score = 1.0 / (load + 1)
|
|
|
final_score = (locality_score * DEFAULT_LOCALITY_WEIGHT +
|
|
|
load_score * DEFAULT_LOAD_WEIGHT)
|
|
|
|
|
|
if final_score > best_score:
|
|
|
best_score = final_score
|
|
|
best_sm = sm_id
|
|
|
best_chip = chip_id
|
|
|
|
|
|
return best_chip, best_sm
|
|
|
|
|
|
def schedule_tensor_operation(self, operation_data: Dict[str, Any]) -> Tuple[int, int]:
|
|
|
"""
|
|
|
Schedule a tensor operation on the best SM considering:
|
|
|
- Data locality (70% weight)
|
|
|
- Current SM load (30% weight)
|
|
|
Returns (chip_id, sm_id) of selected SM
|
|
|
"""
|
|
|
|
|
|
if "preferred_sm" in operation_data:
|
|
|
chip_id = operation_data.get("preferred_chip", 0)
|
|
|
sm_id = operation_data["preferred_sm"]
|
|
|
|
|
|
|
|
|
sm = self.streaming_multiprocessors[chip_id][sm_id]
|
|
|
if len(sm.current_tensor_ops) < DEFAULT_MAX_WARPS_PER_SM:
|
|
|
return chip_id, sm_id
|
|
|
|
|
|
|
|
|
return self._select_best_sm(operation_data)
|
|
|
|
|
|
def create_stream(self, priority=0):
|
|
|
"""Create a new execution stream"""
|
|
|
stream_id = len(self.stream_manager)
|
|
|
self.stream_manager[stream_id] = {
|
|
|
'priority': priority,
|
|
|
'pending_ops': [],
|
|
|
'sync_points': set()
|
|
|
}
|
|
|
return stream_id
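    # Usage sketch (op name illustrative; assumes an initialized `driver`):
    #   s = driver.create_stream(priority=1)
    #   driver.execute_tensor_op('matmul', inputs, output, stream_id=s)  # queued on the stream
    #   driver.stream_synchronize(s)                                      # drains pending ops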
|
|
|
|
|
|
def stream_synchronize(self, stream_id):
|
|
|
"""Wait for all operations in a stream to complete"""
|
|
|
if stream_id not in self.stream_manager:
|
|
|
raise StreamError(f"Invalid stream ID: {stream_id}")
|
|
|
stream = self.stream_manager[stream_id]
|
|
|
|
|
|
|
|
|
while stream['pending_ops']:
|
|
|
op = stream['pending_ops'].pop(0)
|
|
|
try:
|
|
|
self._execute_stream_op(op)
|
|
|
except Exception as e:
|
|
|
raise StreamError(f"Stream operation failed: {str(e)}")
|
|
|
|
|
|
|
|
|
stream['sync_points'].clear()
|
|
|
|
|
|
def _execute_stream_op(self, op):
|
|
|
"""Execute a single stream operation"""
|
|
|
op_type = op.get('type')
|
|
|
if op_type == 'tensor':
|
|
|
self._execute_tensor_op(op)
|
|
|
elif op_type == 'memory':
|
|
|
self._execute_memory_op(op)
|
|
|
elif op_type == 'sync':
|
|
|
self._handle_sync_point(op)
|
|
|
else:
|
|
|
raise StreamError(f"Unknown operation type: {op_type}")
|
|
|
|
|
|
def add_stream_sync_point(self, stream_id, sync_point_id):
|
|
|
"""Add a synchronization point to a stream"""
|
|
|
if stream_id not in self.stream_manager:
|
|
|
raise StreamError(f"Invalid stream ID: {stream_id}")
|
|
|
self.stream_manager[stream_id]['sync_points'].add(sync_point_id)
|
|
|
self._sync_primitives[sync_point_id] = False
|
|
|
|
|
|
def execute_tensor_op(self, op_type, inputs, output, stream_id=None):
|
|
|
"""Execute a tensor operation with parallel distribution across multiple GPUs"""
|
|
|
if stream_id is not None and stream_id not in self.stream_manager:
|
|
|
raise StreamError(f"Invalid stream ID: {stream_id}")
|
|
|
|
|
|
try:
|
|
|
|
|
|
input_sizes = {addr: inp.nbytes for addr, inp in inputs.items()}
|
|
|
target_gpus = self._select_optimal_gpus(input_sizes)
|
|
|
|
|
|
|
|
|
distributed_ops = self.parallel_distributor.distribute_operation({
|
|
|
'type': op_type,
|
|
|
'inputs': inputs,
|
|
|
'output': output,
|
|
|
'input_size': sum(input_sizes.values()),
|
|
|
'target_gpus': target_gpus
|
|
|
})
|
|
|
|
|
|
|
|
|
op_id = self._register_cross_gpu_operation(op_type, distributed_ops)
|
|
|
|
|
|
|
|
|
self._setup_memory_coherence(distributed_ops)
|
|
|
|
|
|
|
|
|
start_time = time.time()
|
|
|
expected_compute_time = len(distributed_ops) * self.gate_delay
|
|
|
|
|
|
|
|
|
operations = []
|
|
|
for dist_op in distributed_ops:
|
|
|
gpu_id = dist_op['gpu_id']
|
|
|
sm_id = self._select_optimal_sm(gpu_id, dist_op)
|
|
|
|
|
|
|
|
|
warp = self._get_available_warp(gpu_id, sm_id)
|
|
|
|
|
|
op = {
|
|
|
'type': 'tensor',
|
|
|
'op_type': op_type,
|
|
|
'gpu_id': gpu_id,
|
|
|
'sm_id': sm_id,
|
|
|
'warp_id': warp.warp_id,
|
|
|
'inputs': dist_op['inputs'],
|
|
|
'output_slice': dist_op.get('output_slice'),
|
|
|
'stream_id': stream_id,
|
|
|
'all_inputs': inputs,
|
|
|
'all_output': output
|
|
|
}
|
|
|
|
|
|
|
|
|
if stream_id is not None:
|
|
|
|
|
|
self.stream_manager[stream_id]['pending_ops'].append(op)
|
|
|
return None
|
|
|
else:
|
|
|
|
|
|
return self._execute_tensor_op(op)
|
|
|
|
|
|
except Exception as e:
|
|
|
raise TensorError(f"Tensor operation failed: {str(e)}")
|
|
|
|
|
|
def _execute_tensor_op(self, op):
|
|
|
"""Execute a tensor operation"""
|
|
|
op_type = op['op_type']
|
|
|
        inputs = op['inputs']
        # Ops queued by execute_tensor_op() carry the full output under 'all_output';
        # directly built ops use 'output'. Accept either.
        output = op.get('output', op.get('all_output'))
|
|
|
|
|
|
|
|
|
for addr in inputs:
|
|
|
if not self.memory_manager.is_tensor(addr):
|
|
|
raise TensorError(f"Invalid tensor address: {addr}")
|
|
|
|
|
|
if not self.memory_manager.is_tensor(output):
|
|
|
raise TensorError(f"Invalid output tensor address: {output}")
|
|
|
|
|
|
|
|
|
return self.tensor_core.execute_op(op_type, inputs, output)
|
|
|
|
|
|
def read_memory_by_key(self, key):
|
|
|
"""
|
|
|
Read memory block by string key (if supported).
|
|
|
"""
|
|
|
if hasattr(self.memory_manager, 'read_by_key'):
|
|
|
return self.memory_manager.read_by_key(key)
|
|
|
raise NotImplementedError("Underlying memory manager does not support read_by_key.")
|
|
|
|
|
|
|
|
|
    def layernorm(self, x, gamma, beta, eps=1e-5, chip_id=0, sm_id=0):
        """Layer normalization over the last axis: gamma * (x - mean) / sqrt(var + eps) + beta."""
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)
        x_norm = (x - mean) / np.sqrt(var + eps)
        return gamma * x_norm + beta
|
|
|
|
|
|
    def gelu(self, x, chip_id=0, sm_id=0):
        """GELU activation using the tanh approximation."""
        return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * np.power(x, 3))))
|
|
|
|
|
|
    def softmax(self, x, axis=-1, chip_id=0, sm_id=0):
        """Numerically stable softmax along the given axis (subtracts the max before exponentiating)."""
        x_max = np.max(x, axis=axis, keepdims=True)
        e_x = np.exp(x - x_max)
        return e_x / np.sum(e_x, axis=axis, keepdims=True)
|
|
|
|
|
|
    def matmul(self, A, B, chip_id=0, sm_id=0):
        """Matrix multiplication dispatched through the HAL tensor path on the given chip."""
        return self.hal.v2_tensor_matmul(chip_id, A, B)
|
|
|
|
|
|
|
|
|
def initialize(self, num_chips=8, vram_size_gb=16, num_sms_per_chip=1500,
|
|
|
num_cores_per_sm=128, threads_per_core=700000, threads_per_block=32,
|
|
|
num_tensor_cores_per_sm=8):
|
|
|
"""Initialize the virtual GPU driver with full multi-GPU support"""
|
|
|
print("Initializing Virtual GPU Driver...")
|
|
|
|
|
|
|
|
|
total_threads = num_chips * num_sms_per_chip * num_cores_per_sm * threads_per_core
|
|
|
max_total_threads = self.max_switch_freq * self.gate_delay
|
|
|
if total_threads > max_total_threads:
|
|
|
raise ValueError(
|
|
|
f"Configuration exceeds maximum possible threads at current electron speed.\n"
|
|
|
f"Requested: {total_threads:,} threads\n"
|
|
|
f"Maximum possible: {max_total_threads:,} threads"
|
|
|
)
|
|
|
|
|
|
|
|
|
memory_clock_hz = 2132000000
|
|
|
memory_bus_width = 384
|
|
|
memory_bandwidth_per_gpu = (memory_clock_hz * memory_bus_width) / (8 * 1e9)
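        # 2.132e9 Hz * 384-bit bus / 8 bits-per-byte = ~102.3 GB/s per GPU.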
|
|
|
watts_per_sm = 25
|
|
|
total_power_per_gpu = min(watts_per_sm * num_sms_per_chip, 350)
|
|
|
|
|
|
|
|
|
self.hardware_config = {
|
|
|
|
|
|
'num_chips': num_chips,
|
|
|
'vram_size_gb': vram_size_gb,
|
|
|
'num_sms_per_chip': num_sms_per_chip,
|
|
|
'num_cores_per_sm': num_cores_per_sm,
|
|
|
'threads_per_core': threads_per_core,
|
|
|
'threads_per_block': threads_per_block,
|
|
|
'num_tensor_cores_per_sm': num_tensor_cores_per_sm,
|
|
|
'total_threads': total_threads,
|
|
|
|
|
|
|
|
|
'gate_delay': self.gate_delay,
|
|
|
'max_switch_freq': self.max_switch_freq,
|
|
|
|
|
|
|
|
|
'memory_config': {
|
|
|
'memory_clock_hz': memory_clock_hz,
|
|
|
'memory_bus_width': memory_bus_width,
|
|
|
'bandwidth_gb_per_sec': memory_bandwidth_per_gpu,
|
|
|
'nvlink_bandwidth_gbps': 900,
|
|
|
'l1_cache_size_kb': 128,
|
|
|
'l2_cache_size_mb': 6,
|
|
|
'shared_memory_per_sm_kb': 164
|
|
|
},
|
|
|
|
|
|
|
|
|
'power_config': {
|
|
|
'watts_per_sm': watts_per_sm,
|
|
|
'total_power_limit': total_power_per_gpu,
|
|
|
'thermal_limit_celsius': 85,
|
|
|
'total_system_power': total_power_per_gpu * num_chips
|
|
|
},
|
|
|
|
|
|
|
|
|
'compute_capability': {
|
|
|
'major': 9,
|
|
|
'minor': 0,
|
|
|
'features': {
|
|
|
'tensor_cores': True,
|
|
|
'ray_tracing_cores': True,
|
|
|
'optical_flow': True,
|
|
|
'concurrent_kernels': 128,
|
|
|
'max_shared_memory_per_sm': 164 * 1024,
|
|
|
'max_registers_per_thread': 255,
|
|
|
'max_warps_per_sm': 64
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
self.hal.initialize_hardware(
|
|
|
num_chips=num_chips,
|
|
|
vram_size_gb=vram_size_gb,
|
|
|
num_sms_per_chip=num_sms_per_chip,
|
|
|
num_cores_per_sm=num_cores_per_sm,
|
|
|
threads_per_core=threads_per_core,
|
|
|
threads_per_block=threads_per_block,
|
|
|
num_tensor_cores_per_sm=num_tensor_cores_per_sm,
|
|
|
memory_config=self.hardware_config['memory_config'],
|
|
|
power_config=self.hardware_config['power_config'],
|
|
|
compute_capability=self.hardware_config['compute_capability']
|
|
|
)
|
|
|
|
|
|
|
|
|
for chip_id in range(num_chips):
|
|
|
|
|
|
for sm_id in range(num_sms_per_chip):
|
|
|
sm = self.streaming_multiprocessors[chip_id][sm_id]
|
|
|
sm.initialize(
|
|
|
threads_per_core=threads_per_core,
|
|
|
threads_per_block=threads_per_block,
|
|
|
num_tensor_cores=num_tensor_cores_per_sm
|
|
|
)
|
|
|
|
|
|
|
|
|
self.array_distributors[chip_id].initialize(
|
|
|
threads_per_core=threads_per_core,
|
|
|
gate_delay=self.gate_delay
|
|
|
)
|
|
|
|
|
|
|
|
|
self.parallel_distributor.initialize(
|
|
|
hardware_config=self.hardware_config,
|
|
|
nvlink_topology=self.multi_gpu_system.system_state["nvlink_state"]["connections"]
|
|
|
)
|
|
|
|
|
|
|
|
|
for chip_id in range(num_chips):
|
|
|
pool_id = f"gpu_{chip_id}_global"
|
|
|
self.memory_pools[pool_id] = MemoryPool(
|
|
|
size_bytes=vram_size_gb * 1024 * 1024 * 1024,
|
|
|
chip_id=chip_id
|
|
|
)
|
|
|
|
|
|
|
|
|
for i in range(num_chips):
|
|
|
for j in range(i + 1, num_chips):
|
|
|
pool_id = f"shared_pool_{i}_{j}"
|
|
|
self.shared_pools[pool_id] = SharedMemoryPool(
|
|
|
size_bytes=1 * 1024 * 1024 * 1024,
|
|
|
gpu_a=i,
|
|
|
gpu_b=j
|
|
|
)
|
|
|
|
|
|
self.initialized = True
|
|
|
print(f"Virtual GPU Driver initialized with:")
|
|
|
print(f"- {num_chips} GPUs")
|
|
|
print(f"- {num_sms_per_chip} SMs per GPU")
|
|
|
print(f"- {vram_size_gb}GB VRAM per GPU")
|
|
|
print(f"- {self.hardware_config['total_threads']:,} total threads")
|
|
|
print(f"- {len(self.shared_pools)} shared memory pools")
|
|
|
print(f"- Gate delay: {self.gate_delay:.2e} seconds")
|
|
|
print(f"- Max switching frequency: {self.max_switch_freq:.2e} Hz")
|
|
|
|
|
|
    def shutdown(self):
        """Shut down simulated hardware, persist multi-GPU state, and mark the driver uninitialized."""
        print("Shutting down Virtual GPU Driver...")
|
|
|
self.hal.shutdown_hardware()
|
|
|
|
|
|
|
|
|
for chip_id in range(len(self.streaming_multiprocessors)):
|
|
|
for sm_id in self.streaming_multiprocessors[chip_id]:
|
|
|
self.streaming_multiprocessors[chip_id][sm_id].shutdown()
|
|
|
|
|
|
self.multi_gpu_system.store_system_state()
|
|
|
self.initialized = False
|
|
|
print("Virtual GPU Driver shut down.")
|
|
|
|
|
|
def _select_optimal_sm(self, gpu_id: int, operation: Dict[str, Any]) -> int:
|
|
|
"""Select the optimal SM for an operation based on load and locality"""
|
|
|
sms = self.streaming_multiprocessors[gpu_id]
|
|
|
|
|
|
|
|
|
sm_loads = {
|
|
|
sm_id: len(sm.current_tensor_ops)
|
|
|
for sm_id, sm in sms.items()
|
|
|
}
|
|
|
|
|
|
|
|
|
        sm_locality_scores = {
            sm_id: self._calculate_sm_data_locality(sm, operation)
            for sm_id, sm in sms.items()
        }
|
|
|
|
|
|
|
|
|
sm_scores = {
|
|
|
sm_id: (sm_locality_scores[sm_id] * DEFAULT_LOCALITY_WEIGHT +
|
|
|
(1.0 / (load + 1)) * DEFAULT_LOAD_WEIGHT)
|
|
|
for sm_id, load in sm_loads.items()
|
|
|
}
|
|
|
|
|
|
return max(sm_scores.items(), key=lambda x: x[1])[0]
|
|
|
|
|
|
    def _calculate_sm_data_locality(self, sm: StreamingMultiprocessor, operation: Dict[str, Any]) -> float:
        """Calculate a data locality score for an SM object; distinct from the sm_id-based _calculate_data_locality earlier in the class."""
|
|
|
locality_score = 0.0
|
|
|
|
|
|
|
|
|
for input_data in operation['inputs'].values():
|
|
|
if hasattr(input_data, 'address'):
|
|
|
if sm.has_data_in_local_memory(input_data.address):
|
|
|
locality_score += 1.0
|
|
|
|
|
|
return locality_score
|
|
|
|
|
|
def _get_available_warp(self, gpu_id: int, sm_id: int) -> Warp:
|
|
|
"""Get an available warp from the specified SM"""
|
|
|
warps = self.warps[gpu_id][sm_id]
|
|
|
|
|
|
|
|
|
warp_loads = [len(warp.get_active_threads()) for warp in warps]
|
|
|
min_load_idx = warp_loads.index(min(warp_loads))
|
|
|
|
|
|
return warps[min_load_idx]
|
|
|
|
|
|
def allocate_memory(self, size_bytes, chip_id=0):
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
return self.memory_manager.allocate(size_bytes, chip_id)
|
|
|
|
|
|
|
|
|
|
|
|
def batch_allocate_memory(self, allocations: list[tuple[int, str]], chip_id=0):
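        """Allocate multiple (size_bytes, key) blocks at once, using the manager's batch path when available."""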
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
if hasattr(self.memory_manager, 'batch_allocate_with_keys'):
|
|
|
return self.memory_manager.batch_allocate_with_keys(allocations, chip_id)
|
|
|
else:
|
|
|
|
|
|
addresses = []
|
|
|
for size_bytes, key in allocations:
|
|
|
addresses.append(self.memory_manager.allocate_with_key(size_bytes, key, chip_id))
|
|
|
return addresses
|
|
|
|
|
|
def free_memory(self, address, chip_id=0):
|
|
|
"""Free allocated memory"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
try:
|
|
|
|
|
|
tensor_info = self.memory_manager.get_tensor_info(address)
|
|
|
if tensor_info:
|
|
|
|
|
|
key = f"tensor_{address}"
|
|
|
if self.local_storage.exists(key):
|
|
|
self.local_storage.delete(key)
|
|
|
self.memory_manager.free(address, chip_id)
|
|
|
return
|
|
|
|
|
|
|
|
|
self.memory_manager.free(address, chip_id)
|
|
|
|
|
|
|
|
|
key = f"mem_{address}"
|
|
|
if self.local_storage.exists(key):
|
|
|
self.local_storage.delete(key)
|
|
|
|
|
|
except Exception as e:
|
|
|
raise MemoryError(f"Failed to free memory at {address}: {str(e)}")
|
|
|
|
|
|
def write_memory(self, address, data, chip_id=0):
|
|
|
"""Write data to allocated memory"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
try:
|
|
|
|
|
|
tensor_info = self.memory_manager.get_tensor_info(address)
|
|
|
if tensor_info:
|
|
|
|
|
|
data_arr = np.asarray(data)
|
|
|
if data_arr.shape != tensor_info['shape']:
|
|
|
raise ValueError(f"Data shape {data_arr.shape} does not match tensor shape {tensor_info['shape']}")
|
|
|
self.memory_manager.write_data(address, data_arr.tobytes(), chip_id)
|
|
|
return
|
|
|
|
|
|
|
|
|
if isinstance(data, (bytes, bytearray)):
|
|
|
self.memory_manager.write_data(address, data, chip_id)
|
|
|
else:
|
|
|
self.memory_manager.write_data(address, bytes(data), chip_id)
|
|
|
|
|
|
except Exception as e:
|
|
|
raise MemoryError(f"Failed to write memory at {address}: {str(e)}")
|
|
|
|
|
|
def read_memory(self, address, size_bytes, chip_id=0):
|
|
|
"""Read data from allocated memory"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
try:
|
|
|
|
|
|
tensor_info = self.memory_manager.get_tensor_info(address)
|
|
|
if tensor_info:
|
|
|
data = self.memory_manager.read_data(address, size_bytes, chip_id)
|
|
|
return np.frombuffer(data, dtype=tensor_info['dtype']).reshape(tensor_info['shape'])
|
|
|
|
|
|
|
|
|
return self.memory_manager.read_data(address, size_bytes, chip_id)
|
|
|
|
|
|
except Exception as e:
|
|
|
raise MemoryError(f"Failed to read memory at {address}: {str(e)}")
|
|
|
|
|
|
def add_command(self, command_type, **kwargs):
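        """Queue a command on the command processor; run queued commands with submit_commands()."""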
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
self.command_processor.add_command(command_type, **kwargs)
|
|
|
|
|
|
def submit_commands(self, chip_id=0):
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
return self.command_processor.submit_commands(chip_id)
|
|
|
|
|
|
def clear_commands(self):
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
self.command_processor.clear_commands()
|
|
|
|
|
|
def create_memory_pool(self, size_bytes: int, shared: bool = False) -> str:
|
|
|
"""Create a new memory pool"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
|
|
|
try:
|
|
|
pool_id = f"pool_{len(self.memory_pools)}"
|
|
|
|
|
|
if shared:
|
|
|
pool = SharedMemoryPool(size_bytes)
|
|
|
self.shared_pools[pool_id] = pool
|
|
|
else:
|
|
|
pool = MemoryPool(size_bytes)
|
|
|
self.memory_pools[pool_id] = pool
|
|
|
|
|
|
return pool_id
|
|
|
|
|
|
except Exception as e:
|
|
|
raise MemoryError(f"Failed to create memory pool: {str(e)}")
|
|
|
|
|
|
def allocate_from_pool(self, pool_id: str, size_bytes: int) -> int:
|
|
|
"""Allocate memory from a specific pool"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
|
|
|
try:
|
|
|
if pool_id in self.shared_pools:
|
|
|
address = self.shared_pools[pool_id].atomic_allocate(size_bytes)
|
|
|
elif pool_id in self.memory_pools:
|
|
|
address = self.memory_pools[pool_id].allocate(size_bytes)
|
|
|
else:
|
|
|
raise ValueError(f"Invalid pool ID: {pool_id}")
|
|
|
|
|
|
if address is None:
|
|
|
raise MemoryError(f"Failed to allocate {size_bytes} bytes from pool {pool_id}")
|
|
|
|
|
|
return address
|
|
|
|
|
|
except Exception as e:
|
|
|
raise MemoryError(f"Failed to allocate from pool: {str(e)}")
|
|
|
|
|
|
def free_pool_memory(self, pool_id: str, address: int):
|
|
|
"""Free memory allocated from a pool"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
|
|
|
try:
|
|
|
if pool_id in self.shared_pools:
|
|
|
self.shared_pools[pool_id].atomic_free(address)
|
|
|
elif pool_id in self.memory_pools:
|
|
|
self.memory_pools[pool_id].free(address)
|
|
|
else:
|
|
|
raise ValueError(f"Invalid pool ID: {pool_id}")
|
|
|
|
|
|
except Exception as e:
|
|
|
raise MemoryError(f"Failed to free pool memory: {str(e)}")
|
|
|
|
|
|
def get_pool_stats(self, pool_id: str) -> dict:
|
|
|
"""Get statistics about a memory pool"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
|
|
|
try:
|
|
|
if pool_id in self.shared_pools:
|
|
|
pool = self.shared_pools[pool_id]
|
|
|
elif pool_id in self.memory_pools:
|
|
|
pool = self.memory_pools[pool_id]
|
|
|
else:
|
|
|
raise ValueError(f"Invalid pool ID: {pool_id}")
|
|
|
|
|
|
return {
|
|
|
'fragmentation': pool.get_fragmentation(),
|
|
|
'total_size': pool.total_size,
|
|
|
'is_shared': pool_id in self.shared_pools,
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
raise MemoryError(f"Failed to get pool stats: {str(e)}")
|
|
|
|
|
|
def delete_pool(self, pool_id: str):
|
|
|
"""Delete a memory pool"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized.")
|
|
|
|
|
|
try:
|
|
|
if pool_id in self.shared_pools:
|
|
|
del self.shared_pools[pool_id]
|
|
|
elif pool_id in self.memory_pools:
|
|
|
del self.memory_pools[pool_id]
|
|
|
else:
|
|
|
raise ValueError(f"Invalid pool ID: {pool_id}")
|
|
|
|
|
|
except Exception as e:
|
|
|
raise MemoryError(f"Failed to delete pool: {str(e)}")
|
|
|
|
|
|
def execute_kernel(self, chip_id, sm_id, core_id, thread_block_config, kernel_func, *args, **kwargs):
|
|
|
"""
|
|
|
Execute a kernel function across multiple thread blocks.
|
|
|
thread_block_config: {
|
|
|
'block_dim': (x, y, z), # threads per block
|
|
|
'grid_dim': (x, y, z), # blocks per grid
|
|
|
'shared_memory_size': int # bytes per block
|
|
|
}
|
|
|
"""
|
|
|
if not self.initialized:
|
|
|
raise RuntimeError("Driver not initialized")
|
|
|
|
|
|
blocks_per_grid = (
|
|
|
thread_block_config['grid_dim'][0] *
|
|
|
thread_block_config['grid_dim'][1] *
|
|
|
thread_block_config['grid_dim'][2]
|
|
|
)
|
|
|
|
|
|
threads_per_block = (
|
|
|
thread_block_config['block_dim'][0] *
|
|
|
thread_block_config['block_dim'][1] *
|
|
|
thread_block_config['block_dim'][2]
|
|
|
)
|
|
|
|
|
|
total_threads = blocks_per_grid * threads_per_block
|
|
|
|
|
|
if total_threads > self.hardware_config['threads_per_core']:
|
|
|
raise ValueError(f"Total threads {total_threads} exceeds maximum threads per core {self.hardware_config['threads_per_core']}")
|
|
|
|
|
|
self.add_command(
|
|
|
"execute_kernel",
|
|
|
chip_id=chip_id,
|
|
|
sm_id=sm_id,
|
|
|
core_id=core_id,
|
|
|
thread_block_config=thread_block_config,
|
|
|
kernel_func=kernel_func,
|
|
|
args=args,
|
|
|
kwargs=kwargs
|
|
|
)
|
|
|
print(f"Added kernel execution command for Chip {chip_id}, SM {sm_id}, Core {core_id} "
|
|
|
f"with {blocks_per_grid} blocks × {threads_per_block} threads = {total_threads} total threads")
|
|
|
|
|
|
def matmul(self, chip_id, sm_id, A, B):
|
|
|
self.add_command("matmul", chip_id=chip_id, sm_id=sm_id, A=A, B=B)
|
|
|
print(f"Added matmul command for Chip {chip_id}, SM {sm_id}.")
|
|
|
|
|
|
def block_barrier(self, chip_id, sm_id, core_id, block_id):
|
|
|
"""Synchronize all threads within a block"""
|
|
|
self.add_command(
|
|
|
"block_barrier",
|
|
|
chip_id=chip_id,
|
|
|
sm_id=sm_id,
|
|
|
core_id=core_id,
|
|
|
block_id=block_id
|
|
|
)
|
|
|
|
|
|
def core_barrier(self, chip_id, sm_id, core_id):
|
|
|
"""Synchronize all threads within a core"""
|
|
|
self.add_command(
|
|
|
"core_barrier",
|
|
|
chip_id=chip_id,
|
|
|
sm_id=sm_id,
|
|
|
core_id=core_id
|
|
|
)
|
|
|
|
|
|
def global_barrier(self, chip_id=0):
|
|
|
self.add_command("global_barrier", chip_id=chip_id)
|
|
|
print(f"Added global barrier command for Chip {chip_id}.")
|
|
|
|
|
|
def shared_memory_barrier(self, chip_id, sm_id):
|
|
|
self.add_command("shared_memory_barrier", chip_id=chip_id, sm_id=sm_id)
|
|
|
print(f"Added shared memory barrier command for Chip {chip_id}, SM {sm_id}.")
|
|
|
|
|
|
def atomic_operation(self, chip_id, sm_id, address, operation, value):
|
|
|
self.add_command("atomic_operation", chip_id=chip_id, sm_id=sm_id, address=address, operation=operation, value=value)
|
|
|
print(f"Added atomic operation command for Chip {chip_id}, SM {sm_id}.")
|
|
|
|
|
|
|
|
|
def create_buffer(self, size_bytes, buffer_type="vertex"):
|
|
|
return self.graphics_api.create_buffer(size_bytes, buffer_type)
|
|
|
|
|
|
def delete_buffer(self, buffer_id):
|
|
|
self.graphics_api.delete_buffer(buffer_id)
|
|
|
|
|
|
def buffer_data(self, buffer_id, data):
|
|
|
self.graphics_api.buffer_data(buffer_id, data)
|
|
|
|
|
|
def draw_arrays(self, mode, first, count):
|
|
|
self.graphics_api.draw_arrays(mode, first, count)
|
|
|
|
|
|
def draw_indexed(self, mode, count, index_buffer_id, index_offset=0):
|
|
|
self.graphics_api.draw_indexed(mode, count, index_buffer_id, index_offset)
|
|
|
|
|
|
def compile_shader(self, shader_source, shader_type="vertex"):
|
|
|
return self.graphics_api.compile_shader(shader_source, shader_type)
|
|
|
|
|
|
def use_program(self, program_id):
|
|
|
self.graphics_api.use_program(program_id)
|
|
|
|
|
|
def create_framebuffer(self, width, height):
|
|
|
return self.graphics_api.create_framebuffer(width, height)
|
|
|
|
|
|
def bind_framebuffer(self, framebuffer_id):
|
|
|
self.graphics_api.bind_framebuffer(framebuffer_id)
|
|
|
|
|
|
def clear_color(self, r, g, b, a):
|
|
|
self.graphics_api.clear_color(r, g, b, a)
|
|
|
|
|
|
def clear_depth(self, depth):
|
|
|
self.graphics_api.clear_depth(depth)
|
|
|
|
|
|
|
|
|
|