| import pytz
|
| import datetime
|
| from ddgs import DDGS
|
| from bs4 import BeautifulSoup
|
| import requests
|
| import re
|
| import io
|
| import os
|
| from google import genai
|
| from google.genai import types
|
| import config
|
|
|
| from langchain_community.document_loaders import WikipediaLoader
|
| from langchain_community.document_loaders import ArxivLoader
|
| from youtube_transcript_api import YouTubeTranscriptApi
|
| from pytube import extract
|
| from langchain_core.tools import tool
|
| from langfuse_tracking import track_tool_call
|
|
|
| import pandas as pd
|
| import speech_recognition as sr
|
| from pydub import AudioSegment
|
| from pypdf import PdfReader
|
| from io import BytesIO
|
| from markdownify import markdownify as md
|
|
|
|
|
|
|
|
|
|
|
| def _sanitize_file_path(file_name: str) -> tuple:
|
| """
|
| Sanitize file name to prevent path traversal attacks.
|
|
|
| Args:
|
| file_name: The file name to sanitize
|
|
|
| Returns:
|
| tuple: (is_valid: bool, sanitized_name_or_error: str)
|
| """
|
|
|
| if '..' in file_name or file_name.startswith('/') or file_name.startswith('\\'):
|
| return False, "Invalid file name: path traversal not allowed"
|
|
|
|
|
| if os.path.isabs(file_name):
|
| return False, "Invalid file name: absolute paths not allowed"
|
|
|
|
|
| normalized = os.path.normpath(file_name)
|
| if normalized.startswith('..') or os.path.isabs(normalized):
|
| return False, "Invalid file name: path traversal detected"
|
|
|
| return True, normalized
|
|
|
| def _get_file_content(file_name: str, mode: str = 'binary'):
|
| """
|
| Helper function to get file content from local filesystem or remote URL.
|
|
|
| Args:
|
| file_name: The file name (without 'files/' prefix)
|
| mode: 'binary' for bytes, 'text' for string
|
|
|
| Returns:
|
| tuple: (success: bool, data: bytes/str or error_message: str)
|
| """
|
|
|
| is_valid, result = _sanitize_file_path(file_name)
|
| if not is_valid:
|
| return False, result
|
|
|
| file_name = result
|
| file_path = f"files/{file_name}"
|
|
|
|
|
| if os.path.exists(file_path):
|
| try:
|
| if mode == 'binary':
|
| with open(file_path, 'rb') as f:
|
| return True, f.read()
|
| else:
|
| with open(file_path, 'r') as f:
|
| return True, f.read()
|
| except Exception as e:
|
| return False, f"Error reading local file: {e}"
|
|
|
|
|
| else:
|
| try:
|
| base_url = os.getenv("SPACE_HOST", "agents-course-unit4-scoring.hf.space")
|
| if not base_url.startswith("http"):
|
| file_url = f"https://{base_url}/files/{file_name}"
|
| else:
|
| file_url = f"{base_url}/files/{file_name}"
|
|
|
| print(f"Fetching file from URL: {file_url}")
|
| response = requests.get(file_url, timeout=30)
|
| response.raise_for_status()
|
|
|
| if mode == 'binary':
|
| return True, response.content
|
| else:
|
| return True, response.text
|
| except Exception as e:
|
| return False, f"Error fetching remote file: {e}"
|
|
|
| def _get_mime_type(file_name: str) -> str:
|
| """Helper function to determine MIME type from file extension."""
|
| ext = file_name.lower().split('.')[-1]
|
| mime_types = {
|
| 'png': 'image/png',
|
| 'jpg': 'image/jpeg',
|
| 'jpeg': 'image/jpeg',
|
| 'gif': 'image/gif',
|
| 'webp': 'image/webp',
|
| 'bmp': 'image/bmp'
|
| }
|
| return mime_types.get(ext, 'image/png')
|
|
|
|
|
|
|
|
|
|
|
| @tool
|
| @track_tool_call("add")
|
| def add(a: float, b: float) -> str:
|
| """Add two numbers.
|
|
|
| Args:
|
| a: first int
|
| b: second int
|
| """
|
| return str(a + b)
|
|
|
| @tool
|
| @track_tool_call("subtract")
|
| def subtract(a: float, b: float) -> str:
|
| """Subtract b from a.
|
|
|
| Args:
|
| a: first int
|
| b: second int
|
| """
|
| return str(a - b)
|
|
|
| @tool
|
| @track_tool_call("multiply")
|
| def multiply(a: float, b: float) -> str:
|
| """Multiply two numbers.
|
|
|
| Args:
|
| a: first int
|
| b: second int
|
| """
|
| return str(a * b)
|
|
|
| @tool
|
| @track_tool_call("divide")
|
| def divide(a: float, b: float) -> str:
|
| """Divide a by b.
|
|
|
| Args:
|
| a: first int
|
| b: second int
|
| """
|
| if b == 0:
|
| return "Cannot divide by zero"
|
| return str(a / b)
|
|
|
| @tool
|
| @track_tool_call("power")
|
| def power(a: float, b: float) -> str:
|
| """Raise a to the power of b.
|
|
|
| Args:
|
| a: first int
|
| b: second int
|
| """
|
| return str(a ** b)
|
|
|
| @tool
|
| @track_tool_call("modulus")
|
| def modulus(a: int, b: int) -> int:
|
| """Get the modulus of two numbers.
|
|
|
| Args:
|
| a: first int
|
| b: second int
|
| """
|
| return a % b
|
|
|
| @tool
|
| @track_tool_call("string_reverse")
|
| def string_reverse(input_string: str) -> str:
|
| """
|
| Reverses the input string. Useful whenever a string seems to be non-sensical or
|
| contains a lot of gibberish. This function can be used to reverse the string
|
| and check if it makes more sense when reversed.
|
|
|
| Args:
|
| input_string (str): The string to reverse.
|
|
|
| Returns:
|
| str: The reversed string.
|
| """
|
| return input_string[::-1]
|
|
|
|
|
| @tool
|
| @track_tool_call("get_current_time_in_timezone")
|
| def get_current_time_in_timezone(timezone: str) -> str:
|
| """A tool that fetches the current local time in a specified timezone.
|
| Args:
|
| timezone: A string representing a valid timezone (e.g., 'America/New_York').
|
| """
|
| try:
|
|
|
| tz = pytz.timezone(timezone)
|
|
|
| local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
|
| return f"The current local time in {timezone} is: {local_time}"
|
| except Exception as e:
|
| return f"Error fetching time for timezone '{timezone}': {str(e)}"
|
|
|
| @tool
|
| @track_tool_call("websearch")
|
| def websearch(query: str) -> str:
|
| """This tool will search the web using DuckDuckGo.
|
|
|
| Args:
|
| query: The search query.
|
| """
|
|
|
| try:
|
| print(f"websearch called: {query}")
|
| with DDGS() as ddgs:
|
| results = ddgs.text(query, max_results=5, timelimit='y')
|
| if results:
|
| print(f"websearch results: {len(results)}")
|
| return "\n\n".join([f"Title: {r['title']}\nURL: {r['href']}\nSnippet: {r['body']}" for r in results])
|
| return "No results found. Try search with a different query."
|
| except Exception as e:
|
| return f"Search error (try again): {str(e)}"
|
|
|
| @tool
|
| @track_tool_call("wiki_search")
|
| def wiki_search(query: str) -> str:
|
| """Search Wikipedia for a query and return maximum 3 results.
|
|
|
| Args:
|
| query: The search query."""
|
| try:
|
| print(f"wiki_search called: {query}")
|
|
|
| search_docs = WikipediaLoader(query=query, load_max_docs=3).load()
|
| formatted_search_docs = "\n\n---\n\n".join(
|
| [
|
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
|
| for doc in search_docs
|
| ])
|
| print(f"wiki_results: {len(formatted_search_docs)} characters")
|
| return {"wiki_results": formatted_search_docs}
|
| except Exception as e:
|
| return f"Error performing wikipedia search: {e}. try again."
|
|
|
| @tool
|
| @track_tool_call("arvix_search")
|
| def arvix_search(query: str) -> str:
|
| """Search Arxiv for a query and return maximum 3 result.
|
|
|
| Args:
|
| query: The search query."""
|
| try:
|
| print(f"arvix_search called: {query}")
|
|
|
| search_docs = ArxivLoader(query=query, load_max_docs=3).load()
|
| formatted_search_docs = "\n\n---\n\n".join(
|
| [
|
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
|
| for doc in search_docs
|
| ])
|
|
|
| print(f"arvix_results: {len(formatted_search_docs)} characters")
|
| return {"arvix_results": formatted_search_docs}
|
| except Exception as e:
|
| return f"Error performing arxiv search: {e}. try again."
|
|
|
| @tool
|
| @track_tool_call("get_youtube_transcript")
|
| def get_youtube_transcript(page_url: str) -> str:
|
| """Get the transcript of a YouTube video
|
|
|
| Args:
|
| page_url (str): YouTube URL of the video
|
| """
|
| print(f"get_youtube_transcript called: {page_url}")
|
|
|
| try:
|
|
|
| video_id = extract.video_id(page_url)
|
|
|
|
|
| ytt_api = YouTubeTranscriptApi()
|
| transcript = ytt_api.fetch(video_id)
|
|
|
|
|
| txt = '\n'.join([s.text for s in transcript.snippets])
|
| print(f"youtube_transcript: {len(txt)} characters")
|
| return txt
|
| except Exception as e:
|
| msg = f"get_youtube_transcript failed: {e}"
|
| print(msg)
|
| return msg
|
|
|
| @tool
|
| @track_tool_call("get_webpage_content")
|
| def get_webpage_content(page_url: str) -> str:
|
| """Load a web page and return it as markdown if possible
|
|
|
| Args:
|
| page_url (str): the URL of web page to get
|
|
|
| Returns:
|
| str: The content of the page(s).
|
| """
|
|
|
| try:
|
| print(f"get_web_page_content called: with url {page_url}")
|
| r = requests.get(page_url, timeout=30)
|
| r.raise_for_status()
|
| text = ""
|
|
|
| if r.headers.get('Content-Type', '') == 'application/pdf':
|
| pdf_file = BytesIO(r.content)
|
| reader = PdfReader(pdf_file)
|
| for page in reader.pages:
|
| text += page.extract_text()
|
| else:
|
| soup = BeautifulSoup((r.text), 'html.parser')
|
| if soup.body:
|
|
|
| text = md(str(soup.body))
|
| else:
|
|
|
| text = r.text
|
| print(f"webpage_content: {len(text)} characters")
|
| return text
|
| except Exception as e:
|
| return f"get_webpage_content failed: {e}"
|
|
|
| @tool
|
| @track_tool_call("read_excel_file")
|
| def read_excel_file(file_name: str) -> str:
|
| """
|
| Reads an Excel file (.xlsx) and returns its content as a Markdown table.
|
| Use this tool to inspect data stored in Excel spreadsheets.
|
|
|
| Args:
|
| file_name (str): The name of the file (e.g., 'data.xlsx'). Do not include the 'files/' prefix.
|
|
|
| Returns:
|
| str: The file content formatted as a Markdown table.
|
| """
|
|
|
| try:
|
| print(f"read_excel_file called: with file {file_name}")
|
|
|
|
|
| success, data = _get_file_content(file_name, mode='binary')
|
| if not success:
|
| return f"Error: Failed to read Excel file. {data}"
|
|
|
|
|
| df = pd.read_excel(BytesIO(data))
|
| return df.to_markdown(index=False)
|
|
|
| except Exception as e:
|
| return f"Error: Failed to read the Excel file. Reason: {e}"
|
|
|
| @tool
|
| @track_tool_call("read_python_script")
|
| def read_python_script(file_name: str) -> str:
|
| """
|
| Reads the source code of a Python script.
|
| Use this tool to examine the code logic of a .py file.
|
| Note: This does NOT execute the script, it only reads the text.
|
|
|
| Args:
|
| file_name (str): The name of the file (e.g., 'script.py'). Do not include the 'files/' prefix.
|
|
|
| Returns:
|
| str: The raw source code of the script.
|
| """
|
|
|
| try:
|
| print(f"read_python_script called: with file {file_name}")
|
|
|
|
|
| success, data = _get_file_content(file_name, mode='text')
|
| if not success:
|
| return f"Error: Failed to read Python script. {data}"
|
|
|
| return data
|
|
|
| except Exception as e:
|
| return f"Error: Failed to read the Python script. Reason: {e}"
|
|
|
| @tool
|
| @track_tool_call("parse_audio_file")
|
| def parse_audio_file(file_name: str) -> str:
|
| """
|
| Transcribes audio from an MP3 file into text.
|
| Use this tool to extract speech/text from audio files.
|
|
|
| Args:
|
| file_name (str): The name of the MP3 file (e.g., 'audio.mp3'). Do not include the 'files/' prefix.
|
|
|
| Returns:
|
| str: The transcribed text.
|
| """
|
|
|
| try:
|
| print(f"parse_audio_file called: with file {file_name}")
|
|
|
|
|
| success, data = _get_file_content(file_name, mode='binary')
|
| if not success:
|
| return f"Error: Failed to read audio file. {data}"
|
|
|
|
|
| audio = AudioSegment.from_file(io.BytesIO(data), format="mp3")
|
|
|
| wav_data = io.BytesIO()
|
| audio.export(wav_data, format="wav")
|
| wav_data.seek(0)
|
|
|
|
|
| recognizer = sr.Recognizer()
|
| with sr.AudioFile(wav_data) as source:
|
| audio_data = recognizer.record(source)
|
| text = recognizer.recognize_google(audio_data)
|
| return text
|
|
|
| except sr.RequestError as e:
|
| return f"Error: Could not request results from Google Web Speech API; {e}"
|
| except Exception as e:
|
| if "ffmpeg" in str(e).lower() or "avlib" in str(e).lower():
|
| return f"Error: Failed to process audio. Reason: {e}. Ensure ffmpeg is installed and in your system's PATH."
|
| return f"Error: Failed to parse the audio file. Reason: {e}"
|
|
|
| @tool
|
| @track_tool_call("analyze_youtube_video")
|
| def analyze_youtube_video(question: str, youtube_url: str) -> str:
|
| """
|
| Uses a multimodal AI model to analyze a YouTube video and answer a specific question.
|
| Use this tool when you need visual or audio understanding of a YouTube video (e.g., "What is shown in the video?").
|
|
|
| Args:
|
| question (str): The question you want answered about the video content.
|
| youtube_url (str): The full HTTPS URL of the YouTube video.
|
| """
|
|
|
| try:
|
| print(f"analyze_youtube_video called: {youtube_url} with question: {question}")
|
|
|
| api_key = os.getenv("GOOGLE_API_KEY")
|
| if not api_key:
|
| return "Error: GOOGLE_API_KEY environment variable not set"
|
|
|
| client = genai.Client(api_key=api_key)
|
|
|
|
|
| response = client.models.generate_content(
|
| model=config.GEMINI_MODEL,
|
| contents=[types.Content(
|
| parts=[
|
| types.Part(file_data=types.FileData(file_uri=youtube_url)),
|
| types.Part(text=question)
|
| ]
|
| )
|
| ],
|
| config=types.GenerateContentConfig(
|
| temperature=config.GEMINI_TEMPERATURE,
|
| max_output_tokens=config.GEMINI_MAX_TOKENS,
|
| )
|
| )
|
| return response.text
|
| except Exception as e:
|
| error_msg = f"Error analyzing video: {str(e)[:config.QUESTION_PREVIEW_LENGTH]}"
|
| print(error_msg)
|
| return error_msg
|
|
|
| @tool
|
| @track_tool_call("analyze_image")
|
| def analyze_image(question: str, file_name: str) -> str:
|
| """
|
| Analyzes an image file and answers a specific question about it using AI vision.
|
| Use this tool when you need to understand image content (e.g., chess positions, diagrams, photos).
|
|
|
| Args:
|
| question (str): The question you want answered about the image.
|
| file_name (str): The name of the image file (e.g., 'image.png'). Do not include the 'files/' prefix.
|
|
|
| Returns:
|
| str: The answer to the question based on the image analysis.
|
| """
|
|
|
| try:
|
| print(f"analyze_image called: {file_name} with question: {question}")
|
|
|
| api_key = os.getenv("GOOGLE_API_KEY")
|
| if not api_key:
|
| return "Error: GOOGLE_API_KEY environment variable not set"
|
|
|
|
|
| success, image_data = _get_file_content(file_name, mode='binary')
|
| if not success:
|
| return f"Error: Failed to read image file. {image_data}"
|
|
|
| client = genai.Client(api_key=api_key)
|
|
|
|
|
| response = client.models.generate_content(
|
| model=config.GEMINI_MODEL,
|
| contents=[types.Content(
|
| parts=[
|
| types.Part(inline_data=types.Blob(
|
| mime_type=_get_mime_type(file_name),
|
| data=image_data
|
| )),
|
| types.Part(text=question)
|
| ]
|
| )],
|
| config=types.GenerateContentConfig(
|
| temperature=config.GEMINI_TEMPERATURE,
|
| max_output_tokens=config.GEMINI_MAX_TOKENS,
|
| )
|
| )
|
| return response.text
|
|
|
| except Exception as e:
|
| error_msg = f"Error analyzing image: {str(e)[:config.QUESTION_PREVIEW_LENGTH]}"
|
| print(error_msg)
|
| return error_msg
|
|
|
|
|
|
|
|
|
|
|
|
|
| def get_custom_tools_list() -> list:
|
| """Get list of all custom tools for the agent.
|
|
|
| Returns:
|
| list: List of tool functions
|
| """
|
| tools = [
|
| add,
|
| subtract,
|
| multiply,
|
| divide,
|
| power,
|
| modulus,
|
| string_reverse,
|
| get_current_time_in_timezone,
|
| websearch,
|
| wiki_search,
|
| arvix_search,
|
| get_youtube_transcript,
|
| get_webpage_content,
|
| read_python_script,
|
| read_excel_file,
|
| parse_audio_file,
|
| analyze_youtube_video,
|
| analyze_image
|
| ]
|
| return tools
|
|
|