# ml_module/tools/evaluation_tools.py
"""Agent tools that evaluate a trained classifier and persist the resulting artifacts.

The toolkit produces confusion-matrix data, feature importances, a markdown
report and a standalone reproduction script, and (when a project service is
supplied) registers the versioned artifacts with it.
"""
import json
from datetime import datetime
from typing import Optional

import pandas as pd
from agno.tools import Toolkit, tool
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import train_test_split

from ml_module.services.storage_service import MLStorageService
from ml_module.services.project_service import ProjectService
from ml_module.core.exceptions import FileOperationException
from ml_module.core.constants import ArtifactTypes
from ml_module.core.response_formatter import (
    FormattedResponse,
    Severity,
    make_text_response,
    metric_block,
    simple_table,
    simple_table_with_types,
    visualization_block,
    text_block,
)

# Hold-out split parameters. Every evaluation path (and the generated script)
# must use the same values so they all score the same test rows.
_TEST_SIZE = 0.2
_RANDOM_STATE = 42

# Column width used when rendering the confusion matrix as fixed-width text.
_MATRIX_COLUMN_WIDTH = 8


class ModelEvaluationToolkit(Toolkit):
    """A toolkit for generating deep evaluation artifacts for trained models with versioning and code generation."""

    def __init__(
        self,
        storage_service: MLStorageService,
        user_id: str,
        project_id: str,
        project_service: Optional[ProjectService] = None,
    ):
        """Bind the toolkit to one user's project.

        Args:
            storage_service: Backend used to load models/datasets and save artifacts.
            user_id: Owner of the project; first component of every storage path.
            project_id: Project identifier; second component of every storage path.
            project_service: Optional registry used for version lookup and
                artifact registration. When omitted, evaluations are always
                version 1 and nothing is registered.
        """
        super().__init__(name="model_evaluation_tools")
        self.storage = storage_service
        self.user_id = user_id
        self.project_id = project_id
        self.project_service = project_service

    # ------------------------------------------------------------------
    # Path and data helpers
    # ------------------------------------------------------------------

    def _get_base_path(self, subfolder: str = "") -> str:
        """Return ``<user_id>/<project_id>/<subfolder>`` (trailing slash when subfolder is empty)."""
        return f"{self.user_id}/{self.project_id}/{subfolder}"

    def _resolve_path(self, relative_path: str) -> str:
        """Return the storage path for a project-relative model or dataset path."""
        # NOTE(review): with an empty subfolder the base path already ends in "/",
        # so this yields "<user>/<project>//<relative_path>". Kept byte-identical
        # to the previous behaviour because the storage key layout is not visible
        # from this file -- confirm against MLStorageService before normalising.
        return self._get_base_path() + "/" + relative_path

    def _load_model_and_test_split(self, model_path: str, dataset_path: str, target_column: str):
        """Load the model and dataset and rebuild the hold-out split.

        Returns:
            tuple: ``(model, y, X_test, y_test)`` where ``y`` is the full target
            column (needed to enumerate every class label).
        """
        model = self.storage.load_model(self._resolve_path(model_path))
        df = self.storage.load_dataframe(self._resolve_path(dataset_path))

        X = df.drop(columns=[target_column])
        y = df[target_column]
        _, X_test, _, y_test = train_test_split(
            X, y, test_size=_TEST_SIZE, random_state=_RANDOM_STATE
        )
        return model, y, X_test, y_test

    @staticmethod
    def _build_confusion_matrix_data(y, y_test, y_pred) -> dict:
        """Return ``{"labels": [...], "matrix": [[...], ...]}`` built from plain Python values."""
        # Labels come from the full target column so classes that happen to be
        # absent from the test split still get a row/column.
        labels = sorted(y.unique().tolist())
        matrix = confusion_matrix(y_test, y_pred, labels=labels)
        return {"labels": labels, "matrix": matrix.tolist()}

    def _save_confusion_matrix(self, confusion_matrix_data: dict):
        """Save the unversioned confusion matrix JSON; return ``(output_path, save_info)``."""
        output_path = f"{self._get_base_path('evaluation')}/confusion_matrix.json"
        info = self.storage.save_json(confusion_matrix_data, output_path)
        return output_path, info

    @staticmethod
    def _extract_feature_importance(model, fallback_names=None) -> Optional[list]:
        """Return features ranked by importance, or ``None`` when the model has none.

        Args:
            model: A fitted estimator.
            fallback_names: Feature names to use when the model does not expose
                ``feature_names_in_``. Generic ``feature_<i>`` names are used
                when this is also missing.

        Returns:
            Optional[list]: ``[{"feature": str, "importance": float}, ...]``
            sorted by descending importance.
        """
        importances = getattr(model, "feature_importances_", None)
        if importances is None:
            return None

        names = getattr(model, "feature_names_in_", None)
        if names is None:
            names = fallback_names
        if names is None:
            names = [f"feature_{index}" for index in range(len(importances))]

        # Cast to builtin types: some estimators report float32/int importances,
        # which the json module cannot serialise.
        ranked = [
            {"feature": str(name), "importance": float(score)}
            for name, score in zip(names, importances)
        ]
        return sorted(ranked, key=lambda item: item["importance"], reverse=True)

    def _save_feature_importance(self, importance_data: list):
        """Save the unversioned feature importance JSON; return ``(output_path, save_info)``."""
        output_path = f"{self._get_base_path('evaluation')}/feature_importance.json"
        info = self.storage.save_json(importance_data, output_path)
        return output_path, info

    def _next_evaluation_version(self) -> int:
        """Return the next evaluation version, or 1 when it cannot be determined."""
        if not self.project_service:
            return 1
        try:
            latest = self.project_service.get_latest_version(
                self.user_id, self.project_id, "evaluation"
            )
            return latest + 1
        except Exception:
            # Best effort: a project with no previous evaluations starts at version 1.
            return 1

    # ------------------------------------------------------------------
    # Tools
    # ------------------------------------------------------------------

    @tool
    def generate_confusion_matrix_data(self, model_path: str, dataset_path: str, target_column: str) -> FormattedResponse:
        """
        Generates and saves the data for a confusion matrix visualization.

        Args:
            model_path (str): The path to the trained .joblib model file (e.g., 'models/RandomForest_model.joblib').
            dataset_path (str): The path to the processed dataset used for training (e.g., 'processed/cleaned_data.csv').
            target_column (str): The name of the column the model predicts.

        Returns:
            FormattedResponse: Structured confusion matrix summary with artifact reference.
        """
        try:
            model, y, X_test, y_test = self._load_model_and_test_split(
                model_path, dataset_path, target_column
            )
            y_pred = model.predict(X_test)

            output_data = self._build_confusion_matrix_data(y, y_test, y_pred)
            output_path, info = self._save_confusion_matrix(output_data)

            # One table row per actual class, one column per predicted class.
            labels = output_data["labels"]
            rows = [
                {"actual": actual, **{str(predicted): count for predicted, count in zip(labels, counts)}}
                for actual, counts in zip(labels, output_data["matrix"])
            ]

            blocks = [
                text_block("Calculated confusion matrix on held-out test set", severity=Severity.SUCCESS),
                simple_table(rows, caption="Confusion matrix", block_id="confusion_matrix"),
                text_block(f"Matrix JSON saved to `{output_path}`"),
            ]
            return FormattedResponse(
                blocks=blocks,
                summary="Generated confusion matrix",
                correlation_id=info.path,
                done=True,
            )
        except Exception as e:
            raise FileOperationException("generate confusion matrix", model_path, e) from e

    @tool
    def get_feature_importance(self, model_path: str) -> FormattedResponse:
        """
        Extracts and saves the feature importance scores from a model.
        NOTE: This tool is only applicable for models that support feature_importances_, such as RandomForest.
        It will fail for models like LogisticRegression.

        Args:
            model_path (str): The path to the trained .joblib model file.

        Returns:
            FormattedResponse: Structured feature importance summary or warning when unavailable.
        """
        try:
            model = self.storage.load_model(self._resolve_path(model_path))

            importance_data = self._extract_feature_importance(model)
            if importance_data is None:
                response = make_text_response(
                    "The selected model does not support feature importance extraction.",
                    severity=Severity.WARNING,
                )
                response.summary = "Feature importance unavailable"
                response.done = True
                return response

            output_path, info = self._save_feature_importance(importance_data)

            # The chart shows the top 15 features; the detailed table the top 10.
            top_features = importance_data[:15]
            blocks = [
                text_block("Extracted model feature importances", severity=Severity.SUCCESS),
                visualization_block(
                    top_features,
                    chart_type="bar",
                    title="Top Feature Importances",
                    subtitle=f"Showing top {len(top_features)} most important features",
                    block_id="feature_importance_chart",
                ),
                simple_table_with_types(
                    importance_data[:10],
                    caption="Top features (detailed)",
                    block_id="feature_importance",
                ),
                text_block(f"Feature importance saved to `{output_path}`"),
            ]
            return FormattedResponse(
                blocks=blocks,
                summary="Captured feature importance",
                correlation_id=info.path,
                done=True,
            )
        except Exception as e:
            raise FileOperationException("get feature importance", model_path, e) from e

    def generate_evaluation_code(
        self,
        model_path: str,
        dataset_path: str,
        target_column: str,
        model_type: str,
        version: int
    ) -> str:
        """
        Generate executable Python code that reproduces the evaluation process.

        The run parameters are emitted as ``repr()``-escaped constants at the top
        of the script, so quotes or backslashes in a path or column name cannot
        break the generated source.

        Args:
            model_path: Path to the trained model
            dataset_path: Path to the processed dataset
            target_column: Name of the target column
            model_type: Type of model being evaluated
            version: Version number for this evaluation code

        Returns:
            str: The generated Python evaluation code
        """
        timestamp = datetime.now().isoformat()

        # Inside the template, "{{"/"}}" are literal braces for the generated
        # script's own f-strings and "\\n" becomes "\n" in the generated source.
        code = f'''#!/usr/bin/env python3
"""
Generated ML Evaluation Code - Version {version}
Generated on: {timestamp}
Model Type: {model_type}
Target Column: {target_column}
Model Path: {model_path}
Dataset Path: {dataset_path}

This code reproduces the exact evaluation process used by the ML system.
"""

import pandas as pd
import joblib
import json
import numpy as np
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    confusion_matrix, classification_report,
    accuracy_score, precision_score, recall_score, f1_score
)

# Run parameters captured when this script was generated.
MODEL_PATH = {model_path!r}
DATASET_PATH = {dataset_path!r}
TARGET_COLUMN = {target_column!r}
MODEL_TYPE = {model_type!r}
VERSION = {version!r}


def evaluate_model():
    """Main evaluation function"""
    print("="*50)
    print("🔍 ML MODEL EVALUATION")
    print("="*50)

    try:
        # 1. Load Model and Data
        print("\\n1. Loading model and dataset...")
        model = joblib.load(MODEL_PATH)
        df = pd.read_csv(DATASET_PATH)
        print(f" Model loaded: {{MODEL_TYPE}}")
        print(f" Dataset shape: {{df.shape}}")

        # 2. Prepare Data (same split as training)
        print("\\n2. Preparing evaluation data...")
        if TARGET_COLUMN not in df.columns:
            raise ValueError(f"Target column '{{TARGET_COLUMN}}' not found in dataset")

        X = df.drop(columns=[TARGET_COLUMN])
        y = df[TARGET_COLUMN]
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size={_TEST_SIZE}, random_state={_RANDOM_STATE})
        print(f" Test set samples: {{len(X_test)}}")
        print(f" Features: {{list(X.columns)}}")

        # 3. Generate Predictions
        print("\\n3. Generating predictions...")
        y_pred = model.predict(X_test)
        print(" Predictions generated successfully!")

        # 4. Calculate Metrics
        print("\\n4. Calculating evaluation metrics...")

        # Basic metrics
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred, average='weighted')
        recall = recall_score(y_test, y_pred, average='weighted')
        f1 = f1_score(y_test, y_pred, average='weighted')

        # Confusion matrix
        labels = sorted(y.unique().tolist())
        cm = confusion_matrix(y_test, y_pred, labels=labels)

        # Feature importance (if available)
        feature_importance = None
        if hasattr(model, 'feature_importances_'):
            importance_scores = model.feature_importances_
            feature_names = X.columns.tolist()
            feature_importance = sorted(
                [{{"feature": name, "importance": float(score)}}
                 for name, score in zip(feature_names, importance_scores)],
                key=lambda x: x["importance"],
                reverse=True
            )

        # 5. Prepare Results
        evaluation_results = {{
            "model_type": MODEL_TYPE,
            "version": VERSION,
            "timestamp": datetime.now().isoformat(),
            "evaluation_info": {{
                "test_samples": len(X_test),
                "total_features": len(X.columns),
                "target_column": TARGET_COLUMN,
                "unique_classes": labels
            }},
            "performance": {{
                "accuracy": float(accuracy),
                "precision": float(precision),
                "recall": float(recall),
                "f1_score": float(f1)
            }},
            "confusion_matrix": {{
                "labels": labels,
                "matrix": cm.tolist()
            }},
            "feature_importance": feature_importance,
            "classification_report": classification_report(y_test, y_pred, output_dict=True)
        }}

        # 6. Display Results
        print("\\n" + "="*50)
        print("📊 EVALUATION RESULTS")
        print("="*50)
        print(f"Accuracy: {{accuracy:.4f}}")
        print(f"Precision: {{precision:.4f}}")
        print(f"Recall: {{recall:.4f}}")
        print(f"F1 Score: {{f1:.4f}}")

        if feature_importance:
            print("\\n🔍 Top 5 Most Important Features:")
            for i, feat in enumerate(feature_importance[:5]):
                print(f" {{i+1}}. {{feat['feature']}}: {{feat['importance']:.4f}}")

        print("\\n📈 Confusion Matrix:")
        print(" Labels:", labels)
        for i, row in enumerate(cm):
            print(f" {{labels[i]}}: {{row.tolist()}}")

        # 7. Save Results
        print("\\n7. Saving evaluation results...")

        # Save confusion matrix
        with open(f'confusion_matrix_v{{VERSION}}.json', 'w') as f:
            json.dump(evaluation_results["confusion_matrix"], f, indent=2)
        print(f" Confusion matrix saved: confusion_matrix_v{{VERSION}}.json")

        # Save feature importance (if available)
        if feature_importance:
            with open(f'feature_importance_v{{VERSION}}.json', 'w') as f:
                json.dump(feature_importance, f, indent=2)
            print(f" Feature importance saved: feature_importance_v{{VERSION}}.json")

        # Save complete evaluation results
        with open(f'evaluation_results_v{{VERSION}}.json', 'w') as f:
            json.dump(evaluation_results, f, indent=2)
        print(f" Complete results saved: evaluation_results_v{{VERSION}}.json")

        print("\\n🎉 Evaluation completed successfully!")
        return evaluation_results

    except Exception as e:
        print(f"\\n❌ Evaluation failed: {{str(e)}}")
        raise


if __name__ == "__main__":
    # Execute evaluation
    results = evaluate_model()
    print("\\n" + "="*50)
    print("EVALUATION SUMMARY")
    print("="*50)
    print(f"Model Type: {{results['model_type']}}")
    print(f"Version: {{results['version']}}")
    print(f"Accuracy: {{results['performance']['accuracy']:.4f}}")
    print(f"F1 Score: {{results['performance']['f1_score']:.4f}}")
    print("="*50)
'''
        return code

    @tool
    def comprehensive_model_evaluation(
        self,
        model_path: str,
        dataset_path: str,
        target_column: str,
        model_type: str
    ) -> FormattedResponse:
        """
        Perform comprehensive model evaluation with versioned artifacts generation.
        This is the main evaluation tool that creates all evaluation artifacts.

        Args:
            model_path (str): Path to the trained model (.joblib file)
            dataset_path (str): Path to the processed dataset used for training
            target_column (str): Name of the target column the model predicts
            model_type (str): Type of model being evaluated (e.g., 'RandomForest')

        Returns:
            FormattedResponse: Structured evaluation summary with artifact references
        """
        try:
            current_version = self._next_evaluation_version()

            # 1. Load everything once and predict on the hold-out split.
            model, y, X_test, y_test = self._load_model_and_test_split(
                model_path, dataset_path, target_column
            )
            y_pred = model.predict(X_test)

            # 2. Confusion matrix (the unversioned JSON is still written, as before).
            confusion_matrix_data = self._build_confusion_matrix_data(y, y_test, y_pred)
            self._save_confusion_matrix(confusion_matrix_data)

            # 3. Feature importance is best effort: any failure simply means the
            #    report and response omit it.
            feature_importance_data = None
            try:
                feature_importance_data = self._extract_feature_importance(
                    model, fallback_names=list(X_test.columns)
                )
                if feature_importance_data is not None:
                    self._save_feature_importance(feature_importance_data)
            except Exception:
                feature_importance_data = None

            # 4. Metrics (cast to builtin float so they serialise cleanly).
            accuracy = float(accuracy_score(y_test, y_pred))
            precision = float(precision_score(y_test, y_pred, average='weighted'))
            recall = float(recall_score(y_test, y_pred, average='weighted'))
            f1 = float(f1_score(y_test, y_pred, average='weighted'))
            class_report = classification_report(y_test, y_pred, output_dict=True)

            # 5. Reproduction script and markdown report.
            evaluation_code = self.generate_evaluation_code(
                model_path, dataset_path, target_column, model_type, current_version
            )
            report_content = self._build_evaluation_report(
                current_version,
                model_type,
                target_column,
                dataset_path,
                accuracy,
                precision,
                recall,
                f1,
                confusion_matrix_data,
                feature_importance_data,
                class_report,
            )

            # 6. Save all versioned artifacts using enhanced storage
            cm_info = self.storage.save_evaluation_artifact(
                self.user_id,
                self.project_id,
                'confusion_matrix',
                confusion_matrix_data,
                current_version,
                f"Confusion matrix for {model_type} evaluation",
            )
            fi_info = None
            if feature_importance_data:
                fi_info = self.storage.save_evaluation_artifact(
                    self.user_id,
                    self.project_id,
                    'feature_importance',
                    feature_importance_data,
                    current_version,
                    f"Feature importance for {model_type} evaluation",
                )
            report_info = self.storage.save_evaluation_artifact(
                self.user_id,
                self.project_id,
                'report',
                report_content,
                current_version,
                f"Comprehensive evaluation report for {model_type}",
            )
            code_info = self.storage.save_evaluation_artifact(
                self.user_id,
                self.project_id,
                'code',
                evaluation_code,
                current_version,
                f"Reproducible evaluation code for {model_type}",
            )

            if self.project_service:
                evaluation_meta = {
                    "model_type": model_type,
                    "target_column": target_column,
                    "accuracy": accuracy,
                    "precision": precision,
                    "recall": recall,
                    "f1_score": f1,
                }
                # (artifact type, save info, metadata specific to that artifact)
                registrations = [
                    (
                        ArtifactTypes.CONFUSION_MATRIX,
                        cm_info,
                        {"labels": confusion_matrix_data.get("labels", [])},
                    ),
                ]
                if fi_info:
                    registrations.append((ArtifactTypes.FEATURE_IMPORTANCE, fi_info, {}))
                registrations.append((ArtifactTypes.EVALUATION_REPORT, report_info, {}))
                registrations.append(
                    (
                        ArtifactTypes.EVALUATION_CODE,
                        code_info,
                        {"lines_of_code": evaluation_code.count("\n") + 1},
                    )
                )
                for artifact_type, artifact_info, extra in registrations:
                    self.project_service.register_artifact(
                        self.user_id,
                        self.project_id,
                        artifact_type,
                        current_version,
                        artifact_info,
                        version_scope="evaluation",
                        extra_metadata={**evaluation_meta, **extra},
                    )

            # 7. Return comprehensive structured response
            cm_filename = cm_info.path.split("/")[-1]
            report_filename = report_info.path.split("/")[-1]
            code_filename = code_info.path.split("/")[-1]
            fi_filename = fi_info.path.split("/")[-1] if fi_info else None

            cm_labels = confusion_matrix_data.get("labels", [])
            cm_matrix = confusion_matrix_data.get("matrix", [])
            cm_rows = [
                {
                    "actual": label,
                    **{f"pred_{pred_label}": value for pred_label, value in zip(cm_labels, row)},
                }
                for label, row in zip(cm_labels, cm_matrix)
            ]

            artifact_lines = [
                f"- Confusion matrix: `{cm_filename}`",
                f"- Evaluation report: `{report_filename}`",
                f"- Evaluation code: `{code_filename}`",
            ]
            if fi_filename:
                artifact_lines.append(f"- Feature importance: `{fi_filename}`")

            insights = [
                "- Model shows strong predictive performance"
                if accuracy > 0.85
                else "- Model shows moderate predictive performance",
            ]
            if abs(precision - recall) >= 0.05:
                insights.append("- Precision and recall diverge; inspect class balance")
            else:
                insights.append("- Precision and recall remain balanced")
            if feature_importance_data:
                top_feature = feature_importance_data[0]
                insights.append(f"- Top feature: {top_feature['feature']} ({top_feature['importance']:.4f})")

            next_steps = [
                "Review the evaluation report",
                "Inspect confusion matrix for class-level accuracy",
                "Use generated code to reproduce results",
            ]
            if feature_importance_data:
                next_steps.append("Analyse feature importance for interpretability")

            metrics_chart = visualization_block(
                [
                    {"metric": "Accuracy", "score": accuracy},
                    {"metric": "Precision", "score": precision},
                    {"metric": "Recall", "score": recall},
                    {"metric": "F1 Score", "score": f1},
                ],
                chart_type="bar",
                title="Model Performance Metrics",
                subtitle=f"Evaluation v{current_version}",
                block_id="evaluation_metrics_chart",
            )

            # Feature-importance blocks follow the metrics chart; the
            # "not available" warning instead precedes it.
            before_chart = []
            after_chart = []
            if feature_importance_data:
                after_chart = [
                    visualization_block(
                        feature_importance_data[:10],
                        chart_type="bar",
                        title="Top Feature Importances",
                        subtitle=f"Top 10 features impacting {model_type} predictions",
                        block_id="feature_importance_chart",
                    ),
                    simple_table_with_types(
                        feature_importance_data[:5],
                        caption="Top feature importances (detailed)",
                        block_id="feature_importance_top",
                    ),
                ]
            else:
                before_chart = [
                    text_block(
                        "Feature importance not available for this model type",
                        severity=Severity.WARNING,
                        block_id="feature_importance_warning",
                    ),
                ]

            blocks = [
                text_block(
                    f"Completed evaluation run v{current_version} for `{model_type}`",
                    severity=Severity.SUCCESS,
                ),
                metric_block("Accuracy", accuracy),
                metric_block("Precision", precision),
                metric_block("Recall", recall),
                metric_block("F1 Score", f1),
                *before_chart,
                metrics_chart,
                *after_chart,
                text_block("\n".join(artifact_lines), severity=Severity.INFO, block_id="evaluation_artifacts"),
                simple_table_with_types(
                    cm_rows,
                    caption="Confusion matrix (actual vs predicted)",
                    block_id="evaluation_confusion_matrix",
                ),
                text_block(
                    "**Insights**\n" + "\n".join(insights),
                    severity=Severity.INFO,
                    block_id="evaluation_insights",
                ),
                text_block(
                    "**Next steps**\n" + "\n".join(f"- {step}" for step in next_steps),
                    severity=Severity.INFO,
                    block_id="evaluation_next_steps",
                ),
            ]

            return FormattedResponse(
                blocks=blocks,
                summary=f"Evaluation complete v{current_version}",
                correlation_id=report_info.path,
                done=True,
            )
        except Exception as e:
            raise FileOperationException("comprehensive model evaluation", model_path, e) from e

    # ------------------------------------------------------------------
    # Report helpers
    # ------------------------------------------------------------------

    def _build_evaluation_report(
        self,
        version,
        model_type,
        target_column,
        dataset_path,
        accuracy,
        precision,
        recall,
        f1,
        confusion_matrix_data,
        feature_importance_data,
        class_report,
    ) -> str:
        """Render the markdown evaluation report for one evaluation run."""
        generated_on = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        matrix_text = self._format_confusion_matrix(
            confusion_matrix_data['matrix'], confusion_matrix_data['labels']
        ).rstrip("\n")
        feature_section = self._generate_feature_importance_section(feature_importance_data)
        improvements = self._generate_improvement_suggestions(accuracy, precision, recall, f1)

        if f1 > 0.8:
            f1_rating = 'Strong'
        elif f1 > 0.6:
            f1_rating = 'Moderate'
        else:
            f1_rating = 'Needs improvement'

        # Lines that only appear when feature importance was produced.
        fi_reproducibility_line = (
            f"- **Feature Importance Data:** `feature_importance_v{version}.json`"
            if feature_importance_data
            else ""
        )
        fi_artifact_line = (
            "4. **Feature Importance:** Model feature ranking and scores"
            if feature_importance_data
            else ""
        )

        return f"""# Model Evaluation Report - Version {version}

**Generated on:** {generated_on}
**Model Type:** {model_type}
**Target Column:** {target_column}
**Dataset:** {dataset_path}

## 📊 Performance Summary

| Metric | Score |
|--------|-------|
| **Accuracy** | {accuracy:.4f} |
| **Precision** | {precision:.4f} |
| **Recall** | {recall:.4f} |
| **F1 Score** | {f1:.4f} |

## 📈 Confusion Matrix Analysis

The confusion matrix shows how well the model predicts each class:

**Labels:** {confusion_matrix_data['labels']}

**Matrix:**
```
{matrix_text}
```

### Interpretation:
- **Diagonal values** represent correct predictions
- **Off-diagonal values** represent misclassifications
- **Higher diagonal values** indicate better performance for that class

## 🔍 Feature Importance Analysis

{feature_section}

## 📋 Detailed Classification Report

```json
{json.dumps(class_report, indent=2)}
```

## 🎯 Model Assessment

### Strengths:
- Overall accuracy of {accuracy:.1%}
- {f1_rating} F1 score ({f1:.4f})

### Areas for Improvement:
{improvements}

## 🔧 Reproducibility

The evaluation can be reproduced using the generated evaluation code:
- **Evaluation Code:** `evaluation_code_v{version}.py`
- **Confusion Matrix Data:** `confusion_matrix_v{version}.json`
{fi_reproducibility_line}

## 📁 Generated Artifacts

All evaluation artifacts have been saved with version {version}:

1. **Confusion Matrix:** Contains prediction accuracy breakdown by class
2. **Evaluation Report:** This comprehensive analysis document
3. **Evaluation Code:** Executable Python script to reproduce results
{fi_artifact_line}

---
*This report was generated automatically by the ML Evaluation System.*
"""

    def _format_confusion_matrix(self, matrix, labels):
        """Format confusion matrix for markdown display.

        Returns fixed-width text: a header row of predicted labels, then one row
        per actual label. Every cell, including the blank corner cell, is
        right-aligned to the same width so the columns line up in a code fence.
        Lines are separated by real newlines and the result ends with one.
        """
        width = _MATRIX_COLUMN_WIDTH

        def cell(value):
            # str() first so labels of any type (including None) can be padded.
            return f"{str(value):>{width}}"

        lines = [" " * width + "".join(cell(label) for label in labels)]
        for label, row in zip(labels, matrix):
            lines.append(cell(label) + "".join(cell(value) for value in row))
        return "\n".join(lines) + "\n"

    def _generate_feature_importance_section(self, feature_importance_data):
        """Generate feature importance section for report.

        Returns a markdown table of the top 10 features, or a one-line notice
        when no feature importance data is available.
        """
        if not feature_importance_data:
            return "Feature importance analysis is not available for this model type."

        lines = [
            "The following features have the most impact on model predictions:",
            "",
            "| Rank | Feature | Importance Score |",
            "|------|---------|------------------|",
        ]
        lines.extend(
            f"| {rank} | {feat['feature']} | {feat['importance']:.4f} |"
            for rank, feat in enumerate(feature_importance_data[:10], start=1)
        )
        return "\n".join(lines) + "\n"

    def _generate_improvement_suggestions(self, accuracy, precision, recall, f1):
        """Generate improvement suggestions based on metrics.

        Returns a newline-separated markdown bullet list; a single
        "satisfactory" bullet when no rule fires.
        """
        suggestions = []

        if accuracy < 0.7:
            suggestions.append("- Consider collecting more training data")
            suggestions.append("- Try different model algorithms or hyperparameters")

        if precision < recall:
            suggestions.append("- Model may be predicting too many positive cases")
            suggestions.append("- Consider adjusting classification threshold")
        elif recall < precision:
            suggestions.append("- Model may be too conservative in predictions")
            suggestions.append("- Consider feature engineering or data augmentation")

        if f1 < 0.6:
            suggestions.append("- Significant model improvement needed")
            suggestions.append("- Review data quality and feature selection")

        if not suggestions:
            return "- Model performance appears satisfactory"
        return "\n".join(suggestions)