"""Convert DNRTI cybersecurity NER dataset to Arcspan 5-class JSONL format.

DNRTI uses BIO tagging with these entity types:
    Area, Exp, Features, HackOrg, Idus, OffAct, Org, Purp, SamFile, SecTeam, Time, Tool, Way

Mapping to our 5 classes:
    Malware       <- SamFile (malware samples), Tool (hacking tools/RATs)
    Indicator     <- (none - DNRTI doesn't annotate IOCs)
    System        <- Way (attack vectors often reference software/platforms)
    Organization  <- HackOrg (APT groups), Org (organizations), SecTeam (security teams)
    Vulnerability <- Exp (exploits/CVEs)

Dropped (no clean mapping): Area, Idus, Time, OffAct, Purp, Features
"""
|
|
| import json |
| import sys |
| from collections import defaultdict |
| from pathlib import Path |
|
|
# Both locations are resolved relative to the directory two levels above this
# file, so the script works regardless of the current working directory.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent

# Directory holding the raw DNRTI split files (train/valid/test .txt).
DNRTI_DIR = _PROJECT_ROOT.joinpath("data", "raw", "DNRTI", "DNRTI_Dataset")

# Destination of the combined, converted JSONL output.
OUTPUT = _PROJECT_ROOT.joinpath("data", "processed", "dnrti_5class.jsonl")
|
|
| |
| TAG_MAP = { |
| "SamFile": "Malware", |
| "Tool": "Malware", |
| "HackOrg": "Organization", |
| "Org": "Organization", |
| "SecTeam": "Organization", |
| "Exp": "Vulnerability", |
| "Way": "System", |
| |
| "Area": None, |
| "Idus": None, |
| "Time": None, |
| "OffAct": None, |
| "Purp": None, |
| "Features": None, |
| } |
|
|
|
|
def parse_bio_file(path: Path) -> list[list[tuple[str, str]]]:
    """Read a BIO-tagged file and group its ``(token, tag)`` pairs by sentence.

    Each non-blank line is split on whitespace: the last field is the tag and
    everything before it (re-joined with single spaces) is the token. Blank
    lines separate sentences. A line holding a single field has no tag and is
    skipped.

    Args:
        path: Location of the UTF-8 encoded BIO file.

    Returns:
        One list per sentence, each containing ``(token, tag)`` tuples in file
        order. Empty sentences are never emitted.
    """
    result: list[list[tuple[str, str]]] = []
    pending: list[tuple[str, str]] = []

    with open(path, encoding="utf-8") as handle:
        for raw in handle:
            cleaned = raw.strip().replace("\r", "")
            if cleaned:
                # `cleaned` starts and ends with a non-space character, so
                # split() always yields at least one field here.
                *words, label = cleaned.split()
                if words:
                    pending.append((" ".join(words), label))
                continue
            # Blank line: close the sentence collected so far, if any.
            if pending:
                result.append(pending)
                pending = []

    # The file may end without a trailing blank line.
    if pending:
        result.append(pending)
    return result
|
|
|
|
| def convert_sentence(tokens_tags: list[tuple[str, str]], idx: int, source: str) -> dict | None: |
| """Convert a BIO-tagged sentence to our JSONL format. |
| |
| Returns None if the sentence is empty after reconstruction. |
| """ |
| |
| text_parts = [] |
| offsets = [] |
| pos = 0 |
| for token, _ in tokens_tags: |
| start = pos |
| text_parts.append(token) |
| end = pos + len(token) |
| offsets.append((start, end)) |
| pos = end + 1 |
|
|
| text = " ".join(text_parts) |
| if not text.strip(): |
| return None |
|
|
| |
| spans: dict[str, list[list[int]]] = defaultdict(list) |
| i = 0 |
| while i < len(tokens_tags): |
| _, tag = tokens_tags[i] |
| if tag.startswith("B-"): |
| etype = tag[2:] |
| label = TAG_MAP.get(etype) |
| if label is not None: |
| span_start = offsets[i][0] |
| span_end = offsets[i][1] |
| |
| j = i + 1 |
| while j < len(tokens_tags): |
| _, next_tag = tokens_tags[j] |
| if next_tag == f"I-{etype}": |
| span_end = offsets[j][1] |
| j += 1 |
| else: |
| break |
| span_text = text[span_start:span_end] |
| key = f"{label}: {span_text}" |
| spans[key].append([span_start, span_end]) |
| i = j |
| continue |
| i += 1 |
|
|
| return { |
| "text": text, |
| "spans": dict(spans), |
| "info": {"id": f"dnrti_{source}_{idx:06d}", "source": f"dnrti_{source}"}, |
| } |
|
|
|
|
def main():
    """Convert every available DNRTI split into one combined JSONL file.

    For each of ``train``, ``valid`` and ``test`` the file
    ``DNRTI_DIR/<split>.txt`` is parsed with ``parse_bio_file`` and each
    sentence is converted with ``convert_sentence``. A missing split file is
    reported on stderr and skipped. All records are written to ``OUTPUT``
    (one JSON object per line) and a summary of converted sentences, mapped
    entity counts and dropped entity counts is printed to stdout.

    The output is written explicitly as UTF-8: records are serialised with
    ``ensure_ascii=False``, so relying on the platform default encoding could
    fail or produce a file that does not match the UTF-8 input.
    """
    all_records = []
    entity_counts: dict[str, int] = defaultdict(int)
    dropped_counts: dict[str, int] = defaultdict(int)
    file_stats = {}

    for split in ["train", "valid", "test"]:
        path = DNRTI_DIR / f"{split}.txt"
        if not path.exists():
            print(f"Warning: {path} not found, skipping", file=sys.stderr)
            continue

        sentences = parse_bio_file(path)
        records = []
        for sent in sentences:
            # Record ids run continuously across splits and count only the
            # sentences that actually produced a record.
            rec = convert_sentence(sent, len(all_records) + len(records), split)
            if rec is None:
                continue
            records.append(rec)
            for key, positions in rec["spans"].items():
                # Span keys have the form "<Label>: <surface text>".
                label = key.partition(":")[0]
                entity_counts[label] += len(positions)

        # Tally entities discarded because their type has no target class
        # (mapped to None or absent from TAG_MAP).
        for sent in sentences:
            for _, tag in sent:
                if tag.startswith("B-"):
                    etype = tag[2:]
                    if TAG_MAP.get(etype) is None:
                        dropped_counts[etype] += 1

        file_stats[split] = {"sentences": len(sentences), "converted": len(records)}
        all_records.extend(records)

    # Write the combined output as UTF-8 regardless of the platform locale.
    OUTPUT.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT, "w", encoding="utf-8") as f:
        for rec in all_records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")

    # Summary report.
    with_entities = sum(1 for r in all_records if r["spans"])
    print("\n=== DNRTI → 5-class Conversion ===")
    print(f"Output: {OUTPUT}")
    print(f"Total sentences: {sum(s['sentences'] for s in file_stats.values())}")
    print(f"Converted records: {len(all_records)}")
    print(f"Records with entities: {with_entities}")
    print(f"Records without entities (O-only): {len(all_records) - with_entities}")
    print("\nPer-split:")
    for split, stats in file_stats.items():
        print(f" {split}: {stats['sentences']} sentences → {stats['converted']} records")
    print("\nEntity counts (mapped):")
    for label in sorted(entity_counts):
        print(f" {label}: {entity_counts[label]}")
    print(f" TOTAL: {sum(entity_counts.values())}")
    print("\nDropped entity types (no mapping):")
    for etype in sorted(dropped_counts):
        print(f" {etype}: {dropped_counts[etype]}")
    print(f" TOTAL dropped: {sum(dropped_counts.values())}")


if __name__ == "__main__":
    main()
|
|