#!/usr/bin/env python3
"""Comprehensive data quality audit for Arcspan base training datasets."""
import json
import os
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path

# Directory holding the ``<prefix>_{train,valid,test}.jsonl`` files.
DATA = Path("/home/ubuntu/alkyline/data/processed")

# Split names, in the order they are loaded and reported.
SPLIT_NAMES = ("train", "valid", "test")

# Control characters other than \t, \n and \r. Compiled once, used per record.
_CONTROL_CHARS = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]')

# Substrings treated as signs of a broken encoding.
# NOTE(review): 'é' and 'ö' are also legitimate characters; if the intent was
# to catch their mis-decoded forms these markers over-report -- confirm.
_MOJIBAKE_MARKERS = ('â€', 'é', 'ö', '�')


def load_jsonl(path):
    """Load a JSON Lines file into a list of parsed records.

    Whitespace-only lines are skipped silently. Lines that fail to parse are
    reported on stdout with their 1-based line number and skipped, so a few
    bad lines do not abort the audit.

    Args:
        path: Path of the ``.jsonl`` file; read as UTF-8.

    Returns:
        The parsed records, in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    records = []
    with open(path, "r", encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                # A trailing newline or spacer line is not a data error.
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f" JSON ERROR line {lineno}: {e}")
    return records


def _spans(record):
    """Return the record's span mapping, or an empty dict if absent/null."""
    return record.get("spans") or {}


def _report_cross_split_overlap(splits, report):
    """Section 1: report texts that occur in more than one split."""
    report.append("\n## 1. Cross-split text overlap (DATA LEAKAGE CHECK)")
    text_to_splits = defaultdict(set)
    for sname, recs in splits.items():
        for r in recs:
            text_to_splits[r["text"]].add(sname)
    leaks = {t: s for t, s in text_to_splits.items() if len(s) > 1}
    if not leaks:
        report.append(" PASS: No text overlap between train/valid/test")
        return
    report.append(f" **CRITICAL**: {len(leaks)} texts appear in multiple splits!")
    for t, s in list(leaks.items())[:5]:
        # sorted() keeps the report identical from run to run; a raw set's
        # print order is not stable.
        report.append(f" Splits {sorted(s)}: {t[:80]}...")


def _report_within_split_duplicates(splits, report):
    """Section 2: report texts repeated inside a single split."""
    report.append("\n## 2. Within-split duplicates")
    for sname, recs in splits.items():
        counts = Counter(r["text"] for r in recs)
        dups = {t: c for t, c in counts.items() if c > 1}
        extra = sum(c - 1 for c in dups.values())
        report.append(
            f" {sname}: {len(recs)} records, {len(dups)} duplicate texts ({extra} extra)"
        )
        # Show the three most-repeated texts.
        for t, c in sorted(dups.items(), key=lambda x: -x[1])[:3]:
            report.append(f" x{c}: {t[:80]}...")


def _report_offset_errors(splits, report):
    """Section 3: report spans whose offsets do not select the named entity."""
    report.append("\n## 3. Offset / span alignment errors")
    for sname, recs in splits.items():
        errors = 0
        examples = []
        for r in recs:
            text = r["text"]
            for key, offsets in _spans(r).items():
                # Span keys have the form "<label>: <entity>".
                _, sep, entity = key.partition(": ")
                if not sep:
                    # No separator: report it instead of aborting the audit.
                    errors += 1
                    if len(examples) < 5:
                        examples.append(f" Malformed span key '{key}'")
                    continue
                for start, end in offsets:
                    if start < 0 or end > len(text):
                        errors += 1
                        if len(examples) < 3:
                            examples.append(
                                f" OOB [{start}:{end}] in text len {len(text)}"
                            )
                        continue
                    actual = text[start:end]
                    if actual != entity:
                        errors += 1
                        if len(examples) < 5:
                            examples.append(
                                f" Expected '{entity}' got '{actual}' [{start}:{end}]"
                            )
        report.append(f" {sname}: {errors} offset errors")
        report.extend(examples)


def _report_label_consistency(splits, report):
    """Section 4: report entity strings that carry more than one label."""
    report.append("\n## 4. Label consistency (same entity string, different labels)")
    entity_labels = defaultdict(set)
    for recs in splits.values():
        for r in recs:
            for key in _spans(r):
                label, sep, entity = key.partition(": ")
                if not sep:
                    # Malformed keys are counted by the offset check.
                    continue
                entity_labels[entity.lower()].add(label)
    inconsistent = {e: ls for e, ls in entity_labels.items() if len(ls) > 1}
    report.append(f" {len(inconsistent)} entities with multiple labels")
    # Entities with the most distinct labels first; show at most 20.
    for e, ls in sorted(inconsistent.items(), key=lambda x: -len(x[1]))[:20]:
        report.append(f" '{e}' -> {sorted(ls)}")


def _report_class_balance(splits, report):
    """Section 5: report the number of span occurrences per label."""
    report.append("\n## 5. Class balance (entity type distribution)")
    for sname, recs in splits.items():
        label_counts = Counter()
        for r in recs:
            for key, offsets in _spans(r).items():
                label_counts[key.split(": ", 1)[0]] += len(offsets)
        report.append(f" {sname}:")
        total = sum(label_counts.values())
        for lab, cnt in sorted(label_counts.items(), key=lambda x: -x[1]):
            # total is 0 when every offsets list is empty.
            pct = 100 * cnt / total if total else 0.0
            report.append(f" {lab:20s} {cnt:6d} ({pct:.1f}%)")
        report.append(f" {'TOTAL':20s} {total:6d}")


def _report_text_lengths(splits, report):
    """Section 6: report character-length statistics of the texts."""
    report.append("\n## 6. Text length distribution")
    for sname, recs in splits.items():
        lengths = sorted(len(r["text"]) for r in recs)
        if not lengths:
            # min/median/max are undefined for an empty split.
            report.append(f" {sname}: no records")
            continue
        report.append(
            f" {sname}: min={lengths[0]} median={lengths[len(lengths)//2]} "
            f"mean={sum(lengths)/len(lengths):.0f} max={lengths[-1]} "
            f"p95={lengths[int(0.95*len(lengths))]}"
        )
        short = sum(1 for n in lengths if n < 10)
        long_ = sum(1 for n in lengths if n > 2000)
        if short:
            report.append(f" {short} texts < 10 chars")
        if long_:
            report.append(f" {long_} texts > 2000 chars")


def _report_encoding_issues(splits, report):
    """Section 7: count records with control chars or mojibake markers."""
    report.append("\n## 7. Encoding / control character issues")
    for sname, recs in splits.items():
        issues = 0
        for r in recs:
            t = r["text"]
            has_control = _CONTROL_CHARS.search(t) is not None
            has_mojibake = any(marker in t for marker in _MOJIBAKE_MARKERS)
            # The figure is a record count, so a record showing both
            # symptoms still counts once.
            if has_control or has_mojibake:
                issues += 1
        report.append(f" {sname}: {issues} records with encoding issues")


def _report_records_without_spans(splits, report):
    """Section 8: report how many records have no spans at all."""
    report.append("\n## 8. Records with no entity spans")
    for sname, recs in splits.items():
        no_spans = sum(1 for r in recs if not r.get("spans"))
        pct = 100 * no_spans / len(recs) if recs else 0.0
        report.append(
            f" {sname}: {no_spans}/{len(recs)} records with no spans ({pct:.1f}%)"
        )


def audit_suite(prefix, report, data_dir=None):
    """Audit a 13class or 5class suite.

    Loads ``<prefix>_train.jsonl``, ``<prefix>_valid.jsonl`` and
    ``<prefix>_test.jsonl`` and appends report sections 1-8 to ``report``.

    Args:
        prefix: File-name prefix of the suite, e.g. ``"aggregated_13class"``.
        report: List of report lines; extended in place.
        data_dir: Directory containing the files; defaults to ``DATA``.

    Returns:
        Dict mapping ``"train"``, ``"valid"`` and ``"test"`` to the list of
        records loaded for that split.

    Raises:
        OSError: If one of the three files cannot be opened.
        KeyError: If a record has no ``"text"`` field.
    """
    root = DATA if data_dir is None else Path(data_dir)
    report.append(f"\n{'='*70}")
    report.append(f"AUDITING: {prefix}")
    report.append(f"{'='*70}")

    splits = {
        name: load_jsonl(root / f"{prefix}_{name}.jsonl") for name in SPLIT_NAMES
    }

    _report_cross_split_overlap(splits, report)
    _report_within_split_duplicates(splits, report)
    _report_offset_errors(splits, report)
    _report_label_consistency(splits, report)
    _report_class_balance(splits, report)
    _report_text_lengths(splits, report)
    _report_encoding_issues(splits, report)
    _report_records_without_spans(splits, report)
    return splits


def audit_enriched(prefix, agg_train, report, data_dir=None):
    """Check enriched = aggregated + LLM records.

    Loads the ``enriched_<prefix>_*`` files and the aggregated valid/test
    files, then appends report section 9: valid/test comparison, coverage of
    the aggregated train texts, and a train-vs-valid/test leakage check.

    Args:
        prefix: Label-space name, e.g. ``"13class"``.
        agg_train: Already-loaded aggregated train records for ``prefix``.
        report: List of report lines; extended in place.
        data_dir: Directory containing the files; defaults to ``DATA``.

    Raises:
        OSError: If one of the files cannot be opened.
        KeyError: If a record has no ``"text"`` field.
    """
    root = DATA if data_dir is None else Path(data_dir)
    report.append(f"\n## 9. Enriched vs aggregated check ({prefix})")
    enriched_train = load_jsonl(root / f"enriched_{prefix}_train.jsonl")
    enriched_valid = load_jsonl(root / f"enriched_{prefix}_valid.jsonl")
    enriched_test = load_jsonl(root / f"enriched_{prefix}_test.jsonl")
    agg_valid = load_jsonl(root / f"aggregated_{prefix}_valid.jsonl")
    agg_test = load_jsonl(root / f"aggregated_{prefix}_test.jsonl")

    # Valid/test are expected to be identical between the two variants.
    for name, enr, agg in (
        ("valid", enriched_valid, agg_valid),
        ("test", enriched_test, agg_test),
    ):
        report.append(
            f" enriched {name} == aggregated {name}: "
            f"{len(enr)} == {len(agg)} -> {len(enr)==len(agg)}"
        )
        # Equal counts do not prove equal contents; compare the records too.
        if len(enr) == len(agg) and enr != agg:
            report.append(
                f" **MISMATCH**: enriched {name} has the same record count as "
                f"aggregated {name} but different contents!"
            )

    # Every aggregated train text should be present in enriched train.
    agg_texts = {r["text"] for r in agg_train}
    enr_texts = {r["text"] for r in enriched_train}
    missing = agg_texts - enr_texts
    report.append(
        f" Aggregated train texts in enriched train: "
        f"{len(agg_texts)-len(missing)}/{len(agg_texts)}"
    )
    if missing:
        report.append(f" **MISSING**: {len(missing)} aggregated texts not in enriched!")
    llm_only = enr_texts - agg_texts
    report.append(f" LLM-only records in enriched train: {len(llm_only)}")
    report.append(
        f" Total enriched train: {len(enriched_train)} "
        f"(agg {len(agg_train)} + LLM ~{len(enriched_train)-len(agg_train)})"
    )

    # Enriched train must not share texts with valid/test.
    leak_v = enr_texts & {r["text"] for r in enriched_valid}
    leak_t = enr_texts & {r["text"] for r in enriched_test}
    if leak_v:
        report.append(f" **LEAK**: {len(leak_v)} enriched train texts also in valid!")
    if leak_t:
        report.append(f" **LEAK**: {len(leak_t)} enriched train texts also in test!")
    if not leak_v and not leak_t:
        report.append(" PASS: No enriched train/valid/test leakage")


def main():
    """Run every audit, print the report and write it to the notes directory."""
    report = ["# Base Data Quality Audit", "Date: 2026-04-24", ""]

    # Audit both label spaces. The loaded splits are kept so later sections
    # reuse them instead of re-reading the same files.
    suites = {}
    for prefix in ["aggregated_13class", "aggregated_5class"]:
        suites[prefix] = audit_suite(prefix, report)

    # Cross-check 13class vs 5class texts are identical
    report.append(f"\n{'='*70}")
    report.append("CROSS-CHECK: 13class vs 5class text identity")
    report.append(f"{'='*70}")
    for split in SPLIT_NAMES:
        t13 = [r["text"] for r in suites["aggregated_13class"][split]]
        t5 = [r["text"] for r in suites["aggregated_5class"][split]]
        report.append(f" {split}: identical={t13==t5}, len 13c={len(t13)} 5c={len(t5)}")

    # Enriched checks
    report.append(f"\n{'='*70}")
    report.append("ENRICHED FILE CHECKS")
    report.append(f"{'='*70}")
    for prefix in ["13class", "5class"]:
        audit_enriched(prefix, suites[f"aggregated_{prefix}"]["train"], report)

    # Label space comparison 13class vs 5class
    report.append(f"\n{'='*70}")
    report.append("LABEL SPACES")
    report.append(f"{'='*70}")
    for prefix, splits in suites.items():
        labels = {
            key.split(": ", 1)[0] for r in splits["train"] for key in _spans(r)
        }
        report.append(f" {prefix}: {sorted(labels)}")

    # Print and save
    text = "\n".join(report)
    print(text)
    out = Path("/home/ubuntu/alkyline/research/notes/progress/2026-04-24-31-base-data-quality-audit.md")
    out.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the report may quote non-ASCII text from the data.
    out.write_text(text, encoding="utf-8")
    print(f"\nReport written to {out}")


if __name__ == "__main__":
    main()