"""
Test AI integration with HTTP-based storage for Florence model inference.
All operations are performed through HTTP storage with direct tensor core access.
"""
import atexit
import contextlib
import gc
import logging
import os
import platform
import time

import numpy as np
import torch
from PIL import Image

from gpu_arch import Chip
from ai_http import AIAcceleratorHTTP
from virtual_vram import VirtualVRAM
from http_storage import HTTPGPUStorage

# `resource` is POSIX-only; degrade gracefully where it is unavailable
try:
    import resource
except ImportError:
    resource = None

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Increase the system file descriptor limit (no-op where `resource` is unavailable)
def increase_file_limit():
    if resource is None:
        print("Warning: resource module unavailable; cannot raise file descriptor limit")
        return
    try:
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
        print(f"Increased file descriptor limit from {soft} to {hard}")
    except Exception as e:
        print(f"Warning: Could not increase file descriptor limit: {e}")

# HTTP connection manager with retry
@contextlib.contextmanager
def http_manager(max_retries=5, retry_delay=2):
    storage = None
    last_error = None

    def try_connect():
        nonlocal storage
        if storage:
            try:
                storage.close()
            except Exception:
                pass
        storage = HTTPGPUStorage()
        return storage.connect()

    # Initial connection attempts
    for attempt in range(max_retries):
        try:
            if try_connect():
                logging.info("Successfully connected to HTTP GPU storage server")
                break
            logging.warning(f"Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...")
        except Exception as e:
            last_error = str(e)
            logging.error(f"Connection attempt {attempt + 1} failed with error: {e}")
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
    else:
        # The loop never hit `break`, so every attempt failed
        error_msg = f"Could not connect to HTTP GPU storage server after {max_retries} attempts"
        if last_error:
            error_msg += f". Last error: {last_error}"
        raise RuntimeError(error_msg)

    try:
        # Yield the storage connection; a generator-based context manager may
        # only yield once, so failures are logged and re-raised here rather
        # than retried
        yield storage
    except Exception as e:
        logging.error(f"HTTP storage operation failed: {e}")
        raise
    finally:
        if storage:
            try:
                storage.close()
            except Exception:
                pass
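
# A minimal usage sketch for http_manager (illustrative only; the tensor ID
# below is arbitrary, and store_tensor/load_tensor are the same
# HTTPGPUStorage methods exercised by the tests in this file):
#
#     with http_manager(max_retries=3, retry_delay=1) as storage:
#         storage.store_tensor("examples/ones", np.ones((4, 4)))
#         restored = storage.load_tensor("examples/ones")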
        
# Cleanup handler
def cleanup_resources():
    gc.collect()
    
# Register cleanup handler
atexit.register(cleanup_resources)

def test_ai_integration():
    print("\n--- Testing HTTP-Based AI Integration with Zero CPU Usage ---")
    from electron_speed import TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP, drift_velocity, speed_of_light_silicon
    
    # Initialize components dictionary to store GPU resources
    components = {
        'chips': [],
        'ai_accelerators': [],
        'model_id': None,
        'vram': None,
        'storage': None,
        'model_config': None,
        'tensor_registry': {},
        'initialized': False
    }
    
    # Initialize global tensor registry
    global_tensor_registry = {
        'model_tensors': {},
        'runtime_tensors': {},
        'placeholder_tensors': {},
        'stats': {
            'total_vram_used': 0,
            'active_tensors': 0
        }
    }
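
    # Hypothetical helper (an assumption for illustration, not part of the
    # storage API) showing how the registry above would be kept in sync as
    # tensors are created; the tests below track tensors through HTTP
    # storage instead, so this helper is never called.
    def register_tensor(kind, tensor_id, nbytes):
        global_tensor_registry[kind][tensor_id] = nbytes
        global_tensor_registry['stats']['total_vram_used'] += nbytes
        global_tensor_registry['stats']['active_tensors'] += 1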
    
    # Increase file descriptor limit
    increase_file_limit()

    print(f"\nElectron-Speed Architecture Parameters:")
    print(f"Target switches/sec: {TARGET_SWITCHES_PER_SEC:.2e}")
    print(f"Transistors on chip: {TRANSISTORS_ON_CHIP:,}")
    print(f"Electron drift velocity: {drift_velocity:.2e} m/s")
    print(f"Percentage of light speed: {(drift_velocity/speed_of_light_silicon)*100:.2f}%")

    # Test 1: HTTP-Based Model Loading with Florence
    print("\nTest 1: Loading Florence Model with HTTP Storage")
    try:
        # Connect to HTTP storage directly so the connection stays open for
        # the whole test run (a `with http_manager()` block would close it on
        # exit); the connection is closed in the cleanup step at the end
        storage = HTTPGPUStorage()
        if not storage.connect():
            raise RuntimeError("Could not connect to HTTP GPU storage server")
        components['storage'] = storage  # Save storage reference

        # Initialize virtual GPU stack with HTTP storage
        chip_for_loading = Chip(chip_id=0, vram_size_gb=32, storage=storage)  # Allocate sufficient VRAM
        components['chips'].append(chip_for_loading)

        # Initialize VRAM with HTTP storage
        vram = VirtualVRAM(storage=storage)
        components['vram'] = vram

        # Set up AI accelerator with HTTP support
        ai_accelerator_for_loading = AIAcceleratorHTTP(chip=chip_for_loading)
        ai_accelerator_for_loading.vram = vram
        ai_accelerator_for_loading.initialize_tensor_cores()
        components['ai_accelerators'].append(ai_accelerator_for_loading)

        # Initialize model registry in HTTP storage
        storage.store_model_state({
            "initialized": True,
            "max_vram": 32 * 1024 * 1024 * 1024,  # 32GB in bytes
            "active_models": {}
        })

        # Load Florence-2 model with HTTP storage
        from transformers import AutoModelForCausalLM, AutoProcessor
        model_id = "microsoft/florence-2-large"
        print(f"Loading model {model_id} with HTTP storage...")
        
        model = None
        processor = None
        try:
            # Load model and processor with proper error handling
            model = AutoModelForCausalLM.from_pretrained(
                model_id, 
                trust_remote_code=True,
                device_map="auto",  # Allow automatic device mapping
                torch_dtype="auto"   # Use appropriate dtype
            )
            
            processor = AutoProcessor.from_pretrained(
                model_id, 
                trust_remote_code=True
            )
            
            # Ensure the HTTP connection is active before proceeding
            if not ai_accelerator_for_loading.storage.wait_for_connection():
                raise RuntimeError("HTTP connection lost - please retry")
            
            # Calculate model size for proper VRAM allocation
            model_size = sum(p.numel() * p.element_size() for p in model.parameters())
            print(f"Model size: {model_size / (1024**3):.2f} GB")
            
            # Store the model in HTTP storage via the accelerator's
            # load_model method
            ai_accelerator_for_loading.load_model(
                model_id=model_id,
                model=model,
                processor=processor
            )
            
            print(f"Model '{model_id}' loaded successfully to WebSocket storage.")
            assert ai_accelerator_for_loading.has_model(model_id), "Model not found in WebSocket storage after loading."
            
            # Store model parameters in components dict; keep the processor
            # around for the inference tests below
            components['model_id'] = model_id
            components['model_size'] = model_size
            components['processor'] = processor

            # Clear the CPU-side model copy (the weights now live in HTTP storage)
            model = None
            gc.collect()
            
        except Exception as e:
            print(f"Detailed model loading error: {str(e)}")
            print("Falling back to zero-copy tensor mode...")
            # Retry with HTTP transfer; model/processor may still be None
            # here if from_pretrained itself failed, in which case load_model
            # is expected to fetch the weights over HTTP
            try:
                ai_accelerator_for_loading.load_model(
                    model_id=model_id,
                    model=model,
                    processor=processor,
                    use_http=True
                )
                components['model_id'] = model_id
                print("Successfully loaded Florence model with HTTP transfer")
            except Exception as e2:
                print(f"HTTP model loading failed: {str(e2)}")
                raise

    except Exception as e:
        print(f"Model loading test failed: {e}")
        return

    # Test 2: HTTP-Based Multi-Chip Processing for Florence Inference
    print("\nTest 2: HTTP-Based Parallel Processing across Multiple Chips")
    num_chips = 4  # Using multiple chips for maximum parallelization
    chips = []
    ai_accelerators = []

    try:
        # Reuse the existing HTTP connection if it is still alive, otherwise
        # establish a new one
        shared_storage = None
        max_connection_attempts = 3

        for attempt in range(max_connection_attempts):
            try:
                if components['storage'] and components['storage'].wait_for_connection():
                    shared_storage = components['storage']
                    logging.info("Successfully reused existing HTTP connection")
                    break
                logging.warning("Existing connection unavailable, creating new connection...")
                new_storage = HTTPGPUStorage()
                if new_storage.connect():
                    components['storage'] = new_storage
                    shared_storage = new_storage
                    logging.info("Successfully established new HTTP connection")
                    break
                raise RuntimeError("connect() returned False")
            except Exception as e:
                logging.error(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < max_connection_attempts - 1:
                    time.sleep(2)
                    continue
                raise RuntimeError(f"Failed to establish HTTP connection after {max_connection_attempts} attempts")
        
        # Initialize high-performance chip array with HTTP storage for Florence
        total_sms = 0
        total_cores = 0
        
        # Create optical interconnect for chip communication
        from gpu_arch import OpticalInterconnect
        optical_link = OpticalInterconnect(bandwidth_tbps=800, latency_ns=1)
        
        # Reuse existing VRAM instance with shared storage
        shared_vram = components['vram']
        if shared_vram is None:
            shared_vram = VirtualVRAM()
        shared_vram.storage = shared_storage
        
        for i in range(num_chips):
            # Configure each chip with shared HTTP storage
            chip = Chip(chip_id=i, vram_size_gb=32, storage=shared_storage)  # 32GB VRAM per chip
            chips.append(chip)
            
            # Connect chips in a ring topology
            if i > 0:
                chip.connect_chip(chips[i-1], optical_link)
            
            # Initialize AI accelerator with HTTP support
            ai_accelerator = AIAcceleratorHTTP(chip=chip)
            ai_accelerator.vram = shared_vram
            ai_accelerator.storage = shared_storage
            ai_accelerators.append(ai_accelerator)
            
            # Initialize tensor cores for Florence model
            ai_accelerator.initialize_tensor_cores()

            # Track total processing units
            total_sms += chip.num_sms
            total_cores += chip.num_sms * chip.cores_per_sm

            # Store chip configuration in HTTP storage
            shared_storage.store_state(f"chips/{i}/config", "state", {
                "num_sms": chip.num_sms,
                "cores_per_sm": chip.cores_per_sm,
                "total_cores": chip.num_sms * chip.cores_per_sm,
                "connected_chips": [c.chip_id for c in chip.connected_chips]
            })

            print(f"Chip {i} initialized with HTTP storage and optical interconnect")

        print("\nTest 3: Florence Model Inference with HTTP Storage")
        try:
            # Load the test image (must exist next to this script)
            image_path = "test_image.jpg"
            if os.path.exists(image_path):
                image = Image.open(image_path)

                # Prepare input for the Florence model using the processor
                # kept from Test 1
                processor = components.get('processor')
                if processor is None:
                    raise RuntimeError("Processor unavailable - model loading did not complete")
                inputs = processor(image, return_tensors="pt")

                # Run inference using HTTP storage
                outputs = ai_accelerators[0].run_inference(
                    model_id="microsoft/florence-2-large",
                    inputs=inputs,
                    use_http=True
                )

                # Decode the outputs into a caption
                if outputs is not None:
                    predicted_caption = processor.decode(outputs[0], skip_special_tokens=True)
                    print(f"\nFlorence Model Caption: {predicted_caption}")
                else:
                    print("Inference failed to produce output")

            else:
                print(f"Test image not found at {image_path}")

        except Exception as e:
            print(f"Inference test failed: {str(e)}")
        # Accelerator and storage cleanup is deferred to the end of the test
        # run so the multi-chip and matrix tests below can keep using the
        # shared connection.

        # Get all image files in sample_task folder
        image_folder = os.path.join(os.path.dirname(__file__), '..', 'sample_task')
        image_files = [f for f in os.listdir(image_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
        image_files.sort()
        if not image_files:
            print("No images found in sample_task folder.")
            return

        print(f"\nTotal Processing Units:")
        print(f"- Streaming Multiprocessors: {total_sms:,}")
        print(f"- CUDA Cores: {total_cores:,}")
        print(f"- Electron-speed tensor cores: {total_cores * 8:,}")
        
        # Test multi-chip parallel inference with HTTP storage
        for img_name in image_files[:1]:  # Test with first image
            img_path = os.path.join(image_folder, img_name)
            raw_image = Image.open(img_path).convert('RGB')
            print(f"\nRunning WebSocket-based inference for image: {img_name}")
            
            # Store input image in HTTP storage
            image_array = np.array(raw_image)
            
            # Use shared VRAM's storage for tensor operations
            shared_vram.storage.store_tensor(f"input_image/{img_name}", image_array)
            
            # Free CPU memory immediately
            raw_image = None
            image_array_shape = image_array.shape
            image_array = None
            gc.collect()
            
            # Synchronize all chips through HTTP storage
            start_time = time.time()
            
            # Distribute workload across chips via HTTP storage; each chip is
            # handed the full image tensor ID, so batch_size is nominal here
            batch_size = image_array_shape[0] // num_chips
            results = []
            
            # Ensure all connections are properly managed
            for accelerator in ai_accelerators:
                accelerator.vram.storage = shared_vram.storage
            
            for i, accelerator in enumerate(ai_accelerators):
                # Load image section from HTTP storage
                tensor_id = f"input_image/{img_name}"

                # Run inference using HTTP-stored weights
                result = accelerator.inference(model_id, tensor_id)

                # Store result in HTTP storage
                if result is not None:
                    shared_storage.store_tensor(f"results/chip_{i}/{img_name}", result)
                    results.append(result)
            
            elapsed = time.time() - start_time
            
            # Calculate performance metrics
            ops_per_inference = total_cores * 1024  # assume 1,024 FMA ops per core per inference
            electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
            theoretical_time = electron_transit_time * ops_per_inference / total_cores
            
            # Combine results from all chips through HTTP storage
            final_result = None
            for i in range(num_chips):
                chip_result = shared_storage.load_tensor(f"results/chip_{i}/{img_name}")
                if chip_result is not None:
                    if final_result is None:
                        final_result = chip_result
                    else:
                        final_result = np.concatenate([final_result, chip_result])
            
            print(f"\nWebSocket-Based Performance Metrics:")
            print(f"- Final result shape: {final_result.shape if final_result is not None else 'None'}")
            print(f"- Wall clock time: {elapsed*1000:.3f} ms")
            print(f"- Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
            print(f"- Effective TFLOPS: {(ops_per_inference / elapsed) / 1e12:.2f}")
            print(f"- Number of chips used: {num_chips}")
            
            assert final_result is not None, "HTTP-based inference returned None"
            assert isinstance(final_result, np.ndarray), "Combined inference result is not a numpy array"
        print("Multi-chip inference test (virtual GPU stack) successful.")

    except Exception as e:
        print(f"Multi-chip inference test failed: {e}")
        return


    # Test 4: Electron-Speed Matrix Operations
    print("\nTest 4: Electron-Speed Matrix Operations")
    try:
        # Create large matrices to demonstrate parallel processing
        size = 1024  # Large enough to show parallelization benefits
        matrix_a = [[float(i+j) for j in range(size)] for i in range(size)]
        matrix_b = [[float(i*j+1) for j in range(size)] for i in range(size)]

        print("\nLoading matrices into virtual VRAM...")
        matrix_a_id = ai_accelerator_for_loading.load_matrix(matrix_a, "matrix_A")
        matrix_b_id = ai_accelerator_for_loading.load_matrix(matrix_b, "matrix_B")

        print("\nPerforming electron-speed matrix multiplication...")
        start_time = time.time()
        result_matrix_id = ai_accelerator_for_loading.matrix_multiply(matrix_a_id, matrix_b_id, "result_C")
        result_matrix = ai_accelerator_for_loading.get_matrix(result_matrix_id)

        elapsed = time.time() - start_time
        
        # Calculate electron-speed performance metrics
        ops = size * size * size * 2  # Total multiply-add operations
        electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
        theoretical_time = electron_transit_time * ops / (total_cores * 8)  # 8 tensor cores per CUDA core
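        # Illustrative arithmetic check: for size = 1024 the multiplication
        # above performs ops = 2 * 1024^3 ≈ 2.15e9 multiply-adds, so a 1 ms
        # wall clock time would correspond to roughly 2.15 TFLOPS below.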
        
        print("\nElectron-Speed Matrix Operation Metrics:")
        print(f"Matrix size: {size}x{size}")
        print(f"Total operations: {ops:,}")
        print(f"Wall clock time: {elapsed*1000:.3f} ms")
        print(f"Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
        print(f"Effective TFLOPS: {(ops / elapsed) / 1e12:.2f}")
        
        # Verify first few elements for correctness
        print("\nValidating results (first 2x2 corner):")
        print(f"Result[0:2,0:2] = ")
        for i in range(min(2, len(result_matrix))):
            print(result_matrix[i][:2])
            
        # Validate dimensions
        assert len(result_matrix) == size, "Result matrix has incorrect row count"
        assert len(result_matrix[0]) == size, "Result matrix has incorrect column count"
        print("\nMatrix operations at electron speed successful.")

    except Exception as e:
        print(f"Matrix operations test failed: {e}")
        return

    print("\n--- All AI Integration Tests Completed ---")