# Hugging Face Spaces status header ("Spaces: Running") removed — not part of the source.
import os
import tempfile

import streamlit as st

from src.functions_langchain import (
    InMemoryVectorStore,
    State,
    chunk_and_embed_pdf_text,
    embeddings,
    generate,
    graph_init,
)
from src.functions_pdf import pdfminer_pdf_to_text
# References on Retrieval-Augmented Generation (RAG):
# https://aws.amazon.com/what-is/retrieval-augmented-generation/
# https://medium.com/@dminhk/retrieval-augmented-generation-rag-explained-b1dd89979681
# https://huggingface.co/transformers/model_doc/rag.html
# https://huggingface.co/transformers/model_doc/rag-tokenizer.html
# TODO: pick a retrieval tool (BM25, Dense Passage Retrieval, or Sentence Transformers).
# TODO: pick a persistent store (PostgreSQL or MongoDB — should be a vector database)
#       for future use in semantic search.
# TODO: test the APIs of Indeed, LinkedIn, and Pôle emploi.
# TODO: test the Hugging Face API.
################################################################################
# Sidebar
st.sidebar.title("App Parameters")
# Chunking parameters fed to chunk_and_embed_pdf_text (slider args: min, max, default).
chunk_size = st.sidebar.slider("Chunk Size", 100, 2000, 1000)
chunk_overlap = st.sidebar.slider("Chunk Overlap", 0, 500, 100)
# Main title
st.title("RAG chat with PDF")
st.divider()
# Single shared uploader; both tabs below read `file`.
file = st.file_uploader("Upload a PDF file", type=["pdf"])
tab1, tab2 = st.tabs(["RAG", "Debugging"])
def save_uploaded_file(uploaded_file):
    """Persist an uploaded file to a unique temporary PDF on disk.

    Uses ``tempfile.mkstemp`` instead of the previous fixed name
    ("temp_uploaded_file.pdf") so that concurrent Streamlit sessions
    cannot overwrite each other's uploads.

    Args:
        uploaded_file: file-like object exposing ``.read()`` (e.g. a
            Streamlit ``UploadedFile``).

    Returns:
        str: path of the written temporary file. The caller is expected
        to delete it (load_and_extract_text does so after extraction).
    """
    fd, path = tempfile.mkstemp(suffix=".pdf")
    # mkstemp returns an already-open OS-level handle; wrap it so it is
    # closed deterministically after writing.
    with os.fdopen(fd, "wb") as f:
        f.write(uploaded_file.read())
    return path
def load_and_extract_text(pdf_path):
    """Extract text from a PDF and always delete the file afterwards.

    The removal is done in a ``finally`` block so the temporary upload
    does not leak on disk when ``pdfminer_pdf_to_text`` raises (the
    original version skipped cleanup on any extraction error).

    Args:
        pdf_path (str): path to the (temporary) PDF file; it is removed
            before this function returns or propagates an exception.

    Returns:
        str: the extracted text (may be empty/falsy if the PDF has no
        extractable text layer).
    """
    try:
        text = pdfminer_pdf_to_text(pdf_path)
    finally:
        # Best-effort cleanup of the temp file written by save_uploaded_file.
        if os.path.exists(pdf_path):
            os.remove(pdf_path)
    return text
def init_vector_store_and_graph(pdf_text, chunk_size, chunk_overlap):
    """Build the retrieval pipeline for a PDF's text.

    Splits *pdf_text* into chunks, indexes them in a fresh in-memory
    vector store, and constructs the RAG graph over that store.

    Args:
        pdf_text (str): full extracted text of the PDF.
        chunk_size (int): target size of each text chunk.
        chunk_overlap (int): overlap between consecutive chunks.

    Returns:
        tuple: ``(vector_store, graph, chunks)``.
    """
    # Embedding vectors returned by the helper are unused here; the
    # store re-embeds via the shared `embeddings` object on add_texts.
    text_chunks, _vectors = chunk_and_embed_pdf_text(
        pdf_text, chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    store = InMemoryVectorStore(embeddings)
    store.add_texts(text_chunks)
    return store, graph_init(store), text_chunks
# main tab
with tab1:
    if file is not None:
        # Detect a NEW upload (different file name) and reset all cached
        # pipeline state, so chunks/answers from a previous PDF are not reused.
        if "pdf_path" not in st.session_state or st.session_state["pdf_path"] != file.name:
            st.session_state["pdf_path"] = file.name
            st.session_state["temp_pdf_path"] = save_uploaded_file(file)
            st.session_state["pdf_text"] = None
            st.session_state["vector_store"] = None
            st.session_state["graph"] = None
            st.session_state["chunks"] = None
            st.session_state["state"] = None
        if st.button("Launch app"):
            with st.spinner("Extracting and processing PDF..."):
                # NOTE(review): load_and_extract_text deletes the temp file,
                # so clicking "Launch app" a second time for the same upload
                # re-reads a path that no longer exists — confirm intended.
                text = load_and_extract_text(st.session_state["temp_pdf_path"])
                if not text:
                    st.warning("No text extracted from PDF.")
                else:
                    # Cache the whole pipeline in session_state so it survives
                    # Streamlit reruns triggered by the question input below.
                    st.session_state["pdf_text"] = text
                    vector_store, graph, chunks = init_vector_store_and_graph(text, chunk_size, chunk_overlap)
                    st.session_state["vector_store"] = vector_store
                    st.session_state["graph"] = graph
                    st.session_state["chunks"] = chunks
                    st.success(f"Processed PDF with {len(chunks)} chunks.")
    # Q&A section — only rendered once a graph has been built.
    if "graph" in st.session_state and st.session_state["graph"] is not None:
        query = st.text_input("Ask a question about the PDF:", key="query_tab1")
        if query:
            # Fresh State per question; the graph fills context and answer.
            state = State(question=query, context=[], answer="")
            st.session_state["state"] = state
            with st.spinner("Retrieving context and generating answer..."):
                result_state = st.session_state["graph"].invoke(state)
                st.session_state["state"] = result_state
            if result_state.get("context"):
                st.success(f"Retrieved {len(result_state['context'])} relevant documents.")
                st.markdown("### Answer:")
                st.write(result_state.get("answer", "No answer generated."))
            else:
                st.warning("No relevant context found for the question.")
# Debugging tab — exposes each pipeline stage (extract / embed / query)
# as a separate manual step for inspection.
with tab2:
    if file is not None:
        st.info(f"Uploaded file: **{file.name}** ({file.size / 1024:.2f} KB)")
        if st.button("Extract Text"):
            # Unlike tab1, this writes and extracts on every click
            # (the temp file is removed inside load_and_extract_text).
            temp_pdf_path = save_uploaded_file(file)
            text = load_and_extract_text(temp_pdf_path)
            if text:
                st.success("Text extracted successfully!")
                st.session_state["pdf_text"] = text
                st.text_area("Extracted Text", text, height=300)
                st.download_button("Download Extracted Text", text, "extracted_text.txt", "text/plain")
            else:
                st.warning("No text extracted. Please check the PDF.")
    # Embedding step — enabled once extracted text exists in session_state
    # (possibly set by tab1's "Launch app" as well).
    if "pdf_text" in st.session_state and st.session_state["pdf_text"]:
        if st.button("Process and Embed Text"):
            with st.spinner("Chunking and embedding text..."):
                vector_store, graph, chunks = init_vector_store_and_graph(st.session_state["pdf_text"], chunk_size, chunk_overlap)
                st.session_state["vector_store"] = vector_store
                st.session_state["graph"] = graph
                st.session_state["chunks"] = chunks
                st.success(f"Processed {len(chunks)} chunks and created embeddings.")
                # Preview only the first 3 chunks as a sanity check.
                for i, chunk in enumerate(chunks[:3]):
                    st.markdown(f"**Chunk {i+1}:**")
                    st.write(chunk)
    # Q&A step — mirrors tab1 but with its own widget key to avoid
    # a duplicate-key clash with the tab1 text_input.
    if "graph" in st.session_state and st.session_state["graph"] is not None:
        query_debug = st.text_input("Ask a question about the PDF:", key="query_tab2")
        if query_debug:
            state = State(question=query_debug, context=[], answer="")
            st.session_state["state"] = state
            with st.spinner("Retrieving context and generating answer..."):
                result_state = st.session_state["graph"].invoke(state)
                st.session_state["state"] = result_state
            if result_state.get("context"):
                st.success(f"Retrieved {len(result_state['context'])} documents.")
                st.markdown("### Answer:")
                st.write(result_state.get("answer", "No answer generated."))
            else:
                st.warning("No relevant context found for the question.")
| # with tab1: | |
| # # Upload PDF | |
| # if file is not None: | |
| # temp_file_path = "temp_uploaded_file.pdf" | |
| # with open(temp_file_path, "wb") as temp_file: | |
| # temp_file.write(file.read()) | |
| # if st.button("Launch app"): | |
| # with st.spinner("Preloading information..."): | |
| # text = pdfminer_pdf_to_text(temp_file_path) | |
| # st.session_state["pdf_text"] = text | |
| # vector_store = InMemoryVectorStore(embeddings) | |
| # chunks, vectors = chunk_and_embed_pdf_text(st.session_state["pdf_text"], chunk_size=chunk_size, chunk_overlap=chunk_overlap) | |
| # vector_store = InMemoryVectorStore(embeddings) | |
| # vector_store.add_texts(chunks) | |
| # st.session_state["vector_store"] = vector_store | |
| # st.session_state["graph"] = graph_init(vector_store) | |
| # st.success("App is ready to use!") | |
| # if "graph" in st.session_state: | |
| # query = st.text_input("Ask a question about the PDF:") | |
| # if query: | |
| # state = State(question=query, context=[], answer="") | |
| # st.session_state["state"] = state | |
| # with st.spinner("Retrieving context..."): | |
| # context = st.session_state["graph"].invoke(state) | |
| # st.session_state["state"]["context"] = context["context"] | |
| # if st.session_state["state"]["context"]: | |
| # st.success(f"Retrieved {len(st.session_state['state']['context'])} documents.") | |
| # with st.spinner("Generating answer..."): | |
| # answer = generate(st.session_state["state"]) | |
| # st.session_state["state"]["answer"] = answer["answer"] | |
| # st.markdown("### Answer:") | |
| # st.write(st.session_state["state"]["answer"]) | |
| # else: | |
| # st.warning("No relevant context found for the question.") | |
| # with tab2: | |
| # ### FIRST ETAPE ----UPLOAD THE PDF-FILE AND RETURN THE TEXT RESULT ---- | |
| # if file is not None: | |
| # st.info(f"Uploaded file: **{file.name}** ({file.size / 1024:.2f} KB)") | |
| # if st.button("Extract Text"): | |
| # temp_file_path = "temp_uploaded_file.pdf" | |
| # with open(temp_file_path, "wb") as temp_file: | |
| # temp_file.write(file.read()) | |
| # text = pdfminer_pdf_to_text(temp_file_path) | |
| # if os.path.exists(temp_file_path): | |
| # os.remove(temp_file_path) | |
| # if text: | |
| # st.success("Text extracted successfully!") | |
| # st.session_state["pdf_text"] = text | |
| # if st.checkbox("Show extracted text"): | |
| # st.text_area("Extracted Text", text, height=300) | |
| # st.download_button( | |
| # label="Download Extracted Text", | |
| # data=text, | |
| # file_name="extracted_text.txt", | |
| # mime="text/plain" | |
| # ) | |
| # else: | |
| # st.warning("No text extracted. Please check the PDF.") | |
| # else: | |
| # st.warning("Please upload a PDF file to proceed.") | |
| # # SECOND ETAPE ---- New button and logic for chunking & embedding ( no mongo db, only session state ) ---- | |
| # vector_store = InMemoryVectorStore(embeddings) | |
| # if "pdf_text" in st.session_state: | |
| # if st.button("Process and Embed Text"): | |
| # with st.spinner("Chunking and embedding text..."): | |
| # chunks, vectors = chunk_and_embed_pdf_text(st.session_state["pdf_text"], chunk_size=chunk_size, chunk_overlap=chunk_overlap) | |
| # # Initialize vector store and add texts | |
| # vector_store = InMemoryVectorStore(embeddings) | |
| # vector_store.add_texts(chunks) | |
| # # Save vector store and graph in session state | |
| # st.session_state["vector_store"] = vector_store | |
| # st.session_state["graph"] = graph_init(vector_store) | |
| # st.success(f"Processed {len(chunks)} chunks and created embeddings.") | |
| # for i, chunk in enumerate(chunks[:3]): | |
| # st.markdown(f"**Chunk {i+1}:**") | |
| # st.write(chunk) | |
| # # THIRD ETAPE ---- Add a question and answer logic ---- | |
| # if "graph" in st.session_state: | |
| # query = st.text_input("Ask a question about the PDF:") | |
| # if query: | |
| # state = State(question=query, context=[], answer="") | |
| # st.session_state["state"] = state | |
| # with st.spinner("Retrieving context..."): | |
| # context = st.session_state["graph"].invoke(state) | |
| # st.session_state["state"]["context"] = context["context"] | |
| # if st.session_state["state"]["context"]: | |
| # st.success(f"Retrieved {len(st.session_state['state']['context'])} documents.") | |
| # with st.spinner("Generating answer..."): | |
| # answer = generate(st.session_state["state"]) | |
| # st.session_state["state"]["answer"] = answer["answer"] | |
| # st.markdown("### Answer:") | |
| # st.write(st.session_state["state"]["answer"]) | |
| # else: | |
| # st.warning("No relevant context found for the question.") |