# Streamlit RAG demo: index a bundled PDF with FAISS and answer questions
# about it through a Groq-hosted chat model.
#
# Streamlit re-executes this whole script on every user interaction, so any
# state that must survive a rerun (the chunk list, the FAISS index) is
# persisted to disk under VECTORSTORE_FOLDER and reloaded at the top of each
# run instead of being kept only in module globals.

import json
import os

import faiss
import numpy as np
import pdfplumber
import streamlit as st
from groq import Groq
from sentence_transformers import SentenceTransformer

# Number of nearest chunks retrieved for each query.
TOP_K = 5

# Set background image and customize colors.
# NOTE(review): the markup passed to st.markdown below is blank, so
# background_image_url is currently unused -- presumably a <style> block was
# lost at some point; confirm and restore it if so.
background_image_url = "https://cdn.pixabay.com/photo/2016/06/02/02/33/triangles-1430105_1280.png"
st.markdown(
    f""" """,
    unsafe_allow_html=True
)

# The Groq API key is read from the HUGGINGFACE_KEY environment variable
# (Hugging Face Secrets).
HUGGINGFACE_KEY = os.getenv("HUGGINGFACE_KEY")
if not HUGGINGFACE_KEY:
    st.error("Groq API key not found. Please set it in Hugging Face Secrets.")
    # Nothing below can work without a key, so halt this run here.
    st.stop()

# Initialize Groq client with the API key.
groq_client = Groq(api_key=HUGGINGFACE_KEY)

# Load the SentenceTransformer model for embedding generation.
# NOTE(review): this is re-created on every rerun; consider Streamlit's
# resource caching if the installed Streamlit version provides it.
embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# Source document (expected in the app's working directory) and the folder
# where the FAISS index and the chunk list are stored.
file_path = "The Rise of Agentic AI.pdf"
VECTORSTORE_FOLDER = "vectorstore"
os.makedirs(VECTORSTORE_FOLDER, exist_ok=True)

vectorstore_path = os.path.join(VECTORSTORE_FOLDER, "index.faiss")
# Chunk texts are stored next to the index so that index row i can be mapped
# back to chunks[i] after a rerun or restart.
chunks_path = os.path.join(VECTORSTORE_FOLDER, "chunks.json")


def _new_index():
    """Return an empty L2 index sized for the embedder's output dimension."""
    return faiss.IndexFlatL2(embedder.get_sentence_embedding_dimension())


def _load_index():
    """Read the FAISS index from disk, falling back to an empty index."""
    if not os.path.exists(vectorstore_path):
        return _new_index()
    try:
        return faiss.read_index(vectorstore_path)
    except Exception as e:
        st.error(f"Error reading the FAISS index: {e}")
        return _new_index()


def _load_chunks():
    """Read the persisted chunk list, or return [] if absent or unreadable."""
    try:
        with open(chunks_path, encoding="utf-8") as fh:
            stored = json.load(fh)
    except (OSError, ValueError):
        # Missing file or invalid JSON: treat as "nothing processed yet".
        return []
    return stored if isinstance(stored, list) else []


# Load persisted state. chunks[i] must be the text behind index row i.
index = _load_index()
chunks = _load_chunks()
if index.ntotal != len(chunks):
    # The index and the chunk list are out of sync (e.g. an index written
    # without a chunk file), so row ids cannot be mapped to text. Start
    # empty and require the document to be processed again.
    index = _new_index()
    chunks = []


# Function to load text from PDF
def load_pdf_text(file_path):
    """Extract text from the given PDF file.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        The text of all pages joined with newlines. Pages for which
        ``extract_text()`` returns nothing contribute an empty string.
    """
    with pdfplumber.open(file_path) as pdf:
        # `or ""` guards against extract_text() returning None for a page;
        # joining with "\n" keeps the last word of one page from fusing with
        # the first word of the next.
        return "\n".join(page.extract_text() or "" for page in pdf.pages)


# Function to chunk text into smaller pieces
def chunk_text(text, chunk_size=500, overlap=100):
    """Chunk the text into overlapping chunks.

    Args:
        text: The string to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive chunks; must
            satisfy ``0 <= overlap < chunk_size``.

    Returns:
        A list of substrings of ``text`` whose start offsets advance by
        ``chunk_size - overlap``. Empty ``text`` yields ``[]``.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        # A non-positive step would either raise inside range() or silently
        # produce no chunks at all.
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]


# Process the document and update vector store
def process_and_store_document(file_path):
    """Process the PDF document, chunk text, generate embeddings, and store them in FAISS.

    The index is rebuilt from scratch on every call so that repeated
    processing does not append duplicate vectors. On success the module-level
    ``index`` and ``chunks`` are replaced and both are written to disk.

    Args:
        file_path: Path of the PDF to process.
    """
    global chunks, index  # Shared with the query handling below.

    st.info("Processing PDF document...")

    try:
        text = load_pdf_text(file_path)
    except Exception as e:
        # UI boundary: report the failure instead of crashing the app.
        st.error(f"Error reading the PDF document: {e}")
        return

    new_chunks = chunk_text(text)
    if not new_chunks:
        st.error("No extractable text found in the PDF document.")
        return

    embeddings = embedder.encode(new_chunks, show_progress_bar=True)

    # Build a fresh index and only then swap it in, so a failure above leaves
    # the previously loaded state untouched.
    new_index = _new_index()
    new_index.add(np.asarray(embeddings, dtype=np.float32))
    index, chunks = new_index, new_chunks

    try:
        faiss.write_index(index, vectorstore_path)
        with open(chunks_path, "w", encoding="utf-8") as fh:
            json.dump(chunks, fh, ensure_ascii=False)
    except Exception as e:
        st.error(f"Error saving the FAISS index: {e}")
    else:
        st.success("Document processed and vector store updated!")


def _build_prompt(retrieved_chunks, user_query):
    """Combine retrieved context and the user's question into one prompt."""
    context = " ".join(retrieved_chunks)
    # Separate the question from the context so it is not glued onto the
    # final word of the last chunk.
    return f"{context}\n\nQuestion: {user_query}"


def _answer_query(user_query):
    """Retrieve the closest chunks for ``user_query`` and display Groq's answer."""
    if not chunks or index.ntotal == 0:
        st.error("Please process the document first by clicking 'Process PDF'.")
        return

    query_embedding = np.asarray(embedder.encode([user_query]), dtype=np.float32)

    # Never ask for more neighbours than the index holds; the surplus slots
    # would come back as -1.
    _, indices = index.search(query_embedding, k=min(TOP_K, index.ntotal))

    # Drop any -1 placeholders and ids that do not map to a stored chunk.
    hits = [int(i) for i in indices[0] if 0 <= i < len(chunks)]
    if not hits:
        st.error("No relevant results found in the index.")
        return

    combined_input = _build_prompt([chunks[i] for i in hits], user_query)

    try:
        chat_completion = groq_client.chat.completions.create(
            messages=[{
                "role": "user",
                "content": combined_input,
            }],
            # NOTE(review): confirm this model id is still served by Groq.
            model="llama3-8b-8192",
        )
        # Display only the generated response.
        st.subheader("Generated Response")
        st.write(chat_completion.choices[0].message.content)
    except Exception as e:
        st.error(f"Error generating response: {e}")


# User interface for Streamlit
st.title("The Rise of Agentic AI RAG Application")

# Button to trigger document processing
if st.button("Process PDF"):
    process_and_store_document(file_path)

# Query input for the user
user_query = st.text_input("Enter your query:", key="query_input")

if user_query:
    _answer_query(user_query)

# Footer
# NOTE(review): the footer markup is blank here; presumably HTML was lost --
# confirm and restore it if so.
st.markdown("\n", unsafe_allow_html=True)