Macromrit's picture
Upload folder using huggingface_hub
bab7319 verified
"""
Inference Script — Vector Borne Disease Control
===================================
MANDATORY
- Before submitting, ensure the following variables are defined in your environment configuration:
API_BASE_URL The API endpoint for the LLM.
MODEL_NAME The model identifier to use for inference.
HF_TOKEN Your Hugging Face / API key.
IMAGE_NAME The name of the local Docker image to use for the environment.
- Defaults are set only for API_BASE_URL and MODEL_NAME
(and should reflect your active inference setup):
API_BASE_URL = os.getenv("API_BASE_URL", "<your-active-endpoint>")
MODEL_NAME = os.getenv("MODEL_NAME", "<your-active-model>")
- The inference script must be named `inference.py` and placed in the root directory of the project
- Participants must use OpenAI Client for all LLM calls using above variables
STDOUT FORMAT
- The script must emit exactly three line types to stdout, in this order:
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> rewards=<r1,r2,...,rn>
Rules:
- One [START] line at episode begin.
- One [STEP] line per step, immediately after env.step() returns.
- One [END] line after env.close(), always emitted (even on exception).
- reward and rewards are formatted to 2 decimal places.
- done and success are lowercase booleans: true or false.
- error is the raw last_action_error string, or null if none.
- All fields on a single line with no newlines within a line.
Example:
[START] task=mosquito-control env=vector_borne_disease_control model=Qwen2.5-72B-Instruct
[STEP] step=1 action={"action_type":0,"zone_id":3} reward=1.20 done=false error=null
[STEP] step=2 action={"action_type":1,"zone_id":7} reward=0.50 done=false error=null
[STEP] step=3 action={"action_type":2,"zone_id":null} reward=-0.01 done=true error=null
[END] success=true steps=3 rewards=1.20,0.50,-0.01
"""
import asyncio
import json
import os
import textwrap
from typing import List, Optional, Tuple
from dotenv import load_dotenv
load_dotenv()
from openai import OpenAI
from vector_borne_disease_control import VectorBorneDiseaseControlEnv, VectorBorneDiseaseControlAction
from vector_borne_disease_control.models import ActionType, VectorBorneDiseaseControlObservation
IMAGE_NAME = os.getenv("IMAGE_NAME") or "openenv-vector_borne_disease_control"
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
BENCHMARK = "vector_borne_disease_control"
TASKS = ["task1", "task2", "task3"]
MAX_STEPS = 30 # environment runs for 30 days (steps)
TEMPERATURE = 0.5 # lower temp for more deterministic action selection
MAX_TOKENS = 200
SUCCESS_SCORE_THRESHOLD = 0.5 # score = 1 - final_avg_infestation_rate
SYSTEM_PROMPT = textwrap.dedent(
"""
You are an expert epidemiologist controlling mosquito-borne disease across a city modelled as a graph of zones.
Each step you receive the current state of all zones and must choose exactly one action:
- SPRAY (action_type=0, zone_id=<int>): Apply pesticide to a zone. Reduces infestation by ~85% in that zone and ~9% in its neighbours. Lasts 2 steps.
- TRAP (action_type=1, zone_id=<int>): Deploy a trap in a zone. Slows growth to 40% and spread probability to 50%. Lasts 3 steps.
- WAIT (action_type=2, zone_id=null): Do nothing this step.
You MUST reply with ONLY a single JSON object on one line, no other text:
{"action_type": <0|1|2>, "zone_id": <int|null>}
Examples:
{"action_type": 0, "zone_id": 5}
{"action_type": 1, "zone_id": 12}
{"action_type": 2, "zone_id": null}
"""
).strip()
def log_start(task: str, env: str, model: str) -> None:
print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
error_val = error if error else "null"
done_val = str(done).lower()
print(
f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
flush=True,
)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
rewards_str = ",".join(f"{r:.2f}" for r in rewards)
print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True)
def format_observation(obs: VectorBorneDiseaseControlObservation, last_reward: float) -> str:
"""Render the observation as a human-readable text block for the LLM."""
lines: List[str] = []
lines.append(f"Step: {obs.step} / {MAX_STEPS}")
lines.append(f"Resources — spray remaining: {obs.remaining_spray}, traps remaining: {obs.remaining_traps}")
lines.append(f"Cumulative avg infestation rate: {obs.cumulative_avg_infestation_rate:.3f} (last reward: {last_reward:.2f})")
lines.append("")
# Sort zones by infestation rate descending so the LLM sees the worst first
sorted_zones = sorted(obs.zones.items(), key=lambda kv: kv[1].infestation_rate, reverse=True)
lines.append("Zone states (sorted by infestation, worst first):")
lines.append(f"{'ZoneID':>7} {'Infest':>7} {'Pop':>7} {'Temp°C':>7} {'Hum':>5} {'Rain':>5} {'Trap':>6} {'TrapLeft':>9} {'Spray':>6} {'SprayLeft':>10} {'Neighbours'}")
for zone_id, z in sorted_zones:
neighbours = obs.adjacency.get(zone_id, [])
# Flag infested neighbours
infested_neighbours = [str(n) for n in neighbours if obs.zones.get(n, None) and obs.zones[n].infestation_rate > 0.1]
neighbour_str = f"[{','.join(infested_neighbours)}] infested / [{','.join(str(n) for n in neighbours)}] all"
lines.append(
f"{zone_id:>7} {z.infestation_rate:>7.3f} {z.population:>7} {z.temperature:>7.1f} {z.humidity:>5.2f} {z.rained:>5} "
f"{'yes':>6} {z.trap_steps_remaining:>9} {'yes':>6} {z.treatment_steps_remaining:>10} {neighbour_str}"
if z.has_trap and z.is_treated else
f"{zone_id:>7} {z.infestation_rate:>7.3f} {z.population:>7} {z.temperature:>7.1f} {z.humidity:>5.2f} {z.rained:>5} "
f"{'yes' if z.has_trap else 'no':>6} {z.trap_steps_remaining:>9} {'yes' if z.is_treated else 'no':>6} {z.treatment_steps_remaining:>10} {neighbour_str}"
)
lines.append("")
lines.append("Choose your action. Reply with ONLY the JSON object — no explanation, no markdown.")
return "\n".join(lines)
def parse_llm_action(text: str, valid_zone_ids: set) -> Tuple[VectorBorneDiseaseControlAction, str]:
"""
Parse the LLM's raw text output into a VectorBorneDiseaseControlAction.
Falls back to WAIT on any parse error.
Returns (action, action_str_for_logging).
"""
fallback_action = VectorBorneDiseaseControlAction(action_type=ActionType.WAIT, zone_id=None)
fallback_str = json.dumps({"action_type": 2, "zone_id": None})
# Extract the first JSON object from the response
text = text.strip()
brace_start = text.find("{")
brace_end = text.rfind("}")
if brace_start == -1 or brace_end == -1:
print(f"[DEBUG] No JSON object found in LLM output: {text!r}", flush=True)
return fallback_action, fallback_str
try:
payload = json.loads(text[brace_start : brace_end + 1])
except json.JSONDecodeError as exc:
print(f"[DEBUG] JSON parse error: {exc} | text: {text!r}", flush=True)
return fallback_action, fallback_str
raw_action_type = payload.get("action_type")
raw_zone_id = payload.get("zone_id")
# Validate action_type
try:
action_type = ActionType(int(raw_action_type))
except (TypeError, ValueError):
print(f"[DEBUG] Invalid action_type: {raw_action_type!r}", flush=True)
return fallback_action, fallback_str
# Validate zone_id for SPRAY/TRAP
zone_id: Optional[int] = None
if action_type in (ActionType.SPRAY, ActionType.TRAP):
if raw_zone_id is None:
print(f"[DEBUG] zone_id required for {action_type.name} but got null — falling back to WAIT", flush=True)
return fallback_action, fallback_str
try:
zone_id = int(raw_zone_id)
except (TypeError, ValueError):
print(f"[DEBUG] Invalid zone_id: {raw_zone_id!r}", flush=True)
return fallback_action, fallback_str
if zone_id not in valid_zone_ids:
print(f"[DEBUG] zone_id {zone_id} not in valid zones — falling back to WAIT", flush=True)
return fallback_action, fallback_str
action = VectorBorneDiseaseControlAction(action_type=action_type, zone_id=zone_id)
action_str = json.dumps({"action_type": action_type.value, "zone_id": zone_id})
return action, action_str
def get_model_action(
client: OpenAI,
obs: VectorBorneDiseaseControlObservation,
last_reward: float,
history: List[str],
) -> Tuple[VectorBorneDiseaseControlAction, str]:
"""Call the LLM with the current observation and return a parsed action."""
user_prompt = format_observation(obs, last_reward)
if history:
user_prompt = "Recent actions:\n" + "\n".join(history[-4:]) + "\n\n" + user_prompt
valid_zone_ids = set(obs.zones.keys())
try:
completion = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_prompt},
],
temperature=TEMPERATURE,
max_tokens=MAX_TOKENS,
stream=False,
)
text = (completion.choices[0].message.content or "").strip()
except Exception as exc:
print(f"[DEBUG] Model request failed: {exc}", flush=True)
fallback = VectorBorneDiseaseControlAction(action_type=ActionType.WAIT, zone_id=None)
return fallback, json.dumps({"action_type": 2, "zone_id": None})
return parse_llm_action(text, valid_zone_ids)
async def main() -> None:
client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
env = await VectorBorneDiseaseControlEnv.from_docker_image(IMAGE_NAME)
try:
for task_name in TASKS:
history: List[str] = []
rewards: List[float] = []
steps_taken = 0
score = 0.0
success = False
log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)
try:
result = await env.reset()
obs: VectorBorneDiseaseControlObservation = result.observation
last_reward = 0.0
for step in range(1, MAX_STEPS + 1):
if result.done:
break
action, action_str = get_model_action(client, obs, last_reward, history)
result = await env.step(action)
obs = result.observation
reward = result.reward or 0.0
done = result.done
error = None
rewards.append(reward)
steps_taken = step
last_reward = reward
log_step(step=step, action=action_str, reward=reward, done=done, error=error)
history.append(
f"Step {step}: {action_str} -> reward {reward:+.2f} | avg_infestation {obs.cumulative_avg_infestation_rate:.3f}"
)
if done:
break
# Score: lower final infestation = higher score
score = 1.0 - obs.cumulative_avg_infestation_rate
score = min(max(score, 0.01), 0.99)
success = score >= SUCCESS_SCORE_THRESHOLD
except Exception as e:
print(f"[DEBUG] Task {task_name} error: {e}", flush=True)
score = max(0.01, min(0.99, score))
log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
finally:
try:
await env.close()
except Exception as e:
print(f"[DEBUG] env.close() error (container cleanup): {e}", flush=True)
if __name__ == "__main__":
asyncio.run(main())