#!/usr/bin/env python3
"""Convert APTNER CoNLL-style BIOES files to our 5-class JSONL format.

Handles noisy tags by extracting the BIOES prefix and base entity type,
then mapping to our 5-class label space.
"""

import json
import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import Optional

# Label mapping: APTNER type -> our 5-class label (None = DROP)
LABEL_MAP = {
    "MAL": "Malware",
    "TOOL": "System",
    "OS": "System",
    "IDTY": "Organization",
    "IDTYL": "Organization",  # typo variant in data
    "APT": "Organization",
    "SECTEAM": "Organization",
    "VULNAME": "Vulnerability",
    "VULID": "Vulnerability",
    "FILE": "Indicator",
    "URL": "Indicator",
    "IP": "Indicator",
    "EMAIL": "Indicator",
    "SHA2": "Indicator",
    "SHA1": "Indicator",
    "MD5": "Indicator",
    "DOM": "Indicator",
    # DROP these
    "ACT": None,
    "LOC": None,
    "TIME": None,
    "PROT": None,
    "ENCR": None,
}

VALID_BIOES = {"B", "I", "O", "E", "S"}

# Default project root used by main() when no base directory is supplied.
DEFAULT_BASE = Path("/home/ubuntu/alkyline")

# Number of leading characters of a sentence used as its deduplication key.
DEDUP_PREFIX_LEN = 80

# Doubled prefix such as "E-S-SECTEAM" or "S-S-SECTEAM". This must be tried
# BEFORE the single-prefix pattern: the single-prefix pattern also matches
# these tags, but captures the second prefix letter as the entity type.
_DOUBLE_PREFIX_RE = re.compile(r'^([BIOES])-[BIOES]-([A-Z][A-Z0-9]*)')
# Standard "PREFIX-TYPE" pattern anchored at the start; trailing noise after
# the type (e.g. "E-APT also") is ignored.
_SINGLE_PREFIX_RE = re.compile(r'^([BIOES])-([A-Z][A-Z0-9]*)')


def parse_tag(raw_tag: str):
    """Parse a potentially noisy tag.

    Returns (bioes_prefix, entity_type), or ('O', None) when the tag is
    a plain "O" or cannot be parsed. For a doubled prefix such as
    "E-S-SECTEAM" the first prefix letter is the one returned.
    """
    raw_tag = raw_tag.strip()
    if raw_tag == "O":
        return "O", None

    for pattern in (_DOUBLE_PREFIX_RE, _SINGLE_PREFIX_RE):
        m = pattern.match(raw_tag)
        if m:
            return m.group(1), m.group(2)

    return "O", None


def parse_conll_file(path: Path):
    """Parse APTNER CoNLL file into list of (tokens, tags) sentences.

    Sentences are separated by empty or whitespace-only lines. Each other
    line contributes one token; its raw tag is the second space-separated
    field (anything after that is discarded). Lines with a single field are
    kept as tokens tagged "O". Tags are returned raw; use parse_tag() to
    interpret them.
    """
    sentences = []
    tokens, tags = [], []

    # Explicit encoding so the result does not depend on the platform locale.
    with open(path, encoding="utf-8") as f:
        for line in f:
            # Also drop "\r" so CRLF files do not leak it into tokens.
            line = line.rstrip("\r\n")
            if not line or line.isspace():
                if tokens:
                    sentences.append((tokens, tags))
                    tokens, tags = [], []
                continue

            # Space-separated: token tag (sometimes extra junk after tag)
            parts = line.split(" ")
            if len(parts) < 2:
                # Malformed line - treat as O-tagged token
                tokens.append(parts[0])
                tags.append("O")
                continue

            # The tag is parts[1]; noise like "E-APT also" lands in parts[2:]
            tokens.append(parts[0])
            tags.append(parts[1])

    # Flush the final sentence when the file does not end with a blank line.
    if tokens:
        sentences.append((tokens, tags))

    return sentences


def tokens_to_text_and_offsets(tokens):
    """Join tokens with spaces and return (text, list_of_char_offsets).

    offsets[i] is the character index in text at which tokens[i] starts.
    """
    offsets = []
    pos = 0
    for tok in tokens:
        offsets.append(pos)
        pos += len(tok) + 1  # +1 for the joining space
    return " ".join(tokens), offsets


def extract_spans(tokens, tags, offsets):
    """Extract entity spans from BIOES tags, mapped to our label space.

    Returns dict like {"Malware: name": [[start, end], ...]} where start/end
    are character offsets into the space-joined text (end exclusive).

    Entity types that LABEL_MAP maps to None, or does not contain, are
    dropped. A "B" run that is never closed by a matching "E" is still
    emitted, covering the tokens collected so far. Orphan "I"/"E" tags with
    no preceding "B" are skipped.
    """
    spans = defaultdict(list)
    i = 0
    n = len(tokens)

    while i < n:
        prefix, etype = parse_tag(tags[i])
        if prefix == "O" or etype is None:
            i += 1
            continue

        our_label = LABEL_MAP.get(etype)
        if our_label is None:
            # DROP this entity type
            i += 1
            continue

        if prefix == "S":
            # Single-token entity
            entity_tokens = [tokens[i]]
            start = offsets[i]
            i += 1
        elif prefix == "B":
            # Start of multi-token entity: absorb following I/E tokens of the
            # same raw type, stopping after the first E or at any mismatch.
            entity_tokens = [tokens[i]]
            start = offsets[i]
            i += 1
            while i < n:
                p2, e2 = parse_tag(tags[i])
                if p2 not in ("I", "E") or e2 != etype:
                    break
                entity_tokens.append(tokens[i])
                i += 1
                if p2 == "E":
                    break
        else:
            # Orphan I/E tag - skip
            i += 1
            continue

        entity_text = " ".join(entity_tokens)
        key = f"{our_label}: {entity_text}"
        spans[key].append([start, start + len(entity_text)])

    return dict(spans)


def convert_file(path: Path, source_name: str):
    """Convert a single APTNER file to list of JSONL records.

    Each record has "text" (space-joined tokens), "spans" (see
    extract_spans) and "info" with a zero-padded per-file "id" plus the
    given "source" name.
    """
    records = []
    for idx, (tokens, tags) in enumerate(parse_conll_file(path)):
        text, offsets = tokens_to_text_and_offsets(tokens)
        records.append({
            "text": text,
            "spans": extract_spans(tokens, tags, offsets),
            "info": {
                "id": f"{source_name}_{idx:06d}",
                "source": source_name,
            },
        })
    return records


def build_dedup_set(jsonl_path: Path):
    """Build set of text[:80] for deduplication.

    Reads one JSON object per line from jsonl_path and collects the first
    DEDUP_PREFIX_LEN characters of each object's "text". Blank lines are
    skipped rather than passed to the JSON parser.
    """
    texts = set()
    with open(jsonl_path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            obj = json.loads(line)
            texts.add(obj["text"][:DEDUP_PREFIX_LEN])
    return texts


def _split_duplicates(records, seen_prefixes):
    """Return (new_records, dup_count) after filtering by text prefix."""
    new_records = [
        r for r in records
        if r["text"][:DEDUP_PREFIX_LEN] not in seen_prefixes
    ]
    return new_records, len(records) - len(new_records)


def _count_entities(records):
    """Return (total_entities, {label: count}) over the records' spans."""
    entity_counts = defaultdict(int)
    total_entities = 0
    for r in records:
        for key, positions in r["spans"].items():
            # Keys look like "Label: entity text"; the label has no colon.
            label = key.split(":")[0]
            entity_counts[label] += len(positions)
            total_entities += len(positions)
    return total_entities, dict(entity_counts)


def main(base: Optional[Path] = None) -> None:
    """Convert the APTNER train/dev/test splits and write 5-class JSONL.

    Args:
        base: Project root containing data/raw/APTNER and data/processed.
            Defaults to DEFAULT_BASE when omitted.

    Sentences whose first DEDUP_PREFIX_LEN characters already occur in the
    existing cleaned train/valid JSONL files are dropped. The new records
    are not added to that set, so APTNER sentences are only deduplicated
    against the existing data, not against each other.
    """
    base = DEFAULT_BASE if base is None else Path(base)
    aptner_dir = base / "data" / "raw" / "APTNER"
    out_dir = base / "data" / "processed"

    # Load existing data for dedup
    existing_train = out_dir / "enriched_5class_train_cleaned.jsonl"
    existing_valid = out_dir / "enriched_5class_valid_cleaned.jsonl"

    print("Building dedup set from existing data...")
    dedup_all = build_dedup_set(existing_train) | build_dedup_set(existing_valid)
    print(f" Existing unique prefixes: {len(dedup_all)}")

    # Convert each split
    stats = {}
    for split, filename, source_name in [
        ("train", "APTNERtrain.txt", "aptner_train"),
        ("dev", "APTNERdev.txt", "aptner_dev"),
        ("test", "APTNERtest.txt", "aptner_test"),
    ]:
        path = aptner_dir / filename
        print(f"\nConverting {filename}...")
        records = convert_file(path, source_name)

        new_records, dup_count = _split_duplicates(records, dedup_all)
        total_entities, by_class = _count_entities(new_records)

        stats[split] = {
            "total": len(records),
            "duplicates": dup_count,
            "new": len(new_records),
            "entities": total_entities,
            "by_class": by_class,
        }

        print(f" Total sentences: {len(records)}")
        print(f" Duplicates removed: {dup_count}")
        print(f" New sentences: {len(new_records)}")
        print(f" Entities: {total_entities}")
        print(f" By class: {by_class}")

        # Write output; UTF-8 because ensure_ascii=False emits raw non-ASCII.
        out_path = out_dir / f"aptner_5class_{split}.jsonl"
        with open(out_path, "w", encoding="utf-8") as f:
            for r in new_records:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")
        print(f" Written to {out_path}")

    # Summary
    print("\n=== APTNER Conversion Summary ===")
    for split, s in stats.items():
        print(f" {split}: {s['total']} total → {s['new']} new ({s['duplicates']} dupes), "
              f"{s['entities']} entities")


if __name__ == "__main__":
    main()