# AgenticRAG_test / rag_pipeline.py
# Author: Zeggai Abdellah
# Commit a6cedca — fix max iterations
# -*- coding: utf-8 -*-
"""
Enhanced RAG Pipeline for vaccine assistant with fallback system
Handles agent creation and question answering with sequential citation numbering
Includes fallback agent for max iterations handling
"""
import json
import re
from llama_index.core import PromptTemplate
from llama_index.core.agent import ReActAgent
from llama_index.llms.google_genai import GoogleGenAI
from langdetect import detect
import os
import time
def extract_source_ids(response_text):
    """
    Extract unique source IDs from inline citations in a response.

    Handles three citation formats:
      - Single source:             [Source ID]
      - Adjacent citations:        [Source ID1][Source ID2]
      - Comma-separated citation:  [Source ID1, Source ID2]

    Args:
        response_text (str): The generated response text with inline citations.

    Returns:
        list of str: Unique source IDs in order of first appearance
        (empty list when no citations are found).
    """
    # FIX: removed the redundant function-level `import re` — the module
    # already imports `re` at the top of the file.
    print(f"[LOG] Extracting source IDs from response text (length: {len(response_text)} chars)")
    # Normalize adjacent bracket citations [ID1][ID2] into one bracket with
    # comma separation so a single regex pass can capture every ID.
    consolidated_text = re.sub(r'\][\s]*\[', '][', response_text)
    consolidated_text = re.sub(r'\]\[', ', ', consolidated_text)
    # Capture the contents of every bracket pair (single or comma-separated IDs)
    inline_citations = re.findall(r'\[([^\[\]]+)\]', consolidated_text)
    print(f"[LOG] Found {len(inline_citations)} inline citations")
    if not inline_citations:
        print("Warning: No source IDs found in the response text.")
        return []
    # Flatten citations, splitting comma-separated IDs and trimming whitespace
    all_ids = []
    for citation in inline_citations:
        all_ids.extend(id_str.strip() for id_str in citation.split(','))
    # Deduplicate while preserving first-seen order
    seen = set()
    source_ids = []
    for id_str in all_ids:
        if id_str not in seen:
            seen.add(id_str)
            source_ids.append(id_str)
    print(f"[LOG] Extracted {len(source_ids)} unique source IDs: {source_ids[:3]}{'...' if len(source_ids) > 3 else ''}")
    if not source_ids:
        print("Warning: No valid source IDs found after filtering.")
        return []
    return source_ids
def convert_citations_to_sequential(response_text, source_id_to_number_map):
    """
    Rewrite bracketed source-ID citations as sequential citation numbers.

    Args:
        response_text (str): Text containing [Source ID] style citations.
        source_id_to_number_map (dict): Maps each source ID to its number.

    Returns:
        str: Text where every mapped citation becomes [n] or [n1,n2];
        brackets containing no mapped IDs are left untouched.
    """
    print(f"[LOG] Converting {len(source_id_to_number_map)} source IDs to sequential numbers")

    def _renumber(match):
        # A single bracket may hold one ID or several comma-separated IDs.
        mapped = [
            str(source_id_to_number_map[token.strip()])
            for token in match.group(1).split(',')
            if token.strip() in source_id_to_number_map
        ]
        if not mapped:
            # No known IDs in this bracket -> keep the original citation text.
            return match.group(0)
        return f"[{','.join(mapped)}]"

    sequential_response = re.sub(r'\[([^\[\]]+)\]', _renumber, response_text)
    print("[LOG] Successfully converted citations to sequential format")
    return sequential_response
def create_safe_custom_prompt(tools, llm, is_fallback=False):
    """
    Build a system prompt that prepends our medical-assistant instructions
    to the agent's stock ReAct system prompt.

    Prepending (rather than replacing) keeps the ReAct template's own
    formatting placeholders intact, avoiding template-variable conflicts.

    Args:
        tools: Tools the agent will use (needed to derive the stock template).
        llm: LLM instance used to instantiate a throwaway agent.
        is_fallback (bool): Select the reduced-tool fallback instruction set.

    Returns:
        PromptTemplate: Combined prompt, preserving the original template
        variables and metadata when possible.
    """
    print(f"[LOG] Creating {'fallback' if is_fallback else 'standard'} custom prompt with {len(tools)} tools")
    if is_fallback:
        custom_instructions = """
## MEDICAL ASSISTANT ROLE - FALLBACK MODE
You are a helpful and knowledgeable AI-powered vaccine assistant designed to support doctors in clinical decision-making.
You are operating in FALLBACK MODE with access to only the most essential and comprehensive tools.
You provide evidence-based guidance using only information from official vaccine medical documents.
Answer the doctor's question accurately and concisely using only the provided information.
## FALLBACK MODE INSTRUCTIONS
- You have access to only 2 powerful tools: general_guide_tool (Algerian National Vaccination Guide) and who_immunization_tool (WHO global guidance).
- **MANDATORY TOOL USAGE**: Always use the relevant tool(s) to search for information before answering, even if you initially think no information is available.
- Be direct and efficient - search once with each tool if needed, then provide your answer.
- Do not overthink or search repeatedly - these tools are comprehensive.
## IMPORTANT REQUIREMENTS
### Citation and Sourcing
1. For each fact in your response, include an inline citation in the format [Source ID] immediately following the information, e.g., [e795ebd28318886c0b1a5395ac30ad90].
2. The Source ID must be the exact alphanumeric identifier from the search results, NOT the tool name or any other text.
3. Do NOT use 'Source:' in the citation format; use only the Source ID in square brackets.
4. Do NOT use tool names (like general_guide_tool, who_immunization_tool) as citations.
5. If a fact is supported by multiple sources, use adjacent citations: [e795ebd28318886c0b1a5395ac30ad90][21a932b2340bb16707763f57f0ad2]
6. Use ONLY the provided information from tool outputs and never include facts from your general knowledge.
### Content Formatting
1. When rendering tables:
- Convert HTML tables into clean Markdown format.
- Preserve all original headers and data rows exactly.
- Include the citation in the table caption, e.g., 'Table: Vaccination Schedule [Source ID]'.
2. For lists, maintain the original bullet points/numbering and include citations.
3. Present information concisely but ensure clinical accuracy is never compromised.
### CRITICAL: Efficient Fallback Strategy
1. **MANDATORY SEARCH**: Use each relevant tool at least once to search for information, even if you suspect the information might not be available.
2. **BREAK DOWN COMPLEX QUERIES**: For comparative or multi-part questions (e.g., comparing Algerian and WHO guidelines), break the query into sub-queries and use the appropriate tool for each part.
3. **DO NOT STOP PREMATURELY**: Do not conclude "no information is available" without using the relevant tool(s) to search for the answer.
4. **BE DECISIVE**: Once you find relevant information for each sub-query, formulate your response immediately.
5. **ANSWER FULLY**: Address all parts of the question, using multiple tools if required by the query.
6. **FINAL ANSWER**: Once you have your answer, present it directly. Do not output your internal 'thought' or 'action' steps. Your final output must be the synthesized answer itself.
### Response Guidelines
- **MANDATORY TOOL SELECTION**:
- For queries mentioning "WHO," "World Health Organization," "international," "global guidance," or WHO documents, use who_immunization_tool first.
- For queries mentioning "Algerian," "national guide," or Algerian-specific terms, use general_guide_tool first.
- For comparative queries (e.g., Algerian vs. WHO), use both tools, addressing each part systematically.
- **EXPLICIT REASONING**: Before answering, log your reasoning steps, including which tools you will use and why, based on the query’s content.
- Provide all found information with proper citations using Source IDs only.
- If information is limited, clearly state: "Based on the available documents, I can provide the following information..." and indicate what is not available.
---
"""
    else:
        custom_instructions = """
## MEDICAL ASSISTANT ROLE
You are a helpful and knowledgeable AI-powered vaccine assistant designed to support doctors in clinical decision-making.
You provide evidence-based guidance using only information from official vaccine medical documents.
Answer the doctor's question accurately and concisely using only the provided information.
## IMPORTANT REQUIREMENTS
### Citation and Sourcing
1. For each fact in your response, include an inline citation in the format [Source ID] immediately following the information, e.g., [e795ebd28318886c0b1a5395ac30ad90].
2. The Source ID must be the exact alphanumeric identifier from the search results, NOT the tool name or any other text.
3. Do NOT use 'Source:' in the citation format; use only the Source ID in square brackets.
4. Do NOT use tool names (like general_guide_tool, cold_chain_tool) as citations.
5. If a fact is supported by multiple sources, use adjacent citations: [e795ebd28318886c0b1a5395ac30ad90][21a932b2340bb16707763f57f0ad2]
6. Use ONLY the provided information from tool outputs and never include facts from your general knowledge.
### Content Formatting
1. When rendering tables:
- Convert HTML tables into clean Markdown format.
- Preserve all original headers and data rows exactly.
- Include the citation in the table caption, e.g., 'Table: Vaccination Schedule [Source ID]'.
2. For lists, maintain the original bullet points/numbering and include citations.
3. Present information concisely but ensure clinical accuracy is never compromised.
### CRITICAL: Efficient Response Strategy
1. **MANDATORY SEARCH**: Always use the relevant tool(s) to search for information before answering, even if you initially think no information is available.
2. **MANDATORY TOOL SELECTION**:
- For queries about global standards or WHO, use who_immunization_tool.
- For broad questions about the Algerian guide, use general_guide_tool.
- For specific topics like cold chain, disease info, etc., use the most specific tool (e.g., cold_chain_tool, disease_info_tool).
3. **Query Decomposition**: Break comparative or multi-part queries into sub-queries and use the appropriate tool for each.
4. **DO NOT STOP PREMATURELY**: Do not conclude "no information is available" without using the relevant tool(s) to search for the answer.
5. **EXPLICIT REASONING**: Before answering, log your reasoning steps, including which tools you will use and why.
6. **BE DECISIVE**: Once you find relevant information, formulate your response.
### Final Answer Generation
- **STOP WHEN SUFFICIENT**: Once you have gathered enough information from the tools to answer the user's question completely, you MUST stop using tools and formulate a final answer.
- **SYNTHESIZE THE ANSWER**: Formulate a comprehensive, final answer based ONLY on the observed tool outputs.
- **PRESENT CLEANLY**: Present this final answer directly to the user. Your final output must be the answer itself, not your internal 'thought' or 'action' steps.
### Response Guidelines for Complex Questions
- For comparative questions: Break the query into sub-queries, use the appropriate tools, then provide the comparison.
- For multi-part questions: Address each part systematically, using the appropriate tool for each sub-query.
- If information is not found after using the relevant tool(s): State clearly: "Based on the available documents, I can provide the following information..." and specify what is not available.
---
"""
    # Instantiate a throwaway agent solely to read the stock ReAct system prompt.
    temp_agent = ReActAgent.from_tools(tools, llm=llm, verbose=False)
    original_prompts = temp_agent.get_prompts()
    original_prompt = original_prompts["agent_worker:system_prompt"]
    original_template = original_prompt.template
    # Prepend our instructions so the ReAct scaffolding stays untouched.
    safe_template = f"{custom_instructions}{original_template}"
    try:
        new_prompt = PromptTemplate(
            template=safe_template,
            template_vars=original_prompt.template_vars,
            metadata=original_prompt.metadata if hasattr(original_prompt, 'metadata') else None
        )
        print(f"[LOG] ✅ Successfully created {'fallback' if is_fallback else 'standard'} custom prompt")
        return new_prompt
    except Exception:
        # FIX: was a bare `except:`, which would also swallow SystemExit and
        # KeyboardInterrupt; narrowed to Exception. Fall back to a template
        # without the original metadata/vars if construction fails.
        print(f"[LOG] ⚠️ Using fallback prompt template for {'fallback' if is_fallback else 'standard'} agent")
        return PromptTemplate(template=safe_template)
def create_agent(tools, llm, is_fallback=False):
    """Build a ReAct agent and attach the safe custom system prompt.

    Args:
        tools: Tool list the agent may call.
        llm: LLM instance backing the agent.
        is_fallback (bool): Whether to apply the fallback instruction set.

    Returns:
        The configured ReActAgent (with the stock prompt if customization fails).
    """
    agent_type = "FALLBACK" if is_fallback else "STANDARD"
    # Generous iteration budget so the agent has room to reason before bailing.
    max_iter = 15
    print(f"[LOG] Creating {agent_type} ReAct agent with {len(tools)} tools and max_iterations={max_iter}")
    agent = ReActAgent.from_tools(tools, llm=llm, verbose=True, max_iterations=max_iter)
    # Attach the custom system prompt; keep the stock prompt on any failure.
    try:
        custom_prompt = create_safe_custom_prompt(tools, llm, is_fallback=is_fallback)
        agent.update_prompts({"agent_worker:system_prompt": custom_prompt})
        print(f"✅ Successfully updated {agent_type} agent with safe custom prompt")
    except Exception as e:
        print(f"❌ {agent_type} agent prompt update failed: {e}")
        print(f"⚠️ Using original {agent_type} agent without modifications")
    print(f"[LOG] {agent_type} agent creation completed")
    return agent
def create_fallback_tools(all_tools):
    """Filter the tool list down to the two fallback tools.

    Keeps only general_guide_tool and who_immunization_tool.

    Args:
        all_tools: Full tool list (objects with .metadata.name).

    Returns:
        list: The matching tools, or None when neither is present.
    """
    print("[LOG] Creating fallback tools (guide + WHO only)")
    fallback_tools = []
    tool_names_found = []
    for candidate in all_tools:
        name = candidate.metadata.name if hasattr(candidate, 'metadata') else str(candidate)
        if name in ("general_guide_tool", "who_immunization_tool"):
            fallback_tools.append(candidate)
            tool_names_found.append(name)
    print(f"[LOG] Found {len(fallback_tools)} fallback tools: {tool_names_found}")
    if not fallback_tools:
        print("[LOG] ❌ ERROR: No fallback tools found! Check tool names.")
        return None
    return fallback_tools
def initialize_rag_pipeline(tools):
    """Wire up the standard and fallback agents around a shared Gemini LLM.

    Args:
        tools: Full tool list for the standard agent.

    Returns:
        dict: {"standard_agent", "fallback_agent" (may be None), "llm"}.
    """
    print("[LOG] Initializing RAG pipeline with fallback system...")
    print(f"[LOG] Available tools: {[tool.metadata.name if hasattr(tool, 'metadata') else str(tool) for tool in tools]}")
    print("[LOG] Initializing Google GenAI LLM (gemini-2.0-flash)")
    # One shared LLM instance backs both agents.
    llm = GoogleGenAI(
        model="models/gemini-2.0-flash",
        api_key=os.getenv('GOOGLE_API_KEY'),
    )
    print("[LOG] Creating standard agent...")
    standard_agent = create_agent(tools, llm, is_fallback=False)
    print("[LOG] Creating fallback agent...")
    fallback_tools = create_fallback_tools(tools)
    if fallback_tools is None:
        # Pipeline still works without a fallback agent; callers must handle None.
        print("[LOG] ❌ WARNING: Fallback agent creation failed - no fallback tools available")
        fallback_agent = None
    else:
        fallback_agent = create_agent(fallback_tools, llm, is_fallback=True)
        print("[LOG] ✅ Fallback agent created successfully")
    print("[LOG] ✅ RAG pipeline initialization completed with fallback system")
    return {
        "standard_agent": standard_agent,
        "fallback_agent": fallback_agent,
        "llm": llm,
    }
def detect_max_iterations_error(response_text):
    """Heuristically decide whether a response is a failed agent run.

    Flags raw/unfinished ReAct thought output, explicit max-iteration
    messages, near-empty replies, and generic processing-error text.

    Args:
        response_text (str): The agent's final response.

    Returns:
        bool: True when the response should trigger the fallback path.
    """
    response_lower = response_text.lower().strip()
    # A leaked ReAct scratchpad means the agent never produced a final answer.
    if response_lower.startswith(("a:```thought", "```thought")):
        print("[LOG] Detected unfinished agent thought process.")
        return True
    indicators = (
        "max iterations",
        "reached max iterations",
        "agent stopped due to max iterations",
        "maximum number of iterations",
        "iteration limit",
    )
    for phrase in indicators:
        if phrase in response_lower:
            print(f"[LOG] Detected max iteration indicator: '{phrase}'")
            return True
    # A near-empty reply almost always indicates a failed run.
    if len(response_text.strip()) < 10:
        return True
    # Generic "error ... processing" style failure messages.
    return "error" in response_lower and "processing" in response_lower
def process_question(agents_dict, question: str) -> str:
    """
    Answer a question with the standard agent, falling back to the
    reduced-tool fallback agent when the standard agent raises or produces
    a max-iterations / unfinished response.

    Args:
        agents_dict: Dict with "standard_agent" and "fallback_agent" keys
            (fallback_agent may be None).
        question (str): The user's question.

    Returns:
        str: The best available answer text, or an apologetic error message.
        Never returns None.
    """
    print(f"[LOG] Processing question: '{question[:100]}{'...' if len(question) > 100 else ''}'")
    standard_agent = agents_dict["standard_agent"]
    fallback_agent = agents_dict["fallback_agent"]
    print("="*50)
    print("🤖 STANDARD AGENT REASONING PROCESS:")
    print("="*50)
    start_time = time.time()
    try:
        # Try the full-tool standard agent first.
        response = standard_agent.chat(question)
        response_text = response.response
        print("="*50)
        print("🤖 STANDARD AGENT REASONING COMPLETED")
        print("="*50)
        elapsed_time = time.time() - start_time
        print(f"[LOG] ✅ Standard agent response received in {elapsed_time:.2f} seconds")
        print(f"[LOG] Response length: {len(response_text)} characters")
        # Switch to the fallback agent when the answer looks unfinished/failed.
        if detect_max_iterations_error(response_text):
            print("[LOG] 🔄 Max iterations or unfinished thought detected, switching to FALLBACK AGENT...")
            if fallback_agent is None:
                print("[LOG] ❌ Fallback agent not available, returning error message")
                return ("I apologize, but I encountered difficulties processing your question. "
                        "Please try rephrasing your question more specifically or breaking it down into smaller parts.")
            print("="*50)
            print("🛡️ FALLBACK AGENT REASONING PROCESS:")
            print("="*50)
            fallback_start_time = time.time()
            try:
                fallback_response = fallback_agent.chat(question)
                fallback_text = fallback_response.response
                print("="*50)
                print("🛡️ FALLBACK AGENT REASONING COMPLETED")
                print("="*50)
                fallback_elapsed = time.time() - fallback_start_time
                total_elapsed = time.time() - start_time
                print(f"[LOG] ✅ Fallback agent response received in {fallback_elapsed:.2f} seconds")
                print(f"[LOG] Total processing time: {total_elapsed:.2f} seconds")
                print(f"[LOG] Fallback response length: {len(fallback_text)} characters")
                # The fallback can itself run out of iterations.
                if detect_max_iterations_error(fallback_text):
                    print("[LOG] ❌ Fallback agent also hit max iterations or failed to produce an answer.")
                    return ("I apologize, but I'm having difficulty finding specific information about your question in the available documents. "
                            "Please try asking a more specific question or rephrasing your query.")
                return fallback_text
            except Exception as e:
                fallback_elapsed = time.time() - fallback_start_time
                print(f"[LOG] ❌ Fallback agent error after {fallback_elapsed:.2f} seconds: {e}")
                return ("I apologize, but I encountered an error while processing your question. "
                        "Please try rephrasing your question or asking about a more specific topic.")
        return response_text
    except Exception as e:
        elapsed_time = time.time() - start_time
        print(f"[LOG] ❌ Standard agent error after {elapsed_time:.2f} seconds: {e}")
        # Try the fallback even when the standard agent raised.
        if fallback_agent is not None:
            print("[LOG] 🔄 Standard agent failed, trying FALLBACK AGENT...")
            try:
                fallback_response = fallback_agent.chat(question)
                return fallback_response.response
            except Exception as fallback_e:
                print(f"[LOG] ❌ Fallback agent also failed: {fallback_e}")
        # FIX: always return an error string here — previously, when no
        # fallback agent was available, this path could fall through and
        # implicitly return None.
        return f"Error processing your question: {str(e)}"
def aswer_language_detection(response_text: str) -> str:
    """
    Detect the language of the response text.

    Note: the function name keeps its historical misspelling ("aswer")
    because external callers depend on it.

    Args:
        response_text (str): The response text to analyze.

    Returns:
        str: 'en', 'ar', or 'fr'; defaults to 'en' when detection fails
        or yields an unsupported language.
    """
    print("[LOG] Detecting response language...")
    try:
        # Sample only the first 5 words; strip citations so bracketed IDs
        # don't skew the detector.
        first_line = " ".join(response_text.split()[:5])
        first_line = re.sub(r'\[.*?\]', '', first_line)  # Remove citations
        answer_language = detect(first_line)
        print(f"[LOG] Detected language: {answer_language}")
        if answer_language not in ['en', 'ar', 'fr']:
            print(f"[LOG] Language {answer_language} not in supported list, defaulting to 'en'")
            answer_language = 'en'
    except Exception:
        # FIX: was a bare `except:` combined with `finally: return`, which
        # swallowed every exception (including KeyboardInterrupt). Narrowed
        # to Exception and replaced the finally-return with a plain return.
        # langdetect raises LangDetectException on empty/ambiguous text.
        print("[LOG] Language detection failed, defaulting to 'en'")
        answer_language = 'en'
    return answer_language
def process_question_with_sequential_citations(agents_dict, question: str, chunks_directory="./data/") -> dict:
    """
    Process a question through the RAG pipeline with fallback support and return response with sequential citation numbers.
    Args:
        agents_dict: Dictionary containing standard_agent, fallback_agent, and llm
        question (str): The user's question
        chunks_directory (str): Path to the directory containing JSON files
    Returns:
        dict: {
            "response": str,  # Response with sequential citation numbers [1], [2], etc.
            "cited_elements_json": str,  # JSON array of cited elements in order
            "unique_ids": list,  # Original source IDs in order
            "citation_mapping": dict,  # Mapping from source ID to citation number
            "used_fallback": bool  # Whether fallback agent was used
        }
    """
    print(f"\n[LOG] === STARTING QUESTION PROCESSING WITH FALLBACK SUPPORT ===")
    print(f"[LOG] Question: '{question[:150]}{'...' if len(question) > 150 else ''}'")
    print(f"[LOG] Chunks directory: {chunks_directory}")
    start_time = time.time()
    used_fallback = False  # This flag is a heuristic
    try:
        # Get the response using the enhanced process_question function
        response_text = process_question(agents_dict, question)
        # Check if fallback was likely used (simple heuristic based on logs)
        # A more robust way would be for `process_question` to return a tuple (response, used_fallback)
        # NOTE(review): this substring is a *log* message printed by
        # process_question, not text it returns — so this flag is likely
        # always False in practice; confirm and consider returning the flag
        # from process_question instead.
        if "switching to fallback agent" in response_text.lower():
            used_fallback = True
            print("[LOG] 🛡️ Fallback agent was likely used based on log indicators.")
        agent_time = time.time() - start_time
        print(f"[LOG] Agent processing completed in {agent_time:.2f} seconds")
        print(f"[LOG] Raw response length: {len(response_text)} characters")
        # Extract source IDs from the response (preserving order)
        unique_ids = extract_source_ids(response_text)
        # Create mapping from source ID to sequential number (1-based)
        source_id_to_number = {source_id: i + 1 for i, source_id in enumerate(unique_ids)}
        print(f"[LOG] Created citation mapping for {len(source_id_to_number)} sources")
        # Convert citations to sequential numbers
        sequential_response = convert_citations_to_sequential(response_text, source_id_to_number)
        # Load all chunks data to find cited elements.
        # Only the two hard-coded corpus files below are searched; missing
        # files are skipped rather than treated as fatal.
        print("[LOG] Loading chunks data for citation lookup...")
        all_chunks_data = []
        min_chunks_files = ["Guide-pratique-de-mise-en-oeuvre-du-calendrier-national-de-vaccination-2023.json",
                            "Immunization in Practice_WHO_eng_2015.json"]
        for json_file in min_chunks_files:
            json_path = os.path.join(chunks_directory, json_file)
            if not os.path.exists(json_path):
                print(f"[LOG] ⚠️ Skipping non-existent file: {json_path}")
                continue
            print(f"[LOG] Loading {json_file}...")
            try:
                with open(json_path, "r", encoding="utf-8") as f:
                    chunks_data = json.load(f)
                all_chunks_data.extend(chunks_data)
                print(f"[LOG] ✅ Loaded {len(chunks_data)} chunks from {json_file}")
            except Exception as e:
                print(f"[LOG] ❌ Warning: Could not load {json_file}: {e}")
        print(f"[LOG] Total chunks loaded: {len(all_chunks_data)}")
        # Get cited elements in the same order as the sequential citations
        print("[LOG] Finding cited elements...")
        cited_elements_ordered = []
        for i, source_id in enumerate(unique_ids):  # This preserves the order
            # print(f"[LOG] Looking for source ID {i+1}/{len(unique_ids)}: {source_id}") # This is too verbose for normal operation
            found = False
            for element in all_chunks_data:
                # Handle TableElement structure: element_id lives on the
                # single nested "elements" dict.
                if element.get("type") == 'TableElement':
                    if element.get("elements", {}).get("element_id") == source_id:
                        cited_elements_ordered.append(element.get("elements", {}))
                        found = True
                        break
                # Handle other element structures: "elements" is a list of
                # dicts, each carrying its own element_id.
                elif "elements" in element and isinstance(element["elements"], list):
                    for nested_element in element["elements"]:
                        if isinstance(nested_element, dict) and nested_element.get("element_id") == source_id:
                            cited_elements_ordered.append(nested_element)
                            found = True
                            break
                    if found:
                        break
            if not found:
                # Cited ID missing from the loaded corpora; citation number
                # still appears in the response but has no element payload.
                print(f"[LOG] ⚠️ Source ID {source_id} not found in chunks data")
        print(f"[LOG] Found {len(cited_elements_ordered)} cited elements")
        # Convert to JSON
        cited_elements_json = json.dumps(cited_elements_ordered, ensure_ascii=False, indent=2)
        answer_language = aswer_language_detection(response_text)
        total_time = time.time() - start_time
        print(f"[LOG] ✅ Processing completed in {total_time:.2f} seconds total")
        print(f"[LOG] Final response length: {len(sequential_response)} characters")
        print(f"[LOG] Used fallback: {used_fallback}")
        print(f"[LOG] === QUESTION PROCESSING COMPLETED ===\n")
        return {
            "response": sequential_response,
            "cited_elements_json": cited_elements_json,
            "unique_ids": unique_ids,
            "citation_mapping": source_id_to_number,
            "answer_language": answer_language,
            "used_fallback": used_fallback
        }
    except Exception as e:
        # Catch-all boundary: the caller always receives a well-formed dict.
        elapsed_time = time.time() - start_time
        print(f"[LOG] ❌ Error processing question after {elapsed_time:.2f} seconds: {e}")
        error_response = "I apologize, but I encountered an error while processing your question. Please try rephrasing your question or asking about a more specific topic."
        return {
            "response": error_response,
            "cited_elements_json": "[]",
            "unique_ids": [],
            "citation_mapping": {},
            "answer_language": "en",
            "used_fallback": False
        }
def process_question_with_citations(agents_dict, question: str, chunks_directory="./data/") -> dict:
    """
    Legacy entry point kept for backward compatibility.

    Delegates to process_question_with_sequential_citations, which adds
    sequential citation numbering and fallback-agent support.
    """
    print("[LOG] Using legacy function wrapper - redirecting to sequential citations with fallback")
    return process_question_with_sequential_citations(agents_dict, question, chunks_directory)