| |
| |
| import os |
| import json |
| import random |
| import datetime |
| import threading |
| import numpy as np |
| import gradio as gr |
| import requests |
| import faiss |
| from fastapi import FastAPI, Query, Body, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from sentence_transformers import SentenceTransformer |
| from filelock import FileLock |
| import uvicorn |
| from pydantic import BaseModel, Field |
|
|
| |
# --- Hugging Face Inference API configuration ---
# The token is optional: without it every analysis request falls back to the
# local semantic-similarity mode (see call_huggingface_analysis).
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
HF_API_URL = "https://router.huggingface.co/hf-inference/v1/completions"
HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}


print("✅ Hugging Face token loaded." if HF_TOKEN else "⚠️ No HF token found, using local analysis mode.")


# --- FAISS persistence settings ---
VECTOR_DIM = 384  # embedding width of all-MiniLM-L6-v2
INDEX_FILE = "incident_vectors.index"
TEXTS_FILE = "incident_texts.json"
LOCK_FILE = "incident.lock"


# Sentence-embedding model used for incident similarity search.
# NOTE: downloaded/loaded at import time, so first startup can be slow.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
|
|
def load_faiss_index():
    """Load the persisted FAISS index and incident texts from disk.

    Returns:
        tuple: ``(index, texts)`` — the loaded FAISS index and its parallel
        list of incident texts, or a fresh empty ``IndexFlatL2(VECTOR_DIM)``
        and ``[]`` when the files are missing, unreadable, or inconsistent.
    """
    # Acquire the same file lock save_index() holds while writing, so a
    # concurrent save cannot hand us a half-written index/texts pair.
    with FileLock(LOCK_FILE):
        if os.path.exists(INDEX_FILE) and os.path.exists(TEXTS_FILE):
            try:
                idx = faiss.read_index(INDEX_FILE)
                with open(TEXTS_FILE, "r") as f:
                    texts = json.load(f)
                # Index and metadata must stay parallel: one text per vector.
                if idx.ntotal == len(texts):
                    return idx, texts
                print("⚠️ Index/texts length mismatch — creating new in-memory index.")
            except Exception as e:
                print(f"⚠️ Failed to load index/texts: {e} — creating new in-memory index.")
    return faiss.IndexFlatL2(VECTOR_DIM), []
|
|
| index, incident_texts = load_faiss_index() |
|
|
def save_index():
    """Write the FAISS index and its parallel text metadata to disk.

    Serialized under a file lock so concurrent callers cannot interleave
    partial writes; any failure is logged rather than raised.
    """
    with FileLock(LOCK_FILE):
        try:
            faiss.write_index(index, INDEX_FILE)
            with open(TEXTS_FILE, "w") as fh:
                json.dump(incident_texts, fh)
        except Exception as exc:
            print(f"⚠️ Error saving index/texts: {exc}")
|
|
| |
| events = [] |
|
|
| |
def detect_anomaly(event):
    """Decide whether a telemetry event should be flagged as an anomaly.

    About one event in four is flagged at random to simulate unpredictable
    failures; otherwise the event is anomalous when latency exceeds 150 ms
    or the error rate exceeds 5%.
    """
    latency_ms = event["latency"]
    err = event["error_rate"]
    # Simulated chaos: randomly flag ~25% of events regardless of metrics.
    if random.random() < 0.25:
        return True
    return latency_ms > 150 or err > 0.05
|
|
def local_reliability_analysis(prompt: str):
    """Fallback analysis using semantic similarity over stored incidents.

    Searches the vector memory for incidents similar to *prompt*, then
    stores the prompt itself for future lookups.

    Args:
        prompt: incident description to analyze and remember.

    Returns:
        str: a one-line local insight, or an error description on failure.
    """
    try:
        embedding = np.array(model.encode([prompt]), dtype=np.float32)

        # Search BEFORE adding: otherwise the prompt's own vector is the
        # nearest neighbor and inflates the similar-incident count by one.
        n_existing = len(incident_texts)
        similar = []
        if n_existing > 0:
            D, I = index.search(embedding, k=min(3, n_existing))
            # FAISS pads with -1 when fewer than k vectors exist; guard both ends.
            similar = [incident_texts[i] for i in I[0] if 0 <= i < n_existing]

        index.add(embedding)
        incident_texts.append(prompt)
        save_index()

        if similar:
            return f"Local insight: found {len(similar)} similar incident(s)."
        return "Local insight: first incident stored."
    except Exception as e:
        return f"Local analysis error: {e}"
|
|
| def call_huggingface_analysis(prompt: str): |
| """Try HF router -> on failure fall back to local analysis.""" |
| if not HF_TOKEN: |
| return local_reliability_analysis(prompt) |
|
|
| try: |
| payload = { |
| "model": "mistralai/Mixtral-8x7B-Instruct-v0.1", |
| "prompt": prompt, |
| "max_tokens": 200, |
| "temperature": 0.3, |
| } |
| resp = requests.post(HF_API_URL, headers=HEADERS, json=payload, timeout=12) |
| if resp.status_code == 200: |
| result = resp.json() |
| |
| text = "" |
| if isinstance(result, dict): |
| |
| choices = result.get("choices") or [] |
| if choices: |
| text = choices[0].get("text") or choices[0].get("message", {}).get("content", "") |
| else: |
| text = result.get("generated_text") or "" |
| elif isinstance(result, list) and result: |
| text = result[0].get("text", "") |
| return (text or local_reliability_analysis(prompt)).strip() |
| else: |
| print(f"⚠️ HF router returned {resp.status_code}: {resp.text[:200]}") |
| return local_reliability_analysis(prompt) |
| except Exception as e: |
| print(f"⚠️ HF inference call error: {e}") |
| return local_reliability_analysis(prompt) |
|
|
def simulate_healing(event):
    """Return a simulated remediation step for *event*, chosen at random."""
    remediation_options = (
        "Restarted container",
        "Scaled up instance",
        "Cleared queue backlog",
        "No actionable step detected.",
    )
    return random.choice(remediation_options)
|
|
def analyze_event(component: str, latency: float, error_rate: float):
    """Process one telemetry event end-to-end and persist vector memory.

    Runs anomaly detection, requests an AI (or local) insight, picks a
    simulated healing action, stores the event embedding in FAISS, and
    annotates the healing action with semantically similar past incidents.

    Args:
        component: name of the service the metrics belong to.
        latency: observed latency in milliseconds.
        error_rate: observed error rate (fraction in [0, 1]).

    Returns:
        dict: the fully annotated event record (also appended to ``events``).
    """
    event = {
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "component": component,
        "latency": float(latency),
        "error_rate": float(error_rate),
    }
    event["anomaly"] = detect_anomaly(event)
    event["status"] = "Anomaly" if event["anomaly"] else "Normal"

    prompt = (
        f"Component: {component}\nLatency: {latency:.2f}ms\nError Rate: {error_rate:.3f}\n"
        f"Status: {event['status']}\n\nProvide a one-line reliability insight or likely root cause."
    )

    analysis = call_huggingface_analysis(prompt)
    event["analysis"] = analysis
    event["healing_action"] = simulate_healing(event)

    # Encode once; `vec` stays None on failure so the search/add steps are
    # skipped cleanly instead of hitting a NameError on an unbound `vec`.
    vec_text = f"{component} {latency} {error_rate} {analysis}"
    vec = None
    try:
        vec = np.array(model.encode([vec_text]), dtype=np.float32)
    except Exception as e:
        print(f"⚠️ Error encoding vector: {e}")

    if vec is not None:
        try:
            # Search BEFORE adding so the new vector cannot match itself.
            n_existing = len(incident_texts)
            if n_existing > 0:
                D, I = index.search(vec, k=min(3, n_existing))
                similar = [incident_texts[i] for i in I[0] if 0 <= i < n_existing]
                if similar:
                    event["healing_action"] += f" Found {len(similar)} similar incidents (e.g., {similar[0][:120]}...)."
            else:
                event["healing_action"] += " - Not enough incidents stored yet."
        except Exception as e:
            print(f"⚠️ Error searching index: {e}")

        try:
            index.add(vec)
            incident_texts.append(vec_text)
            save_index()
        except Exception as e:
            print(f"⚠️ Error saving vector: {e}")

    events.append(event)
    # Cap in-memory history so a long-running process does not grow unbounded.
    if len(events) > 1000:
        events.pop(0)
    return event
|
|
| |
# FastAPI application exposing the reliability agent over HTTP (port 8000).
app = FastAPI(title="Agentic Reliability API", version="0.3")


# Allow any origin so the Gradio UI or external dashboards can call the API.
# NOTE(review): wildcard origins with allow_credentials=True is very
# permissive — tighten for production deployments.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
class AddEventModel(BaseModel):
    """Request body for POST /add-event: one telemetry sample for a component."""
    component: str = Field(..., example="api-service")
    # Latency in milliseconds; must be non-negative.
    latency: float = Field(..., ge=0, example=120.5)
    # Error rate as a fraction in [0, 1].
    error_rate: float = Field(..., ge=0, le=1.0, example=0.03)
|
|
@app.post("/add-event")
def add_event(payload: AddEventModel = Body(...)):
    """
    Add a telemetry event programmatically.
    Body: { "component": "api-service", "latency": 120, "error_rate": 0.03 }
    """
    try:
        processed = analyze_event(payload.component, payload.latency, payload.error_rate)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to add event: {e}")
    return {"status": "ok", "event": processed}
|
|
@app.get("/recent-events")
def recent_events(n: int = Query(20, ge=1, le=200, description="Number of recent events to return")):
    """Return the most recent processed events, newest first (default: 20)."""
    latest = list(reversed(events[-n:]))
    return {"count": len(latest), "events": latest}
|
|
@app.get("/semantic-search")
def semantic_search(query: str = Query(..., description="Search query for reliability memory"), k: int = 3):
    """Perform semantic similarity search over stored reliability incidents."""
    if not incident_texts:
        return {"results": [], "message": "No incidents in memory yet."}
    try:
        query_vec = np.array(model.encode([query]), dtype=np.float32)
        distances, indices = index.search(query_vec, k=min(k, len(incident_texts)))
        hits = [
            {"text": incident_texts[idx], "distance": float(dist)}
            for idx, dist in zip(indices[0], distances[0])
            if idx < len(incident_texts)
        ]
        return {"query": query, "results": hits}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Semantic search failed: {e}")
|
|
| |
| def submit_event(component, latency, error_rate): |
| ev = analyze_event(component, latency, error_rate) |
| table = [ |
| [e["timestamp"], e["component"], e["latency"], e["error_rate"], |
| e["status"], e["analysis"], e["healing_action"]] |
| for e in events[-20:] |
| ] |
| return ( |
| f"✅ Event Processed ({ev['status']})", |
| gr.Dataframe( |
| headers=["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"], |
| value=table |
| ) |
| ) |
|
|
# --- Gradio demo UI: manual telemetry submission plus a live event table ---
with gr.Blocks(title="🧠 Agentic Reliability Framework MVP") as demo:
    gr.Markdown("## 🧠 Agentic Reliability Framework MVP\nAdaptive anomaly detection + AI-driven self-healing + FAISS persistent vector memory.")
    with gr.Row():
        component = gr.Textbox(label="Component", value="api-service")
        latency = gr.Slider(10, 400, value=100, step=1, label="Latency (ms)")
        error_rate = gr.Slider(0, 0.2, value=0.02, step=0.001, label="Error Rate")
    submit = gr.Button("🚀 Submit Telemetry Event")
    output_text = gr.Textbox(label="Detection Output")
    table_output = gr.Dataframe(headers=["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"])
    # Each click runs the full analyze pipeline and rewrites the table.
    submit.click(fn=submit_event, inputs=[component, latency, error_rate], outputs=[output_text, table_output])
|
|
| |
def start_gradio():
    """Launch the Gradio demo on port 7860, bound to all interfaces."""
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
    )
|
|
if __name__ == "__main__":
    # Run Gradio in a daemon thread so uvicorn (FastAPI, port 8000) can own
    # the main thread; both servers share the same in-process state
    # (events, index, incident_texts).
    t = threading.Thread(target=start_gradio, daemon=True)
    t.start()
    uvicorn.run(app, host="0.0.0.0", port=8000)
|
|