arcspan / scripts /annotate_nvd.py
chairulridjal's picture
Add files using upload-large-folder tool
3dac39e verified
#!/usr/bin/env python3
"""Annotate NVD CVE descriptions with cybersecurity NER entities."""
import json, re, sys
# Load the filtered NVD records. The processing loop further down reads the
# 'id' and 'desc' keys of each record. JSON is decoded explicitly as UTF-8 so
# the result does not depend on the machine's locale default encoding.
with open("/home/ubuntu/alkyline/data/raw/nvd_cves_filtered.json", encoding="utf-8") as f:
    cves = json.load(f)
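# Entity patterns are matched independently for every label. find_all() collects
# all matches; resolve_overlaps() then keeps, wherever matches overlap, the span
# that starts earliest (ties: the longest, then the one collected first).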
# All patterns are compiled once at import time instead of on every call.

_CVE_ID_RE = re.compile(r'CVE-\d{4}-\d{4,}')

# Vulnerability-class phrases, matched case-insensitively.
_VULN_RES = tuple(re.compile(p, re.IGNORECASE) for p in (
    r'(?:buffer\s+overflow|stack\s+overflow|heap\s+overflow|integer\s+overflow)',
    r'(?:use-after-free|double[- ]free|null\s+pointer\s+dereference|NULL\s+pointer\s+dereference)',
    r'(?:remote\s+code\s+execution|arbitrary\s+code\s+execution)',
    r'(?:SQL\s+injection|command\s+injection|code\s+injection|OS\s+command\s+injection)',
    r'(?:cross-site\s+scripting|stored\s+XSS|reflected\s+XSS|DOM-based\s+cross-site\s+scripting|Stored\s+Cross-Site\s+Scripting)',
    # "DoS" is anchored on word boundaries: under IGNORECASE an unanchored
    # "dos" also matches inside ordinary words ("dosage", "kudos"). "DDoS" is
    # tagged as a whole token rather than as a mid-word "DoS" slice.
    r'(?:denial\s+of\s+service|\bD?DoS\b)',
    r'(?:path\s+traversal|directory\s+traversal|Path\s+Traversal)',
    r'(?:privilege\s+escalation|authentication\s+bypass)',
    r'(?:information\s+disclosure|information\s+leak)',
    r'(?:memory\s+leak|memory\s+corruption|memory\s+consumption)',
    r'(?:race\s+condition)',
    r'(?:out-of-bounds\s+(?:read|write|access)|oob\s+access)',
    r'(?:type\s+confusion)',
    r'(?:improper\s+(?:input\s+validation|access\s+control|authentication|authorization))',
    r'(?:brute-force\s+protection\s+bypass)',
    r'(?:SASL\s+mechanism\s+downgrade)',
    r'(?:response\s+injection)',
    r'(?:prompt\s+injection)',
    r'(?:token\s+theft)',
    r'(?:arbitrary\s+file\s+(?:upload|delete|read|write)s?)',
    r'(?:sanitization\s+bypass)',
    r'(?:RBAC\s+(?:bypass|restriction)s?)',
))

# Software / hardware / protocol names, matched case-sensitively.
_SYSTEM_RES = tuple(re.compile(p) for p in (
    r'\b(?:Linux\s+kernel)\b',
    r'\b(?:WordPress)\b',
    r'\b(?:Apache\s+(?:HTTP\s+Server|Tomcat|Kafka|Struts|Maven|Hadoop|Spark|Flink|Airflow|Camel|Solr|CXF|NiFi|OFBiz))\b',
    r'\b(?:Cisco\s+(?:IOS|NX-OS|ASA|WebEx|Meraki|Catalyst|Nexus|ISE))\b',
    r'\b(?:Android|iOS|macOS|Windows|FreeBSD|OpenBSD|NetBSD)\b',
    r'\b(?:Chrome|Firefox|Safari|Edge|Opera)\b',
    r'\b(?:MySQL|PostgreSQL|MariaDB|MongoDB|Redis|SQLite|Oracle\s+Database)\b',
    r'\b(?:nginx|HAProxy|Envoy|Traefik)\b',
    r'\b(?:Docker|Kubernetes|Kyverno|Helm)\b',
    r'\b(?:OpenSSL|GnuTLS|LibreSSL|BoringSSL)\b',
    r'\b(?:Git|GitLab|GitHub|Bitbucket)\b',
    r'\b(?:MailKit|basic-ftp|AnythingLLM|Contact\s+Form\s+7)\b',
    r'\b(?:STARTTLS|GraphQL|LDAP|OAuth|FTP|SMTP|HTTP|HTTPS|SSH|DNS|NFS|TFTP)\b',
    r'\b(?:Delta\s+Electronics\s+AS320T|AS320T)\b',
    r'\b(?:D-Link\s+DWM-222W|DWM-222W)\b',
    r'\b(?:Royal\s+Elementor\s+Addons)\b',
    r'\b(?:Booking\s+Calendar\s+Contact\s+Form)\b',
    r'\b(?:WP\s+Books\s+Gallery)\b',
    r'\b(?:Drag\s+and\s+Drop\s+File\s+Upload)\b',
    r'\b(?:BetterDocs)\b',
    r'\b(?:ExactMetrics)\b',
    r'\b(?:MaxiBlocks\s+Builder)\b',
    r'\b(?:Mobile\s+Next)\b',
    r'\b(?:Sentry)\b',
    r'\b(?:OpenAI\s+API)\b',
    r'\b(?:Google\s+Ads)\b',
    r'\b(?:ConfigMap)\b',
    r'\b(?:ServiceAccount)\b',
    r'\b(?:CODEOWNERS)\b',
))

# Vendor / organization names, matched case-sensitively.
_ORG_RES = tuple(re.compile(p) for p in (
    r'\b(?:Microsoft|Google|Apple|Amazon|Meta|Facebook)\b',
    r'\b(?:Cisco|Intel|AMD|NVIDIA|Qualcomm|Broadcom|Samsung)\b',
    r'\b(?:Oracle|IBM|SAP|VMware|Red\s+Hat|Canonical|SUSE)\b',
    r'\b(?:Mozilla|Apache\s+(?:Software\s+)?Foundation)\b',
    r'\b(?:Delta\s+Electronics)\b',
    r'\b(?:D-Link)\b',
    r'\b(?:GitLab)\b',  # also listed as a SYSTEM
    r'\b(?:OpenAI)\b',
))

# Slash-separated paths with at least two components, e.g. /etc/app/conf.yaml.
_PATH_RE = re.compile(r'(?:/[\w.-]+){2,}(?:\.\w+)?(?:\(\))?')

# Identifiers containing an underscore and followed by "()", e.g. foo_bar().
# Matches exactly the same strings as the nested form \w+(?:_\w+)+\(\), but
# without its exponential backtracking on long snake_case words that are not
# followed by "()".
_FUNC_RE = re.compile(r'\b\w+_\w+\(\)')


def find_all(text):
    """Find all candidate entity spans in *text*.

    Every pattern is applied independently, so the returned spans may overlap
    (and the same text may be reported under more than one label).

    Args:
        text: The string to scan.

    Returns:
        A list of ``(start, end, label, surface)`` tuples, where
        ``text[start:end] == surface``. Spans are appended in this order:
        CVE_ID, VULNERABILITY, SYSTEM, ORGANIZATION, then FILEPATH (paths
        first, followed by function-call names, which also use the FILEPATH
        label).
    """
    spans = [
        (m.start(), m.end(), 'CVE_ID', m.group())
        for m in _CVE_ID_RE.finditer(text)
    ]

    labelled_groups = (
        ('VULNERABILITY', _VULN_RES),
        ('SYSTEM', _SYSTEM_RES),
        ('ORGANIZATION', _ORG_RES),
    )
    for label, regexes in labelled_groups:
        for regex in regexes:
            spans.extend(
                (m.start(), m.end(), label, m.group())
                for m in regex.finditer(text)
            )

    spans.extend(
        (m.start(), m.end(), 'FILEPATH', m.group())
        for m in _PATH_RE.finditer(text)
    )

    for m in _FUNC_RE.finditer(text):
        surface = m.group()
        if len(surface) > 6:  # skip very short names (length includes "()")
            spans.append((m.start(), m.end(), 'FILEPATH', surface))

    return spans
def resolve_overlaps(spans):
    """Return a non-overlapping subset of *spans*, chosen greedily.

    Candidates are considered in order of ascending start offset; among
    candidates with the same start, the longest comes first, and remaining
    ties keep their input order (the sort is stable). A candidate is kept
    only if it does not overlap any span already kept. Labels play no part
    in the choice.

    Args:
        spans: Iterable of ``(start, end, label, surface)`` tuples. It is not
            modified.

    Returns:
        A new list of the kept spans, ordered by start offset.
    """
    # sorted() rather than list.sort(): do not reorder the caller's list.
    ordered = sorted(spans, key=lambda x: (x[0], -(x[1] - x[0])))

    kept = []
    for candidate in ordered:
        start, end = candidate[0], candidate[1]
        # Half-open interval test: spans that merely touch do not overlap.
        clashes = any(start < other[1] and end > other[0] for other in kept)
        if not clashes:
            kept.append(candidate)
    return kept
def to_opf_format(cve_id, text, spans):
    """Build one OPF JSONL record from an annotated CVE description.

    Args:
        cve_id: CVE identifier; used for the record id and in warnings.
        text: The annotated text.
        spans: Iterable of ``(start, end, label, surface)`` tuples.

    Returns:
        A dict with keys ``"text"``, ``"spans"`` and ``"info"``. ``"spans"``
        maps ``"<label>: <surface>"`` to a list of ``[start, end]`` pairs.
        Spans whose ``text[start:end]`` differs from ``surface`` are left out
        and reported on stderr.
    """
    grouped = {}
    for begin, finish, tag, expected in spans:
        found = text[begin:finish]
        if found == expected:
            grouped.setdefault(f"{tag}: {expected}", []).append([begin, finish])
        else:
            # The offsets do not point at the claimed surface text: drop it.
            print(f"WARNING: offset mismatch in {cve_id}: expected '{expected}' got '{found}'", file=sys.stderr)

    record_id = "nvd_" + cve_id.replace('CVE-', '').replace('-', '_')
    return {
        "text": text,
        "spans": grouped,
        "info": {"id": record_id, "source": "nvd_cve"},
    }
# Process all CVEs: annotate each description, keep those with at least two
# non-overlapping entity spans, and write the kept records as JSONL.
output = []
total_spans = 0
for c in cves:
    text = c['desc']
    cve_id = c['id']
    # Prepend CVE ID to text if not already there
    if cve_id not in text:
        text = f"{cve_id}: {text}"
    raw_spans = find_all(text)
    spans = resolve_overlaps(raw_spans)
    if len(spans) < 2:
        continue  # skip if too few entities
    record = to_opf_format(cve_id, text, spans)
    total_spans += len(spans)
    output.append(record)

# Guard the average: with no qualifying CVE the division would raise
# ZeroDivisionError before the (empty) output file is written.
avg_spans = total_spans / len(output) if output else 0.0
print(f"Annotated {len(output)} CVEs with {total_spans} total spans ({avg_spans:.1f} avg)", file=sys.stderr)

outpath = "/home/ubuntu/alkyline/data/processed/llm_annotated_nvd.jsonl"
# One JSON object per line; the encoding is explicit so the file does not
# depend on the locale default.
with open(outpath, "w", encoding="utf-8") as f:
    f.writelines(json.dumps(rec) + "\n" for rec in output)
print(f"Written to {outpath}", file=sys.stderr)