# Natwar's picture
# Upload app.py
# 0a2dccb verified
# -*- coding: utf-8 -*-
"""Omni-RAG Analyst v10 (Stable).ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1U8IVDRfGNbCZ-1UgIv9Zn0sdRL73zKH9
"""
# --- 1. Dependency Installation ---
# This block checks for and installs all required libraries,
# removing the need for a requirements.txt file.
import os
import subprocess
import sys
import time
def install_dependencies():
    """Install every third-party library the application needs.

    Each package gets its own ``pip`` call so a single failure does not
    abort the rest; ``-q`` keeps the output quiet. Prints a timed summary
    (or a warning) when done. Returns nothing.
    """
    print("Starting dependency installation...")
    start_time = time.time()
    libraries = [
        "gradio>=4.0.0",
        "transformers[torch]",
        "sentence-transformers",
        "scikit-learn",
        "faiss-cpu",
        "pypdf",
        "tavily-python",          # Tavily Search
        "google-search-results",  # SerpApi Search
        "openai",
        "google-generativeai",
        "gTTS",
        "soundfile",
    ]
    installed_all = True
    for lib in libraries:
        print(f"Installing {lib}...")
        try:
            # Use the current interpreter so packages land in the exact
            # environment that will later import them.
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", "-q",
                 "--disable-pip-version-check", lib]
            )
        except subprocess.CalledProcessError as e:
            print(f"!!! CRITICAL: Failed to install {lib}. Error: {e}")
            installed_all = False
    end_time = time.time()
    if installed_all:
        print(f"All dependencies installed in {end_time - start_time:.2f} seconds.")
    else:
        # Fix: the original used an f-string with no placeholders here.
        print("!!! WARNING: One or more dependencies failed to install. The app may not run.")
# --- Run the installation ---
print("Checking for required dependencies...")
try:
    # Probe a representative subset of the heavy dependencies; if any one
    # is missing, assume a fresh environment and run the full installer.
    import gradio
    import pypdf
    import faiss
    import sentence_transformers
    import gtts
    import serpapi
    print("All key dependencies seem to be satisfied.")
except ImportError:
    print("Missing one or more dependencies. Running installer...")
    install_dependencies()
    # Freshly-installed packages may not import cleanly into a running
    # notebook kernel, hence the restart reminder.
    print("\n" + "="*50)
    print("INSTALLATION COMPLETE. If in a notebook, please RESTART THE KERNEL now.")
    print("="*50 + "\n")
# --- 2. All Imports (Now that we know they are installed) ---
import gradio as gr
import pypdf
import faiss
import numpy as np
from transformers import pipeline
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import torch
import openai
import google.generativeai as genai
from tavily import TavilyClient
from serpapi import GoogleSearch
from gtts import gTTS
import logging
# Set up basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- 3. COLAB-SPECIFIC: Mount Google Drive for Model Caching ---
# Detect Colab via the presence of the google.colab module. When available,
# cache Hugging Face downloads on Drive so they survive session restarts.
IN_COLAB = 'google.colab' in sys.modules
MODEL_CACHE_DIR = "./hf_cache"
DRIVE_MOUNT_FAILED = False  # surfaced later as a warning banner in the UI
if IN_COLAB:
    print("Running in Google Colab. Mounting Google Drive for model cache...")
    try:
        from google.colab import drive
        drive.mount('/content/drive')
        MODEL_CACHE_DIR = "/content/drive/MyDrive/colab_hf_cache"
        os.makedirs(MODEL_CACHE_DIR, exist_ok=True)
        print(f"✅ Google Drive mounted. Hugging Face models will be cached in: {MODEL_CACHE_DIR}")
    except Exception as e:
        # Fall back to a local cache; models will be re-downloaded each session.
        print(f"⚠️ WARNING: Failed to mount Google Drive. Models will be re-downloaded. Error: {e}")
        MODEL_CACHE_DIR = "./hf_cache"
        DRIVE_MOUNT_FAILED = True
else:
    print("Not running in Colab. Using local cache directory.")
# --- 4. Economic Model Loading (with Graceful Degradation & Caching) ---
logger.info(f"Loading local AI models (this may take a moment)...")
logger.info(f"Using cache directory: {MODEL_CACHE_DIR}")

# --- Summarizer & Vectorizers (Essential) ---
# These are required by the document (RAG) flow; a failure here is logged
# as critical but does not stop the app from starting.
try:
    logger.info("Loading vectorization models...")
    # This call sets the global cache directory for all of Hugging Face
    dense_model = SentenceTransformer(
        'all-MiniLM-L6-v2',
        cache_folder=MODEL_CACHE_DIR
    )
    # Fitted later on each uploaded document's chunks (build_vector_stores).
    sparse_vectorizer = TfidfVectorizer()
    logger.info("Loading summarizer agent...")
    summarizer = pipeline(
        "summarization",
        model="sshleifer/distilbart-cnn-12-6",
        min_length=25,
        max_length=150
    )
except Exception as e:
    logger.error(f"CRITICAL: Failed to load essential models. The app may not work. Error: {e}")

# --- Speech-to-Text (Optional) ---
# Voice input degrades gracefully: if Whisper fails to load, stt_enabled
# stays False and the microphone widget is hidden in the UI.
stt_enabled = False
stt_pipeline = None
try:
    logger.info("Loading Speech-to-Text (Whisper) agent...")
    stt_pipeline = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-base.en"
    )
    stt_enabled = True
    logger.info("✅ Local STT (Whisper) model loaded successfully. Voice input enabled.")
except Exception as e:
    logger.warning(f"⚠️ WARNING: Failed to load local STT model. Voice input will be disabled. Error: {e}")
# --- 5. ETL & Vectorization Functions (Organic/Document Flow) ---
def extract_text_from_pdf(pdf_file):
    """Extract all text from an uploaded PDF.

    Returns a ``(text, error_message)`` pair; exactly one of the two is
    meaningful (error_message is None on success).
    """
    if pdf_file is None:
        return "", "Please upload a PDF file."
    try:
        reader = pypdf.PdfReader(pdf_file.name)
        # extract_text() can return None for image-only pages; treat as "".
        page_texts = []
        for page in reader.pages:
            page_texts.append(page.extract_text() or "")
        return "".join(page_texts), None
    except Exception as e:
        return "", f"Error reading PDF: {str(e)}"
def chunk_text(text, chunk_size=500, overlap=50):
    """Split *text* into overlapping chunks of whitespace tokens.

    Consecutive chunks share ``overlap`` tokens. Returns a list of
    non-empty chunk strings (empty list for blank text).

    Raises:
        ValueError: if ``overlap >= chunk_size`` (the window step would be
            zero or negative, which the original silently mishandled).
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    tokens = text.split()
    step = chunk_size - overlap
    chunks = []
    for i in range(0, len(tokens), step):
        # Join each window once (the original joined every window twice).
        chunk = " ".join(tokens[i:i + chunk_size])
        if chunk.strip():
            chunks.append(chunk)
    return chunks
def build_vector_stores(chunks):
    """Build the dense (FAISS) and sparse (TF-IDF) indexes for *chunks*.

    Returns ``(faiss_index, sparse_matrix, error_message)``; the error slot
    is None on success, and both index slots are None on failure.
    """
    if not chunks:
        return None, None, "No text chunks to index."
    try:
        logger.info(f"Building vector stores for {len(chunks)} chunks...")
        dense_vecs = dense_model.encode(chunks)
        dense_index = faiss.IndexFlatL2(dense_vecs.shape[1])
        dense_index.add(np.array(dense_vecs).astype('float32'))
        # Fit the shared TF-IDF vocabulary on this document, then vectorize.
        sparse_vectorizer.fit(chunks)
        sparse_matrix = sparse_vectorizer.transform(chunks)
        logger.info("Vector stores built successfully.")
        return dense_index, sparse_matrix, None
    except Exception as e:
        logger.error(f"Error building vector stores: {e}")
        return None, None, f"Error building vector stores: {str(e)}"
# --- 6. RAG & Analysis Functions (Organic/Document Flow) ---
def search_dense(query, index_dense, chunks, k=3):
    """Return the *k* chunks nearest to *query* in dense embedding (L2) space."""
    query_vec = np.array(dense_model.encode([query])).astype('float32')
    _, hit_ids = index_dense.search(query_vec, k)
    return [chunks[idx] for idx in hit_ids[0]]
def search_sparse(query, embeddings_sparse, chunks, k=3):
    """Return the *k* chunks with highest TF-IDF cosine similarity to *query*."""
    query_vec = sparse_vectorizer.transform([query])
    scores = cosine_similarity(query_vec, embeddings_sparse).flatten()
    # argsort is ascending; take the last k and reverse for best-first order.
    best = scores.argsort()[-k:][::-1]
    return [chunks[idx] for idx in best]
def search_hybrid(query, index_dense, embeddings_sparse, chunks, k=3):
    """Union of dense and sparse retrieval, de-duplicated, dense hits first."""
    combined = search_dense(query, index_dense, chunks, k)
    combined += search_sparse(query, embeddings_sparse, chunks, k)
    # dict.fromkeys preserves first-seen order while dropping duplicates.
    return list(dict.fromkeys(combined))
def run_analysis_agent(retrieved_chunks):
    """Extract the top-5 TF-IDF keywords from the retrieved chunks.

    Returns a dict of two parallel lists ("Keyword", "Importance Score")
    shaped for a Gradio Dataframe, or a plain message string when there is
    nothing to analyse or the analysis fails.
    """
    if not retrieved_chunks:
        return "No data for analysis."
    corpus = " ".join(retrieved_chunks)
    try:
        # A throwaway vectorizer scoped to this call; independent of the
        # module-level sparse_vectorizer used for retrieval.
        vec = TfidfVectorizer(stop_words='english', max_features=10)
        matrix = vec.fit_transform([corpus])
        names = vec.get_feature_names_out()
        weights = matrix.toarray().flatten()
        table = {"Keyword": [], "Importance Score": []}
        # Keep the five highest-weighted terms, strongest first.
        for idx in weights.argsort()[-5:][::-1]:
            table["Keyword"].append(names[idx])
            table["Importance Score"].append(round(float(weights[idx]), 3))
        return table
    except Exception:
        return "Analysis failed (not enough unique content)."
def run_summary_agent(retrieved_chunks, query):
    """Answer *query* by summarizing the retrieved chunks with the local model.

    Returns the summary text, or a human-readable failure message (never
    raises, so the UI flow keeps going).
    """
    if not retrieved_chunks:
        return "No relevant information found."
    context = " ".join(retrieved_chunks)
    prompt = f"Based on the following information:\n---\n{context}\n---\nPlease provide a concise answer to the query: \"{query}\""
    try:
        # truncation=True trims inputs beyond the model's 1024-token limit.
        result = summarizer(prompt, truncation=True)
        return result[0]['summary_text']
    except Exception as e:
        logger.error(f"Summarization agent failed: {e}")
        return f"Summarization agent failed: {str(e)}"
# --- 7. Web Search Functions (Non-Organic/Web Flow) ---
def run_tavily_search_agent(query, tavily_api_key):
    """Search the web via Tavily; return a newline-joined context string.

    Raises gr.Error when the key is missing or the search fails.
    """
    if not tavily_api_key:
        raise gr.Error("Tavily API Key is required for this search provider.")
    try:
        client = TavilyClient(api_key=tavily_api_key)
        results = client.search(query=query, search_depth="basic")['results']
        lines = []
        for res in results:
            lines.append(f"Source: {res['url']}\nContent: {res['content']}")
        return "\n".join(lines)
    except Exception as e:
        raise gr.Error(f"Tavily web search failed: {str(e)}")
def run_serpapi_search_agent(query, serpapi_api_key):
    """Search the web via SerpApi (Google engine); return joined snippets.

    Combines the answer box (when present) with up to four organic-result
    snippets. Raises gr.Error when the key is missing or the search fails.
    """
    if not serpapi_api_key:
        raise gr.Error("SerpApi API Key is required for this search provider.")
    try:
        response = GoogleSearch({
            "q": query,
            "api_key": serpapi_api_key,
            "engine": "google",
        }).get_dict()
        snippets = []
        answer_box = response.get("answer_box", {})
        if "snippet" in answer_box:
            snippets.append(f"Source: Google Answer Box\nContent: {answer_box['snippet']}")
        for res in response.get("organic_results", [])[:4]:
            if "snippet" in res:
                snippets.append(f"Source: {res['link']}\nContent: {res['snippet']}")
        if not snippets:
            return "No snippets found by SerpApi for this query."
        return "\n".join(snippets)
    except Exception as e:
        raise gr.Error(f"SerpApi web search failed: {str(e)}")
def run_llm_synthesis_agent(context, query, llm_provider, openai_key, gemini_key, openrouter_key):
    """Synthesize an answer to *query* from web-search *context* with an LLM.

    Supported providers: "OpenAI", "Gemini", "OpenRouter". Returns the
    model's answer text.

    Raises:
        gr.Error: missing API key, unknown provider, or upstream API failure.
    """
    system_prompt = "You are a helpful assistant. Answer the user's query based *only* on the provided context from a web search."
    user_prompt = f"Here is the web search context:\n---\n{context}\n---\nNow, please answer this query: \"{query}\""
    try:
        if llm_provider == "OpenAI":
            if not openai_key:
                raise gr.Error("OpenAI API Key is required.")
            client = openai.OpenAI(api_key=openai_key)
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}]
            )
            return response.choices[0].message.content
        elif llm_provider == "Gemini":
            if not gemini_key:
                raise gr.Error("Gemini API Key is required.")
            genai.configure(api_key=gemini_key)
            model = genai.GenerativeModel('gemini-pro')
            full_prompt = f"{system_prompt}\n\n{user_prompt}"
            response = model.generate_content(full_prompt)
            return response.text
        elif llm_provider == "OpenRouter":
            if not openrouter_key:
                raise gr.Error("OpenRouter API Key is required.")
            # OpenRouter speaks the OpenAI wire protocol on its own base URL.
            client = openai.OpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=openrouter_key
            )
            response = client.chat.completions.create(
                model="mistralai/mistral-7b-instruct:free",
                messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}]
            )
            return response.choices[0].message.content
        else:
            # Bug fix: the original fell through and returned None silently
            # for an unrecognized provider.
            raise gr.Error(f"Unknown LLM provider: {llm_provider}")
    except gr.Error:
        # Bug fix: don't re-wrap our own user-facing errors (e.g. the
        # missing-key messages) as a generic "LLM Synthesis failed".
        raise
    except Exception as e:
        logger.error(f"LLM Synthesis failed for {llm_provider}: {e}")
        raise gr.Error(f"LLM Synthesis failed: {str(e)}")
# --- 8. Voice I/O Functions (Economic & Robust) ---
def transcribe_audio(audio_filepath):
    """Speech-to-Text: transcribe a recorded audio file with local Whisper.

    Returns the transcribed text, or "" when STT is unavailable, no file
    was recorded, or transcription fails (warnings are shown in the UI).
    """
    if not stt_enabled or stt_pipeline is None:
        gr.Warning("STT model is not loaded. Cannot transcribe audio.")
        return ""
    if audio_filepath is None:
        return ""
    try:
        transcription = stt_pipeline(audio_filepath)
        return transcription["text"]
    except Exception as e:
        gr.Warning(f"STT failed during transcription: {str(e)}")
        return ""
def synthesize_speech(text):
    """Text-to-Speech via the gTTS web API (zero local compute); fails gracefully.

    Returns (audio filepath or None, speak-button update, audio-player
    update) for the three Gradio output slots.
    """
    if not text:
        return None, gr.Button(visible=False), gr.Audio(visible=False)
    try:
        audio_path = "response_audio.mp3"
        gTTS(text).save(audio_path)
        # Hide the speak button and reveal an auto-playing audio player.
        return audio_path, gr.Button(visible=False), gr.Audio(value=audio_path, autoplay=True, visible=True)
    except Exception as e:
        gr.Warning(f"TTS failed (e.g., no internet connection): {str(e)}")
        return None, gr.Button(visible=False), gr.Audio(visible=False)
# --- 9. Main Gradio Functions (Controller Logic) ---
# Module-level cache of the most recently indexed document, so re-uploading
# the same file (matched by name) skips re-extraction and re-indexing.
document_cache = {"filename": None, "chunks": [], "index_dense": None, "embeddings_sparse": None}
def process_document(pdf_file, progress=gr.Progress()):
    """Extract, chunk and index an uploaded PDF; update the UI accordingly.

    Returns a 5-tuple for the Gradio outputs: (upload status text, query
    box placeholder, analyze button update, result tabs update, query
    source radio value).
    """
    if pdf_file is None:
        # Bug fix: this branch previously returned the string "Analyze Query"
        # for the button slot (a no-op label update) instead of disabling the
        # button as every other failure branch does.
        return "Please upload a PDF.", "Ask a question...", gr.Button(interactive=False), gr.Tabs(visible=False), "Web Search"
    if document_cache["filename"] == pdf_file.name:
        # Same file as last time: reuse the cached index.
        return f"✅ Document '{pdf_file.name}' is ready.", "Ask a question...", gr.Button(interactive=True), gr.Tabs(visible=True), "Document"
    progress(0, desc="Extracting text...")
    text, error = extract_text_from_pdf(pdf_file)
    if error:
        return f"Error: {error}", "Ask a question...", gr.Button(interactive=False), gr.Tabs(visible=False), "Web Search"
    progress(0.3, desc="Chunking text...")
    chunks = chunk_text(text)
    if not chunks:
        return "Error: No text chunks found.", "Ask a question...", gr.Button(interactive=False), gr.Tabs(visible=False), "Web Search"
    progress(0.6, desc=f"Building vector stores for {len(chunks)} chunks...")
    index_dense, embeddings_sparse, error = build_vector_stores(chunks)
    if error:
        return f"Error: {error}", "Ask a question...", gr.Button(interactive=False), gr.Tabs(visible=False), "Web Search"
    document_cache.update({"filename": pdf_file.name, "chunks": chunks, "index_dense": index_dense, "embeddings_sparse": embeddings_sparse})
    status = f"✅ Success: Indexed '{pdf_file.name}'. Ready to chat."
    return status, "Ask a question about the document...", gr.Button(interactive=True), gr.Tabs(visible=True), "Document"
def run_main_query(query, search_type, query_source,
                   openai_key, gemini_key, openrouter_key,
                   search_provider, tavily_key, serpapi_key,
                   llm_provider):
    """Main controller: route the query to the document-RAG or web-search
    pipeline and stream progress updates to the UI.

    Generator: yields 5-tuples for (status textbox, answer textbox,
    analysis dataframe, speak button update, audio player update).
    """
    if not query:
        raise gr.Error("Please enter a query.")
    yield "Processing...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
    try:
        if query_source == "Document":
            if not document_cache["index_dense"]:
                raise gr.Error("Please upload and process a document first.")
            yield "1. 💬 Running 'Research Agent' on document...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
            # Retrieval strategy chosen via the UI dropdown.
            if search_type == "Hybrid (Recommended)":
                chunks = search_hybrid(query, document_cache["index_dense"], document_cache["embeddings_sparse"], document_cache["chunks"])
            elif search_type == "Dense (Semantic)":
                chunks = search_dense(query, document_cache["index_dense"], document_cache["chunks"])
            else:
                chunks = search_sparse(query, document_cache["embeddings_sparse"], document_cache["chunks"])
            yield "2. 🧠 Running 'Summary Agent' (local)...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
            answer = run_summary_agent(chunks, query)
            yield "3. 📊 Running 'Analysis Agent' (local)...", answer, None, gr.Button(visible=False), gr.Audio(visible=False)
            analysis = run_analysis_agent(chunks)
            yield "✅ Document query complete.", answer, analysis, gr.Button(visible=True, interactive=True), gr.Audio(visible=False)
        else:
            # Web flow: fetch search context, then synthesize with the chosen LLM.
            yield f"1. 💬 Running 'Web Search Agent' ({search_provider})...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
            if search_provider == "Tavily":
                web_context = run_tavily_search_agent(query, tavily_key)
            elif search_provider == "SerpApi":
                web_context = run_serpapi_search_agent(query, serpapi_key)
            else:
                raise gr.Error("Invalid search provider selected.")
            yield f"2. 🧠 Running 'Web Synthesis Agent' ({llm_provider})...", None, None, gr.Button(visible=False), gr.Audio(visible=False)
            answer = run_llm_synthesis_agent(web_context, query, llm_provider, openai_key, gemini_key, openrouter_key)
            yield "✅ Web query complete.", answer, None, gr.Button(visible=True, interactive=True), gr.Audio(visible=False)
    except gr.Error as e:
        # Agent errors are surfaced on the status line: the generator has
        # already started yielding, so re-raising here would lose the message.
        yield f"Error: {e}", None, None, gr.Button(visible=False), gr.Audio(visible=False)
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        yield f"An unexpected error occurred: {str(e)}", None, None, gr.Button(visible=False), gr.Audio(visible=False)
# --- 10. Gradio Interface Definition ---
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="orange")) as demo:
    # --- Header & pitch ---
    gr.Markdown(
        """
# 🚀 Omni-RAG Analyst v10 (Stable)
*A multi-source, multi-modal demo by **Natwar Upadhyay***
*OCI Data Science & AI Vector Search Certified Professional*
### What problem does this solve?
Generic chatbots give generic answers. This tool gives you answers based on **specific information** from two sources:
1. **Your Documents (Organic):** Upload a PDF to chat with your own data.
2. **The Live Web (Non-Organic):** Connects to Google (via SerpApi) or Tavily to answer up-to-the-minute questions.
It showcases a full **ETL -> Vector Search -> RAG** pipeline using economic, resource-friendly models.
"""
    )
    # Warn the user when the Drive cache could not be mounted (Colab only).
    if IN_COLAB and DRIVE_MOUNT_FAILED:
        gr.Markdown(
            """
<div style="background-color: #FFF3CD; border: 1px solid #FFEEBA; padding: 10px; border-radius: 5px;">
⚠️ **Google Drive Mount Failed:** Your Colab session couldn't connect to Google Drive (you may need to grant permissions).
The app will still work, but the large AI models (2GB+) will be **re-downloaded** for this session.
</div>
"""
        )
    # --- Step 1: API keys (collapsed by default; only needed for web search) ---
    with gr.Accordion("Step 1: API Key Configuration (Required for Web Search)", open=False):
        gr.Markdown(
            """
To use the **Web Search** feature, you need API keys for **one** Search Provider and **one** LLM Synthesis provider.
"""
        )
        with gr.Row():
            with gr.Column():
                gr.Markdown("#### (A) Search Provider Keys")
                search_provider_dropdown = gr.Dropdown(
                    label="Choose Search Provider",
                    choices=["Tavily", "SerpApi"],
                    value="Tavily"
                )
                tavily_key_box = gr.Textbox(label="Tavily API Key", placeholder="tvly-...", type="password")
                serpapi_key_box = gr.Textbox(label="SerpApi API Key", placeholder="...", type="password")
            with gr.Column():
                gr.Markdown("#### (B) LLM Synthesis Keys")
                llm_provider_dropdown = gr.Dropdown(
                    label="Choose LLM Provider",
                    choices=["OpenAI", "Gemini", "OpenRouter"],
                    value="OpenAI"
                )
                openai_key_box = gr.Textbox(label="OpenAI API Key", placeholder="sk-...", type="password")
                gemini_key_box = gr.Textbox(label="Gemini API Key", placeholder="AIzaSy...", type="password")
                openrouter_key_box = gr.Textbox(label="OpenRouter API Key", placeholder="sk-or-...", type="password")
    # --- Steps 2 & 3: document upload and query controls ---
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Step 2: Load Document (For 'Document' Source)")
            pdf_upload = gr.File(label="Upload PDF", file_types=[".pdf"])
            upload_status = gr.Textbox(label="Processing Status", interactive=False, lines=3)
        with gr.Column(scale=2):
            gr.Markdown("### Step 3: Configure & Query")
            # Microphone widget only shown when the Whisper STT model loaded.
            stt_audio = gr.Audio(
                label="🎙️ Record Query (or type below)",
                sources=["microphone"],
                type="filepath",
                visible=stt_enabled
            )
            if not stt_enabled:
                gr.Markdown("*(Local voice input (STT) failed to load. Please type your query.)*")
            query_box = gr.Textbox(label="Query", placeholder="Ask a question...", interactive=True)
            with gr.Row():
                query_source_radio = gr.Radio(
                    label="Query Source",
                    choices=["Document", "Web Search"],
                    value="Web Search",
                    interactive=True
                )
                search_type_dropdown = gr.Dropdown(
                    label="Document Search Strategy",
                    choices=["Hybrid (Recommended)", "Dense (Semantic)", "Sparse (Keyword)"],
                    value="Hybrid (Recommended)",
                    info=" (Only applies if 'Document' is selected)"
                )
            analyze_button = gr.Button("Analyze Query", variant="primary", interactive=True)
    # --- Results area ---
    with gr.Tabs(visible=True) as result_tabs:
        with gr.TabItem("Synthesized Answer"):
            answer_output = gr.Textbox(label="Answer (from AI Agent)", lines=5)
            speak_button = gr.Button("🔊 Speak Answer", visible=False)
            audio_output = gr.Audio(label="AI Voice Output", autoplay=False, visible=False, type="filepath")
        with gr.TabItem("Document Context Analysis"):
            analysis_output = gr.Dataframe(label="Keyword Analysis (from 'Analysis Agent')")
            gr.Markdown("*This tab only populates when 'Document' is the query source.*")
    # --- 11. Wire up the components ---
    # Voice recording -> transcription fills the query textbox.
    stt_audio.stop_recording(
        fn=transcribe_audio,
        inputs=[stt_audio],
        outputs=[query_box]
    )
    # PDF upload -> extract/chunk/index, then enable querying.
    pdf_upload.upload(
        fn=process_document,
        inputs=[pdf_upload],
        outputs=[upload_status, query_box, analyze_button, result_tabs, query_source_radio],
        show_progress="full"
    )
    # Main query: streams status into upload_status and fills the result tabs.
    analyze_button.click(
        fn=run_main_query,
        inputs=[
            query_box, search_type_dropdown, query_source_radio,
            openai_key_box, gemini_key_box, openrouter_key_box,
            search_provider_dropdown, tavily_key_box, serpapi_key_box,
            llm_provider_dropdown
        ],
        # --- THIS IS THE FIX ---
        outputs=[upload_status, answer_output, analysis_output, speak_button, audio_output]
    )
    # Speak button: synthesize the answer and reveal the audio player.
    speak_button.click(
        fn=synthesize_speech,
        inputs=[answer_output],
        outputs=[audio_output, speak_button, audio_output]
    )
if __name__ == "__main__":
    # debug=True surfaces tracebacks in the browser; useful for this demo.
    demo.launch(debug=True)