arcspan / scripts /convert_aptner.py
chairulridjal's picture
Add files using upload-large-folder tool
3dac39e verified
#!/usr/bin/env python3
"""Convert APTNER CoNLL-style BIOES files to our 5-class JSONL format.
Handles noisy tags by extracting the BIOES prefix and base entity type,
then mapping to our 5-class label space.
"""
import json
import re
import sys
from collections import defaultdict
from pathlib import Path
# APTNER entity type -> our 5-class label. A value of None means the entity
# type is recognised but deliberately dropped from the output.
LABEL_MAP = {
    "MAL": "Malware",
    **dict.fromkeys(("TOOL", "OS"), "System"),
    # "IDTYL" is a typo variant of "IDTY" that occurs in the data.
    **dict.fromkeys(("IDTY", "IDTYL", "APT", "SECTEAM"), "Organization"),
    **dict.fromkeys(("VULNAME", "VULID"), "Vulnerability"),
    **dict.fromkeys(
        ("FILE", "URL", "IP", "EMAIL", "SHA2", "SHA1", "MD5", "DOM"),
        "Indicator",
    ),
    # Types with no counterpart in the 5-class label space.
    **dict.fromkeys(("ACT", "LOC", "TIME", "PROT", "ENCR"), None),
}

# The five positional markers of the BIOES tagging scheme.
VALID_BIOES = set("BIOES")
# BIOES prefix, then optionally one redundant BIOES marker (noise such as
# "E-S-SECTEAM"), then the entity type. The optional marker must be handled in
# the same pattern: matched on its own, a plain PREFIX-TYPE pattern would
# accept "E-S" and report the stray "S" as the entity type.
_TAG_RE = re.compile(r'^([BIOES])-(?:[BIOES]-)?([A-Z][A-Z0-9]*)')


def parse_tag(raw_tag: str):
    """Parse a potentially noisy BIOES tag.

    Args:
        raw_tag: Tag string as read from the CoNLL file, e.g. "B-MAL",
            "O", or a noisy variant such as "E-S-SECTEAM" or "S-MAL,".

    Returns:
        A ``(bioes_prefix, entity_type)`` tuple. For a doubled prefix the
        outer (first) prefix is kept and the inner one discarded. Anything
        after the entity type is ignored. Tags that cannot be parsed, and
        the plain "O" tag, yield ``("O", None)``.
    """
    raw_tag = raw_tag.strip()
    if raw_tag == "O":
        return "O", None
    m = _TAG_RE.match(raw_tag)
    if m is None:
        return "O", None
    return m.group(1), m.group(2)
def parse_conll_file(path: Path):
    """Parse an APTNER CoNLL file into a list of (tokens, tags) sentences.

    Each non-blank line is expected to hold a token followed by its tag,
    separated by spaces or tabs; anything after the tag is ignored. A line
    with only one field is kept as a token tagged "O". Blank lines separate
    sentences.

    Args:
        path: Location of the CoNLL file. It is read as UTF-8.

    Returns:
        A list of ``(tokens, tags)`` tuples, one per sentence, where both
        members are lists of strings of equal length. Tags are returned raw
        (not normalised).
    """
    sentences = []
    tokens, tags = [], []
    # Explicit encoding: the locale default is not guaranteed to be UTF-8.
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                # Blank line: close the current sentence, if any.
                if tokens:
                    sentences.append((tokens, tags))
                    tokens, tags = [], []
                continue
            # Split on runs of spaces/tabs only, so that a doubled separator
            # does not produce an empty tag and other Unicode whitespace
            # inside a token is left alone.
            parts = re.split(r"[ \t]+", line.strip(" \t\r\n"))
            tokens.append(parts[0])
            # Malformed single-field line: treat as an O-tagged token.
            # Otherwise the tag is the second field; trailing noise such as
            # "E-APT also" is ignored.
            tags.append(parts[1] if len(parts) > 1 else "O")
    if tokens:
        sentences.append((tokens, tags))
    return sentences
def tokens_to_text_and_offsets(tokens):
    """Build the space-joined sentence and each token's start offset in it.

    Returns a ``(text, offsets)`` pair where ``offsets[k]`` is the character
    index at which ``tokens[k]`` begins inside ``text``.
    """
    starts = []
    cursor = 0
    for width in map(len, tokens):
        starts.append(cursor)
        # Advance past the token and the single space that follows it.
        cursor += width + 1
    return " ".join(tokens), starts
def extract_spans(tokens, tags, offsets):
    """Collect entity spans from BIOES tags, mapped to our label space.

    An "S" tag yields a one-token entity. A "B" tag opens an entity that is
    extended by following "I"/"E" tags of the same entity type and ends after
    the first "E" (or at the first tag that does not continue it). "I"/"E"
    tags with no opening "B" are ignored, as are entity types that are
    unknown or mapped to None in LABEL_MAP.

    Returns a dict like {"Malware: name": [[start, end], ...]} where start/end
    are character offsets into the space-joined sentence.
    """
    found = defaultdict(list)
    total = len(tokens)
    cursor = 0
    while cursor < total:
        head = cursor
        prefix, etype = parse_tag(tags[head])
        # Every branch below consumes at least the current token.
        cursor += 1

        if prefix == "O" or etype is None:
            continue
        our_label = LABEL_MAP.get(etype)
        if our_label is None:
            # Entity type is dropped (or not known at all).
            continue
        if prefix not in ("S", "B"):
            # Orphan I/E tag with no opening B.
            continue

        if prefix == "B":
            # Absorb the continuation tokens of this multi-token entity.
            while cursor < total:
                next_prefix, next_type = parse_tag(tags[cursor])
                if next_type != etype or next_prefix not in ("I", "E"):
                    break
                cursor += 1
                if next_prefix == "E":
                    break

        surface = " ".join(tokens[head:cursor])
        begin = offsets[head]
        found[f"{our_label}: {surface}"].append([begin, begin + len(surface)])
    return dict(found)
def convert_file(path: Path, source_name: str):
    """Turn one APTNER CoNLL file into a list of JSONL-ready records.

    Each sentence becomes a dict with its space-joined "text", its entity
    "spans", and an "info" block carrying a zero-padded sequential id and
    the source name.
    """

    def to_record(number, sentence):
        tokens, tags = sentence
        text, offsets = tokens_to_text_and_offsets(tokens)
        return {
            "text": text,
            "spans": extract_spans(tokens, tags, offsets),
            "info": {
                "id": f"{source_name}_{number:06d}",
                "source": source_name,
            },
        }

    return [
        to_record(number, sentence)
        for number, sentence in enumerate(parse_conll_file(path))
    ]
def build_dedup_set(jsonl_path: Path):
    """Build the set of 80-character text prefixes used for deduplication.

    Args:
        jsonl_path: JSONL file, read as UTF-8, whose records each have a
            "text" field. Blank lines are skipped.

    Returns:
        A set containing ``record["text"][:80]`` for every record.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record has no "text" field.
    """
    # Explicit encoding: do not depend on the locale default when reading
    # JSONL that may contain non-ASCII text.
    with open(jsonl_path, encoding="utf-8") as f:
        return {
            json.loads(line)["text"][:80]
            for line in f
            if line.strip()  # tolerate blank / trailing empty lines
        }
def main(base=None):
    """Convert the APTNER train/dev/test splits and write them as JSONL.

    For each split the CoNLL file under ``<base>/data/raw/APTNER`` is
    converted, sentences whose first 80 characters already occur in the
    existing cleaned train/valid sets are removed, per-class entity counts
    are printed, and the remainder is written to
    ``<base>/data/processed/aptner_5class_<split>.jsonl``.

    Args:
        base: Project root directory. Defaults to ``/home/ubuntu/alkyline``
            when omitted.
    """
    base = Path("/home/ubuntu/alkyline") if base is None else Path(base)
    aptner_dir = base / "data" / "raw" / "APTNER"
    out_dir = base / "data" / "processed"

    # Load existing data for dedup
    existing_train = out_dir / "enriched_5class_train_cleaned.jsonl"
    existing_valid = out_dir / "enriched_5class_valid_cleaned.jsonl"
    print("Building dedup set from existing data...")
    dedup_all = build_dedup_set(existing_train) | build_dedup_set(existing_valid)
    print(f" Existing unique prefixes: {len(dedup_all)}")

    # Convert each split
    stats = {}
    for split, filename, source_name in [
        ("train", "APTNERtrain.txt", "aptner_train"),
        ("dev", "APTNERdev.txt", "aptner_dev"),
        ("test", "APTNERtest.txt", "aptner_test"),
    ]:
        path = aptner_dir / filename
        print(f"\nConverting {filename}...")
        records = convert_file(path, source_name)

        # Dedup against the existing data (by 80-character text prefix)
        new_records = [r for r in records if r["text"][:80] not in dedup_all]
        dup_count = len(records) - len(new_records)

        # Count entities per class; span keys look like "<Label>: <text>"
        entity_counts = defaultdict(int)
        total_entities = 0
        for r in new_records:
            for key, positions in r["spans"].items():
                label = key.split(":", 1)[0]
                entity_counts[label] += len(positions)
                total_entities += len(positions)

        stats[split] = {
            "total": len(records),
            "duplicates": dup_count,
            "new": len(new_records),
            "entities": total_entities,
            "by_class": dict(entity_counts),
        }
        print(f" Total sentences: {len(records)}")
        print(f" Duplicates removed: {dup_count}")
        print(f" New sentences: {len(new_records)}")
        print(f" Entities: {total_entities}")
        print(f" By class: {dict(entity_counts)}")

        # Write output. ensure_ascii=False emits raw non-ASCII characters, so
        # the file must be opened as UTF-8 rather than the locale default.
        out_path = out_dir / f"aptner_5class_{split}.jsonl"
        with open(out_path, "w", encoding="utf-8") as f:
            for r in new_records:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")
        print(f" Written to {out_path}")

    # Summary
    print("\n=== APTNER Conversion Summary ===")
    for split, s in stats.items():
        print(f" {split}: {s['total']} total → {s['new']} new ({s['duplicates']} dupes), {s['entities']} entities")


if __name__ == "__main__":
    main()