"""Agent entry point: wires smolagents CodeAgent to the project's tool suite.

Defines three lightweight local tools (Wikipedia title search, Wikipedia page
fetch, basic math evaluation) and `BasicAgent`, which runs a CodeAgent with
rate-limit-aware retries and optional file context.
"""

# --- stdlib ---
import csv
import json
import os
import random
import time

# --- third-party ---
import litellm
from litellm import completion
import wikipedia as wiki
from markdownify import markdownify as to_markdown
from smolagents import LiteLLMModel, ToolCallingAgent, CodeAgent, InferenceClientModel, Tool

# --- project-local ---
from src.settings import Settings
from src.utils import InputTokenRateLimiter
from tools.FinalAnswerTool import FinalAnswerTool
from tools.ReadAudioTool import ReadAudioTool
from tools.ReadImageTool import ReadImageTool
from tools.ReadTextTool import ReadTextTool
from tools.ReadExcelTool import ReadExcelTool
from tools.ReadVideoTool import ReadVideoTool
from tools.WebSearchTool import TavilySearchTool
from tools.WikipediaTool import LocalWikipediaTool
from tools.YouTubeTool import YouTubeSearchTool
from tools.PythonRunnerTool import PythonRunnerTool
from tools.PythonCalcTool import PythonCalcTool
from tools.SemanticScholar import TavilyResearchTool

settings = Settings()

# litellm._turn_on_debug()


class WikiTitleFinder(Tool):
    """smolagents tool: find Wikipedia page titles matching a query."""

    name = "wiki_titles"
    description = "Search for related Wikipedia page titles."
    inputs = {"query": {"type": "string", "description": "Search query."}}
    output_type = "string"

    def forward(self, query: str) -> str:
        """Return a comma-separated list of matching titles, or a no-result note."""
        results = wiki.search(query)
        return ", ".join(results) if results else "No results."


class WikiContentFetcher(Tool):
    """smolagents tool: fetch a Wikipedia page and convert its HTML to Markdown."""

    name = "wiki_page"
    description = "Fetch Wikipedia page content."
    inputs = {"page_title": {"type": "string", "description": "Wikipedia page title."}}
    output_type = "string"

    def forward(self, page_title: str) -> str:
        """Return the page body as Markdown; report lookup failures as text."""
        try:
            return to_markdown(wiki.page(page_title).html())
        except wiki.exceptions.DisambiguationError as e:
            # Bug fix: ambiguous titles raise DisambiguationError, which previously
            # escaped the tool and crashed the agent step. Surface options instead.
            return f"'{page_title}' is ambiguous. Options: {', '.join(e.options[:10])}"
        except wiki.exceptions.PageError:
            return f"'{page_title}' not found."


class MathSolver(Tool):
    """smolagents tool: evaluate a basic arithmetic expression."""

    name = "math_solver"
    description = "Safely evaluate basic math expressions."
    inputs = {"input": {"type": "string", "description": "Math expression to evaluate."}}
    output_type = "string"

    def forward(self, input: str) -> str:
        """Evaluate the expression and return its result (or an error message).

        SECURITY NOTE: eval() on model-generated input is not truly safe even
        with empty __builtins__ (e.g. attribute-chain escapes). Consider
        ast.literal_eval or a dedicated expression parser.
        """
        try:
            return str(eval(input, {"__builtins__": {}}))
        except Exception as e:
            return f"Math error: {e}"


class BasicAgent():
    """Wraps a smolagents CodeAgent with file reading, token rate limiting,
    and exponential-backoff retries on rate-limit/overload errors."""

    def __init__(self):
        # LLM backend configured from project settings.
        self.model = LiteLLMModel(
            model_id=settings.llm_model_id,
            api_key=settings.llm_api_key
        )
        self.agent = CodeAgent(
            model=self.model,
            tools=[
                TavilySearchTool(),
                TavilyResearchTool(),
                FinalAnswerTool(),
                PythonCalcTool(),
                ReadAudioTool(),
                ReadImageTool(),
                ReadExcelTool(),
                ReadTextTool(),
                YouTubeSearchTool(),
                PythonRunnerTool(),
                MathSolver(),
                LocalWikipediaTool(),
                WikiTitleFinder(),
                WikiContentFetcher(),
            ],
            max_steps=10,
            planning_interval=5,
        )
        # Bug fix: these four attributes were assigned twice verbatim; the
        # duplicate assignments have been removed.
        self.token_rate_limiter = InputTokenRateLimiter()
        self.expected_tokens_per_step = 10000  # rough per-step input-token estimate
        self.max_retries = 3
        self.base_delay = 5  # seconds; doubled per retry (exponential backoff)

    def _read_file(self, file_path: str) -> str:
        """Read a .txt, .csv, or .docx file and return its content as a string.

        CSV files are returned as a JSON array of row dicts. Missing files,
        read errors, and unsupported extensions log a message and return "".
        (Bug fix: os/csv/json were used here but never imported; docx is now
        imported lazily so the rest of the module works without python-docx.)
        """
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            return ""
        if file_path.endswith(".txt"):
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read()
        elif file_path.endswith(".csv"):
            data = []
            try:
                # Use csv.DictReader to read the file into a list of dictionaries
                with open(file_path, mode='r', newline='', encoding='utf-8') as file:
                    reader = csv.DictReader(file)
                    for row in reader:
                        data.append(row)
                # Return the structured data as a JSON string
                return json.dumps(data, indent=2)
            except Exception as e:
                print(f"Error reading CSV file: {e}")
                return ""
        elif file_path.endswith(".docx"):
            try:
                import docx  # third-party (python-docx); lazy so it's optional
            except ImportError:
                print(f"Unsupported file type: {file_path}")
                return ""
            doc = docx.Document(file_path)
            return "\n".join([p.text for p in doc.paragraphs])
        else:
            # For unsupported formats, return empty string
            print(f"Unsupported file type: {file_path}")
            return ""

    def run(self, question: str, file_content: str = "", file_path: str = "") -> str:
        """Run the agent on `question`, optionally with file context.

        Retries with exponential backoff (plus jitter) on rate-limit/overload
        errors, up to `self.max_retries`. Always returns a non-empty string;
        falls back to "I am unable to answer" if no answer was produced.
        """
        final_answer = None
        retry_count = 0

        # If file content is empty but file_path exists, read the file
        if not file_content and file_path:
            file_content = self._read_file(file_path)

        context = ""
        if file_content:
            context = f"Story content:\n{file_content}"
        elif file_path:
            context = f"File path: {file_path}"

        print(f"Starting Agent with question: {question}\nContext length: {len(context)} chars")

        while True:
            try:
                final_input = f"Question: {question}\n\n{context}"
                steps = self.agent.run(final_input)
                # Convert string steps to list
                if isinstance(steps, str):
                    steps = [steps]
                for step in steps:
                    if isinstance(step, str):
                        final_answer = step
                        continue
                    step_name = step.__class__.__name__
                    output = getattr(step, "output", None)
                    if output:
                        final_answer = output
                    # Throttle before the next step based on expected input tokens.
                    self.token_rate_limiter.maybe_wait(self.expected_tokens_per_step)
                    tokens_used = getattr(step, "token_usage", None)
                    if tokens_used:
                        self.token_rate_limiter.add_tokens(tokens_used.input_tokens)
                break  # Exit retry loop if successful
            except Exception as e:
                # Only rate-limit/overload errors are retried; anything else aborts.
                if "overload" in str(e).lower() or "rate limit" in str(e).lower():
                    if retry_count >= self.max_retries:
                        print("Max retries reached. Exiting...")
                        break
                    delay = self.base_delay * (2 ** retry_count) + random.random()
                    print(f"Retrying in {delay:.1f}s due to rate limit... ({e})")
                    time.sleep(delay)
                    retry_count += 1
                else:
                    print(f"Error during agent run: {e}")
                    break

        # Ensure a valid answer is always returned
        if not final_answer:
            final_answer = "I am unable to answer"
        print(f"Finished agent run. Final Answer: {final_answer}\n{'='*50}")
        return final_answer

    def __call__(self, question: str, file_content: str = "", file_path: str = "") -> str:
        """Allow the agent instance to be invoked like a function."""
        return self.run(question, file_content, file_path)