import datetime
import inspect
import os

import joblib
import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv
from sklearn.preprocessing import OrdinalEncoder

load_dotenv(override=True)

def decode_features(df, feature_view):
    """Decode features in the input DataFrame by inverting the corresponding
    Hopsworks Feature Store transformation functions."""
    df_res = df.copy()

    td_transformation_functions = feature_view._batch_scoring_server._transformation_functions

    for feature_name in td_transformation_functions:
        if feature_name not in df_res.columns:
            continue
        td_transformation_function = td_transformation_functions[feature_name]
        sig = inspect.signature(td_transformation_function.transformation_fn)
        param_dict = {
            param.name: param.default
            for param in sig.parameters.values()
            if param.default is not inspect.Parameter.empty
        }
        if td_transformation_function.name == "min_max_scaler":
            # Invert min-max scaling: x_scaled * (max - min) + min.
            df_res[feature_name] = df_res[feature_name].map(
                lambda x: x * (param_dict["max_value"] - param_dict["min_value"]) + param_dict["min_value"])
        elif td_transformation_function.name == "standard_scaler":
            # Invert standardization: x_scaled * std + mean.
            df_res[feature_name] = df_res[feature_name].map(
                lambda x: x * param_dict["std_dev"] + param_dict["mean"])
        elif td_transformation_function.name == "label_encoder":
            # Invert label encoding by reversing the value-to-index mapping.
            index_to_value = {v: k for k, v in param_dict["value_to_index"].items()}
            df_res[feature_name] = df_res[feature_name].map(index_to_value)
    return df_res
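
# Usage sketch (illustrative only; `fv` stands in for a Hopsworks feature view
# whose training dataset used min_max_scaler, standard_scaler, or
# label_encoder transformations):
#   batch = fv.get_batch_data()
#   readable = decode_features(batch, feature_view=fv)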


def get_model(project, model_name, evaluation_metric, sort_metrics_by):
    """Retrieve the desired model from a local copy if one exists; otherwise
    download the best model from the Hopsworks Model Registry into this
    directory."""
    TARGET_FILE = "model.pkl"
    list_of_files = [os.path.join(dirpath, filename)
                     for dirpath, _, filenames in os.walk('.')
                     for filename in filenames if filename == TARGET_FILE]

    if list_of_files:
        # Reuse a previously downloaded model.
        model = joblib.load(list_of_files[0])
    else:
        # Fetch the best-performing model version from the registry.
        mr = project.get_model_registry()
        model = mr.get_best_model(model_name,
                                  evaluation_metric,
                                  sort_metrics_by)
        model_dir = model.download()
        model = joblib.load(model_dir + "/model.pkl")

    return model
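
# Usage sketch (assumes a live Hopsworks connection; model name and metric are
# placeholders):
#   import hopsworks
#   project = hopsworks.login()
#   model = get_model(project, "air_quality_model", "rmse", "min")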


def get_air_quality_data(station_name):
    """Fetch today's air quality forecast for a station from the WAQI API."""
    AIR_QUALITY_API_KEY = os.getenv('AIR_QUALITY_API_KEY')
    request_value = f'https://api.waqi.info/feed/{station_name}/?token={AIR_QUALITY_API_KEY}'
    answer = requests.get(request_value).json()["data"]
    forecast = answer['forecast']['daily']
    return [
        answer["time"]["s"][:10],         # observation date (YYYY-MM-DD)
        int(forecast['pm25'][0]['avg']),  # average PM2.5 forecast
        int(forecast['pm10'][0]['avg']),  # average PM10 forecast
        # Overall AQI taken as the worse of the two pollutants.
        max(int(forecast['pm25'][0]['avg']), int(forecast['pm10'][0]['avg'])),
    ]


def get_air_quality_df(data):
    """Wrap a single air quality observation in a one-row DataFrame."""
    col_names = [
        'date',
        'pm25',
        'pm10',
        'aqi',
    ]

    new_data = pd.DataFrame(data).T
    new_data.columns = col_names
    new_data['pm25'] = pd.to_numeric(new_data['pm25'])
    new_data['pm10'] = pd.to_numeric(new_data['pm10'])
    new_data['aqi'] = pd.to_numeric(new_data['aqi'])

    print(new_data)
    return new_data
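
# Example (requires AIR_QUALITY_API_KEY in the environment; the station name
# is illustrative):
#   aq_df = get_air_quality_df(get_air_quality_data("beijing"))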


def get_weather_data_daily(city):
    """Fetch today's weather for a city from the Visual Crossing Timeline API."""
    WEATHER_API_KEY = os.getenv('WEATHER_API_KEY')
    answer = requests.get(
        f'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{city}/today?unitGroup=metric&include=days&key={WEATHER_API_KEY}&contentType=json'
    ).json()
    data = answer['days'][0]
    return [
        answer['address'],
        data['datetime'],
        data['tempmax'],
        data['tempmin'],
        data['temp'],
        data['feelslikemax'],
        data['feelslikemin'],
        data['feelslike'],
        data['dew'],
        data['humidity'],
        data['precip'],
        data['precipprob'],
        data['precipcover'],
        data['snow'],
        data['snowdepth'],
        data['windgust'],
        data['windspeed'],
        data['winddir'],
        data['pressure'],
        data['cloudcover'],
        data['visibility'],
        data['solarradiation'],
        data['solarenergy'],
        data['uvindex'],
        data['conditions'],
    ]


def get_weather_data_weekly(city: str, start_date: datetime.date) -> pd.DataFrame:
    """Fetch the next seven days of weather for a city from the Visual Crossing
    Timeline API. The `next7days` endpoint always starts from today, so
    `start_date` is currently unused."""
    WEATHER_API_KEY = os.getenv('WEATHER_API_KEY')
    df_weather = pd.read_csv(
        f'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{city}/next7days?unitGroup=metric&include=days&key={WEATHER_API_KEY}&contentType=csv'
    )

    df_weather.rename(columns={"datetime": "date",
                               "name": "city",
                               "sealevelpressure": "pressure"},
                      inplace=True)
    df_weather = df_weather.drop(
        labels=['city', 'dew', 'precip', 'tempmax', 'pressure', 'tempmin', 'temp',
                'feelslikemax', 'feelslikemin', 'feelslike', 'precipprob',
                'precipcover', 'snow', 'snowdepth', 'cloudcover', 'severerisk',
                'moonphase', 'preciptype', 'sunrise', 'sunset', 'conditions',
                'description', 'icon', 'stations'],
        axis=1)

    return df_weather
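
# Example (requires WEATHER_API_KEY; arguments are illustrative, and the date
# is ignored by the next7days endpoint):
#   weekly_df = get_weather_data_weekly("beijing", datetime.date.today())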


def get_weather_df(data):
    """Wrap a single daily weather observation in a one-row DataFrame."""
    col_names = [
        'name',
        'date',
        'tempmax',
        'tempmin',
        'temp',
        'feelslikemax',
        'feelslikemin',
        'feelslike',
        'dew',
        'humidity',
        'precip',
        'precipprob',
        'precipcover',
        'snow',
        'snowdepth',
        'windgust',
        'windspeed',
        'winddir',
        'pressure',
        'cloudcover',
        'visibility',
        'solarradiation',
        'solarenergy',
        'uvindex',
        'conditions',
    ]

    new_data = pd.DataFrame(data).T
    new_data.columns = col_names
    for col in col_names:
        if col not in ['name', 'date', 'conditions']:
            new_data[col] = pd.to_numeric(new_data[col])

    return new_data
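
# Example (requires WEATHER_API_KEY; the city name is illustrative):
#   weather_df = get_weather_df(get_weather_data_daily("beijing"))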


def data_encoder(X):
    """Drop identifier columns and ordinally encode the 'conditions' column."""
    X = X.drop(columns=['date', 'name'])
    X['conditions'] = OrdinalEncoder().fit_transform(X[['conditions']])
    return X
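
# Note: OrdinalEncoder is re-fit on every call, so the integer codes are only
# consistent within a single DataFrame. Sketch (assumes `weather_df` came from
# get_weather_df above):
#   encoded = data_encoder(weather_df)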


def transform(df):
    """Impute missing weather values: wind gusts default to the wind speed,
    snow measurements default to zero, and pressure to the column mean."""
    df.loc[df['windgust'].isna(), 'windgust'] = df['windspeed']
    df['snow'] = df['snow'].fillna(0)
    df['snowdepth'] = df['snowdepth'].fillna(0)
    df['pressure'] = df['pressure'].fillna(df['pressure'].mean())
    return df
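
# Sketch of the imputation on a toy frame (values are illustrative):
#   toy = pd.DataFrame({'windgust': [np.nan, 3.0], 'windspeed': [5.0, 4.0],
#                       'snow': [np.nan, 1.0], 'snowdepth': [np.nan, 2.0],
#                       'pressure': [np.nan, 1010.0]})
#   transform(toy)  # windgust[0] -> 5.0, snow/snowdepth NaNs -> 0, pressure[0] -> 1010.0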


def get_aplevel(temps: np.ndarray) -> list:
    """Map AQI values (shape (n, 1), despite the parameter name) to US EPA
    air pollution level labels."""
    boundary_list = np.array([0, 50, 100, 150, 200, 300])
    # For each value, mark the boundaries it exceeds...
    redf = np.logical_not(temps <= boundary_list)
    # ...then locate the first boundary it does not exceed by comparing each
    # row against a copy shifted one position to the left.
    hift = np.concatenate((np.roll(redf, -1)[:, :-1], np.full((temps.shape[0], 1), False)), axis=1)
    cat = np.nonzero(np.not_equal(redf, hift))

    air_pollution_level = ['Good', 'Moderate', 'Unhealthy for Sensitive Groups',
                           'Unhealthy', 'Very Unhealthy', 'Hazardous']
    level = [air_pollution_level[el] for el in cat[1]]
    return level
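
# Example (AQI values chosen to land in different buckets):
#   get_aplevel(np.array([[35], [120], [260]]))
#   -> ['Good', 'Unhealthy for Sensitive Groups', 'Very Unhealthy']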


def timestamp_2_time(x):
    """Convert a 'YYYY-MM-DD' date string to a Unix timestamp in milliseconds."""
    dt_obj = datetime.datetime.strptime(str(x), '%Y-%m-%d')
    return int(dt_obj.timestamp() * 1000)
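
# Example (the exact value depends on the local timezone, since the naive
# datetime is interpreted as local time):
#   timestamp_2_time('2024-01-01')  # 1704067200000 when run in UTC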