FraudSimulator-AI / ensemble_predictor.py
BDR-AI's picture
Add ensemble predictor with 5-model architecture (Step 3/5)
4160c28 verified
"""
Ensemble Predictor - 5-Model Architecture with Meta Learning
Implements the Maysat method with weighted voting and stacked generalization
"""
import pickle
import json
import os
import numpy as np
from typing import Dict, List, Tuple, Any
class EnsemblePredictor:
"""
Ensemble fraud detection using 5 models + meta learner
- Random Forest (baseline)
- XGBoost (gradient boosting)
- LightGBM (fast training)
- CatBoost (categorical features)
- DistilBERT (text analysis via text_processor)
"""
def __init__(self):
self.models = {}
self.meta_learner = None
self.scaler = None
self.encoder = None
self.feature_columns = None
self.model_weights = {
'xgboost': 0.25,
'lightgbm': 0.25,
'catboost': 0.20,
'random_forest': 0.15,
'distilbert': 0.15
}
self.load_models()
def load_models(self):
"""Load all model artifacts if available"""
try:
models_path = 'models/'
# Load Random Forest (baseline)
if os.path.exists(f'{models_path}fraud_rf_model.pkl'):
with open(f'{models_path}fraud_rf_model.pkl', 'rb') as f:
self.models['random_forest'] = pickle.load(f)
print("βœ“ Random Forest loaded")
# Load XGBoost
if os.path.exists(f'{models_path}fraud_xgb_model.pkl'):
with open(f'{models_path}fraud_xgb_model.pkl', 'rb') as f:
self.models['xgboost'] = pickle.load(f)
print("βœ“ XGBoost loaded")
# Load LightGBM
if os.path.exists(f'{models_path}fraud_lgb_model.pkl'):
with open(f'{models_path}fraud_lgb_model.pkl', 'rb') as f:
self.models['lightgbm'] = pickle.load(f)
print("βœ“ LightGBM loaded")
# Load CatBoost
if os.path.exists(f'{models_path}fraud_cat_model.pkl'):
with open(f'{models_path}fraud_cat_model.pkl', 'rb') as f:
self.models['catboost'] = pickle.load(f)
print("βœ“ CatBoost loaded")
# Load preprocessing artifacts
if os.path.exists(f'{models_path}fraud_scaler.pkl'):
with open(f'{models_path}fraud_scaler.pkl', 'rb') as f:
self.scaler = pickle.load(f)
if os.path.exists(f'{models_path}fraud_encoder.pkl'):
with open(f'{models_path}fraud_encoder.pkl', 'rb') as f:
self.encoder = pickle.load(f)
if os.path.exists(f'{models_path}feature_columns.json'):
with open(f'{models_path}feature_columns.json', 'r') as f:
self.feature_columns = json.load(f)
# Load meta learner if available
if os.path.exists(f'{models_path}meta_learner.pkl'):
with open(f'{models_path}meta_learner.pkl', 'rb') as f:
self.meta_learner = pickle.load(f)
print("βœ“ Meta Learner loaded")
print(f"βœ“ Ensemble loaded: {len(self.models)} models")
except Exception as e:
print(f"Model loading error: {e}")
def predict_ensemble(self, features: np.ndarray, text_score: float = None) -> Dict[str, Any]:
"""
Predict using ensemble with weighted voting
Args:
features: Engineered features array
text_score: Optional text analysis score from DistilBERT
Returns:
Dictionary with ensemble prediction and individual model scores
"""
if len(self.models) == 0:
return {
'ensemble_score': None,
'method': 'No models loaded',
'individual_scores': {}
}
try:
# Scale features
if self.scaler is not None:
features_scaled = self.scaler.transform([features])
else:
features_scaled = np.array([features])
# Get predictions from each model
individual_scores = {}
for model_name, model in self.models.items():
try:
# Get probability of fraud (class 1)
if hasattr(model, 'predict_proba'):
prob = model.predict_proba(features_scaled)[0][1]
else:
prob = model.predict(features_scaled)[0]
individual_scores[model_name] = float(prob)
except Exception as e:
print(f"Error predicting with {model_name}: {e}")
individual_scores[model_name] = 0.0
# Add text score if available
if text_score is not None:
individual_scores['distilbert'] = text_score
# Ensemble prediction
if self.meta_learner is not None:
# Use meta learner (stacked generalization)
meta_features = np.array([[individual_scores.get(m, 0.0) for m in self.model_weights.keys()]])
ensemble_score = self.meta_learner.predict_proba(meta_features)[0][1]
method = "Meta Learner (Stacked)"
else:
# Use weighted voting
ensemble_score = 0.0
total_weight = 0.0
for model_name, weight in self.model_weights.items():
if model_name in individual_scores:
ensemble_score += individual_scores[model_name] * weight
total_weight += weight
if total_weight > 0:
ensemble_score /= total_weight
method = "Weighted Voting"
return {
'ensemble_score': float(ensemble_score),
'method': method,
'individual_scores': individual_scores,
'num_models': len(individual_scores)
}
except Exception as e:
print(f"Ensemble prediction error: {e}")
return {
'ensemble_score': None,
'method': 'Error',
'individual_scores': {},
'error': str(e)
}
def get_model_status(self) -> Dict[str, bool]:
"""Check which models are loaded"""
return {
'random_forest': 'random_forest' in self.models,
'xgboost': 'xgboost' in self.models,
'lightgbm': 'lightgbm' in self.models,
'catboost': 'catboost' in self.models,
'meta_learner': self.meta_learner is not None,
'scaler': self.scaler is not None,
'encoder': self.encoder is not None
}
def get_feature_importance(self, model_name: str = 'random_forest') -> List[Tuple[str, float]]:
"""Get feature importance from specified model"""
if model_name not in self.models:
return []
model = self.models[model_name]
if hasattr(model, 'feature_importances_'):
importances = model.feature_importances_
if self.feature_columns:
return sorted(
zip(self.feature_columns, importances),
key=lambda x: x[1],
reverse=True
)
return []
# Test the ensemble
if __name__ == "__main__":
print("="*60)
print("Ensemble Predictor - Model Status Check")
print("="*60)
ensemble = EnsemblePredictor()
status = ensemble.get_model_status()
print("\nModel Status:")
for model, loaded in status.items():
status_icon = "βœ“" if loaded else "βœ—"
print(f" {status_icon} {model}: {'Loaded' if loaded else 'Not found'}")
print("\n" + "="*60)
print(f"Ensemble ready with {len(ensemble.models)} models")
print("="*60)