| """ |
| inference.py β LLM-Driven Inference Script |
| Planetary Rover Navigation Simulator Β· Meta PyTorch Hackathon Round 1 |
| ===================================================================== |
| |
| This script connects to the running OpenEnv Docker container, runs |
| one episode per task (easy β medium β hard), and uses an LLM via the |
| OpenAI-compatible API to choose an action at every step. |
| |
| Environment variables (all required unless marked optional) |
| ----------------------------------------------------------- |
| API_BASE_URL Base URL of the OpenAI-compatible endpoint |
| e.g. "https://api-inference.huggingface.co/v1" |
| API_KEY Bearer token / HF_TOKEN for the LLM endpoint |
| MODEL_NAME Model identifier sent in every chat-completion request |
| e.g. "meta-llama/Llama-3.3-70B-Instruct" |
| IMAGE_NAME Docker image or base URL of the rover environment server |
| e.g. "http://localhost:7860" (running container) |
| or "rover-env:latest" (image name, if using |
| openenv_core.MyEnvV4Env.from_docker_image) |
| |
| Logging format (mandated by hackathon judges) |
| --------------------------------------------- |
| [START] task=<task_id> env=<IMAGE_NAME> model=<MODEL_NAME> |
| [STEP] step=<n> action=<json> reward=<float> done=<bool> error=<str|null> |
| [END] success=<bool> steps=<n> score=<float> rewards=<csv> |
| |
| Exit codes |
| ---------- |
| 0 all three tasks returned score > 0.0 |
| 1 at least one task scored 0.0 (smoke-test failure) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import math |
| import os |
| import re |
| import sys |
| import time |
| from typing import Any |
|
|
| import aiohttp |
| from openai import AsyncOpenAI |
|
|
| |
| |
| |
|
|
| API_BASE_URL = os.getenv("API_BASE_URL", "https://api-inference.huggingface.co/v1") |
| HF_TOKEN = os.getenv("HF_TOKEN") |
| MODEL_NAME = os.getenv("MODEL_NAME", "meta-llama/Llama-3.3-70B-Instruct") |
| LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME", "http://localhost:7860") |
|
|
| |
| _BASE_URL: str = LOCAL_IMAGE_NAME.rstrip("/") |
|
|
| |
| TASKS: list[str] = ["easy", "medium", "hard"] |
|
|
| |
| LLM_MAX_TOKENS: int = 256 |
| LLM_TEMPERATURE: float = 0.2 |
| LLM_TIMEOUT: float = 30.0 |
|
|
| |
| |
| |
| FALLBACK_ACTION: dict[str, Any] = { |
| "thrust": 0.5, |
| "steering": 0.0, |
| "brake": 0, |
| "vertical_thruster": 0.0, |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| SYSTEM_PROMPT = """\ |
| You are the autonomous navigation controller for a planetary rover. |
| Your sole responsibility is to output a single JSON action object every turn. |
| Respond ONLY with a JSON object β no explanation, no markdown, no extra text. |
| |
| ## Action space (exact JSON schema) |
| { |
| "thrust": <float in [0.0, 1.0]>, // forward drive power |
| "steering": <float in [-1.0, 1.0]>, // -1=hard left, 0=straight, 1=hard right |
| "brake": <int 0 or 1>, // 1=apply regen braking, 0=drive/coast |
| "vertical_thruster": <float in [-0.2, 0.2]> // vertical adjust (ignored on flat terrain) |
| } |
| |
| ## Physics you must know |
| - heading_error = atan2(target_dy, target_dx) - rover_heading |
| - Normalise heading_error to (-Ο, Ο] before using it. |
| - steering = clamp(heading_error * 2.5, -1.0, 1.0) β P-controller |
| - Rover only steers when thrust > 0; steering at thrust=0 has no effect. |
| - Battery depletes every step; brake=1 halves speed and recovers a tiny |
| amount of battery β only useful when you would otherwise overshoot. |
| |
| ## Task strategies |
| |
| EASY (Flat Plains Transit) |
| - Compute heading to target, set thrust=1.0, steer to correct heading. |
| - Battery is abundant; never brake unless target_distance < 3 m. |
| - Scoring: proximity*0.85 + step_efficiency*0.15 β arrive fast. |
| |
| MEDIUM (Crater Avoidance) |
| - A ring of obstacles sits between you and the waypoint. |
| - If nearest_obstacle_distance < 28 m: steer 90Β° perpendicular to the |
| waypoint direction (pick left or right and hold it) until |
| nearest_obstacle_distance > 35 m, then resume beeline. |
| - Each collision costs -0.06 from the final score. |
| - Scoring: proximity*0.75 + step_efficiency*0.25 - collision_penalty. |
| |
| HARD (Battery Sprint) |
| - Starting battery is only 35%. Drain multiplier is Γ4. |
| - On step 1: compute atan2(target_dy, target_dx), lock that heading, NEVER change it. |
| - Use thrust=1.0 every step. NEVER brake. NEVER deviate. |
| - Scoring: proximity*0.65 + battery_efficiency*0.35. |
| |
| Respond ONLY with the JSON object. Nothing else.\ |
| """ |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| def build_user_prompt( |
| task_id: str, |
| obs: dict[str, Any], |
| step_num: int, |
| max_steps: int, |
| ) -> str: |
| """ |
| Build the per-step user message sent to the LLM. |
| |
| Parameters |
| ---------- |
| task_id : "easy" | "medium" | "hard" |
| obs : the Observation dict returned by /reset or /step |
| step_num : current step index (1-based) |
| max_steps : step budget for this task |
| |
| Returns |
| ------- |
| A compact plain-text string. JSON was considered but plain text is |
| more token-efficient and models handle it well for numeric inputs. |
| """ |
| |
| |
| |
| |
| dx = obs["target_relative"]["x"] |
| dy = obs["target_relative"]["y"] |
|
|
| |
| target_heading = math.atan2(dy, dx) |
| raw_error = target_heading - obs["rover_heading"] |
| |
| while raw_error > math.pi: raw_error -= 2 * math.pi |
| while raw_error <= -math.pi: raw_error += 2 * math.pi |
|
|
| |
| |
| suggested_steering = max(-1.0, min(1.0, raw_error * 2.5)) |
|
|
| terrain_names = {0: "flat/sand", 1: "rocky", 2: "crater_floor", 3: "crater_rim"} |
| terrain_label = terrain_names.get(obs["terrain_type"], "unknown") |
|
|
| return ( |
| f"TASK: {task_id}\n" |
| f"STEP: {step_num}/{max_steps} " |
| f"steps_remaining_norm={obs['steps_remaining_norm']:.3f}\n" |
| f"\n" |
| f"NAVIGATION\n" |
| f" target_distance = {obs['target_distance']:.2f} m\n" |
| f" target_dx = {dx:.2f} m\n" |
| f" target_dy = {dy:.2f} m\n" |
| f" rover_heading = {obs['rover_heading']:.4f} rad\n" |
| f" heading_error = {raw_error:.4f} rad\n" |
| f" suggested_steering = {suggested_steering:.4f} " |
| f"(P-control, clamp to [-1,1])\n" |
| f"\n" |
| f"POWER\n" |
| f" battery_level = {obs['battery_level']:.4f} " |
| f"(0=dead, 1=full)\n" |
| f" battery_drain_rate = {obs['battery_drain_rate']:.6f} per step\n" |
| f"\n" |
| f"OBSTACLES\n" |
| f" nearest_obstacle_distance = {obs['nearest_obstacle_distance']:.2f} m " |
| f"(sensor range=50 m; collision at 0.5 m)\n" |
| f" obstacle_count = {obs['obstacle_count']}\n" |
| f"\n" |
| f"TERRAIN\n" |
| f" terrain_type = {obs['terrain_type']} ({terrain_label})\n" |
| f" terrain_slope = {obs['terrain_slope']}\n" |
| f"\n" |
| f"Output your action JSON now.\n" |
| f"Remember: ONLY a JSON object, no explanation." |
| ) |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| def _clamp(v: float, lo: float, hi: float) -> float: |
| return max(lo, min(hi, v)) |
|
|
|
|
| def parse_llm_action(raw_text: str) -> tuple[dict[str, Any], str | None]: |
| """ |
| Parse the LLM's raw text response into a valid Action dict. |
| |
| Returns |
| ------- |
| (action_dict, error_str) |
| action_dict : always a valid action (fallback if parsing failed) |
| error_str : None if parsing succeeded; human-readable error string |
| if we fell back (this goes into [STEP] error=<str>) |
| |
| Strategy |
| -------- |
| Step 1 β Strip markdown fences. |
| Models trained with RLHF often wrap JSON in ```json ... ```. |
| We remove those first. |
| |
| Step 2 β Extract the first { ... } block. |
| If the model prepended prose ("Sure, here is my action:"), this |
| regex finds the JSON object regardless of what came before it. |
| |
| Step 3 β Parse JSON. |
| Standard json.loads(). If it fails we try a light repair: |
| replace single quotes with double quotes (common LLM mistake). |
| |
| Step 4 β Field normalisation. |
| Accept common aliases (e.g. "steer" β "steering", "gas" β "thrust"). |
| Any missing required field is filled from FALLBACK_ACTION. |
| |
| Step 5 β Bounds clamping. |
| Every float/int is clamped to its declared range so the server's |
| Pydantic validation never rejects our action. |
| """ |
|
|
| |
| |
| stripped = re.sub(r"```(?:json)?\s*", "", raw_text).replace("```", "").strip() |
|
|
| |
| |
| match = re.search(r"\{[^{}]*\}", stripped, re.DOTALL) |
| if not match: |
| |
| return FALLBACK_ACTION.copy(), f"no JSON object found in: {raw_text[:80]!r}" |
|
|
| candidate = match.group(0) |
|
|
| |
| try: |
| parsed = json.loads(candidate) |
| except json.JSONDecodeError: |
| |
| repaired = candidate.replace("'", '"') |
| try: |
| parsed = json.loads(repaired) |
| except json.JSONDecodeError as e: |
| return FALLBACK_ACTION.copy(), f"JSON parse failed: {e} | text: {candidate[:80]!r}" |
|
|
| if not isinstance(parsed, dict): |
| return FALLBACK_ACTION.copy(), f"parsed JSON is not a dict: {type(parsed)}" |
|
|
| |
| |
| ALIASES: dict[str, str] = { |
| "steer": "steering", |
| "turn": "steering", |
| "yaw": "steering", |
| "gas": "thrust", |
| "throttle": "thrust", |
| "accelerate": "thrust", |
| "speed": "thrust", |
| "brakes": "brake", |
| "braking": "brake", |
| "vert": "vertical_thruster", |
| "vertical": "vertical_thruster", |
| "vertical_thrust": "vertical_thruster", |
| "vthruster": "vertical_thruster", |
| } |
| normalised: dict[str, Any] = {} |
| for key, val in parsed.items(): |
| canonical = ALIASES.get(key.lower().strip(), key.lower().strip()) |
| normalised[canonical] = val |
|
|
| |
| |
| action: dict[str, Any] = {} |
| missing_fields: list[str] = [] |
| required_fields = ["thrust", "steering", "brake", "vertical_thruster"] |
|
|
| for field in required_fields: |
| if field in normalised: |
| action[field] = normalised[field] |
| else: |
| action[field] = FALLBACK_ACTION[field] |
| missing_fields.append(field) |
|
|
| |
| |
| |
| coerce_errors: list[str] = [] |
| try: |
| action["thrust"] = _clamp(float(action["thrust"]), 0.0, 1.0) |
| except (TypeError, ValueError) as e: |
| action["thrust"] = FALLBACK_ACTION["thrust"] |
| coerce_errors.append(f"thrust coerce: {e}") |
|
|
| try: |
| action["steering"] = _clamp(float(action["steering"]), -1.0, 1.0) |
| except (TypeError, ValueError) as e: |
| action["steering"] = FALLBACK_ACTION["steering"] |
| coerce_errors.append(f"steering coerce: {e}") |
|
|
| try: |
| |
| raw_brake = action["brake"] |
| if isinstance(raw_brake, bool): |
| action["brake"] = 1 if raw_brake else 0 |
| else: |
| action["brake"] = int(round(float(raw_brake))) |
| action["brake"] = max(0, min(1, action["brake"])) |
| except (TypeError, ValueError) as e: |
| action["brake"] = FALLBACK_ACTION["brake"] |
| coerce_errors.append(f"brake coerce: {e}") |
|
|
| try: |
| action["vertical_thruster"] = _clamp(float(action["vertical_thruster"]), -0.2, 0.2) |
| except (TypeError, ValueError) as e: |
| action["vertical_thruster"] = FALLBACK_ACTION["vertical_thruster"] |
| coerce_errors.append(f"vertical_thruster coerce: {e}") |
|
|
| |
| error_parts: list[str] = [] |
| if missing_fields: |
| error_parts.append(f"missing_fields={missing_fields}") |
| if coerce_errors: |
| error_parts.append(f"coerce_errors={coerce_errors}") |
|
|
| error_str = "; ".join(error_parts) if error_parts else None |
| return action, error_str |
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
| def log_start(task_id: str) -> None: |
| """[START] task=<task> env=<LOCAL_IMAGE_NAME> model=<MODEL_NAME>""" |
| print(f"[START] task={task_id} env={LOCAL_IMAGE_NAME} model={MODEL_NAME}", flush=True) |
|
|
|
|
| def log_step( |
| step_num: int, |
| action: dict[str, Any], |
| reward: float, |
| done: bool, |
| error: str | None, |
| ) -> None: |
| """[STEP] step=<n> action=<json> reward=<float> done=<bool> error=<str|null>""" |
| action_json = json.dumps(action, separators=(",", ":")) |
| error_val = f'"{error}"' if error else "null" |
| print( |
| f"[STEP] step={step_num} " |
| f"action={action_json} " |
| f"reward={reward:.4f} " |
| f"done={str(done).lower()} " |
| f"error={error_val}", |
| flush=True, |
| ) |
|
|
|
|
| def log_end( |
| success: bool, |
| steps: int, |
| score: float, |
| rewards: list[float], |
| ) -> None: |
| """[END] success=<bool> steps=<n> score=<float> rewards=<csv>""" |
| rewards_csv = ",".join(f"{r:.4f}" for r in rewards) |
| print( |
| f"[END] success={str(success).lower()} " |
| f"steps={steps} " |
| f"score={score:.4f} " |
| f"rewards={rewards_csv}", |
| flush=True, |
| ) |
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
| async def _http_get(session: aiohttp.ClientSession, path: str, **params) -> dict[str, Any]: |
| url = f"{_BASE_URL}{path}" |
| async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=30)) as r: |
| r.raise_for_status() |
| return await r.json() |
|
|
|
|
| async def _http_post( |
| session: aiohttp.ClientSession, |
| path: str, |
| body: dict[str, Any], |
| **params, |
| ) -> dict[str, Any]: |
| url = f"{_BASE_URL}{path}" |
| async with session.post( |
| url, json=body, params=params, |
| timeout=aiohttp.ClientTimeout(total=30), |
| ) as r: |
| r.raise_for_status() |
| return await r.json() |
|
|
|
|
| |
| |
| |
|
|
| async def llm_action( |
| client: AsyncOpenAI, |
| task_meta: dict[str, Any], |
| obs: dict[str, Any], |
| step_num: int, |
| ) -> tuple[dict[str, Any], str | None]: |
| """ |
| Ask the LLM for one action and parse its response. |
| |
| Returns (action_dict, error_str). |
| error_str is None on clean parse; a short description on fallback. |
| """ |
| user_msg = build_user_prompt( |
| task_id = task_meta["id"], |
| obs = obs, |
| step_num = step_num, |
| max_steps = task_meta["max_steps"], |
| ) |
|
|
| try: |
| response = await asyncio.wait_for( |
| client.chat.completions.create( |
| model = MODEL_NAME, |
| messages = [ |
| {"role": "system", "content": SYSTEM_PROMPT}, |
| {"role": "user", "content": user_msg}, |
| ], |
| max_tokens = LLM_MAX_TOKENS, |
| temperature = LLM_TEMPERATURE, |
| ), |
| timeout=LLM_TIMEOUT, |
| ) |
| raw_text = response.choices[0].message.content or "" |
| except asyncio.TimeoutError: |
| return FALLBACK_ACTION.copy(), f"LLM timeout after {LLM_TIMEOUT}s" |
| except Exception as e: |
| return FALLBACK_ACTION.copy(), f"LLM API error: {type(e).__name__}: {e}" |
|
|
| return parse_llm_action(raw_text) |
|
|
|
|
| |
| |
| |
|
|
| async def grade_episode( |
| session: aiohttp.ClientSession, |
| episode_id: str, |
| task_id: str, |
| last_info: dict[str, Any], |
| last_obs: dict[str, Any], |
| ) -> float: |
| """ |
| POST /grader with the trajectory summary from the final step's info dict. |
| Returns the normalised score [0.0, 1.0]. |
| Falls back to 0.0 if the grader call fails. |
| """ |
| body = { |
| "episode_id": episode_id, |
| "task_id": task_id, |
| "termination_reason": last_info.get("termination_reason", "unknown"), |
| "initial_distance": last_info.get("initial_distance", last_obs.get("target_distance", 0.0)), |
| "min_distance_achieved": last_info.get("min_distance", last_obs.get("target_distance", 0.0)), |
| "waypoints_reached": last_info.get("waypoints_hit", 0), |
| "total_waypoints": last_info.get("total_waypoints", 1), |
| "steps_taken": last_info.get("steps", 0), |
| "max_steps": last_info.get("max_steps", 500), |
| "battery_remaining": last_info.get("battery", last_obs.get("battery_level", 0.0)), |
| "collision_count": last_info.get("collision_count", 0), |
| } |
| try: |
| resp = await _http_post(session, "/grader", body) |
| return float(resp.get("score", 0.0)) |
| except Exception as e: |
| print(f"[WARN] /grader call failed: {e}", flush=True) |
| return 0.0 |
|
|
|
|
| |
| |
| |
|
|
| async def run_task( |
| session: aiohttp.ClientSession, |
| client: AsyncOpenAI, |
| task_meta: dict[str, Any], |
| ) -> float: |
| """ |
| Run one complete episode for the given task. |
| |
| Flow |
| ---- |
| 1. POST /reset β episode_id, initial obs |
| 2. [START] log |
| 3. loop until done or truncated: |
| a. call LLM for action |
| b. POST /step β obs, reward, done, truncated, info |
| c. [STEP] log |
| 4. POST /grader β score |
| 5. [END] log |
| 6. return score |
| """ |
| task_id = task_meta["id"] |
| max_steps = task_meta["max_steps"] |
|
|
| |
| reset_resp = await _http_post(session, "/reset", {"task_id": task_id}) |
| episode_id = reset_resp["episode_id"] |
| obs = reset_resp["obs"] |
|
|
| |
| log_start(task_id) |
|
|
| |
| rewards: list[float] = [] |
| step_num: int = 0 |
| last_info: dict = {} |
| last_obs: dict = obs |
|
|
| |
| while True: |
| step_num += 1 |
|
|
| |
| action, parse_error = await llm_action(client, task_meta, obs, step_num) |
|
|
| |
| try: |
| step_resp = await _http_post( |
| session, "/step", action, |
| episode_id=episode_id, |
| ) |
| obs = step_resp["obs"] |
| reward = step_resp["reward"] |
| done = step_resp["done"] |
| truncated = step_resp["truncated"] |
| last_info = step_resp.get("info", {}) |
| last_obs = obs |
| step_error = parse_error |
|
|
| except Exception as e: |
| |
| reward = 0.0 |
| done = True |
| truncated = False |
| step_error = f"step HTTP error: {type(e).__name__}: {e}" |
|
|
| rewards.append(reward) |
|
|
| |
| log_step(step_num, action, reward, done or truncated, step_error) |
|
|
| if done or truncated: |
| break |
|
|
| |
| |
| if step_num >= max_steps: |
| break |
|
|
| |
| score = await grade_episode(session, episode_id, task_id, last_info, last_obs) |
|
|
| |
| success = score > 0.0 |
| log_end(success=success, steps=step_num, score=score, rewards=rewards) |
|
|
| return score |
|
|
|
|
| |
| |
| |
|
|
| async def main() -> int: |
| """ |
| Run all three tasks sequentially. |
| Returns 0 if every task scored > 0.0; returns 1 otherwise. |
| """ |
| |
| missing = [v for v in ("API_BASE_URL", "HF_TOKEN", "MODEL_NAME", "LOCAL_IMAGE_NAME") |
| if not os.environ.get(v)] |
| if missing: |
| |
| |
| print( |
| f"[WARN] The following environment variables are not set and " |
| f"defaults will be used: {missing}", |
| flush=True, |
| ) |
|
|
| if not HF_TOKEN: |
| print( |
| "[ERROR] API_KEY / HF_TOKEN is required for LLM calls. " |
| "Set it as an environment variable and re-run.", |
| file=sys.stderr, flush=True, |
| ) |
| return 2 |
|
|
| |
| |
| |
| llm_client = AsyncOpenAI( |
| api_key = HF_TOKEN, |
| base_url = API_BASE_URL, |
| ) |
|
|
| |
| |
| connector = aiohttp.TCPConnector(limit=4) |
| async with aiohttp.ClientSession(connector=connector) as http_session: |
|
|
| |
| try: |
| tasks_list = await _http_get(http_session, "/tasks") |
| except Exception as e: |
| print(f"[ERROR] Could not reach env server at {_BASE_URL}/tasks: {e}", |
| file=sys.stderr, flush=True) |
| return 2 |
|
|
| |
| tasks_by_id: dict[str, dict] = {t["id"]: t for t in tasks_list} |
|
|
| |
| scores: dict[str, float] = {} |
| for task_id in TASKS: |
| if task_id not in tasks_by_id: |
| print(f"[WARN] task_id={task_id!r} not found in /tasks response β skipping.", |
| flush=True) |
| scores[task_id] = 0.0 |
| continue |
|
|
| task_meta = tasks_by_id[task_id] |
| try: |
| score = await run_task(http_session, llm_client, task_meta) |
| except Exception as e: |
| print(f"[ERROR] Unhandled exception in task={task_id}: {e}", flush=True) |
| log_end(success=False, steps=0, score=0.0, rewards=[]) |
| score = 0.0 |
| scores[task_id] = score |
|
|
| |
| |
| await asyncio.sleep(0.5) |
|
|
| |
| print("\n" + "=" * 60, flush=True) |
| print("INFERENCE COMPLETE", flush=True) |
| print("=" * 60, flush=True) |
| for task_id, score in scores.items(): |
| status = "PASS" if score > 0.0 else "FAIL" |
| print(f" [{status}] {task_id:6s} score={score:.4f}", flush=True) |
| print("=" * 60, flush=True) |
|
|
| all_passed = all(s > 0.0 for s in scores.values()) |
| return 0 if all_passed else 1 |
|
|
|
|
| if __name__ == "__main__": |
| sys.exit(asyncio.run(main())) |
|
|