Spaces:
Sleeping
Sleeping
| import re | |
| import ast | |
| import numpy as np | |
| import pandas as pd | |
# Name of the column holding each bill's status timestamp; parsed to
# datetime in load_dataset and used for all date filtering.
DATE_COL = "status_date"

# Keep only what we need for the dashboard (cuts memory a LOT)
NEEDED_COLS = [
    "bill_id",
    "session",
    "chamber",
    "bill_number",
    "status_date",
    "policy_domain_standardized",
    "category_main_label",
    "category_sub_label",
    "intent_standardized",
    "policy_direction_classifications",
    "category_main_keywords",
    "category_sub_keywords",
    "category_main_llama_summary_keywords",
    "category_sub_llama_summary_keywords",
    "legislative_goal_standardized",
    "impact_rating_standardized",
    "impact_rating_score",
]

# Columns whose cells hold keyword collections (lists or delimited strings);
# these are the candidates for explode_keywords.
KEYWORD_COLS = [
    "category_main_keywords",
    "category_sub_keywords",
    "category_main_llama_summary_keywords",
    "category_sub_llama_summary_keywords",
]
| def _safe_listify(x): | |
| """Turn list-like cells or strings into list[str].""" | |
| if x is None: | |
| return [] | |
| if isinstance(x, float) and np.isnan(x): | |
| return [] | |
| if isinstance(x, list): | |
| return [str(i).strip() for i in x if str(i).strip()] | |
| s = str(x).strip() | |
| if not s or s.lower() in {"nan", "none", "null"}: | |
| return [] | |
| if (s.startswith("[") and s.endswith("]")) or (s.startswith("(") and s.endswith(")")): | |
| try: | |
| parsed = ast.literal_eval(s) | |
| if isinstance(parsed, (list, tuple, set)): | |
| return [str(i).strip() for i in parsed if str(i).strip()] | |
| except Exception: | |
| pass | |
| parts = re.split(r"[,\|;]\s*", s) | |
| return [p.strip() for p in parts if p.strip()] | |
def load_dataset(path: str) -> pd.DataFrame:
    """Load the bills dataset from .parquet or .csv, keeping only NEEDED_COLS.

    Rows whose status_date cannot be parsed are dropped, and pre-computed
    year / month / week columns are added for time-grain grouping.

    Raises:
        ValueError: if the extension is unsupported or DATE_COL is absent.
    """
    lower = path.lower()
    if lower.endswith(".parquet"):
        # Read once and subset in memory. The previous version read the
        # WHOLE file just to list its columns and then read it a second
        # time with `columns=`, paying full I/O twice. (A schema-only
        # probe via pyarrow.parquet.ParquetFile would cut peak memory
        # further, but pyarrow is not imported by this module.)
        df = pd.read_parquet(path, engine="pyarrow")
        df = df[[c for c in NEEDED_COLS if c in df.columns]]
    elif lower.endswith(".csv"):
        # For CSV there is no cheap column probe; try usecols and fall
        # back to a full read if any needed column is missing.
        try:
            df = pd.read_csv(path, usecols=NEEDED_COLS)
        except Exception:
            df = pd.read_csv(path)
            df = df[[c for c in NEEDED_COLS if c in df.columns]]
    else:
        raise ValueError("Supported formats: .parquet or .csv")

    if DATE_COL not in df.columns:
        raise ValueError(f"Expected a date column named '{DATE_COL}'")

    df[DATE_COL] = pd.to_datetime(df[DATE_COL], errors="coerce")
    df = df[df[DATE_COL].notna()].copy()
    # Time grains consumed by the dashboard's groupbys.
    df["year"] = df[DATE_COL].dt.year
    df["month"] = df[DATE_COL].dt.to_period("M").dt.to_timestamp()
    df["week"] = df[DATE_COL].dt.to_period("W").dt.start_time
    return df
def apply_filters(
    df: pd.DataFrame,
    date_min=None,
    date_max=None,
    sessions=None,
    chambers=None,
    policy_domains=None,
    category_main=None,
    category_sub=None,
    intents=None,
    policy_dirs=None,
):
    """Return a copy of *df* narrowed by a date window and membership filters.

    Each membership filter is skipped when its value is falsy or contains
    the sentinel "All"; otherwise rows whose column value is not in the
    given collection are dropped.
    """
    result = df.copy()

    if date_min is not None:
        result = result[result["status_date"] >= pd.to_datetime(date_min)]
    if date_max is not None:
        result = result[result["status_date"] <= pd.to_datetime(date_max)]

    # Data-driven equivalent of applying each isin-filter in turn.
    membership_filters = (
        ("session", sessions),
        ("chamber", chambers),
        ("policy_domain_standardized", policy_domains),
        ("category_main_label", category_main),
        ("category_sub_label", category_sub),
        ("intent_standardized", intents),
        ("policy_direction_classifications", policy_dirs),
    )
    for column, allowed in membership_filters:
        if allowed and "All" not in allowed:
            result = result[result[column].isin(allowed)]

    return result
def explode_keywords(df: pd.DataFrame, keyword_col: str) -> pd.DataFrame:
    """Fan *keyword_col* out to one row per (bill, keyword).

    Each cell is parsed via _safe_listify, exploded, cleaned of empty /
    "nan" strings, and given a lowercased, punctuation-stripped
    `keyword_norm` column (tokens shorter than 3 chars are dropped).
    """
    wanted = [
        "bill_id",
        "status_date",
        "month",
        "week",
        "session",
        "chamber",
        "policy_domain_standardized",
        "category_main_label",
        "category_sub_label",
        "intent_standardized",
        "policy_direction_classifications",
        keyword_col,
    ]
    long_df = df[[c for c in wanted if c in df.columns]].copy()

    # Parse each cell into a list, then one keyword per row.
    long_df["keyword"] = long_df[keyword_col].apply(_safe_listify)
    long_df = long_df.explode("keyword", ignore_index=True)

    # Empty lists explode to NaN; after astype(str) those become "nan",
    # so filter both forms out.
    long_df["keyword"] = long_df["keyword"].astype(str).str.strip()
    keep = (
        long_df["keyword"].notna()
        & (long_df["keyword"] != "")
        & (long_df["keyword"].str.lower() != "nan")
    )
    long_df = long_df[keep]

    # Normalize: lowercase, collapse whitespace, keep only [a-z0-9 -_/].
    long_df["keyword_norm"] = (
        long_df["keyword"]
        .str.lower()
        .str.replace(r"\s+", " ", regex=True)
        .str.replace(r"[^a-z0-9 \-_/]", "", regex=True)
        .str.strip()
    )
    # Very short normalized tokens are noise.
    return long_df[long_df["keyword_norm"].str.len() >= 3]
def keyword_trends(df_long: pd.DataFrame, time_grain="month", top_n=15):
    """Rank the overall top_n keywords and build their per-period counts.

    Returns a tuple ``(top, ts)``: *top* holds keyword_norm plus its total
    `count` over the whole frame; *ts* holds one row per (period,
    keyword_norm) with a `mentions` column, sorted by period ascending and
    mentions descending. Any *time_grain* other than "month" means "week".
    """
    grain_col = "month" if time_grain == "month" else "week"

    # Overall ranking across the full (already filtered) window.
    overall = df_long.groupby("keyword_norm").size().reset_index(name="count")
    top = overall.sort_values("count", ascending=False).head(top_n)

    # Restrict the time series to the winning keywords only.
    winners = set(top["keyword_norm"])
    subset = df_long[df_long["keyword_norm"].isin(winners)]
    ts = (
        subset.groupby([grain_col, "keyword_norm"])
        .size()
        .reset_index(name="mentions")
        .sort_values([grain_col, "mentions"], ascending=[True, False])
    )
    return top, ts