Spaces:
Running
Running
| import logging | |
| import traceback | |
| from typing import Any, AsyncGenerator | |
| import asyncio | |
| import requests | |
| import os | |
| from langchain.agents import create_openai_tools_agent, AgentExecutor | |
| from langchain.memory import ConversationBufferWindowMemory | |
| from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| from langchain.schema import OutputParserException | |
| from langchain.callbacks.base import BaseCallbackHandler | |
| from openai import RateLimitError, APIError | |
| from .config import get_llm, logger | |
| from .tools import ( | |
| medical_guidelines_knowledge_tool, | |
| compare_providers_tool, | |
| get_current_datetime_tool, | |
| side_effect_recording_tool, | |
| medical_answer_validation_tool, | |
| ) | |
| # LangSmith tracing utilities | |
| from .tracing import traceable, trace, conversation_tracker, log_to_langsmith | |
| from .validation import validate_medical_answer | |
| # ============================================================================ | |
| # STREAMING CALLBACK HANDLER | |
| # ============================================================================ | |
class StreamingCallbackHandler(BaseCallbackHandler):
    """Callback handler that accumulates LLM tokens as they are emitted.

    ``tokens`` holds each token individually, while ``current_response`` holds
    their running concatenation, which is what ``get_response`` returns.
    """

    def __init__(self):
        # Start with an empty token list and an empty accumulated response.
        self.tokens, self.current_response = [], ""

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Record a freshly generated token and extend the running response."""
        self.tokens.append(token)
        self.current_response = self.current_response + token

    def get_response(self) -> str:
        """Return the text accumulated since construction or the last reset."""
        return self.current_response

    def reset(self):
        """Discard everything accumulated so the handler can serve a new response."""
        self.tokens, self.current_response = [], ""
| # ============================================================================ | |
| # CUSTOM EXCEPTION CLASSES | |
| # ============================================================================ | |
class AgentError(Exception):
    """Root of this module's exception hierarchy for agent-related failures."""
class ToolExecutionError(AgentError):
    """Signals that one of the agent's tools failed while executing."""
class APIConnectionError(AgentError):
    """Signals that a connection to an external API could not be made."""
class ValidationError(AgentError):
    """Signals that validation failed.

    In this module it is raised when the agent's response is malformed
    or has an empty ``output``.
    """
| # ============================================================================ | |
| # AGENT CONFIGURATION | |
| # ============================================================================ | |
# Tools exposed to the agent. The same list is handed to both
# create_openai_tools_agent (in get_agent) and AgentExecutor
# (in get_agent_executor), so the two always agree on the tool set.
AVAILABLE_TOOLS = [
    medical_guidelines_knowledge_tool,
    compare_providers_tool,
    get_current_datetime_tool,
    side_effect_recording_tool,
    medical_answer_validation_tool,
]
# System message template for the agent.
# This text is installed as the "system" entry of prompt_template below.
# NOTE(review): ChatPromptTemplate presumably treats message strings as
# templates, so any literal curly braces added here would need escaping -
# confirm against the LangChain prompt documentation before editing.
SYSTEM_MESSAGE = """
You are an advanced Medical Advisor Chatbot for healthcare professionals.
Your primary purpose is to answer clinical and medical questions strictly based on authoritative medical guidelines using the tool "medical_guidelines_knowledge_tool".
**INSTRUCTIONS:**
- Always answer using only the information retrieved from medical guidelines via "medical_guidelines_knowledge_tool".
- **SIDE EFFECT REPORTING**: When a healthcare professional reports an adverse drug reaction, side effect, or medication-related complication, ALWAYS use the "side_effect_recording_tool" first to document the information. Return the tool's response directly to the user without modification. DO NOT use validation or generate additional reports for side effect reporting queries.
- Use the side effect recording tool when the input contains phrases like: "patient experienced", "side effect", "adverse reaction", "drug reaction", "medication caused", "developed after taking", etc.
- When the side effect recording tool requests additional information, present the request exactly as provided by the tool.
- For every answer, you MUST provide detailed citations including:
* Source file name
* Page number
* Provider name
* Specific location (e.g., Table 1, Figure 2, Box 3, Section Header, etc.)
* Type of content (e.g., table, flowchart, bullet point, paragraph, etc.)
- Use this format for citations:
(Source: [file name], Page: [page number], Provider: [provider name], Location: [specific location], Type: [content type])
- If multiple sources are used, cite each one with its corresponding metadata.
- If a specific provider (NCCN, ASCO, ESMO, etc.) is mentioned in the question, prioritize information from that provider.
- When citing tables or flowcharts:
* Specify the table/figure number if available
* Describe which part of the table/figure contains the information
* Reference any relevant footnotes or legends
- When citing text:
* Specify the section or subsection heading
* Indicate if it's from a bullet point, paragraph, or other format
- If the answer is not found in the retrieved guidelines, respond: "I do not know."
- Never speculate or provide information not present in the guidelines.
- Always respond in English.
**FORMATTING:**
- Use markdown formatting for clarity:
* Use bullet points for lists
* Use bold for emphasis on key points
* Use tables when summarizing multiple points
**SAFETY DISCLAIMER:**
Important: For emergencies call emergency services immediately. This is educational information for healthcare professionals, not a substitute for clinical judgment.
**EMERGENCY PROTOCOL:**
If the question describes emergency symptoms (chest pain, difficulty breathing, severe bleeding, loss of consciousness, etc.), immediately respond:
"This is an emergency! Call emergency services immediately and seek urgent medical help."
**Language:**
- Always respond in English.
"""
# Chat prompt handed to the agent in get_agent(). Message order:
#   1. the system instructions (SYSTEM_MESSAGE),
#   2. "chat_history" - prior turns, which the run_* functions load from `memory`,
#   3. the current human "{input}",
#   4. "agent_scratchpad" - presumably filled in by the agent with its
#      intermediate tool calls; callers in this module never supply it.
prompt_template = ChatPromptTemplate.from_messages([
    ("system", SYSTEM_MESSAGE),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder("agent_scratchpad"),
])
| # Initialize the agent with lazy loading | |
def get_agent():
    """Build the OpenAI-tools agent on demand rather than at import time.

    Each call assembles a new agent from the configured LLM, the module's
    tool list and the shared prompt template.
    """
    agent_parts = {
        "llm": get_llm(),
        "tools": AVAILABLE_TOOLS,
        "prompt": prompt_template,
    }
    return create_openai_tools_agent(**agent_parts)
| # Create agent executor with lazy loading | |
def get_agent_executor():
    """Build an AgentExecutor on demand rather than at import time.

    Each call wraps a freshly built agent (see get_agent) together with the
    module's tool list and the fixed runtime limits below.
    """
    runtime_limits = dict(
        verbose=True,
        handle_parsing_errors=True,
        max_iterations=5,
        max_execution_time=90,  # tighten a bit to help responsiveness
    )
    return AgentExecutor(agent=get_agent(), tools=AVAILABLE_TOOLS, **runtime_limits)
# Module-wide conversation memory shared by run_agent and run_agent_streaming.
# The history is exposed under "chat_history" as message objects, matching the
# MessagesPlaceholder("chat_history") slot in prompt_template.
# The window size is set with ``k``; ``max_window_size`` is not a field of
# ConversationBufferWindowMemory, so the previous keyword did not configure
# the window and the library default applied instead of the intended 10.
memory = ConversationBufferWindowMemory(
    memory_key="chat_history",
    return_messages=True,
    k=10,
)
| # ============================================================================ | |
| # VALIDATION HELPER FUNCTIONS | |
| # ============================================================================ | |
| def _should_validate_response(user_input: str, response: str) -> bool: | |
| """ | |
| Determine if a response should be automatically validated. | |
| Args: | |
| user_input: The user's input | |
| response: The agent's response | |
| Returns: | |
| bool: True if the response should be validated | |
| """ | |
| # Skip validation for certain types of responses | |
| skip_indicators = [ | |
| "side effect report", | |
| "adverse drug reaction report", | |
| "error:", | |
| "sorry,", | |
| "i don't know", | |
| "i do not know", | |
| "validation report", | |
| "evaluation scores" | |
| ] | |
| # Skip validation for side effect reporting queries in user input | |
| side_effect_input_indicators = [ | |
| "side effect", "adverse reaction", "adverse event", "drug reaction", | |
| "medication reaction", "patient experienced", "developed after taking", | |
| "caused by medication", "drug-related", "medication-related" | |
| ] | |
| user_input_lower = user_input.lower() | |
| response_lower = response.lower() | |
| # Don't validate if user input is about side effect reporting | |
| if any(indicator in user_input_lower for indicator in side_effect_input_indicators): | |
| return False | |
| # Don't validate if response contains skip indicators | |
| if any(indicator in response_lower for indicator in skip_indicators): | |
| return False | |
| # Don't validate very short responses | |
| if len(response.strip()) < 50: | |
| return False | |
| # Validate if response seems to contain medical information | |
| medical_indicators = [ | |
| "treatment", "therapy", "diagnosis", "medication", "drug", "patient", | |
| "clinical", "guideline", "recommendation", "according to", "source:", | |
| "provider:", "page:", "nccn", "asco", "esmo", "nice" | |
| ] | |
| return any(indicator in response_lower for indicator in medical_indicators) | |
def _perform_automatic_validation(user_input: str, response: str) -> str:
    """
    Validate a response and append a markdown validation report to it.

    The retrieval context is read from module-level state in ``.tools``
    (``_last_question`` / ``_last_documents``). When that context is missing,
    or when anything in the validation path raises, the original response is
    returned unchanged, so this function never raises to its caller.

    Args:
        user_input: The user's input
        response: The agent's response

    Returns:
        str: ``response`` followed by the formatted validation report, or
            ``response`` unchanged when validation is skipped or fails
    """
    try:
        # Import here to avoid circular imports.
        # NOTE: _last_user_question is imported but not used below, and
        # _last_question is only used for the presence check.
        from .tools import _last_question, _last_documents, _last_user_question
        # Check if we have the necessary context for validation
        if not _last_question or not _last_documents:
            logger.info("Skipping validation: insufficient context")
            return response
        # Perform validation using the original user input instead of tool query
        evaluation = validate_medical_answer(user_input, _last_documents, response)
        # Format validation results; every field falls back to a placeholder
        # when the corresponding key is absent from the report.
        report = evaluation.get("validation_report", {})
        validation_summary = f"""
---
## 🔍 **AUTOMATIC VALIDATION REPORT**
**Overall Score:** {report.get('Overall_Rating', 'N/A')}/100
**Key Metrics:**
**Accuracy:** {report.get('Accuracy_Rating', 'N/A')}/100
{report.get('Accuracy_Comment', 'No comment available')}
**Coherence:** {report.get('Coherence_Rating', 'N/A')}/100
{report.get('Coherence_Comment', 'No comment available')}
**Relevance:** {report.get('Relevance_Rating', 'N/A')}/100
{report.get('Relevance_Comment', 'No comment available')}
**Completeness:** {report.get('Completeness_Rating', 'N/A')}/100
{report.get('Completeness_Comment', 'No comment available')}
**Citations:** {report.get('Citations_Attribution_Rating', 'N/A')}/100
{report.get('Citations_Attribution_Comment', 'No comment available')}
**Length:** {report.get('Length_Rating', 'N/A')}/100
{report.get('Length_Comment', 'No comment available')}
**Assessment:** {report.get('Final_Summary_and_Improvement_Plan', 'No assessment available')}
*Validation ID: {evaluation.get('interaction_id', 'N/A')} | Saved to evaluation_results.json*
"""
        # Callers rely on the report being a pure suffix of the response:
        # run_agent_streaming slices it off by len(response).
        return response + validation_summary
    except Exception as e:
        # Best effort: a validation failure must not lose the agent's answer.
        logger.error(f"Automatic validation failed: {e}")
        return response
| # ============================================================================ | |
| # STREAMING AGENT FUNCTIONS | |
| # ============================================================================ | |
| # @traceable(name="run_agent_streaming") | |
async def run_agent_streaming(user_input: str, max_retries: int = 3) -> AsyncGenerator[str, None]:
    """
    Run the agent and stream its reply, retrying recoverable failures.

    The agent executor is invoked in a worker thread while this coroutine
    polls a StreamingCallbackHandler and yields newly generated text. If the
    handler captured nothing, the final output is yielded word by word
    instead. When the reply qualifies (see _should_validate_response), the
    automatic validation report is streamed after it. On success the exchange
    is saved to the shared conversation memory.

    Args:
        user_input (str): The user's input message to process
        max_retries (int, optional): Maximum number of retries for recoverable
            errors (rate limits, API errors, connection errors, timeouts).
            Defaults to 3.

    Yields:
        str: Chunks of the agent's response as they are generated, optionally
            followed by the validation report, or a single apology message
            when processing fails.

    Raises:
        None: All ``Exception`` subclasses are caught and handled internally.

    Note:
        Text that has already been yielded is not retracted when a later
        failure triggers a retry, so a retried attempt may repeat content.
    """
    # Input validation
    if not user_input or not user_input.strip():
        logger.warning("Empty input received")
        yield "Sorry, I didn't receive any questions. Please enter your question or request."
        return

    # Store the original user question for validation
    from .tools import store_user_question
    store_user_question(user_input.strip())

    retry_count = 0
    last_error = None
    current_run_id = None
    # Session metadata (increment conversation count)
    session_metadata = conversation_tracker.get_session_metadata(increment=True)

    while retry_count <= max_retries:
        try:
            # Tracing for streaming disabled to avoid duplicate traces.
            # We keep tracing only for the AgentExecutor in run_agent().
            current_run_id = None

            # Load conversation history from memory
            chat_history = memory.load_memory_variables({})["chat_history"]
            logger.info(f"Processing user input (attempt {retry_count + 1}): {user_input[:50]}...")

            # Fresh handler per attempt so a retry never sees stale tokens
            streaming_handler = StreamingCallbackHandler()

            # Blocking agent call; executed in a worker thread below
            def run_sync():
                return get_agent_executor().invoke(
                    {
                        "input": user_input.strip(),
                        "chat_history": chat_history,
                    },
                    config={"callbacks": [streaming_handler]},
                )

            previous_length = 0

            # Inside a coroutine the running loop is the one to use
            loop = asyncio.get_running_loop()
            task = loop.run_in_executor(None, run_sync)

            # Stream the response as it's being generated
            while not task.done():
                current_response = streaming_handler.get_response()
                # Yield only the text that appeared since the last poll
                if len(current_response) > previous_length:
                    new_content = current_response[previous_length:]
                    previous_length = len(current_response)
                    yield new_content
                # Small delay to prevent overwhelming the client (faster flushing)
                await asyncio.sleep(0.03)

            # Get the final result (re-raises anything the worker raised)
            response = await task

            # Yield any content produced between the last poll and completion
            final_response = streaming_handler.get_response()
            if len(final_response) > previous_length:
                yield final_response[previous_length:]

            # Validate the response structure before touching response["output"];
            # same checks as run_agent, including the str type check.
            if not response or "output" not in response or not isinstance(response["output"], str):
                raise ValidationError("Invalid response format from agent")
            if not response["output"] or not response["output"].strip():
                raise ValidationError("Empty response from agent")

            base_response = response["output"]

            # If no streaming content was captured, simulate streaming by
            # yielding the full output word by word
            if not final_response:
                for word in base_response.split(' '):
                    yield word + ' '
                    await asyncio.sleep(0.05)

            # Perform automatic validation if appropriate
            if _should_validate_response(user_input, base_response):
                logger.info("Performing automatic validation for streaming response...")
                try:
                    # The validation helper is synchronous; run it off the
                    # event loop so other coroutines keep being served.
                    validation_content = await loop.run_in_executor(
                        None, _perform_automatic_validation, user_input, base_response
                    )
                    # Extract just the validation part (everything after the original response)
                    if len(validation_content) > len(base_response):
                        validation_part = validation_content[len(base_response):]
                        # Stream the validation part
                        for word in validation_part.split(' '):
                            yield word + ' '
                            await asyncio.sleep(0.02)
                except Exception as e:
                    logger.error(f"Streaming validation failed: {e}")

            # Save conversation context to memory
            memory.save_context(
                {"input": user_input},
                {"output": response["output"]}
            )

            # Log response metrics to LangSmith (best effort)
            try:
                log_to_langsmith(
                    key="response_metrics",
                    value={
                        "response_length": len(response.get("output", "")),
                        "attempt": retry_count + 1,
                        **session_metadata,
                    },
                    run_id=current_run_id,
                )
            except Exception:
                # Metrics must never fail the request, but leave a trace
                logger.debug("Failed to log response metrics to LangSmith", exc_info=True)

            logger.info(f"Successfully processed user input: {user_input[:50]}...")
            return

        except RateLimitError as e:
            retry_count += 1
            last_error = e
            wait_time = min(2 ** retry_count, 60)  # Exponential backoff, max 60 seconds
            logger.warning(
                f"Rate limit exceeded. Retrying in {wait_time} seconds... "
                f"(Attempt {retry_count}/{max_retries})"
            )
            if retry_count <= max_retries:
                await asyncio.sleep(wait_time)
                continue
            else:
                logger.error("Rate limit exceeded after maximum retries")
                yield "Sorry, the system is currently busy. Please try again in a little while."
                return

        except APIError as e:
            retry_count += 1
            last_error = e
            logger.error(f"OpenAI API error: {str(e)}")
            if retry_count <= max_retries:
                await asyncio.sleep(2)
                continue
            else:
                yield "Sorry, there was an error connecting to the service. Please try again later."
                return

        except requests.exceptions.ConnectionError as e:
            retry_count += 1
            last_error = e
            logger.error(f"Network connection error: {str(e)}")
            if retry_count <= max_retries:
                await asyncio.sleep(3)
                continue
            else:
                yield "Sorry, I can't connect to the service right now. Please check your internet connection and try again."
                return

        except requests.exceptions.Timeout as e:
            retry_count += 1
            last_error = e
            logger.error(f"Request timeout: {str(e)}")
            if retry_count <= max_retries:
                await asyncio.sleep(2)
                continue
            else:
                yield "Sorry, the request took longer than expected. Please try again."
                return

        except requests.exceptions.RequestException as e:
            logger.error(f"Request error: {str(e)}")
            yield "Sorry, an error occurred with the request. Please try again."
            return

        except OutputParserException as e:
            logger.error(f"Output parsing error: {str(e)}")
            yield "Sorry, an error occurred while processing the response. Please rephrase your question and try again."
            return

        except ValidationError as e:
            logger.error(f"Validation error: {str(e)}")
            yield "Sorry, an error occurred while validating the data. Please try again."
            return

        except ToolExecutionError as e:
            logger.error(f"Tool execution error: {str(e)}")
            yield "Sorry, an error occurred while executing one of the operations. Please try again or contact technical support."
            return

        except Exception as e:
            logger.error(f"Unexpected error in run_agent_streaming: {str(e)}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            # Log error to LangSmith (best effort)
            try:
                log_to_langsmith(
                    key="error_log",
                    value={
                        "error": str(e),
                        "error_type": type(e).__name__,
                        **session_metadata,
                    },
                    run_id=current_run_id,
                )
            except Exception:
                logger.debug("Failed to log error to LangSmith", exc_info=True)
            # For unexpected errors, don't retry
            yield "Sorry, an unexpected error occurred. Please try again or contact technical support if the problem persists."
            return

    # Only reachable when the loop body never ran (e.g. a negative max_retries)
    logger.error(f"Maximum retries exceeded. Last error: {str(last_error)}")
    yield "Sorry, I was unable to process your request after several attempts. Please try again later."
async def safe_run_agent_streaming(user_input: str) -> AsyncGenerator[str, None]:
    """
    Guarded streaming entry point: screen the input, then delegate.

    Non-string input, input longer than 1000 characters after stripping, and
    input that is empty after stripping are each answered with a single
    apology chunk. Anything else is forwarded unchanged to
    run_agent_streaming and its chunks are relayed as they arrive.

    Args:
        user_input (str): The user's input message to process

    Yields:
        str: Chunks of the agent's response as they are generated

    Raises:
        None: All exceptions are caught and handled internally
    """
    try:
        rejection = None

        if not isinstance(user_input, str):
            # Input type validation
            logger.warning(f"Invalid input type received: {type(user_input)}")
            rejection = "Sorry, the input must be valid text."
        else:
            # Input length validation
            trimmed = user_input.strip()
            if len(trimmed) > 1000:
                logger.warning(f"Input too long: {len(trimmed)} characters")
                rejection = "Sorry, the message is too long. Please shorten your question."
            elif not trimmed:
                logger.warning("Empty input after stripping")
                rejection = "Sorry, I didn't receive any questions. Please enter your question or request."

        if rejection is not None:
            yield rejection
            return

        # Relay the main streaming function's chunks one by one
        async for piece in run_agent_streaming(user_input):
            yield piece
    except Exception as exc:
        logger.critical(f"Critical error in safe_run_agent_streaming: {str(exc)}")
        logger.critical(f"Traceback: {traceback.format_exc()}")
        yield "Sorry, a critical system error occurred. Please contact technical support immediately."
async def run_agent(user_input: str, max_retries: int = 3) -> str:
    """
    Run the agent once per attempt and return its final answer.

    The agent executor's synchronous ``invoke`` and the synchronous validation
    helper are both run in a worker thread so that this coroutine does not
    block the event loop while they execute (the executor is configured with
    a max_execution_time of 90 seconds). On success the exchange is saved to
    the shared conversation memory and, when the reply qualifies (see
    _should_validate_response), the validation report is appended.

    Args:
        user_input (str): The user's input message to process
        max_retries (int, optional): Maximum number of retries for recoverable
            errors (rate limits, API errors, connection errors, timeouts).
            Defaults to 3.

    Returns:
        str: The agent's response or an appropriate error message in English

    Raises:
        None: All ``Exception`` subclasses are caught and handled internally
    """
    # Input validation
    if not user_input or not user_input.strip():
        logger.warning("Empty input received")
        return "Sorry, I didn't receive any questions. Please enter your question or request."

    retry_count = 0
    last_error = None
    current_run_id = None
    session_metadata = conversation_tracker.get_session_metadata(increment=True)
    # Inside a coroutine there is always a running loop
    loop = asyncio.get_running_loop()

    while retry_count <= max_retries:
        try:
            # Load conversation history from memory
            chat_history = memory.load_memory_variables({})["chat_history"]
            logger.info(f"Processing user input (attempt {retry_count + 1}): {user_input[:50]}...")

            # Blocking agent call; executed in a worker thread, mirroring
            # what run_agent_streaming does
            def invoke_agent():
                return get_agent_executor().invoke({
                    "input": user_input.strip(),
                    "chat_history": chat_history
                })

            response = await loop.run_in_executor(None, invoke_agent)
            current_run_id = None  # This will be handled by LangChain's tracer

            # Validate response structure
            if not response or "output" not in response or not isinstance(response["output"], str):
                raise ValidationError("Invalid response format from agent")
            if not response["output"] or not response["output"].strip():
                raise ValidationError("Empty response from agent")

            # Save conversation context to memory
            memory.save_context(
                {"input": user_input},
                {"output": response["output"]}
            )

            # Log response metrics (best effort)
            try:
                log_to_langsmith(
                    key="response_metrics",
                    value={
                        "response_length": len(response.get("output", "")),
                        "attempt": retry_count + 1,
                        **session_metadata,
                    },
                    run_id=current_run_id,
                )
            except Exception:
                # Metrics must never fail the request, but leave a trace
                logger.debug("Failed to log response metrics to LangSmith", exc_info=True)

            logger.info(f"Successfully processed user input: {user_input[:50]}...")

            # Perform automatic validation if appropriate
            final_response = response["output"]
            if _should_validate_response(user_input, final_response):
                logger.info("Performing automatic validation...")
                # Synchronous helper; keep it off the event loop as well
                final_response = await loop.run_in_executor(
                    None, _perform_automatic_validation, user_input, final_response
                )
            return final_response

        except RateLimitError as e:
            retry_count += 1
            last_error = e
            wait_time = min(2 ** retry_count, 60)  # Exponential backoff, max 60 seconds
            logger.warning(
                f"Rate limit exceeded. Retrying in {wait_time} seconds... "
                f"(Attempt {retry_count}/{max_retries})"
            )
            if retry_count <= max_retries:
                await asyncio.sleep(wait_time)
                continue
            else:
                logger.error("Rate limit exceeded after maximum retries")
                return "Sorry, the system is currently busy. Please try again in a little while."

        except APIError as e:
            retry_count += 1
            last_error = e
            logger.error(f"OpenAI API error: {str(e)}")
            if retry_count <= max_retries:
                await asyncio.sleep(2)
                continue
            else:
                return "Sorry, there was an error connecting to the service. Please try again later."

        except requests.exceptions.ConnectionError as e:
            retry_count += 1
            last_error = e
            logger.error(f"Network connection error: {str(e)}")
            if retry_count <= max_retries:
                await asyncio.sleep(3)
                continue
            else:
                return "Sorry, I can't connect to the service right now. Please check your internet connection and try again."

        except requests.exceptions.Timeout as e:
            retry_count += 1
            last_error = e
            logger.error(f"Request timeout: {str(e)}")
            if retry_count <= max_retries:
                await asyncio.sleep(2)
                continue
            else:
                return "Sorry, the request took longer than expected. Please try again."

        except requests.exceptions.RequestException as e:
            logger.error(f"Request error: {str(e)}")
            return "Sorry, an error occurred with the request. Please try again."

        except OutputParserException as e:
            logger.error(f"Output parsing error: {str(e)}")
            return "Sorry, an error occurred while processing the response. Please rephrase your question and try again."

        except ValidationError as e:
            logger.error(f"Validation error: {str(e)}")
            return "Sorry, an error occurred while validating the data. Please try again."

        except ToolExecutionError as e:
            logger.error(f"Tool execution error: {str(e)}")
            return "Sorry, an error occurred while executing one of the operations. Please try again or contact technical support."

        except Exception as e:
            logger.error(f"Unexpected error in run_agent: {str(e)}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            # Log error (best effort)
            try:
                log_to_langsmith(
                    key="error_log",
                    value={
                        "error": str(e),
                        "error_type": type(e).__name__,
                        **session_metadata,
                    },
                    run_id=current_run_id,
                )
            except Exception:
                logger.debug("Failed to log error to LangSmith", exc_info=True)
            # For unexpected errors, don't retry
            return "Sorry, an unexpected error occurred. Please try again or contact technical support if the problem persists."

    # Only reachable when the loop body never ran (e.g. a negative max_retries)
    logger.error(f"Maximum retries exceeded. Last error: {str(last_error)}")
    return "Sorry, I was unable to process your request after several attempts. Please try again later."
async def safe_run_agent(user_input: str) -> str:
    """
    Guarded non-streaming entry point: screen the input, then delegate.

    Non-string input and input that is empty after stripping are answered
    with an apology message. Anything else is forwarded unchanged to
    run_agent. Any exception escaping that call is logged as critical and
    turned into an apology message.

    Args:
        user_input (str): The user's input message to process

    Returns:
        str: The agent's response or an appropriate error message in English

    Raises:
        None: All exceptions are caught and handled internally
    """
    try:
        # Input type validation
        if not isinstance(user_input, str):
            logger.warning(f"Invalid input type received: {type(user_input)}")
            return "Sorry, the input must be valid text."

        # NOTE(review): the 1000-character cap that the streaming wrapper
        # enforces is disabled here (it was commented out) - confirm that
        # the difference is intended.
        if not user_input.strip():
            logger.warning("Empty input after stripping")
            return "Sorry, I didn't receive any questions. Please enter your question or request."

        # Process the input through the main agent function
        answer = await run_agent(user_input)
        return answer
    except Exception as exc:
        logger.critical(f"Critical error in safe_run_agent: {str(exc)}")
        logger.critical(f"Traceback: {traceback.format_exc()}")
        return "Sorry, a critical system error occurred. Please contact technical support immediately."
def clear_memory() -> None:
    """
    Wipe the shared conversation memory.

    After this call the stored chat history is empty, so the next request
    starts a fresh conversation. Failures are logged, never raised.
    """
    try:
        memory.clear()
        logger.info("Conversation memory cleared successfully")
    except Exception as exc:
        logger.error(f"Error clearing memory: {str(exc)}")
def get_memory_summary() -> str:
    """
    Return the stored conversation history rendered as a string.

    Returns:
        str: ``str()`` of the "chat_history" memory variable, a placeholder
            text when that variable is absent, or an error text when the
            memory cannot be read
    """
    try:
        stored = memory.load_memory_variables({})
        history = stored.get("chat_history", "No conversation history available")
        return str(history)
    except Exception as exc:
        logger.error(f"Error getting memory summary: {str(exc)}")
        return "Error retrieving memory summary"