Spaces:
Sleeping
Sleeping
Fix task validation: accept OpenEnv action dict format in /step, add /grade endpoint, run all 3 tasks in inference.py
a7b1c20 | #!/usr/bin/env python3 | |
| """ | |
| Inference script for StructuralDesignEnv β OpenEnv Hackathon Round 1 submission. | |
| An LLM agent designs a steel building frame step-by-step, receiving physics | |
| analysis feedback (utilization ratios, deflections, drift) at each step. | |
| Required environment variables: | |
| API_BASE_URL β LLM API endpoint (e.g. https://api.openai.com/v1) | |
| MODEL_NAME β Model identifier (e.g. gpt-4o-mini) | |
| HF_TOKEN β Hugging Face / API key | |
| Optional: | |
| TASK_ID β task1_warehouse | task2_office | task3_hospital (default: task1_warehouse) | |
| SERVER_URL β URL of the running OpenEnv server (default: http://localhost:7860) | |
| (SPACE_URL is also accepted for backward compatibility) | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import textwrap | |
| from typing import List, Optional | |
| import httpx | |
| from dotenv import load_dotenv | |
| from openai import OpenAI | |
| load_dotenv() | |
| # -------------------------------------------------------------------------- | |
| # Hackathon-required env vars (exact spec: only API_BASE_URL and MODEL_NAME have defaults) | |
| # -------------------------------------------------------------------------- | |
| API_BASE_URL: str = os.getenv("API_BASE_URL", "https://api.openai.com/v1") | |
| MODEL_NAME: str = os.getenv("MODEL_NAME", "gpt-4o-mini") | |
| HF_TOKEN: Optional[str] = os.getenv("HF_TOKEN") # no default β required at runtime | |
| LOCAL_IMAGE_NAME: Optional[str] = os.getenv("LOCAL_IMAGE_NAME") # optional, for from_docker_image() | |
| # -------------------------------------------------------------------------- | |
| # Optional config | |
| # -------------------------------------------------------------------------- | |
| TASK_ID: str = os.getenv("TASK_ID", "task1_warehouse") | |
| SPACE_URL: str = os.getenv("SERVER_URL") or os.getenv("SPACE_URL", "http://localhost:7860") | |
| TEMPERATURE: float = 0.0 | |
| MAX_TOKENS: int = 4096 # Qwen3.5 needs extra tokens for thinking before JSON | |
| BENCHMARK = "structural_design_env" | |
| SYSTEM_PROMPT = textwrap.dedent(""" | |
| You are a structural engineer designing a building frame step-by-step. | |
| Each response must be ONE JSON object and nothing else β no prose, no markdown fences. | |
| ACTION TYPES (exact field names required): | |
| 1. Place column: | |
| {"action_type":"place_column","grid_x":5,"grid_y":0,"floor":0,"section":"HEB200"} | |
| grid_x: 0 to site_width-1, grid_y: 0 to site_depth-1, floor: 0 to n_floors-1 | |
| sections: HEB140 HEB160 HEB200 HEB240 HEB300 HEB360 HEB400 | |
| 2. Place beam (connect two existing columns on same floor): | |
| {"action_type":"place_beam","from_node_x":0,"from_node_y":0,"to_node_x":5,"to_node_y":0,"floor":0,"section":"IPE300","orientation":"x"} | |
| orientation "x" = east-west beam, "y" = north-south beam | |
| sections: IPE200 IPE240 IPE300 IPE360 IPE400 IPE450 IPE500 | |
| 3. Add shear wall (both endpoint columns must already exist): | |
| {"action_type":"add_wall","from_node_x":0,"from_node_y":0,"to_node_x":0,"to_node_y":5,"floor":0,"thickness_m":0.2,"orientation":"y"} | |
| 4. Upgrade section (next larger): | |
| {"action_type":"upgrade_section","element_id":"col_5_0_0"} | |
| 5. Downgrade section (next smaller): | |
| {"action_type":"downgrade_section","element_id":"col_5_0_0"} | |
| 6. Finish design: | |
| {"action_type":"done"} | |
| PHYSICS RULES: | |
| - UR (utilization ratio) = demand/capacity. All URs must be < 1.0. | |
| - Beam deflection limit: span/300. Longer spans need bigger IPE sections. | |
| - Lateral drift limit: story_height/500. Add shear walls if drift is exceeded. | |
| DESIGN STRATEGY: | |
| 1. Place columns at 4-6m spacing (check site_width_m and site_depth_m in the observation) | |
| 2. Connect columns with beams in both x and y directions | |
| 3. If wind or seismic loads exist, add shear walls | |
| 4. Upgrade any member with UR > 1.0 | |
| 5. Send {"action_type":"done"} when all URs < 1.0 or you are satisfied | |
| """).strip() | |
| # -------------------------------------------------------------------------- | |
| # Required logging functions (hackathon spec β exact format mandatory) | |
| # -------------------------------------------------------------------------- | |
| def log_start(task: str, env: str, model: str) -> None: | |
| print(f"[START] task={task} env={env} model={model}", flush=True) | |
| def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: | |
| error_val = error if error else "null" | |
| done_val = str(done).lower() | |
| print( | |
| f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}", | |
| flush=True, | |
| ) | |
| def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: | |
| rewards_str = ",".join(f"{r:.2f}" for r in rewards) | |
| print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True) | |
| # -------------------------------------------------------------------------- | |
| # LLM helper | |
| # -------------------------------------------------------------------------- | |
| def _build_client() -> OpenAI: | |
| if not HF_TOKEN: | |
| raise RuntimeError("HF_TOKEN is required") | |
| return OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN) | |
| def build_user_prompt(step: int, obs_message: str, last_reward: float, history: List[str]) -> str: | |
| history_block = "\n".join(history[-4:]) if history else "None" | |
| return textwrap.dedent(f""" | |
| Step: {step} | |
| Last reward: {last_reward:.2f} | |
| Environment feedback: | |
| {obs_message} | |
| Previous actions: | |
| {history_block} | |
| Send your next action as a JSON object. | |
| """).strip() | |
| def get_model_action(step: int, obs_message: str, last_reward: float, history: List[str]) -> str: | |
| user_prompt = build_user_prompt(step, obs_message, last_reward, history) | |
| try: | |
| client = _build_client() | |
| completion = client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=[ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": user_prompt}, | |
| ], | |
| temperature=TEMPERATURE, | |
| max_tokens=MAX_TOKENS, | |
| stream=False, | |
| ) | |
| text = (completion.choices[0].message.content or "").strip() | |
| # Strip thinking preamble from reasoning models (Qwen3.5, DeepSeek-R1, etc.) | |
| # vllm may strip the opening <think> tag but keep </think>, so split on it | |
| if "</think>" in text: | |
| text = text.split("</think>", 1)[1].strip() | |
| elif "<think>" in text: | |
| text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() | |
| # Strip markdown code fences if present | |
| if text.startswith("```"): | |
| lines = text.split("```") | |
| text = lines[1] if len(lines) > 1 else text | |
| if text.startswith("json"): | |
| text = text[4:] | |
| text = text.strip() | |
| return text if text else '{"action_type": "done"}' | |
| except Exception as exc: | |
| print(f"[DEBUG] LLM request failed: {exc}", flush=True) | |
| return '{"action_type": "done"}' | |
| # -------------------------------------------------------------------------- | |
| # Episode runner | |
| # -------------------------------------------------------------------------- | |
| def run_episode(task_id: str) -> float: | |
| env = httpx.Client(base_url=SPACE_URL, timeout=60) | |
| log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME) | |
| # Reset | |
| try: | |
| resp = env.post("/reset", json={"task_id": task_id}) | |
| resp.raise_for_status() | |
| except Exception as exc: | |
| print(f"[DEBUG] Reset failed: {exc}", flush=True) | |
| log_end(success=False, steps=0, score=0.0, rewards=[]) | |
| return 0.0 | |
| data = resp.json() | |
| session_id = data["session_id"] | |
| obs = data["observation"] | |
| max_steps: int = obs.get("max_steps", 100) | |
| history: List[str] = [] | |
| rewards: List[float] = [] | |
| last_reward = 0.0 | |
| score = 0.0 | |
| success = False | |
| steps_taken = 0 | |
| done = False | |
| for step in range(1, max_steps + 1): | |
| if done: | |
| break | |
| # Query LLM | |
| action_str = get_model_action(step, obs.get("message", ""), last_reward, history) | |
| history.append(action_str) | |
| # Parse action_type for compact logging | |
| try: | |
| action_obj = json.loads(action_str) | |
| action_label = action_obj.get("action_type", action_str[:40]) | |
| except Exception: | |
| action_label = action_str[:40] | |
| # Step environment | |
| error_msg: Optional[str] = None | |
| try: | |
| resp = env.post("/step", json={"session_id": session_id, "message": action_str}) | |
| resp.raise_for_status() | |
| step_data = resp.json() | |
| except Exception as exc: | |
| error_msg = str(exc)[:80] | |
| log_step(step, action_label, 0.0, False, error_msg) | |
| break | |
| obs = step_data["observation"] | |
| reward: float = step_data.get("reward", 0.0) | |
| done = step_data.get("done", False) | |
| info = step_data.get("info", {}) | |
| if obs.get("last_action_error"): | |
| error_msg = obs["last_action_error"][:80] | |
| rewards.append(reward) | |
| last_reward = reward | |
| steps_taken = step | |
| log_step(step, action_label, reward, done, error_msg) | |
| if done: | |
| score = float(info.get("graded_score", 0.0)) | |
| success = obs.get("is_structurally_valid", False) | |
| break | |
| log_end(success=success, steps=steps_taken, score=score, rewards=rewards) | |
| return score | |
| # -------------------------------------------------------------------------- | |
| # Entry point | |
| # -------------------------------------------------------------------------- | |
| if __name__ == "__main__": | |
| valid_tasks = ["task1_warehouse", "task2_office", "task3_hospital"] | |
| if len(sys.argv) > 1: | |
| # Single task specified on command line | |
| task = sys.argv[1] | |
| if task not in valid_tasks: | |
| print(f"[DEBUG] Unknown task '{task}'. Valid: {valid_tasks}", flush=True) | |
| sys.exit(1) | |
| run_episode(task) | |
| else: | |
| # No argument: run all 3 tasks so the platform sees graders for each | |
| tasks_to_run = valid_tasks if TASK_ID == "task1_warehouse" else [TASK_ID] | |
| for task in tasks_to_run: | |
| run_episode(task) | |