Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| from langchain.chains import LLMChain | |
| from langchain.prompts import PromptTemplate | |
| from langchain_community.llms import HuggingFaceEndpoint | |
| import fitz # PyMuPDF | |
| import pytesseract | |
| from PIL import Image | |
| import io | |
| import re | |
| import numpy as np | |
| import boto3 | |
| from typing import List | |
| from sentence_transformers import SentenceTransformer | |
| from langchain_community.vectorstores import Chroma | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.embeddings import SentenceTransformerEmbeddings | |
| import os | |
| # AWS access credentials | |
| access_key = os.getenv("access_key") | |
| secret_key = os.getenv("secret_key") | |
| # S3 bucket details | |
| bucket_name = os.getenv("bucket_name") | |
| prefix = os.getenv("prefix") | |
| HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN") | |
| def extract_text_from_pdf(pdf_content): | |
| """Extract text from PDF content using OCR.""" | |
| try: | |
| doc = fitz.open(stream=pdf_content, filetype="pdf") | |
| text = "" | |
| for page in doc: | |
| pix = page.get_pixmap() | |
| img = Image.open(io.BytesIO(pix.tobytes())) | |
| text += pytesseract.image_to_string(img) | |
| return text | |
| except Exception as e: | |
| print("Failed to extract text from PDF:", e) | |
| return "" | |
| def preprocess_text(text): | |
| """Preprocess text by cleaning and normalizing.""" | |
| try: | |
| text = text.replace('\n', ' ').replace('\r', ' ') | |
| text = re.sub(r'[^\x00-\x7F]+', ' ', text) | |
| text = text.lower() | |
| text = re.sub(r'[^\w\s]', '', text) | |
| text = re.sub(r'\s+', ' ', text).strip() | |
| return text | |
| except Exception as e: | |
| print("Failed to preprocess text:", e) | |
| return "" | |
| def process_files(file_contents: List[bytes]): | |
| """Process and combine text from multiple files.""" | |
| all_text = "" | |
| for file_content in file_contents: | |
| extracted_text = extract_text_from_pdf(file_content) | |
| preprocessed_text = preprocess_text(extracted_text) | |
| all_text += preprocessed_text + " " | |
| return all_text | |
| def compute_cosine_similarity_scores(query, retrieved_docs): | |
| """Compute cosine similarity scores between a query and retrieved documents.""" | |
| model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2") | |
| query_embedding = model.encode(query, convert_to_tensor=True) | |
| doc_embeddings = model.encode(retrieved_docs, convert_to_tensor=True) | |
| cosine_scores = np.dot(doc_embeddings.cpu(), query_embedding.cpu().T) | |
| readable_scores = [{"doc": doc, "score": float(score)} for doc, score in zip(retrieved_docs, cosine_scores.flatten())] | |
| return readable_scores | |
| def fetch_files_from_s3(): | |
| """Fetch files from an S3 bucket.""" | |
| s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key) | |
| objects = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix) | |
| file_contents = [] | |
| for obj in objects.get('Contents', []): | |
| if not obj['Key'].endswith('/'): # Skip directories | |
| response = s3.get_object(Bucket=bucket_name, Key=obj['Key']) | |
| file_content = response['Body'].read() | |
| file_contents.append(file_content) | |
| return file_contents | |
| def create_vector_store(all_text): | |
| """Create a vector store for similarity-based searching.""" | |
| embeddings = SentenceTransformerEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") | |
| text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) | |
| texts = text_splitter.split_text(all_text) | |
| if not texts: | |
| print("No text chunks created.") | |
| return None | |
| vector_store = Chroma.from_texts(texts, embeddings, collection_metadata={"hnsw:space": "cosine"}, persist_directory="stores/insurance_cosine") | |
| print("Vector DB Successfully Created!") | |
| return vector_store | |
| def load_vector_store(): | |
| """Load the vector store from the persistent directory.""" | |
| embeddings = SentenceTransformerEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") | |
| try: | |
| db = Chroma(persist_directory="stores/insurance_cosine", embedding_function=embeddings) | |
| print("Vector DB Successfully Loaded!") | |
| return db | |
| except Exception as e: | |
| print("Failed to load Vector DB:", e) | |
| return None | |
| def answer_query_with_similarity(query): | |
| """Answer a query by finding similar documents and generating responses using a language model.""" | |
| try: | |
| # Load the vector store | |
| vector_store = load_vector_store() | |
| # If vector store doesn't exist, fetch files from S3, process them, and create the vector store | |
| if not vector_store: | |
| file_contents = fetch_files_from_s3() | |
| if not file_contents: | |
| print("No files fetched from S3.") | |
| return None | |
| all_text = process_files(file_contents) | |
| if not all_text.strip(): | |
| print("No text extracted from documents.") | |
| return None | |
| vector_store = create_vector_store(all_text) | |
| if not vector_store: | |
| print("Failed to create Vector DB.") | |
| return None | |
| # Perform similarity search | |
| docs = vector_store.similarity_search(query) | |
| print(f"\n\nDocuments retrieved: {len(docs)}") | |
| if not docs: | |
| print("No documents match the query.") | |
| return None | |
| docs_content = [doc.page_content for doc in docs] | |
| # Compute cosine similarity scores | |
| cosine_similarity_scores = compute_cosine_similarity_scores(query, docs_content) | |
| all_docs_content = " ".join(docs_content) | |
| # Generate response using a language model | |
| template = """ | |
| ### [INST] Instruction: | |
| You are an AI assistant named Goose. Your purpose is to provide accurate, relevant, and helpful information to users in a friendly, warm, and supportive manner, similar to ChatGPT. When responding to queries, please keep the following guidelines in mind: | |
| - When someone says hi, or small talk, only respond in a sentence. | |
| - Retrieve relevant information from your knowledge base to formulate accurate and informative responses. | |
| - Always maintain a positive, friendly, and encouraging tone in your interactions with users. | |
| - Strictly write crisp and clear answers, don't write unnecessary stuff. | |
| - Only answer the asked question, don't hallucinate or print any pre-information. | |
| - After providing the answer, always ask for any other help needed in the next paragraph. | |
| - Writing in bullet format is our top preference. | |
| Remember, your goal is to be a reliable, friendly, and supportive AI assistant that provides accurate information while creating a positive user experience, just like ChatGPT. Adapt your communication style to best suit each user's needs and preferences. | |
| ### Docs: {docs} | |
| ### Question: {question} | |
| """ | |
| prompt = PromptTemplate.from_template(template.format(docs=all_docs_content, question=query)) | |
| repo_id = "mistralai/Mixtral-8x7B-Instruct-v0.1" | |
| llm = HuggingFaceEndpoint( | |
| repo_id=repo_id, | |
| temperature=0.1, | |
| model_kwargs={'token': HUGGINGFACEHUB_API_TOKEN}, | |
| top_p=0.15, | |
| max_new_tokens=256, | |
| repetition_penalty=1.1 | |
| ) | |
| llm_chain = LLMChain(prompt=prompt, llm=llm) | |
| answer = llm_chain.run(question=query).strip() | |
| print(f"\n\nAnswer: {answer}") | |
| return answer | |
| except Exception as e: | |
| print("An error occurred while getting the answer: ", str(e)) | |
| return None | |
| def gradio_interface(query): | |
| return answer_query_with_similarity(query) | |
| interface = gr.Interface( | |
| fn=gradio_interface, | |
| inputs=gr.Textbox(lines=2, placeholder="Enter your query here..."), | |
| outputs="text", | |
| title="Document Query App" | |
| ) | |
| if __name__ == "__main__": | |
| interface.launch() |