"""Agentic Reliability Framework MVP.

Adaptive anomaly detection + AI-driven analysis + simulated self-healing,
with persistent FAISS vector memory of past incidents. Exposes both a
FastAPI backend (`/add-event`) and a Gradio dashboard.
"""

import datetime
import json
import os
import random
from typing import Optional

import faiss
import gradio as gr
import numpy as np
import requests
from fastapi import Body, FastAPI, Header, HTTPException
from filelock import FileLock
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer

# === Config ===
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
API_KEY = os.getenv("API_KEY", "").strip()
HF_API_URL = "https://router.huggingface.co/hf-inference/v1/completions"
HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

# === FAISS Setup ===
VECTOR_DIM = 384  # embedding width of all-MiniLM-L6-v2
INDEX_FILE = "incident_vectors.index"
TEXTS_FILE = "incident_texts.json"
LOCK_FILE = "faiss_save.lock"

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Reload persisted memory only when BOTH artifacts exist; a stale index
# without its texts file (or vice versa) previously crashed on startup.
if os.path.exists(INDEX_FILE) and os.path.exists(TEXTS_FILE):
    index = faiss.read_index(INDEX_FILE)
    with open(TEXTS_FILE, "r") as f:
        incident_texts = json.load(f)
else:
    index = faiss.IndexFlatL2(VECTOR_DIM)
    incident_texts = []


# === Safe persistence ===
def save_index():
    """Persist the FAISS index and incident texts atomically-ish.

    A file lock serializes concurrent writers (e.g. API + dashboard
    submitting events at the same time).
    """
    with FileLock(LOCK_FILE):
        faiss.write_index(index, INDEX_FILE)
        with open(TEXTS_FILE, "w") as f:
            json.dump(incident_texts, f)


# === Core logic ===
events = []  # in-memory event log shown on the dashboard (last 20 rows)


def detect_anomaly(event):
    """Threshold-based anomaly detection.

    Flags an event when latency exceeds 150 ms or error rate exceeds 5%.
    NOTE: also randomly flags ~25% of events so the demo produces
    anomalies regularly — remove for production use.
    """
    latency = event["latency"]
    error_rate = event["error_rate"]
    # Occasionally flag random anomaly for testing
    if random.random() < 0.25:
        return True
    return latency > 150 or error_rate > 0.05


def call_huggingface_analysis(prompt):
    """Request a one-line analysis from the HF Inference API.

    Returns a canned string when no HF_TOKEN is configured (offline mode)
    and an error string on any HTTP or network failure — never raises.
    """
    if not HF_TOKEN:
        return "Offline mode: simulated analysis."
    try:
        payload = {
            "model": "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "prompt": prompt,
            "max_tokens": 200,
            "temperature": 0.3,
        }
        response = requests.post(HF_API_URL, headers=HEADERS, json=payload, timeout=10)
        if response.status_code == 200:
            result = response.json()
            return result.get("choices", [{}])[0].get("text", "").strip()
        return f"Error {response.status_code}: {response.text}"
    except Exception as e:
        return f"Error generating analysis: {e}"


def simulate_healing(event):
    """Pick a simulated remediation action at random (demo placeholder)."""
    actions = [
        "Restarted container",
        "Scaled up instance",
        "Cleared queue backlog",
        "No actionable step detected.",
    ]
    return random.choice(actions)


def analyze_event(component, latency, error_rate):
    """Run the full pipeline for one telemetry event.

    Detects anomalies, gets an AI analysis, simulates a healing action,
    looks up similar past incidents in the FAISS memory, then stores the
    new incident and persists it. Returns the enriched event dict.
    """
    event = {
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "component": component,
        "latency": latency,
        "error_rate": error_rate,
    }
    is_anomaly = detect_anomaly(event)
    event["anomaly"] = is_anomaly
    event["status"] = "Anomaly" if is_anomaly else "Normal"

    prompt = (
        f"Component: {component}\nLatency: {latency:.2f}ms\nError Rate: {error_rate:.3f}\n"
        f"Status: {event['status']}\n\n"
        "Provide a one-line reliability insight or root cause analysis."
    )

    # AI Reliability analysis
    analysis = call_huggingface_analysis(prompt)
    event["analysis"] = analysis

    # Simulated self-healing
    event["healing_action"] = simulate_healing(event)

    # === Vector learning & persistence ===
    vector_text = f"{component} {latency} {error_rate} {analysis}"
    # FAISS requires float32; cast once and reuse for both search and add.
    vec = np.asarray(model.encode([vector_text]), dtype=np.float32)

    # Search BEFORE adding, otherwise the incident always matches itself
    # (its own vector would be the distance-0 nearest neighbor).
    if index.ntotal > 0:
        _distances, neighbor_ids = index.search(vec, min(3, index.ntotal))
        # FAISS pads missing results with -1; guard both bounds.
        similar = [
            incident_texts[i] for i in neighbor_ids[0] if 0 <= i < len(incident_texts)
        ]
        if similar:
            event["healing_action"] += (
                f" Found {len(similar)} similar incidents (e.g., {similar[0][:100]}...)."
            )
    else:
        event["healing_action"] += " - Not enough incidents stored yet."

    index.add(vec)
    incident_texts.append(vector_text)
    save_index()

    events.append(event)
    return event


# === FastAPI backend ===
app = FastAPI(title="Agentic Reliability Framework API")


class AddEventModel(BaseModel):
    """Payload schema for /add-event."""

    component: str
    latency: float
    error_rate: float


def verify_api_key(provided_key: Optional[str]) -> bool:
    """Return True when the key matches, or when no API_KEY is set (dev mode)."""
    if not API_KEY:
        return True  # dev mode
    return provided_key == API_KEY


@app.post("/add-event")
def add_event(
    payload: AddEventModel = Body(...),
    # Header may legitimately be absent, hence Optional.
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """Add a telemetry event (secured via API key)."""
    if not verify_api_key(x_api_key):
        raise HTTPException(status_code=401, detail="Unauthorized: invalid API key.")
    try:
        event = analyze_event(payload.component, payload.latency, payload.error_rate)
        return {"status": "ok", "event": event}
    except Exception as e:
        # Chain the cause so the original traceback survives in logs.
        raise HTTPException(status_code=500, detail=f"Failed to add event: {e}") from e


# === Gradio Dashboard ===
def submit_event(component, latency, error_rate):
    """Dashboard callback: process one event and refresh the table view."""
    event = analyze_event(component, latency, error_rate)
    table = [
        [
            e["timestamp"],
            e["component"],
            e["latency"],
            e["error_rate"],
            e["status"],
            e["analysis"],
            e["healing_action"],
        ]
        for e in events[-20:]  # show only the most recent 20 events
    ]
    return (
        f"✅ Event Processed ({event['status']})",
        gr.Dataframe(
            headers=[
                "timestamp",
                "component",
                "latency",
                "error_rate",
                "status",
                "analysis",
                "healing_action",
            ],
            value=table,
        ),
    )


with gr.Blocks(title="🧠 Agentic Reliability Framework MVP") as demo:
    gr.Markdown(
        "## 🧠 Agentic Reliability Framework MVP\n"
        "Adaptive anomaly detection + AI-driven self-healing + persistent FAISS memory"
    )
    with gr.Row():
        component = gr.Textbox(label="Component", value="api-service")
        latency = gr.Slider(10, 400, value=100, step=1, label="Latency (ms)")
        error_rate = gr.Slider(0, 0.2, value=0.02, step=0.001, label="Error Rate")
    submit = gr.Button("🚀 Submit Telemetry Event")
    output_text = gr.Textbox(label="Detection Output")
    table_output = gr.Dataframe(
        headers=[
            "timestamp",
            "component",
            "latency",
            "error_rate",
            "status",
            "analysis",
            "healing_action",
        ]
    )
    submit.click(
        fn=submit_event,
        inputs=[component, latency, error_rate],
        outputs=[output_text, table_output],
    )


if __name__ == "__main__":
    # NOTE(review): only the Gradio UI is served here; the FastAPI `app`
    # must be run separately (e.g. uvicorn) or mounted — confirm intent.
    demo.launch(server_name="0.0.0.0", server_port=7860)