import calendar
import json
import os
import re
import tempfile
from datetime import datetime, timedelta
from urllib.parse import urlparse

import pandas as pd
import requests

from smolagents import Tool
| |
|
class ReverseTextTool(Tool):
    """Tool that returns the input string with its characters in reverse order."""

    name = "reverse_text"
    description = "Reverses the text in a string."
    inputs = {
        "text": {
            "type": "string",
            "description": "The text to reverse."
        }
    }
    output_type = "string"

    def forward(self, text: str) -> str:
        """Return *text* reversed character by character."""
        # Join the characters in reverse iteration order.
        return "".join(reversed(text))
| |
|
class ExtractTextFromImageTool(Tool):
    """Tool that runs Tesseract OCR over an image and returns the recognized text."""

    name = "extract_text_from_image"
    description = "Extracts text from an image file using OCR."
    inputs = {
        "image_path": {
            "type": "string",
            "description": "Path to the image file."
        }
    }
    output_type = "string"

    def forward(self, image_path: str) -> str:
        """OCR the image at *image_path*, trying several segmentation modes.

        Returns the longest non-empty extraction, or an explanatory message
        when OCR produced nothing or the OCR stack is unavailable.
        """
        try:
            # Imported lazily so the tool degrades gracefully when the OCR
            # stack is not installed.
            import pytesseract
            from PIL import Image

            img = Image.open(image_path)

            # Try a few page-segmentation modes; keep every non-empty result.
            candidates = []
            for psm in ('--psm 6', '--psm 3', '--psm 1'):
                try:
                    extracted = pytesseract.image_to_string(img, config=psm)
                except Exception:
                    continue
                if extracted.strip():
                    candidates.append(extracted)

            if not candidates:
                return "No text could be extracted from the image."
            # The longest candidate is assumed to be the most complete read.
            return f"Extracted text from image:\n\n{max(candidates, key=len)}"
        except ImportError:
            return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system."
        except Exception as e:
            return f"Error extracting text from image: {str(e)}"
| |
|
class AnalyzeCSVTool(Tool):
    """Tool that summarizes a CSV file: shape, columns, and basic statistics."""

    name = "analyze_csv_file"
    description = "Analyzes a CSV file and provides information about its contents."
    inputs = {
        "file_path": {
            "type": "string",
            "description": "Path to the CSV file."
        },
        "query": {
            "type": "string",
            "description": "Optional query about the data.",
            "default": "",
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, file_path: str, query: str = "") -> str:
        """Summarize the CSV at *file_path*.

        When *query* is given, report row counts and per-column statistics for
        any column whose name appears in the query; otherwise produce a general
        numeric/categorical overview.
        """
        try:
            # Try a handful of encodings until one decodes cleanly.
            frame = None
            for enc in ('utf-8', 'latin1', 'iso-8859-1', 'cp1252'):
                try:
                    frame = pd.read_csv(file_path, encoding=enc)
                    break
                except UnicodeDecodeError:
                    continue
            if frame is None:
                return "Error: Could not read the CSV file with any of the attempted encodings."

            parts = [
                f"CSV file has {len(frame)} rows and {len(frame.columns)} columns.\n",
                f"Columns: {', '.join(frame.columns)}\n\n",
            ]

            if query:
                lowered = query.lower()
                if "count" in lowered:
                    parts.append(f"Row count: {len(frame)}\n")

                # Describe any column that the query mentions by name.
                for col in frame.columns:
                    if col.lower() in lowered:
                        parts.append(f"\nColumn '{col}' information:\n")
                        series = frame[col]
                        if pd.api.types.is_numeric_dtype(series):
                            parts.append(f"Min: {series.min()}\n")
                            parts.append(f"Max: {series.max()}\n")
                            parts.append(f"Mean: {series.mean()}\n")
                            parts.append(f"Median: {series.median()}\n")
                        else:
                            top = series.value_counts().head(10)
                            parts.append(f"Unique values: {series.nunique()}\n")
                            parts.append(f"Top values:\n{top.to_string()}\n")
            else:
                # No query: generic overview of numeric and categorical columns.
                numeric_cols = frame.select_dtypes(include=['number']).columns
                if len(numeric_cols) > 0:
                    parts.append("Numeric columns statistics:\n")
                    parts.append(frame[numeric_cols].describe().to_string())
                    parts.append("\n\n")

                cat_cols = frame.select_dtypes(exclude=['number']).columns
                if len(cat_cols) > 0:
                    parts.append("Categorical columns:\n")
                    for col in cat_cols[:5]:
                        parts.append(f"- {col}: {frame[col].nunique()} unique values\n")

            return "".join(parts)
        except Exception as e:
            return f"Error analyzing CSV file: {str(e)}"
| |
|
class AnalyzeExcelTool(Tool):
    """Tool that summarizes an Excel workbook: sheets, shape, and statistics."""

    name = "analyze_excel_file"
    description = "Analyzes an Excel file and provides information about its contents."
    inputs = {
        "file_path": {
            "type": "string",
            "description": "Path to the Excel file."
        },
        "query": {
            "type": "string",
            "description": "Optional query about the data.",
            "default": "",
            "nullable": True
        },
        "sheet_name": {
            "type": "string",
            "description": "Name of the sheet to analyze (defaults to first sheet).",
            "default": None,
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, file_path: str, query: str = "", sheet_name: str = None) -> str:
        """Summarize one sheet of the workbook at *file_path*.

        Lists all sheets, then reports the selected sheet's shape plus either
        query-targeted column statistics or a general numeric/categorical
        overview. Any failure is returned as an error string, never raised.
        """
        try:
            excel_file = pd.ExcelFile(file_path)
            sheet_names = excel_file.sheet_names

            result = f"Excel file contains {len(sheet_names)} sheets: {', '.join(sheet_names)}\n\n"

            # Default to the first sheet; reject unknown sheet names explicitly.
            if sheet_name is None:
                sheet_name = sheet_names[0]
            elif sheet_name not in sheet_names:
                return f"Error: Sheet '{sheet_name}' not found. Available sheets: {', '.join(sheet_names)}"

            df = pd.read_excel(file_path, sheet_name=sheet_name)

            result += f"Sheet '{sheet_name}' has {len(df)} rows and {len(df.columns)} columns.\n"
            # Excel headers can be non-string (ints, dates); stringify before
            # joining, otherwise str.join raises TypeError and the whole
            # analysis collapses into the generic error message.
            result += f"Columns: {', '.join(str(c) for c in df.columns)}\n\n"

            if query:
                if "count" in query.lower():
                    result += f"Row count: {len(df)}\n"

                # Describe any column whose (stringified) name the query mentions.
                for col in df.columns:
                    if str(col).lower() in query.lower():
                        result += f"\nColumn '{col}' information:\n"
                        if pd.api.types.is_numeric_dtype(df[col]):
                            result += f"Min: {df[col].min()}\n"
                            result += f"Max: {df[col].max()}\n"
                            result += f"Mean: {df[col].mean()}\n"
                            result += f"Median: {df[col].median()}\n"
                        else:
                            value_counts = df[col].value_counts().head(10)
                            result += f"Unique values: {df[col].nunique()}\n"
                            result += f"Top values:\n{value_counts.to_string()}\n"
            else:
                # No query: generic overview of numeric and categorical columns.
                numeric_cols = df.select_dtypes(include=['number']).columns
                if len(numeric_cols) > 0:
                    result += "Numeric columns statistics:\n"
                    result += df[numeric_cols].describe().to_string()
                    result += "\n\n"

                cat_cols = df.select_dtypes(exclude=['number']).columns
                if len(cat_cols) > 0:
                    result += "Categorical columns:\n"
                    for col in cat_cols[:5]:
                        result += f"- {col}: {df[col].nunique()} unique values\n"

            return result
        except Exception as e:
            return f"Error analyzing Excel file: {str(e)}"
| |
|
class DateCalculatorTool(Tool):
    """Tool that answers natural-language date queries.

    Supports three kinds of query:
      * current date/time ("what is today's date?"),
      * relative offsets ("what day is 10 days from today?"),
      * reformatting ("format 2023-05-15 as MM/DD/YYYY").
    """

    name = "date_calculator"
    description = "Performs date calculations like adding days, formatting dates, etc."
    inputs = {
        "query": {
            "type": "string",
            "description": "The date calculation to perform (e.g., 'What day is 10 days from today?', 'Format 2023-05-15 as MM/DD/YYYY')"
        }
    }
    output_type = "string"

    # Input formats tried, in order, when parsing a date out of the query text.
    _DATE_FORMATS = ('%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%B %d, %Y')

    @staticmethod
    def _parse_date(date_text: str):
        """Return a datetime parsed from *date_text*, or None if no format matches.

        'today'/'now' (case-insensitive) map to the current moment.
        """
        if date_text.lower() in ('today', 'now'):
            return datetime.now()
        for fmt in DateCalculatorTool._DATE_FORMATS:
            try:
                return datetime.strptime(date_text, fmt)
            except ValueError:
                continue
        return None

    def forward(self, query: str) -> str:
        """Interpret *query* and return the answer, or an explanatory message."""
        try:
            # 1) Current date / time.
            if re.search(r'(today|now|current date|current time)', query, re.IGNORECASE):
                now = datetime.now()
                if 'time' in query.lower():
                    return f"Current date and time: {now.strftime('%Y-%m-%d %H:%M:%S')}"
                return f"Today's date: {now.strftime('%Y-%m-%d')}"

            # 2) "N days/weeks/months/years from <date>" arithmetic.
            add_match = re.search(
                r'(what|when).+?(\d+)\s+(day|days|week|weeks|month|months|year|years)\s+(from|after)\s+(.+)',
                query, re.IGNORECASE)
            if add_match:
                amount = int(add_match.group(2))
                unit = add_match.group(3).lower()
                date_text = add_match.group(5).strip()

                base_date = self._parse_date(date_text)
                if base_date is None:
                    return f"Could not parse date: {date_text}"

                if 'day' in unit:
                    new_date = base_date + timedelta(days=amount)
                elif 'week' in unit:
                    new_date = base_date + timedelta(weeks=amount)
                elif 'month' in unit:
                    # Carry month overflow into the year, then clamp the day so
                    # e.g. Jan 31 + 1 month yields Feb 28/29 instead of raising
                    # ValueError from replace() (the original bug).
                    months_total = base_date.month - 1 + amount
                    new_year = base_date.year + months_total // 12
                    new_month = months_total % 12 + 1
                    last_day = calendar.monthrange(new_year, new_month)[1]
                    new_date = base_date.replace(year=new_year, month=new_month,
                                                 day=min(base_date.day, last_day))
                else:  # the regex guarantees unit is year/years here
                    # Clamp Feb 29 to Feb 28 when the target year is not a leap year.
                    new_year = base_date.year + amount
                    last_day = calendar.monthrange(new_year, base_date.month)[1]
                    new_date = base_date.replace(year=new_year,
                                                 day=min(base_date.day, last_day))

                return f"Date {amount} {unit} from {base_date.strftime('%Y-%m-%d')} is {new_date.strftime('%Y-%m-%d')}"

            # 3) "format <date> as <pattern>" reformatting.
            format_match = re.search(r'format\s+(.+?)\s+as\s+(.+)', query, re.IGNORECASE)
            if format_match:
                date_text = format_match.group(1).strip()
                format_spec = format_match.group(2).strip()

                date_obj = self._parse_date(date_text)
                if date_obj is None:
                    return f"Could not parse date: {date_text}"

                # Translate common date-pattern tokens into strftime directives.
                # Insertion order matters: 'YYYY' must be replaced before 'YY',
                # and the case-sensitive 'MM' (month) before 'mm' (minute).
                format_mapping = {
                    'YYYY': '%Y',
                    'YY': '%y',
                    'MM': '%m',
                    'DD': '%d',
                    'HH': '%H',
                    'mm': '%M',
                    'ss': '%S'
                }

                strftime_format = format_spec
                for token, directive in format_mapping.items():
                    strftime_format = strftime_format.replace(token, directive)

                return f"Formatted date: {date_obj.strftime(strftime_format)}"

            return "I couldn't understand the date calculation query."
        except Exception as e:
            return f"Error performing date calculation: {str(e)}"
| |
|
class DownloadFileTool(Tool):
    """Tool that downloads a URL into the system temp directory."""

    name = "download_file"
    description = "Downloads a file from a URL and saves it locally."
    inputs = {
        "url": {
            "type": "string",
            "description": "The URL to download from."
        },
        "filename": {
            "type": "string",
            "description": "Optional filename to save as (default: derived from URL).",
            "default": None,
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, url: str, filename: str = None) -> str:
        """Download *url* and return a message with the saved path.

        The filename defaults to the last path segment of the URL, falling
        back to a random name when the URL has none. Failures are returned
        as an error string, never raised.
        """
        try:
            if not filename:
                filename = os.path.basename(urlparse(url).path)
                if not filename:
                    # URL has no path component — invent a unique name.
                    import uuid
                    filename = f"downloaded_{uuid.uuid4().hex[:8]}"

            filepath = os.path.join(tempfile.gettempdir(), filename)

            # Stream with a timeout so a stalled server cannot hang the agent
            # forever (the original call had no timeout), and close the
            # connection deterministically via the context manager.
            with requests.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            return f"File downloaded to {filepath}. You can now analyze this file."
        except Exception as e:
            return f"Error downloading file: {str(e)}"