arcspan / scripts /convert_securebert2.py
chairulridjal's picture
Add files using upload-large-folder tool
3dac39e verified
#!/usr/bin/env python3
"""Convert SecureBERT2 NER JSON data to our 5-class JSONL format."""
import json
from collections import defaultdict
from pathlib import Path
# Integer BIO tag ID -> (bio_prefix, entity_type).
# The ID of each entry is its position in the list below; the trailing note on
# each row names the SecureBERT2 label that the ID stands for.
TAG_MAP = dict(enumerate([
    ("B", "Indicator"),      # 0: B-File_Hash
    ("B", "Malware"),        # 1: B-Malware_Name
    ("B", "Organization"),   # 2: B-Organization
    ("B", "System"),         # 3: B-Application
    ("B", "Vulnerability"),  # 4: B-Vulnerability
    ("I", "Indicator"),      # 5: I-File_Hash
    ("I", "Malware"),        # 6: I-Malware_Name
    ("I", "Organization"),   # 7: I-Organization
    ("I", "System"),         # 8: I-Application
    ("I", "Vulnerability"),  # 9: I-Vulnerability
    ("O", None),             # 10: outside any entity
]))
def tokens_to_text_and_offsets(tokens):
    """Build the space-joined text and the start offset of every token.

    Returns a ``(text, starts)`` pair where ``starts[k]`` is the character
    index at which ``tokens[k]`` begins inside ``text``.
    """
    starts = []
    previous_width = 0
    for token in tokens:
        if starts:
            # A token begins one separator past the end of its predecessor.
            starts.append(starts[-1] + previous_width + 1)
        else:
            starts.append(0)
        previous_width = len(token)
    return " ".join(tokens), starts
def extract_spans(tokens, tags, offsets):
    """Collect entity spans from integer BIO tags.

    Each entity opens at a B- tag and extends over the directly following
    I- tags of the same entity type. The result is a plain dict that maps
    ``"<entity_type>: <entity text>"`` to a list of ``[start, end]`` character
    positions, where ``start`` is taken from ``offsets`` and ``end`` is
    ``start`` plus the length of the space-joined entity text.
    """
    found = defaultdict(list)
    total = len(tokens)
    cursor = 0
    while cursor < total:
        marker, label = TAG_MAP[tags[cursor]]
        # Only a typed B- tag opens an entity; "O" tags and I- tags that do
        # not continue an open entity are stepped over.
        if marker != "B" or label is None:
            cursor += 1
            continue
        first = cursor
        begin = offsets[first]
        cursor += 1
        # Extend over the run of I- tags carrying the same entity type.
        while cursor < total and TAG_MAP[tags[cursor]] == ("I", label):
            cursor += 1
        surface = " ".join(tokens[first:cursor])
        found[f"{label}: {surface}"].append([begin, begin + len(surface)])
    return dict(found)
def convert_file(json_path: Path, source_name: str):
    """Convert a SecureBERT2 NER JSON file to a list of JSONL records.

    Args:
        json_path: Path of the JSON file to read. The decoded object must
            provide the parallel sequences ``"txt_data"`` (token lists) and
            ``"ner_tags"`` (tag lists).
        source_name: Label stored in every record's ``info["source"]`` and
            used as the prefix of the record ID.

    Returns:
        A list with one dict per (tokens, tags) pair, each holding the keys
        ``"text"``, ``"spans"`` and ``"info"``.
    """
    # Read as UTF-8 explicitly so decoding does not depend on the platform's
    # locale encoding.
    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)
    records = []
    # zip() pairs the two sequences positionally and stops at the shorter one.
    for idx, (tokens, tags) in enumerate(zip(data["txt_data"], data["ner_tags"])):
        text, offsets = tokens_to_text_and_offsets(tokens)
        spans = extract_spans(tokens, tags, offsets)
        records.append({
            "text": text,
            "spans": spans,
            "info": {
                # Zero-padded running index keeps IDs sortable as strings.
                "id": f"{source_name}_{idx:06d}",
                "source": source_name,
            },
        })
    return records
def print_stats(records, label):
    """Print the example count and per-class entity counts for ``records``.

    The class of a span is the part of its key before the first ``":"``; every
    position listed under a key counts as one entity. Classes are printed in
    sorted order.
    """
    per_class = defaultdict(int)
    for record in records:
        for span_key, hits in record["spans"].items():
            entity_class, _, _ = span_key.partition(":")
            per_class[entity_class] += len(hits)
    grand_total = sum(per_class.values())

    print(f"\n {label}:")
    print(f" Examples: {len(records)}")
    print(f" Total entities: {grand_total}")
    for entity_class in sorted(per_class):
        print(f" {entity_class}: {per_class[entity_class]}")
def main():
    """Convert the SecureBERT2 train and test splits to JSONL files.

    For each split, the input JSON is read from the SecureBERT2 data
    directory, converted with ``convert_file``, written as one JSON object per
    line into the processed-data directory (created if missing), and
    summarised with ``print_stats``. A split whose input file does not exist
    is skipped with a message.
    """
    base = Path("/home/ubuntu/alkyline")
    sb2_dir = base / "research" / "securebert2" / "opensource_data"
    out_dir = base / "data" / "processed"
    out_dir.mkdir(parents=True, exist_ok=True)
    for split, filename, source_name, out_name in [
        ("train", "data_NER_train.json", "securebert2_train", "securebert2_5class_train.jsonl"),
        ("test", "data_NER_test.json", "securebert2_test", "securebert2_5class_test.jsonl"),
    ]:
        json_path = sb2_dir / filename
        if not json_path.exists():
            print(f"Skipping {filename} — not found")
            continue
        print(f"Converting {filename}...")
        records = convert_file(json_path, source_name)
        out_path = out_dir / out_name
        # ensure_ascii=False emits non-ASCII characters verbatim, so the file
        # encoding is pinned to UTF-8 instead of the locale default; otherwise
        # the write can fail or produce non-UTF-8 output on some platforms.
        with open(out_path, "w", encoding="utf-8") as f:
            for r in records:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")
        print(f" Written to {out_path}")
        print_stats(records, f"{split} ({filename})")
    print("\nDone.")


if __name__ == "__main__":
    main()