groundtruth-mrv / api /main.py
CryptoThaler's picture
Deploy GroundTruth MRV API v2.0.0 — RothC clay, Shannon diversity, CENTURY pools, Monte Carlo uncertainty
dc6b07d verified
"""
GroundTruth FastAPI Server — Biological Carbon Verification API v2.0
Wraps pipeline/mrv_model.py as a REST API with v2.0 mechanistic enrichments:
- RothC clay stabilization (replaces linear clay scoring)
- Shannon-Wiener diversity + diagnostic strength
- CENTURY carbon pool inference from oligotrophic ratio
- Monte Carlo uncertainty propagation (optional)
- Bray-Curtis temporal community monitoring
Plus v1.x: batch scoring, response caching, biome auto-detection
Endpoints:
POST /v1/score — Run MRV score (now with v2.0 enrichments)
POST /v1/score/batch — Batch score multiple sites
POST /v1/score/auto — Auto-fetch SoilGrids + score (lat/lon + taxa)
POST /v1/certificate — Generate MRV certificate
POST /v1/biome/detect — Auto-detect biome from GPS
POST /v1/uncertainty — Monte Carlo uncertainty analysis
POST /v1/diversity/temporal — Bray-Curtis temporal change monitoring
POST /v1/cache/clear — Clear the response cache
GET /v1/biomes — List available biomes and references
GET /v1/methodologies — List Verra methodology thresholds
GET /v1/taxa — List EMP taxa with weights and roles
GET /v1/land-use — List land use multipliers
GET /v1/presets — List canonical presets
GET /v1/presets/{name}/score — Run canonical preset
GET /v1/cache/stats — Cache performance statistics
GET /v1/health — Health check
Usage:
uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, field_validator
from pipeline.mrv_model import (
run_mrv_score,
MRVResult,
EMP_TAXA_WEIGHTS,
BIOME_REFS,
LAND_USE_MULTIPLIERS,
VERRA_THRESHOLDS,
PRESET_PRISTINE,
PRESET_REGEN,
PRESET_DEGRADED,
)
from pipeline.certificate_generator import (
generate_certificate_json,
generate_certificate_text,
)
from pipeline.cache import get_default_cache, ScoreCache
from pipeline.biome_detect import detect_biome
from pipeline.uncertainty import run_monte_carlo, TaxaUncertainty, SoilUncertainty, FluxUncertainty, LandUseUncertainty
from pipeline.diversity_index import compute_bray_curtis, assess_temporal_change
from pipeline.rothc_factors import infer_carbon_pools
# Module-level cache instance, created once at import time and used by the
# /v1/cache/stats and /v1/cache/clear endpoints in this module.
# NOTE(review): no scoring endpoint in this module reads or writes this cache
# directly — confirm whether caching happens inside the pipeline package.
_cache = get_default_cache()
# FastAPI application object. The title, description and version feed the
# generated OpenAPI schema; interactive docs are served at /docs and /redoc.
app = FastAPI(
    title="GroundTruth MRV API",
    description=(
        # The em dash was previously stored as mis-decoded bytes and showed
        # up as garbage in the rendered API documentation.
        "Biological Carbon Verification Engine v2.0 — "
        "EMP 16S Profiles x SoilGrids250m x FLUXNET2015 x Verra Registry Standards. "
        "v2.0: RothC clay stabilization, Shannon-Wiener diversity, CENTURY pool inference, "
        "Monte Carlo uncertainty propagation, Bray-Curtis temporal monitoring."
    ),
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)
# CORS: the API is open to every origin, so credentialed cross-origin
# requests are disabled. The FastAPI CORS documentation states that wildcard
# origins/methods/headers must not be combined with allow_credentials=True.
# To support cookies or Authorization headers cross-origin, list the trusted
# origins explicitly and re-enable credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── REQUEST / RESPONSE MODELS ───────────────────────────────────────────────
class TaxaInput(BaseModel):
    """Relative abundances (0 to 100 %) of the nine EMP 16S taxa.

    Every abundance defaults to 50.0. ``Acidobacteria_6`` may be supplied
    either under its attribute name or under the alias ``Acidobacteria-6``.
    """

    # Allow population by attribute name in addition to the declared alias.
    model_config = {"populate_by_name": True}

    Koribacteraceae: float = Field(default=50.0, ge=0, le=100)
    Acidobacteria_6: float = Field(default=50.0, ge=0, le=100, alias="Acidobacteria-6")
    Bradyrhizobium: float = Field(default=50.0, ge=0, le=100)
    Rhodoplanes: float = Field(default=50.0, ge=0, le=100)
    Steroidobacter: float = Field(default=50.0, ge=0, le=100)
    Pseudomonas: float = Field(default=50.0, ge=0, le=100)
    Sphingomonas: float = Field(default=50.0, ge=0, le=100)
    Bacillus: float = Field(default=50.0, ge=0, le=100)
    Arthrobacter: float = Field(default=50.0, ge=0, le=100)

    def to_dict(self) -> dict:
        """Return the abundances as a plain dict keyed by taxon name.

        The key for ``Acidobacteria_6`` is the hyphenated ``"Acidobacteria-6"``;
        every other key equals the attribute name.
        """
        attribute_names = (
            "Koribacteraceae",
            "Acidobacteria_6",
            "Bradyrhizobium",
            "Rhodoplanes",
            "Steroidobacter",
            "Pseudomonas",
            "Sphingomonas",
            "Bacillus",
            "Arthrobacter",
        )
        abundances = {}
        for attr in attribute_names:
            # Only Acidobacteria_6 contains an underscore, so this maps it to
            # "Acidobacteria-6" and leaves the other names untouched.
            abundances[attr.replace("_", "-")] = getattr(self, attr)
        return abundances
class SoilInput(BaseModel):
    """Physical soil parameters (SoilGrids250m) with range-checked defaults."""

    # Units follow the suffix in each field name (g/kg, %, g/cm3, cmol/kg).
    ph: float = Field(default=6.2, ge=3.0, le=10.0)
    soc_g_kg: float = Field(default=35.0, ge=0, le=500)
    clay_pct: float = Field(default=25.0, ge=0, le=100)
    bulk_density_g_cm3: float = Field(default=1.2, ge=0.3, le=2.5)
    cec_cmol_kg: float = Field(default=20.0, ge=0, le=200)
class ScoreRequest(BaseModel):
    """Request body for a full MRV scoring run.

    ``biome`` must be a key of ``BIOME_REFS`` and ``land_use`` a key of
    ``LAND_USE_MULTIPLIERS``; both are enforced by the validators below.
    ``sample_id``, ``site_name`` and ``coordinates`` are optional metadata.
    """

    taxa: TaxaInput
    soil: SoilInput
    biome: str = Field(..., description="One of: " + ", ".join(BIOME_REFS))
    land_use: str = Field(..., description="One of: " + ", ".join(LAND_USE_MULTIPLIERS))
    sample_id: Optional[str] = None
    site_name: Optional[str] = None
    coordinates: Optional[dict] = None

    @field_validator("biome")
    @classmethod
    def validate_biome(cls, v):
        """Return ``v`` unchanged when it is a known biome key, else raise."""
        if v in BIOME_REFS:
            return v
        raise ValueError(f"Invalid biome. Must be one of: {list(BIOME_REFS.keys())}")

    @field_validator("land_use")
    @classmethod
    def validate_land_use(cls, v):
        """Return ``v`` unchanged when it is a known land-use key, else raise."""
        if v in LAND_USE_MULTIPLIERS:
            return v
        raise ValueError(f"Invalid land_use. Must be one of: {list(LAND_USE_MULTIPLIERS.keys())}")
class AutoScoreRequest(BaseModel):
    """Request body for auto-scoring: coordinates and taxa, no soil block.

    Soil parameters are not part of this payload; the endpoint that consumes
    it fetches them from the supplied latitude/longitude. ``biome`` and
    ``land_use`` are validated against ``BIOME_REFS`` and
    ``LAND_USE_MULTIPLIERS``.
    """

    lat: float = Field(..., ge=-90, le=90)
    lon: float = Field(..., ge=-180, le=180)
    taxa: TaxaInput
    biome: str
    land_use: str
    sample_id: Optional[str] = None
    site_name: Optional[str] = None

    @field_validator("biome")
    @classmethod
    def validate_biome(cls, v):
        """Return ``v`` unchanged when it is a known biome key, else raise."""
        if v in BIOME_REFS:
            return v
        raise ValueError(f"Invalid biome. Must be one of: {list(BIOME_REFS.keys())}")

    @field_validator("land_use")
    @classmethod
    def validate_land_use(cls, v):
        """Return ``v`` unchanged when it is a known land-use key, else raise."""
        if v in LAND_USE_MULTIPLIERS:
            return v
        raise ValueError(f"Invalid land_use. Must be one of: {list(LAND_USE_MULTIPLIERS.keys())}")
class ScoreResponse(BaseModel):
    """MRV scoring response with v2.0 mechanistic enrichments.

    Used as the ``response_model`` of ``POST /v1/score``. Each field is filled
    from the same-named attribute of the scoring result, except
    ``eligible_methodologies``, which is filled from ``verra_eligible``.
    """
    # Headline score and its confidence information.
    score: int
    confidence_interval: list[int]
    confidence_pct: int
    carbon_estimate_tco2_ha_yr: float
    # Qualitative assessments, returned as strings.
    permanence_risk: str
    additionality: str
    leakage_risk: str
    # Methodology matching; best_methodology may be None.
    best_methodology: Optional[dict]
    eligible_methodologies: list[dict]
    feature_importances: list[dict]
    # Component sub-scores and the land use multiplier.
    bio_score: int
    soil_score: int
    flux_score: int
    biome_score: int
    land_use_multiplier: float
    citations: list[str]
    timestamp: str
    # v2.0 enrichments (all optional; default to None when absent)
    diversity: Optional[dict] = None
    carbon_pools: Optional[dict] = None
    clay_stabilization: Optional[dict] = None
    iom_estimate_t_c_ha: Optional[float] = None
# ── ENDPOINTS ────────────────────────────────────────────────────────────────
@app.get("/v1/health")
async def health_check():
    """Report liveness, the API version, and the advertised feature lists.

    Returns:
        A dict with a fixed ``"healthy"`` status, platform name, version
        string, the current UTC time in ISO 8601 format, and two static lists
        naming the v1 optimizations and v2 capabilities.
    """
    v1_features = [
        "batch_scoring", "response_caching", "biome_auto_detect",
        "sigmoid_taxa_curves", "multi_depth_soil", "ndvi_signal",
        "ameriflux_ingestion", "ssurgo_integration",
    ]
    v2_features = [
        "rothc_clay_stabilization", "shannon_wiener_diversity",
        "century_pool_inference", "monte_carlo_uncertainty",
        "bray_curtis_temporal_monitoring",
    ]
    now_utc = datetime.now(timezone.utc)
    return {
        "status": "healthy",
        "platform": "GroundTruth MRV",
        "version": "2.0.0",
        "timestamp": now_utc.isoformat(),
        "v1_optimizations": v1_features,
        "v2_capabilities": v2_features,
    }
@app.post("/v1/score", response_model=ScoreResponse)
async def run_score(request: ScoreRequest):
    """Score one site and return the full ``ScoreResponse``.

    The taxa abundances, the five soil parameters, the biome and the land use
    from the request are handed to ``run_mrv_score``; its result is copied
    field by field into the response model.
    """
    soil_fields = ("ph", "soc_g_kg", "clay_pct", "bulk_density_g_cm3", "cec_cmol_kg")
    soil_params = {field: getattr(request.soil, field) for field in soil_fields}

    mrv = run_mrv_score(
        taxa_abundances=request.taxa.to_dict(),
        soil_params=soil_params,
        biome=request.biome,
        land_use=request.land_use,
    )

    # Response fields whose name matches the result attribute one-to-one.
    mirrored = (
        "score",
        "confidence_interval",
        "confidence_pct",
        "carbon_estimate_tco2_ha_yr",
        "permanence_risk",
        "additionality",
        "leakage_risk",
        "best_methodology",
        "feature_importances",
        "bio_score",
        "soil_score",
        "flux_score",
        "biome_score",
        "land_use_multiplier",
        "citations",
        "timestamp",
        "diversity",
        "carbon_pools",
        "clay_stabilization",
        "iom_estimate_t_c_ha",
    )
    payload = {field: getattr(mrv, field) for field in mirrored}
    # The only renamed field: the result calls it ``verra_eligible``.
    payload["eligible_methodologies"] = mrv.verra_eligible
    return ScoreResponse(**payload)
@app.post("/v1/score/auto")
async def run_auto_score(request: AutoScoreRequest):
    """
    Auto-fetch SoilGrids data, run the MRV score, and issue a certificate.

    Soil parameters are fetched with ``fetch_soil_params(lat, lon)`` and passed
    to ``run_mrv_score`` together with the taxa, biome and land use from the
    request. A certificate is then generated; when the request carries no
    ``sample_id`` / ``site_name``, placeholders derived from the coordinates
    (3 decimal places) are used.

    Returns:
        A dict with the headline score fields, the certificate id and file
        path, and the soil parameters that were fetched.

    Raises:
        HTTPException: 502 when importing or calling the SoilGrids fetcher
            fails for any reason.
    """
    try:
        # Imported lazily; an import failure is reported as 502 as well.
        from pipeline.soilgrids_fetch import fetch_soil_params
        soil = fetch_soil_params(request.lat, request.lon)
    except Exception as e:
        # Chain the original error so the root cause stays in the traceback.
        raise HTTPException(
            status_code=502,
            detail=f"Failed to fetch SoilGrids data: {str(e)}",
        ) from e
    # NOTE(review): ``soil`` is read via attributes below (soil.ph, ...) but is
    # passed here where the other endpoints pass a plain dict — confirm that
    # run_mrv_score accepts both forms.
    result = run_mrv_score(
        taxa_abundances=request.taxa.to_dict(),
        soil_params=soil,
        biome=request.biome,
        land_use=request.land_use,
    )
    # Generate certificate
    cert = generate_certificate_json(
        result,
        sample_id=request.sample_id or f"AUTO-{request.lat:.3f}_{request.lon:.3f}",
        site_name=request.site_name or f"Site ({request.lat:.3f}, {request.lon:.3f})",
        coordinates={"lat": request.lat, "lon": request.lon},
    )
    return {
        "score": result.score,
        "confidence_interval": result.confidence_interval,
        "confidence_pct": result.confidence_pct,
        "carbon_estimate_tco2_ha_yr": result.carbon_estimate_tco2_ha_yr,
        "permanence_risk": result.permanence_risk,
        "additionality": result.additionality,
        "leakage_risk": result.leakage_risk,
        "best_methodology": result.best_methodology,
        "certificate_id": cert["certificate_id"],
        # May be None: the path is looked up with .get().
        "certificate_path": cert.get("_file_path"),
        "soil_params_fetched": {
            "ph": soil.ph,
            "soc_g_kg": soil.soc_g_kg,
            "clay_pct": soil.clay_pct,
            "bulk_density_g_cm3": soil.bulk_density_g_cm3,
            "cec_cmol_kg": soil.cec_cmol_kg,
        },
    }
@app.post("/v1/certificate")
async def generate_certificate(request: ScoreRequest):
    """Score the site and return its certificate as JSON plus display text.

    Missing ``sample_id`` / ``site_name`` fall back to ``"API-REQUEST"`` and
    ``"Unknown Site"``; ``coordinates`` is forwarded as given.

    Returns:
        ``{"certificate": <certificate dict>, "text_display": <text form>}``.
    """
    soil = request.soil
    soil_params = {
        "ph": soil.ph,
        "soc_g_kg": soil.soc_g_kg,
        "clay_pct": soil.clay_pct,
        "bulk_density_g_cm3": soil.bulk_density_g_cm3,
        "cec_cmol_kg": soil.cec_cmol_kg,
    }
    mrv = run_mrv_score(
        taxa_abundances=request.taxa.to_dict(),
        soil_params=soil_params,
        biome=request.biome,
        land_use=request.land_use,
    )

    sample_id = request.sample_id or "API-REQUEST"
    site_name = request.site_name or "Unknown Site"
    certificate = generate_certificate_json(
        mrv,
        sample_id=sample_id,
        site_name=site_name,
        coordinates=request.coordinates,
    )
    # The text rendering is derived from the JSON certificate, not the score.
    return {
        "certificate": certificate,
        "text_display": generate_certificate_text(certificate),
    }
@app.get("/v1/biomes")
async def list_biomes():
    """Return every biome in ``BIOME_REFS`` with its reference values.

    Each entry carries the reference flux, the SOC baseline, and a
    ``flux_direction`` label: ``"sink"`` when the flux is strictly positive,
    ``"source"`` otherwise.
    """
    catalogue = {}
    for name, ref in BIOME_REFS.items():
        flux = ref["flux_tco2_ha_yr"]
        if flux > 0:
            direction = "sink"
        else:
            direction = "source"
        catalogue[name] = {
            "flux_tco2_ha_yr": flux,
            "soc_baseline_g_kg": ref["soc_baseline_g_kg"],
            "flux_direction": direction,
        }
    return catalogue
@app.get("/v1/methodologies")
async def list_methodologies():
    """List Verra methodology thresholds.

    Returns the ``VERRA_THRESHOLDS`` constant imported from
    ``pipeline.mrv_model`` as-is.
    """
    return VERRA_THRESHOLDS
@app.get("/v1/taxa")
async def list_taxa():
    """List all EMP taxa with weights and ecological roles.

    Returns the ``EMP_TAXA_WEIGHTS`` constant imported from
    ``pipeline.mrv_model`` as-is.
    """
    return EMP_TAXA_WEIGHTS
@app.get("/v1/land-use")
async def list_land_use():
    """List land use categories and their multipliers.

    Returns the ``LAND_USE_MULTIPLIERS`` constant imported from
    ``pipeline.mrv_model`` as-is.
    """
    return LAND_USE_MULTIPLIERS
@app.get("/v1/presets")
async def list_presets():
    """Return the three canonical validation presets keyed by short name."""
    return dict(
        pristine=PRESET_PRISTINE,
        regen=PRESET_REGEN,
        degraded=PRESET_DEGRADED,
    )
@app.get("/v1/presets/{preset_name}/score")
async def run_preset(preset_name: str):
    """Score one of the canonical presets and return the headline fields.

    Args:
        preset_name: ``"pristine"``, ``"regen"`` or ``"degraded"``.

    Raises:
        HTTPException: 404 when ``preset_name`` is not one of the above.
    """
    presets = dict(
        pristine=PRESET_PRISTINE,
        regen=PRESET_REGEN,
        degraded=PRESET_DEGRADED,
    )
    chosen = presets.get(preset_name)
    if chosen is None:
        raise HTTPException(
            status_code=404,
            detail=f"Unknown preset. Must be one of: {list(presets.keys())}",
        )

    outcome = run_mrv_score(
        chosen["taxa"], chosen["soil"], chosen["biome"], chosen["land_use"]
    )
    response = {"preset": preset_name}
    # Copy only the headline fields from the scoring result.
    for field in (
        "score",
        "confidence_interval",
        "confidence_pct",
        "carbon_estimate_tco2_ha_yr",
        "permanence_risk",
        "best_methodology",
    ):
        response[field] = getattr(outcome, field)
    return response
# ── v1.1 OPTIMIZATION ENDPOINTS ────────────────────────────────────────────
class BatchScoreRequest(BaseModel):
    """Batch MRV scoring request: score multiple sites in one call."""
    # Between 1 and 500 ScoreRequest objects; each is validated individually.
    sites: list[ScoreRequest] = Field(
        ..., min_length=1, max_length=500,
        description="Array of ScoreRequest objects (max 500 per batch)",
    )
# Score cut-offs used only for the counters in the batch summary.
_BATCH_HIGH_RISK_BELOW = 40
_BATCH_VERRA_ELIGIBLE_FROM = 55


def _summarize_batch(total_sites, scores, error_count):
    """Build the batch summary: counts always, score statistics when available."""
    summary = {
        "total_sites": total_sites,
        "scored": len(scores),
        "errors": error_count,
    }
    if scores:
        summary.update({
            "mean_score": round(sum(scores) / len(scores), 1),
            "min_score": min(scores),
            "max_score": max(scores),
            "high_risk_count": sum(1 for s in scores if s < _BATCH_HIGH_RISK_BELOW),
            "verra_eligible_count": sum(
                1 for s in scores if s >= _BATCH_VERRA_ELIGIBLE_FROM
            ),
        })
    return summary


@app.post("/v1/score/batch")
async def run_batch_score(request: BatchScoreRequest):
    """
    Score multiple sites in a single request (max 500 per batch).

    Each site is scored independently: a failure on one site is recorded in
    ``errors`` with its index and does not abort the rest of the batch.

    Returns:
        A dict with ``results`` (one entry per scored site), ``errors`` (one
        entry per failed site) and ``summary``. The summary always contains
        ``total_sites``, ``scored`` and ``errors``; the score statistics are
        added only when at least one site was scored. Previously the summary
        was empty when every site failed, which hid the failure counts.
    """
    results = []
    errors = []
    for i, site in enumerate(request.sites):
        try:
            result = run_mrv_score(
                taxa_abundances=site.taxa.to_dict(),
                soil_params={
                    "ph": site.soil.ph,
                    "soc_g_kg": site.soil.soc_g_kg,
                    "clay_pct": site.soil.clay_pct,
                    "bulk_density_g_cm3": site.soil.bulk_density_g_cm3,
                    "cec_cmol_kg": site.soil.cec_cmol_kg,
                },
                biome=site.biome,
                land_use=site.land_use,
            )
            results.append({
                "index": i,
                "sample_id": site.sample_id or f"batch-{i}",
                "score": result.score,
                "confidence_interval": result.confidence_interval,
                "confidence_pct": result.confidence_pct,
                "carbon_estimate_tco2_ha_yr": result.carbon_estimate_tco2_ha_yr,
                "permanence_risk": result.permanence_risk,
                "best_methodology": result.best_methodology,
                "bio_score": result.bio_score,
                "soil_score": result.soil_score,
            })
        except Exception as e:
            # Deliberately broad: per-site isolation is the point of the batch
            # endpoint, so any scoring error is reported rather than raised.
            errors.append({"index": i, "error": str(e)})

    scores = [r["score"] for r in results]
    return {
        "results": results,
        "errors": errors,
        "summary": _summarize_batch(len(request.sites), scores, len(errors)),
    }
class BiomeDetectRequest(BaseModel):
    """Auto-detect biome from GPS coordinates."""
    # Latitude, validated to the range [-90, 90].
    lat: float = Field(..., ge=-90, le=90)
    # Longitude, validated to the range [-180, 180].
    lon: float = Field(..., ge=-180, le=180)
@app.post("/v1/biome/detect")
async def detect_biome_endpoint(request: BiomeDetectRequest):
    """Detect the biome for a latitude/longitude pair.

    Delegates to ``detect_biome`` from ``pipeline.biome_detect`` and returns
    its result unchanged; the detection method itself lives in that module.
    """
    return detect_biome(request.lat, request.lon)
@app.get("/v1/cache/stats")
async def cache_stats():
    """
    Return cache performance statistics.

    Returns whatever ``_cache.stats()`` reports for the module-level cache.
    NOTE(review): the exact keys (hit rate, size, evictions, expirations) are
    defined in pipeline.cache — confirm there.
    """
    return _cache.stats()
@app.post("/v1/cache/clear")
async def cache_clear():
    """Empty the module-level cache and report the value ``clear()`` returned.

    Returns:
        ``{"cleared": <return value of _cache.clear()>, "status": "ok"}``.
    """
    return {"cleared": _cache.clear(), "status": "ok"}
# ── v2.0 MECHANISTIC ENDPOINTS ────────────────────────────────────────────
class UncertaintyRequest(BaseModel):
    """Request body for a Monte Carlo uncertainty analysis.

    Carries the same taxa, soil, biome and land-use inputs as a scoring
    request, plus the number of iterations (100 to 10000, default 2000) and
    the 16S sequencing depth (1000 to 1000000, default 10000).
    """

    taxa: TaxaInput
    soil: SoilInput
    biome: str
    land_use: str
    n_iterations: int = Field(default=2000, ge=100, le=10000, description="Monte Carlo iterations")
    sequencing_depth: int = Field(default=10000, ge=1000, le=1000000, description="16S sequencing depth")

    @field_validator("biome")
    @classmethod
    def validate_biome(cls, v):
        """Return ``v`` unchanged when it is a known biome key, else raise."""
        if v in BIOME_REFS:
            return v
        raise ValueError(f"Invalid biome. Must be one of: {list(BIOME_REFS.keys())}")

    @field_validator("land_use")
    @classmethod
    def validate_land_use(cls, v):
        """Return ``v`` unchanged when it is a known land-use key, else raise."""
        if v in LAND_USE_MULTIPLIERS:
            return v
        raise ValueError(f"Invalid land_use. Must be one of: {list(LAND_USE_MULTIPLIERS.keys())}")
@app.post("/v1/uncertainty")
async def run_uncertainty_analysis(request: UncertaintyRequest):
    """
    Monte Carlo uncertainty propagation for MRV scores.

    Runs ``run_monte_carlo`` with a scoring function that re-scores each
    sampled set of inputs. Only the taxa and soil uncertainty configurations
    are passed to the sampler; biome and land use are held fixed at the
    requested values. The sampling distributions are defined in
    ``pipeline.uncertainty`` (documented there as Beta for taxa, depending on
    sequencing depth, and Normal for soil).

    NOTE(review): flux and land-use uncertainty configurations were built here
    but never passed to ``run_monte_carlo``, so they had no effect and were
    removed. Wire them into ``uncertainty_configs`` if that sampling is wanted.

    Returns:
        A dict with the point estimate, mean and standard deviation (rounded
        to 2 decimals), the 90% and 95% intervals, the iteration count, the
        sensitivity breakdown, a percentile summary of the simulated scores,
        and the IPCC citation.
    """
    taxa_dict = request.taxa.to_dict()
    soil_dict = {
        "ph": request.soil.ph,
        "soc_g_kg": request.soil.soc_g_kg,
        "clay_pct": request.soil.clay_pct,
        "bulk_density_g_cm3": request.soil.bulk_density_g_cm3,
        "cec_cmol_kg": request.soil.cec_cmol_kg,
    }

    def scoring_fn(sampled_inputs):
        # Fall back to the unperturbed inputs for any group not sampled.
        result = run_mrv_score(
            taxa_abundances=sampled_inputs.get("taxa", taxa_dict),
            soil_params=sampled_inputs.get("soil", soil_dict),
            biome=request.biome,
            land_use=request.land_use,
        )
        return result.score

    mc_result = run_monte_carlo(
        scoring_fn=scoring_fn,
        base_inputs={"taxa": taxa_dict, "soil": soil_dict},
        uncertainty_configs={
            "taxa": TaxaUncertainty(sequencing_depth=request.sequencing_depth),
            "soil": SoilUncertainty(),
        },
        n_iterations=request.n_iterations,
    )

    # Sort once and reuse for min, max and the quartiles.
    ordered = sorted(mc_result.score_distribution)
    count = len(ordered)
    return {
        "point_estimate": mc_result.point_estimate,
        "mean": round(mc_result.mean, 2),
        "std": round(mc_result.std, 2),
        "ci_90": mc_result.ci_90,
        "ci_95": mc_result.ci_95,
        "n_iterations": mc_result.n_iterations,
        "sensitivity": mc_result.sensitivity,
        "score_distribution": {
            "min": ordered[0],
            "max": ordered[-1],
            # p5/p95 reuse the bounds of the 90% interval.
            "p5": mc_result.ci_90[0],
            "p25": ordered[int(0.25 * count)],
            "p50": mc_result.point_estimate,
            "p75": ordered[int(0.75 * count)],
            "p95": mc_result.ci_90[1],
        },
        "citation": "IPCC (2006) Guidelines for National Greenhouse Gas Inventories, Vol. 1, Ch. 3: Uncertainties.",
    }
class TemporalRequest(BaseModel):
    """Bray-Curtis temporal change monitoring request."""
    # Taxa abundances at the baseline sampling event.
    baseline_taxa: TaxaInput
    # Taxa abundances at the current sampling event.
    current_taxa: TaxaInput
    # At least 1; defaults to 12 months.
    months_elapsed: int = Field(12, ge=1, description="Months between baseline and current")
@app.post("/v1/diversity/temporal")
async def temporal_change(request: TemporalRequest):
    """Assess microbial community change between two sampling events.

    Both taxa profiles are converted to plain dicts and passed to
    ``assess_temporal_change`` together with the elapsed months. The
    Bray-Curtis value, alert level, shifted taxa, interpretation and
    recommendation from its result are returned, along with the elapsed
    months from the request and the Bray & Curtis (1957) citation.

    Alert levels as documented for this endpoint (the thresholds themselves
    are applied inside ``pipeline.diversity_index``):
        NONE:     BC < 0.10 (stable community)
        INFO:     BC 0.10 to 0.25 (minor shift, normal variability)
        WARNING:  BC 0.25 to 0.40 (moderate shift, investigate)
        CRITICAL: BC > 0.40 (major shift, re-score recommended)
    """
    assessment = assess_temporal_change(
        baseline_abundances=request.baseline_taxa.to_dict(),
        current_abundances=request.current_taxa.to_dict(),
        time_months=request.months_elapsed,
    )
    response = {
        "bray_curtis": assessment.bray_curtis,
        "alert_level": assessment.alert_level,
        "shifted_taxa": assessment.shifted_taxa,
        "months_elapsed": request.months_elapsed,
        "interpretation": assessment.interpretation,
        "recommendation": assessment.recommendation,
    }
    response["citation"] = "Bray, J.R. & Curtis, J.T. (1957) An ordination of the upland forest communities of southern Wisconsin. Ecological Monographs 27:325-349."
    return response