Spaces:
Sleeping
Sleeping
| """ | |
| inference.py | |
| ------------ | |
| Baseline inference script — Smart Contract Audit RL Environment. | |
| Implements agents for all three tasks using the OpenAI-compatible client. | |
| Emits mandatory structured stdout in the OpenEnv format. | |
| MANDATORY ENV VARS: | |
| API_BASE_URL LLM API endpoint (default: https://api.openai.com/v1) | |
| MODEL_NAME Model identifier (default: gpt-4o-mini) | |
| HF_TOKEN API key / HF token | |
| MANDATORY STDOUT FORMAT (per episode): | |
| [START] task=<id> env=smart-contract-audit model=<model> | |
| [STEP] step=<n> action=<str> reward=<0.00> done=<true|false> error=<str|null> | |
| [END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...> | |
| Usage: | |
| python inference.py | |
| Output: | |
| Structured stdout per episode, plus baseline_scores.json summary. | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| import time | |
| from typing import Any, Dict, List, Optional | |
| from openai import OpenAI | |
| from tasks.task1 import Task1Environment | |
| from tasks.task2 import Task2Environment | |
| from tasks.task3 import Task3Environment | |
| from env.schemas import Action, ActionType | |
| from utils import T1_SYSTEM, T2_SYSTEM, T3_SYSTEM | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Configuration | |
| # ───────────────────────────────────────────────────────────────────────────── | |
# Endpoint of the OpenAI-compatible API; overridable through the environment.
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1")
# Model identifier sent with every chat-completion request.
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini")
# Credential handed to the client as its API key; empty string when unset.
HF_TOKEN = os.getenv("HF_TOKEN", "")
if not HF_TOKEN:
    # Warn on stderr only, so stdout keeps nothing but the episode output.
    # NOTE(review): the "β" in this message looks like a mis-decoded dash —
    # confirm against the original file before changing the literal.
    print("[WARN] HF_TOKEN not set β API calls may fail.", file=sys.stderr)
# Benchmark / environment identifier (constant for this env)
ENV_BENCHMARK = "smart-contract-audit"
# Episodes per task
NUM_EPISODES = 3
# Seed of the first episode; the task runners add the episode index to it.
SEED_BASE = 42
# Max steps per task
MAX_STEPS_T1 = 15
MAX_STEPS_T2 = 10
MAX_STEPS_T3 = 12
# A grader_score >= this is considered a "success" for the [END] line
SUCCESS_SCORE_THRESHOLD = 0.5
# Single shared client, created at import time and used by every episode runner.
client = OpenAI(api_key=HF_TOKEN, base_url=API_BASE_URL)
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Mandatory stdout helpers | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def log_start(task: str, env: str, model: str) -> None:
    """Announce the beginning of an episode on stdout.

    Writes one ``[START] task=... env=... model=...`` record and flushes
    right away so line-oriented consumers are not held up by buffering.

    Args:
        task: Task identifier placed after ``task=``.
        env: Environment / benchmark identifier placed after ``env=``.
        model: Model identifier placed after ``model=``.
    """
    start_record = f"[START] task={task} env={env} model={model}"
    print(start_record, flush=True)
def log_step(
    step: int,
    action: str,
    reward: float,
    done: bool,
    error: Optional[str] = None,
) -> None:
    """Emit a [STEP] line — one per env.step() call.

    The record is written to stdout as a single line and flushed immediately.

    Args:
        step: 1-based step index within the episode.
        action: Name of the action that was executed.
        reward: Reward returned for this step; printed with two decimals.
        done: Whether the episode ended on this step; printed as true/false.
        error: Optional error text for the step. ``None`` or an empty string
            is printed as ``null``.
    """
    # Exception text can span several lines; flatten it so the record stays on
    # exactly one line and line-based parsers of the [STEP] format keep working.
    error_val = " ".join(error.splitlines()) if error else ""
    if not error_val:
        error_val = "null"
    print(
        f"[STEP] step={step} action={action} "
        f"reward={reward:.2f} done={str(done).lower()} error={error_val}",
        flush=True,
    )
def log_end(
    success: bool,
    steps: int,
    score: float,
    rewards: List[float],
) -> None:
    """Write the closing [END] record of an episode to stdout.

    Args:
        success: Episode outcome; printed lowercase as true/false.
        steps: Number of steps taken in the episode.
        score: Grader score; printed with three decimals.
        rewards: Per-step rewards; printed comma-separated with two decimals
            each (an empty list yields an empty ``rewards=`` field).
    """
    formatted_rewards = [f"{r:.2f}" for r in rewards]
    rewards_str = ",".join(formatted_rewards)
    end_record = (
        f"[END] success={str(success).lower()} steps={steps} "
        f"score={score:.3f} rewards={rewards_str}"
    )
    print(end_record, flush=True)
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Task 1 — Targeted Vulnerability Detection | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def _t1_user_msg(obs: Dict[str, Any]) -> str:
    """Render a Task 1 observation as the user message for the next LLM turn.

    Args:
        obs: Observation dict; reads ``contract_name``,
            ``contract_description``, ``step_count``, ``cumulative_reward``,
            ``last_action`` and ``last_action_result``.

    Returns:
        The prompt text. Falsy ``last_action`` / ``last_action_result`` values
        are replaced by placeholder wording.
    """
    segments = [
        f"Contract: {obs['contract_name']}\n",
        f"Description: {obs['contract_description']}\n",
        f"Step: {obs['step_count']} | Reward so far: {obs['cumulative_reward']:.2f}\n\n",
        f"Last action : {obs['last_action'] or 'None'}\n",
        f"Last result : {obs['last_action_result'] or 'Episode just started.'}",
    ]
    return "".join(segments)
def _run_t1_episode(env: Task1Environment, seed: int, ep_num: int) -> Dict[str, Any]:
    """Run one Task 1 episode; emit [START]/[STEP]/[END].

    Each step sends the rendered observation to the LLM, parses the reply as
    JSON of the form ``{"action": ..., "params": {...}}`` and executes it in
    the environment. Any reply that cannot be parsed falls back to
    ``LIST_FUNCTIONS`` with no parameters.

    Args:
        env: Task 1 environment; it is reset with ``seed`` at the start.
        seed: Seed forwarded to ``env.reset``.
        ep_num: Episode number, used in debug output and in the result dict.

    Returns:
        Summary dict with ``episode``, ``seed``, ``contract``,
        ``grader_score`` and ``cumulative_reward``.
    """
    r = env.reset(seed=seed)
    obs = r.observation.model_dump()
    log_start(task="task1_vuln_detection", env=ENV_BENCHMARK, model=MODEL_NAME)
    messages = [{"role": "system", "content": T1_SYSTEM}]
    step_rewards: List[float] = []
    grader_score = 0.0
    steps_taken = 0
    error_msg: Optional[str] = None
    try:
        for step in range(1, MAX_STEPS_T1 + 1):
            messages.append({"role": "user", "content": _t1_user_msg(obs)})
            try:
                resp = client.chat.completions.create(
                    model=MODEL_NAME, messages=messages,
                    max_tokens=200, temperature=0.0,
                )
                raw = resp.choices[0].message.content.strip()  # type: ignore
                error_msg = None
            except Exception as e:
                # Best effort: a failed call still consumes a step, using the
                # fallback action below; the error is reported on the [STEP] line.
                raw = ""
                error_msg = str(e)[:80]
                print(f"[DEBUG] T1 LLM error ep={ep_num} step={step}: {e}", file=sys.stderr)
            try:
                parsed = json.loads(raw)
                at = ActionType(parsed["action"])
                params = parsed.get("params", {})
                # A well-formed reply can still carry a non-object "params"
                # (e.g. null). Passing that on would fail outside this try and
                # abort the episode — presumably Action expects a dict; confirm
                # against env.schemas.
                if not isinstance(params, dict):
                    params = {}
            except Exception:
                # Deliberate catch-all: any unusable reply becomes the default action.
                at, params = ActionType.LIST_FUNCTIONS, {}
            messages.append({"role": "assistant", "content": raw})
            result = env.step(Action(action_type=at, params=params))
            obs = result.observation.model_dump()
            r_val = result.reward.value
            done = result.done
            step_rewards.append(r_val)
            steps_taken = step
            log_step(step=step, action=at.value, reward=r_val, done=done, error=error_msg)
            if done:
                # Map the terminal step's reward onto the grader score.
                v = r_val
                grader_score = 1.0 if v >= 4.9 else (0.5 if v >= 0.9 else 0.0)
                break
            time.sleep(0.3)
    finally:
        # The [END] line is written even when a step raises.
        success = grader_score >= SUCCESS_SCORE_THRESHOLD
        log_end(success=success, steps=steps_taken, score=grader_score, rewards=step_rewards)
    return {
        "episode": ep_num,
        "seed": seed,
        "contract": obs["contract_name"],
        "grader_score": grader_score,
        "cumulative_reward": obs["cumulative_reward"],
    }
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Task 2 — Property Discovery | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def _t2_user_msg(obs: Dict[str, Any]) -> str:
    """Render a Task 2 observation as the user message for the next LLM turn.

    Args:
        obs: Observation dict; reads ``contract_name``, ``step_count``,
            ``cumulative_reward``, ``last_action``, ``last_action_result`` and
            the optional ``extra`` mapping (``target_function``,
            ``target_signature``).

    Returns:
        The prompt text. A missing target function is shown as ``?`` and a
        missing signature as an empty pair of parentheses.
    """
    extra = obs.get("extra", {})
    segments = [
        f"Contract : {obs['contract_name']}\n",
        f"Function : {extra.get('target_function', '?')} ",
        f"({extra.get('target_signature', '')})\n",
        f"Step: {obs['step_count']} | Reward so far: {obs['cumulative_reward']:.2f}\n\n",
        f"Last action : {obs['last_action'] or 'None'}\n",
        f"Last result :\n{obs['last_action_result'] or 'Episode just started.'}",
    ]
    return "".join(segments)
def _run_t2_episode(env: Task2Environment, seed: int, ep_num: int) -> Dict[str, Any]:
    """Run one Task 2 episode; emit [START]/[STEP]/[END].

    Each step sends the rendered observation to the LLM, parses the reply as
    JSON of the form ``{"action": ..., "params": {...}}`` and executes it in
    the environment. Any reply that cannot be parsed falls back to
    ``GET_FUNCTION_CODE`` with no parameters.

    Args:
        env: Task 2 environment; it is reset with ``seed`` at the start.
        seed: Seed forwarded to ``env.reset``.
        ep_num: Episode number, used in debug output and in the result dict.

    Returns:
        Summary dict with ``episode``, ``seed``, ``contract``, ``function``,
        ``grader_score`` and ``cumulative_reward``.
    """
    r = env.reset(seed=seed)
    obs = r.observation.model_dump()
    # Captured from the initial observation so the summary names the target.
    fn = obs["extra"].get("target_function", "?")
    log_start(task="task2_property_discovery", env=ENV_BENCHMARK, model=MODEL_NAME)
    messages = [{"role": "system", "content": T2_SYSTEM}]
    step_rewards: List[float] = []
    grader_score = 0.0
    steps_taken = 0
    error_msg: Optional[str] = None
    try:
        for step in range(1, MAX_STEPS_T2 + 1):
            messages.append({"role": "user", "content": _t2_user_msg(obs)})
            try:
                resp = client.chat.completions.create(
                    model=MODEL_NAME, messages=messages,
                    max_tokens=400, temperature=0.0,
                )
                raw = resp.choices[0].message.content.strip()  # type: ignore
                error_msg = None
            except Exception as e:
                # Best effort: a failed call still consumes a step, using the
                # fallback action below; the error is reported on the [STEP] line.
                raw = ""
                error_msg = str(e)[:80]
                print(f"[DEBUG] T2 LLM error ep={ep_num} step={step}: {e}", file=sys.stderr)
            try:
                parsed = json.loads(raw)
                at = ActionType(parsed["action"])
                params = parsed.get("params", {})
                # A well-formed reply can still carry a non-object "params"
                # (e.g. null). Passing that on would fail outside this try and
                # abort the episode — presumably Action expects a dict; confirm
                # against env.schemas.
                if not isinstance(params, dict):
                    params = {}
            except Exception:
                # Deliberate catch-all: any unusable reply becomes the default action.
                at, params = ActionType.GET_FUNCTION_CODE, {}
            messages.append({"role": "assistant", "content": raw})
            result = env.step(Action(action_type=at, params=params))
            obs = result.observation.model_dump()
            r_val = result.reward.value
            done = result.done
            step_rewards.append(r_val)
            steps_taken = step
            log_step(step=step, action=at.value, reward=r_val, done=done, error=error_msg)
            if done:
                # Positive terminal rewards are scaled by 1/5; anything else scores 0.
                grader_score = round(r_val / 5.0, 3) if r_val > 0 else 0.0
                break
            time.sleep(0.3)
    finally:
        # The [END] line is written even when a step raises.
        success = grader_score >= SUCCESS_SCORE_THRESHOLD
        log_end(success=success, steps=steps_taken, score=grader_score, rewards=step_rewards)
    return {
        "episode": ep_num,
        "seed": seed,
        "contract": obs["contract_name"],
        "function": fn,
        "grader_score": grader_score,
        "cumulative_reward": obs["cumulative_reward"],
    }
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Task 3 — Rule Checker | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def _t3_user_msg(obs: Dict[str, Any]) -> str:
    """Render a Task 3 observation as the user message for the next LLM turn.

    Args:
        obs: Observation dict; reads ``contract_name``, ``step_count``,
            ``cumulative_reward``, ``last_action``, ``last_action_result`` and
            the optional ``extra`` mapping (``property_english``).

    Returns:
        The prompt text. A missing property is shown as ``(none)``.
    """
    extra = obs.get("extra", {})
    segments = [
        f"Contract : {obs['contract_name']}\n",
        f"Property : {extra.get('property_english', '(none)')}\n",
        f"Step: {obs['step_count']} | Reward so far: {obs['cumulative_reward']:.2f}\n\n",
        f"Last action : {obs['last_action'] or 'None'}\n",
        f"Last result :\n{obs['last_action_result'] or 'Episode just started.'}",
    ]
    return "".join(segments)
def _run_t3_episode(env: Task3Environment, seed: int, ep_num: int) -> Dict[str, Any]:
    """Run one Task 3 episode; emit [START]/[STEP]/[END].

    Each step sends the rendered observation to the LLM, parses the reply as
    JSON of the form ``{"action": ..., "params": {...}}`` and executes it in
    the environment. Any reply that cannot be parsed falls back to
    ``LIST_FUNCTIONS`` with no parameters.

    Args:
        env: Task 3 environment; it is reset with ``seed`` at the start.
        seed: Seed forwarded to ``env.reset``.
        ep_num: Episode number, used in debug output and in the result dict.

    Returns:
        Summary dict with ``episode``, ``seed``, ``contract``,
        ``grader_score`` and ``cumulative_reward``.
    """
    r = env.reset(seed=seed)
    obs = r.observation.model_dump()
    log_start(task="task3_rule_checker", env=ENV_BENCHMARK, model=MODEL_NAME)
    messages = [{"role": "system", "content": T3_SYSTEM}]
    step_rewards: List[float] = []
    grader_score = 0.0
    steps_taken = 0
    error_msg: Optional[str] = None
    try:
        for step in range(1, MAX_STEPS_T3 + 1):
            messages.append({"role": "user", "content": _t3_user_msg(obs)})
            try:
                resp = client.chat.completions.create(
                    model=MODEL_NAME, messages=messages,
                    max_tokens=200, temperature=0.0,
                )
                raw = resp.choices[0].message.content.strip()  # type: ignore
                error_msg = None
            except Exception as e:
                # Best effort: a failed call still consumes a step, using the
                # fallback action below; the error is reported on the [STEP] line.
                raw = ""
                error_msg = str(e)[:80]
                print(f"[DEBUG] T3 LLM error ep={ep_num} step={step}: {e}", file=sys.stderr)
            try:
                parsed = json.loads(raw)
                at = ActionType(parsed["action"])
                params = parsed.get("params", {})
                # A well-formed reply can still carry a non-object "params"
                # (e.g. null). Passing that on would fail outside this try and
                # abort the episode — presumably Action expects a dict; confirm
                # against env.schemas.
                if not isinstance(params, dict):
                    params = {}
            except Exception:
                # Deliberate catch-all: any unusable reply becomes the default action.
                at, params = ActionType.LIST_FUNCTIONS, {}
            messages.append({"role": "assistant", "content": raw})
            result = env.step(Action(action_type=at, params=params))
            obs = result.observation.model_dump()
            r_val = result.reward.value
            done = result.done
            step_rewards.append(r_val)
            steps_taken = step
            log_step(step=step, action=at.value, reward=r_val, done=done, error=error_msg)
            if done:
                # Map the terminal step's reward onto the grader score. The
                # middle tier (0.3) is below SUCCESS_SCORE_THRESHOLD as set in
                # this file, so only the top tier counts as a success.
                v = r_val
                grader_score = 1.0 if v >= 4.9 else (0.3 if v >= 1.0 else 0.0)
                break
            time.sleep(0.3)
    finally:
        # The [END] line is written even when a step raises.
        success = grader_score >= SUCCESS_SCORE_THRESHOLD
        log_end(success=success, steps=steps_taken, score=grader_score, rewards=step_rewards)
    return {
        "episode": ep_num,
        "seed": seed,
        "contract": obs["contract_name"],
        "grader_score": grader_score,
        "cumulative_reward": obs["cumulative_reward"],
    }
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Task runners | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def run_task1(n: int = NUM_EPISODES) -> Dict[str, Any]:
    """Run ``n`` Task 1 episodes on one environment and summarise them.

    Episode ``i`` (0-based) uses seed ``SEED_BASE + i`` and episode number
    ``i + 1``. A banner and the two averages are printed to stdout.

    Args:
        n: Number of episodes to run. With ``n <= 0`` no episode is run and
            both averages are reported as 0.0.

    Returns:
        Dict with ``task_id``, ``name``, ``status``, ``num_episodes``, the
        per-episode results under ``episodes``, ``avg_grader_score`` and
        ``avg_cumulative_reward``.
    """
    print("\n" + "="*60, flush=True)
    print("TASK 1: Targeted Vulnerability Detection", flush=True)
    print("="*60, flush=True)
    env = Task1Environment()
    episodes = [_run_t1_episode(env, SEED_BASE + i, i + 1) for i in range(n)]
    # Divide by the number of episodes actually run; an empty run would
    # otherwise raise ZeroDivisionError.
    count = len(episodes)
    avg_s = sum(e["grader_score"] for e in episodes) / count if count else 0.0
    avg_r = sum(e["cumulative_reward"] for e in episodes) / count if count else 0.0
    print(f"\n Avg grader score : {avg_s:.3f}", flush=True)
    print(f" Avg cum reward : {avg_r:.2f}", flush=True)
    return {
        "task_id": "task1_vuln_detection", "name": "Targeted Vulnerability Detection",
        "status": "active", "num_episodes": n, "episodes": episodes,
        "avg_grader_score": avg_s, "avg_cumulative_reward": avg_r,
    }
def run_task2(n: int = NUM_EPISODES) -> Dict[str, Any]:
    """Run ``n`` Task 2 episodes on one environment and summarise them.

    Episode ``i`` (0-based) uses seed ``SEED_BASE + i`` and episode number
    ``i + 1``. A banner and the two averages are printed to stdout.

    Args:
        n: Number of episodes to run. With ``n <= 0`` no episode is run and
            both averages are reported as 0.0.

    Returns:
        Dict with ``task_id``, ``name``, ``status``, ``num_episodes``, the
        per-episode results under ``episodes``, ``avg_grader_score`` and
        ``avg_cumulative_reward``.
    """
    print("\n" + "="*60, flush=True)
    print("TASK 2: Property Discovery", flush=True)
    print("="*60, flush=True)
    env = Task2Environment()
    episodes = [_run_t2_episode(env, SEED_BASE + i, i + 1) for i in range(n)]
    # Divide by the number of episodes actually run; an empty run would
    # otherwise raise ZeroDivisionError.
    count = len(episodes)
    avg_s = sum(e["grader_score"] for e in episodes) / count if count else 0.0
    avg_r = sum(e["cumulative_reward"] for e in episodes) / count if count else 0.0
    print(f"\n Avg grader score : {avg_s:.3f}", flush=True)
    print(f" Avg cum reward : {avg_r:.2f}", flush=True)
    return {
        "task_id": "task2_property_discovery", "name": "Property Discovery",
        "status": "active", "num_episodes": n, "episodes": episodes,
        "avg_grader_score": avg_s, "avg_cumulative_reward": avg_r,
    }
def run_task3(n: int = NUM_EPISODES) -> Dict[str, Any]:
    """Run ``n`` Task 3 episodes on one environment and summarise them.

    Episode ``i`` (0-based) uses seed ``SEED_BASE + i`` and episode number
    ``i + 1``. A banner and the two averages are printed to stdout.

    Args:
        n: Number of episodes to run. With ``n <= 0`` no episode is run and
            both averages are reported as 0.0.

    Returns:
        Dict with ``task_id``, ``name``, ``status``, ``num_episodes``, the
        per-episode results under ``episodes``, ``avg_grader_score`` and
        ``avg_cumulative_reward``.
    """
    print("\n" + "="*60, flush=True)
    print("TASK 3: Rule Checker", flush=True)
    print("="*60, flush=True)
    env = Task3Environment()
    episodes = [_run_t3_episode(env, SEED_BASE + i, i + 1) for i in range(n)]
    # Divide by the number of episodes actually run; an empty run would
    # otherwise raise ZeroDivisionError.
    count = len(episodes)
    avg_s = sum(e["grader_score"] for e in episodes) / count if count else 0.0
    avg_r = sum(e["cumulative_reward"] for e in episodes) / count if count else 0.0
    print(f"\n Avg grader score : {avg_s:.3f}", flush=True)
    print(f" Avg cum reward : {avg_r:.2f}", flush=True)
    return {
        "task_id": "task3_rule_checker", "name": "Rule Checker",
        "status": "active", "num_episodes": n, "episodes": episodes,
        "avg_grader_score": avg_s, "avg_cumulative_reward": avg_r,
    }
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Main | |
| # ───────────────────────────────────────────────────────────────────────────── | |
async def main() -> None:
    """Run the three task baselines in order, print a summary, save the scores.

    Declared ``async`` although every call inside is synchronous; the script
    entry point drives it with ``asyncio.run()``. The collected results are
    written to ``baseline_scores.json`` in the current working directory.
    """
    # NOTE(review): the "β" in the banner below and in the per-task summary
    # line looks like a mis-decoded character — confirm against the original
    # file before touching either literal.
    print("Smart Contract Audit RL Environment β Baseline Inference", flush=True)
    print(f"Model: {MODEL_NAME} | Base URL: {API_BASE_URL}", flush=True)

    # Tasks run strictly one after another, in task order.
    task_reports = [
        run_task1(NUM_EPISODES),
        run_task2(NUM_EPISODES),
        run_task3(NUM_EPISODES),
    ]
    overall = sum(t["avg_grader_score"] for t in task_reports) / 3
    results = {
        "model": MODEL_NAME,
        "base_url": API_BASE_URL,
        "tasks": task_reports,
        "overall_avg_score": overall,
    }

    rule = "="*60
    print("\n" + rule, flush=True)
    print("BASELINE SUMMARY", flush=True)
    print(rule, flush=True)
    for t in task_reports:
        print(f" β {t['name']:40s}: {t['avg_grader_score']:.3f}", flush=True)
    print(f"\n Overall avg grader score: {overall:.3f}", flush=True)

    with open("baseline_scores.json", "w") as scores_file:
        json.dump(results, scores_file, indent=2)
    print("\n Scores written to baseline_scores.json", flush=True)
# Script entry point: only when executed directly (not on import), run the
# async main() to completion.
if __name__ == "__main__":
    asyncio.run(main())