"""
inference.py
------------
Baseline inference script — Smart Contract Audit RL Environment.

Implements agents for all three tasks using the OpenAI-compatible client.
Emits mandatory structured stdout in the OpenEnv format.

MANDATORY ENV VARS:
    API_BASE_URL   LLM API endpoint (default: https://api.openai.com/v1)
    MODEL_NAME     Model identifier (default: gpt-4o-mini)
    HF_TOKEN       API key / HF token

MANDATORY STDOUT FORMAT (per episode):
    [START] task=<task_name> env=smart-contract-audit model=<model_name>
    [STEP] step=<n> action=<action_type> reward=<0.00> done=<true|false> error=<msg|null>
    [END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...,rn>

Usage:
    python inference.py

Output:
    Structured stdout per episode, plus baseline_scores.json summary.
"""

import asyncio
import json
import os
import sys
import time
from typing import Any, Dict, List, Optional

from openai import OpenAI

from tasks.task1 import Task1Environment
from tasks.task2 import Task2Environment
from tasks.task3 import Task3Environment
from env.schemas import Action, ActionType
from utils import T1_SYSTEM, T2_SYSTEM, T3_SYSTEM

# ─────────────────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────────────────

# Endpoint, model and credential are read once at import time; the first two
# fall back to the defaults shown, the token falls back to an empty string.
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini")
HF_TOKEN = os.getenv("HF_TOKEN", "")

# A missing token is only warned about (on stderr, so the structured stdout
# stream stays clean); the script still continues.
if not HF_TOKEN:
    print("[WARN] HF_TOKEN not set — API calls may fail.", file=sys.stderr)

# Benchmark / environment identifier (constant for this env)
ENV_BENCHMARK = "smart-contract-audit"

# Episodes per task; episode i (0-based) is reset with seed SEED_BASE + i
NUM_EPISODES = 3
SEED_BASE = 42

# Max steps per task
MAX_STEPS_T1 = 15
MAX_STEPS_T2 = 10
MAX_STEPS_T3 = 12

# A grader_score >= this is considered a "success" for the [END] line
SUCCESS_SCORE_THRESHOLD = 0.5

# Single shared client used by every episode runner in this module.
client = OpenAI(api_key=HF_TOKEN, base_url=API_BASE_URL)

# ─────────────────────────────────────────────────────────────────────────────
# Mandatory stdout helpers
# ─────────────────────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────────────────────── def log_start(task: str, env: str, model: str) -> None: """Emit the [START] line — one per episode.""" print(f"[START] task={task} env={env} model={model}", flush=True) def log_step( step: int, action: str, reward: float, done: bool, error: Optional[str] = None, ) -> None: """Emit a [STEP] line — one per env.step() call.""" error_val = error if error else "null" print( f"[STEP] step={step} action={action} " f"reward={reward:.2f} done={str(done).lower()} error={error_val}", flush=True, ) def log_end( success: bool, steps: int, score: float, rewards: List[float], ) -> None: """Emit the [END] line — one per episode, always emitted.""" rewards_str = ",".join(f"{r:.2f}" for r in rewards) print( f"[END] success={str(success).lower()} steps={steps} " f"score={score:.3f} rewards={rewards_str}", flush=True, ) # ───────────────────────────────────────────────────────────────────────────── # Task 1 — Targeted Vulnerability Detection # ───────────────────────────────────────────────────────────────────────────── def _t1_user_msg(obs: Dict[str, Any]) -> str: return ( f"Contract: {obs['contract_name']}\n" f"Description: {obs['contract_description']}\n" f"Step: {obs['step_count']} | Reward so far: {obs['cumulative_reward']:.2f}\n\n" f"Last action : {obs['last_action'] or 'None'}\n" f"Last result : {obs['last_action_result'] or 'Episode just started.'}" ) def _run_t1_episode(env: Task1Environment, seed: int, ep_num: int) -> Dict[str, Any]: """Run one Task 1 episode; emit [START]/[STEP]/[END].""" r = env.reset(seed=seed) obs = r.observation.model_dump() log_start(task="task1_vuln_detection", env=ENV_BENCHMARK, model=MODEL_NAME) messages = [{"role": "system", "content": T1_SYSTEM}] step_rewards: List[float] = [] grader_score = 0.0 steps_taken = 0 error_msg: Optional[str] = None try: for step in range(1, MAX_STEPS_T1 + 1): messages.append({"role": "user", "content": _t1_user_msg(obs)}) try: resp = 
client.chat.completions.create( model=MODEL_NAME, messages=messages, max_tokens=200, temperature=0.0, ) raw = resp.choices[0].message.content.strip() # type: ignore error_msg = None except Exception as e: raw = "" error_msg = str(e)[:80] print(f"[DEBUG] T1 LLM error ep={ep_num} step={step}: {e}", file=sys.stderr) try: parsed = json.loads(raw) at = ActionType(parsed["action"]) params = parsed.get("params", {}) except Exception: at, params = ActionType.LIST_FUNCTIONS, {} messages.append({"role": "assistant", "content": raw}) result = env.step(Action(action_type=at, params=params)) obs = result.observation.model_dump() r_val = result.reward.value done = result.done step_rewards.append(r_val) steps_taken = step log_step(step=step, action=at.value, reward=r_val, done=done, error=error_msg) if done: v = r_val grader_score = 1.0 if v >= 4.9 else (0.5 if v >= 0.9 else 0.0) break time.sleep(0.3) finally: success = grader_score >= SUCCESS_SCORE_THRESHOLD log_end(success=success, steps=steps_taken, score=grader_score, rewards=step_rewards) return { "episode": ep_num, "seed": seed, "contract": obs["contract_name"], "grader_score": grader_score, "cumulative_reward": obs["cumulative_reward"], } # ───────────────────────────────────────────────────────────────────────────── # Task 2 — Property Discovery # ───────────────────────────────────────────────────────────────────────────── def _t2_user_msg(obs: Dict[str, Any]) -> str: extra = obs.get("extra", {}) return ( f"Contract : {obs['contract_name']}\n" f"Function : {extra.get('target_function', '?')} " f"({extra.get('target_signature', '')})\n" f"Step: {obs['step_count']} | Reward so far: {obs['cumulative_reward']:.2f}\n\n" f"Last action : {obs['last_action'] or 'None'}\n" f"Last result :\n{obs['last_action_result'] or 'Episode just started.'}" ) def _run_t2_episode(env: Task2Environment, seed: int, ep_num: int) -> Dict[str, Any]: """Run one Task 2 episode; emit [START]/[STEP]/[END].""" r = env.reset(seed=seed) obs = 
r.observation.model_dump() fn = obs["extra"].get("target_function", "?") log_start(task="task2_property_discovery", env=ENV_BENCHMARK, model=MODEL_NAME) messages = [{"role": "system", "content": T2_SYSTEM}] step_rewards: List[float] = [] grader_score = 0.0 steps_taken = 0 error_msg: Optional[str] = None try: for step in range(1, MAX_STEPS_T2 + 1): messages.append({"role": "user", "content": _t2_user_msg(obs)}) try: resp = client.chat.completions.create( model=MODEL_NAME, messages=messages, max_tokens=400, temperature=0.0, ) raw = resp.choices[0].message.content.strip() # type: ignore error_msg = None except Exception as e: raw = "" error_msg = str(e)[:80] print(f"[DEBUG] T2 LLM error ep={ep_num} step={step}: {e}", file=sys.stderr) try: parsed = json.loads(raw) at = ActionType(parsed["action"]) params = parsed.get("params", {}) except Exception: at, params = ActionType.GET_FUNCTION_CODE, {} messages.append({"role": "assistant", "content": raw}) result = env.step(Action(action_type=at, params=params)) obs = result.observation.model_dump() r_val = result.reward.value done = result.done step_rewards.append(r_val) steps_taken = step log_step(step=step, action=at.value, reward=r_val, done=done, error=error_msg) if done: grader_score = round(r_val / 5.0, 3) if r_val > 0 else 0.0 break time.sleep(0.3) finally: success = grader_score >= SUCCESS_SCORE_THRESHOLD log_end(success=success, steps=steps_taken, score=grader_score, rewards=step_rewards) return { "episode": ep_num, "seed": seed, "contract": obs["contract_name"], "function": fn, "grader_score": grader_score, "cumulative_reward": obs["cumulative_reward"], } # ───────────────────────────────────────────────────────────────────────────── # Task 3 — Rule Checker # ───────────────────────────────────────────────────────────────────────────── def _t3_user_msg(obs: Dict[str, Any]) -> str: extra = obs.get("extra", {}) return ( f"Contract : {obs['contract_name']}\n" f"Property : {extra.get('property_english', '(none)')}\n" 
f"Step: {obs['step_count']} | Reward so far: {obs['cumulative_reward']:.2f}\n\n" f"Last action : {obs['last_action'] or 'None'}\n" f"Last result :\n{obs['last_action_result'] or 'Episode just started.'}" ) def _run_t3_episode(env: Task3Environment, seed: int, ep_num: int) -> Dict[str, Any]: """Run one Task 3 episode; emit [START]/[STEP]/[END].""" r = env.reset(seed=seed) obs = r.observation.model_dump() log_start(task="task3_rule_checker", env=ENV_BENCHMARK, model=MODEL_NAME) messages = [{"role": "system", "content": T3_SYSTEM}] step_rewards: List[float] = [] grader_score = 0.0 steps_taken = 0 error_msg: Optional[str] = None try: for step in range(1, MAX_STEPS_T3 + 1): messages.append({"role": "user", "content": _t3_user_msg(obs)}) try: resp = client.chat.completions.create( model=MODEL_NAME, messages=messages, max_tokens=200, temperature=0.0, ) raw = resp.choices[0].message.content.strip() # type: ignore error_msg = None except Exception as e: raw = "" error_msg = str(e)[:80] print(f"[DEBUG] T3 LLM error ep={ep_num} step={step}: {e}", file=sys.stderr) try: parsed = json.loads(raw) at = ActionType(parsed["action"]) params = parsed.get("params", {}) except Exception: at, params = ActionType.LIST_FUNCTIONS, {} messages.append({"role": "assistant", "content": raw}) result = env.step(Action(action_type=at, params=params)) obs = result.observation.model_dump() r_val = result.reward.value done = result.done step_rewards.append(r_val) steps_taken = step log_step(step=step, action=at.value, reward=r_val, done=done, error=error_msg) if done: v = r_val grader_score = 1.0 if v >= 4.9 else (0.3 if v >= 1.0 else 0.0) break time.sleep(0.3) finally: success = grader_score >= SUCCESS_SCORE_THRESHOLD log_end(success=success, steps=steps_taken, score=grader_score, rewards=step_rewards) return { "episode": ep_num, "seed": seed, "contract": obs["contract_name"], "grader_score": grader_score, "cumulative_reward": obs["cumulative_reward"], } # 
# ─────────────────────────────────────────────────────────────────────────────
# Task runners
# ─────────────────────────────────────────────────────────────────────────────

def _mean(values: List[float]) -> float:
    """Return the arithmetic mean of *values*, or 0.0 for an empty list."""
    return sum(values) / len(values) if values else 0.0


def _run_task(
    env_factory: Any,
    run_episode: Any,
    n: int,
    *,
    title: str,
    task_id: str,
    name: str,
) -> Dict[str, Any]:
    """Run *n* episodes of one task and return its summary dict.

    Args:
        env_factory: Zero-argument callable that builds the task environment.
        run_episode: Callable ``(env, seed, ep_num) -> dict`` running one episode;
            the dict must carry ``grader_score`` and ``cumulative_reward``.
        n: Number of episodes; episode ``i`` (0-based) uses seed ``SEED_BASE + i``.
        title: Heading printed between the ``=`` rules before the episodes.
        task_id: Identifier stored under ``"task_id"``.
        name: Human-readable task name stored under ``"name"``.

    Returns:
        Summary dict with the per-episode results and their averages.
    """
    print("\n" + "=" * 60, flush=True)
    print(title, flush=True)
    print("=" * 60, flush=True)

    # Built after the heading is printed, matching the original ordering.
    env = env_factory()
    episodes = [run_episode(env, SEED_BASE + i, i + 1) for i in range(n)]

    # _mean() guards the n == 0 case, which used to raise ZeroDivisionError.
    avg_s = _mean([e["grader_score"] for e in episodes])
    avg_r = _mean([e["cumulative_reward"] for e in episodes])
    print(f"\n Avg grader score : {avg_s:.3f}", flush=True)
    print(f" Avg cum reward : {avg_r:.2f}", flush=True)

    return {
        "task_id": task_id,
        "name": name,
        "status": "active",
        "num_episodes": len(episodes),
        "episodes": episodes,
        "avg_grader_score": avg_s,
        "avg_cumulative_reward": avg_r,
    }


def run_task1(n: int = NUM_EPISODES) -> Dict[str, Any]:
    """Run *n* Task 1 (vulnerability detection) episodes and summarise them."""
    return _run_task(
        Task1Environment,
        _run_t1_episode,
        n,
        title="TASK 1: Targeted Vulnerability Detection",
        task_id="task1_vuln_detection",
        name="Targeted Vulnerability Detection",
    )


def run_task2(n: int = NUM_EPISODES) -> Dict[str, Any]:
    """Run *n* Task 2 (property discovery) episodes and summarise them."""
    return _run_task(
        Task2Environment,
        _run_t2_episode,
        n,
        title="TASK 2: Property Discovery",
        task_id="task2_property_discovery",
        name="Property Discovery",
    )


def run_task3(n: int = NUM_EPISODES) -> Dict[str, Any]:
    """Run *n* Task 3 (rule checker) episodes and summarise them."""
    return _run_task(
        Task3Environment,
        _run_t3_episode,
        n,
        title="TASK 3: Rule Checker",
        task_id="task3_rule_checker",
        name="Rule Checker",
    )


# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────

async def main() -> None:
    """Async entry point (wraps sync env calls; asyncio.run() expected by caller).

    Runs the three tasks in order, prints a summary and writes the full
    results to ``baseline_scores.json`` in the current working directory.
    """
    print("Smart Contract Audit RL Environment — Baseline Inference", flush=True)
    print(f"Model: {MODEL_NAME} | Base URL: {API_BASE_URL}", flush=True)

    tasks = [
        run_task1(NUM_EPISODES),
        run_task2(NUM_EPISODES),
        run_task3(NUM_EPISODES),
    ]
    # Divide by the actual task count instead of a hard-coded 3.
    overall = _mean([t["avg_grader_score"] for t in tasks])

    results = {
        "model": MODEL_NAME,
        "base_url": API_BASE_URL,
        "tasks": tasks,
        "overall_avg_score": overall,
    }

    print("\n" + "=" * 60, flush=True)
    print("BASELINE SUMMARY", flush=True)
    print("=" * 60, flush=True)
    for t in tasks:
        print(f" ✅ {t['name']:40s}: {t['avg_grader_score']:.3f}", flush=True)
    print(f"\n Overall avg grader score: {overall:.3f}", flush=True)

    # Explicit encoding so the output does not depend on the platform locale.
    with open("baseline_scores.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print("\n Scores written to baseline_scores.json", flush=True)


if __name__ == "__main__":
    asyncio.run(main())