Spaces:
Sleeping
Sleeping
| import os, re, json, copy | |
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| from openai import OpenAI | |
| # ========================== | |
| # Jerry — Logistic Yield Coach (app52) in app53 style | |
| # - LLM-first parser (OpenAI GPT-5.0), regex fallback | |
| # - Schema JSON -> HUD merge -> requirements checks | |
| # - HUD reordered: Py, Px, Other, L, k, x0, range, which_action, last_focus_x | |
| # - Blackboard "show your work" at any X | |
| # ========================== | |
| # ---------- HUD FIELDS ---------- | |
# Canonical HUD keys. The order here is the order used when the HUD is
# merged and displayed: Py, Px, Other, the curve parameters, the X grid,
# then the requested action and the X the user last focused on.
HUD_FIELDS = [
    "Py",
    "Px",
    "Other",
    "L",
    "k",
    "x0",
    "x_min",
    "x_max",
    "x_step",
    "which_action",
    "last_focus_x",
]

# Every field starts unset (None) except the X grid and "Other", which
# have numeric starting values.
DEFAULTS = {
    **dict.fromkeys(HUD_FIELDS),
    "x_min": 0.0,
    "x_max": 150.0,
    "x_step": 5.0,
    "Other": 0.0,
}
| # ---------- Requirements ---------- | |
# HUD fields needed to evaluate the yield curve over the X grid.
_CURVE_INPUTS = ("L", "k", "x0", "x_min", "x_max", "x_step")

# For each action name: the HUD fields that must be filled in before the
# action can run. Each entry gets its own list so none are shared.
REQUIREMENTS = {
    "table": list(_CURVE_INPUTS),
    "plot": list(_CURVE_INPUTS),
    "app": list(_CURVE_INPUTS),
    "mpp": list(_CURVE_INPUTS),
    "mvp": [*_CURVE_INPUTS, "Py"],
    "mic": ["Px"],
    "profit": [*_CURVE_INPUTS, "Py", "Px"],
    "optimal": [*_CURVE_INPUTS, "Py", "Px"],
    "stage": list(_CURVE_INPUTS),
    "work": ["L", "k", "x0"],
}
| # ---------- Utils ---------- | |
| def _pp(x): | |
| try: | |
| return json.dumps(x, indent=2, ensure_ascii=False, default=str) | |
| except Exception: | |
| return str(x) | |
def merge_into_hud(hud: dict, new_data: dict):
    """Overlay freshly parsed values onto a copy of the HUD.

    Args:
        hud: Current HUD; when empty or None, DEFAULTS is used as the base.
        new_data: Parsed values. Only keys listed in HUD_FIELDS are
            considered, and None values are ignored.

    Returns:
        ``(merged, changed)``: a deep copy of the base with the new values
        applied, and the list of field names whose value actually changed
        (in HUD_FIELDS order). The inputs are not modified.
    """
    merged = copy.deepcopy(hud if hud else DEFAULTS)
    updated = []
    for field in HUD_FIELDS:
        if field not in new_data:
            continue
        incoming = new_data[field]
        if incoming is not None and merged.get(field) != incoming:
            merged[field] = incoming
            updated.append(field)
    return merged, updated
def missing_for(action: str, hud: dict):
    """List the HUD fields an action still needs.

    Args:
        action: Action name looked up in REQUIREMENTS; unknown or empty
            actions have no requirements.
        hud: Current HUD mapping.

    Returns:
        The required field names whose HUD value is None, an empty
        string, or a float NaN, in REQUIREMENTS order.
    """
    import math  # local import: math is not in the file's import block

    def _is_blank(value):
        if value is None or value == "":
            return True
        # NaN never compares equal to anything (itself included), so a
        # membership test against float("nan") cannot detect it.
        return isinstance(value, float) and math.isnan(value)

    required = REQUIREMENTS.get(action or "", [])
    return [field for field in required if _is_blank(hud.get(field))]
| # ---------- Model ---------- | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
def Y_logistic_zero(X, L, k, x0):
    """Evaluate the zero-anchored logistic yield curve at X.

    Computes ``L * (1 - exp(-k*X)) / (1 + exp(-k*(X - x0)))``. The
    ``1 - exp(-k*X)`` factor is exactly zero at X = 0, which is what
    forces Y(0) = 0.

    Args:
        X: Input level (fertilizer); a scalar or a NumPy array.
        L: Maximum yield (the asymptote).
        k: Slope parameter.
        x0: Inflection-point parameter (where the curve steepens).

    Returns:
        The yield at X, with the same shape as X.
    """
    anchor = 1 - np.exp(-k * X)
    logistic_denominator = 1 + np.exp(-k * (X - x0))
    return L * anchor / logistic_denominator
# Example figure: the curve for L=1200, k=0.06, x0=60 sampled at 200
# evenly spaced points on [0, 150].
# NOTE(review): this runs (including plt.show()) every time the module is
# imported, before the UI is built — confirm it is still wanted.
X = np.linspace(0, 150, num=200)
Y = Y_logistic_zero(X, L=1200, k=0.06, x0=60)
plt.plot(X, Y)
plt.title("Shifted logistic yield (Y(0)=0)")
plt.xlabel("Fertilizer (lbs)")
plt.ylabel("Yield")
plt.show()
def MPP_logistic(X, L, k, x0):
    """Marginal physical product dY/dX of the zero-anchored yield curve.

    For ``Y(X) = L * (1 - exp(-k*X)) / (1 + exp(-k*(X - x0)))`` the
    quotient rule simplifies to::

        dY/dX = L * k * (exp(-k*X) + exp(-k*(X - x0)))
                / (1 + exp(-k*(X - x0)))**2

    The ``exp(-k*X)`` term comes from the ``(1 - exp(-k*X))`` factor that
    anchors Y(0) = 0; without it the result would be the slope of a plain
    logistic rather than of the yield curve that is actually tabulated.

    Args:
        X: Input level; a scalar or a NumPy array.
        L: Maximum yield (the asymptote).
        k: Slope parameter.
        x0: Inflection-point parameter.

    Returns:
        The slope of the yield curve at X, with the same shape as X.
    """
    L, k, x0 = float(L), float(k), float(x0)
    anchor_decay = np.exp(-k * X)
    e = np.exp(-k * (X - x0))
    return L * k * (anchor_decay + e) / (1.0 + e) ** 2
def build_table(h):
    """Tabulate yield and the derived measures over the HUD's X grid.

    Args:
        h: HUD mapping. Reads L, k, x0, x_min, x_max, x_step, and
            optionally Py, Px and Other.

    Returns:
        A DataFrame with columns X, Y, APP, MPP, then MVP (when Py is
        set), MIC (when Px is set), Profit (when both are set), and
        finally Stage ("I", "II" or "III").

    Raises:
        ValueError: if x_step is not a positive number or x_max < x_min.
    """
    L, k, x0 = float(h["L"]), float(h["k"]), float(h["x0"])
    x_min, x_max, x_step = (float(h[key]) for key in ("x_min", "x_max", "x_step"))
    # "not x_step > 0" also rejects NaN.
    if not x_step > 0:
        raise ValueError("x_step must be a positive number.")
    if x_max < x_min:
        raise ValueError("x_max must not be smaller than x_min.")

    # The small epsilon keeps x_max itself on the grid despite float rounding.
    X = np.arange(x_min, x_max + 1e-9, x_step)
    Y = np.asarray(Y_logistic_zero(X, L, k, x0), dtype=float)
    MPP = np.asarray(MPP_logistic(X, L, k, x0), dtype=float)
    # APP = Y / X is undefined at X = 0; report 0 there instead of dividing.
    APP = np.divide(Y, X, out=np.zeros_like(Y), where=X != 0)

    data = {"X": X, "Y": Y, "APP": APP, "MPP": MPP}
    py, px = h.get("Py"), h.get("Px")
    if py is not None:
        data["MVP"] = MPP * float(py)
    if px is not None:
        data["MIC"] = np.full(X.shape, float(px))
    if py is not None and px is not None:
        other = float(h.get("Other") or 0.0)
        data["Profit"] = Y * float(py) - X * float(px) - other
    df = pd.DataFrame(data)

    # Stage I: MPP above APP (APP still rising).
    # Stage II: MPP positive but no longer above APP.
    # Stage III: MPP not positive.
    stage = np.where(MPP > 0, np.where(MPP > APP, "I", "II"), "III")
    df["Stage"] = stage.tolist()
    return df
def find_optimal(h):
    """Pick the grid row where MVP is closest to MIC.

    Candidates are the Stage "II" rows of the table; when there are none,
    every row with positive MPP is considered instead.

    Args:
        h: HUD mapping; must carry Py and Px in addition to the curve
            and grid fields.

    Returns:
        ``(row, df)``: the chosen row (a Series) and the full table.

    Raises:
        ValueError: if Py or Px is missing, or if no candidate row has a
            usable MVP - MIC gap.
    """
    df = build_table(h)
    if "MVP" not in df.columns or "MIC" not in df.columns:
        raise ValueError("Need Py (for MVP) and Px (for MIC).")

    candidates = df["Stage"] == "II"
    if not candidates.any():
        candidates = df["MPP"] > 0

    gap = (df.loc[candidates, "MVP"] - df.loc[candidates, "MIC"]).abs().dropna()
    if gap.empty:
        # Without this guard idxmin() fails with an opaque pandas error.
        raise ValueError(
            "No usable grid point: MPP is not positive anywhere on the X range, "
            "or MVP/MIC could not be computed."
        )
    return df.loc[gap.idxmin()], df
def row_at_x(h, x):
    """Compute the table measures at a single input level.

    Args:
        h: HUD mapping. Reads L, k, x0 and optionally Py, Px and Other.
        x: Input level to evaluate (anything ``float()`` accepts).

    Returns:
        A one-row DataFrame with columns X, Y, APP, MPP, then MVP (when
        Py is set), MIC (when Px is set), Profit (when both are set),
        and Stage.
    """
    L, k, x0 = float(h["L"]), float(h["k"]), float(h["x0"])
    x = float(x)
    Y = Y_logistic_zero(x, L, k, x0)
    # APP = Y / X is undefined at X = 0; report 0 there instead of dividing.
    APP = Y / x if x != 0 else 0.0
    MPP = MPP_logistic(x, L, k, x0)

    out = {"X": x, "Y": Y, "APP": APP, "MPP": MPP}
    py, px = h.get("Py"), h.get("Px")
    if py is not None:
        out["MVP"] = MPP * float(py)
    if px is not None:
        out["MIC"] = float(px)
    if py is not None and px is not None:
        out["Profit"] = Y * float(py) - x * float(px) - float(h.get("Other") or 0.0)

    # Stage I: MPP above APP (APP still rising).
    # Stage II: MPP positive but no longer above APP.
    # Stage III: MPP not positive.
    if MPP > 0:
        out["Stage"] = "I" if MPP > APP else "II"
    else:
        out["Stage"] = "III"
    return pd.DataFrame([out])
| # ---------- LLM-first parser ---------- | |
# The LLM parser is only usable when an API key is configured.
LLM_OK = bool(os.getenv("OPENAI_API_KEY"))

# Build the client only when a key is present: OpenAI() raises at
# construction time without one, which would crash the app on import and
# make the regex fallback unreachable.
client = OpenAI() if LLM_OK else None

# System prompt: asks the model for a single JSON object carrying the
# HUD field names.
JERRY_SYSTEM_PROMPT = (
    "You are JERRY, a production economics coach. "
    "Read the student's text and output ONLY a minified JSON object with keys: "
    "Py, Px, Other, L, k, x0, x_min, x_max, x_step, which_action, last_focus_x."
)
def llm_parse(user: str):
    """Ask the LLM to turn the student's text into a HUD-shaped dict.

    Args:
        user: Raw text typed by the student.

    Returns:
        The JSON object decoded from the model's reply, or ``{}`` when
        the LLM is not configured (LLM_OK is false), the call fails, or
        the reply contains no JSON object. Failures are printed, never
        raised.
    """
    if not LLM_OK:
        return {}
    try:
        # NOTE(review): confirm "gpt-5.0" is a valid model id for this
        # account and that it accepts temperature=0.
        resp = client.chat.completions.create(
            model="gpt-5.0",
            temperature=0,
            messages=[
                {"role": "system", "content": JERRY_SYSTEM_PROMPT},
                {"role": "user", "content": user},
            ],
        )
        txt = (resp.choices[0].message.content or "").strip()
        # The reply may be wrapped in a Markdown fence or a sentence;
        # keep only the outermost {...} span before decoding.
        start, end = txt.find("{"), txt.rfind("}")
        if start != -1 and end > start:
            decoded = json.loads(txt[start:end + 1])
            if isinstance(decoded, dict):
                return decoded
    except Exception as e:
        # Best effort: any failure here means "fall back to regex parsing".
        print("LLM parse failed:", e)
    return {}
| NUM = r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?" | |
| def _floatish(s): | |
| if s is None: return None | |
| t = str(s).replace("$","").replace(",","").strip() | |
| try: return float(t) | |
| except: return None | |
# (action, pattern) pairs in priority order; the first match wins.
# Patterns are anchored on word boundaries so that an action keyword is
# not found inside an unrelated word ("economic" -> mic, "happens" ->
# app, "homework" -> work). Acronyms must match as whole words (an
# optional plural "s" is allowed); longer keywords match as word prefixes
# so forms like "plotting" or "tables" still count.
_ACTION_PATTERNS = (
    ("optimal", r"\boptimal"),
    ("plot", r"\b(?:plot|graph)"),
    ("table", r"\btable"),
    ("work", r"\b(?:work|explain)"),
    ("mvp", r"\bmvps?\b"),
    ("mic", r"\bmics?\b"),
    ("profit", r"\bprofit"),
    ("mpp", r"\bmpps?\b"),
    ("app", r"\bapps?\b"),
    ("stage", r"\bstage"),
)


def which_from_text(t):
    """Guess the requested action from free text.

    Args:
        t: The student's text; None is treated as empty.

    Returns:
        The first action whose keyword appears in the text
        (case-insensitive), or None when no keyword is found.
    """
    text = (t or "").lower()
    for action, pattern in _ACTION_PATTERNS:
        if re.search(pattern, text):
            return action
    return None
def regex_parse(user: str):
    """Extract HUD fields from free text with regular expressions.

    Args:
        user: Raw text typed by the student; None is treated as empty.

    Returns:
        A dict that always has the keys Py, Px, Other, L, k, x0 (None
        when not found) and which_action; x_min/x_max/x_step are present
        only when a "range A to B by C" phrase matched, and last_focus_x
        only when a "work/explain ... x = N" phrase matched.
    """
    text = user or ""
    # Between a price label and its value: optional "=" or ":", then an
    # optional "$" (so "Py = $4.50" and "Px: 30" are both understood).
    sep = r"\s*[=:]?\s*\$?\s*"

    def first_number(prefix):
        """Return the number following the first match of *prefix*, or None."""
        m = re.search(prefix + "(" + NUM + ")", text, re.I)
        return _floatish(m.group(1)) if m else None

    # \b keeps labels from matching inside other words ("copy 5" is not Py).
    d = {
        "Py": first_number(r"\b(?:Py|output price)" + sep),
        "Px": first_number(r"\b(?:Px|MIC|input price)" + sep),
        "Other": first_number(r"\b(?:Other|other cost)" + sep),
        "L": first_number(r"\bL\s*=\s*"),
        "k": first_number(r"\bk\s*=\s*"),
        "x0": first_number(r"\bx0\s*=\s*"),
    }

    m = re.search(
        r"(?:range|from)\s*(" + NUM + r")\s*(?:to|-)\s*(" + NUM
        + r")\s*(?:by|step)\s*(" + NUM + ")",
        text,
        re.I,
    )
    if m:
        d["x_min"], d["x_max"], d["x_step"] = (_floatish(g) for g in m.groups())

    # \bx\b requires a standalone "x", so "explain x0=60" is not misread
    # as a request to show work at X = 0.
    m = re.search(r"(?:work|explain)[^\d]*?\bx\b\s*=?\s*(" + NUM + ")", text, re.I)
    if m:
        d["last_focus_x"] = _floatish(m.group(1))

    d["which_action"] = which_from_text(text)
    return d
def parse(user: str):
    """Parse the student's text, LLM first with a regex fallback.

    The LLM result is used only when it carries at least one non-null
    value. A reply such as ``{"Py": null, ...}`` is a non-empty dict but
    holds no information, so the regex parser is used in that case too.

    Args:
        user: Raw text typed by the student.

    Returns:
        A dict of parsed HUD fields.
    """
    llm_result = llm_parse(user)
    if isinstance(llm_result, dict) and any(
        value is not None for value in llm_result.values()
    ):
        return llm_result
    return regex_parse(user)
| # ---------- Blackboard ---------- | |
def explain_row(h, row):
    """Format one table row as the multi-line "blackboard" text.

    Args:
        h: HUD mapping (not used by the formatting).
        row: Mapping with X, Y, APP, MPP and Stage, and optionally MVP,
            MIC and Profit.

    Returns:
        The lines joined by newlines: the X header, then Y, APP and MPP,
        then whichever of MVP, MIC and Profit are present, then Stage.
        Numbers are shown with 4 significant digits.
    """
    header = f"At X = {row['X']:.4g}:"
    core = [
        f" Y ≈ {row['Y']:.4g}",
        f" APP ≈ {row['APP']:.4g}",
        f" MPP ≈ {row['MPP']:.4g}",
    ]

    # Price-dependent measures appear only when the row carries them.
    extras = []
    if 'MVP' in row:
        extras.append(f" MVP ≈ {row['MVP']:.4g}")
    if 'MIC' in row:
        extras.append(f" MIC ≈ {row['MIC']:.4g}")
    if 'Profit' in row:
        extras.append(f" Profit ≈ {row['Profit']:.4g}")

    footer = f" Stage = {row['Stage']}"
    return "\n".join([header, *core, *extras, footer])
| # ---------- Orchestrator ---------- | |
def _ask_with_hud(user_text, hud_state):
    """Handle one "Ask Jerry" click.

    Parses the text, merges the parsed values into the HUD, checks that
    the requested action has the data it needs, and runs it.

    Args:
        user_text: Text from the input box; None is treated as empty.
        hud_state: HUD carried over from the previous click (or empty).

    Returns:
        An 8-tuple in the order the click handler's outputs expect:
        (reply text, HUD as JSON, changed-field list, teacher-debug JSON,
        new HUD dict, blackboard text, table DataFrame, plot figure or
        None). Errors raised while running the action are reported in
        the reply text instead of propagating.
    """
    # Figures built directly from Figure are not kept in pyplot's global
    # registry, so they do not accumulate across requests the way
    # plt.figure() results do when they are never closed.
    from matplotlib.figure import Figure

    user_text = user_text or ""
    parsed = parse(user_text)
    new_hud, changed = merge_into_hud(hud_state or DEFAULTS, parsed)
    changed_txt = ", ".join(changed) or "(none)"

    def respond(reply, debug, steps="(none)", table=None, plot=None):
        """Assemble the 8 outputs in the order the UI expects."""
        shown = pd.DataFrame() if table is None else table
        return (reply, _pp(new_hud), changed_txt, _pp(debug), new_hud, steps, shown, plot)

    def midpoint_steps(df):
        """Blackboard text for the row in the middle of the table."""
        return explain_row(new_hud, df.iloc[len(df) // 2].to_dict())

    raw_action = new_hud.get("which_action")
    # Normalize so an LLM answer such as "Plot" still selects a branch.
    action = str(raw_action).strip().lower() if raw_action is not None else ""
    if not action:
        return respond("Jerry → Tell me what to do", {"parsed": parsed})

    miss = missing_for(action, new_hud)
    if miss:
        return respond(
            f"Jerry → Not enough data for {action}. Missing: {', '.join(miss)}",
            {"parsed": parsed, "missing": miss},
        )

    reply, steps, table, plot = "", "(none)", pd.DataFrame(), None
    try:
        if action == "mic" and missing_for("table", new_hud):
            # "mic" only requires Px; without the curve inputs the full
            # table cannot be built, so answer with MIC alone.
            px = float(new_hud["Px"])
            table = pd.DataFrame([{"MIC": px}])
            reply = f"MIC = Px = {px:.4g}"
        elif action == "table":
            table = build_table(new_hud)
            reply = "Built table"
            steps = midpoint_steps(table)
        elif action == "plot":
            table = build_table(new_hud)
            t = user_text.lower()
            metric = "Y"
            # Whole-word matching: "happens" must not select APP.
            for column, pattern in (
                ("MPP", r"\bmpps?\b"),
                ("APP", r"\bapps?\b"),
                ("MVP", r"\bmvps?\b"),
                ("Profit", r"\bprofit"),
            ):
                if column in table.columns and re.search(pattern, t):
                    metric = column
                    break
            fig = Figure()
            ax = fig.add_subplot(111)
            ax.plot(table["X"], table[metric])
            ax.set_xlabel("X")
            ax.set_ylabel(metric)
            plot = fig
            reply = f"Plotted {metric}"
            steps = midpoint_steps(table)
        elif action in ("app", "mpp", "mvp", "mic", "profit", "stage"):
            table = build_table(new_hud)
            reply = "Computed metrics"
            steps = midpoint_steps(table)
        elif action == "optimal":
            row_star, table = find_optimal(new_hud)
            reply = f"Optimal X≈{row_star['X']:.4g}, Y≈{row_star['Y']:.4g}"
            if "Profit" in row_star:
                reply += f", Profit≈{row_star['Profit']:.4g}"
            steps = explain_row(new_hud, row_star.to_dict())
        elif action == "work":
            x = new_hud.get("last_focus_x")
            if x is None:
                reply = "Tell me show work at X=..."
            else:
                table = row_at_x(new_hud, x)
                reply = f"Work at X={x}"
                steps = explain_row(new_hud, table.iloc[0].to_dict())
        else:
            # Previously an unrecognized action produced an empty reply.
            reply = f"Jerry → I don't know how to do '{action}' yet."
    except Exception as e:
        # Top-level boundary for the UI: show the problem instead of crashing.
        reply = f"Jerry → {e}"

    return respond(reply, {"parsed": parsed}, steps, table, plot)
| # ---------- UI ---------- | |
with gr.Blocks() as demo:
    # HUD carried from one click to the next; starts as a private copy
    # of the defaults.
    hud_state = gr.State(copy.deepcopy(DEFAULTS))

    # Input widgets.
    q = gr.Textbox(label="Talk to Jerry", lines=4)
    ask = gr.Button("Ask Jerry")

    # Output widgets, created in the order they appear on the page.
    out = gr.Textbox(label="Jerry says", lines=6)
    hud_box = gr.Textbox(label="HUD", lines=18, value=_pp(DEFAULTS))
    changed_box = gr.Textbox(label="Changed fields")
    teacher_box = gr.Textbox(label="Teacher debug", lines=8, value="(none)")
    steps_box = gr.Textbox(label="Blackboard", lines=12)
    table = gr.Dataframe(label="Table")
    plot = gr.Plot(label="Plot")

    # The outputs list must stay in the same order as the tuple returned
    # by _ask_with_hud.
    ask.click(
        fn=_ask_with_hud,
        inputs=[q, hud_state],
        outputs=[
            out,
            hud_box,
            changed_box,
            teacher_box,
            hud_state,
            steps_box,
            table,
            plot,
        ],
    )

if __name__ == "__main__":
    demo.launch()