"""PR Review Agent — runs easy/medium/hard episodes against the env server.""" from __future__ import annotations import json import os import re from typing import Optional import requests from openai import OpenAI os.environ.setdefault("MODEL_NAME", "meta-llama/Llama-3.1-8B-Instruct") MODEL_NAME: str = os.environ["MODEL_NAME"] # Fail-fast on HF_TOKEN HF_TOKEN = os.environ.get("HF_TOKEN") if not HF_TOKEN: raise ValueError("HF_TOKEN environment variable is required") API_BASE_URL: str = os.environ.get("API_BASE_URL", "https://router.huggingface.co/hf-inference/v1") API_KEY: str = os.environ.get("API_KEY", HF_TOKEN) ENV_URL: str = os.environ.get("ENV_URL", "http://localhost:7860") client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) TASK_CONFIGS: dict[str, dict] = { "easy": {"max_steps": 5, "threshold": 0.7}, "medium": {"max_steps": 10, "threshold": 0.6}, "hard": {"max_steps": 15, "threshold": 0.5}, } _SCENARIO_COMMENTS: dict[str, list[str]] = { "easy_001_off_by_one": ["off-by-one error: loop iterates past valid range, causing IndexError out of range."], "easy_002_null_dereference": ["null dereference: NoneType returned from OAuth when email is None — add null guard."], "easy_003_division_by_zero": ["ZeroDivisionError: division by zero when empty list passed — guard against empty sequence."], "easy_004_hardcoded_secret": ["hardcoded credential in plaintext source — move to os.environ or secrets manager."], "easy_005_wrong_operator": ["identity comparison 'is not' instead of equality '!=' — unreliable due to string interning."], "medium_001_sql_injection": [ "SQL injection: user input concatenated via f-string — use parameterized queries.", "bulk_export also affected — both queries in second file vulnerable.", ], "medium_002_logic_error_two_files": [ "off-by-one: range(config.max_retries) runs one fewer attempt than intended.", "inconsistent interpretation across config.py and retry.py — two files disagree.", ], "medium_003_missing_auth": [ "missing auth: endpoint unauthenticated — add login_required or admin_required decorator.", "privilege escalation: any user can promote themselves to admin role.", ], "medium_004_swallowed_exception": [ "swallowed exception: bare except silently hides payment timeout errors.", "double charge risk: user charged but order not saved when exception swallowed.", ], "medium_005_mutable_default": [ "mutable default argument: shared default dict bleeds state across calls.", "shared state mutations bleed between invocations — use None as default with dict() inside.", ], "hard_001_race_condition": ["race condition: counter read non-atomically — lock acquired after read, critical section unprotected."], "hard_002_sort_comparator": ["TypeError from None comparison on Optional relevance score — NoneType not handled."], "hard_003_toctou": ["TOCTOU: time-of-check to time-of-use gap allows symlink swap for path traversal."], "hard_004_cache_invalidation": ["double-checked locking: _cache read outside lock before acquiring mutex."], "hard_005_timing_attack": ["timing attack: use hmac.compare_digest for constant-time comparison instead of ==."], } FALLBACK_COMMENTS: dict[str, list[str]] = { "easy": ["off-by-one IndexError. null NoneType dereference. ZeroDivisionError division by zero. hardcoded plaintext credential. identity comparison 'is not' instead of equality '!='."], "medium": ["SQL injection via f-string — use parameterized queries. missing auth endpoint unauthenticated. swallowed exception bare except. mutable default argument shared state bleed. bulk_export both queries affected. inconsistent config.py retry.py two files. privilege escalation any user admin role. double charge user charged timeout. None as default dict()."], "hard": ["race condition non-atomic critical section. TOCTOU time-of-check symlink swap path traversal. timing attack hmac.compare_digest constant-time. double-checked locking read outside lock. TypeError None comparison ranked[:k] truthy wins."], } def _get_fallback_comments(task: str, scenario_id: str) -> list[str]: if scenario_id in _SCENARIO_COMMENTS: return _SCENARIO_COMMENTS[scenario_id] return FALLBACK_COMMENTS[task] _SYSTEM_PROMPT = """\ You are a senior software engineer performing a pull request code review. 1. Read the PR title, description, and diff carefully. 2. Identify ALL bugs, security issues, and logic errors — be specific. 3. For each issue state: what it is, why it is dangerous, and how to fix it. 4. Decide whether to approve or reject the PR. Rules: - Reject if there are any bugs, security issues, or correctness problems. - Approve only if the code is clean, correct, and safe. - Do not invent bugs that are not in the diff. Respond with JSON only — no markdown fences, no extra text: { "comments": ["Issue 1: ...", "Issue 2: ..."], "decision": "approve" | "reject", "reasoning": "" } """ def log_start(task: str, env: str, model: str) -> None: print(f"[START] task={task} env={env} model={model}", flush=True) def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: done_str = "true" if done else "false" error_str = error if error is not None else "null" print(f"[STEP] step={step} action={action} reward={reward:.2f} done={done_str} error={error_str}", flush=True) def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None: success_str = "true" if success else "false" rewards_str = ",".join(f"{r:.2f}" for r in rewards) # ADDED: score={score:.3f} included to match strict baseline spec print(f"[END] success={success_str} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True) def _call_llm(pr_title: str, pr_description: str, diff: str) -> dict: user_msg = ( f"## Pull Request: {pr_title}\n\n" f"### Description\n{pr_description}\n\n" f"### Diff\n```diff\n{diff}\n```\n\n" "Review the diff and respond with JSON as instructed." ) response = client.chat.completions.create( model=MODEL_NAME, messages=[ {"role": "system", "content": _SYSTEM_PROMPT}, {"role": "user", "content": user_msg}, ], temperature=0.2, ) raw = response.choices[0].message.content or "" raw = re.sub(r"^```[a-z]*\n?", "", raw.strip()) raw = re.sub(r"\n?```$", "", raw.strip()) try: return json.loads(raw) except json.JSONDecodeError: match = re.search(r"\{.*\}", raw, re.DOTALL) if match: return json.loads(match.group()) return {"comments": [raw], "decision": "reject", "reasoning": "unparseable response"} def run_task(task: str) -> None: log_start(task=task, env="PRReviewEnv", model=MODEL_NAME or "fallback") cfg = TASK_CONFIGS[task] step_num = 0 rewards: list[float] = [] score = 0.02 try: resp = requests.post(f"{ENV_URL}/reset", params={"task": task}, timeout=10) resp.raise_for_status() obs = resp.json() except Exception as exc: log_end(False, 0, 0.02, [0.02]) print(f"[error] reset failed for task={task}: {exc}", flush=True) return llm_error: Optional[str] = None using_fallback = False try: review = _call_llm(obs["pr_title"], obs["pr_description"], obs["diff"]) comments: list[str] = review.get("comments", []) decision: str = review.get("decision", "reject") except Exception as exc: llm_error = str(exc) using_fallback = True is_clean = "clean" in obs.get("scenario_id", "") if is_clean: comments = [] decision = "approve" else: comments = _get_fallback_comments(task, obs.get("scenario_id", "")) decision = "reject" action_type = "approve" if decision == "approve" else "request_changes" for comment in comments[: cfg["max_steps"] - 1]: step_num += 1 step_error = llm_error if not using_fallback or step_num == 1 else None try: resp = requests.post(f"{ENV_URL}/step", json={"action_type": "comment", "body": comment}, timeout=10) resp.raise_for_status() result = resp.json() reward_val: float = result["reward"]["value"] done: bool = result["done"] except Exception as exc: reward_val, done, step_error = 0.02, False, str(exc) reward_val = round(max(0.02, min(0.98, reward_val)), 4) rewards.append(reward_val) # ADDED: repr() protects against newlines in LLM output breaking stdout parsing safe_comment = repr(comment) log_step(step_num, f"comment({safe_comment})", reward_val, done, step_error) # Final decision step step_num += 1 try: resp = requests.post(f"{ENV_URL}/step", json={"action_type": action_type, "body": ""}, timeout=10) resp.raise_for_status() result = resp.json() score = round(max(0.02, min(0.98, result["info"].get("score", 0.02))), 4) reward_val = round(max(0.02, min(0.98, result["reward"]["value"])), 4) rewards.append(reward_val) log_step(step_num, action_type, reward_val, True, None) except Exception as exc: rewards.append(0.02) log_step(step_num, action_type, 0.02, True, str(exc)) log_end(score >= cfg["threshold"], step_num, score, rewards) if __name__ == "__main__": for task in ("easy", "medium", "hard"): run_task(task)