#!/usr/bin/env python3
"""Comprehensive data quality audit for LLM-annotated cybersecurity NER data."""
import json, os, re, sys
from collections import Counter, defaultdict
from pathlib import Path
# Input location: annotated files first, then generated files, each group
# sorted by path.
DATA_DIR = Path("/home/ubuntu/alkyline/data/processed")
FILES = [
    jsonl_path
    for pattern in ("llm_annotated_*.jsonl", "llm_generated_*.jsonl")
    for jsonl_path in sorted(DATA_DIR.glob(pattern))
]

# Known security vendors/orgs that should NOT be SYSTEM (lower-case names).
KNOWN_ORGS = {
    "amazon",
    "avast",
    "bitdefender",
    "carbon black",
    "check point",
    "checkpoint",
    "cisa",
    "cisco talos",
    "crowdstrike",
    "cylance",
    "eset",
    "f-secure",
    "facebook",
    "fbi",
    "fireeye",
    "fortinet",
    "google",
    "ibm",
    "kaspersky",
    "malwarebytes",
    "mandiant",
    "mcafee",
    "meta",
    "microsoft",
    "nortonlifelock",
    "nsa",
    "palo alto",
    "proofpoint",
    "qualys",
    "rapid7",
    "recorded future",
    "sentinelone",
    "sophos",
    "symantec",
    "tenable",
    "trellix",
    "trend micro",
    "unit 42",
    "webroot",
    "zscaler",
}

# Patterns for entity type validation. All are anchored at the start of the
# entity string; IP, CVE and hash patterns are anchored at the end as well.
IP_RE = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
CVE_RE = re.compile(r"^CVE-\d{4}-\d+$", re.IGNORECASE)
URL_RE = re.compile(r"^https?://", re.IGNORECASE)
HASH_RE = re.compile(r"^[a-f0-9]{32,64}$", re.IGNORECASE)

# Known operating systems / platforms (lower-case names).
KNOWN_SYSTEMS = {
    "android",
    "centos",
    "chrome os",
    "debian",
    "fedora",
    "freebsd",
    "ios",
    "linux",
    "mac os",
    "macos",
    "red hat",
    "solaris",
    "ubuntu",
    "unix",
    "windows",
    "windows 10",
    "windows 11",
    "windows 7",
    "windows server",
}
# Findings keyed by check name. Insertion order is kept exactly as before
# because it is the key order of the dictionary that later gets dumped.
results = {
    check_name: []
    for check_name in (
        "offset_errors",
        "duplicate_texts",
        "short_texts",
        "mislabels",
        "overlapping_spans",
        "garbage_text",
        "repetitive_entities",
        "empty_spans",
        "parse_errors",
    )
}
results["label_distribution"] = Counter()
results["file_stats"] = {}
results["cross_file_dupes"] = []

# Accumulators filled while loading and checking.
all_texts = dict()  # text -> [(file, line_num)]
entity_counter = Counter()  # "LABEL: entity" -> count
all_records = list()
# Load every JSONL file into all_records, tagging each record with its source
# file and 1-based line number, and index texts for duplicate detection.
print("Loading all files...")
for fpath in FILES:
    fname = fpath.name
    records = []
    # Decode explicitly as UTF-8 so the audit does not depend on the
    # platform's default locale encoding.
    with open(fpath, encoding="utf-8") as f:
        for i, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                # Blank lines are skipped; enumerate still counts them, so
                # reported line numbers match the file.
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError as e:
                results["parse_errors"].append((fname, i, str(e)))
                continue
            if not isinstance(rec, dict):
                # Valid JSON that is not an object (a list, string, number...)
                # cannot carry "_file"/"_line" or be queried with .get();
                # report it instead of crashing the whole audit.
                results["parse_errors"].append(
                    (fname, i, f"expected a JSON object, got {type(rec).__name__}")
                )
                continue
            rec["_file"] = fname
            rec["_line"] = i
            records.append(rec)
            # Track text for duplicate detection (whitespace-trimmed key).
            key = rec.get("text", "").strip()
            all_texts.setdefault(key, []).append((fname, i))
    results["file_stats"][fname] = len(records)
    all_records.extend(records)
print(f"Loaded {len(all_records)} records from {len(FILES)} files")
# === CHECK 1: Offset errors ===
# For every "LABEL: entity" span key, slice the record text at each recorded
# [start, end) pair and compare the slice with the entity text from the key.
print("Checking offsets...")
offset_err_count = 0
for rec in all_records:
    body = rec.get("text", "")
    for span_key, offsets in rec.get("spans", {}).items():
        label, separator, entity_text = span_key.partition(": ")
        if not separator:
            # Keys without the "LABEL: entity" shape are ignored here.
            continue
        results["label_distribution"][label] += 1
        for start, end in offsets:
            actual = body[start:end]
            if actual == entity_text:
                continue
            offset_err_count += 1
            if offset_err_count > 200:
                # Keep counting, but only the first 200 are stored in detail.
                continue
            results["offset_errors"].append({
                "file": rec["_file"], "line": rec["_line"],
                "label": label, "expected": entity_text,
                "actual": actual, "start": start, "end": end,
            })
# === CHECK 2: Duplicate texts ===
# A text is a duplicate when its trimmed form was seen at more than one
# (file, line) location during loading.
print("Checking duplicates...")
for txt, locs in all_texts.items():
    if len(locs) < 2:
        continue
    # Sort the distinct file names: iterating a set of strings directly gives
    # an order that can change between runs, which made the saved results and
    # the printed report non-reproducible.
    files_involved = sorted({fname for fname, _ in locs})
    results["duplicate_texts"].append({
        "count": len(locs),
        "files": files_involved,
        "text_preview": txt[:100],
        "cross_file": len(files_involved) > 1,
    })
# === CHECK 3: Short texts ===
# Flag records whose text is shorter than 20 characters.
print("Checking short texts...")
for rec in all_records:
    body = rec.get("text", "")
    n_chars = len(body)
    if n_chars >= 20:
        continue
    results["short_texts"].append(
        dict(file=rec["_file"], line=rec["_line"], text=body, length=n_chars)
    )
# === CHECK 4: Mislabels ===
# Apply pattern- and dictionary-based rules to every "LABEL: entity" span key.
# Also tallies each key in entity_counter, which the repetitive-entity check
# reads afterwards.
print("Checking mislabels...")
mislabel_count = 0
for rec in all_records:
    for span_key in rec.get("spans", {}):
        label, separator, entity = span_key.partition(": ")
        if not separator:
            continue
        normalized = entity.lower().strip()
        entity_counter[span_key] += 1
        is_indicator_label = label in ("INDICATOR", "IOC")
        # (rule fired?, reason) pairs, listed in the order they are reported.
        rules = [
            (
                # IP labeled as non-INDICATOR
                IP_RE.match(entity) and not is_indicator_label,
                f"IP address labeled as {label}, expected INDICATOR",
            ),
            (
                # CVE labeled wrong
                CVE_RE.match(entity) and label not in ("VULNERABILITY", "CVE"),
                f"CVE ID labeled as {label}, expected VULNERABILITY",
            ),
            (
                # URL/hash as non-indicator
                (URL_RE.match(entity) or HASH_RE.match(entity))
                and not is_indicator_label,
                f"URL/hash labeled as {label}, expected INDICATOR",
            ),
            (
                # Known org labeled as SYSTEM
                label == "SYSTEM" and normalized in KNOWN_ORGS,
                f"Security vendor/org '{entity}' labeled as SYSTEM, expected ORGANIZATION",
            ),
            (
                # Known system labeled as ORGANIZATION
                label == "ORGANIZATION" and normalized in KNOWN_SYSTEMS,
                f"OS/platform '{entity}' labeled as ORGANIZATION, expected SYSTEM",
            ),
        ]
        for fired, reason in rules:
            if not fired:
                continue
            mislabel_count += 1
            if mislabel_count > 200:
                # The total keeps growing; only the first 200 are stored.
                continue
            results["mislabels"].append({
                "file": rec["_file"], "line": rec["_line"],
                "entity": entity, "label": label,
                "reason": reason,
            })
# === CHECK 5: Overlapping spans ===
# Within each record, collect every (start, end, key) interval, sort them, and
# report each interval that starts before an earlier interval has ended.
print("Checking overlapping spans...")
overlap_count = 0
for rec in all_records:
    spans = rec.get("spans", {})
    all_intervals = sorted(
        (start, end, key)
        for key, positions in spans.items()
        for start, end in positions
    )
    # Compare each interval with the earlier interval that reaches furthest to
    # the right, not merely with its immediate predecessor. Checking adjacent
    # pairs only misses cases such as (0, 10), (2, 3), (5, 8), where the third
    # span lies inside the first but does not touch the second.
    furthest = None
    for current in all_intervals:
        s2, e2, k2 = current
        if furthest is not None:
            s1, e1, k1 = furthest
            if s2 < e1:  # overlap
                overlap_count += 1
                if overlap_count <= 100:
                    results["overlapping_spans"].append({
                        "file": rec["_file"], "line": rec["_line"],
                        "span1": f"{k1} [{s1}:{e1}]",
                        "span2": f"{k2} [{s2}:{e2}]",
                    })
        if furthest is None or e2 > furthest[1]:
            furthest = current
# === CHECK 6: Garbage text ===
# Flag texts that contain HTML tags or an unusually high share of non-ASCII
# characters (a hint of encoding problems).
print("Checking garbage text...")
HTML_RE = re.compile(r'<[a-z/][^>]*>', re.I)
# NOTE(review): MARKDOWN_RE is compiled but no check below uses it.
MARKDOWN_RE = re.compile(r'(?:^|\n)#{1,6}\s|^\s*[\*\-]\s|\[.*?\]\(.*?\)|\*\*.*?\*\*')
# Typographic characters that are legitimate in prose and must not count
# towards the non-ASCII ratio. The curly quotes are written as escapes: in the
# previous literal they appeared as plain ASCII ' and " characters, which can
# never satisfy ord(c) > 127, so real curly quotes were counted as garbage.
_BENIGN_NON_ASCII = frozenset("–—\u2018\u2019\u201c\u201d•…©®™°×÷±€£¥¢")
for rec in all_records:
    txt = rec.get("text", "")
    issues = []
    if HTML_RE.search(txt):
        issues.append("HTML tags")
    # Check for high non-ASCII ratio (encoding issues)
    non_ascii = sum(1 for c in txt if ord(c) > 127 and c not in _BENIGN_NON_ASCII)
    # Only texts longer than 50 characters are judged by ratio.
    if non_ascii > len(txt) * 0.1 and len(txt) > 50:
        issues.append(f"high non-ASCII ratio ({non_ascii}/{len(txt)})")
    if issues:
        results["garbage_text"].append({
            "file": rec["_file"], "line": rec["_line"],
            "issues": issues,
            "text_preview": txt[:120],
        })
# === CHECK 7: Repetitive entities ===
# Among the 100 most frequent "LABEL: entity" keys, keep those seen 50+ times.
print("Checking repetitive entities...")
results["repetitive_entities"].extend(
    {"entity": span_key, "count": occurrences}
    for span_key, occurrences in entity_counter.most_common(100)
    if occurrences >= 50
)
# === CHECK 8: Empty spans ===
# Flag records whose "spans" value is missing or falsy.
print("Checking empty spans...")
for rec in all_records:
    if rec.get("spans", {}):
        continue
    results["empty_spans"].append(
        dict(
            file=rec["_file"],
            line=rec["_line"],
            text_preview=rec.get("text", "")[:80],
        )
    )
# === REPORT ===
# Banner, per-file record counts, parse errors and offset errors.
banner = "=" * 70
print("\n" + banner)
print("DATA QUALITY AUDIT REPORT")
print(banner)

print(f"\n## Files Audited: {len(FILES)}")
for file_name, n_records in results["file_stats"].items():
    print(f" {file_name}: {n_records} records")
print(f" TOTAL: {len(all_records)} records")

parse_errors = results["parse_errors"]
print(f"\n## Parse Errors: {len(parse_errors)}")
for parse_error in parse_errors[:10]:
    print(f" {parse_error}")

# The headline number is the full count; only the first 30 stored samples
# are listed.
print(f"\n## 1. Offset Errors: {offset_err_count}")
for err in results["offset_errors"][:30]:
    location = f"{err['file']}:{err['line']}"
    span = f"[{err['start']}:{err['end']}]"
    print(f" [{location}] {err['label']}: expected '{err['expected']}' got '{err['actual']}' at {span}")
# Report sections 2-5: duplicates, short texts, mislabels, overlapping spans.
duplicates = results["duplicate_texts"]
dupe_cross = len([dup for dup in duplicates if dup["cross_file"]])
dupe_within = len(duplicates) - dupe_cross
dupe_total_records = sum(dup["count"] for dup in duplicates)
print(f"\n## 2. Duplicate Texts: {len(duplicates)} unique texts duplicated ({dupe_total_records} total records)")
print(f" Within-file: {dupe_within}, Cross-file: {dupe_cross}")
# Most-duplicated first; a stable descending sort keeps ties in stored order.
most_duplicated = sorted(duplicates, key=lambda dup: dup["count"], reverse=True)
for dup in most_duplicated[:20]:
    scope = "CROSS-FILE " if dup["cross_file"] else ""
    print(f" [{dup['count']}x] {scope}{dup['files']}: {dup['text_preview'][:80]}")

short_texts = results["short_texts"]
print(f"\n## 3. Short Texts (<20 chars): {len(short_texts)}")
for short in short_texts[:20]:
    print(f" [{short['file']}:{short['line']}] ({short['length']} chars) '{short['text']}'")

mislabels = results["mislabels"]
print(f"\n## 4. Mislabels: {mislabel_count}")
# Group by reason pattern: the text before the first comma, cut to 60 chars.
reason_groups = Counter(item["reason"].split(",")[0][:60] for item in mislabels)
for reason, n_hits in reason_groups.most_common(20):
    print(f" [{n_hits}x] {reason}")
print(" Sample issues:")
for item in mislabels[:20]:
    print(f" [{item['file']}:{item['line']}] {item['entity']} -> {item['label']}: {item['reason']}")

print(f"\n## 5. Overlapping Spans: {overlap_count}")
for pair in results["overlapping_spans"][:20]:
    print(f" [{pair['file']}:{pair['line']}] {pair['span1']} <-> {pair['span2']}")
# Report sections 6-8 and the label distribution.
garbage = results["garbage_text"]
print(f"\n## 6. Garbage Text: {len(garbage)}")
# Tally issue kinds, dropping the parenthesised detail (e.g. the ratio).
issue_types = Counter(
    issue.split("(")[0].strip()
    for entry in garbage
    for issue in entry["issues"]
)
for issue_type, n_records in issue_types.most_common():
    print(f" {issue_type}: {n_records} records")
for entry in garbage[:15]:
    print(f" [{entry['file']}:{entry['line']}] {entry['issues']}: {entry['text_preview'][:80]}")

repetitive = results["repetitive_entities"]
print(f"\n## 7. Repetitive Entities (50+ occurrences): {len(repetitive)}")
for item in repetitive[:30]:
    print(f" {item['entity']}: {item['count']}")

empty = results["empty_spans"]
print(f"\n## 8. Empty Spans: {len(empty)}")
empty_by_file = Counter()
for entry in empty:
    empty_by_file[entry["file"]] += 1
for file_name, n_empty in empty_by_file.most_common():
    print(f" {file_name}: {n_empty}")
for entry in empty[:10]:
    print(f" [{entry['file']}:{entry['line']}] {entry['text_preview']}")

print("\n## Label Distribution:")
for label_name, n_spans in results["label_distribution"].most_common():
    print(f" {label_name}: {n_spans}")
# Save detailed JSON
# Everything in results except "file_stats" is written out. default=str turns
# any value the json module cannot encode natively into its string form.
serializable = {name: value for name, value in results.items() if name != "file_stats"}
# Explicit encoding so the output file does not depend on the locale default.
with open("/home/ubuntu/alkyline/scripts/audit_results.json", "w", encoding="utf-8") as f:
    json.dump(serializable, f, indent=2, default=str)
print("\nDetailed results saved to scripts/audit_results.json")