#!/usr/bin/env python3
"""Audit span boundary quality in Arcspan training data."""

import json
import random
import re
import sys
from collections import defaultdict
from pathlib import Path

# Fixed seed so repeated runs draw the same sample of records.
random.seed(42)

FILES = [
    Path("/home/ubuntu/alkyline/data/processed/enriched_5class_train_cleaned.jsonl"),
    Path("/home/ubuntu/alkyline/data/processed/aptner_5class_train.jsonl"),
    Path("/home/ubuntu/alkyline/data/processed/securebert2_5class_train.jsonl"),
]
SAMPLE_SIZE = 200

# Characters flagged when they are the last character of an extracted span.
# Note that the closing parenthesis is part of the set.
_TRAILING_PUNCT = ".,;:!?)"


def parse_surface(key: str) -> str:
    """Extract the surface text from a 'Label: surface_text' key.

    The key is split at the first ': ' occurrence. If the separator is
    absent, the key is returned unchanged.
    """
    _label, separator, surface = key.partition(": ")
    return surface if separator else key


def is_word_boundary(text, pos):
    """Return True if ``pos`` sits on a word boundary of ``text``.

    Positions at or beyond either end of the text count as boundaries.
    An interior position is mid-word only when the characters on both
    sides of it are alphanumeric.
    """
    if pos <= 0 or pos >= len(text):
        return True
    return not (text[pos - 1].isalnum() and text[pos].isalnum())


def _check_span(text, key, start, end, line_num, issues):
    """Run the per-span checks and record findings in ``issues``.

    Returns True when the span is non-empty and inside the text, i.e. when
    it should take part in overlap detection; False otherwise.
    """
    ctx = {"line": line_num, "key": key, "start": start, "end": end}

    # Empty/zero-length (or inverted) span: nothing to extract.
    if start >= end:
        issues["empty_or_zero"].append(ctx)
        return False

    # Out of bounds: slicing would silently truncate, so stop here.
    text_len = len(text)
    if end > text_len or start < 0:
        issues["out_of_bounds"].append({**ctx, "text_len": text_len})
        return False

    extracted = text[start:end]
    ctx["extracted"] = extracted

    # The offsets must reproduce the surface string stored in the key.
    surface = parse_surface(key)
    if extracted != surface:
        issues["offset_mismatch"].append({**ctx, "expected": surface})

    if extracted != extracted.strip():
        issues["leading_trailing_ws"].append(ctx)

    # Either edge falling between two alphanumeric characters cuts a word.
    if not (is_word_boundary(text, start) and is_word_boundary(text, end)):
        issues["mid_word_boundary"].append(
            {**ctx, "context": text[max(0, start - 5):end + 5]}
        )

    # Trailing punctuation that likely shouldn't be part of the entity.
    if extracted[-1] in _TRAILING_PUNCT:
        issues["trailing_punct"].append(ctx)

    return True


def _find_overlaps(intervals, line_num):
    """Return one record per overlapping pair of ``(start, end, key)`` spans.

    Spans are sorted by start; each span is compared with every later span
    that starts before it ends. Comparing only neighbours would miss a long
    span that covers several later ones.
    """
    ordered = sorted(intervals)
    overlaps = []
    for i, (s1, e1, k1) in enumerate(ordered):
        for j in range(i + 1, len(ordered)):
            s2, e2, k2 = ordered[j]
            if s2 >= e1:
                # Sorted by start: no later span can overlap this one.
                break
            overlaps.append({
                "line": line_num,
                "span1": (k1, s1, e1),
                "span2": (k2, s2, e2),
            })
    return overlaps


def audit_file(path: Path):
    """Audit a random sample of records from a JSONL file.

    Each non-blank line is parsed as a JSON object with a "text" string and
    an optional "spans" mapping of 'Label: surface' keys to lists of
    ``[start, end]`` offsets. At most ``SAMPLE_SIZE`` records are sampled
    using the module-level ``random`` state.

    Returns:
        ``(total, issues)`` where ``total`` is the number of non-blank lines
        in the file and ``issues`` maps each issue category to a list of
        finding dicts. "line" in a finding is the 1-based line number in the
        file.

    Raises:
        json.JSONDecodeError: a sampled line is not valid JSON.
        KeyError: a sampled record has no "text" field.
    """
    # Decode explicitly as UTF-8: span offsets are character offsets, so a
    # locale-dependent decoding could shift or corrupt them.
    # Split on "\n" only; str.splitlines() would also break on characters
    # such as U+2028 that may legally appear unescaped inside JSON strings.
    raw_lines = path.read_text(encoding="utf-8").split("\n")
    # Skip blank lines (including the empty-file case) but keep the real
    # file line number of every record for reporting.
    records = [
        (num, line)
        for num, line in enumerate(raw_lines, start=1)
        if line.strip()
    ]
    total = len(records)
    indices = random.sample(range(total), min(SAMPLE_SIZE, total))

    issues = {
        "offset_mismatch": [],
        "leading_trailing_ws": [],
        "mid_word_boundary": [],
        "trailing_punct": [],
        "overlapping": [],
        "empty_or_zero": [],
        "out_of_bounds": [],
    }

    for rec_idx in indices:
        line_num, raw = records[rec_idx]
        rec = json.loads(raw)
        text = rec["text"]

        all_intervals = []
        for key, offsets in rec.get("spans", {}).items():
            for start, end in offsets:
                if _check_span(text, key, start, end, line_num, issues):
                    all_intervals.append((start, end, key))

        issues["overlapping"].extend(_find_overlaps(all_intervals, line_num))

    return total, issues


def main():
    """Audit every file in ``FILES`` and print a per-category report."""
    for path in FILES:
        if not path.exists():
            print(f"\n{'='*60}")
            print(f"SKIPPED (not found): {path.name}")
            continue

        print(f"\n{'='*60}")
        print(f"FILE: {path.name}")
        total, issues = audit_file(path)
        print(f"Total examples: {total}, sampled: {min(SAMPLE_SIZE, total)}")
        print(f"{'='*60}")

        any_found = False
        for cat, items in issues.items():
            count = len(items)
            if count == 0:
                continue
            any_found = True
            print(f"\n [{cat.upper()}] — {count} issues")
            # Show only the first few examples per category.
            for ex in items[:5]:
                print(f" Line {ex.get('line', '?')}: {json.dumps(ex, ensure_ascii=False, default=str)}")

        if not any_found:
            print("\n ✅ No issues found in sample!")

        # Summary table
        print("\n --- Summary ---")
        for cat, items in issues.items():
            print(f" {cat:25s}: {len(items)}")


if __name__ == "__main__":
    main()