arcspan / scripts /audit_data_quality.py
chairulridjal's picture
Add files using upload-large-folder tool
3dac39e verified
#!/usr/bin/env python3
"""Comprehensive data quality audit for LLM-annotated cybersecurity NER data."""
import json, os, re, sys
from collections import Counter, defaultdict
from pathlib import Path
# Directory that holds the processed JSONL files this audit reads.
DATA_DIR = Path("/home/ubuntu/alkyline/data/processed")

# Files to audit: the annotated set first, then the generated set, each
# group sorted independently.
FILES = [
    *sorted(DATA_DIR.glob("llm_annotated_*.jsonl")),
    *sorted(DATA_DIR.glob("llm_generated_*.jsonl")),
]
# Lower-cased names of security vendors / organisations. An entity whose
# lower-cased text is in this set should NOT carry the SYSTEM label.
KNOWN_ORGS = {
    # security vendors and research teams
    "eset",
    "kaspersky",
    "mandiant",
    "fireeye",
    "crowdstrike",
    "palo alto",
    "symantec",
    "mcafee",
    "trend micro",
    "sophos",
    "fortinet",
    "cisco talos",
    "recorded future",
    "unit 42",
    "proofpoint",
    "sentinelone",
    "checkpoint",
    "check point",
    "avast",
    "bitdefender",
    "malwarebytes",
    "rapid7",
    "qualys",
    "tenable",
    "zscaler",
    "carbon black",
    "cylance",
    "webroot",
    "f-secure",
    "nortonlifelock",
    "trellix",
    # large technology companies
    "microsoft",
    "google",
    "facebook",
    "meta",
    "amazon",
    "ibm",
    # government agencies
    "nsa",
    "cisa",
    "fbi",
}
# Whole-string patterns used to recognise entity types by shape alone.
# Dotted quad of 1-3 digit groups (octet ranges are not validated).
IP_RE = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
# CVE identifier, case-insensitive.
CVE_RE = re.compile(r'^CVE-\d{4}-\d+$', re.IGNORECASE)
# Anything that starts with an http:// or https:// scheme.
URL_RE = re.compile(r'^https?://', re.IGNORECASE)
# 32 to 64 hexadecimal characters, case-insensitive.
HASH_RE = re.compile(r'^[a-f0-9]{32,64}$', re.IGNORECASE)
# Lower-cased operating-system / platform names. An entity whose lower-cased
# text is in this set should NOT carry the ORGANIZATION label.
KNOWN_SYSTEMS = {
    # Windows family
    "windows",
    "windows 7",
    "windows 10",
    "windows 11",
    "windows server",
    # Apple / mobile / Chrome OS
    "macos",
    "mac os",
    "ios",
    "android",
    "chrome os",
    # Linux distributions and Unix-likes
    "linux",
    "ubuntu",
    "debian",
    "centos",
    "red hat",
    "fedora",
    "freebsd",
    "solaris",
    "unix",
}
# Accumulator for every finding of the audit. Insertion order is kept
# deliberately: dicts preserve it, so it is also the order in which the
# buckets are iterated later.
results = dict(
    offset_errors=[],
    duplicate_texts=[],
    short_texts=[],
    mislabels=[],
    overlapping_spans=[],
    garbage_text=[],
    repetitive_entities=[],
    empty_spans=[],
    parse_errors=[],
    label_distribution=Counter(),
    file_stats={},
    cross_file_dupes=[],
)

# stripped text -> [(file, line_num), ...] for duplicate detection
all_texts = {}
# "LABEL: entity" -> number of records that carry that span key
entity_counter = Counter()
# every successfully parsed record, across all files
all_records = []
# Load every JSONL file into all_records, tagging each record with its
# origin ("_file", "_line") and indexing its stripped text in all_texts.
# Lines that cannot be used are recorded in results["parse_errors"] as
# (file, line_num, message) and skipped rather than aborting the audit.
print("Loading all files...")
for fpath in FILES:
    fname = fpath.name
    records = []
    # JSON text is UTF-8; do not depend on the platform's locale default.
    with open(fpath, encoding="utf-8") as f:
        for i, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError as e:
                results["parse_errors"].append((fname, i, str(e)))
                continue
            # A valid JSON line may still be a list/number/string; the
            # checks below need an object, so report it instead of crashing.
            if not isinstance(rec, dict):
                results["parse_errors"].append(
                    (fname, i, f"expected a JSON object, got {type(rec).__name__}")
                )
                continue
            txt = rec.get("text", "")
            # Every later check slices or measures the text as a string.
            if not isinstance(txt, str):
                results["parse_errors"].append(
                    (fname, i, f"'text' must be a string, got {type(txt).__name__}")
                )
                continue
            rec["_file"] = fname
            rec["_line"] = i
            records.append(rec)
            # Track text for duplicate detection
            key = txt.strip()
            all_texts.setdefault(key, []).append((fname, i))
    results["file_stats"][fname] = len(records)
    all_records.extend(records)
print(f"Loaded {len(all_records)} records from {len(FILES)} files")
# === CHECK 1: Offset errors ===
# For each "LABEL: entity" span key, verify that every (start, end) pair
# slices exactly the entity text out of the record text. All mismatches are
# counted; only the first 200 are kept as samples.
print("Checking offsets...")
offset_err_count = 0
for rec in all_records:
    text = rec.get("text", "")
    for key, positions in rec.get("spans", {}).items():
        label, sep, entity_text = key.partition(": ")
        if not sep:
            # key is not of the form "LABEL: entity"
            continue
        results["label_distribution"][label] += 1
        for start, end in positions:
            actual = text[start:end]
            if actual == entity_text:
                continue
            offset_err_count += 1
            if offset_err_count > 200:
                continue
            results["offset_errors"].append({
                "file": rec["_file"],
                "line": rec["_line"],
                "label": label,
                "expected": entity_text,
                "actual": actual,
                "start": start,
                "end": end,
            })
# === CHECK 2: Duplicate texts ===
# Any stripped text seen at more than one (file, line) location is a
# duplicate. Entries spanning more than one file are additionally stored in
# results["cross_file_dupes"].
print("Checking duplicates...")
for txt, locs in all_texts.items():
    if len(locs) < 2:
        continue
    # Sorted so the report and the saved JSON are stable from run to run
    # (set iteration order of strings varies with hash randomisation).
    files_involved = sorted({f for f, _ in locs})
    entry = {
        "count": len(locs),
        "files": files_involved,
        "text_preview": txt[:100],
        "cross_file": len(files_involved) > 1,
    }
    results["duplicate_texts"].append(entry)
    if entry["cross_file"]:
        results["cross_file_dupes"].append(entry)
# === CHECK 3: Short texts ===
# Flag every record whose text is shorter than 20 characters.
print("Checking short texts...")
for rec in all_records:
    txt = rec.get("text", "")
    length = len(txt)
    if length >= 20:
        continue
    results["short_texts"].append({
        "file": rec["_file"],
        "line": rec["_line"],
        "text": txt,
        "length": length,
    })
# === CHECK 4: Mislabels ===
# Apply five shape/lookup rules to every "LABEL: entity" span key. Each rule
# that fires counts as one mislabel; one entity can trigger several. All hits
# are counted, but only the first 200 (across all rules) are kept as samples.
# The loop also tallies entity_counter, once per record per span key.
print("Checking mislabels...")
mislabel_count = 0
for rec in all_records:
    for key in rec.get("spans", {}):
        label, sep, entity = key.partition(": ")
        if not sep:
            # key is not of the form "LABEL: entity"
            continue
        ent_lower = entity.lower().strip()
        entity_counter[key] += 1

        is_indicator_label = label in ("INDICATOR", "IOC")
        # Rules are evaluated in a fixed order so samples keep that order.
        reasons = []
        # IP labeled as non-INDICATOR
        if IP_RE.match(entity) and not is_indicator_label:
            reasons.append(f"IP address labeled as {label}, expected INDICATOR")
        # CVE labeled wrong
        if CVE_RE.match(entity) and label not in ("VULNERABILITY", "CVE"):
            reasons.append(f"CVE ID labeled as {label}, expected VULNERABILITY")
        # URL/hash as non-indicator
        if (URL_RE.match(entity) or HASH_RE.match(entity)) and not is_indicator_label:
            reasons.append(f"URL/hash labeled as {label}, expected INDICATOR")
        # Known org labeled as SYSTEM
        if label == "SYSTEM" and ent_lower in KNOWN_ORGS:
            reasons.append(f"Security vendor/org '{entity}' labeled as SYSTEM, expected ORGANIZATION")
        # Known system labeled as ORGANIZATION
        if label == "ORGANIZATION" and ent_lower in KNOWN_SYSTEMS:
            reasons.append(f"OS/platform '{entity}' labeled as ORGANIZATION, expected SYSTEM")

        for reason in reasons:
            mislabel_count += 1
            if mislabel_count <= 200:
                results["mislabels"].append({
                    "file": rec["_file"],
                    "line": rec["_line"],
                    "entity": entity,
                    "label": label,
                    "reason": reason,
                })
# === CHECK 5: Overlapping spans ===
# Within each record, sort all (start, end, key) intervals and flag every
# interval that begins before an earlier interval has ended. All overlaps are
# counted; only the first 100 are kept as samples.
#
# Each interval is compared against the earlier interval that reaches
# furthest to the right, not merely its immediate predecessor. Comparing only
# neighbours misses cases such as (0,100), (5,10), (20,30), where the third
# span lies inside the first but does not touch the second.
print("Checking overlapping spans...")
overlap_count = 0
for rec in all_records:
    spans = rec.get("spans", {})
    all_intervals = sorted(
        (start, end, key)
        for key, positions in spans.items()
        for start, end in positions
    )
    widest = None  # earlier interval with the largest end offset so far
    for current in all_intervals:
        if widest is not None and current[0] < widest[1]:  # overlap
            overlap_count += 1
            if overlap_count <= 100:
                s1, e1, k1 = widest
                s2, e2, k2 = current
                results["overlapping_spans"].append({
                    "file": rec["_file"], "line": rec["_line"],
                    "span1": f"{k1} [{s1}:{e1}]",
                    "span2": f"{k2} [{s2}:{e2}]",
                })
        if widest is None or current[1] > widest[1]:
            widest = current
# === CHECK 6: Garbage text ===
# Flag records whose text contains HTML tags or, for texts longer than 50
# characters, more than 10% non-ASCII characters outside a small allowlist of
# common typographic symbols.
print("Checking garbage text...")
HTML_RE = re.compile(r'<[a-z/][^>]*>', re.I)
# NOTE: compiled but not applied by this check.
MARKDOWN_RE = re.compile(r'(?:^|\n)#{1,6}\s|^\s*[\*\-]\s|\[.*?\]\(.*?\)|\*\*.*?\*\*')
# Typographic characters that are normal in prose and must not count towards
# the non-ASCII ratio. The curly quotes are written as escapes: as plain
# ASCII quote characters they merely split the literal into adjacent strings
# and exempted nothing.
ALLOWED_NON_ASCII = frozenset('–—\u2018\u2019\u201c\u201d•…©®™°×÷±€£¥¢')
for rec in all_records:
    txt = rec.get("text", "")
    issues = []
    if HTML_RE.search(txt):
        issues.append("HTML tags")
    # Check for high non-ASCII ratio (encoding issues)
    non_ascii = sum(1 for c in txt if ord(c) > 127 and c not in ALLOWED_NON_ASCII)
    if non_ascii > len(txt) * 0.1 and len(txt) > 50:
        issues.append(f"high non-ASCII ratio ({non_ascii}/{len(txt)})")
    if issues:
        results["garbage_text"].append({
            "file": rec["_file"], "line": rec["_line"],
            "issues": issues,
            "text_preview": txt[:120],
        })
# === CHECK 7: Repetitive entities ===
# Among the 100 most frequent span keys, keep those seen at least 50 times.
print("Checking repetitive entities...")
results["repetitive_entities"].extend(
    {"entity": key, "count": count}
    for key, count in entity_counter.most_common(100)
    if count >= 50
)
# === CHECK 8: Empty spans ===
# Flag records whose "spans" entry is missing or empty.
print("Checking empty spans...")
for rec in all_records:
    if rec.get("spans", {}):
        continue
    results["empty_spans"].append({
        "file": rec["_file"],
        "line": rec["_line"],
        "text_preview": rec.get("text", "")[:80],
    })
# === REPORT ===
# Print a human-readable summary: one section per check, each with a
# headline count followed by a bounded number of sample findings.
banner = "=" * 70
print("\n" + banner)
print("DATA QUALITY AUDIT REPORT")
print(banner)

# --- Files and parse errors ---
print(f"\n## Files Audited: {len(FILES)}")
for fname, count in results["file_stats"].items():
    print(f" {fname}: {count} records")
print(f" TOTAL: {len(all_records)} records")

print(f"\n## Parse Errors: {len(results['parse_errors'])}")
for pe in results["parse_errors"][:10]:
    print(f" {pe}")

# --- 1. Offsets ---
print(f"\n## 1. Offset Errors: {offset_err_count}")
for e in results["offset_errors"][:30]:
    print(f" [{e['file']}:{e['line']}] {e['label']}: expected '{e['expected']}' got '{e['actual']}' at [{e['start']}:{e['end']}]")

# --- 2. Duplicates ---
duplicate_entries = results["duplicate_texts"]
dupe_cross = sum(bool(d["cross_file"]) for d in duplicate_entries)
dupe_within = len(duplicate_entries) - dupe_cross
dupe_total_records = sum(d["count"] for d in duplicate_entries)
print(f"\n## 2. Duplicate Texts: {len(results['duplicate_texts'])} unique texts duplicated ({dupe_total_records} total records)")
print(f" Within-file: {dupe_within}, Cross-file: {dupe_cross}")
# Most-duplicated first; the sort is stable, so ties keep discovery order.
most_duplicated = sorted(duplicate_entries, key=lambda x: x["count"], reverse=True)
for d in most_duplicated[:20]:
    print(f" [{d['count']}x] {'CROSS-FILE ' if d['cross_file'] else ''}{d['files']}: {d['text_preview'][:80]}")

# --- 3. Short texts ---
print(f"\n## 3. Short Texts (<20 chars): {len(results['short_texts'])}")
for s in results["short_texts"][:20]:
    print(f" [{s['file']}:{s['line']}] ({s['length']} chars) '{s['text']}'")

# --- 4. Mislabels ---
print(f"\n## 4. Mislabels: {mislabel_count}")
# Group by reason pattern (text before the first comma, at most 60 chars).
# This is computed from the stored samples, not from every counted hit.
reason_groups = Counter(
    m["reason"].split(",")[0][:60] for m in results["mislabels"]
)
for reason, count in reason_groups.most_common(20):
    print(f" [{count}x] {reason}")
print(" Sample issues:")
for m in results["mislabels"][:20]:
    print(f" [{m['file']}:{m['line']}] {m['entity']} -> {m['label']}: {m['reason']}")

# --- 5. Overlaps ---
print(f"\n## 5. Overlapping Spans: {overlap_count}")
for o in results["overlapping_spans"][:20]:
    print(f" [{o['file']}:{o['line']}] {o['span1']} <-> {o['span2']}")

# --- 6. Garbage text ---
print(f"\n## 6. Garbage Text: {len(results['garbage_text'])}")
# Tally by issue kind, dropping any parenthesised detail from the message.
issue_types = Counter(
    iss.split("(")[0].strip()
    for g in results["garbage_text"]
    for iss in g["issues"]
)
for it, count in issue_types.most_common():
    print(f" {it}: {count} records")
for g in results["garbage_text"][:15]:
    print(f" [{g['file']}:{g['line']}] {g['issues']}: {g['text_preview'][:80]}")

# --- 7. Repetitive entities ---
print(f"\n## 7. Repetitive Entities (50+ occurrences): {len(results['repetitive_entities'])}")
for r in results["repetitive_entities"][:30]:
    print(f" {r['entity']}: {r['count']}")

# --- 8. Empty spans ---
print(f"\n## 8. Empty Spans: {len(results['empty_spans'])}")
empty_by_file = Counter(e["file"] for e in results["empty_spans"])
for fname, count in empty_by_file.most_common():
    print(f" {fname}: {count}")
for e in results["empty_spans"][:10]:
    print(f" [{e['file']}:{e['line']}] {e['text_preview']}")

# --- Label distribution ---
print(f"\n## Label Distribution:")
for label, count in results["label_distribution"].most_common():
    print(f" {label}: {count}")
# Save detailed JSON
# Everything in results except the per-file record counts is written out;
# values json cannot encode natively fall back to str().
serializable = {
    name: bucket for name, bucket in results.items() if name != "file_stats"
}
with open("/home/ubuntu/alkyline/scripts/audit_results.json", "w") as f:
    json.dump(serializable, f, indent=2, default=str)
print("\nDetailed results saved to scripts/audit_results.json")