| from __future__ import annotations | |
| import os | |
| from pathlib import Path | |
| from typing import Iterable, Optional | |
| import pandas as pd | |
| import sqlalchemy as sa | |
| from sqlalchemy import Column, Date, Float, Integer, MetaData, String, Table | |
| from sqlalchemy.engine import Engine | |
| from .constants import NUMERIC_COLUMNS | |
| from .pipeline import normalize_bloc | |
| def get_engine(url: Optional[str] = None) -> Engine: | |
| db_url = url or os.getenv("DATABASE_URL") | |
| if not db_url: | |
| raise RuntimeError("DATABASE_URL is not set. Example: postgresql+psycopg2://user:pass@localhost:5432/elections") | |
| return sa.create_engine(db_url) | |
| def define_schema(metadata: MetaData) -> Table: | |
| return Table( | |
| "election_results", | |
| metadata, | |
| Column("id", Integer, primary_key=True, autoincrement=True), | |
| Column("code_bv", String(32), index=True, nullable=False), | |
| Column("nom_bv", String(255)), | |
| Column("date_scrutin", Date, index=True, nullable=False), | |
| Column("annee", Integer, index=True, nullable=False), | |
| Column("type_scrutin", String(32), index=True, nullable=False), | |
| Column("tour", Integer, nullable=False), | |
| Column("bloc", String(64), index=True, nullable=False), | |
| Column("voix_bloc", Float), | |
| Column("exprimes", Float), | |
| Column("inscrits", Float), | |
| Column("votants", Float), | |
| Column("blancs", Float), | |
| Column("nuls", Float), | |
| Column("part_bloc", Float), | |
| Column("part_bloc_national", Float), | |
| Column("taux_participation_national", Float), | |
| Column("taux_participation_bv", Float), | |
| Column("taux_blancs_bv", Float), | |
| Column("taux_nuls_bv", Float), | |
| Column("ecart_bloc_vs_national", Float), | |
| Column("ecart_participation_vs_nat", Float), | |
| Column("croissance_inscrits_depuis_base", Float), | |
| Column("part_bloc_lag1", Float), | |
| Column("ecart_bloc_vs_national_lag1", Float), | |
| Column("taux_participation_bv_lag1", Float), | |
| Column("annee_centre", Float), | |
| ) | |
| def create_schema(engine: Engine) -> None: | |
| metadata = MetaData() | |
| define_schema(metadata) | |
| metadata.create_all(engine) | |
| def _coerce_numeric(df: pd.DataFrame, numeric_cols: Iterable[str]) -> pd.DataFrame: | |
| for col in numeric_cols: | |
| if col in df.columns: | |
| df[col] = pd.to_numeric(df[col], errors="coerce") | |
| return df | |
| def load_processed_to_db( | |
| processed_path: Path = Path("data/processed/elections_blocs.csv"), | |
| *, | |
| engine: Optional[Engine] = None, | |
| if_exists: str = "replace", | |
| chunksize: int = 1000, | |
| ) -> int: | |
| """ | |
| Load the processed bloc-level dataset into PostgreSQL. | |
| Returns the number of rows written. | |
| """ | |
| engine = engine or get_engine() | |
| create_schema(engine) | |
| df = pd.read_csv(processed_path, sep=";") | |
| df["date_scrutin"] = pd.to_datetime(df["date_scrutin"]).dt.date | |
| if "bloc" in df.columns: | |
| df["bloc"] = df["bloc"].apply(normalize_bloc) | |
| df = _coerce_numeric(df, NUMERIC_COLUMNS) | |
| df.to_sql( | |
| "election_results", | |
| engine, | |
| if_exists=if_exists, | |
| index=False, | |
| method="multi", | |
| chunksize=chunksize, | |
| ) | |
| return len(df) | |
| def list_bureaux(engine: Engine) -> list[str]: | |
| with engine.connect() as conn: | |
| result = conn.execute(sa.text("select distinct code_bv from election_results order by code_bv")) | |
| return [row[0] for row in result.fetchall()] | |
| def fetch_history(engine: Engine, code_bv: str) -> pd.DataFrame: | |
| query = sa.text( | |
| """ | |
| select * | |
| from election_results | |
| where code_bv = :code_bv | |
| order by date_scrutin asc, bloc asc | |
| """ | |
| ) | |
| return pd.read_sql(query, engine, params={"code_bv": code_bv}) | |
| __all__ = [ | |
| "create_schema", | |
| "define_schema", | |
| "fetch_history", | |
| "get_engine", | |
| "list_bureaux", | |
| "load_processed_to_db", | |
| ] | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser(description="Initialise la base et charge les résultats.") | |
| parser.add_argument( | |
| "--load", | |
| action="store_true", | |
| help="Charger data/processed/elections_blocs.csv dans la base (remplace la table).", | |
| ) | |
| parser.add_argument( | |
| "--path", | |
| type=Path, | |
| default=Path("data/processed/elections_blocs.csv"), | |
| help="Chemin vers le fichier processe (CSV ; par defaut data/processed/elections_blocs.csv).", | |
| ) | |
| args = parser.parse_args() | |
| engine = get_engine() | |
| create_schema(engine) | |
| if args.load: | |
| rows = load_processed_to_db(args.path, engine=engine) | |
| print(f"{rows} lignes inserees dans election_results.") | |
| else: | |
| print("Schema cree. Utilisez --load pour charger les donnees.") | |