import os
import time
import json
import datetime
from pathlib import Path

import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from matplotlib.ticker import MultipleLocator
from geopy.geocoders import Nominatim

import openmeteo_requests
import requests_cache
from retry_requests import retry
import hopsworks
import hsfs


def get_historical_weather(city, start_date, end_date, latitude, longitude):
    """Download daily historical weather for one location from Open-Meteo.

    Args:
        city: City name; stored verbatim in the returned frame's 'city' column.
        start_date: Range start, ISO date string (YYYY-MM-DD).
        end_date: Range end, ISO date string (YYYY-MM-DD).
        latitude: Latitude of the location.
        longitude: Longitude of the location.

    Returns:
        pd.DataFrame with columns: date, temperature_2m_mean,
        precipitation_sum, wind_speed_10m_max, wind_direction_10m_dominant,
        city. Rows containing any NaN are dropped.
    """
    # latitude, longitude = get_city_coordinates(city)

    # Setup the Open-Meteo API client with cache and retry on error.
    # expire_after=-1 caches forever: historical data never changes, so
    # re-runs are served from the local '.cache' store.
    cache_session = requests_cache.CachedSession('.cache', expire_after=-1)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    openmeteo = openmeteo_requests.Client(session=retry_session)

    # The order of variables in "daily" is important: values are read back
    # below by positional index (Variables(0), Variables(1), ...).
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "daily": [
            "temperature_2m_mean",
            "precipitation_sum",
            "wind_speed_10m_max",
            "wind_direction_10m_dominant",
        ],
    }
    responses = openmeteo.weather_api(url, params=params)

    # Process first location. Add a for-loop for multiple locations or weather models.
    response = responses[0]
    print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
    print(f"Elevation {response.Elevation()} m asl")
    print(f"Timezone {response.Timezone()} {response.TimezoneAbbreviation()}")
    print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

    # Process daily data. The order of variables needs to be the same as requested.
    daily = response.Daily()
    daily_temperature_2m_mean = daily.Variables(0).ValuesAsNumpy()
    daily_precipitation_sum = daily.Variables(1).ValuesAsNumpy()
    daily_wind_speed_10m_max = daily.Variables(2).ValuesAsNumpy()
    daily_wind_direction_10m_dominant = daily.Variables(3).ValuesAsNumpy()

    daily_data = {
        "date": pd.date_range(
            start=pd.to_datetime(daily.Time(), unit="s"),
            end=pd.to_datetime(daily.TimeEnd(), unit="s"),
            freq=pd.Timedelta(seconds=daily.Interval()),
            inclusive="left",
        )
    }
    daily_data["temperature_2m_mean"] = daily_temperature_2m_mean
    daily_data["precipitation_sum"] = daily_precipitation_sum
    daily_data["wind_speed_10m_max"] = daily_wind_speed_10m_max
    daily_data["wind_direction_10m_dominant"] = daily_wind_direction_10m_dominant

    daily_dataframe = pd.DataFrame(data=daily_data)
    daily_dataframe = daily_dataframe.dropna()
    daily_dataframe['city'] = city
    return daily_dataframe


def get_hourly_weather_forecast(city, latitude, longitude):
    """Download the hourly ECMWF weather forecast for one location.

    NOTE: the hourly values are deliberately stored under the *daily* feature
    names (temperature_2m_mean, precipitation_sum, ...) so the forecast frame
    matches the schema produced by get_historical_weather(). The `city`
    argument is accepted for signature symmetry but is not used here (no
    'city' column is added).

    Args:
        city: Unused.
        latitude: Latitude of the location.
        longitude: Longitude of the location.

    Returns:
        pd.DataFrame with columns: date, temperature_2m_mean,
        precipitation_sum, wind_speed_10m_max, wind_direction_10m_dominant.
        Rows containing any NaN are dropped.
    """
    # latitude, longitude = get_city_coordinates(city)

    # Setup the Open-Meteo API client with cache and retry on error.
    # Forecasts change, so cache for one hour only.
    cache_session = requests_cache.CachedSession('.cache', expire_after=3600)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    openmeteo = openmeteo_requests.Client(session=retry_session)

    # The order of variables in "hourly" is important: values are read back
    # below by positional index.
    url = "https://api.open-meteo.com/v1/ecmwf"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": [
            "temperature_2m",
            "precipitation",
            "wind_speed_10m",
            "wind_direction_10m",
        ],
    }
    responses = openmeteo.weather_api(url, params=params)

    # Process first location. Add a for-loop for multiple locations or weather models.
    response = responses[0]
    print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
    print(f"Elevation {response.Elevation()} m asl")
    print(f"Timezone {response.Timezone()} {response.TimezoneAbbreviation()}")
    print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

    # Process hourly data. The order of variables needs to be the same as requested.
    hourly = response.Hourly()
    hourly_temperature_2m = hourly.Variables(0).ValuesAsNumpy()
    hourly_precipitation = hourly.Variables(1).ValuesAsNumpy()
    hourly_wind_speed_10m = hourly.Variables(2).ValuesAsNumpy()
    hourly_wind_direction_10m = hourly.Variables(3).ValuesAsNumpy()

    hourly_data = {
        "date": pd.date_range(
            start=pd.to_datetime(hourly.Time(), unit="s"),
            end=pd.to_datetime(hourly.TimeEnd(), unit="s"),
            freq=pd.Timedelta(seconds=hourly.Interval()),
            inclusive="left",
        )
    }
    # Re-keyed to the daily feature names on purpose (see docstring).
    hourly_data["temperature_2m_mean"] = hourly_temperature_2m
    hourly_data["precipitation_sum"] = hourly_precipitation
    hourly_data["wind_speed_10m_max"] = hourly_wind_speed_10m
    hourly_data["wind_direction_10m_dominant"] = hourly_wind_direction_10m

    hourly_dataframe = pd.DataFrame(data=hourly_data)
    hourly_dataframe = hourly_dataframe.dropna()
    return hourly_dataframe


def get_city_coordinates(city_name: str):
    """Geocode a city name via Nominatim.

    Args:
        city_name: Free-text city name.

    Returns:
        (latitude, longitude) tuple, each rounded to 2 decimal places.
    """
    # Initialize Nominatim API (for getting lat and long of the city)
    geolocator = Nominatim(user_agent="Johannes-MLFS-Lab (jdunkars@kth.se)")
    city = geolocator.geocode(city_name)
    latitude = round(city.latitude, 2)
    longitude = round(city.longitude, 2)
    return latitude, longitude


def trigger_request(url: str):
    """GET `url` and return the decoded JSON body.

    Raises:
        requests.exceptions.RequestException: if the HTTP status is not 200
            (the status code is attached as the exception argument).
    """
    response = requests.get(url)
    if response.status_code != 200:
        print("Failed to retrieve data. Status Code:", response.status_code)
        raise requests.exceptions.RequestException(response.status_code)
    # Extract the JSON content from the response
    return response.json()


def get_pm25(aqicn_url: str, country: str, city: str, street: str,
             day: datetime.date, AQI_API_KEY: str):
    """Fetch today's PM2.5 reading for one sensor from the AQICN (WAQI) API.

    Tries the sensor-specific URL first; on an "Unknown station" reply it
    falls back to country/street, then country/city/street feed URLs.

    Args:
        aqicn_url: Base feed URL for the sensor (without token query string).
        country: Country name, stored in the result.
        city: City name, stored in the result.
        street: Street/station name, stored in the result and used in
            fallback URLs.
        day: Date to stamp on the reading.
        AQI_API_KEY: WAQI API token.

    Returns:
        Single-row pd.DataFrame with columns: pm25 (float32), country, city,
        street, date (datetime64), url.

    Raises:
        requests.exceptions.RequestException: if the API reply status is not
            'ok' after all fallbacks.
    """
    # The API endpoint URL
    url = f"{aqicn_url}/?token={AQI_API_KEY}"

    # Make a GET request to fetch the data from the API
    data = trigger_request(url)

    # If we get an 'Unknown station' response then retry with city in the URL
    if data['data'] == "Unknown station":
        url1 = f"https://api.waqi.info/feed/{country}/{street}/?token={AQI_API_KEY}"
        data = trigger_request(url1)
    if data['data'] == "Unknown station":
        url2 = f"https://api.waqi.info/feed/{country}/{city}/{street}/?token={AQI_API_KEY}"
        data = trigger_request(url2)

    # Check if the API response contains the data
    if data['status'] != 'ok':
        print("Error: There may be an incorrect URL for your Sensor or it is not contactable right now. The API response does not contain data. Error message:", data['data'])
        raise requests.exceptions.RequestException(data['data'])

    # Extract the air quality data
    aqi_data = data['data']
    aq_today_df = pd.DataFrame()
    # pm25 may be absent from 'iaqi'; .get chains yield None (-> NaN) then.
    aq_today_df['pm25'] = [aqi_data['iaqi'].get('pm25', {}).get('v', None)]
    aq_today_df['pm25'] = aq_today_df['pm25'].astype('float32')

    aq_today_df['country'] = country
    aq_today_df['city'] = city
    aq_today_df['street'] = street
    aq_today_df['date'] = day
    aq_today_df['date'] = pd.to_datetime(aq_today_df['date'])
    aq_today_df['url'] = aqicn_url
    return aq_today_df


def get_pm25_test(aqicn_url: str, country: str, city: str, street: str,
                  day: datetime.date, AQI_API_KEY: str):
    """Verbose debug variant of get_pm25() with step-by-step progress prints.

    Same inputs, outputs and fallback logic as get_pm25(); kept separate so
    the quiet production path stays clean.
    """
    print("▶ Starting get_pm25()")
    print(f"URL base: {aqicn_url}")
    print(f"Country={country}, City={city}, Street={street}, Date={day}")

    # 1️⃣ First try
    url = f"{aqicn_url}/?token={AQI_API_KEY}"
    print(f"Trying main URL: {url}")
    try:
        data = trigger_request(url)
        print("✔ First request succeeded")
    except Exception as e:
        print("❌ First request failed:", e)
        raise

    # 2️⃣ Retry with other URLs if "Unknown station"
    if data.get("data") == "Unknown station":
        print("⚠ Unknown station, retrying with country/street...")
        url1 = f"https://api.waqi.info/feed/{country}/{street}/?token={AQI_API_KEY}"
        data = trigger_request(url1)
        print("✔ Second request done")
    if data.get("data") == "Unknown station":
        print("⚠ Still unknown, retrying with country/city/street...")
        url2 = f"https://api.waqi.info/feed/{country}/{city}/{street}/?token={AQI_API_KEY}"
        data = trigger_request(url2)
        print("✔ Third request done")

    # 3️⃣ Check result
    if data.get("status") == "ok":
        print("✅ API responded OK")
        aqi_data = data["data"]
        aq_today_df = pd.DataFrame()
        aq_today_df["pm25"] = [aqi_data["iaqi"].get("pm25", {}).get("v", None)]
        aq_today_df["pm25"] = aq_today_df["pm25"].astype("float32")
        aq_today_df["country"] = country
        aq_today_df["city"] = city
        aq_today_df["street"] = street
        aq_today_df["date"] = pd.to_datetime(day)
        aq_today_df["url"] = aqicn_url
        print("✅ DataFrame created successfully")
        return aq_today_df
    else:
        print("❌ Error: API response invalid or no data.")
        print("Response content:", data)
        raise requests.exceptions.RequestException(data.get("data"))


def plot_air_quality_forecast(city: str, street: str, df: pd.DataFrame,
                              file_path: str, hindcast=False):
    """Plot predicted (and optionally actual) PM2.5 on a log scale and save it.

    Args:
        city: City name, used in the plot title.
        street: Street name, used in the plot title.
        df: Frame with 'date' and 'predicted_pm25' columns; also 'pm25' when
            hindcast=True.
        file_path: Path the figure is written to (overwritten if present).
        hindcast: When True, overlay the actual 'pm25' values.

    Returns:
        The matplotlib.pyplot module (so callers can e.g. plt.show()).
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    day = pd.to_datetime(df['date']).dt.date

    # Plot each column separately in matplotlib
    ax.plot(day, df['predicted_pm25'], label='Predicted PM2.5', color='red',
            linewidth=2, marker='o', markersize=5, markerfacecolor='blue')

    # Set the y-axis to a logarithmic scale (ylim bottom=1 keeps the 0 tick
    # from breaking the log axis)
    ax.set_yscale('log')
    ax.set_yticks([0, 10, 25, 50, 100, 250, 500])
    ax.get_yaxis().set_major_formatter(plt.ScalarFormatter())
    ax.set_ylim(bottom=1)

    # Set the labels and title
    ax.set_xlabel('Date')
    ax.set_title(f"PM2.5 Predicted (Logarithmic Scale) for {city}, {street}")
    ax.set_ylabel('PM2.5')

    # Shade the standard AQI category bands behind the data
    colors = ['green', 'yellow', 'orange', 'red', 'purple', 'darkred']
    labels = ['Good', 'Moderate', 'Unhealthy for Some', 'Unhealthy',
              'Very Unhealthy', 'Hazardous']
    ranges = [(0, 49), (50, 99), (100, 149), (150, 199), (200, 299), (300, 500)]
    for color, (start, end) in zip(colors, ranges):
        ax.axhspan(start, end, color=color, alpha=0.3)

    # Add a legend for the different Air Quality Categories
    patches = [Patch(color=colors[i], label=f"{labels[i]}: {ranges[i][0]}-{ranges[i][1]}")
               for i in range(len(colors))]
    legend1 = ax.legend(handles=patches, loc='upper right',
                        title="Air Quality Categories", fontsize='x-small')

    # Aim for ~10 annotated values on x-axis, will work for both forecasts and hindcasts
    if len(df.index) > 11:
        every_x_tick = len(df.index) / 10
        ax.xaxis.set_major_locator(MultipleLocator(every_x_tick))
    plt.xticks(rotation=45)

    if hindcast:
        ax.plot(day, df['pm25'], label='Actual PM2.5', color='black',
                linewidth=2, marker='^', markersize=5, markerfacecolor='grey')
        legend2 = ax.legend(loc='upper left', fontsize='x-small')
        # ax.legend replaces the first legend; re-add it explicitly.
        ax.add_artist(legend1)

    # Ensure everything is laid out neatly
    plt.tight_layout()

    # Save the figure, overwriting any existing file with the same name
    plt.savefig(file_path)
    return plt


def delete_feature_groups(fs, name):
    """Delete every version of the named feature group; no-op if none exist."""
    try:
        for fg in fs.get_feature_groups(name):
            fg.delete()
            print(f"Deleted {fg.name}/{fg.version}")
    except hsfs.client.exceptions.RestAPIError:
        print(f"No {name} feature group found")


def delete_feature_views(fs, name):
    """Delete every version of the named feature view; no-op if none exist."""
    try:
        for fv in fs.get_feature_views(name):
            fv.delete()
            print(f"Deleted {fv.name}/{fv.version}")
    except hsfs.client.exceptions.RestAPIError:
        print(f"No {name} feature view found")


def delete_models(mr, name):
    """Delete every version of the named model from the model registry."""
    models = mr.get_models(name)
    if not models:
        print(f"No {name} model found")
    for model in models:
        model.delete()
        print(f"Deleted model {model.name}/{model.version}")


def delete_secrets(proj, name):
    """Delete the named project secret; no-op if it does not exist."""
    # NOTE(review): `secrets_api` is neither imported nor defined in this
    # file as visible here — confirm it is provided elsewhere in the project,
    # otherwise this raises NameError when called.
    secrets = secrets_api(proj.name)
    try:
        secret = secrets.get_secret(name)
        secret.delete()
        print(f"Deleted secret {name}")
    except hopsworks.client.exceptions.RestAPIError:
        print(f"No {name} secret found")


# WARNING - this will wipe out all your feature data and models
def purge_project(proj):
    """Delete this project's feature views, feature groups, models, secrets."""
    fs = proj.get_feature_store()
    mr = proj.get_model_registry()

    # Delete Feature Views before deleting the feature groups
    delete_feature_views(fs, "air_quality_fv")

    # Delete ALL Feature Groups
    delete_feature_groups(fs, "air_quality")
    delete_feature_groups(fs, "weather")
    delete_feature_groups(fs, "aq_predictions")

    # Delete all Models
    delete_models(mr, "air_quality_xgboost_model")
    delete_secrets(proj, "SENSOR_LOCATION_JSON")


def check_file_path(file_path):
    """Print whether a regular file exists at `file_path`."""
    my_file = Path(file_path)
    if not my_file.is_file():
        print(f"Error. File not found at the path: {file_path} ")
    else:
        print(f"File successfully found at the path: {file_path}")


def backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, model):
    """Backfill autoregressive PM2.5 predictions for the monitoring feature group.

    Predicts PM2.5 for the last 10 days of stored weather, feeding each
    prediction back in as a lag feature for the next day, inserts the result
    into `monitor_fg`, and returns it merged with the observed pm25 values.

    Args:
        weather_fg: Feature group with daily weather features (read here).
        air_quality_df: Frame with 'date', 'pm25' and street/city/country
            metadata; the last 3 pm25 values seed the lag features.
        monitor_fg: Feature group the predictions are inserted into.
        model: Fitted model exposing predict() over the 9 feature columns
            built below.

    Returns:
        pd.DataFrame: the inserted rows left-merged with observed pm25 on
        'date' (pm25 is NaN where no observation exists).
    """
    # 1. Load some past weather
    features_df = weather_fg.read().sort_values("date")
    features_df = features_df.tail(10).reset_index(drop=True)

    # 2. Load past PM25 (for building lag features)
    air_quality_df = air_quality_df.sort_values("date").reset_index(drop=True)
    pm25_series = air_quality_df["pm25"].values

    # Need last 3 pm25 values to start the autoregressive cycle
    pm25_history = list(pm25_series[-3:])

    predictions = []
    for i in range(len(features_df)):
        # Weather for this day
        w = features_df.iloc[i]

        # Build lag features from the rolling history (observed values first,
        # then our own predictions as we advance)
        lag1 = pm25_history[-1]
        lag2 = pm25_history[-2]
        lag3 = pm25_history[-3]
        roll_mean = np.mean(pm25_history[-3:])
        roll_std = np.std(pm25_history[-3:])

        X = pd.DataFrame({
            "temperature_2m_mean": [w["temperature_2m_mean"]],
            "precipitation_sum": [w["precipitation_sum"]],
            "wind_speed_10m_max": [w["wind_speed_10m_max"]],
            "wind_direction_10m_dominant": [w["wind_direction_10m_dominant"]],
            "pm25_lag1": [lag1],
            "pm25_lag2": [lag2],
            "pm25_lag3": [lag3],
            "pm25_roll3_mean": [roll_mean],
            "pm25_roll3_std": [roll_std],
        })

        pred = float(model.predict(X)[0])
        predictions.append(pred)
        # Feed the prediction back so the next day's lags include it
        pm25_history.append(pred)

    # Combine predictions with weather for monitoring insert
    out = features_df.copy()
    out["predicted_pm25"] = predictions
    out["days_before_forecast_day"] = 1

    # Pick metadata from AQ DF
    out["street"] = air_quality_df.iloc[-1]["street"]
    out["city"] = air_quality_df.iloc[-1]["city"]
    out["country"] = air_quality_df.iloc[-1]["country"]

    # Fix types
    out["predicted_pm25"] = out["predicted_pm25"].astype(float)
    out["days_before_forecast_day"] = out["days_before_forecast_day"].astype("int64")

    # Insert monitoring data
    monitor_fg.insert(out, write_options={"wait_for_job": True})

    # Merge with pm25 ground truth for return
    merged = pd.merge(
        out,
        air_quality_df[["date", "pm25"]],
        on="date",
        how="left",
    )
    return merged