# File size: 5,192 Bytes
# 3dac39e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python3
"""Audit IOC coverage in training JSONL files."""
import json, re, sys
from collections import defaultdict

# IOC patterns
# Regexes used to detect indicators of compromise in free text. audit_file()
# applies each pattern independently, so one substring can be counted under
# more than one type (e.g. the host part of a URL can also match "Domain").
PATTERNS = {
    # Four dot-separated groups of 1-3 digits; the 0-255 octet range is NOT checked.
    "IPv4": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
    # Hex runs of exactly 32 / 40 / 64 characters. The \b anchors reject runs
    # embedded in a longer alphanumeric string, so a SHA1 never matches as MD5.
    "MD5": re.compile(r'\b[a-fA-F0-9]{32}\b'),
    "SHA1": re.compile(r'\b[a-fA-F0-9]{40}\b'),
    "SHA256": re.compile(r'\b[a-fA-F0-9]{64}\b'),
    # http:// or https:// followed by everything up to whitespace or one of
    # ) ] " ' < > , ;
    "URL": re.compile(r'https?://[^\s\)\]\"\'<>,;]+'),
    # One or more dot-terminated labels (letters, digits, hyphen) followed by a
    # TLD from a fixed list; matched case-insensitively.
    "Domain": re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|ru|cn|info|biz|xyz|top|cc|tk|pw|me|co|uk|de|fr|jp|br|in|us|gov|edu|mil)\b', re.IGNORECASE),
}

def get_covered_chars(spans):
    """Collect every character index that lies inside at least one span.

    Args:
        spans: Mapping of label -> iterable of ``(start, end)`` offset pairs,
            with ``end`` exclusive. The labels themselves are ignored.

    Returns:
        A set of integer positions covered by any span (empty if there are
        no spans or all spans are empty).
    """
    span_ranges = [
        range(begin, stop)
        for offsets in spans.values()
        for begin, stop in offsets
    ]
    # set().union() with no arguments yields an empty set, so the
    # "no spans" case needs no special handling.
    return set().union(*span_ranges)

def audit_file(path, max_examples=30):
    """Scan a JSONL file and measure how many regex-detected IOCs are labeled.

    Every non-blank line must be a JSON object with a ``"text"`` string and,
    optionally, a ``"spans"`` mapping of label -> list of ``[start, end)``
    character offsets. Each match of each pattern in ``PATTERNS`` counts as
    "labeled" when it shares at least one character with any span, and as
    "unlabeled" otherwise.

    Args:
        path: Path of the JSONL file to audit (read as UTF-8).
        max_examples: Maximum number of unlabeled matches to keep as samples.

    Returns:
        A ``(stats, ioc_counts, unlabeled_examples)`` tuple where

        * ``stats`` has the keys ``"total_examples"`` and
          ``"examples_with_iocs"``;
        * ``ioc_counts`` maps each IOC type that matched at least once to a
          dict with ``"found"``, ``"labeled"`` and ``"unlabeled"`` counters;
        * ``unlabeled_examples`` is a list of dicts with the keys ``"type"``,
          ``"match"``, ``"id"`` and ``"context"`` (the match plus up to 30
          characters on either side).

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a row has no ``"text"`` field.
    """
    stats = {"total_examples": 0, "examples_with_iocs": 0}
    ioc_counts = defaultdict(lambda: {"found": 0, "labeled": 0, "unlabeled": 0})
    unlabeled_examples = []

    # JSON is UTF-8 by specification; do not depend on the platform locale,
    # otherwise span offsets may not line up with the decoded text.
    with open(path, encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            row = json.loads(line)
            text = row["text"]
            # "spans": null is treated the same as a missing key.
            covered = get_covered_chars(row.get("spans") or {})
            stats["total_examples"] += 1

            has_ioc = False
            for ioc_type, pat in PATTERNS.items():
                # No extra "is this part of a longer hash?" filtering is
                # needed: the \b anchors in the hash patterns already reject
                # hex runs that have an alphanumeric neighbour.
                for m in pat.finditer(text):
                    has_ioc = True
                    counts = ioc_counts[ioc_type]
                    counts["found"] += 1

                    # Labeled as soon as one character of the match is covered.
                    if not covered.isdisjoint(range(m.start(), m.end())):
                        counts["labeled"] += 1
                        continue

                    counts["unlabeled"] += 1
                    if len(unlabeled_examples) < max_examples:
                        unlabeled_examples.append({
                            "type": ioc_type,
                            "match": m.group(),
                            "id": (row.get("info") or {}).get("id", "?"),
                            "context": text[max(0, m.start() - 30):m.end() + 30],
                        })

            if has_ioc:
                stats["examples_with_iocs"] += 1

    return stats, dict(ioc_counts), unlabeled_examples

def _coverage_pct(labeled, found):
    """Return labeled/found as a one-decimal percentage string, or "N/A" if found is 0."""
    return f"{100 * labeled / found:.1f}" if found else "N/A"


def main():
    """Audit the configured training files and print a coverage report for each.

    Files that do not exist are reported with a ``SKIP`` line and left out of
    the report. For every audited file the report shows the example counts, a
    per-IOC-type table (found / labeled / unlabeled / coverage), a TOTAL row,
    and up to 15 sample unlabeled matches.
    """
    files = {
        "ORIGINAL": "/home/ubuntu/alkyline/data/processed/enriched_5class_train.jsonl",
        "CLEANED": "/home/ubuntu/alkyline/data/processed/enriched_5class_train_cleaned.jsonl",
    }

    results = {}
    for label, path in files.items():
        try:
            results[label] = audit_file(path)
        except FileNotFoundError:
            print(f"SKIP {label}: {path} not found")

    empty_counts = {"found": 0, "labeled": 0, "unlabeled": 0}
    for label, (stats, ioc_counts, examples) in results.items():
        print(f"\n{'='*60}")
        print(f"  {label}")
        print(f"{'='*60}")
        print(f"Total examples: {stats['total_examples']}")
        print(f"Examples with IOCs: {stats['examples_with_iocs']}")

        total_found = sum(v["found"] for v in ioc_counts.values())
        total_labeled = sum(v["labeled"] for v in ioc_counts.values())
        total_unlabeled = sum(v["unlabeled"] for v in ioc_counts.values())

        print("\nIOC Type        | Found | Labeled | Unlabeled | Coverage%")
        print("----------------|-------|---------|-----------|----------")
        # Fixed order so reports for different files line up row by row;
        # types with no matches are absent from ioc_counts and print as zeros.
        for ioc_type in ["IPv4", "MD5", "SHA1", "SHA256", "URL", "Domain"]:
            c = ioc_counts.get(ioc_type, empty_counts)
            pct = _coverage_pct(c["labeled"], c["found"])
            print(f"{ioc_type:15s} | {c['found']:5d} | {c['labeled']:7d} | {c['unlabeled']:9d} | {pct}%")
        # Guarded like the per-type rows: a file with no IOC matches at all
        # must not raise ZeroDivisionError here.
        total_pct = _coverage_pct(total_labeled, total_found)
        print(f"{'TOTAL':15s} | {total_found:5d} | {total_labeled:7d} | {total_unlabeled:9d} | {total_pct}%")

        if examples:
            print("\nSample unlabeled IOCs (up to 15):")
            for ex in examples[:15]:
                print(f"  [{ex['type']}] {ex['match'][:60]}")
                print(f"    id={ex['id']}, context: ...{ex['context'][:80]}...")

# Run the audit only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()