"""
Handles parallel distribution of operations across multiple GPUs at electron speed.

Implements advanced workload distribution strategies with NVLink topology awareness.
"""

import numpy as np
from typing import Dict, Any, List, Optional, Tuple
import time
import json
import logging

from http_storage import LocalStorage
from config import get_db_url
from electron_speed import max_switch_freq, GATE_DELAY
from logic_gates import LogicGate
from virtual_vram import VirtualVRAM
from cross_gpu_stream import CrossGPUStreamManager

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class GPUParallelDistributor:
    def __init__(self, num_gpus: int = 8):
        self.num_gpus = num_gpus
        self.storage = LocalStorage(db_url=get_db_url())
        self.initialized = False
        self.hardware_config = None
        self.nvlink_topology = None

        self.stream_manager = CrossGPUStreamManager(storage_url=get_db_url())

        # Per-GPU tracking state
        self.operation_history = {}
        self.gpu_load_history = {i: [] for i in range(num_gpus)}
        self.bandwidth_usage = {i: 0 for i in range(num_gpus)}

        # Scheduling parameters
        self.load_threshold = 0.8
        self.min_chunk_size = 3024
        self.max_concurrent_kernels = 128

    def initialize(self, hardware_config: Dict[str, Any], nvlink_topology: Dict[str, Any]):
        """Initialize the distributor with hardware configuration and NVLink topology"""
        self.hardware_config = hardware_config
        self.nvlink_topology = nvlink_topology

        # Peak FLOPS: all cores across all chips, two floating-point operations
        # (one FMA) per core per cycle, at the maximum switching frequency.
        self.peak_flops = (
            hardware_config['num_chips'] *
            hardware_config['num_sms_per_chip'] *
            hardware_config['num_cores_per_sm'] *
            2
        ) * hardware_config['max_switch_freq']

        vram_sizes = hardware_config.get('per_gpu_vram_sizes', {})
        default_vram_size = hardware_config.get('vram_size_gb', None)

        self.vram = {}
        self.sm_capacity = {}

        for i in range(self.num_gpus):
            # Use the per-GPU VRAM size when one is configured, else the default
            vram_size = vram_sizes.get(i, default_vram_size)
            self.vram[i] = VirtualVRAM(size_gb=vram_size, storage=self.storage)
            self.sm_capacity[i] = hardware_config['num_sms_per_chip']

        self.hal = self.storage.get_hal_connection()

        self.initialized = True
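
    # Worked example for the peak_flops formula above (illustrative numbers
    # only): with num_chips=8, num_sms_per_chip=128, num_cores_per_sm=64 and
    # max_switch_freq=1e9 Hz, peak_flops = 8 * 128 * 64 * 2 * 1e9
    # ~ 1.31e14 FLOP/s.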

    def _calculate_nvlink_score(self, gpu_id: int) -> float:
        """Calculate NVLink connectivity score for a GPU"""
        total_bandwidth = 0
        used_bandwidth = 0

        links = self.hal.execute("""
            SELECT bandwidth_tbps, state_json
            FROM optical_interconnects
            WHERE chip_a_id = ? OR chip_b_id = ?
        """, (gpu_id, gpu_id)).fetchall()

        for bandwidth_tbps, state_json in links:
            state = json.loads(state_json)
            total_bandwidth += bandwidth_tbps * 1000  # Tbps -> Gbps
            used_bandwidth += state['current_bandwidth_usage']

        return (1.0 - used_bandwidth / total_bandwidth) if total_bandwidth > 0 else 0.0

    def _select_optimal_gpus(self, input_sizes: Dict[int, int]) -> List[int]:
        """Select optimal GPUs based on data locality and load"""
        gpu_scores = {}

        gpu_states = self.hal.execute("""
            SELECT chip_id, state_json
            FROM gpu_chips
            WHERE chip_id < ?
        """, (self.num_gpus,)).fetchall()

        for chip_id, state_json in gpu_states:
            state = json.loads(state_json)

            load_score = 1.0 - state.get('current_utilization', 0.0)
            locality_score = self._calculate_memory_locality(chip_id, input_sizes)
            nvlink_score = self._calculate_nvlink_score(chip_id)

            # Weighted blend: load and locality dominate, connectivity breaks ties
            gpu_scores[chip_id] = (
                0.4 * load_score +
                0.4 * locality_score +
                0.2 * nvlink_score
            )

        # Size the GPU set at roughly one GPU per GiB of input data
        total_size = sum(input_sizes.values())
        num_gpus_needed = max(1, total_size // (1024 * 1024 * 1024))

        sorted_gpus = sorted(gpu_scores.items(), key=lambda x: x[1], reverse=True)
        return [gpu_id for gpu_id, _ in sorted_gpus[:num_gpus_needed]]
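
    # Example (illustrative): a chip at 50% utilization (load_score 0.5) with
    # locality 0.8 and NVLink headroom 0.6 scores
    # 0.4 * 0.5 + 0.4 * 0.8 + 0.2 * 0.6 = 0.64.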

    def _register_cross_gpu_operation(self, op_type: str, distributed_ops: List[Dict[str, Any]]) -> int:
        """Register a cross-GPU operation in HAL database"""
        self.hal.execute("""
            INSERT INTO cross_gpu_operations (
                operation_type, source_chip, target_chip, nvlink_path, start_time, state_json
            ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, ?)
        """, (
            op_type,
            distributed_ops[0]['gpu_id'],
            distributed_ops[-1]['gpu_id'],
            json.dumps([op['nvlink_paths'] for op in distributed_ops]),
            json.dumps({
                'status': 'started',
                'num_chunks': len(distributed_ops),
                'completion': 0.0
            })
        ))

        return self.hal.execute("SELECT last_insert_rowid()").fetchone()[0]

    def _setup_memory_coherence(self, distributed_ops: List[Dict[str, Any]]):
        """Setup memory coherence tracking for cross-GPU operation"""
        for op in distributed_ops:
            gpu_id = op['gpu_id']

            # Inputs: bump the version counter, mark the copy clean
            for addr in op['inputs'].values():
                self.hal.execute("""
                    INSERT OR REPLACE INTO memory_coherence (
                        address, chip_id, version, last_modified, dirty
                    ) VALUES (?, ?,
                        COALESCE((SELECT version + 1 FROM memory_coherence
                                  WHERE address = ? AND chip_id = ?), 1),
                        CURRENT_TIMESTAMP, FALSE)
                """, (addr, gpu_id, addr, gpu_id))

            # Outputs: fresh entry, marked dirty until written back
            if 'output' in op:
                self.hal.execute("""
                    INSERT OR REPLACE INTO memory_coherence (
                        address, chip_id, version, last_modified, dirty
                    ) VALUES (?, ?, 1, CURRENT_TIMESTAMP, TRUE)
                """, (op['output'], gpu_id))

        self.hal.commit()

    def _calculate_memory_locality(self, chip_id: int, input_sizes: Dict[int, int]) -> float:
        """Calculate memory locality score based on data presence"""
        total_size = sum(input_sizes.values())
        if total_size == 0:
            return 1.0

        local_data = 0
        for addr, size in input_sizes.items():
            result = self.hal.execute("""
                SELECT COUNT(*)
                FROM memory_coherence
                WHERE address = ? AND chip_id = ? AND dirty = FALSE
            """, (addr, chip_id)).fetchone()

            if result[0] > 0:
                local_data += size

        return local_data / total_size
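
    # Example (illustrative): if 0.4 GB of a 1.0 GB working set has a clean
    # local copy on the chip, the locality score is 0.4 / 1.0 = 0.4.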

    def distribute_operation(self, operation: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Distribute operation across GPUs using advanced scheduling strategies:
        - Load balancing across GPUs
        - NVLink topology awareness
        - Memory locality optimization
        - Dynamic chunk sizing
        - Operation type specific optimizations
        """
        if not self.initialized:
            raise RuntimeError("GPUParallelDistributor not initialized")

        op_type = operation.get("type", "")
        input_size = operation.get("input_size", 0)

        self._update_load_history()

        target_gpus = self._select_target_gpus(operation)
        chunk_sizes = self._calculate_chunk_sizes(operation, target_gpus)
        strategy = self._get_distribution_strategy(op_type, input_size)

        # Dispatch to the operation-specific distribution strategy
        if op_type == "matmul":
            distributed_ops = self._distribute_matmul(operation, target_gpus, chunk_sizes)
        elif op_type == "conv":
            distributed_ops = self._distribute_conv(operation, target_gpus, chunk_sizes)
        elif op_type == "tensor":
            distributed_ops = self._distribute_tensor(operation, target_gpus, chunk_sizes)
        elif op_type == "reduction":
            distributed_ops = self._distribute_reduction(operation, target_gpus, chunk_sizes)
        elif op_type == "transformer":
            distributed_ops = self._distribute_transformer(operation, target_gpus, chunk_sizes)
        else:
            distributed_ops = self._distribute_generic(operation, target_gpus, chunk_sizes)

        stream = self.stream_manager.create_stream()

        for op in distributed_ops:
            op['metadata'] = {
                'estimated_flops': self._estimate_flops(op),
                'estimated_memory': self._estimate_memory(op),
                'estimated_time': self._estimate_execution_time(op)
            }

            self.stream_manager.add_cross_gpu_operation(stream.stream_id, {
                'type': 'compute',
                'gpu_id': op['gpu_id'],
                'operation': op
            })

            # Queue the NVLink transfers this chunk depends on
            if 'dependencies' in op:
                for dep in op['dependencies']:
                    self.stream_manager.add_cross_gpu_operation(stream.stream_id, {
                        'type': 'transfer',
                        'source_gpu': dep['gpu_id'],
                        'target_gpu': op['gpu_id'],
                        'size': dep['size']
                    })

            if op.get('sync_gpus'):
                self.stream_manager.add_cross_gpu_operation(stream.stream_id, {
                    'type': 'sync',
                    'gpu_ids': op['sync_gpus']
                })

        op_id = len(self.operation_history)
        self.operation_history[op_id] = {
            'type': op_type,
            'size': input_size,
            'distribution': distributed_ops,
            'timestamp': time.time()
        }

        return distributed_ops

    def _distribute_matmul(self, operation: Dict[str, Any], target_gpus: List[int],
                           chunk_sizes: Dict[int, int]) -> List[Dict[str, Any]]:
        """
        Distribute matrix multiplication across GPUs using advanced strategies:
        - 2D decomposition for large matrices
        - Pipeline stages for multi-GPU execution
        - Memory locality optimization
        - NVLink path optimization
        """
        matrix_a = operation["inputs"]["A"]
        matrix_b = operation["inputs"]["B"]
        rows_a, cols_a = matrix_a.shape
        rows_b, cols_b = matrix_b.shape
        if cols_a != rows_b:
            raise ValueError(f"Incompatible matmul shapes: {matrix_a.shape} x {matrix_b.shape}")

        # Large matrices benefit from 2D decomposition; fall back to 1D otherwise
        if rows_a >= 8192 and cols_b >= 8192:
            return self._distribute_matmul_2d(matrix_a, matrix_b, target_gpus, chunk_sizes)
        else:
            return self._distribute_matmul_1d(matrix_a, matrix_b, target_gpus, chunk_sizes)

    def _distribute_matmul_2d(self, matrix_a: np.ndarray, matrix_b: np.ndarray,
                              target_gpus: List[int], chunk_sizes: Dict[int, int]) -> List[Dict[str, Any]]:
        """Implement 2D matrix decomposition across GPUs"""
        rows_a, cols_a = matrix_a.shape
        rows_b, cols_b = matrix_b.shape

        # Arrange the GPUs in the largest square grid they can fill
        grid_dim = max(1, int(np.sqrt(len(target_gpus))))
        row_chunks = rows_a // grid_dim
        col_chunks = cols_b // grid_dim

        distributed_ops = []

        # Only the GPUs that fit the square grid participate
        for i, gpu_id in enumerate(target_gpus[:grid_dim * grid_dim]):
            grid_row = i // grid_dim
            grid_col = i % grid_dim

            # The last grid row/column absorbs any remainder
            row_start = grid_row * row_chunks
            row_end = row_start + row_chunks if grid_row < grid_dim - 1 else rows_a
            col_start = grid_col * col_chunks
            col_end = col_start + col_chunks if grid_col < grid_dim - 1 else cols_b

            chunk_op = {
                "type": "matmul_2d",
                "gpu_id": gpu_id,
                "grid_position": (grid_row, grid_col),
                "inputs": {
                    "A": matrix_a[row_start:row_end, :],
                    "B": matrix_b[:, col_start:col_end]
                },
                "output_shape": (row_end - row_start, col_end - col_start),
                "communication": {
                    "row_gpus": target_gpus[grid_row * grid_dim:(grid_row + 1) * grid_dim],
                    "col_gpus": target_gpus[grid_col::grid_dim]
                }
            }

            chunk_op["nvlink_paths"] = self._get_optimal_nvlink_paths(gpu_id, chunk_op["communication"])

            distributed_ops.append(chunk_op)

        return distributed_ops
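
    # Example (illustrative): with 4 target GPUs and two 16384x16384 matrices,
    # grid_dim = 2, so GPU 0 computes A[:8192, :] @ B[:, :8192], GPU 1 computes
    # A[:8192, :] @ B[:, 8192:], GPU 2 A[8192:, :] @ B[:, :8192], and GPU 3
    # A[8192:, :] @ B[:, 8192:].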

    def _select_target_gpus(self, operation: Dict[str, Any]) -> List[int]:
        """Select optimal GPUs based on load, memory, and NVLink topology"""
        gpu_scores = {}

        for gpu_id in range(self.num_gpus):
            # Mean utilization over the last 10 samples
            load_score = np.mean(self.gpu_load_history[gpu_id][-10:]) if self.gpu_load_history[gpu_id] else 0

            # Remaining memory-bandwidth headroom, clamped to [0, 1]
            bandwidth_score = max(0.0, 1.0 - (self.bandwidth_usage[gpu_id] / self.hardware_config['memory_config']['bandwidth_gb_per_sec']))

            nvlink_score = self._calculate_nvlink_score(gpu_id)

            gpu_scores[gpu_id] = 0.4 * (1.0 - load_score) + 0.3 * bandwidth_score + 0.3 * nvlink_score

        sorted_gpus = sorted(gpu_scores.items(), key=lambda x: x[1], reverse=True)
        num_gpus_needed = self._estimate_gpus_needed(operation)

        return [gpu_id for gpu_id, _ in sorted_gpus[:num_gpus_needed]]

    def _estimate_gpus_needed(self, operation: Dict[str, Any]) -> int:
        """Estimate number of GPUs needed based on operation size and type"""
        op_type = operation.get("type", "")
        input_size = operation.get("input_size", 0)

        if op_type == "matmul":
            # 2*M*K*N floating-point operations, sized against per-GPU peak
            matrix_a = operation["inputs"]["A"]
            matrix_b = operation["inputs"]["B"]
            flops = 2 * matrix_a.shape[0] * matrix_a.shape[1] * matrix_b.shape[1]
            return min(self.num_gpus, max(1, flops // (self.peak_flops // self.num_gpus)))

        elif op_type == "conv":
            # Roughly one GPU per 32 batch elements
            input_tensor = operation["inputs"]["tensor"]
            batch_size = input_tensor.shape[0]
            return min(self.num_gpus, max(1, batch_size // 32))

        else:
            # Roughly one GPU per GiB of input data
            return min(self.num_gpus, max(1, input_size // (1024 * 1024 * 1024)))

    def _calculate_chunk_sizes(self, operation: Dict[str, Any], target_gpus: List[int]) -> Dict[int, int]:
        """Calculate optimal chunk sizes for each GPU based on their capabilities"""
        op_type = operation.get("type", "")
        total_size = operation.get("input_size", 0)

        chunk_sizes = {}
        total_compute_power = sum(self.sm_capacity[gpu_id] for gpu_id in target_gpus)

        for gpu_id in target_gpus:
            gpu_power = self.sm_capacity[gpu_id]
            load_factor = (1.0 - np.mean(self.gpu_load_history[gpu_id][-10:])) if self.gpu_load_history[gpu_id] else 1.0

            proportion = (gpu_power * load_factor) / total_compute_power
            chunk_sizes[gpu_id] = max(self.min_chunk_size, int(total_size * proportion))

        return chunk_sizes
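
    # Example (illustrative): two target GPUs with equal SM capacity, one idle
    # (load_factor 1.0) and one half loaded (load_factor 0.5), get proportions
    # 0.5 and 0.25 of a 3 GB input, i.e. chunks of 1.5 GB and 0.75 GB.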

    def _update_load_history(self):
        """Update GPU load history with current utilization"""
        for gpu_id in range(self.num_gpus):
            current_load = len([op for op in self.operation_history.values()
                                if any(sub_op['gpu_id'] == gpu_id for sub_op in op['distribution'])])
            self.gpu_load_history[gpu_id].append(current_load / self.max_concurrent_kernels)

            # Keep only the most recent 100 samples
            if len(self.gpu_load_history[gpu_id]) > 100:
                self.gpu_load_history[gpu_id] = self.gpu_load_history[gpu_id][-100:]

    def _distribute_conv(self, operation: Dict[str, Any], target_gpus: List[int],
                         chunk_sizes: Dict[int, int]) -> List[Dict[str, Any]]:
        """
        Distribute convolution operation across GPUs using database storage
        for operation tracking and tensor data management.
        """
        input_tensor = operation["inputs"]["tensor"]
        kernel = operation["inputs"]["kernel"]
        batch_size = input_tensor.shape[0]

        input_key = f"conv_input_{time.time_ns()}"
        kernel_key = f"conv_kernel_{time.time_ns()}"

        # Persist the tensors once; chunks reference them by key
        self.storage.store(input_key, {
            'data': input_tensor.tobytes(),
            'shape': input_tensor.shape,
            'dtype': str(input_tensor.dtype)
        }, compress=True)

        self.storage.store(kernel_key, {
            'data': kernel.tobytes(),
            'shape': kernel.shape,
            'dtype': str(kernel.dtype)
        }, compress=True)

        distributed_ops = []
        op_tracking_key = f"conv_op_{time.time_ns()}"

        try:
            chunks_per_gpu = self._calculate_optimal_chunks(batch_size, target_gpus)

            for i, gpu_id in enumerate(target_gpus):
                start_batch = sum(chunks_per_gpu[:i])
                end_batch = start_batch + chunks_per_gpu[i]

                chunk_key = f"{op_tracking_key}_chunk_{gpu_id}"
                chunk_op = {
                    "type": "conv",
                    "gpu_id": gpu_id,
                    "input_key": input_key,
                    "kernel_key": kernel_key,
                    "batch_range": (start_batch, end_batch),
                    "memory_config": {
                        "cache_mode": "l1_cached",
                        "prefetch_enabled": True,
                        "chunk_size": chunk_sizes[gpu_id]
                    },
                    "nvlink_paths": self._get_optimal_nvlink_paths(gpu_id, {
                        "input_size": (end_batch - start_batch) * np.prod(input_tensor.shape[1:]),
                        "kernel_size": np.prod(kernel.shape)
                    })
                }

                self.storage.store(chunk_key, chunk_op)

                distributed_ops.append({
                    "type": "conv",
                    "gpu_id": gpu_id,
                    "op_key": chunk_key,
                    "input_ref": {
                        "key": input_key,
                        "range": (start_batch, end_batch)
                    },
                    "kernel_ref": {
                        "key": kernel_key
                    }
                })

            self.storage.store(op_tracking_key, {
                "type": "conv_distribution",
                "num_gpus": len(target_gpus),
                "chunks": chunks_per_gpu,
                "input_key": input_key,
                "kernel_key": kernel_key,
                "chunk_keys": [f"{op_tracking_key}_chunk_{gpu_id}" for gpu_id in target_gpus]
            })

            return distributed_ops

        except Exception:
            # Clean up any keys written before the failure, then re-raise
            self.storage.delete(input_key)
            self.storage.delete(kernel_key)
            self.storage.delete(op_tracking_key)
            for gpu_id in target_gpus:
                self.storage.delete(f"{op_tracking_key}_chunk_{gpu_id}")
            raise

    def _calculate_optimal_chunks(self, total_size: int, gpu_ids: List[int]) -> List[int]:
        """Calculate optimal chunk sizes based on GPU capabilities and current load"""
        chunks = []
        remaining = total_size

        # Recent load per target GPU (mean of the last 10 samples)
        gpu_loads = {
            gpu_id: np.mean(self.gpu_load_history[gpu_id][-10:])
            if self.gpu_load_history[gpu_id] else 0
            for gpu_id in gpu_ids
        }

        # Total headroom; avoid division by zero when every GPU is saturated
        total_availability = sum(1.0 - load for load in gpu_loads.values())
        if total_availability == 0:
            total_availability = len(gpu_ids)

        for i, gpu_id in enumerate(gpu_ids):
            if i == len(gpu_ids) - 1:
                # The last GPU absorbs whatever is left
                chunks.append(remaining)
            else:
                load_factor = 1.0 - gpu_loads[gpu_id]
                chunk_size = int((load_factor / total_availability) * total_size)
                chunk_size = max(1, min(chunk_size, remaining))
                chunks.append(chunk_size)
                remaining -= chunk_size

        return chunks
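
    # Example (illustrative): splitting 100 batches over two GPUs with loads
    # 0.0 and 0.5 gives total_availability 1.5; the first GPU receives
    # int((1.0 / 1.5) * 100) = 66 batches and the last absorbs the remaining 34.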

    def distribute_tensor_pipeline(self, input_tensors_memory_size: Dict[int, int]) -> Optional[List[Dict[str, Any]]]:
        """
        Distribute operations across GPUs based on input tensor sizes and GPU states

        Args:
            input_tensors_memory_size: Dictionary mapping tensor addresses to their sizes in bytes

        Returns:
            List of distributed operations with their configurations, or None if
            no suitable GPUs are available
        """
        tensor_sizes = input_tensors_memory_size
        gpus = self._select_optimal_gpus(tensor_sizes)
        if not gpus:
            logging.warning("No suitable GPUs found for operation")
            return None

        # Cap each chunk at half of the GPU's free memory
        chunk_sizes = {}
        total_size = sum(tensor_sizes.values())
        for gpu_id in gpus:
            gpu_state = json.loads(self.hal.execute("""
                SELECT state_json FROM gpu_chips WHERE chip_id = ?
            """, (gpu_id,)).fetchone()[0])

            free_memory = gpu_state.get('free_memory_bytes', 0)
            chunk_sizes[gpu_id] = min(free_memory // 2, total_size // len(gpus))

        # Partition the work into pipeline stages, one GPU per stage
        num_stages = min(len(gpus), max(1, total_size // self.min_chunk_size))
        stage_size = total_size // num_stages

        distributed_ops = []
        for stage, gpu_id in enumerate(gpus[:num_stages]):
            start_idx = stage * stage_size
            end_idx = start_idx + stage_size if stage < num_stages - 1 else total_size

            stage_op = {
                "type": "distributed_tensor",
                "gpu_id": gpu_id,
                "stage": stage,
                "num_stages": num_stages,
                "range": (start_idx, end_idx),
                "chunk_size": chunk_sizes[gpu_id],
                "pipeline_config": {
                    "stage_id": stage,
                    "total_stages": num_stages,
                    "next_gpu": gpus[(stage + 1) % num_stages] if stage < num_stages - 1 else None,
                    "prev_gpu": gpus[stage - 1] if stage > 0 else None
                }
            }

            stage_op["memory_access"] = self._optimize_memory_access(stage_op)
            stage_op["sync_points"] = self._generate_sync_points(stage_op)

            distributed_ops.append(stage_op)

        return distributed_ops

    def _calculate_optimal_pipeline_stages(self, operation: Dict[str, Any]) -> int:
        """Calculate optimal number of pipeline stages based on operation characteristics"""
        op_type = operation.get("type", "")

        if op_type in ["reduction", "scan"]:
            # Serial dependencies limit useful pipeline depth
            return min(3, self.num_gpus)
        elif op_type in ["map", "filter"]:
            # Embarrassingly parallel: pipeline as deep as the hardware allows
            return min(8, self.num_gpus)
        else:
            return min(4, self.num_gpus)

    def _optimize_memory_access(self, stage_op: Dict[str, Any]) -> Dict[str, Any]:
        """Optimize memory access patterns for the operation"""
        return {
            "access_pattern": "sequential" if stage_op["type"] in ["reduction", "scan"] else "strided",
            "prefetch_distance": 2 if stage_op["type"] in ["map", "filter"] else 1,
            "cache_hint": "temporal" if stage_op["type"] in ["matmul", "conv"] else "spatial"
        }

    def _generate_sync_points(self, stage_op: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate synchronization points for pipeline stages"""
        sync_points = []

        # Wait for the upstream stage to finish before starting
        if stage_op["pipeline_config"]["prev_gpu"] is not None:
            sync_points.append({
                "type": "wait",
                "gpu_id": stage_op["pipeline_config"]["prev_gpu"],
                "stage": stage_op["stage"] - 1
            })

        # Signal the downstream stage once this stage completes
        if stage_op["pipeline_config"]["next_gpu"] is not None:
            sync_points.append({
                "type": "signal",
                "gpu_id": stage_op["pipeline_config"]["next_gpu"],
                "stage": stage_op["stage"]
            })

        return sync_points
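
    # Example (illustrative): the middle stage of a 3-stage pipeline on GPUs
    # [0, 1, 2] waits on GPU 0 (stage 0) and signals GPU 2 once its own stage
    # completes.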

    async def distribute_cuda_ops(self, tensor_data: Dict[str, Any], workload_per_core: float, total_cores: int) -> Dict[str, Any]:
        """Distribute operations optimized for CUDA cores."""
        try:
            data = tensor_data['data']
            operation = tensor_data.get('operation', 'generic')

            # Split the data evenly; the last core absorbs the remainder
            chunk_size = max(1, len(data) // total_cores)
            chunks = []

            for i in range(0, total_cores):
                start_idx = i * chunk_size
                end_idx = start_idx + chunk_size if i < total_cores - 1 else len(data)
                chunk_data = data[start_idx:end_idx]

                chunk_op = {
                    "type": "cuda",
                    "operation": operation,
                    "data": chunk_data,
                    "core_id": i
                }
                chunks.append(chunk_op)

            results = await self._process_cuda_chunks(chunks)

            # atleast_1d lets scalar reduction results concatenate cleanly
            combined_data = np.concatenate([np.atleast_1d(r['data']) for r in results])

            return {
                'status': 'success',
                'operation': operation,
                'data': combined_data
            }

        except Exception as e:
            return {
                'status': 'error',
                'operation': tensor_data.get('operation', 'unknown'),
                'message': str(e),
                'data': []
            }

    async def distribute_tensor_ops(self, tensor_data: Dict[str, Any], workload_per_core: float, total_cores: int) -> Dict[str, Any]:
        """Distribute operations optimized for tensor cores."""
        try:
            data = tensor_data['data']
            operation = tensor_data.get('operation', 'matmul')

            # Split the data evenly; the last core absorbs the remainder
            chunk_size = max(1, len(data) // total_cores)
            chunks = []

            for i in range(0, total_cores):
                start_idx = i * chunk_size
                end_idx = start_idx + chunk_size if i < total_cores - 1 else len(data)
                chunk_data = data[start_idx:end_idx]

                chunk_op = {
                    "type": "tensor",
                    "operation": operation,
                    "data": chunk_data,
                    "core_id": i
                }
                chunks.append(chunk_op)

            results = await self._process_tensor_chunks(chunks)

            combined_data = np.concatenate([np.atleast_1d(r['data']) for r in results])

            return {
                'status': 'success',
                'operation': operation,
                'data': combined_data
            }

        except Exception as e:
            return {
                'status': 'error',
                'operation': tensor_data.get('operation', 'unknown'),
                'message': str(e),
                'data': []
            }

    async def _process_cuda_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process data chunks using CUDA cores."""
        results = []
        for chunk in chunks:
            # Dispatch by operation type
            if chunk['operation'] == 'elemwise':
                result = self._process_elemwise_cuda(chunk['data'])
            elif chunk['operation'] == 'reduction':
                result = self._process_reduction_cuda(chunk['data'])
            else:
                result = self._process_generic_cuda(chunk['data'])
            results.append({'data': result, 'core_id': chunk['core_id']})
        return results

    async def _process_tensor_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process data chunks using tensor cores."""
        results = []
        for chunk in chunks:
            # Dispatch by operation type
            if chunk['operation'] == 'matmul':
                result = self._process_matmul_tensor(chunk['data'])
            elif chunk['operation'] == 'conv2d':
                result = self._process_conv2d_tensor(chunk['data'])
            else:
                result = self._process_generic_tensor(chunk['data'])
            results.append({'data': result, 'core_id': chunk['core_id']})
        return results

    def _process_elemwise_cuda(self, data: np.ndarray) -> np.ndarray:
        """Process element-wise operations using CUDA cores."""
        # Placeholder element-wise kernel: double every element
        return data * 2

    def _process_reduction_cuda(self, data: np.ndarray) -> np.ndarray:
        """Process reduction operations using CUDA cores."""
        # Placeholder reduction kernel: sum along the leading axis
        return np.sum(data, axis=0)

    def _process_generic_cuda(self, data: np.ndarray) -> np.ndarray:
        """Process generic operations using CUDA cores."""
        # Placeholder generic kernel: increment every element
        return data + 1

    def _process_matmul_tensor(self, data: np.ndarray) -> np.ndarray:
        """Process matrix multiplication using tensor cores."""
        # Promote 1-D data to a column vector, then form the Gram matrix
        if len(data.shape) < 2:
            data = data.reshape((-1, 1))
        return np.matmul(data, data.T)

    def _process_conv2d_tensor(self, data: np.ndarray) -> np.ndarray:
        """Process 2D convolution using tensor cores."""
        # Simplified stand-in: a flattened 3x3 averaging kernel applied as a
        # 1-D convolution along axis 0, not a true 2D convolution
        kernel = np.ones((3, 3)) / 9
        return np.apply_along_axis(lambda x: np.convolve(x, kernel.flatten(), mode='same'),
                                   axis=0, arr=data)

    def _process_generic_tensor(self, data: np.ndarray) -> np.ndarray:
        """Process generic operations using tensor cores."""
        if len(data.shape) < 2:
            data = data.reshape((-1, 1))
        # Multiply by the transposed all-ones matrix so the shapes align:
        # (n, m) @ (m, n) -> (n, n)
        return np.matmul(data, np.ones_like(data).T)
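

if __name__ == "__main__":
    # Minimal usage sketch (illustrative, not a test): assumes
    # config.get_db_url() points at a populated HAL database and that the
    # remaining distribution helpers (e.g. _distribute_matmul_1d,
    # _get_distribution_strategy) are defined elsewhere in this module.
    # The hardware_config values below are hypothetical.
    distributor = GPUParallelDistributor(num_gpus=4)
    distributor.initialize(
        hardware_config={
            'num_chips': 4,
            'num_sms_per_chip': 128,
            'num_cores_per_sm': 64,
            'max_switch_freq': 1.0e9,
            'memory_config': {'bandwidth_gb_per_sec': 900},
        },
        nvlink_topology={},
    )

    a = np.random.rand(1024, 1024).astype(np.float32)
    b = np.random.rand(1024, 1024).astype(np.float32)
    ops = distributor.distribute_operation({
        'type': 'matmul',
        'inputs': {'A': a, 'B': b},
        'input_size': a.nbytes + b.nbytes,
    })
    logging.info("Distributed matmul into %d chunk operations", len(ops))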