malcolmSQ
Fix: Use cumulative soil carbon for all tables and outputs; remove cumsum from table logic
fa68af3 | """ | |
| Core implementation of the Emissions Reduction (ER) model for mangrove projects. | |
| """ | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import yaml | |
| import warnings | |
| from er_model_core.allometry import calculate_biomass | |
| from er_model_core.metrics import calculate_carbon | |
| from er_model_core.config_loader import load_model_config | |
| from er_model_core.types import Species, ProjectConfig, CarbonConfig | |
| # Growth model imports - ensure all are imported | |
| from er_model_core.growth_models.chapman_richards import chapman_richards_growth | |
| from er_model_core.growth_models.linear import linear_growth, linear_plateau_growth | |
| from er_model_core.growth_models.declining_increment import declining_increment_growth, continuous_declining_increment_growth | |
@dataclass
class Species:
    """Species-specific parameters for growth and carbon calculations.

    Declared as a dataclass so instances can be built from keyword arguments
    and so every field lives in the instance ``__dict__`` (the model looks up
    growth-model parameters by attribute name).

    NOTE(review): a ``Species`` name is also imported from
    ``er_model_core.types`` at the top of this file; this definition shadows
    it within the module - confirm which one is meant to be authoritative.
    """

    # Species label; used in warnings and as a prefix for result columns.
    name: str
    # Multiplied by a cohort's planted area to get its initial tree count.
    planting_density: float
    # Old style: per-year mortality in percent, keyed "year_<n>", with an
    # optional "subsequent" entry used for years that have no explicit key.
    mortality_rates: Optional[Dict[str, float]] = None
    # New style: DBH-dependent mortality, m = m_ref * (DBH_ref / dbh) ** p.
    # When left as None the model substitutes 0.16, 9.0 and 1.493.
    m_ref: Optional[float] = None
    DBH_ref: Optional[float] = None
    p: Optional[float] = None
    # Growth-model parameters, keyed first by dimension ("dbh" / "height").
    chapman_richards: Optional[Dict[str, Dict[str, float]]] = None
    # Passed through unchanged to calculate_biomass().
    allometry: Optional[Dict[str, float]] = None
    # Starting values; the model reads the "dbh" and "height" entries.
    initial_values: Optional[Dict[str, float]] = None
    linear: Optional[Dict[str, Dict[str, float]]] = None
    linear_plateau: Optional[Dict[str, Dict[str, float]]] = None
    declining_increment: Optional[Dict[str, Dict[str, float]]] = None
@dataclass
class ProjectConfig:
    """Project configuration parameters.

    Declared as a dataclass so it can be constructed from keyword arguments.
    """

    # Number of project years simulated; the model iterates 1..duration_years.
    duration_years: int
    # Planted area per cohort. Keys must look like "<prefix>_<year>" (for
    # example "year_3"): the model takes the integer after the first "_" as
    # the planting year.
    planting_schedule: Dict[str, float]
@dataclass
class CarbonConfig:
    """Carbon conversion and adjustment parameters.

    Declared as a dataclass so it can be constructed from keyword arguments.
    """

    # Conversion factors handed to calculate_carbon().
    biomass_to_carbon: float
    carbon_to_co2: float
    # Percentages (0-100); the model divides them by 100 before applying them
    # to gross carbon.
    buffer_percentage: float
    leakage_percentage: float
    # Multiplied by the cumulative planted area and subtracted each year.
    baseline_emissions: float
    # Multiplied by the cumulative planted area and accumulated year on year.
    soil_carbon_per_ha_per_year: float = 0.0
| class ERModel: | |
| """ | |
| Emissions Reduction Model for mangrove projects. | |
| Calculates carbon sequestration over time based on tree growth, | |
| mortality, and carbon conversion factors. | |
| """ | |
def __init__(self, config_path: Optional[Path] = None, config: Optional[dict] = None):
    """
    Build the model from a YAML configuration file or an in-memory config dict.

    Both arguments are forwarded to ``load_model_config``; how it chooses
    between them is decided there.

    Args:
        config_path: Path to the YAML configuration file
        config: Config dict (optional)
    """
    loaded = load_model_config(config_path=config_path, config_dict=config)
    (
        self.species,
        self.project,
        self.carbon,
        self.growth_model,
        self.continuous_growth,
        self.raw_config,  # kept so other parts of the model can read extra sections
    ) = loaded

    # Result tables start empty.
    self.results: Optional[pd.DataFrame] = None
    self.species_results: Optional[pd.DataFrame] = None
    self.scenario_results: Optional[pd.DataFrame] = None

    # Scenario settings used when the config has no "scenarios" section.
    fallback_scenarios = {
        "area": 1000.0,
        "dbh_range": [1.0, 20.0],
        "height_range": [0.5, 12.0],
        "growth_rate_factor": 1.0
    }
    self.scenarios = self.raw_config.get("scenarios", fallback_scenarios)
def calculate_cohort_surviving_trees(self, planting_year: int, current_year: int, initial_trees: float, species: Species, plateau_density: Optional[float] = None, growth_model: Optional[str] = None) -> float:
    """
    Calculate surviving trees for a cohort planted in planting_year, in current_year.

    Mortality is applied once per year of cohort age, starting from
    ``initial_trees``. Two modes exist:

    * annual: when ``species.mortality_rates`` is set, the percentage under
      ``"year_<n>"`` is used, falling back to ``"subsequent"`` (or 0).
    * DBH-dependent: otherwise ``m = m_ref * (DBH_ref / dbh) ** p`` is used,
      with ``dbh`` taken from the selected growth model, floored at 1.0, and
      ``m`` clamped to [0, 0.99].

    Args:
        planting_year: Year the cohort was planted.
        current_year: Year to evaluate; cohort age is
            ``current_year - planting_year + 1``.
        initial_trees: Number of trees planted in the cohort.
        species: Species parameters.
        plateau_density: Accepted for interface compatibility; not read here.
        growth_model: Growth model used for DBH (e.g. 'linear',
            'linear_plateau'). Defaults to ``self.growth_model``, or
            'chapman_richards' if that attribute is missing.

    Returns:
        Surviving tree count; 0.0 when the cohort is not yet planted or
        ``initial_trees`` is zero.
    """
    # Function-scope import: the module does not import logging at file level.
    import logging

    logger = logging.getLogger(__name__)

    if growth_model is None:
        growth_model = getattr(self, 'growth_model', 'chapman_richards')
    age = current_year - planting_year + 1
    if age < 1 or initial_trees == 0:
        return 0.0

    mortality_rates = species.mortality_rates
    if mortality_rates is None:
        # These do not change with cohort age, so resolve them once rather
        # than on every iteration of the loop below.
        growth_func, dbh_params = self.get_growth_function_and_params(species, growth_model, 'dbh')
        m_ref = species.m_ref if species.m_ref is not None else 0.16
        DBH_ref = species.DBH_ref if species.DBH_ref is not None else 9.0
        p = species.p if species.p is not None else 1.493

    N_live = initial_trees
    for year in range(1, age + 1):
        n_live_before = N_live
        if mortality_rates is not None:
            year_key = f"year_{year}"
            if year_key in mortality_rates:
                mort_rate = mortality_rates[year_key]
            else:
                mort_rate = mortality_rates.get("subsequent", 0)
                logger.debug(
                    "Year key '%s' not found in mortality_rates for %s. Using 'subsequent' or 0. Available keys: %s",
                    year_key, species.name, list(mortality_rates.keys()),
                )
            m = mort_rate / 100.0
            mortality_logic = 'annual'
            mortality_rate_percent = mort_rate
        else:
            dbh = growth_func(year, dbh_params, species.initial_values["dbh"])
            # Floor DBH so the ratio below cannot divide by zero or blow up.
            dbh = max(dbh, 1.0)
            m = m_ref * (DBH_ref / dbh) ** p
            m = min(max(m, 0), 0.99)
            mortality_logic = 'dbh-dependent'
            mortality_rate_percent = m * 100
        N_live = N_live * (1 - m)
        # Debug-level logging instead of print: this runs for every cohort,
        # species and year, so unconditional stdout output swamps callers.
        logger.debug(
            "%s | PlantingYear: %s | Year: %s | CohortAge: %s | MortalityLogic: %s | MortalityRate(%%): %s | N_live_before: %s | N_live_after: %s",
            species.name, planting_year, current_year, year,
            mortality_logic, mortality_rate_percent, n_live_before, N_live,
        )
    return N_live
def calculate_total_surviving_trees(self, year: int) -> Dict[str, float]:
    """
    Sum the surviving trees of every planted cohort, per species, for one year.

    Args:
        year: Project year to evaluate.

    Returns:
        Mapping of species name to total surviving trees across all cohorts
        in the planting schedule.
    """
    model_name = getattr(self, 'growth_model', 'chapman_richards')
    schedule = self.project.planting_schedule

    def cohort_survivors(sp, label, hectares):
        # The planting year is the integer after the first "_" in the key.
        start_year = int(label.split("_")[1])
        planted = sp.planting_density * hectares
        # From cohort age 5 onwards the initial count is passed along as the
        # plateau density; before that nothing is passed.
        if (year - start_year + 1) >= 5:
            plateau = sp.planting_density * hectares
        else:
            plateau = None
        return self.calculate_cohort_surviving_trees(start_year, year, planted, sp, plateau, model_name)

    return {
        sp.name: sum(cohort_survivors(sp, label, hectares) for label, hectares in schedule.items())
        for sp in self.species
    }
def calculate_cumulative_area(self, year: int) -> float:
    """
    Return the total area planted in all cohorts up to and including ``year``.

    Schedule keys are parsed as "<prefix>_<year>"; a cohort counts when its
    planting year is less than or equal to ``year``.
    """
    return sum(
        hectares
        for label, hectares in self.project.planting_schedule.items()
        if int(label.split("_")[1]) <= year
    )
def get_growth_function_and_params(self, species: Species, growth_model_name: str, dim: str) -> Tuple[callable, Dict[str, float]]:
    """
    Get the growth function and its parameters for a given species, model, and dimension (dbh or height).

    Uses self.continuous_growth to select the continuous version of a model
    when one exists. An unknown model name falls back to Chapman-Richards
    (function and parameters) with a warning; missing parameters also
    produce a warning rather than an error.

    Args:
        species: Species whose attribute named ``growth_model_name`` holds
            the per-dimension parameter dicts.
        growth_model_name: e.g. 'chapman_richards', 'linear',
            'linear_plateau', 'declining_increment'.
        dim: Dimension key inside the model's parameters ('dbh' or 'height').

    Returns:
        Tuple of (growth function, parameter dict). The dict is empty when
        no parameters are configured.
    """
    # The model attribute may be absent or explicitly None (the declared
    # default for optional models); treat both as "no parameters" so the
    # warning below is reached instead of an AttributeError on None.get().
    model_config = getattr(species, growth_model_name, None)
    if not isinstance(model_config, dict):
        model_config = {}
    params = model_config.get(dim) or {}

    growth_functions = {
        "chapman_richards": chapman_richards_growth,
        "linear": linear_growth,
        "linear_plateau": linear_plateau_growth,
        "declining_increment": declining_increment_growth,
        # Potentially add more models here
    }
    continuous_growth_functions = {
        "declining_increment": continuous_declining_increment_growth,
        # Add other continuous versions if they exist
    }

    if self.continuous_growth and growth_model_name in continuous_growth_functions:
        selected_func = continuous_growth_functions[growth_model_name]
    elif growth_model_name in growth_functions:
        selected_func = growth_functions[growth_model_name]
    else:
        # Unknown model name: fall back to Chapman-Richards and its params.
        warnings.warn(f"Growth model '{growth_model_name}' not found or not supported. Defaulting to chapman_richards for {species.name} {dim}. Check config.")
        selected_func = chapman_richards_growth  # Default fallback
        params = species.chapman_richards.get(dim, {}) if species.chapman_richards else {}

    if not params:
        # Deliberately non-fatal: the growth function decides whether it can
        # run without parameters.
        warnings.warn(f"No parameters found for {growth_model_name} {dim} for species {species.name}. Growth may be incorrect.")
        params = {}
    return selected_func, params
def calculate_carbon_for_species(self, species: Species, age: int, area: float, cohort_age: int) -> float:
    """
    Compute the carbon held by one cohort of one species at a given cohort age.

    Survivors are derived from the planted count, tree size comes from the
    configured growth model, and the per-tree biomass times the survivors is
    converted with the carbon factors.

    Args:
        species: Species parameters
        age: Project year (not used for growth)
        area: Planted area in hectares
        cohort_age: Age of this cohort (years since planting)

    Returns:
        Carbon sequestration in tCO2; 0 when cohort_age is below 1
    """
    if cohort_age < 1:
        return 0

    model_name = self.growth_model
    planted = species.planting_density * area
    plateau = species.planting_density * area if cohort_age >= 5 else None
    # Planting year 1 and current year cohort_age give exactly cohort_age
    # years of mortality.
    alive = self.calculate_cohort_surviving_trees(1, cohort_age, planted, species, plateau, model_name)

    grow_dbh, dbh_cfg = self.get_growth_function_and_params(species, model_name, 'dbh')
    grow_height, height_cfg = self.get_growth_function_and_params(species, model_name, 'height')
    starting = species.initial_values
    tree_dbh = grow_dbh(cohort_age, dbh_cfg, starting["dbh"])
    tree_height = grow_height(cohort_age, height_cfg, starting["height"])

    per_tree_biomass = calculate_biomass(tree_dbh, tree_height, species.name, species.allometry)
    return calculate_carbon(
        per_tree_biomass * alive,
        self.carbon.biomass_to_carbon,
        self.carbon.carbon_to_co2
    )
def run(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Execute the full ER calculation pipeline.

    For every project year and species, each planted cohort contributes its
    surviving trees, size and biomass to survivor-weighted averages, and its
    carbon (via ``calculate_carbon_for_species``) to the yearly total. Buffer,
    leakage and baseline deductions and soil carbon are then applied.

    Side effects:
        Stores three tables on the instance: ``self.results`` (one row per
        year), ``self.species_results`` (one row per year with one
        "<species> tCO2" column per species) and ``self.species_metrics``
        (one row per year and species).

    Returns:
        Tuple of (yearly results DataFrame, species results DataFrame)
    """
    results = []
    species_results = []
    species_metrics_rows = []  # per-year, per-species metrics
    cumulative_soil_carbon = 0
    schedule = self.project.planting_schedule

    for year in range(1, self.project.duration_years + 1):
        species_year_results = {"Year": year}
        total_carbon = 0
        cumulative_area = self.calculate_cumulative_area(year)

        for species in self.species:
            # The growth curves depend only on species and model, not on the
            # cohort, so resolve them once here instead of once per cohort.
            dbh_func, dbh_params = self.get_growth_function_and_params(species, self.growth_model, 'dbh')
            height_func, height_params = self.get_growth_function_and_params(species, self.growth_model, 'height')

            species_carbon = 0
            total_surviving = 0
            total_dbh = 0
            total_height = 0
            total_biomass = 0
            for planting_year, area in schedule.items():
                py = int(planting_year.split("_")[1])
                cohort_age = year - py + 1
                if cohort_age < 1:
                    # Cohort not planted yet in this project year.
                    continue
                initial_trees = species.planting_density * area
                plateau_density = species.planting_density * area if cohort_age >= 5 else None
                surviving = self.calculate_cohort_surviving_trees(1, cohort_age, initial_trees, species, plateau_density, self.growth_model)
                dbh = dbh_func(cohort_age, dbh_params, species.initial_values["dbh"])
                height = height_func(cohort_age, height_params, species.initial_values["height"])
                biomass_per_tree = calculate_biomass(dbh, height, species.name, species.allometry)

                # Survivor-weighted sums; divided by total_surviving below.
                total_surviving += surviving
                total_dbh += dbh * surviving
                total_height += height * surviving
                total_biomass += biomass_per_tree * surviving

                species_carbon += self.calculate_carbon_for_species(species, year, area, cohort_age)

            total_carbon += species_carbon
            species_year_results[f"{species.name} tCO2"] = species_carbon

            if total_surviving > 0:
                avg_dbh = total_dbh / total_surviving
                avg_height = total_height / total_surviving
                avg_biomass_per_tree = total_biomass / total_surviving
            else:
                avg_dbh = 0
                avg_height = 0
                avg_biomass_per_tree = 0
            species_metrics_rows.append({
                "Year": year,
                "Species": species.name,
                "Surviving Trees": total_surviving,
                "DBH (cm)": avg_dbh,
                "Height (m)": avg_height,
                "Biomass per Tree (kg)": avg_biomass_per_tree,
                "Total Biomass (kg)": total_biomass
            })

        species_year_results["Total tCO2"] = total_carbon
        species_year_results["Cumulative ha"] = cumulative_area
        species_year_results["tCO2/ha"] = total_carbon / cumulative_area if cumulative_area > 0 else 0

        # Net figure: gross carbon less the buffer share, the leakage share
        # and the baseline emissions over the area planted so far.
        gross_carbon = total_carbon
        buffer_carbon = gross_carbon * (1 - self.carbon.buffer_percentage / 100)
        buffer_carbon -= self.carbon.leakage_percentage / 100 * gross_carbon
        buffer_carbon -= self.carbon.baseline_emissions * cumulative_area

        # Soil carbon: each year adds the configured per-hectare rate for
        # every hectare planted so far; the running sum is what is reported
        # in the "with_soil" columns.
        soil_carbon = 0
        if hasattr(self.carbon, 'soil_carbon_per_ha_per_year'):
            soil_carbon = self.carbon.soil_carbon_per_ha_per_year * cumulative_area
        cumulative_soil_carbon += soil_carbon
        gross_carbon_with_soil = gross_carbon + cumulative_soil_carbon
        # Only the buffer deduction is applied to the soil component.
        buffer_carbon_with_soil = buffer_carbon + (cumulative_soil_carbon * (1 - self.carbon.buffer_percentage / 100))

        results.append({
            "year": year,
            "gross_carbon": gross_carbon,
            "buffer_carbon": buffer_carbon,
            "cumulative_area": cumulative_area,
            "gross_carbon_with_soil": gross_carbon_with_soil,
            "buffer_carbon_with_soil": buffer_carbon_with_soil,
            "soil_carbon": soil_carbon,
            "cumulative_soil_carbon": cumulative_soil_carbon
        })
        species_results.append(species_year_results)

    self.results = pd.DataFrame(results)
    self.species_results = pd.DataFrame(species_results)
    self.species_metrics = pd.DataFrame(species_metrics_rows)
    return self.results, self.species_results
def save_results(self, output_path: Path) -> None:
    """
    Write the yearly results table to ``output_path`` as CSV, without the index.

    Missing parent directories of ``output_path`` are created first.

    Raises:
        ValueError: if no results are stored yet (``self.results`` is None).
    """
    frame = self.results
    if frame is None:
        raise ValueError("Results have not been calculated yet. Run model.run() first.")
    target_dir = output_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    frame.to_csv(output_path, index=False)