# ai_http.py
import numpy as np
import time
from typing import Dict, Any, Optional, Tuple
from enum import Enum
from tensor_core import TensorCoreArray
class VectorOperation(Enum):
"""Enumeration of supported vector operations."""
ADD = "add"
SUBTRACT = "subtract"
MULTIPLY = "multiply"
DIVIDE = "divide"
DOT_PRODUCT = "dot_product"
CROSS_PRODUCT = "cross_product"
NORMALIZE = "normalize"
MAGNITUDE = "magnitude"
class AIAccelerator:
"""
AI Accelerator that simulates GPU-based AI computations using HTTP storage.
This class leverages NumPy's optimized operations to simulate the parallel
processing capabilities of the vGPU for AI workloads.
"""
def __init__(self, vram=None, num_sms: int = 800, cores_per_sm: int = 222, storage=None):
"""Initialize AI Accelerator with electron-speed awareness and shared HTTP storage."""
        from electron_speed import drift_velocity
self.storage = storage # Use the shared storage instance
if self.storage is None:
from http_storage import HTTPGPUStorage
self.storage = HTTPGPUStorage() # Create HTTP storage instead of WebSocket
if not self.storage.wait_for_connection():
raise RuntimeError("Could not connect to GPU storage server")
self.vram = vram
self.num_sms = num_sms
self.cores_per_sm = cores_per_sm
self.total_cores = num_sms * cores_per_sm
# Configure for maximum parallel processing at electron speed
total_tensor_cores = num_sms * cores_per_sm # Use ALL cores for tensor operations
self.tensor_core_array = TensorCoreArray(
num_tensor_cores=total_tensor_cores,
bits=32,
bandwidth_tbps=drift_velocity / 1e-12 # Bandwidth scaled to electron drift speed
)
self.tensor_cores_initialized = False
        # Model, tensor, and tokenizer tracking
        self.model_registry: Dict[str, Dict[str, Any]] = {}   # Track loaded models
        self.tensor_registry: Dict[str, Dict[str, Any]] = {}  # Track tensor metadata
        self.tokenizer_registry: Dict[str, Any] = {}          # Track tokenizers
        self.model_configs: Dict[str, Any] = {}               # Store model architectures
        self.model_loaded = False
        self.resource_monitor = {
            'vram_used': 0,
            'active_tensors': 0,
            'loaded_models': set()
        }
        # AI operation statistics
        self.operations_performed = 0
        self.total_compute_time = 0.0
        self.flops_performed = 0
        # HTTP-based memory management
        self.matrix_registry: Dict[str, str] = {}  # Track loaded matrices
        self.matrix_counter = 0
        self.activation_cache: Dict[str, str] = {}  # Cache activation outputs
        self.weight_cache: Dict[str, Any] = {}  # Cache preprocessed weights
        # Batch processing configuration
        self.max_batch_size = 64
        self.min_batch_size = 4
        self.dynamic_batching = True  # Enable automatic batch size adjustment
    def _serialize_model_config(self, config: Any) -> Any:
        """Recursively convert a model config into a JSON-serializable structure."""
# Handle None case first
if config is None:
return None
# Handle Florence2LanguageConfig specifically
if config.__class__.__name__ == "Florence2LanguageConfig":
try:
return {
"type": "Florence2LanguageConfig",
"model_type": getattr(config, "model_type", ""),
"architectures": getattr(config, "architectures", []),
"hidden_size": getattr(config, "hidden_size", 0),
"num_attention_heads": getattr(config, "num_attention_heads", 0),
"num_hidden_layers": getattr(config, "num_hidden_layers", 0),
"intermediate_size": getattr(config, "intermediate_size", 0),
"max_position_embeddings": getattr(config, "max_position_embeddings", 0),
"layer_norm_eps": getattr(config, "layer_norm_eps", 1e-12),
"vocab_size": getattr(config, "vocab_size", 0)
}
except Exception as e:
print(f"Warning: Error serializing Florence2LanguageConfig: {e}")
return {"type": "Florence2LanguageConfig", "error": str(e)}
# Handle standard types
if isinstance(config, (int, float, str, bool)):
return config
# Handle lists and tuples
if isinstance(config, (list, tuple)):
return [self._serialize_model_config(item) for item in config]
# Handle dictionaries
if isinstance(config, dict):
return {k: self._serialize_model_config(v) for k, v in config.items()}
# Handle objects with __dict__
if hasattr(config, '__dict__'):
config_dict = {}
for key, value in config.__dict__.items():
try:
# Skip private attributes
if key.startswith('_'):
continue
config_dict[key] = self._serialize_model_config(value)
except Exception as e:
print(f"Warning: Error serializing attribute {key}: {e}")
config_dict[key] = str(value)
return config_dict
# Fallback: convert to string representation
try:
return str(config)
except Exception as e:
return f"<Unserializable object of type {type(config).__name__}: {str(e)}>"
def store_model_state(self, model_name: str, model_info: Dict[str, Any]) -> bool:
"""Store model state in HTTP storage with proper serialization."""
try:
# Convert any non-serializable parts of model_info
serializable_info = self._serialize_model_config(model_info)
# Store in model registry
self.model_registry[model_name] = serializable_info
# Save to storage
if self.storage:
# Store model info
info_success = self.storage.store_state(
"models",
f"{model_name}/info",
serializable_info
)
# Store model state
state_success = self.storage.store_state(
"models",
f"{model_name}/state",
{"loaded": True, "timestamp": time.time()}
)
                if info_success and state_success:
                    self.resource_monitor['loaded_models'].add(model_name)
                    return True
            return False
except Exception as e:
print(f"Error storing model state: {str(e)}")
return False
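    # Resulting storage layout sketch for store_model_state("gpt2", info)
    # ("gpt2" is a hypothetical name; keys follow the store_state calls above):
    #   models/gpt2/info  -> the serialized model_info dict
    #   models/gpt2/state -> {"loaded": True, "timestamp": <time.time()>}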
def initialize_tensor_cores(self):
"""Initialize tensor cores and verify they're ready for computation"""
if self.tensor_cores_initialized:
return True
try:
# Verify tensor core array is properly initialized
if not hasattr(self, 'tensor_core_array') or self.tensor_core_array is None:
raise RuntimeError("Tensor core array not properly initialized")
# Initialize tensor cores if needed
if hasattr(self.tensor_core_array, 'initialize'):
self.tensor_core_array.initialize()
# Verify VRAM access
if self.vram is None:
raise RuntimeError("VRAM not properly configured")
            # Test tensor core functionality with a small computation
            test_input = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32)
test_result = self.tensor_core_array.matmul(test_input, test_input)
if test_result is None or not isinstance(test_result, (np.ndarray, list)) or len(test_result) == 0:
raise RuntimeError("Tensor core test computation failed")
self.tensor_cores_initialized = True
return True
except Exception as e:
print(f"Failed to initialize tensor cores: {str(e)}")
self.tensor_cores_initialized = False
return False
def set_vram(self, vram):
"""Set the VRAM reference."""
self.vram = vram
def allocate_matrix(self, shape: Tuple[int, ...], dtype=np.float32,
name: Optional[str] = None) -> str:
"""Allocate a matrix in VRAM and return its ID."""
if not self.vram:
raise RuntimeError("VRAM not available")
if name is None:
name = f"matrix_{self.matrix_counter}"
self.matrix_counter += 1
# Create matrix data
matrix_data = np.zeros(shape, dtype=dtype)
# Store in VRAM using HTTP storage
if self.storage.store_tensor(name, matrix_data):
self.matrix_registry[name] = name
return name
else:
raise RuntimeError(f"Failed to allocate matrix {name}")
def load_matrix(self, matrix_data: np.ndarray, name: Optional[str] = None) -> str:
"""Load matrix data into VRAM and return its ID."""
if name is None:
name = f"matrix_{self.matrix_counter}"
self.matrix_counter += 1
# Store in VRAM using HTTP storage
if self.storage.store_tensor(name, matrix_data):
self.matrix_registry[name] = name
return name
else:
raise RuntimeError(f"Failed to load matrix {name}")
def get_matrix(self, matrix_id: str) -> Optional[np.ndarray]:
"""Retrieve matrix data from VRAM."""
if matrix_id not in self.matrix_registry:
return None
return self.storage.load_tensor(matrix_id)
def matrix_multiply(self, matrix_a_id: str, matrix_b_id: str,
result_id: Optional[str] = None) -> Optional[str]:
"""Perform matrix multiplication using simulated GPU parallelism."""
start_time = time.time()
# Retrieve matrices from VRAM via HTTP storage
matrix_a = self.get_matrix(matrix_a_id)
matrix_b = self.get_matrix(matrix_b_id)
if matrix_a is None or matrix_b is None:
print(f"Error: Could not retrieve matrices {matrix_a_id} or {matrix_b_id}")
return None
try:
# Check if matrices can be multiplied
if matrix_a.shape[-1] != matrix_b.shape[0]:
print(f"Error: Matrix dimensions incompatible for multiplication: "
f"{matrix_a.shape} x {matrix_b.shape}")
return None
# Route matrix multiplication through the virtual TensorCoreArray
A = matrix_a.tolist()
B = matrix_b.tolist()
result = self.tensor_core_array.matmul(A, B)
result_array = np.array(result)
# Store result in VRAM
if result_id is None:
result_id = f"result_{self.matrix_counter}"
self.matrix_counter += 1
result_matrix_id = self.load_matrix(result_array, result_id)
# Update statistics
compute_time = time.time() - start_time
self.total_compute_time += compute_time
self.operations_performed += 1
            # Calculate FLOPs (2 * M * N * K for matrix multiplication)
            m, k = matrix_a.shape[0], matrix_a.shape[-1]
            n = matrix_b.shape[-1]
            flops = 2 * m * n * k
self.flops_performed += flops
print(f"Matrix multiplication completed: {matrix_a.shape} x {matrix_b.shape} "
f"= {result_array.shape} in {compute_time:.4f}s")
print(f"Simulated {flops:,} FLOPs across {self.total_cores} cores")
return result_matrix_id
except Exception as e:
print(f"Error in matrix multiplication: {e}")
return None
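    # Worked FLOP example under the 2 * M * N * K convention used above
    # (shapes are illustrative): multiplying a (64, 128) matrix by a (128, 32)
    # matrix costs 2 * 64 * 32 * 128 = 524,288 simulated FLOPs, independent of
    # how many cores the work is notionally spread across.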
def vector_operation(self, operation: VectorOperation, vector_a_id: str,
vector_b_id: Optional[str] = None,
result_id: Optional[str] = None) -> Optional[str]:
"""Perform vector operations using simulated GPU parallelism."""
start_time = time.time()
# Retrieve vectors from VRAM via HTTP storage
vector_a = self.get_matrix(vector_a_id)
if vector_a is None:
print(f"Error: Could not retrieve vector {vector_a_id}")
return None
vector_b = None
if vector_b_id:
vector_b = self.get_matrix(vector_b_id)
if vector_b is None:
print(f"Error: Could not retrieve vector {vector_b_id}")
return None
try:
result = None
flops = 0
if operation == VectorOperation.ADD:
if vector_b is None:
raise ValueError("Vector B required for addition")
result = vector_a + vector_b
flops = vector_a.size
elif operation == VectorOperation.SUBTRACT:
if vector_b is None:
raise ValueError("Vector B required for subtraction")
result = vector_a - vector_b
flops = vector_a.size
elif operation == VectorOperation.MULTIPLY:
if vector_b is None:
raise ValueError("Vector B required for multiplication")
result = vector_a * vector_b
flops = vector_a.size
elif operation == VectorOperation.DIVIDE:
if vector_b is None:
raise ValueError("Vector B required for division")
result = vector_a / vector_b
flops = vector_a.size
elif operation == VectorOperation.DOT_PRODUCT:
if vector_b is None:
raise ValueError("Vector B required for dot product")
result = np.dot(vector_a.flatten(), vector_b.flatten())
flops = 2 * vector_a.size
elif operation == VectorOperation.CROSS_PRODUCT:
if vector_b is None:
raise ValueError("Vector B required for cross product")
if vector_a.size != 3 or vector_b.size != 3:
raise ValueError("Cross product requires 3D vectors")
result = np.cross(vector_a.flatten(), vector_b.flatten())
                flops = 9  # Cross product: 6 multiplications + 3 subtractions
elif operation == VectorOperation.NORMALIZE:
magnitude = np.linalg.norm(vector_a)
if magnitude == 0:
result = vector_a
else:
result = vector_a / magnitude
flops = vector_a.size + 1 # Division + sqrt
elif operation == VectorOperation.MAGNITUDE:
result = np.array([np.linalg.norm(vector_a)])
flops = vector_a.size + 1 # Sum of squares + sqrt
else:
raise ValueError(f"Unknown vector operation: {operation}")
            # Store result (wrap scalars so storage always receives an ndarray)
            result = np.asarray(result)
            if result_id is None:
                result_id = f"vector_result_{self.matrix_counter}"
                self.matrix_counter += 1
            result_vector_id = self.load_matrix(result, result_id)
# Update statistics
compute_time = time.time() - start_time
self.total_compute_time += compute_time
self.operations_performed += 1
self.flops_performed += flops
print(f"Vector operation {operation.value} completed in {compute_time:.4f}s")
print(f"Simulated {flops:,} FLOPs across {self.total_cores} cores")
return result_vector_id
except Exception as e:
print(f"Error in vector operation: {e}")
return None
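    # Usage sketch (tensor IDs are hypothetical and must already be stored):
    #   dot_id = accel.vector_operation(VectorOperation.DOT_PRODUCT, "va", "vb")
    #   accel.get_matrix(dot_id)  # array holding the dot product
    # Single-operand operations (NORMALIZE, MAGNITUDE) omit vector_b_id.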
def has_model(self, model_id: str) -> bool:
"""Check if model is loaded via HTTP storage"""
return self.storage.is_model_loaded(model_id)
def load_model(self, model_id: str, model=None, processor=None) -> bool:
"""Load model via HTTP storage"""
try:
# Prepare model data for storage
model_data = None
if model is not None:
# In a real implementation, this would serialize the model
model_data = {
"model_type": type(model).__name__,
"config": self._serialize_model_config(getattr(model, 'config', None)),
"loaded_at": time.time()
}
# Use HTTP storage to load model
success = self.storage.load_model(model_id, model_data=model_data)
if success:
self.model_registry[model_id] = {
"model_data": model_data,
"processor": processor,
"loaded_at": time.time()
}
self.resource_monitor['loaded_models'].add(model_id)
return True
return False
except Exception as e:
print(f"Error loading model {model_id}: {str(e)}")
return False
def inference(self, model_id: str, input_tensor_id: str) -> Optional[np.ndarray]:
"""Run inference using HTTP storage"""
try:
# Load input tensor
input_data = self.storage.load_tensor(input_tensor_id)
if input_data is None:
print(f"Could not load input tensor {input_tensor_id}")
return None
# Run inference via HTTP API
result = self.storage.start_inference(model_id, input_data)
if result and result.get('output') is not None:
return result['output']
else:
print(f"Inference failed for model {model_id}")
return None
except Exception as e:
print(f"Error during inference: {str(e)}")
return None
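    # Usage sketch (model and tensor IDs are hypothetical):
    #   in_id = accel.load_matrix(np.random.rand(1, 768).astype(np.float32))
    #   out = accel.inference("florence-2", in_id)  # ndarray on success, else None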
def get_stats(self) -> Dict[str, Any]:
"""Get AI accelerator statistics"""
return {
"operations_performed": self.operations_performed,
"total_compute_time": self.total_compute_time,
"flops_performed": self.flops_performed,
"avg_ops_per_second": self.operations_performed / max(self.total_compute_time, 0.001),
"tensor_cores_initialized": self.tensor_cores_initialized,
"total_cores": self.total_cores,
"loaded_models": list(self.resource_monitor['loaded_models']),
"storage_status": self.storage.get_connection_status() if self.storage else None
}
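
if __name__ == "__main__":
    # Minimal smoke-test sketch. Assumptions: the sibling repo modules
    # (tensor_core, electron_speed) are importable, and _InMemoryStorage below
    # is a hypothetical stand-in that mimics only the storage methods this
    # demo touches, so no HTTP server is required.
    class _InMemoryStorage:
        """In-memory stand-in for HTTPGPUStorage (illustration only)."""

        def __init__(self):
            self._tensors: Dict[str, np.ndarray] = {}

        def wait_for_connection(self) -> bool:
            return True  # Nothing to connect to

        def store_tensor(self, name: str, data: np.ndarray) -> bool:
            self._tensors[name] = np.asarray(data)
            return True

        def load_tensor(self, name: str) -> Optional[np.ndarray]:
            return self._tensors.get(name)

        def get_connection_status(self) -> Dict[str, Any]:
            return {"connected": True, "backend": "in-memory stub"}

    accel = AIAccelerator(storage=_InMemoryStorage())
    a_id = accel.load_matrix(np.random.rand(64, 128).astype(np.float32))
    b_id = accel.load_matrix(np.random.rand(128, 32).astype(np.float32))
    out_id = accel.matrix_multiply(a_id, b_id)
    if out_id is not None:
        print("Result shape:", accel.get_matrix(out_id).shape)
    print(accel.get_stats())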