|
|
import sys |
|
|
|
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
def intersection_over_union(start1, end1, start2, end2):
    """
    Compute Intersection over Union (IoU) between two date ranges.

    Both ranges are treated as inclusive of their endpoints, so a
    single-day range contributes one day to the union. Returns a float
    in [0, 1]; 0 when the ranges are disjoint or the union is empty.
    """
    # Inclusive overlap in days, clamped at zero for disjoint ranges.
    overlap_days = (min(end1, end2) - max(start1, start2)).days + 1
    if overlap_days < 0:
        overlap_days = 0

    # Inclusive lengths of each range; union = len1 + len2 - overlap.
    len1 = (end1 - start1).days + 1
    len2 = (end2 - start2).days + 1
    union_days = len1 + len2 - overlap_days

    return overlap_days / union_days if union_days > 0 else 0
|
|
|
|
|
def mean_abselute_error(start1, end1, start2, end2):
    """
    Compute Mean Absolute Error (MAE) between two date ranges.

    Returns the average of the absolute start-date and end-date offsets
    in days, or None (after logging) when any input is pandas NaT.
    NOTE: the "abselute" spelling is kept — callers reference this name.
    """
    # pd.NaT is a singleton, so identity comparison is the reliable check.
    if any(d is pd.NaT for d in (start1, end1, start2, end2)):
        print("One of the dates is NaT")
        print(f"start1: {start1}, end1: {end1}, start2: {start2}, end2: {end2}")
        return None

    start_error = abs(start1 - start2).days
    end_error = abs(end1 - end2).days
    return (start_error + end_error) / 2
|
|
|
|
|
|
|
|
def get_model_eval_res(located_patterns_and_other_info_updated_dict,window_results_dict,selected_models,selected_test_patterns_without_no_pattern):
    """
    Evaluate each selected model's located patterns against the ground-truth
    test patterns.

    For every (Symbol, Chart Pattern) row in the test set, candidate detections
    from the model are scored with IoU and MAE against the true date range.
    A pattern counts as "properly located" when at least one candidate reaches
    IoU >= 0.25; the best IoU (max) and best MAE (min) of the proper candidates
    are accumulated per chart-pattern type.

    Parameters
    ----------
    located_patterns_and_other_info_updated_dict : dict[str, pd.DataFrame]
        Per-model detections with 'Symbol', 'Chart Pattern', 'Calc_Start',
        'Calc_End' columns. NOTE: 'Calc_Start'/'Calc_End' are converted to
        datetime in place, mutating the caller's DataFrames.
    window_results_dict : dict[str, pd.DataFrame]
        Per-model window results; looked up only as an existence check here.
    selected_models : iterable[str]
        Model names to evaluate.
    selected_test_patterns_without_no_pattern : pd.DataFrame
        Ground truth with 'Symbol', 'Chart Pattern', 'Start', 'End' columns.

    Returns
    -------
    dict
        model_name -> {'number_of_properly_located_patterns',
                       'iou_for_each_properly_detected_pattern',
                       'mae_for_each_properly_detected_pattern'},
        each an accumulator dict keyed by chart-pattern name.
    """
    model_eval_results_dict = {}
    for model_name in selected_models:
        print(f"\n Selected model: {model_name}")

        located_df = located_patterns_and_other_info_updated_dict[model_name]
        # Unused beyond the lookup, but kept so a missing model still raises
        # KeyError early rather than failing later.
        window_results_df = window_results_dict[model_name]

        number_of_properly_located_patterns = {}
        iou_for_each_properly_detected_pattern = {}
        mae_for_each_properly_detected_pattern = {}

        # Normalize detection dates once per model (mutates the input frame).
        located_df['Calc_Start'] = pd.to_datetime(located_df['Calc_Start'])
        located_df['Calc_End'] = pd.to_datetime(located_df['Calc_End'])

        # Hoisted out of the loop; also fixes the progress counter, which
        # previously used the DataFrame index (+1) and was only correct for a
        # default RangeIndex (and crashed on non-numeric indexes).
        total_rows = len(selected_test_patterns_without_no_pattern)
        row_iter = selected_test_patterns_without_no_pattern.iterrows()
        for position, (index, row) in enumerate(row_iter, start=1):
            sys.stdout.write(f"\rProcessing row {position}/{total_rows}")
            sys.stdout.flush()

            symbol = row['Symbol']
            chart_pattern = row['Chart Pattern']
            # Ground-truth dates are made tz-naive so they compare cleanly
            # with the (naive) Calc_Start/Calc_End columns.
            start_date = pd.to_datetime(row['Start']).tz_localize(None)
            end_date = pd.to_datetime(row['End']).tz_localize(None)

            candidates = located_df[
                (located_df['Symbol'] == symbol) &
                (located_df['Chart Pattern'] == chart_pattern)
            ].copy()
            if candidates.empty:
                continue

            candidates.loc[:, 'IoU'] = candidates.apply(
                lambda x: intersection_over_union(start_date, end_date, x['Calc_Start'], x['Calc_End']),
                axis=1
            )
            candidates.loc[:, 'MAE'] = candidates.apply(
                lambda x: mean_abselute_error(start_date, end_date, x['Calc_Start'], x['Calc_End']),
                axis=1
            )

            # A detection is "proper" when it overlaps enough with the truth.
            proper = candidates[candidates['IoU'] >= 0.25]
            if not proper.empty:
                number_of_properly_located_patterns[chart_pattern] = \
                    number_of_properly_located_patterns.get(chart_pattern, 0) + 1
                # Series.max()/.min() skip NaN/None (MAE may be None on NaT),
                # unlike the builtin max()/min() over raw values.
                iou_for_each_properly_detected_pattern[chart_pattern] = \
                    iou_for_each_properly_detected_pattern.get(chart_pattern, 0) + proper['IoU'].max()
                mae_for_each_properly_detected_pattern[chart_pattern] = \
                    mae_for_each_properly_detected_pattern.get(chart_pattern, 0) + proper['MAE'].min()

        model_eval_results_dict[model_name] = {
            'number_of_properly_located_patterns': number_of_properly_located_patterns,
            'iou_for_each_properly_detected_pattern': iou_for_each_properly_detected_pattern,
            'mae_for_each_properly_detected_pattern': mae_for_each_properly_detected_pattern
        }
    return model_eval_results_dict
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
def create_comprehensive_model_comparison(all_models_metrics): |
|
|
|
|
|
Create a comprehensive visualization comparing all models across all metrics, |
|
|
using nested concentric pie charts for Precision and Recall. |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
all_models_metrics : dict |
|
|
Dictionary containing metrics for each model |
|
|
|
|
|
models = list(all_models_metrics.keys()) |
|
|
n_models = len(models) |
|
|
|
|
|
# Define the metrics to include |
|
|
key_metrics = { |
|
|
'total_recall': 'Recall', |
|
|
'total_precision': 'Precision', |
|
|
'overall_f1': 'F1 Score', |
|
|
'overall_iou': 'IoU', |
|
|
'overall_mae': 'MAE' |
|
|
} |
|
|
|
|
|
# Create figure with GridSpec for flexible layout |
|
|
fig = plt.figure(figsize=(20, 14)) |
|
|
|
|
|
# Add main title with enough space for legend below it |
|
|
plt.suptitle('Comprehensive Model Evaluation', fontsize=16, y=0.98) |
|
|
|
|
|
# Define a color palette for models |
|
|
colors = plt.cm.tab10(np.linspace(0, 1, n_models)) |
|
|
|
|
|
# Create a master legend below the title |
|
|
legend_handles = [plt.Line2D([0], [0], color=colors[i], lw=4, label=model) for i, model in enumerate(models)] |
|
|
fig.legend( |
|
|
handles=legend_handles, |
|
|
labels=models, |
|
|
loc='upper center', |
|
|
bbox_to_anchor=(0.5, 0.93), # Moved down from 0.98 to 0.93 |
|
|
ncol=n_models, |
|
|
fontsize=12 |
|
|
) |
|
|
|
|
|
# Adjust GridSpec to account for the title and legend |
|
|
gs = GridSpec(3, 3, figure=fig, height_ratios=[1.2, 1.2, 1], top=0.88) # Reduced top from 0.95 to 0.88 |
|
|
|
|
|
|
|
|
|
|
|
# 1. Precision Nested Pie Chart - top left |
|
|
ax1 = fig.add_subplot(gs[0, 0]) |
|
|
|
|
|
# Create a multi-layer nested pie chart for precision |
|
|
# Each ring represents a different model |
|
|
precision_values = [metrics['total_precision'] for metrics in all_models_metrics.values()] |
|
|
|
|
|
# Calculate radii for each ring (outermost ring is largest) |
|
|
radii = np.linspace(0.5, 1.0, n_models+1)[1:] # start from second element to skip 0.5 |
|
|
|
|
|
# Plot each model as a ring, outermost = first model |
|
|
for i, model in enumerate(models): |
|
|
# Create data for this model's ring [precision, 1-precision] |
|
|
data = [precision_values[i], 1-precision_values[i]] |
|
|
colors_ring = [colors[i], 'lightgray'] |
|
|
|
|
|
# Create pie chart for this ring |
|
|
wedges, texts = ax1.pie( |
|
|
data, |
|
|
radius=radii[i], |
|
|
colors=colors_ring, |
|
|
startangle=90, |
|
|
counterclock=False, |
|
|
wedgeprops=dict(width=0.15, edgecolor='w') |
|
|
) |
|
|
|
|
|
# Add only the value (no model name) to the pie chart wedge |
|
|
angle = (wedges[0].theta1 + wedges[0].theta2) / 2 |
|
|
x = (radii[i] - 0.075) * np.cos(np.radians(angle)) |
|
|
y = (radii[i] - 0.075) * np.sin(np.radians(angle)) |
|
|
ax1.text(x, y, f"{precision_values[i]:.3f}", |
|
|
ha='center', va='center', fontsize=10, fontweight='bold') |
|
|
|
|
|
# Create center circle for donut effect |
|
|
centre_circle = plt.Circle((0, 0), 0.25, fc='white') |
|
|
ax1.add_patch(centre_circle) |
|
|
|
|
|
ax1.set_title('Precision Comparison (Higher is Better)') |
|
|
ax1.set_aspect('equal') |
|
|
|
|
|
# 2. Recall Nested Pie Chart - top middle |
|
|
ax2 = fig.add_subplot(gs[0, 1]) |
|
|
|
|
|
# Create a multi-layer nested pie chart for recall |
|
|
recall_values = [metrics['total_recall'] for metrics in all_models_metrics.values()] |
|
|
|
|
|
# Plot each model as a ring, outermost = first model |
|
|
for i, model in enumerate(models): |
|
|
# Create data for this model's ring [recall, 1-recall] |
|
|
data = [recall_values[i], 1-recall_values[i]] |
|
|
colors_ring = [colors[i], 'lightgray'] |
|
|
|
|
|
# Create pie chart for this ring |
|
|
wedges, texts = ax2.pie( |
|
|
data, |
|
|
radius=radii[i], |
|
|
colors=colors_ring, |
|
|
startangle=90, |
|
|
counterclock=False, |
|
|
wedgeprops=dict(width=0.15, edgecolor='w') |
|
|
) |
|
|
|
|
|
# Add only the value (no model name) to the pie chart wedge |
|
|
angle = (wedges[0].theta1 + wedges[0].theta2) / 2 |
|
|
x = (radii[i] - 0.075) * np.cos(np.radians(angle)) |
|
|
y = (radii[i] - 0.075) * np.sin(np.radians(angle)) |
|
|
ax2.text(x, y, f"{recall_values[i]:.3f}", |
|
|
ha='center', va='center', fontsize=10, fontweight='bold') |
|
|
|
|
|
# Create center circle for donut effect |
|
|
centre_circle = plt.Circle((0, 0), 0.25, fc='white') |
|
|
ax2.add_patch(centre_circle) |
|
|
|
|
|
ax2.set_title('Recall Comparison (Higher is Better)') |
|
|
ax2.set_aspect('equal') |
|
|
|
|
|
# 3. F1 Score and IoU - top right |
|
|
ax3 = fig.add_subplot(gs[0, 2]) |
|
|
|
|
|
# Prepare data for grouped bar chart |
|
|
metrics_to_plot = ['overall_f1', 'overall_iou'] |
|
|
x = np.arange(len(metrics_to_plot)) |
|
|
width = 0.8 / n_models |
|
|
|
|
|
# Plot grouped bars for each model |
|
|
for i, (model_name, metrics) in enumerate(all_models_metrics.items()): |
|
|
values = [metrics[key] for key in metrics_to_plot] |
|
|
bars = ax3.bar(x + i*width - width*(n_models-1)/2, values, width, color=colors[i]) |
|
|
|
|
|
# Add value labels above each bar |
|
|
for bar, value in zip(bars, values): |
|
|
height = bar.get_height() |
|
|
ax3.text(bar.get_x() + bar.get_width()/2., height + 0.01, |
|
|
f'{value:.3f}', ha='center', va='bottom', fontsize=9, rotation=0) |
|
|
|
|
|
# Customize the plot |
|
|
ax3.set_xticks(x) |
|
|
ax3.set_xticklabels([key_metrics[key] for key in metrics_to_plot]) |
|
|
ax3.set_ylabel('Score') |
|
|
ax3.set_title('F1 Score & IoU Comparison (Higher is Better)') |
|
|
ax3.set_ylim(0, 1.0) |
|
|
ax3.grid(axis='y', linestyle='--', alpha=0.7) |
|
|
|
|
|
# 4. MAE comparison (separate bar chart) - middle left |
|
|
ax4 = fig.add_subplot(gs[1, 0]) |
|
|
|
|
|
mae_values = [metrics['overall_mae'] for metrics in all_models_metrics.values()] |
|
|
bars = ax4.bar(models, mae_values, color=colors) |
|
|
|
|
|
# Add value labels above MAE bars |
|
|
for bar, value in zip(bars, mae_values): |
|
|
height = bar.get_height() |
|
|
ax4.text(bar.get_x() + bar.get_width()/2., height + 0.01, |
|
|
f'{value:.3f}', ha='center', va='bottom', fontsize=9) |
|
|
|
|
|
ax4.set_ylabel('Error') |
|
|
ax4.set_title('Mean Absolute Error (Lower is Better)') |
|
|
ax4.grid(axis='y', linestyle='--', alpha=0.7) |
|
|
|
|
|
# 5. Model metrics radar chart - middle center |
|
|
ax5 = fig.add_subplot(gs[1, 1], polar=True) |
|
|
|
|
|
# Setup for radar chart |
|
|
metrics_for_radar = ['total_recall', 'total_precision', 'overall_f1', 'overall_iou'] |
|
|
num_vars = len(metrics_for_radar) |
|
|
angles = np.linspace(0, 2*np.pi, num_vars, endpoint=False).tolist() |
|
|
angles += angles[:1] # Close the loop |
|
|
|
|
|
# Plot each model on the radar chart |
|
|
for i, (model_name, metrics) in enumerate(all_models_metrics.items()): |
|
|
values = [metrics[metric] for metric in metrics_for_radar] |
|
|
values += values[:1] # Close the loop |
|
|
|
|
|
ax5.plot(angles, values, linewidth=2, linestyle='solid', color=colors[i]) |
|
|
ax5.fill(angles, values, alpha=0.1, color=colors[i]) |
|
|
|
|
|
# Set radar chart labels |
|
|
ax5.set_xticks(angles[:-1]) |
|
|
ax5.set_xticklabels([key_metrics[metric] for metric in metrics_for_radar]) |
|
|
ax5.set_ylim(0, 1) |
|
|
ax5.set_title('Model Performance Radar Chart') |
|
|
|
|
|
# 6. Model comparison bar - middle right |
|
|
ax6 = fig.add_subplot(gs[1, 2]) |
|
|
|
|
|
# Calculate the average of the four main metrics for an overall score |
|
|
# (excluding MAE which is inverse, lower is better) |
|
|
overall_scores = [] |
|
|
for model_name, metrics in all_models_metrics.items(): |
|
|
score = (metrics['total_recall'] + metrics['total_precision'] + |
|
|
metrics['overall_f1'] + metrics['overall_iou']) / 4 |
|
|
overall_scores.append(score) |
|
|
|
|
|
# Create horizontal bar chart |
|
|
y_pos = np.arange(len(models)) |
|
|
ax6.barh(y_pos, overall_scores, color=colors) |
|
|
ax6.set_yticks(y_pos) |
|
|
ax6.set_yticklabels(models) |
|
|
ax6.invert_yaxis() # labels read top-to-bottom |
|
|
ax6.set_xlabel('Overall Performance Score') |
|
|
ax6.set_title('Overall Model Comparison (Higher is Better)') |
|
|
|
|
|
# Add value labels |
|
|
for i, v in enumerate(overall_scores): |
|
|
ax6.text(v + 0.01, i, f'{v:.3f}', va='center') |
|
|
|
|
|
# 7. Detailed per-model metrics table - bottom span all columns |
|
|
ax7 = fig.add_subplot(gs[2, :]) |
|
|
ax7.axis('tight') |
|
|
ax7.axis('off') |
|
|
|
|
|
# Prepare table data |
|
|
table_data = [] |
|
|
for model_name, metrics in all_models_metrics.items(): |
|
|
row = [model_name] |
|
|
for key in key_metrics: |
|
|
row.append(f"{metrics[key]:.4f}") |
|
|
table_data.append(row) |
|
|
|
|
|
# Create table |
|
|
column_labels = ['Model'] + list(key_metrics.values()) |
|
|
table = ax7.table( |
|
|
cellText=table_data, |
|
|
colLabels=column_labels, |
|
|
loc='center', |
|
|
cellLoc='center' |
|
|
) |
|
|
table.auto_set_font_size(False) |
|
|
table.set_fontsize(10) |
|
|
table.scale(1, 1.5) |
|
|
ax7.set_title('Model Metrics Summary Table') |
|
|
|
|
|
plt.tight_layout(rect=[0, 0.03, 1, 0.88]) # Adjusted rect to account for title and legend |
|
|
|
|
|
plt.show() |
|
|
|
|
|
return fig |
|
|
|
|
|
# The evaluate_model and evaluate_all_models functions remain unchanged |
|
|
# The evaluate_model and evaluate_all_models functions remain unchanged |
|
|
# The evaluate_model function remains unchanged from your second code snippet |
|
|
def evaluate_model(model_name, model_eval_results_dict, pattern_row_count, test_patterns, located_patterns_and_other_info_updated_dict): |
|
|
Evaluate a model and calculate metrics without redundant plots |
|
|
print(f"\n{'='*20} Model: {model_name} {'='*20}") |
|
|
|
|
|
# Extract model results |
|
|
number_of_properly_located_patterns = model_eval_results_dict[model_name]['number_of_properly_located_patterns'] |
|
|
located_patterns_df = located_patterns_and_other_info_updated_dict[model_name] |
|
|
mae_for_each_properly_detected_pattern = model_eval_results_dict[model_name]['mae_for_each_properly_detected_pattern'] |
|
|
iou_for_each_properly_detected_pattern = model_eval_results_dict[model_name]['iou_for_each_properly_detected_pattern'] |
|
|
|
|
|
# Calculate metrics without plotting |
|
|
# Recall |
|
|
total_number_of_all_patterns = sum(pattern_row_count.values()) |
|
|
total_number_of_properly_located_patterns = sum(number_of_properly_located_patterns.values()) |
|
|
total_recall = total_number_of_properly_located_patterns / total_number_of_all_patterns if total_number_of_all_patterns > 0 else 0 |
|
|
|
|
|
per_pattern_recall = {} |
|
|
for pattern, count in number_of_properly_located_patterns.items(): |
|
|
pattern_count = test_patterns[test_patterns['Chart Pattern'] == pattern].shape[0] |
|
|
if pattern_count > 0: |
|
|
per_pattern_recall[pattern] = count / pattern_count |
|
|
else: |
|
|
per_pattern_recall[pattern] = 0 |
|
|
|
|
|
# Precision |
|
|
total_number_of_all_located_patterns = len(located_patterns_df) |
|
|
total_precision = total_number_of_properly_located_patterns / total_number_of_all_located_patterns if total_number_of_all_located_patterns > 0 else 0 |
|
|
|
|
|
per_pattern_precision = {} |
|
|
for pattern, count in number_of_properly_located_patterns.items(): |
|
|
pattern_predictions = located_patterns_df[located_patterns_df['Chart Pattern'] == pattern].shape[0] |
|
|
if pattern_predictions > 0: |
|
|
per_pattern_precision[pattern] = count / pattern_predictions |
|
|
else: |
|
|
per_pattern_precision[pattern] = 0 |
|
|
|
|
|
# F1 Score |
|
|
per_pattern_f1 = {} |
|
|
for pattern in per_pattern_recall.keys(): |
|
|
precision = per_pattern_precision.get(pattern, 0) |
|
|
recall = per_pattern_recall.get(pattern, 0) |
|
|
if precision + recall > 0: |
|
|
per_pattern_f1[pattern] = 2 * (precision * recall) / (precision + recall) |
|
|
else: |
|
|
per_pattern_f1[pattern] = 0 |
|
|
|
|
|
all_precisions = list(per_pattern_precision.values()) |
|
|
all_recalls = list(per_pattern_recall.values()) |
|
|
avg_precision = sum(all_precisions) / len(all_precisions) if all_precisions else 0 |
|
|
avg_recall = sum(all_recalls) / len(all_recalls) if all_recalls else 0 |
|
|
|
|
|
if avg_precision + avg_recall == 0: |
|
|
overall_f1 = 0 |
|
|
else: |
|
|
overall_f1 = 2 * (avg_precision * avg_recall) / (avg_precision + avg_recall) |
|
|
|
|
|
# MAE |
|
|
per_pattern_mae = {} |
|
|
for pattern, count in number_of_properly_located_patterns.items(): |
|
|
if count > 0: |
|
|
per_pattern_mae[pattern] = mae_for_each_properly_detected_pattern.get(pattern, 0) / count |
|
|
else: |
|
|
per_pattern_mae[pattern] = 0 |
|
|
|
|
|
total_mae_sum = sum(mae_for_each_properly_detected_pattern.values()) |
|
|
total_proper_patterns = sum(number_of_properly_located_patterns.values()) |
|
|
overall_mae = total_mae_sum / total_proper_patterns if total_proper_patterns > 0 else 0 |
|
|
|
|
|
# IoU |
|
|
per_pattern_iou = {} |
|
|
for pattern, count in number_of_properly_located_patterns.items(): |
|
|
if count > 0: |
|
|
per_pattern_iou[pattern] = iou_for_each_properly_detected_pattern.get(pattern, 0) / count |
|
|
else: |
|
|
per_pattern_iou[pattern] = 0 |
|
|
|
|
|
total_iou_sum = sum(iou_for_each_properly_detected_pattern.values()) |
|
|
overall_iou = total_iou_sum / total_proper_patterns if total_proper_patterns > 0 else 0 |
|
|
|
|
|
# Print summary of metrics |
|
|
print(f"Overall Recall: {total_recall:.4f}") |
|
|
print(f"Overall Precision: {total_precision:.4f}") |
|
|
print(f"Overall F1 Score: {overall_f1:.4f}") |
|
|
print(f"Overall Mean Absolute Error: {overall_mae:.4f}") |
|
|
print(f"Overall Mean Intersection over Union: {overall_iou:.4f}") |
|
|
|
|
|
# Store all metrics in one place for easy access |
|
|
metrics_summary = { |
|
|
'total_recall': total_recall, |
|
|
'per_pattern_recall': per_pattern_recall, |
|
|
'total_precision': total_precision, |
|
|
'per_pattern_precision': per_pattern_precision, |
|
|
'overall_f1': overall_f1, |
|
|
'per_pattern_f1': per_pattern_f1, |
|
|
'overall_mae': overall_mae, |
|
|
'per_pattern_mae': per_pattern_mae, |
|
|
'overall_iou': overall_iou, |
|
|
'per_pattern_iou': per_pattern_iou |
|
|
} |
|
|
|
|
|
return metrics_summary |
|
|
|
|
|
# Updated evaluate_all_models function that only creates the comprehensive plot |
|
|
def evaluate_all_models(model_eval_results_dict, pattern_row_count, test_patterns, located_patterns_and_other_info_updated_dict): |
|
|
Evaluate all models and return metrics summary with comprehensive plot only |
|
|
all_models_metrics = {} |
|
|
|
|
|
for model_name in model_eval_results_dict.keys(): |
|
|
all_models_metrics[model_name] = evaluate_model( |
|
|
model_name, |
|
|
model_eval_results_dict, |
|
|
pattern_row_count, |
|
|
test_patterns, |
|
|
located_patterns_and_other_info_updated_dict |
|
|
) |
|
|
|
|
|
# Only create the comprehensive visualization |
|
|
if len(model_eval_results_dict) > 0: |
|
|
print("\n--- Comprehensive Model Comparison ---") |
|
|
# figure = create_comprehensive_model_comparison(all_models_metrics) |
|
|
|
|
|
return all_models_metrics, None # Return None instead of figure |
|
|
""" |
|
|
|