Spaces:
Sleeping
Sleeping
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import textwrap | |
| from typing import Dict, List, Optional | |
| from openai import OpenAI | |
| from client import BridgeForgeEnv | |
| from models import BridgeForgeAction | |
# --- Runtime configuration -------------------------------------------------
# Base URL and model id used for the OpenAI-compatible chat-completions
# client; both can be overridden through environment variables of the same
# name.
| API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") | |
# Passed as the client's api_key in main(); None when the variable is unset.
| HF_TOKEN = os.getenv("HF_TOKEN") | |
# When set, run_task() creates the environment from this Docker image instead
# of connecting to ENV_BASE_URL.
| LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") | |
# Reported as the env= field of the [START] log line.
| BENCHMARK = "bridge_forge" | |
# Scenario ids; main() runs one task per entry, in this order.
| TASKS = ["easy", "medium", "hard"] | |
# Upper bound on loop iterations per task in run_task().
| MAX_STEPS = 50 | |
# Sampling parameters forwarded to the chat-completions request.
| TEMPERATURE = 0.3 | |
| MAX_TOKENS = 512 | |
# System message sent as the first chat message of every model request (see
# get_model_action). It fixes the reply format (one JSON object with
# "action_type" and "params"), lists the available actions and their
# parameters, and gives material data plus design rules for the model.
| SYSTEM_PROMPT = textwrap.dedent("""\ | |
| You are a structural engineer AI designing 2D truss bridges. | |
| RESPONSE FORMAT: Reply with EXACTLY one JSON object per turn. No markdown, no explanation. | |
| {"action_type": "<type>", "params": {<params>}} | |
| ACTIONS (execute in this order): | |
| 1. select_type: {"bridge_type": "warren_truss"|"pratt_truss"|"howe_truss"|"simply_supported_beam"|"arch"} | |
| 2. add_node: {"node_id": "n1", "x": 0.0, "y": 0.0} | |
| 3. add_member: {"member_id": "m1", "node_start": "n1", "node_end": "n2", "material": "steel"|"concrete"|"timber", "section_area": 0.01} | |
| 4. add_support: {"node_id": "n1", "support_type": "pin"|"roller"} | |
| 5. add_load: {"node_id": "n2", "Fx": 0.0, "Fy": -50.0} | |
| 6. simulate: {} -- run structural analysis | |
| 7. submit: {} -- finalize (only after simulate shows structural_status=pass) | |
| MATERIALS: | |
| | Material | E (GPa) | Yield (MPa) | Density (kg/m3) | Cost (INR/kg) | | |
| | Steel | 200 | 250 | 7850 | 85 | | |
| | Concrete | 30 | 30 | 2400 | 12 | | |
| | Timber | 12 | 40 | 600 | 30 | | |
| Steel is strongest but heaviest/costliest. Timber is lightest/cheapest but weak. | |
| STRUCTURAL ENGINEERING RULES: | |
| - A stable truss needs triangulation. NEVER place all nodes on the same y-coordinate. | |
| - Minimum members for stability: 2*nodes - 3. | |
| - Pin support on one end, roller on the other. Both at y=0 on the span endpoints. | |
| - Top chord height should be span/4 to span/6 for good structural depth. | |
| - Larger section_area = stronger but heavier/costlier. Start with 0.005-0.01 m2 for steel. | |
| - If stress_ratio > 1.0 after simulate: increase section_area or add members. | |
| - If deflection is too high: increase section_area, add bracing, or increase truss depth. | |
| - Distribute loads across deck nodes, not just one point. | |
| UNITS: coordinates in meters, forces in kN, areas in m2. | |
| Reply with ONLY the JSON object.""") | |
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` marker line that opens a task run.

    Args:
        task: Task/scenario identifier.
        env: Benchmark/environment name.
        model: Model identifier used for the run.
    """
    header = f"[START] task={task} env={env} model={model}"
    # Flush immediately so the marker is visible even if the run dies later.
    print(header, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` line describing a single environment step.

    Args:
        step: 1-based step index.
        action: Human-readable rendering of the action that was executed.
        reward: Reward returned for the step; printed with two decimals.
        done: Whether the episode finished; printed lower-cased.
        error: Error text, or a falsy value, which is printed as ``null``.
    """
    fields = (
        f"step={step}",
        f"action={action}",
        f"reward={reward:.2f}",
        f"done={str(done).lower()}",
        f"error={error or 'null'}",
    )
    print("[STEP] " + " ".join(fields), flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` summary line that closes a task run.

    Args:
        success: Whether the task counted as successful; printed lower-cased.
        steps: Number of steps taken.
        score: Final score; printed with two decimals.
        rewards: Per-step rewards; printed comma-separated with two decimals.
    """
    formatted_rewards = ",".join(map("{:.2f}".format, rewards))
    success_text = str(success).lower()
    print(
        f"[END] success={success_text} steps={steps} score={score:.2f} rewards={formatted_rewards}",
        flush=True,
    )
def format_observation(obs) -> str:
    """Render an observation as the multi-line text shown to the model.

    Reads ``scenario``, ``bridge_type``, ``nodes``, ``members``, ``supports``,
    ``loads``, ``constraints``, ``step_count``, ``message`` and
    ``simulation_result`` from ``obs``. Collections are JSON-encoded and
    prefixed with their element count.

    Args:
        obs: Observation object exposing the attributes listed above.

    Returns:
        One ``label: value`` line per field, joined with newlines. The
        simulation result line is included only when the result is truthy.
    """
    lines = [
        f"Scenario: {obs.scenario}",
        f"Bridge type: {obs.bridge_type or 'not selected'}",
    ]

    collections = (
        ("Nodes", obs.nodes),
        ("Members", obs.members),
        ("Supports", obs.supports),
        ("Loads", obs.loads),
    )
    for label, items in collections:
        lines.append(f"{label} ({len(items)}): {json.dumps(items)}")

    lines.append(f"Constraints: {json.dumps(obs.constraints)}")
    lines.append(f"Step: {obs.step_count}")
    lines.append(f"Message: {obs.message}")

    simulation = obs.simulation_result
    if simulation:
        lines.append(f"Simulation result: {json.dumps(simulation)}")
    return "\n".join(lines)
def fallback_action(obs) -> Optional[Dict]:
    """Choose an action without the model, based only on the observation.

    Decision order:
      1. A simulation result whose ``structural_status`` is ``"pass"``
         yields ``submit``.
      2. If nodes, members, supports and loads are all non-empty: ``simulate``
         when no simulation result exists yet (``None``), otherwise ``submit``.
      3. Otherwise there is nothing sensible to do and ``None`` is returned.

    Args:
        obs: Observation exposing ``simulation_result``, ``nodes``,
            ``members``, ``supports`` and ``loads``.

    Returns:
        An action dict with ``action_type`` and empty ``params``, or ``None``.
    """
    simulation = obs.simulation_result
    if simulation and simulation.get("structural_status") == "pass":
        return {"action_type": "submit", "params": {}}

    design_complete = obs.nodes and obs.members and obs.supports and obs.loads
    if not design_complete:
        return None

    next_action = "simulate" if simulation is None else "submit"
    return {"action_type": next_action, "params": {}}
| def _extract_json(text: str) -> Optional[Dict]: | |
| text = text.strip().strip("`") | |
| if text.startswith("json"): | |
| text = text[4:].strip() | |
| try: | |
| return json.loads(text) | |
| except json.JSONDecodeError: | |
| pass | |
| match = re.search(r'\{[^{}]*"action_type"[^{}]*\}', text) | |
| if match: | |
| try: | |
| return json.loads(match.group()) | |
| except json.JSONDecodeError: | |
| pass | |
| match = re.search(r'\{.*\}', text, re.DOTALL) | |
| if match: | |
| try: | |
| return json.loads(match.group()) | |
| except json.JSONDecodeError: | |
| pass | |
| return None | |
def get_model_action(client: OpenAI, obs, history: List[Dict]) -> Optional[Dict]:
    """Ask the model for the next action, falling back to a heuristic.

    The request consists of the system prompt, the last six history turns
    (each as an assistant message holding the JSON action followed by a user
    message holding the observation recorded for that turn) and the current
    observation as the final user message.

    Args:
        client: OpenAI-compatible client used for the chat completion.
        obs: Current observation; formatted with ``format_observation``.
        history: Previous turns, each a dict with ``"action"`` and
            ``"observation"`` keys.

    Returns:
        The action dict parsed from the model reply. If the request fails or
        the reply contains no JSON object, the result of
        ``fallback_action(obs)``, which may be ``None``.
    """
    current_observation = format_observation(obs)

    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    for turn in history[-6:]:
        messages.append({"role": "assistant", "content": json.dumps(turn["action"])})
        messages.append({"role": "user", "content": turn["observation"]})

    # The observation stored with the latest history turn is the one produced
    # by that turn's action, i.e. normally the very observation being passed
    # in now. Appending it again would send the same text as two consecutive
    # user messages, so it is only added when not already the last message.
    last_message = messages[-1]
    already_present = (
        last_message["role"] == "user"
        and last_message["content"] == current_observation
    )
    if not already_present:
        messages.append({"role": "user", "content": current_observation})

    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
        parsed = _extract_json(text)
        # Accept only JSON objects; callers read the action with ``.get``.
        if isinstance(parsed, dict):
            return parsed
        print(f"[DEBUG] Could not parse JSON from: {text[:200]}", flush=True)
        return fallback_action(obs)
    except Exception as exc:
        # Deliberately broad: any client/network/response-shape failure must
        # degrade to the heuristic instead of aborting the task.
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return fallback_action(obs)
async def run_task(task_id: str, llm: OpenAI) -> float:
    """Run one scenario to completion and return its score.

    The environment is created from ``LOCAL_IMAGE_NAME`` when that is set,
    otherwise it connects to ``ENV_BASE_URL`` (default
    ``http://localhost:8000``). The loop then asks ``get_model_action`` for an
    action, executes it, logs a ``[STEP]`` line and records the turn in the
    history, for at most ``MAX_STEPS`` iterations or until the environment
    reports ``done``. A ``[START]`` line is printed before the episode and an
    ``[END]`` line is always printed afterwards, even if an exception escapes.

    Args:
        task_id: Scenario identifier passed to ``env.reset``.
        llm: OpenAI-compatible client forwarded to ``get_model_action``.

    Returns:
        The reward of the last executed step, clamped to ``[0.0, 1.0]``;
        ``0.0`` when no step was executed.
    """
    if LOCAL_IMAGE_NAME:
        env = await BridgeForgeEnv.from_docker_image(LOCAL_IMAGE_NAME)
    else:
        base_url = os.getenv("ENV_BASE_URL", "http://localhost:8000")
        env = BridgeForgeEnv(base_url=base_url)

    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
    try:
        result = await env.reset(scenario_id=task_id)
        obs = result.observation
        history: List[Dict] = []
        consecutive_failures = 0
        for step in range(1, MAX_STEPS + 1):
            if result.done:
                break

            action_dict = get_model_action(llm, obs, history)
            # Anything that is not a dict (None, or a stray list/str) cannot
            # be turned into an action; count it as a failed attempt.
            if not isinstance(action_dict, dict):
                consecutive_failures += 1
                if consecutive_failures >= 3:
                    print("[DEBUG] LLM unavailable and no valid fallback. Ending task.", flush=True)
                    break
                continue
            consecutive_failures = 0

            action_type = action_dict.get("action_type", "simulate")
            # ``"params": null`` is treated like missing params.
            params = action_dict.get("params") or {}
            try:
                action = BridgeForgeAction(action_type=action_type, params=params)
            except Exception as exc:
                print(
                    f"[DEBUG] Invalid action {action_dict!r}: {exc}; falling back to simulate.",
                    flush=True,
                )
                # Keep the logged/recorded action in sync with what is
                # actually executed, not with the rejected request.
                action_type, params = "simulate", {}
                action = BridgeForgeAction(action_type=action_type, params=params)
                action_dict = {"action_type": action_type, "params": params}

            result = await env.step(action)
            obs = result.observation
            reward = result.reward or 0.0
            done = result.done

            # The environment reports problems only through the message text;
            # a missing message must not crash the run.
            message = obs.message or ""
            lowered = message.lower()
            is_error = any(marker in lowered for marker in ("not found", "invalid", "cannot"))
            error = message if is_error else None

            rewards.append(reward)
            steps_taken = step
            action_str = f"{action_type}({json.dumps(params)})"
            log_step(step=step, action=action_str, reward=reward, done=done, error=error)
            history.append({
                "action": action_dict,
                "observation": format_observation(obs),
            })
            if done:
                break

        score = rewards[-1] if rewards else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score > 0.0
    finally:
        try:
            await env.close()
        except Exception as e:
            # Best-effort cleanup: a close failure must not hide the result.
            print(f"[DEBUG] env.close() error: {e}", flush=True)
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
    return score
async def main() -> None:
    """Run every task in ``TASKS`` sequentially and print a summary line.

    One OpenAI-compatible client is created from ``API_BASE_URL`` and
    ``HF_TOKEN`` and shared by all tasks. The ``[SUMMARY]`` line reports the
    number of tasks, the mean score (``0.0`` when there are no scores) and the
    individual scores with two decimals.
    """
    llm = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)

    # Awaiting inside the comprehension keeps the tasks strictly sequential.
    scores = [await run_task(task_id, llm) for task_id in TASKS]

    if scores:
        avg = sum(scores) / len(scores)
    else:
        avg = 0.0
    print(f"\n[SUMMARY] tasks={len(TASKS)} avg_score={avg:.2f} scores={','.join(f'{s:.2f}' for s in scores)}", flush=True)


if __name__ == "__main__":
    asyncio.run(main())