# Spaces:
# Running
# Running
"""
Main Evaluation Pipeline for RGB RAG Benchmark

Evaluates multiple Groq LLMs on all four RAG abilities:
1. Noise Robustness
2. Negative Rejection
3. Information Integration
4. Counterfactual Robustness
"""
import argparse
import csv
import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

from tqdm import tqdm

from src.data_loader import RGBDataLoader, RGBSample, TaskType
from src.evaluator import EvaluationResult, RGBEvaluator, format_results_table
from src.llm_client import GroqLLMClient
from src.prompts import format_prompt, get_prompt_template, get_system_instruction
class RGBEvaluationPipeline:
    """
    Main pipeline for evaluating LLMs on the RGB benchmark.

    Loads samples through ``RGBDataLoader``, sends one prompt per sample to
    each configured Groq model, scores the responses with ``RGBEvaluator``
    and writes the collected results to ``output_dir`` as a JSON file plus a
    CSV summary.
    """

    # Task names understood by run_full_evaluation, in execution order.
    ALL_TASKS = (
        "noise_robustness",
        "negative_rejection",
        "information_integration",
        "counterfactual_robustness",
    )

    # Column titles of the CSV summary, in the order the values are written.
    CSV_HEADER = (
        "Model",
        "Task",
        "Accuracy",
        "Rejection Rate",
        "Error Detection",
        "Error Correction",
        "Samples",
    )

    def __init__(
        self,
        data_dir: str = "data",
        output_dir: str = "results",
        models: Optional[List[str]] = None
    ):
        """
        Initialize the evaluation pipeline.

        Args:
            data_dir: Directory containing RGB dataset files.
            output_dir: Directory to save results. Created if missing.
            models: List of Groq models to evaluate. Uses defaults if None
                (or empty).
        """
        self.data_loader = RGBDataLoader(data_dir)
        self.evaluator = RGBEvaluator()
        self.output_dir = output_dir

        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Default models to evaluate (at least 3 as required).
        # NOTE(review): Groq retires model IDs over time -- confirm these are
        # still served before a run (mixtral-8x7b-32768 in particular appears
        # to be on Groq's deprecation list; verify against their docs).
        self.models = models or [
            "llama-3.3-70b-versatile",  # Best quality
            "llama-3.1-8b-instant",  # Fast
            "mixtral-8x7b-32768",  # Good balance
        ]
        self.results: "List[EvaluationResult]" = []

    @staticmethod
    def _noise_percent(noise_ratio: float) -> int:
        """Convert a noise ratio (e.g. 0.2) to a whole percentage for display."""
        # round(), not int(): 0.29 * 100 evaluates to 28.999999999999996 and
        # int() would truncate that to 28.
        return round(noise_ratio * 100)

    def _create_client(self, model: str) -> "GroqLLMClient":
        """Create a Groq client for a specific model."""
        return GroqLLMClient(model=model)

    def _generate_responses(
        self,
        client: "GroqLLMClient",
        samples: "List[RGBSample]",
        prompt_template: str,
        desc: str = "Generating"
    ) -> List[str]:
        """
        Generate responses for a list of samples.

        Uses the system instruction from Figure 3 of the paper.

        Args:
            client: The LLM client to use.
            samples: List of samples to process.
            prompt_template: The prompt template to use.
            desc: Description for progress bar.

        Returns:
            List of model responses, one per sample and in sample order.
        """
        # The system instruction is the same for every sample; fetch it once.
        system_instruction = get_system_instruction()
        return [
            client.generate(
                format_prompt(
                    question=sample.question,
                    documents=sample.documents,
                    template=prompt_template
                ),
                system_prompt=system_instruction
            )
            for sample in tqdm(samples, desc=desc)
        ]

    def evaluate_noise_robustness(
        self,
        model: str,
        max_samples: Optional[int] = None,
        noise_ratios: Optional[List[float]] = None
    ) -> "List[EvaluationResult]":
        """
        Evaluate noise robustness for a model.

        Tests multiple noise ratios as per the RGB paper (0%, 20%, 40%, 60%, 80%).

        Args:
            model: The model name to evaluate.
            max_samples: Maximum samples to evaluate per noise ratio.
            noise_ratios: List of noise ratios to test. Defaults to paper's ratios.

        Returns:
            List of EvaluationResults, one per noise ratio that had samples.
            Ratios for which the loader returns no samples are skipped.
        """
        if noise_ratios is None:
            # Use the same noise ratios as the paper
            noise_ratios = [0.0, 0.2, 0.4, 0.6, 0.8]

        print(f"\n[Noise Robustness] Evaluating {model}...")
        print(f" Testing noise ratios: {noise_ratios}")

        client = self._create_client(model)
        # The template does not depend on the noise ratio; look it up once.
        prompt_template = get_prompt_template("default")
        results = []

        for noise_ratio in noise_ratios:
            samples = self.data_loader.load_noise_robustness(max_samples, noise_rate=noise_ratio)
            if not samples:
                print(f" Warning: No noise robustness samples found for noise_rate={noise_ratio}")
                continue

            percent = self._noise_percent(noise_ratio)
            responses = self._generate_responses(
                client, samples, prompt_template,
                desc=f" {model} - Noise {percent}%"
            )
            ground_truths = [s.answer for s in samples]

            # Pass the noise_ratio for this batch
            result = self.evaluator.evaluate_noise_robustness(
                responses, ground_truths, model, noise_ratio
            )
            results.append(result)
            print(f" Noise {percent}%: Accuracy = {result.accuracy:.2f}%")

        return results

    def evaluate_negative_rejection(
        self,
        model: str,
        max_samples: Optional[int] = None
    ) -> "EvaluationResult":
        """
        Evaluate negative rejection for a model.

        Args:
            model: The model name to evaluate.
            max_samples: Maximum samples to evaluate.

        Returns:
            EvaluationResult for negative rejection. When no samples are
            found, a result carrying only the task type and model name.
        """
        print(f"\n[Negative Rejection] Evaluating {model}...")
        client = self._create_client(model)

        samples = self.data_loader.load_negative_rejection(max_samples)
        if not samples:
            print(" Warning: No negative rejection samples found.")
            return EvaluationResult(
                task_type="negative_rejection",
                model_name=model
            )

        prompt_template = get_prompt_template("negative")
        responses = self._generate_responses(
            client, samples, prompt_template,
            desc=f" {model} - Negative Rejection"
        )

        result = self.evaluator.evaluate_negative_rejection(responses, model)
        print(f" Rejection Rate: {result.rejection_rate:.2f}%")
        return result

    def evaluate_information_integration(
        self,
        model: str,
        max_samples: Optional[int] = None
    ) -> "EvaluationResult":
        """
        Evaluate information integration for a model.

        Args:
            model: The model name to evaluate.
            max_samples: Maximum samples to evaluate.

        Returns:
            EvaluationResult for information integration. When no samples are
            found, a result carrying only the task type and model name.
        """
        print(f"\n[Information Integration] Evaluating {model}...")
        client = self._create_client(model)

        samples = self.data_loader.load_information_integration(max_samples)
        if not samples:
            print(" Warning: No information integration samples found.")
            return EvaluationResult(
                task_type="information_integration",
                model_name=model
            )

        prompt_template = get_prompt_template("default")
        responses = self._generate_responses(
            client, samples, prompt_template,
            desc=f" {model} - Info Integration"
        )
        ground_truths = [s.answer for s in samples]

        result = self.evaluator.evaluate_information_integration(
            responses, ground_truths, model
        )
        print(f" Accuracy: {result.accuracy:.2f}%")
        return result

    def evaluate_counterfactual_robustness(
        self,
        model: str,
        max_samples: Optional[int] = None
    ) -> "EvaluationResult":
        """
        Evaluate counterfactual robustness for a model.

        Args:
            model: The model name to evaluate.
            max_samples: Maximum samples to evaluate.

        Returns:
            EvaluationResult for counterfactual robustness. When no samples
            are found, a result carrying only the task type and model name.
        """
        print(f"\n[Counterfactual Robustness] Evaluating {model}...")
        client = self._create_client(model)

        samples = self.data_loader.load_counterfactual_robustness(max_samples)
        if not samples:
            print(" Warning: No counterfactual robustness samples found.")
            return EvaluationResult(
                task_type="counterfactual_robustness",
                model_name=model
            )

        prompt_template = get_prompt_template("counterfactual")
        responses = self._generate_responses(
            client, samples, prompt_template,
            desc=f" {model} - Counterfactual"
        )
        ground_truths = [s.answer for s in samples]
        # Samples without a counterfactual answer are scored against "".
        counterfactual_answers = [s.counterfactual_answer or "" for s in samples]

        result = self.evaluator.evaluate_counterfactual_robustness(
            responses, ground_truths, counterfactual_answers, model
        )
        print(f" Error Detection Rate: {result.error_detection_rate:.2f}%")
        print(f" Error Correction Rate: {result.error_correction_rate:.2f}%")
        return result

    def run_full_evaluation(
        self,
        max_samples_per_task: Optional[int] = None,
        tasks: Optional[List[str]] = None
    ) -> "List[EvaluationResult]":
        """
        Run full evaluation across all models and tasks.

        Results from any previous run held on this instance are discarded.
        After all models are processed the results table is printed and the
        JSON/CSV files are written.

        Args:
            max_samples_per_task: Maximum samples per task (for testing).
            tasks: List of tasks to run. Runs all if None (or empty). Names
                not listed in ``ALL_TASKS`` are reported and ignored.

        Returns:
            List of all evaluation results.
        """
        banner = "=" * 60
        print(banner)
        print("RGB RAG EVALUATION PIPELINE")
        print(banner)
        print(f"Models to evaluate: {self.models}")
        print(f"Max samples per task: {max_samples_per_task or 'All'}")
        print(banner)

        all_tasks = tasks or list(self.ALL_TASKS)

        # The CLI restricts task names via argparse choices; programmatic
        # callers get a visible warning instead of a silent no-op.
        unknown = [task for task in all_tasks if task not in self.ALL_TASKS]
        if unknown:
            print(f"Warning: ignoring unknown task(s): {unknown}")

        self.results = []

        for model in self.models:
            print(f"\n{'='*60}")
            print(f"EVALUATING MODEL: {model}")
            print(f"{'='*60}")

            if "noise_robustness" in all_tasks:
                # Noise robustness returns a list of results (one per noise ratio)
                noise_results = self.evaluate_noise_robustness(model, max_samples_per_task)
                self.results.extend(noise_results)

            if "negative_rejection" in all_tasks:
                result = self.evaluate_negative_rejection(model, max_samples_per_task)
                self.results.append(result)

            if "information_integration" in all_tasks:
                result = self.evaluate_information_integration(model, max_samples_per_task)
                self.results.append(result)

            if "counterfactual_robustness" in all_tasks:
                result = self.evaluate_counterfactual_robustness(model, max_samples_per_task)
                self.results.append(result)

        # Print and save results
        self._print_results()
        self._save_results()
        return self.results

    def _print_results(self) -> None:
        """Print formatted results table."""
        print(format_results_table(self.results))

    def _save_results(self) -> None:
        """Save results to a timestamped JSON file and a matching CSV summary."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = os.path.join(self.output_dir, f"results_{timestamp}.json")

        results_dict = {
            "timestamp": timestamp,
            "models": self.models,
            "results": [r.to_dict() for r in self.results]
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results_dict, f, indent=2)
        print(f"\nResults saved to: {output_file}")

        # Also save a summary CSV
        csv_file = os.path.join(self.output_dir, f"summary_{timestamp}.csv")
        self._save_csv_summary(csv_file)
        print(f"Summary saved to: {csv_file}")

    def _save_csv_summary(self, filepath: str) -> None:
        """
        Save a CSV summary of results.

        Writes one header row followed by one row per entry in
        ``self.results``; rate columns are formatted with two decimals.
        Fields are written through ``csv.writer`` so that a model or task
        name containing a comma or quote is quoted instead of corrupting the
        row.

        Args:
            filepath: Destination path of the CSV file (overwritten).
        """
        # newline="" hands line-ending control to the csv module; "\n" keeps
        # the same line terminator the summary has always used.
        with open(filepath, 'w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f, lineterminator="\n")
            writer.writerow(self.CSV_HEADER)
            writer.writerows(
                (
                    r.model_name,
                    r.task_type,
                    f"{r.accuracy:.2f}",
                    f"{r.rejection_rate:.2f}",
                    f"{r.error_detection_rate:.2f}",
                    f"{r.error_correction_rate:.2f}",
                    r.total_samples,
                )
                for r in self.results
            )
def main(argv: Optional[List[str]] = None) -> None:
    """
    Main entry point.

    Parses the command-line options, builds an ``RGBEvaluationPipeline``
    from them and runs the full evaluation.

    Args:
        argv: Argument strings to parse. When None (the default) argparse
            reads them from ``sys.argv[1:]``, so the script behaves as
            before; passing a list allows programmatic invocation.
    """
    parser = argparse.ArgumentParser(
        description="RGB RAG Evaluation Pipeline using Groq LLMs"
    )
    parser.add_argument(
        "--data-dir", "-d",
        default="data",
        help="Directory containing RGB dataset files"
    )
    parser.add_argument(
        "--output-dir", "-o",
        default="results",
        help="Directory to save results"
    )
    parser.add_argument(
        "--models", "-m",
        nargs="+",
        default=None,
        help="Models to evaluate (space-separated)"
    )
    parser.add_argument(
        "--max-samples", "-n",
        type=int,
        default=None,
        help="Maximum samples per task (for testing)"
    )
    parser.add_argument(
        "--tasks", "-t",
        nargs="+",
        choices=[
            "noise_robustness",
            "negative_rejection",
            "information_integration",
            "counterfactual_robustness"
        ],
        default=None,
        help="Specific tasks to run (default: all)"
    )

    # parse_args(None) falls back to sys.argv[1:].
    args = parser.parse_args(argv)

    pipeline = RGBEvaluationPipeline(
        data_dir=args.data_dir,
        output_dir=args.output_dir,
        models=args.models
    )
    pipeline.run_full_evaluation(
        max_samples_per_task=args.max_samples,
        tasks=args.tasks
    )


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()