""" GroundTruth FastAPI Server — Biological Carbon Verification API v2.0 Wraps pipeline/mrv_model.py as a REST API with v2.0 mechanistic enrichments: - RothC clay stabilization (replaces linear clay scoring) - Shannon-Wiener diversity + diagnostic strength - CENTURY carbon pool inference from oligotrophic ratio - Monte Carlo uncertainty propagation (optional) - Bray-Curtis temporal community monitoring Plus v1.x: batch scoring, response caching, biome auto-detection Endpoints: POST /v1/score — Run MRV score (now with v2.0 enrichments) POST /v1/score/batch — Batch score multiple sites POST /v1/score/auto — Auto-fetch SoilGrids + score (lat/lon + taxa) POST /v1/certificate — Generate MRV certificate POST /v1/biome/detect — Auto-detect biome from GPS POST /v1/uncertainty — Monte Carlo uncertainty analysis POST /v1/diversity/temporal — Bray-Curtis temporal change monitoring GET /v1/biomes — List available biomes and references GET /v1/methodologies — List Verra methodology thresholds GET /v1/taxa — List EMP taxa with weights and roles GET /v1/land-use — List land use multipliers GET /v1/presets — List canonical presets GET /v1/presets/{name}/score — Run canonical preset GET /v1/cache/stats — Cache performance statistics GET /v1/health — Health check Usage: uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload """ from __future__ import annotations from datetime import datetime, timezone from typing import Optional from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field, field_validator from pipeline.mrv_model import ( run_mrv_score, MRVResult, EMP_TAXA_WEIGHTS, BIOME_REFS, LAND_USE_MULTIPLIERS, VERRA_THRESHOLDS, PRESET_PRISTINE, PRESET_REGEN, PRESET_DEGRADED, ) from pipeline.certificate_generator import ( generate_certificate_json, generate_certificate_text, ) from pipeline.cache import get_default_cache, ScoreCache from pipeline.biome_detect import detect_biome from pipeline.uncertainty import run_monte_carlo, TaxaUncertainty, SoilUncertainty, FluxUncertainty, LandUseUncertainty from pipeline.diversity_index import compute_bray_curtis, assess_temporal_change from pipeline.rothc_factors import infer_carbon_pools # Module-level cache instance _cache = get_default_cache() app = FastAPI( title="GroundTruth MRV API", description=( "Biological Carbon Verification Engine v2.0 — " "EMP 16S Profiles x SoilGrids250m x FLUXNET2015 x Verra Registry Standards. " "v2.0: RothC clay stabilization, Shannon-Wiener diversity, CENTURY pool inference, " "Monte Carlo uncertainty propagation, Bray-Curtis temporal monitoring." ), version="2.0.0", docs_url="/docs", redoc_url="/redoc", ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ── REQUEST / RESPONSE MODELS ─────────────────────────────────────────────── class TaxaInput(BaseModel): """EMP 16S taxon relative abundances (0–100%).""" Koribacteraceae: float = Field(50.0, ge=0, le=100) Acidobacteria_6: float = Field(50.0, ge=0, le=100, alias="Acidobacteria-6") Bradyrhizobium: float = Field(50.0, ge=0, le=100) Rhodoplanes: float = Field(50.0, ge=0, le=100) Steroidobacter: float = Field(50.0, ge=0, le=100) Pseudomonas: float = Field(50.0, ge=0, le=100) Sphingomonas: float = Field(50.0, ge=0, le=100) Bacillus: float = Field(50.0, ge=0, le=100) Arthrobacter: float = Field(50.0, ge=0, le=100) model_config = {"populate_by_name": True} def to_dict(self) -> dict: return { "Koribacteraceae": self.Koribacteraceae, "Acidobacteria-6": self.Acidobacteria_6, "Bradyrhizobium": self.Bradyrhizobium, "Rhodoplanes": self.Rhodoplanes, "Steroidobacter": self.Steroidobacter, "Pseudomonas": self.Pseudomonas, "Sphingomonas": self.Sphingomonas, "Bacillus": self.Bacillus, "Arthrobacter": self.Arthrobacter, } class SoilInput(BaseModel): """SoilGrids250m physical parameters.""" ph: float = Field(6.2, ge=3.0, le=10.0) soc_g_kg: float = Field(35.0, ge=0, le=500) clay_pct: float = Field(25.0, ge=0, le=100) bulk_density_g_cm3: float = Field(1.2, ge=0.3, le=2.5) cec_cmol_kg: float = Field(20.0, ge=0, le=200) class ScoreRequest(BaseModel): """Full MRV scoring request.""" taxa: TaxaInput soil: SoilInput biome: str = Field(..., description="One of: " + ", ".join(BIOME_REFS.keys())) land_use: str = Field(..., description="One of: " + ", ".join(LAND_USE_MULTIPLIERS.keys())) sample_id: Optional[str] = None site_name: Optional[str] = None coordinates: Optional[dict] = None @field_validator("biome") @classmethod def validate_biome(cls, v): if v not in BIOME_REFS: raise ValueError(f"Invalid biome. Must be one of: {list(BIOME_REFS.keys())}") return v @field_validator("land_use") @classmethod def validate_land_use(cls, v): if v not in LAND_USE_MULTIPLIERS: raise ValueError(f"Invalid land_use. Must be one of: {list(LAND_USE_MULTIPLIERS.keys())}") return v class AutoScoreRequest(BaseModel): """Auto-fetch SoilGrids and score — requires only coordinates + taxa.""" lat: float = Field(..., ge=-90, le=90) lon: float = Field(..., ge=-180, le=180) taxa: TaxaInput biome: str land_use: str sample_id: Optional[str] = None site_name: Optional[str] = None @field_validator("biome") @classmethod def validate_biome(cls, v): if v not in BIOME_REFS: raise ValueError(f"Invalid biome. Must be one of: {list(BIOME_REFS.keys())}") return v @field_validator("land_use") @classmethod def validate_land_use(cls, v): if v not in LAND_USE_MULTIPLIERS: raise ValueError(f"Invalid land_use. Must be one of: {list(LAND_USE_MULTIPLIERS.keys())}") return v class ScoreResponse(BaseModel): """MRV scoring response with v2.0 mechanistic enrichments.""" score: int confidence_interval: list[int] confidence_pct: int carbon_estimate_tco2_ha_yr: float permanence_risk: str additionality: str leakage_risk: str best_methodology: Optional[dict] eligible_methodologies: list[dict] feature_importances: list[dict] bio_score: int soil_score: int flux_score: int biome_score: int land_use_multiplier: float citations: list[str] timestamp: str # v2.0 enrichments diversity: Optional[dict] = None carbon_pools: Optional[dict] = None clay_stabilization: Optional[dict] = None iom_estimate_t_c_ha: Optional[float] = None # ── ENDPOINTS ──────────────────────────────────────────────────────────────── @app.get("/v1/health") async def health_check(): """Health check endpoint.""" return { "status": "healthy", "platform": "GroundTruth MRV", "version": "2.0.0", "timestamp": datetime.now(timezone.utc).isoformat(), "v1_optimizations": [ "batch_scoring", "response_caching", "biome_auto_detect", "sigmoid_taxa_curves", "multi_depth_soil", "ndvi_signal", "ameriflux_ingestion", "ssurgo_integration", ], "v2_capabilities": [ "rothc_clay_stabilization", "shannon_wiener_diversity", "century_pool_inference", "monte_carlo_uncertainty", "bray_curtis_temporal_monitoring", ], } @app.post("/v1/score", response_model=ScoreResponse) async def run_score(request: ScoreRequest): """ Run MRV carbon verification score. Fuses biological (EMP 16S), physical (SoilGrids), and flux (FLUXNET) signals with a land use multiplier to produce an auditable score 0–100. """ result = run_mrv_score( taxa_abundances=request.taxa.to_dict(), soil_params={ "ph": request.soil.ph, "soc_g_kg": request.soil.soc_g_kg, "clay_pct": request.soil.clay_pct, "bulk_density_g_cm3": request.soil.bulk_density_g_cm3, "cec_cmol_kg": request.soil.cec_cmol_kg, }, biome=request.biome, land_use=request.land_use, ) return ScoreResponse( score=result.score, confidence_interval=result.confidence_interval, confidence_pct=result.confidence_pct, carbon_estimate_tco2_ha_yr=result.carbon_estimate_tco2_ha_yr, permanence_risk=result.permanence_risk, additionality=result.additionality, leakage_risk=result.leakage_risk, best_methodology=result.best_methodology, eligible_methodologies=result.verra_eligible, feature_importances=result.feature_importances, bio_score=result.bio_score, soil_score=result.soil_score, flux_score=result.flux_score, biome_score=result.biome_score, land_use_multiplier=result.land_use_multiplier, citations=result.citations, timestamp=result.timestamp, diversity=result.diversity, carbon_pools=result.carbon_pools, clay_stabilization=result.clay_stabilization, iom_estimate_t_c_ha=result.iom_estimate_t_c_ha, ) @app.post("/v1/score/auto") async def run_auto_score(request: AutoScoreRequest): """ Auto-fetch SoilGrids data and run MRV score. Fetches soil parameters from SoilGrids250m using lat/lon, then runs the full MRV scoring pipeline. """ try: from pipeline.soilgrids_fetch import fetch_soil_params soil = fetch_soil_params(request.lat, request.lon) except Exception as e: raise HTTPException( status_code=502, detail=f"Failed to fetch SoilGrids data: {str(e)}", ) result = run_mrv_score( taxa_abundances=request.taxa.to_dict(), soil_params=soil, biome=request.biome, land_use=request.land_use, ) # Generate certificate cert = generate_certificate_json( result, sample_id=request.sample_id or f"AUTO-{request.lat:.3f}_{request.lon:.3f}", site_name=request.site_name or f"Site ({request.lat:.3f}, {request.lon:.3f})", coordinates={"lat": request.lat, "lon": request.lon}, ) return { "score": result.score, "confidence_interval": result.confidence_interval, "confidence_pct": result.confidence_pct, "carbon_estimate_tco2_ha_yr": result.carbon_estimate_tco2_ha_yr, "permanence_risk": result.permanence_risk, "additionality": result.additionality, "leakage_risk": result.leakage_risk, "best_methodology": result.best_methodology, "certificate_id": cert["certificate_id"], "certificate_path": cert.get("_file_path"), "soil_params_fetched": { "ph": soil.ph, "soc_g_kg": soil.soc_g_kg, "clay_pct": soil.clay_pct, "bulk_density_g_cm3": soil.bulk_density_g_cm3, "cec_cmol_kg": soil.cec_cmol_kg, }, } @app.post("/v1/certificate") async def generate_certificate(request: ScoreRequest): """ Run MRV score and generate a full certificate (JSON + text). """ result = run_mrv_score( taxa_abundances=request.taxa.to_dict(), soil_params={ "ph": request.soil.ph, "soc_g_kg": request.soil.soc_g_kg, "clay_pct": request.soil.clay_pct, "bulk_density_g_cm3": request.soil.bulk_density_g_cm3, "cec_cmol_kg": request.soil.cec_cmol_kg, }, biome=request.biome, land_use=request.land_use, ) cert = generate_certificate_json( result, sample_id=request.sample_id or "API-REQUEST", site_name=request.site_name or "Unknown Site", coordinates=request.coordinates, ) text_cert = generate_certificate_text(cert) return { "certificate": cert, "text_display": text_cert, } @app.get("/v1/biomes") async def list_biomes(): """List all available biomes with FLUXNET reference values.""" return { biome_key: { "flux_tco2_ha_yr": ref["flux_tco2_ha_yr"], "soc_baseline_g_kg": ref["soc_baseline_g_kg"], "flux_direction": "sink" if ref["flux_tco2_ha_yr"] > 0 else "source", } for biome_key, ref in BIOME_REFS.items() } @app.get("/v1/methodologies") async def list_methodologies(): """List Verra methodology thresholds.""" return VERRA_THRESHOLDS @app.get("/v1/taxa") async def list_taxa(): """List all EMP taxa with weights and ecological roles.""" return EMP_TAXA_WEIGHTS @app.get("/v1/land-use") async def list_land_use(): """List land use categories and their multipliers.""" return LAND_USE_MULTIPLIERS @app.get("/v1/presets") async def list_presets(): """List canonical validation presets.""" return { "pristine": PRESET_PRISTINE, "regen": PRESET_REGEN, "degraded": PRESET_DEGRADED, } @app.get("/v1/presets/{preset_name}/score") async def run_preset(preset_name: str): """Run a canonical preset and return the score.""" presets = { "pristine": PRESET_PRISTINE, "regen": PRESET_REGEN, "degraded": PRESET_DEGRADED, } if preset_name not in presets: raise HTTPException( status_code=404, detail=f"Unknown preset. Must be one of: {list(presets.keys())}", ) p = presets[preset_name] result = run_mrv_score(p["taxa"], p["soil"], p["biome"], p["land_use"]) return { "preset": preset_name, "score": result.score, "confidence_interval": result.confidence_interval, "confidence_pct": result.confidence_pct, "carbon_estimate_tco2_ha_yr": result.carbon_estimate_tco2_ha_yr, "permanence_risk": result.permanence_risk, "best_methodology": result.best_methodology, } # ── v1.1 OPTIMIZATION ENDPOINTS ──────────────────────────────────────────── class BatchScoreRequest(BaseModel): """Batch MRV scoring — score multiple sites in one request.""" sites: list[ScoreRequest] = Field( ..., min_length=1, max_length=500, description="Array of ScoreRequest objects (max 500 per batch)", ) @app.post("/v1/score/batch") async def run_batch_score(request: BatchScoreRequest): """ Score multiple sites in a single request. Carbon project portfolios have 50–500 sites. Batch scoring processes all sites and returns an array of results with summary statistics. Max 500 sites per request. """ results = [] errors = [] for i, site in enumerate(request.sites): try: result = run_mrv_score( taxa_abundances=site.taxa.to_dict(), soil_params={ "ph": site.soil.ph, "soc_g_kg": site.soil.soc_g_kg, "clay_pct": site.soil.clay_pct, "bulk_density_g_cm3": site.soil.bulk_density_g_cm3, "cec_cmol_kg": site.soil.cec_cmol_kg, }, biome=site.biome, land_use=site.land_use, ) results.append({ "index": i, "sample_id": site.sample_id or f"batch-{i}", "score": result.score, "confidence_interval": result.confidence_interval, "confidence_pct": result.confidence_pct, "carbon_estimate_tco2_ha_yr": result.carbon_estimate_tco2_ha_yr, "permanence_risk": result.permanence_risk, "best_methodology": result.best_methodology, "bio_score": result.bio_score, "soil_score": result.soil_score, }) except Exception as e: errors.append({"index": i, "error": str(e)}) # Summary statistics scores = [r["score"] for r in results] summary = {} if scores: summary = { "total_sites": len(request.sites), "scored": len(results), "errors": len(errors), "mean_score": round(sum(scores) / len(scores), 1), "min_score": min(scores), "max_score": max(scores), "high_risk_count": sum(1 for s in scores if s < 40), "verra_eligible_count": sum(1 for s in scores if s >= 55), } return { "results": results, "errors": errors, "summary": summary, } class BiomeDetectRequest(BaseModel): """Auto-detect biome from GPS coordinates.""" lat: float = Field(..., ge=-90, le=90) lon: float = Field(..., ge=-180, le=180) @app.post("/v1/biome/detect") async def detect_biome_endpoint(request: BiomeDetectRequest): """ Auto-detect biome from GPS coordinates. Uses Copernicus land cover crosswalk + latitude-based climate zone rules. Eliminates the ~80% of MRV mistakes from wrong biome selection. """ result = detect_biome(request.lat, request.lon) return result @app.get("/v1/cache/stats") async def cache_stats(): """ Return cache performance statistics. Shows hit rate, size, evictions, and expiration counts for the in-memory response cache. """ return _cache.stats() @app.post("/v1/cache/clear") async def cache_clear(): """Clear the response cache. Returns count of evicted entries.""" count = _cache.clear() return {"cleared": count, "status": "ok"} # ── v2.0 MECHANISTIC ENDPOINTS ──────────────────────────────────────────── class UncertaintyRequest(BaseModel): """Monte Carlo uncertainty analysis request.""" taxa: TaxaInput soil: SoilInput biome: str land_use: str n_iterations: int = Field(2000, ge=100, le=10000, description="Monte Carlo iterations") sequencing_depth: int = Field(10000, ge=1000, le=1000000, description="16S sequencing depth") @field_validator("biome") @classmethod def validate_biome(cls, v): if v not in BIOME_REFS: raise ValueError(f"Invalid biome. Must be one of: {list(BIOME_REFS.keys())}") return v @field_validator("land_use") @classmethod def validate_land_use(cls, v): if v not in LAND_USE_MULTIPLIERS: raise ValueError(f"Invalid land_use. Must be one of: {list(LAND_USE_MULTIPLIERS.keys())}") return v @app.post("/v1/uncertainty") async def run_uncertainty_analysis(request: UncertaintyRequest): """ Monte Carlo uncertainty propagation for MRV scores. Samples taxa abundances from Beta distributions (sequencing-depth-dependent), soil parameters from Normal distributions (SoilGrids uncertainty bands), and land use multiplier from Triangular distribution. Returns: - Point estimate (median of N simulations) - 90% and 95% confidence intervals - Sensitivity analysis: which inputs contribute most variance - Score distribution summary Citation: uses Beta, Normal, and Triangular sampling per IPCC Tier 2 uncertainty guidance (IPCC 2006, Ch. 3). """ taxa_dict = request.taxa.to_dict() soil_dict = { "ph": request.soil.ph, "soc_g_kg": request.soil.soc_g_kg, "clay_pct": request.soil.clay_pct, "bulk_density_g_cm3": request.soil.bulk_density_g_cm3, "cec_cmol_kg": request.soil.cec_cmol_kg, } # Build uncertainty configurations taxa_unc = TaxaUncertainty(sequencing_depth=request.sequencing_depth) soil_unc = SoilUncertainty() flux_unc = FluxUncertainty() land_use_unc = LandUseUncertainty() # Define the scoring function for Monte Carlo sampling def scoring_fn(sampled_inputs): result = run_mrv_score( taxa_abundances=sampled_inputs.get("taxa", taxa_dict), soil_params=sampled_inputs.get("soil", soil_dict), biome=request.biome, land_use=request.land_use, ) return result.score mc_result = run_monte_carlo( scoring_fn=scoring_fn, base_inputs={"taxa": taxa_dict, "soil": soil_dict}, uncertainty_configs={ "taxa": taxa_unc, "soil": soil_unc, }, n_iterations=request.n_iterations, ) return { "point_estimate": mc_result.point_estimate, "mean": round(mc_result.mean, 2), "std": round(mc_result.std, 2), "ci_90": mc_result.ci_90, "ci_95": mc_result.ci_95, "n_iterations": mc_result.n_iterations, "sensitivity": mc_result.sensitivity, "score_distribution": { "min": min(mc_result.score_distribution), "max": max(mc_result.score_distribution), "p5": mc_result.ci_90[0], "p25": sorted(mc_result.score_distribution)[int(0.25 * len(mc_result.score_distribution))], "p50": mc_result.point_estimate, "p75": sorted(mc_result.score_distribution)[int(0.75 * len(mc_result.score_distribution))], "p95": mc_result.ci_90[1], }, "citation": "IPCC (2006) Guidelines for National Greenhouse Gas Inventories, Vol. 1, Ch. 3: Uncertainties.", } class TemporalRequest(BaseModel): """Bray-Curtis temporal change monitoring request.""" baseline_taxa: TaxaInput current_taxa: TaxaInput months_elapsed: int = Field(12, ge=1, description="Months between baseline and current") @app.post("/v1/diversity/temporal") async def temporal_change(request: TemporalRequest): """ Monitor microbial community change between two sampling events. Uses Bray-Curtis dissimilarity (Bray & Curtis, 1957) to quantify compositional shift. Combined with Shannon-Wiener diversity changes to assess whether biological carbon indicators are stable. Alert levels: NONE: BC < 0.10 (stable community) INFO: BC 0.10–0.25 (minor shift, normal variability) WARNING: BC 0.25–0.40 (moderate shift, investigate) CRITICAL: BC > 0.40 (major shift, re-score recommended) """ baseline = request.baseline_taxa.to_dict() current = request.current_taxa.to_dict() result = assess_temporal_change( baseline_abundances=baseline, current_abundances=current, time_months=request.months_elapsed, ) return { "bray_curtis": result.bray_curtis, "alert_level": result.alert_level, "shifted_taxa": result.shifted_taxa, "months_elapsed": request.months_elapsed, "interpretation": result.interpretation, "recommendation": result.recommendation, "citation": "Bray, J.R. & Curtis, J.T. (1957) An ordination of the upland forest communities of southern Wisconsin. Ecological Monographs 27:325-349.", }