| """ |
| FastAPI server bridging the LabEnv Python backend to the Next.js frontend. |
| |
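| Endpoints: |
| POST /api/training/start — train the agent (SSE stream) |
| POST /api/run/ai — run one AI-agent episode |
| POST /api/run/naive — run one naive-agent episode |
| POST /api/run/research — run one Research LLM agent episode (PCR workflow only) |
| POST /api/run/research-generate — run one Research & Generate agent episode |
| POST /api/env/reset — reset environment |
| POST /api/env/step — take one step |
| GET /api/stats — dashboard aggregate stats |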
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Any |
|
|
| from fastapi import FastAPI, Request |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import StreamingResponse |
| from pydantic import BaseModel |
|
|
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) |
|
|
| from lab_env.env import ( |
| LabEnv, |
| INITIAL_BUDGET, |
| ACTION_SETUP_START, |
| ACTION_RUN_ASSAY, |
| ACTION_ORDER_TIPS, |
| ACTION_ORDER_BUFFER, |
| ACTION_ORDER_POLYMERASE, |
| ACTION_WAIT, |
| ACTION_FINISH, |
| ) |
| from lab_env.spec import pcr_experiment_spec, get_spec_for_workflow |
| from agents.naive_agent import NaiveAgent |
| from agents.rl_agent import ReinforceAgent |
|
|
| |
# Cache of environments keyed by workflow id; filled lazily by _get_env().
_envs: dict[str, LabEnv] = {}


# Optional agent: if the import fails the name is bound to None and the flag
# lets callers check availability before trying to instantiate it.
try:
    from agents.research_llm_agent import ResearchLLMAgent
    HAS_RESEARCH_AGENT = True
except ImportError:
    ResearchLLMAgent = None
    HAS_RESEARCH_AGENT = False


# Same optional-import pattern for the Research & Generate agent.
try:
    from agents.research_generate_agent import ResearchGenerateAgent
    HAS_RESEARCH_GENERATE_AGENT = True
except ImportError:
    ResearchGenerateAgent = None
    HAS_RESEARCH_GENERATE_AGENT = False


app = FastAPI(title="SimLab API")
# CORS is fully open: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


# Module-level agent slot, assigned by the training and run/ai endpoints.
rl_agent: ReinforceAgent | None = None
# Agents keyed by workflow id.
_trained_agents: dict[str, ReinforceAgent] = {}
# Run records aggregated by GET /api/stats (it reads "best_result" and "cost").
run_history: list[dict] = []
|
|
|
|
def _get_env(workflow_id: str) -> LabEnv:
    """Return the cached LabEnv for *workflow_id*, building it on first use.

    On a cache miss the environment is constructed from
    ``get_spec_for_workflow(workflow_id)`` and stored in the module-level
    ``_envs`` mapping so later calls reuse the same instance.
    """
    try:
        return _envs[workflow_id]
    except KeyError:
        created = LabEnv(spec=get_spec_for_workflow(workflow_id))
        _envs[workflow_id] = created
        return created
|
|
|
|
| |
| |
| |
|
|
# Request body for POST /api/training/start.
class TrainRequest(BaseModel):
    # Number of training episodes to run.
    episodes: int = 2000
    # Passed to ReinforceAgent as ``lr``.
    lr: float = 3e-3
    # Passed to ReinforceAgent as ``max_trials``.
    max_trials: int = 4
    # Number of evaluation episodes run for each agent after training.
    eval_episodes: int = 100
    # Workflow whose spec is looked up via get_spec_for_workflow().
    workflow_id: str = "pcr-amplification"
|
|
# Request body for POST /api/env/step.
class StepRequest(BaseModel):
    # Action id forwarded unchanged to ``env.step``.
    action: int
    # Selects which cached environment receives the step.
    workflow_id: str = "pcr-amplification"
|
|
# Request body shared by the /api/run/* endpoints and POST /api/env/reset.
class RunRequest(BaseModel):
    # Seed passed to ``env.reset`` (and to NaiveAgent for the naive run).
    seed: int = 42
    # Selects which cached environment is used.
    workflow_id: str = "pcr-amplification"
|
|
|
|
| |
| |
| |
|
|
def _env_state_dict(env: LabEnv) -> dict[str, Any]:
    """Return a JSON-friendly snapshot of the environment's current state.

    The per-step values come from ``env._info()``. The ``max_time`` and
    ``max_budget`` limits are read from the environment's spec
    (``max_minutes`` / ``initial_budget``), the same way
    ``_build_run_result`` reports them, and fall back to 240 / 500 when the
    spec (or the attribute) is absent.
    """
    info = env._info()
    spec = getattr(env, "spec", None)
    return {
        "step_index": info["step_index"],
        "elapsed_minutes": info["elapsed_minutes"],
        "remaining_budget": info["remaining_budget"],
        "inventory": info["inventory"],
        "last_result": info["last_result"],
        "best_result": info["best_result"],
        # getattr on a missing spec (None) simply yields the defaults.
        "max_time": getattr(spec, "max_minutes", 240),
        "max_budget": getattr(spec, "initial_budget", 500),
    }
|
|
|
|
def _trace_episode(env: LabEnv, agent: ReinforceAgent, seed: int) -> dict:
    """Play one episode with the RL agent and return it as a UI timeline.

    Each trial optionally re-orders reagents (when the agent reports low
    inventory), sets up the preset chosen by the agent, and runs the assay.
    The episode ends early when the environment reports done/truncated or
    when the best result so far is a success; otherwise a final FINISH step
    is taken after ``agent.max_trials`` trials. The return value is whatever
    ``_build_run_result`` produces for the final state.
    """
    presets = env.spec.presets
    obs, info = env.reset(seed=seed)
    agent.reset()
    timeline: list[dict] = []
    presets_tried: dict[int, str] = {}

    def record(title, description, status, icon):
        # Reads the enclosing ``info``, so call it right after the step
        # whose elapsed time should be stamped on the entry.
        timeline.append({
            "title": title,
            "description": description,
            "time": f"{info['elapsed_minutes']:.0f} min",
            "status": status,
            "icon": icon,
        })

    def finished():
        return _build_run_result(env, info, timeline, presets_tried)

    reorder_actions = (ACTION_ORDER_TIPS, ACTION_ORDER_BUFFER, ACTION_ORDER_POLYMERASE)

    for _trial in range(agent.max_trials):
        if agent._inventory_low(obs):
            for order_action in reorder_actions:
                obs, _reward, terminated, truncated, info = env.step(order_action)
                record("Order Reagents", _order_label(order_action), "action", "order")
                if terminated or truncated:
                    return finished()

        chosen = agent._select_preset(obs, deterministic=True)
        setup_text = _preset_label(presets[chosen])

        obs, _reward, terminated, truncated, info = env.step(ACTION_SETUP_START + chosen)
        record("Setup", setup_text, "pending", "setup")
        if terminated or truncated:
            return finished()

        obs, _reward, terminated, truncated, info = env.step(ACTION_RUN_ASSAY)
        outcome = info["last_result"]
        presets_tried[chosen] = outcome
        record("Run Assay", _result_description(outcome), outcome, "run")
        if terminated or truncated:
            return finished()

        if info.get("best_result") == "success":
            obs, _reward, _, _, info = env.step(ACTION_FINISH)
            record("Finish", "Experiment complete — success!", "success", "finish")
            return finished()

    # Trials exhausted without a success: close the episode explicitly.
    obs, _reward, _, _, info = env.step(ACTION_FINISH)
    best = info["best_result"]
    record(
        "Finish",
        f"Experiment complete — best: {best}",
        best if best in ("success", "partial") else "fail",
        "finish",
    )
    return finished()
|
|
|
|
def _trace_naive_episode(env: LabEnv, agent: NaiveAgent, seed: int) -> dict:
    """Play one episode with the naive agent and return it as a UI timeline.

    The agent picks every action via ``agent.select_action``; each setup,
    assay, reagent-order and finish action is appended to the timeline.
    The loop stops when the environment reports done or truncated.

    The outcome of each assay is recorded against the most recently
    configured preset so the per-preset statuses in the result reflect what
    was actually tried (previously every preset was reported as "untried").
    """
    presets = env.spec.presets
    num_presets = len(presets)
    order_actions = (ACTION_ORDER_TIPS, ACTION_ORDER_BUFFER, ACTION_ORDER_POLYMERASE)
    obs, info = env.reset(seed=seed)
    agent.reset()
    timeline: list[dict] = []
    presets_tried: dict[int, str] = {}
    # Index of the preset selected by the latest setup action, if any.
    current_preset: int | None = None

    while True:
        action = agent.select_action(obs)
        obs, _reward, done, trunc, info = env.step(action)
        elapsed = f"{info['elapsed_minutes']:.0f} min"

        if ACTION_SETUP_START <= action < ACTION_SETUP_START + num_presets:
            current_preset = int(action - ACTION_SETUP_START)
            timeline.append({
                "title": "Setup",
                "description": _preset_label(presets[current_preset]),
                "time": elapsed,
                "status": "pending",
                "icon": "setup",
            })
        elif action == ACTION_RUN_ASSAY:
            result = info["last_result"]
            if current_preset is not None:
                presets_tried[current_preset] = result
            timeline.append({
                "title": "Run Assay",
                "description": _result_description(result),
                "time": elapsed,
                "status": result,
                "icon": "run",
            })
        elif action in order_actions:
            timeline.append({
                "title": "Order Reagents",
                "description": _order_label(action),
                "time": elapsed,
                "status": "action",
                "icon": "order",
            })
        elif action == ACTION_FINISH:
            best = info["best_result"]
            timeline.append({
                "title": "Finish",
                "description": f"Experiment complete — best: {best}",
                "time": elapsed,
                "status": best if best in ("success", "partial") else "fail",
                "icon": "finish",
            })

        if done or trunc:
            break

    return _build_run_result(env, info, timeline, presets_tried)
|
|
|
|
def _build_run_result(env: LabEnv, info: dict, timeline: list[dict], presets_tried: dict[int, str]) -> dict:
    """Assemble the response payload shared by the run endpoints.

    Args:
        env: Environment whose spec supplies the preset list and limits.
        info: Final info dict of the episode.
        timeline: Timeline entries collected while the episode ran.
        presets_tried: Outcome per preset index; presets that are missing
            from this mapping are reported with status ``"untried"``.

    Returns:
        A dict with ``state``, ``timeline``, ``presets``, ``reward`` and
        ``best_result`` keys.
    """
    spec = env.spec
    preset_statuses = []
    for i, p in enumerate(spec.presets):
        row: dict[str, Any] = {
            "id": str(i),
            "status": presets_tried.get(i, "untried"),
            "label": _preset_label(p),
        }
        if "temp" in p:
            row["temp"] = p["temp"]
            # Coating-style presets also carry "temp" (see _preset_label),
            # so "cycles"/"ratio" are copied only when present instead of
            # being indexed unconditionally.
            for key in ("cycles", "ratio"):
                if key in p:
                    row[key] = p[key]
        if "coating_hr" in p:
            row["coating_hr"] = p["coating_hr"]
            row["block"] = p.get("block", "")
        preset_statuses.append(row)
    return {
        "state": {
            "elapsed_minutes": info["elapsed_minutes"],
            "remaining_budget": info["remaining_budget"],
            "inventory": info["inventory"],
            "best_result": info["best_result"],
            "max_time": getattr(spec, "max_minutes", 240),
            "max_budget": getattr(spec, "initial_budget", 500),
        },
        "timeline": timeline,
        "presets": preset_statuses,
        # NOTE(review): despite the key name, this value is the budget spent
        # (INITIAL_BUDGET minus what remains) — confirm the frontend expects that.
        "reward": float(INITIAL_BUDGET - info["remaining_budget"]),
        "best_result": info["best_result"],
    }
|
|
|
|
def _result_description(result: str) -> str:
    """Map an assay result code to its display text.

    Unknown codes are returned unchanged.
    """
    display_text = {
        "success": "Success!",
        "partial": "Partial — low yield",
        "fail": "Failed — no amplification",
    }
    try:
        return display_text[result]
    except KeyError:
        return result
|
|
|
|
def _order_label(action: int) -> str:
    """Return the timeline text for a reagent-order action.

    Any action that is not one of the three order actions yields the
    generic label ``"reagents"``.
    """
    order_text = {
        ACTION_ORDER_TIPS: "+5 tips",
        ACTION_ORDER_BUFFER: "+5 buffer",
        ACTION_ORDER_POLYMERASE: "+3 polymerase",
    }
    try:
        return order_text[action]
    except KeyError:
        return "reagents"
|
|
|
|
def _preset_label(preset: dict) -> str:
    """Build the short human-readable description of a preset.

    Presets that contain ``"coating_hr"`` are rendered as
    ``"<coating_hr>hr coat / <temp>°C / <block>"`` (``temp`` is required,
    ``block`` defaults to an empty string). All other presets are rendered
    as ``"<temp>°C / <cycles> cyc / <ratio>"`` with ``?`` for missing keys.
    """
    if "coating_hr" not in preset:
        temp = preset.get("temp", "?")
        cycles = preset.get("cycles", "?")
        ratio = preset.get("ratio", "?")
        return f"{temp}°C / {cycles} cyc / {ratio}"

    blocker = preset.get("block", "")
    return f"{preset['coating_hr']}hr coat / {preset['temp']}°C / {blocker}"
|
|
|
|
def _trace_research_episode(env: LabEnv, seed: int, max_trials: int = 5) -> dict:
    """Run the Research LLM agent once and convert its steps into a timeline.

    Every step reported through the episode callback becomes four entries
    (Research, Hypothesis, Run Assay, Learn). When the agent could not be
    imported, or the environment's spec is not named ``"pcr"``, a
    single-entry failure result is returned and no episode is run.

    All entries carry the elapsed time read after the episode finished,
    because the timeline is built from the callback data afterwards.
    """
    presets = env.spec.presets

    def refusal(title, description):
        entry = {"title": title, "description": description, "time": "0 min", "status": "fail", "icon": "run"}
        return _build_run_result(env, env._info(), [entry], {})

    if not HAS_RESEARCH_AGENT:
        return refusal("Research agent unavailable", "Install openai and set OPENAI_API_KEY")
    if env.spec.name != "pcr":
        return refusal("Research agent", "Research agent is only supported for PCR workflow.")

    agent = ResearchLLMAgent(max_trials=max_trials)
    steps: list[dict] = []
    agent.run_episode(env, seed=seed, episode_callback=steps)
    info = env._info()
    timeline: list[dict] = []
    presets_tried: dict[int, str] = {}
    match_keys = ("temp", "cycles", "ratio")

    def add(title, description, status, icon):
        timeline.append({
            "title": title,
            "description": description,
            "time": f"{info.get('elapsed_minutes', 0):.0f} min",
            "status": status,
            "icon": icon,
        })

    for step in steps:
        full_text = step.get("research") or ""
        summary = full_text[:200]
        if len(full_text) > 200:
            summary += "..."
        add("Research", summary or "Literature search for PCR protocol", "action", "research")

        hyp = step.get("hypothesis") or {}
        add(
            "Hypothesis",
            f"temp={hyp.get('temp', '?')}°C, cycles={hyp.get('cycles', '?')}, ratio={hyp.get('ratio', '?')}",
            "pending",
            "hypothesis",
        )

        used = step.get("params_used") or {}
        outcome = step.get("result", "fail")
        add("Run Assay", _result_description(outcome), outcome, "run")

        # Credit the outcome to the first preset whose parameters match.
        for idx, preset in enumerate(presets):
            if all(preset.get(key) == used.get(key) for key in match_keys):
                presets_tried[idx] = outcome
                break

        add(
            "Learn",
            f"temp_range={agent.knowledge.get('temp_range', [])}, cycle_range={agent.knowledge.get('cycle_range', [])}",
            "action",
            "learn",
        )

    return _build_run_result(env, info, timeline, presets_tried)
|
|
|
|
def _protocol_dict_label(protocol: dict) -> str:
    """Describe a generated protocol dict in one short line.

    Protocols containing ``"coating_hr"`` use the
    ``"<coating_hr>hr / <temp>°C / <block>"`` form; everything else uses
    ``"<temp>°C / <cycles> cyc / <ratio>"``. Missing values show as ``?``.
    """
    def field(key):
        return protocol.get(key, "?")

    if "coating_hr" in protocol:
        return f"{field('coating_hr')}hr / {field('temp')}°C / {field('block')}"
    return f"{field('temp')}°C / {field('cycles')} cyc / {field('ratio')}"
|
|
|
|
def _trace_research_generate_episode(env: LabEnv, seed: int, max_trials: int = 6) -> dict:
    """Run the Research & Generate agent once and convert it into a timeline.

    Each entry of ``agent.feedback_history`` yields a "Research & Generate"
    and a "Run Assay" timeline entry plus one row in ``presets`` describing
    the generated protocol. A closing "Finish" entry reports the best result.
    When the agent could not be imported, or the spec has no
    ``evaluate_custom_protocol``, a single-entry failure result is returned
    and no episode is run.
    """
    def refusal(title, description):
        entry = {"title": title, "description": description, "time": "0 min", "status": "fail", "icon": "run"}
        return _build_run_result(env, env._info(), [entry], {})

    if not HAS_RESEARCH_GENERATE_AGENT:
        return refusal("Research & Generate agent unavailable", "Install openai and set OPENAI_API_KEY")
    if env.spec.evaluate_custom_protocol is None:
        return refusal("Research & Generate", "This workflow does not support custom protocols.")

    agent = ResearchGenerateAgent(max_trials=max_trials)
    agent.run_episode(env, seed=seed, verbose=False)
    info = env._info()
    # The episode is already over, so every entry shares the final elapsed time.
    stamp = f"{info.get('elapsed_minutes', 0):.0f} min"

    timeline: list[dict] = []
    preset_statuses: list[dict[str, Any]] = []
    for idx, attempt in enumerate(agent.feedback_history):
        protocol = attempt.get("protocol", {})
        outcome = attempt.get("result", "fail")
        label = _protocol_dict_label(protocol)

        timeline.extend([
            {
                "title": "Research & Generate",
                "description": f"Generated: {label}",
                "time": stamp,
                "status": "pending",
                "icon": "research",
            },
            {
                "title": "Run Assay",
                "description": _result_description(outcome),
                "time": stamp,
                "status": outcome,
                "icon": "run",
            },
        ])

        row: dict[str, Any] = {"id": str(idx), "status": outcome, "label": label}
        if "temp" in protocol:
            row.update(
                temp=protocol.get("temp"),
                cycles=protocol.get("cycles"),
                ratio=protocol.get("ratio", ""),
            )
        if "coating_hr" in protocol:
            row.update(
                coating_hr=protocol.get("coating_hr"),
                block=protocol.get("block", ""),
            )
        preset_statuses.append(row)

    finish_text = f"Best result: {info.get('best_result', 'none')}"
    best = info["best_result"]
    timeline.append({
        "title": "Finish",
        "description": finish_text,
        "time": stamp,
        "status": best if best in ("success", "partial") else "fail",
        "icon": "finish",
    })

    state = {
        "elapsed_minutes": info["elapsed_minutes"],
        "remaining_budget": info["remaining_budget"],
        "inventory": info["inventory"],
        "best_result": best,
        "max_time": getattr(env.spec, "max_minutes", 240),
        "max_budget": getattr(env.spec, "initial_budget", 500),
    }
    return {
        "state": state,
        "timeline": timeline,
        "presets": preset_statuses,
        "reward": float(INITIAL_BUDGET - info["remaining_budget"]),
        "best_result": best,
    }
|
|
|
|
| |
| |
| |
|
|
@app.post("/api/training/start")
async def training_start(req: TrainRequest):
    """Train a ReinforceAgent for the requested workflow and stream progress.

    The response is a server-sent-event stream. ``progress`` events carry
    the windowed average reward / success rate plus the chart data gathered
    so far; a final ``done`` event carries the chart data and a comparison
    of the trained agent against the naive baseline over
    ``req.eval_episodes`` evaluation episodes.

    The trained agent is stored in ``rl_agent`` and in ``_trained_agents``
    under ``req.workflow_id`` once training finishes.
    """

    def generate():
        global rl_agent
        spec = get_spec_for_workflow(req.workflow_id)
        agent = ReinforceAgent(lr=req.lr, max_trials=req.max_trials, spec=spec)
        train_env = LabEnv(spec=spec)

        # try/finally so the environment is closed even when the stream is
        # abandoned part-way (generator closed) or an episode raises.
        try:
            window_rewards: list[float] = []
            window_successes: list[float] = []
            chart_data: list[dict] = []
            log_interval = max(req.episodes // 40, 10)

            for ep in range(1, req.episodes + 1):
                result = agent.run_episode(train_env, seed=42 + ep, train=True)
                window_rewards.append(result["reward"])
                window_successes.append(float(result["success"]))

                if ep % log_interval == 0 or ep == req.episodes:
                    avg_reward = sum(window_rewards) / len(window_rewards)
                    avg_success = sum(window_successes) / len(window_successes) * 100
                    chart_data.append({
                        "episode": ep,
                        "reward": round(avg_reward, 2),
                        "successRate": round(avg_success, 1),
                    })
                    progress = round(ep / req.episodes * 100)
                    event = {
                        "type": "progress",
                        "episode": ep,
                        "total": req.episodes,
                        "progress": progress,
                        "reward": round(avg_reward, 2),
                        "successRate": round(avg_success, 1),
                        "chartData": chart_data,
                    }
                    yield f"data: {json.dumps(event)}\n\n"
                    window_rewards.clear()
                    window_successes.clear()

            rl_agent = agent
            _trained_agents[req.workflow_id] = agent

            # Both agents are evaluated on the same seed sequence.
            eval_seed = 999_999
            rl_results = [
                agent.run_episode(train_env, seed=eval_seed + i, train=False)
                for i in range(req.eval_episodes)
            ]
            naive = NaiveAgent(num_trials=3, seed=0)
            naive_results = []
            for i in range(req.eval_episodes):
                obs, info = train_env.reset(seed=eval_seed + i)
                naive.reset()
                total_r = 0.0
                while True:
                    a = naive.select_action(obs)
                    obs, r, d, t, info = train_env.step(a)
                    total_r += r
                    if d or t:
                        break
                naive_results.append({
                    "reward": total_r,
                    "success": info["best_result"] == "success",
                    "partial": info["best_result"] == "partial",
                    "minutes": info["elapsed_minutes"],
                    # NOTE(review): 500.0 presumably mirrors INITIAL_BUDGET — confirm.
                    "cost": 500.0 - info["remaining_budget"],
                })

            def agg(res, n):
                # With zero evaluation episodes there is nothing to average;
                # report zeros instead of dividing by zero.
                if n == 0:
                    return dict.fromkeys(("reward", "success", "partial", "minutes", "cost"), 0.0)
                return {
                    "reward": round(sum(r["reward"] for r in res) / n, 1),
                    "success": round(sum(r["success"] for r in res) / n * 100, 1),
                    "partial": round(sum(r["partial"] for r in res) / n * 100, 1),
                    "minutes": round(sum(r["minutes"] for r in res) / n, 0),
                    "cost": round(sum(r["cost"] for r in res) / n, 1),
                }

            rl_s = agg(rl_results, len(rl_results))
            nv_s = agg(naive_results, len(naive_results))

            def imp(rl_v, nv_v):
                # Relative change in percent; undefined for a zero reference.
                if nv_v == 0:
                    return None
                return round((rl_v - nv_v) / abs(nv_v) * 100)

            # For time and cost lower is better, so the arguments to imp()
            # are swapped relative to the other metrics.
            comparison = [
                {"metric": "Avg Reward", "reinforce": rl_s["reward"], "baseline": nv_s["reward"], "improvement": imp(rl_s["reward"], nv_s["reward"]), "unit": ""},
                {"metric": "Success Rate", "reinforce": rl_s["success"], "baseline": nv_s["success"], "improvement": imp(rl_s["success"], nv_s["success"]), "unit": "%"},
                {"metric": "Partial Rate", "reinforce": rl_s["partial"], "baseline": nv_s["partial"], "improvement": imp(rl_s["partial"], nv_s["partial"]), "unit": "%"},
                {"metric": "Avg Time", "reinforce": rl_s["minutes"], "baseline": nv_s["minutes"], "improvement": imp(nv_s["minutes"], rl_s["minutes"]), "unit": "min"},
                {"metric": "Avg Cost", "reinforce": rl_s["cost"], "baseline": nv_s["cost"], "improvement": imp(nv_s["cost"], rl_s["cost"]), "unit": "$"},
            ]

            final_event = {
                "type": "done",
                "chartData": chart_data,
                "comparison": comparison,
            }
            yield f"data: {json.dumps(final_event)}\n\n"
        finally:
            train_env.close()

    return StreamingResponse(generate(), media_type="text/event-stream")
|
|
|
|
| |
| |
| |
|
|
@app.post("/api/run/ai")
async def run_ai(req: RunRequest):
    """Run one traced episode with the RL agent for the requested workflow.

    The agent registered for ``req.workflow_id`` is used; when there is none
    a fresh ReinforceAgent is built from that workflow's spec and registered.
    An agent created for a different workflow is never reused, because its
    preset indices refer to another spec. The outcome is appended to
    ``run_history`` so that GET /api/stats can report it.
    """
    global rl_agent
    env = _get_env(req.workflow_id)
    agent = _trained_agents.get(req.workflow_id)
    if agent is None:
        spec = get_spec_for_workflow(req.workflow_id)
        agent = ReinforceAgent(max_trials=4, spec=spec)
        rl_agent = agent
        _trained_agents[req.workflow_id] = agent
    result = _trace_episode(env, agent, seed=req.seed)
    run_history.append({
        "best_result": result["best_result"],
        "cost": INITIAL_BUDGET - result["state"]["remaining_budget"],
    })
    return result
|
|
|
|
@app.post("/api/run/naive")
async def run_naive(req: RunRequest):
    """Run one traced episode with a freshly constructed NaiveAgent.

    The outcome is appended to ``run_history`` so that GET /api/stats can
    report it.
    """
    env = _get_env(req.workflow_id)
    agent = NaiveAgent(num_trials=3, seed=req.seed)
    result = _trace_naive_episode(env, agent, seed=req.seed)
    run_history.append({
        "best_result": result["best_result"],
        "cost": INITIAL_BUDGET - result["state"]["remaining_budget"],
    })
    return result
|
|
|
|
@app.post("/api/run/research")
async def run_research(req: RunRequest):
    """Run Research LLM agent (research → hypothesize → experiment → learn). PCR workflow only.

    When an episode actually runs, its outcome is appended to
    ``run_history`` so that GET /api/stats can report it. The placeholder
    result returned when the agent is unavailable, or the workflow is not
    PCR, is not counted as an experiment.
    """
    env = _get_env(req.workflow_id)
    # Mirrors the preconditions under which _trace_research_episode runs an episode.
    episode_runs = HAS_RESEARCH_AGENT and env.spec.name == "pcr"
    result = _trace_research_episode(env, seed=req.seed, max_trials=5)
    if episode_runs:
        run_history.append({
            "best_result": result["best_result"],
            "cost": INITIAL_BUDGET - result["state"]["remaining_budget"],
        })
    return result
|
|
|
|
@app.post("/api/run/research-generate")
async def run_research_generate(req: RunRequest):
    """Run Research & Generate agent (research → generate any protocol → run → learn). PCR, ELISA, any spec with evaluate_custom_protocol.

    When an episode actually runs, its outcome is appended to
    ``run_history`` so that GET /api/stats can report it. The placeholder
    result returned when the agent is unavailable, or the spec has no
    ``evaluate_custom_protocol``, is not counted as an experiment.
    """
    env = _get_env(req.workflow_id)
    # Mirrors the preconditions under which the trace helper runs an episode.
    episode_runs = (
        HAS_RESEARCH_GENERATE_AGENT
        and env.spec.evaluate_custom_protocol is not None
    )
    result = _trace_research_generate_episode(env, seed=req.seed, max_trials=6)
    if episode_runs:
        run_history.append({
            "best_result": result["best_result"],
            "cost": INITIAL_BUDGET - result["state"]["remaining_budget"],
        })
    return result
|
|
|
|
| |
| |
| |
|
|
@app.post("/api/env/reset")
async def env_reset(req: RunRequest):
    """Reset the workflow's environment with the given seed and return its state."""
    target = _get_env(req.workflow_id)
    # The observation/info pair is not needed here; the state snapshot is
    # rebuilt from the environment itself.
    _obs, _info = target.reset(seed=req.seed)
    return _env_state_dict(target)
|
|
|
|
@app.post("/api/env/step")
async def env_step(req: StepRequest):
    """Apply one action to the workflow's environment.

    Returns the state snapshot taken after the step, extended with the
    step's ``reward``, ``terminated`` and ``truncated`` values.
    """
    target = _get_env(req.workflow_id)
    _obs, step_reward, is_terminated, is_truncated, _info = target.step(req.action)
    payload = _env_state_dict(target)
    payload["reward"] = float(step_reward)
    payload["terminated"] = is_terminated
    payload["truncated"] = is_truncated
    return payload
|
|
|
|
| |
| |
| |
|
|
@app.get("/api/stats")
async def get_stats():
    """Return dashboard totals computed from ``run_history``.

    With no recorded runs a placeholder payload is returned. Otherwise the
    payload holds the run count, the share of runs whose ``best_result`` is
    ``"success"`` and the summed ``cost`` of all runs.
    """
    if not run_history:
        return {
            "active_workflows": 1,
            "total_experiments": 0,
            "success_rate": "—",
            "budget_spent": "$0",
        }

    total_runs = len(run_history)
    success_count = 0
    spent = 0
    for record in run_history:
        if record.get("best_result") == "success":
            success_count += 1
        spent += record.get("cost", 0)

    return {
        "active_workflows": 1,
        "total_experiments": total_runs,
        "success_rate": f"{success_count / total_runs:.0%}",
        "budget_spent": f"${spent:.0f}",
    }
|
|
|
|
# Script entry point: serve the app with uvicorn when run directly.
if __name__ == "__main__":
    import uvicorn
    # Binds to all interfaces on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
|
|