malcolmSQ
Update allometric equations with corrected coefficients - R. racemosa: TB = 2.0738 · (D²H)^0.67628 (was 1.938) - A. germinans: TB = 1.5595 · (D²H)^0.55864 (was 1.486) - Updated allometry.py, config files, dashboard documentation, and tests
85ccbbb | """ | |
| Core implementation of the Emissions Reduction (ER) model for mangrove projects. | |
| """ | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import yaml | |
| import warnings | |
| from .allometry import calculate_biomass | |
| from .metrics import calculate_carbon | |
| # Growth model imports | |
| from .growth_models.declining_increment import declining_increment_growth, continuous_declining_increment_growth | |
@dataclass
class Species:
    """Species-specific parameters for growth and carbon calculations.

    Instances are built from config mappings via ``Species(**mapping)``, so
    the class must be a dataclass for the keyword constructor to exist.
    """

    name: str
    planting_density: float
    # Old style: per-year mortality percentages keyed "year_<n>", with an
    # optional "subsequent" fallback key.
    mortality_rates: Optional[Dict[str, float]] = None
    # New style: DBH-dependent mortality, m = m_ref * (DBH_ref / dbh) ** p.
    m_ref: Optional[float] = None
    DBH_ref: Optional[float] = None
    p: Optional[float] = None
    # Growth-model parameter sets, each keyed by dimension ("dbh" / "height").
    chapman_richards: Optional[Dict[str, Dict[str, float]]] = None
    allometry: Optional[Dict[str, float]] = None
    # Starting values, read with the keys "dbh" and "height".
    initial_values: Optional[Dict[str, float]] = None
    linear: Optional[Dict[str, Dict[str, float]]] = None
    linear_plateau: Optional[Dict[str, Dict[str, float]]] = None
    declining_increment: Optional[Dict[str, Dict[str, float]]] = None
@dataclass
class ProjectConfig:
    """Project configuration parameters.

    Built from the config mapping via ``ProjectConfig(**mapping)``.
    """

    # Number of project years to simulate.
    duration_years: int
    # Planted area per planting year; keys have the form "<prefix>_<year>"
    # (the integer after the underscore is parsed by the model).
    planting_schedule: Dict[str, float]
@dataclass
class CarbonConfig:
    """Carbon conversion and adjustment parameters.

    Built from the config mapping via ``CarbonConfig(**mapping)``.
    """

    biomass_to_carbon: float
    carbon_to_co2: float
    # Percentages (0-100); the model divides them by 100 before use.
    buffer_percentage: float
    leakage_percentage: float
    # Multiplied by the cumulative planted area each year.
    baseline_emissions: float
    # Multiplied by the cumulative planted area each year; 0.0 disables it.
    soil_carbon_per_ha_per_year: float = 0.0
class ERModel:
    """
    Emissions Reduction Model for mangrove projects.

    Calculates carbon sequestration over time based on tree growth,
    mortality, and carbon conversion factors.
    """

    def __init__(self, config_path: Path = None, config: dict = None):
        """
        Initialize the model from a YAML configuration file or a config dict.

        Args:
            config_path: Path to the YAML configuration file
            config: Config dict (optional); takes precedence over config_path

        Raises:
            TypeError: If neither config nor config_path is provided.
        """
        if config is not None:
            cfg = config
        elif config_path is not None:
            with open(config_path) as f:
                cfg = yaml.safe_load(f)
        else:
            # Previously this fell through to open(None) and failed with an
            # opaque TypeError; keep the type but say what is wrong.
            raise TypeError("ERModel requires either 'config_path' or 'config'.")
        self.species = [Species(**s) for s in cfg["species"]]
        self.project = ProjectConfig(**cfg["project"])
        self.carbon = CarbonConfig(**cfg["carbon"])
        self.results: Optional[pd.DataFrame] = None
        self.species_results: Optional[pd.DataFrame] = None
        self.scenario_results: Optional[pd.DataFrame] = None
        # Filled by run(); defined here so it can be read before run().
        self.species_metrics: Optional[pd.DataFrame] = None
        self.growth_model = cfg.get('growth_model', 'chapman_richards')
        self.continuous_growth = cfg.get('continuous_growth', False)

    @staticmethod
    def _parse_planting_year(schedule_key: str) -> int:
        """Return the integer after the first underscore of a schedule key (e.g. "year_3" -> 3)."""
        return int(schedule_key.split("_")[1])

    def calculate_cohort_surviving_trees(self, planting_year: int, current_year: int, initial_trees: float, species: "Species", plateau_density: Optional[float] = None, growth_model: str = None) -> float:
        """
        Calculate surviving trees for a cohort planted in planting_year, in current_year.

        Uses either per-year mortality (from config) or DBH-dependent mortality.
        Mortality is applied once per cohort year, from age 1 up to the
        cohort's age in current_year. A debug line is printed for every year.

        Args:
            planting_year: Year the cohort was planted
            current_year: Year to evaluate (cohort age = current_year - planting_year + 1)
            initial_trees: Number of trees planted
            species: Species parameters
            plateau_density: Accepted for call compatibility; not used here
            growth_model: Growth model used for DBH (e.g. 'linear',
                'linear_plateau'); defaults to the model's configured one

        Returns:
            Number of surviving trees (0 if the cohort is not yet planted or
            initial_trees is 0)
        """
        if growth_model is None:
            growth_model = getattr(self, 'growth_model', 'chapman_richards')
        age = current_year - planting_year + 1
        if age < 1:
            return 0
        if initial_trees == 0:
            # Suppress debug output for zero-initial-trees cohorts
            return 0

        use_annual_rates = species.mortality_rates is not None
        if not use_annual_rates:
            # Loop-invariant: resolve the DBH growth function once per call.
            growth_func, dbh_params = self.get_growth_function_and_params(species, growth_model, 'dbh')
            # Fallback constants apply when the species does not define them.
            m_ref = species.m_ref if species.m_ref is not None else 0.16
            DBH_ref = species.DBH_ref if species.DBH_ref is not None else 9.0
            p = species.p if species.p is not None else 1.493

        N_live = initial_trees
        for year in range(1, age + 1):
            debug_info = {
                'planting_year': planting_year,
                'current_year': current_year,
                'cohort_age': year,
                'initial_trees': initial_trees,
                'N_live_before': N_live
            }
            if use_annual_rates:
                year_key = f"year_{year}"
                if year_key in species.mortality_rates:
                    mort_rate = species.mortality_rates[year_key]
                else:
                    mort_rate = species.mortality_rates.get("subsequent", 0)
                    print(f"[DEBUG] Year key '{year_key}' not found in mortality_rates for {species.name}. Using 'subsequent' or 0. Available keys: {list(species.mortality_rates.keys())}")
                # Configured rates are percentages.
                m = mort_rate / 100.0
                debug_info['mortality_logic'] = 'annual'
                debug_info['mortality_rate_percent'] = mort_rate
            else:
                dbh = growth_func(year, dbh_params, species.initial_values["dbh"])
                # Floor DBH at 1.0 so the ratio below stays bounded.
                dbh = max(dbh, 1.0)
                m = m_ref * (DBH_ref / dbh) ** p
                # Clamp to [0, 0.99] so a cohort is never wiped out in one step.
                m = min(max(m, 0), 0.99)
                debug_info['mortality_logic'] = 'dbh-dependent'
                debug_info['mortality_rate_percent'] = m * 100
                debug_info['dbh'] = dbh
            N_live = N_live * (1 - m)
            debug_info['N_live_after'] = N_live
            print(f"[DEBUG] {species.name} | PlantingYear: {planting_year} | Year: {current_year} | CohortAge: {year} | MortalityLogic: {debug_info['mortality_logic']} | MortalityRate(%): {debug_info['mortality_rate_percent']} | N_live_before: {debug_info['N_live_before']} | N_live_after: {debug_info['N_live_after']}")
        return N_live

    def calculate_total_surviving_trees(self, year: int) -> Dict[str, float]:
        """
        Calculate total surviving trees for each species in a given year, summing across all cohorts.

        Returns a dict: {species_name: total_surviving_trees}
        """
        growth_model = getattr(self, 'growth_model', 'chapman_richards')
        totals = {}
        for species in self.species:
            total = 0
            for planting_year, area in self.project.planting_schedule.items():
                py = self._parse_planting_year(planting_year)
                initial_trees = species.planting_density * area
                # plateau_density is only set once the cohort is at least 5 years old
                plateau_density = species.planting_density * area if 5 <= (year - py + 1) else None
                total += self.calculate_cohort_surviving_trees(py, year, initial_trees, species, plateau_density, growth_model)
            totals[species.name] = total
        return totals

    def calculate_cumulative_area(self, year: int) -> float:
        """
        Calculate cumulative area planted up to and including the given year.
        """
        return sum(
            area
            for planting_year, area in self.project.planting_schedule.items()
            if self._parse_planting_year(planting_year) <= year
        )

    @staticmethod
    def chapman_richards_growth(age: float, params: Dict[str, float], initial_value: float) -> float:
        """Chapman-Richards curve from initial_value towards params["a"], with rate "b" and shape "c"."""
        a, b, c = params["a"], params["b"], params["c"]
        return initial_value + (a - initial_value) * (1 - np.exp(-b * age)) ** c

    @staticmethod
    def linear_growth(age: float, params: Dict[str, float], initial_value: float) -> float:
        """Linear growth: initial_value plus params["r"] per unit of age."""
        r = params["r"]
        return initial_value + r * age

    @staticmethod
    def linear_plateau_growth(age: float, params: Dict[str, float], initial_value: float) -> float:
        """Linear growth at rate "r" up to age "T_p", then a constant initial_value + "a"."""
        r = params["r"]
        T_p = params["T_p"]
        a = params["a"]
        if age <= T_p:
            return initial_value + r * age
        return initial_value + a

    @staticmethod
    def declining_increment_growth(age: float, params: Dict[str, float], initial_value: float) -> float:
        """Sum yearly increments that start at "r0" and decline linearly to zero over "T_m" years."""
        r0 = params["r0"]
        T_m = params["T_m"]
        whole_years = int(np.floor(age))
        # Accumulate annual increments, never negative
        total = initial_value
        for i in range(1, whole_years + 1):
            total += r0 * max(0, 1 - (i - 1) / T_m)
        # If age is fractional, add partial increment for the last year
        frac = age - whole_years
        if frac > 0:
            total += frac * r0 * max(0, 1 - whole_years / T_m)
        return total

    @staticmethod
    def continuous_declining_increment_growth(age: float, params: Dict[str, float], initial_value: float) -> float:
        """Closed-form variant: initial + r0 * (age - age^2 / (2 * T_m))."""
        r0 = params["r0"]
        T_m = params["T_m"]
        return initial_value + r0 * (age - age**2 / (2 * T_m))

    def calculate_carbon_for_species(self, species: "Species", age: int, area: float, cohort_age: int) -> float:
        """
        Calculate carbon sequestration for a single species, cohort, and cohort age.

        Args:
            species: Species parameters
            age: Project year (not used for growth)
            area: Planted area in hectares
            cohort_age: Age of this cohort (years since planting)

        Returns:
            Carbon sequestration in tCO2
        """
        if cohort_age < 1:
            return 0
        initial_trees = species.planting_density * area
        plateau_density = species.planting_density * area if cohort_age >= 5 else None
        # planting_year=1 with current_year=cohort_age yields age == cohort_age.
        surviving = self.calculate_cohort_surviving_trees(1, cohort_age, initial_trees, species, plateau_density, self.growth_model)
        dbh_func, dbh_params = self.get_growth_function_and_params(species, self.growth_model, 'dbh')
        height_func, height_params = self.get_growth_function_and_params(species, self.growth_model, 'height')
        dbh = dbh_func(cohort_age, dbh_params, species.initial_values["dbh"])
        height = height_func(cohort_age, height_params, species.initial_values["height"])
        biomass = calculate_biomass(dbh, height, species.name, species.allometry)
        return calculate_carbon(
            biomass * surviving,
            self.carbon.biomass_to_carbon,
            self.carbon.carbon_to_co2
        )

    def run(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Execute the full ER calculation pipeline.

        Also stores per-year, per-species stand metrics (survivor-weighted
        averages) in self.species_metrics.

        Returns:
            Tuple of (yearly results DataFrame, species results DataFrame)
        """
        results = []
        species_results = []
        species_metrics_rows = []  # Per-year, per-species metrics
        for year in range(1, self.project.duration_years + 1):
            year_results = {"year": year}
            species_year_results = {"Year": year}
            total_carbon = 0
            cumulative_area = self.calculate_cumulative_area(year)
            for species in self.species:
                species_carbon = 0
                # Survivor-weighted accumulators across cohorts
                total_surviving = 0
                total_dbh = 0
                total_height = 0
                total_biomass = 0
                for planting_year, area in self.project.planting_schedule.items():
                    py = self._parse_planting_year(planting_year)
                    cohort_age = year - py + 1
                    if cohort_age < 1:
                        # Cohort not planted yet in this project year.
                        continue
                    initial_trees = species.planting_density * area
                    plateau_density = species.planting_density * area if cohort_age >= 5 else None
                    surviving = self.calculate_cohort_surviving_trees(1, cohort_age, initial_trees, species, plateau_density, self.growth_model)
                    dbh_func, dbh_params = self.get_growth_function_and_params(species, self.growth_model, 'dbh')
                    height_func, height_params = self.get_growth_function_and_params(species, self.growth_model, 'height')
                    dbh = dbh_func(cohort_age, dbh_params, species.initial_values["dbh"])
                    height = height_func(cohort_age, height_params, species.initial_values["height"])
                    biomass_per_tree = calculate_biomass(dbh, height, species.name, species.allometry)
                    total_surviving += surviving
                    total_dbh += dbh * surviving
                    total_height += height * surviving
                    total_biomass += biomass_per_tree * surviving
                    # Carbon for this cohort
                    species_carbon += self.calculate_carbon_for_species(species, year, area, cohort_age)
                total_carbon += species_carbon
                species_year_results[f"{species.name} tCO2"] = species_carbon
                # Store per-year, per-species metrics
                if total_surviving > 0:
                    avg_dbh = total_dbh / total_surviving
                    avg_height = total_height / total_surviving
                    avg_biomass_per_tree = total_biomass / total_surviving
                else:
                    avg_dbh = 0
                    avg_height = 0
                    avg_biomass_per_tree = 0
                species_metrics_rows.append({
                    "Year": year,
                    "Species": species.name,
                    "Surviving Trees": total_surviving,
                    "DBH (cm)": avg_dbh,
                    "Height (m)": avg_height,
                    "Biomass per Tree (kg)": avg_biomass_per_tree,
                    "Total Biomass (kg)": total_biomass
                })
            species_year_results["Total tCO2"] = total_carbon
            species_year_results["Cumulative ha"] = cumulative_area
            species_year_results["tCO2/ha"] = total_carbon / cumulative_area if cumulative_area > 0 else 0
            gross_carbon = total_carbon
            # "buffer_carbon" is gross carbon net of the buffer and leakage
            # percentages and of baseline emissions over the planted area.
            buffer_carbon = gross_carbon * (1 - self.carbon.buffer_percentage / 100)
            buffer_carbon -= self.carbon.leakage_percentage / 100 * gross_carbon
            buffer_carbon -= self.carbon.baseline_emissions * cumulative_area
            # Soil carbon: configured per-hectare yearly rate times every
            # hectare planted so far.
            soil_carbon = 0
            if hasattr(self.carbon, 'soil_carbon_per_ha_per_year'):
                soil_carbon = self.carbon.soil_carbon_per_ha_per_year * cumulative_area
            gross_carbon_with_soil = gross_carbon + soil_carbon
            buffer_carbon_with_soil = buffer_carbon + soil_carbon
            year_results.update({
                "gross_carbon": gross_carbon,
                "buffer_carbon": buffer_carbon,
                "cumulative_area": cumulative_area,
                "gross_carbon_with_soil": gross_carbon_with_soil,
                "buffer_carbon_with_soil": buffer_carbon_with_soil,
                "soil_carbon": soil_carbon
            })
            results.append(year_results)
            species_results.append(species_year_results)
        self.results = pd.DataFrame(results)
        self.species_results = pd.DataFrame(species_results)
        self.species_metrics = pd.DataFrame(species_metrics_rows)
        return self.results, self.species_results

    def save_results(self, output_path: Path) -> None:
        """
        Save results to CSV file.

        Args:
            output_path: Path to save the results CSV

        Raises:
            ValueError: If the model has not been run yet.
        """
        if self.results is None:
            raise ValueError("No results available. Run the model first.")
        self.results.to_csv(output_path, index=False)

    def get_growth_function_and_params(self, species, growth_model, dim):
        """
        Returns the correct growth function and parameter dict for the given species and dimension (dbh or height).

        Args:
            species: Species parameters holding one parameter set per growth model
            growth_model: 'linear', 'linear_plateau', 'declining_increment';
                any other value selects Chapman-Richards
            dim: Dimension key into the parameter set ('dbh' or 'height')

        Returns:
            Tuple of (growth function taking (age, params, initial_value), params dict)
        """
        if growth_model == "linear":
            # These branches used to return None as the function, which made
            # every caller fail with "'NoneType' object is not callable".
            func = self.linear_growth
            params = species.linear[dim]
        elif growth_model == "linear_plateau":
            func = self.linear_plateau_growth
            params = species.linear_plateau[dim]
        elif growth_model == "declining_increment":
            # These bare names resolve at module scope (the growth_models
            # import), not to the same-named staticmethods on this class.
            if getattr(self, 'continuous_growth', False):
                func = continuous_declining_increment_growth
            else:
                func = declining_increment_growth
            params = species.declining_increment[dim]
        else:
            func = self.chapman_richards_growth
            params = species.chapman_richards[dim]
        return func, params
# --- Parameter sweep/test for plausible survival curves ---
def test_dbh_mortality_sweep():
    """Plot 30-year survival curves for a grid of DBH-dependent mortality parameters.

    For every (m_ref, p) pair, 1000 trees are run through
    m = m_ref * (DBH_ref / dbh) ** p (clamped to [0, 0.99]) with a simple
    linear DBH trajectory, and the surviving counts are plotted with
    matplotlib. Nothing is returned; the figure is shown interactively.
    """
    import matplotlib.pyplot as plt
    import numpy as np

    reference_dbh = 9.0
    starting_trees = 1000
    years = np.arange(1, 31)

    def survival_curve(m_ref, p):
        # Apply one year of mortality at a time and record the survivors.
        alive = starting_trees
        curve = []
        for yr in years:
            # simple linear DBH growth for test, floored at 1.0
            dbh = max(1.0 + (yr - 1) * 0.5, 1.0)
            rate = min(max(m_ref * (reference_dbh / dbh) ** p, 0), 0.99)
            alive = alive * (1 - rate)
            curve.append(alive)
        return curve

    results = {
        (m_ref, p): survival_curve(m_ref, p)
        for m_ref in [0.01, 0.05, 0.1, 0.16]
        for p in [1.0, 1.5, 2.0]
    }

    plt.figure(figsize=(10, 6))
    for (m_ref, p), curve in results.items():
        plt.plot(years, curve, label=f"m_ref={m_ref}, p={p}")
    plt.xlabel("Year")
    plt.ylabel("Surviving Trees")
    plt.title("DBH-dependent Mortality Parameter Sweep")
    plt.legend()
    plt.grid(True)
    plt.show()
# To run the test, call test_dbh_mortality_sweep() from __main__ or a notebook.