"""
Inference Script — Vector Borne Disease Control
===================================
MANDATORY
- Before submitting, ensure the following variables are defined in your
  environment configuration:
    API_BASE_URL   The API endpoint for the LLM.
    MODEL_NAME     The model identifier to use for inference.
    HF_TOKEN       Your Hugging Face / API key.
    IMAGE_NAME     The name of the local Docker image to use for the environment.

- Defaults are set only for API_BASE_URL and MODEL_NAME
  (and should reflect your active inference setup):
    API_BASE_URL = os.getenv("API_BASE_URL", "<your-active-endpoint>")
    MODEL_NAME = os.getenv("MODEL_NAME", "<your-active-model>")

- The inference script must be named `inference.py` and placed in the root
  directory of the project
- Participants must use OpenAI Client for all LLM calls using above variables

STDOUT FORMAT
- The script must emit exactly three line types to stdout, in this order:

    [START] task=<task_name> env=<benchmark> model=<model_name>
    [STEP]  step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
    [END]   success=<true|false> steps=<n> rewards=<r1,r2,...,rn>

  Rules:
    - One [START] line at episode begin.
    - One [STEP] line per step, immediately after env.step() returns.
    - One [END] line after env.close(), always emitted (even on exception).
    - reward and rewards are formatted to 2 decimal places.
    - done and success are lowercase booleans: true or false.
    - error is the raw last_action_error string, or null if none.
    - All fields on a single line with no newlines within a line.

  Example:
    [START] task=mosquito-control env=vector_borne_disease_control model=Qwen2.5-72B-Instruct
    [STEP] step=1 action={"action_type":0,"zone_id":3} reward=1.20 done=false error=null
    [STEP] step=2 action={"action_type":1,"zone_id":7} reward=0.50 done=false error=null
    [STEP] step=3 action={"action_type":2,"zone_id":null} reward=-0.01 done=true error=null
    [END] success=true steps=3 rewards=1.20,0.50,-0.01
"""

import asyncio
import json
import os
import textwrap
from typing import List, Optional, Tuple

from dotenv import load_dotenv

# Load .env before the remaining imports and before the os.getenv() calls below,
# preserving the original import/initialisation order.
load_dotenv()

from openai import OpenAI

from vector_borne_disease_control import VectorBorneDiseaseControlEnv, VectorBorneDiseaseControlAction
from vector_borne_disease_control.models import ActionType, VectorBorneDiseaseControlObservation

IMAGE_NAME = os.getenv("IMAGE_NAME") or "openenv-vector_borne_disease_control"
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")

API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
BENCHMARK = "vector_borne_disease_control"
TASKS = ["task1", "task2", "task3"]
MAX_STEPS = 30  # environment runs for 30 days (steps)
TEMPERATURE = 0.5  # lower temp for more deterministic action selection
MAX_TOKENS = 200
SUCCESS_SCORE_THRESHOLD = 0.5  # score = 1 - final_avg_infestation_rate

SYSTEM_PROMPT = textwrap.dedent(
    """
    You are an expert epidemiologist controlling mosquito-borne disease across a city
    modelled as a graph of zones.

    Each step you receive the current state of all zones and must choose exactly one action:
    - SPRAY (action_type=0, zone_id=<int>): Apply pesticide to a zone. Reduces infestation by ~85% in that zone and ~9% in its neighbours. Lasts 2 steps.
    - TRAP (action_type=1, zone_id=<int>): Deploy a trap in a zone. Slows growth to 40% and spread probability to 50%. Lasts 3 steps.
    - WAIT (action_type=2, zone_id=null): Do nothing this step.

    You MUST reply with ONLY a single JSON object on one line, no other text:
    {"action_type": <0|1|2>, "zone_id": <int|null>}

    Examples:
    {"action_type": 0, "zone_id": 5}
    {"action_type": 1, "zone_id": 12}
    {"action_type": 2, "zone_id": null}
    """
).strip()


def log_start(task: str, env: str, model: str) -> None:
    """Print the single ``[START]`` line that opens an episode."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` line.

    ``reward`` is rendered with two decimals, ``done`` as a lowercase boolean,
    and a falsy ``error`` (``None`` or empty string) as the literal ``null``.
    """
    error_val = error if error else "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` line that closes an episode.

    ``rewards`` is rendered as a comma-separated list with two decimals each.

    NOTE(review): this line also carries a ``score=`` field (three decimals)
    that the [END] format in the module docstring does not list — confirm
    which of the two the grader expects.
    """
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


def format_observation(obs: VectorBorneDiseaseControlObservation, last_reward: float) -> str:
    """Render the observation as a human-readable text block for the LLM.

    The block contains the step counter, remaining resources, the cumulative
    average infestation rate with the previous reward, and one table row per
    zone (worst infestation first). Each row ends with the zone's neighbours,
    listing separately those whose infestation rate exceeds 0.1.
    """
    lines: List[str] = [
        f"Step: {obs.step} / {MAX_STEPS}",
        f"Resources — spray remaining: {obs.remaining_spray}, traps remaining: {obs.remaining_traps}",
        f"Cumulative avg infestation rate: {obs.cumulative_avg_infestation_rate:.3f} (last reward: {last_reward:.2f})",
        "",
    ]

    # Sort zones by infestation rate descending so the LLM sees the worst first
    sorted_zones = sorted(obs.zones.items(), key=lambda kv: kv[1].infestation_rate, reverse=True)

    lines.append("Zone states (sorted by infestation, worst first):")
    lines.append(
        f"{'ZoneID':>7} {'Infest':>7} {'Pop':>7} {'Temp°C':>7} {'Hum':>5} {'Rain':>5} "
        f"{'Trap':>6} {'TrapLeft':>9} {'Spray':>6} {'SprayLeft':>10} {'Neighbours'}"
    )

    for zone_id, z in sorted_zones:
        neighbours = obs.adjacency.get(zone_id, [])
        # Flag infested neighbours (neighbour ids absent from obs.zones are skipped)
        infested_neighbours = [
            str(n)
            for n in neighbours
            if obs.zones.get(n, None) and obs.zones[n].infestation_rate > 0.1
        ]
        neighbour_str = (
            f"[{','.join(infested_neighbours)}] infested / "
            f"[{','.join(str(n) for n in neighbours)}] all"
        )
        # A single row template covers every trap/spray combination; the
        # yes/no flags are derived directly from the zone state.
        trap_flag = "yes" if z.has_trap else "no"
        spray_flag = "yes" if z.is_treated else "no"
        lines.append(
            f"{zone_id:>7} {z.infestation_rate:>7.3f} {z.population:>7} {z.temperature:>7.1f} {z.humidity:>5.2f} {z.rained:>5} "
            f"{trap_flag:>6} {z.trap_steps_remaining:>9} {spray_flag:>6} {z.treatment_steps_remaining:>10} {neighbour_str}"
        )

    lines.append("")
    lines.append("Choose your action. Reply with ONLY the JSON object — no explanation, no markdown.")
    return "\n".join(lines)


def _wait_fallback() -> Tuple[VectorBorneDiseaseControlAction, str]:
    """Return the WAIT action and its log string, used whenever a reply is unusable."""
    action = VectorBorneDiseaseControlAction(action_type=ActionType.WAIT, zone_id=None)
    return action, json.dumps({"action_type": 2, "zone_id": None})


def _extract_first_json_object(text: str) -> Tuple[Optional[dict], Optional[json.JSONDecodeError]]:
    """Decode the first well-formed JSON object embedded in ``text``.

    Every ``{`` is tried in order as a potential object start, so surrounding
    prose, markdown fences, or additional objects after the first one do not
    break parsing. Returns ``(payload, None)`` on success, otherwise
    ``(None, first_decode_error)``.
    """
    decoder = json.JSONDecoder()
    first_error: Optional[json.JSONDecodeError] = None
    pos = text.find("{")
    while pos != -1:
        try:
            payload, _ = decoder.raw_decode(text, pos)
        except json.JSONDecodeError as exc:
            if first_error is None:
                first_error = exc
        else:
            # A JSON value that starts with "{" always decodes to a dict.
            return payload, None
        pos = text.find("{", pos + 1)
    return None, first_error


def parse_llm_action(text: str, valid_zone_ids: set) -> Tuple[VectorBorneDiseaseControlAction, str]:
    """
    Parse the LLM's raw text output into a VectorBorneDiseaseControlAction.

    Falls back to WAIT on any parse or validation error: no JSON object in
    the text, an ``action_type`` that is not a valid ``ActionType`` value, or
    (for SPRAY/TRAP) a ``zone_id`` that is missing, not an integer, or not in
    ``valid_zone_ids``. For WAIT the ``zone_id`` in the reply is ignored.

    Returns (action, action_str_for_logging).
    """
    fallback_action, fallback_str = _wait_fallback()

    text = text.strip()
    if "{" not in text or "}" not in text:
        print(f"[DEBUG] No JSON object found in LLM output: {text!r}", flush=True)
        return fallback_action, fallback_str

    # Extract the first JSON object from the response
    payload, decode_error = _extract_first_json_object(text)
    if payload is None:
        print(f"[DEBUG] JSON parse error: {decode_error} | text: {text!r}", flush=True)
        return fallback_action, fallback_str

    raw_action_type = payload.get("action_type")
    raw_zone_id = payload.get("zone_id")

    # Validate action_type
    try:
        action_type = ActionType(int(raw_action_type))
    except (TypeError, ValueError):
        print(f"[DEBUG] Invalid action_type: {raw_action_type!r}", flush=True)
        return fallback_action, fallback_str

    # Validate zone_id for SPRAY/TRAP
    zone_id: Optional[int] = None
    if action_type in (ActionType.SPRAY, ActionType.TRAP):
        if raw_zone_id is None:
            print(
                f"[DEBUG] zone_id required for {action_type.name} but got null — falling back to WAIT",
                flush=True,
            )
            return fallback_action, fallback_str
        try:
            zone_id = int(raw_zone_id)
        except (TypeError, ValueError):
            print(f"[DEBUG] Invalid zone_id: {raw_zone_id!r}", flush=True)
            return fallback_action, fallback_str
        if zone_id not in valid_zone_ids:
            print(f"[DEBUG] zone_id {zone_id} not in valid zones — falling back to WAIT", flush=True)
            return fallback_action, fallback_str

    action = VectorBorneDiseaseControlAction(action_type=action_type, zone_id=zone_id)
    action_str = json.dumps({"action_type": action_type.value, "zone_id": zone_id})
    return action, action_str


def get_model_action(
    client: OpenAI,
    obs: VectorBorneDiseaseControlObservation,
    last_reward: float,
    history: List[str],
) -> Tuple[VectorBorneDiseaseControlAction, str]:
    """Call the LLM with the current observation and return a parsed action.

    The user prompt is the formatted observation, prefixed with the last four
    ``history`` entries when any exist. A failed request is logged and
    answered with the WAIT fallback rather than raised.

    This function is synchronous; it blocks until the completion request
    returns.
    """
    user_prompt = format_observation(obs, last_reward)
    if history:
        user_prompt = "Recent actions:\n" + "\n".join(history[-4:]) + "\n\n" + user_prompt

    valid_zone_ids = set(obs.zones.keys())

    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
    except Exception as exc:
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return _wait_fallback()

    return parse_llm_action(text, valid_zone_ids)


async def main() -> None:
    """Run one episode per entry in ``TASKS`` against a single environment.

    For each task: emit [START], reset the environment, step up to
    ``MAX_STEPS`` times with LLM-chosen actions (one [STEP] line per step),
    then emit [END]. The score is ``1 - cumulative_avg_infestation_rate``
    clamped to [0.01, 0.99]; it stays at the clamped floor when the episode
    raises before scoring. The environment is closed once, after all tasks.

    NOTE(review): ``env.reset()`` is called without the task name, so from
    this file alone every task appears to run the same configuration —
    confirm how the environment selects a task.
    """
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    env = await VectorBorneDiseaseControlEnv.from_docker_image(IMAGE_NAME)

    try:
        for task_name in TASKS:
            history: List[str] = []
            rewards: List[float] = []
            steps_taken = 0
            score = 0.0
            success = False

            log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)

            try:
                result = await env.reset()
                obs: VectorBorneDiseaseControlObservation = result.observation
                last_reward = 0.0

                for step in range(1, MAX_STEPS + 1):
                    if result.done:
                        break

                    # The OpenAI client call is blocking; run it in a worker
                    # thread so the event loop stays responsive meanwhile.
                    action, action_str = await asyncio.to_thread(
                        get_model_action, client, obs, last_reward, history
                    )

                    result = await env.step(action)
                    obs = result.observation

                    reward = result.reward or 0.0
                    done = result.done
                    error = None

                    rewards.append(reward)
                    steps_taken = step
                    last_reward = reward

                    log_step(step=step, action=action_str, reward=reward, done=done, error=error)

                    history.append(
                        f"Step {step}: {action_str} -> reward {reward:+.2f} | avg_infestation {obs.cumulative_avg_infestation_rate:.3f}"
                    )

                    if done:
                        break

                # Score: lower final infestation = higher score
                score = 1.0 - obs.cumulative_avg_infestation_rate
                score = min(max(score, 0.01), 0.99)
                success = score >= SUCCESS_SCORE_THRESHOLD

            except Exception as e:
                print(f"[DEBUG] Task {task_name} error: {e}", flush=True)

            finally:
                # Emitted from `finally` so every [START] is paired with an
                # [END], even if the episode is interrupted or cancelled.
                score = max(0.01, min(0.99, score))
                log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    finally:
        try:
            await env.close()
        except Exception as e:
            print(f"[DEBUG] env.close() error (container cleanup): {e}", flush=True)


if __name__ == "__main__":
    asyncio.run(main())