""" Insta-AutoApp v3 — 6-Agent Pipeline with InferenceClient LLM AI-powered symptom triage for 2023 Ford Bronco owners. Agents: 1. IntakeAgent — validates & normalizes user symptom input 2. ProfileAgent — injects vehicle profile context 3. ClarificationAgent — generates Bronco-specific follow-up questions 4. RetrievalAgent — RAG retrieval from OEM manual (FAISS + keyword fallback) 5. DiagnosticAgent — LLM-powered triage producing structured 4-field output 6. PresentationAgent — formats branded Triage Card with safety disclaimer Team Data Mavericks · Nasser Chaudhry · Miriam Camacho · Neil Driscoll ANLY 601 · Mays Business School · Texas A&M University """ import html import logging import os import re import time from dataclasses import dataclass, field from typing import Optional import gradio as gr from huggingface_hub import InferenceClient from config import ( APP_TITLE, APP_DESCRIPTION, DISCLAIMER_BANNER, DISCLAIMER_RESPONSE, ERROR_API_UNAVAILABLE, ERROR_NOT_IN_MANUAL, TRIM_OPTIONS, ENGINE_OPTIONS, PACKAGE_OPTIONS, TOP_TYPE_OPTIONS, MILEAGE_MIN, MILEAGE_MAX, MILEAGE_DEFAULT, FALLBACK_FOLLOWUP_QUESTIONS, SAFETY_CRITICAL_KEYWORDS, HF_API_TOKEN, MAX_RETRIES, RETRY_DELAY, ) from prompts import ( FOLLOWUP_SYSTEM_PROMPT, TRIAGE_SYSTEM_PROMPT, format_vehicle_profile, format_followup_context, format_retrieved_context, ) from rag_pipeline import get_rag_pipeline, initialize_rag logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────────────── # LLM Client — tries a chain of models until one answers # ────────────────────────────────────────────────────────────────────── # HF Inference Providers sometimes route to backends that 404 (e.g. Novita for # Mistral). A fallback chain makes this robust: first model that answers wins, # and we cache it for subsequent calls. 
# Candidate models in priority order. An optional HF_MODEL_ID override goes
# first; blanks are dropped and duplicates removed (order preserved) so that a
# model named both in the environment and in the defaults is tried only once.
MODEL_CHAIN = list(
    dict.fromkeys(
        candidate.strip()
        for candidate in (
            os.getenv("HF_MODEL_ID", ""),
            "deepseek-ai/DeepSeek-V3-0324",
            "meta-llama/Llama-3.3-70B-Instruct",
            "Qwen/Qwen2.5-7B-Instruct",
            "HuggingFaceH4/zephyr-7b-beta",
        )
        if candidate and candidate.strip()
    )
)


class LLMClient:
    """InferenceClient wrapper that tries a chain of models until one answers."""

    def __init__(self):
        """Create the underlying client when a token is configured.

        Leaves ``_client`` as ``None`` (so ``is_configured()`` is False) when
        no token is set or the client constructor raises.
        """
        self.token = HF_API_TOKEN
        self.models = list(MODEL_CHAIN)
        self._client = None
        self._working_model = None  # last model that produced an answer
        if not self.token:
            logger.warning("HF_API_TOKEN not set. LLM calls will fail.")
            return
        try:
            self._client = InferenceClient(token=self.token, timeout=30)
            logger.info("LLMClient initialized. Chain: %s", self.models)
        except Exception as e:
            logger.error("Failed to initialize InferenceClient: %s", e)

    def is_configured(self) -> bool:
        """Return True when an inference client was successfully created."""
        return self._client is not None

    def _try_model(self, model, prompt, max_new_tokens):
        """Send ``prompt`` to one model.

        Returns the stripped reply text, or ``None`` when the call raises or
        the response carries no content. Failures are logged, not raised, so
        the caller can move on to the next model.
        """
        try:
            response = self._client.chat_completion(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_new_tokens,
                temperature=0.7,
                top_p=0.9,
            )
            if hasattr(response, "choices") and response.choices:
                content = response.choices[0].message.content
                if content:
                    return content.strip()
        except Exception as e:
            logger.warning(
                "Model %s failed: %s: %s", model, type(e).__name__, str(e)[:200]
            )
        return None

    def generate(self, prompt: str, max_new_tokens: int = 1024) -> Optional[str]:
        """Return a completion from the first model that answers.

        The previously successful model is tried first. If it fails, the
        chain is walked in order, skipping that model so it is not called
        twice in a row. Returns ``None`` when the client is not configured or
        every model fails.
        """
        if not self._client:
            return None

        just_failed = None
        if self._working_model:
            result = self._try_model(self._working_model, prompt, max_new_tokens)
            if result:
                return result
            logger.warning(
                "Cached model %s failed, re-trying chain", self._working_model
            )
            just_failed = self._working_model
            self._working_model = None

        for model in self.models:
            if model == just_failed:
                # Already attempted a moment ago; don't pay for it again.
                continue
            logger.info("Trying model: %s", model)
            result = self._try_model(model, prompt, max_new_tokens)
            if result:
                self._working_model = model
                logger.info("✓ %s succeeded, caching", model)
                return result

        logger.error("All %d models in chain failed", len(self.models))
        return None


_llm_client: Optional[LLMClient] = None


def get_llm_client() -> LLMClient:
    """Return the shared LLMClient, creating it on first use."""
    global _llm_client
    if _llm_client is None:
        _llm_client = LLMClient()
    return _llm_client


# ──────────────────────────────────────────────────────────────────────
# Pipeline Context
# ──────────────────────────────────────────────────────────────────────
@dataclass
class PipelineContext:
    """Mutable state handed from agent to agent during one triage run."""

    raw_symptom: str = ""            # user text as submitted
    normalized_symptom: str = ""     # set by IntakeAgent (whitespace collapsed)
    vehicle_profile: str = ""        # set by ProfileAgent
    is_valid: bool = False           # set by IntakeAgent
    validation_error: str = ""       # user-facing reason when is_valid is False
    followup_questions: list = field(default_factory=list)
    followup_answers: list = field(default_factory=list)
    using_fallback_questions: bool = False
    retrieved_chunks: list = field(default_factory=list)
    triage_fields: dict = field(default_factory=dict)
    html_output: str = ""
    pipeline_trace: list = field(default_factory=list)
    safety_flagged: bool = False     # set by IntakeAgent on keyword match

    def trace(self, agent, status, msg):
        """Append a trace entry for ``agent`` and mirror it to the log."""
        icon = {"ok": "✓", "warn": "⚠", "skip": "⊘", "fail": "✗"}.get(status, "·")
        self.pipeline_trace.append(
            {"agent": agent, "status": status, "icon": icon, "msg": msg}
        )
        logger.info("[%s] %s: %s", agent, status.upper(), msg)


# ──────────────────────────────────────────────────────────────────────
# Agents
# ──────────────────────────────────────────────────────────────────────
class IntakeAgent:
    """Validates and normalizes the raw symptom text."""

    MIN_LEN = 8

    def process(self, ctx):
        """Validate ``ctx.raw_symptom`` and populate the normalized fields.

        On rejection sets ``is_valid`` False and ``validation_error``. On
        success stores the whitespace-collapsed text and raises
        ``safety_flagged`` when a safety-critical keyword occurs in it.
        Returns ``ctx``.
        """
        raw = (ctx.raw_symptom or "").strip()
        if not raw:
            ctx.validation_error = "Please describe your symptom before submitting."
            ctx.is_valid = False
            ctx.trace("IntakeAgent", "fail", "Empty input rejected")
            return ctx

        # Collapse whitespace first so padding cannot satisfy the minimum.
        normalized = re.sub(r"\s+", " ", raw)
        if len(normalized) < self.MIN_LEN:
            ctx.validation_error = "Please provide a bit more detail about what's happening."
            ctx.is_valid = False
            ctx.trace("IntakeAgent", "fail", f"Input too short ({len(normalized)} chars)")
            return ctx

        ctx.normalized_symptom = normalized
        ctx.is_valid = True
        ctx.trace("IntakeAgent", "ok", f"Normalized {len(raw)} chars of input")

        lowered = normalized.lower()
        if any(kw.lower() in lowered for kw in SAFETY_CRITICAL_KEYWORDS):
            ctx.safety_flagged = True
            ctx.trace(
                "IntakeAgent",
                "warn",
                "Safety-critical keywords detected → conservative bias engaged",
            )
        return ctx


class ProfileAgent:
    """Injects the formatted vehicle profile into the context."""

    def process(self, ctx, trim, engine, package, top_type, mileage):
        """Store the formatted profile on ``ctx`` and trace a summary.

        ``mileage`` is only coerced to int for the trace line; values that
        cannot be converted are shown as 0 there.
        """
        ctx.vehicle_profile = format_vehicle_profile(
            trim, engine, package, top_type, mileage
        )
        try:
            mi = int(mileage) if mileage else 0
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers int(float("inf")).
            mi = 0
        ctx.trace("ProfileAgent", "ok", f"{trim} · {engine} · {package} · {mi:,} mi")
        return ctx


class ClarificationAgent:
    """Decides whether follow-up questions are needed and generates them."""

    # Pairs of substrings that, when both present, indicate several symptoms.
    MULTI_SYMPTOM_TRIGGERS = [
        ("check engine", "4x4"),
        ("check engine", "transmission"),
        ("smell", "light"),
        ("noise", "light"),
        ("brake", "steering"),
        ("burning", "light"),
        ("4x4", "hesitat"),
    ]

    def _needs_clarification(self, symptom):
        """Return True for multi-symptom, short (<12 words) or vague input.

        All checks are plain substring tests on the lower-cased text.
        """
        s = symptom.lower()
        if any(a in s and b in s for a, b in self.MULTI_SYMPTOM_TRIGGERS):
            return True
        if len(symptom.split()) < 12:
            return True
        return any(
            p in s for p in ["something", "weird", "strange", "acting up", "off"]
        )

    @staticmethod
    def _use_fallback(ctx, reason):
        """Install the standard follow-up questions and trace ``reason``."""
        ctx.followup_questions = FALLBACK_FOLLOWUP_QUESTIONS.copy()
        ctx.using_fallback_questions = True
        ctx.trace("ClarificationAgent", "warn", reason)
        return ctx

    def process(self, ctx):
        """Populate ``ctx.followup_questions`` (at most two) when needed.

        Falls back to the standard questions when the LLM is not configured,
        the call fails, or no question can be parsed from the reply.
        """
        if not self._needs_clarification(ctx.normalized_symptom):
            ctx.trace(
                "ClarificationAgent", "skip", "Input specific enough — follow-ups skipped"
            )
            return ctx

        llm = get_llm_client()
        if not llm.is_configured():
            return self._use_fallback(
                ctx, "LLM not configured — using standard follow-ups"
            )

        prompt = FOLLOWUP_SYSTEM_PROMPT.format(
            vehicle_profile=ctx.vehicle_profile,
            symptom=ctx.normalized_symptom,
        )
        response = llm.generate(prompt, max_new_tokens=256)
        if response is None:
            return self._use_fallback(ctx, "LLM call failed — using standard follow-ups")

        questions = []
        for line in response.strip().split("\n"):
            # Drop list numbering / bullets before judging the line.
            line = line.strip().lstrip("0123456789.)-• ").strip()
            if line and len(line) > 10 and "?" in line:
                questions.append(line)
        questions = questions[:2]

        if not questions:
            return self._use_fallback(ctx, "No valid questions parsed — using fallback")

        ctx.followup_questions = questions
        ctx.trace(
            "ClarificationAgent",
            "ok",
            f"Generated {len(questions)} Bronco-specific follow-up(s)",
        )
        return ctx


class RetrievalAgent:
    """Builds the retrieval query and fetches manual chunks."""

    def process(self, ctx):
        """Fill ``ctx.retrieved_chunks`` from the RAG pipeline.

        The query is the normalized symptom followed by each answered
        follow-up as "question answer". Leaves the chunk list empty when the
        pipeline is not loaded.
        """
        parts = [ctx.normalized_symptom]
        # zip stops at the shorter list, so unanswered questions are ignored.
        parts.extend(
            f"{q} {a}" for q, a in zip(ctx.followup_questions, ctx.followup_answers)
        )
        query = " ".join(parts)

        rag = get_rag_pipeline()
        if not rag.is_loaded():
            ctx.retrieved_chunks = []
            ctx.trace("RetrievalAgent", "fail", "RAG pipeline not loaded")
            return ctx

        chunks = rag.retrieve(query) or []
        ctx.retrieved_chunks = chunks
        mode = "FAISS semantic" if getattr(rag, "_use_faiss", False) else "keyword fallback"
        ctx.trace(
            "RetrievalAgent", "ok", f"Retrieved {len(chunks)} OEM manual chunks ({mode})"
        )
        return ctx


class DiagnosticAgent:
    """Runs the triage prompt and parses the reply into four fields."""

    @staticmethod
    def _should_escalate(urgency):
        """Return True when a safety-flagged case must be raised to Urgent.

        Never escalates a "do not drive" verdict (that would be a downgrade).
        Otherwise escalates when the text mentions "safe" or "monitor", or
        when it does not mention "urgent" at all (including empty text).
        Matching is by substring so decorated values such as
        "Monitor - schedule service" are caught.
        """
        level = (urgency or "").lower()
        if "do not drive" in level:
            return False
        if "safe" in level or "monitor" in level:
            return True
        return "urgent" not in level

    def process(self, ctx):
        """Generate triage fields for ``ctx`` and store them on it.

        Leaves ``ctx.triage_fields`` untouched when the LLM is unavailable or
        generation fails. For safety-flagged input the urgency is never left
        below "Urgent".
        """
        llm = get_llm_client()
        if not llm.is_configured():
            ctx.trace("DiagnosticAgent", "fail", "LLM not configured")
            return ctx

        followup_ctx = format_followup_context(
            ctx.followup_questions[:len(ctx.followup_answers)],
            ctx.followup_answers,
        )
        prompt = TRIAGE_SYSTEM_PROMPT.format(
            vehicle_profile=ctx.vehicle_profile,
            symptom=ctx.normalized_symptom,
            followup_context=followup_ctx,
            retrieved_context=format_retrieved_context(ctx.retrieved_chunks),
        )
        response = llm.generate(prompt, max_new_tokens=1024)
        if response is None:
            ctx.trace("DiagnosticAgent", "fail", "LLM generation failed after retries")
            return ctx

        fields = self._parse(response)
        if ctx.safety_flagged and self._should_escalate(fields.get("urgency", "")):
            fields["urgency"] = "Urgent"
            ctx.trace(
                "DiagnosticAgent",
                "warn",
                "Urgency escalated to Urgent (safety-critical keywords)",
            )
        ctx.triage_fields = fields
        ctx.trace(
            "DiagnosticAgent",
            "ok",
            f"Triage generated — Urgency: {fields.get('urgency', '?')}",
        )
        return ctx

    @staticmethod
    def _parse(text):
        """Extract urgency / meaning / next_step / citation from ``text``.

        Each field is located by a case-insensitive label regex and capped at
        800 characters. When nothing matches, the raw text (first 500 chars)
        becomes the meaning and the other fields get generic defaults.
        """
        fields = {"urgency": "", "meaning": "", "next_step": "", "citation": ""}
        patterns = {
            "urgency": r"(?:urgency(?:\s+level)?|\*\*urgency[^*]*\*\*)\s*[:\-]?\s*(.+?)(?=\n|$)",
            "meaning": r"(?:likely\s+meaning|meaning|cause)\s*[:\-]?\s*(.+?)(?=\n(?:recommended|next|oem|citation|\*\*)|\Z)",
            "next_step": r"(?:recommended\s+next\s+step|next\s+step|action)\s*[:\-]?\s*(.+?)(?=\n(?:oem|citation|\*\*)|\Z)",
            "citation": r"(?:oem\s+citation|citation|source|reference)\s*[:\-]?\s*(.+?)(?=\n\n|\Z)",
        }
        for key, pat in patterns.items():
            m = re.search(pat, text, re.IGNORECASE | re.DOTALL)
            if m:
                # Trim surrounding markdown emphasis, then any left inside.
                val = m.group(1).strip().strip("*").strip()
                val = re.sub(r"\*\*", "", val)
                fields[key] = val[:800]
        if not any(fields.values()):
            fields["meaning"] = text.strip()[:500]
            fields["urgency"] = "Monitor"
            fields["next_step"] = "Consult a Ford-certified technician for inspection."
            fields["citation"] = "See 2023 Ford Bronco Owner's Manual."
        return fields
return fields class PresentationAgent: URGENCY_STYLES = { "safe": ("#1F7A3A", "#E8F5EB", "SAFE"), "monitor": ("#B68B00", "#FFF7D6", "MONITOR"), "urgent": ("#C84A1A", "#FFEDE0", "URGENT"), "do not drive": ("#A01818", "#FDE6E6", "DO NOT DRIVE"), } def process(self, ctx): if not ctx.triage_fields: ctx.html_output = self._error_card(ERROR_API_UNAVAILABLE) ctx.trace("PresentationAgent", "fail", "No triage fields to render") return ctx if not ctx.retrieved_chunks: ctx.html_output = self._error_card(ERROR_NOT_IN_MANUAL) ctx.trace("PresentationAgent", "warn", "No retrieved chunks — showing not-in-manual notice") return ctx ctx.html_output = self._triage_card(ctx.triage_fields) ctx.trace("PresentationAgent", "ok", "Triage card rendered") return ctx def _triage_card(self, f): urg_key = f.get("urgency", "monitor").lower().strip() matched = "monitor" for k in self.URGENCY_STYLES: if k in urg_key: matched = k break fg, bg, label = self.URGENCY_STYLES[matched] esc = lambda s: html.escape(s or "—") return f"""