#!/usr/bin/env python3
"""
Deterministic Evaluation Controls

Provides utilities for ensuring reproducible and deterministic evaluation results
across different runs and environments.
"""

import logging
import os
import random
import re
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Default seed for reproducible evaluations
DEFAULT_EVALUATION_SEED = 42
@dataclass
class DeterministicConfig:
    """Configuration for deterministic evaluation settings."""

    # Random seed for reproducibility
    random_seed: int = DEFAULT_EVALUATION_SEED
    # Sort results for consistent ordering
    sort_results: bool = True
    # Use fixed precision for floating point comparisons
    float_precision: int = 6
    # Consistent evaluation order
    consistent_order: bool = True
    # Additional deterministic flags
    deterministic_mode: bool = True
    # Environment variables to set for reproducibility
    env_vars: Dict[str, str] = field(
        default_factory=lambda: {
            "PYTHONHASHSEED": "0",
            "CUBLAS_WORKSPACE_CONFIG": ":4096:8",
        }
    )
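
# Illustrative usage, not part of the module's API: override only the fields
# you need and let the dataclass defaults fill in the rest.
#
#   config = DeterministicConfig(random_seed=123, float_precision=4)
#   evaluator = DeterministicEvaluator(config)
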
class DeterministicEvaluator:
    """
    Wrapper that ensures deterministic evaluation behavior.

    Provides:
    - Fixed random seeds
    - Consistent result ordering
    - Reproducible floating point precision
    - Environment variable controls
    """

    def __init__(self, config: Optional[DeterministicConfig] = None):
        """Initialize deterministic evaluator with configuration."""
        self.config = config or DeterministicConfig()
        self._setup_deterministic_environment()

    def _setup_deterministic_environment(self) -> None:
        """Configure environment for deterministic behavior."""
        # Seed Python's built-in RNG
        random.seed(self.config.random_seed)

        # Set environment variables for reproducibility. Note that
        # PYTHONHASHSEED only takes effect for subprocesses spawned from
        # here; the current interpreter's hash seed is fixed at startup.
        for key, value in self.config.env_vars.items():
            os.environ[key] = value

        # Try to set numpy seed if available
        try:
            import numpy as np

            np.random.seed(self.config.random_seed)
        except ImportError:
            logger.warning("numpy is not installed; deterministic seeding for numpy will be skipped.")

        # Try to set torch seed if available
        try:
            import torch

            torch.manual_seed(self.config.random_seed)
            if torch.cuda.is_available():
                torch.cuda.manual_seed(self.config.random_seed)
                torch.cuda.manual_seed_all(self.config.random_seed)
            # Enable deterministic algorithms in PyTorch; warn rather than
            # raise when an op has no deterministic implementation
            torch.use_deterministic_algorithms(True, warn_only=True)
        except ImportError:
            # torch is optional; skip deterministic seeding if not installed
            pass

        logger.info(f"Deterministic environment configured with seed: {self.config.random_seed}")
    def normalize_float(self, value: float) -> float:
        """Normalize floating point value to consistent precision."""
        return round(value, self.config.float_precision)

    def normalize_metrics(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize metric values for consistent precision."""
        normalized = {}
        for key, value in metrics.items():
            if isinstance(value, float):
                normalized[key] = self.normalize_float(value)
            elif isinstance(value, dict):
                # Recurse into nested metric dictionaries
                normalized[key] = self.normalize_metrics(value)
            else:
                normalized[key] = value
        return normalized
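
    # Illustrative: nested dictionaries are normalized recursively, while
    # non-float values pass through unchanged (default precision is 6).
    #
    #   DeterministicEvaluator().normalize_metrics(
    #       {"score": 0.123456789, "detail": {"recall": 1 / 3}, "n": 10}
    #   )
    #   # -> {"score": 0.123457, "detail": {"recall": 0.333333}, "n": 10}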
    def sort_evaluation_results(
        self, results: List[Dict[str, Any]], sort_key: str = "query_id"
    ) -> List[Dict[str, Any]]:
        """Sort evaluation results for consistent ordering."""
        if not self.config.sort_results:
            return results
        try:
            return sorted(results, key=lambda x: x.get(sort_key, ""))
        except TypeError:
            # Fall back to string representation when sort key values are
            # missing or not mutually comparable
            return sorted(results, key=str)

    def ensure_deterministic_order(self, items: List[Any], key_func=None) -> List[Any]:
        """Ensure consistent ordering of items."""
        if not self.config.consistent_order:
            return items
        if key_func:
            return sorted(items, key=key_func)
        # Try natural sorting, fall back to string representation
        try:
            return sorted(items)
        except TypeError:
            return sorted(items, key=str)
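
# Illustrative: with no key_func, mixed-type lists that cannot be compared
# directly fall back to lexicographic ordering of their str() forms.
#
#   ev = DeterministicEvaluator()
#   ev.ensure_deterministic_order([3, 1, 2])      # -> [1, 2, 3]
#   ev.ensure_deterministic_order([2, "a", 1])    # -> [1, 2, "a"] via str()
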
def create_deterministic_groundedness_evaluator(seed: Optional[int] = None) -> DeterministicEvaluator:
    """Create a deterministic evaluator specifically configured for groundedness evaluation."""
    config = DeterministicConfig(
        # `seed or DEFAULT` would silently discard an explicit seed of 0
        random_seed=seed if seed is not None else DEFAULT_EVALUATION_SEED,
        sort_results=True,
        float_precision=4,  # Slightly lower precision for groundedness scores
        consistent_order=True,
        deterministic_mode=True,
    )
    return DeterministicEvaluator(config)
def evaluate_groundedness_deterministic(
    generated_text: str, source_passages: List[str], evaluator: Optional[DeterministicEvaluator] = None
) -> Dict[str, float]:
    """
    Evaluate groundedness with deterministic behavior.

    Uses token overlap and passage-level matching with consistent ordering
    and normalized precision.
    """
    if evaluator is None:
        evaluator = create_deterministic_groundedness_evaluator()

    if not generated_text.strip() or not source_passages:
        return evaluator.normalize_metrics(
            {"groundedness_score": 0.0, "passage_coverage": 0.0, "token_overlap": 0.0, "exact_matches": 0.0}
        )

    # Normalize inputs for consistent processing
    generated_lower = generated_text.lower()
    generated_words = generated_lower.split()
    generated_tokens = set(generated_words)

    # Process passages in consistent order
    sorted_passages = evaluator.ensure_deterministic_order(source_passages)

    # Calculate passage-level scores
    passage_scores = []
    total_coverage = 0.0
    exact_matches = 0
    for passage in sorted_passages:
        if not passage.strip():
            continue
        # A non-blank passage always yields at least one token
        passage_tokens = set(passage.lower().split())

        # Token overlap for this passage
        overlap = len(generated_tokens & passage_tokens) / len(passage_tokens)
        passage_scores.append(overlap)
        total_coverage += overlap

        # Check for exact phrase matches: any 2-7 word span of the generated
        # text that appears verbatim in the passage (deterministic substring
        # matching)
        passage_lower = passage.lower()
        if any(
            " ".join(generated_words[i:j]) in passage_lower
            for i in range(len(generated_words))
            for j in range(i + 2, min(i + 8, len(generated_words) + 1))
        ):
            exact_matches += 1

    # Calculate aggregate scores with normalization
    if passage_scores:
        groundedness_score = sum(passage_scores) / len(passage_scores)
        passage_coverage = total_coverage / len(sorted_passages)
    else:
        groundedness_score = 0.0
        passage_coverage = 0.0

    # Overall token overlap across all passages
    all_source_tokens = set()
    for passage in sorted_passages:
        all_source_tokens.update(passage.lower().split())
    if all_source_tokens:
        token_overlap = len(generated_tokens & all_source_tokens) / len(all_source_tokens)
    else:
        token_overlap = 0.0

    exact_match_rate = exact_matches / len(sorted_passages) if sorted_passages else 0.0

    metrics = {
        "groundedness_score": groundedness_score,
        "passage_coverage": passage_coverage,
        "token_overlap": token_overlap,
        "exact_matches": exact_match_rate,
    }
    return evaluator.normalize_metrics(metrics)
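
# Illustrative call using this module's own API; the returned keys mirror the
# metrics dict above, with values rounded to 4 decimal places by the
# groundedness evaluator's config.
#
#   evaluate_groundedness_deterministic(
#       "the cat sat on the mat",
#       ["the cat sat on the mat today", "dogs bark"],
#   )
#   # -> {"groundedness_score": ..., "passage_coverage": ...,
#   #     "token_overlap": ..., "exact_matches": ...}
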
def evaluate_citation_accuracy_deterministic(
    generated_text: str,
    returned_sources: List[Dict[str, Any]],
    expected_sources: List[str],
    evaluator: Optional[DeterministicEvaluator] = None,
) -> Dict[str, float]:
    """
    Evaluate citation accuracy with deterministic behavior.

    Provides consistent filename matching and source validation.
    """
    if evaluator is None:
        evaluator = create_deterministic_groundedness_evaluator()

    if not expected_sources:
        # If no expected sources, score based on whether any sources were returned
        return evaluator.normalize_metrics(
            {
                "citation_accuracy": 1.0 if not returned_sources else 0.0,
                "source_precision": 1.0,
                "source_recall": 1.0,
                "exact_filename_matches": 1.0,
            }
        )

    def normalize_filename(filename: str) -> str:
        """Normalize filename for consistent comparison."""
        if not filename:
            return ""
        # Remove query parameters and fragments
        filename = re.sub(r"[?#].*$", "", filename.strip())
        # Get basename
        basename = os.path.basename(filename)
        # Remove common extensions consistently
        basename = re.sub(
            r"\.(md|markdown|txt|html|htm|pdf|csv|json|yaml|yml|py|ipynb)$", "", basename, flags=re.IGNORECASE
        )
        return basename.lower()

    # Extract returned filenames in consistent order
    returned_filenames = set()
    sorted_sources = evaluator.ensure_deterministic_order(returned_sources, key_func=str)
    for source in sorted_sources:
        if isinstance(source, dict):
            candidates = [source.get(k) for k in ["filename", "source_file", "file", "url", "path", "source"]]
            # Check metadata for additional filename fields
            metadata = source.get("metadata", {})
            if isinstance(metadata, dict):
                candidates.extend([metadata.get(k) for k in ["filename", "file", "source_file"]])
        else:
            candidates = [str(source)]
        for candidate in candidates:
            if candidate:
                normalized = normalize_filename(str(candidate))
                if normalized:
                    returned_filenames.add(normalized)

    # Normalize expected sources
    expected_normalized = set()
    sorted_expected = evaluator.ensure_deterministic_order(expected_sources)
    for expected in sorted_expected:
        normalized = normalize_filename(str(expected))
        if normalized:
            expected_normalized.add(normalized)

    # Calculate matches with consistent methodology
    exact_matches = len(expected_normalized & returned_filenames)

    # Calculate precision and recall
    if returned_filenames:
        precision = exact_matches / len(returned_filenames)
    else:
        precision = 1.0 if not expected_normalized else 0.0

    if expected_normalized:
        recall = exact_matches / len(expected_normalized)
    else:
        recall = 1.0

    # Overall citation accuracy (F1-like score)
    if precision + recall > 0:
        citation_accuracy = 2 * (precision * recall) / (precision + recall)
    else:
        citation_accuracy = 0.0

    exact_filename_match_rate = recall  # Same as recall for exact matches

    metrics = {
        "citation_accuracy": citation_accuracy,
        "source_precision": precision,
        "source_recall": recall,
        "exact_filename_matches": exact_filename_match_rate,
    }
    return evaluator.normalize_metrics(metrics)
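
# Illustrative: filenames are compared after stripping paths, query strings,
# and common extensions, so "docs/setup.md?ref=1" and "SETUP" both normalize
# to "setup" and count as an exact match.
#
#   evaluate_citation_accuracy_deterministic(
#       generated_text="...",
#       returned_sources=[{"filename": "docs/setup.md?ref=1"}],
#       expected_sources=["setup"],
#   )
#   # -> precision, recall, and citation_accuracy all 1.0
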
# Utility functions for integration
def setup_deterministic_evaluation(seed: Optional[int] = None) -> DeterministicEvaluator:
    """Set up the deterministic evaluation environment."""
    return create_deterministic_groundedness_evaluator(seed)


def get_evaluation_seed() -> int:
    """Get the evaluation seed from environment or use default."""
    try:
        return int(os.getenv("EVALUATION_SEED", str(DEFAULT_EVALUATION_SEED)))
    except ValueError:
        return DEFAULT_EVALUATION_SEED
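

# Minimal smoke test, assuming only this module and the standard library; the
# texts and passages below are made-up fixtures, not reference data.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    evaluator = setup_deterministic_evaluation(get_evaluation_seed())
    scores = evaluate_groundedness_deterministic(
        "paris is the capital of france",
        ["paris is the capital of france", "france is in europe"],
        evaluator=evaluator,
    )
    print(scores)

    # Running again with a freshly seeded evaluator should give identical output
    again = evaluate_groundedness_deterministic(
        "paris is the capital of france",
        ["paris is the capital of france", "france is in europe"],
        evaluator=setup_deterministic_evaluation(get_evaluation_seed()),
    )
    assert scores == again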