agentic-triage-amd / agents /executor.py
OGrohit's picture
Remove Done: true/false from executor output
8ba6403
Raw
History Blame Contribute Delete
6.24 kB
import json
import requests
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from amd_client import call_amd_llm
EXECUTOR_SYSTEM_PROMPT = """You are an SRE executing a live incident triage.
You have a strategy and you must take ONE action per step.
You must respond ONLY with a valid JSON object. No explanation, no markdown, no extra text.
JSON format:
{
"action_type": "<one of the valid action types>",
"value": "<appropriate value for the action type>",
"confidence": <float 0.0 to 1.0>,
"reasoning": "<one sentence max>"
}
Valid action_type values and their value formats:
- classify_severity → value: "P1", "P2", or "P3"
- identify_root_cause → value: service name e.g. "payment-service", "user-db"
- escalate → value: team name e.g. "backend-team", "sre-team", "dba-team"
- remediate → value: "command:service" e.g. "restart:payment-service", "kill-query:user-db", "flush-cache:payment-db"
- request_more_logs → value: service name to get more logs from
- resolve → value: "resolved"
- ignore → value: "noise"
Rules:
- Follow the strategy given to you
- Do NOT repeat an action you already took
- After remediating, always resolve
- Never ignore a P1 incident
"""
def _format_step_prompt(observation: dict, strategy: dict, action_history: list, step: int) -> str:
"""Format the current state into a prompt for the executor."""
logs = observation.get("logs", [])
service_state = observation.get("service_state", [])
reward = observation.get("reward", 0)
log_text = "\n".join([
f"[{log.get('level', 'INFO')}] {log.get('service', 'unknown')}: {log.get('message', '')}"
for log in logs[-10:] # last 10 logs
])
service_text = "\n".join([
f"- {svc.get('name', 'unknown')}: status={svc.get('status', 'unknown')}, "
f"error_rate={svc.get('error_rate', 0):.1%}"
for svc in service_state
])
history_text = "\n".join([
f"Step {i+1}: {a.get('action_type')}:{a.get('value')} (reward: {a.get('reward', '?')})"
for i, a in enumerate(action_history)
]) or "No actions taken yet."
return f"""=== CURRENT STEP: {step} ===
STRATEGY:
- Suspected severity: {strategy.get('suspected_severity')}
- Suspected root cause: {strategy.get('suspected_root_cause')}
- Confidence: {strategy.get('confidence')}
ACTIONS ALREADY TAKEN:
{history_text}
CURRENT LOGS:
{log_text}
CURRENT SERVICE HEALTH:
{service_text}
Last reward received: {reward}
What is your NEXT single action? Respond with JSON only:"""
def run_executor(
strategy: dict,
env_url: str = "http://localhost:7860",
task_id: str = "single_crash",
seed: int = 42
) -> dict:
"""
Runs the full step loop for one episode.
Args:
strategy: output from run_planner()
env_url: base URL of the FastAPI environment
task_id: which task to run
seed: episode seed for reproducibility
Returns:
result: dict with episode_id, task_id, actions, final_observation, total_steps
"""
# Reset the environment
reset_resp = requests.post(
f"{env_url}/reset",
json={"task_id": task_id, "seed": seed}
)
reset_resp.raise_for_status()
observation = reset_resp.json()
episode_id = observation.get("info", {}).get("episode_id", "unknown")
max_steps = observation.get("incident_metadata", {}).get("max_steps", 10)
print(f"\n[EXECUTOR] Starting episode {episode_id} — task={task_id}, max_steps={max_steps}")
action_history = []
done = False
step = 0
final_observation = observation
while not done and step < max_steps:
step += 1
print(f"[EXECUTOR] Step {step}/{max_steps}...")
# Ask LLM for next action
prompt = _format_step_prompt(observation, strategy, action_history, step)
response = call_amd_llm(
prompt=prompt,
system_prompt=EXECUTOR_SYSTEM_PROMPT,
temperature=0.1
)
# Parse the action
try:
clean = response.strip().strip("```json").strip("```").strip()
action = json.loads(clean)
except json.JSONDecodeError:
print(f"[EXECUTOR] Warning: Bad JSON on step {step}, using fallback action")
action = {
"action_type": "request_more_logs",
"value": "system",
"confidence": 0.5,
"reasoning": "Fallback due to parse error"
}
print(f"[EXECUTOR] Action: {action.get('action_type')}:{action.get('value')} "
f"(confidence={action.get('confidence', '?')})")
# Send action to environment
step_resp = requests.post(
f"{env_url}/step",
json={
"action_type": action["action_type"],
"value": action["value"],
"confidence": action.get("confidence", 0.7),
"reasoning": action.get("reasoning", "")
}
)
step_resp.raise_for_status()
observation = step_resp.json()
reward = observation.get("reward", 0)
done = observation.get("done", False)
# Track action with its reward
action["reward"] = reward
action_history.append(action)
print(f"[EXECUTOR] Reward: {reward:.3f}")
final_observation = observation
print(f"[EXECUTOR] Episode complete. Steps used: {step}")
return {
"episode_id": episode_id,
"task_id": task_id,
"total_steps": step,
"action_history": action_history,
"final_observation": final_observation,
"cumulative_score": sum(a.get("reward", 0) for a in action_history)
}
if __name__ == "__main__":
from agents.planner import run_planner
import requests
# First get initial observation
reset = requests.post(
"http://localhost:7860/reset",
json={"task_id": "single_crash", "seed": 42}
).json()
# Get strategy from planner
strategy = run_planner(reset)
# Run executor
result = run_executor(strategy, task_id="single_crash", seed=42)
print(f"\nEpisode result: {json.dumps(result, indent=2, default=str)}")