| """Core environment logic.""" |
|
|
| import copy |
| import random |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| from server.models import ( |
| Action, |
| ActionType, |
| ErrorInfo, |
| ErrorPhase, |
| FileContent, |
| FileType, |
| Observation, |
| TaskDifficulty, |
| ) |
| from server.simulators.docker_simulator import DockerSimulator |
| from server.simulators.k8s_simulator import KubernetesSimulator |
| from server.simulators.workflow_simulator import WorkflowSimulator |
| from server.tasks.task_registry import TASK_REGISTRY, get_task |
|
|
|
|
class CloudNativeDebugEnvironment:
    """Episodic environment for debugging broken cloud-native configurations.

    Each episode presents a broken scenario (Dockerfile, GitHub Actions
    workflow and/or Kubernetes manifests, plus an error report).  The agent
    edits files, may request hints, and eventually submits.  Reward comes
    from simulator checks flipping from fail to pass relative to the state
    captured at :meth:`reset`.
    """

    #: Maximum actions per episode before forced termination.
    MAX_STEPS = 10
    #: Maximum hints an agent may request per episode.
    MAX_HINTS = 3

    def __init__(self):
        # Simulators that stand in for the real docker / CI / kubectl stacks.
        self.docker_sim = DockerSimulator()
        self.workflow_sim = WorkflowSimulator()
        self.k8s_sim = KubernetesSimulator()

        # Active task / scenario metadata (populated by reset()).
        self.current_task_id: Optional[str] = None
        self.current_scenario_id: Optional[str] = None
        self.current_difficulty: Optional[TaskDifficulty] = None
        self.current_task = None

        # Pristine scenario files vs. the agent's editable working copies.
        self.original_files: Dict[str, FileContent] = {}
        self.current_files: Dict[str, FileContent] = {}
        self.expected_fixes: List[Dict[str, Any]] = []

        # Error surfaced to the agent, plus issue accounting.
        self.current_error: Optional[ErrorInfo] = None
        self.issues_total = 0
        self.issues_fixed = 0
        # Snapshot of simulator checks taken at reset; fix progress is
        # measured against it.  Initialized here (fix: it was previously
        # created only inside reset(), so _check_fix_progress() on a
        # never-reset environment raised AttributeError).
        self.initial_snapshot: Dict[str, bool] = {}

        # Per-episode bookkeeping.
        self.step_count = 0
        self.episode_reward = 0.0
        self.done = False
        self.hints_used = 0

        self.trajectory: List[Dict[str, Any]] = []
        self.last_action_success: Optional[bool] = None
        self.last_action_feedback: Optional[str] = None

    def _resolve_task_id(self, task_id: Optional[object]) -> Optional[str]:
        """Resolve task identifiers from either canonical string IDs or numeric indices.

        Supports:
          - canonical ids (e.g. "dockerfile_syntax")
          - numeric values (1-based preferred, 0-based tolerated)
          - numeric strings ("1", "0")
          - case-insensitive id matching

        Unresolvable inputs are returned as strings so reset() can raise a
        uniform "Unknown task" error.
        """
        if task_id is None:
            return None

        keys = list(TASK_REGISTRY.keys())

        if isinstance(task_id, int):
            # 1-based index wins when both interpretations are valid.
            if 1 <= task_id <= len(keys):
                return keys[task_id - 1]
            if 0 <= task_id < len(keys):
                return keys[task_id]
            return str(task_id)

        if isinstance(task_id, str):
            raw = task_id.strip()
            if raw in TASK_REGISTRY:
                return raw
            if raw.isdigit():
                numeric = int(raw)
                if 1 <= numeric <= len(keys):
                    return keys[numeric - 1]
                if 0 <= numeric < len(keys):
                    return keys[numeric]
                return raw
            # Fall back to a case-insensitive match against known ids.
            lowered = raw.lower()
            for key in keys:
                if key.lower() == lowered:
                    return key
            return raw

        return str(task_id)

    def _find_workflow_file(self) -> Optional[FileContent]:
        """Return the first GitHub Actions workflow file in current_files.

        Accepts both ``.yml`` and ``.yaml`` extensions (fix: previously only
        ``.yml`` was matched, so ``.yaml`` workflows were invisible).
        """
        for path, fc in self.current_files.items():
            if path.startswith(".github/workflows/") and path.endswith((".yml", ".yaml")):
                return fc
        return None

    def _validation_snapshot(self) -> Dict[str, bool]:
        """Return a pass/fail snapshot of the simulator checks.

        Only checks relevant to the files actually present are included, so
        a docker-only scenario is not penalized for missing k8s manifests.
        """
        docker_result = self.docker_sim.validate(self.current_files.get("Dockerfile"), self.current_files)
        workflow_file = self._find_workflow_file()
        workflow_result = self.workflow_sim.validate(workflow_file, self.current_files)
        k8s_result = self.k8s_sim.validate(self.current_files)

        has_docker = "Dockerfile" in self.current_files
        has_workflow = workflow_file is not None
        has_k8s = any(fc.file_type == FileType.KUBERNETES for fc in self.current_files.values())

        snapshot: Dict[str, bool] = {}
        if has_docker:
            snapshot["docker_build_valid"] = bool(docker_result.get("build_success", False))
            snapshot["docker_run_valid"] = bool(docker_result.get("run_success", False))
        if has_workflow:
            snapshot["workflow_parse_valid"] = bool(workflow_result.get("parse_success", False))
            snapshot["workflow_exec_valid"] = bool(workflow_result.get("execution_success", False))
        if has_k8s:
            snapshot["k8s_valid"] = bool(k8s_result.get("valid", True))
            snapshot["k8s_pod_running"] = k8s_result.get("pod_status", "N/A") == "Running"
            # "N/A" counts as healthy: scenarios without a Service should not
            # register a broken service check.
            svc = k8s_result.get("service_status", "N/A")
            snapshot["k8s_service_active"] = "active" in svc.lower() or svc == "N/A"

        return snapshot

    def reset(self, task_id: Optional[str] = None, scenario_id: Optional[str] = None, seed: Optional[int] = None):
        """Start a new episode; returns the initial Observation.

        Raises ValueError when task_id cannot be resolved to a known task.
        """
        if seed is not None:
            # NOTE(review): seeds the process-global RNG; other users of
            # `random` in this process are affected.  Kept for backward
            # compatibility with existing reproducibility expectations.
            random.seed(seed)

        task_id = self._resolve_task_id(task_id)

        if task_id is None:
            task_id = random.choice(list(TASK_REGISTRY.keys()))
        if task_id not in TASK_REGISTRY:
            raise ValueError(f"Unknown task: {task_id}")

        self.current_task_id = task_id
        self.current_task = get_task(task_id)
        self.current_difficulty = self.current_task.DIFFICULTY

        scenario = self.current_task.load_scenario(scenario_id)
        self.current_scenario_id = scenario["id"]

        # Materialize scenario files; keep pristine originals and give the
        # agent an independent deep copy to edit.
        self.original_files = {
            f["path"]: FileContent(
                path=f["path"],
                content=f["content"],
                file_type=FileType(f["type"]),
                line_count=f["content"].count("\n") + 1,
            )
            for f in scenario["files"]
        }
        self.current_files = copy.deepcopy(self.original_files)

        self.current_error = ErrorInfo(
            phase=ErrorPhase(scenario["error"]["phase"]),
            error_message=scenario["error"]["message"],
            exit_code=scenario["error"].get("exit_code"),
            failed_step=scenario["error"].get("failed_step"),
            line_hint=scenario["error"].get("line_hint"),
        )

        self.expected_fixes = scenario["expected_fixes"]

        # Record the broken baseline; every failing check is one "issue".
        self.initial_snapshot = self._validation_snapshot()
        self.issues_total = sum(1 for v in self.initial_snapshot.values() if not v)
        # Guard against scenarios whose breakage is invisible to the
        # simulators: always require at least one issue.
        self.issues_total = max(1, self.issues_total)
        self.issues_fixed = 0

        self.step_count = 0
        self.episode_reward = 0.0
        self.done = False
        self.hints_used = 0
        self.trajectory = []
        self.last_action_success = None
        self.last_action_feedback = None

        return self.get_observation()

    def step(self, action: Action) -> Tuple[Observation, float, bool, Dict[str, Any]]:
        """Apply one action; returns (observation, reward, done, info)."""
        if self.done:
            return self.get_observation(), 0.0, True, {"error": "Episode already done"}

        self.step_count += 1

        # Dispatch: hints and submit are terminal-ish special cases, every
        # other action type is treated as an edit.
        if action.action_type == ActionType.REQUEST_HINT:
            reward, feedback = self._handle_hint_request()
        elif action.action_type == ActionType.SUBMIT:
            reward, feedback = self._handle_submit()
        else:
            reward, feedback = self._handle_edit(action)

        self.last_action_feedback = feedback
        self.episode_reward += reward

        # Termination: step budget takes precedence, then explicit submit,
        # then all issues fixed.
        info: Dict[str, Any] = {}
        if self.step_count >= self.MAX_STEPS:
            self.done = True
            info["termination_reason"] = "max_steps"
        elif action.action_type == ActionType.SUBMIT:
            self.done = True
            info["termination_reason"] = "submitted"
        elif self.issues_fixed == self.issues_total:
            self.done = True
            info["termination_reason"] = "all_fixed"

        info["issues_fixed"] = self.issues_fixed
        info["issues_total"] = self.issues_total

        self.trajectory.append(
            {"step": self.step_count, "action": action.model_dump(), "reward": reward, "done": self.done, "info": info}
        )

        return self.get_observation(), reward, self.done, info

    def _handle_edit(self, action: Action) -> Tuple[float, str]:
        """Apply the action's edits to current_files; returns (reward, feedback).

        Each edit is applied independently; failures produce feedback but do
        not abort the remaining edits.  Progress reward is computed once,
        after all edits, so transient fixes undone within the same action
        are not credited.
        """
        if not action.edits:
            self.last_action_success = False
            return 0.0, "No edits provided"

        reward = 0.0
        feedbacks: List[str] = []
        applied_count = 0
        for edit in action.edits:
            if edit.file_path not in self.current_files:
                # EDIT_FILE with content may create a brand-new file.
                if action.action_type == ActionType.EDIT_FILE and edit.new_content:
                    ft = FileType.OTHER
                    # Classify loose YAML / k8s-prefixed files as Kubernetes
                    # manifests, but never files under .github/ (fix: workflow
                    # files created here were previously tagged KUBERNETES,
                    # flipping has_k8s and skewing submit scoring).
                    is_yaml = edit.file_path.endswith((".yaml", ".yml"))
                    if not edit.file_path.startswith(".github/") and (
                        edit.file_path.startswith("k8s/") or is_yaml
                    ):
                        ft = FileType.KUBERNETES
                    self.current_files[edit.file_path] = FileContent(
                        path=edit.file_path,
                        content=edit.new_content,
                        file_type=ft,
                        line_count=edit.new_content.count("\n") + 1,
                    )
                    feedbacks.append(f"Created new file: {edit.file_path}")
                    applied_count += 1
                    continue
                feedbacks.append(f"File not found: {edit.file_path}")
                continue

            file_content = self.current_files[edit.file_path]
            lines = file_content.content.split("\n")

            if action.action_type == ActionType.REPLACE_LINE:
                if edit.line_number and 1 <= edit.line_number <= len(lines):
                    lines[edit.line_number - 1] = edit.new_content or ""
                    feedbacks.append(f"Replaced line {edit.line_number} in {edit.file_path}")
                    applied_count += 1
                else:
                    feedbacks.append(f"Invalid line number: {edit.line_number}")
                    continue
            elif action.action_type == ActionType.ADD_LINE:
                # Missing line_number means "append at end"; out-of-range
                # values are clamped rather than rejected.
                insert_at = edit.line_number - 1 if edit.line_number else len(lines)
                insert_at = max(0, min(insert_at, len(lines)))
                lines.insert(insert_at, edit.new_content or "")
                feedbacks.append(f"Added line at {insert_at + 1} in {edit.file_path}")
                applied_count += 1
            elif action.action_type == ActionType.DELETE_LINE:
                if edit.line_number and 1 <= edit.line_number <= len(lines):
                    del lines[edit.line_number - 1]
                    feedbacks.append(f"Deleted line {edit.line_number} in {edit.file_path}")
                    applied_count += 1
                else:
                    feedbacks.append(f"Invalid line number: {edit.line_number}")
                    continue
            elif action.action_type == ActionType.EDIT_FILE:
                # Search-and-replace of the first occurrence only.
                if edit.old_content and edit.old_content in file_content.content:
                    new_content = file_content.content.replace(edit.old_content, edit.new_content or "", 1)
                    lines = new_content.split("\n")
                    feedbacks.append(f"Edited content in {edit.file_path}")
                    applied_count += 1
                else:
                    feedbacks.append(f"Content not found in {edit.file_path}")
                    continue
            elif action.action_type == ActionType.ADD_BLOCK:
                insert_at = edit.line_number - 1 if edit.line_number else len(lines)
                insert_at = max(0, min(insert_at, len(lines)))
                block_lines = (edit.new_content or "").split("\n")
                lines[insert_at:insert_at] = block_lines
                feedbacks.append(f"Added block at {insert_at + 1} in {edit.file_path}")
                applied_count += 1
            elif action.action_type == ActionType.DELETE_BLOCK:
                if edit.old_content and edit.old_content in file_content.content:
                    new_content = file_content.content.replace(edit.old_content, "", 1)
                    lines = new_content.split("\n")
                    feedbacks.append(f"Deleted block in {edit.file_path}")
                    applied_count += 1
                else:
                    feedbacks.append(f"Block content not found in {edit.file_path}")
                    continue
            else:
                feedbacks.append(f"Unsupported action type for now: {action.action_type.value}")
                continue

            # Rebuild the immutable FileContent after a successful edit.
            new_content = "\n".join(lines)
            self.current_files[edit.file_path] = FileContent(
                path=file_content.path,
                content=new_content,
                file_type=file_content.file_type,
                line_count=len(lines),
            )

        # One progress check per action (runs all three simulators once).
        reward += self._check_fix_progress()

        if applied_count == 0:
            self.last_action_success = False
            # Small penalty for an action that changed nothing.
            return max(-0.02, reward - 0.02), "; ".join(feedbacks) or "No edit applied"

        self.last_action_success = True
        return max(0.0, reward), "; ".join(feedbacks)

    def _check_fix_progress(self) -> float:
        """Check fix progress by comparing current simulator state against initial broken state.

        Counts how many simulator checks flipped from fail to pass since
        reset.  ``issues_fixed`` is monotone: a regression never decreases
        it, and only newly fixed checks yield reward (0.3 each).
        """
        current_snapshot = self._validation_snapshot()

        fixes_now = 0
        for key, initially_broken in self.initial_snapshot.items():
            # initially_broken holds the pass/fail bool from reset time:
            # False means the check was failing then.
            if not initially_broken and current_snapshot.get(key, False):
                fixes_now += 1

        new_fixed = fixes_now - self.issues_fixed
        if new_fixed > 0:
            self.issues_fixed = fixes_now
            return 0.3 * new_fixed
        return 0.0

    def _handle_submit(self) -> Tuple[float, str]:
        """Score a final submission across all present components.

        Weights are distributed over the components that exist in the
        scenario so the maximum achievable reward is comparable across
        scenario shapes; success is declared at reward >= 0.6.
        """
        docker_result = self.docker_sim.validate(self.current_files.get("Dockerfile"), self.current_files)
        workflow_file = self._find_workflow_file()
        workflow_result = self.workflow_sim.validate(workflow_file, self.current_files)
        k8s_result = self.k8s_sim.validate(self.current_files)

        has_k8s = any(fc.file_type == FileType.KUBERNETES for fc in self.current_files.values())
        has_docker = "Dockerfile" in self.current_files
        has_workflow = workflow_file is not None

        reward = 0.0
        parts: List[str] = []

        # Component weights by scenario shape.
        if has_docker and has_workflow and has_k8s:
            docker_w, wf_w, k8s_w = 0.20, 0.30, 0.30
        elif has_docker and has_workflow:
            docker_w, wf_w, k8s_w = 0.50, 0.50, 0.0
        elif has_docker and has_k8s:
            docker_w, wf_w, k8s_w = 0.40, 0.0, 0.40
        elif has_workflow and has_k8s:
            docker_w, wf_w, k8s_w = 0.0, 0.40, 0.40
        elif has_k8s:
            docker_w, wf_w, k8s_w = 0.0, 0.0, 0.80
        elif has_docker:
            # 0.80 to match the k8s-only case (fix: the previous 0.50 made
            # the 0.6 success threshold unreachable for docker-only tasks).
            docker_w, wf_w, k8s_w = 0.80, 0.0, 0.0
        else:
            # Same fix for workflow-only scenarios.
            docker_w, wf_w, k8s_w = 0.0, 0.80, 0.0

        # Docker: build is worth 60% of the component, run 40%.
        if has_docker:
            if docker_result.get("build_success"):
                reward += docker_w * 0.6
                parts.append("Docker build: PASS")
            else:
                parts.append(f"Docker build: FAIL - {docker_result.get('error', 'unknown')}")

            if docker_result.get("run_success"):
                reward += docker_w * 0.4
                parts.append("Docker run: PASS")
            else:
                parts.append(f"Docker run: FAIL - {docker_result.get('run_error', 'unknown')}")

        # Workflow: parse 40%, execution 60%.
        if has_workflow:
            if workflow_result["parse_success"]:
                reward += wf_w * 0.4
                parts.append("Workflow parse: PASS")
            else:
                parts.append(f"Workflow parse: FAIL - {workflow_result.get('error', 'unknown')}")

            if workflow_result["execution_success"]:
                reward += wf_w * 0.6
                parts.append("Workflow execution: PASS")
            else:
                parts.append(f"Workflow execution: FAIL - {workflow_result.get('exec_error', 'unknown')}")

        # Kubernetes: manifest validity 40%, pod 30%, service 30%.
        if has_k8s:
            if k8s_result["valid"]:
                reward += k8s_w * 0.4
                parts.append("K8s manifests: VALID")
            else:
                k8s_errors = k8s_result.get("errors", [])
                parts.append(f"K8s manifests: INVALID - {'; '.join(k8s_errors[:2])}")

            pod_status = k8s_result.get("pod_status", "N/A")
            if pod_status == "Running":
                reward += k8s_w * 0.3
                parts.append(f"K8s pod status: {pod_status}")
            else:
                parts.append(f"K8s pod status: {pod_status}")

            # "N/A" counts as healthy: no Service manifest present.
            svc_status = k8s_result.get("service_status", "N/A")
            if "active" in svc_status.lower() or svc_status == "N/A":
                reward += k8s_w * 0.3
                parts.append(f"K8s service: {svc_status}")
            else:
                parts.append(f"K8s service: {svc_status}")

        self.last_action_success = reward >= 0.6
        return reward, "; ".join(parts)

    def _handle_hint_request(self) -> Tuple[float, str]:
        """Return a hint for the first still-unfixed expected fix.

        Hints cost -0.05 reward and are limited to MAX_HINTS per episode.
        A hint is still consumed even when everything already looks fixed.
        """
        if self.hints_used >= self.MAX_HINTS:
            self.last_action_success = False
            return 0.0, "No hints remaining"

        self.hints_used += 1
        for fix in self.expected_fixes:
            file_path = fix["file"]
            if file_path not in self.current_files:
                continue
            current_content = self.current_files[file_path].content
            # Fix specs are simple substring predicates over file content.
            is_fixed = False
            if fix["type"] == "contains":
                is_fixed = fix["expected"] in current_content
            elif fix["type"] == "not_contains":
                is_fixed = fix["expected"] not in current_content
            if not is_fixed:
                hint = fix.get("hint", f"Check {file_path}")
                self.last_action_success = True
                return -0.05, f"Hint ({self.hints_used}/{self.MAX_HINTS}): {hint}"

        self.last_action_success = True
        return 0.0, "All known issues appear to be fixed"

    def get_observation(self) -> Observation:
        """Build the Observation for the current state (safe pre-reset)."""
        return Observation(
            task_id=self.current_task_id or "unknown",
            task_description=self.current_task.DESCRIPTION if self.current_task else "",
            difficulty=self.current_difficulty or TaskDifficulty.EASY,
            files=list(self.current_files.values()),
            error=self.current_error or ErrorInfo(
                phase=ErrorPhase.DOCKER_BUILD,
                error_message="No error",
            ),
            available_secrets=self.current_task.AVAILABLE_SECRETS if self.current_task else [],
            runner_os="ubuntu-latest",
            step_number=self.step_count,
            max_steps=self.MAX_STEPS,
            hints_used=self.hints_used,
            hints_available=self.MAX_HINTS - self.hints_used,
            last_action_success=self.last_action_success,
            last_action_feedback=self.last_action_feedback,
            # NOTE(review): issues_found mirrors issues_fixed here; if the
            # Observation model intends "issues identified" as a distinct
            # quantity, this needs a separate counter — confirm upstream.
            issues_found=self.issues_fixed,
            issues_fixed=self.issues_fixed,
            total_issues=max(1, self.issues_total),
        )
|
|