petter2025 committed on
Commit
d97b7c8
·
verified ·
1 Parent(s): 8974b1e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +239 -100
app.py CHANGED
@@ -1,139 +1,278 @@
 
1
  import os
2
  import random
3
  import time
 
 
 
 
 
4
  import gradio as gr
5
  import pandas as pd
 
6
  from huggingface_hub import InferenceClient
7
- from statistics import mean
8
 
9
- # === Initialize Hugging Face client ===
 
 
10
  HF_TOKEN = os.getenv("HF_API_TOKEN")
11
- client = InferenceClient(token=HF_TOKEN)
 
12
 
13
- # === Mock telemetry state ===
14
- events_log = []
15
- anomaly_counter = 0
16
 
17
- # === Configurable parameters ===
18
- ROLLING_WINDOW = 30
19
- LATENCY_BASE_THRESHOLD = 150
20
- ERROR_BASE_THRESHOLD = 0.05
21
 
 
 
 
22
 
23
- def simulate_event(force_anomaly=False):
24
- """Simulate one telemetry datapoint."""
25
- component = random.choice(["api-service", "data-ingestor", "model-runner", "queue-worker"])
26
- if force_anomaly:
27
- latency = round(random.uniform(260, 400), 2)
28
- error_rate = round(random.uniform(0.12, 0.25), 3)
29
- else:
30
- latency = round(random.gauss(150, 60), 2)
31
- error_rate = round(random.random() * 0.2, 3)
32
- timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
33
- return {"timestamp": timestamp, "component": component, "latency": latency, "error_rate": error_rate}
34
 
 
 
35
 
36
- def adaptive_thresholds():
37
- """Compute dynamic thresholds based on rolling averages."""
38
- if len(events_log) < ROLLING_WINDOW:
39
- return LATENCY_BASE_THRESHOLD, ERROR_BASE_THRESHOLD
40
- latencies = [e["latency"] for e in events_log[-ROLLING_WINDOW:]]
41
- errors = [e["error_rate"] for e in events_log[-ROLLING_WINDOW:]]
42
- adaptive_latency = mean(latencies) * 1.25
43
- adaptive_error = mean(errors) * 1.5
44
- return adaptive_latency, adaptive_error
45
 
 
 
 
 
 
46
 
47
- def detect_anomaly(event):
48
- """Adaptive anomaly detection."""
49
- lat_thresh, err_thresh = adaptive_thresholds()
50
- if event["latency"] > lat_thresh or event["error_rate"] > err_thresh:
51
- return True
52
- return False
 
 
 
 
 
 
 
 
 
 
53
 
 
 
 
54
 
55
- def analyze_cause(event):
56
- """Use LLM to interpret and explain anomalies."""
57
  prompt = f"""
58
- You are an AI reliability engineer analyzing telemetry.
59
- Component: {event['component']}
60
- Latency: {event['latency']}ms
61
- Error Rate: {event['error_rate']}
62
- Timestamp: {event['timestamp']}
63
-
64
- Explain the likely root cause and one safe auto-healing action.
65
- Output in this format:
66
- Cause: <short cause summary>
67
- Action: <short repair suggestion>
68
- """
 
 
 
 
 
 
 
69
  try:
70
- response = client.text_generation(
71
- model="mistralai/Mixtral-8x7B-Instruct-v0.1",
72
- prompt=prompt,
73
- max_new_tokens=180
74
- )
75
- return response.strip()
76
- except Exception as e:
77
- return f"Error generating analysis: {e}"
 
 
 
 
 
 
 
 
78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
 
80
- def simulate_healing(action_text):
81
- """Mock execution of a self-healing action."""
82
- if "restart" in action_text.lower():
83
- outcome = "✅ Service restarted successfully."
84
- elif "reset" in action_text.lower():
85
- outcome = "✅ Connection reset resolved issue."
86
- elif "cache" in action_text.lower():
87
- outcome = "✅ Cache cleared; metrics normalizing."
 
 
 
88
  else:
89
- outcome = "🕒 Monitoring post-action stabilization."
90
- return outcome
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
 
 
 
92
 
93
- def process_event():
94
- """Simulate event → detect → diagnose → heal → log."""
95
- global anomaly_counter
 
 
96
 
97
- # Force an anomaly every 4 events
98
- anomaly_counter += 1
99
- force_anomaly = anomaly_counter % 4 == 0
 
 
 
 
 
 
 
 
 
 
100
 
101
- event = simulate_event(force_anomaly=force_anomaly)
102
  is_anomaly = detect_anomaly(event)
103
- result = {"event": event, "anomaly": is_anomaly, "analysis": None, "healing_action": None}
 
 
 
 
 
 
 
104
 
105
  if is_anomaly:
106
- analysis = analyze_cause(event)
107
- event["analysis"] = analysis
108
- event["status"] = "Anomaly"
109
-
110
- # Attempt to extract and simulate healing
111
- if "Action:" in analysis:
112
- action_line = analysis.split("Action:")[-1].strip()
113
- healing_outcome = simulate_healing(action_line)
114
- event["healing_action"] = healing_outcome
115
  else:
116
- event["healing_action"] = "No actionable step detected."
117
-
 
 
 
 
 
 
 
118
  else:
119
- event["analysis"] = "-"
120
- event["status"] = "Normal"
121
- event["healing_action"] = "-"
122
 
123
- events_log.append(event)
124
- df = pd.DataFrame(events_log).tail(20)
125
- return f" Event Processed ({event['status']})", df
 
 
 
 
 
 
 
 
 
126
 
 
 
127
 
128
- # === Gradio UI ===
 
 
 
 
 
 
 
 
129
  with gr.Blocks(title="🧠 Agentic Reliability Framework MVP") as demo:
130
- gr.Markdown("# 🧠 Agentic Reliability Framework MVP\n### Adaptive anomaly detection + AI-driven self-healing simulation")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
 
132
- run_btn = gr.Button("🚀 Submit Telemetry Event")
133
- status = gr.Textbox(label="Detection Output")
134
- alerts = gr.Dataframe(headers=["timestamp", "component", "latency", "error_rate", "status", "analysis", "healing_action"],
135
- label="Recent Events (Last 20)", wrap=True)
 
 
136
 
137
- run_btn.click(fn=process_event, inputs=None, outputs=[status, alerts])
 
138
 
139
- demo.launch()
 
1
+ # app.py
2
  import os
3
  import random
4
  import time
5
+ import json
6
+ import io
7
+ from datetime import datetime
8
+ from typing import Tuple, Dict, Any, Optional
9
+
10
  import gradio as gr
11
  import pandas as pd
12
+ import matplotlib.pyplot as plt
13
  from huggingface_hub import InferenceClient
 
14
 
15
# -------------------------
# CONFIG
# -------------------------
# Hugging Face API token; must be provided via the Space's secrets.
HF_TOKEN = os.getenv("HF_API_TOKEN")
if not HF_TOKEN:
    # Fail fast at import time so a misconfigured deployment is obvious immediately.
    raise RuntimeError("HF_API_TOKEN environment variable not set in the Space secrets.")

# model to use for diagnostics (inference endpoint / HF model)
HF_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"  # replace if you want another model

# Force an anomaly every N runs (helps verify inference quickly)
FORCE_EVERY_N = 4

# detection thresholds (intentionally sensitive per your request)
LATENCY_THRESHOLD = 150  # ms
ERROR_RATE_THRESHOLD = 0.05

# healing success simulation probability (used by simulate_execute_healing)
SIMULATED_HEAL_SUCCESS_PROB = 0.8

# keep last N events in display
DISPLAY_TAIL = 20

# -------------------------
# CLIENTS & STATE
# -------------------------
client = InferenceClient(token=HF_TOKEN)

# In-memory, process-local state. NOTE(review): shared mutable module state —
# fine for a single-worker demo Space, but not safe across multiple workers.
events_log = []  # list of dict events, appended by process_event_and_return_outputs
run_counter = {"count": 0}  # dict (not int) so nested functions can mutate it without `global`
 
 
45
 
46
+ # -------------------------
47
+ # Helper functions
48
+ # -------------------------
49
def now_ts() -> str:
    """Return the current UTC time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # datetime; use an aware UTC timestamp instead. The formatted output is
    # identical, so callers see no change.
    from datetime import timezone  # local import keeps this fix self-contained
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
51
 
52
def simulate_event(forced_anomaly: bool = False) -> Dict[str, Any]:
    """Create a synthetic telemetry event.

    Args:
        forced_anomaly: when True, bump latency/error_rate above the
            detection thresholds so the event is guaranteed to trip
            detect_anomaly().

    Returns:
        Dict with keys: "timestamp", "component", "latency" (ms),
        "error_rate" (fraction in roughly [0, 0.2) when not forced).
    """
    component = random.choice(["api-service", "data-ingestor", "model-runner", "queue-worker"])
    # Fix: the Gaussian tail (mean 150, sd 60) can produce a negative latency,
    # which is a physically impossible telemetry value — clamp to 0.
    latency = max(0.0, round(random.gauss(150, 60), 2))
    error_rate = round(random.random() * 0.2, 3)
    if forced_anomaly:
        # bump values to guarantee anomaly relative to the module thresholds
        latency = max(latency, LATENCY_THRESHOLD + random.uniform(20, 150))
        error_rate = max(error_rate, ERROR_RATE_THRESHOLD + random.uniform(0.02, 0.2))
    timestamp = now_ts()
    return {
        "timestamp": timestamp,
        "component": component,
        "latency": latency,
        "error_rate": error_rate
    }
68
 
69
def detect_anomaly(event: Dict[str, Any]) -> bool:
    """Threshold-based detection rule for the MVP.

    Flags the event as anomalous when either metric exceeds its
    module-level threshold.
    """
    if event["latency"] > LATENCY_THRESHOLD:
        return True
    return event["error_rate"] > ERROR_RATE_THRESHOLD
72
 
73
def build_prompt_for_diagnosis(event: Dict[str, Any]) -> str:
    """Ask the LLM to return strict JSON with cause, confidence (0-1), and a safe one-line action.

    Args:
        event: telemetry dict with "timestamp", "component", "latency",
            "error_rate" keys (as produced by simulate_event).

    Returns:
        Prompt string instructing the model to emit ONLY a JSON object with
        exactly three fields: "cause", "confidence", "action".
    """
    # NOTE(review): the triple-quoted body inherits the code indentation into
    # the prompt text — presumably harmless for instruct models; confirm if
    # exact prompt formatting ever matters.
    prompt = f"""
    You are an experienced reliability engineer. Given the telemetry below, produce a JSON object only (no extra text)
    with three fields:
    - "cause": short plain-English reason for the anomaly (1-2 sentences).
    - "confidence": a float between 0.0 and 1.0 indicating how confident you are in the cause.
    - "action": a safe, specific, one-line remediation the system could attempt automatically (e.g., "restart service X", "retry job queue", "reload config from storage", "rollback model to version v1").
    Telemetry:
    - timestamp: {event['timestamp']}
    - component: {event['component']}
    - latency_ms: {event['latency']}
    - error_rate: {event['error_rate']}

    Return valid JSON only.
    """
    return prompt
90
+
91
def call_hf_diagnosis(event: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], str]:
    """Call the HF inference API and parse the JSON diagnosis robustly.

    Args:
        event: telemetry dict passed through to build_prompt_for_diagnosis.

    Returns:
        (parsed, raw_text) on success, where parsed has normalized
        "cause" (str), "confidence" (float) and "action" (str) keys;
        (None, error_message) if generation or parsing fails.
    """
    prompt = build_prompt_for_diagnosis(event)
    try:
        # Use text_generation or text to handle instruct-style prompt depending on client
        resp = client.text_generation(model=HF_MODEL, prompt=prompt, max_new_tokens=180)
        # resp may be a string, dict, or object. Try to extract text robustly.
        if isinstance(resp, str):
            text = resp
        elif isinstance(resp, dict):
            # common shapes: {'generated_text': '...'} or {'choices':[{'text':'...'}]}
            if "generated_text" in resp:
                text = resp["generated_text"]
            elif "choices" in resp and isinstance(resp["choices"], list) and "text" in resp["choices"][0]:
                text = resp["choices"][0]["text"]
            else:
                # fallback to str
                text = json.dumps(resp)
        else:
            text = str(resp)

        # Extract JSON blob from the text (in-case model adds explanation)
        # Find first "{" and last "}" to attempt JSON parse
        start = text.find("{")
        end = text.rfind("}")
        if start != -1 and end != -1 and end > start:
            json_str = text[start:end+1]
        else:
            json_str = text  # let json.loads try, will likely fail

        # NOTE(review): any failure below (invalid JSON, parsed value not a
        # dict, non-numeric confidence) is caught by the blanket except and
        # surfaced to the UI as an error string — deliberate best-effort.
        parsed = json.loads(json_str)
        # normalize keys/values so downstream code can rely on types
        parsed["confidence"] = float(parsed.get("confidence", 0.0))
        parsed["cause"] = str(parsed.get("cause", "")).strip()
        parsed["action"] = str(parsed.get("action", "")).strip()
        return parsed, text
    except Exception as e:
        # return None and raw error message for UI
        return None, f"Error generating/parsing analysis: {e}"
130
 
131
def simulate_execute_healing(action: str) -> Dict[str, Any]:
    """Pretend to execute the remediation *action*.

    This is a safe in-process simulation — no external systems are
    touched. Success is drawn from SIMULATED_HEAL_SUCCESS_PROB.

    Returns:
        Dict with "result" ("success" | "failed") and a "notes" message.
    """
    healed = random.random() < SIMULATED_HEAL_SUCCESS_PROB
    # brief pause to mimic an idempotent remote call
    time.sleep(0.15)
    if healed:
        return {"result": "success", "notes": f"Simulated execution of '{action}' - succeeded."}
    return {"result": "failed", "notes": f"Simulated execution of '{action}' - failed (needs manual review)."}
144
+
145
def update_analytics_plot(df: pd.DataFrame) -> io.BytesIO:
    """Render latency/error_rate trends for the recent window as a PNG.

    Args:
        df: event frame; must contain numeric "latency" and "error_rate"
            columns.

    Returns:
        io.BytesIO positioned at offset 0 containing the PNG bytes.
    """
    fig, ax1 = plt.subplots(figsize=(8, 3.5))
    ax2 = ax1.twinx()  # second y-axis so both metrics share one x-axis

    # plotting last up to 50 points
    tail = df.tail(50)
    x = range(len(tail))
    ax1.plot(x, tail["latency"], linewidth=1)
    ax2.plot(x, tail["error_rate"], linewidth=1, linestyle="--")

    ax1.set_xlabel("recent events")
    ax1.set_ylabel("latency (ms)")
    ax2.set_ylabel("error_rate")

    # Fix: use figure/axes-level APIs instead of the pyplot state machine.
    # The original plt.clf() cleared (and could implicitly create) an
    # unrelated global figure, leaking pyplot state across calls.
    ax1.set_title("Telemetry trends (latency vs error_rate)")
    fig.tight_layout()

    buf = io.BytesIO()
    fig.savefig(buf, format="png")
    buf.seek(0)
    plt.close(fig)  # release the figure so repeated calls don't accumulate
    return buf
169
 
170
+ # -------------------------
171
+ # Core processing pipeline
172
+ # -------------------------
173
def process_event_and_return_outputs() -> Tuple[str, pd.DataFrame, io.BytesIO]:
    """
    Full loop:
      - simulate event (force anomaly every FORCE_EVERY_N runs)
      - detect anomaly
      - if anomaly: call HF for diagnosis -> parse JSON -> simulate healing (optional)
      - append to events_log and return UI-friendly outputs

    Returns:
        (status_text, tail DataFrame of recent events, PNG plot buffer).

    NOTE(review): mutates module-level events_log and run_counter, so this
    is only safe for a single-worker deployment.
    """
    run_counter["count"] += 1
    forced = (run_counter["count"] % FORCE_EVERY_N == 0)

    event = simulate_event(forced_anomaly=forced)
    is_anomaly = detect_anomaly(event)

    record = dict(event)  # flatten copy; keeps the original event untouched
    record["anomaly"] = is_anomaly
    record["analysis_raw"] = ""
    record["cause"] = ""
    record["confidence"] = None
    record["action"] = ""
    record["healing_result"] = ""

    if is_anomaly:
        parsed, raw = call_hf_diagnosis(event)
        record["analysis_raw"] = raw
        if parsed is None:
            # diagnosis failed — surface the raw error, never auto-heal
            record["cause"] = f"Diagnosis failed: {raw}"
            record["confidence"] = 0.0
            record["action"] = ""
            record["healing_result"] = "No-action"
        else:
            record["cause"] = parsed.get("cause", "")
            record["confidence"] = parsed.get("confidence", 0.0)
            record["action"] = parsed.get("action", "")
            # Decide whether to auto-execute: only auto if confidence > 0.5 and action is non-empty
            if record["confidence"] >= 0.5 and record["action"]:
                execution = simulate_execute_healing(record["action"])
                record["healing_result"] = json.dumps(execution)
            else:
                record["healing_result"] = "deferred (low confidence or no action)"
    else:
        # normal event — placeholder dashes keep the table readable
        record["analysis_raw"] = "-"
        record["healing_result"] = "-"

    # normalize fields & append a flat row for display
    events_log.append({
        "timestamp": record["timestamp"],
        "component": record["component"],
        "latency": record["latency"],
        "error_rate": record["error_rate"],
        "status": "Anomaly" if is_anomaly else "Normal",
        "cause": record["cause"],
        "confidence": record["confidence"],
        "action": record["action"],
        "healing_result": record["healing_result"]
    })

    # prepare DataFrame for display (only the most recent DISPLAY_TAIL rows)
    df = pd.DataFrame(events_log).fillna("-").tail(DISPLAY_TAIL)

    # analytics plot (fillna(0) so numeric columns stay plottable)
    plot_buf = update_analytics_plot(pd.DataFrame(events_log).fillna(0))

    status_text = f"✅ Event Processed ({'Anomaly' if is_anomaly else 'Normal'}) — forced={forced}"
    return status_text, df, plot_buf
238
+
239
# -------------------------
# Gradio UI
# -------------------------
with gr.Blocks(title="🧠 Agentic Reliability Framework MVP") as demo:
    gr.Markdown("# 🧠 Agentic Reliability Framework MVP")
    gr.Markdown(
        "Real-time telemetry simulation → anomaly detection → HF-based diagnosis → simulated self-heal\n\n"
        f"**Force anomaly every** `{FORCE_EVERY_N}` runs. Detection thresholds: latency>{LATENCY_THRESHOLD}ms or error_rate>{ERROR_RATE_THRESHOLD}."
    )

    with gr.Row():
        run_btn = gr.Button("🚀 Submit Telemetry Event")
        reset_btn = gr.Button("♻️ Reset Logs")
        info = gr.Markdown("Status: waiting")

    status = gr.Textbox(label="Detection Output", interactive=False)
    alerts = gr.Dataframe(headers=["timestamp", "component", "latency", "error_rate", "status", "cause", "confidence", "action", "healing_result"], label="Recent Events (Tail)", wrap=True)
    plot_output = gr.Image(label="Telemetry Trends (latency / error_rate)")

    # callbacks
    run_btn.click(fn=process_event_and_return_outputs, inputs=None, outputs=[status, alerts, plot_output])

    def reset_logs():
        """Clear the in-memory event log / run counter and reset the UI."""
        events_log.clear()
        run_counter["count"] = 0
        # Fix: return None (not an empty io.BytesIO) for the Image output —
        # an empty buffer is not decodable as an image and breaks rendering,
        # while None simply clears the component.
        return "Logs reset", pd.DataFrame([], columns=["timestamp", "component", "latency", "error_rate", "status", "cause", "confidence", "action", "healing_result"]), None

    reset_btn.click(fn=reset_logs, inputs=None, outputs=[status, alerts, plot_output])

    gr.Markdown(
        "Notes:\n\n"
        "- This MVP **simulates** healing — it does NOT execute real infra changes. Replace `simulate_execute_healing` with safe idempotent remote calls when ready.\n"
        "- The model is prompted to return JSON only; we robustly parse the response but still handle parse errors.\n"
        "- To test inference quickly, the system forces anomalies every N runs so you'll see diagnosis output frequently.\n"
    )
275
 
276
if __name__ == "__main__":
    # Bind to all interfaces on port 7860 — the convention for Hugging Face
    # Spaces / containerized deployments.
    demo.launch(server_name="0.0.0.0", server_port=7860)
278