# codereview/env/environment.py
# (Hub upload metadata: uploaded by Avnishjain, "Upload 34 files", commit 7b2a69c verified)
"""
CodeReviewEnv – main OpenEnv environment.
Interface
---------
env = CodeReviewEnv(task_id="task_1_easy")
obs = env.reset()
result = env.step(action)
state = env.state()
"""
from __future__ import annotations
import time
from typing import Any, Dict, List, Optional
from corpus.snippets import CORPUS
from env.models import (
Action,
CodeSnippet,
EnvironmentState,
Observation,
Reward,
ReviewComment,
StepResult,
TaskDifficulty,
TaskSpec,
)
from graders.graders import GRADERS
# ---------------------------------------------------------------------------
# Task specs
# ---------------------------------------------------------------------------
# Registry of the built-in review tasks, keyed by task_id.
# Each TaskSpec pins the issue categories the agent is graded on, the step
# budget (max_steps), and the grader score required to pass
# (passing_threshold). CodeReviewEnv.__init__ validates task_id against
# this mapping.
TASK_SPECS: dict[str, TaskSpec] = {
    # Easy: correctness bugs + style only; security/performance are
    # explicitly out of scope for this task.
    "task_1_easy": TaskSpec(
        task_id="task_1_easy",
        title="Bug Detection & Style Review",
        difficulty=TaskDifficulty.EASY,
        categories=["bug", "style"],
        description=(
            "Review calculator.py for correctness bugs (division by zero, off-by-one, "
            "empty collection crashes) and Python style issues. "
            "You do NOT need to check for security or performance."
        ),
        max_steps=5,
        passing_threshold=0.55,
    ),
    # Medium: security + performance audit; the grader weighs missed
    # critical security issues heavily (see description).
    "task_2_medium": TaskSpec(
        task_id="task_2_medium",
        title="Security & Performance Audit",
        difficulty=TaskDifficulty.MEDIUM,
        categories=["security", "performance"],
        description=(
            "Audit user_service.py for security vulnerabilities (SQL injection, weak "
            "hashing, unsafe deserialization) and performance problems (unbounded queries, "
            "connection churn). Identify ALL critical security issues – missing one costs "
            "heavily."
        ),
        max_steps=7,
        passing_threshold=0.60,
    ),
    # Hard: all five categories; this is the only task for which the
    # instructions demand a written summary (see _build_observation).
    "task_3_hard": TaskSpec(
        task_id="task_3_hard",
        title="Comprehensive Code Review",
        difficulty=TaskDifficulty.HARD,
        categories=["bug", "security", "performance", "style", "documentation"],
        description=(
            "Perform a full production-grade review of data_pipeline.py covering bugs, "
            "security flaws, performance issues, code style, and documentation gaps. "
            "You MUST provide a written summary of overall findings. "
            "This snippet has intentional issues across all five categories."
        ),
        max_steps=10,
        passing_threshold=0.65,
    ),
}
# ---------------------------------------------------------------------------
# Environment
# ---------------------------------------------------------------------------
# Per-step prompt shown to the agent. Placeholders are filled in
# CodeReviewEnv._build_observation(); {summary_note} is non-empty only for
# task_3_hard, which additionally requires a written summary.
INSTRUCTIONS_TEMPLATE = """
You are performing a Python code review.
Task: {title}
Difficulty: {difficulty}
Categories to check: {categories}
{description}
Your job:
1. Read the code snippet carefully.
2. Identify issues matching the specified categories.
3. For each issue, provide: line number (if applicable), category, severity, a clear message, and an optional fix suggestion.
4. When you are satisfied, set `submit=True` in your action.
{summary_note}
The code will be shown in the observation. Previous comments you have already submitted are also included so you can refine or expand them across steps.
""".strip()
class CodeReviewEnv:
    """
    OpenEnv-compliant environment for Python code review tasks.

    Typical lifecycle::

        env = CodeReviewEnv(task_id="task_1_easy")
        obs = env.reset()
        result = env.step(action)
        state = env.state()

    An episode ends when the agent sets ``submit=True`` on its action or
    when the task's ``max_steps`` budget is exhausted.
    """

    def __init__(self, task_id: str = "task_1_easy"):
        # Fail fast on an unknown task rather than at the first step().
        if task_id not in TASK_SPECS:
            raise ValueError(f"Unknown task_id '{task_id}'. Choose from: {list(TASK_SPECS)}")
        self.task_id = task_id
        self.spec: TaskSpec = TASK_SPECS[task_id]
        # The corpus entry bundles the snippet under review and its
        # ground-truth issue list; the task's grader scores the agent's
        # accumulated comments against that ground truth.
        self.corpus_entry: dict = CORPUS[task_id]
        self.grader = GRADERS[task_id]
        self.ground_truth: List[ReviewComment] = self.corpus_entry["issues"]
        self.snippet: CodeSnippet = self.corpus_entry["snippet"]
        # Mutable episode state — reset() re-initialises all of these.
        self._step: int = 0
        self._done: bool = False
        self._comments: List[ReviewComment] = []  # accumulated, de-duplicated comments
        self._total_reward: float = 0.0
        self._grader_scores: Dict[str, float] = {}
        self._last_feedback: Optional[str] = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def reset(self) -> Observation:
        """Reset the environment to initial state and return first observation."""
        self._step = 0
        self._done = False
        self._comments = []
        self._total_reward = 0.0
        self._grader_scores = {}
        self._last_feedback = None
        return self._build_observation()

    def step(self, action: Action) -> StepResult:
        """
        Advance the environment by one step.

        Parameters
        ----------
        action : Action
            Comments produced this step plus optional submit flag.

        Returns
        -------
        StepResult with (observation, reward, done, info)

        Raises
        ------
        RuntimeError
            If called after the episode has finished (call ``reset()``).
        """
        if self._done:
            raise RuntimeError("Episode is done; call reset() first.")
        self._step += 1
        # Accumulate comments (deduplicate by message fingerprint —
        # see _deduplicate for the exact key).
        new_comments = self._deduplicate(action.comments)
        self._comments.extend(new_comments)
        # Compute incremental reward for new comments; grader_result is
        # the grader's scoring of ALL accumulated comments so far.
        reward, feedback, grader_result = self._compute_reward(action, new_comments)
        self._grader_scores = grader_result
        self._total_reward = round(self._total_reward + reward.value, 4)
        self._last_feedback = feedback
        # Episode ends on explicit submit or when the step budget runs out.
        done = action.submit or self._step >= self.spec.max_steps
        self._done = done
        obs = self._build_observation(feedback=feedback, done=done)
        info: Dict[str, Any] = {
            "step": self._step,
            "new_comments": len(new_comments),
            "total_comments": len(self._comments),
            "grader": grader_result,
            # "passed" reflects the running grader score vs. the task
            # threshold, even on non-final steps.
            "passed": grader_result.get("score", 0.0) >= self.spec.passing_threshold,
        }
        return StepResult(observation=obs, reward=reward, done=done, info=info)

    def state(self) -> EnvironmentState:
        """Return full serialisable state snapshot."""
        return EnvironmentState(
            task_id=self.task_id,
            step=self._step,
            max_steps=self.spec.max_steps,
            total_reward=self._total_reward,
            comments_so_far=self._comments,
            done=self._done,
            grader_scores=self._grader_scores,
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _build_observation(
        self,
        feedback: Optional[str] = None,
        done: bool = False,
    ) -> Observation:
        """Render the task instructions and package the current observation."""
        # Only the hard task requires a written summary; the extra
        # instruction line is injected via {summary_note}.
        summary_note = (
            "\n5. You MUST include a `summary` field with your overall assessment."
            if self.task_id == "task_3_hard"
            else ""
        )
        instructions = INSTRUCTIONS_TEMPLATE.format(
            title=self.spec.title,
            difficulty=self.spec.difficulty.value.upper(),
            categories=", ".join(self.spec.categories),
            description=self.spec.description,
            summary_note=summary_note,
        )
        return Observation(
            task_id=self.task_id,
            step=self._step,
            snippet=self.snippet,
            # Copy so callers cannot mutate the environment's comment list.
            previous_comments=list(self._comments),
            instructions=instructions,
            # Fall back to the last feedback when none was produced this call
            # (e.g. on reset both are None).
            feedback=feedback or self._last_feedback,
            done=done,
        )

    def _compute_reward(
        self,
        action: Action,
        new_comments: List[ReviewComment],
    ) -> tuple[Reward, str, dict]:
        """
        Compute reward with partial progress signals.

        Components
        ----------
        * +step_signal : positive if new valid comments were added
        * +submit_bonus : grader score applied on final submit
        * -loop_penalty : penalty for submitting zero new comments repeatedly
        * -over_comment : penalty for > 2.5× the expected number of comments

        Returns
        -------
        (Reward, feedback string, raw grader-result dict). The final reward
        value is clamped to [-1.0, 1.0].
        """
        # Run grader against ALL accumulated comments (not just this step's).
        full_action = Action(
            comments=self._comments,
            summary=action.summary,
            submit=action.submit,
        )
        grader_result = self.grader.grade(full_action, self.ground_truth)
        current_score = grader_result["score"]
        breakdown: Dict[str, float] = {}
        reward_val = 0.0
        if action.submit:
            # Final reward: grader score scaled to 0.8, plus a ±0.2 bonus/
            # penalty for clearing/missing the passing threshold, giving an
            # overall range of roughly -0.2 to 1.0.
            submit_reward = current_score * 0.8 + (0.2 if current_score >= self.spec.passing_threshold else -0.2)
            reward_val += submit_reward
            breakdown["submit_reward"] = round(submit_reward, 4)
            feedback = (
                f"Review submitted. Score: {current_score:.3f} "
                f"({'PASSED' if current_score >= self.spec.passing_threshold else 'FAILED'}). "
                f"Matched {grader_result['matched_count']}/{grader_result['total_ground_truth']} issues."
            )
        else:
            # Incremental reward: positive if new valid comments detected
            if new_comments:
                # Small positive signal for adding comments (+0.05 per comment,
                # capped at 0.15 per step).
                step_reward = min(0.05 * len(new_comments), 0.15)
                reward_val += step_reward
                breakdown["step_reward"] = round(step_reward, 4)
                # Progress signal: reward the increase in grader score.
                # We re-grade the comment set WITHOUT this step's new
                # comments to obtain the score delta they contributed.
                prev_action = Action(
                    comments=[c for c in self._comments if c not in new_comments],
                    summary=None,
                    submit=False,
                )
                prev_result = self.grader.grade(prev_action, self.ground_truth)
                score_delta = current_score - prev_result["score"]
                if score_delta > 0:
                    progress_reward = round(score_delta * 0.5, 4)
                    reward_val += progress_reward
                    breakdown["progress_reward"] = progress_reward
            else:
                # Penalty for a step that added no new comments (anti-stall).
                reward_val -= 0.05
                breakdown["empty_step_penalty"] = -0.05
            # Penalty for too many comments (spam): triggers once the total
            # exceeds 2.5× the number of ground-truth issues.
            expected = grader_result["total_ground_truth"]
            if len(self._comments) > expected * 2.5:
                spam_penalty = -0.10
                reward_val += spam_penalty
                breakdown["spam_penalty"] = spam_penalty
            feedback = (
                f"Step {self._step}: Added {len(new_comments)} comment(s). "
                f"Running score: {current_score:.3f}. "
                f"Steps remaining: {self.spec.max_steps - self._step}."
            )
        # Clamp to [-1, 1] and round for stable serialisation.
        reward_val = round(max(-1.0, min(1.0, reward_val)), 4)
        return Reward(value=reward_val, breakdown=breakdown, reason=feedback), feedback, grader_result

    def _deduplicate(self, incoming: List[ReviewComment]) -> List[ReviewComment]:
        """Remove comments whose (line, category, message[:40]) already exist.

        The key also deduplicates within *incoming* itself, so two
        near-identical comments in the same action count once.
        """
        existing_keys = {
            (c.line, c.category, c.message[:40]) for c in self._comments
        }
        new: List[ReviewComment] = []
        for c in incoming:
            key = (c.line, c.category, c.message[:40])
            if key not in existing_keys:
                existing_keys.add(key)
                new.append(c)
        return new