| | """
|
| | Pricing Decision Core — Robust Optimization under Elasticity Uncertainty
|
| |
|
| | Purpose:
|
| | Select a price that maximizes risk-adjusted profit by propagating
|
| | uncertainty in demand elasticity into profit distributions and penalizing
|
| | downside fragility.
|
| |
|
| | Core Assumptions:
|
| | - Demand follows a power-law response to price: q = A * p^beta
|
| | - Elasticity uncertainty is captured via bootstrap resampling
|
| | - Decisions are evaluated using median profit and downside risk
|
| |
|
| | What this module DOES:
|
| | - Estimates elasticity
|
| | - Propagates uncertainty to profit
|
| | - Selects robust prices
|
| |
|
| | What this module DOES NOT do:
|
| | - Forecast demand over time
|
| | - Handle multiple SKUs
|
| | - Perform MLOps or deployment
|
| | """
|
| |
|
| |
|
| | import numpy as np
|
| | import pandas as pd
|
| | from typing import Tuple
|
| |
|
| |
|
| | def _qty_col(df: pd.DataFrame) -> str:
|
| | if "qty" in df.columns:
|
| | return "qty"
|
| | if "demand" in df.columns:
|
| | return "demand"
|
| | raise KeyError("Input df must contain 'qty' or 'demand' column.")
|
| |
|
| |
|
| | def estimate_loglog_elasticity(df: pd.DataFrame) -> Tuple[float, float]:
|
| | """
|
| | log(q) = a + b*log(p)
|
| | Returns: (a, b) where a=log(A), b=elasticity
|
| | """
|
| | data = df.copy()
|
| | q_col = _qty_col(data)
|
| |
|
| | data = data[(data["price"] > 0) & (data[q_col] > 0)].copy()
|
| | if len(data) < 3:
|
| | raise ValueError("Need at least 3 valid observations to fit elasticity.")
|
| |
|
| | X = np.log(data["price"].astype(float).values)
|
| | y = np.log(data[q_col].astype(float).values)
|
| |
|
| | X_mat = np.column_stack([np.ones(len(X)), X])
|
| |
|
| |
|
| | beta_hat, *_ = np.linalg.lstsq(X_mat, y, rcond=None)
|
| |
|
| | return float(beta_hat[0]), float(beta_hat[1])
|
| |
|
| |
|
| |
|
| | def profit_curve(
|
| | prices: np.ndarray,
|
| | intercept: float,
|
| | elasticity: float,
|
| | cost: float,
|
| | ) -> pd.DataFrame:
|
| | """
|
| | Compute demand and profit for a grid of prices.
|
| | """
|
| |
|
| | A = np.exp(intercept)
|
| |
|
| | demand = A * (prices ** elasticity)
|
| | profit = (prices - cost) * demand
|
| |
|
| | return pd.DataFrame(
|
| | {
|
| | "price": prices,
|
| | "demand": demand,
|
| | "profit": profit,
|
| | }
|
| | )
|
| |
|
| |
|
| |
|
| | def optimal_price(
|
| | curve: pd.DataFrame,
|
| | ) -> dict:
|
| | """
|
| | Select price that maximizes profit.
|
| | """
|
| | idx = curve["profit"].idxmax()
|
| | row = curve.loc[idx]
|
| |
|
| | return {
|
| | "price": float(row["price"]),
|
| | "profit": float(row["profit"]),
|
| | "demand": float(row["demand"]),
|
| | }
|
| |
|
| |
|
| | def bootstrap_optimal_price(
|
| | df: pd.DataFrame,
|
| | cost: float,
|
| | n_boot: int = 200,
|
| | n_grid: int = 200,
|
| | seed: int = 42,
|
| | ) -> pd.DataFrame:
|
| | """
|
| | Bootstrap uncertainty over elasticity by resampling rows (time periods) with replacement.
|
| | Returns a table of bootstrap draws with (intercept, elasticity, opt_price, opt_profit).
|
| | """
|
| | rng = np.random.default_rng(seed)
|
| |
|
| |
|
| | data = df[(df["price"] > 0) & (df[_qty_col(df)] > 0)].copy()
|
| | q_col = _qty_col(data)
|
| | data = data.rename(columns={q_col: "qty"})
|
| |
|
| | n = len(data)
|
| | if n < 10:
|
| | raise ValueError("Need at least 10 observations for bootstrap stability.")
|
| |
|
| |
|
| | p_min, p_max = float(data["price"].min()), float(data["price"].max())
|
| | price_grid = np.linspace(p_min, p_max, n_grid)
|
| |
|
| | rows = []
|
| | for _ in range(n_boot):
|
| |
|
| | idx = rng.integers(0, n, size=n)
|
| | sample = data.iloc[idx]
|
| |
|
| | a_hat, b_hat = estimate_loglog_elasticity(sample)
|
| |
|
| | curve = profit_curve(price_grid, a_hat, b_hat, cost)
|
| | dec = optimal_price(curve)
|
| |
|
| | rows.append(
|
| | {
|
| | "intercept": a_hat,
|
| | "elasticity": b_hat,
|
| | "opt_price": dec["price"],
|
| | "opt_profit": dec["profit"],
|
| | "opt_demand": dec["demand"],
|
| | }
|
| | )
|
| |
|
| | return pd.DataFrame(rows)
|
| |
|
| |
|
| | def decision_stability_summary(boot: pd.DataFrame) -> dict:
|
| | """
|
| | Summarize stability of the optimal price decision.
|
| | """
|
| | q10, q50, q90 = boot["opt_price"].quantile([0.1, 0.5, 0.9]).tolist()
|
| | spread = q90 - q10
|
| |
|
| | return {
|
| | "opt_price_median": float(q50),
|
| | "opt_price_q10": float(q10),
|
| | "opt_price_q90": float(q90),
|
| | "opt_price_spread_q90_q10": float(spread),
|
| | "elasticity_median": float(boot["elasticity"].median()),
|
| | "elasticity_q10": float(boot["elasticity"].quantile(0.1)),
|
| | "elasticity_q90": float(boot["elasticity"].quantile(0.9)),
|
| | }
|
| |
|
| |
|
| | def stability_flag(summary: dict, max_spread_frac: float = 0.15) -> dict:
|
| | """
|
| | Flag whether decision is stable: opt price spread <= max_spread_frac * median price.
|
| | """
|
| | denom = max(summary["opt_price_median"], 1e-9)
|
| | frac = summary["opt_price_spread_q90_q10"] / denom
|
| | return {
|
| | "stable": bool(frac <= max_spread_frac),
|
| | "spread_fraction_of_median": float(frac),
|
| | "threshold": float(max_spread_frac),
|
| | }
|
| |
|
| |
|
| | def robust_optimal_price(
|
| | boot_params: pd.DataFrame,
|
| | cost: float,
|
| | price_grid: np.ndarray,
|
| | risk_lambda: float = 0.5,
|
| | downside_quantile: float = 0.1,
|
| | ) -> dict:
|
| | """
|
| | Robust price selection: maximize median(profit) - lambda * downside_risk(profit)
|
| |
|
| | downside_risk(profit) = median(profit) - q_downside(profit)
|
| | where q_downside is e.g. 10th percentile across bootstrap draws.
|
| |
|
| | Inputs:
|
| | boot_params: DataFrame with columns ["intercept", "elasticity"] from bootstrap
|
| | cost: unit cost
|
| | price_grid: candidate prices to evaluate
|
| | risk_lambda: penalty weight (0 = median-only, higher = more conservative)
|
| | downside_quantile: e.g. 0.1 for q10
|
| |
|
| | Returns dict with chosen price and diagnostics.
|
| | """
|
| | if boot_params.empty:
|
| | raise ValueError("boot_params is empty.")
|
| |
|
| | A = np.exp(boot_params["intercept"].values)
|
| | beta = boot_params["elasticity"].values
|
| | prices = price_grid.astype(float)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | demand = A[:, None] * (prices[None, :] ** beta[:, None])
|
| | profit = (prices[None, :] - float(cost)) * demand
|
| |
|
| | med = np.median(profit, axis=0)
|
| | q_down = np.quantile(profit, downside_quantile, axis=0)
|
| | downside_risk = med - q_down
|
| |
|
| | score = med - risk_lambda * downside_risk
|
| |
|
| | best_idx = int(np.argmax(score))
|
| | p_star = float(prices[best_idx])
|
| |
|
| | return {
|
| | "price": p_star,
|
| | "score": float(score[best_idx]),
|
| | "median_profit": float(med[best_idx]),
|
| | "q_down_profit": float(q_down[best_idx]),
|
| | "downside_risk": float(downside_risk[best_idx]),
|
| | "risk_lambda": float(risk_lambda),
|
| | "downside_quantile": float(downside_quantile),
|
| | }
|
| |
|
| |
|
| | def profit_distribution_at_price(
|
| | boot_params: pd.DataFrame,
|
| | cost: float,
|
| | price: float,
|
| | q: float = 0.1,
|
| | ) -> dict:
|
| | A = np.exp(boot_params["intercept"].values)
|
| | beta = boot_params["elasticity"].values
|
| | p = float(price)
|
| |
|
| | profit = (p - float(cost)) * (A * (p ** beta))
|
| |
|
| | med = float(np.median(profit))
|
| | q_down = float(np.quantile(profit, q))
|
| | q_up = float(np.quantile(profit, 1 - q))
|
| |
|
| | return {
|
| | "price": p,
|
| | "median_profit": med,
|
| | "q_down_profit": q_down,
|
| | "q_up_profit": q_up,
|
| | "downside_risk": med - q_down,
|
| | "upside_spread": q_up - med,
|
| | }
|
| |
|
| |
|
| | def decision_justification_card(
|
| | robust_stats: dict,
|
| | naive_stats: dict,
|
| | decision_status: dict,
|
| | ) -> dict:
|
| | denom_profit = max(abs(float(naive_stats["median_profit"])), 1e-9)
|
| | denom_risk = max(abs(float(naive_stats["downside_risk"])), 1e-9)
|
| |
|
| | med_delta_pct = (robust_stats["median_profit"] - naive_stats["median_profit"]) / denom_profit * 100.0
|
| | downside_improvement_pct = (naive_stats["downside_risk"] - robust_stats["downside_risk"]) / denom_risk * 100.0
|
| |
|
| | rationale = (
|
| | "The selected price sacrifices negligible median profit to materially reduce downside risk "
|
| | "across plausible demand elasticities, producing a more stable and defensible pricing decision under uncertainty."
|
| | if decision_status["status"] == "ROBUST"
|
| | else
|
| | "The price decision shows excessive downside variability relative to expected payoff and should not be deployed "
|
| | "without further constraints or additional data."
|
| | )
|
| |
|
| | return {
|
| | "recommended_price": round(float(robust_stats["price"]), 2),
|
| | "naive_price": round(float(naive_stats["price"]), 2),
|
| | "median_profit_delta_pct": round(float(med_delta_pct), 2),
|
| | "downside_risk_improvement_pct": round(float(downside_improvement_pct), 2),
|
| | "decision_status": decision_status["status"],
|
| | "rationale": rationale,
|
| | }
|
| |
|
| |
|
| |
|
| |
|
| | def decision_status(
|
| | stats: dict,
|
| | max_downside_frac: float = 0.05,
|
| | ) -> dict:
|
| | frac = stats["downside_risk"] / max(stats["median_profit"], 1e-9)
|
| |
|
| | return {
|
| | "status": "ROBUST" if frac <= max_downside_frac else "FRAGILE",
|
| | "downside_fraction": round(frac, 3),
|
| | "threshold": max_downside_frac,
|
| | }
|
| |
|
| |
|
| | def sensitivity_table_at_price(
|
| | boot_params: pd.DataFrame,
|
| | base_cost: float,
|
| | price: float,
|
| | q: float = 0.1,
|
| | elasticity_scales: Tuple[float, ...] = (0.9, 1.0, 1.1),
|
| | cost_scales: Tuple[float, ...] = (0.9, 1.0, 1.1),
|
| | ) -> pd.DataFrame:
|
| | """
|
| | Returns median profit (and downside) at a fixed price under perturbations:
|
| | - scale elasticity draws by factors
|
| | - scale cost by factors
|
| | """
|
| | A = np.exp(boot_params["intercept"].values)
|
| | beta0 = boot_params["elasticity"].values
|
| | p = float(price)
|
| |
|
| | rows = []
|
| | for e_scale in elasticity_scales:
|
| | beta = beta0 * float(e_scale)
|
| | demand = A * (p ** beta)
|
| |
|
| | for c_scale in cost_scales:
|
| | c = float(base_cost) * float(c_scale)
|
| | profit = (p - c) * demand
|
| | med = float(np.median(profit))
|
| | q_down = float(np.quantile(profit, q))
|
| | rows.append(
|
| | {
|
| | "elasticity_scale": float(e_scale),
|
| | "cost_scale": float(c_scale),
|
| | "median_profit": med,
|
| | f"q{int(q*100)}_profit": q_down,
|
| | "downside_risk": med - q_down,
|
| | }
|
| | )
|
| | return pd.DataFrame(rows)
|
| |
|