"""Run an LLM agent against the BridgeForge environment and log per-task results."""

import asyncio
import json
import os
import re
import textwrap
from typing import Dict, List, Optional

from openai import OpenAI

from client import BridgeForgeEnv
from models import BridgeForgeAction

API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")

BENCHMARK = "bridge_forge"
TASKS = ["easy", "medium", "hard"]
MAX_STEPS = 50
TEMPERATURE = 0.3
MAX_TOKENS = 512

# Matches a flat (non-nested) JSON object that mentions "action_type".
_FLAT_ACTION_RE = re.compile(r'\{[^{}]*"action_type"[^{}]*\}')
# Greedy match from the first "{" to the last "}" in the text.
_ANY_OBJECT_RE = re.compile(r'\{.*\}', re.DOTALL)
# Substrings of an observation message that are reported as a step error.
_ERROR_KEYWORDS = ("not found", "invalid", "cannot")

SYSTEM_PROMPT = textwrap.dedent("""\
    You are a structural engineer AI designing 2D truss bridges.

    RESPONSE FORMAT:
    Reply with EXACTLY one JSON object per turn. No markdown, no explanation.
    {"action_type": "", "params": {}}

    ACTIONS (execute in this order):
    1. select_type: {"bridge_type": "warren_truss"|"pratt_truss"|"howe_truss"|"simply_supported_beam"|"arch"}
    2. add_node: {"node_id": "n1", "x": 0.0, "y": 0.0}
    3. add_member: {"member_id": "m1", "node_start": "n1", "node_end": "n2", "material": "steel"|"concrete"|"timber", "section_area": 0.01}
    4. add_support: {"node_id": "n1", "support_type": "pin"|"roller"}
    5. add_load: {"node_id": "n2", "Fx": 0.0, "Fy": -50.0}
    6. simulate: {} -- run structural analysis
    7. submit: {} -- finalize (only after simulate shows structural_status=pass)

    MATERIALS:
    | Material | E (GPa) | Yield (MPa) | Density (kg/m3) | Cost (INR/kg) |
    | Steel | 200 | 250 | 7850 | 85 |
    | Concrete | 30 | 30 | 2400 | 12 |
    | Timber | 12 | 40 | 600 | 30 |
    Steel is strongest but heaviest/costliest. Timber is lightest/cheapest but weak.

    STRUCTURAL ENGINEERING RULES:
    - A stable truss needs triangulation. NEVER place all nodes on the same y-coordinate.
    - Minimum members for stability: 2*nodes - 3.
    - Pin support on one end, roller on the other. Both at y=0 on the span endpoints.
    - Top chord height should be span/4 to span/6 for good structural depth.
    - Larger section_area = stronger but heavier/costlier. Start with 0.005-0.01 m2 for steel.
    - If stress_ratio > 1.0 after simulate: increase section_area or add members.
    - If deflection is too high: increase section_area, add bracing, or increase truss depth.
    - Distribute loads across deck nodes, not just one point.

    UNITS: coordinates in meters, forces in kN, areas in m2.
    Reply with ONLY the JSON object.""")


def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line that opens a task run."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` line; a falsy ``error`` is rendered as ``null``."""
    error_val = error if error else "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` line with the comma-separated per-step rewards."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.2f} rewards={rewards_str}",
        flush=True,
    )


def format_observation(obs) -> str:
    """Render an environment observation as the text sent to the model.

    The simulation result line is included only when ``obs.simulation_result``
    is truthy.
    """
    parts = [
        f"Scenario: {obs.scenario}",
        f"Bridge type: {obs.bridge_type or 'not selected'}",
        f"Nodes ({len(obs.nodes)}): {json.dumps(obs.nodes)}",
        f"Members ({len(obs.members)}): {json.dumps(obs.members)}",
        f"Supports ({len(obs.supports)}): {json.dumps(obs.supports)}",
        f"Loads ({len(obs.loads)}): {json.dumps(obs.loads)}",
        f"Constraints: {json.dumps(obs.constraints)}",
        f"Step: {obs.step_count}",
        f"Message: {obs.message}",
    ]
    if obs.simulation_result:
        parts.append(f"Simulation result: {json.dumps(obs.simulation_result)}")
    return "\n".join(parts)


def fallback_action(obs) -> Optional[Dict]:
    """Choose an action without the model, or return ``None`` if none applies.

    Submits when the last simulation passed. If the design has nodes, members,
    supports and loads, it simulates when no simulation result exists yet and
    submits otherwise. Any other state yields ``None``.
    """
    if obs.simulation_result and obs.simulation_result.get("structural_status") == "pass":
        return {"action_type": "submit", "params": {}}
    if obs.nodes and obs.members and obs.supports and obs.loads:
        if obs.simulation_result is None:
            return {"action_type": "simulate", "params": {}}
        return {"action_type": "submit", "params": {}}
    return None


def _extract_json(text: str) -> Optional[Dict]:
    """Extract a JSON object from a model reply.

    Tries, in order: the whole reply (after stripping backticks and a leading
    ``json`` tag), a flat object containing ``"action_type"``, and the span
    from the first ``{`` to the last ``}``.

    Returns:
        The first candidate that decodes to a ``dict``, or ``None``. Valid
        JSON that is not an object (list, string, number) is skipped, because
        callers use the result through ``dict.get``.
    """
    text = text.strip().strip("`")
    if text.startswith("json"):
        text = text[4:].strip()

    candidates = [text]
    for pattern in (_FLAT_ACTION_RE, _ANY_OBJECT_RE):
        match = pattern.search(text)
        if match:
            candidates.append(match.group())

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


def get_model_action(client: OpenAI, obs, history: List[Dict]) -> Optional[Dict]:
    """Ask the model for the next action, falling back to ``fallback_action``.

    Args:
        client: OpenAI-compatible client used for the chat completion.
        obs: Current environment observation.
        history: Prior steps as ``{"action": ..., "observation": ...}`` dicts;
            only the last six are sent.

    Returns:
        The parsed action dict, or the result of ``fallback_action(obs)`` when
        the request fails or the reply contains no JSON object.
    """
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    for entry in history[-6:]:
        messages.append({"role": "assistant", "content": json.dumps(entry["action"])})
        messages.append({"role": "user", "content": entry["observation"]})

    # run_task records the post-step observation in history and then passes the
    # same observation here, so only append it when it is not already the last
    # message (avoids sending the identical user turn twice).
    current = format_observation(obs)
    last = messages[-1]
    if last["role"] != "user" or last["content"] != current:
        messages.append({"role": "user", "content": current})

    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
        parsed = _extract_json(text)
        if parsed is not None:
            return parsed
        print(f"[DEBUG] Could not parse JSON from: {text[:200]}", flush=True)
        return fallback_action(obs)
    except Exception as exc:
        # Best-effort boundary: any client failure degrades to the fallback.
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return fallback_action(obs)


async def run_task(task_id: str, llm: OpenAI) -> float:
    """Run one scenario for up to ``MAX_STEPS`` steps and return its score.

    Args:
        task_id: Scenario identifier passed to ``env.reset``.
        llm: OpenAI-compatible client used to pick actions.

    Returns:
        The last step's reward clamped to ``[0.0, 1.0]``, or ``0.0`` when no
        step was executed.
    """
    if LOCAL_IMAGE_NAME:
        env = await BridgeForgeEnv.from_docker_image(LOCAL_IMAGE_NAME)
    else:
        base_url = os.getenv("ENV_BASE_URL", "http://localhost:8000")
        env = BridgeForgeEnv(base_url=base_url)

    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)

    try:
        result = await env.reset(scenario_id=task_id)
        obs = result.observation
        history: List[Dict] = []
        consecutive_failures = 0

        for step in range(1, MAX_STEPS + 1):
            if result.done:
                break

            action_dict = get_model_action(llm, obs, history)
            if action_dict is None:
                consecutive_failures += 1
                if consecutive_failures >= 3:
                    print("[DEBUG] LLM unavailable and no valid fallback. Ending task.", flush=True)
                    break
                continue
            consecutive_failures = 0

            action_type = action_dict.get("action_type", "simulate")
            # The model may emit "params": null; treat that as no parameters.
            params = action_dict.get("params") or {}
            try:
                action = BridgeForgeAction(action_type=action_type, params=params)
            except Exception as exc:
                print(
                    f"[DEBUG] Invalid action {action_dict!r}: {exc}; falling back to simulate.",
                    flush=True,
                )
                # Keep the logged and recorded action in sync with what runs.
                action_type, params = "simulate", {}
                action_dict = {"action_type": action_type, "params": params}
                action = BridgeForgeAction(action_type=action_type, params=params)

            result = await env.step(action)
            obs = result.observation
            reward = result.reward or 0.0
            done = result.done

            # Report the message as an error only when it contains a known keyword.
            message_lower = (obs.message or "").lower()
            is_error = any(keyword in message_lower for keyword in _ERROR_KEYWORDS)
            error = obs.message if is_error else None

            rewards.append(reward)
            steps_taken = step

            action_str = f"{action_type}({json.dumps(params)})"
            log_step(step=step, action=action_str, reward=reward, done=done, error=error)

            history.append({
                "action": action_dict,
                "observation": format_observation(obs),
            })

            if done:
                break

        score = rewards[-1] if rewards else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score > 0.0
    finally:
        try:
            await env.close()
        except Exception as e:
            print(f"[DEBUG] env.close() error: {e}", flush=True)
        # Emitted from ``finally`` so the [END] line appears even on failure.
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    return score


async def main() -> None:
    """Run every task in ``TASKS`` sequentially and print a summary line."""
    llm = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
    scores = []
    for task_id in TASKS:
        score = await run_task(task_id, llm)
        scores.append(score)

    avg = sum(scores) / len(scores) if scores else 0.0
    scores_str = ",".join(f"{s:.2f}" for s in scores)
    print(f"\n[SUMMARY] tasks={len(TASKS)} avg_score={avg:.2f} scores={scores_str}", flush=True)


if __name__ == "__main__":
    asyncio.run(main())