#!/usr/bin/env python3
"""Audit IOC coverage in training JSONL files.

Each input line is a JSON object with a ``"text"`` string and, optionally,
a ``"spans"`` mapping of label -> list of ``[start, end)`` character
offsets.  The script regex-scans every text for indicator-of-compromise
candidates and reports how many of them overlap at least one labeled span.

Usage:
    audit.py                 # audit the built-in default files
    audit.py a.jsonl b.jsonl # audit the given files instead
"""
import json
import re
import sys
from collections import defaultdict

# IOC patterns
PATTERNS = {
    "IPv4": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
    "MD5": re.compile(r'\b[a-fA-F0-9]{32}\b'),
    "SHA1": re.compile(r'\b[a-fA-F0-9]{40}\b'),
    "SHA256": re.compile(r'\b[a-fA-F0-9]{64}\b'),
    "URL": re.compile(r'https?://[^\s\)\]\"\'<>,;]+'),
    "Domain": re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|ru|cn|info|biz|xyz|top|cc|tk|pw|me|co|uk|de|fr|jp|br|in|us|gov|edu|mil)\b', re.IGNORECASE),
}

# Files audited when no paths are given on the command line.
DEFAULT_FILES = {
    "ORIGINAL": "/home/ubuntu/alkyline/data/processed/enriched_5class_train.jsonl",
    "CLEANED": "/home/ubuntu/alkyline/data/processed/enriched_5class_train_cleaned.jsonl",
}

# Maximum number of unlabeled matches kept per file as samples.
MAX_UNLABELED_SAMPLES = 30
# Characters of surrounding text kept on each side of a sampled match.
CONTEXT_CHARS = 30
# Maximum number of collected samples actually printed in the report.
REPORT_SAMPLE_LIMIT = 15


def get_covered_chars(spans):
    """Return set of character positions covered by any span.

    Args:
        spans: Mapping of label -> iterable of ``(start, end)`` pairs, end
            exclusive.  ``None`` or an empty mapping yields an empty set.

    Returns:
        Set of every integer position inside at least one span.
    """
    covered = set()
    if not spans:
        return covered
    for offsets in spans.values():
        for start, end in offsets:
            covered.update(range(start, end))
    return covered


def audit_file(path):
    """Scan one JSONL file and count labeled vs. unlabeled IOC matches.

    A regex match counts as "labeled" when at least one of its characters
    lies inside any span of the row, whatever that span's label is.

    Args:
        path: Path of a UTF-8 JSONL file.  Blank lines are skipped.

    Returns:
        Tuple ``(stats, ioc_counts, unlabeled_examples)``:
        ``stats`` holds ``total_examples`` and ``examples_with_iocs``;
        ``ioc_counts`` maps each IOC type that matched at least once to its
        ``found`` / ``labeled`` / ``unlabeled`` counters;
        ``unlabeled_examples`` lists up to ``MAX_UNLABELED_SAMPLES`` dicts
        with ``type``, ``match``, ``id`` and ``context`` keys.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a row has no ``"text"`` field.
    """
    stats = {"total_examples": 0, "examples_with_iocs": 0}
    ioc_counts = defaultdict(lambda: {"found": 0, "labeled": 0, "unlabeled": 0})
    unlabeled_examples = []

    with open(path, encoding="utf-8") as f:
        for line in f:
            # Tolerate blank separator lines and a trailing newline at EOF.
            if not line.strip():
                continue
            row = json.loads(line)
            text = row["text"]
            # `or {}` also covers an explicit JSON null for these fields.
            covered = get_covered_chars(row.get("spans") or {})
            example_id = (row.get("info") or {}).get("id", "?")
            stats["total_examples"] += 1

            has_ioc = False
            for ioc_type, pat in PATTERNS.items():
                # No extra "is this part of a longer hash" filtering is
                # needed: the \b anchors in the hash patterns already refuse
                # a match that touches another alphanumeric character, so an
                # MD5/SHA1-length run inside a longer hex string never matches.
                for m in pat.finditer(text):
                    has_ioc = True
                    counts = ioc_counts[ioc_type]
                    counts["found"] += 1

                    if not covered.isdisjoint(range(m.start(), m.end())):
                        counts["labeled"] += 1
                        continue

                    counts["unlabeled"] += 1
                    if len(unlabeled_examples) < MAX_UNLABELED_SAMPLES:
                        unlabeled_examples.append({
                            "type": ioc_type,
                            "match": m.group(),
                            "id": example_id,
                            "context": text[max(0, m.start() - CONTEXT_CHARS):m.end() + CONTEXT_CHARS],
                        })

            if has_ioc:
                stats["examples_with_iocs"] += 1

    return stats, dict(ioc_counts), unlabeled_examples


def format_coverage(labeled, found):
    """Return ``labeled / found`` as a percentage string, or ``"N/A"``.

    ``"N/A"`` is returned when ``found`` is zero, so callers never divide
    by zero for a file or IOC type without matches.
    """
    if not found:
        return "N/A"
    return f"{100 * labeled / found:.1f}%"


def print_report(label, stats, ioc_counts, examples):
    """Print the coverage table and sample unlabeled IOCs for one file.

    Args:
        label: Heading shown above the report.
        stats: ``stats`` dict as returned by :func:`audit_file`.
        ioc_counts: Per-type counters as returned by :func:`audit_file`.
        examples: Unlabeled samples as returned by :func:`audit_file`; at
            most ``REPORT_SAMPLE_LIMIT`` of them are printed.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print(f" {label}")
    print(rule)
    print(f"Total examples: {stats['total_examples']}")
    print(f"Examples with IOCs: {stats['examples_with_iocs']}")

    total_found = sum(v["found"] for v in ioc_counts.values())
    total_labeled = sum(v["labeled"] for v in ioc_counts.values())
    total_unlabeled = sum(v["unlabeled"] for v in ioc_counts.values())

    # Header padded to the same column widths as the rows below.
    print(f"\n{'IOC Type':15s} | Found | Labeled | Unlabeled | Coverage%")
    print("----------------|-------|---------|-----------|----------")
    empty = {"found": 0, "labeled": 0, "unlabeled": 0}
    for ioc_type in PATTERNS:
        c = ioc_counts.get(ioc_type, empty)
        pct = format_coverage(c["labeled"], c["found"])
        print(f"{ioc_type:15s} | {c['found']:5d} | {c['labeled']:7d} | {c['unlabeled']:9d} | {pct}")
    total_pct = format_coverage(total_labeled, total_found)
    print(f"{'TOTAL':15s} | {total_found:5d} | {total_labeled:7d} | {total_unlabeled:9d} | {total_pct}")

    if examples:
        print("\nSample unlabeled IOCs (up to 15):")
        for ex in examples[:REPORT_SAMPLE_LIMIT]:
            print(f" [{ex['type']}] {ex['match'][:60]}")
            print(f" id={ex['id']}, context: ...{ex['context'][:80]}...")


def main(argv=None):
    """Audit the given files (or the defaults) and print one report each.

    Args:
        argv: Optional list of JSONL paths.  ``None`` means "use
            ``sys.argv[1:]``"; when no path is supplied either way, the
            files in ``DEFAULT_FILES`` are audited.  Missing files are
            reported with a ``SKIP`` line and otherwise ignored.
    """
    paths = sys.argv[1:] if argv is None else list(argv)
    files = {p: p for p in paths} if paths else DEFAULT_FILES

    results = {}
    for label, path in files.items():
        try:
            results[label] = audit_file(path)
        except FileNotFoundError:
            print(f"SKIP {label}: {path} not found")

    for label, (stats, ioc_counts, examples) in results.items():
        print_report(label, stats, ioc_counts, examples)


if __name__ == "__main__":
    main()