# import os
# import tempfile
# import streamlit as st
# from langchain.memory import ConversationBufferMemory
# from langchain.chains import ConversationalRetrievalChain
# from langchain.prompts import ChatPromptTemplate
# from langchain.vectorstores import Chroma
# from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
# from PIL import Image
# from docx import Document
# import PyPDF2
# import pytesseract
# # Optional: Set Tesseract path
# # pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe' # Windows
# # Streamlit UI config
# st.set_page_config(page_title="๐ Gemini RAG Summarizer", layout="wide")
# # Custom CSS with softened radium colors, container backgrounds, and summary text styling
# st.markdown("""
#
# """, unsafe_allow_html=True)
# # State initialization
# for key in ['extracted_text', 'chat_history', 'summarized', 'file_uploader_key', 'file_uploaded']:
# if key not in st.session_state:
# st.session_state[key] = "" if key == "extracted_text" else [] if key == "chat_history" else False if key in ["summarized", "file_uploaded"] else 0
# # API Key
# api_key = st.secrets.get("genai") or st.text_input("๐ Enter Gemini API Key", type="password")
# # Main title with softened radium glow
# st.markdown("
๐ Gemini-Powered RAG Summarizer
", unsafe_allow_html=True)
# # Sidebar
# with st.sidebar:
# st.markdown("Upload Your File
", unsafe_allow_html=True)
# st.markdown("Upload a .pdf, .docx, .txt, .png, or .jpg file
", unsafe_allow_html=True)
# uploaded_file = st.file_uploader("", type=["pdf", "docx", "txt", "png", "jpg"], label_visibility="collapsed", accept_multiple_files=False, key=f"uploader_{st.session_state.file_uploader_key}")
# # Sidebar buttons
# if uploaded_file:
# st.session_state.file_uploaded = True
# if st.session_state.file_uploaded:
# st.markdown("### Actions")
# st.button("๐ Summarize", key="summarize_button", on_click=lambda: st.session_state.update({"summarize_clicked": True}), help="Summarize the uploaded file")
# st.button("๐ฌ Chat with Content", key="chat_button", on_click=lambda: st.session_state.update({"chat_clicked": True}), help="Chat about the file content")
# st.button("๐งน Clear", key="clear_button", on_click=lambda: st.session_state.update({"clear_clicked": True}), help="Clear uploaded file and reset", type="secondary")
# # Vector store setup
# if api_key:
# try:
# embeddings = GoogleGenerativeAIEmbeddings(google_api_key=api_key, model="models/embedding-001")
# persist_directory = "./chroma_db"
# vector_store = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
# retriever = vector_store.as_retriever()
# memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
# system_template = """
# You are a helpful assistant that summarizes content or answers questions based on provided documents. Follow user instructions exactly.
# When listing key points, use structured markdown with bold headings and bullet points, like:
# **Category**
# - Detail 1
# - Detail 2
# Use the following context to generate the response: {context}
# """
# prompt_template = ChatPromptTemplate.from_messages([("system", system_template), ("human", "{question}")])
# qa_chain = ConversationalRetrievalChain.from_llm(
# llm=ChatGoogleGenerativeAI(google_api_key=api_key, model="gemini-1.5-flash", temperature=0.3),
# retriever=retriever,
# memory=memory,
# combine_docs_chain_kwargs={"prompt": prompt_template}
# )
# except Exception as e:
# st.error(f"Failed to initialize RAG pipeline: {str(e)}")
# qa_chain = None
# else:
# qa_chain = None
# # Helpers
# def rename_file(uploaded_file, prefix="file"):
# ext = os.path.splitext(uploaded_file.name)[1]
# return f"{prefix}_{uploaded_file.name.replace(' ', '_')}"
# def extract_text_from_file(file_path, ext):
# text = ""
# try:
# if ext == ".pdf":
# with open(file_path, "rb") as f:
# reader = PyPDF2.PdfReader(f)
# for page in reader.pages:
# text += page.extract_text() or ""
# elif ext == ".docx":
# doc = Document(file_path)
# for para in doc.paragraphs:
# text += para.text + "\n"
# elif ext == ".txt":
# with open(file_path, "r", encoding="utf-8") as f:
# text = f.read()
# except Exception as e:
# st.error(f"Text extraction failed: {str(e)}")
# return text
# def ocr_from_image(image_path):
# try:
# img = Image.open(image_path)
# text = pytesseract.image_to_string(img)
# if not text.strip():
# raise ValueError("No text extracted from image.")
# return text
# except Exception as e:
# raise Exception(f"OCR failed: {str(e)}")
# def handle_text_and_rag(text, instruction):
# if not text.strip():
# raise ValueError("No content found.")
# try:
# vector_store.add_texts([text])
# response = qa_chain.run(instruction)
# return response
# except Exception as e:
# st.error(f"RAG processing failed: {str(e)}")
# return None
# def clear_state():
# st.session_state.extracted_text = ""
# st.session_state.chat_history = []
# st.session_state.summarized = False
# st.session_state.file_uploaded = False
# st.session_state.file_uploader_key += 1
# st.session_state.pop('summarize_clicked', None)
# st.session_state.pop('chat_clicked', None)
# st.session_state.pop('clear_clicked', None)
# st.rerun()
# # File Processing and Interaction
# if uploaded_file and qa_chain:
# file_ext = os.path.splitext(uploaded_file.name)[1]
# renamed_file = rename_file(uploaded_file, "upload")
# with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp:
# tmp.write(uploaded_file.read())
# tmp_path = tmp.name
# st.markdown(f"๐ File uploaded: {renamed_file}
", unsafe_allow_html=True)
# # Extract text immediately for chat availability
# try:
# if file_ext in [".pdf", ".docx", ".txt"]:
# st.session_state.extracted_text = extract_text_from_file(tmp_path, file_ext)
# elif file_ext in [".png", ".jpg"]:
# st.session_state.extracted_text = ocr_from_image(tmp_path)
# except Exception as e:
# st.error(f"Initial text extraction failed: {str(e)}")
# # Handle Summarize
# if st.session_state.get('summarize_clicked', False):
# try:
# if not st.session_state.extracted_text.strip():
# st.error("โ No readable text found.")
# else:
# with st.container():
# st.markdown("", unsafe_allow_html=True)
# st.markdown("
๐ Summary
", unsafe_allow_html=True)
# summary = handle_text_and_rag(st.session_state.extracted_text, "Summarize this content in 100 words.")
# if summary:
# st.markdown(f"
{summary}
", unsafe_allow_html=True)
# st.session_state.summarized = True
# st.markdown("
", unsafe_allow_html=True)
# except Exception as e:
# st.error(f"Processing failed: {str(e)}")
# finally:
# if os.path.exists(tmp_path):
# os.remove(tmp_path)
# # Handle Clear
# if st.session_state.get('clear_clicked', False):
# clear_state()
# # Chat interface
# if st.session_state.get('chat_clicked', False) and st.session_state.extracted_text:
# with st.container():
# st.markdown("", unsafe_allow_html=True)
# st.markdown("
๐ฌ Chat with Your Content
", unsafe_allow_html=True)
# if not st.session_state.chat_history:
# st.info("Ask questions like: 'Summarize in 50 words' or 'List main points'.")
# # Display chat history (user question followed by assistant response)
# for msg in st.session_state.chat_history:
# with st.chat_message(msg["role"]):
# st.markdown(f"
{msg['content']}
", unsafe_allow_html=True)
# st.markdown("
", unsafe_allow_html=True)
# # Place chat input at the bottom, outside the container
# prompt = st.chat_input("Ask about the content...")
# if prompt:
# st.session_state.chat_history.append({"role": "user", "content": prompt})
# with st.container():
# st.markdown("", unsafe_allow_html=True)
# with st.chat_message("user"):
# st.markdown(f"
{prompt}
", unsafe_allow_html=True)
# try:
# response = handle_text_and_rag(st.session_state.extracted_text, prompt)
# if response:
# st.session_state.chat_history.append({"role": "assistant", "content": response})
# with st.chat_message("assistant"):
# st.markdown(f"
{response}
", unsafe_allow_html=True)
# except Exception as e:
# st.error(f"Chat failed: {str(e)}")
# st.markdown("
", unsafe_allow_html=True)
# else:
# if uploaded_file and not qa_chain:
# st.error("๐ Invalid or missing Gemini API Key.")
# elif not uploaded_file and api_key:
# st.info("Please upload a file to begin.")
# elif not api_key:
# st.warning("๐ Please enter your Gemini API Key to begin.")
import os
import tempfile
import streamlit as st
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain.prompts import ChatPromptTemplate
from langchain.vectorstores import Chroma
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from PIL import Image
from docx import Document
import PyPDF2
import pytesseract
import speech_recognition as sr
from pytube import YouTube
import requests
from bs4 import BeautifulSoup
# Streamlit UI config
st.set_page_config(page_title="๐ Gemini RAG Summarizer", layout="wide")

# Custom CSS injection point.
# NOTE(review): the original <style> payload appears to have been stripped when
# this file was pasted — the markdown body is empty. Restore the CSS here.
st.markdown("""
""", unsafe_allow_html=True)

# State initialization: text buffer, chat log, two booleans, and an int key
# used to force-recreate the file_uploader widget on "Clear".
for key in ['extracted_text', 'chat_history', 'summarized', 'file_uploader_key', 'file_uploaded']:
    if key not in st.session_state:
        st.session_state[key] = "" if key == "extracted_text" else [] if key == "chat_history" else False if key in ["summarized", "file_uploaded"] else 0

# API key: prefer st.secrets["genai"], fall back to a password input.
api_key = st.secrets.get("genai") or st.text_input("๐ Enter Gemini API Key", type="password")

# Main title. NOTE(review): the original HTML wrapper markup was lost in the
# paste; only the visible text survived, so it is rendered as plain markdown.
st.markdown("๐ Gemini-Powered RAG Summarizer", unsafe_allow_html=True)

# Sidebar: file upload, URL input, and action buttons.
with st.sidebar:
    st.markdown("Upload Your File or Paste a URL", unsafe_allow_html=True)
    st.markdown("Upload a .pdf, .docx, .txt, .png, or .jpg file", unsafe_allow_html=True)
    uploaded_file = st.file_uploader("", type=["pdf", "docx", "txt", "png", "jpg"], label_visibility="collapsed", accept_multiple_files=False, key=f"uploader_{st.session_state.file_uploader_key}")
    st.markdown("Or paste a YouTube video URL or webpage URL", unsafe_allow_html=True)
    url_input = st.text_input("Paste URL here", key="url_input")
    st.markdown("โ ๏ธ Note: Summarizing YouTube videos may raise legal concerns. Please obtain the creator's permission before summarizing copyrighted content.", unsafe_allow_html=True)
    # Sidebar buttons — shown once any source (file or URL) has been provided.
    # Each button sets a *_clicked flag via on_click; the main flow reads them.
    if uploaded_file or url_input:
        st.session_state.file_uploaded = True
    if st.session_state.file_uploaded:
        st.markdown("### Actions")
        st.button("๐ Summarize", key="summarize_button", on_click=lambda: st.session_state.update({"summarize_clicked": True}), help="Summarize the uploaded file or URL content")
        st.button("๐ฌ Chat with Content", key="chat_button", on_click=lambda: st.session_state.update({"chat_clicked": True}), help="Chat about the file or URL content")
        st.button("๐งน Clear", key="clear_button", on_click=lambda: st.session_state.update({"clear_clicked": True}), help="Clear uploaded file/URL and reset", type="secondary")
# Vector store setup — build embeddings, retriever, memory, and the
# conversational RAG chain once per script run when an API key is present.
# On any failure (bad key, missing deps), qa_chain is left as None and the
# main flow shows an error instead of processing.
if api_key:
    try:
        embeddings = GoogleGenerativeAIEmbeddings(google_api_key=api_key, model="models/embedding-001")
        persist_directory = "./chroma_db"  # Chroma persists vectors here across runs
        vector_store = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
        retriever = vector_store.as_retriever()
        memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        system_template = """
You are a helpful assistant that summarizes content or answers questions based on provided documents or webpages. Follow user instructions exactly.
When listing key points, use structured markdown with bold headings and bullet points, like:
**Category**
- Detail 1
- Detail 2
Use the following context to generate the response: {context}
"""
        prompt_template = ChatPromptTemplate.from_messages([("system", system_template), ("human", "{question}")])
        qa_chain = ConversationalRetrievalChain.from_llm(
            llm=ChatGoogleGenerativeAI(google_api_key=api_key, model="gemini-2.0-flash", temperature=0.3),
            retriever=retriever,
            memory=memory,
            combine_docs_chain_kwargs={"prompt": prompt_template}
        )
    except Exception as e:
        st.error(f"Failed to initialize RAG pipeline: {str(e)}")
        qa_chain = None
else:
    qa_chain = None
# Helpers
def rename_file(uploaded_file, prefix="file"):
    """Build a display name for an uploaded file.

    Prefixes the original file name with ``prefix`` and replaces spaces
    with underscores, e.g. ``"my report.pdf"`` -> ``"upload_my_report.pdf"``.
    The extension is preserved as part of the name.

    (The original computed the extension into an unused local; removed.)
    """
    return f"{prefix}_{uploaded_file.name.replace(' ', '_')}"
def extract_text_from_file(file_path, ext):
    """Extract plain text from a .pdf, .docx, or .txt file.

    Unknown extensions return "".  Any extraction failure is reported
    via st.error and "" (or whatever was gathered so far) is returned
    rather than raising.
    """
    text = ""
    try:
        if ext == ".pdf":
            with open(file_path, "rb") as fh:
                reader = PyPDF2.PdfReader(fh)
                # extract_text() can return None for image-only pages
                text = "".join(page.extract_text() or "" for page in reader.pages)
        elif ext == ".docx":
            document = Document(file_path)
            text = "".join(paragraph.text + "\n" for paragraph in document.paragraphs)
        elif ext == ".txt":
            with open(file_path, "r", encoding="utf-8") as fh:
                text = fh.read()
    except Exception as e:
        st.error(f"Text extraction failed: {str(e)}")
    return text
def ocr_from_image(image_path):
    """Run Tesseract OCR on an image file and return the recognized text.

    Raises Exception (wrapping the underlying error, or a ValueError when
    the OCR output is blank) so the caller can surface the failure.
    """
    try:
        recognized = pytesseract.image_to_string(Image.open(image_path))
        if not recognized.strip():
            raise ValueError("No text extracted from image.")
        return recognized
    except Exception as e:
        raise Exception(f"OCR failed: {str(e)}")
def transcribe_audio(audio_path):
    """Transcribe an audio file via Google's free speech recognition API.

    The audio file at ``audio_path`` is always deleted afterwards —
    previously it was only removed on success, leaking the downloaded
    temp audio whenever transcription failed.

    Raises Exception wrapping any recognition/IO failure.
    """
    try:
        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_path) as source:
            audio = recognizer.record(source)
        return recognizer.recognize_google(audio)
    except Exception as e:
        raise Exception(f"Audio transcription failed: {str(e)}")
    finally:
        # Clean up the audio file on success AND failure.
        if os.path.exists(audio_path):
            os.remove(audio_path)
def download_youtube_video(url):
    """Download the audio-only stream of a YouTube video.

    Returns the local path of the downloaded file ("temp_audio").
    Raises Exception with a user-friendly message on 404s or when no
    audio stream exists (private/restricted/deleted videos).
    """
    try:
        yt = YouTube(url)
        # Debugging aid: surface the video title so the user knows access worked.
        st.info(f"Attempting to access YouTube video: {yt.title}")
        audio_stream = yt.streams.filter(only_audio=True).first()
        if not audio_stream:
            raise ValueError("No audio stream available for this video. The video may be private, restricted, or deleted.")
        return audio_stream.download(filename="temp_audio")
    except Exception as e:
        if "404" in str(e).lower():
            raise Exception("YouTube video access failed with a 404 error. The video may have been removed, set to private, or is unavailable in your region.")
        raise Exception(f"YouTube video download failed: {str(e)}")
def extract_text_from_url(url):
    """Fetch a webpage and return up to 5000 chars of its visible text.

    Strips script/style/nav chrome, prefers an <article> (or a
    div.content) over the whole page, and raises Exception on network
    errors or when no text can be found.
    """
    try:
        ua_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        resp = requests.get(url, headers=ua_headers, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        # Drop non-content elements before harvesting text.
        for chrome in soup(["script", "style", "header", "footer", "nav"]):
            chrome.decompose()
        main_content = soup.find("article") or soup.find("div", class_="content") or soup
        page_text = " ".join(main_content.stripped_strings)
        if not page_text.strip():
            raise ValueError("No text extracted from webpage.")
        # Cap the context size fed into the RAG pipeline.
        return page_text[:5000]
    except Exception as e:
        raise Exception(f"Webpage text extraction failed: {str(e)}")
def handle_text_and_rag(text, instruction):
    """Index ``text`` into the module-level vector store and run the RAG chain.

    Raises ValueError for blank input; RAG failures are reported via
    st.error and None is returned instead of raising.
    """
    if not text.strip():
        raise ValueError("No content found.")
    try:
        # NOTE(review): this re-adds the full text on every call, so the
        # persistent Chroma store accumulates duplicates across reruns.
        vector_store.add_texts([text])
        return qa_chain.run(instruction)
    except Exception as e:
        st.error(f"RAG processing failed: {str(e)}")
        return None
def clear_state():
    """Reset all app session state and force a fresh rerun."""
    st.session_state.extracted_text = ""
    st.session_state.chat_history = []
    st.session_state.summarized = False
    st.session_state.file_uploaded = False
    # Bumping the key recreates the file_uploader widget, clearing its file.
    st.session_state.file_uploader_key += 1
    for flag in ('summarize_clicked', 'chat_clicked', 'clear_clicked'):
        st.session_state.pop(flag, None)
    st.rerun()
# File Processing and Interaction.
# NOTE(review): HTML wrapper markup in the st.markdown calls was lost when this
# file was pasted (strings were split mid-literal); the visible text has been
# rejoined onto single lines and the empty st.markdown("") calls — which were
# presumably opening/closing <div> tags — are kept as placeholders.
if (uploaded_file or url_input) and qa_chain:
    file_ext = None
    tmp_path = None
    source_name = "URL content"
    # Reset extracted text to avoid reusing old content from a previous source.
    st.session_state.extracted_text = ""

    # Handle uploaded file: spool it to a temp file for the extractors.
    if uploaded_file:
        file_ext = os.path.splitext(uploaded_file.name)[1].lower()
        source_name = rename_file(uploaded_file, "upload")
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp:
            tmp.write(uploaded_file.read())
            tmp_path = tmp.name

    # Handle URL input: YouTube links go through audio download+transcription,
    # anything else is scraped as a webpage.
    if url_input:
        if "youtube.com" in url_input or "youtu.be" in url_input:
            try:
                audio_path = download_youtube_video(url_input)
                tmp_path = audio_path
                file_ext = ".wav"
                source_name = "YouTube video"
            except Exception as e:
                st.error(str(e))
                tmp_path = None
        else:
            try:
                st.session_state.extracted_text = extract_text_from_url(url_input)
                source_name = "Webpage content"
            except Exception as e:
                st.error(str(e))
                st.session_state.extracted_text = ""

    # Display success message.
    if uploaded_file or url_input:
        st.markdown(f"๐ {source_name} processed", unsafe_allow_html=True)

    # Extract text based on file type; the temp file is always cleaned up.
    if tmp_path and file_ext:
        try:
            if file_ext in [".pdf", ".docx", ".txt"]:
                st.session_state.extracted_text = extract_text_from_file(tmp_path, file_ext)
            elif file_ext in [".png", ".jpg"]:
                st.session_state.extracted_text = ocr_from_image(tmp_path)
            elif file_ext == ".wav":
                st.session_state.extracted_text = transcribe_audio(tmp_path)
        except Exception as e:
            st.error(f"Initial content extraction failed: {str(e)}")
        finally:
            if tmp_path and os.path.exists(tmp_path):
                os.remove(tmp_path)

    # Debugging: let the user verify what was extracted.
    if st.session_state.extracted_text:
        with st.expander("View Extracted Text (Debugging)"):
            st.text_area("Extracted Content", st.session_state.extracted_text, height=200)

    # Handle Summarize (flag set by the sidebar button's on_click).
    if st.session_state.get('summarize_clicked', False):
        try:
            if not st.session_state.extracted_text.strip():
                st.error("โ No readable content found.")
            else:
                with st.container():
                    st.markdown("", unsafe_allow_html=True)
                    st.markdown("๐ Summary", unsafe_allow_html=True)
                    summary = handle_text_and_rag(st.session_state.extracted_text, "Summarize this content in 100 words.")
                    if summary:
                        st.markdown(f"{summary}", unsafe_allow_html=True)
                        st.session_state.summarized = True
                    st.markdown("", unsafe_allow_html=True)
        except Exception as e:
            st.error(f"Processing failed: {str(e)}")

    # Handle Clear (resets state and reruns).
    if st.session_state.get('clear_clicked', False):
        clear_state()

    # Chat interface.
    if st.session_state.get('chat_clicked', False) and st.session_state.extracted_text:
        with st.container():
            st.markdown("", unsafe_allow_html=True)
            st.markdown("๐ฌ Chat with Your Content", unsafe_allow_html=True)
            if not st.session_state.chat_history:
                st.info("Ask questions like: 'Summarize in 50 words' or 'List main points'.")
            # Display chat history (user question followed by assistant response).
            for msg in st.session_state.chat_history:
                with st.chat_message(msg["role"]):
                    st.markdown(f"{msg['content']}", unsafe_allow_html=True)
            st.markdown("", unsafe_allow_html=True)
        # Place chat input at the bottom, outside the container.
        prompt = st.chat_input("Ask about the content...")
        if prompt:
            st.session_state.chat_history.append({"role": "user", "content": prompt})
            with st.container():
                st.markdown("", unsafe_allow_html=True)
                with st.chat_message("user"):
                    st.markdown(f"{prompt}", unsafe_allow_html=True)
                try:
                    response = handle_text_and_rag(st.session_state.extracted_text, prompt)
                    if response:
                        st.session_state.chat_history.append({"role": "assistant", "content": response})
                        with st.chat_message("assistant"):
                            st.markdown(f"{response}", unsafe_allow_html=True)
                except Exception as e:
                    st.error(f"Chat failed: {str(e)}")
                st.markdown("", unsafe_allow_html=True)
else:
    # No processing possible: explain which prerequisite is missing.
    if (uploaded_file or url_input) and not qa_chain:
        st.error("๐ Invalid or missing Gemini API Key.")
    elif not uploaded_file and not url_input and api_key:
        st.info("Please upload a file or paste a URL to begin.")
    elif not api_key:
        st.warning("๐ Please enter your Gemini API Key to begin.")