Spaces:
Running
Running
| import pandas as pd | |
| import re | |
| import os | |
| import spacy | |
| from tqdm import tqdm | |
# Raw input CSV read by the pipeline, and the CSV the processed rows are written to.
CSV_PATH = "data/cves_raw.csv"
OUT_PATH = "data/cves_processed.csv"
# spaCy pipeline, loaded once at import time and reused by extract_features().
nlp = spacy.load("en_core_web_sm")
def clean_text(text):
    """Normalize a free-text description into a lowercase, token-friendly string.

    Steps, in order: lowercase; drop URLs; drop HTML-style tags; replace CVE
    identifiers with ``CVE_TOKEN``; replace dotted version numbers with
    ``VERSION_TOKEN``; turn every remaining character that is not a letter,
    digit, whitespace or underscore into a space; collapse whitespace.

    Args:
        text: The description. Non-string values are converted with ``str()``;
            ``None`` and float NaN (a missing cell) are treated as empty.

    Returns:
        The cleaned string, or ``""`` for missing or empty input.
    """
    # NaN is the only float that is not equal to itself. Without this guard a
    # missing description would be cleaned into the literal word "nan"/"none".
    if text is None or (isinstance(text, float) and text != text):
        return ""

    text = str(text).lower()
    text = re.sub(r"http\S+|www\S+", "", text)
    text = re.sub(r"<.*?>", "", text)
    text = re.sub(r"cve-\d{4}-\d+", "CVE_TOKEN", text)
    text = re.sub(r"v?\d+\.\d+[\./\d]*", "VERSION_TOKEN", text)
    # The input was lowercased above, so the only uppercase letters left are
    # the placeholders just inserted. They must survive this filter: with a
    # lowercase-only class each placeholder was reduced to a bare "_".
    text = re.sub(r"[^a-zA-Z0-9\s_]", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text
# Keyword lists behind the has_* indicator features. Entries are lowercase
# because they are matched against a lowercased description.
REMOTE = [
    "remote",
    "remotely",
    "network",
]
UNAUTH = [
    "unauthenticated",
    "unauthorized",
]
EXEC = [
    "execute",
    "execution",
    "arbitrary code",
    "rce",
]
PRIVESC = [
    "privilege escalation",
    "root",
    "elevated privileges",
]
DOS = [
    "denial of service",
    "dos",
    "crash",
]
OVERFLOW = [
    "buffer overflow",
    "heap overflow",
    "out-of-bounds",
]
def _mentions_any(text, keywords):
    """Return True if any keyword occurs in *text* starting at a word boundary."""
    return any(re.search(r"\b" + re.escape(kw), text) for kw in keywords)


def extract_features(text):
    """Derive entity and keyword-indicator features from a raw description.

    Args:
        text: The raw (uncleaned) description. ``None`` and float NaN are
            treated as an empty description; other non-strings go through
            ``str()``.

    Returns:
        A dict with:
          * ``entity_count`` - number of PRODUCT/ORG/GPE entities found.
          * ``entities`` - the first five of those entities, lowercased and
            joined with ``", "``.
          * ``has_remote``, ``has_unauth``, ``has_exec``, ``has_priv_esc``,
            ``has_dos``, ``has_overflow`` - 1 if any keyword from the matching
            module-level list is mentioned, else 0.
          * ``desc_word_count`` - whitespace-separated word count.
    """
    # A missing CSV cell arrives as float NaN (the only float unequal to
    # itself); calling .lower() on it would raise AttributeError.
    if text is None or (isinstance(text, float) and text != text):
        text = ""
    text = str(text)
    tl = text.lower()

    # Run NER on the original casing: the pretrained English pipeline is
    # trained on normally-cased text, and lowercasing first removes the
    # capitalization cues that mark product and vendor names. Entity strings
    # are lowercased afterwards so the column keeps its lowercase form.
    doc = nlp(text)
    ents = [e.text.lower() for e in doc.ents if e.label_ in ("PRODUCT", "ORG", "GPE")]

    # Keywords must start at a word boundary. Plain substring tests made "rce"
    # fire on "source"/"resource"/"force", "root" on "chroot" and "dos" on
    # "kudos". The end is left open so inflected forms such as "crashes" or
    # "executed" still count.
    return {
        "entity_count": len(ents),
        "entities": ", ".join(ents[:5]),
        "has_remote": int(_mentions_any(tl, REMOTE)),
        "has_unauth": int(_mentions_any(tl, UNAUTH)),
        "has_exec": int(_mentions_any(tl, EXEC)),
        "has_priv_esc": int(_mentions_any(tl, PRIVESC)),
        "has_dos": int(_mentions_any(tl, DOS)),
        "has_overflow": int(_mentions_any(tl, OVERFLOW)),
        "desc_word_count": len(text.split()),
    }
# Integer codes for the categorical CVSS-style columns (applied with Series.map).
AV = {
    "NETWORK": 3,
    "ADJACENT": 2,
    "LOCAL": 1,
    "PHYSICAL": 0,
}
AC = {
    "LOW": 1,
    "HIGH": 0,
}
PR = {
    "NONE": 2,
    "LOW": 1,
    "HIGH": 0,
}
UI = {
    "NONE": 1,
    "REQUIRED": 0,
}
SC = {
    "CHANGED": 1,
    "UNCHANGED": 0,
}
# Incremental pipeline: read the raw CSV, skip rows whose cve_id is already in
# the processed CSV, compute features for the rest, and save old + new rows.
df_raw = pd.read_csv(CSV_PATH)

if os.path.exists(OUT_PATH):
    # Resume: only rows whose cve_id is absent from the existing output.
    df_done = pd.read_csv(OUT_PATH)
    done_ids = set(df_done["cve_id"])
    df_new = df_raw[~df_raw["cve_id"].isin(done_ids)].copy().reset_index(drop=True)
    print(f"Already processed: {len(df_done)} | New to process: {len(df_new)}")
else:
    df_done = None
    df_new = df_raw.copy()
    print(f"Processing all {len(df_new)} rows from scratch")

if df_new.empty:
    print("Nothing new to process.")
else:
    tqdm.pandas()  # registers Series.progress_apply

    df_new["description_clean"] = df_new["description"].apply(clean_text)

    # Features are extracted from the raw description, not the cleaned one.
    feats = df_new["description"].progress_apply(lambda x: pd.Series(extract_features(x)))
    df_new = pd.concat([df_new, feats], axis=1)
    df_new["entities"] = df_new["entities"].fillna("")

    # Source column -> integer code table; each result goes to "<column>_enc".
    # Values missing from a table (including NaN) become 0, the same code as
    # that table's lowest category.
    ordinal_maps = {
        "attack_vector": AV,
        "attack_complexity": AC,
        "privileges_required": PR,
        "user_interaction": UI,
        "scope": SC,
    }
    for column, mapping in ordinal_maps.items():
        df_new[f"{column}_enc"] = df_new[column].map(mapping).fillna(0)

    # NOTE(review): the save step sits inside this branch so an up-to-date
    # output file is left untouched when there is nothing new - confirm intent.
    final = pd.concat([df_done, df_new], ignore_index=True) if df_done is not None else df_new

    # Write to a sibling temp file and swap it in. OUT_PATH holds every
    # previously processed row, so overwriting it in place would lose them all
    # if the write were interrupted part-way.
    tmp_path = f"{OUT_PATH}.tmp"
    final.to_csv(tmp_path, index=False)
    os.replace(tmp_path, OUT_PATH)
    print(f"Saved {len(final)} rows to {OUT_PATH}")