# RAG-Pipeline-Optimizer / scripts/analyze_results.py
# Origin: uploaded by puji4ml ("Upload 30 files", commit 2b22a59, verified)
"""
scripts/analyze_results.py - Advanced RAG Pipeline Analysis Tool
Analyzes evaluation results from SQLite database with detailed metrics
Works with both Natural Questions and Generic evaluation results
"""
import sqlite3
import pandas as pd
import argparse
from pathlib import Path
from typing import Dict, List, Optional
from collections import defaultdict
import json
class RAGAnalyzer:
    """Analyze and visualize RAG pipeline evaluation results.

    Reads the ``evaluation_results`` table of the evaluator's SQLite
    database and exposes per-pipeline summaries plus cost, latency, token
    and retrieval breakdowns.  Works with both Natural Questions runs
    (with ground truth) and generic runs (operational metrics only).
    """

    # Shared query used to resolve "the most recent run".
    _LATEST_RUN_SQL = (
        "SELECT run_id FROM evaluation_results ORDER BY timestamp DESC LIMIT 1"
    )

    def __init__(self, db_path: str = "data/evaluation_results.db"):
        """Open the database at *db_path* and detect the schema variant."""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self._check_schema()

    def _check_schema(self) -> None:
        """Check if database has the extra columns written by generic evaluation."""
        cursor = self.conn.cursor()
        cursor.execute("PRAGMA table_info(evaluation_results)")
        columns = [row[1] for row in cursor.fetchall()]
        # 'question_type' only exists in generic-evaluation databases; its
        # presence gates the type/category/difficulty breakdowns below.
        self.has_generic_columns = 'question_type' in columns

    def _resolve_run_id(self, run_id: Optional[str]) -> str:
        """Return *run_id* unchanged, or the most recent run's id when None.

        Raises:
            ValueError: if the database contains no evaluation results.
        """
        if run_id is not None:
            return run_id
        latest = pd.read_sql_query(self._LATEST_RUN_SQL, self.conn)
        if latest.empty:
            raise ValueError("No evaluation runs found in the database")
        return latest.iloc[0]['run_id']

    def is_generic_run(self, run_id: str) -> bool:
        """Check if a run is generic (no ground truth) or Natural Questions."""
        cursor = self.conn.cursor()
        cursor.execute(
            """
            SELECT ground_truth_answers
            FROM evaluation_results
            WHERE run_id = ?
            LIMIT 1
            """,
            (run_id,),
        )
        result = cursor.fetchone()
        if result is None:
            # Unknown run id: keep the original "not generic" default.
            return False
        gt_json = result[0]
        if not gt_json:
            # NULL / empty string means no ground truth was recorded.
            return True
        return len(json.loads(gt_json)) == 0  # empty list = generic run

    def get_all_runs(self) -> pd.DataFrame:
        """Get list of all evaluation runs with basic volume statistics."""
        query = """
            SELECT DISTINCT run_id, timestamp,
                   COUNT(*) as total_evaluations,
                   COUNT(DISTINCT question_id) as unique_questions,
                   COUNT(DISTINCT pipeline_name) as pipelines_tested
            FROM evaluation_results
            GROUP BY run_id
            ORDER BY timestamp DESC
        """
        return pd.read_sql_query(query, self.conn)

    def get_run_summary(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Get summary statistics for a specific run or the latest run."""
        run_id = self._resolve_run_id(run_id)
        is_generic = self.is_generic_run(run_id)
        # Without ground truth, answer_found means "an answer was produced",
        # not "the answer was correct" -- label the column accordingly.
        accuracy_label = "success_rate_pct" if is_generic else "accuracy_pct"
        # accuracy_label is one of two internal constants, so interpolating
        # it as a column identifier is safe; run_id is bound as a parameter.
        query = f"""
            SELECT
                pipeline_name,
                COUNT(*) as total_queries,
                SUM(answer_found) as correct_answers,
                ROUND(AVG(answer_found) * 100, 2) as {accuracy_label},
                ROUND(AVG(total_time_ms), 2) as avg_time_ms,
                ROUND(AVG(retrieval_time_ms), 2) as avg_retrieval_ms,
                ROUND(AVG(generation_time_ms), 2) as avg_generation_ms,
                ROUND(AVG(total_cost_usd), 6) as avg_cost_usd,
                ROUND(SUM(total_cost_usd), 6) as total_cost_usd,
                ROUND(AVG(num_chunks_retrieved), 1) as avg_chunks,
                ROUND(AVG(LENGTH(generated_answer)), 1) as avg_answer_length
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY {accuracy_label} DESC
        """
        df = pd.read_sql_query(query, self.conn, params=(run_id,))
        # Prepend a column indicating how this run should be interpreted.
        df.insert(0, 'evaluation_type', 'Generic' if is_generic else 'Natural Questions')
        return df

    def get_question_type_breakdown(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze performance by question type (only for generic evaluation).

        Returns an empty DataFrame when the database predates the generic
        evaluation schema.
        """
        run_id = self._resolve_run_id(run_id)
        if not self.has_generic_columns:
            return pd.DataFrame()
        query = """
            SELECT
                question_type,
                pipeline_name,
                COUNT(*) as total_queries,
                SUM(answer_found) as successful,
                ROUND(AVG(answer_found) * 100, 2) as success_rate_pct,
                ROUND(AVG(total_time_ms), 2) as avg_time_ms,
                ROUND(AVG(total_cost_usd), 6) as avg_cost_usd
            FROM evaluation_results
            WHERE run_id = ? AND question_type IS NOT NULL
            GROUP BY question_type, pipeline_name
            ORDER BY question_type, success_rate_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_category_performance(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze performance by question category (only for generic evaluation)."""
        run_id = self._resolve_run_id(run_id)
        if not self.has_generic_columns:
            return pd.DataFrame()
        query = """
            SELECT
                question_category,
                pipeline_name,
                COUNT(*) as total_queries,
                SUM(answer_found) as successful,
                ROUND(AVG(answer_found) * 100, 2) as success_rate_pct,
                ROUND(AVG(total_time_ms), 2) as avg_time_ms,
                ROUND(AVG(total_cost_usd), 6) as avg_cost_usd
            FROM evaluation_results
            WHERE run_id = ? AND question_category IS NOT NULL
            GROUP BY question_category, pipeline_name
            ORDER BY question_category, success_rate_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_difficulty_analysis(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze performance by difficulty level (only for generic evaluation)."""
        run_id = self._resolve_run_id(run_id)
        if not self.has_generic_columns:
            return pd.DataFrame()
        query = """
            SELECT
                question_difficulty,
                pipeline_name,
                COUNT(*) as total_queries,
                SUM(answer_found) as successful,
                ROUND(AVG(answer_found) * 100, 2) as success_rate_pct,
                ROUND(AVG(total_time_ms), 2) as avg_time_ms
            FROM evaluation_results
            WHERE run_id = ? AND question_difficulty IS NOT NULL
            GROUP BY question_difficulty, pipeline_name
            ORDER BY
                CASE question_difficulty
                    WHEN 'easy' THEN 1
                    WHEN 'medium' THEN 2
                    WHEN 'hard' THEN 3
                END,
                success_rate_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_question_comparison(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Compare how each pipeline answered each question."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                query,
                pipeline_name,
                generated_answer,
                answer_found,
                total_time_ms,
                total_cost_usd,
                num_chunks_retrieved
            FROM evaluation_results
            WHERE run_id = ?
            ORDER BY query, pipeline_name
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_difficult_questions(self, run_id: Optional[str] = None,
                                max_accuracy: float = 0.33) -> pd.DataFrame:
        """Find questions that most pipelines got wrong.

        Args:
            run_id: Run to inspect (default: latest run).
            max_accuracy: Fraction (0-1); keep questions at or below it.
        """
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                query,
                ground_truth_answers,
                COUNT(*) as total_attempts,
                SUM(answer_found) as correct_count,
                ROUND(AVG(answer_found) * 100, 2) as accuracy_pct,
                GROUP_CONCAT(CASE WHEN answer_found = 0 THEN pipeline_name END) as failed_pipelines
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY query
            HAVING accuracy_pct <= ?
            ORDER BY accuracy_pct ASC, total_attempts DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id, max_accuracy * 100))

    def get_easy_questions(self, run_id: Optional[str] = None,
                           min_accuracy: float = 0.66) -> pd.DataFrame:
        """Find questions that most pipelines got right.

        Args:
            run_id: Run to inspect (default: latest run).
            min_accuracy: Fraction (0-1); keep questions at or above it.
        """
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                query,
                ground_truth_answers,
                COUNT(*) as total_attempts,
                SUM(answer_found) as correct_count,
                ROUND(AVG(answer_found) * 100, 2) as accuracy_pct,
                GROUP_CONCAT(CASE WHEN answer_found = 1 THEN pipeline_name END) as successful_pipelines
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY query
            HAVING accuracy_pct >= ?
            ORDER BY accuracy_pct DESC, total_attempts DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id, min_accuracy * 100))

    def get_cost_efficiency(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Calculate cost per correct answer for each pipeline."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                pipeline_name,
                SUM(answer_found) as correct_answers,
                SUM(total_cost_usd) as total_cost,
                CASE
                    WHEN SUM(answer_found) > 0
                    THEN ROUND(SUM(total_cost_usd) / SUM(answer_found), 6)
                    ELSE NULL
                END as cost_per_correct_answer,
                CASE
                    WHEN SUM(total_cost_usd) > 0
                    THEN ROUND((SUM(answer_found) * 100.0) / SUM(total_cost_usd), 2)
                    ELSE NULL
                END as accuracy_points_per_dollar
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY cost_per_correct_answer ASC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_retrieval_quality(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze retrieval effectiveness (chunks, timing, reranking)."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                pipeline_name,
                ROUND(AVG(num_chunks_retrieved), 1) as avg_chunks,
                ROUND(AVG(CASE WHEN answer_found = 1 THEN num_chunks_retrieved END), 1) as avg_chunks_when_correct,
                ROUND(AVG(CASE WHEN answer_found = 0 THEN num_chunks_retrieved END), 1) as avg_chunks_when_wrong,
                ROUND(AVG(retrieval_time_ms), 2) as avg_retrieval_ms,
                ROUND(AVG(CASE WHEN reranked = 1 THEN reranking_time_ms END), 2) as avg_reranking_ms,
                SUM(reranked) as reranked_count,
                ROUND(AVG(answer_found) * 100, 2) as accuracy_pct
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY accuracy_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_time_breakdown(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze time spent in retrieval vs. generation per pipeline."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                pipeline_name,
                ROUND(AVG(retrieval_time_ms), 2) as avg_retrieval_ms,
                ROUND(AVG(generation_time_ms), 2) as avg_generation_ms,
                ROUND(AVG(total_time_ms), 2) as avg_total_ms,
                ROUND(AVG(retrieval_time_ms) * 100.0 / NULLIF(AVG(total_time_ms), 0), 1) as retrieval_pct,
                ROUND(AVG(generation_time_ms) * 100.0 / NULLIF(AVG(total_time_ms), 0), 1) as generation_pct
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY avg_total_ms ASC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def get_token_usage(self, run_id: Optional[str] = None) -> pd.DataFrame:
        """Analyze token consumption and resulting cost per pipeline."""
        run_id = self._resolve_run_id(run_id)
        query = """
            SELECT
                pipeline_name,
                ROUND(AVG(prompt_tokens), 1) as avg_prompt_tokens,
                ROUND(AVG(completion_tokens), 1) as avg_completion_tokens,
                ROUND(AVG(total_tokens), 1) as avg_total_tokens,
                SUM(total_tokens) as total_tokens_used,
                ROUND(AVG(total_cost_usd), 6) as avg_cost_usd
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_name
            ORDER BY total_tokens_used DESC
        """
        return pd.read_sql_query(query, self.conn, params=(run_id,))

    def compare_runs(self, run_ids: List[str]) -> pd.DataFrame:
        """Compare headline metrics across multiple runs."""
        placeholders = ','.join(['?' for _ in run_ids])
        query = f"""
            SELECT
                run_id,
                pipeline_name,
                ROUND(AVG(answer_found) * 100, 2) as accuracy_pct,
                ROUND(AVG(total_time_ms), 2) as avg_time_ms,
                ROUND(SUM(total_cost_usd), 6) as total_cost_usd
            FROM evaluation_results
            WHERE run_id IN ({placeholders})
            GROUP BY run_id, pipeline_name
            ORDER BY run_id DESC, accuracy_pct DESC
        """
        return pd.read_sql_query(query, self.conn, params=run_ids)

    def get_answer_examples(self, run_id: Optional[str] = None,
                            pipeline: Optional[str] = None,
                            correct_only: bool = False, limit: int = 5) -> pd.DataFrame:
        """Get example answers, optionally filtered by pipeline / correctness."""
        run_id = self._resolve_run_id(run_id)
        conditions = ["run_id = ?"]
        params: list = [run_id]
        if pipeline:
            conditions.append("pipeline_name = ?")
            params.append(pipeline)
        if correct_only:
            conditions.append("answer_found = 1")
        params.append(int(limit))
        query = f"""
            SELECT
                query,
                ground_truth_answers,
                generated_answer,
                answer_found,
                pipeline_name
            FROM evaluation_results
            WHERE {' AND '.join(conditions)}
            LIMIT ?
        """
        return pd.read_sql_query(query, self.conn, params=params)

    def export_detailed_results(self, run_id: Optional[str] = None,
                                output_file: str = "analysis_results.xlsx"):
        """Export comprehensive analysis to Excel with multiple sheets."""
        run_id = self._resolve_run_id(run_id)
        is_generic = self.is_generic_run(run_id)
        with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
            # Sheets common to both evaluation types.
            self.get_run_summary(run_id).to_excel(writer, sheet_name='Summary', index=False)
            self.get_cost_efficiency(run_id).to_excel(writer, sheet_name='Cost Efficiency', index=False)
            self.get_time_breakdown(run_id).to_excel(writer, sheet_name='Time Breakdown', index=False)
            self.get_token_usage(run_id).to_excel(writer, sheet_name='Token Usage', index=False)
            self.get_retrieval_quality(run_id).to_excel(writer, sheet_name='Retrieval Quality', index=False)
            if is_generic and self.has_generic_columns:
                # Generic runs: break results down by question metadata.
                qtype = self.get_question_type_breakdown(run_id)
                if not qtype.empty:
                    qtype.to_excel(writer, sheet_name='Question Types', index=False)
                category = self.get_category_performance(run_id)
                if not category.empty:
                    category.to_excel(writer, sheet_name='Categories', index=False)
                difficulty = self.get_difficulty_analysis(run_id)
                if not difficulty.empty:
                    difficulty.to_excel(writer, sheet_name='Difficulty', index=False)
            else:
                # Natural Questions runs: ground truth enables accuracy views.
                self.get_difficult_questions(run_id).to_excel(writer, sheet_name='Difficult Questions', index=False)
                self.get_easy_questions(run_id).to_excel(writer, sheet_name='Easy Questions', index=False)
                self.get_question_comparison(run_id).to_excel(writer, sheet_name='Question Comparison', index=False)
        print(f"✅ Detailed analysis exported to: {output_file}")

    def print_dashboard(self, run_id: Optional[str] = None):
        """Print a comprehensive dashboard for one run to the console."""
        run_id = self._resolve_run_id(run_id)
        is_generic = self.is_generic_run(run_id)
        eval_type = "Generic Questions" if is_generic else "Natural Questions"
        print("\n" + "=" * 100)
        print(f"RAG PIPELINE EVALUATION DASHBOARD - Run ID: {run_id}")
        print(f"Evaluation Type: {eval_type}")
        print("=" * 100)
        # Summary
        print("\n📊 PIPELINE PERFORMANCE SUMMARY")
        print("-" * 100)
        print(self.get_run_summary(run_id).to_string(index=False))
        # Cost Efficiency
        print("\n💰 COST EFFICIENCY ANALYSIS")
        print("-" * 100)
        print(self.get_cost_efficiency(run_id).to_string(index=False))
        # Time Breakdown
        print("\n⏱️ TIME BREAKDOWN BY STAGE")
        print("-" * 100)
        print(self.get_time_breakdown(run_id).to_string(index=False))
        # Token Usage
        print("\n🎫 TOKEN USAGE ANALYSIS")
        print("-" * 100)
        print(self.get_token_usage(run_id).to_string(index=False))
        # Retrieval Quality
        print("\n🔍 RETRIEVAL QUALITY METRICS")
        print("-" * 100)
        print(self.get_retrieval_quality(run_id).to_string(index=False))
        if is_generic and self.has_generic_columns:
            # Question Type Breakdown
            print("\n📝 PERFORMANCE BY QUESTION TYPE")
            print("-" * 100)
            qtype = self.get_question_type_breakdown(run_id)
            if not qtype.empty:
                pivot = qtype.pivot_table(
                    index='question_type',
                    columns='pipeline_name',
                    values='success_rate_pct'
                )
                print(pivot.to_string())
            else:
                print("No question type data available")
            # Category Performance (only the 10 most-asked categories)
            print("\n🏷️ PERFORMANCE BY CATEGORY (Top 10)")
            print("-" * 100)
            category = self.get_category_performance(run_id)
            if not category.empty:
                top_cats = category.groupby('question_category')['total_queries'].sum().nlargest(10).index
                cat_filtered = category[category['question_category'].isin(top_cats)]
                pivot_cat = cat_filtered.pivot_table(
                    index='question_category',
                    columns='pipeline_name',
                    values='success_rate_pct'
                )
                print(pivot_cat.to_string())
            else:
                print("No category data available")
            # Difficulty Analysis
            print("\n🎯 PERFORMANCE BY DIFFICULTY")
            print("-" * 100)
            difficulty = self.get_difficulty_analysis(run_id)
            if not difficulty.empty:
                pivot_diff = difficulty.pivot_table(
                    index='question_difficulty',
                    columns='pipeline_name',
                    values='success_rate_pct'
                )
                print(pivot_diff.to_string())
            else:
                print("No difficulty data available")
        else:
            # Natural Questions: show extremes of per-question accuracy.
            print("\n✅ EASIEST QUESTIONS (≥66% accuracy)")
            print("-" * 100)
            easy = self.get_easy_questions(run_id, min_accuracy=0.66)
            if len(easy) > 0:
                print(easy[['query', 'accuracy_pct', 'correct_count', 'total_attempts']].head(3).to_string(index=False))
            else:
                print("No questions with ≥66% accuracy found")
            print("\n❌ MOST DIFFICULT QUESTIONS (≤33% accuracy)")
            print("-" * 100)
            difficult = self.get_difficult_questions(run_id, max_accuracy=0.33)
            if len(difficult) > 0:
                print(difficult[['query', 'accuracy_pct', 'correct_count', 'total_attempts']].head(3).to_string(index=False))
            else:
                print("No particularly difficult questions found!")
        print("\n" + "=" * 100)
        print("💡 TIP: Export detailed analysis with: python scripts/analyze_results.py --export results.xlsx")
        print("=" * 100 + "\n")

    def close(self) -> None:
        """Close the underlying database connection (safe to call twice)."""
        if hasattr(self, 'conn'):
            self.conn.close()

    def __del__(self):
        """Best-effort close when the analyzer is garbage-collected."""
        self.close()
def main():
    """Command-line entry point: parse arguments and dispatch to the analyzer.

    Modes (mutually exclusive, first match wins): --list-runs, --compare,
    --export, --examples; with no mode flag the console dashboard is printed.
    """
    parser = argparse.ArgumentParser(
        description="Analyze RAG pipeline evaluation results",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Analyze latest run
  python %(prog)s

  # Analyze specific run
  python %(prog)s --run-id 20260123_143000

  # List all runs
  python %(prog)s --list-runs

  # Export to Excel
  python %(prog)s --export results.xlsx

  # Compare multiple runs
  python %(prog)s --compare 20260123_143000 20260123_150000

Works with both:
  • Natural Questions evaluation (with ground truth)
  • Generic evaluation (operational metrics only)
"""
    )
    parser.add_argument("--run-id", type=str, help="Specific run ID to analyze (default: latest)")
    parser.add_argument("--list-runs", action="store_true", help="List all available runs")
    parser.add_argument("--export", type=str, help="Export detailed results to Excel file")
    parser.add_argument("--compare", nargs='+', help="Compare multiple run IDs")
    parser.add_argument("--examples", type=str, help="Show answer examples from specific pipeline")
    args = parser.parse_args()

    analyzer = RAGAnalyzer()

    if args.list_runs:
        print("\n📋 Available Evaluation Runs:")
        print("-" * 100)
        runs = analyzer.get_all_runs()
        print(runs.to_string(index=False))
        return

    if args.compare:
        print("\n📊 Comparing Multiple Runs:")
        print("-" * 100)
        comparison = analyzer.compare_runs(args.compare)
        print(comparison.to_string(index=False))
        return

    if args.export:
        analyzer.export_detailed_results(args.run_id, args.export)
        return

    if args.examples:
        print(f"\n📝 Example Answers from {args.examples}:")
        print("-" * 100)
        examples = analyzer.get_answer_examples(args.run_id, pipeline=args.examples, limit=5)
        for idx, row in examples.iterrows():
            print(f"\nQ: {row['query']}")
            # Generic runs may store NULL / empty ground truth -- guard the parse.
            gt = json.loads(row['ground_truth_answers']) if row['ground_truth_answers'] else []
            if gt:
                print(f"Ground Truth: {gt}")
            # generated_answer can be NULL when generation failed.
            answer = row['generated_answer'] or ""
            print(f"Generated: {answer[:200]}...")
            print(f"Correct: {'✅' if row['answer_found'] else '❌'}")
            print("-" * 50)
        return

    # Default: print dashboard for the requested (or latest) run.
    analyzer.print_dashboard(args.run_id)
# Run the CLI entry point only when executed as a script (not on import).
if __name__ == "__main__":
    main()