permit-pathfinder / server /permit_env_environment.py
yashppawar's picture
Upload folder using huggingface_hub
b22b2e7 verified
"""
PermitPathfinder Environment.
An agent opens a business by navigating a DAG of municipal permits.
Each permit progresses through 5 effective stages:
LOCKED β†’ AVAILABLE β†’ APPROVED β†’ PAID β†’ ISSUED
- LOCKED: prerequisites not yet met (auto-unlocks when all prereqs ISSUED)
- AVAILABLE: ready to submit (action: submit β†’ instantly APPROVED)
- APPROVED: ready to pay (action: pay β†’ deducts fee, moves to PAID)
- PAID: ready for inspection (action: inspect β†’ ISSUED)
- ISSUED: complete β€” may unlock downstream permits
Permits have prerequisites: a permit becomes AVAILABLE only when all
prerequisite permits are ISSUED. Acting on a locked/wrong-stage permit
is a "wasted submission" that docks reward.
Three tasks expose difficulty progression:
easy_foodtruck β€” 3 permits, no dependencies, budget=500
medium_cafe β€” 6 permits, 2 dependency chains, budget=1000
hard_restaurant β€” 10 permits across 3 agencies, cross-deps, budget=2500
Reward is dense partial-credit per stage progression + budget bonus
βˆ’ waste penalty, clamped to [0.0, 1.0]. Episodes end when all permits
are ISSUED, budget runs out with unfinished work, or max_steps reached.
"""
import os
import random
from typing import Optional
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import PermitAction, PermitObservation
except ImportError:
from models import PermitAction, PermitObservation
# ---------- Stage constants ----------
# Every stage a permit can hold, from least to most advanced. The position
# of a stage in this list is its numeric progress value for reward purposes.
# "submitted" and "inspected" are transient: the environment passes through
# them within a single action, but they still occupy a slot in the ordering.
STAGE_ORDER = [
    "locked",
    "available",
    "submitted",
    "approved",
    "paid",
    "inspected",
    "issued",
]
# Named handles for each stage, bound positionally from STAGE_ORDER.
(
    STAGE_LOCKED,
    STAGE_AVAILABLE,
    STAGE_SUBMITTED,
    STAGE_APPROVED,
    STAGE_PAID,
    STAGE_INSPECTED,
    STAGE_ISSUED,
) = STAGE_ORDER
# Index of the final stage; used to normalise stage progress into [0, 1].
MAX_STAGE_VALUE = len(STAGE_ORDER) - 1  # 6
# ---------- Task definitions ----------
# Static task catalogue, keyed by task name. Each entry provides:
#   name / description   - human-readable labels
#   budget               - base budget (jittered per episode at task init)
#   max_steps            - step cap for the episode
#   hidden_prereq_count  - how many permits start with masked prerequisites
#   inquiry_budget       - number of free queries (None = unlimited)
#   inquiry_cost         - budget charged per query beyond the free allowance
#   permits              - permit id -> {"fee": base fee, "prereqs": [permit ids]}
# Treat this table as read-only; per-episode state is copied out of it.
TASKS: dict = {
    "easy_foodtruck": {
        "name": "Food Truck Permit",
        "description": "Open a mobile food vendor. 3 permits, no dependencies.",
        "budget": 500.0,
        "max_steps": 20,
        "hidden_prereq_count": 0,
        "inquiry_budget": None,
        "inquiry_cost": 0,
        "permits": {
            "business_license": {"fee": 50.0, "prereqs": []},
            "food_handler_cert": {"fee": 30.0, "prereqs": []},
            "mobile_vendor_permit": {"fee": 100.0, "prereqs": []},
        },
    },
    "medium_cafe": {
        # Labels restored from mis-decoded UTF-8 ("CafΓ©" -> "Café").
        "name": "Neighborhood Café",
        "description": "Open a 20-seat café. 6 permits, 2 dependency chains.",
        "budget": 1000.0,
        "max_steps": 40,
        "hidden_prereq_count": 2,
        "inquiry_budget": 5,
        "inquiry_cost": 25,
        "permits": {
            "business_license": {"fee": 50.0, "prereqs": []},
            "zoning_approval": {"fee": 100.0, "prereqs": []},
            "health_permit": {"fee": 150.0, "prereqs": ["zoning_approval"]},
            "fire_inspection": {"fee": 75.0, "prereqs": ["zoning_approval"]},
            "signage_permit": {"fee": 40.0, "prereqs": ["business_license"]},
            "food_service_license": {
                "fee": 200.0,
                "prereqs": ["health_permit", "fire_inspection"],
            },
        },
    },
    "hard_restaurant": {
        "name": "Full-Service Restaurant",
        "description": (
            "Open a full restaurant with bar. 10 permits across 3 agencies, "
            "cross-dependencies, and a random missing-document event."
        ),
        "budget": 2500.0,
        "max_steps": 70,
        "hidden_prereq_count": 5,
        "inquiry_budget": 3,
        "inquiry_cost": 50,
        "permits": {
            "business_license": {"fee": 75.0, "prereqs": []},
            "zoning_variance": {"fee": 200.0, "prereqs": []},
            "building_permit": {"fee": 300.0, "prereqs": ["zoning_variance"]},
            "plumbing_permit": {"fee": 150.0, "prereqs": ["building_permit"]},
            "electrical_permit": {"fee": 150.0, "prereqs": ["building_permit"]},
            "hvac_permit": {"fee": 150.0, "prereqs": ["building_permit"]},
            "health_permit": {"fee": 200.0, "prereqs": ["plumbing_permit"]},
            "fire_certificate": {
                "fee": 250.0,
                "prereqs": ["electrical_permit", "hvac_permit"],
            },
            "liquor_license": {
                "fee": 500.0,
                "prereqs": ["business_license", "zoning_variance"],
            },
            "food_service_license": {
                "fee": 300.0,
                "prereqs": ["health_permit", "fire_certificate"],
            },
        },
    },
}
class PermitEnvironment(Environment):
"""
PermitPathfinder environment β€” navigate a municipal permit DAG.
Action space:
PermitAction(action_type, permit_id)
action_type ∈ {submit, pay, inspect, query, list, set_task}
set_task uses permit_id field to carry the task name (workaround
for reset() not accepting kwargs in the base Environment interface).
Observation:
PermitObservation with current permit states, budget, errors,
and the list of legal actions.
Reward:
Computed at every step. Dense partial credit: mean stage progress
across all required permits + budget bonus βˆ’ waste penalty.
Clamped to [0.0, 1.0].
"""
SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(self):
    """Create the environment and load its initial task.

    The starting task is read from the ``PERMIT_TASK`` environment
    variable; a missing or unrecognised value selects ``easy_foodtruck``.
    """
    fallback = "easy_foodtruck"
    self._state = State(episode_id=str(uuid4()), step_count=0)
    self._seed: Optional[int] = None
    requested = os.getenv("PERMIT_TASK", fallback)
    self._init_task(requested if requested in TASKS else fallback)
# ---------- Task lifecycle ----------
def _derive_rng(self) -> random.Random:
"""Build a deterministic RNG.
When a seed is provided, the RNG depends ONLY on (seed, task_name)
so identical seeds produce identical episodes β€” required for
reproducibility. When no seed is given, the random episode_id
provides per-reset variation.
"""
if self._seed is not None:
key = f"{self._seed}|{self._task_name}"
else:
key = f"{self._state.episode_id}|{self._task_name}"
return random.Random(hash(key) & 0xFFFFFFFF)
def _init_task(self, task_name: str) -> None:
    """Load ``TASKS[task_name]`` and build fresh per-episode state.

    Randomness is drawn from the generator returned by ``_derive_rng``
    in this order: budget jitter (+/-10%), permit order shuffle, one fee
    jitter (+/-20%) per permit in shuffled order and, when the task hides
    prerequisites, a shuffle of the permits eligible for hiding.
    """
    spec = TASKS[task_name]
    # _derive_rng reads the task name, so it must be set first.
    self._task_name = task_name
    self._max_steps = spec["max_steps"]
    self._wasted = 0
    self._rng = self._derive_rng()
    rng = self._rng

    self._budget = round(spec["budget"] * (1.0 + rng.uniform(-0.10, 0.10)), 2)
    # Kept so later budget fractions are relative to the starting amount.
    self._initial_budget = self._budget

    # Shuffle permit order, then jitter each fee in that order.
    shuffled = list(spec["permits"].items())
    rng.shuffle(shuffled)
    self._permits = {
        pid: {
            "fee": round(cfg["fee"] * (1.0 + rng.uniform(-0.20, 0.20)), 2),
            # Copy so episode-level edits never touch the TASKS table.
            "prereqs": list(cfg["prereqs"]),
            "stage": STAGE_LOCKED if cfg["prereqs"] else STAGE_AVAILABLE,
        }
        for pid, cfg in shuffled
    }

    # Partial observability: mask the prerequisites of some permits. The
    # real list moves to _hidden_prereqs and a placeholder takes its place.
    self._hidden_prereqs: dict[str, list] = {}
    n_hidden = spec.get("hidden_prereq_count", 0)
    if n_hidden > 0:
        gated = [pid for pid, entry in self._permits.items() if entry["prereqs"]]
        rng.shuffle(gated)
        for pid in gated[:n_hidden]:
            entry = self._permits[pid]
            self._hidden_prereqs[pid] = list(entry["prereqs"])
            entry["prereqs"] = ["???"]

    # Inquiry accounting; an inquiry budget of None means unlimited.
    self._queries_used: int = 0
    self._inquiry_budget = spec.get("inquiry_budget")
    self._inquiry_cost = spec.get("inquiry_cost", 0)

    # Episode flags.
    self._done = False
    self._missing_doc_fired = False
    self._regulation_event_fired = False
def _real_prereqs(self, pid: str) -> list:
"""Return the real prereqs for a permit, whether hidden or visible."""
if pid in self._hidden_prereqs:
return self._hidden_prereqs[pid]
return self._permits[pid]["prereqs"]
def _update_unlocks(self) -> None:
    """Move each LOCKED permit to AVAILABLE once all its real prereqs are ISSUED.

    Runs a single pass over the permits; real (possibly masked)
    prerequisites are consulted through ``_real_prereqs``.
    """
    permits = self._permits
    for pid, permit in permits.items():
        if permit["stage"] == STAGE_LOCKED:
            blocked = any(
                permits[dep]["stage"] != STAGE_ISSUED
                for dep in self._real_prereqs(pid)
            )
            if not blocked:
                permit["stage"] = STAGE_AVAILABLE
def _check_done(self) -> bool:
    """Decide whether the episode has ended.

    The episode is over when every permit is ISSUED, when the step cap
    has been reached, or when no permit can make progress any more.

    Progress is possible while at least one permit is:
      - AVAILABLE, SUBMITTED or PAID (the next action costs nothing),
      - APPROVED with enough budget left to pay its fee, or
      - LOCKED with every real prerequisite already ISSUED (it will
        unlock on the next unlock pass).

    Any other LOCKED permit is not itself a source of progress: it can
    only advance after one of its prerequisites does, and that
    prerequisite is covered by the cases above. Treating every LOCKED
    permit as "still progressing" would keep a stuck episode alive until
    the step cap.

    Returns:
        True if the episode should terminate, False otherwise.
    """
    permits = self._permits
    if all(p["stage"] == STAGE_ISSUED for p in permits.values()):
        return True
    if self._state.step_count >= self._max_steps:
        return True

    free_to_advance = (STAGE_AVAILABLE, STAGE_SUBMITTED, STAGE_PAID)
    for pid, p in permits.items():
        stage = p["stage"]
        if stage in free_to_advance:
            return False
        if stage == STAGE_APPROVED and self._budget >= p["fee"]:
            return False
        if stage == STAGE_LOCKED and all(
            permits[dep]["stage"] == STAGE_ISSUED
            for dep in self._real_prereqs(pid)
        ):
            return False
    # Only unaffordable APPROVED permits and permits locked behind them remain.
    return True
# ---------- Reward ----------
def _compute_reward(self) -> float:
    """Score the current state as progress + budget bonus - waste penalty.

    Progress is the mean normalised stage index over all permits. The
    budget bonus is 10% of the remaining-budget fraction scaled by
    progress, so unspent budget is worth nothing without progress. The
    waste penalty is 0.02 per wasted action, capped at 0.25. The result
    is clamped to [0.0, 1.0]; with no permits the reward is 0.0.
    """
    permits = self._permits
    if not permits:
        return 0.0

    # Accumulate with an explicit loop to keep plain left-to-right float
    # addition.
    stage_sum = 0.0
    for permit in permits.values():
        stage_sum += STAGE_ORDER.index(permit["stage"]) / MAX_STAGE_VALUE
    progress = stage_sum / len(permits)

    starting_budget = getattr(self, "_initial_budget", 0.0)
    if starting_budget:
        remaining_frac = max(0.0, self._budget / starting_budget)
    else:
        remaining_frac = 0.0

    bonus = 0.1 * remaining_frac * progress
    penalty = min(0.25, 0.02 * self._wasted)

    capped = min(1.0, progress + bonus - penalty)
    return max(0.0, capped)
# ---------- Action helpers ----------
def _available_actions(self) -> list:
    """List the action types that are legal on at least one permit.

    ``list`` and ``query`` are always included. Only action *types* are
    returned, never permit ids, so the caller still has to inspect the
    permits to choose a target. The result is sorted so the observation
    payload is stable.
    """
    action_for_stage = {
        STAGE_AVAILABLE: "submit",
        STAGE_APPROVED: "pay",
        STAGE_PAID: "inspect",
    }
    legal = {"list", "query"}
    for permit in self._permits.values():
        action = action_for_stage.get(permit["stage"])
        if action is not None:
            legal.add(action)
    return sorted(legal)
def _snapshot_permits(self) -> dict:
"""Serialize permits for observation payload."""
snapshot = {}
for pid, p in self._permits.items():
if pid in self._hidden_prereqs:
# Hidden: show placeholder prereqs, always unmet
snapshot[pid] = {
"stage": p["stage"],
"fee": p["fee"],
"prereqs": ["???"],
"prereqs_met": False,
}
else:
prereqs_met = all(
self._permits[dep]["stage"] == STAGE_ISSUED
for dep in p["prereqs"]
)
snapshot[pid] = {
"stage": p["stage"],
"fee": p["fee"],
"prereqs": p["prereqs"],
"prereqs_met": prereqs_met,
}
return snapshot
def _build_observation(
    self, message: str, error: Optional[str] = None
) -> PermitObservation:
    """Assemble the observation for the current state.

    Args:
        message: Text describing the outcome of the last action.
        error: Error text for a failed action, or None.

    Returns:
        A ``PermitObservation`` holding the permit snapshot, remaining
        budget, wasted-action count, legal action types, done flag, the
        current reward, and a metadata dict with step counters and a
        reward breakdown.
    """
    reward = self._compute_reward()

    permit_total = len(self._permits)
    issued_total = sum(
        1 for permit in self._permits.values() if permit["stage"] == STAGE_ISSUED
    )
    completion = issued_total / max(permit_total, 1)

    if self._initial_budget:
        cost_efficiency = self._budget / self._initial_budget
    else:
        cost_efficiency = 0.0

    # A falsy inquiry budget (None) counts as "everything used was allowed".
    queries = self._queries_used
    allowance = self._inquiry_budget or queries
    overage = max(0, queries - allowance)
    investigation_efficiency = 1.0 - (overage / max(queries, 1))

    steps = self._state.step_count
    error_rate = 1.0 - (self._wasted / max(steps, 1))

    breakdown = {
        "completion_progress": completion,
        "cost_efficiency": cost_efficiency,
        "investigation_efficiency": investigation_efficiency,
        "error_rate": error_rate,
    }
    return PermitObservation(
        message=message,
        permits=self._snapshot_permits(),
        budget_remaining=round(self._budget, 2),
        wasted_submissions=self._wasted,
        last_action_error=error,
        available_actions=self._available_actions(),
        task_name=self._task_name,
        done=self._done,
        reward=reward,
        metadata={
            "step_count": steps,
            "max_steps": self._max_steps,
            "reward_breakdown": breakdown,
        },
    )
# ---------- Environment API ----------
def reset(
    self,
    seed: Optional[int] = None,
    episode_id: Optional[str] = None,
    task_name: Optional[str] = None,
    **kwargs,
) -> PermitObservation:
    """Start a new episode and return its first observation.

    Args:
        seed: Deterministic RNG seed. When omitted, the episode id
            drives the randomisation instead.
        episode_id: Caller-supplied episode identifier; a fresh UUID is
            generated when omitted.
        task_name: One of the ``TASKS`` keys. When omitted, falls back
            to the ``PERMIT_TASK`` environment variable, then to the
            currently loaded task, then to ``'easy_foodtruck'``. An
            unknown name also selects ``'easy_foodtruck'``.
        **kwargs: Accepted and ignored, so callers can forward arbitrary
            extra fields without raising.

    Returns:
        The initial ``PermitObservation`` for the new episode.
    """
    self._state = State(
        episode_id=episode_id or str(uuid4()),
        step_count=0,
    )
    self._seed = seed
    chosen = task_name or os.getenv(
        "PERMIT_TASK", self._task_name or "easy_foodtruck"
    )
    if chosen not in TASKS:
        chosen = "easy_foodtruck"
    self._init_task(chosen)
    return self._build_observation(
        message=(
            f"Permit environment ready. Task: {self._task_name}. "
            f"Budget: ${self._budget:.2f}. "
            f"Read the 'permits' dict to see each permit's stage, "
            # Arrows restored from mis-decoded UTF-8 in the original text.
            f"fee, and prereqs, then submit → pay → inspect each."
        ),
        error=None,
    )
def step(self, action: PermitAction) -> PermitObservation:  # type: ignore[override]
    """Apply one agent action and return the resulting observation.

    The step counter is incremented first. A ``set_task`` action naming a
    known task re-initialises the environment on that task, zeroes the
    step counter and returns immediately. Every other action is applied,
    and any failure counts as exactly one wasted submission. Afterwards
    unlocks are propagated, the one-shot events of the hard task get a
    chance to fire, and the done flag is recomputed.

    Event notices are always appended to the message, including on steps
    where the action itself failed, so a one-shot state change is never
    applied silently.

    Args:
        action: Carries ``action_type`` (submit, pay, inspect, query,
            list or set_task) and ``permit_id`` (the target permit, or
            the task name for set_task).

    Returns:
        The observation after the action and any events.
    """
    self._state.step_count += 1
    atype = (action.action_type or "").strip().lower()
    pid = action.permit_id

    # In-band task switch: the task name travels in permit_id.
    if atype == "set_task" and pid in TASKS:
        self._init_task(pid)
        self._state.step_count = 0
        self._done = False
        return self._build_observation(
            message=f"Task switched to {pid}.",
            error=None,
        )

    message, error = self._apply_action(atype, pid)
    if error is not None:
        # Every rejected action is one wasted submission.
        self._wasted += 1

    # Propagate unlocks from newly-issued permits.
    self._update_unlocks()

    # Order matters: the regulation event sees the state left behind by
    # the missing-document event, and both draw from the same RNG.
    for fire_event in (self._fire_missing_doc_event, self._fire_regulation_event):
        notice = fire_event()
        if notice:
            message = f"{message} {notice}" if message else notice

    self._done = self._check_done()
    return self._build_observation(message=message, error=error)

def _apply_action(self, atype: str, pid) -> tuple[str, Optional[str]]:
    """Execute a normalised action and report ``(message, error)``.

    Exactly one element of the pair is meaningful: on success ``error``
    is None, on failure ``message`` is empty. The wasted counter is not
    touched here. A ``set_task`` reaching this helper is treated as naming
    an unknown task, because ``step`` handles valid switches itself.
    """
    if atype == "set_task":
        return "", f"Unknown task: {pid}"
    if atype == "list":
        return "Listing all permits.", None
    if atype not in ("query", "submit", "pay", "inspect"):
        return "", (
            f"Unknown action_type: '{atype}'. "
            f"Valid: submit, pay, inspect, query, list, set_task."
        )
    if pid not in self._permits:
        return "", f"Unknown permit: {pid}"
    if atype == "query":
        return self._answer_query(pid), None

    permit = self._permits[pid]
    required = {
        "submit": STAGE_AVAILABLE,
        "pay": STAGE_APPROVED,
        "inspect": STAGE_PAID,
    }[atype]
    if permit["stage"] != required:
        return "", (
            f"Cannot {atype} {pid}: stage={permit['stage']} "
            f"(must be '{required}')."
        )

    if atype == "submit":
        # Review is instantaneous: submitting lands directly on APPROVED.
        permit["stage"] = STAGE_APPROVED
        return f"Submitted and approved {pid}.", None

    if atype == "pay":
        if self._budget < permit["fee"]:
            return "", (
                f"Insufficient budget: need ${permit['fee']:.2f}, "
                f"have ${self._budget:.2f}."
            )
        self._budget -= permit["fee"]
        permit["stage"] = STAGE_PAID
        return (
            f"Paid ${permit['fee']:.2f} for {pid}. "
            f"Budget: ${self._budget:.2f}."
        ), None

    # inspect: a paid permit passes inspection and is issued at once.
    permit["stage"] = STAGE_ISSUED
    return f"Inspection passed, {pid} ISSUED.", None

def _answer_query(self, pid) -> str:
    """Answer a query about a known permit and return the message text.

    Counts the query and, once the free allowance is exceeded, charges
    the inquiry cost when the budget covers it (otherwise only a warning
    is attached and the query is still answered). Querying a permit with
    masked prerequisites reveals them permanently.
    """
    self._queries_used += 1
    fee_note = ""
    if (
        self._inquiry_budget is not None
        and self._queries_used > self._inquiry_budget
    ):
        cost = self._inquiry_cost
        if self._budget >= cost:
            self._budget -= cost
            fee_note = (
                f" [Inquiry fee: ${cost} deducted. "
                f"Budget: ${self._budget:.2f}]"
            )
        else:
            fee_note = (
                f" [WARNING: Insufficient budget for ${cost} "
                f"inquiry fee (have ${self._budget:.2f})]"
            )

    permit = self._permits[pid]
    revealed = ""
    if pid in self._hidden_prereqs:
        # Move the real list back onto the permit record.
        permit["prereqs"] = self._hidden_prereqs.pop(pid)
        revealed = " [REVEALED]"
    return (
        f"{pid}: stage={permit['stage']} fee=${permit['fee']:.2f} "
        f"prereqs={permit['prereqs']}{revealed}{fee_note}"
    )

def _fire_missing_doc_event(self) -> Optional[str]:
    """Fire the one-shot missing-document event when it is due.

    Hard task only: on the first step numbered 10 or later at which some
    ISSUED permit with prerequisites exists, one such permit is knocked
    back to PAID and must be re-inspected. No fee is charged again.

    Returns:
        The event notice, or None if the event did not fire.
    """
    if (
        self._task_name != "hard_restaurant"
        or self._missing_doc_fired
        or self._state.step_count < 10
    ):
        return None
    candidates = [
        pid for pid, p in self._permits.items()
        if p["stage"] == STAGE_ISSUED and p["prereqs"]
    ]
    if not candidates:
        return None
    victim = self._rng.choice(candidates)
    self._permits[victim]["stage"] = STAGE_PAID
    self._missing_doc_fired = True
    return (
        f"[EVENT] Missing document: {victim} reverted to PAID "
        f"(needs re-inspection)."
    )

def _fire_regulation_event(self) -> Optional[str]:
    """Fire the one-shot regulation event when it is due.

    Hard task only: from step 15 onward an event type is drawn on every
    step until one succeeds. ``fee_hike`` doubles the fee of a random
    AVAILABLE or APPROVED permit. ``new_prereq`` adds an already-ISSUED
    permit to the real prerequisites of a random LOCKED or AVAILABLE
    permit that has prerequisites. If the drawn type has no valid target,
    nothing happens and the event stays armed for the next step.

    Returns:
        The event notice, or None if the event did not fire.
    """
    if (
        self._task_name != "hard_restaurant"
        or self._regulation_event_fired
        or self._state.step_count < 15
    ):
        return None

    event_type = self._rng.choice(["fee_hike", "new_prereq"])

    if event_type == "fee_hike":
        candidates = [
            pid for pid, p in self._permits.items()
            if p["stage"] in (STAGE_AVAILABLE, STAGE_APPROVED)
        ]
        if not candidates:
            return None
        target = self._rng.choice(candidates)
        new_fee = round(self._permits[target]["fee"] * 2, 2)
        self._permits[target]["fee"] = new_fee
        self._regulation_event_fired = True
        return (
            f"[EVENT] Regulatory update: {target} fee"
            f" increased to ${new_fee:.2f}"
        )

    # new_prereq
    candidates = [
        pid for pid, p in self._permits.items()
        if p["stage"] in (STAGE_LOCKED, STAGE_AVAILABLE)
        and self._real_prereqs(pid)
    ]
    issued_permits = [
        pid for pid, p in self._permits.items()
        if p["stage"] == STAGE_ISSUED
    ]
    if not (candidates and issued_permits):
        return None
    target = self._rng.choice(candidates)
    # The live list: masked permits keep it in _hidden_prereqs, the rest
    # on the permit record. Appending here updates whichever applies.
    real = self._real_prereqs(target)
    eligible = [ip for ip in issued_permits if ip not in real]
    if not eligible:
        return None
    new_prereq = self._rng.choice(eligible)
    real.append(new_prereq)
    # The added prerequisite is ISSUED by construction, so an AVAILABLE
    # target never needs to be sent back to LOCKED here.
    self._regulation_event_fired = True
    return (
        f"[EVENT] Regulatory update: {target}"
        f" now also requires {new_prereq}"
    )
@property
def state(self) -> State:
    """Return the ``State`` object currently held for this episode."""
    return self._state