import os
import random
import time

import faiss
import gradio as gr
import numpy as np
import pandas as pd
import requests
import torch
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer

# ========================
# Initialization
# ========================
load_dotenv()
HF_API_TOKEN = (os.getenv("HF_API_TOKEN") or "").strip()
HF_INFERENCE_ENDPOINT = "https://api-inference.huggingface.co/models/distilbert-base-uncased"

# Fall back to offline mode when the token isn't available
if not HF_API_TOKEN:
    print("⚠️ Warning: No HF_API_TOKEN found — using read-only mode (no inference calls).")

# Vector memory setup
embedder = SentenceTransformer("all-MiniLM-L6-v2")
embedding_dim = 384  # all-MiniLM-L6-v2 produces 384-dimensional embeddings
index = faiss.IndexFlatL2(embedding_dim)
incident_memory = []  # stores {"vector": ..., "metadata": ...} per event


# Helper: create embeddings
def embed_text(text):
    """Encode text into a (1, 384) vector suitable for FAISS."""
    vector = embedder.encode([text], convert_to_numpy=True)
    return vector.astype("float32")  # FAISS indexes expect float32


# ========================
# Core Functions
# ========================
def detect_anomaly(event):
    """Simple adaptive anomaly detection."""
    # Randomly force some anomalies so the healing path can be exercised in tests
    force_anomaly = random.random() < 0.25  # 25% of events become anomalies automatically
    if force_anomaly or event["latency"] > 150 or event["error_rate"] > 0.05:
        return "Anomaly"
    return "Normal"


def analyze_with_hf_api(text):
    """Call the Hugging Face Inference API safely."""
    if not HF_API_TOKEN:
        return "⚠️ No API token — running offline simulation."
    headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
    try:
        response = requests.post(
            HF_INFERENCE_ENDPOINT,
            headers=headers,
            json={"inputs": text},
            timeout=5,
        )
        if response.status_code == 200:
            result = response.json()
            # The API may return a flat or nested list of dicts depending on the pipeline
            if isinstance(result, list) and result:
                first = result[0]
                if isinstance(first, list) and first:
                    first = first[0]
                if isinstance(first, dict):
                    return first.get("label", "No label")
            return str(result)
        return f"Error {response.status_code}: {response.text}"
    except Exception as e:
        return f"Error generating analysis: {e}"


def simulate_healing(event):
    """Simulate automated remediation based on anomaly context."""
    actions = [
        "Restarted container",
        "Scaled up pods",
        "Cleared queue backlog",
        "Purged cache and retried",
    ]
    if event["status"] == "Anomaly":
        return random.choice(actions)
    return "-"


def add_to_vector_memory(event):
    """Store event context in FAISS for post-incident learning."""
    text = f"Component: {event['component']} | Latency: {event['latency']} | ErrorRate: {event['error_rate']} | Analysis: {event['analysis']}"
    vector = embed_text(text)
    index.add(vector)
    incident_memory.append({
        "vector": vector,
        "metadata": text,
    })
    return len(incident_memory)


def find_similar_events(event, top_k=3):
    """Find semantically similar past incidents."""
    if len(incident_memory) < 3:
        return "Not enough incidents stored yet."
    text = f"Component: {event['component']} | Latency: {event['latency']} | ErrorRate: {event['error_rate']} | Analysis: {event['analysis']}"
    query_vec = embed_text(text)
    distances, indices = index.search(query_vec, top_k)
    # FAISS pads missing neighbours with -1, so keep valid indices only
    results = [incident_memory[i]["metadata"] for i in indices[0] if 0 <= i < len(incident_memory)]
    if results:
        return f"Found {len(results)} similar incidents (e.g., {results[0][:100]}...)."
    return "No matches found."
# ========================
# Event Handling
# ========================
events = []


def process_event(component, latency, error_rate):
    """Run detection, analysis, healing, and memory updates for one telemetry event."""
    event = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "component": component,
        "latency": float(latency),
        "error_rate": float(error_rate),
    }
    event["status"] = detect_anomaly(event)
    event["analysis"] = analyze_with_hf_api(f"{component} latency={latency}, error={error_rate}")
    event["healing_action"] = simulate_healing(event)

    # Vector memory & similarity learning: search past incidents first,
    # then store the current event so it cannot match itself
    similar_note = find_similar_events(event)
    add_to_vector_memory(event)
    event["healing_action"] += " " + similar_note

    # Keep only the 20 most recent events for display
    events.append(event)
    if len(events) > 20:
        events.pop(0)

    df = pd.DataFrame(events)
    return "✅ Event Processed", df


# ========================
# Gradio UI
# ========================
with gr.Blocks(title="Agentic Reliability Framework MVP") as demo:
    gr.Markdown(
        "## 🧠 Agentic Reliability Framework MVP\n"
        "Adaptive anomaly detection + AI-driven self-healing + vector memory"
    )
    with gr.Row():
        component_input = gr.Dropdown(
            ["api-service", "data-ingestor", "queue-worker", "model-runner"],
            label="Component",
            value="api-service",
        )
        latency_input = gr.Number(label="Latency (ms)", value=random.uniform(50, 200))
        error_input = gr.Number(label="Error Rate", value=random.uniform(0.01, 0.15))
    submit_btn = gr.Button("🚀 Submit Telemetry Event")
    output_text = gr.Textbox(label="Detection Output")
    output_table = gr.Dataframe(
        headers=["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"],
        label="Recent Events (Last 20)",
    )
    submit_btn.click(
        fn=process_event,
        inputs=[component_input, latency_input, error_input],
        outputs=[output_text, output_table],
    )

# ========================
# Launch
# ========================
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)