|
|
import joblib |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import os |
|
|
import torch |
|
|
from datetime import datetime |
|
|
|
|
|
class GPUPredictor:
    """Scalable GPU usage prediction system with caching and batch processing.

    Wraps a pre-trained regressor (deserialised with joblib, expected to
    expose a scikit-learn-style ``predict``) that maps model/workload
    features to a predicted execution time.  Repeated queries are served
    from a bounded FIFO cache.
    """

    def __init__(self, model_path='models/gradient_boosting_model.joblib', cache_size=100):
        """Load the trained model and initialise the prediction cache.

        Args:
            model_path: Path to a joblib-serialised model with a ``predict`` method.
            cache_size: Maximum number of cached predictions to retain.
        """
        self.model = joblib.load(model_path)
        self.prediction_cache = {}
        self.cache_size = cache_size
        self.cache_hits = 0
        self.cache_misses = 0

        # Column order must match the order the model was trained with.
        self.feature_cols = ['total_parameters', 'trainable_parameters', 'model_size_mb', 'batch_size']

    def predict(self, features_batch):
        """Make predictions for a batch of models efficiently.

        Args:
            features_batch: A feature dict, or a list of feature dicts, with
                (some of) the keys in ``self.feature_cols``.  Missing keys
                fall back to defaults (0, or 1 for ``batch_size``).

        Returns:
            A single prediction when one dict was supplied, otherwise a list
            of predictions in input order.
        """
        if not isinstance(features_batch, list):
            features_batch = [features_batch]

        results = [None] * len(features_batch)
        uncached = []       # feature dicts that missed the cache
        uncached_idx = []   # their positions in the output list

        # Serve what we can from the cache; collect the misses for one
        # batched model call.
        for i, features in enumerate(features_batch):
            cache_key = self._get_cache_key(features)
            if cache_key in self.prediction_cache:
                results[i] = self.prediction_cache[cache_key]
                self.cache_hits += 1
            else:
                uncached.append(features)
                uncached_idx.append(i)
                self.cache_misses += 1

        if uncached:
            features_df = self._to_frame(uncached)
            predictions = self.model.predict(features_df)

            for pred, idx in zip(predictions, uncached_idx):
                results[idx] = pred
                self._remember(self._get_cache_key(features_batch[idx]), pred)

        return results[0] if len(results) == 1 else results

    def _to_frame(self, feature_dicts):
        """Build a model-ready DataFrame; missing keys get safe defaults."""
        rows = [
            {
                'total_parameters': f.get('total_parameters', 0),
                'trainable_parameters': f.get('trainable_parameters', 0),
                'model_size_mb': f.get('model_size_mb', 0),
                'batch_size': f.get('batch_size', 1),
            }
            for f in feature_dicts
        ]
        # Re-index to guarantee the training-time column order.
        return pd.DataFrame(rows)[self.feature_cols]

    def _remember(self, cache_key, prediction):
        """Insert into the cache, evicting oldest entries (FIFO) past capacity.

        NOTE: the previous slice-based eviction (``keys[:-self.cache_size]``)
        silently evicted nothing when ``cache_size == 0`` (``[:-0]`` is an
        empty slice), letting the cache grow without bound.  This loop
        enforces the bound for every cache_size >= 0.
        """
        self.prediction_cache[cache_key] = prediction
        while len(self.prediction_cache) > self.cache_size:
            # Dicts preserve insertion order (Python 3.7+), so the first
            # key is the oldest entry.
            oldest = next(iter(self.prediction_cache))
            del self.prediction_cache[oldest]

    def _get_cache_key(self, features):
        """Generate a stable cache key from the recognised feature values."""
        key_parts = [f"{k}:{features[k]}" for k in self.feature_cols if k in features]
        return "|".join(key_parts)

    def get_cache_stats(self):
        """Return cache performance statistics.

        Returns:
            Dict with current/maximum cache size, hit and miss counters,
            and the hit rate (0 when no lookups have happened yet).
        """
        total = self.cache_hits + self.cache_misses
        hit_rate = self.cache_hits / total if total > 0 else 0
        return {
            "cache_size": len(self.prediction_cache),
            "max_cache_size": self.cache_size,
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "hit_rate": hit_rate
        }

    def optimize_batch_size(self, model_features, min_batch=1, max_batch=32, memory_limit_mb=8000):
        """Find optimal batch size for throughput within memory constraints.

        Args:
            model_features: Feature dict for the model (must include
                ``model_size_mb`` and ``total_parameters``).
            min_batch: Smallest batch size to evaluate (inclusive).
            max_batch: Largest batch size to evaluate (inclusive).
            memory_limit_mb: Batch sizes whose estimated memory exceeds
                this are skipped.

        Returns:
            Dict with the optimal batch size, its predicted execution time
            and estimated memory usage (None when no batch size fit the
            memory budget), and the per-batch evaluation results.
        """
        base_memory = model_features['model_size_mb']

        # Heuristic: very large models (>100M params) scale memory faster
        # per additional sample in the batch.
        if model_features['total_parameters'] > 100000000:
            mem_scale_factor = 0.5
        else:
            mem_scale_factor = 0.3

        best_throughput = 0
        optimal_batch = min_batch

        batch_results = []
        for batch_size in range(min_batch, max_batch + 1):
            # Rough linear memory model: base weights + per-sample activations.
            memory_usage = base_memory + (base_memory * mem_scale_factor * batch_size)
            if memory_usage > memory_limit_mb:
                continue

            features = model_features.copy()
            features['batch_size'] = batch_size
            exec_time = self.predict(features)

            # Guard: a non-positive predicted time would divide by zero (or
            # produce a meaningless negative throughput) -- skip it.
            if exec_time <= 0:
                continue

            # Samples per second, assuming exec_time is in milliseconds.
            throughput = (batch_size * 1000) / exec_time

            batch_results.append({
                'batch_size': batch_size,
                'exec_time_ms': exec_time,
                'throughput': throughput,
                'memory_usage_mb': memory_usage
            })

            if throughput > best_throughput:
                best_throughput = throughput
                optimal_batch = batch_size

        return {
            'optimal_batch_size': optimal_batch,
            'predicted_execution_time': next((r['exec_time_ms'] for r in batch_results if r['batch_size'] == optimal_batch), None),
            'estimated_memory_usage': next((r['memory_usage_mb'] for r in batch_results if r['batch_size'] == optimal_batch), None),
            'batch_results': batch_results
        }
|
|
|
|