Spaces:
Sleeping
Sleeping
File size: 11,006 Bytes
543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f d2e9a6b 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 543a85f c4d6f05 543a85f 9299271 543a85f f6ff18c 9299271 543a85f 9299271 543a85f 9299271 543a85f 9299271 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 | """
inference.py β PrivilegeDesk OpenEnv Agent
Runs an LLM agent through all 3 privilege management tasks and emits structured stdout logs.
Required environment variables:
API_BASE_URL LLM API endpoint (OpenAI-compatible)
MODEL_NAME Model identifier
HF_TOKEN HuggingFace / API key
Stdout format (must not deviate):
[START] task=<task> env=<benchmark> model=<model>
[STEP] step=<n> action=<action> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...>
"""
import argparse
import json
import os
import sys
import textwrap
from typing import Any, Dict, List, Optional
from openai import OpenAI
# ββ Config ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
HF_TOKEN = os.getenv("HF_TOKEN")
MODEL_NAME = os.getenv("MODEL_NAME", "meta-llama/Llama-3.3-70B-Instruct")
ENV_URL = os.getenv("ENV_URL", "http://localhost:8000")
BENCHMARK = "privilege_desk"
MAX_STEPS = 20
TEMPERATURE = 0.0
TASK_IDS = ["access_decision", "jit_escalation", "access_review"]
# ββ System prompt βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
SYSTEM_PROMPT = textwrap.dedent("""
You are an enterprise IAM (Identity & Access Management) specialist agent.
You operate inside the PrivilegeDesk environment, which simulates a corporate
zero-standing-privilege access control system.
Your job is to use the available tools to complete the assigned task.
You must respond with EXACTLY ONE JSON object per turn, with this structure:
{
"tool_name": "<tool_name>",
"arguments": { "<key>": "<value>", ... }
}
Rules:
- Only call tools listed in available_tools
- Do not add explanations outside the JSON
- For access.decide: use "approve" or "deny" for the decision field
- For entitlement.revoke: provide entitlement_id
- For review.submit: call it when you have finished revoking all risky entitlements
- When done, the environment will signal done=true
Available context is in the observation JSON you receive.
""").strip()
# ββ Logging helpers (judge-parsed format) ββββββββββββββββββββββββββββββββββββ
def log_start(task: str, env: str, model: str) -> None:
"""[START] line β emitted exactly once at episode begin."""
print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str] = None) -> None:
"""[STEP] line β emitted immediately after each env.step() returns."""
error_val = error if error else "null"
done_val = str(done).lower()
print(
f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
flush=True,
)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
"""[END] line β always emitted (even on exception) via finally block."""
rewards_str = ",".join(f"{r:.2f}" for r in rewards)
print(
f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}",
flush=True,
)
# ββ LLM call βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def call_llm(client: OpenAI, observation: Dict[str, Any], history: List[str]) -> Dict[str, Any]:
"""Ask the LLM for the next action given the current observation."""
# Build compact observation summary for the prompt
obs_summary = {
"task_id": observation.get("task_id"),
"task_goal": observation.get("task_goal"),
"step": observation.get("step"),
"max_steps": observation.get("max_steps"),
"available_tools": observation.get("available_tools", []),
"pending_requests": observation.get("pending_requests", {}),
"policies": observation.get("policies", {}),
"entitlements": {eid: {k: v for k, v in e.items()
if k in ("role", "user_id", "resource_id",
"days_since_use", "expires_at")}
for eid, e in list(observation.get("entitlements", {}).items())[:10]},
"last_tool_result": observation.get("tool_result"),
"objectives": observation.get("objectives", []),
"review_target_user_id": observation.get("review_target_user_id"),
}
user_msg = (
f"Current observation:\n{json.dumps(obs_summary, indent=2)}\n\n"
+ (f"History:\n" + "\n".join(history[-4:]) + "\n\n" if history else "")
+ "What is your next tool call? Respond with JSON only."
)
try:
completion = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_msg},
],
temperature=TEMPERATURE,
max_tokens=300,
response_format={"type": "json_object"},
)
text = completion.choices[0].message.content or "{}"
action = json.loads(text)
if "tool_name" not in action:
raise ValueError("No tool_name in response")
return action
except Exception as exc:
print(f"[DEBUG] LLM error: {exc}", file=sys.stderr, flush=True)
return {"tool_name": "policy.list", "arguments": {}}
# ββ Episode runner ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def run_episode(client: OpenAI, task_id: str) -> None:
"""Run one full episode for a task, emitting [START]/[STEP]/[END] logs."""
import requests
history: List[str] = []
rewards: List[float] = []
steps_taken = 0
score = 0.0
success = False
last_error: Optional[str] = None
log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
try:
# Reset
try:
resp = requests.post(
f"{ENV_URL}/reset",
json={"task_id": task_id, "seed": 42},
timeout=30,
)
resp.raise_for_status()
reset_data = resp.json()
except Exception as e:
last_error = str(e)
log_end(success=False, steps=0, score=0.0, rewards=[])
return
observation = reset_data.get("observation", reset_data)
done = False
# Episode loop
while not done and steps_taken < MAX_STEPS:
steps_taken += 1
# Get action from LLM
action = call_llm(client, observation, history)
tool = action.get("tool_name", "unknown")
args = action.get("arguments", {})
action_str = f"{tool}({json.dumps(args)[:40]})"
# Execute step
try:
step_resp = requests.post(
f"{ENV_URL}/step",
json={"action": action},
timeout=30,
)
step_resp.raise_for_status()
step_data = step_resp.json()
last_error = None
except Exception as e:
last_error = str(e)
reward = 0.0
done = True
rewards.append(reward)
log_step(step=steps_taken, action=action_str, reward=reward, done=done, error=last_error)
break
observation = step_data.get("observation", {})
reward = step_data.get("reward", 0.0) or 0.0
done = step_data.get("done", False)
tool_result = observation.get("tool_result", {})
status = (tool_result or {}).get("status", "?")
obs_lines = (tool_result or {}).get("observations", [])
rewards.append(reward)
obs_preview = obs_lines[0][:60] if obs_lines else ""
history.append(f"Step {steps_taken}: {tool} β {status}: {obs_preview}")
log_step(step=steps_taken, action=action_str, reward=reward, done=done, error=last_error)
# Get final grade from environment's grader endpoint
try:
grade_resp = requests.post(f"{ENV_URL}/grader", json={}, timeout=10)
grade_data = grade_resp.json() if grade_resp.ok else {}
score = grade_data.get("score", 0.0)
except Exception:
# Fallback: compute mean of step rewards
score = sum(rewards) / len(rewards) if rewards else 0.0
# Clamp score strictly to (0.01, 0.99) as required by judge
score = max(0.01, min(score, 0.99))
success = score > 0.333 # above random baseline
except Exception as exc:
print(f"[DEBUG] Episode error: {exc}", file=sys.stderr, flush=True)
last_error = str(exc)
finally:
# [END] MUST always be emitted
log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
# ββ Main ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def main():
global ENV_URL
parser = argparse.ArgumentParser(description="PrivilegeDesk baseline inference")
parser.add_argument("--task", choices=TASK_IDS, help="Run a specific task only")
parser.add_argument("--all", action="store_true", help="Run all 3 tasks (default)")
parser.add_argument("--url", default=ENV_URL, help="Environment base URL")
args = parser.parse_args()
ENV_URL = args.url
# Check for TASK_NAME environment variable (judge may set this)
target_task = os.getenv("TASK_NAME")
if target_task:
# Map task names to task_ids
if "access_decision" in target_task or "easy" in target_task:
args.task = "access_decision"
elif "jit_escalation" in target_task or "medium" in target_task:
args.task = "jit_escalation"
elif "access_review" in target_task or "hard" in target_task:
args.task = "access_review"
if not HF_TOKEN:
print("[ERROR] HF_TOKEN environment variable not set", file=sys.stderr)
sys.exit(1)
client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
tasks = [args.task] if args.task else TASK_IDS
for task_id in tasks:
run_episode(client, task_id)
if __name__ == "__main__":
main() |