|
|
import logging
|
|
|
import os
|
|
|
import time
|
|
|
from contextlib import contextmanager
|
|
|
from typing import Any, Optional
|
|
|
|
|
|
import torch
|
|
|
from transformers import pipeline
|
|
|
from virtual_vram import VirtualVRAM
|
|
|
from http_storage import HTTPGPUStorage
|
|
|
from torch_vgpu import VGPUDevice, to_vgpu
|
|
|
|
|
|
|
|
|
# Value of the HF_TOKEN environment variable, or None when it is unset.
# Not referenced anywhere else in the visible part of this file.
HF_TOKEN = os.environ.get("HF_TOKEN")
|
|
|
|
|
|
|
|
|
def setup_vgpu():
    """Initialize the vGPU backend and return a device handle.

    Returns:
        The value produced by ``VGPUDevice().device()``.

    Raises:
        RuntimeError: If ``init_vgpu_backend()`` returns a falsy value.
        Exception: Any other failure (including a failed ``torch_vgpu``
            import) is logged and re-raised unchanged.
    """
    try:
        # Imported inside the try so that an import failure is reported by
        # the handler below instead of escaping unlogged.
        from torch_vgpu import init_vgpu_backend, VGPUDevice

        if not init_vgpu_backend():
            raise RuntimeError("Failed to initialize vGPU backend")

        return VGPUDevice().device()
    except Exception as e:
        # Use the module-level logger, like the rest of this file, rather
        # than the root logger.
        logger.error("vGPU setup failed: %s", e)
        raise
|
|
|
|
|
|
|
|
|
# Configure root logging once at import time (INFO and above, timestamped),
# then create the logger used by the functions in this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@contextmanager
def gpu_context():
    """Yield an ``HTTPGPUStorage`` and close it when the ``with`` block exits.

    Yields:
        A freshly constructed ``HTTPGPUStorage`` instance.

    Raises:
        Whatever ``HTTPGPUStorage()`` raises; in that case nothing is yielded
        and there is nothing to close.
    """
    storage = None
    try:
        storage = HTTPGPUStorage()
        yield storage
    finally:
        # Compare against None explicitly: a storage object that happens to
        # evaluate as falsy (e.g. via __len__ or __bool__) must still be closed.
        if storage is not None:
            storage.close()
            logger.info("vGPU resources cleaned up")
|
|
|
|
|
|
def get_model_size(model):
    """Return the combined size, in bytes, of a model's parameters and buffers.

    Each tensor contributes ``nelement() * element_size()``. ``model`` only
    needs to provide ``parameters()`` and ``buffers()`` iterables of such
    tensors.
    """
    def _total_bytes(tensors):
        # Element count times bytes per element, summed over all tensors.
        return sum(t.nelement() * t.element_size() for t in tensors)

    return _total_bytes(model.parameters()) + _total_bytes(model.buffers())
|
|
|
|
|
|
def prepare_prompt(instruction: str) -> str:
    """Wrap ``instruction`` in the Llama-2 chat markers.

    The result has the form ``<s>[INST] {instruction} [/INST]``.
    """
    template = "<s>[INST] {} [/INST]"
    return template.format(instruction)
|
|
|
|
|
|
def _used_memory(storage):
    """Return ``storage.get_used_memory()``, or 0 if the storage lacks that method."""
    if hasattr(storage, 'get_used_memory'):
        return storage.get_used_memory()
    return 0


@contextmanager
def _logger_level(name, level):
    """Temporarily set the named logger to ``level``; restore the old level on exit."""
    target = logging.getLogger(name)
    previous = target.level
    target.setLevel(level)
    try:
        yield
    finally:
        target.setLevel(previous)


def test_ai_integration_http():
    """Test GPT OSS model on vGPU with text generation.

    Steps, each recorded in a ``status`` dict that is logged as a summary:

    1. Open the HTTP storage via ``gpu_context()`` and set up the vGPU device.
    2. Build a ``text-generation`` pipeline for ``openai/gpt-oss-20b``.
    3. Generate up to 256 new tokens for a fixed prompt and log timing and
       storage-memory deltas.
    4. Drop the pipeline/output references, log the summary and warn if the
       storage reports more used memory than at the start.

    Raises:
        Exception: Any failure during setup, loading or generation is logged
            and re-raised; the cleanup and summary still run first.
    """
    logger.info("Starting vGPU text generation test")

    status = {
        'pipeline_loaded': False,
        'model_on_vgpu': False,
        'generation_complete': False,
        'cleanup_success': False,
    }

    with gpu_context() as storage:
        # Bound before the try so the finally block can always reference
        # them, even when the very first statement inside the try raises.
        initial_mem = 0
        pipe = None
        outputs = None

        try:
            initial_mem = _used_memory(storage)

            # Not read again below; the instance is kept bound for the
            # duration of the test.
            # NOTE(review): presumably constructed for its side effects - confirm.
            vram = VirtualVRAM(size_gb=None, storage=storage)

            device = setup_vgpu()
            logger.info("vGPU initialized with device %s", device)

            model_id = "openai/gpt-oss-20b"
            logger.info("Loading %s", model_id)

            try:
                # Silence transformers below ERROR while the model loads.
                with _logger_level("transformers", logging.ERROR):
                    # NOTE(review): both model_kwargs["device_map"] and
                    # device= are passed; some transformers versions may warn
                    # about or reject that combination - confirm.
                    pipe = pipeline(
                        "text-generation",
                        model=model_id,
                        model_kwargs={
                            "torch_dtype": torch.float32,
                            "device_map": {"": device},
                        },
                        use_safetensors=True,
                        trust_remote_code=True,
                        device=device,
                    )
                    status["pipeline_loaded"] = True
                    # Set as soon as the pipeline is built; the placement of
                    # the weights is not checked separately.
                    status['model_on_vgpu'] = True

                    logger.info("Pipeline created with model: %s", model_id)

                    model_size = get_model_size(pipe.model)
                    logger.info("Model loaded: %.2f GB in parameters", model_size / 1e9)
                    logger.info("Model architecture: %s", pipe.model.__class__.__name__)

                    with torch.device(device):
                        current_mem = _used_memory(storage)
                        logger.info(
                            "Model memory usage: %.2f GB",
                            (current_mem - initial_mem) / 1e9,
                        )
            except Exception as e:
                logger.error("Model loading failed: %s", e)
                raise

            logger.info("Running text generation...")
            start = time.time()
            peak_mem = initial_mem

            try:
                prompt = "Explain how virtual GPUs work in simple terms."

                with torch.no_grad():
                    outputs = pipe(
                        prompt,
                        max_new_tokens=256,
                        temperature=0.7,
                        top_p=0.95,
                        top_k=40,
                        num_beams=1,
                        do_sample=True,
                        return_full_text=True,
                    )

                # Sampled once, after generation has finished, so this is the
                # post-generation usage rather than a true running peak.
                peak_mem = max(peak_mem, _used_memory(storage))

                inference_time = time.time() - start
                status['generation_complete'] = True

                logger.info("\nGeneration stats:")
                logger.info("- Time: %.4fs", inference_time)
                logger.info("- Memory peak: %.2f GB", (peak_mem - initial_mem) / 1e9)
                logger.info("- Generated text: %s", outputs[0]['generated_text'])
            except Exception as e:
                logger.error("Text generation failed: %s", e)
                raise

        except Exception as e:
            logger.error("Test failed: %s", e)
            raise

        finally:
            try:
                # Drop our references so the pipeline and its outputs can be
                # reclaimed before the final memory reading.
                pipe = None
                outputs = None
                torch.cuda.empty_cache()
                status['cleanup_success'] = True
            except Exception as e:
                # Best effort: a cleanup failure is reported, not raised.
                logger.error("Cleanup error: %s", e)

            logger.info("\nTest Summary:")
            for key, value in status.items():
                logger.info("- %s: %s", key, '✓' if value else '✗')

            final_mem = _used_memory(storage)
            if final_mem > initial_mem:
                logger.warning(
                    "Memory leak detected: %.2f MB",
                    (final_mem - initial_mem) / 1e6,
                )
|
|
|
|
|
|
# Run the vGPU text-generation test when this file is executed as a script.
if __name__ == "__main__":
    test_ai_integration_http()