#!/usr/bin/env python3 """Convert DNRTI cybersecurity NER dataset to Arcspan 5-class JSONL format. DNRTI uses BIO tagging with these entity types: Area, Exp, Features, HackOrg, Idus, OffAct, Org, Purp, SamFile, SecTeam, Time, Tool, Way Mapping to our 5 classes: Malware <- SamFile (malware samples), Tool (hacking tools/RATs) Indicator <- (none - DNRTI doesn't annotate IOCs) System <- Way (attack vectors often reference software/platforms) Organization <- HackOrg (APT groups), Org (organizations), SecTeam (security teams) Vulnerability<- Exp (exploits/CVEs) Dropped (no clean mapping): Area, Idus, Time, OffAct, Purp, Features """ import json import sys from collections import defaultdict from pathlib import Path DNRTI_DIR = Path(__file__).resolve().parent.parent / "data" / "raw" / "DNRTI" / "DNRTI_Dataset" OUTPUT = Path(__file__).resolve().parent.parent / "data" / "processed" / "dnrti_5class.jsonl" # DNRTI tag -> our 5-class label (None = skip) TAG_MAP = { "SamFile": "Malware", "Tool": "Malware", "HackOrg": "Organization", "Org": "Organization", "SecTeam": "Organization", "Exp": "Vulnerability", "Way": "System", # Dropped: "Area": None, "Idus": None, "Time": None, "OffAct": None, "Purp": None, "Features": None, } def parse_bio_file(path: Path) -> list[list[tuple[str, str]]]: """Parse BIO-tagged file into list of sentences, each a list of (token, tag).""" sentences = [] current = [] with open(path, encoding="utf-8") as f: for line in f: line = line.strip().replace("\r", "") if not line: if current: sentences.append(current) current = [] continue parts = line.split() if len(parts) >= 2: token = " ".join(parts[:-1]) # handle multi-word tokens (unlikely but safe) tag = parts[-1] current.append((token, tag)) else: # Single column = token with no tag? Skip. pass if current: sentences.append(current) return sentences def convert_sentence(tokens_tags: list[tuple[str, str]], idx: int, source: str) -> dict | None: """Convert a BIO-tagged sentence to our JSONL format. Returns None if the sentence is empty after reconstruction. """ # Reconstruct text with character offsets text_parts = [] offsets = [] # (start, end) for each token pos = 0 for token, _ in tokens_tags: start = pos text_parts.append(token) end = pos + len(token) offsets.append((start, end)) pos = end + 1 # space separator text = " ".join(text_parts) if not text.strip(): return None # Extract spans using BIO tags spans: dict[str, list[list[int]]] = defaultdict(list) i = 0 while i < len(tokens_tags): _, tag = tokens_tags[i] if tag.startswith("B-"): etype = tag[2:] label = TAG_MAP.get(etype) if label is not None: span_start = offsets[i][0] span_end = offsets[i][1] # Consume continuation tokens j = i + 1 while j < len(tokens_tags): _, next_tag = tokens_tags[j] if next_tag == f"I-{etype}": span_end = offsets[j][1] j += 1 else: break span_text = text[span_start:span_end] key = f"{label}: {span_text}" spans[key].append([span_start, span_end]) i = j continue i += 1 return { "text": text, "spans": dict(spans), "info": {"id": f"dnrti_{source}_{idx:06d}", "source": f"dnrti_{source}"}, } def main(): all_records = [] entity_counts: dict[str, int] = defaultdict(int) dropped_counts: dict[str, int] = defaultdict(int) file_stats = {} for split in ["train", "valid", "test"]: path = DNRTI_DIR / f"{split}.txt" if not path.exists(): print(f"Warning: {path} not found, skipping", file=sys.stderr) continue sentences = parse_bio_file(path) records = [] for i, sent in enumerate(sentences): rec = convert_sentence(sent, len(all_records) + len(records), split) if rec is not None: records.append(rec) for key in rec["spans"]: label = key.split(":")[0] entity_counts[label] += len(rec["spans"][key]) # Count dropped entities for sent in sentences: for _, tag in sent: if tag.startswith("B-"): etype = tag[2:] if TAG_MAP.get(etype) is None: dropped_counts[etype] += 1 file_stats[split] = {"sentences": len(sentences), "converted": len(records)} all_records.extend(records) # Write output OUTPUT.parent.mkdir(parents=True, exist_ok=True) with open(OUTPUT, "w") as f: for rec in all_records: f.write(json.dumps(rec, ensure_ascii=False) + "\n") # Stats with_entities = sum(1 for r in all_records if r["spans"]) print(f"\n=== DNRTI → 5-class Conversion ===") print(f"Output: {OUTPUT}") print(f"Total sentences: {sum(s['sentences'] for s in file_stats.values())}") print(f"Converted records: {len(all_records)}") print(f"Records with entities: {with_entities}") print(f"Records without entities (O-only): {len(all_records) - with_entities}") print(f"\nPer-split:") for split, stats in file_stats.items(): print(f" {split}: {stats['sentences']} sentences → {stats['converted']} records") print(f"\nEntity counts (mapped):") for label in sorted(entity_counts): print(f" {label}: {entity_counts[label]}") print(f" TOTAL: {sum(entity_counts.values())}") print(f"\nDropped entity types (no mapping):") for etype in sorted(dropped_counts): print(f" {etype}: {dropped_counts[etype]}") print(f" TOTAL dropped: {sum(dropped_counts.values())}") if __name__ == "__main__": main()