import re
import ast

import numpy as np
import pandas as pd

DATE_COL = "status_date"

# Keep only what we need for the dashboard (cuts memory a LOT)
NEEDED_COLS = [
    "bill_id",
    "session",
    "chamber",
    "bill_number",
    "status_date",
    "policy_domain_standardized",
    "category_main_label",
    "category_sub_label",
    "intent_standardized",
    "policy_direction_classifications",
    "category_main_keywords",
    "category_sub_keywords",
    "category_main_llama_summary_keywords",
    "category_sub_llama_summary_keywords",
    "legislative_goal_standardized",
    "impact_rating_standardized",
    "impact_rating_score",
]

KEYWORD_COLS = [
    "category_main_keywords",
    "category_sub_keywords",
    "category_main_llama_summary_keywords",
    "category_sub_llama_summary_keywords",
]

# Cell values (case-insensitive) that mean "no keywords".
_NULL_TOKENS = frozenset({"nan", "none", "null"})
# Delimiters accepted in free-text keyword cells: comma, pipe, semicolon.
_KEYWORD_SPLIT_RE = re.compile(r"[,\|;]\s*")
# Bracket pairs that mark a cell as a stringified Python list/tuple.
_BRACKET_PAIRS = (("[", "]"), ("(", ")"))


def _clean_items(items):
    """Stringify and strip every item, dropping the ones that end up blank."""
    stripped = (str(item).strip() for item in items)
    return [text for text in stripped if text]


def _safe_listify(x):
    """Turn list-like cells or strings into list[str].

    Accepted inputs:
      * None / NaN / pd.NA / pd.NaT          -> []
      * list, tuple, numpy array             -> stripped, non-blank str items
      * set, frozenset                       -> same, sorted for determinism
      * "['a', 'b']" / "('a', 'b')" strings  -> parsed with ast.literal_eval
      * any other string                     -> split on ',', '|' or ';'
    """
    if x is None or x is pd.NA or x is pd.NaT:
        return []
    if isinstance(x, (float, np.floating)) and np.isnan(x):
        return []

    # Native containers first. Stringifying a numpy array gives "['a' 'b']",
    # which literal_eval would read as the single concatenated string 'ab'.
    if isinstance(x, np.ndarray):
        return _clean_items(x.ravel().tolist())
    if isinstance(x, (list, tuple)):
        return _clean_items(x)
    if isinstance(x, (set, frozenset)):
        return sorted(_clean_items(x))

    s = str(x).strip()
    if not s or s.lower() in _NULL_TOKENS:
        return []

    if any(s.startswith(lo) and s.endswith(hi) for lo, hi in _BRACKET_PAIRS):
        try:
            parsed = ast.literal_eval(s)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # Not a valid literal (e.g. "[health, tax]"): best effort, fall
            # through to the delimiter split below.
            parsed = None
        if isinstance(parsed, (list, tuple, set)):
            return _clean_items(parsed)
        # Drop the outer brackets so they do not stick to the first/last token.
        s = s[1:-1]

    return _clean_items(_KEYWORD_SPLIT_RE.split(s))


def load_dataset(path: str) -> pd.DataFrame:
    """Load the bills dataset and derive the time columns used downstream.

    Only the columns listed in NEEDED_COLS that are present in the file are
    kept, in NEEDED_COLS order. Rows whose DATE_COL cannot be parsed as a
    date are dropped.

    Args:
        path: Location of a ``.parquet`` or ``.csv`` file (the extension is
            matched case-insensitively). ``os.PathLike`` objects work too.

    Returns:
        A new DataFrame with DATE_COL converted to datetime plus three derived
        columns: ``year`` (int), ``month`` (first day of the month) and
        ``week`` (start of the pandas "W" period containing the date).

    Raises:
        ValueError: If the extension is unsupported or DATE_COL is missing.
    """
    suffix_probe = str(path).lower()

    if suffix_probe.endswith(".parquet"):
        # NOTE(review): this first call materialises the whole file just to
        # learn its column names. If a direct pyarrow import is acceptable,
        # pyarrow.parquet.read_schema(path).names would avoid that - confirm.
        all_cols = pd.read_parquet(path, engine="pyarrow").columns
        cols = [c for c in NEEDED_COLS if c in all_cols]
        df = pd.read_parquet(path, columns=cols)
    elif suffix_probe.endswith(".csv"):
        # A callable `usecols` keeps whichever needed columns exist without
        # raising on the missing ones, so no try/except + full re-read.
        needed = set(NEEDED_COLS)
        df = pd.read_csv(path, usecols=lambda name: name in needed)
    else:
        raise ValueError("Supported formats: .parquet or .csv")

    if DATE_COL not in df.columns:
        raise ValueError(f"Expected a date column named '{DATE_COL}'")

    ordered = [c for c in NEEDED_COLS if c in df.columns]

    # Parse dates on the side, then take ONE filtered copy; this avoids
    # assigning onto a slice of the freshly read frame.
    dates = pd.to_datetime(df[DATE_COL], errors="coerce")
    valid = dates.notna()
    df = df.loc[valid, ordered].copy()
    df[DATE_COL] = dates[valid]

    df["year"] = df[DATE_COL].dt.year
    df["month"] = df[DATE_COL].dt.to_period("M").dt.to_timestamp()
    df["week"] = df[DATE_COL].dt.to_period("W").dt.start_time
    return df


def apply_filters(
    df: pd.DataFrame,
    date_min=None,
    date_max=None,
    sessions=None,
    chambers=None,
    policy_domains=None,
    category_main=None,
    category_sub=None,
    intents=None,
    policy_dirs=None,
):
    """Return a filtered copy of ``df``; the input frame is never modified.

    Args:
        df: Frame produced by ``load_dataset`` (needs DATE_COL for the date
            bounds and the matching column for every active filter).
        date_min: Inclusive lower bound on DATE_COL, or None for no bound.
        date_max: Inclusive upper bound on DATE_COL, or None for no bound.
        sessions, chambers, policy_domains, category_main, category_sub,
        intents, policy_dirs: Allowed values for the corresponding column.
            A filter is skipped when it is None, empty, or contains "All".
            A bare string is treated as a one-element list.

    Returns:
        A new DataFrame holding the rows that satisfy every active filter,
        with the original index labels preserved.

    Raises:
        KeyError: If an active filter refers to a column missing from ``df``.
    """
    # One positional boolean mask and a single final copy, instead of copying
    # the whole frame up front and re-slicing it once per filter.
    mask = np.ones(len(df), dtype=bool)

    if date_min is not None:
        mask &= (df[DATE_COL] >= pd.to_datetime(date_min)).to_numpy(dtype=bool)
    if date_max is not None:
        mask &= (df[DATE_COL] <= pd.to_datetime(date_max)).to_numpy(dtype=bool)

    selections = {
        "session": sessions,
        "chamber": chambers,
        "policy_domain_standardized": policy_domains,
        "category_main_label": category_main,
        "category_sub_label": category_sub,
        "intent_standardized": intents,
        "policy_direction_classifications": policy_dirs,
    }
    for col, values in selections.items():
        if values is None:
            continue
        # A bare string would be substring-tested against "All" and rejected
        # by isin(); treat it as a single allowed value.
        allowed = [values] if isinstance(values, str) else list(values)
        if not allowed or "All" in allowed:
            continue
        mask &= df[col].isin(allowed).to_numpy(dtype=bool)

    return df[mask].copy()


def explode_keywords(df: pd.DataFrame, keyword_col: str) -> pd.DataFrame:
    """Reshape ``df`` to one row per (bill, keyword).

    Args:
        df: Bill-level frame; only the context columns that exist are kept.
        keyword_col: Column whose cells hold keyword lists (or their string
            form, see ``_safe_listify``).

    Returns:
        A long DataFrame with the kept context columns, the original
        ``keyword_col``, plus:
          * ``keyword``: the stripped keyword text;
          * ``keyword_norm``: lower-cased, restricted to ``a-z0-9 -_/`` with
            single spaces. Rows whose normalised keyword is shorter than
            3 characters are dropped.

    Raises:
        KeyError: If ``keyword_col`` is not a column of ``df``.
    """
    if keyword_col not in df.columns:
        raise KeyError(keyword_col)

    candidates = [
        "bill_id",
        DATE_COL,
        "month",
        "week",
        "session",
        "chamber",
        "policy_domain_standardized",
        "category_main_label",
        "category_sub_label",
        "intent_standardized",
        "policy_direction_classifications",
        keyword_col,
    ]
    # dict.fromkeys de-duplicates (keyword_col may already be listed) while
    # keeping the order.
    keep_cols = [c for c in dict.fromkeys(candidates) if c in df.columns]

    tmp = df[keep_cols].copy()
    tmp["keyword_list"] = tmp[keyword_col].apply(_safe_listify)
    tmp = tmp.explode("keyword_list", ignore_index=True)
    tmp = tmp.rename(columns={"keyword_list": "keyword"})

    # Empty lists explode to NaN; map those to "" so they are dropped below
    # instead of turning into the literal text "nan".
    raw = tmp["keyword"]
    keyword = raw.where(raw.notna(), "").astype(str).str.strip()

    keyword_norm = (
        keyword.str.lower()
        .str.replace(r"\s+", " ", regex=True)
        .str.replace(r"[^a-z0-9 \-_/]", "", regex=True)
        # Removing punctuation can leave double spaces ("a & b" -> "a  b");
        # collapse again so equivalent keywords share one normalised form.
        .str.replace(r"\s+", " ", regex=True)
        .str.strip()
    )

    keep = (
        (keyword != "")
        & (keyword.str.lower() != "nan")
        & (keyword_norm.str.len() >= 3)
    )

    out = tmp.loc[keep].copy()
    out["keyword"] = keyword[keep]
    out["keyword_norm"] = keyword_norm[keep]
    return out


def keyword_trends(df_long: pd.DataFrame, time_grain="month", top_n=15):
    """Compute the most frequent keywords and their mentions over time.

    Args:
        df_long: Output of ``explode_keywords`` (needs ``keyword_norm`` and
            the ``month`` / ``week`` column matching ``time_grain``).
        time_grain: ``"month"`` buckets by month; any other value buckets by
            week.
        top_n: How many of the most frequent keywords to keep.

    Returns:
        A ``(top, ts)`` tuple:
          * ``top``: columns ``keyword_norm`` and ``count``, most frequent
            first, at most ``top_n`` rows;
          * ``ts``: columns ``<time grain>``, ``keyword_norm``, ``mentions``
            for the top keywords only, sorted by time ascending then
            mentions descending.
    """
    tg = "month" if time_grain == "month" else "week"

    counts = df_long.groupby("keyword_norm").size().reset_index(name="count")
    # Stable sort: keywords tied on count keep the alphabetical order from
    # groupby, so the top-N cut-off is deterministic.
    top = counts.sort_values("count", ascending=False, kind="stable").head(top_n)

    base = df_long[df_long["keyword_norm"].isin(top["keyword_norm"])]
    ts = (
        base.groupby([tg, "keyword_norm"])
        .size()
        .reset_index(name="mentions")
        .sort_values([tg, "mentions"], ascending=[True, False])
    )
    return top, ts