# bdv / src/data_prep.py
# Web-page header residue (not part of the module): stephmnt's picture,
# "Sync from GitHub Actions", 46f9144 verified.
from __future__ import annotations
from pathlib import Path
import re
from typing import Dict, Iterable, List, Mapping, Optional
import numpy as np
import pandas as pd
# Columns every standardised table exposes, in output order
# (standardize_election places them first, before any extra columns).
STANDARD_COLUMNS: List[str] = [
    # polling-station identity
    "code_bv", "nom_bv",
    # election identity
    "annee", "date_scrutin", "type_scrutin", "tour",
    # turnout counters
    "inscrits", "votants", "abstentions", "blancs", "nuls", "exprimes",
    # candidature identity and score
    "code_candidature", "nom_candidature", "voix",
]
# Counter columns that coerce_numeric converts to numbers by default.
NUMERIC_COLUMNS: List[str] = [
    "inscrits", "votants", "abstentions", "blancs", "nuls", "exprimes",
    "voix",
]
# Characters the replacement table knows how to repair. They are written as
# escapes so the table cannot itself be corrupted when this source file is
# re-encoded by an editor or a sync tool.
_MOJIBAKE_TARGETS = (
    "\u00e9\u00e8\u00ea\u00eb"  # e-acute, e-grave, e-circumflex, e-diaeresis
    "\u00e0\u00e2\u00e7"  # a-grave, a-circumflex, c-cedilla
    "\u00f9\u00fb\u00ef\u00f4\u00f6"  # u-grave, u-circ., i-diaer., o-circ., o-diaer.
    "\u00c9\u00c8\u00ca\u00cb"  # uppercase E variants
    "\u00c0\u00c2\u00c7"  # uppercase A-grave, A-circumflex, C-cedilla
    "\u00b0"  # degree sign (as in "N\u00b0")
)


def _build_mojibake_replacements(targets: str) -> Dict[str, str]:
    """Map the mis-decoded form of each character in *targets* back to it.

    For every character, its UTF-8 bytes are decoded as cp1252 and as latin-1;
    each resulting garbled string becomes a key pointing at the clean character.
    """
    table: Dict[str, str] = {}
    for char in targets:
        raw = char.encode("utf-8")
        for codec in ("cp1252", "latin1"):
            try:
                table[raw.decode(codec)] = char
            except UnicodeDecodeError:
                # cp1252 leaves a few byte values undefined; skip those forms.
                continue
    return table


_MOJIBAKE_REPLACEMENTS: Dict[str, str] = {
    **_build_mojibake_replacements(_MOJIBAKE_TARGETS),
    # "A-tilde + plain space": a-grave whose second byte (NBSP) was turned
    # into an ordinary space somewhere upstream.
    "\u00c3 ": "\u00e0",
    # The Unicode replacement character is read as a lost degree sign.
    "\ufffd": "\u00b0",
}


def _normalize_label(label: str) -> str:
    """
    Repair mojibake in a column label and tidy its whitespace.

    Steps, in order:
    1. Try to undo "UTF-8 bytes decoded as cp1252/latin-1" by re-encoding the
       label with each of those codecs and decoding the bytes as UTF-8. The
       first codec for which this succeeds wins; any "A-circumflex" still
       present after a successful round trip is removed as an artefact.
    2. Replace every residual garbled sequence listed in
       ``_MOJIBAKE_REPLACEMENTS`` (covers labels that mix clean and garbled
       text, for which the round trip fails).
    3. Drop byte-order marks and collapse runs of whitespace to single spaces.

    Clean labels are returned unchanged apart from step 3, so the function can
    safely be applied more than once.
    """
    fixed = label
    for codec in ("cp1252", "latin1"):
        try:
            repaired = label.encode(codec).decode("utf-8")
        except (UnicodeEncodeError, UnicodeDecodeError):
            # Not mojibake for this codec (or already clean text): keep going.
            continue
        fixed = repaired.replace("\u00c2", "")
        break

    for bad, good in _MOJIBAKE_REPLACEMENTS.items():
        fixed = fixed.replace(bad, good)

    fixed = fixed.replace("\ufeff", "")  # remove BOM
    return " ".join(fixed.split())  # normalise whitespace
def _canonical_label(label: str) -> str:
    """
    Reduce a label to its lowercase ASCII letters and digits for fuzzy matching.

    The label is first passed through ``_normalize_label`` and lowercased;
    every character outside ``0-9`` / ``a-z`` is then dropped (accented
    letters are removed, not transliterated).
    """
    lowered = _normalize_label(label).lower()
    return "".join(
        ch for ch in lowered if ("0" <= ch <= "9") or ("a" <= ch <= "z")
    )
def _unpivot_wide_candidates(df: pd.DataFrame) -> pd.DataFrame:
    """
    Turn numbered per-candidate columns ('Voix 1', 'Nuance liste 2', ...) into rows.

    A column counts as "numbered" when its name ends with digits. Among those,
    vote columns and nuance-code columns are paired by their numeric suffix.
    When at least two suffixes have both a vote and a nuance column, the frame
    is rebuilt as the concatenation of one sub-frame per suffix, each holding
    the non-numbered columns plus that candidate's 'voix' and
    'code_candidature'. All other numbered columns are left out of the result.

    With fewer than two complete candidates the input frame is returned as is.
    """
    suffix_re = re.compile(r"^(?P<base>.*?)(?:\s+|_)?(?P<idx>\d+)$")
    nuance_labels = {
        "nuance",
        "nuanceliste",
        "codenuance",
        "codenuanceducandidat",
        "codenuanceliste",
    }

    numbered: set[str] = set()
    per_candidate: Dict[str, Dict[str, str]] = {}
    for column in df.columns:
        found = suffix_re.match(column)
        if found is None:
            continue
        numbered.add(column)
        canon = _canonical_label(found.group("base").strip())
        if canon == "voix":
            target = "voix"
        elif canon in nuance_labels:
            target = "code_candidature"
        else:
            # Numbered column that is neither votes nor a nuance code.
            continue
        per_candidate.setdefault(found.group("idx"), {})[target] = column

    complete = [
        idx
        for idx, cols in per_candidate.items()
        if "voix" in cols and "code_candidature" in cols
    ]
    if len(complete) < 2:
        return df

    shared = [name for name in df.columns if name not in numbered]
    pieces = []
    for idx in sorted(complete, key=int):
        cols = per_candidate[idx]
        piece = df[shared + list(cols.values())].copy()
        pieces.append(
            piece.rename(
                columns={
                    cols["voix"]: "voix",
                    cols["code_candidature"]: "code_candidature",
                }
            )
        )
    return pd.concat(pieces, ignore_index=True)
def deduplicate_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Merge columns that share a name, keeping the first non-null value per row.

    For every duplicated label, the columns carrying it are combined left to
    right: a row takes the value of the first column in which it is not null.
    The merged column stays at the position of the first occurrence; the other
    occurrences are removed. The input frame is not modified.

    Columns are addressed by position: selecting or dropping by label would
    act on every column sharing that label at once.
    """
    columns = df.columns
    is_repeat = columns.duplicated()
    result = df.loc[:, ~is_repeat].copy()

    for name in columns[is_repeat].unique():
        positions = [pos for pos, label in enumerate(columns) if label == name]
        merged = df.iloc[:, positions[0]]
        for pos in positions[1:]:
            merged = merged.fillna(df.iloc[:, pos])
        result[name] = merged
    return result
def load_raw(
    path: Path,
    *,
    sep: str = ";",
    encoding: str | Iterable[str] = "cp1252",
    decimal: str = ",",
    dtype: Optional[Mapping[str, str]] = None,
    engine: str = "c",
) -> pd.DataFrame:
    """
    Read a raw CSV, trying several encodings to mitigate mojibake.

    The requested encoding(s) are tried in order, followed by "utf-8-sig" and
    "latin-1" when not already listed. A candidate is accepted as soon as the
    parsed column names contain neither the Unicode replacement character nor
    an "A-tilde" (the usual marker of UTF-8 read as a single-byte encoding);
    the last candidate is accepted regardless. If the remaining candidates all
    fail to decode, the first frame that did parse is returned rather than
    raising. Column labels of the returned frame go through
    ``_normalize_label``.

    Parameters
    ----------
    path : Path
        CSV file to read.
    sep, decimal, dtype, engine
        Forwarded to ``pandas.read_csv``.
    encoding : str or iterable of str
        Encoding(s) to try first.

    Raises
    ------
    UnicodeDecodeError
        When no candidate encoding could decode the file.
    """
    requested = [encoding] if isinstance(encoding, str) else list(encoding)
    # dict.fromkeys removes repeats while keeping the priority order.
    encoding_choices: List[str] = list(
        dict.fromkeys([*requested, "utf-8-sig", "latin-1"])
    )

    def _read(enc: str) -> pd.DataFrame:
        try:
            return pd.read_csv(
                path,
                sep=sep,
                encoding=enc,
                decimal=decimal,
                dtype=dtype,  # type: ignore
                engine=engine,  # type: ignore
                low_memory=False,
            )
        except pd.errors.ParserError:
            # Best effort: retry with the python engine and skip malformed
            # lines (low_memory is not supported by that engine).
            # NOTE(review): skipped lines are dropped silently.
            return pd.read_csv(
                path,
                sep=sep,
                encoding=enc,
                decimal=decimal,
                dtype=dtype,  # type: ignore
                engine="python",
                on_bad_lines="skip",
            )

    fallback: Optional[pd.DataFrame] = None
    last_exc: Optional[Exception] = None
    for enc in encoding_choices:
        try:
            df = _read(enc)
        except UnicodeDecodeError as exc:
            last_exc = exc
            continue
        # U+FFFD (replacement character) or U+00C3 (A-tilde) in a header.
        suspicious = any(("\ufffd" in col) or ("\u00c3" in col) for col in df.columns)
        if suspicious and enc != encoding_choices[-1]:
            # Remember the highest-priority readable frame, then try the next
            # encoding candidate.
            if fallback is None:
                fallback = df
            continue
        df.columns = [_normalize_label(c) for c in df.columns]
        return df

    if fallback is not None:
        # Every later candidate failed to decode: a frame with repaired labels
        # is more useful than an exception.
        fallback.columns = [_normalize_label(c) for c in fallback.columns]
        return fallback
    if last_exc:
        raise last_exc
    raise UnicodeDecodeError("utf-8", b"", 0, 1, "unable to decode with provided encodings")
def ensure_columns(df: pd.DataFrame, required: Iterable[str]) -> pd.DataFrame:
    """
    Guarantee that every name in *required* exists as a column.

    Missing columns are created in place, filled with NaN; existing columns
    are left untouched. The same frame object is returned.
    """
    absent = [name for name in required if name not in df.columns]
    for name in absent:
        df[name] = np.nan
    return df
def add_election_metadata(df: pd.DataFrame, meta: Mapping[str, object]) -> pd.DataFrame:
    """
    Stamp every row with the scrutin's metadata (modifies *df* in place).

    Keys read from *meta*:
    - type_scrutin (required): copied as is
    - tour (required): converted with ``int``
    - date_scrutin (required): parsed with ``pandas.to_datetime``
    - annee (optional): when absent, the year of date_scrutin is used

    Returns the same frame object.
    """
    df["type_scrutin"] = meta["type_scrutin"]
    df["tour"] = int(meta["tour"])  # type: ignore
    df["date_scrutin"] = pd.to_datetime(meta["date_scrutin"])  # type: ignore

    # The derived year is computed unconditionally, then only used as a
    # fallback when the caller gave no explicit 'annee'.
    derived_year = df["date_scrutin"].dt.year
    if "annee" in meta:
        df["annee"] = meta["annee"]
    else:
        df["annee"] = derived_year
    return df
def build_code_bv(df: pd.DataFrame, meta: Mapping[str, object]) -> pd.DataFrame:
    """
    Ensure a string ``code_bv`` column exists (modifies *df* in place).

    - If ``code_bv`` is already present, its values are converted to stripped
      strings.
    - Otherwise ``meta["code_bv_cols"]`` must list the columns to combine.
      Names are matched through ``_canonical_label`` (so case, accents and
      punctuation are ignored); the parts are joined with "-", purely numeric
      parts being zero-padded to 3 digits.

    In both cases missing identifiers stay missing (NaN) instead of becoming
    the literal string "nan", so they can still be detected with ``notna()``.
    In the combined form, a row is missing as soon as one of its parts is.

    Raises
    ------
    KeyError
        When ``code_bv`` is absent and ``code_bv_cols`` is not provided, or
        when one of the listed columns cannot be found.
    """
    if "code_bv" in df.columns:
        existing = df["code_bv"]
        present = existing.notna().to_numpy()
        df["code_bv"] = existing.astype(str).str.strip().where(present)
        return df

    columns_to_concat: Optional[List[str]] = meta.get("code_bv_cols")  # type: ignore[arg-type]
    if not columns_to_concat:
        raise KeyError("code_bv not found in dataframe and no code_bv_cols provided in meta.")

    canon_map = {_canonical_label(col): col for col in df.columns}
    actual_cols: List[str] = []
    for target in columns_to_concat:
        canon = _canonical_label(target)
        if canon not in canon_map:
            raise KeyError(f"{target!r} not found in columns. Available: {list(df.columns)}")
        actual_cols.append(canon_map[canon])

    parts = df[actual_cols]
    # Plain iteration (rather than DataFrame.apply) also works on empty frames.
    codes = [
        "-".join(v.zfill(3) if v.isdigit() else v for v in row)
        for row in parts.astype(str).itertuples(index=False, name=None)
    ]
    complete = parts.notna().all(axis=1).to_numpy()
    df["code_bv"] = pd.Series(codes, index=df.index, dtype=object).where(complete)
    return df
def coerce_numeric(df: pd.DataFrame, numeric_cols: Iterable[str] = NUMERIC_COLUMNS) -> pd.DataFrame:
    """
    Convert the listed columns to numeric dtype (modifies *df* in place).

    Only columns actually present in the frame are touched; values that cannot
    be parsed become NaN (``errors="coerce"``). Returns the same frame object.
    """
    present = [name for name in numeric_cols if name in df.columns]
    for name in present:
        df[name] = pd.to_numeric(df[name], errors="coerce")
    return df
def basic_cleaning(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply harmonisations common to all scrutins; the input is not modified.

    - ``voix``: missing values become 0; the column is created (all 0) when
      absent.
    - ``exprimes``: where missing, recomputed as votants - blancs - nuls when
      those three are known. Skipped when any of the four columns is absent.
    - Rows whose ``code_bv`` is missing are dropped.

    Raises
    ------
    KeyError
        When the frame has no ``code_bv`` column.
    """
    df = df.copy()

    if "voix" in df.columns:
        df["voix"] = df["voix"].fillna(0)
    else:
        df["voix"] = 0

    # Recompute exprimes when possible
    if {"exprimes", "votants", "blancs", "nuls"}.issubset(df.columns):
        recomputable = (
            df["exprimes"].isna()
            & df["votants"].notna()
            & df["blancs"].notna()
            & df["nuls"].notna()
        )
        df.loc[recomputable, "exprimes"] = (
            df.loc[recomputable, "votants"]
            - df.loc[recomputable, "blancs"]
            - df.loc[recomputable, "nuls"]
        )

    # Remove rows without minimal identifiers
    return df[df["code_bv"].notna()]
def standardize_election(
    path: Path,
    meta: Mapping[str, object],
    *,
    rename_map: Optional[Mapping[str, str]] = None,
    sep: str = ";",
    encoding: str | Iterable[str] = ("cp1252", "utf-8-sig", "latin-1"),
    decimal: str = ",",
    dtype: Optional[Mapping[str, str]] = None,
) -> pd.DataFrame:
    """
    Load and standardise a single raw table to the long format expected downstream.

    Parameters
    ----------
    path : Path
        CSV path to the raw election table.
    meta : Mapping
        Must contain type_scrutin and date_scrutin, plus either ``tour`` or
        ``tour_column``. Optional keys read here or by the helpers:
        code_bv_cols, annee, tours.
    rename_map : Mapping
        Columns to rename from the raw schema to the standard schema. Matching
        is done on canonical labels (``_canonical_label``). A column that has
        no exact canonical match may still match once its trailing digits are
        removed, unless the target name is already taken by an exact match.
    sep, encoding, decimal, dtype
        Forwarded to ``load_raw``.

    Returns
    -------
    DataFrame with ``STANDARD_COLUMNS`` first, followed by any extra columns.
    When ``meta`` has ``tour_column`` and no ``tour``, the table is split on
    that column and each non-empty part is processed with its own tour number.

    Raises
    ------
    RuntimeError
        In multi-tour mode, when no tour yields any row.
    """
    df_raw = load_raw(path, sep=sep, encoding=encoding, decimal=decimal, dtype=dtype)
    rename_norm = {_normalize_label(k): v for k, v in (rename_map or {}).items()}

    def canonical_base(label: str) -> str:
        # Canonical label without its numeric suffix; a label made only of
        # digits is kept whole so that it cannot collapse to "".
        canon = _canonical_label(label)
        return re.sub(r"\d+$", "", canon) or canon

    # Two lookup levels: exact canonical label first, suffix-insensitive second.
    # If several map keys share a canonical form, the last one wins.
    rename_exact = {_canonical_label(k): v for k, v in rename_norm.items()}
    rename_by_base = {canonical_base(k): v for k, v in rename_norm.items()}

    def _process(df: pd.DataFrame, meta_for_tour: Mapping[str, object]) -> pd.DataFrame:
        df_local = df.copy()
        df_local.columns = [_normalize_label(c) for c in df_local.columns]
        df_local = _unpivot_wide_candidates(df_local)

        if rename_norm:
            rename_using: Dict[str, str] = {}
            for col in df_local.columns:
                canon = _canonical_label(col)
                if canon in rename_exact:
                    rename_using[col] = rename_exact[canon]
            claimed = set(rename_using.values())
            for col in df_local.columns:
                if col in rename_using:
                    continue
                target = rename_by_base.get(canonical_base(col))
                # An exact match keeps priority: a suffixed variant must not
                # be merged into a target that already has its own column.
                if target is not None and target not in claimed:
                    rename_using[col] = target
            df_local = df_local.rename(columns=rename_using)
            df_local = deduplicate_columns(df_local)

        # Safety net: labels can also collide after normalisation alone.
        df_local = df_local.loc[:, ~df_local.columns.duplicated()]
        df_local = build_code_bv(df_local, meta_for_tour)
        df_local = add_election_metadata(df_local, meta_for_tour)
        df_local = ensure_columns(df_local, STANDARD_COLUMNS)
        df_local = coerce_numeric(df_local)
        df_local = basic_cleaning(df_local)
        ordered_cols = STANDARD_COLUMNS + [
            col for col in df_local.columns if col not in STANDARD_COLUMNS
        ]
        return df_local[ordered_cols]

    # Multi-tour handling: split on tour_column if provided and "tour" not explicit
    if meta.get("tour_column") and "tour" not in meta:
        tour_col = _normalize_label(str(meta["tour_column"]))
        meta_base = {k: v for k, v in meta.items() if k != "tour_column"}

        if tour_col not in df_raw.columns:
            # Fallback: treat the file as a single first tour when the column
            # cannot be found.
            return _process(df_raw, {**meta_base, "tour": 1})

        tours = meta.get("tours") or sorted(df_raw[tour_col].dropna().unique())
        frames: list[pd.DataFrame] = []
        for tour_val in tours:  # type: ignore[union-attr]
            subset = df_raw[df_raw[tour_col] == tour_val]
            if subset.empty:
                # A requested tour that is absent from the data adds nothing.
                continue
            frames.append(_process(subset, {**meta_base, "tour": int(tour_val)}))
        if not frames:
            raise RuntimeError(f"Aucun tour détecté pour {Path(path).name}")
        return pd.concat(frames, ignore_index=True)

    return _process(df_raw, meta)
def validate_consistency(df: pd.DataFrame, *, tolerance: float = 0.02) -> Dict[str, pd.DataFrame]:
    """
    Quick validation checks. Returns a dict of issues to inspect.

    Each check runs only when the columns it needs are present:
    - "votants_gt_inscrits": rows where votants exceeds inscrits.
    - "exprimes_balance_off": rows where exprimes + blancs + nuls differs from
      votants by more than *tolerance* (relative to votants); the returned
      frame carries the relative difference in a "gap" column.
    - "sum_voix_vs_exprimes": (code_bv, type_scrutin, tour) groups whose total
      voix differs from exprimes by more than *tolerance* (relative to
      exprimes), with the difference in "gap".

    Rows with zero votants/exprimes yield a NaN gap and are never reported.
    """
    issues: Dict[str, pd.DataFrame] = {}

    if {"votants", "inscrits"}.issubset(df.columns):
        issues["votants_gt_inscrits"] = df[df["votants"] > df["inscrits"]]

    if {"exprimes", "blancs", "nuls", "votants"}.issubset(df.columns):
        expr_gap = df.copy()
        expr_gap["gap"] = (
            (expr_gap["exprimes"] + expr_gap["blancs"] + expr_gap["nuls"] - expr_gap["votants"])
            / expr_gap["votants"].replace(0, np.nan)
        )
        issues["exprimes_balance_off"] = expr_gap[expr_gap["gap"].abs() > tolerance]

    if {"code_bv", "type_scrutin", "tour", "exprimes", "voix"}.issubset(df.columns):
        # exprimes is a per-bureau total repeated on every candidate row, so
        # it is taken once per group (max); only voix is summed.
        # NOTE(review): the grouping key has no year/date, so frames mixing
        # several elections of the same type and tour are pooled; confirm.
        sums = df.groupby(["code_bv", "type_scrutin", "tour"], as_index=False).agg(
            exprimes=("exprimes", "max"),
            voix=("voix", "sum"),
        )
        sums["gap"] = (sums["voix"] - sums["exprimes"]) / sums["exprimes"].replace(0, np.nan)
        issues["sum_voix_vs_exprimes"] = sums[sums["gap"].abs() > tolerance]

    return issues