INFER / ai.py
Factor Studios
Upload 27 files
2ff82ee verified
import numpy as np
import time
from typing import Dict, Any, Optional, Tuple, Union, List
from enum import Enum
class VectorOperation(Enum):
"""Enumeration of supported vector operations."""
ADD = "add"
SUBTRACT = "subtract"
MULTIPLY = "multiply"
DIVIDE = "divide"
DOT_PRODUCT = "dot_product"
CROSS_PRODUCT = "cross_product"
NORMALIZE = "normalize"
MAGNITUDE = "magnitude"
class AIAccelerator:
"""
AI Accelerator that simulates GPU-based AI computations.
This class leverages NumPy's optimized operations to simulate the parallel
processing capabilities of the vGPU for AI workloads.
"""
def __init__(self, vram=None, num_sms: int = 800, cores_per_sm: int = 222):
self.vram = vram
self.num_sms = num_sms
self.cores_per_sm = cores_per_sm
self.total_cores = num_sms * cores_per_sm
# AI operation statistics
self.operations_performed = 0
self.total_compute_time = 0.0
self.flops_performed = 0 # Floating point operations
# Matrix registry for storing matrices in VRAM
self.matrix_registry: Dict[str, str] = {} # matrix_id -> vram_address
self.matrix_counter = 0
# Model/tokenizer registry for full isolation
self.model_registry: Dict[str, Any] = {}
self.tokenizer_registry: Dict[str, Any] = {}
self.model_loaded = False
def set_vram(self, vram):
"""Set the VRAM reference."""
self.vram = vram
def allocate_matrix(self, shape: Tuple[int, ...], dtype=np.float32,
name: Optional[str] = None) -> str:
"""Allocate a matrix in VRAM and return its ID."""
if not self.vram:
raise RuntimeError("VRAM not available")
if name is None:
name = f"matrix_{self.matrix_counter}"
self.matrix_counter += 1
# Create matrix data
matrix_data = np.zeros(shape, dtype=dtype)
# Store in VRAM as a texture (reusing texture storage mechanism)
matrix_id = self.vram.load_texture(matrix_data, name)
self.matrix_registry[name] = matrix_id
return name
def load_matrix(self, matrix_data: np.ndarray, name: Optional[str] = None) -> str:
"""Load matrix data into VRAM and return its ID."""
if not self.vram:
raise RuntimeError("VRAM not available")
if name is None:
name = f"matrix_{self.matrix_counter}"
self.matrix_counter += 1
# Store in VRAM
matrix_id = self.vram.load_texture(matrix_data, name)
self.matrix_registry[name] = matrix_id
return name
def get_matrix(self, matrix_id: str) -> Optional[np.ndarray]:
"""Retrieve matrix data from VRAM."""
if not self.vram or matrix_id not in self.matrix_registry:
return None
vram_id = self.matrix_registry[matrix_id]
return self.vram.get_texture(vram_id)
def matrix_multiply(self, matrix_a_id: str, matrix_b_id: str,
result_id: Optional[str] = None) -> Optional[str]:
"""Perform matrix multiplication using simulated GPU parallelism."""
start_time = time.time()
# Retrieve matrices from VRAM
matrix_a = self.get_matrix(matrix_a_id)
matrix_b = self.get_matrix(matrix_b_id)
if matrix_a is None or matrix_b is None:
print(f"Error: Could not retrieve matrices {matrix_a_id} or {matrix_b_id}")
return None
try:
# Check if matrices can be multiplied
if matrix_a.shape[-1] != matrix_b.shape[0]:
print(f"Error: Matrix dimensions incompatible for multiplication: "
f"{matrix_a.shape} x {matrix_b.shape}")
return None
# Simulate parallel processing by breaking down the operation
# In a real GPU, this would be distributed across SMs and cores
result = self._simulate_parallel_matmul(matrix_a, matrix_b)
# Store result in VRAM
if result_id is None:
result_id = f"result_{self.matrix_counter}"
self.matrix_counter += 1
result_matrix_id = self.load_matrix(result, result_id)
# Update statistics
compute_time = time.time() - start_time
self.total_compute_time += compute_time
self.operations_performed += 1
# Calculate FLOPs (2 * M * N * K for matrix multiplication)
m, k = matrix_a.shape
k2, n = matrix_b.shape
flops = 2 * m * n * k
self.flops_performed += flops
print(f"Matrix multiplication completed: {matrix_a.shape} x {matrix_b.shape} "
f"= {result.shape} in {compute_time:.4f}s")
print(f"Simulated {flops:,} FLOPs across {self.total_cores} cores")
return result_matrix_id
except Exception as e:
print(f"Error in matrix multiplication: {e}")
return None
def _simulate_parallel_matmul(self, matrix_a: np.ndarray, matrix_b: np.ndarray) -> np.ndarray:
"""Simulate parallel matrix multiplication across SMs."""
# Use NumPy's optimized matrix multiplication
# In a real implementation, this would be broken down into blocks
# and distributed across the simulated SMs
# For demonstration, we can show how the work would be distributed
m, k = matrix_a.shape
k2, n = matrix_b.shape
# Calculate work distribution
total_output_elements = m * n
elements_per_sm = max(1, total_output_elements // self.num_sms)
print(f"Distributing {total_output_elements:,} output elements across "
f"{self.num_sms} SMs ({elements_per_sm} elements per SM)")
# Perform the actual computation using NumPy
result = np.dot(matrix_a, matrix_b)
return result
def vector_operation(self, operation: VectorOperation, vector_a_id: str,
vector_b_id: Optional[str] = None,
result_id: Optional[str] = None) -> Optional[str]:
"""Perform vector operations using simulated GPU parallelism."""
start_time = time.time()
# Retrieve vectors from VRAM
vector_a = self.get_matrix(vector_a_id)
if vector_a is None:
print(f"Error: Could not retrieve vector {vector_a_id}")
return None
vector_b = None
if vector_b_id:
vector_b = self.get_matrix(vector_b_id)
if vector_b is None:
print(f"Error: Could not retrieve vector {vector_b_id}")
return None
try:
result = None
flops = 0
if operation == VectorOperation.ADD:
if vector_b is None:
raise ValueError("Vector B required for addition")
result = vector_a + vector_b
flops = vector_a.size
elif operation == VectorOperation.SUBTRACT:
if vector_b is None:
raise ValueError("Vector B required for subtraction")
result = vector_a - vector_b
flops = vector_a.size
elif operation == VectorOperation.MULTIPLY:
if vector_b is None:
raise ValueError("Vector B required for multiplication")
result = vector_a * vector_b
flops = vector_a.size
elif operation == VectorOperation.DIVIDE:
if vector_b is None:
raise ValueError("Vector B required for division")
result = vector_a / vector_b
flops = vector_a.size
elif operation == VectorOperation.DOT_PRODUCT:
if vector_b is None:
raise ValueError("Vector B required for dot product")
result = np.dot(vector_a.flatten(), vector_b.flatten())
flops = 2 * vector_a.size
elif operation == VectorOperation.CROSS_PRODUCT:
if vector_b is None:
raise ValueError("Vector B required for cross product")
result = np.cross(vector_a, vector_b)
flops = 6 # Approximate for 3D cross product
elif operation == VectorOperation.NORMALIZE:
magnitude = np.linalg.norm(vector_a)
result = vector_a / magnitude if magnitude > 0 else vector_a
flops = vector_a.size * 2 # Division + magnitude calculation
elif operation == VectorOperation.MAGNITUDE:
result = np.array([np.linalg.norm(vector_a)])
flops = vector_a.size * 2 # Squares and sum
else:
raise ValueError(f"Unsupported vector operation: {operation}")
# Store result in VRAM
if result_id is None:
result_id = f"vector_result_{self.matrix_counter}"
self.matrix_counter += 1
result_vector_id = self.load_matrix(result, result_id)
# Update statistics
compute_time = time.time() - start_time
self.total_compute_time += compute_time
self.operations_performed += 1
self.flops_performed += flops
print(f"Vector operation {operation.value} completed in {compute_time:.4f}s")
return result_vector_id
except Exception as e:
print(f"Error in vector operation {operation.value}: {e}")
return None
def convolution_2d(self, input_id: str, kernel_id: str,
stride: int = 1, padding: int = 0,
result_id: Optional[str] = None) -> Optional[str]:
"""Perform 2D convolution operation."""
start_time = time.time()
# Retrieve input and kernel from VRAM
input_data = self.get_matrix(input_id)
kernel = self.get_matrix(kernel_id)
if input_data is None or kernel is None:
print(f"Error: Could not retrieve input or kernel")
return None
try:
# Simple 2D convolution implementation
# In a real GPU implementation, this would be highly optimized
# and distributed across many cores
if len(input_data.shape) == 2:
input_h, input_w = input_data.shape
channels = 1
else:
input_h, input_w, channels = input_data.shape
kernel_h, kernel_w = kernel.shape[:2]
# Calculate output dimensions
output_h = (input_h + 2 * padding - kernel_h) // stride + 1
output_w = (input_w + 2 * padding - kernel_w) // stride + 1
# Initialize output
if channels == 1:
output = np.zeros((output_h, output_w))
else:
output = np.zeros((output_h, output_w, channels))
# Pad input if necessary
if padding > 0:
if channels == 1:
padded_input = np.pad(input_data, padding, mode='constant')
else:
padded_input = np.pad(input_data,
((padding, padding), (padding, padding), (0, 0)),
mode='constant')
else:
padded_input = input_data
# Perform convolution
flops = 0
for y in range(0, output_h):
for x in range(0, output_w):
y_start = y * stride
x_start = x * stride
if channels == 1:
patch = padded_input[y_start:y_start+kernel_h, x_start:x_start+kernel_w]
output[y, x] = np.sum(patch * kernel)
flops += kernel_h * kernel_w * 2 # Multiply and add
else:
for c in range(channels):
patch = padded_input[y_start:y_start+kernel_h,
x_start:x_start+kernel_w, c]
output[y, x, c] = np.sum(patch * kernel)
flops += kernel_h * kernel_w * 2
# Store result in VRAM
if result_id is None:
result_id = f"conv_result_{self.matrix_counter}"
self.matrix_counter += 1
result_conv_id = self.load_matrix(output, result_id)
# Update statistics
compute_time = time.time() - start_time
self.total_compute_time += compute_time
self.operations_performed += 1
self.flops_performed += flops
print(f"2D Convolution completed: {input_data.shape} * {kernel.shape} "
f"= {output.shape} in {compute_time:.4f}s")
print(f"Simulated {flops:,} FLOPs")
return result_conv_id
except Exception as e:
print(f"Error in 2D convolution: {e}")
return None
def get_stats(self) -> Dict[str, Any]:
"""Get AI accelerator statistics."""
avg_compute_time = self.total_compute_time / max(1, self.operations_performed)
flops_per_second = self.flops_performed / max(0.001, self.total_compute_time)
return {
"operations_performed": self.operations_performed,
"total_compute_time": self.total_compute_time,
"avg_compute_time": avg_compute_time,
"flops_performed": self.flops_performed,
"flops_per_second": flops_per_second,
"matrices_in_memory": len(self.matrix_registry),
"simulated_cores": self.total_cores,
"simulated_sms": self.num_sms
}
def reset_stats(self) -> None:
"""Reset AI accelerator statistics."""
self.operations_performed = 0
self.total_compute_time = 0.0
self.flops_performed = 0
def load_model(self, model_id: str, model: Any, processor: Any):
"""Loads a model and its processor into the accelerator's registry."""
self.model_registry[model_id] = model
self.tokenizer_registry[model_id] = processor
self.model_loaded = True
print(f"Model '{model_id}' loaded into AIAccelerator.")
def has_model(self, model_id: str) -> bool:
"""Checks if a model is loaded in the accelerator's registry."""
return model_id in self.model_registry
def inference(self, model_id, input_text, idx=None):
print(f"[DEBUG] AIAccelerator.inference called for model_id={model_id}, idx={idx}")
if not self.has_model(model_id):
print(f"[ERROR] Model {model_id} not loaded in AIAccelerator.")
return None
model = self.model_registry[model_id]
processor = self.tokenizer_registry[model_id]
try:
# Check if this is a dummy model for testing
if hasattr(model, '__class__') and 'Dummy' in model.__class__.__name__:
# Handle dummy model for testing
return processor.decode([1, 2, 3, 4, 5], skip_special_tokens=True)
# Try to import torch and transformers for real models
import torch
from transformers import BlipForConditionalGeneration, BlipProcessor
# BLIP vision model branch
if isinstance(model, BlipForConditionalGeneration) and isinstance(processor, BlipProcessor):
# input_text is actually the image/frame (numpy array)
image = input_text
prompt = "Describe this image."
# Accept numpy.ndarray, PIL.Image, or torch.Tensor
if not (hasattr(image, 'shape') or hasattr(image, 'size')):
raise ValueError(f"Invalid image type. Expected either PIL.Image.Image, numpy.ndarray, torch.Tensor, tf.Tensor or jax.ndarray, but got {type(image)}.")
inputs = processor(images=image, text=prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
out = model.generate(**inputs, max_new_tokens=64)
caption = processor.decode(out[0], skip_special_tokens=True)
print(f"[DEBUG] BLIP inference result for idx={idx}: {caption}")
return caption
else:
print(f"[ERROR] Unsupported model type for inference: {type(model)}")
return None
except Exception as e:
print(f"[ERROR] AIAccelerator.inference failed for idx={idx}: {e}")
return None