Spaces:
Running
Running
| import os | |
| import re | |
| import time | |
| import gdown | |
| import gradio as gr | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_groq import ChatGroq | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.runnables import RunnablePassthrough | |
| from langchain_core.output_parsers import StrOutputParser | |
# ==========================================
# 1. GROQ API KEY CONFIGURATION (For HF Spaces Only)
# ==========================================
# The key is read from the process environment. The error message below points
# the operator at the Space's "Secrets" settings, where it is expected to be set.
print("π Checking GROQ API Key...")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

if not GROQ_API_KEY:
    # Fail fast at startup: log a short warning, then abort with instructions.
    print("β οΈ No GROQ_API_KEY found in environment variables")
    raise ValueError("""
β GROQ_API_KEY is missing!
Please go to:
Space Settings β Secrets
and add:
Key : GROQ_API_KEY
Value : your_actual_groq_api_key_here
""")

# Re-export the value that was just read; effectively a no-op kept so the
# variable is explicitly present for anything that looks it up later.
os.environ["GROQ_API_KEY"] = GROQ_API_KEY
print("β GROQ API Key loaded successfully!")
# ==========================================
# 2. CONFIGURATION
# ==========================================
# Google Drive share links of the PDFs to download and index.
# Each literal must be a clean URL with no surrounding whitespace.
links_to_process = [
    "https://drive.google.com/file/d/1ZROphV1o6IoD5T1x9h8u9FfalKgZSHW9/view?usp=sharing",
    "https://drive.google.com/file/d/1r53tlaW8kvEh0S9yV17UNb3fR89PkMS7/view?usp=sharing",
    "https://drive.google.com/file/d/14UUOK6CZUmcoJt_zaj3Gt7lI99LOAqIg/view?usp=sharing",
    "https://drive.google.com/file/d/1mNtQY8-iq6LkjkFNfjVf_xATHYr8XW-i/view?usp=sharing",
    "https://drive.google.com/file/d/1jTD7t8-HMwE7qOYQ5r12vFQJCiE7AO2-/view?usp=sharing",
    "https://drive.google.com/file/d/1pQRXNlictDTa7yJ03j_JmuDr7AjcugWA/view?usp=sharing",
]

# Local directory the downloaded PDFs are written to; created if absent.
output_dir = 'knowledge_base'
os.makedirs(output_dir, exist_ok=True)
| # ========================================== | |
| # 3. HELPER: EXTRACT GOOGLE DRIVE FILE ID | |
| # ========================================== | |
def extract_file_id(url):
    """Return the Google Drive file ID embedded in *url*, or None.

    Two link shapes are recognised, tried in this order: a path segment of
    the form ``/d/<id>`` and a parameter of the form ``id=<id>``. The ID is
    the run of letters, digits, underscores and hyphens that follows.
    """
    id_patterns = (
        r'/d/([a-zA-Z0-9_-]+)',
        r'id=([a-zA-Z0-9_-]+)',
    )
    for pattern in id_patterns:
        found = re.search(pattern, url)
        if found is not None:
            return found.group(1)
    return None
| # ========================================== | |
| # 4. BUILD VECTOR DATABASE | |
| # ========================================== | |
def _download_drive_pdfs(links):
    """Download each Drive link into ``output_dir``; return how many succeeded."""
    downloaded = 0
    for link in links:
        try:
            file_id = extract_file_id(link)
            if not file_id:
                print(f"β οΈ Invalid link: {link}")
                continue
            direct_url = f"https://drive.google.com/uc?id={file_id}"
            output_path = os.path.join(output_dir, f"{file_id}.pdf")
            print(f"π Downloading: {file_id}")
            saved_path = gdown.download(
                url=direct_url,
                output=output_path,
                quiet=True,
                use_cookies=False,
            )
        except Exception as e:
            # Best effort: one bad link must not abort the remaining downloads.
            print(f"β Failed to download {link}: {e}")
            continue
        # NOTE(review): some gdown versions signal failure by returning None
        # instead of raising, so the return value is checked before counting.
        if not saved_path:
            print(f"β Failed to download {link}: no file was saved")
            continue
        downloaded += 1
        # Pause between downloads (presumably to avoid Drive throttling).
        time.sleep(1.5)
    return downloaded


def _load_pdf_documents():
    """Load every ``*.pdf`` in ``output_dir``; return ``(documents, pdf_count)``."""
    documents = []
    pdf_count = 0
    # Sorted so the document order does not depend on directory listing order.
    for filename in sorted(os.listdir(output_dir)):
        if not filename.endswith(".pdf"):
            continue
        file_path = os.path.join(output_dir, filename)
        try:
            documents.extend(PyPDFLoader(file_path).load())
        except Exception as e:
            # A single unreadable file is reported and skipped.
            print(f"β οΈ Error loading {filename}: {e}")
            continue
        pdf_count += 1
        print(f"β Loaded: {filename}")
    return documents, pdf_count


def build_vector_db(links):
    """Download the linked PDFs, chunk their text, and index it in FAISS.

    Args:
        links: Google Drive share URLs; each is resolved to a file ID with
            ``extract_file_id`` and saved as ``<file_id>.pdf`` in ``output_dir``.

    Returns:
        The FAISS vector store built from the chunked documents.

    Raises:
        ValueError: If no file could be downloaded, if no document could be
            loaded from the PDFs, or if splitting produced no chunks.
    """
    print(f"π₯ Starting download of {len(links)} documents...")
    if _download_drive_pdfs(links) == 0:
        raise ValueError("β No files downloaded. Check sharing settings ('Anyone with the link')")

    # Every PDF present in output_dir is loaded, not only this run's downloads.
    all_docs, pdf_count = _load_pdf_documents()
    # Report files, not loader documents (a loader may return several per file).
    print(f"β Total PDFs loaded: {pdf_count}")
    if not all_docs:
        raise ValueError("No documents could be loaded from the downloaded PDFs.")

    # Text Splitting
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=100,
    )
    chunks = text_splitter.split_documents(all_docs)
    print(f"π§© Created {len(chunks)} chunks.")
    if not chunks:
        # Fail with a clear message instead of an opaque error inside FAISS.
        raise ValueError("The loaded PDFs produced no text chunks to index.")

    # Embeddings & Vector Store
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    vector_db = FAISS.from_documents(chunks, embeddings)
    print("π Vector Database Created Successfully!")
    return vector_db
# ==========================================
# 5. INITIALIZE RAG SYSTEM
# ==========================================
def _format_docs(docs):
    """Join the text of the retrieved documents into one context string."""
    return "\n\n".join(doc.page_content for doc in docs)


# Built once at import time: downloads the PDFs and indexes them.
vector_store = build_vector_db(links_to_process)
# Retrieve the 4 best-matching chunks per question.
retriever = vector_store.as_retriever(search_kwargs={"k": 4})

llm = ChatGroq(
    model="llama-3.1-8b-instant",
    temperature=0.3,
    max_tokens=1024,
)

prompt_template = """Answer the question professionally and accurately based ONLY on the context below.
If the answer is not in the context, say "I don't have enough information from the provided documents."
Context:
{context}
Question: {question}
Answer:"""
prompt = ChatPromptTemplate.from_template(prompt_template)

# The retriever returns Document objects; format them to plain text first so
# {context} receives the chunk text rather than the repr of a list of
# Documents (which would also inject metadata into the prompt).
rag_chain = (
    {"context": retriever | _format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
# ==========================================
# 6. GRADIO INTERFACE
# ==========================================
def process_query(query):
    """Answer *query* through the RAG chain and return text for display.

    Blank or missing input yields a prompt to enter a question. Any exception
    raised while invoking the chain is turned into a user-facing message
    instead of propagating; rate-limit and API-key failures are recognised by
    substring matching on the lower-cased exception text.
    """
    if not (query and query.strip()):
        return "**Please enter a question.**"

    try:
        # Log only the first 80 characters of the question.
        print(f"π Processing query: {query[:80]}...")
        return rag_chain.invoke(query)
    except Exception as exc:
        detail = str(exc)

    lowered = detail.lower()
    if "rate limit" in lowered or "429" in lowered:
        return "β οΈ **Rate limit reached.** Please wait 20-30 seconds and try again."
    if "api key" in lowered:
        return "β API Key issue. Please check GROQ_API_KEY in Secrets."
    return f"β Error: {detail}"
# Page-level CSS: cap the container width and center it.
custom_css = """
.gradio-container { max-width: 1000px; margin: auto; }
"""

with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as demo:
    # Header
    gr.Markdown("# ποΈ **DocMind Intelligence**")
    gr.Markdown("### Multi-Document RAG System | Powered by Groq + LangChain")

    # Question input
    with gr.Row():
        query_input = gr.Textbox(
            label="Ask your question",
            placeholder="Type your question here about the uploaded documents...",
            lines=3,
        )

    # Submit button
    with gr.Row():
        submit_btn = gr.Button("π Get Answer", variant="primary", size="large")

    # Answer area, rendered as Markdown
    output = gr.Markdown(label="Response", value="_Waiting for your question..._")

    # The button's click event and the textbox's submit event share one handler.
    handler_wiring = {"fn": process_query, "inputs": query_input, "outputs": output}
    submit_btn.click(**handler_wiring)
    query_input.submit(**handler_wiring)

    gr.Markdown("---\n**Tip:** Be clear and specific in your questions for best results.")

demo.launch()