import logging
import time
from contextlib import contextmanager

import torch
from transformers import pipeline

from virtual_vram import VirtualVRAM
from http_storage import HTTPGPUStorage


def setup_vgpu():
    """Set up and return the vGPU device."""
    try:
        # Initialize the backend first
        from torch_vgpu import init_vgpu_backend, VGPUDevice
        if not init_vgpu_backend():
            raise RuntimeError("Failed to initialize vGPU backend")

        # Create and register the vGPU device
        vgpu = VGPUDevice()
        device = vgpu.device()

        # Return the device so callers can place tensors on it
        return device
    except Exception as e:
        logging.error(f"vGPU setup failed: {str(e)}")
        raise


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@contextmanager
def gpu_context():
    """Context manager for vGPU resources."""
    storage = None
    try:
        storage = HTTPGPUStorage()
        yield storage
    finally:
        if storage:
            storage.close()
            logger.info("vGPU resources cleaned up")


def get_model_size(model):
    """Calculate the model's memory footprint in bytes (parameters plus buffers)."""
    param_size = 0
    for param in model.parameters():
        param_size += param.nelement() * param.element_size()
    buffer_size = 0
    for buffer in model.buffers():
        buffer_size += buffer.nelement() * buffer.element_size()
    return param_size + buffer_size


def prepare_prompt(instruction: str) -> str:
    """Prepare a prompt for Llama-2 using its chat format (currently unused)."""
    # Format: [INST] instruction [/INST] assistant response [INST] ...
    return f"[INST] {instruction} [/INST]"


def test_ai_integration_http():
    """Test the GPT-OSS model on a vGPU with text generation."""
    logger.info("Starting vGPU text generation test")
    status = {
        'pipeline_loaded': False,
        'model_on_vgpu': False,
        'generation_complete': False,
        'cleanup_success': False
    }

    with gpu_context() as storage:
        # Take the memory baseline before the try block so the cleanup
        # code below can always reference it
        initial_mem = storage.get_used_memory() if hasattr(storage, 'get_used_memory') else 0
        try:
            # Initialize vRAM with monitoring
            vram = VirtualVRAM(size_gb=None, storage=storage)
            try:
                # Initialize vGPU device
                device = setup_vgpu()
                logger.info(f"vGPU initialized with device {device}")

                # Load model using pipeline
                model_id = "openai/gpt-oss-20b"
                logger.info(f"Loading {model_id}")
                try:
                    # Disable transformers logging temporarily
                    transformers_logger = logging.getLogger("transformers")
                    original_level = transformers_logger.level
                    transformers_logger.setLevel(logging.ERROR)
                    try:
                        # Create pipeline with the model directly on the vGPU.
                        # Note: passing both `device=` and a `device_map` to
                        # pipeline() can conflict in transformers, so only the
                        # device_map is used here.
                        pipe = pipeline(
                            "text-generation",
                            model=model_id,
                            model_kwargs={
                                "torch_dtype": torch.float32,  # Use full precision
                                "device_map": {"": device},    # Map all modules to our vGPU device
                                "use_safetensors": True,
                            },
                            trust_remote_code=True
                        )
                        status['pipeline_loaded'] = True
                        status['model_on_vgpu'] = True

                        # Log model details
                        logger.info(f"Pipeline created with model: {model_id}")

                        # Log model size
                        model_size = get_model_size(pipe.model)
                        logger.info(f"Model loaded: {model_size / 1e9:.2f} GB in parameters")
                        logger.info(f"Model architecture: {pipe.model.__class__.__name__}")

                        # Verify model location
                        with torch.device(device):
                            current_mem = storage.get_used_memory() if hasattr(storage, 'get_used_memory') else 0
                            logger.info(f"Model memory usage: {(current_mem - initial_mem) / 1e9:.2f} GB")
                    finally:
                        # Restore original logging level
                        transformers_logger.setLevel(original_level)
                except Exception as e:
                    logger.error(f"Model loading failed: {str(e)}")
                    raise
            except Exception as e:
                logger.error(f"Model transfer to vGPU failed: {str(e)}")
                raise
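
            # Optional sanity probe (illustrative sketch, not part of the
            # original test): before committing to full generation, confirm
            # that a tiny matmul works on the vGPU device. This assumes the
            # device supports standard torch ops such as randn and @.
            probe = torch.randn(8, 8, device=device) @ torch.randn(8, 8, device=device)
            logger.info(f"vGPU matmul probe ok (norm={probe.norm().item():.3f})")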

            # Run text generation
            logger.info("Running text generation...")
            start = time.time()
            peak_mem = initial_mem
            try:
                # Prepare input prompt
                prompt = "Explain how virtual GPUs work in simple terms."

                with torch.no_grad():
                    outputs = pipe(
                        prompt,
                        max_new_tokens=256,
                        temperature=0.7,
                        top_p=0.95,
                        top_k=40,
                        num_beams=1,
                        do_sample=True,
                        return_full_text=True
                    )

                if hasattr(storage, 'get_used_memory'):
                    peak_mem = max(peak_mem, storage.get_used_memory())

                inference_time = time.time() - start
                status['generation_complete'] = True

                # Log performance metrics
                logger.info("\nGeneration stats:")
                logger.info(f"- Time: {inference_time:.4f}s")
                logger.info(f"- Memory peak: {(peak_mem - initial_mem) / 1e9:.2f} GB")
                logger.info(f"- Generated text: {outputs[0]['generated_text']}")
            except Exception as e:
                logger.error(f"Text generation failed: {str(e)}")
                raise
        except Exception as e:
            logger.error(f"Test failed: {str(e)}")
            raise
        finally:
            # Cleanup and status report
            try:
                if 'pipe' in locals():
                    del pipe
                if 'outputs' in locals():
                    del outputs
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                status['cleanup_success'] = True
            except Exception as e:
                logger.error(f"Cleanup error: {str(e)}")

            logger.info("\nTest Summary:")
            for key, value in status.items():
                logger.info(f"- {key}: {'✓' if value else '✗'}")

            final_mem = storage.get_used_memory() if hasattr(storage, 'get_used_memory') else 0
            if final_mem > initial_mem:
                logger.warning(f"Memory leak detected: {(final_mem - initial_mem) / 1e6:.2f} MB")


if __name__ == "__main__":
    test_ai_integration_http()