#!/usr/bin/env python3
"""Comprehensive data quality audit for LLM-annotated cybersecurity NER data.

Each input file is JSONL. Every record is expected to be a JSON object with a
``"text"`` string and a ``"spans"`` mapping of ``"LABEL: entity"`` keys to lists
of ``[start, end]`` character offsets into ``text``.

The audit runs eight checks (offsets, duplicates, short texts, mislabels,
overlapping spans, garbage text, repetitive entities, empty spans), prints a
report to stdout and writes the detailed findings as JSON.
"""
import json
import os
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path

DATA_DIR = Path("/home/ubuntu/alkyline/data/processed")
OUTPUT_PATH = Path("/home/ubuntu/alkyline/scripts/audit_results.json")

# Caps on how many individual findings are stored per check; totals keep counting.
MAX_OFFSET_SAMPLES = 200
MAX_MISLABEL_SAMPLES = 200
MAX_OVERLAP_SAMPLES = 100

# Known security vendors/orgs that should NOT be SYSTEM
KNOWN_ORGS = {
    "eset", "kaspersky", "mandiant", "fireeye", "crowdstrike", "palo alto",
    "symantec", "mcafee", "trend micro", "sophos", "fortinet", "cisco talos",
    "recorded future", "unit 42", "proofpoint", "sentinelone", "microsoft",
    "google", "facebook", "meta", "amazon", "ibm", "nsa", "cisa", "fbi",
    "checkpoint", "check point", "avast", "bitdefender", "malwarebytes",
    "rapid7", "qualys", "tenable", "zscaler", "carbon black", "cylance",
    "webroot", "f-secure", "nortonlifelock", "trellix",
}

# Patterns for entity type validation
IP_RE = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
CVE_RE = re.compile(r'^CVE-\d{4}-\d+$', re.I)
URL_RE = re.compile(r'^https?://', re.I)
HASH_RE = re.compile(r'^[a-f0-9]{32,64}$', re.I)

# Known operating systems / platforms
KNOWN_SYSTEMS = {
    "windows", "linux", "macos", "mac os", "android", "ios", "ubuntu",
    "debian", "centos", "red hat", "fedora", "freebsd", "solaris",
    "windows 10", "windows 11", "windows 7", "windows server", "chrome os",
    "unix",
}

HTML_RE = re.compile(r'<[a-z/][^>]*>', re.I)
# NOTE(review): MARKDOWN_RE is compiled but no check applies it; it is kept
# only so the name stays defined. Confirm whether a markdown check was intended.
MARKDOWN_RE = re.compile(r'(?:^|\n)#{1,6}\s|^\s*[\*\-]\s|\[.*?\]\(.*?\)|\*\*.*?\*\*')

# Typographic characters that are normal in prose and must not count towards
# the non-ASCII ratio. The curly quotes are written as escapes so they cannot
# be silently turned into ASCII quotes by an editor or a copy/paste.
ALLOWED_NON_ASCII = frozenset("–—\u2018\u2019\u201c\u201d•…©®™°×÷±€£¥¢")


def discover_files(data_dir):
    """Return the annotated then generated JSONL files under *data_dir*, each group sorted."""
    data_dir = Path(data_dir)
    return sorted(data_dir.glob("llm_annotated_*.jsonl")) + sorted(data_dir.glob("llm_generated_*.jsonl"))


FILES = discover_files(DATA_DIR)


def text_of(rec):
    """Return the record's ``"text"`` value, or ``""`` when it is missing or not a string."""
    text = rec.get("text", "")
    return text if isinstance(text, str) else ""


def spans_of(rec):
    """Return the record's ``"spans"`` mapping, or ``{}`` when it is missing or not a dict."""
    spans = rec.get("spans")
    return spans if isinstance(spans, dict) else {}


def split_positions(positions):
    """Partition raw span positions into valid ``(start, end)`` pairs and malformed entries.

    A valid entry is a two-element list/tuple of ints (bools excluded).
    Returns ``(valid, malformed)``.
    """
    if not isinstance(positions, (list, tuple)):
        return [], [positions]
    valid, malformed = [], []
    for pos in positions:
        is_pair = (
            isinstance(pos, (list, tuple))
            and len(pos) == 2
            and all(isinstance(v, int) and not isinstance(v, bool) for v in pos)
        )
        if is_pair:
            valid.append((pos[0], pos[1]))
        else:
            malformed.append(pos)
    return valid, malformed


def load_records(files):
    """Read every JSONL file in *files*.

    Returns ``(all_records, all_texts, file_stats, parse_errors)`` where
    ``all_texts`` maps stripped text to its ``(file, line)`` locations and
    ``parse_errors`` holds ``(file, line, message)`` tuples. Lines that are not
    valid JSON, or that decode to something other than an object, are reported
    as parse errors and skipped. Each kept record gains ``_file`` and ``_line``.
    """
    all_records = []
    all_texts = defaultdict(list)
    file_stats = {}
    parse_errors = []
    for fpath in files:
        fpath = Path(fpath)
        fname = fpath.name
        loaded = 0
        # Explicit encoding: the default depends on the locale of the machine.
        with open(fpath, encoding="utf-8") as f:
            for i, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError as e:
                    parse_errors.append((fname, i, str(e)))
                    continue
                if not isinstance(rec, dict):
                    parse_errors.append((fname, i, f"expected a JSON object, got {type(rec).__name__}"))
                    continue
                rec["_file"] = fname
                rec["_line"] = i
                all_records.append(rec)
                loaded += 1
                # Track text for duplicate detection
                all_texts[text_of(rec).strip()].append((fname, i))
        file_stats[fname] = loaded
    return all_records, all_texts, file_stats, parse_errors


def check_offsets(records, max_samples=MAX_OFFSET_SAMPLES):
    """Verify that every span's offsets slice out exactly the entity named in its key.

    Returns ``(samples, total, label_distribution)``. ``samples`` holds at most
    *max_samples* findings; ``total`` counts all of them. Malformed positions
    and negative offsets are counted as offset errors.
    """
    samples = []
    total = 0
    label_distribution = Counter()
    for rec in records:
        text = text_of(rec)
        for key, positions in spans_of(rec).items():
            label, sep, entity_text = key.partition(": ")
            if not sep:
                continue
            label_distribution[label] += 1
            valid, malformed = split_positions(positions)
            problems = [(None, None, f"<malformed position {pos!r}>") for pos in malformed]
            for start, end in valid:
                actual = text[start:end]
                # Negative offsets would slice from the end of the text and
                # could match by accident, so they are always an error.
                if start < 0 or end < 0 or actual != entity_text:
                    problems.append((start, end, actual))
            for start, end, actual in problems:
                total += 1
                if total <= max_samples:
                    samples.append({
                        "file": rec["_file"],
                        "line": rec["_line"],
                        "label": label,
                        "expected": entity_text,
                        "actual": actual,
                        "start": start,
                        "end": end,
                    })
    return samples, total, label_distribution


def find_duplicates(all_texts):
    """Return one entry per text that occurs more than once, flagging cross-file repeats."""
    duplicates = []
    for txt, locs in all_texts.items():
        if len(locs) > 1:
            files_involved = {f for f, _ in locs}
            duplicates.append({
                "count": len(locs),
                "files": sorted(files_involved),  # sorted for deterministic output
                "text_preview": txt[:100],
                "cross_file": len(files_involved) > 1,
            })
    return duplicates


def find_short_texts(records, min_length=20):
    """Return an entry for every record whose text is shorter than *min_length* characters."""
    return [
        {"file": rec["_file"], "line": rec["_line"], "text": txt, "length": len(txt)}
        for rec in records
        for txt in (text_of(rec),)
        if len(txt) < min_length
    ]


def mislabel_reasons(label, entity):
    """Return the list of reasons why *entity* looks wrongly labelled as *label* (may be empty)."""
    reasons = []
    ent_lower = entity.lower().strip()
    # IP labeled as non-INDICATOR
    if IP_RE.match(entity) and label not in ("INDICATOR", "IOC"):
        reasons.append(f"IP address labeled as {label}, expected INDICATOR")
    # CVE labeled wrong
    if CVE_RE.match(entity) and label not in ("VULNERABILITY", "CVE"):
        reasons.append(f"CVE ID labeled as {label}, expected VULNERABILITY")
    # URL/hash as non-indicator
    if (URL_RE.match(entity) or HASH_RE.match(entity)) and label not in ("INDICATOR", "IOC"):
        reasons.append(f"URL/hash labeled as {label}, expected INDICATOR")
    # Known org labeled as SYSTEM
    if label == "SYSTEM" and ent_lower in KNOWN_ORGS:
        reasons.append(f"Security vendor/org '{entity}' labeled as SYSTEM, expected ORGANIZATION")
    # Known system labeled as ORGANIZATION
    if label == "ORGANIZATION" and ent_lower in KNOWN_SYSTEMS:
        reasons.append(f"OS/platform '{entity}' labeled as ORGANIZATION, expected SYSTEM")
    return reasons


def check_mislabels(records, max_samples=MAX_MISLABEL_SAMPLES):
    """Flag entities whose surface form contradicts their label.

    Returns ``(samples, total, entity_counter)``; ``entity_counter`` counts, per
    ``"LABEL: entity"`` key, the number of records that contain it.
    """
    samples = []
    total = 0
    entity_counter = Counter()
    for rec in records:
        for key in spans_of(rec):
            label, sep, entity = key.partition(": ")
            if not sep:
                continue
            entity_counter[key] += 1
            for reason in mislabel_reasons(label, entity):
                total += 1
                if total <= max_samples:
                    samples.append({
                        "file": rec["_file"],
                        "line": rec["_line"],
                        "entity": entity,
                        "label": label,
                        "reason": reason,
                    })
    return samples, total, entity_counter


def find_overlaps(records, max_samples=MAX_OVERLAP_SAMPLES):
    """Find spans that overlap an earlier span of the same record.

    Spans are sorted by start; each one is compared with the span that reaches
    furthest among those already seen, so a long span is still detected as
    overlapping spans that are not its immediate neighbour in sort order.
    Returns ``(samples, total)``.
    """
    samples = []
    total = 0
    for rec in records:
        intervals = sorted(
            (start, end, key)
            for key, positions in spans_of(rec).items()
            for start, end in split_positions(positions)[0]
        )
        furthest = None  # span with the greatest end offset seen so far
        for start, end, key in intervals:
            if furthest is not None and start < furthest[1]:  # overlap
                total += 1
                if total <= max_samples:
                    s1, e1, k1 = furthest
                    samples.append({
                        "file": rec["_file"],
                        "line": rec["_line"],
                        "span1": f"{k1} [{s1}:{e1}]",
                        "span2": f"{key} [{start}:{end}]",
                    })
            if furthest is None or end > furthest[1]:
                furthest = (start, end, key)
    return samples, total


def find_garbage(records):
    """Return records whose text contains HTML tags or a high share of unexpected non-ASCII."""
    findings = []
    for rec in records:
        txt = text_of(rec)
        issues = []
        if HTML_RE.search(txt):
            issues.append("HTML tags")
        # Check for high non-ASCII ratio (encoding issues)
        non_ascii = sum(1 for c in txt if ord(c) > 127 and c not in ALLOWED_NON_ASCII)
        if non_ascii > len(txt) * 0.1 and len(txt) > 50:
            issues.append(f"high non-ASCII ratio ({non_ascii}/{len(txt)})")
        if issues:
            findings.append({
                "file": rec["_file"],
                "line": rec["_line"],
                "issues": issues,
                "text_preview": txt[:120],
            })
    return findings


def find_repetitive(entity_counter, top_n=100, min_count=50):
    """Return the entities among the *top_n* most common that occur at least *min_count* times."""
    return [
        {"entity": key, "count": count}
        for key, count in entity_counter.most_common(top_n)
        if count >= min_count
    ]


def find_empty_spans(records):
    """Return an entry for every record that has no spans."""
    return [
        {"file": rec["_file"], "line": rec["_line"], "text_preview": text_of(rec)[:80]}
        for rec in records
        if not spans_of(rec)
    ]


def run_audit(files):
    """Load *files* and run all checks.

    Returns ``(results, totals)``. ``results`` holds the (possibly capped)
    findings per check; ``totals`` holds the uncapped counts and the number of
    files and records.
    """
    files = list(files)
    print("Loading all files...")
    all_records, all_texts, file_stats, parse_errors = load_records(files)
    print(f"Loaded {len(all_records)} records from {len(files)} files")

    print("Checking offsets...")
    offset_errors, offset_err_count, label_distribution = check_offsets(all_records)
    print("Checking duplicates...")
    duplicate_texts = find_duplicates(all_texts)
    print("Checking short texts...")
    short_texts = find_short_texts(all_records)
    print("Checking mislabels...")
    mislabels, mislabel_count, entity_counter = check_mislabels(all_records)
    print("Checking overlapping spans...")
    overlapping_spans, overlap_count = find_overlaps(all_records)
    print("Checking garbage text...")
    garbage_text = find_garbage(all_records)
    print("Checking repetitive entities...")
    repetitive_entities = find_repetitive(entity_counter)
    print("Checking empty spans...")
    empty_spans = find_empty_spans(all_records)

    results = {
        "offset_errors": offset_errors,
        "duplicate_texts": duplicate_texts,
        "short_texts": short_texts,
        "mislabels": mislabels,
        "overlapping_spans": overlapping_spans,
        "garbage_text": garbage_text,
        "repetitive_entities": repetitive_entities,
        "empty_spans": empty_spans,
        "parse_errors": parse_errors,
        "label_distribution": label_distribution,
        "file_stats": file_stats,
        # Subset of duplicate_texts that spans more than one file.
        "cross_file_dupes": [d for d in duplicate_texts if d["cross_file"]],
    }
    totals = {
        "files": len(files),
        "records": len(all_records),
        "offset_errors": offset_err_count,
        "mislabels": mislabel_count,
        "overlapping_spans": overlap_count,
    }
    return results, totals


def print_report(results, totals):
    """Print the human-readable audit report for *results* / *totals* to stdout."""
    print("\n" + "=" * 70)
    print("DATA QUALITY AUDIT REPORT")
    print("=" * 70)

    print(f"\n## Files Audited: {totals['files']}")
    for fname, count in results["file_stats"].items():
        print(f" {fname}: {count} records")
    print(f" TOTAL: {totals['records']} records")

    print(f"\n## Parse Errors: {len(results['parse_errors'])}")
    for pe in results["parse_errors"][:10]:
        print(f" {pe}")

    print(f"\n## 1. Offset Errors: {totals['offset_errors']}")
    for e in results["offset_errors"][:30]:
        print(f" [{e['file']}:{e['line']}] {e['label']}: expected '{e['expected']}' got '{e['actual']}' at [{e['start']}:{e['end']}]")

    dupe_within = sum(1 for d in results["duplicate_texts"] if not d["cross_file"])
    dupe_cross = sum(1 for d in results["duplicate_texts"] if d["cross_file"])
    dupe_total_records = sum(d["count"] for d in results["duplicate_texts"])
    print(f"\n## 2. Duplicate Texts: {len(results['duplicate_texts'])} unique texts duplicated ({dupe_total_records} total records)")
    print(f" Within-file: {dupe_within}, Cross-file: {dupe_cross}")
    for d in sorted(results["duplicate_texts"], key=lambda x: -x["count"])[:20]:
        print(f" [{d['count']}x] {'CROSS-FILE ' if d['cross_file'] else ''}{d['files']}: {d['text_preview'][:80]}")

    print(f"\n## 3. Short Texts (<20 chars): {len(results['short_texts'])}")
    for s in results["short_texts"][:20]:
        print(f" [{s['file']}:{s['line']}] ({s['length']} chars) '{s['text']}'")

    print(f"\n## 4. Mislabels: {totals['mislabels']}")
    # Group by reason pattern
    reason_groups = Counter(m["reason"].split(",")[0][:60] for m in results["mislabels"])
    for reason, count in reason_groups.most_common(20):
        print(f" [{count}x] {reason}")
    print(" Sample issues:")
    for m in results["mislabels"][:20]:
        print(f" [{m['file']}:{m['line']}] {m['entity']} -> {m['label']}: {m['reason']}")

    print(f"\n## 5. Overlapping Spans: {totals['overlapping_spans']}")
    for o in results["overlapping_spans"][:20]:
        print(f" [{o['file']}:{o['line']}] {o['span1']} <-> {o['span2']}")

    print(f"\n## 6. Garbage Text: {len(results['garbage_text'])}")
    issue_types = Counter(
        iss.split("(")[0].strip()
        for g in results["garbage_text"]
        for iss in g["issues"]
    )
    for it, count in issue_types.most_common():
        print(f" {it}: {count} records")
    for g in results["garbage_text"][:15]:
        print(f" [{g['file']}:{g['line']}] {g['issues']}: {g['text_preview'][:80]}")

    print(f"\n## 7. Repetitive Entities (50+ occurrences): {len(results['repetitive_entities'])}")
    for r in results["repetitive_entities"][:30]:
        print(f" {r['entity']}: {r['count']}")

    print(f"\n## 8. Empty Spans: {len(results['empty_spans'])}")
    empty_by_file = Counter(e["file"] for e in results["empty_spans"])
    for fname, count in empty_by_file.most_common():
        print(f" {fname}: {count}")
    for e in results["empty_spans"][:10]:
        print(f" [{e['file']}:{e['line']}] {e['text_preview']}")

    print("\n## Label Distribution:")
    for label, count in results["label_distribution"].most_common():
        print(f" {label}: {count}")


def save_results(results, output_path):
    """Write *results* (without ``file_stats``) as indented JSON to *output_path*."""
    payload = {k: v for k, v in results.items() if k != "file_stats"}
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, default=str)


def main(files=None, output_path=OUTPUT_PATH):
    """Run the audit over *files* (default: ``FILES``), print the report and save the JSON."""
    files = list(FILES if files is None else files)
    if not files:
        # Keep going so an (empty) report is still produced, but make it visible.
        print("Warning: no input files to audit", file=sys.stderr)
    results, totals = run_audit(files)
    print_report(results, totals)
    # Save detailed JSON
    save_results(results, output_path)
    print(f"\nDetailed results saved to {output_path}")


if __name__ == "__main__":
    main()