File size: 6,833 Bytes
7a0c684
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""

Handles parallel distribution of numpy arrays across tensor cores and SMs.

Uses direct hardware simulation at electron speed without Python threading limitations.

"""
import numpy as np
from typing import List, Tuple, Optional, Dict, Any
import threading
import time
import logging
from http_storage import LocalStorage
from config import get_db_url

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

class ParallelArrayDistributor:
    def __init__(self, num_sms: int, cores_per_sm: int):
        self.num_sms = num_sms
        self.cores_per_sm = cores_per_sm
        self.total_cores = num_sms * cores_per_sm
        
        # Initialize storage with retries
        max_retries = 3
        retry_delay = 1.0  # seconds
        
        for attempt in range(max_retries):
            try:
                self.storage = LocalStorage(db_url=get_db_url())
                if not self.storage.wait_for_connection(timeout=10):
                    raise RuntimeError("Storage connection timeout")
                logging.info("Successfully connected to storage backend")
                break
            except Exception as e:
                if attempt == max_retries - 1:
                    raise RuntimeError(f"Failed to initialize storage after {max_retries} attempts: {str(e)}")
                logging.warning(f"Storage initialization attempt {attempt + 1} failed: {str(e)}")
                time.sleep(retry_delay)
        
    def split_array(self, arr: np.ndarray) -> List[np.ndarray]:
        """Split array into chunks for parallel processing"""
        if arr.ndim == 1:
            return np.array_split(arr, self.total_cores)
        elif arr.ndim == 2:
            # For 2D arrays, split both dimensions optimally
            rows, cols = arr.shape
            splits_per_dim = int(np.sqrt(self.total_cores))
            row_chunks = np.array_split(arr, splits_per_dim, axis=0)
            return [np.array_split(chunk, splits_per_dim, axis=1) for chunk in row_chunks]
        else:
            # For higher dimensions, split along first dimension
            return np.array_split(arr, self.total_cores, axis=0)
            
    def distribute_to_cores(self, chunks: List[np.ndarray]) -> Dict[Tuple[int, int], np.ndarray]:
        """Distribute chunks to specific SM and core combinations"""
        distribution = {}
        chunk_idx = 0
        for sm_id in range(self.num_sms):
            for core_id in range(self.cores_per_sm):
                if chunk_idx < len(chunks):
                    distribution[(sm_id, core_id)] = chunks[chunk_idx]
                    chunk_idx += 1
        return distribution
        
    def get_processing_status(self, array_id: int) -> Dict[str, Any]:
        """Get status of parallel processing for a specific array"""
        try:
            # Query storage for chunks and results related to this array
            chunks = self.storage.query_tensors(
                metadata_filter={'array_id': array_id}
            )
            
            total_chunks = len(chunks) if chunks else 0
            processed_chunks = len([c for c in chunks if c.get('metadata', {}).get('processed', False)]) if chunks else 0
            
            return {
                'array_id': array_id,
                'total_chunks': total_chunks,
                'processed_chunks': processed_chunks,
                'completion_percentage': (processed_chunks / total_chunks * 100) if total_chunks > 0 else 0,
                'timestamp': time.time()
            }
        except Exception as e:
            return {
                'array_id': array_id,
                'error': str(e),
                'timestamp': time.time()
            }

    def parallel_process(self, arr: np.ndarray, operation_func) -> np.ndarray:
        """Process array in parallel across all cores at electron speed"""
        # Split array
        chunks = self.split_array(arr)
        
        # Distribute chunks
        distribution = self.distribute_to_cores(chunks)
        
        # Process in parallel using direct hardware simulation
        results = {}
        chunk_ids = {}
        
        # Store all chunks first with metadata
        for (sm_id, core_id), chunk in distribution.items():
            # Generate unique ID and store chunk
            chunk_id = f"chunk_{sm_id}_{core_id}_{time.time_ns()}"
            chunk_ids[(sm_id, core_id)] = chunk_id
            
            # Store with metadata about the distribution
            metadata = {
                'sm_id': sm_id,
                'core_id': core_id,
                'shape': chunk.shape,
                'dtype': str(chunk.dtype),
                'array_id': id(arr),  # Track which array this chunk belongs to
                'total_cores': self.total_cores,
                'timestamp': time.time()
            }
            self.storage.store_tensor(chunk_id, chunk, metadata)
        
        # Process chunks in parallel using distributed storage
        for (sm_id, core_id), chunk_id in chunk_ids.items():
            # Load chunk from storage to simulate distributed processing
            chunk_data = self.storage.load_tensor(chunk_id)
            if chunk_data is not None:
                chunk, _ = chunk_data
                # Process at electron speed (simulated hardware execution)
                result = operation_func(chunk, sm_id, core_id)
                results[(sm_id, core_id)] = result
                
                # Store result back to storage
                result_id = f"result_{chunk_id}"
                self.storage.store_tensor(result_id, result, {
                    'parent_chunk': chunk_id,
                    'sm_id': sm_id,
                    'core_id': core_id,
                    'timestamp': time.time()
                })
            
        # Combine results in order
        ordered_results = []
        for sm_id in range(self.num_sms):
            for core_id in range(self.cores_per_sm):
                if (sm_id, core_id) in results:
                    ordered_results.append(results[(sm_id, core_id)])
        
        # Reshape and combine based on dimensions
        if arr.ndim == 1:
            return np.concatenate(ordered_results)
        elif arr.ndim == 2:
            rows = []
            splits_per_dim = int(np.sqrt(self.total_cores))
            for i in range(0, len(ordered_results), splits_per_dim):
                rows.append(np.concatenate(ordered_results[i:i+splits_per_dim], axis=1))
            return np.concatenate(rows, axis=0)
        else:
            return np.concatenate(ordered_results, axis=0)