# CI_CD_Doctor / server /environment.py
# samrat-rm's picture
# Upload folder using huggingface_hub
# e365f21 verified
"""
Core RL environment for the CI/CD Doctor.
Ties together generator, stage_runner, and parser into a step/reset/state loop.
"""
import uuid
from models import PipelineAction, PipelineObservation, PipelineState
from core.scenarios.generator import generate_easy_scenario, generate_medium_scenario, generate_hard_scenario
from core.pipeline.stage_runner import run_pipeline
from core.validation.parser import parse_command
from core.grading.grader import grade as grade_state, balance_score, StepContext, current_milestone_level
# Per-task step budget: an episode is marked done once this many steps
# have been taken, whether or not the pipeline has passed.
MAX_STEPS_BY_TASK: dict[str, int] = {
    "easy": 10,
    "medium": 15,
    "hard": 25,
}

# Per-task "ideal" step count; forwarded to the grader through StepContext.
# NOTE(review): how the grader uses it is not visible in this file.
IDEAL_STEPS_BY_TASK: dict[str, int] = {
    "easy": 3,
    "medium": 6,
    "hard": 10,
}

# Fallbacks applied when reset() receives a task name missing from the
# tables above.
DEFAULT_MAX_STEPS: int = 15
DEFAULT_IDEAL_STEPS: int = 6

# Number of `diagnose` commands that are evaluated per episode; further
# attempts are rejected.
MAX_DIAGNOSE_PER_EPISODE: int = 2

# Minimum number of a single fix's diagnosis keywords that must appear in the
# submitted text for the diagnosis to count as correct.
DIAGNOSIS_KEYWORD_THRESHOLD: int = 2
class PipelineEnvironment:
    """Step/reset/state environment in which an agent repairs a failing pipeline.

    ``reset`` builds a scenario (an in-memory filesystem plus an answer key),
    ``step`` applies one parsed shell-like command to that filesystem and
    returns an observation carrying a reward, and ``state`` exposes a snapshot
    for the grader.

    All episode attributes are created in ``reset``; it must be called before
    ``step`` or ``state``.
    """

    def reset(self, task: str = "easy", seed: int = 42) -> PipelineObservation:
        """Start a new episode and return the initial observation.

        Args:
            task: Difficulty name. ``"medium"`` and ``"hard"`` select their
                own generators; any other value uses the easy generator.
                The step budget is looked up by this name and falls back to
                the ``DEFAULT_*`` constants when the name is unknown.
            seed: Forwarded to the scenario generator.

        Returns:
            An observation listing the scenario's files, with the whole step
            budget remaining, ``done=False`` and zero reward.
        """
        generators = {
            "medium": generate_medium_scenario,
            "hard": generate_hard_scenario,
        }
        scenario = generators.get(task, generate_easy_scenario)(seed)

        self._filesystem = scenario["filesystem"]
        self._answer_key = scenario["answer_key"]
        self._task = task
        self._max_steps = MAX_STEPS_BY_TASK.get(task, DEFAULT_MAX_STEPS)
        self._ideal_steps = IDEAL_STEPS_BY_TASK.get(task, DEFAULT_IDEAL_STEPS)

        self._step_count = 0
        self._done = False
        self._total_reward = 0.0
        self._pipeline_status = "not_run"
        self._episode_id = str(uuid.uuid4())
        self._last_logs: dict = {}
        self._milestones: set[str] = set()
        self._files_read: set[str] = set()
        self._fs_snapshot_at_last_run: str | None = None  # None = no run yet
        self._pipeline_runs_since_last_edit: int = 0
        # Baseline score; each step is rewarded on the change relative to it.
        # Everything state() reads has been assigned above.
        self._last_score = grade_state(self.state())
        self._prev_milestone_level = 0
        self._edits_per_file: dict[str, int] = {}
        self._files_edited_without_reading: set[str] = set()
        self._diagnose_count: int = 0

        files = ", ".join(sorted(self._filesystem.keys()))
        return PipelineObservation(
            stdout=(
                "Pipeline environment ready. The pipeline is failing. "
                f"Files: {files}. Investigate and fix it."
            ),
            exit_code=0,
            pipeline_status=self._pipeline_status,
            steps_remaining=self._max_steps,
            done=False,
            reward=0.0,
        )

    def step(self, action: PipelineAction) -> PipelineObservation:
        """Execute one agent command and return the resulting observation.

        The reward is the change in the grader's score since the previous
        step plus the shaped ``balance_score`` term, rounded to two decimals.
        The episode ends when the pipeline status becomes ``"passed"`` or the
        step budget is used up.

        Args:
            action: Action whose ``command`` string is parsed and executed.

        Returns:
            Observation with the command's stdout and exit code, the current
            pipeline status, steps remaining (never negative), the done flag
            and this step's reward.

        Raises:
            RuntimeError: If the episode has already finished.
        """
        if self._done:
            raise RuntimeError("Episode is done. Call reset() first.")

        self._step_count += 1
        steps_remaining = self._max_steps - self._step_count

        cmd = parse_command(action.command)
        stdout, exit_code, diagnosis_correct = self._execute_command(
            cmd, action.command
        )
        successful_edit = cmd.type in ("echo_append", "sed") and exit_code == 0

        # Edit bookkeeping happens before the StepContext is built so the
        # context already reflects the edit made on this step.
        if successful_edit and cmd.filename:
            if cmd.filename not in self._files_read:
                self._files_edited_without_reading.add(cmd.filename)
            self._edits_per_file[cmd.filename] = (
                self._edits_per_file.get(cmd.filename, 0) + 1
            )

        # files_read / cross_referenced are taken *before* this step's own
        # `cat` is recorded (that happens further down).
        cross_referenced = self._check_cross_referenced()
        ctx = StepContext(
            cmd_type=cmd.type,
            filename=getattr(cmd, "filename", None),
            files_read=set(self._files_read),
            fs_changed_since_last_run=(
                self._fs_snapshot_at_last_run is None
                or repr(self._filesystem) != self._fs_snapshot_at_last_run
            ),
            step_count=self._step_count,
            max_steps=self._max_steps,
            ideal_steps=self._ideal_steps,
            pipeline_runs_since_last_edit=self._pipeline_runs_since_last_edit,
            prev_milestone_level=self._prev_milestone_level,
            edits_per_file=dict(self._edits_per_file),
            files_edited_without_reading=set(self._files_edited_without_reading),
            diagnosis_correct=diagnosis_correct,
            cross_referenced=cross_referenced,
        )

        self._update_milestones(cmd)

        current_score = grade_state(self.state())
        grade_delta = round(current_score - self._last_score, 2)
        self._last_score = current_score

        shaped = balance_score(self.state(), ctx)
        reward = round(grade_delta + shaped, 2)
        self._prev_milestone_level = current_milestone_level(self.state())
        self._total_reward += reward

        # Post-reward bookkeeping: these updates only influence later steps.
        if cmd.type == "cat" and exit_code == 0 and cmd.filename:
            self._files_read.add(cmd.filename)
        if successful_edit:
            self._pipeline_runs_since_last_edit = 0
        if cmd.type == "pipeline_run":
            self._pipeline_runs_since_last_edit += 1
            self._fs_snapshot_at_last_run = repr(self._filesystem)
            self._edits_per_file.clear()

        if self._pipeline_status == "passed" or steps_remaining <= 0:
            self._done = True

        return PipelineObservation(
            stdout=stdout,
            exit_code=exit_code,
            pipeline_status=self._pipeline_status,
            steps_remaining=max(0, steps_remaining),
            done=self._done,
            reward=reward,
        )

    def _execute_command(self, cmd, raw_command: str) -> tuple[str, int, bool]:
        """Apply one parsed command to the episode state.

        Args:
            cmd: Parsed command; ``cmd.type`` selects the branch and the
                remaining attributes are read per branch.
            raw_command: Original command text, echoed back when the command
                type is not recognized.

        Returns:
            ``(stdout, exit_code, diagnosis_correct)``. ``diagnosis_correct``
            can only be true for a ``diagnose`` command.
        """
        kind = cmd.type

        if kind == "cat":
            content = self._filesystem.get(cmd.filename)
            if content is None:
                return f"cat: {cmd.filename}: No such file or directory", 1, False
            return content, 0, False

        if kind == "echo_append":
            if cmd.filename not in self._filesystem:
                return f"bash: {cmd.filename}: No such file or directory", 1, False
            self._filesystem[cmd.filename] += cmd.content + "\n"
            return "", 0, False

        if kind == "sed":
            if cmd.filename not in self._filesystem:
                return f"sed: {cmd.filename}: No such file or directory", 1, False
            pattern = self._decode_escapes(cmd.pattern)
            replacement = self._decode_escapes(cmd.replacement)
            if not pattern:
                # str.replace("", x) would insert x between every character
                # and silently corrupt the file, so refuse instead.
                return "sed: empty search pattern; nothing replaced", 1, False
            # Literal (non-regex) substitution of every occurrence.
            self._filesystem[cmd.filename] = self._filesystem[cmd.filename].replace(
                pattern, replacement
            )
            return "", 0, False

        if kind == "pipeline_run":
            result = run_pipeline(self._filesystem, task=self._task)
            self._last_logs["last"] = result["logs"]
            # Also expose the logs as a file so `cat logs/install.log` works.
            self._filesystem["logs/install.log"] = result["logs"]
            self._pipeline_status = (
                "passed" if result["exit_code"] == 0 else "failed"
            )
            return result["logs"], result["exit_code"], False

        if kind == "pipeline_logs":
            if cmd.stage:
                # This class only ever stores the "last" key, so a stage
                # lookup falls back to the most recent full log when present.
                logs = self._last_logs.get(
                    cmd.stage,
                    self._last_logs.get(
                        "last",
                        f"No logs for stage '{cmd.stage}' yet. Run pipeline first.",
                    ),
                )
            else:
                logs = self._last_logs.get("last", "No pipeline runs yet.")
            return logs, 0, False

        if kind == "pipeline_status":
            return f"Pipeline status: {self._pipeline_status}", 0, False

        if kind == "diagnose":
            message, is_correct = self._handle_diagnose(cmd.diagnosis or "")
            return message, 0, is_correct

        return f"Command not recognized: {raw_command}", 1, False

    @staticmethod
    def _decode_escapes(text: str) -> str:
        """Replace literal backslash-n / backslash-t sequences with newline / tab."""
        return text.replace("\\n", "\n").replace("\\t", "\t")

    def state(self) -> PipelineState:
        """Return a snapshot of the episode for the grader.

        The filesystem mapping is copied; the answer key is passed by
        reference and milestones are returned as a sorted list.
        """
        return PipelineState(
            episode_id=self._episode_id,
            task=self._task,
            filesystem=dict(self._filesystem),
            pipeline_status=self._pipeline_status,
            step_count=self._step_count,
            done=self._done,
            total_reward=self._total_reward,
            answer_key=self._answer_key,
            milestones=sorted(self._milestones),
        )

    def _handle_diagnose(self, diagnosis: str) -> tuple[str, bool]:
        """Evaluate a ``diagnose`` command against the answer key.

        A diagnosis is correct when, for at least one fix entry, at least
        ``DIAGNOSIS_KEYWORD_THRESHOLD`` of its ``diagnosis_keywords`` occur
        (case-insensitively, as substrings) in the submitted text. Only the
        first ``MAX_DIAGNOSE_PER_EPISODE`` attempts are evaluated.

        Args:
            diagnosis: Free-text diagnosis supplied by the agent.

        Returns:
            ``(stdout, is_correct)``. The stdout is the same for right and
            wrong diagnoses, so it does not reveal the outcome.
        """
        if self._diagnose_count >= MAX_DIAGNOSE_PER_EPISODE:
            return (
                f"Diagnosis limit reached (max {MAX_DIAGNOSE_PER_EPISODE} per episode).",
                False,
            )
        self._diagnose_count += 1

        diagnosis_lower = diagnosis.lower()
        fixes = self._answer_key.get("fixes", {})
        for fix_desc in fixes.values():
            if not isinstance(fix_desc, dict):
                continue
            keywords = fix_desc.get("diagnosis_keywords", [])
            matches = sum(1 for kw in keywords if kw.lower() in diagnosis_lower)
            if matches >= DIAGNOSIS_KEYWORD_THRESHOLD:
                return "Diagnosis recorded.", True
        return "Diagnosis recorded.", False

    def _check_cross_referenced(self) -> bool:
        """Return True once at least two of the answer key's diagnostic files have been read."""
        diagnostic_files = set(self._answer_key.get("diagnostic_files", []))
        return len(self._files_read & diagnostic_files) >= 2

    def _update_milestones(self, cmd) -> None:
        """Record the milestones unlocked by this step.

        Milestones live in a set, so each one is recorded at most once.
        """
        if cmd.type == "pipeline_run":
            self._milestones.add("investigated")

        is_log_read = cmd.type == "pipeline_logs" or (
            cmd.type == "cat" and cmd.filename == "logs/install.log"
        )
        if is_log_read and self._pipeline_status == "failed":
            self._milestones.add("logs_read")

        fix_files = set(self._answer_key.get("fixes", {}).keys())
        if cmd.type == "cat" and cmd.filename in fix_files:
            self._milestones.add("correct_file_located")

        diagnostic_files = set(self._answer_key.get("diagnostic_files", []))
        if cmd.type == "cat" and cmd.filename in diagnostic_files:
            self._milestones.add("diagnosed")

        # NOTE(review): step() records the current `cat` in _files_read only
        # after this method runs, so this milestone is granted on the step
        # *after* the second diagnostic file is read - confirm that is intended.
        if self._check_cross_referenced():
            self._milestones.add("cross_referenced")

    def _grade(self) -> tuple[float, bool]:
        """Return ``(score, passed)`` from the unified grader.

        The same grading rule is used for every task difficulty. The boolean
        is true only when the pipeline status is ``"passed"``; unlike
        ``step``, it does not account for an exhausted step budget. This
        helper is not called by any other method of this class.
        """
        score = grade_state(self.state())
        done = self._pipeline_status == "passed"
        return score, done