import numpy as np import pandas as pd from .compute_crop_calendar import compute_crop_calendar from .calculate_HIGC import calculate_HIGC from .calculate_HI_linear import calculate_HI_linear from ..entities.co2 import CO2 from ..entities.crop import Crop from copy import deepcopy from os.path import dirname, abspath from typing import TYPE_CHECKING if TYPE_CHECKING: # Important: classes are only imported when types are checked, not in production. from pandas import DataFrame from aquacrop.entities.clockStruct import ClockStruct from aquacrop.entities.paramStruct import ParamStruct def compute_variables( param_struct: "ParamStruct", weather_df: "DataFrame", clock_struct: "ClockStruct", acfp: str = dirname(dirname(abspath(__file__))), ) -> "ParamStruct": """ Function to compute additional variables needed to run the model eg. CO2 Arguments: param_struct (ParamStruct): Contains model paramaters weather_df (DataFrame): weather data clock_struct (ClockStruct): time params acfp (Path): path to aquacrop_a directory containing co2 data Returns: param_struct (ParamStruct): updated model params """ if param_struct.water_table == 1: param_struct.Soil.add_capillary_rise_params() # Calculate readily evaporable water in surface layer if param_struct.Soil.adj_rew == 0: param_struct.Soil.rew = round( ( 1000 * ( param_struct.Soil.profile.th_fc.iloc[0] - param_struct.Soil.profile.th_dry.iloc[0] ) * param_struct.Soil.evap_z_surf ), 2, ) if param_struct.Soil.calc_cn == 1: # adjust curve number ksat = param_struct.Soil.profile.Ksat.iloc[0] if ksat > 864: param_struct.Soil.cn = 46 elif ksat > 347: param_struct.Soil.cn = 61 elif ksat > 36: param_struct.Soil.cn = 72 elif ksat > 0: param_struct.Soil.cn = 77 assert ksat > 0 for i in range(param_struct.NCrops): crop = param_struct.CropList[i] # crop.calculate_additional_params() # Crop calander crop = compute_crop_calendar( crop, clock_struct.planting_dates, clock_struct.simulation_start_date, clock_struct.simulation_end_date, clock_struct.time_span, weather_df, ) # Harvest index param_struct.Seasonal_Crop_List[clock_struct.season_counter].Paramsgrowth coefficient crop.HIGC = calculate_HIGC( crop.YldFormCD, crop.HI0, crop.HIini, ) # Days to linear harvest_index switch point if crop.CropType == 3: # Determine linear switch point and HIGC rate for fruit/grain crops crop.tLinSwitch, crop.dHILinear = calculate_HI_linear( crop.YldFormCD, crop.HIini, crop.HI0, crop.HIGC ) else: # No linear switch for leafy vegetable or root/tiber crops crop.tLinSwitch = 0 crop.dHILinear = 0.0 param_struct.CropList[i] = crop # Calculate WP adjustment factor for elevation in CO2 concentration # Load CO2 data co2Data = param_struct.CO2.co2_data # Years start_year, end_year = pd.DatetimeIndex( [clock_struct.simulation_start_date, clock_struct.simulation_end_date] ).year sim_years = np.arange(start_year, end_year + 1) # Interpolate data CO2conc_interp = np.interp(sim_years, co2Data.year, co2Data.ppm) # Store data param_struct.CO2.co2_data_processed = pd.Series(CO2conc_interp, index=sim_years) # maybe get rid of this # Get CO2 concentration for first year CO2conc = param_struct.CO2.co2_data_processed.iloc[0] # param_struct.CO2 = param_struct.co2_concentration_adj # if user specified constant concentration if param_struct.CO2.constant_conc is True: if param_struct.CO2.current_concentration > 0.: CO2conc = param_struct.CO2.current_concentration else: CO2conc = param_struct.CO2.co2_data_processed.iloc[0] param_struct.CO2.current_concentration = CO2conc CO2ref = param_struct.CO2.ref_concentration # Get CO2 weighting factor for first year if CO2conc <= CO2ref: fw = 0 else: if CO2conc >= 550: fw = 1 else: fw = 1 - ((550 - CO2conc) / (550 - CO2ref)) # Determine adjustment for each crop in first year of simulation for i in range(param_struct.NCrops): crop = param_struct.CropList[i] # Determine initial adjustment fCO2old = (CO2conc / CO2ref) / ( 1 + (CO2conc - CO2ref) * ( (1 - fw) * crop.bsted + fw * ((crop.bsted * crop.fsink) + (crop.bface * (1 - crop.fsink))) ) ) # New adjusted correction coefficient for CO2 (version 7 of AquaCrop) if (CO2conc > CO2ref): # Calculate shape factor fshape = -4.61824 - 3.43831*crop.fsink - 5.32587*crop.fsink*crop.fsink # Determine adjustment for CO2 if (CO2conc >= 2000): fCO2new = 1.58 # Maximum CO2 adjustment else: CO2rel = (CO2conc-CO2ref)/(2000-CO2ref) fCO2new = 1 + 0.58 * ((np.exp(CO2rel*fshape) - 1)/(np.exp(fshape) - 1)) # Select adjusted coefficient for CO2 if (CO2conc <= CO2ref): fCO2 = fCO2old elif ((CO2conc <= 550) and (fCO2old < fCO2new)): fCO2 = fCO2old else: fCO2 = fCO2new # Consider crop type if crop.WP >= 40: # No correction for C4 crops ftype = 0 elif crop.WP <= 20: # Full correction for C3 crops ftype = 1 else: ftype = (40 - crop.WP) / (40 - 20) # Total adjustment crop.fCO2 = 1 + ftype * (fCO2 - 1) param_struct.CropList[i] = crop # change this later if param_struct.NCrops == 1: crop_list = [ deepcopy(param_struct.CropList[0]) for i in range(len(param_struct.CropChoices)) ] # param_struct.Seasonal_Crop_List = [deepcopy(param_struct.CropList[0]) for i in range(len(param_struct.CropChoices))] else: crop_list = param_struct.CropList # add crop for out of growing season # param_struct.Fallow_Crop = deepcopy(param_struct.Seasonal_Crop_List[0]) Fallow_Crop = deepcopy(crop_list[0]) param_struct.Seasonal_Crop_List = [] for crop in crop_list: #crop_struct = Crop(crop.Name, crop.planting_date) # changed from CropStruct to Crop during removal of numba AOT/JIT compilation #for a, v in crop.__dict__.items(): # if hasattr(crop_struct, a): # crop_struct.__setattr__(a, v) param_struct.Seasonal_Crop_List.append(crop) fallow_struct = Crop(crop.Name, crop.planting_date) # changed from CropStruct to Crop during removal of numba AOT/JIT compilation for a, v in Fallow_Crop.__dict__.items(): if hasattr(fallow_struct, a): fallow_struct.__setattr__(a, v) param_struct.Fallow_Crop = fallow_struct return param_struct