# Streamlit app: summarize or answer questions about content loaded from
# URLs, uploaded files, or pasted text, via Groq-hosted LLMs and LangChain.
| import os | |
| import tempfile | |
| import validators | |
| import streamlit as st | |
| from typing import List, Dict, Any | |
| from langchain.prompts import PromptTemplate | |
| from langchain_groq import ChatGroq | |
| from langchain.chains.summarize import load_summarize_chain | |
| from langchain_community.document_loaders import YoutubeLoader, UnstructuredURLLoader, PyPDFLoader, TextLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain.schema import Document | |
| from langchain.vectorstores import FAISS | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| from langchain.chains import RetrievalQA | |
| from dotenv import load_dotenv | |
class ContentProcessor:
    """Streamlit application that loads content from URLs, uploaded files,
    or pasted text and either summarizes/transforms it ("Process Content")
    or answers questions about it ("Interactive Q&A") using a Groq LLM
    through LangChain.
    """

    # Number of actions a user may perform before supplying their own key.
    FREE_ACTION_LIMIT = 3

    def __init__(self):
        self.configure_streamlit()
        # SECURITY: a hard-coded API key committed to source control should be
        # rotated and loaded from the environment instead (python-dotenv is
        # already imported at module level). Left in place to preserve the
        # app's current behavior — flagged for removal.
        self.default_api_key = "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"
        self.initialize_session_state()

    def configure_streamlit(self):
        """Set the Streamlit page config and render the page title."""
        st.set_page_config(page_title="LangChain: Process Content from Multiple Sources", page_icon="🦜")
        st.title("🦜 LangChain: Process Content from Multiple Sources")

    def initialize_session_state(self):
        """Create the session-state slots this app relies on, once per session."""
        defaults = {"action_count": 0, "docs": None, "retriever": None}
        for key, value in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = value

    def calculate_chunk_size(self, text_length: int, model_context_length: int) -> int:
        """Return a splitter chunk size: roughly a third of the model context,
        clamped to [1000, context // 2].

        Note: ``text_length`` is currently unused; the parameter is kept so the
        existing call sites (and future length-aware sizing) stay compatible.
        """
        target_chunk_size = model_context_length // 3
        return max(1000, min(target_chunk_size, model_context_length // 2))

    def get_configuration(self) -> Dict[str, Any]:
        """Render the sidebar (API key, model, task) and return the choices."""
        with st.sidebar:
            st.header("Configuration")
            if st.session_state.action_count >= self.FREE_ACTION_LIMIT:
                # Free quota spent: the user must provide their own key.
                groq_api_key = st.text_input("Groq API Key", type="password")
            else:
                groq_api_key = self.default_api_key
                st.info(f"Using default API key. {self.FREE_ACTION_LIMIT - st.session_state.action_count} free actions remaining.")
            model = st.selectbox("Select Model", ["llama3-8b-8192", "gemma2-9b-it", "mixtral-8x7b-32768"])
            st.header("Task")
            task = st.radio("Choose task", ["Process Content", "Interactive Q&A"], index=0)
        return {"groq_api_key": groq_api_key, "model": model, "task": task}

    def get_sources(self) -> Dict[str, Any]:
        """Render the source selectors and return the user-provided inputs.

        Keys present only for the checked source types: 'urls' (newline-separated
        string), 'files' (list of Streamlit UploadedFile), 'text' (string).
        """
        st.subheader('Select Sources to Process')
        use_urls = st.checkbox("URLs (YouTube or websites)")
        use_files = st.checkbox("File Upload (PDF or text files)")
        use_text = st.checkbox("Text Input")
        sources: Dict[str, Any] = {}
        if use_urls:
            sources['urls'] = st.text_area("Enter URLs (one per line)", placeholder="https://example.com\nhttps://youtube.com/watch?v=...")
        if use_files:
            sources['files'] = st.file_uploader("Upload PDF or text files", type=["pdf", "txt"], accept_multiple_files=True)
        if use_text:
            sources['text'] = st.text_area("Enter text content", placeholder="Paste your text here...")
        return sources

    def process_pdf(self, uploaded_file) -> List[Document]:
        """Write an uploaded PDF to a temp file, load its pages, and clean up.

        The temp file is removed even if PyPDF fails to parse it (the original
        code leaked the file on error).
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
            temp_file.write(uploaded_file.getvalue())
            temp_file_path = temp_file.name
        try:
            loader = PyPDFLoader(temp_file_path)
            pdf_pages = loader.load()
        finally:
            os.unlink(temp_file_path)
        st.sidebar.write(f"Processing PDF: {uploaded_file.name}")
        st.sidebar.write(f"Total pages: {len(pdf_pages)}")
        return pdf_pages

    def process_content(self, sources: Dict[str, Any]) -> List[Document]:
        """Load documents from every selected source and return them combined."""
        all_docs: List[Document] = []
        if sources.get('urls'):
            all_docs.extend(self._process_urls(sources['urls']))
        if sources.get('files'):
            all_docs.extend(self._process_files(sources['files']))
        if sources.get('text'):
            all_docs.extend(self._process_text(sources['text']))
        return all_docs

    def _process_urls(self, raw_urls: str) -> List[Document]:
        """Load docs from newline-separated URLs; YouTube links use a dedicated loader."""
        docs: List[Document] = []
        url_list = [url.strip() for url in raw_urls.split('\n') if url.strip()]
        for url in url_list:
            if not validators.url(url):
                st.warning(f"Skipping invalid URL: {url}")
                continue
            if "youtube.com" in url or "youtu.be" in url:
                loader = YoutubeLoader.from_youtube_url(url, add_video_info=True)
                st.info(f"Processing YouTube video: {url}")
            else:
                loader = UnstructuredURLLoader(
                    urls=[url],
                    # NOTE(review): ssl_verify=False disables TLS verification —
                    # confirm this is intentional for the target sites.
                    ssl_verify=False,
                    headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_5_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"}
                )
                st.info(f"Processing website content: {url}")
            docs.extend(loader.load())
        return docs

    def _process_files(self, uploaded_files) -> List[Document]:
        """Load docs from uploaded files: PDFs via PyPDF, anything else as text."""
        docs: List[Document] = []
        for uploaded_file in uploaded_files:
            if uploaded_file.type == "application/pdf":
                st.info(f"Processing PDF: {uploaded_file.name}")
                docs.extend(self.process_pdf(uploaded_file))
            else:
                st.info(f"Processing text file: {uploaded_file.name}")
                docs.extend(self._load_text_via_tempfile(uploaded_file.getvalue(), binary=True))
        return docs

    def _process_text(self, text: str) -> List[Document]:
        """Load docs from pasted text input."""
        st.info("Processing text input")
        return self._load_text_via_tempfile(text, binary=False)

    def _load_text_via_tempfile(self, content, binary: bool) -> List[Document]:
        """Round-trip content through a temp .txt file so TextLoader can read it.

        The temp file is removed even if loading fails (the original code
        leaked the file on error).
        """
        if binary:
            temp = tempfile.NamedTemporaryFile(delete=False, suffix=".txt")
        else:
            temp = tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".txt", encoding="utf-8")
        with temp as temp_file:
            temp_file.write(content)
            temp_file_path = temp_file.name
        try:
            return TextLoader(temp_file_path).load()
        finally:
            os.unlink(temp_file_path)

    def create_prompts(self) -> Dict[str, PromptTemplate]:
        """Build the question/refine prompt pair for the refine summarize chain."""
        prompt_template = """
            Provide a {action} of the following content:
            Content: {text}
            {action}:
            """
        refine_template = """
            We have provided an existing {action} of the content: {existing_answer}
            We have some additional content to incorporate: {text}
            Given this new information, please refine and update the existing {action}.
            Refined {action}:
            """
        return {
            "prompt": PromptTemplate(input_variables=['text', 'action'], template=prompt_template),
            "refine_prompt": PromptTemplate(input_variables=['text', 'action', 'existing_answer'], template=refine_template)
        }

    def process_documents(self, docs: List[Document], action: str, config: Dict[str, Any]) -> str:
        """Split ``docs`` to fit the model context and run a refine-style
        summarize chain applying ``action`` over them; return the result text.

        FIX: this method no longer bumps ``action_count`` — the caller (run)
        already increments once per button press, so incrementing here too
        double-charged every "Process Content" action against the free quota.
        """
        llm = ChatGroq(model=config['model'], groq_api_key=config['groq_api_key'])
        total_chars = sum(len(doc.page_content) for doc in docs)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.calculate_chunk_size(total_chars, 8192),
            chunk_overlap=200
        )
        split_docs = text_splitter.split_documents(docs)
        prompts = self.create_prompts()
        chain = load_summarize_chain(
            llm=llm,
            chain_type="refine",
            question_prompt=prompts["prompt"],
            refine_prompt=prompts["refine_prompt"]
        )
        return chain.run(input_documents=split_docs, action=action.lower())

    def create_retriever(self, docs: List[Document]) -> FAISS:
        """Embed docs with a local sentence-transformer and index them in FAISS."""
        embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        return FAISS.from_documents(docs, embeddings)

    def answer_question(self, retriever: FAISS, question: str, config: Dict[str, Any]) -> str:
        """Answer ``question`` with a stuff-type RetrievalQA chain over the index."""
        llm = ChatGroq(model=config['model'], groq_api_key=config['groq_api_key'])
        qa_chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever.as_retriever())
        return qa_chain.run(question)

    @property
    def predefined_actions(self) -> List[str]:
        """Canned actions offered in the "Predefined" selectbox.

        FIX: declared as a property because ``run`` reads
        ``self.predefined_actions`` without calling it; as a plain method the
        selectbox received a bound-method object instead of the options list.
        """
        return [
            "Summarize", "Analyze", "Review", "Critique", "Explain",
            "Paraphrase", "Simplify", "Elaborate", "Extract key points",
            "Provide an overview", "Highlight main ideas", "Create an outline",
            "Generate a report", "Identify themes", "List pros and cons",
            "Fact-check", "Create study notes", "Generate questions"
        ]

    def run(self):
        """Top-level UI flow: gather config and sources, run the chosen task."""
        config = self.get_configuration()
        sources = self.get_sources()
        action_type = None
        if config['task'] == "Process Content":
            action_type = st.radio("Choose action type", ["Predefined", "Custom"])
            if action_type == "Predefined":
                action = st.selectbox("Select Action", self.predefined_actions)
            else:
                action = st.text_input("Enter Custom Action", placeholder="e.g., Summarize in bullet points")
        else:
            action = "Answer questions about the content"
        if st.button("Process Content"):
            # FIX: only block the user when the free quota is spent AND no key
            # was supplied. Previously this branch fired unconditionally after
            # 3 actions, locking out users who HAD entered their own API key.
            if st.session_state.action_count >= self.FREE_ACTION_LIMIT and not config['groq_api_key'].strip():
                st.error("You have used all free actions. Please provide your Groq API Key in the sidebar.")
            elif not config['groq_api_key'].strip():
                st.error("Please provide your Groq API Key in the sidebar.")
            elif not sources:
                st.error("Please select at least one source type and provide content.")
            elif config['task'] == "Process Content" and action_type == "Custom" and not action.strip():
                st.error("Please enter a custom action.")
            else:
                with st.spinner("Processing..."):
                    st.session_state.docs = self.process_content(sources)
                    if not st.session_state.docs:
                        st.error("No content was processed. Please check your inputs and try again.")
                    else:
                        if config['task'] == "Process Content":
                            output = self.process_documents(st.session_state.docs, action, config)
                            st.success("Processing complete!")
                            st.subheader(f"{action} Result")
                            st.write(output)
                        else:  # Interactive Q&A
                            st.session_state.retriever = self.create_retriever(st.session_state.docs)
                            st.success("Document processed and ready for questions!")
                        # Charge the quota only when something was actually processed.
                        st.session_state.action_count += 1
        if config['task'] == "Interactive Q&A" and st.session_state.retriever is not None:
            question = st.text_input("Ask a question about the document:")
            if question:
                with st.spinner("Finding answer..."):
                    answer = self.answer_question(st.session_state.retriever, question, config)
                st.subheader("Answer")
                st.write(answer)
        st.divider()
        st.caption("Powered by LangChain and Groq")
        st.caption("Created by : Akshay Kumar BM")
# Script entry point: build the app object and hand control to its UI loop.
if __name__ == "__main__":
    ContentProcessor().run()