import gradio as gr
import asyncio
import json
import logging
import traceback
import os
import torch
import numpy as np
import pandas as pd
from datetime import datetime

# ARF components
from agentic_reliability_framework.runtime.engine import EnhancedReliabilityEngine
from agentic_reliability_framework.core.models.event import ReliabilityEvent

# Custom AI components
from ai_event import AIEvent
from ai_risk_engine import AIRiskEngine
from hallucination_detective import HallucinationDetectiveAgent
from memory_drift_diagnostician import MemoryDriftDiagnosticianAgent
from nli_detector import NLIDetector
from retrieval import SimpleRetriever
from image_detector import ImageQualityDetector
from audio_detector import AudioQualityDetector
from iot_simulator import IoTSimulator
from robotics_diagnostician import RoboticsDiagnostician
from iot_event import IoTEvent

# ----------------------------------------------------------------------
# Logging setup (must come before the fallback imports below, which log)
# ----------------------------------------------------------------------
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# ========== Infrastructure Reliability Imports (with fallbacks) ==========
INFRA_DEPS_AVAILABLE = False
try:
    from infra_simulator import InfraSimulator
    from infra_graph import InfraGraph
    from bayesian_model import failure_model as pyro_model
    from gnn_predictor import FailureGNN
    from ontology_reasoner import InfraOntology
    import problog
    INFRA_DEPS_AVAILABLE = True
    logger.info("Infrastructure reliability modules loaded.")
except ImportError as e:
    logger.warning(f"Infrastructure modules not fully available: {e}. "
                   "The Infrastructure tab will be disabled.")

# ----------------------------------------------------------------------
# ARF infrastructure engine (optional)
# ----------------------------------------------------------------------
try:
    logger.info("Initializing EnhancedReliabilityEngine...")
    infra_engine = EnhancedReliabilityEngine()
except Exception as e:
    logger.error(f"Infrastructure engine init failed: {e}")
    infra_engine = None

# ----------------------------------------------------------------------
# Text generation model (DialoGPT-small) with logprobs
# ----------------------------------------------------------------------
from transformers import AutoTokenizer, AutoModelForCausalLM

gen_model_name = "microsoft/DialoGPT-small"
try:
    tokenizer = AutoTokenizer.from_pretrained(gen_model_name)
    model = AutoModelForCausalLM.from_pretrained(gen_model_name)
    model.eval()
    logger.info(f"Generator {gen_model_name} loaded.")
except Exception as e:
    logger.error(f"Generator load failed: {e}")
    tokenizer = model = None


def generate_with_logprobs(prompt, max_new_tokens=100):
    """Generate text and return (generated_text, avg_log_prob)."""
    if tokenizer is None or model is None:
        return "[Model not loaded]", -10.0
    inputs = tokenizer(prompt, return_tensors="pt")
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            return_dict_in_generate=True,
            output_scores=True
        )
    # One logits tensor per generated step; log_softmax turns each into a
    # log-distribution over the vocabulary
    scores = outputs.scores
    log_probs = [torch.log_softmax(score, dim=-1) for score in scores]
    generated_ids = outputs.sequences[0][inputs['input_ids'].shape[1]:]
    token_log_probs = []
    for i, lp in enumerate(log_probs):
        token_id = generated_ids[i]
        token_log_probs.append(lp[0, token_id].item())
    avg_log_prob = sum(token_log_probs) / len(token_log_probs) if token_log_probs else -10.0
    generated_text = tokenizer.decode(generated_ids, skip_special_tokens=True)
    return generated_text, avg_log_prob
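
# A hedged sanity check for generate_with_logprobs (exact output depends on
# the DialoGPT-small weights, so only the invariants are checked here):
#
#   text, lp = generate_with_logprobs("Hello!", max_new_tokens=20)
#   assert lp <= 0.0                 # log-probabilities are never positive
#   assert 0.0 < np.exp(lp) <= 1.0   # exp maps back to a (0, 1] pseudo-probability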
# ----------------------------------------------------------------------
# NLI detector
# ----------------------------------------------------------------------
nli_detector = NLIDetector()

# ----------------------------------------------------------------------
# Retrieval (sentence-transformers + ChromaDB)
# ----------------------------------------------------------------------
retriever = SimpleRetriever()

# ----------------------------------------------------------------------
# Image generation (tiny diffusion model)
# ----------------------------------------------------------------------
from diffusers import StableDiffusionPipeline

image_pipe = None
try:
    image_pipe = StableDiffusionPipeline.from_pretrained(
        "hf-internal-testing/tiny-stable-diffusion-torch"
    )
    if not torch.cuda.is_available():
        image_pipe.to("cpu")
    logger.info("Image pipeline loaded.")
except Exception as e:
    logger.warning(f"Image pipeline load failed (will be disabled): {e}")

# ----------------------------------------------------------------------
# Audio transcription (Whisper tiny)
# ----------------------------------------------------------------------
from transformers import pipeline

audio_pipe = None
try:
    audio_pipe = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-tiny.en",
        device=0 if torch.cuda.is_available() else -1
    )
    logger.info("Audio pipeline loaded.")
except Exception as e:
    logger.warning(f"Audio pipeline load failed (will be disabled): {e}")

# ----------------------------------------------------------------------
# AI agents
# ----------------------------------------------------------------------
hallucination_detective = HallucinationDetectiveAgent(nli_detector=nli_detector)
memory_drift_diagnostician = MemoryDriftDiagnosticianAgent()
image_quality_detector = ImageQualityDetector()
audio_quality_detector = AudioQualityDetector()
robotics_diagnostician = RoboticsDiagnostician()

# ----------------------------------------------------------------------
# Bayesian risk engine
# ----------------------------------------------------------------------
ai_risk_engine = AIRiskEngine()

# ----------------------------------------------------------------------
# IoT simulator
# ----------------------------------------------------------------------
iot_sim = IoTSimulator()

# ----------------------------------------------------------------------
# Infrastructure components (global, with fallback)
# ----------------------------------------------------------------------
if INFRA_DEPS_AVAILABLE:
    # Use environment variables for Neo4j if provided, else mock
    infra_sim = InfraSimulator()
    infra_graph = InfraGraph(
        uri=os.getenv("NEO4J_URI"),
        user=os.getenv("NEO4J_USER"),
        password=os.getenv("NEO4J_PASSWORD")
    )
    gnn_model = FailureGNN()
    ontology = InfraOntology()
else:
    infra_sim = None
    infra_graph = None
    gnn_model = None
    ontology = None

# ----------------------------------------------------------------------
# Helper: update risk with feedback (global state, shared across users)
# For per-session risk, use gr.State instead of globals.
# ----------------------------------------------------------------------
last_task_category = None


def feedback(thumbs_up: bool):
    """Handle user feedback to update Beta priors."""
    global last_task_category
    if last_task_category is None:
        return "No previous analysis to rate."
    ai_risk_engine.update_outcome(last_task_category, success=thumbs_up)
    return f"Feedback recorded: {'👍' if thumbs_up else '👎'} for {last_task_category}."
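
# Note on the feedback update, assuming AIRiskEngine.update_outcome keeps a
# Beta(successes + 1, failures + 1) posterior per task category (the engine's
# internals are not shown in this file): each 👍 increments the success count,
# each 👎 the failure count, and the posterior mean failure rate is
# (failures + 1) / (successes + failures + 2). For example, two successes and
# one failure on "chat" give Beta(3, 2), i.e. an expected failure rate of
# 2 / 5 = 0.4.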
# ----------------------------------------------------------------------
# Async handlers for each tab
# ----------------------------------------------------------------------
async def handle_text(task_type, prompt):
    """Handle text generation and analysis."""
    global last_task_category
    last_task_category = task_type
    try:
        response, avg_log_prob = generate_with_logprobs(prompt)
        retrieval_score = retriever.get_similarity(prompt)
        event = AIEvent(
            timestamp=datetime.utcnow(),
            component="ai",
            service_mesh="ai",
            latency_p99=0,
            error_rate=0.0,
            throughput=1,
            cpu_util=None,
            memory_util=None,
            action_category=task_type,
            model_name=gen_model_name,
            model_version="latest",
            prompt=prompt,
            response=response,
            response_length=len(response),
            confidence=float(np.exp(avg_log_prob)),  # map avg log-prob to a (0, 1] pseudo-probability
            perplexity=None,
            retrieval_scores=[retrieval_score],
            user_feedback=None,
            latency_ms=0
        )
        hallu_result = await hallucination_detective.analyze(event)
        drift_result = await memory_drift_diagnostician.analyze(event)
        risk_metrics = ai_risk_engine.risk_score(task_type)
        return {
            "response": response,
            "avg_log_prob": avg_log_prob,
            "confidence": event.confidence,
            "retrieval_score": retrieval_score,
            "hallucination_detection": hallu_result,
            "memory_drift_detection": drift_result,
            "risk_metrics": risk_metrics
        }
    except Exception as e:
        logger.error(f"Text task error: {e}")
        return {"error": str(e)}


async def handle_image(prompt, steps):
    """Handle image generation with configurable steps. Returns (image, json_data)."""
    global last_task_category
    last_task_category = "image"
    if image_pipe is None:
        return None, {"error": "Image model not loaded"}
    try:
        import time
        start = time.time()
        image = image_pipe(prompt, num_inference_steps=steps).images[0]
        gen_time = time.time() - start
        retrieval_score = retriever.get_similarity(prompt)
        event = AIEvent(
            timestamp=datetime.utcnow(),
            component="image",
            service_mesh="ai",
            latency_p99=0,
            error_rate=0.0,
            throughput=1,
            cpu_util=None,
            memory_util=None,
            action_category="image",
            model_name="tiny-sd",
            model_version="latest",
            prompt=prompt,
            response="",  # no text response for image generation
            response_length=0,
            confidence=1.0 / (gen_time + 1),  # heuristic: faster generation scores higher
            perplexity=None,
            retrieval_scores=[retrieval_score, gen_time],
            user_feedback=None,
            latency_ms=gen_time * 1000
        )
        quality_result = await image_quality_detector.analyze(event)
        json_data = {
            "generation_time": gen_time,
            "retrieval_score": retrieval_score,
            "quality_detection": quality_result
        }
        return image, json_data
    except Exception as e:
        logger.error(f"Image task error: {e}")
        return None, {"error": str(e)}


async def handle_audio(audio_file):
    """Handle audio transcription and quality analysis."""
    global last_task_category
    last_task_category = "audio"
    if audio_pipe is None:
        return {"error": "Audio model not loaded"}
    try:
        import librosa
        import soundfile as sf
        import tempfile
        audio, sr = librosa.load(audio_file, sr=16000)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            sf.write(tmp.name, audio, sr)
        result = audio_pipe(tmp.name, return_timestamps=False)
        text = result["text"]
        os.unlink(tmp.name)  # clean up the temporary wav file
        # Whisper does not expose log probs easily here; use a placeholder
        avg_log_prob = -2.0
        event = AIEvent(
            timestamp=datetime.utcnow(),
            component="audio",
            service_mesh="ai",
            latency_p99=0,
            error_rate=0.0,
            throughput=1,
            cpu_util=None,
            memory_util=None,
            action_category="audio",
            model_name="whisper-tiny.en",
            model_version="latest",
            prompt="",  # no text prompt; the input is an audio file
            response=text,
            response_length=len(text),
            confidence=float(np.exp(avg_log_prob)),
            perplexity=None,
            retrieval_scores=[avg_log_prob],
            user_feedback=None,
            latency_ms=0
        )
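        # Hedged arithmetic note: with the placeholder avg_log_prob of -2.0,
        # confidence = exp(-2.0) ≈ 0.135, so audio events reach the quality
        # detector with a deliberately low nominal confidence until real
        # per-token log-probs are wired in.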
        quality_result = await audio_quality_detector.analyze(event)
        return {
            "transcription": text,
            "avg_log_prob": avg_log_prob,
            "confidence": event.confidence,
            "quality_detection": quality_result
        }
    except Exception as e:
        logger.error(f"Audio task error: {e}")
        return {"error": str(e)}


async def read_iot_sensors(fault_type, history_state):
    """Read simulated IoT sensors, run diagnostics, predict failure, and return updated plot data."""
    global last_task_category
    last_task_category = "iot"
    iot_sim.set_fault(fault_type if fault_type != "none" else None)
    data = iot_sim.read()
    history_state.append(data)
    if len(history_state) > 100:
        history_state.pop(0)
    # Create IoTEvent with valid component name
    event = IoTEvent(
        timestamp=datetime.utcnow(),
        component="robotic-arm",
        service_mesh="factory",
        latency_p99=0,
        error_rate=0.0,
        throughput=1,
        cpu_util=None,
        memory_util=None,
        temperature=data['temperature'],
        vibration=data['vibration'],
        motor_current=data['motor_current'],
        position_error=data['position_error']
    )
    diag_result = await robotics_diagnostician.analyze(event)
    # Simple failure prediction: fit a line to the last five temperature
    # readings and extrapolate one step ahead
    prediction = None
    if len(history_state) >= 5:
        temps = [h['temperature'] for h in history_state[-5:]]
        x = np.arange(len(temps))
        slope, intercept = np.polyfit(x, temps, 1)
        next_temp = slope * len(temps) + intercept
        if slope > 0.1:
            # slope > 0.1 guarantees a positive divisor here
            time_to_threshold = (40.0 - next_temp) / slope  # readings until the 40.0-degree threshold
            prediction = {
                "predicted_temperature": next_temp,
                "time_to_overheat_min": time_to_threshold
            }
    # Prepare temperature history for plotting as a DataFrame
    temp_history = [h['temperature'] for h in history_state[-20:]]
    df = pd.DataFrame({
        "index": list(range(len(temp_history))),
        "temperature": temp_history
    })
    return data, diag_result, prediction, df, history_state


# ========== NEW: Infrastructure Reliability Handler ==========
async def handle_infra(fault_type, session_state):
    """Run infrastructure reliability analysis."""
    if not INFRA_DEPS_AVAILABLE:
        return {"error": "Infrastructure modules not installed (see logs)"}, session_state
    # Create a new simulator per session (or reuse from state)
    if "sim" not in session_state or session_state["sim"] is None:
        session_state["sim"] = InfraSimulator()
    sim = session_state["sim"]
    # Inject fault
    sim.set_fault(fault_type if fault_type != "none" else None)
    components = sim.read_state()
    # Update graph
    infra_graph.update_from_state(components)
    # Run Bayesian inference (mock for now; a real version would use Pyro)
    bayesian_risk = {"switch_failure": 0.1, "server_failure": 0.05}
    # Run GNN prediction (mock if PyG not available)
    predictions = {"at_risk": ["server-1"] if fault_type != "none" else []}
    # Run ProbLog (via python problog); placeholder for an actual ProbLog call
    logic_explanations = "ProbLog output: ..."
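    # A minimal sketch of what a real ProbLog call could look like, using the
    # problog package's PrologString / get_evaluatable API with a hypothetical
    # fail/1 model (illustrative only, not the project's actual rules):
    #
    #   from problog.program import PrologString
    #   from problog import get_evaluatable
    #   model = PrologString("""
    #       0.10::fail(switch1).
    #       0.50::fail(server1) :- fail(switch1).
    #       query(fail(server1)).
    #   """)
    #   logic_explanations = {str(q): p for q, p in
    #                         get_evaluatable().create_from(model).evaluate().items()}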
    # Ontology reasoning
    ontology_result = ontology.classify("server") if ontology else {"inferred": [], "consistent": True}
    # Combine results
    output = {
        "topology": components,
        "bayesian_risk": bayesian_risk,
        "gnn_predictions": predictions,
        "logic_explanations": logic_explanations,
        "ontology": ontology_result
    }
    return output, session_state


# ----------------------------------------------------------------------
# Gradio UI
# ----------------------------------------------------------------------
with gr.Blocks(title="ARF v4 – AI Reliability Lab", theme="soft") as demo:
    gr.Markdown(
        "# 🧠 ARF v4 – AI Reliability Lab\n"
        "**Detect hallucinations, drift, and failures across text, image, audio, and robotics**"
    )

    with gr.Tabs():
        # Tab 1: Text Generation
        with gr.TabItem("Text Generation"):
            text_task = gr.Dropdown(["chat", "code", "summary"], value="chat", label="Task")
            text_prompt = gr.Textbox(label="Prompt", value="What is the capital of France?", lines=3)
            text_btn = gr.Button("Generate")
            text_output = gr.JSON(label="Analysis")

        # Tab 2: Image Generation
        with gr.TabItem("Image Generation"):
            img_prompt = gr.Textbox(label="Prompt", value="A cat wearing a hat")
            img_steps = gr.Slider(1, 10, value=2, step=1,
                                  label="Inference Steps (higher = better quality, slower)")
            img_btn = gr.Button("Generate")
            img_output = gr.Image(label="Generated Image")
            img_json = gr.JSON(label="Analysis")

        # Tab 3: Audio Transcription
        with gr.TabItem("Audio Transcription"):
            gr.Markdown(
                "Click the microphone to record, or upload a file. Try the sample: "
                "[Sample Audio](https://huggingface.co/datasets/Narsil/asr_dummy/resolve/main/1.flac)"
            )
            audio_input = gr.Audio(type="filepath", label="Upload audio file")
            audio_btn = gr.Button("Transcribe")
            audio_output = gr.JSON(label="Analysis")

        # Tab 4: Robotics / IoT
        with gr.TabItem("Robotics / IoT"):
            gr.Markdown("### Simulated Robotic Arm Monitoring")
            iot_state = gr.State(value=[])
            with gr.Row():
                with gr.Column():
                    fault_type = gr.Dropdown(
                        ["none", "overheat", "vibration", "stall", "drift"],
                        value="none",
                        label="Inject Fault"
                    )
                    refresh_btn = gr.Button("Read Sensors")
                with gr.Column():
                    sensor_display = gr.JSON(label="Sensor Readings")
            with gr.Row():
                with gr.Column():
                    diag_display = gr.JSON(label="Diagnosis")
                with gr.Column():
                    pred_display = gr.JSON(label="Failure Prediction")
            with gr.Row():
                with gr.Column(scale=1, min_width=600):
                    temp_plot = gr.LinePlot(
                        label="Temperature History (last 20 readings)",
                        x="index",
                        y="temperature"
                    )

        # ========== NEW: Infrastructure Reliability Tab ==========
        with gr.TabItem("Infrastructure Reliability"):
            gr.Markdown("### Neuro-Symbolic Infrastructure Monitoring (Bayesian + Graph + Logic)")
            infra_state = gr.State(value={})  # per-session state
            with gr.Row():
                with gr.Column():
                    infra_fault = gr.Dropdown(
                        ["none", "switch_down", "server_overload", "cascade"],
                        value="none",
                        label="Inject Fault"
                    )
                    infra_btn = gr.Button("Run Analysis")
                with gr.Column():
                    infra_output = gr.JSON(label="Analysis Results")

        # Tab 6: Enterprise
        with gr.TabItem("Enterprise"):
            gr.Markdown("""
## 🚀 ARF Enterprise – Governed Execution for Autonomous Infrastructure
...
""")
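
    # Shared feedback row: rates whichever analysis ran last (tracked via the
    # global last_task_category) and feeds the outcome into ai_risk_engine's
    # per-category priors through feedback() defined above.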
""") # Feedback row with gr.Row(): feedback_up = gr.Button("👍 Correct") feedback_down = gr.Button("👎 Incorrect") feedback_msg = gr.Textbox(label="Feedback", interactive=False) # Wire events text_btn.click( fn=lambda task, p: asyncio.run(handle_text(task, p)), inputs=[text_task, text_prompt], outputs=text_output ) img_btn.click( fn=lambda p, s: asyncio.run(handle_image(p, s)), inputs=[img_prompt, img_steps], outputs=[img_output, img_json] ) audio_btn.click( fn=lambda f: asyncio.run(handle_audio(f)), inputs=audio_input, outputs=audio_output ) refresh_btn.click( fn=lambda f, h: asyncio.run(read_iot_sensors(f, h)), inputs=[fault_type, iot_state], outputs=[sensor_display, diag_display, pred_display, temp_plot, iot_state] ) infra_btn.click( fn=lambda f, s: asyncio.run(handle_infra(f, s)), inputs=[infra_fault, infra_state], outputs=[infra_output, infra_state] ) feedback_up.click(fn=lambda: feedback(True), outputs=feedback_msg) feedback_down.click(fn=lambda: feedback(False), outputs=feedback_msg) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)