import os import json import re from datetime import datetime, timezone import torch import gradio as gr import pandas as pd # --- LangChain & Groq Imports --- from langchain_groq import ChatGroq from langchain_community.vectorstores import FAISS from langchain_huggingface import HuggingFaceEmbeddings from langchain_core.documents import Document from langchain_core.prompts import PromptTemplate # --- 1. Setup API Key for Groq --- # Ensure you add GROQ_API_KEY to your Hugging Face Space Secrets GROQ_API_KEY = os.environ.get('GROQ_API_KEY') if not GROQ_API_KEY: raise ValueError("š“ GROQ_API_KEY not found. Please add it to your Hugging Face Space Secrets.") # --- 2. Build the RAG Chain & Feedback System --- FMEA_DATA_FILE = '10000fmea_data.csv' FEEDBACK_FILE = 'fmea_feedback.csv' QA_CHAIN = None RETRIEVER = None LLM = None PROMPT = None FMEA_DF = None DOCUMENTS = None feedback_vector_store = None embeddings = None DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' print(f"ā Using device: {DEVICE}") # --- FEEDBACK LOOP PART 1: Saving, Normalizing, and Loading Feedback --- def normalize_action(text: str) -> str: return re.sub(r'\s+', ' ', str(text).strip().lower()) def load_feedback_stats(): if not os.path.exists(FEEDBACK_FILE): return {} try: feedback_df = pd.read_csv(FEEDBACK_FILE) if feedback_df.empty: return {} if "rating" not in feedback_df.columns: return {} stats = feedback_df.groupby('action')['rating'].agg(['mean', 'count']).to_dict('index') return stats except pd.errors.EmptyDataError: return {} def ensure_feedback_schema(): target_cols = ["action", "rating", "feedback_type", "timestamp_utc"] if not os.path.exists(FEEDBACK_FILE): return try: existing_df = pd.read_csv(FEEDBACK_FILE) if existing_df.empty: pd.DataFrame(columns=target_cols).to_csv(FEEDBACK_FILE, index=False) return changed = False for col in target_cols: if col not in existing_df.columns: existing_df[col] = "" changed = True if changed: existing_df = existing_df[target_cols] existing_df.to_csv(FEEDBACK_FILE, index=False) except pd.errors.EmptyDataError: pd.DataFrame(columns=target_cols).to_csv(FEEDBACK_FILE, index=False) def save_feedback(action, feedback_choice, display_df): if not action: return "Please select a recommendation from the table first.", display_df choice_map = { "š Thumbs Up": ("thumbs_up", 10), "š Thumbs Down": ("thumbs_down", 3) } feedback_type, rating = choice_map.get(feedback_choice, ("thumbs_up", 10)) timestamp_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") norm_action = normalize_action(action) new_feedback = pd.DataFrame([{ 'action': norm_action, 'rating': int(rating), 'feedback_type': feedback_type, 'timestamp_utc': timestamp_utc }]) if not os.path.exists(FEEDBACK_FILE): new_feedback.to_csv(FEEDBACK_FILE, index=False) else: ensure_feedback_schema() new_feedback.to_csv(FEEDBACK_FILE, mode='a', header=False, index=False) build_feedback_db() msg = f"ā Feedback saved ({feedback_choice}) for: {action} at {timestamp_utc}" # Update the displayed table dynamically if display_df is not None and not display_df.empty: try: feedback_stats = load_feedback_stats() default_stat = {'mean': 0, 'count': 0} stats_list = [feedback_stats.get(normalize_action(act), default_stat) for act in display_df['Recommended Action']] display_df['Avg. Feedback'] = [f"{stat['mean']:.2f}/10 ({int(stat['count'])})" for stat in stats_list] except Exception as e: print(f"Error updating display_df: {e}") return msg, display_df def build_feedback_db(): global feedback_vector_store if not os.path.exists(FEEDBACK_FILE): return try: feedback_df = pd.read_csv(FEEDBACK_FILE) if feedback_df.empty: return except pd.errors.EmptyDataError: return avg_ratings = feedback_df.groupby('action')['rating'].mean() highly_rated_actions = avg_ratings[avg_ratings > 7].index.tolist() if highly_rated_actions and embeddings: print(f"Found {len(highly_rated_actions)} highly-rated actions. Building feedback vector store...") feedback_vector_store = FAISS.from_texts(highly_rated_actions, embeddings) print("ā Feedback vector store is ready.") def keyword_retrieve_documents(search_query: str, k: int = 2): if FMEA_DF is None or DOCUMENTS is None or FMEA_DF.empty: return [] tokens = [tok for tok in re.findall(r"[a-z0-9]+", str(search_query).lower()) if len(tok) >= 3] if not tokens: return DOCUMENTS[:k] scores = [] for idx, text in enumerate(FMEA_DF["__search_text"]): token_hits = sum(1 for tok in tokens if tok in text) if token_hits: scores.append((token_hits, idx)) if not scores: return DOCUMENTS[:k] scores.sort(key=lambda x: x[0], reverse=True) top_indices = [idx for _, idx in scores[:k]] return [DOCUMENTS[idx] for idx in top_indices] # --- build_rag_chain --- def build_rag_chain(): global QA_CHAIN, RETRIEVER, LLM, PROMPT, FMEA_DF, DOCUMENTS, feedback_vector_store, embeddings try: print(f"Loading FMEA data from {FMEA_DATA_FILE}...") fmea_df = pd.read_csv(FMEA_DATA_FILE).fillna("") documents = [] for idx, row in fmea_df.iterrows(): page_content = "\n".join([f"{col}: {row[col]}" for col in fmea_df.columns]) metadata = {"row": int(idx)} if "Failure_Mode" in fmea_df.columns: metadata["source"] = str(row["Failure_Mode"]) documents.append(Document(page_content=page_content, metadata=metadata)) search_cols = [c for c in ["Failure_Mode", "Effect", "Cause", "Recommended_Action", "Responsible_Department"] if c in fmea_df.columns] fmea_df["__search_text"] = fmea_df[search_cols].astype(str).agg(" ".join, axis=1).str.lower() FMEA_DF = fmea_df DOCUMENTS = documents print(f"ā Successfully loaded {len(documents)} records.") print("Initializing local HuggingFace embedding model...") try: embeddings = HuggingFaceEmbeddings( model_name='all-MiniLM-L6-v2', model_kwargs={'device': DEVICE} ) print("ā Local embedding model loaded.") build_feedback_db() print("Creating embeddings and building main FAISS vector store...") main_vector_store = FAISS.from_documents(documents, embeddings) RETRIEVER = main_vector_store.as_retriever(search_kwargs={"k": 2}) print("ā Main vector store created successfully.") except Exception as embed_error: embeddings = None RETRIEVER = None feedback_vector_store = None print(f"ā ļø Embedding setup failed, using keyword retrieval fallback. Details: {embed_error}") # --- UPDATED TO USE LLAMA 3.3 VIA GROQ --- llm = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.2) prompt_template = """ You are an expert FMEA analyst. Your task is to generate the TOP 3 recommended actions for the given failure. The user has provided their current S, O, and D scores. For EACH recommendation, you must also estimate the revised S, O, and D scores (1-10) that would result *after* that action is successfully implemented. - **new_S (Severity):** This score should *usually* stay the same as the original Severity. - **new_O (Occurrence):** This score should be *lower* than the original Occurrence. - **new_D (Detection):** This score should be *lower* than the original Detection (as the action makes the failure easier to detect). CONTEXT (Historical data and user feedback): {context} QUESTION (The new failure and its current scores): {question} INSTRUCTIONS: Format your entire response as a single, valid JSON object with a key "recommendations" which is a list of 3 objects. Each object must have these keys: "rank", "action", "action_details", "department", "ai_score", "new_S", "new_O", "new_D". - "rank": The rank of the recommendation (1, 2, 3). - "action": The recommended action text. - "action_details": 2-3 sentences explaining why this action works and practical implementation notes. - "department": The most likely responsible department. - "ai_score": Confidence score (1-100) for this recommendation. - "new_S": Your estimated new Severity score (1-10). - "new_O": Your estimated new Occurrence score (1-10). - "new_D": Your estimated new Detection score (1-10). CRITICAL: Output ONLY the raw JSON object. Do not include markdown formatting like ```json or any introductory text. """ PROMPT = PromptTemplate(template=prompt_template, input_variables=["context", "question"]) LLM = llm QA_CHAIN = True print("ā RAG model is ready.") return True except Exception as e: print(f"š“ An error occurred during RAG setup: {e}") return False # --- 3. Gradio Interface Logic --- def fmea_rag_interface(mode, effect, cause, severity, occurrence, detection): if QA_CHAIN is None or LLM is None or PROMPT is None: return "RAG Model is not initialized.", pd.DataFrame(), "" rpn = severity * occurrence * detection rpn_text = f"Current RPN (SĆOĆD): {int(rpn)}" query = ( f"For a failure with Failure Mode='{mode}', Effect='{effect}', and Cause='{cause}', " f"what are the top 3 most appropriate recommended actions? " f"The current scores are: Severity={severity}, Occurrence={occurrence}, Detection={detection}." ) if RETRIEVER is not None: docs = RETRIEVER.invoke(query) else: docs = keyword_retrieve_documents(f"{mode} {effect} {cause}", k=2) context_from_history = "\n---\n".join([doc.page_content for doc in docs]) context_from_feedback = "" if feedback_vector_store: feedback_docs = feedback_vector_store.similarity_search(query, k=3) if feedback_docs: feedback_actions = "\n".join([doc.page_content for doc in feedback_docs]) context_from_feedback = f"\n\n--- Highly-Rated Actions from User Feedback ---\n{feedback_actions}" combined_context = f"--- Historical FMEA Entries ---\n{context_from_history}{context_from_feedback}" try: llm_input = PROMPT.format(context=combined_context, question=query) llm_response = LLM.invoke(llm_input) # --- IMPROVED JSON PARSING FOR LLAMA --- raw_output = str(getattr(llm_response, "content", llm_response)).strip() # Find everything between the first '{' and the last '}' match = re.search(r'\{.*\}', raw_output, re.DOTALL) if match: json_text = match.group(0) else: # Fallback if the regex fails json_text = raw_output.replace("```json", "").replace("```", "").strip() data = json.loads(json_text) output_df = pd.DataFrame(data['recommendations']) if 'action_details' not in output_df.columns: output_df['action_details'] = "No additional details provided." feedback_stats = load_feedback_stats() default_stat = {'mean': 0, 'count': 0} stats_list = [feedback_stats.get(normalize_action(action), default_stat) for action in output_df['action']] output_df['avg_feedback'] = [stat['mean'] for stat in stats_list] output_df['feedback_count'] = [stat['count'] for stat in stats_list] output_df['new_S'] = output_df['new_S'].astype(int) output_df['new_O'] = output_df['new_O'].astype(int) output_df['new_D'] = output_df['new_D'].astype(int) output_df['new_RPN'] = output_df['new_S'] * output_df['new_O'] * output_df['new_D'] rpn_change_list = [f"{int(rpn)} ā {int(new_rpn)}" for new_rpn in output_df['new_RPN']] display_df = pd.DataFrame({ "Rank": output_df['rank'], "Recommended Action": output_df['action'], "Action Details": output_df['action_details'], "Department": output_df['department'], "AI Confidence": [f"{score}%" for score in output_df['ai_score']], "Avg. Feedback": [f"{avg:.2f}/10 ({int(count)})" for avg, count in zip(output_df['avg_feedback'], output_df['feedback_count'])], "Revised RPN": rpn_change_list }) except Exception as e: print(f"Error parsing LLM output: {e}\nRaw Output was: {raw_output if 'raw_output' in locals() else 'None'}") return rpn_text, pd.DataFrame({"Error": [f"Could not parse AI response: {e}"]}), None return rpn_text, display_df, output_df def get_level_info(val): levels = { 10: "Hazardous", 9: "Serious", 8: "Extreme", 7: "Major", 6: "Significant", 5: "Moderate", 4: "Minor", 3: "Slight", 2: "Very Slight", 1: "No Effect" } return gr.update(info=f"Level: {levels.get(val, '')}") # --- 6. Main Application Execution --- if build_rag_chain(): print("\nš Launching Gradio Interface...") with gr.Blocks(theme=gr.themes.Default(primary_hue=gr.themes.colors.blue)) as demo: gr.Markdown("