Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Submission inference script for the honest narrow incident environment.""" | |
| from __future__ import annotations | |
| import json | |
| import os | |
| from typing import Any | |
| from openai import OpenAI | |
| from unified_incident_env.client import UnifiedIncidentEnv | |
| from unified_incident_env.models import UnifiedIncidentAction, UnifiedIncidentObservation | |
| from unified_incident_env.server.challenge import DEFAULT_SCENARIO_ID, SCENARIOS | |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct:novita") | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| ENV_BASE_URL = os.getenv("ENV_BASE_URL") or UnifiedIncidentEnv.DEFAULT_BASE_URL | |
| ENV_NAME = "unified-incident-env" | |
| MAX_TOKENS = 260 | |
| def create_client() -> OpenAI | None: | |
| if not HF_TOKEN: | |
| return None | |
| return OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN) | |
| def log_start(*, task: str, env: str, model: str) -> None: | |
| print(f"[START] task={task} env={env} model={model}", flush=True) | |
| def log_step(*, step: int, action: str, reward: float, done: bool, error: str | None) -> None: | |
| print( | |
| f"[STEP] step={step} action={action} reward={reward:.2f} done={str(done).lower()} error={error or 'null'}", | |
| flush=True, | |
| ) | |
| def log_end(*, success: bool, steps: int, score: float, rewards: list[float]) -> None: | |
| rewards_text = ",".join(f"{reward:.2f}" for reward in rewards) | |
| print(f"[END] success={str(success).lower()} steps={steps} score={score:.2f} rewards={rewards_text}", flush=True) | |
| def _service_order(observation: UnifiedIncidentObservation) -> list[str]: | |
| services = list(observation.service_health.items()) | |
| services.sort(key=lambda item: (item[1].status != "crashed", item[1].status != "degraded", -item[1].error_rate_pct)) | |
| return [name for name, _payload in services] | |
| def _heuristic_root_cause(observation: UnifiedIncidentObservation) -> tuple[str, list[str]]: | |
| """Pick a plausible root cause + affected services from the observation. | |
| Decision tree maps the loudest signal to the most likely root cause across | |
| all 12 templates. This is the Stage-1 fallback — when the LLM is offline | |
| or returns malformed JSON, the heuristic still produces a calibrated | |
| hypothesis good enough to score partial credit on the 5-component rubric. | |
| Each branch corresponds to one of the 12 RootCauseType enum values. | |
| """ | |
| summary = (observation.incident_summary or "").lower() | |
| services = observation.service_health | |
| db = services.get("database") | |
| cache = services.get("cache") | |
| worker = services.get("worker") | |
| gateway = services.get("api-gateway") | |
| # Round-2 templates first (the new failure modes) | |
| if "memory" in summary or "oom" in summary or "restart loop" in summary: | |
| return "memory_leak_runaway", ["worker", "database", "api-gateway"] | |
| if "token" in summary or "credential" in summary or "auth" in summary and "401" in summary: | |
| return "credential_rotation_breakage", ["worker", "api-gateway"] | |
| if "dns" in summary or "discovery" in summary or "partition" in summary: | |
| return "network_dns_partition", ["cache", "worker", "api-gateway"] | |
| if "retry" in summary or "rate limit" in summary or "429" in summary: | |
| return "external_rate_limit_storm", ["worker", "database", "api-gateway"] | |
| if "lock" in summary or "migration" in summary and "concurrently" in summary: | |
| return "migration_lock_contention", ["database", "worker", "api-gateway"] | |
| if "maxclients" in summary or "pool" in summary and "cache" in summary: | |
| return "dependency_pool_exhausted", ["cache", "worker", "api-gateway"] | |
| # Vibe-coded SaaS extension band | |
| if "stripe" in summary or "webhook" in summary: | |
| return "payment_webhook_regression", ["api-gateway", "database"] | |
| if "schema" in summary or "prisma" in summary or "plan_tier" in summary: | |
| return "schema_migration_mismatch", ["api-gateway", "worker", "database"] | |
| if "ttl" in summary or "stale" in summary or "session" in summary and "cross" in summary: | |
| return "cache_ttl_regression", ["cache", "api-gateway"] | |
| # v2 catalogue (default fallbacks based on which service is loudest) | |
| if gateway and gateway.error_rate_pct >= 30 and (db is None or db.error_rate_pct < 5): | |
| return "api_gateway_fault", ["api-gateway", "worker"] | |
| if db and db.status == "degraded" and (worker is None or worker.status != "crashed"): | |
| return "database_only_failure", ["database", "api-gateway", "worker"] | |
| return "bad_worker_deploy", ["worker", "database", "api-gateway"] | |
| def _default_action_for_type(action_type: str, observation: UnifiedIncidentObservation) -> dict[str, Any]: | |
| services = _service_order(observation) | |
| service = services[0] if services else "database" | |
| if action_type in {"query_logs", "query_dependencies", "query_deploys", "rollback_deploy", "restart_service", "isolate_service"}: | |
| if action_type == "rollback_deploy": | |
| service = "worker" | |
| return {"action_type": action_type, "service": service} | |
| if action_type == "query_metrics": | |
| return {"action_type": action_type, "service": service, "metric": "cpu"} | |
| if action_type == "run_check": | |
| check_name = "database_recovery" | |
| if observation.service_health.get("database") and observation.service_health["database"].status == "healthy": | |
| check_name = "end_to_end" | |
| return {"action_type": action_type, "check_name": check_name} | |
| if action_type == "submit_hypothesis": | |
| root_cause, affected = _heuristic_root_cause(observation) | |
| return { | |
| "action_type": "submit_hypothesis", | |
| "hypothesis": { | |
| "root_cause": root_cause, | |
| "affected_services": affected, | |
| "confidence": 0.6, | |
| "recommended_next_action": "rollback_deploy", | |
| }, | |
| } | |
| return {"action_type": action_type} | |
| def parse_action(raw: str, observation: UnifiedIncidentObservation) -> UnifiedIncidentAction | None: | |
| text = raw.strip() | |
| if not text: | |
| return None | |
| try: | |
| data = json.loads(text) | |
| except Exception: | |
| return None | |
| if not isinstance(data, dict): | |
| return None | |
| if "action" in data and "action_type" not in data and isinstance(data["action"], str): | |
| data = {**data, "action_type": data["action"]} | |
| data.pop("action", None) | |
| action_type = data.get("action_type") | |
| if action_type not in observation.allowed_actions: | |
| return None | |
| try: | |
| return UnifiedIncidentAction(**data) | |
| except Exception: | |
| return None | |
| def build_user_prompt(observation: UnifiedIncidentObservation) -> str: | |
| required_lines = [] | |
| for action, fields in observation.required_fields_by_action.items(): | |
| required_lines.append(f"- {action}: {', '.join(fields) if fields else '(no extra fields)'}") | |
| checks = "\n".join( | |
| f"- {check.name}: {'passed' if check.passed else 'pending'} - {check.detail}" | |
| for check in observation.checks | |
| ) or "- none" | |
| return ( | |
| "Return exactly one JSON object representing the next action.\n" | |
| f"Current stage: {observation.workflow_stage}\n" | |
| f"Incident summary: {observation.incident_summary}\n" | |
| f"Current score: {observation.final_score:.4f}\n" | |
| f"Last action result: {observation.last_action_result or 'none'}\n" | |
| f"Tool output: {observation.tool_output or 'none'}\n" | |
| f"Failure: {observation.failure_type or 'none'}\n" | |
| f"Why failed: {observation.why_failed or 'none'}\n" | |
| f"User impact: {observation.user_impact:.2f}\n" | |
| f"SLO burn rate: {observation.slo_burn_rate:.2f}\n" | |
| "Allowed actions:\n" | |
| + "\n".join(f"- {action}" for action in observation.allowed_actions) | |
| + "\nRequired fields:\n" | |
| + "\n".join(required_lines) | |
| + "\nChecks:\n" | |
| + checks | |
| ) | |
| _ROOT_CAUSE_ENUM: list[str] = [ | |
| # Original 3 (v2 catalogue) | |
| "bad_worker_deploy", | |
| "database_only_failure", | |
| "api_gateway_fault", | |
| # Vibe-coded SaaS extension band | |
| "payment_webhook_regression", | |
| "schema_migration_mismatch", | |
| "cache_ttl_regression", | |
| # Round-2 Basic-tier additions (April 2026 hackathon) | |
| "dependency_pool_exhausted", | |
| "memory_leak_runaway", | |
| "credential_rotation_breakage", | |
| "network_dns_partition", | |
| "external_rate_limit_storm", | |
| "migration_lock_contention", | |
| ] | |
| """All 12 root causes the model can hypothesize. | |
| Mirrors ``unified_incident_env.models.RootCauseType`` exactly. Kept as a module- | |
| level constant so a CI test can import and diff this against the Literal | |
| without round-tripping through reflection. | |
| """ | |
| def _schema(observation: UnifiedIncidentObservation) -> dict[str, Any]: | |
| properties: dict[str, Any] = { | |
| "action_type": {"type": "string", "enum": observation.allowed_actions}, | |
| "service": {"type": "string", "enum": sorted(observation.service_health)}, | |
| "metric": {"type": "string", "enum": ["cpu", "error_rate", "latency"]}, | |
| "check_name": {"type": "string", "enum": ["database_recovery", "end_to_end"]}, | |
| "hypothesis": { | |
| "type": "object", | |
| "properties": { | |
| "root_cause": {"type": "string", "enum": list(_ROOT_CAUSE_ENUM)}, | |
| "affected_services": { | |
| "type": "array", | |
| "items": {"type": "string", "enum": sorted(observation.service_health)}, | |
| "minItems": 1, | |
| }, | |
| "confidence": {"type": "number", "minimum": 0.0, "maximum": 1.0}, | |
| "recommended_next_action": { | |
| "type": "string", | |
| "enum": [ | |
| "query_logs", | |
| "query_metrics", | |
| "query_dependencies", | |
| "query_deploys", | |
| "rollback_deploy", | |
| "restart_service", | |
| "run_check", | |
| "isolate_service", | |
| "escalate", | |
| "declare_resolved", | |
| ], | |
| }, | |
| }, | |
| "required": ["root_cause", "affected_services", "confidence", "recommended_next_action"], | |
| "additionalProperties": False, | |
| }, | |
| } | |
| required = ["action_type"] | |
| for action, fields in observation.required_fields_by_action.items(): | |
| if action in observation.allowed_actions: | |
| for field in fields: | |
| if field not in required: | |
| required.append(field) | |
| return { | |
| "type": "object", | |
| "properties": properties, | |
| "required": required, | |
| "additionalProperties": False, | |
| } | |
| def request_action(client: OpenAI, observation: UnifiedIncidentObservation) -> str: | |
| completion = client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=[ | |
| {"role": "system", "content": "You are an incident responder. Respond with JSON only."}, | |
| {"role": "user", "content": build_user_prompt(observation)}, | |
| ], | |
| response_format={ | |
| "type": "json_schema", | |
| "json_schema": { | |
| "name": "incident_action", | |
| "strict": True, | |
| "schema": _schema(observation), | |
| }, | |
| }, | |
| max_tokens=MAX_TOKENS, | |
| temperature=0.0, | |
| ) | |
| return (completion.choices[0].message.content or "").strip() | |
| def build_fallback_action(observation: UnifiedIncidentObservation) -> UnifiedIncidentAction: | |
| services = _service_order(observation) | |
| if "query_deploys" in observation.allowed_actions and "worker" in observation.service_health: | |
| return UnifiedIncidentAction(action_type="query_deploys", service="worker") | |
| if "query_logs" in observation.allowed_actions: | |
| return UnifiedIncidentAction(action_type="query_logs", service=services[0] if services else "database") | |
| if "query_metrics" in observation.allowed_actions: | |
| return UnifiedIncidentAction(action_type="query_metrics", service=services[0] if services else "database", metric="cpu") | |
| action_type = observation.allowed_actions[0] | |
| return UnifiedIncidentAction(**_default_action_for_type(action_type, observation)) | |
| def get_model_action(client: OpenAI | None, observation: UnifiedIncidentObservation) -> tuple[UnifiedIncidentAction, str | None]: | |
| if client is None: | |
| return build_fallback_action(observation), "model_unavailable" | |
| try: | |
| parsed = parse_action(request_action(client, observation), observation) | |
| if parsed is not None: | |
| return parsed, None | |
| except Exception: | |
| pass | |
| return build_fallback_action(observation), "fallback_used" | |
| def run_scenario(client: OpenAI | None, scenario_id: str) -> dict[str, Any]: | |
| with UnifiedIncidentEnv(base_url=ENV_BASE_URL).sync() as env: | |
| observation = env.reset(scenario_id=scenario_id).observation | |
| rewards: list[float] = [] | |
| step = 0 | |
| log_start(task=scenario_id, env=ENV_NAME, model=MODEL_NAME) | |
| while not observation.done: | |
| step += 1 | |
| action, error = get_model_action(client, observation) | |
| result = env.step(action) | |
| observation = result.observation | |
| rewards.append(float(result.reward)) | |
| log_step( | |
| step=step, | |
| action=json.dumps(action.model_dump(exclude_none=True), separators=(",", ":")), | |
| reward=float(result.reward), | |
| done=bool(result.done), | |
| error=error or observation.failure_type, | |
| ) | |
| log_end( | |
| success=bool(observation.done and observation.incident_resolved), | |
| steps=step, | |
| score=observation.final_score, | |
| rewards=rewards, | |
| ) | |
| return { | |
| "success": bool(observation.done and observation.incident_resolved), | |
| "score": observation.final_score, | |
| "steps": step, | |
| "rewards": rewards, | |
| } | |
| def main() -> None: | |
| client = create_client() | |
| for scenario_id in SCENARIOS: | |
| run_scenario(client, scenario_id) | |
| if __name__ == "__main__": | |
| main() | |