|
|
"""
Tensor Core subsystem for hyperrealistic GPU simulation.

Models hardware-level matrix multiply-accumulate, scheduling, and memory integration.
Uses remote storage for high-speed distributed access and synchronization.
"""
|
|
|
|
|
|
import time
|
|
|
import sys
|
|
|
import hashlib
|
|
|
import numpy as np
|
|
|
from typing import Optional, Dict, Any, Tuple
|
|
|
from http_storage import LocalStorage
|
|
|
from config import get_db_url
|
|
|
|
|
|
# Physical constants come from ``electron_speed`` when that module is
# importable; otherwise both names fall back to the built-in defaults below.
try:
    from electron_speed import (
        TARGET_SWITCHES_PER_SEC,
        TRANSISTORS_ON_CHIP,
    )
except ImportError:
    TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP = 9e20, 6e11
|
|
|
|
|
|
class TensorCore:
    """
    Virtual tensor core for matrix operations.

    Operands can come from three places:

    * ``virtual_registers`` - an in-process dict of arrays keyed by address;
    * the shared / global memories of the owning SM (``self.sm``);
    * the storage backend (``self.storage``), addressed through virtual
      addresses that are tracked in ``virtual_memory_map``.

    ``memory`` is a separate sparse ``{(row, col): value}`` scratch matrix
    used only by ``load_matrix`` / ``read_matrix``.
    """

    def __init__(self, bits=2, memory_size=800*1024*1024*1024, bandwidth_tbps=10000, sm=None, storage=None):
        """
        Create a tensor core.

        Args:
            bits: Element width in bits; stored as ``self.bits``.
            memory_size: Nominal memory size in bytes; stored as
                ``self.memory_size``.
            bandwidth_tbps: Accepted for interface compatibility. The stored
                ``self.bandwidth_tbps`` is derived from the drift velocity,
                not from this argument.
            sm: Owning streaming multiprocessor, or ``None``. Needed for the
                ``'shared'`` source and for the global-memory fallback.
            storage: Storage backend to share. When ``None`` a new
                ``LocalStorage`` is created and its connection awaited.

        Raises:
            RuntimeError: If a storage backend created here does not connect
                within 30 seconds.
        """
        from electron_speed import drift_velocity, TARGET_SWITCHES_PER_SEC

        self.bits = bits
        self.memory_size = memory_size

        # Only a backend created here is awaited; an injected one is assumed
        # to have been checked by whoever created it.
        if storage is None:
            storage = LocalStorage(db_url=get_db_url())
            if not storage.wait_for_connection(timeout=30):
                raise RuntimeError("Could not initialize remote storage connection")
        self.storage = storage

        # virtual address -> tensor id in the storage backend
        self.virtual_memory_map: Dict[str, str] = {}
        # register address -> matrix held in process
        self.virtual_registers: Dict[str, np.ndarray] = {}
        # sparse element store backing load_matrix() / read_matrix()
        self.memory: Dict[Tuple[int, int], Any] = {}

        self.drift_velocity = drift_velocity
        self.switches_per_sec = TARGET_SWITCHES_PER_SEC
        self.bandwidth_tbps = drift_velocity / 1e-12
        self.sm = sm

        self.virtual_ops_count = 0
        self.electron_cycles = 0

        self.core_id = f"tensor_core_{id(self)}"

    def _make_distributor(self):
        """Build the distributor that fans chunks out across SMs and cores."""
        # Imported lazily, as the original methods did.
        from parallel_array_distributor import ParallelArrayDistributor

        return ParallelArrayDistributor(
            num_sms=self.sm.num_sms if self.sm else 108,
            cores_per_sm=3000,
        )

    def _cycles_for(self, result):
        """Return the simulated electron-cycle count charged for ``result``."""
        return int(result.size * (self.drift_velocity / self.switches_per_sec))

    def _require_sm(self, source):
        """Raise a clear error when ``source`` needs an SM and none is attached."""
        if self.sm is None:
            raise RuntimeError(
                f"Source '{source}' requires a streaming multiprocessor, "
                "but this tensor core was created with sm=None"
            )

    def store_virtual_matrix(self, data: np.ndarray, virtual_addr: Optional[str] = None) -> str:
        """
        Store a matrix in the storage backend under a virtual address.

        Args:
            data: Matrix to store; converted with ``np.asarray`` so plain
                nested lists are accepted as well.
            virtual_addr: Address to register the matrix under. When omitted
                a ``vaddr_<hash>`` address is generated from the current time.

        Returns:
            The virtual address the matrix was stored under.
        """
        data = np.asarray(data)
        if virtual_addr is None:
            virtual_addr = f"vaddr_{hashlib.md5(str(time.time_ns()).encode()).hexdigest()[:12]}"

        tensor_id = f"tensor_{virtual_addr}"

        metadata = {
            "shape": data.shape,
            "dtype": str(data.dtype),
            "timestamp": time.time_ns(),
            "core_id": self.core_id,
            "virtual_addr": virtual_addr,
        }

        self.storage.store_tensor(
            tensor_id,
            data,
            model_size=data.nbytes,
        )

        # Persist the address -> tensor mapping so it can be resolved later
        # without relying on this instance's in-memory map.
        self.storage.store_state(
            "tensor_core_mapping",
            virtual_addr,
            {
                "tensor_id": tensor_id,
                "metadata": metadata,
                "core_id": self.core_id,
                "access_time": time.time_ns(),
            },
        )

        self.virtual_memory_map[virtual_addr] = tensor_id
        return virtual_addr

    def load_virtual_matrix(self, virtual_addr: str) -> Optional[np.ndarray]:
        """
        Load the matrix registered under ``virtual_addr``.

        The address is resolved through ``virtual_memory_map`` first and, on a
        miss, through the persisted ``tensor_core_mapping`` state.

        Returns:
            Whatever ``storage.load_tensor`` returns for the resolved tensor
            id, or ``None`` when the address is unknown.
        """
        if virtual_addr not in self.virtual_memory_map:
            mapping = self.storage.conn.execute("""
                SELECT data->>'tensor_id' as tensor_id
                FROM states
                WHERE name = 'tensor_core_mapping'
                AND state_id = ?
            """, [virtual_addr]).fetchone()

            if not mapping:
                return None

            self.virtual_memory_map[virtual_addr] = mapping[0]

        tensor_id = self.virtual_memory_map[virtual_addr]

        # NOTE(review): this rewrites the mapping record without the
        # "metadata" entry written by store_virtual_matrix(); confirm whether
        # store_state merges or replaces before relying on that metadata.
        self.storage.store_state(
            "tensor_core_mapping",
            virtual_addr,
            {
                "tensor_id": tensor_id,
                "core_id": self.core_id,
                "access_time": time.time_ns(),
            },
        )

        return self.storage.load_tensor(tensor_id)

    def fetch_operand(self, source, addr, shape):
        """
        Fetch a matrix operand from registers, shared memory or global memory.

        Args:
            source: ``'register'``, ``'shared'`` or ``'global'``.
            addr: Register key, SM memory address or virtual address.
            shape: ``(n, m)``; used for the zero default of a missing register
                and for the SM ``read_matrix`` calls.

        Returns:
            The operand matrix.

        Raises:
            ValueError: If ``source`` is not one of the three known sources.
            RuntimeError: If the SM is needed but ``self.sm`` is ``None``.
        """
        n, m = shape
        start_time = time.time_ns()

        if source == 'register':
            # A register that was never written reads as zeros.
            if addr in self.virtual_registers:
                matrix = self.virtual_registers[addr]
            else:
                matrix = np.zeros((n, m))

        elif source == 'shared':
            self._require_sm(source)
            matrix = self.sm.shared_mem.read_matrix(addr, n, m)

            self.storage.store_state(
                "tensor_core_access",
                f"shared_{start_time}",
                {
                    "core_id": self.core_id,
                    "source": "shared",
                    "addr": addr,
                    "shape": shape,
                    "access_time": start_time,
                    "sm_id": self.sm.sm_id if self.sm else None,
                },
            )

        elif source == 'global':
            matrix = self.load_virtual_matrix(addr)
            if matrix is None:
                # Not in storage yet: read through the SM, then cache the
                # matrix in storage under the same address.
                self._require_sm(source)
                matrix = np.asarray(self.sm.global_mem.read_matrix(addr, n, m))
                self.store_virtual_matrix(matrix, addr)

            self.storage.store_state(
                "tensor_core_access",
                f"global_{start_time}",
                {
                    "core_id": self.core_id,
                    "source": "global",
                    "addr": addr,
                    "shape": shape,
                    "access_time": start_time,
                    "matrix_hash": hashlib.md5(matrix.tobytes()).hexdigest()[:16],
                },
            )

        else:
            raise ValueError(f"Unknown source: {source}")

        return matrix

    def matmul(self, A, B):
        """
        Multiply ``A`` by ``B`` through the parallel distributor.

        Both operands are converted to arrays; every chunk of ``A`` handed out
        by the distributor is multiplied by the full ``B``. The simulated
        cycle count for the result is added to ``self.electron_cycles``.

        Returns:
            The value returned by ``distributor.parallel_process``.
        """
        A = np.array(A)
        B = np.array(B)

        distributor = self._make_distributor()

        def parallel_matmul_op(chunk: np.ndarray, sm_id: int, core_id: int) -> np.ndarray:
            return chunk @ B

        result = distributor.parallel_process(A, parallel_matmul_op)

        self.electron_cycles += self._cycles_for(result)
        return result

    def matmul_from_memory(self, srcA, addrA, srcB, addrB, shapeA, shapeB):
        """
        Fetch both operands, multiply them in parallel and store the result.

        Args:
            srcA, srcB: ``'register'``, ``'shared'`` or ``'global'``.
            addrA, addrB: For ``'global'`` the value is passed straight to
                ``storage.load_tensor``; otherwise it is handed to
                ``fetch_operand``.
            shapeA, shapeB: ``(n, p)`` and ``(p, m)``.

        Returns:
            The product returned by ``distributor.parallel_process``. It is
            also stored in the backend under a ``matmul_result_<time>`` id.

        Raises:
            ValueError: If either operand could not be loaded.
        """
        A = self.storage.load_tensor(addrA) if srcA == 'global' else self.fetch_operand(srcA, addrA, shapeA)
        B = self.storage.load_tensor(addrB) if srcB == 'global' else self.fetch_operand(srcB, addrB, shapeB)

        if A is None or B is None:
            raise ValueError("Could not load input tensors")

        distributor = self._make_distributor()

        def parallel_memory_matmul(chunk: np.ndarray, sm_id: int, core_id: int) -> np.ndarray:
            product = chunk @ B
            self.virtual_ops_count += chunk.size
            return product

        result = distributor.parallel_process(A, parallel_memory_matmul)

        # Charge the cycles before writing the metadata so the stored
        # "electron_cycles" includes this operation, matching matmul().
        self.electron_cycles += self._cycles_for(result)

        result_id = f"matmul_result_{time.time_ns()}"
        self.storage.store_tensor(result_id, result, metadata={
            "operation": "parallel_matmul",
            "num_sms_used": distributor.num_sms,
            "cores_per_sm": distributor.cores_per_sm,
            "total_cores": distributor.total_cores,
            "electron_cycles": self.electron_cycles,
        })

        return result

    def load_matrix(self, matrix, row_offset=0, col_offset=0):
        """Write ``matrix`` into ``self.memory`` at the given row/column offset."""
        for i, row in enumerate(matrix):
            for j, val in enumerate(row):
                self.memory[(row_offset+i, col_offset+j)] = val

    def read_matrix(self, n, m, row_offset=0, col_offset=0):
        """
        Read an ``n`` x ``m`` window of ``self.memory`` as a list of lists.

        Cells that were never written read as ``0.0``.
        """
        return [
            [self.memory.get((row_offset+i, col_offset+j), 0.0) for j in range(m)]
            for i in range(n)
        ]
|
|
|
|
|
|
class TensorCoreArray:
    """
    Array of ``TensorCore`` instances sharing one storage backend.

    The array hands out cores round-robin (``schedule``), keeps a registry of
    tensors it has allocated (``virtual_tensor_map``) and offers a pure-Python
    ``matmul`` on nested sequences plus a storage-backed
    ``matmul_from_memory`` that is delegated to a scheduled core.
    """

    def __init__(self, num_tensor_cores=8000, bits=2, memory_size=800*1024*1024*1024, bandwidth_tbps=10000, sm=None):
        """
        Create the shared storage, the cores, and the derived performance figures.

        Args:
            num_tensor_cores: Number of cores to create; must be at least 1.
            bits, memory_size, bandwidth_tbps: Forwarded to every
                ``TensorCore`` and recorded in the stored configuration.
            sm: Streaming multiprocessor passed to every core, or ``None``.

        Raises:
            ValueError: If ``num_tensor_cores`` is less than 1.
            RuntimeError: If the storage backend does not connect within
                30 seconds.
        """
        from electron_speed import TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP, drift_velocity, speed_of_light_silicon

        # Fail before touching storage; the per-core figures below divide by
        # this count.
        if num_tensor_cores < 1:
            raise ValueError("num_tensor_cores must be at least 1")

        shared_storage = LocalStorage(db_url=get_db_url())
        if not shared_storage.wait_for_connection(timeout=30):
            raise RuntimeError("Could not initialize remote storage connection")

        self.tensor_cores = [
            TensorCore(bits=bits, memory_size=memory_size, bandwidth_tbps=bandwidth_tbps, sm=sm, storage=shared_storage)
            for _ in range(num_tensor_cores)
        ]

        self.storage = shared_storage

        # tensor id -> metadata dict for tensors allocated through this array
        self.virtual_tensor_map = {}
        self.virtual_execution_units = []

        self.array_id = hashlib.md5(f"tensor_array_{time.time_ns()}".encode()).hexdigest()[:16]
        self.storage.store_state(
            "tensor_array_init",
            self.array_id,
            {
                "num_cores": num_tensor_cores,
                "bits": bits,
                "memory_size": memory_size,
                "bandwidth_tbps": bandwidth_tbps,
                "creation_time": time.time_ns(),
                "core_ids": [core.core_id for core in self.tensor_cores],
            },
        )

        self.drift_velocity = drift_velocity
        self.target_switches = TARGET_SWITCHES_PER_SEC
        self.transistors = TRANSISTORS_ON_CHIP
        self.light_speed_si = speed_of_light_silicon

        self.virtual_dispatch_ptr = 0
        # Round-robin cursor read and advanced by schedule().
        self.schedule_ptr = 0
        self.sm = sm

        self.photon_speed = speed_of_light_silicon
        self.electron_photon_ratio = drift_velocity / speed_of_light_silicon

        transistors_per_core = TRANSISTORS_ON_CHIP // num_tensor_cores
        self.ops_per_cycle = 1024 * (drift_velocity / 1e9)
        self.switches_per_sec = TARGET_SWITCHES_PER_SEC / num_tensor_cores
        self.clock_ghz = (self.switches_per_sec / transistors_per_core) / 1e9

        self.pflops = (num_tensor_cores * self.ops_per_cycle * self.clock_ghz) / 1e6

        self.parallel_enabled = True
        self.quantum_corrected = True

        self.storage.store_state(
            f"tensor_array_{id(self)}",
            "config",
            {
                "num_cores": num_tensor_cores,
                "bits": bits,
                "memory_size": memory_size,
                "bandwidth_tbps": bandwidth_tbps,
                "pflops": self.pflops,
                "clock_ghz": self.clock_ghz,
            },
        )

    def schedule(self):
        """
        Return the next tensor core in round-robin order.

        The index of the core that was handed out is recorded in storage
        together with the ids of the currently registered tensors.
        """
        index = self.schedule_ptr
        tc = self.tensor_cores[index]
        self.schedule_ptr = (index + 1) % len(self.tensor_cores)

        state = {
            # Index of the core returned by this call, not of the next one.
            "core_index": index,
            "timestamp": time.time_ns(),
            "active_tensors": list(self.virtual_tensor_map),
        }
        self.storage.store_state("scheduler", f"schedule_{time.time_ns()}", state)

        return tc

    def get_tensor(self, tensor_id: str) -> Optional[np.ndarray]:
        """Return the tensor stored under ``tensor_id`` via ``storage.load_tensor``."""
        return self.storage.load_tensor(tensor_id)

    def update_tensor(self, tensor_id: str, data: np.ndarray):
        """
        Overwrite the tensor stored under ``tensor_id``.

        If the tensor is registered in ``virtual_tensor_map`` its metadata
        gets a fresh ``last_updated`` timestamp and is written back to storage.
        """
        self.storage.store_tensor(tensor_id, data)

        if tensor_id in self.virtual_tensor_map:
            metadata = self.virtual_tensor_map[tensor_id]
            metadata["last_updated"] = time.time_ns()
            self.storage.store_state("tensor_metadata", tensor_id, metadata)

    def allocate_virtual_tensor(self, shape, name, direct_load=True):
        """
        Register a new tensor and return its generated id.

        Args:
            shape: Shape recorded in the metadata and used for the zero fill.
            name: Human-readable name recorded in the metadata.
            direct_load: When true, a zero array of ``shape`` is stored
                immediately; otherwise only the metadata is written.
        """
        tensor_id = f"virtual_tensor_{len(self.virtual_tensor_map)}_{time.time_ns()}"

        metadata = {
            "shape": shape,
            "name": name,
            "created_at": time.time_ns(),
            "tensor_id": tensor_id,
        }

        self.storage.store_state("tensor_metadata", tensor_id, metadata)

        if direct_load:
            self.storage.store_tensor(tensor_id, np.zeros(shape))

        self.virtual_tensor_map[tensor_id] = metadata
        return tensor_id

    def map_input_direct(self, data: np.ndarray, skip_host=True):
        """
        Register ``data`` as an input tensor and return its generated id.

        Args:
            data: Input array; its shape is recorded in the metadata.
            skip_host: When true, a zero array shaped like ``data`` is stored
                instead of ``data`` itself; when false, ``data`` is stored.
        """
        tensor_id = f"input_tensor_{time.time_ns()}"

        # NOTE(review): with the default skip_host=True the stored tensor is
        # all zeros, so the caller's values are not retrievable through
        # get_tensor(); confirm this placeholder behaviour is intended.
        payload = np.zeros_like(data) if skip_host else data
        self.storage.store_tensor(tensor_id, payload)

        metadata = {
            "shape": data.shape,
            "name": "input",
            "created_at": time.time_ns(),
            "tensor_id": tensor_id,
        }

        self.storage.store_state("tensor_metadata", tensor_id, metadata)
        self.virtual_tensor_map[tensor_id] = metadata

        return tensor_id

    def preprocess_input(self, input_id, architecture_id):
        """
        Preprocess a registered input and store the result.

        NOTE(review): ``virtual_memory_pool``, ``execute_virtual_preprocess``
        and ``store_virtual_result`` are not defined in this class; unless a
        subclass provides them this call raises ``AttributeError``.
        """
        virtual_data = self.virtual_memory_pool[input_id]
        preprocessed = self.execute_virtual_preprocess(virtual_data, architecture_id)
        return self.store_virtual_result(preprocessed)

    def prepare_batch(self, tensor_id, num_units, direct_virtual=True):
        """
        Split a tensor into ``num_units`` batches.

        NOTE(review): ``create_virtual_batch`` is not defined in this class;
        unless a subclass provides it this call raises ``AttributeError``.
        ``direct_virtual`` is accepted but not used.
        """
        return self.create_virtual_batch(tensor_id, num_units)

    def matmul(self, A, B, split_size=None):
        """
        Multiply two matrices given as nested sequences and print the stats.

        Args:
            A: ``n`` rows, each indexable for ``0..p-1``.
            B: ``p`` rows, each indexable for ``0..m-1``.
            split_size: Accepted for interface compatibility; not used.

        Returns:
            An ``n`` x ``m`` list of lists. Each element is accumulated from
            ``0.0`` in increasing ``k`` order. Empty operands yield an empty
            (or zero-width) result instead of raising.
        """
        n = len(A)
        p = len(B)
        # An empty B has no first row to measure.
        m = len(B[0]) if p else 0

        result = [[0.0 for _ in range(m)] for _ in range(n)]

        for row_idx in range(n):
            a_row = A[row_idx]
            out_row = result[row_idx]
            for col_idx in range(m):
                # Explicit left-to-right accumulation keeps the floating-point
                # result independent of how sum() is implemented.
                acc = 0.0
                for k in range(p):
                    acc += a_row[k] * B[k][col_idx]
                out_row[col_idx] = acc

        num_cores = len(self.tensor_cores)
        total_ops = n * m * p * 2
        electron_transit_time = 1 / self.switches_per_sec
        if total_ops:
            total_transit_time = electron_transit_time * total_ops / num_cores
            effective_pflops = (total_ops / total_transit_time) / 1e15
        else:
            # Nothing was computed; avoid dividing by a zero transit time.
            effective_pflops = 0.0

        print(f"[TensorCoreArray] Electron-speed parallel matmul using {num_cores} cores")
        print(f"Electron drift velocity: {self.drift_velocity:.2e} m/s ({self.electron_photon_ratio*100:.1f}% c in Si)")
        print(f"Effective performance: {effective_pflops:.1f} PFLOPS")
        print(f"Transit time per op: {electron_transit_time*1e12:.1f} ps")

        return result

    def matmul_from_memory(self, srcA, addrA, srcB, addrB, shapeA, shapeB):
        """
        Run a storage-backed matmul on the next scheduled tensor core.

        Prints the estimated operation count and duration derived from
        ``self.pflops``, then returns the scheduled core's
        ``matmul_from_memory`` result. Arguments are forwarded unchanged.
        """
        tc = self.schedule()
        n, p = shapeA
        _, m = shapeB
        total_ops = n * m * p * 2
        seconds = total_ops / (self.pflops * 1e15)
        print(f"[TensorCoreArray] Matmul from memory on {len(self.tensor_cores)} tensor cores @ {self.pflops:.1f} PFLOPS, ops={total_ops}, time={seconds:.9f}s")

        return tc.matmul_from_memory(srcA, addrA, srcB, addrB, shapeA, shapeB)

    def load_matrix(self, matrix, core_idx=0, row_offset=0, col_offset=0):
        """Write ``matrix`` into the scratch memory of core ``core_idx``."""
        self.tensor_cores[core_idx].load_matrix(matrix, row_offset, col_offset)

    def read_matrix(self, n, m, core_idx=0, row_offset=0, col_offset=0):
        """Read an ``n`` x ``m`` window from the scratch memory of core ``core_idx``."""
        return self.tensor_cores[core_idx].read_matrix(n, m, row_offset, col_offset)
|
|
|
|