Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff / bundle / data / cve_intelligence_corpus.py
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| import re | |
# Directory one level above the folder that contains this module.
ROOT = Path(__file__).resolve().parents[1]
# Default on-disk locations of the two input sources, both under ROOT/third_party.
DEFAULT_CVELIST = ROOT.joinpath("third_party", "cvelistV5")
DEFAULT_TRICKEST = ROOT.joinpath("third_party", "trickest_cve")
def _sha256_bytes(data: bytes) -> str:
    """Return the SHA-256 digest of *data* as a lowercase hexadecimal string."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def _first_en_description(cna: dict) -> str:
    """Return the first non-blank English description of a CNA container.

    Rows of ``cna["descriptions"]`` are scanned in order. A row qualifies
    when its ``lang`` tag starts with ``en`` (case-insensitive) and its
    ``value`` is non-empty after surrounding whitespace is stripped.

    Args:
        cna: The ``cna`` container mapping of a CVE record. A missing or
            null ``descriptions`` entry is treated as an empty list.

    Returns:
        The stripped description text, or ``""`` when no row qualifies.
    """
    for row in cna.get("descriptions", []) or []:
        # ``lang`` may be present but null; coerce it so such a row is
        # skipped instead of raising AttributeError on ``None.lower()``.
        lang = str(row.get("lang") or "")
        if not lang.lower().startswith("en"):
            continue
        # A whitespace-only value carries no information, so keep looking
        # for a later English row rather than returning an empty string.
        text = str(row.get("value") or "").strip()
        if text:
            return text
    return ""
def _affected(cna: dict) -> list[dict]:
    """Summarise the ``affected`` entries of a CNA container.

    Each entry is reduced to ``vendor``, ``product`` (both defaulting to
    ``"n/a"`` when missing or empty) and up to eight version dicts. A
    version dict keeps only the truthy values among ``version``,
    ``status``, ``lessThan`` and ``versionType``.

    Returns:
        At most sixteen summarised entries, in their original order.
    """
    kept_fields = ("version", "status", "lessThan", "versionType")
    summaries: list[dict] = []
    for entry in cna.get("affected", []) or []:
        version_rows = [
            {field: ver.get(field) for field in kept_fields if ver.get(field)}
            for ver in entry.get("versions", []) or []
        ]
        summary = {
            "vendor": entry.get("vendor") or "n/a",
            "product": entry.get("product") or "n/a",
            "versions": version_rows[:8],
        }
        summaries.append(summary)
    return summaries[:16]
def _problem_types(cna: dict) -> list[str]:
    """Collect distinct problem-type labels from a CNA container.

    For every row under ``problemTypes[*].descriptions`` the label is the
    row's ``description`` when truthy, otherwise its ``cweId``. Labels are
    converted to ``str`` and de-duplicated in first-seen order.

    Returns:
        At most sixteen labels.
    """
    seen: set[str] = set()
    labels: list[str] = []
    for group in cna.get("problemTypes", []) or []:
        for row in group.get("descriptions", []) or []:
            raw = row.get("description") or row.get("cweId")
            if not raw:
                continue
            # Compare the stringified form: the output list holds strings,
            # so checking the raw value would let a non-string duplicate
            # slip through.
            label = str(raw)
            if label not in seen:
                seen.add(label)
                labels.append(label)
    return labels[:16]
| def _references(cna: dict, adp: list[dict] | None = None) -> list[str]: | |
| refs: list[str] = [] | |
| for row in cna.get("references", []) or []: | |
| url = row.get("url") | |
| if url and url not in refs: | |
| refs.append(str(url)) | |
| for container in adp or []: | |
| for row in container.get("references", []) or []: | |
| url = row.get("url") | |
| if url and url not in refs: | |
| refs.append(str(url)) | |
| return refs[:24] | |
| def _cvss(cna: dict, adp: list[dict] | None = None) -> list[dict]: | |
| metrics = [] | |
| for container in [cna, *(adp or [])]: | |
| for metric in container.get("metrics", []) or []: | |
| item = {} | |
| for key in ("cvssV4_0", "cvssV3_1", "cvssV3_0", "cvssV2_0"): | |
| if key in metric: | |
| src = metric[key] | |
| item = { | |
| "version": key, | |
| "baseScore": src.get("baseScore"), | |
| "baseSeverity": src.get("baseSeverity"), | |
| "vectorString": src.get("vectorString"), | |
| } | |
| break | |
| if item: | |
| metrics.append(item) | |
| return metrics[:8] | |
| def _year_from_id(cve_id: str) -> int | None: | |
| m = re.match(r"CVE-(\d{4})-", cve_id) | |
| return int(m.group(1)) if m else None | |
@dataclass
class CVECorpusPolicy:
    """Tunable limits and safety flags for a corpus build.

    The ``@dataclass`` decorator generates a keyword-capable constructor,
    so individual fields can be overridden at construction time, e.g.
    ``CVECorpusPolicy(min_year=2015)``. Calling it with no arguments
    yields the defaults below, and those defaults also remain readable as
    class attributes.
    """

    # Upper bound on files taken from each source; values <= 0 disable the cap.
    max_records_per_source: int = 20_000
    # Number of leading files (after sorting) to drop from each source.
    skip_records_per_source: int = 0
    # Identifiers whose embedded year is below this value are excluded.
    min_year: int = 1999
    # When False, GitHub URLs are removed from trickest-derived references.
    include_poc_urls: bool = False
    # Label copied into every record and into the manifest.
    safety_scope: str = "defensive_vulnerability_intelligence_only"
class CVEIntelligenceCorpusBuilder:
    """Build a chat-style defensive CVE corpus from two local source trees.

    Records are read from CVE JSON files under ``<cvelist_root>/cves`` and
    from markdown files under ``trickest_root``. Each usable file becomes
    one record holding a fixed system/user/assistant message triple plus
    provenance metadata. ``build`` writes the records to train/eval JSONL
    files and a manifest.
    """

    def __init__(
        self,
        cvelist_root: str | Path = DEFAULT_CVELIST,
        trickest_root: str | Path = DEFAULT_TRICKEST,
        policy: CVECorpusPolicy | None = None,
    ):
        """Store resolved source roots and the active policy.

        Args:
            cvelist_root: Directory expected to contain a ``cves`` subtree.
            trickest_root: Directory scanned recursively for ``CVE-*.md``.
            policy: Build limits and flags; a default ``CVECorpusPolicy``
                is created when omitted.
        """
        self.cvelist_root = Path(cvelist_root).resolve()
        self.trickest_root = Path(trickest_root).resolve()
        self.policy = policy or CVECorpusPolicy()

    def build(self, out_dir: str | Path) -> dict:
        """Write the train/eval JSONL files and the manifest into *out_dir*.

        Records from the cvelist source come first, followed by the
        trickest records. Records at indices 0, 17, 34, ... go to the eval
        file; all others go to the train file.

        Args:
            out_dir: Output directory; created (with parents) if missing.

        Returns:
            The manifest dict that is also written to
            ``cve_intelligence_corpus_manifest.json``.
        """
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        cvelist_records = self._cvelist_records()
        trickest_records = self._trickest_records()
        records = cvelist_records + trickest_records
        train_path = out / "cve_intelligence_train.jsonl"
        eval_path = out / "cve_intelligence_eval.jsonl"
        train_count = 0
        eval_count = 0
        # newline="\n" keeps line endings identical on every platform.
        with train_path.open("w", encoding="utf-8", newline="\n") as train_f, eval_path.open(
            "w", encoding="utf-8", newline="\n"
        ) as eval_f:
            for idx, record in enumerate(records):
                is_eval = idx % 17 == 0
                line = json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n"
                if is_eval:
                    eval_f.write(line)
                    eval_count += 1
                else:
                    train_f.write(line)
                    train_count += 1
        manifest = {
            "schema_version": "tinymind-cve-intelligence-corpus-v1",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "sources": {
                "cvelistV5": str(self.cvelist_root),
                "trickest_cve": str(self.trickest_root),
            },
            "policy": {
                "safety_scope": self.policy.safety_scope,
                "include_poc_urls": self.policy.include_poc_urls,
                "poc_handling": "reference_summary_only" if not self.policy.include_poc_urls else "urls_only_no_payload",
                "max_records_per_source": self.policy.max_records_per_source,
                "skip_records_per_source": self.policy.skip_records_per_source,
                "min_year": self.policy.min_year,
            },
            "records_written": len(records),
            "train_records": train_count,
            "eval_records": eval_count,
            "source_counts": {
                "cvelistV5": len(cvelist_records),
                "trickest_cve": len(trickest_records),
            },
            "train_path": str(train_path),
            "eval_path": str(eval_path),
            "claim_gate": {
                "defensive_cve_corpus_ready": len(records) > 0,
                "exploit_payload_training_allowed": False,
                "reason": "CVE records are converted to defensive vulnerability intelligence. Exploit payloads are not fetched or embedded.",
            },
        }
        manifest_path = out / "cve_intelligence_corpus_manifest.json"
        manifest["manifest_path"] = str(manifest_path)
        manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
        return manifest

    @staticmethod
    def _dict_or_empty(value: object) -> dict:
        """Return *value* when it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def _select_files(self, root: Path, pattern: str) -> list[Path]:
        """Return the files under *root* that the policy allows.

        Files matching *pattern* (searched recursively) are filtered by
        year, sorted by POSIX path in descending order, then trimmed by the
        policy's skip count and per-source cap. The cap is applied to
        files, before any content-based filtering by the callers.
        """
        files = [p for p in root.rglob(pattern) if self._year_allowed(p.stem)]
        files.sort(key=lambda p: p.as_posix(), reverse=True)
        skip = self.policy.skip_records_per_source
        limit = self.policy.max_records_per_source
        if skip > 0:
            files = files[skip:]
        if limit > 0:
            files = files[:limit]
        return files

    def _cvelist_records(self) -> list[dict]:
        """Build records from CVE JSON files under ``<cvelist_root>/cves``.

        Files that cannot be read or parsed, whose top level is not a JSON
        object, or that lack an English description are skipped.

        Returns:
            One record per usable file; empty when the ``cves`` directory
            does not exist.
        """
        root = self.cvelist_root / "cves"
        if not root.exists():
            return []
        records = []
        for path in self._select_files(root, "CVE-*.json"):
            try:
                data = json.loads(path.read_text(encoding="utf-8"))
            except (OSError, ValueError, RecursionError):
                # Best-effort ingestion: unreadable, undecodable, malformed
                # (decode/JSON errors are ValueError subclasses) or
                # pathologically nested files are skipped, while unrelated
                # programming errors are no longer hidden.
                continue
            if not isinstance(data, dict):
                continue
            # Keys may be present but null or of the wrong type; normalise
            # them so one malformed file cannot abort the whole build.
            meta = self._dict_or_empty(data.get("cveMetadata"))
            cve_id = meta.get("cveId") or path.stem
            containers = self._dict_or_empty(data.get("containers"))
            cna = self._dict_or_empty(containers.get("cna"))
            raw_adp = containers.get("adp")
            adp = [c for c in raw_adp if isinstance(c, dict)] if isinstance(raw_adp, list) else []
            description = _first_en_description(cna)
            if not description:
                continue
            knowledge = {
                "cve_id": cve_id,
                "state": meta.get("state"),
                "date_published": meta.get("datePublished"),
                "date_updated": meta.get("dateUpdated"),
                "description": description,
                "affected": _affected(cna),
                "problem_types": _problem_types(cna),
                "cvss": _cvss(cna, adp),
                "references": _references(cna, adp),
            }
            records.append(self._record(cve_id, knowledge, "cvelistV5", path))
        return records

    def _trickest_records(self) -> list[dict]:
        """Build records from ``CVE-*.md`` files under ``trickest_root``.

        Only the ``### Description`` section and the URLs found in the file
        are used. Unless the policy enables ``include_poc_urls``, URLs
        containing ``github.com`` are removed.

        Returns:
            One record per file with a non-empty description; empty when
            ``trickest_root`` does not exist.
        """
        if not self.trickest_root.exists():
            return []
        records = []
        for path in self._select_files(self.trickest_root, "CVE-*.md"):
            try:
                text = path.read_text(encoding="utf-8", errors="replace")
            except OSError:
                # Same best-effort rule as the cvelist reader.
                continue
            desc = self._extract_markdown_section(text, "Description")
            if not desc:
                continue
            refs = self._extract_urls(text)
            if not self.policy.include_poc_urls:
                refs = [url for url in refs if "github.com" not in url.lower()]
            knowledge = {
                "cve_id": path.stem,
                "description": desc[:4000],
                "references": refs[:24],
                "source_note": "trickest markdown summary; PoC payloads are not fetched or embedded",
            }
            records.append(self._record(path.stem, knowledge, "trickest_cve", path))
        return records

    def _year_allowed(self, cve_id: str) -> bool:
        """Return True when *cve_id* has no parseable year or meets ``min_year``."""
        year = _year_from_id(cve_id)
        return year is None or year >= self.policy.min_year

    def _record(self, cve_id: str, knowledge: dict, source: str, path: Path) -> dict:
        """Wrap *knowledge* in the chat-message record format.

        Args:
            cve_id: Identifier used in the prompt and the metadata.
            knowledge: Structured evidence embedded as JSON in the user turn.
            source: ``"cvelistV5"`` or any other label (treated as trickest).
            path: Source file; hashed and recorded as a relative path.

        Returns:
            A dict with ``messages`` and provenance fields.
        """
        # NOTE(review): for cvelistV5 the path is made relative to the
        # file's grandparent directory, so only the last two components are
        # kept (not the full path below cvelist_root) - confirm intended.
        base = path.parents[1] if source == "cvelistV5" else self.trickest_root
        rel = path.relative_to(base).as_posix()
        return {
            "messages": [
                {
                    "role": "system",
                    "content": "You are TinyMind defensive CVE analyst. Explain vulnerabilities for patching, detection, risk triage, asset inventory, and safe remediation. Do not provide exploit payloads.",
                },
                {
                    "role": "user",
                    "content": f"Build a defensive vulnerability intelligence note for {cve_id} from this structured evidence:\n\n```json\n{json.dumps(knowledge, ensure_ascii=False, indent=2)}\n```",
                },
                {
                    "role": "assistant",
                    "content": "I will summarize impact, affected scope, severity evidence, references, and remediation/detection questions without giving exploit steps or payloads.",
                },
            ],
            "source": source,
            "source_path": rel,
            "source_sha256": _sha256_bytes(path.read_bytes()),
            "cve_id": cve_id,
            "year": _year_from_id(cve_id),
            "topic": "defensive_cve_intelligence",
            "safety_scope": self.policy.safety_scope,
        }

    def _extract_markdown_section(self, text: str, heading: str) -> str:
        """Return the body of the first ``### <heading>`` section in *text*.

        Heading comparison is case-insensitive. Capture starts after the
        matching heading line and stops at the next ``### `` line.

        Returns:
            The stripped section body, or ``""`` when the heading is absent.
        """
        wanted = heading.lower()
        capture = False
        body: list[str] = []
        for line in text.splitlines():
            normalized = line.strip().lower()
            if normalized.startswith("### "):
                if capture:
                    break
                capture = normalized.lstrip("#").strip() == wanted
                continue
            if capture:
                body.append(line)
        return "\n".join(body).strip()

    def _extract_urls(self, text: str) -> list[str]:
        """Return the distinct http(s) URLs in *text*, in first-seen order.

        Trailing ``.`` and ``,`` characters are trimmed before de-duplication.
        """
        urls = re.findall(r"https?://[^\s)>\]]+", text)
        # dict.fromkeys keeps insertion order and de-duplicates in O(n).
        return list(dict.fromkeys(url.rstrip(".,") for url in urls))
Xet Storage Details
- Size: 11.5 kB
- Xet hash: ae3fce3ab32afe9f927682cfeda96ebe64d7ac42a4ae960140fd1d2d4b31850d
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.