| """ | |
| Visual Multi-Agent RAG Chatbot using LangGraph | |
| This system uses the same sophisticated multi-agent architecture as v1, | |
| but with visual document retrieval (ColPali) instead of text-based RAG. | |
| Inherits from BaseMultiAgentChatbot to get: | |
| - LLM-based query analysis | |
| - Filter extraction and validation | |
| - Query rewriting | |
| - Main/RAG/Response agent orchestration | |
| Only implements: | |
| - Visual search retrieval | |
| - Response generation with visual context | |
| Phase 2 IMPLEMENTED: Multi-modal LLM support | |
| - Top 3 images (by relevance) are sent directly to GPT-4o | |
| - LLM can see tables, charts, figures directly | |
| - Falls back to text-only if multi-modal fails | |
| """ | |
import os
import time
import base64
import logging
import traceback

import httpx
from typing import Dict, List, Any, Optional

from openai import OpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

from src.agents.base_multi_agent_chatbot import BaseMultiAgentChatbot, MultiAgentState
from src.colpali.visual_search import VisualSearchAdapter

logger = logging.getLogger(__name__)

# Multi-modal LLM configuration
MULTIMODAL_MODEL = os.environ.get("VISUAL_RAG_MODEL", "gpt-4o")  # GPT-4o supports vision
MULTIMODAL_MAX_IMAGES = int(os.environ.get("VISUAL_RAG_MAX_IMAGES", "3"))  # Top N images by relevance score
MULTIMODAL_ENABLED = os.environ.get("VISUAL_RAG_MULTIMODAL", "true").lower() == "true"  # Toggle for multi-modal mode
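
# Example environment configuration (illustrative values; every variable is
# optional and falls back to the defaults above):
#   export VISUAL_RAG_MODEL=gpt-4o
#   export VISUAL_RAG_MAX_IMAGES=3
#   export VISUAL_RAG_MULTIMODAL=true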

class VisualMultiAgentChatbot(BaseMultiAgentChatbot):
    """Multi-agent chatbot with visual RAG (ColPali) and multi-modal response generation"""

    def __init__(
        self,
        visual_search: VisualSearchAdapter,
        config_path: str = "src/config/settings.yaml",
        enable_multimodal: bool = True
    ):
        """
        Initialize the visual multi-agent chatbot.

        Args:
            visual_search: Visual search adapter (ColPali)
            config_path: Path to config file
            enable_multimodal: Whether to use a multi-modal LLM (GPT-4o with images)
        """
        self.visual_search = visual_search
        self.enable_multimodal = enable_multimodal and MULTIMODAL_ENABLED

        # Initialize OpenAI client for multi-modal (GPT-4o)
        self.openai_client = None
        if self.enable_multimodal:
            api_key = os.environ.get("OPENAI_API_KEY")
            if api_key:
                self.openai_client = OpenAI(api_key=api_key)
                logger.info(f"🖼️ Multi-modal LLM initialized: {MULTIMODAL_MODEL}")
            else:
                logger.warning("⚠️ OPENAI_API_KEY not set, multi-modal disabled")
                self.enable_multimodal = False

        # Call parent init (sets up LLM, filters, graph, etc.)
        super().__init__(config_path)

        logger.info(f"🎨 Visual Multi-Agent Chatbot initialized (multi-modal: {self.enable_multimodal})")
    def _perform_retrieval(self, query: str, filters: Dict[str, Any]) -> Any:
        """
        Perform visual search retrieval.

        Args:
            query: The rewritten query
            filters: The filters to apply

        Returns:
            Result object with .sources and .answer attributes
        """
        logger.info("🔍 VISUAL RETRIEVAL: Performing visual search")
        logger.info(f"🔍 VISUAL RETRIEVAL: Query: '{query}'")
        logger.info(f"🔍 VISUAL RETRIEVAL: Filters: {filters}")

        # Convert filters to the visual search format (singular keys become plural)
        visual_filters = {}
        if filters.get("sources"):
            visual_filters["sources"] = filters["sources"]
        if filters.get("year"):
            visual_filters["years"] = filters["year"]
        if filters.get("district"):
            visual_filters["districts"] = filters["district"]
        if filters.get("filenames"):
            visual_filters["filenames"] = filters["filenames"]
| logger.info(f"π VISUAL RETRIEVAL: Converted filters: {visual_filters}") | |
| # Perform visual search | |
| try: | |
| visual_results = self.visual_search.search( | |
| query=query, | |
| top_k=10, | |
| filters=visual_filters, | |
| search_strategy="multi_vector" | |
| ) | |
| logger.info(f"π VISUAL RETRIEVAL: Retrieved {len(visual_results)} visual documents") | |
| # Return in format expected by base class | |
| class Result: | |
| def __init__(self, sources, answer=""): | |
| self.sources = sources | |
| self.answer = answer | |
| return Result(visual_results, "") | |
| except Exception as e: | |
| logger.error(f"β VISUAL RETRIEVAL: Error during visual search: {e}") | |
| traceback.print_exc() | |
| # Return empty result | |
| class Result: | |
| def __init__(self, sources, answer=""): | |
| self.sources = sources | |
| self.answer = answer | |
| return Result([], "") | |
    def _response_agent(self, state: MultiAgentState) -> MultiAgentState:
        """
        Override the response agent for visual RAG.

        Visual RAG uses MaxSim scores (typically 10-30+) instead of cosine
        similarity (0-1), so we skip the similarity score threshold check
        that is in the base class.
        """
| logger.info("π VISUAL RESPONSE AGENT: Starting document retrieval and answer generation") | |
| rag_query = state["rag_query"] | |
| filters = state["rag_filters"] | |
| logger.info(f"π VISUAL RESPONSE AGENT: Query: '{rag_query}'") | |
| logger.info(f"π VISUAL RESPONSE AGENT: Filters: {filters}") | |
| try: | |
| # Call visual retrieval | |
| result = self._perform_retrieval(rag_query, filters) | |
| state["retrieved_documents"] = result.sources | |
| state["agent_logs"].append(f"VISUAL RESPONSE AGENT: Retrieved {len(result.sources)} documents") | |
| logger.info(f"π VISUAL RESPONSE AGENT: Retrieved {len(result.sources)} documents") | |
| # For Visual RAG, we don't check similarity scores (MaxSim scores are different scale) | |
| # Just check if we have any documents | |
| if not result.sources: | |
| logger.warning(f"β οΈ VISUAL RESPONSE AGENT: No documents retrieved, using LLM knowledge only") | |
| response = self._generate_conversational_response_without_docs( | |
| state["current_query"], | |
| state["messages"] | |
| ) | |
| else: | |
| # Generate conversational response with documents | |
| response = self._generate_conversational_response( | |
| state["current_query"], | |
| result.sources, | |
| result.answer, | |
| state["messages"], | |
| filters | |
| ) | |
| state["final_response"] = response | |
| state["last_ai_message_time"] = time.time() | |
| logger.info(f"π VISUAL RESPONSE AGENT: Answer generation complete") | |
| except Exception as e: | |
| logger.error(f"β VISUAL RESPONSE AGENT ERROR: {e}") | |
| traceback.print_exc() | |
| state["final_response"] = "I apologize, but I encountered an error while retrieving visual documents. Please try again." | |
| state["last_ai_message_time"] = time.time() | |
| return state | |
    def _fetch_image_as_base64(self, image_url: str, timeout: float = 10.0) -> Optional[str]:
        """
        Fetch an image from a URL and convert it to a base64 data URL.

        Args:
            image_url: URL of the image
            timeout: Request timeout in seconds

        Returns:
            Base64-encoded image data URL, or None on failure
        """
        try:
            with httpx.Client(timeout=timeout) as client:
                response = client.get(image_url)
                response.raise_for_status()

                # Determine content type
                content_type = response.headers.get('content-type', 'image/jpeg')
                if 'png' in content_type:
                    media_type = 'image/png'
                elif 'gif' in content_type:
                    media_type = 'image/gif'
                elif 'webp' in content_type:
                    media_type = 'image/webp'
                else:
                    media_type = 'image/jpeg'

                # Encode to base64
                base64_image = base64.b64encode(response.content).decode('utf-8')
                return f"data:{media_type};base64,{base64_image}"
        except Exception as e:
            logger.warning(f"⚠️ Failed to fetch image {image_url}: {e}")
            return None
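
    # The helper above returns an inline data URL, e.g. (illustrative):
    #   "data:image/png;base64,iVBORw0KGgo..."
    # This is one of the formats the OpenAI chat completions API accepts in
    # "image_url" content parts, so no separate upload step is needed.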
    def _generate_multimodal_response(
        self,
        query: str,
        documents: List[Any],
        conversation_context: str,
        correct_names: str,
        filters: Dict[str, Any] = None
    ) -> Optional[str]:
        """
        Generate a response using GPT-4o with images (multi-modal).

        Sends the top MULTIMODAL_MAX_IMAGES images (default 3, by relevance
        score) directly to the LLM.

        Args:
            query: User query
            documents: Retrieved visual documents (sorted by score)
            conversation_context: Formatted conversation history
            correct_names: Correct district/source names from metadata
            filters: Applied filters

        Returns:
            LLM response string, or None if multi-modal generation failed
        """
        if not self.openai_client or not self.enable_multimodal:
            logger.info("🖼️ Multi-modal disabled, skipping")
            return None

        logger.info("=" * 80)
        logger.info("🖼️ MULTI-MODAL RESPONSE GENERATION: Starting")
        logger.info("=" * 80)

        # Get the top N images by relevance score
        top_docs = documents[:MULTIMODAL_MAX_IMAGES]
        logger.info(f"🖼️ MULTI-MODAL: Processing top {len(top_docs)} documents for image injection")

        # Fetch images and build content
        image_contents = []
        image_descriptions = []
        for i, doc in enumerate(top_docs):
            metadata = getattr(doc, 'metadata', {})
            content = getattr(doc, 'page_content', '')
            score = getattr(doc, 'score', 0.0)

            # Get the image URL (prefer the original for better quality)
            image_url = metadata.get('original_url') or metadata.get('resized_url') or metadata.get('page')

            if image_url and isinstance(image_url, str) and image_url.startswith('http'):
                logger.info(f"🖼️ MULTI-MODAL: Fetching image {i+1}: {image_url[:80]}...")

                # Fetch and encode the image
                base64_image = self._fetch_image_as_base64(image_url)
                if base64_image:
                    image_contents.append({
                        "type": "image_url",
                        "image_url": {
                            "url": base64_image,
                            "detail": "high"  # High detail for document analysis
                        }
                    })

                    # Build a description for this image
                    desc = f"[Image {i+1}] "
                    desc += f"File: {metadata.get('filename', 'Unknown')}, "
                    desc += f"Page: {metadata.get('page_number', 'N/A')}, "
                    desc += f"Year: {metadata.get('year', 'N/A')}, "
                    desc += f"District: {metadata.get('district', 'N/A')}, "
                    desc += f"Score: {score:.3f}"
                    if content:
                        desc += f"\nExtracted text preview: {content[:300]}..."
                    image_descriptions.append(desc)
                    logger.info(f"🖼️ MULTI-MODAL: Image {i+1} loaded successfully")
                else:
                    logger.warning(f"⚠️ MULTI-MODAL: Failed to load image {i+1}")
            else:
                logger.warning(f"⚠️ MULTI-MODAL: No valid image URL for doc {i+1}")

        if not image_contents:
            logger.warning("⚠️ MULTI-MODAL: No images loaded, falling back to text-only")
            return None

        logger.info(f"🖼️ MULTI-MODAL: {len(image_contents)} images loaded for LLM")

        # Build the multi-modal prompt
        system_prompt = """You are a helpful audit report assistant with the ability to SEE document images directly.

CRITICAL RULES - VISUAL ANALYSIS:
1. **LOOK AT THE IMAGES**: You can see the actual document pages. Analyze tables, charts, figures, and text directly.
2. **ONLY use information visible in the images or provided text** - DO NOT hallucinate
3. **EVERY claim MUST reference which image/document it came from** using [Image 1], [Image 2], etc.
4. **If you see tables or figures in the images, describe what they show**
5. **If information is not visible in any image, explicitly state that**
6. **USE CORRECT NAMES**: Always use the exact names from the document metadata provided.

RESPONSE STYLE:
- Be conversational, not technical
- Use bullet points and lists when appropriate
- Reference specific images: "As shown in [Image 1]..." or "The table in [Image 2] indicates..."
- If multiple images show similar information, cite all: [Image 1, Image 2]
- Don't describe the image layout/format; focus on the CONTENT

TONE: Professional but friendly, like talking to a colleague who can see the same documents."""

        # Build the user message with images
        user_content = []

        # Add text context first
        text_context = f"""Conversation History:
{conversation_context}

Current User Question: {query}

CORRECT NAMES TO USE (from document metadata):
{correct_names}

Image Descriptions (for reference):
{chr(10).join(image_descriptions)}

INSTRUCTIONS:
1. LOOK at the {len(image_contents)} document images below
2. Answer the user's question based on what you SEE in the images
3. Reference specific images when citing information
4. If the answer isn't visible in any image, say so

Now analyze the images and answer the question:"""

        user_content.append({
            "type": "text",
            "text": text_context
        })

        # Add the images
        user_content.extend(image_contents)
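
        # Resulting user message content (illustrative shape):
        #   [{"type": "text", "text": "Conversation History: ..."},
        #    {"type": "image_url", "image_url": {"url": "data:image/png;base64,...", "detail": "high"}},
        #    ...]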
        try:
            logger.info(f"🖼️ MULTI-MODAL: Calling {MULTIMODAL_MODEL}...")
            response = self.openai_client.chat.completions.create(
                model=MULTIMODAL_MODEL,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_content}
                ],
                max_tokens=2000,
                temperature=0.3
            )

            response_text = response.choices[0].message.content.strip()

            # Detailed token usage logging
            usage = response.usage
            logger.info("🖼️ MULTI-MODAL: Response received")
            logger.info(f"🖼️ MULTI-MODAL: Response length: {len(response_text)} chars")
            logger.info(f"🖼️ MULTI-MODAL: Response preview: {response_text[:300]}...")
            logger.info("🖼️ MULTI-MODAL: Token usage breakdown:")
            logger.info(f"   📥 Input tokens (prompt + images): {usage.prompt_tokens}")
            logger.info(f"   📤 Output tokens (response): {usage.completion_tokens}")
            logger.info(f"   📊 Total tokens: {usage.total_tokens}")

            # Estimate cost (GPT-4o pricing: $2.50/1M input, $10/1M output as of 2024)
            input_cost = (usage.prompt_tokens / 1_000_000) * 2.50
            output_cost = (usage.completion_tokens / 1_000_000) * 10.00
            total_cost = input_cost + output_cost
            logger.info(f"   💰 Estimated cost: ${total_cost:.4f} (input: ${input_cost:.4f}, output: ${output_cost:.4f})")

            return response_text
        except Exception as e:
            logger.error(f"❌ MULTI-MODAL ERROR: {e}")
            traceback.print_exc()
            return None
    def _generate_conversational_response(
        self,
        query: str,
        documents: List[Any],
        rag_answer: str,
        messages: List[Any],
        filters: Dict[str, Any] = None
    ) -> str:
        """
        Generate a conversational response from visually retrieved documents.

        Phase 2 IMPLEMENTED: Multi-modal LLM support
        - Sends the top images (by relevance) directly to GPT-4o
        - The LLM can see tables, charts, and figures directly
        - Falls back to text-only generation if multi-modal fails

        Args:
            query: User query
            documents: Retrieved visual documents
            rag_answer: RAG answer (empty for visual search)
            messages: Conversation history
            filters: Applied filters (year, district, etc.)

        Returns:
            LLM response
        """
        logger.info("=" * 80)
        logger.info("💬 VISUAL RESPONSE GENERATION: Starting")
        logger.info("=" * 80)
        logger.info(f"💬 VISUAL RESPONSE GENERATION: Processing {len(documents)} visual documents")
        logger.info(f"💬 VISUAL RESPONSE GENERATION: Query: '{query}'")
        logger.info(f"💬 VISUAL RESPONSE GENERATION: Filters applied: {filters}")

        # Log each document's metadata and content preview (first 5 docs)
        for i, doc in enumerate(documents[:5]):
            metadata = getattr(doc, 'metadata', {})
            content = getattr(doc, 'page_content', '')
            logger.info(f"📄 DOC {i+1} METADATA: filename={metadata.get('filename', 'N/A')}, "
                        f"year={metadata.get('year', 'N/A')}, district={metadata.get('district', 'N/A')}, "
                        f"source={metadata.get('source', 'N/A')}, page={metadata.get('page_number', 'N/A')}")
            logger.info(f"📄 DOC {i+1} CONTENT PREVIEW: {content[:200]}..." if content else f"📄 DOC {i+1} CONTENT: EMPTY/NONE")
            logger.info(f"📄 DOC {i+1} CONTENT LENGTH: {len(content)} chars" if content else f"📄 DOC {i+1} CONTENT LENGTH: 0")

        # Build conversation history context
        conversation_context = self._build_conversation_context_for_response(messages)

        # Build detailed document information
        document_details = self._build_visual_document_details(documents)
        logger.info(f"💬 VISUAL RESPONSE GENERATION: Document details length: {len(document_details)} chars")

        # Extract correct names from the documents
        correct_names = self._extract_correct_names_from_documents(documents)
        logger.info(f"💬 VISUAL RESPONSE GENERATION: Correct names: {correct_names}")

        # ============================================================
        # PHASE 2: Try multi-modal generation first (GPT-4o with images)
        # ============================================================
        if self.enable_multimodal:
            logger.info("🖼️ VISUAL RESPONSE GENERATION: Attempting multi-modal generation (GPT-4o with images)...")
            multimodal_response = self._generate_multimodal_response(
                query=query,
                documents=documents,
                conversation_context=conversation_context,
                correct_names=correct_names,
                filters=filters
            )

            if multimodal_response:
                logger.info("✅ VISUAL RESPONSE GENERATION: Multi-modal generation successful!")
                # Validate and enhance the response
                final_response = self._validate_and_enhance_response(
                    multimodal_response,
                    documents,
                    query,
                    filters
                )
                logger.info("=" * 80)
                return final_response
            else:
                logger.warning("⚠️ VISUAL RESPONSE GENERATION: Multi-modal failed, falling back to text-only")
        else:
            logger.info("📝 VISUAL RESPONSE GENERATION: Multi-modal disabled, using text-only mode")

        # ============================================================
        # FALLBACK: Text-only generation (extracted text from pages)
        # ============================================================
        logger.info("📝 VISUAL RESPONSE GENERATION: Using text-only mode")

        # Create the response prompt
        response_prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="""You are a helpful audit report assistant. Generate a natural, conversational response.

CRITICAL RULES - NO HALLUCINATION:
1. **ONLY use information from the retrieved documents provided below**
2. **EVERY sentence with facts, numbers, or specific claims MUST have a [Doc i] reference**
3. **If a document doesn't contain the information, DO NOT make it up**
4. **If the user asks about a year/district that's NOT in the retrieved documents, explicitly state that**
5. **Check the document years/districts before making any claims about them**
6. **USE CORRECT NAMES**: Always use the exact names from the document metadata, not misspellings from the conversation.

RULES:
1. Answer the user's question directly and clearly
2. Use ONLY the retrieved documents as evidence - DO NOT use your training data
3. Be conversational, not technical
4. Don't mention scores, retrieval details, or technical implementation
5. If relevant documents were found, reference them naturally
6. If there are no relevant documents, say you do not have enough information - DO NOT hallucinate
7. If the passages have useful facts or numbers, use them in your answer WITH references
8. **MANDATORY**: When you use information from a passage, mention where it came from by using [Doc i] at the end of the sentence.
9. Do not use the sentence 'Doc i says ...' to say where information came from.
10. If the same thing is said in more than one document, you can mention all of them like this: [Doc i, Doc j, Doc k]
11. Do not just summarize each passage one by one. Group your summaries to highlight the key parts in the explanation.
12. If it makes sense, use bullet points and lists to make your answers easier to understand.
13. You do not need to use every passage. Only use the ones that help answer the question.
14. **VERIFY**: Before mentioning any year, district, or number, check that it exists in the retrieved documents.
15. **NO HALLUCINATION**: If the documents show years 2021, 2022, 2023 but the user asks about 2020, DO NOT provide 2020 data.
16. **USE CORRECT SPELLING**: Always use the district/source names exactly as they appear in the document metadata below.

NOTE: These documents were retrieved using advanced visual search (ColPali), so they may contain tables, figures, or structured data.

TONE: Professional but friendly, like talking to a colleague."""),
            HumanMessage(content=f"""Conversation History:
{conversation_context}

Current User Question: {query}

Retrieved Documents: {len(documents)} documents found

CORRECT NAMES TO USE (from document metadata - use these exact spellings):
{correct_names}

Full Document Details:
{document_details}

CRITICAL:
- Responses must be grounded in what is available in the retrieved documents
- If the user asks about a specific year but the documents cover other years, explicitly state "can't provide response on ... because ..."
- Every factual claim MUST have a [Doc i] reference
- If information is not in the documents, explicitly state that it is not available
- **USE THE CORRECT DISTRICT/SOURCE NAMES from the document metadata above**

Generate a conversational response with proper document references:""")
        ])
        try:
            logger.info("📝 TEXT-ONLY GENERATION: Calling LLM...")
            response = self.llm.invoke(response_prompt.format_messages())
            response_text = response.content.strip()

            logger.info("📝 TEXT-ONLY GENERATION: LLM response received")
            logger.info(f"📝 TEXT-ONLY GENERATION: Response length: {len(response_text)} chars")
            logger.info(f"📝 TEXT-ONLY GENERATION: Response preview: {response_text[:300]}...")

            # Check whether the response indicates no information was found
            no_info_indicators = [
                "don't have", "do not have", "isn't available", "is not available",
                "no information", "cannot provide", "can't provide", "not in the retrieved"
            ]
            if any(indicator in response_text.lower() for indicator in no_info_indicators):
                logger.warning("⚠️ TEXT-ONLY GENERATION: Response indicates NO INFORMATION FOUND")
                logger.warning("⚠️ This could mean:")
                logger.warning("   1. Retrieved documents don't contain relevant content")
                logger.warning("   2. Extracted text from visual documents is empty/poor quality")
                logger.warning("   3. Query doesn't match document content")

            # Validate and enhance the response
            final_response = self._validate_and_enhance_response(
                response_text,
                documents,
                query,
                filters
            )
            logger.info("=" * 80)
            return final_response
        except Exception as e:
            logger.error(f"❌ TEXT-ONLY GENERATION: Error during generation: {e}")
            traceback.print_exc()
            return "I apologize, but I encountered an error generating the response."
    def _generate_conversational_response_without_docs(
        self,
        query: str,
        messages: List[Any]
    ) -> str:
        """
        Generate a conversational response using only LLM knowledge.

        Args:
            query: User query
            messages: Conversation history

        Returns:
            LLM response
        """
        logger.info("💬 RESPONSE GENERATION (NO DOCS): Starting response generation without documents")

        # Build conversation context
        conversation_context = ""
        for msg in messages[-6:]:
            if isinstance(msg, HumanMessage):
                conversation_context += f"User: {msg.content}\n"
            elif isinstance(msg, AIMessage):
                conversation_context += f"Assistant: {msg.content}\n"

        # Create the response prompt
        response_prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="""You are a helpful audit report assistant.

RULES:
1. Politely explain that no relevant documents were found with high enough similarity
2. Suggest rephrasing the query or being more specific
3. Suggest checking whether the information might be in a different year/source/district
4. Stay professional but friendly

TONE: Professional but friendly, like talking to a colleague."""),
            HumanMessage(content=f"""Current Question: {query}

Conversation History:
{conversation_context}

Note: No relevant documents were found with high enough similarity scores.

Generate a helpful response:""")
        ])

        try:
            response = self.llm.invoke(response_prompt.format_messages())
            return response.content.strip()
        except Exception as e:
            logger.error(f"❌ RESPONSE GENERATION (NO DOCS): Error: {e}")
            return "I couldn't find relevant documents for your query. Please try rephrasing or being more specific."
    def _build_conversation_context_for_response(self, messages: List[Any]) -> str:
        """Build conversation history context for response generation"""
        context_lines = []
        for msg in messages[-6:]:
            if isinstance(msg, HumanMessage):
                context_lines.append(f"User: {msg.content}")
            elif isinstance(msg, AIMessage):
                context_lines.append(f"Assistant: {msg.content}")
        return "\n".join(context_lines) if context_lines else "No previous conversation."
    def _build_visual_document_details(self, documents: List[Any]) -> str:
        """Build detailed document information for response generation"""
        if not documents:
            logger.warning("📋 BUILD_DETAILS: No documents to process!")
            return "No documents retrieved."

        logger.info(f"📋 BUILD_DETAILS: Processing {len(documents)} documents for LLM context")

        details = []
        docs_with_content = 0
        docs_without_content = 0
        total_content_length = 0

        for i, doc in enumerate(documents[:15], 1):
            metadata = getattr(doc, 'metadata', {})
            content = getattr(doc, 'page_content', '')
            score = getattr(doc, 'score', 0.0)

            filename = metadata.get('filename', 'Unknown')
            year = metadata.get('year', 'Unknown')
            district = metadata.get('district', 'Unknown')
            source = metadata.get('source', 'Unknown')
            page = metadata.get('page_number', metadata.get('page', 'Unknown'))

            # Visual metadata
            num_tiles = metadata.get('num_tiles')
            num_visual_tokens = metadata.get('num_visual_tokens')

            doc_info = f"[Doc {i}] (Score: {score:.3f})"
            doc_info += f"\n    Filename: {filename}"
            doc_info += f"\n    Year: {year}"
            doc_info += f"\n    Source: {source}"
            if district != 'Unknown':
                doc_info += f"\n    District: {district}"
            doc_info += f"\n    Page: {page}"

            # Add visual metadata if available (guard against missing values)
            if num_tiles is not None and num_visual_tokens is not None:
                doc_info += f"\n    Visual: {num_tiles} tiles, {num_visual_tokens} tokens"

            # Add a content preview
            if content and content.strip():
                doc_info += f"\n    Content: {content[:500]}{'...' if len(content) > 500 else ''}"
                docs_with_content += 1
                total_content_length += len(content)
            else:
                doc_info += "\n    Content: (No text extracted - image-only page)"
                docs_without_content += 1

            details.append(doc_info)

        # Log a summary
        logger.info("📋 BUILD_DETAILS SUMMARY:")
        logger.info(f"   - Documents with text content: {docs_with_content}")
        logger.info(f"   - Documents WITHOUT text (image-only): {docs_without_content}")
        logger.info(f"   - Total text content length: {total_content_length} chars")
        if docs_without_content > docs_with_content:
            logger.warning("⚠️ BUILD_DETAILS: Most documents have NO TEXT CONTENT!")
            logger.warning("⚠️ This is likely why the LLM says 'no information available'")

        return "\n\n".join(details) if details else "No document details available."
    def _extract_correct_names_from_documents(self, documents: List[Any]) -> str:
        """Extract correct district/source names from documents to correct misspellings"""
        districts = set()
        sources = set()
        years = set()

        for doc in documents:
            metadata = getattr(doc, 'metadata', {})
            if metadata.get('district'):
                districts.add(str(metadata['district']))
            if metadata.get('source'):
                sources.add(str(metadata['source']))
            if metadata.get('year'):
                years.add(str(metadata['year']))

        result = []
        if districts:
            result.append(f"Districts: {', '.join(sorted(districts))}")
        if sources:
            result.append(f"Sources: {', '.join(sorted(sources))}")
        if years:
            result.append(f"Years: {', '.join(sorted(years))}")

        if result:
            return "\n".join(result) + "\n\nIMPORTANT: Use these EXACT spellings in your response."
        return "No metadata available."

    def _validate_and_enhance_response(
        self,
        response: str,
        documents: List[Any],
        query: str,
        filters: Dict[str, Any] = None
    ) -> str:
        """
        Validate the response and add warnings about data coverage gaps.

        Compares the REQUESTED filters against the RETRIEVED document metadata.

        Args:
            response: LLM-generated response
            documents: Retrieved documents
            query: User query
            filters: Applied filters (year, district, etc.)

        Returns:
            Response with optional warnings appended
        """
        # Extract years and districts from the RETRIEVED documents
        doc_years = set()
        doc_districts = set()
        for doc in documents:
            metadata = getattr(doc, 'metadata', {}) if hasattr(doc, 'metadata') else {}
            if isinstance(metadata, dict):
                if metadata.get('year'):
                    doc_years.add(str(metadata['year']))
                if metadata.get('district'):
                    doc_districts.add(str(metadata['district']))

        logger.info(f"🔍 VALIDATION: Retrieved docs cover years={doc_years}, districts={doc_districts}")

        warnings = []

        # Get the REQUESTED filters
        requested_years = set()
        requested_districts = set()
        if filters:
            if filters.get('year'):
                if isinstance(filters['year'], list):
                    requested_years = set(str(y) for y in filters['year'])
                else:
                    requested_years = {str(filters['year'])}
            if filters.get('district'):
                if isinstance(filters['district'], list):
                    requested_districts = set(str(d) for d in filters['district'])
                else:
                    requested_districts = {str(filters['district'])}

        logger.info(f"🔍 VALIDATION: Requested years={requested_years}, districts={requested_districts}")

        # Compare requested vs retrieved YEARS
        if requested_years and doc_years:
            missing_years = requested_years - doc_years
            if missing_years:
                warnings.append(
                    f"You requested data for years {', '.join(sorted(requested_years))}, "
                    f"but the retrieved documents are missing {', '.join(sorted(missing_years))} "
                    f"(may not be available in the database)."
                )

        # Compare requested vs retrieved DISTRICTS
        if requested_districts and doc_districts:
            # Normalize for comparison (case-insensitive)
            requested_lower = {d.lower() for d in requested_districts}
            doc_lower = {d.lower() for d in doc_districts}
            missing_lower = requested_lower - doc_lower
            if missing_lower:
                missing_districts = [d for d in requested_districts if d.lower() in missing_lower]
                warnings.append(
                    f"You requested data for districts {', '.join(sorted(requested_districts))}, "
                    f"but the retrieved documents are missing {', '.join(sorted(missing_districts))} "
                    f"(may not be available in the database)."
                )
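
        # Illustrative outcome: requested years {2020, 2021} with retrieved
        # years {2021} appends "You requested data for years 2020, 2021, but
        # the retrieved documents are missing 2020 (may not be available in
        # the database)." to the warnings list.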

        # Append warnings to the response, if any
        if warnings and "⚠️" not in response:
            warning_text = "\n\n⚠️ **Note:** " + " ".join(warnings)
            response = response + warning_text
            logger.info("🔍 VALIDATION: Added warning about data coverage")

        return response

def get_visual_multi_agent_chatbot() -> VisualMultiAgentChatbot:
    """
    Factory function to create a visual multi-agent chatbot.

    Returns:
        Initialized VisualMultiAgentChatbot
    """
    logger.info("🎨 Creating Visual Multi-Agent Chatbot...")

    # Get Qdrant credentials for the ColPali cluster
    qdrant_url = (
        os.environ.get("QDRANT_URL_AKRYL") or
        os.environ.get("DEST_QDRANT_URL") or
        os.environ.get("QDRANT_URL")
    )
    qdrant_api_key = (
        os.environ.get("QDRANT_API_KEY_AKRYL") or
        os.environ.get("DEST_QDRANT_API_KEY") or
        os.environ.get("QDRANT_API_KEY")
    )

    # Get the collection name from an env var (default to colSmol-500M-v2 for new processing)
    collection_name = os.environ.get("QDRANT_COLLECTION_VISUAL", "colSmol-500M-v2")

    if not qdrant_url or not qdrant_api_key:
        raise ValueError(
            "Visual mode requires Qdrant credentials for the ColPali cluster.\n"
            "Please set one of these in your .env file:\n"
            "  - QDRANT_URL_AKRYL and QDRANT_API_KEY_AKRYL\n"
            "  - DEST_QDRANT_URL and DEST_QDRANT_API_KEY\n"
            "  - QDRANT_URL and QDRANT_API_KEY\n"
            "And optionally set QDRANT_COLLECTION_VISUAL (default: colSmol-500M-v2)"
        )

    logger.info(f"  Using Qdrant URL: {qdrant_url}")
    logger.info(f"  Collection: {collection_name}")
    logger.info(f"  Multi-modal: {MULTIMODAL_ENABLED} (model: {MULTIMODAL_MODEL}, max_images: {MULTIMODAL_MAX_IMAGES})")

    # Create the visual search adapter
    visual_search = VisualSearchAdapter(
        qdrant_url=qdrant_url,
        qdrant_api_key=qdrant_api_key,
        collection_name=collection_name
    )

    # Create the multi-agent chatbot with multi-modal enabled
    chatbot = VisualMultiAgentChatbot(
        visual_search=visual_search,
        config_path="src/config/settings.yaml",
        enable_multimodal=MULTIMODAL_ENABLED
    )

    return chatbot
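

if __name__ == "__main__":
    # Smoke-test sketch (assumes Qdrant credentials -- QDRANT_URL/QDRANT_API_KEY
    # or one of the *_AKRYL / DEST_* variants -- and, for multi-modal mode,
    # OPENAI_API_KEY are set in the environment).
    logging.basicConfig(level=logging.INFO)
    bot = get_visual_multi_agent_chatbot()
    print(f"Ready (multi-modal: {bot.enable_multimodal})")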