import json

import mlflow
import numpy as np
import pandas as pd
from sklearn import linear_model
from sklearn.ensemble import RandomForestRegressor
from statsmodels.tsa.api import VAR
from xgboost import XGBRegressor


# Imputation Evaluation and MLflow Logging
def evaluate_imputation_mlflow(df_orig, imputation_function, imputation_params,
                               method_name=None, sample_frac=0.3, random_state=42):
    df_orig_data_out = df_orig.copy().asfreq('h').sort_index()

    # Random Sampling: mask a fraction of each column to create artificial gaps
    np.random.seed(random_state)
    mask = pd.DataFrame(False, index=df_orig_data_out.index, columns=df_orig_data_out.columns)
    for col in df_orig_data_out.columns:
        n_samples = int(sample_frac * len(df_orig_data_out))
        mask_indices = np.random.choice(df_orig_data_out.index, size=n_samples, replace=False)
        mask.loc[mask_indices, col] = True
    df_masked = df_orig_data_out.mask(mask)

    # Imputation
    df_imputed = imputation_function(df_masked, **imputation_params)

    # RMSE & NRMSE, computed only on the artificially masked cells
    rmse_values = ((df_imputed[mask] - df_orig_data_out[mask]) ** 2).mean().pow(0.5)
    rmse_df = pd.DataFrame(rmse_values).rename(columns={0: 'RMSE'}).sort_index()
    nrmse_percent = {}
    for col in df_orig_data_out.columns:
        col_range = df_orig_data_out[col].max() - df_orig_data_out[col].min()
        nrmse_percent[col] = (rmse_values[col] / col_range) * 100
    nrmse_df = pd.DataFrame.from_dict(nrmse_percent, orient='index', columns=['NRMSE'])

    # Aggregated Metrics
    rmse_temp = rmse_df.loc[rmse_df.index.str.contains('Temperature'), 'RMSE'].mean()
    rmse_prec = rmse_df.loc[rmse_df.index.str.contains('Precipitation'), 'RMSE'].mean()
    overall_rmse = rmse_df['RMSE'].mean()
    nrmse_temp = nrmse_df.loc[nrmse_df.index.str.contains('Temperature'), 'NRMSE'].mean()
    nrmse_prec = nrmse_df.loc[nrmse_df.index.str.contains('Precipitation'), 'NRMSE'].mean()
    overall_nrmse = nrmse_df['NRMSE'].mean()

    # Log to MLflow
    if method_name is not None:
        with mlflow.start_run(run_name=method_name):
            mlflow.log_param("method", method_name)
            mlflow.log_param("sample_frac", sample_frac)
            for k, v in imputation_params.items():
                if isinstance(v, (dict, list)):
                    mlflow.log_param(k, json.dumps(v))
                else:
                    mlflow.log_param(k, v)
            for var, rmse in rmse_df['RMSE'].items():
                mlflow.log_metric(f"rmse_{var}", rmse)
            mlflow.log_metric("nrmse_temperature", round(nrmse_temp, 3))
            mlflow.log_metric("nrmse_precipitation", round(nrmse_prec, 3))
            mlflow.log_metric("nrmse_overall", round(overall_nrmse, 3))

    # Prepare Metrics DataFrame
    metrics_df = rmse_df.join(nrmse_df)
    aggregates = pd.DataFrame({
        'RMSE': [rmse_temp, rmse_prec, overall_rmse],
        'NRMSE': [nrmse_temp, nrmse_prec, overall_nrmse]
    }, index=['Temperature', 'Precipitation', 'Overall'])
    metrics_df = pd.concat([metrics_df, aggregates]).round(3)
    return metrics_df


# Imputation Functions
def imputation_linear_naive(df):
    """
    Imputes missing values using simple linear interpolation.

    Parameters:
        df (pd.DataFrame): DataFrame with possible missing values.

    Returns:
        pd.DataFrame: Imputed DataFrame.
    """
    df_out = df.copy().asfreq('h')

    # Step 1: interpolation
    df_out = df_out.interpolate(method='linear').bfill()

    # Step 2: clip precipitation values to be non-negative and round all values
    prec_cols = [col for col in df_out.columns if col.lower().startswith('precipitation')]
    df_out[prec_cols] = df_out[prec_cols].clip(lower=0)
    df_out = df_out.round(1)
    return df_out
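
# Example (illustrative sketch): evaluate the naive baseline on a synthetic
# hourly series. `df_weather` is a hypothetical stand-in; column names follow
# the 'Temperature*' / 'Precipitation*' convention the aggregated metrics expect.
#
#   idx = pd.date_range("2023-01-01", periods=24 * 60, freq="h")
#   rng = np.random.default_rng(0)
#   df_weather = pd.DataFrame({
#       "Temperature_StationA": 10 + 8 * np.sin(2 * np.pi * np.arange(len(idx)) / 24)
#                               + rng.normal(0, 1, len(idx)),
#       "Precipitation_StationA": rng.gamma(0.3, 2.0, len(idx)),
#   }, index=idx)
#   metrics = evaluate_imputation_mlflow(df_weather, imputation_linear_naive, {},
#                                        method_name="linear_naive")
#   print(metrics.loc[["Temperature", "Precipitation", "Overall"]])
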
def imputation_linear_openmeteo(df, df_hist, column_pairs, lags=None):
    """
    Imputes missing values using linear regression with historical data from Open-Meteo.

    Parameters:
        df (pd.DataFrame): DataFrame with missing values.
        df_hist (pd.DataFrame): Historical dataset without NaNs.
        column_pairs (dict): Dictionary {target_column: [feature1, feature2, ...]}.
        lags (int or None): Maximum lag to use for historical variables. If None, no lag is used.

    Returns:
        pd.DataFrame: Imputed DataFrame.
    """
    target_cols = df.columns.tolist()
    df_out = df.copy().asfreq('h')

    # Fill rows at midnight where every column is NaN with the mean of the
    # neighbouring hours
    mask_nan_midnight = df_out.index.hour == 0
    mask_nan_midnight &= df_out.isna().all(axis=1)
    df_prev = df_out.shift(1)
    df_next = df_out.shift(-1)
    df_mean = (df_prev + df_next) / 2
    df_out.loc[mask_nan_midnight] = df_out.loc[mask_nan_midnight].fillna(df_mean.loc[mask_nan_midnight])

    # Step 1: create lagged versions of historical variables
    df_hist_lagged = pd.DataFrame(index=df_out.index)
    for col in df_hist.columns:
        df_hist_lagged[col] = df_hist[col]  # contemporaneous value
        if lags is not None and lags > 0:
            for lag in range(1, lags + 1):
                df_hist_lagged[f"{col}_lag{lag}"] = df_hist[col].shift(lag)

    # Step 2: sort target columns by number of NaNs (fewest first)
    nan_counts = df_out.isna().sum().sort_values()
    ordered_targets = [col for col in nan_counts.index if col in column_pairs]

    # Step 3: impute each target in that order
    for target_col in ordered_targets:
        feature_cols = column_pairs[target_col]
        feature_cols_all = feature_cols.copy()
        if lags is not None and lags > 0:
            feature_cols_all += [f"{c}_lag{l}" for c in feature_cols for l in range(1, lags + 1)]
        df_joint = df_out[[target_col]].join(df_hist_lagged[feature_cols_all], how='left')

        # linear regression on the fully observed rows
        df_train = df_joint.dropna()
        reg = linear_model.LinearRegression()
        reg.fit(df_train[feature_cols_all].values, df_train[target_col].values)

        # predict missing values
        missing_idx = df_joint[target_col].isna()
        if missing_idx.any():
            df_out.loc[missing_idx, target_col] = reg.predict(
                df_joint.loc[missing_idx, feature_cols_all].values
            )

    # Step 4: final adjustments
    prec_cols = [col for col in df_out.columns if col.startswith('Precipitation')]
    df_out[prec_cols] = df_out[prec_cols].clip(lower=0)
    df_out = df_out.round(1)
    df_out = df_out[target_cols]
    return df_out


def select_var_lag(df, maxlags=48, lag_criterion='bic'):
    """
    Selects the best lag for a VAR model based on the chosen criterion.

    Parameters:
        df (pd.DataFrame): DataFrame with possible missing values.
        maxlags (int): Maximum number of lags to consider.
        lag_criterion (str): Selection criterion ('aic', 'bic', 'hqic', 'fpe').

    Returns:
        int: Selected lag order.
    """
    # Step 1: initial linear interpolation so the VAR can be fitted
    df_tmp = df.asfreq('h').interpolate(method='linear').bfill()

    # Step 2: fit the VAR model and pick the lag minimising the criterion
    model = VAR(df_tmp)
    lag_order = model.select_order(maxlags=maxlags)
    selected_lag = lag_order.selected_orders[lag_criterion]
    return selected_lag
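
# Example (illustrative sketch): map each station column to the Open-Meteo
# variables used as regressors. `df_weather`, `df_openmeteo` and the feature
# names below are hypothetical; both frames share the same hourly index and
# `df_openmeteo` is gap-free.
#
#   column_pairs = {
#       "Temperature_StationA": ["temperature_2m"],
#       "Precipitation_StationA": ["precipitation"],
#   }
#   df_imp = imputation_linear_openmeteo(df_weather, df_openmeteo, column_pairs, lags=3)
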
""" target_cols = df.columns.tolist() exog_cols = df_hist.columns.tolist() df_combined = df.asfreq('h').join(df_hist, how='left') exog_vars = df_combined[exog_cols] # Step 1: initial linear interpolation df_tmp = df_combined.asfreq('h').interpolate(method='linear').bfill() # Step 2: fit VAR model model = VAR(df_tmp, exog=exog_vars) var_model = model.fit(lag) # Step 3: prepare for fast imputation original_index = df_tmp.index data_array = df_tmp.to_numpy() mask_nan = np.isnan(df.to_numpy()) i = var_model.k_ar while i < len(df): if mask_nan[i].any(): # find consecutive missing values j = i while j < len(df) and mask_nan[j].any(): j += 1 n_steps = j - i # forecast missing values input_data = data_array[i - var_model.k_ar:i] forecast = var_model.forecast(y=input_data, steps=n_steps, exog_future=exog_vars.iloc[i:i + n_steps]) # update only missing columns for k in range(n_steps): nan_cols_idx = np.where(mask_nan[i + k])[0] data_array[i + k, nan_cols_idx] = forecast[k, nan_cols_idx] i = j else: i += 1 # Step 4: convert back to DataFrame df_out = pd.DataFrame(data_array, columns=df_tmp.columns, index=original_index) # Step 5: final adjustments prec_cols = [col for col in df_out.columns if col.startswith('Precipitation')] df_out[prec_cols] = df_out[prec_cols].clip(lower=0) df_out = df_out.round(1) df_out = df_out[target_cols] return df_out def imputation_rf_openmeteo(df, df_hist, column_pairs, lags=None, autolag=False, n_estimators=100, max_depth=None, min_samples_split=2): """ Imputes missing values using Random Forest regression with historical data from Open-Meteo. Parameters: df (pd.DataFrame): DataFrame with missing values. df_hist (pd.DataFrame): Historical dataset without NaNs. column_pairs (dict): Dictionary {target_column: [feature1, feature2, ...]}. lags (int or None): Maximum lag to use for historical variables. If None, no lag is used. autolag (bool): If True, adds lagged versions of the target variable as features. n_estimators (int): Number of trees in the Random Forest. Returns: pd.DataFrame: Imputed DataFrame. 
""" target_cols = df.columns.tolist() df_out = df.copy().asfreq('h') mask_nan_midnight = df_out.index.hour == 0 mask_nan_midnight &= df_out.isna().all(axis=1) df_prev = df_out.shift(1) df_next = df_out.shift(-1) df_mean = (df_prev + df_next) / 2 df_out.loc[mask_nan_midnight] = df_out.loc[mask_nan_midnight].fillna(df_mean.loc[mask_nan_midnight]) df_full = df_out.join(df_hist, how='left') # Step 1: create lagged versions of historical variables df_full_lagged = pd.DataFrame(index=df_out.index) for col in df_full.columns: df_full_lagged[col] = df_full[col] # contemporaneous if lags is not None and lags > 0: for lag in range(1, lags + 1): df_full_lagged[f"{col}_lag{lag}"] = df_full[col].shift(lag) # Step 2: sort target columns by number of NaNs nan_counts = df_out.isna().sum().sort_values(ascending=False) ordered_targets = [col for col in nan_counts.index if col in column_pairs] # Step 3: imputation for target_col in ordered_targets: feature_cols = column_pairs[target_col] feature_cols_all = feature_cols.copy() if lags is not None and lags > 0: feature_cols_all += [f"{c}_lag{l}" for c in feature_cols for l in range(1, lags + 1)] # add autolags of target variable if specified if autolag and lags is not None and lags > 0: feature_cols_all += [f"{target_col}_lag{l}" for l in range(1, lags + 1)] df_train = df_full_lagged.loc[df_full_lagged[target_col].notna(), feature_cols_all] y_train = df_full_lagged.loc[df_full_lagged[target_col].notna(), target_col] df_pred = df_full_lagged.loc[df_full_lagged[target_col].isna(), feature_cols_all] # Random Forest regression rf = RandomForestRegressor( n_estimators=n_estimators, max_depth=max_depth, min_samples_split=min_samples_split, random_state=42, n_jobs=-1 ) rf.fit(df_train, y_train) df_out.loc[df_full_lagged[target_col].isna(), target_col] = rf.predict(df_pred) df_out = df_out.round(1) df_out = df_out[target_cols] return df_out def imputation_xgb_openmeteo(df, df_hist, column_pairs, lags=None, autolag=False, n_estimators=100, max_depth=6, learning_rate=0.1, subsample=1.0): """ Imputes missing values using XGBoost regression with historical data from Open-Meteo. Parameters: df (pd.DataFrame): DataFrame with missing values. df_hist (pd.DataFrame): Historical dataset without NaNs. column_pairs (dict): Dictionary {target_column: [feature1, feature2, ...]}. lags (int or None): Maximum lag to use for historical variables. If None, no lag is used. autolag (bool): If True, adds lagged versions of the target variable as features. n_estimators (int): Number of boosting rounds (trees). max_depth (int): Maximum tree depth for base learners. learning_rate (float): Boosting learning rate (xgb parameter 'eta'). subsample (float): Subsample ratio of the training instance. Returns: pd.DataFrame: Imputed DataFrame. 
""" target_cols = df.columns.tolist() df_out = df.copy().asfreq('h') mask_nan_midnight = df_out.index.hour == 0 mask_nan_midnight &= df_out.isna().all(axis=1) df_prev = df_out.shift(1) df_next = df_out.shift(-1) df_mean = (df_prev + df_next) / 2 df_out.loc[mask_nan_midnight] = df_out.loc[mask_nan_midnight].fillna(df_mean.loc[mask_nan_midnight]) df_full = df_out.join(df_hist, how='left') # Step 1: create lagged versions of historical variables lagged_cols = {} for col in df_full.columns: lagged_cols[col] = df_full[col] if lags is not None and lags > 0: for lag in range(1, lags + 1): lagged_cols[f"{col}_lag{lag}"] = df_full[col].shift(lag) df_full_lagged = pd.concat(lagged_cols, axis=1) df_full_lagged.index = df_out.index # Step 2: sort target columns by number of NaNs nan_counts = df_out.isna().sum().sort_values(ascending=False) ordered_targets = [col for col in nan_counts.index if col in column_pairs] # Step 3: imputation for target_col in ordered_targets: feature_cols = column_pairs[target_col] feature_cols_all = feature_cols.copy() if lags is not None and lags > 0: feature_cols_all += [f"{c}_lag{l}" for c in feature_cols for l in range(1, lags + 1)] # add autolags of target variable if specified if autolag and lags is not None and lags > 0: feature_cols_all += [f"{target_col}_lag{l}" for l in range(1, lags + 1)] df_train = df_full_lagged.loc[df_full_lagged[target_col].notna(), feature_cols_all] y_train = df_full_lagged.loc[df_full_lagged[target_col].notna(), target_col] df_pred = df_full_lagged.loc[df_full_lagged[target_col].isna(), feature_cols_all] # XGBoost regression xgb = XGBRegressor( n_estimators=n_estimators, max_depth=max_depth, learning_rate=learning_rate, subsample=subsample, random_state=42, n_jobs=-1, objective='reg:squarederror' ) xgb.fit(df_train, y_train) df_out.loc[df_full_lagged[target_col].isna(), target_col] = xgb.predict(df_pred) df_out = df_out.round(1) df_out = df_out[target_cols] return df_out # Optimization Functions def objective_rf(trial, df, df_hist, column_pairs, max_lag=6, sample_frac=0.3, random_state=42): """ Optuna objective function for tuning Random Forest imputation. """ # Hyperparameter suggestions n_estimators = trial.suggest_int("n_estimators", 50, 500, step=50) max_depth = trial.suggest_int("max_depth", 2, 20) min_samples_split = trial.suggest_int("min_samples_split", 2, 10) lags = trial.suggest_categorical("lags", [None] + list(range(1, max_lag + 1))) autolag = trial.suggest_categorical("autolag", [True, False]) # Imputation parameters imputation_params = { "df_hist": df_hist, "column_pairs": column_pairs, "lags": lags, "autolag": autolag, "n_estimators": n_estimators, "max_depth": max_depth, "min_samples_split": min_samples_split } # Evaluate imputation metrics_df = evaluate_imputation_mlflow( df, imputation_rf_openmeteo, imputation_params=imputation_params, method_name=None, sample_frac=sample_frac, random_state=random_state ) # Return overall NRMSE to minimize return metrics_df.loc['Overall', 'NRMSE'] def objective_xgb(trial, df, df_hist, column_pairs, max_lag=6, sample_frac=0.3, random_state=42): """ Optuna objective function for tuning XGBoost imputation. 
""" # Hyperparameter suggestions n_estimators = trial.suggest_int("n_estimators", 50, 500, step=50) max_depth = trial.suggest_int("max_depth", 2, 20) learning_rate = trial.suggest_float("learning_rate", 0.01, 0.3, log=True) subsample = trial.suggest_float("subsample", 0.5, 1.0) lags = trial.suggest_categorical("lags", [None] + list(range(1, max_lag + 1))) autolag = trial.suggest_categorical("autolag", [True, False]) # Imputation parameters imputation_params = { "df_hist": df_hist, "column_pairs": column_pairs, "lags": lags, "autolag": autolag, "n_estimators": n_estimators, "max_depth": max_depth, "learning_rate": learning_rate, "subsample": subsample } # Evaluate imputation metrics_df = evaluate_imputation_mlflow( df, imputation_xgb_openmeteo, imputation_params=imputation_params, method_name=None, sample_frac=sample_frac, random_state=random_state ) # Return overall NRMSE to minimize return metrics_df.loc['Overall', 'NRMSE']