arcspan / scripts /audit_base_data.py
chairulridjal's picture
Add files using upload-large-folder tool
3dac39e verified
#!/usr/bin/env python3
"""Comprehensive data quality audit for Arcspan base training datasets."""
import json, sys, os, re
from collections import Counter, defaultdict
from pathlib import Path
# Directory holding the processed JSONL split files that the audits below read.
DATA = Path("/home/ubuntu/alkyline/data/processed")
def load_jsonl(path):
    """Read a JSON Lines file and return its records as a list.

    Every non-blank line is parsed with ``json.loads``. A line that fails
    to parse is reported on stdout together with its 1-based line number
    and then skipped, so a single corrupt row does not abort the audit.
    Blank / whitespace-only lines (e.g. a trailing newline at the end of
    the file) are skipped silently instead of being reported as errors.

    Args:
        path: Location of the UTF-8 encoded file (``str`` or path-like).

    Returns:
        list: The successfully parsed records, in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    records = []
    with open(path, "r", encoding="utf-8") as f:
        # start=1 so the reported number matches what an editor displays.
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f" JSON ERROR line {lineno}: {e}")
    return records
# Control characters other than \t, \n and \r.
_CONTROL_CHARS = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]')

# Substrings that indicate UTF-8 text decoded with a single-byte codec.
# Written as escapes so the markers survive any re-encoding of this file.
_MOJIBAKE_MARKERS = (
    "\u00e2\u20ac",  # prefix of mis-decoded curly quotes / dashes
    "\u00c3\u00a9",  # mis-decoded e-acute
    "\u00c3\u00b6",  # mis-decoded o-umlaut
    "\ufffd",        # U+FFFD replacement character
)


def _spans(record):
    """Return the record's span mapping, treating a missing/None value as empty."""
    return record.get("spans") or {}


def _has_encoding_issue(text):
    """Return True if *text* contains a control character or a mojibake marker."""
    if _CONTROL_CHARS.search(text):
        return True
    return any(marker in text for marker in _MOJIBAKE_MARKERS)


def _report_cross_split_overlap(splits, report):
    """Section 1: list texts that occur in more than one split."""
    report.append("\n## 1. Cross-split text overlap (DATA LEAKAGE CHECK)")
    text_to_splits = defaultdict(set)
    for sname, recs in splits.items():
        for r in recs:
            text_to_splits[r["text"]].add(sname)
    leaks = {t: s for t, s in text_to_splits.items() if len(s) > 1}
    if leaks:
        report.append(f" **CRITICAL**: {len(leaks)} texts appear in multiple splits!")
        for t, s in list(leaks.items())[:5]:
            # sorted() keeps the saved report identical from run to run.
            report.append(f" Splits {sorted(s)}: {t[:80]}...")
    else:
        report.append(" PASS: No text overlap between train/valid/test")


def _report_within_split_duplicates(splits, report):
    """Section 2: count texts repeated inside a single split."""
    report.append("\n## 2. Within-split duplicates")
    for sname, recs in splits.items():
        tc = Counter(r["text"] for r in recs)
        dups = {t: c for t, c in tc.items() if c > 1}
        extra = sum(c - 1 for c in dups.values())
        report.append(f" {sname}: {len(recs)} records, {len(dups)} duplicate texts ({extra} extra)")
        for t, c in sorted(dups.items(), key=lambda x: -x[1])[:3]:
            report.append(f" x{c}: {t[:80]}...")


def _report_offset_errors(splits, report):
    """Section 3: check each span offset lies in the text and matches its entity."""
    report.append("\n## 3. Offset / span alignment errors")
    for sname, recs in splits.items():
        errors = 0
        examples = []
        for r in recs:
            text = r["text"]
            for key, offsets in _spans(r).items():
                # Keys are expected to look like "LABEL: entity text".
                _, entity = key.split(": ", 1)
                for start, end in offsets:
                    if start < 0 or end > len(text):
                        errors += 1
                        if len(examples) < 3:
                            examples.append(f" OOB [{start}:{end}] in text len {len(text)}")
                        continue
                    actual = text[start:end]
                    if actual != entity:
                        errors += 1
                        if len(examples) < 5:
                            examples.append(f" Expected '{entity}' got '{actual}' [{start}:{end}]")
        report.append(f" {sname}: {errors} offset errors")
        report.extend(examples)


def _report_label_consistency(splits, report):
    """Section 4: find entity strings (case-insensitive) carrying several labels."""
    report.append("\n## 4. Label consistency (same entity string, different labels)")
    entity_labels = defaultdict(set)
    for recs in splits.values():
        for r in recs:
            for key in _spans(r):
                label, entity = key.split(": ", 1)
                entity_labels[entity.lower()].add(label)
    inconsistent = {e: ls for e, ls in entity_labels.items() if len(ls) > 1}
    report.append(f" {len(inconsistent)} entities with multiple labels")
    # Entities with the largest number of distinct labels come first.
    for e, ls in sorted(inconsistent.items(), key=lambda x: -len(x[1]))[:20]:
        report.append(f" '{e}' -> {sorted(ls)}")


def _report_class_balance(splits, report):
    """Section 5: per-split count and share of span occurrences per label."""
    report.append("\n## 5. Class balance (entity type distribution)")
    for sname, recs in splits.items():
        label_counts = Counter()
        for r in recs:
            for key, offsets in _spans(r).items():
                label_counts[key.split(": ", 1)[0]] += len(offsets)
        report.append(f" {sname}:")
        total = sum(label_counts.values())
        for lab, cnt in sorted(label_counts.items(), key=lambda x: -x[1]):
            # total can be 0 when every key has an empty offset list.
            pct = 100 * cnt / total if total else 0.0
            report.append(f" {lab:20s} {cnt:6d} ({pct:.1f}%)")
        report.append(f" {'TOTAL':20s} {total:6d}")


def _report_text_lengths(splits, report):
    """Section 6: min / median / mean / max / p95 of text length per split."""
    report.append("\n## 6. Text length distribution")
    for sname, recs in splits.items():
        lengths = sorted(len(r["text"]) for r in recs)
        if not lengths:
            # An empty split has no statistics; avoid indexing an empty list.
            report.append(f" {sname}: no records")
            continue
        n = len(lengths)
        report.append(f" {sname}: min={lengths[0]} median={lengths[n//2]} "
                      f"mean={sum(lengths)/n:.0f} max={lengths[-1]} "
                      f"p95={lengths[int(0.95*n)]}")
        short = sum(1 for length in lengths if length < 10)
        long_ = sum(1 for length in lengths if length > 2000)
        if short:
            report.append(f" {short} texts < 10 chars")
        if long_:
            report.append(f" {long_} texts > 2000 chars")


def _report_encoding_issues(splits, report):
    """Section 7: count records whose text shows control chars or mojibake."""
    report.append("\n## 7. Encoding / control character issues")
    for sname, recs in splits.items():
        # Each record is counted at most once, matching the "records" wording.
        issues = sum(1 for r in recs if _has_encoding_issue(r["text"]))
        report.append(f" {sname}: {issues} records with encoding issues")


def _report_missing_spans(splits, report):
    """Section 8: count records that have no (or empty) spans."""
    report.append("\n## 8. Records with no entity spans")
    for sname, recs in splits.items():
        no_spans = sum(1 for r in recs if not r.get("spans"))
        pct = 100 * no_spans / len(recs) if recs else 0.0
        report.append(f" {sname}: {no_spans}/{len(recs)} records with no spans ({pct:.1f}%)")


def audit_suite(prefix, report):
    """Audit a 13class or 5class suite.

    Loads ``{prefix}_train.jsonl``, ``{prefix}_valid.jsonl`` and
    ``{prefix}_test.jsonl`` from ``DATA`` and appends eight report
    sections to *report*: cross-split overlap, within-split duplicates,
    span offset errors, label consistency, class balance, text length
    distribution, encoding issues and records without spans.

    The checks read ``record["text"]`` and an optional ``record["spans"]``
    mapping whose keys are split on ``": "`` into label and entity text
    and whose values are iterated as ``(start, end)`` pairs.

    Args:
        prefix: File name prefix of the suite, e.g. ``"aggregated_13class"``.
        report: List of strings; report lines are appended in place.

    Returns:
        dict: ``{"train": [...], "valid": [...], "test": [...]}`` holding
        the loaded records of each split.

    Raises:
        OSError: If a split file cannot be opened.
        KeyError: If a record has no ``"text"`` field.
        ValueError: If a span key does not contain ``": "``.
    """
    report.append(f"\n{'='*70}")
    report.append(f"AUDITING: {prefix}")
    report.append(f"{'='*70}")

    splits = {
        "train": load_jsonl(DATA / f"{prefix}_train.jsonl"),
        "valid": load_jsonl(DATA / f"{prefix}_valid.jsonl"),
        "test": load_jsonl(DATA / f"{prefix}_test.jsonl"),
    }

    sections = (
        _report_cross_split_overlap,
        _report_within_split_duplicates,
        _report_offset_errors,
        _report_label_consistency,
        _report_class_balance,
        _report_text_lengths,
        _report_encoding_issues,
        _report_missing_spans,
    )
    for section in sections:
        section(splits, report)

    return splits
def audit_enriched(prefix, agg_train, report):
    """Check enriched = aggregated + LLM records.

    Loads the ``enriched_{prefix}_*`` splits and the aggregated valid/test
    splits from ``DATA`` and appends section 9 to *report*:

    * whether enriched valid/test match aggregated valid/test, both by
      record count and by the ordered list of texts;
    * whether every aggregated train text is present in enriched train;
    * how many enriched train texts are not in aggregated train;
    * whether any enriched train text also occurs in enriched valid/test.

    Args:
        prefix: Label-space suffix used in the file names, e.g. ``"13class"``.
        agg_train: Already-loaded aggregated train records (each with a
            ``"text"`` field).
        report: List of strings; report lines are appended in place.

    Returns:
        None.

    Raises:
        OSError: If one of the files cannot be opened.
        KeyError: If a record has no ``"text"`` field.
    """
    report.append(f"\n## 9. Enriched vs aggregated check ({prefix})")
    enriched_train = load_jsonl(DATA / f"enriched_{prefix}_train.jsonl")
    enriched_valid = load_jsonl(DATA / f"enriched_{prefix}_valid.jsonl")
    enriched_test = load_jsonl(DATA / f"enriched_{prefix}_test.jsonl")
    agg_valid = load_jsonl(DATA / f"aggregated_{prefix}_valid.jsonl")
    agg_test = load_jsonl(DATA / f"aggregated_{prefix}_test.jsonl")

    # Valid/test are expected to be identical. Equal counts alone do not
    # show that, so the ordered texts are compared as well.
    pairs = (
        ("valid", enriched_valid, agg_valid),
        ("test", enriched_test, agg_test),
    )
    for name, enriched, aggregated in pairs:
        same_count = len(enriched) == len(aggregated)
        report.append(f" enriched {name} == aggregated {name}: {len(enriched)} == {len(aggregated)} -> {same_count}")
        same_texts = [r["text"] for r in enriched] == [r["text"] for r in aggregated]
        report.append(f" enriched {name} texts identical to aggregated {name}: {same_texts}")

    # Enriched train must contain every aggregated train text.
    agg_texts = {r["text"] for r in agg_train}
    enr_texts = {r["text"] for r in enriched_train}
    missing = agg_texts - enr_texts
    report.append(f" Aggregated train texts in enriched train: {len(agg_texts)-len(missing)}/{len(agg_texts)}")
    if missing:
        report.append(f" **MISSING**: {len(missing)} aggregated texts not in enriched!")
    llm_only = enr_texts - agg_texts
    report.append(f" LLM-only records in enriched train: {len(llm_only)}")
    report.append(f" Total enriched train: {len(enriched_train)} (agg {len(agg_train)} + LLM ~{len(enriched_train)-len(agg_train)})")

    # Enriched train must not share texts with enriched valid/test.
    valid_texts = {r["text"] for r in enriched_valid}
    test_texts = {r["text"] for r in enriched_test}
    leak_v = enr_texts & valid_texts
    leak_t = enr_texts & test_texts
    if leak_v:
        report.append(f" **LEAK**: {len(leak_v)} enriched train texts also in valid!")
    if leak_t:
        report.append(f" **LEAK**: {len(leak_t)} enriched train texts also in test!")
    if not leak_v and not leak_t:
        report.append(" PASS: No enriched train/valid/test leakage")
def main():
    """Run the full audit, print the report and save it as Markdown.

    Steps:
      1. Audit the ``aggregated_13class`` and ``aggregated_5class`` suites.
      2. Cross-check that both suites hold the same texts in every split.
      3. Run the enriched-vs-aggregated checks for both label spaces.
      4. List the label set found in each suite's train split.
      5. Print the report and write it to the hard-coded notes path.

    The records returned by ``audit_suite`` are reused for steps 2-4, so
    each aggregated file is read once and malformed lines are handled the
    same way everywhere (reported and skipped by ``load_jsonl``).
    """
    report = ["# Base Data Quality Audit", "Date: 2026-04-24", ""]

    # Audit both label spaces, keeping the loaded splits for later steps.
    suites = {}
    for prefix in ["aggregated_13class", "aggregated_5class"]:
        suites[prefix] = audit_suite(prefix, report)

    # Cross-check 13class vs 5class texts are identical
    report.append(f"\n{'='*70}")
    report.append("CROSS-CHECK: 13class vs 5class text identity")
    report.append(f"{'='*70}")
    for split in ["train", "valid", "test"]:
        t13 = [r["text"] for r in suites["aggregated_13class"][split]]
        t5 = [r["text"] for r in suites["aggregated_5class"][split]]
        report.append(f" {split}: identical={t13==t5}, len 13c={len(t13)} 5c={len(t5)}")

    # Enriched checks
    report.append(f"\n{'='*70}")
    report.append("ENRICHED FILE CHECKS")
    report.append(f"{'='*70}")
    for prefix in ["13class", "5class"]:
        agg_train = suites[f"aggregated_{prefix}"]["train"]
        audit_enriched(prefix, agg_train, report)

    # Label space comparison 13class vs 5class
    report.append(f"\n{'='*70}")
    report.append("LABEL SPACES")
    report.append(f"{'='*70}")
    for prefix in ["aggregated_13class", "aggregated_5class"]:
        labels = set()
        for r in suites[prefix]["train"]:
            for key in r.get("spans", {}):
                labels.add(key.split(": ", 1)[0])
        report.append(f" {prefix}: {sorted(labels)}")

    # Print and save
    text = "\n".join(report)
    print(text)
    out = Path("/home/ubuntu/alkyline/research/notes/progress/2026-04-24-31-base-data-quality-audit.md")
    out.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the report quotes dataset text, which may be non-ASCII.
    out.write_text(text, encoding="utf-8")
    print(f"\nReport written to {out}")


if __name__ == "__main__":
    main()