import logging
import os
import time
from contextlib import contextmanager
from typing import Any, Optional
import torch
from transformers import pipeline
from virtual_vram import VirtualVRAM
from http_storage import HTTPGPUStorage
from torch_vgpu import VGPUDevice, to_vgpu
def setup_vgpu():
    """Initialize the vGPU backend and return a device handle.

    The returned device is not installed as a process-wide default here;
    callers are expected to pass it explicitly wherever a device is needed.

    Returns:
        The value returned by ``VGPUDevice().device()``.

    Raises:
        RuntimeError: If ``init_vgpu_backend()`` returns a falsy value.
        Exception: Any other error raised during setup is logged through
            the module logger and re-raised unchanged.
    """
    try:
        # Imported inside the try so an import failure is logged like any
        # other setup error.
        from torch_vgpu import VGPUDevice, init_vgpu_backend

        # The backend must be initialized before a device can be created.
        if not init_vgpu_backend():
            raise RuntimeError("Failed to initialize vGPU backend")

        return VGPUDevice().device()
    except Exception as e:
        # Use the module logger so the record carries this module's name,
        # consistent with the rest of the file.
        logger.error(f"vGPU setup failed: {e}")
        raise
# Root logging setup: INFO and above, one line per record with timestamp,
# logger name, level and message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
@contextmanager
def gpu_context():
    """Yield an ``HTTPGPUStorage`` instance and close it on exit.

    The storage is closed whether the ``with`` body finishes normally or
    raises. If constructing the storage itself fails there is nothing to
    close, and the error propagates to the caller.

    Yields:
        HTTPGPUStorage: A newly constructed storage object.
    """
    # Construct outside the try: if this raises, no cleanup is required.
    storage = HTTPGPUStorage()
    try:
        yield storage
    finally:
        # Close unconditionally. A truthiness test on the storage object
        # could skip cleanup if the object ever evaluates as falsy.
        storage.close()
        logger.info("vGPU resources cleaned up")
def get_model_size(model):
    """Return the combined size in bytes of a model's parameters and buffers.

    Args:
        model: An object exposing ``parameters()`` and ``buffers()``, each
            yielding items with ``nelement()`` and ``element_size()``.

    Returns:
        int: Sum of ``nelement() * element_size()`` over every parameter
        and every buffer.
    """
    def _total_bytes(tensors):
        # Element count times bytes-per-element, summed over the iterable.
        return sum(t.nelement() * t.element_size() for t in tensors)

    return _total_bytes(model.parameters()) + _total_bytes(model.buffers())
def prepare_prompt(instruction: str) -> str:
    """Wrap *instruction* in Llama-2 chat-style instruction markers.

    Args:
        instruction: The user instruction text.

    Returns:
        The instruction surrounded by ``[INST]`` and ``[/INST]``, each
        separated from the text by a single space.
    """
    # Chat layout: [INST] instruction [/INST] assistant response [INST] ...
    template = "[INST] {} [/INST]"
    return template.format(instruction)
def _used_memory(storage):
    """Return ``storage.get_used_memory()``, or 0 when the storage has no such method."""
    reader = getattr(storage, 'get_used_memory', None)
    return reader() if callable(reader) else 0


def _load_pipeline(model_id, device):
    """Create a text-generation pipeline for *model_id* targeting *device*.

    The ``transformers`` logger is raised to ERROR while the pipeline is
    being built and restored to its previous level afterwards, including
    when construction fails.
    """
    transformers_logger = logging.getLogger("transformers")
    original_level = transformers_logger.level
    transformers_logger.setLevel(logging.ERROR)
    try:
        # NOTE(review): both `device_map` and `device` are supplied, as in
        # the original call; confirm transformers accepts the combination
        # for this backend.
        return pipeline(
            "text-generation",
            model=model_id,
            model_kwargs={
                "torch_dtype": torch.float32,  # Use full precision
                "device_map": {"": device},  # Map all modules to the vGPU device
                # `use_safetensors` is a model-loading option, so it goes in
                # model_kwargs; extra top-level keyword arguments are passed
                # to the pipeline object rather than to the model loader.
                "use_safetensors": True,
            },
            trust_remote_code=True,
            device=device,
        )
    finally:
        transformers_logger.setLevel(original_level)


def test_ai_integration_http():
    """Load a text-generation model on the vGPU and run one generation.

    Steps, in order: open the storage context, create the virtual VRAM and
    the vGPU device, build the pipeline, generate text for a fixed prompt,
    then release references and log a per-step status summary together with
    memory figures read from the storage (0 when the storage does not
    expose ``get_used_memory``).

    Raises:
        Exception: Any failure is logged once, tagged with the stage in
            which it occurred, and re-raised. The summary is logged either
            way.
    """
    logger.info("Starting vGPU text generation test")
    status = {
        'pipeline_loaded': False,
        'model_on_vgpu': False,
        'generation_complete': False,
        'cleanup_success': False
    }

    with gpu_context() as storage:
        # Bound before the try so the finally block can always read them,
        # even when the very first statement inside the try raises.
        initial_mem = 0
        pipe = None
        outputs = None
        stage = "vGPU initialization"
        try:
            initial_mem = _used_memory(storage)
            # Not used directly below; kept in a local so the object stays
            # referenced for the duration of the test.
            vram = VirtualVRAM(size_gb=None, storage=storage)

            device = setup_vgpu()
            logger.info(f"vGPU initialized with device {device}")

            stage = "model loading"
            model_id = "openai/gpt-oss-20b"
            logger.info(f"Loading {model_id}")
            pipe = _load_pipeline(model_id, device)
            status['pipeline_loaded'] = True
            status['model_on_vgpu'] = True

            logger.info(f"Pipeline created with model: {model_id}")
            model_size = get_model_size(pipe.model)
            logger.info(f"Model loaded: {model_size/1e9:.2f} GB in parameters")
            logger.info(f"Model architecture: {pipe.model.__class__.__name__}")

            # The memory query runs with `device` as the default torch device.
            with torch.device(device):
                current_mem = _used_memory(storage)
            logger.info(f"Model memory usage: {(current_mem - initial_mem)/1e9:.2f} GB")

            stage = "text generation"
            logger.info("Running text generation...")
            prompt = "Explain how virtual GPUs work in simple terms."
            # Time only the pipeline call, not the memory query after it.
            start = time.time()
            with torch.no_grad():
                outputs = pipe(
                    prompt,
                    max_new_tokens=256,
                    temperature=0.7,
                    top_p=0.95,
                    top_k=40,
                    num_beams=1,
                    do_sample=True,
                    return_full_text=True
                )
            inference_time = time.time() - start
            # Memory is sampled once after generation, so this is the
            # post-generation usage (never reported below the baseline).
            peak_mem = max(initial_mem, _used_memory(storage))
            status['generation_complete'] = True

            logger.info("\nGeneration stats:")
            logger.info(f"- Time: {inference_time:.4f}s")
            logger.info(f"- Memory peak: {(peak_mem - initial_mem)/1e9:.2f} GB")
            logger.info(f"- Generated text: {outputs[0]['generated_text']}")
        except Exception as e:
            # Single log record per failure; `stage` says where it happened.
            logger.error(f"Test failed during {stage}: {e}")
            raise
        finally:
            # Cleanup and status report
            try:
                # Drop the references so the model and outputs can be freed.
                pipe = None
                outputs = None
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                status['cleanup_success'] = True
            except Exception as e:
                logger.error(f"Cleanup error: {e}")

            logger.info("\nTest Summary:")
            for key, value in status.items():
                logger.info(f"- {key}: {'✓' if value else '✗'}")

            final_mem = _used_memory(storage)
            if final_mem > initial_mem:
                logger.warning(f"Memory leak detected: {(final_mem - initial_mem)/1e6:.2f} MB")
# Run the vGPU text generation test only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    test_ai_integration_http()