Didrik Nathaniel LLoyd Aasland Skjelbred
update
07c33f2
from langchain_openai import ChatOpenAI
from langchain_community.document_loaders import WikipediaLoader,ArxivLoader
from langchain_tavily import TavilySearch
from langchain.schema import HumanMessage
from openai import OpenAI
from langchain.tools import tool
import pandas as pd
from langchain_core.callbacks.manager import CallbackManager
from langchain_core.callbacks.stdout import StdOutCallbackHandler
from langgraph.types import Command
from langchain.docstore.document import Document
from typing import List, Dict, Any, Optional
import uuid
import tempfile
from langchain.agents import Tool
from urllib.parse import urlparse
import pytesseract
from langgraph.prebuilt import create_react_agent
from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter
import requests
from dotenv import load_dotenv
import os
import cmath
import httpx
from pathlib import Path
import base64
from langchain_community.tools import DuckDuckGoSearchResults
from smolagents import DuckDuckGoSearchTool,PythonInterpreterTool,WikipediaSearchTool,VisitWebpageTool,GoogleSearchTool
import numpy as np
# Helper functions for image processing
def encode_image(image_path: str) -> str:
"""Convert an image file to base64 string."""
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
def decode_image(base64_string: str) -> Image.Image:
"""Convert a base64 string to a PIL Image."""
image_data = base64.b64decode(base64_string)
return Image.open(io.BytesIO(image_data))
def save_image(image: Image.Image, directory: str = "image_outputs") -> str:
"""Save a PIL Image to disk and return the path."""
os.makedirs(directory, exist_ok=True)
image_id = str(uuid.uuid4())
image_path = os.path.join(directory, f"{image_id}.png")
image.save(image_path)
return image_path
load_dotenv()
ChatGroq_key=os.getenv("ChatGroq")
HF_TOKEN=os.getenv("HF_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SERPAPI_API_KEY=os.getenv("SERPAPI_API_KEY")
TAVILY_API_KEY=os.getenv("TAVILY_API_KEY")
duckduckgo_tool = DuckDuckGoSearchResults()
callback_handler = StdOutCallbackHandler()
callback_manager = CallbackManager(handlers=[callback_handler])
Custom_PythonInterpreter_Tool = PythonInterpreterTool()
DuckDuckGoSearch_Tool = DuckDuckGoSearchTool()
web_search_wikipedia = WikipediaSearchTool()
visitwebpage_tool = VisitWebpageTool()
@tool
def google_search_tool(query:str) -> str:
"""Performs a google web search for your query then returns a string of the top search results.
Args:
query (str): The search query to perform.
"""
search_engine = GoogleSearchTool()
response = search_engine(query=query,api_key=SERPAPI_API_KEY)
return response
def arxiv_search_tool(query: str) -> str:
"""
Tool function to search Arxiv papers by query and return summary info.
"""
loader = ArxivLoader(query=query, load_max_docs=2) # limit to 2 papers for brevity
docs = loader.load()
if not docs:
return "No papers found for the query."
# Prepare a summary string of the papers found
summaries = []
for doc in docs:
meta = doc.metadata
summary = f"Title: {meta.get('Title', 'N/A')}\n" \
f"Authors: {meta.get('Authors', 'N/A')}\n" \
f"Published: {meta.get('Published', 'N/A')}\n" \
f"Summary: {meta.get('Summary', 'N/A')[:500]}..." # limit summary length
summaries.append(summary)
return "\n\n".join(summaries)
@tool
def analyze_csv_file(file_path: str) -> str:
"""
Analyze a CSV file using pandas and answer a question about it.
Args:
file_path (str): the path to the CSV file.
"""
try:
# Read the CSV file
df = pd.read_csv(file_path)
# Run various analyses based on the query
result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
result += f"Columns: {', '.join(df.columns)}\n\n"
# Add summary statistics
result += "Summary statistics:\n"
result += str(df.describe())
return result
except Exception as e:
return f"Error analyzing CSV file: {str(e)}"
@tool
def extract_text_from_image(image_path: str) -> str:
"""
Extract text from an image using OCR library pytesseract (if available).
Args:
image_path (str): the path to the image file.
"""
try:
# Open the image
image = Image.open(image_path)
# Extract text from the image
text = pytesseract.image_to_string(image)
return f"Extracted text from image:\n\n{text}"
except Exception as e:
return f"Error extracting text from image: {str(e)}"
@tool
def download_document_from_url(url: str) -> str:
"""
Download document from URL to temporary file
Args:
url (str): url to download
"""
with httpx.Client() as client:
response = client.get(url)
response.raise_for_status()
# Get filename from response
file_ext = None
# Check if response has a suggested filename
if hasattr(response, 'headers'):
content_disposition = response.headers.get('content-disposition', '')
if 'filename=' in content_disposition:
import re
match = re.search(r'filename[^;=\n]*=(([\'"]).*?\2|[^;\n]*)', content_disposition)
if match:
filename = match.group(1).strip('\'"')
file_ext = Path(filename).suffix
# Fallback to final URL after redirects
if not file_ext:
final_url = str(response.url)
file_ext = Path(urlparse(final_url).path).suffix
# Default to .bin if no extension found
if not file_ext:
file_ext = '.bin'
with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp_file:
tmp_file.write(response.content)
return tmp_file.name
@tool
def read_doc_file(file_path: str) -> str:
"""
Reads a .docx or .doc file and returns the extracted text.
Args:
file_path (str): Path to the Word document.
"""
try:
if file_path.endswith(".docx"):
from docx import Document as DocxDocument
doc = DocxDocument(file_path)
full_text = "\n".join([para.text for para in doc.paragraphs])
return full_text
elif file_path.endswith(".doc"):
import textract
text = textract.process(file_path).decode("utf-8")
return text
else:
return "Unsupported file type. Please provide a .doc or .docx file."
except Exception as e:
return f"Failed to read document: {str(e)}"
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
"""
Save content to a file and return the path.
Args:
content (str): the content to save to the file
filename (str, optional): the name of the file. If not provided, a random name file will be created.
"""
temp_dir = tempfile.gettempdir()
if filename is None:
temp_file = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir)
filepath = temp_file.name
else:
filepath = os.path.join(temp_dir, filename)
with open(filepath, "w") as f:
f.write(content)
return f"File saved to {filepath}. You can read this file to process its contents."
import speech_recognition as sr
import os
import requests
from pydub import AudioSegment
def transcribe_audio_from_path(local_audio_path: str, language: str = "en-US") -> str:
"""
Transcribes audio content from a local file path to a text string.
This tool is designed to convert spoken content from a locally saved audio file
into written text. It expects a path to an audio file that has already been
downloaded and saved to the local environment (e.g., using 'file_saver').
Supports various audio formats (e.g., MP3, WAV) and converts them to WAV internally for transcription.
For best results, specify the correct language code (e.g., 'en-US' for US English, 'es-ES' for Spanish).
Args:
local_audio_path (str): The local file path to the audio (e.g., "my_recording.mp3").
This MUST be a path to a file already existing on disk.
language (str, optional): The spoken language in the audio. Defaults to "en-US".
Refer to Google Speech Recognition language codes for options.
Returns:
str: The transcribed text, or an informative error message if transcription fails.
"""
r = sr.Recognizer()
temp_wav_path = "temp_audio_to_transcribe.wav" # Temporary WAV file for transcription
transcribed_text = ""
try:
# Ensure it's a local path and file exists
if local_audio_path.startswith("http://") or local_audio_path.startswith("https://"):
return "Error: This tool only accepts local file paths, not URLs. Please use 'file_saver' first."
if not os.path.exists(local_audio_path):
return f"Error: Local audio file not found at '{local_audio_path}'."
# Convert to WAV if not already (SpeechRecognition prefers WAV)
audio = AudioSegment.from_file(local_audio_path)
audio.export(temp_wav_path, format="wav")
# Transcribe the audio
with sr.AudioFile(temp_wav_path) as source:
audio_listened = r.record(source)
try:
transcribed_text = r.recognize_google(audio_listened, language=language)
except sr.UnknownValueError:
return "Could not understand audio (speech not clear or too short)."
except sr.RequestError as e:
return f"Could not request results from Google Speech Recognition service; {e}"
except FileNotFoundError: # This should be caught by os.path.exists now, but good for robustness
return f"Error: Audio file not found at '{local_audio_path}'."
except Exception as e:
return f"An unexpected error occurred during audio processing or transcription: {e}"
finally:
# Clean up temporary WAV file
if os.path.exists(temp_wav_path):
os.remove(temp_wav_path)
return transcribed_text.strip()
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
"""
Download a file from a URL and save it to a temporary location.
Args:
url (str): the URL of the file to download.
filename (str, optional): the name of the file. If not provided, a random name file will be created.
"""
try:
# Parse URL to get filename if not provided
if not filename:
path = urlparse(url).path
filename = os.path.basename(path)
if not filename:
filename = f"downloaded_{uuid.uuid4().hex[:8]}"
# Create temporary file
temp_dir = tempfile.gettempdir()
filepath = os.path.join(temp_dir, filename)
# Download the file
response = requests.get(url, stream=True)
response.raise_for_status()
# Save the file
with open(filepath, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return f"File downloaded to {filepath}. You can read this file to process its contents."
except Exception as e:
return f"Error downloading file: {str(e)}"
import subprocess
import sys
from typing import Optional
from langchain.tools import tool
@tool
def run_python_script(script_path: str, timeout: Optional[int] = 30) -> str:
"""
Execute a .py file at `script_path` in a sandboxed subprocess,
capture stdout/stderr, and return it as a string.
Args:
script_path (str): Local path to the Python script.
timeout (int, optional): How many seconds to allow before killing it.
Returns:
str: The script’s captured output or error message.
"""
try:
# Use the same Python interpreter
result = subprocess.run(
[sys.executable, script_path],
capture_output=True,
text=True,
timeout=timeout,
check=True
)
return result.stdout.strip() or "<no output>"
except subprocess.CalledProcessError as e:
# Script ran but exited nonzero
return f"❌ Exit {e.returncode}:\n{e.stderr.strip()}"
except subprocess.TimeoutExpired:
return f"❌ Script timed out after {timeout}s."
except Exception as e:
# Any other failure
return f"❌ Execution error: {e}"
@tool
def arxiv_search(query: str) -> str:
"""Search Arxiv for a query and return maximum 3 result."""
search_docs = TavilySearch(max_results=3).invoke(query=query)
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)
return {"web_results": formatted_search_docs}
@tool
def Excel_Reader(file_path: str)-> str:
"""
Analyze an Excel file using pandas and answer a question about it.
Args:
file_path (str): the path to the Excel file.
"""
try:
df = pd.read_excel(file_path)
result = (
f"✅ Excel file loaded successfully.\n"
f"🧮 Shape: {df.shape[0]} rows × {df.shape[1]} columns.\n"
f"🧾 Columns: {', '.join(df.columns)}\n\n"
f"📊 Data types:\n{df.dtypes.to_string()}\n\n"
f"📈 Summary statistics:\n{df.describe(include='all').to_string()}\n"
)
return result
except Exception as e:
return f"Error analyzing Excel file: {str(e)}"
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
"""
Analyze an Excel file using pandas and answer a question about it.
Args:
file_path (str): the path to the Excel file.
query (str): Question about the data
"""
try:
# Read the Excel file
df = pd.read_excel(file_path)
# Run various analyses based on the query
result = (
f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
)
result += f"Columns: {', '.join(df.columns)}\n\n"
# Add summary statistics
result += "Summary statistics:\n"
result += str(df.describe())
return result
except Exception as e:
return f"Error analyzing Excel file: {str(e)}"
@tool
def Search_wikipedia(query: str) -> str:
"""Search wikipedia with a query and return maximum 3 results.
Args:
query: The search query.
Exsample: "Mercedes Sosa discography"
"""
search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
for doc in search_docs
])
return {"wiki_results": formatted_search_docs}
@tool
def add(a: float, b: float)-> float:
"""
Adds two numbers.
Args:
a (float): the first number
b (float): the second number
"""
return a + b
@tool
def multiply(a: float, b: float) -> float:
"""
Multiplies two numbers.
Args:
a (float): the first number
b (float): the second number
"""
return a * b
@tool
def subtract(a: float, b: float) -> float:
"""
Subtracts two numbers.
Args:
a (float): the first number
b (float): the second number
"""
return a - b
@tool
def divide(a: float, b: float) -> float:
"""
divide two numbers.
Args:
a (float): the first number
b (float): the second number
"""
return a / b
@tool
def modulus(a: int, b: int) -> int:
"""
Get the modulus of two numbers.
Args:
a (int): the first number
b (int): the second number
"""
return a % b
@tool
def power(a: float, b: float) -> float:
"""
Get the power of two numbers.
Args:
a (float): the first number
b (float): the second number
"""
return a**b
@tool
def square_root(a: float) -> float | complex:
"""
Get the square root of a number.
Args:
a (float): the number to get the square root of
"""
if a >= 0:
return a**0.5
return cmath.sqrt(a)
@tool
def download_task_file_name(file_name: str) -> str:
"""
Downloads a file using the task_id (derived from the file_name),
saves it locally with the original extension, and returns the full path.
"""
task_id, ext = os.path.splitext(file_name)
BASE_URL = "https://agents-course-unit4-scoring.hf.space"
url = f"{BASE_URL}/files/{task_id}"
response = requests.get(url)
response.raise_for_status()
# Save to temp dir with correct extension
tmp_dir = tempfile.gettempdir()
local_path = os.path.join(tmp_dir, file_name)
with open(local_path, "wb") as f:
f.write(response.content)
print(f"downloaded task file and returned at path... {local_path}")
return local_path
@tool
def transcribe_audio(file_path: str) -> str:
"""Transcribes a audio file to text
Args:
file_path: The file_path to the audio
"""
try:
client = OpenAI(api_key=OPENAI_API_KEY)
with open(file_path, "rb") as audio_file:
transcript = client.audio.transcriptions.create(
file=audio_file,
model="whisper-1"
)
return transcript.text
except Exception as e:
return f"Transcription failed: {str(e)}"
def build_graph():
"""Builds the graph"""
llm = ChatOpenAI(
model="gpt-4o",
api_key=OPENAI_API_KEY,
max_retries=5,
verbose=True,
timeout=10
)
@tool
def extract_text(img_path: str) -> str:
""" Extract text from an image file using a multimodal model.
Args:
img_path (str): The path for the image
"""
all_text = ""
try:
# Read image and encode as base64
with open(img_path, "rb") as image_file:
image_bytes = image_file.read()
image_base64 = base64.b64encode(image_bytes).decode("utf-8")
# Prepare the prompt including the base64 image data
message = [
HumanMessage(
content=[
{
"type": "text",
"text": (
"Extract all the text from this image. "
"Return only the extracted text, no explanations."
),
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{image_base64}"
},
},
]
)
]
response = ai_agent.invoke(message)
all_text += response.content + "\n\n"
return all_text.strip()
except Exception as e:
error_msg = f"Error extracting text: {str(e)}"
print(error_msg)
return ""
@tool
def analyze_imageTool(query: str, input_file: str) -> str:
""" A vision model that analyzes the input_file and answers the query
Args:
query: The question/task for the vision model to analyze on the file
input_file: The file for the vision model (.png, .jpg etc.)
"""
try:
print(f"[analyze_image]: query: {query}, input_file: {input_file}")
# Read image and encode as base64
with open(input_file, "rb") as image_file:
image_bytes = image_file.read()
image_base64 = base64.b64encode(image_bytes).decode("utf-8")
message = [
HumanMessage(
content=[
{
"type": "text",
"text": query,
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{image_base64}"
},
},
]
)
]
assistant = ChatOpenAI(
model="gpt-4o",
api_key=OPENAI_API_KEY,
temperature=0.8,
max_retries=2,
verbose=True,
timeout=10
)
response = assistant.invoke(message)
print(f"[function] returning: {response}")
return response.content.strip()
except Exception as e:
error_msg = f"Error analyzing image: {str(e)}"
print(error_msg)
return ""
def read_file_tool(file_path: str) -> str:
""" Open the file and read contents or do something meaningful
Args:
file_path: The path to the file_path
"""
with open(file_path, 'r') as f:
content = f.read()
return content
@tool
def wiki_search(query: str) -> str:
"""Search Wikipedia for a query and return maximum 2 results.
Args:
query: The search query."""
search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)
return {"wiki_results": formatted_search_docs}
@tool
def analyze_image_basic_properties(image_base64: str) -> Dict[str, Any]:
"""
Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).
Args:
image_base64 (str): Base64 encoded image string
Returns:
Dictionary with analysis result
"""
try:
img = decode_image(image_base64)
width, height = img.size
mode = img.mode
if mode in ("RGB", "RGBA"):
arr = np.array(img)
avg_colors = arr.mean(axis=(0, 1))
dominant = ["Red", "Green", "Blue"][np.argmax(avg_colors[:3])]
brightness = avg_colors.mean()
color_analysis = {
"average_rgb": avg_colors.tolist(),
"brightness": brightness,
"dominant_color": dominant,
}
else:
color_analysis = {"note": f"No color analysis for mode {mode}"}
thumbnail = img.copy()
thumbnail.thumbnail((100, 100))
thumb_path = save_image(thumbnail, "thumbnails")
thumbnail_base64 = encode_image(thumb_path)
return {
"dimensions": (width, height),
"mode": mode,
"color_analysis": color_analysis,
"thumbnail": thumbnail_base64,
}
except Exception as e:
return {"error": str(e)}
@tool
def transform_image(
image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale.
Args:
image_base64 (str): Base64 encoded input image
operation (str): Transformation operation
params (Dict[str, Any], optional): Parameters for the operation
Returns:
Dictionary with transformed image (base64)
"""
try:
img = decode_image(image_base64)
params = params or {}
if operation == "resize":
img = img.resize(
(
params.get("width", img.width // 2),
params.get("height", img.height // 2),
)
)
elif operation == "rotate":
img = img.rotate(params.get("angle", 90), expand=True)
elif operation == "crop":
img = img.crop(
(
params.get("left", 0),
params.get("top", 0),
params.get("right", img.width),
params.get("bottom", img.height),
)
)
elif operation == "flip":
if params.get("direction", "horizontal") == "horizontal":
img = img.transpose(Image.FLIP_LEFT_RIGHT)
else:
img = img.transpose(Image.FLIP_TOP_BOTTOM)
elif operation == "adjust_brightness":
img = ImageEnhance.Brightness(img).enhance(params.get("factor", 1.5))
elif operation == "adjust_contrast":
img = ImageEnhance.Contrast(img).enhance(params.get("factor", 1.5))
elif operation == "blur":
img = img.filter(ImageFilter.GaussianBlur(params.get("radius", 2)))
elif operation == "sharpen":
img = img.filter(ImageFilter.SHARPEN)
elif operation == "grayscale":
img = img.convert("L")
else:
return {"error": f"Unknown operation: {operation}"}
result_path = save_image(img)
result_base64 = encode_image(result_path)
return {"transformed_image": result_base64}
except Exception as e:
return {"error": str(e)}
@tool
def draw_on_image(
image_base64: str, drawing_type: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Draw shapes (rectangle, circle, line) or text onto an image.
Args:
image_base64 (str): Base64 encoded input image
drawing_type (str): Drawing type
params (Dict[str, Any]): Drawing parameters
Returns:
Dictionary with result image (base64)
"""
try:
img = decode_image(image_base64)
draw = ImageDraw.Draw(img)
color = params.get("color", "red")
if drawing_type == "rectangle":
draw.rectangle(
[params["left"], params["top"], params["right"], params["bottom"]],
outline=color,
width=params.get("width", 2),
)
elif drawing_type == "circle":
x, y, r = params["x"], params["y"], params["radius"]
draw.ellipse(
(x - r, y - r, x + r, y + r),
outline=color,
width=params.get("width", 2),
)
elif drawing_type == "line":
draw.line(
(
params["start_x"],
params["start_y"],
params["end_x"],
params["end_y"],
),
fill=color,
width=params.get("width", 2),
)
elif drawing_type == "text":
font_size = params.get("font_size", 20)
try:
font = ImageFont.truetype("arial.ttf", font_size)
except IOError:
font = ImageFont.load_default()
draw.text(
(params["x"], params["y"]),
params.get("text", "Text"),
fill=color,
font=font,
)
else:
return {"error": f"Unknown drawing type: {drawing_type}"}
result_path = save_image(img)
result_base64 = encode_image(result_path)
return {"result_image": result_base64}
except Exception as e:
return {"error": str(e)}
@tool
def generate_simple_image(
image_type: str,
width: int = 500,
height: int = 500,
params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Generate a simple image (gradient, noise, pattern, chart).
Args:
image_type (str): Type of image
width (int), height (int)
params (Dict[str, Any], optional): Specific parameters
Returns:
Dictionary with generated image (base64)
"""
try:
params = params or {}
if image_type == "gradient":
direction = params.get("direction", "horizontal")
start_color = params.get("start_color", (255, 0, 0))
end_color = params.get("end_color", (0, 0, 255))
img = Image.new("RGB", (width, height))
draw = ImageDraw.Draw(img)
if direction == "horizontal":
for x in range(width):
r = int(
start_color[0] + (end_color[0] - start_color[0]) * x / width
)
g = int(
start_color[1] + (end_color[1] - start_color[1]) * x / width
)
b = int(
start_color[2] + (end_color[2] - start_color[2]) * x / width
)
draw.line([(x, 0), (x, height)], fill=(r, g, b))
else:
for y in range(height):
r = int(
start_color[0] + (end_color[0] - start_color[0]) * y / height
)
g = int(
start_color[1] + (end_color[1] - start_color[1]) * y / height
)
b = int(
start_color[2] + (end_color[2] - start_color[2]) * y / height
)
draw.line([(0, y), (width, y)], fill=(r, g, b))
elif image_type == "noise":
noise_array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
img = Image.fromarray(noise_array, "RGB")
else:
return {"error": f"Unsupported image_type {image_type}"}
result_path = save_image(img)
result_base64 = encode_image(result_path)
return {"generated_image": result_base64}
except Exception as e:
return {"error": str(e)}
@tool
def combine_images(
images_base64: List[str], operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Combine multiple images (collage, stack, blend).
Args:
images_base64 (List[str]): List of base64 images
operation (str): Combination type
params (Dict[str, Any], optional)
Returns:
Dictionary with combined image (base64)
"""
try:
images = [decode_image(b64) for b64 in images_base64]
params = params or {}
if operation == "stack":
direction = params.get("direction", "horizontal")
if direction == "horizontal":
total_width = sum(img.width for img in images)
max_height = max(img.height for img in images)
new_img = Image.new("RGB", (total_width, max_height))
x = 0
for img in images:
new_img.paste(img, (x, 0))
x += img.width
else:
max_width = max(img.width for img in images)
total_height = sum(img.height for img in images)
new_img = Image.new("RGB", (max_width, total_height))
y = 0
for img in images:
new_img.paste(img, (0, y))
y += img.height
else:
return {"error": f"Unsupported combination operation {operation}"}
result_path = save_image(new_img)
result_base64 = encode_image(result_path)
return {"combined_image": result_base64}
except Exception as e:
return {"error": str(e)}
transform_imagetool = Tool.from_function(
func=transform_image,
name="transform_imagetool",
description="Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale."
)
analyze_image_basic_propertiestool = Tool.from_function(
func=analyze_image_basic_properties,
name="analyze_image_basic_propertiestool",
description="Analyze basic properties of an image (size, mode, color analysis, thumbnail preview)."
)
draw_on_imagetool = Tool.from_function(
func=draw_on_image,
name="draw_on_imagetool",
description="Draw shapes (rectangle, circle, line) or text onto an image."
)
generate_simple_imagetool = Tool.from_function(
func=generate_simple_image,
name="generate_simple_imagetool",
description="Generate a simple image (gradient, noise, pattern, chart)."
)
combine_imagestool = Tool.from_function(
func=combine_images,
name="combine_imagestool",
description="Combine multiple images (collage, stack, blend)."
)
wiki_search_Tool = Tool.from_function(
func=wiki_search,
name="wiki_search_Tool",
description="Search Wikipedia for a query and return maximum 2 results"
)
subtract_tool = Tool.from_function(
func=subtract,
name="subtract",
description="Subtracts one number from another (a - b)."
)
read_file_Tool = Tool.from_function(
func=read_file_tool,
name="read_file_tool",
description="Reads content of a file given its path."
)
download_task_filenameTool = Tool.from_function(
func=download_task_file_name,
name="download_task_filename_tool",
description=" Downloads a file releated to the question using the (file_name) or (task_id), it returns the file_path, you can then use that file_path as input to other tools."
)
modulus_tool = Tool.from_function(
func=modulus,
name="modulus_tool",
description="Get the modulus of two numbers."
)
power_tool = Tool.from_function(
func=power,
name="power_tool",
description="Get the power of two numbers."
)
Download_document_from_url_Tool = Tool.from_function(
func=download_document_from_url,
name="download_document_from_url",
description="Downloads and saves a document from a specified URL."
)
Excel_ReaderTool = Tool.from_function(
func=Excel_Reader,
name="excel_reader",
description="Reads an Excel (.xlsx) file and extracts its content."
)
transcribe_audio_Tool = Tool.from_function(
func=transcribe_audio,
name="transcribe_audio",
description="Transcribes spoken content from an audio file into text using Whisper."
)
download_file_from_urlTool = Tool.from_function(
func=download_file_from_url,
name="download_file_from_url",
description="Downloads any file (PDF, image, etc.) from a given URL and saves it locally."
)
save_and_read_fileTool = Tool.from_function(
func=save_and_read_file,
name="save_and_read_file",
description="Saves provided content to a local file and reads the file's content back."
)
analyze_imageTool_ = Tool.from_function(
func=analyze_imageTool,
name="analyze_image",
description="A vision model that analyzes the input_file and answers the query"
)
run_python_scriptTool = Tool.from_function(
func=run_python_script,
name="run_python_script",
description="Executes custom Python code dynamically and returns the output or result."
)
analyze_csv_fileTool = Tool.from_function(
func=analyze_csv_file,
name="analyze_csv_file",
description="Loads and analyzes a CSV file to answer questions or summarize data."
)
extract_textTool = Tool.from_function(
func=extract_text,
name="extract_textTool",
description="Extracts all readable text from an image using a vision model (OpenAI multimodal)."
)
extract_text_from_image_tool = Tool.from_function(
func=extract_text_from_image,
name="extract_text_from_image",
description=" Extract text from an image using OCR library pytesseract (if available)."
)
arxiv_searchTool = Tool.from_function(
func=arxiv_search,
name="arxiv_search",
description="Searches for academic papers on Arxiv.org based on a query and returns summaries."
)
read_doc_fileTool = Tool.from_function(
func=read_doc_file,
name="read_doc_file",
description="Reads the content of a Word (.docx) file and returns the extracted text."
)
Search_wikipediaTool = Tool.from_function(
func=Search_wikipedia,
name="search_wikipedia",
description="Search wikipedia with a query and return maximum 3 results."
)
duckduckgo_websearch_tool = Tool.from_function(
func=duckduckgo_tool,
name="duckduckgo_websearch_tool",
description="Tool that queries the DuckDuckGo search API and returns the results in `output_format"
)
square_root_tool = Tool.from_function(
func=square_root,
name="square_root_tool",
description=" Get the square root of a number."
)
PythonInterpreter_Tool = Tool.from_function(
func=Custom_PythonInterpreter_Tool,
name="PythonInterpreter_Tool",
description="This is a tool that evaluates python code. It can be used to perform calculations.",
)
google_search_tool_ = Tool.from_function(
func=google_search_tool,
name="google_search_tool_",
description="Searches Google with a query",
)
Web_search_DuckDuckGoSearch_Tool = Tool.from_function(
func=DuckDuckGoSearch_Tool,
name="Web_search_DuckDuckGoSearch_Tool",
description="This is a tool that search on the duckduck web and provides information .",
)
WikipediaSearch_Tool = Tool.from_function(
func=web_search_wikipedia,
name="web_search_duckduckgo_tool",
description="The topic to search on Wikipedia..",
)
visit_webpage = Tool.from_function(
func=visitwebpage_tool,
name="visit_webpage",
description="Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages.",
)
arxiv_tool = Tool.from_function(
func=arxiv_search_tool,
name="arxiv_search_tool",
description="Searches academic papers on Arxiv.org by query and returns summaries."
)
analyze_excel_fileTool = Tool.from_function(
func=analyze_excel_file,
name="analyze_excel_fileTool",
description="Analyze an Excel file using pandas and answer a question about it.."
)
analyze_excel_fileTool = Tool.from_function(
func=analyze_excel_file,
name="analyze_excel_fileTool",
description="Analyze an Excel file using pandas and answer a question about it.."
)
Tools = [
subtract_tool,
Download_document_from_url_Tool,
Excel_ReaderTool,
transcribe_audio_Tool,
download_file_from_urlTool,
save_and_read_fileTool,
analyze_imageTool_,
run_python_scriptTool,
analyze_csv_fileTool,
extract_textTool,
arxiv_searchTool,
read_doc_fileTool,
Search_wikipediaTool,
duckduckgo_websearch_tool,
modulus_tool,
power_tool,
square_root_tool,
extract_text_from_image_tool,
download_task_filenameTool,
read_file_Tool,
PythonInterpreter_Tool,
google_search_tool_,
Web_search_DuckDuckGoSearch_Tool,
visit_webpage,
WikipediaSearch_Tool,
arxiv_tool,
wiki_search_Tool,
analyze_excel_fileTool,
combine_imagestool,
generate_simple_imagetool,
draw_on_imagetool,
transform_imagetool,
analyze_image_basic_propertiestool
]
from langchain_core.messages import SystemMessage
# Read the system prompt from the file
prompt_template = "prompt_template.txt"
with open(prompt_template, 'r', encoding='utf-8') as file:
prompt_content = file.read()
# Create the SystemMessage
system_message = SystemMessage(content=prompt_content)
ai_agent = create_react_agent(#from langchain.agents.react.base import ReActAgent
model=llm,
tools=Tools,
prompt=system_message
)
return ai_agent
# if __name__ == "__main__":
# graph = build_graph(provider="openAi")
# img_bytes = graph.get_graph().draw_mermaid_png()
# with open("dav.png", "wb") as f:
# f.write(img_bytes)