Spaces:
Sleeping
Sleeping
| """ | |
| Test AI integration with WebSocket-based storage and zero CPU memory usage. | |
| All operations are performed through WebSocket storage with direct tensor core access. | |
| """ | |
| import asyncio | |
| from gpu_arch import Chip | |
| from ai import AIAccelerator | |
| from virtual_vram import VirtualVRAM | |
| from PIL import Image | |
| import numpy as np | |
| from websocket_storage import WebSocketGPUStorage | |
| import time | |
| import os | |
| import platform | |
| import contextlib | |
| import atexit | |
| import logging | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
def increase_file_limit():
    """Raise this process's soft file-descriptor limit to the hard limit.

    Many concurrent WebSocket connections can exhaust the default soft
    limit (EMFILE); raising soft -> hard is permitted without privileges.
    Best-effort: on failure (or on platforms without the ``resource``
    module, e.g. Windows) a warning is printed and execution continues.

    Returns:
        None.
    """
    try:
        # Imported lazily: ``resource`` is Unix-only, and the original code
        # referenced it without any import at all (guaranteed NameError).
        import resource
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
        print(f"Increased file descriptor limit from {soft} to {hard}")
    except Exception as e:
        # Non-fatal by design: the test run should proceed either way.
        print(f"Warning: Could not increase file descriptor limit: {e}")
# WebSocket connection manager with retry
@contextlib.contextmanager
def websocket_manager(max_retries=5, retry_delay=2, timeout=30.0):
    """Context manager yielding a connected ``WebSocketGPUStorage``.

    Attempts the initial connection up to *max_retries* times, sleeping
    *retry_delay* seconds between attempts, and always closes the storage
    connection on exit.

    Args:
        max_retries: Maximum number of initial connection attempts.
        retry_delay: Seconds to sleep between failed attempts.
        timeout: Per-attempt timeout passed to ``wait_for_connection``.

    Yields:
        The connected ``WebSocketGPUStorage`` instance.

    Raises:
        RuntimeError: If no connection could be established after
            *max_retries* attempts.
    """
    # Fix vs. original: this function used ``yield`` and was consumed via
    # ``with websocket_manager() as storage`` but lacked the
    # @contextlib.contextmanager decorator, so every ``with`` failed with
    # AttributeError: __enter__.
    storage = None
    last_error = None

    def try_connect():
        # (Re)create the storage object, closing any previous instance first.
        nonlocal storage
        if storage:
            try:
                storage.close()
            except Exception:
                pass
        storage = WebSocketGPUStorage()
        return storage.wait_for_connection(timeout=timeout)

    # Initial connection attempts
    for attempt in range(max_retries):
        try:
            if try_connect():
                logging.info("Successfully connected to GPU storage server")
                break
            else:
                logging.warning(f"Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...")
                time.sleep(retry_delay)
        except Exception as e:
            last_error = str(e)
            logging.error(f"Connection attempt {attempt + 1} failed with error: {e}")
            time.sleep(retry_delay)
        if attempt == max_retries - 1:
            error_msg = f"Could not connect to GPU storage server after {max_retries} attempts"
            if last_error:
                error_msg += f". Last error: {last_error}"
            raise RuntimeError(error_msg)
    try:
        # Hand the live connection to the ``with`` body.
        yield storage
    except Exception as e:
        # Fix vs. original: the old code tried to reconnect and ``yield`` a
        # second time here, which is illegal under the contextmanager
        # protocol (the generator must stop after the exception is thrown
        # in). Log and propagate instead; callers own their retry policy.
        logging.error(f"WebSocket operation failed: {e}")
        raise
    finally:
        # Always release the socket, even if the body raised.
        if storage:
            try:
                storage.close()
            except Exception:
                pass
def cleanup_resources():
    """Run a final garbage-collection pass when the interpreter shuts down."""
    from gc import collect
    collect()


# Ensure lingering objects (storage handles, tensors) are collected at exit.
atexit.register(cleanup_resources)
def test_ai_integration():
    """Exercise the WebSocket-backed virtual-GPU stack end to end.

    Three phases:
      1. Load a Florence-2 model directly into WebSocket storage.
      2. Fan inference out across four virtual chips sharing that storage.
      3. Electron-speed matrix multiply (currently unreachable -- see the
         bare ``return`` before Test 3 below).

    Any phase failure prints a message and returns early; nothing is
    raised to the caller.
    """
    print("\n--- Testing WebSocket-Based AI Integration with Zero CPU Usage ---")
    from electron_speed import TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP, drift_velocity, speed_of_light_silicon
    # Shared GPU resources kept alive across the test phases.
    components = {
        'chips': [],
        'ai_accelerators': [],
        'model_id': None,
        'vram': None,
        'storage': None,
        'model_config': None,
        'tensor_registry': {},
        'initialized': False
    }
    # NOTE(review): built but never read anywhere below -- candidate for removal.
    global_tensor_registry = {
        'model_tensors': {},
        'runtime_tensors': {},
        'placeholder_tensors': {},
        'stats': {
            'total_vram_used': 0,
            'active_tensors': 0
        }
    }
    # Raise soft fd limit so many concurrent sockets don't hit EMFILE.
    increase_file_limit()
    print(f"\nElectron-Speed Architecture Parameters:")
    print(f"Target switches/sec: {TARGET_SWITCHES_PER_SEC:.2e}")
    print(f"Transistors on chip: {TRANSISTORS_ON_CHIP:,}")
    print(f"Electron drift velocity: {drift_velocity:.2e} m/s")
    print(f"Percentage of light speed: {(drift_velocity/speed_of_light_silicon)*100:.2f}%")
    # ------------------------------------------------------------------
    # Test 1: WebSocket-Based Model Loading
    # ------------------------------------------------------------------
    print("\nTest 1: Model Loading with WebSocket Storage")
    try:
        # websocket_manager() handles retries and closes the socket on exit.
        with websocket_manager() as storage:
            components['storage'] = storage  # Save storage reference for Test 2
            # Single bootstrap chip, unlimited VRAM, backed by the shared socket.
            chip_for_loading = Chip(chip_id=0, vram_size_gb=None, storage=storage)
            components['chips'].append(chip_for_loading)
            # VRAM shares the same WebSocket connection.
            vram = VirtualVRAM(storage=storage)
            components['vram'] = vram
            # The chip's accelerator is rewired to the WebSocket-backed VRAM.
            ai_accelerator_for_loading = chip_for_loading.ai_accelerator
            ai_accelerator_for_loading.vram = vram
            ai_accelerator_for_loading.initialize_tensor_cores()  # Ensure tensor cores are ready
            components['ai_accelerators'].append(ai_accelerator_for_loading)
            # Seed the model-registry state on the storage server.
            storage.store_state("model_registry", "state", {
                "initialized": True,
                "max_vram": None,  # Unlimited
                "active_models": {}
            })
            # NOTE(review): the comment in the original said "BLIP-2 Large"
            # but the id below is Florence-2 -- confirm which is intended.
            from transformers import AutoModelForCausalLM, AutoProcessor
            model_id = "microsoft/florence-2-large"
            print(f"Loading model {model_id} directly to WebSocket storage...")
            try:
                # Load model and processor with proper error handling.
                model = AutoModelForCausalLM.from_pretrained(
                    model_id,
                    trust_remote_code=True,
                    device_map="auto",  # Allow automatic device mapping
                    torch_dtype="auto"  # Use appropriate dtype
                )
                processor = AutoProcessor.from_pretrained(
                    model_id,
                    trust_remote_code=True
                )
                # Ensure WebSocket connection is active before proceeding.
                if not ai_accelerator_for_loading.storage.wait_for_connection():
                    raise RuntimeError("WebSocket connection lost - please retry")
                # Total parameter bytes, used for VRAM bookkeeping below.
                model_size = sum(p.numel() * p.element_size() for p in model.parameters())
                print(f"Model size: {model_size / (1024**3):.2f} GB")
                # Hand the in-memory model to the accelerator, which stores
                # it in WebSocket storage.
                ai_accelerator_for_loading.load_model(
                    model_id=model_id,
                    model=model,
                    processor=processor
                )
                print(f"Model '{model_id}' loaded successfully to WebSocket storage.")
                assert ai_accelerator_for_loading.has_model(model_id), "Model not found in WebSocket storage after loading."
                # Record the loaded model in the shared components dict.
                components['model_id'] = model_id
                components['model_size'] = model_size
                # Drop CPU-side copies now that weights live server-side.
                model = None
                processor = None
                import gc  # NOTE(review): Test 2 reuses this local import -- see gc.collect() there.
                gc.collect()
            except Exception as e:
                print(f"Detailed model loading error: {str(e)}")
                print("Falling back to zero-copy tensor mode...")
                # Fallback: register the model id only; weights are assumed
                # to already exist on the storage server.
                try:
                    ai_accelerator_for_loading.load_model(
                        model_id=model_id,
                        model=None,
                        processor=None
                    )
                    components['model_id'] = model_id
                    print("Successfully loaded model in zero-copy mode")
                except Exception as e2:
                    print(f"Zero-copy fallback also failed: {str(e2)}")
                    raise
    except Exception as e:
        print(f"Model loading test failed: {e}")
        return
    # ------------------------------------------------------------------
    # Test 2: WebSocket-Based Multi-Chip Processing
    # ------------------------------------------------------------------
    print("\nTest 2: WebSocket-Based Parallel Processing across Multiple Chips")
    num_chips = 4  # Using multiple chips for maximum parallelization
    chips = []
    ai_accelerators = []
    try:
        # Try to reuse the Test-1 connection with verification.
        # NOTE(review): the ``with`` block in Test 1 closed ``storage`` on
        # exit, so this reuse path (and the direct uses of ``storage`` in
        # the inference loop below) likely operates on a closed connection
        # -- confirm against WebSocketGPUStorage.close() semantics.
        shared_storage = None
        max_connection_attempts = 3
        for attempt in range(max_connection_attempts):
            try:
                if (components['storage'] and
                        components['storage'].wait_for_connection(timeout=10.0)):
                    shared_storage = components['storage']
                    shared_storage.set_keep_alive(True)  # Enable keep-alive
                    logging.info("Successfully reused existing WebSocket connection")
                    break
                else:
                    logging.warning("Existing connection unavailable, creating new connection...")
                    # NOTE(review): leaving this ``with`` closes new_storage
                    # even though it is saved for use below -- verify intent.
                    with websocket_manager(timeout=30.0) as new_storage:
                        if new_storage and new_storage.wait_for_connection(timeout=10.0):
                            components['storage'] = new_storage
                            shared_storage = new_storage
                            shared_storage.set_keep_alive(True)  # Enable keep-alive
                            logging.info("Successfully established new WebSocket connection")
                            break
            except Exception as e:
                logging.error(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < max_connection_attempts - 1:
                    time.sleep(2)
                    continue
                raise RuntimeError(f"Failed to establish WebSocket connection after {max_connection_attempts} attempts")
        # Aggregate compute statistics across the chip array.
        total_sms = 0
        total_cores = 0
        # Create optical interconnect for chip-to-chip communication.
        from gpu_arch import OpticalInterconnect
        optical_link = OpticalInterconnect(bandwidth_tbps=800, latency_ns=1)
        # Reuse the Test-1 VRAM instance, rebound to the shared storage.
        shared_vram = components['vram']
        if shared_vram is None:
            shared_vram = VirtualVRAM()
        shared_vram.storage = shared_storage
        for i in range(num_chips):
            # Configure each chip with the shared WebSocket storage.
            chip = Chip(chip_id=i, vram_size_gb=None, storage=shared_storage)
            chips.append(chip)
            # Connect chips via the optical link.
            # NOTE(review): this builds a linear chain, not a ring -- no
            # link closes chip N-1 back to chip 0 despite the comment in
            # the original.
            if i > 0:
                chip.connect_chip(chips[i-1], optical_link)
            # Point each accelerator at the shared VRAM and storage.
            ai_accelerator = chip.ai_accelerator
            ai_accelerator.vram = shared_vram
            ai_accelerator.storage = shared_storage  # Ensure storage is set
            ai_accelerators.append(ai_accelerator)
            # Verify (and attempt to repair) the WebSocket connection
            # before loading the model onto this chip.
            max_retry = 3
            for retry in range(max_retry):
                try:
                    if not shared_storage.wait_for_connection(timeout=5.0):
                        logging.warning(f"Connection check failed for chip {i}, attempt {retry + 1}")
                        shared_storage.reconnect()  # Attempt to reconnect
                        time.sleep(1)
                        continue
                    # Weights already live in WebSocket storage; no CPU transfer.
                    ai_accelerator.load_model(model_id, None, None)
                    logging.info(f"Successfully initialized chip {i} with model")
                    break
                except Exception as e:
                    if retry < max_retry - 1:
                        logging.warning(f"Error initializing chip {i}, attempt {retry + 1}: {e}")
                        time.sleep(1)
                        continue
                    else:
                        logging.error(f"Failed to initialize chip {i} after {max_retry} attempts: {e}")
                        raise
            # Track total processing units across the array.
            total_sms += chip.num_sms
            total_cores += chip.num_sms * chip.cores_per_sm
            # Persist this chip's configuration server-side.
            shared_storage.store_state(f"chips/{i}/config", "state", {
                "num_sms": chip.num_sms,
                "cores_per_sm": chip.cores_per_sm,
                "total_cores": chip.num_sms * chip.cores_per_sm,
                "connected_chips": [c.chip_id for c in chip.connected_chips]
            })
            print(f"Chip {i} initialized with WebSocket storage and optical interconnect")
        # Collect all image files from the sibling sample_task folder.
        image_folder = os.path.join(os.path.dirname(__file__), '..', 'sample_task')
        image_files = [f for f in os.listdir(image_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
        image_files.sort()
        if not image_files:
            print("No images found in sample_task folder.")
            return
        print(f"\nTotal Processing Units:")
        print(f"- Streaming Multiprocessors: {total_sms:,}")
        print(f"- CUDA Cores: {total_cores:,}")
        print(f"- Electron-speed tensor cores: {total_cores * 8:,}")
        # Multi-chip parallel inference through WebSocket storage.
        for img_name in image_files[:1]:  # Test with first image only
            img_path = os.path.join(image_folder, img_name)
            raw_image = Image.open(img_path).convert('RGB')
            print(f"\nRunning WebSocket-based inference for image: {img_name}")
            # Push the input image into WebSocket storage.
            image_array = np.array(raw_image)
            shared_vram.storage.store_tensor(f"input_image/{img_name}", image_array)
            # Free CPU memory immediately; only the shape is retained.
            raw_image = None
            image_array_shape = image_array.shape
            image_array = None
            # NOTE(review): ``gc`` was imported inside Test 1's inner try;
            # if that path was skipped (fallback branch), this is a NameError.
            gc.collect()
            start_time = time.time()
            # NOTE(review): batch_size is computed but never used -- every
            # chip below runs inference on the full image tensor, not a slice.
            batch_size = image_array_shape[0] // num_chips
            results = []
            # Re-point every accelerator at the shared storage before use.
            for accelerator in ai_accelerators:
                accelerator.vram.storage = shared_vram.storage
            for i, accelerator in enumerate(ai_accelerators):
                # Tensor id of the stored input image.
                tensor_id = f"input_image/{img_name}"
                # Run inference using the WebSocket-stored weights.
                result = accelerator.inference(model_id, tensor_id)
                # Persist non-None per-chip results server-side.
                if result is not None:
                    storage.store_tensor(f"results/chip_{i}/{img_name}", result)
                    results.append(result)
            elapsed = time.time() - start_time
            # Derived performance metrics (theoretical, for the report below).
            ops_per_inference = total_cores * 1024  # FMA ops per core
            electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
            theoretical_time = electron_transit_time * ops_per_inference / total_cores
            # Merge per-chip results by concatenation along axis 0.
            final_result = None
            for i in range(num_chips):
                chip_result = storage.load_tensor(f"results/chip_{i}/{img_name}")
                if chip_result is not None:
                    if final_result is None:
                        final_result = chip_result
                    else:
                        final_result = np.concatenate([final_result, chip_result])
            print(f"\nWebSocket-Based Performance Metrics:")
            print(f"- Final result shape: {final_result.shape if final_result is not None else 'None'}")
            print(f"- Wall clock time: {elapsed*1000:.3f} ms")
            print(f"- Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
            print(f"- Effective TFLOPS: {(ops_per_inference / elapsed) / 1e12:.2f}")
            print(f"- Number of chips used: {num_chips}")
            assert final_result is not None, "WebSocket-based inference returned None"
            # NOTE(review): asserting ``result`` is a str conflicts with the
            # np.concatenate/.shape handling of final_result above -- confirm
            # what accelerator.inference() actually returns.
            assert isinstance(result, str), "Inference result is not a string"
            print("Multi-chip inference test on all images (virtual GPU stack) successful.")
    except Exception as e:
        print(f"Multi-chip inference test failed: {e}")
        return
    # NOTE(review): this bare return makes Test 3 below unreachable dead
    # code -- remove it if the matrix test is meant to run.
    return
    # ------------------------------------------------------------------
    # Test 3: Electron-Speed Matrix Operations (currently unreachable)
    # ------------------------------------------------------------------
    print("\nTest 3: Electron-Speed Matrix Operations")
    try:
        # Large matrices to demonstrate parallelization benefits.
        size = 1024
        matrix_a = [[float(i+j) for j in range(size)] for i in range(size)]
        matrix_b = [[float(i*j+1) for j in range(size)] for i in range(size)]
        print("\nLoading matrices into virtual VRAM...")
        matrix_a_id = ai_accelerator_for_loading.load_matrix(matrix_a, "matrix_A")
        matrix_b_id = ai_accelerator_for_loading.load_matrix(matrix_b, "matrix_B")
        print("\nPerforming electron-speed matrix multiplication...")
        start_time = time.time()
        result_matrix_id = ai_accelerator_for_loading.matrix_multiply(matrix_a_id, matrix_b_id, "result_C")
        result_matrix = ai_accelerator_for_loading.get_matrix(result_matrix_id)
        elapsed = time.time() - start_time
        # Theoretical electron-speed performance metrics.
        ops = size * size * size * 2  # Total multiply-add operations
        electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
        theoretical_time = electron_transit_time * ops / (total_cores * 8)  # 8 tensor cores per CUDA core
        print("\nElectron-Speed Matrix Operation Metrics:")
        print(f"Matrix size: {size}x{size}")
        print(f"Total operations: {ops:,}")
        print(f"Wall clock time: {elapsed*1000:.3f} ms")
        print(f"Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
        print(f"Effective TFLOPS: {(ops / elapsed) / 1e12:.2f}")
        # Spot-check the 2x2 corner of the result for sanity.
        print("\nValidating results (first 2x2 corner):")
        print(f"Result[0:2,0:2] = ")
        for i in range(min(2, len(result_matrix))):
            print(result_matrix[i][:2])
        # Validate result dimensions match the inputs.
        assert len(result_matrix) == size, "Result matrix has incorrect dimensions"
        assert len(result_matrix[0]) == size, "Result matrix has incorrect dimensions"
        print("\nMatrix operations at electron speed successful.")
    except Exception as e:
        print(f"Matrix operations test failed: {e}")
        return
    print("\n--- All AI Integration Tests Completed ---")