import numpy as np
import polars as pl
import streamlit as st
from scipy.spatial import cKDTree

from app.utils.config_utils import menu_config_statisticals


def get_column_load(stat: str, scale: str):
    """Return the columns to read from the parquet files and the column holding the requested statistic."""
    if stat == "mean":
        col = "mean_mm_h"
    elif stat in ("max", "mean-max"):
        col = f"max_{scale}"
    elif stat == "month":
        col = f"max_date_{scale}"
    elif stat == "numday":
        col = "n_days_gt1mm"
    else:
        raise ValueError(f"Stat '{stat}' is not recognized")
    return ["NUM_POSTE", col], col


def load_season(year: int, season_key: str, base_path: str, col_to_load: list[str]) -> pl.DataFrame:
    """Read one seasonal parquet file for a given year, keeping only the requested columns."""
    filename = f"{base_path}/{year:04d}/{season_key}.parquet"
    return pl.read_parquet(filename, columns=col_to_load)


def load_data(
    type_data: str,
    echelle: str,
    min_year: int,
    max_year: int,
    season: str,
    col_to_load: list[str],
    config,
) -> pl.DataFrame:
    """Load and concatenate the seasonal statistics for every year in [min_year, max_year]."""
    _, SEASON, _ = menu_config_statisticals()
    if season not in SEASON.values():
        raise ValueError(f"Unknown season: {season}")

    base_path = f'{config["statisticals"][type_data]}/{echelle}'
    dataframes = []
    errors = []

    for year in range(min_year, max_year + 1):
        try:
            df = load_season(year, season, base_path, col_to_load)

            # Explicitly convert the date columns, only if they are present
            for col in ["max_date_mm_h", "max_date_mm_j"]:
                if col in df.columns:
                    df = df.with_columns(
                        pl.col(col)
                        .cast(pl.Utf8)  # make sure the column can be parsed with str.strptime
                        .str.strptime(pl.Datetime, format="%Y-%m-%d", strict=False)
                        .cast(pl.Utf8)  # back to string (as in the previous Pandas code)
                    )

            # Add the year column
            df = df.with_columns(pl.lit(year).alias("year"))
            dataframes.append(df)
        except Exception as e:
            errors.append(f"{year} ({season}): {e}")

    if errors:
        for err in errors:
            st.warning(f"Error: {err}")

    if not dataframes:
        raise ValueError("No data loaded.")

    return pl.concat(dataframes, how="vertical")


def cleaning_data_observed(
    df: pl.DataFrame,
    len_serie: float = None,
    nan_limit: float = 0.10
) -> pl.DataFrame:
    """
    Filter the maxima with two criteria:
    1) drop the values of a station-year when its nan_ratio exceeds nan_limit;
    2) keep only the stations with at least `len_serie` valid years.
    """
    if len_serie is None:
        raise ValueError("The len_serie parameter must be provided")

    # Keep the seasons with at most nan_limit missing data
    df_filter = df.filter(pl.col("nan_ratio") <= nan_limit)

    # Number of valid years per NUM_POSTE station
    station_counts = (
        df_filter.group_by("NUM_POSTE")
        .agg(pl.col("year").n_unique().alias("num_years"))
    )

    # Keep the NUM_POSTE with at least len_serie valid years
    valid_stations = station_counts.filter(pl.col("num_years") >= len_serie)

    # Keep only the rows belonging to valid stations
    df_final = df_filter.filter(
        pl.col("NUM_POSTE").is_in(valid_stations["NUM_POSTE"])
    )

    return df_final


def dont_show_extreme(
    modelised: pl.DataFrame,
    observed: pl.DataFrame,
    column: str,
    quantile_choice: float,
    stat_choice_key: str = None
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Clamp values beyond the chosen quantile so that extremes do not saturate the color scale."""
    if stat_choice_key not in ("month", "date"):
        # 1) Compute the quantiles
        q_mod = modelised.select(
            pl.col(column).quantile(quantile_choice, interpolation="nearest")
        ).item()

        if observed is None or observed.height == 0:
            seuil = q_mod
        else:
            q_obs = observed.select(
                pl.col(column).quantile(quantile_choice, interpolation="nearest")
            ).item()
            seuil = max(q_mod, q_obs)

        # 2) Saturate the color scale: clamp |value| to the threshold, keeping the sign
        clamp_expr = (
            pl.when(pl.col(column).abs() > seuil)
            .then(pl.lit(seuil) * pl.col(column).sign())
            .otherwise(pl.col(column))
            .alias(column)
        )

        # 3) Return the clamped frames (observed may be None or empty)
        modelised_show = modelised.with_columns(clamp_expr)
        observed_show = (
            observed.with_columns(clamp_expr) if observed is not None else observed
        )
    else:
        modelised_show, observed_show = modelised, observed

    return modelised_show, observed_show


def add_metadata(df: pl.DataFrame, scale: str, type: str) -> pl.DataFrame:
    """Join the station metadata (lat, lon, altitude) onto the statistics frame."""
    echelle = "horaire" if scale == "mm_h" else "quotidien"

    # Load the metadata with Polars
    df_meta = pl.read_csv(f"data/metadonnees/{type}/postes_{echelle}.csv")

    # Harmonise the column types on both sides before the join
    df_meta = df_meta.with_columns([
        pl.col("NUM_POSTE").cast(pl.Int32),
        pl.col("lat").cast(pl.Float32),
        pl.col("lon").cast(pl.Float32),
        pl.col("altitude").cast(pl.Int32),  # altitude as integer
    ])
    df = df.with_columns([
        pl.col("NUM_POSTE").cast(pl.Int32)  # same cast on the statistics side
    ])

    # Join on NUM_POSTE
    return df.join(df_meta, on=["NUM_POSTE"], how="left")


def find_matching_point(df_model: pl.DataFrame, lat_obs: float, lon_obs: float):
    """Return the (lat, lon) of the model grid point closest to an observed station."""
    df_model = df_model.with_columns([
        ((pl.col("lat") - lat_obs) ** 2 + (pl.col("lon") - lon_obs) ** 2)
        .sqrt()
        .alias("dist")
    ])
    closest_row = (
        df_model.filter(pl.col("dist") == pl.col("dist").min())
        .select(["lat", "lon"])
        .row(0)
    )
    return closest_row  # (lat, lon)


def match_and_compare(
    obs_df: pl.DataFrame,
    mod_df: pl.DataFrame,
    column_to_show: str,
    obs_vs_mod: pl.DataFrame = None
) -> pl.DataFrame:
    """Build a station/AROME comparison table from an observation-to-model correspondence frame."""
    if obs_vs_mod is None:
        raise ValueError("obs_vs_mod must be provided with NUM_POSTE_obs and NUM_POSTE_mod columns")

    # Keep only the correspondences whose observed station is present in obs_df
    obs_vs_mod = obs_vs_mod.with_columns(
        pl.col("NUM_POSTE_obs").cast(pl.Int32)
    ).filter(
        pl.col("NUM_POSTE_obs").is_in(obs_df["NUM_POSTE"].cast(pl.Int32))
    )

    # Cast and rename temporarily for the joins
    obs = obs_df.with_columns(
        pl.col("NUM_POSTE").cast(pl.Int32)
    ).rename({"NUM_POSTE": "NUM_POSTE_obs"})
    mod = mod_df.with_columns(
        pl.col("NUM_POSTE").cast(pl.Int32)
    ).rename({"NUM_POSTE": "NUM_POSTE_mod"})

    obs_vs_mod = obs_vs_mod.with_columns(
        pl.col("NUM_POSTE_obs").cast(pl.Int32),
        pl.col("NUM_POSTE_mod").cast(pl.Int32)
    )

    # Attach the observed and simulated values according to the correspondences
    matched = (
        obs_vs_mod
        .join(obs.select(["NUM_POSTE_obs", "lat", "lon", column_to_show]), on="NUM_POSTE_obs", how="left")
        .join(mod.select(["NUM_POSTE_mod", column_to_show]), on="NUM_POSTE_mod", how="left", suffix="_mod")
        .rename({column_to_show: "Station", f"{column_to_show}_mod": "AROME"})
    )

    matched = matched.select(
        ["NUM_POSTE_obs", "lat", "lon", "NUM_POSTE_mod", "Station", "AROME"]
    ).drop_nulls()
    return matched


def standardize_year(year: float, min_year: int, max_year: int) -> float:
    """
    Min-max normalise a year `year` to the [0, 1] range.
    """
    return (year - min_year) / (max_year - min_year)


def filter_nan(df: pl.DataFrame, columns: list[str]) -> pl.DataFrame:
    """Drop the rows with nulls in any of the given columns."""
    return df.drop_nulls(subset=columns)
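

if __name__ == "__main__":
    # Minimal, self-contained usage sketch (not part of the Streamlit app):
    # it exercises cleaning_data_observed() and standardize_year() on a toy
    # frame. The station ids, years and nan_ratio values below are made up
    # purely for illustration.
    demo = pl.DataFrame(
        {
            "NUM_POSTE": [1, 1, 1, 2, 2],
            "year": [2000, 2001, 2002, 2000, 2001],
            "nan_ratio": [0.02, 0.05, 0.08, 0.01, 0.50],
            "max_mm_h": [12.3, 8.7, 15.1, 9.9, 11.2],
        }
    )

    # Keep station-years with at most 10 % missing data, then keep only the
    # stations with at least 3 valid years: station 2 is dropped here because
    # only one of its years survives the nan_ratio filter.
    cleaned = cleaning_data_observed(demo, len_serie=3, nan_limit=0.10)
    print(cleaned)

    # Min-max normalisation of a year within the studied period.
    print(standardize_year(2001, min_year=2000, max_year=2002))  # 0.5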