# app.py - Agentic Reliability Framework MVP
# Drop-in replacement: supports Gradio UI + FastAPI REST endpoints (/semantic-search, /add-event, /recent-events)
import os
import json
import random
import datetime
import threading
import numpy as np
import gradio as gr
import requests
import faiss
from fastapi import FastAPI, Query, Body, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from sentence_transformers import SentenceTransformer
from filelock import FileLock
import uvicorn
from pydantic import BaseModel, Field
# === Config ===
# Empty/missing token -> every analysis request falls back to local mode.
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
HF_API_URL = "https://router.huggingface.co/hf-inference/v1/completions"
HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
print("✅ Hugging Face token loaded." if HF_TOKEN else "⚠️ No HF token found, using local analysis mode.")
# === Persistence / FAISS config ===
VECTOR_DIM = 384  # embedding width of all-MiniLM-L6-v2; must match the model below
INDEX_FILE = "incident_vectors.index"  # persisted FAISS index
TEXTS_FILE = "incident_texts.json"     # parallel list of incident texts (same order as vectors)
LOCK_FILE = "incident.lock"            # file lock guarding index/text writes
# Sentence-transformers model (small and fast)
# NOTE(review): loaded at import time; first run downloads model weights.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
def load_faiss_index():
    """Load the persisted FAISS index and incident texts from disk.

    Returns a ``(index, texts)`` tuple. When the files are absent or
    unreadable, a fresh empty in-memory L2 index and empty list are
    returned instead of raising.
    """
    if os.path.exists(INDEX_FILE) and os.path.exists(TEXTS_FILE):
        try:
            loaded_index = faiss.read_index(INDEX_FILE)
            with open(TEXTS_FILE, "r") as handle:
                loaded_texts = json.load(handle)
        except Exception as e:
            print(f"⚠️ Failed to load index/texts: {e} — creating new in-memory index.")
        else:
            return loaded_index, loaded_texts
    return faiss.IndexFlatL2(VECTOR_DIM), []
# Module-level singletons shared by all handlers: FAISS index + parallel text list.
index, incident_texts = load_faiss_index()
def save_index():
    """Write the FAISS index and its companion text list to disk.

    A file lock serializes concurrent writers; save failures are logged
    rather than raised so callers keep running.
    """
    lock = FileLock(LOCK_FILE)
    with lock:
        try:
            faiss.write_index(index, INDEX_FILE)
            with open(TEXTS_FILE, "w") as handle:
                json.dump(incident_texts, handle)
        except Exception as e:
            print(f"⚠️ Error saving index/texts: {e}")
# === In-memory events list ===
# Rolling history of processed events, newest last (capped at 1000 in analyze_event).
events = []
# === Core logic ===
def detect_anomaly(event):
    """Decide whether a telemetry event is anomalous.

    Roughly a quarter of events are randomly flagged (testing aid);
    otherwise latency above 150 ms or an error rate above 0.05 triggers
    an anomaly.
    """
    # occasional forced anomaly for testing
    if random.random() < 0.25:
        return True
    too_slow = event["latency"] > 150
    too_many_errors = event["error_rate"] > 0.05
    return too_slow or too_many_errors
def local_reliability_analysis(prompt: str):
    """Local fallback analysis using semantic similarity and simple heuristic text reply.

    Embeds the prompt, appends it to the FAISS memory (so local memory
    grows), then reports how many similar incidents already exist.
    Returns a one-line insight string; any failure is folded into the
    returned string instead of raising.
    """
    try:
        # cast once so add() and search() both receive float32, as FAISS expects
        embedding = np.array(model.encode([prompt]), dtype=np.float32)
        # store the prompt as a data point (so local memory grows)
        index.add(embedding)
        incident_texts.append(prompt)
        save_index()
        if len(incident_texts) > 1:
            D, I = index.search(embedding, k=min(3, len(incident_texts)))
            # FAISS pads missing neighbours with -1; guard the lower bound so a
            # negative id never wraps around via Python negative indexing.
            similar = [incident_texts[i] for i in I[0] if 0 <= i < len(incident_texts)]
            return f"Local insight: found {len(similar)} similar incident(s)."
        return "Local insight: first incident stored."
    except Exception as e:
        return f"Local analysis error: {e}"
def call_huggingface_analysis(prompt: str):
    """Try HF router -> on failure fall back to local analysis.

    Sends the prompt to the HF completions endpoint when a token is
    configured. Any non-200 response, malformed payload, or network
    error degrades gracefully to local_reliability_analysis.
    Returns a stripped one-line insight string.
    """
    if not HF_TOKEN:
        return local_reliability_analysis(prompt)
    try:
        payload = {
            "model": "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "prompt": prompt,
            "max_tokens": 200,
            "temperature": 0.3,
        }
        resp = requests.post(HF_API_URL, headers=HEADERS, json=payload, timeout=12)
        if resp.status_code == 200:
            result = resp.json()
            # router output shapes vary; try to be defensive
            text = ""
            if isinstance(result, dict):
                # common HF completion shape
                choices = result.get("choices") or []
                # guard element type: a non-dict choice previously raised
                # AttributeError and fell into the broad except below
                if choices and isinstance(choices[0], dict):
                    text = choices[0].get("text") or choices[0].get("message", {}).get("content", "")
                else:
                    text = result.get("generated_text") or ""
            elif isinstance(result, list) and result and isinstance(result[0], dict):
                text = result[0].get("text", "")
            return (text or local_reliability_analysis(prompt)).strip()
        else:
            print(f"⚠️ HF router returned {resp.status_code}: {resp.text[:200]}")
            return local_reliability_analysis(prompt)
    except Exception as e:
        print(f"⚠️ HF inference call error: {e}")
        return local_reliability_analysis(prompt)
def simulate_healing(event):
    """Pick a simulated remediation step at random.

    The ``event`` argument is accepted for interface symmetry with the
    other pipeline steps but is not consulted when choosing the action.
    """
    playbook = [
        "Restarted container",
        "Scaled up instance",
        "Cleared queue backlog",
        "No actionable step detected."
    ]
    return random.choice(playbook)
def analyze_event(component: str, latency: float, error_rate: float):
    """Process one event end-to-end and persist vector memory.

    Pipeline: anomaly detection -> AI (or local) analysis -> simulated
    healing action -> FAISS memory update -> similar-incident lookup.
    Appends the enriched event to the in-memory ``events`` list (newest
    last, capped at 1000) and returns it.
    """
    event = {
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "component": component,
        "latency": float(latency),
        "error_rate": float(error_rate),
    }
    event["anomaly"] = detect_anomaly(event)
    event["status"] = "Anomaly" if event["anomaly"] else "Normal"
    prompt = (
        f"Component: {component}\nLatency: {latency:.2f}ms\nError Rate: {error_rate:.3f}\n"
        f"Status: {event['status']}\n\nProvide a one-line reliability insight or likely root cause."
    )
    analysis = call_huggingface_analysis(prompt)
    event["analysis"] = analysis
    event["healing_action"] = simulate_healing(event)
    # persist vector memory (text + embedding)
    vec_text = f"{component} {latency} {error_rate} {analysis}"
    vec = None  # stays None when encoding fails, so the search below is skipped
    try:
        # cast once to float32 so both add() and search() get what FAISS expects
        # (previously the raw encode() output was passed straight to search())
        vec = np.array(model.encode([vec_text]), dtype=np.float32)
        index.add(vec)
        incident_texts.append(vec_text)
        save_index()
    except Exception as e:
        print(f"⚠️ Error encoding or saving vector: {e}")
    # find similar incidents and append a friendly snippet to healing_action
    try:
        # guard on vec: the original referenced it unconditionally and hit a
        # NameError (swallowed by the broad except) whenever encoding failed
        if vec is not None and len(incident_texts) > 1:
            D, I = index.search(vec, k=min(3, len(incident_texts)))
            # FAISS pads missing results with -1; exclude them so a negative
            # id cannot wrap around into the texts list
            similar = [incident_texts[i] for i in I[0] if 0 <= i < len(incident_texts)]
            if similar:
                event["healing_action"] += f" Found {len(similar)} similar incidents (e.g., {similar[0][:120]}...)."
        else:
            event["healing_action"] += " - Not enough incidents stored yet."
    except Exception as e:
        print(f"⚠️ Error searching index: {e}")
    events.append(event)
    # keep events bounded to reasonable size
    if len(events) > 1000:
        events.pop(0)
    return event
# === FastAPI app + models ===
app = FastAPI(title="Agentic Reliability API", version="0.3")
# NOTE(review): wildcard origins combined with allow_credentials=True is
# rejected by browsers under the CORS spec — tighten origins for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class AddEventModel(BaseModel):
    """Request body for POST /add-event."""
    component: str = Field(..., example="api-service")  # logical service name
    latency: float = Field(..., ge=0, example=120.5)  # milliseconds, non-negative
    error_rate: float = Field(..., ge=0, le=1.0, example=0.03)  # fraction in [0, 1]
@app.post("/add-event")
def add_event(payload: AddEventModel = Body(...)):
    """
    Add a telemetry event programmatically.
    Body: { "component": "api-service", "latency": 120, "error_rate": 0.03 }

    Returns {"status": "ok", "event": <enriched event dict>} on success;
    raises HTTP 500 when processing fails.
    """
    try:
        event = analyze_event(payload.component, payload.latency, payload.error_rate)
        return {"status": "ok", "event": event}
    except Exception as e:
        # chain the original exception so server logs keep the root cause
        raise HTTPException(status_code=500, detail=f"Failed to add event: {e}") from e
@app.get("/recent-events")
def recent_events(n: int = Query(20, ge=1, le=200, description="Number of recent events to return")):
    """Return the most recent processed events (default: 20)."""
    # take the tail of the history and flip it so the newest event is first
    newest_first = list(reversed(events[-n:]))
    return {"count": len(newest_first), "events": newest_first}
@app.get("/semantic-search")
def semantic_search(query: str = Query(..., description="Search query for reliability memory"), k: int = 3):
    """Perform semantic similarity search over stored reliability incidents.

    Embeds ``query`` and returns up to ``k`` stored incident texts ranked
    by L2 distance. Raises HTTP 500 on embedding/search failure.
    """
    if not incident_texts:
        return {"results": [], "message": "No incidents in memory yet."}
    try:
        embedding = model.encode([query])
        D, I = index.search(np.array(embedding, dtype=np.float32), k=min(k, len(incident_texts)))
        results = []
        for rank, idx in enumerate(I[0]):
            # FAISS pads missing neighbours with -1; guard the lower bound so a
            # negative id never wraps into the texts list
            if 0 <= idx < len(incident_texts):
                results.append({"text": incident_texts[idx], "distance": float(D[0][rank])})
        return {"query": query, "results": results}
    except Exception as e:
        # chain so server logs keep the root cause
        raise HTTPException(status_code=500, detail=f"Semantic search failed: {e}") from e
# === Gradio frontend ===
def submit_event(component, latency, error_rate):
    """Gradio callback: process one event and rebuild the 20-row history table."""
    processed = analyze_event(component, latency, error_rate)
    columns = ["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"]
    # build rows from the column list so header and cell order cannot drift apart
    rows = [[entry[col] for col in columns] for entry in events[-20:]]
    status_line = f"✅ Event Processed ({processed['status']})"
    return status_line, gr.Dataframe(headers=columns, value=rows)
# Build the Gradio UI: three input widgets, a submit button, and two outputs.
with gr.Blocks(title="🧠 Agentic Reliability Framework MVP") as demo:
    gr.Markdown("## 🧠 Agentic Reliability Framework MVP\nAdaptive anomaly detection + AI-driven self-healing + FAISS persistent vector memory.")
    # telemetry input controls
    with gr.Row():
        component = gr.Textbox(label="Component", value="api-service")
        latency = gr.Slider(10, 400, value=100, step=1, label="Latency (ms)")
        error_rate = gr.Slider(0, 0.2, value=0.02, step=0.001, label="Error Rate")
    submit = gr.Button("🚀 Submit Telemetry Event")
    # outputs refreshed by submit_event: status line + recent-events table
    output_text = gr.Textbox(label="Detection Output")
    table_output = gr.Dataframe(headers=["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"])
    submit.click(fn=submit_event, inputs=[component, latency, error_rate], outputs=[output_text, table_output])
# === Launch both servers (Gradio UI + FastAPI) in same process ===
def start_gradio():
    """Serve the Gradio UI on port 7860 (blocking call; run in a thread)."""
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
    )
if __name__ == "__main__":
    # run Gradio in a thread and uvicorn for FastAPI in main thread
    # daemon=True: the UI thread dies with the process when uvicorn exits
    t = threading.Thread(target=start_gradio, daemon=True)
    t.start()
    uvicorn.run(app, host="0.0.0.0", port=8000)
|