#!/usr/bin/env python3
"""Convert CyberNER_harmonized CSV (BIO-tagged) to span-based JSONL for our 5-class label space.
Input:  data/raw/CyberNER_harmonized/dataset/cyberner_combined_stix.csv
Output: data/processed/cyberner_harmonized_5class.jsonl

STIX_Tag -> 5-class mapping:
    Malware       <- Malware, Malware-Analysis
    Indicator     <- IPv4-Addr, Domain-Name, URL, Email-Addr, File, Indicator,
                     Network-Traffic, Observed-Data
    System        <- Software, Tool, Infrastructure
    Organization  <- Identity, Threat-Actor, Intrusion-Set, Campaign
    Vulnerability <- Vulnerability

Unmapped (dropped):
    Attack-Pattern, Course-of-Action, Location -> no good 5-class fit
"""
import csv
import json
import sys
from collections import Counter
from pathlib import Path
# STIX entity type -> 5-class label, built from one (label, STIX types) group
# per target class.  STIX types absent from this table have no 5-class label.
STIX_TO_5CLASS = {
    stix_type: label
    for label, stix_types in (
        ("Malware", ("Malware", "Malware-Analysis")),
        (
            "Indicator",
            (
                "IPv4-Addr",
                "Domain-Name",
                "URL",
                "Email-Addr",
                "File",
                "Indicator",
                "Network-Traffic",
                "Observed-Data",
            ),
        ),
        ("System", ("Software", "Tool", "Infrastructure")),
        ("Organization", ("Identity", "Threat-Actor", "Intrusion-Set", "Campaign")),
        ("Vulnerability", ("Vulnerability",)),
    )
    for stix_type in stix_types
}

# STIX types that are deliberately left out of the 5-class label space.
DROPPED_TYPES = {
    "Attack-Pattern",
    "Course-of-Action",
    "Location",
}
def parse_csv(path):
    """Yield ``(sentence_id, word, stix_tag)`` tuples from the harmonized CSV.

    The first row is treated as a header and skipped.  Every following row
    must have at least five columns, read positionally as
    ``word, tag, sentence_id, stix_tag, source``; shorter rows are skipped.
    An empty file yields nothing.

    Args:
        path: Location of the CSV file.

    Yields:
        ``(int(sentence_id), word, stix_tag)`` for each usable row, in file
        order.

    Raises:
        ValueError: If a sentence-id cell cannot be parsed as an integer.
    """
    # Decode explicitly so the result does not depend on the platform's
    # default encoding.
    # NOTE(review): assumes the dataset is UTF-8 -- confirm against the source.
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        # A bare next() on an empty file raises StopIteration inside this
        # generator, which Python converts into RuntimeError (PEP 479).
        if next(reader, None) is None:
            return
        for row in reader:
            if len(row) < 5:
                continue
            word, sid, stix_tag = row[0], row[2], row[3]
            yield int(sid), word, stix_tag
def bio_to_spans(words, tags):
    """Build the sentence text and its 5-class entity spans from BIO tags.

    Words are joined with single spaces.  Each ``B-<type>`` tag whose type is
    a key of ``STIX_TO_5CLASS`` opens a span, which is extended over the
    directly following ``I-<type>`` tags of the same type.  ``B-`` tags with an
    unmapped type, and every tag not consumed by a mapped span, produce
    nothing.

    Args:
        words: Tokens of one sentence.
        tags: Tag strings parallel to ``words``.

    Returns:
        ``(text, spans)``; each span is a dict with ``start`` and ``end``
        character offsets into ``text`` (end exclusive) and the mapped
        ``label``.
    """
    text = " ".join(words)

    # Character range occupied by every word inside ``text``.
    bounds = []
    cursor = 0
    for word in words:
        stop = cursor + len(word)
        bounds.append((cursor, stop))
        cursor = stop + 1  # step over the joining space

    spans = []
    n_tags = len(tags)
    pos = 0
    while pos < n_tags:
        current = tags[pos]
        if current.startswith("B-"):
            label = STIX_TO_5CLASS.get(current[2:])
        else:
            label = None
        if label is None:
            pos += 1
            continue

        # Swallow the run of matching continuation tags behind the opener.
        continuation = "I-" + current[2:]
        first, last = bounds[pos]
        pos += 1
        while pos < n_tags and tags[pos] == continuation:
            last = bounds[pos][1]
            pos += 1
        spans.append({"start": first, "end": last, "label": label})
    return text, spans
def main():
    """Convert the harmonized CSV to 5-class span JSONL and print a summary.

    Input and output paths are resolved relative to the parent of the
    directory that contains this script.  Rows are grouped by sentence id,
    each sentence is converted with ``bio_to_spans``, and one JSON object with
    the keys ``text`` and ``spans`` is written per sentence in ascending
    sentence-id order.  Sentences whose text is blank are skipped and are not
    included in any of the printed counts.
    """
    base = Path(__file__).resolve().parent.parent
    csv_path = base / "data/raw/CyberNER_harmonized/dataset/cyberner_combined_stix.csv"
    out_path = base / "data/processed/cyberner_harmonized_5class.jsonl"

    # Group words and tags by sentence id, keeping row order within a sentence.
    sentences = {}
    for sid, word, stix_tag in parse_csv(csv_path):
        words, tags = sentences.setdefault(sid, ([], []))
        words.append(word)
        tags.append(stix_tag)

    entity_counts = Counter()
    dropped_counts = Counter()
    total_examples = 0
    total_spans = 0
    examples_with_spans = 0

    # The output directory may not exist yet; create it instead of failing
    # with FileNotFoundError after the whole input has been parsed.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for sid in sorted(sentences):
            words, tags = sentences[sid]
            text, spans = bio_to_spans(words, tags)
            if not text.strip():
                continue

            # Tally entities whose STIX type is deliberately dropped.
            dropped_counts.update(
                t[2:] for t in tags if t.startswith("B-") and t[2:] in DROPPED_TYPES
            )

            f.write(json.dumps({"text": text, "spans": spans}) + "\n")
            # Count only records actually written, so the reported total
            # matches the number of lines in the output file.
            total_examples += 1
            entity_counts.update(s["label"] for s in spans)
            total_spans += len(spans)
            if spans:
                examples_with_spans += 1

    print(f"Total examples: {total_examples}")
    print(f"Examples with ≥1 entity: {examples_with_spans}")
    print(f"Total entities: {total_spans}")
    print("\nEntities per class:")
    for label in ["Malware", "Indicator", "System", "Organization", "Vulnerability"]:
        print(f" {label:20s} {entity_counts[label]:>6d}")
    print("\nDropped (unmapped) entity types:")
    for t, c in dropped_counts.most_common():
        print(f" {t:20s} {c:>6d}")
    print(f"\nOutput: {out_path}")


if __name__ == "__main__":
    main()
|