""" Unified evaluation script for semantic (LLM-based) and auto_metric (rule-based) evaluation. This script: 1. Reads eval_rubrics.json (from 1_generate_review_based_rubrics.py) containing rubrics for each paper 2. Reads input JSON file containing model reviews (supports multiple formats) 3. Supports three evaluation modes: - semantic: LLM-based rubrics evaluation (from 2_evaluate_direct.py) - auto_metric: Rule-based metrics evaluation (from 3_rule_evaluate.py) - both: Run both evaluations separately 4. Supports strict mode: normalize scores to discrete scales before computing metrics (--strict_mode) 5. Outputs separate JSON files for results and summaries Usage: # Semantic evaluation only python 2_evaluate.py \ --rubrics_path eval_rubrics.json \ --reviews_path model_reviews.json \ --mode semantic \ --yaml_path prompts.yaml \ --config_path configs.yaml \ --semantic_output semantic_results.json \ --max_workers 5 # Auto-metric evaluation only python 2_evaluate.py \ --rubrics_path eval_rubrics.json \ --reviews_path model_reviews.json \ --mode auto_metric \ --auto_metric_output auto_metric_results.json # Auto-metric evaluation with strict mode (normalize scores to discrete scales) python 2_evaluate.py \ --rubrics_path eval_rubrics.json \ --reviews_path model_reviews.json \ --mode auto_metric \ --auto_metric_output auto_metric_results.json \ --strict_mode # Auto-metric evaluation with manually specified input format (refined) python 2_evaluate.py \ --rubrics_path eval_rubrics.json \ --reviews_path model_reviews.json \ --mode auto_metric \ --auto_metric_output auto_metric_results.json \ --input_format refined # Auto-metric evaluation with manually specified input format (original) python 2_evaluate.py \ --rubrics_path eval_rubrics.json \ --reviews_path ours.json \ --mode auto_metric \ --auto_metric_output auto_metric_results.json \ --input_format original # Both evaluations python 2_evaluate.py \ --rubrics_path eval_rubrics.json \ --reviews_path model_reviews.json \ --mode both \ --yaml_path prompts.yaml \ --config_path configs.yaml \ --semantic_output semantic_results.json \ --auto_metric_output auto_metric_results.json \ --max_workers 32 """ from __future__ import annotations import json import os import sys import argparse import yaml import math from typing import Dict, List, Any, Optional from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm from itertools import combinations from scipy.stats import spearmanr from sklearn.metrics import precision_recall_fscore_support # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Import parse_llm_response from local llm_service module import llm_service as local_llm_service parse_llm_response = local_llm_service.parse_llm_response # Import from shared/utils for gpt/vllm support project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) if project_root not in sys.path: sys.path.insert(0, project_root) from shared.utils.llm_service import LLMService from shared.utils.vllm_service import VLLMService from shared.utils.gpt_service import GPTService sys.path.insert(0, os.path.join(project_root, 'shared', 'utils')) from json_parser import parse_review_markdown class ReviewProcessor: """Handles the extraction and processing of reviews from different sources.""" @staticmethod def extract_review_content(pred_context): """ Extract the review content from the prediction context. Args: pred_context: Raw prediction data that contains the review Returns: str: Extracted review content """ try: # First attempt to extract from boxed format return pred_context.split(r'\boxed_review{')[-1].split('\n}')[0] except Exception: # Alternative extraction if the first method fails if isinstance(pred_context, dict) and 'output' in pred_context: return pred_context['output'].split(r'\boxed_review{')[-1].split('\n}')[0] else: # Return as is if extraction fails return pred_context # ============================================================================ # Semantic Evaluation Functions (from 2_evaluate_direct.py) # ============================================================================ def load_prompt_template(yaml_path: str) -> str: """Load the evaluator prompt from YAML file.""" with open(yaml_path, 'r', encoding='utf-8') as f: prompts = yaml.safe_load(f) return prompts.get('v1_evaluator_prompt', '') def build_evaluation_prompt( rubrics: List[Dict[str, Any]], paper_content: str, review: str, prompt_template: str ) -> str: """Build the evaluation prompt by replacing placeholders.""" rubrics_json = json.dumps(rubrics, indent=4, ensure_ascii=False) prompt = prompt_template.replace('{rubrics_json}', rubrics_json) prompt = prompt.replace('<>', paper_content) prompt = prompt.replace('<>', review) return prompt def calculate_weighted_scores( raw_scores: Dict[str, Dict[str, Any]], rubrics: List[Dict[str, Any]] ) -> Dict[str, float]: """Calculate weighted scores for each rubric.""" rubric_weights = {r['title']: r['weight'] for r in rubrics} weighted_scores = {} for rubric_title, rubric_data in raw_scores.items(): if rubric_title not in rubric_weights: continue rubric_score = rubric_data.get('score', 0) if isinstance(rubric_score, str): try: rubric_score = int(rubric_score) except ValueError: rubric_score = 0 if rubric_score not in [0, 1]: rubric_score = 1 if rubric_score > 0 else 0 weight = rubric_weights[rubric_title] weighted_scores[rubric_title] = rubric_score * weight return weighted_scores def calculate_scores(raw_scores: Dict[str, Dict[str, Any]]) -> Dict[str, float]: """Calculate scores for each rubric.""" scores = {} for rubric_title, rubric_data in raw_scores.items(): scores[rubric_title] = rubric_data.get('score', 0) return scores def evaluate_review_semantic( entry: Dict[str, Any], paper_content: str, prompt_template: str, llm_service: LLMService ) -> Dict[str, Any]: """Evaluate a single review using article-specific rubrics.""" entry_id = entry.get('id', 'unknown') rubrics = entry.get('rubrics', []) model_review = entry.get('model_review', '') if not rubrics: return { 'id': entry_id, 'raw_scores': {}, 'weighted_scores': {}, 'total_score': 0.0, 'error': 'No valid rubrics found', 'raw_response': '' } # Build prompt prompt = build_evaluation_prompt(rubrics, paper_content, model_review, prompt_template) # Call LLM try: messages = [{"role": "user", "content": prompt}] response = llm_service.generate(messages=messages) # Parse response raw_scores = parse_llm_response(response) weighted_scores = calculate_scores(raw_scores) total_score = sum(weighted_scores.values()) return { 'id': entry_id, 'raw_scores': raw_scores, 'weighted_scores': weighted_scores, 'total_score': total_score, 'raw_response': response } except Exception as e: print(f"[ERROR] Error evaluating review {entry_id}: {e}") return { 'id': entry_id, 'raw_scores': {}, 'weighted_scores': {}, 'total_score': 0.0, 'error': str(e), 'raw_response': '' } def calculate_per_rubric_statistics( valid_results: List[Dict[str, Any]], rubric_titles: List[str] ) -> Dict[str, Dict[str, float]]: """Calculate per-rubric statistics from evaluation results.""" rubric_scores = {title: [] for title in rubric_titles} for result in valid_results: weighted_scores = result.get('weighted_scores', {}) if not isinstance(weighted_scores, dict): continue for rubric_title in rubric_titles: if rubric_title in weighted_scores: score = weighted_scores[rubric_title] if isinstance(score, str): try: score = float(score) except ValueError: continue elif isinstance(score, (int, float)): score = float(score) else: continue rubric_scores[rubric_title].append(score) per_rubric_stats = {} for rubric_title in rubric_titles: scores = rubric_scores[rubric_title] if not scores: continue mean_score = sum(scores) / len(scores) min_score = min(scores) max_score = max(scores) count = len(scores) if rubric_title == "False or Contradictory Claims": pass_count = sum(1 for s in scores if s >= 0) else: pass_count = sum(1 for s in scores if s >= 1) pass_rate = pass_count / count if count > 0 else 0.0 per_rubric_stats[rubric_title] = { 'mean': mean_score, 'min': min_score, 'max': max_score, 'count': count, 'pass_rate': pass_rate } return per_rubric_stats # ============================================================================ # Auto-Metric Evaluation Functions (from 3_rule_evaluate.py) # ============================================================================ def extract_scores_from_review(review_text: str) -> Dict[str, Any]: """Extract numeric scores and decision from a review markdown text.""" if not review_text: return {'soundness': None, 'presentation': None, 'rating': None, 'confidence': None, 'decision': None} try: parsed = parse_review_markdown(review_text) decision = parsed.get('decision', '') if decision: decision_lower = decision.lower().strip() if 'accept' in decision_lower: decision = 'accept' elif 'reject' in decision_lower: decision = 'reject' elif 'undecided' in decision_lower: decision = 'undecided' else: decision = decision_lower else: decision = None return { 'soundness': parsed.get('soundness'), 'presentation': parsed.get('presentation'), 'rating': parsed.get('rating'), 'confidence': parsed.get('confidence'), 'decision': decision } except Exception as e: print(f"Warning: Failed to parse review text: {e}") return {'soundness': None, 'presentation': None, 'rating': None, 'confidence': None, 'decision': None} def calculate_mse(predicted: float, ground_truth: float) -> Optional[float]: """Calculate Mean Squared Error for a single value.""" if predicted is None or ground_truth is None: return None return (predicted - ground_truth) ** 2 def calculate_mae(predicted: float, ground_truth: float) -> Optional[float]: """Calculate Mean Absolute Error for a single value.""" if predicted is None or ground_truth is None: return None return abs(predicted - ground_truth) def normalize_to_discrete_scale(score: Optional[float], scale_type: str) -> Optional[float]: """ Normalize a float score to the nearest discrete value based on scale type. Uses round-half-up tie-breaking (e.g., 3.5 rounds to 4, 1.5 rounds to 2). Args: score: The float score to normalize (can be None) scale_type: Either '0-5' for 0-5 scale (discrete: 0,1,2,3,4,5) or '0-10' for 0-10 scale (discrete: 0,2,4,6,8,10) Returns: Normalized discrete score, or None if input is None """ if score is None: return None try: score = float(score) except (ValueError, TypeError): return None if scale_type == '0-5': # Discrete values: 0, 1, 2, 3, 4, 5 discrete_values = [0, 1, 2, 3, 4, 5] # Clamp to valid range score = max(0, min(5, score)) # Find nearest discrete value, with round-half-up tie-breaking # For ties, prefer the higher value best_value = None best_distance = float('inf') for val in discrete_values: distance = abs(val - score) if distance < best_distance: best_distance = distance best_value = val elif distance == best_distance and val > best_value: # Tie-breaking: prefer higher value (round-half-up) best_value = val return best_value elif scale_type == '0-10': # Discrete values: 0, 2, 4, 6, 8, 10 discrete_values = [0, 2, 4, 6, 8, 10] # Clamp to valid range score = max(0, min(10, score)) # Find nearest discrete value, with round-half-up tie-breaking best_value = None best_distance = float('inf') for val in discrete_values: distance = abs(val - score) if distance < best_distance: best_distance = distance best_value = val elif distance == best_distance and val > best_value: # Tie-breaking: prefer higher value (round-half-up) best_value = val return best_value else: raise ValueError(f"Unknown scale_type: {scale_type}. Must be '0-5' or '0-10'") def normalize_scores_dict(scores: Dict[str, Optional[float]]) -> Dict[str, Optional[float]]: """ Normalize all scores in a dictionary to their appropriate discrete scales. Args: scores: Dictionary with keys 'soundness', 'presentation', 'rating', 'confidence' Returns: Dictionary with normalized scores """ normalized = {} # soundness, presentation, confidence use 0-5 scale for key in ['soundness', 'presentation', 'confidence']: normalized[key] = normalize_to_discrete_scale(scores.get(key), '0-5') # rating uses 0-10 scale normalized['rating'] = normalize_to_discrete_scale(scores.get('rating'), '0-10') return normalized def calculate_score_metrics( model_scores: Dict[str, float], ground_truth_scores: Dict[str, float], normalize: bool = False ) -> Dict[str, Any]: """ Calculate MSE and MAE metrics for each scoring dimension. Args: model_scores: Dictionary with model scores ground_truth_scores: Dictionary with ground truth scores normalize: If True, normalize scores to discrete scales before computing metrics Returns: Dictionary with MSE, MAE metrics and optionally normalized scores """ dimensions = ['soundness', 'presentation', 'rating', 'confidence'] # Normalize scores to discrete scales if requested if normalize: model_scores_normalized = normalize_scores_dict(model_scores) gt_scores_normalized = normalize_scores_dict(ground_truth_scores) else: model_scores_normalized = model_scores gt_scores_normalized = ground_truth_scores mse_values = {} mae_values = {} valid_count = 0 for dim in dimensions: # Use normalized scores for metric calculation mse = calculate_mse(model_scores_normalized.get(dim), gt_scores_normalized.get(dim)) mae = calculate_mae(model_scores_normalized.get(dim), gt_scores_normalized.get(dim)) mse_values[f'{dim}_mse'] = mse mae_values[f'{dim}_mae'] = mae if mse is not None: valid_count += 1 overall_error = sum([v for v in mse_values.values() if v is not None]) result = { **mse_values, **mae_values, 'overall_error': overall_error if valid_count > 0 else None, 'valid_dimensions': valid_count } # Include normalized scores in result for transparency (only if normalize=True) if normalize: result['model_scores_normalized'] = model_scores_normalized result['gt_scores_normalized'] = gt_scores_normalized return result def normalize_score_value(value): """Normalize score value to float, handling string representations.""" if value is None: return None if isinstance(value, (int, float)): return float(value) if isinstance(value, str): # Try to extract numeric value from string (e.g., "2.75" -> 2.75) try: import re match = re.search(r'(\d+\.?\d*)', value) if match: return float(match.group(1)) except: pass return None def normalize_decision(decision): """Normalize decision string to standard format.""" if decision is None: return None decision_lower = str(decision).lower().strip() if 'accept' in decision_lower: return 'accept' elif 'reject' in decision_lower: return 'reject' elif 'undecided' in decision_lower: return 'undecided' else: return decision_lower def extract_scores_from_dict(scores_dict: Dict[str, Any]) -> Dict[str, Any]: """ Extract scores from a structured dictionary (scores or initial_scores format). Args: scores_dict: Dict containing scores (e.g., {'rating': 5.75, 'soundness': '2.75', ...}) Returns: Dict with normalized scores: {'soundness', 'presentation', 'rating', 'confidence', 'decision'} """ if not scores_dict: return { 'soundness': None, 'presentation': None, 'rating': None, 'confidence': None, 'decision': None } return { 'soundness': normalize_score_value(scores_dict.get('soundness')), 'presentation': normalize_score_value(scores_dict.get('presentation')), 'rating': normalize_score_value(scores_dict.get('rating')), 'confidence': normalize_score_value(scores_dict.get('confidence')), 'decision': normalize_decision(scores_dict.get('decision')) } def evaluate_review_auto_metric(entry: Dict[str, Any], use_initial_scores: bool = False, strict_mode: bool = False) -> Dict[str, Any]: """ Evaluate a single entry by extracting scores and calculating metrics. Args: entry: Evaluation entry containing model_review, scores, initial_scores, etc. use_initial_scores: If True, use initial_scores instead of refined scores (for refined format) Returns: Dict containing evaluation metrics """ entry_id = entry.get('id', 'unknown') model_review = entry.get('model_review', '') format_type = entry.get('format', 'unknown') # Extract scores based on format model_scores = {} model_decision = None if format_type == 'refined' and not use_initial_scores: # Use refined scores from structured data scores_dict = entry.get('scores', {}) model_data = extract_scores_from_dict(scores_dict) model_scores = { 'soundness': model_data.get('soundness'), 'presentation': model_data.get('presentation'), 'rating': model_data.get('rating'), 'confidence': model_data.get('confidence') } model_decision = model_data.get('decision') elif format_type == 'refined' and use_initial_scores: # Use initial scores from structured data initial_scores_dict = entry.get('initial_scores', {}) model_data = extract_scores_from_dict(initial_scores_dict) model_scores = { 'soundness': model_data.get('soundness'), 'presentation': model_data.get('presentation'), 'rating': model_data.get('rating'), 'confidence': model_data.get('confidence') } model_decision = model_data.get('decision') elif format_type == 'original': # Use initial scores from structured data initial_scores_dict = entry.get('initial_scores', {}) model_data = extract_scores_from_dict(initial_scores_dict) model_scores = { 'soundness': model_data.get('soundness'), 'presentation': model_data.get('presentation'), 'rating': model_data.get('rating'), 'confidence': model_data.get('confidence') } model_decision = model_data.get('decision') # Fallback: If confidence is missing from structured data, try to extract from review text # (meta_review may not have confidence field, but review text might) if model_scores.get('confidence') is None and model_review: try: review_data = extract_scores_from_review(model_review) if review_data.get('confidence') is not None: model_scores['confidence'] = review_data.get('confidence') except Exception: pass # Keep confidence as None if extraction fails else: # Fallback: extract from markdown review text model_data = extract_scores_from_review(model_review) model_scores = { 'soundness': model_data.get('soundness'), 'presentation': model_data.get('presentation'), 'rating': model_data.get('rating'), 'confidence': model_data.get('confidence') } model_decision = model_data.get('decision') # Get ground truth scores from golden_review ONLY # Ground truth must ONLY come from golden_review, never from model output # If extraction fails, leave fields as None (do not use model_review as fallback) ground_truth_review = entry.get('golden_review', '') ground_truth_scores = {} gt_decision = None if not ground_truth_review: print(f"Warning: No golden_review found for entry {entry_id}. Ground truth scores will be empty.") else: try: # Extract scores from golden_review markdown text gt_data = extract_scores_from_review(ground_truth_review) if not gt_data: print(f"Warning: Failed to parse golden_review for entry {entry_id}. Ground truth scores will be empty.") else: ground_truth_scores = { 'soundness': gt_data.get('soundness'), 'presentation': gt_data.get('presentation'), 'rating': gt_data.get('rating'), 'confidence': gt_data.get('confidence') } gt_decision = normalize_decision(gt_data.get('decision')) # Note: If any field is None, it stays None - we do NOT use model_review as fallback # Using model output as ground truth would inflate evaluation scores except Exception as e: print(f"Warning: Failed to extract scores from golden_review for {entry_id}: {e}") print(f" Ground truth scores will be empty. Error: {str(e)}") # Calculate MSE and MAE metrics (with optional normalization in strict mode) score_metrics = calculate_score_metrics(model_scores, ground_truth_scores, normalize=strict_mode) # Calculate decision accuracy decision_match = False decision_accuracy = None if model_decision is not None and gt_decision is not None: model_decision_normalized = normalize_decision(model_decision) decision_match = (model_decision_normalized == gt_decision) decision_accuracy = 1.0 if decision_match else 0.0 result = { 'id': entry_id, 'format': format_type, 'model_soundness': model_scores.get('soundness'), 'model_presentation': model_scores.get('presentation'), 'model_rating': model_scores.get('rating'), 'model_confidence': model_scores.get('confidence'), 'model_decision': model_decision, 'gt_soundness': ground_truth_scores.get('soundness'), 'gt_presentation': ground_truth_scores.get('presentation'), 'gt_rating': ground_truth_scores.get('rating'), 'gt_confidence': ground_truth_scores.get('confidence'), 'gt_decision': gt_decision, 'decision_match': decision_match, 'decision_accuracy': decision_accuracy, **score_metrics } # Add prefix to indicate which scores were used if format_type == 'refined': if use_initial_scores: result['score_type'] = 'initial' else: result['score_type'] = 'refined' else: result['score_type'] = 'auto' return result def calculate_pairwise_accuracies(paper_scores: List[Dict[str, float]]) -> Dict[str, float]: """Calculate pairwise accuracy for each metric by comparing rankings.""" if len(paper_scores) < 2: return {} total_valid_pairs = {'rating': 0, 'soundness': 0, 'presentation': 0, 'confidence': 0} correct_pairs = {'rating': 0, 'soundness': 0, 'presentation': 0, 'confidence': 0} for paper1, paper2 in combinations(paper_scores, 2): # Check rating ranking if (paper1.get('true_rating') is not None and paper2.get('true_rating') is not None and paper1.get('pred_rating') is not None and paper2.get('pred_rating') is not None): total_valid_pairs['rating'] += 1 true_order = paper1['true_rating'] > paper2['true_rating'] pred_order = paper1['pred_rating'] > paper2['pred_rating'] if true_order == pred_order: correct_pairs['rating'] += 1 # Similar for other dimensions... # (abbreviated for space, similar logic for soundness, presentation, confidence) for metric in ['soundness', 'presentation', 'confidence']: true_key = f'true_{metric}' pred_key = f'pred_{metric}' if (paper1.get(true_key) is not None and paper2.get(true_key) is not None and paper1.get(pred_key) is not None and paper2.get(pred_key) is not None): total_valid_pairs[metric] += 1 true_order = paper1[true_key] > paper2[true_key] pred_order = paper1[pred_key] > paper2[pred_key] if true_order == pred_order: correct_pairs[metric] += 1 pairwise_accuracies = { metric: correct_pairs[metric] / total_valid_pairs[metric] if total_valid_pairs[metric] > 0 else 0.0 for metric in ['rating', 'soundness', 'presentation', 'confidence'] } return pairwise_accuracies # ============================================================================ # Data Loading Functions # ============================================================================ def load_rubrics_json(rubrics_path: str) -> Dict[str, Dict[str, Any]]: """Load rubrics JSON and create lookup by id.""" with open(rubrics_path, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, list): return {item['id']: item for item in data} elif isinstance(data, dict): return data else: raise ValueError(f"Invalid rubrics JSON format: expected list or dict, got {type(data)}") def load_model_reviews_json(reviews_path: str, format_override: Optional[str] = None) -> Dict[str, Dict[str, Any]]: """ Load model reviews JSON and extract reviews by id. Supports two input formats: 1. Refined format: Contains 'scores' and 'initial_scores' fields (from refinement pipeline) 2. Original format: Contains 'model_prediction' with 'meta_review' and 'decision' (like ours.json) Args: reviews_path: Path to JSON file containing model reviews format_override: Optional format override ('refined', 'original', or None for auto-detect) Returns: Dict mapping paper_id to dict containing: - 'review': review text (markdown) - 'scores': refined scores dict (if available) - 'initial_scores': initial scores dict (if available) - 'format': 'refined' or 'original' """ with open(reviews_path, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, dict): data = list(data.values()) reviews_dict = {} for item in data: item_id = None review_text = '' scores = None initial_scores = None format_type = None # Use format override if provided, otherwise auto-detect if format_override and format_override != 'auto': # Force use specified format if format_override == 'refined': item_id = item.get('paper_id') or item.get('id') if not item_id: continue format_type = 'refined' review_text = item.get('review_markdown', '') or item.get('review', '') scores = item.get('scores', {}) initial_scores = item.get('initial_scores', {}) elif format_override == 'original': item_id = item.get('id') if not item_id: continue format_type = 'original' model_prediction = item.get('model_prediction', {}) meta_review = model_prediction.get('meta_review', {}) review_text = meta_review.get('content', '') or model_prediction.get('raw_text', '') initial_scores = { 'rating': meta_review.get('rating'), 'soundness': meta_review.get('soundness'), 'presentation': meta_review.get('presentation'), 'contribution': meta_review.get('contribution'), 'decision': model_prediction.get('decision'), } else: raise ValueError(f"Unknown format_override: {format_override}. Must be 'refined', 'original', or 'auto'") else: # Auto-detect format if "paper_id" in item: # Refined format (from refinement pipeline) item_id = item.get('paper_id') if not item_id: continue # Check if this is refined format (has scores and initial_scores) if 'scores' in item and 'initial_scores' in item: format_type = 'refined' review_text = item.get('review_markdown', '') or item.get('review', '') scores = item.get('scores', {}) initial_scores = item.get('initial_scores', {}) else: # Standard format with paper_id format_type = 'standard' review_text = item.get('review_markdown', '') or item.get('review', '') elif "model_prediction" in item: # Original format (like ours.json) item_id = item.get('id') if not item_id: continue format_type = 'original' model_prediction = item.get('model_prediction', {}) meta_review = model_prediction.get('meta_review', {}) # Extract review content (prefer meta_review.content, fallback to raw_text) review_text = meta_review.get('content', '') or model_prediction.get('raw_text', '') # Extract initial scores initial_scores = { 'rating': meta_review.get('rating'), 'soundness': meta_review.get('soundness'), 'presentation': meta_review.get('presentation'), 'contribution': meta_review.get('contribution'), 'decision': model_prediction.get('decision'), } else: # Legacy format (pred_fast_mode) item_id = item.get('id') if not item_id: continue format_type = 'legacy' review_dict = item.get('pred_fast_mode', {}) if isinstance(review_dict, dict): # review_text = review_dict.get('raw_text', '') review_text = review_dict else: review_text = str(review_dict) # Extract review content from the review text field try: if review_text: extracted_review = ReviewProcessor.extract_review_content(review_text) else: extracted_review = '' reviews_dict[item_id] = { 'review': extracted_review, 'scores': scores, 'initial_scores': initial_scores, 'format': format_type } except Exception as e: print(f"[WARN] Failed to extract review for {item_id}: {e}") continue return reviews_dict def combine_rubrics_and_reviews( rubrics_data: Dict[str, Dict[str, Any]], reviews_dict: Dict[str, Dict[str, Any]] ) -> List[Dict[str, Any]]: """ Combine rubrics and reviews into evaluation entries. Args: rubrics_data: Dict mapping paper_id to rubric entry reviews_dict: Dict mapping paper_id to dict containing 'review', 'scores', 'initial_scores', 'format' Returns: List of evaluation entries with model_review, scores, initial_scores, and format info """ combined = [] missing_reviews = [] for paper_id, rubric_entry in rubrics_data.items(): review_data = reviews_dict.get(paper_id) if not review_data or not review_data.get('review'): missing_reviews.append(paper_id) continue entry = { 'id': paper_id, 'paper_context': rubric_entry.get('paper_context', ''), 'decision': rubric_entry.get('decision', ''), 'golden_review': rubric_entry.get('golden_review', ''), 'rubrics': rubric_entry.get('rubrics', []), 'model_review': review_data.get('review', ''), 'scores': review_data.get('scores'), # Refined scores (if available) 'initial_scores': review_data.get('initial_scores'), # Initial scores (if available) 'format': review_data.get('format', 'unknown') # Format type } combined.append(entry) if missing_reviews: print(f"[WARN] {len(missing_reviews)} papers have no model review, skipping them") return combined # ============================================================================ # LLM Service Configuration # ============================================================================ def load_llm_config(config_path: str) -> Dict[str, Any]: """Load LLM configuration from YAML file.""" with open(config_path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) return config def create_llm_service_from_config(config: Dict[str, Any]) -> LLMService: """Create LLM service from configuration.""" mode = config.get('mode', 'gpt').lower() if mode == 'gpt': gpt_config = config.get('gpt', {}) api_key = gpt_config.get('api_key') or os.getenv('OPENAI_API_KEY') if not api_key: raise ValueError("GPT mode requires api_key in configs.yaml or OPENAI_API_KEY environment variable") service = GPTService( api_key=api_key, model_name=gpt_config.get('model_name', 'gpt-4o'), base_url=gpt_config.get('base_url'), timeout=gpt_config.get('timeout', 300) ) return service elif mode == 'vllm': vllm_config = config.get('vllm', {}) service = VLLMService( base_url=vllm_config.get('base_url', 'http://localhost:8000/v1'), api_key=vllm_config.get('api_key', 'dummy-key'), model_name=vllm_config.get('model_name'), timeout=vllm_config.get('timeout', 300), max_concurrent_requests=vllm_config.get('max_concurrent_requests', 64), max_retries=vllm_config.get('max_retries', 3), retry_delay=vllm_config.get('retry_delay', 1.0), retry_backoff=vllm_config.get('retry_backoff', 2.0) ) return service else: raise ValueError(f"Unknown mode: {mode}. Must be 'gpt' or 'vllm'") # ============================================================================ # Main Evaluation Functions # ============================================================================ def run_semantic_evaluation( evaluation_data: List[Dict[str, Any]], prompt_template: str, llm_service: LLMService, max_workers: int ) -> tuple: """Run semantic evaluation and return results and summary.""" print(f"\n{'='*80}") print("RUNNING SEMANTIC EVALUATION") print(f"{'='*80}") print(f"Evaluating {len(evaluation_data)} reviews using {max_workers} workers...") results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_entry = { executor.submit( evaluate_review_semantic, entry, entry['paper_context'], prompt_template, llm_service ): entry for entry in evaluation_data } for future in tqdm(as_completed(future_to_entry), total=len(evaluation_data), desc="Semantic evaluation"): try: result = future.result() results.append(result) except Exception as e: entry = future_to_entry[future] print(f"\n[ERROR] Failed to process entry {entry.get('id', 'unknown')}: {e}") results.append({ 'id': entry.get('id', 'unknown'), 'raw_scores': {}, 'weighted_scores': {}, 'total_score': 0.0, 'error': str(e), 'raw_response': '' }) # Calculate statistics valid_results = [r for r in results if 'error' not in r and r.get('weighted_scores')] review_scores = [r.get('total_score', 0.0) for r in valid_results] summary = { 'total_entries': len(results), 'valid_entries': len(valid_results), 'failed_entries': len(results) - len(valid_results) } if review_scores: summary['overall_score'] = { 'mean': sum(review_scores) / len(review_scores), 'min': min(review_scores), 'max': max(review_scores) } # Calculate per-rubric statistics (extract rubric titles from first entry) if evaluation_data and evaluation_data[0].get('rubrics'): rubric_titles = [r['title'] for r in evaluation_data[0]['rubrics']] per_rubric_stats = calculate_per_rubric_statistics(valid_results, rubric_titles) summary['per_rubric_statistics'] = per_rubric_stats return results, summary def run_auto_metric_evaluation( evaluation_data: List[Dict[str, Any]], strict_mode: bool = False ) -> tuple: """ Run auto-metric evaluation and return results and summary. For refined format (has scores and initial_scores), evaluates both: - Refined scores evaluation - Initial scores evaluation For original format (only initial_scores), evaluates: - Initial scores evaluation only Returns: Tuple of (results_list, summary_dict) - results_list: List of evaluation results (may contain both refined and initial results for refined format) - summary_dict: Summary statistics """ print(f"\n{'='*80}") print("RUNNING AUTO-METRIC EVALUATION") print(f"{'='*80}") print(f"Evaluating {len(evaluation_data)} entries...") # Detect format types refined_format_count = sum(1 for e in evaluation_data if e.get('format') == 'refined') original_format_count = sum(1 for e in evaluation_data if e.get('format') == 'original') if refined_format_count > 0: print(f"Detected {refined_format_count} entries in refined format (will evaluate both refined and initial scores)") if original_format_count > 0: print(f"Detected {original_format_count} entries in original format (will evaluate initial scores only)") results = [] for entry in tqdm(evaluation_data, desc="Auto-metric evaluation"): format_type = entry.get('format', 'unknown') if format_type == 'refined': # Evaluate both refined scores and initial scores try: entry_id = entry.get('id', 'unknown') # Evaluate refined scores refined_result = evaluate_review_auto_metric(entry, use_initial_scores=False, strict_mode=strict_mode) refined_result['paper_id'] = entry_id # Keep original paper_id refined_result['id'] = f"{entry_id}_refined" results.append(refined_result) # Evaluate initial scores initial_result = evaluate_review_auto_metric(entry, use_initial_scores=True, strict_mode=strict_mode) initial_result['paper_id'] = entry_id # Keep original paper_id initial_result['id'] = f"{entry_id}_initial" results.append(initial_result) except Exception as e: print(f"Error evaluating entry {entry.get('id', 'unknown')}: {e}") results.append({ 'id': entry.get('id', 'unknown'), 'error': str(e) }) else: # Evaluate initial scores only (or extract from markdown) try: result = evaluate_review_auto_metric(entry, use_initial_scores=False, strict_mode=strict_mode) results.append(result) except Exception as e: print(f"Error evaluating entry {entry.get('id', 'unknown')}: {e}") results.append({ 'id': entry.get('id', 'unknown'), 'error': str(e) }) # Calculate statistics valid_results = [r for r in results if 'error' not in r] mse_results = [r for r in valid_results if r.get('overall_error') is not None] # Separate refined and initial results for refined format refined_results = [r for r in valid_results if r.get('score_type') == 'refined'] initial_results = [r for r in valid_results if r.get('score_type') == 'initial'] auto_results = [r for r in valid_results if r.get('score_type') == 'auto' or r.get('score_type') is None] summary = { 'total_entries': len(results), 'valid_entries': len(valid_results), 'mse_entries': len(mse_results), 'refined_results_count': len(refined_results), 'initial_results_count': len(initial_results), 'auto_results_count': len(auto_results) } # Calculate MSE/MAE statistics # For refined format, only use refined results for overall statistics (avoid double counting) # For other formats, use all results if refined_format_count > 0: # Refined format: use only refined results for overall statistics stats_results = [r for r in refined_results if r.get('overall_error') is not None] else: # Original/other formats: use all results stats_results = mse_results if stats_results: dimensions = ['soundness', 'presentation', 'confidence', 'rating'] mse_stats = {} mae_stats = {} for dim in dimensions: mse_list = [r.get(f'{dim}_mse') for r in stats_results if r.get(f'{dim}_mse') is not None] mae_list = [r.get(f'{dim}_mae') for r in stats_results if r.get(f'{dim}_mae') is not None] mse_clean = [x for x in mse_list if x is not None and not (isinstance(x, float) and math.isnan(x))] mae_clean = [x for x in mae_list if x is not None and not (isinstance(x, float) and math.isnan(x))] if mse_clean: mse_stats[dim] = { 'mean': sum(mse_clean) / len(mse_clean), 'count': len(mse_clean) } if mae_clean: mae_stats[dim] = { 'mean': sum(mae_clean) / len(mae_clean), 'count': len(mae_clean) } overall_errors = [r.get('overall_error') for r in stats_results if r.get('overall_error') is not None] overall_clean = [x for x in overall_errors if x is not None and not (isinstance(x, float) and math.isnan(x))] if overall_clean: summary['overall_error'] = { 'mean': sum(overall_clean) / len(overall_clean), 'count': len(overall_clean) } summary['mse_statistics'] = mse_stats summary['mae_statistics'] = mae_stats # Calculate separate statistics for refined and initial results if refined_results: refined_mse_results = [r for r in refined_results if r.get('overall_error') is not None] if refined_mse_results: refined_mse_stats = {} refined_mae_stats = {} for dim in dimensions: mse_list = [r.get(f'{dim}_mse') for r in refined_mse_results if r.get(f'{dim}_mse') is not None] mae_list = [r.get(f'{dim}_mae') for r in refined_mse_results if r.get(f'{dim}_mae') is not None] mse_clean = [x for x in mse_list if x is not None and not (isinstance(x, float) and math.isnan(x))] mae_clean = [x for x in mae_list if x is not None and not (isinstance(x, float) and math.isnan(x))] if mse_clean: refined_mse_stats[dim] = {'mean': sum(mse_clean) / len(mse_clean), 'count': len(mse_clean)} if mae_clean: refined_mae_stats[dim] = {'mean': sum(mae_clean) / len(mae_clean), 'count': len(mae_clean)} summary['refined_mse_statistics'] = refined_mse_stats summary['refined_mae_statistics'] = refined_mae_stats if initial_results: initial_mse_results = [r for r in initial_results if r.get('overall_error') is not None] if initial_mse_results: initial_mse_stats = {} initial_mae_stats = {} for dim in dimensions: mse_list = [r.get(f'{dim}_mse') for r in initial_mse_results if r.get(f'{dim}_mse') is not None] mae_list = [r.get(f'{dim}_mae') for r in initial_mse_results if r.get(f'{dim}_mae') is not None] mse_clean = [x for x in mse_list if x is not None and not (isinstance(x, float) and math.isnan(x))] mae_clean = [x for x in mae_list if x is not None and not (isinstance(x, float) and math.isnan(x))] if mse_clean: initial_mse_stats[dim] = {'mean': sum(mse_clean) / len(mse_clean), 'count': len(mse_clean)} if mae_clean: initial_mae_stats[dim] = {'mean': sum(mae_clean) / len(mae_clean), 'count': len(mae_clean)} summary['initial_mse_statistics'] = initial_mse_stats summary['initial_mae_statistics'] = initial_mae_stats # Calculate Spearman correlations def filter_valid_pairs(true_list, pred_list): filtered_true = [] filtered_pred = [] for t, p in zip(true_list, pred_list): if (t is not None and p is not None and not (isinstance(t, float) and math.isnan(t)) and not (isinstance(p, float) and math.isnan(p))): filtered_true.append(t) filtered_pred.append(p) return filtered_true, filtered_pred # Calculate Spearman correlations # For refined format, calculate separately for refined and initial, and use refined for overall # For other formats, use all results if refined_format_count > 0: # Calculate refined spearman correlations refined_spearman_stats = {} dimensions = ['soundness', 'presentation', 'confidence', 'rating'] for dim in dimensions: true_values = [r.get(f'gt_{dim}') for r in refined_results] pred_values = [r.get(f'model_{dim}') for r in refined_results] true_clean, pred_clean = filter_valid_pairs(true_values, pred_values) if len(true_clean) >= 2 and len(pred_clean) >= 2: try: corr, _ = spearmanr(true_clean, pred_clean) if not math.isnan(corr): refined_spearman_stats[dim] = { 'correlation': corr, 'count': len(true_clean) } except Exception: pass # Calculate initial spearman correlations initial_spearman_stats = {} for dim in dimensions: true_values = [r.get(f'gt_{dim}') for r in initial_results] pred_values = [r.get(f'model_{dim}') for r in initial_results] true_clean, pred_clean = filter_valid_pairs(true_values, pred_values) if len(true_clean) >= 2 and len(pred_clean) >= 2: try: corr, _ = spearmanr(true_clean, pred_clean) if not math.isnan(corr): initial_spearman_stats[dim] = { 'correlation': corr, 'count': len(true_clean) } except Exception: pass # Use refined for overall statistics (avoid double counting) summary['spearman_correlations'] = refined_spearman_stats summary['refined_spearman_correlations'] = refined_spearman_stats summary['initial_spearman_correlations'] = initial_spearman_stats else: # Original/other formats: use all results correlation_results = valid_results spearman_stats = {} dimensions = ['soundness', 'presentation', 'confidence', 'rating'] for dim in dimensions: true_values = [r.get(f'gt_{dim}') for r in correlation_results] pred_values = [r.get(f'model_{dim}') for r in correlation_results] true_clean, pred_clean = filter_valid_pairs(true_values, pred_values) if len(true_clean) >= 2 and len(pred_clean) >= 2: try: corr, _ = spearmanr(true_clean, pred_clean) if not math.isnan(corr): spearman_stats[dim] = { 'correlation': corr, 'count': len(true_clean) } except Exception: pass summary['spearman_correlations'] = spearman_stats # Calculate Decision metrics # For refined format, calculate separately for refined and initial, and use refined for overall # For other formats, use all results if refined_format_count > 0: # Calculate refined decision metrics refined_decision_results = [r for r in refined_results if r.get('gt_decision') is not None and r.get('model_decision') is not None] if refined_decision_results: true_decisions = [] pred_decisions = [] decision_acc = [] for r in refined_decision_results: gt_decision = str(r.get('gt_decision', '')).lower().strip() pred_decision = str(r.get('model_decision', '')).lower().strip() if 'accept' in pred_decision: pred_binary = 1 else: pred_binary = 0 if 'accept' in gt_decision: gt_binary = 1 else: gt_binary = 0 true_decisions.append(gt_binary) pred_decisions.append(pred_binary) if pred_decision == gt_decision or ('accept' in pred_decision and 'accept' in gt_decision) or ('reject' in pred_decision and 'reject' in gt_decision): decision_acc.append(1.0) else: decision_acc.append(0.0) if decision_acc: decision_accuracy = sum(decision_acc) / len(decision_acc) try: _, _, f1_score, _ = precision_recall_fscore_support(true_decisions, pred_decisions, average='macro') refined_decision_metrics = { 'accuracy': decision_accuracy, 'f1_macro': f1_score, 'count': len(decision_acc) } except Exception: refined_decision_metrics = { 'accuracy': decision_accuracy, 'count': len(decision_acc) } summary['refined_decision_metrics'] = refined_decision_metrics summary['decision_metrics'] = refined_decision_metrics # Use refined for overall # Calculate initial decision metrics initial_decision_results = [r for r in initial_results if r.get('gt_decision') is not None and r.get('model_decision') is not None] if initial_decision_results: true_decisions = [] pred_decisions = [] decision_acc = [] for r in initial_decision_results: gt_decision = str(r.get('gt_decision', '')).lower().strip() pred_decision = str(r.get('model_decision', '')).lower().strip() if 'accept' in pred_decision: pred_binary = 1 else: pred_binary = 0 if 'accept' in gt_decision: gt_binary = 1 else: gt_binary = 0 true_decisions.append(gt_binary) pred_decisions.append(pred_binary) if pred_decision == gt_decision or ('accept' in pred_decision and 'accept' in gt_decision) or ('reject' in pred_decision and 'reject' in gt_decision): decision_acc.append(1.0) else: decision_acc.append(0.0) if decision_acc: decision_accuracy = sum(decision_acc) / len(decision_acc) try: _, _, f1_score, _ = precision_recall_fscore_support(true_decisions, pred_decisions, average='macro') initial_decision_metrics = { 'accuracy': decision_accuracy, 'f1_macro': f1_score, 'count': len(decision_acc) } except Exception: initial_decision_metrics = { 'accuracy': decision_accuracy, 'count': len(decision_acc) } summary['initial_decision_metrics'] = initial_decision_metrics else: # Original/other formats: use all results decision_results = [r for r in valid_results if r.get('gt_decision') is not None and r.get('model_decision') is not None] if decision_results: true_decisions = [] pred_decisions = [] decision_acc = [] for r in decision_results: gt_decision = str(r.get('gt_decision', '')).lower().strip() pred_decision = str(r.get('model_decision', '')).lower().strip() if 'accept' in pred_decision: pred_binary = 1 else: pred_binary = 0 if 'accept' in gt_decision: gt_binary = 1 else: gt_binary = 0 true_decisions.append(gt_binary) pred_decisions.append(pred_binary) if pred_decision == gt_decision or ('accept' in pred_decision and 'accept' in gt_decision) or ('reject' in pred_decision and 'reject' in gt_decision): decision_acc.append(1.0) else: decision_acc.append(0.0) if decision_acc: decision_accuracy = sum(decision_acc) / len(decision_acc) try: _, _, f1_score, _ = precision_recall_fscore_support(true_decisions, pred_decisions, average='macro') summary['decision_metrics'] = { 'accuracy': decision_accuracy, 'f1_macro': f1_score, 'count': len(decision_acc) } except Exception: summary['decision_metrics'] = { 'accuracy': decision_accuracy, 'count': len(decision_acc) } # Calculate Pairwise comparison # For refined format, only use refined results (avoid double counting) # For other formats, use all results if refined_format_count > 0: pairwise_results = refined_results else: pairwise_results = valid_results paper_scores = [] for r in pairwise_results: if (r.get('gt_rating') is not None and r.get('model_rating') is not None) or \ (r.get('gt_soundness') is not None and r.get('model_soundness') is not None): paper_scores.append({ 'true_rating': r.get('gt_rating'), 'pred_rating': r.get('model_rating'), 'true_soundness': r.get('gt_soundness'), 'pred_soundness': r.get('model_soundness'), 'true_presentation': r.get('gt_presentation'), 'pred_presentation': r.get('model_presentation'), 'true_confidence': r.get('gt_confidence'), 'pred_confidence': r.get('model_confidence') }) if len(paper_scores) >= 2: pairwise_accuracies = calculate_pairwise_accuracies(paper_scores) summary['pairwise_accuracies'] = pairwise_accuracies return results, summary # ============================================================================ # Main Function # ============================================================================ def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="Unified evaluation script for semantic and auto-metric evaluation") # Input paths parser.add_argument("--rubrics_path", type=str, required=True, help="Path to eval_rubrics.json file (from 1_generate_review_based_rubrics.py)") parser.add_argument("--reviews_path", type=str, required=True, help="Path to JSON file with model reviews (contains pred_fast_mode)") # Evaluation mode parser.add_argument("--mode", type=str, choices=["semantic", "auto_metric", "both"], default="both", help="Evaluation mode: semantic (LLM-based), auto_metric (rule-based), or both") # Output paths parser.add_argument("--semantic_output", type=str, default=None, help="Path to output JSON file for semantic evaluation results (required if mode is semantic or both)") parser.add_argument("--auto_metric_output", type=str, default=None, help="Path to output JSON file for auto-metric evaluation results (required if mode is auto_metric or both)") # Semantic evaluation settings parser.add_argument("--yaml_path", type=str, default=None, help="Path to prompts.yaml file (required for semantic evaluation)") parser.add_argument("--config_path", type=str, default=None, help="Path to configs.yaml file (required for semantic evaluation)") # Multi-threading parser.add_argument("--max_workers", type=int, default=None, help="Maximum number of worker threads for semantic evaluation (default: 5)") # Strict mode (normalize scores to discrete scales) parser.add_argument("--strict_mode", action="store_true", default=False, help="Enable strict mode: normalize scores to discrete scales before computing metrics (default: False)") # Input format override parser.add_argument("--input_format", type=str, choices=['auto', 'refined', 'original'], default='auto', help="Manually specify input JSON format: 'refined' (has scores and initial_scores), 'original' (has model_prediction), or 'auto' for auto-detection (default: 'auto')") return parser.parse_args() def main(): """Main execution function.""" args = parse_args() script_dir = os.path.dirname(os.path.abspath(__file__)) # Resolve paths rubrics_path = args.rubrics_path if not os.path.isabs(rubrics_path): rubrics_path = os.path.join(script_dir, rubrics_path) reviews_path = args.reviews_path if not os.path.isabs(reviews_path): reviews_path = os.path.join(script_dir, reviews_path) max_workers = args.max_workers or int(os.getenv("MAX_WORKERS", "5")) # Validate mode and output paths if args.mode in ["semantic", "both"]: if not args.semantic_output: raise ValueError("--semantic_output is required when mode is 'semantic' or 'both'") if not args.yaml_path: raise ValueError("--yaml_path is required for semantic evaluation") if not args.config_path: raise ValueError("--config_path is required for semantic evaluation") if args.mode in ["auto_metric", "both"]: if not args.auto_metric_output: raise ValueError("--auto_metric_output is required when mode is 'auto_metric' or 'both'") # Check if files exist if not os.path.exists(rubrics_path): raise FileNotFoundError(f"Rubrics file not found: {rubrics_path}") if not os.path.exists(reviews_path): raise FileNotFoundError(f"Reviews file not found: {reviews_path}") # Load data print(f"Loading rubrics from {rubrics_path}...") rubrics_data = load_rubrics_json(rubrics_path) print(f"Loaded {len(rubrics_data)} rubrics entries") print(f"Loading model reviews from {reviews_path}...") if args.input_format != 'auto': print(f"Using manually specified format: {args.input_format}") else: print("Auto-detecting input format...") reviews_dict = load_model_reviews_json(reviews_path, format_override=args.input_format if args.input_format != 'auto' else None) print(f"Loaded {len(reviews_dict)} model reviews") # Combine rubrics and reviews print("Combining rubrics and reviews...") evaluation_data = combine_rubrics_and_reviews(rubrics_data, reviews_dict) print(f"Prepared {len(evaluation_data)} entries for evaluation") # Run evaluations based on mode if args.mode in ["semantic", "both"]: # Resolve semantic evaluation paths yaml_path = args.yaml_path if not os.path.isabs(yaml_path): yaml_path = os.path.join(script_dir, yaml_path) config_path = args.config_path if not os.path.isabs(config_path): config_path = os.path.join(script_dir, config_path) if not os.path.exists(yaml_path): raise FileNotFoundError(f"YAML file not found: {yaml_path}") if not os.path.exists(config_path): raise FileNotFoundError(f"Config file not found: {config_path}") # Load prompt template print(f"Loading prompt template from {yaml_path}...") prompt_template = load_prompt_template(yaml_path) if not prompt_template: raise ValueError("Could not find 'v1_evaluator_prompt' in YAML file") # Initialize LLM service print(f"Loading LLM configuration from {config_path}...") llm_config = load_llm_config(config_path) llm_service = create_llm_service_from_config(llm_config) mode = llm_config.get('mode', 'gpt') print(f"LLM service initialized (mode: {mode})") if hasattr(llm_service, 'model_name'): print(f"Using model: {llm_service.model_name}") # Run semantic evaluation semantic_results, semantic_summary = run_semantic_evaluation( evaluation_data, prompt_template, llm_service, max_workers ) # Save semantic results semantic_output = args.semantic_output if not os.path.isabs(semantic_output): semantic_output = os.path.join(script_dir, semantic_output) output_dir = os.path.dirname(semantic_output) os.makedirs(output_dir, exist_ok=True) with open(semantic_output, 'w', encoding='utf-8') as f: json.dump(semantic_results, f, ensure_ascii=False, indent=2) print(f"\nSemantic evaluation results saved to {semantic_output}") # Save semantic summary semantic_summary_path = semantic_output.replace('.json', '_summary.json') with open(semantic_summary_path, 'w', encoding='utf-8') as f: json.dump(semantic_summary, f, ensure_ascii=False, indent=2) print(f"Semantic evaluation summary saved to {semantic_summary_path}") # Print semantic summary print("\n" + "="*80) print("SEMANTIC EVALUATION SUMMARY") print("="*80) print(f"Total entries: {semantic_summary['total_entries']}") print(f"Valid entries: {semantic_summary['valid_entries']}") print(f"Failed entries: {semantic_summary['failed_entries']}") if 'overall_score' in semantic_summary: score = semantic_summary['overall_score'] print(f"\nOverall Score:") print(f" Mean: {score['mean']:.2f}") print(f" Min: {score['min']:.2f}") print(f" Max: {score['max']:.2f}") if args.mode in ["auto_metric", "both"]: # Run auto-metric evaluation auto_metric_results, auto_metric_summary = run_auto_metric_evaluation( evaluation_data, strict_mode=args.strict_mode ) # Save auto-metric results auto_metric_output = args.auto_metric_output if not os.path.isabs(auto_metric_output): auto_metric_output = os.path.join(script_dir, auto_metric_output) output_dir = os.path.dirname(auto_metric_output) os.makedirs(output_dir, exist_ok=True) with open(auto_metric_output, 'w', encoding='utf-8') as f: json.dump(auto_metric_results, f, ensure_ascii=False, indent=2) print(f"\nAuto-metric evaluation results saved to {auto_metric_output}") # Save auto-metric summary auto_metric_summary_path = auto_metric_output.replace('.json', '_summary.json') with open(auto_metric_summary_path, 'w', encoding='utf-8') as f: json.dump(auto_metric_summary, f, ensure_ascii=False, indent=2) print(f"Auto-metric evaluation summary saved to {auto_metric_summary_path}") # Print auto-metric summary print("\n" + "="*80) print("AUTO-METRIC EVALUATION SUMMARY") print("="*80) print(f"Total entries: {auto_metric_summary['total_entries']}") print(f"Valid entries: {auto_metric_summary['valid_entries']}") print(f"MSE entries: {auto_metric_summary['mse_entries']}") if 'mse_statistics' in auto_metric_summary: print("\nMSE Statistics:") for dim, stats in auto_metric_summary['mse_statistics'].items(): print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") if 'mae_statistics' in auto_metric_summary: print("\nMAE Statistics:") for dim, stats in auto_metric_summary['mae_statistics'].items(): print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") # Print refined and initial statistics if available if 'refined_mse_statistics' in auto_metric_summary: print("\nRefined Scores - MSE Statistics:") for dim, stats in auto_metric_summary['refined_mse_statistics'].items(): print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") if 'refined_mae_statistics' in auto_metric_summary: print("\nRefined Scores - MAE Statistics:") for dim, stats in auto_metric_summary['refined_mae_statistics'].items(): print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") if 'initial_mse_statistics' in auto_metric_summary: print("\nInitial Scores - MSE Statistics:") for dim, stats in auto_metric_summary['initial_mse_statistics'].items(): print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") if 'initial_mae_statistics' in auto_metric_summary: print("\nInitial Scores - MAE Statistics:") for dim, stats in auto_metric_summary['initial_mae_statistics'].items(): print(f" {dim.capitalize()}: Mean={stats['mean']:.4f}, Count={stats['count']}") if 'spearman_correlations' in auto_metric_summary: print("\nSpearman Correlations:") for dim, stats in auto_metric_summary['spearman_correlations'].items(): print(f" {dim.capitalize()}: {stats['correlation']:.4f} (n={stats['count']})") # Print refined and initial spearman correlations if available if 'refined_spearman_correlations' in auto_metric_summary: print("\nRefined Scores - Spearman Correlations:") for dim, stats in auto_metric_summary['refined_spearman_correlations'].items(): print(f" {dim.capitalize()}: {stats['correlation']:.4f} (n={stats['count']})") if 'initial_spearman_correlations' in auto_metric_summary: print("\nInitial Scores - Spearman Correlations:") for dim, stats in auto_metric_summary['initial_spearman_correlations'].items(): print(f" {dim.capitalize()}: {stats['correlation']:.4f} (n={stats['count']})") if 'decision_metrics' in auto_metric_summary: dm = auto_metric_summary['decision_metrics'] print(f"\nDecision Metrics:") print(f" Accuracy: {dm['accuracy']:.4f} (n={dm['count']})") if 'f1_macro' in dm: print(f" F1 (macro): {dm['f1_macro']:.4f}") # Print refined and initial decision metrics if available if 'refined_decision_metrics' in auto_metric_summary: print("\nRefined Scores - Decision Metrics:") rdm = auto_metric_summary['refined_decision_metrics'] print(f" Accuracy: {rdm['accuracy']:.4f} (n={rdm['count']})") if 'f1_macro' in rdm: print(f" F1 (macro): {rdm['f1_macro']:.4f}") if 'initial_decision_metrics' in auto_metric_summary: print("\nInitial Scores - Decision Metrics:") idm = auto_metric_summary['initial_decision_metrics'] print(f" Accuracy: {idm['accuracy']:.4f} (n={idm['count']})") if 'f1_macro' in idm: print(f" F1 (macro): {idm['f1_macro']:.4f}") print("\n" + "="*80) print("EVALUATION COMPLETE") print("="*80) if __name__ == "__main__": main()