#!/usr/bin/env python3
"""Programmatic NER annotation for Exploit-DB entries."""

import json
import re
import sys

INPUT = "/home/ubuntu/alkyline/data/raw/exploitdb/exploitdb_descriptions.jsonl"
OUTPUT = "/home/ubuntu/alkyline/data/processed/llm_annotated_exploitdb.jsonl"

# Common vulnerability type keywords. The literal below is grouped by theme
# for readability; it is sorted longest-first right after the definition so
# that the first match found while iterating is always the most specific one.
VULN_TYPES = [
    "Unauthenticated Remote Code Execution",
    "Authenticated Remote Code Execution",
    "Remote Code Execution (RCE)",
    "Unrestricted File Upload + RCE",
    "Stored Cross-Site Scripting (XSS)",
    "Reflected Cross-Site Scripting (XSS)",
    "Persistent Cross-Site Scripting",
    "Multiple Stored Cross-Site Scripting (XSS)",
    "Stored Cross-Site Scripting via SVG File Upload (Authenticated)",
    "Stored Cross Site Scripting",
    "Stored Cross-Site Scripting",
    "Reflected Cross-Site Scripting",
    "Cross-Site Scripting (XSS)",
    "Cross Site Scripting",
    "Cross-Site Scripting",
    "XML External Entity Injection",
    "Remote Code Execution",
    "Local Privilege Escalation",
    "Privilege Escalation",
    "Remote Buffer Overflow",
    "Buffer Overflow",
    "Stack Buffer Overflow",
    "Heap Buffer Overflow",
    "Stack-based Buffer Overflow",
    "Heap-based Buffer Overflow",
    "Integer Overflow",
    "Authentication Bypass",
    "Authorization Bypass",
    "Directory Traversal",
    "Path Traversal",
    "SQL Injection",
    "SQL injection",
    "Blind SQL Injection",
    "Time Based Blind SQL Injection",
    "Command Injection",
    "OS Command Injection",
    "Code Injection",
    "LDAP Injection",
    "SSTI",
    "Server Side Template Injection",
    "Server-Side Template Injection",
    "Server Side Request Forgery",
    "Server-Side Request Forgery (SSRF)",
    "Server-Side Request Forgery",
    "SSRF",
    "Remote File Inclusion",
    "Local File Inclusion",
    "File Inclusion",
    "Arbitrary File Upload",
    "Arbitrary File Read",
    "Arbitrary File Write",
    "Arbitrary File Download",
    "Arbitrary File Deletion",
    "Arbitrary Code Execution",
    "Remote Command Execution",
    "Insecure Direct Object Reference",
    "Insecure Permissions",
    "Insecure File Permissions",
    "Information Disclosure",
    "Credential Disclosure",
    "Remote Configuration Disclosure",
    "Password Disclosure",
    "Denial of Service (DoS)",
    "Denial of Service (PoC)",
    "Denial of Service",
    "Use-After-Free",
    "Use After Free",
    "Double Free",
    "Type Confusion",
    "Out-of-Bounds Write",
    "Out-of-Bounds Read",
    "Out of Bounds Write",
    "Out of Bounds Read",
    "Null Pointer Dereference",
    "Memory Corruption",
    "Format String",
    "Open Redirect",
    "CSRF",
    "Cross-Site Request Forgery",
    "IDOR",
    "XXE",
    "XSS",
    "SQLi",
    "RCE",
    "LFI",
    "RFI",
    "Remote Root Backdoor",
    "Remote Password Reset",
    "Unrestricted File Upload",
    "File Upload",
    "Persistent XSS",
    "Stored XSS",
    "Reflected XSS",
    "DOM XSS",
]
# Enforce the "longest first" contract. Without this, short entries such as
# "XSS" or "Buffer Overflow" shadow "Stored XSS" / "Stack Buffer Overflow".
# list.sort is stable, so equal-length entries keep their listed order.
VULN_TYPES.sort(key=len, reverse=True)

# Known tools that appear in parentheses
KNOWN_TOOLS = ["Metasploit"]

# Regex patterns
IP_RE = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
DOMAIN_RE = re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|gov|edu|co|uk|de|fr|ru|cn|jp|info|biz)\b')
URL_RE = re.compile(r'https?://[^\s<>"\')+,]+')
FILEPATH_RE = re.compile(r'(?:/[a-zA-Z0-9_.+-]+){2,}|[a-zA-Z]:\\(?:[a-zA-Z0-9_.+-]+\\)*[a-zA-Z0-9_.+-]+')
EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
HASH_RE = re.compile(r'\b[a-fA-F0-9]{32,64}\b')
CVE_RE = re.compile(r'CVE-\d{4}-\d{4,}')
# Quoted parameter names that look like file paths but aren't
PARAM_IN_QUOTES = re.compile(r"'[a-zA-Z0-9_./]+'")

# Private helper patterns used by the title parser and the IP heuristic.
_TRAILING_PAREN_RE = re.compile(r'\s*\(.*?\)\s*$')
_LEADING_PARAM_RE = re.compile(r"'[^']+'\s+")
_METASPLOIT_SUFFIX_RE = re.compile(r'\s*\(Metasploit\)\s*$')
_VERSION_BEFORE_RE = re.compile(r'[vV]\s*$|version\s*$|\d\s*$')
_VERSION_AFTER_RE = re.compile(r'^\.\d')


def find_all(text, substring):
    """Find all (possibly overlapping) occurrences of substring in text.

    Returns a list of [start, end] pairs (end exclusive). An empty
    substring yields no spans rather than one zero-width span per position.
    """
    if not substring:
        return []
    spans = []
    start = 0
    while True:
        idx = text.find(substring, start)
        if idx == -1:
            break
        spans.append([idx, idx + len(substring)])
        # Advance by one, not by len(substring), so overlaps are reported.
        start = idx + 1
    return spans


def parse_title(text):
    """Parse the Exploit-DB title pattern 'Product Version - Vuln Type (extras)'.

    Returns a 2-tuple (system_part, vuln_part). When the title contains no
    ' - ' delimiter, returns (text.strip(), None).
    """
    parts = text.split(' - ')
    if len(parts) < 2:
        return text.strip(), None

    # Walk every candidate delimiter; the LAST one whose right-hand side
    # begins with (or equals) a known vuln type wins.
    best_split = None
    for i in range(1, len(parts)):
        after = ' - '.join(parts[i:])
        after_clean = _TRAILING_PAREN_RE.sub('', after).strip()
        if any(after_clean == vt or after.startswith(vt) for vt in VULN_TYPES):
            best_split = i
            continue
        # Quoted-param patterns like "'param' SQL Injection" are only
        # considered while no split point has been found yet.
        if best_split is None:
            param = _LEADING_PARAM_RE.match(after)
            if param:
                remainder = after[param.end():]
                head = remainder.strip()
                bare = _TRAILING_PAREN_RE.sub('', remainder).strip()
                if any(head.startswith(vt) or bare == vt for vt in VULN_TYPES):
                    best_split = i

    if best_split is None:
        # Default: first ' - ' is the split
        best_split = 1

    system_part = ' - '.join(parts[:best_split]).strip()
    vuln_part = ' - '.join(parts[best_split:]).strip()
    return system_part, vuln_part


def extract_vuln_from_part(text, vuln_part):
    """Extract the vulnerability span from the vuln part of the title.

    Returns a list with at most one ("VULNERABILITY", entity, spans) tuple,
    where spans are the [start, end] offsets of entity within text.
    """
    if not vuln_part:
        return []

    results = []
    # A trailing "(Metasploit)" is a tool marker, handled separately.
    clean = _METASPLOIT_SUFFIX_RE.sub('', vuln_part).strip()

    # Ignore a leading quoted parameter like 'username' when matching.
    param_match = _LEADING_PARAM_RE.match(clean)
    vuln_search = clean[param_match.end():] if param_match else clean

    # VULN_TYPES is longest-first, so the first hit is the most specific.
    for vt in VULN_TYPES:
        if vt in vuln_search:
            spans = find_all(text, vt)
            if spans:
                results.append(("VULNERABILITY", vt, spans))
                break
    else:
        # No known type matched: fall back to the whole vuln part, minus a
        # trailing parenthetical and any leading quoted parameter, provided
        # it is short enough to plausibly be a vulnerability name.
        stripped = _TRAILING_PAREN_RE.sub('', clean).strip()
        if param_match:
            stripped = re.sub(r"^'[^']+'\s+", "", stripped).strip()
        if stripped and len(stripped) < 80:
            spans = find_all(text, stripped)
            if spans:
                results.append(("VULNERABILITY", stripped, spans))

    return results


def _is_covered(spans_dict, labels, start, end):
    """Return True if [start, end] lies inside a recorded span of any given label."""
    prefixes = tuple(f"{label}:" for label in labels)
    return any(
        s <= start and end <= e
        for key, positions in spans_dict.items()
        if key.startswith(prefixes)
        for s, e in positions
    )


def annotate_entry(entry):
    """Annotate one Exploit-DB entry with entity spans.

    Reads entry["text"], the optional entry["cves"] list (a missing or null
    value is treated as empty) and entry["exploit_id"].

    Returns a dict with "text", "spans" (mapping "LABEL: entity" to a list
    of [start, end] offsets into text) and "info".
    """
    text = entry["text"]
    cves = entry.get("cves") or []
    spans_dict = {}  # "LABEL: entity" -> [[start, end], ...]

    def add_span(label, entity, positions):
        bucket = spans_dict.setdefault(f"{label}: {entity}", [])
        for pos in positions:
            if pos not in bucket:
                bucket.append(pos)

    # 1. Parse title structure
    system_part, vuln_part = parse_title(text)

    # 2. SYSTEM entity - the product/system name
    if system_part:
        sys_spans = find_all(text, system_part)
        if sys_spans:
            add_span("SYSTEM", system_part, sys_spans)

    # 3. VULNERABILITY entity
    for label, entity, positions in extract_vuln_from_part(text, vuln_part):
        add_span(label, entity, positions)

    # 4. CVE_ID from the cves field - only those that literally appear in
    # the text get spans; the rest are metadata without character offsets.
    for cve in cves:
        if not cve:
            continue
        cve_spans = find_all(text, cve)
        if cve_spans:
            add_span("CVE_ID", cve, cve_spans)
    # Also find CVEs in the text that might not be in the cves field
    for m in CVE_RE.finditer(text):
        add_span("CVE_ID", m.group(), [[m.start(), m.end()]])

    # 5. TOOL - check for (Metasploit) etc
    for tool in KNOWN_TOOLS:
        tool_spans = find_all(text, tool)
        if tool_spans:
            add_span("TOOL", tool, tool_spans)

    # 6. IP_ADDRESS - but skip version numbers embedded in product names
    for m in IP_RE.finditer(text):
        val = m.group()
        if not all(0 <= int(octet) <= 255 for octet in val.split('.')):
            continue
        start_pos = m.start()
        # Inside (or right after) the SYSTEM part of the title it is almost
        # certainly a version number.
        if system_part and start_pos < len(system_part) + 3:
            continue
        # Surrounding context with version indicators
        before = text[max(0, start_pos - 10):start_pos]
        after = text[m.end():m.end() + 5]
        if _VERSION_BEFORE_RE.search(before) or _VERSION_AFTER_RE.search(after):
            continue
        # Directly preceded by a letter/digit: likely a version
        if start_pos > 0 and text[start_pos - 1].isalnum():
            continue
        add_span("IP_ADDRESS", val, [[m.start(), m.end()]])

    # 7. URL
    for m in URL_RE.finditer(text):
        add_span("URL", m.group(), [[m.start(), m.end()]])

    # 8. EMAIL
    for m in EMAIL_RE.finditer(text):
        add_span("EMAIL", m.group(), [[m.start(), m.end()]])

    # 9. DOMAIN (only if not already captured as part of URL/EMAIL)
    for m in DOMAIN_RE.finditer(text):
        if not _is_covered(spans_dict, ("URL", "EMAIL"), m.start(), m.end()):
            add_span("DOMAIN", m.group(), [[m.start(), m.end()]])

    # 10. FILEPATH - look for paths in the text
    for m in FILEPATH_RE.finditer(text):
        val = m.group()
        if len(val) <= 3:
            continue
        if _is_covered(spans_dict, ("URL",), m.start(), m.end()):
            continue
        # Skip matches starting inside the SYSTEM span (product names with
        # slashes like KZTech/JatonTec).
        if system_part and m.start() < len(system_part):
            continue
        add_span("FILEPATH", val, [[m.start(), m.end()]])

    # 11. HASH
    for m in HASH_RE.finditer(text):
        # Skip anything that is really the tail of a CVE identifier.
        if not CVE_RE.match(text[max(0, m.start() - 4):m.end()]):
            add_span("HASH", m.group(), [[m.start(), m.end()]])

    return {
        "text": text,
        "spans": spans_dict,
        "info": {
            "source": "exploitdb",
            "exploit_id": entry["exploit_id"],
        },
    }


def verify_offsets(result):
    """Verify all span offsets are correct.

    Returns a list of human-readable error strings; empty when every span
    is in bounds and text[start:end] equals the entity in its key.
    """
    text = result["text"]
    errors = []
    for key, positions in result["spans"].items():
        _label, entity = key.split(": ", 1)
        for start, end in positions:
            if start < 0 or end > len(text):
                errors.append(f"Out of bounds: {key} [{start},{end}] in text len {len(text)}")
            elif text[start:end] != entity:
                errors.append(f"Mismatch: {key} [{start},{end}] = '{text[start:end]}' != '{entity}'")
    return errors


def main():
    """Annotate every entry in INPUT, write JSONL to OUTPUT, print a report."""
    # Explicit UTF-8: the output is written with ensure_ascii=False, so the
    # locale default encoding must not be relied on. Blank lines are skipped.
    with open(INPUT, encoding="utf-8") as f:
        entries = [json.loads(line) for line in f if line.strip()]

    print(f"Processing {len(entries)} entries...")

    all_errors = []
    results = []
    for entry in entries:
        result = annotate_entry(entry)
        errors = verify_offsets(result)
        if errors:
            all_errors.extend([(entry["exploit_id"], e) for e in errors])
        results.append(result)

    # Write output
    with open(OUTPUT, "w", encoding="utf-8") as f:
        for r in results:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")

    print(f"Wrote {len(results)} annotated entries to {OUTPUT}")

    if all_errors:
        print(f"\n{len(all_errors)} offset errors found:")
        for eid, err in all_errors[:20]:
            print(f"  [{eid}] {err}")
        if len(all_errors) > 20:
            print(f"  ... and {len(all_errors)-20} more")
    else:
        print("All offsets verified correct!")

    # Stats: number of distinct "LABEL: entity" keys per label
    label_counts = {}
    for r in results:
        for key in r["spans"]:
            label = key.split(": ", 1)[0]
            label_counts[label] = label_counts.get(label, 0) + 1

    print("\nEntity type distribution:")
    for label, count in sorted(label_counts.items(), key=lambda x: -x[1]):
        print(f"  {label}: {count}")

    entries_with_spans = sum(1 for r in results if r["spans"])
    print(f"\nEntries with at least one span: {entries_with_spans}/{len(results)}")


if __name__ == "__main__":
    main()