# predictive_irrigation_models/pipelines/aquacrop_pipeline.py
# Uploaded by paolog-fbk using huggingface_hub (commit 64ab846, verified).
from prefect import flow, task
from prefect.logging import get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
import json
import os
import datetime
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from pipelines.preprocessing_pipeline import save_to_file
from pipelines.model_preparation_pipeline import get_consortia
from prefect.logging import get_run_logger
import yaml
from aquacrop import AquaCropModel, Crop, Soil, InitialWaterContent, IrrigationManagement
# Load the shared pipeline parameters and the AquaCrop-specific parameters
# once, at import time. Both files are parsed with yaml.safe_load.
with open('config/params.yml') as file:
    config = yaml.safe_load(file)
with open('config/aquacrop_params.yml') as file:
    config_aquacrop = yaml.safe_load(file)

# Settings read from config/params.yml, unpacked in the same order as the
# keys listed below.
(
    datetime_col,
    value_col,
    datastream_id_col,
    datastream_name_col,
    sensor_type_col,
    ground_offset_col,
    resampling_window,
) = (
    config[key]
    for key in (
        'datetime_col',
        'value_col',
        'datastream_id_col',
        'datastream_name_col',
        'sensor_type_col',
        'ground_offset_col',
        'resampling_window',
    )
)

# Settings read from config/aquacrop_params.yml: the sensors to iterate over
# and the irrigation strategy ('aquacrop', 'real' or 'hybrid').
sensors_forecasted, strategy = (
    config_aquacrop['sensor_forecasted'],
    config_aquacrop['strategy'],
)
### Pipeline nodes
@task(task_run_name="read_input_data_{consortium_name}_{sensor_forecasted}")
def read_input_data(consortium_name, sensor_forecasted):
    """Load the AquaCrop inputs stored for one consortium / sensor pair.

    Reads the weather table, the irrigation table (only when the module-level
    ``strategy`` is ``'real'`` or ``'hybrid'``; otherwise ``None``) and the
    JSON settings file from ``data//05_aquacrop_input``.

    Returns:
        An 11-tuple: ``(weather_aquacrop, irrigation_aquacrop, soil_type,
        crop_type, planting_date, harvest_date, reference_date,
        sim_start_date, sim_end_date, SMT, MaxIrr)``.

    Raises:
        KeyError: if one of the expected keys is missing from the settings.
    """
    weather_aquacrop = pd.read_parquet(f'data//05_aquacrop_input//{consortium_name}//{sensor_forecasted}//weather_aquacrop.parquet')

    # The recorded irrigation events are only needed by the strategies that
    # replay real irrigation.
    irrigation_aquacrop = None
    if strategy in ('real', 'hybrid'):
        irrigation_aquacrop = pd.read_parquet(f'data//05_aquacrop_input//{consortium_name}//{sensor_forecasted}//irrigation_aquacrop.parquet')

    with open(f'data//05_aquacrop_input//{consortium_name}//{sensor_forecasted}//aquacrop_settings.json', 'r') as f:
        aquacrop_settings = json.load(f)

    # Settings are returned in this fixed order, right after the two tables.
    setting_keys = (
        'soil_type',
        'crop_type',
        'planting_date',
        'harvest_date',
        'reference_date',
        'sim_start_date',
        'sim_end_date',
        'SMT',
        'MaxIrr',
    )
    return (
        weather_aquacrop,
        irrigation_aquacrop,
        *(aquacrop_settings[key] for key in setting_keys),
    )
def _safe_column_sum(df, col):
    """Sum ``df[col]`` after numeric coercion, or return 0.0 if the column is absent."""
    if col not in df.columns:
        return 0.0
    # Non-numeric entries become NaN and are skipped by the sum.
    return pd.to_numeric(df[col], errors='coerce').sum(skipna=True)


def final_stats(model, df, crop_type, season=0):
    """Return the season summary table for a finished simulation.

    If ``model._outputs.final_stats`` is a non-empty DataFrame it is returned
    unchanged. Otherwise a one-row fallback table is built from ``df`` using
    the same column labels.

    Args:
        model: Object expected to expose ``_outputs.final_stats``; anything
            else (including ``None``) simply triggers the fallback.
        df: Per-day summary table. The columns ``DryYield``, ``FreshYield``,
            ``YieldPot`` and ``Irrigation`` are summed when present; a missing
            column contributes ``0.0``.
        crop_type: Value stored in the ``'crop Type'`` column.
        season: Season index stored (as ``int``) in the ``'Season'`` column.

    Returns:
        pandas.DataFrame: the model's own statistics, or the one-row fallback.

    NOTE(review): the fallback sums the yield columns over every row of
    ``df``. If those per-day values are cumulative, the sum overstates the
    seasonal yield and the last/maximum value would be the right aggregate —
    confirm against the AquaCrop output definition before relying on it.
    """
    # Best-effort lookup: any ordinary failure while reading the model's
    # statistics falls back to the computed table. ``Exception`` (rather than
    # a bare ``except``) lets KeyboardInterrupt/SystemExit propagate.
    try:
        model_stats = getattr(getattr(model, "_outputs", None), "final_stats", None)
    except Exception:
        model_stats = None

    if isinstance(model_stats, pd.DataFrame) and len(model_stats) >= 1:
        return model_stats

    return pd.DataFrame([{
        'Season': int(season),
        'crop Type': crop_type,
        'Harvest Date (YYYY/MM/DD)': None,
        'Harvest Date (Step)': None,
        'Dry yield (tonne/ha)': _safe_column_sum(df, 'DryYield'),
        'Fresh yield (tonne/ha)': _safe_column_sum(df, 'FreshYield'),
        'Yield potential (tonne/ha)': _safe_column_sum(df, 'YieldPot'),
        'Seasonal irrigation (mm)': _safe_column_sum(df, 'Irrigation'),
    }])
def _irrigation_schedule(irrigation_aquacrop):
    """Build a ``{date: depth}`` mapping from the irrigation table without modifying it."""
    irrigation_dates = pd.to_datetime(irrigation_aquacrop["Date"]).dt.date
    return dict(zip(irrigation_dates, irrigation_aquacrop["Depth"]))


def _current_sim_date(model):
    """Return the calendar date at which the model's next step starts."""
    return pd.to_datetime(model._clock_struct.step_start_time).date()


def _step_and_record(model, cur_date):
    """Run the model once and return the depletion record labelled with ``cur_date``."""
    model.run_model(initialize_model=False)
    depletion = model._init_cond.depletion
    taw = model._init_cond.taw
    # Guard against a zero TAW to avoid a division by zero.
    perc_dep = (depletion / taw * 100) if taw > 0 else 0
    return {
        'Date': cur_date,
        'Depletion': depletion,
        'TAW': taw,
        'Percentage Depletion': perc_dep
    }


def _with_dates(output_df, date_range):
    """Insert a leading ``Date`` column into a model output table and drop its last row."""
    output_df.insert(0, 'Date', date_range)
    return output_df.iloc[:-1]


@task(task_run_name="aquacrop_run", retries=0, log_prints=False)
def aquacrop_run(weather_aquacrop, irrigation_aquacrop, soil_type, crop_type, planting_date, harvest_date, reference_date, sim_start_date, sim_end_date, SMT, MaxIrr, strategy):
    """Run one AquaCrop simulation step by step and collect its outputs.

    Args:
        weather_aquacrop: Weather table passed to ``AquaCropModel``.
        irrigation_aquacrop: Table with ``Date`` and ``Depth`` columns; used
            by the ``'real'`` and ``'hybrid'`` strategies. It is not modified.
        soil_type, crop_type: Passed to ``Soil`` and ``Crop``.
        planting_date, harvest_date: Passed to ``Crop``.
        reference_date: ``'%Y-%m-%d'`` string; in the ``'hybrid'`` strategy
            recorded irrigation is replayed up to and including this date.
        sim_start_date, sim_end_date: Simulation window.
        SMT, MaxIrr: Passed to ``IrrigationManagement``.
        strategy: ``'aquacrop'`` (model-scheduled irrigation), ``'real'``
            (replay recorded irrigation) or ``'hybrid'`` (replay until
            ``reference_date``, model-scheduled afterwards).

    Returns:
        ``(water_flux, water_storage, crop_growth, summary_season,
        summary_table)`` on success, or ``None`` when the simulation fails
        (the failure is logged, not raised).
    """
    logger = get_run_logger()
    try:
        # Reject unknown strategies before paying for model construction.
        if strategy not in ('aquacrop', 'real', 'hybrid'):
            raise ValueError(f"Invalid strategy: {strategy}")

        # Method 1: AquaCrop's own suggestions; method 5: depth supplied
        # externally for each step (real irrigation).
        initial_irrigation_strategy = 1 if strategy == 'aquacrop' else 5

        model = AquaCropModel(
            sim_start_time=sim_start_date,
            sim_end_time=sim_end_date,
            weather_df=weather_aquacrop,
            soil=Soil(soil_type),
            crop=Crop(crop_type, planting_date=planting_date, harvest_date=harvest_date),
            initial_water_content=InitialWaterContent(value=['FC']),
            irrigation_management=IrrigationManagement(irrigation_method=initial_irrigation_strategy, SMT=SMT, MaxIrr=MaxIrr)
        )
        model._initialize()

        results = []

        # Strategy: AquaCrop
        if strategy == 'aquacrop':
            while model._clock_struct.model_is_finished is False:
                cur_date = _current_sim_date(model)
                results.append(_step_and_record(model, cur_date))

        # Strategy: Real Irrigation
        elif strategy == 'real':
            schedule_map = _irrigation_schedule(irrigation_aquacrop)
            while model._clock_struct.model_is_finished is False:
                cur_date = _current_sim_date(model)
                # The depth must be set BEFORE the step runs, otherwise the
                # irrigation recorded for a date is applied one day late.
                model._param_struct.IrrMngt.depth = float(schedule_map.get(cur_date, 0.0))
                results.append(_step_and_record(model, cur_date))

        # Strategy: Real + AquaCrop (hybrid)
        else:
            schedule_map = _irrigation_schedule(irrigation_aquacrop)
            reference_date_date = datetime.datetime.strptime(reference_date, "%Y-%m-%d").date()
            while model._clock_struct.model_is_finished is False:
                cur_date = _current_sim_date(model)
                if cur_date <= reference_date_date:
                    # Up to the reference date: replay the recorded depths.
                    model._param_struct.IrrMngt.Schedule = None
                    model._param_struct.IrrMngt.depth = float(schedule_map.get(cur_date, 0.0))
                else:
                    # After the reference date: hand control back to the
                    # model's threshold-based irrigation (method 1).
                    model._param_struct.IrrMngt.depth = None
                    model._param_struct.IrrMngt.irrigation_method = 1
                    model._param_struct.IrrMngt.SMT = SMT
                results.append(_step_and_record(model, cur_date))

        depletion_df = pd.DataFrame(results)
        depletion_df['Date'] = pd.to_datetime(depletion_df['Date'])

        # The same daily date range labels all three model output tables.
        date_range = pd.date_range(start=sim_start_date, end=sim_end_date, freq='D')
        water_flux = _with_dates(model._outputs.water_flux, date_range)
        water_storage = _with_dates(model._outputs.water_storage, date_range)
        crop_growth = _with_dates(model._outputs.crop_growth, date_range)

        water_flux_subset = water_flux[['Date', 'IrrDay', 'Infl', 'Runoff']].rename(columns={'IrrDay': 'Irrigation'})
        # Precipitation is derived as infiltration + runoff - irrigation.
        water_flux_subset['Precipitation'] = (
            water_flux_subset['Infl'] + water_flux_subset['Runoff'] - water_flux_subset['Irrigation']
        )
        water_flux_subset = water_flux_subset[['Date', 'Irrigation', 'Precipitation']]

        # NOTE(review): 'FreshYield' is not carried into the summary, so a
        # fallback summary table built from it cannot report fresh yield.
        crop_growth_subset = crop_growth[
            ['Date', 'canopy_cover', 'canopy_cover_ns', 'biomass', 'biomass_ns', 'DryYield', 'YieldPot']
        ].rename(columns={
            'canopy_cover': 'CanopyCover',
            'canopy_cover_ns': 'CanopyCoverPot',
            'biomass': 'Biomass',
            'biomass_ns': 'BiomassPot'
        })

        summary_season = (
            depletion_df
            .merge(water_flux_subset, on='Date', how='left')
            .merge(crop_growth_subset, on='Date', how='left')
        )
        summary_table = final_stats(model, summary_season, crop_type=crop_type, season=0)

        return water_flux, water_storage, crop_growth, summary_season, summary_table

    except AssertionError:
        logger.error("AquaCrop failed due to invalid physical condition")
        return None

    except Exception as e:
        logger.error(f"AquaCrop failed due to unexpected error: {e}")
        return None
def show_soil_depletion(df, date_col, depl_col, irr_col, prec_col, threshold, planting_date=None, reference_date=None, output_file=None):
    """Plot percentage depletion together with irrigation and precipitation bars.

    Depletion and the two horizontal threshold lines use the primary y axis;
    irrigation and precipitation bars use the secondary y axis. The figure is
    always displayed with ``fig.show()`` and additionally written to
    ``output_file`` when one is given.

    Args:
        df: Table holding the columns named by the following arguments.
        date_col: Name of the date column (parsed with ``pd.to_datetime``).
        depl_col: Name of the percentage-depletion column.
        irr_col: Name of the irrigation column.
        prec_col: Name of the precipitation column.
        threshold: y value of the upper threshold line.
        planting_date: Optional date of the "Planting Date" vertical line.
            Strings and date-like objects are converted to epoch
            milliseconds; other values are passed to ``add_vline`` as is.
        reference_date: Optional date of the "Reference Date" vertical line;
            handled like ``planting_date``.
        output_file: Optional image path for ``fig.write_image``.
    """
    # Parse the dates so the x axis is a date axis, as in show_real_benchmark.
    date_range = pd.to_datetime(df[date_col])
    x_bounds = [date_range.min(), date_range.max()]

    fig = make_subplots(
        rows=1, cols=1, specs=[[{"secondary_y": True}]]
    )

    # Percentage Depletion
    fig.add_trace(
        go.Scatter(
            x=date_range,
            y=df[depl_col],
            mode='lines',
            line=dict(color='black', width=2),
            name='Percentage Depletion (%)',
            hovertemplate='%{x|%d %b %Y}<br>Depletion: %{y:.2f} %<extra></extra>'
        ),
        secondary_y=False
    )

    # Irrigation
    fig.add_trace(
        go.Bar(
            x=date_range,
            y=df[irr_col],
            name='Irrigation',
            marker_color='green',
            opacity=0.7,
            customdata=np.round(df[irr_col], 2),
            hovertemplate='Irrigation: %{customdata} mm<extra></extra>'
        ),
        secondary_y=True
    )

    # Precipitation
    fig.add_trace(
        go.Bar(
            x=date_range,
            y=df[prec_col],
            name='Precipitation',
            marker_color='blue',
            opacity=0.5,
            customdata=np.round(df[prec_col], 2),
            hovertemplate='Precipitation: %{customdata} mm<extra></extra>'
        ),
        secondary_y=True
    )

    # Horizontal lines for Thresholds
    fig.add_trace(
        go.Scatter(
            x=x_bounds,
            y=[0, 0],
            mode='lines',
            line=dict(color='blue', width=3),
            name='Lower Threshold (FC)'
        ),
        secondary_y=False
    )
    fig.add_trace(
        go.Scatter(
            x=x_bounds,
            y=[threshold, threshold],
            mode='lines',
            line=dict(color='red', width=3),
            name='Upper Threshold'
        ),
        secondary_y=False
    )

    # Vertical lines for planting_date and reference_date
    date_markers = (
        (planting_date, 'green', "Planting Date"),
        (reference_date, 'orange', "Reference Date"),
    )
    for marker_date, marker_color, marker_label in date_markers:
        if marker_date is None:
            continue
        # Strings and date-like objects are converted to epoch milliseconds
        # (pandas stores nanoseconds in ``.value``); anything else is assumed
        # to be in that form already.
        if isinstance(marker_date, (str, datetime.date, np.datetime64)):
            marker_date = pd.to_datetime(marker_date).value // 10**6
        fig.add_vline(
            x=marker_date,
            line=dict(color=marker_color, width=2, dash='dash'),
            annotation_text=marker_label,
            annotation_position="top left"
        )

    # Axes
    fig.update_yaxes(title_text="Percentage Depletion (%)", secondary_y=False)
    fig.update_yaxes(title_text="Water Amount (mm)", secondary_y=True, showgrid=False)
    fig.update_xaxes(
        tickformat="%b %Y",
        dtick="M1",
        tickangle=0,
        title_text="Date"
    )

    # Layout
    fig.update_layout(
        title="Soil Water Depletion Over Time",
        barmode='overlay',
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        width=1400,
        height=600
    )

    if output_file:
        fig.write_image(output_file, scale=2)

    fig.show()
def show_real_benchmark(df, date_col, col_real, col_benchmark, unit_name='Value', output_file=None):
    """Plot two columns of ``df`` against time on a single axis.

    ``col_real`` is drawn as a solid black line and ``col_benchmark`` as a
    dashed red line; each trace is named after its column. The figure is
    always displayed and, if ``output_file`` is given, also written to it.

    Args:
        df: Table holding the columns named by the following arguments.
        date_col: Name of the date column; also used as the x-axis title.
        col_real: Name of the first series.
        col_benchmark: Name of the second series.
        unit_name: Title of the y axis.
        output_file: Optional image path for ``write_image``.
    """
    timestamps = pd.to_datetime(df[date_col])
    figure = make_subplots(rows=1, cols=1, specs=[[{"secondary_y": False}]])

    # Real / simulated values first, benchmark values second.
    series_styles = (
        (col_real, dict(color='black', width=2)),
        (col_benchmark, dict(color='red', width=2, dash='dash')),
    )
    for column, line_style in series_styles:
        trace = go.Scatter(
            x=timestamps,
            y=df[column],
            mode='lines',
            line=line_style,
            name=column,
        )
        figure.add_trace(trace)

    # Axes
    figure.update_yaxes(title_text=unit_name)
    figure.update_xaxes(tickformat="%b %Y", dtick="M1", tickangle=0, title_text=date_col)

    # Layout
    legend_style = dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
    figure.update_layout(
        title=f"{col_real} vs {col_benchmark} Over Time",
        legend=legend_style,
        width=1400,
        height=600,
    )

    if output_file:
        figure.write_image(output_file, scale=2)

    figure.show()
@flow(name='aquacrop_pipeline', retries=1, task_runner=ThreadPoolTaskRunner())
def aquacrop_pipeline() -> list[str]:
    """Run AquaCrop for every configured sensor / consortium pair with inputs.

    For each pair whose weather table and settings file both exist, the
    inputs are read, the simulation is run with the module-level
    ``strategy``, and the five resulting tables are saved under
    ``data//06_aquacrop_output``. Pairs without inputs are skipped, and a
    failure for one pair is logged without stopping the others.

    Returns:
        The paths of all output files written during this run.
    """
    logger = get_run_logger()
    logger.info('Starting AquaCrop pipeline!')

    consortia = get_consortia()
    table_names = ("water_flux", "water_storage", "crop_growth", "summary_season", "summary_table")
    saved_files = []

    for sensor_forecasted in sensors_forecasted:
        for consortium_name in consortia:
            parquet_path = f"data//05_aquacrop_input//{consortium_name}//{sensor_forecasted}//weather_aquacrop.parquet"
            settings_path = f"data//05_aquacrop_input//{consortium_name}//{sensor_forecasted}//aquacrop_settings.json"

            # Nothing to simulate unless both inputs are present.
            if not (os.path.exists(parquet_path) and os.path.exists(settings_path)):
                continue

            try:
                inputs = read_input_data(consortium_name, sensor_forecasted)
                outputs = aquacrop_run(*inputs, strategy)

                # aquacrop_run logs its own failure and returns None; skip
                # saving instead of failing on tuple unpacking.
                if outputs is None:
                    logger.warning(f"AquaCrop produced no results for consortium {consortium_name} and sensor {sensor_forecasted}; nothing saved")
                    continue

                for name, df in zip(table_names, outputs):
                    output_file = f'data//06_aquacrop_output//{consortium_name}//{sensor_forecasted}//{name}.parquet'
                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
                    save_to_file(
                        df=df,
                        output_file=output_file
                    )
                    saved_files.append(output_file)

            except Exception as e:
                logger.error(f"Error running model for consortium {consortium_name} and sensor {sensor_forecasted}: {e}")

    return saved_files


if __name__ == "__main__":
    aquacrop_pipeline()