Spaces:
Sleeping
Sleeping
| from smolagents import CodeAgent, ToolCallingAgent, LiteLLMModel, tool, Tool, load_tool, WebSearchTool, DuckDuckGoSearchTool #, WikipediaSearchTool | |
| import asyncio | |
| import os | |
| import re | |
| import pandas as pd | |
| from typing import Optional | |
| from token_bucket import Limiter, MemoryStorage | |
| import yaml | |
| from PIL import Image, ImageOps | |
| import requests | |
| from io import BytesIO | |
| from markdownify import markdownify | |
| import whisper | |
| import time | |
| import shutil | |
| import traceback | |
| from langchain_community.document_loaders import ArxivLoader | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| def search_arxiv(query: str) -> str: | |
| """Search Arxiv for a query and return maximum 3 result. | |
| Args: | |
| query: The search query. | |
| Returns: | |
| str: Formatted search results | |
| """ | |
| search_docs = ArxivLoader(query=query, load_max_docs=3).load() | |
| formatted_search_docs = "\n\n---\n\n".join( | |
| [ | |
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>' | |
| for doc in search_docs | |
| ]) | |
| return {"arxiv_results": formatted_search_docs} | |
| from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception | |
| import requests | |
| def is_429_error(exception): | |
| return isinstance(exception, requests.exceptions.HTTPError) and exception.response.status_code == 429 | |
| class VisitWebpageTool(Tool): | |
| name = "visit_webpage" | |
| description = "Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages." | |
| inputs = {'url': {'type': 'string', 'description': 'The url of the webpage to visit.'}} | |
| output_type = "string" | |
| def forward(self, url: str) -> str: | |
| try: | |
| response = requests.get(url, timeout=50) | |
| response.raise_for_status() | |
| markdown_content = markdownify(response.text).strip() | |
| markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content) | |
| #from smolagents.utils import truncate_content | |
| #return truncate_content(markdown_content, 10000) | |
| return markdown_content | |
| except requests.exceptions.HTTPError as e: | |
| if e.response.status_code == 429: | |
| raise # Retry on 429 | |
| return f"Error fetching the webpage: {str(e)}" | |
| except requests.exceptions.Timeout: | |
| return "The request timed out. Please try again later or check the URL." | |
| except requests.exceptions.RequestException as e: | |
| return f"Error fetching the webpage: {str(e)}" | |
| except Exception as e: | |
| return f"An unexpected error occurred: {str(e)}" | |
| def __init__(self, *args, **kwargs): | |
| self.is_initialized = False | |
| class SpeechToTextTool(Tool): | |
| name = "speech_to_text" | |
| description = ( | |
| "Converts an audio file to text using OpenAI Whisper." | |
| ) | |
| inputs = { | |
| "audio_path": {"type": "string", "description": "Path to audio file (.mp3, .wav)"}, | |
| } | |
| output_type = "string" | |
| def __init__(self): | |
| super().__init__() | |
| self.model = whisper.load_model("base") | |
| def forward(self, audio_path: str) -> str: | |
| if not os.path.exists(audio_path): | |
| return f"Error: File not found at {audio_path}" | |
| try: | |
| print(f"Starting transcription for {audio_path}...") | |
| result = self.model.transcribe(audio_path) | |
| print(f"Transcription completed for {audio_path}.") | |
| return result.get("text", "") | |
| except Exception as e: | |
| return f"Error processing audio file: {str(e)}" | |
| class ExcelReaderTool(Tool): | |
| name = "excel_reader" | |
| description = """ | |
| This tool reads and processes Excel files (.xlsx, .xls). | |
| It can extract data, calculate statistics, and perform data analysis on spreadsheets. | |
| """ | |
| inputs = { | |
| "excel_path": { | |
| "type": "string", | |
| "description": "The path to the Excel file to read", | |
| }, | |
| "sheet_name": { | |
| "type": "string", | |
| "description": "The name of the sheet to read (optional, defaults to first sheet)", | |
| "nullable": True | |
| } | |
| } | |
| output_type = "string" | |
| def forward(self, excel_path: str, sheet_name: str = None) -> str: | |
| try: | |
| if not os.path.exists(excel_path): | |
| return f"Error: Excel file not found at {excel_path}" | |
| import pandas as pd | |
| if sheet_name: | |
| df = pd.read_excel(excel_path, sheet_name=sheet_name) | |
| else: | |
| df = pd.read_excel(excel_path) | |
| info = { | |
| "shape": df.shape, | |
| "columns": list(df.columns), | |
| "dtypes": df.dtypes.to_dict(), | |
| "head": df.head(5).to_dict() | |
| } | |
| result = f"Excel file: {excel_path}\n" | |
| result += f"Shape: {info['shape'][0]} rows × {info['shape'][1]} columns\n\n" | |
| result += "Columns:\n" | |
| for col in info['columns']: | |
| result += f"- {col} ({info['dtypes'].get(col)})\n" | |
| result += "\nPreview (first 5 rows):\n" | |
| result += df.head(5).to_string() | |
| return result | |
| except Exception as e: | |
| return f"Error reading Excel file: {str(e)}" | |
| class PythonCodeReaderTool(Tool): | |
| name = "read_python_code" | |
| description = "Reads a Python (.py) file and returns its content as a string." | |
| inputs = { | |
| "file_path": {"type": "string", "description": "The path to the Python file to read"} | |
| } | |
| output_type = "string" | |
| def forward(self, file_path: str) -> str: | |
| try: | |
| if not os.path.exists(file_path): | |
| return f"Error: Python file not found at {file_path}" | |
| with open(file_path, "r", encoding="utf-8") as file: | |
| content = file.read() | |
| return content | |
| except Exception as e: | |
| return f"Error reading Python file: {str(e)}" | |
| from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type | |
| class RetryDuckDuckGoSearchTool(DuckDuckGoSearchTool): | |
| def forward(self, query: str) -> str: | |
| return super().forward(query) | |
| class MagAgent: | |
| def __init__(self, rate_limiter: Optional[Limiter] = None): | |
| """Initialize the MagAgent with search tools.""" | |
| logger.info("Initializing MagAgent") | |
| self.rate_limiter = rate_limiter | |
| print("Initializing MagAgent with search tools...") | |
| model = LiteLLMModel( | |
| model_id="gemini/gemini-2.0-flash", | |
| api_key=os.environ.get("GEMINI_KEY"), | |
| max_tokens=8192 | |
| ) | |
| self.imports = [ | |
| "pandas", | |
| "numpy", | |
| "os", | |
| "requests", | |
| "tempfile", | |
| "datetime", | |
| "json", | |
| "time", | |
| "re", | |
| "openpyxl", | |
| "pathlib", | |
| "sys", | |
| "bs4", | |
| "arxiv", | |
| "whisper", | |
| ] | |
| self.tools = [ | |
| # RetryDuckDuckGoSearchTool(), | |
| # WikipediaSearchTool(), | |
| SpeechToTextTool(), | |
| ExcelReaderTool(), | |
| # VisitWebpageTool(), | |
| PythonCodeReaderTool(), | |
| # search_arxiv, | |
| ] | |
| self.prompt_template = ( | |
| """ | |
| You are an advanced AI assistant specialized in solving complex, real-world tasks, requiring multi-step reasoning, factual accuracy, and use of external tools. | |
| Follow these principles: | |
| - Reason step-by-step. Think through the solution logically and plan your actions carefully before answering. | |
| - Validate information. Always verify facts when possible instead of guessing. | |
| - When processing external data (e.g., YouTube transcripts, web searches), expect potential issues like missing punctuation, inconsistent formatting, or conversational text. | |
| - When asked to transcript YouTube video, try searching it in www.youtubetotranscript.com. | |
| - If the input is ambiguous, prioritize extracting key information relevant to the question. | |
| - Use code if needed. For calculations, parsing, or transformations, generate Python code and execute it. Be cautious, as some questions contain time-consuming tasks, so analyze the question and choose the most efficient solution. | |
| - Be precise and concise. The final answer must strictly match the required format with no extra commentary. | |
| - Use tools intelligently. If a question involves external information, structured data, images, or audio, call the appropriate tool to retrieve or process it. | |
| - If the question includes direct speech or quoted text (e.g., "Isn't that hot?"), treat it as a precise query and preserve the quoted structure in your response, including quotation marks for direct quotes (e.g., final_answer('"Extremely."')). | |
| - If asked about the name of a place or city, use the full complete name without abbreviations (e.g., use Saint Petersburg instead of St.Petersburg). | |
| - If asked to look up page numbers, make sure you don't mix them with problem or excercise numbers. | |
| - If you cannot retrieve or process data (e.g., due to blocked requests), retry after 15 seconds delay, try another tool (try wikipedia_search, then web_search, then search_arxiv). Otherwise, return a clear error message: "Unable to retrieve data. Search has failed." | |
| - Use `final_answer` to give the final answer. | |
| QUESTION: {question} | |
| {file_section} | |
| ANSWER: | |
| """ | |
| ) | |
| web_agent = ToolCallingAgent( | |
| tools=[ | |
| # RetryDuckDuckGoSearchTool(), | |
| # WikipediaSearchTool(), | |
| # SpeechToTextTool(), | |
| WebSearchTool(), | |
| VisitWebpageTool(), | |
| # ExcelReaderTool(), | |
| # PythonCodeReaderTool(), | |
| search_arxiv, | |
| ], | |
| model=model, | |
| max_steps=15, | |
| name="web_search_agent", | |
| description="Runs web searches for you.", | |
| ) | |
| self.agent = CodeAgent( | |
| model=model, | |
| managed_agents=[web_agent], | |
| tools=self.tools, | |
| add_base_tools=True, | |
| additional_authorized_imports=self.imports, | |
| verbosity_level=2, | |
| max_steps=10 | |
| ) | |
| print("MagAgent initialized.") | |
| async def __call__(self, question: str, file_path: Optional[str] = None) -> str: | |
| """Process a question asynchronously using the MagAgent.""" | |
| print(f"MagAgent received question (first 50 chars): {question[:50]}... File path: {file_path}") | |
| try: | |
| if self.rate_limiter: | |
| while not self.rate_limiter.consume(1): | |
| print(f"Rate limit reached. Waiting...") | |
| await asyncio.sleep(4) | |
| # Conditionally include FILE: section only if file_path is provided | |
| file_section = f"FILE: {file_path}" if file_path else "" | |
| task = self.prompt_template.format( | |
| question=question, | |
| file_section=file_section | |
| ) | |
| print(f"Calling agent.run...") | |
| response = await asyncio.to_thread(self.agent.run, task=task) | |
| print(f"Agent.run completed.") | |
| response = str(response) | |
| if not response: | |
| print(f"No answer found.") | |
| response = "No answer found." | |
| print(f"MagAgent response: {response[:50]}...") | |
| return response | |
| except Exception as e: | |
| error_msg = f"Error processing question: {str(e)}. Check API key or network connectivity." | |
| print(error_msg) | |
| return error_msg |