| |
"""Convert CyberNER_harmonized CSV (BIO-tagged) to span-based JSONL for our 5-class label space.

Input:  data/raw/CyberNER_harmonized/dataset/cyberner_combined_stix.csv
Output: data/processed/cyberner_harmonized_5class.jsonl

STIX_Tag -> 5-class mapping:
    Malware       <- Malware, Malware-Analysis
    Indicator     <- IPv4-Addr, Domain-Name, URL, Email-Addr, File, Indicator,
                     Network-Traffic, Observed-Data
    System        <- Software, Tool, Infrastructure
    Organization  <- Identity, Threat-Actor, Intrusion-Set, Campaign
    Vulnerability <- Vulnerability

Unmapped (dropped):
    Attack-Pattern, Course-of-Action, Location -> no good 5-class fit
"""
|
|
| import csv |
| import json |
| import sys |
| from collections import Counter |
| from pathlib import Path |
|
|
# STIX entity type -> one of our five classes, written grouped by target
# class and flattened into a lookup table. A STIX type that is not a key here
# has no 5-class label.
STIX_TO_5CLASS = {
    stix_type: label
    for label, stix_types in (
        ("Malware", ("Malware", "Malware-Analysis")),
        (
            "Indicator",
            (
                "IPv4-Addr",
                "Domain-Name",
                "URL",
                "Email-Addr",
                "File",
                "Indicator",
                "Network-Traffic",
                "Observed-Data",
            ),
        ),
        ("System", ("Software", "Tool", "Infrastructure")),
        (
            "Organization",
            ("Identity", "Threat-Actor", "Intrusion-Set", "Campaign"),
        ),
        ("Vulnerability", ("Vulnerability",)),
    )
    for stix_type in stix_types
}


# STIX types deliberately left out of the mapping above (no good 5-class fit).
DROPPED_TYPES = {
    "Attack-Pattern",
    "Course-of-Action",
    "Location",
}
|
|
|
|
def parse_csv(path):
    """Yield ``(sentence_id, word, stix_tag)`` tuples from a token-level CSV.

    The first row is treated as a header and skipped. Every other row is read
    positionally as ``word, tag, sentence_id, stix_tag, source``; only the
    word, the sentence id (converted to ``int``) and the STIX tag are yielded.
    Rows with fewer than five columns are skipped.

    Args:
        path: Location of the CSV file.

    Yields:
        ``(sentence_id, word, stix_tag)`` for each usable row, in file order.

    Raises:
        ValueError: If a sentence-id cell cannot be converted with ``int()``.
    """
    # Decode explicitly instead of relying on the platform locale (assumes the
    # dataset is UTF-8 -- TODO confirm). newline="" lets the csv module handle
    # line endings inside quoted fields itself.
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        # A bare next() would raise StopIteration on an empty file, which
        # inside a generator surfaces as RuntimeError (PEP 479).
        next(reader, None)
        for row in reader:
            if len(row) < 5:
                continue
            word, _tag, sid, stix_tag, _source = row[:5]
            yield int(sid), word, stix_tag
|
|
|
|
def bio_to_spans(words, tags):
    """Convert parallel word/tag lists to ``(text, spans)`` in 5-class space.

    The text is the words joined by single spaces. A span starts at a ``B-X``
    tag, or at an ``I-X`` tag that has no open span to continue, and extends
    over the immediately following ``I-X`` tags of the same type ``X``. Span
    offsets are character positions in the joined text, with ``end``
    exclusive. Types without an entry in ``STIX_TO_5CLASS`` produce no span.

    Args:
        words: Tokens of one sentence.
        tags: One tag string per token, e.g. ``"B-Malware"``, ``"I-Malware"``.

    Returns:
        ``(text, spans)`` where ``spans`` is a list of
        ``{"start": int, "end": int, "label": str}`` dicts in text order.

    Raises:
        ValueError: If ``words`` and ``tags`` differ in length.
    """
    if len(words) != len(tags):
        raise ValueError(
            f"words and tags must be parallel: got {len(words)} words "
            f"and {len(tags)} tags"
        )

    # Character offsets of every word inside the space-joined text.
    offsets = []
    cursor = 0
    for word in words:
        offsets.append((cursor, cursor + len(word)))
        cursor += len(word) + 1  # +1 for the joining space

    text = " ".join(words)
    spans = []
    n_tags = len(tags)
    i = 0
    while i < n_tags:
        tag = tags[i]
        # Continuation tags of an open span are consumed by the inner loop
        # below, so an I- tag reaching this point is an orphan (no B- before
        # it, or a type switch). It opens a span rather than being silently
        # discarded, which would lose the entity.
        if not tag.startswith(("B-", "I-")):
            i += 1
            continue

        stix_type = tag[2:]
        label = STIX_TO_5CLASS.get(stix_type)
        if label is None:
            # Type has no 5-class equivalent: emit nothing for this token.
            i += 1
            continue

        continuation = f"I-{stix_type}"
        j = i + 1
        while j < n_tags and tags[j] == continuation:
            j += 1

        spans.append(
            {"start": offsets[i][0], "end": offsets[j - 1][1], "label": label}
        )
        i = j

    return text, spans
|
|
|
|
def main():
    """Convert the harmonized CSV to span-based JSONL and print a summary.

    Tokens from ``parse_csv`` are grouped into sentences by sentence id, each
    sentence is converted with ``bio_to_spans``, and one JSON object
    ``{"text": ..., "spans": [...]}`` is written per line, in ascending
    sentence-id order. Sentences whose joined text is blank are skipped.
    Afterwards the number of written examples, the entity counts per class and
    the counts of dropped (unmapped) entity types are printed.
    """
    base = Path(__file__).resolve().parent.parent
    csv_path = base / "data/raw/CyberNER_harmonized/dataset/cyberner_combined_stix.csv"
    out_path = base / "data/processed/cyberner_harmonized_5class.jsonl"

    # Group tokens into sentences.
    # NOTE(review): sentences are keyed by sentence id alone although the CSV
    # also has a source column; if ids repeat across sources, tokens of
    # different sentences would be merged here -- confirm ids are unique.
    sentences = {}
    for sid, word, stix_tag in parse_csv(csv_path):
        words, tags = sentences.setdefault(sid, ([], []))
        words.append(word)
        tags.append(stix_tag)

    entity_counts = Counter()
    dropped_counts = Counter()
    total_examples = 0
    examples_with_spans = 0

    # The processed-data directory may not exist on a fresh checkout.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for sid in sorted(sentences):
            words, tags = sentences[sid]
            text, spans = bio_to_spans(words, tags)
            if not text.strip():
                continue

            # Tally every entity whose type has no 5-class label, not only the
            # types anticipated in DROPPED_TYPES, so that no dropped entity
            # goes unreported.
            for t in tags:
                if t.startswith("B-"):
                    stype = t[2:]
                    if stype not in STIX_TO_5CLASS:
                        dropped_counts[stype] += 1

            f.write(json.dumps({"text": text, "spans": spans}) + "\n")
            entity_counts.update(s["label"] for s in spans)
            # Count only what was actually written, so the summary matches
            # the number of lines in the output file.
            total_examples += 1
            if spans:
                examples_with_spans += 1

    total_spans = sum(entity_counts.values())
    print(f"Total examples: {total_examples}")
    print(f"Examples with ≥1 entity: {examples_with_spans}")
    print(f"Total entities: {total_spans}")
    print("\nEntities per class:")
    for label in ["Malware", "Indicator", "System", "Organization", "Vulnerability"]:
        print(f" {label:20s} {entity_counts[label]:>6d}")
    print("\nDropped (unmapped) entity types:")
    for t, c in dropped_counts.most_common():
        # Flag types that were dropped without being listed as expected drops.
        note = "" if t in DROPPED_TYPES else "  (not in DROPPED_TYPES)"
        print(f" {t:20s} {c:>6d}{note}")
    print(f"\nOutput: {out_path}")


# Run the conversion only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|