INTAI / test_ai_integration.py
Factor Studios
Upload test_ai_integration.py
a2e056e verified
"""
Test AI integration with WebSocket-based storage and zero CPU memory usage.
All operations are performed through WebSocket storage with direct tensor core access.
"""
import asyncio
from gpu_arch import Chip
from ai import AIAccelerator
from virtual_vram import VirtualVRAM
from PIL import Image
import numpy as np
from websocket_storage import WebSocketGPUStorage
import time
import os
import platform
import contextlib
import atexit
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Increase system file descriptor limit
def increase_file_limit():
try:
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
print(f"Increased file descriptor limit from {soft} to {hard}")
except Exception as e:
print(f"Warning: Could not increase file descriptor limit: {e}")
# WebSocket connection manager with retry
@contextlib.contextmanager
def websocket_manager(max_retries=5, retry_delay=2, timeout=30.0):
storage = None
last_error = None
def try_connect():
nonlocal storage
if storage:
try:
storage.close()
except:
pass
storage = WebSocketGPUStorage()
return storage.wait_for_connection(timeout=timeout)
# Initial connection attempts
for attempt in range(max_retries):
try:
if try_connect():
logging.info("Successfully connected to GPU storage server")
break
else:
logging.warning(f"Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...")
time.sleep(retry_delay)
except Exception as e:
last_error = str(e)
logging.error(f"Connection attempt {attempt + 1} failed with error: {e}")
time.sleep(retry_delay)
if attempt == max_retries - 1:
error_msg = f"Could not connect to GPU storage server after {max_retries} attempts"
if last_error:
error_msg += f". Last error: {last_error}"
raise RuntimeError(error_msg)
try:
# Yield the storage connection
yield storage
except Exception as e:
logging.error(f"WebSocket operation failed: {e}")
# Try to reconnect once if operation fails
if try_connect():
logging.info("Successfully reconnected to GPU storage server")
yield storage
else:
raise
finally:
if storage:
try:
storage.close()
except:
pass
# Cleanup handler
def cleanup_resources():
import gc
gc.collect()
# Register cleanup handler
atexit.register(cleanup_resources)
def test_ai_integration():
print("\n--- Testing WebSocket-Based AI Integration with Zero CPU Usage ---")
from electron_speed import TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP, drift_velocity, speed_of_light_silicon
# Initialize components dictionary to store GPU resources
components = {
'chips': [],
'ai_accelerators': [],
'model_id': None,
'vram': None,
'storage': None,
'model_config': None,
'tensor_registry': {},
'initialized': False
}
# Initialize global tensor registry
global_tensor_registry = {
'model_tensors': {},
'runtime_tensors': {},
'placeholder_tensors': {},
'stats': {
'total_vram_used': 0,
'active_tensors': 0
}
}
# Increase file descriptor limit
increase_file_limit()
print(f"\nElectron-Speed Architecture Parameters:")
print(f"Target switches/sec: {TARGET_SWITCHES_PER_SEC:.2e}")
print(f"Transistors on chip: {TRANSISTORS_ON_CHIP:,}")
print(f"Electron drift velocity: {drift_velocity:.2e} m/s")
print(f"Percentage of light speed: {(drift_velocity/speed_of_light_silicon)*100:.2f}%")
# Test 1: WebSocket-Based Model Loading
print("\nTest 1: Model Loading with WebSocket Storage")
try:
# Use WebSocket connection manager for proper resource handling
with websocket_manager() as storage:
components['storage'] = storage # Save storage reference
# Initialize virtual GPU stack with unlimited WebSocket storage and shared connection
chip_for_loading = Chip(chip_id=0, vram_size_gb=None, storage=storage) # Pass shared storage
components['chips'].append(chip_for_loading)
# Initialize VRAM with shared WebSocket storage
vram = VirtualVRAM(storage=storage) # Pass shared storage instance
components['vram'] = vram
# Set up AI accelerator - note it already has the shared storage
ai_accelerator_for_loading = chip_for_loading.ai_accelerator
ai_accelerator_for_loading.vram = vram # Use WebSocket-backed VRAM
ai_accelerator_for_loading.initialize_tensor_cores() # Ensure tensor cores are ready
components['ai_accelerators'].append(ai_accelerator_for_loading)
# Initialize model registry in WebSocket storage
storage.store_state("model_registry", "state", {
"initialized": True,
"max_vram": None, # Unlimited
"active_models": {}
})
# Load BLIP-2 Large model directly to WebSocket storage
from transformers import AutoModelForCausalLM, AutoProcessor
model_id = "microsoft/florence-2-large"
print(f"Loading model {model_id} directly to WebSocket storage...")
try:
# Load model and processor with proper error handling
model = AutoModelForCausalLM.from_pretrained(
model_id,
trust_remote_code=True,
device_map="auto", # Allow automatic device mapping
torch_dtype="auto" # Use appropriate dtype
)
processor = AutoProcessor.from_pretrained(
model_id,
trust_remote_code=True
)
# Ensure WebSocket connection is active before proceeding
if not ai_accelerator_for_loading.storage.wait_for_connection():
raise RuntimeError("WebSocket connection lost - please retry")
# Calculate model size for proper VRAM allocation
model_size = sum(p.numel() * p.element_size() for p in model.parameters())
print(f"Model size: {model_size / (1024**3):.2f} GB")
# Store model in WebSocket storage with size information
# Load model directly using AIAccelerator's load_model method
ai_accelerator_for_loading.load_model(
model_id=model_id,
model=model,
processor=processor
)
print(f"Model '{model_id}' loaded successfully to WebSocket storage.")
assert ai_accelerator_for_loading.has_model(model_id), "Model not found in WebSocket storage after loading."
# Store model parameters in components dict
components['model_id'] = model_id
components['model_size'] = model_size
# Clear any CPU-side model data
model = None
processor = None
import gc
gc.collect()
except Exception as e:
print(f"Detailed model loading error: {str(e)}")
print("Falling back to zero-copy tensor mode...")
# Try loading with zero-copy tensor mode
try:
# Try zero-copy loading
ai_accelerator_for_loading.load_model(
model_id=model_id,
model=None,
processor=None
)
components['model_id'] = model_id
print("Successfully loaded model in zero-copy mode")
except Exception as e2:
print(f"Zero-copy fallback also failed: {str(e2)}")
raise
except Exception as e:
print(f"Model loading test failed: {e}")
return
# Test 2: WebSocket-Based Multi-Chip Processing
print("\nTest 2: WebSocket-Based Parallel Processing across Multiple Chips")
num_chips = 4 # Using multiple chips for maximum parallelization
chips = []
ai_accelerators = []
try:
# Try to reuse existing connection with verification
shared_storage = None
max_connection_attempts = 3
for attempt in range(max_connection_attempts):
try:
if (components['storage'] and
components['storage'].wait_for_connection(timeout=10.0)):
shared_storage = components['storage']
shared_storage.set_keep_alive(True) # Enable keep-alive
logging.info("Successfully reused existing WebSocket connection")
break
else:
logging.warning("Existing connection unavailable, creating new connection...")
with websocket_manager(timeout=30.0) as new_storage:
if new_storage and new_storage.wait_for_connection(timeout=10.0):
components['storage'] = new_storage
shared_storage = new_storage
shared_storage.set_keep_alive(True) # Enable keep-alive
logging.info("Successfully established new WebSocket connection")
break
except Exception as e:
logging.error(f"Connection attempt {attempt + 1} failed: {e}")
if attempt < max_connection_attempts - 1:
time.sleep(2)
continue
raise RuntimeError(f"Failed to establish WebSocket connection after {max_connection_attempts} attempts")
# Initialize high-performance chip array with WebSocket storage
total_sms = 0
total_cores = 0
# Create optical interconnect for chip communication
from gpu_arch import OpticalInterconnect
optical_link = OpticalInterconnect(bandwidth_tbps=800, latency_ns=1)
# Reuse existing VRAM instance with shared storage
shared_vram = components['vram']
if shared_vram is None:
shared_vram = VirtualVRAM()
shared_vram.storage = shared_storage
for i in range(num_chips):
# Configure each chip with shared WebSocket storage
chip = Chip(chip_id=i, vram_size_gb=None, storage=shared_storage)
chips.append(chip)
# Connect chips in a ring topology
if i > 0:
chip.connect_chip(chips[i-1], optical_link)
# Initialize AI accelerator with shared resources
ai_accelerator = chip.ai_accelerator
ai_accelerator.vram = shared_vram
ai_accelerator.storage = shared_storage # Ensure storage is set
ai_accelerators.append(ai_accelerator)
# Verify and potentially repair WebSocket connection
max_retry = 3
for retry in range(max_retry):
try:
if not shared_storage.wait_for_connection(timeout=5.0):
logging.warning(f"Connection check failed for chip {i}, attempt {retry + 1}")
shared_storage.reconnect() # Attempt to reconnect
time.sleep(1)
continue
# Load model weights from WebSocket storage (no CPU transfer)
ai_accelerator.load_model(model_id, None, None) # Model already in WebSocket storage
logging.info(f"Successfully initialized chip {i} with model")
break
except Exception as e:
if retry < max_retry - 1:
logging.warning(f"Error initializing chip {i}, attempt {retry + 1}: {e}")
time.sleep(1)
continue
else:
logging.error(f"Failed to initialize chip {i} after {max_retry} attempts: {e}")
raise
# Track total processing units
total_sms += chip.num_sms
total_cores += chip.num_sms * chip.cores_per_sm
# Store chip configuration in WebSocket storage
shared_storage.store_state(f"chips/{i}/config", "state", {
"num_sms": chip.num_sms,
"cores_per_sm": chip.cores_per_sm,
"total_cores": chip.num_sms * chip.cores_per_sm,
"connected_chips": [c.chip_id for c in chip.connected_chips]
})
print(f"Chip {i} initialized with WebSocket storage and optical interconnect")
# Get all image files in sample_task folder
image_folder = os.path.join(os.path.dirname(__file__), '..', 'sample_task')
image_files = [f for f in os.listdir(image_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
image_files.sort()
if not image_files:
print("No images found in sample_task folder.")
return
print(f"\nTotal Processing Units:")
print(f"- Streaming Multiprocessors: {total_sms:,}")
print(f"- CUDA Cores: {total_cores:,}")
print(f"- Electron-speed tensor cores: {total_cores * 8:,}")
# Test multi-chip parallel inference with WebSocket storage
for img_name in image_files[:1]: # Test with first image
img_path = os.path.join(image_folder, img_name)
raw_image = Image.open(img_path).convert('RGB')
print(f"\nRunning WebSocket-based inference for image: {img_name}")
# Store input image in WebSocket storage
image_array = np.array(raw_image)
# Use shared VRAM's storage for tensor operations
shared_vram.storage.store_tensor(f"input_image/{img_name}", image_array)
# Free CPU memory immediately
raw_image = None
image_array_shape = image_array.shape
image_array = None
gc.collect()
# Synchronize all chips through WebSocket storage
start_time = time.time()
# Distribute workload across chips using WebSocket storage
batch_size = image_array_shape[0] // num_chips
results = []
# Ensure all connections are properly managed
for accelerator in ai_accelerators:
accelerator.vram.storage = shared_vram.storage
for i, accelerator in enumerate(ai_accelerators):
# Load image section from WebSocket storage
tensor_id = f"input_image/{img_name}"
# Run inference using WebSocket-stored weights
result = accelerator.inference(model_id, tensor_id)
# Store result in WebSocket storage
if result is not None:
storage.store_tensor(f"results/chip_{i}/{img_name}", result)
results.append(result)
elapsed = time.time() - start_time
# Calculate performance metrics
ops_per_inference = total_cores * 1024 # FMA ops per core
electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
theoretical_time = electron_transit_time * ops_per_inference / total_cores
# Combine results from all chips through WebSocket storage
final_result = None
for i in range(num_chips):
chip_result = storage.load_tensor(f"results/chip_{i}/{img_name}")
if chip_result is not None:
if final_result is None:
final_result = chip_result
else:
final_result = np.concatenate([final_result, chip_result])
print(f"\nWebSocket-Based Performance Metrics:")
print(f"- Final result shape: {final_result.shape if final_result is not None else 'None'}")
print(f"- Wall clock time: {elapsed*1000:.3f} ms")
print(f"- Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
print(f"- Effective TFLOPS: {(ops_per_inference / elapsed) / 1e12:.2f}")
print(f"- Number of chips used: {num_chips}")
assert final_result is not None, "WebSocket-based inference returned None"
assert isinstance(result, str), "Inference result is not a string"
print("Multi-chip inference test on all images (virtual GPU stack) successful.")
except Exception as e:
print(f"Multi-chip inference test failed: {e}")
return
return
# Test 3: Electron-Speed Matrix Operations
print("\nTest 3: Electron-Speed Matrix Operations")
try:
# Create large matrices to demonstrate parallel processing
size = 1024 # Large enough to show parallelization benefits
matrix_a = [[float(i+j) for j in range(size)] for i in range(size)]
matrix_b = [[float(i*j+1) for j in range(size)] for i in range(size)]
print("\nLoading matrices into virtual VRAM...")
matrix_a_id = ai_accelerator_for_loading.load_matrix(matrix_a, "matrix_A")
matrix_b_id = ai_accelerator_for_loading.load_matrix(matrix_b, "matrix_B")
print("\nPerforming electron-speed matrix multiplication...")
start_time = time.time()
result_matrix_id = ai_accelerator_for_loading.matrix_multiply(matrix_a_id, matrix_b_id, "result_C")
result_matrix = ai_accelerator_for_loading.get_matrix(result_matrix_id)
elapsed = time.time() - start_time
# Calculate electron-speed performance metrics
ops = size * size * size * 2 # Total multiply-add operations
electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
theoretical_time = electron_transit_time * ops / (total_cores * 8) # 8 tensor cores per CUDA core
print("\nElectron-Speed Matrix Operation Metrics:")
print(f"Matrix size: {size}x{size}")
print(f"Total operations: {ops:,}")
print(f"Wall clock time: {elapsed*1000:.3f} ms")
print(f"Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
print(f"Effective TFLOPS: {(ops / elapsed) / 1e12:.2f}")
# Verify first few elements for correctness
print("\nValidating results (first 2x2 corner):")
print(f"Result[0:2,0:2] = ")
for i in range(min(2, len(result_matrix))):
print(result_matrix[i][:2])
# Validate dimensions
assert len(result_matrix) == size, "Result matrix has incorrect dimensions"
assert len(result_matrix[0]) == size, "Result matrix has incorrect dimensions"
print("\nMatrix operations at electron speed successful.")
except Exception as e:
print(f"Matrix operations test failed: {e}")
return
print("\n--- All AI Integration Tests Completed ---")