Spaces:
Build error
Build error
| """ | |
| Enhanced Model Evaluation Script | |
| Includes comprehensive metrics, drift detection, and performance monitoring | |
| """ | |
| import os | |
| import sys | |
| import pandas as pd | |
| import numpy as np | |
| import pickle | |
| import json | |
| from datetime import datetime | |
| from pathlib import Path | |
| from sklearn.metrics import ( | |
| accuracy_score, precision_score, recall_score, f1_score, | |
| confusion_matrix, roc_curve, auc, classification_report | |
| ) | |
| import matplotlib.pyplot as plt | |
| import yaml | |
| # Add parent directory to path | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) | |
| from src.monitoring.drift_detector import DriftDetector | |
| from src.monitoring.metrics_collector import MetricsCollector | |
| from src.monitoring.monitoring_service import MonitoringService | |
def load_params():
    """Return the pipeline configuration parsed from params.yaml."""
    config_path = Path("params.yaml")
    with config_path.open("r") as handle:
        return yaml.safe_load(handle)
def evaluate_model_comprehensive(model_path: str, strategy_type: str,
                                 test_data: pd.DataFrame,
                                 monitoring_service: MonitoringService):
    """
    Comprehensive model evaluation: metrics, drift detection, and plots.

    Loads a pickled classifier, rebuilds the binary labels from the indicator
    columns, prints accuracy/precision/recall/F1, a classification report and
    confusion matrix, computes ROC AUC when probabilities are available, runs
    drift detection via the monitoring service, and saves plots under plots/.

    Args:
        model_path: Path to the pickled model file
        strategy_type: "TOP" or "BOTTOM" (selects the labeling rule)
        test_data: Test dataset containing the indicator feature columns
        monitoring_service: Monitoring service instance (drift detector)

    Returns:
        dict with metrics, confusion matrix, sample sizes and drift results,
        or None if the model file does not exist.
    """
    print(f"\n{'='*60}")
    print(f"Evaluating {strategy_type} Strategy Model")
    print(f"{'='*60}\n")

    # Load model
    if not os.path.exists(model_path):
        print(f"Error: Model not found at {model_path}")
        return None
    # NOTE(review): pickle.load can execute arbitrary code -- only load model
    # files produced by this pipeline, never untrusted input.
    with open(model_path, "rb") as f:
        model = pickle.load(f)

    # Prepare features
    features = ["sma_10", "sma_20", "rsi", "volatility", "price_position"]
    X_test = test_data[features].fillna(0)

    # Create labels -- these rules mirror the signal definitions; presumably
    # they match what training used (verify against the training script).
    if strategy_type == "TOP":
        y_test = ((test_data["price_position"] > 70) &
                  (test_data["rsi"] > 50) & (test_data["rsi"] < 70)).astype(int)
    else:  # BOTTOM
        y_test = ((test_data["price_position"] < 30) &
                  (test_data["rsi"] < 30)).astype(int)

    # Predictions
    y_pred = model.predict(X_test)
    try:
        y_proba = model.predict_proba(X_test)[:, 1]
    except (AttributeError, IndexError):
        # Model exposes no probability estimates (or only one class column);
        # ROC analysis is skipped below. (Was a bare except: -- narrowed.)
        y_proba = None

    # Basic metrics (zero_division=0 avoids warnings when a class is absent)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, zero_division=0)
    recall = recall_score(y_test, y_pred, zero_division=0)
    f1 = f1_score(y_test, y_pred, zero_division=0)

    print(f"π Basic Metrics:")
    print(f" Accuracy: {accuracy:.4f}")
    print(f" Precision: {precision:.4f}")
    print(f" Recall: {recall:.4f}")
    print(f" F1 Score: {f1:.4f}")

    # Classification report
    print(f"\nπ Classification Report:")
    print(classification_report(y_test, y_pred,
                                target_names=['HOLD', 'BUY'],
                                zero_division=0))

    # Confusion Matrix
    cm = confusion_matrix(y_test, y_pred)
    print(f"\nπ’ Confusion Matrix:")
    print(f" {'':>10} Predicted HOLD Predicted BUY")
    print(f" Actual HOLD {cm[0,0]:>6} {cm[0,1]:>6}")
    print(f" Actual BUY {cm[1,0]:>6} {cm[1,1]:>6}")

    # ROC Curve (needs probabilities and both classes present in y_test)
    roc_auc = None
    if y_proba is not None and len(np.unique(y_test)) > 1:
        try:
            fpr, tpr, _ = roc_curve(y_test, y_proba)
            roc_auc = auc(fpr, tpr)
            print(f"\nπ ROC AUC Score: {roc_auc:.4f}")
        except ValueError:
            # roc_curve raises ValueError on degenerate input; skip ROC then.
            # (Was a bare except: -- narrowed so real bugs surface.)
            pass

    # Drift Detection
    print(f"\nπ Drift Detection:")
    drift_result = monitoring_service.drift_detector.detect_drift(
        test_data[features]
    )
    if drift_result.get("drift_detected"):
        print(f" β οΈ DRIFT DETECTED!")
        for feature, drift_info in drift_result.get("feature_drifts", {}).items():
            if drift_info.get("drift_detected"):
                print(f" - {feature}: p-value = {drift_info['p_value']:.4f}")
    else:
        print(f" β No significant drift detected")

    # Create plots
    os.makedirs("plots", exist_ok=True)

    # Confusion Matrix Plot
    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(f'Confusion Matrix - {strategy_type} Strategy')
    plt.colorbar()
    tick_marks = np.arange(2)
    plt.xticks(tick_marks, ['HOLD', 'BUY'])
    plt.yticks(tick_marks, ['HOLD', 'BUY'])
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Switch text color at half the max count so labels stay readable
    thresh = cm.max() / 2.
    for i, j in np.ndindex(cm.shape):
        plt.text(j, i, format(cm[i, j], 'd'),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.savefig(f"plots/confusion_matrix_{strategy_type.lower()}.png")
    plt.close()

    # ROC Curve Plot (fpr/tpr are defined whenever roc_auc is not None)
    if roc_auc is not None:
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, color='darkorange', lw=2,
                 label=f'ROC curve (AUC = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title(f'ROC Curve - {strategy_type} Strategy')
        plt.legend(loc="lower right")
        plt.tight_layout()
        plt.savefig(f"plots/roc_curve_{strategy_type.lower()}.png")
        plt.close()

    # Compile results
    results = {
        "model_path": model_path,
        "strategy_type": strategy_type,
        "evaluation_date": datetime.now().isoformat(),
        "metrics": {
            "accuracy": float(accuracy),
            "precision": float(precision),
            "recall": float(recall),
            "f1_score": float(f1),
            # BUGFIX: was `if roc_auc` -- a legitimate AUC of 0.0 is falsy
            # and would have been reported as None.
            "roc_auc": float(roc_auc) if roc_auc is not None else None
        },
        "confusion_matrix": cm.tolist(),
        "sample_size": {
            "total": int(len(y_test)),
            "positive": int(y_test.sum()),
            "negative": int(len(y_test) - y_test.sum())
        },
        "drift_detection": drift_result
    }
    return results
def main():
    """Entry point: evaluate both strategy models and write reports."""
    params = load_params()

    # Load test data
    dataset_path = "data/processed/indicators.parquet"
    if not os.path.exists(dataset_path):
        print(f"Error: Test data not found at {dataset_path}")
        print("Please run prepare_data.py first")
        return

    frame = pd.read_parquet(dataset_path)
    frame = frame.dropna(subset=["rsi", "sma_10", "sma_20"])
    print(f"π Loaded {len(frame)} test samples")

    # Initialize monitoring service
    monitor = MonitoringService(
        drift_threshold=params["mlops"]["monitoring"]["drift_threshold"]
    )

    # First half of the data serves as the drift-detection reference baseline
    feature_cols = ["sma_10", "sma_20", "rsi", "volatility", "price_position"]
    half = len(frame) // 2
    monitor.initialize_reference_baseline(frame.iloc[:half][feature_cols])

    # Second half is the actual evaluation set
    frame = frame.iloc[half:]
    print(f"π Using {len(frame)} samples for testing")

    os.makedirs("metrics", exist_ok=True)
    all_results = {}

    # Evaluate both strategies
    for strategy in ["TOP", "BOTTOM"]:
        model_file = f"models/{strategy.lower()}_strategy_model.pkl"
        if not os.path.exists(model_file):
            print(f"\nβ οΈ Model not found: {model_file}")
            print(f" Skipping {strategy} strategy evaluation")
            continue
        outcome = evaluate_model_comprehensive(
            model_file, strategy, frame, monitor
        )
        if outcome:
            all_results[strategy] = outcome
            # Record metrics
            monitor.metrics_collector.record_model_metrics(
                f"{strategy.lower()}_strategy_model",
                outcome["metrics"]
            )

    # Save comprehensive results
    with open("metrics/comprehensive_evaluation.json", "w") as handle:
        json.dump(all_results, handle, indent=2)

    # Health report
    print(f"\n{'='*60}")
    print("System Health Report")
    print(f"{'='*60}\n")
    report = monitor.get_health_report()
    print(f"Status: {report['status']}")
    print("Metrics:")
    for key, value in report["metrics"].items():
        print(f" {key}: {value}")

    # Save health report
    with open("metrics/health_report.json", "w") as handle:
        json.dump(report, handle, indent=2)

    print("\nβ Evaluation complete!")
    print("π Results saved to:")
    print(" - metrics/comprehensive_evaluation.json")
    print(" - metrics/health_report.json")
    print(" - plots/ (confusion matrices and ROC curves)")


if __name__ == "__main__":
    main()