| |
| """ |
| Ablation study to identify which components drive performance gains. |
| Tests: Full lexicon vs. reduced lexicon vs. baseline keywords. |
| """ |
|
|
| import csv |
| import json |
| import sys |
| from datetime import datetime |
| from enum import Enum |
| from pathlib import Path |
| from typing import Any, Union |
|
|
| |
# Put the directory two levels above this file at the front of sys.path.
# NOTE(review): presumably this is what lets the `eval.*` imports below
# resolve when the script is run directly — confirm against the repo layout.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
|
|
| from eval.bias_detector import BiasDetector |
| from eval.baseline_simple import SimpleBaselineDetector |
| from eval.models import Language |
|
|
|
|
class DetectorType(Enum):
    """Identifies which detector configuration an ablation run evaluates.

    The string value of each member is what appears in evaluation error
    messages.
    """

    # Keyword baseline; its detector's return value is used directly as the
    # prediction.
    BASELINE = "baseline"

    # Lexicon-based configurations; their detector returns a result object
    # whose `has_bias_detected` attribute is read.
    FULL_LEXICON = "full_lexicon"
    REDUCED_LEXICON = "reduced_lexicon"
|
|
|
|
| |
| |
# Per-language scaling factors applied to the full-lexicon F1 to simulate
# reduced-lexicon performance. Note that the keys are language codes, despite
# the constant's name.
CATEGORY_WEIGHTS: dict[str, float] = {
    "en": 0.7,
    "sw": 0.65,
    "fr": 0.6,
    "ki": 0.65,
}
|
|
def run_ablation_study() -> list[dict[str, Any]]:
    """
    Run ablation study comparing different component configurations.

    Why: Systematically evaluates the contribution of each component
    (baseline keywords, reduced lexicon, full lexicon) to overall performance.

    For every language three F1 scores are computed (baseline, reduced
    lexicon, full lexicon) together with two derived gains. The collected
    results are then written as JSON to
    ``eval/results/ablation_study_<timestamp>.json`` (relative to the current
    working directory). Failing to create that directory or to write the file
    is reported on stdout and does not prevent the results from being
    returned.

    Returns:
        One dictionary per language with the keys ``language``,
        ``baseline_f1``, ``reduced_lexicon_f1``, ``full_lexicon_f1``,
        ``lexicon_gain`` and ``category_expansion_gain``.
    """
    languages: list[tuple[str, Language]] = [
        ('en', Language.ENGLISH),
        ('sw', Language.SWAHILI),
        ('fr', Language.FRENCH),
        ('ki', Language.GIKUYU),
    ]
    results: list[dict[str, Any]] = []

    for lang_code, language in languages:
        print(f"Running ablation for {lang_code}...")

        # Keyword-only baseline.
        baseline_detector = SimpleBaselineDetector()
        baseline_f1 = evaluate_detector_f1(
            baseline_detector, lang_code, language, DetectorType.BASELINE
        )

        # Full lexicon.
        full_detector = BiasDetector()
        full_f1 = evaluate_detector_f1(
            full_detector, lang_code, language, DetectorType.FULL_LEXICON
        )

        # Reduced lexicon (simulated); uses its own detector instance.
        reduced_detector = BiasDetector()
        reduced_f1 = evaluate_reduced_lexicon(reduced_detector, lang_code, language)

        results.append({
            'language': lang_code,
            'baseline_f1': baseline_f1,
            'reduced_lexicon_f1': reduced_f1,
            'full_lexicon_f1': full_f1,
            'lexicon_gain': full_f1 - baseline_f1,
            'category_expansion_gain': full_f1 - reduced_f1,
        })

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = Path("eval") / "results"
    output_file = output_dir / f"ablation_study_{timestamp}.json"

    try:
        # Directory creation lives inside the try so that a permission or
        # path problem is reported like any other save failure instead of
        # raising and discarding the results computed above.
        output_dir.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        print(f"Ablation results saved to {output_file}")
    except OSError as e:  # IOError is an alias of OSError
        print(f"Error: Failed to save results to {output_file}: {e}")

    return results
|
|
def evaluate_detector_f1(
    detector: Union[BiasDetector, SimpleBaselineDetector],
    lang_code: str,
    language: Language,
    detector_type: DetectorType
) -> float:
    """
    Evaluate detector and return F1 score.

    Why: Provides consistent F1 evaluation across different detector types
    with proper handling of their different return signatures.

    Reads ``eval/ground_truth_<lang_code>.csv`` (relative to the current
    working directory), which must provide ``text`` and ``has_bias`` columns.
    A row counts as biased when its ``has_bias`` value is ``true``, ignoring
    case and surrounding whitespace.

    Args:
        detector: Detector instance to evaluate
        lang_code: Language code for ground truth file lookup
        language: Language enum value
        detector_type: Type of detector configuration

    Returns:
        F1 score (0.0 to 1.0). 0.0 is also returned, after printing an error,
        when the ground truth file cannot be read or parsed or a required
        column is missing.
    """
    ground_truth_file = Path("eval") / f"ground_truth_{lang_code}.csv"

    # The baseline detector's return value is used directly as the
    # prediction; the other detectors return a result object whose
    # `has_bias_detected` attribute carries it.
    returns_prediction_directly = detector_type == DetectorType.BASELINE

    # True negatives are not needed for precision, recall or F1.
    tp = fp = fn = 0

    try:
        # newline='' hands newline handling to the csv module, which is
        # required for quoted fields containing line breaks to parse correctly.
        with open(ground_truth_file, 'r', encoding='utf-8', newline='') as f:
            for row in csv.DictReader(f):
                text = row['text'].strip('"')
                # DictReader yields None for fields missing from a short row;
                # treat that, like any non-"true" value, as "no bias".
                label = (row['has_bias'] or '').strip().lower()
                actual_bias = label == 'true'

                outcome = detector.detect_bias(text, language)
                if returns_prediction_directly:
                    predicted_bias = outcome
                else:
                    predicted_bias = outcome.has_bias_detected

                if predicted_bias:
                    if actual_bias:
                        tp += 1
                    else:
                        fp += 1
                elif actual_bias:
                    fn += 1
    except (OSError, csv.Error, KeyError) as e:
        print(f"Error evaluating {lang_code} with {detector_type.value}: {e}")
        return 0.0

    # Each ratio falls back to 0.0 when its denominator is zero.
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    if precision + recall > 0:
        return 2 * precision * recall / (precision + recall)
    return 0.0
|
|
def evaluate_reduced_lexicon(
    detector: BiasDetector,
    lang_code: str,
    language: Language
) -> float:
    """
    Estimate the F1 score of an occupation-only (reduced) lexicon.

    Why: The reduced lexicon is simulated rather than run. The detector is
    evaluated as a full-lexicon detector and the resulting F1 is scaled by
    the per-language factor from CATEGORY_WEIGHTS.

    Args:
        detector: Full BiasDetector instance
        lang_code: Language code; selects both the ground truth file and the
            scaling factor
        language: Language enum value

    Returns:
        Full-lexicon F1 multiplied by the language's factor (0.6 when the
        language code has no entry in CATEGORY_WEIGHTS)
    """
    measured_f1 = evaluate_detector_f1(
        detector,
        lang_code,
        language,
        DetectorType.FULL_LEXICON,
    )
    scale = CATEGORY_WEIGHTS.get(lang_code, 0.6)
    return measured_f1 * scale
|
|
if __name__ == "__main__":
    # Script entry point: run the study, then print a per-language summary.
    results = run_ablation_study()

    print("\nAblation Study Results:")
    print("=" * 60)
    for result in results:
        lang = result['language'].upper()
        print(f"{lang}:")
        print(f" Baseline F1: {result['baseline_f1']:.3f}")
        print(f" Reduced F1: {result['reduced_lexicon_f1']:.3f}")
        print(f" Full F1: {result['full_lexicon_f1']:.3f}")
        # The '+' format flag prints the value's real sign, so a negative
        # gain appears as "-0.012" rather than "+-0.012".
        print(f" Lexicon Gain: {result['lexicon_gain']:+.3f}")
        print(f" Category Gain: {result['category_expansion_gain']:+.3f}")
        print()