arcspan / scripts /convert_cyberner_harmonized.py
chairulridjal's picture
Add files using upload-large-folder tool
3dac39e verified
#!/usr/bin/env python3
"""Convert CyberNER_harmonized CSV (BIO-tagged) to span-based JSONL for our 5-class label space.

Input:  data/raw/CyberNER_harmonized/dataset/cyberner_combined_stix.csv
Output: data/processed/cyberner_harmonized_5class.jsonl

STIX_Tag -> 5-class mapping:
    Malware       <- Malware, Malware-Analysis
    Indicator     <- IPv4-Addr, Domain-Name, URL, Email-Addr, File, Indicator,
                     Network-Traffic, Observed-Data
    System        <- Software, Tool, Infrastructure
    Organization  <- Identity, Threat-Actor, Intrusion-Set, Campaign
    Vulnerability <- Vulnerability

Unmapped (dropped):
    Attack-Pattern, Course-of-Action, Location -> no good 5-class fit
"""
import csv
import json
import sys
from collections import Counter
from pathlib import Path
# Source STIX types grouped under the 5-class label each one collapses into.
# Group order and member order determine the key order of STIX_TO_5CLASS.
_FIVE_CLASS_MEMBERS = (
    ("Malware", ("Malware", "Malware-Analysis")),
    (
        "Indicator",
        (
            "IPv4-Addr",
            "Domain-Name",
            "URL",
            "Email-Addr",
            "File",
            "Indicator",
            "Network-Traffic",
            "Observed-Data",
        ),
    ),
    ("System", ("Software", "Tool", "Infrastructure")),
    ("Organization", ("Identity", "Threat-Actor", "Intrusion-Set", "Campaign")),
    ("Vulnerability", ("Vulnerability",)),
)

# Flat lookup table: STIX type -> 5-class label.
STIX_TO_5CLASS = {
    stix_type: label
    for label, stix_types in _FIVE_CLASS_MEMBERS
    for stix_type in stix_types
}

# STIX types that are deliberately left out of the 5-class label space.
DROPPED_TYPES = {"Attack-Pattern", "Course-of-Action", "Location"}
def parse_csv(path):
    """Yield (sentence_id, word, stix_tag) tuples from the harmonized CSV.

    The first row is treated as a header and skipped. Data rows are read
    positionally: column 0 is the word, column 2 the sentence id and column 3
    the STIX tag; columns 1 and 4 are ignored. Rows with fewer than five
    columns are skipped.

    Args:
        path: Location of the CSV file to read.

    Yields:
        ``(int(sentence_id), word, stix_tag)`` for every row with at least
        five columns.

    Raises:
        ValueError: If a sentence id cannot be converted with ``int``.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default locale encoding.
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        # The default keeps an empty file from raising StopIteration inside
        # this generator, which Python would surface as a RuntimeError.
        next(reader, None)  # skip header
        for row in reader:
            if len(row) < 5:
                continue
            word, sid, stix_tag = row[0], row[2], row[3]
            yield int(sid), word, stix_tag
def bio_to_spans(words, tags):
    """Turn parallel word/tag lists into ``(text, spans)`` in 5-class space.

    The words are joined with single spaces to form ``text``. Each span is a
    dict with ``start``/``end`` character offsets into ``text`` (``end`` is
    exclusive) and a ``label`` taken from ``STIX_TO_5CLASS``.

    A span opens at a ``B-<type>`` tag whose type has a 5-class label and
    extends over the directly following ``I-<type>`` tags of the same type.
    ``B-`` tags with an unmapped type produce no span, and neither do ``I-``
    tags that do not continue an open span.
    """
    tokens = []
    offsets = []  # (start, end) of each word inside the joined text
    cursor = 0
    for token in words:
        stop = cursor + len(token)
        offsets.append((cursor, stop))
        tokens.append(token)
        cursor = stop + 1  # step over the joining space
    text = " ".join(tokens)

    spans = []
    idx = 0
    while idx < len(tags):
        current = tags[idx]
        idx += 1
        if not current.startswith("B-"):
            continue
        stix_type = current[2:]
        label = STIX_TO_5CLASS.get(stix_type)
        if label is None:
            continue
        # idx already points past the B- tag, so the opening word is idx - 1.
        begin, finish = offsets[idx - 1]
        continuation = f"I-{stix_type}"
        while idx < len(tags) and tags[idx] == continuation:
            finish = offsets[idx][1]
            idx += 1
        spans.append({"start": begin, "end": finish, "label": label})
    return text, spans
def main():
    """Convert the raw CyberNER harmonized CSV into 5-class span JSONL.

    Reads the CSV under ``data/raw`` (resolved relative to the parent of this
    script's directory), groups its rows by sentence id, converts every
    sentence with ``bio_to_spans`` and writes one ``{"text", "spans"}`` JSON
    object per line to the file under ``data/processed``. Sentences whose
    text is blank are not written. Summary statistics are printed at the end.
    """
    base = Path(__file__).resolve().parent.parent
    csv_path = base / "data/raw/CyberNER_harmonized/dataset/cyberner_combined_stix.csv"
    out_path = base / "data/processed/cyberner_harmonized_5class.jsonl"

    # Group by sentence
    sentences = {}
    for sid, word, stix_tag in parse_csv(csv_path):
        words, tags = sentences.setdefault(sid, ([], []))
        words.append(word)
        tags.append(stix_tag)

    entity_counts = Counter()
    dropped_counts = Counter()
    total_examples = 0
    total_spans = 0
    examples_with_spans = 0

    # Create the output directory up front so a fresh checkout does not fail
    # with FileNotFoundError when the file is opened.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for sid in sorted(sentences):
            words, tags = sentences[sid]
            text, spans = bio_to_spans(words, tags)
            if not text.strip():
                continue

            # Count dropped. Only types listed in DROPPED_TYPES are tallied;
            # a type missing from both tables is still dropped by
            # bio_to_spans but is not reported here.
            for t in tags:
                if t.startswith("B-"):
                    stype = t[2:]
                    if stype in DROPPED_TYPES:
                        dropped_counts[stype] += 1

            # Write
            f.write(json.dumps({"text": text, "spans": spans}) + "\n")
            # Count written examples only, so the reported total matches the
            # number of lines in the output file.
            total_examples += 1
            for s in spans:
                entity_counts[s["label"]] += 1
            total_spans += len(spans)
            if spans:
                examples_with_spans += 1

    print(f"Total examples: {total_examples}")
    print(f"Examples with ≥1 entity: {examples_with_spans}")
    print(f"Total entities: {total_spans}")
    print("\nEntities per class:")
    for label in ["Malware", "Indicator", "System", "Organization", "Vulnerability"]:
        print(f" {label:20s} {entity_counts[label]:>6d}")
    print("\nDropped (unmapped) entity types:")
    for t, c in dropped_counts.most_common():
        print(f" {t:20s} {c:>6d}")
    print(f"\nOutput: {out_path}")


if __name__ == "__main__":
    main()