|
|
import os |
|
|
import io |
|
|
import logging |
|
|
import json |
|
|
import re |
|
|
import traceback |
|
|
from typing import Optional, List |
|
|
|
|
|
import gradio as gr |
|
|
from langchain_core.prompts import PromptTemplate |
|
|
from langchain_groq import ChatGroq |
|
|
from langchain_community.vectorstores import Chroma |
|
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
from langdetect import detect, LangDetectException |
|
|
from deep_translator import GoogleTranslator |
|
|
import torch |
|
|
from PIL import Image |
|
|
import fitz |
|
|
from transformers import BlipProcessor, BlipForConditionalGeneration |
|
|
|
|
|
|
|
|
# Configure root logging once at import time so every logger in the app
# emits timestamped, leveled records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
# Groq API key must come from the environment (e.g. Hugging Face Spaces
# "Settings → Secrets"); fail fast at startup if it is missing.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise ValueError("❌ Missing GROQ_API_KEY - set in Settings → Secrets")

# Directory where the Chroma vector database persists between runs.
DB_PATH = "./chroma_db"
# Inference device for local models; CPU keeps the app deployable on
# machines without a GPU.
DEVICE = "cpu"
os.makedirs(DB_PATH, exist_ok=True)

logger.info("Initializing components...")
|
|
|
|
|
|
|
|
# Load the BLIP image-captioning model (processor + weights). Loading is
# best-effort: if it fails (no network, not enough RAM, ...), the app still
# runs and image ingestion degrades to a dimensions-only fallback — see
# extract_text_from_image, which checks these globals for None.
try:
    logger.info("Loading BLIP image captioning model...")
    blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(DEVICE)
    logger.info("✅ BLIP model loaded.")
except Exception as e:
    logger.warning(f"⚠️ BLIP model loading failed: {e}")
    blip_model = None
    blip_processor = None
|
|
|
|
|
# Multilingual sentence-embedding model used by the vectorstore. On failure
# embedding_model is left as None, which ingest_files treats as "not ready".
try:
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
    )
    logger.info("✅ Embeddings model loaded.")
except Exception as e:
    logger.error(f"❌ Error loading embeddings: {e}")
    embedding_model = None
|
|
|
|
|
# Persistent Chroma vector store backed by DB_PATH. Depends on
# embedding_model loading above; on failure vectorstore stays None and
# retrieval/ingestion return early with a user-facing message.
try:
    vectorstore = Chroma(
        persist_directory=DB_PATH,
        embedding_function=embedding_model
    )
    logger.info("✅ Vectorstore initialized.")
except Exception as e:
    logger.error(f"❌ Error initializing vectorstore: {e}")
    vectorstore = None
|
|
|
|
|
# Groq-hosted chat LLM used by the summarization / fact-check / QA agents.
# Low temperature keeps answers grounded in the retrieved context. On
# failure llm stays None and every agent returns "❌ LLM not available".
try:
    llm = ChatGroq(
        groq_api_key=GROQ_API_KEY,
        model="openai/gpt-oss-120b",
        temperature=0.2
    )
    logger.info("✅ LLM initialized.")
except Exception as e:
    logger.error(f"❌ Error initializing LLM: {e}")
    llm = None
|
|
|
|
|
|
|
|
def extract_text_from_pdf(file_path: str) -> str:
    """Extract plain text from a PDF file.

    Each page's text is prefixed with a "--- Page N ---" marker so that
    downstream chunking keeps page context.

    Args:
        file_path: Path to the PDF on disk.

    Returns:
        The extracted text, or "" if extraction fails for any reason.
    """
    try:
        # Open the document directly from the path (no need to slurp the
        # whole file into memory first) and ensure it is closed afterwards
        # — the previous code leaked the fitz document.
        with fitz.open(file_path) as pdf:
            # Collect page fragments and join once instead of repeated
            # string concatenation.
            parts = []
            for page_num, page in enumerate(pdf):
                parts.append(f"\n--- Page {page_num + 1} ---\n")
                parts.append(page.get_text())
            text = "".join(parts)
        logger.info(f"✅ PDF extracted: {len(text)} chars")
        return text
    except Exception as e:
        logger.error(f"❌ PDF extraction error: {e}")
        return ""
|
|
|
|
|
def extract_text_from_txt(file_path: str) -> str:
    """Read a plain-text file as UTF-8, ignoring undecodable bytes.

    Args:
        file_path: Path to the text file on disk.

    Returns:
        The file contents, or "" if the file could not be read.
    """
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as handle:
            contents = handle.read()
        logger.info(f"✅ TXT extracted: {len(contents)} chars")
        return contents
    except Exception as e:
        logger.error(f"❌ TXT extraction error: {e}")
        return ""
|
|
|
|
|
def extract_text_from_image(file_path: str) -> str:
    """Produce a textual description of an image.

    Uses the BLIP captioning model when it loaded successfully; otherwise
    falls back to reporting the image's pixel dimensions.

    Args:
        file_path: Path to the image file on disk.

    Returns:
        A description string, or "" if the image could not be processed.
    """
    try:
        if blip_model is None or blip_processor is None:
            # BLIP failed to load at startup — degrade gracefully.
            logger.warning("BLIP model not available, using fallback")
            width, height = Image.open(file_path).convert("RGB").size
            return f"Image dimensions: {width}x{height} pixels"

        image = Image.open(file_path).convert("RGB")
        model_inputs = blip_processor(images=image, return_tensors="pt").to(DEVICE)
        generated_ids = blip_model.generate(**model_inputs, max_length=100)
        caption = blip_processor.decode(generated_ids[0], skip_special_tokens=True)

        logger.info(f"✅ Image caption extracted: {caption}")
        return f"Image Description: {caption}"
    except Exception as e:
        logger.error(f"❌ Image extraction error: {e}")
        return ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def translation_agent(text: str, target_lang: str = "en") -> str:
    """Translate *text* into *target_lang* (ISO code), best-effort.

    The input is returned unchanged when it is too short to translate,
    when the caller asked for "Auto Detect" (meaning: no translation),
    when the text is already in the target language, or when detection
    or translation fails for any reason.
    """
    try:
        # Nothing meaningful to translate.
        if not text or len(text.strip()) < 3:
            return text

        # "Auto Detect" means: leave the text in its original language.
        if target_lang == "Auto Detect":
            return text

        detected_lang = detect(text)
        logger.info(f"Detected language: {detected_lang}, target: {target_lang}")

        # Already in the requested language — skip the round trip.
        if detected_lang == target_lang:
            return text

        translator = GoogleTranslator(source="auto", target=target_lang)
        result = translator.translate(text)
        logger.info(f"✅ Translation completed")
        return result
    except Exception as e:
        # Translation is best-effort: fall back to the original text.
        logger.warning(f"⚠️ Translation failed: {e}")
        return text
|
|
|
|
|
|
|
|
def retrieval_agent(query: str, k: int = 5) -> List[str]:
    """Fetch the page contents of the top-*k* chunks most similar to *query*.

    Args:
        query: Search text (expected in English by the orchestrator).
        k: Number of chunks to retrieve.

    Returns:
        A list of chunk texts; empty when the vectorstore is unavailable,
        nothing matches, or the search raises.
    """
    if vectorstore is None:
        logger.warning("Vectorstore is None")
        return []

    try:
        logger.info(f"Retrieving top {k} documents for: {query[:50]}")
        docs = vectorstore.similarity_search(query, k=k)

        if not docs:
            logger.warning("No documents found")
            return []

        contents = [doc.page_content for doc in docs]
        logger.info(f"✅ Retrieved {len(contents)} documents")
        return contents
    except Exception as e:
        logger.error(f"❌ Retrieval error: {e}")
        return []
|
|
|
|
|
|
|
|
def summarization_agent(text: str) -> str:
    """Condense *text* into a 2-3 sentence summary via the LLM.

    Inputs shorter than 50 characters are returned verbatim. Returns an
    error string rather than raising when the LLM is missing or fails.
    """
    if llm is None:
        return "❌ LLM not available"

    try:
        # Too short to be worth summarizing.
        if not text or len(text) < 50:
            return text

        # Cap the prompt at 2000 chars to stay within context limits.
        prompt = f"""Summarize the following text concisely in 2-3 sentences:

{text[:2000]}

Summary:"""

        reply = llm.invoke(prompt)
        summary = reply.content if hasattr(reply, "content") else str(reply)
        logger.info(f"✅ Summarization completed")
        return summary
    except Exception as e:
        logger.error(f"❌ Summarization error: {e}")
        return f"[Summarization failed: {str(e)[:100]}]"
|
|
|
|
|
|
|
|
def fact_check_agent(claim: str, context: str) -> str:
    """Judge *claim* against *context* using the LLM.

    Returns a "[TRUE/FALSE/UNVERIFIED] - explanation" string from the
    model, or an error string when the LLM is unavailable or fails.
    """
    if llm is None:
        return "❌ LLM not available"

    try:
        # Cap the context at 1500 chars to stay within prompt limits.
        prompt = f"""You are a fact-checker. Evaluate the following claim based on the context provided.

Context:
{context[:1500]}

Claim:
{claim}

Response format: [TRUE/FALSE/UNVERIFIED] - Brief explanation"""

        reply = llm.invoke(prompt)
        verdict = reply.content if hasattr(reply, "content") else str(reply)
        logger.info(f"✅ Fact-check completed")
        return verdict
    except Exception as e:
        logger.error(f"❌ Fact-check error: {e}")
        return f"[Fact-check failed: {str(e)[:100]}]"
|
|
|
|
|
|
|
|
def qa_agent(question: str, context: str) -> str:
    """Answer *question* using ONLY the supplied *context* via the LLM.

    The prompt instructs the model to refuse when the context lacks the
    answer. Returns an error string rather than raising when the LLM is
    unavailable or the call fails.
    """
    if llm is None:
        return "❌ LLM not available"

    try:
        # Cap the context at 3000 chars to stay within prompt limits.
        prompt = f"""You are a helpful assistant. Answer the following question based ONLY on the context provided.

Context:
{context[:3000]}

Question:
{question}

If the context doesn't contain the answer, say "I cannot find this information in the provided context."

Answer:"""

        reply = llm.invoke(prompt)
        answer_text = reply.content if hasattr(reply, "content") else str(reply)
        logger.info(f"✅ QA completed")
        return answer_text
    except Exception as e:
        logger.error(f"❌ QA error: {e}")
        return f"[Answer generation failed: {str(e)[:100]}]"
|
|
|
|
|
|
|
|
def multi_agent_orchestrator(question: str, target_language: str) -> str:
    """Run the full pipeline: translate → retrieve → answer → translate back.

    Args:
        question: User question in any language.
        target_language: Output language code, or "Auto Detect" to skip
            translating the answer.

    Returns:
        The answer string, or a user-facing "❌ ..." message on failure.
    """
    if not question or not question.strip():
        return "❌ Please enter a question."

    logger.info(f"Orchestrator started - Question: {question[:50]}")

    try:
        # Step 1: normalise the question to English for retrieval and QA.
        logger.info("STEP 1: Translation Agent")
        english_question = translation_agent(question, "en")

        # Step 2: pull supporting chunks from the vector store.
        logger.info("STEP 2: Retrieval Agent")
        retrieved_docs = retrieval_agent(english_question, k=5)
        if not retrieved_docs:
            return "❌ No documents found in vectorstore. Please upload and process files first."

        context = "\n\n".join(retrieved_docs)
        logger.info(f"Context length: {len(context)} chars")

        # Step 3: answer strictly from the retrieved context.
        logger.info("STEP 3: QA Agent")
        answer = qa_agent(english_question, context)

        # Step 4: translate the answer back only when a specific
        # non-English output language was requested.
        if target_language and target_language != "Auto Detect" and target_language != "en":
            logger.info("STEP 4: Translation Agent (answer)")
            answer = translation_agent(answer, target_language)

        logger.info("✅ Orchestration completed successfully")
        return answer

    except Exception as e:
        logger.error(f"❌ Orchestration error: {e}\n{traceback.format_exc()}")
        return f"❌ Error: {str(e)[:300]}"
|
|
|
|
|
|
|
|
def ingest_files(files) -> str:
    """Extract, chunk, embed and store the uploaded files.

    Supports .txt, .pdf and common image formats. Per-file failures are
    collected into the status message instead of aborting the batch.

    Args:
        files: List of uploaded files (gradio file objects or plain paths).

    Returns:
        Human-readable status string describing what was ingested.
    """
    if not files:
        return "⚠️ No files uploaded"

    if vectorstore is None or embedding_model is None:
        return "❌ Vectorstore not initialized"

    logger.info(f"Starting file ingestion for {len(files)} file(s)")

    total_chunks = 0
    error_messages = []

    try:
        # 1000-char chunks with 200-char overlap so retrieval keeps
        # sentence context across chunk boundaries.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        for file_obj in files:
            # Pre-set so the except handler below can always name the file,
            # even when resolving the path itself raised.
            filename = "<unknown>"
            try:
                # Gradio may hand us either a plain path string or a
                # file-like object exposing .name.
                if isinstance(file_obj, str):
                    file_path = file_obj
                else:
                    file_path = file_obj.name if hasattr(file_obj, 'name') else str(file_obj)
                filename = os.path.basename(file_path).lower()

                # BUG FIX: these messages previously printed the literal
                # "(unknown)" instead of the actual filename, making
                # ingestion problems impossible to attribute to a file.
                logger.info(f"Processing: {filename} from {file_path}")

                text = ""
                if filename.endswith(".txt"):
                    text = extract_text_from_txt(file_path)
                elif filename.endswith(".pdf"):
                    text = extract_text_from_pdf(file_path)
                elif filename.endswith((".jpg", ".jpeg", ".png")):
                    text = extract_text_from_image(file_path)
                else:
                    error_messages.append(f"⚠️ Unsupported format: {filename}")
                    continue

                # Skip files that yielded no usable content.
                if not text or len(text.strip()) < 20:
                    error_messages.append(f"⚠️ No content extracted from: {filename}")
                    logger.warning(f"No content from {filename}")
                    continue

                chunks = text_splitter.split_text(text)
                logger.info(f"Created {len(chunks)} chunks from {filename}")

                vectorstore.add_texts(chunks)
                total_chunks += len(chunks)
                logger.info(f"✅ Added {len(chunks)} chunks from {filename}")

            except Exception as e:
                # One bad file must not abort the rest of the batch.
                error_msg = f"Error processing {filename}: {str(e)[:100]}"
                error_messages.append(error_msg)
                logger.error(error_msg)
                continue

        if total_chunks > 0:
            # Flush the Chroma collection to disk so chunks survive restarts.
            vectorstore.persist()
            logger.info(f"Vectorstore persisted with {total_chunks} chunks")

        message = f"✅ Successfully ingested {total_chunks} chunks from {len([f for f in files if f])} file(s)"

        if error_messages:
            message += "\n\n" + "\n".join(error_messages)

        logger.info(message)
        return message

    except Exception as e:
        error_msg = f"❌ Ingestion error: {str(e)[:200]}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return error_msg
|
|
|
|
|
|
|
|
# Example prompts shown below the question box; multiple languages to
# demonstrate the multilingual pipeline.
suggested_questions = [
    "What is the main idea of the text?",
    "Summarize the content.",
    "What are the key points?",
    "¿Cuál es la idea principal?",
    "Quel est le résumé?",
    "Was sind die wichtigsten Punkte?",
]

# Output-language choices; "Auto Detect" leaves the answer untranslated
# (see translation_agent / multi_agent_orchestrator).
supported_languages = [
    "Auto Detect", "en", "es", "fr", "de", "it", "pt", "ru", "zh", "ja", "hi", "bn", "ar"
]
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: left column uploads/ingests documents, right column asks
# questions; answers render as Markdown.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Nexus RAG- MultiAgent MultiLingual RAG System") as interface:
    gr.Markdown("# 🧠 Nexus RAG- MultiAgent MultiLingual RAG System")
    gr.Markdown("**Upload documents** → **Auto-ingest** → **Ask questions** in any language")
    gr.Markdown("---")

    with gr.Row():
        # Left column: file upload + ingestion status.
        with gr.Column(scale=2):
            gr.Markdown("### 📁 Document Upload")
            file_input = gr.File(
                label="Upload Files (TXT/PDF/Images)",
                file_count="multiple",
                file_types=[".txt", ".pdf", ".jpg", ".jpeg", ".png"]
            )
            ingest_output = gr.Textbox(
                label="Ingestion Status",
                interactive=False,
                lines=4
            )

        # Right column: question input, output language, and answer.
        with gr.Column(scale=3):
            gr.Markdown("### ❓ Ask Your Question")
            question = gr.Textbox(
                label="Your Question",
                placeholder="Ask anything about your documents...",
                lines=3
            )
            language_dropdown = gr.Dropdown(
                choices=supported_languages,
                value="Auto Detect",
                label="Output Language"
            )
            submit_button = gr.Button("💬 Ask Agent", variant="primary", size="lg")
            output_answer = gr.Markdown(label="📝 Answer")

    gr.Markdown("### 💡 Example Questions:")
    gr.Examples(suggested_questions, inputs=question)

    # Ingestion runs automatically whenever the file selection changes.
    file_input.change(
        fn=ingest_files,
        inputs=file_input,
        outputs=ingest_output
    )

    # Question answering is triggered explicitly by the button.
    submit_button.click(
        fn=multi_agent_orchestrator,
        inputs=[question, language_dropdown],
        outputs=output_answer
    )
|
|
|
|
|
# Launch the UI only when run as a script (not when imported as a module).
if __name__ == "__main__":
    logger.info("🚀 Launching Gradio interface...")
    interface.launch()