|
|
from __future__ import annotations |
|
|
|
|
|
from pathlib import Path |
|
|
import re |
|
|
from typing import Dict, Iterable, List, Mapping, Optional |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
# Canonical long-format schema for standardised election results:
# one row per (bureau de vote, scrutin, tour, candidature).
STANDARD_COLUMNS: List[str] = [
    "code_bv",           # polling-station (bureau de vote) identifier
    "nom_bv",            # polling-station name
    "annee",             # election year
    "date_scrutin",      # date of the scrutin
    "type_scrutin",      # kind of election
    "tour",              # round number
    "inscrits",          # registered voters
    "votants",           # ballots cast
    "abstentions",       # abstentions
    "blancs",            # blank ballots
    "nuls",              # spoiled ballots
    "exprimes",          # expressed (valid) votes; votants - blancs - nuls
    "code_candidature",  # nuance/candidature code
    "nom_candidature",   # candidature name
    "voix",              # votes received by the candidature
]
|
|
|
|
|
# Subset of STANDARD_COLUMNS coerced to numeric dtype by coerce_numeric().
NUMERIC_COLUMNS: List[str] = [
    "inscrits",
    "votants",
    "abstentions",
    "blancs",
    "nuls",
    "exprimes",
    "voix",
]
|
|
|
|
|
|
|
|
_MOJIBAKE_REPLACEMENTS = { |
|
|
"é": "é", |
|
|
"è": "è", |
|
|
"ê": "ê", |
|
|
"ë": "ë", |
|
|
"Ã ": "à", |
|
|
"â": "â", |
|
|
"ç": "ç", |
|
|
"ù": "ù", |
|
|
"û": "û", |
|
|
"ï": "ï", |
|
|
"ô": "ô", |
|
|
"ö": "ö", |
|
|
"É": "É", |
|
|
"È": "È", |
|
|
"Ê": "Ê", |
|
|
"Ë": "Ë", |
|
|
"À": "À", |
|
|
"Â": "Â", |
|
|
"Ç": "Ç", |
|
|
"�": "°", |
|
|
"�": "°", |
|
|
} |
|
|
|
|
|
|
|
|
def _normalize_label(label: str) -> str:
    """
    Attempt to repair mojibake in column labels (UTF-8 read as latin-1 or vice versa).

    Also strips any BOM and collapses runs of whitespace to single spaces.
    """
    fixed = label
    try:
        # Round-trip: if the label is UTF-8 text wrongly decoded as latin-1,
        # re-encoding as latin-1 and decoding as UTF-8 restores the original.
        fixed = label.encode("latin1").decode("utf-8")
    except (UnicodeEncodeError, UnicodeDecodeError):
        # Not latin-1-representable / not valid UTF-8: keep the label as-is.
        fixed = label
    else:
        if "Â" in fixed:
            # "Â" commonly appears before a non-breaking space in mojibake.
            fixed = fixed.replace("Â", "")
            try:
                # NOTE(review): this applies the *opposite* round-trip after the
                # "Â" strip, which can re-garble the label repaired above —
                # presumably intended to revert labels that were not actually
                # mojibake; confirm intent before changing.
                fixed = fixed.encode("utf-8").decode("latin1")
            except (UnicodeEncodeError, UnicodeDecodeError):
                pass
    # Table-driven fallback for artefacts the round-trip did not repair.
    for bad, good in _MOJIBAKE_REPLACEMENTS.items():
        if bad in fixed:
            fixed = fixed.replace(bad, good)
    fixed = fixed.replace("\ufeff", "")  # drop UTF-8 BOM if present
    fixed = " ".join(fixed.split())      # normalise internal whitespace
    return fixed
|
|
|
|
|
|
|
|
def _canonical_label(label: str) -> str:
    """
    Lowercase alpha-numeric only version of a label for fuzzy matching.

    The label is first run through :func:`_normalize_label`, lowered, and then
    every character outside ``[0-9a-z]`` is removed.
    """
    # `re` is imported at module level; the previous function-local import
    # merely shadowed it.
    norm = _normalize_label(label).lower()
    return re.sub(r"[^0-9a-z]", "", norm)
|
|
|
|
|
|
|
|
def _unpivot_wide_candidates(df: pd.DataFrame) -> pd.DataFrame:
    """
    Detect wide candidate columns (e.g., 'Voix 1', 'Nuance liste 2') and unpivot to long.

    Produces one output row per (input row, candidate index) with the standard
    columns 'voix' and 'code_candidature'; all non-indexed columns are carried
    over unchanged. Returns *df* untouched when fewer than two complete
    candidate groups (both voix and nuance present) are detected.
    """
    # A trailing integer marks a per-candidate column: "Voix 1", "Voix_1", ...
    pattern = re.compile(r"^(?P<base>.*?)(?:\s+|_)?(?P<idx>\d+)$")
    candidate_map: Dict[str, Dict[str, str]] = {}  # idx -> {std field -> raw column}
    wide_cols: set[str] = set()                    # every indexed column seen
    for col in df.columns:
        match = pattern.match(col)
        if not match:
            continue
        wide_cols.add(col)
        base = match.group("base").strip()
        idx = match.group("idx")
        canon = _canonical_label(base)
        field = None
        if canon == "voix":
            field = "voix"
        elif canon in {"nuance", "nuanceliste", "codenuance", "codenuanceducandidat", "codenuanceliste"}:
            field = "code_candidature"
        if field:
            candidate_map.setdefault(idx, {})[field] = col

    # Only indices providing both the vote count and the nuance code are usable.
    indices = [
        idx for idx, fields in candidate_map.items()
        if {"voix", "code_candidature"}.issubset(fields.keys())
    ]
    if len(indices) <= 1:
        # Zero or one candidate group: the table is not in wide format.
        return df

    # (removed: a `candidate_cols` set was computed here but never used)
    base_cols = [c for c in df.columns if c not in wide_cols]
    frames = []
    for idx in sorted(indices, key=lambda v: int(v)):
        fields = candidate_map[idx]
        use_cols = base_cols + list(fields.values())
        sub = df[use_cols].copy()
        sub = sub.rename(
            columns={
                fields["voix"]: "voix",
                fields["code_candidature"]: "code_candidature",
            }
        )
        frames.append(sub)
    return pd.concat(frames, ignore_index=True)
|
|
|
|
|
|
|
|
def deduplicate_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    If multiple columns end up with the same name after rename/normalization,
    keep the first non-null value across duplicates (left to right) and drop
    the extras.

    Returns a new DataFrame; the input is not modified.

    Note: the previous implementation used ``df.drop(columns=cols[1:])``,
    which drops by *label* — and every duplicate shares the label — so the
    merged column itself was removed and its data lost. Columns are now
    addressed by position instead.
    """
    merged: Dict[str, pd.Series] = {}
    # dict preserves insertion order, so first occurrence order is kept.
    for pos, name in enumerate(df.columns):
        series = df.iloc[:, pos]
        if name in merged:
            # Later duplicates only fill gaps left by earlier ones.
            merged[name] = merged[name].fillna(series)
        else:
            merged[name] = series
    return pd.DataFrame(merged, index=df.index)
|
|
|
|
|
|
|
|
def load_raw(
    path: Path,
    *,
    sep: str = ";",
    encoding: str | Iterable[str] = "cp1252",
    decimal: str = ",",
    dtype: Optional[Mapping[str, str]] = None,
    engine: str = "c",
) -> pd.DataFrame:
    """
    Wrapper around read_csv with encoding fallbacks to mitigate mojibake.

    Tries encodings in order (default: cp1252, utf-8-sig, latin-1) until column
    names no longer contain replacement artefacts (� or Ã), then normalises labels.

    Parameters
    ----------
    path : Path
        CSV file to load.
    sep, decimal, dtype, engine
        Forwarded to :func:`pandas.read_csv`.
    encoding : str or iterable of str
        Encoding(s) to try first; "utf-8-sig" and "latin-1" are appended as
        fallbacks when not already listed.

    Raises
    ------
    UnicodeDecodeError
        When every candidate encoding fails to decode the file.
    """
    # Build the ordered list of encodings to attempt.
    encoding_choices: List[str] = []
    if isinstance(encoding, str):
        encoding_choices.append(encoding)
    else:
        encoding_choices.extend(list(encoding))
    encoding_choices.extend([e for e in ["utf-8-sig", "latin-1"] if e not in encoding_choices])

    last_exc: Optional[Exception] = None
    for enc in encoding_choices:
        try:
            try:
                df = pd.read_csv(
                    path,
                    sep=sep,
                    encoding=enc,
                    decimal=decimal,
                    dtype=dtype,
                    engine=engine,
                    low_memory=False,
                )
            except pd.errors.ParserError:
                # Malformed rows: retry with the tolerant python engine and
                # skip bad lines (the python engine does not take low_memory).
                df = pd.read_csv(
                    path,
                    sep=sep,
                    encoding=enc,
                    decimal=decimal,
                    dtype=dtype,
                    engine="python",
                    on_bad_lines="skip",
                )
        except UnicodeDecodeError as exc:
            # Wrong encoding: remember the failure, try the next candidate.
            last_exc = exc
            continue

        # Artefacts in the header mean the file decoded but with the wrong
        # encoding; keep looking unless this was the last candidate.
        bad_cols = any(("�" in col) or ("Ã" in col) for col in df.columns)
        if bad_cols and enc != encoding_choices[-1]:
            continue

        df.columns = [_normalize_label(c) for c in df.columns]
        return df

    if last_exc:
        raise last_exc
    # Defensive: only reachable when the caller passes an empty encoding list.
    raise UnicodeDecodeError("utf-8", b"", 0, 1, "unable to decode with provided encodings")
|
|
|
|
|
|
|
|
def ensure_columns(df: pd.DataFrame, required: Iterable[str]) -> pd.DataFrame:
    """
    Guarantee that every column in *required* exists on *df*.

    Missing columns are added in place as NaN placeholders so downstream code
    can rely on the full schema. The (mutated) frame is returned.
    """
    missing = [column for column in required if column not in df.columns]
    for column in missing:
        df[column] = np.nan
    return df
|
|
|
|
|
|
|
|
def add_election_metadata(df: pd.DataFrame, meta: Mapping[str, object]) -> pd.DataFrame:
    """
    Stamp scrutin-level metadata onto every row of *df* (in place).

    Required meta keys: ``type_scrutin``, ``tour``, ``date_scrutin``.
    Optional: ``annee`` — when absent it is derived from ``date_scrutin``.
    """
    scrutin_date = pd.to_datetime(meta["date_scrutin"])
    df["type_scrutin"] = meta["type_scrutin"]
    df["tour"] = int(meta["tour"])
    df["date_scrutin"] = scrutin_date
    # Fall back to the year component of the parsed date when no explicit
    # 'annee' was supplied.
    df["annee"] = meta.get("annee", df["date_scrutin"].dt.year)
    return df
|
|
|
|
|
|
|
|
def build_code_bv(df: pd.DataFrame, meta: Mapping[str, object]) -> pd.DataFrame:
    """
    Guarantee a 'code_bv' column on *df*.

    If the column already exists it is only cast to str and stripped of
    surrounding whitespace. Otherwise, meta["code_bv_cols"] must name the
    columns whose values are joined with '-' (purely numeric parts are
    zero-padded to 3 digits).

    Raises
    ------
    KeyError
        When 'code_bv' is absent and no code_bv_cols were supplied, or when a
        requested column cannot be matched in the frame.
    """
    if "code_bv" in df.columns:
        df["code_bv"] = df["code_bv"].astype(str).str.strip()
        return df

    source_cols: Optional[List[str]] = meta.get("code_bv_cols")
    if not source_cols:
        raise KeyError("code_bv not found in dataframe and no code_bv_cols provided in meta.")

    # Resolve the requested names against the frame via canonical (fuzzy) form.
    canon_to_actual = {_canonical_label(column): column for column in df.columns}
    resolved: List[str] = []
    for wanted in source_cols:
        key = _canonical_label(wanted)
        if key not in canon_to_actual:
            raise KeyError(f"{wanted!r} not found in columns. Available: {list(df.columns)}")
        resolved.append(canon_to_actual[key])

    def _join_parts(row) -> str:
        # Zero-pad numeric fragments so e.g. commune "7" sorts/joins as "007".
        return "-".join(part.zfill(3) if part.isdigit() else part for part in row)

    df["code_bv"] = df[resolved].astype(str).apply(_join_parts, axis=1)
    return df
|
|
|
|
|
|
|
|
def coerce_numeric(df: pd.DataFrame, numeric_cols: Iterable[str] = NUMERIC_COLUMNS) -> pd.DataFrame:
    """
    Cast each listed column that exists on *df* to numeric, in place.

    Unparseable values become NaN (``errors="coerce"``). Columns not present
    are silently skipped. Returns the mutated frame.
    """
    present = (column for column in numeric_cols if column in df.columns)
    for column in present:
        df[column] = pd.to_numeric(df[column], errors="coerce")
    return df
|
|
|
|
|
|
|
|
def basic_cleaning(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply harmonisations common to all scrutins.

    - missing 'voix' values become 0 (the column is created if absent);
    - 'exprimes' is reconstructed as votants - blancs - nuls where missing
      and all three components are known;
    - rows without a 'code_bv' are dropped.

    Returns a new frame; the input is not modified.
    """
    df = df.copy()
    # Previous code did `df.get("voix", 0).fillna(0)`, which raises
    # AttributeError when the column is absent (int 0 has no .fillna).
    if "voix" in df.columns:
        df["voix"] = df["voix"].fillna(0)
    else:
        df["voix"] = 0

    # Derive 'exprimes' where it is missing but its components are present.
    mask_expr = (
        df["exprimes"].isna()
        & df["votants"].notna()
        & df["blancs"].notna()
        & df["nuls"].notna()
    )
    df.loc[mask_expr, "exprimes"] = (
        df.loc[mask_expr, "votants"] - df.loc[mask_expr, "blancs"] - df.loc[mask_expr, "nuls"]
    )

    # A row without a polling-station id cannot be aggregated downstream.
    df = df[df["code_bv"].notna()]
    return df
|
|
|
|
|
|
|
|
def standardize_election(
    path: Path,
    meta: Mapping[str, object],
    *,
    rename_map: Optional[Mapping[str, str]] = None,
    sep: str = ";",
    encoding: str | Iterable[str] = ("cp1252", "utf-8-sig", "latin-1"),
    decimal: str = ",",
    dtype: Optional[Mapping[str, str]] = None,
) -> pd.DataFrame:
    """
    Load and standardise a single raw table to the long format expected downstream.

    Parameters
    ----------
    path : Path
        CSV path to the raw election table.
    meta : Mapping
        Must contain type_scrutin, tour, date_scrutin. Optionally code_bv_cols,
        annee, tour_column (raw column holding the round number) and tours
        (explicit list of tour values to process).
    rename_map : Mapping
        Columns to rename from the raw schema to the standard schema.
    sep, encoding, decimal, dtype
        Forwarded to :func:`load_raw`.

    Raises
    ------
    RuntimeError
        When a tour column was announced but no tour could be detected.
    """
    df_raw = load_raw(path, sep=sep, encoding=encoding, decimal=decimal, dtype=dtype)
    rename_norm = {_normalize_label(k): v for k, v in (rename_map or {}).items()}

    def _process(df: pd.DataFrame, meta_for_tour: Mapping[str, object]) -> pd.DataFrame:
        """Normalise, unpivot, rename, enrich and clean one (sub-)table."""
        df_local = df.copy()
        df_local.columns = [_normalize_label(c) for c in df_local.columns]
        df_local = _unpivot_wide_candidates(df_local)
        if rename_norm:

            def canonical_base(label: str) -> str:
                # Strip a trailing candidate index so e.g. "voix 1" matches
                # the "voix" entry of the rename map.
                # Fixed: the regex was r"\\d+$" (a literal backslash followed
                # by 'd'), which never stripped the digits.
                return re.sub(r"\d+$", "", _canonical_label(label))

            rename_by_base = {canonical_base(k): v for k, v in rename_norm.items()}
            rename_using = {}
            for col in df_local.columns:
                base = canonical_base(col)
                if base in rename_by_base:
                    rename_using[col] = rename_by_base[base]
            df_local = df_local.rename(columns=rename_using)
        df_local = deduplicate_columns(df_local)
        df_local = df_local.loc[:, ~df_local.columns.duplicated()]

        df_local = build_code_bv(df_local, meta_for_tour)
        df_local = add_election_metadata(df_local, meta_for_tour)
        df_local = ensure_columns(df_local, STANDARD_COLUMNS)
        df_local = coerce_numeric(df_local)
        df_local = basic_cleaning(df_local)
        # Standard columns first, any extra columns preserved after them.
        ordered_cols = STANDARD_COLUMNS + [col for col in df_local.columns if col not in STANDARD_COLUMNS]
        return df_local[ordered_cols]

    # Multi-tour file: split on the tour column and process each tour apart.
    if meta.get("tour_column") and "tour" not in meta:
        tour_col = _normalize_label(str(meta["tour_column"]))
        if tour_col not in df_raw.columns:
            # Announced tour column is absent: fall back to a single tour.
            meta_single = {k: v for k, v in meta.items() if k != "tour_column"}
            meta_single["tour"] = int(meta.get("tour", 1))
            return _process(df_raw, meta_single)
        tours = meta.get("tours") or sorted(df_raw[tour_col].dropna().unique())
        frames: list[pd.DataFrame] = []
        for tour_val in tours:
            meta_tour = {k: v for k, v in meta.items() if k != "tour_column"}
            meta_tour["tour"] = int(tour_val)
            frames.append(_process(df_raw[df_raw[tour_col] == tour_val], meta_tour))
        if not frames:
            raise RuntimeError(f"Aucun tour détecté pour {path.name}")
        return pd.concat(frames, ignore_index=True)

    return _process(df_raw, meta)
|
|
|
|
|
|
|
|
def validate_consistency(df: pd.DataFrame, *, tolerance: float = 0.02) -> Dict[str, pd.DataFrame]:
    """
    Run quick sanity checks and return the offending rows, keyed by check name.

    Each check only runs when all of its columns are present:
    - votants_gt_inscrits: more ballots cast than registered voters;
    - exprimes_balance_off: exprimes + blancs + nuls deviates from votants by
      more than *tolerance* (relative to votants);
    - sum_voix_vs_exprimes: per (code_bv, type_scrutin, tour), the summed voix
      deviates from the summed exprimes by more than *tolerance*.
    """
    issues: Dict[str, pd.DataFrame] = {}
    available = set(df.columns)

    if {"votants", "inscrits"} <= available:
        issues["votants_gt_inscrits"] = df[df["votants"] > df["inscrits"]]

    if {"exprimes", "blancs", "nuls", "votants"} <= available:
        balance = df.copy()
        accounted = balance["exprimes"] + balance["blancs"] + balance["nuls"]
        # Zero votants would divide by zero; NaN gaps are never flagged.
        balance["gap"] = (accounted - balance["votants"]) / balance["votants"].replace(0, np.nan)
        issues["exprimes_balance_off"] = balance[balance["gap"].abs() > tolerance]

    if {"code_bv", "type_scrutin", "tour", "exprimes", "voix"} <= available:
        per_bv = df.groupby(["code_bv", "type_scrutin", "tour"], as_index=False)[["exprimes", "voix"]].sum()
        per_bv["gap"] = (per_bv["voix"] - per_bv["exprimes"]) / per_bv["exprimes"].replace(0, np.nan)
        issues["sum_voix_vs_exprimes"] = per_bv[per_bv["gap"].abs() > tolerance]

    return issues
|
|
|