# FMEA-AI / app.py
# Hugging Face Space page residue (not code): "TimeCapsuleX's picture",
# commit message "Add application file", commit ade2111.
import os
import json
import re
from datetime import datetime, timezone
import torch
import gradio as gr
import pandas as pd
# --- LangChain & Groq Imports ---
from langchain_groq import ChatGroq
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
# --- 1. Setup API Key for Groq ---
# The key is read from the environment (on Hugging Face, add GROQ_API_KEY to
# the Space Secrets). Startup is aborted when it is missing or empty.
GROQ_API_KEY = os.getenv('GROQ_API_KEY')
if GROQ_API_KEY is None or GROQ_API_KEY == '':
    raise ValueError("πŸ”΄ GROQ_API_KEY not found. Please add it to your Hugging Face Space Secrets.")
# --- 2. Build the RAG Chain & Feedback System ---
# CSV files read (and, for feedback, written) by the functions below.
FMEA_DATA_FILE = '10000fmea_data.csv'
FEEDBACK_FILE = 'fmea_feedback.csv'

# Shared module state. Everything starts as None and is filled in later by
# build_rag_chain() / build_feedback_db() through `global` assignments.
QA_CHAIN = RETRIEVER = LLM = PROMPT = None
FMEA_DF = DOCUMENTS = None
feedback_vector_store = embeddings = None

# Device handed to the embedding model: GPU when torch can see one.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'
print(f"βœ… Using device: {DEVICE}")
# --- FEEDBACK LOOP PART 1: Saving, Normalizing, and Loading Feedback ---
def normalize_action(text: str) -> str:
    """Return *text* in canonical form for use as a feedback lookup key.

    The value is converted with ``str()``, trimmed, lower-cased, and every run
    of whitespace is collapsed to a single space.
    """
    lowered = str(text).strip().lower()
    collapsed = re.sub(r'\s+', ' ', lowered)
    return collapsed
def load_feedback_stats():
    """Return per-action rating statistics read from the feedback CSV.

    Returns:
        A dict mapping each action string to ``{'mean': ..., 'count': ...}``
        computed over its numeric ratings. An empty dict is returned when the
        file is missing, empty, lacks an ``action`` or ``rating`` column, or
        holds no row with a usable rating.
    """
    if not os.path.exists(FEEDBACK_FILE):
        return {}
    try:
        feedback_df = pd.read_csv(FEEDBACK_FILE)
    except pd.errors.EmptyDataError:
        # Zero-byte file: nothing to aggregate.
        return {}

    # Both columns are needed by the groupby below; without this check a
    # legacy/hand-edited file without `action` raised KeyError.
    if feedback_df.empty or not {'action', 'rating'}.issubset(feedback_df.columns):
        return {}

    # Ratings may be blank or textual (e.g. a column back-filled with "" during
    # a schema migration). Coerce them and drop unusable rows so the mean is
    # always a real number and never NaN or a TypeError.
    feedback_df = feedback_df.assign(
        rating=pd.to_numeric(feedback_df['rating'], errors='coerce')
    ).dropna(subset=['action', 'rating'])
    if feedback_df.empty:
        return {}

    return feedback_df.groupby('action')['rating'].agg(['mean', 'count']).to_dict('index')
def ensure_feedback_schema():
    """Bring an existing feedback CSV to the canonical column layout.

    The canonical header is ``action, rating, feedback_type, timestamp_utc``.
    Nothing happens when the file does not exist. A file with no data rows is
    replaced by a header-only file. Otherwise missing columns are added with
    empty values and the file is rewritten with exactly the canonical columns
    in canonical order (other columns are dropped).
    """
    target_cols = ["action", "rating", "feedback_type", "timestamp_utc"]
    if not os.path.exists(FEEDBACK_FILE):
        return

    try:
        existing_df = pd.read_csv(FEEDBACK_FILE)
    except pd.errors.EmptyDataError:
        # Zero-byte file: treat like a file without rows.
        existing_df = pd.DataFrame()

    if existing_df.empty:
        pd.DataFrame(columns=target_cols).to_csv(FEEDBACK_FILE, index=False)
        return

    # Already canonical: avoid a needless rewrite.
    if list(existing_df.columns) == target_cols:
        return

    # Rewrite whenever the header differs in any way, not only when a column
    # is missing. Rows are later appended without a header, so a reordered or
    # wider header would silently put values under the wrong columns.
    for col in target_cols:
        if col not in existing_df.columns:
            existing_df[col] = ""
    existing_df[target_cols].to_csv(FEEDBACK_FILE, index=False)
def save_feedback(action, feedback_choice, display_df):
    """Persist one thumbs-up/down vote and refresh the displayed table.

    Args:
        action: Text of the recommendation being rated; a falsy value means
            nothing is selected.
        feedback_choice: Label of the chosen radio option. Labels that are not
            recognised are recorded as a thumbs up.
        display_df: The table currently shown in the UI, or None.

    Returns:
        ``(status_message, display_df)``. When the table is non-empty its
        'Avg. Feedback' column is recomputed in place from the stored votes.
    """
    if not action:
        return "Please select a recommendation from the table first.", display_df

    ratings_by_choice = {
        "πŸ‘ Thumbs Up": ("thumbs_up", 10),
        "πŸ‘Ž Thumbs Down": ("thumbs_down", 3),
    }
    if feedback_choice in ratings_by_choice:
        feedback_type, rating = ratings_by_choice[feedback_choice]
    else:
        feedback_type, rating = "thumbs_up", 10

    timestamp_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    record = {
        'action': normalize_action(action),
        'rating': int(rating),
        'feedback_type': feedback_type,
        'timestamp_utc': timestamp_utc,
    }
    feedback_row = pd.DataFrame([record])

    if os.path.exists(FEEDBACK_FILE):
        # Align the existing header first, then append the row without one.
        ensure_feedback_schema()
        feedback_row.to_csv(FEEDBACK_FILE, mode='a', header=False, index=False)
    else:
        feedback_row.to_csv(FEEDBACK_FILE, index=False)

    # Rebuild the vector store so the new vote is taken into account.
    build_feedback_db()
    msg = f"βœ… Feedback saved ({feedback_choice}) for: {action} at {timestamp_utc}"

    # Update the displayed table dynamically
    if display_df is None or display_df.empty:
        return msg, display_df
    try:
        stats_by_action = load_feedback_stats()
        no_votes = {'mean': 0, 'count': 0}
        labels = []
        for shown_action in display_df['Recommended Action']:
            stat = stats_by_action.get(normalize_action(shown_action), no_votes)
            labels.append(f"{stat['mean']:.2f}/10 ({int(stat['count'])})")
        display_df['Avg. Feedback'] = labels
    except Exception as e:
        # Best effort: the vote is already saved, so only log the failure.
        print(f"Error updating display_df: {e}")
    return msg, display_df
def build_feedback_db():
    """Rebuild the vector store of highly-rated actions from the feedback CSV.

    An action is "highly rated" when the mean of its numeric ratings is above
    7. When at least one such action exists and an embedding model is loaded,
    ``feedback_vector_store`` is replaced by a FAISS index over those action
    texts. In every other case it is reset to ``None``.
    """
    global feedback_vector_store

    highly_rated_actions = []
    if os.path.exists(FEEDBACK_FILE):
        try:
            feedback_df = pd.read_csv(FEEDBACK_FILE)
        except pd.errors.EmptyDataError:
            feedback_df = pd.DataFrame()
        # Skip files that lack the columns needed for the aggregation instead
        # of raising KeyError.
        if {'action', 'rating'}.issubset(feedback_df.columns):
            # Blank or textual ratings become NaN and are ignored by mean().
            ratings = pd.to_numeric(feedback_df['rating'], errors='coerce')
            avg_ratings = ratings.groupby(feedback_df['action']).mean()
            # FAISS.from_texts expects strings; CSV parsing may yield numbers.
            highly_rated_actions = [str(act) for act in avg_ratings[avg_ratings > 7].index]

    if not highly_rated_actions or not embeddings:
        # Drop any previously built index. Otherwise an action whose average
        # has since fallen to 7 or below would still be offered to the LLM as
        # "highly rated".
        feedback_vector_store = None
        return

    print(f"Found {len(highly_rated_actions)} highly-rated actions. Building feedback vector store...")
    feedback_vector_store = FAISS.from_texts(highly_rated_actions, embeddings)
    print("βœ… Feedback vector store is ready.")
def keyword_retrieve_documents(search_query: str, k: int = 2):
    """Return up to *k* documents ranked by keyword overlap with the query.

    The query is lower-cased and split into alphanumeric tokens of at least
    three characters. Each row of ``FMEA_DF["__search_text"]`` is scored by how
    many distinct query tokens occur in it as substrings; the best-scoring
    rows win, earlier rows first on ties. When there are no usable tokens or
    no row matches, the first *k* documents are returned. An empty list is
    returned when the data has not been loaded or is empty.
    """
    if FMEA_DF is None or DOCUMENTS is None or FMEA_DF.empty:
        return []

    query_terms = [
        term
        for term in re.findall(r"[a-z0-9]+", str(search_query).lower())
        if len(term) >= 3
    ]
    if not query_terms:
        return DOCUMENTS[:k]

    ranked = []
    for row_pos, haystack in enumerate(FMEA_DF["__search_text"]):
        hits = 0
        for term in query_terms:
            if term in haystack:
                hits += 1
        if hits:
            ranked.append((hits, row_pos))

    if not ranked:
        return DOCUMENTS[:k]

    # sorted() is stable, so equally-scored rows keep their file order.
    best = sorted(ranked, key=lambda pair: pair[0], reverse=True)[:k]
    return [DOCUMENTS[row_pos] for _, row_pos in best]
# --- build_rag_chain ---
def build_rag_chain():
    """Load the FMEA data and initialise retrieval, the LLM and the prompt.

    Steps, all performed inside one try block:
      1. Read ``FMEA_DATA_FILE``, turn every row into a ``Document`` and add a
         lower-cased ``__search_text`` column for keyword retrieval.
      2. Load the embedding model, rebuild the feedback store and build the
         main FAISS retriever. If any of that fails, retrieval falls back to
         keyword search (``RETRIEVER``/``embeddings``/``feedback_vector_store``
         are set to None).
      3. Create the Groq chat model and the prompt template.

    Returns:
        True when setup finished, False when an exception was raised (the
        error is printed).
    """
    global QA_CHAIN, RETRIEVER, LLM, PROMPT, FMEA_DF, DOCUMENTS, feedback_vector_store, embeddings
    try:
        print(f"Loading FMEA data from {FMEA_DATA_FILE}...")
        fmea_df = pd.read_csv(FMEA_DATA_FILE).fillna("")

        # One Document per CSV row; the text is "column: value" lines.
        column_names = fmea_df.columns
        has_failure_mode = "Failure_Mode" in column_names
        documents = []
        for idx, row in fmea_df.iterrows():
            lines = [f"{col}: {row[col]}" for col in column_names]
            metadata = {"row": int(idx)}
            if has_failure_mode:
                metadata["source"] = str(row["Failure_Mode"])
            documents.append(Document(page_content="\n".join(lines), metadata=metadata))

        # Added after the documents are built, so it never appears in their text.
        wanted_cols = ["Failure_Mode", "Effect", "Cause", "Recommended_Action", "Responsible_Department"]
        search_cols = [c for c in wanted_cols if c in fmea_df.columns]
        fmea_df["__search_text"] = fmea_df[search_cols].astype(str).agg(" ".join, axis=1).str.lower()

        FMEA_DF = fmea_df
        DOCUMENTS = documents
        print(f"βœ… Successfully loaded {len(documents)} records.")

        print("Initializing local HuggingFace embedding model...")
        try:
            embeddings = HuggingFaceEmbeddings(
                model_name='all-MiniLM-L6-v2',
                model_kwargs={'device': DEVICE},
            )
            print("βœ… Local embedding model loaded.")
            build_feedback_db()
            print("Creating embeddings and building main FAISS vector store...")
            main_vector_store = FAISS.from_documents(documents, embeddings)
            RETRIEVER = main_vector_store.as_retriever(search_kwargs={"k": 2})
            print("βœ… Main vector store created successfully.")
        except Exception as embed_error:
            # Vector search is optional: clear its state and carry on with
            # keyword retrieval.
            embeddings = None
            RETRIEVER = None
            feedback_vector_store = None
            print(f"⚠️ Embedding setup failed, using keyword retrieval fallback. Details: {embed_error}")

        # --- UPDATED TO USE LLAMA 3.3 VIA GROQ ---
        chat_model = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.2)
        prompt_template = """
You are an expert FMEA analyst. Your task is to generate the TOP 3 recommended actions for the given failure.
The user has provided their current S, O, and D scores.
For EACH recommendation, you must also estimate the revised S, O, and D scores (1-10) that would result *after* that action is successfully implemented.
- **new_S (Severity):** This score should *usually* stay the same as the original Severity.
- **new_O (Occurrence):** This score should be *lower* than the original Occurrence.
- **new_D (Detection):** This score should be *lower* than the original Detection (as the action makes the failure easier to detect).
CONTEXT (Historical data and user feedback):
{context}
QUESTION (The new failure and its current scores):
{question}
INSTRUCTIONS:
Format your entire response as a single, valid JSON object with a key "recommendations" which is a list of 3 objects.
Each object must have these keys: "rank", "action", "action_details", "department", "ai_score", "new_S", "new_O", "new_D".
- "rank": The rank of the recommendation (1, 2, 3).
- "action": The recommended action text.
- "action_details": 2-3 sentences explaining why this action works and practical implementation notes.
- "department": The most likely responsible department.
- "ai_score": Confidence score (1-100) for this recommendation.
- "new_S": Your estimated new Severity score (1-10).
- "new_O": Your estimated new Occurrence score (1-10).
- "new_D": Your estimated new Detection score (1-10).
CRITICAL: Output ONLY the raw JSON object. Do not include markdown formatting like ```json or any introductory text.
"""
        PROMPT = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
        LLM = chat_model
        # Used purely as an "initialised" flag by the interface function.
        QA_CHAIN = True
        print("βœ… RAG model is ready.")
        return True
    except Exception as e:
        print(f"πŸ”΄ An error occurred during RAG setup: {e}")
        return False
# --- 3. Gradio Interface Logic ---
def _extract_json_text(raw_output):
    """Return the JSON payload embedded in an LLM reply."""
    # Find everything between the first '{' and the last '}'
    match = re.search(r'\{.*\}', raw_output, re.DOTALL)
    if match:
        return match.group(0)
    # Fallback if the regex fails: strip markdown code fences.
    return raw_output.replace("```json", "").replace("```", "").strip()


def _build_context(query, mode, effect, cause):
    """Assemble the prompt context from historical rows and rated feedback."""
    docs = None
    if RETRIEVER is not None:
        try:
            docs = RETRIEVER.invoke(query)
        except Exception as retrieval_error:
            # Same policy as at startup: vector search is optional.
            print(f"Vector retrieval failed, using keyword retrieval fallback. Details: {retrieval_error}")
    if docs is None:
        docs = keyword_retrieve_documents(f"{mode} {effect} {cause}", k=2)
    context_from_history = "\n---\n".join([doc.page_content for doc in docs])

    context_from_feedback = ""
    if feedback_vector_store:
        try:
            feedback_docs = feedback_vector_store.similarity_search(query, k=3)
        except Exception as feedback_error:
            # Feedback is a bonus signal; answer without it rather than fail.
            print(f"Feedback search failed, continuing without it. Details: {feedback_error}")
            feedback_docs = []
        if feedback_docs:
            feedback_actions = "\n".join([doc.page_content for doc in feedback_docs])
            context_from_feedback = f"\n\n--- Highly-Rated Actions from User Feedback ---\n{feedback_actions}"

    return f"--- Historical FMEA Entries ---\n{context_from_history}{context_from_feedback}"


def fmea_rag_interface(mode, effect, cause, severity, occurrence, detection):
    """Produce the current RPN and the top-3 AI recommendations for a failure.

    Args:
        mode, effect, cause: Free-text description of the failure.
        severity, occurrence, detection: Current S/O/D scores; their product
            is the current RPN.

    Returns:
        ``(rpn_text, display_df, output_df)``: the RPN label, the table shown
        to the user, and the raw recommendations frame. When the model is not
        initialised the result is ``(message, empty DataFrame, "")``; when
        anything fails while querying or parsing, the table holds a single
        "Error" column and the third item is ``None``.
    """
    if QA_CHAIN is None or LLM is None or PROMPT is None:
        return "RAG Model is not initialized.", pd.DataFrame(), ""

    rpn = severity * occurrence * detection
    rpn_text = f"Current RPN (SΓ—OΓ—D): {int(rpn)}"
    query = (
        f"For a failure with Failure Mode='{mode}', Effect='{effect}', and Cause='{cause}', "
        f"what are the top 3 most appropriate recommended actions? "
        f"The current scores are: Severity={severity}, Occurrence={occurrence}, Detection={detection}."
    )

    # Defined up front so the error handler can always report it (it prints
    # "None" when the failure happened before the LLM replied).
    raw_output = None
    try:
        # Retrieval lives inside the try block so a search-backend failure is
        # reported in the table instead of escaping as an unhandled error.
        combined_context = _build_context(query, mode, effect, cause)
        llm_input = PROMPT.format(context=combined_context, question=query)
        llm_response = LLM.invoke(llm_input)

        # --- IMPROVED JSON PARSING FOR LLAMA ---
        raw_output = str(getattr(llm_response, "content", llm_response)).strip()
        data = json.loads(_extract_json_text(raw_output))

        output_df = pd.DataFrame(data['recommendations'])
        if 'action_details' not in output_df.columns:
            output_df['action_details'] = "No additional details provided."

        feedback_stats = load_feedback_stats()
        default_stat = {'mean': 0, 'count': 0}
        stats_list = [feedback_stats.get(normalize_action(action), default_stat) for action in output_df['action']]
        output_df['avg_feedback'] = [stat['mean'] for stat in stats_list]
        output_df['feedback_count'] = [stat['count'] for stat in stats_list]

        # The prompt asks for scores on the 1-10 scale; clamp whatever the
        # model returned so the revised RPN stays within the valid range.
        for score_col in ('new_S', 'new_O', 'new_D'):
            output_df[score_col] = output_df[score_col].astype(int).clip(1, 10)
        output_df['new_RPN'] = output_df['new_S'] * output_df['new_O'] * output_df['new_D']
        rpn_change_list = [f"{int(rpn)} βž” {int(new_rpn)}" for new_rpn in output_df['new_RPN']]

        display_df = pd.DataFrame({
            "Rank": output_df['rank'],
            "Recommended Action": output_df['action'],
            "Action Details": output_df['action_details'],
            "Department": output_df['department'],
            "AI Confidence": [f"{score}%" for score in output_df['ai_score']],
            "Avg. Feedback": [f"{avg:.2f}/10 ({int(count)})" for avg, count in zip(output_df['avg_feedback'], output_df['feedback_count'])],
            "Revised RPN": rpn_change_list
        })
    except Exception as e:
        print(f"Error parsing LLM output: {e}\nRaw Output was: {raw_output}")
        return rpn_text, pd.DataFrame({"Error": [f"Could not parse AI response: {e}"]}), None
    return rpn_text, display_df, output_df
def get_level_info(val):
    """Return a Gradio update that sets a slider's info text to its level name.

    Scores 1 through 10 map to a named level; any other value yields an empty
    name ("Level: ").
    """
    names_from_lowest = (
        "No Effect", "Very Slight", "Slight", "Minor", "Moderate",
        "Significant", "Major", "Extreme", "Serious", "Hazardous",
    )
    # Score 1 is the first name, score 10 the last.
    label_by_score = dict(enumerate(names_from_lowest, start=1))
    label = label_by_score.get(val, '')
    return gr.update(info=f"Level: {label}")
# --- 6. Main Application Execution ---
# The interface is only built and launched when RAG setup succeeded.
# NOTE(review): the nesting below was reconstructed from the flattened
# source; confirm which Group each button belongs to against the deployed UI.
if build_rag_chain():
    print("\nπŸš€ Launching Gradio Interface...")
    with gr.Blocks(theme=gr.themes.Default(primary_hue=gr.themes.colors.blue)) as demo:
        gr.Markdown("<h1>Pangun ReliAI-FMEA</h1>")

        # --- Inputs: failure description and current S/O/D scores ---
        with gr.Group():
            gr.Markdown("## FMEA Inputs ")
            with gr.Row():
                with gr.Column(scale=2):
                    f_mode = gr.Textbox(label="Failure Mode", placeholder="e.g., Engine Overheating")
                    f_effect = gr.Textbox(label="Effect", placeholder="e.g., Reduced vehicle performance")
                    f_cause = gr.Textbox(label="Cause", placeholder="e.g., Coolant leak")
                with gr.Column(scale=1):
                    f_sev = gr.Slider(1, 10, value=5, step=1, label="Severity", info="Level: Moderate")
                    f_occ = gr.Slider(1, 10, value=5, step=1, label="Occurrence", info="Level: Moderate")
                    f_det = gr.Slider(1, 10, value=5, step=1, label="Detection", info="Level: Moderate")
            # Each slider refreshes its own info text when moved.
            f_sev.change(fn=get_level_info, inputs=f_sev, outputs=f_sev)
            f_occ.change(fn=get_level_info, inputs=f_occ, outputs=f_occ)
            f_det.change(fn=get_level_info, inputs=f_det, outputs=f_det)
            submit_btn = gr.Button("Get AI Recommendations", variant="primary")

        # --- Outputs: current RPN and the recommendations table ---
        with gr.Group():
            gr.Markdown("## πŸ’‘ Top 3 AI-Generated Recommendations")
            rpn_output = gr.Textbox(label="Current RPN", interactive=False)
            recommendations_output = gr.DataFrame(
                headers=["Rank", "Recommended Action", "Action Details", "Department", "AI Confidence", "Avg. Feedback", "Revised RPN"],
                datatype=["number", "str", "str", "str", "str", "str", "str"]
            )
            # Receives the raw recommendations frame from fmea_rag_interface.
            df_state = gr.State()

        # --- Feedback: rate the selected recommendation ---
        with gr.Group():
            gr.Markdown("## ⭐ Provide Feedback")
            gr.Markdown("Click a row in the table above to select it, then submit a thumbs up or thumbs down.")
            selected_action_text = gr.Textbox(label="Selected for Feedback", interactive=False)
            feedback_choice = gr.Radio(
                choices=["πŸ‘ Thumbs Up", "πŸ‘Ž Thumbs Down"],
                value="πŸ‘ Thumbs Up",
                label="Your Feedback"
            )
            submit_feedback_btn = gr.Button("Submit Feedback")
            feedback_status = gr.Textbox(label="Feedback Status", interactive=False)

        # FIX 1: Safer update_selection function
        def update_selection(table_df, evt: gr.SelectData):
            """Return the 'Recommended Action' text of the clicked table row.

            An empty string is returned when there is nothing to select: the
            table is empty, it is the single-column error table, or the
            clicked row index is out of range.
            """
            if table_df is None or len(table_df) == 0:
                return ""
            # Look the column up by name. After a failed request the table
            # only has an "Error" column, and positional access to column 1
            # raised IndexError.
            if "Recommended Action" not in table_df.columns:
                return ""
            row_idx = evt.index[0]
            if not 0 <= row_idx < len(table_df):
                return ""
            return table_df["Recommended Action"].iloc[row_idx]

        submit_btn.click(
            fn=fmea_rag_interface,
            inputs=[f_mode, f_effect, f_cause, f_sev, f_occ, f_det],
            outputs=[rpn_output, recommendations_output, df_state]
        )
        # FIX 2: Trigger relies on the visible table
        recommendations_output.select(
            fn=update_selection,
            inputs=[recommendations_output],
            outputs=[selected_action_text]
        )
        submit_feedback_btn.click(
            fn=save_feedback,
            inputs=[selected_action_text, feedback_choice, recommendations_output],
            outputs=[feedback_status, recommendations_output]
        )
    # Launch command for Hugging Face
    demo.launch()