| | import os |
| | from typing import ClassVar |
| | from smolagents import Tool |
| | import datetime |
| | from PIL import Image |
| | import pandas as pd |
| | from prompts import SYSTEM_PROMPT |
| | from PIL import Image |
| | from smolagents import WikipediaSearchTool |
| | from dotenv import load_dotenv |
| | import time |
| | from langchain_google_community import GoogleSearchAPIWrapper |
| | from google import genai |
| | from langchain_community.document_loaders import WikipediaLoader |
| | from langchain_community.document_loaders import ArxivLoader |
| |
|
| |
|
# Load variables from a local .env file (when one exists) into the process
# environment, so the os.getenv() lookups used by the tools below can see
# them (GEMINI_API_KEY, GENAI_MODEL, GOOGLE_API_KEY, GOOGLE_CSE_ID).
load_dotenv()
| |
|
| |
|
class AnyTypeFileAnalyzerTool(Tool):
    """Upload a local file to Gemini and ask the model a question about it."""

    name: ClassVar[str] = "any_type_file_analyzer_tool"
    description: ClassVar[str] = (
        "Analyze an image or audio mp3 file using Gemini. Supports jpg, png, and mp3."
    )
    inputs: ClassVar[dict] = {
        "analysis_description": {
            "type": "string",
            "description": "Describe what you want to analyze from the file",
        },
        "file_path": {"type": "string", "description": "Path to image/audio file"},
    }
    output_type: ClassVar[str] = "string"

    def forward(self, analysis_description: str, file_path: str):
        """Send ``file_path`` together with the analysis request to Gemini.

        Args:
            analysis_description: What the caller wants to learn from the file.
            file_path: Path of the file to upload.

        Returns:
            The model's text answer, or an ``Error ...`` string naming the
            step that failed. Failures are returned as text, never raised.
        """
        # The client is built inside its own try block so that a constructor
        # failure (for example a missing API key) is reported as text like
        # every other failure of this tool instead of propagating.
        try:
            client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
        except Exception as e:
            return f"Error creating Gemini client: {e}"

        try:
            uploaded = client.files.upload(file=file_path)
        except Exception as e:
            return f"Error uploading file: {e}"

        try:
            full_description = SYSTEM_PROMPT + f"\n\n{analysis_description}"
            # Fall back to the same default model YouTubeVideoAnalyzerTool
            # uses, so an unset GENAI_MODEL does not send model=None.
            model_name = os.getenv("GENAI_MODEL", "gemini-1.5-pro")
            response = client.models.generate_content(
                model=model_name,
                contents=[full_description, uploaded],
            )
            return response.text
        except Exception as e:
            return f"Error from Gemini: {e}"

    def _get_mime_type(self, file_path: str) -> str:
        """Return the MIME type implied by the extension of ``file_path``.

        The comparison ignores case, so ``PHOTO.JPG`` is accepted. This helper
        is not called by ``forward``.

        Raises:
            ValueError: If the extension is not .png, .jpg, .jpeg or .mp3.
        """
        lowered = file_path.lower()
        if lowered.endswith(".png"):
            return "image/png"
        if lowered.endswith((".jpg", ".jpeg")):
            return "image/jpeg"
        if lowered.endswith(".mp3"):
            return "audio/mpeg"
        raise ValueError(
            "Unsupported file type: only .png, .jpg, .jpeg, .mp3 are supported"
        )
| |
|
| |
|
class CodeFileReadTool(Tool):
    """Return the content of a code or text file as a fenced code block."""

    name: ClassVar[str] = "read_code_file"
    description: ClassVar[str] = (
        "Read a code or text file (Python, JavaScript, Java, HTML, CSS, etc.) and return its content as a formatted code block with the correct language extension."
    )
    inputs: ClassVar[dict] = {
        "file_path": {"type": "string", "description": "Path to the code or text file"},
    }
    output_type: ClassVar[str] = "string"

    def forward(self, file_path: str):
        """Read ``file_path`` as UTF-8 and wrap it in a Markdown code fence.

        Args:
            file_path: Path to the file to read.

        Returns:
            The fenced file content. The fence carries a language tag when the
            extension is recognised and is left bare otherwise. Any failure is
            returned as an ``Error reading file: ...`` string.
        """
        # Extension (lower-cased) -> language tag placed after the opening fence.
        language_by_suffix = {
            ".py": "python",
            ".js": "javascript",
            ".java": "java",
            ".html": "html",
            ".css": "css",
            ".json": "json",
            ".txt": "",
            ".md": "markdown",
            ".c": "c",
            ".cpp": "cpp",
            ".ts": "typescript",
            ".sh": "bash",
            ".xml": "xml",
            ".yml": "yaml",
            ".yaml": "yaml",
        }
        try:
            suffix = os.path.splitext(file_path)[-1].lower()
            fence_tag = language_by_suffix.get(suffix, "")
            with open(file_path, "r", encoding="utf-8") as handle:
                text = handle.read()
        except Exception as e:
            return f"Error reading file: {e}"

        # An empty tag yields a bare fence, so one template covers both cases.
        return f"```{fence_tag}\n{text}\n```"
| |
|
| |
|
class ExcelAndCSVTableInspectorTool(Tool):
    """Load a CSV or Excel file and render it as a Markdown table."""

    name: ClassVar[str] = "excel_csv_file_analyzer"
    description: ClassVar[str] = (
        "Load a CSV or Excel file and return table info and summary stats in Markdown format."
    )
    inputs: ClassVar[dict] = {
        "file_path": {
            "type": "string",
            "description": "Path to CSV or Excel file (.csv, .xls, .xlsx)",
        }
    }
    output_type: ClassVar[str] = "string"

    def forward(self, file_path: str):
        """Read the table at ``file_path`` and return it as pipe-style Markdown.

        Args:
            file_path: Path ending in .csv, .xls or .xlsx (any letter case).

        Returns:
            The table as Markdown without the index column, a fixed message
            for unsupported extensions, or an ``Error loading file: ...``
            string when reading or rendering fails.
        """
        try:
            # Compare on a lower-cased copy so DATA.CSV or Report.XLSX are
            # accepted; the original path is still what gets opened.
            lowered = file_path.lower()
            if lowered.endswith(".csv"):
                df = pd.read_csv(file_path)
            elif lowered.endswith((".xls", ".xlsx")):
                df = pd.read_excel(file_path)
            else:
                return "Unsupported file type. Only CSV and Excel (.xls/.xlsx) are supported."

            return df.to_markdown(index=False, tablefmt="pipe")
        except Exception as e:
            return f"Error loading file: {str(e)}"
| |
|
| |
|
class YouTubeVideoAnalyzerTool(Tool):
    """Ask Gemini a question about a YouTube video identified by its URL."""

    name: ClassVar[str] = "youtube_video_analyzer"
    description: ClassVar[str] = (
        "Given a YouTube URL, analyzes the video content using Gemini (google-genai SDK) and answers user queries about it."
    )
    inputs: ClassVar[dict] = {
        "url": {"type": "string", "description": "Full YouTube video URL"},
        "user_prompt": {
            "type": "string",
            "description": "What you want to analyze from the video content",
        },
    }
    output_type: ClassVar[str] = "string"

    def forward(self, url: str, user_prompt: str):
        """Send the video and the user's request to Gemini.

        Args:
            url: Full YouTube video URL.
            user_prompt: What the caller wants to learn from the video.

        Returns:
            The model's text answer, or an ``Error ...`` string when the API
            key is missing or the request fails.
        """
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            return "Error: GEMINI_API_KEY environment variable is not set."
        try:
            # `genai` here is the google-genai SDK (`from google import genai`).
            # It is driven through a Client object; the module-level
            # configure()/GenerativeModel() helpers belong to the older
            # google.generativeai package and do not exist on this module.
            client = genai.Client(api_key=api_key)
            model_name = os.getenv("GENAI_MODEL", "gemini-1.5-pro")
            prompt = (
                "You are an AI video analyzer. A user wants to analyze the "
                "following YouTube video.\n\n"
                "### Video URL\n"
                f"{url}\n\n"
                "### User Request\n"
                f"{user_prompt}\n\n"
                "### Instructions:\n"
                "- Analyze the video content based on the user's request.\n"
                "- Identify the main key thing needed to be analyzed.\n"
                "- Provide the answer as per system prompt.\n"
            )
            # Attach the video itself as a file_data part so the model receives
            # the media rather than only the URL text inside the prompt.
            video_part = genai.types.Part(
                file_data=genai.types.FileData(file_uri=url)
            )
            response = client.models.generate_content(
                model=model_name,
                contents=genai.types.Content(
                    parts=[video_part, genai.types.Part(text=prompt)]
                ),
            )
            return response.text if hasattr(response, "text") else str(response)
        except Exception as e:
            return f"Error analyzing video with Gemini: {e}"
| |
|
| |
|
| | |
class CalculatorTool(Tool):
    """Evaluate an arithmetic expression without executing arbitrary code."""

    name: ClassVar[str] = "calculator"
    description: ClassVar[str] = (
        "Evaluate a basic mathematical expression (supports +, -, *, /, **, %, etc.)."
    )
    inputs: ClassVar[dict] = {
        "expression": {
            "type": "string",
            "description": "A mathematical expression to evaluate",
        }
    }
    output_type: ClassVar[str] = "number"

    def forward(self, expression: str):
        """Parse ``expression`` and compute its numeric value.

        Only numeric literals, unary +/- and the binary operators
        ``+ - * / // % **`` are accepted.

        Args:
            expression: The arithmetic expression to evaluate.

        Returns:
            The numeric result.

        Raises:
            ValueError: If the expression contains anything else (names,
                calls, strings, comparisons, ...).
            SyntaxError: If the expression is not valid Python syntax.
            ZeroDivisionError: On division or modulo by zero.
        """
        import ast
        import operator

        binary_ops = {
            ast.Add: operator.add,
            ast.Sub: operator.sub,
            ast.Mult: operator.mul,
            ast.Div: operator.truediv,
            ast.FloorDiv: operator.floordiv,
            ast.Mod: operator.mod,
            ast.Pow: operator.pow,
        }
        unary_ops = {ast.USub: operator.neg, ast.UAdd: operator.pos}

        def evaluate(node):
            # Walk the tree ourselves instead of eval()-ing it: only the node
            # kinds handled below can ever contribute to the result.
            if isinstance(node, ast.Expression):
                return evaluate(node.body)
            # ast.Constant covers every literal; the deprecated ast.Num alias
            # is not referenced because newer Python versions dropped it.
            # Non-numeric literals (strings, bytes, None) are rejected.
            if isinstance(node, ast.Constant) and isinstance(
                node.value, (int, float, complex)
            ):
                return node.value
            if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
                left = evaluate(node.left)
                right = evaluate(node.right)
                return binary_ops[type(node.op)](left, right)
            if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
                return unary_ops[type(node.op)](evaluate(node.operand))
            raise ValueError(f"Unsafe or unsupported expression: {expression}")

        return evaluate(ast.parse(expression, mode="eval"))
| |
|
| |
|
| | |
class AddTool(Tool):
    """Tool that adds two numbers."""

    name: ClassVar[str] = "add"
    description: ClassVar[str] = "Add two numbers together."
    inputs: ClassVar[dict] = {
        "a": {"type": "number", "description": "First number"},
        "b": {"type": "number", "description": "Second number"},
    }
    output_type: ClassVar[str] = "number"

    def forward(self, a: float, b: float):
        """Return the sum ``a + b``."""
        total = a + b
        return total
| |
|
| |
|
class MultiplyTool(Tool):
    """Tool that multiplies two numbers."""

    name: ClassVar[str] = "multiply"
    description: ClassVar[str] = "Multiply two numbers."
    inputs: ClassVar[dict] = {
        "a": {"type": "number", "description": "First number"},
        "b": {"type": "number", "description": "Second number"},
    }
    output_type: ClassVar[str] = "number"

    def forward(self, a: float, b: float):
        """Return the product ``a * b``."""
        product = a * b
        return product
| |
|
| |
|
| | |
class DayOfWeekTool(Tool):
    """Tool that names the weekday of a calendar date."""

    name: ClassVar[str] = "day_of_week"
    description: ClassVar[str] = "Return the day of week for a given date (YYYY-MM-DD)."
    inputs: ClassVar[dict] = {
        "date": {"type": "string", "description": "Date in format YYYY-MM-DD"}
    }
    output_type: ClassVar[str] = "string"

    def forward(self, date: str):
        """Return the full weekday name (``%A``) for ``date``.

        Args:
            date: Three integers separated by ``-``: year, month, day.

        Raises:
            ValueError: If the text does not split into exactly three
                integers or they do not form a valid date.
        """
        pieces = date.split("-")
        # Lazy conversion while unpacking, one field at a time.
        year, month, day = (int(piece) for piece in pieces)
        parsed = datetime.date(year, month, day)
        return parsed.strftime("%A")
| |
|
| |
|
class AddDaysTool(Tool):
    """Tool that shifts a calendar date by a number of days."""

    name: ClassVar[str] = "add_days"
    description: ClassVar[str] = "Add a number of days to a date (YYYY-MM-DD)."
    inputs: ClassVar[dict] = {
        "date": {"type": "string", "description": "Start date (YYYY-MM-DD)"},
        "days": {"type": "integer", "description": "Number of days to add"},
    }
    output_type: ClassVar[str] = "string"

    def forward(self, date: str, days: int):
        """Return ``date`` moved by ``days`` days, in ISO format.

        Args:
            date: Three integers separated by ``-``: year, month, day.
            days: Offset in days, passed to ``datetime.timedelta``.

        Raises:
            ValueError: If ``date`` cannot be parsed into a valid date.
        """
        pieces = date.split("-")
        year, month, day = (int(piece) for piece in pieces)
        start = datetime.date(year, month, day)
        offset = datetime.timedelta(days=days)
        return (start + offset).isoformat()
| |
|
| |
|
class DateDiffTool(Tool):
    """Tool that measures the distance between two dates in days."""

    name: ClassVar[str] = "date_diff"
    description: ClassVar[str] = (
        "Compute difference in days between two dates (YYYY-MM-DD)."
    )
    inputs: ClassVar[dict] = {
        "start_date": {"type": "string", "description": "First date (YYYY-MM-DD)"},
        "end_date": {"type": "string", "description": "Second date (YYYY-MM-DD)"},
    }
    output_type: ClassVar[str] = "integer"

    def forward(self, start_date: str, end_date: str):
        """Return the absolute number of days between the two dates.

        The result is never negative, whichever date comes first.

        Raises:
            ValueError: If either argument cannot be parsed into a valid date.
        """
        # Both strings are split into integers before either date object is
        # built.
        start_year, start_month, start_day = (
            int(piece) for piece in start_date.split("-")
        )
        end_year, end_month, end_day = (
            int(piece) for piece in end_date.split("-")
        )
        first = datetime.date(start_year, start_month, start_day)
        second = datetime.date(end_year, end_month, end_day)
        gap = second - first
        return abs(gap.days)
| |
|
| |
|
| | |
class TempConvertTool(Tool):
    """Tool that converts a temperature between Celsius and Fahrenheit."""

    name: ClassVar[str] = "convert_temperature"
    description: ClassVar[str] = "Convert temperature between Celsius and Fahrenheit."
    inputs: ClassVar[dict] = {
        "value": {"type": "number", "description": "Temperature value to convert"},
        "from_unit": {"type": "string", "description": "Unit of input ('C' or 'F')"},
    }
    output_type: ClassVar[str] = "number"

    def forward(self, value: float, from_unit: str):
        """Convert ``value`` from ``from_unit`` to the other scale.

        Args:
            value: Temperature to convert.
            from_unit: ``C`` or ``F``; surrounding whitespace and letter case
                are ignored.

        Returns:
            Fahrenheit when the input is Celsius, Celsius when it is
            Fahrenheit.

        Raises:
            ValueError: If ``from_unit`` is neither ``C`` nor ``F``.
        """
        scale = from_unit.strip().upper()
        if scale not in ("C", "F"):
            raise ValueError("Unit must be 'C' or 'F'.")
        if scale == "C":
            # Celsius -> Fahrenheit
            return value * 9 / 5 + 32
        # Fahrenheit -> Celsius
        return (value - 32) * 5 / 9
| |
|
| |
|
class LengthConvertTool(Tool):
    """Tool that converts a length between kilometers, miles, meters and feet."""

    name: ClassVar[str] = "convert_length"
    description: ClassVar[str] = (
        "Convert length between kilometers, miles, meters, and feet."
    )
    inputs: ClassVar[dict] = {
        "value": {"type": "number", "description": "Length value to convert"},
        "from_unit": {
            "type": "string",
            "description": "Original unit ('km','mi','m','ft')",
        },
        "to_unit": {
            "type": "string",
            "description": "Target unit ('km','mi','m','ft')",
        },
    }
    output_type: ClassVar[str] = "number"

    def forward(self, value: float, from_unit: str, to_unit: str):
        """Convert ``value`` from ``from_unit`` to ``to_unit``.

        Args:
            value: Length to convert.
            from_unit: One of ``km``, ``mi``, ``m``, ``ft``; surrounding
                whitespace and letter case are ignored.
            to_unit: Target unit, same choices and normalisation.

        Returns:
            The converted length. When both units are the same, ``value`` is
            returned unchanged.

        Raises:
            ValueError: If either unit is not supported.
        """
        # Meters per unit. The international mile is exactly 1609.344 m and
        # the foot exactly 0.3048 m.
        meters_per_unit = {"km": 1000, "m": 1, "mi": 1609.344, "ft": 0.3048}

        source = from_unit.strip().lower()
        target = to_unit.strip().lower()
        if source not in meters_per_unit:
            raise ValueError("Unsupported from_unit")
        if target not in meters_per_unit:
            raise ValueError("Unsupported to_unit")
        if source == target:
            # Skip the round trip through meters to avoid rounding noise.
            return value

        meters = value * meters_per_unit[source]
        return meters / meters_per_unit[target]
| |
|
| |
|
| | |
class WordCountTool(Tool):
    """Tool that counts whitespace-separated words."""

    name: ClassVar[str] = "word_count"
    description: ClassVar[str] = "Count the number of words in a text string."
    inputs: ClassVar[dict] = {"text": {"type": "string", "description": "Input text"}}
    output_type: ClassVar[str] = "integer"

    def forward(self, text: str):
        """Return how many whitespace-delimited tokens ``text`` contains."""
        words = text.split()
        return len(words)
| |
|
| |
|
class FindTextTool(Tool):
    """Tool that counts occurrences of a substring."""

    name: ClassVar[str] = "find_text"
    description: ClassVar[str] = (
        "Find occurrences of a substring in a text; returns count."
    )
    inputs: ClassVar[dict] = {
        "text": {"type": "string", "description": "Text to search in"},
        "query": {"type": "string", "description": "Substring to search for"},
    }
    output_type: ClassVar[str] = "integer"

    def forward(self, text: str, query: str):
        """Return the number of non-overlapping occurrences of ``query``.

        Args:
            text: Text to search in.
            query: Substring to look for (case-sensitive).

        Returns:
            The occurrence count; 0 when ``query`` is empty.
        """
        # str.count("") reports len(text) + 1 matches, which is meaningless
        # as an occurrence count, so an empty query is treated as "not found".
        if not query:
            return 0
        return text.count(query)
| |
|
| |
|
| | |
class SortListTool(Tool):
    """Tool that returns a sorted copy of a list."""

    name: ClassVar[str] = "sort_list"
    description: ClassVar[str] = "Sort a list of items (numbers or strings)."
    inputs: ClassVar[dict] = {
        "items": {"type": "array", "description": "List of items to sort"}
    }
    output_type: ClassVar[str] = "array"

    def forward(self, items):
        """Return a new list holding ``items`` in ascending order.

        The input itself is left untouched.
        """
        ordered = list(items)
        ordered.sort()
        return ordered
| |
|
| |
|
class UniqueListTool(Tool):
    """Tool that removes duplicates from a list while keeping first-seen order."""

    name: ClassVar[str] = "unique_list"
    description: ClassVar[str] = (
        "Return a list with duplicate items removed (preserving order)."
    )
    inputs: ClassVar[dict] = {
        "items": {"type": "array", "description": "List of items"}
    }
    output_type: ClassVar[str] = "array"

    def forward(self, items):
        """Return the distinct elements of ``items`` in first-occurrence order.

        Args:
            items: Iterable of values; they may be unhashable (nested lists,
                dicts).

        Returns:
            A new list without duplicates.
        """
        # Materialise once so the fallback can re-scan the same elements even
        # when a one-shot iterator was passed in.
        values = list(items)
        try:
            # Fast path: dict keys keep insertion order and are unique, giving
            # linear time when every element is hashable.
            return list(dict.fromkeys(values))
        except TypeError:
            # At least one element is unhashable: fall back to the quadratic
            # equality scan, which works for any comparable values.
            unique = []
            for value in values:
                if value not in unique:
                    unique.append(value)
            return unique
| |
|
| |
|
| | |
class ReadFileTool(Tool):
    """Tool that returns the full content of a text file."""

    name: ClassVar[str] = "read_file"
    description: ClassVar[str] = "Read and return the contents of a text file."
    inputs: ClassVar[dict] = {
        "file_path": {"type": "string", "description": "Path to a text file"}
    }
    output_type: ClassVar[str] = "string"

    def forward(self, file_path: str):
        """Read ``file_path`` as UTF-8 text.

        Args:
            file_path: Path to the file to read.

        Returns:
            The file content, or an ``Error ...`` string when the file is
            missing, cannot be opened, or is not valid UTF-8.
        """
        try:
            # Explicit encoding: the platform default differs between systems,
            # and CodeFileReadTool in this module already reads as UTF-8.
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError:
            return f"Error: File not found: {file_path}"
        except (OSError, UnicodeDecodeError) as e:
            # Permission problems, directories and undecodable bytes are
            # reported as text, like the missing-file case above.
            return f"Error reading file: {e}"
| |
|
| |
|
class WriteFileTool(Tool):
    """Tool that writes a string to a text file, replacing existing content."""

    name: ClassVar[str] = "write_file"
    description: ClassVar[str] = "Write a string to a text file (overwrites if exists)."
    inputs: ClassVar[dict] = {
        "file_path": {"type": "string", "description": "Path to write the file"},
        "content": {"type": "string", "description": "Content to write"},
    }
    output_type: ClassVar[str] = "string"

    def forward(self, file_path: str, content: str):
        """Write ``content`` to ``file_path`` as UTF-8.

        Args:
            file_path: Destination path; an existing file is truncated.
            content: Text to write.

        Returns:
            A confirmation message naming the written path.

        Raises:
            OSError: If the file cannot be opened or written.
        """
        # Explicit encoding so non-ASCII content is written the same way on
        # every platform instead of depending on the locale's default codec.
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(content)
        return f"Wrote to {file_path}"
| |
|
| |
|
| | |
class ImageInfoTool(Tool):
    """Tool that reports the size and mode of an image file."""

    name: ClassVar[str] = "image_info"
    description: ClassVar[str] = "Load an image and report basic info (size and mode)."
    inputs: ClassVar[dict] = {
        "image_path": {"type": "string", "description": "Path to an image file"}
    }
    output_type: ClassVar[str] = "string"

    def forward(self, image_path: str):
        """Open ``image_path`` and describe it.

        Args:
            image_path: Path to the image file.

        Returns:
            A line with the image's ``size`` and ``mode``, or an
            ``Error loading image: ...`` string on failure.
        """
        try:
            # The context manager closes the underlying file handle as soon
            # as the attributes have been read.
            with Image.open(image_path) as img:
                return f"Image {image_path}: size={img.size}, mode={img.mode}"
        except Exception as e:
            return f"Error loading image: {e}"
| |
|
| |
|
class WikipediaCustomTool(Tool):
    """Tool that searches Wikipedia and returns the matching pages as text."""

    name: ClassVar[str] = "wikipedia_search_summary"
    description: ClassVar[str] = "Search Wikipedia and return max 3 results"
    inputs: ClassVar[dict] = {
        "query": {
            "type": "string",
            "description": "Search Query for Wikipedia",
        }
    }
    output_type: ClassVar[str] = "string"

    def forward(self, query: str) -> str:
        """Load up to three Wikipedia documents for ``query``.

        Args:
            query: Search query passed to ``WikipediaLoader``.

        Returns:
            One ``<Document ...>`` section per result, separated by ``---``
            lines; a "No results found" message when nothing matched; or an
            ``Error using Wikipedia API: ...`` string on failure.
        """
        try:
            search_docs = WikipediaLoader(query=query, load_max_docs=3).load()
            if not search_docs:
                return f"No results found for: '{query}'"

            # Return the text itself: output_type is "string", so the result
            # must not be wrapped in a dict. The opening tag is left open
            # because it is closed explicitly after the page content.
            return "\n\n---\n\n".join(
                f'<Document source="{doc.metadata.get("source", "")}" '
                f'page="{doc.metadata.get("page", "")}">\n'
                f"{doc.page_content}\n</Document>"
                for doc in search_docs
            )
        except Exception as e:
            return f"Error using Wikipedia API: {e}"
| |
|
| |
|
class ArvixSearchTool(Tool):
    """Tool that searches arXiv and returns excerpts of the matching papers."""

    name: ClassVar[str] = "arvix_search"
    description: ClassVar[str] = "Search Arvix for a query and return maximum 3 result"
    inputs: ClassVar[dict] = {
        "query": {
            "type": "string",
            "description": "Search Query for Arvix",
        }
    }
    output_type: ClassVar[str] = "string"

    def forward(self, query: str) -> str:
        """Load up to three arXiv documents for ``query``.

        Args:
            query: Search query passed to ``ArxivLoader``.

        Returns:
            One ``<Document ...>`` section per result (first 1000 characters
            of each paper), separated by ``---`` lines; a "No results found"
            message when nothing matched; or an ``Error using Arvix Tool: ...``
            string on failure.
        """
        try:
            search_docs = ArxivLoader(query=query, load_max_docs=3).load()
            if not search_docs:
                return f"No results found for: '{query}'"

            sections = []
            for doc in search_docs:
                metadata = doc.metadata
                # A hard metadata["source"] lookup aborts the whole search
                # when the key is absent. NOTE(review): ArxivLoader's default
                # metadata appears to carry "Title"/"Published"/"Authors"/
                # "Summary" rather than "source" - confirm against the
                # installed langchain_community version.
                source = (
                    metadata.get("source")
                    or metadata.get("entry_id")
                    or metadata.get("Title", "")
                )
                page = metadata.get("page", "")
                sections.append(
                    f'<Document source="{source}" page="{page}">\n'
                    f"{doc.page_content[:1000]}\n</Document>"
                )

            # output_type is "string", so return the text itself, not a dict.
            return "\n\n---\n\n".join(sections)
        except Exception as e:
            return f"Error using Arvix Tool: {e}"
| |
|
| |
|
class GoogleSearchTool(Tool):
    """Tool that runs a Google Custom Search query and formats the hits."""

    name: ClassVar[str] = "google_search"
    description: ClassVar[str] = (
        "Search the web using Google Search Engine and return results"
    )
    inputs: ClassVar[dict] = {
        "query": {
            "type": "string",
            "description": "Search term to find information on Web",
        }
    }
    output_type: ClassVar[str] = "string"

    def forward(self, query: str) -> str:
        """Search for ``query`` and return up to five formatted results.

        Credentials are read from the ``GOOGLE_API_KEY`` and ``GOOGLE_CSE_ID``
        environment variables.

        Args:
            query: Search term.

        Returns:
            A Markdown list of title, link and snippet per hit; a
            "No results found" message when there are none; or an
            ``Error using Google Search API: ...`` string on failure.
        """
        try:
            google_search = GoogleSearchAPIWrapper(
                google_api_key=os.getenv("GOOGLE_API_KEY"),
                google_cse_id=os.getenv("GOOGLE_CSE_ID"),
            )
            results = google_search.results(query, num_results=5)

            # Keep only entries that look like real hits. NOTE(review): the
            # wrapper appears to signal "nothing found" with a single
            # placeholder entry that has no title/link - confirm against the
            # installed langchain_google_community version.
            hits = [r for r in results or [] if "title" in r and "link" in r]
            if not hits:
                return f"No results found for: '{query}'"

            # The snippet is looked up defensively so a hit without one does
            # not discard the whole result list.
            formatted = "\n\n".join(
                f"{i}. **{r['title']}**\n{r['link']}\n{r.get('snippet', '')}"
                for i, r in enumerate(hits, start=1)
            )
            return f"**Search Results for '{query}':**\n\n{formatted}"
        except Exception as e:
            return f"Error using Google Search API: {e}"
| |
|
| |
|
class AdvanceGoogleAISearchTool(Tool):
    """Tool that forwards a query to a Gemini model and returns its answer."""

    name: ClassVar[str] = "google_ai_search"
    description: ClassVar[str] = (
        "Search the web using Google AI Search Engine and return results"
    )
    inputs: ClassVar[dict] = {
        "query": {
            "type": "string",
            "description": "Search term to find information on Web",
        }
    }
    output_type: ClassVar[str] = "string"

    def forward(self, query: str) -> str:
        """Send ``query`` to Gemini and return the generated text.

        The request contains only the query; no search tool or other
        configuration is attached to the call.

        Args:
            query: Text sent to the model.

        Returns:
            The model's answer under a "Search Results" heading; a
            "No results found" message when the response has no text; or an
            ``Error using Google AI Search API: ...`` string on failure.
        """
        try:
            client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
            response = client.models.generate_content(
                # Same fallback model as YouTubeVideoAnalyzerTool, so an unset
                # GENAI_MODEL does not send model=None.
                model=os.getenv("GENAI_MODEL", "gemini-1.5-pro"),
                contents=[query],
            )

            # Test the text rather than the response object: a response with
            # no text would otherwise be rendered as the literal "None".
            answer = getattr(response, "text", None)
            if not answer:
                return f"No results found for: '{query}'"

            return f"**Search Results for '{query}':**\n\n{answer}"
        except Exception as e:
            return f"Error using Google AI Search API: {e}"
| |
|
| |
|
| | |
# Tool instances exposed to the agent, created once at import time in the
# order listed. CodeFileReadTool, ReadFileTool, WriteFileTool and
# ImageInfoTool are defined in this module but are not registered here.
agent_tools = [
    tool_class()
    for tool_class in (
        AnyTypeFileAnalyzerTool,
        ExcelAndCSVTableInspectorTool,
        YouTubeVideoAnalyzerTool,
        CalculatorTool,
        AddTool,
        MultiplyTool,
        DayOfWeekTool,
        AddDaysTool,
        DateDiffTool,
        TempConvertTool,
        LengthConvertTool,
        WordCountTool,
        FindTextTool,
        SortListTool,
        UniqueListTool,
        GoogleSearchTool,
        WikipediaCustomTool,
        ArvixSearchTool,
        AdvanceGoogleAISearchTool,
    )
]
| |
|