Add new tools and functionalities for audio transcription, code execution, document handling, image processing, and mathematical operations
d303e2f
| from langchain_core.tools import tool | |
| from typing import List, Dict, Any, Optional | |
| import tempfile | |
| from urllib.parse import urlparse | |
| import os | |
| import uuid | |
| import requests | |
| from PIL import Image | |
| import pytesseract | |
| import pandas as pd | |
def create_file_with_content(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a new file in the system temporary directory and return its path.

    Only the final path component of ``filename`` is used, so the file is always
    created directly inside the temporary directory.

    Args:
        content (str): The content to save to the file.
        filename (str, optional): The desired name of the file. If not provided,
            or if nothing usable remains once directory components are removed,
            a random unique ``.txt`` name is generated.

    Returns:
        str: The path of the written file, or a message starting with
        "Error creating file" if the file could not be written.
    """
    temp_dir = tempfile.gettempdir()

    # Strip directory parts: an absolute path or "../" segments would otherwise
    # make os.path.join() point outside the temporary directory.
    safe_name = os.path.basename(filename) if filename else ""
    if safe_name in ("", ".", ".."):
        # Generate a unique filename to avoid collisions; default to .txt
        safe_name = f"file_{uuid.uuid4().hex[:8]}.txt"

    filepath = os.path.join(temp_dir, safe_name)
    try:
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
    except Exception as e:
        return f"Error creating file {filepath}: {str(e)}"
    return filepath
def read_file_content(file_path: str) -> str:
    """
    Return the full text of a file, decoded as UTF-8.

    Args:
        file_path (str): The absolute path to the file to be read.

    Returns:
        str: The file's content, or a message starting with "Error" when the
        path does not exist, is not a regular file, or cannot be read.
    """
    if os.path.isfile(file_path):
        try:
            with open(file_path, "r", encoding="utf-8") as handle:
                return handle.read()
        except Exception as exc:
            return f"Error reading file {file_path}: {str(exc)}"

    # Not a regular file: distinguish "exists but is something else" from "missing".
    if os.path.exists(file_path):
        return f"Error: Path {file_path} is not a file."
    return f"Error: File not found at {file_path}"
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it in the system temporary directory.

    Args:
        url (str): The URL of the file to download.
        filename (str, optional): The name to save the file under. Only the
            final path component is used. If not provided, the last segment of
            the URL path is used, falling back to a random name.

    Returns:
        str: The path of the downloaded file, or an empty string if the request
        failed or the saved file is empty. Failures are reported via ``print``.
    """
    try:
        print(f"Attempting to download file from {url}")

        # Keep only the last path component so that neither a caller-supplied
        # name nor the URL can place the file outside the temporary directory.
        filename = os.path.basename(filename) if filename else ""
        if filename in ("", ".", ".."):
            filename = os.path.basename(urlparse(url).path)
        if filename in ("", ".", ".."):
            filename = f"downloaded_{uuid.uuid4().hex[:8]}"
        print(f"Will save as {filename}")

        filepath = os.path.join(tempfile.gettempdir(), filename)

        # Download the file with timeout and proper headers
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
        }
        # With stream=True the connection stays checked out until the body is
        # consumed or the response is closed; the with-block releases it on
        # every path, including HTTP errors and write failures.
        with requests.get(url, stream=True, headers=headers, timeout=30) as response:
            print(f"Download request status code: {response.status_code}")
            response.raise_for_status()

            # Content metadata is printed for debugging only
            content_type = response.headers.get('Content-Type', 'unknown')
            content_length = response.headers.get('Content-Length', 'unknown')
            print(f"Content type: {content_type}, Content length: {content_length}")

            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive new chunks
                        f.write(chunk)

        # Verify file was downloaded successfully (exists and is non-empty)
        size = os.path.getsize(filepath) if os.path.exists(filepath) else None
        if size:
            print(f"File successfully downloaded to {filepath} ({size} bytes)")
            return filepath
        print(f"File download may have failed. File size: {size if size is not None else 'file does not exist'}")
        return ""
    except requests.exceptions.Timeout:
        print(f"Timeout error downloading file from {url}")
        return ""
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error downloading file: {e}")
        return ""
    except requests.exceptions.RequestException as e:
        print(f"Request error downloading file: {e}")
        return ""
    except Exception as e:
        print(f"Unexpected error downloading file: {str(e)}")
        return ""
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using the pytesseract OCR library.

    Args:
        image_path (str): The path to the image file.

    Returns:
        str: The recognized text prefixed with "Extracted text from image:", or
        a message starting with "Error extracting text from image:" on failure.
    """
    try:
        # Open the image in a with-block so its file handle is closed as soon
        # as OCR finishes, even if pytesseract raises.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Reads a CSV file using pandas and returns a summary of its structure and content.

    The summary includes column names, data types, the first 5 rows, and descriptive statistics.
    Use this information to understand the data.
    For specific calculations or data manipulations based on the 'query' (e.g., summing columns,
    filtering rows, complex aggregations), you should use the 'execute_code_multilang' tool with
    Python pandas code that operates on the file_path.
    The 'query' argument here is for context and will be included in the summary.

    Args:
        file_path (str): The absolute path to the CSV file.
        query (str): The user's question about the data; use this to plan subsequent steps.

    Returns:
        str: The text summary, or a message starting with "Error analyzing CSV file"
        if the file could not be read or summarized.
    """
    try:
        df = pd.read_csv(file_path)

        # describe(include=...) raises ValueError when no column matches the
        # requested kind, so check first and substitute a short note instead.
        if df.select_dtypes(include='number').shape[1] > 0:
            numeric_stats = df.describe(include='number').to_string()
        else:
            numeric_stats = "No numerical columns found."
        if df.select_dtypes(include='object').shape[1] > 0:
            object_stats = df.describe(include='object').to_string()
        else:
            object_stats = "No object/categorical columns found."

        parts = [
            f"CSV File Analysis for: {os.path.basename(file_path)}\n",
            f"Query: {query}\n\n",
            f"File loaded with {len(df)} rows and {len(df.columns)} columns.\n",
            # str() each label so non-string column labels cannot break the join
            f"Columns: {', '.join(map(str, df.columns))}\n\n",
            "First 5 rows:\n",
            df.head().to_string() + "\n\n",
            "Data types:\n",
            df.dtypes.to_string() + "\n\n",
            "Summary statistics (for numerical columns):\n",
            numeric_stats + "\n\n",
            "Summary statistics (for object/categorical columns):\n",
            object_stats + "\n",
        ]
        return "".join(parts)
    except Exception as e:
        return f"Error analyzing CSV file {file_path}: {str(e)}"
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Reads an Excel file using pandas and returns a summary of its structure and content.

    The summary includes sheet names, column names, data types, the first 5 rows (of the first
    sheet), and descriptive statistics. Only the first sheet is analyzed.
    Use this information to understand the data.
    For specific calculations or data manipulations based on the 'query' (e.g., summing columns,
    filtering rows, complex aggregations), you should use the 'execute_code_multilang' tool with
    Python pandas code that operates on the file_path (and specifies a sheet if not the first).
    The 'query' argument here is for context and will be included in the summary.

    Args:
        file_path (str): The absolute path to the Excel file.
        query (str): The user's question about the data; use this to plan subsequent steps.

    Returns:
        str: The text summary, or a message starting with "Error" if the workbook
        has no sheets or could not be read or summarized.
    """
    try:
        # Open the workbook once and close it deterministically; the first
        # sheet is parsed from the same handle instead of re-reading the file.
        with pd.ExcelFile(file_path) as xls:
            sheet_names = xls.sheet_names
            if not sheet_names:
                return f"Error: No sheets found in Excel file {file_path}"
            # Analyze the first sheet by default
            sheet_to_analyze = sheet_names[0]
            df = xls.parse(sheet_name=sheet_to_analyze)

        # describe(include=...) raises ValueError when no column matches the
        # requested kind, so check first and substitute a short note instead.
        if df.select_dtypes(include='number').shape[1] > 0:
            numeric_stats = df.describe(include='number').to_string()
        else:
            numeric_stats = "No numerical columns found."
        if df.select_dtypes(include='object').shape[1] > 0:
            object_stats = df.describe(include='object').to_string()
        else:
            object_stats = "No object/categorical columns found."

        parts = [
            f"Excel File Analysis for: {os.path.basename(file_path)}\n",
            f"Query: {query}\n",
            f"Available sheets: {', '.join(map(str, sheet_names))}\n\n",
            f"Analyzing sheet: '{sheet_to_analyze}'\n",
            f"Sheet loaded with {len(df)} rows and {len(df.columns)} columns.\n",
            # Excel headers may be numbers or dates; str() keeps the join from failing
            f"Columns: {', '.join(map(str, df.columns))}\n\n",
            "First 5 rows:\n",
            df.head().to_string() + "\n\n",
            "Data types:\n",
            df.dtypes.to_string() + "\n\n",
            "Summary statistics (for numerical columns):\n",
            numeric_stats + "\n\n",
            "Summary statistics (for object/categorical columns):\n",
            object_stats + "\n",
        ]
        return "".join(parts)
    except Exception as e:
        return f"Error analyzing Excel file {file_path}: {str(e)}"