| import pandas as pd
|
| import numpy as np
|
|
|
|
|
# Canonical feature name -> keys it may appear under in the live summary,
# in priority order (the canonical name is always tried first).
_LIVE_FEATURE_ALIASES = {
    'temp': ('temp', 'temperature', 'avg_temp'),
    'feelslike': ('feelslike', 'feels_like'),
    'humidity': ('humidity',),
    'precip': ('precip', 'precipitation', 'rain'),
    'windspeed': ('windspeed', 'wind_speed', 'windspd'),
    'cloudcover': ('cloudcover', 'clouds', 'cloud_percent'),
}


def _as_float(value):
    """Return *value* as a float, or None when it is missing or not numeric."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _lookup_live_value(live, candidates):
    """Return the first usable numeric value stored under any candidate key.

    The top level of *live* is searched first, then the mapping stored under
    its 'main' key (if that entry is mapping-like). Keys whose value is None
    or non-numeric are skipped so a later alias can still supply the value.
    """
    sources = [live]
    nested = live.get('main')
    # Only treat 'main' as a lookup source when it actually behaves like a
    # mapping; a string or None there must not break the lookup.
    if callable(getattr(nested, 'get', None)):
        sources.append(nested)
    for source in sources:
        for key in candidates:
            value = _as_float(source.get(key))
            if value is not None:
                return value
    return None


def _history_value(historical_data, col, offset=1):
    """Return the value of *col* `offset` rows from the end, or NaN."""
    try:
        return float(historical_data[col].iloc[-offset])
    except (KeyError, IndexError, TypeError, ValueError):
        return np.nan


def _rolling_stat(historical_data, today_row, col, window=7, stat='mean'):
    """Compute a trailing statistic over history plus today's value.

    The last ``window - 1`` non-null historical values of *col* are combined
    with today's value (when it is not NaN). Returns NaN when the column is
    missing, non-numeric, empty, or *stat* is not 'mean', 'std' or 'sum'.
    """
    if col not in historical_data.columns:
        return np.nan
    history = historical_data[col].dropna()
    # One slot of the window is reserved for today. The explicit guard is
    # needed because iloc[-0:] would select the whole series.
    tail = history.iloc[-(window - 1):] if window > 1 else history.iloc[:0]
    try:
        tail = tail.astype(float)
    except (TypeError, ValueError):
        return np.nan

    today_value = today_row[col]
    if pd.isna(today_value):
        combined = tail
    else:
        combined = pd.concat([tail, pd.Series([today_value])], ignore_index=True)
    if combined.empty:
        return np.nan

    reducers = {'mean': combined.mean, 'std': combined.std, 'sum': combined.sum}
    reducer = reducers.get(stat)
    return float(reducer()) if reducer is not None else np.nan


def create_live_feature_vector(live_daily_summary: dict, historical_data: pd.DataFrame) -> pd.DataFrame:
    """Create a single-row DataFrame of features suitable for the 5-day models.

    This is a pragmatic, reduced-feature implementation: it fills a template
    row using the last historical day as a baseline and replaces/engineers
    the most important features from live_daily_summary + recent history.

    Note: The full project used ~157 features. Implementing all of them here
    is tedious and error-prone; this function focuses on a small set of
    high-importance features commonly used in temperature forecasting. The
    original column order (historical_data.columns) is preserved so models
    expecting the same schema are less likely to fail.

    Args:
        live_daily_summary: Mapping of today's observations. Values are read
            from the top level or from a nested mapping under 'main', under
            the canonical name or a known alias. Missing, None, or
            non-numeric values are ignored. None is treated as empty.
        historical_data: Non-empty frame whose columns define the output
            schema and whose rows supply lags, rolling windows, and the
            fallback (last row) for anything not derived here.

    Returns:
        A one-row float DataFrame with the same columns as historical_data,
        indexed by the current timestamp.

    Raises:
        ValueError: If historical_data is None or empty.
    """
    if historical_data is None or historical_data.empty:
        raise ValueError("historical_data must be a non-empty DataFrame")

    live = live_daily_summary or {}
    columns = historical_data.columns
    template = historical_data.iloc[-1].copy()
    today_row = pd.Series(index=columns, dtype="float64")

    # Direct observations from the live summary.
    for feature, candidates in _LIVE_FEATURE_ALIASES.items():
        if feature not in columns:
            continue
        value = _lookup_live_value(live, candidates)
        if value is not None:
            today_row[feature] = value

    # 'temp' feeds the rolling statistics below, so it must be populated
    # before them; fall back to the last historical value when live data
    # did not provide a usable one.
    if 'temp' in columns and pd.isna(today_row['temp']):
        fallback_temp = _as_float(template.get('temp', np.nan))
        today_row['temp'] = np.nan if fallback_temp is None else fallback_temp

    # Take the clock once so the date features and the index always agree.
    now = pd.Timestamp.now()
    today_ts = now.normalize()
    date_features = (
        ('year', today_ts.year),
        ('month', today_ts.month),
        ('day_of_year', today_ts.dayofyear),
    )
    for col, value in date_features:
        if col in columns:
            today_row[col] = value

    # Lag features: the "previous day" for today is the last historical row.
    lag_features = (
        ('temp_lag_1', 'temp', 1),
        ('temp_lag_2', 'temp', 2),
        ('humidity_lag_1', 'humidity', 1),
    )
    for target, source_col, offset in lag_features:
        if target in columns:
            today_row[target] = _history_value(historical_data, source_col, offset)

    # Rolling-window features over recent history plus today's value.
    rolling_features = (
        ('temp_roll_7d_mean', 'temp', 7, 'mean'),
        ('temp_roll_7d_std', 'temp', 7, 'std'),
        ('temp_roll_14d_std', 'temp', 14, 'std'),
        ('precip_roll_7d_sum', 'precip', 7, 'sum'),
    )
    for target, source_col, window, stat in rolling_features:
        if target in columns:
            today_row[target] = _rolling_stat(
                historical_data, today_row, source_col, window=window, stat=stat
            )

    # Anything still missing inherits the last historical row's value;
    # non-numeric template entries stay NaN.
    for col in today_row.index[today_row.isna()]:
        fallback = _as_float(template[col])
        if fallback is not None:
            today_row[col] = fallback

    today_df = pd.DataFrame([today_row])
    today_df.index = [now]
    return today_df.reindex(columns=columns)
|
|
|