| from prefect import flow, task
|
| from prefect.logging import get_run_logger
|
| from prefect.task_runners import ThreadPoolTaskRunner
|
| import json
|
| import os
|
| import datetime
|
| import numpy as np
|
| import pandas as pd
|
|
|
| import plotly.graph_objects as go
|
| from plotly.subplots import make_subplots
|
|
|
| from pipelines.preprocessing_pipeline import save_to_file
|
| from pipelines.model_preparation_pipeline import get_consortia
|
| from prefect.logging import get_run_logger
|
| import yaml
|
|
|
| from aquacrop import AquaCropModel, Crop, Soil, InitialWaterContent, IrrigationManagement
|
|
|
|
|
def _load_yaml_config(path):
    """Parse the YAML file at ``path`` with ``yaml.safe_load`` and return its content."""
    with open(path) as handle:
        return yaml.safe_load(handle)


# General pipeline settings and AquaCrop-specific settings, read once at import time.
config = _load_yaml_config('config/params.yml')
config_aquacrop = _load_yaml_config('config/aquacrop_params.yml')

# Column-name / resampling settings from params.yml, exposed as module globals
# (not referenced by the functions in this module).
datetime_col = config['datetime_col']
value_col = config['value_col']
datastream_id_col = config['datastream_id_col']
datastream_name_col = config['datastream_name_col']
sensor_type_col = config['sensor_type_col']
ground_offset_col = config['ground_offset_col']
resampling_window = config['resampling_window']

# Sensors to process, and the irrigation strategy used by read_input_data /
# aquacrop_run ('aquacrop', 'real' or 'hybrid').
sensors_forecasted = config_aquacrop['sensor_forecasted']
strategy = config_aquacrop['strategy']
|
|
|
|
|
|
|
|
|
@task(task_run_name="read_input_data_{consortium_name}_{sensor_forecasted}")
def read_input_data(consortium_name, sensor_forecasted):
    """Load the prepared AquaCrop inputs for one consortium/sensor pair.

    Files are read from
    ``data//05_aquacrop_input//<consortium_name>//<sensor_forecasted>//``:
    ``weather_aquacrop.parquet``, ``aquacrop_settings.json`` and, only when the
    module-level ``strategy`` is ``'real'`` or ``'hybrid'``,
    ``irrigation_aquacrop.parquet``.

    Args:
        consortium_name: Consortium sub-directory name.
        sensor_forecasted: Sensor sub-directory name.

    Returns:
        Tuple ``(weather_aquacrop, irrigation_aquacrop, soil_type, crop_type,
        planting_date, harvest_date, reference_date, sim_start_date,
        sim_end_date, SMT, MaxIrr)``. ``irrigation_aquacrop`` is ``None`` for
        strategies other than 'real' and 'hybrid'; the remaining values are
        taken verbatim from the settings JSON.

    Raises:
        FileNotFoundError: if a required input file is missing.
        KeyError: if a required key is absent from ``aquacrop_settings.json``.
    """
    input_dir = f'data//05_aquacrop_input//{consortium_name}//{sensor_forecasted}'

    weather_aquacrop = pd.read_parquet(f'{input_dir}//weather_aquacrop.parquet')

    # Only the strategies that replay observed irrigation need the irrigation table.
    if strategy in ('real', 'hybrid'):
        irrigation_aquacrop = pd.read_parquet(f'{input_dir}//irrigation_aquacrop.parquet')
    else:
        irrigation_aquacrop = None

    # JSON is UTF-8 by specification; don't rely on the platform's locale encoding.
    with open(f'{input_dir}//aquacrop_settings.json', 'r', encoding='utf-8') as f:
        aquacrop_settings = json.load(f)

    return (
        weather_aquacrop,
        irrigation_aquacrop,
        aquacrop_settings['soil_type'],
        aquacrop_settings['crop_type'],
        aquacrop_settings['planting_date'],
        aquacrop_settings['harvest_date'],
        aquacrop_settings['reference_date'],
        aquacrop_settings['sim_start_date'],
        aquacrop_settings['sim_end_date'],
        aquacrop_settings['SMT'],
        aquacrop_settings['MaxIrr'],
    )
|
|
|
|
|
def final_stats(model, df, crop_type, season=0):
    """Return AquaCrop's end-of-season statistics, falling back to values derived from ``df``.

    If ``model._outputs.final_stats`` is a non-empty DataFrame it is returned
    unchanged. Otherwise (presumably because no season was completed within
    the simulation window) a one-row DataFrame with the same column layout is
    built from the daily table ``df``.

    Args:
        model: AquaCrop model after a run; only ``model._outputs.final_stats``
            is read, and a missing ``_outputs`` is tolerated.
        df: Daily table; the optional columns ``DryYield``, ``FreshYield``,
            ``YieldPot`` and ``Irrigation`` feed the fallback row. Missing
            columns contribute 0.0.
        crop_type: Value for the fallback row's ``crop Type`` column.
        season: Value (cast to int) for the fallback row's ``Season`` column.

    Returns:
        pandas.DataFrame: AquaCrop's own table, or the one-row fallback.
    """
    try:
        stats = getattr(model._outputs, "final_stats", None)
    except Exception:
        # Best effort: e.g. the model never produced an ``_outputs`` object.
        # (Exception, not a bare except, so Ctrl-C / SystemExit still propagate.)
        stats = None

    if isinstance(stats, pd.DataFrame) and len(stats) >= 1:
        return stats

    def numeric_column(col):
        """Column ``col`` coerced to numbers (non-numeric -> NaN), or None if absent."""
        if col not in df.columns:
            return None
        return pd.to_numeric(df[col], errors='coerce')

    def season_total(col):
        """Sum of a per-day quantity over the table (0.0 if the column is absent)."""
        values = numeric_column(col)
        return 0.0 if values is None else values.sum(skipna=True)

    def season_level(col):
        """Highest value reached by a running level (0.0 if absent or all-NaN)."""
        values = numeric_column(col)
        if values is None:
            return 0.0
        peak = values.max(skipna=True)
        return 0.0 if pd.isna(peak) else peak

    # The yield targets are per-hectare levels ('tonne/ha'), so the season value
    # is the peak reached, not the sum over days (which would scale with the
    # season length). NOTE: assumes AquaCrop's daily DryYield/FreshYield/YieldPot
    # are cumulative states rather than daily increments -- confirm against the
    # AquaCrop-OSPy output docs. In aquacrop_run's table, 'Irrigation' is the
    # renamed daily IrrDay, so it is summed into a seasonal total.
    out_df = pd.DataFrame([{
        'Season': int(season),
        'crop Type': crop_type,
        'Harvest Date (YYYY/MM/DD)': None,
        'Harvest Date (Step)': None,
        'Dry yield (tonne/ha)': season_level('DryYield'),
        'Fresh yield (tonne/ha)': season_level('FreshYield'),
        'Yield potential (tonne/ha)': season_level('YieldPot'),
        'Seasonal irrigation (mm)': season_total('Irrigation'),
    }])

    return out_df
|
|
|
|
|
def _step_date(model):
    """Calendar date on which the model's next simulation step starts."""
    return pd.to_datetime(model._clock_struct.step_start_time).date()


def _irrigation_schedule(irrigation_aquacrop):
    """Map each date of ``irrigation_aquacrop['Date']`` to its ``Depth`` value.

    The input DataFrame is not modified. If a date occurs more than once, the
    last row wins.
    """
    dates = pd.to_datetime(irrigation_aquacrop["Date"]).dt.date
    return dict(zip(dates, irrigation_aquacrop["Depth"]))


def _irrigation_step_hook(model, strategy, irrigation_aquacrop, reference_date, SMT):
    """Return a callable ``hook(cur_date)`` that sets irrigation before each step.

    Raises:
        ValueError: if ``strategy`` is not 'aquacrop', 'real' or 'hybrid'.
    """
    if strategy == 'aquacrop':
        # Irrigation is left entirely to the model's own settings.
        return lambda cur_date: None

    if strategy == 'real':
        schedule = _irrigation_schedule(irrigation_aquacrop)

        def apply_observed(cur_date):
            # Set before the step runs so the observed depth lands on cur_date
            # itself (not on the following day). Days without a record get 0.
            model._param_struct.IrrMngt.depth = float(schedule.get(cur_date, 0.0))

        return apply_observed

    if strategy == 'hybrid':
        schedule = _irrigation_schedule(irrigation_aquacrop)
        reference_day = datetime.datetime.strptime(reference_date, "%Y-%m-%d").date()

        def apply_hybrid(cur_date):
            irr_mngt = model._param_struct.IrrMngt
            if cur_date <= reference_day:
                # Up to the reference date: replay the observed depths.
                irr_mngt.Schedule = None
                irr_mngt.depth = float(schedule.get(cur_date, 0.0))
            else:
                # After it: hand irrigation over to method 1 with the SMT thresholds.
                irr_mngt.depth = None
                irr_mngt.irrigation_method = 1
                irr_mngt.SMT = SMT

        return apply_hybrid

    raise ValueError(f"Invalid strategy: {strategy}")


def _depletion_record(model, cur_date):
    """Snapshot depletion, TAW and percentage depletion after the step for ``cur_date``."""
    depletion = model._init_cond.depletion
    taw = model._init_cond.taw
    return {
        'Date': cur_date,
        'Depletion': depletion,
        'TAW': taw,
        'Percentage Depletion': (depletion / taw * 100) if taw > 0 else 0,
    }


def _with_dates(frame, date_range):
    """Insert ``date_range`` as a leading 'Date' column (in place) and drop the last row."""
    frame.insert(0, 'Date', date_range)
    return frame.iloc[:-1]


def _season_summary(depletion_df, water_flux, crop_growth):
    """Left-join daily depletion with irrigation/precipitation and crop-growth columns on 'Date'."""
    water = water_flux[['Date', 'IrrDay', 'Infl', 'Runoff']].rename(columns={'IrrDay': 'Irrigation'})
    # Back-calculates rainfall as Infl + Runoff - Irrigation.
    # NOTE(review): assumes Infl counts irrigation plus rain that entered the soil -- confirm.
    water['Precipitation'] = water['Infl'] + water['Runoff'] - water['Irrigation']
    water = water[['Date', 'Irrigation', 'Precipitation']]

    growth = crop_growth[
        ['Date', 'canopy_cover', 'canopy_cover_ns', 'biomass', 'biomass_ns', 'DryYield', 'YieldPot']
    ].rename(columns={
        'canopy_cover': 'CanopyCover',
        'canopy_cover_ns': 'CanopyCoverPot',
        'biomass': 'Biomass',
        'biomass_ns': 'BiomassPot',
    })

    return depletion_df.merge(water, on='Date', how='left').merge(growth, on='Date', how='left')


@task(task_run_name="aquacrop_run", retries=0, log_prints=False)
def aquacrop_run(weather_aquacrop, irrigation_aquacrop, soil_type, crop_type, planting_date, harvest_date, reference_date, sim_start_date, sim_end_date, SMT, MaxIrr, strategy):
    """Run AquaCrop step by step for one site under an irrigation strategy.

    Strategies:
        'aquacrop': built with ``irrigation_method=1`` (plus ``SMT``/``MaxIrr``);
            nothing is changed between steps.
        'real': built with ``irrigation_method=5``; before every step the depth
            is set from ``irrigation_aquacrop`` (columns 'Date', 'Depth'),
            0 on days without a record.
        'hybrid': like 'real' up to and including ``reference_date``
            ('YYYY-MM-DD'), then switches to ``irrigation_method=1`` with ``SMT``.

    Returns:
        ``(water_flux, water_storage, crop_growth, summary_season, summary_table)``
        on success: the three AquaCrop outputs with a leading 'Date' column,
        the daily depletion/water/crop table, and the season statistics from
        ``final_stats``. Returns ``None`` if anything fails (including an
        unknown strategy); the error is logged with its traceback.
    """
    logger = get_run_logger()

    try:
        initial_irrigation_strategy = 1 if strategy == 'aquacrop' else 5

        model = AquaCropModel(
            sim_start_time=sim_start_date,
            sim_end_time=sim_end_date,
            weather_df=weather_aquacrop,
            soil=Soil(soil_type),
            crop=Crop(crop_type, planting_date=planting_date, harvest_date=harvest_date),
            initial_water_content=InitialWaterContent(value=['FC']),
            irrigation_management=IrrigationManagement(
                irrigation_method=initial_irrigation_strategy, SMT=SMT, MaxIrr=MaxIrr
            ),
        )
        model._initialize()

        before_step = _irrigation_step_hook(model, strategy, irrigation_aquacrop, reference_date, SMT)

        # run_model(initialize_model=False) is relied on to advance one step per
        # call, so irrigation can be adjusted and depletion recorded per day.
        results = []
        while not model._clock_struct.model_is_finished:
            cur_date = _step_date(model)
            before_step(cur_date)
            model.run_model(initialize_model=False)
            results.append(_depletion_record(model, cur_date))

        depletion_df = pd.DataFrame(results)
        depletion_df['Date'] = pd.to_datetime(depletion_df['Date'])

        date_range = pd.date_range(start=sim_start_date, end=sim_end_date, freq='D')
        water_flux = _with_dates(model._outputs.water_flux, date_range)
        water_storage = _with_dates(model._outputs.water_storage, date_range)
        crop_growth = _with_dates(model._outputs.crop_growth, date_range)

        summary_season = _season_summary(depletion_df, water_flux, crop_growth)
        summary_table = final_stats(model, summary_season, crop_type=crop_type, season=0)

        return water_flux, water_storage, crop_growth, summary_season, summary_table

    except AssertionError:
        logger.exception("AquaCrop failed due to invalid physical condition")
        return None
    except Exception as e:
        logger.exception(f"AquaCrop failed due to unexpected error: {e}")
        return None
|
|
|
|
|
def show_soil_depletion(df, date_col, depl_col, irr_col, prec_col, threshold, planting_date=None, reference_date=None, output_file=None):
    """Plot percentage soil-water depletion with daily irrigation and precipitation.

    The depletion column is drawn as a black line on the primary y-axis,
    together with horizontal reference lines at 0 ("Lower Threshold (FC)") and
    at ``threshold`` ("Upper Threshold"). Irrigation and precipitation are
    overlaid bars on the secondary y-axis. Optional dashed vertical lines mark
    ``planting_date`` and ``reference_date``.

    Args:
        df: Table holding every referenced column.
        date_col, depl_col, irr_col, prec_col: Column names in ``df``.
        threshold: Depletion percentage of the upper reference line.
        planting_date, reference_date: Optional marker positions. Strings are
            converted to epoch milliseconds; other values are passed through.
        output_file: If truthy, the figure is also written there with
            ``write_image`` at scale 2.

    The figure is always displayed with ``show()``.
    """
    dates = df[date_col]
    first_day, last_day = dates.min(), dates.max()

    figure = make_subplots(rows=1, cols=1, specs=[[{"secondary_y": True}]])

    # Primary axis: the depletion curve.
    figure.add_trace(
        go.Scatter(
            x=dates,
            y=df[depl_col],
            mode='lines',
            line={'color': 'black', 'width': 2},
            name='Percentage Depletion (%)',
            hovertemplate='%{x|%d %b %Y}<br>Depletion: %{y:.2f} %<extra></extra>',
        ),
        secondary_y=False,
    )

    # Secondary axis: daily water inputs as overlaid bars.
    bar_specs = (
        (irr_col, 'Irrigation', 'green', 0.7),
        (prec_col, 'Precipitation', 'blue', 0.5),
    )
    for column, label, colour, alpha in bar_specs:
        figure.add_trace(
            go.Bar(
                x=dates,
                y=df[column],
                name=label,
                marker_color=colour,
                opacity=alpha,
                customdata=np.round(df[column], 2),
                hovertemplate=f'{label}: %{{customdata}} mm<extra></extra>',
            ),
            secondary_y=True,
        )

    # Horizontal reference lines spanning the whole date range.
    threshold_specs = (
        (0, 'blue', 'Lower Threshold (FC)'),
        (threshold, 'red', 'Upper Threshold'),
    )
    for level, colour, label in threshold_specs:
        figure.add_trace(
            go.Scatter(
                x=[first_day, last_day],
                y=[level, level],
                mode='lines',
                line={'color': colour, 'width': 3},
                name=label,
            ),
            secondary_y=False,
        )

    # Optional vertical date markers.
    marker_specs = (
        (planting_date, 'green', "Planting Date"),
        (reference_date, 'orange', "Reference Date"),
    )
    for when, colour, label in marker_specs:
        if when is None:
            continue
        if isinstance(when, str):
            # Epoch milliseconds -- presumably to sidestep plotly's add_vline
            # annotation issue with date strings on a date axis.
            when = pd.to_datetime(when).value // 10**6
        figure.add_vline(
            x=when,
            line={'color': colour, 'width': 2, 'dash': 'dash'},
            annotation_text=label,
            annotation_position="top left",
        )

    figure.update_yaxes(title_text="Percentage Depletion (%)", secondary_y=False)
    figure.update_yaxes(title_text="Water Amount (mm)", secondary_y=True, showgrid=False)
    figure.update_xaxes(tickformat="%b %Y", dtick="M1", tickangle=0, title_text="Date")
    figure.update_layout(
        title="Soil Water Depletion Over Time",
        barmode='overlay',
        legend={'orientation': "h", 'yanchor': "bottom", 'y': 1.02, 'xanchor': "right", 'x': 1},
        width=1400,
        height=600,
    )

    if output_file:
        figure.write_image(output_file, scale=2)
    figure.show()
|
|
|
|
|
def show_real_benchmark(df, date_col, col_real, col_benchmark, unit_name='Value', output_file=None):
    """Overlay a series and its benchmark as two lines on a shared date axis.

    ``col_real`` is drawn as a solid black line and ``col_benchmark`` as a
    dashed red line. Each trace is named after its column, and the x-axis title
    is ``date_col``.

    Args:
        df: Table holding ``date_col``, ``col_real`` and ``col_benchmark``.
        date_col: Column parsed with ``pd.to_datetime`` for the x-axis.
        col_real, col_benchmark: Columns to compare.
        unit_name: Y-axis title.
        output_file: If truthy, the figure is also written there with
            ``write_image`` at scale 2.

    The figure is always displayed with ``show()``.
    """
    dates = pd.to_datetime(df[date_col])
    figure = make_subplots(rows=1, cols=1, specs=[[{"secondary_y": False}]])

    series_styles = (
        (col_real, {'color': 'black', 'width': 2}),
        (col_benchmark, {'color': 'red', 'width': 2, 'dash': 'dash'}),
    )
    for column, line_style in series_styles:
        figure.add_trace(
            go.Scatter(x=dates, y=df[column], mode='lines', line=line_style, name=column)
        )

    figure.update_yaxes(title_text=unit_name)
    figure.update_xaxes(tickformat="%b %Y", dtick="M1", tickangle=0, title_text=date_col)
    figure.update_layout(
        title=f"{col_real} vs {col_benchmark} Over Time",
        legend={'orientation': "h", 'yanchor': "bottom", 'y': 1.02, 'xanchor': "right", 'x': 1},
        width=1400,
        height=600,
    )

    if output_file:
        figure.write_image(output_file, scale=2)
    figure.show()
|
|
|
|
|
@flow(name='aquacrop_pipeline', retries=1, task_runner=ThreadPoolTaskRunner())
def aquacrop_pipeline() -> None:
    """Run AquaCrop for every (sensor, consortium) pair that has prepared inputs.

    For each sensor in ``sensors_forecasted`` and each consortium returned by
    ``get_consortia()``, a run is attempted only when both
    ``weather_aquacrop.parquet`` and ``aquacrop_settings.json`` exist under
    ``data//05_aquacrop_input//<consortium>//<sensor>//``. The five result tables
    are written with ``save_to_file`` to
    ``data//06_aquacrop_output//<consortium>//<sensor>//<name>.parquet``.
    A failure for one pair is logged and does not stop the remaining pairs.
    """
    logger = get_run_logger()
    logger.info('Starting AquaCrop pipeline!')

    consortia = get_consortia()

    # NOTE(review): tasks are called directly (not .submit()), so pairs run
    # sequentially and the ThreadPoolTaskRunner adds no concurrency here.
    for sensor_forecasted in sensors_forecasted:
        for consortium_name in consortia:
            input_dir = f"data//05_aquacrop_input//{consortium_name}//{sensor_forecasted}"
            required_inputs = (
                f"{input_dir}//weather_aquacrop.parquet",
                f"{input_dir}//aquacrop_settings.json",
            )
            if not all(os.path.exists(path) for path in required_inputs):
                logger.info(f"Skipping consortium {consortium_name} and sensor {sensor_forecasted}: AquaCrop inputs not found")
                continue

            try:
                inputs = read_input_data(consortium_name, sensor_forecasted)
                outputs = aquacrop_run(*inputs, strategy)

                # aquacrop_run logs its own failure and returns None; don't try to unpack it.
                if outputs is None:
                    logger.error(f"AquaCrop produced no results for consortium {consortium_name} and sensor {sensor_forecasted}; nothing saved")
                    continue

                water_flux, water_storage, crop_growth, summary_season, summary_table = outputs
                tables_to_save = {
                    "water_flux": water_flux,
                    "water_storage": water_storage,
                    "crop_growth": crop_growth,
                    "summary_season": summary_season,
                    "summary_table": summary_table,
                }

                output_dir = f'data//06_aquacrop_output//{consortium_name}//{sensor_forecasted}'
                os.makedirs(output_dir, exist_ok=True)
                for name, df in tables_to_save.items():
                    save_to_file(df=df, output_file=f'{output_dir}//{name}.parquet')

            except Exception as e:
                logger.exception(f"Error running model for consortium {consortium_name} and sensor {sensor_forecasted}: {e}")
|
|
|
|
|
# Script entry point: run the whole pipeline once when executed directly.
if __name__ == "__main__":
    aquacrop_pipeline()