from app.causal_explainer import CausalExplainer
from fastapi import APIRouter, Depends, Request, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Optional
from enum import Enum
import time
import json
# ===== USAGE TRACKER IMPORTS =====
from app.core.usage_tracker import enforce_quota, UsageRecord, tracker
class HealingAction(str, Enum):
    """Closed set of remediation actions, each backed by its wire-format string.

    Members subclass ``str``, so they compare equal to their plain string
    values and can be looked up from one (``HealingAction("rollback")``).
    """

    # Nothing to do.
    NO_ACTION = "no_action"

    # Remediations.
    RESTART_CONTAINER = "restart_container"
    SCALE_OUT = "scale_out"
    ROLLBACK = "rollback"
    CIRCUIT_BREAKER = "circuit_breaker"
    TRAFFIC_SHIFT = "traffic_shift"

    # Escalation.
    ALERT_TEAM = "alert_team"
# Request payload describing one reliability observation for a component.
# Only `component`, `latency_p99` and `error_rate` are required.
class ReliabilityEvent(BaseModel):
    # Name of the affected component; echoed back in the healing intent.
    component: str
    # p99 latency; compared against 500 by the evaluate endpoint
    # (presumably milliseconds -- confirm with callers).
    latency_p99: float
    # Error rate; compared against 0.15 by the evaluate endpoint
    # (presumably a 0..1 fraction -- confirm with callers).
    error_rate: float
    # Service mesh identifier; falls back to "default" when omitted.
    service_mesh: str = "default"
    # Optional utilisation figures; not read by the endpoints in this file.
    cpu_util: Optional[float] = None
    memory_util: Optional[float] = None
# Router on which the endpoints below register themselves via @router.post.
router = APIRouter()
# In-process record of reported incidents; report_incident appends one dict per event.
# NOTE(review): nothing visible in this file trims this list, so it grows for
# the lifetime of the process -- confirm that is acceptable.
incident_history: list = []
@router.post("/report_incident")
async def report_incident(event: ReliabilityEvent):
    # Store a plain-dict snapshot of the validated event in the module-level
    # history, then acknowledge with a fixed status payload.
    snapshot = event.dict()
    incident_history.append(snapshot)
    acknowledgement = {"status": "recorded"}
    return acknowledgement
# Route path, shared by the decorator and the usage records so they cannot drift apart.
_EVALUATE_ENDPOINT = "/v1/incidents/evaluate"


def _heuristic_risk_score(latency_p99: float, error_rate: float) -> float:
    """Blend latency and error rate into a risk score clamped to [0.0, 1.0]."""
    raw_score = (latency_p99 / 1000.0) * 0.7 + error_rate * 0.3
    # The upper clamp alone let negative inputs produce a negative "risk".
    return max(0.0, min(1.0, raw_score))


def _select_action(latency_p99: float, error_rate: float) -> HealingAction:
    """Pick the advisory action from fixed latency / error-rate thresholds."""
    if latency_p99 > 500 or error_rate > 0.15:
        return HealingAction.RESTART_CONTAINER
    return HealingAction.NO_ACTION


async def _record_usage(api_key, tier, event, start_time, background_tasks, **outcome):
    """Hand one UsageRecord for this endpoint to the tracker, if a tracker is configured.

    ``outcome`` carries the result-specific field: ``response=...`` on success
    or ``error=...`` on failure.
    """
    if not tracker:
        return
    record = UsageRecord(
        api_key=api_key,
        tier=tier,
        timestamp=time.time(),
        endpoint=_EVALUATE_ENDPOINT,
        request_body=event.dict(),
        processing_ms=(time.time() - start_time) * 1000,
        **outcome,
    )
    await tracker.increment_usage_async(record, background_tasks)


@router.post(_EVALUATE_ENDPOINT)
async def evaluate_incident(
    request: Request,
    event: ReliabilityEvent,
    background_tasks: BackgroundTasks,
    quota: dict = Depends(enforce_quota),
):
    """Evaluate a reliability event and return an advisory healing intent.

    The decision is heuristic: a restart is proposed when ``latency_p99``
    exceeds 500 or ``error_rate`` exceeds 0.15, otherwise no action. A
    ``CausalExplainer`` supplies the explanation attached to the response.

    Args:
        request: Incoming request; injected by FastAPI and not read here.
        event: Validated event payload to evaluate.
        background_tasks: Passed through to the usage tracker.
        quota: Result of ``enforce_quota``; must contain ``"api_key"`` and
            ``"tier"``.

    Returns:
        A dict with ``healing_intent``, ``causal_explanation`` and
        ``utility_decision`` sections.

    Raises:
        KeyError: If ``quota`` lacks ``"api_key"`` or ``"tier"``.
        HTTPException: Re-raised unchanged when raised during evaluation;
            any other exception is reported as a 500 whose detail is the
            exception text.
    """
    start_time = time.time()
    api_key = quota["api_key"]
    tier = quota["tier"]

    try:
        risk_score = _heuristic_risk_score(event.latency_p99, event.error_rate)
        optimal_action = _select_action(event.latency_p99, event.error_rate)

        current_state = {
            "latency": event.latency_p99,
            "error_rate": event.error_rate,
            "last_action": {"action_type": "no_action"},
        }
        proposed_action = {"action_type": optimal_action.value, "params": {}}

        ce = CausalExplainer()
        causal_exp = ce.explain_healing_intent(proposed_action, current_state, "latency")

        healing_intent = {
            "action": optimal_action.value,
            "component": event.component,
            "parameters": proposed_action["params"],
            "justification": f"Causal: {causal_exp.explanation_text}",
            "confidence": 0.85,
            "risk_score": risk_score,
            "status": "oss_advisory_only",
        }
        response_data = {
            "healing_intent": healing_intent,
            "causal_explanation": {
                "factual_outcome": causal_exp.factual_outcome,
                "counterfactual_outcome": causal_exp.counterfactual_outcome,
                "effect": causal_exp.effect,
                "explanation_text": causal_exp.explanation_text,
                "is_model_based": causal_exp.is_model_based,
                "warnings": causal_exp.warnings,
            },
            "utility_decision": {
                "best_action": optimal_action.value,
                "expected_utility": 0.5,
                "explanation": "Heuristic decision based on latency/error thresholds",
            },
        }

        # Usage logging stays inside the try so a tracker failure is handled
        # exactly like an evaluation failure, as before.
        await _record_usage(
            api_key, tier, event, start_time, background_tasks, response=response_data
        )
        return response_data
    except HTTPException:
        # Already a deliberate HTTP error; do not wrap it in a 500.
        raise
    except Exception as e:
        error_msg = str(e)
        # NOTE(review): if increment_usage_async only queues work on
        # background_tasks, FastAPI may never run it once the HTTPException
        # below replaces the response -- confirm how the tracker persists.
        await _record_usage(
            api_key, tier, event, start_time, background_tasks, error=error_msg
        )
        # NOTE(review): the detail echoes the raw exception text to the client.
        # Kept for compatibility; review whether internals should be exposed.
        raise HTTPException(status_code=500, detail=error_msg) from e
|