#!/usr/bin/env python3
"""Annotate NVD CVE descriptions with cybersecurity NER entities.

Entities are found with hand-written regular expressions, overlapping
candidates are resolved to a non-overlapping set, and each CVE with at
least two entities is written as one JSON line (text + labelled offsets).
"""

import json
import re
import sys
from typing import Any, Dict, Iterable, List, Tuple

INPUT_PATH = "/home/ubuntu/alkyline/data/raw/nvd_cves_filtered.json"
OUTPUT_PATH = "/home/ubuntu/alkyline/data/processed/llm_annotated_nvd.jsonl"

# (start, end, label, surface) with ``text[start:end] == surface``.
Span = Tuple[int, int, str, str]

# Entity patterns. All matches are collected first and overlaps are resolved
# afterwards by ``resolve_overlaps``: earliest start wins, then the longest
# span, then whichever candidate was collected first.  The collection order
# below (CVE_ID, VULNERABILITY, SYSTEM, ORGANIZATION, FILEPATH) is therefore
# the tie-breaker between labels and must not be reordered casually.

CVE_ID_RE = re.compile(r'CVE-\d{4}-\d{4,}')

# VULNERABILITY patterns (matched case-insensitively).
VULN_PATTERNS = tuple(re.compile(p, re.IGNORECASE) for p in (
    r'(?:buffer\s+overflow|stack\s+overflow|heap\s+overflow|integer\s+overflow)',
    r'(?:use-after-free|double[- ]free|null\s+pointer\s+dereference|NULL\s+pointer\s+dereference)',
    r'(?:remote\s+code\s+execution|arbitrary\s+code\s+execution)',
    r'(?:SQL\s+injection|command\s+injection|code\s+injection|OS\s+command\s+injection)',
    r'(?:cross-site\s+scripting|stored\s+XSS|reflected\s+XSS|DOM-based\s+cross-site\s+scripting|Stored\s+Cross-Site\s+Scripting)',
    # The abbreviation is anchored on word boundaries: unanchored and
    # case-insensitive it would fire inside ordinary words ("endoscope")
    # and produce a sub-token span inside "DDoS".
    r'(?:denial\s+of\s+service|\bD?DoS\b)',
    r'(?:path\s+traversal|directory\s+traversal|Path\s+Traversal)',
    r'(?:privilege\s+escalation|authentication\s+bypass)',
    r'(?:information\s+disclosure|information\s+leak)',
    r'(?:memory\s+leak|memory\s+corruption|memory\s+consumption)',
    r'(?:race\s+condition)',
    r'(?:out-of-bounds\s+(?:read|write|access)|oob\s+access)',
    r'(?:type\s+confusion)',
    r'(?:improper\s+(?:input\s+validation|access\s+control|authentication|authorization))',
    r'(?:brute-force\s+protection\s+bypass)',
    r'(?:SASL\s+mechanism\s+downgrade)',
    r'(?:response\s+injection)',
    r'(?:prompt\s+injection)',
    r'(?:token\s+theft)',
    r'(?:arbitrary\s+file\s+(?:upload|delete|read|write)s?)',
    r'(?:sanitization\s+bypass)',
    r'(?:RBAC\s+(?:bypass|restriction)s?)',
))

# SYSTEM patterns - software/hardware names (case-sensitive).
SYSTEM_PATTERNS = tuple(re.compile(p) for p in (
    r'\b(?:Linux\s+kernel)\b',
    r'\b(?:WordPress)\b',
    r'\b(?:Apache\s+(?:HTTP\s+Server|Tomcat|Kafka|Struts|Maven|Hadoop|Spark|Flink|Airflow|Camel|Solr|CXF|NiFi|OFBiz))\b',
    r'\b(?:Cisco\s+(?:IOS|NX-OS|ASA|WebEx|Meraki|Catalyst|Nexus|ISE))\b',
    r'\b(?:Android|iOS|macOS|Windows|FreeBSD|OpenBSD|NetBSD)\b',
    r'\b(?:Chrome|Firefox|Safari|Edge|Opera)\b',
    r'\b(?:MySQL|PostgreSQL|MariaDB|MongoDB|Redis|SQLite|Oracle\s+Database)\b',
    r'\b(?:nginx|HAProxy|Envoy|Traefik)\b',
    r'\b(?:Docker|Kubernetes|Kyverno|Helm)\b',
    r'\b(?:OpenSSL|GnuTLS|LibreSSL|BoringSSL)\b',
    r'\b(?:Git|GitLab|GitHub|Bitbucket)\b',
    r'\b(?:MailKit|basic-ftp|AnythingLLM|Contact\s+Form\s+7)\b',
    r'\b(?:STARTTLS|GraphQL|LDAP|OAuth|FTP|SMTP|HTTP|HTTPS|SSH|DNS|NFS|TFTP)\b',
    r'\b(?:Delta\s+Electronics\s+AS320T|AS320T)\b',
    r'\b(?:D-Link\s+DWM-222W|DWM-222W)\b',
    r'\b(?:Royal\s+Elementor\s+Addons)\b',
    r'\b(?:Booking\s+Calendar\s+Contact\s+Form)\b',
    r'\b(?:WP\s+Books\s+Gallery)\b',
    r'\b(?:Drag\s+and\s+Drop\s+File\s+Upload)\b',
    r'\b(?:BetterDocs)\b',
    r'\b(?:ExactMetrics)\b',
    r'\b(?:MaxiBlocks\s+Builder)\b',
    r'\b(?:Mobile\s+Next)\b',
    r'\b(?:Sentry)\b',
    r'\b(?:OpenAI\s+API)\b',
    r'\b(?:Google\s+Ads)\b',
    r'\b(?:ConfigMap)\b',
    r'\b(?:ServiceAccount)\b',
    r'\b(?:CODEOWNERS)\b',
))

# ORGANIZATION patterns (case-sensitive).
ORG_PATTERNS = tuple(re.compile(p) for p in (
    r'\b(?:Microsoft|Google|Apple|Amazon|Meta|Facebook)\b',
    r'\b(?:Cisco|Intel|AMD|NVIDIA|Qualcomm|Broadcom|Samsung)\b',
    r'\b(?:Oracle|IBM|SAP|VMware|Red\s+Hat|Canonical|SUSE)\b',
    r'\b(?:Mozilla|Apache\s+(?:Software\s+)?Foundation)\b',
    r'\b(?:Delta\s+Electronics)\b',
    r'\b(?:D-Link)\b',
    r'\b(?:GitLab)\b',  # also org
    r'\b(?:OpenAI)\b',
))

# Slash-separated paths with at least two segments, optional extension and
# optional trailing "()".
FILEPATH_RE = re.compile(r'(?:/[\w.-]+){2,}(?:\.\w+)?(?:\(\))?')

# Function names written with underscores and a trailing "()", e.g.
# foo_bar().  Names without an underscore (bar(), foo::bar()) do not match.
FUNCTION_RE = re.compile(r'\b\w+(?:_\w+)+\(\)')


def _collect(spans: List[Span], patterns: Iterable["re.Pattern"],
             label: str, text: str) -> None:
    """Append one span per match of each pattern in *patterns* to *spans*."""
    for pattern in patterns:
        for m in pattern.finditer(text):
            spans.append((m.start(), m.end(), label, m.group()))


def find_all(text: str) -> List[Span]:
    """Find all candidate entity spans in *text*.

    Returns a list of ``(start, end, label, surface)`` tuples in collection
    order (CVE_ID, VULNERABILITY, SYSTEM, ORGANIZATION, FILEPATH).  Spans may
    overlap; pass the result through ``resolve_overlaps`` to disambiguate.
    """
    spans: List[Span] = []

    _collect(spans, (CVE_ID_RE,), 'CVE_ID', text)
    _collect(spans, VULN_PATTERNS, 'VULNERABILITY', text)
    _collect(spans, SYSTEM_PATTERNS, 'SYSTEM', text)
    _collect(spans, ORG_PATTERNS, 'ORGANIZATION', text)
    _collect(spans, (FILEPATH_RE,), 'FILEPATH', text)

    # Function names are also labelled FILEPATH.
    for m in FUNCTION_RE.finditer(text):
        s = m.group()
        if len(s) > 6:  # skip very short
            spans.append((m.start(), m.end(), 'FILEPATH', s))

    return spans


def resolve_overlaps(spans: Iterable[Span]) -> List[Span]:
    """Return a non-overlapping subset of *spans*, sorted by start offset.

    Candidates are visited by ascending start and, for equal starts, by
    descending length; a candidate is kept only if it does not overlap a
    span that was already kept.  Among candidates with identical offsets the
    one that appears first in *spans* wins (the sort is stable).  Labels are
    not otherwise taken into account.

    The input is not modified.
    """
    # sorted() rather than list.sort(): the caller's list must stay intact.
    ordered = sorted(spans, key=lambda x: (x[0], -(x[1] - x[0])))
    accepted: List[Span] = []
    for cand in ordered:
        clashes = any(cand[0] < kept[1] and cand[1] > kept[0]
                      for kept in accepted)
        if not clashes:
            accepted.append(cand)
    return accepted


def to_opf_format(cve_id: str, text: str,
                  spans: Iterable[Span]) -> Dict[str, Any]:
    """Convert one annotated text to an OPF JSONL record.

    The record has three keys: ``text``; ``spans``, mapping
    ``"<label>: <surface>"`` to a list of ``[start, end]`` pairs; and
    ``info`` with a derived ``id`` and the fixed source ``"nvd_cve"``.

    A span whose offsets do not reproduce its surface string is reported on
    stderr and left out of the record.
    """
    span_dict: Dict[str, List[List[int]]] = {}
    for start, end, label, surface in spans:
        # Verify offset
        actual = text[start:end]
        if actual != surface:
            print(f"WARNING: offset mismatch in {cve_id}: expected '{surface}' got '{actual}'", file=sys.stderr)
            continue
        span_dict.setdefault(f"{label}: {surface}", []).append([start, end])

    return {
        "text": text,
        "spans": span_dict,
        "info": {"id": f"nvd_{cve_id.replace('CVE-','').replace('-','_')}", "source": "nvd_cve"}
    }


def annotate_cves(cves: Iterable[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], int]:
    """Annotate every CVE entry and return ``(records, total_spans)``.

    Each entry must provide ``'id'`` and ``'desc'``.  The CVE ID is
    prepended to the description when the description does not already
    contain it.  Entries with fewer than two resolved spans are skipped.
    """
    records: List[Dict[str, Any]] = []
    total_spans = 0
    for c in cves:
        text = c['desc']
        cve_id = c['id']

        # Prepend CVE ID to text if not already there
        if cve_id not in text:
            text = f"{cve_id}: {text}"

        spans = resolve_overlaps(find_all(text))
        if len(spans) < 2:
            continue  # skip if too few entities

        records.append(to_opf_format(cve_id, text, spans))
        total_spans += len(spans)
    return records, total_spans


def format_summary(record_count: int, total_spans: int) -> str:
    """Build the one-line run summary; the average is 0.0 for zero records."""
    # Guard the division: an input with no qualifying CVE is not an error.
    average = total_spans / record_count if record_count else 0.0
    return f"Annotated {record_count} CVEs with {total_spans} total spans ({average:.1f} avg)"


def main() -> None:
    """Read the filtered CVE dump, annotate it and write the JSONL output."""
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(INPUT_PATH, encoding="utf-8") as f:
        cves = json.load(f)

    records, total_spans = annotate_cves(cves)
    print(format_summary(len(records), total_spans), file=sys.stderr)

    with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(rec) + "\n" for rec in records)

    print(f"Written to {OUTPUT_PATH}", file=sys.stderr)


if __name__ == "__main__":
    main()