# predictive_irrigation_models/pipelines/soilcast_pipeline.py
# (uploaded via huggingface_hub by paolog-fbk, commit 64ab846)
from prefect import task
import pandas as pd
from prefect.logging import get_run_logger
import yaml
import pickle
# Pipeline-wide parameters are loaded once at import time; every task in
# this module reads these module-level settings.
with open('config/params.yml') as file:
    config = yaml.safe_load(file)

# Shared column-name conventions for all dataframes in the pipeline.
# NOTE(review): meanings inferred from the key names — confirm against
# config/params.yml and the upstream data-ingestion code.
data_availability = config['data_availability']      # per-consortium data-availability flags
datetime_col = config['datetime_col']                # observation timestamp column
value_col = config['value_col']                      # measured-value column
datastream_id_col = config['datastream_id_col']      # sensor/datastream id column
datastream_name_col = config['datastream_name_col']  # sensor/datastream name column
### Helper functions
def look_for_value(df, sensor, date):
    """Return the single reading of *sensor* at *date* in *df*, or None.

    Selects the rows whose datastream-name column equals *sensor* and
    whose datetime column equals *date*.  A value is returned only when
    exactly one row matches; zero matches or an ambiguous (multiple-row)
    match both yield None.
    """
    selection = df.loc[
        (df[datastream_name_col] == sensor) & (df[datetime_col] == date),
        value_col,
    ]
    if len(selection) != 1:
        return None
    return selection.values[0]
@task(task_run_name="read_input_data_{consortium_name}")
def read_input_data(consortium_name, has_weather_data=True, has_crop_data=True, has_soil_data=True, has_remote_sensing_data=True):
    """Load the model-input artifacts for one consortium from disk.

    Args:
        consortium_name: suffix identifying the consortium's data files.
        has_weather_data / has_remote_sensing_data: accepted for a uniform
            task signature but not used in this function — TODO confirm
            whether downstream tasks rely on them.
        has_crop_data: when True, unpickle the crop-type object.
        has_soil_data: when True, read the soil-type parquet table.

    Returns:
        (full_table, crop_type_data, soil_type_data); the latter two are
        None when the corresponding flag is False.
    """
    full_table = pd.read_parquet(f'data//04_model_input//full_table_{consortium_name}.parquet')

    crop_type_data = None
    if has_crop_data:
        # NOTE(review): pickle.load is only safe on trusted, locally
        # produced artifacts — do not point this at external input.
        with open(f'data//03_primary//crop_type_data_{consortium_name}.pickle', 'rb') as handle:
            crop_type_data = pickle.load(handle)

    soil_type_data = None
    if has_soil_data:
        soil_type_data = pd.read_parquet(f'data//03_primary//soil_type_data_{consortium_name}.parquet')

    return full_table, crop_type_data, soil_type_data
@task(task_run_name="data_preparation")
def data_preparation(full_table, crop_type_data, soil_type_data):
    """Build model-ready feature tables from the merged input table.

    Splits *full_table* into tensiometer rows ('Soil Moisture Tension')
    and all other sensors, enriches the tensiometer table with soil and
    crop descriptors, optionally resolves each row's associated-sensor
    value, and derives the final list of fully populated feature columns.

    Args:
        full_table: per-row sensor observations; must contain
            'sensor_type', 'irrigator', 'associated_sensors' and the
            configured datetime / datastream-name columns.
        crop_type_data: object exposing Name/Aer/CC0/CCmin/CDC/CGC
            attributes (one crop; unpickled by read_input_data).
        soil_type_data: per-datastream soil descriptors, or None.

    Returns:
        (tensiometers, elmed, feature_cols): the enriched tensiometer
        table, the non-tensiometer table ("elmed" — name inherited from
        upstream; exact meaning not visible here), and the usable
        feature-column names.
    """
    logger = get_run_logger()
    # Soil-profile descriptors that must never be used as model features.
    features_to_be_removed = {'Active_Lime_(permil/Note)',
        'Horizon',
        'CEC_(cmol/kg)',
        'Hrz_Lower_Limit_(cm)',
        'Organic_C',
        'RZD_Modal_(cm)',
        'Skeletal',
        }
    feature_cols = list(set(full_table.columns) - features_to_be_removed)
    # Keep only rows complete on every candidate feature.
    full_table = full_table.dropna(subset=feature_cols)
    # Split by sensor type: tensiometers vs everything else.
    tensiometers = full_table[full_table['sensor_type'] == 'Soil Moisture Tension'].drop(columns=['sensor_type', 'irrigator'])
    elmed = full_table[full_table['sensor_type'] != 'Soil Moisture Tension'].drop(columns=['sensor_type', 'irrigator'])
    if not soil_type_data is None:
        # Left-join soil descriptors on the datastream name, minus id /
        # label columns and any descriptor already blacklisted above.
        tensiometers = tensiometers.merge(
            soil_type_data.drop(columns=[datastream_id_col, 'uc', 'soil_type'] + list(set(soil_type_data.columns).intersection(features_to_be_removed))),
            on = [datastream_name_col],
            how = 'left'
        )
    # One-row frame of crop parameters, cross-joined onto every
    # tensiometer row (a single crop object is supplied per call).
    crop_df = pd.DataFrame(
        columns = ['Name', 'Aer', 'CC0', 'CCmin', 'CDC', 'CGC'],
        data = [[crop_type_data.Name, crop_type_data.Aer, crop_type_data.CC0, crop_type_data.CCmin, crop_type_data.CDC, crop_type_data.CGC]]
    )
    tensiometers = tensiometers.join(
        crop_df,
        how='cross'
    )
    # Resolve the companion sensor's reading only when every row lists
    # exactly one associated sensor; otherwise the list column is kept.
    # NOTE(review): the drop below is placed inside this `if` per the
    # inferred indentation — confirm against the original file.
    if all(tensiometers['associated_sensors'].transform(lambda x: len(x)) == 1):
        tensiometers['associated_value'] = tensiometers.apply(
            lambda row: look_for_value(tensiometers, row['associated_sensors'][0], row[datetime_col]),
            axis = 1
        )
        tensiometers = tensiometers.drop(columns=['associated_sensors'])
    # Identifier / bookkeeping columns are never features either.
    features_to_be_removed = features_to_be_removed | {datetime_col, datastream_name_col, 'associated_sensors', 'irrigator', 'sensor_type', 'Name'}
    feature_cols = set(tensiometers.columns) - features_to_be_removed
    # Drop any feature still holding NaNs after the joins.  The loop
    # iterates the set object bound above; the subtraction rebinds the
    # name rather than mutating the iterated set, so this is safe.
    for col in feature_cols:
        if len(tensiometers[col]) > len(tensiometers[col].dropna()):
            feature_cols = feature_cols - {col}
            logger.info(f'Need to drop column {col}')
    feature_cols = list(feature_cols)
    # Sanity check: the selected features are now complete.
    # NOTE(review): `assert` is stripped under `python -O`; consider
    # raising an explicit error instead.
    assert len(tensiometers[feature_cols]) == len(tensiometers[feature_cols].dropna())
    return tensiometers, elmed, feature_cols
@task(task_run_name="remove_feature_inf_zero_std")
def remove_feature_inf_zero_std(mean_std_df, feature_cols):
    """Discard features whose standard deviation is NaN or exactly zero.

    Such features would break (NaN std) or contribute nothing to
    (zero std) any standardization step downstream.

    Args:
        mean_std_df: per-feature statistics indexed by feature name,
            with at least a 'std' column.
        feature_cols: candidate feature names (labels of mean_std_df).

    Returns:
        (mean_std_df, feature_cols) restricted to the surviving
        features; list order is unspecified (derives from a set).
    """
    logger = get_run_logger()
    stats = mean_std_df.loc[feature_cols]
    nan_std_cols = set(stats.index[stats['std'].isna()])
    zero_std_cols = set(stats.index[stats['std'] == 0])
    logger.info(f'CAREFUL: columns {nan_std_cols} are being removed because they have NaN standard deviation.')
    logger.info(f'CAREFUL: columns {zero_std_cols} are being removed because they have 0 standard deviation.')
    surviving = list(set(feature_cols) - nan_std_cols - zero_std_cols)
    return stats.loc[surviving], surviving