"""
CodeReviewEnv – main OpenEnv environment.

Interface
---------
    env = CodeReviewEnv(task_id="task_1_easy")
    obs = env.reset()
    result = env.step(action)
    state = env.state()
"""

from __future__ import annotations

import time
from typing import Any, Dict, List, Optional

from corpus.snippets import CORPUS
from env.models import (
    Action,
    CodeSnippet,
    EnvironmentState,
    Observation,
    Reward,
    ReviewComment,
    StepResult,
    TaskDifficulty,
    TaskSpec,
)
from graders.graders import GRADERS

# ---------------------------------------------------------------------------
# Task specs
# ---------------------------------------------------------------------------

TASK_SPECS: dict[str, TaskSpec] = {
    "task_1_easy": TaskSpec(
        task_id="task_1_easy",
        title="Bug Detection & Style Review",
        difficulty=TaskDifficulty.EASY,
        categories=["bug", "style"],
        description=(
            "Review calculator.py for correctness bugs (division by zero, off-by-one, "
            "empty collection crashes) and Python style issues. "
            "You do NOT need to check for security or performance."
        ),
        max_steps=5,
        passing_threshold=0.55,
    ),
    "task_2_medium": TaskSpec(
        task_id="task_2_medium",
        title="Security & Performance Audit",
        difficulty=TaskDifficulty.MEDIUM,
        categories=["security", "performance"],
        description=(
            "Audit user_service.py for security vulnerabilities (SQL injection, weak "
            "hashing, unsafe deserialization) and performance problems (unbounded queries, "
            "connection churn). Identify ALL critical security issues – missing one costs "
            "heavily."
        ),
        max_steps=7,
        passing_threshold=0.60,
    ),
    "task_3_hard": TaskSpec(
        task_id="task_3_hard",
        title="Comprehensive Code Review",
        difficulty=TaskDifficulty.HARD,
        categories=["bug", "security", "performance", "style", "documentation"],
        description=(
            "Perform a full production-grade review of data_pipeline.py covering bugs, "
            "security flaws, performance issues, code style, and documentation gaps. "
            "You MUST provide a written summary of overall findings. "
            "This snippet has intentional issues across all five categories."
        ),
        max_steps=10,
        passing_threshold=0.65,
    ),
}

# ---------------------------------------------------------------------------
# Environment
# ---------------------------------------------------------------------------

# NOTE(review): the line breaks inside this template were reconstructed from a
# whitespace-collapsed copy of the file; confirm the layout against the
# canonical source before relying on exact prompt formatting.
INSTRUCTIONS_TEMPLATE = """
You are performing a Python code review.

Task: {title}
Difficulty: {difficulty}
Categories to check: {categories}

{description}

Your job:
1. Read the code snippet carefully.
2. Identify issues matching the specified categories.
3. For each issue, provide: line number (if applicable), category, severity,
   a clear message, and an optional fix suggestion.
4. When you are satisfied, set `submit=True` in your action.
{summary_note}

The code will be shown in the observation. Previous comments you have already
submitted are also included so you can refine or expand them across steps.
""".strip()


class CodeReviewEnv:
    """
    OpenEnv-compliant environment for Python code review tasks.

    An episode consists of up to ``spec.max_steps`` calls to :meth:`step`.
    Comments submitted across steps are accumulated (after de-duplication)
    and graded against the ground-truth issues of the selected task.
    """

    def __init__(self, task_id: str = "task_1_easy") -> None:
        """
        Bind the environment to one task and initialise episode state.

        Parameters
        ----------
        task_id : str
            Key into ``TASK_SPECS`` (and, by lookup below, ``CORPUS`` and
            ``GRADERS``).

        Raises
        ------
        ValueError
            If ``task_id`` is not a key of ``TASK_SPECS``.
        """
        if task_id not in TASK_SPECS:
            raise ValueError(
                f"Unknown task_id '{task_id}'. Choose from: {list(TASK_SPECS)}"
            )

        self.task_id = task_id
        self.spec: TaskSpec = TASK_SPECS[task_id]
        self.corpus_entry: dict = CORPUS[task_id]
        self.grader = GRADERS[task_id]
        self.ground_truth: List[ReviewComment] = self.corpus_entry["issues"]
        self.snippet: CodeSnippet = self.corpus_entry["snippet"]

        # State
        self._step: int = 0
        self._done: bool = False
        self._comments: List[ReviewComment] = []
        self._total_reward: float = 0.0
        # Holds the most recent grader result dict verbatim (it carries
        # "score", "matched_count" and "total_ground_truth"), so values are
        # not restricted to floats.
        self._grader_scores: Dict[str, Any] = {}
        self._last_feedback: Optional[str] = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def reset(self) -> Observation:
        """Reset the environment to initial state and return first observation."""
        self._step = 0
        self._done = False
        self._comments = []
        self._total_reward = 0.0
        self._grader_scores = {}
        self._last_feedback = None
        return self._build_observation()

    def step(self, action: Action) -> StepResult:
        """
        Advance the environment by one step.

        Parameters
        ----------
        action : Action
            Comments produced this step plus optional submit flag.

        Returns
        -------
        StepResult with (observation, reward, done, info)

        Raises
        ------
        RuntimeError
            If the episode has already finished; call :meth:`reset` first.
        """
        if self._done:
            raise RuntimeError("Episode is done; call reset() first.")

        self._step += 1

        # Accumulate comments (deduplicate by message fingerprint)
        new_comments = self._deduplicate(action.comments)
        self._comments.extend(new_comments)

        # Compute incremental reward for new comments
        reward, feedback, grader_result = self._compute_reward(action, new_comments)
        self._grader_scores = grader_result
        self._total_reward = round(self._total_reward + reward.value, 4)
        self._last_feedback = feedback

        # The episode ends on an explicit submit or when the step budget runs out.
        done = action.submit or self._step >= self.spec.max_steps
        self._done = done

        obs = self._build_observation(feedback=feedback, done=done)
        info: Dict[str, Any] = {
            "step": self._step,
            "new_comments": len(new_comments),
            "total_comments": len(self._comments),
            "grader": grader_result,
            "passed": grader_result.get("score", 0.0) >= self.spec.passing_threshold,
        }
        return StepResult(observation=obs, reward=reward, done=done, info=info)

    def state(self) -> EnvironmentState:
        """
        Return full serialisable state snapshot.

        The comment list and grader scores are copied so that later steps do
        not mutate a snapshot the caller is still holding (matching what
        ``_build_observation`` does for ``previous_comments``).
        """
        return EnvironmentState(
            task_id=self.task_id,
            step=self._step,
            max_steps=self.spec.max_steps,
            total_reward=self._total_reward,
            comments_so_far=list(self._comments),
            done=self._done,
            grader_scores=dict(self._grader_scores),
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _build_observation(
        self,
        feedback: Optional[str] = None,
        done: bool = False,
    ) -> Observation:
        """
        Assemble the observation for the current step.

        Parameters
        ----------
        feedback : Optional[str]
            Feedback for this step; when falsy, the feedback stored from the
            previous step is used instead.
        done : bool
            Whether the episode has finished.
        """
        # Only the hard task adds the extra "summary" requirement to the prompt.
        summary_note = (
            "\n5. You MUST include a `summary` field with your overall assessment."
            if self.task_id == "task_3_hard"
            else ""
        )
        instructions = INSTRUCTIONS_TEMPLATE.format(
            title=self.spec.title,
            difficulty=self.spec.difficulty.value.upper(),
            categories=", ".join(self.spec.categories),
            description=self.spec.description,
            summary_note=summary_note,
        )
        return Observation(
            task_id=self.task_id,
            step=self._step,
            snippet=self.snippet,
            instructions=instructions,
            previous_comments=list(self._comments),
            feedback=feedback or self._last_feedback,
            done=done,
        )

    def _compute_reward(
        self,
        action: Action,
        new_comments: List[ReviewComment],
    ) -> tuple[Reward, str, dict]:
        """
        Compute reward with partial progress signals.

        Must be called after ``new_comments`` have been appended to
        ``self._comments`` (as ``step`` does): the "previous" comment set is
        taken to be everything before that tail.

        Components
        ----------
        * +submit_reward   : on submit, ``score * 0.8`` plus 0.2 if the score
                             meets the passing threshold, minus 0.2 otherwise
        * +step_reward     : +0.05 per new comment, capped at 0.15
        * +progress_reward : half of any increase in grader score caused by
                             the new comments
        * -empty_step      : -0.05 for a non-submit step with no new comments
        * -spam_penalty    : -0.10 on a non-submit step when the accumulated
                             comments exceed 2.5× the number of ground-truth
                             issues

        Returns
        -------
        (Reward, feedback message, raw grader result dict); the reward value
        is clamped to [-1.0, 1.0] and rounded to 4 decimals.
        """
        # Run grader against ALL accumulated comments
        full_action = Action(
            comments=list(self._comments),
            summary=action.summary,
            submit=action.submit,
        )
        grader_result = self.grader.grade(full_action, self.ground_truth)
        current_score = grader_result["score"]

        breakdown: Dict[str, float] = {}
        reward_val = 0.0

        if action.submit:
            passed = current_score >= self.spec.passing_threshold
            # Final reward = full grader score (0–1 mapped to -0.2–1.0)
            submit_reward = current_score * 0.8 + (0.2 if passed else -0.2)
            reward_val += submit_reward
            breakdown["submit_reward"] = round(submit_reward, 4)
            feedback = (
                f"Review submitted. Score: {current_score:.3f} "
                f"({'PASSED' if passed else 'FAILED'}). "
                f"Matched {grader_result['matched_count']}/{grader_result['total_ground_truth']} issues."
            )
        else:
            if new_comments:
                # Small positive signal for adding comments (+0.05 per comment, capped)
                step_reward = min(0.05 * len(new_comments), 0.15)
                reward_val += step_reward
                breakdown["step_reward"] = round(step_reward, 4)

                # Progress signal: reward increase in grader score.
                # The new comments are the tail of self._comments, so the
                # previous set is a plain slice; this avoids an O(n*m)
                # equality scan and any dependence on ReviewComment.__eq__.
                previous_count = len(self._comments) - len(new_comments)
                prev_action = Action(
                    comments=self._comments[:previous_count],
                    summary=None,
                    submit=False,
                )
                prev_result = self.grader.grade(prev_action, self.ground_truth)
                score_delta = current_score - prev_result["score"]
                if score_delta > 0:
                    progress_reward = round(score_delta * 0.5, 4)
                    reward_val += progress_reward
                    breakdown["progress_reward"] = progress_reward
            else:
                # Penalty for empty step
                reward_val -= 0.05
                breakdown["empty_step_penalty"] = -0.05

            # Penalty for too many comments (spam)
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # source; this is assumed to apply to every non-submit step.
            expected = grader_result["total_ground_truth"]
            if len(self._comments) > expected * 2.5:
                spam_penalty = -0.10
                reward_val += spam_penalty
                breakdown["spam_penalty"] = spam_penalty

            feedback = (
                f"Step {self._step}: Added {len(new_comments)} comment(s). "
                f"Running score: {current_score:.3f}. "
                f"Steps remaining: {self.spec.max_steps - self._step}."
            )

        reward_val = round(max(-1.0, min(1.0, reward_val)), 4)
        return (
            Reward(value=reward_val, breakdown=breakdown, reason=feedback),
            feedback,
            grader_result,
        )

    @staticmethod
    def _fingerprint(comment: ReviewComment) -> tuple:
        """Return the (line, category, message[:40]) key used for de-duplication."""
        return (comment.line, comment.category, comment.message[:40])

    def _deduplicate(self, incoming: List[ReviewComment]) -> List[ReviewComment]:
        """
        Remove comments whose (line, category, message[:40]) already exist.

        A comment is dropped when its fingerprint matches an already
        accumulated comment or an earlier comment in ``incoming``; the order
        of the surviving comments is preserved.
        """
        seen = {self._fingerprint(c) for c in self._comments}
        fresh: List[ReviewComment] = []
        for comment in incoming:
            key = self._fingerprint(comment)
            if key not in seen:
                seen.add(key)
                fresh.append(comment)
        return fresh