| """ | |
| Test AI integration with HTTP-based storage for Florence model inference. | |
| All operations are performed through HTTP storage with direct tensor core access. | |
| """ | |
import atexit
import contextlib
import gc
import logging
import os
import platform
import time

import numpy as np
import torch
from PIL import Image

from ai_http import AIAcceleratorHTTP
from gpu_arch import Chip
from http_storage import HTTPGPUStorage
from virtual_vram import VirtualVRAM

# resource is POSIX-only; guard the import so the script still loads on Windows
if platform.system() != "Windows":
    import resource
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Increase the system file descriptor limit (POSIX only)
def increase_file_limit():
    if platform.system() == "Windows":
        print("Warning: file descriptor limits cannot be raised on Windows")
        return
    try:
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
        print(f"Increased file descriptor limit from {soft} to {hard}")
    except Exception as e:
        print(f"Warning: Could not increase file descriptor limit: {e}")
# HTTP connection manager with retry
@contextlib.contextmanager
def http_manager(max_retries=5, retry_delay=2):
    storage = None
    last_error = None

    def try_connect():
        nonlocal storage
        if storage:
            try:
                storage.close()
            except Exception:
                pass
        storage = HTTPGPUStorage()
        return storage.connect()

    # Initial connection attempts
    connected = False
    for attempt in range(max_retries):
        try:
            if try_connect():
                logging.info("Successfully connected to HTTP GPU storage server")
                connected = True
                break
            logging.warning(f"Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...")
        except Exception as e:
            last_error = str(e)
            logging.error(f"Connection attempt {attempt + 1} failed with error: {e}")
        time.sleep(retry_delay)
    if not connected:
        error_msg = f"Could not connect to HTTP GPU storage server after {max_retries} attempts"
        if last_error:
            error_msg += f". Last error: {last_error}"
        raise RuntimeError(error_msg)
    try:
        # Yield the storage connection
        yield storage
    except Exception as e:
        # A contextmanager generator may only yield once, so a failed operation
        # cannot be retried here; log it and surface the error to the caller
        logging.error(f"HTTP storage operation failed: {e}")
        raise
    finally:
        if storage:
            try:
                storage.close()
            except Exception:
                pass
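
# Example usage of http_manager (a minimal sketch; the tensor name and shape
# are illustrative only):
#
#     with http_manager(max_retries=3) as storage:
#         storage.store_tensor("scratch/example", np.zeros((2, 2)))
#         print(storage.load_tensor("scratch/example"))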
# Cleanup handler
def cleanup_resources():
    gc.collect()

# Register cleanup handler
atexit.register(cleanup_resources)
def test_ai_integration():
    print("\n--- Testing HTTP-Based AI Integration with Zero CPU Usage ---")
    from electron_speed import (TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP,
                                drift_velocity, speed_of_light_silicon)
    # Initialize components dictionary to store GPU resources
    components = {
        'chips': [],
        'ai_accelerators': [],
        'model_id': None,
        'vram': None,
        'storage': None,
        'model_config': None,
        'tensor_registry': {},
        'initialized': False
    }
    # Initialize global tensor registry (bookkeeping for tensors held in HTTP storage)
    global_tensor_registry = {
        'model_tensors': {},
        'runtime_tensors': {},
        'placeholder_tensors': {},
        'stats': {
            'total_vram_used': 0,
            'active_tensors': 0
        }
    }
    # Increase file descriptor limit
    increase_file_limit()
    print("\nElectron-Speed Architecture Parameters:")
    print(f"Target switches/sec: {TARGET_SWITCHES_PER_SEC:.2e}")
    print(f"Transistors on chip: {TRANSISTORS_ON_CHIP:,}")
    print(f"Electron drift velocity: {drift_velocity:.2e} m/s")
    print(f"Percentage of light speed: {(drift_velocity / speed_of_light_silicon) * 100:.2f}%")
    # Test 1: HTTP-Based Model Loading with Florence
    print("\nTest 1: Loading Florence Model with HTTP Storage")
    try:
        # Use HTTP connection manager for proper resource handling
        with http_manager() as storage:
            components['storage'] = storage  # Save storage reference
            # Initialize virtual GPU stack with HTTP storage
            chip_for_loading = Chip(chip_id=0, vram_size_gb=32, storage=storage)  # Allocate sufficient VRAM
            components['chips'].append(chip_for_loading)
            # Initialize VRAM with HTTP storage
            vram = VirtualVRAM(storage=storage)
            components['vram'] = vram
            # Set up AI accelerator with HTTP support
            ai_accelerator_for_loading = AIAcceleratorHTTP(chip=chip_for_loading)
            ai_accelerator_for_loading.vram = vram
            ai_accelerator_for_loading.initialize_tensor_cores()
            components['ai_accelerators'].append(ai_accelerator_for_loading)
            # Initialize model registry in HTTP storage
            storage.store_model_state({
                "initialized": True,
                "max_vram": 32 * 1024 * 1024 * 1024,  # 32 GB in bytes
                "active_models": {}
            })
            # Load Florence-2 model with HTTP storage
            from transformers import AutoModelForCausalLM, AutoProcessor
            model_id = "microsoft/florence-2-large"
            print(f"Loading model {model_id} with HTTP storage...")
            model = None
            processor = None
            try:
                # Load model and processor with proper error handling
                model = AutoModelForCausalLM.from_pretrained(
                    model_id,
                    trust_remote_code=True,
                    device_map="auto",  # Allow automatic device mapping
                    torch_dtype="auto"  # Use appropriate dtype
                )
                processor = AutoProcessor.from_pretrained(
                    model_id,
                    trust_remote_code=True
                )
                # Ensure the HTTP connection is active before proceeding
                if not ai_accelerator_for_loading.storage.wait_for_connection():
                    raise RuntimeError("HTTP connection lost - please retry")
                # Calculate model size for proper VRAM allocation
                model_size = sum(p.numel() * p.element_size() for p in model.parameters())
                print(f"Model size: {model_size / (1024**3):.2f} GB")
                # Store the model in HTTP storage via the accelerator's load_model method
                ai_accelerator_for_loading.load_model(
                    model_id=model_id,
                    model=model,
                    processor=processor
                )
                print(f"Model '{model_id}' loaded successfully to HTTP storage.")
                assert ai_accelerator_for_loading.has_model(model_id), \
                    "Model not found in HTTP storage after loading."
                # Store model parameters in components dict
                components['model_id'] = model_id
                components['model_size'] = model_size
                # Clear any CPU-side model data
                model = None
                processor = None
                gc.collect()
            except Exception as e:
                print(f"Detailed model loading error: {e}")
                if model is None or processor is None:
                    # from_pretrained itself failed, so there is nothing to fall back on
                    raise
                print("Falling back to zero-copy tensor mode...")
                try:
                    # Retry the load with explicit HTTP transfer
                    ai_accelerator_for_loading.load_model(
                        model_id=model_id,
                        model=model,
                        processor=processor,
                        use_http=True
                    )
                    components['model_id'] = model_id
                    print("Successfully loaded Florence model with HTTP transfer")
                except Exception as e2:
                    print(f"HTTP model loading failed: {e2}")
                    raise
    except Exception as e:
        print(f"Model loading test failed: {e}")
        return
    # Test 2: HTTP-Based Multi-Chip Processing for Florence Inference
    print("\nTest 2: HTTP-Based Parallel Processing across Multiple Chips")
    num_chips = 4  # Using multiple chips for maximum parallelization
    chips = []
    ai_accelerators = []
    try:
        # Try to reuse the existing HTTP connection, with verification
        shared_storage = None
        max_connection_attempts = 3
        for attempt in range(max_connection_attempts):
            try:
                if components['storage'] and components['storage'].wait_for_connection():
                    shared_storage = components['storage']
                    logging.info("Successfully reused existing HTTP connection")
                    break
                logging.warning("Existing connection unavailable, creating new connection...")
                # Create the storage directly rather than via http_manager, which
                # would close the connection as soon as its context exits
                new_storage = HTTPGPUStorage()
                if not new_storage.connect():
                    raise RuntimeError("Could not open a new HTTP connection")
                components['storage'] = new_storage
                shared_storage = new_storage
                logging.info("Successfully established new HTTP connection")
                break
            except Exception as e:
                logging.error(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < max_connection_attempts - 1:
                    time.sleep(2)
                    continue
                raise RuntimeError(f"Failed to establish HTTP connection after {max_connection_attempts} attempts")
        # Initialize high-performance chip array with HTTP storage for Florence
        total_sms = 0
        total_cores = 0
        # Create optical interconnect for chip communication
        from gpu_arch import OpticalInterconnect
        optical_link = OpticalInterconnect(bandwidth_tbps=800, latency_ns=1)
        # Reuse the existing VRAM instance, rebinding it to the shared storage
        shared_vram = components['vram']
        if shared_vram is None:
            shared_vram = VirtualVRAM()
        shared_vram.storage = shared_storage
        for i in range(num_chips):
            # Configure each chip with shared HTTP storage
            chip = Chip(chip_id=i, vram_size_gb=32, storage=shared_storage)  # 32 GB VRAM per chip
            chips.append(chip)
            # Connect chips in a chain; a full ring would also link the last
            # chip back to the first
            if i > 0:
                chip.connect_chip(chips[i - 1], optical_link)
            # Initialize AI accelerator with HTTP support
            ai_accelerator = AIAcceleratorHTTP(chip=chip)
            ai_accelerator.vram = shared_vram
            ai_accelerator.storage = shared_storage
            ai_accelerators.append(ai_accelerator)
            # Initialize tensor cores for Florence model
            ai_accelerator.initialize_tensor_cores()
            # Track total processing units
            total_sms += chip.num_sms
            total_cores += chip.num_sms * chip.cores_per_sm
            # Store chip configuration in HTTP storage
            shared_storage.store_state(f"chips/{i}/config", "state", {
                "num_sms": chip.num_sms,
                "cores_per_sm": chip.cores_per_sm,
                "total_cores": chip.num_sms * chip.cores_per_sm,
                "connected_chips": [c.chip_id for c in chip.connected_chips]
            })
            print(f"Chip {i} initialized with HTTP storage and optical interconnect")
        # Get all image files in the sample_task folder
        image_folder = os.path.join(os.path.dirname(__file__), '..', 'sample_task')
        image_files = [f for f in os.listdir(image_folder)
                       if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
        image_files.sort()
        if not image_files:
            print("No images found in sample_task folder.")
            return
        print("\nTotal Processing Units:")
        print(f"- Streaming Multiprocessors: {total_sms:,}")
        print(f"- CUDA Cores: {total_cores:,}")
        print(f"- Electron-speed tensor cores: {total_cores * 8:,}")
        # Test multi-chip parallel inference with HTTP storage
        for img_name in image_files[:1]:  # Test with the first image
            img_path = os.path.join(image_folder, img_name)
            raw_image = Image.open(img_path).convert('RGB')
            print(f"\nRunning HTTP-based inference for image: {img_name}")
            # Store the input image in HTTP storage
            image_array = np.array(raw_image)
            # Use the shared VRAM's storage for tensor operations
            shared_vram.storage.store_tensor(f"input_image/{img_name}", image_array)
            # Free CPU memory immediately
            raw_image = None
            image_array_shape = image_array.shape
            image_array = None
            gc.collect()
            # Synchronize all chips through HTTP storage
            start_time = time.time()
            # Distribute the workload across chips via HTTP storage. Each chip
            # currently runs on the full image; batch_size is the per-chip row
            # count an actual split would use.
            batch_size = image_array_shape[0] // num_chips
            results = []
            # Ensure all accelerators share the same storage connection
            for accelerator in ai_accelerators:
                accelerator.vram.storage = shared_vram.storage
            for i, accelerator in enumerate(ai_accelerators):
                # Reference the image held in HTTP storage
                tensor_id = f"input_image/{img_name}"
                # Run inference using HTTP-stored weights
                result = accelerator.inference(model_id, tensor_id)
                # Store the result in HTTP storage
                if result is not None:
                    shared_storage.store_tensor(f"results/chip_{i}/{img_name}", result)
                    results.append(result)
            elapsed = time.time() - start_time
            # Calculate performance metrics
            ops_per_inference = total_cores * 1024  # FMA ops per core
            electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
            theoretical_time = electron_transit_time * ops_per_inference / total_cores
            # Combine results from all chips through HTTP storage
            final_result = None
            for i in range(num_chips):
                chip_result = shared_storage.load_tensor(f"results/chip_{i}/{img_name}")
                if chip_result is not None:
                    if final_result is None:
                        final_result = chip_result
                    else:
                        final_result = np.concatenate([final_result, chip_result])
            print("\nHTTP-Based Performance Metrics:")
            print(f"- Final result shape: {final_result.shape if final_result is not None else 'None'}")
            print(f"- Wall clock time: {elapsed * 1000:.3f} ms")
            print(f"- Theoretical electron transit time: {theoretical_time * 1e12:.3f} ps")
            print(f"- Effective TFLOPS: {(ops_per_inference / elapsed) / 1e12:.2f}")
            print(f"- Number of chips used: {num_chips}")
            assert final_result is not None, "HTTP-based inference returned None"
        print("Multi-chip inference test on the sample image (virtual GPU stack) successful.")
    except Exception as e:
        print(f"Multi-chip inference test failed: {e}")
        return
    # Test 3: Florence Model Inference with HTTP Storage
    print("\nTest 3: Florence Model Inference with HTTP Storage")
    try:
        # Load the test image
        image_path = "test_image.jpg"  # Make sure this image exists
        if os.path.exists(image_path):
            image = Image.open(image_path)
            # Re-create the processor (the Test 1 copy was released to free CPU memory)
            from transformers import AutoProcessor
            processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
            # Prepare input for the Florence model (Florence-2 processors also
            # accept a task prompt such as text="<CAPTION>")
            inputs = processor(image, return_tensors="pt")
            # Run inference using HTTP storage
            outputs = ai_accelerators[0].run_inference(
                model_id=model_id,
                inputs=inputs,
                use_http=True
            )
            # Process outputs
            if outputs is not None:
                predicted_caption = processor.decode(outputs[0], skip_special_tokens=True)
                print(f"\nFlorence Model Caption: {predicted_caption}")
            else:
                print("Inference failed to produce output")
        else:
            print(f"Test image not found at {image_path}")
    except Exception as e:
        print(f"Inference test failed: {e}")
    # Test 4: Electron-Speed Matrix Operations
    print("\nTest 4: Electron-Speed Matrix Operations")
    try:
        # Create large matrices to demonstrate parallel processing
        size = 1024  # Large enough to show parallelization benefits
        matrix_a = [[float(i + j) for j in range(size)] for i in range(size)]
        matrix_b = [[float(i * j + 1) for j in range(size)] for i in range(size)]
        print("\nLoading matrices into virtual VRAM...")
        # Use an accelerator from Test 2, whose HTTP connection is still open
        accelerator = ai_accelerators[0]
        matrix_a_id = accelerator.load_matrix(matrix_a, "matrix_A")
        matrix_b_id = accelerator.load_matrix(matrix_b, "matrix_B")
        print("\nPerforming electron-speed matrix multiplication...")
        start_time = time.time()
        result_matrix_id = accelerator.matrix_multiply(matrix_a_id, matrix_b_id, "result_C")
        result_matrix = accelerator.get_matrix(result_matrix_id)
        elapsed = time.time() - start_time
        # Calculate electron-speed performance metrics
        ops = size * size * size * 2  # Total multiply-add operations
        electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
        theoretical_time = electron_transit_time * ops / (total_cores * 8)  # 8 tensor cores per CUDA core
        print("\nElectron-Speed Matrix Operation Metrics:")
        print(f"Matrix size: {size}x{size}")
        print(f"Total operations: {ops:,}")
        print(f"Wall clock time: {elapsed * 1000:.3f} ms")
        print(f"Theoretical electron transit time: {theoretical_time * 1e12:.3f} ps")
        print(f"Effective TFLOPS: {(ops / elapsed) / 1e12:.2f}")
        # Verify the first few elements for correctness
        print("\nValidating results (first 2x2 corner):")
        print("Result[0:2,0:2] = ")
        for i in range(min(2, len(result_matrix))):
            print(result_matrix[i][:2])
        # Validate dimensions
        assert len(result_matrix) == size, "Result matrix has incorrect dimensions"
        assert len(result_matrix[0]) == size, "Result matrix has incorrect dimensions"
        print("\nMatrix operations at electron speed successful.")
    except Exception as e:
        print(f"Matrix operations test failed: {e}")
        return
    # Final cleanup: release accelerators, close storage, and free GPU memory
    for ai_accelerator in ai_accelerators:
        try:
            ai_accelerator.cleanup()
        except Exception as e:
            print(f"Cleanup error: {e}")
    if shared_storage:
        try:
            shared_storage.close()
        except Exception as e:
            print(f"Storage cleanup error: {e}")
    # Clear any remaining GPU memory
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    print("\n--- All AI Integration Tests Completed ---")
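
# Entry point (assumes this script is run directly and that the HTTP GPU
# storage server has been started beforehand)
if __name__ == "__main__":
    test_ai_integration()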