Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ | |
| Enhanced RAG Pipeline for vaccine assistant with fallback system | |
| Handles agent creation and question answering with sequential citation numbering | |
| Includes fallback agent for max iterations handling | |
| """ | |
| import json | |
| import re | |
| from llama_index.core import PromptTemplate | |
| from llama_index.core.agent import ReActAgent | |
| from llama_index.llms.google_genai import GoogleGenAI | |
| from langdetect import detect | |
| import os | |
| import time | |
def extract_source_ids(response_text):
    """
    Extract unique source IDs from inline citations in a response.

    Handles the citation formats the agent prompts allow:
      - Standard format: [Source ID]
      - Adjacent citations: [Source ID1][Source ID2]
      - Comma-separated IDs in one bracket: [Source ID1, Source ID2]

    Args:
        response_text (str): The generated response text with inline citations.

    Returns:
        list of str: Unique source IDs in order of first appearance;
        empty list when no citations are found.
    """
    print(f"[LOG] Extracting source IDs from response text (length: {len(response_text)} chars)")
    # Normalize adjacent citations "[ID1][ID2]" (possibly separated by
    # whitespace) into a single comma-separated bracket "[ID1, ID2]" so one
    # regex below handles every supported format.  This single substitution
    # replaces the original two-step ']['-collapse + '][' -> ', ' rewrite.
    consolidated_text = re.sub(r'\]\s*\[', ', ', response_text)
    # Capture the contents of every bracket (single ID or comma-separated IDs).
    inline_citations = re.findall(r'\[([^\[\]]+)\]', consolidated_text)
    print(f"[LOG] Found {len(inline_citations)} inline citations")
    if not inline_citations:
        print("Warning: No source IDs found in the response text.")
        return []
    # Flatten comma-separated citations into individual, whitespace-trimmed IDs.
    all_ids = []
    for citation in inline_citations:
        all_ids.extend(id_str.strip() for id_str in citation.split(','))
    # De-duplicate while preserving first-seen order.
    seen = set()
    source_ids = []
    for id_str in all_ids:
        if id_str not in seen:
            seen.add(id_str)
            source_ids.append(id_str)
    print(f"[LOG] Extracted {len(source_ids)} unique source IDs: {source_ids[:3]}{'...' if len(source_ids) > 3 else ''}")
    if not source_ids:
        print("Warning: No valid source IDs found after filtering.")
        return []
    return source_ids
def convert_citations_to_sequential(response_text, source_id_to_number_map):
    """
    Rewrite source-ID citations in a response as sequential numbers.

    Args:
        response_text (str): The response text with source ID citations.
        source_id_to_number_map (dict): Mapping from source IDs to sequential numbers.

    Returns:
        str: Response text with sequential number citations; citations whose
        IDs have no mapping are left untouched.
    """
    print(f"[LOG] Converting {len(source_id_to_number_map)} source IDs to sequential numbers")

    def renumber(match):
        # A single bracket may hold one ID or several comma-separated IDs.
        raw_ids = (part.strip() for part in match.group(1).split(','))
        mapped = [str(source_id_to_number_map[rid])
                  for rid in raw_ids if rid in source_id_to_number_map]
        # Keep the original citation text when none of its IDs are known.
        return f"[{','.join(mapped)}]" if mapped else match.group(0)

    sequential_response = re.sub(r'\[([^\[\]]+)\]', renumber, response_text)
    print("[LOG] Successfully converted citations to sequential format")
    return sequential_response
def create_safe_custom_prompt(tools, llm, is_fallback=False):
    """Create a safe version that won't have formatting conflicts.

    Harvests the stock ReAct system prompt from a throwaway agent and
    prepends the custom medical-assistant instructions, so the template's
    own variables stay intact.

    Args:
        tools: Tool list used to instantiate the throwaway agent.
        llm: LLM passed to the throwaway agent (kept for signature parity).
        is_fallback (bool): Select the fallback-mode instruction block.

    Returns:
        PromptTemplate: The original system prompt with custom instructions
        prepended (without original metadata if copying it fails).
    """
    print(f"[LOG] Creating {'fallback' if is_fallback else 'standard'} custom prompt with {len(tools)} tools")
    if is_fallback:
        custom_instructions = """
## MEDICAL ASSISTANT ROLE - FALLBACK MODE
You are a helpful and knowledgeable AI-powered vaccine assistant designed to support doctors in clinical decision-making.
You are operating in FALLBACK MODE with access to only the most essential and comprehensive tools.
You provide evidence-based guidance using only information from official vaccine medical documents.
Answer the doctor's question accurately and concisely using only the provided information.
## FALLBACK MODE INSTRUCTIONS
- You have access to only 2 powerful tools: general_guide_tool (Algerian National Vaccination Guide) and who_immunization_tool (WHO global guidance).
- **MANDATORY TOOL USAGE**: Always use the relevant tool(s) to search for information before answering, even if you initially think no information is available.
- Be direct and efficient - search once with each tool if needed, then provide your answer.
- Do not overthink or search repeatedly - these tools are comprehensive.
## IMPORTANT REQUIREMENTS
### Citation and Sourcing
1. For each fact in your response, include an inline citation in the format [Source ID] immediately following the information, e.g., [e795ebd28318886c0b1a5395ac30ad90].
2. The Source ID must be the exact alphanumeric identifier from the search results, NOT the tool name or any other text.
3. Do NOT use 'Source:' in the citation format; use only the Source ID in square brackets.
4. Do NOT use tool names (like general_guide_tool, who_immunization_tool) as citations.
5. If a fact is supported by multiple sources, use adjacent citations: [e795ebd28318886c0b1a5395ac30ad90][21a932b2340bb16707763f57f0ad2]
6. Use ONLY the provided information from tool outputs and never include facts from your general knowledge.
### Content Formatting
1. When rendering tables:
- Convert HTML tables into clean Markdown format.
- Preserve all original headers and data rows exactly.
- Include the citation in the table caption, e.g., 'Table: Vaccination Schedule [Source ID]'.
2. For lists, maintain the original bullet points/numbering and include citations.
3. Present information concisely but ensure clinical accuracy is never compromised.
### CRITICAL: Efficient Fallback Strategy
1. **MANDATORY SEARCH**: Use each relevant tool at least once to search for information, even if you suspect the information might not be available.
2. **BREAK DOWN COMPLEX QUERIES**: For comparative or multi-part questions (e.g., comparing Algerian and WHO guidelines), break the query into sub-queries and use the appropriate tool for each part.
3. **DO NOT STOP PREMATURELY**: Do not conclude "no information is available" without using the relevant tool(s) to search for the answer.
4. **BE DECISIVE**: Once you find relevant information for each sub-query, formulate your response immediately.
5. **ANSWER FULLY**: Address all parts of the question, using multiple tools if required by the query.
6. **FINAL ANSWER**: Once you have your answer, present it directly. Do not output your internal 'thought' or 'action' steps. Your final output must be the synthesized answer itself.
### Response Guidelines
- **MANDATORY TOOL SELECTION**:
- For queries mentioning "WHO," "World Health Organization," "international," "global guidance," or WHO documents, use who_immunization_tool first.
- For queries mentioning "Algerian," "national guide," or Algerian-specific terms, use general_guide_tool first.
- For comparative queries (e.g., Algerian vs. WHO), use both tools, addressing each part systematically.
- **EXPLICIT REASONING**: Before answering, log your reasoning steps, including which tools you will use and why, based on the query’s content.
- Provide all found information with proper citations using Source IDs only.
- If information is limited, clearly state: "Based on the available documents, I can provide the following information..." and indicate what is not available.
---
"""
    else:
        custom_instructions = """
## MEDICAL ASSISTANT ROLE
You are a helpful and knowledgeable AI-powered vaccine assistant designed to support doctors in clinical decision-making.
You provide evidence-based guidance using only information from official vaccine medical documents.
Answer the doctor's question accurately and concisely using only the provided information.
## IMPORTANT REQUIREMENTS
### Citation and Sourcing
1. For each fact in your response, include an inline citation in the format [Source ID] immediately following the information, e.g., [e795ebd28318886c0b1a5395ac30ad90].
2. The Source ID must be the exact alphanumeric identifier from the search results, NOT the tool name or any other text.
3. Do NOT use 'Source:' in the citation format; use only the Source ID in square brackets.
4. Do NOT use tool names (like general_guide_tool, cold_chain_tool) as citations.
5. If a fact is supported by multiple sources, use adjacent citations: [e795ebd28318886c0b1a5395ac30ad90][21a932b2340bb16707763f57f0ad2]
6. Use ONLY the provided information from tool outputs and never include facts from your general knowledge.
### Content Formatting
1. When rendering tables:
- Convert HTML tables into clean Markdown format.
- Preserve all original headers and data rows exactly.
- Include the citation in the table caption, e.g., 'Table: Vaccination Schedule [Source ID]'.
2. For lists, maintain the original bullet points/numbering and include citations.
3. Present information concisely but ensure clinical accuracy is never compromised.
### CRITICAL: Efficient Response Strategy
1. **MANDATORY SEARCH**: Always use the relevant tool(s) to search for information before answering, even if you initially think no information is available.
2. **MANDATORY TOOL SELECTION**:
- For queries about global standards or WHO, use who_immunization_tool.
- For broad questions about the Algerian guide, use general_guide_tool.
- For specific topics like cold chain, disease info, etc., use the most specific tool (e.g., cold_chain_tool, disease_info_tool).
3. **Query Decomposition**: Break comparative or multi-part queries into sub-queries and use the appropriate tool for each.
4. **DO NOT STOP PREMATURELY**: Do not conclude "no information is available" without using the relevant tool(s) to search for the answer.
5. **EXPLICIT REASONING**: Before answering, log your reasoning steps, including which tools you will use and why.
6. **BE DECISIVE**: Once you find relevant information, formulate your response.
### Final Answer Generation
- **STOP WHEN SUFFICIENT**: Once you have gathered enough information from the tools to answer the user's question completely, you MUST stop using tools and formulate a final answer.
- **SYNTHESIZE THE ANSWER**: Formulate a comprehensive, final answer based ONLY on the observed tool outputs.
- **PRESENT CLEANLY**: Present this final answer directly to the user. Your final output must be the answer itself, not your internal 'thought' or 'action' steps.
### Response Guidelines for Complex Questions
- For comparative questions: Break the query into sub-queries, use the appropriate tools, then provide the comparison.
- For multi-part questions: Address each part systematically, using the appropriate tool for each sub-query.
- If information is not found after using the relevant tool(s): State clearly: "Based on the available documents, I can provide the following information..." and specify what is not available.
---
"""
    # Get the exact original template first
    temp_agent = ReActAgent.from_tools(tools, llm=llm, verbose=False)
    original_prompts = temp_agent.get_prompts()
    original_template = original_prompts["agent_worker:system_prompt"].template
    # Add instructions at the very beginning
    safe_template = f"{custom_instructions}{original_template}"
    # Create new prompt with same metadata as original
    original_prompt = original_prompts["agent_worker:system_prompt"]
    try:
        new_prompt = PromptTemplate(
            template=safe_template,
            template_vars=original_prompt.template_vars,
            metadata=original_prompt.metadata if hasattr(original_prompt, 'metadata') else None
        )
        print(f"[LOG] ✅ Successfully created {'fallback' if is_fallback else 'standard'} custom prompt")
        return new_prompt
    except Exception as e:
        # FIX: was a bare `except:` that also swallowed SystemExit /
        # KeyboardInterrupt and hid the failure reason; narrow to Exception
        # and surface why the metadata-preserving construction failed.
        print(f"[LOG] ⚠️ Using fallback prompt template for {'fallback' if is_fallback else 'standard'} agent: {e}")
        return PromptTemplate(template=safe_template)
def create_agent(tools, llm, is_fallback=False):
    """Build a ReAct agent and install the safe custom system prompt.

    Args:
        tools: Tool list the agent may call.
        llm: LLM instance driving the agent.
        is_fallback (bool): Whether to configure the agent in fallback mode.

    Returns:
        The configured ReActAgent (with its stock prompt if the custom
        prompt could not be applied).
    """
    agent_type = "FALLBACK" if is_fallback else "STANDARD"
    # **FIX**: Increased max_iterations to give the agent more steps to reason
    max_iter = 15
    print(f"[LOG] Creating {agent_type} ReAct agent with {len(tools)} tools and max_iterations={max_iter}")
    agent = ReActAgent.from_tools(tools, llm=llm, verbose=True, max_iterations=max_iter)
    try:
        # Swap in the instruction-augmented system prompt; on any failure the
        # agent keeps its stock prompt rather than aborting creation.
        custom_prompt = create_safe_custom_prompt(tools, llm, is_fallback=is_fallback)
        agent.update_prompts({"agent_worker:system_prompt": custom_prompt})
        print(f"✅ Successfully updated {agent_type} agent with safe custom prompt")
    except Exception as e:
        print(f"❌ {agent_type} agent prompt update failed: {e}")
        print(f"⚠️ Using original {agent_type} agent without modifications")
    print(f"[LOG] {agent_type} agent creation completed")
    return agent
def create_fallback_tools(all_tools):
    """Select the two comprehensive tools used by the fallback agent.

    Args:
        all_tools: Full tool list from the standard pipeline.

    Returns:
        list: Tools named general_guide_tool / who_immunization_tool,
        or None when neither is present.
    """
    print("[LOG] Creating fallback tools (guide + WHO only)")
    wanted = ("general_guide_tool", "who_immunization_tool")
    fallback_tools = []
    tool_names_found = []
    for tool in all_tools:
        # Tools without metadata fall back to their string representation.
        name = tool.metadata.name if hasattr(tool, 'metadata') else str(tool)
        if name in wanted:
            fallback_tools.append(tool)
            tool_names_found.append(name)
    print(f"[LOG] Found {len(fallback_tools)} fallback tools: {tool_names_found}")
    if not fallback_tools:
        print("[LOG] ❌ ERROR: No fallback tools found! Check tool names.")
        return None
    return fallback_tools
def initialize_rag_pipeline(tools):
    """Build the LLM plus standard and fallback agents for the pipeline.

    Args:
        tools: Full list of query tools available to the standard agent.

    Returns:
        dict: {"standard_agent", "fallback_agent", "llm"}; fallback_agent is
        None when no fallback-eligible tools were found.
    """
    print("[LOG] Initializing RAG pipeline with fallback system...")
    print(f"[LOG] Available tools: {[tool.metadata.name if hasattr(tool, 'metadata') else str(tool) for tool in tools]}")
    # Initialize LlamaIndex LLM
    print("[LOG] Initializing Google GenAI LLM (gemini-2.0-flash)")
    llm = GoogleGenAI(
        model="models/gemini-2.0-flash",
        api_key=os.getenv('GOOGLE_API_KEY'),
    )
    # Standard agent uses the full tool set.
    print("[LOG] Creating standard agent...")
    standard_agent = create_agent(tools, llm, is_fallback=False)
    # Fallback agent uses only the two comprehensive tools, when available.
    print("[LOG] Creating fallback agent...")
    fallback_tools = create_fallback_tools(tools)
    if fallback_tools is None:
        print("[LOG] ❌ WARNING: Fallback agent creation failed - no fallback tools available")
        fallback_agent = None
    else:
        fallback_agent = create_agent(fallback_tools, llm, is_fallback=True)
        print("[LOG] ✅ Fallback agent created successfully")
    print("[LOG] ✅ RAG pipeline initialization completed with fallback system")
    return {
        "standard_agent": standard_agent,
        "fallback_agent": fallback_agent,
        "llm": llm,
    }
def detect_max_iterations_error(response_text):
    """Detect if the response indicates a max iterations error OR is an unfinished thought."""
    normalized = response_text.lower().strip()
    # **FIX**: Check if the response is the agent's raw thought process.
    if normalized.startswith(("a:```thought", "```thought")):
        print("[LOG] Detected unfinished agent thought process.")
        return True
    indicators = (
        "max iterations",
        "reached max iterations",
        "agent stopped due to max iterations",
        "maximum number of iterations",
        "iteration limit",
    )
    # Explicit max-iterations phrasing anywhere in the response.
    for indicator in indicators:
        if indicator in normalized:
            print(f"[LOG] Detected max iteration indicator: '{indicator}'")
            return True
    # Very short or empty responses usually mean the agent failed outright.
    if len(response_text.strip()) < 10:
        return True
    # Generic "error ... processing" phrasing is also treated as a failure.
    return "error" in normalized and "processing" in normalized
def process_question(agents_dict, question: str) -> str:
    """Process a question through the RAG pipeline with fallback support.

    Runs the standard agent first; if its answer looks like a max-iterations
    failure / unfinished thought (per detect_max_iterations_error), or the
    standard agent raises, retries once with the fallback agent.

    Args:
        agents_dict: Dict with "standard_agent" and "fallback_agent" keys
            (fallback_agent may be None).
        question (str): The user's question.

    Returns:
        str: The agent's answer text, or a user-facing apology/error message
        when both agents fail.
    """
    print(f"[LOG] Processing question: '{question[:100]}{'...' if len(question) > 100 else ''}'")
    standard_agent = agents_dict["standard_agent"]
    fallback_agent = agents_dict["fallback_agent"]
    print("="*50)
    print("🤖 STANDARD AGENT REASONING PROCESS:")
    print("="*50)
    start_time = time.time()
    try:
        # Try standard agent first
        response = standard_agent.chat(question)
        response_text = response.response
        print("="*50)
        print("🤖 STANDARD AGENT REASONING COMPLETED")
        print("="*50)
        elapsed_time = time.time() - start_time
        print(f"[LOG] ✅ Standard agent response received in {elapsed_time:.2f} seconds")
        print(f"[LOG] Response length: {len(response_text)} characters")
        # Check if we need to use fallback
        if detect_max_iterations_error(response_text):
            print("[LOG] 🔄 Max iterations or unfinished thought detected, switching to FALLBACK AGENT...")
            if fallback_agent is None:
                # No fallback available: return a canned apology instead of
                # surfacing the agent's raw/unfinished output.
                print("[LOG] ❌ Fallback agent not available, returning error message")
                return ("I apologize, but I encountered difficulties processing your question. "
                        "Please try rephrasing your question more specifically or breaking it down into smaller parts.")
            print("="*50)
            print("🛡️ FALLBACK AGENT REASONING PROCESS:")
            print("="*50)
            fallback_start_time = time.time()
            try:
                fallback_response = fallback_agent.chat(question)
                fallback_text = fallback_response.response
                print("="*50)
                print("🛡️ FALLBACK AGENT REASONING COMPLETED")
                print("="*50)
                fallback_elapsed = time.time() - fallback_start_time
                total_elapsed = time.time() - start_time
                print(f"[LOG] ✅ Fallback agent response received in {fallback_elapsed:.2f} seconds")
                print(f"[LOG] Total processing time: {total_elapsed:.2f} seconds")
                print(f"[LOG] Fallback response length: {len(fallback_text)} characters")
                # Check if fallback also failed
                if detect_max_iterations_error(fallback_text):
                    print("[LOG] ❌ Fallback agent also hit max iterations or failed to produce an answer.")
                    return ("I apologize, but I'm having difficulty finding specific information about your question in the available documents. "
                            "Please try asking a more specific question or rephrasing your query.")
                return fallback_text
            except Exception as e:
                fallback_elapsed = time.time() - fallback_start_time
                print(f"[LOG] ❌ Fallback agent error after {fallback_elapsed:.2f} seconds: {e}")
                return ("I apologize, but I encountered an error while processing your question. "
                        "Please try rephrasing your question or asking about a more specific topic.")
        # Standard agent's answer looked healthy — return it as-is.
        return response_text
    except Exception as e:
        elapsed_time = time.time() - start_time
        print(f"[LOG] ❌ Standard agent error after {elapsed_time:.2f} seconds: {e}")
        # Try fallback even on standard agent exception
        if fallback_agent is not None:
            print("[LOG] 🔄 Standard agent failed, trying FALLBACK AGENT...")
            try:
                fallback_response = fallback_agent.chat(question)
                return fallback_response.response
            except Exception as fallback_e:
                print(f"[LOG] ❌ Fallback agent also failed: {fallback_e}")
        # Reached when fallback is unavailable or also raised: report the
        # ORIGINAL standard-agent error to the caller.
        return f"Error processing your question: {str(e)}"
def aswer_language_detection(response_text: str) -> str:
    """
    Detect the language of the response text.

    NOTE: the (misspelled) function name is kept as-is for backward
    compatibility with existing callers.

    Args:
        response_text (str): The response text to analyze.

    Returns:
        str: Detected language code restricted to 'en', 'ar' or 'fr';
        defaults to 'en' on unsupported languages or detection failure.
    """
    print("[LOG] Detecting response language...")
    try:
        # Detect the language of the first 5 words of the response, with
        # inline [Source ID] citations stripped so they don't skew detection.
        first_line = " ".join(response_text.split()[:5])
        first_line = re.sub(r'\[.*?\]', '', first_line)  # Remove citations
        answer_language = detect(first_line)
        print(f"[LOG] Detected language: {answer_language}")
        if answer_language not in ('en', 'ar', 'fr'):
            print(f"[LOG] Language {answer_language} not in supported list, defaulting to 'en'")
            answer_language = 'en'
    except Exception:
        # FIX: was a bare `except:` followed by `return` inside `finally`,
        # which silently swallowed ALL exceptions (including SystemExit and
        # KeyboardInterrupt).  langdetect raises LangDetectException on
        # empty/ambiguous input; catching Exception covers that safely.
        print("[LOG] Language detection failed, defaulting to 'en'")
        answer_language = 'en'
    return answer_language
def process_question_with_sequential_citations(agents_dict, question: str, chunks_directory="./data/") -> dict:
    """
    Process a question through the RAG pipeline with fallback support and return response with sequential citation numbers.
    Args:
        agents_dict: Dictionary containing standard_agent, fallback_agent, and llm
        question (str): The user's question
        chunks_directory (str): Path to the directory containing JSON files
    Returns:
        dict: {
            "response": str,             # Response with sequential citation numbers [1], [2], etc.
            "cited_elements_json": str,  # JSON array of cited elements in order
            "unique_ids": list,          # Original source IDs in order
            "citation_mapping": dict,    # Mapping from source ID to citation number
            "answer_language": str,      # Detected language code ('en', 'ar' or 'fr')
            "used_fallback": bool        # Whether fallback agent was used (heuristic, see below)
        }
    """
    print(f"\n[LOG] === STARTING QUESTION PROCESSING WITH FALLBACK SUPPORT ===")
    print(f"[LOG] Question: '{question[:150]}{'...' if len(question) > 150 else ''}'")
    print(f"[LOG] Chunks directory: {chunks_directory}")
    start_time = time.time()
    used_fallback = False  # This flag is a heuristic
    try:
        # Get the response using the enhanced process_question function
        response_text = process_question(agents_dict, question)
        # Check if fallback was likely used (simple heuristic based on logs)
        # A more robust way would be for `process_question` to return a tuple (response, used_fallback)
        # NOTE(review): the marker string below is only ever printed to the
        # console by process_question, never embedded in the returned answer,
        # so this check likely never fires — confirm before trusting the flag.
        if "switching to fallback agent" in response_text.lower():
            used_fallback = True
            print("[LOG] 🛡️ Fallback agent was likely used based on log indicators.")
        agent_time = time.time() - start_time
        print(f"[LOG] Agent processing completed in {agent_time:.2f} seconds")
        print(f"[LOG] Raw response length: {len(response_text)} characters")
        # Extract source IDs from the response (preserving order)
        unique_ids = extract_source_ids(response_text)
        # Create mapping from source ID to sequential number (1-based)
        source_id_to_number = {source_id: i + 1 for i, source_id in enumerate(unique_ids)}
        print(f"[LOG] Created citation mapping for {len(source_id_to_number)} sources")
        # Convert citations to sequential numbers
        sequential_response = convert_citations_to_sequential(response_text, source_id_to_number)
        # Load all chunks data to find cited elements
        print("[LOG] Loading chunks data for citation lookup...")
        all_chunks_data = []
        min_chunks_files = ["Guide-pratique-de-mise-en-oeuvre-du-calendrier-national-de-vaccination-2023.json",
                            "Immunization in Practice_WHO_eng_2015.json"]
        for json_file in min_chunks_files:
            json_path = os.path.join(chunks_directory, json_file)
            if not os.path.exists(json_path):
                print(f"[LOG] ⚠️ Skipping non-existent file: {json_path}")
                continue
            print(f"[LOG] Loading {json_file}...")
            try:
                with open(json_path, "r", encoding="utf-8") as f:
                    chunks_data = json.load(f)
                all_chunks_data.extend(chunks_data)
                print(f"[LOG] ✅ Loaded {len(chunks_data)} chunks from {json_file}")
            except Exception as e:
                # A missing/corrupt chunks file degrades citation lookup but
                # should not abort the whole answer.
                print(f"[LOG] ❌ Warning: Could not load {json_file}: {e}")
        print(f"[LOG] Total chunks loaded: {len(all_chunks_data)}")
        # Get cited elements in the same order as the sequential citations
        print("[LOG] Finding cited elements...")
        cited_elements_ordered = []
        for i, source_id in enumerate(unique_ids):  # This preserves the order
            found = False
            for element in all_chunks_data:
                # Handle TableElement structure: "elements" is a single dict
                if element.get("type") == 'TableElement':
                    if element.get("elements", {}).get("element_id") == source_id:
                        cited_elements_ordered.append(element.get("elements", {}))
                        found = True
                        break
                # Handle other element structures: "elements" is a list of dicts
                elif "elements" in element and isinstance(element["elements"], list):
                    for nested_element in element["elements"]:
                        if isinstance(nested_element, dict) and nested_element.get("element_id") == source_id:
                            cited_elements_ordered.append(nested_element)
                            found = True
                            break
                    if found:
                        break
            if not found:
                print(f"[LOG] ⚠️ Source ID {source_id} not found in chunks data")
        print(f"[LOG] Found {len(cited_elements_ordered)} cited elements")
        # Convert to JSON (non-ASCII preserved for French/Arabic content)
        cited_elements_json = json.dumps(cited_elements_ordered, ensure_ascii=False, indent=2)
        answer_language = aswer_language_detection(response_text)
        total_time = time.time() - start_time
        print(f"[LOG] ✅ Processing completed in {total_time:.2f} seconds total")
        print(f"[LOG] Final response length: {len(sequential_response)} characters")
        print(f"[LOG] Used fallback: {used_fallback}")
        print(f"[LOG] === QUESTION PROCESSING COMPLETED ===\n")
        return {
            "response": sequential_response,
            "cited_elements_json": cited_elements_json,
            "unique_ids": unique_ids,
            "citation_mapping": source_id_to_number,
            "answer_language": answer_language,
            "used_fallback": used_fallback
        }
    except Exception as e:
        elapsed_time = time.time() - start_time
        print(f"[LOG] ❌ Error processing question after {elapsed_time:.2f} seconds: {e}")
        # Any failure collapses to a safe, citation-free apology payload with
        # the same dict shape as the success path.
        error_response = "I apologize, but I encountered an error while processing your question. Please try rephrasing your question or asking about a more specific topic."
        return {
            "response": error_response,
            "cited_elements_json": "[]",
            "unique_ids": [],
            "citation_mapping": {},
            "answer_language": "en",
            "used_fallback": False
        }
def process_question_with_citations(agents_dict, question: str, chunks_directory="./data/") -> dict:
    """
    Legacy function - maintained for backward compatibility.
    Now calls the new sequential citation function with fallback support.
    """
    print("[LOG] Using legacy function wrapper - redirecting to sequential citations with fallback")
    result = process_question_with_sequential_citations(agents_dict, question, chunks_directory)
    return result