"""Annotate MITRE ATT&CK descriptions with cybersecurity entity spans."""
|
|
import json
import re
import sys
|
|
# Raw MITRE ATT&CK descriptions; main() parses one JSON object per line.
INPUT = "/home/ubuntu/alkyline/data/raw/mitre_attack/mitre_descriptions.jsonl"
# Annotated records; main() writes one JSON object per line.
OUTPUT = "/home/ubuntu/alkyline/data/processed/llm_annotated_mitre_v2.jsonl"
|
|
| |
| def clean_markdown(text): |
| """Remove markdown link syntax: [Name](url) -> Name. Returns (clean_text, mapping).""" |
| |
| result = [] |
| old_to_new = [] |
| i = 0 |
| new_i = 0 |
| while i < len(text): |
| |
| if text[i] == '[': |
| m = re.match(r'\[([^\]]*)\]\(([^)]*)\)', text[i:]) |
| if m: |
| link_text = m.group(1) |
| old_start = i |
| old_end = i + len(m.group(0)) |
| new_start = new_i |
| new_end = new_i + len(link_text) |
| old_to_new.append((old_start, old_end, new_start, new_end)) |
| result.append(link_text) |
| new_i += len(link_text) |
| i = old_end |
| continue |
| result.append(text[i]) |
| new_i += 1 |
| i += 1 |
| return ''.join(result), old_to_new |
|
|
|
|
| |
| |
|
|
# Names tagged as TOOL.  annotate_record() looks each one up in the cleaned
# text with a case-sensitive substring search bounded by non-alphanumeric
# characters, which is why several names appear in more than one casing.
# NOTE(review): entries such as "at", "top", "kill", "route" or "mount" are
# also ordinary English words and will be tagged wherever they occur --
# confirm this is intended.
KNOWN_TOOLS = {
    "Mimikatz", "PsExec", "PowerShell", "cmd", "cmd.exe", "Cobalt Strike", "Metasploit",
    "Net", "netsh", "Netcat", "Nmap", "Wireshark", "BloodHound", "Empire", "Impacket",
    "LaZagne", "CrackMapExec", "Responder", "John the Ripper", "Hashcat", "sqlmap",
    "Burp Suite", "certutil", "bitsadmin", "cURL", "curl", "wget", "ssh", "scp",
    "FTP", "Telnet", "tasklist", "ipconfig", "systeminfo", "whoami", "nltest",
    "dsquery", "csvde", "ldifde", "ntdsutil", "vssadmin", "wmic", "WMI",
    "PowerSploit", "Rubeus", "SharpHound", "ADFind", "PuTTY", "plink",
    "7-Zip", "WinRAR", "RAR", "tar", "Reg", "at", "schtasks", "crontab",
    "Windows Credential Editor", "gsecdump", "pwdump", "fgdump", "Windows Sysinternals",
    "ProcDump", "Process Explorer", "Autoruns", "Sysmon", "tcpdump", "tshark",
    "Net Crawler", "Tor", "HTRAN", "HTran", "NBTscan", "SDelete", "Timestomp",
    "UPX", "Themida", "VMProtect", "nscd", "ifconfig", "arp", "route",
    "traceroute", "ping", "nslookup", "dig", "netstat", "ss", "lsof",
    "ps", "top", "kill", "chmod", "chown", "chattr", "mount", "umount",
    "iptables", "ufw", "csc", "msbuild", "MSBuild", "InstallUtil", "Regsvr32",
    "Rundll32", "Mshta", "CMSTP", "Regasm", "Regsvcs", "RegAsm",
    "Compiled HTML File", "Control Panel Items",
    "Koadic", "Pupy", "QuasarRAT", "Quasar RAT", "RemCom",
    "PAExec", "Windows Remote Management", "WinRM",
    "Remote Desktop Protocol", "RDP", "VNC", "TeamViewer", "AnyDesk",
    "ngrok", "Plink", "socat", "Chisel",
    "SharpView", "ADRecon", "Ping", "Tasklist",
    "Nltest", "Dsquery",
}
|
|
# Names tagged as SYSTEM: operating systems, products, services, protocols,
# languages and OS components.  Matching in annotate_record() is a
# case-sensitive substring search bounded by non-alphanumeric characters.
# NOTE(review): generic words such as "Access", "Word", "Edge", "Box" or
# "Signal" will also match their everyday (capitalised) use -- confirm this
# is intended.
KNOWN_SYSTEMS = {
    "Windows", "Linux", "macOS", "Android", "iOS", "Unix", "FreeBSD",
    "Solaris", "AIX", "HP-UX", "IRIX", "Chrome OS",
    "Windows XP", "Windows 7", "Windows 8", "Windows 10", "Windows 11",
    "Windows Vista", "Windows 2000", "Windows NT",
    "Windows Server", "Windows Server 2003", "Windows Server 2008",
    "Windows Server 2012", "Windows Server 2016", "Windows Server 2019",
    "Windows Server 2022",
    "Ubuntu", "Debian", "CentOS", "Red Hat", "Red Hat Enterprise Linux", "RHEL",
    "Fedora", "Arch Linux", "Kali Linux", "Gentoo", "SUSE",
    "Microsoft Office", "Microsoft Word", "Microsoft Excel", "Microsoft Outlook",
    "Microsoft PowerPoint", "Microsoft Access",
    "Office", "Word", "Excel", "Outlook", "PowerPoint", "Access",
    "Active Directory", "Azure AD", "Azure Active Directory",
    "Azure", "AWS", "Amazon Web Services", "Google Cloud", "GCP",
    "Exchange", "Exchange Server", "SharePoint", "IIS",
    "Internet Information Services",
    "Apache", "Nginx", "nginx", "Docker", "Kubernetes", "VMware",
    "Hyper-V", "VirtualBox", "QEMU", "KVM",
    "Chrome", "Firefox", "Safari", "Edge", "Internet Explorer",
    "Google Chrome", "Mozilla Firefox",
    "SQL Server", "MySQL", "PostgreSQL", "Oracle", "MongoDB",
    "Samba", "OpenSSH", "OpenSSL", "OpenVPN",
    "Cisco", "Juniper", "MikroTik", "Fortinet", "FortiOS", "FortiGate",
    "Palo Alto", "SonicWall", "Check Point",
    "SNMP", "LDAP", "Kerberos", "NTLM", "SMB", "NFS", "DNS", "DHCP",
    "HTTP", "HTTPS", "SSH", "TLS", "SSL",
    "Group Policy", "GPO",
    "WatchGuard", "Asus", "SOHO",
    "macOS Gatekeeper", "Gatekeeper",
    "Systemd", "systemd", "journald",
    "SELinux", "AppArmor",
    "Raspberry Pi", "Arduino",
    "Telegram", "Signal", "WhatsApp", "Slack", "Discord", "Skype",
    "Java", "JavaScript", "Python", "Perl", "Ruby", "PHP", "VBScript",
    "JScript", "Visual Basic", "VBA", "VB.NET", "C#", ".NET",
    "COM", "DCOM", "OLE", "DDE",
    "PowerShell", "Bash", "cmd.exe", "Command Prompt",
    "Registry", "Windows Registry",
    "API", "Win32 API", "Native API",
    "SAM", "LSASS", "lsass.exe",
    "BIOS", "UEFI", "MBR", "GPT", "EFI",
    "BitLocker", "FileVault", "LUKS",
    "Windows Management Instrumentation",
    "Component Object Model",
    "Windows Defender", "Microsoft Defender",
    "Security Accounts Manager",
    "Local Security Authority Subsystem Service",
    "Task Scheduler",
    "Event Log", "Windows Event Log",
    "CloudTrail", "S3", "EC2", "Lambda",
    "Gmail", "Google Workspace", "Microsoft 365", "Office 365",
    "Dropbox", "OneDrive", "Google Drive", "Box",
    "GitHub", "GitLab", "Bitbucket",
    "Jira", "Confluence",
    "Splunk", "Elastic", "Elasticsearch",
    "Snort", "Suricata", "YARA",
}
|
|
# Names tagged as ORGANIZATION: vendors, agencies, security firms and similar
# bodies.  Matching in annotate_record() is a case-sensitive substring search
# bounded by non-alphanumeric characters.
KNOWN_ORGS = {
    "Microsoft", "Google", "Apple", "Amazon", "Facebook", "Meta",
    "CISA", "NSA", "FBI", "CIA", "DHS", "NIST",
    "FireEye", "Mandiant", "CrowdStrike", "Palo Alto Networks",
    "Symantec", "Kaspersky", "ESET", "Trend Micro", "McAfee",
    "Secureworks", "Recorded Future", "Proofpoint", "Cisco Talos",
    "Dragos", "Volexity", "SentinelOne", "Carbon Black",
    "Fortinet", "Sophos", "Avast", "Bitdefender", "Malwarebytes",
    "US-CERT", "CERT", "MITRE", "ATT&CK",
    "NATO", "United Nations", "UN",
    "GRU", "FSB", "PLA", "MSS",
    "Unit 42", "Unit42", "Talos", "X-Force",
    "Accenture", "Deloitte", "PwC", "KPMG", "EY",
    "SolarWinds", "Kaseya",
    "Samsung", "Sony", "Intel", "AMD", "NVIDIA", "Qualcomm",
    "Cloudflare", "Akamai", "Fastly",
    "Adobe", "SAP", "Oracle", "IBM", "Dell", "HP", "Lenovo",
    "GTsST",
}
|
|
| |
# --- Indicator patterns, applied to the markdown-stripped text -------------

# CVE identifier: "CVE-", a four-digit year, then four or more digits.
CVE_RE = re.compile(r'CVE-\d{4}-\d{4,}')
# Dotted quad; the 0-255 range of each part is checked by the caller.
IP_RE = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
# Hex digests, told apart purely by length (32 / 40 / 64 hex characters).
HASH_MD5_RE = re.compile(r'\b[a-fA-F0-9]{32}\b')
HASH_SHA1_RE = re.compile(r'\b[a-fA-F0-9]{40}\b')
HASH_SHA256_RE = re.compile(r'\b[a-fA-F0-9]{64}\b')
EMAIL_RE = re.compile(r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b')
# Host names ending in one of a fixed list of top-level domains.
DOMAIN_RE = re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|gov|mil|edu|co|ru|cn|uk|de|fr|jp|kr|br|info|biz|xyz|top|cc|tk|pw|me|tv|ly|onion)\b')
# http(s) URL, ending at whitespace, ")", "<", ">" or a double quote.
URL_RE = re.compile(r'https?://[^\s)<>\"]+')

# Windows path starting with an upper-case drive letter, e.g. C:\Windows\...
FILEPATH_WIN_RE = re.compile(r'[A-Z]:\\(?:[^\s,;\"\'<>|*?]+)')
# Absolute Unix path under one of the listed top-level directories.
FILEPATH_UNIX_RE = re.compile(r'(?<!\w)/(?:etc|usr|var|tmp|opt|home|root|bin|sbin|proc|sys|dev|mnt|boot|lib|lib64|run|srv)/[^\s,;\"\'<>|*?]*')
# %VARIABLE% references and \\server\share style paths.
FILEPATH_SPECIAL_RE = re.compile(r'(?:%[A-Za-z_]+%|\\\\[^\s,;\"\'<>|*?]+)')
|
|
|
|
# Labels whose values are accepted wherever they occur, without requiring a
# non-alphanumeric character on either side of the match.
_VERBATIM_LABELS = frozenset(
    ("FILEPATH", "URL", "IP_ADDRESS", "HASH", "EMAIL", "DOMAIN", "CVE_ID")
)


def find_all_occurrences(text, entity, label):
    """Find all non-overlapping occurrences of ``entity`` in ``text``.

    The search is case-sensitive.  Unless ``label`` is one of the verbatim
    labels (FILEPATH, URL, IP_ADDRESS, HASH, EMAIL, DOMAIN, CVE_ID), an
    occurrence is rejected when the character immediately before or after it
    is alphanumeric, so that e.g. "Net" is not found inside "Netcat".

    Args:
        text: String to search.
        entity: Exact substring to look for.
        label: Entity label; only decides whether the boundary check applies.

    Returns:
        A list of ``[start, end]`` offsets (end exclusive) in ascending
        order.  Empty when ``entity`` is empty or does not occur.
    """
    # An empty needle matches at every offset without ever advancing the
    # search position, which would loop forever.
    if not entity:
        return []

    check_boundaries = label not in _VERBATIM_LABELS
    size = len(entity)
    text_len = len(text)
    spans = []
    pos = text.find(entity)
    while pos != -1:
        end = pos + size
        glued = check_boundaries and (
            (pos > 0 and text[pos - 1].isalnum())
            or (end < text_len and text[end].isalnum())
        )
        if glued:
            # Rejected: retry one character further on.
            pos = text.find(entity, pos + 1)
        else:
            spans.append([pos, end])
            # Accepted: resume after the match so results never overlap.
            pos = text.find(entity, end)
    return spans
|
|
|
|
# Markdown links into the ATT&CK site: group(1) is the link text, group(2)
# the URL path after the host name.
_MITRE_LINK_RE = re.compile(
    r'\[([^\]]+)\]\(https?://attack\.mitre\.org/([^)]+)\)'
)

# Precedence used when spans overlap (higher wins).  Labels missing from the
# table count as 0.
_LABEL_PRIORITY = {
    "CVE_ID": 10, "IP_ADDRESS": 9, "HASH": 9, "EMAIL": 9, "URL": 9,
    "DOMAIN": 8, "FILEPATH": 8,
    "MALWARE": 7, "VULNERABILITY": 6, "TOOL": 5,
    "THREAT_ACTOR": 4, "SYSTEM": 3, "ORGANIZATION": 2,
}


def _linked_entity_labels(raw_text):
    """Return ``(name, label)`` pairs for ATT&CK software/group/campaign links."""
    pairs = []
    for link in _MITRE_LINK_RE.finditer(raw_text):
        name, url_path = link.group(1), link.group(2)
        if 'software/' in url_path:
            # Software links are split by lookup in the known-name sets;
            # MALWARE is the fallback for names in neither set.
            if name in KNOWN_TOOLS:
                label = "TOOL"
            elif name in KNOWN_SYSTEMS:
                label = "SYSTEM"
            else:
                label = "MALWARE"
        elif 'groups/' in url_path or 'campaigns/' in url_path:
            # NOTE(review): campaigns are labelled THREAT_ACTOR as well.
            label = "THREAT_ACTOR"
        else:
            # Technique links and any other ATT&CK page are not entities.
            continue
        pairs.append((name, label))
    return pairs


def _drop_nested_same_label(spans):
    """Remove, in place, spans that lie inside a span of a longer same-label entity."""
    parsed = [(key,) + tuple(key.split(": ", 1)) for key in spans]
    doomed = {}
    for key1, label1, entity1 in parsed:
        for key2, label2, entity2 in parsed:
            if key1 == key2 or label1 != label2 or entity1 not in entity2:
                continue
            for inner in spans[key1]:
                if any(outer[0] <= inner[0] and inner[1] <= outer[1]
                       for outer in spans[key2]):
                    doomed.setdefault(key1, set()).add(tuple(inner))
    for key, removals in doomed.items():
        remaining = [s for s in spans[key] if tuple(s) not in removals]
        if remaining:
            spans[key] = remaining
        else:
            del spans[key]


def _resolve_overlaps(spans):
    """Return a new span dict in which no two spans overlap."""
    flat = []
    for key, offsets in spans.items():
        priority = _LABEL_PRIORITY.get(key.split(": ", 1)[0], 0)
        flat.extend((s[0], s[1], key, priority) for s in offsets)

    # Earliest start first; for equal starts the longer span, then the
    # higher-priority label.
    flat.sort(key=lambda item: (item[0], -(item[1] - item[0]), -item[3]))

    kept = []
    for cand in flat:
        clashing = [k for k in kept if cand[0] < k[1] and cand[1] > k[0]]
        cand_len = cand[1] - cand[0]
        # The candidate loses to any kept span that has at least its
        # priority or is strictly longer.
        if any(k[3] >= cand[3] or (k[1] - k[0]) > cand_len for k in clashing):
            continue
        if clashing:
            # The candidate beats every span it touches: evict them so the
            # result stays free of overlaps.
            kept = [k for k in kept if k not in clashing]
        kept.append(cand)

    resolved = {}
    for start, end, key, _priority in kept:
        resolved.setdefault(key, []).append([start, end])
    return resolved


def annotate_record(record):
    """Annotate a single MITRE record with entity spans.

    Markdown links are stripped from the description, then entities are
    collected from three sources: regex indicators (CVE ids, IP addresses,
    URLs, e-mail addresses, domains, hashes, file paths), names that were
    linked to ATT&CK software/group/campaign pages, and the KNOWN_TOOLS /
    KNOWN_SYSTEMS / KNOWN_ORGS name sets.  Nested and overlapping spans are
    then resolved.

    Args:
        record: Mapping with a ``"text"`` entry (the markdown description)
            and a ``"mitre_id"`` entry (copied into the output).

    Returns:
        A dict with ``"text"`` (the markdown-stripped description),
        ``"spans"`` (mapping ``"LABEL: entity"`` to a list of
        ``[start, end]`` offsets into that text, end exclusive) and
        ``"info"`` (source tag and the record's ``mitre_id``).

    Raises:
        KeyError: If ``record`` lacks ``"text"`` or ``"mitre_id"``.
    """
    raw_text = record["text"]

    # Link targets are only visible in the raw markdown, so classify linked
    # names before the link syntax is removed.
    linked = _linked_entity_labels(raw_text)
    clean_text, _ = clean_markdown(raw_text)

    spans = {}

    def add_spans(label, entity, offsets):
        """Record ``offsets`` under ``label``/``entity``, skipping duplicates."""
        if not offsets:
            return
        bucket = spans.setdefault(f"{label}: {entity}", [])
        seen = {tuple(s) for s in bucket}
        for s in offsets:
            if tuple(s) not in seen:
                bucket.append(s)
                seen.add(tuple(s))

    def add_matches(label, pattern):
        """Record every match of ``pattern`` in the clean text as ``label``."""
        for m in pattern.finditer(clean_text):
            add_spans(label, m.group(), [[m.start(), m.end()]])

    # --- regex indicators ---
    add_matches("CVE_ID", CVE_RE)
    for m in IP_RE.finditer(clean_text):
        # The pattern accepts any 1-3 digit group; keep real octets only.
        if all(0 <= int(part) <= 255 for part in m.group().split('.')):
            add_spans("IP_ADDRESS", m.group(), [[m.start(), m.end()]])
    add_matches("URL", URL_RE)
    add_matches("EMAIL", EMAIL_RE)
    add_matches("DOMAIN", DOMAIN_RE)
    # Each hash pattern matches an exact-length hex run delimited by word
    # boundaries, so matches of different lengths can never overlap.
    for hash_re in (HASH_SHA256_RE, HASH_SHA1_RE, HASH_MD5_RE):
        add_matches("HASH", hash_re)
    for path_re in (FILEPATH_WIN_RE, FILEPATH_UNIX_RE, FILEPATH_SPECIAL_RE):
        add_matches("FILEPATH", path_re)

    # --- names linked to ATT&CK pages ---
    for name, label in linked:
        add_spans(label, name, find_all_occurrences(clean_text, name, label))

    # --- known-name lookups ---
    for label, names in (
        ("TOOL", KNOWN_TOOLS),
        ("SYSTEM", KNOWN_SYSTEMS),
        ("ORGANIZATION", KNOWN_ORGS),
    ):
        for name in names:
            add_spans(label, name, find_all_occurrences(clean_text, name, label))

    # --- clean-up ---
    _drop_nested_same_label(spans)
    spans = _resolve_overlaps(spans)

    return {
        "text": clean_text,
        "spans": spans,
        "info": {"source": "mitre_attack_v2", "mitre_id": record["mitre_id"]}
    }
|
|
|
|
def main():
    """Annotate every record in INPUT, write OUTPUT, then verify and summarise.

    Steps:
      1. Load INPUT (one JSON object per line; blank lines are skipped).
      2. Annotate each record and write it to OUTPUT as one JSON line,
         printing progress every 100 records.
      3. Re-read OUTPUT, check that each span's text equals the entity named
         in its key (printing the first 20 mismatches), and print span
         counts per label in descending order.

    Both files are opened as UTF-8 because the output is written with
    ``ensure_ascii=False`` and may therefore contain non-ASCII characters.
    """
    with open(INPUT, encoding="utf-8") as f:
        records = [json.loads(line) for line in f if line.strip()]

    print(f"Processing {len(records)} records...")

    with open(OUTPUT, 'w', encoding="utf-8") as out:
        for done, rec in enumerate(records, start=1):
            result = annotate_record(rec)
            out.write(json.dumps(result, ensure_ascii=False) + '\n')
            if done % 100 == 0:
                out.flush()
                print(f" {done}/{len(records)} done")

    print(f"Wrote {len(records)} records to {OUTPUT}")

    # Verification and statistics share a single pass over the output file.
    print("\nRunning verification...")
    errors = 0
    label_counts = {}
    total_spans = 0
    with open(OUTPUT, encoding="utf-8") as f:
        for i, line in enumerate(f):
            rec = json.loads(line)
            for key, offsets in rec["spans"].items():
                label, entity = key.split(": ", 1)
                label_counts[label] = label_counts.get(label, 0) + len(offsets)
                total_spans += len(offsets)
                for start, end in offsets:
                    actual = rec["text"][start:end]
                    if actual != entity:
                        errors += 1
                        # Cap the detail output; the total is still reported.
                        if errors <= 20:
                            print(f" Line {i}: expected '{entity}' got '{actual}' at [{start}:{end}]")
    print(f"Total errors: {errors}")

    print(f"\nTotal spans: {total_spans}")
    for label, count in sorted(label_counts.items(), key=lambda x: -x[1]):
        print(f" {label}: {count}")
|
|
|
|
# Run the annotation pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|