| |
| """Audit span boundary quality in Arcspan training data.""" |
|
|
| import json |
| import random |
| import re |
| import sys |
| from collections import defaultdict |
| from pathlib import Path |
|
|
# Fixed seed: the random line sample drawn for each file is reproducible
# from run to run.
random.seed(42)

# Upper bound on the number of lines sampled from each file.
SAMPLE_SIZE = 200

# JSONL training files to audit, in the order they are reported.
FILES = [
    Path(location)
    for location in (
        "/home/ubuntu/alkyline/data/processed/enriched_5class_train_cleaned.jsonl",
        "/home/ubuntu/alkyline/data/processed/aptner_5class_train.jsonl",
        "/home/ubuntu/alkyline/data/processed/securebert2_5class_train.jsonl",
    )
]
|
|
|
|
def parse_surface(key: str) -> str:
    """Return the surface text of a ``'Label: surface_text'`` span key.

    Everything after the first ``": "`` separator is the surface text. A key
    that contains no such separator is returned unchanged.
    """
    _label, separator, surface = key.partition(": ")
    # partition() yields an empty separator when ": " is absent from the key.
    return surface if separator else key
|
|
|
|
def is_word_boundary(text, pos):
    """Tell whether ``pos`` falls on a word boundary of ``text``.

    Positions at or beyond either end of the text always count as
    boundaries. An interior position is a boundary unless the characters
    immediately before and after it are both alphanumeric.
    """
    if not 0 < pos < len(text):
        return True

    before, after = text[pos - 1], text[pos]
    return not (before.isalnum() and after.isalnum())
|
|
|
|
def _find_overlaps(intervals, line_num):
    """Return overlap records among one example's ``(start, end, key)`` spans.

    The spans are sorted by start and each one is compared against the
    earlier span that reaches furthest to the right. Comparing only adjacent
    spans would miss a long span that contains a later, non-adjacent one
    (e.g. (0, 10), (2, 3), (5, 8): the first and third overlap).
    """
    overlaps = []
    ordered = sorted(intervals)
    if not ordered:
        return overlaps

    far_start, far_end, far_key = ordered[0]
    for start, end, key in ordered[1:]:
        if start < far_end:
            overlaps.append({
                "line": line_num,
                "span1": (far_key, far_start, far_end),
                "span2": (key, start, end),
            })
        # Keep tracking whichever span seen so far extends furthest right.
        if end > far_end:
            far_start, far_end, far_key = start, end, key
    return overlaps


def audit_file(path: Path):
    """Audit span boundaries on a random sample of lines from a JSONL file.

    Each sampled line is parsed as a JSON object. Its ``"text"`` value and
    optional ``"spans"`` mapping (``"Label: surface"`` key -> list of
    ``(start, end)`` pairs) are checked for:

    * ``empty_or_zero``       -- ``start >= end``
    * ``out_of_bounds``       -- negative start or end past the text
    * ``offset_mismatch``     -- ``text[start:end]`` differs from the key's
      surface text
    * ``leading_trailing_ws`` -- extracted text has surrounding whitespace
    * ``mid_word_boundary``   -- either edge sits between two alphanumerics
    * ``trailing_punct``      -- extracted text ends in one of ``.,;:!?)``
    * ``overlapping``         -- two in-bounds, non-empty spans overlap

    Spans flagged as empty or out of bounds are excluded from all later
    checks, including the overlap check.

    Args:
        path: JSONL file to audit; read as UTF-8.

    Returns:
        ``(total, issues)`` where ``total`` is the number of lines in the
        file (after stripping surrounding whitespace) and ``issues`` maps
        each category name above to a list of context dicts. The ``"line"``
        value in each dict is the 1-based index into the stripped content.

    Raises:
        json.JSONDecodeError: if a sampled line is not valid JSON.
        KeyError: if a sampled record has no ``"text"`` field.
    """
    # Read explicitly as UTF-8 rather than the locale default, and treat an
    # empty file as zero lines instead of one unparsable empty line.
    raw = path.read_text(encoding="utf-8").strip()
    lines = raw.split("\n") if raw else []
    total = len(lines)
    indices = random.sample(range(total), min(SAMPLE_SIZE, total))

    issues = {
        "offset_mismatch": [],
        "leading_trailing_ws": [],
        "mid_word_boundary": [],
        "trailing_punct": [],
        "overlapping": [],
        "empty_or_zero": [],
        "out_of_bounds": [],
    }

    for line_idx in indices:
        line_num = line_idx + 1
        rec = json.loads(lines[line_idx])
        text = rec["text"]
        text_len = len(text)
        all_intervals = []

        for key, offsets in rec.get("spans", {}).items():
            surface = parse_surface(key)
            for start, end in offsets:
                ctx = {"line": line_num, "key": key, "start": start, "end": end}

                # Degenerate span: nothing to extract, skip remaining checks.
                if start >= end:
                    issues["empty_or_zero"].append(ctx)
                    continue

                # Span reaches outside the text: slicing would silently clamp.
                if end > text_len or start < 0:
                    issues["out_of_bounds"].append({**ctx, "text_len": text_len})
                    continue

                extracted = text[start:end]
                ctx["extracted"] = extracted

                if extracted != surface:
                    issues["offset_mismatch"].append({**ctx, "expected": surface})

                if extracted != extracted.strip():
                    issues["leading_trailing_ws"].append(ctx)

                if not is_word_boundary(text, start) or not is_word_boundary(text, end):
                    issues["mid_word_boundary"].append({
                        **ctx,
                        "context": text[max(0, start - 5):end + 5],
                    })

                if extracted and extracted[-1] in ".,;:!?)":
                    issues["trailing_punct"].append(ctx)

                all_intervals.append((start, end, key))

        issues["overlapping"].extend(_find_overlaps(all_intervals, line_num))

    return total, issues
|
|
|
|
def main():
    """Audit every configured file and print a per-category report.

    For each path in ``FILES`` this prints a header, up to five example
    records for every issue category that has findings, and a summary line
    per category. Paths that do not exist are reported as skipped.
    """
    divider = "=" * 60

    for path in FILES:
        print(f"\n{divider}")
        if not path.exists():
            print(f"SKIPPED (not found): {path.name}")
            continue

        print(f"FILE: {path.name}")
        total, issues = audit_file(path)
        print(f"Total examples: {total}, sampled: {min(SAMPLE_SIZE, total)}")
        print(divider)

        # Only categories with at least one finding get a detail section.
        flagged = {cat: items for cat, items in issues.items() if items}
        for cat, items in flagged.items():
            print(f"\n [{cat.upper()}] — {len(items)} issues")
            for example in items[:5]:
                dumped = json.dumps(example, ensure_ascii=False, default=str)
                print(f" Line {example.get('line', '?')}: {dumped}")

        if not flagged:
            print("\n ✅ No issues found in sample!")

        print("\n --- Summary ---")
        for cat, items in issues.items():
            print(f" {cat:25s}: {len(items)}")
|
|
|
|
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|