| from __future__ import annotations |
|
|
| import json |
| import os |
| import sys |
| from pathlib import Path |
| from typing import Any, Callable |
|
|
| from openai import OpenAI |
|
|
| sys.path.insert(0, str(Path(__file__).resolve().parent / "src")) |
|
|
| from delivery_dispatch_v3.environment import V3DeliveryDispatchEnv |
| from delivery_dispatch_v3.models import V3Action, V3Observation |
| from delivery_dispatch_v3.policies import heuristic_policy |
|
|
|
|
# Runtime configuration, read once from the process environment at import time.
# API_BASE_URL is optional; it is only forwarded to the client when set.
API_BASE_URL = os.environ.get("API_BASE_URL")
MODEL_NAME = os.environ.get("MODEL_NAME", "gpt-4.1-mini")
# HF_TOKEN takes precedence; OPENAI_API_KEY is consulted only when it is unset or empty.
API_KEY = os.environ.get("HF_TOKEN") or os.environ.get("OPENAI_API_KEY")
# Minimum graded score for a finished run to be reported as a success.
SUCCESS_SCORE_THRESHOLD = 0.1
|
|
# Environment identifier reported in the [START] log line.
ENV_NAME = "fleetmind_v3"

# Fixed public seed for each evaluation task, keyed by task id.
EVALUATION_PUBLIC_SEEDS = {
    "easy_dispatch": 17031,
    "medium_dispatch": 27031,
    "hard_dispatch": 37031,
}

# Callable shape of a dispatch policy: takes an observation, returns an action.
PolicyFn = Callable[[V3Observation], V3Action]
# System message sent with every LLM request: pins the JSON reply shape and
# states the allocation constraints the model must respect.
SYSTEM_PROMPT = (
    "You are playing a delivery fleet allocation benchmark. "
    "Return JSON only with the shape "
    '{"target_allocations": [{"zone_id": "...", "courier_count": 0}]}. '
    "You will be given zone-level courier counts, visible demand, per-order rewards, "
    "and congestion multipliers. Optimize long-term cumulative reward, not just immediate demand. "
    "Respect the total courier count and include every zone exactly once."
)
|
|
|
|
| def _format_bool(value: bool) -> str: |
| return "true" if value else "false" |
|
|
|
|
| def _format_reward(value: float) -> str: |
| return f"{value:.2f}" |
|
|
|
|
| def _action_str(action: V3Action) -> str: |
| return json.dumps(action.model_dump(mode="json"), separators=(",", ":")) |
|
|
|
|
def _print_start(task_id: str) -> None:
    """Print the ``[START]`` log line naming the task, environment and model."""
    line = f"[START] task={task_id} env={ENV_NAME} model={MODEL_NAME}"
    print(line, flush=True)
|
|
|
|
def _print_step(step_index: int, action: V3Action, reward: float, done: bool, error: str | None) -> None:
    """Print one ``[STEP]`` log line.

    Args:
        step_index: Step number to report.
        action: Action taken at this step; logged as compact JSON.
        reward: Step reward; logged with two decimal places.
        done: Whether the episode ended on this step.
        error: Error text to report, or ``None`` (logged as ``null``).
    """
    error_value = "null" if error is None else error
    fields = (
        f"step={step_index}",
        f"action={_action_str(action)}",
        f"reward={_format_reward(reward)}",
        f"done={_format_bool(done)}",
        f"error={error_value}",
    )
    print("[STEP] " + " ".join(fields), flush=True)
|
|
|
|
def _print_end(success: bool, rewards: list[float], score: float | None = None) -> None:
    """Print the closing ``[END]`` log line for a run.

    Args:
        success: Overall success flag for the run.
        rewards: Per-step rewards; their count is reported as ``steps`` and
            the values are logged comma-separated with two decimals each.
        score: Final score, logged with four decimals, or ``null`` when ``None``.
    """
    reward_values = ",".join(map(_format_reward, rewards))
    if score is None:
        score_value = "null"
    else:
        score_value = f"{score:.4f}"
    summary = (
        f"[END] success={_format_bool(success)} steps={len(rewards)} "
        f"score={score_value} rewards={reward_values}"
    )
    print(summary, flush=True)
|
|
|
|
def llm_configured() -> bool:
    """Return ``True`` only when both an API key and a model name are set."""
    if not API_KEY:
        return False
    return bool(MODEL_NAME)
|
|
|
|
def build_client() -> OpenAI:
    """Create an ``OpenAI`` client from the module-level credentials.

    ``base_url`` is passed only when ``API_BASE_URL`` is set; otherwise the
    client is constructed with the API key alone.
    """
    if API_BASE_URL:
        return OpenAI(api_key=API_KEY, base_url=API_BASE_URL)
    return OpenAI(api_key=API_KEY)
|
|
|
|
def parse_action(raw_text: str) -> V3Action:
    """Turn raw model output into a ``V3Action``, falling back to a default action.

    The text is first parsed as JSON as-is. If that fails, the span from the
    first ``{`` to the last ``}`` is parsed instead, which tolerates prose or
    code fences around the object. Any decode or validation failure yields
    ``V3Action()`` rather than an exception.

    Args:
        raw_text: Text returned by the model.

    Returns:
        The validated action, or a default-constructed ``V3Action``.
    """
    try:
        payload: Any = json.loads(raw_text)
    except json.JSONDecodeError:
        open_idx = raw_text.find("{")
        close_idx = raw_text.rfind("}")
        # Need an opening brace that sits strictly before the closing brace.
        if not (0 <= open_idx < close_idx):
            return V3Action()
        candidate = raw_text[open_idx : close_idx + 1]
        try:
            payload = json.loads(candidate)
        except json.JSONDecodeError:
            return V3Action()

    try:
        action = V3Action.model_validate(payload)
    except Exception:
        # Best-effort by design: a malformed payload degrades to the default action.
        action = V3Action()
    return action
|
|
|
|
def choose_action_with_llm(observation: V3Observation) -> V3Action:
    """Ask the chat model for an action and parse its reply.

    The observation is sent as compact JSON in the user message, preceded by
    ``SYSTEM_PROMPT``. A missing reply body is treated as empty text.

    Args:
        observation: Current environment observation.

    Returns:
        The action parsed from the model's reply by ``parse_action``.
    """
    client = build_client()
    observation_json = json.dumps(observation.model_dump(mode="json"), separators=(",", ":"))
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": observation_json},
    ]
    completion = client.chat.completions.create(
        model=MODEL_NAME,
        messages=messages,
        temperature=0.1,
    )
    reply = completion.choices[0].message.content
    return parse_action(reply or "")
|
|
|
|
def fallback_policy(observation: V3Observation) -> V3Action:
    """Return the action chosen by ``heuristic_policy`` for *observation*."""
    action = heuristic_policy(observation)
    return action
|
|
|
|
def choose_action(observation: V3Observation, prefer_llm: bool) -> tuple[V3Action, str | None]:
    """Pick an action, using the LLM when requested and available.

    Args:
        observation: Current environment observation.
        prefer_llm: When false, the fallback policy is used and no error is reported.

    Returns:
        ``(action, error)``. ``error`` is ``None`` when the chosen path worked
        as requested, a fixed notice when the LLM was requested but is not
        configured, or the exception text when the LLM call failed and the
        fallback policy was used instead.
    """
    if not prefer_llm:
        return fallback_policy(observation), None
    if not llm_configured():
        return fallback_policy(observation), "LLM config missing; using deterministic fallback"
    try:
        action = choose_action_with_llm(observation)
    except Exception as exc:
        return fallback_policy(observation), str(exc)
    return action, None
|
|
|
|
def run_task(task_id: str, seed: int, prefer_llm: bool = True) -> dict[str, Any]:
    """Run one episode of *task_id* and log it as ``[START]``/``[STEP]``/``[END]`` lines.

    The environment is stepped until it reports ``done``. An exception raised
    inside the step loop ends the episode: it is reported as a final
    ``[STEP]`` line carrying the error text, a default action and reward 0.0.
    The ``[END]`` line is always printed once the loop has been entered.

    Args:
        task_id: Task to run; also used as the environment's default task id.
        seed: Seed passed to ``env.reset``.
        prefer_llm: Forwarded to ``choose_action``.

    Returns:
        A dict with ``task_id``, ``seed``, ``raw_reward``, ``baseline_reward``,
        ``target_reward``, ``score`` and ``heuristic_reward``. When no episode
        summary was captured, the numeric fields are ``0.0`` and
        ``heuristic_reward`` is ``None``.

    Raises:
        Exception: Errors from constructing or resetting the environment are
            not caught here. A captured summary that lacks an expected key
            raises ``KeyError``.
    """
    env = V3DeliveryDispatchEnv(default_task_id=task_id)
    observation = env.reset(task_id=task_id, seed=seed)
    rewards: list[float] = []
    step_index = 0
    success = False
    final_summary: dict[str, Any] | None = None

    _print_start(task_id)

    try:
        done = False
        while not done:
            step_index += 1
            action, error = choose_action(observation, prefer_llm=prefer_llm)
            result = env.step(action)
            observation = result.observation
            rewards.append(result.reward.step_reward)
            done = result.done
            if done:
                # The summary is only read from the terminal step's info dict.
                final_summary = result.info.get("episode_summary") if isinstance(result.info, dict) else None
            _print_step(step_index, action, result.reward.step_reward, done, error)
        success = True
    except Exception as exc:
        # step_index was already advanced for the step that failed, so report
        # the error against that same number instead of skipping one.
        _print_step(step_index, V3Action(), 0.0, True, str(exc))
    finally:
        score = None if final_summary is None else float(final_summary["graded_score"])
        # Success needs a clean loop exit AND a graded score at or above the threshold.
        success = success and score is not None and score >= SUCCESS_SCORE_THRESHOLD
        _print_end(success, rewards, score=score)

    if final_summary is None:
        return {
            "task_id": task_id,
            "seed": seed,
            "raw_reward": 0.0,
            "baseline_reward": 0.0,
            "target_reward": 0.0,
            "score": 0.0,
            "heuristic_reward": None,
        }
    return {
        "task_id": task_id,
        "seed": seed,
        "raw_reward": float(final_summary["raw_reward"]),
        "baseline_reward": float(final_summary["baseline_reward"]),
        "target_reward": float(final_summary["target_reward"]),
        "score": float(final_summary["graded_score"]),
        "heuristic_reward": final_summary.get("heuristic_reward"),
    }
|
|
|
|
def score_tasks(policy_name: str = "baseline") -> dict[str, Any]:
    """Run every public evaluation task and average the resulting scores.

    Args:
        policy_name: ``"baseline"`` runs the fallback policy only; any other
            value asks ``run_task`` to prefer the LLM.

    Returns:
        A dict with the per-task results under ``tasks``, their mean score
        under ``overall_score``, and ``mode`` set to ``"llm-first"`` or
        ``"deterministic-fallback"``.
    """
    prefer_llm = policy_name != "baseline"
    task_results: list[dict[str, Any]] = [
        run_task(task_id=task_id, seed=seed, prefer_llm=prefer_llm)
        for task_id, seed in EVALUATION_PUBLIC_SEEDS.items()
    ]
    total_score = sum(entry["score"] for entry in task_results)
    if prefer_llm:
        mode = "llm-first"
    else:
        mode = "deterministic-fallback"
    return {
        "tasks": task_results,
        "overall_score": total_score / len(task_results),
        "mode": mode,
    }
|
|
|
|
def main() -> None:
    """Run each public evaluation task once with the LLM preferred.

    The per-task result dicts are discarded; output is the log lines printed
    by ``run_task``.
    """
    for name, public_seed in EVALUATION_PUBLIC_SEEDS.items():
        run_task(task_id=name, seed=public_seed, prefer_llm=True)
|
|
|
|
# Run the evaluation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|