import json import requests import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from amd_client import call_amd_llm EXECUTOR_SYSTEM_PROMPT = """You are an SRE executing a live incident triage. You have a strategy and you must take ONE action per step. You must respond ONLY with a valid JSON object. No explanation, no markdown, no extra text. JSON format: { "action_type": "", "value": "", "confidence": , "reasoning": "" } Valid action_type values and their value formats: - classify_severity → value: "P1", "P2", or "P3" - identify_root_cause → value: service name e.g. "payment-service", "user-db" - escalate → value: team name e.g. "backend-team", "sre-team", "dba-team" - remediate → value: "command:service" e.g. "restart:payment-service", "kill-query:user-db", "flush-cache:payment-db" - request_more_logs → value: service name to get more logs from - resolve → value: "resolved" - ignore → value: "noise" Rules: - Follow the strategy given to you - Do NOT repeat an action you already took - After remediating, always resolve - Never ignore a P1 incident """ def _format_step_prompt(observation: dict, strategy: dict, action_history: list, step: int) -> str: """Format the current state into a prompt for the executor.""" logs = observation.get("logs", []) service_state = observation.get("service_state", []) reward = observation.get("reward", 0) log_text = "\n".join([ f"[{log.get('level', 'INFO')}] {log.get('service', 'unknown')}: {log.get('message', '')}" for log in logs[-10:] # last 10 logs ]) service_text = "\n".join([ f"- {svc.get('name', 'unknown')}: status={svc.get('status', 'unknown')}, " f"error_rate={svc.get('error_rate', 0):.1%}" for svc in service_state ]) history_text = "\n".join([ f"Step {i+1}: {a.get('action_type')}:{a.get('value')} (reward: {a.get('reward', '?')})" for i, a in enumerate(action_history) ]) or "No actions taken yet." return f"""=== CURRENT STEP: {step} === STRATEGY: - Suspected severity: {strategy.get('suspected_severity')} - Suspected root cause: {strategy.get('suspected_root_cause')} - Confidence: {strategy.get('confidence')} ACTIONS ALREADY TAKEN: {history_text} CURRENT LOGS: {log_text} CURRENT SERVICE HEALTH: {service_text} Last reward received: {reward} What is your NEXT single action? Respond with JSON only:""" def run_executor( strategy: dict, env_url: str = "http://localhost:7860", task_id: str = "single_crash", seed: int = 42 ) -> dict: """ Runs the full step loop for one episode. Args: strategy: output from run_planner() env_url: base URL of the FastAPI environment task_id: which task to run seed: episode seed for reproducibility Returns: result: dict with episode_id, task_id, actions, final_observation, total_steps """ # Reset the environment reset_resp = requests.post( f"{env_url}/reset", json={"task_id": task_id, "seed": seed} ) reset_resp.raise_for_status() observation = reset_resp.json() episode_id = observation.get("info", {}).get("episode_id", "unknown") max_steps = observation.get("incident_metadata", {}).get("max_steps", 10) print(f"\n[EXECUTOR] Starting episode {episode_id} — task={task_id}, max_steps={max_steps}") action_history = [] done = False step = 0 final_observation = observation while not done and step < max_steps: step += 1 print(f"[EXECUTOR] Step {step}/{max_steps}...") # Ask LLM for next action prompt = _format_step_prompt(observation, strategy, action_history, step) response = call_amd_llm( prompt=prompt, system_prompt=EXECUTOR_SYSTEM_PROMPT, temperature=0.1 ) # Parse the action try: clean = response.strip().strip("```json").strip("```").strip() action = json.loads(clean) except json.JSONDecodeError: print(f"[EXECUTOR] Warning: Bad JSON on step {step}, using fallback action") action = { "action_type": "request_more_logs", "value": "system", "confidence": 0.5, "reasoning": "Fallback due to parse error" } print(f"[EXECUTOR] Action: {action.get('action_type')}:{action.get('value')} " f"(confidence={action.get('confidence', '?')})") # Send action to environment step_resp = requests.post( f"{env_url}/step", json={ "action_type": action["action_type"], "value": action["value"], "confidence": action.get("confidence", 0.7), "reasoning": action.get("reasoning", "") } ) step_resp.raise_for_status() observation = step_resp.json() reward = observation.get("reward", 0) done = observation.get("done", False) # Track action with its reward action["reward"] = reward action_history.append(action) print(f"[EXECUTOR] Reward: {reward:.3f}") final_observation = observation print(f"[EXECUTOR] Episode complete. Steps used: {step}") return { "episode_id": episode_id, "task_id": task_id, "total_steps": step, "action_history": action_history, "final_observation": final_observation, "cumulative_score": sum(a.get("reward", 0) for a in action_history) } if __name__ == "__main__": from agents.planner import run_planner import requests # First get initial observation reset = requests.post( "http://localhost:7860/reset", json={"task_id": "single_crash", "seed": 42} ).json() # Get strategy from planner strategy = run_planner(reset) # Run executor result = run_executor(strategy, task_id="single_crash", seed=42) print(f"\nEpisode result: {json.dumps(result, indent=2, default=str)}")