| """ |
| Inference Script — Vector Borne Disease Control |
| =================================== |
| MANDATORY |
| - Before submitting, ensure the following variables are defined in your environment configuration: |
| API_BASE_URL The API endpoint for the LLM. |
| MODEL_NAME The model identifier to use for inference. |
| HF_TOKEN Your Hugging Face / API key. |
| IMAGE_NAME The name of the local Docker image to use for the environment. |
| |
| - Defaults are set only for API_BASE_URL and MODEL_NAME |
| (and should reflect your active inference setup): |
| API_BASE_URL = os.getenv("API_BASE_URL", "<your-active-endpoint>") |
| MODEL_NAME = os.getenv("MODEL_NAME", "<your-active-model>") |
| |
| - The inference script must be named `inference.py` and placed in the root directory of the project |
| - Participants must use OpenAI Client for all LLM calls using above variables |
| |
| STDOUT FORMAT |
| - The script must emit exactly three line types to stdout, in this order: |
| |
| [START] task=<task_name> env=<benchmark> model=<model_name> |
| [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null> |
| [END] success=<true|false> steps=<n> rewards=<r1,r2,...,rn> |
| |
| Rules: |
| - One [START] line at episode begin. |
| - One [STEP] line per step, immediately after env.step() returns. |
| - One [END] line after env.close(), always emitted (even on exception). |
| - reward and rewards are formatted to 2 decimal places. |
| - done and success are lowercase booleans: true or false. |
| - error is the raw last_action_error string, or null if none. |
| - All fields on a single line with no newlines within a line. |
| |
| Example: |
| [START] task=mosquito-control env=vector_borne_disease_control model=Qwen2.5-72B-Instruct |
| [STEP] step=1 action={"action_type":0,"zone_id":3} reward=1.20 done=false error=null |
| [STEP] step=2 action={"action_type":1,"zone_id":7} reward=0.50 done=false error=null |
| [STEP] step=3 action={"action_type":2,"zone_id":null} reward=-0.01 done=true error=null |
| [END] success=true steps=3 rewards=1.20,0.50,-0.01 |
| """ |
|
|
| import asyncio |
| import json |
| import os |
| import textwrap |
| from typing import List, Optional, Tuple |
|
|
| from dotenv import load_dotenv |
| load_dotenv() |
|
|
| from openai import OpenAI |
|
|
| from vector_borne_disease_control import VectorBorneDiseaseControlEnv, VectorBorneDiseaseControlAction |
| from vector_borne_disease_control.models import ActionType, VectorBorneDiseaseControlObservation |
|
|
| IMAGE_NAME = os.getenv("IMAGE_NAME") or "openenv-vector_borne_disease_control" |
| API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY") |
|
|
| API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1" |
| MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct" |
| BENCHMARK = "vector_borne_disease_control" |
| TASKS = ["task1", "task2", "task3"] |
|
|
| MAX_STEPS = 30 |
| TEMPERATURE = 0.5 |
| MAX_TOKENS = 200 |
| SUCCESS_SCORE_THRESHOLD = 0.5 |
|
|
| SYSTEM_PROMPT = textwrap.dedent( |
| """ |
| You are an expert epidemiologist controlling mosquito-borne disease across a city modelled as a graph of zones. |
| |
| Each step you receive the current state of all zones and must choose exactly one action: |
| - SPRAY (action_type=0, zone_id=<int>): Apply pesticide to a zone. Reduces infestation by ~85% in that zone and ~9% in its neighbours. Lasts 2 steps. |
| - TRAP (action_type=1, zone_id=<int>): Deploy a trap in a zone. Slows growth to 40% and spread probability to 50%. Lasts 3 steps. |
| - WAIT (action_type=2, zone_id=null): Do nothing this step. |
| |
| You MUST reply with ONLY a single JSON object on one line, no other text: |
| {"action_type": <0|1|2>, "zone_id": <int|null>} |
| |
| Examples: |
| {"action_type": 0, "zone_id": 5} |
| {"action_type": 1, "zone_id": 12} |
| {"action_type": 2, "zone_id": null} |
| """ |
| ).strip() |
|
|
|
|
| def log_start(task: str, env: str, model: str) -> None: |
| print(f"[START] task={task} env={env} model={model}", flush=True) |
|
|
|
|
| def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: |
| error_val = error if error else "null" |
| done_val = str(done).lower() |
| print( |
| f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}", |
| flush=True, |
| ) |
|
|
|
|
| def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: |
| rewards_str = ",".join(f"{r:.2f}" for r in rewards) |
| print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True) |
|
|
|
|
| def format_observation(obs: VectorBorneDiseaseControlObservation, last_reward: float) -> str: |
| """Render the observation as a human-readable text block for the LLM.""" |
| lines: List[str] = [] |
|
|
| lines.append(f"Step: {obs.step} / {MAX_STEPS}") |
| lines.append(f"Resources — spray remaining: {obs.remaining_spray}, traps remaining: {obs.remaining_traps}") |
| lines.append(f"Cumulative avg infestation rate: {obs.cumulative_avg_infestation_rate:.3f} (last reward: {last_reward:.2f})") |
| lines.append("") |
|
|
| |
| sorted_zones = sorted(obs.zones.items(), key=lambda kv: kv[1].infestation_rate, reverse=True) |
|
|
| lines.append("Zone states (sorted by infestation, worst first):") |
| lines.append(f"{'ZoneID':>7} {'Infest':>7} {'Pop':>7} {'Temp°C':>7} {'Hum':>5} {'Rain':>5} {'Trap':>6} {'TrapLeft':>9} {'Spray':>6} {'SprayLeft':>10} {'Neighbours'}") |
|
|
| for zone_id, z in sorted_zones: |
| neighbours = obs.adjacency.get(zone_id, []) |
| |
| infested_neighbours = [str(n) for n in neighbours if obs.zones.get(n, None) and obs.zones[n].infestation_rate > 0.1] |
| neighbour_str = f"[{','.join(infested_neighbours)}] infested / [{','.join(str(n) for n in neighbours)}] all" |
| lines.append( |
| f"{zone_id:>7} {z.infestation_rate:>7.3f} {z.population:>7} {z.temperature:>7.1f} {z.humidity:>5.2f} {z.rained:>5} " |
| f"{'yes':>6} {z.trap_steps_remaining:>9} {'yes':>6} {z.treatment_steps_remaining:>10} {neighbour_str}" |
| if z.has_trap and z.is_treated else |
| f"{zone_id:>7} {z.infestation_rate:>7.3f} {z.population:>7} {z.temperature:>7.1f} {z.humidity:>5.2f} {z.rained:>5} " |
| f"{'yes' if z.has_trap else 'no':>6} {z.trap_steps_remaining:>9} {'yes' if z.is_treated else 'no':>6} {z.treatment_steps_remaining:>10} {neighbour_str}" |
| ) |
|
|
| lines.append("") |
| lines.append("Choose your action. Reply with ONLY the JSON object — no explanation, no markdown.") |
| return "\n".join(lines) |
|
|
|
|
| def parse_llm_action(text: str, valid_zone_ids: set) -> Tuple[VectorBorneDiseaseControlAction, str]: |
| """ |
| Parse the LLM's raw text output into a VectorBorneDiseaseControlAction. |
| Falls back to WAIT on any parse error. |
| Returns (action, action_str_for_logging). |
| """ |
| fallback_action = VectorBorneDiseaseControlAction(action_type=ActionType.WAIT, zone_id=None) |
| fallback_str = json.dumps({"action_type": 2, "zone_id": None}) |
|
|
| |
| text = text.strip() |
| brace_start = text.find("{") |
| brace_end = text.rfind("}") |
| if brace_start == -1 or brace_end == -1: |
| print(f"[DEBUG] No JSON object found in LLM output: {text!r}", flush=True) |
| return fallback_action, fallback_str |
|
|
| try: |
| payload = json.loads(text[brace_start : brace_end + 1]) |
| except json.JSONDecodeError as exc: |
| print(f"[DEBUG] JSON parse error: {exc} | text: {text!r}", flush=True) |
| return fallback_action, fallback_str |
|
|
| raw_action_type = payload.get("action_type") |
| raw_zone_id = payload.get("zone_id") |
|
|
| |
| try: |
| action_type = ActionType(int(raw_action_type)) |
| except (TypeError, ValueError): |
| print(f"[DEBUG] Invalid action_type: {raw_action_type!r}", flush=True) |
| return fallback_action, fallback_str |
|
|
| |
| zone_id: Optional[int] = None |
| if action_type in (ActionType.SPRAY, ActionType.TRAP): |
| if raw_zone_id is None: |
| print(f"[DEBUG] zone_id required for {action_type.name} but got null — falling back to WAIT", flush=True) |
| return fallback_action, fallback_str |
| try: |
| zone_id = int(raw_zone_id) |
| except (TypeError, ValueError): |
| print(f"[DEBUG] Invalid zone_id: {raw_zone_id!r}", flush=True) |
| return fallback_action, fallback_str |
| if zone_id not in valid_zone_ids: |
| print(f"[DEBUG] zone_id {zone_id} not in valid zones — falling back to WAIT", flush=True) |
| return fallback_action, fallback_str |
|
|
| action = VectorBorneDiseaseControlAction(action_type=action_type, zone_id=zone_id) |
| action_str = json.dumps({"action_type": action_type.value, "zone_id": zone_id}) |
| return action, action_str |
|
|
|
|
| def get_model_action( |
| client: OpenAI, |
| obs: VectorBorneDiseaseControlObservation, |
| last_reward: float, |
| history: List[str], |
| ) -> Tuple[VectorBorneDiseaseControlAction, str]: |
| """Call the LLM with the current observation and return a parsed action.""" |
| user_prompt = format_observation(obs, last_reward) |
| if history: |
| user_prompt = "Recent actions:\n" + "\n".join(history[-4:]) + "\n\n" + user_prompt |
|
|
| valid_zone_ids = set(obs.zones.keys()) |
|
|
| try: |
| completion = client.chat.completions.create( |
| model=MODEL_NAME, |
| messages=[ |
| {"role": "system", "content": SYSTEM_PROMPT}, |
| {"role": "user", "content": user_prompt}, |
| ], |
| temperature=TEMPERATURE, |
| max_tokens=MAX_TOKENS, |
| stream=False, |
| ) |
| text = (completion.choices[0].message.content or "").strip() |
| except Exception as exc: |
| print(f"[DEBUG] Model request failed: {exc}", flush=True) |
| fallback = VectorBorneDiseaseControlAction(action_type=ActionType.WAIT, zone_id=None) |
| return fallback, json.dumps({"action_type": 2, "zone_id": None}) |
|
|
| return parse_llm_action(text, valid_zone_ids) |
|
|
|
|
| async def main() -> None: |
| client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) |
|
|
| env = await VectorBorneDiseaseControlEnv.from_docker_image(IMAGE_NAME) |
|
|
| try: |
| for task_name in TASKS: |
| history: List[str] = [] |
| rewards: List[float] = [] |
| steps_taken = 0 |
| score = 0.0 |
| success = False |
|
|
| log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME) |
|
|
| try: |
| result = await env.reset() |
| obs: VectorBorneDiseaseControlObservation = result.observation |
| last_reward = 0.0 |
|
|
| for step in range(1, MAX_STEPS + 1): |
| if result.done: |
| break |
|
|
| action, action_str = get_model_action(client, obs, last_reward, history) |
|
|
| result = await env.step(action) |
| obs = result.observation |
|
|
| reward = result.reward or 0.0 |
| done = result.done |
| error = None |
|
|
| rewards.append(reward) |
| steps_taken = step |
| last_reward = reward |
|
|
| log_step(step=step, action=action_str, reward=reward, done=done, error=error) |
|
|
| history.append( |
| f"Step {step}: {action_str} -> reward {reward:+.2f} | avg_infestation {obs.cumulative_avg_infestation_rate:.3f}" |
| ) |
|
|
| if done: |
| break |
|
|
| |
| score = 1.0 - obs.cumulative_avg_infestation_rate |
| score = min(max(score, 0.01), 0.99) |
| success = score >= SUCCESS_SCORE_THRESHOLD |
|
|
| except Exception as e: |
| print(f"[DEBUG] Task {task_name} error: {e}", flush=True) |
| score = max(0.01, min(0.99, score)) |
|
|
| log_end(success=success, steps=steps_taken, score=score, rewards=rewards) |
|
|
| finally: |
| try: |
| await env.close() |
| except Exception as e: |
| print(f"[DEBUG] env.close() error (container cleanup): {e}", flush=True) |
|
|
|
|
| if __name__ == "__main__": |
| asyncio.run(main()) |
|
|