INV / test_ai_integration_http.py.bak
Fred808's picture
Upload 256 files
7a0c684 verified
import logging
import os
import time
from contextlib import contextmanager
from typing import Any, Optional
import torch
from transformers import pipeline
from virtual_vram import VirtualVRAM
from http_storage import HTTPGPUStorage
from torch_vgpu import VGPUDevice, to_vgpu
def setup_vgpu():
"""Setup vGPU device"""
try:
# Initialize the backend first
from torch_vgpu import init_vgpu_backend, VGPUDevice
if not init_vgpu_backend():
raise RuntimeError("Failed to initialize vGPU backend")
# Create and register vGPU device
vgpu = VGPUDevice()
device = vgpu.device()
# Set as default device for tensor operations
return device
except Exception as e:
logging.error(f"vGPU setup failed: {str(e)}")
raise
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@contextmanager
def gpu_context():
"""Context manager for vGPU resources"""
storage = None
try:
storage = HTTPGPUStorage()
yield storage
finally:
if storage:
storage.close()
logger.info("vGPU resources cleaned up")
def get_model_size(model):
"""Calculate model size in parameters and memory footprint"""
param_size = 0
for param in model.parameters():
param_size += param.nelement() * param.element_size()
buffer_size = 0
for buffer in model.buffers():
buffer_size += buffer.nelement() * buffer.element_size()
return param_size + buffer_size
def prepare_prompt(instruction: str) -> str:
"""Prepare a prompt for Llama-2 using its chat format."""
# Format: <s>[INST] instruction [/INST] assistant response </s>[INST] ...
return f"<s>[INST] {instruction} [/INST]"
def test_ai_integration_http():
"""Test GPT OSS model on vGPU with text generation"""
logger.info("Starting vGPU text generation test")
status = {
'pipeline_loaded': False,
'model_on_vgpu': False,
'generation_complete': False,
'cleanup_success': False
}
with gpu_context() as storage:
try:
# Initialize vRAM with monitoring
initial_mem = storage.get_used_memory() if hasattr(storage, 'get_used_memory') else 0
vram = VirtualVRAM(size_gb=None, storage=storage)
# Initialize vGPU device
device = setup_vgpu()
logger.info(f"vGPU initialized with device {device}")
# Load model using pipeline
model_id = "openai/gpt-oss-20b"
logger.info(f"Loading {model_id}")
try:
# Disable transformers logging temporarily
transformers_logger = logging.getLogger("transformers")
original_level = transformers_logger.level
transformers_logger.setLevel(logging.ERROR)
try:
# Create pipeline with model directly on vGPU
pipe = pipeline(
"text-generation",
model=model_id,
model_kwargs={
"torch_dtype": torch.float32, # Use full precision
"device_map": {"": device}, # Map all modules to our vGPU device
},
use_safetensors=True,
trust_remote_code=True,
device=device # Use our vGPU device
)
status["pipeline_loaded"] = True
status['model_on_vgpu'] = True
# Log model details
logger.info(f"Pipeline created with model: {model_id}")
# Log model size
model_size = get_model_size(pipe.model)
logger.info(f"Model loaded: {model_size/1e9:.2f} GB in parameters")
logger.info(f"Model architecture: {pipe.model.__class__.__name__}")
# Verify model location
with torch.device(device):
current_mem = storage.get_used_memory() if hasattr(storage, 'get_used_memory') else 0
logger.info(f"Model memory usage: {(current_mem - initial_mem)/1e9:.2f} GB")
finally:
# Restore original logging level
transformers_logger.setLevel(original_level)
except Exception as e:
logger.error(f"Model loading failed: {str(e)}")
raise
except Exception as e:
logger.error(f"Model transfer to vGPU failed: {str(e)}")
raise
# Run text generation
logger.info("Running text generation...")
start = time.time()
peak_mem = initial_mem
try:
# Prepare input prompt
prompt = "Explain how virtual GPUs work in simple terms."
with torch.no_grad():
outputs = pipe(
prompt,
max_new_tokens=256,
temperature=0.7,
top_p=0.95,
top_k=40,
num_beams=1,
do_sample=True,
return_full_text=True
)
if hasattr(storage, 'get_used_memory'):
peak_mem = max(peak_mem, storage.get_used_memory())
inference_time = time.time() - start
status['generation_complete'] = True
# Log performance metrics
logger.info(f"\nGeneration stats:")
logger.info(f"- Time: {inference_time:.4f}s")
logger.info(f"- Memory peak: {(peak_mem - initial_mem)/1e9:.2f} GB")
logger.info(f"- Generated text: {outputs[0]['generated_text']}")
except Exception as e:
logger.error(f"Text generation failed: {str(e)}")
raise
except Exception as e:
logger.error(f"Test failed: {str(e)}")
raise
finally:
# Cleanup and status report
try:
if 'pipe' in locals():
del pipe
if 'outputs' in locals():
del outputs
torch.cuda.empty_cache() if hasattr(torch, 'cuda') else None
status['cleanup_success'] = True
except Exception as e:
logger.error(f"Cleanup error: {str(e)}")
logger.info("\nTest Summary:")
for key, value in status.items():
logger.info(f"- {key}: {'✓' if value else '✗'}")
final_mem = storage.get_used_memory() if hasattr(storage, 'get_used_memory') else 0
if final_mem > initial_mem:
logger.warning(f"Memory leak detected: {(final_mem - initial_mem)/1e6:.2f} MB")
if __name__ == "__main__":
test_ai_integration_http()