Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import re | |
| from datetime import datetime, timezone | |
| import torch | |
| import gradio as gr | |
| import pandas as pd | |
| # --- LangChain & Groq Imports --- | |
| from langchain_groq import ChatGroq | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_core.documents import Document | |
| from langchain_core.prompts import PromptTemplate | |
# --- 1. Setup API Key for Groq ---
# The key is read from the process environment; on Hugging Face it has to be
# configured as a Space secret named GROQ_API_KEY. Startup is aborted when the
# variable is unset or empty.
GROQ_API_KEY = os.getenv('GROQ_API_KEY')
if GROQ_API_KEY is None or GROQ_API_KEY == "":
    raise ValueError("π΄ GROQ_API_KEY not found. Please add it to your Hugging Face Space Secrets.")
# --- 2. Build the RAG Chain & Feedback System ---
# Data files, given as paths relative to the working directory.
FMEA_DATA_FILE = '10000fmea_data.csv'
FEEDBACK_FILE = 'fmea_feedback.csv'

# Module-level state. Everything starts as None and is filled in by
# build_rag_chain() (and build_feedback_db() for feedback_vector_store).
QA_CHAIN = RETRIEVER = LLM = PROMPT = None
FMEA_DF = DOCUMENTS = None
feedback_vector_store = embeddings = None

# Run the embedding model on the GPU when torch can see one, else on the CPU.
DEVICE = 'cpu'
if torch.cuda.is_available():
    DEVICE = 'cuda'
print(f"β Using device: {DEVICE}")
# --- FEEDBACK LOOP PART 1: Saving, Normalizing, and Loading Feedback ---
def normalize_action(text: str) -> str:
    """Return *text* trimmed, lower-cased, with whitespace runs collapsed.

    The argument is converted with ``str()`` first, so non-string values are
    normalized through their string form. The result is the key under which
    feedback for an action is stored and looked up.
    """
    cleaned = str(text).strip().lower()
    return re.sub(r'\s+', ' ', cleaned)
def load_feedback_stats():
    """Aggregate the stored feedback into per-action rating statistics.

    Returns:
        A dict mapping each action string found in ``FEEDBACK_FILE`` to
        ``{'mean': <average rating>, 'count': <number of ratings>}``.
        An empty dict is returned when the file is missing or empty, when it
        lacks the ``action`` or ``rating`` column, or when it holds no
        numeric rating at all.
    """
    if not os.path.exists(FEEDBACK_FILE):
        return {}
    try:
        feedback_df = pd.read_csv(FEEDBACK_FILE)
    except pd.errors.EmptyDataError:
        # Zero-byte file: nothing has been recorded yet.
        return {}

    # Both columns are needed by the group-by below; a file without either of
    # them carries no usable statistics.
    if feedback_df.empty or not {'action', 'rating'}.issubset(feedback_df.columns):
        return {}

    # Blank or non-numeric ratings would make the aggregation raise or produce
    # NaN means, so they are coerced to NaN and dropped before aggregating.
    numeric_rating = pd.to_numeric(feedback_df['rating'], errors='coerce')
    usable = feedback_df.assign(rating=numeric_rating).dropna(subset=['action', 'rating'])
    if usable.empty:
        return {}

    return usable.groupby('action')['rating'].agg(['mean', 'count']).to_dict('index')
def ensure_feedback_schema():
    """Make an existing feedback CSV match the current column layout.

    The expected header is ``action, rating, feedback_type, timestamp_utc``
    in exactly that order. This matters because ``save_feedback`` appends new
    rows without a header, so values land under whatever columns the file
    already declares.

    Behaviour:
        * Missing file: nothing is done.
        * Zero-byte or header-only file: rewritten with just the expected header.
        * Any other header mismatch (missing column, different order, extra
          columns): the file is rewritten with exactly the expected columns in
          the expected order. Missing columns become blank and columns outside
          the schema are dropped.
    """
    target_cols = ["action", "rating", "feedback_type", "timestamp_utc"]
    if not os.path.exists(FEEDBACK_FILE):
        return

    try:
        existing_df = pd.read_csv(FEEDBACK_FILE)
    except pd.errors.EmptyDataError:
        # Zero-byte file: start over with a header-only file.
        pd.DataFrame(columns=target_cols).to_csv(FEEDBACK_FILE, index=False)
        return

    if existing_df.empty:
        pd.DataFrame(columns=target_cols).to_csv(FEEDBACK_FILE, index=False)
        return

    # Compare the full ordered header, not just membership: a file that has
    # all four columns in another order would otherwise be left as is and the
    # next header-less append would write values under the wrong columns.
    if list(existing_df.columns) != target_cols:
        existing_df.reindex(columns=target_cols).to_csv(FEEDBACK_FILE, index=False)
def save_feedback(action, feedback_choice, display_df):
    """Record one thumbs-up/down vote for *action* and refresh the shown table.

    Args:
        action: Text of the selected recommendation; a falsy value means
            nothing is selected and no feedback is written.
        feedback_choice: The radio label chosen by the user. Labels that are
            not recognised are recorded as a thumbs up.
        display_df: The recommendations table currently displayed, or None.

    Returns:
        ``(status_message, display_df)``. When the table is non-empty its
        ``'Avg. Feedback'`` column is recomputed in place from the feedback
        file; failures while doing so are printed and otherwise ignored.
    """
    if not action:
        return "Please select a recommendation from the table first.", display_df

    ratings_by_choice = {
        "π Thumbs Up": ("thumbs_up", 10),
        "π Thumbs Down": ("thumbs_down", 3),
    }
    feedback_type, rating = ratings_by_choice.get(feedback_choice, ("thumbs_up", 10))
    timestamp_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    record = pd.DataFrame([{
        'action': normalize_action(action),
        'rating': int(rating),
        'feedback_type': feedback_type,
        'timestamp_utc': timestamp_utc,
    }])

    # A new file gets a header; an existing one is first brought to the
    # current schema and then appended to without repeating the header.
    file_exists = os.path.exists(FEEDBACK_FILE)
    if file_exists:
        ensure_feedback_schema()
        record.to_csv(FEEDBACK_FILE, mode='a', header=False, index=False)
    else:
        record.to_csv(FEEDBACK_FILE, index=False)

    # Rebuild the feedback vector store so the new vote is taken into account.
    build_feedback_db()
    status = f"β Feedback saved ({feedback_choice}) for: {action} at {timestamp_utc}"

    if display_df is None or display_df.empty:
        return status, display_df

    # Update the displayed table dynamically
    try:
        stats = load_feedback_stats()
        no_feedback = {'mean': 0, 'count': 0}
        labels = []
        for shown_action in display_df['Recommended Action']:
            entry = stats.get(normalize_action(shown_action), no_feedback)
            labels.append(f"{entry['mean']:.2f}/10 ({int(entry['count'])})")
        display_df['Avg. Feedback'] = labels
    except Exception as e:
        print(f"Error updating display_df: {e}")
    return status, display_df
def build_feedback_db():
    """Rebuild the vector store of highly-rated actions from the feedback CSV.

    Actions whose mean rating in ``FEEDBACK_FILE`` is above 7 are embedded
    with the module-level ``embeddings`` model and stored in the module-level
    ``feedback_vector_store``.

    Nothing is changed when the feedback file is missing, empty, or lacks the
    ``action``/``rating`` columns, or when no embedding model is loaded.
    When a model is loaded but no action is currently above the threshold,
    the store is reset to ``None`` so that actions which have since been
    down-voted stop being offered as "highly rated" context.
    """
    global feedback_vector_store
    if not os.path.exists(FEEDBACK_FILE):
        return
    try:
        feedback_df = pd.read_csv(FEEDBACK_FILE)
    except pd.errors.EmptyDataError:
        return
    if feedback_df.empty or not {'action', 'rating'}.issubset(feedback_df.columns):
        return

    # Blank / non-numeric ratings become NaN and are ignored by the mean.
    ratings = pd.to_numeric(feedback_df['rating'], errors='coerce')
    avg_ratings = ratings.groupby(feedback_df['action']).mean()
    # FAISS.from_texts is given plain strings regardless of how pandas typed
    # the action column.
    highly_rated_actions = [str(action) for action in avg_ratings[avg_ratings > 7].index]

    if not embeddings:
        return
    if not highly_rated_actions:
        # Drop a store built from earlier, now outdated, averages.
        feedback_vector_store = None
        return

    print(f"Found {len(highly_rated_actions)} highly-rated actions. Building feedback vector store...")
    feedback_vector_store = FAISS.from_texts(highly_rated_actions, embeddings)
    print("β Feedback vector store is ready.")
def keyword_retrieve_documents(search_query: str, k: int = 2):
    """Return up to *k* documents ranked by simple keyword overlap.

    The query is lower-cased and split into alphanumeric tokens of at least
    three characters. Each row of ``FMEA_DF["__search_text"]`` is scored by
    how many query tokens occur in it as substrings; rows with no hit are
    skipped and ties keep their original row order.

    Returns:
        ``[]`` when the FMEA data is not loaded or empty; the first *k*
        documents when the query yields no tokens or nothing matches;
        otherwise the documents of the *k* best-scoring rows.
    """
    if FMEA_DF is None or DOCUMENTS is None or FMEA_DF.empty:
        return []

    query_terms = [
        term
        for term in re.findall(r"[a-z0-9]+", str(search_query).lower())
        if len(term) >= 3
    ]
    if not query_terms:
        return DOCUMENTS[:k]

    ranked = []
    for position, haystack in enumerate(FMEA_DF["__search_text"]):
        hits = sum(term in haystack for term in query_terms)
        if hits:
            ranked.append((hits, position))
    if not ranked:
        return DOCUMENTS[:k]

    # Stable sort on the negated hit count: best first, ties in row order.
    best_first = sorted(ranked, key=lambda pair: -pair[0])
    return [DOCUMENTS[position] for _, position in best_first[:k]]
# --- build_rag_chain ---
def build_rag_chain():
    """Load the FMEA data, build the retrievers, and prepare the LLM and prompt.

    Populates the module-level ``FMEA_DF``, ``DOCUMENTS``, ``embeddings``,
    ``RETRIEVER``, ``feedback_vector_store``, ``PROMPT``, ``LLM`` and
    ``QA_CHAIN``. If the embedding model or a FAISS index cannot be created,
    the embedding-related globals are reset to ``None`` and setup continues,
    so that queries use the keyword retrieval fallback instead.

    Returns:
        True when setup completed; False when loading the data or creating
        the LLM/prompt raised (the error is printed).
    """
    global QA_CHAIN, RETRIEVER, LLM, PROMPT, FMEA_DF, DOCUMENTS, feedback_vector_store, embeddings
    try:
        print(f"Loading FMEA data from {FMEA_DATA_FILE}...")
        fmea_df = pd.read_csv(FMEA_DATA_FILE).fillna("")

        # One Document per CSV row, rendered as "column: value" lines.
        documents = []
        for idx, row in fmea_df.iterrows():
            page_content = "\n".join([f"{col}: {row[col]}" for col in fmea_df.columns])
            metadata = {"row": int(idx)}
            if "Failure_Mode" in fmea_df.columns:
                metadata["source"] = str(row["Failure_Mode"])
            documents.append(Document(page_content=page_content, metadata=metadata))

        # Lower-cased text used by keyword_retrieve_documents. If the CSV has
        # none of the preferred columns, index every column rather than
        # building the search text from nothing.
        preferred_cols = ["Failure_Mode", "Effect", "Cause", "Recommended_Action", "Responsible_Department"]
        search_cols = [c for c in preferred_cols if c in fmea_df.columns] or list(fmea_df.columns)
        fmea_df["__search_text"] = fmea_df[search_cols].astype(str).agg(" ".join, axis=1).str.lower()

        FMEA_DF = fmea_df
        DOCUMENTS = documents
        print(f"β Successfully loaded {len(documents)} records.")

        print("Initializing local HuggingFace embedding model...")
        try:
            embeddings = HuggingFaceEmbeddings(
                model_name='all-MiniLM-L6-v2',
                model_kwargs={'device': DEVICE}
            )
            print("β Local embedding model loaded.")
            # Needs `embeddings`, so it runs only once the model is loaded.
            build_feedback_db()
            print("Creating embeddings and building main FAISS vector store...")
            main_vector_store = FAISS.from_documents(documents, embeddings)
            RETRIEVER = main_vector_store.as_retriever(search_kwargs={"k": 2})
            print("β Main vector store created successfully.")
        except Exception as embed_error:
            # Best effort: without embeddings the app still works through the
            # keyword fallback, so the failure is reported but not re-raised.
            embeddings = None
            RETRIEVER = None
            feedback_vector_store = None
            print(f"β οΈ Embedding setup failed, using keyword retrieval fallback. Details: {embed_error}")

        # --- UPDATED TO USE LLAMA 3.3 VIA GROQ ---
        # NOTE(review): no api_key is passed here; presumably ChatGroq picks up
        # GROQ_API_KEY from the environment - confirm.
        llm = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.2)
        prompt_template = """
You are an expert FMEA analyst. Your task is to generate the TOP 3 recommended actions for the given failure.
The user has provided their current S, O, and D scores.
For EACH recommendation, you must also estimate the revised S, O, and D scores (1-10) that would result *after* that action is successfully implemented.
- **new_S (Severity):** This score should *usually* stay the same as the original Severity.
- **new_O (Occurrence):** This score should be *lower* than the original Occurrence.
- **new_D (Detection):** This score should be *lower* than the original Detection (as the action makes the failure easier to detect).
CONTEXT (Historical data and user feedback):
{context}
QUESTION (The new failure and its current scores):
{question}
INSTRUCTIONS:
Format your entire response as a single, valid JSON object with a key "recommendations" which is a list of 3 objects.
Each object must have these keys: "rank", "action", "action_details", "department", "ai_score", "new_S", "new_O", "new_D".
- "rank": The rank of the recommendation (1, 2, 3).
- "action": The recommended action text.
- "action_details": 2-3 sentences explaining why this action works and practical implementation notes.
- "department": The most likely responsible department.
- "ai_score": Confidence score (1-100) for this recommendation.
- "new_S": Your estimated new Severity score (1-10).
- "new_O": Your estimated new Occurrence score (1-10).
- "new_D": Your estimated new Detection score (1-10).
CRITICAL: Output ONLY the raw JSON object. Do not include markdown formatting like ```json or any introductory text.
"""
        PROMPT = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
        LLM = llm
        # Readiness flag: fmea_rag_interface only checks that this is not None.
        QA_CHAIN = True
        print("β RAG model is ready.")
        return True
    except Exception as e:
        print(f"π΄ An error occurred during RAG setup: {e}")
        return False
# --- 3. Gradio Interface Logic ---
def fmea_rag_interface(mode, effect, cause, severity, occurrence, detection):
    """Produce the top-3 recommendations for one failure description.

    Args:
        mode, effect, cause: Free-text description of the failure.
        severity, occurrence, detection: Current S/O/D scores; their product
            is reported as the current RPN.

    Returns:
        ``(rpn_text, display_df, output_df)``:
        * ``rpn_text`` - the current RPN as a label string.
        * ``display_df`` - the table shown to the user.
        * ``output_df`` - the parsed recommendations with the revised scores
          and feedback statistics added.
        When the model is not initialised the tuple is
        ``("RAG Model is not initialized.", <empty DataFrame>, "")``.
        When the LLM call or the parsing of its answer fails, ``display_df``
        is a one-column ``Error`` table and ``output_df`` is ``None``.
    """
    if QA_CHAIN is None or LLM is None or PROMPT is None:
        return "RAG Model is not initialized.", pd.DataFrame(), ""

    rpn = severity * occurrence * detection
    rpn_text = f"Current RPN (SΓOΓD): {int(rpn)}"
    query = (
        f"For a failure with Failure Mode='{mode}', Effect='{effect}', and Cause='{cause}', "
        f"what are the top 3 most appropriate recommended actions? "
        f"The current scores are: Severity={severity}, Occurrence={occurrence}, Detection={detection}."
    )

    # Vector retrieval when available, keyword matching otherwise.
    if RETRIEVER is not None:
        docs = RETRIEVER.invoke(query)
    else:
        docs = keyword_retrieve_documents(f"{mode} {effect} {cause}", k=2)
    context_from_history = "\n---\n".join([doc.page_content for doc in docs])

    context_from_feedback = ""
    if feedback_vector_store:
        feedback_docs = feedback_vector_store.similarity_search(query, k=3)
        if feedback_docs:
            feedback_actions = "\n".join([doc.page_content for doc in feedback_docs])
            context_from_feedback = f"\n\n--- Highly-Rated Actions from User Feedback ---\n{feedback_actions}"
    combined_context = f"--- Historical FMEA Entries ---\n{context_from_history}{context_from_feedback}"

    # Defined before the try so the error handler can always report it,
    # even when the LLM call itself is what failed.
    raw_output = None
    try:
        llm_input = PROMPT.format(context=combined_context, question=query)
        llm_response = LLM.invoke(llm_input)

        # --- IMPROVED JSON PARSING FOR LLAMA ---
        raw_output = str(getattr(llm_response, "content", llm_response)).strip()
        # Find everything between the first '{' and the last '}'
        match = re.search(r'\{.*\}', raw_output, re.DOTALL)
        if match:
            json_text = match.group(0)
        else:
            # Fallback if the regex fails
            json_text = raw_output.replace("```json", "").replace("```", "").strip()
        data = json.loads(json_text)

        output_df = pd.DataFrame(data['recommendations'])
        if 'action_details' not in output_df.columns:
            output_df['action_details'] = "No additional details provided."

        feedback_stats = load_feedback_stats()
        default_stat = {'mean': 0, 'count': 0}
        stats_list = [feedback_stats.get(normalize_action(action), default_stat) for action in output_df['action']]
        output_df['avg_feedback'] = [stat['mean'] for stat in stats_list]
        output_df['feedback_count'] = [stat['count'] for stat in stats_list]

        # The model's revised scores are untrusted: coerce them to numbers,
        # keep the current score where a value is missing or unparsable, and
        # clamp to the 1-10 scale the prompt asks for.
        current_scores = {'new_S': severity, 'new_O': occurrence, 'new_D': detection}
        for col, current in current_scores.items():
            revised = pd.to_numeric(output_df[col], errors='coerce').fillna(current)
            output_df[col] = revised.clip(1, 10).astype(int)
        output_df['new_RPN'] = output_df['new_S'] * output_df['new_O'] * output_df['new_D']

        rpn_change_list = [f"{int(rpn)} β {int(new_rpn)}" for new_rpn in output_df['new_RPN']]
        display_df = pd.DataFrame({
            "Rank": output_df['rank'],
            "Recommended Action": output_df['action'],
            "Action Details": output_df['action_details'],
            "Department": output_df['department'],
            "AI Confidence": [f"{score}%" for score in output_df['ai_score']],
            "Avg. Feedback": [f"{avg:.2f}/10 ({int(count)})" for avg, count in zip(output_df['avg_feedback'], output_df['feedback_count'])],
            "Revised RPN": rpn_change_list
        })
    except Exception as e:
        # UI boundary: report the failure in the table instead of raising.
        print(f"Error parsing LLM output: {e}\nRaw Output was: {raw_output}")
        return rpn_text, pd.DataFrame({"Error": [f"Could not parse AI response: {e}"]}), None

    return rpn_text, display_df, output_df
def get_level_info(val):
    """Return a Gradio update setting a slider's info text to the level name.

    Values 1 through 10 map to a named level; any other value yields an empty
    level name.
    """
    # Names listed in ascending order, so position + 1 is the score.
    names = (
        "No Effect", "Very Slight", "Slight", "Minor", "Moderate",
        "Significant", "Major", "Extreme", "Serious", "Hazardous",
    )
    levels = dict(enumerate(names, start=1))
    label = levels.get(val, '')
    return gr.update(info=f"Level: {label}")
# --- 4. Main Application Execution ---
# Build the UI and start the server only when the RAG setup succeeded.
# NOTE(review): the original indentation of this section was lost; the widget
# nesting below is reconstructed from the order of the statements - confirm
# the layout (in particular which group the submit button belongs to).
if build_rag_chain():
    print("\nπ Launching Gradio Interface...")
    with gr.Blocks(theme=gr.themes.Default(primary_hue=gr.themes.colors.blue)) as demo:
        gr.Markdown("<h1>Pangun ReliAI-FMEA</h1>")

        with gr.Group():
            gr.Markdown("## FMEA Inputs ")
            with gr.Row():
                with gr.Column(scale=2):
                    f_mode = gr.Textbox(label="Failure Mode", placeholder="e.g., Engine Overheating")
                    f_effect = gr.Textbox(label="Effect", placeholder="e.g., Reduced vehicle performance")
                    f_cause = gr.Textbox(label="Cause", placeholder="e.g., Coolant leak")
                with gr.Column(scale=1):
                    f_sev = gr.Slider(1, 10, value=5, step=1, label="Severity", info="Level: Moderate")
                    f_occ = gr.Slider(1, 10, value=5, step=1, label="Occurrence", info="Level: Moderate")
                    f_det = gr.Slider(1, 10, value=5, step=1, label="Detection", info="Level: Moderate")
            # Each slider rewrites its own info text when its value changes.
            f_sev.change(fn=get_level_info, inputs=f_sev, outputs=f_sev)
            f_occ.change(fn=get_level_info, inputs=f_occ, outputs=f_occ)
            f_det.change(fn=get_level_info, inputs=f_det, outputs=f_det)
            submit_btn = gr.Button("Get AI Recommendations", variant="primary")

        with gr.Group():
            gr.Markdown("## π‘ Top 3 AI-Generated Recommendations")
            rpn_output = gr.Textbox(label="Current RPN", interactive=False)
            recommendations_output = gr.DataFrame(
                headers=["Rank", "Recommended Action", "Action Details", "Department", "AI Confidence", "Avg. Feedback", "Revised RPN"],
                datatype=["number", "str", "str", "str", "str", "str", "str"]
            )
            # Receives the third return value of fmea_rag_interface.
            df_state = gr.State()

        with gr.Group():
            gr.Markdown("## β Provide Feedback")
            gr.Markdown("Click a row in the table above to select it, then submit a thumbs up or thumbs down.")
            selected_action_text = gr.Textbox(label="Selected for Feedback", interactive=False)
            feedback_choice = gr.Radio(
                choices=["π Thumbs Up", "π Thumbs Down"],
                value="π Thumbs Up",
                label="Your Feedback"
            )
            submit_feedback_btn = gr.Button("Submit Feedback")
            feedback_status = gr.Textbox(label="Feedback Status", interactive=False)

        def update_selection(table_df, evt: gr.SelectData):
            """Return the "Recommended Action" text of the clicked table row.

            Returns "" when the table is empty, when it has no
            "Recommended Action" column (the table can also hold the
            single-column error frame returned by fmea_rag_interface), or
            when the clicked row index is outside the table.
            """
            if table_df is None or len(table_df) == 0:
                return ""
            row_idx = evt.index[0]
            # Look the column up by name instead of by position: a positional
            # lookup raises IndexError on the one-column error table.
            if "Recommended Action" not in table_df.columns:
                return ""
            if not 0 <= row_idx < len(table_df):
                return ""
            return table_df["Recommended Action"].iloc[row_idx]

        submit_btn.click(
            fn=fmea_rag_interface,
            inputs=[f_mode, f_effect, f_cause, f_sev, f_occ, f_det],
            outputs=[rpn_output, recommendations_output, df_state]
        )
        # The selection handler reads the visible table, not df_state.
        recommendations_output.select(
            fn=update_selection,
            inputs=[recommendations_output],
            outputs=[selected_action_text]
        )
        submit_feedback_btn.click(
            fn=save_feedback,
            inputs=[selected_action_text, feedback_choice, recommendations_output],
            outputs=[feedback_status, recommendations_output]
        )

    # Launch command for Hugging Face
    demo.launch()