# app.py – ARF v4 API with Gradio frontend.
#
# The FastAPI routes are served at the paths declared below (`/`, `/health`,
# `/v1/...`); the Gradio UI is mounted onto the same application at `/api`.
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, Optional

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import gradio as gr

# ARF v4 imports
from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.runtime.memory import create_faiss_index, RAGGraphMemory
from agentic_reliability_framework.runtime.memory.constants import MemoryConstants

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ========================= CONSTANTS =========================
API_VERSION = "4.0.0"

# Risk strictly above DENY_THRESHOLD is denied (and reported as "critical");
# risk strictly above ESCALATE_THRESHOLD is escalated; anything else is approved.
DENY_THRESHOLD = 0.8
ESCALATE_THRESHOLD = 0.2

# Number of most recent decisions exposed by the history endpoint and UI button.
HISTORY_LIMIT = 10

# ========================= FASTAPI APP =========================
fastapi_app = FastAPI(title="ARF v4 API")

# Enable CORS for the hosted frontend.
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://arf-frontend-sandy.vercel.app"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# ========================= ARF COMPONENTS =========================
risk_engine = RiskEngine()
faiss_index = create_faiss_index(dim=MemoryConstants.VECTOR_DIM)
memory = RAGGraphMemory(faiss_index)

# In-memory storage for demo purposes (replace with a real DB later).
# NOTE: this list grows without bound for the lifetime of the process.
decision_history = []


# ========================= PYDANTIC MODELS =========================
class EvaluateRequest(BaseModel):
    """Request body for POST /v1/incidents/evaluate."""

    service_name: str
    event_type: str
    severity: str
    metrics: Dict[str, float] = {}


class EvaluateResponse(BaseModel):
    """Response body for POST /v1/incidents/evaluate."""

    risk_score: float
    base_risk: float
    memory_risk: Optional[float] = None
    weight: float
    similar_events: list = []
    confidence: float


# ========================= HELPER: Demo Intent =========================
class _DemoIntent:
    """Fixed stand-in intent passed to the risk engine by the demo endpoints."""

    environment = "dev"
    deployment_target = "dev"
    service_name = "demo"


# ========================= INTERNAL HELPERS =========================
def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _classify_decision(risk_value) -> str:
    """Map a risk value to "deny", "escalate" or "approve" using the thresholds."""
    if risk_value > DENY_THRESHOLD:
        return "deny"
    if risk_value > ESCALATE_THRESHOLD:
        return "escalate"
    return "approve"


def _evaluate_demo_risk(risk_key: str) -> Dict[str, Any]:
    """Score the demo intent, record the decision and build the response payload.

    Args:
        risk_key: Name of the key under which the numeric risk is returned
            ("system_risk" for the HTTP API, "risk" for the Gradio UI).

    Returns:
        A dict with the risk score, status, explanation, contributions,
        decision id, decision and a timestamp.

    Raises:
        Whatever ``risk_engine.calculate_risk`` raises; nothing is appended to
        ``decision_history`` in that case.
    """
    risk_value, explanation, contributions = risk_engine.calculate_risk(
        intent=_DemoIntent(),
        cost_estimate=None,
        policy_violations=[],
    )
    decision = _classify_decision(risk_value)
    decision_id = str(uuid.uuid4())
    score = float(risk_value)

    decision_history.append({
        "decision_id": decision_id,
        "timestamp": _utc_now_iso(),
        "risk_score": score,
        "outcome": None,  # will be filled when feedback is given
    })

    return {
        risk_key: score,
        "status": "critical" if risk_value > DENY_THRESHOLD else "normal",
        "explanation": explanation,
        "contributions": contributions,
        "decision_id": decision_id,
        "decision": decision,
        "timestamp": _utc_now_iso(),
    }


def _apply_outcome(decision: Dict[str, Any], success: bool) -> None:
    """Store the outcome on *decision* and forward it to the risk engine.

    The risk-engine update is best-effort: a failure is logged and swallowed
    so that the outcome is still recorded in the history entry.
    """
    decision["outcome"] = "success" if success else "failure"
    try:
        risk_engine.update_outcome(_DemoIntent(), success)
    except Exception:
        logger.exception("Outcome update failed")


def _recent_decisions() -> list:
    """Return the last ``HISTORY_LIMIT`` recorded decisions (oldest first)."""
    return decision_history[-HISTORY_LIMIT:]


# ========================= API ENDPOINTS =========================
@fastapi_app.get("/")
async def root():
    """Root endpoint – returns a welcome message."""
    return {"message": "ARF v4 API. See /docs for documentation."}


@fastapi_app.get("/health")
async def health():
    """Liveness probe – returns a static status and the API version."""
    return {"status": "ok", "version": API_VERSION}


@fastapi_app.get("/v1/get_risk")
async def get_risk():
    """Return the current demo risk."""
    return _evaluate_demo_risk("system_risk")


@fastapi_app.get("/v1/history")
async def get_history():
    """Return the last 10 decisions."""
    return _recent_decisions()


@fastapi_app.post("/v1/incidents/evaluate", response_model=EvaluateResponse)
async def evaluate_incident(request: EvaluateRequest):
    """
    Evaluate an incident and return a risk score with explainability.
    This is a placeholder – replace with actual call to your risk engine.
    """
    # For now, return a dummy response; `request` is not inspected yet.
    return EvaluateResponse(
        risk_score=0.23,
        base_risk=0.15,
        memory_risk=0.3,
        weight=0.5,
        similar_events=[],
        confidence=0.9,
    )


@fastapi_app.post("/v1/feedback")
async def record_outcome(decision_id: str, success: bool):
    """Record the outcome of a decision (success/failure)."""
    for dec in decision_history:
        if dec["decision_id"] == decision_id:
            _apply_outcome(dec, success)
            return {"status": "ok", "decision_id": decision_id, "outcome": dec["outcome"]}
    # NOTE(review): an unknown id is reported with HTTP 200 and an error body.
    # Switching to a 404 would be more conventional but changes the contract
    # seen by existing clients – confirm with the frontend before changing.
    return {"error": "decision not found"}


# ========================= GRADIO UI =========================
# The callbacks below back the Gradio Blocks UI, which is mounted onto the
# FastAPI app at "/api" at the bottom of this file.
def get_risk_snapshot():
    """Evaluate the demo intent for the UI; return an error dict on failure."""
    try:
        return _evaluate_demo_risk("risk")
    except Exception as e:
        logger.exception("Failed to compute risk snapshot")
        return {"error": str(e)}


def get_health_snapshot():
    """Return static health information plus the current timestamp."""
    return {
        "status": "ok",
        "version": API_VERSION,
        "service": "ARF OSS API",
        "timestamp": _utc_now_iso(),
    }


def get_memory_snapshot():
    """Return graph-memory statistics, or an "empty" status if none exist yet."""
    if memory.has_historical_data():
        return {
            "status": "ok",
            "memory_stats": memory.get_graph_stats(),
            "timestamp": _utc_now_iso(),
        }
    return {
        "status": "empty",
        "memory_stats": "No historical memory yet.",
        "timestamp": _utc_now_iso(),
    }


def record_outcome_ui(success: bool):
    """Record *success* as the outcome of the most recent decision."""
    if not decision_history:
        return {"error": "no decisions yet"}
    last = decision_history[-1]
    _apply_outcome(last, success)
    return {
        "decision_id": last["decision_id"],
        "outcome": last["outcome"],
        "timestamp": _utc_now_iso(),
    }


with gr.Blocks(title="ARF v4 Demo") as demo:
    gr.Markdown("# Agentic Reliability Framework v4\n### Probabilistic Infrastructure Governance")

    with gr.Row():
        health_output = gr.JSON(label="Health")
        risk_output = gr.JSON(label="Current Risk")
    with gr.Row():
        memory_output = gr.JSON(label="Memory Stats")
    with gr.Row():
        decision_output = gr.JSON(label="Recent Decisions")

    with gr.Row():
        refresh_btn = gr.Button("Evaluate Intent")
        success_btn = gr.Button("Action Succeeded")
        fail_btn = gr.Button("Action Failed")
    refresh_btn.click(fn=get_risk_snapshot, outputs=risk_output)
    success_btn.click(fn=lambda: record_outcome_ui(True), outputs=decision_output)
    fail_btn.click(fn=lambda: record_outcome_ui(False), outputs=decision_output)

    with gr.Row():
        health_btn = gr.Button("Refresh Health")
        memory_btn = gr.Button("Refresh Memory")
        history_btn = gr.Button("Show Decision History")
    health_btn.click(fn=get_health_snapshot, outputs=health_output)
    memory_btn.click(fn=get_memory_snapshot, outputs=memory_output)
    history_btn.click(fn=_recent_decisions, outputs=decision_output)

# ========================= Mount Gradio onto FastAPI =========================
# gr.mount_gradio_app attaches the Gradio Blocks UI to the existing FastAPI
# application at the given path. The resulting `app` therefore serves:
#   - the Gradio UI at `/api`
#   - the FastAPI routes at the paths declared above (`/`, `/health`, `/v1/...`)
# NOTE(review): if the intended layout is the UI at `/` and the API under
# `/api`, the routes would have to move to a sub-application mounted at `/api`
# (and the `/` welcome route dropped) – confirm against the frontend first.
app = gr.mount_gradio_app(fastapi_app, demo, path="/api")

# Note: No manual server start! The Dockerfile will run uvicorn on this `app` object.
# Make sure your Dockerfile's CMD is: CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "7860"]