Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| import PyPDF2 | |
| import docx | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| import streamlit as st | |
| import time | |
| from groq import Groq | |
| import re | |
# Sentence-embedding model used to turn document chunks into vectors.
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

# FAISS setup
dimension = 384  # Dimension of 'all-MiniLM-L6-v2' embeddings
index = faiss.IndexFlatL2(dimension)
document_texts = []  # Store text corresponding to embeddings (row i <-> chunk i)

# Constants for file handling
MAX_FILE_SIZE_MB = 100  # 100 MB
MAX_NUM_FILES = 5
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024

# Groq API client.
# SECURITY: the key must never be committed to source control. It is read from
# the GROQ_API_KEY environment variable (on hosted platforms, configure it as a
# secret). Any key that was previously hard-coded in this file should be
# considered leaked and revoked.
api_key = os.environ.get("GROQ_API_KEY")
if not api_key:
    st.error(
        "The GROQ_API_KEY environment variable is not set. "
        "Configure it before using this app."
    )
    st.stop()
client = Groq(api_key=api_key)
def get_human_readable_size(size_in_bytes):
    """Format a byte count as a short human-readable string.

    Values below 1024 are shown as whole bytes; larger values are shown with
    two decimals in KB, MB or GB (powers of 1024). Anything at or above
    1024 ** 3 is reported in GB.

    Args:
        size_in_bytes: Size to format, as a number of bytes.

    Returns:
        A string such as ``"512 Bytes"``, ``"1.50 KB"`` or ``"2.00 GB"``.
    """
    if size_in_bytes < 1024:
        return f"{size_in_bytes} Bytes"
    # Pick the first unit whose upper bound is above the value.
    for exponent, unit in ((1, "KB"), (2, "MB")):
        if size_in_bytes < 1024 ** (exponent + 1):
            return f"{size_in_bytes / (1024 ** exponent):.2f} {unit}"
    return f"{size_in_bytes / (1024 ** 3):.2f} GB"
def _rows_to_text(df):
    """Flatten a DataFrame to text: one line per row, cells separated by spaces."""
    return "\n".join(" ".join(map(str, row)) for row in df.values)


def extract_text_from_file(file):
    """Extract plain text from an uploaded file, chosen by file extension.

    Supported extensions (matched case-insensitively): ``.pdf``, ``.csv``,
    ``.xlsx``/``.xls``, ``.txt`` and ``.docx``.

    Args:
        file: A file-like object with a ``name`` attribute (and ``read()`` for
            text files), such as the objects returned by the file uploader.

    Returns:
        The extracted text, or ``None`` when the extension is not supported.
        The text may be an empty string if the file has no extractable text.
    """
    name = file.name.lower()  # so "REPORT.PDF" is handled like "report.pdf"

    if name.endswith(".pdf"):
        pdf_reader = PyPDF2.PdfReader(file)
        # A page may yield no text (e.g. image-only pages); `or ""` guards
        # against a None result. Pages are joined with a newline so the last
        # word of one page does not fuse with the first word of the next.
        return "\n".join(page.extract_text() or "" for page in pdf_reader.pages)

    if name.endswith(".csv"):
        return _rows_to_text(pd.read_csv(file))

    if name.endswith((".xlsx", ".xls")):
        return _rows_to_text(pd.read_excel(file))

    if name.endswith(".txt"):
        # Undecodable bytes are replaced rather than raising, so a file that
        # is not strictly UTF-8 can still be processed.
        return file.read().decode("utf-8", errors="replace")

    if name.endswith(".docx"):
        doc = docx.Document(file)
        return "\n".join(p.text for p in doc.paragraphs)

    return None
def split_text_into_chunks(text, max_chunk_size=500):
    """Group the sentences of ``text`` into chunks of bounded size.

    The text is split on ``". "`` and sentences are accumulated greedily until
    adding the next one would push the summed sentence lengths above
    ``max_chunk_size``. Separator characters are not counted towards the
    size. A single sentence longer than ``max_chunk_size`` is kept whole as
    its own chunk.

    Args:
        text: The text to split.
        max_chunk_size: Maximum summed sentence length per chunk, in
            characters.

    Returns:
        A list of chunk strings, each made of sentences re-joined with
        ``". "``.
    """
    chunks = []
    current = []
    current_size = 0
    for sentence in text.split(". "):
        sentence_size = len(sentence)
        if current_size + sentence_size <= max_chunk_size:
            current.append(sentence)
            current_size += sentence_size
            continue
        # Flush only a non-empty accumulator: when the very first sentence is
        # already too long there is nothing to flush, and appending here
        # would emit a spurious empty chunk.
        if current:
            chunks.append(". ".join(current))
        current = [sentence]
        current_size = sentence_size
    if current:
        chunks.append(". ".join(current))
    return chunks
def add_to_index(text, index, document_texts):
    """Chunk ``text``, embed the chunks and append them to the vector index.

    Chunks are appended to ``document_texts`` in the same order their
    embeddings are added to ``index``, so the two stay aligned row for row.

    Args:
        text: Document text to index. Empty or whitespace-only text is
            ignored.
        index: Vector index exposing ``add(embeddings)``.
        document_texts: List that receives the chunk strings (mutated in
            place).
    """
    # Nothing to embed: return before touching the model or the index.
    if not text or not text.strip():
        return
    # Drop blank chunks so no empty string is embedded and stored.
    chunks = [chunk for chunk in split_text_into_chunks(text) if chunk.strip()]
    if not chunks:
        return
    embeddings = embedding_model.encode(chunks, convert_to_numpy=True)
    index.add(embeddings)
    document_texts.extend(chunks)
def suggest_questions(text):
    """Return a fixed list of starter questions chosen by document length.

    The questions are canned; only the whitespace-separated word count of
    ``text`` decides which set is returned.

    Args:
        text: The document text.

    Returns:
        A list of three question strings: one set for texts shorter than 200
        words, another for texts of 200 words or more.
    """
    word_count = len(text.split())
    if word_count < 200:
        return [
            "Can you summarize the main points?",
            "What is the main argument or conclusion?",
            "What is the purpose of this document?",
        ]
    return [
        "What are the key takeaways from this document?",
        "Can you provide a summary of the main sections?",
        "What are the major findings or conclusions?",
    ]
def generate_answer_with_groq(question, context, model="gemma2-9b-it"):
    """Ask the Groq chat-completions API to answer ``question`` from ``context``.

    Context and question are sent together as a single user message.

    Args:
        question: The user's question.
        context: Document text the answer should be based on.
        model: Name of the Groq-hosted model to use.

    Returns:
        The content of the first choice in the completion response.
    """
    prompt = f"Context: {context}\nQuestion: {question}"
    chat_completion = client.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        model=model,
    )
    return chat_completion.choices[0].message.content
# Characters accepted in a query: word characters (Unicode-aware, so accented
# and non-Latin letters are allowed), whitespace and common punctuation.
_VALID_QUERY_RE = re.compile(r"""[\w\s.,!?'"():;/%&$@#+*=-]+""")


def is_valid_input(query):
    """Heuristically check that ``query`` looks like ordinary typed text.

    The query is stripped first. It is valid when it is non-empty and
    consists only of letters, digits, whitespace and common punctuation.
    Control characters and unusual symbols make it invalid.

    Args:
        query: The raw text entered by the user.

    Returns:
        ``True`` if the query passes the check, ``False`` otherwise.
    """
    query = query.strip()
    if not query:
        return False  # Empty input is invalid
    return _VALID_QUERY_RE.fullmatch(query) is not None
def handle_feedback(feedback):
    """Acknowledge feedback entered by the user.

    Writes a thank-you message to the page when ``feedback`` is non-empty;
    does nothing otherwise. The feedback text itself is not stored.

    Args:
        feedback: Text entered in the feedback box (may be empty).
    """
    if feedback:
        st.write("Thank you for your feedback!")
# Streamlit UI: page title and a sidebar with usage tips and a feedback box.
st.title("Enhanced Document Q&A with RAG")
st.sidebar.title("Tips for Better Experience")
# The limits are interpolated from the module constants so the tips cannot
# drift from the values that are actually enforced.
st.sidebar.write(f"""
1. Maximum file size: {MAX_FILE_SIZE_MB} MB per file.
2. You can upload up to {MAX_NUM_FILES} files at a time.
3. Larger files may take longer to process.
4. Please break large files into smaller chunks if necessary.
5. Use the pre-generated questions to guide your inquiry.
""")
feedback = st.sidebar.text_area("Provide feedback to improve your experience:")
# File uploader
uploaded_files = st.file_uploader(
    f"Upload documents (PDF, CSV, Excel, TXT, DOCX). Max size: {MAX_FILE_SIZE_MB} MB each.",
    type=["pdf", "csv", "xlsx", "xls", "txt", "docx"],
    accept_multiple_files=True,
)

if not uploaded_files:
    st.warning("No documents uploaded yet. Please upload documents before asking questions.")
elif len(uploaded_files) > MAX_NUM_FILES:
    st.error(f"Maximum {MAX_NUM_FILES} files can be uploaded at a time.")
else:
    for file in uploaded_files:
        file_size = file.size
        human_readable_size = get_human_readable_size(file_size)
        st.write(f"File: {file.name} | Size: {human_readable_size}")

        # Oversized files are still processed; the user is only warned.
        if file_size > MAX_FILE_SIZE_BYTES:
            st.warning(
                f"File '{file.name}' exceeds the {MAX_FILE_SIZE_MB} MB limit. "
                "We will automatically break this file into smaller chunks."
            )

        with st.spinner(f"Processing {file.name}..."):
            # One unreadable or corrupt file must not abort the whole run, so
            # failures are reported per file at this UI boundary.
            try:
                text = extract_text_from_file(file)
            except Exception as exc:
                st.error(f"Could not process {file.name}: {exc}")
                continue

            if text is None:
                st.error(f"Could not process {file.name}. Unsupported format.")
            elif not text.strip():
                st.error(f"Could not process {file.name}. No extractable text found.")
            else:
                # add_to_index does the chunking itself. Pre-chunking and
                # re-joining with spaces would drop the ". " separators
                # between chunks and merge neighbouring sentences.
                add_to_index(text, index, document_texts)
                st.success(f"Processed {file.name}")
# Acknowledge sidebar feedback when the user has entered some.
if feedback:
    handle_feedback(feedback)
# Input for question
query = st.text_input("Enter your question:")

# Number of nearest chunks retrieved from the index to build the LLM context.
TOP_K_CHUNKS = 5

# If query is entered and documents are uploaded
if query:
    if not document_texts:
        st.warning("Please upload and process documents before asking questions.")
    elif not is_valid_input(query):
        st.error("Please ask a relevant question.")
    else:
        with st.spinner("Generating response..."):
            # Retrieval step: embed the question and fetch the nearest chunks
            # from the FAISS index, instead of sending every indexed chunk to
            # the model.
            query_embedding = embedding_model.encode([query], convert_to_numpy=True)
            k = min(TOP_K_CHUNKS, index.ntotal)
            context_chunks = []
            if k > 0:
                _, neighbor_ids = index.search(query_embedding, k)
                # Out-of-range ids are skipped defensively.
                context_chunks = [
                    document_texts[i]
                    for i in neighbor_ids[0]
                    if 0 <= i < len(document_texts)
                ]
            if not context_chunks:
                # Fall back to the full text if retrieval returned nothing.
                context_chunks = document_texts

            # Report API failures in the page instead of crashing the app.
            try:
                response = generate_answer_with_groq(query, "\n\n".join(context_chunks))
            except Exception as exc:
                response = None
                st.error(f"Could not generate a response: {exc}")

        if response is not None:
            st.write("### Answer:")
            st.write(response)
            st.write("### Suggested Questions:")
            # Suggestions are generated from the full document content.
            questions = suggest_questions(" ".join(document_texts))
            for question in questions:
                st.write(f"- {question}")

# Instructions and reminders
if not uploaded_files:
    st.info("You haven't uploaded any documents yet. Please upload documents to start.")
else:
    st.info("Enter a question to ask about the uploaded documents.")