#!/usr/bin/env python3
"""Convert CyberNER_harmonized CSV (BIO-tagged) to span-based JSONL for our 5-class label space.

Input: data/raw/CyberNER_harmonized/dataset/cyberner_combined_stix.csv
Output: data/processed/cyberner_harmonized_5class.jsonl

STIX_Tag -> 5-class mapping:
    Malware <- Malware, Malware-Analysis
    Indicator <- IPv4-Addr, Domain-Name, URL, Email-Addr, File, Indicator,
                 Network-Traffic, Observed-Data
    System <- Software, Tool, Infrastructure
    Organization <- Identity, Threat-Actor, Intrusion-Set, Campaign
    Vulnerability <- Vulnerability

Unmapped (dropped): Attack-Pattern, Course-of-Action, Location -> no good 5-class fit
"""

import csv
import json
import sys
from collections import Counter
from pathlib import Path

# STIX entity type (the part after "B-"/"I-") -> one of the five target labels.
STIX_TO_5CLASS = {
    # Malware
    "Malware": "Malware",
    "Malware-Analysis": "Malware",
    # Indicator
    "IPv4-Addr": "Indicator",
    "Domain-Name": "Indicator",
    "URL": "Indicator",
    "Email-Addr": "Indicator",
    "File": "Indicator",
    "Indicator": "Indicator",
    "Network-Traffic": "Indicator",
    "Observed-Data": "Indicator",
    # System
    "Software": "System",
    "Tool": "System",
    "Infrastructure": "System",
    # Organization
    "Identity": "Organization",
    "Threat-Actor": "Organization",
    "Intrusion-Set": "Organization",
    "Campaign": "Organization",
    # Vulnerability
    "Vulnerability": "Vulnerability",
}

# STIX types that are intentionally discarded; main() reports how often each occurs.
DROPPED_TYPES = {"Attack-Pattern", "Course-of-Action", "Location"}


def parse_csv(path):
    """Yield (sentence_id, word, stix_tag) tuples from the token-per-row CSV.

    The first row is treated as a header and skipped. Rows with fewer than
    five columns are ignored. Columns are read by position: 0 = word,
    2 = sentence id, 3 = STIX BIO tag (columns 1 and 4 are not used).

    Raises:
        ValueError: if a sentence-id cell cannot be parsed as an integer.
    """
    # Explicit UTF-8 so decoding does not depend on the platform locale.
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        # Skip the header. The default keeps an empty file from raising
        # StopIteration inside this generator (a RuntimeError under PEP 479).
        next(reader, None)
        for row in reader:
            if len(row) < 5:
                continue
            word, sid, stix_tag = row[0], row[2], row[3]
            yield int(sid), word, stix_tag


def bio_to_spans(words, tags):
    """Convert parallel word/tag lists to (text, spans) in 5-class space.

    Args:
        words: tokens of one sentence, in order.
        tags: BIO tags aligned with ``words`` (e.g. "B-Malware", "I-Malware", "O").

    Returns:
        A ``(text, spans)`` pair. ``text`` is the tokens joined by single
        spaces. ``spans`` is a list of ``{"start", "end", "label"}`` dicts
        with character offsets into ``text`` (end exclusive) and the mapped
        5-class label.

    A span starts at a "B-<type>" tag and extends over the immediately
    following "I-<type>" tags of the same type. Entities whose type is not in
    STIX_TO_5CLASS are skipped, and an "I-" tag that does not continue a
    span is ignored.
    """
    # Character offsets of every token inside the space-joined text.
    offsets = []
    cursor = 0
    for w in words:
        end = cursor + len(w)
        offsets.append((cursor, end))
        cursor = end + 1  # account for the single-space separator
    text = " ".join(words)

    spans = []
    n_tags = len(tags)
    i = 0
    while i < n_tags:
        tag = tags[i]
        if not tag.startswith("B-"):
            i += 1
            continue

        stix_type = tag[2:]
        label = STIX_TO_5CLASS.get(stix_type)
        if label is None:
            # Unmapped type: skip the B- token; its I- continuations fall
            # through the non-"B-" branch above on later iterations.
            i += 1
            continue

        span_start, span_end = offsets[i]
        continuation = f"I-{stix_type}"
        j = i + 1
        while j < n_tags and tags[j] == continuation:
            span_end = offsets[j][1]
            j += 1
        spans.append({"start": span_start, "end": span_end, "label": label})
        i = j

    return text, spans


def main(csv_path=None, out_path=None):
    """Convert the harmonized CSV to JSONL and print summary statistics.

    Args:
        csv_path: input CSV. Defaults to
            ``data/raw/CyberNER_harmonized/dataset/cyberner_combined_stix.csv``
            under the directory two levels above this file.
        out_path: output JSONL. Defaults to
            ``data/processed/cyberner_harmonized_5class.jsonl`` under the
            same base directory. Its parent directory is created if missing.

    One JSON object ``{"text": ..., "spans": [...]}`` is written per sentence,
    in ascending sentence-id order. Sentences whose text is blank are skipped.
    """
    base = Path(__file__).resolve().parent.parent
    if csv_path is None:
        csv_path = base / "data/raw/CyberNER_harmonized/dataset/cyberner_combined_stix.csv"
    if out_path is None:
        out_path = base / "data/processed/cyberner_harmonized_5class.jsonl"
    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Group by sentence
    sentences = {}
    for sid, word, stix_tag in parse_csv(csv_path):
        words, tags = sentences.setdefault(sid, ([], []))
        words.append(word)
        tags.append(stix_tag)

    entity_counts = Counter()
    dropped_counts = Counter()
    total_spans = 0
    examples_with_spans = 0
    total_examples = 0  # records actually written (blank sentences excluded)

    with open(out_path, "w", encoding="utf-8") as f:
        for sid in sorted(sentences):
            words, tags = sentences[sid]
            text, spans = bio_to_spans(words, tags)
            if not text.strip():
                continue

            # Count dropped entities: one per B- tag of an intentionally dropped type.
            for t in tags:
                if t.startswith("B-") and t[2:] in DROPPED_TYPES:
                    dropped_counts[t[2:]] += 1

            # Write
            f.write(json.dumps({"text": text, "spans": spans}) + "\n")
            total_examples += 1

            for s in spans:
                entity_counts[s["label"]] += 1
            total_spans += len(spans)
            if spans:
                examples_with_spans += 1

    print(f"Total examples: {total_examples}")
    print(f"Examples with ≥1 entity: {examples_with_spans}")
    print(f"Total entities: {total_spans}")
    print("\nEntities per class:")
    for label in ["Malware", "Indicator", "System", "Organization", "Vulnerability"]:
        print(f" {label:20s} {entity_counts[label]:>6d}")
    print("\nDropped (unmapped) entity types:")
    for t, c in dropped_counts.most_common():
        print(f" {t:20s} {c:>6d}")
    print(f"\nOutput: {out_path}")


if __name__ == "__main__":
    main()