| from __future__ import annotations |
|
|
| import argparse |
| import asyncio |
| import os |
| from typing import Any |
|
|
| from openai import OpenAI |
|
|
| from env.environment import CICDDebuggerEnvironment, REQUIRED_TOOLS |
| from inference.metrics import EpisodeMetrics |
| from inference.model_wrapper import ModelWrapper, score_action_candidate |
| from inference.prompts import heuristic_action |
| from inference.visualize import save_metrics_json, save_reward_curve, save_success_rate_history |
|
|
|
|
# --- Model endpoint configuration (every value overridable via environment) ---
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Prefer an explicit OpenAI key; fall back to a Hugging Face token.
API_KEY = OPENAI_API_KEY or HF_TOKEN
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
DEFAULT_TASK_ID = os.getenv("MY_ENV_V4_TASK", "easy-command-typo")
DEFAULT_BENCHMARK = os.getenv("MY_ENV_V4_BENCHMARK", "cicd_debugger_env")


# --- Episode and sampling knobs ---
MAX_STEPS_DEFAULT = int(os.getenv("MAX_STEPS", "8"))
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.2"))
MAX_TOKENS = int(os.getenv("MAX_TOKENS", "120"))
# "1" forces offline (heuristic-only) inference even when an API key is set.
OFFLINE_INFERENCE = os.getenv("OFFLINE_INFERENCE", "0") == "1"
# Minimum clipped average reward for an episode to count as a success.
SUCCESS_SCORE_THRESHOLD = float(os.getenv("SUCCESS_SCORE_THRESHOLD", "0.1"))
|
|
|
|
def log_start(task: str, env_name: str, model: str) -> None:
    """Emit the [START] marker line with whitespace-sanitized fields."""
    fields = (
        f"task={_single_line(task)}",
        f"env={_single_line(env_name)}",
        f"model={_single_line(model)}",
    )
    print("[START] " + " ".join(fields), flush=True)
|
|
|
|
def log_step(step: int, action: str, reward: float, done: bool, error: str | None) -> None:
    """Emit one [STEP] marker line; a missing error renders as "null"."""
    parts = [
        f"step={step}",
        f"action={_single_line(action)}",
        f"reward={reward:.2f}",
        f"done={str(done).lower()}",
        f"error={_single_line(error) if error else 'null'}",
    ]
    print("[STEP] " + " ".join(parts), flush=True)
|
|
|
|
def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None:
    """Emit the final [END] marker summarizing the whole episode."""
    reward_trace = ",".join(format(item, ".2f") for item in rewards)
    summary = f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={reward_trace}"
    print(summary, flush=True)
|
|
|
|
| def _single_line(value: Any) -> str: |
| return " ".join(str(value).replace("\n", " ").replace("\r", " ").split()) |
|
|
|
|
| def _is_hacking_action(action_text: str) -> bool: |
| value = (action_text or "").lower() |
| patterns = ( |
| "if: false", |
| "when: never", |
| "echo \"tests passed\"", |
| "echo 'tests passed'", |
| "exit 0", |
| "force success", |
| "status: success", |
| ) |
| return any(token in value for token in patterns) |
|
|
|
|
| def _extract_error(info: dict[str, Any] | None) -> str | None: |
| if not info: |
| return None |
| error = info.get("error") |
| return str(error) if error else None |
|
|
|
|
| def _extract_observation_fields(observation: dict[str, Any]) -> tuple[str, str, list[str]]: |
| config_text = str(observation.get("config") or "") |
| error_message = str(observation.get("error_message") or "") |
| tools = [str(item) for item in (observation.get("available_tools") or REQUIRED_TOOLS)] |
| return config_text, error_message, tools |
|
|
|
|
| def _tool_from_action(action_text: str) -> str: |
| return str(action_text or "").split(":", 1)[0].strip().lower() |
|
|
|
|
| def _is_action_allowed(action_text: str, available_tools: list[str]) -> bool: |
| return _tool_from_action(action_text) in {tool.lower() for tool in available_tools} |
|
|
|
|
def _normalize_action(action_text: str, available_tools: list[str], fallback: str) -> str:
    """Map raw model output onto an allowed tool action.

    Known shorthand tool names are rewritten to their canonical form before
    the allow-list check; empty or unrecognized actions yield *fallback*.
    """
    candidate = str(action_text or "").strip()
    if not candidate:
        return fallback

    alias_map = {
        "run_stage": "run_pipeline_stage",
        "validate": "validate_fix",
        "submit": "submit_solution",
        "submit_fix": "submit_solution",
    }
    raw_tool = _tool_from_action(candidate)
    canonical_tool = alias_map.get(raw_tool, raw_tool)
    if canonical_tool != raw_tool:
        # Preserve any argument text after the colon while swapping the tool name.
        remainder = candidate.split(":", 1)[1].strip() if ":" in candidate else ""
        candidate = f"{canonical_tool}: {remainder}" if remainder else canonical_tool

    if _is_action_allowed(candidate, available_tools):
        return candidate
    return fallback
|
|
|
|
def _select_action(
    model_wrapper: ModelWrapper,
    step: int,
    config_text: str,
    error_message: str,
    history: list[str],
    available_actions: list[str],
    policy_mode: str,
    trajectories: int,
) -> str:
    """Choose the next action according to the configured policy mode.

    Modes:
      * ``sft``    -- heuristic action only; no model call.
      * ``direct`` -- a single model generation, normalized.
      * default (``imp``) -- best-of-N candidates re-ranked by
        ``score_action_candidate`` with a reward-hacking penalty.
    """
    mode = (policy_mode or "imp").lower()
    fallback = heuristic_action(config_text, error_message, available_actions, history)

    if mode == "sft":
        return _normalize_action(fallback, available_actions, fallback)

    if mode == "direct":
        direct_action = model_wrapper.generate_action(
            step=step,
            config_text=config_text,
            error_message=error_message,
            history=history,
            available_actions=available_actions,
        )
        return _normalize_action(direct_action, available_actions, fallback)

    # Implicit best-of-N mode: sample several candidates and keep the top-scoring one.
    candidates = model_wrapper.generate_candidates(
        step=step,
        config_text=config_text,
        error_message=error_message,
        history=history,
        count=max(1, int(trajectories)),
        available_actions=available_actions,
    )
    if not candidates:
        return _normalize_action(fallback, available_actions, fallback)

    observation_text = f"{config_text}\n{error_message}"

    def _rank(candidate: str) -> float:
        return score_action_candidate(observation_text, candidate, _is_hacking_action)

    best_candidate = max(candidates, key=_rank)
    return _normalize_action(best_candidate, available_actions, fallback)
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Define and parse the command-line options for one inference episode."""
    cli = argparse.ArgumentParser(description="Run CI/CD debugger inference loop")
    cli.add_argument("--max-steps", type=int, default=MAX_STEPS_DEFAULT)
    cli.add_argument("--task", default=DEFAULT_TASK_ID)
    cli.add_argument("--benchmark", default=DEFAULT_BENCHMARK)
    cli.add_argument("--difficulty", choices=["easy", "medium", "hard"], default=None)
    # Environment variable OFFLINE_INFERENCE pre-seeds the offline flag.
    cli.add_argument("--offline", action="store_true", default=OFFLINE_INFERENCE)
    cli.add_argument("--force-local-env", action="store_true", default=False)
    cli.add_argument("--policy-mode", choices=["sft", "imp", "direct"], default="imp")
    cli.add_argument("--trajectories", type=int, default=3)
    return cli.parse_args()
|
|
|
|
async def run_episode(args: argparse.Namespace) -> int:
    """Run one complete episode against the CI/CD debugger environment.

    Creates the environment and model wrapper, steps the policy loop up to
    ``--max-steps`` times, records metrics, writes visualization artifacts,
    and always closes the environment. Emits [START]/[STEP]/[END] marker
    lines for downstream log parsing. Always returns 0, even on failure;
    success is communicated via the [END] line and saved metrics.
    """
    history: list[str] = []
    steps_taken = 0
    success = False
    episode_completed_cleanly = False
    metrics = EpisodeMetrics()


    env = CICDDebuggerEnvironment(max_steps=max(1, int(args.max_steps)))


    # Offline mode is forced when requested or when no API key is available.
    offline_mode = bool(args.offline or not API_KEY)
    client: OpenAI | None = None
    if not offline_mode:
        try:
            client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
        except Exception:
            # Client construction failure degrades to offline inference.
            client = None
            offline_mode = True


    model_wrapper = ModelWrapper(
        client=client,
        model_name=MODEL_NAME,
        temperature=TEMPERATURE,
        max_tokens=MAX_TOKENS,
        offline=offline_mode,
    )


    log_start(task=str(args.task), env_name=str(args.benchmark), model=MODEL_NAME)


    try:
        observation = await env.reset(task_id=str(args.task), difficulty=args.difficulty)


        for step in range(1, max(1, int(args.max_steps)) + 1):
            config_text, error_message, available_tools = _extract_observation_fields(observation)


            action_text = _select_action(
                model_wrapper=model_wrapper,
                step=step,
                config_text=config_text,
                error_message=error_message,
                history=history,
                available_actions=available_tools,
                policy_mode=str(args.policy_mode),
                trajectories=max(1, int(args.trajectories)),
            )


            observation, reward, done, info = await env.step(action_text)
            step_error = _extract_error(info)


            metrics.add_step(action=action_text, reward=float(reward), error=step_error, done=bool(done))
            steps_taken = step


            log_step(step=step, action=action_text, reward=float(reward), done=bool(done), error=step_error)
            history.append(f"step={step} action={_single_line(action_text)} reward={float(reward):.2f}")


            if done:
                # A clean finish requires no step error and no reward-hacking action.
                episode_completed_cleanly = step_error is None and not _is_hacking_action(action_text)
                break


    except Exception as exc:
        success = False
        # Record at least one step so the metrics/visualizations are non-empty.
        if not metrics.rewards:
            metrics.add_step(action="system_error", reward=0.0, error=str(exc), done=True)
    finally:
        # Score is the average reward clipped to [0, 1]; success additionally
        # requires a clean episode finish.
        score = max(0.0, min(1.0, float(metrics.average_reward)))
        success = episode_completed_cleanly and score >= SUCCESS_SCORE_THRESHOLD


        # Artifact writing is best-effort; never let it mask the episode outcome.
        try:
            save_reward_curve(metrics.rewards)
            save_metrics_json(metrics.summary())
            save_success_rate_history([success])
        except Exception:
            pass


        # Environment teardown is likewise best-effort.
        try:
            await env.close()
        except Exception:
            pass


        log_end(success=success, steps=steps_taken, score=score, rewards=metrics.rewards)


    return 0
|
|
|
|
def main() -> int:
    """Synchronous entry point: parse CLI options and run a single episode."""
    return asyncio.run(run_episode(parse_args()))
|
|
|
|
# Propagate main()'s integer exit code to the shell when run as a script.
if __name__ == "__main__":
    raise SystemExit(main())
|
|