File size: 8,503 Bytes
40981c8 b635d04 f6fe53e b635d04 0f5f4d1 b635d04 0f5f4d1 189570d 0f5f4d1 642b6b3 6bb0212 642b6b3 c98f35f ff4d74f b635d04 0f5f4d1 40981c8 6756da2 0f5f4d1 40981c8 6756da2 642b6b3 6756da2 b635d04 d2a0c5e 6c7e606 0f5f4d1 642b6b3 6bb0212 0f5f4d1 f6fe53e 0f5f4d1 40981c8 6bb0212 0f5f4d1 6bb0212 40981c8 642b6b3 7cfde2b 40981c8 0f5f4d1 b635d04 f6fe53e 0f5f4d1 f6fe53e 0f5f4d1 40981c8 0f5f4d1 f6fe53e 54e37af 0f5f4d1 b635d04 f6fe53e 0f5f4d1 54e37af 40981c8 0f5f4d1 f6fe53e 40981c8 0f5f4d1 40981c8 0f5f4d1 f6fe53e 40981c8 0f5f4d1 40981c8 0f5f4d1 40981c8 f6fe53e b635d04 0f5f4d1 b635d04 0f5f4d1 b635d04 0f5f4d1 b635d04 0f5f4d1 b635d04 0f5f4d1 f6fe53e 0f5f4d1 b635d04 0f5f4d1 b635d04 0f5f4d1 f6fe53e b635d04 0f5f4d1 54e37af b635d04 54e37af b635d04 f6fe53e 0f5f4d1 f6fe53e b635d04 f6fe53e b635d04 0f5f4d1 f6fe53e 40981c8 89e9867 1da74ed 40981c8 89e9867 1da74ed | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 | # app.py – ARF v4 API with Gradio frontend (FastAPI mounted under /api)
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import gradio as gr
# ARF v4 imports
from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.runtime.memory import create_faiss_index, RAGGraphMemory
from agentic_reliability_framework.runtime.memory.constants import MemoryConstants
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ========================= FASTAPI APP =========================
fastapi_app = FastAPI(title="ARF v4 API")
# Enable CORS for your frontend
fastapi_app.add_middleware(
CORSMiddleware,
allow_origins=["https://arf-frontend-sandy.vercel.app"],
allow_methods=["*"],
allow_headers=["*"],
)
# ========================= ARF COMPONENTS =========================
risk_engine = RiskEngine()
faiss_index = create_faiss_index(dim=MemoryConstants.VECTOR_DIM)
memory = RAGGraphMemory(faiss_index)
# In‑memory storage for demo purposes (replace with a real DB later)
decision_history = []
# ========================= PYDANTIC MODELS =========================
class EvaluateRequest(BaseModel):
service_name: str
event_type: str
severity: str
metrics: Dict[str, float] = {}
class EvaluateResponse(BaseModel):
risk_score: float
base_risk: float
memory_risk: Optional[float] = None
weight: float
similar_events: list = []
confidence: float
# ========================= HELPER: Demo Intent =========================
class _DemoIntent:
environment = "dev"
deployment_target = "dev"
service_name = "demo"
# ========================= API ENDPOINTS =========================
@fastapi_app.get("/")
async def root():
"""Root endpoint – returns a welcome message."""
return {"message": "ARF v4 API. See /docs for documentation."}
@fastapi_app.get("/health")
async def health():
return {"status": "ok", "version": "4.0.0"}
@fastapi_app.get("/v1/get_risk")
async def get_risk():
"""Return the current demo risk."""
intent = _DemoIntent()
risk_value, explanation, contributions = risk_engine.calculate_risk(
intent=intent,
cost_estimate=None,
policy_violations=[],
)
decision = "approve"
if risk_value > 0.8:
decision = "deny"
elif risk_value > 0.2:
decision = "escalate"
decision_id = str(uuid.uuid4())
decision_history.append({
"decision_id": decision_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"risk_score": float(risk_value),
"outcome": None, # will be filled when feedback is given
})
return {
"system_risk": float(risk_value),
"status": "critical" if risk_value > 0.8 else "normal",
"explanation": explanation,
"contributions": contributions,
"decision_id": decision_id,
"decision": decision,
"timestamp": datetime.now(timezone.utc).isoformat()
}
@fastapi_app.get("/v1/history")
async def get_history():
"""Return the last 10 decisions."""
return decision_history[-10:]
@fastapi_app.post("/v1/incidents/evaluate", response_model=EvaluateResponse)
async def evaluate_incident(request: EvaluateRequest):
"""
Evaluate an incident and return a risk score with explainability.
This is a placeholder – replace with actual call to your risk engine.
"""
# For now, return a dummy response
return EvaluateResponse(
risk_score=0.23,
base_risk=0.15,
memory_risk=0.3,
weight=0.5,
similar_events=[],
confidence=0.9
)
@fastapi_app.post("/v1/feedback")
async def record_outcome(decision_id: str, success: bool):
"""Record the outcome of a decision (success/failure)."""
for dec in decision_history:
if dec["decision_id"] == decision_id:
dec["outcome"] = "success" if success else "failure"
# Update the risk engine (optional)
intent = _DemoIntent()
try:
risk_engine.update_outcome(intent, success)
except Exception as e:
logger.exception("Outcome update failed")
return {"status": "ok", "decision_id": decision_id, "outcome": dec["outcome"]}
return {"error": "decision not found"}
# ========================= GRADIO UI (Main App) =========================
# Gradio interface will be served at the root path ("/").
# The FastAPI app is mounted under the "/api" subpath.
def get_risk_snapshot():
try:
intent = _DemoIntent()
risk_value, explanation, contributions = risk_engine.calculate_risk(
intent=intent,
cost_estimate=None,
policy_violations=[],
)
decision = "approve"
if risk_value > 0.8:
decision = "deny"
elif risk_value > 0.2:
decision = "escalate"
decision_id = str(uuid.uuid4())
decision_history.append({
"decision_id": decision_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"risk_score": float(risk_value),
"outcome": None,
})
return {
"risk": float(risk_value),
"status": "critical" if risk_value > 0.8 else "normal",
"explanation": explanation,
"contributions": contributions,
"decision_id": decision_id,
"decision": decision,
"timestamp": datetime.now(timezone.utc).isoformat()
}
except Exception as e:
logger.exception("Failed to compute risk snapshot")
return {"error": str(e)}
def get_health_snapshot():
return {"status": "ok", "version": "4.0.0", "service": "ARF OSS API", "timestamp": datetime.now(timezone.utc).isoformat()}
def get_memory_snapshot():
if memory.has_historical_data():
return {"status": "ok", "memory_stats": memory.get_graph_stats(), "timestamp": datetime.now(timezone.utc).isoformat()}
return {"status": "empty", "memory_stats": "No historical memory yet.", "timestamp": datetime.now(timezone.utc).isoformat()}
def record_outcome_ui(success: bool):
if not decision_history:
return {"error": "no decisions yet"}
last = decision_history[-1]
last["outcome"] = "success" if success else "failure"
intent = _DemoIntent()
try:
risk_engine.update_outcome(intent, success)
except Exception as e:
logger.exception("Outcome update failed")
return {"decision_id": last["decision_id"], "outcome": last["outcome"], "timestamp": datetime.now(timezone.utc).isoformat()}
with gr.Blocks(title="ARF v4 Demo") as demo:
gr.Markdown("# Agentic Reliability Framework v4\n### Probabilistic Infrastructure Governance")
with gr.Row():
health_output = gr.JSON(label="Health")
risk_output = gr.JSON(label="Current Risk")
with gr.Row():
memory_output = gr.JSON(label="Memory Stats")
with gr.Row():
decision_output = gr.JSON(label="Recent Decisions")
with gr.Row():
refresh_btn = gr.Button("Evaluate Intent")
success_btn = gr.Button("Action Succeeded")
fail_btn = gr.Button("Action Failed")
refresh_btn.click(fn=get_risk_snapshot, outputs=risk_output)
success_btn.click(fn=lambda: record_outcome_ui(True), outputs=decision_output)
fail_btn.click(fn=lambda: record_outcome_ui(False), outputs=decision_output)
with gr.Row():
health_btn = gr.Button("Refresh Health")
memory_btn = gr.Button("Refresh Memory")
history_btn = gr.Button("Show Decision History")
health_btn.click(fn=get_health_snapshot, outputs=health_output)
memory_btn.click(fn=get_memory_snapshot, outputs=memory_output)
history_btn.click(fn=lambda: decision_history[-10:], outputs=decision_output)
# ========================= Mount FastAPI under /api =========================
# Mount FastAPI under /api
app = gr.mount_gradio_app(fastapi_app, demo, path="/api")
# This starts the server and keeps it alive
if __name__ == "__main__":
demo.queue().launch(server_name="0.0.0.0", server_port=7860)
# Note: No manual server start. Hugging Face will serve this `app` (which is a FastAPI app with Gradio mounted)
# and the Gradio interface will be at the root, API endpoints under /api. |