arcspan / scripts /audit_span_boundaries.py
chairulridjal's picture
Add files using upload-large-folder tool
3dac39e verified
#!/usr/bin/env python3
"""Audit span boundary quality in Arcspan training data."""
import json
import random
import re
import sys
from collections import defaultdict
from pathlib import Path
# Seed the module-level RNG so the records sampled by audit_file() are
# reproducible from one run to the next.
random.seed(42)

# JSONL files to audit, in the order they are reported.  main() skips any
# path that does not exist instead of failing.
FILES = [
    Path("/home/ubuntu/alkyline/data/processed/enriched_5class_train_cleaned.jsonl"),
    Path("/home/ubuntu/alkyline/data/processed/aptner_5class_train.jsonl"),
    Path("/home/ubuntu/alkyline/data/processed/securebert2_5class_train.jsonl"),
]

# Upper bound on the number of records sampled from each file.
SAMPLE_SIZE = 200
def parse_surface(key: str) -> str:
    """Return the surface text of a ``'Label: surface_text'`` span key.

    Everything after the first ``": "`` separator is returned.  A key that
    contains no such separator is returned unchanged.
    """
    _label, separator, surface = key.partition(": ")
    # An empty separator means ": " never occurred in the key.
    return surface if separator else key
def is_word_boundary(text, pos):
    """Tell whether offset ``pos`` sits on a word boundary of ``text``.

    Offsets at or beyond either end of the string always count as
    boundaries.  An interior offset is mid-word only when the characters
    on both sides of it are alphanumeric.
    """
    inside = 0 < pos < len(text)
    if not inside:
        return True
    before, after = text[pos - 1], text[pos]
    # Alphanumeric on both sides means the offset splits a word.
    return not (before.isalnum() and after.isalnum())
def _find_overlaps(intervals, line_num):
    """Return one issue dict per pair of overlapping spans in ``intervals``.

    ``intervals`` is a list of ``(start, end, key)`` tuples.  Every
    overlapping pair is reported, not only pairs that are adjacent after
    sorting, so a long span that covers several later spans is reported
    against each of them.  Spans that merely touch (``end == start``) do
    not overlap.
    """
    found = []
    active = []  # earlier spans that may still cover a later start
    for start, end, key in sorted(intervals):
        # Starts are visited in ascending order, so a span that ends at or
        # before this start cannot overlap anything that follows.
        active = [span for span in active if span[1] > start]
        for prev_start, prev_end, prev_key in active:
            found.append({
                "line": line_num,
                "span1": (prev_key, prev_start, prev_end),
                "span2": (key, start, end),
            })
        active.append((start, end, key))
    return found


def audit_file(path: Path):
    """Audit a random sample of records from one JSONL file.

    Each non-blank line is parsed as a JSON object.  Its ``"text"`` value is
    sliced with the ``[start, end]`` offset pairs found under the optional
    ``"spans"`` mapping, whose keys have the form ``"Label: surface"``.
    At most ``SAMPLE_SIZE`` records are drawn using the module-level
    ``random`` state.

    Args:
        path: JSONL file to audit; read as UTF-8.

    Returns:
        A ``(total, issues)`` tuple.  ``total`` is the number of non-blank
        lines in the file.  ``issues`` maps each category name to a list of
        dicts describing the offending spans; each dict carries the 1-based
        file line number under ``"line"``.

    Raises:
        json.JSONDecodeError: if a sampled line is not valid JSON.
        KeyError: if a sampled record has no ``"text"`` field.
    """
    # Split on "\n" only: str.splitlines() also breaks on separators such as
    # U+2028, which may appear unescaped inside a JSON string.
    raw_lines = path.read_text(encoding="utf-8").split("\n")
    # Pair every non-blank line with its 1-based position in the file so that
    # skipping blank lines (or an empty file) neither crashes json.loads nor
    # shifts the reported line numbers.
    records = [
        (number, raw)
        for number, raw in enumerate(raw_lines, start=1)
        if raw.strip()
    ]
    total = len(records)
    indices = random.sample(range(total), min(SAMPLE_SIZE, total))

    issues = {
        "offset_mismatch": [],
        "leading_trailing_ws": [],
        "mid_word_boundary": [],
        "trailing_punct": [],
        "overlapping": [],
        "empty_or_zero": [],
        "out_of_bounds": [],
    }

    for record_idx in indices:
        line_num, raw = records[record_idx]
        rec = json.loads(raw)
        text = rec["text"]
        text_len = len(text)
        all_intervals = []

        for key, offsets in rec.get("spans", {}).items():
            surface = parse_surface(key)
            for start, end in offsets:
                ctx = {"line": line_num, "key": key, "start": start, "end": end}

                # Empty/zero-length (or inverted) span: nothing to extract.
                if start >= end:
                    issues["empty_or_zero"].append(ctx)
                    continue

                # Out of bounds: slicing would silently truncate, so skip
                # the content checks for this span.
                if end > text_len or start < 0:
                    issues["out_of_bounds"].append({**ctx, "text_len": text_len})
                    continue

                extracted = text[start:end]
                ctx["extracted"] = extracted

                # Offset mismatch: the slice differs from the key's surface.
                if extracted != surface:
                    issues["offset_mismatch"].append({**ctx, "expected": surface})

                # Leading/trailing whitespace inside the span.
                if extracted != extracted.strip():
                    issues["leading_trailing_ws"].append(ctx)

                # Mid-word boundary at either end of the span.
                if not is_word_boundary(text, start) or not is_word_boundary(text, end):
                    issues["mid_word_boundary"].append({
                        **ctx,
                        "context": text[max(0, start - 5):end + 5],
                    })

                # Trailing punctuation (.,;:!?) or a closing parenthesis that
                # likely shouldn't be part of the entity.
                if extracted[-1] in ".,;:!?)":
                    issues["trailing_punct"].append(ctx)

                all_intervals.append((start, end, key))

        # Overlapping spans within this record (in-bounds, non-empty only).
        issues["overlapping"].extend(_find_overlaps(all_intervals, line_num))

    return total, issues
def main():
    """Audit every path in ``FILES`` and print a per-file report to stdout.

    Missing files are announced and skipped.  For each audited file the
    report lists up to five examples per non-empty issue category, followed
    by a summary of the count in every category.
    """
    rule = "=" * 60
    for path in FILES:
        missing = not path.exists()
        print(f"\n{rule}")
        if missing:
            print(f"SKIPPED (not found): {path.name}")
            continue

        print(f"FILE: {path.name}")
        total, issues = audit_file(path)
        sampled = min(SAMPLE_SIZE, total)
        print(f"Total examples: {total}, sampled: {sampled}")
        print(rule)

        # Only categories that actually collected something get a section.
        populated = [(cat, items) for cat, items in issues.items() if items]
        for cat, items in populated:
            print(f"\n [{cat.upper()}] — {len(items)} issues")
            for ex in items[:5]:
                where = ex.get('line', '?')
                payload = json.dumps(ex, ensure_ascii=False, default=str)
                print(f" Line {where}: {payload}")
        if not populated:
            print("\n ✅ No issues found in sample!")

        # Summary table: every category, including the empty ones.
        print("\n --- Summary ---")
        for cat, items in issues.items():
            print(f" {cat:25s}: {len(items)}")
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()