File size: 5,268 Bytes
9c0aba1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | """Rules engine — loads lexicon, applies corrections, builds edit/reason output."""
import re
import sys
from pathlib import Path
import pandas as pd
BASE = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(BASE))
from config import lexicon_filename
from eval.context_checker import should_apply_correction
RULES_DIR = BASE / "rules"
def load_rules(lang: str) -> list[dict]:
    """Load the lexicon CSV for *lang* as a list of string-valued rule dicts.

    The file is looked up as ``RULES_DIR / lexicon_filename(lang)``. Malformed
    CSV lines are skipped. Every cell is returned as a string, and blank cells
    come back as ``""`` (not the literal ``"nan"``), so truthiness checks such
    as ``rule.get("tags", "") or default`` behave as intended.

    Args:
        lang: Language code passed to ``lexicon_filename``.

    Returns:
        One dict per usable CSV row, keyed by column name. An empty list if
        the lexicon file does not exist or contains no data.
    """
    path = RULES_DIR / lexicon_filename(lang)
    if not path.exists():
        return []
    try:
        # dtype=str + keep_default_na=False: keep cells verbatim instead of
        # letting pandas turn blanks (and words like "NA"/"null") into NaN.
        df = pd.read_csv(
            path,
            on_bad_lines="skip",
            dtype=str,
            keep_default_na=False,
        )
    except pd.errors.EmptyDataError:
        # A zero-byte lexicon is treated like a missing one: no rules.
        return []
    # Belt and braces: any remaining missing values become empty strings.
    df = df.fillna("")

    rules: list[dict] = []
    for record in df.to_dict(orient="records"):
        rule = {col: str(value) for col, value in record.items()}
        # This module builds r"\b" + term + r"\b" from rule["biased"]; a blank
        # term would match at every word boundary, so such rows are dropped.
        if "biased" in rule and not rule["biased"].strip():
            continue
        rules.append(rule)
    return rules
# Lexicon cache: every supported language is read exactly once, when this
# module is imported, and looked up by language code afterwards.
RULES: dict[str, list[dict]] = {
    code: load_rules(code) for code in ("en", "sw")
}
def _preserve_case(orig: str, replacement: str) -> str:
if orig.isupper():
return replacement.upper()
if orig[0].isupper():
return replacement.capitalize()
return replacement
def _make_edit(orig: str, replacement: str, rule: dict) -> dict:
tags = rule.get("tags", "") or "occupation/role"
severity = rule.get("severity", "replace")
return {
"from": orig,
"to": replacement,
"severity": severity,
"tags": tags,
"reason": f"'{orig}' is gender-biased ({tags}); use gender-neutral '{replacement}'",
}
def _apply_rule(text: str, rule: dict) -> tuple[str, dict | None]:
    """Apply one lexicon rule to *text*.

    The rule's ``biased`` term is matched as a whole word, case-insensitively.
    Every occurrence is rewritten:

    - severity ``"warn"``: the occurrence is kept and followed by
      ``" [consider <neutral>]"``;
    - any other severity: the occurrence is replaced by the neutral term.

    In both modes the neutral term is re-cased per occurrence to mirror the
    text it stands for.

    Args:
        text: Text to rewrite.
        rule: Rule dict with ``biased`` and ``neutral_primary`` keys and an
            optional ``severity`` key.

    Returns:
        ``(new_text, edit)`` where *edit* describes the first occurrence, or
        ``(text, None)`` when the term does not occur (or is blank).
    """
    biased = rule["biased"]
    neutral = rule["neutral_primary"]
    severity = rule.get("severity", "replace")

    # A blank term would compile to r"\b\b" and match at every word boundary.
    if not biased.strip():
        return text, None

    pattern = re.compile(r"\b" + re.escape(biased) + r"\b", flags=re.IGNORECASE)
    first = pattern.search(text)
    if first is None:
        return text, None

    # The reported edit is based on the first occurrence only.
    orig = first.group(0)
    replacement = _preserve_case(orig, neutral)

    if severity == "warn":
        def rewrite(match: re.Match) -> str:
            found = match.group(0)
            return f"{found} [consider {_preserve_case(found, neutral)}]"
    else:
        def rewrite(match: re.Match) -> str:
            return _preserve_case(match.group(0), neutral)

    # A callable replacement also keeps backslashes in *neutral* literal.
    return pattern.sub(rewrite, text), _make_edit(orig, replacement, rule)
def _rule_allowed_in_context(text: str, rule: dict, skipped: list) -> bool:
    """Return True if *rule* may be applied to *text*; else record it in *skipped*."""
    avoid_when = rule.get("avoid_when", "")
    constraints = rule.get("constraints", "")
    # Rules without any context condition are always eligible.
    if not (avoid_when or constraints):
        return True
    ok, ctx_reason = should_apply_correction(
        text, rule["biased"], avoid_when, constraints
    )
    if ok:
        return True
    skipped.append(
        {"term": rule["biased"], "reason": ctx_reason, "avoid_when": avoid_when}
    )
    return False


def apply_rules_on_spans(
    text: str, lang: str, flags: list | None = None
) -> tuple[str, list, int, list]:
    """Apply the lexicon rules for *lang* to *text*, with context checking.

    Two modes:

    - *flags* given and non-empty: each flag is a dict holding either a
      ``"text"`` entry (the flagged snippet) or a ``"span"`` entry
      (``(start, end)`` offsets into *text*); flags with neither are ignored.
      For each flag, rules are tried in lexicon order and the first one that
      matches the flagged snippet and produces an edit is used. The rewrite
      itself is applied to the whole text, so a rule is applied at most once
      per call; this avoids stacking duplicate ``[consider ...]`` notes when
      several flags mention the same term.
    - no flags: every rule whose term occurs in the text is applied.

    Rules with ``avoid_when`` / ``constraints`` are first checked against the
    original *text* via ``should_apply_correction``; rejected rules are
    reported in the ``skipped`` list instead of being applied. Rules with a
    blank ``biased`` term are ignored.

    Args:
        text: Text to rewrite.
        lang: Key into the module-level ``RULES`` cache.
        flags: Optional list of flag dicts as described above.

    Returns:
        ``(rewritten_text, edits, matched_count, skipped_context)``.
    """
    edits: list = []
    skipped: list = []
    new_text = text

    rules = RULES.get(lang, [])
    if not rules:
        return new_text, edits, 0, skipped

    if flags:
        matched = 0
        # Indices of rules already applied to the full text in this call.
        applied: set[int] = set()
        for flag in flags:
            if "text" in flag:
                span_text = flag["text"]
            elif "span" in flag:
                start, end = flag["span"]
                span_text = text[start:end]
            else:
                continue

            for idx, rule in enumerate(rules):
                biased = rule["biased"]
                # Blank terms would match every word boundary; applied rules
                # have already rewritten every occurrence in new_text.
                if not biased.strip() or idx in applied:
                    continue
                pattern = r"\b" + re.escape(biased) + r"\b"
                if not re.search(pattern, span_text, flags=re.IGNORECASE):
                    continue
                if not _rule_allowed_in_context(text, rule, skipped):
                    continue
                new_text, edit = _apply_rule(new_text, rule)
                if edit:
                    edits.append(edit)
                    matched += 1
                    applied.add(idx)
                    # NOTE(review): assumes the original `break` belonged to
                    # the successful-edit branch — confirm against the
                    # original indentation.
                    break
        return new_text, edits, matched, skipped

    for rule in rules:
        biased = rule["biased"]
        if not biased.strip():
            continue
        pattern = r"\b" + re.escape(biased) + r"\b"
        if not re.search(pattern, new_text, flags=re.IGNORECASE):
            continue
        if not _rule_allowed_in_context(text, rule, skipped):
            continue
        new_text, edit = _apply_rule(new_text, rule)
        if edit:
            edits.append(edit)
    return new_text, edits, len(edits), skipped
def build_reason(source: str, edits: list, skipped: list) -> str:
    """Summarise the outcome of a correction pass in one sentence.

    Precedence: a ``"preserved"`` or ``"ml"`` *source* wins outright; otherwise
    applied *edits* are listed, then context-*skipped* terms, and finally the
    "nothing detected" message.
    """
    if source == "preserved":
        message = "Original preserved — correction would damage meaning (semantic score below threshold)."
    elif source == "ml":
        message = "No lexicon rules matched; ML fallback applied. Human review required."
    elif edits:
        corrected = ", ".join([f"'{edit['from']}'" for edit in edits])
        message = f"{len(edits)} biased term(s) corrected: {corrected}."
    elif skipped:
        held_back = ", ".join([f"'{entry['term']}'" for entry in skipped])
        message = f"Bias terms detected ({held_back}) but skipped — biographical, quote, or statistical context."
    else:
        message = "No gender bias detected."
    return message
|