import os
import json
import random
import datetime

import numpy as np
import gradio as gr
import requests
import faiss
from sentence_transformers import SentenceTransformer
from filelock import FileLock

# === Config ===
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
HF_API_URL = "https://router.huggingface.co/hf-inference/v1/completions"
HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

print("✅ Hugging Face token loaded." if HF_TOKEN else "⚠️ No HF token found, using local analysis mode.")

# === Persistent FAISS Setup ===
VECTOR_DIM = 384  # output dimension of all-MiniLM-L6-v2
INDEX_FILE = "incident_vectors.index"
TEXTS_FILE = "incident_texts.json"
LOCK_FILE = "incident.lock"

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")


def load_faiss_index():
    """Load the persisted FAISS index and its parallel text metadata.

    Returns:
        tuple: ``(index, texts)`` — a FAISS index and the list of incident
        texts whose positions correspond to the index's vector ids. A fresh
        empty ``IndexFlatL2`` and list are returned when no files exist yet.
    """
    if os.path.exists(INDEX_FILE) and os.path.exists(TEXTS_FILE):
        index = faiss.read_index(INDEX_FILE)
        with open(TEXTS_FILE, "r") as f:
            texts = json.load(f)
        return index, texts
    return faiss.IndexFlatL2(VECTOR_DIM), []


index, incident_texts = load_faiss_index()


def save_index():
    """Persist FAISS + metadata safely (file lock guards concurrent writers)."""
    with FileLock(LOCK_FILE):
        faiss.write_index(index, INDEX_FILE)
        with open(TEXTS_FILE, "w") as f:
            json.dump(incident_texts, f)


# === Event Memory ===
events = []  # in-process history of processed telemetry events


def _embed(text):
    """Encode *text* into a float32 (1, VECTOR_DIM) array FAISS accepts."""
    return np.asarray(model.encode([text]), dtype=np.float32)


# === Core Logic ===
def detect_anomaly(event):
    """Flag *event* as anomalous based on simple latency/error thresholds.

    NOTE: a random 25% forced-anomaly path is kept deliberately (labeled
    "for testing" in the original) so demo runs regularly exercise the
    anomaly branch; remove it for production use.
    """
    latency = event["latency"]
    error_rate = event["error_rate"]
    # Occasional forced anomaly for testing
    if random.random() < 0.25:
        return True
    return latency > 150 or error_rate > 0.05


def local_reliability_analysis(prompt: str):
    """Local semantic fallback analysis via vector similarity.

    Stores *prompt* in the FAISS memory and reports how many previously
    stored incidents are semantically similar.

    BUG FIX: the original added the new embedding to the index *before*
    searching, so the top hit was always the prompt itself and the reported
    count was inflated by one. We now search first, then add.
    """
    embedding = _embed(prompt)
    if incident_texts:
        D, I = index.search(embedding, k=min(3, len(incident_texts)))
        # FAISS pads missing results with -1; exclude them along with
        # any id outside the metadata list.
        similar = [incident_texts[i] for i in I[0] if 0 <= i < len(incident_texts)]
        message = f"Local insight: {len(similar)} similar reliability events detected."
    else:
        message = "Local insight: Initial incident stored."
    index.add(embedding)
    incident_texts.append(prompt)
    save_index()
    return message


def call_huggingface_analysis(prompt):
    """Hybrid HF/local analysis with graceful fallback.

    Tries the Hugging Face router when a token is configured; on any HTTP
    error, timeout, empty completion, or missing token, falls back to
    :func:`local_reliability_analysis`.
    """
    if not HF_TOKEN:
        return local_reliability_analysis(prompt)
    try:
        payload = {
            "model": "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "prompt": prompt,
            "max_tokens": 200,
            "temperature": 0.3,
        }
        response = requests.post(HF_API_URL, headers=HEADERS, json=payload, timeout=10)
        if response.status_code == 200:
            result = response.json()
            # Guard against an empty "choices" list, which would raise
            # IndexError with the original result.get("choices", [{}])[0].
            choices = result.get("choices") or [{}]
            text = choices[0].get("text", "").strip()
            return text or local_reliability_analysis(prompt)
        print(f"⚠️ HF router error {response.status_code}: {response.text[:80]}...")
        return local_reliability_analysis(prompt)
    except Exception as e:
        # Broad catch is deliberate: any network/JSON failure degrades to
        # the local analysis path rather than crashing the UI callback.
        print(f"⚠️ HF inference error: {e}")
        return local_reliability_analysis(prompt)


def simulate_healing(event):
    """Return a randomly chosen simulated remediation action for *event*."""
    actions = [
        "Restarted container",
        "Scaled up instance",
        "Cleared queue backlog",
        "No actionable step detected.",
    ]
    return random.choice(actions)


def analyze_event(component, latency, error_rate):
    """Process one telemetry event end-to-end.

    Detects anomalies, obtains an AI/local analysis, simulates a healing
    action, persists the event into the FAISS vector memory, and annotates
    the healing action with similar past incidents.

    Returns:
        str: the event serialized as pretty-printed JSON.
    """
    event = {
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "component": component,
        "latency": latency,
        "error_rate": error_rate,
    }
    event["anomaly"] = detect_anomaly(event)
    event["status"] = "Anomaly" if event["anomaly"] else "Normal"

    prompt = (
        f"Component: {component}\nLatency: {latency:.2f}ms\nError Rate: {error_rate:.3f}\n"
        f"Status: {event['status']}\n\n"
        "Provide a short reliability insight or root cause."
    )
    analysis = call_huggingface_analysis(prompt)
    event["analysis"] = analysis
    event["healing_action"] = simulate_healing(event)

    # Vector memory persistence
    vec_text = f"{component} {latency} {error_rate} {analysis}"
    vec = _embed(vec_text)

    # BUG FIX: retrieve similar incidents BEFORE adding the new vector so
    # the event cannot match itself (the original add-then-search always
    # returned the just-inserted incident as its own nearest neighbor).
    if incident_texts:
        D, I = index.search(vec, k=min(3, len(incident_texts)))
        similar = [incident_texts[i] for i in I[0] if 0 <= i < len(incident_texts)]
        if similar:
            event["healing_action"] += f" Found {len(similar)} similar incidents (e.g., {similar[0][:120]}...)."
    else:
        event["healing_action"] += " - Not enough incidents stored yet."

    index.add(vec)
    incident_texts.append(vec_text)
    save_index()

    events.append(event)
    return json.dumps(event, indent=2)


# === UI ===
def submit_event(component, latency, error_rate):
    """Gradio callback: analyze one event and refresh the history table."""
    result = analyze_event(component, latency, error_rate)
    parsed = json.loads(result)
    # Show only the 20 most recent events to keep the table readable.
    table = [
        [e["timestamp"], e["component"], e["latency"], e["error_rate"], e["status"], e["analysis"], e["healing_action"]]
        for e in events[-20:]
    ]
    return (
        f"✅ Event Processed ({parsed['status']})",
        gr.Dataframe(
            headers=["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"],
            value=table,
        ),
    )


with gr.Blocks(title="🧠 Agentic Reliability Framework MVP") as demo:
    gr.Markdown("## 🧠 Agentic Reliability Framework MVP\nAdaptive anomaly detection + AI-driven self-healing + persistent FAISS memory.")
    with gr.Row():
        component = gr.Textbox(label="Component", value="api-service")
        latency = gr.Slider(10, 400, value=100, step=1, label="Latency (ms)")
        error_rate = gr.Slider(0, 0.2, value=0.02, step=0.001, label="Error Rate")
    submit = gr.Button("🚀 Submit Telemetry Event")
    output_text = gr.Textbox(label="Detection Output")
    table_output = gr.Dataframe(
        headers=["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"]
    )
    submit.click(fn=submit_event, inputs=[component, latency, error_rate], outputs=[output_text, table_output])
# Guard the entry point so the module can be imported (e.g. by a Space
# runner or a test harness) without immediately binding port 7860; running
# the file as a script behaves exactly as before.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)