market-intelligence / src /policy /features.py
jtlevine's picture
Phase 11 v2 prep: KAMIS scraper 10x, seasonal calendars, market swap, Chronos precompute script
281418e
"""Feature builder for the DFL hold/sell policy.
`build_dp_features(dp, history, forecast=None, exogenous=None)` returns a
flat dict of finite floats/ints. No NaN is ever produced -- missing inputs
are filled with spot_price (prices) or 0.0 (exogenous).
Shared between training (`train_dfl.py`) and inference (`dfl_policy.py`),
so the feature order is stable and documented via FEATURE_NAMES.
"""
from __future__ import annotations
import math
from datetime import date, datetime
from typing import Any, Iterable, Mapping
# Stable, ordered list. train_dfl.py and dfl_policy.py both rely on this
# exact ordering when converting a feature dict to a numpy row.
FEATURE_NAMES: tuple[str, ...] = (
# Rolling prices (6)
"z_score_30d",
"z_score_14d",
"return_7d",
"return_30d",
"realized_vol_14d",
"realized_vol_30d",
# Seasonal (4)
"month",
"day_of_year_sin",
"day_of_year_cos",
"seasonal_flag",
# Forecast tail (9)
"forecast_q10_7d",
"forecast_q50_7d",
"forecast_q90_7d",
"forecast_q10_14d",
"forecast_q50_14d",
"forecast_q90_14d",
"forecast_q10_30d",
"forecast_q50_30d",
"forecast_q90_30d",
# Exogenous (3)
"rainfall_anomaly_90d",
"fx_30d_return_local",
"global_price_momentum",
# Identity (2)
"commodity_hash",
"region_flag",
)
# Per-commodity harvest-season calendars. seasonal_flag encoding:
# 0 = lean, 1 = planting, 2 = growth, 3 = harvest.
# Keys are lowercased commodity substrings; lookup is substring match so
# "Dry maize", "dry_maize", "Maize (Dry)" all resolve to the same calendar.
_SEASON_CALENDARS: dict[str, dict[int, int]] = {
# India pulses listed first (most specific substrings) so "Moong (Green
# Gram)"-shaped labels don't accidentally match the Kenya "green gram" key.
"tur": {1: 3, 2: 0, 3: 0, 4: 0, 5: 0, 6: 1, 7: 1, 8: 2, 9: 2, 10: 3, 11: 3, 12: 3},
"moong": {1: 3, 2: 0, 3: 0, 4: 0, 5: 0, 6: 1, 7: 1, 8: 2, 9: 2, 10: 3, 11: 3, 12: 3},
"urad": {1: 3, 2: 0, 3: 0, 4: 0, 5: 0, 6: 1, 7: 1, 8: 2, 9: 2, 10: 3, 11: 3, 12: 3},
"masur": {1: 2, 2: 3, 3: 3, 4: 3, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 1, 11: 1, 12: 2},
# Kenya maize (two rainy seasons): short rains harvest Feb-Mar,
# long rains harvest Aug-Oct, lean May-Jul.
"maize": {
1: 3, 2: 3, 3: 3, 4: 1, 5: 0, 6: 0,
7: 0, 8: 3, 9: 3, 10: 3, 11: 1, 12: 2,
},
# Kenya beans + green grams: long-rains sow Mar-Apr → harvest Jul-Aug,
# short-rains sow Oct-Nov → harvest Jan-Feb.
"bean": {1: 3, 2: 3, 3: 1, 4: 1, 5: 2, 6: 2, 7: 3, 8: 3, 9: 0, 10: 1, 11: 1, 12: 2},
"green gram": {1: 3, 2: 3, 3: 1, 4: 1, 5: 2, 6: 2, 7: 3, 8: 3, 9: 0, 10: 1, 11: 1, 12: 2},
# Kenya Irish potato (highland, Nyandarua/Meru): long-rains crop planted
# Mar-Apr → harvested Jul-Aug; short-rains planted Sep-Oct → Dec-Jan.
"potato": {1: 3, 2: 0, 3: 1, 4: 1, 5: 2, 6: 2, 7: 3, 8: 3, 9: 1, 10: 1, 11: 2, 12: 3},
}
# Keywords in mandi strings that indicate the Kenya region.
_KENYA_KEYWORDS = (
"bomet", "bungoma", "busia", "embu", "kakamega", "kericho", "kiambu",
"kisii", "kisumu", "kitui", "machakos", "makueni", "meru", "mombasa",
"nairobi", "nakuru", "nyandarua", "nyeri", "trans-nzoia", "uasin",
"kenya",
)
def _get(obj: Any, name: str, default: Any = None) -> Any:
"""Read attribute or dict key with a single signature."""
if obj is None:
return default
if isinstance(obj, Mapping):
return obj.get(name, default)
return getattr(obj, name, default)
def _to_date(value: Any) -> date:
if isinstance(value, datetime):
return value.date()
if isinstance(value, date):
return value
if isinstance(value, str):
# Accept both "2024-01-15" and "2024-01-15T00:00:00".
return datetime.fromisoformat(value.split("T")[0]).date()
raise TypeError(f"Cannot coerce {type(value).__name__} to date: {value!r}")
def _as_float(x: Any, fallback: float = 0.0) -> float:
try:
v = float(x)
except (TypeError, ValueError):
return fallback
if math.isnan(v) or math.isinf(v):
return fallback
return v
def _history_prices(history: Iterable[Mapping[str, Any]] | None) -> list[tuple[date, float]]:
"""Normalize a heterogeneous history list into [(date, price)] sorted ascending.
Accepts dicts with any of: modal_price_rs, price, spot_price_rs_per_quintal.
"""
out: list[tuple[date, float]] = []
if not history:
return out
for row in history:
d = row.get("date") if isinstance(row, Mapping) else None
if d is None:
continue
try:
d = _to_date(d)
except (TypeError, ValueError):
continue
price = None
for k in ("modal_price_rs", "price", "spot_price_rs_per_quintal", "modal_price"):
if k in row and row[k] is not None:
price = row[k]
break
p = _as_float(price, float("nan"))
if not math.isfinite(p):
continue
out.append((d, p))
out.sort(key=lambda t: t[0])
return out
def _rolling_window(series: list[float], n: int) -> list[float]:
if not series:
return []
return series[-n:] if len(series) > n else list(series)
def _mean(xs: list[float]) -> float:
return sum(xs) / len(xs) if xs else 0.0
def _stdev(xs: list[float]) -> float:
if len(xs) < 2:
return 0.0
m = _mean(xs)
var = sum((x - m) ** 2 for x in xs) / (len(xs) - 1)
return math.sqrt(var) if var > 0 else 0.0
def _log_return(prev: float, cur: float) -> float:
if prev <= 0 or cur <= 0:
return 0.0
return math.log(cur / prev)
def _realized_vol(prices: list[float]) -> float:
"""Stdev of consecutive log-returns (unannualized)."""
if len(prices) < 3:
return 0.0
rets = [
math.log(prices[i] / prices[i - 1])
for i in range(1, len(prices))
if prices[i - 1] > 0 and prices[i] > 0
]
return _stdev(rets)
def _seasonal_flag(commodity: str, month: int) -> int:
c = (commodity or "").lower()
for key, cal in _SEASON_CALENDARS.items():
if key in c:
return int(cal.get(month, 0))
return 0
def _region_flag(mandi: str) -> int:
m = (mandi or "").lower()
return 1 if any(k in m for k in _KENYA_KEYWORDS) else 0
def _commodity_hash(commodity: str) -> int:
# abs(hash(...)) % 100 is not stable across Python processes (PYTHONHASHSEED),
# so we use a deterministic sum-of-char-codes hash.
s = (commodity or "").lower().strip()
h = 0
for ch in s:
h = (h * 131 + ord(ch)) & 0xFFFFFFFF
return h % 100
def _forecast_slot(
forecast: Mapping[int, Mapping[str, float]] | None,
horizon: int,
spot: float,
) -> tuple[float, float, float]:
"""Return (q10, q50, q90) for a horizon. Fills with spot if absent."""
if forecast is None:
return spot, spot, spot
fc = forecast.get(horizon) if isinstance(forecast, Mapping) else None
if fc is None and isinstance(forecast, Mapping):
# Allow str keys too.
fc = forecast.get(str(horizon))
if not fc:
return spot, spot, spot
q50 = _as_float(fc.get("q50"), spot)
q10 = _as_float(fc.get("q10"), q50)
q90 = _as_float(fc.get("q90"), q50)
return q10, q50, q90
def build_dp_features(
dp: Any,
history: Iterable[Mapping[str, Any]] | None = None,
forecast: Mapping[int, Mapping[str, float]] | None = None,
exogenous: Mapping[str, float] | None = None,
) -> dict[str, float]:
"""Build a flat feature dict for a decision point.
All returned values are finite floats or small ints -- no NaN, no inf.
Missing inputs are filled sensibly:
- empty history -> rolling stats are 0 (returns) or 1.0 (z-scores)
- forecast=None -> fill with spot_price so the slot isn't a vacuum
- exogenous=None -> 0.0
"""
commodity = _get(dp, "commodity", "") or ""
mandi = _get(dp, "mandi", "") or ""
decision_date = _to_date(_get(dp, "decision_date"))
spot = _as_float(_get(dp, "spot_price_rs_per_quintal"), 0.0)
hist_dated = _history_prices(history)
# Defensive: a price of 0 breaks log-returns downstream. If we get it,
# fall back to whatever the most recent history price was, else 1.0.
if spot <= 0:
spot = hist_dated[-1][1] if hist_dated else 1.0
hist_prices = [p for _, p in hist_dated]
# Include the spot as the "current" observation for rolling stats.
prices_with_spot = hist_prices + [spot]
win30 = _rolling_window(prices_with_spot, 30)
win14 = _rolling_window(prices_with_spot, 14)
mean30, std30 = _mean(win30), _stdev(win30)
mean14, std14 = _mean(win14), _stdev(win14)
z30 = (spot - mean30) / std30 if std30 > 1e-9 else 0.0
z14 = (spot - mean14) / std14 if std14 > 1e-9 else 0.0
# Log-returns looking back 7 / 30 observations.
if len(prices_with_spot) > 7:
ret7 = _log_return(prices_with_spot[-8], spot)
else:
ret7 = 0.0
if len(prices_with_spot) > 30:
ret30 = _log_return(prices_with_spot[-31], spot)
else:
ret30 = _log_return(prices_with_spot[0], spot) if len(prices_with_spot) >= 2 else 0.0
vol14 = _realized_vol(win14)
vol30 = _realized_vol(win30)
month = decision_date.month
doy = decision_date.timetuple().tm_yday
# 365.25 gives a smooth year boundary.
doy_sin = math.sin(2.0 * math.pi * doy / 365.25)
doy_cos = math.cos(2.0 * math.pi * doy / 365.25)
flag = _seasonal_flag(commodity, month)
f7 = _forecast_slot(forecast, 7, spot)
f14 = _forecast_slot(forecast, 14, spot)
f30 = _forecast_slot(forecast, 30, spot)
if exogenous is None:
rain_anom = fx_ret = gpi_mom = 0.0
else:
rain_anom = _as_float(exogenous.get("rainfall_anomaly_90d"), 0.0)
fx_ret = _as_float(exogenous.get("fx_30d_return_local"), 0.0)
gpi_mom = _as_float(exogenous.get("global_price_momentum"), 0.0)
out = {
"z_score_30d": float(z30),
"z_score_14d": float(z14),
"return_7d": float(ret7),
"return_30d": float(ret30),
"realized_vol_14d": float(vol14),
"realized_vol_30d": float(vol30),
"month": int(month),
"day_of_year_sin": float(doy_sin),
"day_of_year_cos": float(doy_cos),
"seasonal_flag": int(flag),
"forecast_q10_7d": float(f7[0]),
"forecast_q50_7d": float(f7[1]),
"forecast_q90_7d": float(f7[2]),
"forecast_q10_14d": float(f14[0]),
"forecast_q50_14d": float(f14[1]),
"forecast_q90_14d": float(f14[2]),
"forecast_q10_30d": float(f30[0]),
"forecast_q50_30d": float(f30[1]),
"forecast_q90_30d": float(f30[2]),
"rainfall_anomaly_90d": float(rain_anom),
"fx_30d_return_local": float(fx_ret),
"global_price_momentum": float(gpi_mom),
"commodity_hash": int(_commodity_hash(commodity)),
"region_flag": int(_region_flag(mandi)),
}
# Final sanity pass: replace any non-finite slip-through with 0.0.
for k, v in list(out.items()):
if isinstance(v, float) and not math.isfinite(v):
out[k] = 0.0
return out
__all__ = ["FEATURE_NAMES", "build_dp_features"]