Spaces:
Sleeping
Sleeping
from __future__ import annotations

import json
import os
import re
from typing import Any, Dict, List, Optional

import requests
from openai import OpenAI
# --- Configuration: every value below can be overridden through the environment. ---
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
# HF_TOKEN wins when set; otherwise the OpenAI key is reused as the credential.
HF_TOKEN = os.environ.get("HF_TOKEN") or OPENAI_API_KEY
API_BASE_URL = os.environ.get("API_BASE_URL", "https://api.openai.com/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "gpt-4o-mini")
LOCAL_IMAGE_NAME = os.environ.get("LOCAL_IMAGE_NAME")  # read here; not used elsewhere in this file
LOGENV_URL = os.environ.get("LOGENV_URL", "http://localhost:7860")  # incident environment server

BENCHMARK = "NovaTechIncidentCommand"
# Minimum submit_report reward for an episode to be reported as success=true.
SUCCESS_THRESHOLD = 0.70

# The client is constructed even without credentials (with a "placeholder" key) so that
# importing this module never fails; the LLM selector returns early when HF_TOKEN is unset.
client = OpenAI(api_key=HF_TOKEN or "placeholder", base_url=API_BASE_URL)
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line that opens an episode log.

    Args:
        task: Task identifier being run.
        env: Benchmark / environment name.
        model: Model name used for the run.
    """
    header = "[START] task={} env={} model={}".format(task, env, model)
    print(header, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` line for an executed action.

    Args:
        step: 1-based step number.
        action: Action label to print verbatim.
        reward: Reward for the step, printed with two decimals.
        done: Episode-finished flag, printed as lowercase ``true``/``false``.
        error: Error text; any falsy value (None or "") is printed as ``null``.
    """
    error_text = error or "null"
    done_text = str(done).lower()
    line = (
        f"[STEP] step={step} action={action} reward={reward:.2f} "
        f"done={done_text} error={error_text}"
    )
    print(line, flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` line that closes an episode log.

    Args:
        success: Whether the episode met the success threshold.
        steps: Number of steps reported.
        score: Final score; clamped into [0.0, 1.0] and printed with three decimals.
        rewards: Per-step rewards, printed comma-separated with two decimals each.
    """
    # Same clamp order as before (min first, then max) so NaN still prints as 1.000.
    clamped = max(0.0, min(1.0, score))
    reward_text = ",".join(format(value, ".2f") for value in rewards)
    success_text = str(success).lower()
    print(
        f"[END] success={success_text} steps={steps} score={clamped:.3f} rewards={reward_text}",
        flush=True,
    )
def api_reset(task_id: str) -> Dict[str, Any]:
    """Start a new episode by POSTing ``{"task_id": task_id}`` to ``/reset``.

    Returns:
        The decoded JSON body of the response (the initial observation).

    Raises:
        requests.HTTPError: via ``raise_for_status`` on a 4xx/5xx response.
    """
    reset_url = f"{LOGENV_URL}/reset"
    reply = requests.post(reset_url, json={"task_id": task_id}, timeout=30)
    reply.raise_for_status()
    return reply.json()
def api_step(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Send one action *payload* to the environment's ``/step`` endpoint.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: via ``raise_for_status`` on a 4xx/5xx response.
    """
    step_url = f"{LOGENV_URL}/step"
    reply = requests.post(step_url, json=payload, timeout=60)
    reply.raise_for_status()
    return reply.json()
def _allowed_hypotheses(task_id: str) -> List[Dict[str, Any]]:
    """Return the fixed candidate hypotheses the model may choose from for *task_id*.

    ``"easy"`` and ``"medium"`` have their own two-entry lists; every other id
    (including ``"hard"``) gets the three-entry list. Fresh dicts are built on each
    call, so callers may mutate the result safely.
    """

    def hypothesis(service: str, mode: str, dependency: str, impact: str, confidence: float) -> Dict[str, Any]:
        # Build one hypothesis record with the key order used throughout this module.
        return {
            "primary_service": service,
            "failure_mode": mode,
            "dependency": dependency,
            "customer_impact": impact,
            "confidence": confidence,
        }

    if task_id == "easy":
        return [
            hypothesis("auth-service", "resource_exhaustion", "none", "login_failures", 0.88),
            hypothesis("user-service", "traffic_abuse", "ldap-directory", "login_failures", 0.52),
        ]
    if task_id == "medium":
        return [
            hypothesis("payment-api", "dependency_outage", "payment-gateway", "checkout_delays", 0.87),
            hypothesis("auth-service", "resource_exhaustion", "none", "login_failures", 0.61),
        ]
    return [
        hypothesis("auth-service", "resource_exhaustion", "payment-api", "cross_service_major_incident", 0.92),
        hypothesis("order-service", "storage_saturation", "mysql", "order_write_failures", 0.71),
        hypothesis("notification-service", "certificate_expiry", "email-relay", "notification_delivery_failure", 0.68),
    ]
def _parse_selected_index(text: str) -> Optional[int]:
    """Recover the integer ``selected_index`` from a model reply.

    Strict JSON is tried first, after stripping a Markdown code fence (```json ... ```),
    which chat models often add despite instructions. If that fails (for example, the reply
    was cut off by the output-token limit in the middle of ``rationale``), a regex looks for
    the ``"selected_index": <int>`` pair directly.

    Returns:
        The parsed index, or None when no integer index can be recovered.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line (``` or ```json) and anything from the closing fence on.
        _, _, cleaned = cleaned.partition("\n")
        cleaned = cleaned.split("```", 1)[0]
    try:
        payload = json.loads(cleaned)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        payload = None
    if isinstance(payload, dict) and "selected_index" in payload:
        try:
            return int(payload["selected_index"])
        except (TypeError, ValueError):
            return None
    match = re.search(r'"selected_index"\s*:\s*"?(-?\d+)', text)
    return int(match.group(1)) if match else None


def _model_select_hypothesis(task_id: str, observation: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Ask the LLM to pick one of the fixed candidate hypotheses for *task_id*.

    The model can only choose an index into ``_allowed_hypotheses(task_id)``; it never
    invents a hypothesis. This is strictly best-effort. It returns None, so the caller
    falls back to its keyword heuristic, when:

    * no ``HF_TOKEN`` is configured;
    * the API call or reply parsing fails;
    * the chosen index is out of range.

    Args:
        task_id: Task identifier selecting the candidate list.
        observation: Latest environment observation. Its ``briefing`` and at most the
            first eight ``visible_logs`` entries are sent to the model.

    Returns:
        One of the candidate hypothesis dicts, or None.
    """
    if not HF_TOKEN:
        return None
    candidates = _allowed_hypotheses(task_id)
    log_fields = (
        "log_id",
        "service_name",
        "log_level",
        "message",
        "response_time_ms",
        "cpu_usage_percent",
        "memory_usage_percent",
    )
    # Send a trimmed view of at most eight logs to keep the prompt small. ``.get`` means a
    # log missing a field is sent with null instead of aborting the episode with KeyError.
    visible_logs = (observation.get("visible_logs") or [])[:8]
    compact_logs = [{field: log.get(field) for field in log_fields} for log in visible_logs]
    prompt = {
        "task_id": task_id,
        "briefing": observation.get("briefing", {}),
        "visible_logs": compact_logs,
        "candidates": candidates,
        "instruction": (
            "Choose the single best candidate hypothesis index for the incident. "
            "Return strict JSON with keys selected_index and rationale. "
            "Do not invent any fields. Use only the provided candidates."
        ),
    }
    try:
        response = client.responses.create(
            model=MODEL_NAME,
            input=[
                {
                    "role": "system",
                    "content": "You are a deterministic incident triage assistant. Return only valid JSON.",
                },
                # Serialize as real JSON, not a Python repr with single quotes and None, so the
                # input matches the strict-JSON format the model is asked to answer in.
                {"role": "user", "content": json.dumps(prompt)},
            ],
            temperature=0,
            max_output_tokens=120,
        )
        text = getattr(response, "output_text", "") or ""
        idx = _parse_selected_index(text) if text else None
    except Exception:
        # Deliberately best-effort: API, transport or parsing failures return None so the
        # caller falls back to its rule-based heuristic.
        return None
    if idx is not None and 0 <= idx < len(candidates):
        return candidates[idx]
    return None
def _severity_score(log: Dict[str, Any]) -> float:
    """Return a heuristic severity score for one log record (higher is more severe).

    The score is built from three parts:

    * A base weight by level: CRITICAL 4.0, ERROR 3.0, WARN 1.0, INFO 0.2, anything else 0.
    * +1.0 each for CPU >= 90 %, memory >= 95 % and response time >= 3000 ms.
    * A bonus for each known failure phrase in the lower-cased message.

    ``log_level`` and ``message`` are required keys; the metric keys default to 0.
    """
    weights = {"CRITICAL": 4.0, "ERROR": 3.0, "WARN": 1.0, "INFO": 0.2}
    phrase_bonuses = (
        ("outofmemoryerror", 2.0),
        ("connection refused", 2.0),
        ("disk full", 2.0),
        ("ssl certificate expired", 1.8),
        ("segmentation fault", 1.8),
        ("timeout exceeded", 1.0),
    )

    total = weights.get(str(log["log_level"]).upper(), 0.0)

    metric_alarms = (
        float(log.get("cpu_usage_percent", 0.0)) >= 90.0,
        float(log.get("memory_usage_percent", 0.0)) >= 95.0,
        int(log.get("response_time_ms", 0)) >= 3000,
    )
    # Accumulate one term at a time so float results match a running "+=" exactly.
    for alarm in metric_alarms:
        if alarm:
            total += 1.0

    lowered = str(log["message"]).lower()
    for phrase, bonus in phrase_bonuses:
        if phrase in lowered:
            total += bonus
    return total
def _infer_hypothesis(observation: Dict[str, Any]) -> Dict[str, Any]:
    """Choose the root-cause hypothesis to submit for *observation*.

    The LLM selector is consulted first. If it returns None, a keyword heuristic over the
    visible log messages is applied, with rules checked in priority order:

    1. OutOfMemoryError while payment-api / order-service / notification-service also log:
       auth-service cross-service major incident.
    2. "connection refused" or "payment confirmation": payment-api gateway outage.
    3. "disk full": order-service storage saturation.
    4. "ssl certificate expired" or "email-relay": notification-service certificate expiry.
    5. OutOfMemoryError logged by auth-service: auth-service resource exhaustion.
    6. Otherwise: first suspected service from the briefing, traffic abuse, low confidence.

    Args:
        observation: Latest environment observation (``task_id``, ``visible_logs``,
            ``briefing``).

    Returns:
        A hypothesis dict with ``primary_service``, ``failure_mode``, ``dependency``,
        ``customer_impact`` and ``confidence``.
    """
    task_id = observation.get("task_id", "easy")
    model_choice = _model_select_hypothesis(task_id, observation)
    if model_choice is not None:
        return model_choice

    logs = observation.get("visible_logs") or []
    services = {log["service_name"] for log in logs}
    service_messages = [(log["service_name"], str(log["message"]).lower()) for log in logs]

    def mentioned(*needles: str) -> bool:
        # Each needle must occur inside a single message. Matching against one joined
        # string could also hit text that straddles two adjacent messages.
        return any(needle in message for _, message in service_messages for needle in needles)

    if mentioned("outofmemoryerror") and {"payment-api", "order-service", "notification-service"} & services:
        return {
            "primary_service": "auth-service",
            "failure_mode": "resource_exhaustion",
            "dependency": "payment-api",
            "customer_impact": "cross_service_major_incident",
            "confidence": 0.92,
        }
    if mentioned("connection refused", "payment confirmation"):
        return {
            "primary_service": "payment-api",
            "failure_mode": "dependency_outage",
            "dependency": "payment-gateway",
            "customer_impact": "checkout_delays",
            "confidence": 0.87,
        }
    if mentioned("disk full"):
        return {
            "primary_service": "order-service",
            "failure_mode": "storage_saturation",
            "dependency": "mysql",
            "customer_impact": "order_write_failures",
            "confidence": 0.82,
        }
    if mentioned("ssl certificate expired", "email-relay"):
        return {
            "primary_service": "notification-service",
            "failure_mode": "certificate_expiry",
            "dependency": "email-relay",
            "customer_impact": "notification_delivery_failure",
            "confidence": 0.81,
        }
    if any(service == "auth-service" and "outofmemoryerror" in message for service, message in service_messages):
        # An OutOfMemoryError is resource exhaustion, not traffic abuse, so an auth-service
        # OOM must not fall through to the "traffic_abuse" catch-all below. The fields mirror
        # the first "easy" candidate in _allowed_hypotheses.
        return {
            "primary_service": "auth-service",
            "failure_mode": "resource_exhaustion",
            "dependency": "none",
            "customer_impact": "login_failures",
            "confidence": 0.88,
        }
    # Catch-all. Use auth-service when the briefing lists no suspects instead of raising
    # KeyError/IndexError; it matches the auth-oriented login_failures impact used here.
    briefing = observation.get("briefing") or {}
    suspects = briefing.get("suspected_services") or ["auth-service"]
    return {
        "primary_service": suspects[0],
        "failure_mode": "traffic_abuse",
        "dependency": "none",
        "customer_impact": "login_failures",
        "confidence": 0.55,
    }
def _containment_for_hypothesis(hypothesis: Dict[str, Any]) -> List[str]:
    """Map a hypothesis to its ordered list of containment action names.

    The plan depends on the hypothesis:

    * An auth-service cross-service major incident gets the combined six-action plan.
    * payment-api, order-service and notification-service each get a two-action playbook.
    * Any other primary service gets the auth-service playbook.

    ``customer_impact`` is only read when ``primary_service`` is ``"auth-service"``.
    A new list is returned on every call.
    """
    service = hypothesis["primary_service"]
    is_major_auth_incident = (
        service == "auth-service"
        and hypothesis["customer_impact"] == "cross_service_major_incident"
    )
    if is_major_auth_incident:
        return [
            "increase_auth_heap",
            "enable_login_rate_limiting",
            "restore_payment_gateway_connectivity",
            "free_order_log_disk",
            "renew_smtp_certificate",
            "page_major_incident_team",
        ]

    playbooks = (
        ("payment-api", ["restore_payment_gateway_connectivity", "reduce_checkout_retry_pressure"]),
        ("order-service", ["free_order_log_disk", "reset_mysql_connection_pool"]),
        ("notification-service", ["renew_smtp_certificate", "reroute_notification_traffic"]),
    )
    for name, plan in playbooks:
        if service == name:
            return plan
    return ["increase_auth_heap", "enable_login_rate_limiting"]
def _build_report(observation: Dict[str, Any], hypothesis: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the payload for the ``submit_report`` action.

    Args:
        observation: Latest observation; its ``visible_logs`` supply the evidence
            (a missing or None list is treated as empty).
        hypothesis: Root-cause hypothesis dict (``primary_service``, ``failure_mode``,
            ``customer_impact`` and the other submitted fields).

    Returns:
        A dict with these keys:

        * ``evidence_log_ids``: ids of up to ten visible logs, most severe first per
          ``_severity_score``.
        * ``impacted_services``: sorted services having a log scoring >= 3.0, or
          ``[primary_service]`` if none do.
        * ``root_cause``: the hypothesis itself.
        * ``containment_plan``: the matching containment actions.
        * ``summary``: a one-sentence summary.
    """
    # Score each log once and reuse the score for both ranking and the impact filter.
    scored = [(_severity_score(log), log) for log in (observation.get("visible_logs") or [])]
    # list.sort is stable even with reverse=True, so equally severe logs keep their order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    evidence_ids = [int(log["log_id"]) for _, log in scored[:10]]
    impacted_services = sorted({log["service_name"] for score, log in scored if score >= 3.0})
    if not impacted_services:
        impacted_services = [hypothesis["primary_service"]]
    return {
        "evidence_log_ids": evidence_ids,
        "impacted_services": impacted_services,
        "root_cause": hypothesis,
        "containment_plan": _containment_for_hypothesis(hypothesis),
        "summary": (
            f"The most likely incident source is {hypothesis['primary_service']} with failure mode "
            f"{hypothesis['failure_mode']}, creating customer impact {hypothesis['customer_impact']}."
        ),
    }
def run_task(task_id: str) -> float:
    """Run one scripted five-action episode for *task_id* and return its score.

    The actions are, in order:

    1. ``query_logs``: CRITICAL/ERROR logs in the briefing's incident window, limit 6.
    2. ``inspect_dependencies``: on the suspected service with the most visible logs.
    3. ``update_hypothesis``.
    4. ``execute_containment``.
    5. ``submit_report``.

    A ``[STEP]`` line is printed per action, bracketed by ``[START]`` and ``[END]``. The
    ``done`` flag of each response is logged but does not stop the sequence.

    Any exception (HTTP error, missing response field, ...) is caught and logged as one
    extra zero-reward ``error`` step. This keeps the ``[END]`` step count and reward list
    consistent with the ``[STEP]`` lines printed.

    Returns:
        The ``submit_report`` reward, or 0.0 if the episode failed.
    """
    rewards: List[float] = []
    final_score = 0.0
    success = False

    def take_step(payload: Dict[str, Any], label: str) -> Dict[str, Any]:
        # Send one action, record its reward and print the matching [STEP] line.
        result = api_step(payload)
        reward = float(result["reward"]["value"])
        rewards.append(reward)
        log_step(len(rewards), label, reward, bool(result["done"]), None)
        return result

    log_start(task_id, BENCHMARK, MODEL_NAME)
    try:
        observation = api_reset(task_id)
        session_id = observation["session_id"]
        briefing = observation["briefing"]

        result = take_step(
            {
                "session_id": session_id,
                "action_type": "query_logs",
                "query": {
                    "levels": ["CRITICAL", "ERROR"],
                    "start_time": briefing["incident_window_start"],
                    "end_time": briefing["incident_window_end"],
                    "limit": 6,
                },
            },
            "query_logs",
        )
        observation = result["observation"]

        # Inspect the suspected service that accounts for the most visible logs.
        visible_logs = observation["visible_logs"]
        target_service = max(
            observation["briefing"]["suspected_services"],
            key=lambda service: sum(1 for log in visible_logs if log["service_name"] == service),
        )
        result = take_step(
            {
                "session_id": session_id,
                "action_type": "inspect_dependencies",
                "target_service": target_service,
            },
            f"inspect_dependencies({target_service})",
        )
        observation = result["observation"]

        hypothesis = _infer_hypothesis(observation)
        result = take_step(
            {
                "session_id": session_id,
                "action_type": "update_hypothesis",
                "hypothesis": hypothesis,
            },
            "update_hypothesis",
        )
        observation = result["observation"]

        result = take_step(
            {
                "session_id": session_id,
                "action_type": "execute_containment",
                "containment_plan": _containment_for_hypothesis(hypothesis),
            },
            "execute_containment",
        )
        observation = result["observation"]

        take_step(
            {
                "session_id": session_id,
                "action_type": "submit_report",
                "report": _build_report(observation, hypothesis),
            },
            "submit_report",
        )
        final_score = rewards[-1]
        success = final_score >= SUCCESS_THRESHOLD
    except Exception as exc:
        # Count the failed action as a zero-reward step so [END] agrees with the [STEP] lines.
        rewards.append(0.0)
        log_step(len(rewards), "error", 0.0, True, str(exc).replace("\n", " "))
        final_score = 0.0
        success = False
    finally:
        # The fallbacks only matter when a non-Exception (e.g. KeyboardInterrupt) escapes.
        log_end(success, len(rewards) or 1, final_score, rewards or [0.0])
    return final_score
if __name__ == "__main__":
    # Evaluate every difficulty tier in sequence; run_task prints its own START/STEP/END lines.
    for difficulty in ["easy", "medium", "hard"]:
        run_task(difficulty)