# climate-risk-engine/src/pricing/budget_optimizer.py
# Initial commit: Extreme Heat Risk Engine (e2d3383)
"""
Budget allocation optimizer for philanthropic heat insurance programs.
Given a fixed budget, allocates funds across zones to maximize
risk-adjusted impact. Zones where each dollar prevents the most
harm get funded first.
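Strategy:
    impact_score = (risk_weight * enrolled_workers) / total_zone_cost,
    where total_zone_cost = net cost per worker * enrolled_workers
    (so the score reduces to risk_weight / net cost per worker).
    Greedy allocation in descending impact_score order — fully fund
    the highest-impact zone, then the next, while budget remains.
    A zone the remaining budget cannot fully cover is partially funded
    (as many whole workers as the remaining budget pays for).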
Also provides stretch analysis: "if workers contribute $X/year,
coverage increases from Y% to Z%."
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from src.pricing.actuarial import ActuarialResult
log = logging.getLogger(__name__)
# Numeric weight for each heat-vulnerability label. The weight is the
# numerator of a zone's impact score, so a "high" zone outranks a "low"
# zone that has the same per-worker cost.
_VULN_WEIGHTS: dict[str, float] = {"high": 1.0, "moderate": 0.6, "low": 0.3}
@dataclass
class ZoneAllocation:
    """Funding decision for one zone, as produced by the budget optimizer."""

    # --- zone identity (copied from the actuarial result) ---
    zone_id: str
    zone_name: str
    city: str

    # Gross actuarial cost per worker per year, before any worker co-pay.
    actuarial_cost_per_worker: float

    # --- funding outcome ---
    # USD assigned to this zone (0.0 when the zone received nothing).
    allocated_budget: float
    # Number of workers the allocated budget pays for.
    workers_covered: int
    # Number of workers enrolled in the zone.
    workers_total: int
    # workers_covered / workers_total, expressed on a 0-100 scale.
    coverage_pct: float

    # --- ranking ---
    # 1-based position in the descending impact-score ordering.
    priority_rank: int
    # Score used to order zones; higher means funded earlier.
    impact_score: float
@dataclass
class AllocationResult:
    """Portfolio-level outcome of one budget optimization run."""

    # Budget (USD) that was handed to the optimizer.
    total_budget: float

    # --- worker totals across every zone ---
    total_workers_covered: int
    total_workers_enrolled: int
    # total_workers_covered / total_workers_enrolled on a 0-100 scale.
    overall_coverage_pct: float

    # --- zone counts by funding status ---
    zones_fully_funded: int
    zones_partially_funded: int
    zones_unfunded: int

    # Per-zone results, ordered by priority rank (highest impact first).
    allocations: list[ZoneAllocation]
    # Baseline coverage, per-contribution scenarios, and a text summary.
    stretch_analysis: dict
class BudgetOptimizer:
    """Greedy budget allocator optimizing risk-adjusted impact per dollar.

    Zones are ranked by ``risk_weight / net_cost_per_worker`` and funded in
    that order. Each zone is fully funded when the remaining budget allows;
    otherwise as many whole workers as the remaining budget pays for are
    covered and the loop moves on to the next zone.
    """

    # Weight used when a zone's vulnerability label is not in _VULN_WEIGHTS.
    _DEFAULT_RISK_WEIGHT = 0.5
    # Floor on the per-worker subsidy, so a co-pay at or above the actuarial
    # cost never produces a zero or negative divisor.
    _MIN_NET_COST = 0.01
    # Per-worker annual co-pay levels (USD) evaluated by the stretch analysis.
    _STRETCH_CONTRIBUTIONS = (2.0, 5.0, 10.0)

    def optimize(
        self,
        budget_usd: float,
        actuarial_results: list[ActuarialResult],
        payout_per_event: float,
        worker_contribution: float = 0.0,
    ) -> AllocationResult:
        """
        Allocate a fixed budget across zones by impact-per-dollar.

        Parameters
        ----------
        budget_usd : float
            Total philanthropic budget available.
        actuarial_results : list[ActuarialResult]
            Pricing results from ActuarialPricer.
        payout_per_event : float
            USD payout per event per worker. Not read by the current
            implementation; kept so existing callers keep working.
        worker_contribution : float
            Optional per-worker annual co-pay that reduces the subsidy needed.

        Returns
        -------
        AllocationResult with per-zone allocations (in priority order) and
        stretch analysis. Stretch scenarios are evaluated at absolute co-pay
        levels; they are not added on top of ``worker_contribution``.
        """
        ranked = self._score_zones(actuarial_results, worker_contribution)

        remaining = budget_usd
        allocations: list[ZoneAllocation] = []
        status_counts = {"full": 0, "partial": 0, "none": 0}
        total_covered = 0

        for rank, entry in enumerate(ranked, start=1):
            ar = entry["ar"]
            covered, allocated, status = self._fund_zone(
                remaining, entry["net_cost"], ar.enrolled_workers
            )
            remaining -= allocated
            total_covered += covered
            status_counts[status] += 1

            # max(..., 1) guards zones that report zero enrolled workers.
            coverage = (covered / max(ar.enrolled_workers, 1)) * 100  # percentage
            allocations.append(
                ZoneAllocation(
                    zone_id=ar.zone_id,
                    zone_name=ar.zone_name,
                    city=ar.city,
                    actuarial_cost_per_worker=ar.cost_per_worker_year,
                    allocated_budget=round(allocated, 2),
                    workers_covered=covered,
                    workers_total=ar.enrolled_workers,
                    coverage_pct=round(coverage, 4),
                    priority_rank=rank,
                    impact_score=round(entry["impact_score"], 4),
                )
            )

        total_enrolled = sum(ar.enrolled_workers for ar in actuarial_results)
        overall_coverage = (total_covered / max(total_enrolled, 1)) * 100  # percentage

        # Stretch analysis: re-run the allocation at each co-pay level and
        # report the change relative to the allocation computed above.
        scenarios = {}
        for contrib in self._STRETCH_CONTRIBUTIONS:
            outcome = self._run_allocation(budget_usd, actuarial_results, contrib)
            scenarios[f"${contrib:.0f}/worker"] = {
                "workers_covered": outcome["covered"],
                "coverage_pct": round(outcome["coverage"], 4),
                "coverage_increase_pct": round(
                    outcome["coverage"] - overall_coverage, 4
                ),
                "additional_workers": outcome["covered"] - total_covered,
            }

        stretch_analysis = {
            "baseline_coverage_pct": round(overall_coverage, 4),
            "scenarios": scenarios,
            "summary": self._stretch_summary(
                overall_coverage, total_covered, scenarios, total_enrolled
            ),
        }

        result = AllocationResult(
            total_budget=budget_usd,
            total_workers_covered=total_covered,
            total_workers_enrolled=total_enrolled,
            overall_coverage_pct=round(overall_coverage, 4),
            zones_fully_funded=status_counts["full"],
            zones_partially_funded=status_counts["partial"],
            zones_unfunded=status_counts["none"],
            allocations=allocations,
            stretch_analysis=stretch_analysis,
        )

        # printf-style formatting has no thousands separator, so the budget is
        # pre-formatted. overall_coverage is already on a 0-100 scale.
        log.info(
            "Budget $%s: %d/%d zones fully funded, %d partial, %.0f%% coverage (%d/%d workers)",
            f"{budget_usd:,.0f}", status_counts["full"], len(actuarial_results),
            status_counts["partial"], overall_coverage,
            total_covered, total_enrolled,
        )
        return result

    def _score_zones(
        self,
        actuarial_results: list[ActuarialResult],
        worker_contribution: float,
    ) -> list[dict]:
        """Score every zone and return the entries sorted by descending impact.

        Each entry is a dict with keys ``ar`` (the actuarial result),
        ``net_cost`` (per-worker subsidy after the co-pay, floored at
        ``_MIN_NET_COST``), ``risk_weight`` and ``impact_score``.
        """
        # Imported at call time, as the original code did, and only once per
        # scoring pass rather than once per zone.
        from config import ZONE_MAP

        scored = []
        for ar in actuarial_results:
            zone = ZONE_MAP.get(ar.zone_id)
            # Zones missing from the config are treated as "moderate".
            vuln = zone.heat_vulnerability if zone else "moderate"
            risk_weight = _VULN_WEIGHTS.get(vuln, self._DEFAULT_RISK_WEIGHT)
            net_cost = max(
                ar.cost_per_worker_year - worker_contribution, self._MIN_NET_COST
            )
            scored.append({
                "ar": ar,
                "net_cost": net_cost,
                "risk_weight": risk_weight,
                # (risk_weight * enrolled) / (net_cost * enrolled) reduces to
                # this. The reduced form is also defined for zones with zero
                # enrolled workers, where the long form divides by zero.
                "impact_score": risk_weight / net_cost,
            })

        # list.sort is stable, so tied zones keep their input order.
        scored.sort(key=lambda entry: entry["impact_score"], reverse=True)
        return scored

    @staticmethod
    def _fund_zone(
        remaining: float, net_cost: float, enrolled: int
    ) -> tuple[int, float, str]:
        """Decide how much of one zone the remaining budget can fund.

        Returns ``(workers_covered, amount_allocated, status)`` where status
        is ``"full"``, ``"partial"`` or ``"none"``. A zone whose leftover
        budget cannot pay for even one worker is reported as ``"none"``.
        """
        if remaining <= 0:
            return 0, 0.0, "none"

        zone_cost = net_cost * enrolled
        if remaining >= zone_cost:
            return enrolled, zone_cost, "full"

        # Only whole workers can be covered; cap at enrollment to absorb
        # floating-point rounding in the division.
        covered = min(int(remaining / net_cost), enrolled)
        if covered == 0:
            return 0, 0.0, "none"
        return covered, covered * net_cost, "partial"

    def _run_allocation(
        self,
        budget_usd: float,
        actuarial_results: list[ActuarialResult],
        worker_contribution: float,
    ) -> dict:
        """Quick allocation pass for stretch analysis (no full result object).

        Returns a dict with ``covered`` (total workers covered) and
        ``coverage`` (covered / total enrolled, on a 0-100 scale).
        """
        remaining = budget_usd
        covered = 0
        for entry in self._score_zones(actuarial_results, worker_contribution):
            if remaining <= 0:
                break
            zone_covered, allocated, _status = self._fund_zone(
                remaining, entry["net_cost"], entry["ar"].enrolled_workers
            )
            covered += zone_covered
            remaining -= allocated

        total_enrolled = sum(ar.enrolled_workers for ar in actuarial_results)
        return {
            "covered": covered,
            "coverage": (covered / max(total_enrolled, 1)) * 100,  # percentage
        }

    @staticmethod
    def _stretch_summary(
        baseline: float,
        baseline_workers: int,
        stretch: dict,
        total_enrolled: int,
    ) -> str:
        """Human-readable stretch summary.

        Builds one clause for each scenario in ``stretch`` whose
        ``coverage_increase_pct`` is positive and joins them with "; ".
        ``baseline``, ``baseline_workers`` and ``total_enrolled`` are accepted
        for interface compatibility and are not read.
        """
        # coverage_increase_pct is already in percentage points, so it must
        # not go through the "%" presentation type, which multiplies by 100.
        parts = [
            f"With {label} contribution, coverage increases by "
            f"{data['coverage_increase_pct']:.1f}% "
            f"({data['additional_workers']:,} additional workers)"
            for label, data in stretch.items()
            if data["coverage_increase_pct"] > 0
        ]
        if parts:
            return "; ".join(parts)
        return "Worker contributions would not materially increase coverage at this budget level"