arcspan / scripts /audit_ioc_coverage.py
chairulridjal's picture
Add files using upload-large-folder tool
3dac39e verified
#!/usr/bin/env python3
"""Audit IOC coverage in training JSONL files."""
import json, re, sys
from collections import defaultdict
# IOC patterns: IOC type name -> compiled regex used to find candidates in text.
# The patterns are independent of each other, so one stretch of text can match
# several of them (e.g. the host part of a URL also matches "Domain").
PATTERNS = {
    # Dotted quad whose four octets are each limited to 0-255 (leading zeros
    # are accepted), so strings like "999.999.999.999" are not counted.
    "IPv4": re.compile(
        r'\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d?\d)\.){3}'
        r'(?:25[0-5]|2[0-4]\d|[01]?\d?\d)\b'
    ),
    # Hashes: a run of exactly 32/40/64 hex digits.  The \b anchors reject a
    # run that is embedded in a longer hex/alphanumeric string, so an MD5-sized
    # prefix of a SHA1 or SHA256 digest does not match.
    "MD5": re.compile(r'\b[a-fA-F0-9]{32}\b'),
    "SHA1": re.compile(r'\b[a-fA-F0-9]{40}\b'),
    "SHA256": re.compile(r'\b[a-fA-F0-9]{64}\b'),
    # http(s) URL, ending at whitespace or common closing punctuation.
    "URL": re.compile(r'https?://[^\s\)\]\"\'<>,;]+'),
    # Dotted host name ending in one of a fixed list of TLDs (case-insensitive).
    "Domain": re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|ru|cn|info|biz|xyz|top|cc|tk|pw|me|co|uk|de|fr|jp|br|in|us|gov|edu|mil)\b', re.IGNORECASE),
}
def get_covered_chars(spans):
    """Collect every character index that lies inside at least one span.

    ``spans`` maps a label to an iterable of ``(start, end)`` pairs; each pair
    contributes the half-open range ``start .. end - 1``.  Labels are ignored.
    """
    return {
        position
        for span_list in spans.values()
        for begin, stop in span_list
        for position in range(begin, stop)
    }
def audit_file(path, max_samples=30):
    """Count regex-detected IOCs in a JSONL file and how many overlap a span.

    Every non-blank line is parsed as a JSON object that must contain a
    ``"text"`` string.  ``"spans"`` (label -> list of ``[start, end)``
    character offsets) and ``"info"`` (read only for its ``"id"``) are
    optional and may be ``null``.  Each pattern in ``PATTERNS`` is run over
    the text; a match counts as *labeled* when at least one of its characters
    lies inside any span, regardless of the span's label.

    Args:
        path: Path of the JSONL file to audit (read as UTF-8).
        max_samples: Maximum number of unlabeled matches kept as samples.

    Returns:
        A ``(stats, ioc_counts, unlabeled_examples)`` tuple where ``stats``
        holds ``"total_examples"`` and ``"examples_with_iocs"``,
        ``ioc_counts`` maps each IOC type that matched at least once to its
        ``"found"`` / ``"labeled"`` / ``"unlabeled"`` counters, and
        ``unlabeled_examples`` is a list of dicts with ``"type"``,
        ``"match"``, ``"id"`` and ``"context"`` keys.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a row has no ``"text"`` field.
    """
    stats = {"total_examples": 0, "examples_with_iocs": 0}
    ioc_counts = defaultdict(lambda: {"found": 0, "labeled": 0, "unlabeled": 0})
    unlabeled_examples = []  # first `max_samples` unlabeled matches

    # JSONL is UTF-8 by convention; do not depend on the platform default.
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            row = json.loads(line)
            text = row["text"]
            # `or {}` also covers an explicit JSON null.
            covered = get_covered_chars(row.get("spans") or {})
            stats["total_examples"] += 1
            has_ioc = False

            # Patterns are applied independently, so the same stretch of text
            # can be counted under more than one IOC type.
            for ioc_type, pat in PATTERNS.items():
                for m in pat.finditer(text):
                    # No extra "part of a longer hex string" check is needed
                    # for hashes: the hash patterns are anchored with \b, so a
                    # match can never sit directly next to another hex digit.
                    start, end = m.span()
                    has_ioc = True
                    counts = ioc_counts[ioc_type]
                    counts["found"] += 1

                    # Labeled means any overlap with any span.
                    if not covered.isdisjoint(range(start, end)):
                        counts["labeled"] += 1
                        continue

                    counts["unlabeled"] += 1
                    if len(unlabeled_examples) < max_samples:
                        unlabeled_examples.append({
                            "type": ioc_type,
                            "match": m.group(),
                            "id": (row.get("info") or {}).get("id", "?"),
                            # Up to 30 characters of context on each side.
                            "context": text[max(0, start - 30):end + 30],
                        })

            if has_ioc:
                stats["examples_with_iocs"] += 1

    return stats, dict(ioc_counts), unlabeled_examples
def _format_coverage(labeled, found):
    """Return labeled/found as a percentage string, or "N/A" if found is 0."""
    if not found:
        return "N/A"
    return f"{100 * labeled / found:.1f}%"


def main():
    """Audit the hard-coded training files and print a coverage report.

    Each file is passed to ``audit_file``; a file that does not exist is
    reported with a ``SKIP`` line and left out of the report.  For every
    audited file the report shows example totals, a per-IOC-type table with a
    TOTAL row, and up to 15 sample unlabeled matches.
    """
    files = {
        "ORIGINAL": "/home/ubuntu/alkyline/data/processed/enriched_5class_train.jsonl",
        "CLEANED": "/home/ubuntu/alkyline/data/processed/enriched_5class_train_cleaned.jsonl",
    }

    results = {}
    for label, path in files.items():
        try:
            results[label] = audit_file(path)
        except FileNotFoundError:
            print(f"SKIP {label}: {path} not found")

    for label, (stats, ioc_counts, examples) in results.items():
        print(f"\n{'='*60}")
        print(f" {label}")
        print(f"{'='*60}")
        print(f"Total examples: {stats['total_examples']}")
        print(f"Examples with IOCs: {stats['examples_with_iocs']}")

        total_found = sum(v["found"] for v in ioc_counts.values())
        total_labeled = sum(v["labeled"] for v in ioc_counts.values())
        total_unlabeled = sum(v["unlabeled"] for v in ioc_counts.values())

        print("\nIOC Type | Found | Labeled | Unlabeled | Coverage%")
        print("----------------|-------|---------|-----------|----------")
        # Fixed row order; types that never matched are shown with zero counts.
        for ioc_type in ["IPv4", "MD5", "SHA1", "SHA256", "URL", "Domain"]:
            c = ioc_counts.get(ioc_type, {"found": 0, "labeled": 0, "unlabeled": 0})
            pct = _format_coverage(c["labeled"], c["found"])
            print(f"{ioc_type:15s} | {c['found']:5d} | {c['labeled']:7d} | {c['unlabeled']:9d} | {pct}")
        # A file without any IOC match has total_found == 0; the helper prints
        # "N/A" for that case instead of dividing by zero.
        total_pct = _format_coverage(total_labeled, total_found)
        print(f"{'TOTAL':15s} | {total_found:5d} | {total_labeled:7d} | {total_unlabeled:9d} | {total_pct}")

        if examples:
            print("\nSample unlabeled IOCs (up to 15):")
            for ex in examples[:15]:
                print(f" [{ex['type']}] {ex['match'][:60]}")
                print(f" id={ex['id']}, context: ...{ex['context'][:80]}...")


if __name__ == "__main__":
    main()