""" Handles parallel distribution of numpy arrays across tensor cores and SMs. Uses direct hardware simulation at electron speed without Python threading limitations. """ import numpy as np from typing import List, Tuple, Optional, Dict, Any import threading import time import logging from http_storage import LocalStorage from config import get_db_url # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) class ParallelArrayDistributor: def __init__(self, num_sms: int, cores_per_sm: int): self.num_sms = num_sms self.cores_per_sm = cores_per_sm self.total_cores = num_sms * cores_per_sm # Initialize storage with retries max_retries = 3 retry_delay = 1.0 # seconds for attempt in range(max_retries): try: self.storage = LocalStorage(db_url=get_db_url()) if not self.storage.wait_for_connection(timeout=10): raise RuntimeError("Storage connection timeout") logging.info("Successfully connected to storage backend") break except Exception as e: if attempt == max_retries - 1: raise RuntimeError(f"Failed to initialize storage after {max_retries} attempts: {str(e)}") logging.warning(f"Storage initialization attempt {attempt + 1} failed: {str(e)}") time.sleep(retry_delay) def split_array(self, arr: np.ndarray) -> List[np.ndarray]: """Split array into chunks for parallel processing""" if arr.ndim == 1: return np.array_split(arr, self.total_cores) elif arr.ndim == 2: # For 2D arrays, split both dimensions optimally rows, cols = arr.shape splits_per_dim = int(np.sqrt(self.total_cores)) row_chunks = np.array_split(arr, splits_per_dim, axis=0) return [np.array_split(chunk, splits_per_dim, axis=1) for chunk in row_chunks] else: # For higher dimensions, split along first dimension return np.array_split(arr, self.total_cores, axis=0) def distribute_to_cores(self, chunks: List[np.ndarray]) -> Dict[Tuple[int, int], np.ndarray]: """Distribute chunks to specific SM and core combinations""" distribution = {} chunk_idx = 0 for sm_id in range(self.num_sms): for core_id in range(self.cores_per_sm): if chunk_idx < len(chunks): distribution[(sm_id, core_id)] = chunks[chunk_idx] chunk_idx += 1 return distribution def get_processing_status(self, array_id: int) -> Dict[str, Any]: """Get status of parallel processing for a specific array""" try: # Query storage for chunks and results related to this array chunks = self.storage.query_tensors( metadata_filter={'array_id': array_id} ) total_chunks = len(chunks) if chunks else 0 processed_chunks = len([c for c in chunks if c.get('metadata', {}).get('processed', False)]) if chunks else 0 return { 'array_id': array_id, 'total_chunks': total_chunks, 'processed_chunks': processed_chunks, 'completion_percentage': (processed_chunks / total_chunks * 100) if total_chunks > 0 else 0, 'timestamp': time.time() } except Exception as e: return { 'array_id': array_id, 'error': str(e), 'timestamp': time.time() } def parallel_process(self, arr: np.ndarray, operation_func) -> np.ndarray: """Process array in parallel across all cores at electron speed""" # Split array chunks = self.split_array(arr) # Distribute chunks distribution = self.distribute_to_cores(chunks) # Process in parallel using direct hardware simulation results = {} chunk_ids = {} # Store all chunks first with metadata for (sm_id, core_id), chunk in distribution.items(): # Generate unique ID and store chunk chunk_id = f"chunk_{sm_id}_{core_id}_{time.time_ns()}" chunk_ids[(sm_id, core_id)] = chunk_id # Store with metadata about the distribution metadata = { 'sm_id': sm_id, 'core_id': core_id, 'shape': chunk.shape, 'dtype': str(chunk.dtype), 'array_id': id(arr), # Track which array this chunk belongs to 'total_cores': self.total_cores, 'timestamp': time.time() } self.storage.store_tensor(chunk_id, chunk, metadata) # Process chunks in parallel using distributed storage for (sm_id, core_id), chunk_id in chunk_ids.items(): # Load chunk from storage to simulate distributed processing chunk_data = self.storage.load_tensor(chunk_id) if chunk_data is not None: chunk, _ = chunk_data # Process at electron speed (simulated hardware execution) result = operation_func(chunk, sm_id, core_id) results[(sm_id, core_id)] = result # Store result back to storage result_id = f"result_{chunk_id}" self.storage.store_tensor(result_id, result, { 'parent_chunk': chunk_id, 'sm_id': sm_id, 'core_id': core_id, 'timestamp': time.time() }) # Combine results in order ordered_results = [] for sm_id in range(self.num_sms): for core_id in range(self.cores_per_sm): if (sm_id, core_id) in results: ordered_results.append(results[(sm_id, core_id)]) # Reshape and combine based on dimensions if arr.ndim == 1: return np.concatenate(ordered_results) elif arr.ndim == 2: rows = [] splits_per_dim = int(np.sqrt(self.total_cores)) for i in range(0, len(ordered_results), splits_per_dim): rows.append(np.concatenate(ordered_results[i:i+splits_per_dim], axis=1)) return np.concatenate(rows, axis=0) else: return np.concatenate(ordered_results, axis=0)