| |
"""Convert SecureBERT2 NER JSON data to our 5-class JSONL format."""
import json
from collections import defaultdict
from pathlib import Path
|
|
| |
# Maps each integer NER tag id to a (BIO prefix, entity class) pair.
# Ids 0-4 begin an entity ("B"), ids 5-9 continue one ("I"), and id 10 is
# the outside tag ("O"), which has no entity class.
TAG_MAP = {
    0: ("B", "Indicator"),
    1: ("B", "Malware"),
    2: ("B", "Organization"),
    3: ("B", "System"),
    4: ("B", "Vulnerability"),
    5: ("I", "Indicator"),
    6: ("I", "Malware"),
    7: ("I", "Organization"),
    8: ("I", "System"),
    9: ("I", "Vulnerability"),
    10: ("O", None),
}
|
|
|
|
def tokens_to_text_and_offsets(tokens):
    """Join tokens with single spaces and locate each token in the result.

    Args:
        tokens: Sequence of token strings.

    Returns:
        A ``(text, offsets)`` tuple. ``text`` is the tokens joined by one
        space each, and ``offsets[i]`` is the character index in ``text``
        at which ``tokens[i]`` starts. Empty input yields ``("", [])``.
    """
    offsets = []
    pos = 0
    for token in tokens:
        offsets.append(pos)
        # +1 accounts for the single space that follows each token in the
        # joined text.
        pos += len(token) + 1
    return " ".join(tokens), offsets
|
|
|
|
def extract_spans(tokens, tags, offsets, tag_map=None):
    """Extract entity spans from BIO integer tags.

    An entity starts at a "B" tag and extends over the immediately
    following "I" tags of the same entity class. "O" tags and orphan "I"
    tags (an "I" not continuing a "B" of the same class) produce no span.

    Args:
        tokens: Token strings of one example.
        tags: Integer tag ids, one per token; must cover every token.
        offsets: Start character offset of each token in the
            space-joined text.
        tag_map: Optional mapping of tag id to ``(prefix, entity_class)``.
            Defaults to the module-level ``TAG_MAP``.

    Returns:
        A plain dict mapping ``"<class>: <entity text>"`` to a list of
        ``[start, end]`` character ranges (end exclusive), one entry per
        occurrence of that entity.

    Raises:
        KeyError: If a tag id is missing from the tag map.
        IndexError: If ``tags`` or ``offsets`` is shorter than ``tokens``.
    """
    if tag_map is None:
        tag_map = TAG_MAP

    spans = defaultdict(list)
    n = len(tokens)
    i = 0
    while i < n:
        prefix, etype = tag_map[tags[i]]
        if prefix != "B" or etype is None:
            # Outside tag, or an "I" tag with no opening "B": skip it.
            i += 1
            continue

        start = offsets[i]
        entity_tokens = [tokens[i]]
        i += 1

        # Absorb the continuation tokens that belong to this entity.
        while i < n:
            next_prefix, next_type = tag_map[tags[i]]
            if next_prefix != "I" or next_type != etype:
                break
            entity_tokens.append(tokens[i])
            i += 1

        # Tokens are separated by exactly one space in the joined text, so
        # the entity's length there equals the length of this join.
        entity_text = " ".join(entity_tokens)
        end = start + len(entity_text)
        spans[f"{etype}: {entity_text}"].append([start, end])
    return dict(spans)
|
|
|
|
def convert_file(json_path: Path, source_name: str):
    """Convert a SecureBERT2 NER JSON file to a list of JSONL records.

    The input JSON must contain two parallel lists: ``"txt_data"`` (one
    token list per example) and ``"ner_tags"`` (one integer tag list per
    example). Examples are paired positionally.

    Args:
        json_path: Path of the JSON file to read.
        source_name: Label stored in each record's ``info`` and used as
            the prefix of its id.

    Returns:
        A list of dicts with keys ``"text"`` (space-joined tokens),
        ``"spans"`` (entity spans from ``extract_spans``) and ``"info"``
        (``"id"`` as ``<source_name>_<6-digit index>`` plus ``"source"``).
    """
    # Read as UTF-8 explicitly so decoding does not depend on the
    # platform's locale encoding.
    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)

    records = []
    for idx, (tokens, tags) in enumerate(zip(data["txt_data"], data["ner_tags"])):
        text, offsets = tokens_to_text_and_offsets(tokens)
        spans = extract_spans(tokens, tags, offsets)
        records.append({
            "text": text,
            "spans": spans,
            "info": {
                "id": f"{source_name}_{idx:06d}",
                "source": source_name,
            },
        })
    return records
|
|
|
|
def print_stats(records, label):
    """Print per-class entity counts for a set of records.

    Args:
        records: Iterable-with-length of record dicts, each holding a
            ``"spans"`` dict that maps ``"<class>: <entity text>"`` to a
            list of positions.
        label: Heading printed above the statistics.
    """
    entity_counts = defaultdict(int)
    for record in records:
        for key, positions in record["spans"].items():
            # The entity class is everything before the first colon of
            # the span key.
            cls = key.split(":", 1)[0]
            entity_counts[cls] += len(positions)
    # Every position is counted under exactly one class, so the overall
    # total is the sum of the per-class counts.
    total_entities = sum(entity_counts.values())

    print(f"\n {label}:")
    print(f" Examples: {len(records)}")
    print(f" Total entities: {total_entities}")
    for cls in sorted(entity_counts):
        print(f" {cls}: {entity_counts[cls]}")
|
|
|
|
def main():
    """Convert the SecureBERT2 train and test NER files to JSONL.

    Reads each input JSON from the SecureBERT2 data directory, writes one
    JSON object per line to the processed-data directory (created if
    missing), and prints entity statistics. Inputs that do not exist are
    skipped with a message.
    """
    base = Path("/home/ubuntu/alkyline")
    sb2_dir = base / "research" / "securebert2" / "opensource_data"
    out_dir = base / "data" / "processed"
    out_dir.mkdir(parents=True, exist_ok=True)

    for split, filename, source_name, out_name in [
        ("train", "data_NER_train.json", "securebert2_train", "securebert2_5class_train.jsonl"),
        ("test", "data_NER_test.json", "securebert2_test", "securebert2_5class_test.jsonl"),
    ]:
        json_path = sb2_dir / filename
        if not json_path.exists():
            print(f"Skipping {filename} — not found")
            continue

        print(f"Converting {filename}...")
        records = convert_file(json_path, source_name)

        out_path = out_dir / out_name
        # ensure_ascii=False writes non-ASCII characters verbatim, so the
        # file must be opened as UTF-8 rather than the locale's default
        # encoding, which may be unable to encode them.
        with open(out_path, "w", encoding="utf-8") as f:
            for r in records:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")
        print(f" Written to {out_path}")
        print_stats(records, f"{split} ({filename})")

    print("\nDone.")
|
|
|
|
# Run the conversion only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|
|