Spaces:
Sleeping
Sleeping
| import os | |
| from dotenv import load_dotenv | |
| from langchain_community.document_loaders import WebBaseLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_ollama import ChatOllama | |
| from langchain.prompts import PromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| from langchain_core.retrievers import BaseRetriever | |
| from langchain_core.runnables import Runnable | |
| from langchain_core.documents import Document | |
| from langchain_core.embeddings import Embeddings | |
| import chromadb | |
| import numpy as np | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import pandas as pd | |
| from typing import Optional, List | |
| import re | |
| import torch | |
| import subprocess | |
| # Load tokenizer and model separately to configure properly | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| # OPTION 1: Use Hugging Face Pipeline (Recommended for HF Spaces) | |
| from transformers import pipeline | |
| from langchain.llms import HuggingFacePipeline | |
# Environment configuration for ChromaDB: switch off anonymized telemetry
# (works around a telemetry error) and set the server host/port variables.
os.environ.update(
    {
        "ANONYMIZED_TELEMETRY": "False",
        "CHROMA_SERVER_HOST": "localhost",
        "CHROMA_SERVER_HTTP_PORT": "8000",
    }
)
class ImprovedTFIDFEmbeddings(Embeddings):
    """TF-IDF embedding function that returns fixed-size, unit-length vectors.

    The vectorizer is fitted once, on the first batch passed to
    ``embed_documents``; later batches and all queries are projected onto
    that vocabulary. Every returned vector has ``EMBEDDING_DIM`` entries
    (zero-padded when the fitted vocabulary is smaller).
    """

    # Length of every vector returned by this class.
    EMBEDDING_DIM = 512

    def __init__(self):
        self.vectorizer = TfidfVectorizer(
            # Cap the vocabulary at the embedding size so the vectorizer itself
            # selects the most frequent terms. Building a larger vocabulary and
            # slicing off the first EMBEDDING_DIM columns would discard terms
            # by column position instead of by importance.
            max_features=self.EMBEDDING_DIM,
            stop_words='english',
            ngram_range=(1, 3),
            min_df=1,
            max_df=0.85,
            lowercase=True,
            strip_accents='unicode',
            analyzer='word',
        )
        self.fitted = False
        self.documents = []

    def _to_fixed_size(self, tfidf_matrix) -> np.ndarray:
        """Convert a sparse TF-IDF matrix to dense unit-length rows of EMBEDDING_DIM columns."""
        dense = tfidf_matrix.toarray()
        dim = self.EMBEDDING_DIM
        n_features = dense.shape[1]
        if n_features < dim:
            dense = np.pad(dense, ((0, 0), (0, dim - n_features)))
        elif n_features > dim:
            dense = dense[:, :dim]
        # Normalize AFTER resizing so a truncated row is still unit length.
        norms = np.linalg.norm(dense, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # leave all-zero rows untouched
        return dense / norms

    def embed_documents(self, texts):
        """Create embeddings for a list of texts.

        Args:
            texts: Iterable of strings. The first non-empty batch also fits
                the vectorizer.

        Returns:
            A list with one ``EMBEDDING_DIM``-long list of floats per text;
            an empty list when ``texts`` is empty.
        """
        texts = list(texts)
        if not texts:
            return []
        if not self.fitted:
            self.documents = texts
            self.vectorizer.fit(texts)
            self.fitted = True
        return self._to_fixed_size(self.vectorizer.transform(texts)).tolist()

    def embed_query(self, text):
        """Create the embedding for a single query text.

        Args:
            text: The query string.

        Returns:
            An ``EMBEDDING_DIM``-long list of floats.

        Raises:
            ValueError: If no documents have been embedded yet. Fitting on the
                lone query cannot work with ``max_df=0.85`` and ``min_df=1``
                (the vectorizer rejects a one-document corpus), so the
                condition is reported explicitly.
        """
        if not self.fitted:
            raise ValueError(
                "ImprovedTFIDFEmbeddings is not fitted: call embed_documents() "
                "with the corpus before embed_query()."
            )
        return self._to_fixed_size(self.vectorizer.transform([text]))[0].tolist()
class SmartFAQRetriever(BaseRetriever):
    """Retriever for FAQ documents that matches a query against the questions.

    Documents are expected to look like ``QUESTION: ... ANSWER: ...``; only
    the question part is indexed (documents without a ``QUESTION:`` marker
    are indexed whole). Matching uses TF-IDF vectors and cosine similarity.
    """

    def __init__(self, documents: List[Document], k: int = 4):
        """Store the documents and the number of results to return per query."""
        super().__init__()
        self._documents = documents
        self._k = k
        self._vectorizer = None  # fitted lazily on the first query
        self._question_vectors = None  # TF-IDF matrix of the indexed questions

    def documents(self):
        """Return the list of documents this retriever searches."""
        return self._documents

    def k(self):
        """Return the maximum number of documents returned per query."""
        return self._k

    @staticmethod
    def _extract_question(doc: Document) -> str:
        """Return the question part of an FAQ document, or the full text if unmarked."""
        content = doc.page_content
        if "QUESTION:" not in content:
            return content
        return content.split("ANSWER:")[0].replace("QUESTION:", "").strip()

    def _ensure_index(self) -> None:
        """Fit the vectorizer and cache the question matrix if not done yet."""
        vectorizer = getattr(self, "_vectorizer", None)
        vectors = getattr(self, "_question_vectors", None)
        fitted = vectorizer is not None and bool(getattr(vectorizer, "vocabulary_", None))
        # Rebuild when the document list changed size since the index was built.
        if fitted and vectors is not None and vectors.shape[0] == len(self._documents):
            return
        print("[SmartFAQRetriever] Fitting vectorizer...")
        vectorizer = TfidfVectorizer(
            max_features=3000,
            stop_words='english',
            ngram_range=(1, 2),
            min_df=1,
            max_df=0.9,
        )
        questions = [self._extract_question(doc) for doc in self._documents]
        self._question_vectors = vectorizer.fit_transform(questions)
        self._vectorizer = vectorizer

    def get_documents_with_confidence(self, query: str) -> List[dict]:
        """Return top documents and their confidence (similarity) scores.

        Each entry is ``{"document": <page_content>, "confidence": <score>}``
        with the score rounded to three decimals. Unlike
        ``_get_relevant_documents`` there is no fallback: the list is empty
        when nothing clears the similarity threshold.
        """
        results = self._get_relevant_documents_with_scores(query)
        return [{"document": doc.page_content, "confidence": round(score, 3)} for doc, score in results]

    def _get_relevant_documents_with_scores(self, query: str) -> List[tuple[Document, float]]:
        """Retrieve up to ``k`` documents with cosine similarity above 0.1.

        Returns ``(document, similarity)`` pairs, best match first. Returns an
        empty list when there are no documents or ``k`` is not positive.
        """
        # Guard k <= 0 explicitly: a slice of [-0:] would select every document.
        if not self._documents or self._k <= 0:
            return []
        self._ensure_index()
        query_vector = self._vectorizer.transform([query.lower().strip()])
        similarities = cosine_similarity(query_vector, self._question_vectors).flatten()
        top_indices = similarities.argsort()[-self._k:][::-1]
        return [(self._documents[i], float(similarities[i])) for i in top_indices if similarities[i] > 0.1]

    def _get_relevant_documents(self, query: str) -> List[Document]:
        """Retrieve documents by question similarity.

        Falls back to the first ``k`` documents when no document clears the
        similarity threshold.
        """
        relevant_docs = [doc for doc, _ in self._get_relevant_documents_with_scores(query)]
        if not relevant_docs:
            # Fallback to first k documents if no good matches
            relevant_docs = self._documents[:max(self._k, 0)]
        return relevant_docs

    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        """Async entry point; delegates to the synchronous implementation."""
        return self._get_relevant_documents(query)
# Location of the bundled FAQ data used as the local/default source.
_MENTAL_HEALTH_FAQ_PATH = "data/Mental_Health_FAQ.csv"


def _load_faq_documents(csv_path: str) -> List[Document]:
    """Read a CSV with 'Questions'/'Answers' columns into FAQ-formatted documents."""
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Mental health FAQ file not found: {csv_path}")
    df = pd.read_csv(csv_path)
    # Rows missing either side would otherwise be indexed as the text "nan".
    df = df.dropna(subset=["Questions", "Answers"])
    documents = [
        Document(page_content=f"QUESTION: {question}\nANSWER: {answer}")
        for question, answer in zip(df["Questions"], df["Answers"])
    ]
    print(f"Loaded {len(documents)} mental health FAQ documents")
    for i, doc in enumerate(documents[:3]):
        print(f"Sample FAQ {i+1}: {doc.page_content[:200]}...")
    return documents


def _load_kaggle_documents(kaggle_dataset: str) -> List[Document]:
    """Download a Kaggle dataset and load its .csv/.json/.txt files as documents."""
    from src.kaggle_loader import KaggleDataLoader

    print(f"Loading Kaggle dataset: {kaggle_dataset}")
    # Create loader without parameters - it will auto-load from kaggle.json
    loader = KaggleDataLoader()
    dataset_path = loader.download_dataset(kaggle_dataset)

    documents = []
    print(f"Processing files in dataset directory: {dataset_path}")
    for file in os.listdir(dataset_path):
        file_path = os.path.join(dataset_path, file)
        if file.endswith('.csv'):
            print(f"Loading CSV file: {file}")
            if 'faq' in file.lower() or 'mental' in file.lower():
                # FAQ-style files: no explicit text columns are passed.
                documents.extend(loader.load_csv_dataset(file_path, [], chunk_size=50))
            else:
                # Other CSV files: use the first three columns as text.
                text_columns = pd.read_csv(file_path).columns[:3].tolist()
                documents.extend(loader.load_csv_dataset(file_path, text_columns, chunk_size=50))
        elif file.endswith('.json'):
            print(f"Loading JSON file: {file}")
            documents.extend(loader.load_json_dataset(file_path))
        elif file.endswith('.txt'):
            print(f"Loading text file: {file}")
            documents.extend(loader.load_text_dataset(file_path))

    print(f"Loaded {len(documents)} documents from Kaggle dataset")
    for i, doc in enumerate(documents[:3]):
        print(f"Sample doc {i+1}: {doc.page_content[:200]}")
    return documents


def setup_retriever(use_kaggle_data: bool = False, kaggle_dataset: Optional[str] = None,
                    kaggle_username: Optional[str] = None, kaggle_key: Optional[str] = None,
                    use_local_mental_health_data: bool = False) -> BaseRetriever:
    """
    Build a retriever over local mental health FAQ data or a Kaggle dataset.

    Sources are tried in this order: the local FAQ file (if requested), then
    the Kaggle dataset (if requested and named; its documents replace the
    local ones when it loads successfully). If no documents result, the local
    FAQ file is loaded as the default.

    Args:
        use_kaggle_data: Whether to load a Kaggle dataset
        kaggle_dataset: Kaggle dataset name (e.g., 'username/dataset-name')
        kaggle_username: Accepted for compatibility; not used by this function
            (the loader is created without credentials)
        kaggle_key: Accepted for compatibility; not used by this function
        use_local_mental_health_data: Whether to load local mental health FAQ data

    Returns:
        A SmartFAQRetriever when any document contains "QUESTION:", otherwise
        a Chroma-backed retriever returning 4 documents per query.

    Raises:
        Exception: If no data source yields any documents.
    """
    print("Setting up the retriever...")
    # Always bound, so a requested-but-unnamed Kaggle dataset cannot leave the
    # variable undefined further down.
    documents: List[Document] = []

    if use_local_mental_health_data:
        print("Loading mental health FAQ data from local file...")
        try:
            documents = _load_faq_documents(_MENTAL_HEALTH_FAQ_PATH)
        except Exception as e:
            print(f"Error loading mental health data: {e}")

    if use_kaggle_data and kaggle_dataset:
        try:
            kaggle_documents = _load_kaggle_documents(kaggle_dataset)
        except Exception as e:
            print(f"Error loading Kaggle data: {e}")
            print("Falling back to mental health FAQ data...")
        else:
            # Only replace what is already loaded when Kaggle produced something.
            if kaggle_documents:
                documents = kaggle_documents

    if not documents:
        print("No specific data source specified, loading mental health FAQ data as default...")
        try:
            documents = _load_faq_documents(_MENTAL_HEALTH_FAQ_PATH)
            if not documents:
                raise ValueError(f"No usable rows in {_MENTAL_HEALTH_FAQ_PATH}")
        except Exception as e:
            print(f"Error loading mental health data: {e}")
            raise Exception("No valid data source available. Please ensure mental health FAQ data is present or provide Kaggle credentials.") from e

    print(f"Processing {len(documents)} documents...")

    # FAQ datasets are served by the in-memory retriever; no vector store needed.
    if any("QUESTION:" in doc.page_content for doc in documents):
        print("Using SmartFAQRetriever for better semantic matching...")
        return SmartFAQRetriever(documents, k=4)

    print("Creating TF-IDF embeddings...")
    embeddings = ImprovedTFIDFEmbeddings()
    print("Creating ChromaDB vector store...")
    client = chromadb.PersistentClient(path="./tmp/chroma_db")

    # Clear existing collections to prevent mixing old and new data
    try:
        for collection in client.list_collections():
            # Depending on the chromadb version this yields objects or plain names.
            name = getattr(collection, "name", collection)
            print(f"Deleting existing collection: {name}")
            client.delete_collection(name)
    except Exception as e:
        print(f"Warning: Could not clear existing collections: {e}")

    vectorstore = Chroma.from_documents(
        documents=documents,
        embedding=embeddings,
        client=client,
    )
    print("Retriever setup complete.")
    # The result count is configured through search_kwargs.
    return vectorstore.as_retriever(search_kwargs={"k": 4})
| # def setup_rag_chain() -> Runnable: | |
| # """Sets up the RAG chain with a prompt template and an LLM.""" | |
| # # Define the prompt template for the LLM | |
| # prompt = PromptTemplate( | |
| # template="""You are an assistant for question-answering tasks. | |
| # Use the following documents to answer the question. | |
| # If you don't know the answer, just say that you don't know. | |
| # Use three sentences maximum and keep the answer concise: | |
| # Question: {question} | |
| # Documents: {documents} | |
| # Answer: | |
| # """, | |
| # input_variables=["question", "documents"], | |
| # ) | |
| # # Initialize the LLM with dolphin-llama3:8b model | |
| # # Note: This requires the Ollama server to be running with the specified model | |
| # llm = ChatOllama( | |
| # model="deepseek-ai/DeepSeek-R1-0528-Qwen3-8B", | |
| # temperature=0, | |
| # ) | |
| # # Create a chain combining the prompt template and LLM | |
| # return prompt | llm | StrOutputParser() | |
# Prompt sent to the LLM; {documents} and {question} are filled in per request.
_RAG_PROMPT_TEMPLATE = """Context: You are a medical information assistant that answers health questions using verified medical documents.
Primary Task: Answer the medical question using ONLY the provided documents.
Instructions:
1. For medical questions: Provide a clear, accurate answer based solely on the document content
2. If documents lack sufficient information: "I don't have enough information in the provided documents to answer this question"
3. For non-medical questions: "I specialize in medical information. Please ask a health-related question."
4. For identity questions: "I am a medical information assistant designed to help answer health-related questions based on verified medical documents."
5. Always use patient-friendly language
6. Keep responses 2-4 sentences maximum
7. For serious symptoms, recommend consulting healthcare professionals
Documents: {documents}
Question: {question}
Medical Answer:"""


def setup_rag_chain(model_name: str = "HuggingFaceTB/SmolLM3-3B",
                    fallback_model_name: str = "distilgpt2",
                    max_new_tokens: int = 50) -> Runnable:
    """Sets up the RAG chain with a prompt template and an LLM.

    Loads ``model_name`` as a Hugging Face text-generation pipeline and runs
    one smoke-test generation. If loading or the smoke test fails, the
    pipeline is rebuilt with ``fallback_model_name`` instead.

    Args:
        model_name: Hugging Face model id of the primary model.
        fallback_model_name: Model id used when the primary model fails.
        max_new_tokens: Generation budget per answer.

    Returns:
        A runnable ``prompt | llm | StrOutputParser()`` that expects the keys
        ``question`` and ``documents``.
    """
    prompt = PromptTemplate(
        template=_RAG_PROMPT_TEMPLATE,
        input_variables=["question", "documents"],
    )

    # Generation settings shared by the primary and the fallback pipeline.
    generation_kwargs = dict(
        max_new_tokens=max_new_tokens,
        temperature=0.2,
        return_full_text=False,
        do_sample=True,
        clean_up_tokenization_spaces=True,
    )
    test_input = "What is diabetes?"

    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="auto",
            torch_dtype=torch.float16,
        )
        # Give the tokenizer a pad token when it has none, reusing EOS.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        print(f"Tokenizer pad_token_id: {tokenizer.pad_token_id}")
        print(f"Tokenizer eos_token_id: {tokenizer.eos_token_id}")

        hf_pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            # Use actual tokenizer token IDs, not hardcoded values
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            **generation_kwargs,
        )

        # Smoke-test so a broken model is caught here rather than on first use.
        print(f"Testing pipeline with: {test_input}")
        test_result = hf_pipeline(test_input)
        print(f"Pipeline test successful: {test_result}")
    except Exception as e:
        # Report the model that actually failed.
        print(f"Error setting up {model_name}: {e}")
        print(f"Falling back to {fallback_model_name}...")
        hf_pipeline = pipeline(
            "text-generation",
            model=fallback_model_name,
            **generation_kwargs,
        )
        print(f"Testing fallback pipeline with: {test_input}")
        test_result = hf_pipeline(test_input)
        print(f"Fallback pipeline test successful: {test_result}")

    # Wrap it in LangChain
    llm = HuggingFacePipeline(pipeline=hf_pipeline)
    # Create a chain combining the prompt template and LLM
    return prompt | llm | StrOutputParser()
| # Also update the RAG application class with better error handling | |
class RAGApplication:
    """Answers questions by retrieving documents and passing them to an LLM chain."""

    # Upper bound (in characters) on the retrieved text placed into the prompt.
    MAX_CONTEXT_CHARS = 500

    def __init__(self, retriever: "BaseRetriever", rag_chain: "Runnable"):
        """Store the retriever and the chain used by ``run``.

        Args:
            retriever: Object with ``invoke(question)``; if it also offers
                ``get_documents_with_confidence(question)``, that is preferred.
            rag_chain: Object whose ``invoke`` accepts a dict with the keys
                ``question`` and ``documents`` and returns a string.
        """
        self.retriever = retriever
        self.rag_chain = rag_chain

    def run(self, question: str) -> str:
        """Run the RAG pipeline for one question.

        Args:
            question: The user's question.

        Returns:
            The model's answer followed by a fixed note, a validation message
            when the question is empty/blank/None, or an apology containing
            the error text when any step raises.
        """
        try:
            # `not question` also covers None, which has no .strip().
            if not question or not question.strip():
                return "Please provide a valid question."
            question = question.strip()
            print(f"\nProcessing question: '{question}'")

            if hasattr(self.retriever, "get_documents_with_confidence"):
                docs_with_scores = self.retriever.get_documents_with_confidence(question)
                passages = [d["document"] for d in docs_with_scores]
                confidence_info = "\n".join(
                    f"- Score: {d['confidence']}, Snippet: {d['document'][:100]}..."
                    for d in docs_with_scores
                )
            else:
                passages = [doc.page_content for doc in self.retriever.invoke(question)]
                confidence_info = "Confidence scoring not available."

            print(f"Retrieved {len(passages)} documents")
            print(confidence_info)

            # Keep the prompt short: cap the combined context length.
            doc_texts = "\n\n".join(passages)
            if len(doc_texts) > self.MAX_CONTEXT_CHARS:
                doc_texts = doc_texts[:self.MAX_CONTEXT_CHARS] + "..."

            answer = self.rag_chain.invoke({"question": question, "documents": doc_texts})
            # Append confidence footer
            footer = "\n\n(Note: This answer is based on documents with confidence scores. Review full context if critical.)"
            return answer.strip() + footer
        except Exception as e:
            # Top-level boundary: report the failure to the user instead of crashing.
            print(f"Error in RAG application: {str(e)}")
            import traceback
            traceback.print_exc()
            return f"I apologize, but I encountered an error processing your question: {str(e)}. Please try rephrasing it or ask a different question."
# Script entry point: build the pipeline and answer one sample question.
if __name__ == "__main__":
    # Read settings from a local .env file before anything is constructed.
    load_dotenv()

    # Assemble the application; the retriever is built before the LLM chain.
    rag_application = RAGApplication(setup_retriever(), setup_rag_chain())

    # Ask a single example question and show the result.
    question = "What is terminal illness?"
    print("\n--- Running RAG Application ---")
    print(f"Question: {question}")
    print(f"Answer: {rag_application.run(question)}")