import os from typing import ClassVar from smolagents import Tool import datetime from PIL import Image import pandas as pd from prompts import SYSTEM_PROMPT from PIL import Image from smolagents import WikipediaSearchTool from dotenv import load_dotenv import time from langchain_google_community import GoogleSearchAPIWrapper from google import genai from langchain_community.document_loaders import WikipediaLoader from langchain_community.document_loaders import ArxivLoader load_dotenv() class AnyTypeFileAnalyzerTool(Tool): name: ClassVar[str] = "any_type_file_analyzer_tool" description: ClassVar[str] = ( "Analyze an image or audio mp3 file using Gemini. Supports jpg, png, and mp3." ) inputs: ClassVar[dict] = { "analysis_description": { "type": "string", "description": "Describe what you want to analyze from the file", }, "file_path": {"type": "string", "description": "Path to image/audio file"}, } output_type: ClassVar[str] = "string" def forward(self, analysis_description: str, file_path: str): client = genai.Client( api_key=os.getenv("GEMINI_API_KEY"), ) try: file = client.files.upload(file=file_path) except Exception as e: return f"Error uploading file: {e}" try: full_description = SYSTEM_PROMPT + f"\n\n{analysis_description}" GENAI_MODEL = os.getenv("GENAI_MODEL") response = client.models.generate_content( model=GENAI_MODEL, contents=[full_description, file], ) return response.text except Exception as e: return f"Error from Gemini: {e}" def _get_mime_type(self, file_path: str) -> str: if file_path.endswith(".png"): return "image/png" elif file_path.endswith(".jpg") or file_path.endswith(".jpeg"): return "image/jpeg" elif file_path.endswith(".mp3"): return "audio/mpeg" else: raise ValueError( "Unsupported file type: only .png, .jpg, .jpeg, .mp3 are supported" ) class CodeFileReadTool(Tool): name: ClassVar[str] = "read_code_file" description: ClassVar[str] = ( "Read a code or text file (Python, JavaScript, Java, HTML, CSS, etc.) and return its content as a formatted code block with the correct language extension." ) inputs: ClassVar[dict] = { "file_path": {"type": "string", "description": "Path to the code or text file"}, } output_type: ClassVar[str] = "string" def forward(self, file_path: str): try: # Detect extension and map to language ext = os.path.splitext(file_path)[-1].lower() ext_to_lang = { ".py": "python", ".js": "javascript", ".java": "java", ".html": "html", ".css": "css", ".json": "json", ".txt": "", ".md": "markdown", ".c": "c", ".cpp": "cpp", ".ts": "typescript", ".sh": "bash", ".xml": "xml", ".yml": "yaml", ".yaml": "yaml", } lang = ext_to_lang.get(ext, "") with open(file_path, "r", encoding="utf-8") as f: content = f.read() # Format as markdown code block if lang: return f"```{lang}\n{content}\n```" else: return f"```\n{content}\n```" except Exception as e: return f"Error reading file: {e}" class ExcelAndCSVTableInspectorTool(Tool): name: ClassVar[str] = "excel_csv_file_analyzer" description: ClassVar[str] = ( "Load a CSV or Excel file and return table info and summary stats in Markdown format." ) inputs: ClassVar[dict] = { "file_path": { "type": "string", "description": "Path to CSV or Excel file (.csv, .xls, .xlsx)", } } output_type: ClassVar[str] = "string" def forward(self, file_path: str): try: if file_path.endswith(".csv"): df = pd.read_csv(file_path) elif file_path.endswith(".xls") or file_path.endswith(".xlsx"): df = pd.read_excel(file_path) else: return "Unsupported file type. Only CSV and Excel (.xls/.xlsx) are supported." md = df.to_markdown(index=False, tablefmt="pipe") return md except Exception as e: return f"Error loading file: {str(e)}" class YouTubeVideoAnalyzerTool(Tool): name: ClassVar[str] = "youtube_video_analyzer" description: ClassVar[str] = ( "Given a YouTube URL, analyzes the video content using Gemini (google-genai SDK) and answers user queries about it." ) inputs: ClassVar[dict] = { "url": {"type": "string", "description": "Full YouTube video URL"}, "user_prompt": { "type": "string", "description": "What you want to analyze from the video content", }, } output_type: ClassVar[str] = "string" def forward(self, url: str, user_prompt: str): api_key = os.getenv("GEMINI_API_KEY") if not api_key: return "Error: GEMINI_API_KEY environment variable is not set." try: genai.configure(api_key=api_key) model_name = os.getenv("GENAI_MODEL", "gemini-1.5-pro") model = genai.GenerativeModel(model_name) prompt = f""" You are an AI video analyzer. A user wants to analyze the following YouTube video. ### Video URL {url} ### User Request {user_prompt} ### Instructions: - Analyze the video content based on the user's request. - Identify the main key thing needed to be analyzed. - Provide the answer as per system prompt. """ response = model.generate_content([prompt]) return response.text if hasattr(response, "text") else str(response) except Exception as e: return f"Error analyzing video with Gemini: {e}" # --- Math Tools --- class CalculatorTool(Tool): name: ClassVar[str] = "calculator" description: ClassVar[str] = ( "Evaluate a basic mathematical expression (supports +, -, *, /, **, %, etc.)." ) inputs: ClassVar[dict] = { "expression": { "type": "string", "description": "A mathematical expression to evaluate", } } output_type: ClassVar[str] = "number" def forward(self, expression: str): # Safely evaluate the expression using ast import ast, operator # Allowed node types allowed_nodes = { ast.Expression, ast.BinOp, ast.UnaryOp, ast.Num, ast.Constant, ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Pow, ast.Mod, ast.USub, ast.UAdd, } node = ast.parse(expression, mode="eval") for subnode in ast.walk(node): if type(subnode) not in allowed_nodes: raise ValueError(f"Unsafe or unsupported expression: {expression}") return eval(compile(node, "", "eval")) # Optionally, separate basic operations could be defined (e.g., add, subtract). class AddTool(Tool): name: ClassVar[str] = "add" description: ClassVar[str] = "Add two numbers together." inputs: ClassVar[dict] = { "a": {"type": "number", "description": "First number"}, "b": {"type": "number", "description": "Second number"}, } output_type: ClassVar[str] = "number" def forward(self, a: float, b: float): return a + b class MultiplyTool(Tool): name: ClassVar[str] = "multiply" description: ClassVar[str] = "Multiply two numbers." inputs: ClassVar[dict] = { "a": {"type": "number", "description": "First number"}, "b": {"type": "number", "description": "Second number"}, } output_type: ClassVar[str] = "number" def forward(self, a: float, b: float): return a * b # --- Date/Time Tools --- class DayOfWeekTool(Tool): name: ClassVar[str] = "day_of_week" description: ClassVar[str] = "Return the day of week for a given date (YYYY-MM-DD)." inputs: ClassVar[dict] = { "date": {"type": "string", "description": "Date in format YYYY-MM-DD"} } output_type: ClassVar[str] = "string" def forward(self, date: str): year, month, day = map(int, date.split("-")) dow = datetime.date(year, month, day).strftime("%A") return dow class AddDaysTool(Tool): name: ClassVar[str] = "add_days" description: ClassVar[str] = "Add a number of days to a date (YYYY-MM-DD)." inputs: ClassVar[dict] = { "date": {"type": "string", "description": "Start date (YYYY-MM-DD)"}, "days": {"type": "integer", "description": "Number of days to add"}, } output_type: ClassVar[str] = "string" def forward(self, date: str, days: int): year, month, day = map(int, date.split("-")) new_date = datetime.date(year, month, day) + datetime.timedelta(days=days) return new_date.isoformat() class DateDiffTool(Tool): name: ClassVar[str] = "date_diff" description: ClassVar[str] = ( "Compute difference in days between two dates (YYYY-MM-DD)." ) inputs: ClassVar[dict] = { "start_date": {"type": "string", "description": "First date (YYYY-MM-DD)"}, "end_date": {"type": "string", "description": "Second date (YYYY-MM-DD)"}, } output_type: ClassVar[str] = "integer" def forward(self, start_date: str, end_date: str): y1, m1, d1 = map(int, start_date.split("-")) y2, m2, d2 = map(int, end_date.split("-")) d0 = datetime.date(y1, m1, d1) d1 = datetime.date(y2, m2, d2) return abs((d1 - d0).days) # --- Unit Conversion Tools --- class TempConvertTool(Tool): name: ClassVar[str] = "convert_temperature" description: ClassVar[str] = "Convert temperature between Celsius and Fahrenheit." inputs: ClassVar[dict] = { "value": {"type": "number", "description": "Temperature value to convert"}, "from_unit": {"type": "string", "description": "Unit of input ('C' or 'F')"}, } output_type: ClassVar[str] = "number" def forward(self, value: float, from_unit: str): unit = from_unit.strip().upper() if unit == "C": # Celsius to Fahrenheit return value * 9 / 5 + 32 elif unit == "F": # Fahrenheit to Celsius return (value - 32) * 5 / 9 else: raise ValueError("Unit must be 'C' or 'F'.") class LengthConvertTool(Tool): name: ClassVar[str] = "convert_length" description: ClassVar[str] = ( "Convert length between kilometers, miles, meters, and feet." ) inputs: ClassVar[dict] = { "value": {"type": "number", "description": "Length value to convert"}, "from_unit": { "type": "string", "description": "Original unit ('km','mi','m','ft')", }, "to_unit": { "type": "string", "description": "Target unit ('km','mi','m','ft')", }, } output_type: ClassVar[str] = "number" def forward(self, value: float, from_unit: str, to_unit: str): u1 = from_unit.lower() u2 = to_unit.lower() # Convert input to meters first if u1 == "km": meters = value * 1000 elif u1 == "m": meters = value elif u1 == "mi": meters = value * 1609.34 elif u1 == "ft": meters = value * 0.3048 else: raise ValueError("Unsupported from_unit") # Convert meters to target unit if u2 == "km": return meters / 1000 if u2 == "m": return meters if u2 == "mi": return meters / 1609.34 if u2 == "ft": return meters / 0.3048 raise ValueError("Unsupported to_unit") # --- Text Tools --- class WordCountTool(Tool): name: ClassVar[str] = "word_count" description: ClassVar[str] = "Count the number of words in a text string." inputs: ClassVar[dict] = {"text": {"type": "string", "description": "Input text"}} output_type: ClassVar[str] = "integer" def forward(self, text: str): return len(text.split()) class FindTextTool(Tool): name: ClassVar[str] = "find_text" description: ClassVar[str] = ( "Find occurrences of a substring in a text; returns count." ) inputs: ClassVar[dict] = { "text": {"type": "string", "description": "Text to search in"}, "query": {"type": "string", "description": "Substring to search for"}, } output_type: ClassVar[str] = "integer" def forward(self, text: str, query: str): return text.count(query) # --- List/Sequence Tools --- class SortListTool(Tool): name: ClassVar[str] = "sort_list" description: ClassVar[str] = "Sort a list of items (numbers or strings)." inputs: ClassVar[dict] = { "items": {"type": "array", "description": "List of items to sort"} } output_type: ClassVar[str] = "array" def forward(self, items): return sorted(items) class UniqueListTool(Tool): name: ClassVar[str] = "unique_list" description: ClassVar[str] = ( "Return a list with duplicate items removed (preserving order)." ) inputs: ClassVar[dict] = { "items": {"type": "array", "description": "List of items"} } output_type: ClassVar[str] = "array" def forward(self, items): seen = [] for x in items: if x not in seen: seen.append(x) return seen # --- File I/O Tools --- class ReadFileTool(Tool): name: ClassVar[str] = "read_file" description: ClassVar[str] = "Read and return the contents of a text file." inputs: ClassVar[dict] = { "file_path": {"type": "string", "description": "Path to a text file"} } output_type: ClassVar[str] = "string" def forward(self, file_path: str): try: with open(file_path, "r") as f: return f.read() except FileNotFoundError: return f"Error: File not found: {file_path}" class WriteFileTool(Tool): name: ClassVar[str] = "write_file" description: ClassVar[str] = "Write a string to a text file (overwrites if exists)." inputs: ClassVar[dict] = { "file_path": {"type": "string", "description": "Path to write the file"}, "content": {"type": "string", "description": "Content to write"}, } output_type: ClassVar[str] = "string" def forward(self, file_path: str, content: str): with open(file_path, "w") as f: f.write(content) return f"Wrote to {file_path}" # --- Image Tool (stub) --- class ImageInfoTool(Tool): name: ClassVar[str] = "image_info" description: ClassVar[str] = "Load an image and report basic info (size and mode)." inputs: ClassVar[dict] = { "image_path": {"type": "string", "description": "Path to an image file"} } output_type: ClassVar[str] = "string" def forward(self, image_path: str): try: img = Image.open(image_path) return f"Image {image_path}: size={img.size}, mode={img.mode}" except Exception as e: return f"Error loading image: {e}" class WikipediaCustomTool(Tool): name: ClassVar[str] = "wikipedia_search_summary" description: ClassVar[str] = "Search Wikipedia and return max 3 results" inputs: ClassVar[dict] = { "query": { "type": "string", "description": "Search Query for Wikipedia", } } output_type: ClassVar[str] = "string" def forward(self, query: str) -> str: try: search_docs = WikipediaLoader(query=query, load_max_docs=3).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ] ) return {"wiki_results": formatted_search_docs} except Exception as e: return f"Error using Wikipedia API: {e}" class ArvixSearchTool(Tool): name: ClassVar[str] = "arvix_search" description: ClassVar[str] = "Search Arvix for a query and return maximum 3 result" inputs: ClassVar[dict] = { "query": { "type": "string", "description": "Search Query for Arvix", } } output_type: ClassVar[str] = "string" def forward(self, query: str) -> str: try: search_docs = ArxivLoader(query=query, load_max_docs=3).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content[:1000]}\n' for doc in search_docs ] ) return {"arvix_results": formatted_search_docs} except Exception as e: return f"Error using Arvix Tool: {e}" class GoogleSearchTool(Tool): name: ClassVar[str] = "google_search" description: ClassVar[str] = ( "Search the web using Google Search Engine and return results" ) inputs: ClassVar[dict] = { "query": { "type": "string", "description": "Search term to find information on Web", } } output_type: ClassVar[str] = "string" def forward(self, query: str) -> str: try: # Initialize Google Search API Wrapper google_search = GoogleSearchAPIWrapper( google_api_key=os.getenv("GOOGLE_API_KEY"), google_cse_id=os.getenv("GOOGLE_CSE_ID"), ) # Perform the search results = google_search.results(query, num_results=5) if not results: return f"No results found for: '{query}'" formatted = "\n\n".join( f"{i+1}. **{r['title']}**\n{r['link']}\n{r['snippet']}" for i, r in enumerate(results) ) return f"**Search Results for '{query}':**\n\n{formatted}" except Exception as e: return f"Error using Google Search API: {e}" class AdvanceGoogleAISearchTool(Tool): name: ClassVar[str] = "google_ai_search" description: ClassVar[str] = ( "Search the web using Google AI Search Engine and return results" ) inputs: ClassVar[dict] = { "query": { "type": "string", "description": "Search term to find information on Web", } } output_type: ClassVar[str] = "string" def forward(self, query: str) -> str: try: client = genai.Client(api_key=os.getenv("GEMINI_API_KEY")) response = client.models.generate_content( model=os.getenv("GENAI_MODEL"), contents=[query], ) if not response: return f"No results found for: '{query}'" return f"**Search Results for '{query}':**\n\n{response.text}" except Exception as e: return f"Error using Google AI Search API: {e}" # List of all available tools agent_tools = [ AnyTypeFileAnalyzerTool(), ExcelAndCSVTableInspectorTool(), YouTubeVideoAnalyzerTool(), CalculatorTool(), AddTool(), MultiplyTool(), DayOfWeekTool(), AddDaysTool(), DateDiffTool(), TempConvertTool(), LengthConvertTool(), WordCountTool(), FindTextTool(), SortListTool(), UniqueListTool(), GoogleSearchTool(), WikipediaCustomTool(), ArvixSearchTool(), AdvanceGoogleAISearchTool(), ]