# CVE-ReRanker / scripts/01_fetch.py
# Tanmay1205 · clean upload · fc40cb4 · 4.68 kB
import requests
import pandas as pd
import time
import os
import json
from datetime import datetime, timedelta, timezone
# NVD API key, read from the environment; a KeyError is raised at startup
# if NVD_API_KEY is not set.
API_KEY = os.environ["NVD_API_KEY"]
# Headers passed to requests.get() in fetch_chunk (carries the API key).
HEADERS = {"apiKey": API_KEY}
# CSV of collected CVE rows: read at startup, rewritten when new rows are found.
CSV_PATH = "data/cves_raw.csv"
# JSON file holding the date of the last run under the "last_collected" key.
TRACKER = "data/last_updated.json"
def score_to_label(score):
    """Map a numeric CVSS base score to its severity label.

    Thresholds are inclusive lower bounds: 9.0 and above is "Critical",
    7.0 and above is "High", 4.0 and above is "Medium"; anything lower
    is "Low".
    """
    # Ordered from the highest floor down so the first match wins.
    severity_floors = (
        (9.0, "Critical"),
        (7.0, "High"),
        (4.0, "Medium"),
    )
    for floor, label in severity_floors:
        if score >= floor:
            return label
    return "Low"
def fetch_chunk(start, end):
    """Download all CVE entries published between *start* and *end* from NVD.

    Pages through the NVD CVE 2.0 endpoint 2000 results at a time until the
    number of collected entries reaches the reported ``totalResults``.

    Args:
        start: Value for the ``pubStartDate`` query parameter (the caller
            passes strings such as ``"2024-01-01T00:00:00.000"``).
        end: Value for the ``pubEndDate`` query parameter, same format.

    Returns:
        A list of the raw entries found under the ``"vulnerabilities"`` key
        of every page. Best-effort: if a page still fails after the retries
        below, the error is printed and whatever was collected so far is
        returned (possibly an empty list); no exception is raised.
    """
    url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    page_size = 2000
    max_attempts = 3  # tries per page before giving up on the window
    all_items = []
    idx = 0
    failures = 0
    while True:
        full_url = f"{url}?pubStartDate={start}&pubEndDate={end}&startIndex={idx}&resultsPerPage={page_size}"
        try:
            r = requests.get(full_url, headers=HEADERS, timeout=60)
            r.raise_for_status()
            data = r.json()
            total = data.get("totalResults", 0)
            items = data.get("vulnerabilities", [])
        except Exception as e:
            # Broad on purpose: this is the best-effort boundary of the
            # script, and any failure here (network, HTTP status, bad JSON,
            # unexpected payload shape) is reported and retried.
            failures += 1
            print(f" Error: {e}")
            if failures >= max_attempts:
                break
            # Back off a little longer on each consecutive failure.
            time.sleep(2 * failures)
            continue

        failures = 0
        all_items.extend(items)
        # An empty page means no further progress is possible; without this
        # check a totalResults larger than what the server actually returns
        # would keep the loop running forever.
        if not items or len(all_items) >= total:
            break
        idx += page_size
        time.sleep(0.7)
    return all_items
def parse_items(items, existing_ids):
    """Turn raw NVD vulnerability entries into flat row dictionaries.

    An entry is skipped when any of the following holds:
      * it has no English description, the description contains
        ``"** REJECT **"``, or it has fewer than 10 whitespace-separated words;
      * its CVE id is already in *existing_ids* or was already emitted
        earlier in this same call;
      * it has neither ``cvssMetricV31`` nor ``cvssMetricV30`` metrics;
      * it is malformed (missing keys, empty metric list, wrong types).

    Args:
        items: Iterable of entries, each expected to hold a ``"cve"`` dict.
        existing_ids: Container of CVE ids to exclude. It is only read,
            never modified.

    Returns:
        A list of dicts with the keys ``cve_id``, ``description``,
        ``cvss_score``, ``cvss_label``, ``attack_vector``,
        ``attack_complexity``, ``privileges_required``,
        ``user_interaction`` and ``scope``.
    """
    rows = []
    # Ids emitted during this call, so a CVE repeated within one batch does
    # not produce two rows.
    seen_ids = set()
    for item in items:
        try:
            cve = item["cve"]
            # First English description, or "" when there is none.
            desc = next(
                (d["value"] for d in cve.get("descriptions", []) if d["lang"] == "en"),
                "",
            )
            if not desc or "** REJECT **" in desc or len(desc.split()) < 10:
                continue

            cve_id = cve["id"]
            if cve_id in existing_ids or cve_id in seen_ids:
                continue

            # Prefer CVSS v3.1 and fall back to v3.0; entries with neither
            # are dropped.
            metrics = cve.get("metrics", {})
            if "cvssMetricV31" in metrics:
                cvss_data = metrics["cvssMetricV31"][0]["cvssData"]
            elif "cvssMetricV30" in metrics:
                cvss_data = metrics["cvssMetricV30"][0]["cvssData"]
            else:
                continue

            score = cvss_data["baseScore"]
            rows.append({
                "cve_id": cve_id,
                "description": desc,
                "cvss_score": score,
                "cvss_label": score_to_label(score),
                "attack_vector": cvss_data.get("attackVector", ""),
                "attack_complexity": cvss_data.get("attackComplexity", ""),
                "privileges_required": cvss_data.get("privilegesRequired", ""),
                "user_interaction": cvss_data.get("userInteraction", ""),
                "scope": cvss_data.get("scope", ""),
            })
            seen_ids.add(cve_id)
        except (KeyError, IndexError, TypeError, AttributeError):
            # Malformed entry: skip it. Anything else (KeyboardInterrupt,
            # NameError, ...) is a real problem and must propagate.
            continue
    return rows
# ── MAIN ──────────────────────────────────────────────────────────────
# Incremental update: load the existing CSV, fetch everything published
# since the tracker date in windows of at most 100 days, append the rows
# that are not already present, then advance the tracker.
df = pd.read_csv(CSV_PATH)
existing_ids = set(df["cve_id"].tolist())
# Current UTC time with tzinfo stripped so it can be compared with the
# naive datetimes parsed/constructed below.
today = datetime.now(timezone.utc).replace(tzinfo=None)

# Read last collected date from tracker
if os.path.exists(TRACKER):
    with open(TRACKER, encoding="utf-8") as f:
        data = json.load(f)
    last = datetime.strptime(data["last_collected"], "%Y-%m-%d")
else:
    # No tracker yet: start from this fixed date.
    last = datetime(2023, 12, 31)

gap = (today - last).days
print(f"Last collected: {last.strftime('%Y-%m-%d')}")
print(f"Today: {today.strftime('%Y-%m-%d')}")
print(f"Gap: {gap} days")
print(f"Existing CVEs: {len(df)}")

if gap < 1:
    print("Already up to date.")
else:
    # Build (start, end) windows covering last..today. The window starts ON
    # the last collected day rather than the day after: that day was only
    # partially over when the previous run recorded it, so CVEs published
    # later that day would otherwise never be fetched. Rows already in the
    # CSV are filtered out through existing_ids.
    chunks = []
    current = last
    while current < today:
        chunk_end = min(current + timedelta(days=99), today)
        chunks.append((
            current.strftime("%Y-%m-%dT00:00:00.000"),
            chunk_end.strftime("%Y-%m-%dT23:59:59.999"),
        ))
        current = chunk_end + timedelta(days=1)

    print(f"Fetching {len(chunks)} chunks...")
    all_new = []
    for i, (start, end) in enumerate(chunks):
        print(f"Chunk {i+1}/{len(chunks)}: {start[:10]} → {end[:10]}")
        items = fetch_chunk(start, end)
        new_rows = parse_items(items, existing_ids)
        all_new.extend(new_rows)
        # Remember the new ids so overlapping windows cannot add them twice.
        existing_ids.update(r["cve_id"] for r in new_rows)
        print(f" Added {len(new_rows)} | Total new: {len(all_new)}")
        time.sleep(2)

    if all_new:
        combined = pd.concat([df, pd.DataFrame(all_new)], ignore_index=True)
        combined.to_csv(CSV_PATH, index=False)
        print(f"Saved. Total CVEs: {len(combined)}")

    # NOTE(review): fetch_chunk reports errors by printing and returning
    # what it managed to collect, so the tracker advances here even when a
    # window was only partially fetched — confirm whether that is acceptable.
    with open(TRACKER, "w", encoding="utf-8") as f:
        json.dump({"last_collected": today.strftime("%Y-%m-%d")}, f)
    print("Tracker updated.")