| |
| """Fix label consistency issues in Arcspan cybersecurity NER datasets.""" |
|
|
| import json |
| import re |
| import sys |
| from collections import defaultdict |
| from pathlib import Path |
|
|
| |
# Lookup table: lower-cased entity surface form -> label it must carry.
ENTITY_TO_LABEL = {}

# Names that are labelled "Organization" (threat groups).
apt_groups = [
    "apt28", "apt29", "apt30", "apt32", "apt33", "apt34", "apt37", "apt38",
    "apt41",
    "fin7", "fin8", "turla", "lazarus", "lazarus group", "kimsuky",
    "oceanlotus", "ocean lotus", "winnti", "fancy bear", "cozy bear",
    "equation group", "sandworm", "darkhotel", "pawn storm", "sofacy",
    "carbanak group", "cobalt group", "ta505", "ta551", "muddywater",
    "charming kitten",
]

# Names that are labelled "Organization" (vendors and companies).
companies = [
    "facebook", "github", "vmware", "cisco", "apple", "google", "microsoft",
    "amazon", "oracle", "ibm", "samsung", "huawei", "intel", "adobe",
    "citrix",
    "fortinet", "palo alto", "palo alto networks", "fireeye", "mandiant",
    "crowdstrike", "kaspersky", "symantec", "mcafee", "trend micro",
    "sophos", "eset",
]

# Names that are labelled "System" (software products and platforms).
products = [
    "powershell", "windows", "linux", "macos", "ios", "android",
    "chrome", "firefox", "safari", "office", "outlook", "exchange",
    "iis", "apache", "nginx", "docker", "kubernetes",
]

# Fill the table in the same order as the lists above; products are added
# last, so a name present in both groups would end up labelled "System".
ENTITY_TO_LABEL.update(dict.fromkeys(apt_groups + companies, "Organization"))
ENTITY_TO_LABEL.update(dict.fromkeys(products, "System"))

# Matches a complete CVE identifier such as "CVE-2021-44228", in any case.
CVE_RE = re.compile(r"^CVE-\d{4}-\d+$", flags=re.IGNORECASE)
|
|
def get_correct_label(surface_text):
    """Return the label a surface form must carry, or None if unconstrained.

    The text is stripped and lower-cased before lookup. A hit in
    ENTITY_TO_LABEL wins; otherwise text that is exactly a CVE identifier
    (per CVE_RE) is labelled "Vulnerability". Anything else yields None.
    """
    normalized = surface_text.strip().lower()
    try:
        return ENTITY_TO_LABEL[normalized]
    except KeyError:
        pass
    return "Vulnerability" if CVE_RE.match(normalized) else None
|
|
def fix_file(filepath):
    """Relabel known entities in one JSONL dataset file, in place.

    Every non-blank line is parsed as a JSON object. Keys of its optional
    "spans" mapping have the form "<label>:<entity text>"; when
    get_correct_label() returns a label for the entity text that differs
    from the key's current label, the key is rewritten as
    "<correct label>: <entity text>". If the rewritten key already exists
    in the record, the offsets are merged under it.

    The file is read and written as UTF-8 and is only rewritten when at
    least one record actually changed. Blank lines are skipped, and span
    keys without a colon are kept unchanged.

    Args:
        filepath: Path (str or Path) of the JSONL file to fix.

    Returns:
        A tuple (total_relabeled, stats): total_relabeled is the number of
        offset entries moved to a new label, and stats maps each
        "<old> → <new>" transition to its share of that number.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        OSError: If the file cannot be read or written.
    """
    path = Path(filepath)
    stats = defaultdict(int)
    total_relabeled = 0
    fixed_lines = []
    any_changed = False

    # Split on "\n" only: JSON strings may legally contain other characters
    # (e.g. U+2028) that str.splitlines() would treat as line breaks.
    for line in path.read_text(encoding="utf-8").split("\n"):
        if not line.strip():
            # Empty file, trailing newline or stray blank line: not a record.
            continue

        rec = json.loads(line)
        spans = rec.get("spans") or {}  # tolerate a missing or null "spans"
        new_spans = {}
        changed = False

        for span_key, offsets in spans.items():
            old_label, sep, entity_text = span_key.partition(":")
            entity_text = entity_text.strip()

            # A key without a colon has no label part to correct.
            correct_label = get_correct_label(entity_text) if sep else None

            if correct_label and correct_label != old_label:
                new_key = f"{correct_label}: {entity_text}"
                stats[f"{old_label} → {correct_label}"] += len(offsets)
                total_relabeled += len(offsets)
                changed = True
            else:
                new_key = span_key

            # A relabelled key may collide with an existing one; merge them.
            new_spans.setdefault(new_key, []).extend(offsets)

        if changed:
            rec["spans"] = new_spans
            any_changed = True
        fixed_lines.append(json.dumps(rec, ensure_ascii=False))

    # Leave the file untouched when there is nothing to fix.
    if any_changed:
        path.write_text("\n".join(fixed_lines) + "\n", encoding="utf-8")
    return total_relabeled, dict(stats)
|
|
# Dataset files fixed by default when this module is run as a script.
# All of them live in the same processed-data directory.
_PROCESSED_DIR = "/home/ubuntu/alkyline/data/processed"
FILES = [
    f"{_PROCESSED_DIR}/{filename}"
    for filename in (
        "enriched_5class_train_cleaned.jsonl",
        "enriched_5class_valid_cleaned.jsonl",
        "aptner_5class_train.jsonl",
        "defanged_augmented.jsonl",
    )
]
|
|
if __name__ == "__main__":
    # Script entry point: fix each target file in place and print a report.
    # Paths given on the command line take precedence; with no arguments the
    # built-in FILES list is processed.
    targets = sys.argv[1:] or FILES
    for f in targets:
        p = Path(f)
        if not p.exists():
            print(f"SKIP (not found): {f}")
            continue
        total, breakdown = fix_file(f)
        print(f"\n{'='*60}")
        print(f"FILE: {p.name}")
        print(f"Total span relabelings: {total}")
        # Most frequent label transitions first.
        for transition, count in sorted(breakdown.items(), key=lambda x: -x[1]):
            print(f" {transition}: {count}")
|
|