Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import os | |
| from typing import Any | |
| from openai import OpenAI | |
| from env.environment import CICDDebuggerEnvironment, REQUIRED_TOOLS | |
| from inference.metrics import EpisodeMetrics | |
| from inference.model_wrapper import ModelWrapper, score_action_candidate | |
| from inference.prompts import heuristic_action | |
| from inference.visualize import save_metrics_json, save_reward_curve, save_success_rate_history | |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| API_KEY = OPENAI_API_KEY or HF_TOKEN | |
| LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") | |
| DEFAULT_TASK_ID = os.getenv("MY_ENV_V4_TASK", "easy-command-typo") | |
| DEFAULT_BENCHMARK = os.getenv("MY_ENV_V4_BENCHMARK", "cicd_debugger_env") | |
| MAX_STEPS_DEFAULT = int(os.getenv("MAX_STEPS", "8")) | |
| TEMPERATURE = float(os.getenv("TEMPERATURE", "0.2")) | |
| MAX_TOKENS = int(os.getenv("MAX_TOKENS", "120")) | |
| OFFLINE_INFERENCE = os.getenv("OFFLINE_INFERENCE", "0") == "1" | |
| SUCCESS_SCORE_THRESHOLD = float(os.getenv("SUCCESS_SCORE_THRESHOLD", "0.1")) | |
| def log_start(task: str, env_name: str, model: str) -> None: | |
| print(f"[START] task={_single_line(task)} env={_single_line(env_name)} model={_single_line(model)}", flush=True) | |
| def log_step(step: int, action: str, reward: float, done: bool, error: str | None) -> None: | |
| done_val = str(done).lower() | |
| error_val = _single_line(error) if error else "null" | |
| action_val = _single_line(action) | |
| print(f"[STEP] step={step} action={action_val} reward={reward:.2f} done={done_val} error={error_val}", flush=True) | |
| def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None: | |
| rewards_str = ",".join(f"{value:.2f}" for value in rewards) | |
| print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True) | |
| def _single_line(value: Any) -> str: | |
| return " ".join(str(value).replace("\n", " ").replace("\r", " ").split()) | |
| def _is_hacking_action(action_text: str) -> bool: | |
| value = (action_text or "").lower() | |
| patterns = ( | |
| "if: false", | |
| "when: never", | |
| "echo \"tests passed\"", | |
| "echo 'tests passed'", | |
| "exit 0", | |
| "force success", | |
| "status: success", | |
| ) | |
| return any(token in value for token in patterns) | |
| def _extract_error(info: dict[str, Any] | None) -> str | None: | |
| if not info: | |
| return None | |
| error = info.get("error") | |
| return str(error) if error else None | |
| def _extract_observation_fields(observation: dict[str, Any]) -> tuple[str, str, list[str]]: | |
| config_text = str(observation.get("config") or "") | |
| error_message = str(observation.get("error_message") or "") | |
| tools = [str(item) for item in (observation.get("available_tools") or REQUIRED_TOOLS)] | |
| return config_text, error_message, tools | |
| def _tool_from_action(action_text: str) -> str: | |
| return str(action_text or "").split(":", 1)[0].strip().lower() | |
| def _is_action_allowed(action_text: str, available_tools: list[str]) -> bool: | |
| return _tool_from_action(action_text) in {tool.lower() for tool in available_tools} | |
| def _normalize_action(action_text: str, available_tools: list[str], fallback: str) -> str: | |
| action = str(action_text or "").strip() | |
| if not action: | |
| return fallback | |
| aliases = { | |
| "run_stage": "run_pipeline_stage", | |
| "validate": "validate_fix", | |
| "submit": "submit_solution", | |
| "submit_fix": "submit_solution", | |
| } | |
| tool = _tool_from_action(action) | |
| normalized_tool = aliases.get(tool, tool) | |
| if normalized_tool != tool: | |
| suffix = action.split(":", 1)[1].strip() if ":" in action else "" | |
| action = f"{normalized_tool}: {suffix}" if suffix else normalized_tool | |
| if _is_action_allowed(action, available_tools): | |
| return action | |
| return fallback | |
| def _select_action( | |
| model_wrapper: ModelWrapper, | |
| step: int, | |
| config_text: str, | |
| error_message: str, | |
| history: list[str], | |
| available_actions: list[str], | |
| policy_mode: str, | |
| trajectories: int, | |
| ) -> str: | |
| mode = (policy_mode or "imp").lower() | |
| fallback = heuristic_action(config_text, error_message, available_actions, history) | |
| if mode == "sft": | |
| return _normalize_action(fallback, available_actions, fallback) | |
| if mode == "direct": | |
| action = model_wrapper.generate_action( | |
| step=step, | |
| config_text=config_text, | |
| error_message=error_message, | |
| history=history, | |
| available_actions=available_actions, | |
| ) | |
| return _normalize_action(action, available_actions, fallback) | |
| candidates = model_wrapper.generate_candidates( | |
| step=step, | |
| config_text=config_text, | |
| error_message=error_message, | |
| history=history, | |
| count=max(1, int(trajectories)), | |
| available_actions=available_actions, | |
| ) | |
| if not candidates: | |
| return _normalize_action(fallback, available_actions, fallback) | |
| observation_text = f"{config_text}\n{error_message}" | |
| best = max(candidates, key=lambda item: score_action_candidate(observation_text, item, _is_hacking_action)) | |
| return _normalize_action(best, available_actions, fallback) | |
| def parse_args() -> argparse.Namespace: | |
| parser = argparse.ArgumentParser(description="Run CI/CD debugger inference loop") | |
| parser.add_argument("--max-steps", type=int, default=MAX_STEPS_DEFAULT) | |
| parser.add_argument("--task", default=DEFAULT_TASK_ID) | |
| parser.add_argument("--benchmark", default=DEFAULT_BENCHMARK) | |
| parser.add_argument("--difficulty", choices=["easy", "medium", "hard"], default=None) | |
| parser.add_argument("--offline", action="store_true", default=OFFLINE_INFERENCE) | |
| parser.add_argument("--force-local-env", action="store_true", default=False) | |
| parser.add_argument("--policy-mode", choices=["sft", "imp", "direct"], default="imp") | |
| parser.add_argument("--trajectories", type=int, default=3) | |
| return parser.parse_args() | |
| async def run_episode(args: argparse.Namespace) -> int: | |
| history: list[str] = [] | |
| steps_taken = 0 | |
| success = False | |
| episode_completed_cleanly = False | |
| metrics = EpisodeMetrics() | |
| env = CICDDebuggerEnvironment(max_steps=max(1, int(args.max_steps))) | |
| offline_mode = bool(args.offline or not API_KEY) | |
| client: OpenAI | None = None | |
| if not offline_mode: | |
| try: | |
| client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) | |
| except Exception: | |
| client = None | |
| offline_mode = True | |
| model_wrapper = ModelWrapper( | |
| client=client, | |
| model_name=MODEL_NAME, | |
| temperature=TEMPERATURE, | |
| max_tokens=MAX_TOKENS, | |
| offline=offline_mode, | |
| ) | |
| log_start(task=str(args.task), env_name=str(args.benchmark), model=MODEL_NAME) | |
| try: | |
| observation = await env.reset(task_id=str(args.task), difficulty=args.difficulty) | |
| for step in range(1, max(1, int(args.max_steps)) + 1): | |
| config_text, error_message, available_tools = _extract_observation_fields(observation) | |
| action_text = _select_action( | |
| model_wrapper=model_wrapper, | |
| step=step, | |
| config_text=config_text, | |
| error_message=error_message, | |
| history=history, | |
| available_actions=available_tools, | |
| policy_mode=str(args.policy_mode), | |
| trajectories=max(1, int(args.trajectories)), | |
| ) | |
| observation, reward, done, info = await env.step(action_text) | |
| step_error = _extract_error(info) | |
| metrics.add_step(action=action_text, reward=float(reward), error=step_error, done=bool(done)) | |
| steps_taken = step | |
| log_step(step=step, action=action_text, reward=float(reward), done=bool(done), error=step_error) | |
| history.append(f"step={step} action={_single_line(action_text)} reward={float(reward):.2f}") | |
| if done: | |
| episode_completed_cleanly = step_error is None and not _is_hacking_action(action_text) | |
| break | |
| except Exception as exc: | |
| success = False | |
| if not metrics.rewards: | |
| metrics.add_step(action="system_error", reward=0.0, error=str(exc), done=True) | |
| finally: | |
| score = max(0.0, min(1.0, float(metrics.average_reward))) | |
| success = episode_completed_cleanly and score >= SUCCESS_SCORE_THRESHOLD | |
| try: | |
| save_reward_curve(metrics.rewards) | |
| save_metrics_json(metrics.summary()) | |
| save_success_rate_history([success]) | |
| except Exception: | |
| pass | |
| try: | |
| await env.close() | |
| except Exception: | |
| pass | |
| log_end(success=success, steps=steps_taken, score=score, rewards=metrics.rewards) | |
| return 0 | |
| def main() -> int: | |
| args = parse_args() | |
| return asyncio.run(run_episode(args)) | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |