""" Enterprise-Grade FastAPI Backend for ARF OSS Demo Uses real ARF components, no simulation """ from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from typing import Optional, List, Dict, Any import uuid from datetime import datetime import logging # Real ARF OSS imports from agentic_reliability_framework.engine import ( v3_reliability, healing_policies, mcp_client, business ) from agentic_reliability_framework.memory import rag_graph from arf_orchestrator import ARFOrchestrator from memory_store import ARFMemoryStore # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize FastAPI app = FastAPI( title="ARF OSS Real Engine", version="3.3.9", description="Real ARF OSS backend - Bayesian risk, RAG memory, MCP client" ) # CORS for Replit UI app.add_middleware( CORSMiddleware, allow_origins=["*"], # Replit domains will be added allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ============== PYDANTIC MODELS ============== # Match Replit UI exactly class ActionRequest(BaseModel): """Matches Replit UI's action structure""" id: Optional[int] = None incidentId: Optional[int] = None description: str = Field(..., description="Human-readable description") proposedAction: str = Field(..., description="Actual command") confidenceScore: float = Field(..., ge=0.0, le=1.0) riskLevel: str = Field(..., regex="^(LOW|MEDIUM|HIGH|CRITICAL)$") requiredLevel: Optional[str] = None requiresHuman: bool = False rollbackFeasible: bool = True metadata: Optional[Dict[str, Any]] = None class ConfigUpdateRequest(BaseModel): confidenceThreshold: Optional[float] = Field(None, ge=0.5, le=1.0) maxAutonomousRisk: Optional[str] = Field(None, regex="^(LOW|MEDIUM|HIGH|CRITICAL)$") riskScoreThresholds: Optional[Dict[str, float]] = None class GateResult(BaseModel): """Matches Replit UI's gate display""" gate: str reason: str passed: bool threshold: Optional[float] = None actual: Optional[float] = None metadata: Optional[Dict[str, Any]] = None class EvaluationResponse(BaseModel): """Matches Replit UI's expected response""" allowed: bool requiredLevel: str gatesTriggered: List[GateResult] shouldEscalate: bool escalationReason: Optional[str] = None executionLadder: Optional[Dict[str, Any]] = None # ============== INITIALIZE REAL ARF ============== arf = ARFOrchestrator() memory = ARFMemoryStore() # Light persistence with RAG # ============== API ENDPOINTS ============== @app.get("/api/v1/config") async def get_config(): """Get current ARF configuration - real OSS config""" return { "confidenceThreshold": arf.policy_engine.config.confidence_threshold, "maxAutonomousRisk": arf.policy_engine.config.max_autonomous_risk, "riskScoreThresholds": arf.policy_engine.config.risk_thresholds } @app.post("/api/v1/config") async def update_config(config: ConfigUpdateRequest): """Update ARF configuration - live updates""" try: if config.confidenceThreshold: arf.policy_engine.update_confidence_threshold(config.confidenceThreshold) if config.maxAutonomousRisk: arf.policy_engine.update_max_risk(config.maxAutonomousRisk) # Log config change for audit logger.info(f"Config updated: {config.dict(exclude_unset=True)}") return await get_config() except Exception as e: logger.error(f"Config update failed: {e}") raise HTTPException(status_code=400, detail=str(e)) @app.post("/api/v1/evaluate") async def evaluate_action(action: ActionRequest): """ Real ARF OSS evaluation pipeline Used by Replit UI's ARFPlayground component """ try: start_time = datetime.utcnow() # 1. Bayesian risk assessment (real) risk_assessment = arf.risk_engine.assess( action_text=action.proposedAction, context={ "description": action.description, "risk_level": action.riskLevel, "requires_human": action.requiresHuman, "rollback_feasible": action.rollbackFeasible } ) # 2. MCP client check (real) mcp_result = await arf.mcp_client.evaluate( action=action.proposedAction, risk_score=risk_assessment.score, confidence=action.confidenceScore ) # 3. Policy evaluation (real OSS - advisory) policy_result = arf.policy_engine.evaluate( action=action.proposedAction, risk_assessment=risk_assessment, confidence=action.confidenceScore, mode="advisory" # OSS mode ) # 4. RAG memory recall (light persistence) similar_incidents = memory.find_similar( action=action.proposedAction, risk_score=risk_assessment.score, limit=5 ) # 5. Build gate results for Replit UI gates = [ GateResult( gate="confidence_threshold", reason=f"Confidence {action.confidenceScore:.2f} meets threshold {arf.policy_engine.config.confidence_threshold}" if action.confidenceScore >= arf.policy_engine.config.confidence_threshold else f"Confidence {action.confidenceScore:.2f} below threshold {arf.policy_engine.config.confidence_threshold}", passed=action.confidenceScore >= arf.policy_engine.config.confidence_threshold, threshold=arf.policy_engine.config.confidence_threshold, actual=action.confidenceScore ), GateResult( gate="risk_assessment", reason=f"Risk level {action.riskLevel} within autonomous range (≤ {arf.policy_engine.config.max_autonomous_risk})" if arf._risk_level_allowed(action.riskLevel) else f"Risk level {action.riskLevel} exceeds autonomous threshold", passed=arf._risk_level_allowed(action.riskLevel), metadata={ "maxAutonomousRisk": arf.policy_engine.config.max_autonomous_risk, "actionRisk": action.riskLevel } ), GateResult( gate="rollback_feasibility", reason="Non-destructive operation" if not arf._is_destructive(action.proposedAction) else "Has rollback plan" if action.rollbackFeasible else "Destructive operation lacks rollback plan", passed=not arf._is_destructive(action.proposedAction) or action.rollbackFeasible, metadata={ "isDestructive": arf._is_destructive(action.proposedAction), "requiresRollback": arf._is_destructive(action.proposedAction) } ), GateResult( gate="human_review", reason="Human review not required" if not action.requiresHuman else "Human review required by policy", passed=not action.requiresHuman, metadata={"policyRequiresHuman": action.requiresHuman} ), GateResult( gate="license_check", reason="OSS edition - advisory only", passed=True, # OSS always passes license check metadata={"licenseSensitive": False, "edition": "OSS"} ) ] # Add MCP result as gate if mcp_result: gates.append(GateResult( gate="mcp_validation", reason=mcp_result.reason, passed=mcp_result.passed, metadata=mcp_result.metadata )) # Add novel action check if similar incidents exist if similar_incidents and len(similar_incidents) < 2: gates.append(GateResult( gate="novel_action_review", reason="Action pattern rarely seen in historical data", passed=False, metadata={"similar_count": len(similar_incidents)} )) # Determine final decision all_passed = all(g.passed for g in gates) # Store in memory for future recall memory.store_evaluation( action=action.proposedAction, risk_score=risk_assessment.score, gates=gates, allowed=all_passed, timestamp=start_time ) # Log for lead scoring logger.info(f"Evaluation complete: action={action.description[:30]}..., allowed={all_passed}") # Track enterprise interest signals if not all_passed and any(g.gate == "novel_action_review" for g in gates if not g.passed): # Novel action that would need Enterprise review board memory.track_enterprise_signal("novel_action", action.proposedAction) elif risk_assessment.score > 0.8 and not all_passed: memory.track_enterprise_signal("high_risk_blocked", action.proposedAction) return EvaluationResponse( allowed=all_passed, requiredLevel=arf._determine_required_level(all_passed, action.riskLevel), gatesTriggered=gates, shouldEscalate=not all_passed, escalationReason=None if all_passed else "Failed mechanical gates", executionLadder=arf._build_execution_ladder(gates) ) except Exception as e: logger.error(f"Evaluation failed: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/v1/memory/similar") async def get_similar_actions(action: str, limit: int = 5): """RAG memory recall - similar historical evaluations""" return memory.find_similar(action, limit=limit) @app.get("/api/v1/audit/stream") async def get_audit_logs(limit: int = 50): """Audit stream for Replit UI""" return memory.get_recent_logs(limit) @app.post("/api/v1/process") async def process_action(action: ActionRequest): """ Full ARF pipeline with MCP Still advisory only in OSS """ evaluation = await evaluate_action(action) # In OSS, always advisory return { "evaluation": evaluation.dict(), "execution": { "status": "advisory_only", "message": "OSS edition provides advisory only. Enterprise adds execution.", "would_execute": evaluation.allowed and not evaluation.shouldEscalate }, "next_steps": { "enterprise_demo": "https://calendly.com/petter2025us/arf-demo" if evaluation.allowed else None } } @app.get("/api/v1/enterprise/signals") async def get_enterprise_signals(): """Lead intelligence - actions that indicate Enterprise need""" return memory.get_enterprise_signals() # Health check @app.get("/health") async def health_check(): return { "status": "healthy", "arf_version": "3.3.9", "oss_mode": True, "memory_enabled": memory.is_enabled }