# -*- coding: utf-8 -*-
"""Omni-RAG Analyst v10 (Stable).ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1U8IVDRfGNbCZ-1UgIv9Zn0sdRL73zKH9
"""

# --- 1. Dependency Installation ---
# This block checks for and installs all required libraries,
# removing the need for a requirements.txt file.

import os
import subprocess
import sys
import time


def install_dependencies():
    """Install all necessary Python libraries for the application.

    Runs one ``pip install -q`` subprocess per distribution so a single
    failing package does not abort the rest. Prints a summary with the
    elapsed time, and a warning if any installation failed.
    """
    print("Starting dependency installation...")
    start_time = time.time()
    libraries = [
        "gradio>=4.0.0",
        "transformers[torch]",
        "sentence-transformers",
        "scikit-learn",
        "faiss-cpu",
        "pypdf",
        "tavily-python",          # Tavily Search
        "google-search-results",  # SerpApi Search
        "openai",
        "google-generativeai",
        "gTTS",
        "soundfile",
    ]
    installed_all = True
    for lib in libraries:
        print(f"Installing {lib}...")
        try:
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", "-q",
                 "--disable-pip-version-check", lib]
            )
        except subprocess.CalledProcessError as e:
            # Keep going: one broken package shouldn't block the others.
            print(f"!!! CRITICAL: Failed to install {lib}. Error: {e}")
            installed_all = False
    end_time = time.time()
    if installed_all:
        print(f"All dependencies installed in {end_time - start_time:.2f} seconds.")
    else:
        print(f"!!! WARNING: One or more dependencies failed to install. The app may not run.")


# --- Run the installation ---
print("Checking for required dependencies...")
try:
    # Import one representative module for EVERY installed distribution.
    # (Previously only six of the twelve were probed, so a missing
    # tavily/openai/google-generativeai/etc. skipped the installer and
    # the app crashed later at the section-2 imports.)
    import gradio
    import pypdf
    import faiss
    import sentence_transformers
    import sklearn
    import transformers
    import gtts
    import serpapi
    import tavily
    import openai
    import google.generativeai
    import soundfile
    print("All key dependencies seem to be satisfied.")
except ImportError:
    print("Missing one or more dependencies. Running installer...")
    install_dependencies()
    print("\n" + "="*50)
    print("INSTALLATION COMPLETE. If in a notebook, please RESTART THE KERNEL now.")
    print("="*50 + "\n")
# --- 2. All Imports (Now that we know they are installed) ---
import logging

import gradio as gr
import pypdf
import faiss
import numpy as np
import torch
import openai
import google.generativeai as genai
from transformers import pipeline
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tavily import TavilyClient
from serpapi import GoogleSearch
from gtts import gTTS

# One module-level logger for the whole app.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- 3. COLAB-SPECIFIC: Mount Google Drive for Model Caching ---
# Detect Colab via sys.modules; fall back to a local cache dir elsewhere.
IN_COLAB = 'google.colab' in sys.modules
MODEL_CACHE_DIR = "./hf_cache"
DRIVE_MOUNT_FAILED = False

if not IN_COLAB:
    print("Not running in Colab. Using local cache directory.")
else:
    print("Running in Google Colab. Mounting Google Drive for model cache...")
    try:
        from google.colab import drive
        drive.mount('/content/drive')
        # Persist Hugging Face downloads across Colab sessions.
        MODEL_CACHE_DIR = "/content/drive/MyDrive/colab_hf_cache"
        os.makedirs(MODEL_CACHE_DIR, exist_ok=True)
        print(f"✅ Google Drive mounted. Hugging Face models will be cached in: {MODEL_CACHE_DIR}")
    except Exception as e:
        # Non-fatal: the app still runs, models just re-download this session.
        print(f"⚠️ WARNING: Failed to mount Google Drive. Models will be re-downloaded. Error: {e}")
        MODEL_CACHE_DIR = "./hf_cache"
        DRIVE_MOUNT_FAILED = True
# --- 4. Economic Model Loading (with Graceful Degradation & Caching) ---
logger.info(f"Loading local AI models (this may take a moment)...")
logger.info(f"Using cache directory: {MODEL_CACHE_DIR}")

# --- Summarizer & Vectorizers (Essential) ---
try:
    logger.info("Loading vectorization models...")
    # NOTE: cache_folder only affects sentence-transformers downloads.
    # The transformers pipelines below need their own cache_dir (passed
    # via model_kwargs) to actually use the Drive cache.
    dense_model = SentenceTransformer(
        'all-MiniLM-L6-v2',
        cache_folder=MODEL_CACHE_DIR
    )
    sparse_vectorizer = TfidfVectorizer()
    logger.info("Loading summarizer agent...")
    summarizer = pipeline(
        "summarization",
        model="sshleifer/distilbart-cnn-12-6",
        min_length=25,
        max_length=150,
        # Forwarded to from_pretrained so the model lands in the Drive cache.
        model_kwargs={"cache_dir": MODEL_CACHE_DIR},
    )
except Exception as e:
    logger.error(f"CRITICAL: Failed to load essential models. The app may not work. Error: {e}")

# --- Speech-to-Text (Optional) ---
# Voice input degrades gracefully: stt_enabled stays False if loading fails.
stt_enabled = False
stt_pipeline = None
try:
    logger.info("Loading Speech-to-Text (Whisper) agent...")
    stt_pipeline = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-base.en",
        model_kwargs={"cache_dir": MODEL_CACHE_DIR},
    )
    stt_enabled = True
    logger.info("✅ Local STT (Whisper) model loaded successfully. Voice input enabled.")
except Exception as e:
    logger.warning(f"⚠️ WARNING: Failed to load local STT model. Voice input will be disabled. Error: {e}")


# --- 5. ETL & Vectorization Functions (Organic/Document Flow) ---
def extract_text_from_pdf(pdf_file):
    """Extract all page text from an uploaded PDF.

    Args:
        pdf_file: a Gradio file object exposing ``.name`` (the temp path),
            or None when nothing was uploaded.

    Returns:
        (text, error) — error is None on success, otherwise a user-facing
        message (and text is "").
    """
    if pdf_file is None:
        return "", "Please upload a PDF file."
    try:
        pdf_reader = pypdf.PdfReader(pdf_file.name)
        # extract_text() may return None for image-only pages; coalesce to "".
        text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
        return text, None
    except Exception as e:
        return "", f"Error reading PDF: {str(e)}"


def chunk_text(text, chunk_size=500, overlap=50):
    """Split text into overlapping whitespace-token chunks.

    Each chunk holds up to chunk_size tokens and consecutive chunks share
    ``overlap`` tokens. Empty/whitespace-only chunks are dropped.
    """
    tokens = text.split()
    # Guard: overlap >= chunk_size would give range() a step <= 0 and raise.
    step = max(chunk_size - overlap, 1)
    chunks = []
    for i in range(0, len(tokens), step):
        chunk = " ".join(tokens[i:i + chunk_size])
        if chunk.strip():
            chunks.append(chunk)
    return chunks


def build_vector_stores(chunks):
    """Build both retrieval indexes for the document chunks.

    Side effect: fits the module-level sparse_vectorizer on the chunks.

    Returns:
        (index_dense, embeddings_sparse, error) — error is None on success.
    """
    if not chunks:
        return None, None, "No text chunks to index."
    try:
        logger.info(f"Building vector stores for {len(chunks)} chunks...")
        embeddings_dense = dense_model.encode(chunks)
        index_dense = faiss.IndexFlatL2(embeddings_dense.shape[1])
        index_dense.add(np.array(embeddings_dense).astype('float32'))
        sparse_vectorizer.fit(chunks)
        embeddings_sparse = sparse_vectorizer.transform(chunks)
        logger.info("Vector stores built successfully.")
        return index_dense, embeddings_sparse, None
    except Exception as e:
        logger.error(f"Error building vector stores: {e}")
        return None, None, f"Error building vector stores: {str(e)}"


# --- 6. RAG & Analysis Functions (Organic/Document Flow) ---
def search_dense(query, index_dense, chunks, k=3):
    """Semantic (embedding) top-k retrieval via FAISS."""
    query_embedding = dense_model.encode([query])
    _, indices = index_dense.search(np.array(query_embedding).astype('float32'), k)
    # FAISS pads results with -1 when k exceeds the number of indexed
    # vectors; -1 is a valid *Python* index (last chunk), so it must be
    # filtered out rather than dereferenced.
    return [chunks[i] for i in indices[0] if i != -1]


def search_sparse(query, embeddings_sparse, chunks, k=3):
    """Keyword (TF-IDF cosine-similarity) top-k retrieval."""
    query_embedding = sparse_vectorizer.transform([query])
    similarities = cosine_similarity(query_embedding, embeddings_sparse).flatten()
    top_k_indices = similarities.argsort()[-k:][::-1]
    return [chunks[i] for i in top_k_indices]


def search_hybrid(query, index_dense, embeddings_sparse, chunks, k=3):
    """Union of dense and sparse results, deduplicated, dense-first order."""
    dense_results = search_dense(query, index_dense, chunks, k)
    sparse_results = search_sparse(query, embeddings_sparse, chunks, k)
    # dict.fromkeys keeps first-seen order while removing duplicates.
    return list(dict.fromkeys(dense_results + sparse_results))


def run_analysis_agent(retrieved_chunks):
    """Extract the top-5 TF-IDF keywords from the retrieved context.

    Returns a dict of parallel lists suitable for a gr.Dataframe, or an
    error string when analysis is impossible (e.g. too little content).
    """
    if not retrieved_chunks:
        return "No data for analysis."
    full_retrieved_text = " ".join(retrieved_chunks)
    try:
        analysis_vectorizer = TfidfVectorizer(stop_words='english', max_features=10)
        tfidf_matrix = analysis_vectorizer.fit_transform([full_retrieved_text])
        feature_names = analysis_vectorizer.get_feature_names_out()
        scores = tfidf_matrix.toarray().flatten()
        keyword_data = {"Keyword": [], "Importance Score": []}
        for i in scores.argsort()[-5:][::-1]:
            keyword_data["Keyword"].append(feature_names[i])
            keyword_data["Importance Score"].append(round(float(scores[i]), 3))
        return keyword_data
    except Exception:
        return "Analysis failed (not enough unique content)."


def run_summary_agent(retrieved_chunks, query):
    """Summarization agent, now with truncation to prevent errors."""
    if not retrieved_chunks:
        return "No relevant information found."
    context = " ".join(retrieved_chunks)
    prompt = f"Based on the following information:\n---\n{context}\n---\nPlease provide a concise answer to the query: \"{query}\""
    try:
        # truncation=True automatically cuts down inputs that are too
        # long for the model (1024 tokens).
        summary = summarizer(prompt, truncation=True)[0]['summary_text']
        return summary
    except Exception as e:
        logger.error(f"Summarization agent failed: {e}")
        return f"Summarization agent failed: {str(e)}"
# --- 7. Web Search Functions (Non-Organic/Web Flow) ---
def run_tavily_search_agent(query, tavily_api_key):
    """Uses Tavily to search the web.

    Returns a newline-joined "Source/Content" context string.
    Raises gr.Error when the key is missing or the search fails.
    """
    if not tavily_api_key:
        raise gr.Error("Tavily API Key is required for this search provider.")
    try:
        client = TavilyClient(api_key=tavily_api_key)
        response = client.search(query=query, search_depth="basic")
        context = "\n".join([f"Source: {res['url']}\nContent: {res['content']}" for res in response['results']])
        return context
    except Exception as e:
        raise gr.Error(f"Tavily web search failed: {str(e)}")


def run_serpapi_search_agent(query, serpapi_api_key):
    """Uses SerpApi to search the web.

    Collects the answer-box snippet (if any) plus up to 4 organic-result
    snippets. Raises gr.Error when the key is missing or the search fails.
    """
    if not serpapi_api_key:
        raise gr.Error("SerpApi API Key is required for this search provider.")
    try:
        params = {
            "q": query,
            "api_key": serpapi_api_key,
            "engine": "google",
        }
        search = GoogleSearch(params)
        response = search.get_dict()
        snippets = []
        # Prefer Google's answer box when it produced one.
        if "answer_box" in response and "snippet" in response["answer_box"]:
            snippets.append(f"Source: Google Answer Box\nContent: {response['answer_box']['snippet']}")
        if "organic_results" in response:
            for res in response["organic_results"][:4]:
                if "snippet" in res:
                    snippets.append(f"Source: {res['link']}\nContent: {res['snippet']}")
        if not snippets:
            return "No snippets found by SerpApi for this query."
        return "\n".join(snippets)
    except Exception as e:
        raise gr.Error(f"SerpApi web search failed: {str(e)}")


def run_llm_synthesis_agent(context, query, llm_provider, openai_key, gemini_key, openrouter_key):
    """Answer the query from the web-search context via the chosen LLM.

    Supports OpenAI, Gemini and OpenRouter; raises gr.Error on a missing
    key, an unknown provider, or an upstream API failure.
    """
    system_prompt = "You are a helpful assistant. Answer the user's query based *only* on the provided context from a web search."
    user_prompt = f"Here is the web search context:\n---\n{context}\n---\nNow, please answer this query: \"{query}\""
    try:
        if llm_provider == "OpenAI":
            if not openai_key:
                raise gr.Error("OpenAI API Key is required.")
            client = openai.OpenAI(api_key=openai_key)
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "system", "content": system_prompt},
                          {"role": "user", "content": user_prompt}]
            )
            return response.choices[0].message.content
        elif llm_provider == "Gemini":
            if not gemini_key:
                raise gr.Error("Gemini API Key is required.")
            genai.configure(api_key=gemini_key)
            model = genai.GenerativeModel('gemini-pro')
            # Gemini takes a single prompt; fold the system prompt in.
            full_prompt = f"{system_prompt}\n\n{user_prompt}"
            response = model.generate_content(full_prompt)
            return response.text
        elif llm_provider == "OpenRouter":
            if not openrouter_key:
                raise gr.Error("OpenRouter API Key is required.")
            # OpenRouter speaks the OpenAI protocol; only the base_url differs.
            client = openai.OpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=openrouter_key
            )
            response = client.chat.completions.create(
                model="mistralai/mistral-7b-instruct:free",
                messages=[{"role": "system", "content": system_prompt},
                          {"role": "user", "content": user_prompt}]
            )
            return response.choices[0].message.content
        else:
            # Previously an unknown provider fell through and returned None,
            # which rendered as an empty answer; fail loudly instead.
            raise gr.Error(f"Unknown LLM provider: {llm_provider}")
    except Exception as e:
        logger.error(f"LLM Synthesis failed for {llm_provider}: {e}")
        raise gr.Error(f"LLM Synthesis failed: {str(e)}")


# --- 8. Voice I/O Functions (Economic & Robust) ---
def transcribe_audio(audio_filepath):
    """Speech-to-Text: Transcribes audio file to text using small Whisper."""
    if not stt_enabled or stt_pipeline is None:
        gr.Warning("STT model is not loaded. Cannot transcribe audio.")
        return ""
    if audio_filepath is None:
        return ""
    try:
        text = stt_pipeline(audio_filepath)["text"]
        return text
    except Exception as e:
        gr.Warning(f"STT failed during transcription: {str(e)}")
        return ""


def synthesize_speech(text):
    """Text-to-Speech: Uses gTTS API (zero local compute). Fails gracefully.

    Returns (speak_button_update, audio_update): always hides the button;
    on success the audio player is shown loaded with the generated MP3.
    (Previously this returned three values wired to a duplicated
    audio_output component — see the speak_button.click wiring below.)
    """
    if not text:
        return gr.Button(visible=False), gr.Audio(visible=False)
    try:
        tts = gTTS(text)
        tts.save("response_audio.mp3")
        return gr.Button(visible=False), gr.Audio(value="response_audio.mp3", autoplay=True, visible=True)
    except Exception as e:
        gr.Warning(f"TTS failed (e.g., no internet connection): {str(e)}")
        return gr.Button(visible=False), gr.Audio(visible=False)


# --- 9. Main Gradio Functions (Controller Logic) ---
# Single-document cache: re-uploading the same file skips re-indexing.
document_cache = {"filename": None, "chunks": [], "index_dense": None, "embeddings_sparse": None}


def process_document(pdf_file, progress=gr.Progress()):
    """ETL pipeline for an uploaded PDF: extract -> chunk -> index.

    Returns updates for (upload_status, query_box placeholder,
    analyze_button, result_tabs, query_source_radio).
    """
    if pdf_file is None:
        # Consistency fix: return a Button update like every other branch
        # (a bare string here only relabeled the button).
        return "Please upload a PDF.", "Ask a question...", gr.Button(interactive=False), gr.Tabs(visible=False), "Web Search"
    if document_cache["filename"] == pdf_file.name:
        # Same file as last time: the indexes are already built.
        return f"✅ Document '{pdf_file.name}' is ready.", "Ask a question...", gr.Button(interactive=True), gr.Tabs(visible=True), "Document"
    progress(0, desc="Extracting text...")
    text, error = extract_text_from_pdf(pdf_file)
    if error:
        return f"Error: {error}", "Ask a question...", gr.Button(interactive=False), gr.Tabs(visible=False), "Web Search"
    progress(0.3, desc="Chunking text...")
    chunks = chunk_text(text)
    if not chunks:
        return "Error: No text chunks found.", "Ask a question...", gr.Button(interactive=False), gr.Tabs(visible=False), "Web Search"
    progress(0.6, desc=f"Building vector stores for {len(chunks)} chunks...")
    index_dense, embeddings_sparse, error = build_vector_stores(chunks)
    if error:
        return f"Error: {error}", "Ask a question...", gr.Button(interactive=False), gr.Tabs(visible=False), "Web Search"
    document_cache.update({"filename": pdf_file.name, "chunks": chunks, "index_dense": index_dense, "embeddings_sparse": embeddings_sparse})
    status = f"✅ Success: Indexed '{pdf_file.name}'. Ready to chat."
    return status, "Ask a question about the document...", gr.Button(interactive=True), gr.Tabs(visible=True), "Document"


def run_main_query(query, search_type, query_source, openai_key, gemini_key, openrouter_key,
                   search_provider, tavily_key, serpapi_key, llm_provider):
    """Main controller: routes the query to the document or web pipeline.

    Generator yielding (status, answer, analysis, speak_button_update,
    audio_update) so the UI shows progress between agent steps. gr.Error
    is caught and reported via the status box rather than re-raised.
    """
    if not query:
        raise gr.Error("Please enter a query.")
    yield "Processing...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
    try:
        if query_source == "Document":
            if not document_cache["index_dense"]:
                raise gr.Error("Please upload and process a document first.")
            yield "1. 💬 Running 'Research Agent' on document...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
            if search_type == "Hybrid (Recommended)":
                chunks = search_hybrid(query, document_cache["index_dense"], document_cache["embeddings_sparse"], document_cache["chunks"])
            elif search_type == "Dense (Semantic)":
                chunks = search_dense(query, document_cache["index_dense"], document_cache["chunks"])
            else:
                chunks = search_sparse(query, document_cache["embeddings_sparse"], document_cache["chunks"])
            yield "2. 🧠 Running 'Summary Agent' (local)...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
            answer = run_summary_agent(chunks, query)
            yield "3. 📊 Running 'Analysis Agent' (local)...", answer, None, gr.Button(visible=False), gr.Audio(visible=False)
            analysis = run_analysis_agent(chunks)
            yield "✅ Document query complete.", answer, analysis, gr.Button(visible=True, interactive=True), gr.Audio(visible=False)
        else:
            yield f"1. 💬 Running 'Web Search Agent' ({search_provider})...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
            if search_provider == "Tavily":
                web_context = run_tavily_search_agent(query, tavily_key)
            elif search_provider == "SerpApi":
                web_context = run_serpapi_search_agent(query, serpapi_key)
            else:
                raise gr.Error("Invalid search provider selected.")
            yield f"2. 🧠 Running 'Web Synthesis Agent' ({llm_provider})...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
            answer = run_llm_synthesis_agent(web_context, query, llm_provider, openai_key, gemini_key, openrouter_key)
            yield "✅ Web query complete.", answer, None, gr.Button(visible=True, interactive=True), gr.Audio(visible=False)
    except gr.Error as e:
        yield f"Error: {e}", None, None, gr.Button(visible=False), gr.Audio(visible=False)
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        yield f"An unexpected error occurred: {str(e)}", None, None, gr.Button(visible=False), gr.Audio(visible=False)


# --- 10. Gradio Interface Definition ---
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="orange")) as demo:
    gr.Markdown(
        """
        # 🚀 Omni-RAG Analyst v10 (Stable)
        *A multi-source, multi-modal demo by **Natwar Upadhyay***
        *OCI Data Science & AI Vector Search Certified Professional*

        ### What problem does this solve?
        Generic chatbots give generic answers. This tool gives you answers based on **specific information** from two sources:
        1. **Your Documents (Organic):** Upload a PDF to chat with your own data.
        2. **The Live Web (Non-Organic):** Connects to Google (via SerpApi) or Tavily to answer up-to-the-minute questions.

        It showcases a full **ETL -> Vector Search -> RAG** pipeline using economic, resource-friendly models.
        """
    )

    if IN_COLAB and DRIVE_MOUNT_FAILED:
        gr.Markdown(
            """
            ⚠️ **Google Drive Mount Failed:** Your Colab session couldn't connect to Google Drive (you may need to grant permissions). The app will still work, but the large AI models (2GB+) will be **re-downloaded** for this session.
            """
        )

    with gr.Accordion("Step 1: API Key Configuration (Required for Web Search)", open=False):
        gr.Markdown(
            """
            To use the **Web Search** feature, you need API keys for **one** Search Provider and **one** LLM Synthesis provider.
            """
        )
        with gr.Row():
            with gr.Column():
                gr.Markdown("#### (A) Search Provider Keys")
                search_provider_dropdown = gr.Dropdown(
                    label="Choose Search Provider",
                    choices=["Tavily", "SerpApi"],
                    value="Tavily"
                )
                tavily_key_box = gr.Textbox(label="Tavily API Key", placeholder="tvly-...", type="password")
                serpapi_key_box = gr.Textbox(label="SerpApi API Key", placeholder="...", type="password")
            with gr.Column():
                gr.Markdown("#### (B) LLM Synthesis Keys")
                llm_provider_dropdown = gr.Dropdown(
                    label="Choose LLM Provider",
                    choices=["OpenAI", "Gemini", "OpenRouter"],
                    value="OpenAI"
                )
                openai_key_box = gr.Textbox(label="OpenAI API Key", placeholder="sk-...", type="password")
                gemini_key_box = gr.Textbox(label="Gemini API Key", placeholder="AIzaSy...", type="password")
                openrouter_key_box = gr.Textbox(label="OpenRouter API Key", placeholder="sk-or-...", type="password")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Step 2: Load Document (For 'Document' Source)")
            pdf_upload = gr.File(label="Upload PDF", file_types=[".pdf"])
            upload_status = gr.Textbox(label="Processing Status", interactive=False, lines=3)
        with gr.Column(scale=2):
            gr.Markdown("### Step 3: Configure & Query")
            stt_audio = gr.Audio(
                label="🎙️ Record Query (or type below)",
                sources=["microphone"],
                type="filepath",
                visible=stt_enabled
            )
            if not stt_enabled:
                gr.Markdown("*(Local voice input (STT) failed to load. Please type your query.)*")
            query_box = gr.Textbox(label="Query", placeholder="Ask a question...", interactive=True)
            with gr.Row():
                query_source_radio = gr.Radio(
                    label="Query Source",
                    choices=["Document", "Web Search"],
                    value="Web Search",
                    interactive=True
                )
                search_type_dropdown = gr.Dropdown(
                    label="Document Search Strategy",
                    choices=["Hybrid (Recommended)", "Dense (Semantic)", "Sparse (Keyword)"],
                    value="Hybrid (Recommended)",
                    info=" (Only applies if 'Document' is selected)"
                )
            analyze_button = gr.Button("Analyze Query", variant="primary", interactive=True)

    with gr.Tabs(visible=True) as result_tabs:
        with gr.TabItem("Synthesized Answer"):
            answer_output = gr.Textbox(label="Answer (from AI Agent)", lines=5)
            speak_button = gr.Button("🔊 Speak Answer", visible=False)
            audio_output = gr.Audio(label="AI Voice Output", autoplay=False, visible=False, type="filepath")
        with gr.TabItem("Document Context Analysis"):
            analysis_output = gr.Dataframe(label="Keyword Analysis (from 'Analysis Agent')")
            gr.Markdown("*This tab only populates when 'Document' is the query source.*")

    # --- 11. Wire up the components ---
    stt_audio.stop_recording(
        fn=transcribe_audio,
        inputs=[stt_audio],
        outputs=[query_box]
    )

    pdf_upload.upload(
        fn=process_document,
        inputs=[pdf_upload],
        outputs=[upload_status, query_box, analyze_button, result_tabs, query_source_radio],
        show_progress="full"
    )

    analyze_button.click(
        fn=run_main_query,
        inputs=[
            query_box, search_type_dropdown, query_source_radio,
            openai_key_box, gemini_key_box, openrouter_key_box,
            search_provider_dropdown, tavily_key_box, serpapi_key_box,
            llm_provider_dropdown
        ],
        outputs=[upload_status, answer_output, analysis_output, speak_button, audio_output]
    )

    # Fix: audio_output was previously listed twice in outputs (with a
    # 3-value return); each component now appears exactly once and the
    # gr.Audio update carries the file value itself.
    speak_button.click(
        fn=synthesize_speech,
        inputs=[answer_output],
        outputs=[speak_button, audio_output]
    )

if __name__ == "__main__":
    demo.launch(debug=True)