| |
| """Audit IOC coverage in training JSONL files.""" |
| import json, re, sys |
| from collections import defaultdict |
|
|
| |
# One IPv4 octet, restricted to 0-255 (leading zeros are tolerated). A plain
# \d{1,3} would also accept impossible values such as "999", which would
# inflate the IPv4 "found" counts with tokens that cannot be addresses.
_IPV4_OCTET = r'(?:25[0-5]|2[0-4]\d|[01]?\d?\d)'

# Regexes used to locate candidate indicators of compromise (IOCs) in text.
# The address and hash patterns are anchored with \b on both sides, so a
# shorter pattern can never fire inside a longer alphanumeric token (for
# example, MD5 cannot match a 32-character slice of a SHA-256 digest).
PATTERNS = {
    "IPv4": re.compile(r'\b(?:' + _IPV4_OCTET + r'\.){3}' + _IPV4_OCTET + r'\b'),
    "MD5": re.compile(r'\b[a-fA-F0-9]{32}\b'),
    "SHA1": re.compile(r'\b[a-fA-F0-9]{40}\b'),
    "SHA256": re.compile(r'\b[a-fA-F0-9]{64}\b'),
    # A URL runs until whitespace or a character that typically closes the
    # surrounding prose or markup (bracket, quote, angle bracket, comma, ...).
    "URL": re.compile(r'https?://[^\s\)\]\"\'<>,;]+'),
    # Dotted labels ending in one of a fixed list of TLDs, case-insensitive.
    "Domain": re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|ru|cn|info|biz|xyz|top|cc|tk|pw|me|co|uk|de|fr|jp|br|in|us|gov|edu|mil)\b', re.IGNORECASE),
}
|
|
def get_covered_chars(spans):
    """Return the set of character positions covered by any labeled span.

    Args:
        spans: Mapping of label -> iterable of ``(start, end)`` offset pairs,
            with ``end`` exclusive. ``None`` or an empty mapping means that
            nothing is labeled; a label whose offsets are ``None`` is skipped.

    Returns:
        A set of integer positions that fall inside at least one span.
    """
    if not spans:
        return set()

    covered = set()
    # Labels do not matter for coverage; only the offsets are used.
    for offsets in spans.values():
        for start, end in offsets or ():
            covered.update(range(start, end))
    return covered
|
|
def audit_file(path):
    """Count regex-detected IOCs in a JSONL file and how many are labeled.

    Every non-blank line of ``path`` is parsed as a JSON object. Its
    ``"text"`` field is scanned with each pattern in ``PATTERNS``. A match
    counts as "labeled" when at least one of its characters lies inside any
    span of the row's ``"spans"`` mapping, and as "unlabeled" otherwise.

    Args:
        path: Path of the JSONL file to audit; it is read as UTF-8.

    Returns:
        A ``(stats, ioc_counts, unlabeled_examples)`` tuple where

        * ``stats`` has the keys ``"total_examples"`` and
          ``"examples_with_iocs"``;
        * ``ioc_counts`` maps each IOC type that matched at least once to a
          dict with ``"found"``, ``"labeled"`` and ``"unlabeled"`` counts;
        * ``unlabeled_examples`` holds at most 30 dicts (``"type"``,
          ``"match"``, ``"id"``, ``"context"``) describing unlabeled matches.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a row has no ``"text"`` field.
    """
    max_examples = 30
    context_chars = 30

    stats = {"total_examples": 0, "examples_with_iocs": 0}
    ioc_counts = defaultdict(lambda: {"found": 0, "labeled": 0, "unlabeled": 0})
    unlabeled_examples = []

    # An explicit encoding keeps the result independent of the platform's
    # default locale encoding.
    with open(path, encoding="utf-8") as f:
        for line in f:
            # Blank lines are not rows; json.loads would reject them.
            if not line.strip():
                continue

            row = json.loads(line)
            text = row["text"]
            # "spans" may be absent or null; both mean that nothing is labeled.
            covered = get_covered_chars(row.get("spans") or {})
            stats["total_examples"] += 1

            has_ioc = False
            for ioc_type, pat in PATTERNS.items():
                # The hash patterns are anchored with \b on both sides, so a
                # match can never be directly preceded or followed by another
                # alphanumeric character; no extra neighbour check is needed.
                for m in pat.finditer(text):
                    has_ioc = True
                    counts = ioc_counts[ioc_type]
                    counts["found"] += 1

                    # Any overlap with a labeled span counts as labeled.
                    if not covered.isdisjoint(range(m.start(), m.end())):
                        counts["labeled"] += 1
                        continue

                    counts["unlabeled"] += 1
                    if len(unlabeled_examples) < max_examples:
                        ctx_start = max(0, m.start() - context_chars)
                        unlabeled_examples.append({
                            "type": ioc_type,
                            "match": m.group(),
                            "id": (row.get("info") or {}).get("id", "?"),
                            "context": text[ctx_start:m.end() + context_chars],
                        })

            if has_ioc:
                stats["examples_with_iocs"] += 1

    return stats, dict(ioc_counts), unlabeled_examples
|
|
def _coverage_pct(labeled, found):
    """Format ``labeled / found`` as a percentage, or ``N/A`` if ``found`` is 0."""
    return f"{100 * labeled / found:.1f}%" if found else "N/A"


def _print_report(label, stats, ioc_counts, examples):
    """Print the coverage table and sample unlabeled IOCs for one audited file."""
    print(f"\n{'='*60}")
    print(f" {label}")
    print(f"{'='*60}")
    print(f"Total examples: {stats['total_examples']}")
    print(f"Examples with IOCs: {stats['examples_with_iocs']}")

    total_found = sum(v["found"] for v in ioc_counts.values())
    total_labeled = sum(v["labeled"] for v in ioc_counts.values())
    total_unlabeled = sum(v["unlabeled"] for v in ioc_counts.values())

    # The header uses the same column widths as the data rows so they line up.
    print(f"\n{'IOC Type':15s} | {'Found':5s} | {'Labeled':7s} | {'Unlabeled':9s} | Coverage%")
    print("----------------|-------|---------|-----------|----------")
    empty = {"found": 0, "labeled": 0, "unlabeled": 0}
    for ioc_type in ["IPv4", "MD5", "SHA1", "SHA256", "URL", "Domain"]:
        c = ioc_counts.get(ioc_type, empty)
        pct = _coverage_pct(c["labeled"], c["found"])
        print(f"{ioc_type:15s} | {c['found']:5d} | {c['labeled']:7d} | {c['unlabeled']:9d} | {pct}")
    # A file with no IOCs at all has total_found == 0; report N/A instead of
    # dividing by zero.
    total_pct = _coverage_pct(total_labeled, total_found)
    print(f"{'TOTAL':15s} | {total_found:5d} | {total_labeled:7d} | {total_unlabeled:9d} | {total_pct}")

    if examples:
        print("\nSample unlabeled IOCs (up to 15):")
        for ex in examples[:15]:
            print(f" [{ex['type']}] {ex['match'][:60]}")
            print(f" id={ex['id']}, context: ...{ex['context'][:80]}...")


def main(files=None):
    """Audit each JSONL file and print an IOC coverage report for it.

    Args:
        files: Optional mapping of report label -> JSONL path. When omitted,
            the original and cleaned 5-class training files are audited.

    Files that do not exist are reported with a ``SKIP`` line and left out of
    the report.
    """
    if files is None:
        files = {
            "ORIGINAL": "/home/ubuntu/alkyline/data/processed/enriched_5class_train.jsonl",
            "CLEANED": "/home/ubuntu/alkyline/data/processed/enriched_5class_train_cleaned.jsonl",
        }

    results = {}
    for label, path in files.items():
        try:
            results[label] = audit_file(path)
        except FileNotFoundError:
            print(f"SKIP {label}: {path} not found")

    for label, (stats, ioc_counts, examples) in results.items():
        _print_report(label, stats, ioc_counts, examples)


if __name__ == "__main__":
    main()
|
|