#!/usr/bin/env python3
"""Convert DNRTI cybersecurity NER dataset to Arcspan 5-class JSONL format.

DNRTI uses BIO tagging with these entity types:
    Area, Exp, Features, HackOrg, Idus, OffAct, Org, Purp, SamFile, SecTeam, Time, Tool, Way

Mapping to our 5 classes:
    Malware       <- SamFile (malware samples), Tool (hacking tools/RATs)
    Indicator     <- (none - DNRTI doesn't annotate IOCs)
    System        <- Way (attack vectors often reference software/platforms)
    Organization  <- HackOrg (APT groups), Org (organizations), SecTeam (security teams)
    Vulnerability <- Exp (exploits/CVEs)

Dropped (no clean mapping): Area, Idus, Time, OffAct, Purp, Features
"""
import json
import sys
from collections import defaultdict
from pathlib import Path
DNRTI_DIR = Path(__file__).resolve().parent.parent / "data" / "raw" / "DNRTI" / "DNRTI_Dataset"
OUTPUT = Path(__file__).resolve().parent.parent / "data" / "processed" / "dnrti_5class.jsonl"
# DNRTI tag -> our 5-class label (None = skip)
TAG_MAP = {
"SamFile": "Malware",
"Tool": "Malware",
"HackOrg": "Organization",
"Org": "Organization",
"SecTeam": "Organization",
"Exp": "Vulnerability",
"Way": "System",
# Dropped:
"Area": None,
"Idus": None,
"Time": None,
"OffAct": None,
"Purp": None,
"Features": None,
}
def parse_bio_file(path: Path) -> list[list[tuple[str, str]]]:
    """Read a whitespace-separated BIO file and group its rows into sentences.

    A blank line closes the sentence being built. On any other line the last
    whitespace-delimited field is the tag and all fields before it, re-joined
    with single spaces, form the token. Lines that contain only one field
    carry no tag and are ignored.

    Returns the sentences in file order, each as a list of ``(token, tag)``
    pairs. Empty sentences are never emitted.
    """
    sentences: list[list[tuple[str, str]]] = []
    pending: list[tuple[str, str]] = []

    with open(path, encoding="utf-8") as handle:
        for raw in handle:
            # Carriage returns are removed before splitting so a stray "\r"
            # inside a line cannot act as a field separator.
            fields = raw.strip().replace("\r", "").split()

            if not fields:
                # Sentence boundary: emit what has been collected, if anything.
                if pending:
                    sentences.append(pending)
                    pending = []
                continue

            if len(fields) < 2:
                # Token without a tag column.
                continue

            *token_words, tag = fields
            pending.append((" ".join(token_words), tag))

    # The file may end without a trailing blank line.
    if pending:
        sentences.append(pending)
    return sentences
def convert_sentence(tokens_tags: list[tuple[str, str]], idx: int, source: str) -> dict | None:
"""Convert a BIO-tagged sentence to our JSONL format.
Returns None if the sentence is empty after reconstruction.
"""
# Reconstruct text with character offsets
text_parts = []
offsets = [] # (start, end) for each token
pos = 0
for token, _ in tokens_tags:
start = pos
text_parts.append(token)
end = pos + len(token)
offsets.append((start, end))
pos = end + 1 # space separator
text = " ".join(text_parts)
if not text.strip():
return None
# Extract spans using BIO tags
spans: dict[str, list[list[int]]] = defaultdict(list)
i = 0
while i < len(tokens_tags):
_, tag = tokens_tags[i]
if tag.startswith("B-"):
etype = tag[2:]
label = TAG_MAP.get(etype)
if label is not None:
span_start = offsets[i][0]
span_end = offsets[i][1]
# Consume continuation tokens
j = i + 1
while j < len(tokens_tags):
_, next_tag = tokens_tags[j]
if next_tag == f"I-{etype}":
span_end = offsets[j][1]
j += 1
else:
break
span_text = text[span_start:span_end]
key = f"{label}: {span_text}"
spans[key].append([span_start, span_end])
i = j
continue
i += 1
return {
"text": text,
"spans": dict(spans),
"info": {"id": f"dnrti_{source}_{idx:06d}", "source": f"dnrti_{source}"},
}
def main():
    """Convert every available DNRTI split and write one combined JSONL file.

    For each of the ``train``, ``valid`` and ``test`` files found under
    ``DNRTI_DIR`` the sentences are parsed, converted with
    ``convert_sentence`` and appended to a single record list, which is then
    written to ``OUTPUT`` (one JSON object per line). Missing split files
    are reported on stderr and skipped. A conversion summary is printed to
    stdout.
    """
    all_records = []
    entity_counts: dict[str, int] = defaultdict(int)
    dropped_counts: dict[str, int] = defaultdict(int)
    file_stats = {}

    for split in ["train", "valid", "test"]:
        path = DNRTI_DIR / f"{split}.txt"
        if not path.exists():
            print(f"Warning: {path} not found, skipping", file=sys.stderr)
            continue

        sentences = parse_bio_file(path)
        records = []
        for sent in sentences:
            # The id number is the count of records produced so far across
            # all splits, so ids stay unique in the combined output.
            rec = convert_sentence(sent, len(all_records) + len(records), split)
            if rec is None:
                continue
            records.append(rec)
            for key, positions in rec["spans"].items():
                # Span keys look like "<label>: <text>"; the label is the
                # part before the first colon.
                entity_counts[key.partition(":")[0]] += len(positions)

        # Count entities that were discarded: every B- tag whose type is
        # mapped to None or is absent from TAG_MAP altogether.
        for sent in sentences:
            for _, tag in sent:
                if tag.startswith("B-") and TAG_MAP.get(tag[2:]) is None:
                    dropped_counts[tag[2:]] += 1

        file_stats[split] = {"sentences": len(sentences), "converted": len(records)}
        all_records.extend(records)

    # Write output. The encoding is pinned to UTF-8 because records are
    # serialized with ensure_ascii=False; relying on the locale default can
    # fail or mis-encode non-ASCII text on some platforms.
    OUTPUT.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT, "w", encoding="utf-8") as f:
        for rec in all_records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")

    # Stats
    with_entities = sum(1 for r in all_records if r["spans"])
    print("\n=== DNRTI → 5-class Conversion ===")
    print(f"Output: {OUTPUT}")
    print(f"Total sentences: {sum(s['sentences'] for s in file_stats.values())}")
    print(f"Converted records: {len(all_records)}")
    print(f"Records with entities: {with_entities}")
    print(f"Records without entities (O-only): {len(all_records) - with_entities}")

    print("\nPer-split:")
    for split, stats in file_stats.items():
        print(f" {split}: {stats['sentences']} sentences → {stats['converted']} records")

    print("\nEntity counts (mapped):")
    for label in sorted(entity_counts):
        print(f" {label}: {entity_counts[label]}")
    print(f" TOTAL: {sum(entity_counts.values())}")

    print("\nDropped entity types (no mapping):")
    for etype in sorted(dropped_counts):
        print(f" {etype}: {dropped_counts[etype]}")
    print(f" TOTAL dropped: {sum(dropped_counts.values())}")


# Run the conversion only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|