# audit_assistant/src/agents/visual_multi_agent_chatbot.py
"""
Visual Multi-Agent RAG Chatbot using LangGraph
This system uses the same sophisticated multi-agent architecture as v1,
but with visual document retrieval (ColPali) instead of text-based RAG.
Inherits from BaseMultiAgentChatbot to get:
- LLM-based query analysis
- Filter extraction and validation
- Query rewriting
- Main/RAG/Response agent orchestration
Only implements:
- Visual search retrieval
- Response generation with visual context
Phase 2 IMPLEMENTED: Multi-modal LLM support
- Top 3 images (by relevance) are sent directly to GPT-4o
- LLM can see tables, charts, figures directly
- Falls back to text-only if multi-modal fails
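
Example (a minimal sketch; assumes Qdrant credentials and OPENAI_API_KEY are
set, and uses a hypothetical `chat` entry point standing in for whatever
BaseMultiAgentChatbot actually exposes):
    bot = get_visual_multi_agent_chatbot()
    reply = bot.chat("Summarize the 2022 audit findings")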
"""
import os
import time
import base64
import logging
import traceback
import httpx
from typing import Dict, List, Any, Optional
from openai import OpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from src.agents.base_multi_agent_chatbot import BaseMultiAgentChatbot, MultiAgentState
from src.colpali.visual_search import VisualSearchAdapter
logger = logging.getLogger(__name__)
# Multi-modal LLM configuration
MULTIMODAL_MODEL = os.environ.get("VISUAL_RAG_MODEL", "gpt-4o") # GPT-4o supports vision
MULTIMODAL_MAX_IMAGES = int(os.environ.get("VISUAL_RAG_MAX_IMAGES", "3")) # Top N images by relevance score
MULTIMODAL_ENABLED = os.environ.get("VISUAL_RAG_MULTIMODAL", "true").lower() == "true" # Toggle for multi-modal mode
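# Example overrides (hypothetical values), e.g. in .env or the shell:
#   VISUAL_RAG_MODEL=gpt-4o-mini
#   VISUAL_RAG_MAX_IMAGES=2
#   VISUAL_RAG_MULTIMODAL=false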
class VisualMultiAgentChatbot(BaseMultiAgentChatbot):
"""Multi-agent chatbot with visual RAG (ColPali) and multi-modal response generation"""
def __init__(
self,
visual_search: VisualSearchAdapter,
config_path: str = "src/config/settings.yaml",
enable_multimodal: bool = True
):
"""
Initialize the visual multi-agent chatbot.
Args:
visual_search: Visual search adapter (ColPali)
config_path: Path to config file
enable_multimodal: Whether to use multi-modal LLM (GPT-4o with images)
"""
self.visual_search = visual_search
self.enable_multimodal = enable_multimodal and MULTIMODAL_ENABLED
# Initialize OpenAI client for multi-modal (GPT-4o)
self.openai_client = None
if self.enable_multimodal:
api_key = os.environ.get("OPENAI_API_KEY")
if api_key:
self.openai_client = OpenAI(api_key=api_key)
logger.info(f"πŸ–ΌοΈ Multi-modal LLM initialized: {MULTIMODAL_MODEL}")
else:
logger.warning("⚠️ OPENAI_API_KEY not set, multi-modal disabled")
self.enable_multimodal = False
# Call parent init (sets up LLM, filters, graph, etc.)
super().__init__(config_path)
logger.info(f"🎨 Visual Multi-Agent Chatbot initialized (multi-modal: {self.enable_multimodal})")
def _perform_retrieval(self, query: str, filters: Dict[str, Any]) -> Any:
"""
Perform visual search retrieval.
Args:
query: The rewritten query
filters: The filters to apply
Returns:
Result object with .sources and .answer attributes
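
        Example (illustrative values):
            filters {"year": [2022], "district": ["Gulu"]} become
            {"years": [2022], "districts": ["Gulu"]} before the visual
            index is queried.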
"""
logger.info(f"πŸ” VISUAL RETRIEVAL: Performing visual search")
logger.info(f"πŸ” VISUAL RETRIEVAL: Query: '{query}'")
logger.info(f"πŸ” VISUAL RETRIEVAL: Filters: {filters}")
# Convert filters to visual search format
visual_filters = {}
if filters.get("sources"):
visual_filters["sources"] = filters["sources"]
if filters.get("year"):
# Convert to "years" (plural) for visual search
visual_filters["years"] = filters["year"]
if filters.get("district"):
# Convert to "districts" (plural) for visual search
visual_filters["districts"] = filters["district"]
if filters.get("filenames"):
visual_filters["filenames"] = filters["filenames"]
logger.info(f"πŸ” VISUAL RETRIEVAL: Converted filters: {visual_filters}")
        # Lightweight result container matching the .sources/.answer
        # interface the base class expects
        class Result:
            def __init__(self, sources, answer=""):
                self.sources = sources
                self.answer = answer

        # Perform visual search
        try:
            visual_results = self.visual_search.search(
                query=query,
                top_k=10,
                filters=visual_filters,
                search_strategy="multi_vector"
            )
            logger.info(f"πŸ” VISUAL RETRIEVAL: Retrieved {len(visual_results)} visual documents")
            return Result(visual_results, "")
        except Exception as e:
            logger.error(f"❌ VISUAL RETRIEVAL: Error during visual search: {e}")
            traceback.print_exc()
            # Fall back to an empty result so the caller treats it as "no documents"
            return Result([], "")
def _response_agent(self, state: MultiAgentState) -> MultiAgentState:
"""
Override response agent for Visual RAG.
Visual RAG uses MaxSim scores (typically 10-30+) instead of cosine similarity (0-1),
so we skip the similarity score threshold check that's in the base class.
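        (MaxSim sums, over query tokens, the maximum patch similarity on the
        page, so its scale grows with query length; a fixed 0-1 cosine
        threshold would reject everything.)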
"""
logger.info("πŸ“ VISUAL RESPONSE AGENT: Starting document retrieval and answer generation")
rag_query = state["rag_query"]
filters = state["rag_filters"]
logger.info(f"πŸ“ VISUAL RESPONSE AGENT: Query: '{rag_query}'")
logger.info(f"πŸ“ VISUAL RESPONSE AGENT: Filters: {filters}")
try:
# Call visual retrieval
result = self._perform_retrieval(rag_query, filters)
state["retrieved_documents"] = result.sources
state["agent_logs"].append(f"VISUAL RESPONSE AGENT: Retrieved {len(result.sources)} documents")
logger.info(f"πŸ“ VISUAL RESPONSE AGENT: Retrieved {len(result.sources)} documents")
# For Visual RAG, we don't check similarity scores (MaxSim scores are different scale)
# Just check if we have any documents
if not result.sources:
logger.warning(f"⚠️ VISUAL RESPONSE AGENT: No documents retrieved, using LLM knowledge only")
response = self._generate_conversational_response_without_docs(
state["current_query"],
state["messages"]
)
else:
# Generate conversational response with documents
response = self._generate_conversational_response(
state["current_query"],
result.sources,
result.answer,
state["messages"],
filters
)
state["final_response"] = response
state["last_ai_message_time"] = time.time()
logger.info(f"πŸ“ VISUAL RESPONSE AGENT: Answer generation complete")
except Exception as e:
logger.error(f"❌ VISUAL RESPONSE AGENT ERROR: {e}")
traceback.print_exc()
state["final_response"] = "I apologize, but I encountered an error while retrieving visual documents. Please try again."
state["last_ai_message_time"] = time.time()
return state
def _fetch_image_as_base64(self, image_url: str, timeout: float = 10.0) -> Optional[str]:
"""
Fetch an image from URL and convert to base64.
Args:
image_url: URL of the image
timeout: Request timeout in seconds
Returns:
Base64 encoded image string, or None if failed
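
        Example (hypothetical URL):
            "https://example.com/page.png" -> "data:image/png;base64,iVBORw0..."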
"""
try:
with httpx.Client(timeout=timeout) as client:
response = client.get(image_url)
response.raise_for_status()
# Determine content type
content_type = response.headers.get('content-type', 'image/jpeg')
if 'png' in content_type:
media_type = 'image/png'
elif 'gif' in content_type:
media_type = 'image/gif'
elif 'webp' in content_type:
media_type = 'image/webp'
else:
media_type = 'image/jpeg'
# Encode to base64
base64_image = base64.b64encode(response.content).decode('utf-8')
return f"data:{media_type};base64,{base64_image}"
except Exception as e:
logger.warning(f"⚠️ Failed to fetch image {image_url}: {e}")
return None
def _generate_multimodal_response(
self,
query: str,
documents: List[Any],
conversation_context: str,
correct_names: str,
        filters: Optional[Dict[str, Any]] = None
) -> Optional[str]:
"""
Generate response using GPT-4o with images (multi-modal).
Sends top 3 images by relevance score directly to the LLM.
Args:
query: User query
documents: Retrieved visual documents (sorted by score)
conversation_context: Formatted conversation history
correct_names: Correct district/source names from metadata
filters: Applied filters
Returns:
LLM response string, or None if multi-modal generation failed
"""
if not self.openai_client or not self.enable_multimodal:
logger.info("πŸ–ΌοΈ Multi-modal disabled, skipping")
return None
logger.info("=" * 80)
logger.info("πŸ–ΌοΈ MULTI-MODAL RESPONSE GENERATION: Starting")
logger.info("=" * 80)
# Get top N images by relevance score
top_docs = documents[:MULTIMODAL_MAX_IMAGES]
logger.info(f"πŸ–ΌοΈ MULTI-MODAL: Processing top {len(top_docs)} documents for image injection")
# Fetch images and build content
image_contents = []
image_descriptions = []
for i, doc in enumerate(top_docs):
metadata = getattr(doc, 'metadata', {})
content = getattr(doc, 'page_content', '')
score = getattr(doc, 'score', 0.0)
# Get image URL (prefer original for better quality)
image_url = metadata.get('original_url') or metadata.get('resized_url') or metadata.get('page')
if image_url and isinstance(image_url, str) and image_url.startswith('http'):
logger.info(f"πŸ–ΌοΈ MULTI-MODAL: Fetching image {i+1}: {image_url[:80]}...")
# Fetch and encode image
base64_image = self._fetch_image_as_base64(image_url)
if base64_image:
image_contents.append({
"type": "image_url",
"image_url": {
"url": base64_image,
"detail": "high" # High detail for document analysis
}
})
# Build description for this image
desc = f"[Image {i+1}] "
desc += f"File: {metadata.get('filename', 'Unknown')}, "
desc += f"Page: {metadata.get('page_number', 'N/A')}, "
desc += f"Year: {metadata.get('year', 'N/A')}, "
desc += f"District: {metadata.get('district', 'N/A')}, "
desc += f"Score: {score:.3f}"
if content:
desc += f"\nExtracted text preview: {content[:300]}..."
image_descriptions.append(desc)
logger.info(f"πŸ–ΌοΈ MULTI-MODAL: Image {i+1} loaded successfully")
else:
logger.warning(f"⚠️ MULTI-MODAL: Failed to load image {i+1}")
else:
logger.warning(f"⚠️ MULTI-MODAL: No valid image URL for doc {i+1}")
if not image_contents:
logger.warning("⚠️ MULTI-MODAL: No images loaded, falling back to text-only")
return None
logger.info(f"πŸ–ΌοΈ MULTI-MODAL: {len(image_contents)} images loaded for LLM")
# Build the multi-modal prompt
system_prompt = """You are a helpful audit report assistant with the ability to SEE document images directly.
CRITICAL RULES - VISUAL ANALYSIS:
1. **LOOK AT THE IMAGES**: You can see the actual document pages. Analyze tables, charts, figures, and text directly.
2. **ONLY use information visible in the images or provided text** - DO NOT hallucinate
3. **EVERY claim MUST reference which image/document it came from** using [Image 1], [Image 2], etc.
4. **If you see tables or figures in the images, describe what they show**
5. **If information is not visible in any image, explicitly state that**
6. **USE CORRECT NAMES**: Always use the exact names from document metadata provided.
RESPONSE STYLE:
- Be conversational, not technical
- Use bullet points and lists when appropriate
- Reference specific images: "As shown in [Image 1]..." or "The table in [Image 2] indicates..."
- If multiple images show similar information, cite all: [Image 1, Image 2]
- Don't describe the image layout/format, focus on the CONTENT
TONE: Professional but friendly, like talking to a colleague who can see the same documents."""
# Build user message with images
user_content = []
# Add text context first
text_context = f"""Conversation History:
{conversation_context}
Current User Question: {query}
CORRECT NAMES TO USE (from document metadata):
{correct_names}
Image Descriptions (for reference):
{chr(10).join(image_descriptions)}
INSTRUCTIONS:
1. LOOK at the {len(image_contents)} document images below
2. Answer the user's question based on what you SEE in the images
3. Reference specific images when citing information
4. If the answer isn't visible in any image, say so
Now analyze the images and answer the question:"""
user_content.append({
"type": "text",
"text": text_context
})
# Add images
user_content.extend(image_contents)
try:
logger.info(f"πŸ–ΌοΈ MULTI-MODAL: Calling {MULTIMODAL_MODEL}...")
response = self.openai_client.chat.completions.create(
model=MULTIMODAL_MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content}
],
max_tokens=2000,
temperature=0.3
)
response_text = response.choices[0].message.content.strip()
# Detailed token usage logging
usage = response.usage
logger.info(f"πŸ–ΌοΈ MULTI-MODAL: Response received")
logger.info(f"πŸ–ΌοΈ MULTI-MODAL: Response length: {len(response_text)} chars")
logger.info(f"πŸ–ΌοΈ MULTI-MODAL: Response preview: {response_text[:300]}...")
logger.info(f"πŸ–ΌοΈ MULTI-MODAL: Token usage breakdown:")
logger.info(f" πŸ“₯ Input tokens (prompt + images): {usage.prompt_tokens}")
logger.info(f" πŸ“€ Output tokens (response): {usage.completion_tokens}")
logger.info(f" πŸ“Š Total tokens: {usage.total_tokens}")
# Estimate cost (GPT-4o pricing: $2.50/1M input, $10/1M output as of 2024)
input_cost = (usage.prompt_tokens / 1_000_000) * 2.50
output_cost = (usage.completion_tokens / 1_000_000) * 10.00
total_cost = input_cost + output_cost
logger.info(f" πŸ’° Estimated cost: ${total_cost:.4f} (input: ${input_cost:.4f}, output: ${output_cost:.4f})")
return response_text
except Exception as e:
logger.error(f"❌ MULTI-MODAL ERROR: {e}")
traceback.print_exc()
return None
def _generate_conversational_response(
self,
query: str,
documents: List[Any],
rag_answer: str,
messages: List[Any],
        filters: Optional[Dict[str, Any]] = None
) -> str:
"""
Generate conversational response from visually retrieved documents.
Phase 2 IMPLEMENTED: Multi-modal LLM support
- Sends top 3 images (by relevance) directly to GPT-4o
- LLM can see tables, charts, figures directly
- Falls back to text-only if multi-modal fails
Current implementation: Multi-modal with text fallback
Args:
query: User query
documents: Retrieved visual documents
rag_answer: RAG answer (empty for visual search)
            messages: Conversation history
            filters: Applied filters (year, district, etc.), if any
Returns:
LLM response
"""
logger.info("=" * 80)
logger.info("πŸ’¬ VISUAL RESPONSE GENERATION: Starting")
logger.info("=" * 80)
logger.info(f"πŸ’¬ VISUAL RESPONSE GENERATION: Processing {len(documents)} visual documents")
logger.info(f"πŸ’¬ VISUAL RESPONSE GENERATION: Query: '{query}'")
logger.info(f"πŸ’¬ VISUAL RESPONSE GENERATION: Filters applied: {filters}")
# Log each document's metadata and content preview
for i, doc in enumerate(documents[:5]): # Log first 5 docs
metadata = getattr(doc, 'metadata', {})
content = getattr(doc, 'page_content', '')
logger.info(f"πŸ“„ DOC {i+1} METADATA: filename={metadata.get('filename', 'N/A')}, "
f"year={metadata.get('year', 'N/A')}, district={metadata.get('district', 'N/A')}, "
f"source={metadata.get('source', 'N/A')}, page={metadata.get('page_number', 'N/A')}")
logger.info(f"πŸ“„ DOC {i+1} CONTENT PREVIEW: {content[:200]}..." if content else f"πŸ“„ DOC {i+1} CONTENT: EMPTY/NONE")
logger.info(f"πŸ“„ DOC {i+1} CONTENT LENGTH: {len(content)} chars" if content else "πŸ“„ DOC {i+1} CONTENT LENGTH: 0")
# Build conversation history context
conversation_context = self._build_conversation_context_for_response(messages)
# Build detailed document information
document_details = self._build_visual_document_details(documents)
logger.info(f"πŸ’¬ VISUAL RESPONSE GENERATION: Document details length: {len(document_details)} chars")
# Extract correct names from documents
correct_names = self._extract_correct_names_from_documents(documents)
logger.info(f"πŸ’¬ VISUAL RESPONSE GENERATION: Correct names: {correct_names}")
# ============================================================
# PHASE 2: Try multi-modal generation first (GPT-4o with images)
# ============================================================
if self.enable_multimodal:
logger.info("πŸ–ΌοΈ VISUAL RESPONSE GENERATION: Attempting multi-modal generation (GPT-4o with images)...")
multimodal_response = self._generate_multimodal_response(
query=query,
documents=documents,
conversation_context=conversation_context,
correct_names=correct_names,
filters=filters
)
if multimodal_response:
logger.info("βœ… VISUAL RESPONSE GENERATION: Multi-modal generation successful!")
# Validate and enhance the response
final_response = self._validate_and_enhance_response(
multimodal_response,
documents,
query,
filters
)
logger.info("=" * 80)
return final_response
else:
logger.warning("⚠️ VISUAL RESPONSE GENERATION: Multi-modal failed, falling back to text-only")
else:
logger.info("πŸ“ VISUAL RESPONSE GENERATION: Multi-modal disabled, using text-only mode")
# ============================================================
# FALLBACK: Text-only generation (extracted text from pages)
# ============================================================
logger.info("πŸ“ VISUAL RESPONSE GENERATION: Using text-only mode")
# Create response prompt
response_prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are a helpful audit report assistant. Generate a natural, conversational response.
CRITICAL RULES - NO HALLUCINATION:
1. **ONLY use information from the retrieved documents provided below**
2. **EVERY sentence with facts, numbers, or specific claims MUST have a [Doc i] reference**
3. **If a document doesn't contain the information, DO NOT make it up**
4. **If the user asks about a year/district that's NOT in the retrieved documents, explicitly state that**
5. **Check the document years/districts before making any claims about them**
6. **USE CORRECT NAMES**: Always use the exact names from document metadata, not misspellings from conversation.
RULES:
1. Answer the user's question directly and clearly
2. Use ONLY the retrieved documents as evidence - DO NOT use your training data
3. Be conversational, not technical
4. Don't mention scores, retrieval details, or technical implementation
5. If relevant documents were found, reference them naturally
6. If no relevant documents, say you do not have enough information - DO NOT hallucinate
7. If the passages have useful facts or numbers, use them in your answer WITH references
8. **MANDATORY**: When you use information from a passage, mention where it came from by using [Doc i] at the end of the sentence.
9. Do not write 'Doc i says ...' in prose; use the bracketed [Doc i] form instead.
10. If the same thing is said in more than one document, you can mention all of them like this: [Doc i, Doc j, Doc k]
11. Do not just summarize each passage one by one. Group your summaries to highlight the key parts in the explanation.
12. If it makes sense, use bullet points and lists to make your answers easier to understand.
13. You do not need to use every passage. Only use the ones that help answer the question.
14. **VERIFY**: Before mentioning any year, district, or number, check that it exists in the retrieved documents.
15. **NO HALLUCINATION**: If documents show years 2021, 2022, 2023 but user asks about 2020, DO NOT provide 2020 data.
16. **USE CORRECT SPELLING**: Always use the district/source names exactly as they appear in the document metadata below.
NOTE: These documents were retrieved using advanced visual search (ColPali), so they may contain tables, figures, or structured data.
TONE: Professional but friendly, like talking to a colleague."""),
HumanMessage(content=f"""Conversation History:
{conversation_context}
Current User Question: {query}
Retrieved Documents: {len(documents)} documents found
CORRECT NAMES TO USE (from document metadata - use these exact spellings):
{correct_names}
Full Document Details:
{document_details}
CRITICAL:
- Responses should be grounded to what is available in the retrieved documents
- If the user asks about a specific year but the documents cover other years, explicitly state that you can't provide a response for that year and why
- Every factual claim MUST have [Doc i] reference
- If information is not in documents, explicitly state it's not available
- **USE THE CORRECT DISTRICT/SOURCE NAMES from the document metadata above**
Generate a conversational response with proper document references:""")
])
try:
logger.info(f"πŸ“ TEXT-ONLY GENERATION: Calling LLM...")
response = self.llm.invoke(response_prompt.format_messages())
response_text = response.content.strip()
logger.info(f"πŸ“ TEXT-ONLY GENERATION: LLM response received")
logger.info(f"πŸ“ TEXT-ONLY GENERATION: Response length: {len(response_text)} chars")
logger.info(f"πŸ“ TEXT-ONLY GENERATION: Response preview: {response_text[:300]}...")
# Check if response indicates no information found
no_info_indicators = [
"don't have", "do not have", "isn't available", "is not available",
"no information", "cannot provide", "can't provide", "not in the retrieved"
]
if any(indicator in response_text.lower() for indicator in no_info_indicators):
logger.warning("⚠️ TEXT-ONLY GENERATION: Response indicates NO INFORMATION FOUND")
logger.warning("⚠️ This could mean:")
logger.warning(" 1. Retrieved documents don't contain relevant content")
logger.warning(" 2. Extracted text from visual documents is empty/poor quality")
logger.warning(" 3. Query doesn't match document content")
# Validate and enhance the response
final_response = self._validate_and_enhance_response(
response_text,
documents,
query,
filters
)
logger.info("=" * 80)
return final_response
except Exception as e:
logger.error(f"❌ TEXT-ONLY GENERATION: Error during generation: {e}")
traceback.print_exc()
return "I apologize, but I encountered an error generating the response."
def _generate_conversational_response_without_docs(
self,
query: str,
messages: List[Any]
) -> str:
"""
Generate conversational response using only LLM knowledge.
Args:
query: User query
messages: Conversation history
Returns:
LLM response
"""
logger.info("πŸ’¬ RESPONSE GENERATION (NO DOCS): Starting response generation without documents")
        # Build conversation context (reuses the shared helper defined below)
        conversation_context = self._build_conversation_context_for_response(messages)
# Create response prompt
response_prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are a helpful audit report assistant.
RULES:
1. Politely explain that no relevant documents were found with high enough similarity
2. Suggest rephrasing the query or being more specific
3. Suggest checking if the information might be in a different year/source/district
4. Stay professional but friendly
TONE: Professional but friendly, like talking to a colleague."""),
HumanMessage(content=f"""Current Question: {query}
Conversation History:
{conversation_context}
Note: No relevant documents were found with high enough similarity scores.
Generate a helpful response:""")
])
try:
response = self.llm.invoke(response_prompt.format_messages())
return response.content.strip()
except Exception as e:
logger.error(f"❌ RESPONSE GENERATION (NO DOCS): Error: {e}")
return "I couldn't find relevant documents for your query. Please try rephrasing or being more specific."
def _build_conversation_context_for_response(self, messages: List[Any]) -> str:
"""Build conversation history context for response generation"""
context_lines = []
for msg in messages[-6:]:
if isinstance(msg, HumanMessage):
context_lines.append(f"User: {msg.content}")
elif isinstance(msg, AIMessage):
context_lines.append(f"Assistant: {msg.content}")
return "\n".join(context_lines) if context_lines else "No previous conversation."
def _build_visual_document_details(self, documents: List[Any]) -> str:
"""Build detailed document information for response generation"""
if not documents:
logger.warning("πŸ“„ BUILD_DETAILS: No documents to process!")
return "No documents retrieved."
logger.info(f"πŸ“„ BUILD_DETAILS: Processing {len(documents)} documents for LLM context")
details = []
docs_with_content = 0
docs_without_content = 0
total_content_length = 0
for i, doc in enumerate(documents[:15], 1):
metadata = getattr(doc, 'metadata', {})
content = getattr(doc, 'page_content', '')
score = getattr(doc, 'score', 0.0)
filename = metadata.get('filename', 'Unknown')
year = metadata.get('year', 'Unknown')
district = metadata.get('district', 'Unknown')
source = metadata.get('source', 'Unknown')
page = metadata.get('page_number', metadata.get('page', 'Unknown'))
# Visual metadata
num_tiles = metadata.get('num_tiles')
num_visual_tokens = metadata.get('num_visual_tokens')
doc_info = f"[Doc {i}] (Score: {score:.3f})"
doc_info += f"\n Filename: {filename}"
doc_info += f"\n Year: {year}"
doc_info += f"\n Source: {source}"
if district != 'Unknown':
doc_info += f"\n District: {district}"
doc_info += f"\n Page: {page}"
# Add visual metadata if available
if num_tiles or num_visual_tokens:
doc_info += f"\n Visual: {num_tiles} tiles, {num_visual_tokens} tokens"
# Add content preview
if content and content.strip():
doc_info += f"\n Content: {content[:500]}{'...' if len(content) > 500 else ''}"
docs_with_content += 1
total_content_length += len(content)
else:
doc_info += "\n Content: (No text extracted - image-only page)"
docs_without_content += 1
details.append(doc_info)
# Log summary
logger.info(f"πŸ“„ BUILD_DETAILS SUMMARY:")
logger.info(f" - Documents with text content: {docs_with_content}")
logger.info(f" - Documents WITHOUT text (image-only): {docs_without_content}")
logger.info(f" - Total text content length: {total_content_length} chars")
if docs_without_content > docs_with_content:
logger.warning(f"⚠️ BUILD_DETAILS: Most documents have NO TEXT CONTENT!")
logger.warning(f"⚠️ This is likely why the LLM says 'no information available'")
return "\n\n".join(details) if details else "No document details available."
def _extract_correct_names_from_documents(self, documents: List[Any]) -> str:
"""Extract correct district/source names from documents to correct misspellings"""
districts = set()
sources = set()
years = set()
for doc in documents:
metadata = getattr(doc, 'metadata', {})
if metadata.get('district'):
districts.add(str(metadata['district']))
if metadata.get('source'):
sources.add(str(metadata['source']))
if metadata.get('year'):
years.add(str(metadata['year']))
result = []
if districts:
result.append(f"Districts: {', '.join(sorted(districts))}")
if sources:
result.append(f"Sources: {', '.join(sorted(sources))}")
if years:
result.append(f"Years: {', '.join(sorted(years))}")
if result:
return "\n".join(result) + "\n\nIMPORTANT: Use these EXACT spellings in your response."
return "No metadata available."
def _validate_and_enhance_response(
self,
response: str,
documents: List[Any],
query: str,
        filters: Optional[Dict[str, Any]] = None
) -> str:
"""
Validate response and add warnings about data coverage gaps.
Compares REQUESTED filters against RETRIEVED document metadata.
Args:
response: LLM-generated response
documents: Retrieved documents
query: User query
filters: Applied filters (year, district, etc.)
Returns:
Response with optional warnings appended
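
        Example (illustrative): requesting years 2020 and 2021 when the
        retrieved documents cover only 2021 appends a note that 2020 data
        is missing.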
"""
        # Extract years and districts from RETRIEVED documents
        doc_years = set()
        doc_districts = set()
        for doc in documents:
            metadata = getattr(doc, 'metadata', {})
if isinstance(metadata, dict):
if metadata.get('year'):
doc_years.add(str(metadata['year']))
if metadata.get('district'):
doc_districts.add(str(metadata['district']))
logger.info(f"πŸ“Š VALIDATION: Retrieved docs cover years={doc_years}, districts={doc_districts}")
warnings = []
# Get REQUESTED filters
requested_years = set()
requested_districts = set()
if filters:
if filters.get('year'):
if isinstance(filters['year'], list):
requested_years = set(str(y) for y in filters['year'])
else:
requested_years = {str(filters['year'])}
if filters.get('district'):
if isinstance(filters['district'], list):
requested_districts = set(str(d) for d in filters['district'])
else:
requested_districts = {str(filters['district'])}
logger.info(f"πŸ“Š VALIDATION: Requested years={requested_years}, districts={requested_districts}")
# Compare requested vs retrieved YEARS
if requested_years and doc_years:
missing_years = requested_years - doc_years
if missing_years:
warnings.append(
f"You requested data for years {', '.join(sorted(requested_years))}, "
f"but the retrieved documents are missing {', '.join(sorted(missing_years))} "
f"(may not be available in the database)."
)
# Compare requested vs retrieved DISTRICTS
if requested_districts and doc_districts:
# Normalize for comparison (case-insensitive)
requested_lower = {d.lower() for d in requested_districts}
doc_lower = {d.lower() for d in doc_districts}
missing_lower = requested_lower - doc_lower
if missing_lower:
missing_districts = [d for d in requested_districts if d.lower() in missing_lower]
warnings.append(
f"You requested data for districts {', '.join(sorted(requested_districts))}, "
f"but the retrieved documents are missing {', '.join(sorted(missing_districts))} "
f"(may not be available in the database)."
)
# Add warnings to response if any
if warnings and "⚠️" not in response:
warning_text = "\n\n⚠️ **Note:** " + " ".join(warnings)
response = response + warning_text
logger.info(f"πŸ“Š VALIDATION: Added warning about data coverage")
return response
def get_visual_multi_agent_chatbot() -> VisualMultiAgentChatbot:
"""
Factory function to create a visual multi-agent chatbot.
Returns:
Initialized VisualMultiAgentChatbot
"""
logger.info("🎨 Creating Visual Multi-Agent Chatbot...")
# Get Qdrant credentials for ColPali cluster
qdrant_url = (
os.environ.get("QDRANT_URL_AKRYL") or
os.environ.get("DEST_QDRANT_URL") or
os.environ.get("QDRANT_URL")
)
qdrant_api_key = (
os.environ.get("QDRANT_API_KEY_AKRYL") or
os.environ.get("DEST_QDRANT_API_KEY") or
os.environ.get("QDRANT_API_KEY")
)
# Get collection name from env var (default to colSmol-500M-v2 for new processing)
collection_name = os.environ.get("QDRANT_COLLECTION_VISUAL", "colSmol-500M-v2")
if not qdrant_url or not qdrant_api_key:
raise ValueError(
"Visual mode requires Qdrant credentials for the ColPali cluster.\n"
"Please set one of these in your .env file:\n"
" - QDRANT_URL_AKRYL and QDRANT_API_KEY_AKRYL\n"
" - DEST_QDRANT_URL and DEST_QDRANT_API_KEY\n"
" - QDRANT_URL and QDRANT_API_KEY\n"
"And optionally set QDRANT_COLLECTION_VISUAL (default: colSmol-500M-v2)"
)
logger.info(f" Using Qdrant URL: {qdrant_url}")
logger.info(f" Collection: {collection_name}")
logger.info(f" Multi-modal: {MULTIMODAL_ENABLED} (model: {MULTIMODAL_MODEL}, max_images: {MULTIMODAL_MAX_IMAGES})")
# Create visual search adapter
visual_search = VisualSearchAdapter(
qdrant_url=qdrant_url,
qdrant_api_key=qdrant_api_key,
collection_name=collection_name
)
# Create multi-agent chatbot with multi-modal enabled
chatbot = VisualMultiAgentChatbot(
visual_search=visual_search,
config_path="src/config/settings.yaml",
enable_multimodal=MULTIMODAL_ENABLED
)
return chatbot