| | """ |
| | Data loading utilities for bias evaluation framework. |
| | |
| | This module handles all file I/O operations with proper error handling and validation. |
| | Supports both legacy 4-field format and full AI BRIDGE 29-field schema. |
| | Includes automatic lexicon validation on load. |
| | """ |
| | import csv |
| | import json |
| | from pathlib import Path |
| | from typing import List, Dict, Any, Optional |
| |
|
| | from .models import ( |
| | GroundTruthSample, Language, BiasCategory, BiasLabel, |
| | StereotypeCategory, TargetGender, Explicitness, Sentiment, |
| | SafetyFlag, QAStatus |
| | ) |
| | from .lexicon_validator import ( |
| | LexiconValidator, ValidationReport, LexiconValidationError, |
| | validate_lexicon_on_load |
| | ) |
| | from config import lexicon_filename, ground_truth_filename |
| |
|
| |
|
class DataLoadError(Exception):
    """Raised when a data file cannot be read, parsed, or written."""
| |
|
| |
|
class GroundTruthLoader:
    """Handles loading and validation of ground truth datasets.

    Supports both the legacy 4-field CSV format and the full AI BRIDGE
    29-field schema. Extended columns are parsed leniently: optional
    values that are missing or unparseable become None instead of
    aborting the load.
    """

    def __init__(self, data_dir: Path = Path("eval")):
        """
        Initialize the ground truth loader.

        Args:
            data_dir: Directory containing ground truth files
        """
        self.data_dir = data_dir

    def load_ground_truth(self, language: Language) -> List[GroundTruthSample]:
        """
        Load ground truth samples for a specific language.

        Args:
            language: Language to load ground truth for

        Returns:
            List of validated ground truth samples

        Raises:
            DataLoadError: If the file cannot be loaded or any row is invalid
        """
        file_path = self._get_ground_truth_path(language)

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                samples = []

                # start=2: row 1 is the CSV header, so error messages point
                # at the physical line number in the file.
                for row_num, row in enumerate(reader, start=2):
                    try:
                        samples.append(self._parse_ground_truth_row(row))
                    except Exception as e:
                        raise DataLoadError(
                            f"Invalid data in {file_path} at row {row_num}: {e}"
                        ) from e

                return samples

        except FileNotFoundError:
            raise DataLoadError(f"Ground truth file not found: {file_path}")
        except DataLoadError:
            # Bug fix: the per-row DataLoadError raised above used to fall
            # through to the generic handler below and get wrapped a second
            # time ("Failed to load ...: Invalid data ..."). Re-raise it
            # unchanged so callers see a single, precise error.
            raise
        except Exception as e:
            raise DataLoadError(f"Failed to load ground truth from {file_path}: {e}") from e

    def _get_ground_truth_path(self, language: Language) -> Path:
        """Get the file path for ground truth data."""
        filename = ground_truth_filename(language.value)
        return self.data_dir / filename

    def _parse_ground_truth_row(self, row: Dict[str, str]) -> GroundTruthSample:
        """
        Parse a single CSV row into a GroundTruthSample.

        Supports both legacy 4-field format and full AI BRIDGE schema.
        The presence of a 'target_gender' or 'bias_label' column switches
        to the extended parse.

        Raises:
            KeyError, ValueError: on missing or malformed required fields
                (wrapped into DataLoadError by load_ground_truth).
        """
        # Required legacy fields. strip('"') removes stray literal quotes
        # that survive csv unquoting in hand-edited files.
        text = row['text'].strip('"')
        has_bias = row['has_bias'].lower() == 'true'
        bias_category = BiasCategory(row['bias_category'])
        expected_correction = row.get('expected_correction', '')

        # Heuristic: any extended-schema column present means this file
        # uses the full AI BRIDGE schema.
        is_extended = 'target_gender' in row or 'bias_label' in row

        if is_extended:
            return GroundTruthSample(
                text=text,
                has_bias=has_bias,
                bias_category=bias_category,
                expected_correction=expected_correction,

                # Free-text provenance metadata, passed through as-is.
                id=row.get('id'),
                language=row.get('language'),
                script=row.get('script'),
                country=row.get('country'),
                region_dialect=row.get('region_dialect'),
                source_type=row.get('source_type'),
                source_ref=row.get('source_ref'),
                collection_date=row.get('collection_date'),
                translation=row.get('translation'),
                domain=row.get('domain'),
                topic=row.get('topic'),
                theme=row.get('theme'),
                sensitive_characteristic=row.get('sensitive_characteristic'),

                # Enumerated annotation fields: leniently parsed, None if
                # the value is absent or does not match any member.
                target_gender=self._parse_enum(row.get('target_gender'), TargetGender),
                bias_label=self._parse_enum(row.get('bias_label'), BiasLabel),
                stereotype_category=self._parse_enum(row.get('stereotype_category'), StereotypeCategory),
                explicitness=self._parse_enum(row.get('explicitness'), Explicitness),
                bias_severity=self._parse_int(row.get('bias_severity')),
                sentiment_toward_referent=self._parse_enum(row.get('sentiment_toward_referent'), Sentiment),
                device=row.get('device'),

                # QA / provenance fields.
                safety_flag=self._parse_enum(row.get('safety_flag'), SafetyFlag),
                pii_removed=self._parse_bool(row.get('pii_removed')),
                annotator_id=row.get('annotator_id'),
                qa_status=self._parse_enum(row.get('qa_status'), QAStatus),
                approver_id=row.get('approver_id'),
                cohen_kappa=self._parse_float(row.get('cohen_kappa')),
                notes=row.get('notes'),
                eval_split=row.get('eval_split')
            )
        else:
            # Legacy 4-field format: all extended fields default to None.
            return GroundTruthSample(
                text=text,
                has_bias=has_bias,
                bias_category=bias_category,
                expected_correction=expected_correction
            )

    def _parse_enum(self, value: Optional[str], enum_class) -> Optional[Any]:
        """Parse a string into an enum member, returning None if invalid.

        Matching is case-insensitive and treats '-' and '_' as
        interchangeable. Bug fix: the previous implementation normalized
        the candidate to hyphens ('_' -> '-') and then compared it against
        member *names*, which are Python identifiers and always use
        underscores — so multi-word member names could never match.
        Normalizing to underscores on both sides fixes that while keeping
        all previously-matching inputs matching.
        """
        if not value or value.upper() in ('', 'NEEDS_ANNOTATION', 'N/A', 'NONE'):
            return None
        normalized = value.lower().replace('-', '_')
        for member in enum_class:
            # str() guards against non-string member values.
            if normalized in (
                str(member.value).lower().replace('-', '_'),
                member.name.lower(),
            ):
                return member
        return None

    def _parse_int(self, value: Optional[str]) -> Optional[int]:
        """Parse a string to int, returning None if empty/invalid/'N/A'."""
        if not value or value == 'N/A':
            return None
        try:
            return int(value)
        except ValueError:
            return None

    def _parse_float(self, value: Optional[str]) -> Optional[float]:
        """Parse a string to float, returning None if empty/invalid/'N/A'."""
        if not value or value == 'N/A':
            return None
        try:
            return float(value)
        except ValueError:
            return None

    def _parse_bool(self, value: Optional[str]) -> Optional[bool]:
        """Parse a string to bool, returning None for empty/'N/A'.

        Note: any other unrecognized value (e.g. 'maybe') maps to False,
        matching the permissive historical behavior.
        """
        if not value or value == 'N/A':
            return None
        return value.lower() in ('true', '1', 'yes')
| |
|
| |
|
class RulesLoader:
    """Handles loading bias detection rules from CSV files with validation."""

    def __init__(self, rules_dir: Path = Path("rules"), validate: bool = True,
                 strict_validation: bool = False):
        """
        Initialize the rules loader.

        Args:
            rules_dir: Directory containing rule files
            validate: If True, validates lexicons before loading
            strict_validation: If True, warnings become errors during validation
        """
        self.rules_dir = rules_dir
        self.validate = validate
        self.strict_validation = strict_validation
        self._validator = LexiconValidator(strict_mode=strict_validation)
        self._validation_reports: Dict[str, ValidationReport] = {}

    def get_validation_report(self, language: Language) -> Optional[ValidationReport]:
        """Get the validation report for a language if available."""
        return self._validation_reports.get(language.value)

    def load_rules(self, language: Language) -> List[Dict[str, str]]:
        """
        Load bias detection rules for a specific language.

        Args:
            language: Language to load rules for

        Returns:
            List of rule dictionaries with AI BRIDGE extended fields

        Raises:
            DataLoadError: If rules cannot be loaded
            LexiconValidationError: If validation fails (when validate=True)
        """
        file_path = self._get_rules_path(language)

        # Validation runs before loading so a broken lexicon never reaches
        # the rule engine; LexiconValidationError propagates unwrapped.
        if self.validate:
            self._validate_lexicon(file_path, language)

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                # Rows without a 'biased' term are skipped (blank/padding rows).
                rules = [
                    self._row_to_rule(row)
                    for row in csv.DictReader(f)
                    if row.get('biased')
                ]
            return rules

        except FileNotFoundError:
            raise DataLoadError(f"Rules file not found: {file_path}")
        except Exception as e:
            raise DataLoadError(f"Failed to load rules from {file_path}: {e}") from e

    def _validate_lexicon(self, file_path: Path, language: Language) -> None:
        """Validate one lexicon file, cache its report, and raise on errors."""
        report = self._validator.validate_file(file_path)
        self._validation_reports[language.value] = report

        if not report.is_valid:
            # Surface every error row before aborting the load.
            print(f"\n⚠️ Lexicon validation issues for {language.value}:")
            for issue in report.issues:
                if issue.severity.value == "error":
                    print(f" ❌ Row {issue.row_number}: {issue.message}")
            raise LexiconValidationError(report)
        elif report.warning_count > 0:
            print(f"\n⚠️ Lexicon warnings for {language.value}: {report.warning_count} warnings")

    def _row_to_rule(self, row: Dict[str, str]) -> Dict[str, str]:
        """Normalize one CSV row into a rule dict, filling schema defaults."""
        return {
            'biased': row['biased'],
            'neutral_primary': row.get('neutral_primary', ''),
            'severity': row.get('severity', 'replace'),
            'pos': row.get('pos', 'noun'),
            'tags': row.get('tags', ''),

            # AI BRIDGE extended annotation fields.
            'bias_label': row.get('bias_label', 'stereotype'),
            'stereotype_category': row.get('stereotype_category', 'profession'),
            'explicitness': row.get('explicitness', 'explicit'),

            # Morphology / agreement fields.
            'ngeli': row.get('ngeli', ''),
            'number': row.get('number', ''),
            'requires_agreement': row.get('requires_agreement', 'false'),
            'scope': row.get('scope', ''),
            'register': row.get('register', 'formal'),
        }

    def _get_rules_path(self, language: Language) -> Path:
        """Get the file path for rules data."""
        filename = lexicon_filename(language.value)
        return self.rules_dir / filename
| |
|
| |
|
class ResultsWriter:
    """Handles writing evaluation results to files."""

    def __init__(self, results_dir: Path = Path("eval/results")):
        """
        Initialize the results writer.

        Args:
            results_dir: Directory to write results to
        """
        self.results_dir = results_dir
        # Create the output directory up front so individual write calls
        # never have to check for it.
        self.results_dir.mkdir(parents=True, exist_ok=True)

    def write_csv_report(self, results: List[Any], filename: str) -> Path:
        """
        Write evaluation results to CSV file.

        An empty results list produces an empty file (no header row).
        Column order is taken from the first result's keys.

        Args:
            results: List of result dictionaries
            filename: Name of output file

        Returns:
            Path to written file

        Raises:
            DataLoadError: If file cannot be written
        """
        destination = self.results_dir / filename

        try:
            with open(destination, 'w', newline='', encoding='utf-8') as handle:
                if results:
                    columns = results[0].keys()
                    writer = csv.DictWriter(handle, fieldnames=columns)
                    writer.writeheader()
                    writer.writerows(results)
            return destination

        except Exception as exc:
            raise DataLoadError(f"Failed to write CSV report to {destination}: {exc}") from exc

    def write_json_report(self, data: Dict[str, Any], filename: str) -> Path:
        """
        Write data to JSON file.

        Output is pretty-printed (indent=2) with non-ASCII characters
        preserved (ensure_ascii=False).

        Args:
            data: Data to write
            filename: Name of output file

        Returns:
            Path to written file

        Raises:
            DataLoadError: If file cannot be written
        """
        destination = self.results_dir / filename

        try:
            with open(destination, 'w', encoding='utf-8') as handle:
                json.dump(data, handle, indent=2, ensure_ascii=False)
            return destination

        except Exception as exc:
            raise DataLoadError(f"Failed to write JSON report to {destination}: {exc}") from exc