File size: 16,975 Bytes
7e9a520 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 | """AtlasOps HF Space entry point.
Serves the custom ops console UI at / and wires the coordinator API.
This is what HF Spaces runs via the Dockerfile.
"""
import hashlib
import hmac
import json
import logging
import os
import subprocess
import time
import uuid
from pathlib import Path
from config.hf_space_env import apply_hf_space_inference_defaults
apply_hf_space_inference_defaults()
from fastapi import FastAPI, HTTPException, Request, Security
from fastapi.security import APIKeyHeader
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, PlainTextResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
log = logging.getLogger("atlasops")
# ββ Auth ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Set ATLASOPS_API_KEY env var to enable auth on all mutating endpoints.
# If unset, mutations are allowed without a key (dev / demo mode with a warning).
_API_KEY = os.getenv("ATLASOPS_API_KEY", "")
_api_key_header = APIKeyHeader(name="X-AtlasOps-Key", auto_error=False)
# HMAC secret for validating Alertmanager webhook payloads.
# Set ALERTMANAGER_WEBHOOK_SECRET and configure Alertmanager to send:
# Authorization: Bearer <secret>
_WEBHOOK_SECRET = os.getenv("ALERTMANAGER_WEBHOOK_SECRET", "")
if not _API_KEY:
log.warning("ATLASOPS_API_KEY not set β mutating endpoints are unauthenticated (dev mode)")
if not _WEBHOOK_SECRET:
log.warning("ALERTMANAGER_WEBHOOK_SECRET not set β webhook accepts unsigned payloads (dev mode)")
def _truthy_env(name: str) -> bool:
return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"}
def _require_api_key(key: str | None = Security(_api_key_header)) -> None:
"""Dependency: validates X-AtlasOps-Key header when ATLASOPS_API_KEY is set."""
if not _API_KEY:
return # dev mode β no key required
if key != _API_KEY:
raise HTTPException(status_code=401, detail="Invalid or missing X-AtlasOps-Key header")
def _verify_webhook_signature(body: bytes, authorization: str | None) -> None:
"""Validate Alertmanager webhook Bearer token when ALERTMANAGER_WEBHOOK_SECRET is set."""
if not _WEBHOOK_SECRET:
return # dev mode
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Missing Authorization header on webhook")
token = authorization.removeprefix("Bearer ").strip()
# Constant-time comparison to prevent timing attacks
if not hmac.compare_digest(token.encode(), _WEBHOOK_SECRET.encode()):
raise HTTPException(status_code=401, detail="Invalid webhook secret")
# Import coordinator internals
from agents.coordinator import handle_incident, app as coordinator_app
from agents.approval import approval_gate
from agents.audit import audit_log
from agents.circuit_breaker import circuit_breaker
from agents.correlator import correlator
from agents.prometheus_metrics import build_dashboard_metrics_payload
from agents.stream import subscribe, get_history
app = FastAPI(title="AtlasOps", docs_url="/api/docs")
# Mount coordinator routes
app.mount("/api", coordinator_app)
# Serve static files
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
class InjectRequest(BaseModel):
scenario_id: str = Field(min_length=1)
name: str | None = None
class InjectResponse(BaseModel):
ok: bool
scenario_id: str
correlation_id: str
kubectl_skipped: bool = False
class RuntimeConfigResponse(BaseModel):
coordinator_url: str
grafana_url: str
argocd_url: str
boutique_url: str
class ApprovalCallbackRequest(BaseModel):
token: str = Field(min_length=1)
decision: str = Field(min_length=1)
approved_by: str = ""
reason: str = ""
@app.get("/", response_class=HTMLResponse)
async def root():
index = static_dir / "index.html"
if index.exists():
return HTMLResponse(index.read_text(encoding="utf-8"))
return HTMLResponse("<h1>AtlasOps</h1><p>Static files not found.</p>")
@app.post("/inject", dependencies=[Security(_require_api_key)])
async def inject_chaos(request: Request):
"""Apply a chaos scenario manifest to the real GKE cluster."""
body = InjectRequest.model_validate(await request.json())
scenario_id = body.scenario_id
correlation_id = f"inj-{int(time.time())}-{uuid.uuid4().hex[:8]}"
manifest = Path("bench/chaos_manifests") / f"{scenario_id}.yaml"
if not manifest.exists():
return JSONResponse({"ok": False, "error": f"Manifest not found: {scenario_id}"}, 404)
kubectl_skipped = _truthy_env("ATLASOPS_SKIP_KUBECTL_INJECT")
if kubectl_skipped:
log.warning(
"ATLASOPS_SKIP_KUBECTL_INJECT: not applying manifests; firing incident pipeline anyway "
"(HF Space demo without kubeconfig)."
)
else:
env = os.environ.copy()
env["USE_GKE_GCLOUD_AUTH_PLUGIN"] = "True"
r = subprocess.run(
["kubectl", "apply", "-f", str(manifest)],
capture_output=True, text=True, env=env, timeout=15,
)
if r.returncode != 0:
err_msg = (r.stderr or "").strip() or r.stdout.strip() or f"exit {r.returncode}"
return JSONResponse(
{
"ok": False,
"error": err_msg,
"hint": (
"On Hugging Face Spaces, kubectl usually has no kubeconfig β set "
"ATLASOPS_SKIP_KUBECTL_INJECT=1 to run triage/diagnosis via LLMs using live "
"Prometheus/Alertmanager only (no chaos manifests applied)."
),
},
500,
)
# Fire the incident through the coordinator after a brief wait
import asyncio
asyncio.create_task(_handle_after_delay(body.name or scenario_id, scenario_id, correlation_id))
return JSONResponse(
InjectResponse(
ok=True,
scenario_id=scenario_id,
correlation_id=correlation_id,
kubectl_skipped=kubectl_skipped,
).model_dump()
)
async def _handle_after_delay(name: str, scenario_id: str, correlation_id: str):
import asyncio
await asyncio.sleep(20)
from agents.tools.alertmanager import alertmanager_list_alerts
result = alertmanager_list_alerts(active_only=True)
alert = {
"commonLabels": {"alertname": result["alerts"][0]["alertname"] if result.get("alerts") else name},
"alerts": result.get("alerts", []),
"scenario_id": scenario_id,
"correlation_id": correlation_id,
}
# Route through correlator so UI-injected incidents obey the same
# deduplication and dispatch rules as real Alertmanager webhooks.
incident_id, _is_new, should_dispatch = correlator.ingest(alert)
if not should_dispatch:
return
correlator.mark_processing(incident_id, True)
try:
await handle_incident(alert, incident_id=incident_id)
finally:
correlator.mark_processing(incident_id, False)
@app.post("/reset", dependencies=[Security(_require_api_key)])
async def reset_chaos():
if _truthy_env("ATLASOPS_SKIP_KUBECTL_INJECT"):
log.warning("ATLASOPS_SKIP_KUBECTL_INJECT: skipping kubectl delete on reset")
return JSONResponse({"ok": True, "kubectl_skipped": True})
env = os.environ.copy()
env["USE_GKE_GCLOUD_AUTH_PLUGIN"] = "True"
subprocess.run(
["kubectl", "delete",
"podchaos,networkchaos,stresschaos,dnschaos,iochaos,timechaos",
"--all", "-A", "--ignore-not-found=true"],
capture_output=True, env=env,
)
return JSONResponse({"ok": True})
@app.get("/stream")
async def stream():
return StreamingResponse(
subscribe(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
@app.get("/thoughts")
async def thoughts():
return JSONResponse({"thoughts": get_history()})
@app.get("/metrics")
async def proxy_metrics():
"""Server-side Prometheus proxy β avoids browser CORS on direct GKE IP access."""
return JSONResponse(await build_dashboard_metrics_payload())
@app.get("/bench/results/comparison_table.md")
async def comparison_table_markdown():
"""UI fetches this path; serve repo file or a tiny placeholder."""
p = Path(__file__).resolve().parent / "bench" / "results" / "comparison_table.md"
if not p.is_file():
return PlainTextResponse(
"| Scenario | Outcome | Notes |\n|---|---|---|\n"
"| *pending* | β | Run `python bench/runner.py` or `python -m bench.quick_eval` locally, then commit `bench/results/comparison_table.md`. |\n",
media_type="text/markdown; charset=utf-8",
)
return PlainTextResponse(p.read_text(encoding="utf-8"), media_type="text/markdown; charset=utf-8")
@app.get("/health")
async def health():
from agents.coordinator import _live_judge_requested
agent_base = os.getenv("VLLM_BASE", "").rstrip("/")
ju = os.getenv("JUDGE_URL", "").rstrip("/")
return JSONResponse({
"status": "ok",
"model": os.getenv("AGENT_MODEL", "Qwen/Qwen2.5-7B-Instruct"),
"agent_base": agent_base,
"backend": os.getenv("BACKEND", "vllm"),
"judge_model": os.getenv("JUDGE_MODEL", ""),
"judge_base": ju,
"hf_inference_pack": os.getenv("ATLASOPS_USE_HF_INFERENCE", ""),
"live_judge": _live_judge_requested(),
"discord_webhook_configured": bool(os.getenv("DISCORD_WEBHOOK_URL", "").strip()),
"slack_webhook_configured": bool(os.getenv("SLACK_WEBHOOK_URL", "").strip()),
})
@app.get("/config")
async def runtime_config():
"""Expose runtime URLs so UI doesn't rely on hardcoded IPs."""
# Empty β browser keeps `window.location.origin` (required for HF Spaces).
coordinator_url = (os.getenv("COORDINATOR_URL") or "").strip()
grafana_url = os.getenv("GRAFANA_URL", "")
argocd_url = os.getenv("ARGOCD_URL", "")
boutique_url = os.getenv("BOUTIQUE_URL", "")
return JSONResponse(
RuntimeConfigResponse(
coordinator_url=coordinator_url,
grafana_url=grafana_url,
argocd_url=argocd_url,
boutique_url=boutique_url,
).model_dump()
)
@app.post("/approval/callback", dependencies=[Security(_require_api_key)])
async def approval_callback(request: Request):
payload = ApprovalCallbackRequest.model_validate(await request.json())
result = approval_gate.callback(
token=payload.token,
decision=payload.decision,
approved_by=payload.approved_by,
reason=payload.reason,
)
status = 200 if result.get("ok") else 400
return JSONResponse(result, status_code=status)
@app.get("/approval/pending")
async def approval_pending():
return JSONResponse({"pending": approval_gate.pending()})
@app.get("/circuit-breaker/status")
async def circuit_breaker_status():
return JSONResponse(circuit_breaker.status())
@app.post("/circuit-breaker/reset", dependencies=[Security(_require_api_key)])
async def circuit_breaker_reset():
return JSONResponse(circuit_breaker.reset())
@app.get("/incidents/active")
async def incidents_active():
return JSONResponse({"incidents": correlator.get_active()})
@app.get("/audit/log")
async def audit_log_entries(limit: int = 100, offset: int = 0):
limit = max(1, min(limit, 500))
offset = max(0, offset)
return JSONResponse({"entries": audit_log.tail(limit=limit, offset=offset)})
@app.get("/audit/verify")
async def audit_verify():
return JSONResponse(audit_log.verify_integrity())
_TOPOLOGY_SERVICES = [
"frontend", "cartservice", "checkoutservice", "paymentservice",
"currencyservice", "shippingservice", "emailservice",
"recommendationservice", "productcatalogservice", "adservice", "redis-cart",
]
@app.get("/cluster/health")
async def cluster_health():
"""Per-service health from kubectl get pods -n default."""
env = os.environ.copy()
env["USE_GKE_GCLOUD_AUTH_PLUGIN"] = "True"
try:
r = subprocess.run(
["kubectl", "get", "pods", "-n", "default", "-o", "json"],
capture_output=True, text=True, env=env, timeout=8,
)
if r.returncode != 0:
return JSONResponse({"ok": False, "services": {}})
items = json.loads(r.stdout).get("items", [])
services: dict = {}
for pod in items:
meta = pod.get("metadata", {})
app_label = meta.get("labels", {}).get("app", "")
if app_label not in _TOPOLOGY_SERVICES:
continue
status = pod.get("status", {})
phase = status.get("phase", "Unknown")
cs = status.get("containerStatuses", [])
restarts = sum(c.get("restartCount", 0) for c in cs)
ready_count = sum(1 for c in cs if c.get("ready", False))
total = len(cs)
if phase == "Running" and ready_count == total and total > 0:
health = "healthy"
elif phase in ("Pending", "Terminating"):
health = "degraded"
else:
health = "down"
if app_label in services:
prev = services[app_label]
# worst-case wins
if "down" in (health, prev["status"]):
health = "down"
elif "degraded" in (health, prev["status"]):
health = "degraded"
services[app_label]["restarts"] = max(prev["restarts"], restarts)
services[app_label]["status"] = health
else:
services[app_label] = {
"status": health,
"restarts": restarts,
"ready": f"{ready_count}/{total}",
"phase": phase,
}
for svc in _TOPOLOGY_SERVICES:
if svc not in services:
services[svc] = {"status": "unknown", "restarts": 0, "ready": "0/0", "phase": "?"}
total_healthy = sum(1 for s in services.values() if s["status"] == "healthy")
return JSONResponse({"ok": True, "services": services, "healthy": total_healthy, "total": len(_TOPOLOGY_SERVICES)})
except Exception as e:
return JSONResponse({"ok": False, "error": str(e), "services": {}})
@app.post("/webhook")
async def webhook_proxy(request: Request):
"""Top-level Alertmanager webhook β validates signature then forwards to coordinator.
Alertmanager config:
receivers:
- name: atlasops
webhook_configs:
- url: 'http://<host>:7860/webhook'
http_config:
authorization:
type: Bearer
credentials: <ALERTMANAGER_WEBHOOK_SECRET>
"""
body = await request.body()
_verify_webhook_signature(body, request.headers.get("Authorization"))
import json as _json
from agents.coordinator import app as _coord
# Re-dispatch through coordinator's webhook handler directly
from agents.coordinator import handle_incident
from agents.correlator import correlator
try:
payload = _json.loads(body)
except _json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON payload")
incident_id, _is_new, should_dispatch = correlator.ingest(payload)
if not should_dispatch:
return JSONResponse({"ok": True, "incident_id": incident_id, "dispatched": False})
correlator.mark_processing(incident_id, True)
import asyncio
asyncio.create_task(_dispatch_incident(payload, incident_id))
return JSONResponse({"ok": True, "incident_id": incident_id, "dispatched": True})
async def _dispatch_incident(payload: dict, incident_id: str) -> None:
from agents.coordinator import handle_incident
from agents.correlator import correlator
try:
await handle_incident(payload, incident_id=incident_id)
finally:
correlator.mark_processing(incident_id, False)
@app.get("/slack/feed")
async def slack_feed():
"""Return last 30 comms posts from the local log (powers the UI feed)."""
log_path = Path("data/slack_posts.jsonl")
if not log_path.exists():
return JSONResponse({"posts": []})
posts = []
for line in log_path.read_text(encoding="utf-8").strip().splitlines():
try:
posts.append(json.loads(line))
except json.JSONDecodeError:
pass
return JSONResponse({"posts": posts[-30:]})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)
|