arcspan / src /scripts /annotate_exploitdb.py
chairulridjal's picture
Add files using upload-large-folder tool
07fcfbd verified
#!/usr/bin/env python3
"""Programmatic NER annotation for Exploit-DB entries."""
import json
import re
import sys
# JSONL file read by main(); each record is passed to annotate_entry(), which
# reads its "text", optional "cves", and "exploit_id" fields.
INPUT = "/home/ubuntu/alkyline/data/raw/exploitdb/exploitdb_descriptions.jsonl"
# JSONL file written by main(): one annotated record per line.
OUTPUT = "/home/ubuntu/alkyline/data/processed/llm_annotated_exploitdb.jsonl"
# Common vulnerability type keywords.
#
# Consumers take the FIRST entry that matches, so the list must be ordered
# longest-first for the most specific type to win (e.g. "Stack-based Buffer
# Overflow" before "Buffer Overflow", "Stored XSS" before "XSS").  The literal
# below is kept in its hand-written grouping for readability and is sorted by
# length here so the ordering guarantee cannot be broken by a new entry.
# Matching is case-sensitive, hence variants such as "SQL injection".
VULN_TYPES = sorted(
    [
        "Unauthenticated Remote Code Execution",
        "Authenticated Remote Code Execution",
        "Remote Code Execution (RCE)",
        "Unrestricted File Upload + RCE",
        "Stored Cross-Site Scripting (XSS)",
        "Reflected Cross-Site Scripting (XSS)",
        "Persistent Cross-Site Scripting",
        "Multiple Stored Cross-Site Scripting (XSS)",
        "Stored Cross-Site Scripting via SVG File Upload (Authenticated)",
        "Stored Cross Site Scripting",
        "Stored Cross-Site Scripting",
        "Reflected Cross-Site Scripting",
        "Cross-Site Scripting (XSS)",
        "Cross Site Scripting",
        "Cross-Site Scripting",
        "XML External Entity Injection",
        "Remote Code Execution",
        "Local Privilege Escalation",
        "Privilege Escalation",
        "Remote Buffer Overflow",
        "Buffer Overflow",
        "Stack Buffer Overflow",
        "Heap Buffer Overflow",
        "Stack-based Buffer Overflow",
        "Heap-based Buffer Overflow",
        "Integer Overflow",
        "Authentication Bypass",
        "Authorization Bypass",
        "Directory Traversal",
        "Path Traversal",
        "SQL Injection",
        "SQL injection",
        "Blind SQL Injection",
        "Time Based Blind SQL Injection",
        "Command Injection",
        "OS Command Injection",
        "Code Injection",
        "LDAP Injection",
        "SSTI",
        "Server Side Template Injection",
        "Server-Side Template Injection",
        "Server Side Request Forgery",
        "Server-Side Request Forgery (SSRF)",
        "Server-Side Request Forgery",
        "SSRF",
        "Remote File Inclusion",
        "Local File Inclusion",
        "File Inclusion",
        "Arbitrary File Upload",
        "Arbitrary File Read",
        "Arbitrary File Write",
        "Arbitrary File Download",
        "Arbitrary File Deletion",
        "Arbitrary Code Execution",
        "Remote Command Execution",
        "Insecure Direct Object Reference",
        "Insecure Permissions",
        "Insecure File Permissions",
        "Information Disclosure",
        "Credential Disclosure",
        "Remote Configuration Disclosure",
        "Password Disclosure",
        "Denial of Service (DoS)",
        "Denial of Service (PoC)",
        "Denial of Service",
        "Use-After-Free",
        "Use After Free",
        "Double Free",
        "Type Confusion",
        "Out-of-Bounds Write",
        "Out-of-Bounds Read",
        "Out of Bounds Write",
        "Out of Bounds Read",
        "Null Pointer Dereference",
        "Memory Corruption",
        "Format String",
        "Open Redirect",
        "CSRF",
        "Cross-Site Request Forgery",
        "IDOR",
        "XXE",
        "XSS",
        "SQLi",
        "RCE",
        "LFI",
        "RFI",
        "Remote Root Backdoor",
        "Remote Password Reset",
        "Unrestricted File Upload",
        "File Upload",
        "Persistent XSS",
        "Stored XSS",
        "Reflected XSS",
        "DOM XSS",
    ],
    key=len,
    reverse=True,  # stable: equal-length entries keep their written order
)
# Known tools that appear in parentheses, e.g. "(Metasploit)".
# annotate_entry() looks these up with a plain substring search over the text.
KNOWN_TOOLS = ["Metasploit"]
# Regex patterns
# Four dot-separated groups of 1-3 digits. The 0-255 octet check and the
# "is this really a version number?" heuristics are applied in annotate_entry().
IP_RE = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
# One or more dot-terminated labels followed by a TLD from a fixed list.
DOMAIN_RE = re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|gov|edu|co|uk|de|fr|ru|cn|jp|info|biz)\b')
# http:// or https:// up to the first whitespace or one of < > " ' ) + ,
URL_RE = re.compile(r'https?://[^\s<>"\')+,]+')
# Either at least two "/segment" components, or a drive-letter path using backslashes.
FILEPATH_RE = re.compile(r'(?:/[a-zA-Z0-9_.+-]+){2,}|[a-zA-Z]:\\(?:[a-zA-Z0-9_.+-]+\\)*[a-zA-Z0-9_.+-]+')
# local-part @ host . alphabetic TLD of two or more letters.
EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
# A word-bounded run of 32 to 64 hexadecimal characters.
HASH_RE = re.compile(r'\b[a-fA-F0-9]{32,64}\b')
# "CVE-", a four-digit year, "-", then four or more digits.
CVE_RE = re.compile(r'CVE-\d{4}-\d{4,}')
# Quoted parameter names that look like file paths but aren't
# NOTE(review): this pattern is not referenced by any function visible in this file.
PARAM_IN_QUOTES = re.compile(r"'[a-zA-Z0-9_./]+'")
def find_all(text, substring):
    """Locate every occurrence of ``substring`` inside ``text``.

    Overlapping occurrences are reported, because the search resumes one
    character after the start of the previous hit.

    Returns:
        A list of two-element ``[start, end]`` lists (end exclusive), in
        ascending order of ``start``; empty when there is no occurrence.
    """
    width = len(substring)
    hits = []
    pos = text.find(substring)
    while pos != -1:
        hits.append([pos, pos + width])
        # Advance by one (not by ``width``) so overlapping matches are kept.
        pos = text.find(substring, pos + 1)
    return hits
def _matches_vuln_type(candidate, cleaned):
    """Return True if ``cleaned`` equals, or ``candidate`` starts with, a known vuln type."""
    return any(cleaned == vt or candidate.startswith(vt) for vt in VULN_TYPES)


def parse_title(text):
    """Split an Exploit-DB title of the form 'Product Version - Vuln Type (extras)'.

    The title is cut on ' - '. Every possible cut point is tried; a cut is
    accepted when the text after it starts with (or, ignoring one trailing
    parenthesised group, equals) an entry of ``VULN_TYPES``. When several cut
    points qualify, the LAST one wins. Until some cut has been accepted, a tail
    of the form ``'param' <vuln type>`` also qualifies. If nothing qualifies,
    the first ' - ' is used.

    Args:
        text: The raw title string.

    Returns:
        A 2-tuple ``(system_part, vuln_part)``, both stripped. When the title
        contains no ' - ' the result is ``(text.strip(), None)``.
    """
    parts = text.split(' - ')
    if len(parts) < 2:
        return text.strip(), None

    best_split = None
    for i in range(1, len(parts)):
        after = ' - '.join(parts[i:])
        # Computed once per cut point; it does not depend on the vuln type.
        after_clean = re.sub(r'\s*\(.*?\)\s*$', '', after).strip()
        if _matches_vuln_type(after, after_clean):
            best_split = i
            continue
        if best_split is not None:
            # The quoted-parameter fallback only applies while nothing has matched.
            continue
        # Quoted-param patterns such as: 'username' SQL Injection
        quoted = re.match(r"'[^']+'\s+", after)
        if quoted:
            remainder = after[quoted.end():]
            remainder_clean = re.sub(r'\s*\(.*?\)\s*$', '', remainder).strip()
            if _matches_vuln_type(remainder.strip(), remainder_clean):
                best_split = i

    if best_split is None:
        # Default: first ' - ' is the split
        best_split = 1

    system_part = ' - '.join(parts[:best_split]).strip()
    vuln_part = ' - '.join(parts[best_split:]).strip()
    return system_part, vuln_part
def extract_vuln_from_part(text, vuln_part):
    """Extract the vulnerability span(s) described by the vuln part of a title.

    Args:
        text: The full title; returned offsets index into this string.
        vuln_part: The portion of the title after the product name, or a falsy
            value when the title could not be split.

    Returns:
        A list holding at most one ``("VULNERABILITY", entity, spans)`` tuple,
        where ``spans`` is the ``find_all(text, entity)`` result. Note that
        ``spans`` covers every occurrence of ``entity`` in ``text``, not only
        the one inside ``vuln_part``.
    """
    if not vuln_part:
        return []

    # Drop a trailing "(Metasploit)"; tools are annotated separately.
    clean = re.sub(r'\s*\(Metasploit\)\s*$', '', vuln_part).strip()
    # A leading quoted parameter such as 'username' is not part of the vuln type.
    param_match = re.match(r"'[^']+'\s+", clean)
    vuln_search = clean[param_match.end():] if param_match else clean

    # Try the most specific (longest) type first. Ordering is enforced here
    # rather than assumed of VULN_TYPES, so that e.g. "Blind SQL Injection"
    # is never shadowed by the shorter "SQL Injection" it contains.
    for vt in sorted(VULN_TYPES, key=len, reverse=True):
        if vt in vuln_search:
            spans = find_all(text, vt)
            if spans:
                return [("VULNERABILITY", vt, spans)]

    # No known type matched: fall back to the whole vuln part, minus one
    # trailing parenthesised group and any leading quoted parameter, but only
    # if it is short enough to plausibly be a vulnerability name.
    stripped = re.sub(r'\s*\(.*?\)\s*$', '', clean).strip()
    if param_match:
        stripped = re.sub(r"^'[^']+'\s+", "", stripped).strip()
    if stripped and len(stripped) < 80:
        spans = find_all(text, stripped)
        if spans:
            return [("VULNERABILITY", stripped, spans)]
    return []
def annotate_entry(entry):
    """Annotate one Exploit-DB record with labelled character spans.

    Args:
        entry: A mapping with a ``"text"`` title, an ``"exploit_id"``, and an
            optional ``"cves"`` list (a missing or null value is treated as
            empty).

    Returns:
        A dict with keys ``"text"`` (the input title), ``"spans"`` (mapping
        ``"LABEL: entity"`` to a list of ``[start, end]`` offsets into the
        title) and ``"info"`` (source name and exploit id). Labels emitted:
        SYSTEM, VULNERABILITY, CVE_ID, TOOL, IP_ADDRESS, URL, EMAIL, DOMAIN,
        FILEPATH and HASH.

    Raises:
        KeyError: If ``entry`` lacks ``"text"`` or ``"exploit_id"``.
    """
    text = entry["text"]
    # ``or []`` also covers an explicit JSON null, which .get()'s default does not.
    cves = entry.get("cves") or []
    spans_dict = {}  # "LABEL: entity" -> [[start, end], ...]

    def add_span(label, entity, positions):
        """Record positions under "LABEL: entity", skipping duplicates."""
        bucket = spans_dict.setdefault(f"{label}: {entity}", [])
        for pos in positions:
            if pos not in bucket:
                bucket.append(pos)

    def inside_existing(match, prefixes):
        """Return True if match lies within a recorded span whose key starts with a prefix."""
        return any(
            s <= match.start() and match.end() <= e
            for key, positions in spans_dict.items()
            if key.startswith(prefixes)
            for s, e in positions
        )

    # 1. Parse title structure
    system_part, vuln_part = parse_title(text)

    # 2. SYSTEM entity — the product/system name
    if system_part:
        sys_spans = find_all(text, system_part)
        if sys_spans:
            add_span("SYSTEM", system_part, sys_spans)

    # 3. VULNERABILITY entity
    for label, entity, positions in extract_vuln_from_part(text, vuln_part):
        add_span(label, entity, positions)

    # 4. CVE_ID — ids from the metadata field are recorded only when they occur
    #    in the text (spans need offsets); ids found in the text are always kept.
    for cve in cves:
        cve_spans = find_all(text, cve)
        if cve_spans:
            add_span("CVE_ID", cve, cve_spans)
    for m in CVE_RE.finditer(text):
        add_span("CVE_ID", m.group(), [[m.start(), m.end()]])

    # 5. TOOL — check for (Metasploit) etc
    for tool in KNOWN_TOOLS:
        tool_spans = find_all(text, tool)
        if tool_spans:
            add_span("TOOL", tool, tool_spans)

    # 6. IP_ADDRESS — but skip version numbers embedded in product names
    for m in IP_RE.finditer(text):
        val = m.group()
        if not all(0 <= int(octet) <= 255 for octet in val.split('.')):
            continue
        start_pos = m.start()
        # Anything starting within the system part (plus 3 characters,
        # presumably the ' - ' delimiter) is almost certainly a version number.
        if system_part and start_pos < len(system_part) + 3:
            continue
        # Version-like context: preceded by "v", "version" or a digit, or
        # followed by a further ".<digit>" component.
        before = text[max(0, start_pos - 10):start_pos]
        after = text[m.end():m.end() + 5]
        if re.search(r'[vV]\s*$|version\s*$|\d\s*$', before) or re.search(r'^\.\d', after):
            continue
        # Directly attached to a preceding letter/digit: likely a version.
        if start_pos > 0 and text[start_pos - 1].isalnum():
            continue
        add_span("IP_ADDRESS", val, [[m.start(), m.end()]])

    # 7. URL
    for m in URL_RE.finditer(text):
        add_span("URL", m.group(), [[m.start(), m.end()]])

    # 8. EMAIL
    for m in EMAIL_RE.finditer(text):
        add_span("EMAIL", m.group(), [[m.start(), m.end()]])

    # 9. DOMAIN (only if not already captured as part of URL/EMAIL)
    for m in DOMAIN_RE.finditer(text):
        if not inside_existing(m, ("URL:", "EMAIL:")):
            add_span("DOMAIN", m.group(), [[m.start(), m.end()]])

    # 10. FILEPATH — look for paths in the text
    for m in FILEPATH_RE.finditer(text):
        val = m.group()
        if inside_existing(m, ("URL:",)):
            continue
        # Skip matches starting inside the system part (product names with
        # slashes like KZTech/JatonTec).
        if system_part and m.start() < len(system_part):
            continue
        if len(val) > 3:
            add_span("FILEPATH", val, [[m.start(), m.end()]])

    # 11. HASH
    for m in HASH_RE.finditer(text):
        # Skip hex runs that are really the tail of a CVE identifier.
        if not CVE_RE.match(text[max(0, m.start() - 4):m.end()]):
            add_span("HASH", m.group(), [[m.start(), m.end()]])

    return {
        "text": text,
        "spans": spans_dict,
        "info": {
            "source": "exploitdb",
            "exploit_id": entry["exploit_id"],
        },
    }
def verify_offsets(result):
    """Check that each recorded span slices out exactly its entity text.

    ``result`` must carry a ``"text"`` string and a ``"spans"`` mapping of
    ``"LABEL: entity"`` keys to lists of ``[start, end]`` pairs.

    Returns:
        A list of human-readable problem descriptions; empty when every span
        is in bounds and ``text[start:end]`` equals the entity.
    """
    text = result["text"]
    problems = []
    for key, positions in result["spans"].items():
        # Split once only: the entity itself may contain ": ".
        _label, entity = key.split(": ", 1)
        for start, end in positions:
            within_text = start >= 0 and end <= len(text)
            if not within_text:
                problems.append(f"Out of bounds: {key} [{start},{end}] in text len {len(text)}")
                continue
            if text[start:end] != entity:
                problems.append(f"Mismatch: {key} [{start},{end}] = '{text[start:end]}' != '{entity}'")
    return problems
def main():
    """Annotate every record in INPUT, write the results to OUTPUT, and print a report.

    The report covers offset-verification errors (first 20 shown), the number
    of distinct entities per label, and how many records received any span.
    """
    # Explicit UTF-8: the output is written with ensure_ascii=False, so relying
    # on the locale's default encoding can fail on non-ASCII titles.
    with open(INPUT, encoding="utf-8") as f:
        # Tolerate blank lines (e.g. a trailing newline) in the JSONL input.
        entries = [json.loads(line) for line in f if line.strip()]
    print(f"Processing {len(entries)} entries...")

    all_errors = []
    results = []
    for entry in entries:
        result = annotate_entry(entry)
        errors = verify_offsets(result)
        if errors:
            all_errors.extend((entry["exploit_id"], e) for e in errors)
        results.append(result)

    # Write output
    with open(OUTPUT, "w", encoding="utf-8") as f:
        for r in results:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    print(f"Wrote {len(results)} annotated entries to {OUTPUT}")

    if all_errors:
        print(f"\n{len(all_errors)} offset errors found:")
        for eid, err in all_errors[:20]:
            print(f" [{eid}] {err}")
        if len(all_errors) > 20:
            print(f" ... and {len(all_errors)-20} more")
    else:
        print("All offsets verified correct!")

    # Stats: one count per distinct "LABEL: entity" key, grouped by label.
    label_counts = {}
    for r in results:
        for key in r["spans"]:
            label = key.split(": ", 1)[0]
            label_counts[label] = label_counts.get(label, 0) + 1
    print("\nEntity type distribution:")
    for label, count in sorted(label_counts.items(), key=lambda x: -x[1]):
        print(f" {label}: {count}")

    entries_with_spans = sum(1 for r in results if r["spans"])
    print(f"\nEntries with at least one span: {entries_with_spans}/{len(results)}")


if __name__ == "__main__":
    main()