Spaces:
Sleeping
Sleeping
| """ | |
Core RL environment for the CI/CD Doctor.
Ties together the scenario generator, stage_runner, parser, and grader into a step/reset/state loop.
| """ | |
| import uuid | |
| from models import PipelineAction, PipelineObservation, PipelineState | |
| from core.scenarios.generator import generate_easy_scenario, generate_medium_scenario, generate_hard_scenario | |
| from core.pipeline.stage_runner import run_pipeline | |
| from core.validation.parser import parse_command | |
| from core.grading.grader import grade as grade_state, balance_score, StepContext, current_milestone_level | |
# Step budget per task difficulty; an episode ends once this many steps are used.
MAX_STEPS_BY_TASK = {"easy": 10, "medium": 15, "hard": 25}

# Per-difficulty "ideal" step count, forwarded to the grader through StepContext.
IDEAL_STEPS_BY_TASK = {"easy": 3, "medium": 6, "hard": 10}

# Fallbacks used when the task name is not a key of the tables above.
DEFAULT_MAX_STEPS = 15
DEFAULT_IDEAL_STEPS = 6

# Number of `diagnose` commands evaluated per episode; later ones are refused.
MAX_DIAGNOSE_PER_EPISODE = 2

# Minimum number of a single fix's diagnosis keywords that must appear in the
# diagnosis text for it to count as correct.
DIAGNOSIS_KEYWORD_THRESHOLD = 2
class PipelineEnvironment:
    """
    Step/reset/state environment in which an agent repairs a failing pipeline.

    The environment holds an in-memory filesystem (filename -> text) produced
    by a scenario generator. Each step parses one shell-like command, applies
    it to that filesystem or to the simulated pipeline, and returns an
    observation whose reward is the change in the grader's score plus the
    grader's shaping term for the step.
    """

    def __init__(self) -> None:
        """
        Create an environment with no active episode.

        The environment starts in the "done" state so that calling step()
        before reset() raises the explicit RuntimeError instead of failing
        with an AttributeError on a missing attribute.
        """
        self._filesystem: dict = {}
        self._answer_key: dict = {}
        self._task = ""
        self._max_steps = 0
        self._ideal_steps = 0
        self._last_score = 0.0
        self._reset_episode_tracking()
        self._done = True

    def _reset_episode_tracking(self) -> None:
        """Reinitialise every per-episode counter, flag and tracking set."""
        self._step_count = 0
        self._done = False
        self._total_reward = 0.0
        self._pipeline_status = "not_run"
        self._episode_id = str(uuid.uuid4())
        self._last_logs: dict = {}
        self._milestones: set[str] = set()
        self._files_read: set[str] = set()
        self._fs_snapshot_at_last_run: str | None = None  # None = no run yet
        self._pipeline_runs_since_last_edit: int = 0
        self._prev_milestone_level = 0
        self._edits_per_file: dict[str, int] = {}
        self._files_edited_without_reading: set[str] = set()
        self._diagnose_count: int = 0

    def reset(self, task: str = "easy", seed: int = 42) -> PipelineObservation:
        """
        Start a new episode and return its first observation.

        Args:
            task: Difficulty name. "medium" and "hard" select their own
                generators; any other value uses the easy generator.
            seed: Seed forwarded to the scenario generator.

        Returns:
            An observation listing the scenario's files, with the full step
            budget remaining and zero reward.
        """
        generators = {
            "medium": generate_medium_scenario,
            "hard": generate_hard_scenario,
        }
        scenario = generators.get(task, generate_easy_scenario)(seed)

        self._filesystem = scenario["filesystem"]
        self._answer_key = scenario["answer_key"]
        self._task = task
        self._max_steps = MAX_STEPS_BY_TASK.get(task, DEFAULT_MAX_STEPS)
        self._ideal_steps = IDEAL_STEPS_BY_TASK.get(task, DEFAULT_IDEAL_STEPS)
        self._reset_episode_tracking()
        # Baseline score of the untouched scenario; step rewards are deltas
        # against the most recent score.
        self._last_score = grade_state(self.state())

        files = ", ".join(sorted(self._filesystem.keys()))
        return PipelineObservation(
            stdout=(
                "Pipeline environment ready. The pipeline is failing. "
                f"Files: {files}. Investigate and fix it."
            ),
            exit_code=0,
            pipeline_status=self._pipeline_status,
            steps_remaining=self._max_steps,
            done=False,
            reward=0.0,
        )

    def step(self, action: PipelineAction) -> PipelineObservation:
        """
        Execute one agent command and return the resulting observation.

        Args:
            action: Action whose `command` string is parsed and executed.

        Returns:
            Observation with the command output, exit code, pipeline status,
            remaining steps, done flag and this step's reward.

        Raises:
            RuntimeError: If no episode is active (never reset, or finished).
        """
        if self._done:
            raise RuntimeError("Episode is done. Call reset() first.")

        self._step_count += 1
        steps_remaining = self._max_steps - self._step_count

        cmd = parse_command(action.command)
        stdout, exit_code, diagnosis_correct = self._execute_command(
            cmd, action.command
        )

        is_successful_edit = cmd.type in ("echo_append", "sed") and exit_code == 0
        if is_successful_edit and cmd.filename:
            if cmd.filename not in self._files_read:
                self._files_edited_without_reading.add(cmd.filename)
            self._edits_per_file[cmd.filename] = (
                self._edits_per_file.get(cmd.filename, 0) + 1
            )

        # The context is built from copies taken before this step's read and
        # run bookkeeping (further below) is recorded.
        ctx = StepContext(
            cmd_type=cmd.type,
            filename=getattr(cmd, "filename", None),
            files_read=set(self._files_read),
            fs_changed_since_last_run=(
                self._fs_snapshot_at_last_run is None
                or repr(self._filesystem) != self._fs_snapshot_at_last_run
            ),
            step_count=self._step_count,
            max_steps=self._max_steps,
            ideal_steps=self._ideal_steps,
            pipeline_runs_since_last_edit=self._pipeline_runs_since_last_edit,
            prev_milestone_level=self._prev_milestone_level,
            edits_per_file=dict(self._edits_per_file),
            files_edited_without_reading=set(self._files_edited_without_reading),
            diagnosis_correct=diagnosis_correct,
            cross_referenced=self._check_cross_referenced(),
        )

        self._update_milestones(cmd)

        # Reward = change in graded score + shaping term for this step.
        current_score = grade_state(self.state())
        grade_delta = round(current_score - self._last_score, 2)
        self._last_score = current_score
        shaped = balance_score(self.state(), ctx)
        reward = round(grade_delta + shaped, 2)
        self._prev_milestone_level = current_milestone_level(self.state())
        self._total_reward += reward

        # Post-step bookkeeping, visible to the grader from the next step on.
        if cmd.type == "cat" and exit_code == 0 and cmd.filename:
            self._files_read.add(cmd.filename)
        if is_successful_edit:
            self._pipeline_runs_since_last_edit = 0
        if cmd.type == "pipeline_run":
            self._pipeline_runs_since_last_edit += 1
            self._fs_snapshot_at_last_run = repr(self._filesystem)
            self._edits_per_file.clear()

        if self._pipeline_status == "passed" or steps_remaining <= 0:
            self._done = True

        return PipelineObservation(
            stdout=stdout,
            exit_code=exit_code,
            pipeline_status=self._pipeline_status,
            steps_remaining=max(0, steps_remaining),
            done=self._done,
            reward=reward,
        )

    @staticmethod
    def _unescape(text: str) -> str:
        """Turn literal backslash-n / backslash-t sequences into newline / tab."""
        return text.replace("\\n", "\n").replace("\\t", "\t")

    def _execute_command(self, cmd, raw_command: str) -> tuple[str, int, bool]:
        """
        Apply a parsed command to the environment.

        Args:
            cmd: Parsed command object returned by parse_command.
            raw_command: Original command text, echoed back when the command
                type is not recognised.

        Returns:
            (stdout, exit_code, diagnosis_correct). diagnosis_correct can only
            be True for a `diagnose` command.
        """
        kind = cmd.type

        if kind == "cat":
            content = self._filesystem.get(cmd.filename)
            if content is None:
                return f"cat: {cmd.filename}: No such file or directory", 1, False
            return content, 0, False

        if kind == "echo_append":
            # Appending only works on files that already exist.
            if cmd.filename not in self._filesystem:
                return f"bash: {cmd.filename}: No such file or directory", 1, False
            self._filesystem[cmd.filename] += cmd.content + "\n"
            return "", 0, False

        if kind == "sed":
            if cmd.filename not in self._filesystem:
                return f"sed: {cmd.filename}: No such file or directory", 1, False
            # Plain substring replacement of every occurrence, not a regex.
            # It reports success even when the pattern does not occur.
            self._filesystem[cmd.filename] = self._filesystem[cmd.filename].replace(
                self._unescape(cmd.pattern), self._unescape(cmd.replacement)
            )
            return "", 0, False

        if kind == "pipeline_run":
            result = run_pipeline(self._filesystem, task=self._task)
            self._last_logs["last"] = result["logs"]
            # The run's logs are also exposed as a readable file.
            self._filesystem["logs/install.log"] = result["logs"]
            self._pipeline_status = "passed" if result["exit_code"] == 0 else "failed"
            return result["logs"], result["exit_code"], False

        if kind == "pipeline_logs":
            if cmd.stage:
                # Stage-specific logs fall back to the last full run's logs.
                fallback = self._last_logs.get(
                    "last",
                    f"No logs for stage '{cmd.stage}' yet. Run pipeline first.",
                )
                return self._last_logs.get(cmd.stage, fallback), 0, False
            return self._last_logs.get("last", "No pipeline runs yet."), 0, False

        if kind == "pipeline_status":
            return f"Pipeline status: {self._pipeline_status}", 0, False

        if kind == "diagnose":
            stdout, is_correct = self._handle_diagnose(cmd.diagnosis or "")
            return stdout, 0, is_correct

        return f"Command not recognized: {raw_command}", 1, False

    def state(self) -> PipelineState:
        """
        Return a snapshot of the current episode state.

        The filesystem is shallow-copied and milestones are returned sorted;
        the answer key is passed through by reference.
        """
        return PipelineState(
            episode_id=self._episode_id,
            task=self._task,
            filesystem=dict(self._filesystem),
            pipeline_status=self._pipeline_status,
            step_count=self._step_count,
            done=self._done,
            total_reward=self._total_reward,
            answer_key=self._answer_key,
            milestones=sorted(self._milestones),
        )

    def _handle_diagnose(self, diagnosis: str) -> tuple[str, bool]:
        """
        Handle the diagnose command. Compare diagnosis text against
        answer_key diagnosis_keywords. Returns (stdout, is_correct).

        A diagnosis is correct when, for at least one fix entry, at least
        DIAGNOSIS_KEYWORD_THRESHOLD of its keywords occur (case-insensitively,
        as substrings) in the text. Only the first MAX_DIAGNOSE_PER_EPISODE
        calls per episode are evaluated.
        """
        if self._diagnose_count >= MAX_DIAGNOSE_PER_EPISODE:
            return (
                f"Diagnosis limit reached (max {MAX_DIAGNOSE_PER_EPISODE} per episode).",
                False,
            )
        self._diagnose_count += 1

        diagnosis_lower = diagnosis.lower()
        fixes = self._answer_key.get("fixes", {})
        for fix_desc in fixes.values():
            # Fix entries that are not dicts carry no keywords to match.
            if not isinstance(fix_desc, dict):
                continue
            keywords = fix_desc.get("diagnosis_keywords", [])
            matches = sum(1 for kw in keywords if kw.lower() in diagnosis_lower)
            if matches >= DIAGNOSIS_KEYWORD_THRESHOLD:
                return "Diagnosis recorded.", True
        return "Diagnosis recorded.", False

    def _check_cross_referenced(self) -> bool:
        """
        Check if the agent has read at least 2 diagnostic files,
        indicating cross-file reasoning.
        """
        diagnostic_files = set(self._answer_key.get("diagnostic_files", []))
        read_diagnostic = self._files_read & diagnostic_files
        return len(read_diagnostic) >= 2

    def _update_milestones(self, cmd) -> None:
        """Mark reward tiers unlocked by this step. Each tier fires once."""
        if cmd.type == "pipeline_run":
            self._milestones.add("investigated")

        is_log_read = cmd.type == "pipeline_logs" or (
            cmd.type == "cat" and cmd.filename == "logs/install.log"
        )
        if is_log_read and self._pipeline_status == "failed":
            self._milestones.add("logs_read")

        fix_files = set(self._answer_key.get("fixes", {}).keys())
        if cmd.type == "cat" and cmd.filename in fix_files:
            self._milestones.add("correct_file_located")

        diagnostic_files = set(self._answer_key.get("diagnostic_files", []))
        if cmd.type == "cat" and cmd.filename in diagnostic_files:
            self._milestones.add("diagnosed")

        # NOTE(review): step() adds the current file to _files_read only after
        # this method has run, so this tier unlocks on the step *after* the
        # second diagnostic file is read - confirm that the lag is intended.
        if self._check_cross_referenced():
            self._milestones.add("cross_referenced")

    def _grade(self) -> tuple[float, bool]:
        """
        Delegate to the unified grader. Same rule for every task difficulty.

        Returns (score, done), where done here reflects only whether the
        pipeline has passed; step() additionally ends the episode when the
        step budget is exhausted.
        """
        score = grade_state(self.state())
        done = self._pipeline_status == "passed"
        return score, done