# import os # import tempfile # import streamlit as st # from langchain.memory import ConversationBufferMemory # from langchain.chains import ConversationalRetrievalChain # from langchain.prompts import ChatPromptTemplate # from langchain.vectorstores import Chroma # from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings # from PIL import Image # from docx import Document # import PyPDF2 # import pytesseract # # Optional: Set Tesseract path # # pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe' # Windows # # Streamlit UI config # st.set_page_config(page_title="๐Ÿ“„ Gemini RAG Summarizer", layout="wide") # # Custom CSS with softened radium colors, container backgrounds, and summary text styling # st.markdown(""" # # """, unsafe_allow_html=True) # # State initialization # for key in ['extracted_text', 'chat_history', 'summarized', 'file_uploader_key', 'file_uploaded']: # if key not in st.session_state: # st.session_state[key] = "" if key == "extracted_text" else [] if key == "chat_history" else False if key in ["summarized", "file_uploaded"] else 0 # # API Key # api_key = st.secrets.get("genai") or st.text_input("๐Ÿ” Enter Gemini API Key", type="password") # # Main title with softened radium glow # st.markdown("

๐Ÿ“„ Gemini-Powered RAG Summarizer

", unsafe_allow_html=True) # # Sidebar # with st.sidebar: # st.markdown("

Upload Your File

", unsafe_allow_html=True) # st.markdown("

Upload a .pdf, .docx, .txt, .png, or .jpg file

", unsafe_allow_html=True) # uploaded_file = st.file_uploader("", type=["pdf", "docx", "txt", "png", "jpg"], label_visibility="collapsed", accept_multiple_files=False, key=f"uploader_{st.session_state.file_uploader_key}") # # Sidebar buttons # if uploaded_file: # st.session_state.file_uploaded = True # if st.session_state.file_uploaded: # st.markdown("### Actions") # st.button("๐Ÿ” Summarize", key="summarize_button", on_click=lambda: st.session_state.update({"summarize_clicked": True}), help="Summarize the uploaded file") # st.button("๐Ÿ’ฌ Chat with Content", key="chat_button", on_click=lambda: st.session_state.update({"chat_clicked": True}), help="Chat about the file content") # st.button("๐Ÿงน Clear", key="clear_button", on_click=lambda: st.session_state.update({"clear_clicked": True}), help="Clear uploaded file and reset", type="secondary") # # Vector store setup # if api_key: # try: # embeddings = GoogleGenerativeAIEmbeddings(google_api_key=api_key, model="models/embedding-001") # persist_directory = "./chroma_db" # vector_store = Chroma(persist_directory=persist_directory, embedding_function=embeddings) # retriever = vector_store.as_retriever() # memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True) # system_template = """ # You are a helpful assistant that summarizes content or answers questions based on provided documents. Follow user instructions exactly. # When listing key points, use structured markdown with bold headings and bullet points, like: # **Category** # - Detail 1 # - Detail 2 # Use the following context to generate the response: {context} # """ # prompt_template = ChatPromptTemplate.from_messages([("system", system_template), ("human", "{question}")]) # qa_chain = ConversationalRetrievalChain.from_llm( # llm=ChatGoogleGenerativeAI(google_api_key=api_key, model="gemini-1.5-flash", temperature=0.3), # retriever=retriever, # memory=memory, # combine_docs_chain_kwargs={"prompt": prompt_template} # ) # except Exception as e: # st.error(f"Failed to initialize RAG pipeline: {str(e)}") # qa_chain = None # else: # qa_chain = None # # Helpers # def rename_file(uploaded_file, prefix="file"): # ext = os.path.splitext(uploaded_file.name)[1] # return f"{prefix}_{uploaded_file.name.replace(' ', '_')}" # def extract_text_from_file(file_path, ext): # text = "" # try: # if ext == ".pdf": # with open(file_path, "rb") as f: # reader = PyPDF2.PdfReader(f) # for page in reader.pages: # text += page.extract_text() or "" # elif ext == ".docx": # doc = Document(file_path) # for para in doc.paragraphs: # text += para.text + "\n" # elif ext == ".txt": # with open(file_path, "r", encoding="utf-8") as f: # text = f.read() # except Exception as e: # st.error(f"Text extraction failed: {str(e)}") # return text # def ocr_from_image(image_path): # try: # img = Image.open(image_path) # text = pytesseract.image_to_string(img) # if not text.strip(): # raise ValueError("No text extracted from image.") # return text # except Exception as e: # raise Exception(f"OCR failed: {str(e)}") # def handle_text_and_rag(text, instruction): # if not text.strip(): # raise ValueError("No content found.") # try: # vector_store.add_texts([text]) # response = qa_chain.run(instruction) # return response # except Exception as e: # st.error(f"RAG processing failed: {str(e)}") # return None # def clear_state(): # st.session_state.extracted_text = "" # st.session_state.chat_history = [] # st.session_state.summarized = False # st.session_state.file_uploaded = False # st.session_state.file_uploader_key += 1 # st.session_state.pop('summarize_clicked', None) # st.session_state.pop('chat_clicked', None) # st.session_state.pop('clear_clicked', None) # st.rerun() # # File Processing and Interaction # if uploaded_file and qa_chain: # file_ext = os.path.splitext(uploaded_file.name)[1] # renamed_file = rename_file(uploaded_file, "upload") # with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp: # tmp.write(uploaded_file.read()) # tmp_path = tmp.name # st.markdown(f"
๐Ÿ“„ File uploaded: {renamed_file}
", unsafe_allow_html=True) # # Extract text immediately for chat availability # try: # if file_ext in [".pdf", ".docx", ".txt"]: # st.session_state.extracted_text = extract_text_from_file(tmp_path, file_ext) # elif file_ext in [".png", ".jpg"]: # st.session_state.extracted_text = ocr_from_image(tmp_path) # except Exception as e: # st.error(f"Initial text extraction failed: {str(e)}") # # Handle Summarize # if st.session_state.get('summarize_clicked', False): # try: # if not st.session_state.extracted_text.strip(): # st.error("โŒ No readable text found.") # else: # with st.container(): # st.markdown("
", unsafe_allow_html=True) # st.markdown("

๐Ÿ“‘ Summary

", unsafe_allow_html=True) # summary = handle_text_and_rag(st.session_state.extracted_text, "Summarize this content in 100 words.") # if summary: # st.markdown(f"
{summary}
", unsafe_allow_html=True) # st.session_state.summarized = True # st.markdown("
", unsafe_allow_html=True) # except Exception as e: # st.error(f"Processing failed: {str(e)}") # finally: # if os.path.exists(tmp_path): # os.remove(tmp_path) # # Handle Clear # if st.session_state.get('clear_clicked', False): # clear_state() # # Chat interface # if st.session_state.get('chat_clicked', False) and st.session_state.extracted_text: # with st.container(): # st.markdown("
", unsafe_allow_html=True) # st.markdown("

๐Ÿ’ฌ Chat with Your Content

", unsafe_allow_html=True) # if not st.session_state.chat_history: # st.info("Ask questions like: 'Summarize in 50 words' or 'List main points'.") # # Display chat history (user question followed by assistant response) # for msg in st.session_state.chat_history: # with st.chat_message(msg["role"]): # st.markdown(f"
{msg['content']}
", unsafe_allow_html=True) # st.markdown("
", unsafe_allow_html=True) # # Place chat input at the bottom, outside the container # prompt = st.chat_input("Ask about the content...") # if prompt: # st.session_state.chat_history.append({"role": "user", "content": prompt}) # with st.container(): # st.markdown("
", unsafe_allow_html=True) # with st.chat_message("user"): # st.markdown(f"
{prompt}
", unsafe_allow_html=True) # try: # response = handle_text_and_rag(st.session_state.extracted_text, prompt) # if response: # st.session_state.chat_history.append({"role": "assistant", "content": response}) # with st.chat_message("assistant"): # st.markdown(f"
{response}
", unsafe_allow_html=True) # except Exception as e: # st.error(f"Chat failed: {str(e)}") # st.markdown("
", unsafe_allow_html=True) # else: # if uploaded_file and not qa_chain: # st.error("๐Ÿ”‘ Invalid or missing Gemini API Key.") # elif not uploaded_file and api_key: # st.info("Please upload a file to begin.") # elif not api_key: # st.warning("๐Ÿ”‘ Please enter your Gemini API Key to begimport os import os import tempfile import streamlit as st from langchain.memory import ConversationBufferMemory from langchain.chains import ConversationalRetrievalChain from langchain.prompts import ChatPromptTemplate from langchain.vectorstores import Chroma from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings from PIL import Image from docx import Document import PyPDF2 import pytesseract import speech_recognition as sr from pytube import YouTube import requests from bs4 import BeautifulSoup # Streamlit UI config st.set_page_config(page_title="๐Ÿ“„ Gemini RAG Summarizer", layout="wide") # Custom CSS with softened radium colors, container backgrounds, and summary text styling st.markdown(""" """, unsafe_allow_html=True) # State initialization for key in ['extracted_text', 'chat_history', 'summarized', 'file_uploader_key', 'file_uploaded']: if key not in st.session_state: st.session_state[key] = "" if key == "extracted_text" else [] if key == "chat_history" else False if key in ["summarized", "file_uploaded"] else 0 # API Key api_key = st.secrets.get("genai") or st.text_input("๐Ÿ” Enter Gemini API Key", type="password") # Main title with softened radium glow st.markdown("

๐Ÿ“„ Gemini-Powered RAG Summarizer

", unsafe_allow_html=True) # Sidebar with st.sidebar: st.markdown("

Upload Your File or Paste a URL

", unsafe_allow_html=True) st.markdown("

Upload a .pdf, .docx, .txt, .png, or .jpg file

", unsafe_allow_html=True) uploaded_file = st.file_uploader("", type=["pdf", "docx", "txt", "png", "jpg"], label_visibility="collapsed", accept_multiple_files=False, key=f"uploader_{st.session_state.file_uploader_key}") st.markdown("

Or paste a YouTube video URL or webpage URL

", unsafe_allow_html=True) url_input = st.text_input("Paste URL here", key="url_input") st.markdown("

โš ๏ธ Note: Summarizing YouTube videos may raise legal concerns. Please obtain the creator's permission before summarizing copyrighted content.

", unsafe_allow_html=True) # Sidebar buttons if uploaded_file or url_input: st.session_state.file_uploaded = True if st.session_state.file_uploaded: st.markdown("### Actions") st.button("๐Ÿ” Summarize", key="summarize_button", on_click=lambda: st.session_state.update({"summarize_clicked": True}), help="Summarize the uploaded file or URL content") st.button("๐Ÿ’ฌ Chat with Content", key="chat_button", on_click=lambda: st.session_state.update({"chat_clicked": True}), help="Chat about the file or URL content") st.button("๐Ÿงน Clear", key="clear_button", on_click=lambda: st.session_state.update({"clear_clicked": True}), help="Clear uploaded file/URL and reset", type="secondary") # Vector store setup if api_key: try: embeddings = GoogleGenerativeAIEmbeddings(google_api_key=api_key, model="models/embedding-001") persist_directory = "./chroma_db" vector_store = Chroma(persist_directory=persist_directory, embedding_function=embeddings) retriever = vector_store.as_retriever() memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True) system_template = """ You are a helpful assistant that summarizes content or answers questions based on provided documents or webpages. Follow user instructions exactly. When listing key points, use structured markdown with bold headings and bullet points, like: **Category** - Detail 1 - Detail 2 Use the following context to generate the response: {context} """ prompt_template = ChatPromptTemplate.from_messages([("system", system_template), ("human", "{question}")]) qa_chain = ConversationalRetrievalChain.from_llm( llm=ChatGoogleGenerativeAI(google_api_key=api_key, model="gemini-2.0-flash", temperature=0.3), retriever=retriever, memory=memory, combine_docs_chain_kwargs={"prompt": prompt_template} ) except Exception as e: st.error(f"Failed to initialize RAG pipeline: {str(e)}") qa_chain = None else: qa_chain = None # Helpers def rename_file(uploaded_file, prefix="file"): ext = os.path.splitext(uploaded_file.name)[1] return f"{prefix}_{uploaded_file.name.replace(' ', '_')}" def extract_text_from_file(file_path, ext): text = "" try: if ext == ".pdf": with open(file_path, "rb") as f: reader = PyPDF2.PdfReader(f) for page in reader.pages: text += page.extract_text() or "" elif ext == ".docx": doc = Document(file_path) for para in doc.paragraphs: text += para.text + "\n" elif ext == ".txt": with open(file_path, "r", encoding="utf-8") as f: text = f.read() except Exception as e: st.error(f"Text extraction failed: {str(e)}") return text def ocr_from_image(image_path): try: img = Image.open(image_path) text = pytesseract.image_to_string(img) if not text.strip(): raise ValueError("No text extracted from image.") return text except Exception as e: raise Exception(f"OCR failed: {str(e)}") def transcribe_audio(audio_path): try: recognizer = sr.Recognizer() with sr.AudioFile(audio_path) as source: audio = recognizer.record(source) text = recognizer.recognize_google(audio) os.remove(audio_path) # Clean up audio file return text except Exception as e: raise Exception(f"Audio transcription failed: {str(e)}") def download_youtube_video(url): try: yt = YouTube(url) # Debugging: Display video title and availability st.info(f"Attempting to access YouTube video: {yt.title}") stream = yt.streams.filter(only_audio=True).first() if not stream: raise ValueError("No audio stream available for this video. The video may be private, restricted, or deleted.") audio_path = stream.download(filename="temp_audio") return audio_path except Exception as e: if "404" in str(e).lower(): raise Exception("YouTube video access failed with a 404 error. The video may have been removed, set to private, or is unavailable in your region.") raise Exception(f"YouTube video download failed: {str(e)}") def extract_text_from_url(url): try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.text, "html.parser") for script in soup(["script", "style", "header", "footer", "nav"]): script.decompose() main_content = soup.find("article") or soup.find("div", class_="content") or soup text = " ".join(main_content.stripped_strings) if not text.strip(): raise ValueError("No text extracted from webpage.") return text[:5000] except Exception as e: raise Exception(f"Webpage text extraction failed: {str(e)}") def handle_text_and_rag(text, instruction): if not text.strip(): raise ValueError("No content found.") try: vector_store.add_texts([text]) response = qa_chain.run(instruction) return response except Exception as e: st.error(f"RAG processing failed: {str(e)}") return None def clear_state(): st.session_state.extracted_text = "" st.session_state.chat_history = [] st.session_state.summarized = False st.session_state.file_uploaded = False st.session_state.file_uploader_key += 1 st.session_state.pop('summarize_clicked', None) st.session_state.pop('chat_clicked', None) st.session_state.pop('clear_clicked', None) st.rerun() # File Processing and Interaction if (uploaded_file or url_input) and qa_chain: file_ext = None tmp_path = None source_name = "URL content" # Reset extracted text to avoid reusing old content st.session_state.extracted_text = "" # Handle uploaded file if uploaded_file: file_ext = os.path.splitext(uploaded_file.name)[1].lower() source_name = rename_file(uploaded_file, "upload") with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp: tmp.write(uploaded_file.read()) tmp_path = tmp.name # Handle URL input if url_input: if "youtube.com" in url_input or "youtu.be" in url_input: try: audio_path = download_youtube_video(url_input) tmp_path = audio_path file_ext = ".wav" source_name = "YouTube video" except Exception as e: st.error(str(e)) tmp_path = None else: try: st.session_state.extracted_text = extract_text_from_url(url_input) source_name = "Webpage content" except Exception as e: st.error(str(e)) st.session_state.extracted_text = "" # Display success message if uploaded_file or url_input: st.markdown(f"
๐Ÿ“„ {source_name} processed
", unsafe_allow_html=True) # Extract text based on file type if tmp_path and file_ext: try: if file_ext in [".pdf", ".docx", ".txt"]: st.session_state.extracted_text = extract_text_from_file(tmp_path, file_ext) elif file_ext in [".png", ".jpg"]: st.session_state.extracted_text = ocr_from_image(tmp_path) elif file_ext == ".wav": st.session_state.extracted_text = transcribe_audio(tmp_path) except Exception as e: st.error(f"Initial content extraction failed: {str(e)}") finally: if tmp_path and os.path.exists(tmp_path): os.remove(tmp_path) # Debugging: Display extracted text for verification if st.session_state.extracted_text: with st.expander("View Extracted Text (Debugging)"): st.text_area("Extracted Content", st.session_state.extracted_text, height=200) # Handle Summarize if st.session_state.get('summarize_clicked', False): try: if not st.session_state.extracted_text.strip(): st.error("โŒ No readable content found.") else: with st.container(): st.markdown("
", unsafe_allow_html=True) st.markdown("

๐Ÿ“‘ Summary

", unsafe_allow_html=True) summary = handle_text_and_rag(st.session_state.extracted_text, "Summarize this content in 100 words.") if summary: st.markdown(f"
{summary}
", unsafe_allow_html=True) st.session_state.summarized = True st.markdown("
", unsafe_allow_html=True) except Exception as e: st.error(f"Processing failed: {str(e)}") # Handle Clear if st.session_state.get('clear_clicked', False): clear_state() # Chat interface if st.session_state.get('chat_clicked', False) and st.session_state.extracted_text: with st.container(): st.markdown("
", unsafe_allow_html=True) st.markdown("

๐Ÿ’ฌ Chat with Your Content

", unsafe_allow_html=True) if not st.session_state.chat_history: st.info("Ask questions like: 'Summarize in 50 words' or 'List main points'.") # Display chat history for msg in st.session_state.chat_history: with st.chat_message(msg["role"]): st.markdown(f"
{msg['content']}
", unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) # Place chat input at the bottom prompt = st.chat_input("Ask about the content...") if prompt: st.session_state.chat_history.append({"role": "user", "content": prompt}) with st.container(): st.markdown("
", unsafe_allow_html=True) with st.chat_message("user"): st.markdown(f"
{prompt}
", unsafe_allow_html=True) try: response = handle_text_and_rag(st.session_state.extracted_text, prompt) if response: st.session_state.chat_history.append({"role": "assistant", "content": response}) with st.chat_message("assistant"): st.markdown(f"
{response}
", unsafe_allow_html=True) except Exception as e: st.error(f"Chat failed: {str(e)}") st.markdown("
", unsafe_allow_html=True) else: if (uploaded_file or url_input) and not qa_chain: st.error("๐Ÿ”‘ Invalid or missing Gemini API Key.") elif not uploaded_file and not url_input and api_key: st.info("Please upload a file or paste a URL to begin.") elif not api_key: st.warning("๐Ÿ”‘ Please enter your Gemini API Key to begin.")