"""Comprehensive data quality audit for LLM-annotated cybersecurity NER data."""
|
|
| import json, os, re, sys |
| from collections import Counter, defaultdict |
| from pathlib import Path |
|
|
# Directory that holds the processed JSONL inputs for the audit.
DATA_DIR = Path("/home/ubuntu/alkyline/data/processed")

# Annotated files come first, then generated ones; each group is sorted
# by path on its own, so the two groups are never interleaved.
FILES = [
    path
    for pattern in ("llm_annotated_*.jsonl", "llm_generated_*.jsonl")
    for path in sorted(DATA_DIR.glob(pattern))
]
|
|
| |
# Lower-cased names of security vendors, tech companies and agencies.
# The mislabel check compares lower-cased, stripped entity text against
# this set to spot organizations that were tagged as SYSTEM.
KNOWN_ORGS = {
    "amazon",
    "avast",
    "bitdefender",
    "carbon black",
    "check point",
    "checkpoint",
    "cisa",
    "cisco talos",
    "crowdstrike",
    "cylance",
    "eset",
    "f-secure",
    "facebook",
    "fbi",
    "fireeye",
    "fortinet",
    "google",
    "ibm",
    "kaspersky",
    "malwarebytes",
    "mandiant",
    "mcafee",
    "meta",
    "microsoft",
    "nortonlifelock",
    "nsa",
    "palo alto",
    "proofpoint",
    "qualys",
    "rapid7",
    "recorded future",
    "sentinelone",
    "sophos",
    "symantec",
    "tenable",
    "trellix",
    "trend micro",
    "unit 42",
    "webroot",
    "zscaler",
}
|
|
| |
# Shape-only matchers for entity strings that should carry a specific label.
# Each one is anchored at the start, so it must match from the first character.

# Dotted quad of 1-3 digit groups; octet ranges are not validated.
IP_RE = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", re.IGNORECASE & 0)
# CVE identifier such as CVE-2021-44228 (case-insensitive).
CVE_RE = re.compile(r"^CVE-\d{4}-\d+$", re.IGNORECASE)
# Anything that starts with http:// or https:// (case-insensitive).
URL_RE = re.compile(r"^https?://", re.IGNORECASE)
# 32 to 64 hex characters, any length in between (case-insensitive).
HASH_RE = re.compile(r"^[a-f0-9]{32,64}$", re.IGNORECASE)
|
|
| |
# Lower-cased operating-system / platform names. The mislabel check uses
# this set to spot platforms that were tagged as ORGANIZATION.
KNOWN_SYSTEMS = {
    "android",
    "centos",
    "chrome os",
    "debian",
    "fedora",
    "freebsd",
    "ios",
    "linux",
    "mac os",
    "macos",
    "red hat",
    "solaris",
    "ubuntu",
    "unix",
    "windows",
    "windows 10",
    "windows 11",
    "windows 7",
    "windows server",
}
|
|
# Audit findings, keyed by check name. Insertion order is kept on purpose:
# it is the key order of the JSON dump written at the end of the script.
results = {
    finding: []
    for finding in (
        "offset_errors",
        "duplicate_texts",
        "short_texts",
        "mislabels",
        "overlapping_spans",
        "garbage_text",
        "repetitive_entities",
        "empty_spans",
        "parse_errors",
    )
}
results["label_distribution"] = Counter()  # label -> number of span keys
results["file_stats"] = {}  # file name -> number of loaded records
results["cross_file_dupes"] = []

# Stripped text -> list of (file name, line number) where it was seen.
all_texts = {}
# Span key ("LABEL: entity") -> number of records that contain it.
entity_counter = Counter()
# Every successfully parsed record, across all input files.
all_records = []
|
|
# Load every input file line by line. Each parsed record is tagged with
# its origin ("_file", "_line") so later checks can point back at it, and
# its stripped text is indexed in all_texts for duplicate detection.
print("Loading all files...")
for fpath in FILES:
    fname = fpath.name
    records = []
    # Decode explicitly as UTF-8 instead of relying on the locale default,
    # which differs between machines and would silently corrupt text.
    with open(fpath, encoding="utf-8") as f:
        for i, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError as e:
                results["parse_errors"].append((fname, i, str(e)))
                continue
            if not isinstance(rec, dict):
                # Valid JSON but not an object (e.g. a list or a bare
                # string): report it rather than crash on item assignment.
                results["parse_errors"].append(
                    (fname, i, f"expected a JSON object, got {type(rec).__name__}")
                )
                continue
            rec["_file"] = fname
            rec["_line"] = i
            records.append(rec)

            # Index by whitespace-stripped text so that copies differing
            # only in surrounding whitespace count as duplicates.
            key = rec.get("text", "").strip()
            all_texts.setdefault(key, []).append((fname, i))

    results["file_stats"][fname] = len(records)
    all_records.extend(records)

print(f"Loaded {len(all_records)} records from {len(FILES)} files")
|
|
| |
# Offset check: each span key has the form "LABEL: entity text" and maps
# to a list of (start, end) pairs. Slicing the record text with a pair
# must reproduce the entity text exactly; anything else is an offset error.
print("Checking offsets...")
offset_err_count = 0
for rec in all_records:
    text = rec.get("text", "")
    # `or {}` also covers an explicit `"spans": null`, which .get() would
    # otherwise hand back as None and crash on .items().
    spans = rec.get("spans") or {}
    for key, positions in spans.items():
        if ": " not in key:
            continue
        label, entity_text = key.split(": ", 1)
        # Counted once per span key, not once per occurrence.
        results["label_distribution"][label] += 1
        for start, end in positions or ():
            actual = text[start:end]
            if actual == entity_text:
                continue
            offset_err_count += 1
            # Keep only the first 200 as samples; the counter holds the total.
            if offset_err_count <= 200:
                results["offset_errors"].append({
                    "file": rec["_file"],
                    "line": rec["_line"],
                    "label": label,
                    "expected": entity_text,
                    "actual": actual,
                    "start": start,
                    "end": end,
                })
|
|
| |
# Duplicate check: any stripped text seen at more than one (file, line)
# location is reported once, with the set of files it appears in.
# NOTE(review): results["cross_file_dupes"] is not filled in here; the
# cross-file status lives on each entry's "cross_file" flag instead.
print("Checking duplicates...")
for txt, locs in all_texts.items():
    if len(locs) < 2:
        continue
    files_involved = {file_name for file_name, _ in locs}
    results["duplicate_texts"].append({
        "count": len(locs),
        # Sorted so the report and the JSON dump are reproducible; plain
        # set iteration order for strings changes from run to run.
        "files": sorted(files_involved),
        "text_preview": txt[:100],
        "cross_file": len(files_involved) > 1,
    })
|
|
| |
# Short-text check: collect every record whose text is under 20 characters.
print("Checking short texts...")
results["short_texts"].extend(
    {
        "file": rec["_file"],
        "line": rec["_line"],
        "text": body,
        "length": len(body),
    }
    for rec in all_records
    for body in (rec.get("text", ""),)
    if len(body) < 20
)
|
|
| |
# Mislabel check: flag span keys ("LABEL: entity") whose entity text has a
# recognisable shape (IP, CVE id, URL, hash) or is a known organization /
# platform name, but carries a label that contradicts it. The same pass
# also feeds entity_counter, used later by the repetitive-entity check.
print("Checking mislabels...")
# Only this many findings are kept as samples; mislabel_count is the total.
MISLABEL_SAMPLE_LIMIT = 200
INDICATOR_LABELS = ("INDICATOR", "IOC")
VULNERABILITY_LABELS = ("VULNERABILITY", "CVE")
mislabel_count = 0
for rec in all_records:
    # `or {}` also covers an explicit `"spans": null`.
    spans = rec.get("spans") or {}
    for key in spans:
        if ": " not in key:
            continue
        label, entity = key.split(": ", 1)
        ent_lower = entity.lower().strip()

        # Counted once per record that contains this key.
        entity_counter[key] += 1

        # Collect every rule the entity violates, in a fixed rule order;
        # one entity can trigger more than one rule.
        reasons = []
        if IP_RE.match(entity) and label not in INDICATOR_LABELS:
            reasons.append(f"IP address labeled as {label}, expected INDICATOR")
        if CVE_RE.match(entity) and label not in VULNERABILITY_LABELS:
            reasons.append(f"CVE ID labeled as {label}, expected VULNERABILITY")
        if (URL_RE.match(entity) or HASH_RE.match(entity)) and label not in INDICATOR_LABELS:
            reasons.append(f"URL/hash labeled as {label}, expected INDICATOR")
        if label == "SYSTEM" and ent_lower in KNOWN_ORGS:
            reasons.append(
                f"Security vendor/org '{entity}' labeled as SYSTEM, expected ORGANIZATION"
            )
        if label == "ORGANIZATION" and ent_lower in KNOWN_SYSTEMS:
            reasons.append(
                f"OS/platform '{entity}' labeled as ORGANIZATION, expected SYSTEM"
            )

        for reason in reasons:
            mislabel_count += 1
            if mislabel_count <= MISLABEL_SAMPLE_LIMIT:
                results["mislabels"].append({
                    "file": rec["_file"],
                    "line": rec["_line"],
                    "entity": entity,
                    "label": label,
                    "reason": reason,
                })
|
|
| |
# Overlap check: within one record, no two spans (under any key) should
# cover the same characters. Spans are half-open [start, end).
print("Checking overlapping spans...")
overlap_count = 0
for rec in all_records:
    # `or {}` also covers an explicit `"spans": null`.
    spans = rec.get("spans") or {}
    all_intervals = sorted(
        (start, end, key)
        for key, positions in spans.items()
        for start, end in positions or ()
    )
    if not all_intervals:
        continue
    # Compare each span against the earlier span that reaches furthest to
    # the right, not just its sort-neighbour. Otherwise a long span that
    # contains several later ones, e.g. [0:10], [2:3], [4:5], would only be
    # reported against the first of them.
    furthest = all_intervals[0]
    for current in all_intervals[1:]:
        s1, e1, k1 = furthest
        s2, e2, k2 = current
        if s2 < e1:
            overlap_count += 1
            # Keep only the first 100 as samples; the counter holds the total.
            if overlap_count <= 100:
                results["overlapping_spans"].append({
                    "file": rec["_file"],
                    "line": rec["_line"],
                    "span1": f"{k1} [{s1}:{e1}]",
                    "span2": f"{k2} [{s2}:{e2}]",
                })
        if e2 > e1:
            furthest = current
|
|
| |
# Garbage-text check: flag records whose text contains HTML tags or an
# unusually high share of unexpected non-ASCII characters.
print("Checking garbage text...")
HTML_RE = re.compile(r'<[a-z/][^>]*>', re.I)
# NOTE(review): MARKDOWN_RE is compiled but not applied by this check.
MARKDOWN_RE = re.compile(r'(?:^|\n)#{1,6}\s|^\s*[\*\-]\s|\[.*?\]\(.*?\)|\*\*.*?\*\*')
# Typographic characters that are normal in prose and must not count as
# garbage. The curly quotes are written as escapes: the previous allow-list
# held plain ASCII quotes in their place, which could never match behind
# the `ord(c) > 127` guard, so curly quotes were counted as garbage.
ALLOWED_NON_ASCII = frozenset("–—\u2018\u2019\u201c\u201d•…©®™°×÷±€£¥¢")
for rec in all_records:
    txt = rec.get("text", "")
    issues = []
    if HTML_RE.search(txt):
        issues.append("HTML tags")

    non_ascii = sum(1 for c in txt if ord(c) > 127 and c not in ALLOWED_NON_ASCII)
    # Only texts longer than 50 chars are judged, and only when more than
    # 10% of their characters are unexpected non-ASCII.
    if non_ascii > len(txt) * 0.1 and len(txt) > 50:
        issues.append(f"high non-ASCII ratio ({non_ascii}/{len(txt)})")
    if issues:
        results["garbage_text"].append({
            "file": rec["_file"],
            "line": rec["_line"],
            "issues": issues,
            "text_preview": txt[:120],
        })
|
|
| |
# Repetitive-entity check: of the 100 most frequent span keys, report the
# ones that were counted at least 50 times.
print("Checking repetitive entities...")
results["repetitive_entities"].extend(
    {"entity": span_key, "count": seen}
    for span_key, seen in entity_counter.most_common(100)
    if seen >= 50
)
|
|
| |
# Empty-span check: collect records whose "spans" value is missing or falsy.
print("Checking empty spans...")
results["empty_spans"].extend(
    {
        "file": rec["_file"],
        "line": rec["_line"],
        "text_preview": rec.get("text", "")[:80],
    }
    for rec in all_records
    if not rec.get("spans", {})
)
|
|
| |
# Console report: one section per check, each with a headline count
# followed by a capped list of samples.
banner = "=" * 70
print("\n" + banner)
print("DATA QUALITY AUDIT REPORT")
print(banner)

# Files and record totals.
print(f"\n## Files Audited: {len(FILES)}")
for file_name, n_records in results["file_stats"].items():
    print(f" {file_name}: {n_records} records")
print(f" TOTAL: {len(all_records)} records")

# Lines that could not be parsed (first 10).
parse_errors = results["parse_errors"]
print(f"\n## Parse Errors: {len(parse_errors)}")
for parse_error in parse_errors[:10]:
    print(f" {parse_error}")

# 1. Offset errors (first 30 samples).
print(f"\n## 1. Offset Errors: {offset_err_count}")
for err in results["offset_errors"][:30]:
    print(f" [{err['file']}:{err['line']}] {err['label']}: expected '{err['expected']}' got '{err['actual']}' at [{err['start']}:{err['end']}]")

# 2. Duplicates, largest groups first (top 20).
duplicates = results["duplicate_texts"]
dupe_cross = sum(1 for dup in duplicates if dup["cross_file"])
dupe_within = len(duplicates) - dupe_cross
dupe_total_records = sum(dup["count"] for dup in duplicates)
print(f"\n## 2. Duplicate Texts: {len(duplicates)} unique texts duplicated ({dupe_total_records} total records)")
print(f" Within-file: {dupe_within}, Cross-file: {dupe_cross}")
# A stable descending sort keeps ties in their original order.
for dup in sorted(duplicates, key=lambda entry: entry["count"], reverse=True)[:20]:
    cross_tag = "CROSS-FILE " if dup["cross_file"] else ""
    print(f" [{dup['count']}x] {cross_tag}{dup['files']}: {dup['text_preview'][:80]}")

# 3. Short texts (first 20).
short_texts = results["short_texts"]
print(f"\n## 3. Short Texts (<20 chars): {len(short_texts)}")
for short in short_texts[:20]:
    print(f" [{short['file']}:{short['line']}] ({short['length']} chars) '{short['text']}'")

# 4. Mislabels: the headline is the full count; the grouping and samples
# below are drawn from the stored (capped) sample list.
mislabels = results["mislabels"]
print(f"\n## 4. Mislabels: {mislabel_count}")
reason_groups = Counter(
    finding["reason"].split(",")[0][:60] for finding in mislabels
)
for reason, n_hits in reason_groups.most_common(20):
    print(f" [{n_hits}x] {reason}")
print(" Sample issues:")
for finding in mislabels[:20]:
    print(f" [{finding['file']}:{finding['line']}] {finding['entity']} -> {finding['label']}: {finding['reason']}")

# 5. Overlapping spans (first 20 samples).
print(f"\n## 5. Overlapping Spans: {overlap_count}")
for overlap in results["overlapping_spans"][:20]:
    print(f" [{overlap['file']}:{overlap['line']}] {overlap['span1']} <-> {overlap['span2']}")

# 6. Garbage text, summarised by issue type (the part before any "(").
garbage = results["garbage_text"]
print(f"\n## 6. Garbage Text: {len(garbage)}")
issue_types = Counter(
    issue.split("(")[0].strip()
    for entry in garbage
    for issue in entry["issues"]
)
for issue_type, n_hits in issue_types.most_common():
    print(f" {issue_type}: {n_hits} records")
for entry in garbage[:15]:
    print(f" [{entry['file']}:{entry['line']}] {entry['issues']}: {entry['text_preview'][:80]}")

# 7. Repetitive entities (first 30).
repetitive = results["repetitive_entities"]
print(f"\n## 7. Repetitive Entities (50+ occurrences): {len(repetitive)}")
for repeated in repetitive[:30]:
    print(f" {repeated['entity']}: {repeated['count']}")

# 8. Records without spans, per file, then the first 10 samples.
empty_spans = results["empty_spans"]
print(f"\n## 8. Empty Spans: {len(empty_spans)}")
empty_by_file = Counter(entry["file"] for entry in empty_spans)
for file_name, n_hits in empty_by_file.most_common():
    print(f" {file_name}: {n_hits}")
for entry in empty_spans[:10]:
    print(f" [{entry['file']}:{entry['line']}] {entry['text_preview']}")

# Label totals, most frequent first.
print("\n## Label Distribution:")
for label_name, n_hits in results["label_distribution"].most_common():
    print(f" {label_name}: {n_hits}")
|
|
| |
# Persist the findings as JSON. "file_stats" is left out of the dump, and
# default=str stringifies any value json cannot serialise natively.
exportable = {
    section: payload
    for section, payload in results.items()
    if section != "file_stats"
}
with open("/home/ubuntu/alkyline/scripts/audit_results.json", "w") as out_file:
    json.dump(exportable, out_file, indent=2, default=str)
print("\nDetailed results saved to scripts/audit_results.json")
|
|