# NOTE: residue from the web file viewer this module was copied from
# ("Spaces: Sleeping", file size 6,243 bytes, commits f61dd2d / 8ba6403);
# the viewer's line-number gutter has been dropped.
import json
import requests
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from amd_client import call_amd_llm
# System prompt sent with every executor LLM call. It pins the reply to a
# single JSON object and lists the action types (with their value formats)
# that the prompt allows.
# NOTE(review): the separators in the action list were mis-decoded to a Greek
# beta character in this copy of the file; they are written here as ASCII
# "->". Confirm against the upstream file if the original glyph matters.
EXECUTOR_SYSTEM_PROMPT = """You are an SRE executing a live incident triage.
You have a strategy and you must take ONE action per step.
You must respond ONLY with a valid JSON object. No explanation, no markdown, no extra text.
JSON format:
{
"action_type": "<one of the valid action types>",
"value": "<appropriate value for the action type>",
"confidence": <float 0.0 to 1.0>,
"reasoning": "<one sentence max>"
}
Valid action_type values and their value formats:
- classify_severity -> value: "P1", "P2", or "P3"
- identify_root_cause -> value: service name e.g. "payment-service", "user-db"
- escalate -> value: team name e.g. "backend-team", "sre-team", "dba-team"
- remediate -> value: "command:service" e.g. "restart:payment-service", "kill-query:user-db", "flush-cache:payment-db"
- request_more_logs -> value: service name to get more logs from
- resolve -> value: "resolved"
- ignore -> value: "noise"
Rules:
- Follow the strategy given to you
- Do NOT repeat an action you already took
- After remediating, always resolve
- Never ignore a P1 incident
"""
def _format_step_prompt(observation: dict, strategy: dict, action_history: list, step: int) -> str:
"""Format the current state into a prompt for the executor."""
logs = observation.get("logs", [])
service_state = observation.get("service_state", [])
reward = observation.get("reward", 0)
log_text = "\n".join([
f"[{log.get('level', 'INFO')}] {log.get('service', 'unknown')}: {log.get('message', '')}"
for log in logs[-10:] # last 10 logs
])
service_text = "\n".join([
f"- {svc.get('name', 'unknown')}: status={svc.get('status', 'unknown')}, "
f"error_rate={svc.get('error_rate', 0):.1%}"
for svc in service_state
])
history_text = "\n".join([
f"Step {i+1}: {a.get('action_type')}:{a.get('value')} (reward: {a.get('reward', '?')})"
for i, a in enumerate(action_history)
]) or "No actions taken yet."
return f"""=== CURRENT STEP: {step} ===
STRATEGY:
- Suspected severity: {strategy.get('suspected_severity')}
- Suspected root cause: {strategy.get('suspected_root_cause')}
- Confidence: {strategy.get('confidence')}
ACTIONS ALREADY TAKEN:
{history_text}
CURRENT LOGS:
{log_text}
CURRENT SERVICE HEALTH:
{service_text}
Last reward received: {reward}
What is your NEXT single action? Respond with JSON only:"""
def _parse_action(response):
    """Extract the action dict from a raw LLM reply, or return None if unusable."""
    if not isinstance(response, str):
        return None
    # Parse the outermost {...} span. This tolerates markdown fences and stray
    # prose around the object; str.strip("```json") would not, because it
    # strips a *set of characters* from both ends rather than a prefix.
    start = response.find("{")
    end = response.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        action = json.loads(response[start:end + 1])
    except json.JSONDecodeError:
        return None
    # The step request needs both keys; anything else counts as a bad reply.
    if not isinstance(action, dict) or "action_type" not in action or "value" not in action:
        return None
    return action


def run_executor(
    strategy: dict,
    env_url: str = "http://localhost:7860",
    task_id: str = "single_crash",
    seed: int = 42,
    timeout: float = 30.0,
) -> dict:
    """
    Runs the full step loop for one episode.

    Resets the environment, then repeatedly asks the LLM for one action and
    posts it to the environment until the environment reports ``done`` or
    ``max_steps`` is reached.

    Args:
        strategy: output from run_planner()
        env_url: base URL of the FastAPI environment
        task_id: which task to run
        seed: episode seed for reproducibility
        timeout: seconds to wait for each HTTP call to the environment
    Returns:
        result: dict with episode_id, task_id, total_steps, action_history,
        final_observation and cumulative_score (sum of per-step rewards)
    Raises:
        requests.HTTPError: if /reset or /step answers with an error status.
        requests.Timeout: if the environment does not answer within `timeout`.
    """
    # Reset the environment
    reset_resp = requests.post(
        f"{env_url}/reset",
        json={"task_id": task_id, "seed": seed},
        timeout=timeout,
    )
    reset_resp.raise_for_status()
    observation = reset_resp.json()

    # `or {}` guards against keys that are present but null.
    episode_id = (observation.get("info") or {}).get("episode_id", "unknown")
    max_steps = (observation.get("incident_metadata") or {}).get("max_steps", 10)
    print(f"\n[EXECUTOR] Starting episode {episode_id} - task={task_id}, max_steps={max_steps}")

    action_history = []
    done = False
    step = 0
    while not done and step < max_steps:
        step += 1
        print(f"[EXECUTOR] Step {step}/{max_steps}...")

        # Ask LLM for next action
        prompt = _format_step_prompt(observation, strategy, action_history, step)
        response = call_amd_llm(
            prompt=prompt,
            system_prompt=EXECUTOR_SYSTEM_PROMPT,
            temperature=0.1
        )

        # Parse the action; an unusable reply falls back to asking for logs
        # so the episode keeps going instead of aborting.
        action = _parse_action(response)
        if action is None:
            print(f"[EXECUTOR] Warning: Bad JSON on step {step}, using fallback action")
            action = {
                "action_type": "request_more_logs",
                "value": "system",
                "confidence": 0.5,
                "reasoning": "Fallback due to parse error"
            }
        print(f"[EXECUTOR] Action: {action.get('action_type')}:{action.get('value')} "
              f"(confidence={action.get('confidence', '?')})")

        # Send action to environment
        step_resp = requests.post(
            f"{env_url}/step",
            json={
                "action_type": action["action_type"],
                "value": action["value"],
                "confidence": action.get("confidence", 0.7),
                "reasoning": action.get("reasoning", "")
            },
            timeout=timeout,
        )
        step_resp.raise_for_status()
        observation = step_resp.json()
        # A null reward is treated as 0 so formatting and summing cannot fail.
        reward = observation.get("reward") or 0
        done = observation.get("done", False)

        # Track action with its reward
        action["reward"] = reward
        action_history.append(action)
        print(f"[EXECUTOR] Reward: {reward:.3f}")

    print(f"[EXECUTOR] Episode complete. Steps used: {step}")
    return {
        "episode_id": episode_id,
        "task_id": task_id,
        "total_steps": step,
        "action_history": action_history,
        # After the loop this is the last /step observation, or the /reset
        # observation when no step was taken.
        "final_observation": observation,
        "cumulative_score": sum(a.get("reward", 0) for a in action_history)
    }
if __name__ == "__main__":
    from agents.planner import run_planner

    # First get an initial observation for the planner. run_executor() posts
    # to /reset again with the same task_id and seed before stepping.
    reset_resp = requests.post(
        "http://localhost:7860/reset",
        json={"task_id": "single_crash", "seed": 42},
        timeout=30,
    )
    # Fail here on an error status rather than handing an error payload to
    # the planner.
    reset_resp.raise_for_status()

    # Get strategy from planner
    strategy = run_planner(reset_resp.json())

    # Run executor
    result = run_executor(strategy, task_id="single_crash", seed=42)
    print(f"\nEpisode result: {json.dumps(result, indent=2, default=str)}")