Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import re | |
| from datetime import datetime, timezone | |
| import torch | |
| import pandas as pd | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| from supabase import create_client, Client | |
| load_dotenv() | |
| # --- LangChain & Groq Imports --- | |
| from langchain_groq import ChatGroq | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_core.documents import Document | |
| from langchain_core.prompts import PromptTemplate | |
# --- 1. Setup API Keys & Clients ---
# All credentials are read from the environment (populated by load_dotenv above).
GROQ_API_KEY = os.environ.get('GROQ_API_KEY')
SUPABASE_URL = os.environ.get("VITE_SUPABASE_URL")
SUPABASE_KEY = os.environ.get("VITE_SUPABASE_PUBLISHABLE_KEY")

# Stays None when either credential is missing; the feedback code paths check
# for that and simply skip the Supabase round-trips.
supabase: Client | None = None
if SUPABASE_URL and SUPABASE_KEY:
    supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
else:
    print("⚠️ Supabase credentials not found. Real-time feedback disabled.")

app = FastAPI(title="Pangun ReliAI Backend", description="AI Backend for FMEA Recommendations")

# Allow CORS for local React development
# NOTE(review): the FastAPI docs state that allow_origins/allow_methods/
# allow_headers cannot be ["*"] when allow_credentials=True. Behaviour is left
# unchanged here; list explicit origins before exposing this beyond local dev.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- 2. Build the RAG Chain ---
FMEA_DATA_FILE = '10000fmea_data.csv'

# Module-level state filled in by build_rag_chain() / build_feedback_db().
QA_CHAIN = None               # readiness flag, set to True once the LLM and prompt exist
RETRIEVER = None              # FAISS retriever over the FMEA rows (None -> keyword fallback)
LLM = None                    # ChatGroq client
PROMPT = None                 # PromptTemplate with {context} and {question}
FMEA_DF = None                # DataFrame of the CSV plus a "__search_text" column
DOCUMENTS = None              # one Document per CSV row
embeddings = None             # HuggingFace embedding model
feedback_vector_store = None  # FAISS store of highly rated feedback actions

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"✅ Using device: {DEVICE}")
# Set of action texts the current feedback store was built from. Lets repeated
# calls skip the (expensive) re-embedding when nothing has changed.
_FEEDBACK_ACTIONS_SNAPSHOT: frozenset | None = None


def build_feedback_db():
    """(Re)build the FAISS store of highly rated feedback actions.

    Reads rows with ``rating >= 7`` from the ``fmea_feedback`` Supabase table,
    de-duplicates their ``action`` texts and stores a FAISS index built from
    them in the module-level ``feedback_vector_store``.

    Does nothing when the Supabase client or the embedding model is not
    available, or when the query returns no rows (an existing store is kept
    in that case). When the set of actions is identical to the one the current
    store was built from, the rebuild is skipped. Any exception is logged and
    swallowed, so a feedback outage does not break the caller.
    """
    global feedback_vector_store, _FEEDBACK_ACTIONS_SNAPSHOT
    if not supabase or not embeddings:
        return
    try:
        # Fetch highly rated feedback from Supabase
        response = supabase.table("fmea_feedback").select("action, rating").gte("rating", 7).execute()
        if not response.data:
            return

        # Deduplicate, ignoring rows with an empty or missing action.
        unique_actions = frozenset(
            item['action'] for item in response.data if item.get('action')
        )
        if not unique_actions:
            return

        # Same content as the last successful build: keep the existing index.
        if feedback_vector_store is not None and unique_actions == _FEEDBACK_ACTIONS_SNAPSHOT:
            return

        print(f"Found {len(unique_actions)} highly-rated actions from Supabase. Building feedback DB...")
        feedback_vector_store = FAISS.from_texts(list(unique_actions), embeddings)
        _FEEDBACK_ACTIONS_SNAPSHOT = unique_actions
        print("✅ Supabase Feedback vector store is ready.")
    except Exception as e:
        print(f"Failed to build feedback DB from Supabase: {e}")
def keyword_retrieve_documents(search_query: str, k: int = 2):
    """Return up to ``k`` documents ranked by keyword overlap with the query.

    The query is lower-cased and split into alphanumeric tokens of at least
    three characters. Each row's ``__search_text`` is scored by how many of
    those tokens occur in it as substrings; rows with no hit are dropped and
    the highest scores win, ties keeping their row order.

    Returns an empty list when no FMEA data is loaded, and the first ``k``
    documents when the query has no usable token or nothing matches.
    """
    no_corpus = FMEA_DF is None or DOCUMENTS is None or FMEA_DF.empty
    if no_corpus:
        return []

    words = re.findall(r"[a-z0-9]+", str(search_query).lower())
    keywords = [word for word in words if len(word) >= 3]
    if not keywords:
        return DOCUMENTS[:k]

    ranked = []
    for row_pos, haystack in enumerate(FMEA_DF["__search_text"]):
        hit_count = sum(1 for keyword in keywords if keyword in haystack)
        if hit_count:
            ranked.append((hit_count, row_pos))

    if not ranked:
        return DOCUMENTS[:k]

    # Stable sort on the negated hit count: best first, ties in row order.
    best = sorted(ranked, key=lambda pair: -pair[0])[:k]
    return [DOCUMENTS[row_pos] for _, row_pos in best]
def _load_fmea_documents(csv_path):
    """Read the FMEA CSV and return ``(dataframe, documents)``.

    Missing values are replaced by empty strings. Each row becomes one
    ``Document`` whose text is ``"column: value"`` lines and whose metadata
    holds the row index (plus ``source`` = the ``Failure_Mode`` value when
    that column exists). The returned frame gains a lower-cased
    ``__search_text`` column used by the keyword fallback retriever.
    """
    fmea_df = pd.read_csv(csv_path).fillna("")
    # Snapshot the columns before "__search_text" is added so it never leaks
    # into the document text.
    columns = list(fmea_df.columns)
    has_failure_mode = "Failure_Mode" in columns

    documents = []
    for idx, row in fmea_df.iterrows():
        page_content = "\n".join(f"{col}: {row[col]}" for col in columns)
        metadata = {"row": int(idx)}
        if has_failure_mode:
            metadata["source"] = str(row["Failure_Mode"])
        documents.append(Document(page_content=page_content, metadata=metadata))

    searchable = ("Failure_Mode", "Effect", "Cause", "Recommended_Action", "Responsible_Department")
    search_cols = [c for c in searchable if c in columns]
    if search_cols:
        fmea_df["__search_text"] = fmea_df[search_cols].astype(str).agg(" ".join, axis=1).str.lower()
    else:
        fmea_df["__search_text"] = ""
    return fmea_df, documents


def build_rag_chain():
    """Load the FMEA data, build the retrievers and prepare the LLM + prompt.

    Populates the module-level ``FMEA_DF``, ``DOCUMENTS``, ``embeddings``,
    ``RETRIEVER``, ``LLM``, ``PROMPT`` and ``QA_CHAIN``:

    * A missing data file is logged and document loading is skipped.
    * If the embedding model or the FAISS index cannot be built, ``embeddings``
      and ``RETRIEVER`` are reset to ``None`` so requests fall back to keyword
      retrieval.
    * Without ``GROQ_API_KEY`` the LLM and prompt are left unset.

    Returns ``True`` when setup ran to completion (even with the degradations
    above) and ``False`` when an unexpected exception aborted it.

    NOTE(review): the original indentation was not available; the embedding
    setup is assumed to run only when the data file was loaded. Confirm.
    """
    global QA_CHAIN, RETRIEVER, LLM, PROMPT, FMEA_DF, DOCUMENTS, embeddings
    try:
        print(f"Loading FMEA data from {FMEA_DATA_FILE}...")
        if not os.path.exists(FMEA_DATA_FILE):
            print(f"⚠️ {FMEA_DATA_FILE} not found. Skipping document loading.")
        else:
            fmea_df, documents = _load_fmea_documents(FMEA_DATA_FILE)
            FMEA_DF = fmea_df
            DOCUMENTS = documents
            print(f"✅ Successfully loaded {len(documents)} records.")

            print("Initializing local HuggingFace embedding model...")
            try:
                embeddings = HuggingFaceEmbeddings(
                    model_name='all-MiniLM-L6-v2',
                    model_kwargs={'device': DEVICE}
                )
                print("✅ Local embedding model loaded.")
                print("Creating embeddings and building main FAISS vector store...")
                main_vector_store = FAISS.from_documents(documents, embeddings)
                RETRIEVER = main_vector_store.as_retriever(search_kwargs={"k": 2})
                print("✅ Main vector store created successfully.")
                # Build real-time feedback DB
                build_feedback_db()
            except Exception as embed_error:
                # Semantic retrieval is optional: drop it and let requests use
                # the keyword matcher instead.
                embeddings = None
                RETRIEVER = None
                print(f"⚠️ Embedding setup failed, using keyword retrieval fallback. Details: {embed_error}")

        if not GROQ_API_KEY:
            print("⚠️ GROQ_API_KEY not found. AI requests will fail until set.")
        else:
            llm = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.1, api_key=GROQ_API_KEY)
            prompt_template = """
You are an expert FMEA analyst. Your task is to generate the TOP 3 recommended actions for the given failure.
The user has provided their current S, O, and D scores.
For EACH recommendation, you must estimate the revised S, O, and D scores (1-10) that would result *after* that action is successfully implemented.
- **new_S (Severity):** This score MUST usually stay the exact same as the original Severity. Do not lower it unless the action physically changes the design to mitigate the effect completely.
- **new_O (Occurrence):** This score MUST be lower than or equal to the original Occurrence.
- **new_D (Detection):** This score MUST be lower than or equal to the original Detection (as the action makes the failure easier to detect).
CONTEXT (Historical data and highly-rated user feedback):
{context}
QUESTION (The new failure and its current scores):
{question}
INSTRUCTIONS:
Format your entire response as a single, valid JSON object with a key "recommendations" which is a list of 3 objects.
Each object must have these exactly keys: "rank", "action", "action_details", "department", "ai_score", "new_S", "new_O", "new_D".
- "rank": The rank of the recommendation (1, 2, 3).
- "action": The most effective recommended action text. Focus on actions present in the highly-rated user feedback if applicable.
- "action_details": 2-3 sentences explaining why this action works and practical implementation notes.
- "department": The most likely responsible department.
- "ai_score": Confidence score (1-100) for this recommendation.
- "new_S": Your estimated new Severity score (1-10). Must be an integer.
- "new_O": Your estimated new Occurrence score (1-10). Must be an integer.
- "new_D": Your estimated new Detection score (1-10). Must be an integer.
CRITICAL: Output ONLY the raw JSON object. Do not calculate the RPN. Do not include markdown formatting like ```json or any introductory text.
"""
            PROMPT = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
            LLM = llm
            # Only used as a readiness flag by the request handler.
            QA_CHAIN = True
            print("✅ RAG model is ready.")
        return True
    except Exception as e:
        print(f"🔴 An error occurred during RAG setup: {e}")
        return False
# Initialize RAG on startup: this runs once at import time, and the boolean
# result of the setup is not inspected here.
build_rag_chain()
class FMEARequest(BaseModel):
    """Request body describing one failure and its current FMEA scores."""

    # Free-text description of the failure; interpolated into the LLM query
    # as Failure Mode / Effect / Cause.
    mode: str
    effect: str
    cause: str

    # Current scores; also used as fallbacks for the revised scores when the
    # LLM response omits them.
    severity: int
    occurrence: int
    detection: int
async def get_recommendations(req: FMEARequest):
    """Return the LLM's top recommended actions for the failure in ``req``.

    The response is ``{"recommendations": [...]}`` where every entry carries
    integer ``new_S`` / ``new_O`` / ``new_D`` scores, a ``new_RPN`` computed
    here as their product, ``action_details`` and an ``avg_feedback`` summary
    of community ratings.

    Raises:
        HTTPException: status 500 when the model is not initialised, or when
            the LLM call or the parsing of its answer fails.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered on ``app``.
    """
    if QA_CHAIN is None or LLM is None or PROMPT is None:
        raise HTTPException(status_code=500, detail="AI Model is not initialized or GROQ_API_KEY is missing.")

    # The pipeline below is entirely synchronous (Supabase, embeddings, LLM).
    # Run it in a worker thread so it does not block the event loop.
    import asyncio
    return await asyncio.to_thread(_recommend_blocking, req)


def _recommend_blocking(req):
    """Run retrieval, the LLM call and post-processing for one request."""
    # Refresh feedback DB on every request to ensure real-time learning
    build_feedback_db()

    query = (
        f"For a failure with Failure Mode='{req.mode}', Effect='{req.effect}', and Cause='{req.cause}', "
        f"what are the top 3 most appropriate recommended actions? "
        f"The current scores are: Severity={req.severity}, Occurrence={req.occurrence}, Detection={req.detection}."
    )

    # Semantic retrieval when the vector store exists, keyword matching otherwise.
    if RETRIEVER is not None:
        docs = RETRIEVER.invoke(query)
    else:
        docs = keyword_retrieve_documents(f"{req.mode} {req.effect} {req.cause}", k=2)
    context_from_history = "\n---\n".join(doc.page_content for doc in docs)

    context_from_feedback = ""
    if feedback_vector_store:
        feedback_docs = feedback_vector_store.similarity_search(query, k=3)
        if feedback_docs:
            feedback_actions = "\n".join(doc.page_content for doc in feedback_docs)
            context_from_feedback = f"\n\n--- Highly-Rated Actions from Real-Time User Feedback Database ---\n{feedback_actions}"
    combined_context = f"--- Historical FMEA Entries ---\n{context_from_history}{context_from_feedback}"

    # Defined before the try so the error log can always reference it.
    raw_output = None
    try:
        llm_input = PROMPT.format(context=combined_context, question=query)
        llm_response = LLM.invoke(llm_input)
        raw_output = str(getattr(llm_response, "content", llm_response)).strip()
        data = json.loads(_extract_json_text(raw_output))
        recommendations = data.get("recommendations", [])

        # Fetch community feedback for these specific actions
        feedback_map = _fetch_community_ratings(recommendations)

        # Ensure correct types and math (Deterministic RPN Logic)
        for r in recommendations:
            r.setdefault('action_details', "No additional details provided.")
            # Force integer scores; fall back to the request's own score when
            # the model omitted the value or returned something unusable.
            r['new_S'] = _coerce_score(r.get('new_S'), req.severity)
            r['new_O'] = _coerce_score(r.get('new_O'), req.occurrence)
            r['new_D'] = _coerce_score(r.get('new_D'), req.detection)
            # Deterministic math done by Python
            r['new_RPN'] = r['new_S'] * r['new_O'] * r['new_D']
            # Attach real community feedback
            r['avg_feedback'] = _summarise_ratings(feedback_map.get(r.get('action'), []))
        return {"recommendations": recommendations}
    except Exception as e:
        print(f"Error parsing LLM output: {e}\nRaw Output was: {raw_output}")
        raise HTTPException(status_code=500, detail=f"Could not parse AI response: {str(e)}") from e


def _extract_json_text(raw_output):
    """Return the JSON portion of the LLM answer (outermost ``{...}`` span)."""
    match = re.search(r'\{.*\}', raw_output, re.DOTALL)
    if match:
        return match.group(0)
    # No braces found: strip markdown fences and let json.loads report the error.
    return raw_output.replace("```json", "").replace("```", "").strip()


def _coerce_score(value, fallback):
    """Convert an LLM-provided score to ``int``, or return ``fallback``."""
    # Plain int() first (keeps 7 and "7" exact), then via float for "7.0".
    for convert in (int, lambda v: int(float(v))):
        try:
            return convert(value)
        except (TypeError, ValueError, OverflowError):
            continue
    return fallback


def _fetch_community_ratings(recommendations):
    """Map each recommended action to the ratings stored for it in Supabase.

    Best effort: returns an empty mapping when Supabase is not configured, and
    logs instead of raising when the lookup fails.
    """
    feedback_map = {}
    if not supabase:
        return feedback_map
    try:
        actions = [r['action'] for r in recommendations if 'action' in r]
        if actions:
            fb_res = supabase.table("fmea_feedback").select("action, rating").in_("action", actions).execute()
            for item in fb_res.data or []:
                rating = item.get('rating')
                # Skip null / non-numeric ratings; they cannot be averaged.
                if isinstance(rating, (int, float)):
                    feedback_map.setdefault(item['action'], []).append(rating)
    except Exception as fb_err:
        print(f"Error fetching community feedback: {fb_err}")
    return feedback_map


def _summarise_ratings(ratings):
    """Format ratings as thumbs up/down counts plus the average out of 10."""
    if not ratings:
        return "—"
    avg_r = sum(ratings) / len(ratings)
    # Assume ratings >= 7 are thumbs up, else thumbs down
    thumbs_up = sum(1 for x in ratings if x >= 7)
    thumbs_down = len(ratings) - thumbs_up
    return f"👍 {thumbs_up} | 👎 {thumbs_down} ({avg_r:.1f}/10)"
def health_check():
    """Return a static payload signalling that the backend process is up.

    NOTE(review): no route decorator is visible on this function in this view;
    confirm how it is registered on ``app``.
    """
    payload = {
        "status": "ok",
        "message": "Pangun ReliAI Backend is running.",
    }
    return payload
if __name__ == "__main__":
    # Imported inside the guard, so uvicorn is only loaded when this file is
    # executed directly as a script.
    import uvicorn

    uvicorn.run(
        "backend_api:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )