"""Comprehensive data quality audit for Arcspan base training datasets."""
|
|
import json
import os
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
|
|
# Directory holding the processed ``*_train/_valid/_test.jsonl`` files that
# every audit in this script reads from.
DATA = Path("/home/ubuntu/alkyline/data/processed")
|
|
def load_jsonl(path):
    """Load a JSON Lines file into a list of decoded records.

    Empty or whitespace-only lines (for example a trailing newline at the end
    of the file) are skipped silently. A line that is not valid JSON is
    reported on stdout with its 1-based line number and then skipped, so a
    single corrupt row does not abort the whole audit.

    Args:
        path: Path (``str`` or ``Path``) of the UTF-8 encoded ``.jsonl`` file.

    Returns:
        The decoded records, in file order.
    """
    records = []
    with open(path, "r", encoding="utf-8") as f:
        # Number lines from 1 so the reported position matches what an editor
        # or ``sed -n`` shows.
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f" JSON ERROR line {lineno}: {e}")
    return records
|
|
# Control characters other than tab, newline and carriage return.
_CONTROL_CHARS = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]')

# Substrings treated as evidence of mis-decoded text.
# NOTE(review): 'é' and 'ö' are also ordinary letters in many languages, so
# they flag legitimate text as well; these entries look like they were meant
# to be the mojibake forms of those letters -- confirm against the datasets.
_MOJIBAKE_MARKERS = ('â€', 'é', 'ö', '�')


def _split_span_key(key):
    """Split a ``"LABEL: entity"`` span key into ``(label, entity)``.

    Returns ``(key, None)`` when the ``": "`` separator is missing so callers
    can report the malformed key instead of crashing on tuple unpacking.
    """
    label, sep, entity = key.partition(": ")
    return (label, entity) if sep else (key, None)


def _spans(record):
    """Return the record's span mapping, treating a missing/null one as empty."""
    return record.get("spans") or {}


def _check_cross_split_overlap(splits, report):
    """Section 1: report texts that occur in more than one split."""
    report.append("\n## 1. Cross-split text overlap (DATA LEAKAGE CHECK)")
    text_to_splits = defaultdict(set)
    for sname, recs in splits.items():
        for r in recs:
            text_to_splits[r["text"]].add(sname)

    leaks = {t: s for t, s in text_to_splits.items() if len(s) > 1}
    if not leaks:
        report.append(" PASS: No text overlap between train/valid/test")
        return
    report.append(f" **CRITICAL**: {len(leaks)} texts appear in multiple splits!")
    for t, s in list(leaks.items())[:5]:
        # Sorted so the report is identical from run to run (set order is not).
        report.append(f" Splits {sorted(s)}: {t[:80]}...")


def _check_within_split_duplicates(splits, report):
    """Section 2: per split, count duplicated texts and show the top three."""
    report.append("\n## 2. Within-split duplicates")
    for sname, recs in splits.items():
        counts = Counter(r["text"] for r in recs)
        # most_common() is ordered by descending count, ties in first-seen order.
        dups = [(t, c) for t, c in counts.most_common() if c > 1]
        extra = sum(c - 1 for _, c in dups)
        report.append(
            f" {sname}: {len(recs)} records, {len(dups)} duplicate texts "
            f"({extra} extra)"
        )
        for t, c in dups[:3]:
            report.append(f" x{c}: {t[:80]}...")


def _check_span_offsets(splits, report):
    """Section 3: verify that every span offset slices out its entity string."""
    report.append("\n## 3. Offset / span alignment errors")
    for sname, recs in splits.items():
        errors = 0
        examples = []
        for r in recs:
            text = r["text"]
            for key, offsets in _spans(r).items():
                _, entity = _split_span_key(key)
                if entity is None:
                    # Without the separator there is no entity to compare to.
                    errors += 1
                    if len(examples) < 5:
                        examples.append(
                            f" Malformed span key {key!r} (expected 'LABEL: entity')"
                        )
                    continue
                for start, end in offsets:
                    if start < 0 or end > len(text):
                        errors += 1
                        if len(examples) < 3:
                            examples.append(
                                f" OOB [{start}:{end}] in text len {len(text)}"
                            )
                        continue
                    actual = text[start:end]
                    if actual != entity:
                        errors += 1
                        if len(examples) < 5:
                            examples.append(
                                f" Expected '{entity}' got '{actual}' [{start}:{end}]"
                            )
        report.append(f" {sname}: {errors} offset errors")
        report.extend(examples)


def _check_label_consistency(splits, report):
    """Section 4: list entity strings (case-folded) that carry several labels."""
    report.append("\n## 4. Label consistency (same entity string, different labels)")
    entity_labels = defaultdict(set)
    for recs in splits.values():
        for r in recs:
            for key in _spans(r):
                label, entity = _split_span_key(key)
                if entity is not None:
                    entity_labels[entity.lower()].add(label)

    inconsistent = {e: ls for e, ls in entity_labels.items() if len(ls) > 1}
    report.append(f" {len(inconsistent)} entities with multiple labels")
    # Show the entities with the most distinct labels first.
    worst = sorted(inconsistent.items(), key=lambda item: -len(item[1]))[:20]
    for e, ls in worst:
        report.append(f" '{e}' -> {sorted(ls)}")


def _report_class_balance(splits, report):
    """Section 5: per split, count span occurrences for each label."""
    report.append("\n## 5. Class balance (entity type distribution)")
    for sname, recs in splits.items():
        label_counts = Counter()
        for r in recs:
            for key, offsets in _spans(r).items():
                label_counts[_split_span_key(key)[0]] += len(offsets)
        report.append(f" {sname}:")
        total = sum(label_counts.values())
        for lab, cnt in label_counts.most_common():
            # total can be 0 when every offsets list is empty.
            pct = 100 * cnt / total if total else 0.0
            report.append(f" {lab:20s} {cnt:6d} ({pct:.1f}%)")
        report.append(f" {'TOTAL':20s} {total:6d}")


def _report_text_lengths(splits, report):
    """Section 6: per split, summarise the character length of the texts."""
    report.append("\n## 6. Text length distribution")
    for sname, recs in splits.items():
        if not recs:
            # No statistics exist for an empty split; say so instead of crashing.
            report.append(f" {sname}: no records")
            continue
        lengths = sorted(len(r["text"]) for r in recs)
        n = len(lengths)
        report.append(
            f" {sname}: min={lengths[0]} median={lengths[n // 2]} "
            f"mean={sum(lengths) / n:.0f} max={lengths[-1]} "
            f"p95={lengths[int(0.95 * n)]}"
        )
        short = sum(1 for length in lengths if length < 10)
        long_ = sum(1 for length in lengths if length > 2000)
        if short:
            report.append(f" {short} texts < 10 chars")
        if long_:
            report.append(f" {long_} texts > 2000 chars")


def _has_encoding_issue(text):
    """Return True if *text* has a control character or a mojibake marker."""
    if _CONTROL_CHARS.search(text):
        return True
    return any(marker in text for marker in _MOJIBAKE_MARKERS)


def _check_encoding(splits, report):
    """Section 7: per split, count records whose text looks mis-encoded."""
    report.append("\n## 7. Encoding / control character issues")
    for sname, recs in splits.items():
        # Each record counts once, even if it trips both checks.
        issues = sum(1 for r in recs if _has_encoding_issue(r["text"]))
        report.append(f" {sname}: {issues} records with encoding issues")


def _report_empty_records(splits, report):
    """Section 8: per split, count records that have no spans at all."""
    report.append("\n## 8. Records with no entity spans")
    for sname, recs in splits.items():
        no_spans = sum(1 for r in recs if not r.get("spans"))
        pct = 100 * no_spans / len(recs) if recs else 0.0
        report.append(
            f" {sname}: {no_spans}/{len(recs)} records with no spans ({pct:.1f}%)"
        )


def audit_suite(prefix, report):
    """Audit the train/valid/test splits of one suite (13class or 5class).

    Loads ``{prefix}_train.jsonl``, ``{prefix}_valid.jsonl`` and
    ``{prefix}_test.jsonl`` from ``DATA`` and appends eight report sections:
    cross-split leakage, within-split duplicates, span offset alignment,
    label consistency, class balance, text length distribution, encoding
    issues and records without spans.

    Each record is expected to be a dict with a ``"text"`` string and an
    optional ``"spans"`` mapping of ``"LABEL: entity"`` keys to lists of
    ``(start, end)`` offsets into the text.

    Args:
        prefix: File-name prefix of the suite, e.g. ``"aggregated_13class"``.
        report: List of report lines; extended in place.

    Returns:
        Dict mapping ``"train"``, ``"valid"`` and ``"test"`` to the loaded
        record lists.
    """
    rule = "=" * 70
    report.append(f"\n{rule}")
    report.append(f"AUDITING: {prefix}")
    report.append(rule)

    splits = {
        name: load_jsonl(DATA / f"{prefix}_{name}.jsonl")
        for name in ("train", "valid", "test")
    }

    sections = (
        _check_cross_split_overlap,
        _check_within_split_duplicates,
        _check_span_offsets,
        _check_label_consistency,
        _report_class_balance,
        _report_text_lengths,
        _check_encoding,
        _report_empty_records,
    )
    for section in sections:
        section(splits, report)

    return splits
|
|
|
|
def audit_enriched(prefix, agg_train, report):
    """Check that the enriched splits equal the aggregated ones plus LLM records.

    Appends section 9 to *report*: record-count comparison of the enriched and
    aggregated valid/test files, coverage of the aggregated train texts by the
    enriched train set, the number of enriched-only (LLM) train texts, and a
    leakage check between enriched train and enriched valid/test.

    Args:
        prefix: Suite name without the source prefix, e.g. ``"13class"``.
        agg_train: Already loaded records of the aggregated train split.
        report: List of report lines; extended in place.
    """
    report.append(f"\n## 9. Enriched vs aggregated check ({prefix})")

    def read_split(source, split):
        return load_jsonl(DATA / f"{source}_{prefix}_{split}.jsonl")

    enriched_train = read_split("enriched", "train")
    enriched_valid = read_split("enriched", "valid")
    enriched_test = read_split("enriched", "test")
    agg_valid = read_split("aggregated", "valid")
    agg_test = read_split("aggregated", "test")

    # Held-out splits are compared by record count only.
    held_out = (
        ("valid", enriched_valid, agg_valid),
        ("test", enriched_test, agg_test),
    )
    for split, enriched, aggregated in held_out:
        n_enr = len(enriched)
        n_agg = len(aggregated)
        report.append(
            f" enriched {split} == aggregated {split}: "
            f"{n_enr} == {n_agg} -> {n_enr == n_agg}"
        )

    # Every aggregated train text should also be present in enriched train.
    agg_texts = {rec["text"] for rec in agg_train}
    enr_texts = {rec["text"] for rec in enriched_train}
    missing = agg_texts - enr_texts
    covered = len(agg_texts) - len(missing)
    report.append(
        f" Aggregated train texts in enriched train: {covered}/{len(agg_texts)}"
    )
    if missing:
        report.append(
            f" **MISSING**: {len(missing)} aggregated texts not in enriched!"
        )

    llm_only = enr_texts - agg_texts
    added = len(enriched_train) - len(agg_train)
    report.append(f" LLM-only records in enriched train: {len(llm_only)}")
    report.append(
        f" Total enriched train: {len(enriched_train)} "
        f"(agg {len(agg_train)} + LLM ~{added})"
    )

    # Enriched train texts must not reappear in the held-out enriched splits.
    overlaps = {
        "valid": enr_texts.intersection(rec["text"] for rec in enriched_valid),
        "test": enr_texts.intersection(rec["text"] for rec in enriched_test),
    }
    for split, shared in overlaps.items():
        if shared:
            report.append(
                f" **LEAK**: {len(shared)} enriched train texts also in {split}!"
            )
    if not any(overlaps.values()):
        report.append(" PASS: No enriched train/valid/test leakage")
|
|
|
|
def main():
    """Run all audits, print the report and save it as a Markdown note.

    Steps, in report order:
      1. ``audit_suite`` for the aggregated 13class and 5class suites.
      2. Cross-check that both suites contain the same texts in the same
         order in every split.
      3. ``audit_enriched`` for both class sets.
      4. List the label space found in each aggregated train split.

    The splits returned by ``audit_suite`` are kept and reused for steps 2-4
    instead of re-reading the same files. This also routes every read through
    ``load_jsonl`` (UTF-8, files closed, bad lines reported and skipped).
    """
    report = ["# Base Data Quality Audit", "Date: 2026-04-24", ""]
    rule = "=" * 70
    suite_prefixes = ("aggregated_13class", "aggregated_5class")

    suites = {prefix: audit_suite(prefix, report) for prefix in suite_prefixes}

    report.append(f"\n{rule}")
    report.append("CROSS-CHECK: 13class vs 5class text identity")
    report.append(rule)
    for split in ("train", "valid", "test"):
        t13 = [r["text"] for r in suites["aggregated_13class"][split]]
        t5 = [r["text"] for r in suites["aggregated_5class"][split]]
        report.append(
            f" {split}: identical={t13 == t5}, len 13c={len(t13)} 5c={len(t5)}"
        )

    report.append(f"\n{rule}")
    report.append("ENRICHED FILE CHECKS")
    report.append(rule)
    for prefix in ("13class", "5class"):
        agg_train = suites[f"aggregated_{prefix}"]["train"]
        audit_enriched(prefix, agg_train, report)

    report.append(f"\n{rule}")
    report.append("LABEL SPACES")
    report.append(rule)
    for prefix in suite_prefixes:
        labels = set()
        for r in suites[prefix]["train"]:
            # A span key is "LABEL: entity"; only the label part is collected.
            labels.update(key.split(": ", 1)[0] for key in r.get("spans") or {})
        report.append(f" {prefix}: {sorted(labels)}")

    text = "\n".join(report)
    print(text)

    out = Path(
        "/home/ubuntu/alkyline/research/notes/progress/"
        "2026-04-24-31-base-data-quality-audit.md"
    )
    out.parent.mkdir(parents=True, exist_ok=True)
    # The report quotes dataset text, so write it as UTF-8 regardless of the
    # platform's default encoding.
    out.write_text(text, encoding="utf-8")
    print(f"\nReport written to {out}")
|
|
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|