# Krishna1107's picture
# fixed inference
# 2794920
"""Core environment logic."""
import copy
import random
from typing import Any, Dict, List, Optional, Tuple
from server.models import (
Action,
ActionType,
ErrorInfo,
ErrorPhase,
FileContent,
FileType,
Observation,
TaskDifficulty,
)
from server.simulators.docker_simulator import DockerSimulator
from server.simulators.k8s_simulator import KubernetesSimulator
from server.simulators.workflow_simulator import WorkflowSimulator
from server.tasks.task_registry import TASK_REGISTRY, get_task
class CloudNativeDebugEnvironment:
    """Episode-based environment for repairing broken cloud-native config files.

    An episode is started with ``reset`` (which loads a scenario's files and
    error description) and advanced with ``step``. File state is validated
    through a Docker, a workflow and a Kubernetes simulator.
    """

    # step() ends the episode once step_count reaches this value.
    MAX_STEPS = 10
    # Hint requests are refused once hints_used reaches this value.
    MAX_HINTS = 3
def _resolve_task_id(self, task_id: Optional[object]) -> Optional[str]:
    """Resolve a task identifier to a string usable as a ``TASK_REGISTRY`` key.

    Supports:
    - ``None`` (returned unchanged)
    - canonical ids (e.g. "dockerfile_syntax"), matched exactly first and
      case-insensitively as a fallback
    - numeric values (1-based preferred, 0-based tolerated)
    - numeric strings ("1", "0"), interpreted like numeric values

    Because the 1-based interpretation wins, both ``0`` and ``1`` select the
    first registry entry.

    Returns:
        The resolved key, or the input rendered as a (stripped) string when
        nothing matches. Registry membership is not validated here.
    """
    if task_id is None:
        return None

    keys = list(TASK_REGISTRY.keys())

    def key_at(index: int) -> Optional[str]:
        # 1-based lookup first, then 0-based; None when out of range.
        if 1 <= index <= len(keys):
            return keys[index - 1]
        if 0 <= index < len(keys):
            return keys[index]
        return None

    if isinstance(task_id, int):
        match = key_at(task_id)
        return match if match is not None else str(task_id)

    if isinstance(task_id, str):
        raw = task_id.strip()
        if raw in TASK_REGISTRY:
            return raw

        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as superscripts ("²") that int() rejects with ValueError.
        if raw.isdecimal():
            match = key_at(int(raw))
            return match if match is not None else raw

        lowered = raw.lower()
        for key in keys:
            if key.lower() == lowered:
                return key
        return raw

    return str(task_id)
def _find_workflow_file(self) -> Optional[FileContent]:
    """Locate the workflow definition among the tracked files.

    Walks ``current_files`` in insertion order and yields the first entry
    whose path lies under ``.github/workflows/`` and ends in ``.yml``.
    Returns ``None`` when no path qualifies.
    """
    matches = (
        entry
        for file_path, entry in self.current_files.items()
        if file_path.startswith(".github/workflows/") and file_path.endswith(".yml")
    )
    return next(matches, None)
def _validation_snapshot(self) -> Dict[str, bool]:
    """Run the three simulators and flatten their reports into pass/fail flags.

    All simulators are always invoked, but a group of flags is only emitted
    when the matching kind of file is present (seven flags at most):

    - a ``Dockerfile`` entry: ``docker_build_valid``, ``docker_run_valid``
    - a workflow file: ``workflow_parse_valid``, ``workflow_exec_valid``
    - any file typed ``FileType.KUBERNETES``: ``k8s_valid``,
      ``k8s_pod_running``, ``k8s_service_active``
    """
    docker_report = self.docker_sim.validate(self.current_files.get("Dockerfile"), self.current_files)
    workflow_entry = self._find_workflow_file()
    workflow_report = self.workflow_sim.validate(workflow_entry, self.current_files)
    k8s_report = self.k8s_sim.validate(self.current_files)

    flags: Dict[str, bool] = {}

    if "Dockerfile" in self.current_files:
        flags["docker_build_valid"] = bool(docker_report.get("build_success", False))
        flags["docker_run_valid"] = bool(docker_report.get("run_success", False))

    if workflow_entry is not None:
        flags["workflow_parse_valid"] = bool(workflow_report.get("parse_success", False))
        flags["workflow_exec_valid"] = bool(workflow_report.get("execution_success", False))

    manifests_present = any(
        entry.file_type == FileType.KUBERNETES for entry in self.current_files.values()
    )
    if manifests_present:
        flags["k8s_valid"] = bool(k8s_report.get("valid", True))
        flags["k8s_pod_running"] = k8s_report.get("pod_status", "N/A") == "Running"
        service_state = k8s_report.get("service_status", "N/A")
        # A report without a service ("N/A") is treated as passing.
        flags["k8s_service_active"] = "active" in service_state.lower() or service_state == "N/A"

    return flags
def __init__(self):
    """Create the simulators and set every piece of episode state to an empty default.

    No task is loaded here; until ``reset`` is called, ``get_observation``
    falls back to placeholder values.
    """
    # Validators used for progress tracking and final scoring.
    self.docker_sim = DockerSimulator()
    self.workflow_sim = WorkflowSimulator()
    self.k8s_sim = KubernetesSimulator()

    # Identity of the currently loaded task/scenario.
    self.current_task_id: Optional[str] = None
    self.current_scenario_id: Optional[str] = None
    self.current_difficulty: Optional[TaskDifficulty] = None
    self.current_task = None

    # Files as loaded by reset() and the working copy that edits mutate.
    self.original_files: Dict[str, FileContent] = {}
    self.current_files: Dict[str, FileContent] = {}
    self.expected_fixes: List[Dict[str, Any]] = []
    self.current_error: Optional[ErrorInfo] = None

    # _check_fix_progress() reads this, so it must exist even before the
    # first reset(); otherwise an edit step on a fresh instance raises
    # AttributeError.
    self.initial_snapshot: Dict[str, bool] = {}

    # Progress counters and bookkeeping.
    self.issues_total = 0
    self.issues_fixed = 0
    self.step_count = 0
    self.episode_reward = 0.0
    self.done = False
    self.hints_used = 0
    self.trajectory: List[Dict[str, Any]] = []
    self.last_action_success: Optional[bool] = None
    self.last_action_feedback: Optional[str] = None
def reset(self, task_id: Optional[str] = None, scenario_id: Optional[str] = None, seed: Optional[int] = None):
    """Start a new episode and return its first observation.

    Args:
        task_id: Task to load, in any form ``_resolve_task_id`` understands.
            A random registry entry is chosen when omitted.
        scenario_id: Forwarded to the task's ``load_scenario``.
        seed: When given, seeds the module-level ``random`` generator first.

    Raises:
        ValueError: If the resolved task id is not in ``TASK_REGISTRY``.
    """
    if seed is not None:
        random.seed(seed)

    task_id = self._resolve_task_id(task_id)
    if task_id is None:
        task_id = random.choice(list(TASK_REGISTRY.keys()))
    if task_id not in TASK_REGISTRY:
        raise ValueError(f"Unknown task: {task_id}")

    self.current_task_id = task_id
    self.current_task = get_task(task_id)
    self.current_difficulty = self.current_task.DIFFICULTY

    scenario = self.current_task.load_scenario(scenario_id)
    self.current_scenario_id = scenario["id"]

    # Build the pristine file set, then work on a deep copy of it.
    loaded: Dict[str, FileContent] = {}
    for spec in scenario["files"]:
        loaded[spec["path"]] = FileContent(
            path=spec["path"],
            content=spec["content"],
            file_type=FileType(spec["type"]),
            line_count=spec["content"].count("\n") + 1,
        )
    self.original_files = loaded
    self.current_files = copy.deepcopy(self.original_files)

    self.current_error = ErrorInfo(
        phase=ErrorPhase(scenario["error"]["phase"]),
        error_message=scenario["error"]["message"],
        exit_code=scenario["error"].get("exit_code"),
        failed_step=scenario["error"].get("failed_step"),
        line_hint=scenario["error"].get("line_hint"),
    )
    self.expected_fixes = scenario["expected_fixes"]

    # Record which simulator checks fail in the untouched scenario; the
    # number of failing checks becomes the issue count, floored at one.
    self.initial_snapshot = self._validation_snapshot()
    failing_checks = sum(1 for passed in self.initial_snapshot.values() if not passed)
    self.issues_total = max(1, failing_checks)

    # Clear per-episode counters and bookkeeping.
    self.issues_fixed = 0
    self.step_count = 0
    self.episode_reward = 0.0
    self.done = False
    self.hints_used = 0
    self.trajectory = []
    self.last_action_success = None
    self.last_action_feedback = None

    return self.get_observation()
def step(self, action: Action) -> Tuple[Observation, float, bool, Dict[str, Any]]:
    """Execute one action and report ``(observation, reward, done, info)``.

    Hint requests and submissions have dedicated handlers; every other
    action type is treated as an edit. The episode ends when the step limit
    is reached, when the action was a submission, or when all issues are
    fixed (checked in that order). Calling this on a finished episode
    changes nothing and returns an ``"error"`` entry in ``info``.
    """
    if self.done:
        return self.get_observation(), 0.0, True, {"error": "Episode already done"}

    self.step_count += 1

    submitted = action.action_type == ActionType.SUBMIT
    if action.action_type == ActionType.REQUEST_HINT:
        reward, feedback = self._handle_hint_request()
    elif submitted:
        reward, feedback = self._handle_submit()
    else:
        reward, feedback = self._handle_edit(action)

    self.last_action_feedback = feedback
    self.episode_reward += reward

    # First matching condition names the termination reason.
    termination_reason: Optional[str] = None
    if self.step_count >= self.MAX_STEPS:
        termination_reason = "max_steps"
    elif submitted:
        termination_reason = "submitted"
    elif self.issues_fixed == self.issues_total:
        termination_reason = "all_fixed"

    info: Dict[str, Any] = {}
    if termination_reason is not None:
        self.done = True
        info["termination_reason"] = termination_reason
    info["issues_fixed"] = self.issues_fixed
    info["issues_total"] = self.issues_total

    record = {
        "step": self.step_count,
        "action": action.model_dump(),
        "reward": reward,
        "done": self.done,
        "info": info,
    }
    self.trajectory.append(record)

    return self.get_observation(), reward, self.done, info
def _handle_edit(self, action: Action) -> Tuple[float, str]:
    """Apply every edit carried by *action* and score the resulting progress.

    The action's type selects the operation used for all of its edits:
    ``REPLACE_LINE``, ``ADD_LINE``, ``DELETE_LINE`` (1-based line numbers),
    ``EDIT_FILE`` and ``DELETE_BLOCK`` (first occurrence of ``old_content``)
    and ``ADD_BLOCK``. ``EDIT_FILE`` on an unknown path with non-empty
    ``new_content`` creates that file.

    Returns:
        ``(reward, feedback)``. The reward is the fix-progress reward, never
        negative when at least one edit was applied; when nothing could be
        applied it is reduced by 0.02 (floored at -0.02). ``feedback`` joins
        the per-edit messages with ``"; "``.
    """
    if not action.edits:
        self.last_action_success = False
        return 0.0, "No edits provided"

    reward = 0.0
    feedbacks: List[str] = []
    applied_count = 0

    for edit in action.edits:
        if edit.file_path not in self.current_files:
            # Allow creating new files (needed for K8s ConfigMap scenarios etc.)
            if action.action_type == ActionType.EDIT_FILE and edit.new_content:
                ft = FileType.OTHER
                if edit.file_path.startswith("k8s/") or edit.file_path.endswith((".yaml", ".yml")):
                    ft = FileType.KUBERNETES
                self.current_files[edit.file_path] = FileContent(
                    path=edit.file_path,
                    content=edit.new_content,
                    file_type=ft,
                    line_count=edit.new_content.count("\n") + 1,
                )
                feedbacks.append(f"Created new file: {edit.file_path}")
                applied_count += 1
                continue
            feedbacks.append(f"File not found: {edit.file_path}")
            continue

        file_content = self.current_files[edit.file_path]
        lines = file_content.content.split("\n")

        if action.action_type == ActionType.REPLACE_LINE:
            if edit.line_number and 1 <= edit.line_number <= len(lines):
                lines[edit.line_number - 1] = edit.new_content or ""
                feedbacks.append(f"Replaced line {edit.line_number} in {edit.file_path}")
                applied_count += 1
            else:
                feedbacks.append(f"Invalid line number: {edit.line_number}")
                continue
        elif action.action_type == ActionType.ADD_LINE:
            # A missing line number appends; out-of-range values are clamped.
            insert_at = edit.line_number - 1 if edit.line_number else len(lines)
            insert_at = max(0, min(insert_at, len(lines)))
            lines.insert(insert_at, edit.new_content or "")
            feedbacks.append(f"Added line at {insert_at + 1} in {edit.file_path}")
            applied_count += 1
        elif action.action_type == ActionType.DELETE_LINE:
            if edit.line_number and 1 <= edit.line_number <= len(lines):
                del lines[edit.line_number - 1]
                feedbacks.append(f"Deleted line {edit.line_number} in {edit.file_path}")
                applied_count += 1
            else:
                feedbacks.append(f"Invalid line number: {edit.line_number}")
                continue
        elif action.action_type == ActionType.EDIT_FILE:
            if edit.old_content and edit.old_content in file_content.content:
                replaced = file_content.content.replace(edit.old_content, edit.new_content or "", 1)
                lines = replaced.split("\n")
                feedbacks.append(f"Edited content in {edit.file_path}")
                applied_count += 1
            else:
                feedbacks.append(f"Content not found in {edit.file_path}")
                continue
        elif action.action_type == ActionType.ADD_BLOCK:
            insert_at = edit.line_number - 1 if edit.line_number else len(lines)
            insert_at = max(0, min(insert_at, len(lines)))
            block_lines = (edit.new_content or "").split("\n")
            lines[insert_at:insert_at] = block_lines
            feedbacks.append(f"Added block at {insert_at + 1} in {edit.file_path}")
            applied_count += 1
        elif action.action_type == ActionType.DELETE_BLOCK:
            if edit.old_content and edit.old_content in file_content.content:
                remaining = file_content.content.replace(edit.old_content, "", 1)
                lines = remaining.split("\n")
                feedbacks.append(f"Deleted block in {edit.file_path}")
                applied_count += 1
            else:
                feedbacks.append(f"Block content not found in {edit.file_path}")
                continue
        else:
            feedbacks.append(f"Unsupported action type for now: {action.action_type.value}")
            continue

        new_content = "\n".join(lines)
        self.current_files[edit.file_path] = FileContent(
            path=file_content.path,
            content=new_content,
            file_type=file_content.file_type,
            # Count from the final text rather than len(lines): a replaced or
            # added "line" may itself contain newlines, and this matches how
            # line_count is computed for loaded and newly created files.
            line_count=new_content.count("\n") + 1,
        )

    # NOTE(review): progress is evaluated once per action, after all edits
    # (including file creation) have been applied — confirm against the
    # original layout, whose indentation was not preserved.
    reward += self._check_fix_progress()

    if applied_count == 0:
        self.last_action_success = False
        return max(-0.02, reward - 0.02), "; ".join(feedbacks) or "No edit applied"

    self.last_action_success = True
    return max(0.0, reward), "; ".join(feedbacks)
def _check_fix_progress(self) -> float:
    """Reward checks that have been repaired since the snapshot taken at reset.

    A check counts as repaired when it was failing in ``initial_snapshot``
    and passes in a fresh validation snapshot. ``issues_fixed`` is only ever
    raised, never lowered; each check newly above that mark earns 0.3.
    """
    latest = self._validation_snapshot()
    repaired = sum(
        1
        for check, passed_at_reset in self.initial_snapshot.items()
        if not passed_at_reset and latest.get(check, False)
    )

    gained = repaired - self.issues_fixed
    if gained <= 0:
        return 0.0

    self.issues_fixed = repaired
    return 0.3 * gained
def _handle_submit(self) -> Tuple[float, str]:
    """Score the submitted file state against every applicable simulator.

    Each kind of artifact that is present (Dockerfile, workflow file,
    Kubernetes manifests) receives a weight that depends on which other
    kinds are present; passing sub-checks earn fixed fractions of that
    weight.

    Returns:
        ``(reward, feedback)`` where ``feedback`` lists every check result
        joined with ``"; "``. ``last_action_success`` is set when the reward
        reaches 0.6, or the maximum attainable reward if that is lower.
    """
    docker_result = self.docker_sim.validate(self.current_files.get("Dockerfile"), self.current_files)
    workflow_file = self._find_workflow_file()
    workflow_result = self.workflow_sim.validate(workflow_file, self.current_files)
    k8s_result = self.k8s_sim.validate(self.current_files)

    has_k8s = any(fc.file_type == FileType.KUBERNETES for fc in self.current_files.values())
    has_docker = "Dockerfile" in self.current_files
    has_workflow = workflow_file is not None

    reward = 0.0
    parts: List[str] = []

    # Determine weight distribution based on what file types are present
    if has_docker and has_workflow and has_k8s:
        docker_w, wf_w, k8s_w = 0.20, 0.30, 0.30
    elif has_docker and has_workflow:
        docker_w, wf_w, k8s_w = 0.50, 0.50, 0.0
    elif has_docker and has_k8s:
        docker_w, wf_w, k8s_w = 0.40, 0.0, 0.40
    elif has_workflow and has_k8s:
        docker_w, wf_w, k8s_w = 0.0, 0.40, 0.40
    elif has_k8s:
        docker_w, wf_w, k8s_w = 0.0, 0.0, 0.80
    elif has_docker:
        docker_w, wf_w, k8s_w = 0.50, 0.0, 0.0
    else:
        docker_w, wf_w, k8s_w = 0.0, 0.50, 0.0

    # Docker validation
    if has_docker:
        if docker_result.get("build_success"):
            reward += docker_w * 0.6
            parts.append("Docker build: PASS")
        else:
            parts.append(f"Docker build: FAIL - {docker_result.get('error', 'unknown')}")
        if docker_result.get("run_success"):
            reward += docker_w * 0.4
            parts.append("Docker run: PASS")
        else:
            parts.append(f"Docker run: FAIL - {docker_result.get('run_error', 'unknown')}")

    # Workflow validation (.get keeps this consistent with the snapshot
    # logic, which tolerates missing result keys)
    if has_workflow:
        if workflow_result.get("parse_success"):
            reward += wf_w * 0.4
            parts.append("Workflow parse: PASS")
        else:
            parts.append(f"Workflow parse: FAIL - {workflow_result.get('error', 'unknown')}")
        if workflow_result.get("execution_success"):
            reward += wf_w * 0.6
            parts.append("Workflow execution: PASS")
        else:
            parts.append(f"Workflow execution: FAIL - {workflow_result.get('exec_error', 'unknown')}")

    # Kubernetes validation
    if has_k8s:
        if k8s_result.get("valid", True):
            reward += k8s_w * 0.4
            parts.append("K8s manifests: VALID")
        else:
            k8s_errors = k8s_result.get("errors", [])
            parts.append(f"K8s manifests: INVALID - {'; '.join(k8s_errors[:2])}")

        pod_status = k8s_result.get("pod_status", "N/A")
        if pod_status == "Running":
            reward += k8s_w * 0.3
        parts.append(f"K8s pod status: {pod_status}")

        svc_status = k8s_result.get("service_status", "N/A")
        # A report without a service ("N/A") counts as passing.
        if "active" in svc_status.lower() or svc_status == "N/A":
            reward += k8s_w * 0.3
        parts.append(f"K8s service: {svc_status}")

    # With only a Dockerfile or only a workflow the weights sum to 0.5, so a
    # fixed 0.6 bar could never be met even by a perfect submission. Cap the
    # bar at what this file mix can actually earn.
    attainable = docker_w + wf_w + k8s_w
    self.last_action_success = reward >= min(0.6, attainable)
    return reward, "; ".join(parts)
def _handle_hint_request(self) -> Tuple[float, str]:
    """Return a hint for the first expected fix that is not yet satisfied.

    Expected fixes whose file is not tracked are skipped. A fix of type
    ``"contains"`` is satisfied when its ``expected`` text is in the file,
    ``"not_contains"`` when it is absent; any other type is treated as
    unsatisfied.

    Returns:
        ``(-0.05, hint text)`` when a hint is issued, ``(0.0, message)`` when
        the hint budget is exhausted or nothing is left to hint at. The
        budget is only spent when a hint is actually issued.
    """
    if self.hints_used >= self.MAX_HINTS:
        self.last_action_success = False
        return 0.0, "No hints remaining"

    for fix in self.expected_fixes:
        file_path = fix["file"]
        if file_path not in self.current_files:
            continue

        current_content = self.current_files[file_path].content
        if fix["type"] == "contains":
            is_fixed = fix["expected"] in current_content
        elif fix["type"] == "not_contains":
            is_fixed = fix["expected"] not in current_content
        else:
            is_fixed = False
        if is_fixed:
            continue

        # Charge the budget here, not up front, so a request that finds
        # nothing to hint at (reward 0.0) does not burn a hint slot.
        self.hints_used += 1
        hint = fix.get("hint", f"Check {file_path}")
        self.last_action_success = True
        return -0.05, f"Hint ({self.hints_used}/{self.MAX_HINTS}): {hint}"

    self.last_action_success = True
    return 0.0, "All known issues appear to be fixed"
def get_observation(self) -> Observation:
    """Assemble an ``Observation`` describing the current episode state.

    When no task is loaded, placeholder values are used: task id
    ``"unknown"``, empty description and secrets, ``EASY`` difficulty and a
    synthetic "No error" entry.
    """
    task = self.current_task
    if task:
        description = task.DESCRIPTION
        secrets = task.AVAILABLE_SECRETS
    else:
        description = ""
        secrets = []

    error = self.current_error or ErrorInfo(
        phase=ErrorPhase.DOCKER_BUILD,
        error_message="No error",
    )

    return Observation(
        task_id=self.current_task_id or "unknown",
        task_description=description,
        difficulty=self.current_difficulty or TaskDifficulty.EASY,
        files=list(self.current_files.values()),
        error=error,
        available_secrets=secrets,
        runner_os="ubuntu-latest",
        step_number=self.step_count,
        max_steps=self.MAX_STEPS,
        hints_used=self.hints_used,
        hints_available=self.MAX_HINTS - self.hints_used,
        last_action_success=self.last_action_success,
        last_action_feedback=self.last_action_feedback,
        # NOTE(review): issues_found mirrors issues_fixed; no separate
        # "found" counter exists in this class — confirm this is intended.
        issues_found=self.issues_fixed,
        issues_fixed=self.issues_fixed,
        total_issues=max(1, self.issues_total),
    )