Spaces:
Sleeping
Sleeping
"""
scripts/run_generic_evaluation.py - Generic RAG pipeline evaluation with database storage.

Stores results in the same SQLite schema as the Natural Questions evaluation.
"""
| import sys | |
| import os | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Optional | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from dataclasses import dataclass | |
| import json | |
| import random | |
| import sqlite3 | |
| from datetime import datetime | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| from core.pipeline import RAGPipeline | |
| from config.pipeline_configs import ALL_PIPELINES | |
| from tqdm import tqdm | |
@dataclass
class GenericQuestion:
    """One question in a generic (ground-truth-free) evaluation set.

    ``@dataclass`` generates the keyword constructor that callers use
    (``GenericQuestion(id=..., question=..., ...)``); without it the class
    accepts no constructor arguments at all.
    """

    id: str  # identifier stored in the question_id column, e.g. "gen_0001"
    question: str  # query text sent to the pipeline
    expected_type: str  # factual, reasoning, multi-hop, comparison, etc.
    category: str  # history, science, geography, etc.
    difficulty: str  # easy, medium, hard
    ground_truth: Optional[List[str]] = None  # optional reference answers
class EvaluationDatabase:
    """SQLite store for evaluation results.

    Constructing an instance creates the parent directory of ``db_path`` when
    needed, opens a connection, and ensures the ``evaluation_results`` table
    and its indexes exist. Every :meth:`insert_result` call commits at once.
    """

    # Schema DDL, executed in order right after connecting. Every statement
    # uses IF NOT EXISTS, so reopening an existing database is harmless.
    _SCHEMA_DDL = (
        """
        CREATE TABLE IF NOT EXISTS evaluation_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            pipeline_id TEXT NOT NULL,
            pipeline_name TEXT NOT NULL,
            question_id TEXT NOT NULL,
            query TEXT NOT NULL,
            ground_truth_answers TEXT NOT NULL,
            retrieved_chunks TEXT NOT NULL,
            retrieval_scores TEXT NOT NULL,
            num_chunks_retrieved INTEGER,
            retrieval_time_ms REAL,
            reranking_time_ms REAL,
            reranked INTEGER,
            generated_answer TEXT NOT NULL,
            generation_time_ms REAL,
            prompt_tokens INTEGER,
            completion_tokens INTEGER,
            total_tokens INTEGER,
            generation_cost_usd REAL,
            total_cost_usd REAL,
            total_time_ms REAL,
            has_answer INTEGER,
            answer_found INTEGER,
            timestamp TEXT NOT NULL,
            question_type TEXT,
            question_category TEXT,
            question_difficulty TEXT
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_run_pipeline
        ON evaluation_results(run_id, pipeline_id)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_timestamp
        ON evaluation_results(timestamp)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_question_type
        ON evaluation_results(question_type)
        """,
    )

    # Keys every result dict must provide (read with result[key], so a missing
    # one raises KeyError). Order matches the INSERT column list below.
    _REQUIRED_FIELDS = (
        "run_id", "pipeline_id", "pipeline_name", "question_id",
        "query", "ground_truth_answers",
        "retrieved_chunks", "retrieval_scores", "num_chunks_retrieved",
        "retrieval_time_ms", "reranking_time_ms", "reranked",
        "generated_answer", "generation_time_ms",
        "prompt_tokens", "completion_tokens", "total_tokens",
        "generation_cost_usd", "total_cost_usd", "total_time_ms",
        "has_answer", "answer_found", "timestamp",
    )

    # Keys that may be absent (read with result.get); missing ones become NULL.
    _OPTIONAL_FIELDS = ("question_type", "question_category", "question_difficulty")

    def __init__(self, db_path: str = "./data/evaluation_results.db"):
        target = Path(db_path)
        self.db_path = target
        target.parent.mkdir(parents=True, exist_ok=True)
        self.conn = None
        self._init_database()

    def _init_database(self):
        """Open ``self.conn`` and create the table and indexes if missing."""
        self.conn = sqlite3.connect(str(self.db_path))
        cur = self.conn.cursor()
        for ddl in self._SCHEMA_DDL:
            cur.execute(ddl)
        self.conn.commit()

    def insert_result(self, result: Dict[str, Any]):
        """Insert one result row and commit immediately.

        Raises:
            KeyError: if ``result`` lacks any key listed in ``_REQUIRED_FIELDS``.
        """
        cur = self.conn.cursor()
        row = tuple(result[field] for field in self._REQUIRED_FIELDS)
        row += tuple(result.get(field) for field in self._OPTIONAL_FIELDS)
        cur.execute("""
            INSERT INTO evaluation_results (
                run_id, pipeline_id, pipeline_name, question_id,
                query, ground_truth_answers,
                retrieved_chunks, retrieval_scores, num_chunks_retrieved,
                retrieval_time_ms, reranking_time_ms, reranked,
                generated_answer, generation_time_ms,
                prompt_tokens, completion_tokens, total_tokens,
                generation_cost_usd, total_cost_usd, total_time_ms,
                has_answer, answer_found, timestamp,
                question_type, question_category, question_difficulty
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row)
        self.conn.commit()

    def close(self):
        """Close the SQLite connection if one has been opened."""
        connection = self.conn
        if connection:
            connection.close()
class SyntheticQuestionGenerator:
    """Build evaluation question sets from a fixed, hand-written question pool.

    The questions are not derived from the indexed corpus. They are drawn from
    ``QUESTION_POOL``, which spans general-knowledge topics (science and
    technology, history, geography, arts and literature, mathematics, sports,
    medicine, economics, philosophy, music).
    """

    # Each entry: (question, expected_type, category, difficulty).
    QUESTION_POOL = (
        # Science & Technology
        ("What is quantum computing?", "factual", "technology", "medium"),
        ("How does photosynthesis work?", "reasoning", "biology", "medium"),
        ("What are the main differences between RNA and DNA?", "comparison", "biology", "hard"),
        ("Who invented the World Wide Web?", "factual", "technology", "easy"),
        ("What is artificial intelligence?", "factual", "technology", "easy"),
        ("How does a computer process information?", "reasoning", "technology", "hard"),
        ("What is machine learning?", "factual", "technology", "medium"),
        ("Compare classical computing and quantum computing.", "comparison", "technology", "hard"),
        # History
        ("When did World War II end?", "factual", "history", "easy"),
        ("What caused the French Revolution?", "reasoning", "history", "hard"),
        ("Who was the first president of the United States?", "factual", "history", "easy"),
        ("What were the main consequences of the Industrial Revolution?", "reasoning", "history", "hard"),
        ("Compare the Roman Empire and the Byzantine Empire.", "comparison", "history", "hard"),
        ("When did the Renaissance begin?", "factual", "history", "medium"),
        ("What led to the fall of the Roman Empire?", "reasoning", "history", "hard"),
        ("Who was Napoleon Bonaparte?", "factual", "history", "easy"),
        # Geography
        ("What is the capital of France?", "factual", "geography", "easy"),
        ("Where is Mount Everest located?", "factual", "geography", "easy"),
        ("What are the longest rivers in the world?", "factual", "geography", "medium"),
        ("How does climate change affect ocean currents?", "reasoning", "geography", "hard"),
        ("What is the difference between weather and climate?", "comparison", "geography", "medium"),
        ("Which ocean is the largest?", "factual", "geography", "easy"),
        ("What causes earthquakes?", "reasoning", "geography", "medium"),
        # Arts & Literature
        ("Who wrote Romeo and Juliet?", "factual", "literature", "easy"),
        ("What is impressionism in art?", "factual", "art", "medium"),
        ("Who painted the Mona Lisa?", "factual", "art", "easy"),
        ("What are the main themes in Shakespeare's works?", "reasoning", "literature", "hard"),
        ("Compare Renaissance and Baroque art.", "comparison", "art", "hard"),
        ("Who wrote Pride and Prejudice?", "factual", "literature", "easy"),
        # Science
        ("What is gravity?", "factual", "physics", "easy"),
        ("How does a black hole form?", "reasoning", "astronomy", "hard"),
        ("What is the periodic table?", "factual", "chemistry", "medium"),
        ("Why is the sky blue?", "reasoning", "physics", "medium"),
        ("What is evolution?", "factual", "biology", "medium"),
        ("How does the human brain work?", "reasoning", "biology", "hard"),
        ("What is the speed of light?", "factual", "physics", "easy"),
        ("Compare atomic and molecular structures.", "comparison", "chemistry", "hard"),
        # Mathematics
        ("What is pi?", "factual", "mathematics", "easy"),
        ("Who is credited with inventing calculus?", "factual", "mathematics", "medium"),
        ("What is a prime number?", "factual", "mathematics", "easy"),
        ("How does the Pythagorean theorem work?", "reasoning", "mathematics", "medium"),
        # Sports
        ("Who has won the most Olympic gold medals?", "factual", "sports", "medium"),
        ("What is the origin of the Olympic Games?", "reasoning", "sports", "medium"),
        ("How many players are on a soccer team?", "factual", "sports", "easy"),
        ("Compare basketball and football.", "comparison", "sports", "medium"),
        # Medicine
        ("What is a vaccine?", "factual", "medicine", "medium"),
        ("How does the immune system work?", "reasoning", "medicine", "hard"),
        ("What causes diabetes?", "reasoning", "medicine", "hard"),
        ("What is the difference between bacteria and viruses?", "comparison", "medicine", "hard"),
        # Economics
        ("What is GDP?", "factual", "economics", "medium"),
        ("How does inflation affect the economy?", "reasoning", "economics", "hard"),
        ("What is supply and demand?", "factual", "economics", "medium"),
        ("Compare capitalism and socialism.", "comparison", "economics", "hard"),
        # Philosophy
        ("Who was Socrates?", "factual", "philosophy", "easy"),
        ("What is existentialism?", "factual", "philosophy", "hard"),
        ("How did Plato influence Western philosophy?", "reasoning", "philosophy", "hard"),
        # Music
        ("Who composed the Symphony No. 9?", "factual", "music", "medium"),
        ("What is classical music?", "factual", "music", "medium"),
        ("How did jazz music originate?", "reasoning", "music", "medium"),
    )

    @classmethod
    def generate_diverse_questions(
        cls,
        num_questions: int = 100,
        seed: Optional[int] = None,
    ) -> List[GenericQuestion]:
        """Return ``num_questions`` questions drawn from ``QUESTION_POOL``.

        Every pool entry is used ``num_questions // len(pool)`` times. The
        remainder is a random subset without repeats, so coverage stays as
        even as possible. A question repeats only when ``num_questions``
        exceeds the pool size. The combined list is shuffled, and IDs are
        assigned in that order (``gen_0001``, ``gen_0002``, ...).

        Args:
            num_questions: How many questions to return (0 yields ``[]``).
            seed: Optional seed for a private ``random.Random`` instance.
                When ``None``, the global ``random`` module state is used.

        Returns:
            A list of ``GenericQuestion`` objects with ``ground_truth=None``.

        Raises:
            ValueError: If ``num_questions`` is negative.
        """
        if num_questions < 0:
            raise ValueError(f"num_questions must be non-negative, got {num_questions}")

        rng = random if seed is None else random.Random(seed)
        pool = cls.QUESTION_POOL

        # Whole copies first, then a duplicate-free sample for the remainder.
        full_copies, remainder = divmod(num_questions, len(pool))
        selected = list(pool) * full_copies + rng.sample(pool, remainder)
        rng.shuffle(selected)

        return [
            GenericQuestion(
                id=f"gen_{idx:04d}",
                question=text,
                expected_type=qtype,
                category=category,
                difficulty=difficulty,
                ground_truth=None,
            )
            for idx, (text, qtype, category, difficulty) in enumerate(selected, start=1)
        ]
class GenericEvaluator:
    """Evaluate RAG pipelines on generic questions and store every result.

    Each (question, pipeline) pair runs as a separate task on a thread pool.
    Completed results are written to an ``EvaluationDatabase`` and tagged with
    ``run_id`` (the construction time, ``%Y%m%d_%H%M%S``), so a single run can
    be summarised afterwards.
    """

    def __init__(
        self,
        questions: List[GenericQuestion],
        pipelines_to_evaluate: Optional[List[str]] = None,
        max_workers: int = 3,
        verbose: bool = True
    ):
        """
        Args:
            questions: Questions to ask every pipeline.
            pipelines_to_evaluate: Pipeline IDs (keys of ``ALL_PIPELINES``).
                ``None`` or an empty list selects every configured pipeline.
            max_workers: Thread-pool size used by ``evaluate_all_parallel``.
            verbose: Print progress while initializing pipelines.
        """
        self.questions = questions
        self.pipelines_to_evaluate = pipelines_to_evaluate or list(ALL_PIPELINES.keys())
        self.max_workers = max_workers
        self.verbose = verbose
        self.run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.db = EvaluationDatabase()
        # pipeline_id -> RAGPipeline, filled by initialize_pipelines()
        self.pipelines = {}

    def initialize_pipelines(self):
        """Create a RAGPipeline for each requested ID and store it in ``self.pipelines``.

        Unknown IDs and pipelines whose construction raises are reported and
        skipped, so one broken pipeline does not abort the whole run.
        """
        if self.verbose:
            print(f"\n[*] Initializing {len(self.pipelines_to_evaluate)} pipelines...")
            print("=" * 80)
        for pipeline_id in self.pipelines_to_evaluate:
            if pipeline_id not in ALL_PIPELINES:
                print(f"[WARNING] Invalid pipeline ID: {pipeline_id}, skipping")
                continue
            config = ALL_PIPELINES[pipeline_id]
            if self.verbose:
                print(f"\n[{pipeline_id}] {config.name}")
            try:
                # Each pipeline gets a collection name derived from its ID.
                collection_name = f"pipeline_{pipeline_id.lower()}_corpus"
                pipeline = RAGPipeline(
                    config=config,
                    collection_name=collection_name,
                    verbose=False
                )
                self.pipelines[pipeline_id] = pipeline
                if self.verbose:
                    print(" ✅ Initialized")
            except Exception as e:
                print(f" ❌ Failed: {e}")
        if self.verbose:
            print(f"\n[OK] Initialized {len(self.pipelines)}/{len(self.pipelines_to_evaluate)} pipelines")

    def _base_record(self, pipeline_id: str, question: GenericQuestion) -> Dict[str, Any]:
        """Return the result-row columns that do not depend on the query outcome."""
        return {
            'run_id': self.run_id,
            'pipeline_id': pipeline_id,
            'pipeline_name': ALL_PIPELINES[pipeline_id].name,
            'question_id': question.id,
            'query': question.question,
            'ground_truth_answers': json.dumps([]),  # generic questions have no ground truth
            'reranking_time_ms': 0.0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'has_answer': 0,  # nothing to check the answer against
            'timestamp': datetime.now().isoformat(),
            'question_type': question.expected_type,
            'question_category': question.category,
            'question_difficulty': question.difficulty,
        }

    def evaluate_single_question(
        self,
        pipeline_id: str,
        pipeline: RAGPipeline,
        question: GenericQuestion
    ) -> Dict[str, Any]:
        """Run one question through one pipeline and return a database row.

        Never raises for a failing query: any exception from the pipeline (or
        from reading its result) is recorded as a row with
        ``answer_found = 0`` and an ``ERROR: ...`` answer (message truncated
        to 200 characters). Without ground truth, ``answer_found = 1`` only
        means the query completed.
        """
        start_time = time.time()
        try:
            result = pipeline.query(question.question, show_details=False)
            total_time = (time.time() - start_time) * 1000
            record = self._base_record(pipeline_id, question)
            record.update({
                'retrieved_chunks': json.dumps(result['chunks']),
                'retrieval_scores': json.dumps(result['scores']),
                'num_chunks_retrieved': len(result['chunks']),
                'retrieval_time_ms': result['retrieval_time'] * 1000,
                'reranked': int(ALL_PIPELINES[pipeline_id].use_reranking),
                'generated_answer': result['answer'],
                'generation_time_ms': result['generation_time'] * 1000,
                'total_tokens': result.get('tokens', 0),
                'generation_cost_usd': result.get('cost', 0.0),
                'total_cost_usd': result.get('cost', 0.0),
                'total_time_ms': total_time,
                'answer_found': 1,  # success = 1 for the operational metric
            })
            return record
        except Exception as e:
            record = self._base_record(pipeline_id, question)
            record.update({
                'retrieved_chunks': json.dumps([]),
                'retrieval_scores': json.dumps([]),
                'num_chunks_retrieved': 0,
                'retrieval_time_ms': 0.0,
                'reranked': 0,
                'generated_answer': f"ERROR: {str(e)[:200]}",
                'generation_time_ms': 0.0,
                'total_tokens': 0,
                'generation_cost_usd': 0.0,
                'total_cost_usd': 0.0,
                'total_time_ms': (time.time() - start_time) * 1000,
                'answer_found': 0,  # failed
            })
            return record

    def evaluate_all_parallel(self):
        """Evaluate every question on every initialized pipeline using a thread pool.

        Rows are inserted into the database on the calling thread as each
        task completes. A task whose evaluation or insert raises is reported
        and not counted.

        Returns:
            The result dicts that were stored in the database. The list is
            empty when no pipelines were initialized.
        """
        if not self.pipelines:
            print("[ERROR] No pipelines initialized!")
            return []

        print("\n[*] Starting evaluation...")
        print("=" * 80)
        print(f"Run ID: {self.run_id}")
        print(f"Questions: {len(self.questions)}")
        print(f"Pipelines: {len(self.pipelines)}")
        print(f"Total evaluations: {len(self.questions) * len(self.pipelines)}")
        print(f"Database: {self.db.db_path}")
        print("=" * 80)

        tasks = [
            (pipeline_id, pipeline, question)
            for question in self.questions
            for pipeline_id, pipeline in self.pipelines.items()
        ]

        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(
                    self.evaluate_single_question,
                    pipeline_id,
                    pipeline,
                    question
                ): (pipeline_id, question.id)
                for pipeline_id, pipeline, question in tasks
            }
            with tqdm(total=len(futures), desc="Evaluating", unit="query") as pbar:
                for future in as_completed(futures):
                    pipeline_id, question_id = futures[future]
                    try:
                        result = future.result()
                        # Persist first, so `results` only holds rows that
                        # actually reached the database.
                        self.db.insert_result(result)
                    except Exception as e:
                        print(f"\n[ERROR] Task failed ({pipeline_id}, Q{question_id}): {e}")
                    else:
                        results.append(result)
                    finally:
                        pbar.update(1)

        print(f"\n[OK] Evaluation complete! {len(results)} results stored in database")
        return results

    @staticmethod
    def _format_metric(value: Optional[float], spec: str) -> str:
        """Format an SQL aggregate; NULL (``None``) renders as a right-aligned 'n/a'."""
        if value is None:
            return f"{'n/a':>10}"
        return format(value, spec)

    def print_summary(self):
        """Print per-pipeline aggregates for this run, read back from the database.

        Pipelines are ordered by success rate, i.e. the share of rows with
        ``answer_found = 1``. Aggregates that SQLite returns as NULL (for
        example when every stored cost is NULL) are shown as ``n/a``.
        """
        cursor = self.db.conn.cursor()
        # Group by ID as well as name so two pipelines sharing a display name
        # are never merged into one row.
        cursor.execute("""
            SELECT
                pipeline_name,
                COUNT(*) as total_queries,
                SUM(answer_found) as successful_queries,
                ROUND(AVG(answer_found) * 100, 2) as success_rate_pct,
                ROUND(AVG(total_time_ms), 2) as avg_time_ms,
                ROUND(AVG(total_cost_usd), 6) as avg_cost_usd,
                ROUND(SUM(total_cost_usd), 6) as total_cost_usd,
                ROUND(AVG(total_tokens), 1) as avg_tokens
            FROM evaluation_results
            WHERE run_id = ?
            GROUP BY pipeline_id, pipeline_name
            ORDER BY success_rate_pct DESC
        """, (self.run_id,))
        rows = cursor.fetchall()

        print("\n" + "=" * 100)
        print("EVALUATION SUMMARY (Generic Questions)")
        print("=" * 100)
        print(f"\n{'Pipeline':<40} {'Success':<12} {'Avg Time':<12} {'Avg Cost':<12} {'Total Cost':<12}")
        print("-" * 100)
        fmt = self._format_metric
        for row in rows:
            print(
                f"{row[0]:<40} "
                f"{fmt(row[3], '>10.1f')}% "
                f"{fmt(row[4], '>10.0f')}ms "
                f"${fmt(row[5], '>10.6f')} "
                f"${fmt(row[6], '>10.4f')}"
            )
        if not rows:
            print("(no results recorded for this run)")
        print("\n" + "=" * 100)
        print(f"[DATABASE] Results saved to: {self.db.db_path}")
        print(f"[RUN ID] {self.run_id}")
        print("=" * 100)

    def close(self):
        """Release resources (closes the results database)."""
        self.db.close()
def main():
    """Command-line entry point for the generic evaluation.

    Generates the question set, initializes the requested pipelines, runs the
    evaluation and prints a summary. The results database is always closed,
    even on an early exit or an unexpected error.

    Returns:
        0 after a completed run, 1 when no pipeline could be initialized.
        Invalid command-line arguments exit through ``argparse`` (status 2).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Run generic RAG pipeline evaluation with database storage",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Evaluate all pipelines with 50 synthetic questions
  python %(prog)s --num-questions 50

  # Evaluate specific pipelines
  python %(prog)s --pipelines A,C,D --num-questions 100

  # Results stored in SQLite database (same as Natural Questions)
  # Analyze with: python scripts/analyze_results.py
        """
    )
    parser.add_argument(
        "--num-questions",
        type=int,
        default=50,
        help="Number of questions to generate (default: 50)"
    )
    parser.add_argument(
        "--pipelines",
        type=str,
        default=None,
        help="Comma-separated pipeline IDs (e.g., 'A,C,D'). Default: all pipelines"
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=3,
        help="Max parallel workers (default: 3)"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Print detailed progress"
    )
    args = parser.parse_args()

    # Validate up front: zero questions would divide by zero in the timing
    # summary, and ThreadPoolExecutor rejects max_workers < 1.
    if args.num_questions < 1:
        parser.error("--num-questions must be at least 1")
    if args.workers < 1:
        parser.error("--workers must be at least 1")

    # Ignore blank entries such as "A,,C" or a trailing comma.
    pipelines_to_eval = None
    if args.pipelines:
        pipelines_to_eval = [p.strip().upper() for p in args.pipelines.split(',') if p.strip()]
        if not pipelines_to_eval:
            parser.error("--pipelines must name at least one pipeline ID")

    print("🔍 Generic RAG Pipeline Evaluation (Database Storage)")
    print("=" * 80)
    print("Evaluation focuses on operational metrics:")
    print(" • Response speed and latency")
    print(" • Token usage and cost")
    print(" • System reliability")
    print(" • Diverse question coverage")
    print("=" * 80)

    # Generate questions
    print(f"\n[Step 1] Generating {args.num_questions} diverse questions...")
    questions = SyntheticQuestionGenerator.generate_diverse_questions(args.num_questions)
    print(f"[OK] Generated {len(questions)} questions")

    if pipelines_to_eval:
        print(f"[INFO] Evaluating pipelines: {', '.join(pipelines_to_eval)}")

    # Initialize evaluator
    print("\n[Step 2] Initializing evaluator...")
    evaluator = GenericEvaluator(
        questions=questions,
        pipelines_to_evaluate=pipelines_to_eval,
        max_workers=args.workers,
        verbose=args.verbose
    )
    try:
        evaluator.initialize_pipelines()
        if not evaluator.pipelines:
            print("[ERROR] No pipelines initialized! Exiting.")
            return 1

        # Run evaluation
        print("\n[Step 3] Running evaluation...")
        start_time = time.time()
        evaluator.evaluate_all_parallel()
        elapsed_time = time.time() - start_time

        # Print summary
        print("\n[Step 4] Generating summary...")
        evaluator.print_summary()
        print(f"\n[TIME] Total time: {elapsed_time:.1f}s")
        if questions:
            print(f"[TIME] Avg per question: {elapsed_time / len(questions):.2f}s")
    finally:
        # Always release the SQLite connection, including on early return.
        evaluator.close()

    print("\n[NEXT STEPS]")
    print(" 1. Analyze results: python scripts/analyze_results.py")
    print(" 2. Export to Excel: python scripts/analyze_results.py --export results.xlsx")
    print(" 3. Compare pipeline performance across question types")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status
    # (a None return still exits with status 0).
    sys.exit(main())