""" Test AI integration with HTTP-based storage for Florence model inference. All operations are performed through HTTP storage with direct tensor core access. """ import asyncio from gpu_arch import Chip from ai_http import AIAcceleratorHTTP from virtual_vram import VirtualVRAM from PIL import Image import numpy as np from http_storage import HTTPGPUStorage import time import os import platform import contextlib import atexit import logging import torch # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) # Increase system file descriptor limit def increase_file_limit(): try: soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard)) print(f"Increased file descriptor limit from {soft} to {hard}") except Exception as e: print(f"Warning: Could not increase file descriptor limit: {e}") # HTTP connection manager with retry @contextlib.contextmanager def http_manager(max_retries=5, retry_delay=2): storage = None last_error = None def try_connect(): nonlocal storage if storage: try: storage.close() except: pass storage = HTTPGPUStorage() return storage.connect() # Initial connection attempts for attempt in range(max_retries): try: if try_connect(): logging.info("Successfully connected to HTTP GPU storage server") break else: logging.warning(f"Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...") time.sleep(retry_delay) except Exception as e: last_error = str(e) logging.error(f"Connection attempt {attempt + 1} failed with error: {e}") time.sleep(retry_delay) if attempt == max_retries - 1: error_msg = f"Could not connect to HTTP GPU storage server after {max_retries} attempts" if last_error: error_msg += f". Last error: {last_error}" raise RuntimeError(error_msg) try: # Yield the storage connection yield storage except Exception as e: logging.error(f"WebSocket operation failed: {e}") # Try to reconnect once if operation fails if try_connect(): logging.info("Successfully reconnected to GPU storage server") yield storage else: raise finally: if storage: try: storage.close() except: pass # Cleanup handler def cleanup_resources(): import gc gc.collect() # Register cleanup handler atexit.register(cleanup_resources) def test_ai_integration(): print("\n--- Testing WebSocket-Based AI Integration with Zero CPU Usage ---") from electron_speed import TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP, drift_velocity, speed_of_light_silicon # Initialize components dictionary to store GPU resources components = { 'chips': [], 'ai_accelerators': [], 'model_id': None, 'vram': None, 'storage': None, 'model_config': None, 'tensor_registry': {}, 'initialized': False } # Initialize global tensor registry global_tensor_registry = { 'model_tensors': {}, 'runtime_tensors': {}, 'placeholder_tensors': {}, 'stats': { 'total_vram_used': 0, 'active_tensors': 0 } } # Increase file descriptor limit increase_file_limit() print(f"\nElectron-Speed Architecture Parameters:") print(f"Target switches/sec: {TARGET_SWITCHES_PER_SEC:.2e}") print(f"Transistors on chip: {TRANSISTORS_ON_CHIP:,}") print(f"Electron drift velocity: {drift_velocity:.2e} m/s") print(f"Percentage of light speed: {(drift_velocity/speed_of_light_silicon)*100:.2f}%") # Test 1: HTTP-Based Model Loading with Florence print("\nTest 1: Loading Florence Model with HTTP Storage") try: # Use HTTP connection manager for proper resource handling with http_manager() as storage: components['storage'] = storage # Save storage reference # Initialize 
            chip_for_loading = Chip(chip_id=0, vram_size_gb=32, storage=storage)  # Allocate sufficient VRAM
            components['chips'].append(chip_for_loading)

            # Initialize VRAM with HTTP storage
            vram = VirtualVRAM(storage=storage)
            components['vram'] = vram

            # Set up AI accelerator with HTTP support
            ai_accelerator_for_loading = AIAcceleratorHTTP(chip=chip_for_loading)
            ai_accelerator_for_loading.vram = vram
            ai_accelerator_for_loading.initialize_tensor_cores()
            components['ai_accelerators'].append(ai_accelerator_for_loading)

            # Initialize model registry in HTTP storage
            storage.store_model_state({
                "initialized": True,
                "max_vram": 32 * 1024 * 1024 * 1024,  # 32 GB in bytes
                "active_models": {}
            })

            # Load Florence-2 model with HTTP storage
            from transformers import AutoModelForCausalLM, AutoProcessor
            model_id = "microsoft/florence-2-large"
            print(f"Loading model {model_id} with HTTP storage...")

            try:
                # Load model and processor with proper error handling
                model = AutoModelForCausalLM.from_pretrained(
                    model_id,
                    trust_remote_code=True,
                    device_map="auto",  # Allow automatic device mapping
                    torch_dtype="auto"  # Use appropriate dtype
                )
                processor = AutoProcessor.from_pretrained(
                    model_id,
                    trust_remote_code=True
                )

                # Ensure the HTTP connection is active before proceeding
                if not ai_accelerator_for_loading.storage.wait_for_connection():
                    raise RuntimeError("HTTP connection lost - please retry")

                # Calculate model size for proper VRAM allocation
                model_size = sum(p.numel() * p.element_size() for p in model.parameters())
                print(f"Model size: {model_size / (1024 ** 3):.2f} GB")

                # Store model in HTTP storage with size information,
                # loading it directly through the AI accelerator's load_model method
                ai_accelerator_for_loading.load_model(
                    model_id=model_id,
                    model=model,
                    processor=processor
                )
                print(f"Model '{model_id}' loaded successfully to HTTP storage.")
                assert ai_accelerator_for_loading.has_model(model_id), \
                    "Model not found in HTTP storage after loading."
                # Store model parameters in components dict
                components['model_id'] = model_id
                components['model_size'] = model_size

                # Clear any CPU-side model data
                model = None
                processor = None
                gc.collect()

            except Exception as e:
                print(f"Detailed model loading error: {str(e)}")
                print("Falling back to zero-copy tensor mode...")
                # Try loading with zero-copy tensor mode
                try:
                    # Load model with HTTP transfer
                    ai_accelerator_for_loading.load_model(
                        model_id=model_id,
                        model=model,
                        processor=processor,
                        use_http=True
                    )
                    components['model_id'] = model_id
                    print("Successfully loaded Florence model with HTTP transfer")
                except Exception as e2:
                    print(f"HTTP model loading failed: {str(e2)}")
                    raise

    except Exception as e:
        print(f"Model loading test failed: {e}")
        return

    # Test 2: HTTP-Based Multi-Chip Processing for Florence Inference
    print("\nTest 2: HTTP-Based Parallel Processing across Multiple Chips")
    num_chips = 4  # Use multiple chips for maximum parallelization
    chips = []
    ai_accelerators = []
    shared_storage = None

    try:
        # Try to reuse the existing HTTP connection, with verification
        max_connection_attempts = 3
        for attempt in range(max_connection_attempts):
            try:
                if components['storage']:
                    shared_storage = components['storage']
                    logging.info("Successfully reused existing HTTP connection")
                    break
                else:
                    logging.warning("Existing connection unavailable, creating new connection...")
                    with http_manager() as new_storage:
                        components['storage'] = new_storage
                        shared_storage = new_storage
                        logging.info("Successfully established new HTTP connection")
                    break
            except Exception as e:
                logging.error(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < max_connection_attempts - 1:
                    time.sleep(2)
                    continue
                raise RuntimeError(
                    f"Failed to establish HTTP connection after {max_connection_attempts} attempts"
                )

        # Initialize high-performance chip array with HTTP storage for Florence
        total_sms = 0
        total_cores = 0

        # Create optical interconnect for chip communication
        from gpu_arch import OpticalInterconnect
        optical_link = OpticalInterconnect(bandwidth_tbps=800, latency_ns=1)

        # Reuse existing VRAM instance with shared storage
        shared_vram = components['vram']
        if shared_vram is None:
            shared_vram = VirtualVRAM()
            shared_vram.storage = shared_storage

        for i in range(num_chips):
            # Configure each chip with shared HTTP storage (32 GB VRAM per chip)
            chip = Chip(chip_id=i, vram_size_gb=32, storage=shared_storage)
            chips.append(chip)

            # Connect chips in a ring topology
            if i > 0:
                chip.connect_chip(chips[i - 1], optical_link)

            # Initialize AI accelerator with HTTP support
            ai_accelerator = AIAcceleratorHTTP(chip=chip)
            ai_accelerator.vram = shared_vram
            ai_accelerator.storage = shared_storage
            ai_accelerators.append(ai_accelerator)

            # Initialize tensor cores for the Florence model
            ai_accelerator.initialize_tensor_cores()

            # Track total processing units
            total_sms += chip.num_sms
            total_cores += chip.num_sms * chip.cores_per_sm

            # Store chip configuration in HTTP storage
            shared_storage.store_state(f"chips/{i}/config", "state", {
                "num_sms": chip.num_sms,
                "cores_per_sm": chip.cores_per_sm,
                "total_cores": chip.num_sms * chip.cores_per_sm,
                "connected_chips": [c.chip_id for c in chip.connected_chips]
            })
            print(f"Chip {i} initialized with HTTP storage and optical interconnect")

        print("\nTest 3: Florence Model Inference with HTTP Storage")
        try:
            # The CPU-side processor was released after Test 1, so recreate it here
            processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)

            # Load test image
            image_path = "test_image.jpg"  # Make sure this image exists
            if os.path.exists(image_path):
                image = Image.open(image_path)

                # Prepare input for the Florence model
                inputs = processor(image, return_tensors="pt")

                # Run inference using HTTP storage
                outputs = ai_accelerator.run_inference(
                    model_id="microsoft/florence-2-large",
                    inputs=inputs,
                    use_http=True
                )

                # Process outputs
                if outputs is not None:
                    predicted_caption = processor.decode(outputs[0], skip_special_tokens=True)
                    print(f"\nFlorence Model Caption: {predicted_caption}")
                else:
                    print("Inference failed to produce output")
            else:
                print(f"Test image not found at {image_path}")
        except Exception as e:
            print(f"Inference test failed: {str(e)}")
{str(e)}") finally: # Cleanup for ai_accelerator in ai_accelerators: try: ai_accelerator.cleanup() except Exception as e: print(f"Cleanup error: {str(e)}") if shared_storage: try: shared_storage.close() except Exception as e: print(f"Storage cleanup error: {str(e)}") # Clear any remaining GPU memory if torch.cuda.is_available(): torch.cuda.empty_cache() # Track total processing units total_sms += chip.num_sms total_cores += chip.num_sms * chip.cores_per_sm # Store chip configuration in WebSocket storage shared_storage.store_state(f"chips/{i}/config", "state", { "num_sms": chip.num_sms, "cores_per_sm": chip.cores_per_sm, "total_cores": chip.num_sms * chip.cores_per_sm, "connected_chips": [c.chip_id for c in chip.connected_chips] }) print(f"Chip {i} initialized with WebSocket storage and optical interconnect") # Get all image files in sample_task folder image_folder = os.path.join(os.path.dirname(__file__), '..', 'sample_task') image_files = [f for f in os.listdir(image_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))] image_files.sort() if not image_files: print("No images found in sample_task folder.") return print(f"\nTotal Processing Units:") print(f"- Streaming Multiprocessors: {total_sms:,}") print(f"- CUDA Cores: {total_cores:,}") print(f"- Electron-speed tensor cores: {total_cores * 8:,}") # Test multi-chip parallel inference with WebSocket storage for img_name in image_files[:1]: # Test with first image img_path = os.path.join(image_folder, img_name) raw_image = Image.open(img_path).convert('RGB') print(f"\nRunning WebSocket-based inference for image: {img_name}") # Store input image in WebSocket storage image_array = np.array(raw_image) # Use shared VRAM's storage for tensor operations shared_vram.storage.store_tensor(f"input_image/{img_name}", image_array) # Free CPU memory immediately raw_image = None image_array_shape = image_array.shape image_array = None gc.collect() # Synchronize all chips through WebSocket storage start_time = time.time() # Distribute workload across chips using WebSocket storage batch_size = image_array_shape[0] // num_chips results = [] # Ensure all connections are properly managed for accelerator in ai_accelerators: accelerator.vram.storage = shared_vram.storage for i, accelerator in enumerate(ai_accelerators): # Load image section from WebSocket storage tensor_id = f"input_image/{img_name}" # Run inference using WebSocket-stored weights result = accelerator.inference(model_id, tensor_id) # Store result in WebSocket storage if result is not None: storage.store_tensor(f"results/chip_{i}/{img_name}", result) results.append(result) elapsed = time.time() - start_time # Calculate performance metrics ops_per_inference = total_cores * 1024 # FMA ops per core electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC) theoretical_time = electron_transit_time * ops_per_inference / total_cores # Combine results from all chips through WebSocket storage final_result = None for i in range(num_chips): chip_result = storage.load_tensor(f"results/chip_{i}/{img_name}") if chip_result is not None: if final_result is None: final_result = chip_result else: final_result = np.concatenate([final_result, chip_result]) print(f"\nWebSocket-Based Performance Metrics:") print(f"- Final result shape: {final_result.shape if final_result is not None else 'None'}") print(f"- Wall clock time: {elapsed*1000:.3f} ms") print(f"- Theoretical electron transit time: {theoretical_time*1e12:.3f} ps") print(f"- Effective TFLOPS: {(ops_per_inference / elapsed) / 
1e12:.2f}") print(f"- Number of chips used: {num_chips}") assert final_result is not None, "WebSocket-based inference returned None" assert isinstance(result, str), "Inference result is not a string" print("Multi-chip inference test on all images (virtual GPU stack) successful.") except Exception as e: print(f"Multi-chip inference test failed: {e}") return return # Test 3: Electron-Speed Matrix Operations print("\nTest 3: Electron-Speed Matrix Operations") try: # Create large matrices to demonstrate parallel processing size = 1024 # Large enough to show parallelization benefits matrix_a = [[float(i+j) for j in range(size)] for i in range(size)] matrix_b = [[float(i*j+1) for j in range(size)] for i in range(size)] print("\nLoading matrices into virtual VRAM...") matrix_a_id = ai_accelerator_for_loading.load_matrix(matrix_a, "matrix_A") matrix_b_id = ai_accelerator_for_loading.load_matrix(matrix_b, "matrix_B") print("\nPerforming electron-speed matrix multiplication...") start_time = time.time() result_matrix_id = ai_accelerator_for_loading.matrix_multiply(matrix_a_id, matrix_b_id, "result_C") result_matrix = ai_accelerator_for_loading.get_matrix(result_matrix_id) elapsed = time.time() - start_time # Calculate electron-speed performance metrics ops = size * size * size * 2 # Total multiply-add operations electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC) theoretical_time = electron_transit_time * ops / (total_cores * 8) # 8 tensor cores per CUDA core print("\nElectron-Speed Matrix Operation Metrics:") print(f"Matrix size: {size}x{size}") print(f"Total operations: {ops:,}") print(f"Wall clock time: {elapsed*1000:.3f} ms") print(f"Theoretical electron transit time: {theoretical_time*1e12:.3f} ps") print(f"Effective TFLOPS: {(ops / elapsed) / 1e12:.2f}") # Verify first few elements for correctness print("\nValidating results (first 2x2 corner):") print(f"Result[0:2,0:2] = ") for i in range(min(2, len(result_matrix))): print(result_matrix[i][:2]) # Validate dimensions assert len(result_matrix) == size, "Result matrix has incorrect dimensions" assert len(result_matrix[0]) == size, "Result matrix has incorrect dimensions" print("\nMatrix operations at electron speed successful.") except Exception as e: print(f"Matrix operations test failed: {e}") return print("\n--- All AI Integration Tests Completed ---")