#!/usr/bin/env python3
| """ | |
| scripts/analyze_results.py - Advanced RAG Pipeline Analysis Tool | |
| Analyzes evaluation results from SQLite database with detailed metrics | |
| Works with both Natural Questions and Generic evaluation results | |
| """ | |
import argparse
import json
import sqlite3
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional

import pandas as pd
class RAGAnalyzer:
    """Analyze and visualize RAG pipeline evaluation results.

    Wraps a SQLite database of per-query evaluation rows (table
    ``evaluation_results``) and exposes aggregate views -- accuracy, cost,
    timing, token usage, retrieval quality -- as pandas DataFrames.

    Works with both Natural Questions runs (ground truth available, so
    ``answer_found`` means "correct") and generic runs (no ground truth,
    so ``answer_found`` means "operationally succeeded").
    """

    def __init__(self, db_path: str = "data/evaluation_results.db"):
        """Open the database at *db_path* and detect which schema variant it uses.

        Args:
            db_path: Path to the SQLite evaluation-results database.
        """
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self._check_schema()

    def _check_schema(self) -> None:
        """Check if database has the extra columns used by generic evaluation."""
        cursor = self.conn.cursor()
        cursor.execute("PRAGMA table_info(evaluation_results)")
        columns = [row[1] for row in cursor.fetchall()]
        self.has_generic_columns = 'question_type' in columns

    def _resolve_run_id(self, run_id: Optional[str]) -> str:
        """Return *run_id* unchanged, or the id of the most recent run when None."""
        if run_id is not None:
            return run_id
        return pd.read_sql_query(
            "SELECT run_id FROM evaluation_results ORDER BY timestamp DESC LIMIT 1",
            self.conn,
        ).iloc[0]['run_id']

    def is_generic_run(self, run_id: str) -> bool:
        """Check if a run is generic (no ground truth) or Natural Questions."""
        cursor = self.conn.cursor()
        cursor.execute(
            """
            SELECT ground_truth_answers
            FROM evaluation_results
            WHERE run_id = ?
            LIMIT 1
            """,
            (run_id,),
        )
        result = cursor.fetchone()
        # A NULL ground-truth column would make json.loads() raise TypeError;
        # treat it the same as "no row found".
        if result is None or result[0] is None:
            return False
        return len(json.loads(result[0])) == 0  # empty ground truth = generic

    def get_all_runs(self) -> pd.DataFrame:
        """Get list of all evaluation runs with basic per-run counts."""
        # GROUP BY run_id already deduplicates, so no DISTINCT is needed.
        query = """
            SELECT run_id, timestamp,
                   COUNT(*) as total_evaluations,
                   COUNT(DISTINCT question_id) as unique_questions,
                   COUNT(DISTINCT pipeline_name) as pipelines_tested
            FROM evaluation_results
            GROUP BY run_id
            ORDER BY timestamp DESC
        """
        return pd.read_sql_query(query, self.conn)

    def get_run_summary(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Get summary statistics per pipeline for a specific run (latest when None)."""
        run_id = self._resolve_run_id(run_id)
        is_generic = self.is_generic_run(run_id)
        # answer_found means "correct" for NQ runs and "succeeded" for generic
        # runs; only the output column label differs.  The label comes from a
        # fixed internal set, so interpolating it into the SQL is safe.
        accuracy_label = "success_rate_pct" if is_generic else "accuracy_pct"
        query = f"""
            SELECT
                pipeline_name,
                COUNT(*) as total_queries,
                SUM(answer_found) as correct_answers,
                ROUND(AVG(answer_found) * 100, 2) as {accuracy_label},
                ROUND(AVG(total_time_ms), 2) as avg_time_ms,
                ROUND(AVG(retrieval_time_ms), 2) as avg_retrieval_ms,
                ROUND(AVG(generation_time_ms), 2) as avg_generation_ms,
                ROUND(AVG(total_cost_usd), 6) as avg_cost_usd,
                ROUND(SUM(total_cost_usd), 6) as total_cost_usd,
                ROUND(AVG(num_chunks_retrieved), 1) as avg_chunks,
                ROUND(AVG(LENGTH(generated_answer)), 1) as avg_answer_length
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY {accuracy_label} DESC
        """
        df = pd.read_sql_query(query, self.conn, params=(run_id,))
        # Add run type indicator as the first column.
        df.insert(0, 'evaluation_type', 'Generic' if is_generic else 'Natural Questions')
        return df

    def get_question_type_breakdown(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze performance by question type (only for generic evaluation)."""
        if not self.has_generic_columns:
            # Old schema: the question_type column does not exist.
            return pd.DataFrame()
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                question_type,
                pipeline_name,
                COUNT(*) as total_queries,
                SUM(answer_found) as successful,
                ROUND(AVG(answer_found) * 100, 2) as success_rate_pct,
                ROUND(AVG(total_time_ms), 2) as avg_time_ms,
                ROUND(AVG(total_cost_usd), 6) as avg_cost_usd
            FROM evaluation_results
            WHERE run_id = ? AND question_type IS NOT NULL
            GROUP BY question_type, pipeline_name
            ORDER BY question_type, success_rate_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_category_performance(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze performance by question category (only for generic evaluation)."""
        if not self.has_generic_columns:
            return pd.DataFrame()
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                question_category,
                pipeline_name,
                COUNT(*) as total_queries,
                SUM(answer_found) as successful,
                ROUND(AVG(answer_found) * 100, 2) as success_rate_pct,
                ROUND(AVG(total_time_ms), 2) as avg_time_ms,
                ROUND(AVG(total_cost_usd), 6) as avg_cost_usd
            FROM evaluation_results
            WHERE run_id = ? AND question_category IS NOT NULL
            GROUP BY question_category, pipeline_name
            ORDER BY question_category, success_rate_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_difficulty_analysis(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze performance by difficulty level (easy/medium/hard)."""
        if not self.has_generic_columns:
            return pd.DataFrame()
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                question_difficulty,
                pipeline_name,
                COUNT(*) as total_queries,
                SUM(answer_found) as successful,
                ROUND(AVG(answer_found) * 100, 2) as success_rate_pct,
                ROUND(AVG(total_time_ms), 2) as avg_time_ms
            FROM evaluation_results
            WHERE run_id = ? AND question_difficulty IS NOT NULL
            GROUP BY question_difficulty, pipeline_name
            ORDER BY
                CASE question_difficulty
                    WHEN 'easy' THEN 1
                    WHEN 'medium' THEN 2
                    WHEN 'hard' THEN 3
                END,
                success_rate_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_question_comparison(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Compare how each pipeline answered each question."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                query,
                pipeline_name,
                generated_answer,
                answer_found,
                total_time_ms,
                total_cost_usd,
                num_chunks_retrieved
            FROM evaluation_results
            WHERE run_id = ?
            ORDER BY query, pipeline_name
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_difficult_questions(self, run_id: Optional[str] = None, max_accuracy: float = 0.33) -> pd.DataFrame:
        """Find questions that most pipelines got wrong.

        Args:
            run_id: Run to inspect (latest when None).
            max_accuracy: Keep questions whose accuracy fraction is <= this.
        """
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                query,
                ground_truth_answers,
                COUNT(*) as total_attempts,
                SUM(answer_found) as correct_count,
                ROUND(AVG(answer_found) * 100, 2) as accuracy_pct,
                GROUP_CONCAT(CASE WHEN answer_found = 0 THEN pipeline_name END) as failed_pipelines
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY query
            HAVING accuracy_pct <= ?
            ORDER BY accuracy_pct ASC, total_attempts DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id, max_accuracy * 100))

    def get_easy_questions(self, run_id: Optional[str] = None, min_accuracy: float = 0.66) -> pd.DataFrame:
        """Find questions that most pipelines got right.

        Args:
            run_id: Run to inspect (latest when None).
            min_accuracy: Keep questions whose accuracy fraction is >= this.
        """
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                query,
                ground_truth_answers,
                COUNT(*) as total_attempts,
                SUM(answer_found) as correct_count,
                ROUND(AVG(answer_found) * 100, 2) as accuracy_pct,
                GROUP_CONCAT(CASE WHEN answer_found = 1 THEN pipeline_name END) as successful_pipelines
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY query
            HAVING accuracy_pct >= ?
            ORDER BY accuracy_pct DESC, total_attempts DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id, min_accuracy * 100))

    def get_cost_efficiency(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Calculate cost per correct answer per pipeline."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                pipeline_name,
                SUM(answer_found) as correct_answers,
                SUM(total_cost_usd) as total_cost,
                CASE
                    WHEN SUM(answer_found) > 0
                    THEN ROUND(SUM(total_cost_usd) / SUM(answer_found), 6)
                    ELSE NULL
                END as cost_per_correct_answer,
                CASE
                    WHEN SUM(total_cost_usd) > 0
                    THEN ROUND((SUM(answer_found) * 100.0) / SUM(total_cost_usd), 2)
                    ELSE NULL
                END as accuracy_points_per_dollar
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY cost_per_correct_answer ASC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_retrieval_quality(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze retrieval effectiveness (chunk counts, reranking, timing)."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                pipeline_name,
                ROUND(AVG(num_chunks_retrieved), 1) as avg_chunks,
                ROUND(AVG(CASE WHEN answer_found = 1 THEN num_chunks_retrieved END), 1) as avg_chunks_when_correct,
                ROUND(AVG(CASE WHEN answer_found = 0 THEN num_chunks_retrieved END), 1) as avg_chunks_when_wrong,
                ROUND(AVG(retrieval_time_ms), 2) as avg_retrieval_ms,
                ROUND(AVG(CASE WHEN reranked = 1 THEN reranking_time_ms END), 2) as avg_reranking_ms,
                SUM(reranked) as reranked_count,
                ROUND(AVG(answer_found) * 100, 2) as accuracy_pct
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY accuracy_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_time_breakdown(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze time spent in the retrieval vs. generation stages."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                pipeline_name,
                ROUND(AVG(retrieval_time_ms), 2) as avg_retrieval_ms,
                ROUND(AVG(generation_time_ms), 2) as avg_generation_ms,
                ROUND(AVG(total_time_ms), 2) as avg_total_ms,
                ROUND(AVG(retrieval_time_ms) * 100.0 / NULLIF(AVG(total_time_ms), 0), 1) as retrieval_pct,
                ROUND(AVG(generation_time_ms) * 100.0 / NULLIF(AVG(total_time_ms), 0), 1) as generation_pct
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY avg_total_ms ASC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_token_usage(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze token consumption per pipeline."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                pipeline_name,
                ROUND(AVG(prompt_tokens), 1) as avg_prompt_tokens,
                ROUND(AVG(completion_tokens), 1) as avg_completion_tokens,
                ROUND(AVG(total_tokens), 1) as avg_total_tokens,
                SUM(total_tokens) as total_tokens_used,
                ROUND(AVG(total_cost_usd), 6) as avg_cost_usd
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY total_tokens_used DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def compare_runs(self, run_ids: List[str]) -> pd.DataFrame:
        """Compare headline metrics across multiple runs."""
        # Only the number of placeholders is interpolated; values go via params.
        placeholders = ','.join('?' for _ in run_ids)
        query = f"""
            SELECT
                run_id,
                pipeline_name,
                ROUND(AVG(answer_found) * 100, 2) as accuracy_pct,
                ROUND(AVG(total_time_ms), 2) as avg_time_ms,
                ROUND(SUM(total_cost_usd), 6) as total_cost_usd
            FROM evaluation_results
            WHERE run_id IN ({placeholders})
            GROUP BY run_id, pipeline_name
            ORDER BY run_id DESC, accuracy_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=list(run_ids))

    def get_answer_examples(self, run_id: Optional[str] = None, pipeline: Optional[str] = None,
                            correct_only: bool = False, limit: int = 5) -> pd.DataFrame:
        """Get example answers, optionally filtered by pipeline and correctness."""
        run_id = self._resolve_run_id(run_id)
        # Build the WHERE clause from fixed fragments; all values are bound
        # parameters so quotes in run/pipeline names cannot break the SQL.
        where_clauses = ["run_id = ?"]
        params: list = [run_id]
        if pipeline:
            where_clauses.append("pipeline_name = ?")
            params.append(pipeline)
        if correct_only:
            where_clauses.append("answer_found = 1")
        params.append(limit)
        query = f"""
            SELECT
                query,
                ground_truth_answers,
                generated_answer,
                answer_found,
                pipeline_name
            FROM evaluation_results
            WHERE {" AND ".join(where_clauses)}
            LIMIT ?
        """
        return pd.read_sql_query(query, self.conn, params=params)

    def export_detailed_results(self, run_id: Optional[str] = None, output_file: str = "analysis_results.xlsx"):
        """Export a comprehensive analysis to an Excel workbook, one sheet per view."""
        run_id = self._resolve_run_id(run_id)
        is_generic = self.is_generic_run(run_id)
        with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
            # Sheets shared by both evaluation types.
            self.get_run_summary(run_id).to_excel(writer, sheet_name='Summary', index=False)
            self.get_cost_efficiency(run_id).to_excel(writer, sheet_name='Cost Efficiency', index=False)
            self.get_time_breakdown(run_id).to_excel(writer, sheet_name='Time Breakdown', index=False)
            self.get_token_usage(run_id).to_excel(writer, sheet_name='Token Usage', index=False)
            self.get_retrieval_quality(run_id).to_excel(writer, sheet_name='Retrieval Quality', index=False)
            if is_generic and self.has_generic_columns:
                # Generic-only breakdowns; skip empty frames to avoid blank sheets.
                qtype = self.get_question_type_breakdown(run_id)
                if not qtype.empty:
                    qtype.to_excel(writer, sheet_name='Question Types', index=False)
                category = self.get_category_performance(run_id)
                if not category.empty:
                    category.to_excel(writer, sheet_name='Categories', index=False)
                difficulty = self.get_difficulty_analysis(run_id)
                if not difficulty.empty:
                    difficulty.to_excel(writer, sheet_name='Difficulty', index=False)
            else:
                # Natural Questions only: per-question views.
                self.get_difficult_questions(run_id).to_excel(writer, sheet_name='Difficult Questions', index=False)
                self.get_easy_questions(run_id).to_excel(writer, sheet_name='Easy Questions', index=False)
                self.get_question_comparison(run_id).to_excel(writer, sheet_name='Question Comparison', index=False)
        print(f"✅ Detailed analysis exported to: {output_file}")

    def print_dashboard(self, run_id: Optional[str] = None):
        """Print a comprehensive analysis dashboard to the console."""
        run_id = self._resolve_run_id(run_id)
        is_generic = self.is_generic_run(run_id)
        eval_type = "Generic Questions" if is_generic else "Natural Questions"
        print("\n" + "=" * 100)
        print(f"RAG PIPELINE EVALUATION DASHBOARD - Run ID: {run_id}")
        print(f"Evaluation Type: {eval_type}")
        print("=" * 100)

        # Summary
        print("\n📊 PIPELINE PERFORMANCE SUMMARY")
        print("-" * 100)
        print(self.get_run_summary(run_id).to_string(index=False))

        # Cost Efficiency
        print("\n💰 COST EFFICIENCY ANALYSIS")
        print("-" * 100)
        print(self.get_cost_efficiency(run_id).to_string(index=False))

        # Time Breakdown
        print("\n⏱️ TIME BREAKDOWN BY STAGE")
        print("-" * 100)
        print(self.get_time_breakdown(run_id).to_string(index=False))

        # Token Usage
        print("\n🎫 TOKEN USAGE ANALYSIS")
        print("-" * 100)
        print(self.get_token_usage(run_id).to_string(index=False))

        # Retrieval Quality
        print("\n🔍 RETRIEVAL QUALITY METRICS")
        print("-" * 100)
        print(self.get_retrieval_quality(run_id).to_string(index=False))

        if is_generic and self.has_generic_columns:
            # Question Type Breakdown
            print("\n📝 PERFORMANCE BY QUESTION TYPE")
            print("-" * 100)
            qtype = self.get_question_type_breakdown(run_id)
            if not qtype.empty:
                pivot = qtype.pivot_table(
                    index='question_type',
                    columns='pipeline_name',
                    values='success_rate_pct'
                )
                print(pivot.to_string())
            else:
                print("No question type data available")

            # Category Performance, limited to the 10 busiest categories.
            print("\n🏷️ PERFORMANCE BY CATEGORY (Top 10)")
            print("-" * 100)
            category = self.get_category_performance(run_id)
            if not category.empty:
                top_cats = category.groupby('question_category')['total_queries'].sum().nlargest(10).index
                cat_filtered = category[category['question_category'].isin(top_cats)]
                pivot_cat = cat_filtered.pivot_table(
                    index='question_category',
                    columns='pipeline_name',
                    values='success_rate_pct'
                )
                print(pivot_cat.to_string())
            else:
                print("No category data available")

            # Difficulty Analysis
            print("\n🎯 PERFORMANCE BY DIFFICULTY")
            print("-" * 100)
            difficulty = self.get_difficulty_analysis(run_id)
            if not difficulty.empty:
                pivot_diff = difficulty.pivot_table(
                    index='question_difficulty',
                    columns='pipeline_name',
                    values='success_rate_pct'
                )
                print(pivot_diff.to_string())
            else:
                print("No difficulty data available")
        else:
            # Natural Questions: highlight the easiest and hardest questions.
            print("\n✅ EASIEST QUESTIONS (≥66% accuracy)")
            print("-" * 100)
            easy = self.get_easy_questions(run_id, min_accuracy=0.66)
            if len(easy) > 0:
                print(easy[['query', 'accuracy_pct', 'correct_count', 'total_attempts']].head(3).to_string(index=False))
            else:
                print("No questions with ≥66% accuracy found")

            print("\n❌ MOST DIFFICULT QUESTIONS (≤33% accuracy)")
            print("-" * 100)
            difficult = self.get_difficult_questions(run_id, max_accuracy=0.33)
            if len(difficult) > 0:
                print(difficult[['query', 'accuracy_pct', 'correct_count', 'total_attempts']].head(3).to_string(index=False))
            else:
                print("No particularly difficult questions found!")

        print("\n" + "=" * 100)
        print("💡 TIP: Export detailed analysis with: python scripts/analyze_results.py --export results.xlsx")
        print("=" * 100 + "\n")

    def close(self) -> None:
        """Close the database connection (idempotent)."""
        if getattr(self, 'conn', None) is not None:
            self.conn.close()
            self.conn = None

    def __del__(self):
        """Best-effort cleanup when the analyzer is garbage-collected."""
        self.close()
def main():
    """CLI entry point: parse arguments and dispatch to the requested analysis."""
    parser = argparse.ArgumentParser(
        description="Analyze RAG pipeline evaluation results",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Analyze latest run
  python %(prog)s

  # Analyze specific run
  python %(prog)s --run-id 20260123_143000

  # List all runs
  python %(prog)s --list-runs

  # Export to Excel
  python %(prog)s --export results.xlsx

  # Compare multiple runs
  python %(prog)s --compare 20260123_143000 20260123_150000

Works with both:
  - Natural Questions evaluation (with ground truth)
  - Generic evaluation (operational metrics only)
"""
    )
    parser.add_argument("--run-id", type=str, help="Specific run ID to analyze (default: latest)")
    parser.add_argument("--list-runs", action="store_true", help="List all available runs")
    parser.add_argument("--export", type=str, help="Export detailed results to Excel file")
    parser.add_argument("--compare", nargs='+', help="Compare multiple run IDs")
    parser.add_argument("--examples", type=str, help="Show answer examples from specific pipeline")
    args = parser.parse_args()

    analyzer = RAGAnalyzer()

    if args.list_runs:
        print("\n📋 Available Evaluation Runs:")
        print("-" * 100)
        print(analyzer.get_all_runs().to_string(index=False))
        return

    if args.compare:
        print("\n📊 Comparing Multiple Runs:")
        print("-" * 100)
        print(analyzer.compare_runs(args.compare).to_string(index=False))
        return

    if args.export:
        analyzer.export_detailed_results(args.run_id, args.export)
        return

    if args.examples:
        print(f"\n📝 Example Answers from {args.examples}:")
        print("-" * 100)
        examples = analyzer.get_answer_examples(args.run_id, pipeline=args.examples, limit=5)
        for _, row in examples.iterrows():
            print(f"\nQ: {row['query']}")
            # Generic runs store NULL/empty ground truth; guard json.loads
            # against None so --examples works for both run types.
            gt = json.loads(row['ground_truth_answers'] or '[]')
            if gt:
                print(f"Ground Truth: {gt}")
            print(f"Generated: {row['generated_answer'][:200]}...")
            print(f"Correct: {'✅' if row['answer_found'] else '❌'}")
            print("-" * 50)
        return

    # Default action: print the full dashboard for the chosen (or latest) run.
    analyzer.print_dashboard(args.run_id)


if __name__ == "__main__":
    main()