import os
import io
import logging
import json
import re
import traceback
from typing import Optional, List

import gradio as gr
from langchain_core.prompts import PromptTemplate
from langchain_groq import ChatGroq
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langdetect import detect, LangDetectException
from deep_translator import GoogleTranslator
import torch
from PIL import Image
import fitz  # PyMuPDF
from transformers import BlipProcessor, BlipForConditionalGeneration

# ===== LOGGING =====
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ===== CONFIG =====
# Fail fast when the Groq key is missing: every LLM call would fail later anyway.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise ValueError("❌ Missing GROQ_API_KEY - set in Settings → Secrets")

DB_PATH = "./chroma_db"  # Chroma persistence directory
DEVICE = "cpu"           # BLIP inference device (no GPU assumed)
os.makedirs(DB_PATH, exist_ok=True)

# ===== INITIALIZE COMPONENTS =====
# Each component initializes defensively: on failure it is set to None and the
# corresponding feature degrades gracefully at call time instead of crashing.
logger.info("Initializing components...")

# Initialize BLIP for image captioning
try:
    logger.info("Loading BLIP image captioning model...")
    blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    blip_model = BlipForConditionalGeneration.from_pretrained(
        "Salesforce/blip-image-captioning-base"
    ).to(DEVICE)
    logger.info("✅ BLIP model loaded.")
except Exception as e:
    logger.warning(f"⚠️ BLIP model loading failed: {e}")
    blip_model = None
    blip_processor = None

try:
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
    )
    logger.info("✅ Embeddings model loaded.")
except Exception as e:
    logger.error(f"❌ Error loading embeddings: {e}")
    embedding_model = None

try:
    vectorstore = Chroma(
        persist_directory=DB_PATH,
        embedding_function=embedding_model
    )
    logger.info("✅ Vectorstore initialized.")
except Exception as e:
    logger.error(f"❌ Error initializing vectorstore: {e}")
    vectorstore = None

try:
    llm = ChatGroq(
        groq_api_key=GROQ_API_KEY,
        model="openai/gpt-oss-120b",
        temperature=0.2
    )
    logger.info("✅ LLM initialized.")
except Exception as e:
    logger.error(f"❌ Error initializing LLM: {e}")
    llm = None

# ===== FILE EXTRACTION =====
def extract_text_from_pdf(file_path: str) -> str:
    """Extract text from a PDF file, one '--- Page N ---' marker per page.

    Returns "" on any extraction error (logged, never raised).
    """
    try:
        # FIX: open directly from the path instead of reading the whole file
        # into memory first, and use a context manager so the fitz Document is
        # always closed (the original version leaked it).
        with fitz.open(file_path) as pdf:
            parts = []
            for page_num, page in enumerate(pdf):
                parts.append(f"\n--- Page {page_num + 1} ---\n")
                parts.append(page.get_text())
            text = "".join(parts)
        logger.info(f"✅ PDF extracted: {len(text)} chars")
        return text
    except Exception as e:
        logger.error(f"❌ PDF extraction error: {e}")
        return ""

def extract_text_from_txt(file_path: str) -> str:
    """Extract text from a TXT file (UTF-8; undecodable bytes are ignored).

    Returns "" on any extraction error (logged, never raised).
    """
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            text = f.read()
        logger.info(f"✅ TXT extracted: {len(text)} chars")
        return text
    except Exception as e:
        logger.error(f"❌ TXT extraction error: {e}")
        return ""

def extract_text_from_image(file_path: str) -> str:
    """Describe an image via BLIP captioning.

    Falls back to reporting the image dimensions when BLIP is unavailable;
    returns "" on any extraction error (logged, never raised).
    """
    try:
        if blip_model is None or blip_processor is None:
            logger.warning("BLIP model not available, using fallback")
            img = Image.open(file_path).convert("RGB")
            width, height = img.size
            return f"Image dimensions: {width}x{height} pixels"
        img = Image.open(file_path).convert("RGB")
        # Generate caption using BLIP; no_grad avoids building an autograd
        # graph during pure inference.
        inputs = blip_processor(images=img, return_tensors="pt").to(DEVICE)
        with torch.no_grad():
            output_ids = blip_model.generate(**inputs, max_length=100)
        caption = blip_processor.decode(output_ids[0], skip_special_tokens=True)
        logger.info(f"✅ Image caption extracted: {caption}")
        return f"Image Description: {caption}"
    except Exception as e:
        logger.error(f"❌ Image extraction error: {e}")
        return ""
# ===== AGENTS =====
# 1. TRANSLATION AGENT
def translation_agent(text: str, target_lang: str = "en") -> str:
    """Translate *text* into *target_lang*.

    Skips trivially short input, the "Auto Detect" pseudo-language, and text
    already in the target language. On any failure the input is returned
    unchanged (best-effort translation).
    """
    try:
        # Guard clauses: nothing worth translating / explicit no-op target.
        if not text or len(text.strip()) < 3:
            return text
        if target_lang == "Auto Detect":
            return text
        detected_lang = detect(text)
        logger.info(f"Detected language: {detected_lang}, target: {target_lang}")
        if detected_lang == target_lang:
            return text
        translator = GoogleTranslator(source="auto", target=target_lang)
        result = translator.translate(text)
        logger.info(f"✅ Translation completed")
        return result
    except Exception as e:
        logger.warning(f"⚠️ Translation failed: {e}")
        return text

# 2. RETRIEVAL AGENT
def retrieval_agent(query: str, k: int = 5) -> List[str]:
    """Return the page contents of the top-*k* documents matching *query*.

    Returns [] when the vectorstore is missing, nothing matches, or the
    search raises.
    """
    if vectorstore is None:
        logger.warning("Vectorstore is None")
        return []
    try:
        logger.info(f"Retrieving top {k} documents for: {query[:50]}")
        hits = vectorstore.similarity_search(query, k=k)
        if not hits:
            logger.warning("No documents found")
            return []
        texts = [hit.page_content for hit in hits]
        logger.info(f"✅ Retrieved {len(texts)} documents")
        return texts
    except Exception as e:
        logger.error(f"❌ Retrieval error: {e}")
        return []

# 3. SUMMARIZATION AGENT
def summarization_agent(text: str) -> str:
    """Summarize *text* with the LLM; short input is returned as-is."""
    if llm is None:
        return "❌ LLM not available"
    try:
        if not text or len(text) < 50:
            return text
        prompt = f"""Summarize the following text concisely in 2-3 sentences:

{text[:2000]}

Summary:"""
        response = llm.invoke(prompt)
        # LangChain chat models return a message object; fall back to str().
        summary = getattr(response, "content", None)
        if summary is None:
            summary = str(response)
        logger.info(f"✅ Summarization completed")
        return summary
    except Exception as e:
        logger.error(f"❌ Summarization error: {e}")
        return f"[Summarization failed: {str(e)[:100]}]"

# 4. FACT-CHECK AGENT
def fact_check_agent(claim: str, context: str) -> str:
    """Evaluate *claim* against *context* via the LLM (TRUE/FALSE/UNVERIFIED)."""
    if llm is None:
        return "❌ LLM not available"
    try:
        prompt = f"""You are a fact-checker. Evaluate the following claim based on the context provided.

Context:
{context[:1500]}

Claim: {claim}

Response format: [TRUE/FALSE/UNVERIFIED] - Brief explanation"""
        response = llm.invoke(prompt)
        verdict = getattr(response, "content", None)
        if verdict is None:
            verdict = str(response)
        logger.info(f"✅ Fact-check completed")
        return verdict
    except Exception as e:
        logger.error(f"❌ Fact-check error: {e}")
        return f"[Fact-check failed: {str(e)[:100]}]"

# 5. QUESTION-ANSWERING AGENT
def qa_agent(question: str, context: str) -> str:
    """Answer *question* using only the supplied *context* via the LLM."""
    if llm is None:
        return "❌ LLM not available"
    try:
        prompt = f"""You are a helpful assistant. Answer the following question based ONLY on the context provided.

Context:
{context[:3000]}

Question: {question}

If the context doesn't contain the answer, say "I cannot find this information in the provided context."

Answer:"""
        response = llm.invoke(prompt)
        answer = getattr(response, "content", None)
        if answer is None:
            answer = str(response)
        logger.info(f"✅ QA completed")
        return answer
    except Exception as e:
        logger.error(f"❌ QA error: {e}")
        return f"[Answer generation failed: {str(e)[:100]}]"

# ===== ORCHESTRATOR =====
def multi_agent_orchestrator(question: str, target_language: str) -> str:
    """Coordinate the agent pipeline: translate → retrieve → answer → translate.

    Returns the (possibly translated) answer, or a user-facing error string.
    """
    if not question or not question.strip():
        return "❌ Please enter a question."
    logger.info(f"Orchestrator started - Question: {question[:50]}")
    try:
        # Step 1: normalize the question to English for retrieval/QA.
        logger.info("STEP 1: Translation Agent")
        question_en = translation_agent(question, "en")

        # Step 2: pull supporting context from the vectorstore.
        logger.info("STEP 2: Retrieval Agent")
        docs = retrieval_agent(question_en, k=5)
        if not docs:
            return "❌ No documents found in vectorstore. Please upload and process files first."
        context = "\n\n".join(docs)
        logger.info(f"Context length: {len(context)} chars")

        # Step 3: answer from the retrieved context only.
        logger.info("STEP 3: QA Agent")
        answer = qa_agent(question_en, context)

        # Step 4: translate the answer back unless the target is a no-op.
        if target_language and target_language not in ("Auto Detect", "en"):
            logger.info("STEP 4: Translation Agent (answer)")
            answer = translation_agent(answer, target_language)

        logger.info("✅ Orchestration completed successfully")
        return answer
    except Exception as e:
        logger.error(f"❌ Orchestration error: {e}\n{traceback.format_exc()}")
        return f"❌ Error: {str(e)[:300]}"
# ===== FILE INGESTION =====
def ingest_files(files) -> str:
    """Ingest uploaded files (TXT/PDF/images) into the vectorstore.

    Accepts a list of file paths or Gradio file objects. Per-file failures are
    collected and appended to the returned status message instead of aborting
    the whole batch. Returns a human-readable status string.
    """
    if not files:
        return "⚠️ No files uploaded"
    if vectorstore is None or embedding_model is None:
        return "❌ Vectorstore not initialized"
    logger.info(f"Starting file ingestion for {len(files)} file(s)")
    total_chunks = 0
    error_messages = []
    try:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        for file_obj in files:
            # FIX: initialized so the except handler below can never hit an
            # unbound name if path resolution itself fails.
            filename = "unknown"
            try:
                # Handle both string paths and file objects
                if isinstance(file_obj, str):
                    file_path = file_obj
                else:
                    file_path = file_obj.name if hasattr(file_obj, 'name') else str(file_obj)
                filename = os.path.basename(file_path).lower()
                # FIX: all messages below interpolated the literal "(unknown)"
                # instead of the actual file name; they now use {filename}.
                logger.info(f"Processing: {filename} from {file_path}")
                text = ""
                # Extract based on file type
                if filename.endswith(".txt"):
                    text = extract_text_from_txt(file_path)
                elif filename.endswith(".pdf"):
                    text = extract_text_from_pdf(file_path)
                elif filename.endswith((".jpg", ".jpeg", ".png")):
                    text = extract_text_from_image(file_path)
                else:
                    error_messages.append(f"⚠️ Unsupported format: {filename}")
                    continue
                # Check if text was extracted
                if not text or len(text.strip()) < 20:
                    error_messages.append(f"⚠️ No content extracted from: {filename}")
                    logger.warning(f"No content from {filename}")
                    continue
                # Split and add to vectorstore
                chunks = text_splitter.split_text(text)
                logger.info(f"Created {len(chunks)} chunks from {filename}")
                vectorstore.add_texts(chunks)
                total_chunks += len(chunks)
                logger.info(f"✅ Added {len(chunks)} chunks from {filename}")
            except Exception as e:
                error_msg = f"Error processing {filename}: {str(e)[:100]}"
                error_messages.append(error_msg)
                logger.error(error_msg)
                continue
        # Persist vectorstore
        if total_chunks > 0:
            vectorstore.persist()
            logger.info(f"Vectorstore persisted with {total_chunks} chunks")
        # Build response message
        message = f"✅ Successfully ingested {total_chunks} chunks from {len([f for f in files if f])} file(s)"
        if error_messages:
            message += "\n\n" + "\n".join(error_messages)
        logger.info(message)
        return message
    except Exception as e:
        error_msg = f"❌ Ingestion error: {str(e)[:200]}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return error_msg

# ===== GRADIO INTERFACE =====
# Example prompts shown under the question box (deliberately multilingual).
suggested_questions = [
    "What is the main idea of the text?",
    "Summarize the content.",
    "What are the key points?",
    "¿Cuál es la idea principal?",
    "Quel est le résumé?",
    "Was sind die wichtigsten Punkte?",
]
# Output-language choices; "Auto Detect" means "leave the answer in English".
supported_languages = [
    "Auto Detect", "en", "es", "fr", "de", "it", "pt",
    "ru", "zh", "ja", "hi", "bn", "ar"
]
# ===== GRADIO UI =====
# Layout: uploads + ingestion status on the left, question/answer on the right.
with gr.Blocks(title="Nexus RAG- MultiAgent MultiLingual RAG System") as interface:
    gr.Markdown("# 🧠 Nexus RAG- MultiAgent MultiLingual RAG System")
    gr.Markdown("**Upload documents** → **Auto-ingest** → **Ask questions** in any language")
    gr.Markdown("---")

    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("### 📁 Document Upload")
            file_input = gr.File(
                label="Upload Files (TXT/PDF/Images)",
                file_count="multiple",
                file_types=[".txt", ".pdf", ".jpg", ".jpeg", ".png"],
            )
            ingest_output = gr.Textbox(
                label="Ingestion Status",
                interactive=False,
                lines=4,
            )
        with gr.Column(scale=3):
            gr.Markdown("### ❓ Ask Your Question")
            question = gr.Textbox(
                label="Your Question",
                placeholder="Ask anything about your documents...",
                lines=3,
            )
            language_dropdown = gr.Dropdown(
                choices=supported_languages,
                value="Auto Detect",
                label="Output Language",
            )
            submit_button = gr.Button("💬 Ask Agent", variant="primary", size="lg")
            output_answer = gr.Markdown(label="📝 Answer")

    gr.Markdown("### 💡 Example Questions:")
    gr.Examples(suggested_questions, inputs=question)

    # Ingest automatically as soon as files are dropped in.
    file_input.change(fn=ingest_files, inputs=file_input, outputs=ingest_output)

    # Route the question (and chosen output language) through the orchestrator.
    submit_button.click(
        fn=multi_agent_orchestrator,
        inputs=[question, language_dropdown],
        outputs=output_answer,
    )

if __name__ == "__main__":
    logger.info("🚀 Launching Gradio interface...")
    interface.launch()