# parallel_array_distributor.py
"""
Handles parallel distribution of numpy arrays across tensor cores and SMs.
Uses direct hardware simulation at electron speed without Python threading limitations.
"""
import numpy as np
from typing import List, Tuple, Optional, Dict, Any
import threading
import time
import logging
from http_storage import LocalStorage
from config import get_db_url
# Configure module-wide logging: INFO level, timestamped single-line records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
class ParallelArrayDistributor:
    """Distribute numpy arrays across a simulated (SM, core) grid.

    An array is split into chunks, each chunk is assigned to one
    (sm_id, core_id) slot, persisted to the storage backend, processed,
    and the per-chunk results are stitched back into a single array.
    """

    def __init__(self, num_sms: int, cores_per_sm: int):
        """Record the core topology and connect to the storage backend.

        Args:
            num_sms: Number of streaming multiprocessors to simulate.
            cores_per_sm: Number of cores per SM.

        Raises:
            RuntimeError: If the storage backend cannot be initialized
                after all retry attempts.
        """
        self.num_sms = num_sms
        self.cores_per_sm = cores_per_sm
        self.total_cores = num_sms * cores_per_sm
        # Retry storage initialization so a transient backend outage at
        # startup does not immediately kill the distributor.
        max_retries = 3
        retry_delay = 1.0  # seconds between attempts
        for attempt in range(max_retries):
            try:
                self.storage = LocalStorage(db_url=get_db_url())
                if not self.storage.wait_for_connection(timeout=10):
                    raise RuntimeError("Storage connection timeout")
                logging.info("Successfully connected to storage backend")
                break
            except Exception as e:
                if attempt == max_retries - 1:
                    raise RuntimeError(f"Failed to initialize storage after {max_retries} attempts: {str(e)}")
                logging.warning(f"Storage initialization attempt {attempt + 1} failed: {str(e)}")
                time.sleep(retry_delay)

    def split_array(self, arr: np.ndarray) -> List[np.ndarray]:
        """Split ``arr`` into a flat list of chunks for parallel processing.

        1-D arrays are split into ``total_cores`` chunks.  2-D arrays are
        tiled on a roughly square ``k x k`` grid, ``k = floor(sqrt(total_cores))``,
        and returned row-major — the same order ``distribute_to_cores``
        assigns slots and ``parallel_process`` reassembles.  Higher-rank
        arrays are split along axis 0.

        Bug fix: the 2-D branch previously returned a *nested* list of
        lists, which broke ``distribute_to_cores``/``parallel_process``
        (they require ndarray chunks, e.g. ``chunk.shape`` is read).
        """
        if arr.ndim == 1:
            return np.array_split(arr, self.total_cores)
        elif arr.ndim == 2:
            # Tile both dimensions on a square grid; guard against 0 splits.
            splits_per_dim = max(1, int(np.sqrt(self.total_cores)))
            row_chunks = np.array_split(arr, splits_per_dim, axis=0)
            # Flatten row-major so chunk index order matches the
            # (sm_id, core_id) assignment order in distribute_to_cores.
            return [tile
                    for row in row_chunks
                    for tile in np.array_split(row, splits_per_dim, axis=1)]
        else:
            # For higher dimensions, split along the first dimension only.
            return np.array_split(arr, self.total_cores, axis=0)

    def distribute_to_cores(self, chunks: List[np.ndarray]) -> Dict[Tuple[int, int], np.ndarray]:
        """Assign chunks to (sm_id, core_id) slots in row-major core order.

        If there are fewer chunks than cores (e.g. the 2-D tiling yields
        k*k < total_cores tiles), the trailing slots are left unassigned.
        """
        distribution: Dict[Tuple[int, int], np.ndarray] = {}
        chunk_idx = 0
        for sm_id in range(self.num_sms):
            for core_id in range(self.cores_per_sm):
                if chunk_idx < len(chunks):
                    distribution[(sm_id, core_id)] = chunks[chunk_idx]
                    chunk_idx += 1
        return distribution

    def get_processing_status(self, array_id: int) -> Dict[str, Any]:
        """Report chunk-level progress for ``array_id``.

        Returns a dict with chunk counts and a completion percentage, or a
        dict containing an ``'error'`` key if the storage query fails —
        deliberately best-effort: status queries never raise.
        """
        try:
            # Query storage for every chunk tagged with this array id.
            chunks = self.storage.query_tensors(
                metadata_filter={'array_id': array_id}
            ) or []
            total_chunks = len(chunks)
            processed_chunks = sum(
                1 for c in chunks if c.get('metadata', {}).get('processed', False)
            )
            return {
                'array_id': array_id,
                'total_chunks': total_chunks,
                'processed_chunks': processed_chunks,
                'completion_percentage': (processed_chunks / total_chunks * 100) if total_chunks > 0 else 0,
                'timestamp': time.time()
            }
        except Exception as e:
            return {
                'array_id': array_id,
                'error': str(e),
                'timestamp': time.time()
            }

    def parallel_process(self, arr: np.ndarray, operation_func) -> np.ndarray:
        """Apply ``operation_func(chunk, sm_id, core_id)`` to every chunk of ``arr``.

        Each chunk is persisted to storage, loaded back (exercising the
        distributed-storage path), transformed, and the per-chunk result
        stored.  Results are concatenated back into a single array with
        the same rank as ``arr``.

        Note: ``operation_func`` must preserve chunk shapes along the
        concatenation axes for reassembly to be valid.
        """
        chunks = self.split_array(arr)
        distribution = self.distribute_to_cores(chunks)
        results: Dict[Tuple[int, int], np.ndarray] = {}
        chunk_ids: Dict[Tuple[int, int], str] = {}
        # Persist every chunk with metadata describing its placement.
        for (sm_id, core_id), chunk in distribution.items():
            # time_ns keeps ids unique across repeated calls with the same layout.
            chunk_id = f"chunk_{sm_id}_{core_id}_{time.time_ns()}"
            chunk_ids[(sm_id, core_id)] = chunk_id
            metadata = {
                'sm_id': sm_id,
                'core_id': core_id,
                'shape': chunk.shape,
                'dtype': str(chunk.dtype),
                'array_id': id(arr),  # NOTE(review): id() is only unique while arr is alive
                'total_cores': self.total_cores,
                'timestamp': time.time()
            }
            self.storage.store_tensor(chunk_id, chunk, metadata)
        # Load each chunk back from storage, process it, and store the result.
        for (sm_id, core_id), chunk_id in chunk_ids.items():
            chunk_data = self.storage.load_tensor(chunk_id)
            if chunk_data is not None:
                chunk, _ = chunk_data
                result = operation_func(chunk, sm_id, core_id)
                results[(sm_id, core_id)] = result
                result_id = f"result_{chunk_id}"
                self.storage.store_tensor(result_id, result, {
                    'parent_chunk': chunk_id,
                    'sm_id': sm_id,
                    'core_id': core_id,
                    'timestamp': time.time()
                })
        # Collect results in (sm_id, core_id) order — the same order the
        # chunks were assigned — so concatenation reconstructs the array.
        ordered_results = [
            results[(sm_id, core_id)]
            for sm_id in range(self.num_sms)
            for core_id in range(self.cores_per_sm)
            if (sm_id, core_id) in results
        ]
        if arr.ndim == 1:
            return np.concatenate(ordered_results)
        elif arr.ndim == 2:
            # Rebuild each tile row horizontally, then stack rows vertically.
            splits_per_dim = max(1, int(np.sqrt(self.total_cores)))
            rows = [
                np.concatenate(ordered_results[i:i + splits_per_dim], axis=1)
                for i in range(0, len(ordered_results), splits_per_dim)
            ]
            return np.concatenate(rows, axis=0)
        else:
            return np.concatenate(ordered_results, axis=0)