#!/usr/bin/env python3
"""scripts/push_cve_corpus_to_hf.py — pull enriched cybersec rows from
training_queue and publish them to the cuilabs/bee-interactions HF dataset
in the trainer-expected schema.

Source rows must have BOTH `payload.prompt` and `payload.completion` — the
prompt is generated by `backfill_cve_prompts.py`, the completion by
`backfill_cve_completions.py`. Rows that have only one of the two, or whose
prompt is shorter than 40 characters / completion shorter than 80
characters, are silently skipped (they're either still mid-pipeline or a
teacher call failed).

Output schema mirrors what the Vertex/Kaggle trainers expect via their
`_row_user()` / `_row_assistant()` extractors:

    {
      "role": "assistant",
      "prompt": "<payload.prompt>",
      "content": "<payload.completion>",
      "domain": "cybersecurity",
      "task_type": "cve_analysis",
      "target_tiers": ["cell", "cell-plus", "comb"],
      "quality_score": 0.85,
      "feedback": null,
      "model_id": "<completion model>",
      "sample_id": "cve:<CVE id>",
      "source": "cve_distillation:prompt=<enrich model>;answer=<completion model>",
      "kev_flag": <true | false>,
      "cve_id": "<CVE id>",
      "cvss_severity": "<severity string, or null>",
      "cwes": [...],
      "created_at": "<ISO-8601 UTC timestamp>"
    }

The trainer filter (`is_acceptable` in workers/vertex-train/train.py) keys
on `domain` field equality plus minimum-length checks; everything else is
provenance the trainer ignores but is useful when auditing the adapter.

Usage
-----
    python3 scripts/push_cve_corpus_to_hf.py               # full push
    python3 scripts/push_cve_corpus_to_hf.py --dry-run     # stage only
    python3 scripts/push_cve_corpus_to_hf.py --limit 10    # smoke test

Reads POSTGRES_URL_NON_POOLING and HF_TOKEN (falling back to
HUGGINGFACE_HUB_TOKEN) from the environment; `.env` at the repo root is
loaded first when python-dotenv is installed.
"""
from __future__ import annotations

import argparse
import datetime
import json
import os
import sys
from pathlib import Path

try:
    from dotenv import load_dotenv

    load_dotenv(Path(__file__).resolve().parent.parent / ".env")
except ImportError:
    pass

try:
    import psycopg
    from psycopg import rows as psycopg_rows
except ImportError:
    # Reported with a clear error from main() (same treatment as a missing
    # huggingface_hub), so the module stays importable — e.g. for --help or
    # for reusing transform() — without the database driver installed.
    psycopg = None
    psycopg_rows = None

REPO_ROOT = Path(__file__).resolve().parent.parent
DEFAULT_DATASET_ID = "cuilabs/bee-interactions"
DEFAULT_TARGET_TIERS = ["cell", "cell-plus", "comb"]
DEFAULT_QUALITY = 0.85


def fetch_rows(conn, limit: int | None) -> list[dict]:
    """Return queued CVE rows that carry both a prompt and a completion.

    Rows are ordered KEV-flagged first, then by CVSS severity (CRITICAL,
    HIGH, MEDIUM, then everything else), then by `payload.published`
    descending with NULLs last, so a small ``--limit`` picks the most
    important CVEs first.

    Args:
        conn: an open psycopg connection.
        limit: maximum number of rows to return; ``None`` means no limit.
            ``0`` returns no rows.

    Returns:
        One dict per row with ``id``, ``external_id`` and ``payload`` keys.
    """
    # NOTE(review): `published` is compared as text; this matches date order
    # only if it is stored as an ISO-8601 string — confirm against the
    # backfill scripts.
    sql = """
        SELECT id, external_id, payload
        FROM public.training_queue
        WHERE kind = 'cve'
          AND domain = 'cybersecurity'
          AND payload ? 'prompt'
          AND payload ? 'completion'
          AND length(payload->>'prompt') >= 40
          AND length(payload->>'completion') >= 80
        ORDER BY
          CASE WHEN (payload->>'kev')::boolean THEN 0 ELSE 1 END,
          CASE payload->>'cvss_severity'
            WHEN 'CRITICAL' THEN 1
            WHEN 'HIGH' THEN 2
            WHEN 'MEDIUM' THEN 3
            ELSE 9
          END,
          (payload->>'published') DESC NULLS LAST
    """
    params: tuple = ()
    # `is not None` (not truthiness) so an explicit `--limit 0` means
    # "no rows" instead of silently falling through to "push everything".
    if limit is not None:
        sql += " LIMIT %s"
        params = (limit,)
    with conn.cursor(row_factory=psycopg_rows.dict_row) as cur:
        cur.execute(sql, params)
        return cur.fetchall()


def transform(row: dict, created_at: str | None = None) -> dict:
    """Convert one training_queue row into a trainer-schema record.

    Args:
        row: a row from fetch_rows(); ``row["payload"]`` must contain
            ``prompt`` and ``completion``.
        created_at: ISO-8601 timestamp to stamp on the record. Defaults to
            the current UTC time; main() passes one value for the whole batch.

    Returns:
        A dict in the schema described in the module docstring. Every call
        returns a fresh ``target_tiers`` list.
    """
    p = row["payload"]
    cve_id = p.get("cve_id") or row.get("external_id") or ""
    completion_model = p.get("completion_model") or "mistral-medium-latest"
    # Rows without an explicit enrich model are attributed to the completion model.
    enrich_model = p.get("enrich_model") or completion_model
    if created_at is None:
        created_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
    return {
        "role": "assistant",
        "prompt": p["prompt"],
        "content": p["completion"],
        "domain": "cybersecurity",
        "task_type": "cve_analysis",
        # Copy so records never share (and cannot mutate) the module default.
        "target_tiers": list(DEFAULT_TARGET_TIERS),
        "quality_score": DEFAULT_QUALITY,
        "feedback": None,
        "model_id": completion_model,
        "sample_id": f"cve:{cve_id}",
        "source": f"cve_distillation:prompt={enrich_model};answer={completion_model}",
        "kev_flag": bool(p.get("kev")),
        "cve_id": cve_id,
        "cvss_severity": p.get("cvss_severity"),
        "cwes": p.get("cwes") or [],
        "created_at": created_at,
    }


def main() -> int:
    """CLI entry point: fetch, transform, stage as JSONL, and upload to HF.

    The JSONL file is always written under data/datasets/distilled/; with
    ``--dry-run`` nothing is uploaded and the first record is printed.

    Returns:
        Process exit code: 0 on success (including "no rows"), 1 on missing
        configuration or dependencies.
    """
    # `__doc__` is None under `python -OO`; fall back to an empty description.
    parser = argparse.ArgumentParser(description=(__doc__ or "").split("\n\n")[0])
    parser.add_argument("--dataset-id", default=DEFAULT_DATASET_ID)
    parser.add_argument("--limit", type=int, default=None)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    if args.limit is not None and args.limit < 0:
        parser.error("--limit must be >= 0")

    pg_url = (os.environ.get("POSTGRES_URL_NON_POOLING") or "").strip()
    if not pg_url:
        print("ERROR: POSTGRES_URL_NON_POOLING not set", file=sys.stderr)
        return 1
    hf_token = (
        os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN") or ""
    ).strip()
    if not hf_token and not args.dry_run:
        print("ERROR: HF_TOKEN not set (use --dry-run to bypass)", file=sys.stderr)
        return 1
    if psycopg is None:
        print("ERROR: psycopg not installed", file=sys.stderr)
        return 1

    with psycopg.connect(pg_url, autocommit=False) as conn:
        rows = fetch_rows(conn, args.limit)
    if not rows:
        print("No rows with both prompt and completion — backfill incomplete?")
        return 0

    # One timestamp for the whole batch: stamps every record and names the
    # staging/upload files, so a batch can be traced end to end.
    now = datetime.datetime.now(datetime.timezone.utc)
    stamp = now.strftime("%Y%m%d-%H%M%S")
    created_at = now.isoformat()

    transformed = [transform(r, created_at=created_at) for r in rows]
    kev_count = sum(1 for r in transformed if r["kev_flag"])
    print(f"Transformed {len(transformed)} rows ({kev_count} KEV-flagged)")

    staging_dir = REPO_ROOT / "data/datasets/distilled"
    staging_dir.mkdir(parents=True, exist_ok=True)
    staging_path = staging_dir / f"_upload-cve-cybersec-{stamp}.jsonl"
    with staging_path.open("w", encoding="utf-8") as f:
        for r in transformed:
            f.write(json.dumps(r) + "\n")
    print(f"Staged {len(transformed)} rows at {staging_path}")

    if args.dry_run:
        print("[dry-run] not uploading. First row:")
        print(json.dumps(transformed[0], indent=2)[:1200])
        return 0

    try:
        from huggingface_hub import HfApi
    except ImportError:
        print("ERROR: huggingface_hub not installed", file=sys.stderr)
        return 1

    # Record the teacher(s) actually used by this batch rather than assuming
    # the default model; identical to the old message when all rows use it.
    teachers = ",".join(sorted({r["model_id"] for r in transformed}))

    api = HfApi(token=hf_token)
    upload_path = f"distilled/cve-cybersec-{stamp}.jsonl"
    print(f"Uploading {len(transformed)} rows → {args.dataset_id}:{upload_path}")
    api.upload_file(
        path_or_fileobj=str(staging_path),
        path_in_repo=upload_path,
        repo_id=args.dataset_id,
        repo_type="dataset",
        commit_message=(
            f"cve cybersec corpus: {len(transformed)} (prompt, completion) "
            f"pairs, {kev_count} KEV-flagged. teacher={teachers}. "
            f"source=training_queue@{stamp}"
        ),
    )
    print(
        f"OK uploaded → "
        f"https://huggingface.co/datasets/{args.dataset_id}/blob/main/{upload_path}"
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())