bee / scripts /push_cve_corpus_to_hf.py
Bee Deploy
HF Space backend deploy [de0cba5]
5e21013
#!/usr/bin/env python3
"""scripts/push_cve_corpus_to_hf.py — pull enriched cybersec rows from
training_queue and publish them to cuilabs/bee-interactions HF dataset
in the trainer-expected schema.
Source rows must have BOTH `payload.prompt` and `payload.completion` —
the prompt is generated by `backfill_cve_prompts.py`, the completion by
`backfill_cve_completions.py`. Rows that have only one of the two are
silently skipped (they're either still mid-pipeline or a teacher call
failed), as are rows whose prompt is shorter than 40 characters or whose
completion is shorter than 80 characters.
Output schema mirrors what the Vertex/Kaggle trainers expect via their
`_row_user()` / `_row_assistant()` extractors:
{
"role": "assistant",
"prompt": "<question>",
"content": "<answer>",
"domain": "cybersecurity",
"task_type": "cve_analysis",
"target_tiers": ["cell", "cell-plus", "comb"],
"quality_score": 0.85,
"feedback": null,
"model_id": "<completion_model>",
"sample_id": "cve:<cve_id>",
"source": "cve_distillation:prompt=<enrich_model>;answer=<completion_model>",
"kev_flag": <bool>,
"cve_id": "<id>",
"cvss_severity": "<sev>",
"cwes": [...],
"created_at": "<ISO ts, UTC, taken when this export runs>"
}
The trainer filter (`is_acceptable` in workers/vertex-train/train.py)
keys on `domain` field equality plus minimum-length checks; everything
else is provenance the trainer ignores but is useful when auditing the
adapter.
Usage
-----
python3 scripts/push_cve_corpus_to_hf.py # full push
python3 scripts/push_cve_corpus_to_hf.py --dry-run # stage only
python3 scripts/push_cve_corpus_to_hf.py --limit 10 # smoke test
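Reads HF_TOKEN (falling back to HUGGINGFACE_HUB_TOKEN) and
POSTGRES_URL_NON_POOLING from the environment; the repo-root `.env` is
loaded first when python-dotenv is installed. HF_TOKEN is only required
for a real upload, not with --dry-run.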
"""
from __future__ import annotations
import argparse
import datetime
import json
import os
import sys
from pathlib import Path
try:
from dotenv import load_dotenv
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
except ImportError:
pass
import psycopg
from psycopg import rows as psycopg_rows
# Repository root: this script lives at <repo>/scripts/<file>, two levels down.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Hugging Face dataset repo that receives the uploaded JSONL shard.
DEFAULT_DATASET_ID = "cuilabs/bee-interactions"
# Model tiers every exported row is tagged for (the `target_tiers` field).
DEFAULT_TARGET_TIERS = ["cell", "cell-plus", "comb"]
# Fixed score stamped on every exported row (the `quality_score` field).
DEFAULT_QUALITY = 0.85
def fetch_rows(conn, limit: int | None) -> list[dict]:
    """Return the training_queue CVE rows that are ready to publish.

    A row qualifies when it has ``kind = 'cve'`` and ``domain = 'cybersecurity'``,
    and its JSONB payload carries both a ``prompt`` of at least 40 characters
    and a ``completion`` of at least 80 characters.

    Rows are ordered as follows:
    - KEV-flagged rows first;
    - then by CVSS severity (CRITICAL, HIGH, MEDIUM, everything else);
    - then newest ``published`` first;
    - finally by ``id``, so a limited run always selects the same rows.

    Args:
        conn: An open psycopg connection.
        limit: Maximum number of rows to return, or ``None`` for no limit.
            ``0`` yields an empty result; it does not mean "no limit".

    Returns:
        A list of dict rows with keys ``id``, ``external_id`` and ``payload``.

    Raises:
        ValueError: If ``limit`` is negative, which Postgres would reject anyway.
    """
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit}")
    # KEV ordering compares lower-cased text against the usual PostgreSQL
    # boolean spellings instead of casting with ::boolean. A single malformed
    # `kev` value (e.g. an empty string) would make the cast raise and abort
    # the whole query.
    sql = """
        SELECT id, external_id, payload
        FROM public.training_queue
        WHERE kind = 'cve'
          AND domain = 'cybersecurity'
          AND payload ? 'prompt'
          AND payload ? 'completion'
          AND length(payload->>'prompt') >= 40
          AND length(payload->>'completion') >= 80
        ORDER BY
          CASE WHEN lower(btrim(payload->>'kev')) IN ('true', 't', 'yes', 'y', 'on', '1')
               THEN 0 ELSE 1 END,
          CASE payload->>'cvss_severity'
            WHEN 'CRITICAL' THEN 1
            WHEN 'HIGH' THEN 2
            WHEN 'MEDIUM' THEN 3
            ELSE 9
          END,
          (payload->>'published') DESC NULLS LAST,
          id
    """
    params: tuple = ()
    if limit is not None:
        sql += " LIMIT %s"
        params = (limit,)
    with conn.cursor(row_factory=psycopg_rows.dict_row) as cur:
        cur.execute(sql, params)
        # With dict_row, fetchall() already returns a list of dicts.
        return cur.fetchall()
# Lower-cased text spellings that PostgreSQL accepts as boolean true.
_KEV_TRUTHY = frozenset({"true", "t", "yes", "y", "on", "1"})


def _kev_flag(value: object) -> bool:
    """Interpret a payload ``kev`` value as a boolean.

    Strings are matched (trimmed, case-insensitive) against the common
    PostgreSQL boolean spellings. The flag therefore agrees with how the SQL
    ordering reads the same text: the string ``"false"`` is ``False``,
    whereas plain ``bool("false")`` would be ``True``. Any other type falls
    back to ``bool()``.
    """
    if isinstance(value, str):
        return value.strip().lower() in _KEV_TRUTHY
    return bool(value)


def transform(row: dict) -> dict:
    """Map one training_queue row onto the trainer-facing JSONL schema.

    Args:
        row: A row as returned by ``fetch_rows``. Its ``payload`` dict must
            contain ``prompt`` and ``completion``, which the ``fetch_rows``
            query requires.

    Returns:
        A new dict ready for ``json.dumps``. ``target_tiers`` is a fresh copy
        per row, so mutating one exported record cannot affect
        ``DEFAULT_TARGET_TIERS`` or any other row.

    Raises:
        KeyError: If ``payload`` lacks ``prompt`` or ``completion``.
    """
    payload = row["payload"]
    cve_id = payload.get("cve_id") or row.get("external_id") or ""
    # NOTE(review): if neither cve_id nor external_id is set, sample_id
    # collapses to "cve:" and is no longer unique. Confirm upstream always
    # sets one of them.
    # Fallback teacher name used when the payload records no completion model.
    completion_model = payload.get("completion_model") or "mistral-medium-latest"
    # If no prompt-writing model is recorded, attribute the prompt to the
    # answering model.
    enrich_model = payload.get("enrich_model") or completion_model
    return {
        "role": "assistant",
        "prompt": payload["prompt"],
        "content": payload["completion"],
        "domain": "cybersecurity",
        "task_type": "cve_analysis",
        "target_tiers": list(DEFAULT_TARGET_TIERS),
        "quality_score": DEFAULT_QUALITY,
        "feedback": None,
        "model_id": completion_model,
        "sample_id": f"cve:{cve_id}",
        "source": f"cve_distillation:prompt={enrich_model};answer={completion_model}",
        "kev_flag": _kev_flag(payload.get("kev")),
        "cve_id": cve_id,
        "cvss_severity": payload.get("cvss_severity"),
        "cwes": payload.get("cwes") or [],
        # Export time (UTC), not the time the row was queued.
        "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
def _stage_jsonl(rows: list[dict], stamp: str) -> Path:
    """Write *rows* as JSON Lines under data/datasets/distilled/ and return the file path."""
    staging_dir = REPO_ROOT / "data/datasets/distilled"
    staging_dir.mkdir(parents=True, exist_ok=True)
    staging_path = staging_dir / f"_upload-cve-cybersec-{stamp}.jsonl"
    with staging_path.open("w", encoding="utf-8") as f:
        f.writelines(json.dumps(r) + "\n" for r in rows)
    return staging_path


def _teacher_summary(rows: list[dict]) -> str:
    """Return the distinct ``model_id`` values in *rows*, sorted and comma-joined."""
    return ",".join(sorted({str(r["model_id"]) for r in rows}))


def main() -> int:
    """Fetch ready CVE rows, stage them as JSONL, and upload them to Hugging Face.

    Returns:
        A process exit code:
        - 0 on success, after a dry run, or when there is nothing to publish;
        - 1 when required environment variables or ``huggingface_hub`` are
          missing.

    An invalid ``--limit`` exits with status 2 via ``argparse``. Errors raised
    by ``HfApi.upload_file`` propagate to the caller.
    """
    # `__doc__` is None under `python -OO`; fall back to an empty description.
    parser = argparse.ArgumentParser(description=(__doc__ or "").split("\n\n")[0])
    parser.add_argument("--dataset-id", default=DEFAULT_DATASET_ID)
    parser.add_argument("--limit", type=int, default=None)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    if args.limit is not None and args.limit < 1:
        # A negative LIMIT is a Postgres error; zero would publish nothing.
        parser.error("--limit must be a positive integer")

    pg_url = (os.environ.get("POSTGRES_URL_NON_POOLING") or "").strip()
    if not pg_url:
        print("ERROR: POSTGRES_URL_NON_POOLING not set", file=sys.stderr)
        return 1
    hf_token = (os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN") or "").strip()
    if not hf_token and not args.dry_run:
        print("ERROR: HF_TOKEN not set (use --dry-run to bypass)", file=sys.stderr)
        return 1

    # Only a SELECT runs here; the connection is closed before any file or
    # network work starts.
    with psycopg.connect(pg_url, autocommit=False) as conn:
        rows = fetch_rows(conn, args.limit)
    if not rows:
        print("No rows with both prompt and completion — backfill incomplete?")
        return 0

    transformed = [transform(r) for r in rows]
    kev_count = sum(1 for r in transformed if r["kev_flag"])
    print(f"Transformed {len(transformed)} rows ({kev_count} KEV-flagged)")

    # One UTC stamp names both the local staging file and the uploaded shard.
    stamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d-%H%M%S")
    staging_path = _stage_jsonl(transformed, stamp)
    print(f"Staged {len(transformed)} rows at {staging_path}")

    if args.dry_run:
        print("[dry-run] not uploading. First row:")
        print(json.dumps(transformed[0], indent=2)[:1200])
        return 0

    try:
        from huggingface_hub import HfApi
    except ImportError:
        print("ERROR: huggingface_hub not installed", file=sys.stderr)
        return 1

    api = HfApi(token=hf_token)
    upload_path = f"distilled/cve-cybersec-{stamp}.jsonl"
    # Record the teacher model(s) actually present in the rows (each row's
    # model_id) instead of a hard-coded name.
    teachers = _teacher_summary(transformed)
    print(f"Uploading {len(transformed)} rows → {args.dataset_id}:{upload_path}")
    api.upload_file(
        path_or_fileobj=str(staging_path),
        path_in_repo=upload_path,
        repo_id=args.dataset_id,
        repo_type="dataset",
        commit_message=(
            f"cve cybersec corpus: {len(transformed)} (prompt, completion) "
            f"pairs, {kev_count} KEV-flagged. teacher={teachers}. "
            f"source=training_queue@{stamp}"
        ),
    )
    print(
        f"OK uploaded → "
        f"https://huggingface.co/datasets/{args.dataset_id}/blob/main/{upload_path}"
    )
    return 0
if __name__ == "__main__":
    # Use main()'s integer return value as the process exit status.
    raise SystemExit(main())