|
|
|
|
|
from langchain_openai import ChatOpenAI |
|
|
from langchain_community.document_loaders import WikipediaLoader,ArxivLoader |
|
|
from langchain_tavily import TavilySearch |
|
|
from langchain.schema import HumanMessage |
|
|
from openai import OpenAI |
|
|
from langchain.tools import tool |
|
|
import pandas as pd |
|
|
|
|
|
from langchain_core.callbacks.manager import CallbackManager |
|
|
from langchain_core.callbacks.stdout import StdOutCallbackHandler |
|
|
from langgraph.types import Command |
|
|
from langchain.docstore.document import Document |
|
|
from typing import List, Dict, Any, Optional |
|
|
import uuid |
|
|
import tempfile |
|
|
from langchain.agents import Tool |
|
|
from urllib.parse import urlparse |
|
|
import pytesseract |
|
|
from langgraph.prebuilt import create_react_agent |
|
|
from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter |
|
|
import requests |
|
|
from dotenv import load_dotenv |
|
|
import os |
|
|
import cmath |
|
|
import httpx |
|
|
from pathlib import Path |
|
|
import base64 |
|
|
from langchain_community.tools import DuckDuckGoSearchResults |
|
|
|
|
|
|
|
|
from smolagents import DuckDuckGoSearchTool,PythonInterpreterTool,WikipediaSearchTool,VisitWebpageTool,GoogleSearchTool |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
def encode_image(image_path: str) -> str:
    """Read a file from disk and return its bytes as a base64 text string.

    Args:
        image_path (str): Path of the file to read; it is opened in binary mode.

    Returns:
        str: Base64 encoding of the file contents, decoded as UTF-8 text.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
|
|
|
|
|
|
|
|
def decode_image(base64_string: str) -> Image.Image:
    """Decode a base64 string and open the resulting bytes as a PIL image.

    Args:
        base64_string (str): Base64-encoded contents of an image file.

    Returns:
        Image.Image: The image opened by PIL from an in-memory buffer.
    """
    # Imported locally: this function needs `io.BytesIO`, and no `import io`
    # is present among the module-level imports visible in this file.
    import io

    image_data = base64.b64decode(base64_string)
    return Image.open(io.BytesIO(image_data))
|
|
|
|
|
|
|
|
def save_image(image: Image.Image, directory: str = "image_outputs") -> str:
    """Write a PIL image to disk as a PNG named after a random UUID4.

    Args:
        image (Image.Image): The image to write.
        directory (str): Target folder; created when it does not exist yet.

    Returns:
        str: Path of the file that was written.
    """
    os.makedirs(directory, exist_ok=True)
    file_name = f"{uuid.uuid4()}.png"
    image_path = os.path.join(directory, file_name)
    image.save(image_path)
    return image_path
|
|
|
|
|
|
|
|
# Load environment variables (python-dotenv) before any key is read below.
load_dotenv()

# Credentials looked up in the environment; each is None when unset.
ChatGroq_key = os.getenv("ChatGroq")
HF_TOKEN = os.getenv("HF_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")

# LangChain DuckDuckGo results tool instance.
duckduckgo_tool = DuckDuckGoSearchResults()

# Callback manager wired to a single stdout handler.
callback_handler = StdOutCallbackHandler()
callback_manager = CallbackManager(handlers=[callback_handler])

# smolagents tool instances shared by the module.
Custom_PythonInterpreter_Tool = PythonInterpreterTool()
DuckDuckGoSearch_Tool = DuckDuckGoSearchTool()
web_search_wikipedia = WikipediaSearchTool()
visitwebpage_tool = VisitWebpageTool()
|
|
@tool
def google_search_tool(query: str) -> str:
    """Performs a google web search for your query then returns a string of the top search results.

    Args:
        query (str): The search query to perform.

    Returns:
        str: Whatever the smolagents GoogleSearchTool returns for the query.
    """
    # The search tool is called with the query only: its call forwards keyword
    # arguments to the tool's `forward`, which does not take an `api_key`.
    # NOTE(review): GoogleSearchTool is expected to read SERPAPI_API_KEY from
    # the environment when constructed (populated by load_dotenv) — confirm
    # against the installed smolagents version.
    search_engine = GoogleSearchTool()
    return search_engine(query=query)
|
|
|
|
|
|
|
|
|
|
|
def arxiv_search_tool(query: str) -> str:
    """Search Arxiv through ArxivLoader and return a short digest per paper.

    Args:
        query (str): Search text handed to ArxivLoader (at most 2 documents
            are requested).

    Returns:
        str: One entry per paper with title, authors, publication date and
        the first 500 characters of the summary, entries separated by a blank
        line; a fixed notice when the loader returns nothing.
    """
    papers = ArxivLoader(query=query, load_max_docs=2).load()
    if not papers:
        return "No papers found for the query."

    def _describe(meta) -> str:
        # Metadata fields that are absent are rendered as 'N/A'.
        return (
            f"Title: {meta.get('Title', 'N/A')}\n"
            f"Authors: {meta.get('Authors', 'N/A')}\n"
            f"Published: {meta.get('Published', 'N/A')}\n"
            f"Summary: {meta.get('Summary', 'N/A')[:500]}..."
        )

    return "\n\n".join(_describe(paper.metadata) for paper in papers)
|
|
|
|
|
|
|
|
@tool
def analyze_csv_file(file_path: str) -> str:
    """
    Load a CSV file with pandas and report its size, column names and summary statistics.

    Args:
        file_path (str): the path to the CSV file.

    Returns:
        str: the textual report, or an error message when loading or
        summarising fails.
    """
    try:
        frame = pd.read_csv(file_path)
        report_parts = [
            f"CSV file loaded with {len(frame)} rows and {len(frame.columns)} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(report_parts)
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"
|
|
|
|
|
@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using OCR library pytesseract (if available).

    Args:
        image_path (str): the path to the image file.

    Returns:
        str: the recognised text prefixed with a short header, or an error
        message when the image cannot be opened or OCR fails.
    """
    try:
        # Opening the image in a `with` block closes the underlying file
        # handle as soon as OCR is finished instead of leaving it open.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def download_document_from_url(url: str) -> str:
    """
    Download document from URL to temporary file

    Args:
        url (str): url to download

    Returns:
        str: path of the temporary file that holds the downloaded content.

    Raises:
        httpx.HTTPError: when the request fails or the server answers with
            an error status (errors are not converted to strings here).
    """
    import re

    # Redirects are followed so that `response.url` below really is the final
    # location; httpx does not follow them by default, and a 3xx answer would
    # otherwise make raise_for_status() fail.
    with httpx.Client(follow_redirects=True) as client:
        response = client.get(url)
        response.raise_for_status()

    file_ext = None

    # 1) Prefer the extension of the file name announced by the server.
    #    The pattern also matches the `filename*=` form of the header.
    content_disposition = response.headers.get('content-disposition', '')
    match = re.search(r'filename[^;=\n]*=(([\'"]).*?\2|[^;\n]*)', content_disposition)
    if match:
        filename = match.group(1).strip('\'"')
        file_ext = Path(filename).suffix

    # 2) Otherwise take the extension from the path of the final URL.
    if not file_ext:
        final_url = str(response.url)
        file_ext = Path(urlparse(final_url).path).suffix

    # 3) Last resort: a generic binary extension.
    if not file_ext:
        file_ext = '.bin'

    # delete=False keeps the file on disk after the handle is closed so the
    # returned path stays usable by the caller.
    with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp_file:
        tmp_file.write(response.content)
    return tmp_file.name
|
|
|
|
|
@tool
def read_doc_file(file_path: str) -> str:
    """
    Reads a .docx or .doc file and returns the extracted text.

    Args:
        file_path (str): Path to the Word document.

    Returns:
        str: the document text, a notice for unsupported extensions, or an
        error message when reading fails.
    """
    try:
        # Compare the extension case-insensitively so that names such as
        # "REPORT.DOCX" are accepted as well.
        lowered = file_path.lower()

        if lowered.endswith(".docx"):
            from docx import Document as DocxDocument

            doc = DocxDocument(file_path)
            # Paragraph texts are joined with newlines.
            return "\n".join(para.text for para in doc.paragraphs)

        if lowered.endswith(".doc"):
            import textract

            return textract.process(file_path).decode("utf-8")

        return "Unsupported file type. Please provide a .doc or .docx file."
    except Exception as e:
        return f"Failed to read document: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file and return the path.

    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. If not provided, a random name file will be created.

    Returns:
        str: a message containing the path of the file that was written
        inside the system temporary directory.
    """
    temp_dir = tempfile.gettempdir()

    if filename is None:
        # Write through the handle NamedTemporaryFile already opened, so it is
        # closed deterministically instead of being leaked while the same
        # path is opened a second time.
        with tempfile.NamedTemporaryFile(
            "w", delete=False, dir=temp_dir, encoding="utf-8"
        ) as temp_file:
            temp_file.write(content)
            filepath = temp_file.name
    else:
        filepath = os.path.join(temp_dir, filename)
        # Explicit UTF-8 so non-ASCII content does not depend on the locale.
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."
|
|
|
|
|
import speech_recognition as sr |
|
|
import os |
|
|
import requests |
|
|
from pydub import AudioSegment |
|
|
def transcribe_audio_from_path(local_audio_path: str, language: str = "en-US") -> str:
    """
    Transcribes audio content from a local file path to a text string.

    The audio file is converted to WAV with pydub, then sent to the Google
    Speech Recognition service through the `speech_recognition` package.
    The WAV conversion is written to a uniquely named temporary file that is
    removed before the function returns.

    Args:
        local_audio_path (str): The local file path to the audio (e.g., "my_recording.mp3").
            This MUST be a path to a file already existing on disk; URLs are rejected.
        language (str, optional): The spoken language in the audio. Defaults to "en-US".

    Returns:
        str: The transcribed text, or an informative error message if transcription fails.
    """
    temp_wav_path = None
    transcribed_text = ""

    try:
        # Cheap input validation first, before any audio machinery is touched.
        if local_audio_path.startswith(("http://", "https://")):
            return "Error: This tool only accepts local file paths, not URLs. Please use 'file_saver' first."

        if not os.path.exists(local_audio_path):
            return f"Error: Local audio file not found at '{local_audio_path}'."

        # A unique scratch file avoids clashes between concurrent calls and
        # never overwrites or deletes a file in the working directory.
        fd, temp_wav_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)

        recognizer = sr.Recognizer()

        # Convert whatever format was supplied into WAV for the recognizer.
        audio = AudioSegment.from_file(local_audio_path)
        audio.export(temp_wav_path, format="wav")

        with sr.AudioFile(temp_wav_path) as source:
            audio_listened = recognizer.record(source)

        try:
            transcribed_text = recognizer.recognize_google(audio_listened, language=language)
        except sr.UnknownValueError:
            return "Could not understand audio (speech not clear or too short)."
        except sr.RequestError as e:
            return f"Could not request results from Google Speech Recognition service; {e}"

    except FileNotFoundError:
        return f"Error: Audio file not found at '{local_audio_path}'."
    except Exception as e:
        return f"An unexpected error occurred during audio processing or transcription: {e}"
    finally:
        # Runs on every exit path, including the early returns above.
        if temp_wav_path and os.path.exists(temp_wav_path):
            os.remove(temp_wav_path)

    return transcribed_text.strip()
|
|
|
|
|
|
|
|
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.

    Args:
        url (str): the URL of the file to download.
        filename (str, optional): the name of the file. If not provided, a random name file will be created.

    Returns:
        str: a message with the path of the downloaded file, or an error
        message when the download fails.
    """
    try:
        if not filename:
            # Derive the name from the last URL path segment; fall back to a
            # short random name when the path has none.
            filename = os.path.basename(urlparse(url).path)
            if not filename:
                filename = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), filename)

        # With stream=True the connection stays open until the body has been
        # consumed, so the response is used as a context manager to guarantee
        # it is released. The timeout keeps a stalled server from blocking
        # the caller indefinitely.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        return f"Error downloading file: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
import subprocess |
|
|
import sys |
|
|
from typing import Optional |
|
|
from langchain.tools import tool |
|
|
|
|
|
@tool
def run_python_script(script_path: str, timeout: Optional[int] = 30) -> str:
    """
    Execute a .py file at `script_path` in a separate Python subprocess,
    capture stdout/stderr, and return it as a string.

    The script is started with the current interpreter and runs with the same
    privileges as this process; no sandboxing is applied.

    Args:
        script_path (str): Local path to the Python script.
        timeout (int, optional): How many seconds to allow before killing it.

    Returns:
        str: The script's captured output or error message.
    """
    try:
        result = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
            timeout=timeout,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        # Non-zero exit status: report stderr, or stdout when stderr is empty
        # so the failure output is not lost.
        details = (e.stderr or "").strip() or (e.stdout or "").strip()
        return f"❌ Exit {e.returncode}:\n{details}"
    except subprocess.TimeoutExpired:
        return f"❌ Script timed out after {timeout}s."
    except Exception as e:
        return f"❌ Execution error: {e}"

    output = result.stdout.strip()
    stderr_text = result.stderr.strip()
    if stderr_text:
        # A successful run may still write warnings to stderr; include them so
        # the returned text really covers both streams.
        output = f"{output}\n[stderr]\n{stderr_text}" if output else f"[stderr]\n{stderr_text}"
    return output or "<no output>"
|
|
|
|
|
@tool
def arxiv_search(query: str) -> Dict[str, str]:
    """Search Arxiv for a query and return maximum 3 result.

    Args:
        query (str): The search query.

    Returns:
        Dictionary with the formatted hits under the key "web_results".
    """
    # `invoke` takes the tool input as its first positional argument, so the
    # query is passed as an input mapping rather than as a bare keyword.
    # Restricting the domains keeps this web search on arxiv.org, which is
    # what the tool name and description promise.
    search = TavilySearch(max_results=3, include_domains=["arxiv.org"])
    response = search.invoke({"query": query})

    # NOTE(review): TavilySearch is expected to answer with a mapping whose
    # "results" entry is a list of dicts carrying "url" and "content" —
    # confirm against the installed langchain-tavily version.
    hits = response.get("results", []) if isinstance(response, dict) else []

    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{hit.get("url", "")}" page=""/>\n{hit.get("content", "")}\n</Document>'
        for hit in hits
    )
    return {"web_results": formatted_search_docs}
|
|
|
|
|
@tool
def Excel_Reader(file_path: str) -> str:
    """
    Load an Excel file with pandas and report its shape, column names, data types and summary statistics.

    Args:
        file_path (str): the path to the Excel file.

    Returns:
        str: the textual report, or an error message when loading or
        summarising fails.
    """
    try:
        df = pd.read_excel(file_path)

        # Excel header cells may be numbers or dates; str.join only accepts
        # strings, so every label is converted first.
        column_names = ", ".join(map(str, df.columns))

        result = (
            f"✅ Excel file loaded successfully.\n"
            f"🧮 Shape: {df.shape[0]} rows × {df.shape[1]} columns.\n"
            f"🧾 Columns: {column_names}\n\n"
            f"📊 Data types:\n{df.dtypes.to_string()}\n\n"
            f"📈 Summary statistics:\n{df.describe(include='all').to_string()}\n"
        )
        return result
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and answer a question about it.

    Args:
        file_path (str): the path to the Excel file.
        query (str): Question about the data. It is accepted for interface
            compatibility but is not used when building the report.

    Returns:
        str: row/column counts, column names and summary statistics, or an
        error message when loading or summarising fails.
    """
    try:
        df = pd.read_excel(file_path)

        # Excel header cells may be numbers or dates; str.join only accepts
        # strings, so every label is converted first.
        column_names = ", ".join(map(str, df.columns))

        result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
|
|
|
|
|
@tool
def Search_wikipedia(query: str) -> str:
    """Search wikipedia with a query and return maximum 2 results.

    Args:
        query: The search query.
            Example: "Mercedes Sosa discography"
    """
    pages = WikipediaLoader(query=query, load_max_docs=2).load()

    rendered = []
    for doc in pages:
        rendered.append(
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        )

    # NOTE(review): the signature is annotated `str`, yet a dict is returned.
    return {"wiki_results": "\n\n---\n\n".join(rendered)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def add(a: float, b: float) -> float:
    """
    Return the sum of two numbers.

    Args:
        a (float): the first number
        b (float): the second number
    """
    total = a + b
    return total
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def multiply(a: float, b: float) -> float:
    """
    Return the product of two numbers.

    Args:
        a (float): the first number
        b (float): the second number
    """
    product = a * b
    return product
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def subtract(a: float, b: float) -> float:
    """
    Return the first number minus the second one.

    Args:
        a (float): the first number
        b (float): the second number
    """
    difference = a - b
    return difference
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def divide(a: float, b: float) -> float:
    """
    Return the first number divided by the second one.

    Args:
        a (float): the first number (dividend)
        b (float): the second number (divisor)
    """
    # No guard for b == 0: the division operator raises in that case.
    quotient = a / b
    return quotient
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def modulus(a: int, b: int) -> int:
    """
    Return the remainder of the first number divided by the second one.

    Args:
        a (int): the first number
        b (int): the second number
    """
    remainder = a % b
    return remainder
|
|
|
|
|
|
|
|
|
|
|
@tool
def power(a: float, b: float) -> float:
    """
    Return the first number raised to the power of the second one.

    Args:
        a (float): the first number (base)
        b (float): the second number (exponent)
    """
    raised = a**b
    return raised
|
|
|
|
|
|
|
|
@tool
def square_root(a: float) -> float | complex:
    """
    Return the square root of a number.

    Args:
        a (float): the number to get the square root of

    Returns:
        The real root for values that compare >= 0, otherwise the result of
        cmath.sqrt (a complex number).
    """
    # Written as a negated `>=` so that values failing the comparison take
    # the cmath branch, exactly like the non-negative check it mirrors.
    if not (a >= 0):
        return cmath.sqrt(a)
    return a**0.5
|
|
|
|
|
@tool
def download_task_file_name(file_name: str) -> str:
    """
    Downloads a file using the task_id (derived from the file_name),
    saves it locally with the original extension, and returns the full path.

    Args:
        file_name (str): file name whose stem is used as the task id; the
            same name is used for the local copy in the temp directory.

    Returns:
        str: path of the downloaded file.

    Raises:
        requests.RequestException: when the request fails, times out, or the
            server answers with an error status.
    """
    # Only the stem is needed for the URL; the extension stays part of the
    # local file name below.
    task_id, _ = os.path.splitext(file_name)
    BASE_URL = "https://agents-course-unit4-scoring.hf.space"
    url = f"{BASE_URL}/files/{task_id}"

    # The timeout keeps an unresponsive server from blocking the caller
    # indefinitely.
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    local_path = os.path.join(tempfile.gettempdir(), file_name)
    with open(local_path, "wb") as f:
        f.write(response.content)

    print(f"downloaded task file and returned at path... {local_path}")
    return local_path
|
|
|
|
|
@tool
def transcribe_audio(file_path: str) -> str:
    """Transcribes an audio file to text with the OpenAI "whisper-1" model.

    Args:
        file_path: The file_path to the audio

    Returns:
        The transcript text, or a message starting with "Transcription
        failed:" when anything goes wrong.
    """
    try:
        whisper_client = OpenAI(api_key=OPENAI_API_KEY)
        with open(file_path, "rb") as audio_file:
            result = whisper_client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
            )
        return result.text
    except Exception as e:
        return f"Transcription failed: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_graph(): |
|
|
"""Builds the graph""" |
|
|
|
|
|
llm = ChatOpenAI( |
|
|
model="gpt-4o", |
|
|
api_key=OPENAI_API_KEY, |
|
|
max_retries=5, |
|
|
verbose=True, |
|
|
timeout=10 |
|
|
) |
|
|
|
|
|
@tool
def extract_text(img_path: str) -> str:
    """Extract text from an image file using a multimodal model.

    Args:
        img_path (str): The path for the image

    Returns:
        The text returned by the model with surrounding whitespace removed,
        or an empty string when any step fails (the error is printed).
    """
    try:
        with open(img_path, "rb") as image_file:
            image_base64 = base64.b64encode(image_file.read()).decode("utf-8")

        instruction_part = {
            "type": "text",
            "text": (
                "Extract all the text from this image. "
                "Return only the extracted text, no explanations."
            ),
        }
        # The image is embedded as a data URL labelled as PNG.
        image_part = {
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{image_base64}"},
        }
        message = [HumanMessage(content=[instruction_part, image_part])]

        # NOTE(review): `ai_agent` is not defined in the visible part of this
        # file — confirm where it comes from.
        response = ai_agent.invoke(message)

        return (response.content + "\n\n").strip()
    except Exception as e:
        print(f"Error extracting text: {str(e)}")
        return ""
|
|
|
|
|
@tool
def analyze_imageTool(query: str, input_file: str) -> str:
    """A vision model that analyzes the input_file and answers the query.

    Args:
        query: The question/task for the vision model to analyze on the file
        input_file: The file for the vision model (.png, .jpg etc.)

    Returns:
        The model's answer with surrounding whitespace removed, or an empty
        string when any step fails (the error is printed).
    """
    try:
        print(f"[analyze_image]: query: {query}, input_file: {input_file}")

        with open(input_file, "rb") as image_file:
            image_base64 = base64.b64encode(image_file.read()).decode("utf-8")

        question_part = {"type": "text", "text": query}
        # The image is embedded as a data URL labelled as PNG.
        image_part = {
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{image_base64}"},
        }
        message = [HumanMessage(content=[question_part, image_part])]

        # A dedicated chat model instance is created for every call.
        assistant = ChatOpenAI(
            model="gpt-4o",
            api_key=OPENAI_API_KEY,
            temperature=0.8,
            max_retries=2,
            verbose=True,
            timeout=10,
        )
        response = assistant.invoke(message)

        print(f"[function] returning: {response}")
        return response.content.strip()
    except Exception as e:
        print(f"Error analyzing image: {str(e)}")
        return ""
|
|
|
|
|
def read_file_tool(file_path: str) -> str:
    """Open a text file and return its whole content.

    Args:
        file_path: The path to the file to read.

    Returns:
        The file content decoded as UTF-8; bytes that are not valid UTF-8
        are replaced with the Unicode replacement character.
    """
    # An explicit encoding makes the result independent of the platform
    # locale; errors="replace" keeps a stray undecodable byte from raising.
    with open(file_path, "r", encoding="utf-8", errors="replace") as f:
        return f.read()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results.

    Args:
        query: The search query.
    """
    pages = WikipediaLoader(query=query, load_max_docs=2).load()

    rendered = []
    for doc in pages:
        rendered.append(
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        )

    # NOTE(review): the signature is annotated `str`, yet a dict is returned.
    return {"wiki_results": "\n\n---\n\n".join(rendered)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def analyze_image_basic_properties(image_base64: str) -> Dict[str, Any]:
    """
    Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).

    Args:
        image_base64 (str): Base64 encoded image string

    Returns:
        Dictionary with "dimensions", "mode", "color_analysis" and a base64
        "thumbnail", or a dictionary with an "error" entry on failure.
    """
    try:
        img = decode_image(image_base64)
        width, height = img.size
        mode = img.mode

        if mode in ("RGB", "RGBA"):
            # Keep only the three colour planes: for RGBA images the alpha
            # plane must not leak into the average colour or the brightness.
            rgb = np.array(img)[..., :3]
            avg_colors = rgb.mean(axis=(0, 1))
            dominant = ["Red", "Green", "Blue"][int(np.argmax(avg_colors))]
            color_analysis = {
                "average_rgb": avg_colors.tolist(),
                # Plain float instead of a numpy scalar.
                "brightness": float(avg_colors.mean()),
                "dominant_color": dominant,
            }
        else:
            color_analysis = {"note": f"No color analysis for mode {mode}"}

        # The preview is bounded to 100x100 and written to the "thumbnails"
        # folder before being re-read as base64.
        thumbnail = img.copy()
        thumbnail.thumbnail((100, 100))
        thumb_path = save_image(thumbnail, "thumbnails")
        thumbnail_base64 = encode_image(thumb_path)

        return {
            "dimensions": (width, height),
            "mode": mode,
            "color_analysis": color_analysis,
            "thumbnail": thumbnail_base64,
        }
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
|
|
@tool
def transform_image(
    image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale.

    Args:
        image_base64 (str): Base64 encoded input image
        operation (str): Transformation operation. One of "resize", "rotate",
            "crop", "flip", "adjust_brightness" (alias "brightness"),
            "adjust_contrast" (alias "contrast"), "blur", "sharpen",
            "grayscale".
        params (Dict[str, Any], optional): Parameters for the operation:
            width/height (resize), angle (rotate), left/top/right/bottom
            (crop), direction (flip), factor (brightness/contrast),
            radius (blur).

    Returns:
        Dictionary with transformed image (base64) under "transformed_image",
        or a dictionary with an "error" entry on failure.
    """
    try:
        img = decode_image(image_base64)
        params = params or {}

        if operation == "resize":
            # Defaults halve each dimension.
            img = img.resize(
                (
                    params.get("width", img.width // 2),
                    params.get("height", img.height // 2),
                )
            )
        elif operation == "rotate":
            img = img.rotate(params.get("angle", 90), expand=True)
        elif operation == "crop":
            # Defaults select the whole image.
            img = img.crop(
                (
                    params.get("left", 0),
                    params.get("top", 0),
                    params.get("right", img.width),
                    params.get("bottom", img.height),
                )
            )
        elif operation == "flip":
            if params.get("direction", "horizontal") == "horizontal":
                img = img.transpose(Image.FLIP_LEFT_RIGHT)
            else:
                img = img.transpose(Image.FLIP_TOP_BOTTOM)
        elif operation in ("adjust_brightness", "brightness"):
            # The short name advertised in the tool description is accepted
            # alongside the original "adjust_" spelling.
            img = ImageEnhance.Brightness(img).enhance(params.get("factor", 1.5))
        elif operation in ("adjust_contrast", "contrast"):
            img = ImageEnhance.Contrast(img).enhance(params.get("factor", 1.5))
        elif operation == "blur":
            img = img.filter(ImageFilter.GaussianBlur(params.get("radius", 2)))
        elif operation == "sharpen":
            img = img.filter(ImageFilter.SHARPEN)
        elif operation == "grayscale":
            img = img.convert("L")
        else:
            return {"error": f"Unknown operation: {operation}"}

        result_path = save_image(img)
        result_base64 = encode_image(result_path)
        return {"transformed_image": result_base64}

    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
|
|
@tool
def draw_on_image(
    image_base64: str, drawing_type: str, params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Draw a shape (rectangle, circle, line) or text onto an image.

    Args:
        image_base64 (str): Base64 encoded input image
        drawing_type (str): Drawing type: "rectangle", "circle", "line" or "text"
        params (Dict[str, Any]): Drawing parameters. "color" defaults to
            "red" and "width" to 2; the coordinates each drawing type needs
            are read from this mapping.

    Returns:
        Dictionary with result image (base64) under "result_image", or a
        dictionary with an "error" entry on failure.
    """
    try:
        canvas = decode_image(image_base64)
        pen = ImageDraw.Draw(canvas)
        ink = params.get("color", "red")
        stroke = params.get("width", 2)

        if drawing_type == "rectangle":
            box = [params["left"], params["top"], params["right"], params["bottom"]]
            pen.rectangle(box, outline=ink, width=stroke)
        elif drawing_type == "circle":
            cx, cy, radius = params["x"], params["y"], params["radius"]
            # The circle is drawn as an ellipse inscribed in its bounding box.
            bounds = (cx - radius, cy - radius, cx + radius, cy + radius)
            pen.ellipse(bounds, outline=ink, width=stroke)
        elif drawing_type == "line":
            segment = (
                params["start_x"],
                params["start_y"],
                params["end_x"],
                params["end_y"],
            )
            pen.line(segment, fill=ink, width=stroke)
        elif drawing_type == "text":
            size = params.get("font_size", 20)
            try:
                typeface = ImageFont.truetype("arial.ttf", size)
            except IOError:
                # Fall back to PIL's built-in font when Arial cannot be loaded.
                typeface = ImageFont.load_default()
            anchor = (params["x"], params["y"])
            pen.text(anchor, params.get("text", "Text"), fill=ink, font=typeface)
        else:
            return {"error": f"Unknown drawing type: {drawing_type}"}

        saved_path = save_image(canvas)
        return {"result_image": encode_image(saved_path)}

    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
|
|
@tool
def generate_simple_image(
    image_type: str,
    width: int = 500,
    height: int = 500,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Generate a simple image: a two-colour linear gradient or random RGB noise.

    Args:
        image_type (str): Type of image: "gradient" or "noise"
        width (int), height (int): Size of the generated image in pixels
        params (Dict[str, Any], optional): For "gradient": "direction"
            ("horizontal" by default, anything else is vertical),
            "start_color" and "end_color" as RGB triples.

    Returns:
        Dictionary with generated image (base64) under "generated_image", or
        a dictionary with an "error" entry on failure.
    """
    try:
        params = params or {}

        if image_type == "gradient":
            direction = params.get("direction", "horizontal")
            start_color = params.get("start_color", (255, 0, 0))
            end_color = params.get("end_color", (0, 0, 255))

            def _color_at(position, span):
                # Linear interpolation of each channel, truncated to int.
                return tuple(
                    int(
                        start_color[channel]
                        + (end_color[channel] - start_color[channel]) * position / span
                    )
                    for channel in range(3)
                )

            img = Image.new("RGB", (width, height))
            draw = ImageDraw.Draw(img)

            if direction == "horizontal":
                # One vertical line per column.
                for x in range(width):
                    draw.line([(x, 0), (x, height)], fill=_color_at(x, width))
            else:
                # One horizontal line per row.
                for y in range(height):
                    draw.line([(0, y), (width, y)], fill=_color_at(y, height))

        elif image_type == "noise":
            noise_array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            img = Image.fromarray(noise_array, "RGB")

        else:
            return {"error": f"Unsupported image_type {image_type}"}

        saved_path = save_image(img)
        return {"generated_image": encode_image(saved_path)}

    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def combine_images(
    images_base64: List[str], operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Combine multiple images by stacking them side by side or on top of each other.

    Args:
        images_base64 (List[str]): List of base64 images
        operation (str): Combination type; only "stack" is implemented
        params (Dict[str, Any], optional): "direction" — "horizontal"
            (default) or any other value for vertical stacking

    Returns:
        Dictionary with combined image (base64) under "combined_image", or a
        dictionary with an "error" entry on failure.
    """
    try:
        pictures = [decode_image(b64) for b64 in images_base64]
        params = params or {}

        if operation != "stack":
            return {"error": f"Unsupported combination operation {operation}"}

        if params.get("direction", "horizontal") == "horizontal":
            # Canvas: widths add up, height follows the tallest picture.
            total_width = sum(pic.width for pic in pictures)
            max_height = max(pic.height for pic in pictures)
            canvas = Image.new("RGB", (total_width, max_height))
            offset = 0
            for pic in pictures:
                canvas.paste(pic, (offset, 0))
                offset += pic.width
        else:
            # Canvas: heights add up, width follows the widest picture.
            max_width = max(pic.width for pic in pictures)
            total_height = sum(pic.height for pic in pictures)
            canvas = Image.new("RGB", (max_width, total_height))
            offset = 0
            for pic in pictures:
                canvas.paste(pic, (0, offset))
                offset += pic.height

        saved_path = save_image(canvas)
        return {"combined_image": encode_image(saved_path)}

    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Image manipulation tools -------------------------------------------------
# Each image helper is wrapped in a Tool with an explicit name and description.
# NOTE(review): combine_images is declared with @tool in this file, so `func`
# receives a tool object rather than a plain function there — confirm that
# Tool.from_function handles it as intended.
transform_imagetool = Tool.from_function(
    name="transform_imagetool",
    func=transform_image,
    description="Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale.",
)

analyze_image_basic_propertiestool = Tool.from_function(
    name="analyze_image_basic_propertiestool",
    func=analyze_image_basic_properties,
    description="Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).",
)

draw_on_imagetool = Tool.from_function(
    name="draw_on_imagetool",
    func=draw_on_image,
    description="Draw shapes (rectangle, circle, line) or text onto an image.",
)

generate_simple_imagetool = Tool.from_function(
    name="generate_simple_imagetool",
    func=generate_simple_image,
    description="Generate a simple image (gradient, noise, pattern, chart).",
)

combine_imagestool = Tool.from_function(
    name="combine_imagestool",
    func=combine_images,
    description="Combine multiple images (collage, stack, blend).",
)
|
|
|
|
|
|
|
|
# --- Wikipedia lookup, arithmetic and task-file tools -------------------------
wiki_search_Tool = Tool.from_function(
    name="wiki_search_Tool",
    func=wiki_search,
    description="Search Wikipedia for a query and return maximum 2 results",
)

subtract_tool = Tool.from_function(
    name="subtract",
    func=subtract,
    description="Subtracts one number from another (a - b).",
)

read_file_Tool = Tool.from_function(
    name="read_file_tool",
    func=read_file_tool,
    description="Reads content of a file given its path.",
)

download_task_filenameTool = Tool.from_function(
    name="download_task_filename_tool",
    func=download_task_file_name,
    description=" Downloads a file releated to the question using the (file_name) or (task_id), it returns the file_path, you can then use that file_path as input to other tools.",
)

modulus_tool = Tool.from_function(
    name="modulus_tool",
    func=modulus,
    description="Get the modulus of two numbers.",
)

power_tool = Tool.from_function(
    name="power_tool",
    func=power,
    description="Get the power of two numbers.",
)
|
|
|
|
|
|
|
|
# --- Download, spreadsheet, audio and file round-trip tools -------------------
Download_document_from_url_Tool = Tool.from_function(
    name="download_document_from_url",
    func=download_document_from_url,
    description="Downloads and saves a document from a specified URL.",
)

Excel_ReaderTool = Tool.from_function(
    name="excel_reader",
    func=Excel_Reader,
    description="Reads an Excel (.xlsx) file and extracts its content.",
)

transcribe_audio_Tool = Tool.from_function(
    name="transcribe_audio",
    func=transcribe_audio,
    description="Transcribes spoken content from an audio file into text using Whisper.",
)

download_file_from_urlTool = Tool.from_function(
    name="download_file_from_url",
    func=download_file_from_url,
    description="Downloads any file (PDF, image, etc.) from a given URL and saves it locally.",
)

save_and_read_fileTool = Tool.from_function(
    name="save_and_read_file",
    func=save_and_read_file,
    description="Saves provided content to a local file and reads the file's content back.",
)
|
|
|
|
|
# --- Vision, code execution, CSV and text-extraction tools --------------------
analyze_imageTool_ = Tool.from_function(
    name="analyze_image",
    func=analyze_imageTool,
    description="A vision model that analyzes the input_file and answers the query",
)

run_python_scriptTool = Tool.from_function(
    name="run_python_script",
    func=run_python_script,
    description="Executes custom Python code dynamically and returns the output or result.",
)

analyze_csv_fileTool = Tool.from_function(
    name="analyze_csv_file",
    func=analyze_csv_file,
    description="Loads and analyzes a CSV file to answer questions or summarize data.",
)

extract_textTool = Tool.from_function(
    name="extract_textTool",
    func=extract_text,
    description="Extracts all readable text from an image using a vision model (OpenAI multimodal).",
)

extract_text_from_image_tool = Tool.from_function(
    name="extract_text_from_image",
    func=extract_text_from_image,
    description=" Extract text from an image using OCR library pytesseract (if available).",
)
|
|
|
|
|
# --- Research, document, web-search and calculation tools ---------------------
arxiv_searchTool = Tool.from_function(
    name="arxiv_search",
    func=arxiv_search,
    description="Searches for academic papers on Arxiv.org based on a query and returns summaries.",
)

read_doc_fileTool = Tool.from_function(
    name="read_doc_file",
    func=read_doc_file,
    description="Reads the content of a Word (.docx) file and returns the extracted text.",
)

Search_wikipediaTool = Tool.from_function(
    name="search_wikipedia",
    func=Search_wikipedia,
    description="Search wikipedia with a query and return maximum 3 results.",
)

# NOTE(review): this description ends with an unclosed backtick; it looks
# truncated — confirm the intended wording before changing it.
duckduckgo_websearch_tool = Tool.from_function(
    name="duckduckgo_websearch_tool",
    func=duckduckgo_tool,
    description="Tool that queries the DuckDuckGo search API and returns the results in `output_format",
)

square_root_tool = Tool.from_function(
    name="square_root_tool",
    func=square_root,
    description=" Get the square root of a number.",
)

PythonInterpreter_Tool = Tool.from_function(
    name="PythonInterpreter_Tool",
    func=Custom_PythonInterpreter_Tool,
    description="This is a tool that evaluates python code. It can be used to perform calculations.",
)

google_search_tool_ = Tool.from_function(
    name="google_search_tool_",
    func=google_search_tool,
    description="Searches Google with a query",
)

Web_search_DuckDuckGoSearch_Tool = Tool.from_function(
    name="Web_search_DuckDuckGoSearch_Tool",
    func=DuckDuckGoSearch_Tool,
    description="This is a tool that search on the duckduck web and provides information .",
)
|
|
|
|
|
# Wikipedia search wrapper. The tool used to be registered under the name
# "web_search_duckduckgo_tool" (a copy-paste slip), which advertised a
# Wikipedia lookup to the model as a DuckDuckGo search; the name now matches
# the wrapped function, and the description states what the tool does rather
# than describing its input.
# NOTE(review): if prompt_template.txt mentions the old tool name, update it.
WikipediaSearch_Tool = Tool.from_function(
    func=web_search_wikipedia,
    name="web_search_wikipedia_tool",
    description="Searches Wikipedia for the given topic.",
)
|
|
# --- Web page reader and Arxiv search tools -----------------------------------
visit_webpage = Tool.from_function(
    name="visit_webpage",
    func=visitwebpage_tool,
    description="Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages.",
)

arxiv_tool = Tool.from_function(
    name="arxiv_search_tool",
    func=arxiv_search_tool,
    description="Searches academic papers on Arxiv.org by query and returns summaries.",
)
|
|
|
|
|
# Excel analysis wrapper. This assignment used to appear twice back to back
# with identical arguments; the redundant first copy has been dropped.
analyze_excel_fileTool = Tool.from_function(
    func=analyze_excel_file,
    name="analyze_excel_fileTool",
    description="Analyze an Excel file using pandas and answer a question about it..",
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Every tool handed to the agent. The element order is kept exactly as
# originally written.
Tools = [
    subtract_tool, Download_document_from_url_Tool, Excel_ReaderTool,
    transcribe_audio_Tool, download_file_from_urlTool, save_and_read_fileTool,
    analyze_imageTool_, run_python_scriptTool, analyze_csv_fileTool,
    extract_textTool, arxiv_searchTool, read_doc_fileTool,
    Search_wikipediaTool, duckduckgo_websearch_tool,
    modulus_tool, power_tool, square_root_tool,
    extract_text_from_image_tool, download_task_filenameTool, read_file_Tool,
    PythonInterpreter_Tool, google_search_tool_,
    Web_search_DuckDuckGoSearch_Tool, visit_webpage, WikipediaSearch_Tool,
    arxiv_tool, wiki_search_Tool, analyze_excel_fileTool,
    combine_imagestool, generate_simple_imagetool, draw_on_imagetool,
    transform_imagetool, analyze_image_basic_propertiestool,
]
|
|
|
|
|
|
|
|
|
|
|
from langchain_core.messages import SystemMessage |
|
|
|
|
|
|
|
|
# Load the system prompt text from disk; the relative path is resolved
# against the current working directory.
prompt_template = "prompt_template.txt"
prompt_content = Path(prompt_template).read_text(encoding="utf-8")

# Wrap the prompt as a system message and build the ReAct agent from the
# model and the registered tools.
system_message = SystemMessage(content=prompt_content)
ai_agent = create_react_agent(model=llm, tools=Tools, prompt=system_message)
|
|
|
|
|
|
|
|
|
|
|
return ai_agent |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|