import logging
import os
import time
from contextlib import contextmanager
from typing import Any, Optional

import torch
from transformers import pipeline

from virtual_vram import VirtualVRAM
from http_storage import HTTPGPUStorage
from torch_vgpu import VGPUDevice, to_vgpu


def setup_vgpu():
    """Set up and return a vGPU device."""
    try:
        # Initialize the backend first
        from torch_vgpu import init_vgpu_backend, VGPUDevice

        if not init_vgpu_backend():
            raise RuntimeError("Failed to initialize vGPU backend")

        # Create the vGPU device and return its handle for tensor placement
        vgpu = VGPUDevice()
        device = vgpu.device()
        return device
    except Exception as e:
        logging.error(f"vGPU setup failed: {e}")
        raise


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@contextmanager
def gpu_context():
    """Context manager for vGPU resources."""
    storage = None
    try:
        storage = HTTPGPUStorage()
        yield storage
    finally:
        if storage:
            storage.close()
        logger.info("vGPU resources cleaned up")


def get_model_size(model):
    """Return the model's memory footprint in bytes (parameters + buffers)."""
    param_size = sum(p.nelement() * p.element_size() for p in model.parameters())
    buffer_size = sum(b.nelement() * b.element_size() for b in model.buffers())
    return param_size + buffer_size
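
# Rough scale check (illustrative arithmetic, not a measured value): a model
# with ~20e9 parameters held in float32 needs about
#     20e9 params * 4 bytes/param ≈ 80 GB
# before buffers -- the kind of footprint the virtual VRAM backing store is
# meant to absorb.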


def prepare_prompt(instruction: str) -> str:
    """Prepare a prompt using the Llama-2 chat format.

    Note: the test below runs openai/gpt-oss-20b, whose chat template differs;
    this helper is not called there and is kept for Llama-2-style models.
    """
    # Format: <s>[INST] instruction [/INST] assistant response </s>[INST] ...
    return f"<s>[INST] {instruction} [/INST]"


def test_ai_integration_http():
    """Test the GPT OSS model on the vGPU with text generation."""
    logger.info("Starting vGPU text generation test")
    status = {
        'pipeline_loaded': False,
        'model_on_vgpu': False,
        'generation_complete': False,
        'cleanup_success': False,
    }

    with gpu_context() as storage:
        # Snapshot memory before any allocations so the leak check in the
        # finally block always has a baseline
        initial_mem = storage.get_used_memory() if hasattr(storage, 'get_used_memory') else 0
        try:
            # Initialize vRAM with monitoring
            vram = VirtualVRAM(size_gb=None, storage=storage)

            # Initialize vGPU device
            device = setup_vgpu()
            logger.info(f"vGPU initialized with device {device}")

            # Load model using pipeline
            model_id = "openai/gpt-oss-20b"
            logger.info(f"Loading {model_id}")
            try:
                # Silence transformers logging temporarily
                transformers_logger = logging.getLogger("transformers")
                original_level = transformers_logger.level
                transformers_logger.setLevel(logging.ERROR)
                try:
                    # Create the pipeline with the model mapped onto the vGPU.
                    # Pass either `device` or `device_map`, not both: recent
                    # transformers versions reject the combination. The empty
                    # prefix "" in device_map matches the model root, so every
                    # module lands on the vGPU device.
                    pipe = pipeline(
                        "text-generation",
                        model=model_id,
                        torch_dtype=torch.float32,  # full precision
                        device_map={"": device},
                        model_kwargs={"use_safetensors": True},
                        trust_remote_code=True,
                    )
                    status['pipeline_loaded'] = True
                    status['model_on_vgpu'] = True

                    # Log model details
                    logger.info(f"Pipeline created with model: {model_id}")
                    model_size = get_model_size(pipe.model)
                    logger.info(f"Model loaded: {model_size / 1e9:.2f} GB (parameters + buffers)")
                    logger.info(f"Model architecture: {pipe.model.__class__.__name__}")

                    # Verify the model's footprint on the backing store
                    current_mem = storage.get_used_memory() if hasattr(storage, 'get_used_memory') else 0
                    logger.info(f"Model memory usage: {(current_mem - initial_mem) / 1e9:.2f} GB")
                finally:
                    # Restore original logging level
                    transformers_logger.setLevel(original_level)
            except Exception as e:
                logger.error(f"Model loading or transfer to vGPU failed: {e}")
                raise

            # Run text generation
            logger.info("Running text generation...")
            start = time.time()
            peak_mem = initial_mem
            try:
                # Prepare input prompt
                prompt = "Explain how virtual GPUs work in simple terms."
                with torch.no_grad():
                    outputs = pipe(
                        prompt,
                        max_new_tokens=256,
                        temperature=0.7,
                        top_p=0.95,
                        top_k=40,
                        num_beams=1,
                        do_sample=True,
                        return_full_text=True,
                    )
                if hasattr(storage, 'get_used_memory'):
                    peak_mem = max(peak_mem, storage.get_used_memory())
                inference_time = time.time() - start
                status['generation_complete'] = True

                # Log performance metrics
                logger.info("\nGeneration stats:")
                logger.info(f"- Time: {inference_time:.4f}s")
                logger.info(f"- Memory peak: {(peak_mem - initial_mem) / 1e9:.2f} GB")
                logger.info(f"- Generated text: {outputs[0]['generated_text']}")
            except Exception as e:
                logger.error(f"Text generation failed: {e}")
                raise
        except Exception as e:
            logger.error(f"Test failed: {e}")
            raise
        finally:
            # Cleanup and status report
            try:
                if 'pipe' in locals():
                    del pipe
                if 'outputs' in locals():
                    del outputs
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                status['cleanup_success'] = True
            except Exception as e:
                logger.error(f"Cleanup error: {e}")

            logger.info("\nTest Summary:")
            for key, value in status.items():
                logger.info(f"- {key}: {'✓' if value else '✗'}")

            final_mem = storage.get_used_memory() if hasattr(storage, 'get_used_memory') else 0
            if final_mem > initial_mem:
                logger.warning(f"Memory leak detected: {(final_mem - initial_mem) / 1e6:.2f} MB")
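

# The imports at the top (virtual_vram, http_storage, torch_vgpu) are
# project-local modules whose sources are not shown here. Based solely on the
# calls this test makes -- HTTPGPUStorage(), .get_used_memory(), .close() -- a
# minimal stand-in would look like the sketch below. The method bodies are
# assumptions for illustration, not the real implementation.
class _StubHTTPGPUStorage:
    """Illustrative stub matching the interface this test relies on."""

    def __init__(self):
        self._used = 0  # bytes attributed to this store (assumed bookkeeping)

    def get_used_memory(self) -> int:
        # The real backend presumably queries the remote HTTP store; the stub
        # just reports its local counter.
        return self._used

    def close(self) -> None:
        # The real backend presumably releases HTTP sessions / remote buffers.
        self._used = 0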


if __name__ == "__main__":
    test_ai_integration_http()