import base64
import cmath
import io
import os
import tempfile
import uuid
from pathlib import Path
from typing import List, Dict, Any, Optional
from urllib.parse import urlparse

import httpx
import numpy as np
import pandas as pd
import pytesseract
import requests
from dotenv import load_dotenv
from langchain.agents import Tool
from langchain.docstore.document import Document
from langchain.schema import HumanMessage
from langchain.tools import tool
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_core.callbacks.manager import CallbackManager
from langchain_core.callbacks.stdout import StdOutCallbackHandler
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
from openai import OpenAI
from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter
from smolagents import (
    DuckDuckGoSearchTool,
    PythonInterpreterTool,
    WikipediaSearchTool,
    VisitWebpageTool,
    GoogleSearchTool,
)


# Helper functions for image processing
def encode_image(image_path: str) -> str:
    """Read the file at ``image_path`` and return its bytes as a base64 string.

    Args:
        image_path (str): Path of the image file to read.

    Returns:
        str: The file contents, base64-encoded and decoded as UTF-8 text.
    """
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


def decode_image(base64_string: str) -> Image.Image:
    """Decode a base64 string and open the resulting bytes as a PIL image.

    Args:
        base64_string (str): Base64-encoded image data.

    Returns:
        Image.Image: The image opened from an in-memory buffer.
    """
    image_data = base64.b64decode(base64_string)
    # `io` must be imported at module level; without it this raised NameError.
    return Image.open(io.BytesIO(image_data))


def save_image(image: Image.Image, directory: str = "image_outputs") -> str:
    """Save ``image`` as a PNG with a random UUID name and return its path.

    Args:
        image (Image.Image): The image to write.
        directory (str): Target directory; created if it does not exist.

    Returns:
        str: Path of the written ``<uuid4>.png`` file inside ``directory``.
    """
    os.makedirs(directory, exist_ok=True)
    image_path = os.path.join(directory, f"{uuid.uuid4()}.png")
    image.save(image_path)
    return image_path


# Load variables from a .env file (if present) before reading the API keys.
load_dotenv()
ChatGroq_key = os.getenv("ChatGroq")
HF_TOKEN = os.getenv("HF_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")

# Module-level tool instances; build_graph() registers them with the agent.
duckduckgo_tool = DuckDuckGoSearchResults()
callback_handler = StdOutCallbackHandler()
callback_manager = CallbackManager(handlers=[callback_handler])
Custom_PythonInterpreter_Tool = PythonInterpreterTool()
DuckDuckGoSearch_Tool = DuckDuckGoSearchTool()
web_search_wikipedia = WikipediaSearchTool()
visitwebpage_tool = VisitWebpageTool()

# Upper bound (seconds) for `requests` downloads so a stalled server cannot
# hang the agent forever.
_HTTP_TIMEOUT_SECONDS = 30


def _column_names(df: pd.DataFrame) -> str:
    """Return the column labels as a comma-separated string.

    Labels are converted with ``str`` first because spreadsheet headers can be
    numbers or dates, which ``str.join`` rejects.
    """
    return ", ".join(map(str, df.columns))


def _format_documents(docs) -> str:
    """Join the ``page_content`` of each document with a ``---`` separator."""
    return "\n\n---\n\n".join(f"\n{doc.page_content}\n" for doc in docs)


def _temp_path_for(filename: str) -> str:
    """Return a path for ``filename`` directly inside the system temp dir.

    Only the base name is used so a caller-supplied name such as
    ``../../etc/passwd`` cannot escape the temp directory.
    """
    return os.path.join(tempfile.gettempdir(), os.path.basename(filename))


def _image_data_url(image_path: str) -> str:
    """Return a ``data:`` URL holding the base64-encoded image at ``image_path``.

    The MIME type is guessed from the file extension and falls back to
    ``image/png`` when it is unknown or not an image type.
    """
    import mimetypes

    mime_type, _ = mimetypes.guess_type(image_path)
    if not mime_type or not mime_type.startswith("image/"):
        mime_type = "image/png"
    with open(image_path, "rb") as image_file:
        payload = base64.b64encode(image_file.read()).decode("utf-8")
    return f"data:{mime_type};base64,{payload}"


def _vision_message(prompt: str, image_path: str) -> list:
    """Build a single-message chat input with a text prompt and one image."""
    return [
        HumanMessage(
            content=[
                {"type": "text", "text": prompt},
                {"type": "image_url", "image_url": {"url": _image_data_url(image_path)}},
            ]
        )
    ]


@tool
def google_search_tool(query: str) -> str:
    """Performs a google web search for your query then returns a string of the top search results.

    Args:
        query (str): The search query to perform.
    """
    # NOTE(review): smolagents' GoogleSearchTool is expected to read
    # SERPAPI_API_KEY from the environment in its constructor, and its call
    # signature has no `api_key` keyword, so none is passed here.
    search_engine = GoogleSearchTool()
    return search_engine(query=query)


def arxiv_search_tool(query: str) -> str:
    """Search Arxiv papers by query and return a short summary of each hit.

    Args:
        query (str): The search query.

    Returns:
        str: Title, authors, publication date and the first 500 characters of
        the abstract for up to two papers, or a "not found" message.
    """
    docs = ArxivLoader(query=query, load_max_docs=2).load()  # 2 papers for brevity
    if not docs:
        return "No papers found for the query."

    summaries = []
    for doc in docs:
        meta = doc.metadata
        summaries.append(
            f"Title: {meta.get('Title', 'N/A')}\n"
            f"Authors: {meta.get('Authors', 'N/A')}\n"
            f"Published: {meta.get('Published', 'N/A')}\n"
            f"Summary: {meta.get('Summary', 'N/A')[:500]}..."  # limit summary length
        )
    return "\n\n".join(summaries)


@tool
def analyze_csv_file(file_path: str) -> str:
    """
    Analyze a CSV file using pandas and return its shape, columns and summary statistics.

    Args:
        file_path (str): the path to the CSV file.
    """
    try:
        df = pd.read_csv(file_path)
        return (
            f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
            f"Columns: {_column_names(df)}\n\n"
            "Summary statistics:\n"
            f"{df.describe()}"
        )
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"


@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using OCR library pytesseract (if available).

    Args:
        image_path (str): the path to the image file.
    """
    try:
        # The context manager closes the underlying file once OCR is done.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"


@tool
def download_document_from_url(url: str) -> str:
    """
    Download document from URL to temporary file and return the file path.

    Args:
        url (str): url to download
    """
    import re

    try:
        # httpx does not follow redirects by default; the extension fallback
        # below relies on `response.url` being the final URL.
        with httpx.Client(follow_redirects=True) as client:
            response = client.get(url)
            response.raise_for_status()
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        return f"Error downloading document: {str(e)}"

    # Prefer the extension of the filename suggested by the server.
    file_ext = None
    content_disposition = response.headers.get('content-disposition', '')
    if 'filename=' in content_disposition:
        match = re.search(r'filename[^;=\n]*=(([\'"]).*?\2|[^;\n]*)', content_disposition)
        if match:
            filename = match.group(1).strip('\'"')
            file_ext = Path(filename).suffix

    # Fallback to final URL after redirects
    if not file_ext:
        file_ext = Path(urlparse(str(response.url)).path).suffix

    # Default to .bin if no extension found
    if not file_ext:
        file_ext = '.bin'

    with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp_file:
        tmp_file.write(response.content)
        return tmp_file.name


@tool
def read_doc_file(file_path: str) -> str:
    """
    Reads a .docx or .doc file and returns the extracted text.

    Args:
        file_path (str): Path to the Word document.
    """
    try:
        lowered = file_path.lower()  # accept ".DOCX" / ".DOC" as well
        if lowered.endswith(".docx"):
            from docx import Document as DocxDocument

            doc = DocxDocument(file_path)
            return "\n".join(para.text for para in doc.paragraphs)
        if lowered.endswith(".doc"):
            import textract

            return textract.process(file_path).decode("utf-8")
        return "Unsupported file type. Please provide a .doc or .docx file."
    except Exception as e:
        return f"Failed to read document: {str(e)}"


@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file in the system temp directory and return the path.

    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    safe_name = os.path.basename(filename) if filename else ""
    if safe_name:
        filepath = _temp_path_for(safe_name)
    else:
        # mkstemp returns an open descriptor; close it so it is not leaked.
        fd, filepath = tempfile.mkstemp(dir=tempfile.gettempdir())
        os.close(fd)

    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."


import speech_recognition as sr
import os
import requests
from pydub import AudioSegment


def transcribe_audio_from_path(local_audio_path: str, language: str = "en-US") -> str:
    """
    Transcribes audio content from a local file path to a text string.

    The audio is loaded with pydub, exported to a temporary WAV file and sent
    to ``Recognizer.recognize_google``. The temporary file is removed afterwards.

    Args:
        local_audio_path (str): The local file path to the audio (e.g., "my_recording.mp3").
            This MUST be a path to a file already existing on disk.
        language (str, optional): The spoken language in the audio. Defaults to "en-US".

    Returns:
        str: The transcribed text, or an informative error message if transcription fails.
    """
    recognizer = sr.Recognizer()

    if local_audio_path.startswith(("http://", "https://")):
        return "Error: This tool only accepts local file paths, not URLs. Please use 'file_saver' first."
    if not os.path.exists(local_audio_path):
        return f"Error: Local audio file not found at '{local_audio_path}'."

    # A unique temp file avoids collisions between concurrent calls and does
    # not depend on the current working directory being writable.
    fd, temp_wav_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        AudioSegment.from_file(local_audio_path).export(temp_wav_path, format="wav")
        with sr.AudioFile(temp_wav_path) as source:
            recorded = recognizer.record(source)
        return recognizer.recognize_google(recorded, language=language).strip()
    except sr.UnknownValueError:
        return "Could not understand audio (speech not clear or too short)."
    except sr.RequestError as e:
        return f"Could not request results from Google Speech Recognition service; {e}"
    except FileNotFoundError:
        return f"Error: Audio file not found at '{local_audio_path}'."
    except Exception as e:
        return f"An unexpected error occurred during audio processing or transcription: {e}"
    finally:
        if os.path.exists(temp_wav_path):
            os.remove(temp_wav_path)


@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.

    Args:
        url (str): the URL of the file to download.
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    try:
        # Derive a name from the URL path, then fall back to a random one.
        safe_name = os.path.basename(filename) if filename else ""
        if not safe_name:
            safe_name = os.path.basename(urlparse(url).path)
        if not safe_name:
            safe_name = f"downloaded_{uuid.uuid4().hex[:8]}"
        filepath = _temp_path_for(safe_name)

        # The context manager releases the streamed connection even on error.
        with requests.get(url, stream=True, timeout=_HTTP_TIMEOUT_SECONDS) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        return f"Error downloading file: {str(e)}"


import subprocess
import sys
from typing import Optional
from langchain.tools import tool


@tool
def run_python_script(script_path: str, timeout: Optional[int] = 30) -> str:
    """
    Execute a .py file at `script_path` in a subprocess and return its output.

    The script runs with the current interpreter and the current user's
    permissions; it is NOT sandboxed, so only run trusted scripts.

    Args:
        script_path (str): Local path to the Python script.
        timeout (int, optional): How many seconds to allow before killing it.

    Returns:
        str: The script's captured stdout, or an error message.
    """
    try:
        result = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
            timeout=timeout,
            check=True,
        )
        return result.stdout.strip() or ""
    except subprocess.CalledProcessError as e:
        # Script ran but exited nonzero; fall back to stdout if stderr is empty.
        details = e.stderr.strip() or (e.stdout or "").strip()
        return f"❌ Exit {e.returncode}:\n{details}"
    except subprocess.TimeoutExpired:
        return f"❌ Script timed out after {timeout}s."
    except Exception as e:
        return f"❌ Execution error: {e}"


@tool
def arxiv_search(query: str) -> str:
    """Search Arxiv for a query and return maximum 3 result."""
    # Restrict the Tavily web search to arxiv.org so results match the tool's name.
    searcher = TavilySearch(max_results=3, include_domains=["arxiv.org"])
    response = searcher.invoke({"query": query})

    # NOTE(review): TavilySearch is expected to return a dict with a "results"
    # list of dicts (each with "content"); other shapes are handled defensively.
    if isinstance(response, dict):
        if "results" not in response:
            return f"Error searching Arxiv: {response.get('error', response)}"
        hits = response["results"]
    elif isinstance(response, (list, tuple)):
        hits = response
    else:
        return str(response)

    def hit_text(hit) -> str:
        if isinstance(hit, dict):
            return str(hit.get("content", ""))
        return str(getattr(hit, "page_content", hit))

    return "\n\n---\n\n".join(f"\n{hit_text(hit)}\n" for hit in hits)


@tool
def Excel_Reader(file_path: str) -> str:
    """
    Load an Excel file with pandas and report its shape, columns, dtypes and summary statistics.

    Args:
        file_path (str): the path to the Excel file.
    """
    try:
        df = pd.read_excel(file_path)
        return (
            f"✅ Excel file loaded successfully.\n"
            f"🧮 Shape: {df.shape[0]} rows × {df.shape[1]} columns.\n"
            f"🧾 Columns: {_column_names(df)}\n\n"
            f"📊 Data types:\n{df.dtypes.to_string()}\n\n"
            f"📈 Summary statistics:\n{df.describe(include='all').to_string()}\n"
        )
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"


@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and answer a question about it.

    Args:
        file_path (str): the path to the Excel file.
        query (str): Question about the data (currently not used to filter the
            output; the same overview is returned for every query).
    """
    try:
        df = pd.read_excel(file_path)
        return (
            f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
            f"Columns: {_column_names(df)}\n\n"
            "Summary statistics:\n"
            f"{df.describe()}"
        )
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"


@tool
def Search_wikipedia(query: str) -> str:
    """Search wikipedia with a query and return maximum 3 results.

    Args:
        query: The search query. Example: "Mercedes Sosa discography"
    """
    # load_max_docs matches the documented maximum of 3 results.
    search_docs = WikipediaLoader(query=query, load_max_docs=3).load()
    return _format_documents(search_docs)


@tool
def add(a: float, b: float) -> float:
    """
    Adds two numbers.

    Args:
        a (float): the first number
        b (float): the second number
    """
    return a + b


@tool
def multiply(a: float, b: float) -> float:
    """
    Multiplies two numbers.

    Args:
        a (float): the first number
        b (float): the second number
    """
    return a * b


@tool
def subtract(a: float, b: float) -> float:
    """
    Subtracts two numbers.

    Args:
        a (float): the first number
        b (float): the second number
    """
    return a - b


@tool
def divide(a: float, b: float) -> float:
    """
    Divides two numbers (a / b). Raises ZeroDivisionError when b is 0.

    Args:
        a (float): the first number
        b (float): the second number
    """
    return a / b


@tool
def modulus(a: int, b: int) -> int:
    """
    Get the modulus of two numbers (a % b).

    Args:
        a (int): the first number
        b (int): the second number
    """
    return a % b


@tool
def power(a: float, b: float) -> float:
    """
    Get the power of two numbers (a ** b).

    Args:
        a (float): the first number
        b (float): the second number
    """
    return a**b


@tool
def square_root(a: float) -> float | complex:
    """
    Get the square root of a number; negative inputs yield a complex result.

    Args:
        a (float): the number to get the square root of
    """
    if a >= 0:
        return a**0.5
    return cmath.sqrt(a)


@tool
def download_task_file_name(file_name: str) -> str:
    """
    Downloads a file using the task_id (derived from the file_name), saves it
    locally with the original extension, and returns the full path.

    Args:
        file_name (str): File name whose stem is the task id.
    """
    task_id, _ = os.path.splitext(file_name)
    BASE_URL = "https://agents-course-unit4-scoring.hf.space"
    url = f"{BASE_URL}/files/{task_id}"

    response = requests.get(url, timeout=_HTTP_TIMEOUT_SECONDS)
    response.raise_for_status()

    # Save to temp dir with correct extension
    local_path = _temp_path_for(file_name)
    with open(local_path, "wb") as f:
        f.write(response.content)

    print(f"downloaded task file and returned at path... {local_path}")
    return local_path


@tool
def transcribe_audio(file_path: str) -> str:
    """Transcribes a audio file to text

    Args:
        file_path: The file_path to the audio
    """
    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        with open(file_path, "rb") as audio_file:
            transcript = client.audio.transcriptions.create(
                file=audio_file, model="whisper-1"
            )
        return transcript.text
    except Exception as e:
        return f"Transcription failed: {str(e)}"


def read_file_tool(file_path: str) -> str:
    """
    Open the file in text mode and return its full contents.

    Args:
        file_path: The path to the file
    """
    with open(file_path, 'r') as f:
        return f.read()


@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results.

    Args:
        query: The search query."""
    search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
    return _format_documents(search_docs)


@tool
def analyze_image_basic_properties(image_base64: str) -> Dict[str, Any]:
    """
    Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).

    Args:
        image_base64 (str): Base64 encoded image string

    Returns:
        Dictionary with analysis result, or {"error": ...} on failure
    """
    try:
        img = decode_image(image_base64)
        width, height = img.size
        mode = img.mode

        if mode in ("RGB", "RGBA"):
            # Drop the alpha channel so it does not skew the colour statistics.
            avg_rgb = np.array(img).mean(axis=(0, 1))[:3]
            color_analysis = {
                "average_rgb": avg_rgb.tolist(),
                "brightness": float(avg_rgb.mean()),
                "dominant_color": ["Red", "Green", "Blue"][int(np.argmax(avg_rgb))],
            }
        else:
            color_analysis = {"note": f"No color analysis for mode {mode}"}

        thumbnail = img.copy()
        thumbnail.thumbnail((100, 100))
        thumb_path = save_image(thumbnail, "thumbnails")

        return {
            "dimensions": (width, height),
            "mode": mode,
            "color_analysis": color_analysis,
            "thumbnail": encode_image(thumb_path),
        }
    except Exception as e:
        return {"error": str(e)}


@tool
def transform_image(
    image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale.

    Args:
        image_base64 (str): Base64 encoded input image
        operation (str): Transformation operation. "brightness"/"contrast" are
            accepted as aliases of "adjust_brightness"/"adjust_contrast".
        params (Dict[str, Any], optional): Parameters for the operation

    Returns:
        Dictionary with transformed image (base64), or {"error": ...}
    """
    try:
        img = decode_image(image_base64)
        params = params or {}

        if operation == "resize":
            img = img.resize(
                (
                    params.get("width", img.width // 2),
                    params.get("height", img.height // 2),
                )
            )
        elif operation == "rotate":
            img = img.rotate(params.get("angle", 90), expand=True)
        elif operation == "crop":
            img = img.crop(
                (
                    params.get("left", 0),
                    params.get("top", 0),
                    params.get("right", img.width),
                    params.get("bottom", img.height),
                )
            )
        elif operation == "flip":
            if params.get("direction", "horizontal") == "horizontal":
                img = img.transpose(Image.FLIP_LEFT_RIGHT)
            else:
                img = img.transpose(Image.FLIP_TOP_BOTTOM)
        elif operation in ("adjust_brightness", "brightness"):
            img = ImageEnhance.Brightness(img).enhance(params.get("factor", 1.5))
        elif operation in ("adjust_contrast", "contrast"):
            img = ImageEnhance.Contrast(img).enhance(params.get("factor", 1.5))
        elif operation == "blur":
            img = img.filter(ImageFilter.GaussianBlur(params.get("radius", 2)))
        elif operation == "sharpen":
            img = img.filter(ImageFilter.SHARPEN)
        elif operation == "grayscale":
            img = img.convert("L")
        else:
            return {"error": f"Unknown operation: {operation}"}

        return {"transformed_image": encode_image(save_image(img))}
    except Exception as e:
        return {"error": str(e)}


@tool
def draw_on_image(
    image_base64: str, drawing_type: str, params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Draw shapes (rectangle, circle, line) or text onto an image.

    Args:
        image_base64 (str): Base64 encoded input image
        drawing_type (str): Drawing type
        params (Dict[str, Any]): Drawing parameters

    Returns:
        Dictionary with result image (base64), or {"error": ...}
    """
    try:
        img = decode_image(image_base64)
        draw = ImageDraw.Draw(img)
        color = params.get("color", "red")
        stroke = params.get("width", 2)

        if drawing_type == "rectangle":
            draw.rectangle(
                [params["left"], params["top"], params["right"], params["bottom"]],
                outline=color,
                width=stroke,
            )
        elif drawing_type == "circle":
            x, y, r = params["x"], params["y"], params["radius"]
            draw.ellipse((x - r, y - r, x + r, y + r), outline=color, width=stroke)
        elif drawing_type == "line":
            draw.line(
                (
                    params["start_x"],
                    params["start_y"],
                    params["end_x"],
                    params["end_y"],
                ),
                fill=color,
                width=stroke,
            )
        elif drawing_type == "text":
            font_size = params.get("font_size", 20)
            try:
                font = ImageFont.truetype("arial.ttf", font_size)
            except IOError:
                # arial.ttf is not available on every system.
                font = ImageFont.load_default()
            draw.text(
                (params["x"], params["y"]),
                params.get("text", "Text"),
                fill=color,
                font=font,
            )
        else:
            return {"error": f"Unknown drawing type: {drawing_type}"}

        return {"result_image": encode_image(save_image(img))}
    except Exception as e:
        return {"error": str(e)}


@tool
def generate_simple_image(
    image_type: str,
    width: int = 500,
    height: int = 500,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Generate a simple image. Supported types: "gradient" and "noise".

    Args:
        image_type (str): Type of image
        width (int): Image width in pixels
        height (int): Image height in pixels
        params (Dict[str, Any], optional): Specific parameters. For "gradient":
            direction ("horizontal" or anything else for vertical),
            start_color and end_color (RGB triples).

    Returns:
        Dictionary with generated image (base64), or {"error": ...}
    """
    try:
        params = params or {}

        if image_type == "gradient":
            direction = params.get("direction", "horizontal")
            start_color = params.get("start_color", (255, 0, 0))
            end_color = params.get("end_color", (0, 0, 255))

            def blend(position: int, span: int) -> tuple:
                # Linear interpolation between the two colours, per channel.
                return tuple(
                    int(start_color[c] + (end_color[c] - start_color[c]) * position / span)
                    for c in range(3)
                )

            img = Image.new("RGB", (width, height))
            draw = ImageDraw.Draw(img)
            if direction == "horizontal":
                for x in range(width):
                    draw.line([(x, 0), (x, height)], fill=blend(x, width))
            else:
                for y in range(height):
                    draw.line([(0, y), (width, y)], fill=blend(y, height))
        elif image_type == "noise":
            noise_array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            # A (H, W, 3) uint8 array is interpreted as RGB without an explicit mode.
            img = Image.fromarray(noise_array)
        else:
            return {"error": f"Unsupported image_type {image_type}"}

        return {"generated_image": encode_image(save_image(img))}
    except Exception as e:
        return {"error": str(e)}


@tool
def combine_images(
    images_base64: List[str], operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Combine multiple images. Supported operation: "stack".

    Args:
        images_base64 (List[str]): List of base64 images
        operation (str): Combination type
        params (Dict[str, Any], optional): For "stack": direction
            ("horizontal" or anything else for vertical)

    Returns:
        Dictionary with combined image (base64), or {"error": ...}
    """
    try:
        images = [decode_image(b64) for b64 in images_base64]
        params = params or {}

        if operation != "stack":
            return {"error": f"Unsupported combination operation {operation}"}

        horizontal = params.get("direction", "horizontal") == "horizontal"
        if horizontal:
            canvas_size = (
                sum(img.width for img in images),
                max(img.height for img in images),
            )
        else:
            canvas_size = (
                max(img.width for img in images),
                sum(img.height for img in images),
            )
        new_img = Image.new("RGB", canvas_size)

        offset = 0
        for img in images:
            new_img.paste(img, (offset, 0) if horizontal else (0, offset))
            offset += img.width if horizontal else img.height

        return {"combined_image": encode_image(save_image(new_img))}
    except Exception as e:
        return {"error": str(e)}


def build_graph():
    """Build and return the ReAct agent graph with all tools registered.

    The system prompt is read from ``prompt_template.txt`` in the current
    working directory.

    Returns:
        The compiled agent returned by ``create_react_agent``.
    """
    from langchain_core.messages import SystemMessage
    from langchain_core.tools import StructuredTool

    llm = ChatOpenAI(
        model="gpt-4o",
        api_key=OPENAI_API_KEY,
        max_retries=5,
        verbose=True,
        timeout=10,
    )

    @tool
    def extract_text(img_path: str) -> str:
        """
        Extract text from an image file using a multimodal model.

        Args:
            img_path (str): The path for the image
        """
        try:
            message = _vision_message(
                "Extract all the text from this image. "
                "Return only the extracted text, no explanations.",
                img_path,
            )
            # Call the chat model directly: the ReAct agent takes and returns a
            # state dict, so it has no `.content` attribute to read.
            response = llm.invoke(message)
            return response.content.strip()
        except Exception as e:
            error_msg = f"Error extracting text: {str(e)}"
            print(error_msg)
            return error_msg

    @tool
    def analyze_imageTool(query: str, input_file: str) -> str:
        """
        A vision model that analyzes the input_file and answers the query

        Args:
            query: The question/task for the vision model to analyze on the file
            input_file: The file for the vision model (.png, .jpg etc.)
        """
        try:
            print(f"[analyze_image]: query: {query}, input_file: {input_file}")
            assistant = ChatOpenAI(
                model="gpt-4o",
                api_key=OPENAI_API_KEY,
                temperature=0.8,
                max_retries=2,
                verbose=True,
                timeout=10,
            )
            response = assistant.invoke(_vision_message(query, input_file))
            print(f"[function] returning: {response}")
            return response.content.strip()
        except Exception as e:
            error_msg = f"Error analyzing image: {str(e)}"
            print(error_msg)
            return error_msg

    def structured(source, name: str, description: str):
        """Register a function with its full argument schema.

        ``source`` may be a plain function or an ``@tool`` object; for the
        latter the wrapped function is used, so multi-argument tools keep
        every parameter instead of being squeezed into one string input.
        """
        return StructuredTool.from_function(
            func=getattr(source, "func", source),
            name=name,
            description=description,
        )

    def single_input(func, name: str, description: str):
        """Register a callable that takes one string (e.g. smolagents tools)."""
        return Tool.from_function(func=func, name=name, description=description)

    Tools = [
        structured(subtract, "subtract", "Subtracts one number from another (a - b)."),
        structured(
            download_document_from_url,
            "download_document_from_url",
            "Downloads and saves a document from a specified URL.",
        ),
        structured(
            Excel_Reader,
            "excel_reader",
            "Reads an Excel (.xlsx) file and extracts its content.",
        ),
        structured(
            transcribe_audio,
            "transcribe_audio",
            "Transcribes spoken content from an audio file into text using Whisper.",
        ),
        structured(
            download_file_from_url,
            "download_file_from_url",
            "Downloads any file (PDF, image, etc.) from a given URL and saves it locally.",
        ),
        structured(
            save_and_read_file,
            "save_and_read_file",
            "Saves provided content to a local file and returns the saved file path.",
        ),
        structured(
            analyze_imageTool,
            "analyze_image",
            "A vision model that analyzes the input_file and answers the query",
        ),
        structured(
            run_python_script,
            "run_python_script",
            "Executes the Python script at the given local path and returns its output.",
        ),
        structured(
            analyze_csv_file,
            "analyze_csv_file",
            "Loads and analyzes a CSV file to answer questions or summarize data.",
        ),
        structured(
            extract_text,
            "extract_textTool",
            "Extracts all readable text from an image using a vision model (OpenAI multimodal).",
        ),
        structured(
            arxiv_search,
            "arxiv_search",
            "Searches for academic papers on Arxiv.org based on a query and returns summaries.",
        ),
        structured(
            read_doc_file,
            "read_doc_file",
            "Reads the content of a Word (.docx) file and returns the extracted text.",
        ),
        structured(
            Search_wikipedia,
            "search_wikipedia",
            "Search wikipedia with a query and return maximum 3 results.",
        ),
        single_input(
            duckduckgo_tool.run,
            "duckduckgo_websearch_tool",
            "Tool that queries the DuckDuckGo search API and returns the results in `output_format",
        ),
        structured(modulus, "modulus_tool", "Get the modulus of two numbers."),
        structured(power, "power_tool", "Get the power of two numbers."),
        structured(square_root, "square_root_tool", " Get the square root of a number."),
        structured(
            extract_text_from_image,
            "extract_text_from_image",
            " Extract text from an image using OCR library pytesseract (if available).",
        ),
        structured(
            download_task_file_name,
            "download_task_filename_tool",
            " Downloads a file releated to the question using the (file_name) or (task_id), it returns the file_path, you can then use that file_path as input to other tools.",
        ),
        structured(
            read_file_tool,
            "read_file_tool",
            "Reads content of a file given its path.",
        ),
        single_input(
            Custom_PythonInterpreter_Tool,
            "PythonInterpreter_Tool",
            "This is a tool that evaluates python code. It can be used to perform calculations.",
        ),
        structured(google_search_tool, "google_search_tool_", "Searches Google with a query"),
        single_input(
            DuckDuckGoSearch_Tool,
            "Web_search_DuckDuckGoSearch_Tool",
            "This is a tool that search on the duckduck web and provides information .",
        ),
        single_input(
            visitwebpage_tool,
            "visit_webpage",
            "Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages.",
        ),
        # Previously registered as "web_search_duckduckgo_tool", which told the
        # model this Wikipedia tool was a DuckDuckGo search.
        single_input(
            web_search_wikipedia,
            "wikipedia_search_tool",
            "Searches Wikipedia for the given topic and returns the page content.",
        ),
        structured(
            arxiv_search_tool,
            "arxiv_search_tool",
            "Searches academic papers on Arxiv.org by query and returns summaries.",
        ),
        structured(
            wiki_search,
            "wiki_search_Tool",
            "Search Wikipedia for a query and return maximum 2 results",
        ),
        structured(
            analyze_excel_file,
            "analyze_excel_fileTool",
            "Analyze an Excel file using pandas and answer a question about it..",
        ),
        structured(
            combine_images,
            "combine_imagestool",
            "Combine multiple images (collage, stack, blend).",
        ),
        structured(
            generate_simple_image,
            "generate_simple_imagetool",
            "Generate a simple image (gradient, noise, pattern, chart).",
        ),
        structured(
            draw_on_image,
            "draw_on_imagetool",
            "Draw shapes (rectangle, circle, line) or text onto an image.",
        ),
        structured(
            transform_image,
            "transform_imagetool",
            "Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale.",
        ),
        structured(
            analyze_image_basic_properties,
            "analyze_image_basic_propertiestool",
            "Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).",
        ),
    ]

    # Read the system prompt from the file
    prompt_template = "prompt_template.txt"
    with open(prompt_template, 'r', encoding='utf-8') as file:
        prompt_content = file.read()
    system_message = SystemMessage(content=prompt_content)

    ai_agent = create_react_agent(
        model=llm,
        tools=Tools,
        prompt=system_message,
    )
    return ai_agent


# if __name__ == "__main__":
#     graph = build_graph(provider="openAi")
#     img_bytes = graph.get_graph().draw_mermaid_png()
#     with open("dav.png", "wb") as f:
#         f.write(img_bytes)