Spaces:
Running
Running
| # tools.py - Consolidated tools | |
| # Dependencies are included | |
| import base64 | |
| import cmath | |
| from datetime import datetime | |
| import io | |
| import json | |
| import logging | |
| import os | |
| from pathlib import Path | |
| import re | |
| import shutil | |
| import sqlite3 | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| from typing import Annotated, Any, Dict, List, Literal, Optional, Tuple, Union | |
| import urllib.parse | |
| import uuid | |
| import numpy as np | |
| import pandas as pd | |
| import requests | |
| logger = logging.getLogger(__name__) | |
| # Check if we're running on Hugging Face Spaces | |
| HF_SPACES = os.environ.get("SPACE_ID") is not None | |
| from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont | |
| from pydantic import BaseModel, ConfigDict, Field, field_validator | |
# Try to import matplotlib, but make it optional
try:
    import matplotlib

    matplotlib.use("Agg")  # Non-interactive backend: no display is available on servers
    import matplotlib.pyplot as plt

    MATPLOTLIB_AVAILABLE = True
# Broad on purpose: backend selection can fail for non-import reasons.
# (Original caught `(ImportError, Exception)`, which is redundant — Exception
# already subsumes ImportError.)
except Exception as e:
    MATPLOTLIB_AVAILABLE = False
    plt = None
    print(f"Warning: matplotlib not available: {e}")
| # Always import the tool decorator - it's essential | |
| from langchain_core.tools import InjectedToolArg, tool | |
| # Image generation engine (backed by the provider registry in | |
| # agent_ng.image_providers). The tool only knows the engine facade and the | |
| # default-model hint accessor — never the active model slug or provider. | |
| try: | |
| from agent_ng.image_engine import ImageEngine | |
| from agent_ng.image_models import get_default_prompt_style_hint | |
| except ImportError: # pragma: no cover | |
| ImageEngine = None # type: ignore[assignment,misc] | |
| get_default_prompt_style_hint = None # type: ignore[assignment] | |
| # Session-aware current_session_id (ContextVar fallback when agent not injected) | |
| try: | |
| from agent_ng.session_manager import get_current_session_id | |
| except ImportError: # pragma: no cover | |
| get_current_session_id = None # type: ignore[assignment] | |
| from .applications_tools.tool_list_applications import list_applications | |
| # Applications tools | |
| from .applications_tools.tool_list_templates import list_templates | |
| from .attributes_tools.tool_archive_or_unarchive_attribute import ( | |
| archive_or_unarchive_attribute, | |
| ) | |
| # Attributes tools - Utility functions | |
| from .attributes_tools.tool_delete_attribute import delete_attribute | |
| from .attributes_tools.tool_get_attribute import get_attribute | |
| from .attributes_tools.tools_account_attribute import edit_or_create_account_attribute | |
| from .attributes_tools.tools_boolean_attribute import edit_or_create_boolean_attribute | |
| # Attributes tools - Other attribute types | |
| from .attributes_tools.tools_datetime_attribute import ( | |
| edit_or_create_date_time_attribute, | |
| ) | |
| from .attributes_tools.tools_decimal_attribute import edit_or_create_numeric_attribute | |
| from .attributes_tools.tools_document_attribute import edit_or_create_document_attribute | |
| from .attributes_tools.tools_drawing_attribute import edit_or_create_drawing_attribute | |
| from .attributes_tools.tools_duration_attribute import edit_or_create_duration_attribute | |
| from .attributes_tools.tools_enum_attribute import edit_or_create_enum_attribute | |
| from .attributes_tools.tools_image_attribute import edit_or_create_image_attribute | |
| from .attributes_tools.tools_record_attribute import edit_or_create_record_attribute | |
| from .attributes_tools.tools_role_attribute import edit_or_create_role_attribute | |
| # Attributes tools - Text attributes | |
| from .attributes_tools.tools_text_attribute import edit_or_create_text_attribute | |
| # Datetime tool | |
| from .get_datetime import get_current_datetime | |
| # Expose Comindware Platform tools from all directories | |
| # Templates tools | |
| from .templates_tools.tool_list_attributes import list_attributes | |
| from .templates_tools.tool_list_records import list_template_records | |
| # Templates tools | |
| from .templates_tools.tools_record_template import edit_or_create_record_template | |
| # Attributes tools - Text attributes | |
| from .attributes_tools.tools_text_attribute import edit_or_create_text_attribute | |
| # Attributes tools - Other attribute types | |
| from .attributes_tools.tools_datetime_attribute import edit_or_create_date_time_attribute | |
| from .attributes_tools.tools_decimal_attribute import edit_or_create_numeric_attribute | |
| from .attributes_tools.tools_record_attribute import edit_or_create_record_attribute | |
| from .attributes_tools.tools_image_attribute import edit_or_create_image_attribute | |
| from .attributes_tools.tools_drawing_attribute import edit_or_create_drawing_attribute | |
| from .attributes_tools.tools_document_attribute import edit_or_create_document_attribute | |
| from .attributes_tools.tools_duration_attribute import edit_or_create_duration_attribute | |
| from .attributes_tools.tools_account_attribute import edit_or_create_account_attribute | |
| from .attributes_tools.tools_boolean_attribute import edit_or_create_boolean_attribute | |
| from .attributes_tools.tools_role_attribute import edit_or_create_role_attribute | |
| from .attributes_tools.tools_enum_attribute import edit_or_create_enum_attribute | |
| # Attributes tools - Utility functions | |
| from .attributes_tools.tool_delete_attribute import delete_attribute | |
| from .attributes_tools.tool_archive_or_unarchive_attribute import archive_or_unarchive_attribute | |
| from .attributes_tools.tool_get_attribute import get_attribute | |
| # Datetime tool | |
| from .get_datetime import get_current_datetime | |
| # NOTE: Browser automation tools (tools/browser_tools.py, agent_ng/browser_session.py) | |
| # are intentionally NOT bound to the agent. They are kept for external/standalone use. | |
| # See .agents/skills/cmw-platform/SKILL.md section "Browser Automation" | |
| # Global configuration for search tools | |
| SEARCH_LIMIT = 5 # Maximum number of results for all search tools (Tavily, Wikipedia, Arxiv) | |
| # LangChain imports for search tools | |
| try: | |
| from langchain_tavily import TavilySearch | |
| TAVILY_AVAILABLE = True | |
| except ImportError: | |
| TAVILY_AVAILABLE = False | |
| print("Warning: TavilySearch not available. Install with: pip install langchain-tavily") | |
| # Try to import wikipedia-api as it's a common dependency | |
| try: | |
| import wikipedia | |
| WIKIPEDIA_AVAILABLE = True | |
| except ImportError as e: | |
| WIKIPEDIA_AVAILABLE = False | |
| print(f"Wikipedia search requires additional dependencies. Install with: pip install wikipedia-api. Error: {str(e)}") | |
| try: | |
| from langchain_community.document_loaders import WikipediaLoader | |
| WIKILOADER_AVAILABLE = True | |
| except ImportError: | |
| WIKILOADER_AVAILABLE = False | |
| print("Warning: WikipediaLoader not available. Install with: pip install langchain-community") | |
| # Try to import arxiv as it's a common dependency | |
| try: | |
| import arxiv | |
| ARXIV_AVAILABLE = True | |
| except ImportError as e: | |
| ARXIV_AVAILABLE = False | |
| print(f"Arxiv search requires additional dependencies. Install with: pip install arxiv. Error: {str(e)}") | |
| # Try to import PyMuPDF for PDF processing | |
| try: | |
| import fitz # PyMuPDF | |
| PYMUPDF_AVAILABLE = True | |
| except ImportError: | |
| PYMUPDF_AVAILABLE = False | |
| print("Warning: PyMuPDF not available. Install with: pip install pymupdf") | |
| try: | |
| from langchain_community.document_loaders import ArxivLoader | |
| ARXIVLOADER_AVAILABLE = True | |
| except ImportError: | |
| ARXIVLOADER_AVAILABLE = False | |
| print("Warning: ArxivLoader not available. Install with: pip install langchain-community") | |
| # Optional: Exa deep research (web_search_deep_research_exa_ai returns a clear JSON error if exa-py is missing) | |
| try: | |
| from exa_py import Exa | |
| EXA_AVAILABLE = True | |
| except ImportError: | |
| EXA_AVAILABLE = False | |
| # Google Gemini imports for video/audio understanding | |
| try: | |
| from google import genai | |
| from google.genai import types | |
| GEMINI_AVAILABLE = True | |
| except ImportError: | |
| GEMINI_AVAILABLE = False | |
| print("Warning: Google Gemini not available. Install with: pip install google-genai") | |
| # ========== GEMINI HELPER FUNCTIONS ========== | |
def _get_gemini_client():
    """
    Initialize and return a Gemini client with proper error handling.

    The API key is read from the ``GEMINI_KEY`` environment variable.
    (Note: this function takes no arguments; the model is chosen later by
    the caller of ``_get_gemini_response``.)

    Returns:
        client or None: The Gemini client if initialization succeeds; None when
            the google-genai SDK is missing, the API key is absent, or client
            construction fails. Failures are printed, never raised.
    """
    if not GEMINI_AVAILABLE:
        print("Warning: Google Gemini not available. Install with: pip install google-genai")
        return None
    try:
        gemini_key = os.environ.get("GEMINI_KEY")
        if not gemini_key:
            print("Warning: GEMINI_KEY not found in environment variables.")
            return None
        client = genai.Client(api_key=gemini_key)
        return client
    except Exception as e:
        print(f"Error initializing Gemini client: {str(e)}")
        return None
def _get_gemini_response(prompt, error_prefix="Gemini", model_name="gemini-2.5-flash"):
    """
    Get a response from Gemini with proper error handling.

    Args:
        prompt: The prompt to send to Gemini (string or SDK content parts).
        error_prefix (str): Prefix for error messages to identify the calling context.
        model_name (str, optional): The Gemini model to use.

    Returns:
        str: The Gemini response text, or an error message string if the client
            is unavailable or the request fails (errors are returned, not raised,
            so callers/tools always get a string).
    """
    client = _get_gemini_client()
    if not client:
        return f"{error_prefix} client not available. Check installation and API key configuration."
    try:
        response = client.models.generate_content(
            model=model_name,
            contents=prompt
        )
        return response.text
    except Exception as e:
        return f"Error in {error_prefix.lower()} request: {str(e)}"
| # ========== IMAGE PROCESSING HELPERS ========== | |
def encode_image(image_path: str) -> str:
    """
    Encode an image file's raw bytes as a base64 text string.

    Args:
        image_path (str): Path to the image file on disk.

    Returns:
        str: Base64 representation of the file contents, decoded as UTF-8 text.
    """
    raw_bytes = Path(image_path).read_bytes()
    return base64.b64encode(raw_bytes).decode("utf-8")
def decode_image(base64_string: str) -> Any:
    """
    Decode a base64 text string back into a PIL Image object.

    Args:
        base64_string (str): Base64 text representing the image bytes.

    Returns:
        Any: The PIL Image object, opened from an in-memory byte buffer.
    """
    buffer = io.BytesIO(base64.b64decode(base64_string))
    return Image.open(buffer)
def save_image(image: Any, directory: str = "image_outputs") -> str:
    """
    Save a PIL Image to disk under a random PNG filename and return the path.

    The directory is created (including parents) if it does not exist; the
    filename is a fresh UUID so concurrent saves never collide.

    Args:
        image (Any): The PIL Image object to save (anything with a ``save(path)`` method).
        directory (str, optional): Output directory. Defaults to "image_outputs".

    Returns:
        str: The file path where the image was saved.
    """
    out_dir = Path(directory)
    out_dir.mkdir(parents=True, exist_ok=True)
    target = out_dir / f"{uuid.uuid4()}.png"
    image.save(str(target))
    return str(target)
| # ========== CODE INTERPRETER ========== | |
class CodeInterpreter:
    """
    A code interpreter for executing code in various languages (Python, Bash,
    SQL, C, Java) with safety and resource controls.

    Args:
        allowed_modules (list, optional): List of allowed module names for Python execution.
        max_execution_time (int, optional): Maximum execution time in seconds for code blocks.
        working_directory (str, optional): Directory for temporary files and execution context.

    Attributes:
        globals (dict): Global variables exposed to executed Python code.
        temp_sqlite_db (str): Path to a temporary SQLite database for SQL code.
    """

    def __init__(self, allowed_modules=None, max_execution_time=30, working_directory=None):
        # NOTE(review): allowed_modules is stored but not currently enforced by
        # _execute_python — executed code can import anything. Confirm whether
        # enforcement is intended before relying on this as a sandbox.
        self.allowed_modules = allowed_modules or [
            "numpy", "pandas", "matplotlib", "scipy", "sklearn",
            "math", "random", "statistics", "datetime", "collections",
            "itertools", "functools", "operator", "re", "json",
            "sympy", "networkx", "nltk", "PIL",
            "cmath", "uuid", "tempfile", "requests", "urllib"
        ]
        self.max_execution_time = max_execution_time
        self.working_directory = working_directory or os.path.join(os.getcwd())
        # exist_ok avoids the check-then-create race of the previous exists()/makedirs() pair.
        os.makedirs(self.working_directory, exist_ok=True)
        # Reuse modules already imported at module load time.
        self.globals = {
            "__builtins__": __builtins__,
            "np": np,
            "pd": pd,
            "Image": Image,
        }
        # Only expose plt when matplotlib imported successfully.
        if MATPLOTLIB_AVAILABLE:
            self.globals["plt"] = plt
        self.temp_sqlite_db = os.path.join(tempfile.gettempdir(), "code_exec.db")

    def execute_code(self, code: str, language: str = "python") -> dict[str, Any]:
        """
        Execute code in the specified language with safety controls.

        Args:
            code (str): The source code to execute.
            language (str): One of "python", "bash", "sql", "c", "java" (case-insensitive).

        Returns:
            dict: Always contains "status" ("success"/"error"), plus
                language-specific keys (stdout/stderr/results/columns/...).
        """
        handlers = {
            "python": self._execute_python,
            "bash": self._execute_bash,
            "sql": self._execute_sql,
            "c": self._execute_c,
            "java": self._execute_java,
        }
        try:
            handler = handlers.get(language.lower())
            if handler is None:
                return {"status": "error", "stderr": f"Unsupported language: {language}"}
            return handler(code)
        except Exception as e:
            return {"status": "error", "stderr": str(e)}

    def _execute_python(self, code: str) -> dict[str, Any]:
        """Execute Python code, capturing stdout/stderr, DataFrames, and plots.

        SECURITY NOTE: ``exec`` runs arbitrary code in-process with full
        builtins; only feed it trusted or operator-reviewed input.
        """
        try:
            # Capture stdout/stderr by temporarily swapping the sys streams.
            stdout_buffer = io.StringIO()
            stderr_buffer = io.StringIO()
            old_stdout = sys.stdout
            old_stderr = sys.stderr
            sys.stdout = stdout_buffer
            sys.stderr = stderr_buffer
            try:
                # Shallow copy so executed code cannot pollute the shared globals dict.
                local_globals = self.globals.copy()
                local_globals["__name__"] = "__main__"
                exec(code, local_globals)
                result = {
                    "status": "success",
                    "stdout": stdout_buffer.getvalue(),
                    "stderr": stderr_buffer.getvalue(),
                    "result": None,
                }
                # Surface any pandas DataFrames the executed code produced.
                dataframes = [
                    {
                        "name": name,
                        "shape": value.shape,
                        "head": value.head().to_dict("records"),
                    }
                    for name, value in local_globals.items()
                    if isinstance(value, pd.DataFrame)
                ]
                if dataframes:
                    result["dataframes"] = dataframes
                # Persist and report any open matplotlib figures.
                plots = []
                if MATPLOTLIB_AVAILABLE and plt is not None:
                    try:
                        for fig_num in plt.get_fignums():
                            fig = plt.figure(fig_num)
                            plot_path = os.path.join(self.working_directory, f"plot_{fig_num}.png")
                            fig.savefig(plot_path)
                            plots.append(plot_path)
                            plt.close(fig)
                    except Exception as plot_error:
                        # Plot capture is best-effort; never fail the execution over it.
                        print(f"Warning: Plot handling failed: {plot_error}")
                if plots:
                    result["plots"] = plots
                return result
            finally:
                # Always restore the real streams before releasing the buffers.
                sys.stdout = old_stdout
                sys.stderr = old_stderr
                stdout_buffer.close()
                stderr_buffer.close()
        except Exception as e:
            return {"status": "error", "stderr": str(e)}

    def _execute_bash(self, code: str) -> dict[str, Any]:
        """Execute Bash code via the shell, honoring the execution timeout.

        SECURITY NOTE: ``shell=True`` runs an arbitrary shell string; only feed
        it trusted input.
        """
        if HF_SPACES:
            return {"status": "error", "stderr": "Bash execution not available on Hugging Face Spaces"}
        try:
            result = subprocess.run(
                code,
                check=False,
                shell=True,
                capture_output=True,
                text=True,
                timeout=self.max_execution_time,
            )
            return {
                "status": "success" if result.returncode == 0 else "error",
                "stdout": result.stdout,
                "stderr": result.stderr,
                "returncode": result.returncode,
            }
        except subprocess.TimeoutExpired:
            return {"status": "error", "stderr": "Execution timed out"}
        except Exception as e:
            return {"status": "error", "stderr": str(e)}

    def _execute_sql(self, code: str) -> dict[str, Any]:
        """Execute a SQL statement against the temporary SQLite database.

        Fixes over the previous version:
        - The connection is now always closed (it used to leak on any exception).
        - Result sets are detected via ``cursor.description`` instead of
          ``startswith("SELECT")``, so WITH ... SELECT and PRAGMA statements that
          produce rows are returned instead of being silently committed.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.temp_sqlite_db)
            cursor = conn.cursor()
            cursor.execute(code)
            # description is non-None exactly when the statement produced rows.
            if cursor.description is not None:
                results = cursor.fetchall()
                columns = [description[0] for description in cursor.description]
                return {"status": "success", "results": results, "columns": columns}
            conn.commit()
            return {"status": "success", "message": f"Executed: {code}"}
        except Exception as e:
            return {"status": "error", "stderr": str(e)}
        finally:
            if conn is not None:
                conn.close()

    def _execute_c(self, code: str) -> dict[str, Any]:
        """Compile C code with gcc into the working directory and run the binary."""
        if HF_SPACES:
            return {"status": "error", "stderr": "C code execution not available on Hugging Face Spaces"}
        try:
            c_file = os.path.join(self.working_directory, "temp_code.c")
            with open(c_file, "w") as f:
                f.write(code)
            binary_path = os.path.join(self.working_directory, "temp_program")
            compile_result = subprocess.run(
                ["gcc", "-o", binary_path, c_file],
                check=False,
                capture_output=True,
                text=True,
            )
            if compile_result.returncode != 0:
                return {"status": "error", "stderr": f"Compilation failed: {compile_result.stderr}"}
            run_result = subprocess.run(
                [binary_path],
                check=False,
                capture_output=True,
                text=True,
                timeout=self.max_execution_time,
            )
            return {
                "status": "success",
                "stdout": run_result.stdout,
                "stderr": run_result.stderr,
                "returncode": run_result.returncode,
            }
        except subprocess.TimeoutExpired:
            return {"status": "error", "stderr": "Execution timed out"}
        except Exception as e:
            return {"status": "error", "stderr": str(e)}

    def _execute_java(self, code: str) -> dict[str, Any]:
        """Compile Java code with javac and run it.

        The submitted source must declare a public class named ``TempCode`` with
        a ``main`` method — the file is saved as TempCode.java and run as TempCode.
        """
        if HF_SPACES:
            return {"status": "error", "stderr": "Java code execution not available on Hugging Face Spaces"}
        try:
            java_file = os.path.join(self.working_directory, "TempCode.java")
            with open(java_file, "w") as f:
                f.write(code)
            compile_result = subprocess.run(
                ["javac", java_file],
                check=False,
                capture_output=True,
                text=True,
            )
            if compile_result.returncode != 0:
                return {"status": "error", "stderr": f"Compilation failed: {compile_result.stderr}"}
            run_result = subprocess.run(
                ["java", "-cp", self.working_directory, "TempCode"],
                check=False,
                capture_output=True,
                text=True,
                timeout=self.max_execution_time,
            )
            return {
                "status": "success",
                "stdout": run_result.stdout,
                "stderr": run_result.stderr,
                "returncode": run_result.returncode,
            }
        except subprocess.TimeoutExpired:
            return {"status": "error", "stderr": "Execution timed out"}
        except Exception as e:
            return {"status": "error", "stderr": str(e)}
# Module-level singleton shared by all code-execution tools below
# (execute_code_multilang dispatches through this instance).
interpreter_instance = CodeInterpreter()
def execute_code_multilang(code_reference: str, language: str = "python", agent=None) -> str:
    """Execute code in multiple languages (Python, Bash, SQL, C, Java) and return results.

    Args:
        code_reference (str): The source code to execute, filename, or URL to download code from.
        language (str): The language of the code. Supported: "python", "bash", "sql", "c", "java".
        agent: Agent instance for file resolution (injected automatically).

    Returns:
        A string summarizing the execution results (stdout, stderr, errors, plots,
        dataframes if any), wrapped in the standard tool-response envelope.
    """
    from .file_utils import FileUtils

    # Turn the reference (inline code, filename, or URL) into actual source text.
    code_content, detected_language = FileUtils.resolve_code_input(code_reference, agent)

    # A language detected from the reference itself wins over the caller's hint.
    final_language = (detected_language or language).lower()

    supported_languages = ["python", "bash", "sql", "c", "java"]
    if final_language not in supported_languages:
        return FileUtils.create_tool_response(
            "execute_code_multilang",
            error=f"❌ Unsupported language: {final_language}. Supported languages are: {', '.join(supported_languages)}"
        )

    result = interpreter_instance.execute_code(code_content, language=final_language)

    parts = []
    if result["status"] == "success":
        parts.append(f"✅ Code executed successfully in **{final_language.upper()}**")

        if result.get("stdout"):
            parts.append("\n**Standard Output:**\n```\n" + result["stdout"].strip() + "\n```")
        if result.get("stderr"):
            parts.append(
                "\n**Standard Error (if any):**\n```\n"
                + result["stderr"].strip()
                + "\n```"
            )
        if result.get("result") is not None:
            parts.append(
                "\n**Execution Result:**\n```\n"
                + str(result["result"]).strip()
                + "\n```"
            )

        # Render a small preview for each DataFrame the code produced.
        for df_info in result.get("dataframes", []):
            parts.append(f"\n**DataFrame `{df_info['name']}` (Shape: {df_info['shape']})**")
            df_preview = pd.DataFrame(df_info["head"])
            parts.append("First 5 rows:\n```\n" + str(df_preview) + "\n```")

        if result.get("plots"):
            parts.append(
                f"\n**Generated {len(result['plots'])} plot(s)** (Image data returned separately)"
            )
    else:
        parts.append(f"❌ Code execution failed in **{final_language.upper()}**")
        if result.get("stderr"):
            parts.append("\n**Error Log:**\n```\n" + result["stderr"].strip() + "\n```")

    return FileUtils.create_tool_response("execute_code_multilang", result="\n".join(parts))
| # ========== MATH TOOLS ========== | |
def multiply(a: float, b: float) -> float:
    """
    Multiply two numbers.

    Args:
        a (float): First factor.
        b (float): Second factor.

    Returns:
        float: The product a * b.
    """
    product = a * b
    return product
def add(a: float, b: float) -> float:
    """
    Add two numbers.

    Args:
        a (float): First addend.
        b (float): Second addend.

    Returns:
        float: The sum a + b.
    """
    total = a + b
    return total
def subtract(a: float, b: float) -> float:
    """
    Subtract the second number from the first.

    Args:
        a (float): The minuend.
        b (float): The subtrahend.

    Returns:
        float: The difference a - b.
    """
    difference = a - b
    return difference
def divide(a: float, b: float) -> float:
    """
    Divide the first number by the second.

    Args:
        a (float): The numerator.
        b (float): The denominator.

    Returns:
        float: The quotient a / b.

    Raises:
        ValueError: If b is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero")
    quotient = a / b
    return quotient
def modulus(a: int, b: int) -> int:
    """
    Compute the remainder of integer division.

    Args:
        a (int): The dividend.
        b (int): The divisor.

    Returns:
        int: The remainder of a divided by b (Python semantics: sign follows b).

    Raises:
        ValueError: If b is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero")
    remainder = a % b
    return remainder
def power(a: float, b: float) -> float:
    """
    Raise the first number to the power of the second.

    Args:
        a (float): The base.
        b (float): The exponent.

    Returns:
        float: a raised to the power b.
    """
    return pow(a, b)
def square_root(a: float) -> float:
    """
    Compute the square root of a number.

    Args:
        a (float): The number to take the square root of.

    Returns:
        float or complex: The real square root for a >= 0; a complex number
            (via cmath) when a is negative.
    """
    if a < 0:
        # Negative input: fall back to the complex square root.
        return cmath.sqrt(a)
    return a ** 0.5
| # ========== WEB/SEARCH TOOLS ========== | |
def wiki_search(input: str) -> str:
    """
    Search Wikipedia and return up to SEARCH_LIMIT matching pages as formatted text.

    Args:
        input (str): The search query string for Wikipedia.

    Returns:
        str: JSON tool-response string with formatted Wikipedia results
            (source metadata plus page content), or an error payload.
    """
    try:
        if not WIKILOADER_AVAILABLE:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "wiki_search",
                "error": "Wikipedia search not available. Install with: pip install langchain-community"
            })
        docs = WikipediaLoader(query=input, load_max_docs=SEARCH_LIMIT).load()
        # One tagged section per document, separated by horizontal rules.
        sections = []
        for doc in docs:
            page = doc.metadata.get("page", "")
            sections.append(
                f'<Document source="{doc.metadata["source"]}" page="{page}"/>\n{doc.page_content}'
            )
        return json.dumps({
            "type": "tool_response",
            "tool_name": "wiki_search",
            "wiki_results": "\n\n---\n\n".join(sections)
        })
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "wiki_search",
            "error": f"Error in Wikipedia search: {str(e)}"
        })
def web_search(input: str) -> str:
    """
    Search the web using Tavily and return up to SEARCH_LIMIT results as formatted text.

    Tavily is a search API providing real-time web search results. Useful for:
    - Finding current information about recent events
    - Searching for specific facts, statistics, or data
    - Getting up-to-date information from various websites
    - Researching topics not covered in Wikipedia or academic papers

    Args:
        input (str): The search query string to search for on the web.

    Returns:
        str: JSON tool-response string with formatted results (source URLs and
            content snippets), or an error payload when Tavily is unavailable,
            unconfigured, or the search fails.
    """
    if not TAVILY_AVAILABLE:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "web_search",
            "error": "Tavily search not available. Install with: pip install langchain-tavily"
        })
    # os.environ.get never raises, so this check is safe outside the try block.
    if not os.environ.get("TAVILY_API_KEY"):
        return json.dumps({
            "type": "tool_response",
            "tool_name": "web_search",
            "error": "TAVILY_API_KEY not found in environment variables. Please set it in your .env file."
        })
    try:
        search_result = TavilySearch(max_results=SEARCH_LIMIT).invoke(input)
        # Tavily may return a plain string, a list of Documents, or something else.
        if isinstance(search_result, str):
            payload = search_result
        elif isinstance(search_result, list):
            payload = "\n\n---\n\n".join(
                f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}'
                for doc in search_result
            )
        else:
            payload = str(search_result)
        return json.dumps({
            "type": "tool_response",
            "tool_name": "web_search",
            "web_results": payload
        })
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "web_search",
            "error": f"Error in web search: {str(e)}"
        })
def arxiv_search(input: str) -> str:
    """
    Search Arxiv for academic papers and return up to SEARCH_LIMIT results.

    Args:
        input (str): The search query string for academic papers.

    Returns:
        str: JSON tool-response string with formatted Arxiv results (title,
            authors, publication date, and paper content), or an error payload.
    """
    try:
        if not ARXIVLOADER_AVAILABLE:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "arxiv_search",
                "error": "Arxiv search not available. Install with: pip install langchain-community"
            })
        if not PYMUPDF_AVAILABLE:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "arxiv_search",
                "error": "PyMuPDF package not found, please install it with pip install pymupdf"
            })
        docs = ArxivLoader(query=input, load_max_docs=SEARCH_LIMIT).load()
        # One tagged section per paper, separated by horizontal rules.
        sections = []
        for doc in docs:
            meta = doc.metadata
            header = (
                f'<Document title="{meta.get("Title", "Unknown")}" '
                f'authors="{meta.get("Authors", "Unknown")}" '
                f'published="{meta.get("Published", "Unknown")}"/>'
            )
            sections.append(f"{header}\n{doc.page_content}")
        return json.dumps({
            "type": "tool_response",
            "tool_name": "arxiv_search",
            "arxiv_results": "\n\n---\n\n".join(sections)
        })
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "arxiv_search",
            "error": f"Error in Arxiv search: {str(e)}"
        })
| # ========== FILE/DATA TOOLS ========== | |
def read_text_based_file(
    file_reference: str,
    read_html_as_markdown: bool = True,
    extract_images: bool = False,
    agent=None,
) -> str:
    """
    Read text-based files and return content as text.
    This is the general-purpose text file reader for most formats.
    Use this tool when you need file content - for specialized analysis, use dedicated tools.

    Supported file types:
    - Text files: .txt, .md, .log, .rtf
    - Code files: .py, .js, .ts, .html, .htm, .css, .sql, .java, .cpp, .c, .php, .rb, .go
    - Configuration files: .ini, .cfg, .conf, .env, .properties, .yaml, .yml
    - Structured text: .json, .xml, .svg
    - Documentation: .md, .rst, .tex
    - Office documents: .docx, .xlsx, .pptx (converted to Markdown for clean text extraction)
    - PDF files: .pdf (extracts text content as Markdown)

    HTML-specific options:
    - read_html_as_markdown (bool, default=True): Converts HTML to Markdown to save tokens
      and improve readability. Set to False only when you need the raw HTML structure
      (e.g., extracting specific tags, attributes, CSS, or JavaScript).

    Image extraction options:
    - extract_images (bool, default=False): For PDF and Office documents. If True,
      extracts embedded images, registers them with the agent's file registry, and
      includes their names in the response so they can be referenced in future turns.

    XLSX/Excel: use this tool for simple text extraction (content as Markdown tables);
    use analyze_excel_file when you need statistics, pandas operations, or charts.

    The tool automatically:
    - Detects file encoding and handles multiple encodings (UTF-8, Latin-1, CP1252, ISO-8859-1)
    - Resolves filenames to full file paths
    - Downloads files from URLs
    - Provides file metadata (name, size, encoding) in results

    Args:
        file_reference (str): Original filename from user upload OR URL to download
        read_html_as_markdown (bool): For HTML files. If True (default), converts HTML to
            Markdown for token efficiency and readability. Set False only when raw HTML
            structure is needed.
        extract_images (bool): For PDF/Office files. If True, extracts embedded images
            along with text. Default False (backward compatible).
        agent: Agent instance for file resolution (injected automatically)

    Returns:
        str: The text content of the file with metadata, or an error message if reading fails
    """
    from .file_utils import FileUtils
    from .local_path_text import read_local_path_to_plain_text

    ref = (file_reference or "").strip()
    file_path = FileUtils.resolve_file_reference(ref, agent)
    if not file_path:
        return FileUtils.create_tool_response("read_text_based_file", error=f"File not found: {ref}")
    file_info = FileUtils.get_file_info(file_path)
    if not file_info.exists:
        return FileUtils.create_tool_response("read_text_based_file", error=file_info.error)
    content, read_err, enc, image_paths, markdown_path = read_local_path_to_plain_text(
        file_path, read_html_as_markdown=read_html_as_markdown, extract_images=extract_images, _file_info=file_info
    )
    if read_err:
        return FileUtils.create_tool_response(
            "read_text_based_file",
            error=read_err,
            file_info=FileUtils.file_info_for_tool_response(file_info, ref),
        )
    display_name = ref
    size_str = FileUtils.format_file_size(file_info.size)
    extl = (file_info.extension or "").lower()
    # The header tells the model how the content was produced
    # (Markdown conversion, raw HTML, or a non-UTF-8 decode).
    if extl == ".html" and read_html_as_markdown:
        header = f"File: {display_name} ({size_str}, converted to Markdown)"
    elif extl == ".html":
        header = f"File: {display_name} ({size_str}, raw HTML)"
    elif enc and enc != "utf-8":
        header = f"File: {display_name} ({size_str}, {enc} encoding)"
    else:
        header = f"File: {display_name} ({size_str})"
    result_text = f"{header}\n\nContent:\n{content}"
    extra = {}
    if image_paths and agent is not None:
        registered_names = []
        base_name = Path(ref).stem
        for idx, abs_path in enumerate(image_paths):
            try:
                logical_name = f"{base_name}_image_{idx + 1}{Path(abs_path).suffix}"
                agent.register_file(logical_name, abs_path)
                registered_names.append(logical_name)
                logger.debug("Registered extracted image: %s -> %s", logical_name, abs_path)
            except Exception as reg_err:
                logger.warning("Failed to register image %s: %s", abs_path, reg_err)
        # CONSISTENCY FIX: if every registration failed, fall back to the raw
        # paths instead of silently dropping the images from the response.
        # This matches the markdown fallback behavior below.
        extra["image_paths"] = registered_names if registered_names else image_paths
    elif image_paths:
        # No agent available: expose the raw extracted paths directly.
        extra["image_paths"] = image_paths
    if markdown_path:
        if agent is not None:
            try:
                md_name = f"{Path(ref).stem}_extracted.md"
                agent.register_file(md_name, markdown_path)
                extra["markdown_path"] = md_name
                logger.debug("Registered markdown: %s -> %s", md_name, markdown_path)
            except Exception as reg_err:
                logger.warning("Failed to register markdown: %s", reg_err)
                # Registration failed: fall back to the raw temp-file path.
                extra["markdown_path"] = markdown_path
        else:
            extra["markdown_path"] = markdown_path
    return FileUtils.create_tool_response(
        "read_text_based_file",
        result=result_text,
        file_info=FileUtils.file_info_for_tool_response(file_info, ref),
        extra=extra if extra else None,
    )
| # ========== PANDAS QUERY/PIPELINE HELPERS ========== | |
| def _safe_to_markdown(df: pd.DataFrame, max_rows: int = 10, max_cols: int = 20) -> str: | |
| preview_df = df.head(max_rows) | |
| if max_cols is not None: | |
| preview_df = preview_df.iloc[:, :max_cols] | |
| try: | |
| return preview_df.to_markdown(index=False) | |
| except Exception: | |
| return preview_df.to_string(index=False) | |
| def _dataframe_schema(df: pd.DataFrame) -> dict[str, str]: | |
| return {str(col): str(dtype) for col, dtype in df.dtypes.items()} | |
| def _truncate_records(df: pd.DataFrame, max_rows: int = 100, max_cols: int = 50, max_cell_chars: int = 500) -> list[dict[str, Any]]: | |
| limited = df.head(max_rows) | |
| if max_cols is not None: | |
| limited = limited.iloc[:, :max_cols] | |
| def _truncate_val(v: Any) -> Any: | |
| try: | |
| s = str(v) | |
| except Exception: | |
| return v | |
| if len(s) > max_cell_chars: | |
| return s[: max_cell_chars - 1] + "…" | |
| return v | |
| return [{k: _truncate_val(v) for k, v in row.items()} for row in limited.to_dict(orient="records")] | |
| _ALLOWED_OPS: dict[str, Literal["df_method", "special"]] = { | |
| "query": "df_method", | |
| "assign": "df_method", | |
| "rename": "df_method", | |
| "drop": "df_method", | |
| "dropna": "df_method", | |
| "fillna": "df_method", | |
| "astype": "df_method", | |
| "sort_values": "df_method", | |
| "head": "df_method", | |
| "tail": "df_method", | |
| "sample": "df_method", | |
| "value_counts": "df_method", | |
| "nlargest": "df_method", | |
| "nsmallest": "df_method", | |
| "reset_index": "df_method", | |
| "set_index": "df_method", | |
| "pivot_table": "df_method", | |
| "melt": "df_method", | |
| "stack": "df_method", | |
| "unstack": "df_method", | |
| "groupby": "special", | |
| } | |
| def _coerce_tabular(obj: Any, step_name: str) -> pd.DataFrame: | |
| if isinstance(obj, pd.DataFrame): | |
| return obj | |
| if isinstance(obj, pd.Series): | |
| return obj.to_frame(name=step_name or "value").reset_index() | |
| return pd.DataFrame(obj) | |
| def _dispatch_pipeline(df: pd.DataFrame, steps: list[dict[str, Any]]) -> pd.DataFrame: | |
| current = df | |
| for i, step in enumerate(steps): | |
| if not isinstance(step, dict): | |
| raise ValueError(f"Pipeline step {i} must be an object") | |
| op = step.get("op") | |
| if not isinstance(op, str) or op.startswith("__"): | |
| raise ValueError(f"Invalid op at step {i}") | |
| kind = _ALLOWED_OPS.get(op) | |
| if kind is None: | |
| raise ValueError(f"Op '{op}' not allowed") | |
| if kind == "df_method": | |
| method = getattr(current, op, None) | |
| if method is None or not callable(method): | |
| raise ValueError(f"Method '{op}' not available on DataFrame") | |
| kwargs = {k: v for k, v in step.items() if k != "op"} | |
| result = method(**kwargs) if kwargs else method() | |
| current = _coerce_tabular(result, op) | |
| else: | |
| if op == "groupby": | |
| by = step.get("by") | |
| gb = current.groupby(by=by, dropna=False, observed=False) | |
| if "agg" in step: | |
| result = gb.agg(step.get("agg")) | |
| current = _coerce_tabular(result, op) | |
| elif step.get("size") is True: | |
| current = gb.size().reset_index(name="size") | |
| else: | |
| raise ValueError("groupby requires 'agg' or size=true") | |
| else: | |
| raise ValueError(f"Unsupported special op: {op}") | |
| return current | |
def _apply_pandas_query(
    df: pd.DataFrame,
    query: str | None,
    preview_opts: dict[str, Any] | None = None,
    plot_opts: dict[str, Any] | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Apply an optional query/pipeline to *df* and build a preview payload.

    The ``query`` string is interpreted flexibly:
    - JSON object: may contain ``pipeline`` (step list for
      ``_dispatch_pipeline``), ``expr`` (pandas ``DataFrame.query``
      expression), and optional inline ``plot``/``preview`` settings.
    - JSON array: treated directly as a pipeline step list.
    - ``"expr:<expression>"``: explicit pandas query expression.
    - Anything else: passed to ``DataFrame.query`` as-is.

    Args:
        df: Input DataFrame (transformations build new frames; *df* itself
            is not mutated by this function).
        query: Optional query/pipeline description (see above). Blank or
            non-string values are ignored.
        preview_opts: Optional dict with ``rows``, ``cols``, ``include_schema``.
        plot_opts: Optional dict with ``kind``, ``x``, ``y`` for a chart.

    Returns:
        Tuple of (transformed DataFrame, payload dict with shapes, markdown
        and record previews, and optional schema/describe summary/plots).

    Raises:
        ValueError: If the query or pipeline cannot be applied.
    """
    preview = preview_opts or {"rows": 10, "cols": 20, "include_schema": True}
    plots: list[str] = []
    original_shape = tuple(df.shape)
    transformed = df
    if query and isinstance(query, str) and query.strip():
        q = query.strip()
        try:
            if q.startswith("{") and q.endswith("}"):
                cfg = json.loads(q)
                if isinstance(cfg.get("pipeline"), list):
                    transformed = _dispatch_pipeline(df, cfg["pipeline"])  # type: ignore[arg-type]
                elif isinstance(cfg.get("expr"), str):
                    transformed = df.query(cfg["expr"])  # type: ignore[arg-type]
                # A JSON config may override plot/preview settings inline.
                plot_opts = cfg.get("plot") or plot_opts
                preview = cfg.get("preview") or preview
            elif q.startswith("[") and q.endswith("]"):
                steps = json.loads(q)
                transformed = _dispatch_pipeline(df, steps)
            elif q.lower().startswith("expr:"):
                expr = q.split(":", 1)[1].strip()
                transformed = df.query(expr)
            else:
                transformed = df.query(q)
        except Exception as e:
            # Chain the original exception so callers can see the root cause.
            raise ValueError(f"Failed to apply query: {e}") from e
    if plot_opts and MATPLOTLIB_AVAILABLE and plt is not None:
        # Plotting is best-effort: any failure silently skips the chart.
        try:
            kind = plot_opts.get("kind", "bar")
            x = plot_opts.get("x")
            y = plot_opts.get("y")
            fig = plt.figure()
            ax = fig.gca()
            data = transformed
            if x is None and y is None and kind in ("bar", "barh"):
                # No axes specified: chart the value counts of the first
                # non-numeric column (or the first column as a fallback).
                non_numeric = [c for c in data.columns if not pd.api.types.is_numeric_dtype(data[c])]
                target_col = non_numeric[0] if non_numeric else data.columns[0]
                vc = data[target_col].value_counts().head(20)
                vc.plot(kind=kind, ax=ax)
            else:
                data.plot(kind=kind, x=x, y=y, ax=ax)
            plot_path = os.path.join(tempfile.gettempdir(), f"df_plot_{uuid.uuid4().hex}.png")
            fig.savefig(plot_path, bbox_inches="tight")
            plt.close(fig)
            plots.append(encode_image(plot_path))
        except Exception:
            pass
    rows = int(preview.get("rows", 10))
    cols = int(preview.get("cols", 20))
    include_schema = bool(preview.get("include_schema", True))
    table_markdown = _safe_to_markdown(transformed, rows, cols)
    table_records = _truncate_records(transformed, max_rows=min(rows, 1000), max_cols=min(cols, 100))
    payload: dict[str, Any] = {
        "original_shape": original_shape,
        "shape": tuple(transformed.shape),
        "table_markdown": table_markdown,
        "table_records": table_records,
    }
    if include_schema:
        payload["schema"] = _dataframe_schema(transformed)
    try:
        # Only summarize reasonably small frames to keep responses bounded.
        if transformed.shape[0] <= 5000 and transformed.shape[1] <= 50:
            # BUG FIX: the `datetime_is_numeric` keyword was removed in
            # pandas 2.0, so the previous call raised TypeError and the
            # summary was silently dropped on modern pandas. The plain
            # describe(include="all") call works on all supported versions.
            payload["describe_summary"] = str(transformed.describe(include="all"))
    except Exception:
        pass
    if plots:
        payload["plots"] = plots
    return transformed, payload
def analyze_csv_file(file_reference: str, query: str, agent=None) -> str:
    """
    Analyze CSV files and return summary statistics and column information.

    The file is loaded with pandas' default CSV parsing (comma-separated).
    The tool automatically:
    - Resolves filenames to full file paths via agent's file registry
    - Downloads files from URLs automatically
    - Provides comprehensive analysis including data types, statistics, and column information

    Args:
        file_reference (str): Original filename from user upload OR URL to download
        query (str): Optional transformation applied to the loaded data before the
            preview is built. Accepts a pandas ``DataFrame.query`` expression, an
            ``"expr:<expression>"`` string, or a JSON object/array describing a
            pipeline of whitelisted pandas operations. Blank strings are ignored.
        agent: Agent instance for file resolution (injected automatically)

    Returns:
        str: Summary statistics and column information, or an error message if analysis fails.
    """
    from .file_utils import FileUtils
    # Resolve file reference (filename or URL) to full path
    file_path = FileUtils.resolve_file_reference(file_reference, agent)
    if not file_path:
        return FileUtils.create_tool_response("analyze_csv_file", error=f"File not found: {file_reference}")
    # Check file exists using utilities with Pydantic validation
    file_info = FileUtils.get_file_info(file_path)
    if not file_info.exists:
        return FileUtils.create_tool_response("analyze_csv_file", error=file_info.error)
    try:
        df = pd.read_csv(file_path)
        # Apply the optional query/pipeline; None skips transformation.
        _, payload = _apply_pandas_query(
            df,
            query=query if isinstance(query, str) and query.strip() else None,
            preview_opts=None,
            plot_opts=None,
        )
        header = (
            f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
            f"File: {file_info.name} ({FileUtils.format_file_size(file_info.size)})\n"
        )
        result_parts = [header]
        if payload.get("table_markdown"):
            result_parts.append("Preview:\n" + payload["table_markdown"])
        if payload.get("describe_summary"):
            result_parts.append("\n\nSummary statistics:\n" + str(payload["describe_summary"]))
        result_text = "\n".join(result_parts)
        return FileUtils.create_tool_response(
            "analyze_csv_file",
            result=result_text,
            file_info=file_info,
            extra=payload,
        )
    except Exception as e:
        return FileUtils.create_tool_response("analyze_csv_file", error=f"Error analyzing CSV file: {str(e)}")
def analyze_excel_file(file_reference: str, query: str, agent=None) -> str:
    """
    Analyze Excel files and return summary statistics and column information.

    This tool can process Excel files (.xlsx, .xls). Note: only the first
    sheet is loaded (pandas ``read_excel`` default).

    When to use this tool vs read_text_based_file?
    - Use analyze_excel_file for: data analysis, statistics, pandas operations, charts,
      querying/transforming data, understanding numerical patterns, describe() summaries.
    - Use read_text_based_file for: simple text extraction, reading cell content as text,
      understanding document structure when you don't need analytics.

    The tool automatically:
    - Resolves filenames to full file paths via agent's file registry
    - Downloads files from URLs automatically
    - Provides data types, statistics, and column information

    Args:
        file_reference (str): Original filename from user upload OR URL to download
        query (str): Optional transformation applied to the loaded data before the
            preview is built. Accepts a pandas ``DataFrame.query`` expression, an
            ``"expr:<expression>"`` string, or a JSON object/array describing a
            pipeline of whitelisted pandas operations. Blank strings are ignored.
        agent: Agent instance for file resolution (injected automatically)

    Returns:
        str: Summary statistics and column information, or an error message if analysis fails.
    """
    from .file_utils import FileUtils
    # Resolve file reference (filename or URL) to full path
    file_path = FileUtils.resolve_file_reference(file_reference, agent)
    if not file_path:
        return FileUtils.create_tool_response("analyze_excel_file", error=f"File not found: {file_reference}")
    # Check file exists using utilities with Pydantic validation
    file_info = FileUtils.get_file_info(file_path)
    if not file_info.exists:
        return FileUtils.create_tool_response("analyze_excel_file", error=file_info.error)
    try:
        df = pd.read_excel(file_path)
        # Apply the optional query/pipeline; None skips transformation.
        _, payload = _apply_pandas_query(
            df,
            query=query if isinstance(query, str) and query.strip() else None,
            preview_opts=None,
            plot_opts=None,
        )
        header = (
            f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
            f"File: {file_info.name} ({FileUtils.format_file_size(file_info.size)})\n"
        )
        result_parts = [header]
        if payload.get("table_markdown"):
            result_parts.append("Preview:\n" + payload["table_markdown"])
        if payload.get("describe_summary"):
            result_parts.append("\n\nSummary statistics:\n" + str(payload["describe_summary"]))
        result_text = "\n".join(result_parts)
        return FileUtils.create_tool_response(
            "analyze_excel_file",
            result=result_text,
            file_info=file_info,
            extra=payload,
        )
    except Exception as e:
        # Enhanced error reporting: print columns and head if possible
        # (the analysis may have failed even though the file itself reads fine).
        try:
            df = pd.read_excel(file_path)
            columns = list(df.columns)
            head = df.head().to_dict("records")
            error_details = f"Error analyzing Excel file: {str(e)}\nColumns: {columns}\nHead: {head}"
        except Exception as inner_e:
            error_details = f"Error analyzing Excel file: {str(e)}\nAdditionally, failed to read columns/head: {str(inner_e)}"
        return FileUtils.create_tool_response("analyze_excel_file", error=error_details)
| # ========== IMAGE ANALYSIS/GENERATION TOOLS ========== | |
def analyze_image(file_reference: str, agent=None) -> str:
    """
    LEGACY: Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).

    ⚠️ This is a primitive metadata parser that only extracts technical properties.
    For AI-powered image understanding, use analyze_image_ai() instead.

    This tool provides:
    - Image dimensions and format
    - Basic color analysis (average RGB, brightness, dominant color)
    - Thumbnail generation

    For semantic understanding (what's in the image, OCR, object detection, etc.),
    use analyze_image_ai() which uses vision-language models.

    The tool automatically:
    - Resolves filenames to full file paths via agent's file registry
    - Downloads files from URLs automatically

    Args:
        file_reference (str): Original filename from user upload OR URL to download
        agent: Agent instance for file resolution (injected automatically)

    Returns:
        str: JSON string with analysis results including dimensions, mode, color_analysis, and thumbnail.
    """
    from .file_utils import FileUtils
    try:
        # Resolve file reference (filename or URL) to full path
        file_path = FileUtils.resolve_file_reference(file_reference, agent)
        if not file_path:
            return FileUtils.create_tool_response("analyze_image", error=f"File not found: {file_reference}")
        # Open image from file path
        img = Image.open(file_path)
        width, height = img.size
        mode = img.mode
        if mode in ("RGB", "RGBA"):
            arr = np.array(img)
            # Per-channel means over all pixels (4 entries for RGBA).
            avg_colors = arr.mean(axis=(0, 1))
            dominant = ["Red", "Green", "Blue"][int(np.argmax(avg_colors[:3]))]
            # BUG FIX: restrict brightness to the RGB channels. Previously
            # the alpha channel was averaged in for RGBA images, which
            # inflated the brightness of mostly-opaque images.
            brightness = float(avg_colors[:3].mean())
            color_analysis = {
                "average_rgb": avg_colors.tolist(),  # includes alpha for RGBA
                "brightness": brightness,
                "dominant_color": dominant,
            }
        else:
            color_analysis = {"note": f"No color analysis for mode {mode}"}
        # Build a small preview image and embed it as base64.
        thumbnail = img.copy()
        thumbnail.thumbnail((100, 100))
        thumb_path = save_image(thumbnail, "thumbnails")
        thumbnail_base64 = encode_image(thumb_path)
        result = {
            "dimensions": (width, height),
            "mode": mode,
            "color_analysis": color_analysis,
            "thumbnail": thumbnail_base64,
        }
        return FileUtils.create_tool_response("analyze_image", result=json.dumps(result))
    except Exception as e:
        return FileUtils.create_tool_response("analyze_image", error=str(e))
def analyze_image_ai(
    file_reference: str,
    prompt: str,
    system_prompt: str | None = None,
    agent=None
) -> str:
    """
    AI-powered image analysis using vision-language models.

    This tool uses vision-language models to understand image content semantically:
    - Describe what's in the image
    - Answer questions about the image
    - Extract text (OCR)
    - Identify objects, people, scenes
    - Analyze charts, graphs, diagrams
    - Read documents and forms

    For basic metadata (dimensions, colors), use the legacy analyze_image() instead.

    Args:
        file_reference (str): Uploaded image filename, or a URL to an image.
        prompt (str): Question or instruction about the image (e.g., "What's in this image?")
        system_prompt (str | None, optional): Accepted for API compatibility but
            currently NOT forwarded to the vision model — it is never passed to
            VisionInput or the manager below. TODO(review): wire it through or drop it.
        agent: Agent instance for file resolution (injected automatically)

    Returns:
        str: JSON string with AI analysis result or error message
    """
    from .file_utils import FileUtils
    try:
        from agent_ng.vision_input import VisionInput
        from agent_ng.vision_tool_manager import VisionToolManager
        # Direct URLs bypass the agent's file registry entirely.
        lowered_ref = file_reference.strip().lower()
        is_direct_url = lowered_ref.startswith("http://") or lowered_ref.startswith(
            "https://"
        )
        if is_direct_url:
            vision_input = VisionInput(
                prompt=prompt,
                image_url=file_reference.strip(),
            )
        else:
            file_path = FileUtils.resolve_file_reference(file_reference, agent)
            if not file_path:
                return FileUtils.create_tool_response(
                    "analyze_image_ai",
                    error=f"File not found: {file_reference}",
                )
            # PDFs are rejected with guidance rather than silently failing
            # inside the vision backend.
            if file_path.lower().endswith(".pdf"):
                return FileUtils.create_tool_response(
                    "analyze_image_ai",
                    error="PDF files cannot be analyzed directly as images. "
                    "Most vision models don't support PDFs natively. "
                    "To analyze visual PDF content:\n"
                    "1. First: read_text_based_file(file_reference='DOC.pdf', extract_images=True)\n"
                    "   - This extracts images from PDF pages\n"
                    "2. Then: analyze the extracted images individually\n"
                    "   - Use analyze_image_ai for each extracted image path",
                )
            vision_input = VisionInput(prompt=prompt, image_path=file_path)
        # Initialize VisionToolManager
        import os
        # Presumably consumed by VisionToolManager to skip a startup pricing
        # fetch — TODO(review): confirm against agent_ng.vision_tool_manager.
        os.environ["OPENROUTER_FETCH_PRICING_AT_STARTUP"] = "false"
        manager = VisionToolManager()
        # Analyze image
        result = manager.analyze(vision_input)
        # Return result
        return FileUtils.create_tool_response(
            "analyze_image_ai",
            result=result,
            extra={
                "file": file_reference,
                "model_used": manager.vl_model
            }
        )
    except Exception as e:
        return FileUtils.create_tool_response(
            "analyze_image_ai",
            error=f"Analysis failed: {str(e)}"
        )
class TransformImageParams(BaseModel):
    """Parameter schema for image transformations.

    Only the fields relevant to the chosen operation are meaningful; the
    rest may be left as None. (NOTE(review): `transform_image` below takes
    a plain dict — confirm whether this model is used for schema exposure.)
    """
    width: int | None = Field(None, description="New width for resize operation")
    height: int | None = Field(None, description="New height for resize operation")
    angle: int | None = Field(None, description="Rotation angle in degrees")
    direction: Literal["horizontal", "vertical"] | None = Field(None, description="Flip direction")
    radius: float | None = Field(None, description="Blur radius")
    factor: float | None = Field(None, description="Enhancement factor for brightness/contrast")
def transform_image(image_base64: str, operation: str, params: dict[str, Any] | None = None) -> str:
    """
    Transform an image using various operations like resize, rotate, filter, etc.

    Supported operations: "resize", "rotate", "flip", "blur", "sharpen",
    "brightness", "contrast".

    Args:
        image_base64 (str): The base64-encoded string of the image to transform.
        operation (str): The transformation operation to apply.
        params (Dict[str, Any], optional): Parameters for the transformation.

    Returns:
        str: JSON string with the transformed image as base64 or error message.
    """
    def _respond(payload: dict[str, Any]) -> str:
        # All responses share the same envelope shape.
        return json.dumps(
            {"type": "tool_response", "tool_name": "transform_image", **payload},
            indent=2,
        )

    try:
        img = decode_image(image_base64)
        opts = params or {}
        if operation == "resize":
            new_size = (opts.get("width", img.width), opts.get("height", img.height))
            img = img.resize(new_size, Image.Resampling.LANCZOS)
        elif operation == "rotate":
            img = img.rotate(opts.get("angle", 0), expand=True)
        elif operation == "flip":
            if opts.get("direction", "horizontal") == "horizontal":
                img = img.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
            else:
                img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM)
        elif operation == "blur":
            img = img.filter(ImageFilter.GaussianBlur(radius=opts.get("radius", 2)))
        elif operation == "sharpen":
            img = img.filter(ImageFilter.UnsharpMask(radius=2, percent=150, threshold=3))
        elif operation == "brightness":
            img = ImageEnhance.Brightness(img).enhance(opts.get("factor", 1.0))
        elif operation == "contrast":
            img = ImageEnhance.Contrast(img).enhance(opts.get("factor", 1.0))
        else:
            return _respond({"error": f"Unsupported operation: {operation}"})
        result_base64 = encode_image(save_image(img))
        return _respond({"transformed_image": result_base64})
    except Exception as e:
        return _respond({"error": str(e)})
class DrawOnImageParams(BaseModel):
    """Parameters for `draw_on_image`.

    Only the fields relevant to the chosen drawing_type are read
    ("text", "rectangle", "circle", or "line"); the rest may be None.
    """
    text: str | None = Field(None, description="Text to draw")
    position: list[int] | None = Field(None, description="Text position [x, y]")
    color: str | None = Field(None, description="Color name (e.g., 'red', 'blue') or RGB string (e.g., '255,0,0')")
    size: int | None = Field(None, description="Font size for text")
    coords: list[int] | None = Field(None, description="Rectangle coordinates [x1, y1, x2, y2]")
    center: list[int] | None = Field(None, description="Circle center [x, y]")
    radius: int | None = Field(None, description="Circle radius")
    start: list[int] | None = Field(None, description="Line start [x, y]")
    end: list[int] | None = Field(None, description="Line end [x, y]")
    width: int | None = Field(None, description="Stroke width")
def draw_on_image(image_base64: str, drawing_type: str, params: DrawOnImageParams) -> str:
    """
    Draw shapes, text, or other elements on an image.

    Supported drawing types: "text", "rectangle", "circle", "line". Each
    type has its own default color (black, red, blue, green respectively)
    used when no color is supplied.

    Args:
        image_base64 (str): The base64-encoded string of the image to draw on.
        drawing_type (str): The type of drawing to perform.
        params (DrawOnImageParams): Parameters for the drawing operation.

    Returns:
        str: JSON string with the modified image as base64 or error message.
    """
    try:
        img = decode_image(image_base64)
        draw = ImageDraw.Draw(img)

        def parse_color(color_str):
            """Parse 'R,G,B' into a tuple, pass through names, None if unset."""
            if not color_str:
                # BUG FIX: previously this returned "black", which made the
                # per-shape defaults below ("red", "blue", "green") after
                # `or` unreachable. Returning None lets them apply.
                return None
            # Check if it's RGB values as comma-separated string
            if "," in color_str and color_str.replace(",", "").replace(" ", "").isdigit():
                try:
                    rgb_values = [int(x.strip()) for x in color_str.split(",")]
                    if len(rgb_values) == 3 and all(0 <= v <= 255 for v in rgb_values):
                        return tuple(rgb_values)
                except ValueError:
                    pass
            # Return as color name
            return color_str

        if drawing_type == "text":
            text = params.text or ""
            position = params.position or [10, 10]
            color = parse_color(params.color) or "black"
            size = params.size or 20
            try:
                font = ImageFont.truetype("arial.ttf", size)
            except Exception:
                # BUG FIX: was a bare `except:` which also swallowed
                # SystemExit/KeyboardInterrupt. Missing fonts raise OSError.
                font = ImageFont.load_default()
            draw.text(tuple(position), text, fill=color, font=font)
        elif drawing_type == "rectangle":
            coords = params.coords or [10, 10, 100, 100]
            color = parse_color(params.color) or "red"
            width = params.width or 2
            draw.rectangle(coords, outline=color, width=width)
        elif drawing_type == "circle":
            center = params.center or [50, 50]
            radius = params.radius or 30
            color = parse_color(params.color) or "blue"
            width = params.width or 2
            bbox = [center[0] - radius, center[1] - radius,
                    center[0] + radius, center[1] + radius]
            draw.ellipse(bbox, outline=color, width=width)
        elif drawing_type == "line":
            start = params.start or [10, 10]
            end = params.end or [100, 100]
            color = parse_color(params.color) or "green"
            width = params.width or 2
            draw.line([tuple(start), tuple(end)], fill=color, width=width)
        else:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "draw_on_image",
                "error": f"Unsupported drawing type: {drawing_type}"
            }, indent=2)
        result_path = save_image(img)
        result_base64 = encode_image(result_path)
        return json.dumps({
            "type": "tool_response",
            "tool_name": "draw_on_image",
            "modified_image": result_base64
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "draw_on_image",
            "error": str(e)
        }, indent=2)
| # ------------------------------------------------------------------------- # | |
| # AI image generation (OpenRouter) # | |
| # ------------------------------------------------------------------------- # | |
# Root for session-isolated image outputs. Patched in tests. Matches the
# convention established by ``tools/file_utils.py::save_base64_to_file``.
# NOTE(review): usage is outside this chunk — presumably the base directory
# under which per-session generated images are written; confirm at call site.
_IMAGE_OUTPUT_ROOT = ".gradio/sessions"
def _build_generate_ai_image_description() -> str:
    """Compose the LLM-facing tool description.

    Appends the active model's prompt-style hint (never the slug or
    vendor) when available, so the calling LLM can adapt its prompt
    craft to the configured backend.
    """
    body = (
        "Create a new image from a text description.\n\n"
        "Use this when the user asks for an illustration, icon, "
        "diagram, business infographic, logo, banner, social-media "
        "graphic, or any other visual that does not yet exist. The "
        "generated image comes back as a chat attachment reference "
        "that you can pass to other tools (for example, to attach the "
        "image to a record, analyze it, or transform it).\n\n"
        "`aspect_ratio` lets you request a specific shape (for example "
        "`16:9` for a banner, `9:16` for a mobile story, `1:1` for a "
        "square icon). `image_size` lets you request a resolution tier "
        "(`1K` for small/fast, `2K` for sharper details, `4K` for "
        "print-quality). Only set these when the composition really "
        "depends on shape or size.\n\n"
        "Returns a structured result containing the image reference "
        "and the generation cost. If the call fails, the result "
        "contains an `error` message explaining why — simplify the "
        "prompt and try again."
    )
    # The hint accessor is None when agent_ng.image_models failed to import.
    if get_default_prompt_style_hint is not None:
        hint = get_default_prompt_style_hint()
        if hint:
            return f"{body}\n\nActive image generator profile: {hint}"
    return body
_GENERATE_AI_IMAGE_DESCRIPTION = _build_generate_ai_image_description()
class GenerateAIImageParams(BaseModel):
    """Parameters for the `generate_ai_image` tool.

    Deliberately does not expose model selection to the LLM — the active
    model is an operations-level decision controlled by the
    ``IMAGE_GEN_DEFAULT_MODEL`` environment variable (see
    :mod:`agent_ng.image_models`).
    """
    # arbitrary_types_allowed: the injected `agent` field is an arbitrary
    # runtime object, not a pydantic-validatable type.
    model_config = ConfigDict(arbitrary_types_allowed=True)
    prompt: str = Field(
        ...,
        description=(
            "Natural-language description of the image to create. Spell "
            "out any text that must appear in the picture exactly as it "
            "should be rendered."
        ),
    )
    aspect_ratio: str | None = Field(
        None,
        description=(
            "Optional desired shape of the image, written as 'W:H' "
            "(for example '1:1' for a square icon, '16:9' for a banner, "
            "'9:16' for a mobile story)."
        ),
    )
    image_size: Literal["1K", "2K", "4K"] | None = Field(
        None,
        description=(
            "Optional resolution tier: '1K' for small/fast, '2K' for "
            "sharper details, '4K' for print-quality."
        ),
    )
    # InjectedToolArg marks this as runtime-supplied rather than LLM-supplied.
    agent: Annotated[Any | None, InjectedToolArg] = Field(
        default=None,
        description="Runtime-injected; not supplied by the LLM.",
    )
def generate_ai_image(
    prompt: str,
    aspect_ratio: str | None = None,
    image_size: str | None = None,
    agent: Annotated[Any | None, InjectedToolArg] = None,
) -> dict[str, Any]:
    """Create a new image from a text description.

    The deployment chooses the generator model (via
    ``IMAGE_GEN_DEFAULT_MODEL``); the calling LLM controls only the
    prompt, aspect ratio and size.

    Returns a structured dict. On success it carries a
    ``file_reference`` (an attachment name other chat tools can use),
    the ``cost`` in USD, the output ``mime_type`` and ``size_bytes``.
    On failure it carries an ``error`` string and no attachment.
    """
    def _failure(message: str, cost: Any = None) -> dict[str, Any]:
        # Uniform shape for every error return from this tool.
        return {
            "success": False,
            "error": message,
            "file_reference": None,
            "cost": cost,
        }

    if ImageEngine is None:
        return _failure("Image generation engine is not available.")
    try:
        engine = ImageEngine()
    except ValueError as exc:
        return _failure(str(exc))

    result = engine.generate(
        prompt=prompt,
        aspect_ratio=aspect_ratio,
        image_size=image_size,
    )
    if not result.success or result.image_bytes is None:
        return _failure(result.error or "image generation failed", result.cost)

    # Session-id resolution: the injected agent wins; the ContextVar
    # accessor is the fallback when no agent was provided.
    session_id: str | None = getattr(agent, "session_id", None) if agent else None
    if not session_id and get_current_session_id is not None:
        session_id = get_current_session_id()

    # Persist the bytes — session directory when the session is known,
    # otherwise a mkstemp temp file.
    display_name = _make_display_name(_extension_for_mime(result.mime_type))
    try:
        disk_path = _write_image_bytes(
            image_bytes=result.image_bytes,
            display_name=display_name,
            session_id=session_id,
        )
    except OSError as exc:
        return _failure(f"Failed to write image bytes: {exc}", result.cost)

    register = getattr(agent, "register_file", None) if agent is not None else None
    if callable(register):
        try:
            register(display_name, disk_path)
        except Exception as exc:
            logger.warning("register_file failed for %s: %s", display_name, exc)
            # Best-effort removal of the now-orphaned file.
            try:
                os.unlink(disk_path)
            except OSError as oe:
                logger.debug("temp cleanup after register failure: %s", oe)
            return _failure(f"register_file failed: {exc}", result.cost)
        file_reference = display_name
    else:
        # Without an agent registry, hand back the raw absolute path.
        file_reference = os.path.abspath(disk_path)

    return {
        "success": True,
        "error": None,
        "file_reference": file_reference,
        "cost": result.cost,
        "mime_type": result.mime_type,
        "size_bytes": len(result.image_bytes),
    }
| # ---- helpers for generate_ai_image -------------------------------------- # | |
| def _extension_for_mime(mime: str | None) -> str: | |
| """Map a MIME type to a canonical file extension (falls back to .png).""" | |
| mapping = { | |
| "image/png": ".png", | |
| "image/jpeg": ".jpg", | |
| "image/jpg": ".jpg", | |
| "image/webp": ".webp", | |
| "image/gif": ".gif", | |
| } | |
| return mapping.get((mime or "").lower(), ".png") | |
| def _make_display_name(ext: str) -> str: | |
| """Compose a unique, human-readable filename for a generated image.""" | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| return f"llm_image_{timestamp}_{uuid.uuid4().hex[:8]}{ext}" | |
| def _write_image_bytes( | |
| image_bytes: bytes, | |
| display_name: str, | |
| session_id: str | None, | |
| ) -> str: | |
| """Persist ``image_bytes`` and return the absolute on-disk path. | |
| When ``session_id`` is truthy, files go under | |
| ``{_IMAGE_OUTPUT_ROOT}/{session_id}/``; otherwise a ``tempfile.mkstemp`` | |
| path is used. | |
| """ | |
| if session_id: | |
| session_dir = Path(_IMAGE_OUTPUT_ROOT) / session_id | |
| session_dir.mkdir(parents=True, exist_ok=True) | |
| path = session_dir / display_name | |
| path.write_bytes(image_bytes) | |
| return str(path) | |
| fd, tmp = tempfile.mkstemp(suffix=Path(display_name).suffix) | |
| try: | |
| with os.fdopen(fd, "wb") as f: | |
| f.write(image_bytes) | |
| except OSError: | |
| # Best-effort cleanup of the partial temp file; then re-raise. | |
| try: | |
| os.unlink(tmp) | |
| except OSError: | |
| pass | |
| raise | |
| return tmp | |
class CombineImagesParams(BaseModel):
    """Optional tuning parameters for `combine_images`."""

    # Direction for the 'stack' operation. Previously missing: combine_images
    # reads params.direction, and pydantic raises AttributeError for
    # undeclared fields, so every 'stack' call failed.
    direction: str | None = Field(None, description="Direction for the 'stack' operation: 'horizontal' (default) or 'vertical'")
    spacing: int | None = Field(None, description="Spacing between images in pixels")
    background_color: str | None = Field(None, description="Background color for collage (e.g., 'white', 'black') or RGB string (e.g., '255,255,255')")
    blend_mode: str | None = Field(None, description="Blend mode for blending operations")
    opacity: float | None = Field(None, description="Opacity for overlay operations (0.0-1.0)")
def combine_images(images_base64: list[str], operation: str,
                   params: CombineImagesParams | None = None) -> str:
    """
    Combine multiple images using one of: 'horizontal', 'vertical', 'overlay', 'stack'.

    Args:
        images_base64 (list[str]): List of base64-encoded image strings (at least 2).
        operation (str): The combination operation to perform:
            - 'horizontal': place images side by side
            - 'vertical': stack images top to bottom
            - 'overlay': alpha-composite later images onto the first
            - 'stack': horizontal or vertical layout, chosen by ``params.direction``
        params (CombineImagesParams, optional): Optional tuning parameters.

    Returns:
        str: JSON string with the combined image as base64 or an error message.
    """
    def _error(message: str) -> str:
        # Uniform error envelope for this tool.
        return json.dumps({
            "type": "tool_response",
            "tool_name": "combine_images",
            "error": message
        }, indent=2)

    def _row(imgs):
        # Side-by-side layout; canvas height is the tallest image.
        canvas = Image.new("RGB", (sum(i.width for i in imgs), max(i.height for i in imgs)))
        x = 0
        for img in imgs:
            canvas.paste(img, (x, 0))
            x += img.width
        return canvas

    def _column(imgs):
        # Top-to-bottom layout; canvas width is the widest image.
        canvas = Image.new("RGB", (max(i.width for i in imgs), sum(i.height for i in imgs)))
        y = 0
        for img in imgs:
            canvas.paste(img, (0, y))
            y += img.height
        return canvas

    try:
        if len(images_base64) < 2:
            return _error("At least 2 images required for combination")
        images = [decode_image(b64) for b64 in images_base64]
        if params is None:
            params = CombineImagesParams()
        if operation == "horizontal":
            result = _row(images)
        elif operation == "vertical":
            result = _column(images)
        elif operation == "overlay":
            # Overlay images on top of the first, resizing mismatched layers.
            base_img = images[0]
            for overlay_img in images[1:]:
                if overlay_img.size != base_img.size:
                    overlay_img = overlay_img.resize(base_img.size, Image.Resampling.LANCZOS)
                base_img = Image.alpha_composite(base_img.convert("RGBA"), overlay_img.convert("RGBA"))
            result = base_img.convert("RGB")
        elif operation == "stack":
            # getattr guards against params models that do not declare a
            # `direction` field (pydantic raises AttributeError for
            # undeclared attributes, which previously broke 'stack').
            direction = getattr(params, "direction", None) or "horizontal"
            result = _row(images) if direction == "horizontal" else _column(images)
        else:
            return _error(f"Unsupported combination operation: {operation}")
        result_path = save_image(result)
        result_base64 = encode_image(result_path)
        return json.dumps({
            "type": "tool_response",
            "tool_name": "combine_images",
            "combined_image": result_base64
        }, indent=2)
    except Exception as e:
        return _error(str(e))
| # ========== VIDEO/AUDIO UNDERSTANDING TOOLS ========== | |
def understand_video(file_reference: str, prompt: str, system_prompt: str | None = None, agent=None,
                     start_time: str | None = None, end_time: str | None = None, fps: float | None = None) -> str:
    """
    Analyze a video using vision-language models (Gemini, Qwen).

    This tool uses VisionToolManager for video understanding with multiple model options.
    Automatically selects the best model based on video characteristics.

    Supports:
    - Uploaded video files
    - Direct video URLs
    - YouTube URLs (routed per VL_YOUTUBE_MODEL, VL_YOUTUBE_GEMINI_PROVIDER, VL_GEMINI_PROVIDER in .env)

    Args:
        file_reference (str): Original filename from user upload OR direct video URL
            OR YouTube URL
        prompt (str): A question or request regarding the video content
        system_prompt (str, optional): System instruction (not used currently)
        agent: Agent instance for file resolution (injected automatically)
        start_time (str, optional): Start time for video clipping in MM:SS format (e.g., "02:30")
        end_time (str, optional): End time for video clipping in MM:SS format (e.g., "03:29")
        fps (float, optional): Custom frame rate for video processing (default: 1 FPS)

    Returns:
        str: Analysis of the video content based on the prompt, or error message

    NOTE(review): start_time, end_time and fps are accepted here but are not
    forwarded to VisionInput below — confirm whether clipping/fps is handled
    elsewhere or still needs wiring.
    """
    from .file_utils import FileUtils
    try:
        from agent_ng.vision_input import VisionInput
        from agent_ng.vision_tool_manager import VisionToolManager
        # Direct http(s) URLs are passed through as-is — no forced local
        # download before analysis.
        lowered_ref = file_reference.strip().lower()
        is_direct_url = lowered_ref.startswith("http://") or lowered_ref.startswith("https://")
        if is_direct_url:
            vision_input = VisionInput(prompt=prompt, video_url=file_reference.strip())
            file_path = None
        else:
            # Resolve uploaded/local file reference to a usable path.
            file_path = FileUtils.resolve_file_reference(file_reference, agent)
            if not file_path:
                return FileUtils.create_tool_response(
                    "understand_video",
                    error=f"File not found: {file_reference}"
                )
            vision_input = VisionInput(
                prompt=prompt,
                video_path=file_path
            )
        # Validate input before handing off to the manager.
        vision_input.validate()
        # Initialize VisionToolManager; the env flag suppresses a pricing
        # fetch during construction.
        import os  # redundant with the module-level import, but harmless
        os.environ["OPENROUTER_FETCH_PRICING_AT_STARTUP"] = "false"
        manager = VisionToolManager()
        # Analyze video using URL-aware manager routing.
        result = manager.analyze(vision_input)
        selected_model = manager.get_model_for_input(vision_input)
        # Return result with the model that handled the input.
        return FileUtils.create_tool_response(
            "understand_video",
            result=result,
            extra={
                "file": file_reference,
                "model_used": selected_model
            }
        )
    except Exception as e:
        return FileUtils.create_tool_response(
            "understand_video",
            error=f"Video analysis failed: {str(e)}"
        )
def understand_audio(file_reference: str, prompt: str, system_prompt: str | None = None, agent=None,
                     start_time: str | None = None, end_time: str | None = None) -> str:
    """
    Analyze an audio file using vision-language models (Gemini).

    This tool uses VisionToolManager for audio understanding.
    Automatically uses Gemini 2.5 Flash (only model with audio support).

    Args:
        file_reference (str): Original filename from user upload OR URL to download
        prompt (str): A question or request regarding the audio content
        system_prompt (str, optional): System instruction (not used currently)
        agent: Agent instance for file resolution (injected automatically)
        start_time (str, optional): Start time reference in MM:SS format (e.g., "02:30")
        end_time (str, optional): End time reference in MM:SS format (e.g., "03:29")

    Returns:
        str: Analysis of the audio content based on the prompt, or error message

    NOTE(review): start_time and end_time are accepted but never used below —
    confirm whether clipping is handled elsewhere or still needs wiring.
    """
    from .file_utils import FileUtils
    try:
        from agent_ng.vision_input import VisionInput
        from agent_ng.vision_tool_manager import VisionToolManager
        # Resolve file reference to full path.
        file_path = FileUtils.resolve_file_reference(file_reference, agent)
        if not file_path:
            return FileUtils.create_tool_response(
                "understand_audio",
                error=f"File not found: {file_reference}"
            )
        # Create VisionInput.
        # NOTE(review): vision_input is constructed but never passed on —
        # analyze_audio() below takes the raw path/prompt. Presumably the
        # constructor performs validation; confirm, or remove this line.
        vision_input = VisionInput(
            prompt=prompt,
            audio_path=file_path
        )
        # Initialize VisionToolManager; the env flag suppresses a pricing
        # fetch during construction.
        import os  # redundant with the module-level import, but harmless
        os.environ["OPENROUTER_FETCH_PRICING_AT_STARTUP"] = "false"
        manager = VisionToolManager()
        # Analyze audio.
        result = manager.analyze_audio(audio_path=file_path, prompt=prompt)
        # Return result with the model that handled the input.
        return FileUtils.create_tool_response(
            "understand_audio",
            result=result,
            extra={
                "file": file_reference,
                "model_used": manager.vl_audio_model
            }
        )
    except Exception as e:
        return FileUtils.create_tool_response(
            "understand_audio",
            error=f"Audio analysis failed: {str(e)}"
        )
def web_search_deep_research_exa_ai(instructions: str) -> str:
    """
    Search the web and site content using deep research tool.
    Ask a query and get a well-researched answer with references.
    Can provide FINAL ANSWER candidate.

    Args:
        instructions: The prompt or query describing the research goal.

    Returns:
        The results of the deep research as a string.
    """
    def _payload(**fields: Any) -> str:
        # Standard tool-response envelope for this tool.
        body = {"type": "tool_response", "tool_name": "web_search_deep_research_exa_ai"}
        body.update(fields)
        return json.dumps(body)

    if not EXA_AVAILABLE:
        return _payload(error="Exa not available. Install with: pip install exa-py")
    try:
        exa_api_key = os.environ.get("EXA_API_KEY")
        if not exa_api_key:
            return _payload(error="EXA_API_KEY not found in environment variables. Please set it in your .env file.")
        client = Exa(exa_api_key)
        # Kick off the research task, then block until it completes.
        task_stub = client.research.create_task(
            instructions=instructions,
            model="exa-research-pro",
            output_infer_schema=True,
        )
        finished = client.research.poll_task(task_stub.id)
        return _payload(result=str(finished))
    except Exception as e:
        return _payload(error=f"Error in Exa research: {str(e)}")
| # ========== PYDANTIC SCHEMAS ========== | |
class SubmitAnswerSchema(BaseModel):
    """
    Schema for submitting final answers with structured metadata.
    Use this when ready to provide a final answer and the analysis is complete.
    """
    # The final answer text; must be non-empty.
    answer: str = Field(
        description="The final answer to the user's question",
        min_length=1
    )
    # Confidence in [0.0, 1.0]; defaults to full confidence.
    confidence: float = Field(
        default=1.0,
        ge=0.0,
        le=1.0,
        description="Confidence level from 0.0 to 1.0 (default: 1.0)"
    )
    # Sources/tools consulted; an empty list is normalized to None.
    sources: list[str] | None = Field(
        default=None,
        description="List of sources or tools used to generate this answer"
    )
    # Optional reasoning summary.
    reasoning: str | None = Field(
        default=None,
        description="Brief explanation of the reasoning process"
    )

    # Fix: this was a plain method, so pydantic never ran it as a validator.
    @field_validator("sources")
    @classmethod
    def validate_sources(cls, v):
        # Normalize an empty list to None so "no sources" has one shape.
        if v is not None and len(v) == 0:
            return None
        return v
class SubmitIntermediateStepSchema(BaseModel):
    """
    Schema for submitting intermediate reasoning steps or progress updates.
    Use this to document intermediate steps in your reasoning process,
    progress updates, or partial findings before reaching a final conclusion.
    """
    # Short identifier for the step, e.g. 'data_analysis'.
    step_name: str = Field(
        description="Short name/identifier for this step (e.g., 'data_analysis', 'search_results')",
        min_length=1,
        max_length=100
    )
    # What was accomplished in this step.
    description: str = Field(
        description="Detailed description of what was accomplished in this step",
        min_length=1
    )
    # Closed set of step states.
    status: Literal["in_progress", "completed", "failed", "blocked"] = Field(
        default="in_progress",
        description="Current status of this step"
    )
    # Free-form findings/results payload.
    data: dict[str, Any] | None = Field(
        default=None,
        description="Optional dictionary containing relevant data, findings, or results from this step"
    )
    # Planned follow-up actions; empty list is normalized to None.
    next_steps: list[str] | None = Field(
        default=None,
        description="Optional list of planned next steps or actions"
    )
    # Optional confidence in [0.0, 1.0] for this step's results.
    confidence: float | None = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Optional confidence level from 0.0 to 1.0 for this step's results"
    )
    # Issues encountered; empty list is normalized to None.
    issues: list[str] | None = Field(
        default=None,
        description="Optional list of issues, concerns, or limitations encountered"
    )

    # Fix: this was a plain method, so pydantic never ran it as a validator.
    @field_validator("next_steps", "issues")
    @classmethod
    def validate_lists(cls, v):
        # Normalize empty lists to None so "nothing to report" has one shape.
        if v is not None and len(v) == 0:
            return None
        return v
class SubmitAnswerResult(BaseModel):
    """
    Structured result model for submit_answer operations.

    This model standardizes the response format for final answer submissions,
    providing consistent success/error handling and response structure.
    """
    # Whether the submission succeeded.
    success: bool
    # HTTP-style status code; defaults to 200.
    status_code: int = Field(default=200)
    # Raw upstream response, when available (dict or serialized string).
    raw_response: dict | str | None = Field(default=None)
    # Human-readable error message when success is False.
    error: str | None = Field(default=None)
class SubmitIntermediateStepResult(BaseModel):
    """
    Structured result model for submit_intermediate_step operations.

    This model standardizes the response format for intermediate step submissions,
    providing consistent success/error handling and response structure.
    """
    # Whether the submission succeeded.
    success: bool
    # HTTP-style status code; defaults to 200.
    status_code: int = Field(default=200)
    # Raw upstream response, when available (dict or serialized string).
    raw_response: dict | str | None = Field(default=None)
    # Human-readable error message when success is False.
    error: str | None = Field(default=None)
| # ========== TOOL FUNCTIONS ========== | |
| def submit_answer(answer: str, confidence: float = 1.0, sources: list[str] = None, reasoning: str = None) -> dict[str, Any]: | |
| """ | |
| Submit a final answer using Schema-Guided Reasoning (SGR). | |
| This tool forces the LLM to explicitly state its conclusion rather than leaving it implicit. | |
| It preserves structured metadata while ensuring clean integration with the streaming pipeline. | |
| Use this tool when ready to provide an answer for the current question. This can be: | |
| - A final answer after completing all reasoning steps | |
| - An answer to a sub-question in a multi-turn conversation | |
| - A response that concludes the current analysis phase | |
| This tool preserves context across multiple conversation turns. | |
| Returns: | |
| dict: Structured response with answer and metadata | |
| """ | |
| try: | |
| # Create structured response that's easy to extract | |
| result = { | |
| "success": True, | |
| "answer": answer, | |
| "confidence": confidence, | |
| "sources": sources or [], | |
| "reasoning": reasoning or "", | |
| "timestamp": time.time(), | |
| "type": "final_answer" | |
| } | |
| return result | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Error submitting answer: {str(e)}", | |
| "type": "error" | |
| } | |
| def submit_intermediate_step(step_name: str, description: str, status: str = "in_progress", | |
| data: dict[str, Any] = None, next_steps: list[str] = None, | |
| confidence: float = None, issues: list[str] = None) -> dict[str, Any]: | |
| """ | |
| Submit an intermediate reasoning step using Schema-Guided Reasoning (SGR). | |
| Use this tool to document intermediate steps in your reasoning process, | |
| progress updates, or partial findings before reaching a final conclusion. | |
| This tool helps track the agent's thought process and enables better debugging. | |
| It helps guide structured thinking and makes the reasoning process transparent and debuggable. | |
| Returns: | |
| dict: Structured response with step details and metadata | |
| """ | |
| try: | |
| # Create structured response that's easy to extract | |
| result = { | |
| "success": True, | |
| "step_name": step_name, | |
| "description": description, | |
| "status": status, | |
| "data": data or {}, | |
| "next_steps": next_steps or [], | |
| "confidence": confidence, | |
| "issues": issues or [], | |
| "timestamp": time.time(), | |
| "type": "intermediate_step" | |
| } | |
| return result | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Error submitting step: {str(e)}", | |
| "type": "error" | |
| } | |
| # ========== END OF TOOLS.PY ========== | |