petter2025's picture
Rename hf_demo.py to app.py
6c7e606 verified
raw
history blame
11.4 kB
"""
Enterprise-Grade FastAPI Backend for ARF OSS Demo
Uses real ARF components, no simulation
"""
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
import uuid
from datetime import datetime
import logging
# Real ARF OSS imports
from agentic_reliability_framework.engine import (
v3_reliability,
healing_policies,
mcp_client,
business
)
from agentic_reliability_framework.memory import rag_graph
from arf_orchestrator import ARFOrchestrator
from memory_store import ARFMemoryStore
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI
app = FastAPI(
title="ARF OSS Real Engine",
version="3.3.9",
description="Real ARF OSS backend - Bayesian risk, RAG memory, MCP client"
)
# CORS for Replit UI
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Replit domains will be added
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============== PYDANTIC MODELS ==============
# Match Replit UI exactly
class ActionRequest(BaseModel):
"""Matches Replit UI's action structure"""
id: Optional[int] = None
incidentId: Optional[int] = None
description: str = Field(..., description="Human-readable description")
proposedAction: str = Field(..., description="Actual command")
confidenceScore: float = Field(..., ge=0.0, le=1.0)
riskLevel: str = Field(..., regex="^(LOW|MEDIUM|HIGH|CRITICAL)$")
requiredLevel: Optional[str] = None
requiresHuman: bool = False
rollbackFeasible: bool = True
metadata: Optional[Dict[str, Any]] = None
class ConfigUpdateRequest(BaseModel):
confidenceThreshold: Optional[float] = Field(None, ge=0.5, le=1.0)
maxAutonomousRisk: Optional[str] = Field(None, regex="^(LOW|MEDIUM|HIGH|CRITICAL)$")
riskScoreThresholds: Optional[Dict[str, float]] = None
class GateResult(BaseModel):
"""Matches Replit UI's gate display"""
gate: str
reason: str
passed: bool
threshold: Optional[float] = None
actual: Optional[float] = None
metadata: Optional[Dict[str, Any]] = None
class EvaluationResponse(BaseModel):
"""Matches Replit UI's expected response"""
allowed: bool
requiredLevel: str
gatesTriggered: List[GateResult]
shouldEscalate: bool
escalationReason: Optional[str] = None
executionLadder: Optional[Dict[str, Any]] = None
# ============== INITIALIZE REAL ARF ==============
arf = ARFOrchestrator()
memory = ARFMemoryStore() # Light persistence with RAG
# ============== API ENDPOINTS ==============
@app.get("/api/v1/config")
async def get_config():
"""Get current ARF configuration - real OSS config"""
return {
"confidenceThreshold": arf.policy_engine.config.confidence_threshold,
"maxAutonomousRisk": arf.policy_engine.config.max_autonomous_risk,
"riskScoreThresholds": arf.policy_engine.config.risk_thresholds
}
@app.post("/api/v1/config")
async def update_config(config: ConfigUpdateRequest):
"""Update ARF configuration - live updates"""
try:
if config.confidenceThreshold:
arf.policy_engine.update_confidence_threshold(config.confidenceThreshold)
if config.maxAutonomousRisk:
arf.policy_engine.update_max_risk(config.maxAutonomousRisk)
# Log config change for audit
logger.info(f"Config updated: {config.dict(exclude_unset=True)}")
return await get_config()
except Exception as e:
logger.error(f"Config update failed: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.post("/api/v1/evaluate")
async def evaluate_action(action: ActionRequest):
"""
Real ARF OSS evaluation pipeline
Used by Replit UI's ARFPlayground component
"""
try:
start_time = datetime.utcnow()
# 1. Bayesian risk assessment (real)
risk_assessment = arf.risk_engine.assess(
action_text=action.proposedAction,
context={
"description": action.description,
"risk_level": action.riskLevel,
"requires_human": action.requiresHuman,
"rollback_feasible": action.rollbackFeasible
}
)
# 2. MCP client check (real)
mcp_result = await arf.mcp_client.evaluate(
action=action.proposedAction,
risk_score=risk_assessment.score,
confidence=action.confidenceScore
)
# 3. Policy evaluation (real OSS - advisory)
policy_result = arf.policy_engine.evaluate(
action=action.proposedAction,
risk_assessment=risk_assessment,
confidence=action.confidenceScore,
mode="advisory" # OSS mode
)
# 4. RAG memory recall (light persistence)
similar_incidents = memory.find_similar(
action=action.proposedAction,
risk_score=risk_assessment.score,
limit=5
)
# 5. Build gate results for Replit UI
gates = [
GateResult(
gate="confidence_threshold",
reason=f"Confidence {action.confidenceScore:.2f} meets threshold {arf.policy_engine.config.confidence_threshold}"
if action.confidenceScore >= arf.policy_engine.config.confidence_threshold
else f"Confidence {action.confidenceScore:.2f} below threshold {arf.policy_engine.config.confidence_threshold}",
passed=action.confidenceScore >= arf.policy_engine.config.confidence_threshold,
threshold=arf.policy_engine.config.confidence_threshold,
actual=action.confidenceScore
),
GateResult(
gate="risk_assessment",
reason=f"Risk level {action.riskLevel} within autonomous range (≤ {arf.policy_engine.config.max_autonomous_risk})"
if arf._risk_level_allowed(action.riskLevel)
else f"Risk level {action.riskLevel} exceeds autonomous threshold",
passed=arf._risk_level_allowed(action.riskLevel),
metadata={
"maxAutonomousRisk": arf.policy_engine.config.max_autonomous_risk,
"actionRisk": action.riskLevel
}
),
GateResult(
gate="rollback_feasibility",
reason="Non-destructive operation" if not arf._is_destructive(action.proposedAction)
else "Has rollback plan" if action.rollbackFeasible
else "Destructive operation lacks rollback plan",
passed=not arf._is_destructive(action.proposedAction) or action.rollbackFeasible,
metadata={
"isDestructive": arf._is_destructive(action.proposedAction),
"requiresRollback": arf._is_destructive(action.proposedAction)
}
),
GateResult(
gate="human_review",
reason="Human review not required" if not action.requiresHuman
else "Human review required by policy",
passed=not action.requiresHuman,
metadata={"policyRequiresHuman": action.requiresHuman}
),
GateResult(
gate="license_check",
reason="OSS edition - advisory only",
passed=True, # OSS always passes license check
metadata={"licenseSensitive": False, "edition": "OSS"}
)
]
# Add MCP result as gate
if mcp_result:
gates.append(GateResult(
gate="mcp_validation",
reason=mcp_result.reason,
passed=mcp_result.passed,
metadata=mcp_result.metadata
))
# Add novel action check if similar incidents exist
if similar_incidents and len(similar_incidents) < 2:
gates.append(GateResult(
gate="novel_action_review",
reason="Action pattern rarely seen in historical data",
passed=False,
metadata={"similar_count": len(similar_incidents)}
))
# Determine final decision
all_passed = all(g.passed for g in gates)
# Store in memory for future recall
memory.store_evaluation(
action=action.proposedAction,
risk_score=risk_assessment.score,
gates=gates,
allowed=all_passed,
timestamp=start_time
)
# Log for lead scoring
logger.info(f"Evaluation complete: action={action.description[:30]}..., allowed={all_passed}")
# Track enterprise interest signals
if not all_passed and any(g.gate == "novel_action_review" for g in gates if not g.passed):
# Novel action that would need Enterprise review board
memory.track_enterprise_signal("novel_action", action.proposedAction)
elif risk_assessment.score > 0.8 and not all_passed:
memory.track_enterprise_signal("high_risk_blocked", action.proposedAction)
return EvaluationResponse(
allowed=all_passed,
requiredLevel=arf._determine_required_level(all_passed, action.riskLevel),
gatesTriggered=gates,
shouldEscalate=not all_passed,
escalationReason=None if all_passed else "Failed mechanical gates",
executionLadder=arf._build_execution_ladder(gates)
)
except Exception as e:
logger.error(f"Evaluation failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/v1/memory/similar")
async def get_similar_actions(action: str, limit: int = 5):
"""RAG memory recall - similar historical evaluations"""
return memory.find_similar(action, limit=limit)
@app.get("/api/v1/audit/stream")
async def get_audit_logs(limit: int = 50):
"""Audit stream for Replit UI"""
return memory.get_recent_logs(limit)
@app.post("/api/v1/process")
async def process_action(action: ActionRequest):
"""
Full ARF pipeline with MCP
Still advisory only in OSS
"""
evaluation = await evaluate_action(action)
# In OSS, always advisory
return {
"evaluation": evaluation.dict(),
"execution": {
"status": "advisory_only",
"message": "OSS edition provides advisory only. Enterprise adds execution.",
"would_execute": evaluation.allowed and not evaluation.shouldEscalate
},
"next_steps": {
"enterprise_demo": "https://calendly.com/petter2025us/arf-demo" if evaluation.allowed else None
}
}
@app.get("/api/v1/enterprise/signals")
async def get_enterprise_signals():
"""Lead intelligence - actions that indicate Enterprise need"""
return memory.get_enterprise_signals()
# Health check
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"arf_version": "3.3.9",
"oss_mode": True,
"memory_enabled": memory.is_enabled
}