|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import json |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import Dict, List |
|
|
|
|
|
import joblib |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
|
|
|
from src.constants import CANDIDATE_CATEGORIES |
|
|
from src.features.build_features import ( |
|
|
aggregate_by_event, |
|
|
compute_national_reference, |
|
|
expand_by_category, |
|
|
load_elections_long, |
|
|
load_mapping, |
|
|
) |
|
|
|
|
|
# Module-level logger; its level and format are configured in ``main``.
LOGGER = logging.getLogger(__name__)

# Weight given to the previous same-type result when it is blended with the
# model output, keyed by lower-cased election type. Types missing from this
# table are looked up with a weight of 0.0, which disables the blend.
TYPE_HISTORY_BLEND: Dict[str, float] = {
    "presidentielles": 0.4,
    "legislatives": 0.35,
    # The three intermediate election types share the same weight.
    **dict.fromkeys(("europeennes", "regionales", "departementales"), 0.3),
    "municipales": 0.2,
}
|
|
|
|
|
|
|
|
def blend_with_type_history(
    preds: np.ndarray,
    feature_df: pd.DataFrame,
    target_type: str,
) -> np.ndarray:
    """Pull model predictions towards the previous same-type results.

    The blend weight for a row is ``TYPE_HISTORY_BLEND[target_type]`` scaled
    by the fraction of categories for which that row has a non-NaN
    ``prev_share_type_lag1_<category>`` value. Categories without history
    fall back to the model prediction itself, so they are not moved by the
    blend. The result is clipped at zero and each row is rescaled to sum to
    one (rows summing to zero are left as zeros).

    Args:
        preds: Model output. NOTE(review): assumed to be shaped
            (rows of ``feature_df``, ``len(CANDIDATE_CATEGORIES)``) with
            columns in ``CANDIDATE_CATEGORIES`` order - confirm with caller.
        feature_df: Feature rows holding the ``prev_share_type_lag1_*``
            columns.
        target_type: Election type; matched case-insensitively against
            ``TYPE_HISTORY_BLEND``.

    Returns:
        ``preds`` itself, untouched, when the type has no positive weight,
        when ``preds`` is empty, when any history column is missing, or when
        no row has any history; otherwise a new blended, renormalised array.
    """
    type_weight = TYPE_HISTORY_BLEND.get(str(target_type).lower(), 0.0)
    if type_weight <= 0 or preds.size == 0:
        return preds

    lag_columns = [f"prev_share_type_lag1_{cat}" for cat in CANDIDATE_CATEGORIES]
    if any(col not in feature_df.columns for col in lag_columns):
        return preds

    history = feature_df[lag_columns].to_numpy(dtype=float)
    missing = np.isnan(history)
    known_count = (~missing).sum(axis=1).astype(float)
    if np.nanmax(known_count) == 0:
        # Not a single historical value anywhere: nothing to blend with.
        return preds

    # Rows with partial history get a proportionally smaller blend weight.
    coverage = (known_count / len(CANDIDATE_CATEGORIES)).reshape(-1, 1)
    row_weight = type_weight * coverage

    # Where history is missing, substitute the prediction so the weighted
    # average leaves that cell unchanged.
    history = np.where(missing, preds, history)
    mixed = np.clip((1 - row_weight) * preds + row_weight * history, 0, None)

    totals = mixed.sum(axis=1, keepdims=True)
    totals[totals == 0] = 1  # avoid 0/0 for all-zero rows
    return mixed / totals
|
|
|
|
|
|
|
|
def filter_history(df: pd.DataFrame, target_year: int, commune_code: str | None) -> pd.DataFrame:
    """Keep only the rows that precede the target year, optionally for one commune.

    Args:
        df: Frame with an ``annee`` column and, when ``commune_code`` is
            given, a ``code_commune`` column.
        target_year: Rows with ``annee`` greater than or equal to this year
            are dropped.
        commune_code: When truthy, only rows whose ``code_commune`` equals
            this value are kept; ``None`` or ``""`` disables the filter.

    Returns:
        The filtered rows, in their original order and with their original
        index labels.
    """
    keep = df["annee"] < target_year
    if commune_code:
        keep = keep & (df["code_commune"] == commune_code)
    return df[keep]
|
|
|
|
|
|
|
|
def build_feature_matrix(
    elections_long: pd.DataFrame,
    mapping: pd.DataFrame,
    target_type: str,
    target_year: int,
) -> pd.DataFrame:
    """Build one feature row per polling station for the target election.

    The history is expanded by category, aggregated per event and compared
    with the national reference. For every ``code_bv`` found in the history
    the most recent values are then collected as lag features.

    Args:
        elections_long: Long-format election history (already restricted to
            the years/commune of interest by the caller).
        mapping: Nuance-to-category mapping passed to ``expand_by_category``.
        target_type: Election type to predict; also selects the rows used
            for the ``*_type_*`` / ``*_same_type_*`` features (exact,
            case-sensitive match on ``election_type``).
        target_year: Year written into ``election_year`` and used to build
            the placeholder ``date_scrutin`` (``<year>-01-01``).

    Returns:
        A frame with the identification columns ``commune_code`` (the part
        of ``code_bv`` before the first ``-``), ``code_bv``,
        ``election_type``, ``election_year``, ``round`` (always 1) and
        ``date_scrutin``, the two turnout lags, and for each category in
        ``CANDIDATE_CATEGORIES`` the columns ``prev_share_any_lag1_*``,
        ``prev_share_type_lag1_*``, ``prev_dev_to_national_any_lag1_*``,
        ``prev_dev_to_national_type_lag1_*`` and ``swing_any_*``. Values
        with no history are NaN. The frame is empty when the history
        contains no polling station.
    """
    expanded = expand_by_category(elections_long, mapping)
    local = aggregate_by_event(expanded)
    nat = compute_national_reference(local)
    local = local.merge(nat, on=["election_type", "election_year", "round", "category"], how="left")
    local["dev_to_nat"] = local["share"] - local["share_nat"]

    # Sort exactly once, with a stable algorithm: every "last" below then
    # sees the same chronological order, and events sharing a date keep a
    # deterministic relative order instead of depending on an unstable sort.
    local = local.sort_values("date_scrutin", kind="mergesort")
    same_type = local[local["election_type"] == target_type]

    cell_keys = ["code_bv", "category"]
    by_cell = local.groupby(cell_keys)
    by_cell_same_type = same_type.groupby(cell_keys)

    # Most recent share / deviation per (station, category), over any
    # election and over elections of the target type only.
    last_any_share = by_cell["share"].last()
    last_any_dev = by_cell["dev_to_nat"].last()
    last_type_share = by_cell_same_type["share"].last()
    last_type_dev = by_cell_same_type["dev_to_nat"].last()

    # Change in share between the two most recent events of a cell; NaN
    # when fewer than two events are available.
    swing_any = (
        by_cell["share"]
        .apply(lambda s: s.iloc[-1] - s.iloc[-2] if len(s) >= 2 else np.nan)
        .rename("swing_any")
    )

    turnout_any = local.groupby("code_bv")["turnout_pct"].last()
    turnout_type = same_type.groupby("code_bv")["turnout_pct"].last()

    bureaux = sorted(local["code_bv"].dropna().unique())
    records: List[dict] = []
    for code_bv in bureaux:
        record = {
            "commune_code": str(code_bv).split("-")[0],
            "code_bv": code_bv,
            "election_type": target_type,
            "election_year": target_year,
            "round": 1,
            "date_scrutin": f"{target_year}-01-01",
            "prev_turnout_any_lag1": turnout_any.get(code_bv, np.nan),
            "prev_turnout_same_type_lag1": turnout_type.get(code_bv, np.nan),
        }
        for cat in CANDIDATE_CATEGORIES:
            cell = (code_bv, cat)
            record[f"prev_share_any_lag1_{cat}"] = last_any_share.get(cell, np.nan)
            record[f"prev_share_type_lag1_{cat}"] = last_type_share.get(cell, np.nan)
            record[f"prev_dev_to_national_any_lag1_{cat}"] = last_any_dev.get(cell, np.nan)
            record[f"prev_dev_to_national_type_lag1_{cat}"] = last_type_dev.get(cell, np.nan)
            record[f"swing_any_{cat}"] = swing_any.get(cell, np.nan)
        records.append(record)
    return pd.DataFrame.from_records(records)
|
|
|
|
|
|
|
|
def _last_share_by_cell(selection: pd.DataFrame) -> dict:
    """Return ``{(code_bv, category): share}`` for the most recent event of each cell."""
    latest = (
        # Stable sort so that events sharing a date resolve deterministically.
        selection.sort_values("date_scrutin", kind="mergesort")
        .groupby(["code_bv", "category"])["share"]
        .last()
    )
    return latest.to_dict()


def compute_references(local: pd.DataFrame, target_year: int) -> Dict[str, Dict[tuple, float]]:
    """Collect the reference shares that predictions are compared against.

    Args:
        local: Per-event aggregated results with at least the columns
            ``election_type``, ``election_year``, ``date_scrutin``,
            ``code_bv``, ``category`` and ``share``.
        target_year: Only legislative elections strictly before this year
            are considered for the ``"leg"`` reference.

    Returns:
        A dict with two entries, each mapping ``(code_bv, category)`` to a
        share:

        * ``"leg"``: the most recent legislative election before
          ``target_year``;
        * ``"mun2020"``: the most recent event of the 2020 municipal
          election.

        Cells without a matching event are absent from the inner dicts.
    """
    is_past_legislative = (local["election_type"] == "legislatives") & (local["election_year"] < target_year)
    is_municipal_2020 = (local["election_type"] == "municipales") & (local["election_year"] == 2020)
    return {
        "leg": _last_share_by_cell(local[is_past_legislative]),
        "mun2020": _last_share_by_cell(local[is_municipal_2020]),
    }
|
|
|
|
|
|
|
|
def load_feature_columns(path: Path, df: pd.DataFrame) -> List[str]:
    """Return the feature columns the model expects.

    Args:
        path: JSON file holding the list of feature column names saved at
            training time.
        df: Feature frame used as a fallback source of column names.

    Returns:
        The list stored in ``path`` when that file exists; otherwise every
        column of ``df`` except the identification columns
        (``commune_code``, ``code_bv``, ``election_type``,
        ``election_year``, ``round``, ``date_scrutin``), in ``df`` order.

    Raises:
        ValueError: If ``path`` exists but does not contain a JSON list.
    """
    if path.exists():
        # Read as UTF-8 explicitly: the platform default encoding would
        # garble non-ASCII column names on non-UTF-8 locales.
        columns = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(columns, list):
            raise ValueError(f"{path} must contain a JSON list of column names, got {type(columns).__name__}")
        return columns

    exclude = {"commune_code", "code_bv", "election_type", "election_year", "round", "date_scrutin"}
    return [c for c in df.columns if c not in exclude]
|
|
|
|
|
|
|
|
def predict(
    model_path: Path,
    feature_df: pd.DataFrame,
    feature_cols: List[str],
    refs: Dict[str, Dict[tuple, float]],
) -> pd.DataFrame:
    """Predict category shares per polling station and compare them to references.

    Args:
        model_path: Path of the trained model saved with joblib.
        feature_df: One row per polling station; must contain ``code_bv``
            and ``commune_code``. It is not modified.
        feature_cols: Columns fed to the model, in order. Columns absent
            from ``feature_df`` are supplied as NaN.
        refs: Reference shares with the entries ``"leg"`` and
            ``"mun2020"``, each mapping ``(code_bv, category)`` to a share
            expressed as a fraction.

    Returns:
        A frame with ``commune_code``, ``code_bv`` and, for every category
        in ``CANDIDATE_CATEGORIES``, ``predicted_share_*`` (percent, two
        decimals) plus ``delta_leg_*`` / ``delta_mun2020_*`` (percentage
        points against the reference, or the string ``"N/A"`` when the
        reference has no entry for that cell).
    """
    # NOTE(security): joblib.load unpickles the file and can therefore run
    # arbitrary code - only load model files from a trusted location.
    model = joblib.load(model_path)

    missing_cols = [c for c in feature_cols if c not in feature_df.columns]
    if missing_cols:
        LOGGER.warning("Missing feature columns filled with NaN: %s", missing_cols)
        # Work on a copy so the caller's frame does not gain columns.
        feature_df = feature_df.copy()
        for col in missing_cols:
            feature_df[col] = np.nan

    preds = model.predict(feature_df[feature_cols])
    # Force valid shares: clip into [0, 1], then rescale each row to sum to 1.
    preds = np.clip(preds, 0, 1)
    sums = preds.sum(axis=1, keepdims=True)
    sums[sums == 0] = 1
    preds = preds / sums

    target_type = None
    if "election_type" in feature_df.columns and not feature_df.empty:
        target_type = str(feature_df["election_type"].iloc[0])
    if target_type:
        preds = blend_with_type_history(preds, feature_df, target_type)
    preds_pct = preds * 100

    # Walk the rows by position: the prediction array is positional, while
    # the frame's index labels need not be 0..n-1.
    commune_codes = feature_df["commune_code"].tolist()
    station_codes = feature_df["code_bv"].tolist()
    rows = []
    for pos, (commune_code, code_bv) in enumerate(zip(commune_codes, station_codes)):
        record = {
            "commune_code": commune_code,
            "code_bv": code_bv,
        }
        for cat_idx, cat in enumerate(CANDIDATE_CATEGORIES):
            pred_val = preds_pct[pos, cat_idx]
            record[f"predicted_share_{cat}"] = round(float(pred_val), 2)
            leg_ref = refs["leg"].get((code_bv, cat))
            mun_ref = refs["mun2020"].get((code_bv, cat))
            # References are fractions; predictions are already in percent.
            record[f"delta_leg_{cat}"] = "N/A" if leg_ref is None else round(float(pred_val - leg_ref * 100), 2)
            record[f"delta_mun2020_{cat}"] = "N/A" if mun_ref is None else round(float(pred_val - mun_ref * 100), 2)
        rows.append(record)
    return pd.DataFrame(rows)
|
|
|
|
|
|
|
|
def main(argv: List[str] | None = None) -> None:
    """Command-line entry point: build features, predict and write a CSV.

    Args:
        argv: Command-line arguments; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Raises:
        RuntimeError: If the filtered history yields no feature row.
    """
    parser = argparse.ArgumentParser(description="Prédictions bureau par bureau pour une échéance cible.")
    parser.add_argument("--model-path", type=Path, default=Path("models/hist_gradient_boosting.joblib"), help="Modèle entraîné.")
    parser.add_argument("--feature-columns", type=Path, default=Path("models/feature_columns.json"), help="Colonnes de features attendues.")
    parser.add_argument("--elections-long", type=Path, default=Path("data/interim/elections_long.parquet"), help="Historique long.")
    parser.add_argument("--mapping", type=Path, default=Path("config/nuances.yaml"), help="Mapping nuances->catégories.")
    parser.add_argument("--target-election-type", type=str, default="municipales", help="Type d'élection cible.")
    parser.add_argument("--target-year", type=int, default=2026, help="Année cible.")
    parser.add_argument("--commune-code", type=str, default="34301", help="Code commune à filtrer (Sete=34301).")
    parser.add_argument("--output-dir", type=Path, default=Path("predictions"), help="Répertoire de sortie.")
    args = parser.parse_args(argv)

    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

    elections_long = load_elections_long(args.elections_long)
    elections_long = filter_history(elections_long, args.target_year, args.commune_code)
    mapping = load_mapping(args.mapping)

    feature_df = build_feature_matrix(elections_long, mapping, args.target_election_type, args.target_year)
    if feature_df.empty:
        raise RuntimeError("Aucune donnée historique disponible pour construire les features.")
    feature_cols = load_feature_columns(args.feature_columns, feature_df)
    refs = compute_references(
        aggregate_by_event(expand_by_category(elections_long, mapping)),
        args.target_year,
    )
    preds_df = predict(args.model_path, feature_df, feature_cols, refs)

    # Name the file after the commune actually predicted: the historical
    # "sete" suffix is kept for the default commune (34301) only, so runs for
    # another commune no longer overwrite or masquerade as the Sete output.
    if args.commune_code == "34301":
        scope = "sete"
    else:
        scope = args.commune_code or "all"

    args.output_dir.mkdir(parents=True, exist_ok=True)
    output_path = args.output_dir / f"pred_{args.target_election_type}_{args.target_year}_{scope}.csv"
    preds_df.to_csv(output_path, index=False)
    LOGGER.info("Prédictions écrites dans %s", output_path)


if __name__ == "__main__":
    main()
|
|
|