| |
| """Programmatic NER annotation for Exploit-DB entries.""" |
|
|
| import json |
| import re |
| import sys |
|
|
# JSONL file that main() reads: one JSON record per line.
INPUT = "/home/ubuntu/alkyline/data/raw/exploitdb/exploitdb_descriptions.jsonl"
# JSONL file that main() writes: one annotated record per line.
OUTPUT = "/home/ubuntu/alkyline/data/processed/llm_annotated_exploitdb.jsonl"
|
|
| |
# Known vulnerability-type phrases, matched case-sensitively against titles.
#
# ORDER MATTERS: extract_vuln_from_part() walks this list and takes the first
# entry that occurs as a substring of the title's vulnerability part.  A more
# specific phrase must therefore be listed BEFORE any shorter phrase it
# contains (e.g. "Blind SQL Injection" before "SQL Injection"), otherwise the
# specific phrase can never be selected.  Keep that invariant when adding
# entries.
VULN_TYPES = [
    "Unauthenticated Remote Code Execution",
    "Authenticated Remote Code Execution",
    "Remote Code Execution (RCE)",
    "Unrestricted File Upload + RCE",
    # Must precede "Stored Cross-Site Scripting (XSS)", which it contains.
    "Multiple Stored Cross-Site Scripting (XSS)",
    "Stored Cross-Site Scripting (XSS)",
    "Reflected Cross-Site Scripting (XSS)",
    "Persistent Cross-Site Scripting",
    "Stored Cross-Site Scripting via SVG File Upload (Authenticated)",
    "Stored Cross Site Scripting",
    "Stored Cross-Site Scripting",
    "Reflected Cross-Site Scripting",
    "Cross-Site Scripting (XSS)",
    "Cross Site Scripting",
    "Cross-Site Scripting",
    "XML External Entity Injection",
    "Remote Code Execution",
    "Local Privilege Escalation",
    "Privilege Escalation",
    "Remote Buffer Overflow",
    # Specific overflow flavours must precede the generic "Buffer Overflow".
    "Stack Buffer Overflow",
    "Heap Buffer Overflow",
    "Stack-based Buffer Overflow",
    "Heap-based Buffer Overflow",
    "Buffer Overflow",
    "Integer Overflow",
    "Authentication Bypass",
    "Authorization Bypass",
    "Directory Traversal",
    "Path Traversal",
    # Blind variants must precede the generic "SQL Injection".
    "Time Based Blind SQL Injection",
    "Blind SQL Injection",
    "SQL Injection",
    "SQL injection",
    # Must precede "Command Injection", which it contains.
    "OS Command Injection",
    "Command Injection",
    "Code Injection",
    "LDAP Injection",
    "SSTI",
    "Server Side Template Injection",
    "Server-Side Template Injection",
    "Server Side Request Forgery",
    "Server-Side Request Forgery (SSRF)",
    "Server-Side Request Forgery",
    "SSRF",
    "Remote File Inclusion",
    "Local File Inclusion",
    "File Inclusion",
    "Arbitrary File Upload",
    "Arbitrary File Read",
    "Arbitrary File Write",
    "Arbitrary File Download",
    "Arbitrary File Deletion",
    "Arbitrary Code Execution",
    "Remote Command Execution",
    "Insecure Direct Object Reference",
    "Insecure Permissions",
    "Insecure File Permissions",
    "Information Disclosure",
    "Credential Disclosure",
    "Remote Configuration Disclosure",
    "Password Disclosure",
    "Denial of Service (DoS)",
    "Denial of Service (PoC)",
    "Denial of Service",
    "Use-After-Free",
    "Use After Free",
    "Double Free",
    "Type Confusion",
    "Out-of-Bounds Write",
    "Out-of-Bounds Read",
    "Out of Bounds Write",
    "Out of Bounds Read",
    "Null Pointer Dereference",
    "Memory Corruption",
    "Format String",
    "Open Redirect",
    "CSRF",
    "Cross-Site Request Forgery",
    "IDOR",
    "XXE",
    # Qualified XSS abbreviations must precede the bare "XSS".
    "Persistent XSS",
    "Stored XSS",
    "Reflected XSS",
    "DOM XSS",
    "XSS",
    "SQLi",
    "RCE",
    "LFI",
    "RFI",
    "Remote Root Backdoor",
    "Remote Password Reset",
    "Unrestricted File Upload",
    "File Upload",
]
|
|
| |
# Tool names that annotate_entry() labels as TOOL wherever they occur in a title.
KNOWN_TOOLS = ["Metasploit"]
|
|
| |
# Four dot-separated groups of 1-3 digits; the 0-255 octet range is NOT
# enforced by the pattern itself.
IP_RE = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')

# One or more dot-terminated labels followed by a TLD from a fixed allow-list.
DOMAIN_RE = re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|gov|edu|co|uk|de|fr|ru|cn|jp|info|biz)\b')

# http/https URL, ending at whitespace or any of  < > " ' ) + ,
URL_RE = re.compile(r'https?://[^\s<>"\')+,]+')

# Either a slash-separated path with at least two components, or a
# drive-letter path using backslashes.
FILEPATH_RE = re.compile(r'(?:/[a-zA-Z0-9_.+-]+){2,}|[a-zA-Z]:\\(?:[a-zA-Z0-9_.+-]+\\)*[a-zA-Z0-9_.+-]+')

# local-part@domain.tld with an alphabetic TLD of two or more letters.
EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')

# Stand-alone run of 32 to 64 hexadecimal characters.
HASH_RE = re.compile(r'\b[a-fA-F0-9]{32,64}\b')

# CVE identifier: "CVE-", a four-digit year, then at least four digits.
CVE_RE = re.compile(r'CVE-\d{4}-\d{4,}')


# Single-quoted token made only of letters, digits, '_', '.', and '/'.
PARAM_IN_QUOTES = re.compile(r"'[a-zA-Z0-9_./]+'")
|
|
|
|
def find_all(text, substring):
    """Return the offsets of every occurrence of ``substring`` in ``text``.

    Each hit is a two-element list ``[start, end]`` (end exclusive), so that
    ``text[start:end] == substring``.  Hits are reported left to right and may
    overlap: the search resumes one character after the previous hit's start.

    An empty ``substring`` yields ``[]``; without that guard ``str.find``
    would "match" at every offset and produce a zero-width span per position.
    """
    if not substring:
        return []

    width = len(substring)
    spans = []
    idx = text.find(substring)
    while idx != -1:
        spans.append([idx, idx + width])
        # Advance by one (not by ``width``) so overlapping hits are kept.
        idx = text.find(substring, idx + 1)
    return spans
|
|
|
|
def parse_title(text):
    """Split a title of the form 'Product Version - Vuln Type (extras)'.

    The title is cut at one of its ' - ' separators.  A separator qualifies
    when the text after it either

    * starts with an entry of ``VULN_TYPES``, or equals one once everything
      from the first '(' to the end has been removed, or
    * starts with a single-quoted token (e.g. a parameter name) followed by
      text that satisfies the same test.

    Returns:
        A 2-tuple ``(system_part, vuln_part)``.  ``vuln_part`` is ``None``
        when the title contains no ' - ' separator, in which case
        ``system_part`` is the whole stripped title.  When no separator
        qualifies, the title is cut at the first one.

    NOTE: the scan does not stop at the first qualifying separator; a later
    separator that passes the plain (unquoted) test replaces an earlier
    choice, while the quoted-token test only applies while nothing has been
    chosen yet.
    """
    parts = text.split(' - ')
    if len(parts) < 2:
        return text.strip(), None

    best_split = None
    for i in range(1, len(parts)):
        after = ' - '.join(parts[i:])
        # Independent of the vuln type being tested, so computed once per split.
        after_clean = re.sub(r'\s*\(.*?\)\s*$', '', after).strip()

        if any(after_clean == vt or after.startswith(vt) for vt in VULN_TYPES):
            best_split = i

        # Titles such as "'param' SQL Injection": skip the quoted token first.
        if best_split is None and re.match(r"'[^']+'\s+", after):
            remainder = re.sub(r"^'[^']+'\s+", "", after)
            remainder_head = remainder.strip()
            remainder_clean = re.sub(r'\s*\(.*?\)\s*$', '', remainder).strip()
            if any(
                remainder_head.startswith(vt) or remainder_clean == vt
                for vt in VULN_TYPES
            ):
                best_split = i

    if best_split is None:
        # No recognised vulnerability phrase: fall back to the first separator.
        best_split = 1

    system_part = ' - '.join(parts[:best_split]).strip()
    vuln_part = ' - '.join(parts[best_split:]).strip()
    return system_part, vuln_part
|
|
|
|
def extract_vuln_from_part(text, vuln_part):
    """Locate the vulnerability phrase of a title inside the full text.

    Args:
        text: The complete title; returned offsets index into this string.
        vuln_part: The vulnerability portion of the title, or a falsy value.

    Returns:
        A list holding at most one ``("VULNERABILITY", phrase, spans)`` tuple,
        where ``spans`` is what ``find_all(text, phrase)`` returned.  The list
        is empty when ``vuln_part`` is falsy or nothing could be located.
    """
    if not vuln_part:
        return []

    found = []

    # Ignore a trailing "(Metasploit)" marker.
    body = re.sub(r'\s*\(Metasploit\)\s*$', '', vuln_part).strip()

    # A leading single-quoted token is skipped before looking for a known type.
    quoted_prefix = re.match(r"'[^']+'\s+", body)
    candidate = body[quoted_prefix.end():] if quoted_prefix else body

    for known in VULN_TYPES:
        if known not in candidate:
            continue
        hits = find_all(text, known)
        if hits:
            found.append(("VULNERABILITY", known, hits))
            break
    else:
        # No known type was recorded: fall back to the phrase itself.  The
        # pattern removes everything from the first '(' to the end when the
        # phrase ends with ')'.
        fallback = re.sub(r'\s*\(.*?\)\s*$', '', body).strip()
        if quoted_prefix:
            fallback = re.sub(r"^'[^']+'\s+", "", fallback).strip()
        if len(fallback) < 80 and fallback:
            hits = find_all(text, fallback)
            if hits:
                found.append(("VULNERABILITY", fallback, hits))

    return found
|
|
|
|
def annotate_entry(entry):
    """Build the span annotations for one record.

    Args:
        entry: Mapping with a ``"text"`` string and an ``"exploit_id"``; an
            optional ``"cves"`` list of CVE identifiers is also honoured
            (a missing or null value is treated as empty).

    Returns:
        A dict with three keys: ``"text"`` (the input text), ``"spans"``
        (mapping ``"LABEL: entity"`` to a list of ``[start, end]`` offsets
        into the text) and ``"info"`` (source name and exploit id).

    Raises:
        KeyError: if ``entry`` lacks ``"text"`` or ``"exploit_id"``.
    """
    text = entry["text"]
    # ``or []`` also covers records that carry an explicit ``"cves": null``.
    cves = entry.get("cves") or []
    spans_dict = {}

    def add_span(label, entity, positions):
        """Record ``positions`` under ``"label: entity"``, skipping duplicates."""
        bucket = spans_dict.setdefault(f"{label}: {entity}", [])
        for pos in positions:
            if pos not in bucket:
                bucket.append(pos)

    def inside_existing(match, prefixes):
        """Tell whether ``match`` lies within a span stored under ``prefixes``."""
        return any(
            s <= match.start() and match.end() <= e
            for key, positions in spans_dict.items()
            if key.startswith(prefixes)
            for s, e in positions
        )

    system_part, vuln_part = parse_title(text)

    # SYSTEM: the product part of the title.
    if system_part:
        sys_spans = find_all(text, system_part)
        if sys_spans:
            add_span("SYSTEM", system_part, sys_spans)

    # VULNERABILITY: phrase found in the remainder of the title.
    for label, entity, positions in extract_vuln_from_part(text, vuln_part):
        add_span(label, entity, positions)

    # CVE_ID: identifiers supplied with the record, then any found by regex.
    for cve in cves:
        cve_spans = find_all(text, cve)
        if cve_spans:
            add_span("CVE_ID", cve, cve_spans)
    for m in CVE_RE.finditer(text):
        add_span("CVE_ID", m.group(), [[m.start(), m.end()]])

    # TOOL
    for tool in KNOWN_TOOLS:
        tool_spans = find_all(text, tool)
        if tool_spans:
            add_span("TOOL", tool, tool_spans)

    # IP_ADDRESS: dotted quads that do not look like version numbers.
    for m in IP_RE.finditer(text):
        val = m.group()
        if not all(0 <= int(octet) <= 255 for octet in val.split('.')):
            continue
        start_pos = m.start()
        # Skip anything starting within the product part (plus 3 characters).
        if system_part and start_pos < len(system_part) + 3:
            continue
        before = text[max(0, start_pos - 10):start_pos]
        after = text[m.end():m.end() + 5]
        # Preceded by "v"/"version"/a digit, or followed by ".<digit>".
        if re.search(r'[vV]\s*$|version\s*$|\d\s*$', before) or re.search(r'^\.\d', after):
            continue
        if start_pos > 0 and text[start_pos - 1].isalnum():
            continue
        add_span("IP_ADDRESS", val, [[start_pos, m.end()]])

    # URL
    for m in URL_RE.finditer(text):
        add_span("URL", m.group(), [[m.start(), m.end()]])

    # EMAIL
    for m in EMAIL_RE.finditer(text):
        add_span("EMAIL", m.group(), [[m.start(), m.end()]])

    # DOMAIN: only when not already part of a recorded URL or e-mail address.
    for m in DOMAIN_RE.finditer(text):
        if not inside_existing(m, ("URL:", "EMAIL:")):
            add_span("DOMAIN", m.group(), [[m.start(), m.end()]])

    # FILEPATH: not inside a recorded URL, not starting within the product
    # part, and longer than three characters.
    for m in FILEPATH_RE.finditer(text):
        val = m.group()
        skip = inside_existing(m, ("URL:",))
        if system_part and m.start() < len(system_part):
            skip = True
        if not skip and len(val) > 3:
            add_span("FILEPATH", val, [[m.start(), m.end()]])

    # HASH
    for m in HASH_RE.finditer(text):
        if not CVE_RE.match(text[max(0, m.start() - 4):m.end()]):
            add_span("HASH", m.group(), [[m.start(), m.end()]])

    return {
        "text": text,
        "spans": spans_dict,
        "info": {
            "source": "exploitdb",
            "exploit_id": entry["exploit_id"],
        },
    }
|
|
|
|
def verify_offsets(result):
    """Check every recorded span against the text it refers to.

    Args:
        result: Dict with ``"text"`` and ``"spans"``; each spans key has the
            form ``"LABEL: entity"`` and maps to ``[start, end]`` pairs.

    Returns:
        A list of human-readable error strings, empty when each span is in
        range and ``text[start:end]`` equals the entity named in its key.
    """
    source = result["text"]
    limit = len(source)
    problems = []
    for span_key, offsets in result["spans"].items():
        # Split on the first ": " only, since the entity may contain it too.
        _label, expected = span_key.split(": ", 1)
        for lo, hi in offsets:
            if lo >= 0 and hi <= limit:
                actual = source[lo:hi]
                if actual != expected:
                    problems.append(
                        f"Mismatch: {span_key} [{lo},{hi}] = '{actual}' != '{expected}'"
                    )
            else:
                problems.append(
                    f"Out of bounds: {span_key} [{lo},{hi}] in text len {limit}"
                )
    return problems
|
|
|
|
def main():
    """Annotate every record in INPUT, write the results to OUTPUT, and report.

    Reads INPUT as JSON Lines, runs ``annotate_entry`` and ``verify_offsets``
    on each record, writes one JSON object per line to OUTPUT, then prints the
    offset errors (first 20), the number of distinct span keys per label, and
    how many entries received at least one span.
    """
    # Explicit UTF-8: the output is written with ensure_ascii=False, so the
    # platform's default encoding must not be relied on.  Blank lines are
    # skipped because json.loads() rejects them.
    with open(INPUT, encoding="utf-8") as f:
        entries = [json.loads(line) for line in f if line.strip()]

    print(f"Processing {len(entries)} entries...")

    all_errors = []
    results = []
    for entry in entries:
        result = annotate_entry(entry)
        all_errors.extend(
            (entry["exploit_id"], err) for err in verify_offsets(result)
        )
        results.append(result)

    with open(OUTPUT, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(r, ensure_ascii=False) + "\n" for r in results)

    print(f"Wrote {len(results)} annotated entries to {OUTPUT}")

    if all_errors:
        print(f"\n{len(all_errors)} offset errors found:")
        for eid, err in all_errors[:20]:
            print(f"  [{eid}] {err}")
        if len(all_errors) > 20:
            print(f"  ... and {len(all_errors)-20} more")
    else:
        print("All offsets verified correct!")

    # Counts distinct "LABEL: entity" keys per label, not individual offsets.
    label_counts = {}
    for r in results:
        for key in r["spans"]:
            label = key.split(": ", 1)[0]
            label_counts[label] = label_counts.get(label, 0) + 1

    print("\nEntity type distribution:")
    for label, count in sorted(label_counts.items(), key=lambda x: -x[1]):
        print(f"  {label}: {count}")

    entries_with_spans = sum(1 for r in results if r["spans"])
    print(f"\nEntries with at least one span: {entries_with_spans}/{len(results)}")
|
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|