market-intelligence / src /optimizer.py
jtlevine's picture
Fix four correctness bugs surfaced in the post-MOS-rip diagnostic (#3)
e85bfb6 unverified
"""
Sell Optimizer -- computes when and where to sell for maximum net returns.
For each (mandi, timing) combination, calculates:
net_price = market_price - transport_cost - storage_loss - mandi_fees
Recommends the optimal combination across space (which mandi) and time
(sell now vs wait 7/14/30 days).
"""
from dataclasses import dataclass, field
from typing import Literal, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from src.dpi.models import FarmerProfile, KCCRepaymentStatus
# KCC utilization thresholds used to classify headroom as risk vs strength.
# 85% is the standard concern threshold lenders use when assessing how much
# additional credit a farmer can absorb; below 40% with meaningful headroom
# is treated as a positive signal.
KCC_UTILIZATION_HIGH_PCT = 85.0
KCC_UTILIZATION_SAFE_PCT = 40.0
KCC_MIN_HEADROOM_FOR_STRENGTH_RS = 20_000.0
from config import (
COMMODITY_MAP,
MANDI_MAP,
MANDIS,
MIN_TRANSPORT_COST_RS,
POST_HARVEST_LOSS,
TRANSPORT_COST_RS_PER_QUINTAL_PER_KM,
MANDI_FEE_PCT,
Mandi,
)
from src.geo import haversine_km
@dataclass
class SellOption:
"""A single sell option: specific mandi + timing."""
mandi_id: str
mandi_name: str
commodity_id: str
sell_timing: str # "now", "7d", "14d", "30d"
market_price_rs: float
transport_cost_rs: float
storage_loss_rs: float
storage_cost_rs: float
mandi_fee_rs: float
net_price_rs: float
distance_km: float
drive_time_min: float
confidence: float
price_source: str # "current" or "forecasted"
@dataclass
class SellRecommendation:
"""Complete sell recommendation for a farmer."""
commodity_id: str
commodity_name: str
quantity_quintals: float
farmer_lat: float
farmer_lon: float
best_option: SellOption
all_options: list[SellOption]
potential_gain_rs: float
recommendation_text: str
def _estimate_drive_time_min(distance_km: float) -> float:
"""Estimate drive time in minutes (avg 30 km/h for rural Tamil Nadu)."""
return (distance_km / 30) * 60
def optimize_sell(
farmer_lat: float,
farmer_lon: float,
commodity_id: str,
quantity_quintals: float,
reconciled_prices: dict[str, dict],
forecasted_prices: dict[str, dict] | None = None,
max_distance_km: float = 60.0,
storage_cost_rs_per_quintal_per_month: float = 20.0,
) -> SellRecommendation:
"""Compute optimal sell strategy across mandis and time horizons.
Parameters
----------
farmer_lat, farmer_lon : float
Farmer's location.
commodity_id : str
Commodity to sell.
quantity_quintals : float
Amount to sell.
reconciled_prices : dict
Mandi_id -> {commodity_id: {price_rs, ...}} -- current reconciled prices.
forecasted_prices : dict, optional
Mandi_id -> {commodity_id: {price_7d, price_14d, price_30d, ...}}.
max_distance_km : float
Maximum travel distance to consider.
storage_cost_rs_per_quintal_per_month : float
Warehouse storage rental cost if applicable.
Returns
-------
SellRecommendation
Best option with all alternatives ranked.
"""
commodity = COMMODITY_MAP.get(commodity_id, {})
commodity_name = commodity.get("name", commodity_id)
loss_data = POST_HARVEST_LOSS.get(commodity_id, {})
storage_loss_pct_month = loss_data.get("storage_per_month", 2.5)
if forecasted_prices is None:
forecasted_prices = {}
all_options: list[SellOption] = []
nearest_now_price = 0.0 # for potential gain calculation
for mandi in MANDIS:
if commodity_id not in mandi.commodities_traded:
continue
distance = haversine_km(farmer_lat, farmer_lon, mandi.latitude, mandi.longitude)
if distance > max_distance_km:
continue
drive_time = _estimate_drive_time_min(distance)
transport_cost = max(
MIN_TRANSPORT_COST_RS,
distance * TRANSPORT_COST_RS_PER_QUINTAL_PER_KM,
)
# Current price
mandi_prices = reconciled_prices.get(mandi.mandi_id, {})
commodity_price_data = mandi_prices.get(commodity_id, {})
current_price = commodity_price_data.get("price_rs", 0)
if current_price <= 0:
continue
# Track nearest mandi for potential gain
if nearest_now_price == 0.0:
nearest_now_price = current_price - transport_cost - (current_price * MANDI_FEE_PCT / 100)
# Time horizons
timings = [
("now", current_price, 0, "current"),
]
# Add forecasted prices
mandi_forecasts = forecasted_prices.get(mandi.mandi_id, {})
commodity_forecast = mandi_forecasts.get(commodity_id, {})
for label, key, months in [
("7d", "price_7d", 7 / 30),
("14d", "price_14d", 14 / 30),
("30d", "price_30d", 30 / 30),
]:
forecast_price = commodity_forecast.get(key, 0)
if forecast_price > 0:
timings.append((label, forecast_price, months, "forecasted"))
for timing_label, market_price, storage_months, price_source in timings:
# Storage loss (% of value per month of storage)
storage_loss_value = market_price * (storage_loss_pct_month / 100) * storage_months
storage_cost = storage_cost_rs_per_quintal_per_month * storage_months
# Mandi fee
mandi_fee = market_price * (MANDI_FEE_PCT / 100)
# Net price per quintal
net_price = market_price - transport_cost - storage_loss_value - storage_cost - mandi_fee
# Confidence decreases with forecast horizon
if price_source == "current":
confidence = commodity_price_data.get("confidence", 0.85)
else:
horizon_days = {"7d": 7, "14d": 14, "30d": 30}.get(timing_label, 7)
confidence = max(0.40, 0.85 - horizon_days * 0.01)
all_options.append(SellOption(
mandi_id=mandi.mandi_id,
mandi_name=mandi.name,
commodity_id=commodity_id,
sell_timing=timing_label,
market_price_rs=round(market_price, 0),
transport_cost_rs=round(transport_cost, 0),
storage_loss_rs=round(storage_loss_value, 0),
storage_cost_rs=round(storage_cost, 0),
mandi_fee_rs=round(mandi_fee, 0),
net_price_rs=round(net_price, 0),
distance_km=round(distance, 1),
drive_time_min=round(drive_time, 0),
confidence=round(confidence, 2),
price_source=price_source,
))
# Sort by net price descending
all_options.sort(key=lambda o: o.net_price_rs, reverse=True)
if not all_options:
# Distinguish between the three distinct empty-result cases so the
# farmer-facing message is honest:
# 1. Commodity not traded at any tracked market (config gap)
# 2. Commodity traded somewhere, but no recent price data (ingest
# gap — e.g. KAMIS had no bean prices today)
# 3. Data exists but everything is beyond max_distance_km
mandis_trading = [m for m in MANDIS if commodity_id in m.commodities_traded]
mandis_with_prices = [
m for m in mandis_trading
if reconciled_prices.get(m.mandi_id, {}).get(commodity_id, {}).get("price_rs", 0) > 0
]
if not mandis_trading:
fallback_label = f"{commodity_name} not traded at tracked markets"
elif not mandis_with_prices:
fallback_label = f"No recent price data for {commodity_name}"
else:
fallback_label = "No markets in range"
return SellRecommendation(
commodity_id=commodity_id,
commodity_name=commodity_name,
quantity_quintals=quantity_quintals,
farmer_lat=farmer_lat,
farmer_lon=farmer_lon,
best_option=SellOption(
mandi_id="", mandi_name=fallback_label,
commodity_id=commodity_id, sell_timing="now",
market_price_rs=0, transport_cost_rs=0, storage_loss_rs=0,
storage_cost_rs=0, mandi_fee_rs=0, net_price_rs=0,
distance_km=0, drive_time_min=0, confidence=0, price_source="none",
),
all_options=[],
potential_gain_rs=0,
recommendation_text="No mandis trading this commodity within travel distance.",
)
best = all_options[0]
# Nearest mandi selling now (for comparison)
now_options = [o for o in all_options if o.sell_timing == "now"]
if now_options:
nearest_now = min(now_options, key=lambda o: o.distance_km)
nearest_now_price = nearest_now.net_price_rs
potential_gain = (best.net_price_rs - nearest_now_price) * quantity_quintals
# Generate recommendation text
rec_text = _generate_recommendation_text(
best, all_options, commodity_name, quantity_quintals, potential_gain, nearest_now_price,
)
return SellRecommendation(
commodity_id=commodity_id,
commodity_name=commodity_name,
quantity_quintals=quantity_quintals,
farmer_lat=farmer_lat,
farmer_lon=farmer_lon,
best_option=best,
all_options=all_options[:15], # top 15 options
potential_gain_rs=round(potential_gain, 0),
recommendation_text=rec_text,
)
def _generate_recommendation_text(
best: SellOption,
all_options: list[SellOption],
commodity_name: str,
quantity: float,
potential_gain: float,
nearest_now_price: float,
) -> str:
"""Generate plain-language sell recommendation."""
parts = []
if best.sell_timing == "now":
parts.append(
f"Sell {commodity_name} NOW at {best.mandi_name} ({best.distance_km:.0f} km). "
f"Market price: Rs {best.market_price_rs:,.0f}/quintal. "
f"After transport (Rs {best.transport_cost_rs:,.0f}) and fees "
f"(Rs {best.mandi_fee_rs:,.0f}), net: Rs {best.net_price_rs:,.0f}/quintal."
)
else:
parts.append(
f"WAIT {best.sell_timing} and sell at {best.mandi_name} ({best.distance_km:.0f} km). "
f"Forecasted price: Rs {best.market_price_rs:,.0f}/quintal. "
f"After transport, storage loss (Rs {best.storage_loss_rs:,.0f}), and fees, "
f"net: Rs {best.net_price_rs:,.0f}/quintal."
)
if potential_gain > 0:
parts.append(
f"Potential gain over nearest mandi: Rs {potential_gain:,.0f} "
f"on {quantity:.0f} quintals."
)
# Compare best "sell now" vs best "wait"
now_options = [o for o in all_options if o.sell_timing == "now"]
wait_options = [o for o in all_options if o.sell_timing != "now"]
if now_options and wait_options:
best_now = now_options[0]
best_wait = wait_options[0]
if best_wait.net_price_rs > best_now.net_price_rs:
gain_per_quintal = best_wait.net_price_rs - best_now.net_price_rs
parts.append(
f"Waiting advantage: Rs {gain_per_quintal:,.0f}/quintal gain by selling "
f"{best_wait.sell_timing} at {best_wait.mandi_name} vs selling now at "
f"{best_now.mandi_name}."
)
else:
parts.append("Prices are expected to decline -- sell sooner rather than later.")
if best.confidence < 0.6:
parts.append("Note: forecast confidence is moderate. Monitor prices daily.")
return " ".join(parts)
# ---------------------------------------------------------------------------
# Credit readiness assessment — same data, farmer-facing view
# ---------------------------------------------------------------------------
@dataclass
class CreditReadiness:
"""Farmer-facing credit readiness assessment derived from sell optimization."""
readiness: Literal["strong", "moderate", "not_yet"]
expected_revenue_rs: float
min_revenue_rs: float
max_advisable_input_loan_rs: float
revenue_confidence: float
loan_to_revenue_pct: float # if farmer were to borrow max_advisable
strengths: list[str]
risks: list[str]
advice_en: str
advice_ta: str # Tamil placeholder
# DPI-aware fields — populated when a dpi_profile is passed in. Zeros
# + dpi_checked=False means the assessment never saw DPI data, so the
# dashboard can distinguish "no DPI" from "checked DPI, zero KCC".
kcc_limit_rs: float = 0.0
kcc_outstanding_rs: float = 0.0
kcc_repayment_status: str = "" # "" when no profile; otherwise KCCRepaymentStatus
dpi_checked: bool = False
@property
def kcc_headroom_rs(self) -> float:
return max(0.0, self.kcc_limit_rs - self.kcc_outstanding_rs)
def assess_credit_readiness(
rec: SellRecommendation,
has_storage: bool = True,
typical_input_loan_rs: float | None = None,
dpi_profile: Optional["FarmerProfile"] = None, # type: ignore[name-defined]
) -> CreditReadiness:
"""Assess whether a farmer should seek credit, based on her sell optimization.
This is advice FOR THE FARMER, not a score for a lender.
When `dpi_profile` is provided, the assessment uses the farmer's real
Kisan Credit Card state (limit, outstanding, repayment status) and
their registered land records to sharpen the recommendation:
- Max advisable loan is capped at the KCC headroom, not just at
40% of projected revenue.
- KCC near-limit utilization downgrades the classification.
- Overdue or defaulted KCC status forces "not_yet".
- Registered land acreage that doesn't support the claimed yield
shows up as a risk.
When `dpi_profile` is None, the function falls back to the prior
rule-based behavior — pure projection from sell data.
"""
if not rec.all_options or rec.best_option.net_price_rs <= 0:
return CreditReadiness(
readiness="not_yet",
expected_revenue_rs=0,
min_revenue_rs=0,
max_advisable_input_loan_rs=0,
revenue_confidence=0,
loan_to_revenue_pct=0,
strengths=[],
risks=["No market data available to estimate harvest revenue"],
advice_en="We don't have enough market data to assess your credit readiness right now. Check back after prices are available.",
advice_ta="",
dpi_checked=dpi_profile is not None,
)
best = rec.best_option
quantity = rec.quantity_quintals
# Revenue scenarios
expected_revenue = best.net_price_rs * quantity
now_options = [o for o in rec.all_options if o.sell_timing == "now"]
worst_net = min(o.net_price_rs for o in rec.all_options) if rec.all_options else best.net_price_rs
min_revenue = worst_net * quantity
# Conservative baseline: advisable loan is up to 40% of expected revenue.
# When DPI is available, cap at the real KCC headroom instead — this is
# the central upgrade the DPI integration delivers.
projected_max = expected_revenue * 0.40
if dpi_profile is not None and dpi_profile.kcc is not None:
kcc_headroom = dpi_profile.kcc.headroom
max_advisable = min(projected_max, kcc_headroom)
else:
max_advisable = projected_max
# If typical input loan provided, use it for comparison
if typical_input_loan_rs is None:
# Rough estimate: Rs 15,000-20,000 per hectare for most TN crops
# Use quantity as proxy (1 quintal ≈ 0.03-0.05 ha depending on yield)
typical_input_loan_rs = min(max_advisable, 25_000)
loan_to_revenue = (typical_input_loan_rs / expected_revenue * 100) if expected_revenue > 0 else 999
# Strengths and risks
strengths = []
risks = []
if best.confidence >= 0.75:
strengths.append("Strong price forecast confidence")
elif best.confidence < 0.55:
risks.append("Price forecast has low confidence — actual revenue may differ")
if has_storage:
strengths.append("Storage available — you can wait for better prices if needed")
else:
risks.append("No storage — you must sell quickly, limiting price flexibility")
if expected_revenue > typical_input_loan_rs * 3:
strengths.append(f"Expected revenue (Rs {expected_revenue:,.0f}) is well above typical input costs")
elif expected_revenue < typical_input_loan_rs * 1.5:
risks.append("Expected revenue is close to input costs — tight margins if prices drop")
if rec.potential_gain_rs > 0:
strengths.append(f"Agent found Rs {rec.potential_gain_rs:,.0f} more value by optimizing where and when you sell")
price_spread = best.net_price_rs - worst_net if worst_net > 0 else 0
if price_spread > best.net_price_rs * 0.15:
risks.append("Large price variation across markets — revenue depends on selling at the right time and place")
if len(now_options) < 2:
risks.append("Few markets trading your commodity nearby")
# ── DPI-derived strengths and risks ──────────────────────────────────
# Land-area vs claimed-yield cross-checks are deliberately omitted: they
# require a commodity-specific yield table (rice 40 q/ha, banana 300 q/ha,
# pulses 7 q/ha) which would duplicate data from the DPI simulator. The
# value of the DPI integration lives in the KCC state below — that's
# where real lender decisions actually get made.
kcc_force_not_yet = False
kcc_limit = 0.0
kcc_outstanding = 0.0
kcc_headroom = 0.0
kcc_repayment = ""
if dpi_profile is not None:
# Commodity-vs-registration check: the rec's commodity should match
# something on the farmer's registered crops. Mismatch is low-stakes
# (crop rotations happen) but worth surfacing.
if dpi_profile.land_records and rec.commodity_id not in dpi_profile.primary_crops:
risks.append(
f"{rec.commodity_name} is not on your registered crop list — "
f"verify your land record before applying for input credit"
)
if dpi_profile.kcc is not None:
kcc_limit = dpi_profile.kcc.credit_limit
kcc_outstanding = dpi_profile.kcc.outstanding
kcc_headroom = dpi_profile.kcc.headroom
kcc_repayment = dpi_profile.kcc.repayment_status
util_pct = dpi_profile.kcc.utilization_pct
if kcc_repayment == "defaulted":
risks.append(
f"KCC shows a prior default — most lenders will not extend "
f"new credit until it's resolved"
)
kcc_force_not_yet = True
elif kcc_repayment == "overdue":
risks.append(
f"KCC is overdue on your last payment — clear it before seeking new credit"
)
elif util_pct >= KCC_UTILIZATION_HIGH_PCT:
risks.append(
f"KCC is {util_pct:.0f}% used (Rs {kcc_outstanding:,.0f} outstanding) — "
f"very little headroom for additional borrowing"
)
elif util_pct <= KCC_UTILIZATION_SAFE_PCT and kcc_headroom > KCC_MIN_HEADROOM_FOR_STRENGTH_RS:
strengths.append(
f"KCC has Rs {kcc_headroom:,.0f} headroom (card only {util_pct:.0f}% used)"
)
if kcc_repayment == "current" and util_pct < KCC_UTILIZATION_HIGH_PCT:
strengths.append("KCC payments are current — you're in good standing with your card")
else:
risks.append(
"No Kisan Credit Card on file — consider enrolling at your PACS "
"for subsidized input credit"
)
# Readiness determination
if kcc_force_not_yet:
readiness = "not_yet"
elif len(risks) == 0 and expected_revenue > typical_input_loan_rs * 2:
readiness = "strong"
elif len(risks) <= 1 and expected_revenue > typical_input_loan_rs * 1.5:
readiness = "moderate"
else:
readiness = "not_yet"
# Generate farmer-facing advice
advice_en = _credit_advice_en(readiness, expected_revenue, min_revenue, max_advisable, loan_to_revenue, strengths, risks, rec)
advice_ta = "" # Tamil via Claude in recommendation agent
return CreditReadiness(
readiness=readiness,
expected_revenue_rs=round(expected_revenue, 0),
min_revenue_rs=round(min_revenue, 0),
max_advisable_input_loan_rs=round(max_advisable, 0),
revenue_confidence=best.confidence,
loan_to_revenue_pct=round(loan_to_revenue, 1),
strengths=strengths,
risks=risks,
advice_en=advice_en,
advice_ta=advice_ta,
kcc_limit_rs=round(kcc_limit, 0),
kcc_outstanding_rs=round(kcc_outstanding, 0),
kcc_repayment_status=kcc_repayment,
dpi_checked=dpi_profile is not None,
)
def _credit_advice_en(
readiness: str,
expected: float,
minimum: float,
max_advisable: float,
loan_pct: float,
strengths: list[str],
risks: list[str],
rec: SellRecommendation,
) -> str:
"""Generate plain-language credit readiness advice for the farmer."""
commodity = rec.commodity_name
quantity = rec.quantity_quintals
best = rec.best_option
if readiness == "strong":
return (
f"Your {commodity} harvest ({quantity:.0f} quintals) is expected to earn "
f"Rs {expected:,.0f} at {best.mandi_name}. "
f"An input loan of up to Rs {max_advisable:,.0f} looks manageable — "
f"that's {loan_pct:.0f}% of your expected revenue. "
f"Consider applying through your local SACCO, FPO, or mobile platform."
)
elif readiness == "moderate":
return (
f"Your {commodity} harvest ({quantity:.0f} quintals) should earn around "
f"Rs {expected:,.0f}, but {'prices are uncertain' if any('confidence' in r.lower() for r in risks) else 'margins are tight'}. "
f"A smaller input loan — up to Rs {max_advisable:,.0f} — could work, "
f"but keep the amount conservative. "
f"{'Consider crop insurance to protect against downside.' if not any('storage' in s.lower() for s in strengths) else 'Your storage gives you flexibility to wait for better prices.'}"
)
else:
main_risk = risks[0] if risks else "uncertain revenue"
return (
f"Based on current market conditions, applying for a large input loan "
f"carries risk: {main_risk.lower()}. "
f"Your expected revenue is Rs {expected:,.0f}, with a worst case of Rs {minimum:,.0f}. "
f"Consider starting with a very small amount, or waiting until after harvest "
f"when you have cash in hand."
)
def credit_readiness_to_dict(cr: CreditReadiness) -> dict:
"""Convert CreditReadiness to a JSON-serializable dict."""
return {
"readiness": cr.readiness,
"expected_revenue_rs": cr.expected_revenue_rs,
"min_revenue_rs": cr.min_revenue_rs,
"max_advisable_input_loan_rs": cr.max_advisable_input_loan_rs,
"revenue_confidence": cr.revenue_confidence,
"loan_to_revenue_pct": cr.loan_to_revenue_pct,
"strengths": cr.strengths,
"risks": cr.risks,
"advice_en": cr.advice_en,
"advice_ta": cr.advice_ta,
"kcc_limit_rs": cr.kcc_limit_rs,
"kcc_outstanding_rs": cr.kcc_outstanding_rs,
"kcc_headroom_rs": cr.kcc_headroom_rs,
"kcc_repayment_status": cr.kcc_repayment_status,
"dpi_checked": cr.dpi_checked,
}
def recommendation_to_dict(rec: SellRecommendation) -> dict:
"""Convert a SellRecommendation to a JSON-serializable dict."""
return {
"commodity_id": rec.commodity_id,
"commodity_name": rec.commodity_name,
"quantity_quintals": rec.quantity_quintals,
"farmer_lat": rec.farmer_lat,
"farmer_lon": rec.farmer_lon,
"best_option": {
"mandi_id": rec.best_option.mandi_id,
"mandi_name": rec.best_option.mandi_name,
"sell_timing": rec.best_option.sell_timing,
"market_price_rs": rec.best_option.market_price_rs,
"transport_cost_rs": rec.best_option.transport_cost_rs,
"storage_loss_rs": rec.best_option.storage_loss_rs,
"mandi_fee_rs": rec.best_option.mandi_fee_rs,
"net_price_rs": rec.best_option.net_price_rs,
"distance_km": rec.best_option.distance_km,
"confidence": rec.best_option.confidence,
"price_source": rec.best_option.price_source,
},
"all_options": [
{
"mandi_id": o.mandi_id,
"mandi_name": o.mandi_name,
"sell_timing": o.sell_timing,
"market_price_rs": o.market_price_rs,
"transport_cost_rs": o.transport_cost_rs,
"storage_loss_rs": o.storage_loss_rs,
"mandi_fee_rs": o.mandi_fee_rs,
"net_price_rs": o.net_price_rs,
"distance_km": o.distance_km,
"confidence": o.confidence,
"price_source": o.price_source,
}
for o in rec.all_options
],
"potential_gain_rs": rec.potential_gain_rs,
"recommendation_text": rec.recommendation_text,
"farmer_id": "",
"farmer_name": "",
# Local-language translation + ISO 639-1 code. Populated by the
# RECOMMEND step (recommendation_agent) after this dict is built;
# this is just the default shape so downstream consumers see a
# stable key. "ta" = Tamil (India), "sw" = Swahili (Kenya).
"recommendation_local": "",
"local_language_code": "",
}