# NexusRAG — app.py
# Hugging Face Space application (author: saandip5, commit ba047ca)
import os
import io
import logging
import json
import re
import traceback
from typing import Optional, List
import gradio as gr
from langchain_core.prompts import PromptTemplate
from langchain_groq import ChatGroq
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langdetect import detect, LangDetectException
from deep_translator import GoogleTranslator
import torch
from PIL import Image
import fitz
from transformers import BlipProcessor, BlipForConditionalGeneration
# ===== LOGGING =====
# Timestamped INFO-level logging for the whole app.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ===== CONFIG =====
# The Groq API key is mandatory; fail fast at import time if it is absent.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise ValueError("❌ Missing GROQ_API_KEY - set in Settings → Secrets")

DB_PATH = "./chroma_db"  # on-disk Chroma persistence directory
DEVICE = "cpu"           # BLIP inference device (no GPU assumed on the Space)
os.makedirs(DB_PATH, exist_ok=True)

# ===== INITIALIZE COMPONENTS =====
# Every component below is loaded defensively: on failure it is set to None
# and the agent functions degrade gracefully instead of crashing at import.
logger.info("Initializing components...")

# Initialize BLIP for image captioning
try:
    logger.info("Loading BLIP image captioning model...")
    blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(DEVICE)
    logger.info("✅ BLIP model loaded.")
except Exception as e:
    logger.warning(f"⚠️ BLIP model loading failed: {e}")
    blip_model = None
    blip_processor = None

# Multilingual sentence-embedding model used by the vector store.
try:
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
    )
    logger.info("✅ Embeddings model loaded.")
except Exception as e:
    logger.error(f"❌ Error loading embeddings: {e}")
    embedding_model = None

# Persistent Chroma vector store backed by DB_PATH.
try:
    vectorstore = Chroma(
        persist_directory=DB_PATH,
        embedding_function=embedding_model
    )
    logger.info("✅ Vectorstore initialized.")
except Exception as e:
    logger.error(f"❌ Error initializing vectorstore: {e}")
    vectorstore = None

# Groq-hosted chat LLM used by the summarization / fact-check / QA agents.
try:
    llm = ChatGroq(
        groq_api_key=GROQ_API_KEY,
        model="openai/gpt-oss-120b",
        temperature=0.2
    )
    logger.info("✅ LLM initialized.")
except Exception as e:
    logger.error(f"❌ Error initializing LLM: {e}")
    llm = None
# ===== FILE EXTRACTION =====
def extract_text_from_pdf(file_path: str) -> str:
    """Extract plain text from a PDF file.

    Args:
        file_path: Path to the PDF on disk.

    Returns:
        The concatenated text of all pages, each preceded by a
        "--- Page N ---" marker, or "" on any extraction error.
    """
    try:
        # Open directly from the path instead of reading the whole file into
        # memory first; the context manager guarantees the document handle is
        # closed (the original leaked it). Parts are joined once to avoid a
        # quadratic += string build.
        parts: List[str] = []
        with fitz.open(file_path) as pdf:
            for page_num, page in enumerate(pdf, start=1):
                parts.append(f"\n--- Page {page_num} ---\n")
                parts.append(page.get_text())
        text = "".join(parts)
        logger.info(f"✅ PDF extracted: {len(text)} chars")
        return text
    except Exception as e:
        logger.error(f"❌ PDF extraction error: {e}")
        return ""
def extract_text_from_txt(file_path: str) -> str:
    """Read a plain-text file as UTF-8, skipping undecodable bytes.

    Args:
        file_path: Path to the text file.

    Returns:
        The file contents, or "" if reading fails for any reason.
    """
    try:
        with open(file_path, mode="r", encoding="utf-8", errors="ignore") as handle:
            contents = handle.read()
        logger.info(f"✅ TXT extracted: {len(contents)} chars")
        return contents
    except Exception as e:
        logger.error(f"❌ TXT extraction error: {e}")
        return ""
def extract_text_from_image(file_path: str) -> str:
    """Describe an image by generating a BLIP caption.

    Falls back to reporting the image dimensions when the BLIP model
    failed to load at startup.

    Args:
        file_path: Path to a JPG/JPEG/PNG image.

    Returns:
        "Image Description: <caption>" on success,
        "Image dimensions: WxH pixels" when BLIP is unavailable,
        or "" on any error.
    """
    try:
        # Open once for both the caption and the fallback path (the original
        # opened the image twice).
        img = Image.open(file_path).convert("RGB")
        if blip_model is None or blip_processor is None:
            logger.warning("BLIP model not available, using fallback")
            width, height = img.size
            return f"Image dimensions: {width}x{height} pixels"
        # Generate caption using BLIP; inference only, so disable autograd
        # to avoid tracking gradients (saves memory and time).
        inputs = blip_processor(images=img, return_tensors="pt").to(DEVICE)
        with torch.no_grad():
            output_ids = blip_model.generate(**inputs, max_length=100)
        caption = blip_processor.decode(output_ids[0], skip_special_tokens=True)
        logger.info(f"✅ Image caption extracted: {caption}")
        return f"Image Description: {caption}"
    except Exception as e:
        logger.error(f"❌ Image extraction error: {e}")
        return ""
# ===== AGENTS =====
# 1. TRANSLATION AGENT
def translation_agent(text: str, target_lang: str = "en") -> str:
    """Translate *text* into *target_lang*, returning the input on failure.

    Translation is skipped for empty/very short strings, for the
    "Auto Detect" sentinel, and when the detected source language already
    matches the target.
    """
    try:
        # Guard clauses: nothing to translate, or caller asked for pass-through.
        if not text or len(text.strip()) < 3 or target_lang == "Auto Detect":
            return text

        source_lang = detect(text)
        logger.info(f"Detected language: {source_lang}, target: {target_lang}")
        if source_lang == target_lang:
            return text

        translator = GoogleTranslator(source="auto", target=target_lang)
        result = translator.translate(text)
        logger.info(f"✅ Translation completed")
        return result
    except Exception as e:
        # Best effort: any detection/translation failure returns the original.
        logger.warning(f"⚠️ Translation failed: {e}")
        return text
# 2. RETRIEVAL AGENT
def retrieval_agent(query: str, k: int = 5) -> List[str]:
    """Return the page contents of the top-*k* documents similar to *query*.

    Returns an empty list when the vectorstore is unavailable, when no
    match exists, or when the similarity search raises.
    """
    if vectorstore is None:
        logger.warning("Vectorstore is None")
        return []
    try:
        logger.info(f"Retrieving top {k} documents for: {query[:50]}")
        matches = vectorstore.similarity_search(query, k=k)
        if not matches:
            logger.warning("No documents found")
            return []
        contents = [doc.page_content for doc in matches]
        logger.info(f"✅ Retrieved {len(contents)} documents")
        return contents
    except Exception as e:
        logger.error(f"❌ Retrieval error: {e}")
        return []
# 3. SUMMARIZATION AGENT
def summarization_agent(text: str) -> str:
    """Produce a 2-3 sentence summary of *text* via the LLM.

    Inputs shorter than 50 chars are returned verbatim; on any LLM
    error a bracketed failure note is returned instead of raising.
    """
    if llm is None:
        return "❌ LLM not available"
    try:
        if not text or len(text) < 50:
            return text
        # Only the first 2000 chars are sent to keep the prompt bounded.
        prompt = (
            "Summarize the following text concisely in 2-3 sentences:\n"
            f"{text[:2000]}\n"
            "Summary:"
        )
        reply = llm.invoke(prompt)
        if hasattr(reply, "content"):
            summary = reply.content
        else:
            summary = str(reply)
        logger.info(f"✅ Summarization completed")
        return summary
    except Exception as e:
        logger.error(f"❌ Summarization error: {e}")
        return f"[Summarization failed: {str(e)[:100]}]"
# 4. FACT-CHECK AGENT
def fact_check_agent(claim: str, context: str) -> str:
    """Evaluate *claim* against *context* with the LLM.

    Returns the model's "[TRUE/FALSE/UNVERIFIED] - explanation" verdict,
    or a bracketed failure note on error.
    """
    if llm is None:
        return "❌ LLM not available"
    try:
        # Only the first 1500 chars of context are sent to bound the prompt.
        prompt = (
            "You are a fact-checker. Evaluate the following claim based on the context provided.\n"
            "Context:\n"
            f"{context[:1500]}\n"
            "Claim:\n"
            f"{claim}\n"
            "Response format: [TRUE/FALSE/UNVERIFIED] - Brief explanation"
        )
        reply = llm.invoke(prompt)
        if hasattr(reply, "content"):
            verdict = reply.content
        else:
            verdict = str(reply)
        logger.info(f"✅ Fact-check completed")
        return verdict
    except Exception as e:
        logger.error(f"❌ Fact-check error: {e}")
        return f"[Fact-check failed: {str(e)[:100]}]"
# 5. QUESTION-ANSWERING AGENT
def qa_agent(question: str, context: str) -> str:
    """Answer *question* strictly from *context* using the LLM.

    Returns the model's answer, or a bracketed failure note on error.
    """
    if llm is None:
        return "❌ LLM not available"
    try:
        # Only the first 3000 chars of context are sent to bound the prompt.
        prompt = (
            "You are a helpful assistant. Answer the following question based ONLY on the context provided.\n"
            "Context:\n"
            f"{context[:3000]}\n"
            "Question:\n"
            f"{question}\n"
            'If the context doesn\'t contain the answer, say "I cannot find this information in the provided context."\n'
            "Answer:"
        )
        reply = llm.invoke(prompt)
        if hasattr(reply, "content"):
            answer = reply.content
        else:
            answer = str(reply)
        logger.info(f"✅ QA completed")
        return answer
    except Exception as e:
        logger.error(f"❌ QA error: {e}")
        return f"[Answer generation failed: {str(e)[:100]}]"
# ===== ORCHESTRATOR =====
def multi_agent_orchestrator(question: str, target_language: str) -> str:
    """Coordinate the agents to answer *question* in *target_language*.

    Pipeline: translate the question to English, retrieve supporting
    chunks from the vectorstore, answer from that context only, then
    optionally translate the answer back to the requested language.
    """
    if not question or not question.strip():
        return "❌ Please enter a question."
    logger.info(f"Orchestrator started - Question: {question[:50]}")
    try:
        # 1) Normalize the question to English for retrieval/QA.
        logger.info("STEP 1: Translation Agent")
        english_question = translation_agent(question, "en")

        # 2) Pull supporting chunks from the vectorstore.
        logger.info("STEP 2: Retrieval Agent")
        retrieved_docs = retrieval_agent(english_question, k=5)
        if not retrieved_docs:
            return "❌ No documents found in vectorstore. Please upload and process files first."
        context = "\n\n".join(retrieved_docs)
        logger.info(f"Context length: {len(context)} chars")

        # 3) Answer strictly from the retrieved context.
        logger.info("STEP 3: QA Agent")
        answer = qa_agent(english_question, context)

        # 4) Translate back only when a concrete non-English language was chosen.
        if target_language and target_language not in ("Auto Detect", "en"):
            logger.info("STEP 4: Translation Agent (answer)")
            answer = translation_agent(answer, target_language)

        logger.info("✅ Orchestration completed successfully")
        return answer
    except Exception as e:
        logger.error(f"❌ Orchestration error: {e}\n{traceback.format_exc()}")
        return f"❌ Error: {str(e)[:300]}"
# ===== FILE INGESTION =====
def ingest_files(files) -> str:
    """Extract, chunk, embed, and persist the uploaded files.

    Args:
        files: Sequence of Gradio file objects and/or plain path strings.

    Returns:
        A human-readable status message with the total chunk count plus
        any per-file warnings/errors.
    """
    if not files:
        return "⚠️ No files uploaded"
    if vectorstore is None or embedding_model is None:
        return "❌ Vectorstore not initialized"
    logger.info(f"Starting file ingestion for {len(files)} file(s)")
    total_chunks = 0
    error_messages = []
    try:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        for file_obj in files:
            # Default label so the except-branch message is safe even if the
            # failure happens before the filename is resolved. (The original
            # logged the literal placeholder "(unknown)" everywhere instead
            # of the actual filename.)
            filename = "(unknown)"
            try:
                # Gradio may hand us plain path strings or file-like objects
                # with a .name attribute.
                if isinstance(file_obj, str):
                    file_path = file_obj
                else:
                    file_path = file_obj.name if hasattr(file_obj, 'name') else str(file_obj)
                filename = os.path.basename(file_path).lower()
                logger.info(f"Processing: {filename} from {file_path}")
                text = ""
                # Dispatch extraction on the file extension.
                if filename.endswith(".txt"):
                    text = extract_text_from_txt(file_path)
                elif filename.endswith(".pdf"):
                    text = extract_text_from_pdf(file_path)
                elif filename.endswith((".jpg", ".jpeg", ".png")):
                    text = extract_text_from_image(file_path)
                else:
                    error_messages.append(f"⚠️ Unsupported format: {filename}")
                    continue
                # Skip files that produced (nearly) no text.
                if not text or len(text.strip()) < 20:
                    error_messages.append(f"⚠️ No content extracted from: {filename}")
                    logger.warning(f"No content from {filename}")
                    continue
                # Split and add to the vectorstore.
                chunks = text_splitter.split_text(text)
                logger.info(f"Created {len(chunks)} chunks from {filename}")
                vectorstore.add_texts(chunks)
                total_chunks += len(chunks)
                logger.info(f"✅ Added {len(chunks)} chunks from {filename}")
            except Exception as e:
                error_msg = f"Error processing {filename}: {str(e)[:100]}"
                error_messages.append(error_msg)
                logger.error(error_msg)
                continue
        # Persist the vectorstore only when something was actually added.
        if total_chunks > 0:
            vectorstore.persist()
            logger.info(f"Vectorstore persisted with {total_chunks} chunks")
        # Build the user-facing status message.
        message = f"✅ Successfully ingested {total_chunks} chunks from {len([f for f in files if f])} file(s)"
        if error_messages:
            message += "\n\n" + "\n".join(error_messages)
        logger.info(message)
        return message
    except Exception as e:
        error_msg = f"❌ Ingestion error: {str(e)[:200]}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return error_msg
# ===== GRADIO INTERFACE =====
# Example prompts shown under the question box (mixed languages on purpose,
# to demonstrate the multilingual pipeline).
suggested_questions = [
    "What is the main idea of the text?",
    "Summarize the content.",
    "What are the key points?",
    "¿Cuál es la idea principal?",
    "Quel est le résumé?",
    "Was sind die wichtigsten Punkte?",
]
# Output-language choices; "Auto Detect" means the answer is left untranslated.
supported_languages = [
    "Auto Detect", "en", "es", "fr", "de", "it", "pt", "ru", "zh", "ja", "hi", "bn", "ar"
]

with gr.Blocks(title="Nexus RAG- MultiAgent MultiLingual RAG System") as interface:
    gr.Markdown("# 🧠 Nexus RAG- MultiAgent MultiLingual RAG System")
    gr.Markdown("**Upload documents** → **Auto-ingest** → **Ask questions** in any language")
    gr.Markdown("---")
    with gr.Row():
        # Left column: file upload + ingestion status.
        with gr.Column(scale=2):
            gr.Markdown("### 📁 Document Upload")
            file_input = gr.File(
                label="Upload Files (TXT/PDF/Images)",
                file_count="multiple",
                file_types=[".txt", ".pdf", ".jpg", ".jpeg", ".png"]
            )
            ingest_output = gr.Textbox(
                label="Ingestion Status",
                interactive=False,
                lines=4
            )
        # Right column: question input, language choice, and the answer.
        with gr.Column(scale=3):
            gr.Markdown("### ❓ Ask Your Question")
            question = gr.Textbox(
                label="Your Question",
                placeholder="Ask anything about your documents...",
                lines=3
            )
            language_dropdown = gr.Dropdown(
                choices=supported_languages,
                value="Auto Detect",
                label="Output Language"
            )
            submit_button = gr.Button("💬 Ask Agent", variant="primary", size="lg")
            output_answer = gr.Markdown(label="📝 Answer")
            gr.Markdown("### 💡 Example Questions:")
            gr.Examples(suggested_questions, inputs=question)
    # Auto-ingest files when uploaded (fires on every change of the file list).
    file_input.change(
        fn=ingest_files,
        inputs=file_input,
        outputs=ingest_output
    )
    # Submit question through the multi-agent pipeline.
    submit_button.click(
        fn=multi_agent_orchestrator,
        inputs=[question, language_dropdown],
        outputs=output_answer
    )

# Script entry point: launch the Gradio server.
if __name__ == "__main__":
    logger.info("🚀 Launching Gradio interface...")
    interface.launch()