| |
| """Convert APTNER CoNLL-style BIOES files to our 5-class JSONL format. |
| |
| Handles noisy tags by extracting the BIOES prefix and base entity type, |
| then mapping to our 5-class label space. |
| """ |
| import json |
| import re |
| import sys |
| from collections import defaultdict |
| from pathlib import Path |
|
|
| |
# APTNER entity type -> label in our 5-class space.
# A value of None marks a known APTNER type that extract_spans skips.
LABEL_MAP = {
    **dict.fromkeys(["MAL"], "Malware"),
    **dict.fromkeys(["TOOL", "OS"], "System"),
    **dict.fromkeys(["IDTY", "IDTYL", "APT", "SECTEAM"], "Organization"),
    **dict.fromkeys(["VULNAME", "VULID"], "Vulnerability"),
    **dict.fromkeys(
        ["FILE", "URL", "IP", "EMAIL", "SHA2", "SHA1", "MD5", "DOM"],
        "Indicator",
    ),
    # Known types that are deliberately mapped to nothing.
    **dict.fromkeys(["ACT", "LOC", "TIME", "PROT", "ENCR"], None),
}


# The five single-letter BIOES tag prefixes.
VALID_BIOES = set("BIOES")
|
|
|
|
# Noisy tag with a doubled prefix, e.g. "B-I-MAL"; the first prefix is kept.
_DOUBLED_PREFIX_TAG = re.compile(r'^([BIOES])-[BIOES]-([A-Z][A-Z0-9]*)')
# Regular tag, e.g. "B-MAL"; anything after the entity type is ignored.
_SIMPLE_TAG = re.compile(r'^([BIOES])-([A-Z][A-Z0-9]*)')


def parse_tag(raw_tag: str):
    """Parse a potentially noisy BIOES tag.

    Args:
        raw_tag: Tag text as read from the CoNLL file; surrounding
            whitespace is ignored.

    Returns:
        A ``(bioes_prefix, entity_type)`` tuple, or ``("O", None)`` when the
        tag is exactly ``"O"`` or matches neither tag pattern.
    """
    raw_tag = raw_tag.strip()
    if raw_tag == "O":
        return "O", None

    # The doubled-prefix pattern must be tried first: a tag such as "B-I-MAL"
    # also matches the simple pattern, which would capture the second prefix
    # letter ("I") as the entity type and hide the real one.
    for pattern in (_DOUBLED_PREFIX_TAG, _SIMPLE_TAG):
        m = pattern.match(raw_tag)
        if m:
            return m.group(1), m.group(2)
    return "O", None
|
|
|
|
def parse_conll_file(path: Path):
    """Parse an APTNER CoNLL file into a list of ``(tokens, tags)`` sentences.

    Each non-blank line is split on single spaces: the first field is the
    token and the second field is the raw tag. A line with only one field is
    kept as a token tagged ``"O"``. Empty or whitespace-only lines end the
    current sentence.

    Args:
        path: Location of the CoNLL file. It is decoded as UTF-8 so the
            result does not depend on the platform's default encoding.

    Returns:
        A list of ``(tokens, tags)`` tuples, both parallel lists of strings.
        Tags are returned raw; they are not normalized here.
    """
    sentences = []
    tokens, tags = [], []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.rstrip("\n")
            if not line or line.isspace():
                # Sentence boundary; consecutive blank lines add nothing.
                if tokens:
                    sentences.append((tokens, tags))
                    tokens, tags = [], []
                continue

            fields = line.split(" ")
            tokens.append(fields[0])
            # A line without a tag column is treated as outside any entity.
            tags.append(fields[1] if len(fields) > 1 else "O")
    # Flush the last sentence when the file does not end with a blank line.
    if tokens:
        sentences.append((tokens, tags))
    return sentences
|
|
|
|
def tokens_to_text_and_offsets(tokens):
    """Build the space-joined sentence and each token's start offset.

    Returns a ``(text, offsets)`` pair where ``offsets[k]`` is the character
    index in ``text`` at which ``tokens[k]`` begins.
    """
    starts = []
    cursor = 0
    for token in tokens:
        starts.append(cursor)
        # Skip past the token and the single space that follows it.
        cursor = cursor + len(token) + 1
    return " ".join(tokens), starts
|
|
|
|
def extract_spans(tokens, tags, offsets):
    """Collect entity spans from BIOES tags, keyed by mapped label and text.

    An ``S`` tag yields a single-token entity. A ``B`` tag opens an entity
    that absorbs the following ``I``/``E`` tags of the same entity type and
    closes after an ``E`` tag or just before the first tag that does not
    continue it. ``I``/``E`` tags with no opening ``B`` are ignored, as are
    entity types that LABEL_MAP does not map to a label.

    Returns dict like {"Malware: name": [[start, end], ...]}
    """
    found = defaultdict(list)
    total = len(tokens)
    idx = 0
    while idx < total:
        prefix, etype = parse_tag(tags[idx])
        if prefix == "O" or etype is None:
            label = None
        else:
            label = LABEL_MAP.get(etype)

        # Only S and B can start an entity we keep.
        if label is None or prefix not in ("S", "B"):
            idx += 1
            continue

        begin = offsets[idx]
        pieces = [tokens[idx]]
        idx += 1

        if prefix == "B":
            # Extend over continuation tags of the same entity type.
            while idx < total:
                next_prefix, next_type = parse_tag(tags[idx])
                if next_type != etype or next_prefix not in ("I", "E"):
                    break
                pieces.append(tokens[idx])
                idx += 1
                if next_prefix == "E":
                    break

        surface = " ".join(pieces)
        found[f"{label}: {surface}"].append([begin, begin + len(surface)])
    return dict(found)
|
|
|
|
def convert_file(path: Path, source_name: str):
    """Turn one APTNER file into a list of JSONL-ready record dicts.

    Each parsed sentence becomes a record with its space-joined ``text``,
    its extracted ``spans``, and an ``info`` dict holding a zero-padded
    per-sentence ``id`` and the ``source`` name.
    """
    records = []
    for idx, sentence in enumerate(parse_conll_file(path)):
        tokens, tags = sentence
        text, offsets = tokens_to_text_and_offsets(tokens)
        record = {
            "text": text,
            "spans": extract_spans(tokens, tags, offsets),
            "info": {
                "id": f"{source_name}_{idx:06d}",
                "source": source_name,
            },
        }
        records.append(record)
    return records
|
|
|
|
def build_dedup_set(jsonl_path: Path):
    """Build the set of ``text[:80]`` prefixes used for deduplication.

    Args:
        jsonl_path: JSONL file with one JSON object per line, each having a
            ``"text"`` string. The file is decoded as UTF-8.

    Returns:
        A set with the first 80 characters of every record's text.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record has no ``"text"`` key.
    """
    texts = set()
    with open(jsonl_path, encoding="utf-8") as f:
        for line in f:
            # Blank lines carry no record; json.loads would reject them.
            if not line.strip():
                continue
            obj = json.loads(line)
            texts.add(obj["text"][:80])
    return texts
|
|
|
|
def main(base=None):
    """Convert the APTNER train/dev/test splits to de-duplicated JSONL files.

    For each split this converts the raw file, drops every sentence whose
    first 80 characters already occur in the existing enriched train/valid
    sets, prints sentence and per-class entity counts, and writes the
    remaining records to ``aptner_5class_<split>.jsonl`` in the processed
    data directory. A one-line summary per split is printed at the end.

    Args:
        base: Project root containing ``data/raw/APTNER`` and
            ``data/processed``. Defaults to ``/home/ubuntu/alkyline``.
    """
    base = Path("/home/ubuntu/alkyline") if base is None else Path(base)
    aptner_dir = base / "data" / "raw" / "APTNER"
    out_dir = base / "data" / "processed"

    existing_train = out_dir / "enriched_5class_train_cleaned.jsonl"
    existing_valid = out_dir / "enriched_5class_valid_cleaned.jsonl"

    print("Building dedup set from existing data...")
    dedup_set = build_dedup_set(existing_train)
    dedup_valid = build_dedup_set(existing_valid)
    dedup_all = dedup_set | dedup_valid
    print(f" Existing unique prefixes: {len(dedup_all)}")

    stats = {}
    for split, filename, source_name in [
        ("train", "APTNERtrain.txt", "aptner_train"),
        ("dev", "APTNERdev.txt", "aptner_dev"),
        ("test", "APTNERtest.txt", "aptner_test"),
    ]:
        path = aptner_dir / filename
        print(f"\nConverting {filename}...")
        records = convert_file(path, source_name)

        # Keep only sentences whose 80-char prefix is not in the existing data.
        new_records = [r for r in records if r["text"][:80] not in dedup_all]
        dup_count = len(records) - len(new_records)

        # Span keys look like "<label>: <entity text>"; count by label.
        entity_counts = defaultdict(int)
        total_entities = 0
        for r in new_records:
            for key, positions in r["spans"].items():
                label = key.split(":")[0]
                entity_counts[label] += len(positions)
                total_entities += len(positions)

        stats[split] = {
            "total": len(records),
            "duplicates": dup_count,
            "new": len(new_records),
            "entities": total_entities,
            "by_class": dict(entity_counts),
        }

        print(f" Total sentences: {len(records)}")
        print(f" Duplicates removed: {dup_count}")
        print(f" New sentences: {len(new_records)}")
        print(f" Entities: {total_entities}")
        print(f" By class: {dict(entity_counts)}")

        out_path = out_dir / f"aptner_5class_{split}.jsonl"
        # ensure_ascii=False emits raw non-ASCII characters, so the file must
        # be opened as UTF-8 rather than in the locale's default encoding.
        with open(out_path, "w", encoding="utf-8") as f:
            for r in new_records:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")
        print(f" Written to {out_path}")

    print("\n=== APTNER Conversion Summary ===")
    for split, s in stats.items():
        print(f" {split}: {s['total']} total → {s['new']} new ({s['duplicates']} dupes), {s['entities']} entities")


if __name__ == "__main__":
    main()
|
|