"""ReAct-style LangGraph agent with arithmetic, search, file and image tools."""

import base64
import io
import mimetypes
import os
import re
import tempfile
import uuid
from typing import Any, Dict, List, Optional, Union
from urllib.parse import parse_qs, urlparse

import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv
from langchain_community.document_loaders import WikipediaLoader
from langchain_core.messages import AIMessage, AnyMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langchain_tavily import TavilySearch
from langgraph.graph import START, MessagesState, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont
from youtube_transcript_api import YouTubeTranscriptApi

load_dotenv()

# Upper bound (seconds) for every outbound HTTP request so that a stalled
# server cannot hang the agent indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

# ReAct System Prompt
REACT_SYSTEM_PROMPT = """You are a research assistant that uses ReAct (Reasoning + Acting) methodology. For each question, follow this systematic approach:

**THINK**: First, analyze the question carefully. What type of information do you need? What tools might help?

**ACT**: Use available tools to gather information. Search thoroughly and verify facts from multiple sources when possible.

**OBSERVE**: Analyze the results from your tools. Are they complete and reliable? Do you need more information?

**REASON**: Synthesize all information gathered. Check for consistency and identify any gaps or uncertainties.

**VERIFY**: Before providing your final answer, double-check your reasoning and ensure you have sufficient evidence.

For each question:
1. Break down what you're looking for
2. Use tools systematically to gather comprehensive information
3. Cross-reference information when possible
4. Be honest about limitations - if you cannot find reliable information, say so
5. Only provide confident answers when you have verified evidence

When you cannot access certain content (videos, audio, images without tools), clearly state this limitation.

Always finish with: FINAL ANSWER: [YOUR FINAL ANSWER]

Your final answer should be:
- A number (without commas or units unless specified)
- As few words as possible for strings (no articles, no abbreviations for cities, spell out digits)
- A comma-separated list following the above rules for each element

Be thorough in your research but honest about uncertainty. Quality and accuracy are more important than speed.
"""


### ============== ARITHMETIC TOOLS =============== ###


@tool
def multiply(a: int, b: int) -> int:
    """
    Multiply two numbers
    """
    return a * b


@tool
def add(a: int, b: int) -> int:
    """
    Add two numbers
    """
    return a + b


@tool
def subtract(a: int, b: int) -> int:
    """
    Subtract two numbers
    """
    return a - b


@tool
def divide(a: int, b: int) -> float:
    """
    Divide two numbers
    """
    # True division always yields a float; fail with an explicit message
    # instead of Python's terse "division by zero".
    if b == 0:
        raise ZeroDivisionError("Cannot divide by zero.")
    return a / b


### ============== SEARCH TOOLS =============== ###


@tool
def wikidata_search(query: str) -> Dict[str, str]:
    """
    Search for information on Wikipedia and return maximum 2 results.
    Args:
        query: The search query.
    """
    # NOTE: despite the tool name, this queries Wikipedia (WikipediaLoader),
    # not Wikidata. The name is kept because it is the tool's public id.
    docs = WikipediaLoader(query=query, load_max_docs=2).load()
    formatted_search_docs = "\n\n---\n\n".join(
        f'\n{doc.page_content}\n' for doc in docs
    )
    return {"wiki_results": formatted_search_docs}


# Initialize Tavily Search Tool
tavily_search_tool = TavilySearch(
    max_results=3,
    topic="general",
)


def _extract_youtube_video_id(url: str) -> str:
    """Return the video id from watch, short-link, embed, shorts or live URLs.

    Raises:
        ValueError: if no video id can be located in ``url``.
    """
    # urlparse only fills in the hostname when a scheme is present.
    candidate = url if "://" in url else f"https://{url}"
    parsed = urlparse(candidate)
    host = (parsed.hostname or "").lower()

    if host.endswith("youtu.be"):
        video_id = parsed.path.lstrip("/").split("/")[0]
    elif parsed.path.startswith(("/embed/", "/shorts/", "/live/")):
        video_id = parsed.path.split("/")[2]
    else:
        # Standard watch URL; parse_qs drops trailing parameters such as &t=.
        video_id = parse_qs(parsed.query).get("v", [""])[0]

    if not video_id:
        raise ValueError(f"Could not find a video id in URL: {url}")
    return video_id


@tool
def load_youtube_transcript(
    url: str,
    add_video_info: bool = True,
    language: Optional[List[str]] = None,
    translation: str = "en",
) -> Union[Dict[str, Any], str]:
    """
    Load transcript from a YouTube video URL.
    Args:
        url: YouTube video URL
        add_video_info: accepted for compatibility; currently ignored
        language: preferred transcript language codes (defaults to ["en"])
        translation: accepted for compatibility; currently ignored
    """
    try:
        video_id = _extract_youtube_video_id(url)
        transcript = YouTubeTranscriptApi().fetch(
            video_id, languages=language or ["en"]
        )
        # to_raw_data() gives plain dicts (text/start/duration), which are
        # JSON-serializable when the tool result is turned into a message.
        return {"youtube_transcript": transcript.to_raw_data()}
    except Exception as e:
        return f"Error loading YouTube transcript: {str(e)}"


### ============== FILE TOOLS =============== ###


@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file and return the path.
    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    temp_dir = tempfile.gettempdir()
    # Keep only the final path component so the file cannot be written
    # outside the temp directory ("../x", absolute paths).
    safe_name = os.path.basename(filename) if filename else ""
    if safe_name:
        filepath = os.path.join(temp_dir, safe_name)
    else:
        # mkstemp returns an open descriptor; close it right away so the
        # handle is not leaked before the file is reopened for writing.
        descriptor, filepath = tempfile.mkstemp(dir=temp_dir)
        os.close(descriptor)

    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."


@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.
    Args:
        url (str): the URL of the file to download.
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    try:
        # Derive the name from the URL path when the caller gave none, then
        # strip any directory part so the target stays in the temp dir.
        if not filename:
            filename = os.path.basename(urlparse(url).path)
        filename = os.path.basename(filename or "")
        if not filename:
            filename = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), filename)

        # The context manager releases the streamed connection on all paths.
        with requests.get(
            url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS
        ) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        return f"Error downloading file: {str(e)}"


@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using OCR library pytesseract (if available).
    Args:
        image_path (str): the path to the image file.
    """
    # pytesseract is optional: import it lazily so the module still loads
    # without it, and report its absence clearly instead of a NameError.
    try:
        import pytesseract
    except ImportError:
        return "Error extracting text from image: pytesseract is not installed"

    try:
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"


def _summarize_dataframe(df: pd.DataFrame, label: str) -> str:
    """Build the shape / columns / describe() report shared by the table tools."""
    # Column labels may be non-strings (ints, tuples, timestamps).
    column_names = ", ".join(str(column) for column in df.columns)
    return (
        f"{label} file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        f"Columns: {column_names}\n\n"
        "Summary statistics:\n"
        f"{df.describe()}"
    )


@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Analyze a CSV file using pandas and answer a question about it.
    Args:
        file_path (str): the path to the CSV file.
        query (str): Question about the data
    """
    # NOTE: `query` is not used yet; the tool always returns the same
    # generic summary and leaves interpretation to the model.
    try:
        return _summarize_dataframe(pd.read_csv(file_path), "CSV")
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"


@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and answer a question about it.
    Args:
        file_path (str): the path to the Excel file.
        query (str): Question about the data
    """
    # NOTE: `query` is not used yet; see analyze_csv_file.
    try:
        return _summarize_dataframe(pd.read_excel(file_path), "Excel")
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"


### ============== IMAGE PROCESSING AND GENERATION TOOLS =============== ###


# Helper functions for image processing
def encode_image(image_path: str) -> str:
    """Convert an image file to base64 string."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


def decode_image(base64_string: str) -> Image.Image:
    """Convert a base64 string to a PIL Image."""
    image_data = base64.b64decode(base64_string)
    return Image.open(io.BytesIO(image_data))


def save_image(image: Image.Image, directory: str = "image_outputs") -> str:
    """Save a PIL Image to disk as a uniquely named PNG and return the path."""
    os.makedirs(directory, exist_ok=True)
    image_id = str(uuid.uuid4())
    image_path = os.path.join(directory, f"{image_id}.png")
    image.save(image_path)
    return image_path


@tool
def analyze_image(image_base64: str) -> Dict[str, Any]:
    """
    Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).
    Args:
        image_base64 (str): Base64 encoded image string
    Returns:
        Dictionary with analysis result
    """
    try:
        img = decode_image(image_base64)
        width, height = img.size
        mode = img.mode

        if mode in ("RGB", "RGBA"):
            # Per-channel mean over all pixels; drop alpha (if any) so it
            # does not skew the reported colour and brightness.
            rgb_means = np.array(img).mean(axis=(0, 1))[:3]
            color_analysis = {
                "average_rgb": rgb_means.tolist(),
                "brightness": float(rgb_means.mean()),
                "dominant_color": ["Red", "Green", "Blue"][int(np.argmax(rgb_means))],
            }
        else:
            color_analysis = {"note": f"No color analysis for mode {mode}"}

        thumbnail = img.copy()
        thumbnail.thumbnail((100, 100))
        thumb_path = save_image(thumbnail, "thumbnails")

        return {
            "dimensions": (width, height),
            "mode": mode,
            "color_analysis": color_analysis,
            "thumbnail": encode_image(thumb_path),
        }
    except Exception as e:
        return {"error": str(e)}


@tool
def transform_image(
    image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale.
    Args:
        image_base64 (str): Base64 encoded input image
        operation (str): Transformation operation ("adjust_brightness" and
            "adjust_contrast" are accepted as aliases of "brightness" and "contrast")
        params (Dict[str, Any], optional): Parameters for the operation
    Returns:
        Dictionary with transformed image (base64)
    """
    try:
        img = decode_image(image_base64)
        params = params or {}

        if operation == "resize":
            img = img.resize(
                (
                    params.get("width", img.width // 2),
                    params.get("height", img.height // 2),
                )
            )
        elif operation == "rotate":
            img = img.rotate(params.get("angle", 90), expand=True)
        elif operation == "crop":
            img = img.crop(
                (
                    params.get("left", 0),
                    params.get("top", 0),
                    params.get("right", img.width),
                    params.get("bottom", img.height),
                )
            )
        elif operation == "flip":
            if params.get("direction", "horizontal") == "horizontal":
                img = img.transpose(Image.FLIP_LEFT_RIGHT)
            else:
                img = img.transpose(Image.FLIP_TOP_BOTTOM)
        # The docstring advertises the short names, so accept both spellings.
        elif operation in ("adjust_brightness", "brightness"):
            img = ImageEnhance.Brightness(img).enhance(params.get("factor", 1.5))
        elif operation in ("adjust_contrast", "contrast"):
            img = ImageEnhance.Contrast(img).enhance(params.get("factor", 1.5))
        elif operation == "blur":
            img = img.filter(ImageFilter.GaussianBlur(params.get("radius", 2)))
        elif operation == "sharpen":
            img = img.filter(ImageFilter.SHARPEN)
        elif operation == "grayscale":
            img = img.convert("L")
        else:
            return {"error": f"Unknown operation: {operation}"}

        result_path = save_image(img)
        return {"transformed_image": encode_image(result_path)}
    except Exception as e:
        return {"error": str(e)}


@tool
def draw_on_image(
    image_base64: str, drawing_type: str, params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Draw shapes (rectangle, circle, line) or text onto an image.
    Args:
        image_base64 (str): Base64 encoded input image
        drawing_type (str): Drawing type
        params (Dict[str, Any]): Drawing parameters
    Returns:
        Dictionary with result image (base64)
    """
    try:
        img = decode_image(image_base64)
        draw = ImageDraw.Draw(img)
        color = params.get("color", "red")
        stroke_width = params.get("width", 2)

        if drawing_type == "rectangle":
            draw.rectangle(
                [params["left"], params["top"], params["right"], params["bottom"]],
                outline=color,
                width=stroke_width,
            )
        elif drawing_type == "circle":
            x, y, r = params["x"], params["y"], params["radius"]
            draw.ellipse(
                (x - r, y - r, x + r, y + r),
                outline=color,
                width=stroke_width,
            )
        elif drawing_type == "line":
            draw.line(
                (
                    params["start_x"],
                    params["start_y"],
                    params["end_x"],
                    params["end_y"],
                ),
                fill=color,
                width=stroke_width,
            )
        elif drawing_type == "text":
            font_size = params.get("font_size", 20)
            try:
                font = ImageFont.truetype("arial.ttf", font_size)
            except IOError:
                # Arial is not available on every system; fall back to
                # Pillow's built-in font.
                font = ImageFont.load_default()
            draw.text(
                (params["x"], params["y"]),
                params.get("text", "Text"),
                fill=color,
                font=font,
            )
        else:
            return {"error": f"Unknown drawing type: {drawing_type}"}

        result_path = save_image(img)
        return {"result_image": encode_image(result_path)}
    except Exception as e:
        return {"error": str(e)}


def _gradient_color(start_color, end_color, position: int, span: int):
    """Linearly interpolate an RGB colour at ``position`` out of ``span`` steps."""
    return tuple(
        int(start + (end - start) * position / span)
        for start, end in zip(start_color[:3], end_color[:3])
    )


@tool
def generate_simple_image(
    image_type: str,
    width: int = 500,
    height: int = 500,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Generate a simple image (gradient or noise).
    Args:
        image_type (str): Type of image ("gradient" or "noise")
        width (int), height (int)
        params (Dict[str, Any], optional): Specific parameters. For "gradient":
            direction ("horizontal" or anything else for vertical),
            start_color and end_color (RGB sequences)
    Returns:
        Dictionary with generated image (base64)
    """
    try:
        params = params or {}

        if image_type == "gradient":
            direction = params.get("direction", "horizontal")
            start_color = params.get("start_color", (255, 0, 0))
            end_color = params.get("end_color", (0, 0, 255))

            img = Image.new("RGB", (width, height))
            draw = ImageDraw.Draw(img)

            # Paint one single-colour line per column (or per row).
            if direction == "horizontal":
                for x in range(width):
                    fill = _gradient_color(start_color, end_color, x, width)
                    draw.line([(x, 0), (x, height)], fill=fill)
            else:
                for y in range(height):
                    fill = _gradient_color(start_color, end_color, y, height)
                    draw.line([(0, y), (width, y)], fill=fill)

        elif image_type == "noise":
            noise_array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            # A uint8 (H, W, 3) array is interpreted as RGB without an
            # explicit mode argument.
            img = Image.fromarray(noise_array)

        else:
            return {"error": f"Unsupported image_type {image_type}"}

        result_path = save_image(img)
        return {"generated_image": encode_image(result_path)}
    except Exception as e:
        return {"error": str(e)}


@tool
def combine_images(
    images_base64: List[str], operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Combine multiple images by stacking them side by side or top to bottom.
    Args:
        images_base64 (List[str]): List of base64 images
        operation (str): Combination type (only "stack" is supported)
        params (Dict[str, Any], optional): direction ("horizontal" or
            anything else for vertical)
    Returns:
        Dictionary with combined image (base64)
    """
    try:
        if not images_base64:
            return {"error": "No images provided"}

        images = [decode_image(b64) for b64 in images_base64]
        params = params or {}

        if operation != "stack":
            return {"error": f"Unsupported combination operation {operation}"}

        if params.get("direction", "horizontal") == "horizontal":
            canvas_size = (
                sum(img.width for img in images),
                max(img.height for img in images),
            )
            new_img = Image.new("RGB", canvas_size)
            offset = 0
            for img in images:
                new_img.paste(img, (offset, 0))
                offset += img.width
        else:
            canvas_size = (
                max(img.width for img in images),
                sum(img.height for img in images),
            )
            new_img = Image.new("RGB", canvas_size)
            offset = 0
            for img in images:
                new_img.paste(img, (0, offset))
                offset += img.height

        result_path = save_image(new_img)
        return {"combined_image": encode_image(result_path)}
    except Exception as e:
        return {"error": str(e)}


def _extension_from_response(response: requests.Response, default: str = ".png") -> str:
    """Pick a file extension from the response headers, else ``default``."""
    disposition = response.headers.get("Content-Disposition", "")
    match = re.search(r"filename\*?=([^;]+)", disposition, flags=re.IGNORECASE)
    if match:
        # Drop surrounding quotes and any RFC 5987 "charset''" prefix.
        declared_name = match.group(1).strip().strip('"').split("''")[-1]
        extension = os.path.splitext(os.path.basename(declared_name))[1]
        if extension:
            return extension

    content_type = response.headers.get("Content-Type", "").split(";")[0].strip()
    return mimetypes.guess_extension(content_type) or default


@tool
def download_task_file(task_id: str, api_url: str = "https://agents-course-unit4-scoring.hf.space") -> str:
    """
    Download a file associated with a task from the evaluation API.
    Args:
        task_id (str): The task ID to download the file for
        api_url (str): The base API URL (defaults to the evaluation server)
    """
    try:
        # Construct the file download URL
        file_url = f"{api_url}/files/{task_id}"

        with requests.get(
            file_url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS
        ) as response:
            response.raise_for_status()

            # Task attachments are not always images (e.g. spreadsheets), so
            # take the extension from the response; ".png" stays the fallback.
            extension = _extension_from_response(response)
            # basename keeps a crafted task id from escaping the temp dir.
            filename = os.path.basename(f"task_{task_id}{extension}")
            filepath = os.path.join(tempfile.gettempdir(), filename)

            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return f"Task file downloaded to {filepath}. You can now analyze this file."
    except Exception as e:
        return f"Error downloading task file: {str(e)}"


### ============== AGENT GRAPH =============== ###


tools = [
    multiply,
    add,
    subtract,
    divide,
    wikidata_search,
    tavily_search_tool,
    load_youtube_transcript,
    combine_images,
    analyze_image,
    transform_image,
    draw_on_image,
    generate_simple_image,
    analyze_csv_file,
    analyze_excel_file,
    save_and_read_file,
    download_file_from_url,
    extract_text_from_image,
    download_task_file,
]


def build_graph():
    """Compile the agent graph: START -> agent, agent <-> tools loop."""
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY"))
    llm_with_tools = llm.bind_tools(tools)

    def agent_node(state: MessagesState) -> MessagesState:
        """This is the agent node with ReAct methodology"""
        messages = state["messages"]

        # Add system prompt if not already present. It is prepended only to
        # the list sent to the model; the returned update holds just the
        # model's reply.
        if not messages or not isinstance(messages[0], SystemMessage):
            messages = [SystemMessage(content=REACT_SYSTEM_PROMPT)] + messages

        return {"messages": [llm_with_tools.invoke(messages)]}

    builder = StateGraph(MessagesState)
    builder.add_node("agent", agent_node)
    builder.add_node("tools", ToolNode(tools))

    builder.add_edge(START, "agent")
    # tools_condition decides whether the agent's reply goes to "tools" or
    # ends the run.
    builder.add_conditional_edges("agent", tools_condition)
    builder.add_edge("tools", "agent")

    return builder.compile()


class LangGraphAgent:
    """Callable wrapper that runs a question through the compiled graph."""

    def __init__(self):
        self.graph = build_graph()
        print("LangGraphAgent initialized with tools.")

    def __call__(self, question: str) -> str:
        """Run the agent on a question and return the answer"""
        try:
            messages = [HumanMessage(content=question)]
            result = self.graph.invoke({"messages": messages})
            # Print the full trace (tool calls included) for debugging.
            for m in result["messages"]:
                m.pretty_print()
            return result["messages"][-1].content
        except Exception as e:
            return f"Error: {str(e)}"


if __name__ == "__main__":
    agent = LangGraphAgent()
    question = "The attached Excel file contains the sales of menu items for a local fast-food chain. What were the total sales that the chain made from food (not including drinks)? Express your answer in USD with two decimal places. task_id: 1234567890"
    answer = agent(question)