Spaces:
Sleeping
Sleeping
| from src.settings import Settings | |
| from smolagents import LiteLLMModel, ToolCallingAgent, CodeAgent, InferenceClientModel, DuckDuckGoSearchTool, Tool | |
| from tools.FinalAnswerTool import FinalAnswerTool | |
| from tools.ReadAudioTool import ReadAudioTool | |
| from tools.ReadImageTool import ReadImageTool | |
| from tools.ReadTextTool import ReadTextTool | |
| from tools.ReadVideoTool import ReadVideoTool | |
| from tools.WebSearchTool import DuckDuckGoSearchTool | |
| from tools.YouTubeTool import YouTubeTool | |
| from tools.PythonRunnerTool import PythonRunnerTool | |
| from tools.PythonCalcTool import PythonCalcTool | |
| from tools.SemanticScholar import AcademicPaperSearchTool | |
| from src.utils import InputTokenRateLimiter | |
| import wikipedia as wiki | |
| from markdownify import markdownify as to_markdown | |
| import time | |
| import random | |
| settings = Settings() | |
| import litellm | |
| from litellm import completion | |
| #litellm._turn_on_debug() | |
| class WikiTitleFinder(Tool): | |
| name = "wiki_titles" | |
| description = "Search for related Wikipedia page titles." | |
| inputs = {"query": {"type": "string", "description": "Search query."}} | |
| output_type = "string" | |
| def forward(self, query: str) -> str: | |
| results = wiki.search(query) | |
| return ", ".join(results) if results else "No results." | |
| class WikiContentFetcher(Tool): | |
| name = "wiki_page" | |
| description = "Fetch Wikipedia page content." | |
| inputs = {"page_title": {"type": "string", "description": "Wikipedia page title."}} | |
| output_type = "string" | |
| def forward(self, page_title: str) -> str: | |
| try: | |
| return to_markdown(wiki.page(page_title).html()) | |
| except wiki.exceptions.PageError: | |
| return f"'{page_title}' not found." | |
| class MathSolver(Tool): | |
| name = "math_solver" | |
| description = "Safely evaluate basic math expressions." | |
| inputs = {"input": {"type": "string", "description": "Math expression to evaluate."}} | |
| output_type = "string" | |
| def forward(self, input: str) -> str: | |
| try: | |
| return str(eval(input, {"__builtins__": {}})) | |
| except Exception as e: | |
| return f"Math error: {e}" | |
| class BasicAgent(): | |
| def __init__(self): | |
| self.model = LiteLLMModel( | |
| model_id=settings.llm_model_id, | |
| api_key=settings.llm_api_key | |
| ) | |
| self.agent = CodeAgent( | |
| model=self.model, | |
| tools=[ | |
| FinalAnswerTool(), | |
| ReadAudioTool(), | |
| ReadImageTool(), | |
| ReadTextTool(), | |
| ReadVideoTool(), | |
| DuckDuckGoSearchTool(), | |
| WikiTitleFinder(), | |
| WikiContentFetcher(), | |
| YouTubeTool(), | |
| PythonRunnerTool(), | |
| PythonCalcTool(), | |
| AcademicPaperSearchTool(), | |
| MathSolver() | |
| ], | |
| max_steps=5, | |
| planning_interval=3, | |
| ) | |
| self.token_rate_limiter = InputTokenRateLimiter() | |
| self.expected_tokens_per_step = 10000 | |
| self.max_retries = 3 | |
| self.base_delay = 5 | |
| self.token_rate_limiter = InputTokenRateLimiter() | |
| self.expected_tokens_per_step = 10000 | |
| self.max_retries = 3 | |
| self.base_delay = 5 | |
| def run(self, question: str, file_content: str = "", file_path: str = ""): | |
| final_answer = None | |
| retry_count = 0 | |
| question = f"Question: {question}" | |
| if file_content: | |
| context = f"Story content:\n{file_content}" | |
| elif file_path: | |
| context = f"File path: {file_path}" | |
| else: | |
| context = "" | |
| print(f"Starting Agent with question text: {question}, {context}") | |
| while True: | |
| try: | |
| final_input = f"{question}\n\n{context}" | |
| # Run the agent | |
| steps = self.agent.run(final_input) | |
| # If steps is a string, convert it to a single-item list | |
| if isinstance(steps, str): | |
| steps = [steps] | |
| for step in steps: | |
| # Handle string steps | |
| if isinstance(step, str): | |
| final_answer = step | |
| print(f"Step: String Output: {final_answer}") | |
| continue | |
| # Handle object steps | |
| step_name = step.__class__.__name__ | |
| output = getattr(step, "output", None) | |
| if output: | |
| print(f"Step: {step_name} Output: {output}") | |
| self.token_rate_limiter.maybe_wait(self.expected_tokens_per_step) | |
| tokens_used = getattr(step, "token_usage", None) | |
| if tokens_used: | |
| self.token_rate_limiter.add_tokens(tokens_used.input_tokens) | |
| # Capture the final answer from the final answer step | |
| if step_name == "FinalAnswerStep": | |
| final_answer = output | |
| print(f"Captured Final Answer from step: {final_answer}") | |
| break # Exit retry loop if successful | |
| except Exception as e: | |
| # Handle API overload/rate limits | |
| if "overload" in str(e).lower() or "rate limit" in str(e).lower(): | |
| print("Rate limit exceeded. Retrying...") | |
| if retry_count >= self.max_retries: | |
| print("Max retries reached. Exiting...") | |
| break | |
| delay = self.base_delay * (2 ** retry_count) + random.random() | |
| print(f"Retrying in {delay:.1f}s ... ({e})") | |
| time.sleep(delay) | |
| retry_count += 1 | |
| else: | |
| print(f"Error: {e}") | |
| break | |
| print(f"\nFinished agent run.\n{'='*60}") | |
| print(f"Final Answer: {final_answer}\n") | |
| return final_answer | |
| def __call__(self, question: str, file_content: str = "", file_path: str = ""): | |
| return self.run(question, file_content) |