# Source: Hugging Face Space app.py by petter2025 — commit 414407c (verified), 6.25 kB
# NOTE: the raw/history/blame page chrome from the HF web view was captured in the
# paste; it is preserved here as a comment so the file parses as Python.
import os
import json
import random
import datetime
import numpy as np
import gradio as gr
import requests
import faiss
from sentence_transformers import SentenceTransformer
from filelock import FileLock
# === Config ===
# Hugging Face API token; when absent the app falls back to local vector analysis.
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
HF_API_URL = "https://router.huggingface.co/hf-inference/v1/completions"
# Auth header only when a token is configured; requests are sent unauthenticated otherwise.
HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
print("✅ Hugging Face token loaded." if HF_TOKEN else "⚠️ No HF token found, using local analysis mode.")
# === Persistent FAISS Setup ===
VECTOR_DIM = 384  # embedding dimension of all-MiniLM-L6-v2
INDEX_FILE = "incident_vectors.index"  # persisted FAISS index
TEXTS_FILE = "incident_texts.json"     # parallel list of incident texts (row i ↔ vector i)
LOCK_FILE = "incident.lock"            # FileLock path guarding the two files above
# Sentence-embedding model used for all incident vectorization.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
def load_faiss_index():
    """Load the persisted FAISS index and its parallel incident-text list.

    Returns:
        tuple[faiss.Index, list[str]]: the stored index and texts when both
        files exist, otherwise a fresh empty L2 index and an empty list.
    """
    # Take the same lock save_index() holds, so we never read a pair of files
    # mid-write (index flushed, texts not yet) from a concurrent callback.
    with FileLock(LOCK_FILE):
        if os.path.exists(INDEX_FILE) and os.path.exists(TEXTS_FILE):
            index = faiss.read_index(INDEX_FILE)
            with open(TEXTS_FILE, "r", encoding="utf-8") as f:
                texts = json.load(f)
            return index, texts
    return faiss.IndexFlatL2(VECTOR_DIM), []
# Module-level vector memory: FAISS index and the parallel list of incident
# texts (entry i of incident_texts describes vector i of index).
index, incident_texts = load_faiss_index()
def save_index():
    """Persist the FAISS index and incident texts under the shared file lock."""
    # Lock so concurrent Gradio callbacks cannot interleave the two writes.
    with FileLock(LOCK_FILE):
        faiss.write_index(index, INDEX_FILE)
        with open(TEXTS_FILE, "w") as fh:
            json.dump(incident_texts, fh)
# === Event Memory ===
# In-process log of all processed events; the UI table renders the last 20.
events = []
# === Core Logic ===
def detect_anomaly(event):
    """Decide whether a telemetry event looks anomalous.

    Roughly 25% of events are flagged at random (synthetic anomalies so the
    demo regularly exercises the healing path); otherwise an event is
    anomalous when latency exceeds 150 ms or the error rate exceeds 0.05.
    """
    # Forced-anomaly injection for testing/demo purposes.
    if random.random() < 0.25:
        return True
    return event["latency"] > 150 or event["error_rate"] > 0.05
def local_reliability_analysis(prompt: str):
    """Local semantic fallback analysis via vector similarity.

    Searches the existing incident memory for neighbours of ``prompt``
    BEFORE storing it, then persists the new incident.

    Args:
        prompt: Free-text description of the incident/event.

    Returns:
        str: A one-line human-readable insight.
    """
    embedding = np.array(model.encode([prompt]), dtype=np.float32)
    # Search first: the previous version added the prompt and then searched,
    # so the query always matched its own vector and the reported count of
    # similar events was inflated by one.
    n_existing = len(incident_texts)
    if n_existing > 0:
        _, neighbours = index.search(embedding, k=min(3, n_existing))
        # Guard both ends: FAISS pads missing results with -1.
        similar = [incident_texts[i] for i in neighbours[0] if 0 <= i < n_existing]
        result = f"Local insight: {len(similar)} similar reliability events detected."
    else:
        result = "Local insight: Initial incident stored."
    index.add(embedding)
    incident_texts.append(prompt)
    save_index()
    return result
def call_huggingface_analysis(prompt):
    """Hybrid HF/local analysis with graceful fallback.

    Tries the Hugging Face completions router when a token is configured;
    any failure (no token, non-200 response, network/parse error, empty
    completion) falls back to local vector-similarity analysis.
    """
    if not HF_TOKEN:
        return local_reliability_analysis(prompt)
    payload = {
        "model": "mistralai/Mixtral-8x7B-Instruct-v0.1",
        "prompt": prompt,
        "max_tokens": 200,
        "temperature": 0.3,
    }
    try:
        response = requests.post(HF_API_URL, headers=HEADERS, json=payload, timeout=10)
        if response.status_code != 200:
            print(f"⚠️ HF router error {response.status_code}: {response.text[:80]}...")
            return local_reliability_analysis(prompt)
        completion = response.json().get("choices", [{}])[0].get("text", "").strip()
        # An empty completion is treated the same as a failure.
        return completion or local_reliability_analysis(prompt)
    except Exception as e:
        print(f"⚠️ HF inference error: {e}")
        return local_reliability_analysis(prompt)
def simulate_healing(event):
    """Return a randomly chosen (simulated) remediation step.

    ``event`` is currently unused; this is a placeholder for a real
    remediation engine.
    """
    return random.choice([
        "Restarted container",
        "Scaled up instance",
        "Cleared queue backlog",
        "No actionable step detected.",
    ])
def analyze_event(component, latency, error_rate):
    """Build a telemetry event, analyze it, and persist it to vector memory.

    Args:
        component: Name of the service component the telemetry belongs to.
        latency: Observed latency in milliseconds.
        error_rate: Observed error rate (fraction, e.g. 0.02).

    Returns:
        str: The full event record serialized as pretty-printed JSON.
    """
    event = {
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "component": component,
        "latency": latency,
        "error_rate": error_rate,
    }
    event["anomaly"] = detect_anomaly(event)
    event["status"] = "Anomaly" if event["anomaly"] else "Normal"
    prompt = (
        f"Component: {component}\nLatency: {latency:.2f}ms\nError Rate: {error_rate:.3f}\n"
        f"Status: {event['status']}\n\n"
        "Provide a short reliability insight or root cause."
    )
    analysis = call_huggingface_analysis(prompt)
    event["analysis"] = analysis
    event["healing_action"] = simulate_healing(event)

    # Vector memory: search for neighbours BEFORE adding this event, so the
    # reported similar incidents never include the event itself (previously
    # the event was added first and its own vector was always the nearest
    # match). FAISS requires float32, so force the dtype for search AND add
    # (the old code only converted for add).
    vec_text = f"{component} {latency} {error_rate} {analysis}"
    vec = np.array(model.encode([vec_text]), dtype=np.float32)
    n_existing = len(incident_texts)
    if n_existing > 0:
        _, neighbours = index.search(vec, k=min(3, n_existing))
        # Guard both ends: FAISS pads missing results with -1.
        similar = [incident_texts[i] for i in neighbours[0] if 0 <= i < n_existing]
        if similar:
            event["healing_action"] += f" Found {len(similar)} similar incidents (e.g., {similar[0][:120]}...)."
    else:
        event["healing_action"] += " - Not enough incidents stored yet."
    index.add(vec)
    incident_texts.append(vec_text)
    save_index()

    events.append(event)
    return json.dumps(event, indent=2)
# === UI ===
def submit_event(component, latency, error_rate):
    """Gradio callback: process one telemetry event and refresh the table.

    Returns a status line plus a Dataframe of the 20 most recent events.
    """
    parsed = json.loads(analyze_event(component, latency, error_rate))
    # One column list drives both the row layout and the table headers.
    columns = ["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"]
    rows = [[evt[col] for col in columns] for evt in events[-20:]]
    return (
        f"✅ Event Processed ({parsed['status']})",
        gr.Dataframe(headers=columns, value=rows),
    )
# Build the Gradio UI. NOTE: statement order inside the Blocks context defines
# the rendered layout, so components are declared top-to-bottom as displayed.
with gr.Blocks(title="🧠 Agentic Reliability Framework MVP") as demo:
    gr.Markdown("## 🧠 Agentic Reliability Framework MVP\nAdaptive anomaly detection + AI-driven self-healing + persistent FAISS memory.")
    # Telemetry inputs, side by side on one row.
    with gr.Row():
        component = gr.Textbox(label="Component", value="api-service")
        latency = gr.Slider(10, 400, value=100, step=1, label="Latency (ms)")
        error_rate = gr.Slider(0, 0.2, value=0.02, step=0.001, label="Error Rate")
    submit = gr.Button("🚀 Submit Telemetry Event")
    # Outputs: one-line status plus the rolling table of recent events.
    output_text = gr.Textbox(label="Detection Output")
    table_output = gr.Dataframe(
        headers=["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"]
    )
    submit.click(fn=submit_event, inputs=[component, latency, error_rate], outputs=[output_text, table_output])
# Bind to all interfaces on port 7860 (the standard HF Spaces port).
demo.launch(server_name="0.0.0.0", server_port=7860)