SmartContractAudit / inference.py
ajaxwin
task1, task2 evaluated
671787b
raw
history blame
18.1 kB
"""
inference.py
------------
Baseline inference script — Smart Contract Audit RL Environment.
Implements agents for all three tasks using the OpenAI-compatible client.
Emits mandatory structured stdout in the OpenEnv format.
MANDATORY ENV VARS:
API_BASE_URL LLM API endpoint (default: https://api.openai.com/v1)
MODEL_NAME Model identifier (default: gpt-4o-mini)
HF_TOKEN API key / HF token
MANDATORY STDOUT FORMAT (per episode):
[START] task=<id> env=smart-contract-audit model=<model>
[STEP] step=<n> action=<str> reward=<0.00> done=<true|false> error=<str|null>
[END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...>
Usage:
python inference.py
Output:
Structured stdout per episode, plus baseline_scores.json summary.
"""
import asyncio
import json
import os
import sys
import time
from typing import Any, Dict, List, Optional
from openai import OpenAI
from tasks.task1 import Task1Environment
from tasks.task2 import Task2Environment
from tasks.task3 import Task3Environment
from env.schemas import Action, ActionType
from utils import T1_SYSTEM, T2_SYSTEM, T3_SYSTEM
# ─────────────────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────────────────
# LLM endpoint, model name and credential are read from the environment once,
# at import time; the first two fall back to the defaults shown here.
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini")
HF_TOKEN = os.getenv("HF_TOKEN", "")
if not HF_TOKEN:
    # Warn on stderr so the structured stdout stream is not polluted.
    print("[WARN] HF_TOKEN not set — API calls may fail.", file=sys.stderr)

# Benchmark / environment identifier (constant for this env)
ENV_BENCHMARK = "smart-contract-audit"

# Episodes per task; episode i (0-based) is seeded with SEED_BASE + i.
NUM_EPISODES = 3
SEED_BASE = 42

# Max steps per task
MAX_STEPS_T1 = 15
MAX_STEPS_T2 = 10
MAX_STEPS_T3 = 12

# A grader_score >= this is considered a "success" for the [END] line
SUCCESS_SCORE_THRESHOLD = 0.5

# Single shared OpenAI-compatible client used by every episode runner.
client = OpenAI(api_key=HF_TOKEN, base_url=API_BASE_URL)
# ─────────────────────────────────────────────────────────────────────────────
# Mandatory stdout helpers
# ─────────────────────────────────────────────────────────────────────────────
def log_start(task: str, env: str, model: str) -> None:
    """Write the episode-opening [START] record to stdout and flush it.

    Args:
        task: Task identifier placed in the ``task=`` field.
        env: Environment/benchmark identifier placed in the ``env=`` field.
        model: Model identifier placed in the ``model=`` field.
    """
    record = f"[START] task={task} env={env} model={model}"
    print(record, flush=True)
def log_step(
    step: int,
    action: str,
    reward: float,
    done: bool,
    error: Optional[str] = None,
) -> None:
    """Emit a [STEP] line — one per env.step() call.

    The record must occupy exactly one line of stdout, so any line breaks
    inside ``action`` or ``error`` are replaced with single spaces before
    printing.

    Args:
        step: Step number within the episode.
        action: Name of the action that was executed.
        reward: Reward returned for this step; printed with two decimals.
        done: Whether the episode ended on this step; printed as true/false.
        error: Optional error text; a missing or empty value prints as ``null``.
    """
    # Flatten embedded newlines (e.g. from multi-line exception messages) so
    # the record cannot spill over into a second line.
    action_val = " ".join(str(action).splitlines())
    error_val = " ".join(str(error).splitlines()) if error else ""
    if not error_val:
        error_val = "null"
    print(
        f"[STEP] step={step} action={action_val} "
        f"reward={reward:.2f} done={str(done).lower()} error={error_val}",
        flush=True,
    )
def log_end(
    success: bool,
    steps: int,
    score: float,
    rewards: List[float],
) -> None:
    """Write the episode-closing [END] record to stdout and flush it.

    Args:
        success: Printed lowercase as ``true``/``false``.
        steps: Number of steps taken in the episode.
        score: Final score; printed with three decimals.
        rewards: Per-step rewards; printed comma-separated, two decimals each.
    """
    success_text = str(success).lower()
    formatted_rewards = ["{:.2f}".format(value) for value in rewards]
    joined_rewards = ",".join(formatted_rewards)
    record = (
        f"[END] success={success_text} steps={steps} "
        f"score={score:.3f} rewards={joined_rewards}"
    )
    print(record, flush=True)
# ─────────────────────────────────────────────────────────────────────────────
# Task 1 — Targeted Vulnerability Detection
# ─────────────────────────────────────────────────────────────────────────────
def _t1_user_msg(obs: Dict[str, Any]) -> str:
return (
f"Contract: {obs['contract_name']}\n"
f"Description: {obs['contract_description']}\n"
f"Step: {obs['step_count']} | Reward so far: {obs['cumulative_reward']:.2f}\n\n"
f"Last action : {obs['last_action'] or 'None'}\n"
f"Last result : {obs['last_action_result'] or 'Episode just started.'}"
)
def _run_t1_episode(env: Task1Environment, seed: int, ep_num: int) -> Dict[str, Any]:
    """Run one Task 1 episode; emit [START]/[STEP]/[END].

    Each step appends the rendered observation to the conversation, asks the
    chat-completions client for a reply, parses that reply as a JSON object
    with an ``action`` name and optional ``params``, and passes the resulting
    ``Action`` to ``env.step``.  A failed LLM call or an unusable reply falls
    back to ``ActionType.LIST_FUNCTIONS`` with empty params so the episode
    keeps moving.

    Args:
        env: Task 1 environment; reset with ``seed`` before the first step.
        seed: Seed passed to ``env.reset``.
        ep_num: Episode number, used in debug output and in the result dict.

    Returns:
        Dict with ``episode``, ``seed``, ``contract``, ``grader_score`` and
        ``cumulative_reward`` (the last two come from the final step and the
        final observation respectively).
    """
    reset_result = env.reset(seed=seed)
    obs = reset_result.observation.model_dump()
    log_start(task="task1_vuln_detection", env=ENV_BENCHMARK, model=MODEL_NAME)
    messages = [{"role": "system", "content": T1_SYSTEM}]
    step_rewards: List[float] = []
    grader_score = 0.0
    steps_taken = 0
    error_msg: Optional[str] = None
    try:
        for step in range(1, MAX_STEPS_T1 + 1):
            messages.append({"role": "user", "content": _t1_user_msg(obs)})
            try:
                resp = client.chat.completions.create(
                    model=MODEL_NAME, messages=messages,
                    max_tokens=200, temperature=0.0,
                )
                raw = resp.choices[0].message.content.strip()  # type: ignore
                error_msg = None
            except Exception as e:
                # Best effort: record a truncated error and continue with an
                # empty reply, which selects the fallback action below.
                raw = ""
                error_msg = str(e)[:80]
                print(f"[DEBUG] T1 LLM error ep={ep_num} step={step}: {e}", file=sys.stderr)
            try:
                parsed = json.loads(raw)
                at = ActionType(parsed["action"])
                params = parsed.get("params", {})
                # A reply such as {"action": ..., "params": null} must not
                # forward a non-dict value to Action.
                if not isinstance(params, dict):
                    params = {}
            except Exception:
                # Deliberate catch-all: any malformed reply uses the fallback.
                at, params = ActionType.LIST_FUNCTIONS, {}
            messages.append({"role": "assistant", "content": raw})
            result = env.step(Action(action_type=at, params=params))
            obs = result.observation.model_dump()
            r_val = result.reward.value
            done = result.done
            step_rewards.append(r_val)
            steps_taken = step
            log_step(step=step, action=at.value, reward=r_val, done=done, error=error_msg)
            if done:
                # Map the terminal step reward onto a 0 / 0.5 / 1 score.
                grader_score = 1.0 if r_val >= 4.9 else (0.5 if r_val >= 0.9 else 0.0)
                break
            # Short pause between LLM calls (presumably to ease rate limits).
            time.sleep(0.3)
    finally:
        # [END] is always emitted, even if the loop above raised.
        success = grader_score >= SUCCESS_SCORE_THRESHOLD
        log_end(success=success, steps=steps_taken, score=grader_score, rewards=step_rewards)
    return {
        "episode": ep_num,
        "seed": seed,
        "contract": obs["contract_name"],
        "grader_score": grader_score,
        "cumulative_reward": obs["cumulative_reward"],
    }
# ─────────────────────────────────────────────────────────────────────────────
# Task 2 — Property Discovery
# ─────────────────────────────────────────────────────────────────────────────
def _t2_user_msg(obs: Dict[str, Any]) -> str:
extra = obs.get("extra", {})
return (
f"Contract : {obs['contract_name']}\n"
f"Function : {extra.get('target_function', '?')} "
f"({extra.get('target_signature', '')})\n"
f"Step: {obs['step_count']} | Reward so far: {obs['cumulative_reward']:.2f}\n\n"
f"Last action : {obs['last_action'] or 'None'}\n"
f"Last result :\n{obs['last_action_result'] or 'Episode just started.'}"
)
def _run_t2_episode(env: Task2Environment, seed: int, ep_num: int) -> Dict[str, Any]:
    """Run one Task 2 episode; emit [START]/[STEP]/[END].

    Each step appends the rendered observation to the conversation, asks the
    chat-completions client for a reply, parses that reply as a JSON object
    with an ``action`` name and optional ``params``, and passes the resulting
    ``Action`` to ``env.step``.  A failed LLM call or an unusable reply falls
    back to ``ActionType.GET_FUNCTION_CODE`` with empty params.

    Args:
        env: Task 2 environment; reset with ``seed`` before the first step.
        seed: Seed passed to ``env.reset``.
        ep_num: Episode number, used in debug output and in the result dict.

    Returns:
        Dict with ``episode``, ``seed``, ``contract``, ``function`` (target
        function name from the initial observation), ``grader_score`` and
        ``cumulative_reward``.
    """
    reset_result = env.reset(seed=seed)
    obs = reset_result.observation.model_dump()
    # Tolerate a missing or None "extra", matching the .get() used when the
    # prompt is rendered.
    fn = (obs.get("extra") or {}).get("target_function", "?")
    log_start(task="task2_property_discovery", env=ENV_BENCHMARK, model=MODEL_NAME)
    messages = [{"role": "system", "content": T2_SYSTEM}]
    step_rewards: List[float] = []
    grader_score = 0.0
    steps_taken = 0
    error_msg: Optional[str] = None
    try:
        for step in range(1, MAX_STEPS_T2 + 1):
            messages.append({"role": "user", "content": _t2_user_msg(obs)})
            try:
                resp = client.chat.completions.create(
                    model=MODEL_NAME, messages=messages,
                    max_tokens=400, temperature=0.0,
                )
                raw = resp.choices[0].message.content.strip()  # type: ignore
                error_msg = None
            except Exception as e:
                # Best effort: record a truncated error and continue with an
                # empty reply, which selects the fallback action below.
                raw = ""
                error_msg = str(e)[:80]
                print(f"[DEBUG] T2 LLM error ep={ep_num} step={step}: {e}", file=sys.stderr)
            try:
                parsed = json.loads(raw)
                at = ActionType(parsed["action"])
                params = parsed.get("params", {})
                # A reply such as {"action": ..., "params": null} must not
                # forward a non-dict value to Action.
                if not isinstance(params, dict):
                    params = {}
            except Exception:
                # Deliberate catch-all: any malformed reply uses the fallback.
                at, params = ActionType.GET_FUNCTION_CODE, {}
            messages.append({"role": "assistant", "content": raw})
            result = env.step(Action(action_type=at, params=params))
            obs = result.observation.model_dump()
            r_val = result.reward.value
            done = result.done
            step_rewards.append(r_val)
            steps_taken = step
            log_step(step=step, action=at.value, reward=r_val, done=done, error=error_msg)
            if done:
                # Terminal step reward divided by 5; non-positive rewards score 0.
                grader_score = round(r_val / 5.0, 3) if r_val > 0 else 0.0
                break
            # Short pause between LLM calls (presumably to ease rate limits).
            time.sleep(0.3)
    finally:
        # [END] is always emitted, even if the loop above raised.
        success = grader_score >= SUCCESS_SCORE_THRESHOLD
        log_end(success=success, steps=steps_taken, score=grader_score, rewards=step_rewards)
    return {
        "episode": ep_num,
        "seed": seed,
        "contract": obs["contract_name"],
        "function": fn,
        "grader_score": grader_score,
        "cumulative_reward": obs["cumulative_reward"],
    }
# ─────────────────────────────────────────────────────────────────────────────
# Task 3 — Rule Checker
# ─────────────────────────────────────────────────────────────────────────────
def _t3_user_msg(obs: Dict[str, Any]) -> str:
extra = obs.get("extra", {})
return (
f"Contract : {obs['contract_name']}\n"
f"Property : {extra.get('property_english', '(none)')}\n"
f"Step: {obs['step_count']} | Reward so far: {obs['cumulative_reward']:.2f}\n\n"
f"Last action : {obs['last_action'] or 'None'}\n"
f"Last result :\n{obs['last_action_result'] or 'Episode just started.'}"
)
def _run_t3_episode(env: Task3Environment, seed: int, ep_num: int) -> Dict[str, Any]:
    """Run one Task 3 episode; emit [START]/[STEP]/[END].

    Each step appends the rendered observation to the conversation, asks the
    chat-completions client for a reply, parses that reply as a JSON object
    with an ``action`` name and optional ``params``, and passes the resulting
    ``Action`` to ``env.step``.  A failed LLM call or an unusable reply falls
    back to ``ActionType.LIST_FUNCTIONS`` with empty params.

    Args:
        env: Task 3 environment; reset with ``seed`` before the first step.
        seed: Seed passed to ``env.reset``.
        ep_num: Episode number, used in debug output and in the result dict.

    Returns:
        Dict with ``episode``, ``seed``, ``contract``, ``grader_score`` and
        ``cumulative_reward``.
    """
    reset_result = env.reset(seed=seed)
    obs = reset_result.observation.model_dump()
    log_start(task="task3_rule_checker", env=ENV_BENCHMARK, model=MODEL_NAME)
    messages = [{"role": "system", "content": T3_SYSTEM}]
    step_rewards: List[float] = []
    grader_score = 0.0
    steps_taken = 0
    error_msg: Optional[str] = None
    try:
        for step in range(1, MAX_STEPS_T3 + 1):
            messages.append({"role": "user", "content": _t3_user_msg(obs)})
            try:
                resp = client.chat.completions.create(
                    model=MODEL_NAME, messages=messages,
                    max_tokens=200, temperature=0.0,
                )
                raw = resp.choices[0].message.content.strip()  # type: ignore
                error_msg = None
            except Exception as e:
                # Best effort: record a truncated error and continue with an
                # empty reply, which selects the fallback action below.
                raw = ""
                error_msg = str(e)[:80]
                print(f"[DEBUG] T3 LLM error ep={ep_num} step={step}: {e}", file=sys.stderr)
            try:
                parsed = json.loads(raw)
                at = ActionType(parsed["action"])
                params = parsed.get("params", {})
                # A reply such as {"action": ..., "params": null} must not
                # forward a non-dict value to Action.
                if not isinstance(params, dict):
                    params = {}
            except Exception:
                # Deliberate catch-all: any malformed reply uses the fallback.
                at, params = ActionType.LIST_FUNCTIONS, {}
            messages.append({"role": "assistant", "content": raw})
            result = env.step(Action(action_type=at, params=params))
            obs = result.observation.model_dump()
            r_val = result.reward.value
            done = result.done
            step_rewards.append(r_val)
            steps_taken = step
            log_step(step=step, action=at.value, reward=r_val, done=done, error=error_msg)
            if done:
                # Map the terminal step reward onto a 0 / 0.3 / 1 score.
                grader_score = 1.0 if r_val >= 4.9 else (0.3 if r_val >= 1.0 else 0.0)
                break
            # Short pause between LLM calls (presumably to ease rate limits).
            time.sleep(0.3)
    finally:
        # [END] is always emitted, even if the loop above raised.
        success = grader_score >= SUCCESS_SCORE_THRESHOLD
        log_end(success=success, steps=steps_taken, score=grader_score, rewards=step_rewards)
    return {
        "episode": ep_num,
        "seed": seed,
        "contract": obs["contract_name"],
        "grader_score": grader_score,
        "cumulative_reward": obs["cumulative_reward"],
    }
# ─────────────────────────────────────────────────────────────────────────────
# Task runners
# ─────────────────────────────────────────────────────────────────────────────
def run_task1(n: int = NUM_EPISODES) -> Dict[str, Any]:
    """Run ``n`` Task 1 episodes on one environment and summarise them.

    Episode ``i`` (0-based) uses seed ``SEED_BASE + i`` and episode number
    ``i + 1``.  A banner and the two averages are printed to stdout.

    Args:
        n: Number of episodes to run.

    Returns:
        Summary dict with the task id/name, ``status``, ``num_episodes``, the
        per-episode results and the average grader score / cumulative reward.
        Both averages are ``0.0`` when no episode was run.
    """
    print("\n" + "="*60, flush=True)
    print("TASK 1: Targeted Vulnerability Detection", flush=True)
    print("="*60, flush=True)
    env = Task1Environment()
    episodes = [_run_t1_episode(env, SEED_BASE + i, i + 1) for i in range(n)]
    # Guard the division so n == 0 yields zero averages instead of raising.
    count = len(episodes)
    avg_s = sum(e["grader_score"] for e in episodes) / count if count else 0.0
    avg_r = sum(e["cumulative_reward"] for e in episodes) / count if count else 0.0
    print(f"\n Avg grader score : {avg_s:.3f}", flush=True)
    print(f" Avg cum reward : {avg_r:.2f}", flush=True)
    return {
        "task_id": "task1_vuln_detection", "name": "Targeted Vulnerability Detection",
        "status": "active", "num_episodes": n, "episodes": episodes,
        "avg_grader_score": avg_s, "avg_cumulative_reward": avg_r,
    }
def run_task2(n: int = NUM_EPISODES) -> Dict[str, Any]:
    """Run ``n`` Task 2 episodes on one environment and summarise them.

    Episode ``i`` (0-based) uses seed ``SEED_BASE + i`` and episode number
    ``i + 1``.  A banner and the two averages are printed to stdout.

    Args:
        n: Number of episodes to run.

    Returns:
        Summary dict with the task id/name, ``status``, ``num_episodes``, the
        per-episode results and the average grader score / cumulative reward.
        Both averages are ``0.0`` when no episode was run.
    """
    print("\n" + "="*60, flush=True)
    print("TASK 2: Property Discovery", flush=True)
    print("="*60, flush=True)
    env = Task2Environment()
    episodes = [_run_t2_episode(env, SEED_BASE + i, i + 1) for i in range(n)]
    # Guard the division so n == 0 yields zero averages instead of raising.
    count = len(episodes)
    avg_s = sum(e["grader_score"] for e in episodes) / count if count else 0.0
    avg_r = sum(e["cumulative_reward"] for e in episodes) / count if count else 0.0
    print(f"\n Avg grader score : {avg_s:.3f}", flush=True)
    print(f" Avg cum reward : {avg_r:.2f}", flush=True)
    return {
        "task_id": "task2_property_discovery", "name": "Property Discovery",
        "status": "active", "num_episodes": n, "episodes": episodes,
        "avg_grader_score": avg_s, "avg_cumulative_reward": avg_r,
    }
def run_task3(n: int = NUM_EPISODES) -> Dict[str, Any]:
    """Run ``n`` Task 3 episodes on one environment and summarise them.

    Episode ``i`` (0-based) uses seed ``SEED_BASE + i`` and episode number
    ``i + 1``.  A banner and the two averages are printed to stdout.

    Args:
        n: Number of episodes to run.

    Returns:
        Summary dict with the task id/name, ``status``, ``num_episodes``, the
        per-episode results and the average grader score / cumulative reward.
        Both averages are ``0.0`` when no episode was run.
    """
    print("\n" + "="*60, flush=True)
    print("TASK 3: Rule Checker", flush=True)
    print("="*60, flush=True)
    env = Task3Environment()
    episodes = [_run_t3_episode(env, SEED_BASE + i, i + 1) for i in range(n)]
    # Guard the division so n == 0 yields zero averages instead of raising.
    count = len(episodes)
    avg_s = sum(e["grader_score"] for e in episodes) / count if count else 0.0
    avg_r = sum(e["cumulative_reward"] for e in episodes) / count if count else 0.0
    print(f"\n Avg grader score : {avg_s:.3f}", flush=True)
    print(f" Avg cum reward : {avg_r:.2f}", flush=True)
    return {
        "task_id": "task3_rule_checker", "name": "Rule Checker",
        "status": "active", "num_episodes": n, "episodes": episodes,
        "avg_grader_score": avg_s, "avg_cumulative_reward": avg_r,
    }
# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────
async def main() -> None:
    """Async entry point (wraps sync env calls; asyncio.run() expected by caller).

    Runs the three task runners in order, prints a per-task and overall
    summary to stdout, and writes the collected results to
    ``baseline_scores.json`` in the current working directory.  The body
    contains no ``await``, so the tasks execute sequentially and block the
    event loop until finished.
    """
    print("Smart Contract Audit RL Environment — Baseline Inference", flush=True)
    print(f"Model: {MODEL_NAME} | Base URL: {API_BASE_URL}", flush=True)
    t1 = run_task1(NUM_EPISODES)
    t2 = run_task2(NUM_EPISODES)
    t3 = run_task3(NUM_EPISODES)
    tasks = [t1, t2, t3]
    results: Dict[str, Any] = {
        "model": MODEL_NAME,
        "base_url": API_BASE_URL,
        "tasks": tasks,
    }
    # Divide by the actual task count rather than a hard-coded 3 so the
    # average stays correct if the task list changes.
    overall = sum(t["avg_grader_score"] for t in tasks) / len(tasks)
    results["overall_avg_score"] = overall
    print("\n" + "="*60, flush=True)
    print("BASELINE SUMMARY", flush=True)
    print("="*60, flush=True)
    for t in tasks:
        print(f" ✅ {t['name']:40s}: {t['avg_grader_score']:.3f}", flush=True)
    print(f"\n Overall avg grader score: {overall:.3f}", flush=True)
    with open("baseline_scores.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print("\n Scores written to baseline_scores.json", flush=True)


if __name__ == "__main__":
    asyncio.run(main())