petter2025's picture
Update app.py
89e9867 verified
raw
history blame
8.5 kB
# app.py – ARF v4 API with Gradio frontend (Gradio UI mounted on the FastAPI app under /api)
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import gradio as gr
# ARF v4 imports
from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.runtime.memory import create_faiss_index, RAGGraphMemory
from agentic_reliability_framework.runtime.memory.constants import MemoryConstants
# Configure the root logger at INFO level and create this module's logger,
# which the endpoint and UI callbacks use to report failures.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ========================= FASTAPI APP =========================
# Application object that every API route in this module is registered on.
fastapi_app = FastAPI(title="ARF v4 API")

# Enable CORS for your frontend: one allowed origin, with every HTTP method
# and every request header permitted.
_cors_settings = {
    "allow_origins": ["https://arf-frontend-sandy.vercel.app"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
fastapi_app.add_middleware(CORSMiddleware, **_cors_settings)
# ========================= ARF COMPONENTS =========================
# Module-level singletons, created once when this module is imported.
# The risk engine is shared by the /v1 endpoints and the Gradio callbacks.
risk_engine = RiskEngine()
# The FAISS index is created with the configured vector dimension and then
# handed to the RAG graph memory that backs the "Memory Stats" panel.
faiss_index = create_faiss_index(dim=MemoryConstants.VECTOR_DIM)
memory = RAGGraphMemory(faiss_index)
# In‑memory storage for demo purposes (replace with a real DB later)
# NOTE(review): the visible code only ever appends to this list, so it grows
# for the lifetime of the process and its contents are lost on restart.
decision_history = []
# ========================= PYDANTIC MODELS =========================
# Request body accepted by POST /v1/incidents/evaluate.
class EvaluateRequest(BaseModel):
    # Three required string fields; no restriction on their values is
    # declared here.
    service_name: str
    event_type: str
    severity: str
    # Optional mapping of string keys to float values; empty when omitted.
    metrics: Dict[str, float] = {}
# Response schema of POST /v1/incidents/evaluate (used as its response_model).
class EvaluateResponse(BaseModel):
    # Required float fields.
    risk_score: float
    base_risk: float
    # Optional: may be None, which is also the default.
    memory_risk: Optional[float] = None
    weight: float
    # Untyped list, empty by default.
    # NOTE(review): element type is not declared — confirm what belongs here.
    similar_events: list = []
    confidence: float
# ========================= HELPER: Demo Intent =========================
class _DemoIntent:
    """Fixed stand-in intent passed to the risk engine in demo calls.

    Carries no per-instance state: the three values are plain class
    attributes, so every instance looks the same.
    """

    service_name = "demo"
    # Environment and deployment target are both pinned to "dev".
    environment = deployment_target = "dev"
# ========================= API ENDPOINTS =========================
@fastapi_app.get("/")
async def root():
    """Answer GET / with a short welcome message pointing at /docs."""
    welcome = {"message": "ARF v4 API. See /docs for documentation."}
    return welcome
@fastapi_app.get("/health")
async def health():
    """Health probe: report a static "ok" status and the API version string."""
    return dict(status="ok", version="4.0.0")
@fastapi_app.get("/v1/get_risk")
async def get_risk():
    """Return the current demo risk and log the resulting decision.

    Scores a fixed ``_DemoIntent`` with the shared risk engine (no cost
    estimate, no policy violations), maps the score to a decision, and
    appends a record to ``decision_history`` so that an outcome can later
    be attached to it by ``decision_id``.

    Returns:
        dict: ``system_risk``, ``status``, ``explanation``,
        ``contributions``, ``decision_id``, ``decision`` and ``timestamp``.
        ``timestamp`` is the same value stored in the history record.
    """
    risk_value, explanation, contributions = risk_engine.calculate_risk(
        intent=_DemoIntent(),
        cost_estimate=None,
        policy_violations=[],
    )
    # Convert once; the same plain float goes into the record and the response.
    risk_score = float(risk_value)

    # Thresholds: above 0.8 -> deny, above 0.2 -> escalate, otherwise approve.
    if risk_value > 0.8:
        decision = "deny"
    elif risk_value > 0.2:
        decision = "escalate"
    else:
        decision = "approve"

    decision_id = str(uuid.uuid4())
    # Take the timestamp once so the stored record and the response agree
    # (previously each called datetime.now() and could differ).
    timestamp = datetime.now(timezone.utc).isoformat()

    decision_history.append({
        "decision_id": decision_id,
        "timestamp": timestamp,
        "risk_score": risk_score,
        "outcome": None,  # will be filled when feedback is given
    })
    return {
        "system_risk": risk_score,
        # Only the deny threshold is reported as "critical".
        "status": "critical" if risk_value > 0.8 else "normal",
        "explanation": explanation,
        "contributions": contributions,
        "decision_id": decision_id,
        "decision": decision,
        "timestamp": timestamp,
    }
@fastapi_app.get("/v1/history")
async def get_history():
    """Return up to the ten most recently logged decisions, oldest first."""
    most_recent = decision_history[-10:]
    return most_recent
@fastapi_app.post("/v1/incidents/evaluate", response_model=EvaluateResponse)
async def evaluate_incident(request: EvaluateRequest):
    """
    Score an incident and report the risk with its explainability fields.

    Placeholder implementation: ``request`` is accepted but not used yet, and
    the same fixed values are returned on every call. Replace the body with a
    real call to the risk engine.
    """
    # Hard-coded dummy values until the real evaluation is wired in.
    placeholder = {
        "risk_score": 0.23,
        "base_risk": 0.15,
        "memory_risk": 0.3,
        "weight": 0.5,
        "similar_events": [],
        "confidence": 0.9,
    }
    return EvaluateResponse(**placeholder)
@fastapi_app.post("/v1/feedback")
async def record_outcome(decision_id: str, success: bool):
    """Attach a success/failure outcome to a previously logged decision.

    Looks up the first entry of ``decision_history`` whose ``decision_id``
    matches, stores ``"success"`` or ``"failure"`` on it, and forwards the
    result to the risk engine on a best-effort basis (a failure there is
    logged, not raised).

    Returns:
        dict: ``status``, ``decision_id`` and ``outcome`` on a match;
        otherwise ``{"error": "decision not found"}``, returned as an
        ordinary payload (no error status is set here).
    """
    entry = next(
        (item for item in decision_history if item["decision_id"] == decision_id),
        None,
    )
    if entry is None:
        return {"error": "decision not found"}

    if success:
        entry["outcome"] = "success"
    else:
        entry["outcome"] = "failure"

    # Update the risk engine (optional)
    demo_intent = _DemoIntent()
    try:
        risk_engine.update_outcome(demo_intent, success)
    except Exception:
        logger.exception("Outcome update failed")

    return {"status": "ok", "decision_id": decision_id, "outcome": entry["outcome"]}
# ========================= GRADIO UI (Main App) =========================
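# Callbacks and layout for the Gradio interface.
# NOTE: mount_gradio_app(fastapi_app, demo, path="/api") further down attaches this UI
# under "/api"; the FastAPI routes keep their declared paths ("/", "/health", "/v1/...").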
def get_risk_snapshot():
    """Compute the demo risk for the UI and log the resulting decision.

    Callback of the "Evaluate Intent" button. Scores a fixed ``_DemoIntent``
    with the shared risk engine, maps the score to a decision, and appends a
    record to ``decision_history``.

    Returns:
        dict: ``risk``, ``status``, ``explanation``, ``contributions``,
        ``decision_id``, ``decision`` and ``timestamp`` on success. The
        ``timestamp`` equals the one stored in the history record. If
        anything raises, the exception is logged and ``{"error": <message>}``
        is returned so the UI shows the failure instead of crashing.
    """
    try:
        risk_value, explanation, contributions = risk_engine.calculate_risk(
            intent=_DemoIntent(),
            cost_estimate=None,
            policy_violations=[],
        )
        # Convert once; the same float goes into the record and the result.
        risk_score = float(risk_value)

        # Thresholds: above 0.8 -> deny, above 0.2 -> escalate, else approve.
        if risk_value > 0.8:
            decision = "deny"
        elif risk_value > 0.2:
            decision = "escalate"
        else:
            decision = "approve"

        decision_id = str(uuid.uuid4())
        # Take the timestamp once so the stored record and the returned
        # snapshot agree (previously each called datetime.now() separately).
        timestamp = datetime.now(timezone.utc).isoformat()

        decision_history.append({
            "decision_id": decision_id,
            "timestamp": timestamp,
            "risk_score": risk_score,
            "outcome": None,
        })
        return {
            "risk": risk_score,
            "status": "critical" if risk_value > 0.8 else "normal",
            "explanation": explanation,
            "contributions": contributions,
            "decision_id": decision_id,
            "decision": decision,
            "timestamp": timestamp,
        }
    except Exception as e:
        # UI boundary: report the problem in the JSON panel rather than raise.
        logger.exception("Failed to compute risk snapshot")
        return {"error": str(e)}
def get_health_snapshot():
    """Build the payload for the UI's "Health" panel.

    Returns:
        dict: static ``status``, ``version`` and ``service`` values plus the
        current UTC time as an ISO-8601 ``timestamp``.
    """
    snapshot = {"status": "ok", "version": "4.0.0", "service": "ARF OSS API"}
    snapshot["timestamp"] = datetime.now(timezone.utc).isoformat()
    return snapshot
def get_memory_snapshot():
    """Build the payload for the UI's "Memory Stats" panel.

    Returns:
        dict: ``status`` is ``"ok"`` with the memory's graph stats when it
        reports historical data, otherwise ``"empty"`` with a fixed message;
        both variants include a UTC ISO-8601 ``timestamp``.
    """
    if not memory.has_historical_data():
        status, stats = "empty", "No historical memory yet."
    else:
        status, stats = "ok", memory.get_graph_stats()
    return {
        "status": status,
        "memory_stats": stats,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
def record_outcome_ui(success: bool):
    """Record an outcome for the most recently logged decision (UI callback).

    Used by the "Action Succeeded" / "Action Failed" buttons. The risk
    engine update is best-effort: a failure there is logged, not raised.

    Args:
        success: True to store ``"success"``, False to store ``"failure"``.

    Returns:
        dict: ``decision_id``, ``outcome`` and a UTC ``timestamp``, or
        ``{"error": "no decisions yet"}`` when the history is empty.
    """
    try:
        latest = decision_history[-1]
    except IndexError:
        # Nothing has been evaluated yet, so there is nothing to rate.
        return {"error": "no decisions yet"}

    if success:
        latest["outcome"] = "success"
    else:
        latest["outcome"] = "failure"

    demo_intent = _DemoIntent()
    try:
        risk_engine.update_outcome(demo_intent, success)
    except Exception:
        logger.exception("Outcome update failed")

    return {
        "decision_id": latest["decision_id"],
        "outcome": latest["outcome"],
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
# Layout and event wiring for the demo UI.
# NOTE(review): Gradio presumably renders components in creation order, so
# keep the statement order below unless the layout is meant to change.
with gr.Blocks(title="ARF v4 Demo") as demo:
    gr.Markdown("# Agentic Reliability Framework v4\n### Probabilistic Infrastructure Governance")
    # Output panels: one JSON viewer per kind of snapshot.
    with gr.Row():
        health_output = gr.JSON(label="Health")
        risk_output = gr.JSON(label="Current Risk")
    with gr.Row():
        memory_output = gr.JSON(label="Memory Stats")
    with gr.Row():
        decision_output = gr.JSON(label="Recent Decisions")
    # Decision workflow: evaluate the demo intent, then report how it went.
    with gr.Row():
        refresh_btn = gr.Button("Evaluate Intent")
        success_btn = gr.Button("Action Succeeded")
        fail_btn = gr.Button("Action Failed")
    refresh_btn.click(fn=get_risk_snapshot, outputs=risk_output)
    # The handlers take no inputs, so each lambda pins the success flag.
    # Both write into the "Recent Decisions" panel.
    success_btn.click(fn=lambda: record_outcome_ui(True), outputs=decision_output)
    fail_btn.click(fn=lambda: record_outcome_ui(False), outputs=decision_output)
    # Manual refresh controls for the remaining panels.
    with gr.Row():
        health_btn = gr.Button("Refresh Health")
        memory_btn = gr.Button("Refresh Memory")
        history_btn = gr.Button("Show Decision History")
    health_btn.click(fn=get_health_snapshot, outputs=health_output)
    memory_btn.click(fn=get_memory_snapshot, outputs=memory_output)
    # Shows the ten most recent entries of decision_history.
    history_btn.click(fn=lambda: decision_history[-10:], outputs=decision_output)
# ========================= Mount Gradio onto FastAPI =========================
# gr.mount_gradio_app(app, blocks, path) attaches the Gradio Blocks UI to the
# FastAPI application at `path`: the UI is served under "/api", while the
# FastAPI routes keep their declared paths ("/", "/health", "/v1/...").
# `app` is the combined ASGI application.
app = gr.mount_gradio_app(fastapi_app, demo, path="/api")
# Running this file directly launches only the Gradio `demo` on 0.0.0.0:7860.
# NOTE(review): this branch does not serve `app`, so the FastAPI routes are
# presumably unreachable when the process is started via `python app.py`;
# serve `app` with an ASGI server if the API must be available — confirm.
if __name__ == "__main__":
    demo.queue().launch(server_name="0.0.0.0", server_port=7860)
# NOTE(review): earlier comments here said the Gradio interface lives at the
# root with the API endpoints under /api; the mount call above does the
# opposite — confirm which layout is intended.