| import os |
| import random |
| import json |
| import time |
| import numpy as np |
| import pandas as pd |
| import requests |
| from datetime import datetime |
| from sklearn.metrics.pairwise import cosine_similarity |
| from sentence_transformers import SentenceTransformer |
| import faiss |
| import gradio as gr |
|
|
| |
| |
| |
| HF_TOKEN = os.getenv("HF_TOKEN", "").strip() |
| if not HF_TOKEN and os.path.exists(".env"): |
| try: |
| with open(".env", "r") as f: |
| HF_TOKEN = f.read().strip() |
| except Exception: |
| HF_TOKEN = "" |
|
|
| if HF_TOKEN: |
| print("✅ Hugging Face token loaded successfully.") |
| else: |
| print("⚠️ No Hugging Face token found. Running in fallback/local mode.") |
|
|
| |
| |
| |
| HF_API_URL = "https://router.huggingface.co/hf-inference" |
| headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {} |
| DATA_DIR = "./data" |
| os.makedirs(DATA_DIR, exist_ok=True) |
|
|
| |
| |
| |
| model = SentenceTransformer("all-MiniLM-L6-v2") |
| VECTOR_DIM = model.get_sentence_embedding_dimension() |
| FAISS_PATH = os.path.join(DATA_DIR, "incident_memory.faiss") |
| META_PATH = os.path.join(DATA_DIR, "incidents.json") |
|
|
| |
| if os.path.exists(FAISS_PATH): |
| try: |
| index = faiss.read_index(FAISS_PATH) |
| with open(META_PATH, "r") as f: |
| incident_memory = json.load(f) |
| print(f"✅ Loaded {len(incident_memory)} past incidents from FAISS.") |
| except Exception: |
| print("⚠️ Failed to load FAISS index. Starting fresh.") |
| index = faiss.IndexFlatL2(VECTOR_DIM) |
| incident_memory = [] |
| else: |
| index = faiss.IndexFlatL2(VECTOR_DIM) |
| incident_memory = [] |
|
|
| |
| |
| |
| def detect_anomaly(event): |
| """Detects anomalies based on latency/error_rate thresholds, with forced random noise.""" |
| force_anomaly = random.random() < 0.25 |
| if force_anomaly or event["latency"] > 150 or event["error_rate"] > 0.05: |
| return True |
| return False |
|
|
| |
| |
| |
| def analyze_event(event): |
| prompt = ( |
| f"Analyze this telemetry event and suggest a healing action:\n" |
| f"Component: {event['component']}\n" |
| f"Latency: {event['latency']}\n" |
| f"Error Rate: {event['error_rate']}\n" |
| f"Detected Anomaly: {event['anomaly']}\n" |
| ) |
|
|
| if not HF_TOKEN: |
| return "Local mode: analysis unavailable (no token).", "No action taken." |
|
|
| try: |
| response = requests.post( |
| f"{HF_API_URL}/mistralai/Mixtral-8x7B-Instruct-v0.1", |
| headers=headers, |
| json={"inputs": prompt}, |
| timeout=10, |
| ) |
| if response.status_code == 200: |
| result = response.json() |
| text = ( |
| result[0]["generated_text"] |
| if isinstance(result, list) and "generated_text" in result[0] |
| else str(result) |
| ) |
| return text, choose_healing_action(event, text) |
| else: |
| return f"Error {response.status_code}: {response.text}", "No actionable step detected." |
| except Exception as e: |
| return f"Error generating analysis: {e}", "No actionable step detected." |
|
|
| |
| |
| |
| def choose_healing_action(event, analysis_text): |
| possible_actions = [ |
| "Restarted container", |
| "Scaled service replicas", |
| "Cleared queue backlog", |
| "Invalidated cache", |
| "Re-deployed model endpoint", |
| ] |
| if "restart" in analysis_text.lower(): |
| return "Restarted container" |
| elif "scale" in analysis_text.lower(): |
| return "Scaled service replicas" |
| elif "cache" in analysis_text.lower(): |
| return "Invalidated cache" |
| return random.choice(possible_actions) |
|
|
| |
| |
| |
| def record_and_search_similar(event, analysis_text): |
| """Store each event vector in FAISS and search for similar incidents.""" |
| description = ( |
| f"Component: {event['component']} | " |
| f"Latency: {event['latency']} | " |
| f"ErrorRate: {event['error_rate']} | " |
| f"Analysis: {analysis_text}" |
| ) |
| embedding = model.encode(description).astype("float32").reshape(1, -1) |
|
|
| similar_info = "" |
| if len(incident_memory) > 0 and index.ntotal > 0: |
| k = min(3, len(incident_memory)) |
| D, I = index.search(embedding, k) |
| similar = [incident_memory[i]["description"] for i in I[0] if D[0][0] < 0.5] |
| if similar: |
| similar_info = f"Found {len(similar)} similar incidents (e.g., {similar[0][:120]}...)." |
|
|
| |
| incident_memory.append({"description": description}) |
| index.add(embedding) |
|
|
| |
| faiss.write_index(index, FAISS_PATH) |
| with open(META_PATH, "w") as f: |
| json.dump(incident_memory, f) |
|
|
| return similar_info |
|
|
| |
| |
| |
| event_log = [] |
|
|
| def process_event(component, latency, error_rate): |
| event = { |
| "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), |
| "component": component, |
| "latency": latency, |
| "error_rate": error_rate, |
| } |
|
|
| event["anomaly"] = detect_anomaly(event) |
| status = "Anomaly" if event["anomaly"] else "Normal" |
| analysis, healing = analyze_event(event) |
| similar = record_and_search_similar(event, analysis) |
| healing = f"{healing} {similar}".strip() |
|
|
| event["status"] = status |
| event["analysis"] = analysis |
| event["healing_action"] = healing |
| event_log.append(event) |
|
|
| df = pd.DataFrame(event_log[-20:]) |
| return f"✅ Event Processed ({status})", df |
|
|
| |
| |
| |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: |
| gr.Markdown("## 🧠 Agentic Reliability Framework MVP") |
| gr.Markdown("Adaptive anomaly detection + AI-driven self-healing + vector memory (FAISS persistent)") |
|
|
| component = gr.Textbox(label="Component", value="api-service") |
| latency = gr.Slider(10, 400, value=100, label="Latency (ms)") |
| error_rate = gr.Slider(0.0, 0.2, value=0.02, label="Error Rate") |
|
|
| submit = gr.Button("🚀 Submit Telemetry Event", variant="primary") |
| output = gr.Textbox(label="Detection Output") |
| table = gr.Dataframe(label="Recent Events (Last 20)") |
|
|
| submit.click(process_event, [component, latency, error_rate], [output, table]) |
|
|
| |
| |
| |
| if __name__ == "__main__": |
| demo.launch(server_name="0.0.0.0", server_port=7860) |
|
|