|
|
"""
|
|
|
Handles parallel distribution of numpy arrays across tensor cores and SMs.
|
|
|
Uses direct hardware simulation at electron speed without Python threading limitations.
|
|
|
"""
|
|
|
import numpy as np
|
|
|
from typing import List, Tuple, Optional, Dict, Any
|
|
|
import threading
|
|
|
import time
|
|
|
import logging
|
|
|
from http_storage import LocalStorage
|
|
|
from config import get_db_url
|
|
|
|
|
|
|
|
|
# Configure root logging once at import time: timestamped, level-tagged lines.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
|
|
|
|
|
|
class ParallelArrayDistributor:
    """Distribute numpy array chunks across a (num_sms x cores_per_sm) grid.

    The grid exposes ``num_sms * cores_per_sm`` logical cores. Arrays are
    split into per-core chunks, persisted via the storage backend, processed
    chunk-by-chunk, and reassembled in core order.
    """

    def __init__(self, num_sms: int, cores_per_sm: int):
        """Initialize the core grid and connect to the storage backend.

        Args:
            num_sms: Number of streaming multiprocessors in the grid.
            cores_per_sm: Number of cores per SM.

        Raises:
            RuntimeError: If the storage backend cannot be reached after all
                retry attempts.
        """
        self.num_sms = num_sms
        self.cores_per_sm = cores_per_sm
        self.total_cores = num_sms * cores_per_sm

        # The storage backend may be slow to come up; retry with a fixed
        # delay before failing hard.
        max_retries = 3
        retry_delay = 1.0

        for attempt in range(max_retries):
            try:
                self.storage = LocalStorage(db_url=get_db_url())
                if not self.storage.wait_for_connection(timeout=10):
                    raise RuntimeError("Storage connection timeout")
                logging.info("Successfully connected to storage backend")
                break
            except Exception as e:
                if attempt == max_retries - 1:
                    raise RuntimeError(f"Failed to initialize storage after {max_retries} attempts: {str(e)}")
                logging.warning(f"Storage initialization attempt {attempt + 1} failed: {str(e)}")
                time.sleep(retry_delay)

    def split_array(self, arr: np.ndarray) -> List[np.ndarray]:
        """Split ``arr`` into chunks for parallel processing.

        1-D arrays: ``total_cores`` contiguous chunks.
        2-D arrays: an s x s tile grid (s = floor(sqrt(total_cores))),
        returned as a FLAT row-major list of 2-D tiles. Bug fix: the
        previous version returned a nested list-of-lists, contradicting the
        declared return type and breaking ``parallel_process`` (which calls
        ``chunk.shape`` on each element and reassembles from a flat list).
        Higher-rank arrays: split along axis 0 only.
        """
        if arr.ndim == 1:
            return np.array_split(arr, self.total_cores)
        if arr.ndim == 2:
            splits_per_dim = int(np.sqrt(self.total_cores))
            row_bands = np.array_split(arr, splits_per_dim, axis=0)
            # Flatten row-major so every element is a plain ndarray tile.
            return [
                tile
                for band in row_bands
                for tile in np.array_split(band, splits_per_dim, axis=1)
            ]
        # N-D fallback: chunk along the leading axis.
        return np.array_split(arr, self.total_cores, axis=0)

    def distribute_to_cores(self, chunks: List[np.ndarray]) -> Dict[Tuple[int, int], np.ndarray]:
        """Assign chunks to (sm_id, core_id) slots in row-major core order.

        If there are fewer chunks than cores (e.g. the 2-D tile grid uses
        only floor(sqrt(total_cores))**2 tiles), trailing cores receive no
        work and are absent from the returned mapping.
        """
        distribution = {}
        chunk_idx = 0
        for sm_id in range(self.num_sms):
            for core_id in range(self.cores_per_sm):
                if chunk_idx < len(chunks):
                    distribution[(sm_id, core_id)] = chunks[chunk_idx]
                    chunk_idx += 1
        return distribution

    def get_processing_status(self, array_id: int) -> Dict[str, Any]:
        """Return a best-effort progress snapshot for ``array_id``.

        Queries the storage backend for chunks tagged with the array id and
        reports counts plus a completion percentage. Storage failures are
        reported in-band via an ``'error'`` key instead of being raised, so
        pollers never crash on a transient backend problem.
        """
        try:
            chunks = self.storage.query_tensors(
                metadata_filter={'array_id': array_id}
            )

            total_chunks = len(chunks) if chunks else 0
            processed_chunks = len([c for c in chunks if c.get('metadata', {}).get('processed', False)]) if chunks else 0

            return {
                'array_id': array_id,
                'total_chunks': total_chunks,
                'processed_chunks': processed_chunks,
                'completion_percentage': (processed_chunks / total_chunks * 100) if total_chunks > 0 else 0,
                'timestamp': time.time(),
            }
        except Exception as e:
            # Deliberate best-effort: surface the failure in the payload.
            return {
                'array_id': array_id,
                'error': str(e),
                'timestamp': time.time(),
            }

    def parallel_process(self, arr: np.ndarray, operation_func) -> np.ndarray:
        """Process ``arr`` chunk-by-chunk and reassemble the result.

        Each chunk is stored, loaded back, passed to
        ``operation_func(chunk, sm_id, core_id)``, and per-core results are
        stitched together in the same row-major core order used by
        ``distribute_to_cores``.

        NOTE(review): despite the name, execution here is sequential in this
        process; "parallelism" refers to the chunk distribution scheme.
        Assumes ``operation_func`` preserves each chunk's shape so that
        concatenation reproduces ``arr``'s layout — TODO confirm with callers.
        """
        chunks = self.split_array(arr)
        distribution = self.distribute_to_cores(chunks)

        results: Dict[Tuple[int, int], np.ndarray] = {}
        chunk_ids: Dict[Tuple[int, int], str] = {}

        # Phase 1: persist every chunk with its placement metadata.
        for (sm_id, core_id), chunk in distribution.items():
            # time_ns suffix keeps ids unique across repeated runs.
            chunk_id = f"chunk_{sm_id}_{core_id}_{time.time_ns()}"
            chunk_ids[(sm_id, core_id)] = chunk_id

            metadata = {
                'sm_id': sm_id,
                'core_id': core_id,
                'shape': chunk.shape,
                'dtype': str(chunk.dtype),
                'array_id': id(arr),
                'total_cores': self.total_cores,
                'timestamp': time.time(),
            }
            self.storage.store_tensor(chunk_id, chunk, metadata)

        # Phase 2: load each chunk back, apply the operation, persist result.
        for (sm_id, core_id), chunk_id in chunk_ids.items():
            chunk_data = self.storage.load_tensor(chunk_id)
            if chunk_data is not None:
                chunk, _ = chunk_data
                result = operation_func(chunk, sm_id, core_id)
                results[(sm_id, core_id)] = result

                result_id = f"result_{chunk_id}"
                self.storage.store_tensor(result_id, result, {
                    'parent_chunk': chunk_id,
                    'sm_id': sm_id,
                    'core_id': core_id,
                    'timestamp': time.time(),
                })

        # Phase 3: reassemble in row-major core order; chunks whose load
        # failed are skipped (best-effort, matching the loop above).
        ordered_results = []
        for sm_id in range(self.num_sms):
            for core_id in range(self.cores_per_sm):
                if (sm_id, core_id) in results:
                    ordered_results.append(results[(sm_id, core_id)])

        if arr.ndim == 1:
            return np.concatenate(ordered_results)
        elif arr.ndim == 2:
            # Rebuild the tile grid: glue each row of tiles horizontally,
            # then stack the rows vertically.
            splits_per_dim = int(np.sqrt(self.total_cores))
            rows = []
            for i in range(0, len(ordered_results), splits_per_dim):
                rows.append(np.concatenate(ordered_results[i:i + splits_per_dim], axis=1))
            return np.concatenate(rows, axis=0)
        else:
            return np.concatenate(ordered_results, axis=0)
|
|
|
|