File size: 16,975 Bytes
7e9a520
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
"""AtlasOps HF Space entry point.

Serves the custom ops console UI at / and wires the coordinator API.
This is what HF Spaces runs via the Dockerfile.
"""

import hashlib
import hmac
import json
import logging
import os
import subprocess
import time
import uuid
from pathlib import Path

from config.hf_space_env import apply_hf_space_inference_defaults

apply_hf_space_inference_defaults()

from fastapi import FastAPI, HTTPException, Request, Security
from fastapi.security import APIKeyHeader
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, PlainTextResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

# Module-wide logger shared by every handler in this file.
log = logging.getLogger("atlasops")

# ── Auth ──────────────────────────────────────────────────────────────────────
# Set ATLASOPS_API_KEY env var to enable auth on all mutating endpoints.
# If unset, mutations are allowed without a key (dev / demo mode with a warning).
_API_KEY = os.getenv("ATLASOPS_API_KEY", "")
# auto_error=False makes a missing X-AtlasOps-Key header resolve to None instead
# of an automatic error response, so the dependency itself decides the outcome.
_api_key_header = APIKeyHeader(name="X-AtlasOps-Key", auto_error=False)

# Shared secret for authenticating Alertmanager webhook calls.
# Set ALERTMANAGER_WEBHOOK_SECRET and configure Alertmanager to send:
#   Authorization: Bearer <secret>
_WEBHOOK_SECRET = os.getenv("ALERTMANAGER_WEBHOOK_SECRET", "")

# Emit one warning per missing secret at import time so an unauthenticated
# deployment is visible in the startup logs.
if not _API_KEY:
    log.warning("ATLASOPS_API_KEY not set — mutating endpoints are unauthenticated (dev mode)")
if not _WEBHOOK_SECRET:
    log.warning("ALERTMANAGER_WEBHOOK_SECRET not set — webhook accepts unsigned payloads (dev mode)")


def _truthy_env(name: str) -> bool:
    """Return True when environment variable *name* holds an affirmative flag.

    The value is trimmed and lower-cased before matching; an unset variable
    counts as False.
    """
    raw_value = os.getenv(name, "")
    normalized = raw_value.strip().lower()
    return normalized in ("1", "true", "yes", "on")


def _require_api_key(key: str | None = Security(_api_key_header)) -> None:
    """Dependency: validates X-AtlasOps-Key header when ATLASOPS_API_KEY is set.

    Args:
        key: Value of the ``X-AtlasOps-Key`` request header, or ``None`` when
            the header is absent.

    Raises:
        HTTPException: 401 when an API key is configured and the supplied
            header is missing or does not match it.
    """
    if not _API_KEY:
        return  # dev mode — no key required
    # Constant-time comparison to prevent timing attacks (same approach as the
    # webhook secret check). Compare bytes: compare_digest rejects non-ASCII str.
    supplied = (key or "").encode()
    if not hmac.compare_digest(supplied, _API_KEY.encode()):
        raise HTTPException(status_code=401, detail="Invalid or missing X-AtlasOps-Key header")


def _verify_webhook_signature(body: bytes, authorization: str | None) -> None:
    """Check the webhook's Bearer token against ALERTMANAGER_WEBHOOK_SECRET.

    When no secret is configured the check is skipped entirely (dev mode).

    Args:
        body: Raw request body. Accepted for the caller's convenience; the
            current check does not inspect it.
        authorization: Value of the ``Authorization`` header, if any.

    Raises:
        HTTPException: 401 when the header is absent, is not a ``Bearer``
            credential, or carries the wrong secret.
    """
    if _WEBHOOK_SECRET:
        scheme = "Bearer "
        has_bearer = bool(authorization) and authorization.startswith(scheme)
        if not has_bearer:
            raise HTTPException(status_code=401, detail="Missing Authorization header on webhook")
        presented = authorization[len(scheme):].strip()
        # compare_digest keeps the comparison time independent of the match position.
        secret_matches = hmac.compare_digest(presented.encode(), _WEBHOOK_SECRET.encode())
        if not secret_matches:
            raise HTTPException(status_code=401, detail="Invalid webhook secret")

# Import coordinator internals
from agents.coordinator import handle_incident, app as coordinator_app
from agents.approval import approval_gate
from agents.audit import audit_log
from agents.circuit_breaker import circuit_breaker
from agents.correlator import correlator
from agents.prometheus_metrics import build_dashboard_metrics_payload
from agents.stream import subscribe, get_history

# Top-level ASGI application for the Space; interactive docs are requested at /api/docs.
app = FastAPI(title="AtlasOps", docs_url="/api/docs")

# Mount coordinator routes
# (the coordinator's own FastAPI app is exposed as a sub-application under /api).
app.mount("/api", coordinator_app)

# Serve static files
# The mount is skipped when the directory is missing, so the app still starts
# without the UI bundle.
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
    app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")


class InjectRequest(BaseModel):
    """JSON body accepted by ``POST /inject``."""

    # Scenario identifier; also the file stem of the manifest looked up under
    # bench/chaos_manifests/. Must be non-empty.
    scenario_id: str = Field(min_length=1)
    # Optional display name; when given it is used instead of scenario_id as the
    # fallback alert name for the follow-up incident.
    name: str | None = None


class InjectResponse(BaseModel):
    """Success payload returned by ``POST /inject``."""

    # Whether the injection request was accepted.
    ok: bool
    # Echo of the requested scenario identifier.
    scenario_id: str
    # Identifier generated per injection and attached to the follow-up alert payload.
    correlation_id: str
    # True when ATLASOPS_SKIP_KUBECTL_INJECT suppressed the `kubectl apply` step.
    kubectl_skipped: bool = False


class RuntimeConfigResponse(BaseModel):
    """Payload returned by ``GET /config``; every field is sourced from an environment variable."""

    # From COORDINATOR_URL; may be an empty string when unset.
    coordinator_url: str
    # From GRAFANA_URL.
    grafana_url: str
    # From ARGOCD_URL.
    argocd_url: str
    # From BOUTIQUE_URL.
    boutique_url: str


class ApprovalCallbackRequest(BaseModel):
    """JSON body accepted by ``POST /approval/callback``.

    All fields are forwarded unchanged to ``approval_gate.callback``.
    """

    # Approval token identifying the pending request. Must be non-empty.
    token: str = Field(min_length=1)
    # Decision string. Must be non-empty; the accepted values are defined by
    # approval_gate — NOTE(review): confirm the allowed set there.
    decision: str = Field(min_length=1)
    # Who made the decision (optional, defaults to empty).
    approved_by: str = ""
    # Free-text justification (optional, defaults to empty).
    reason: str = ""


@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the console's index.html, or a small stub page when it is missing."""
    index_page = static_dir / "index.html"
    if not index_page.exists():
        return HTMLResponse("<h1>AtlasOps</h1><p>Static files not found.</p>")
    markup = index_page.read_text(encoding="utf-8")
    return HTMLResponse(markup)


# Strong references to in-flight follow-up tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
_inject_tasks: set = set()


@app.post("/inject", dependencies=[Security(_require_api_key)])
async def inject_chaos(request: Request):
    """Apply a chaos scenario manifest to the real GKE cluster.

    Expects a JSON body matching ``InjectRequest``. On success the manifest is
    applied with ``kubectl apply`` (unless ATLASOPS_SKIP_KUBECTL_INJECT is
    truthy) and a background task is scheduled to push the resulting incident
    through the coordinator.

    Returns:
        JSONResponse with an ``InjectResponse`` payload on success; 400 for a
        scenario id that escapes the manifest directory; 404 when the manifest
        does not exist; 500 when ``kubectl`` fails, times out or cannot be run.

    Raises:
        HTTPException: 400 for a body that is not valid JSON, 422 for a body
            that does not satisfy ``InjectRequest``.
    """
    import asyncio

    from pydantic import ValidationError

    try:
        raw_body = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and undecodable bytes.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from None
    try:
        body = InjectRequest.model_validate(raw_body)
    except ValidationError as exc:
        raise HTTPException(
            status_code=422,
            detail=exc.errors(include_url=False, include_context=False),
        ) from exc

    scenario_id = body.scenario_id
    correlation_id = f"inj-{int(time.time())}-{uuid.uuid4().hex[:8]}"
    manifest_dir = Path("bench/chaos_manifests")
    manifest = manifest_dir / f"{scenario_id}.yaml"

    # scenario_id is client-supplied: refuse anything (e.g. "../../x") that
    # resolves outside the manifest directory before it can reach kubectl.
    try:
        inside_manifest_dir = manifest.resolve().is_relative_to(manifest_dir.resolve())
    except (OSError, ValueError):
        inside_manifest_dir = False
    if not inside_manifest_dir:
        return JSONResponse({"ok": False, "error": f"Invalid scenario_id: {scenario_id}"}, 400)

    if not manifest.exists():
        return JSONResponse({"ok": False, "error": f"Manifest not found: {scenario_id}"}, 404)

    kubectl_skipped = _truthy_env("ATLASOPS_SKIP_KUBECTL_INJECT")
    if kubectl_skipped:
        log.warning(
            "ATLASOPS_SKIP_KUBECTL_INJECT: not applying manifests; firing incident pipeline anyway "
            "(HF Space demo without kubeconfig)."
        )
    else:
        hint = (
            "On Hugging Face Spaces, kubectl usually has no kubeconfig — set "
            "ATLASOPS_SKIP_KUBECTL_INJECT=1 to run triage/diagnosis via LLMs using live "
            "Prometheus/Alertmanager only (no chaos manifests applied)."
        )
        env = os.environ.copy()
        env["USE_GKE_GCLOUD_AUTH_PLUGIN"] = "True"
        try:
            # Run kubectl in a worker thread so the event loop (and the SSE
            # stream it serves) is not stalled for up to 15 seconds.
            r = await asyncio.to_thread(
                subprocess.run,
                ["kubectl", "apply", "-f", str(manifest)],
                capture_output=True, text=True, env=env, timeout=15,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            # kubectl missing from PATH, not executable, or hung past the timeout.
            return JSONResponse(
                {"ok": False, "error": f"kubectl apply failed: {exc}", "hint": hint},
                500,
            )
        if r.returncode != 0:
            err_msg = (r.stderr or "").strip() or (r.stdout or "").strip() or f"exit {r.returncode}"
            return JSONResponse({"ok": False, "error": err_msg, "hint": hint}, 500)

    # Fire the incident through the coordinator after a brief wait
    task = asyncio.create_task(
        _handle_after_delay(body.name or scenario_id, scenario_id, correlation_id)
    )
    _inject_tasks.add(task)
    task.add_done_callback(_inject_tasks.discard)
    return JSONResponse(
        InjectResponse(
            ok=True,
            scenario_id=scenario_id,
            correlation_id=correlation_id,
            kubectl_skipped=kubectl_skipped,
        ).model_dump()
    )


async def _handle_after_delay(name: str, scenario_id: str, correlation_id: str):
    """Wait for an injected fault to surface, then run it through the incident pipeline.

    Sleeps 20 seconds, snapshots the active Alertmanager alerts, builds a
    webhook-shaped payload and hands it to the correlator/coordinator.

    Args:
        name: Fallback alert name used when no active alert provides one.
        scenario_id: Scenario identifier attached to the payload.
        correlation_id: Injection correlation id attached to the payload.
    """
    import asyncio

    await asyncio.sleep(20)
    from agents.tools.alertmanager import alertmanager_list_alerts

    try:
        result = alertmanager_list_alerts(active_only=True)
    except Exception:
        # Best-effort lookup: this runs as a fire-and-forget task, so an error
        # here would otherwise drop the incident silently. Fall back to an
        # alert-less payload named after the scenario.
        log.exception("Alertmanager lookup failed for %s; continuing without alerts", correlation_id)
        result = {}
    if not isinstance(result, dict):
        result = {}

    active_alerts = result.get("alerts") or []
    first_alert = active_alerts[0] if active_alerts else None
    # Tolerate a first alert that carries no "alertname" key.
    alert_name = (first_alert.get("alertname") if isinstance(first_alert, dict) else None) or name
    alert = {
        "commonLabels": {"alertname": alert_name},
        "alerts": active_alerts,
        "scenario_id": scenario_id,
        "correlation_id": correlation_id,
    }
    # Route through correlator so UI-injected incidents obey the same
    # deduplication and dispatch rules as real Alertmanager webhooks.
    incident_id, _is_new, should_dispatch = correlator.ingest(alert)
    if not should_dispatch:
        return
    correlator.mark_processing(incident_id, True)
    try:
        await handle_incident(alert, incident_id=incident_id)
    finally:
        # Always clear the processing flag, even when the pipeline raises.
        correlator.mark_processing(incident_id, False)


@app.post("/reset", dependencies=[Security(_require_api_key)])
async def reset_chaos():
    """Delete all chaos resources (podchaos, networkchaos, ...) in every namespace.

    Returns:
        JSONResponse ``{"ok": True}`` on success, with ``"kubectl_skipped":
        True`` added when ATLASOPS_SKIP_KUBECTL_INJECT is truthy and nothing was
        run; ``{"ok": False, "error": ...}`` with status 500 when ``kubectl``
        fails, times out or cannot be started.
    """
    import asyncio

    if _truthy_env("ATLASOPS_SKIP_KUBECTL_INJECT"):
        log.warning("ATLASOPS_SKIP_KUBECTL_INJECT: skipping kubectl delete on reset")
        return JSONResponse({"ok": True, "kubectl_skipped": True})
    env = os.environ.copy()
    env["USE_GKE_GCLOUD_AUTH_PLUGIN"] = "True"
    try:
        # Worker thread + timeout: an unreachable cluster must neither freeze
        # the event loop nor hang this request indefinitely.
        r = await asyncio.to_thread(
            subprocess.run,
            ["kubectl", "delete",
             "podchaos,networkchaos,stresschaos,dnschaos,iochaos,timechaos",
             "--all", "-A", "--ignore-not-found=true"],
            capture_output=True, text=True, env=env, timeout=60,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        return JSONResponse({"ok": False, "error": f"kubectl delete failed: {exc}"}, 500)
    if r.returncode != 0:
        # Report the failure instead of claiming the cluster was cleaned up.
        err_msg = (r.stderr or "").strip() or (r.stdout or "").strip() or f"exit {r.returncode}"
        return JSONResponse({"ok": False, "error": err_msg}, 500)
    return JSONResponse({"ok": True})


@app.get("/stream")
async def stream():
    """Open a ``text/event-stream`` response fed by ``subscribe()``."""
    # Response headers that ask caches and buffering proxies not to hold events back.
    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    event_source = subscribe()
    return StreamingResponse(event_source, media_type="text/event-stream", headers=sse_headers)


@app.get("/thoughts")
async def thoughts():
    """Return the stream history from ``get_history()`` under the ``thoughts`` key."""
    history = get_history()
    return JSONResponse({"thoughts": history})


@app.get("/metrics")
async def proxy_metrics():
    """Server-side Prometheus proxy — lets the browser avoid CORS on direct GKE IP access."""
    dashboard_payload = await build_dashboard_metrics_payload()
    return JSONResponse(dashboard_payload)


@app.get("/bench/results/comparison_table.md")
async def comparison_table_markdown():
    """UI fetches this path; serve repo file or a tiny placeholder.

    Returns:
        PlainTextResponse (``text/markdown``) with the contents of
        ``bench/results/comparison_table.md`` next to this module, or a
        one-row placeholder table when that file does not exist.
    """
    markdown_type = "text/markdown; charset=utf-8"
    p = Path(__file__).resolve().parent / "bench" / "results" / "comparison_table.md"
    if not p.is_file():
        # The middle cell is an em dash; it was previously emitted mis-encoded.
        return PlainTextResponse(
            "| Scenario | Outcome | Notes |\n|---|---|---|\n"
            "| *pending* | — | Run `python bench/runner.py` or `python -m bench.quick_eval` locally, then commit `bench/results/comparison_table.md`. |\n",
            media_type=markdown_type,
        )
    return PlainTextResponse(p.read_text(encoding="utf-8"), media_type=markdown_type)


@app.get("/health")
async def health():
    """Report liveness together with the model, judge and notifier settings read from the environment."""
    from agents.coordinator import _live_judge_requested

    def _webhook_is_set(var_name: str) -> bool:
        # Only whether a URL is configured is exposed, never the URL itself.
        return bool(os.getenv(var_name, "").strip())

    agent_endpoint = os.getenv("VLLM_BASE", "").rstrip("/")
    judge_endpoint = os.getenv("JUDGE_URL", "").rstrip("/")
    report = {
        "status": "ok",
        "model": os.getenv("AGENT_MODEL", "Qwen/Qwen2.5-7B-Instruct"),
        "agent_base": agent_endpoint,
        "backend": os.getenv("BACKEND", "vllm"),
        "judge_model": os.getenv("JUDGE_MODEL", ""),
        "judge_base": judge_endpoint,
        "hf_inference_pack": os.getenv("ATLASOPS_USE_HF_INFERENCE", ""),
        "live_judge": _live_judge_requested(),
        "discord_webhook_configured": _webhook_is_set("DISCORD_WEBHOOK_URL"),
        "slack_webhook_configured": _webhook_is_set("SLACK_WEBHOOK_URL"),
    }
    return JSONResponse(report)


@app.get("/config")
async def runtime_config():
    """Publish the runtime service URLs so the UI need not hardcode IPs."""
    settings = RuntimeConfigResponse(
        # Empty → browser keeps `window.location.origin` (required for HF Spaces).
        coordinator_url=(os.getenv("COORDINATOR_URL") or "").strip(),
        grafana_url=os.getenv("GRAFANA_URL", ""),
        argocd_url=os.getenv("ARGOCD_URL", ""),
        boutique_url=os.getenv("BOUTIQUE_URL", ""),
    )
    return JSONResponse(settings.model_dump())


@app.post("/approval/callback", dependencies=[Security(_require_api_key)])
async def approval_callback(request: Request):
    """Forward an approval decision to ``approval_gate.callback``.

    Expects a JSON body matching ``ApprovalCallbackRequest``.

    Returns:
        JSONResponse carrying the gate's result; status 200 when the result
        has a truthy ``ok`` entry, 400 otherwise.

    Raises:
        HTTPException: 400 for a body that is not valid JSON, 422 for a body
            that does not satisfy ``ApprovalCallbackRequest``.
    """
    from pydantic import ValidationError

    try:
        raw_body = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and undecodable bytes.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from None
    try:
        payload = ApprovalCallbackRequest.model_validate(raw_body)
    except ValidationError as exc:
        # Client error, not a server fault: report it as 422 rather than 500.
        raise HTTPException(
            status_code=422,
            detail=exc.errors(include_url=False, include_context=False),
        ) from exc

    result = approval_gate.callback(
        token=payload.token,
        decision=payload.decision,
        approved_by=payload.approved_by,
        reason=payload.reason,
    )
    status = 200 if result.get("ok") else 400
    return JSONResponse(result, status_code=status)


@app.get("/approval/pending")
async def approval_pending():
    """Return whatever ``approval_gate.pending()`` reports, under the ``pending`` key."""
    awaiting = approval_gate.pending()
    return JSONResponse({"pending": awaiting})


@app.get("/circuit-breaker/status")
async def circuit_breaker_status():
    """Return the circuit breaker's ``status()`` result as JSON."""
    snapshot = circuit_breaker.status()
    return JSONResponse(snapshot)


@app.post("/circuit-breaker/reset", dependencies=[Security(_require_api_key)])
async def circuit_breaker_reset():
    """Call ``circuit_breaker.reset()`` and return its result as JSON (API key required when configured)."""
    outcome = circuit_breaker.reset()
    return JSONResponse(outcome)


@app.get("/incidents/active")
async def incidents_active():
    """Return the correlator's active incidents under the ``incidents`` key."""
    active = correlator.get_active()
    return JSONResponse({"incidents": active})


@app.get("/audit/log")
async def audit_log_entries(limit: int = 100, offset: int = 0):
    """Return a page of audit-log entries from ``audit_log.tail``.

    Args:
        limit: Requested page size; clamped to the range 1..500.
        offset: Entries to skip; negative values are treated as 0.
    """
    page_size = min(max(limit, 1), 500)
    skip = offset if offset > 0 else 0
    entries = audit_log.tail(limit=page_size, offset=skip)
    return JSONResponse({"entries": entries})


@app.get("/audit/verify")
async def audit_verify():
    """Return the result of ``audit_log.verify_integrity()`` as JSON."""
    verdict = audit_log.verify_integrity()
    return JSONResponse(verdict)


# Values of the pod `app` label that /cluster/health reports on; pods carrying
# any other label value are ignored there.
_TOPOLOGY_SERVICES = [
    "frontend",
    "cartservice",
    "checkoutservice",
    "paymentservice",
    "currencyservice",
    "shippingservice",
    "emailservice",
    "recommendationservice",
    "productcatalogservice",
    "adservice",
    "redis-cart",
]


# Ordering used when several pods back one service: the worst status wins.
_HEALTH_SEVERITY = {"healthy": 0, "degraded": 1, "down": 2}


def _summarize_pod(pod: dict) -> dict:
    """Reduce one pod object to ``{status, restarts, ready, phase}``."""
    status = pod.get("status") or {}
    phase = status.get("phase", "Unknown")
    containers = status.get("containerStatuses") or []
    restarts = sum(c.get("restartCount", 0) for c in containers)
    ready_count = sum(1 for c in containers if c.get("ready", False))
    total = len(containers)
    # "Terminating" is never reported as status.phase; a pod being deleted is
    # recognised by metadata.deletionTimestamp instead.
    terminating = bool((pod.get("metadata") or {}).get("deletionTimestamp"))

    if terminating or phase == "Pending":
        health = "degraded"
    elif phase == "Running" and total > 0 and ready_count == total:
        health = "healthy"
    else:
        health = "down"
    return {
        "status": health,
        "restarts": restarts,
        "ready": f"{ready_count}/{total}",
        "phase": phase,
    }


def _aggregate_service_health(pods: list) -> dict:
    """Fold pod summaries into one entry per service in ``_TOPOLOGY_SERVICES``."""
    services: dict = {}
    for pod in pods:
        labels = (pod.get("metadata") or {}).get("labels") or {}
        app_label = labels.get("app", "")
        if app_label not in _TOPOLOGY_SERVICES:
            continue
        summary = _summarize_pod(pod)
        prev = services.get(app_label)
        if prev is None:
            services[app_label] = summary
            continue
        # worst-case wins; "ready" and "phase" stay those of the first pod seen
        prev["status"] = max(prev["status"], summary["status"], key=_HEALTH_SEVERITY.__getitem__)
        prev["restarts"] = max(prev["restarts"], summary["restarts"])

    # Services with no matching pod are still listed, marked unknown.
    for svc in _TOPOLOGY_SERVICES:
        services.setdefault(svc, {"status": "unknown", "restarts": 0, "ready": "0/0", "phase": "?"})
    return services


@app.get("/cluster/health")
async def cluster_health():
    """Per-service health from kubectl get pods -n default.

    Returns:
        JSONResponse ``{"ok": True, "services": {...}, "healthy": n,
        "total": m}`` on success. On any failure the response is
        ``{"ok": False, "services": {}}``, with an ``error`` string added when
        an exception was caught.
    """
    import asyncio

    env = os.environ.copy()
    env["USE_GKE_GCLOUD_AUTH_PLUGIN"] = "True"
    try:
        # Run kubectl in a worker thread so a slow cluster does not stall the
        # event loop for up to 8 seconds.
        r = await asyncio.to_thread(
            subprocess.run,
            ["kubectl", "get", "pods", "-n", "default", "-o", "json"],
            capture_output=True, text=True, env=env, timeout=8,
        )
        if r.returncode != 0:
            return JSONResponse({"ok": False, "services": {}})

        items = json.loads(r.stdout).get("items", [])
        services = _aggregate_service_health(items)
        total_healthy = sum(1 for s in services.values() if s["status"] == "healthy")
        return JSONResponse({"ok": True, "services": services, "healthy": total_healthy, "total": len(_TOPOLOGY_SERVICES)})
    except Exception as e:
        # Deliberate best-effort boundary: any failure (kubectl missing,
        # timeout, bad JSON) becomes an ok=False payload rather than a 500.
        return JSONResponse({"ok": False, "error": str(e), "services": {}})


# Strong references to in-flight dispatch tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
_webhook_tasks: set = set()


@app.post("/webhook")
async def webhook_proxy(request: Request):
    """Top-level Alertmanager webhook — validates signature then forwards to coordinator.

    Alertmanager config:
      receivers:
        - name: atlasops
          webhook_configs:
            - url: 'http://<host>:7860/webhook'
              http_config:
                authorization:
                  type: Bearer
                  credentials: <ALERTMANAGER_WEBHOOK_SECRET>

    Returns:
        JSONResponse ``{"ok": True, "incident_id": ..., "dispatched": bool}``;
        ``dispatched`` is False when the correlator decided not to dispatch.

    Raises:
        HTTPException: 401 from the signature check; 400 when the body is not
            valid JSON or is not a JSON object.
    """
    import asyncio

    body = await request.body()
    _verify_webhook_signature(body, request.headers.get("Authorization"))
    try:
        payload = json.loads(body)
    except ValueError:
        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # raised for bodies that are not valid UTF-8/16/32.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from None
    if not isinstance(payload, dict):
        # A bare list/string/number is valid JSON but not a webhook payload.
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")

    incident_id, _is_new, should_dispatch = correlator.ingest(payload)
    if not should_dispatch:
        return JSONResponse({"ok": True, "incident_id": incident_id, "dispatched": False})
    correlator.mark_processing(incident_id, True)
    # Handle the incident in the background so the webhook returns immediately.
    task = asyncio.create_task(_dispatch_incident(payload, incident_id))
    _webhook_tasks.add(task)
    task.add_done_callback(_webhook_tasks.discard)
    return JSONResponse({"ok": True, "incident_id": incident_id, "dispatched": True})


async def _dispatch_incident(payload: dict, incident_id: str) -> None:
    """Run the coordinator pipeline for one incident and clear its processing flag.

    Args:
        payload: Alert payload handed to ``handle_incident``.
        incident_id: Correlator incident id the payload was assigned to.

    Raises:
        Exception: Whatever ``handle_incident`` raises is logged and re-raised.
    """
    from agents.coordinator import handle_incident
    from agents.correlator import correlator
    try:
        await handle_incident(payload, incident_id=incident_id)
    except Exception:
        # This coroutine runs as a background task that nobody awaits, so log
        # the failure here with the incident id before propagating it.
        log.exception("Incident pipeline failed for incident %s", incident_id)
        raise
    finally:
        correlator.mark_processing(incident_id, False)


@app.get("/slack/feed")
async def slack_feed():
    """Serve the 30 most recent comms posts recorded in data/slack_posts.jsonl (powers the UI feed).

    A missing log file yields an empty list; lines that are not valid JSON are skipped.
    """
    feed_file = Path("data/slack_posts.jsonl")
    if not feed_file.exists():
        return JSONResponse({"posts": []})

    raw_lines = feed_file.read_text(encoding="utf-8").strip().splitlines()
    entries = []
    for raw in raw_lines:
        try:
            record = json.loads(raw)
        except json.JSONDecodeError:
            continue  # tolerate malformed lines
        entries.append(record)
    most_recent = entries[-30:]
    return JSONResponse({"posts": most_recent})


# Direct execution (`python <this file>`): serve the app with uvicorn on all
# interfaces, port 7860 — the same port the webhook docstring's example URL uses.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)