OBD_Logger / OBD /visualiseScorer.py
LiamKhoaLe's picture
Upd fuel efficiency model fusing
bc3c386
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from driving_aggressiveness_scorer import DrivingAggressivenessScorer
def visualize_drive(df_scored: pd.DataFrame, results: dict, save_path: str = None):
"""
Create comprehensive visualization of drive analysis.
Args:
df_scored: DataFrame with aggressiveness scores
results: Aggregate results dictionary
save_path: Optional path to save figure
"""
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
fig.suptitle(f"Drive Analysis - Score: {results['final_score']:.1f}/100",
fontsize=16, fontweight='bold')
# 1. Aggressiveness Score Over Time
ax = axes[0, 0]
ax.plot(df_scored['aggressiveness_score'], linewidth=1, color='#2E86AB')
ax.axhline(y=results['mean_score'], color='green', linestyle='--',
label=f"Mean: {results['mean_score']:.1f}")
ax.axhline(y=70, color='orange', linestyle='--', alpha=0.5, label='Spike Threshold')
ax.axhline(y=85, color='red', linestyle='--', alpha=0.5, label='Extreme Threshold')
ax.set_title('Aggressiveness Score Timeline')
ax.set_ylabel('Score (0-100)')
ax.set_xlabel('Sample Number')
ax.legend()
ax.grid(True, alpha=0.3)
# 2. Score Distribution
ax = axes[0, 1]
ax.hist(df_scored['aggressiveness_score'], bins=50, color='#A23B72', alpha=0.7, edgecolor='black')
ax.axvline(x=results['mean_score'], color='green', linestyle='--', linewidth=2, label='Mean')
ax.axvline(x=results['median_score'], color='blue', linestyle='--', linewidth=2, label='Median')
ax.set_title('Score Distribution')
ax.set_xlabel('Aggressiveness Score')
ax.set_ylabel('Frequency')
ax.legend()
ax.grid(True, alpha=0.3)
# 3. RPM vs Throttle Position (colored by score)
ax = axes[1, 0]
scatter = ax.scatter(df_scored['THROTTLE_POS'], df_scored['RPM'],
c=df_scored['aggressiveness_score'], cmap='RdYlGn_r',
s=10, alpha=0.6)
ax.set_title('RPM vs Throttle Position')
ax.set_xlabel('Throttle Position (%)')
ax.set_ylabel('RPM')
plt.colorbar(scatter, ax=ax, label='Aggressiveness')
ax.grid(True, alpha=0.3)
# 4. Speed vs Engine Load (colored by score)
ax = axes[1, 1]
scatter = ax.scatter(df_scored['SPEED'], df_scored['ENGINE_LOAD'],
c=df_scored['aggressiveness_score'], cmap='RdYlGn_r',
s=10, alpha=0.6)
ax.set_title('Speed vs Engine Load')
ax.set_xlabel('Speed (km/h)')
ax.set_ylabel('Engine Load (%)')
plt.colorbar(scatter, ax=ax, label='Aggressiveness')
ax.grid(True, alpha=0.3)
# 5. Key Metrics Over Time
ax = axes[2, 0]
ax2 = ax.twinx()
ln1 = ax.plot(df_scored['RPM'] / 100, label='RPM/100', color='#E63946', linewidth=0.8)
ln2 = ax.plot(df_scored['THROTTLE_POS'], label='Throttle %', color='#F77F00', linewidth=0.8)
ln3 = ax2.plot(df_scored['SPEED'], label='Speed', color='#06FFA5', linewidth=0.8)
ax.set_title('Key Metrics Timeline')
ax.set_xlabel('Sample Number')
ax.set_ylabel('RPM/100 & Throttle %')
ax2.set_ylabel('Speed (km/h)')
# Combine legends
lns = ln1 + ln2 + ln3
labs = [l.get_label() for l in lns]
ax.legend(lns, labs, loc='upper left')
ax.grid(True, alpha=0.3)
# 6. Score Statistics Summary
ax = axes[2, 1]
ax.axis('off')
stats_text = f"""
AGGREGATE SCORE BREAKDOWN
{'─' * 40}
Final Score: {results['final_score']:.1f} / 100
SCORE STATISTICS
Mean: {results['mean_score']:.1f}
Median: {results['median_score']:.1f}
Std Dev: {results['std_score']:.1f}
PERCENTILES
75th: {results['p75_score']:.1f}
90th: {results['p90_score']:.1f}
95th: {results['p95_score']:.1f}
99th: {results['p99_score']:.1f}
Max: {results['max_score']:.1f}
SPIKE ANALYSIS
Spikes (>70): {results['spike_percentage']:.1f}%
Extreme (>85): {results['extreme_percentage']:.1f}%
Spike Penalty: +{results['spike_penalty']:.1f}
"""
ax.text(0.1, 0.95, stats_text, transform=ax.transAxes,
fontfamily='monospace', fontsize=10, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.3))
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=150, bbox_inches='tight')
print(f"✓ Visualization saved to {save_path}")
plt.close()
else:
plt.show()
def compare_drives(scorer: DrivingAggressivenessScorer, csv_paths: list):
"""
Compare multiple drives side-by-side.
Args:
scorer: DrivingAggressivenessScorer instance
csv_paths: List of CSV file paths to compare
"""
results_list = []
for csv_path in csv_paths:
_, results = scorer.analyze_drive(csv_path, update_bounds=True)
results['file'] = csv_path
results_list.append(results)
# Create comparison DataFrame
comparison_df = pd.DataFrame(results_list)
print("\n" + "="*80)
print("DRIVE COMPARISON")
print("="*80)
print(comparison_df[['file', 'final_score', 'mean_score',
'spike_percentage', 'spike_penalty']].to_string(index=False))
print("="*80 + "\n")
return comparison_df
def batch_analyze_folder(folder_path: str, pattern: str = "*.csv"):
from pathlib import Path
scorer = DrivingAggressivenessScorer()
csv_files = list(Path(folder_path).glob(pattern))
if not csv_files:
print(f"No CSV files found in {folder_path}")
return
print(f"Found {len(csv_files)} CSV files")
all_results = []
for csv_file in csv_files:
try:
df_scored, results = scorer.analyze_drive(str(csv_file), update_bounds=True)
results['filename'] = csv_file.name
all_results.append(results)
# Save individual scored file
output_path = csv_file.parent / f"{csv_file.stem}_scored.csv"
df_scored.to_csv(output_path, index=False)
except Exception as e:
print(f"Error processing {csv_file}: {e}")
continue
summary_df = pd.DataFrame(all_results)
summary_path = Path(folder_path) / "drive_summary_report.csv"
summary_df.to_csv(summary_path, index=False)
print(f"\n✓ Summary report saved to {summary_path}")
return summary_df
def export_bounds_report(scorer: DrivingAggressivenessScorer, output_path: str = "bounds_report.txt"):
bounds = scorer.get_current_bounds()
report = []
report.append("="*60)
report.append("DRIVING AGGRESSIVENESS SCORER - BOUNDS REPORT")
report.append("="*60)
report.append(f"\nGenerated: {pd.Timestamp.now()}\n")
report.append("PARAMETER WEIGHTS:")
report.append("-"*60)
for param, weight in scorer.weights.items():
report.append(f"{param:20s}: {weight:.3f} ({weight*100:.1f}%)")
report.append("\n\nCURRENT BOUNDS:")
report.append("-"*60)
report.append(f"{'Parameter':<20s} {'Min':>12s} {'Max':>12s} {'Range':>12s}")
report.append("-"*60)
for param in scorer.weights.keys():
min_val = bounds[param]['min']
max_val = bounds[param]['max']
range_val = max_val - min_val
report.append(f"{param:<20s} {min_val:>12.2f} {max_val:>12.2f} {range_val:>12.2f}")
report.append("="*60)
report_text = "\n".join(report)
with open(output_path, 'w') as f:
f.write(report_text)
print(report_text)
print(f"\n✓ Report saved to {output_path}")
# Example usage
if __name__ == "__main__":
scorer = DrivingAggressivenessScorer()
csv_path = 'obd_data_log_20251012_121810.csv'
df_scored, results = scorer.analyze_drive(csv_path)
visualize_drive(df_scored, results, save_path='drive_analysis.png')
# Export bounds report
export_bounds_report(scorer)