# Source: ExtremePrecipit/app/utils/stats_utils.py
# Contributor (from page header): ncsdecoopman
# Commit 0ab0788: "Déploiement Docker depuis workflow (structure corrigée)"
#   (Docker deployment from workflow, structure fixed)
import streamlit as st
import polars as pl
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
def _date_of_max(
    df: pl.DataFrame, value_col: str, date_col: str, out_col: str
) -> pl.DataFrame:
    """Return, per ``NUM_POSTE``, the ``date_col`` of the row with the largest ``value_col``."""
    return (
        # nulls_last=True keeps rows with a missing maximum from being picked
        # ahead of real values (nulls sort first by default).
        df.sort(value_col, descending=True, nulls_last=True)
        # Polars documents that row order inside each group is preserved,
        # so `first()` is the row with the largest value.
        .group_by("NUM_POSTE")
        .agg(pl.col(date_col).first().alias(out_col))
    )


def _most_frequent_month(
    df: pl.DataFrame, month_col: str, out_col: str
) -> pl.DataFrame:
    """Return, per ``NUM_POSTE``, the most frequent non-null value of ``month_col``."""
    return (
        df.drop_nulls(month_col)
        .group_by(["NUM_POSTE", month_col])
        .len()
        # Highest count first; the month itself breaks ties so the result is
        # deterministic (the smallest month number wins a tie).
        .sort(["NUM_POSTE", "len", month_col], descending=[False, True, False])
        # The default keep="any" gives no guarantee about which row survives;
        # keep="first" + maintain_order=True is what actually selects the
        # top-count row produced by the sort above.
        .unique(subset=["NUM_POSTE"], keep="first", maintain_order=True)
        .select(["NUM_POSTE", month_col])
        .rename({month_col: out_col})
    )


def compute_statistic_per_point(df: pl.DataFrame, stat_key: str) -> pl.DataFrame:
    """Aggregate ``df`` to one row per ``NUM_POSTE`` for the requested statistic.

    Supported ``stat_key`` values:

    * ``"mean"``: mean of ``mean_mm_h`` (``mean_all_mm_h``) and of
      ``mean_mm_h * 24`` (``mean_all_mm_j``). If ``mean_mm_h`` is absent no
      aggregation expression is passed.
    * ``"max"``: maximum of ``max_mm_h`` / ``max_mm_j`` when present
      (``max_all_mm_h`` / ``max_all_mm_j``).
    * ``"mean-max"``: mean of ``max_mm_h`` / ``max_mm_j`` when present
      (``max_mean_mm_h`` / ``max_mean_mm_j``).
    * ``"date"``: the ``max_date_mm_*`` value of the row holding the largest
      ``max_mm_*`` (``date_max_h`` / ``date_max_j``).
    * ``"month"``: most frequent month among the ``max_date_mm_*`` values
      (``mois_pluvieux_h`` / ``mois_pluvieux_j``); a missing side is filled
      with a null ``Int32`` column.
    * ``"numday"``: mean of ``n_days_gt1mm`` (``jours_pluie_moyen``).

    Args:
        df: Input frame; must contain ``NUM_POSTE`` plus the columns needed by
            the chosen statistic.
        stat_key: One of the keys listed above.

    Returns:
        A frame keyed by ``NUM_POSTE`` with the statistic column(s).

    Raises:
        ValueError: If ``stat_key`` is unknown or the columns required by
            ``"date"``, ``"month"`` or ``"numday"`` are missing.
    """
    cols = df.columns

    if stat_key == "mean":
        has_h = "mean_mm_h" in cols
        aggs = []
        if has_h:
            # The "_j" value is derived from the "_h" mean (x24); any existing
            # `mean_mm_j` column is overwritten by this derived one.
            df = df.with_columns((pl.col("mean_mm_h") * 24).alias("mean_mm_j"))
            aggs = [
                pl.col("mean_mm_h").mean().alias("mean_all_mm_h"),
                pl.col("mean_mm_j").mean().alias("mean_all_mm_j"),
            ]
        return df.group_by("NUM_POSTE").agg(aggs)

    if stat_key == "max":
        aggs = []
        if "max_mm_h" in cols:
            aggs.append(pl.col("max_mm_h").max().alias("max_all_mm_h"))
        if "max_mm_j" in cols:
            aggs.append(pl.col("max_mm_j").max().alias("max_all_mm_j"))
        return df.group_by("NUM_POSTE").agg(aggs)

    if stat_key == "mean-max":
        aggs = []
        if "max_mm_h" in cols:
            aggs.append(pl.col("max_mm_h").mean().alias("max_mean_mm_h"))
        if "max_mm_j" in cols:
            aggs.append(pl.col("max_mm_j").mean().alias("max_mean_mm_j"))
        return df.group_by("NUM_POSTE").agg(aggs)

    if stat_key == "date":
        specs = (
            ("max_mm_h", "max_date_mm_h", "date_max_h"),
            ("max_mm_j", "max_date_mm_j", "date_max_j"),
        )
        res = [
            _date_of_max(df, value_col, date_col, out_col)
            for value_col, date_col, out_col in specs
            if value_col in cols and date_col in cols
        ]
        if not res:
            raise ValueError("Aucune date de maximum disponible.")
        if len(res) == 1:
            return res[0]
        # NOTE(review): recent Polars releases deprecate how="outer" in favour
        # of how="full" and may not coalesce the key column; confirm against
        # the pinned Polars version before changing.
        return res[0].join(res[1], on="NUM_POSTE", how="outer")

    if stat_key == "month":
        month_of = {"max_date_mm_h": "mois_max_h", "max_date_mm_j": "mois_max_j"}
        exprs = [
            pl.col(date_col)
            # strict=False: values that do not parse become null instead of
            # raising; they are dropped before counting.
            .str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S%.f", strict=False)
            .dt.month()
            .alias(month_col)
            for date_col, month_col in month_of.items()
            if date_col in cols
        ]
        if not exprs:
            raise ValueError("Aucune date de maximum pour extraire les mois.")
        df = df.with_columns(exprs)

        mois_h = mois_j = None
        if "mois_max_h" in df.columns:
            mois_h = _most_frequent_month(df, "mois_max_h", "mois_pluvieux_h")
        if "mois_max_j" in df.columns:
            mois_j = _most_frequent_month(df, "mois_max_j", "mois_pluvieux_j")

        # At least one month column exists here (otherwise the ValueError
        # above was raised), so only three combinations are possible.
        if mois_h is None:
            return mois_j.with_columns(
                [pl.lit(None, dtype=pl.Int32).alias("mois_pluvieux_h")]
            )
        if mois_j is None:
            return mois_h.with_columns(
                [pl.lit(None, dtype=pl.Int32).alias("mois_pluvieux_j")]
            )
        # NOTE(review): same how="outer" caveat as in the "date" branch.
        return mois_h.join(mois_j, on="NUM_POSTE", how="outer")

    if stat_key == "numday":
        if "n_days_gt1mm" not in df.columns:
            raise ValueError("Colonne `n_days_gt1mm` manquante.")
        return df.group_by("NUM_POSTE").agg(
            pl.col("n_days_gt1mm").mean().alias("jours_pluie_moyen")
        )

    raise ValueError(f"Statistique inconnue : {stat_key}")
def generate_metrics(df: pl.DataFrame, x_label: str = "AROME", y_label: str = "Station"):
    """Compute agreement metrics between two columns of ``df``.

    Rows where either column is NaN are ignored. With ``x`` the ``x_label``
    column and ``y`` the ``y_label`` column, the metrics are:

    * ``me``: mean of ``x - y``;
    * ``mae``: mean absolute error between ``y`` and ``x``;
    * ``rmse``: square root of the mean squared error between ``y`` and ``x``;
    * ``r2_corr``: squared Pearson correlation of ``x`` and ``y`` (NaN when
      fewer than two valid rows remain or the correlation itself is NaN).

    Args:
        df: Frame holding both columns.
        x_label: Name of the first column.
        y_label: Name of the second column.

    Returns:
        The tuple ``(me, mae, rmse, r2_corr)``. All four are NaN when the
        lengths differ or no valid row remains; in those cases a Streamlit
        error/warning is shown as well.
    """
    all_nan = (np.nan, np.nan, np.nan, np.nan)

    x = df[x_label].to_numpy()
    y = df[y_label].to_numpy()

    if len(x) != len(y):
        st.error("Longueur x et y différente")
        return all_nan

    # Keep only the rows where neither column is NaN.
    keep = ~(np.isnan(x) | np.isnan(y))
    x_valid, y_valid = x[keep], y[keep]

    if len(x_valid) == 0:
        st.warning("Aucune donnée valide après suppression des NaN.")
        return all_nan

    rmse = np.sqrt(mean_squared_error(y_valid, x_valid))
    mae = mean_absolute_error(y_valid, x_valid)
    me = np.mean(x_valid - y_valid)

    # A correlation needs at least two points.
    if len(x_valid) > 1:
        corr = np.corrcoef(x_valid, y_valid)[0, 1]
    else:
        corr = np.nan

    if np.isnan(corr):
        r2_corr = np.nan
    else:
        r2_corr = corr**2

    return me, mae, rmse, r2_corr