Jompatron commited on
Commit
b12d219
·
1 Parent(s): 820552c

new versions

Browse files
Files changed (2) hide show
  1. airquality/util.py +68 -10
  2. app.py +2 -2
airquality/util.py CHANGED
@@ -340,13 +340,71 @@ def check_file_path(file_path):
340
  print(f"File successfully found at the path: {file_path}")
341
 
342
  def backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, model):
343
- features_df = weather_fg.read()
344
- features_df = features_df.sort_values(by=['date'], ascending=True)
345
- features_df = features_df.tail(10)
346
- features_df['predicted_pm25'] = model.predict(features_df[['temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant']])
347
- df = pd.merge(features_df, air_quality_df[['date','pm25','street','country']], on="date")
348
- df['days_before_forecast_day'] = 1
349
- hindcast_df = df
350
- df = df.drop('pm25', axis=1)
351
- monitor_fg.insert(df, write_options={"wait_for_job": True})
352
- return hindcast_df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
340
  print(f"File successfully found at the path: {file_path}")
341
 
342
  def backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, model):
343
+ # 1. Load some past weather
344
+ features_df = weather_fg.read().sort_values("date")
345
+ features_df = features_df.tail(10).reset_index(drop=True)
346
+
347
+ # 2. Load past PM25 (for building lag features)
348
+ air_quality_df = air_quality_df.sort_values("date").reset_index(drop=True)
349
+ pm25_series = air_quality_df["pm25"].values
350
+
351
+ # Need last 3 pm25 values to start autoregressive cycle
352
+ pm25_history = list(pm25_series[-3:])
353
+
354
+ predictions = []
355
+
356
+ for i in range(len(features_df)):
357
+ # Weather for this day
358
+ w = features_df.iloc[i]
359
+
360
+ # Build lag features
361
+ lag1 = pm25_history[-1]
362
+ lag2 = pm25_history[-2]
363
+ lag3 = pm25_history[-3]
364
+
365
+ roll_mean = np.mean(pm25_history[-3:])
366
+ roll_std = np.std(pm25_history[-3:])
367
+
368
+ X = pd.DataFrame({
369
+ "temperature_2m_mean": [w["temperature_2m_mean"]],
370
+ "precipitation_sum": [w["precipitation_sum"]],
371
+ "wind_speed_10m_max": [w["wind_speed_10m_max"]],
372
+ "wind_direction_10m_dominant": [w["wind_direction_10m_dominant"]],
373
+ "pm25_lag1": [lag1],
374
+ "pm25_lag2": [lag2],
375
+ "pm25_lag3": [lag3],
376
+ "pm25_roll3_mean": [roll_mean],
377
+ "pm25_roll3_std": [roll_std]
378
+ })
379
+
380
+ pred = float(model.predict(X)[0])
381
+ predictions.append(pred)
382
+
383
+ pm25_history.append(pred)
384
+
385
+ # Combine predictions with weather for monitoring insert
386
+ out = features_df.copy()
387
+ out["predicted_pm25"] = predictions
388
+ out["days_before_forecast_day"] = 1
389
+
390
+ # Pick metadata from AQ DF
391
+ out["street"] = air_quality_df.iloc[-1]["street"]
392
+ out["city"] = air_quality_df.iloc[-1]["city"]
393
+ out["country"] = air_quality_df.iloc[-1]["country"]
394
+
395
+ # Fix types
396
+ out["predicted_pm25"] = out["predicted_pm25"].astype(float)
397
+ out["days_before_forecast_day"] = out["days_before_forecast_day"].astype("int64")
398
+
399
+ # Insert monitoring data
400
+ monitor_fg.insert(out, write_options={"wait_for_job": True})
401
+
402
+ # Merge with pm25 ground truth for return
403
+ merged = pd.merge(
404
+ out,
405
+ air_quality_df[["date", "pm25"]],
406
+ on="date",
407
+ how="left"
408
+ )
409
+
410
+ return merged
app.py CHANGED
@@ -19,7 +19,7 @@ def load_resources():
19
  # Load Feature View
20
  fv = fs.get_feature_view(
21
  name="air_quality_fv",
22
- version=1
23
  )
24
  fv.init_batch_scoring(1)
25
 
@@ -28,7 +28,7 @@ def load_resources():
28
 
29
  # Load Model from Registry
30
  mr = project.get_model_registry()
31
- model_obj = mr.get_model("air_quality_xgboost_model", version=1)
32
  model_dir = model_obj.download()
33
 
34
  model = XGBRegressor()
 
19
  # Load Feature View
20
  fv = fs.get_feature_view(
21
  name="air_quality_fv",
22
+ version=2
23
  )
24
  fv.init_batch_scoring(1)
25
 
 
28
 
29
  # Load Model from Registry
30
  mr = project.get_model_registry()
31
+ model_obj = mr.get_model("air_quality_xgboost_model", version=2)
32
  model_dir = model_obj.download()
33
 
34
  model = XGBRegressor()