arcspan / scripts /convert_dnrti.py
chairulridjal's picture
Add files using upload-large-folder tool
3dac39e verified
#!/usr/bin/env python3
"""Convert DNRTI cybersecurity NER dataset to Arcspan 5-class JSONL format.
DNRTI uses BIO tagging with these entity types:
Area, Exp, Features, HackOrg, Idus, OffAct, Org, Purp, SamFile, SecTeam, Time, Tool, Way
Mapping to our 5 classes:
Malware <- SamFile (malware samples), Tool (hacking tools/RATs)
Indicator <- (none - DNRTI doesn't annotate IOCs)
System <- Way (attack vectors often reference software/platforms)
Organization <- HackOrg (APT groups), Org (organizations), SecTeam (security teams)
Vulnerability <- Exp (exploits/CVEs)
Dropped (no clean mapping): Area, Idus, Time, OffAct, Purp, Features
"""
import json
import sys
from collections import defaultdict
from pathlib import Path
# Input and output locations, both resolved relative to the directory that
# contains this script's parent directory.
DNRTI_DIR = Path(__file__).resolve().parent.parent.joinpath(
    "data", "raw", "DNRTI", "DNRTI_Dataset"
)
OUTPUT = Path(__file__).resolve().parent.parent.joinpath(
    "data", "processed", "dnrti_5class.jsonl"
)

# DNRTI entity type -> 5-class label. A value of None marks a type that is
# deliberately dropped during conversion.
TAG_MAP = {
    **dict.fromkeys(("SamFile", "Tool"), "Malware"),
    **dict.fromkeys(("HackOrg", "Org", "SecTeam"), "Organization"),
    "Exp": "Vulnerability",
    "Way": "System",
    # Dropped:
    **dict.fromkeys(("Area", "Idus", "Time", "OffAct", "Purp", "Features")),
}
def parse_bio_file(path: Path) -> list[list[tuple[str, str]]]:
    """Read a BIO-tagged file and group its rows into sentences.

    Each non-blank row is split on whitespace; the last field is the tag and
    everything before it (re-joined with single spaces) is the token. Blank
    rows end the current sentence. Rows with only one field are ignored.

    Returns a list of sentences, each a list of ``(token, tag)`` tuples.
    Empty sentences are never emitted.
    """
    parsed = []
    pending = []
    with open(path, encoding="utf-8") as handle:
        for raw in handle:
            fields = raw.strip().replace("\r", "").split()
            if not fields:
                # A blank row is a sentence boundary; flush what we have.
                if pending:
                    parsed.append(pending)
                    pending = []
                continue
            if len(fields) < 2:
                # A lone column carries no tag, so there is nothing to record.
                continue
            *token_words, tag = fields
            pending.append((" ".join(token_words), tag))
    # The file may end without a trailing blank row.
    if pending:
        parsed.append(pending)
    return parsed
def convert_sentence(tokens_tags: list[tuple[str, str]], idx: int, source: str) -> dict | None:
"""Convert a BIO-tagged sentence to our JSONL format.
Returns None if the sentence is empty after reconstruction.
"""
# Reconstruct text with character offsets
text_parts = []
offsets = [] # (start, end) for each token
pos = 0
for token, _ in tokens_tags:
start = pos
text_parts.append(token)
end = pos + len(token)
offsets.append((start, end))
pos = end + 1 # space separator
text = " ".join(text_parts)
if not text.strip():
return None
# Extract spans using BIO tags
spans: dict[str, list[list[int]]] = defaultdict(list)
i = 0
while i < len(tokens_tags):
_, tag = tokens_tags[i]
if tag.startswith("B-"):
etype = tag[2:]
label = TAG_MAP.get(etype)
if label is not None:
span_start = offsets[i][0]
span_end = offsets[i][1]
# Consume continuation tokens
j = i + 1
while j < len(tokens_tags):
_, next_tag = tokens_tags[j]
if next_tag == f"I-{etype}":
span_end = offsets[j][1]
j += 1
else:
break
span_text = text[span_start:span_end]
key = f"{label}: {span_text}"
spans[key].append([span_start, span_end])
i = j
continue
i += 1
return {
"text": text,
"spans": dict(spans),
"info": {"id": f"dnrti_{source}_{idx:06d}", "source": f"dnrti_{source}"},
}
def main():
    """Convert every available DNRTI split and write one combined JSONL file.

    For each of ``train``, ``valid`` and ``test`` this reads
    ``DNRTI_DIR/<split>.txt`` (a missing file is reported on stderr and
    skipped), converts each sentence with ``convert_sentence`` and collects
    the resulting records. All records are then written to ``OUTPUT``, one
    JSON object per line, and summary statistics are printed to stdout.
    """
    all_records = []
    entity_counts: dict[str, int] = defaultdict(int)
    dropped_counts: dict[str, int] = defaultdict(int)
    file_stats = {}

    for split in ["train", "valid", "test"]:
        path = DNRTI_DIR / f"{split}.txt"
        if not path.exists():
            print(f"Warning: {path} not found, skipping", file=sys.stderr)
            continue

        sentences = parse_bio_file(path)
        records = []
        for sent in sentences:
            # Record ids run continuously across splits and count only
            # sentences that actually produced a record.
            rec = convert_sentence(sent, len(all_records) + len(records), split)
            if rec is None:
                continue
            records.append(rec)
            for key, positions in rec["spans"].items():
                # Span keys have the form "<label>: <entity text>".
                label = key.split(":")[0]
                entity_counts[label] += len(positions)

        # Count dropped entities: every B- tag whose type maps to None,
        # which also covers types missing from TAG_MAP altogether.
        for sent in sentences:
            for _, tag in sent:
                if tag.startswith("B-"):
                    etype = tag[2:]
                    if TAG_MAP.get(etype) is None:
                        dropped_counts[etype] += 1

        file_stats[split] = {"sentences": len(sentences), "converted": len(records)}
        all_records.extend(records)

    # Write output. The encoding is pinned to UTF-8 because records are dumped
    # with ensure_ascii=False (raw non-ASCII characters); relying on the
    # platform default could fail or produce a file in a different encoding
    # than the UTF-8 input.
    OUTPUT.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT, "w", encoding="utf-8") as f:
        for rec in all_records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")

    # Stats
    with_entities = sum(1 for r in all_records if r["spans"])
    print("\n=== DNRTI → 5-class Conversion ===")
    print(f"Output: {OUTPUT}")
    print(f"Total sentences: {sum(s['sentences'] for s in file_stats.values())}")
    print(f"Converted records: {len(all_records)}")
    print(f"Records with entities: {with_entities}")
    print(f"Records without entities (O-only): {len(all_records) - with_entities}")

    print("\nPer-split:")
    for split, stats in file_stats.items():
        print(f" {split}: {stats['sentences']} sentences → {stats['converted']} records")

    print("\nEntity counts (mapped):")
    for label in sorted(entity_counts):
        print(f" {label}: {entity_counts[label]}")
    print(f" TOTAL: {sum(entity_counts.values())}")

    print("\nDropped entity types (no mapping):")
    for etype in sorted(dropped_counts):
        print(f" {etype}: {dropped_counts[etype]}")
    print(f" TOTAL dropped: {sum(dropped_counts.values())}")
# Run the conversion only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()