| """ | |
| Model Evaluator for Summarization Models | |
| Evaluates individual models and compares their performance | |
| """ | |
| import sys | |
| from pathlib import Path | |
| project_root = Path(__file__).parent.parent | |
| if str(project_root) not in sys.path: | |
| sys.path.insert(0, str(project_root)) | |
| import time | |
| import json | |
| import pandas as pd | |
| from typing import Dict, List, Any | |
| import logging | |
| from rouge_score import rouge_scorer | |
| from models.textrank import TextRankSummarizer | |
| from models.bart import BARTSummarizer | |
| from models.pegasus import PEGASUSSummarizer | |
| logger = logging.getLogger(__name__) | |

class ModelEvaluator:
    """Evaluate summarization models on the CNN/DailyMail dataset."""

    def __init__(self):
        self.models = {}
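        # rouge_score's RougeScorer computes ROUGE-1, ROUGE-2 and ROUGE-L F-measures;
        # use_stemmer=True applies Porter stemming before overlap is counted, so
        # morphological variants (e.g. "runs" vs "running") still match.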
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
        self.results = {}

    def initialize_models(self):
        """Initialize all summarization models."""
        logger.info("Initializing models...")
        try:
            self.models['textrank'] = TextRankSummarizer()
            logger.info("TextRank model initialized")
        except Exception as e:
            logger.error(f"Failed to initialize TextRank: {e}")
        try:
            self.models['bart'] = BARTSummarizer(device='cpu')
            logger.info("BART model initialized")
        except Exception as e:
            logger.error(f"Failed to initialize BART: {e}")
        try:
            self.models['pegasus'] = PEGASUSSummarizer(device='cpu')
            logger.info("PEGASUS model initialized")
        except Exception as e:
            logger.error(f"Failed to initialize PEGASUS: {e}")

    def evaluate_single_model(self, model_name: str, data: List[Dict], max_samples: Optional[int] = None) -> Dict:
        """Evaluate a single model on the dataset."""
        if model_name not in self.models:
            raise ValueError(f"Model {model_name} not initialized")
        model = self.models[model_name]
        # Truncate first so that 'total_samples' reflects the set actually evaluated
        if max_samples:
            data = data[:max_samples]
        results = {
            'model': model_name,
            'total_samples': len(data),
            'processed_samples': 0,
            'rouge_scores': {'rouge1': [], 'rouge2': [], 'rougeL': []},
            'processing_times': [],
            'summaries': [],
            'errors': 0
        }
        logger.info(f"Evaluating {model_name} on {len(data)} samples")
        for i, item in enumerate(data):
            try:
                start_time = time.time()
                # Generate summary
                if model_name == 'textrank':
                    # Calculate appropriate number of sentences
                    sentences = item['article'].count('.') + item['article'].count('!') + item['article'].count('?')
                    num_sentences = max(2, int(sentences * 0.3))
                    summary = model.summarize(item['article'], num_sentences=num_sentences)
                else:
                    # For BART and PEGASUS
                    input_words = len(item['article'].split())
                    target_length = max(30, min(150, int(input_words * 0.22)))
                    summary = model.summarize(
                        item['article'],
                        max_length=target_length,
                        min_length=max(20, int(target_length * 0.5))
                    )
                processing_time = time.time() - start_time
                # Calculate ROUGE scores
                rouge_scores = self.rouge_scorer.score(item['highlights'], summary)
                # Store results
                results['rouge_scores']['rouge1'].append(rouge_scores['rouge1'].fmeasure)
                results['rouge_scores']['rouge2'].append(rouge_scores['rouge2'].fmeasure)
                results['rouge_scores']['rougeL'].append(rouge_scores['rougeL'].fmeasure)
                results['processing_times'].append(processing_time)
                results['summaries'].append({
                    'id': item.get('id', i),
                    'original': item['article'][:200] + '...',
                    'reference': item['highlights'],
                    'generated': summary,
                    'rouge1': rouge_scores['rouge1'].fmeasure,
                    'rouge2': rouge_scores['rouge2'].fmeasure,
                    'rougeL': rouge_scores['rougeL'].fmeasure,
                    'processing_time': processing_time
                })
                results['processed_samples'] += 1
                if (i + 1) % 10 == 0:
                    logger.info(f"{model_name}: Processed {i + 1}/{len(data)} samples")
            except Exception as e:
                logger.error(f"Error processing sample {i} with {model_name}: {e}")
                results['errors'] += 1
        # Calculate average scores
        results['avg_rouge1'] = sum(results['rouge_scores']['rouge1']) / len(results['rouge_scores']['rouge1']) if results['rouge_scores']['rouge1'] else 0
        results['avg_rouge2'] = sum(results['rouge_scores']['rouge2']) / len(results['rouge_scores']['rouge2']) if results['rouge_scores']['rouge2'] else 0
        results['avg_rougeL'] = sum(results['rouge_scores']['rougeL']) / len(results['rouge_scores']['rougeL']) if results['rouge_scores']['rougeL'] else 0
        results['avg_processing_time'] = sum(results['processing_times']) / len(results['processing_times']) if results['processing_times'] else 0
        logger.info(f"{model_name} evaluation complete:")
        logger.info(f"  ROUGE-1: {results['avg_rouge1']:.4f}")
        logger.info(f"  ROUGE-2: {results['avg_rouge2']:.4f}")
        logger.info(f"  ROUGE-L: {results['avg_rougeL']:.4f}")
        logger.info(f"  Avg Time: {results['avg_processing_time']:.4f}s")
        return results

    def evaluate_all_models(self, data: List[Dict], max_samples: Optional[int] = None) -> Dict:
        """Evaluate all initialized models on the same dataset."""
        if not self.models:
            self.initialize_models()
        all_results = {}
        for model_name in self.models.keys():
            logger.info(f"Starting evaluation for {model_name}")
            all_results[model_name] = self.evaluate_single_model(model_name, data, max_samples)
        return all_results

    def compare_models(self, results: Dict) -> pd.DataFrame:
        """Create comparison table of model performance."""
        comparison_data = []
        for model_name, result in results.items():
            comparison_data.append({
                'Model': model_name.upper(),
                'ROUGE-1': f"{result['avg_rouge1']:.4f}",
                'ROUGE-2': f"{result['avg_rouge2']:.4f}",
                'ROUGE-L': f"{result['avg_rougeL']:.4f}",
                'Avg Time (s)': f"{result['avg_processing_time']:.4f}",
                'Samples': result['processed_samples'],
                'Errors': result['errors']
            })
        df = pd.DataFrame(comparison_data)
        return df

    def save_results(self, results: Dict, filename: str):
        """Save evaluation results to a JSON file."""
        # Convert numpy types to native Python types for JSON serialization
        serializable_results = {}
        for model_name, result in results.items():
            serializable_results[model_name] = {
                'model': result['model'],
                'total_samples': result['total_samples'],
                'processed_samples': result['processed_samples'],
                'errors': result['errors'],
                'avg_rouge1': float(result['avg_rouge1']),
                'avg_rouge2': float(result['avg_rouge2']),
                'avg_rougeL': float(result['avg_rougeL']),
                'avg_processing_time': float(result['avg_processing_time']),
                'summaries': result['summaries'][:10]  # Save only the first 10 to keep files small
            }
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(serializable_results, f, indent=2, ensure_ascii=False)
        logger.info(f"Results saved to {filename}")

    def evaluate_by_topic(self, categorized_data: Dict[str, List[Dict]], max_samples_per_topic: int = 20) -> Dict:
        """Evaluate models on different topic categories."""
        topic_results = {}
        for topic, data in categorized_data.items():
            if not data:
                continue
            logger.info(f"Evaluating topic: {topic} ({len(data)} samples)")
            topic_results[topic] = self.evaluate_all_models(data, max_samples_per_topic)
        return topic_results
| if __name__ == "__main__": | |
| # Example usage | |
| from evaluation.dataset_loader import CNNDailyMailLoader | |
| # Load data | |
| loader = CNNDailyMailLoader() | |
| eval_data = loader.create_evaluation_subset(size=50) | |
| # Initialize evaluator | |
| evaluator = ModelEvaluator() | |
| evaluator.initialize_models() | |
| # Run evaluation | |
| results = evaluator.evaluate_all_models(eval_data, max_samples=20) | |
| # Create comparison | |
| comparison_df = evaluator.compare_models(results) | |
| print("\nModel Comparison:") | |
| print(comparison_df.to_string(index=False)) | |
| # Save results | |
| evaluator.save_results(results, "evaluation_results.json") |
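
    # Optional: topic-wise evaluation sketch. The keyword buckets below are
    # illustrative assumptions (they are not part of CNNDailyMailLoader's API);
    # evaluate_by_topic accepts any Dict[str, List[Dict]] mapping topic names
    # to lists of samples.
    categorized = {'sport': [], 'politics': [], 'other': []}
    for item in eval_data:
        text = item['article'].lower()
        if any(word in text for word in ('match', 'league', 'tournament', 'coach')):
            categorized['sport'].append(item)
        elif any(word in text for word in ('election', 'parliament', 'senate', 'minister')):
            categorized['politics'].append(item)
        else:
            categorized['other'].append(item)
    topic_results = evaluator.evaluate_by_topic(categorized, max_samples_per_topic=5)
    for topic, model_results in topic_results.items():
        print(f"\nTopic: {topic}")
        print(evaluator.compare_models(model_results).to_string(index=False))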