from __future__ import annotations from dataclasses import dataclass from typing import Optional import numpy as np import pandas as pd import warnings from .constants import CANDIDATE_CATEGORIES from .pipeline import normalize_bloc try: from numpy import RankWarning as NP_RANK_WARNING # type: ignore[attr-defined] except Exception: class NP_RANK_WARNING(UserWarning): pass @dataclass class PredictionResult: category: str predicted_share: float predicted_count: int @dataclass class PredictionSummary: bloc_predictions: list[PredictionResult] inscrits: Optional[int] votants: Optional[int] blancs: Optional[int] nuls: Optional[int] abstention: Optional[int] exprimes: Optional[int] DISPLAY_BLOC_ORDER = [ "extreme_gauche", "gauche_dure", "gauche_modere", "centre", "droite_modere", "droite_dure", "extreme_droite", ] EXTRA_CATEGORIES = ["blancs", "nuls", "abstention"] def _clip01(value: float) -> float: return float(min(1.0, max(0.0, value))) def _last_share(df: pd.DataFrame, bloc: str, *, election: Optional[str] = None, year: Optional[int] = None) -> Optional[float]: subset = df[df["bloc"] == bloc] if election: subset = subset[subset["type_scrutin"] == election] if year is not None: subset = subset[subset["annee"] == year] if subset.empty: return None valid = subset.sort_values("date_scrutin")["part_bloc"].dropna() if valid.empty: return None return valid.iloc[-1] # type: ignore[index] def _last_value(series: pd.Series) -> Optional[float]: series = pd.to_numeric(series, errors="coerce").dropna() if series.empty: return None return float(series.iloc[-1]) def _project_share(series: pd.Series, years: pd.Series, target_year: int) -> Optional[float]: df = pd.DataFrame({"value": pd.to_numeric(series, errors="coerce"), "year": pd.to_numeric(years, errors="coerce")}) df = df.dropna() if df.empty: return None if len(df["year"].unique()) >= 2 and len(df) >= 2: # Guard against poorly conditioned fits on tiny samples with warnings.catch_warnings(): warnings.simplefilter("ignore", category=NP_RANK_WARNING) try: slope, intercept = np.polyfit(df["year"], df["value"], 1) projected = slope * target_year + intercept except Exception: projected = df["value"].iloc[-1] else: projected = df["value"].iloc[-1] return _clip01(float(projected)) def _project_rate( series: pd.Series, years: pd.Series, target_year: int, *, min_points_trend: int = 3, clamp_to_observed: bool = True, ) -> Optional[float]: df = pd.DataFrame( {"value": pd.to_numeric(series, errors="coerce"), "year": pd.to_numeric(years, errors="coerce")} ).dropna() if df.empty: return None values = df["value"].to_numpy() years_arr = df["year"].to_numpy() if len(set(years_arr)) >= min_points_trend and len(df) >= min_points_trend: with warnings.catch_warnings(): warnings.simplefilter("ignore", category=NP_RANK_WARNING) try: slope, intercept = np.polyfit(years_arr, values, 1) projected = slope * target_year + intercept except Exception: projected = values[-1] else: projected = values[-1] if clamp_to_observed and len(values): projected = min(max(projected, float(np.nanmin(values))), float(np.nanmax(values))) return _clip01(float(projected)) def _allocate_counts(shares: list[float], total: int) -> list[int]: if total <= 0 or not shares: return [0 for _ in shares] arr = np.clip(np.asarray(shares, dtype=float), 0, None) if arr.sum() == 0: return [0 for _ in shares] arr = arr / arr.sum() raw = arr * total floors = np.floor(raw) remainder = int(total - floors.sum()) if remainder > 0: order = np.argsort(-(raw - floors)) for idx in order[:remainder]: floors[idx] += 1 return floors.astype(int).tolist() def compute_predictions( history: pd.DataFrame, *, target_election: str = "municipales", target_year: int = 2026, inscrits_override: Optional[float] = None, ) -> PredictionSummary: if history.empty: return PredictionSummary([], None, None, None, None, None, None) df = history.copy() target_election = str(target_election).strip().lower() df["bloc"] = df["bloc"].apply(normalize_bloc) if "type_scrutin" in df.columns: df["type_scrutin"] = df["type_scrutin"].astype(str).str.strip().str.lower() # Coerce numeric and infer exprimes when missing from the sum of voix_bloc for col in ["voix_bloc", "exprimes", "inscrits", "votants", "blancs", "nuls"]: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce") for col in ["inscrits", "votants", "blancs", "nuls"]: if col not in df.columns: df[col] = np.nan if "exprimes" in df.columns: sum_voix = df.groupby(["code_bv", "date_scrutin"])["voix_bloc"].transform("sum") df["exprimes"] = df["exprimes"].fillna(sum_voix) df.loc[df["exprimes"] == 0, "exprimes"] = sum_voix if "part_bloc" not in df.columns or df["part_bloc"].isna().all(): df["part_bloc"] = df["voix_bloc"] / df["exprimes"] df["part_bloc"] = pd.to_numeric(df["part_bloc"], errors="coerce").clip(upper=1) df = df.dropna(subset=["bloc"]) bloc_order = [b for b in DISPLAY_BLOC_ORDER if b in CANDIDATE_CATEGORIES] raw_shares: dict[str, float] = {} for bloc in bloc_order: bloc_hist = df[df["bloc"] == bloc].sort_values("date_scrutin") last_overall = _last_share(bloc_hist, bloc) base_series = bloc_hist["part_bloc"] base_years = bloc_hist["annee"] if not bloc_hist.empty and target_election in bloc_hist["type_scrutin"].values: base_series = bloc_hist[bloc_hist["type_scrutin"] == target_election]["part_bloc"] base_years = bloc_hist[bloc_hist["type_scrutin"] == target_election]["annee"] projected = _project_share(base_series, base_years, target_year) if projected is None and last_overall is not None: projected = last_overall predicted = _clip01(projected or 0.0) raw_shares[bloc] = predicted share_values = np.array([raw_shares.get(b, 0.0) for b in bloc_order], dtype=float) share_sum = share_values.sum() if share_sum > 0: share_values = share_values / share_sum else: share_values = np.zeros_like(share_values) event_cols = [col for col in ["code_bv", "date_scrutin", "type_scrutin", "tour", "annee"] if col in df.columns] event_df = df.groupby(event_cols, as_index=False).agg( inscrits=("inscrits", "max"), votants=("votants", "max"), blancs=("blancs", "max"), nuls=("nuls", "max"), ) if "date_scrutin" in event_df.columns: event_df = event_df.sort_values("date_scrutin") if "type_scrutin" not in event_df.columns: event_df["type_scrutin"] = "" if "annee" not in event_df.columns: if "date_scrutin" in event_df.columns: event_df["annee"] = pd.to_datetime(event_df["date_scrutin"], errors="coerce").dt.year else: event_df["annee"] = np.nan base_inscrits = event_df["inscrits"].replace(0, pd.NA) event_df["taux_participation"] = event_df["votants"] / base_inscrits event_df["taux_blancs"] = event_df["blancs"] / base_inscrits event_df["taux_nuls"] = event_df["nuls"] / base_inscrits def _select_series(col: str) -> tuple[pd.Series, pd.Series]: scoped = event_df if "tour" in event_df.columns: round1 = event_df[event_df["tour"] == 1] if not round1.empty: scoped = round1 if not scoped.empty and target_election in scoped["type_scrutin"].values: mask = scoped["type_scrutin"] == target_election return scoped.loc[mask, col], scoped.loc[mask, "annee"] return scoped[col], scoped["annee"] turnout_series, turnout_years = _select_series("taux_participation") blancs_series, blancs_years = _select_series("taux_blancs") nuls_series, nuls_years = _select_series("taux_nuls") taux_participation = _project_rate(turnout_series, turnout_years, target_year) taux_blancs = _project_rate(blancs_series, blancs_years, target_year) taux_nuls = _project_rate(nuls_series, nuls_years, target_year) inscrits_used = None if inscrits_override is not None: try: value = float(inscrits_override) if value > 0: inscrits_used = value except (TypeError, ValueError): inscrits_used = None if inscrits_used is None: inscrits_used = _last_value(event_df["inscrits"]) if inscrits_used is None: return PredictionSummary([], None, None, None, None, None, None) if taux_participation is None: taux_participation = 0.0 if taux_blancs is None: taux_blancs = 0.0 if taux_nuls is None: taux_nuls = 0.0 if taux_blancs + taux_nuls > taux_participation and (taux_blancs + taux_nuls) > 0: scale = taux_participation / (taux_blancs + taux_nuls) taux_blancs *= scale taux_nuls *= scale inscrits_total = int(round(inscrits_used)) votants_total = int(round(inscrits_total * taux_participation)) blancs_total = int(round(inscrits_total * taux_blancs)) nuls_total = int(round(inscrits_total * taux_nuls)) if blancs_total + nuls_total > votants_total and (blancs_total + nuls_total) > 0: scale = votants_total / (blancs_total + nuls_total) blancs_total = int(round(blancs_total * scale)) nuls_total = int(round(nuls_total * scale)) exprimes_total = max(0, votants_total - blancs_total - nuls_total) abstention_total = max(0, inscrits_total - votants_total) bloc_counts = _allocate_counts(share_values.tolist(), exprimes_total) bloc_predictions: list[PredictionResult] = [] for bloc, share, count in zip(bloc_order, share_values.tolist(), bloc_counts): bloc_predictions.append( PredictionResult( category=bloc, predicted_share=float(share), predicted_count=int(count), ) ) return PredictionSummary( bloc_predictions=bloc_predictions, inscrits=inscrits_total, votants=votants_total, blancs=blancs_total, nuls=nuls_total, abstention=abstention_total, exprimes=exprimes_total, ) def predictions_as_dataframe(summary: PredictionSummary) -> pd.DataFrame: if summary is None or not summary.bloc_predictions: return pd.DataFrame(columns=["categorie", "nombre"]) rows = [] pred_map = {item.category: item for item in summary.bloc_predictions} for bloc in [b for b in DISPLAY_BLOC_ORDER if b in pred_map]: item = pred_map[bloc] rows.append({"categorie": bloc, "nombre": int(item.predicted_count)}) if summary.blancs is not None: rows.append({"categorie": "blancs", "nombre": int(summary.blancs)}) if summary.nuls is not None: rows.append({"categorie": "nuls", "nombre": int(summary.nuls)}) if summary.abstention is not None: rows.append({"categorie": "abstention", "nombre": int(summary.abstention)}) return pd.DataFrame(rows) __all__ = ["compute_predictions", "predictions_as_dataframe", "PredictionResult", "PredictionSummary"]