Spaces:
Sleeping
Sleeping
| """ | |
| Unified evaluation script for semantic (LLM-based) and auto_metric (rule-based) evaluation. | |
| This script: | |
| 1. Reads eval_rubrics.json (from 1_generate_review_based_rubrics.py) containing rubrics for each paper | |
| 2. Reads input JSON file containing model reviews (supports multiple formats) | |
| 3. Supports three evaluation modes: | |
| - semantic: LLM-based rubrics evaluation (from 2_evaluate_direct.py) | |
| - auto_metric: Rule-based metrics evaluation (from 3_rule_evaluate.py) | |
| - both: Run both evaluations separately | |
| 4. Supports strict mode: normalize scores to discrete scales before computing metrics (--strict_mode) | |
| 5. Outputs separate JSON files for results and summaries | |
| Usage: | |
| # Semantic evaluation only | |
| python 2_evaluate.py \ | |
| --rubrics_path eval_rubrics.json \ | |
| --reviews_path model_reviews.json \ | |
| --mode semantic \ | |
| --yaml_path prompts.yaml \ | |
| --config_path configs.yaml \ | |
| --semantic_output semantic_results.json \ | |
| --max_workers 5 | |
| # Auto-metric evaluation only | |
| python 2_evaluate.py \ | |
| --rubrics_path eval_rubrics.json \ | |
| --reviews_path model_reviews.json \ | |
| --mode auto_metric \ | |
| --auto_metric_output auto_metric_results.json | |
| # Auto-metric evaluation with strict mode (normalize scores to discrete scales) | |
| python 2_evaluate.py \ | |
| --rubrics_path eval_rubrics.json \ | |
| --reviews_path model_reviews.json \ | |
| --mode auto_metric \ | |
| --auto_metric_output auto_metric_results.json \ | |
| --strict_mode | |
| # Auto-metric evaluation with manually specified input format (refined) | |
| python 2_evaluate.py \ | |
| --rubrics_path eval_rubrics.json \ | |
| --reviews_path model_reviews.json \ | |
| --mode auto_metric \ | |
| --auto_metric_output auto_metric_results.json \ | |
| --input_format refined | |
| # Auto-metric evaluation with manually specified input format (original) | |
| python 2_evaluate.py \ | |
| --rubrics_path eval_rubrics.json \ | |
| --reviews_path ours.json \ | |
| --mode auto_metric \ | |
| --auto_metric_output auto_metric_results.json \ | |
| --input_format original | |
| # Both evaluations | |
| python 2_evaluate.py \ | |
| --rubrics_path eval_rubrics.json \ | |
| --reviews_path model_reviews.json \ | |
| --mode both \ | |
| --yaml_path prompts.yaml \ | |
| --config_path configs.yaml \ | |
| --semantic_output semantic_results.json \ | |
| --auto_metric_output auto_metric_results.json \ | |
| --max_workers 32 | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import sys | |
| import argparse | |
| import yaml | |
| import math | |
| from typing import Dict, List, Any, Optional | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from tqdm import tqdm | |
| from itertools import combinations | |
| from scipy.stats import spearmanr | |
| from sklearn.metrics import precision_recall_fscore_support | |
| # Add parent directory to path | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| # Import parse_llm_response from local llm_service module | |
| import llm_service as local_llm_service | |
| parse_llm_response = local_llm_service.parse_llm_response | |
| # Import from shared/utils for gpt/vllm support | |
| project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| if project_root not in sys.path: | |
| sys.path.insert(0, project_root) | |
| from shared.utils.llm_service import LLMService | |
| from shared.utils.vllm_service import VLLMService | |
| from shared.utils.gpt_service import GPTService | |
| sys.path.insert(0, os.path.join(project_root, 'shared', 'utils')) | |
| from json_parser import parse_review_markdown | |
class ReviewProcessor:
    """Handles the extraction and processing of reviews from different sources."""

    @staticmethod
    def extract_review_content(pred_context):
        """
        Extract the review content from the prediction context.

        The review is expected to be wrapped as ``\\boxed_review{ ... \\n}``.
        The text after the last opening marker and before the first
        following ``"\\n}"`` is returned. When no marker is present, the
        text is returned unchanged (up to the first ``"\\n}"``, if any).

        Declared as a static method so it can be called both on the class
        (as the rest of this file does) and on an instance.

        Args:
            pred_context: Raw prediction data that contains the review.
                Either a string, or a dict whose 'output' entry is a string.

        Returns:
            The extracted review string. Inputs of any other shape are
            returned unchanged.
        """
        if isinstance(pred_context, str):
            text = pred_context
        elif isinstance(pred_context, dict) and isinstance(pred_context.get('output'), str):
            text = pred_context['output']
        else:
            # Nothing we know how to unwrap: hand the value back as is.
            return pred_context
        return text.split(r'\boxed_review{')[-1].split('\n}')[0]
| # ============================================================================ | |
| # Semantic Evaluation Functions (from 2_evaluate_direct.py) | |
| # ============================================================================ | |
def load_prompt_template(yaml_path: str) -> str:
    """Read the prompts YAML file and return its ``v1_evaluator_prompt`` entry.

    Args:
        yaml_path: Path to the YAML file holding the prompt templates.

    Returns:
        The evaluator prompt, or an empty string when the key is absent.
    """
    with open(yaml_path, mode='r', encoding='utf-8') as handle:
        prompt_table = yaml.safe_load(handle)
    return prompt_table.get('v1_evaluator_prompt', '')
def build_evaluation_prompt(
    rubrics: List[Dict[str, Any]],
    paper_content: str,
    review: str,
    prompt_template: str
) -> str:
    """Fill the evaluator prompt template with rubrics, paper and review.

    Placeholders are substituted one after another, in this order:
    ``{rubrics_json}`` (rubrics dumped as 4-space-indented JSON, non-ASCII
    kept as is), ``<<paper_content>>``, then ``<<review>>``.

    Args:
        rubrics: Rubric entries to serialise into the prompt.
        paper_content: Text substituted for ``<<paper_content>>``.
        review: Text substituted for ``<<review>>``.
        prompt_template: Template containing the placeholders.

    Returns:
        The template with every placeholder occurrence replaced.
    """
    substitutions = (
        ('{rubrics_json}', json.dumps(rubrics, indent=4, ensure_ascii=False)),
        ('<<paper_content>>', paper_content),
        ('<<review>>', review),
    )
    filled = prompt_template
    for placeholder, replacement in substitutions:
        filled = filled.replace(placeholder, replacement)
    return filled
def _binary_rubric_score(value: Any) -> int:
    """Collapse a raw rubric score to 0 or 1 (1 when strictly positive)."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        # Missing (None) or non-numeric scores count as "not satisfied".
        return 0
    return 1 if number > 0 else 0


def calculate_weighted_scores(
    raw_scores: Dict[str, Dict[str, Any]],
    rubrics: List[Dict[str, Any]]
) -> Dict[str, float]:
    """Calculate weighted scores for each rubric.

    Each raw score is reduced to a binary value (1 if it is a number, or a
    numeric string, strictly greater than zero; otherwise 0) and multiplied
    by the weight of the rubric with the same title.

    Args:
        raw_scores: Mapping of rubric title to a dict holding a 'score'
            entry. A bare (non-dict) value is treated as the score itself.
        rubrics: Rubric definitions, each with 'title' and 'weight'.

    Returns:
        Mapping of rubric title to ``binary_score * weight``. Titles that
        have no matching rubric definition are omitted.
    """
    rubric_weights = {r['title']: r['weight'] for r in rubrics}
    weighted_scores = {}
    for rubric_title, rubric_data in raw_scores.items():
        if rubric_title not in rubric_weights:
            continue
        if isinstance(rubric_data, dict):
            raw_value = rubric_data.get('score', 0)
        else:
            raw_value = rubric_data
        weighted_scores[rubric_title] = (
            _binary_rubric_score(raw_value) * rubric_weights[rubric_title]
        )
    return weighted_scores
def calculate_scores(raw_scores: Dict[str, Dict[str, Any]]) -> Dict[str, float]:
    """Map each rubric title to its raw 'score' value (0 when missing).

    The score is passed through untouched: no weighting or type coercion
    is applied here.
    """
    return {
        title: payload.get('score', 0)
        for title, payload in raw_scores.items()
    }
def evaluate_review_semantic(
    entry: Dict[str, Any],
    paper_content: str,
    prompt_template: str,
    llm_service: LLMService
) -> Dict[str, Any]:
    """Score one model review against its paper-specific rubrics via the LLM.

    Args:
        entry: Evaluation entry; 'id', 'rubrics' and 'model_review' are read.
        paper_content: Paper text inserted into the prompt.
        prompt_template: Evaluator prompt template with placeholders.
        llm_service: Service whose ``generate(messages=...)`` is called.

    Returns:
        Dict with 'id', 'raw_scores', 'weighted_scores', 'total_score' and
        'raw_response'. When the entry has no rubrics, or the LLM call or
        response handling raises, the scores are empty, 'total_score' is
        0.0 and an 'error' message is included.
    """
    paper_id = entry.get('id', 'unknown')
    rubric_list = entry.get('rubrics', [])
    review_text = entry.get('model_review', '')

    def failure(reason: str) -> Dict[str, Any]:
        # Shared shape for every unsuccessful evaluation.
        return {
            'id': paper_id,
            'raw_scores': {},
            'weighted_scores': {},
            'total_score': 0.0,
            'error': reason,
            'raw_response': ''
        }

    if not rubric_list:
        return failure('No valid rubrics found')

    # Prompt construction happens outside the try block, so errors raised
    # here propagate to the caller.
    prompt = build_evaluation_prompt(rubric_list, paper_content, review_text, prompt_template)

    try:
        reply = llm_service.generate(messages=[{"role": "user", "content": prompt}])
        parsed = parse_llm_response(reply)
        per_rubric = calculate_scores(parsed)
        return {
            'id': paper_id,
            'raw_scores': parsed,
            'weighted_scores': per_rubric,
            'total_score': sum(per_rubric.values()),
            'raw_response': reply
        }
    except Exception as exc:
        print(f"[ERROR] Error evaluating review {paper_id}: {exc}")
        return failure(str(exc))
def calculate_per_rubric_statistics(
    valid_results: List[Dict[str, Any]],
    rubric_titles: List[str]
) -> Dict[str, Dict[str, float]]:
    """Aggregate per-rubric score statistics over a list of results.

    For every title in ``rubric_titles`` the numeric values found under
    each result's 'weighted_scores' are collected (numeric strings are
    converted; anything else is ignored).

    Args:
        valid_results: Evaluation results carrying a 'weighted_scores' dict.
        rubric_titles: Rubric titles to summarise.

    Returns:
        Mapping title -> {'mean', 'min', 'max', 'count', 'pass_rate'}.
        Titles without any usable score are left out. A score passes when
        it is >= 1, except for "False or Contradictory Claims", where the
        threshold is >= 0.
    """
    collected = {title: [] for title in rubric_titles}

    for record in valid_results:
        per_rubric = record.get('weighted_scores', {})
        if not isinstance(per_rubric, dict):
            continue
        for title in rubric_titles:
            if title not in per_rubric:
                continue
            value = per_rubric[title]
            if isinstance(value, str):
                try:
                    value = float(value)
                except ValueError:
                    continue
            elif isinstance(value, (int, float)):
                value = float(value)
            else:
                continue
            collected[title].append(value)

    summary = {}
    for title in rubric_titles:
        values = collected[title]
        if not values:
            continue
        threshold = 0 if title == "False or Contradictory Claims" else 1
        total = len(values)
        passed = sum(1 for v in values if v >= threshold)
        summary[title] = {
            'mean': sum(values) / total,
            'min': min(values),
            'max': max(values),
            'count': total,
            'pass_rate': passed / total
        }
    return summary
| # ============================================================================ | |
| # Auto-Metric Evaluation Functions (from 3_rule_evaluate.py) | |
| # ============================================================================ | |
def extract_scores_from_review(review_text: str) -> Dict[str, Any]:
    """Pull the numeric scores and the decision out of a markdown review.

    The text is parsed with ``parse_review_markdown``. A non-empty decision
    is lower-cased and stripped, then mapped to 'accept', 'reject' or
    'undecided' when it contains that word (checked in this order);
    otherwise the cleaned text is kept. An empty decision becomes None.

    Args:
        review_text: Review in markdown form.

    Returns:
        Dict with 'soundness', 'presentation', 'rating', 'confidence' and
        'decision'. Every value is None when the text is empty or parsing
        raises (a warning is printed in the latter case).
    """
    score_fields = ('soundness', 'presentation', 'rating', 'confidence')

    def all_missing() -> Dict[str, Any]:
        blank = {field: None for field in score_fields}
        blank['decision'] = None
        return blank

    if not review_text:
        return all_missing()

    try:
        parsed = parse_review_markdown(review_text)
        verdict = parsed.get('decision', '')
        if not verdict:
            verdict = None
        else:
            cleaned = verdict.lower().strip()
            verdict = next(
                (label for label in ('accept', 'reject', 'undecided') if label in cleaned),
                cleaned,
            )
        extracted = {field: parsed.get(field) for field in score_fields}
        extracted['decision'] = verdict
        return extracted
    except Exception as exc:
        print(f"Warning: Failed to parse review text: {exc}")
        return all_missing()
def calculate_mse(predicted: float, ground_truth: float) -> Optional[float]:
    """Return the squared error between one prediction and its target.

    Returns None when either value is None.
    """
    if predicted is not None and ground_truth is not None:
        return (predicted - ground_truth) ** 2
    return None
def calculate_mae(predicted: float, ground_truth: float) -> Optional[float]:
    """Return the absolute error between one prediction and its target.

    Returns None when either value is None.
    """
    if predicted is not None and ground_truth is not None:
        return abs(predicted - ground_truth)
    return None
# Allowed discrete values per scale type, in ascending order.
_DISCRETE_SCALE_LEVELS = {
    '0-5': (0, 1, 2, 3, 4, 5),
    '0-10': (0, 2, 4, 6, 8, 10),
}


def normalize_to_discrete_scale(score: Optional[float], scale_type: str) -> Optional[float]:
    """
    Normalize a float score to the nearest discrete value based on scale type.

    The score is first clamped to the scale's range. Ties between two
    discrete values resolve to the higher one (e.g., 3.5 -> 4 on '0-5',
    5 -> 6 on '0-10').

    Args:
        score: The score to normalize (can be None). Anything ``float()``
            accepts is allowed.
        scale_type: Either '0-5' for 0-5 scale (discrete: 0,1,2,3,4,5)
            or '0-10' for 0-10 scale (discrete: 0,2,4,6,8,10)

    Returns:
        Normalized discrete score, or None if the input is None, cannot be
        converted to float, or is NaN.

    Raises:
        ValueError: If ``scale_type`` is not a known scale. This is only
            checked for scores that convert to float.
    """
    if score is None:
        return None
    try:
        score = float(score)
    except (ValueError, TypeError):
        return None

    levels = _DISCRETE_SCALE_LEVELS.get(scale_type)
    if levels is None:
        raise ValueError(f"Unknown scale_type: {scale_type}. Must be '0-5' or '0-10'")

    # NaN has no nearest neighbour; treat it as a missing score instead of
    # letting the clamp below silently turn it into the scale maximum.
    if math.isnan(score):
        return None

    clamped = max(levels[0], min(levels[-1], score))
    # Smallest distance wins; on equal distance the larger level wins.
    return max(levels, key=lambda level: (-abs(level - clamped), level))
def normalize_scores_dict(scores: Dict[str, Optional[float]]) -> Dict[str, Optional[float]]:
    """
    Snap every score in the dictionary onto its discrete scale.

    'soundness', 'presentation' and 'confidence' use the '0-5' scale and
    'rating' uses the '0-10' scale. Keys missing from ``scores`` are
    looked up as None.

    Args:
        scores: Dictionary with keys 'soundness', 'presentation', 'rating', 'confidence'

    Returns:
        New dictionary with the four normalized scores.
    """
    scale_for = {
        'soundness': '0-5',
        'presentation': '0-5',
        'confidence': '0-5',
        'rating': '0-10',
    }
    return {
        key: normalize_to_discrete_scale(scores.get(key), scale)
        for key, scale in scale_for.items()
    }
def calculate_score_metrics(
    model_scores: Dict[str, float],
    ground_truth_scores: Dict[str, float],
    normalize: bool = False
) -> Dict[str, Any]:
    """
    Compute per-dimension squared and absolute errors between two score sets.

    Args:
        model_scores: Dictionary with model scores
        ground_truth_scores: Dictionary with ground truth scores
        normalize: If True, both score sets are passed through
            ``normalize_scores_dict`` before the errors are computed

    Returns:
        Dictionary with '<dim>_mse' and '<dim>_mae' for soundness,
        presentation, rating and confidence (None where either side is
        missing), 'overall_error' (sum of the available squared errors, or
        None when there are none), 'valid_dimensions', and, only when
        ``normalize`` is True, the normalized score dictionaries.
    """
    if normalize:
        predicted = normalize_scores_dict(model_scores)
        reference = normalize_scores_dict(ground_truth_scores)
    else:
        predicted, reference = model_scores, ground_truth_scores

    squared_errors = {}
    absolute_errors = {}
    for dim in ('soundness', 'presentation', 'rating', 'confidence'):
        pred_value = predicted.get(dim)
        ref_value = reference.get(dim)
        squared_errors[f'{dim}_mse'] = calculate_mse(pred_value, ref_value)
        absolute_errors[f'{dim}_mae'] = calculate_mae(pred_value, ref_value)

    usable = [err for err in squared_errors.values() if err is not None]

    result = {}
    result.update(squared_errors)
    result.update(absolute_errors)
    result['overall_error'] = sum(usable) if usable else None
    result['valid_dimensions'] = len(usable)

    if normalize:
        # Expose what was actually compared, for transparency.
        result['model_scores_normalized'] = predicted
        result['gt_scores_normalized'] = reference
    return result
def normalize_score_value(value):
    """Normalize score value to float, handling string representations.

    Args:
        value: None, a number, or a string containing a number
            (e.g. "2.75", "Rating: 6", ".5").

    Returns:
        The value as a float. For strings, the first unsigned decimal
        number found in the text is used. None is returned for None,
        for strings without any digits, and for any other type.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        import re
        # Accept "6", "2.75", "3." and also leading-dot decimals such as
        # ".5" (which a plain r'\d+\.?\d*' would misread as 5).
        match = re.search(r'\d+(?:\.\d*)?|\.\d+', value)
        if match:
            return float(match.group(0))
    return None
def normalize_decision(decision):
    """Reduce a decision value to a canonical lower-case label.

    The value is stringified, lower-cased and stripped. If it contains
    'accept', 'reject' or 'undecided' (checked in that order) that label
    is returned; otherwise the cleaned text is returned. None stays None.
    """
    if decision is None:
        return None
    cleaned = str(decision).lower().strip()
    for label in ('accept', 'reject', 'undecided'):
        if label in cleaned:
            return label
    return cleaned
def extract_scores_from_dict(scores_dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Read scores from a structured dictionary (scores or initial_scores format).

    Numeric fields go through ``normalize_score_value`` and the decision
    through ``normalize_decision``.

    Args:
        scores_dict: Dict containing scores (e.g., {'rating': 5.75, 'soundness': '2.75', ...})

    Returns:
        Dict with keys 'soundness', 'presentation', 'rating', 'confidence'
        and 'decision'. All values are None when ``scores_dict`` is empty
        or None.
    """
    numeric_fields = ('soundness', 'presentation', 'rating', 'confidence')

    if not scores_dict:
        empty = {field: None for field in numeric_fields}
        empty['decision'] = None
        return empty

    extracted = {
        field: normalize_score_value(scores_dict.get(field))
        for field in numeric_fields
    }
    extracted['decision'] = normalize_decision(scores_dict.get('decision'))
    return extracted
def evaluate_review_auto_metric(entry: Dict[str, Any], use_initial_scores: bool = False, strict_mode: bool = False) -> Dict[str, Any]:
    """
    Evaluate a single entry by extracting scores and calculating metrics.

    Model scores come from:
      * entry['scores'] for 'refined' entries (default),
      * entry['initial_scores'] for 'refined' entries when
        ``use_initial_scores`` is set, and for 'original' entries
        (with confidence filled in from the review text if missing),
      * the markdown review text for every other format.
    Ground truth comes only from entry['golden_review'].

    Args:
        entry: Evaluation entry containing model_review, scores, initial_scores, etc.
        use_initial_scores: If True, use initial_scores instead of refined scores (for refined format)
        strict_mode: Passed to ``calculate_score_metrics`` as ``normalize``.

    Returns:
        Dict containing the model and ground-truth scores and decisions,
        the decision match/accuracy, the score metrics, and 'score_type'
        ('initial' or 'refined' for refined entries, otherwise 'auto').
    """
    score_keys = ('soundness', 'presentation', 'rating', 'confidence')

    paper_id = entry.get('id', 'unknown')
    review_text = entry.get('model_review', '')
    fmt = entry.get('format', 'unknown')
    is_refined = fmt == 'refined'

    # --- Model side: pick the source of scores according to the format ---
    if is_refined and not use_initial_scores:
        extracted = extract_scores_from_dict(entry.get('scores', {}))
    elif is_refined or fmt == 'original':
        extracted = extract_scores_from_dict(entry.get('initial_scores', {}))
    else:
        # Fallback: extract from markdown review text
        extracted = extract_scores_from_review(review_text)

    model_scores = {key: extracted.get(key) for key in score_keys}
    model_decision = extracted.get('decision')

    # For the original format only: structured data may lack a confidence
    # value, so try to recover it from the review text.
    if fmt == 'original' and model_scores.get('confidence') is None and review_text:
        try:
            from_text = extract_scores_from_review(review_text)
            if from_text.get('confidence') is not None:
                model_scores['confidence'] = from_text.get('confidence')
        except Exception:
            pass  # Keep confidence as None if extraction fails

    # --- Ground truth: golden_review only, never the model output ---
    golden_text = entry.get('golden_review', '')
    ground_truth_scores = {}
    gt_decision = None
    if golden_text:
        try:
            gt_data = extract_scores_from_review(golden_text)
            if gt_data:
                # Fields that could not be extracted stay None.
                ground_truth_scores = {key: gt_data.get(key) for key in score_keys}
                gt_decision = normalize_decision(gt_data.get('decision'))
            else:
                print(f"Warning: Failed to parse golden_review for entry {paper_id}. Ground truth scores will be empty.")
        except Exception as exc:
            print(f"Warning: Failed to extract scores from golden_review for {paper_id}: {exc}")
            print(f" Ground truth scores will be empty. Error: {str(exc)}")
    else:
        print(f"Warning: No golden_review found for entry {paper_id}. Ground truth scores will be empty.")

    # --- Metrics (optionally on normalized scores in strict mode) ---
    score_metrics = calculate_score_metrics(model_scores, ground_truth_scores, normalize=strict_mode)

    decision_match = False
    decision_accuracy = None
    if model_decision is not None and gt_decision is not None:
        decision_match = normalize_decision(model_decision) == gt_decision
        decision_accuracy = 1.0 if decision_match else 0.0

    result = {'id': paper_id, 'format': fmt}
    for key in score_keys:
        result[f'model_{key}'] = model_scores.get(key)
    result['model_decision'] = model_decision
    for key in score_keys:
        result[f'gt_{key}'] = ground_truth_scores.get(key)
    result['gt_decision'] = gt_decision
    result['decision_match'] = decision_match
    result['decision_accuracy'] = decision_accuracy
    result.update(score_metrics)

    # Record which set of scores was used.
    if not is_refined:
        result['score_type'] = 'auto'
    elif use_initial_scores:
        result['score_type'] = 'initial'
    else:
        result['score_type'] = 'refined'
    return result
def calculate_pairwise_accuracies(paper_scores: List[Dict[str, float]]) -> Dict[str, float]:
    """Measure, per metric, how often predicted and true pair orderings agree.

    For every unordered pair of papers and every metric (rating, soundness,
    presentation, confidence), the pair counts as valid when both papers
    have non-None 'true_<metric>' and 'pred_<metric>' values. It counts as
    correct when ``first > second`` gives the same answer for the true
    and the predicted values.

    Args:
        paper_scores: One dict per paper with 'true_*' / 'pred_*' entries.

    Returns:
        Mapping metric -> correct / valid (0.0 when no pair is valid), or
        an empty dict when fewer than two papers are given.
    """
    if len(paper_scores) < 2:
        return {}

    metrics = ('rating', 'soundness', 'presentation', 'confidence')
    comparable = dict.fromkeys(metrics, 0)
    agreeing = dict.fromkeys(metrics, 0)

    for first, second in combinations(paper_scores, 2):
        for metric in metrics:
            true_key = f'true_{metric}'
            pred_key = f'pred_{metric}'
            true_a, true_b = first.get(true_key), second.get(true_key)
            pred_a, pred_b = first.get(pred_key), second.get(pred_key)
            if true_a is None or true_b is None or pred_a is None or pred_b is None:
                continue
            comparable[metric] += 1
            if (true_a > true_b) == (pred_a > pred_b):
                agreeing[metric] += 1

    return {
        metric: agreeing[metric] / comparable[metric] if comparable[metric] > 0 else 0.0
        for metric in metrics
    }
| # ============================================================================ | |
| # Data Loading Functions | |
| # ============================================================================ | |
def load_rubrics_json(rubrics_path: str) -> Dict[str, Dict[str, Any]]:
    """Read the rubrics JSON file and return it as an id-keyed mapping.

    A top-level list is indexed by each item's 'id'; a top-level dict is
    returned unchanged.

    Raises:
        ValueError: If the top-level JSON value is neither a list nor a dict.
    """
    with open(rubrics_path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)

    if isinstance(payload, dict):
        return payload
    if isinstance(payload, list):
        return {record['id']: record for record in payload}
    raise ValueError(f"Invalid rubrics JSON format: expected list or dict, got {type(payload)}")
def load_model_reviews_json(reviews_path: str, format_override: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
    """
    Load the model reviews JSON file and index the reviews by paper id.

    Recognised item layouts:
    1. 'refined': has 'paper_id' plus 'scores' and 'initial_scores'
    2. 'standard': has 'paper_id' but not both score fields
    3. 'original': has 'model_prediction' with 'meta_review' and 'decision'
    4. 'legacy': anything else; the review is taken from 'pred_fast_mode'

    Args:
        reviews_path: Path to JSON file containing model reviews (a list of
            items, or a dict whose values are the items)
        format_override: 'refined' or 'original' to force that layout for
            every item; None or 'auto' to detect the layout per item

    Returns:
        Dict mapping paper_id to dict containing:
        - 'review': extracted review content ('' when no review text exists)
        - 'scores': refined scores dict (if available)
        - 'initial_scores': initial scores dict (if available)
        - 'format': the layout name

    Raises:
        ValueError: If ``format_override`` is set to an unknown value (raised
            when the first item is processed).
    """
    with open(reviews_path, 'r', encoding='utf-8') as handle:
        records = json.load(handle)
    if isinstance(records, dict):
        records = list(records.values())

    def markdown_of(record):
        # Review text for layouts keyed by paper_id.
        return record.get('review_markdown', '') or record.get('review', '')

    def unpack_original(record):
        # Review text (meta_review.content, else raw_text) and initial scores.
        prediction = record.get('model_prediction', {})
        meta = prediction.get('meta_review', {})
        text = meta.get('content', '') or prediction.get('raw_text', '')
        initial = {
            'rating': meta.get('rating'),
            'soundness': meta.get('soundness'),
            'presentation': meta.get('presentation'),
            'contribution': meta.get('contribution'),
            'decision': prediction.get('decision'),
        }
        return text, initial

    forced = format_override if format_override and format_override != 'auto' else None

    loaded = {}
    for record in records:
        body = ''
        refined_scores = None
        first_pass_scores = None

        if forced == 'refined':
            key = record.get('paper_id') or record.get('id')
            if not key:
                continue
            layout = 'refined'
            body = markdown_of(record)
            refined_scores = record.get('scores', {})
            first_pass_scores = record.get('initial_scores', {})
        elif forced == 'original':
            key = record.get('id')
            if not key:
                continue
            layout = 'original'
            body, first_pass_scores = unpack_original(record)
        elif forced is not None:
            raise ValueError(f"Unknown format_override: {format_override}. Must be 'refined', 'original', or 'auto'")
        elif "paper_id" in record:
            key = record.get('paper_id')
            if not key:
                continue
            body = markdown_of(record)
            if 'scores' in record and 'initial_scores' in record:
                layout = 'refined'
                refined_scores = record.get('scores', {})
                first_pass_scores = record.get('initial_scores', {})
            else:
                layout = 'standard'
        elif "model_prediction" in record:
            key = record.get('id')
            if not key:
                continue
            layout = 'original'
            body, first_pass_scores = unpack_original(record)
        else:
            key = record.get('id')
            if not key:
                continue
            layout = 'legacy'
            legacy_payload = record.get('pred_fast_mode', {})
            # A dict payload is handed to the extractor as is.
            body = legacy_payload if isinstance(legacy_payload, dict) else str(legacy_payload)

        try:
            loaded[key] = {
                'review': ReviewProcessor.extract_review_content(body) if body else '',
                'scores': refined_scores,
                'initial_scores': first_pass_scores,
                'format': layout
            }
        except Exception as exc:
            print(f"[WARN] Failed to extract review for {key}: {exc}")
            continue
    return loaded
def combine_rubrics_and_reviews(
    rubrics_data: Dict[str, Dict[str, Any]],
    reviews_dict: Dict[str, Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Join rubric entries with their model reviews into evaluation entries.

    Papers whose review record is missing or has an empty 'review' are
    skipped; a single warning with the number of skipped papers is printed.

    Args:
        rubrics_data: Dict mapping paper_id to rubric entry
        reviews_dict: Dict mapping paper_id to dict containing 'review', 'scores', 'initial_scores', 'format'

    Returns:
        List of evaluation entries (in rubrics order) with paper context,
        decision, golden review, rubrics, model_review, scores,
        initial_scores and format.
    """
    entries = []
    skipped = []

    for paper_id, rubric_entry in rubrics_data.items():
        review_data = reviews_dict.get(paper_id)
        if review_data and review_data.get('review'):
            entries.append({
                'id': paper_id,
                'paper_context': rubric_entry.get('paper_context', ''),
                'decision': rubric_entry.get('decision', ''),
                'golden_review': rubric_entry.get('golden_review', ''),
                'rubrics': rubric_entry.get('rubrics', []),
                'model_review': review_data.get('review', ''),
                'scores': review_data.get('scores'),  # refined scores, if any
                'initial_scores': review_data.get('initial_scores'),  # initial scores, if any
                'format': review_data.get('format', 'unknown')
            })
        else:
            skipped.append(paper_id)

    if skipped:
        print(f"[WARN] {len(skipped)} papers have no model review, skipping them")
    return entries
| # ============================================================================ | |
| # LLM Service Configuration | |
| # ============================================================================ | |
def load_llm_config(config_path: str) -> Dict[str, Any]:
    """
    Load LLM configuration from YAML file.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        The top-level mapping parsed from the file. An empty file (which the
        YAML parser reports as ``None``) yields an empty dict so that callers
        can use ``.get`` on the result without a None check.

    Raises:
        ValueError: If the top-level YAML value is not a mapping.
    """
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    if config is None:
        return {}
    if not isinstance(config, dict):
        raise ValueError(
            f"LLM config at {config_path} must be a YAML mapping, got {type(config).__name__}"
        )
    return config
def create_llm_service_from_config(config: Dict[str, Any]) -> LLMService:
    """
    Create LLM service from configuration.

    Args:
        config: Parsed configuration mapping. ``config['mode']`` selects the
            backend ('gpt' or 'vllm', case-insensitive, default 'gpt'); the
            backend's settings are read from the sub-mapping of the same name.

    Returns:
        A ``GPTService`` when mode is 'gpt', a ``VLLMService`` when mode is 'vllm'.

    Raises:
        ValueError: If mode is 'gpt' and no API key is found in the config or
            the OPENAI_API_KEY environment variable, or if mode is neither
            'gpt' nor 'vllm'.
    """
    # `dict.get(key, default)` only applies the default when the key is
    # absent; a YAML key written with no value parses to None, so fall back
    # explicitly instead of calling methods on None.
    mode = str(config.get('mode') or 'gpt').lower()

    if mode == 'gpt':
        gpt_config = config.get('gpt') or {}
        # The config file wins over the environment variable.
        api_key = gpt_config.get('api_key') or os.getenv('OPENAI_API_KEY')
        if not api_key:
            raise ValueError("GPT mode requires api_key in configs.yaml or OPENAI_API_KEY environment variable")
        return GPTService(
            api_key=api_key,
            model_name=gpt_config.get('model_name', 'gpt-4o'),
            base_url=gpt_config.get('base_url'),
            timeout=gpt_config.get('timeout', 300)
        )

    if mode == 'vllm':
        vllm_config = config.get('vllm') or {}
        return VLLMService(
            base_url=vllm_config.get('base_url', 'http://localhost:8000/v1'),
            api_key=vllm_config.get('api_key', 'dummy-key'),
            model_name=vllm_config.get('model_name'),
            timeout=vllm_config.get('timeout', 300),
            max_concurrent_requests=vllm_config.get('max_concurrent_requests', 64),
            max_retries=vllm_config.get('max_retries', 3),
            retry_delay=vllm_config.get('retry_delay', 1.0),
            retry_backoff=vllm_config.get('retry_backoff', 2.0)
        )

    raise ValueError(f"Unknown mode: {mode}. Must be 'gpt' or 'vllm'")
| # ============================================================================ | |
| # Main Evaluation Functions | |
| # ============================================================================ | |
def run_semantic_evaluation(
    evaluation_data: List[Dict[str, Any]],
    prompt_template: str,
    llm_service: LLMService,
    max_workers: int
) -> tuple:
    """
    Run semantic evaluation and return results and summary.

    Each entry is evaluated by ``evaluate_review_semantic`` on a thread pool.
    An entry whose evaluation raises is recorded as a placeholder result
    carrying an 'error' key instead of aborting the run.

    Args:
        evaluation_data: Evaluation entries; each must contain 'paper_context'.
        prompt_template: Prompt template forwarded to ``evaluate_review_semantic``.
        llm_service: LLM service forwarded to ``evaluate_review_semantic``.
        max_workers: Size of the thread pool.

    Returns:
        Tuple of (results, summary)
        - results: One result per input entry, in the same order as
          ``evaluation_data`` (independent of which thread finished first, so
          repeated runs produce identically ordered output files).
        - summary: Entry counts, plus 'overall_score' (mean/min/max of
          'total_score' over valid results) when at least one result is valid,
          plus 'per_rubric_statistics' when the first entry has rubrics.
    """
    print(f"\n{'='*80}")
    print("RUNNING SEMANTIC EVALUATION")
    print(f"{'='*80}")
    print(f"Evaluating {len(evaluation_data)} reviews using {max_workers} workers...")

    # Pre-size the list and fill by input index so that the output order does
    # not depend on thread completion order.
    results: List[Any] = [None] * len(evaluation_data)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {
            executor.submit(
                evaluate_review_semantic,
                entry,
                entry['paper_context'],
                prompt_template,
                llm_service
            ): index
            for index, entry in enumerate(evaluation_data)
        }

        for future in tqdm(as_completed(future_to_index), total=len(evaluation_data), desc="Semantic evaluation"):
            index = future_to_index[future]
            try:
                results[index] = future.result()
            except Exception as e:
                entry = evaluation_data[index]
                print(f"\n[ERROR] Failed to process entry {entry.get('id', 'unknown')}: {e}")
                results[index] = {
                    'id': entry.get('id', 'unknown'),
                    'raw_scores': {},
                    'weighted_scores': {},
                    'total_score': 0.0,
                    'error': str(e),
                    'raw_response': ''
                }

    # A result is valid when it carries no 'error' key and has non-empty
    # weighted scores.
    valid_results = [r for r in results if 'error' not in r and r.get('weighted_scores')]
    review_scores = [r.get('total_score', 0.0) for r in valid_results]

    summary = {
        'total_entries': len(results),
        'valid_entries': len(valid_results),
        'failed_entries': len(results) - len(valid_results)
    }

    if review_scores:
        summary['overall_score'] = {
            'mean': sum(review_scores) / len(review_scores),
            'min': min(review_scores),
            'max': max(review_scores)
        }

    # Rubric titles are taken from the first entry only; every entry is
    # assumed to share the same rubric titles.
    if evaluation_data and evaluation_data[0].get('rubrics'):
        rubric_titles = [r['title'] for r in evaluation_data[0]['rubrics']]
        per_rubric_stats = calculate_per_rubric_statistics(valid_results, rubric_titles)
        summary['per_rubric_statistics'] = per_rubric_stats

    return results, summary
def _auto_metric_is_number(value: Any) -> bool:
    """Return True unless ``value`` is None or a float NaN."""
    return value is not None and not (isinstance(value, float) and math.isnan(value))


def _auto_metric_mean_stats(
    results: List[Dict[str, Any]],
    dimensions: List[str],
    suffix: str
) -> Dict[str, Dict[str, Any]]:
    """Average the ``'<dim>_<suffix>'`` field per dimension, skipping None/NaN values."""
    stats = {}
    for dim in dimensions:
        values = [v for v in (r.get(f'{dim}_{suffix}') for r in results) if _auto_metric_is_number(v)]
        if values:
            stats[dim] = {'mean': sum(values) / len(values), 'count': len(values)}
    return stats


def _auto_metric_spearman_stats(
    results: List[Dict[str, Any]],
    dimensions: List[str]
) -> Dict[str, Dict[str, Any]]:
    """Spearman correlation between ``gt_<dim>`` and ``model_<dim>`` for each dimension."""
    stats = {}
    for dim in dimensions:
        # Keep only pairs where both the ground-truth and predicted values are usable.
        pairs = [
            (r.get(f'gt_{dim}'), r.get(f'model_{dim}'))
            for r in results
            if _auto_metric_is_number(r.get(f'gt_{dim}')) and _auto_metric_is_number(r.get(f'model_{dim}'))
        ]
        if len(pairs) < 2:
            continue
        true_clean = [t for t, _ in pairs]
        pred_clean = [p for _, p in pairs]
        try:
            corr, _ = spearmanr(true_clean, pred_clean)
            # A NaN correlation (e.g. constant input) is omitted from the summary.
            if not math.isnan(corr):
                stats[dim] = {'correlation': corr, 'count': len(true_clean)}
        except Exception as e:
            # Best effort: a failing dimension is skipped, but no longer silently.
            print(f"[WARN] Spearman correlation failed for {dim}: {e}")
    return stats


def _auto_metric_decision_metrics(results: List[Dict[str, Any]]):
    """
    Accuracy and macro-F1 of accept/reject decisions.

    Returns None when no result has both 'gt_decision' and 'model_decision'.
    """
    scored = [r for r in results if r.get('gt_decision') is not None and r.get('model_decision') is not None]
    if not scored:
        return None

    true_decisions = []
    pred_decisions = []
    decision_acc = []
    for r in scored:
        gt_decision = str(r.get('gt_decision', '')).lower().strip()
        pred_decision = str(r.get('model_decision', '')).lower().strip()
        gt_accept = 'accept' in gt_decision
        pred_accept = 'accept' in pred_decision

        # For F1 every decision is binarized: contains 'accept' -> 1, else 0.
        true_decisions.append(1 if gt_accept else 0)
        pred_decisions.append(1 if pred_accept else 0)

        is_match = (
            pred_decision == gt_decision
            or (pred_accept and gt_accept)
            or ('reject' in pred_decision and 'reject' in gt_decision)
        )
        decision_acc.append(1.0 if is_match else 0.0)

    metrics = {'accuracy': sum(decision_acc) / len(decision_acc)}
    try:
        _, _, f1_score, _ = precision_recall_fscore_support(true_decisions, pred_decisions, average='macro')
        metrics['f1_macro'] = f1_score
    except Exception:
        # Deliberate best effort: report accuracy alone when F1 cannot be computed.
        pass
    metrics['count'] = len(decision_acc)
    return metrics


def run_auto_metric_evaluation(
    evaluation_data: List[Dict[str, Any]],
    strict_mode: bool = False
) -> tuple:
    """
    Run auto-metric evaluation and return results and summary.

    For refined format (has scores and initial_scores), evaluates both:
    - Refined scores evaluation (result id ``<paper_id>_refined``)
    - Initial scores evaluation (result id ``<paper_id>_initial``)

    Every other entry is evaluated once with ``use_initial_scores=False``.

    When at least one entry is in refined format, the headline statistics
    ('overall_error', 'mse_statistics', 'mae_statistics',
    'spearman_correlations', 'decision_metrics', 'pairwise_accuracies') are
    computed from results whose 'score_type' is 'refined' only, to avoid
    counting a paper twice; separate 'refined_*' and 'initial_*' statistics
    are reported alongside. Otherwise all valid results are used.

    Args:
        evaluation_data: Evaluation entries (see ``combine_rubrics_and_reviews``).
        strict_mode: Forwarded to ``evaluate_review_auto_metric``.

    Returns:
        Tuple of (results_list, summary_dict)
        - results_list: List of evaluation results (may contain both refined and
          initial results for refined format); failed entries are recorded as
          ``{'id': ..., 'error': ...}``.
        - summary_dict: Summary statistics
    """
    print(f"\n{'='*80}")
    print("RUNNING AUTO-METRIC EVALUATION")
    print(f"{'='*80}")
    print(f"Evaluating {len(evaluation_data)} entries...")

    # Detect format types
    refined_format_count = sum(1 for e in evaluation_data if e.get('format') == 'refined')
    original_format_count = sum(1 for e in evaluation_data if e.get('format') == 'original')

    if refined_format_count > 0:
        print(f"Detected {refined_format_count} entries in refined format (will evaluate both refined and initial scores)")
    if original_format_count > 0:
        print(f"Detected {original_format_count} entries in original format (will evaluate initial scores only)")

    results = []
    for entry in tqdm(evaluation_data, desc="Auto-metric evaluation"):
        entry_id = entry.get('id', 'unknown')
        try:
            if entry.get('format', 'unknown') == 'refined':
                # Evaluate refined scores
                refined_result = evaluate_review_auto_metric(entry, use_initial_scores=False, strict_mode=strict_mode)
                refined_result['paper_id'] = entry_id  # Keep original paper_id
                refined_result['id'] = f"{entry_id}_refined"
                results.append(refined_result)

                # Evaluate initial scores
                initial_result = evaluate_review_auto_metric(entry, use_initial_scores=True, strict_mode=strict_mode)
                initial_result['paper_id'] = entry_id  # Keep original paper_id
                initial_result['id'] = f"{entry_id}_initial"
                results.append(initial_result)
            else:
                # Non-refined entries are evaluated once; how the scores are
                # sourced is decided inside evaluate_review_auto_metric.
                results.append(
                    evaluate_review_auto_metric(entry, use_initial_scores=False, strict_mode=strict_mode)
                )
        except Exception as e:
            print(f"Error evaluating entry {entry_id}: {e}")
            results.append({
                'id': entry_id,
                'error': str(e)
            })

    # Bound once up front: every statistics section below iterates over it.
    dimensions = ['soundness', 'presentation', 'confidence', 'rating']
    has_refined = refined_format_count > 0

    valid_results = [r for r in results if 'error' not in r]
    mse_results = [r for r in valid_results if r.get('overall_error') is not None]

    # Separate refined and initial results for refined format
    refined_results = [r for r in valid_results if r.get('score_type') == 'refined']
    initial_results = [r for r in valid_results if r.get('score_type') == 'initial']
    auto_results = [r for r in valid_results if r.get('score_type') == 'auto' or r.get('score_type') is None]

    summary = {
        'total_entries': len(results),
        'valid_entries': len(valid_results),
        'mse_entries': len(mse_results),
        'refined_results_count': len(refined_results),
        'initial_results_count': len(initial_results),
        'auto_results_count': len(auto_results)
    }

    # ---- MSE/MAE statistics -------------------------------------------------
    if has_refined:
        stats_results = [r for r in refined_results if r.get('overall_error') is not None]
    else:
        stats_results = mse_results

    if stats_results:
        overall_clean = [
            v for v in (r.get('overall_error') for r in stats_results) if _auto_metric_is_number(v)
        ]
        if overall_clean:
            summary['overall_error'] = {
                'mean': sum(overall_clean) / len(overall_clean),
                'count': len(overall_clean)
            }
        summary['mse_statistics'] = _auto_metric_mean_stats(stats_results, dimensions, 'mse')
        summary['mae_statistics'] = _auto_metric_mean_stats(stats_results, dimensions, 'mae')

    # Separate statistics for refined and initial results, computed
    # independently of the headline statistics above.
    for label, subset in (('refined', refined_results), ('initial', initial_results)):
        scored = [r for r in subset if r.get('overall_error') is not None]
        if scored:
            summary[f'{label}_mse_statistics'] = _auto_metric_mean_stats(scored, dimensions, 'mse')
            summary[f'{label}_mae_statistics'] = _auto_metric_mean_stats(scored, dimensions, 'mae')

    # ---- Spearman correlations ----------------------------------------------
    if has_refined:
        refined_spearman_stats = _auto_metric_spearman_stats(refined_results, dimensions)
        initial_spearman_stats = _auto_metric_spearman_stats(initial_results, dimensions)
        # Use refined for overall statistics (avoid double counting)
        summary['spearman_correlations'] = refined_spearman_stats
        summary['refined_spearman_correlations'] = refined_spearman_stats
        summary['initial_spearman_correlations'] = initial_spearman_stats
    else:
        summary['spearman_correlations'] = _auto_metric_spearman_stats(valid_results, dimensions)

    # ---- Decision metrics ---------------------------------------------------
    if has_refined:
        refined_decision_metrics = _auto_metric_decision_metrics(refined_results)
        if refined_decision_metrics is not None:
            summary['refined_decision_metrics'] = refined_decision_metrics
            summary['decision_metrics'] = refined_decision_metrics  # Use refined for overall
        initial_decision_metrics = _auto_metric_decision_metrics(initial_results)
        if initial_decision_metrics is not None:
            summary['initial_decision_metrics'] = initial_decision_metrics
    else:
        decision_metrics = _auto_metric_decision_metrics(valid_results)
        if decision_metrics is not None:
            summary['decision_metrics'] = decision_metrics

    # ---- Pairwise comparison ------------------------------------------------
    pairwise_results = refined_results if has_refined else valid_results

    paper_scores = []
    for r in pairwise_results:
        has_rating_pair = r.get('gt_rating') is not None and r.get('model_rating') is not None
        has_soundness_pair = r.get('gt_soundness') is not None and r.get('model_soundness') is not None
        if has_rating_pair or has_soundness_pair:
            paper_scores.append({
                'true_rating': r.get('gt_rating'),
                'pred_rating': r.get('model_rating'),
                'true_soundness': r.get('gt_soundness'),
                'pred_soundness': r.get('model_soundness'),
                'true_presentation': r.get('gt_presentation'),
                'pred_presentation': r.get('model_presentation'),
                'true_confidence': r.get('gt_confidence'),
                'pred_confidence': r.get('model_confidence')
            })

    # A pairwise comparison needs at least two papers.
    if len(paper_scores) >= 2:
        pairwise_accuracies = calculate_pairwise_accuracies(paper_scores)
        summary['pairwise_accuracies'] = pairwise_accuracies

    return results, summary
| # ============================================================================ | |
| # Main Function | |
| # ============================================================================ | |
def parse_args():
    """
    Build the command line parser for the unified evaluation script and parse
    the process arguments.

    Returns:
        The ``argparse.Namespace`` produced by ``parser.parse_args()``.
    """
    parser = argparse.ArgumentParser(description="Unified evaluation script for semantic and auto-metric evaluation")

    # (flag, add_argument keyword options), registered in this order so the
    # generated --help output lists them in the same sequence.
    option_table = [
        # Input paths
        ("--rubrics_path", dict(
            type=str, required=True,
            help="Path to eval_rubrics.json file (from 1_generate_review_based_rubrics.py)")),
        ("--reviews_path", dict(
            type=str, required=True,
            help="Path to JSON file with model reviews (contains pred_fast_mode)")),
        # Evaluation mode
        ("--mode", dict(
            type=str, choices=["semantic", "auto_metric", "both"], default="both",
            help="Evaluation mode: semantic (LLM-based), auto_metric (rule-based), or both")),
        # Output paths
        ("--semantic_output", dict(
            type=str, default=None,
            help="Path to output JSON file for semantic evaluation results (required if mode is semantic or both)")),
        ("--auto_metric_output", dict(
            type=str, default=None,
            help="Path to output JSON file for auto-metric evaluation results (required if mode is auto_metric or both)")),
        # Semantic evaluation settings
        ("--yaml_path", dict(
            type=str, default=None,
            help="Path to prompts.yaml file (required for semantic evaluation)")),
        ("--config_path", dict(
            type=str, default=None,
            help="Path to configs.yaml file (required for semantic evaluation)")),
        # Multi-threading
        ("--max_workers", dict(
            type=int, default=None,
            help="Maximum number of worker threads for semantic evaluation (default: 5)")),
        # Strict mode (normalize scores to discrete scales)
        ("--strict_mode", dict(
            action="store_true", default=False,
            help="Enable strict mode: normalize scores to discrete scales before computing metrics (default: False)")),
        # Input format override
        ("--input_format", dict(
            type=str, choices=['auto', 'refined', 'original'], default='auto',
            help="Manually specify input JSON format: 'refined' (has scores and initial_scores), 'original' (has model_prediction), or 'auto' for auto-detection (default: 'auto')")),
    ]

    for flag, options in option_table:
        parser.add_argument(flag, **options)

    return parser.parse_args()
| def main(): | |
| """Main execution function.""" | |
| args = parse_args() | |
| script_dir = os.path.dirname(os.path.abspath(__file__)) | |
| # Resolve paths | |
| rubrics_path = args.rubrics_path | |
| if not os.path.isabs(rubrics_path): | |
| rubrics_path = os.path.join(script_dir, rubrics_path) | |
| reviews_path = args.reviews_path | |
| if not os.path.isabs(reviews_path): | |
| reviews_path = os.path.join(script_dir, reviews_path) | |
| max_workers = args.max_workers or int(os.getenv("MAX_WORKERS", "5")) | |
| # Validate mode and output paths | |
| if args.mode in ["semantic", "both"]: | |
| if not args.semantic_output: | |
| raise ValueError("--semantic_output is required when mode is 'semantic' or 'both'") | |
| if not args.yaml_path: | |
| raise ValueError("--yaml_path is required for semantic evaluation") | |
| if not args.config_path: | |
| raise ValueError("--config_path is required for semantic evaluation") | |
| if args.mode in ["auto_metric", "both"]: | |
| if not args.auto_metric_output: | |
| raise ValueError("--auto_metric_output is required when mode is 'auto_metric' or 'both'") | |
| # Check if files exist | |
| if not os.path.exists(rubrics_path): | |
| raise FileNotFoundError(f"Rubrics file not found: {rubrics_path}") | |
| if not os.path.exists(reviews_path): | |
| raise FileNotFoundError(f"Reviews file not found: {reviews_path}") | |
| # Load data | |
| print(f"Loading rubrics from {rubrics_path}...") | |
| rubrics_data = load_rubrics_json(rubrics_path) | |
| print(f"Loaded {len(rubrics_data)} rubrics entries") | |
| print(f"Loading model reviews from {reviews_path}...") | |
| if args.input_format != 'auto': | |
| print(f"Using manually specified format: {args.input_format}") | |
| else: | |
| print("Auto-detecting input format...") | |
| reviews_dict = load_model_reviews_json(reviews_path, format_override=args.input_format if args.input_format != 'auto' else None) | |
| print(f"Loaded {len(reviews_dict)} model reviews") | |
| # Combine rubrics and reviews | |
| print("Combining rubrics and reviews...") | |
| evaluation_data = combine_rubrics_and_reviews(rubrics_data, reviews_dict) | |
| print(f"Prepared {len(evaluation_data)} entries for evaluation") | |
| # Run evaluations based on mode | |
| if args.mode in ["semantic", "both"]: | |
| # Resolve semantic evaluation paths | |
| yaml_path = args.yaml_path | |
| if not os.path.isabs(yaml_path): | |
| yaml_path = os.path.join(script_dir, yaml_path) | |
| config_path = args.config_path | |
| if not os.path.isabs(config_path): | |
| config_path = os.path.join(script_dir, config_path) | |
| if not os.path.exists(yaml_path): | |
| raise FileNotFoundError(f"YAML file not found: {yaml_path}") | |
| if not os.path.exists(config_path): | |
| raise FileNotFoundError(f"Config file not found: {config_path}") | |
| # Load prompt template | |
| print(f"Loading prompt template from {yaml_path}...") | |
| prompt_template = load_prompt_template(yaml_path) | |
| if not prompt_template: | |
| raise ValueError("Could not find 'v1_evaluator_prompt' in YAML file") | |
| # Initialize LLM service | |
| print(f"Loading LLM configuration from {config_path}...") | |
| llm_config = load_llm_config(config_path) | |
| llm_service = create_llm_service_from_config(llm_config) | |
| mode = llm_config.get('mode', 'gpt') | |
| print(f"LLM service initialized (mode: {mode})") | |
| if hasattr(llm_service, 'model_name'): | |
| print(f"Using model: {llm_service.model_name}") | |
| # Run semantic evaluation | |
| semantic_results, semantic_summary = run_semantic_evaluation( | |
| evaluation_data, prompt_template, llm_service, max_workers | |
| ) | |
| # Save semantic results | |
| semantic_output = args.semantic_output | |
| if not os.path.isabs(semantic_output): | |
| semantic_output = os.path.join(script_dir, semantic_output) | |
| output_dir = os.path.dirname(semantic_output) | |
| os.makedirs(output_dir, exist_ok=True) | |
| with open(semantic_output, 'w', encoding='utf-8') as f: | |
| json.dump(semantic_results, f, ensure_ascii=False, indent=2) | |
| print(f"\nSemantic evaluation results saved to {semantic_output}") | |
| # Save semantic summary | |
| semantic_summary_path = semantic_output.replace('.json', '_summary.json') | |
| with open(semantic_summary_path, 'w', encoding='utf-8') as f: | |
| json.dump(semantic_summary, f, ensure_ascii=False, indent=2) | |
| print(f"Semantic evaluation summary saved to {semantic_summary_path}") | |
| # Print semantic summary | |
| print("\n" + "="*80) | |
| print("SEMANTIC EVALUATION SUMMARY") | |
| print("="*80) | |
| print(f"Total entries: {semantic_summary['total_entries']}") | |
| print(f"Valid entries: {semantic_summary['valid_entries']}") | |
| print(f"Failed entries: {semantic_summary['failed_entries']}") | |
| if 'overall_score' in semantic_summary: | |
| score = semantic_summary['overall_score'] | |
| print(f"\nOverall Score:") | |
| print(f" Mean: {score['mean']:.2f}") | |
| print(f" Min: {score['min']:.2f}") | |
| print(f" Max: {score['max']:.2f}") | |
| if args.mode in ["auto_metric", "both"]: | |
| # Run auto-metric evaluation | |
| auto_metric_results, auto_metric_summary = run_auto_metric_evaluation( | |
| evaluation_data, | |
| strict_mode=args.strict_mode | |
| ) | |
| # Save auto-metric results | |
| auto_metric_output = args.auto_metric_output | |
| if not os.path.isabs(auto_metric_output): | |
| auto_metric_output = os.path.join(script_dir, auto_metric_output) | |
| output_dir = os.path.dirname(auto_metric_output) | |
| os.makedirs(output_dir, exist_ok=True) | |
| with open(auto_metric_output, 'w', encoding='utf-8') as f: | |
| json.dump(auto_metric_results, f, ensure_ascii=False, indent=2) | |
| print(f"\nAuto-metric evaluation results saved to {auto_metric_output}") | |
| # Save auto-metric summary | |
| auto_metric_summary_path = auto_metric_output.replace('.json', '_summary.json') | |
| with open(auto_metric_summary_path, 'w', encoding='utf-8') as f: | |
| json.dump(auto_metric_summary, f, ensure_ascii=False, indent=2) | |
| print(f"Auto-metric evaluation summary saved to {auto_metric_summary_path}") | |
| # Print auto-metric summary | |
| print("\n" + "="*80) | |
| print("AUTO-METRIC EVALUATION SUMMARY") | |
| print("="*80) | |
| print(f"Total entries: {auto_metric_summary['total_entries']}") | |
| print(f"Valid entries: {auto_metric_summary['valid_entries']}") | |
| print(f"MSE entries: {auto_metric_summary['mse_entries']}") | |
| if 'mse_statistics' in auto_metric_summary: | |
| print("\nMSE Statistics:") | |
| for dim, stats in auto_metric_summary['mse_statistics'].items(): | |
| print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") | |
| if 'mae_statistics' in auto_metric_summary: | |
| print("\nMAE Statistics:") | |
| for dim, stats in auto_metric_summary['mae_statistics'].items(): | |
| print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") | |
| # Print refined and initial statistics if available | |
| if 'refined_mse_statistics' in auto_metric_summary: | |
| print("\nRefined Scores - MSE Statistics:") | |
| for dim, stats in auto_metric_summary['refined_mse_statistics'].items(): | |
| print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") | |
| if 'refined_mae_statistics' in auto_metric_summary: | |
| print("\nRefined Scores - MAE Statistics:") | |
| for dim, stats in auto_metric_summary['refined_mae_statistics'].items(): | |
| print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") | |
| if 'initial_mse_statistics' in auto_metric_summary: | |
| print("\nInitial Scores - MSE Statistics:") | |
| for dim, stats in auto_metric_summary['initial_mse_statistics'].items(): | |
| print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") | |
| if 'initial_mae_statistics' in auto_metric_summary: | |
| print("\nInitial Scores - MAE Statistics:") | |
| for dim, stats in auto_metric_summary['initial_mae_statistics'].items(): | |
| print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") | |
| if 'spearman_correlations' in auto_metric_summary: | |
| print("\nSpearman Correlations:") | |
| for dim, stats in auto_metric_summary['spearman_correlations'].items(): | |
| print(f" {dim.capitalize()}: {stats['correlation']:.4f} (n={stats['count']})") | |
| # Print refined and initial spearman correlations if available | |
| if 'refined_spearman_correlations' in auto_metric_summary: | |
| print("\nRefined Scores - Spearman Correlations:") | |
| for dim, stats in auto_metric_summary['refined_spearman_correlations'].items(): | |
| print(f" {dim.capitalize()}: {stats['correlation']:.4f} (n={stats['count']})") | |
| if 'initial_spearman_correlations' in auto_metric_summary: | |
| print("\nInitial Scores - Spearman Correlations:") | |
| for dim, stats in auto_metric_summary['initial_spearman_correlations'].items(): | |
| print(f" {dim.capitalize()}: {stats['correlation']:.4f} (n={stats['count']})") | |
| if 'decision_metrics' in auto_metric_summary: | |
| dm = auto_metric_summary['decision_metrics'] | |
| print(f"\nDecision Metrics:") | |
| print(f" Accuracy: {dm['accuracy']:.4f} (n={dm['count']})") | |
| if 'f1_macro' in dm: | |
| print(f" F1 (macro): {dm['f1_macro']:.4f}") | |
| # Print refined and initial decision metrics if available | |
| if 'refined_decision_metrics' in auto_metric_summary: | |
| print("\nRefined Scores - Decision Metrics:") | |
| rdm = auto_metric_summary['refined_decision_metrics'] | |
| print(f" Accuracy: {rdm['accuracy']:.4f} (n={rdm['count']})") | |
| if 'f1_macro' in rdm: | |
| print(f" F1 (macro): {rdm['f1_macro']:.4f}") | |
| if 'initial_decision_metrics' in auto_metric_summary: | |
| print("\nInitial Scores - Decision Metrics:") | |
| idm = auto_metric_summary['initial_decision_metrics'] | |
| print(f" Accuracy: {idm['accuracy']:.4f} (n={idm['count']})") | |
| if 'f1_macro' in idm: | |
| print(f" F1 (macro): {idm['f1_macro']:.4f}") | |
| print("\n" + "="*80) | |
| print("EVALUATION COMPLETE") | |
| print("="*80) | |
# Script entry point: call main() only when this file is executed directly, not when it is imported as a module.
| if __name__ == "__main__": | |
| main() | |