Spaces:
Sleeping
Sleeping
File size: 5,175 Bytes
a95dc70 39605fc a95dc70 39605fc a95dc70 39605fc a95dc70 39605fc a95dc70 48e1617 39605fc a95dc70 39605fc a95dc70 39605fc a95dc70 39605fc a95dc70 39605fc a95dc70 39605fc d57c77b 39605fc a95dc70 39605fc a95dc70 39605fc d57c77b 39605fc d57c77b 39605fc a95dc70 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 | import json
import os
import sys
import time
import textwrap
from typing import List, Optional
from openai import OpenAI
import httpx
# ---------------------------------------------------------------------------
# Config (MANDATORY per Checklist)
# ---------------------------------------------------------------------------
# Inference endpoint and model name; both can be overridden from the environment.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# HF_TOKEN wins; API_KEY is consulted only when HF_TOKEN is unset or empty.
API_KEY = os.environ.get("HF_TOKEN") or os.environ.get("API_KEY")
# Environment Server URL
ENV_URL = os.environ.get("ENV_URL", "http://localhost:7860")

if not API_KEY:
    # Report the missing credential and stop here instead of failing later
    # with an unhandled exception.
    print("ERROR: HF_TOKEN or API_KEY environment variable is required", flush=True)
    sys.exit(1)

# OpenAI Client configured via environment variables
client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
# ---------------------------------------------------------------------------
# Logging Utilities
# ---------------------------------------------------------------------------
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` record for an episode to stdout and flush it.

    Args:
        task: Task name placed in the ``task=`` field.
        env: Environment name placed in the ``env=`` field.
        model: Model identifier placed in the ``model=`` field.
    """
    fields = ("[START]", f"task={task}", f"env={env}", f"model={model}")
    print(" ".join(fields), flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` record to stdout and flush it.

    Args:
        step: Step number placed in the ``step=`` field.
        action: Action label placed in the ``action=`` field.
        reward: Step reward, rendered with two decimal places.
        done: Episode-finished flag, rendered in lowercase (``true``/``false``).
        error: Error text for the step; a falsy value is rendered as ``null``.
    """
    fields = (
        f"step={step}",
        f"action={action}",
        f"reward={reward:.2f}",
        f"done={str(done).lower()}",
        f"error={error or 'null'}",
    )
    # print() joins its positional arguments with single spaces.
    print("[STEP]", *fields, flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the closing ``[END]`` record for an episode to stdout and flush it.

    Args:
        success: Outcome flag, rendered in lowercase (``true``/``false``).
        steps: Step count placed in the ``steps=`` field.
        score: Final score, rendered with three decimal places.
        rewards: Per-step rewards, each rendered with two decimal places and
            joined with commas (an empty list yields an empty ``rewards=``).
    """
    formatted_rewards = [format(value, ".2f") for value in rewards]
    print(
        "[END]",
        f"success={str(success).lower()}",
        f"steps={steps}",
        f"score={score:.3f}",
        "rewards=" + ",".join(formatted_rewards),
        flush=True,
    )
# ---------------------------------------------------------------------------
# Agent Logic
# ---------------------------------------------------------------------------
# System message sent with every model request (see get_action). It states the
# agent's goal and the JSON action schema the model is asked to reply with.
SYSTEM_PROMPT = """You are an Eco-Resilient Logistics Agent.
Your goal is to fulfill orders while minimizing CO2.
Available Actions: {"action_type": "order_parts | produce | offset | skip", "part_type": "chips | sensors | batteries | casing", "quantity": count, "mode": "sea | air | rail | road", "product": "EcoPhone | GreenTab"}
Respond ONLY with a valid JSON object."""
def get_action(obs) -> dict:
    """Ask the model for the next action given the current observation.

    The observation is serialized to JSON and sent as the user message, with
    ``SYSTEM_PROMPT`` as the system message. The request asks for a JSON-object
    response and uses a 15 second timeout.

    Args:
        obs: JSON-serializable observation from the environment.

    Returns:
        The parsed action as a dict. ``{"action_type": "skip"}`` is returned
        when the request fails, the reply is not valid JSON, or the reply
        parses to something other than a JSON object.
    """
    prompt = f"Current Observation: {json.dumps(obs)}\nChoose next action:"
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            response_format={"type": "json_object"},
            timeout=15.0,
        )
        action = json.loads(response.choices[0].message.content)
    except Exception as e:
        # Emergency fallback to prevent script crash
        print(f"[DEBUG] Model error: {e}", file=sys.stderr)
        return {"action_type": "skip"}

    # Valid JSON is not necessarily an object (it may be a list, string or
    # number). Callers use dict methods on the result, so reject anything else.
    if not isinstance(action, dict):
        print(
            f"[DEBUG] Model error: expected a JSON object, got {type(action).__name__}",
            file=sys.stderr,
        )
        return {"action_type": "skip"}
    return action
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def run_task(task_name: str):
    """Run one episode of ``task_name`` against the environment server and log it.

    Emits a ``[START]`` record, POSTs the task name to ``/reset``, then loops:
    ask ``get_action`` for an action, POST it to ``/step``, and emit a
    ``[STEP]`` record. The loop stops when the server reports ``done`` or after
    50 steps. An ``[END]`` record is always emitted, even when the episode
    aborts with an error.

    The reported score is taken from ``info["final_score"]`` of the step that
    finished the episode and is clamped to ``[0.01, 0.99]``; it stays at 0.01
    when the episode never finishes. ``success`` is true when the clamped score
    is at least 0.95. ``steps`` counts only steps whose response was received
    and parsed, so it always matches the number of logged rewards.

    Args:
        task_name: Task identifier sent to the environment's ``/reset`` route.
    """
    success = False
    score = 0.01  # Initialize to valid strictly-positive value
    steps_taken = 0
    rewards = []
    log_start(task=task_name, env="netzero-nav", model=MODEL_NAME)
    try:
        with httpx.Client(base_url=ENV_URL, timeout=30.0) as app:
            # Reset environment
            reset_resp = app.post("/reset", json={"task": task_name})
            # Surface HTTP failures as such instead of as a later KeyError.
            reset_resp.raise_for_status()
            obs = reset_resp.json()
            done = False
            while not done and steps_taken < 50:
                action_json = get_action(obs)
                # Take step
                step_resp = app.post("/step", json=action_json)
                step_resp.raise_for_status()
                payload = step_resp.json()
                obs = payload["observation"]
                reward = float(payload["reward"] or 0.0)
                done = payload["done"]
                # "info" may be absent or null; treat both as empty.
                info = payload.get("info") or {}
                error = info.get("error")
                # Count the step only once its result is in hand.
                steps_taken += 1
                rewards.append(reward)
                # Format action for logs
                act_type = action_json.get("action_type", "skip")
                act_part = action_json.get("part_type", "")
                act_str = f"{act_type}-{act_part}" if act_part else act_type
                log_step(step=steps_taken, action=act_str, reward=reward, done=done, error=error)
                if done:
                    final_score = info.get("final_score")
                    score = 0.01 if final_score is None else float(final_score)
                    if score != score:
                        # NaN would otherwise clamp to 0.99 and count as success.
                        score = 0.01
                    # Clamp strictly between 0 and 1
                    score = max(0.01, min(0.99, score))
                    success = score >= 0.95
    except Exception as e:
        print(f"[DEBUG] Runtime error during task {task_name}: {e}", file=sys.stderr)
    finally:
        # Final safety clamp before mandatory STDOUT
        score = max(0.01, min(0.99, score))
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
# Script entry point: run every difficulty level in order, one episode each.
if __name__ == "__main__":
    for task in ("easy", "medium", "hard"):
        run_task(task)
|