"""proxy_log.py – Persistence layer for proxy/scenario selections.

Each time the engine returns a result (single search or formulation row),
the chosen proxy is logged. The CSV file is stored in a ``data`` directory
next to this module so it survives Streamlit restarts.

CSV columns:
    timestamp, matiere_recherchee, proxy_choisi, scenario, impact_kg_co2_t,
    source_db, match_exact, pays_production, pays_transformation, type_mp
"""

from __future__ import annotations

import csv
import os
from datetime import datetime
from pathlib import Path
from collections import Counter

import pandas as pd

# ---------------------------------------------------------------------------
# Storage file
# ---------------------------------------------------------------------------

_LOG_DIR = Path(__file__).parent / "data"
_LOG_FILE = _LOG_DIR / "proxy_selections.csv"

# Column order of the CSV file; also used as the header row.
_FIELDNAMES = [
    "timestamp",
    "matiere_recherchee",
    "proxy_choisi",
    "scenario",
    "impact_kg_co2_t",
    "source_db",
    "match_exact",
    "pays_production",
    "pays_transformation",
    "type_mp",
]


def _ensure_file() -> None:
    """Create the log directory and a header-only CSV file when needed.

    The header is (re)written when the file is missing *or* zero bytes long.
    Without the second check, a truncated file would receive data rows with
    no header, and the first data row would later be parsed as column names.
    """
    _LOG_DIR.mkdir(parents=True, exist_ok=True)
    if _LOG_FILE.exists() and _LOG_FILE.stat().st_size > 0:
        return
    with open(_LOG_FILE, "w", newline="", encoding="utf-8") as f:
        csv.DictWriter(f, fieldnames=_FIELDNAMES).writeheader()


# ---------------------------------------------------------------------------
# Writing
# ---------------------------------------------------------------------------

def log_selection(
    matiere_recherchee: str,
    proxy_choisi: str,
    scenario: str,
    impact_kg_co2_t: float | None = None,
    source_db: str = "",
    match_exact: bool = True,
    pays_production: str = "",
    pays_transformation: str = "",
    type_mp: str = "vegetal_animal",
) -> None:
    """Append one proxy selection to the CSV log.

    Args:
        matiere_recherchee: Raw material that was searched for.
        proxy_choisi: Proxy that was selected for it.
        scenario: Scenario label attached to the selection.
        impact_kg_co2_t: Impact value; rounded to 2 decimals, or written as
            an empty cell when ``None``.
        source_db: Name of the source database.
        match_exact: Stored as ``"Oui"`` when true, ``"Non"`` otherwise.
        pays_production: Production country; falsy values become ``""``.
        pays_transformation: Processing country; falsy values become ``""``.
        type_mp: Raw-material type label.
    """
    _ensure_file()
    row = {
        # Naive local time, second precision.
        "timestamp": datetime.now().isoformat(timespec="seconds"),
        "matiere_recherchee": matiere_recherchee,
        "proxy_choisi": proxy_choisi,
        "scenario": scenario,
        "impact_kg_co2_t": (
            round(impact_kg_co2_t, 2) if impact_kg_co2_t is not None else ""
        ),
        "source_db": source_db,
        "match_exact": "Oui" if match_exact else "Non",
        "pays_production": pays_production or "",
        "pays_transformation": pays_transformation or "",
        "type_mp": type_mp,
    }
    with open(_LOG_FILE, "a", newline="", encoding="utf-8") as f:
        csv.DictWriter(f, fieldnames=_FIELDNAMES).writerow(row)


# ---------------------------------------------------------------------------
# Reading / statistics
# ---------------------------------------------------------------------------

def load_log() -> pd.DataFrame:
    """Load the complete log as a DataFrame.

    The ``timestamp`` column, when present, is converted to datetimes;
    unparsable values become ``NaT``.

    Returns:
        The log contents. If the file holds nothing parsable, an empty frame
        with the expected columns is returned instead of raising.
    """
    _ensure_file()
    try:
        df = pd.read_csv(_LOG_FILE, encoding="utf-8")
    except pd.errors.EmptyDataError:
        # E.g. a file containing only blank lines.
        df = pd.DataFrame(columns=_FIELDNAMES)
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    return df


def _top_by(column: str, n: int, days: int | None) -> pd.DataFrame:
    """Count selections per distinct value of *column*, most frequent first.

    Shared implementation of ``top_proxies`` and ``top_scenarios``.

    Args:
        column: Log column to group by.
        n: Maximum number of rows to return.
        days: When given, only rows whose timestamp falls within the last
            *days* days are counted.

    Returns:
        A frame with columns *column*, ``nb_selections`` and
        ``dernière_utilisation`` (latest timestamp of the group). It is empty
        when the log has no matching rows or lacks a required column.
    """
    empty = pd.DataFrame(
        columns=[column, "nb_selections", "dernière_utilisation"]
    )
    df = load_log()
    # A malformed file may lack the columns the aggregation needs; report
    # "no statistics" rather than failing with a KeyError.
    if df.empty or column not in df.columns or "timestamp" not in df.columns:
        return empty

    if days is not None:
        cutoff = pd.Timestamp.now() - pd.Timedelta(days=days)
        # Rows with an unparsable timestamp (NaT) compare False and are dropped.
        df = df[df["timestamp"] >= cutoff]
        if df.empty:
            return empty

    return (
        df.groupby(column, sort=False)
        .agg(
            nb_selections=(column, "size"),
            dernière_utilisation=("timestamp", "max"),
        )
        .reset_index()
        .sort_values("nb_selections", ascending=False)
        .head(n)
    )


def top_proxies(n: int = 20, days: int | None = None) -> pd.DataFrame:
    """Return the *n* most selected proxies.

    Args:
        n: Maximum number of rows to return.
        days: Optional window; only the last *days* days are considered.

    Returns:
        A frame with columns ``proxy_choisi``, ``nb_selections`` and
        ``dernière_utilisation``.
    """
    return _top_by("proxy_choisi", n, days)


def top_scenarios(n: int = 20, days: int | None = None) -> pd.DataFrame:
    """Return the *n* most frequent scenarios.

    Args:
        n: Maximum number of rows to return.
        days: Optional window; only the last *days* days are considered.

    Returns:
        A frame with columns ``scenario``, ``nb_selections`` and
        ``dernière_utilisation``.
    """
    return _top_by("scenario", n, days)