# DSEB65A_Group4_FinalProject_HCMWeatherForecast / feature_engineering_live.py
# Uploaded by Harveyntt ("Upload feature_engineering_live.py"), commit 42b6dee (verified)
import pandas as pd
import numpy as np
# Canonical feature name -> keys under which a live payload may report it,
# in priority order (the canonical name always comes first).
_LIVE_ALIASES = {
    'temp': ('temp', 'temperature', 'avg_temp'),
    'feelslike': ('feelslike', 'feels_like'),
    'humidity': ('humidity',),
    'precip': ('precip', 'precipitation', 'rain'),
    'windspeed': ('windspeed', 'wind_speed', 'windspd'),
    'cloudcover': ('cloudcover', 'clouds', 'cloud_percent'),
}

# (target column, source column, days back) for lag features.
_LAG_FEATURES = (
    ('temp_lag_1', 'temp', 1),
    ('temp_lag_2', 'temp', 2),
    ('humidity_lag_1', 'humidity', 1),
)

# (target column, source column, window length in days, statistic).
_ROLLING_FEATURES = (
    ('temp_roll_7d_mean', 'temp', 7, 'mean'),
    ('temp_roll_7d_std', 'temp', 7, 'std'),
    ('temp_roll_14d_std', 'temp', 14, 'std'),
    ('precip_roll_7d_sum', 'precip', 7, 'sum'),
)


def _coerce_float(value):
    """Return ``value`` as a float, or ``None`` if it is missing, NaN or non-numeric."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # Strings such as 'n/a', nested dicts, timestamps, ... are not usable.
        return None
    return None if np.isnan(number) else number


def _lookup_live_value(live, candidates):
    """Return the first usable numeric value stored under any of ``candidates``.

    Top-level keys of ``live`` are searched first, then the keys of a nested
    ``live['main']`` dict when one is present. Returns ``None`` if nothing
    usable is found.
    """
    sources = [live]
    nested = live.get('main')
    if isinstance(nested, dict):
        sources.append(nested)
    for source in sources:
        for key in candidates:
            number = _coerce_float(source.get(key))
            if number is not None:
                return number
    return None


def _history_value(history, column, offset=1):
    """Return ``history[column]`` from ``offset`` rows before the end, or NaN."""
    if offset < 1:
        return np.nan
    try:
        return float(history[column].iloc[-offset])
    except (KeyError, IndexError, TypeError, ValueError):
        return np.nan


def _rolling_stat(history, column, today_value, window=7, stat='mean'):
    """Compute ``stat`` over the last ``window - 1`` non-null history values plus today.

    ``today_value`` is appended to the window only when it is not missing.
    Returns NaN when the column is absent or non-numeric, when the window is
    empty, or when ``stat`` is not one of 'mean', 'std', 'sum'.
    """
    n_past = max(window - 1, 0)
    try:
        past = history[column].dropna()
        # ``iloc[-0:]`` would select the whole series, so treat 0 explicitly.
        past = past.iloc[-n_past:] if n_past else past.iloc[:0]
        values = past.astype(float).tolist()
    except (KeyError, IndexError, TypeError, ValueError):
        return np.nan

    today_number = _coerce_float(today_value)
    if today_number is not None:
        values.append(today_number)
    if not values:
        return np.nan

    window_values = pd.Series(values, dtype='float64')
    if stat == 'mean':
        return float(window_values.mean())
    if stat == 'std':
        # pandas' default sample std; a single value yields NaN.
        return float(window_values.std())
    if stat == 'sum':
        return float(window_values.sum())
    return np.nan


def create_live_feature_vector(live_daily_summary: dict, historical_data: pd.DataFrame) -> pd.DataFrame:
    """Create a single-row DataFrame of features suitable for the 5-day models.

    This is a pragmatic, reduced-feature implementation: only a small set of
    features is engineered from ``live_daily_summary`` and recent history;
    every other column is filled from the last historical row. The output
    keeps the column order of ``historical_data`` so models expecting that
    schema are less likely to fail.

    Processing order:
      1. Direct weather readings (temp, feelslike, humidity, precip,
         windspeed, cloudcover) are looked up in the live summary under
         several aliases, at top level and inside a nested ``'main'`` dict.
         Values that cannot be converted to a number are ignored.
      2. If no usable live ``temp`` exists, the last historical ``temp`` is
         used as today's value so the rolling statistics still have a
         "today" observation.
      3. Calendar features (year, month, day_of_year) come from the current
         local date.
      4. Lag and rolling-window features are derived from ``historical_data``
         (rolling windows also include today's value when available).
      5. Any column still missing is filled with the numeric value of the
         last historical row; non-numeric columns end up as NaN.

    Args:
        live_daily_summary: Mapping with today's live readings. ``None`` is
            treated as an empty mapping.
        historical_data: Non-empty DataFrame whose columns define the output
            schema and whose last rows are the most recent days.

    Returns:
        A one-row float DataFrame with the same columns as
        ``historical_data``, indexed by the current timestamp.

    Raises:
        ValueError: If ``historical_data`` is ``None`` or empty.
    """
    if historical_data is None or historical_data.empty:
        raise ValueError("historical_data must be a non-empty DataFrame")

    live = {} if live_daily_summary is None else live_daily_summary
    columns = historical_data.columns
    # The last historical row is the fallback for anything we cannot compute.
    template = historical_data.iloc[-1]
    # Indexing by the historical columns preserves the expected column order.
    today_row = pd.Series(np.nan, index=columns, dtype="float64")

    # 1. Direct mappings from the live payload.
    for feature, candidates in _LIVE_ALIASES.items():
        if feature not in columns:
            continue
        live_value = _lookup_live_value(live, candidates)
        if live_value is not None:
            today_row[feature] = live_value

    # 2. Ensure 'temp' has a value before rolling statistics are computed.
    if 'temp' in columns and pd.isna(today_row['temp']):
        last_temp = _coerce_float(template['temp'])
        if last_temp is not None:
            today_row['temp'] = last_temp

    # 3. Temporal features. The clock is sampled once so the calendar
    #    features and the row index always refer to the same instant.
    now = pd.Timestamp.now()
    today = now.normalize()
    calendar = (
        ('year', today.year),
        ('month', today.month),
        ('day_of_year', today.dayofyear),
    )
    for name, value in calendar:
        if name in columns:
            today_row[name] = value

    # 4a. Lag features: "1 day ago" relative to today is the last history row.
    for target, source, offset in _LAG_FEATURES:
        if target in columns:
            today_row[target] = _history_value(historical_data, source, offset)

    # 4b. Rolling windows: last (window - 1) history days plus today's value.
    for target, source, window, stat in _ROLLING_FEATURES:
        if target in columns:
            today_row[target] = _rolling_stat(
                historical_data, source, today_row.get(source), window=window, stat=stat
            )

    # 5. Conservative fill of everything still missing from the template row.
    for col in columns:
        if pd.isna(today_row.loc[col]):
            filler = _coerce_float(template.loc[col])
            if filler is not None:
                today_row.loc[col] = filler

    today_df = pd.DataFrame([today_row])
    today_df.index = [now]
    # Already aligned, but reindexing makes the schema guarantee explicit.
    return today_df.reindex(columns=columns)