| from prefect import task
|
| import pandas as pd
|
| from prefect.logging import get_run_logger
|
| import yaml
|
| import pickle
|
|
|
# Load the pipeline parameters once, at import time. The path is relative,
# so it is resolved against the current working directory of the process.
# The encoding is explicit so that parsing does not depend on the platform's
# locale default.
with open('config/params.yml', encoding='utf-8') as file:
    config = yaml.safe_load(file)

data_availability = config['data_availability']

# Column names of the input tables, read by the functions of this module.
datetime_col = config['datetime_col']
value_col = config['value_col']
datastream_id_col = config['datastream_id_col']
datastream_name_col = config['datastream_name_col']
|
|
|
|
|
|
|
|
|
def look_for_value(df, sensor, date):
    """Return the value recorded for ``sensor`` at ``date``, if it is unique.

    Rows of ``df`` are selected where the ``datastream_name_col`` column
    equals ``sensor`` and the ``datetime_col`` column equals ``date``.

    Args:
        df: DataFrame holding the ``datastream_name_col``, ``datetime_col``
            and ``value_col`` columns (names come from the module config).
        sensor: Datastream name to look up.
        date: Timestamp to look up.

    Returns:
        The ``value_col`` entry of the matching row when exactly one row
        matches; ``None`` when there is no match or more than one.
    """
    same_sensor = df[datastream_name_col] == sensor
    same_date = df[datetime_col] == date
    matches = df[same_sensor & same_date][value_col]

    # Zero or several matches are both treated as "no usable value".
    if len(matches) != 1:
        return None
    return matches.values[0]
|
|
|
@task(task_run_name="read_input_data_{consortium_name}")
def read_input_data(consortium_name, has_weather_data=True, has_crop_data=True, has_soil_data=True, has_remote_sensing_data=True):
    """Load the model-input table and the optional crop/soil tables from disk.

    Args:
        consortium_name: Suffix used to build every input file name.
        has_weather_data: Accepted but not used by this function.
        has_crop_data: When true, the pickled crop-type data is loaded.
        has_soil_data: When true, the soil-type parquet table is loaded.
        has_remote_sensing_data: Accepted but not used by this function.

    Returns:
        A ``(full_table, crop_type_data, soil_type_data)`` tuple. The last
        two entries are ``None`` when the matching flag is false.
    """
    full_table = pd.read_parquet(f'data//04_model_input//full_table_{consortium_name}.parquet')

    crop_type_data = None
    if has_crop_data:
        # NOTE: unpickling can execute arbitrary code; this file must come
        # from a trusted source.
        with open(f'data//03_primary//crop_type_data_{consortium_name}.pickle', 'rb') as handle:
            crop_type_data = pickle.load(handle)

    soil_type_data = (
        pd.read_parquet(f'data//03_primary//soil_type_data_{consortium_name}.parquet')
        if has_soil_data
        else None
    )

    return full_table, crop_type_data, soil_type_data
|
|
|
|
|
@task(task_run_name="data_preparation")
def data_preparation(full_table, crop_type_data, soil_type_data):
    """Split the input table by sensor type and build the feature list.

    Rows with missing values in any column outside the excluded soil columns
    are dropped. The remaining rows are split into the
    ``'Soil Moisture Tension'`` sensors (``tensiometers``) and all other
    rows (``elmed``). The tensiometer table is then optionally enriched with
    soil and crop attributes and with the value of the associated sensor.

    Args:
        full_table: DataFrame with at least the ``sensor_type``,
            ``irrigator`` and ``associated_sensors`` columns plus the
            configured datetime/name/value columns.
        crop_type_data: Object exposing ``Name``, ``Aer``, ``CC0``,
            ``CCmin``, ``CDC`` and ``CGC`` attributes, or ``None``.
        soil_type_data: DataFrame with the configured datastream id/name
            columns and the ``uc`` and ``soil_type`` columns, or ``None``.

    Returns:
        A ``(tensiometers, elmed, feature_cols)`` tuple, where
        ``feature_cols`` lists the ``tensiometers`` columns that are not
        excluded and contain no missing value.
    """
    logger = get_run_logger()

    features_to_be_removed = {
        'Active_Lime_(permil/Note)',
        'Horizon',
        'CEC_(cmol/kg)',
        'Hrz_Lower_Limit_(cm)',
        'Organic_C',
        'RZD_Modal_(cm)',
        'Skeletal',
    }

    feature_cols = list(set(full_table.columns) - features_to_be_removed)

    full_table = full_table.dropna(subset=feature_cols)
    tensiometers = full_table[full_table['sensor_type'] == 'Soil Moisture Tension'].drop(columns=['sensor_type', 'irrigator'])
    elmed = full_table[full_table['sensor_type'] != 'Soil Moisture Tension'].drop(columns=['sensor_type', 'irrigator'])

    # NOTE(review): the indentation of this section was reconstructed; the
    # crop join is assumed to belong to the soil branch -- confirm.
    if soil_type_data is not None:
        soil_cols_to_drop = [datastream_id_col, 'uc', 'soil_type'] + list(
            set(soil_type_data.columns).intersection(features_to_be_removed)
        )
        tensiometers = tensiometers.merge(
            soil_type_data.drop(columns=soil_cols_to_drop),
            on=[datastream_name_col],
            how='left',
        )
        if crop_type_data is not None:
            # A single-row frame cross-joined to every tensiometer row, so
            # the crop attributes become constant columns.
            crop_df = pd.DataFrame(
                columns=['Name', 'Aer', 'CC0', 'CCmin', 'CDC', 'CGC'],
                data=[[crop_type_data.Name, crop_type_data.Aer, crop_type_data.CC0, crop_type_data.CCmin, crop_type_data.CDC, crop_type_data.CGC]],
            )
            tensiometers = tensiometers.join(
                crop_df,
                how='cross',
            )
        else:
            # crop_type_data is None when the data was not loaded; reading
            # its attributes would raise AttributeError.
            logger.info('No crop type data available: crop features are not added.')

    # The associated value is only resolved when every row points to exactly
    # one associated sensor.
    # NOTE(review): the drop below is assumed to be part of this branch
    # (indentation reconstructed) -- confirm.
    if all(tensiometers['associated_sensors'].transform(lambda x: len(x)) == 1):
        tensiometers['associated_value'] = tensiometers.apply(
            lambda row: look_for_value(tensiometers, row['associated_sensors'][0], row[datetime_col]),
            axis=1,
        )
        tensiometers = tensiometers.drop(columns=['associated_sensors'])

    features_to_be_removed = features_to_be_removed | {datetime_col, datastream_name_col, 'associated_sensors', 'irrigator', 'sensor_type', 'Name'}
    feature_cols = set(tensiometers.columns) - features_to_be_removed

    # Rebinding feature_cols inside the loop is safe: the loop keeps
    # iterating over the set object it started with.
    for col in feature_cols:
        if len(tensiometers[col]) > len(tensiometers[col].dropna()):
            feature_cols = feature_cols - {col}
            logger.info(f'Need to drop column {col}')

    feature_cols = list(feature_cols)
    # Sanity check: no remaining feature column holds a missing value.
    assert len(tensiometers[feature_cols]) == len(tensiometers[feature_cols].dropna())

    return tensiometers, elmed, feature_cols
|
|
|
|
|
@task(task_run_name="remove_feature_inf_zero_std")
def remove_feature_inf_zero_std(mean_std_df, feature_cols):
    """Drop the features whose standard deviation is NaN or zero.

    Args:
        mean_std_df: DataFrame indexed by feature name, with a ``std``
            column.
        feature_cols: Feature names to consider; each must be a label of
            ``mean_std_df``'s index.

    Returns:
        A ``(mean_std_df, feature_cols)`` tuple restricted to the surviving
        features. The list is rebuilt from a set, so its order is not the
        input order.
    """
    logger = get_run_logger()

    mean_std_df = mean_std_df.loc[feature_cols]
    std_values = mean_std_df['std']
    nan_std_features = set(std_values[std_values.isna()].index)
    zero_std_features = set(std_values[std_values == 0].index)

    # Both messages are logged even when the corresponding set is empty.
    logger.info(f'CAREFUL: columns {nan_std_features} are being removed because they have NaN standard deviation.')
    logger.info(f'CAREFUL: columns {zero_std_features} are being removed because they have 0 standard deviation.')

    surviving = list(set(feature_cols) - nan_std_features - zero_std_features)
    return mean_std_df.loc[surviving], surviving