#!/usr/bin/env python3
"""Convert SecureBERT2 NER JSON data to our 5-class JSONL format."""
import json
from collections import defaultdict
from pathlib import Path
# SecureBERT2 BIO tag IDs → (bio_prefix, entity_type)
#
# Keys are the integer tag ids found in the SecureBERT2 NER data. Each value
# pairs a BIO prefix ("B" starts an entity, "I" continues one, "O" is outside
# any entity) with the entity class used in our 5-class output. The trailing
# comment on each row names the original SecureBERT2 label.
TAG_MAP = {
    0: ("B", "Indicator"),  # B-File_Hash
    1: ("B", "Malware"),  # B-Malware_Name
    2: ("B", "Organization"),  # B-Organization
    3: ("B", "System"),  # B-Application
    4: ("B", "Vulnerability"),  # B-Vulnerability
    5: ("I", "Indicator"),  # I-File_Hash
    6: ("I", "Malware"),  # I-Malware_Name
    7: ("I", "Organization"),  # I-Organization
    8: ("I", "System"),  # I-Application
    9: ("I", "Vulnerability"),  # I-Vulnerability
    10: ("O", None),  # O: token is not part of any entity
}
def tokens_to_text_and_offsets(tokens):
    """Build the space-joined text for *tokens* and locate each token in it.

    Returns a ``(text, starts)`` pair where ``starts[k]`` is the character
    index at which ``tokens[k]`` begins inside ``text``.
    """
    starts = []
    cursor = 0
    for width in map(len, tokens):
        starts.append(cursor)
        # Skip over the token itself plus the single-space separator.
        cursor = cursor + width + 1
    text = " ".join(tokens)
    return text, starts
def extract_spans(tokens, tags, offsets):
    """Collect entity spans from a sequence of integer BIO tags.

    ``tags[k]`` is looked up in ``TAG_MAP`` to obtain a (prefix, entity type)
    pair for ``tokens[k]``; ``offsets[k]`` is the character position where that
    token starts in the space-joined text.

    Returns a plain dict keyed by ``"<entity type>: <entity text>"``. Each
    value is a list of ``[start, end]`` character ranges (``end`` is
    ``start`` plus the length of the entity text), one per occurrence.
    """
    found = defaultdict(list)
    count = len(tokens)
    pos = 0
    while pos < count:
        prefix, etype = TAG_MAP[tags[pos]]
        # Only a B- tag opens an entity. "O" tokens and I- tags that do not
        # follow a matching B-/I- run (orphans) are skipped.
        if prefix != "B" or etype is None:
            pos += 1
            continue

        begin = offsets[pos]
        # Extend the run over the immediately following I- tags of this type.
        stop = pos + 1
        while stop < count and TAG_MAP[tags[stop]] == ("I", etype):
            stop += 1

        surface = " ".join(tokens[pos:stop])
        found[f"{etype}: {surface}"].append([begin, begin + len(surface)])
        pos = stop
    return dict(found)
def convert_file(json_path: Path, source_name: str):
    """Convert a SecureBERT2 NER JSON file to a list of JSONL-ready records.

    The JSON document must contain two parallel lists: ``"txt_data"`` (one
    token list per example) and ``"ner_tags"`` (the matching integer tag
    list). They are paired with ``zip``, so if the lists differ in length the
    surplus entries of the longer one are ignored.

    Args:
        json_path: Location of the SecureBERT2 JSON file.
        source_name: Label stored in each record's ``info`` and used as the
            prefix of its id.

    Returns:
        A list of dicts with keys ``"text"`` (space-joined tokens),
        ``"spans"`` (result of ``extract_spans``) and ``"info"`` (``"id"`` of
        the form ``<source_name>_<6-digit index>`` plus ``"source"``).
    """
    # Read as UTF-8 explicitly: without it, open() falls back to the platform
    # locale encoding and can fail or mis-decode non-ASCII text.
    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)

    records = []
    for idx, (tokens, tags) in enumerate(zip(data["txt_data"], data["ner_tags"])):
        text, offsets = tokens_to_text_and_offsets(tokens)
        spans = extract_spans(tokens, tags, offsets)
        records.append({
            "text": text,
            "spans": spans,
            "info": {
                "id": f"{source_name}_{idx:06d}",
                "source": source_name,
            },
        })
    return records
def print_stats(records, label):
    """Print a per-class entity tally for *records* under the heading *label*.

    Every record must have a ``"spans"`` dict whose keys start with the class
    name followed by ``":"`` and whose values are lists of positions. The
    report shows the number of records, the total number of positions, and
    one line per class in sorted order.
    """
    per_class = defaultdict(int)
    for record in records:
        for span_key, hits in record["spans"].items():
            # The class name is everything before the first colon of the key.
            per_class[span_key.partition(":")[0]] += len(hits)

    report = [
        f"\n {label}:",
        f" Examples: {len(records)}",
        f" Total entities: {sum(per_class.values())}",
    ]
    report.extend(f" {name}: {per_class[name]}" for name in sorted(per_class))
    print(*report, sep="\n")
def main(base=None):
    """Convert the SecureBERT2 train and test NER files to JSONL.

    For each split, reads ``<base>/research/securebert2/opensource_data/<file>``
    and writes one JSON object per line to ``<base>/data/processed/<out file>``,
    then prints entity statistics. A missing input file is reported and
    skipped. The output directory is created if it does not exist.

    Args:
        base: Project root directory (str or Path). Defaults to
            ``/home/ubuntu/alkyline`` when omitted.
    """
    base = Path("/home/ubuntu/alkyline") if base is None else Path(base)
    sb2_dir = base / "research" / "securebert2" / "opensource_data"
    out_dir = base / "data" / "processed"
    out_dir.mkdir(parents=True, exist_ok=True)

    for split, filename, source_name, out_name in [
        ("train", "data_NER_train.json", "securebert2_train", "securebert2_5class_train.jsonl"),
        ("test", "data_NER_test.json", "securebert2_test", "securebert2_5class_test.jsonl"),
    ]:
        json_path = sb2_dir / filename
        if not json_path.exists():
            print(f"Skipping {filename} — not found")
            continue

        print(f"Converting {filename}...")
        records = convert_file(json_path, source_name)

        out_path = out_dir / out_name
        # ensure_ascii=False writes non-ASCII characters verbatim, so the file
        # must be UTF-8 regardless of the platform's locale encoding.
        with open(out_path, "w", encoding="utf-8") as f:
            for r in records:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")
        print(f" Written to {out_path}")
        print_stats(records, f"{split} ({filename})")

    print("\nDone.")
# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    main()
|