# NMFL / test_ai_integration.py
# Factor Studios
# Upload 43 files
# 520d6cf verified
"""
Test AI integration with HTTP-based storage for Florence model inference.
All operations are performed through HTTP storage with direct tensor core access.
"""
import asyncio
from gpu_arch import Chip
from ai_http import AIAcceleratorHTTP
from virtual_vram import VirtualVRAM
from PIL import Image
import numpy as np
from http_storage import HTTPGPUStorage
import time
import os
import platform
import contextlib
import atexit
import logging
import torch
# Configure the root logger once at import time: INFO level and above,
# each record rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Increase system file descriptor limit
def increase_file_limit():
    """Raise this process's soft open-file limit to its hard limit.

    Best effort: any failure is reported as a printed warning and never
    raised, so callers can invoke this unconditionally.

    Returns:
        None
    """
    try:
        # ``resource`` is POSIX-only and is not imported at module level, so
        # bring it into scope here; on other platforms this raises
        # ImportError and we fall through to the warning below.
        import resource

        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
        print(f"Increased file descriptor limit from {soft} to {hard}")
    except (ImportError, ValueError, OSError) as e:
        # setrlimit raises ValueError/OSError when the OS rejects the request.
        print(f"Warning: Could not increase file descriptor limit: {e}")
# HTTP connection manager with retry
@contextlib.contextmanager
def http_manager(max_retries=5, retry_delay=2):
    """Context manager that yields a connected ``HTTPGPUStorage``.

    A fresh ``HTTPGPUStorage`` is created for every attempt; the attempt
    succeeds when ``connect()`` returns a truthy value. The storage is always
    closed when the ``with`` block exits, whether normally or by exception.

    Args:
        max_retries: Maximum number of connection attempts.
        retry_delay: Seconds to sleep between two consecutive attempts.

    Yields:
        The connected storage object.

    Raises:
        RuntimeError: If no attempt succeeded (including ``max_retries <= 0``).
            The message includes the last exception text, if any.
        Exception: Anything raised inside the ``with`` body is logged and
            re-raised unchanged.
    """
    storage = None
    last_error = None

    def close_quietly(conn):
        # Closing is best effort: a failure here must not hide the real error.
        if conn is None:
            return
        try:
            conn.close()
        except Exception as close_error:
            logging.debug(f"Ignoring error while closing HTTP GPU storage: {close_error}")

    for attempt in range(max_retries):
        # Drop whatever a previous failed attempt left behind.
        close_quietly(storage)
        storage = None
        try:
            storage = HTTPGPUStorage()
            if storage.connect():
                logging.info("Successfully connected to HTTP GPU storage server")
                break
            logging.warning(f"Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...")
        except Exception as e:
            last_error = str(e)
            logging.error(f"Connection attempt {attempt + 1} failed with error: {e}")
        # No point in sleeping after the final attempt.
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
    else:
        # Loop finished without ``break``: every attempt failed.
        close_quietly(storage)
        error_msg = f"Could not connect to HTTP GPU storage server after {max_retries} attempts"
        if last_error:
            error_msg += f". Last error: {last_error}"
        raise RuntimeError(error_msg)

    try:
        yield storage
    except Exception as e:
        # A @contextmanager generator must not yield a second time after an
        # exception is thrown into it (contextlib raises RuntimeError and the
        # caller's error is lost), so a failed ``with`` body cannot be resumed
        # on a new connection. Log and propagate instead.
        logging.error(f"WebSocket operation failed: {e}")
        raise
    finally:
        close_quietly(storage)
# Cleanup handler, registered to run at interpreter exit.
# ``atexit.register`` returns the function it is given, so using it as a
# decorator both registers the handler and leaves the name bound as usual.
@atexit.register
def cleanup_resources():
    """Force a garbage-collection pass."""
    from gc import collect
    collect()
# Shared configuration for the integration run.
_FLORENCE_MODEL_ID = "microsoft/florence-2-large"
_VRAM_SIZE_GB = 32  # virtual VRAM given to every chip
_NUM_CHIPS = 4  # chips used for the parallel-processing stage
_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')


def _load_florence_model(storage, components):
    """Test 1: build the loading chip/VRAM/accelerator and upload Florence-2.

    Args:
        storage: Connected storage object shared by the whole run.
        components: Bookkeeping dict; the created chip, VRAM and accelerator
            plus ``model_id``/``model_size`` are recorded in it.

    Returns:
        ``(accelerator, processor)``: the accelerator that holds the model
        and the processor needed later to prepare inputs and decode outputs.

    Raises:
        Exception: If the model cannot be obtained or both upload paths fail.
    """
    import gc
    from transformers import AutoModelForCausalLM, AutoProcessor

    chip = Chip(chip_id=0, vram_size_gb=_VRAM_SIZE_GB, storage=storage)
    components['chips'].append(chip)

    vram = VirtualVRAM(storage=storage)
    components['vram'] = vram

    accelerator = AIAcceleratorHTTP(chip=chip)
    accelerator.vram = vram
    accelerator.initialize_tensor_cores()
    components['ai_accelerators'].append(accelerator)

    # Initialize the model registry in storage.
    storage.store_model_state({
        "initialized": True,
        "max_vram": _VRAM_SIZE_GB * 1024 * 1024 * 1024,  # bytes
        "active_models": {}
    })

    model_id = _FLORENCE_MODEL_ID
    print(f"Loading model {model_id} with HTTP storage...")

    # Bound up front so the fallback path can tell whether the download
    # itself failed (nothing to upload) or only the first upload did.
    model = None
    processor = None
    try:
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            trust_remote_code=True,
            device_map="auto",
            torch_dtype="auto"
        )
        processor = AutoProcessor.from_pretrained(
            model_id,
            trust_remote_code=True
        )

        if not accelerator.storage.wait_for_connection():
            raise RuntimeError("WebSocket connection lost - please retry")

        model_size = sum(p.numel() * p.element_size() for p in model.parameters())
        print(f"Model size: {model_size / (1024**3):.2f} GB")

        accelerator.load_model(
            model_id=model_id,
            model=model,
            processor=processor
        )
        print(f"Model '{model_id}' loaded successfully to WebSocket storage.")
        assert accelerator.has_model(model_id), "Model not found in WebSocket storage after loading."

        components['model_id'] = model_id
        components['model_size'] = model_size
    except Exception as e:
        print(f"Detailed model loading error: {str(e)}")
        if model is None or processor is None:
            # The model/processor never materialised; retrying the upload
            # would only fail on the missing objects.
            raise
        print("Falling back to zero-copy tensor mode...")
        try:
            accelerator.load_model(
                model_id=model_id,
                model=model,
                processor=processor,
                use_http=True
            )
            components['model_id'] = model_id
            print("Successfully loaded Florence model with HTTP transfer")
        except Exception as e2:
            print(f"HTTP model loading failed: {str(e2)}")
            raise
    finally:
        # Drop the CPU-side weights. The processor is kept on purpose: the
        # inference stage still needs it.
        model = None
        gc.collect()

    return accelerator, processor


def _build_chip_array(storage, vram, num_chips, chips, accelerators):
    """Create ``num_chips`` chips with one accelerator each on shared storage.

    ``chips`` and ``accelerators`` are caller-owned lists filled in place, so
    that the caller can still clean up whatever was created if this raises
    part-way through.

    Returns:
        ``(total_sms, total_cores)`` summed over all created chips.
    """
    from gpu_arch import OpticalInterconnect

    optical_link = OpticalInterconnect(bandwidth_tbps=800, latency_ns=1)
    total_sms = 0
    total_cores = 0

    for i in range(num_chips):
        chip = Chip(chip_id=i, vram_size_gb=_VRAM_SIZE_GB, storage=storage)
        chips.append(chip)

        # Link each chip to its predecessor over the optical interconnect.
        if i > 0:
            chip.connect_chip(chips[i - 1], optical_link)

        accelerator = AIAcceleratorHTTP(chip=chip)
        accelerator.vram = vram
        accelerator.storage = storage
        accelerators.append(accelerator)
        accelerator.initialize_tensor_cores()

        total_sms += chip.num_sms
        total_cores += chip.num_sms * chip.cores_per_sm

        # Persist the chip configuration in storage.
        storage.store_state(f"chips/{i}/config", "state", {
            "num_sms": chip.num_sms,
            "cores_per_sm": chip.cores_per_sm,
            "total_cores": chip.num_sms * chip.cores_per_sm,
            "connected_chips": [c.chip_id for c in chip.connected_chips]
        })
        print(f"Chip {i} initialized with WebSocket storage and optical interconnect")

    return total_sms, total_cores


def _run_caption_inference(accelerator, processor, model_id, image_path="test_image.jpg"):
    """Caption a single image; failures are reported, never raised."""
    print("\nTest 3: Florence Model Inference with HTTP Storage")
    try:
        if not os.path.exists(image_path):
            print(f"Test image not found at {image_path}")
            return

        with Image.open(image_path) as image:
            inputs = processor(image, return_tensors="pt")

        outputs = accelerator.run_inference(
            model_id=model_id,
            inputs=inputs,
            use_http=True
        )
        if outputs is not None:
            predicted_caption = processor.decode(outputs[0], skip_special_tokens=True)
            print(f"\nFlorence Model Caption: {predicted_caption}")
        else:
            print("Inference failed to produce output")
    except Exception as e:
        print(f"Inference test failed: {str(e)}")


def _run_multi_chip_inference(storage, vram, accelerators, model_id, total_sms, total_cores):
    """Run inference on the first sample image with every accelerator.

    Returns:
        ``True`` when inference ran and its assertions passed, ``False`` when
        the ``sample_task`` folder contains no images.

    Raises:
        Exception: On any storage/inference error or failed assertion.
    """
    import gc
    from electron_speed import TARGET_SWITCHES_PER_SEC, drift_velocity

    image_folder = os.path.join(os.path.dirname(__file__), '..', 'sample_task')
    image_files = sorted(
        f for f in os.listdir(image_folder)
        if f.lower().endswith(_IMAGE_EXTENSIONS)
    )
    if not image_files:
        print("No images found in sample_task folder.")
        return False

    print("\nTotal Processing Units:")
    print(f"- Streaming Multiprocessors: {total_sms:,}")
    print(f"- CUDA Cores: {total_cores:,}")
    print(f"- Electron-speed tensor cores: {total_cores * 8:,}")

    num_chips = len(accelerators)

    # Make sure every accelerator talks to the same storage backend.
    for accelerator in accelerators:
        accelerator.vram.storage = vram.storage

    for img_name in image_files[:1]:  # Test with first image
        img_path = os.path.join(image_folder, img_name)
        with Image.open(img_path) as raw_image:
            image_array = np.array(raw_image.convert('RGB'))
        print(f"\nRunning WebSocket-based inference for image: {img_name}")

        tensor_id = f"input_image/{img_name}"
        vram.storage.store_tensor(tensor_id, image_array)

        # Free the CPU-side copy immediately.
        del image_array
        gc.collect()

        start_time = time.perf_counter()
        result = None
        for i, accelerator in enumerate(accelerators):
            result = accelerator.inference(model_id, tensor_id)
            if result is not None:
                storage.store_tensor(f"results/chip_{i}/{img_name}", result)
        elapsed = time.perf_counter() - start_time

        # Performance metrics.
        ops_per_inference = total_cores * 1024  # FMA ops per core
        electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
        theoretical_time = electron_transit_time * ops_per_inference / total_cores

        # Combine the per-chip results read back from storage.
        final_result = None
        for i in range(num_chips):
            chip_result = storage.load_tensor(f"results/chip_{i}/{img_name}")
            if chip_result is not None:
                if final_result is None:
                    final_result = chip_result
                else:
                    final_result = np.concatenate([final_result, chip_result])

        print("\nWebSocket-Based Performance Metrics:")
        print(f"- Final result shape: {final_result.shape if final_result is not None else 'None'}")
        print(f"- Wall clock time: {elapsed*1000:.3f} ms")
        print(f"- Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
        print(f"- Effective TFLOPS: {(ops_per_inference / elapsed) / 1e12:.2f}")
        print(f"- Number of chips used: {num_chips}")

        assert final_result is not None, "WebSocket-based inference returned None"
        # NOTE(review): results are stored and concatenated as tensors above,
        # yet the last chip's result is required to be a str. Confirm what
        # ``inference`` is meant to return.
        assert isinstance(result, str), "Inference result is not a string"

    print("Multi-chip inference test on all images (virtual GPU stack) successful.")
    return True


def _run_matrix_test(accelerator, total_cores):
    """Multiply two 1024x1024 matrices on ``accelerator`` and check the shape."""
    from electron_speed import TARGET_SWITCHES_PER_SEC, drift_velocity

    size = 1024  # Large enough to show parallelization benefits
    matrix_a = [[float(i + j) for j in range(size)] for i in range(size)]
    matrix_b = [[float(i * j + 1) for j in range(size)] for i in range(size)]

    print("\nLoading matrices into virtual VRAM...")
    matrix_a_id = accelerator.load_matrix(matrix_a, "matrix_A")
    matrix_b_id = accelerator.load_matrix(matrix_b, "matrix_B")

    print("\nPerforming electron-speed matrix multiplication...")
    start_time = time.perf_counter()
    result_matrix_id = accelerator.matrix_multiply(matrix_a_id, matrix_b_id, "result_C")
    result_matrix = accelerator.get_matrix(result_matrix_id)
    elapsed = time.perf_counter() - start_time

    ops = size * size * size * 2  # Total multiply-add operations
    electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
    theoretical_time = electron_transit_time * ops / (total_cores * 8)  # 8 tensor cores per CUDA core

    print("\nElectron-Speed Matrix Operation Metrics:")
    print(f"Matrix size: {size}x{size}")
    print(f"Total operations: {ops:,}")
    print(f"Wall clock time: {elapsed*1000:.3f} ms")
    print(f"Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
    print(f"Effective TFLOPS: {(ops / elapsed) / 1e12:.2f}")

    print("\nValidating results (first 2x2 corner):")
    print("Result[0:2,0:2] = ")
    for row in result_matrix[:2]:
        print(row[:2])

    assert len(result_matrix) == size, "Result matrix has incorrect dimensions"
    assert len(result_matrix[0]) == size, "Result matrix has incorrect dimensions"
    print("\nMatrix operations at electron speed successful.")


def _release_accelerators(accelerators):
    """Best-effort cleanup of every accelerator, then free cached GPU memory."""
    for accelerator in accelerators:
        try:
            accelerator.cleanup()
        except Exception as e:
            print(f"Cleanup error: {str(e)}")
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


def test_ai_integration():
    """Run the Florence integration stages against one storage connection.

    Stages, in order: load the Florence-2 model (Test 1), build a multi-chip
    array and run captioning plus multi-chip inference on it (Test 2 and the
    first "Test 3"), then a matrix multiplication on the loading accelerator
    (second "Test 3"). The first failing stage prints a diagnostic and ends
    the run; nothing is raised to the caller.

    The storage connection is held open for the whole run because every
    stage reads from or writes to it. Accelerator cleanup happens once, after
    the last stage, before the connection is released.

    Returns:
        None
    """
    print("\n--- Testing WebSocket-Based AI Integration with Zero CPU Usage ---")
    from electron_speed import TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP, drift_velocity, speed_of_light_silicon

    # Bookkeeping for the GPU resources created during the run.
    components = {
        'chips': [],
        'ai_accelerators': [],
        'model_id': None,
        'vram': None,
        'storage': None,
        'model_config': None,
        'tensor_registry': {},
        'initialized': False
    }

    increase_file_limit()

    print("\nElectron-Speed Architecture Parameters:")
    print(f"Target switches/sec: {TARGET_SWITCHES_PER_SEC:.2e}")
    print(f"Transistors on chip: {TRANSISTORS_ON_CHIP:,}")
    print(f"Electron drift velocity: {drift_velocity:.2e} m/s")
    print(f"Percentage of light speed: {(drift_velocity/speed_of_light_silicon)*100:.2f}%")

    chips = []
    ai_accelerators = []

    print("\nTest 1: Loading Florence Model with HTTP Storage")
    try:
        with http_manager() as storage:
            components['storage'] = storage
            try:
                # Every stage handles its own errors inside the with-block so
                # the connection is always released through a normal exit.
                try:
                    loading_accelerator, processor = _load_florence_model(storage, components)
                except Exception as e:
                    print(f"Model loading test failed: {e}")
                    return

                print("\nTest 2: HTTP-Based Parallel Processing across Multiple Chips")
                try:
                    total_sms, total_cores = _build_chip_array(
                        storage, components['vram'], _NUM_CHIPS, chips, ai_accelerators
                    )
                    # Single-image captioning runs once, on the first chip.
                    _run_caption_inference(ai_accelerators[0], processor, _FLORENCE_MODEL_ID)
                    if not _run_multi_chip_inference(
                        storage, components['vram'], ai_accelerators,
                        _FLORENCE_MODEL_ID, total_sms, total_cores
                    ):
                        return
                except Exception as e:
                    print(f"Multi-chip inference test failed: {e}")
                    return

                print("\nTest 3: Electron-Speed Matrix Operations")
                try:
                    _run_matrix_test(loading_accelerator, total_cores)
                except Exception as e:
                    print(f"Matrix operations test failed: {e}")
                    return
            finally:
                # Clean up only after the last stage that needs the
                # accelerators; the storage itself is closed by http_manager.
                _release_accelerators(ai_accelerators)
    except Exception as e:
        # Only a failure to establish the storage connection reaches here.
        print(f"Model loading test failed: {e}")
        return

    print("\n--- All AI Integration Tests Completed ---")