Spaces:
Running
Running
| import logging | |
| import traceback | |
| from typing import Any, AsyncGenerator | |
| import asyncio | |
| import requests | |
| import os | |
| from langchain.agents import create_openai_tools_agent, AgentExecutor | |
| from langchain.memory import ConversationBufferWindowMemory | |
| from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| from langchain.schema import OutputParserException | |
| from langchain.callbacks.base import BaseCallbackHandler | |
| from openai import RateLimitError, APIError | |
| from .config import get_llm, logger | |
| from .tools import ( | |
| medical_guidelines_knowledge_tool, | |
| compare_providers_tool, | |
| get_current_datetime_tool, | |
| side_effect_recording_tool, | |
| ) | |
| # LangSmith tracing utilities | |
| from .tracing import traceable, trace, conversation_tracker, log_to_langsmith | |
| from .validation import validate_medical_answer | |
| # ============================================================================ | |
| # STREAMING CALLBACK HANDLER | |
| # ============================================================================ | |
class StreamingCallbackHandler(BaseCallbackHandler):
    """Callback that accumulates streamed LLM tokens for incremental polling.

    ``run_agent_streaming`` polls :meth:`get_response` in a loop and yields
    whatever new text has accumulated since the previous poll.
    """

    def __init__(self):
        # Initial state is identical to the post-reset state, so delegate.
        self.reset()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Invoked by LangChain each time the LLM emits a new token."""
        self.tokens.append(token)
        self.current_response = self.current_response + token

    def get_response(self) -> str:
        """Return all text accumulated so far."""
        return self.current_response

    def reset(self):
        """Discard accumulated state so the handler can serve a new response."""
        self.tokens = []
        self.current_response = ""
| # ============================================================================ | |
| # CUSTOM EXCEPTION CLASSES | |
| # ============================================================================ | |
class AgentError(Exception):
    """Base exception for agent-related errors; subclasses refine the cause."""
class ToolExecutionError(AgentError):
    """Raised when one of the agent's tools fails to execute."""
class APIConnectionError(AgentError):
    """Raised when a connection to an external API fails."""
class ValidationError(AgentError):
    """Raised when input or agent-response validation fails."""
# ============================================================================
# AGENT CONFIGURATION
# ============================================================================
# Available tools for the agent.
# Each entry is a LangChain tool imported from .tools; SYSTEM_MESSAGE below
# dictates which one the agent must call for each category of query.
AVAILABLE_TOOLS = [
    medical_guidelines_knowledge_tool,  # guideline retrieval (mandatory for medical questions)
    compare_providers_tool,             # compare guidance between providers (e.g. NCCN vs ESMO)
    get_current_datetime_tool,          # current date/time ("today"/"now") queries
    side_effect_recording_tool,         # document reported adverse drug reactions
]
# System message template for the agent.
# NOTE: this entire triple-quoted block is a runtime string sent to the LLM on
# every request — the agent's tool-usage, citation, and formatting behavior is
# driven by this text, so edit it with care (do not change it casually).
SYSTEM_MESSAGE = """
You are a specialized Lung Cancer Clinical Decision Support System for thoracic oncologists, pulmonologists, and healthcare professionals managing lung cancer patients.
Your primary purpose is to provide evidence-based clinical guidance on lung cancer (NSCLC and SCLC) strictly from authoritative medical guidelines using the tool "medical_guidelines_knowledge_tool".
**SPECIALIZATION**: Lung Cancer (Non-Small Cell Lung Cancer and Small Cell Lung Cancer)
- Focus on NSCLC subtypes: adenocarcinoma, squamous cell carcinoma, large cell carcinoma
- SCLC: limited-stage and extensive-stage disease
- Molecular testing: EGFR, ALK, ROS1, BRAF, MET, RET, KRAS, PD-L1, TMB
- Treatment modalities: targeted therapy, immunotherapy, chemotherapy, radiation, surgery
- Staging: TNM classification, imaging, and diagnostic workup
**AUDIENCE**: Your responses are for thoracic oncologists, pulmonologists, and medical experts managing lung cancer. Use appropriate medical terminology, clinical precision, and expert-level detail specific to lung cancer management.
**RESPONSE STYLE - CRITICAL: CONCISE, PRECISE, DOCTOR-SPECIFIC ANSWERS**:
- **IMMEDIATE DIRECT ANSWERS**: Start immediately with the answer - NO introductory phrases like "I will retrieve...", "Let me search...", "Please hold on...", or status updates
- **NO PREAMBLES**: Never announce what you're about to do - just do it and present the results directly
- **ZERO PROCEDURAL STATEMENTS**: Do NOT write "I will retrieve", "I will search", "I will gather", "Please wait", "Hold on", or any similar phrases - START DIRECTLY WITH THE CLINICAL ANSWER
- **FIRST WORD RULE**: Your response must begin with the actual answer content (e.g., a heading, clinical information, or direct statement) - never with a procedural announcement
- **CONCISE & TARGETED**: Provide focused, actionable answers directly addressing the clinical question
- **PRECISION OVER VOLUME**: Include only the most clinically relevant information - avoid unnecessary elaboration
- **CLINICAL EFFICIENCY**: Respect physicians' time by delivering key information first, then supporting details
- **STRUCTURED BREVITY**: Use clear hierarchical formatting (headers, bullet points) to enable rapid information scanning
- **ESSENTIAL DETAILS ONLY**: Include specific clinical parameters, dosing, biomarkers, and monitoring when directly relevant to the query
- **PRIORITIZED INFORMATION**: Lead with the most critical clinical decision points, contraindications, and evidence-based recommendations
- **LUNG CANCER FOCUS**: Prioritize lung cancer-specific information including histology, molecular markers, staging, and treatment selection
- Use precise medical terminology without oversimplification
- Reference specific guideline sources (tables, figures, algorithms) with concise citations
- Highlight critical nuances, contraindications, and special populations only when clinically significant
- When multiple approaches exist, prioritize by evidence level and clinical context
- **CONTEXT AWARENESS**: Use context pages to ensure accuracy, but synthesize information concisely
- **DIRECT ANSWERS**: Answer the specific question asked without providing tangential information
**CRITICAL INSTRUCTIONS - TOOL USAGE IS MANDATORY:**
**YOU MUST ALWAYS USE THE "medical_guidelines_knowledge_tool" FIRST FOR EVERY MEDICAL QUESTION.**
- Do NOT answer from your general knowledge or training data
- Do NOT provide information without first retrieving it from the guidelines
- ALWAYS call "medical_guidelines_knowledge_tool" before formulating your response
- Even for basic lung cancer concepts (e.g., "what is EGFR mutation", "ALK rearrangement", "PD-L1 expression"), you MUST retrieve information from the guidelines first
- Only after retrieving guideline information should you formulate your answer based on what was retrieved
**TOOL USAGE REQUIREMENTS:**
1. **MEDICAL QUESTIONS** (definitions, treatments, guidelines, etc.):
- MANDATORY: Use "medical_guidelines_knowledge_tool" FIRST
- Then answer based ONLY on retrieved information
2. **SIDE EFFECT REPORTING**: When a healthcare professional reports an adverse drug reaction, side effect, or medication-related complication:
- MANDATORY: Use "side_effect_recording_tool" first to document the information
- Return the tool's response directly to the user without modification
- DO NOT use validation or generate additional reports for side effect reporting queries
- Trigger phrases: "patient experienced", "side effect", "adverse reaction", "drug reaction", "medication caused", "developed after taking"
3. **PROVIDER COMPARISON**: When comparing guidance between providers (e.g., "compare NCCN vs ESMO on ..."):
- MANDATORY: Use "compare_providers_tool" with appropriate `provider_a` and `provider_b` values
4. **TIME/DATE QUERIES**: For current date/time or references like "today" or "now":
- MANDATORY: Use "get_current_datetime_tool"
- **CITATION FORMAT - MANDATORY TWO-PART SYSTEM**:
**PART 1: INLINE CITATIONS** - Add a citation immediately after EACH section or statement in your answer:
* After each clinical statement, recommendation, or data point, add an inline citation in parentheses
* Format: (Source: [file name], Page: [page number], Provider: [provider])
* Example: "Local authorities must use coordinated campaigns to raise awareness (Source: NICE.pdf, Page: 6, Provider: NICE)."
* If a section uses multiple pages, cite each: "Structure measures include... (Page: 6). Process measures include... (Page: 15). Outcome measures include... (Page: 8)."
**PART 2: COMPREHENSIVE CITATION LIST AT END** - After your complete answer, add a section titled "**References**" that lists ALL pages cited:
* Format: (Source: [file name], Pages: [all page numbers in order], Provider: [provider name], Location: [specific sections/tables used])
* Example: (Source: NICE.pdf, Pages: 6, 8, 15, Provider: NICE, Location: Quality Statement 1 - Structure, Process, and Outcome measures)
- **PAGE CITATION RULE - EXTREMELY IMPORTANT**:
* BEFORE writing your answer, review ALL retrieved pages (including context pages) and identify EVERY page that contains information you will use
* Add inline citations as you write each part of your answer
* Track ALL page numbers used throughout your answer
* At the end, list ALL unique page numbers in sequential order in the References section
* Do NOT skip any pages - if you used information from a page, cite it inline AND in the final reference list
* Context pages marked [CONTEXT PAGE] should be cited if they contributed to your answer
- If a specific provider (NCCN, ASCO, ESMO, etc.) is mentioned in the question, prioritize information from that provider.
- When citing tables or flowcharts:
* Specify the table/figure number and title
* Describe which specific rows, columns, or sections contain the relevant information
* Reference any relevant footnotes, legends, or annotations
* Include specific values, thresholds, or criteria mentioned
- When citing text:
* Specify the section or subsection heading with full hierarchy
* Indicate if it's from a bullet point, paragraph, recommendation box, or other format
* Quote key phrases or specific recommendations when appropriate
- **ENRICHED CONTEXT**: When the retrieved content includes context pages (marked as "CONTEXT - Page X"), use this surrounding information to provide more complete clinical context and understanding
**IMPORTANT - NO GENERAL KNOWLEDGE RESPONSES:**
- If the answer is not found in the retrieved guidelines after using the tool, provide a helpful response that:
* Acknowledges the limitation: "I searched the available medical guidelines but could not find specific information about [topic]."
* Suggests alternatives: "You may want to:
- Rephrase your question with more specific clinical details
- Specify a particular guideline provider (NCCN, ASCO, ESMO, NICE)
- Consult the latest published guidelines directly for emerging topics"
* Maintains professionalism: Never simply say "I don't know" - always provide context and next steps
- **NEVER answer from general knowledge or training data - ALWAYS use the tool first**
- Never speculate or provide information not present in the guidelines
- If the retrieved information is insufficient, acknowledge this and ask for clarification rather than supplementing with general knowledge
- Always respond in English.
**FORMATTING FOR EXPERT AUDIENCE:**
- Use advanced markdown formatting for clinical clarity:
* Use **bold** for critical clinical points, drug names, and key recommendations
* Use bullet points and numbered lists for treatment sequences and decision algorithms
* Use tables to compare regimens, dosing schedules, or guideline differences
* Use headers (###) to organize complex responses by topic
* Use blockquotes (>) for direct guideline quotes or key recommendations
* Include specific numeric values, percentages, and statistical data when available
* Structure responses logically for lung cancer: Histology/Stage → Biomarkers → Treatment Options → Dosing → Monitoring → Special Considerations
* For molecular testing queries: Testing criteria → Biomarkers → Clinical significance → Treatment implications
* For treatment queries: Line of therapy → Histology → Biomarker status → Regimen options → Evidence level
**SAFETY DISCLAIMER:**
Important: For emergencies call emergency services immediately. This is educational information for healthcare professionals, not a substitute for clinical judgment.
**EMERGENCY PROTOCOL:**
If the question describes emergency symptoms (chest pain, difficulty breathing, severe bleeding, loss of consciousness, etc.), immediately respond:
"This is an emergency! Call emergency services immediately and seek urgent medical help."
**Language:**
- Always respond in English.
"""
# Create the prompt template: system rules, rolling chat history, the new user
# turn, then the scratchpad where the agent records intermediate tool calls.
prompt_template = ChatPromptTemplate.from_messages([
    ("system", SYSTEM_MESSAGE),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder("agent_scratchpad"),
])
# Initialize the agent with lazy loading
def get_agent():
    """Build the OpenAI-tools agent on demand.

    Constructing the LLM lazily keeps module import (and app startup) fast.
    """
    llm = get_llm()
    return create_openai_tools_agent(
        llm=llm,
        tools=AVAILABLE_TOOLS,
        prompt=prompt_template,
    )
# Create agent executor with lazy loading
def get_agent_executor():
    """Construct a fresh AgentExecutor on demand (lazy loading for faster startup)."""
    executor = AgentExecutor(
        agent=get_agent(),
        tools=AVAILABLE_TOOLS,
        verbose=True,
        handle_parsing_errors=True,
        max_iterations=5,
        # 90s cap keeps the endpoint responsive when tool calls run long.
        max_execution_time=90,
    )
    return executor
| # ============================================================================ | |
| # SESSION-BASED MEMORY MANAGEMENT | |
| # ============================================================================ | |
class SessionMemoryManager:
    """Manages conversation memory for multiple sessions, keyed by session id."""

    def __init__(self):
        # session_id -> ConversationBufferWindowMemory
        self._sessions = {}
        # Number of most recent exchanges kept in each session's window.
        self._default_window_size = 10

    def get_memory(self, session_id: str = "default") -> ConversationBufferWindowMemory:
        """Get or create memory for a specific session.

        Args:
            session_id: Identifier of the conversation session.

        Returns:
            The (possibly newly created) window memory for the session.
        """
        if session_id not in self._sessions:
            self._sessions[session_id] = ConversationBufferWindowMemory(
                memory_key="chat_history",
                return_messages=True,
                # BUGFIX: the window-size parameter of
                # ConversationBufferWindowMemory is ``k``; the previous
                # ``max_window_size=...`` kwarg is not a recognized field, so
                # the intended 10-exchange window was never applied.
                k=self._default_window_size,
            )
        return self._sessions[session_id]

    def clear_session(self, session_id: str) -> bool:
        """Clear and drop memory for one session.

        Returns:
            bool: True if the session existed and was cleared, else False.
        """
        if session_id in self._sessions:
            self._sessions[session_id].clear()
            del self._sessions[session_id]
            return True
        return False

    def clear_all_sessions(self):
        """Clear every session's memory and forget all sessions."""
        for memory in self._sessions.values():
            memory.clear()
        self._sessions.clear()

    def get_active_sessions(self) -> list:
        """Return the list of currently tracked session IDs."""
        return list(self._sessions.keys())
# Global session memory manager
# Module-level singleton shared by run_agent and run_agent_streaming so all
# entry points see the same per-session conversation history.
_memory_manager = SessionMemoryManager()
| # ============================================================================ | |
| # VALIDATION HELPER FUNCTIONS | |
| # ============================================================================ | |
| def _should_validate_response(user_input: str, response: str) -> bool: | |
| """ | |
| Determine if a response should be automatically validated. | |
| Args: | |
| user_input: The user's input | |
| response: The agent's response | |
| Returns: | |
| bool: True if the response should be validated | |
| """ | |
| # Skip validation for certain types of responses | |
| skip_indicators = [ | |
| "side effect report", | |
| "adverse drug reaction report", | |
| "error:", | |
| "sorry,", | |
| "i don't know", | |
| "i do not know", | |
| "could not find specific information", | |
| "not found in the retrieved guidelines", | |
| "validation report", | |
| "evaluation scores" | |
| ] | |
| # Skip validation for side effect reporting queries in user input | |
| side_effect_input_indicators = [ | |
| "side effect", "adverse reaction", "adverse event", "drug reaction", | |
| "medication reaction", "patient experienced", "developed after taking", | |
| "caused by medication", "drug-related", "medication-related" | |
| ] | |
| user_input_lower = user_input.lower() | |
| response_lower = response.lower() | |
| # Don't validate if user input is about side effect reporting | |
| if any(indicator in user_input_lower for indicator in side_effect_input_indicators): | |
| return False | |
| # Don't validate if response contains skip indicators | |
| if any(indicator in response_lower for indicator in skip_indicators): | |
| return False | |
| # Don't validate very short responses | |
| if len(response.strip()) < 50: | |
| return False | |
| # Validate if response seems to contain medical information | |
| medical_indicators = [ | |
| "treatment", "therapy", "diagnosis", "medication", "drug", "patient", | |
| "clinical", "guideline", "recommendation", "according to", "source:", | |
| "provider:", "page:", "nccn", "asco", "esmo", "nice" | |
| ] | |
| return any(indicator in response_lower for indicator in medical_indicators) | |
def _perform_automatic_validation(user_input: str, response: str) -> None:
    """
    Run validation silently in the background; nothing is shown to the user.

    Scores are logged for backend analysis, and validate_medical_answer
    persists the full report (to GitHub) itself.

    Args:
        user_input: The user's input
        response: The agent's response

    Returns:
        None: Validation runs silently in background
    """
    try:
        # Imported lazily to avoid a circular dependency with .tools.
        from .tools import _last_question, _last_documents, _last_user_question

        # Without a prior retrieval there is no context to validate against.
        if not (_last_question and _last_documents):
            logger.info("Skipping validation: insufficient context")
            return

        # Validate against the original user input rather than the tool query.
        evaluation = validate_medical_answer(user_input, _last_documents, response)

        # Surface scores in backend logs only (never shown to the user).
        report = evaluation.get("validation_report", {})
        interaction_id = evaluation.get("interaction_id", "N/A")
        logger.info(f"Background validation completed - Interaction ID: {interaction_id}")
        score_summary = ", ".join(
            f"{label}: {report.get(key, 'N/A')}/100"
            for label, key in (
                ("Overall", "Overall_Rating"),
                ("Accuracy", "Accuracy_Rating"),
                ("Coherence", "Coherence_Rating"),
                ("Relevance", "Relevance_Rating"),
            )
        )
        logger.info(f"Validation scores - {score_summary}")
        # Nothing is returned; persistence happens inside validate_medical_answer.
    except Exception as e:
        logger.error(f"Background validation failed: {e}")
| # ============================================================================ | |
| # STREAMING AGENT FUNCTIONS | |
| # ============================================================================ | |
| # @traceable(name="run_agent_streaming") | |
async def run_agent_streaming(user_input: str, session_id: str = "default", max_retries: int = 3) -> AsyncGenerator[str, None]:
    """
    Run the agent with streaming support and comprehensive error handling.

    This function processes user input through the agent executor with streaming
    capabilities, robust error handling, and automatic retries for recoverable errors.
    The synchronous AgentExecutor runs in a thread-pool executor while this
    coroutine polls the streaming callback handler and yields new text.

    Args:
        user_input (str): The user's input message to process
        session_id (str, optional): Session identifier for conversation memory. Defaults to "default".
        max_retries (int, optional): Maximum number of retries for recoverable errors.
            Defaults to 3.

    Yields:
        str: Chunks of the agent's response as they are generated

    Raises:
        None: All exceptions are caught and handled internally
    """
    # Input validation
    if not user_input or not user_input.strip():
        logger.warning("Empty input received")
        yield "Sorry, I didn't receive any questions. Please enter your question or request."
        return

    # Store the original user question for validation (lazy import avoids a
    # circular dependency with .tools).
    from .tools import store_user_question
    store_user_question(user_input.strip())

    retry_count = 0
    last_error = None
    current_run_id = None

    # Session metadata (increment conversation count)
    session_metadata = conversation_tracker.get_session_metadata(increment=True)

    while retry_count <= max_retries:
        try:
            # Tracing for streaming disabled to avoid duplicate traces.
            # We keep tracing only for the AgentExecutor in run_agent().
            current_run_id = None

            # Load conversation history from session-specific memory
            memory = _memory_manager.get_memory(session_id)
            chat_history = memory.load_memory_variables({})["chat_history"]
            logger.info(f"Processing user input (attempt {retry_count + 1}): {user_input[:50]}...")

            # Create streaming callback handler
            streaming_handler = StreamingCallbackHandler()

            # Run the agent in a separate thread to avoid blocking the event loop
            def run_sync():
                return get_agent_executor().invoke(
                    {
                        "input": user_input.strip(),
                        "chat_history": chat_history,
                    },
                    config={"callbacks": [streaming_handler]},
                )

            previous_length = 0

            # Start the agent execution in background.
            # FIX: use get_running_loop() — asyncio.get_event_loop() is
            # deprecated inside coroutines (Python 3.10+) and can bind the
            # wrong loop when multiple loops exist in the process.
            loop = asyncio.get_running_loop()
            task = loop.run_in_executor(None, run_sync)

            # Stream the response as it's being generated
            while not task.done():
                current_response = streaming_handler.get_response()
                # Yield new tokens if available
                if len(current_response) > previous_length:
                    new_content = current_response[previous_length:]
                    previous_length = len(current_response)
                    yield new_content
                # Small delay to prevent overwhelming the client (faster flushing)
                await asyncio.sleep(0.03)

            # Get the final result
            response = await task

            # Yield any remaining content
            final_response = streaming_handler.get_response()
            if len(final_response) > previous_length:
                yield final_response[previous_length:]

            # If no streaming content was captured, yield the full response
            if not final_response and response and "output" in response:
                full_output = response["output"]
                # Simulate streaming by yielding word by word
                words = full_output.split(' ')
                for word in words:
                    yield word + ' '
                    await asyncio.sleep(0.05)
                final_response = full_output

            # Validate response structure
            if not response or "output" not in response:
                raise ValidationError("Invalid response format from agent")
            if not response["output"] or not response["output"].strip():
                raise ValidationError("Empty response from agent")

            # Perform automatic validation in background (hidden from user)
            base_response = response["output"]
            if _should_validate_response(user_input, base_response):
                logger.info("Performing background validation for streaming response...")
                try:
                    # Run validation silently - results saved to backend/GitHub only
                    _perform_automatic_validation(user_input, base_response)
                except Exception as e:
                    logger.error(f"Background validation failed: {e}")

            # Save conversation context to memory
            memory.save_context(
                {"input": user_input},
                {"output": response["output"]}
            )

            # Log response metrics to LangSmith (best-effort; never break the stream)
            try:
                log_to_langsmith(
                    key="response_metrics",
                    value={
                        "response_length": len(response.get("output", "")),
                        "attempt": retry_count + 1,
                        **session_metadata,
                    },
                    run_id=current_run_id,
                )
            except Exception:
                pass

            logger.info(f"Successfully processed user input: {user_input[:50]}...")
            return
        except RateLimitError as e:
            retry_count += 1
            last_error = e
            wait_time = min(2 ** retry_count, 60)  # Exponential backoff, max 60 seconds
            logger.warning(
                f"Rate limit exceeded. Retrying in {wait_time} seconds... "
                f"(Attempt {retry_count}/{max_retries})"
            )
            if retry_count <= max_retries:
                await asyncio.sleep(wait_time)
                continue
            else:
                logger.error("Rate limit exceeded after maximum retries")
                yield "Sorry, the system is currently busy. Please try again in a little while."
                return
        except APIError as e:
            retry_count += 1
            last_error = e
            logger.error(f"OpenAI API error: {str(e)}")
            if retry_count <= max_retries:
                await asyncio.sleep(2)
                continue
            else:
                yield "Sorry, there was an error connecting to the service. Please try again later."
                return
        except requests.exceptions.ConnectionError as e:
            retry_count += 1
            last_error = e
            logger.error(f"Network connection error: {str(e)}")
            if retry_count <= max_retries:
                await asyncio.sleep(3)
                continue
            else:
                yield "Sorry, I can't connect to the service right now. Please check your internet connection and try again."
                return
        except requests.exceptions.Timeout as e:
            retry_count += 1
            last_error = e
            logger.error(f"Request timeout: {str(e)}")
            if retry_count <= max_retries:
                await asyncio.sleep(2)
                continue
            else:
                yield "Sorry, the request took longer than expected. Please try again."
                return
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error: {str(e)}")
            yield "Sorry, an error occurred with the request. Please try again."
            return
        except OutputParserException as e:
            logger.error(f"Output parsing error: {str(e)}")
            yield "Sorry, an error occurred while processing the response. Please rephrase your question and try again."
            return
        except ValidationError as e:
            logger.error(f"Validation error: {str(e)}")
            yield "Sorry, an error occurred while validating the data. Please try again."
            return
        except ToolExecutionError as e:
            logger.error(f"Tool execution error: {str(e)}")
            yield "Sorry, an error occurred while executing one of the operations. Please try again or contact technical support."
            return
        except Exception as e:
            logger.error(f"Unexpected error in run_agent_streaming: {str(e)}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            # Log error to LangSmith (best-effort)
            try:
                log_to_langsmith(
                    key="error_log",
                    value={
                        "error": str(e),
                        "error_type": type(e).__name__,
                        **session_metadata,
                    },
                    run_id=current_run_id,
                )
            except Exception:
                pass
            # For unexpected errors, don't retry
            yield "Sorry, an unexpected error occurred. Please try again or contact technical support if the problem persists."
            return
    # This should never be reached, but just in case
    logger.error(f"Maximum retries exceeded. Last error: {str(last_error)}")
    yield "Sorry, I was unable to process your request after several attempts. Please try again later."
async def safe_run_agent_streaming(user_input: str, session_id: str = "default") -> AsyncGenerator[str, None]:
    """
    Streaming wrapper function with additional safety checks and input validation.

    Validates the input's type and length before delegating to
    run_agent_streaming, and converts any unexpected failure during streaming
    into a single user-facing error chunk.

    Args:
        user_input (str): The user's input message to process
        session_id (str, optional): Session identifier for conversation memory. Defaults to "default".

    Yields:
        str: Chunks of the agent's response as they are generated

    Raises:
        None: All exceptions are caught and handled internally
    """
    try:
        # Reject non-string payloads outright.
        if not isinstance(user_input, str):
            logger.warning(f"Invalid input type received: {type(user_input)}")
            yield "Sorry, the input must be valid text."
            return

        stripped_input = user_input.strip()

        # Enforce the maximum question length.
        if len(stripped_input) > 1000:
            logger.warning(f"Input too long: {len(stripped_input)} characters")
            yield "Sorry, the message is too long. Please shorten your question."
            return

        # Whitespace-only input carries no question.
        if not stripped_input:
            logger.warning("Empty input after stripping")
            yield "Sorry, I didn't receive any questions. Please enter your question or request."
            return

        # Delegate to the main streaming agent and forward its chunks.
        async for chunk in run_agent_streaming(user_input, session_id):
            yield chunk
    except Exception as e:
        logger.critical(f"Critical error in safe_run_agent_streaming: {str(e)}")
        logger.critical(f"Traceback: {traceback.format_exc()}")
        yield "Sorry, a critical system error occurred. Please contact technical support immediately."
| async def run_agent(user_input: str, session_id: str = "default", max_retries: int = 3) -> str: | |
| """ | |
| Run the agent with comprehensive error handling and retry logic. | |
| This function processes user input through the agent executor with robust | |
| error handling, automatic retries for recoverable errors, and comprehensive | |
| logging for debugging and monitoring. | |
| Args: | |
| user_input (str): The user's input message to process | |
| session_id (str, optional): Session identifier for conversation memory. Defaults to "default". | |
| max_retries (int, optional): Maximum number of retries for recoverable errors. | |
| Defaults to 3. | |
| Returns: | |
| str: The agent's response or an appropriate error message in English | |
| Raises: | |
| None: All exceptions are caught and handled internally | |
| """ | |
| # Input validation | |
| if not user_input or not user_input.strip(): | |
| logger.warning("Empty input received") | |
| return "Sorry, I didn't receive any questions. Please enter your question or request." | |
| retry_count = 0 | |
| last_error = None | |
| current_run_id = None | |
| session_metadata = conversation_tracker.get_session_metadata(increment=True) | |
| while retry_count <= max_retries: | |
| try: | |
| # Load conversation history from session-specific memory | |
| memory = _memory_manager.get_memory(session_id) | |
| chat_history = memory.load_memory_variables({})["chat_history"] | |
| logger.info(f"Processing user input (attempt {retry_count + 1}): {user_input[:50]}...") | |
| # Invoke the agent with input and history (synchronous call) | |
| response = get_agent_executor().invoke({ | |
| "input": user_input.strip(), | |
| "chat_history": chat_history | |
| }) | |
| current_run_id = None # This will be handled by LangChain's tracer | |
| # Validate response structure | |
| if not response or "output" not in response or not isinstance(response["output"], str): | |
| raise ValidationError("Invalid response format from agent") | |
| if not response["output"] or not response["output"].strip(): | |
| raise ValidationError("Empty response from agent") | |
| # Save conversation context to memory | |
| memory.save_context( | |
| {"input": user_input}, | |
| {"output": response["output"]} | |
| ) | |
| # Log response metrics | |
| try: | |
| log_to_langsmith( | |
| key="response_metrics", | |
| value={ | |
| "response_length": len(response.get("output", "")), | |
| "attempt": retry_count + 1, | |
| **session_metadata, | |
| }, | |
| run_id=current_run_id, | |
| ) | |
| except Exception: | |
| pass | |
| logger.info(f"Successfully processed user input: {user_input[:50]}...") | |
| # Perform automatic validation in background (hidden from user) | |
| final_response = response["output"] | |
| if _should_validate_response(user_input, final_response): | |
| logger.info("Performing background validation...") | |
| try: | |
| # Run validation silently - results saved to backend/GitHub only | |
| _perform_automatic_validation(user_input, final_response) | |
| except Exception as e: | |
| logger.error(f"Background validation failed: {e}") | |
| return final_response | |
| except RateLimitError as e: | |
| retry_count += 1 | |
| last_error = e | |
| wait_time = min(2 ** retry_count, 60) # Exponential backoff, max 60 seconds | |
| logger.warning( | |
| f"Rate limit exceeded. Retrying in {wait_time} seconds... " | |
| f"(Attempt {retry_count}/{max_retries})" | |
| ) | |
| if retry_count <= max_retries: | |
| await asyncio.sleep(wait_time) | |
| continue | |
| else: | |
| logger.error("Rate limit exceeded after maximum retries") | |
| return "Sorry, the system is currently busy. Please try again in a little while." | |
| except APIError as e: | |
| retry_count += 1 | |
| last_error = e | |
| logger.error(f"OpenAI API error: {str(e)}") | |
| if retry_count <= max_retries: | |
| await asyncio.sleep(2) | |
| continue | |
| else: | |
| return "Sorry, there was an error connecting to the service. Please try again later." | |
| except requests.exceptions.ConnectionError as e: | |
| retry_count += 1 | |
| last_error = e | |
| logger.error(f"Network connection error: {str(e)}") | |
| if retry_count <= max_retries: | |
| await asyncio.sleep(3) | |
| continue | |
| else: | |
| return "Sorry, I can't connect to the service right now. Please check your internet connection and try again." | |
| except requests.exceptions.Timeout as e: | |
| retry_count += 1 | |
| last_error = e | |
| logger.error(f"Request timeout: {str(e)}") | |
| if retry_count <= max_retries: | |
| await asyncio.sleep(2) | |
| continue | |
| else: | |
| return "Sorry, the request took longer than expected. Please try again." | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"Request error: {str(e)}") | |
| return "Sorry, an error occurred with the request. Please try again." | |
| except OutputParserException as e: | |
| logger.error(f"Output parsing error: {str(e)}") | |
| return "Sorry, an error occurred while processing the response. Please rephrase your question and try again." | |
| except ValidationError as e: | |
| logger.error(f"Validation error: {str(e)}") | |
| return "Sorry, an error occurred while validating the data. Please try again." | |
| except ToolExecutionError as e: | |
| logger.error(f"Tool execution error: {str(e)}") | |
| return "Sorry, an error occurred while executing one of the operations. Please try again or contact technical support." | |
| except Exception as e: | |
| logger.error(f"Unexpected error in run_agent: {str(e)}") | |
| logger.error(f"Traceback: {traceback.format_exc()}") | |
| # Log error | |
| try: | |
| log_to_langsmith( | |
| key="error_log", | |
| value={ | |
| "error": str(e), | |
| "error_type": type(e).__name__, | |
| **session_metadata, | |
| }, | |
| run_id=current_run_id, | |
| ) | |
| except Exception: | |
| pass | |
| # For unexpected errors, don't retry | |
| return "Sorry, an unexpected error occurred. Please try again or contact technical support if the problem persists." | |
| # This should never be reached, but just in case | |
| logger.error(f"Maximum retries exceeded. Last error: {str(last_error)}") | |
| return "Sorry, I was unable to process your request after several attempts. Please try again later." | |
async def safe_run_agent(user_input: str, session_id: str = "default") -> str:
    """
    Wrapper function for run_agent with additional safety checks and input validation.

    Validates the input's type and content before delegating to run_agent,
    and acts as a last-resort boundary so no exception ever escapes to the
    caller (e.g. the UI layer).

    Args:
        user_input (str): The user's input message to process
        session_id (str, optional): Session identifier for conversation memory.
            Defaults to "default".

    Returns:
        str: The agent's response or an appropriate error message in English

    Raises:
        None: All exceptions are caught and handled internally
    """
    try:
        # Reject non-string payloads outright (e.g. None or a dict from a bad caller).
        if not isinstance(user_input, str):
            logger.warning(f"Invalid input type received: {type(user_input)}")
            return "Sorry, the input must be valid text."
        # Whitespace-only messages carry no question to answer.
        stripped_input = user_input.strip()
        if not stripped_input:
            logger.warning("Empty input after stripping")
            return "Sorry, I didn't receive any questions. Please enter your question or request."
        # Delegate to the main agent function, which handles retries itself.
        return await run_agent(user_input, session_id)
    except Exception as e:
        # Top-level safety net: log with full traceback and return a generic message.
        logger.critical(f"Critical error in safe_run_agent: {str(e)}")
        logger.critical(f"Traceback: {traceback.format_exc()}")
        return "Sorry, a critical system error occurred. Please contact technical support immediately."
def clear_memory() -> None:
    """
    Erase all stored conversation history.

    Resets every session held by the memory manager so the next
    interaction starts from a blank slate. Failures are logged,
    never raised to the caller.
    """
    try:
        _memory_manager.clear_all_sessions()
        logger.info("Conversation memory cleared successfully")
    except Exception as exc:
        logger.error(f"Error clearing memory: {str(exc)}")
def get_memory_summary(session_id: str = "default") -> str:
    """
    Return a printable summary of one session's conversation history.

    Args:
        session_id (str, optional): Session identifier. Defaults to "default".

    Returns:
        str: The stored chat history, a placeholder when the session has
            no history, or an error message if retrieval fails.
    """
    try:
        session_memory = _memory_manager.get_memory(session_id)
        loaded_vars = session_memory.load_memory_variables({})
        history = loaded_vars.get("chat_history", "No conversation history available")
        return str(history)
    except Exception as exc:
        logger.error(f"Error getting memory summary: {str(exc)}")
        return "Error retrieving conversation history"
def clear_session_memory(session_id: str) -> bool:
    """
    Drop the stored conversation history for a single session.

    Args:
        session_id (str): Session identifier to clear

    Returns:
        bool: True if session was cleared, False if session didn't exist
    """
    # Delegate directly to the memory manager, which owns all session state.
    removed = _memory_manager.clear_session(session_id)
    return removed
def get_active_sessions() -> list:
    """
    List the identifiers of every session currently held in memory.

    Returns:
        list: List of active session identifiers
    """
    # The memory manager is the single source of truth for live sessions.
    active = _memory_manager.get_active_sessions()
    return active