# Gpu_1/gpu_predictor.py
import joblib
import pandas as pd


class GPUPredictor:
"""Scalable GPU usage prediction system with caching and batch processing"""
    def __init__(self, model_path='models/gradient_boosting_model.joblib', cache_size=100):
        self.model = joblib.load(model_path)
        self.prediction_cache = {}  # Cache for fast repeated predictions
        self.cache_size = cache_size
        self.cache_hits = 0
        self.cache_misses = 0
        # Feature order and fallback values used during training; predict() and
        # _get_cache_key() share these defaults so equivalent inputs hit the cache
        self.feature_cols = ['total_parameters', 'trainable_parameters', 'model_size_mb', 'batch_size']
        self.feature_defaults = {'total_parameters': 0, 'trainable_parameters': 0, 'model_size_mb': 0, 'batch_size': 1}

    def predict(self, features_batch):
"""Make predictions for a batch of models efficiently"""
if not isinstance(features_batch, list):
features_batch = [features_batch]
results = []
features_to_predict = []
cache_indices = []
# Check cache first
for i, features in enumerate(features_batch):
cache_key = self._get_cache_key(features)
if cache_key in self.prediction_cache:
results.append(self.prediction_cache[cache_key])
self.cache_hits += 1
else:
results.append(None)
features_to_predict.append(features)
cache_indices.append(i)
self.cache_misses += 1
# Make predictions for cache misses
if features_to_predict:
# Extract features in the exact order used during training
numeric_features = []
for features in features_to_predict:
                # Apply the shared training-time defaults for any missing keys
                feature_dict = {
                    col: features.get(col, self.feature_defaults[col])
                    for col in self.feature_cols
                }
numeric_features.append(feature_dict)
# Convert to DataFrame with specific column order
features_df = pd.DataFrame(numeric_features)[self.feature_cols]
# Make batch prediction
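            # One vectorized predict() call over all cache misses is much cheaper than per-sample calls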
predictions = self.model.predict(features_df)
# Update results and cache
for i, pred_idx in enumerate(cache_indices):
results[pred_idx] = predictions[i]
# Update cache
cache_key = self._get_cache_key(features_batch[pred_idx])
self.prediction_cache[cache_key] = predictions[i]
# Limit cache size
if len(self.prediction_cache) > self.cache_size:
# Remove oldest entries (simple approach)
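                    # dicts preserve insertion order (Python 3.7+), so this keeps only the newest entries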
keys_to_remove = list(self.prediction_cache.keys())[:-self.cache_size]
for key in keys_to_remove:
del self.prediction_cache[key]
        return results[0] if len(results) == 1 else results

    def _get_cache_key(self, features):
        """Generate a cache key from features, using the training-time defaults
        so inputs that predict identically also cache identically"""
        return "|".join(
            f"{k}:{features.get(k, self.feature_defaults[k])}" for k in self.feature_cols
        )

    def get_cache_stats(self):
        """Return cache performance statistics"""
total = self.cache_hits + self.cache_misses
hit_rate = self.cache_hits / total if total > 0 else 0
return {
"cache_size": len(self.prediction_cache),
"max_cache_size": self.cache_size,
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"hit_rate": hit_rate
}

    def optimize_batch_size(self, model_features, min_batch=1, max_batch=32, memory_limit_mb=8000):
        """Find the batch size that maximizes throughput within a memory budget"""
        # Base memory is the model size itself
        base_memory = model_features.get('model_size_mb', 0)
        # Heuristic scaling of per-sample activation memory with model size
        if model_features.get('total_parameters', 0) > 100_000_000:  # large models, e.g. VGG16 (~138M params)
            mem_scale_factor = 0.5
        else:  # smaller models
            mem_scale_factor = 0.3
best_throughput = 0
optimal_batch = min_batch
# Test different batch sizes
batch_results = []
for batch_size in range(min_batch, max_batch + 1):
# Estimate memory usage
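            # Heuristic linear model: memory ≈ model_size * (1 + mem_scale_factor * batch_size)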
memory_usage = base_memory + (base_memory * mem_scale_factor * batch_size)
# Skip if exceeds memory limit
if memory_usage > memory_limit_mb:
continue
# Predict execution time
features = model_features.copy()
features['batch_size'] = batch_size
            exec_time = self.predict(features)
            if exec_time <= 0:  # guard against degenerate predictions before dividing
                continue
# Calculate throughput (samples/second)
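            # exec_time is in milliseconds (stored as exec_time_ms below), so the 1000 factor converts to samples/sec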
throughput = (batch_size * 1000) / exec_time
batch_results.append({
'batch_size': batch_size,
'exec_time_ms': exec_time,
'throughput': throughput,
'memory_usage_mb': memory_usage
})
# Update optimal if better
if throughput > best_throughput:
best_throughput = throughput
optimal_batch = batch_size
return {
'optimal_batch_size': optimal_batch,
'predicted_execution_time': next((r['exec_time_ms'] for r in batch_results if r['batch_size'] == optimal_batch), None),
'estimated_memory_usage': next((r['memory_usage_mb'] for r in batch_results if r['batch_size'] == optimal_batch), None),
'batch_results': batch_results
}
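

# --- Usage sketch ---
# A minimal, illustrative example of driving the predictor. It assumes the
# trained model exists at the default path; the feature values below are
# hypothetical placeholders rather than measurements from a real model.
if __name__ == "__main__":
    predictor = GPUPredictor()

    # Hypothetical features for a mid-sized model (illustrative numbers only)
    sample_model = {
        'total_parameters': 25_000_000,
        'trainable_parameters': 25_000_000,
        'model_size_mb': 98,
        'batch_size': 8,
    }

    print("Predicted execution time (ms):", predictor.predict(sample_model))
    print("Cache stats:", predictor.get_cache_stats())

    # Search batch sizes 1..32 under the default 8000 MB memory budget
    plan = predictor.optimize_batch_size(sample_model)
    print("Optimal batch size:", plan['optimal_batch_size'])
    print("Estimated memory (MB):", plan['estimated_memory_usage'])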