# Source: cmw-copilot/tools/tools.py
# Author: arterm-sedov
# Commit dbdf2d2: fix(cmw-platform): button context parameter and Create-kind support
# tools.py - Consolidated tools
# Dependencies are included
import base64
import cmath
from datetime import datetime
import io
import json
import logging
import os
from pathlib import Path
import re
import shutil
import sqlite3
import subprocess
import sys
import tempfile
import time
from typing import Annotated, Any, Dict, List, Literal, Optional, Tuple, Union
import urllib.parse
import uuid
import numpy as np
import pandas as pd
import requests
logger = logging.getLogger(__name__)
# Check if we're running on Hugging Face Spaces
HF_SPACES = os.environ.get("SPACE_ID") is not None
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont
from pydantic import BaseModel, ConfigDict, Field, field_validator
# Try to import matplotlib, but make it optional
try:
import matplotlib
matplotlib.use("Agg") # Use non-interactive backend
import matplotlib.pyplot as plt
MATPLOTLIB_AVAILABLE = True
except (ImportError, Exception) as e:
MATPLOTLIB_AVAILABLE = False
plt = None
print(f"Warning: matplotlib not available: {e}")
# Always import the tool decorator - it's essential
from langchain_core.tools import InjectedToolArg, tool
# Image generation engine (backed by the provider registry in
# agent_ng.image_providers). The tool only knows the engine facade and the
# default-model hint accessor — never the active model slug or provider.
try:
from agent_ng.image_engine import ImageEngine
from agent_ng.image_models import get_default_prompt_style_hint
except ImportError: # pragma: no cover
ImageEngine = None # type: ignore[assignment,misc]
get_default_prompt_style_hint = None # type: ignore[assignment]
# Session-aware current_session_id (ContextVar fallback when agent not injected)
try:
from agent_ng.session_manager import get_current_session_id
except ImportError: # pragma: no cover
get_current_session_id = None # type: ignore[assignment]
from .applications_tools.tool_list_applications import list_applications
# Applications tools
from .applications_tools.tool_list_templates import list_templates
from .attributes_tools.tool_archive_or_unarchive_attribute import (
archive_or_unarchive_attribute,
)
# Attributes tools - Utility functions
from .attributes_tools.tool_delete_attribute import delete_attribute
from .attributes_tools.tool_get_attribute import get_attribute
from .attributes_tools.tools_account_attribute import edit_or_create_account_attribute
from .attributes_tools.tools_boolean_attribute import edit_or_create_boolean_attribute
# Attributes tools - Other attribute types
from .attributes_tools.tools_datetime_attribute import (
edit_or_create_date_time_attribute,
)
from .attributes_tools.tools_decimal_attribute import edit_or_create_numeric_attribute
from .attributes_tools.tools_document_attribute import edit_or_create_document_attribute
from .attributes_tools.tools_drawing_attribute import edit_or_create_drawing_attribute
from .attributes_tools.tools_duration_attribute import edit_or_create_duration_attribute
from .attributes_tools.tools_enum_attribute import edit_or_create_enum_attribute
from .attributes_tools.tools_image_attribute import edit_or_create_image_attribute
from .attributes_tools.tools_record_attribute import edit_or_create_record_attribute
from .attributes_tools.tools_role_attribute import edit_or_create_role_attribute
# Attributes tools - Text attributes
from .attributes_tools.tools_text_attribute import edit_or_create_text_attribute
# Datetime tool
from .get_datetime import get_current_datetime
# Expose Comindware Platform tools from all directories
# Templates tools
from .templates_tools.tool_list_attributes import list_attributes
from .templates_tools.tool_list_records import list_template_records
# Templates tools
from .templates_tools.tools_record_template import edit_or_create_record_template
# Attributes tools - Text attributes
from .attributes_tools.tools_text_attribute import edit_or_create_text_attribute
# Attributes tools - Other attribute types
from .attributes_tools.tools_datetime_attribute import edit_or_create_date_time_attribute
from .attributes_tools.tools_decimal_attribute import edit_or_create_numeric_attribute
from .attributes_tools.tools_record_attribute import edit_or_create_record_attribute
from .attributes_tools.tools_image_attribute import edit_or_create_image_attribute
from .attributes_tools.tools_drawing_attribute import edit_or_create_drawing_attribute
from .attributes_tools.tools_document_attribute import edit_or_create_document_attribute
from .attributes_tools.tools_duration_attribute import edit_or_create_duration_attribute
from .attributes_tools.tools_account_attribute import edit_or_create_account_attribute
from .attributes_tools.tools_boolean_attribute import edit_or_create_boolean_attribute
from .attributes_tools.tools_role_attribute import edit_or_create_role_attribute
from .attributes_tools.tools_enum_attribute import edit_or_create_enum_attribute
# Attributes tools - Utility functions
from .attributes_tools.tool_delete_attribute import delete_attribute
from .attributes_tools.tool_archive_or_unarchive_attribute import archive_or_unarchive_attribute
from .attributes_tools.tool_get_attribute import get_attribute
# Datetime tool
from .get_datetime import get_current_datetime
# NOTE: Browser automation tools (tools/browser_tools.py, agent_ng/browser_session.py)
# are intentionally NOT bound to the agent. They are kept for external/standalone use.
# See .agents/skills/cmw-platform/SKILL.md section "Browser Automation"
# Global configuration for search tools
SEARCH_LIMIT = 5 # Maximum number of results for all search tools (Tavily, Wikipedia, Arxiv)
# LangChain imports for search tools
try:
from langchain_tavily import TavilySearch
TAVILY_AVAILABLE = True
except ImportError:
TAVILY_AVAILABLE = False
print("Warning: TavilySearch not available. Install with: pip install langchain-tavily")
# Try to import wikipedia-api as it's a common dependency
try:
import wikipedia
WIKIPEDIA_AVAILABLE = True
except ImportError as e:
WIKIPEDIA_AVAILABLE = False
print(f"Wikipedia search requires additional dependencies. Install with: pip install wikipedia-api. Error: {str(e)}")
try:
from langchain_community.document_loaders import WikipediaLoader
WIKILOADER_AVAILABLE = True
except ImportError:
WIKILOADER_AVAILABLE = False
print("Warning: WikipediaLoader not available. Install with: pip install langchain-community")
# Try to import arxiv as it's a common dependency
try:
import arxiv
ARXIV_AVAILABLE = True
except ImportError as e:
ARXIV_AVAILABLE = False
print(f"Arxiv search requires additional dependencies. Install with: pip install arxiv. Error: {str(e)}")
# Try to import PyMuPDF for PDF processing
try:
import fitz # PyMuPDF
PYMUPDF_AVAILABLE = True
except ImportError:
PYMUPDF_AVAILABLE = False
print("Warning: PyMuPDF not available. Install with: pip install pymupdf")
try:
from langchain_community.document_loaders import ArxivLoader
ARXIVLOADER_AVAILABLE = True
except ImportError:
ARXIVLOADER_AVAILABLE = False
print("Warning: ArxivLoader not available. Install with: pip install langchain-community")
# Optional: Exa deep research (web_search_deep_research_exa_ai returns a clear JSON error if exa-py is missing)
try:
from exa_py import Exa
EXA_AVAILABLE = True
except ImportError:
EXA_AVAILABLE = False
# Google Gemini imports for video/audio understanding
try:
from google import genai
from google.genai import types
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
print("Warning: Google Gemini not available. Install with: pip install google-genai")
# ========== GEMINI HELPER FUNCTIONS ==========
def _get_gemini_client():
    """
    Initialize and return a Gemini client with proper error handling.

    Requires the google-genai package (GEMINI_AVAILABLE) and reads the API
    key from the GEMINI_KEY environment variable.

    Returns:
        client or None: The Gemini client if initialization succeeds, None otherwise.
    """
    if not GEMINI_AVAILABLE:
        print("Warning: Google Gemini not available. Install with: pip install google-genai")
        return None
    try:
        gemini_key = os.environ.get("GEMINI_KEY")
        if not gemini_key:
            print("Warning: GEMINI_KEY not found in environment variables.")
            return None
        client = genai.Client(api_key=gemini_key)
        return client
    except Exception as e:
        # Never raise from here; callers treat None as "client unavailable".
        print(f"Error initializing Gemini client: {str(e)}")
        return None
def _get_gemini_response(prompt, error_prefix="Gemini", model_name="gemini-2.5-flash"):
    """
    Send *prompt* to Gemini and return the text of the reply.

    Args:
        prompt: The prompt to send to Gemini.
        error_prefix (str): Label used in error messages to identify the caller.
        model_name (str): The Gemini model to use.

    Returns:
        str: The Gemini response text, or a human-readable error string when
        the client is unavailable or the request fails.
    """
    gemini = _get_gemini_client()
    if not gemini:
        return f"{error_prefix} client not available. Check installation and API key configuration."
    try:
        reply = gemini.models.generate_content(
            model=model_name,
            contents=prompt
        )
        return reply.text
    except Exception as exc:
        return f"Error in {error_prefix.lower()} request: {str(exc)}"
# ========== IMAGE PROCESSING HELPERS ==========
def encode_image(image_path: str) -> str:
    """
    Read the file at *image_path* and return its contents base64-encoded.

    Args:
        image_path (str): Path of the image file to encode.

    Returns:
        str: Base64 (ASCII) representation of the file's bytes.
    """
    raw_bytes = Path(image_path).read_bytes()
    return base64.b64encode(raw_bytes).decode("utf-8")
def decode_image(base64_string: str) -> Any:
    """
    Decode a base64 string into a PIL Image object.

    Args:
        base64_string (str): Base64-encoded image payload.

    Returns:
        Any: The PIL Image opened from the decoded bytes.
    """
    raw = base64.b64decode(base64_string)
    buffer = io.BytesIO(raw)
    return Image.open(buffer)
def save_image(image: Any, directory: str = "image_outputs") -> str:
    """
    Persist a PIL Image as a PNG under *directory* and return the file path.

    The directory is created if missing; the filename is a fresh UUID so
    concurrent saves never collide.

    Args:
        image (Any): PIL Image object (anything with a .save(path) method).
        directory (str, optional): Target directory. Defaults to "image_outputs".

    Returns:
        str: Path of the written PNG file.
    """
    os.makedirs(directory, exist_ok=True)
    filename = f"{uuid.uuid4()}.png"
    destination = os.path.join(directory, filename)
    image.save(destination)
    return destination
# ========== CODE INTERPRETER ==========
class CodeInterpreter:
    """
    A code interpreter for executing code in various languages (Python, Bash,
    SQL, C, Java) with safety and resource controls.

    Args:
        allowed_modules (list, optional): List of allowed module names for Python execution.
        max_execution_time (int, optional): Maximum execution time in seconds for code blocks.
        working_directory (str, optional): Directory for temporary files and execution context.

    Attributes:
        globals (dict): Global variables handed to each Python execution.
        temp_sqlite_db (str): Path to a temporary SQLite database for SQL code.
    """

    def __init__(self, allowed_modules=None, max_execution_time=30, working_directory=None):
        # NOTE(review): allowed_modules is stored but not currently enforced
        # by _execute_python — confirm whether enforcement is intended.
        self.allowed_modules = allowed_modules or [
            "numpy", "pandas", "matplotlib", "scipy", "sklearn",
            "math", "random", "statistics", "datetime", "collections",
            "itertools", "functools", "operator", "re", "json",
            "sympy", "networkx", "nltk", "PIL",
            "cmath", "uuid", "tempfile", "requests", "urllib"
        ]
        self.max_execution_time = max_execution_time
        # os.path.join(os.getcwd()) was a no-op wrapper; use the cwd directly.
        self.working_directory = working_directory or os.getcwd()
        # exist_ok avoids a race between the exists() check and makedirs().
        os.makedirs(self.working_directory, exist_ok=True)
        # Use global imports that are already available at module level.
        self.globals = {
            "__builtins__": __builtins__,
            "np": np,
            "pd": pd,
            "Image": Image,
        }
        # Only expose plt when matplotlib imported successfully.
        if MATPLOTLIB_AVAILABLE:
            self.globals["plt"] = plt
        self.temp_sqlite_db = os.path.join(tempfile.gettempdir(), "code_exec.db")

    def execute_code(self, code: str, language: str = "python") -> dict[str, Any]:
        """
        Execute code in the specified language with safety controls.

        Args:
            code (str): The source code to execute.
            language (str): The programming language.

        Returns:
            Dict containing execution results, status, and outputs.
        """
        try:
            lang = language.lower()
            if lang == "python":
                return self._execute_python(code)
            elif lang == "bash":
                return self._execute_bash(code)
            elif lang == "sql":
                return self._execute_sql(code)
            elif lang == "c":
                return self._execute_c(code)
            elif lang == "java":
                return self._execute_java(code)
            else:
                return {"status": "error", "stderr": f"Unsupported language: {language}"}
        except Exception as e:
            return {"status": "error", "stderr": str(e)}

    def _execute_python(self, code: str) -> dict[str, Any]:
        """Execute Python code, capturing stdout/stderr, DataFrames, and plots."""
        try:
            # Capture stdout/stderr in string buffers for the duration of exec().
            stdout_buffer = io.StringIO()
            stderr_buffer = io.StringIO()
            old_stdout = sys.stdout
            old_stderr = sys.stderr
            sys.stdout = stdout_buffer
            sys.stderr = stderr_buffer
            try:
                # Each execution gets its own copy of the base globals so runs
                # cannot pollute each other.
                local_globals = self.globals.copy()
                local_globals["__name__"] = "__main__"
                # SECURITY: exec() runs arbitrary code — this is a trusted-tool
                # context, never feed it untrusted input.
                exec(code, local_globals)
                stdout_content = stdout_buffer.getvalue()
                stderr_content = stderr_buffer.getvalue()
                result = {"status": "success", "stdout": stdout_content, "stderr": stderr_content, "result": None}
                # Surface any DataFrames the snippet created (name, shape, preview).
                dataframes = []
                for name, value in local_globals.items():
                    if isinstance(value, pd.DataFrame):
                        dataframes.append({
                            "name": name,
                            "shape": value.shape,
                            "head": value.head().to_dict("records")
                        })
                if dataframes:
                    result["dataframes"] = dataframes
                # Save any open matplotlib figures to the working directory.
                plots = []
                if MATPLOTLIB_AVAILABLE and plt is not None:
                    try:
                        if plt.get_fignums():
                            for fig_num in plt.get_fignums():
                                fig = plt.figure(fig_num)
                                plot_path = os.path.join(self.working_directory, f"plot_{fig_num}.png")
                                fig.savefig(plot_path)
                                plots.append(plot_path)
                                plt.close(fig)
                    except Exception as plot_error:
                        # Plot capture is best-effort; never fail the run for it.
                        print(f"Warning: Plot handling failed: {plot_error}")
                if plots:
                    result["plots"] = plots
                return result
            finally:
                # Always restore the real stdout/stderr, even on error.
                sys.stdout = old_stdout
                sys.stderr = old_stderr
                stdout_buffer.close()
                stderr_buffer.close()
        except Exception as e:
            return {"status": "error", "stderr": str(e)}

    def _execute_bash(self, code: str) -> dict[str, Any]:
        """Execute Bash code via the shell with a timeout."""
        if HF_SPACES:
            return {"status": "error", "stderr": "Bash execution not available on Hugging Face Spaces"}
        try:
            result = subprocess.run(
                code,
                check=False, shell=True,
                capture_output=True,
                text=True,
                timeout=self.max_execution_time
            )
            return {
                "status": "success" if result.returncode == 0 else "error",
                "stdout": result.stdout,
                "stderr": result.stderr,
                "returncode": result.returncode
            }
        except subprocess.TimeoutExpired:
            return {"status": "error", "stderr": "Execution timed out"}
        except Exception as e:
            return {"status": "error", "stderr": str(e)}

    def _execute_sql(self, code: str) -> dict[str, Any]:
        """Execute SQL code against a scratch SQLite database."""
        try:
            conn = sqlite3.connect(self.temp_sqlite_db)
            # BUGFIX: the connection previously leaked when cursor.execute()
            # raised — close it on every path, not just on success.
            try:
                cursor = conn.cursor()
                cursor.execute(code)
                # Only SELECT statements produce a result set to fetch.
                if code.strip().upper().startswith("SELECT"):
                    results = cursor.fetchall()
                    columns = [description[0] for description in cursor.description]
                    result = {"status": "success", "results": results, "columns": columns}
                else:
                    conn.commit()
                    result = {"status": "success", "message": f"Executed: {code}"}
            finally:
                conn.close()
            return result
        except Exception as e:
            return {"status": "error", "stderr": str(e)}

    def _execute_c(self, code: str) -> dict[str, Any]:
        """Execute C code by compiling with gcc and running the binary."""
        if HF_SPACES:
            return {"status": "error", "stderr": "C code execution not available on Hugging Face Spaces"}
        try:
            # Write the source to a temporary C file in the working directory.
            c_file = os.path.join(self.working_directory, "temp_code.c")
            with open(c_file, "w") as f:
                f.write(code)
            # Compile (no timeout here; compilation is assumed fast).
            compile_result = subprocess.run(
                ["gcc", "-o", os.path.join(self.working_directory, "temp_program"), c_file],
                check=False, capture_output=True,
                text=True
            )
            if compile_result.returncode != 0:
                return {"status": "error", "stderr": f"Compilation failed: {compile_result.stderr}"}
            # Run the compiled program with the configured timeout.
            run_result = subprocess.run(
                [os.path.join(self.working_directory, "temp_program")],
                check=False, capture_output=True,
                text=True,
                timeout=self.max_execution_time
            )
            return {
                "status": "success",
                "stdout": run_result.stdout,
                "stderr": run_result.stderr,
                "returncode": run_result.returncode
            }
        except subprocess.TimeoutExpired:
            return {"status": "error", "stderr": "Execution timed out"}
        except Exception as e:
            return {"status": "error", "stderr": str(e)}

    def _execute_java(self, code: str) -> dict[str, Any]:
        """Execute Java code by compiling with javac and running the class."""
        if HF_SPACES:
            return {"status": "error", "stderr": "Java code execution not available on Hugging Face Spaces"}
        try:
            # NOTE: the public class in *code* must be named TempCode to match
            # the filename, or javac will reject it.
            java_file = os.path.join(self.working_directory, "TempCode.java")
            with open(java_file, "w") as f:
                f.write(code)
            # Compile.
            compile_result = subprocess.run(
                ["javac", java_file],
                check=False, capture_output=True,
                text=True
            )
            if compile_result.returncode != 0:
                return {"status": "error", "stderr": f"Compilation failed: {compile_result.stderr}"}
            # Run with the working directory on the classpath.
            run_result = subprocess.run(
                ["java", "-cp", self.working_directory, "TempCode"],
                check=False, capture_output=True,
                text=True,
                timeout=self.max_execution_time
            )
            return {
                "status": "success",
                "stdout": run_result.stdout,
                "stderr": run_result.stderr,
                "returncode": run_result.returncode
            }
        except subprocess.TimeoutExpired:
            return {"status": "error", "stderr": "Execution timed out"}
        except Exception as e:
            return {"status": "error", "stderr": str(e)}
# Create a global instance for use by tools.
# All executions from execute_code_multilang route through this shared interpreter.
interpreter_instance = CodeInterpreter()
@tool
def execute_code_multilang(code_reference: str, language: str = "python", agent=None) -> str:
    """Execute code in multiple languages (Python, Bash, SQL, C, Java) and return results.

    Args:
        code_reference (str): The source code to execute, filename, or URL to download code from.
        language (str): The language of the code. Supported: "python", "bash", "sql", "c", "java".
        agent: Agent instance for file resolution (injected automatically).

    Returns:
        A string summarizing the execution results (stdout, stderr, errors, plots, dataframes if any).
    """
    from .file_utils import FileUtils

    # Resolve the reference into actual source text (inline code, filename, or URL).
    code_content, detected_language = FileUtils.resolve_code_input(code_reference, agent)
    # Detected language wins over the caller-supplied hint.
    final_language = (detected_language or language).lower()
    supported_languages = ["python", "bash", "sql", "c", "java"]
    if final_language not in supported_languages:
        return FileUtils.create_tool_response(
            "execute_code_multilang",
            error=f"❌ Unsupported language: {final_language}. Supported languages are: {', '.join(supported_languages)}"
        )
    result = interpreter_instance.execute_code(code_content, language=final_language)
    sections = []
    if result["status"] == "success":
        sections.append(f"✅ Code executed successfully in **{final_language.upper()}**")
        stdout_text = result.get("stdout")
        if stdout_text:
            sections.append(
                "\n**Standard Output:**\n```\n" + stdout_text.strip() + "\n```"
            )
        stderr_text = result.get("stderr")
        if stderr_text:
            sections.append(
                "\n**Standard Error (if any):**\n```\n"
                + stderr_text.strip()
                + "\n```"
            )
        if result.get("result") is not None:
            sections.append(
                "\n**Execution Result:**\n```\n"
                + str(result["result"]).strip()
                + "\n```"
            )
        # Summarize any DataFrames the execution produced.
        for df_info in result.get("dataframes") or []:
            sections.append(
                f"\n**DataFrame `{df_info['name']}` (Shape: {df_info['shape']})**"
            )
            preview = pd.DataFrame(df_info["head"])
            sections.append("First 5 rows:\n```\n" + str(preview) + "\n```")
        if result.get("plots"):
            sections.append(
                f"\n**Generated {len(result['plots'])} plot(s)** (Image data returned separately)"
            )
    else:
        sections.append(f"❌ Code execution failed in **{final_language.upper()}**")
        if result.get("stderr"):
            sections.append(
                "\n**Error Log:**\n```\n" + result["stderr"].strip() + "\n```"
            )
    return FileUtils.create_tool_response("execute_code_multilang", result="\n".join(sections))
# ========== MATH TOOLS ==========
@tool
def multiply(a: float, b: float) -> float:
    """
    Return the product of two numbers.

    Args:
        a (float): The first factor.
        b (float): The second factor.

    Returns:
        float: The value of a multiplied by b.
    """
    product = a * b
    return product
@tool
def add(a: float, b: float) -> float:
    """
    Return the sum of two numbers.

    Args:
        a (float): The first addend.
        b (float): The second addend.

    Returns:
        float: The value of a plus b.
    """
    total = a + b
    return total
@tool
def subtract(a: float, b: float) -> float:
    """
    Return the difference of two numbers (a minus b).

    Args:
        a (float): The minuend.
        b (float): The subtrahend.

    Returns:
        float: The value of a - b.
    """
    difference = a - b
    return difference
@tool
def divide(a: float, b: float) -> float:
    """
    Return the quotient of two numbers (a divided by b).

    Args:
        a (float): The numerator.
        b (float): The denominator. Must not be zero.

    Returns:
        float: The value of a / b.

    Raises:
        ValueError: If b is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero")
    quotient = a / b
    return quotient
@tool
def modulus(a: int, b: int) -> int:
    """
    Return the remainder of integer division of a by b.

    Args:
        a (int): The dividend.
        b (int): The divisor. Must not be zero.

    Returns:
        int: The remainder of a divided by b.

    Raises:
        ValueError: If b is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero")
    remainder = a % b
    return remainder
@tool
def power(a: float, b: float) -> float:
    """
    Return a raised to the power of b.

    Args:
        a (float): The base number.
        b (float): The exponent.

    Returns:
        float: The value of a ** b.
    """
    return pow(a, b)
@tool
def square_root(a: float) -> float:
    """
    Return the square root of a number.

    Non-negative inputs yield a float; negative inputs yield a complex
    number via cmath.

    Args:
        a (float): The number to take the square root of.

    Returns:
        float or complex: The square root of a (complex when a < 0).
    """
    return a ** 0.5 if a >= 0 else cmath.sqrt(a)
# ========== WEB/SEARCH TOOLS ==========
@tool
def wiki_search(input: str) -> str:
    """
    Search Wikipedia for a query and return up to SEARCH_LIMIT (default 5) results as formatted text.

    Args:
        input (str): The search query string for Wikipedia.

    Returns:
        str: JSON tool response containing formatted Wikipedia results
        (source metadata plus page content), or an error field when the
        loader is unavailable or the search fails.
    """
    try:
        # Dependency guard: WikipediaLoader comes from langchain-community.
        if not WIKILOADER_AVAILABLE:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "wiki_search",
                "error": "Wikipedia search not available. Install with: pip install langchain-community"
            })
        search_docs = WikipediaLoader(query=input, load_max_docs=SEARCH_LIMIT).load()
        # One <Document .../> header per result, separated by horizontal rules.
        formatted_results = "\n\n---\n\n".join(
            [
                f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}'
                for doc in search_docs
            ]
        )
        return json.dumps({
            "type": "tool_response",
            "tool_name": "wiki_search",
            "wiki_results": formatted_results
        })
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "wiki_search",
            "error": f"Error in Wikipedia search: {str(e)}"
        })
@tool
def web_search(input: str) -> str:
    """
    Search the web using Tavily for a query and return up to SEARCH_LIMIT (default 5) results as formatted text.

    Tavily is a search API that provides real-time web search results. This tool is useful for:
    - Finding current information about recent events
    - Searching for specific facts, statistics, or data
    - Getting up-to-date information from various websites
    - Researching topics that may not be covered in Wikipedia or academic papers

    Args:
        input (str): The search query string to search for on the web.

    Returns:
        str: JSON tool response with formatted search results (source URLs and
        content snippets), or an error field if Tavily is not available,
        the API key is missing, or the search fails.
    """
    # Dependency guard: TavilySearch comes from langchain-tavily.
    if not TAVILY_AVAILABLE:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "web_search",
            "error": "Tavily search not available. Install with: pip install langchain-tavily"
        })
    try:
        if not os.environ.get("TAVILY_API_KEY"):
            return json.dumps({
                "type": "tool_response",
                "tool_name": "web_search",
                "error": "TAVILY_API_KEY not found in environment variables. Please set it in your .env file."
            })
        search_result = TavilySearch(max_results=SEARCH_LIMIT).invoke(input)
        # Tavily may return a plain string, a list of Documents, or something else.
        if isinstance(search_result, str):
            # If Tavily returned a string (error message or direct answer)
            return json.dumps({
                "type": "tool_response",
                "tool_name": "web_search",
                "web_results": search_result
            })
        elif isinstance(search_result, list):
            # If Tavily returned a list of Document objects
            formatted_results = "\n\n---\n\n".join(
                [
                    f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}'
                    for doc in search_result
                ]
            )
            return json.dumps({
                "type": "tool_response",
                "tool_name": "web_search",
                "web_results": formatted_results
            })
        else:
            # Unknown payload type — stringify it rather than fail.
            return json.dumps({
                "type": "tool_response",
                "tool_name": "web_search",
                "web_results": str(search_result)
            })
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "web_search",
            "error": f"Error in web search: {str(e)}"
        })
@tool
def arxiv_search(input: str) -> str:
    """
    Search Arxiv for academic papers and return up to SEARCH_LIMIT (default 5) results as formatted text.

    Args:
        input (str): The search query string for academic papers.

    Returns:
        str: JSON tool response with formatted Arxiv results (paper metadata
        and abstracts), or an error field when a dependency is missing or
        the search fails.
    """
    try:
        # Dependency guards: ArxivLoader (langchain-community) and PyMuPDF
        # are both required for loading paper content.
        if not ARXIVLOADER_AVAILABLE:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "arxiv_search",
                "error": "Arxiv search not available. Install with: pip install langchain-community"
            })
        if not PYMUPDF_AVAILABLE:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "arxiv_search",
                "error": "PyMuPDF package not found, please install it with pip install pymupdf"
            })
        search_docs = ArxivLoader(query=input, load_max_docs=SEARCH_LIMIT).load()
        # One <Document .../> header per paper, separated by horizontal rules.
        formatted_results = "\n\n---\n\n".join(
            [
                f'<Document title="{doc.metadata.get("Title", "Unknown")}" authors="{doc.metadata.get("Authors", "Unknown")}" published="{doc.metadata.get("Published", "Unknown")}"/>\n{doc.page_content}'
                for doc in search_docs
            ]
        )
        return json.dumps({
            "type": "tool_response",
            "tool_name": "arxiv_search",
            "arxiv_results": formatted_results
        })
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "arxiv_search",
            "error": f"Error in Arxiv search: {str(e)}"
        })
# ========== FILE/DATA TOOLS ==========
@tool
def read_text_based_file(
    file_reference: str,
    read_html_as_markdown: bool = True,
    extract_images: bool = False,
    agent=None,
) -> str:
    """
    Read text-based files and return content as text.

    This is the general-purpose text file reader for most formats.
    Use this tool when you need file content - for specialized analysis, use dedicated tools.

    Supported file types:
    - Text files: .txt, .md, .log, .rtf
    - Code files: .py, .js, .ts, .html, .htm, .css, .sql, .java, .cpp, .c, .php, .rb, .go
    - Configuration files: .ini, .cfg, .conf, .env, .properties, .yaml, .yml
    - Structured text: .json, .xml, .svg
    - Web files: .html, .htm (see HTML options below)
    - Documentation: .md, .rst, .tex
    - Office documents: .docx, .xlsx, .pptx (converted to Markdown for clean text extraction)
    - PDF files: .pdf (extracts text content as Markdown)

    HTML-specific options:
    - read_html_as_markdown (bool, default=True): Converts HTML to Markdown to save tokens
      and improve readability. Set to False only when you need the raw HTML structure
      (e.g., extracting specific tags, attributes, CSS, or JavaScript).

    Image extraction options:
    - extract_images (bool, default=False): For PDF and Office documents. If True,
      extracts embedded images and includes them in the response. Images are saved
      to temporary files and can be referenced in future turns. Set to True only
      when you need the images - saves tokens and processing time when False.

    XLSX/Excel: When to use this tool vs analyze_excel_file?
    - read_text_based_file: Use for simple text extraction - reads Excel content as Markdown
      tables. Good for understanding document structure and reading text content.
    - analyze_excel_file: Use when you need data analysis, statistics, pandas operations,
      charts, or need to query/transform the data.

    The tool automatically:
    - Detects file encoding and handles multiple encodings (UTF-8, Latin-1, CP1252, ISO-8859-1)
    - Resolves filenames to full file paths
    - Downloads files from URLs
    - Provides file metadata (name, size, encoding) in results

    Args:
        file_reference (str): Original filename from user upload OR URL to download.
        read_html_as_markdown (bool): For HTML files. If True (default), converts HTML to
            Markdown for token efficiency and readability. Set False only when raw HTML
            structure is needed.
        extract_images (bool): For PDF/Office files. If True, extracts embedded images
            along with text. Default False (backward compatible).
        agent: Agent instance for file resolution (injected automatically).

    Returns:
        str: The text content of the file with metadata, or an error message if reading fails.
    """
    from .file_utils import FileUtils
    from .local_path_text import read_local_path_to_plain_text
    ref = (file_reference or "").strip()
    # Resolve the logical reference (upload name or URL) to a local path.
    file_path = FileUtils.resolve_file_reference(ref, agent)
    if not file_path:
        return FileUtils.create_tool_response("read_text_based_file", error=f"File not found: {ref}")
    file_info = FileUtils.get_file_info(file_path)
    if not file_info.exists:
        return FileUtils.create_tool_response("read_text_based_file", error=file_info.error)
    content, read_err, enc, image_paths, markdown_path = read_local_path_to_plain_text(
        file_path, read_html_as_markdown=read_html_as_markdown, extract_images=extract_images, _file_info=file_info
    )
    if read_err:
        return FileUtils.create_tool_response(
            "read_text_based_file",
            error=read_err,
            file_info=FileUtils.file_info_for_tool_response(file_info, ref),
        )
    display_name = ref
    size_str = FileUtils.format_file_size(file_info.size)
    extl = (file_info.extension or "").lower()
    # Header advertises how the content was produced (conversion / encoding).
    if extl == ".html" and read_html_as_markdown:
        header = f"File: {display_name} ({size_str}, converted to Markdown)"
    elif extl == ".html" and not read_html_as_markdown:
        header = f"File: {display_name} ({size_str}, raw HTML)"
    elif enc and enc != "utf-8":
        header = f"File: {display_name} ({size_str}, {enc} encoding)"
    else:
        header = f"File: {display_name} ({size_str})"
    result_text = f"{header}\n\nContent:\n{content}"
    extra = {}
    # Register extracted images with the agent so later turns can refer to
    # them by logical name; fall back to raw paths when no agent is present.
    if image_paths and agent is not None:
        registered_names = []
        base_name = Path(ref).stem
        for idx, abs_path in enumerate(image_paths):
            try:
                logical_name = f"{base_name}_image_{idx + 1}{Path(abs_path).suffix}"
                agent.register_file(logical_name, abs_path)
                registered_names.append(logical_name)
                logger.debug("Registered extracted image: %s -> %s", logical_name, abs_path)
            except Exception as reg_err:
                logger.warning("Failed to register image %s: %s", abs_path, reg_err)
        if registered_names:
            extra["image_paths"] = registered_names
    elif image_paths:
        extra["image_paths"] = image_paths
    # Same pattern for the extracted-markdown companion file.
    if markdown_path and agent is not None:
        try:
            md_name = f"{Path(ref).stem}_extracted.md"
            agent.register_file(md_name, markdown_path)
            extra["markdown_path"] = md_name
            logger.debug("Registered markdown: %s -> %s", md_name, markdown_path)
        except Exception as reg_err:
            logger.warning("Failed to register markdown: %s", reg_err)
            # Registration failed — fall back to exposing the raw path.
            if markdown_path:
                extra["markdown_path"] = markdown_path
    elif markdown_path:
        extra["markdown_path"] = markdown_path
    return FileUtils.create_tool_response(
        "read_text_based_file",
        result=result_text,
        file_info=FileUtils.file_info_for_tool_response(file_info, ref),
        extra=extra if extra else None,
    )
# ========== PANDAS QUERY/PIPELINE HELPERS ==========
def _safe_to_markdown(df: pd.DataFrame, max_rows: int = 10, max_cols: int = 20) -> str:
preview_df = df.head(max_rows)
if max_cols is not None:
preview_df = preview_df.iloc[:, :max_cols]
try:
return preview_df.to_markdown(index=False)
except Exception:
return preview_df.to_string(index=False)
def _dataframe_schema(df: pd.DataFrame) -> dict[str, str]:
return {str(col): str(dtype) for col, dtype in df.dtypes.items()}
def _truncate_records(df: pd.DataFrame, max_rows: int = 100, max_cols: int = 50, max_cell_chars: int = 500) -> list[dict[str, Any]]:
limited = df.head(max_rows)
if max_cols is not None:
limited = limited.iloc[:, :max_cols]
def _truncate_val(v: Any) -> Any:
try:
s = str(v)
except Exception:
return v
if len(s) > max_cell_chars:
return s[: max_cell_chars - 1] + "…"
return v
return [{k: _truncate_val(v) for k, v in row.items()} for row in limited.to_dict(orient="records")]
# Whitelist of pipeline operations accepted by _dispatch_pipeline.
# "df_method" entries are forwarded verbatim to the matching pd.DataFrame
# method (step keys other than "op" become keyword arguments); "special"
# entries receive bespoke handling in _dispatch_pipeline.
_ALLOWED_OPS: dict[str, Literal["df_method", "special"]] = {
    "query": "df_method",
    "assign": "df_method",
    "rename": "df_method",
    "drop": "df_method",
    "dropna": "df_method",
    "fillna": "df_method",
    "astype": "df_method",
    "sort_values": "df_method",
    "head": "df_method",
    "tail": "df_method",
    "sample": "df_method",
    "value_counts": "df_method",
    "nlargest": "df_method",
    "nsmallest": "df_method",
    "reset_index": "df_method",
    "set_index": "df_method",
    "pivot_table": "df_method",
    "melt": "df_method",
    "stack": "df_method",
    "unstack": "df_method",
    "groupby": "special",  # requires 'agg' or size=true in the step
}
def _coerce_tabular(obj: Any, step_name: str) -> pd.DataFrame:
if isinstance(obj, pd.DataFrame):
return obj
if isinstance(obj, pd.Series):
return obj.to_frame(name=step_name or "value").reset_index()
return pd.DataFrame(obj)
def _dispatch_pipeline(df: pd.DataFrame, steps: list[dict[str, Any]]) -> pd.DataFrame:
    """Run a whitelisted sequence of pandas operations over ``df``.

    Each step is a dict with an ``op`` key naming an operation allowed by
    ``_ALLOWED_OPS``; remaining keys are forwarded as keyword arguments to
    the DataFrame method. ``groupby`` is handled specially and must carry
    either an ``agg`` mapping or ``size: true``.

    Raises:
        ValueError: for malformed steps or disallowed operations.
    """
    frame = df
    for index, step in enumerate(steps):
        if not isinstance(step, dict):
            raise ValueError(f"Pipeline step {index} must be an object")
        op = step.get("op")
        # Reject non-string ops and dunder names to keep getattr safe.
        if not isinstance(op, str) or op.startswith("__"):
            raise ValueError(f"Invalid op at step {index}")
        kind = _ALLOWED_OPS.get(op)
        if kind is None:
            raise ValueError(f"Op '{op}' not allowed")
        if kind == "special":
            if op != "groupby":
                raise ValueError(f"Unsupported special op: {op}")
            grouped = frame.groupby(by=step.get("by"), dropna=False, observed=False)
            if "agg" in step:
                frame = _coerce_tabular(grouped.agg(step.get("agg")), op)
            elif step.get("size") is True:
                frame = grouped.size().reset_index(name="size")
            else:
                raise ValueError("groupby requires 'agg' or size=true")
            continue
        bound = getattr(frame, op, None)
        if bound is None or not callable(bound):
            raise ValueError(f"Method '{op}' not available on DataFrame")
        forwarded = {key: value for key, value in step.items() if key != "op"}
        frame = _coerce_tabular(bound(**forwarded) if forwarded else bound(), op)
    return frame
def _apply_pandas_query(
    df: pd.DataFrame,
    query: str | None,
    preview_opts: dict[str, Any] | None = None,
    plot_opts: dict[str, Any] | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Apply an optional query/pipeline to ``df`` and build a preview payload.

    ``query`` may be:
      - a JSON object with ``pipeline`` (list of steps) or ``expr`` plus
        optional ``plot``/``preview`` settings,
      - a JSON array of pipeline steps,
      - an ``expr:``-prefixed pandas query expression,
      - a bare pandas query expression.

    Returns:
        The transformed frame and a payload dict with shapes, a Markdown
        preview, truncated records, optionally a schema, a describe()
        summary and base64-encoded plots.

    Raises:
        ValueError: when the query cannot be parsed or applied.
    """
    preview = preview_opts or {"rows": 10, "cols": 20, "include_schema": True}
    plots: list[str] = []
    original_shape = tuple(df.shape)
    transformed = df
    if query and isinstance(query, str) and query.strip():
        q = query.strip()
        try:
            if q.startswith("{") and q.endswith("}"):
                cfg = json.loads(q)
                if isinstance(cfg.get("pipeline"), list):
                    transformed = _dispatch_pipeline(df, cfg["pipeline"])  # type: ignore[arg-type]
                elif isinstance(cfg.get("expr"), str):
                    transformed = df.query(cfg["expr"])  # type: ignore[arg-type]
                plot_opts = cfg.get("plot") or plot_opts
                preview = cfg.get("preview") or preview
            elif q.startswith("[") and q.endswith("]"):
                steps = json.loads(q)
                transformed = _dispatch_pipeline(df, steps)
            elif q.lower().startswith("expr:"):
                expr = q.split(":", 1)[1].strip()
                transformed = df.query(expr)
            else:
                transformed = df.query(q)
        except Exception as e:
            # Chain the cause so callers can inspect the root failure.
            raise ValueError(f"Failed to apply query: {e}") from e
    if plot_opts and MATPLOTLIB_AVAILABLE and plt is not None:
        try:
            kind = plot_opts.get("kind", "bar")
            x = plot_opts.get("x")
            y = plot_opts.get("y")
            fig = plt.figure()
            ax = fig.gca()
            data = transformed
            if x is None and y is None and kind in ("bar", "barh"):
                # No axes requested: default to value counts of the first
                # non-numeric (or simply first) column.
                non_numeric = [c for c in data.columns if not pd.api.types.is_numeric_dtype(data[c])]
                target_col = non_numeric[0] if non_numeric else data.columns[0]
                vc = data[target_col].value_counts().head(20)
                vc.plot(kind=kind, ax=ax)
            else:
                data.plot(kind=kind, x=x, y=y, ax=ax)
            plot_path = os.path.join(tempfile.gettempdir(), f"df_plot_{uuid.uuid4().hex}.png")
            fig.savefig(plot_path, bbox_inches="tight")
            plt.close(fig)
            plots.append(encode_image(plot_path))
        except Exception as plot_err:
            # Plotting is best-effort; record why it failed instead of
            # silently discarding the error.
            logger.debug("Plot generation failed: %s", plot_err)
    rows = int(preview.get("rows", 10))
    cols = int(preview.get("cols", 20))
    include_schema = bool(preview.get("include_schema", True))
    table_markdown = _safe_to_markdown(transformed, rows, cols)
    table_records = _truncate_records(transformed, max_rows=min(rows, 1000), max_cols=min(cols, 100))
    payload: dict[str, Any] = {
        "original_shape": original_shape,
        "shape": tuple(transformed.shape),
        "table_markdown": table_markdown,
        "table_records": table_records,
    }
    if include_schema:
        payload["schema"] = _dataframe_schema(transformed)
    try:
        if transformed.shape[0] <= 5000 and transformed.shape[1] <= 50:
            # `datetime_is_numeric=True` was removed in pandas 2.0 and made
            # every describe() call here fail silently on pandas 2.x;
            # describe(include="all") works on both 1.x and 2.x.
            payload["describe_summary"] = str(transformed.describe(include="all"))
    except Exception as describe_err:
        logger.debug("describe() failed: %s", describe_err)
    if plots:
        payload["plots"] = plots
    return transformed, payload
@tool
def analyze_csv_file(file_reference: str, query: str, agent=None) -> str:
    """
    Analyze CSV files and return summary statistics and column information.
    This tool can process CSV files with various formats and encodings:
    - Standard CSV files: .csv
    - Tab-separated files: .tsv, .txt (with tab delimiters)
    - Comma-separated files with different encodings
    The tool automatically:
    - Detects delimiters and handles common CSV variations
    - Resolves filenames to full file paths via agent's file registry
    - Downloads files from URLs automatically
    - Provides comprehensive analysis including data types, statistics, and column information
    Args:
        file_reference (str): Original filename from user upload OR URL to download
        query (str): Optional analysis instruction: a bare pandas query expression,
            an "expr:"-prefixed expression, or a JSON pipeline/config object.
            Pass an empty string for a plain preview with summary statistics.
        agent: Agent instance for file resolution (injected automatically)
    Returns:
        str: Summary statistics and column information, or an error message if analysis fails.
    """
    from .file_utils import FileUtils
    # Resolve file reference (filename or URL) to full path
    file_path = FileUtils.resolve_file_reference(file_reference, agent)
    if not file_path:
        return FileUtils.create_tool_response("analyze_csv_file", error=f"File not found: {file_reference}")
    # Check file exists using utilities with Pydantic validation
    file_info = FileUtils.get_file_info(file_path)
    if not file_info.exists:
        return FileUtils.create_tool_response("analyze_csv_file", error=file_info.error)
    try:
        try:
            # sep=None with the python engine sniffs the delimiter, so
            # tab-separated (.tsv) and similar variants load correctly,
            # matching the delimiter-detection claim documented above.
            df = pd.read_csv(file_path, sep=None, engine="python")
        except Exception:
            # Sniffing can fail on unusual files; retry with pandas defaults.
            df = pd.read_csv(file_path)
        _, payload = _apply_pandas_query(
            df,
            query=query if isinstance(query, str) and query.strip() else None,
            preview_opts=None,
            plot_opts=None,
        )
        header = (
            f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
            f"File: {file_info.name} ({FileUtils.format_file_size(file_info.size)})\n"
        )
        result_parts = [header]
        if payload.get("table_markdown"):
            result_parts.append("Preview:\n" + payload["table_markdown"])
        if payload.get("describe_summary"):
            result_parts.append("\n\nSummary statistics:\n" + str(payload["describe_summary"]))
        result_text = "\n".join(result_parts)
        return FileUtils.create_tool_response(
            "analyze_csv_file",
            result=result_text,
            file_info=file_info,
            extra=payload,
        )
    except Exception as e:
        return FileUtils.create_tool_response("analyze_csv_file", error=f"Error analyzing CSV file: {str(e)}")
@tool
def analyze_excel_file(file_reference: str, query: str, agent=None) -> str:
    """
    Analyze Excel files and return summary statistics and column information.
    This tool can process Excel files in various formats:
    - Excel files: .xlsx, .xls
    - Excel workbooks with multiple sheets
    - Excel files with different encodings and formats
    When to use this tool vs read_text_based_file?
    - Use analyze_excel_file for: data analysis, statistics, pandas operations, charts,
      querying/transforming data, understanding numerical patterns, describe() summaries.
    - Use read_text_based_file for: simple text extraction, reading cell content as text,
      understanding document structure when you don't need analytics.
    The tool automatically:
    - Detects sheet structure and provides comprehensive analysis
    - Resolves filenames to full file paths via agent's file registry
    - Downloads files from URLs automatically
    - Provides data types, statistics, column information, and sheet details
    Args:
        file_reference (str): Original filename from user upload OR URL to download
        query (str): Optional analysis instruction: a bare pandas query expression,
            an "expr:"-prefixed expression, or a JSON pipeline/config object
            (forwarded to the pandas query helper). Empty string gives a plain preview.
        agent: Agent instance for file resolution (injected automatically)
    Returns:
        str: Summary statistics and column information, or an error message if analysis fails.
    """
    from .file_utils import FileUtils
    # Resolve file reference (filename or URL) to full path
    file_path = FileUtils.resolve_file_reference(file_reference, agent)
    if not file_path:
        return FileUtils.create_tool_response("analyze_excel_file", error=f"File not found: {file_reference}")
    # Check file exists using utilities with Pydantic validation
    file_info = FileUtils.get_file_info(file_path)
    if not file_info.exists:
        return FileUtils.create_tool_response("analyze_excel_file", error=file_info.error)
    try:
        # NOTE: reads the first sheet only (pandas default).
        df = pd.read_excel(file_path)
        _, payload = _apply_pandas_query(
            df,
            query=query if isinstance(query, str) and query.strip() else None,
            preview_opts=None,
            plot_opts=None,
        )
        header = (
            f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
            f"File: {file_info.name} ({FileUtils.format_file_size(file_info.size)})\n"
        )
        result_parts = [header]
        if payload.get("table_markdown"):
            result_parts.append("Preview:\n" + payload["table_markdown"])
        if payload.get("describe_summary"):
            result_parts.append("\n\nSummary statistics:\n" + str(payload["describe_summary"]))
        result_text = "\n".join(result_parts)
        return FileUtils.create_tool_response(
            "analyze_excel_file",
            result=result_text,
            file_info=file_info,
            extra=payload,
        )
    except Exception as e:
        # Enhanced error reporting: print columns and head if possible
        # (re-reads the file deliberately — the analysis may have failed
        # after a successful load).
        try:
            df = pd.read_excel(file_path)
            columns = list(df.columns)
            head = df.head().to_dict("records")
            error_details = f"Error analyzing Excel file: {str(e)}\nColumns: {columns}\nHead: {head}"
        except Exception as inner_e:
            error_details = f"Error analyzing Excel file: {str(e)}\nAdditionally, failed to read columns/head: {str(inner_e)}"
        return FileUtils.create_tool_response("analyze_excel_file", error=error_details)
# ========== IMAGE ANALYSIS/GENERATION TOOLS ==========
@tool
def analyze_image(file_reference: str, agent=None) -> str:
    """
    LEGACY: Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).
    ⚠️ This is a primitive metadata parser that only extracts technical properties.
    For AI-powered image understanding, use analyze_image_ai() instead.
    This tool provides:
    - Image dimensions and format
    - Basic color analysis (average RGB, brightness, dominant color)
    - Thumbnail generation
    For semantic understanding (what's in the image, OCR, object detection, etc.),
    use analyze_image_ai() which uses vision-language models.
    The tool automatically:
    - Resolves filenames to full file paths via agent's file registry
    - Downloads files from URLs automatically
    - Provides comprehensive image analysis including dimensions, color analysis, and thumbnails
    Args:
        file_reference (str): Original filename from user upload OR URL to download
        agent: Agent instance for file resolution (injected automatically)
    Returns:
        str: JSON string with analysis results including dimensions, mode, color_analysis, and thumbnail.
    """
    from .file_utils import FileUtils
    try:
        # Resolve file reference (filename or URL) to full path
        file_path = FileUtils.resolve_file_reference(file_reference, agent)
        if not file_path:
            return FileUtils.create_tool_response("analyze_image", error=f"File not found: {file_reference}")
        # Open image from file path
        img = Image.open(file_path)
        width, height = img.size
        mode = img.mode
        if mode in ("RGB", "RGBA"):
            arr = np.array(img)
            # Use only the RGB channels: for RGBA inputs the alpha channel
            # previously skewed both average_rgb and brightness.
            rgb = arr[..., :3]
            avg_colors = rgb.mean(axis=(0, 1))
            dominant = ["Red", "Green", "Blue"][int(np.argmax(avg_colors))]
            brightness = float(avg_colors.mean())
            color_analysis = {
                "average_rgb": avg_colors.tolist(),
                "brightness": brightness,
                "dominant_color": dominant,
            }
        else:
            color_analysis = {"note": f"No color analysis for mode {mode}"}
        thumbnail = img.copy()
        thumbnail.thumbnail((100, 100))
        thumb_path = save_image(thumbnail, "thumbnails")
        thumbnail_base64 = encode_image(thumb_path)
        result = {
            "dimensions": (width, height),
            "mode": mode,
            "color_analysis": color_analysis,
            "thumbnail": thumbnail_base64,
        }
        return FileUtils.create_tool_response("analyze_image", result=json.dumps(result))
    except Exception as e:
        return FileUtils.create_tool_response("analyze_image", error=str(e))
@tool
def analyze_image_ai(
    file_reference: str,
    prompt: str,
    system_prompt: str | None = None,
    agent=None
) -> str:
    """
    AI-powered image analysis using vision-language models.
    This tool uses advanced vision-language models to understand image content semantically:
    - Describe what's in the image
    - Answer questions about the image
    - Extract text (OCR with 100% accuracy)
    - Identify objects, people, scenes
    - Analyze charts, graphs, diagrams
    - Read documents and forms
    For basic metadata (dimensions, colors), use the legacy analyze_image() instead.
    Args:
        file_reference (str): Uploaded image filename, or a URL to an image.
        prompt (str): Question or instruction about the image (e.g., "What's in this image?")
        system_prompt (str, optional): Reserved for future use; currently not
            forwarded to the vision model.
        agent: Agent instance for file resolution (injected automatically)
    Returns:
        str: JSON string with AI analysis result or error message
    """
    from .file_utils import FileUtils
    try:
        from agent_ng.vision_input import VisionInput
        from agent_ng.vision_tool_manager import VisionToolManager
        lowered_ref = file_reference.strip().lower()
        is_direct_url = lowered_ref.startswith(("http://", "https://"))
        if is_direct_url:
            # URLs are passed straight through — no local download needed.
            vision_input = VisionInput(
                prompt=prompt,
                image_url=file_reference.strip(),
            )
        else:
            file_path = FileUtils.resolve_file_reference(file_reference, agent)
            if not file_path:
                return FileUtils.create_tool_response(
                    "analyze_image_ai",
                    error=f"File not found: {file_reference}",
                )
            if file_path.lower().endswith(".pdf"):
                return FileUtils.create_tool_response(
                    "analyze_image_ai",
                    error="PDF files cannot be analyzed directly as images. "
                    "Most vision models don't support PDFs natively. "
                    "To analyze visual PDF content:\n"
                    "1. First: read_text_based_file(file_reference='DOC.pdf', extract_images=True)\n"
                    "   - This extracts images from PDF pages\n"
                    "2. Then: analyze the extracted images individually\n"
                    "   - Use analyze_image_ai for each extracted image path",
                )
            vision_input = VisionInput(prompt=prompt, image_path=file_path)
        # Initialize VisionToolManager; skip pricing fetch to keep the
        # tool call fast (os is already imported at module level).
        os.environ["OPENROUTER_FETCH_PRICING_AT_STARTUP"] = "false"
        manager = VisionToolManager()
        # Analyze image
        result = manager.analyze(vision_input)
        # Return result
        return FileUtils.create_tool_response(
            "analyze_image_ai",
            result=result,
            extra={
                "file": file_reference,
                "model_used": manager.vl_model
            }
        )
    except Exception as e:
        return FileUtils.create_tool_response(
            "analyze_image_ai",
            error=f"Analysis failed: {str(e)}"
        )
class TransformImageParams(BaseModel):
    """Per-operation parameters for the transform_image tool.

    NOTE(review): this schema declares only the operation parameters,
    while transform_image's signature is (image_base64, operation,
    params) — confirm the args_schema wiring matches how the tool is
    actually invoked.
    """
    # resize
    width: int | None = Field(None, description="New width for resize operation")
    height: int | None = Field(None, description="New height for resize operation")
    # rotate
    angle: int | None = Field(None, description="Rotation angle in degrees")
    # flip
    direction: Literal["horizontal", "vertical"] | None = Field(None, description="Flip direction")
    # blur
    radius: float | None = Field(None, description="Blur radius")
    # brightness / contrast
    factor: float | None = Field(None, description="Enhancement factor for brightness/contrast")
@tool(args_schema=TransformImageParams)
def transform_image(image_base64: str, operation: str, params: dict[str, Any] | None = None) -> str:
    """
    Transform an image using various operations like resize, rotate, filter, etc.

    Supported operations: "resize", "rotate", "flip", "blur", "sharpen",
    "brightness", "contrast". Unknown operations produce an error response.

    Args:
        image_base64 (str): The base64-encoded string of the image to transform.
        operation (str): The transformation operation to apply.
        params (Dict[str, Any], optional): Parameters for the transformation
            (see TransformImageParams for the per-operation keys).
    Returns:
        str: JSON string with the transformed image as base64 or error message.
    """
    try:
        img = decode_image(image_base64)
        params = params or {}
        if operation == "resize":
            # Missing dimensions default to the current size (no-op on that axis).
            width = params.get("width", img.width)
            height = params.get("height", img.height)
            img = img.resize((width, height), Image.Resampling.LANCZOS)
        elif operation == "rotate":
            angle = params.get("angle", 0)
            # expand=True grows the canvas so rotated corners are not clipped.
            img = img.rotate(angle, expand=True)
        elif operation == "flip":
            direction = params.get("direction", "horizontal")
            if direction == "horizontal":
                img = img.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
            else:
                img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM)
        elif operation == "blur":
            radius = params.get("radius", 2)
            img = img.filter(ImageFilter.GaussianBlur(radius=radius))
        elif operation == "sharpen":
            # Fixed unsharp-mask settings; not parameterized.
            img = img.filter(ImageFilter.UnsharpMask(radius=2, percent=150, threshold=3))
        elif operation == "brightness":
            # factor 1.0 = unchanged, <1 darker, >1 brighter.
            factor = params.get("factor", 1.0)
            enhancer = ImageEnhance.Brightness(img)
            img = enhancer.enhance(factor)
        elif operation == "contrast":
            factor = params.get("factor", 1.0)
            enhancer = ImageEnhance.Contrast(img)
            img = enhancer.enhance(factor)
        else:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "transform_image",
                "error": f"Unsupported operation: {operation}"
            }, indent=2)
        result_path = save_image(img)
        result_base64 = encode_image(result_path)
        return json.dumps({
            "type": "tool_response",
            "tool_name": "transform_image",
            "transformed_image": result_base64
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "transform_image",
            "error": str(e)
        }, indent=2)
class DrawOnImageParams(BaseModel):
    """Parameter schema for the draw_on_image tool (one field set per drawing type)."""
    # "text" drawing type
    text: str | None = Field(None, description="Text to draw")
    position: list[int] | None = Field(None, description="Text position [x, y]")
    # Shared styling
    color: str | None = Field(None, description="Color name (e.g., 'red', 'blue') or RGB string (e.g., '255,0,0')")
    size: int | None = Field(None, description="Font size for text")
    # "rectangle" drawing type
    coords: list[int] | None = Field(None, description="Rectangle coordinates [x1, y1, x2, y2]")
    # "circle" drawing type
    center: list[int] | None = Field(None, description="Circle center [x, y]")
    radius: int | None = Field(None, description="Circle radius")
    # "line" drawing type
    start: list[int] | None = Field(None, description="Line start [x, y]")
    end: list[int] | None = Field(None, description="Line end [x, y]")
    width: int | None = Field(None, description="Stroke width")
@tool(args_schema=DrawOnImageParams)
def draw_on_image(image_base64: str, drawing_type: str, params: DrawOnImageParams) -> str:
    """
    Draw shapes, text, or other elements on an image.

    Supported drawing types: "text", "rectangle", "circle", "line".

    Args:
        image_base64 (str): The base64-encoded string of the image to draw on.
        drawing_type (str): The type of drawing to perform.
        params (DrawOnImageParams): Parameters for the drawing operation
            (see the field comments on DrawOnImageParams).
    Returns:
        str: JSON string with the modified image as base64 or error message.
    """
    try:
        img = decode_image(image_base64)
        draw = ImageDraw.Draw(img)

        def parse_color(color_str):
            """Parse a color string to an RGB tuple, or pass through a color name."""
            if not color_str:
                return "black"
            # Check if it's RGB values as comma-separated string
            if "," in color_str and color_str.replace(",", "").replace(" ", "").isdigit():
                try:
                    rgb_values = [int(x.strip()) for x in color_str.split(",")]
                    if len(rgb_values) == 3 and all(0 <= v <= 255 for v in rgb_values):
                        return tuple(rgb_values)
                except ValueError:
                    pass
            # Return as color name
            return color_str

        if drawing_type == "text":
            text = params.text or ""
            position = params.position or [10, 10]
            color = parse_color(params.color) or "black"
            size = params.size or 20
            try:
                font = ImageFont.truetype("arial.ttf", size)
            except OSError:
                # Font file not available (truetype raises OSError); fall
                # back to PIL's built-in default font.
                font = ImageFont.load_default()
            draw.text(tuple(position), text, fill=color, font=font)
        elif drawing_type == "rectangle":
            coords = params.coords or [10, 10, 100, 100]
            color = parse_color(params.color) or "red"
            width = params.width or 2
            draw.rectangle(coords, outline=color, width=width)
        elif drawing_type == "circle":
            center = params.center or [50, 50]
            radius = params.radius or 30
            color = parse_color(params.color) or "blue"
            width = params.width or 2
            # PIL draws ellipses from a bounding box, so derive it from
            # center and radius.
            bbox = [center[0] - radius, center[1] - radius,
                    center[0] + radius, center[1] + radius]
            draw.ellipse(bbox, outline=color, width=width)
        elif drawing_type == "line":
            start = params.start or [10, 10]
            end = params.end or [100, 100]
            color = parse_color(params.color) or "green"
            width = params.width or 2
            draw.line([tuple(start), tuple(end)], fill=color, width=width)
        else:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "draw_on_image",
                "error": f"Unsupported drawing type: {drawing_type}"
            }, indent=2)
        result_path = save_image(img)
        result_base64 = encode_image(result_path)
        return json.dumps({
            "type": "tool_response",
            "tool_name": "draw_on_image",
            "modified_image": result_base64
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "draw_on_image",
            "error": str(e)
        }, indent=2)
# ------------------------------------------------------------------------- #
# AI image generation (OpenRouter) #
# ------------------------------------------------------------------------- #
# Root for session-isolated image outputs. Patched in tests. Matches the
# convention established by ``tools/file_utils.py::save_base64_to_file``.
# Generated files land under ``{_IMAGE_OUTPUT_ROOT}/{session_id}/`` (see
# _write_image_bytes below).
_IMAGE_OUTPUT_ROOT = ".gradio/sessions"
def _build_generate_ai_image_description() -> str:
    """Compose the LLM-facing tool description.

    Appends the active model's prompt-style hint (never the slug or
    vendor) when one is available, so the calling LLM can adapt prompt
    craft to the configured backend.
    """
    body = (
        "Create a new image from a text description.\n\n"
        "Use this when the user asks for an illustration, icon, "
        "diagram, business infographic, logo, banner, social-media "
        "graphic, or any other visual that does not yet exist. The "
        "generated image comes back as a chat attachment reference "
        "that you can pass to other tools (for example, to attach the "
        "image to a record, analyze it, or transform it).\n\n"
        "`aspect_ratio` lets you request a specific shape (for example "
        "`16:9` for a banner, `9:16` for a mobile story, `1:1` for a "
        "square icon). `image_size` lets you request a resolution tier "
        "(`1K` for small/fast, `2K` for sharper details, `4K` for "
        "print-quality). Only set these when the composition really "
        "depends on shape or size.\n\n"
        "Returns a structured result containing the image reference "
        "and the generation cost. If the call fails, the result "
        "contains an `error` message explaining why — simplify the "
        "prompt and try again."
    )
    # The hint accessor may be absent (import failed) or return nothing.
    hint = get_default_prompt_style_hint() if get_default_prompt_style_hint is not None else None
    if not hint:
        return body
    return f"{body}\n\nActive image generator profile: {hint}"
_GENERATE_AI_IMAGE_DESCRIPTION = _build_generate_ai_image_description()
class GenerateAIImageParams(BaseModel):
    """Parameters for the `generate_ai_image` tool.

    Deliberately does not expose model selection to the LLM — the active
    model is an operations-level decision controlled by the
    ``IMAGE_GEN_DEFAULT_MODEL`` environment variable (see
    :mod:`agent_ng.image_models`).
    """
    # Needed so the Annotated/InjectedToolArg agent field validates.
    model_config = ConfigDict(arbitrary_types_allowed=True)
    prompt: str = Field(
        ...,
        description=(
            "Natural-language description of the image to create. Spell "
            "out any text that must appear in the picture exactly as it "
            "should be rendered."
        ),
    )
    aspect_ratio: str | None = Field(
        None,
        description=(
            "Optional desired shape of the image, written as 'W:H' "
            "(for example '1:1' for a square icon, '16:9' for a banner, "
            "'9:16' for a mobile story)."
        ),
    )
    image_size: Literal["1K", "2K", "4K"] | None = Field(
        None,
        description=(
            "Optional resolution tier: '1K' for small/fast, '2K' for "
            "sharper details, '4K' for print-quality."
        ),
    )
    # InjectedToolArg: supplied by the runtime, hidden from the LLM schema.
    agent: Annotated[Any | None, InjectedToolArg] = Field(
        default=None,
        description="Runtime-injected; not supplied by the LLM.",
    )
@tool(
    "generate_ai_image",
    args_schema=GenerateAIImageParams,
    description=_GENERATE_AI_IMAGE_DESCRIPTION,
)
def generate_ai_image(
    prompt: str,
    aspect_ratio: str | None = None,
    image_size: str | None = None,
    agent: Annotated[Any | None, InjectedToolArg] = None,
) -> dict[str, Any]:
    """Create a new image from a text description.

    The image generator is chosen by the deployment (via
    ``IMAGE_GEN_DEFAULT_MODEL``); the calling LLM only decides on the
    prompt, aspect ratio and size.

    Returns a structured result. On success it includes a
    ``file_reference`` (an attachment name usable by other chat tools),
    the ``cost`` in USD, the output ``mime_type`` and ``size_bytes``. On
    failure it includes an ``error`` string and no attachment.
    """
    # Engine import failed at module load time — report, don't crash.
    if ImageEngine is None:
        return {
            "success": False,
            "error": "Image generation engine is not available.",
            "file_reference": None,
            "cost": None,
        }
    try:
        engine = ImageEngine()
    except ValueError as exc:
        # The engine constructor rejects bad/missing configuration.
        return {
            "success": False,
            "error": str(exc),
            "file_reference": None,
            "cost": None,
        }
    result = engine.generate(
        prompt=prompt,
        aspect_ratio=aspect_ratio,
        image_size=image_size,
    )
    if not result.success or result.image_bytes is None:
        return {
            "success": False,
            "error": result.error or "image generation failed",
            "file_reference": None,
            "cost": result.cost,
        }
    # Resolve session id: prefer the injected agent, fall back to ContextVar.
    session_id: str | None = getattr(agent, "session_id", None) if agent else None
    if not session_id and get_current_session_id is not None:
        session_id = get_current_session_id()
    # Write bytes to disk — either into a session dir (preferred) or mkstemp.
    ext = _extension_for_mime(result.mime_type)
    display_name = _make_display_name(ext)
    try:
        disk_path = _write_image_bytes(
            image_bytes=result.image_bytes,
            display_name=display_name,
            session_id=session_id,
        )
    except OSError as exc:
        return {
            "success": False,
            "error": f"Failed to write image bytes: {exc}",
            "file_reference": None,
            "cost": result.cost,
        }
    # Register with the agent if we have one.
    if agent is not None and callable(getattr(agent, "register_file", None)):
        try:
            agent.register_file(display_name, disk_path)
        except Exception as exc:
            logger.warning("register_file failed for %s: %s", display_name, exc)
            # Registration failed: remove the orphaned file so it doesn't leak.
            try:
                os.unlink(disk_path)
            except OSError as oe:
                logger.debug("temp cleanup after register failure: %s", oe)
            return {
                "success": False,
                "error": f"register_file failed: {exc}",
                "file_reference": None,
                "cost": result.cost,
            }
        # Registered: other chat tools can refer to the image by name.
        file_reference = display_name
    else:
        # No agent to register with; hand back an absolute filesystem path.
        file_reference = os.path.abspath(disk_path)
    return {
        "success": True,
        "error": None,
        "file_reference": file_reference,
        "cost": result.cost,
        "mime_type": result.mime_type,
        "size_bytes": len(result.image_bytes),
    }
# ---- helpers for generate_ai_image -------------------------------------- #
def _extension_for_mime(mime: str | None) -> str:
"""Map a MIME type to a canonical file extension (falls back to .png)."""
mapping = {
"image/png": ".png",
"image/jpeg": ".jpg",
"image/jpg": ".jpg",
"image/webp": ".webp",
"image/gif": ".gif",
}
return mapping.get((mime or "").lower(), ".png")
def _make_display_name(ext: str) -> str:
"""Compose a unique, human-readable filename for a generated image."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return f"llm_image_{timestamp}_{uuid.uuid4().hex[:8]}{ext}"
def _write_image_bytes(
image_bytes: bytes,
display_name: str,
session_id: str | None,
) -> str:
"""Persist ``image_bytes`` and return the absolute on-disk path.
When ``session_id`` is truthy, files go under
``{_IMAGE_OUTPUT_ROOT}/{session_id}/``; otherwise a ``tempfile.mkstemp``
path is used.
"""
if session_id:
session_dir = Path(_IMAGE_OUTPUT_ROOT) / session_id
session_dir.mkdir(parents=True, exist_ok=True)
path = session_dir / display_name
path.write_bytes(image_bytes)
return str(path)
fd, tmp = tempfile.mkstemp(suffix=Path(display_name).suffix)
try:
with os.fdopen(fd, "wb") as f:
f.write(image_bytes)
except OSError:
# Best-effort cleanup of the partial temp file; then re-raise.
try:
os.unlink(tmp)
except OSError:
pass
raise
return tmp
class CombineImagesParams(BaseModel):
    """Optional parameters for the combine_images tool.

    NOTE: spacing/background_color/blend_mode/opacity are declared for the
    tool schema but are not yet applied by combine_images.
    """
    spacing: int | None = Field(None, description="Spacing between images in pixels")
    background_color: str | None = Field(None, description="Background color for collage (e.g., 'white', 'black') or RGB string (e.g., '255,255,255')")
    blend_mode: str | None = Field(None, description="Blend mode for blending operations")
    opacity: float | None = Field(None, description="Opacity for overlay operations (0.0-1.0)")
    # combine_images reads params.direction for the "stack" operation;
    # without this field, attribute access raises AttributeError.
    direction: Literal["horizontal", "vertical"] | None = Field(None, description="Stacking direction for the 'stack' operation")
@tool(args_schema=CombineImagesParams)
def combine_images(images_base64: list[str], operation: str,
                   params: CombineImagesParams | None = None) -> str:
    """
    Combine multiple images using various operations (horizontal, vertical, overlay, stack).

    Args:
        images_base64 (List[str]): List of base64-encoded image strings (at least 2).
        operation (str): One of "horizontal", "vertical", "overlay", "stack".
        params (CombineImagesParams, optional): Parameters for the combination;
            only ``direction`` (for "stack") is currently applied.
    Returns:
        str: JSON string with the combined image as base64 or error message.
    """
    try:
        if len(images_base64) < 2:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "combine_images",
                "error": "At least 2 images required for combination"
            }, indent=2)
        images = [decode_image(b64) for b64 in images_base64]
        if params is None:
            params = CombineImagesParams()
        if operation == "horizontal":
            # Combine images side by side
            total_width = sum(img.width for img in images)
            max_height = max(img.height for img in images)
            result = Image.new("RGB", (total_width, max_height))
            x_offset = 0
            for img in images:
                result.paste(img, (x_offset, 0))
                x_offset += img.width
        elif operation == "vertical":
            # Stack images vertically
            max_width = max(img.width for img in images)
            total_height = sum(img.height for img in images)
            result = Image.new("RGB", (max_width, total_height))
            y_offset = 0
            for img in images:
                result.paste(img, (0, y_offset))
                y_offset += img.height
        elif operation == "overlay":
            # Overlay images on top of each other
            base_img = images[0]
            for overlay_img in images[1:]:
                if overlay_img.size != base_img.size:
                    overlay_img = overlay_img.resize(base_img.size, Image.Resampling.LANCZOS)
                base_img = Image.alpha_composite(base_img.convert("RGBA"), overlay_img.convert("RGBA"))
            result = base_img.convert("RGB")
        elif operation == "stack":
            # Stack operation with direction parameter. getattr fallback:
            # older schema versions did not declare a `direction` field and
            # plain attribute access raised AttributeError.
            direction = getattr(params, "direction", None) or "horizontal"
            if direction == "horizontal":
                total_width = sum(img.width for img in images)
                max_height = max(img.height for img in images)
                result = Image.new("RGB", (total_width, max_height))
                x = 0
                for img in images:
                    result.paste(img, (x, 0))
                    x += img.width
            else:
                max_width = max(img.width for img in images)
                total_height = sum(img.height for img in images)
                result = Image.new("RGB", (max_width, total_height))
                y = 0
                for img in images:
                    result.paste(img, (0, y))
                    y += img.height
        else:
            return json.dumps({
                "type": "tool_response",
                "tool_name": "combine_images",
                "error": f"Unsupported combination operation: {operation}"
            }, indent=2)
        result_path = save_image(result)
        result_base64 = encode_image(result_path)
        return json.dumps({
            "type": "tool_response",
            "tool_name": "combine_images",
            "combined_image": result_base64
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "type": "tool_response",
            "tool_name": "combine_images",
            "error": str(e)
        }, indent=2)
# ========== VIDEO/AUDIO UNDERSTANDING TOOLS ==========
@tool
def understand_video(file_reference: str, prompt: str, system_prompt: str = None, agent=None,
                     start_time: str = None, end_time: str = None, fps: float = None) -> str:
    """
    Analyze a video using vision-language models (Gemini, Qwen).
    This tool uses VisionToolManager for video understanding with multiple model options.
    Automatically selects the best model based on video characteristics.
    Supports:
    - Uploaded video files
    - Direct video URLs
    - YouTube URLs (routed per VL_YOUTUBE_MODEL, VL_YOUTUBE_GEMINI_PROVIDER, VL_GEMINI_PROVIDER in .env)
    Args:
        file_reference (str): Original filename from user upload OR direct video URL
            OR YouTube URL
        prompt (str): A question or request regarding the video content
        system_prompt (str, optional): System instruction (accepted but not currently used)
        agent: Agent instance for file resolution (injected automatically)
        start_time (str, optional): Start time in MM:SS format (e.g., "02:30") —
            accepted but not currently forwarded to the vision backend
        end_time (str, optional): End time in MM:SS format (e.g., "03:29") —
            accepted but not currently forwarded to the vision backend
        fps (float, optional): Custom frame rate — accepted but not currently
            forwarded to the vision backend
    Returns:
        str: Analysis of the video content based on the prompt, or error message
    """
    from .file_utils import FileUtils
    try:
        from agent_ng.vision_input import VisionInput
        from agent_ng.vision_tool_manager import VisionToolManager
        # Handle direct URLs without forcing local download first
        lowered_ref = file_reference.strip().lower()
        is_direct_url = lowered_ref.startswith("http://") or lowered_ref.startswith("https://")
        if is_direct_url:
            vision_input = VisionInput(prompt=prompt, video_url=file_reference.strip())
            file_path = None
        else:
            # Resolve uploaded/local file reference
            file_path = FileUtils.resolve_file_reference(file_reference, agent)
            if not file_path:
                return FileUtils.create_tool_response(
                    "understand_video",
                    error=f"File not found: {file_reference}"
                )
            vision_input = VisionInput(
                prompt=prompt,
                video_path=file_path
            )
        # Validate input
        vision_input.validate()
        # Initialize VisionToolManager (skip pricing fetch for speed)
        import os
        os.environ["OPENROUTER_FETCH_PRICING_AT_STARTUP"] = "false"
        manager = VisionToolManager()
        # Analyze video using URL-aware manager routing
        result = manager.analyze(vision_input)
        selected_model = manager.get_model_for_input(vision_input)
        # Return result
        return FileUtils.create_tool_response(
            "understand_video",
            result=result,
            extra={
                "file": file_reference,
                "model_used": selected_model
            }
        )
    except Exception as e:
        return FileUtils.create_tool_response(
            "understand_video",
            error=f"Video analysis failed: {str(e)}"
        )
@tool
def understand_audio(file_reference: str, prompt: str, system_prompt: str = None, agent=None,
                     start_time: str = None, end_time: str = None) -> str:
    """
    Analyze an audio file using vision-language models (Gemini).
    This tool uses VisionToolManager for audio understanding.
    Automatically uses Gemini 2.5 Flash (only model with audio support).
    Args:
        file_reference (str): Original filename from user upload OR URL to download
        prompt (str): A question or request regarding the audio content
        system_prompt (str, optional): System instruction (not used currently)
        agent: Agent instance for file resolution (injected automatically)
        start_time (str, optional): Start time reference in MM:SS format (e.g., "02:30")
        end_time (str, optional): End time reference in MM:SS format (e.g., "03:29")
    Returns:
        str: Analysis of the audio content based on the prompt, or error message
    """
    from .file_utils import FileUtils
    try:
        from agent_ng.vision_input import VisionInput
        from agent_ng.vision_tool_manager import VisionToolManager

        # NOTE(review): system_prompt, start_time and end_time are accepted
        # for schema compatibility but are not used below.

        # Resolve the uploaded/local file reference to an absolute path.
        file_path = FileUtils.resolve_file_reference(file_reference, agent)
        if not file_path:
            return FileUtils.create_tool_response(
                "understand_audio",
                error=f"File not found: {file_reference}"
            )

        # NOTE(review): this instance is never passed to the manager —
        # analyze_audio() takes the raw path and prompt instead. Kept because
        # its constructor may validate the input and raise (caught below);
        # confirm before removing.
        vision_input = VisionInput(
            prompt=prompt,
            audio_path=file_path
        )

        # Disable the pricing fetch before constructing the manager; `os` is
        # already imported at module level, so no function-local import is needed.
        os.environ["OPENROUTER_FETCH_PRICING_AT_STARTUP"] = "false"
        manager = VisionToolManager()

        # Audio analysis routes to the dedicated audio-capable model.
        result = manager.analyze_audio(audio_path=file_path, prompt=prompt)

        return FileUtils.create_tool_response(
            "understand_audio",
            result=result,
            extra={
                "file": file_reference,
                "model_used": manager.vl_audio_model
            }
        )
    except Exception as e:
        return FileUtils.create_tool_response(
            "understand_audio",
            error=f"Audio analysis failed: {str(e)}"
        )
@tool
def web_search_deep_research_exa_ai(instructions: str) -> str:
    """
    Search the web and site content using deep research tool.
    Ask a query and get a well-researched answer with references.
    Can provide FINAL ANSWER candidate.
    Args:
        instructions: The prompt or query describing the research goal.
    Returns:
        The results of the deep research as a string.
    """
    def _payload(**fields) -> str:
        # Every response shares the same JSON envelope; build it in one place.
        return json.dumps({
            "type": "tool_response",
            "tool_name": "web_search_deep_research_exa_ai",
            **fields
        })

    # Guard clause: the exa-py client is an optional dependency.
    if not EXA_AVAILABLE:
        return _payload(error="Exa not available. Install with: pip install exa-py")
    try:
        api_key = os.environ.get("EXA_API_KEY")
        if not api_key:
            return _payload(error="EXA_API_KEY not found in environment variables. Please set it in your .env file.")

        # Kick off an asynchronous research task, then block until it completes.
        client = Exa(api_key)
        created = client.research.create_task(
            instructions=instructions,
            model="exa-research-pro",
            output_infer_schema=True
        )
        finished = client.research.poll_task(created.id)
        return _payload(result=str(finished))
    except Exception as e:
        return _payload(error=f"Error in Exa research: {str(e)}")
# ========== PYDANTIC SCHEMAS ==========
class SubmitAnswerSchema(BaseModel):
    """
    Schema for submitting final answers with structured metadata.
    Use this when ready to provide a final answer and the analysis is complete.
    """
    # Required answer text; min_length=1 rejects empty submissions.
    answer: str = Field(
        description="The final answer to the user's question",
        min_length=1
    )
    # Self-assessed confidence; values outside [0.0, 1.0] raise a validation error.
    confidence: float = Field(
        default=1.0,
        ge=0.0,
        le=1.0,
        description="Confidence level from 0.0 to 1.0 (default: 1.0)"
    )
    # Optional provenance list; an explicitly-empty list is normalized to None below.
    sources: list[str] | None = Field(
        default=None,
        description="List of sources or tools used to generate this answer"
    )
    # Optional free-text explanation of how the answer was reached.
    reasoning: str | None = Field(
        default=None,
        description="Brief explanation of the reasoning process"
    )
    # Collapse an empty sources list to None so downstream consumers only
    # ever see a populated list or None.
    @field_validator("sources")
    @classmethod
    def validate_sources(cls, v):
        if v is not None and len(v) == 0:
            return None
        return v
class SubmitIntermediateStepSchema(BaseModel):
    """
    Schema for submitting intermediate reasoning steps or progress updates.
    Use this to document intermediate steps in your reasoning process,
    progress updates, or partial findings before reaching a final conclusion.
    """
    # Required short identifier; bounded to 1-100 characters.
    step_name: str = Field(
        description="Short name/identifier for this step (e.g., 'data_analysis', 'search_results')",
        min_length=1,
        max_length=100
    )
    # Required non-empty narrative of what the step accomplished.
    description: str = Field(
        description="Detailed description of what was accomplished in this step",
        min_length=1
    )
    # Closed status set enforced via Literal; anything else fails validation.
    status: Literal["in_progress", "completed", "failed", "blocked"] = Field(
        default="in_progress",
        description="Current status of this step"
    )
    # Optional structured payload of findings for this step.
    data: dict[str, Any] | None = Field(
        default=None,
        description="Optional dictionary containing relevant data, findings, or results from this step"
    )
    # Optional plan; empty lists are normalized to None below.
    next_steps: list[str] | None = Field(
        default=None,
        description="Optional list of planned next steps or actions"
    )
    # Optional per-step confidence; values outside [0.0, 1.0] raise a validation error.
    confidence: float | None = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Optional confidence level from 0.0 to 1.0 for this step's results"
    )
    # Optional blockers/limitations; empty lists are normalized to None below.
    issues: list[str] | None = Field(
        default=None,
        description="Optional list of issues, concerns, or limitations encountered"
    )
    # Collapse empty next_steps/issues lists to None so consumers only ever
    # see a populated list or None.
    @field_validator("next_steps", "issues")
    @classmethod
    def validate_lists(cls, v):
        if v is not None and len(v) == 0:
            return None
        return v
class SubmitAnswerResult(BaseModel):
    """
    Structured result model for submit_answer operations.
    This model standardizes the response format for final answer submissions,
    providing consistent success/error handling and response structure.
    """
    # True when the answer submission was accepted.
    success: bool
    # HTTP-style status code accompanying the result (200 = OK).
    status_code: int = 200
    # Raw upstream payload, when one was captured.
    raw_response: dict | str | None = None
    # Error message, populated only on failure.
    error: str | None = None
class SubmitIntermediateStepResult(BaseModel):
    """
    Structured result model for submit_intermediate_step operations.
    This model standardizes the response format for intermediate step submissions,
    providing consistent success/error handling and response structure.
    """
    # True when the step submission was accepted.
    success: bool
    # HTTP-style status code accompanying the result (200 = OK).
    status_code: int = 200
    # Raw upstream payload, when one was captured.
    raw_response: dict | str | None = None
    # Error message, populated only on failure.
    error: str | None = None
# ========== TOOL FUNCTIONS ==========
@tool("submit_answer", return_direct=False, args_schema=SubmitAnswerSchema)
def submit_answer(answer: str, confidence: float = 1.0, sources: list[str] = None, reasoning: str = None) -> dict[str, Any]:
    """
    Submit a final answer using Schema-Guided Reasoning (SGR).
    This tool forces the LLM to explicitly state its conclusion rather than leaving it implicit.
    It preserves structured metadata while ensuring clean integration with the streaming pipeline.
    Use this tool when ready to provide an answer for the current question. This can be:
    - A final answer after completing all reasoning steps
    - An answer to a sub-question in a multi-turn conversation
    - A response that concludes the current analysis phase
    This tool preserves context across multiple conversation turns.
    Returns:
        dict: Structured response with answer and metadata
    """
    try:
        # Normalize optional metadata so consumers always receive concrete
        # types (a list and a string) rather than None.
        cited_sources = sources if sources else []
        rationale = reasoning if reasoning else ""
        return {
            "success": True,
            "answer": answer,
            "confidence": confidence,
            "sources": cited_sources,
            "reasoning": rationale,
            "timestamp": time.time(),
            "type": "final_answer"
        }
    except Exception as e:
        # Defensive guard: surface any unexpected failure as a structured error.
        return {
            "success": False,
            "error": f"Error submitting answer: {str(e)}",
            "type": "error"
        }
@tool("submit_intermediate_step", return_direct=False, args_schema=SubmitIntermediateStepSchema)
def submit_intermediate_step(step_name: str, description: str, status: str = "in_progress",
                             data: dict[str, Any] = None, next_steps: list[str] = None,
                             confidence: float = None, issues: list[str] = None) -> dict[str, Any]:
    """
    Submit an intermediate reasoning step using Schema-Guided Reasoning (SGR).
    Use this tool to document intermediate steps in your reasoning process,
    progress updates, or partial findings before reaching a final conclusion.
    This tool helps track the agent's thought process and enables better debugging.
    It helps guide structured thinking and makes the reasoning process transparent and debuggable.
    Returns:
        dict: Structured response with step details and metadata
    """
    try:
        # Build the step record up front; optional collection arguments are
        # normalized to empty containers so consumers never see None there.
        step_record: dict[str, Any] = {
            "success": True,
            "step_name": step_name,
            "description": description,
            "status": status,
            "data": data if data else {},
            "next_steps": next_steps if next_steps else [],
            "confidence": confidence,
            "issues": issues if issues else [],
            "timestamp": time.time(),
            "type": "intermediate_step"
        }
        return step_record
    except Exception as e:
        # Defensive guard: surface any unexpected failure as a structured error.
        return {
            "success": False,
            "error": f"Error submitting step: {str(e)}",
            "type": "error"
        }
# ========== END OF TOOLS.PY ==========