bbkdevops's picture
download
raw
11.5 kB
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import json
from pathlib import Path
import re
# Parent of the directory that contains this module (resolved to an absolute
# path); presumably the project root -- confirm against the repository layout.
ROOT = Path(__file__).resolve().parents[1]
# Default directory scanned for cvelistV5 JSON records when the builder is
# constructed without an explicit ``cvelist_root``.
DEFAULT_CVELIST = ROOT / "third_party" / "cvelistV5"
# Default directory scanned for trickest markdown summaries when the builder is
# constructed without an explicit ``trickest_root``.
DEFAULT_TRICKEST = ROOT / "third_party" / "trickest_cve"
def _sha256_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def _first_en_description(cna: dict) -> str:
for row in cna.get("descriptions", []) or []:
if row.get("lang", "").lower().startswith("en") and row.get("value"):
return str(row["value"]).strip()
return ""
def _affected(cna: dict) -> list[dict]:
out: list[dict] = []
for row in cna.get("affected", []) or []:
versions = []
for version in row.get("versions", []) or []:
versions.append({k: version.get(k) for k in ("version", "status", "lessThan", "versionType") if version.get(k)})
out.append(
{
"vendor": row.get("vendor") or "n/a",
"product": row.get("product") or "n/a",
"versions": versions[:8],
}
)
return out[:16]
def _problem_types(cna: dict) -> list[str]:
values: list[str] = []
for group in cna.get("problemTypes", []) or []:
for desc in group.get("descriptions", []) or []:
value = desc.get("description") or desc.get("cweId")
if value and value not in values:
values.append(str(value))
return values[:16]
def _references(cna: dict, adp: list[dict] | None = None) -> list[str]:
refs: list[str] = []
for row in cna.get("references", []) or []:
url = row.get("url")
if url and url not in refs:
refs.append(str(url))
for container in adp or []:
for row in container.get("references", []) or []:
url = row.get("url")
if url and url not in refs:
refs.append(str(url))
return refs[:24]
def _cvss(cna: dict, adp: list[dict] | None = None) -> list[dict]:
metrics = []
for container in [cna, *(adp or [])]:
for metric in container.get("metrics", []) or []:
item = {}
for key in ("cvssV4_0", "cvssV3_1", "cvssV3_0", "cvssV2_0"):
if key in metric:
src = metric[key]
item = {
"version": key,
"baseScore": src.get("baseScore"),
"baseSeverity": src.get("baseSeverity"),
"vectorString": src.get("vectorString"),
}
break
if item:
metrics.append(item)
return metrics[:8]
def _year_from_id(cve_id: str) -> int | None:
m = re.match(r"CVE-(\d{4})-", cve_id)
return int(m.group(1)) if m else None
@dataclass(frozen=True)
class CVECorpusPolicy:
    """Immutable settings that control which CVE files become corpus records."""

    # Upper bound on the files taken from each source; values <= 0 disable it.
    max_records_per_source: int = 20000
    # Number of files dropped from the front of each source's sorted file
    # list before the upper bound is applied; values <= 0 disable it.
    skip_records_per_source: int = 0
    # Files whose identifier carries a year below this are excluded; files
    # without a parseable year are kept.
    min_year: int = 1999
    # When False, trickest reference URLs containing "github.com" are dropped.
    include_poc_urls: bool = False
    # Label copied into every record and into the manifest.
    safety_scope: str = "defensive_vulnerability_intelligence_only"
class CVEIntelligenceCorpusBuilder:
    """Turn local cvelistV5 JSON and trickest markdown files into chat records.

    Only files below the two configured root directories are read. Each
    usable file becomes one chat-style record; ``build`` writes the records
    to a train/eval JSONL pair plus a JSON manifest.
    """

    # A record goes to the eval split when its index is a multiple of this.
    _EVAL_STRIDE = 17

    def __init__(
        self,
        cvelist_root: str | Path = DEFAULT_CVELIST,
        trickest_root: str | Path = DEFAULT_TRICKEST,
        policy: CVECorpusPolicy | None = None,
    ):
        """Store resolved source roots and the selection policy.

        Args:
            cvelist_root: Directory expected to contain a ``cves`` folder
                with ``CVE-*.json`` files.
            trickest_root: Directory searched recursively for ``CVE-*.md``.
            policy: Selection settings; a default ``CVECorpusPolicy`` is
                used when omitted.
        """
        self.cvelist_root = Path(cvelist_root).resolve()
        self.trickest_root = Path(trickest_root).resolve()
        self.policy = policy or CVECorpusPolicy()

    def build(self, out_dir: str | Path) -> dict:
        """Write the train/eval JSONL files and the manifest into ``out_dir``.

        Records from cvelistV5 come first, followed by trickest records.
        Every record whose position is a multiple of 17 goes to the eval
        file, all others to the train file.

        Args:
            out_dir: Output directory; created (with parents) if missing.

        Returns:
            The manifest dictionary that was also written to
            ``cve_intelligence_corpus_manifest.json``.
        """
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        cvelist_records = self._cvelist_records()
        trickest_records = self._trickest_records()
        records = cvelist_records + trickest_records
        train_path = out / "cve_intelligence_train.jsonl"
        eval_path = out / "cve_intelligence_eval.jsonl"
        train_count = 0
        eval_count = 0
        with train_path.open("w", encoding="utf-8", newline="\n") as train_f:
            with eval_path.open("w", encoding="utf-8", newline="\n") as eval_f:
                for idx, record in enumerate(records):
                    line = json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n"
                    # Decide the split once so the write and the counter
                    # can never disagree.
                    if idx % self._EVAL_STRIDE == 0:
                        eval_f.write(line)
                        eval_count += 1
                    else:
                        train_f.write(line)
                        train_count += 1
        manifest = {
            "schema_version": "tinymind-cve-intelligence-corpus-v1",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "sources": {
                "cvelistV5": str(self.cvelist_root),
                "trickest_cve": str(self.trickest_root),
            },
            "policy": {
                "safety_scope": self.policy.safety_scope,
                "include_poc_urls": self.policy.include_poc_urls,
                "poc_handling": "reference_summary_only" if not self.policy.include_poc_urls else "urls_only_no_payload",
                "max_records_per_source": self.policy.max_records_per_source,
                "skip_records_per_source": self.policy.skip_records_per_source,
                "min_year": self.policy.min_year,
            },
            "records_written": len(records),
            "train_records": train_count,
            "eval_records": eval_count,
            "source_counts": {
                "cvelistV5": len(cvelist_records),
                "trickest_cve": len(trickest_records),
            },
            "train_path": str(train_path),
            "eval_path": str(eval_path),
            "claim_gate": {
                "defensive_cve_corpus_ready": len(records) > 0,
                "exploit_payload_training_allowed": False,
                "reason": "CVE records are converted to defensive vulnerability intelligence. Exploit payloads are not fetched or embedded.",
            },
        }
        manifest_path = out / "cve_intelligence_corpus_manifest.json"
        manifest["manifest_path"] = str(manifest_path)
        manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
        return manifest

    def _select_files(self, root: Path, pattern: str) -> list[Path]:
        """Return the policy-filtered, sorted and windowed files under ``root``.

        Files are matched recursively, filtered by ``_year_allowed`` on
        their stem, sorted by POSIX path in descending order, and then
        windowed by the policy's skip and max settings (each applied only
        when positive).
        """
        files = [p for p in root.rglob(pattern) if self._year_allowed(p.stem)]
        files.sort(key=lambda p: p.as_posix(), reverse=True)
        skip = self.policy.skip_records_per_source
        if skip > 0:
            files = files[skip:]
        limit = self.policy.max_records_per_source
        if limit > 0:
            files = files[:limit]
        return files

    def _cvelist_records(self) -> list[dict]:
        """Build records from ``<cvelist_root>/cves/**/CVE-*.json``.

        Files that cannot be read or parsed, whose top level is not a JSON
        object, or that lack an English CNA description are skipped.
        """
        root = self.cvelist_root / "cves"
        if not root.exists():
            return []
        records = []
        for path in self._select_files(root, "CVE-*.json"):
            try:
                # Read once: the same bytes are parsed here and hashed in
                # ``_record``.
                payload = path.read_bytes()
                data = json.loads(payload.decode("utf-8"))
            except (OSError, ValueError, RecursionError):
                # Unreadable, undecodable or malformed file: best-effort skip.
                continue
            if not isinstance(data, dict):
                continue
            # ``or`` guards against keys that are present but null.
            meta = data.get("cveMetadata") or {}
            cve_id = meta.get("cveId") or path.stem
            containers = data.get("containers") or {}
            cna = containers.get("cna") or {}
            adp = containers.get("adp") or []
            description = _first_en_description(cna)
            if not description:
                continue
            knowledge = {
                "cve_id": cve_id,
                "state": meta.get("state"),
                "date_published": meta.get("datePublished"),
                "date_updated": meta.get("dateUpdated"),
                "description": description,
                "affected": _affected(cna),
                "problem_types": _problem_types(cna),
                "cvss": _cvss(cna, adp),
                "references": _references(cna, adp),
            }
            records.append(self._record(cve_id, knowledge, "cvelistV5", path, payload))
        return records

    def _trickest_records(self) -> list[dict]:
        """Build records from ``<trickest_root>/**/CVE-*.md``.

        Only the ``### Description`` section and the URLs found in the file
        are used. Unreadable files and files without a description are
        skipped. Unless the policy enables ``include_poc_urls``, URLs
        containing ``github.com`` are removed.
        """
        if not self.trickest_root.exists():
            return []
        records = []
        for path in self._select_files(self.trickest_root, "CVE-*.md"):
            try:
                payload = path.read_bytes()
            except OSError:
                # Same best-effort policy as the cvelistV5 loader.
                continue
            text = payload.decode("utf-8", errors="replace")
            desc = self._extract_markdown_section(text, "Description")
            if not desc:
                continue
            refs = self._extract_urls(text)
            if not self.policy.include_poc_urls:
                refs = [url for url in refs if "github.com" not in url.lower()]
            knowledge = {
                "cve_id": path.stem,
                "description": desc[:4000],
                "references": refs[:24],
                "source_note": "trickest markdown summary; PoC payloads are not fetched or embedded",
            }
            records.append(self._record(path.stem, knowledge, "trickest_cve", path, payload))
        return records

    def _year_allowed(self, cve_id: str) -> bool:
        """Return True unless ``cve_id`` carries a year below ``policy.min_year``.

        Identifiers without a parseable year are allowed.
        """
        year = _year_from_id(cve_id)
        return year is None or year >= self.policy.min_year

    def _record(
        self,
        cve_id: str,
        knowledge: dict,
        source: str,
        path: Path,
        payload: bytes | None = None,
    ) -> dict:
        """Wrap ``knowledge`` into one chat-style record with provenance fields.

        Args:
            cve_id: Identifier used in the prompt and the ``cve_id`` field.
            knowledge: JSON-serialisable evidence embedded in the user turn.
            source: ``"cvelistV5"`` or another source label.
            path: File the record was derived from.
            payload: Raw bytes of ``path`` if the caller already read them;
                when omitted the file is read here to compute the hash.

        Returns:
            A dictionary with ``messages`` and source metadata.
        """
        # For cvelistV5 the stored path is relative to the file's grandparent
        # directory (parent folder name + file name); for every other source
        # it is relative to the trickest root.
        base = path.parents[1] if source == "cvelistV5" else self.trickest_root
        rel = path.relative_to(base).as_posix()
        if payload is None:
            payload = path.read_bytes()
        return {
            "messages": [
                {
                    "role": "system",
                    "content": "You are TinyMind defensive CVE analyst. Explain vulnerabilities for patching, detection, risk triage, asset inventory, and safe remediation. Do not provide exploit payloads.",
                },
                {
                    "role": "user",
                    "content": f"Build a defensive vulnerability intelligence note for {cve_id} from this structured evidence:\n\n```json\n{json.dumps(knowledge, ensure_ascii=False, indent=2)}\n```",
                },
                {
                    "role": "assistant",
                    "content": "I will summarize impact, affected scope, severity evidence, references, and remediation/detection questions without giving exploit steps or payloads.",
                },
            ],
            "source": source,
            "source_path": rel,
            "source_sha256": _sha256_bytes(payload),
            "cve_id": cve_id,
            "year": _year_from_id(cve_id),
            "topic": "defensive_cve_intelligence",
            "safety_scope": self.policy.safety_scope,
        }

    def _extract_markdown_section(self, text: str, heading: str) -> str:
        """Return the body below the ``### <heading>`` line, stripped.

        Capture starts after the first level-3 heading whose title equals
        ``heading`` (case-insensitive) and stops at the next line starting
        with ``### ``. An empty string is returned when the heading is
        absent.
        """
        wanted = heading.lower()
        capture = False
        body: list[str] = []
        for line in text.splitlines():
            normalized = line.strip().lower()
            if normalized.startswith("### "):
                if capture:
                    break
                capture = normalized.lstrip("#").strip() == wanted
                continue
            if capture:
                body.append(line)
        return "\n".join(body).strip()

    def _extract_urls(self, text: str) -> list[str]:
        """Return the distinct http(s) URLs in ``text`` in first-seen order.

        Trailing ``.`` and ``,`` characters are removed before
        de-duplication.
        """
        found = re.findall(r"https?://[^\s)>\]]+", text)
        # dict.fromkeys keeps insertion order and de-duplicates in O(n).
        return list(dict.fromkeys(url.rstrip(".,") for url in found))

Xet Storage Details

Size:
11.5 kB
·
Xet hash:
ae3fce3ab32afe9f927682cfeda96ebe64d7ac42a4ae960140fd1d2d4b31850d

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.