"""
Core RL environment for the CI/CD Doctor.

Ties together generator, stage_runner, and parser into a step/reset/state loop.
"""

import uuid

from models import PipelineAction, PipelineObservation, PipelineState
from core.scenarios.generator import generate_easy_scenario, generate_medium_scenario, generate_hard_scenario
from core.pipeline.stage_runner import run_pipeline
from core.validation.parser import parse_command
from core.grading.grader import grade as grade_state, balance_score, StepContext, current_milestone_level

# Per-task step budget; the episode ends once it is used up.
MAX_STEPS_BY_TASK = {"easy": 10, "medium": 15, "hard": 25}
# Per-task ideal step count, handed to the grader through StepContext.
IDEAL_STEPS_BY_TASK = {"easy": 3, "medium": 6, "hard": 10}
# Fallbacks used for task names missing from the tables above.
DEFAULT_MAX_STEPS = 15
DEFAULT_IDEAL_STEPS = 6
# Maximum number of diagnose commands evaluated per episode.
MAX_DIAGNOSE_PER_EPISODE = 2
# Minimum number of keyword hits for a diagnosis to match one fix.
DIAGNOSIS_KEYWORD_THRESHOLD = 2

# Command types that modify a file in the simulated filesystem.
_EDIT_COMMANDS = ("echo_append", "sed")


def _unescape_sed_text(text: str) -> str:
    r"""Turn the two-character sequences ``\n`` and ``\t`` into newline and tab."""
    return text.replace("\\n", "\n").replace("\\t", "\t")


class PipelineEnvironment:
    """Simulated failing CI/CD pipeline that an agent repairs via shell-like commands.

    Call ``reset()`` to start an episode, then ``step()`` once per agent action.
    ``state()`` returns the snapshot that the grader scores.
    """

    def reset(self, task: str = "easy", seed: int = 42) -> PipelineObservation:
        """Start a new episode and return the initial observation.

        Args:
            task: "easy", "medium" or "hard". Any other value builds the easy
                scenario but uses the DEFAULT_* step budgets.
            seed: Forwarded to the scenario generator.
        """
        if task == "medium":
            scenario = generate_medium_scenario(seed)
        elif task == "hard":
            scenario = generate_hard_scenario(seed)
        else:
            scenario = generate_easy_scenario(seed)

        self._filesystem = scenario["filesystem"]
        self._answer_key = scenario["answer_key"]
        self._task = task
        self._max_steps = MAX_STEPS_BY_TASK.get(task, DEFAULT_MAX_STEPS)
        self._ideal_steps = IDEAL_STEPS_BY_TASK.get(task, DEFAULT_IDEAL_STEPS)
        self._step_count = 0
        self._done = False
        self._total_reward = 0.0
        self._pipeline_status = "not_run"
        self._episode_id = str(uuid.uuid4())
        self._last_logs: dict = {}
        self._milestones: set[str] = set()
        self._files_read: set[str] = set()
        self._fs_snapshot_at_last_run: str | None = None  # None = no run yet
        self._pipeline_runs_since_last_edit: int = 0
        # Baseline score so the first step's reward is a delta from the start.
        self._last_score = grade_state(self.state())
        self._prev_milestone_level = 0
        self._edits_per_file: dict[str, int] = {}
        self._files_edited_without_reading: set[str] = set()
        self._diagnose_count: int = 0

        files = ", ".join(sorted(self._filesystem.keys()))
        return PipelineObservation(
            stdout=(
                "Pipeline environment ready. The pipeline is failing. "
                f"Files: {files}. Investigate and fix it."
            ),
            exit_code=0,
            pipeline_status=self._pipeline_status,
            steps_remaining=self._max_steps,
            done=False,
            reward=0.0,
        )

    def step(self, action: PipelineAction) -> PipelineObservation:
        """Execute one agent command and return the resulting observation.

        The reward is the change in grader score since the previous step
        (rounded to 2 decimals) plus the shaping term from ``balance_score``.
        The episode ends when the pipeline passes or the step budget runs out.

        Raises:
            RuntimeError: if called after the episode is done.
        """
        if self._done:
            raise RuntimeError("Episode is done. Call reset() first.")

        self._step_count += 1
        steps_remaining = self._max_steps - self._step_count

        cmd = parse_command(action.command)
        stdout, exit_code, diagnosis_correct = self._execute_command(cmd, action.command)
        edit_succeeded = cmd.type in _EDIT_COMMANDS and exit_code == 0

        if edit_succeeded and cmd.filename:
            if cmd.filename not in self._files_read:
                self._files_edited_without_reading.add(cmd.filename)
            self._edits_per_file[cmd.filename] = self._edits_per_file.get(cmd.filename, 0) + 1

        # The shaping context reflects what the agent had read *before* this
        # step, so it is built before this step's read is recorded below.
        ctx = StepContext(
            cmd_type=cmd.type,
            filename=getattr(cmd, "filename", None),
            files_read=set(self._files_read),
            fs_changed_since_last_run=(
                self._fs_snapshot_at_last_run is None
                or repr(self._filesystem) != self._fs_snapshot_at_last_run
            ),
            step_count=self._step_count,
            max_steps=self._max_steps,
            ideal_steps=self._ideal_steps,
            pipeline_runs_since_last_edit=self._pipeline_runs_since_last_edit,
            prev_milestone_level=self._prev_milestone_level,
            edits_per_file=dict(self._edits_per_file),
            files_edited_without_reading=set(self._files_edited_without_reading),
            diagnosis_correct=diagnosis_correct,
            cross_referenced=self._check_cross_referenced(),
        )

        # Record a successful read *before* updating milestones; otherwise the
        # read that completes cross-referencing would only unlock that tier on
        # some later step (or never, if this is the final step).
        if cmd.type == "cat" and exit_code == 0 and cmd.filename:
            self._files_read.add(cmd.filename)

        self._update_milestones(cmd)

        current_score = grade_state(self.state())
        grade_delta = round(current_score - self._last_score, 2)
        self._last_score = current_score
        shaped = balance_score(self.state(), ctx)
        reward = round(grade_delta + shaped, 2)
        self._prev_milestone_level = current_milestone_level(self.state())
        self._total_reward += reward

        if edit_succeeded:
            self._pipeline_runs_since_last_edit = 0
        if cmd.type == "pipeline_run":
            self._pipeline_runs_since_last_edit += 1
            self._fs_snapshot_at_last_run = repr(self._filesystem)
            self._edits_per_file.clear()

        if self._pipeline_status == "passed" or steps_remaining <= 0:
            self._done = True

        return PipelineObservation(
            stdout=stdout,
            exit_code=exit_code,
            pipeline_status=self._pipeline_status,
            steps_remaining=max(0, steps_remaining),
            done=self._done,
            reward=reward,
        )

    def _execute_command(self, cmd, raw_command: str) -> tuple[str, int, bool]:
        """Apply one parsed command to the simulated environment.

        Args:
            cmd: Result of ``parse_command``.
            raw_command: The original command text, echoed for unknown commands.

        Returns:
            (stdout, exit_code, diagnosis_correct).
        """
        if cmd.type == "cat":
            content = self._filesystem.get(cmd.filename)
            if content is None:
                return f"cat: {cmd.filename}: No such file or directory", 1, False
            return content, 0, False

        if cmd.type == "echo_append":
            if cmd.filename not in self._filesystem:
                return f"bash: {cmd.filename}: No such file or directory", 1, False
            self._filesystem[cmd.filename] += cmd.content + "\n"
            return "", 0, False

        if cmd.type == "sed":
            if cmd.filename not in self._filesystem:
                return f"sed: {cmd.filename}: No such file or directory", 1, False
            pattern = _unescape_sed_text(cmd.pattern)
            if not pattern:
                # str.replace("", r) would insert r between every character
                # and corrupt the whole file; reject it the way sed does.
                return "sed: -e expression #1, char 0: no previous regular expression", 1, False
            replacement = _unescape_sed_text(cmd.replacement)
            self._filesystem[cmd.filename] = self._filesystem[cmd.filename].replace(
                pattern, replacement
            )
            return "", 0, False

        if cmd.type == "pipeline_run":
            result = run_pipeline(self._filesystem, task=self._task)
            self._last_logs["last"] = result["logs"]
            # Expose the run's logs as a readable file as well.
            self._filesystem["logs/install.log"] = result["logs"]
            self._pipeline_status = "passed" if result["exit_code"] == 0 else "failed"
            return result["logs"], result["exit_code"], False

        if cmd.type == "pipeline_logs":
            if cmd.stage:
                # Only the "last" key is ever stored, so a stage request falls
                # back to the most recent run's logs when one exists.
                fallback = self._last_logs.get(
                    "last", f"No logs for stage '{cmd.stage}' yet. Run pipeline first."
                )
                return self._last_logs.get(cmd.stage, fallback), 0, False
            return self._last_logs.get("last", "No pipeline runs yet."), 0, False

        if cmd.type == "pipeline_status":
            return f"Pipeline status: {self._pipeline_status}", 0, False

        if cmd.type == "diagnose":
            stdout, diagnosis_correct = self._handle_diagnose(cmd.diagnosis or "")
            return stdout, 0, diagnosis_correct

        return f"Command not recognized: {raw_command}", 1, False

    def state(self) -> PipelineState:
        """Return a snapshot of the episode (the filesystem dict is shallow-copied)."""
        return PipelineState(
            episode_id=self._episode_id,
            task=self._task,
            filesystem=dict(self._filesystem),
            pipeline_status=self._pipeline_status,
            step_count=self._step_count,
            done=self._done,
            total_reward=self._total_reward,
            answer_key=self._answer_key,
            milestones=sorted(self._milestones),
        )

    def _handle_diagnose(self, diagnosis: str) -> tuple[str, bool]:
        """
        Handle the diagnose command.

        Compares the diagnosis text (case-insensitively) against each fix's
        ``diagnosis_keywords`` in the answer key. It is correct if any single
        fix has at least DIAGNOSIS_KEYWORD_THRESHOLD keyword hits. Only the
        first MAX_DIAGNOSE_PER_EPISODE calls per episode are evaluated.

        Returns (stdout, is_correct).
        """
        if self._diagnose_count >= MAX_DIAGNOSE_PER_EPISODE:
            return f"Diagnosis limit reached (max {MAX_DIAGNOSE_PER_EPISODE} per episode).", False
        self._diagnose_count += 1

        diagnosis_lower = diagnosis.lower()
        fixes = self._answer_key.get("fixes", {})
        for fix_desc in fixes.values():
            if not isinstance(fix_desc, dict):
                continue
            keywords = fix_desc.get("diagnosis_keywords", [])
            matches = sum(1 for kw in keywords if kw.lower() in diagnosis_lower)
            if matches >= DIAGNOSIS_KEYWORD_THRESHOLD:
                return "Diagnosis recorded.", True
        return "Diagnosis recorded.", False

    def _check_cross_referenced(self) -> bool:
        """
        Check if the agent has read at least 2 of the answer key's
        diagnostic files, indicating cross-file reasoning.
        """
        diagnostic_files = set(self._answer_key.get("diagnostic_files", []))
        read_diagnostic = self._files_read & diagnostic_files
        return len(read_diagnostic) >= 2

    def _update_milestones(self, cmd) -> None:
        """Mark reward tiers unlocked by this step. Each tier fires once.

        Expects ``_files_read`` to already include this step's successful read,
        so the cross-referenced tier is unlocked on the step that completes it.
        """
        if cmd.type == "pipeline_run":
            self._milestones.add("investigated")

        is_log_read = cmd.type == "pipeline_logs" or (
            cmd.type == "cat" and cmd.filename == "logs/install.log"
        )
        # Reading logs only counts once a run has actually failed.
        if is_log_read and self._pipeline_status == "failed":
            self._milestones.add("logs_read")

        fix_files = set(self._answer_key.get("fixes", {}).keys())
        if cmd.type == "cat" and cmd.filename in fix_files:
            self._milestones.add("correct_file_located")

        diagnostic_files = set(self._answer_key.get("diagnostic_files", []))
        if cmd.type == "cat" and cmd.filename in diagnostic_files:
            self._milestones.add("diagnosed")

        if self._check_cross_referenced():
            self._milestones.add("cross_referenced")

    def _grade(self) -> tuple[float, bool]:
        """
        Delegate to the unified grader. Same rule for every task difficulty.

        Returns (score, done). ``done`` here is True only once the pipeline
        has passed; ``step()`` additionally ends the episode when the step
        budget is exhausted.
        """
        score = grade_state(self.state())
        done = self._pipeline_status == "passed"
        return score, done