"""Assemble la logique metier finale de prediction et de recommandation. Le module combine un modele historique `P1` et un modele local `P2/P3` pour produire un rendement ajuste, une explication interpretable et un classement de cultures candidates. """ from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timezone import json from pathlib import Path from typing import Any import joblib import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.linear_model import LinearRegression from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline from sklearn.preprocessing import OneHotEncoder from scripts.runtime_model_specs import ( HISTORICAL_RUNTIME_MODEL_SPEC, SIMULATION_RUNTIME_MODEL_SPEC, ) from scripts.simulation_dataset import load_normalized_simulation_dataset PROJECT_ROOT = Path(__file__).resolve().parents[1] HISTORICAL_WIDE_DATASET_PATH = PROJECT_ROOT / "artifacts/experiments/experience_1/dataset_consolide_historique_colonnes.csv" HISTORICAL_MODEL_PATH = HISTORICAL_RUNTIME_MODEL_SPEC.output_model_path HISTORICAL_METADATA_PATH = HISTORICAL_RUNTIME_MODEL_SPEC.output_metadata_path SIMULATION_DATASET_PATH = PROJECT_ROOT / "data/simulation/crop_yield.csv" SIMULATION_MODEL_PATH = SIMULATION_RUNTIME_MODEL_SPEC.output_model_path SIMULATION_METADATA_PATH = SIMULATION_RUNTIME_MODEL_SPEC.output_metadata_path SEED = 42 SIMULATION_SAMPLE_SIZE = 200_000 SIMULATION_FEATURE_COLUMNS = [ "region", "soil_type", "rainfall_mm", "temperature_celsius", "fertilizer_used", "irrigation_used", "weather_condition", "days_to_harvest", ] @dataclass class LoadedModel: """Couple simple contenant un pipeline charge et ses metadonnees.""" pipeline: Pipeline metadata: dict[str, Any] def _resolve_path(path: str | Path) -> Path: """Resout un chemin absolu ou relatif par rapport au depot.""" raw_path = Path(path) if raw_path.is_absolute(): return raw_path return PROJECT_ROOT / raw_path def _ensure_parent_dir(path: Path) -> None: """Cree le dossier parent d'un artefact si necessaire.""" path.parent.mkdir(parents=True, exist_ok=True) def _json_ready(value: Any) -> Any: """Convertit les types numpy et pandas en types JSON-compatibles.""" if isinstance(value, (np.floating, np.integer)): return value.item() if isinstance(value, np.ndarray): return value.tolist() if isinstance(value, pd.Timestamp): return value.isoformat() if isinstance(value, dict): return {str(key): _json_ready(item) for key, item in value.items()} if isinstance(value, list): return [_json_ready(item) for item in value] return value def _safe_float(value: Any) -> float: """Convertit de maniere defensive un scalaire potentiel en `float`.""" return float(np.asarray(value).reshape(-1)[0]) def _value_for_display(value: Any) -> Any: """Normalise une valeur pour l'affichage ou la serialisation.""" if pd.isna(value): return None if isinstance(value, (np.floating, np.integer)): return value.item() return value def make_dense_onehot_encoder() -> OneHotEncoder: """Construit un `OneHotEncoder` dense compatible avec plusieurs versions sklearn.""" try: return OneHotEncoder(handle_unknown="ignore", sparse_output=False) except TypeError: return OneHotEncoder(handle_unknown="ignore", sparse=False) def build_preprocessor(feature_frame: pd.DataFrame) -> ColumnTransformer: """Construit le preprocesseur commun aux modeles tabulaires. Args: feature_frame: Table de caracteristiques de reference. Returns: ColumnTransformer: Pipeline de pretraitement numerique et categoriel. """ numeric_features = feature_frame.select_dtypes(include=np.number).columns.tolist() categorical_features = [col for col in feature_frame.columns if col not in numeric_features] return ColumnTransformer( transformers=[ ( "num", Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ] ), numeric_features, ), ( "cat", Pipeline( steps=[ ("imputer", SimpleImputer(strategy="most_frequent")), ("encoder", make_dense_onehot_encoder()), ] ), categorical_features, ), ] ) def compute_regression_metrics(y_true: pd.Series | np.ndarray, y_pred: np.ndarray) -> dict[str, float]: """Calcule les metriques de regression standard du projet.""" y_true_array = np.asarray(y_true, dtype=float) y_pred_array = np.asarray(y_pred, dtype=float) rmse = float(np.sqrt(mean_squared_error(y_true_array, y_pred_array))) mae = float(mean_absolute_error(y_true_array, y_pred_array)) r2 = float(r2_score(y_true_array, y_pred_array)) if len(y_true_array) >= 2 else np.nan return { "rmse": rmse, "mae": mae, "r2": r2, } def normalize_label(value: Any) -> str: """Normalise une etiquette textuelle issue des datasets ou de l'API.""" return str(value).strip() def load_historical_wide_dataset(dataset_path: str | Path = HISTORICAL_WIDE_DATASET_PATH) -> pd.DataFrame: """Charge le dataset historique consolide utilise par la brique P1.""" path = _resolve_path(dataset_path) historical_df = pd.read_csv(path) historical_df["area"] = historical_df["area"].map(normalize_label) historical_df["crop"] = historical_df["crop"].map(normalize_label) return historical_df def load_historical_model( model_path: str | Path = HISTORICAL_MODEL_PATH, metadata_path: str | Path = HISTORICAL_METADATA_PATH, ) -> LoadedModel: """Charge le pipeline historique et ses metadonnees.""" resolved_model_path = _resolve_path(model_path) resolved_metadata_path = _resolve_path(metadata_path) if not resolved_model_path.exists(): raise FileNotFoundError(f"Historical model artifact not found: {resolved_model_path}") if not resolved_metadata_path.exists(): raise FileNotFoundError(f"Historical metadata artifact not found: {resolved_metadata_path}") pipeline = joblib.load(resolved_model_path) metadata = json.loads(resolved_metadata_path.read_text(encoding="utf-8")) return LoadedModel(pipeline=pipeline, metadata=metadata) def load_and_prepare_simulation_dataset( simulation_path: str | Path = SIMULATION_DATASET_PATH, ) -> pd.DataFrame: """Charge et normalise le dataset de simulation locale.""" return load_normalized_simulation_dataset( _resolve_path(simulation_path), boolean_dtype="bool", ) def _fit_simulation_pipeline( simulation_df: pd.DataFrame, feature_columns: list[str] | None = None, sample_size: int = SIMULATION_SAMPLE_SIZE, ) -> dict[str, Any]: """Entraine le modele lineaire local utilise pour P2 et P3.""" selected_features = feature_columns or SIMULATION_FEATURE_COLUMNS sampled_df = simulation_df.sample(n=min(sample_size, len(simulation_df)), random_state=SEED).copy() X_all = sampled_df[selected_features].copy() y_all = sampled_df["yield_tons_per_hectare"].copy() X_train, X_test, y_train, y_test = train_test_split( X_all, y_all, test_size=0.2, random_state=SEED, ) pipeline = Pipeline( steps=[ ("preprocessor", build_preprocessor(X_train)), ("regressor", LinearRegression()), ] ) pipeline.fit(X_train, y_train) train_metrics = compute_regression_metrics(y_train, pipeline.predict(X_train)) test_metrics = compute_regression_metrics(y_test, pipeline.predict(X_test)) final_pipeline = Pipeline( steps=[ ("preprocessor", build_preprocessor(X_all)), ("regressor", LinearRegression()), ] ) final_pipeline.fit(X_all, y_all) metadata = { "model_name": "linear_regression", "trained_at_utc": datetime.now(timezone.utc).isoformat(), "dataset_source": str(_resolve_path(SIMULATION_DATASET_PATH).relative_to(PROJECT_ROOT)), "feature_columns": selected_features, "sample_size": int(len(sampled_df)), "metrics": { "train_rmse": train_metrics["rmse"], "train_mae": train_metrics["mae"], "train_r2": train_metrics["r2"], "test_rmse": test_metrics["rmse"], "test_mae": test_metrics["mae"], "test_r2": test_metrics["r2"], }, "strategy": "2_models_3_predictions_combined", "role": "local_adjustment_model_for_P2_and_P3", } return { "pipeline": final_pipeline, "metadata": metadata, "sampled_df": sampled_df, } def load_or_train_simulation_model( *, force_retrain: bool = False, save_artifact: bool = True, simulation_path: str | Path = SIMULATION_DATASET_PATH, model_path: str | Path = SIMULATION_MODEL_PATH, metadata_path: str | Path = SIMULATION_METADATA_PATH, sample_size: int = SIMULATION_SAMPLE_SIZE, ) -> tuple[LoadedModel, pd.DataFrame]: """Charge ou regenere le modele local de simulation. Args: force_retrain: Force le reentrainement meme si les artefacts existent. save_artifact: Ecrit les artefacts sur disque si `True`. simulation_path: Source tabulaire du modele local. model_path: Chemin cible du pipeline serialize. metadata_path: Chemin cible des metadonnees JSON. sample_size: Taille maximale de l'echantillon d'entrainement. Returns: tuple[LoadedModel, pd.DataFrame]: Modele local et dataset normalise. """ resolved_model_path = _resolve_path(model_path) resolved_metadata_path = _resolve_path(metadata_path) simulation_df = load_and_prepare_simulation_dataset(simulation_path) if not force_retrain and resolved_model_path.exists() and resolved_metadata_path.exists(): loaded = LoadedModel( pipeline=joblib.load(resolved_model_path), metadata=json.loads(resolved_metadata_path.read_text(encoding="utf-8")), ) return loaded, simulation_df trained = _fit_simulation_pipeline(simulation_df, sample_size=sample_size) loaded = LoadedModel(pipeline=trained["pipeline"], metadata=trained["metadata"]) if save_artifact: _ensure_parent_dir(resolved_model_path) joblib.dump(loaded.pipeline, resolved_model_path) resolved_metadata_path.write_text( json.dumps(_json_ready(loaded.metadata), indent=2, ensure_ascii=True), encoding="utf-8", ) return loaded, simulation_df def infer_target_year_from_metadata_or_dataset( historical_metadata: dict[str, Any], historical_df: pd.DataFrame, ) -> int: """Determine l'annee cible du modele historique.""" target_year = historical_metadata.get("target_year") if target_year is not None: return int(target_year) target_columns = [col for col in historical_df.columns if col.startswith("target_yield_t_ha_")] available_years = sorted( int(col.rsplit("_", 1)[1]) for col in target_columns if historical_df[col].notna().any() ) if not available_years: raise ValueError("No usable historical target year found.") return int(max(available_years)) def latest_available_from_row(row: pd.Series, prefix: str, years: list[int]) -> tuple[float, int | None]: """Recupere la derniere valeur non nulle disponible pour une serie annuelle.""" for year in sorted(years, reverse=True): value = row.get(f"{prefix}_{year}", np.nan) if pd.notna(value): return float(value), year return np.nan, None def build_historical_reference_frame( historical_df: pd.DataFrame, *, target_year: int, ) -> pd.DataFrame: """Construit les reperes climatiques utilises comme reference locale.""" feature_years = [year for year in range(target_year) if year >= 0] rainfall_years = [ year for year in feature_years if f"average_rain_fall_mm_per_year_{year}" in historical_df.columns ] temperature_years = [year for year in feature_years if f"avg_temp_{year}" in historical_df.columns] reference_df = historical_df[["area", "crop"]].copy() reference_df[["reference_rainfall_mm", "reference_rainfall_year"]] = historical_df.apply( lambda row: pd.Series( latest_available_from_row(row, "average_rain_fall_mm_per_year", rainfall_years) ), axis=1, ) reference_df[["reference_temperature_celsius", "reference_temperature_year"]] = historical_df.apply( lambda row: pd.Series(latest_available_from_row(row, "avg_temp", temperature_years)), axis=1, ) crop_fallback_df = reference_df.groupby("crop").agg( crop_reference_rainfall_mm=("reference_rainfall_mm", "median"), crop_reference_temperature_celsius=("reference_temperature_celsius", "median"), ).reset_index() return reference_df.merge(crop_fallback_df, on="crop", how="left") def build_simulation_global_reference(simulation_df: pd.DataFrame) -> dict[str, Any]: """Construit un profil global median/modal pour le modele de simulation.""" return { "region": simulation_df["region"].mode().iloc[0], "soil_type": simulation_df["soil_type"].mode().iloc[0], "rainfall_mm": float(simulation_df["rainfall_mm"].median()), "temperature_celsius": float(simulation_df["temperature_celsius"].median()), "fertilizer_used": bool(simulation_df["fertilizer_used"].mode().iloc[0]), "irrigation_used": bool(simulation_df["irrigation_used"].mode().iloc[0]), "weather_condition": simulation_df["weather_condition"].mode().iloc[0], "days_to_harvest": float(simulation_df["days_to_harvest"].median()), } def build_reference_profile_from_row( row: pd.Series, *, simulation_global_reference: dict[str, Any], selected_simulation_features: list[str], overrides: dict[str, Any] | None = None, ) -> tuple[pd.DataFrame, dict[str, str]]: """Construit le profil de reference local pour un couple pays/culture. Returns: tuple[pd.DataFrame, dict[str, str]]: Profil pret pour l'inference et informations de provenance des references pluie/temperature. """ rainfall_source = ( "row_latest_history" if pd.notna(row["reference_rainfall_mm"]) else "crop_median" if pd.notna(row["crop_reference_rainfall_mm"]) else "simulation_global_default" ) temperature_source = ( "row_latest_history" if pd.notna(row["reference_temperature_celsius"]) else "crop_median" if pd.notna(row["crop_reference_temperature_celsius"]) else "simulation_global_default" ) profile = dict(simulation_global_reference) profile["rainfall_mm"] = ( float(row["reference_rainfall_mm"]) if pd.notna(row["reference_rainfall_mm"]) else float(row["crop_reference_rainfall_mm"]) if pd.notna(row["crop_reference_rainfall_mm"]) else float(simulation_global_reference["rainfall_mm"]) ) profile["temperature_celsius"] = ( float(row["reference_temperature_celsius"]) if pd.notna(row["reference_temperature_celsius"]) else float(row["crop_reference_temperature_celsius"]) if pd.notna(row["crop_reference_temperature_celsius"]) else float(simulation_global_reference["temperature_celsius"]) ) if overrides: profile.update(overrides) profile_df = pd.DataFrame([profile])[selected_simulation_features] return profile_df, { "rainfall_reference_source": rainfall_source, "temperature_reference_source": temperature_source, } class AdjustedYieldService: """Service metier principal expose a l'API et a l'interface Streamlit.""" def __init__( self, *, historical_dataset_path: str | Path = HISTORICAL_WIDE_DATASET_PATH, historical_model_path: str | Path = HISTORICAL_MODEL_PATH, historical_metadata_path: str | Path = HISTORICAL_METADATA_PATH, simulation_dataset_path: str | Path = SIMULATION_DATASET_PATH, simulation_model_path: str | Path = SIMULATION_MODEL_PATH, simulation_metadata_path: str | Path = SIMULATION_METADATA_PATH, force_retrain_simulation: bool = False, ) -> None: """Initialise les modeles, datasets et catalogues utiles au runtime.""" self.context = _load_prediction_context( historical_dataset_path=historical_dataset_path, historical_model_path=historical_model_path, historical_metadata_path=historical_metadata_path, simulation_dataset_path=simulation_dataset_path, simulation_model_path=simulation_model_path, simulation_metadata_path=simulation_metadata_path, force_retrain_simulation=force_retrain_simulation, ) self.historical_model = self.context["historical_model"] self.historical_metadata = self.context["historical_metadata"] self.historical_df = self.context["historical_df"] self.simulation_model = self.context["simulation_model"] self.simulation_metadata = self.context["simulation_metadata"] self.simulation_df = self.context["simulation_df"] self.simulation_global_reference = self.context["simulation_global_reference"] self.strategy_df = self.context["strategy_df"] self.target_year = int(self.context["target_year"]) self.selected_simulation_features = list(self.simulation_metadata["feature_columns"]) self.available_areas = sorted(self.strategy_df["area"].dropna().unique().tolist()) self.available_crops = sorted(self.strategy_df["crop"].dropna().unique().tolist()) self.crops_by_area = { area: sorted(area_df["crop"].dropna().unique().tolist()) for area, area_df in self.strategy_df.groupby("area") } self.simulation_options = { "regions": sorted(self.simulation_df["region"].dropna().unique().tolist()), "soil_types": sorted(self.simulation_df["soil_type"].dropna().unique().tolist()), "weather_conditions": sorted(self.simulation_df["weather_condition"].dropna().unique().tolist()), } self._historical_shap_state: dict[str, Any] | None = None def _sanitize_overrides(self, overrides: dict[str, Any] | None = None) -> dict[str, Any]: """Nettoie les surcharges de conditions avant utilisation.""" if not overrides: return {} cleaned: dict[str, Any] = {} for key, value in overrides.items(): if value is None: continue if key in {"region", "soil_type", "weather_condition"}: cleaned[key] = normalize_label(value) continue if key in {"fertilizer_used", "irrigation_used"}: cleaned[key] = bool(value) continue cleaned[key] = float(value) if key in {"rainfall_mm", "temperature_celsius", "days_to_harvest"} else value return cleaned def _get_row(self, area: str, crop: str) -> pd.Series: """Recupere la ligne historique unique correspondant au couple demande.""" return _get_area_crop_row(self.strategy_df, area=area, crop=crop) def _predict_p1(self, row: pd.Series) -> float: """Calcule la prediction historique P1 a partir d'une ligne consolidee.""" return _predict_p1_from_row(row, self.historical_model, self.historical_metadata) def _map_transformed_feature_to_raw_feature( self, transformed_feature_name: str, raw_feature_names: list[str], ) -> str: """Ramene un nom de feature transformee vers sa variable brute d'origine.""" candidates = [transformed_feature_name] if "__" in transformed_feature_name: parts = transformed_feature_name.split("__") candidates.extend("__".join(parts[index:]) for index in range(1, len(parts))) for candidate in candidates: for raw_feature in sorted(raw_feature_names, key=len, reverse=True): if candidate == raw_feature or candidate.startswith(f"{raw_feature}_"): return raw_feature return transformed_feature_name def _aggregate_transformed_contributions( self, *, transformed_feature_names: list[str], contribution_values: np.ndarray, raw_feature_names: list[str], ) -> dict[str, float]: """Agrege les contributions encodees par modalite au niveau variable brute.""" aggregated: dict[str, float] = {} for transformed_feature_name, contribution_value in zip(transformed_feature_names, contribution_values): raw_feature_name = self._map_transformed_feature_to_raw_feature( transformed_feature_name, raw_feature_names, ) aggregated[raw_feature_name] = aggregated.get(raw_feature_name, 0.0) + float(contribution_value) return aggregated def _ensure_historical_shap_state(self) -> dict[str, Any]: """Initialise a la demande l'etat SHAP du modele historique.""" if self._historical_shap_state is not None: return self._historical_shap_state try: import shap # type: ignore except ModuleNotFoundError: self._historical_shap_state = { "available": False, "status": "missing_dependency", "message": "Le package shap n'est pas installe dans l'environnement courant.", } return self._historical_shap_state preprocessor = self.historical_model.named_steps["preprocessor"] regressor = self.historical_model.named_steps["regressor"] raw_feature_names = list(self.historical_metadata["feature_columns"]) background_df = self.historical_df[raw_feature_names].sample( n=min(200, len(self.historical_df)), random_state=SEED, ) background_matrix = preprocessor.transform(background_df) transformed_feature_names = list(preprocessor.get_feature_names_out()) try: explainer = shap.Explainer( regressor, background_matrix, feature_names=transformed_feature_names, ) except Exception as exc: # pragma: no cover - defensive fallback self._historical_shap_state = { "available": False, "status": "explainer_initialization_failed", "message": f"Impossible d'initialiser SHAP sur le modele historique : {exc}", } return self._historical_shap_state self._historical_shap_state = { "available": True, "status": "ok", "message": None, "explainer": explainer, "preprocessor": preprocessor, "transformed_feature_names": transformed_feature_names, "raw_feature_names": raw_feature_names, } return self._historical_shap_state def _explain_historical_prediction( self, *, row: pd.Series, p1_prediction: float, top_n: int = 10, ) -> dict[str, Any]: """Produit l'explication SHAP agregee de la prediction P1.""" shap_state = self._ensure_historical_shap_state() if not shap_state["available"]: return { "available": False, "status": shap_state["status"], "message": shap_state["message"], "model_prediction": p1_prediction, "base_value": None, "prediction_from_shap": None, "top_contributions": [], } raw_feature_names = shap_state["raw_feature_names"] feature_frame = pd.DataFrame([row[raw_feature_names].to_dict()])[raw_feature_names] transformed_row = shap_state["preprocessor"].transform(feature_frame) shap_values = shap_state["explainer"](transformed_row) contribution_vector = np.asarray(shap_values.values)[0] aggregated_contributions = self._aggregate_transformed_contributions( transformed_feature_names=shap_state["transformed_feature_names"], contribution_values=contribution_vector, raw_feature_names=raw_feature_names, ) top_contributions = [ { "feature": raw_feature_name, "raw_value": _value_for_display(row.get(raw_feature_name)), "contribution": float(contribution), "abs_contribution": abs(float(contribution)), } for raw_feature_name, contribution in sorted( aggregated_contributions.items(), key=lambda item: abs(item[1]), reverse=True, )[:top_n] ] base_value = _safe_float(shap_values.base_values) prediction_from_shap = float(base_value + contribution_vector.sum()) return { "available": True, "status": "ok", "message": None, "model_prediction": p1_prediction, "base_value": base_value, "prediction_from_shap": prediction_from_shap, "top_contributions": top_contributions, } def _explain_local_adjustment( self, *, reference_profile: pd.DataFrame, user_profile: pd.DataFrame, p2_prediction: float, p3_prediction: float, top_n: int = 10, ) -> dict[str, Any]: """Decompose lineairement l'ajustement local applique entre P2 et P3.""" preprocessor = self.simulation_model.named_steps["preprocessor"] regressor = self.simulation_model.named_steps["regressor"] transformed_feature_names = list(preprocessor.get_feature_names_out()) reference_vector = np.asarray(preprocessor.transform(reference_profile))[0] user_vector = np.asarray(preprocessor.transform(user_profile))[0] delta_vector = user_vector - reference_vector coefficient_vector = np.asarray(regressor.coef_).reshape(-1) contribution_vector = delta_vector * coefficient_vector aggregated_contributions = self._aggregate_transformed_contributions( transformed_feature_names=transformed_feature_names, contribution_values=contribution_vector, raw_feature_names=self.selected_simulation_features, ) reference_row = reference_profile.iloc[0].to_dict() user_row = user_profile.iloc[0].to_dict() top_contributions = [ { "feature": raw_feature_name, "reference_value": _value_for_display(reference_row.get(raw_feature_name)), "user_value": _value_for_display(user_row.get(raw_feature_name)), "contribution_delta": float(contribution), "abs_contribution_delta": abs(float(contribution)), } for raw_feature_name, contribution in sorted( aggregated_contributions.items(), key=lambda item: abs(item[1]), reverse=True, )[:top_n] ] return { "method": "exact_linear_delta_decomposition", "reference_prediction": p2_prediction, "user_prediction": p3_prediction, "total_adjustment": float(p3_prediction - p2_prediction), "top_contributions": top_contributions, } def get_reference_profile( self, area: str, crop: str, *, reference_overrides: dict[str, Any] | None = None, ) -> dict[str, Any]: """Retourne le profil de reference local pour un pays et une culture.""" row = self._get_row(area, crop) normalized_reference_overrides = self._sanitize_overrides(reference_overrides) reference_profile, reference_sources = build_reference_profile_from_row( row, simulation_global_reference=self.simulation_global_reference, selected_simulation_features=self.selected_simulation_features, overrides=normalized_reference_overrides, ) return { "country": normalize_label(area), "crop": normalize_label(crop), "reference_profile": reference_profile.iloc[0].to_dict(), **reference_sources, } def get_baseline( self, area: str, crop: str, *, reference_overrides: dict[str, Any] | None = None, ) -> dict[str, Any]: """Retourne la prediction historique de base et son profil de reference.""" row = self._get_row(area, crop) reference_payload = self.get_reference_profile( area, crop, reference_overrides=reference_overrides, ) p1 = self._predict_p1(row) return { "country": normalize_label(area), "crop": normalize_label(crop), "target_year": self.target_year, "p1_historical_prediction": p1, "reference_profile": reference_payload["reference_profile"], "rainfall_reference_source": reference_payload["rainfall_reference_source"], "temperature_reference_source": reference_payload["temperature_reference_source"], } def predict_adjusted_yield( self, area: str, crop: str, user_conditions: dict[str, Any], *, reference_overrides: dict[str, Any] | None = None, ) -> dict[str, Any]: """Calcule le rendement final ajuste pour une culture donnee. Args: area: Pays ou zone retenue. crop: Culture cible. user_conditions: Conditions locales saisies par l'utilisateur. reference_overrides: Surcharges appliquees au profil de reference. Returns: dict[str, Any]: Detail complet des composantes P1, P2, P3 et des explications associees. """ row = self._get_row(area, crop) normalized_reference_overrides = self._sanitize_overrides(reference_overrides) normalized_user_conditions = self._sanitize_overrides(user_conditions) merged_user_overrides = { **normalized_reference_overrides, **normalized_user_conditions, } reference_profile, reference_sources = build_reference_profile_from_row( row, simulation_global_reference=self.simulation_global_reference, selected_simulation_features=self.selected_simulation_features, overrides=normalized_reference_overrides, ) user_profile, _ = build_reference_profile_from_row( row, simulation_global_reference=self.simulation_global_reference, selected_simulation_features=self.selected_simulation_features, overrides=merged_user_overrides, ) p1 = self._predict_p1(row) p2 = float(self.simulation_model.predict(reference_profile)[0]) p3 = float(self.simulation_model.predict(user_profile)[0]) local_adjustment = float(p3 - p2) final_prediction = float(max(p1 + local_adjustment, 0.0)) gap_vs_historical_pct = float(local_adjustment / p1 * 100.0) if p1 != 0 else 0.0 explanation = { "historical_shap": self._explain_historical_prediction( row=row, p1_prediction=p1, ), "local_adjustment": self._explain_local_adjustment( reference_profile=reference_profile, user_profile=user_profile, p2_prediction=p2, p3_prediction=p3, ), } return { "country": normalize_label(area), "crop": normalize_label(crop), "p1_historical_prediction": p1, "p2_reference_simulation": p2, "p3_user_simulation": p3, "local_adjustment": local_adjustment, "gap_vs_historical_pct": gap_vs_historical_pct, "final_prediction": final_prediction, "reference_profile": reference_profile.iloc[0].to_dict(), "user_profile": user_profile.iloc[0].to_dict(), "explanation": explanation, **reference_sources, } def recommend_crops( self, area: str, user_conditions: dict[str, Any], candidate_crops: list[str] | None = None, *, reference_overrides: dict[str, Any] | None = None, ) -> pd.DataFrame: """Classe les cultures candidates pour un pays et des conditions locales.""" normalized_area = normalize_label(area) area_rows = self.strategy_df.loc[self.strategy_df["area"] == normalized_area].copy() if area_rows.empty: raise ValueError(f"No historical rows found for area={normalized_area!r}.") if candidate_crops: normalized_candidates = {normalize_label(crop) for crop in candidate_crops if normalize_label(crop)} area_rows = area_rows.loc[area_rows["crop"].isin(normalized_candidates)].copy() if area_rows.empty: raise ValueError(f"No matching crop found for area={normalized_area!r} and provided candidates.") normalized_reference_overrides = self._sanitize_overrides(reference_overrides) normalized_user_conditions = self._sanitize_overrides(user_conditions) merged_user_overrides = { **normalized_reference_overrides, **normalized_user_conditions, } recommendation_rows = [] for _, row in area_rows.sort_values("crop").iterrows(): reference_profile, reference_sources = build_reference_profile_from_row( row, simulation_global_reference=self.simulation_global_reference, selected_simulation_features=self.selected_simulation_features, overrides=normalized_reference_overrides, ) user_profile, _ = build_reference_profile_from_row( row, simulation_global_reference=self.simulation_global_reference, selected_simulation_features=self.selected_simulation_features, overrides=merged_user_overrides, ) p1 = self._predict_p1(row) p2 = float(self.simulation_model.predict(reference_profile)[0]) p3 = float(self.simulation_model.predict(user_profile)[0]) local_adjustment = float(p3 - p2) final_prediction = float(max(p1 + local_adjustment, 0.0)) gap_vs_historical_pct = float(local_adjustment / p1 * 100.0) if p1 != 0 else 0.0 recommendation_rows.append( { "country": normalized_area, "crop": row["crop"], "p1_historical_prediction": p1, "p2_reference_simulation": p2, "p3_user_simulation": p3, "local_adjustment": local_adjustment, "gap_vs_historical_pct": gap_vs_historical_pct, "final_prediction": final_prediction, "rainfall_reference_source": reference_sources["rainfall_reference_source"], "temperature_reference_source": reference_sources["temperature_reference_source"], } ) recommendation_df = ( pd.DataFrame(recommendation_rows) .sort_values(["final_prediction", "p1_historical_prediction"], ascending=[False, False]) .reset_index(drop=True) ) recommendation_df["recommendation_rank"] = np.arange(1, len(recommendation_df) + 1) ordered_columns = [ "country", "crop", "p1_historical_prediction", "p2_reference_simulation", "p3_user_simulation", "local_adjustment", "gap_vs_historical_pct", "final_prediction", "recommendation_rank", "rainfall_reference_source", "temperature_reference_source", ] return recommendation_df[ordered_columns] def _load_prediction_context( *, historical_dataset_path: str | Path = HISTORICAL_WIDE_DATASET_PATH, historical_model_path: str | Path = HISTORICAL_MODEL_PATH, historical_metadata_path: str | Path = HISTORICAL_METADATA_PATH, simulation_dataset_path: str | Path = SIMULATION_DATASET_PATH, simulation_model_path: str | Path = SIMULATION_MODEL_PATH, simulation_metadata_path: str | Path = SIMULATION_METADATA_PATH, force_retrain_simulation: bool = False, ) -> dict[str, Any]: """Charge l'ensemble des briques necessaires au runtime final.""" historical_loaded = load_historical_model( model_path=historical_model_path, metadata_path=historical_metadata_path, ) historical_df = load_historical_wide_dataset(historical_dataset_path) simulation_loaded, simulation_df = load_or_train_simulation_model( force_retrain=force_retrain_simulation, simulation_path=simulation_dataset_path, model_path=simulation_model_path, metadata_path=simulation_metadata_path, ) target_year = infer_target_year_from_metadata_or_dataset(historical_loaded.metadata, historical_df) reference_df = build_historical_reference_frame(historical_df, target_year=target_year) strategy_df = historical_df.merge(reference_df, on=["area", "crop"], how="left") return { "historical_model": historical_loaded.pipeline, "historical_metadata": historical_loaded.metadata, "historical_df": historical_df, "simulation_model": simulation_loaded.pipeline, "simulation_metadata": simulation_loaded.metadata, "simulation_df": simulation_df, "simulation_global_reference": build_simulation_global_reference(simulation_df), "strategy_df": strategy_df, "target_year": target_year, } def _predict_p1_from_row( row: pd.Series, historical_model: Pipeline, historical_metadata: dict[str, Any], ) -> float: """Projette une ligne historique consolidee dans le modele P1.""" feature_columns = historical_metadata["feature_columns"] feature_frame = pd.DataFrame([row[feature_columns].to_dict()])[feature_columns] return float(historical_model.predict(feature_frame)[0]) def _get_area_crop_row(strategy_df: pd.DataFrame, area: str, crop: str) -> pd.Series: """Retourne la ligne unique correspondant a un couple pays/culture.""" normalized_area = normalize_label(area) normalized_crop = normalize_label(crop) filtered = strategy_df.loc[ (strategy_df["area"] == normalized_area) & (strategy_df["crop"] == normalized_crop) ].copy() if filtered.empty: raise ValueError(f"No historical row found for area={normalized_area!r}, crop={normalized_crop!r}.") if len(filtered) > 1: raise ValueError(f"Multiple historical rows found for area={normalized_area!r}, crop={normalized_crop!r}.") return filtered.iloc[0] def predict_adjusted_yield( *, area: str, crop: str, user_conditions: dict[str, Any], historical_dataset_path: str | Path = HISTORICAL_WIDE_DATASET_PATH, historical_model_path: str | Path = HISTORICAL_MODEL_PATH, historical_metadata_path: str | Path = HISTORICAL_METADATA_PATH, simulation_dataset_path: str | Path = SIMULATION_DATASET_PATH, simulation_model_path: str | Path = SIMULATION_MODEL_PATH, simulation_metadata_path: str | Path = SIMULATION_METADATA_PATH, force_retrain_simulation: bool = False, ) -> dict[str, Any]: """Helper procedural pour calculer un rendement ajuste sans gerer le service.""" service = AdjustedYieldService( historical_dataset_path=historical_dataset_path, historical_model_path=historical_model_path, historical_metadata_path=historical_metadata_path, simulation_dataset_path=simulation_dataset_path, simulation_model_path=simulation_model_path, simulation_metadata_path=simulation_metadata_path, force_retrain_simulation=force_retrain_simulation, ) return service.predict_adjusted_yield(area=area, crop=crop, user_conditions=user_conditions) def recommend_crops( *, area: str, user_conditions: dict[str, Any], historical_dataset_path: str | Path = HISTORICAL_WIDE_DATASET_PATH, historical_model_path: str | Path = HISTORICAL_MODEL_PATH, historical_metadata_path: str | Path = HISTORICAL_METADATA_PATH, simulation_dataset_path: str | Path = SIMULATION_DATASET_PATH, simulation_model_path: str | Path = SIMULATION_MODEL_PATH, simulation_metadata_path: str | Path = SIMULATION_METADATA_PATH, force_retrain_simulation: bool = False, ) -> pd.DataFrame: """Helper procedural pour classer les cultures candidates.""" service = AdjustedYieldService( historical_dataset_path=historical_dataset_path, historical_model_path=historical_model_path, historical_metadata_path=historical_metadata_path, simulation_dataset_path=simulation_dataset_path, simulation_model_path=simulation_model_path, simulation_metadata_path=simulation_metadata_path, force_retrain_simulation=force_retrain_simulation, ) return service.recommend_crops(area=area, user_conditions=user_conditions)