import mlflow
import json
import numpy as np
import pandas as pd
from sklearn import linear_model
from sklearn.ensemble import RandomForestRegressor
from statsmodels.tsa.api import VAR
from xgboost import XGBRegressor
# Imputation Evaluation and MLflow Logging
def evaluate_imputation_mlflow(df_orig, imputation_function, imputation_params, method_name=None, sample_frac=0.3, random_state=42):
"""
Masks a random fraction of each column, re-imputes the masked values, and scores the result.
Parameters:
df_orig (pd.DataFrame): Complete hourly DataFrame used as ground truth.
imputation_function (callable): Function taking the masked DataFrame and returning an imputed one.
imputation_params (dict): Keyword arguments forwarded to imputation_function.
method_name (str or None): If given, parameters and metrics are logged to MLflow under this run name.
sample_frac (float): Fraction of values masked per column.
random_state (int): Seed for the random mask.
Returns:
pd.DataFrame: Per-variable and aggregated RMSE/NRMSE (%) metrics.
"""
df_orig_data_out = df_orig.copy().asfreq('h').sort_index()
# Random Sampling
np.random.seed(random_state)
mask = pd.DataFrame(False, index=df_orig_data_out.index, columns=df_orig_data_out.columns)
for col in df_orig_data_out.columns:
n_samples = int(sample_frac * len(df_orig_data_out))
mask_indices = np.random.choice(df_orig_data_out.index, size=n_samples, replace=False)
mask.loc[mask_indices, col] = True
df_masked = df_orig_data_out.mask(mask)
# Imputation
df_imputed = imputation_function(df_masked, **imputation_params)
# RMSE & NRMSE
rmse_values = ((df_imputed[mask] - df_orig_data_out[mask])**2).mean().pow(0.5)
rmse_df = pd.DataFrame(rmse_values).rename(columns={0: 'RMSE'}).sort_index()
nrmse_percent = {}
for col in df_orig_data_out.columns:
col_range = df_orig_data_out[col].max() - df_orig_data_out[col].min()
nrmse_percent[col] = (rmse_values[col] / col_range) * 100
nrmse_df = pd.DataFrame.from_dict(nrmse_percent, orient='index', columns=['NRMSE'])
# Aggregated Metrics
rmse_temp = rmse_df.loc[rmse_df.index.str.contains('Temperature'), 'RMSE'].mean()
rmse_prec = rmse_df.loc[rmse_df.index.str.contains('Precipitation'), 'RMSE'].mean()
overall_rmse = rmse_df['RMSE'].mean()
nrmse_temp = nrmse_df.loc[nrmse_df.index.str.contains('Temperature'), 'NRMSE'].mean()
nrmse_prec = nrmse_df.loc[nrmse_df.index.str.contains('Precipitation'), 'NRMSE'].mean()
overall_nrmse = nrmse_df['NRMSE'].mean()
# Log to MLflow
if method_name is not None:
with mlflow.start_run(run_name=method_name):
mlflow.log_param("method", method_name)
mlflow.log_param("sample_frac", sample_frac)
for k, v in imputation_params.items():
if isinstance(v, pd.DataFrame):
# log a compact description instead of dumping a whole DataFrame (e.g. df_hist)
mlflow.log_param(k, f"DataFrame{v.shape}")
elif isinstance(v, (dict, list)):
mlflow.log_param(k, json.dumps(v))
else:
mlflow.log_param(k, v)
for var, rmse in rmse_df['RMSE'].items():
mlflow.log_metric(f"rmse_{var}", rmse)
mlflow.log_metric("nrmse_temperature", round(nrmse_temp, 3))
mlflow.log_metric("nrmse_precipitation", round(nrmse_prec, 3))
mlflow.log_metric("nrmse_overall", round(overall_nrmse, 3))
# Prepare Metrics DataFrame
metrics_df = rmse_df.join(nrmse_df)
aggregates = pd.DataFrame({
'RMSE': [rmse_temp, rmse_prec, overall_rmse],
'NRMSE': [nrmse_temp, nrmse_prec, overall_nrmse]
}, index=['Temperature', 'Precipitation', 'Overall'])
metrics_df = pd.concat([metrics_df, aggregates]).round(3)
return metrics_df
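# Usage sketch (hypothetical data): score the naive baseline defined below
# against a NaN-free hourly frame. `df_meteo` is an assumed name for a
# complete DataFrame with columns such as 'Temperature_station1' and
# 'Precipitation_station1'; it is not part of this module.
#
# metrics = evaluate_imputation_mlflow(
#     df_meteo,
#     imputation_linear_naive,
#     imputation_params={},
#     method_name="linear_naive",
#     sample_frac=0.3,
# )
# print(metrics.loc[['Temperature', 'Precipitation', 'Overall']])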
# Imputation Functions
def imputation_linear_naive(df):
"""
Imputes missing values using simple linear interpolation.
Parameters:
df (pd.DataFrame): DataFrame with possible missing values.
Returns:
pd.DataFrame: Imputed DataFrame.
"""
df_out = df.copy().asfreq('h')
# Step 1: interpolation
df_out = df_out.interpolate(method='linear').bfill()
# Step 2: clip precipitation values to be non-negative and round all values
prec_cols = [col for col in df_out.columns if col.lower().startswith('precipitation')]
df_out[prec_cols] = df_out[prec_cols].clip(lower=0)
df_out = df_out.round(1)
return df_out
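# Minimal, self-contained example of the naive imputer on a toy series
# (assumed data, for illustration only):
#
# idx = pd.date_range("2024-01-01", periods=5, freq="h")
# toy = pd.DataFrame({"Temperature_t1": [1.0, np.nan, 3.0, np.nan, 5.0]}, index=idx)
# imputation_linear_naive(toy)
# # -> the two NaNs become 2.0 and 4.0 via linear interpolation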
def imputation_linear_openmeteo(df, df_hist, column_pairs, lags=None):
"""
Imputes missing values using linear regression with historical data from Open-Meteo.
Parameters:
df (pd.DataFrame): DataFrame with missing values.
df_hist (pd.DataFrame): Historical dataset without NaNs.
column_pairs (dict): Dictionary {target_column: [feature1, feature2, ...]}.
lags (int or None): Maximum lag to use for historical variables. If None, no lag is used.
Returns:
pd.DataFrame: Imputed DataFrame.
"""
target_cols = df.columns.tolist()
df_out = df.copy().asfreq('h')
# fill rows that are entirely NaN at midnight with the mean of the neighbouring hours
mask_nan_midnight = df_out.index.hour == 0
mask_nan_midnight &= df_out.isna().all(axis=1)
df_prev = df_out.shift(1)
df_next = df_out.shift(-1)
df_mean = (df_prev + df_next) / 2
df_out.loc[mask_nan_midnight] = df_out.loc[mask_nan_midnight].fillna(df_mean.loc[mask_nan_midnight])
# Step 1: create lagged versions of historical variables
df_hist_lagged = pd.DataFrame(index=df_out.index)
for col in df_hist.columns:
df_hist_lagged[col] = df_hist[col] # contemporaneous value
if lags is not None and lags > 0:
for lag in range(1, lags + 1):
df_hist_lagged[f"{col}_lag{lag}"] = df_hist[col].shift(lag)
# Step 2: sort target columns by number of NaNs
nan_counts = df_out.isna().sum().sort_values()
ordered_targets = [col for col in nan_counts.index if col in column_pairs]
# Step 3: impute each target via linear regression, fewest-NaNs columns first
for target_col in ordered_targets:
feature_cols = column_pairs[target_col]
feature_cols_all = feature_cols.copy()
if lags is not None and lags > 0:
feature_cols_all += [f"{c}_lag{l}" for c in feature_cols for l in range(1, lags + 1)]
df_joint = df_out[[target_col]].join(df_hist_lagged[feature_cols_all], how='left')
# linear regression
reg = linear_model.LinearRegression()
reg.fit(
df_joint.dropna()[feature_cols_all].values,
df_joint.dropna()[target_col].values
)
# predict missing values
missing_idx = df_joint[target_col].isna()
if missing_idx.any():
new_values = np.matmul(df_joint.loc[missing_idx, feature_cols_all].values, reg.coef_) + reg.intercept_
df_out.loc[missing_idx, target_col] = new_values
# Step 4: final adjustments
prec_cols = [col for col in df_out.columns if col.startswith('Precipitation')]
df_out[prec_cols] = df_out[prec_cols].clip(lower=0)
df_out = df_out.round(1)
df_out = df_out[target_cols]
return df_out
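# Usage sketch: `column_pairs` maps each target column to the Open-Meteo
# regressors used to predict it. The names below are assumptions, not the
# project's actual schema.
#
# column_pairs = {
#     "Temperature_station1": ["temperature_2m_openmeteo"],
#     "Precipitation_station1": ["precipitation_openmeteo"],
# }
# df_imputed = imputation_linear_openmeteo(df_masked, df_hist, column_pairs, lags=2)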
def select_var_lag(df, maxlags=48, lag_criterion='bic'):
"""
Selects the best lag for a VAR model based on the chosen criterion.
Parameters:
df (pd.DataFrame): DataFrame with possible missing values.
maxlags (int): Maximum number of lags to consider.
lag_criterion (str): Selection criterion ('aic', 'bic', 'hqic', 'fpe').
Returns:
int: Selected lag order.
"""
# Step 1: initial linear interpolation
df_tmp = df.asfreq('h').interpolate(method='linear').bfill()
# Step 2: fit the VAR model
model = VAR(df_tmp)
lag_order = model.select_order(maxlags=maxlags)
selected_lag = lag_order.selected_orders[lag_criterion]
return selected_lag
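# Usage sketch (assumed hourly frame `df_masked`): pick the BIC-optimal lag,
# capping the search at two days of hourly history.
#
# best_lag = select_var_lag(df_masked, maxlags=48, lag_criterion='bic')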
def imputation_var(df, df_hist, lag):
"""
Imputes missing values using a VAR model with a specified lag order.
Parameters:
df (pd.DataFrame): DataFrame with missing values.
df_hist (pd.DataFrame): Exogenous historical dataset without NaNs.
lag (int): Lag order for the VAR model.
Returns:
pd.DataFrame: DataFrame with imputed values.
"""
target_cols = df.columns.tolist()
exog_cols = df_hist.columns.tolist()
df_combined = df.asfreq('h').join(df_hist, how='left')
exog_vars = df_combined[exog_cols]
# Step 1: initial linear interpolation
df_tmp = df_combined.asfreq('h').interpolate(method='linear').bfill()
# Step 2: fit VAR model
model = VAR(df_tmp, exog=exog_vars)
var_model = model.fit(lag)
# Step 3: prepare for fast imputation
original_index = df_tmp.index
data_array = df_tmp.to_numpy()
# NaN mask over the target columns, aligned to the hourly index of df_tmp
mask_nan = df_combined[target_cols].isna().to_numpy()
i = var_model.k_ar
while i < len(df_tmp):
if mask_nan[i].any():
# find consecutive missing values
j = i
while j < len(df_tmp) and mask_nan[j].any():
j += 1
n_steps = j - i
# forecast missing values
input_data = data_array[i - var_model.k_ar:i]
forecast = var_model.forecast(y=input_data, steps=n_steps, exog_future=exog_vars.iloc[i:i + n_steps])
# update only missing columns
for k in range(n_steps):
nan_cols_idx = np.where(mask_nan[i + k])[0]
data_array[i + k, nan_cols_idx] = forecast[k, nan_cols_idx]
i = j
else:
i += 1
# Step 4: convert back to DataFrame
df_out = pd.DataFrame(data_array, columns=df_tmp.columns, index=original_index)
# Step 5: final adjustments
prec_cols = [col for col in df_out.columns if col.startswith('Precipitation')]
df_out[prec_cols] = df_out[prec_cols].clip(lower=0)
df_out = df_out.round(1)
df_out = df_out[target_cols]
return df_out
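# Usage sketch chaining the two VAR helpers (`df_masked` and `df_hist` are
# assumed hourly frames, as in the other imputers):
#
# lag = select_var_lag(df_masked, maxlags=48)
# df_imputed = imputation_var(df_masked, df_hist, lag=lag)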
def imputation_rf_openmeteo(df, df_hist, column_pairs, lags=None, autolag=False, n_estimators=100, max_depth=None, min_samples_split=2):
"""
Imputes missing values using Random Forest regression with historical data from Open-Meteo.
Parameters:
df (pd.DataFrame): DataFrame with missing values.
df_hist (pd.DataFrame): Historical dataset without NaNs.
column_pairs (dict): Dictionary {target_column: [feature1, feature2, ...]}.
lags (int or None): Maximum lag to use for historical variables. If None, no lag is used.
autolag (bool): If True, adds lagged versions of the target variable as features.
n_estimators (int): Number of trees in the Random Forest.
max_depth (int or None): Maximum depth of each tree.
min_samples_split (int): Minimum number of samples required to split a node.
Returns:
pd.DataFrame: Imputed DataFrame.
"""
target_cols = df.columns.tolist()
df_out = df.copy().asfreq('h')
# fill rows that are entirely NaN at midnight with the mean of the neighbouring hours
mask_nan_midnight = df_out.index.hour == 0
mask_nan_midnight &= df_out.isna().all(axis=1)
df_prev = df_out.shift(1)
df_next = df_out.shift(-1)
df_mean = (df_prev + df_next) / 2
df_out.loc[mask_nan_midnight] = df_out.loc[mask_nan_midnight].fillna(df_mean.loc[mask_nan_midnight])
df_full = df_out.join(df_hist, how='left')
# Step 1: create lagged versions of historical variables
df_full_lagged = pd.DataFrame(index=df_out.index)
for col in df_full.columns:
df_full_lagged[col] = df_full[col] # contemporaneous
if lags is not None and lags > 0:
for lag in range(1, lags + 1):
df_full_lagged[f"{col}_lag{lag}"] = df_full[col].shift(lag)
# Step 2: sort target columns by number of NaNs
nan_counts = df_out.isna().sum().sort_values(ascending=False)
ordered_targets = [col for col in nan_counts.index if col in column_pairs]
# Step 3: imputation
for target_col in ordered_targets:
feature_cols = column_pairs[target_col]
feature_cols_all = feature_cols.copy()
if lags is not None and lags > 0:
feature_cols_all += [f"{c}_lag{l}" for c in feature_cols for l in range(1, lags + 1)]
# add autolags of target variable if specified
if autolag and lags is not None and lags > 0:
feature_cols_all += [f"{target_col}_lag{l}" for l in range(1, lags + 1)]
train_mask = df_full_lagged[target_col].notna()
df_train = df_full_lagged.loc[train_mask, feature_cols_all]
y_train = df_full_lagged.loc[train_mask, target_col]
# drop training rows with NaN features (lag boundary, autolags spanning gaps);
# older scikit-learn versions reject NaNs in tree inputs
feature_ok = df_train.notna().all(axis=1)
df_train, y_train = df_train[feature_ok], y_train[feature_ok]
df_pred = df_full_lagged.loc[df_full_lagged[target_col].isna(), feature_cols_all]
# Random Forest regression
rf = RandomForestRegressor(
n_estimators=n_estimators,
max_depth=max_depth,
min_samples_split=min_samples_split,
random_state=42,
n_jobs=-1
)
# fit and predict only if this column actually has missing values
if len(df_pred):
rf.fit(df_train, y_train)
df_out.loc[df_full_lagged[target_col].isna(), target_col] = rf.predict(df_pred)
# Step 4: clip precipitation to be non-negative (as in the other imputers) and round
prec_cols = [col for col in df_out.columns if col.startswith('Precipitation')]
df_out[prec_cols] = df_out[prec_cols].clip(lower=0)
df_out = df_out.round(1)
df_out = df_out[target_cols]
return df_out
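# Usage sketch with explicit hyperparameters (`column_pairs` as sketched for
# the linear Open-Meteo imputer above; the values here are illustrative, not
# tuned):
#
# df_imputed = imputation_rf_openmeteo(
#     df_masked, df_hist, column_pairs,
#     lags=3, autolag=True,
#     n_estimators=200, max_depth=10, min_samples_split=4,
# )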
def imputation_xgb_openmeteo(df, df_hist, column_pairs, lags=None, autolag=False, n_estimators=100, max_depth=6, learning_rate=0.1, subsample=1.0):
"""
Imputes missing values using XGBoost regression with historical data from Open-Meteo.
Parameters:
df (pd.DataFrame): DataFrame with missing values.
df_hist (pd.DataFrame): Historical dataset without NaNs.
column_pairs (dict): Dictionary {target_column: [feature1, feature2, ...]}.
lags (int or None): Maximum lag to use for historical variables. If None, no lag is used.
autolag (bool): If True, adds lagged versions of the target variable as features.
n_estimators (int): Number of boosting rounds (trees).
max_depth (int): Maximum tree depth for base learners.
learning_rate (float): Boosting learning rate (xgb parameter 'eta').
subsample (float): Subsample ratio of the training instances.
Returns:
pd.DataFrame: Imputed DataFrame.
"""
target_cols = df.columns.tolist()
df_out = df.copy().asfreq('h')
# fill rows that are entirely NaN at midnight with the mean of the neighbouring hours
mask_nan_midnight = df_out.index.hour == 0
mask_nan_midnight &= df_out.isna().all(axis=1)
df_prev = df_out.shift(1)
df_next = df_out.shift(-1)
df_mean = (df_prev + df_next) / 2
df_out.loc[mask_nan_midnight] = df_out.loc[mask_nan_midnight].fillna(df_mean.loc[mask_nan_midnight])
df_full = df_out.join(df_hist, how='left')
# Step 1: create lagged versions of historical variables
lagged_cols = {}
for col in df_full.columns:
lagged_cols[col] = df_full[col]
if lags is not None and lags > 0:
for lag in range(1, lags + 1):
lagged_cols[f"{col}_lag{lag}"] = df_full[col].shift(lag)
df_full_lagged = pd.concat(lagged_cols, axis=1)
df_full_lagged.index = df_out.index
# Step 2: sort target columns by number of NaNs
nan_counts = df_out.isna().sum().sort_values(ascending=False)
ordered_targets = [col for col in nan_counts.index if col in column_pairs]
# Step 3: imputation
for target_col in ordered_targets:
feature_cols = column_pairs[target_col]
feature_cols_all = feature_cols.copy()
if lags is not None and lags > 0:
feature_cols_all += [f"{c}_lag{l}" for c in feature_cols for l in range(1, lags + 1)]
# add autolags of target variable if specified
if autolag and lags is not None and lags > 0:
feature_cols_all += [f"{target_col}_lag{l}" for l in range(1, lags + 1)]
df_train = df_full_lagged.loc[df_full_lagged[target_col].notna(), feature_cols_all]
y_train = df_full_lagged.loc[df_full_lagged[target_col].notna(), target_col]
df_pred = df_full_lagged.loc[df_full_lagged[target_col].isna(), feature_cols_all]
# XGBoost regression
xgb = XGBRegressor(
n_estimators=n_estimators,
max_depth=max_depth,
learning_rate=learning_rate,
subsample=subsample,
random_state=42,
n_jobs=-1,
objective='reg:squarederror'
)
# fit and predict only if this column actually has missing values
# (XGBoost handles NaN features natively, so no row filtering is needed here)
if len(df_pred):
xgb.fit(df_train, y_train)
df_out.loc[df_full_lagged[target_col].isna(), target_col] = xgb.predict(df_pred)
# Step 4: clip precipitation to be non-negative (as in the other imputers) and round
prec_cols = [col for col in df_out.columns if col.startswith('Precipitation')]
df_out[prec_cols] = df_out[prec_cols].clip(lower=0)
df_out = df_out.round(1)
df_out = df_out[target_cols]
return df_out
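# Usage sketch: the XGBoost imputer can be scored through the evaluation
# harness above (hypothetical `df_meteo`/`column_pairs` as before):
#
# metrics = evaluate_imputation_mlflow(
#     df_meteo,
#     imputation_xgb_openmeteo,
#     imputation_params={
#         "df_hist": df_hist,
#         "column_pairs": column_pairs,
#         "lags": 2,
#         "learning_rate": 0.05,
#     },
#     method_name="xgb_openmeteo",
# )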
# Optimization Functions
def objective_rf(trial, df, df_hist, column_pairs, max_lag=6, sample_frac=0.3, random_state=42):
"""
Optuna objective function for tuning Random Forest imputation.
"""
# Hyperparameter suggestions
n_estimators = trial.suggest_int("n_estimators", 50, 500, step=50)
max_depth = trial.suggest_int("max_depth", 2, 20)
min_samples_split = trial.suggest_int("min_samples_split", 2, 10)
lags = trial.suggest_categorical("lags", [None] + list(range(1, max_lag + 1)))
autolag = trial.suggest_categorical("autolag", [True, False])
# Imputation parameters
imputation_params = {
"df_hist": df_hist,
"column_pairs": column_pairs,
"lags": lags,
"autolag": autolag,
"n_estimators": n_estimators,
"max_depth": max_depth,
"min_samples_split": min_samples_split
}
# Evaluate imputation
metrics_df = evaluate_imputation_mlflow(
df,
imputation_rf_openmeteo,
imputation_params=imputation_params,
method_name=None,
sample_frac=sample_frac,
random_state=random_state
)
# Return overall NRMSE to minimize
return metrics_df.loc['Overall', 'NRMSE']
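# Usage sketch: bind the data arguments with functools.partial and minimize
# overall NRMSE with Optuna (assumes optuna is installed; n_trials is
# illustrative):
#
# import optuna
# from functools import partial
#
# study = optuna.create_study(direction="minimize")
# study.optimize(
#     partial(objective_rf, df=df_meteo, df_hist=df_hist, column_pairs=column_pairs),
#     n_trials=50,
# )
# print(study.best_params)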
def objective_xgb(trial, df, df_hist, column_pairs, max_lag=6, sample_frac=0.3, random_state=42):
"""
Optuna objective function for tuning XGBoost imputation.
"""
# Hyperparameter suggestions
n_estimators = trial.suggest_int("n_estimators", 50, 500, step=50)
max_depth = trial.suggest_int("max_depth", 2, 20)
learning_rate = trial.suggest_float("learning_rate", 0.01, 0.3, log=True)
subsample = trial.suggest_float("subsample", 0.5, 1.0)
lags = trial.suggest_categorical("lags", [None] + list(range(1, max_lag + 1)))
autolag = trial.suggest_categorical("autolag", [True, False])
# Imputation parameters
imputation_params = {
"df_hist": df_hist,
"column_pairs": column_pairs,
"lags": lags,
"autolag": autolag,
"n_estimators": n_estimators,
"max_depth": max_depth,
"learning_rate": learning_rate,
"subsample": subsample
}
# Evaluate imputation
metrics_df = evaluate_imputation_mlflow(
df,
imputation_xgb_openmeteo,
imputation_params=imputation_params,
method_name=None,
sample_frac=sample_frac,
random_state=random_state
)
# Return overall NRMSE to minimize
return metrics_df.loc['Overall', 'NRMSE']
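# Usage sketch: tune XGBoost the same way, then re-run the imputer with the
# best trial's parameters (names assumed as above). `study.best_params` holds
# exactly the keyword arguments imputation_xgb_openmeteo accepts.
#
# import optuna
# from functools import partial
#
# study = optuna.create_study(direction="minimize")
# study.optimize(
#     partial(objective_xgb, df=df_meteo, df_hist=df_hist, column_pairs=column_pairs),
#     n_trials=50,
# )
# df_imputed = imputation_xgb_openmeteo(df_masked, df_hist, column_pairs, **study.best_params)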