Spaces:
Sleeping
Sleeping
| """ | |
| This module provides a robust wrapper for executing LangChain agents. | |
| The primary class, BasicAgent, enhances LangChain's AgentExecutor with a | |
| resilient retry mechanism. It is designed to handle intermittent API issues, | |
| specifically retriable Google GenAI API errors (such as HTTP 429 quota errors and 503 responses), | |
| by implementing a retry loop with exponential backoff. Before returning a | |
| final answer, it also processes the output through a validation registry. | |
| """ | |
| import time | |
| from typing import Any, List, Optional | |
| from langchain_classic.agents import Agent, AgentExecutor | |
| from langchain_core.tools import BaseTool | |
| from google import genai | |
| from validators import validator_registry | |
class BasicAgent:
    """Run a LangChain ``AgentExecutor`` with retries on transient API errors.

    The executor is built once in ``__init__``. ``invoke_with_retry`` (also
    reachable by calling the instance) sends a question to it, retries when
    the call raises a ``genai.errors.APIError`` whose ``code`` is 429 or 503,
    and passes the executor's ``output`` through ``validator_registry.process``
    before returning it.
    """

    def __init__(
        self,
        agent: Agent,
        tools: List[BaseTool],
        verbose: bool = False,
        handle_parsing_errors: bool = True,
        max_iterations: int = 9
    ) -> None:
        """Store the settings and build the wrapped ``AgentExecutor``.

        Args:
            agent: Agent handed to ``AgentExecutor``.
            tools: Tools handed to ``AgentExecutor``.
            verbose: Forwarded to ``AgentExecutor(verbose=...)``.
            handle_parsing_errors: Forwarded to
                ``AgentExecutor(handle_parsing_errors=...)``.
            max_iterations: Forwarded to ``AgentExecutor(max_iterations=...)``.
        """
        self.agent: Agent = agent
        self.tools: List[BaseTool] = tools
        self.verbose: bool = verbose
        self.handle_parsing_errors: bool = handle_parsing_errors
        self.max_iterations: int = max_iterations
        self.agent_obj = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=self.verbose,
            handle_parsing_errors=self.handle_parsing_errors,
            max_iterations=self.max_iterations
        )

    def is_retriable(self, e: Exception) -> bool:
        """Return True when *e* is a GenAI ``APIError`` with code 429 or 503."""
        # Adjust this check if your error type is different
        return isinstance(e, genai.errors.APIError) and getattr(e, "code", None) in {429, 503}

    def invoke_with_retry(self, question: str, max_retries: int = 5, initial_delay: float = 10.0) -> str:
        """Invoke the agent, retrying retriable API errors, and validate the answer.

        Args:
            question: Text passed to the executor as ``input`` and to the
                validator registry as ``task_description``.
            max_retries: Total number of attempts (not additional retries).
            initial_delay: Seconds to wait before the second attempt when the
                error carries no ``retry_delay``; doubled after each such wait.

        Returns:
            The value returned by ``validator_registry.process`` for the
            executor's ``output``.

        Raises:
            RuntimeError: Every attempt failed with a retriable error; the
                last such error is attached as ``__cause__``.
            Exception: Any non-retriable error is re-raised unchanged.
        """
        current_delay = initial_delay
        last_error: Optional[Exception] = None

        for attempt in range(max_retries):
            try:
                result = self.agent_obj.invoke(
                    {"input": question},
                    config={"configurable": {"session_id": "test-session"}},
                )
                # INVOCATION POINT for the validator registry
                return validator_registry.process(
                    task_description=question,
                    answer=result['output']
                )
            except Exception as e:
                if not self.is_retriable(e):
                    # Not a transient API failure: let the caller see it.
                    raise
                last_error = e

            if attempt + 1 >= max_retries:
                # No attempts left, so waiting would only delay the failure.
                break

            # Prefer the delay suggested by the API when the error exposes one.
            suggested = getattr(getattr(last_error, "retry_delay", None), "seconds", None)
            if suggested is not None:
                current_delay = float(suggested)
                print(f"Quota error (attempt {attempt+1}/{max_retries}). API suggested retry after {current_delay} seconds.", flush=True)
                time.sleep(current_delay)
            else:
                print(f"Quota error (attempt {attempt+1}/{max_retries}). Retrying in {current_delay} seconds with exponential backoff.", flush=True)
                time.sleep(current_delay)
                # Double only after waiting so the announced delay is the one
                # actually slept and initial_delay is honoured on the first retry.
                current_delay *= 2

        # All attempts failed with retriable errors; keep the last one as the cause.
        raise RuntimeError(
            f"Max retries ({max_retries}) exceeded due to persistent quota errors or other retriable issues."
        ) from last_error

    def __call__(self, question: str) -> str:
        """Answer *question* via ``invoke_with_retry`` using its default settings."""
        return self.invoke_with_retry(question)