"""Core environment logic.""" import copy import random from typing import Any, Dict, List, Optional, Tuple from server.models import ( Action, ActionType, ErrorInfo, ErrorPhase, FileContent, FileType, Observation, TaskDifficulty, ) from server.simulators.docker_simulator import DockerSimulator from server.simulators.k8s_simulator import KubernetesSimulator from server.simulators.workflow_simulator import WorkflowSimulator from server.tasks.task_registry import TASK_REGISTRY, get_task class CloudNativeDebugEnvironment: MAX_STEPS = 10 MAX_HINTS = 3 def _resolve_task_id(self, task_id: Optional[object]) -> Optional[str]: """Resolve task identifiers from either canonical string IDs or numeric indices. Supports: - canonical ids (e.g. "dockerfile_syntax") - numeric values (1-based preferred, 0-based tolerated) - numeric strings ("1", "0") """ if task_id is None: return None keys = list(TASK_REGISTRY.keys()) if isinstance(task_id, int): if 1 <= task_id <= len(keys): return keys[task_id - 1] if 0 <= task_id < len(keys): return keys[task_id] return str(task_id) if isinstance(task_id, str): raw = task_id.strip() if raw in TASK_REGISTRY: return raw if raw.isdigit(): numeric = int(raw) if 1 <= numeric <= len(keys): return keys[numeric - 1] if 0 <= numeric < len(keys): return keys[numeric] return raw lowered = raw.lower() for key in keys: if key.lower() == lowered: return key return raw return str(task_id) def _find_workflow_file(self) -> Optional[FileContent]: """Return the first workflow file found in current_files.""" for path, fc in self.current_files.items(): if path.startswith(".github/workflows/") and path.endswith(".yml"): return fc return None def _validation_snapshot(self) -> Dict[str, bool]: """Return a detailed snapshot of all 7 simulator checks.""" docker_result = self.docker_sim.validate(self.current_files.get("Dockerfile"), self.current_files) workflow_file = self._find_workflow_file() workflow_result = self.workflow_sim.validate(workflow_file, self.current_files) k8s_result = self.k8s_sim.validate(self.current_files) has_docker = "Dockerfile" in self.current_files has_workflow = workflow_file is not None has_k8s = any(fc.file_type == FileType.KUBERNETES for fc in self.current_files.values()) snapshot: Dict[str, bool] = {} if has_docker: snapshot["docker_build_valid"] = bool(docker_result.get("build_success", False)) snapshot["docker_run_valid"] = bool(docker_result.get("run_success", False)) if has_workflow: snapshot["workflow_parse_valid"] = bool(workflow_result.get("parse_success", False)) snapshot["workflow_exec_valid"] = bool(workflow_result.get("execution_success", False)) if has_k8s: snapshot["k8s_valid"] = bool(k8s_result.get("valid", True)) snapshot["k8s_pod_running"] = k8s_result.get("pod_status", "N/A") == "Running" svc = k8s_result.get("service_status", "N/A") snapshot["k8s_service_active"] = "active" in svc.lower() or svc == "N/A" return snapshot def __init__(self): self.docker_sim = DockerSimulator() self.workflow_sim = WorkflowSimulator() self.k8s_sim = KubernetesSimulator() self.current_task_id: Optional[str] = None self.current_scenario_id: Optional[str] = None self.current_difficulty: Optional[TaskDifficulty] = None self.current_task = None self.original_files: Dict[str, FileContent] = {} self.current_files: Dict[str, FileContent] = {} self.expected_fixes: List[Dict[str, Any]] = [] self.current_error: Optional[ErrorInfo] = None self.issues_total = 0 self.issues_fixed = 0 self.step_count = 0 self.episode_reward = 0.0 self.done = False self.hints_used = 0 self.trajectory: List[Dict[str, Any]] = [] self.last_action_success: Optional[bool] = None self.last_action_feedback: Optional[str] = None def reset(self, task_id: Optional[str] = None, scenario_id: Optional[str] = None, seed: Optional[int] = None): if seed is not None: random.seed(seed) task_id = self._resolve_task_id(task_id) if task_id is None: task_id = random.choice(list(TASK_REGISTRY.keys())) if task_id not in TASK_REGISTRY: raise ValueError(f"Unknown task: {task_id}") self.current_task_id = task_id self.current_task = get_task(task_id) self.current_difficulty = self.current_task.DIFFICULTY scenario = self.current_task.load_scenario(scenario_id) self.current_scenario_id = scenario["id"] self.original_files = { f["path"]: FileContent( path=f["path"], content=f["content"], file_type=FileType(f["type"]), line_count=f["content"].count("\n") + 1, ) for f in scenario["files"] } self.current_files = copy.deepcopy(self.original_files) self.current_error = ErrorInfo( phase=ErrorPhase(scenario["error"]["phase"]), error_message=scenario["error"]["message"], exit_code=scenario["error"].get("exit_code"), failed_step=scenario["error"].get("failed_step"), line_hint=scenario["error"].get("line_hint"), ) self.expected_fixes = scenario["expected_fixes"] # Snapshot the initial broken state from simulators self.initial_snapshot = self._validation_snapshot() # Count how many checks are initially failing — that's our issues_total self.issues_total = sum(1 for v in self.initial_snapshot.values() if not v) # Ensure at least 1 issue (the scenario is supposed to be broken) self.issues_total = max(1, self.issues_total) self.issues_fixed = 0 self.step_count = 0 self.episode_reward = 0.0 self.done = False self.hints_used = 0 self.trajectory = [] self.last_action_success = None self.last_action_feedback = None return self.get_observation() def step(self, action: Action) -> Tuple[Observation, float, bool, Dict[str, Any]]: if self.done: return self.get_observation(), 0.0, True, {"error": "Episode already done"} self.step_count += 1 if action.action_type == ActionType.REQUEST_HINT: reward, feedback = self._handle_hint_request() elif action.action_type == ActionType.SUBMIT: reward, feedback = self._handle_submit() else: reward, feedback = self._handle_edit(action) self.last_action_feedback = feedback self.episode_reward += reward info: Dict[str, Any] = {} if self.step_count >= self.MAX_STEPS: self.done = True info["termination_reason"] = "max_steps" elif action.action_type == ActionType.SUBMIT: self.done = True info["termination_reason"] = "submitted" elif self.issues_fixed == self.issues_total: self.done = True info["termination_reason"] = "all_fixed" info["issues_fixed"] = self.issues_fixed info["issues_total"] = self.issues_total self.trajectory.append( {"step": self.step_count, "action": action.model_dump(), "reward": reward, "done": self.done, "info": info} ) return self.get_observation(), reward, self.done, info def _handle_edit(self, action: Action) -> Tuple[float, str]: if not action.edits: self.last_action_success = False return 0.0, "No edits provided" reward = 0.0 feedbacks: List[str] = [] applied_count = 0 for edit in action.edits: if edit.file_path not in self.current_files: # Allow creating new files (needed for K8s ConfigMap scenarios etc.) if action.action_type == ActionType.EDIT_FILE and edit.new_content: ft = FileType.OTHER if edit.file_path.startswith("k8s/") or edit.file_path.endswith(".yaml") or edit.file_path.endswith(".yml"): ft = FileType.KUBERNETES self.current_files[edit.file_path] = FileContent( path=edit.file_path, content=edit.new_content, file_type=ft, line_count=edit.new_content.count("\n") + 1, ) feedbacks.append(f"Created new file: {edit.file_path}") applied_count += 1 continue feedbacks.append(f"File not found: {edit.file_path}") continue file_content = self.current_files[edit.file_path] lines = file_content.content.split("\n") if action.action_type == ActionType.REPLACE_LINE: if edit.line_number and 1 <= edit.line_number <= len(lines): lines[edit.line_number - 1] = edit.new_content or "" feedbacks.append(f"Replaced line {edit.line_number} in {edit.file_path}") applied_count += 1 else: feedbacks.append(f"Invalid line number: {edit.line_number}") continue elif action.action_type == ActionType.ADD_LINE: insert_at = edit.line_number - 1 if edit.line_number else len(lines) insert_at = max(0, min(insert_at, len(lines))) lines.insert(insert_at, edit.new_content or "") feedbacks.append(f"Added line at {insert_at + 1} in {edit.file_path}") applied_count += 1 elif action.action_type == ActionType.DELETE_LINE: if edit.line_number and 1 <= edit.line_number <= len(lines): del lines[edit.line_number - 1] feedbacks.append(f"Deleted line {edit.line_number} in {edit.file_path}") applied_count += 1 else: feedbacks.append(f"Invalid line number: {edit.line_number}") continue elif action.action_type == ActionType.EDIT_FILE: if edit.old_content and edit.old_content in file_content.content: new_content = file_content.content.replace(edit.old_content, edit.new_content or "", 1) lines = new_content.split("\n") feedbacks.append(f"Edited content in {edit.file_path}") applied_count += 1 else: feedbacks.append(f"Content not found in {edit.file_path}") continue elif action.action_type == ActionType.ADD_BLOCK: insert_at = edit.line_number - 1 if edit.line_number else len(lines) insert_at = max(0, min(insert_at, len(lines))) block_lines = (edit.new_content or "").split("\n") lines[insert_at:insert_at] = block_lines feedbacks.append(f"Added block at {insert_at + 1} in {edit.file_path}") applied_count += 1 elif action.action_type == ActionType.DELETE_BLOCK: if edit.old_content and edit.old_content in file_content.content: new_content = file_content.content.replace(edit.old_content, "", 1) lines = new_content.split("\n") feedbacks.append(f"Deleted block in {edit.file_path}") applied_count += 1 else: feedbacks.append(f"Block content not found in {edit.file_path}") continue else: feedbacks.append(f"Unsupported action type for now: {action.action_type.value}") continue new_content = "\n".join(lines) self.current_files[edit.file_path] = FileContent( path=file_content.path, content=new_content, file_type=file_content.file_type, line_count=len(lines), ) reward += self._check_fix_progress() if applied_count == 0: self.last_action_success = False return max(-0.02, reward - 0.02), "; ".join(feedbacks) or "No edit applied" self.last_action_success = True return max(0.0, reward), "; ".join(feedbacks) def _check_fix_progress(self) -> float: """Check fix progress by comparing current simulator state against initial broken state. Counts how many simulator checks flipped from fail→pass since reset. """ current_snapshot = self._validation_snapshot() fixes_now = 0 for key, initially_broken in self.initial_snapshot.items(): if not initially_broken and current_snapshot.get(key, False): # This check was initially failing and now passes fixes_now += 1 new_fixed = fixes_now - self.issues_fixed if new_fixed > 0: self.issues_fixed = fixes_now return 0.3 * new_fixed return 0.0 def _handle_submit(self) -> Tuple[float, str]: docker_result = self.docker_sim.validate(self.current_files.get("Dockerfile"), self.current_files) workflow_file = self._find_workflow_file() workflow_result = self.workflow_sim.validate(workflow_file, self.current_files) k8s_result = self.k8s_sim.validate(self.current_files) has_k8s = any(fc.file_type == FileType.KUBERNETES for fc in self.current_files.values()) has_docker = "Dockerfile" in self.current_files has_workflow = workflow_file is not None reward = 0.0 parts: List[str] = [] # Determine weight distribution based on what file types are present if has_docker and has_workflow and has_k8s: # Full stack: Docker 20%, Workflow 30%, K8s 30%, fix progress 20% docker_w, wf_w, k8s_w = 0.20, 0.30, 0.30 elif has_docker and has_workflow: docker_w, wf_w, k8s_w = 0.50, 0.50, 0.0 elif has_docker and has_k8s: docker_w, wf_w, k8s_w = 0.40, 0.0, 0.40 elif has_workflow and has_k8s: docker_w, wf_w, k8s_w = 0.0, 0.40, 0.40 elif has_k8s: docker_w, wf_w, k8s_w = 0.0, 0.0, 0.80 elif has_docker: docker_w, wf_w, k8s_w = 0.50, 0.0, 0.0 else: docker_w, wf_w, k8s_w = 0.0, 0.50, 0.0 # Docker validation if has_docker: if docker_result.get("build_success"): reward += docker_w * 0.6 parts.append("Docker build: PASS") else: parts.append(f"Docker build: FAIL - {docker_result.get('error', 'unknown')}") if docker_result.get("run_success"): reward += docker_w * 0.4 parts.append("Docker run: PASS") else: parts.append(f"Docker run: FAIL - {docker_result.get('run_error', 'unknown')}") # Workflow validation if has_workflow: if workflow_result["parse_success"]: reward += wf_w * 0.4 parts.append("Workflow parse: PASS") else: parts.append(f"Workflow parse: FAIL - {workflow_result.get('error', 'unknown')}") if workflow_result["execution_success"]: reward += wf_w * 0.6 parts.append("Workflow execution: PASS") else: parts.append(f"Workflow execution: FAIL - {workflow_result.get('exec_error', 'unknown')}") # Kubernetes validation if has_k8s: if k8s_result["valid"]: reward += k8s_w * 0.4 parts.append("K8s manifests: VALID") else: k8s_errors = k8s_result.get("errors", []) parts.append(f"K8s manifests: INVALID - {'; '.join(k8s_errors[:2])}") pod_status = k8s_result.get("pod_status", "N/A") if pod_status == "Running": reward += k8s_w * 0.3 parts.append(f"K8s pod status: {pod_status}") else: parts.append(f"K8s pod status: {pod_status}") svc_status = k8s_result.get("service_status", "N/A") if "active" in svc_status.lower() or svc_status == "N/A": reward += k8s_w * 0.3 parts.append(f"K8s service: {svc_status}") else: parts.append(f"K8s service: {svc_status}") self.last_action_success = reward >= 0.6 return reward, "; ".join(parts) def _handle_hint_request(self) -> Tuple[float, str]: if self.hints_used >= self.MAX_HINTS: self.last_action_success = False return 0.0, "No hints remaining" self.hints_used += 1 for fix in self.expected_fixes: file_path = fix["file"] if file_path not in self.current_files: continue current_content = self.current_files[file_path].content is_fixed = False if fix["type"] == "contains": is_fixed = fix["expected"] in current_content elif fix["type"] == "not_contains": is_fixed = fix["expected"] not in current_content if not is_fixed: hint = fix.get("hint", f"Check {file_path}") self.last_action_success = True return -0.05, f"Hint ({self.hints_used}/{self.MAX_HINTS}): {hint}" self.last_action_success = True return 0.0, "All known issues appear to be fixed" def get_observation(self) -> Observation: return Observation( task_id=self.current_task_id or "unknown", task_description=self.current_task.DESCRIPTION if self.current_task else "", difficulty=self.current_difficulty or TaskDifficulty.EASY, files=list(self.current_files.values()), error=self.current_error or ErrorInfo( phase=ErrorPhase.DOCKER_BUILD, error_message="No error", ), available_secrets=self.current_task.AVAILABLE_SECRETS if self.current_task else [], runner_os="ubuntu-latest", step_number=self.step_count, max_steps=self.MAX_STEPS, hints_used=self.hints_used, hints_available=self.MAX_HINTS - self.hints_used, last_action_success=self.last_action_success, last_action_feedback=self.last_action_feedback, issues_found=self.issues_fixed, issues_fixed=self.issues_fixed, total_issues=max(1, self.issues_total), )