"""
Test AI integration with HTTP-based storage for Florence model inference.

All operations are performed through HTTP storage with direct tensor core access.
"""

import asyncio
import atexit
import contextlib
import gc
import logging
import os
import platform
import time

import numpy as np
import torch
from PIL import Image

from ai_http import AIAcceleratorHTTP
from gpu_arch import Chip
from http_storage import HTTPGPUStorage
from virtual_vram import VirtualVRAM

# `resource` is POSIX-only; degrade gracefully on Windows.
if platform.system() != "Windows":
    import resource
else:
    resource = None

# Read for completeness; huggingface_hub also picks HF_TOKEN up from the
# environment on its own.
HF_TOKEN = os.getenv("HF_TOKEN")

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def increase_file_limit():
    """Raise the soft file-descriptor limit to the hard limit (POSIX only)."""
    if resource is None:
        print("Warning: resource module unavailable on this platform")
        return
    try:
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
        print(f"Increased file descriptor limit from {soft} to {hard}")
    except Exception as e:
        print(f"Warning: Could not increase file descriptor limit: {e}")

@contextlib.contextmanager
def http_manager(max_retries=5, retry_delay=2):
    """Connect to the HTTP GPU storage server with retries; close on exit."""
    storage = None
    last_error = None

    def try_connect():
        nonlocal storage
        # Drop any half-open connection before reconnecting.
        if storage:
            try:
                storage.close()
            except Exception:
                pass
        storage = HTTPGPUStorage()
        return storage.connect()

    for attempt in range(max_retries):
        try:
            if try_connect():
                logging.info("Successfully connected to HTTP GPU storage server")
                break
            logging.warning(f"Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...")
            time.sleep(retry_delay)
        except Exception as e:
            last_error = str(e)
            logging.error(f"Connection attempt {attempt + 1} failed with error: {e}")
            time.sleep(retry_delay)

        if attempt == max_retries - 1:
            error_msg = f"Could not connect to HTTP GPU storage server after {max_retries} attempts"
            if last_error:
                error_msg += f". Last error: {last_error}"
            raise RuntimeError(error_msg)

    try:
        yield storage
    except Exception as e:
        # A generator-based context manager may only yield once, so we cannot
        # hand back a reconnected storage handle here; log and re-raise so the
        # caller can retry the whole with-block.
        logging.error(f"HTTP storage operation failed: {e}")
        raise
    finally:
        if storage:
            try:
                storage.close()
            except Exception:
                pass

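
# Typical usage of http_manager (a sketch; the manager retries the initial
# connection and always closes the storage handle when the block exits):
#
#     with http_manager(max_retries=3, retry_delay=1) as storage:
#         storage.store_model_state({"initialized": True, "active_models": {}})
#
# store_model_state is assumed here from its use in test_ai_integration below.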
def cleanup_resources():
    """Final garbage-collection pass registered to run at interpreter exit."""
    gc.collect()


atexit.register(cleanup_resources)

def test_ai_integration():
    print("\n--- Testing HTTP-Based AI Integration with Zero CPU Usage ---")
    from electron_speed import TARGET_SWITCHES_PER_SEC, TRANSISTORS_ON_CHIP, drift_velocity, speed_of_light_silicon
    # Shared handles for everything the tests below create.
    components = {
        'chips': [],
        'ai_accelerators': [],
        'model_id': None,
        'vram': None,
        'storage': None,
        'model_config': None,
        'tensor_registry': {},
        'initialized': False
    }

    # Bookkeeping for tensors held in remote storage.
    global_tensor_registry = {
        'model_tensors': {},
        'runtime_tensors': {},
        'placeholder_tensors': {},
        'stats': {
            'total_vram_used': 0,
            'active_tensors': 0
        }
    }

    increase_file_limit()

print(f"\nElectron-Speed Architecture Parameters:")
|
|
|
print(f"Target switches/sec: {TARGET_SWITCHES_PER_SEC:.2e}")
|
|
|
print(f"Transistors on chip: {TRANSISTORS_ON_CHIP:,}")
|
|
|
print(f"Electron drift velocity: {drift_velocity:.2e} m/s")
|
|
|
print(f"Percentage of light speed: {(drift_velocity/speed_of_light_silicon)*100:.2f}%")
|
|
|
|
|
|
|
|
|
print("\nTest 1: Loading Florence Model with HTTP Storage")
|
|
|
try:
|
|
|
|
|
|
with http_manager() as storage:
|
|
|
components['storage'] = storage
|
|
|
|
|
|
|
|
|
chip_for_loading = Chip(chip_id=0, vram_size_gb=32, storage=storage)
|
|
|
components['chips'].append(chip_for_loading)
|
|
|
|
|
|
|
|
|
vram = VirtualVRAM(storage=storage)
|
|
|
components['vram'] = vram
|
|
|
|
|
|
|
|
|
ai_accelerator_for_loading = AIAcceleratorHTTP(chip=chip_for_loading)
|
|
|
ai_accelerator_for_loading.vram = vram
|
|
|
ai_accelerator_for_loading.initialize_tensor_cores()
|
|
|
components['ai_accelerators'].append(ai_accelerator_for_loading)
|
|
|
|
|
|
|
|
|
storage.store_model_state({
|
|
|
"initialized": True,
|
|
|
"max_vram": 32 * 1024 * 1024 * 1024,
|
|
|
"active_models": {}
|
|
|
})
|
|
|
|
|
|
|
|
|
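
        # Florence-2 ships custom modeling code on the Hub, so both
        # from_pretrained calls below pass trust_remote_code=True; without it
        # transformers refuses to execute the repository's Python files.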
        from transformers import AutoModelForCausalLM, AutoProcessor
        model_id = "microsoft/florence-2-large"
        print(f"Loading model {model_id} with HTTP storage...")

        model = None
        processor = None
        try:
            model = AutoModelForCausalLM.from_pretrained(
                model_id,
                trust_remote_code=True,
                device_map="auto",
                torch_dtype="auto"
            )

            processor = AutoProcessor.from_pretrained(
                model_id,
                trust_remote_code=True
            )

            if not ai_accelerator_for_loading.storage.wait_for_connection():
                raise RuntimeError("HTTP connection lost - please retry")

            model_size = sum(p.numel() * p.element_size() for p in model.parameters())
            print(f"Model size: {model_size / (1024**3):.2f} GB")

            ai_accelerator_for_loading.load_model(
                model_id=model_id,
                model=model,
                processor=processor
            )

            print(f"Model '{model_id}' loaded successfully to HTTP storage.")
            assert ai_accelerator_for_loading.has_model(model_id), "Model not found in HTTP storage after loading."

            components['model_id'] = model_id
            components['model_size'] = model_size

            # Drop local references; the weights now live in remote storage.
            model = None
            processor = None
            gc.collect()

        except Exception as e:
            print(f"Detailed model loading error: {str(e)}")
            print("Falling back to zero-copy tensor mode...")

            try:
                if model is None:
                    # from_pretrained itself failed, so there is nothing to
                    # hand off to the fallback path.
                    raise RuntimeError("Model was never loaded; cannot fall back") from e
                ai_accelerator_for_loading.load_model(
                    model_id=model_id,
                    model=model,
                    processor=processor,
                    use_http=True
                )
                components['model_id'] = model_id
                print("Successfully loaded Florence model with HTTP transfer")
            except Exception as e2:
                print(f"HTTP model loading failed: {str(e2)}")
                raise

    except Exception as e:
        print(f"Model loading test failed: {e}")
        return

print("\nTest 2: HTTP-Based Parallel Processing across Multiple Chips")
|
|
|
num_chips = 4
|
|
|
chips = []
|
|
|
ai_accelerators = []
|
|
|
|
|
|
try:
|
|
|
|
|
|
shared_storage = None
|
|
|
max_connection_attempts = 3
|
|
|
|
|
|
for attempt in range(max_connection_attempts):
|
|
|
try:
|
|
|
if components['storage']:
|
|
|
shared_storage = components['storage']
|
|
|
logging.info("Successfully reused existing HTTP connection")
|
|
|
break
|
|
|
else:
|
|
|
logging.warning("Existing connection unavailable, creating new connection...")
|
|
|
with http_manager() as new_storage:
|
|
|
components['storage'] = new_storage
|
|
|
shared_storage = new_storage
|
|
|
logging.info("Successfully established new HTTP connection")
|
|
|
break
|
|
|
except Exception as e:
|
|
|
logging.error(f"Connection attempt {attempt + 1} failed: {e}")
|
|
|
if attempt < max_connection_attempts - 1:
|
|
|
time.sleep(2)
|
|
|
continue
|
|
|
raise RuntimeError(f"Failed to establish HTTP connection after {max_connection_attempts} attempts")
|
|
|
|
|
|
|
|
|
        total_sms = 0
        total_cores = 0

        # 800 Tb/s, 1 ns optical link daisy-chaining the chips together.
        from gpu_arch import OpticalInterconnect
        optical_link = OpticalInterconnect(bandwidth_tbps=800, latency_ns=1)

        # All chips share one virtual VRAM pool backed by the HTTP storage.
        shared_vram = components['vram']
        if shared_vram is None:
            shared_vram = VirtualVRAM()
            shared_vram.storage = shared_storage

        for i in range(num_chips):
            chip = Chip(chip_id=i, vram_size_gb=32, storage=shared_storage)
            chips.append(chip)

            # Connect each chip to its predecessor over the optical link.
            if i > 0:
                chip.connect_chip(chips[i-1], optical_link)

            ai_accelerator = AIAcceleratorHTTP(chip=chip)
            ai_accelerator.vram = shared_vram
            ai_accelerator.storage = shared_storage
            ai_accelerators.append(ai_accelerator)

            ai_accelerator.initialize_tensor_cores()

            total_sms += chip.num_sms
            total_cores += chip.num_sms * chip.cores_per_sm

            # Persist each chip's configuration so other nodes can discover it.
            shared_storage.store_state(f"chips/{i}/config", "state", {
                "num_sms": chip.num_sms,
                "cores_per_sm": chip.cores_per_sm,
                "total_cores": chip.num_sms * chip.cores_per_sm,
                "connected_chips": [c.chip_id for c in chip.connected_chips]
            })

            print(f"Chip {i} initialized with HTTP storage and optical interconnect")

        # Pull sample images from the sibling sample_task folder.
        image_folder = os.path.join(os.path.dirname(__file__), '..', 'sample_task')
        image_files = [f for f in os.listdir(image_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
        image_files.sort()
        if not image_files:
            print("No images found in sample_task folder.")
            return

        print("\nTotal Processing Units:")
        print(f"- Streaming Multiprocessors: {total_sms:,}")
        print(f"- CUDA Cores: {total_cores:,}")
        print(f"- Electron-speed tensor cores: {total_cores * 8:,}")

        for img_name in image_files[:1]:
            img_path = os.path.join(image_folder, img_name)
            raw_image = Image.open(img_path).convert('RGB')
            print(f"\nRunning HTTP-based inference for image: {img_name}")

            # Ship the raw pixels to remote storage; only the shape stays local.
            image_array = np.array(raw_image)
            shared_vram.storage.store_tensor(f"input_image/{img_name}", image_array)

            raw_image = None
            image_array_shape = image_array.shape
            image_array = None
            gc.collect()

            start_time = time.time()

            # Rows each chip would own in an even split of the image.
            batch_size = image_array_shape[0] // num_chips
            results = []

            # Point every accelerator's VRAM at the same shared storage.
            for accelerator in ai_accelerators:
                accelerator.vram.storage = shared_vram.storage

            for i, accelerator in enumerate(ai_accelerators):
                tensor_id = f"input_image/{img_name}"
                result = accelerator.inference(model_id, tensor_id)

                if result is not None:
                    shared_storage.store_tensor(f"results/chip_{i}/{img_name}", result)
                    results.append(result)
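
            # Note: every accelerator is handed the same tensor_id, so the
            # per-chip results gathered below are replicas rather than true
            # shards; a real split would slice the image into batch_size-row
            # chunks and store one tensor per chip.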
            elapsed = time.time() - start_time

            # Theoretical floor: one electron transit per operation, spread
            # across every core.
            ops_per_inference = total_cores * 1024
            electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
            theoretical_time = electron_transit_time * ops_per_inference / total_cores
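
            # Worked example with illustrative magnitudes (not measured
            # values): if drift_velocity ~ 1e5 m/s and TARGET_SWITCHES_PER_SEC
            # ~ 1e12, then electron_transit_time = 1 / (1e5 * 1e12) = 1e-17 s,
            # and theoretical_time = 1e-17 * 1024 ≈ 1e-14 s (~10 fs), since
            # the total_cores factors cancel.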

            # Stitch the per-chip results back together.
            final_result = None
            for i in range(num_chips):
                chip_result = shared_storage.load_tensor(f"results/chip_{i}/{img_name}")
                if chip_result is not None:
                    if final_result is None:
                        final_result = chip_result
                    else:
                        final_result = np.concatenate([final_result, chip_result])

            print("\nHTTP-Based Performance Metrics:")
            print(f"- Final result shape: {final_result.shape if final_result is not None else 'None'}")
            print(f"- Wall clock time: {elapsed*1000:.3f} ms")
            print(f"- Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
            print(f"- Effective TFLOPS: {(ops_per_inference / elapsed) / 1e12:.2f}")
            print(f"- Number of chips used: {num_chips}")

            assert final_result is not None, "HTTP-based inference returned None"
            assert results, "No per-chip inference results were produced"

        print("Multi-chip inference test on all images (virtual GPU stack) successful.")

    except Exception as e:
        print(f"Multi-chip inference test failed: {e}")
        return

    print("\nTest 3: Florence Model Inference with HTTP Storage")
    try:
        image_path = "test_image.jpg"
        if os.path.exists(image_path):
            image = Image.open(image_path)

            # The local processor reference was released after Test 1, so
            # rebuild it before preprocessing.
            from transformers import AutoProcessor
            processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
            inputs = processor(image, return_tensors="pt")

            # Run on the last accelerator initialized in Test 2.
            outputs = ai_accelerators[-1].run_inference(
                model_id="microsoft/florence-2-large",
                inputs=inputs,
                use_http=True
            )

            if outputs is not None:
                predicted_caption = processor.decode(outputs[0], skip_special_tokens=True)
                print(f"\nFlorence Model Caption: {predicted_caption}")
            else:
                print("Inference failed to produce output")
        else:
            print(f"Test image not found at {image_path}")
    except Exception as e:
        print(f"Inference test failed: {str(e)}")

print("\nTest 3: Electron-Speed Matrix Operations")
|
|
|
try:
|
|
|
|
|
|
size = 1024
|
|
|
matrix_a = [[float(i+j) for j in range(size)] for i in range(size)]
|
|
|
matrix_b = [[float(i*j+1) for j in range(size)] for i in range(size)]
|
|
|
|
|
|
print("\nLoading matrices into virtual VRAM...")
|
|
|
matrix_a_id = ai_accelerator_for_loading.load_matrix(matrix_a, "matrix_A")
|
|
|
matrix_b_id = ai_accelerator_for_loading.load_matrix(matrix_b, "matrix_B")
|
|
|
|
|
|
print("\nPerforming electron-speed matrix multiplication...")
|
|
|
start_time = time.time()
|
|
|
result_matrix_id = ai_accelerator_for_loading.matrix_multiply(matrix_a_id, matrix_b_id, "result_C")
|
|
|
result_matrix = ai_accelerator_for_loading.get_matrix(result_matrix_id)
|
|
|
|
|
|
elapsed = time.time() - start_time
|
|
|
|
|
|
|
|
|
ops = size * size * size * 2
|
|
|
electron_transit_time = 1 / (drift_velocity * TARGET_SWITCHES_PER_SEC)
|
|
|
theoretical_time = electron_transit_time * ops / (total_cores * 8)
|
|
|
|
|
|
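
        # For size = 1024 this is 2 * 1024**3 = 2,147,483,648 operations,
        # the standard multiply-accumulate count for a dense N x N matmul.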
print("\nElectron-Speed Matrix Operation Metrics:")
|
|
|
print(f"Matrix size: {size}x{size}")
|
|
|
print(f"Total operations: {ops:,}")
|
|
|
print(f"Wall clock time: {elapsed*1000:.3f} ms")
|
|
|
print(f"Theoretical electron transit time: {theoretical_time*1e12:.3f} ps")
|
|
|
print(f"Effective TFLOPS: {(ops / elapsed) / 1e12:.2f}")
|
|
|
|
|
|
|
|
|
print("\nValidating results (first 2x2 corner):")
|
|
|
print(f"Result[0:2,0:2] = ")
|
|
|
for i in range(min(2, len(result_matrix))):
|
|
|
print(result_matrix[i][:2])
|
|
|
|
|
|
|
|
|
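
        # Analytical spot check (assumes get_matrix returns nested Python
        # lists, as the indexing above does): A[0][j] = j and B[j][0] = 1,
        # so result[0][0] must equal sum(range(size)) = size*(size-1)//2,
        # i.e. 523776.0 for size = 1024.
        expected_corner = float(size * (size - 1) // 2)
        assert abs(result_matrix[0][0] - expected_corner) < 1e-6, \
            f"result[0][0] = {result_matrix[0][0]}, expected {expected_corner}"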

        assert len(result_matrix) == size, "Result matrix has incorrect number of rows"
        assert len(result_matrix[0]) == size, "Result matrix has incorrect number of columns"
        print("\nMatrix operations at electron speed successful.")

    except Exception as e:
        print(f"Matrix operations test failed: {e}")
        return
    finally:
        # Tear everything down once the tests have run: accelerators first,
        # then the shared storage connection, then any CUDA cache.
        for ai_accelerator in ai_accelerators:
            try:
                ai_accelerator.cleanup()
            except Exception as e:
                print(f"Cleanup error: {str(e)}")

        if shared_storage:
            try:
                shared_storage.close()
            except Exception as e:
                print(f"Storage cleanup error: {str(e)}")

        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    print("\n--- All AI Integration Tests Completed ---")