| import mlflow
|
|
|
| import json
|
| import numpy as np
|
| import pandas as pd
|
|
|
| from sklearn import linear_model
|
| from sklearn.ensemble import RandomForestRegressor
|
| from statsmodels.tsa.api import VAR
|
| from xgboost import XGBRegressor
|
|
|
|
|
|
|
|
|
def evaluate_imputation_mlflow(df_orig, imputation_function, imputation_params, method_name=None, sample_frac=0.3, random_state=42):
    """
    Evaluates an imputation method by masking a random fraction of known
    values, re-imputing them, and measuring the reconstruction error.

    Parameters:
        df_orig (pd.DataFrame): Original hourly DataFrame (DatetimeIndex).
        imputation_function (callable): Called as
            imputation_function(df_masked, **imputation_params); must return
            an imputed DataFrame aligned with df_orig.
        imputation_params (dict): Extra keyword arguments for imputation_function.
        method_name (str or None): If given, parameters and metrics are logged
            to MLflow under a run with this name; if None, no logging happens.
        sample_frac (float): Fraction of rows to mask per column.
        random_state (int): Seed for the random mask (reproducibility).

    Returns:
        pd.DataFrame: Per-column RMSE/NRMSE rows plus 'Temperature',
        'Precipitation' and 'Overall' aggregate rows, rounded to 3 decimals.
    """
    df_orig_data_out = df_orig.copy().asfreq('h').sort_index()

    # Build a reproducible random boolean mask, column by column.
    np.random.seed(random_state)
    mask = pd.DataFrame(False, index=df_orig_data_out.index, columns=df_orig_data_out.columns)
    n_samples = int(sample_frac * len(df_orig_data_out))
    for col in df_orig_data_out.columns:
        mask_indices = np.random.choice(df_orig_data_out.index, size=n_samples, replace=False)
        mask.loc[mask_indices, col] = True

    # Hide the masked values and let the candidate method re-impute them.
    df_masked = df_orig_data_out.mask(mask)
    df_imputed = imputation_function(df_masked, **imputation_params)

    # RMSE restricted to the artificially masked positions.
    rmse_values = ((df_imputed[mask] - df_orig_data_out[mask])**2).mean().pow(0.5)
    rmse_df = pd.DataFrame(rmse_values).rename(columns={0: 'RMSE'}).sort_index()

    # NRMSE (%) normalized by each column's observed range (vectorized;
    # index alignment makes the per-column loop unnecessary).
    col_range = df_orig_data_out.max() - df_orig_data_out.min()
    nrmse_df = ((rmse_values / col_range) * 100).to_frame('NRMSE')

    # Aggregate by variable family (name-based) and overall.
    rmse_temp = rmse_df.loc[rmse_df.index.str.contains('Temperature'), 'RMSE'].mean()
    rmse_prec = rmse_df.loc[rmse_df.index.str.contains('Precipitation'), 'RMSE'].mean()
    overall_rmse = rmse_df['RMSE'].mean()
    nrmse_temp = nrmse_df.loc[nrmse_df.index.str.contains('Temperature'), 'NRMSE'].mean()
    nrmse_prec = nrmse_df.loc[nrmse_df.index.str.contains('Precipitation'), 'NRMSE'].mean()
    overall_nrmse = nrmse_df['NRMSE'].mean()

    # Optionally log parameters and metrics to MLflow.
    if method_name is not None:
        with mlflow.start_run(run_name=method_name):
            mlflow.log_param("method", method_name)
            mlflow.log_param("sample_frac", sample_frac)
            for k, v in imputation_params.items():
                # Non-scalar params are JSON-serialized so MLflow can store them.
                if isinstance(v, (dict, list)):
                    mlflow.log_param(k, json.dumps(v))
                else:
                    mlflow.log_param(k, v)
            for var, rmse in rmse_df['RMSE'].items():
                mlflow.log_metric(f"rmse_{var}", rmse)

            mlflow.log_metric("nrmse_temperature", round(nrmse_temp, 3))
            mlflow.log_metric("nrmse_precipitation", round(nrmse_prec, 3))
            mlflow.log_metric("nrmse_overall", round(overall_nrmse, 3))

    # Assemble the per-column table plus aggregate rows.
    metrics_df = rmse_df.join(nrmse_df)
    aggregates = pd.DataFrame({
        'RMSE': [rmse_temp, rmse_prec, overall_rmse],
        'NRMSE': [nrmse_temp, nrmse_prec, overall_nrmse]
    }, index=['Temperature', 'Precipitation', 'Overall'])
    metrics_df = pd.concat([metrics_df, aggregates]).round(3)

    return metrics_df
|
|
|
|
|
|
|
|
|
def imputation_linear_naive(df):
    """
    Fills gaps with plain linear interpolation along the time axis.

    Parameters:
        df (pd.DataFrame): Hourly DataFrame that may contain NaNs.

    Returns:
        pd.DataFrame: DataFrame with all gaps filled, rounded to 1 decimal.
    """
    result = df.copy().asfreq('h')

    # Interpolate interior gaps; back-fill covers any leading NaNs.
    result = result.interpolate(method='linear').bfill()

    # Precipitation cannot be negative.
    rain_columns = [c for c in result.columns if c.lower().startswith('precipitation')]
    result[rain_columns] = result[rain_columns].clip(lower=0)

    return result.round(1)
|
|
|
|
|
def imputation_linear_openmeteo(df, df_hist, column_pairs, lags=None):
    """
    Imputes missing values using linear regression against historical
    Open-Meteo variables (optionally lagged).

    Parameters:
        df (pd.DataFrame): DataFrame with missing values (hourly DatetimeIndex).
        df_hist (pd.DataFrame): Historical dataset without NaNs, same index.
        column_pairs (dict): Dictionary {target_column: [feature1, feature2, ...]}.
        lags (int or None): Maximum lag to use for historical variables.
            If None (or 0), no lagged features are used.

    Returns:
        pd.DataFrame: Imputed DataFrame restricted to the original columns.
    """
    target_cols = df.columns.tolist()
    df_out = df.copy().asfreq('h')

    # Midnight rows that are entirely NaN are pre-filled with the mean of
    # the neighbouring hours before the regression-based imputation.
    mask_nan_midnight = df_out.index.hour == 0
    mask_nan_midnight &= df_out.isna().all(axis=1)

    df_prev = df_out.shift(1)
    df_next = df_out.shift(-1)
    df_mean = (df_prev + df_next) / 2

    df_out.loc[mask_nan_midnight] = df_out.loc[mask_nan_midnight].fillna(df_mean.loc[mask_nan_midnight])

    # Lagged copies of every historical variable.
    df_hist_lagged = pd.DataFrame(index=df_out.index)
    for col in df_hist.columns:
        df_hist_lagged[col] = df_hist[col]
        if lags is not None and lags > 0:
            for lag in range(1, lags + 1):
                df_hist_lagged[f"{col}_lag{lag}"] = df_hist[col].shift(lag)

    # Impute the least-incomplete targets first.
    nan_counts = df_out.isna().sum().sort_values()
    ordered_targets = [col for col in nan_counts.index if col in column_pairs]

    for target_col in ordered_targets:
        feature_cols = column_pairs[target_col]
        feature_cols_all = feature_cols.copy()
        if lags is not None and lags > 0:
            feature_cols_all += [f"{c}_lag{l}" for c in feature_cols for l in range(1, lags + 1)]

        df_joint = df_out[[target_col]].join(df_hist_lagged[feature_cols_all], how='left')

        # Fit on rows where target and all features are present
        # (dropna computed once instead of once per fit argument).
        train = df_joint.dropna()
        reg = linear_model.LinearRegression()
        reg.fit(
            train[feature_cols_all].values,
            train[target_col].values
        )

        # Predict the missing rows manually via the fitted coefficients:
        # unlike reg.predict(), matmul tolerates NaN lag-features at the
        # series head (producing NaN instead of raising).
        missing_idx = df_joint[target_col].isna()
        if missing_idx.any():
            new_values = np.matmul(df_joint.loc[missing_idx, feature_cols_all].values, reg.coef_) + reg.intercept_
            df_out.loc[missing_idx, target_col] = new_values

    # Precipitation cannot be negative.
    prec_cols = [col for col in df_out.columns if col.startswith('Precipitation')]
    df_out[prec_cols] = df_out[prec_cols].clip(lower=0)
    df_out = df_out.round(1)

    df_out = df_out[target_cols]

    return df_out
|
|
|
|
|
def select_var_lag(df, maxlags=48, lag_criterion='bic'):
    """
    Picks the best VAR lag order according to an information criterion.

    Parameters:
        df (pd.DataFrame): DataFrame with possible missing values.
        maxlags (int): Largest lag order considered.
        lag_criterion (str): One of 'aic', 'bic', 'hqic', 'fpe'.

    Returns:
        int: The selected lag order.
    """
    # VAR needs a complete series: interpolate interior gaps and
    # back-fill any leading NaNs before the order search.
    filled = df.asfreq('h').interpolate(method='linear').bfill()

    order_results = VAR(filled).select_order(maxlags=maxlags)
    return order_results.selected_orders[lag_criterion]
|
|
|
|
|
def imputation_var(df, df_hist, lag):
    """
    Imputes missing values using a VAR model with a specified lag order.

    Parameters:
        df (pd.DataFrame): DataFrame with missing values (hourly DatetimeIndex).
        df_hist (pd.DataFrame): Historical dataset without NaNs; joined on the
            same index and passed to the VAR model as exogenous variables.
        lag (int): Lag order for the VAR model.

    Returns:
        pd.DataFrame: DataFrame with imputed values, restricted to the
        columns of `df`, precipitation clipped at 0 and rounded to 1 decimal.
    """
    target_cols = df.columns.tolist()
    exog_cols = df_hist.columns.tolist()

    # Align targets and exogenous variables on a single hourly index.
    df_combined = df.asfreq('h').join(df_hist, how='left')
    exog_vars = df_combined[exog_cols]

    # VAR cannot be fit on NaNs: temporarily fill gaps by interpolation
    # (back-fill covers any leading NaNs).
    df_tmp = df_combined.asfreq('h').interpolate(method='linear').bfill()

    model = VAR(df_tmp, exog=exog_vars)
    var_model = model.fit(lag)

    original_index = df_tmp.index
    data_array = df_tmp.to_numpy()
    # NOTE(review): the NaN mask is built from the raw `df` while data_array
    # comes from the asfreq'd/joined frame; if `df` is not already a complete
    # hourly series their row counts can differ — confirm callers always pass
    # hourly-frequency data. Column positions align only because `df`'s
    # columns come first in the join.
    mask_nan = np.isnan(df.to_numpy())

    # Walk forward through the series: each contiguous run of rows containing
    # at least one NaN is forecast in one shot from the preceding k_ar
    # observations, then only the NaN cells are overwritten.
    i = var_model.k_ar
    while i < len(df):
        if mask_nan[i].any():

            # Find the end of this NaN run.
            j = i
            while j < len(df) and mask_nan[j].any():
                j += 1
            n_steps = j - i

            # Forecast the whole run using the observed/filled history and
            # the known exogenous values over the forecast horizon.
            input_data = data_array[i - var_model.k_ar:i]
            forecast = var_model.forecast(y=input_data, steps=n_steps, exog_future=exog_vars.iloc[i:i + n_steps])

            # Keep observed values; replace only the missing cells.
            for k in range(n_steps):
                nan_cols_idx = np.where(mask_nan[i + k])[0]
                data_array[i + k, nan_cols_idx] = forecast[k, nan_cols_idx]

            i = j
        else:
            i += 1

    df_out = pd.DataFrame(data_array, columns=df_tmp.columns, index=original_index)

    # Precipitation cannot be negative.
    prec_cols = [col for col in df_out.columns if col.startswith('Precipitation')]
    df_out[prec_cols] = df_out[prec_cols].clip(lower=0)
    df_out = df_out.round(1)

    # Drop the exogenous columns; return only the original targets.
    df_out = df_out[target_cols]

    return df_out
|
|
|
|
|
def imputation_rf_openmeteo(df, df_hist, column_pairs, lags=None, autolag=False, n_estimators=100, max_depth=None, min_samples_split=2):
    """
    Imputes missing values using Random Forest regression with historical data from Open-Meteo.

    Parameters:
        df (pd.DataFrame): DataFrame with missing values.
        df_hist (pd.DataFrame): Historical dataset without NaNs.
        column_pairs (dict): Dictionary {target_column: [feature1, feature2, ...]}.
        lags (int or None): Maximum lag to use for historical variables. If None, no lag is used.
        autolag (bool): If True, adds lagged versions of the target variable as features.
        n_estimators (int): Number of trees in the Random Forest.
        max_depth (int or None): Maximum depth of each tree (None = grow fully).
        min_samples_split (int): Minimum samples required to split an internal node.

    Returns:
        pd.DataFrame: Imputed DataFrame.
    """
    target_cols = df.columns.tolist()
    df_out = df.copy().asfreq('h')

    # Midnight rows that are entirely NaN are pre-filled with the mean of
    # the neighbouring hours before model-based imputation.
    mask_nan_midnight = df_out.index.hour == 0
    mask_nan_midnight &= df_out.isna().all(axis=1)

    df_mean = (df_out.shift(1) + df_out.shift(-1)) / 2
    df_out.loc[mask_nan_midnight] = df_out.loc[mask_nan_midnight].fillna(df_mean.loc[mask_nan_midnight])

    df_full = df_out.join(df_hist, how='left')

    # Lagged copies of every column (targets and historical features).
    df_full_lagged = pd.DataFrame(index=df_out.index)
    for col in df_full.columns:
        df_full_lagged[col] = df_full[col]
        if lags is not None and lags > 0:
            for lag in range(1, lags + 1):
                df_full_lagged[f"{col}_lag{lag}"] = df_full[col].shift(lag)

    # Impute the most-incomplete targets first.
    nan_counts = df_out.isna().sum().sort_values(ascending=False)
    ordered_targets = [col for col in nan_counts.index if col in column_pairs]

    for target_col in ordered_targets:
        feature_cols = column_pairs[target_col]
        feature_cols_all = feature_cols.copy()
        if lags is not None and lags > 0:
            feature_cols_all += [f"{c}_lag{l}" for c in feature_cols for l in range(1, lags + 1)]

        # Optionally let the model see the target's own recent history.
        if autolag and lags is not None and lags > 0:
            feature_cols_all += [f"{target_col}_lag{l}" for l in range(1, lags + 1)]

        missing = df_full_lagged[target_col].isna()
        # Nothing to impute for this target: skip the expensive fit and
        # avoid calling predict() on an empty frame (sklearn rejects
        # zero-sample input).
        if not missing.any():
            continue

        df_train = df_full_lagged.loc[~missing, feature_cols_all]
        y_train = df_full_lagged.loc[~missing, target_col]
        df_pred = df_full_lagged.loc[missing, feature_cols_all]

        # NOTE(review): lagged features contain NaN at the series head;
        # this relies on RandomForestRegressor accepting missing values
        # (scikit-learn >= 1.4) — confirm the pinned sklearn version.
        rf = RandomForestRegressor(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            random_state=42,
            n_jobs=-1
        )

        rf.fit(df_train, y_train)
        df_out.loc[missing, target_col] = rf.predict(df_pred)

    df_out = df_out.round(1)
    df_out = df_out[target_cols]

    return df_out
|
|
|
|
|
def imputation_xgb_openmeteo(df, df_hist, column_pairs, lags=None, autolag=False, n_estimators=100, max_depth=6, learning_rate=0.1, subsample=1.0):
    """
    Imputes missing values using XGBoost regression with historical data from Open-Meteo.

    Parameters:
        df (pd.DataFrame): DataFrame with missing values.
        df_hist (pd.DataFrame): Historical dataset without NaNs.
        column_pairs (dict): Dictionary {target_column: [feature1, feature2, ...]}.
        lags (int or None): Maximum lag to use for historical variables. If None, no lag is used.
        autolag (bool): If True, adds lagged versions of the target variable as features.
        n_estimators (int): Number of boosting rounds (trees).
        max_depth (int): Maximum tree depth for base learners.
        learning_rate (float): Boosting learning rate (xgb parameter 'eta').
        subsample (float): Subsample ratio of the training instance.

    Returns:
        pd.DataFrame: Imputed DataFrame.
    """
    target_cols = df.columns.tolist()
    df_out = df.copy().asfreq('h')

    # Midnight rows that are entirely NaN are pre-filled with the mean of
    # the neighbouring hours before model-based imputation.
    mask_nan_midnight = df_out.index.hour == 0
    mask_nan_midnight &= df_out.isna().all(axis=1)

    df_mean = (df_out.shift(1) + df_out.shift(-1)) / 2
    df_out.loc[mask_nan_midnight] = df_out.loc[mask_nan_midnight].fillna(df_mean.loc[mask_nan_midnight])

    df_full = df_out.join(df_hist, how='left')

    # Build all lagged columns in one concat (avoids fragmented inserts).
    lagged_cols = {}
    for col in df_full.columns:
        lagged_cols[col] = df_full[col]
        if lags is not None and lags > 0:
            for lag in range(1, lags + 1):
                lagged_cols[f"{col}_lag{lag}"] = df_full[col].shift(lag)

    df_full_lagged = pd.concat(lagged_cols, axis=1)
    df_full_lagged.index = df_out.index

    # Impute the most-incomplete targets first.
    nan_counts = df_out.isna().sum().sort_values(ascending=False)
    ordered_targets = [col for col in nan_counts.index if col in column_pairs]

    for target_col in ordered_targets:
        feature_cols = column_pairs[target_col]
        feature_cols_all = feature_cols.copy()
        if lags is not None and lags > 0:
            feature_cols_all += [f"{c}_lag{l}" for c in feature_cols for l in range(1, lags + 1)]

        # Optionally let the model see the target's own recent history.
        if autolag and lags is not None and lags > 0:
            feature_cols_all += [f"{target_col}_lag{l}" for l in range(1, lags + 1)]

        missing = df_full_lagged[target_col].isna()
        # Nothing to impute for this target: skip the expensive fit and
        # avoid calling predict() on an empty frame.
        if not missing.any():
            continue

        df_train = df_full_lagged.loc[~missing, feature_cols_all]
        y_train = df_full_lagged.loc[~missing, target_col]
        df_pred = df_full_lagged.loc[missing, feature_cols_all]

        # XGBoost handles NaN in features natively, so NaN lag values at
        # the series head are acceptable here.
        xgb = XGBRegressor(
            n_estimators=n_estimators,
            max_depth=max_depth,
            learning_rate=learning_rate,
            subsample=subsample,
            random_state=42,
            n_jobs=-1,
            objective='reg:squarederror'
        )

        xgb.fit(df_train, y_train)
        df_out.loc[missing, target_col] = xgb.predict(df_pred)

    df_out = df_out.round(1)
    df_out = df_out[target_cols]

    return df_out
|
|
|
|
|
|
|
|
|
def objective_rf(trial, df, df_hist, column_pairs, max_lag=6, sample_frac=0.3, random_state=42):
    """
    Optuna objective for tuning the Random Forest imputation.

    Samples hyper-parameters from the trial, evaluates the resulting
    imputation and returns the overall NRMSE (to be minimized).
    """
    # Hyper-parameter search space. The suggest-call order is kept stable
    # so seeded samplers reproduce the same trials.
    hp = dict(
        n_estimators=trial.suggest_int("n_estimators", 50, 500, step=50),
        max_depth=trial.suggest_int("max_depth", 2, 20),
        min_samples_split=trial.suggest_int("min_samples_split", 2, 10),
        lags=trial.suggest_categorical("lags", [None] + list(range(1, max_lag + 1))),
        autolag=trial.suggest_categorical("autolag", [True, False]),
    )

    # Evaluate without MLflow logging (method_name=None).
    scores = evaluate_imputation_mlflow(
        df,
        imputation_rf_openmeteo,
        imputation_params={"df_hist": df_hist, "column_pairs": column_pairs, **hp},
        method_name=None,
        sample_frac=sample_frac,
        random_state=random_state,
    )

    return scores.loc['Overall', 'NRMSE']
|
|
|
|
|
def objective_xgb(trial, df, df_hist, column_pairs, max_lag=6, sample_frac=0.3, random_state=42):
    """
    Optuna objective for tuning the XGBoost imputation.

    Samples hyper-parameters from the trial, evaluates the resulting
    imputation and returns the overall NRMSE (to be minimized).
    """
    # Hyper-parameter search space. The suggest-call order is kept stable
    # so seeded samplers reproduce the same trials.
    hp = dict(
        n_estimators=trial.suggest_int("n_estimators", 50, 500, step=50),
        max_depth=trial.suggest_int("max_depth", 2, 20),
        learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        subsample=trial.suggest_float("subsample", 0.5, 1.0),
        lags=trial.suggest_categorical("lags", [None] + list(range(1, max_lag + 1))),
        autolag=trial.suggest_categorical("autolag", [True, False]),
    )

    # Evaluate without MLflow logging (method_name=None).
    scores = evaluate_imputation_mlflow(
        df,
        imputation_xgb_openmeteo,
        imputation_params={"df_hist": df_hist, "column_pairs": column_pairs, **hp},
        method_name=None,
        sample_frac=sample_frac,
        random_state=random_state,
    )

    return scores.loc['Overall', 'NRMSE']