Prod_Econ / app.py
jeffrey1963's picture
Update app.py
4f973dd verified
import os, re, json, copy
import gradio as gr
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from openai import OpenAI
# ==========================
# Jerry — Logistic Yield Coach (app52) in app53 style
# - LLM-first parser (OpenAI GPT-5.0), regex fallback
# - Schema JSON -> HUD merge -> requirements checks
# - HUD reordered: Py, Px, Other, L, k, x0, range, which_action, last_focus_x
# - Blackboard "show your work" at any X
# ==========================
# ---------- HUD FIELDS ----------
HUD_FIELDS = [
"Py", "Px", "Other",
"L", "k", "x0",
"x_min", "x_max", "x_step",
"which_action",
"last_focus_x"
]
DEFAULTS = {k: None for k in HUD_FIELDS}
DEFAULTS.update({"x_min": 0.0, "x_max": 150.0, "x_step": 5.0, "Other": 0.0})
# ---------- Requirements ----------
REQUIREMENTS = {
"table": ["L","k","x0","x_min","x_max","x_step"],
"plot": ["L","k","x0","x_min","x_max","x_step"],
"app": ["L","k","x0","x_min","x_max","x_step"],
"mpp": ["L","k","x0","x_min","x_max","x_step"],
"mvp": ["L","k","x0","x_min","x_max","x_step","Py"],
"mic": ["Px"],
"profit": ["L","k","x0","x_min","x_max","x_step","Py","Px"],
"optimal": ["L","k","x0","x_min","x_max","x_step","Py","Px"],
"stage": ["L","k","x0","x_min","x_max","x_step"],
"work": ["L","k","x0"]
}
# ---------- Utils ----------
def _pp(x):
try:
return json.dumps(x, indent=2, ensure_ascii=False, default=str)
except Exception:
return str(x)
def merge_into_hud(hud: dict, new_data: dict):
base = copy.deepcopy(hud) if hud else copy.deepcopy(DEFAULTS)
changed = []
for k in HUD_FIELDS:
if k in new_data and new_data[k] is not None:
if base.get(k) != new_data[k]:
base[k] = new_data[k]
changed.append(k)
return base, changed
def missing_for(action: str, hud: dict):
need = REQUIREMENTS.get(action or "", [])
miss = [k for k in need if hud.get(k) in (None, "", float("nan"))]
return miss
# ---------- Model ----------
import numpy as np
import matplotlib.pyplot as plt
def Y_logistic_zero(X, L, k, x0):
"""
Logistic-like yield function that forces Y(0) = 0.
L = maximum yield (asymptote)
k = slope parameter
x0 = inflection point (fertilizer where curve steepens)
"""
return L * (1 - np.exp(-k * X)) / (1 + np.exp(-k * (X - x0)))
# Example
X = np.linspace(0, 150, 200)
Y = Y_logistic_zero(X, L=1200, k=0.06, x0=60)
plt.plot(X, Y)
plt.xlabel("Fertilizer (lbs)")
plt.ylabel("Yield")
plt.title("Shifted logistic yield (Y(0)=0)")
plt.show()
def MPP_logistic(X, L, k, x0):
e = np.exp(-float(k) * (X - float(x0)))
return float(L) * float(k) * e / (1.0 + e)**2
def build_table(h):
L, k, x0 = float(h["L"]), float(h["k"]), float(h["x0"])
X = np.arange(float(h["x_min"]), float(h["x_max"]) + 1e-9, float(h["x_step"]))
Y = [Y_logistic_zero(x,L,k,x0) for x in X]
APP = [y/x if x!=0 else 0 for x,y in zip(X,Y)]
MPP = [MPP_logistic(x,L,k,x0) for x in X]
data = {"X": X, "Y": Y, "APP": APP, "MPP": MPP}
if h.get("Py") is not None: data["MVP"] = [m*float(h["Py"]) for m in MPP]
if h.get("Px") is not None: data["MIC"] = [float(h["Px"])]*len(X)
if h.get("Py") is not None and h.get("Px") is not None:
data["Profit"] = [y*float(h["Py"]) - x*float(h["Px"]) - float(h.get("Other") or 0.0) for x,y in zip(X,Y)]
df = pd.DataFrame(data)
stage = []
for _, row in df.iterrows():
if row["MPP"] > 0:
stage.append("I" if row["APP"] >= row["MPP"] else "II")
else:
stage.append("III")
df["Stage"] = stage
return df
def find_optimal(h):
df = build_table(h)
if "MVP" not in df.columns or "MIC" not in df.columns:
raise ValueError("Need Py (for MVP) and Px (for MIC).")
mask = (df["Stage"]=="II")
if not mask.any(): mask = df["MPP"]>0
sub = df[mask].copy()
sub["gap"] = (sub["MVP"] - sub["MIC"]).abs()
j = sub["gap"].idxmin()
return df.loc[j], df
def row_at_x(h, x):
L, k, x0 = float(h["L"]), float(h["k"]), float(h["x0"])
x = float(x)
Y = Y_logistic_zero(x,L,k,x0)
APP = Y/x if x!=0 else 0.0
MPP = MPP_logistic(x,L,k,x0)
out = {"X":x,"Y":Y,"APP":APP,"MPP":MPP}
if h.get("Py") is not None: out["MVP"] = MPP*float(h["Py"])
if h.get("Px") is not None: out["MIC"] = float(h["Px"])
if h.get("Py") is not None and h.get("Px") is not None:
out["Profit"] = Y*float(h["Py"]) - x*float(h["Px"]) - float(h.get("Other") or 0.0)
out["Stage"] = "I" if MPP>0 and APP>=MPP else "II" if MPP>0 else "III"
return pd.DataFrame([out])
# ---------- LLM-first parser ----------
client = OpenAI()
LLM_OK = bool(os.getenv("OPENAI_API_KEY"))
JERRY_SYSTEM_PROMPT = (
"You are JERRY, a production economics coach. "
"Read the student's text and output ONLY a minified JSON object with keys: "
"Py, Px, Other, L, k, x0, x_min, x_max, x_step, which_action, last_focus_x."
)
def llm_parse(user: str):
if not LLM_OK:
return {}
try:
resp = client.chat.completions.create(
model="gpt-5.0",
temperature=0,
messages=[
{"role": "system", "content": JERRY_SYSTEM_PROMPT},
{"role": "user", "content": user},
]
)
txt = (resp.choices[0].message.content or "").strip()
if txt.startswith("{"):
return json.loads(txt)
except Exception as e:
print("LLM parse failed:", e)
return {}
NUM = r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?"
def _floatish(s):
if s is None: return None
t = str(s).replace("$","").replace(",","").strip()
try: return float(t)
except: return None
def which_from_text(t):
t=t.lower()
if "optimal" in t: return "optimal"
if "plot" in t or "graph" in t: return "plot"
if "table" in t: return "table"
if "work" in t or "explain" in t: return "work"
if "mvp" in t: return "mvp"
if "mic" in t: return "mic"
if "profit" in t: return "profit"
if "mpp" in t: return "mpp"
if "app" in t: return "app"
if "stage" in t: return "stage"
return None
def regex_parse(user: str):
d={}
m=re.search(r"(?:Py|output price)\s*=?\s*("+NUM+")",user,re.I); d["Py"]=_floatish(m.group(1)) if m else None
m=re.search(r"(?:Px|MIC|input price)\s*=?\s*("+NUM+")",user,re.I); d["Px"]=_floatish(m.group(1)) if m else None
m=re.search(r"(?:Other|other cost)\s*=?\s*("+NUM+")",user,re.I); d["Other"]=_floatish(m.group(1)) if m else None
m=re.search(r"\bL\s*=\s*("+NUM+")",user,re.I); d["L"]=_floatish(m.group(1)) if m else None
m=re.search(r"\bk\s*=\s*("+NUM+")",user,re.I); d["k"]=_floatish(m.group(1)) if m else None
m=re.search(r"\bx0\s*=\s*("+NUM+")",user,re.I); d["x0"]=_floatish(m.group(1)) if m else None
m=re.search(r"(?:range|from)\s*("+NUM+")\s*(?:to|-)\s*("+NUM+")\s*(?:by|step)\s*("+NUM+")",user,re.I)
if m:
d["x_min"]=_floatish(m.group(1)); d["x_max"]=_floatish(m.group(2)); d["x_step"]=_floatish(m.group(3))
m=re.search(r"(?:work|explain)[^\d]*x\s*=?\s*("+NUM+")",user,re.I)
if m: d["last_focus_x"]=_floatish(m.group(1))
d["which_action"]=which_from_text(user)
return d
def parse(user: str):
return llm_parse(user) or regex_parse(user)
# ---------- Blackboard ----------
def explain_row(h,row):
lines=[
f"At X = {row['X']:.4g}:",
f" Y ≈ {row['Y']:.4g}",
f" APP ≈ {row['APP']:.4g}",
f" MPP ≈ {row['MPP']:.4g}"
]
if 'MVP' in row: lines.append(f" MVP ≈ {row['MVP']:.4g}")
if 'MIC' in row: lines.append(f" MIC ≈ {row['MIC']:.4g}")
if 'Profit' in row: lines.append(f" Profit ≈ {row['Profit']:.4g}")
lines.append(f" Stage = {row['Stage']}")
return "\n".join(lines)
# ---------- Orchestrator ----------
def _ask_with_hud(user_text, hud_state):
parsed = parse(user_text)
new_hud, changed = merge_into_hud(hud_state or DEFAULTS, parsed)
action = new_hud.get("which_action")
if not action:
return ("Jerry → Tell me what to do", _pp(new_hud), ", ".join(changed) or "(none)",
_pp({"parsed":parsed}), new_hud, "(none)", pd.DataFrame(), None)
miss = missing_for(action, new_hud)
if miss:
return (f"Jerry → Not enough data for {action}. Missing: {', '.join(miss)}",
_pp(new_hud), ", ".join(changed) or "(none)",
_pp({"parsed":parsed,"missing":miss}), new_hud,
"(none)", pd.DataFrame(), None)
reply, steps, table, plot = "", "(none)", pd.DataFrame(), None
df=None
try:
if action in ("table","app","mpp","mvp","mic","profit","stage","plot","optimal"):
df = build_table(new_hud); table=df
if action=="table":
reply="Built table"; steps=explain_row(new_hud,df.iloc[len(df)//2].to_dict())
elif action=="plot":
metric="Y"; t=user_text.lower()
if "mpp" in t: metric="MPP"
elif "app" in t: metric="APP"
elif "mvp" in t and "MVP" in df.columns: metric="MVP"
elif "profit" in t and "Profit" in df.columns: metric="Profit"
fig=plt.figure(); ax=fig.add_subplot(111)
ax.plot(df["X"],df[metric]); ax.set_xlabel("X"); ax.set_ylabel(metric)
plot=fig; reply=f"Plotted {metric}"; steps=explain_row(new_hud,df.iloc[len(df)//2].to_dict())
elif action in ("app","mpp","mvp","mic","profit","stage"):
reply="Computed metrics"; steps=explain_row(new_hud,df.iloc[len(df)//2].to_dict())
elif action=="optimal":
row_star,df=find_optimal(new_hud); table=df
reply=f"Optimal X≈{row_star['X']:.4g}, Y≈{row_star['Y']:.4g}"
if "Profit" in row_star: reply+=f", Profit≈{row_star['Profit']:.4g}"
steps=explain_row(new_hud,row_star.to_dict())
elif action=="work":
x=new_hud.get("last_focus_x")
if x is None: reply="Tell me show work at X=...";
else:
rowdf=row_at_x(new_hud,x); table=rowdf; reply=f"Work at X={x}"; steps=explain_row(new_hud,rowdf.iloc[0].to_dict())
except Exception as e:
reply=f"Jerry → {e}"
return reply,_pp(new_hud),", ".join(changed) or "(none)",_pp({"parsed":parsed}),new_hud,steps,table,plot
# ---------- UI ----------
with gr.Blocks() as demo:
hud_state=gr.State(copy.deepcopy(DEFAULTS))
q=gr.Textbox(label="Talk to Jerry",lines=4)
ask=gr.Button("Ask Jerry")
out=gr.Textbox(label="Jerry says",lines=6)
hud_box=gr.Textbox(label="HUD",lines=18,value=_pp(DEFAULTS))
changed_box=gr.Textbox(label="Changed fields")
teacher_box=gr.Textbox(label="Teacher debug",lines=8,value="(none)")
steps_box=gr.Textbox(label="Blackboard",lines=12)
table=gr.Dataframe(label="Table")
plot=gr.Plot(label="Plot")
ask.click(_ask_with_hud,[q,hud_state],
[out,hud_box,changed_box,teacher_box,hud_state,steps_box,table,plot])
if __name__=="__main__":
demo.launch()