#!/usr/bin/env python3
"""Convert SecureBERT2 NER JSON data to our 5-class JSONL format."""

import json
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

# Project root used when main() is called without an explicit base directory.
DEFAULT_BASE_DIR = Path("/home/ubuntu/alkyline")

# SecureBERT2 BIO tag IDs → (bio_prefix, entity_type)
TAG_MAP: Dict[int, Tuple[str, Optional[str]]] = {
    0: ("B", "Indicator"),       # B-File_Hash
    1: ("B", "Malware"),         # B-Malware_Name
    2: ("B", "Organization"),    # B-Organization
    3: ("B", "System"),          # B-Application
    4: ("B", "Vulnerability"),   # B-Vulnerability
    5: ("I", "Indicator"),       # I-File_Hash
    6: ("I", "Malware"),         # I-Malware_Name
    7: ("I", "Organization"),    # I-Organization
    8: ("I", "System"),          # I-Application
    9: ("I", "Vulnerability"),   # I-Vulnerability
    10: ("O", None),
}


def tokens_to_text_and_offsets(tokens: Sequence[str]) -> Tuple[str, List[int]]:
    """Join tokens with single spaces and locate each token in the result.

    Args:
        tokens: Token strings of one example.

    Returns:
        A ``(text, offsets)`` pair where ``text`` is the space-joined tokens
        and ``offsets[k]`` is the character index in ``text`` at which
        ``tokens[k]`` starts.
    """
    offsets = []
    pos = 0
    for t in tokens:
        offsets.append(pos)
        # +1 accounts for the single space separator that follows the token.
        pos += len(t) + 1
    return " ".join(tokens), offsets


def extract_spans(
    tokens: Sequence[str],
    tags: Sequence[int],
    offsets: Sequence[int],
) -> Dict[str, List[List[int]]]:
    """Extract entity spans from BIO integer tags.

    An entity starts at a ``B`` tag and extends over the directly following
    ``I`` tags of the same entity type. ``I`` tags that do not continue such
    an entity (orphans) are ignored, as are ``O`` tags.

    Args:
        tokens: Token strings of one example.
        tags: Integer tag IDs (keys of ``TAG_MAP``), one per token.
        offsets: Start character offset of each token in the space-joined
            text, as produced by ``tokens_to_text_and_offsets``.

    Returns:
        A dict mapping ``"<entity_type>: <entity text>"`` to a list of
        ``[start, end]`` character offsets (end exclusive), one entry per
        occurrence in order of appearance.

    Raises:
        KeyError: If a tag ID is not present in ``TAG_MAP``.
        IndexError: If ``tags`` or ``offsets`` is shorter than ``tokens``.
    """
    spans = defaultdict(list)
    n = len(tokens)
    i = 0
    while i < n:
        prefix, etype = TAG_MAP[tags[i]]
        if prefix != "B":
            # "O" tags carry no entity; an "I" tag reached here has no
            # preceding "B" of its type (orphan) and is skipped on purpose.
            i += 1
            continue

        # Consume the run of I- tags of the same entity type.
        j = i + 1
        while j < n and TAG_MAP[tags[j]] == ("I", etype):
            j += 1

        # Tokens are space-joined in the full text, so joining the entity's
        # tokens the same way yields its exact character length.
        entity_text = " ".join(tokens[i:j])
        start = offsets[i]
        spans[f"{etype}: {entity_text}"].append([start, start + len(entity_text)])
        i = j
    return dict(spans)


def convert_file(json_path: Path, source_name: str) -> List[dict]:
    """Convert a SecureBERT2 NER JSON file to a list of JSONL records.

    The input JSON object must provide the parallel lists ``"txt_data"``
    (token lists) and ``"ner_tags"`` (integer tag lists). If the two lists
    differ in length, the extra entries of the longer one are ignored.

    Args:
        json_path: Path of the JSON file to read (decoded as UTF-8).
        source_name: Label stored in each record's ``info`` and used as the
            prefix of the record id.

    Returns:
        One dict per example with the keys ``"text"``, ``"spans"`` and
        ``"info"`` (``{"id": "<source_name>_<6-digit index>", "source": ...}``).
    """
    # Explicit UTF-8: the platform default encoding is locale-dependent and
    # would break on non-ASCII tokens under a non-UTF-8 locale.
    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)

    records = []
    for idx, (tokens, tags) in enumerate(zip(data["txt_data"], data["ner_tags"])):
        text, offsets = tokens_to_text_and_offsets(tokens)
        records.append({
            "text": text,
            "spans": extract_spans(tokens, tags, offsets),
            "info": {
                "id": f"{source_name}_{idx:06d}",
                "source": source_name,
            },
        })
    return records


def print_stats(records: Sequence[dict], label: str) -> None:
    """Print entity statistics for a set of records.

    Args:
        records: Records as returned by ``convert_file``.
        label: Heading printed above the statistics.
    """
    entity_counts = Counter()
    for r in records:
        for key, positions in r["spans"].items():
            # Span keys look like "<entity_type>: <text>"; the class is the
            # part before the first colon.
            cls = key.partition(":")[0]
            entity_counts[cls] += len(positions)
    total_entities = sum(entity_counts.values())

    print(f"\n {label}:")
    print(f" Examples: {len(records)}")
    print(f" Total entities: {total_entities}")
    for cls in sorted(entity_counts):
        print(f" {cls}: {entity_counts[cls]}")


def main(base: Path = DEFAULT_BASE_DIR) -> None:
    """Convert the SecureBERT2 train/test NER files found under ``base``.

    Inputs are read from ``<base>/research/securebert2/opensource_data`` and
    JSONL outputs are written to ``<base>/data/processed`` (created if
    missing). A split whose input file does not exist is skipped.

    Args:
        base: Project root directory; defaults to ``DEFAULT_BASE_DIR``.
    """
    base = Path(base)
    sb2_dir = base / "research" / "securebert2" / "opensource_data"
    out_dir = base / "data" / "processed"
    out_dir.mkdir(parents=True, exist_ok=True)

    for split, filename, source_name, out_name in [
        ("train", "data_NER_train.json", "securebert2_train", "securebert2_5class_train.jsonl"),
        ("test", "data_NER_test.json", "securebert2_test", "securebert2_5class_test.jsonl"),
    ]:
        json_path = sb2_dir / filename
        if not json_path.exists():
            print(f"Skipping {filename} — not found")
            continue

        print(f"Converting {filename}...")
        records = convert_file(json_path, source_name)

        out_path = out_dir / out_name
        # ensure_ascii=False emits raw non-ASCII characters, so the file must
        # be opened with an encoding that can represent all of them.
        with open(out_path, "w", encoding="utf-8") as f:
            f.writelines(
                json.dumps(r, ensure_ascii=False) + "\n" for r in records
            )

        print(f" Written to {out_path}")
        print_stats(records, f"{split} ({filename})")

    print("\nDone.")


if __name__ == "__main__":
    main()