"""
Test AI integration with WebSocket-based storage and zero CPU memory usage.
All operations are performed through WebSocket storage with direct tensor core access.
"""
import asyncio
import atexit
import contextlib
import gc
import logging
import os
import platform
import time

try:
    # POSIX-only module; absent on Windows.
    import resource
except ImportError:
    resource = None

from PIL import Image
import numpy as np

from gpu_arch import Chip
from ai import AIAccelerator
from virtual_vram import VirtualVRAM
from websocket_storage import WebSocketGPUStorage

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


def increase_file_limit():
    """Raise the soft open-file limit of this process to its hard limit.

    Best effort: any failure (including platforms without the ``resource``
    module) is reported on stdout and otherwise ignored.
    """
    if resource is None:
        print("Warning: Could not increase file descriptor limit: "
              "the 'resource' module is not available on this platform")
        return
    try:
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
        print(f"Increased file descriptor limit from {soft} to {hard}")
    except (ValueError, OSError) as e:
        # setrlimit raises ValueError/OSError when the limit cannot be applied.
        print(f"Warning: Could not increase file descriptor limit: {e}")


def _close_quietly(storage):
    """Close *storage* if it is set, logging (not raising) any close error."""
    if storage is None:
        return
    try:
        storage.close()
    except Exception as e:
        logging.debug("Ignoring error while closing WebSocket storage: %s", e)


@contextlib.contextmanager
def websocket_manager(max_retries=5, retry_delay=2, timeout=30.0):
    """Context manager yielding a connected ``WebSocketGPUStorage``.

    Args:
        max_retries: Number of connection attempts before giving up.
        retry_delay: Seconds to sleep between two attempts.
        timeout: Passed to ``wait_for_connection`` on every attempt.

    Yields:
        The connected storage object. It is closed when the ``with`` block
        exits, whether normally or through an exception.

    Raises:
        RuntimeError: If no attempt produced a connection.
    """
    storage = None
    last_error = None

    for attempt in range(max_retries):
        # Drop the handle of the previous failed attempt before retrying.
        _close_quietly(storage)
        storage = None
        try:
            storage = WebSocketGPUStorage()
            if storage.wait_for_connection(timeout=timeout):
                logging.info("Successfully connected to GPU storage server")
                break
            logging.warning(f"Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...")
        except Exception as e:
            last_error = str(e)
            logging.error(f"Connection attempt {attempt + 1} failed with error: {e}")
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
    else:
        # Loop ended without ``break``: every attempt failed.
        _close_quietly(storage)
        error_msg = f"Could not connect to GPU storage server after {max_retries} attempts"
        if last_error:
            error_msg += f". Last error: {last_error}"
        raise RuntimeError(error_msg)

    try:
        yield storage
    except Exception as e:
        # A generator-based context manager must not yield a second time
        # after an exception was thrown into it (contextlib would raise
        # "generator didn't stop after throw()"), so log and propagate.
        logging.error(f"WebSocket operation failed: {e}")
        raise
    finally:
        _close_quietly(storage)


def cleanup_resources():
    """Run a garbage-collection pass; registered as an ``atexit`` handler."""
    gc.collect()


# Register cleanup handler
atexit.register(cleanup_resources)


def _acquire_shared_storage(stack, components, max_attempts=3):
    """Return a live storage connection, reusing ``components['storage']`` if possible.

    A newly opened connection is registered on *stack* so that it stays open
    until the caller's ``ExitStack`` is closed, and is written back to
    ``components['storage']``.

    Raises:
        RuntimeError: If no usable connection was obtained in *max_attempts* tries.
    """
    for attempt in range(max_attempts):
        try:
            existing = components['storage']
            if existing is not None and existing.wait_for_connection(timeout=10.0):
                existing.set_keep_alive(True)  # Enable keep-alive
                logging.info("Successfully reused existing WebSocket connection")
                return existing

            logging.warning("Existing connection unavailable, creating new connection...")
            new_storage = stack.enter_context(websocket_manager(timeout=30.0))
            if new_storage is not None and new_storage.wait_for_connection(timeout=10.0):
                components['storage'] = new_storage
                new_storage.set_keep_alive(True)  # Enable keep-alive
                logging.info("Successfully established new WebSocket connection")
                return new_storage
        except Exception as e:
            logging.error(f"Connection attempt {attempt + 1} failed: {e}")
        if attempt < max_attempts - 1:
            time.sleep(2)
    raise RuntimeError(f"Failed to establish WebSocket connection after {max_attempts} attempts")


def _load_model_on_chip(ai_accelerator, shared_storage, model_id, chip_index, max_retry=3):
    """Load *model_id* on one accelerator, retrying on connection problems.

    Raises:
        RuntimeError: If the connection check never succeeded.
        Exception: Whatever the last ``load_model`` attempt raised.
    """
    for retry in range(max_retry):
        try:
            if not shared_storage.wait_for_connection(timeout=5.0):
                logging.warning(f"Connection check failed for chip {chip_index}, attempt {retry + 1}")
                shared_storage.reconnect()  # Attempt to reconnect
                time.sleep(1)
                continue
            # Model is expected to be in WebSocket storage already, hence no
            # model/processor objects are passed.
            ai_accelerator.load_model(model_id, None, None)
            logging.info(f"Successfully initialized chip {chip_index} with model")
            return
        except Exception as e:
            if retry < max_retry - 1:
                logging.warning(f"Error initializing chip {chip_index}, attempt {retry + 1}: {e}")
                time.sleep(1)
                continue
            logging.error(f"Failed to initialize chip {chip_index} after {max_retry} attempts: {e}")
            raise
    # Every attempt ended in a failed connection check without an exception.
    raise RuntimeError(
        f"Storage connection unavailable for chip {chip_index} after {max_retry} attempts"
    )


def _effective_tflops(ops, elapsed):
    """Return ops/elapsed in TFLOPS, or ``inf`` when no time was measured."""
    if elapsed <= 0:
        return float("inf")
    return (ops / elapsed) / 1e12


def test_ai_integration():
    """Run the model-loading, multi-chip inference and matrix tests in sequence.

    All WebSocket connections opened during the run are kept alive until the
    function returns. Failures of an individual test are printed and end the
    run early; they are not re-raised.
    """
    with contextlib.ExitStack() as stack:
        _run_integration_tests(stack)


def _run_integration_tests(stack):
    """Body of ``test_ai_integration``; *stack* owns every opened connection."""
    print("\n--- Testing WebSocket-Based AI Integration with Zero CPU Usage ---")
    from electron_speed import TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP, drift_velocity, speed_of_light_silicon

    # Components dictionary to store GPU resources shared between the tests
    components = {
        'chips': [],
        'ai_accelerators': [],
        'model_id': None,
        'vram': None,
        'storage': None,
        'model_config': None,
        'tensor_registry': {},
        'initialized': False
    }

    # Increase file descriptor limit
    increase_file_limit()

    print(f"\nElectron-Speed Architecture Parameters:")
    print(f"Target switches/sec: {TARGET_SWITCHES_PER_SEC:.2e}")
    print(f"Transistors on chip: {TRANSISTORS_ON_CHIP:,}")
    print(f"Electron drift velocity: {drift_velocity:.2e} m/s")
    print(f"Percentage of light speed: {(drift_velocity/speed_of_light_silicon)*100:.2f}%")

    # Test 1: WebSocket-Based Model Loading
    print("\nTest 1: Model Loading with WebSocket Storage")
    try:
        # The connection is entered on the ExitStack (not a local ``with``)
        # because Tests 2 and 3 keep using it after this test finishes.
        storage = stack.enter_context(websocket_manager())
        components['storage'] = storage  # Save storage reference

        # Initialize virtual GPU stack with shared WebSocket storage
        chip_for_loading = Chip(chip_id=0, vram_size_gb=None, storage=storage)
        components['chips'].append(chip_for_loading)

        # Initialize VRAM with shared WebSocket storage
        vram = VirtualVRAM(storage=storage)
        components['vram'] = vram

        # Set up AI accelerator
        ai_accelerator_for_loading = chip_for_loading.ai_accelerator
        ai_accelerator_for_loading.vram = vram  # Use WebSocket-backed VRAM
        ai_accelerator_for_loading.initialize_tensor_cores()
        components['ai_accelerators'].append(ai_accelerator_for_loading)

        # Initialize model registry in WebSocket storage
        storage.store_state("model_registry", "state", {
            "initialized": True,
            "max_vram": None,  # Unlimited
            "active_models": {}
        })

        # Load the Florence-2 Large model
        from transformers import AutoModelForCausalLM, AutoProcessor
        model_id = "microsoft/florence-2-large"
        print(f"Loading model {model_id} directly to WebSocket storage...")

        model = None
        processor = None
        try:
            # NOTE(review): trust_remote_code=True executes code shipped with
            # the model repository; acceptable only for a trusted model id.
            model = AutoModelForCausalLM.from_pretrained(
                model_id,
                trust_remote_code=True,
                device_map="auto",   # Allow automatic device mapping
                torch_dtype="auto"   # Use appropriate dtype
            )
            processor = AutoProcessor.from_pretrained(
                model_id,
                trust_remote_code=True
            )

            # Ensure WebSocket connection is active before proceeding
            if not ai_accelerator_for_loading.storage.wait_for_connection():
                raise RuntimeError("WebSocket connection lost - please retry")

            # Calculate model size (bytes over all parameters)
            model_size = sum(p.numel() * p.element_size() for p in model.parameters())
            print(f"Model size: {model_size / (1024**3):.2f} GB")

            # Load model using AIAccelerator's load_model method
            ai_accelerator_for_loading.load_model(
                model_id=model_id,
                model=model,
                processor=processor
            )
            print(f"Model '{model_id}' loaded successfully to WebSocket storage.")
            assert ai_accelerator_for_loading.has_model(model_id), "Model not found in WebSocket storage after loading."

            # Store model parameters in components dict
            components['model_id'] = model_id
            components['model_size'] = model_size
        except Exception as e:
            print(f"Detailed model loading error: {str(e)}")
            print("Falling back to zero-copy tensor mode...")
            # Release any partially loaded CPU-side objects before retrying.
            model = None
            processor = None
            try:
                ai_accelerator_for_loading.load_model(
                    model_id=model_id,
                    model=None,
                    processor=None
                )
                components['model_id'] = model_id
                print("Successfully loaded model in zero-copy mode")
            except Exception as e2:
                print(f"Zero-copy fallback also failed: {str(e2)}")
                raise
        finally:
            # Clear any CPU-side model data on every path
            model = None
            processor = None
            gc.collect()

    except Exception as e:
        print(f"Model loading test failed: {e}")
        return

    # Test 2: WebSocket-Based Multi-Chip Processing
    print("\nTest 2: WebSocket-Based Parallel Processing across Multiple Chips")
    num_chips = 4  # Using multiple chips for maximum parallelization
    chips = []
    ai_accelerators = []

    try:
        # Reuse the existing connection when it is still alive, else reconnect
        shared_storage = _acquire_shared_storage(stack, components)

        # Initialize chip array with WebSocket storage
        total_sms = 0
        total_cores = 0

        # Create optical interconnect for chip communication
        from gpu_arch import OpticalInterconnect
        optical_link = OpticalInterconnect(bandwidth_tbps=800, latency_ns=1)

        # Reuse existing VRAM instance and make sure it points at the
        # connection actually in use (it may have been re-established).
        shared_vram = components['vram']
        if shared_vram is None:
            shared_vram = VirtualVRAM()
        shared_vram.storage = shared_storage

        for i in range(num_chips):
            # Configure each chip with shared WebSocket storage
            chip = Chip(chip_id=i, vram_size_gb=None, storage=shared_storage)
            chips.append(chip)

            # Link each chip to its predecessor (a linear chain; the last
            # chip is not linked back to the first).
            if i > 0:
                chip.connect_chip(chips[i - 1], optical_link)

            # Initialize AI accelerator with shared resources
            ai_accelerator = chip.ai_accelerator
            ai_accelerator.vram = shared_vram
            ai_accelerator.storage = shared_storage  # Ensure storage is set
            ai_accelerators.append(ai_accelerator)

            # Verify the connection and load the model on this chip
            _load_model_on_chip(ai_accelerator, shared_storage, model_id, i)

            # Track total processing units
            total_sms += chip.num_sms
            total_cores += chip.num_sms * chip.cores_per_sm

            # Store chip configuration in WebSocket storage
            shared_storage.store_state(f"chips/{i}/config", "state", {
                "num_sms": chip.num_sms,
                "cores_per_sm": chip.cores_per_sm,
                "total_cores": chip.num_sms * chip.cores_per_sm,
                "connected_chips": [c.chip_id for c in chip.connected_chips]
            })

            print(f"Chip {i} initialized with WebSocket storage and optical interconnect")

        # Get all image files in sample_task folder
        image_folder = os.path.join(os.path.dirname(__file__), '..', 'sample_task')
        image_files = sorted(
            f for f in os.listdir(image_folder)
            if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))
        )
        if not image_files:
            print("No images found in sample_task folder.")
            return

        print(f"\nTotal Processing Units:")
        print(f"- Streaming Multiprocessors: {total_sms:,}")
        print(f"- CUDA Cores: {total_cores:,}")
        print(f"- Electron-speed tensor cores: {total_cores * 8:,}")

        # Test multi-chip inference with WebSocket storage
        for img_name in image_files[:1]:  # Test with first image
            img_path = os.path.join(image_folder, img_name)
            # ``with`` closes the underlying file once the pixels are converted.
            with Image.open(img_path) as opened_image:
                raw_image = opened_image.convert('RGB')
            print(f"\nRunning WebSocket-based inference for image: {img_name}")

            # Store input image in WebSocket storage
            tensor_id = f"input_image/{img_name}"
            image_array = np.array(raw_image)
            shared_vram.storage.store_tensor(tensor_id, image_array)

            # Free CPU memory immediately
            raw_image = None
            image_array = None
            gc.collect()

            start_time = time.perf_counter()
            results = []

            # Ensure every accelerator's VRAM uses the shared storage
            for accelerator in ai_accelerators:
                accelerator.vram.storage = shared_vram.storage

            # Every chip runs inference on the same stored input tensor
            for i, accelerator in enumerate(ai_accelerators):
                result = accelerator.inference(model_id, tensor_id)

                # Store result in WebSocket storage
                if result is not None:
                    shared_storage.store_tensor(f"results/chip_{i}/{img_name}", result)
                    results.append(result)

            elapsed = time.perf_counter() - start_time

            # Calculate performance metrics
            ops_per_inference = total_cores * 1024  # FMA ops per core
            electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
            theoretical_time = electron_transit_time * ops_per_inference / total_cores

            # Combine results from all chips through WebSocket storage
            chip_results = []
            for i in range(num_chips):
                chip_result = shared_storage.load_tensor(f"results/chip_{i}/{img_name}")
                if chip_result is not None:
                    chip_results.append(chip_result)
            if not chip_results:
                final_result = None
            elif len(chip_results) == 1:
                final_result = chip_results[0]
            else:
                final_result = np.concatenate(chip_results)

            print(f"\nWebSocket-Based Performance Metrics:")
            print(f"- Final result shape: {final_result.shape if final_result is not None else 'None'}")
            print(f"- Wall clock time: {elapsed*1000:.3f} ms")
            print(f"- Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
            print(f"- Effective TFLOPS: {_effective_tflops(ops_per_inference, elapsed):.2f}")
            print(f"- Number of chips used: {num_chips}")

            assert final_result is not None, "WebSocket-based inference returned None"
            # Checks the result of the last chip only, as the original did.
            assert isinstance(result, str), "Inference result is not a string"

        print("Multi-chip inference test on all images (virtual GPU stack) successful.")

    except Exception as e:
        print(f"Multi-chip inference test failed: {e}")
        return

    # NOTE(review): the original had two consecutive ``return`` statements at
    # this point. With the file's indentation lost it is unclear whether the
    # second one was meant to disable Test 3; it is treated as a duplicate
    # here, so Test 3 runs. Confirm the intent.

    # Test 3: Electron-Speed Matrix Operations
    print("\nTest 3: Electron-Speed Matrix Operations")
    try:
        # Create large matrices to demonstrate parallel processing
        size = 1024  # Large enough to show parallelization benefits
        matrix_a = [[float(i + j) for j in range(size)] for i in range(size)]
        matrix_b = [[float(i * j + 1) for j in range(size)] for i in range(size)]

        print("\nLoading matrices into virtual VRAM...")
        matrix_a_id = ai_accelerator_for_loading.load_matrix(matrix_a, "matrix_A")
        matrix_b_id = ai_accelerator_for_loading.load_matrix(matrix_b, "matrix_B")

        print("\nPerforming electron-speed matrix multiplication...")
        start_time = time.perf_counter()
        result_matrix_id = ai_accelerator_for_loading.matrix_multiply(matrix_a_id, matrix_b_id, "result_C")
        result_matrix = ai_accelerator_for_loading.get_matrix(result_matrix_id)
        elapsed = time.perf_counter() - start_time

        # Calculate electron-speed performance metrics
        ops = size * size * size * 2  # Total multiply-add operations
        electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
        theoretical_time = electron_transit_time * ops / (total_cores * 8)  # 8 tensor cores per CUDA core

        print("\nElectron-Speed Matrix Operation Metrics:")
        print(f"Matrix size: {size}x{size}")
        print(f"Total operations: {ops:,}")
        print(f"Wall clock time: {elapsed*1000:.3f} ms")
        print(f"Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
        print(f"Effective TFLOPS: {_effective_tflops(ops, elapsed):.2f}")

        # Show first few elements
        print("\nValidating results (first 2x2 corner):")
        print(f"Result[0:2,0:2] = ")
        for i in range(min(2, len(result_matrix))):
            print(result_matrix[i][:2])

        # Validate dimensions
        assert len(result_matrix) == size, "Result matrix has incorrect dimensions"
        assert len(result_matrix[0]) == size, "Result matrix has incorrect dimensions"

        print("\nMatrix operations at electron speed successful.")

    except Exception as e:
        print(f"Matrix operations test failed: {e}")
        return

    print("\n--- All AI Integration Tests Completed ---")