sirus / backend /ml_module /tools /evaluation_tools.py
ranilmukesh's picture
Deploy SiRUS SQL Agent backend
783a952
# ml_module/tools/evaluation_tools.py
import json
from datetime import datetime
from typing import Optional
import pandas as pd
from agno.tools import Toolkit, tool
from sklearn.metrics import (
accuracy_score,
classification_report,
confusion_matrix,
f1_score,
precision_score,
recall_score,
)
from sklearn.model_selection import train_test_split
from ml_module.services.storage_service import MLStorageService
from ml_module.services.project_service import ProjectService
from ml_module.core.exceptions import FileOperationException
from ml_module.core.constants import ArtifactTypes
from ml_module.core.response_formatter import (
FormattedResponse,
Severity,
make_text_response,
metric_block,
simple_table,
simple_table_with_types,
visualization_block,
text_block,
)
class ModelEvaluationToolkit(Toolkit):
"""A toolkit for generating deep evaluation artifacts for trained models with versioning and code generation."""
def __init__(self, storage_service: MLStorageService, user_id: str, project_id: str, project_service: Optional[ProjectService] = None):
super().__init__(name="model_evaluation_tools")
self.storage = storage_service
self.user_id = user_id
self.project_id = project_id
self.project_service = project_service
def _get_base_path(self, subfolder: str = "") -> str:
return f"{self.user_id}/{self.project_id}/{subfolder}"
@tool
def generate_confusion_matrix_data(self, model_path: str, dataset_path: str, target_column: str) -> FormattedResponse:
"""
Generates and saves the data for a confusion matrix visualization.
Args:
model_path (str): The path to the trained .joblib model file (e.g., 'models/RandomForest_model.joblib').
dataset_path (str): The path to the processed dataset used for training (e.g., 'processed/cleaned_data.csv').
target_column (str): The name of the column the model predicts.
Returns:
FormattedResponse: Structured confusion matrix summary with artifact reference.
"""
try:
# Load model and data
model = self.storage.load_model(self._get_base_path() + "/" + model_path)
df = self.storage.load_dataframe(self._get_base_path() + "/" + dataset_path)
# Recreate the exact same test set
X = df.drop(columns=[target_column])
y = df[target_column]
_, X_test, _, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Generate predictions and matrix
y_pred = model.predict(X_test)
labels = sorted(y.unique().tolist())
matrix = confusion_matrix(y_test, y_pred, labels=labels)
# Format for frontend rendering
output_data = {
"labels": labels,
"matrix": matrix.tolist()
}
output_path = f"{self._get_base_path('evaluation')}/confusion_matrix.json"
info = self.storage.save_json(output_data, output_path)
rows = [
{"actual": labels[i], **{str(labels[j]): matrix[i][j] for j in range(len(labels))}}
for i in range(len(labels))
]
blocks = [
text_block("Calculated confusion matrix on held-out test set", severity=Severity.SUCCESS),
simple_table(rows, caption="Confusion matrix", block_id="confusion_matrix"),
text_block(f"Matrix JSON saved to `{output_path}`"),
]
return FormattedResponse(
blocks=blocks,
summary="Generated confusion matrix",
correlation_id=info.path,
done=True,
)
except Exception as e:
raise FileOperationException("generate confusion matrix", model_path, e)
@tool
def get_feature_importance(self, model_path: str) -> FormattedResponse:
"""
Extracts and saves the feature importance scores from a model.
NOTE: This tool is only applicable for models that support feature_importances_,
such as RandomForest. It will fail for models like LogisticRegression.
Args:
model_path (str): The path to the trained .joblib model file.
Returns:
FormattedResponse: Structured feature importance summary or warning when unavailable.
"""
try:
model = self.storage.load_model(self._get_base_path() + "/" + model_path)
if not hasattr(model, 'feature_importances_'):
response = make_text_response(
"The selected model does not support feature importance extraction.",
severity=Severity.WARNING,
)
response.summary = "Feature importance unavailable"
response.done = True
return response
importances = model.feature_importances_
feature_names = model.feature_names_in_
importance_data = sorted(
[{"feature": name, "importance": score} for name, score in zip(feature_names, importances)],
key=lambda x: x["importance"],
reverse=True
)
output_path = f"{self._get_base_path('evaluation')}/feature_importance.json"
info = self.storage.save_json(importance_data, output_path)
# Create visualization block for bar chart
top_features = importance_data[:15] # Top 15 features for visualization
blocks = [
text_block("Extracted model feature importances", severity=Severity.SUCCESS),
visualization_block(
top_features,
chart_type="bar",
title="Top Feature Importances",
subtitle=f"Showing top {len(top_features)} most important features",
block_id="feature_importance_chart"
),
simple_table_with_types(importance_data[:10], caption="Top features (detailed)", block_id="feature_importance"),
text_block(f"Feature importance saved to `{output_path}`"),
]
return FormattedResponse(
blocks=blocks,
summary="Captured feature importance",
correlation_id=info.path,
done=True,
)
except Exception as e:
raise FileOperationException("get feature importance", model_path, e)
def generate_evaluation_code(
self,
model_path: str,
dataset_path: str,
target_column: str,
model_type: str,
version: int
) -> str:
"""
Generate executable Python code that reproduces the evaluation process.
Args:
model_path: Path to the trained model
dataset_path: Path to the processed dataset
target_column: Name of the target column
model_type: Type of model being evaluated
version: Version number for this evaluation code
Returns:
str: The generated Python evaluation code
"""
timestamp = datetime.now().isoformat()
# Generate comprehensive evaluation code
code = f'''#!/usr/bin/env python3
"""
Generated ML Evaluation Code - Version {version}
Generated on: {timestamp}
Model Type: {model_type}
Target Column: {target_column}
Model Path: {model_path}
Dataset Path: {dataset_path}
This code reproduces the exact evaluation process used by the ML system.
"""
import pandas as pd
import joblib
import json
import numpy as np
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
confusion_matrix, classification_report, accuracy_score,
precision_score, recall_score, f1_score
)
def evaluate_model():
"""Main evaluation function"""
print("="*50)
print("🔍 ML MODEL EVALUATION")
print("="*50)
try:
# 1. Load Model and Data
print("\\n1. Loading model and dataset...")
model = joblib.load('{model_path}')
df = pd.read_csv('{dataset_path}')
print(f" Model loaded: {model_type}")
print(f" Dataset shape: {{df.shape}}")
# 2. Prepare Data (same split as training)
print("\\n2. Preparing evaluation data...")
if '{target_column}' not in df.columns:
raise ValueError(f"Target column '{{target_column}}' not found in dataset")
X = df.drop(columns=['{target_column}'])
y = df['{target_column}']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f" Test set samples: {{len(X_test)}}")
print(f" Features: {{list(X.columns)}}")
# 3. Generate Predictions
print("\\n3. Generating predictions...")
y_pred = model.predict(X_test)
print(" Predictions generated successfully!")
# 4. Calculate Metrics
print("\\n4. Calculating evaluation metrics...")
# Basic metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
f1 = f1_score(y_test, y_pred, average='weighted')
# Confusion matrix
labels = sorted(y.unique().tolist())
cm = confusion_matrix(y_test, y_pred, labels=labels)
# Feature importance (if available)
feature_importance = None
if hasattr(model, 'feature_importances_'):
importance_scores = model.feature_importances_
feature_names = X.columns.tolist()
feature_importance = sorted(
[{{"feature": name, "importance": float(score)}}
for name, score in zip(feature_names, importance_scores)],
key=lambda x: x["importance"], reverse=True
)
# 5. Prepare Results
evaluation_results = {{
"model_type": "{model_type}",
"version": {version},
"timestamp": datetime.now().isoformat(),
"evaluation_info": {{
"test_samples": len(X_test),
"total_features": len(X.columns),
"target_column": "{target_column}",
"unique_classes": labels
}},
"performance": {{
"accuracy": float(accuracy),
"precision": float(precision),
"recall": float(recall),
"f1_score": float(f1)
}},
"confusion_matrix": {{
"labels": labels,
"matrix": cm.tolist()
}},
"feature_importance": feature_importance,
"classification_report": classification_report(y_test, y_pred, output_dict=True)
}}
# 6. Display Results
print("\\n" + "="*50)
print("📊 EVALUATION RESULTS")
print("="*50)
print(f"Accuracy: {{accuracy:.4f}}")
print(f"Precision: {{precision:.4f}}")
print(f"Recall: {{recall:.4f}}")
print(f"F1 Score: {{f1:.4f}}")
if feature_importance:
print("\\n🔍 Top 5 Most Important Features:")
for i, feat in enumerate(feature_importance[:5]):
print(f" {{i+1}}. {{feat['feature']}}: {{feat['importance']:.4f}}")
print("\\n📈 Confusion Matrix:")
print(" Labels:", labels)
for i, row in enumerate(cm):
print(f" {{labels[i]}}: {{row.tolist()}}")
# 7. Save Results
print("\\n7. Saving evaluation results...")
# Save confusion matrix
with open(f'confusion_matrix_v{version}.json', 'w') as f:
json.dump(evaluation_results["confusion_matrix"], f, indent=2)
print(f" Confusion matrix saved: confusion_matrix_v{version}.json")
# Save feature importance (if available)
if feature_importance:
with open(f'feature_importance_v{version}.json', 'w') as f:
json.dump(feature_importance, f, indent=2)
print(f" Feature importance saved: feature_importance_v{version}.json")
# Save complete evaluation results
with open(f'evaluation_results_v{version}.json', 'w') as f:
json.dump(evaluation_results, f, indent=2)
print(f" Complete results saved: evaluation_results_v{version}.json")
print("\\n🎉 Evaluation completed successfully!")
return evaluation_results
except Exception as e:
print(f"\\n❌ Evaluation failed: {{str(e)}}")
raise e
if __name__ == "__main__":
# Execute evaluation
results = evaluate_model()
print("\\n" + "="*50)
print("EVALUATION SUMMARY")
print("="*50)
print(f"Model Type: {{results['model_type']}}")
print(f"Version: {{results['version']}}")
print(f"Accuracy: {{results['performance']['accuracy']:.4f}}")
print(f"F1 Score: {{results['performance']['f1_score']:.4f}}")
print("="*50)
'''
return code
@tool
def comprehensive_model_evaluation(
self,
model_path: str,
dataset_path: str,
target_column: str,
model_type: str
) -> FormattedResponse:
"""
Perform comprehensive model evaluation with versioned artifacts generation.
This is the main evaluation tool that creates all evaluation artifacts.
Args:
model_path (str): Path to the trained model (.joblib file)
dataset_path (str): Path to the processed dataset used for training
target_column (str): Name of the target column the model predicts
model_type (str): Type of model being evaluated (e.g., 'RandomForest')
Returns:
FormattedResponse: Structured evaluation summary with artifact references
"""
try:
# Get current evaluation version
current_version = 1
if self.project_service:
try:
current_version = self.project_service.get_latest_version(self.user_id, self.project_id, "evaluation") + 1
except:
pass # Use version 1 if no previous evaluations
# 1. Generate confusion matrix data
self.generate_confusion_matrix_data(model_path, dataset_path, target_column)
# Load the generated confusion matrix for versioned storage
cm_path = f"{self._get_base_path('evaluation')}/confusion_matrix.json"
confusion_matrix_data = self.storage.load_json(cm_path)
# 2. Generate feature importance (if supported)
feature_importance_data = None
try:
fi_response = self.get_feature_importance(model_path)
if fi_response.summary != "Feature importance unavailable":
fi_path = f"{self._get_base_path('evaluation')}/feature_importance.json"
feature_importance_data = self.storage.load_json(fi_path)
except Exception:
feature_importance_data = None
# 3. Load model and data for additional metrics
model = self.storage.load_model(self._get_base_path() + "/" + model_path)
df = self.storage.load_dataframe(self._get_base_path() + "/" + dataset_path)
# Prepare data (same split as training)
X = df.drop(columns=[target_column])
y = df[target_column]
_, X_test, _, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Generate predictions
y_pred = model.predict(X_test)
# Calculate comprehensive metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
f1 = f1_score(y_test, y_pred, average='weighted')
# Get classification report
class_report = classification_report(y_test, y_pred, output_dict=True)
# 4. Generate evaluation code
evaluation_code = self.generate_evaluation_code(
model_path, dataset_path, target_column, model_type, current_version
)
# 5. Create comprehensive evaluation report
report_content = f"""# Model Evaluation Report - Version {current_version}
**Generated on:** {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
**Model Type:** {model_type}
**Target Column:** {target_column}
**Dataset:** {dataset_path}
## 📊 Performance Summary
| Metric | Score |
|--------|-------|
| **Accuracy** | {accuracy:.4f} |
| **Precision** | {precision:.4f} |
| **Recall** | {recall:.4f} |
| **F1 Score** | {f1:.4f} |
## 📈 Confusion Matrix Analysis
The confusion matrix shows how well the model predicts each class:
**Labels:** {confusion_matrix_data['labels']}
**Matrix:**
```
{self._format_confusion_matrix(confusion_matrix_data['matrix'], confusion_matrix_data['labels'])}
```
### Interpretation:
- **Diagonal values** represent correct predictions
- **Off-diagonal values** represent misclassifications
- **Higher diagonal values** indicate better performance for that class
## 🔍 Feature Importance Analysis
{self._generate_feature_importance_section(feature_importance_data)}
## 📋 Detailed Classification Report
```json
{json.dumps(class_report, indent=2)}
```
## 🎯 Model Assessment
### Strengths:
- Overall accuracy of {accuracy:.1%}
- {'Strong' if f1 > 0.8 else 'Moderate' if f1 > 0.6 else 'Needs improvement'} F1 score ({f1:.4f})
### Areas for Improvement:
{self._generate_improvement_suggestions(accuracy, precision, recall, f1)}
## 🔧 Reproducibility
The evaluation can be reproduced using the generated evaluation code:
- **Evaluation Code:** `evaluation_code_v{current_version}.py`
- **Confusion Matrix Data:** `confusion_matrix_v{current_version}.json`
{f"- **Feature Importance Data:** `feature_importance_v{current_version}.json`" if feature_importance_data else ""}
## 📁 Generated Artifacts
All evaluation artifacts have been saved with version {current_version}:
1. **Confusion Matrix:** Contains prediction accuracy breakdown by class
2. **Evaluation Report:** This comprehensive analysis document
3. **Evaluation Code:** Executable Python script to reproduce results
{f"4. **Feature Importance:** Model feature ranking and scores" if feature_importance_data else ""}
---
*This report was generated automatically by the ML Evaluation System.*
"""
# 6. Save all versioned artifacts using enhanced storage
cm_info = self.storage.save_evaluation_artifact(
self.user_id,
self.project_id,
'confusion_matrix',
confusion_matrix_data,
current_version,
f"Confusion matrix for {model_type} evaluation",
)
fi_info = None
if feature_importance_data:
fi_info = self.storage.save_evaluation_artifact(
self.user_id,
self.project_id,
'feature_importance',
feature_importance_data,
current_version,
f"Feature importance for {model_type} evaluation",
)
report_info = self.storage.save_evaluation_artifact(
self.user_id,
self.project_id,
'report',
report_content,
current_version,
f"Comprehensive evaluation report for {model_type}",
)
code_info = self.storage.save_evaluation_artifact(
self.user_id,
self.project_id,
'code',
evaluation_code,
current_version,
f"Reproducible evaluation code for {model_type}",
)
if self.project_service:
evaluation_meta = {
"model_type": model_type,
"target_column": target_column,
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"f1_score": f1,
}
self.project_service.register_artifact(
self.user_id,
self.project_id,
ArtifactTypes.CONFUSION_MATRIX,
current_version,
cm_info,
version_scope="evaluation",
extra_metadata={**evaluation_meta, "labels": confusion_matrix_data.get("labels", [])},
)
if fi_info:
self.project_service.register_artifact(
self.user_id,
self.project_id,
ArtifactTypes.FEATURE_IMPORTANCE,
current_version,
fi_info,
version_scope="evaluation",
extra_metadata={**evaluation_meta},
)
self.project_service.register_artifact(
self.user_id,
self.project_id,
ArtifactTypes.EVALUATION_REPORT,
current_version,
report_info,
version_scope="evaluation",
extra_metadata={**evaluation_meta},
)
self.project_service.register_artifact(
self.user_id,
self.project_id,
ArtifactTypes.EVALUATION_CODE,
current_version,
code_info,
version_scope="evaluation",
extra_metadata={**evaluation_meta, "lines_of_code": evaluation_code.count("\n") + 1},
)
# 7. Return comprehensive structured response
cm_filename = cm_info.path.split("/")[-1]
report_filename = report_info.path.split("/")[-1]
code_filename = code_info.path.split("/")[-1]
fi_filename = fi_info.path.split("/")[-1] if fi_info else None
cm_labels = confusion_matrix_data.get("labels", [])
cm_matrix = confusion_matrix_data.get("matrix", [])
cm_rows = [
{
"actual": label,
**{f"pred_{pred_label}": value for pred_label, value in zip(cm_labels, row)},
}
for label, row in zip(cm_labels, cm_matrix)
]
artifact_lines = [
f"- Confusion matrix: `{cm_filename}`",
f"- Evaluation report: `{report_filename}`",
f"- Evaluation code: `{code_filename}`",
]
if fi_filename:
artifact_lines.append(f"- Feature importance: `{fi_filename}`")
insights = [
"- Model shows strong predictive performance" if accuracy > 0.85 else "- Model shows moderate predictive performance",
]
if abs(precision - recall) >= 0.05:
insights.append("- Precision and recall diverge; inspect class balance")
else:
insights.append("- Precision and recall remain balanced")
if feature_importance_data:
top_feature = feature_importance_data[0]
insights.append(f"- Top feature: {top_feature['feature']} ({top_feature['importance']:.4f})")
next_steps = [
"Review the evaluation report",
"Inspect confusion matrix for class-level accuracy",
"Use generated code to reproduce results",
]
if feature_importance_data:
next_steps.append("Analyse feature importance for interpretability")
blocks = [
text_block(
f"Completed evaluation run v{current_version} for `{model_type}`",
severity=Severity.SUCCESS,
),
metric_block("Accuracy", accuracy),
metric_block("Precision", precision),
metric_block("Recall", recall),
metric_block("F1 Score", f1),
visualization_block(
[
{"metric": "Accuracy", "score": accuracy},
{"metric": "Precision", "score": precision},
{"metric": "Recall", "score": recall},
{"metric": "F1 Score", "score": f1},
],
chart_type="bar",
title="Model Performance Metrics",
subtitle=f"Evaluation v{current_version}",
block_id="evaluation_metrics_chart"
),
text_block("\n".join(artifact_lines), severity=Severity.INFO, block_id="evaluation_artifacts"),
simple_table_with_types(cm_rows, caption="Confusion matrix (actual vs predicted)", block_id="evaluation_confusion_matrix"),
text_block("**Insights**\n" + "\n".join(insights), severity=Severity.INFO, block_id="evaluation_insights"),
text_block(
"**Next steps**\n" + "\n".join(f"- {step}" for step in next_steps),
severity=Severity.INFO,
block_id="evaluation_next_steps",
),
]
if feature_importance_data:
# Add chart for feature importance
blocks.insert(
6,
visualization_block(
feature_importance_data[:10],
chart_type="bar",
title="Top Feature Importances",
subtitle=f"Top 10 features impacting {model_type} predictions",
block_id="feature_importance_chart"
),
)
blocks.insert(
7,
simple_table_with_types(
feature_importance_data[:5],
caption="Top feature importances (detailed)",
block_id="feature_importance_top",
),
)
else:
blocks.insert(
5,
text_block(
"Feature importance not available for this model type",
severity=Severity.WARNING,
block_id="feature_importance_warning",
),
)
return FormattedResponse(
blocks=blocks,
summary=f"Evaluation complete v{current_version}",
correlation_id=report_info.path,
done=True,
)
except Exception as e:
raise FileOperationException("comprehensive model evaluation", model_path, e)
def _format_confusion_matrix(self, matrix, labels):
"""Format confusion matrix for markdown display"""
formatted = " " + " ".join(f"{label:>8}" for label in labels) + "\\n"
for i, row in enumerate(matrix):
formatted += f"{labels[i]:>8}" + "".join(f"{val:>8}" for val in row) + "\\n"
return formatted
def _generate_feature_importance_section(self, feature_importance_data):
"""Generate feature importance section for report"""
if not feature_importance_data:
return "Feature importance analysis is not available for this model type."
section = "The following features have the most impact on model predictions:\\n\\n"
section += "| Rank | Feature | Importance Score |\\n"
section += "|------|---------|------------------|\\n"
for i, feat in enumerate(feature_importance_data[:10]): # Top 10 features
section += f"| {i+1} | {feat['feature']} | {feat['importance']:.4f} |\\n"
return section
def _generate_improvement_suggestions(self, accuracy, precision, recall, f1):
"""Generate improvement suggestions based on metrics"""
suggestions = []
if accuracy < 0.7:
suggestions.append("- Consider collecting more training data")
suggestions.append("- Try different model algorithms or hyperparameters")
if precision < recall:
suggestions.append("- Model may be predicting too many positive cases")
suggestions.append("- Consider adjusting classification threshold")
elif recall < precision:
suggestions.append("- Model may be too conservative in predictions")
suggestions.append("- Consider feature engineering or data augmentation")
if f1 < 0.6:
suggestions.append("- Significant model improvement needed")
suggestions.append("- Review data quality and feature selection")
return "\\n".join(suggestions) if suggestions else "- Model performance appears satisfactory"