Tomkuijpers2232 committed on
Commit
dceeb79
·
verified ·
1 Parent(s): b12074b

Update agent.py

Browse files
Files changed (1) hide show
  1. agent.py +291 -317
agent.py CHANGED
@@ -1,6 +1,6 @@
1
  import os
2
  from dotenv import load_dotenv
3
- from typing import List, Dict, Any, Optional, Literal
4
  from langgraph.graph import START, StateGraph, MessagesState
5
  from langgraph.graph.message import add_messages
6
  from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, SystemMessage
@@ -10,83 +10,35 @@ from langgraph.prebuilt import tools_condition
10
  from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace
11
  from langchain_core.tools import tool
12
  from langchain_community.document_loaders import WikipediaLoader
13
- from langchain_community.tools import YouTubeSearchTool
14
  from langchain_google_genai import ChatGoogleGenerativeAI
15
  from langchain_tavily import TavilySearch
16
  import tempfile
17
  import pandas as pd
18
- import numpy as np
19
- import requests
20
- from urllib.parse import urlparse
21
- import uuid
22
- from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter
23
- import base64
24
- import io
25
 
26
  load_dotenv()
27
 
28
- # ============== SYSTEM PROMPTS FOR SPECIALIZED AGENTS ============== #
29
-
30
- COORDINATOR_SYSTEM_PROMPT = """You are a Coordinator Agent that orchestrates multiple specialized agents to solve complex tasks.
31
-
32
- Your role is to:
33
- 1. Analyze incoming requests and determine which specialized agents are needed
34
- 2. Break down complex tasks into subtasks for different agents
35
- 3. Coordinate between agents when needed
36
- 4. Synthesize final answers from multiple agent responses
37
-
38
- Available specialized agents:
39
- - Research Agent: Wikipedia, web search, YouTube search
40
- - Math Agent: Basic mathematical calculations
41
- - Data Analysis Agent: CSV/Excel analysis, OCR text extraction
42
- - Image Processing Agent: Image analysis, transformation, generation
43
- - File Management Agent: File operations, downloads, saves
44
-
45
- When you receive a task:
46
- 1. THINK: What type of task is this? Which agents do I need?
47
- 2. ROUTE: Send subtasks to appropriate agents
48
- 3. COORDINATE: Manage dependencies between agent tasks
49
- 4. SYNTHESIZE: Combine results into a final answer
50
-
51
- Always provide a clear, comprehensive final answer.
52
- """
53
-
54
- RESEARCH_AGENT_PROMPT = """You are a Research Agent specialized in information gathering and search.
55
-
56
- Your expertise includes:
57
- - Wikipedia searches for encyclopedic information
58
- - Web searches for current information and facts
59
- - YouTube searches for video content
60
 
61
- Follow ReAct methodology:
62
- 1. THINK: What information do I need to find?
63
- 2. ACT: Use appropriate search tools systematically
64
- 3. OBSERVE: Analyze and verify search results
65
- 4. SYNTHESIZE: Provide comprehensive, accurate information
66
 
67
- Be thorough in your research and cross-reference sources when possible.
68
-
69
- Always finish with: FINAL ANSWER: [YOUR FINAL ANSWER]
70
 
71
- Your final answer should be:
72
- - A number (without commas or units unless specified)
73
- - As few words as possible for strings (no articles, no abbreviations for cities, spell out digits)
74
- - A comma-separated list following the above rules for each element
75
- """
76
 
77
- MATH_AGENT_PROMPT = """You are a Math Agent specialized in mathematical calculations and operations.
78
 
79
- Your expertise includes:
80
- - Basic arithmetic operations (add, subtract, multiply, divide)
81
- - Mathematical reasoning and problem-solving
82
 
83
- Follow ReAct methodology:
84
- 1. THINK: What calculations are needed?
85
- 2. ACT: Perform calculations systematically
86
- 3. VERIFY: Double-check your work
87
- 4. PROVIDE: Clear numerical answers
 
88
 
89
- Always show your work and verify calculations.
90
 
91
  Always finish with: FINAL ANSWER: [YOUR FINAL ANSWER]
92
 
@@ -94,87 +46,46 @@ Your final answer should be:
94
  - A number (without commas or units unless specified)
95
  - As few words as possible for strings (no articles, no abbreviations for cities, spell out digits)
96
  - A comma-separated list following the above rules for each element
97
- """
98
-
99
- DATA_ANALYSIS_AGENT_PROMPT = """You are a Data Analysis Agent specialized in processing and analyzing structured data.
100
-
101
- Your expertise includes:
102
- - CSV file analysis and statistics
103
- - Excel file processing
104
- - OCR text extraction from images
105
- - Data interpretation and insights
106
-
107
- Follow ReAct methodology:
108
- 1. THINK: What type of data analysis is needed?
109
- 2. ACT: Use appropriate analysis tools
110
- 3. OBSERVE: Examine data patterns and statistics
111
- 4. INTERPRET: Provide meaningful insights
112
-
113
- Focus on accuracy and provide clear data-driven insights.
114
- """
115
-
116
- IMAGE_PROCESSING_AGENT_PROMPT = """You are an Image Processing Agent specialized in image analysis, manipulation, and generation.
117
-
118
- Your expertise includes:
119
- - Image analysis (properties, colors, content)
120
- - Image transformations (resize, rotate, crop, filters)
121
- - Drawing and annotation on images
122
- - Simple image generation
123
- - Combining multiple images
124
 
125
- Follow ReAct methodology:
126
- 1. THINK: What image processing is required?
127
- 2. ACT: Apply appropriate image operations
128
- 3. OBSERVE: Verify results and quality
129
- 4. DELIVER: Provide processed images with explanations
130
-
131
- Focus on quality and user requirements.
132
  """
133
 
134
- FILE_MANAGEMENT_AGENT_PROMPT = """You are a File Management Agent specialized in file operations and data handling.
135
-
136
- Your expertise includes:
137
- - Saving and reading files
138
- - Downloading files from URLs
139
- - Downloading task files from APIs
140
- - File format handling
141
-
142
- Follow ReAct methodology:
143
- 1. THINK: What file operations are needed?
144
- 2. ACT: Perform file operations safely
145
- 3. VERIFY: Confirm successful operations
146
- 4. REPORT: Provide clear status and file paths
147
-
148
- Ensure secure and reliable file handling.
149
- """
150
-
151
- # ============== TOOL DEFINITIONS (grouped by agent) ============== #
152
-
153
- # Math Agent Tools
154
  @tool
155
- def multiply(a: int, b: int) -> int:
156
- """Multiply two numbers"""
 
 
157
  return a * b
158
 
159
  @tool
160
- def add(a: int, b: int) -> int:
161
- """Add two numbers"""
 
 
162
  return a + b
163
 
164
  @tool
165
- def subtract(a: int, b: int) -> int:
166
- """Subtract two numbers"""
 
 
167
  return a - b
168
 
169
  @tool
170
- def divide(a: int, b: int) -> float:
171
- """Divide two numbers"""
 
 
172
  return a / b
173
 
174
- # Research Agent Tools
175
  @tool
176
  def wikidata_search(query: str) -> str:
177
- """Search for information on Wikipedia and return maximum 2 results."""
 
 
 
 
 
178
  loader = WikipediaLoader(query=query, load_max_docs=2)
179
  docs = loader.load()
180
  formatted_search_docs = "\n\n---\n\n".join(
@@ -184,14 +95,49 @@ def wikidata_search(query: str) -> str:
184
  ])
185
  return {"wiki_results": formatted_search_docs}
186
 
187
- # Initialize search tools
188
- tavily_search_tool = TavilySearch(max_results=3, topic="general")
189
- youtube_search_tool = YouTubeSearchTool()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
190
 
191
- # File Management Agent Tools
192
  @tool
193
  def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
194
- """Save content to a file and return the path."""
 
 
 
 
 
195
  temp_dir = tempfile.gettempdir()
196
  if filename is None:
197
  temp_file = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir)
@@ -204,22 +150,32 @@ def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
204
 
205
  return f"File saved to {filepath}. You can read this file to process its contents."
206
 
 
207
  @tool
208
  def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
209
- """Download a file from a URL and save it to a temporary location."""
 
 
 
 
 
210
  try:
 
211
  if not filename:
212
  path = urlparse(url).path
213
  filename = os.path.basename(path)
214
  if not filename:
215
  filename = f"downloaded_{uuid.uuid4().hex[:8]}"
216
 
 
217
  temp_dir = tempfile.gettempdir()
218
  filepath = os.path.join(temp_dir, filename)
219
 
 
220
  response = requests.get(url, stream=True)
221
  response.raise_for_status()
222
 
 
223
  with open(filepath, "wb") as f:
224
  for chunk in response.iter_content(chunk_size=8192):
225
  f.write(chunk)
@@ -228,75 +184,100 @@ def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
228
  except Exception as e:
229
  return f"Error downloading file: {str(e)}"
230
 
231
- @tool
232
- def download_task_file(task_id: str, api_url: str = "https://agents-course-unit4-scoring.hf.space") -> str:
233
- """Download a file associated with a task from the evaluation API."""
234
- try:
235
- file_url = f"{api_url}/files/{task_id}"
236
- temp_dir = tempfile.gettempdir()
237
- filename = f"task_{task_id}.png"
238
- filepath = os.path.join(temp_dir, filename)
239
-
240
- response = requests.get(file_url, stream=True)
241
- response.raise_for_status()
242
-
243
- with open(filepath, "wb") as f:
244
- for chunk in response.iter_content(chunk_size=8192):
245
- f.write(chunk)
246
-
247
- return f"Task file downloaded to {filepath}. You can now analyze this file."
248
- except Exception as e:
249
- return f"Error downloading task file: {str(e)}"
250
 
251
- # Data Analysis Agent Tools
252
  @tool
253
  def extract_text_from_image(image_path: str) -> str:
254
- """Extract text from an image using OCR."""
 
 
 
 
255
  try:
256
- import pytesseract
257
  image = Image.open(image_path)
 
 
258
  text = pytesseract.image_to_string(image)
 
259
  return f"Extracted text from image:\n\n{text}"
260
  except Exception as e:
261
  return f"Error extracting text from image: {str(e)}"
262
 
 
263
  @tool
264
  def analyze_csv_file(file_path: str, query: str) -> str:
265
- """Analyze a CSV file using pandas and answer a question about it."""
 
 
 
 
 
266
  try:
 
267
  df = pd.read_csv(file_path)
 
 
268
  result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
269
  result += f"Columns: {', '.join(df.columns)}\n\n"
 
 
270
  result += "Summary statistics:\n"
271
  result += str(df.describe())
 
272
  return result
 
273
  except Exception as e:
274
  return f"Error analyzing CSV file: {str(e)}"
275
 
 
276
  @tool
277
  def analyze_excel_file(file_path: str, query: str) -> str:
278
- """Analyze an Excel file using pandas and answer a question about it."""
 
 
 
 
 
279
  try:
 
280
  df = pd.read_excel(file_path)
281
- result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
 
 
 
 
282
  result += f"Columns: {', '.join(df.columns)}\n\n"
 
 
283
  result += "Summary statistics:\n"
284
  result += str(df.describe())
 
285
  return result
 
286
  except Exception as e:
287
  return f"Error analyzing Excel file: {str(e)}"
288
 
289
- # Image Processing Agent Tools - Helper functions
 
 
 
 
 
 
 
 
290
  def encode_image(image_path: str) -> str:
291
  """Convert an image file to base64 string."""
292
  with open(image_path, "rb") as image_file:
293
  return base64.b64encode(image_file.read()).decode("utf-8")
294
 
 
295
  def decode_image(base64_string: str) -> Image.Image:
296
  """Convert a base64 string to a PIL Image."""
297
  image_data = base64.b64decode(base64_string)
298
  return Image.open(io.BytesIO(image_data))
299
 
 
300
  def save_image(image: Image.Image, directory: str = "image_outputs") -> str:
301
  """Save a PIL Image to disk and return the path."""
302
  os.makedirs(directory, exist_ok=True)
@@ -307,7 +288,13 @@ def save_image(image: Image.Image, directory: str = "image_outputs") -> str:
307
 
308
  @tool
309
  def analyze_image(image_base64: str) -> Dict[str, Any]:
310
- """Analyze basic properties of an image."""
 
 
 
 
 
 
311
  try:
312
  img = decode_image(image_base64)
313
  width, height = img.size
@@ -340,29 +327,42 @@ def analyze_image(image_base64: str) -> Dict[str, Any]:
340
  except Exception as e:
341
  return {"error": str(e)}
342
 
 
343
  @tool
344
  def transform_image(
345
  image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None
346
  ) -> Dict[str, Any]:
347
- """Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale."""
 
 
 
 
 
 
 
 
348
  try:
349
  img = decode_image(image_base64)
350
  params = params or {}
351
 
352
  if operation == "resize":
353
- img = img.resize((
354
- params.get("width", img.width // 2),
355
- params.get("height", img.height // 2),
356
- ))
 
 
357
  elif operation == "rotate":
358
  img = img.rotate(params.get("angle", 90), expand=True)
359
  elif operation == "crop":
360
- img = img.crop((
361
- params.get("left", 0),
362
- params.get("top", 0),
363
- params.get("right", img.width),
364
- params.get("bottom", img.height),
365
- ))
 
 
366
  elif operation == "flip":
367
  if params.get("direction", "horizontal") == "horizontal":
368
  img = img.transpose(Image.FLIP_LEFT_RIGHT)
@@ -388,11 +388,20 @@ def transform_image(
388
  except Exception as e:
389
  return {"error": str(e)}
390
 
 
391
  @tool
392
  def draw_on_image(
393
  image_base64: str, drawing_type: str, params: Dict[str, Any]
394
  ) -> Dict[str, Any]:
395
- """Draw shapes (rectangle, circle, line) or text onto an image."""
 
 
 
 
 
 
 
 
396
  try:
397
  img = decode_image(image_base64)
398
  draw = ImageDraw.Draw(img)
@@ -412,12 +421,16 @@ def draw_on_image(
412
  width=params.get("width", 2),
413
  )
414
  elif drawing_type == "line":
415
- draw.line((
416
- params["start_x"],
417
- params["start_y"],
418
- params["end_x"],
419
- params["end_y"],
420
- ), fill=color, width=params.get("width", 2))
 
 
 
 
421
  elif drawing_type == "text":
422
  font_size = params.get("font_size", 20)
423
  try:
@@ -440,6 +453,7 @@ def draw_on_image(
440
  except Exception as e:
441
  return {"error": str(e)}
442
 
 
443
  @tool
444
  def generate_simple_image(
445
  image_type: str,
@@ -447,7 +461,15 @@ def generate_simple_image(
447
  height: int = 500,
448
  params: Optional[Dict[str, Any]] = None,
449
  ) -> Dict[str, Any]:
450
- """Generate a simple image (gradient, noise, pattern, chart)."""
 
 
 
 
 
 
 
 
451
  try:
452
  params = params or {}
453
 
@@ -461,20 +483,33 @@ def generate_simple_image(
461
 
462
  if direction == "horizontal":
463
  for x in range(width):
464
- r = int(start_color[0] + (end_color[0] - start_color[0]) * x / width)
465
- g = int(start_color[1] + (end_color[1] - start_color[1]) * x / width)
466
- b = int(start_color[2] + (end_color[2] - start_color[2]) * x / width)
 
 
 
 
 
 
467
  draw.line([(x, 0), (x, height)], fill=(r, g, b))
468
  else:
469
  for y in range(height):
470
- r = int(start_color[0] + (end_color[0] - start_color[0]) * y / height)
471
- g = int(start_color[1] + (end_color[1] - start_color[1]) * y / height)
472
- b = int(start_color[2] + (end_color[2] - start_color[2]) * y / height)
 
 
 
 
 
 
473
  draw.line([(0, y), (width, y)], fill=(r, g, b))
474
 
475
  elif image_type == "noise":
476
  noise_array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
477
  img = Image.fromarray(noise_array, "RGB")
 
478
  else:
479
  return {"error": f"Unsupported image_type {image_type}"}
480
 
@@ -485,11 +520,20 @@ def generate_simple_image(
485
  except Exception as e:
486
  return {"error": str(e)}
487
 
 
488
  @tool
489
  def combine_images(
490
  images_base64: List[str], operation: str, params: Optional[Dict[str, Any]] = None
491
  ) -> Dict[str, Any]:
492
- """Combine multiple images (collage, stack, blend)."""
 
 
 
 
 
 
 
 
493
  try:
494
  images = [decode_image(b64) for b64 in images_base64]
495
  params = params or {}
@@ -522,157 +566,87 @@ def combine_images(
522
  except Exception as e:
523
  return {"error": str(e)}
524
 
525
- # ============== SPECIALIZED AGENT CLASSES ============== #
526
-
527
- class SpecializedAgent:
528
- """Base class for specialized agents"""
529
- def __init__(self, name: str, system_prompt: str, tools: List):
530
- self.name = name
531
- self.system_prompt = system_prompt
532
- self.tools = tools
533
- self.llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY"))
534
- self.llm_with_tools = self.llm.bind_tools(tools)
535
- self.graph = self._build_graph()
 
536
 
537
- def _build_graph(self):
538
- def agent_node(state: MessagesState) -> MessagesState:
539
- messages = state["messages"]
540
- if not messages or not isinstance(messages[0], SystemMessage):
541
- messages = [SystemMessage(content=self.system_prompt)] + messages
542
- return {"messages": [self.llm_with_tools.invoke(messages)]}
543
 
544
- builder = StateGraph(MessagesState)
545
- builder.add_node("agent", agent_node)
546
- builder.add_node("tools", ToolNode(self.tools))
547
 
548
- builder.add_edge(START, "agent")
549
- builder.add_conditional_edges("agent", tools_condition)
550
- builder.add_edge("tools", "agent")
 
551
 
552
- return builder.compile()
553
-
554
- def __call__(self, question: str) -> str:
555
- try:
556
- messages = [HumanMessage(content=question)]
557
- result = self.graph.invoke({"messages": messages})
558
- return result["messages"][-1].content
559
- except Exception as e:
560
- return f"Error in {self.name}: {str(e)}"
561
 
562
- # Agent tool groupings
563
- RESEARCH_TOOLS = [wikidata_search, tavily_search_tool, youtube_search_tool]
564
- MATH_TOOLS = [multiply, add, subtract, divide]
565
- DATA_ANALYSIS_TOOLS = [analyze_csv_file, analyze_excel_file, extract_text_from_image]
566
- IMAGE_PROCESSING_TOOLS = [analyze_image, transform_image, draw_on_image, generate_simple_image, combine_images]
567
- FILE_MANAGEMENT_TOOLS = [save_and_read_file, download_file_from_url, download_task_file]
568
 
569
- # ============== MULTI-AGENT SYSTEM ============== #
570
 
571
- class MultiAgentSystem:
572
- def __init__(self):
573
- # Initialize specialized agents
574
- self.research_agent = SpecializedAgent("Research Agent", RESEARCH_AGENT_PROMPT, RESEARCH_TOOLS)
575
- self.math_agent = SpecializedAgent("Math Agent", MATH_AGENT_PROMPT, MATH_TOOLS)
576
- self.data_agent = SpecializedAgent("Data Analysis Agent", DATA_ANALYSIS_AGENT_PROMPT, DATA_ANALYSIS_TOOLS)
577
- self.image_agent = SpecializedAgent("Image Processing Agent", IMAGE_PROCESSING_AGENT_PROMPT, IMAGE_PROCESSING_TOOLS)
578
- self.file_agent = SpecializedAgent("File Management Agent", FILE_MANAGEMENT_AGENT_PROMPT, FILE_MANAGEMENT_TOOLS)
579
-
580
- # Coordinator LLM
581
- self.coordinator_llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY"))
582
 
583
- print("Multi-Agent System initialized with 5 specialized agents.")
 
 
 
 
584
 
585
- def _classify_task(self, question: str) -> Dict[str, Any]:
586
- """Use the coordinator to classify the task and determine which agents to use"""
587
- classification_prompt = f"""
588
- As a task classifier, analyze this question and determine which specialized agents are needed:
589
-
590
- Question: {question}
591
-
592
- Available agents:
593
- - research: For Wikipedia, web search, YouTube search
594
- - math: For mathematical calculations
595
- - data_analysis: For CSV/Excel analysis, OCR
596
- - image_processing: For image analysis, manipulation, generation
597
- - file_management: For file operations, downloads
598
-
599
- Respond with a JSON object containing:
600
- {{
601
- "primary_agent": "agent_name",
602
- "supporting_agents": ["agent1", "agent2"],
603
- "task_breakdown": "explanation of how to approach this task",
604
- "requires_coordination": true/false
605
- }}
606
- """
607
-
608
- response = self.coordinator_llm.invoke([HumanMessage(content=classification_prompt)])
609
-
610
- # Simple classification logic as fallback
611
- question_lower = question.lower()
612
-
613
- classification = {
614
- "primary_agent": "research",
615
- "supporting_agents": [],
616
- "task_breakdown": "Research-based question",
617
- "requires_coordination": False
618
- }
619
-
620
- # Determine primary agent based on keywords
621
- if any(word in question_lower for word in ['calculate', 'multiply', 'add', 'subtract', 'divide', 'math']):
622
- classification["primary_agent"] = "math"
623
- elif any(word in question_lower for word in ['csv', 'excel', 'data', 'analyze data', 'spreadsheet']):
624
- classification["primary_agent"] = "data_analysis"
625
- elif any(word in question_lower for word in ['image', 'photo', 'picture', 'draw', 'generate image']):
626
- classification["primary_agent"] = "image_processing"
627
- elif any(word in question_lower for word in ['download', 'file', 'save']):
628
- classification["primary_agent"] = "file_management"
629
-
630
- return classification
631
 
632
- def __call__(self, question: str) -> str:
633
- """Route the question to appropriate agents and coordinate the response"""
634
- try:
635
- # Classify the task
636
- classification = self._classify_task(question)
637
- primary_agent = classification["primary_agent"]
638
-
639
- # Route to primary agent
640
- if primary_agent == "research":
641
- response = self.research_agent(question)
642
- elif primary_agent == "math":
643
- response = self.math_agent(question)
644
- elif primary_agent == "data_analysis":
645
- response = self.data_agent(question)
646
- elif primary_agent == "image_processing":
647
- response = self.image_agent(question)
648
- elif primary_agent == "file_management":
649
- response = self.file_agent(question)
650
- else:
651
- response = self.research_agent(question) # Default fallback
652
-
653
- # For now, return the primary agent's response
654
- # In a more sophisticated system, we would coordinate between multiple agents
655
- return response
656
-
657
- except Exception as e:
658
- return f"Error in Multi-Agent System: {str(e)}"
659
 
660
- # ============== MAIN AGENT CLASS (for backward compatibility) ============== #
 
 
 
 
 
 
 
 
 
661
 
662
  class LangGraphAgent:
663
  def __init__(self):
664
- self.multi_agent_system = MultiAgentSystem()
665
- print("LangGraphAgent initialized with Multi-Agent System.")
666
 
667
  def __call__(self, question: str) -> str:
668
- """Run the multi-agent system on a question and return the answer"""
669
- return self.multi_agent_system(question)
 
 
 
 
 
 
 
670
 
671
  if __name__ == "__main__":
672
  agent = LangGraphAgent()
673
  question = "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia."
674
  answer = agent(question)
675
- print(f"\nFinal Answer: {answer}")
676
 
677
 
678
 
 
1
  import os
2
  from dotenv import load_dotenv
3
+ from typing import List, Dict, Any, Optional
4
  from langgraph.graph import START, StateGraph, MessagesState
5
  from langgraph.graph.message import add_messages
6
  from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, SystemMessage
 
10
  from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace
11
  from langchain_core.tools import tool
12
  from langchain_community.document_loaders import WikipediaLoader
13
+ from langchain_community.document_loaders import YoutubeLoader
14
  from langchain_google_genai import ChatGoogleGenerativeAI
15
  from langchain_tavily import TavilySearch
16
  import tempfile
17
  import pandas as pd
 
 
 
 
 
 
 
18
 
19
  load_dotenv()
20
 
21
+ # ReAct System Prompt
22
+ REACT_SYSTEM_PROMPT = """You are a research assistant that uses ReAct (Reasoning + Acting) methodology. For each question, follow this systematic approach:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
+ **THINK**: First, analyze the question carefully. What type of information do you need? What tools might help?
 
 
 
 
25
 
26
+ **ACT**: Use available tools to gather information. Search thoroughly and verify facts from multiple sources when possible.
 
 
27
 
28
+ **OBSERVE**: Analyze the results from your tools. Are they complete and reliable? Do you need more information?
 
 
 
 
29
 
30
+ **REASON**: Synthesize all information gathered. Check for consistency and identify any gaps or uncertainties.
31
 
32
+ **VERIFY**: Before providing your final answer, double-check your reasoning and ensure you have sufficient evidence.
 
 
33
 
34
+ For each question:
35
+ 1. Break down what you're looking for
36
+ 2. Use tools systematically to gather comprehensive information
37
+ 3. Cross-reference information when possible
38
+ 4. Be honest about limitations - if you cannot find reliable information, say so
39
+ 5. Only provide confident answers when you have verified evidence
40
 
41
+ When you cannot access certain content (videos, audio, images without tools), clearly state this limitation.
42
 
43
  Always finish with: FINAL ANSWER: [YOUR FINAL ANSWER]
44
 
 
46
  - A number (without commas or units unless specified)
47
  - As few words as possible for strings (no articles, no abbreviations for cities, spell out digits)
48
  - A comma-separated list following the above rules for each element
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
 
50
+ Be thorough in your research but honest about uncertainty. Quality and accuracy are more important than speed.
 
 
 
 
 
 
51
  """
52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    # Compute the product explicitly before returning it.
    product = a * b
    return product
59
 
60
@tool
def add(a: int, b: int) -> int:
    """Add two numbers"""
    # Compute the sum explicitly before returning it.
    total = a + b
    return total
66
 
67
@tool
def subtract(a: int, b: int) -> int:
    """Subtract two numbers"""
    # Compute the difference explicitly before returning it.
    difference = a - b
    return difference
73
 
74
@tool
def divide(a: int, b: int) -> float:
    """
    Divide two numbers

    Args:
        a: dividend
        b: divisor; a ZeroDivisionError propagates when b is 0

    Returns:
        The quotient a / b. True division always yields a float, so the
        return annotation is float (the previous `-> int` annotation was
        incorrect).
    """
    return a / b
80
 
 
81
  @tool
82
  def wikidata_search(query: str) -> str:
83
+ """
84
+ Search for information on Wikipedia and return maximum 2 results.
85
+
86
+ Args:
87
+ query: The search query.
88
+ """
89
  loader = WikipediaLoader(query=query, load_max_docs=2)
90
  docs = loader.load()
91
  formatted_search_docs = "\n\n---\n\n".join(
 
95
  ])
96
  return {"wiki_results": formatted_search_docs}
97
 
98
+ # Initialize Tavily Search Tool
99
+ tavily_search_tool = TavilySearch(
100
+ max_results=3,
101
+ topic="general",
102
+ )
103
+
104
@tool
def load_youtube_transcript(url: str, add_video_info: bool = True, language: Optional[List[str]] = None, translation: str = "en") -> str:
    """
    Load transcript from a YouTube video URL.

    Args:
        url: YouTube video URL
        add_video_info: Whether to include video metadata
        language: List of language codes in descending priority (defaults to ["en"])
        translation: Language to translate transcript to
    """
    # Use a None sentinel instead of a mutable default argument (a shared
    # list default would be reused across calls).
    if language is None:
        language = ["en"]
    try:
        loader = YoutubeLoader.from_youtube_url(
            url,
            add_video_info=add_video_info,
            language=language,
            translation=translation,
        )
        docs = loader.load()

        formatted_transcript = "\n\n---\n\n".join([
            f'<Document source="{doc.metadata.get("source", "")}" title="{doc.metadata.get("title", "")}" author="{doc.metadata.get("author", "")}" length="{doc.metadata.get("length", "")}"/>\n{doc.page_content}\n</Document>'
            for doc in docs
        ])

        # NOTE(review): a dict is returned on success despite the `-> str`
        # annotation; kept as-is for caller compatibility.
        return {"youtube_transcript": formatted_transcript}
    except Exception as e:
        return f"Error loading YouTube transcript: {str(e)}"
132
 
 
133
  @tool
134
  def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
135
+ """
136
+ Save content to a file and return the path.
137
+ Args:
138
+ content (str): the content to save to the file
139
+ filename (str, optional): the name of the file. If not provided, a random name file will be created.
140
+ """
141
  temp_dir = tempfile.gettempdir()
142
  if filename is None:
143
  temp_file = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir)
 
150
 
151
  return f"File saved to {filepath}. You can read this file to process its contents."
152
 
153
+
154
  @tool
155
  def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
156
+ """
157
+ Download a file from a URL and save it to a temporary location.
158
+ Args:
159
+ url (str): the URL of the file to download.
160
+ filename (str, optional): the name of the file. If not provided, a random name file will be created.
161
+ """
162
  try:
163
+ # Parse URL to get filename if not provided
164
  if not filename:
165
  path = urlparse(url).path
166
  filename = os.path.basename(path)
167
  if not filename:
168
  filename = f"downloaded_{uuid.uuid4().hex[:8]}"
169
 
170
+ # Create temporary file
171
  temp_dir = tempfile.gettempdir()
172
  filepath = os.path.join(temp_dir, filename)
173
 
174
+ # Download the file
175
  response = requests.get(url, stream=True)
176
  response.raise_for_status()
177
 
178
+ # Save the file
179
  with open(filepath, "wb") as f:
180
  for chunk in response.iter_content(chunk_size=8192):
181
  f.write(chunk)
 
184
  except Exception as e:
185
  return f"Error downloading file: {str(e)}"
186
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
187
 
 
188
@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using OCR library pytesseract (if available).
    Args:
        image_path (str): the path to the image file.
    """
    try:
        # Import locally so the tool degrades gracefully when pytesseract is
        # not installed: the ImportError is caught below and reported as a
        # string. (This import was dropped in a refactor, which made every
        # call fail with a silent NameError.)
        import pytesseract

        # Open the image
        image = Image.open(image_path)

        # Extract text from the image
        text = pytesseract.image_to_string(image)

        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
205
 
206
+
207
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Analyze a CSV file using pandas and answer a question about it.
    Args:
        file_path (str): the path to the CSV file.
        query (str): Question about the data
    """
    try:
        frame = pd.read_csv(file_path)

        # Assemble the report: a header line, the column list, then pandas'
        # summary statistics. NOTE(review): `query` is not currently used to
        # tailor the analysis.
        report_parts = [
            f"CSV file loaded with {len(frame)} rows and {len(frame.columns)} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(report_parts)
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"
231
 
232
+
233
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and answer a question about it.
    Args:
        file_path (str): the path to the Excel file.
        query (str): Question about the data
    """
    try:
        frame = pd.read_excel(file_path)

        # Assemble the report: a header line, the column list, then pandas'
        # summary statistics. NOTE(review): `query` is not currently used to
        # tailor the analysis.
        report_parts = [
            f"Excel file loaded with {len(frame)} rows and {len(frame.columns)} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(report_parts)
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
259
 
260
+
261
+ ### ============== IMAGE PROCESSING AND GENERATION TOOLS =============== ###
262
+ import os
263
+ import io
264
+ import base64
265
+ import uuid
266
+ from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter
267
+
268
+ # Helper functions for image processing
269
  def encode_image(image_path: str) -> str:
270
  """Convert an image file to base64 string."""
271
  with open(image_path, "rb") as image_file:
272
  return base64.b64encode(image_file.read()).decode("utf-8")
273
 
274
+
275
def decode_image(base64_string: str) -> Image.Image:
    """Turn a base64-encoded string back into a PIL Image object."""
    buffer = io.BytesIO(base64.b64decode(base64_string))
    return Image.open(buffer)
279
 
280
+
281
  def save_image(image: Image.Image, directory: str = "image_outputs") -> str:
282
  """Save a PIL Image to disk and return the path."""
283
  os.makedirs(directory, exist_ok=True)
 
288
 
289
  @tool
290
  def analyze_image(image_base64: str) -> Dict[str, Any]:
291
+ """
292
+ Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).
293
+ Args:
294
+ image_base64 (str): Base64 encoded image string
295
+ Returns:
296
+ Dictionary with analysis result
297
+ """
298
  try:
299
  img = decode_image(image_base64)
300
  width, height = img.size
 
327
  except Exception as e:
328
  return {"error": str(e)}
329
 
330
+
331
  @tool
332
  def transform_image(
333
  image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None
334
  ) -> Dict[str, Any]:
335
+ """
336
+ Apply transformations: resize, rotate, crop, flip, brightness, contrast, blur, sharpen, grayscale.
337
+ Args:
338
+ image_base64 (str): Base64 encoded input image
339
+ operation (str): Transformation operation
340
+ params (Dict[str, Any], optional): Parameters for the operation
341
+ Returns:
342
+ Dictionary with transformed image (base64)
343
+ """
344
  try:
345
  img = decode_image(image_base64)
346
  params = params or {}
347
 
348
  if operation == "resize":
349
+ img = img.resize(
350
+ (
351
+ params.get("width", img.width // 2),
352
+ params.get("height", img.height // 2),
353
+ )
354
+ )
355
  elif operation == "rotate":
356
  img = img.rotate(params.get("angle", 90), expand=True)
357
  elif operation == "crop":
358
+ img = img.crop(
359
+ (
360
+ params.get("left", 0),
361
+ params.get("top", 0),
362
+ params.get("right", img.width),
363
+ params.get("bottom", img.height),
364
+ )
365
+ )
366
  elif operation == "flip":
367
  if params.get("direction", "horizontal") == "horizontal":
368
  img = img.transpose(Image.FLIP_LEFT_RIGHT)
 
388
  except Exception as e:
389
  return {"error": str(e)}
390
 
391
+
392
  @tool
393
  def draw_on_image(
394
  image_base64: str, drawing_type: str, params: Dict[str, Any]
395
  ) -> Dict[str, Any]:
396
+ """
397
+ Draw shapes (rectangle, circle, line) or text onto an image.
398
+ Args:
399
+ image_base64 (str): Base64 encoded input image
400
+ drawing_type (str): Drawing type
401
+ params (Dict[str, Any]): Drawing parameters
402
+ Returns:
403
+ Dictionary with result image (base64)
404
+ """
405
  try:
406
  img = decode_image(image_base64)
407
  draw = ImageDraw.Draw(img)
 
421
  width=params.get("width", 2),
422
  )
423
  elif drawing_type == "line":
424
+ draw.line(
425
+ (
426
+ params["start_x"],
427
+ params["start_y"],
428
+ params["end_x"],
429
+ params["end_y"],
430
+ ),
431
+ fill=color,
432
+ width=params.get("width", 2),
433
+ )
434
  elif drawing_type == "text":
435
  font_size = params.get("font_size", 20)
436
  try:
 
453
  except Exception as e:
454
  return {"error": str(e)}
455
 
456
+
457
  @tool
458
  def generate_simple_image(
459
  image_type: str,
 
461
  height: int = 500,
462
  params: Optional[Dict[str, Any]] = None,
463
  ) -> Dict[str, Any]:
464
+ """
465
+ Generate a simple image (gradient, noise, pattern, chart).
466
+ Args:
467
+ image_type (str): Type of image
468
+ width (int), height (int)
469
+ params (Dict[str, Any], optional): Specific parameters
470
+ Returns:
471
+ Dictionary with generated image (base64)
472
+ """
473
  try:
474
  params = params or {}
475
 
 
483
 
484
  if direction == "horizontal":
485
  for x in range(width):
486
+ r = int(
487
+ start_color[0] + (end_color[0] - start_color[0]) * x / width
488
+ )
489
+ g = int(
490
+ start_color[1] + (end_color[1] - start_color[1]) * x / width
491
+ )
492
+ b = int(
493
+ start_color[2] + (end_color[2] - start_color[2]) * x / width
494
+ )
495
  draw.line([(x, 0), (x, height)], fill=(r, g, b))
496
  else:
497
  for y in range(height):
498
+ r = int(
499
+ start_color[0] + (end_color[0] - start_color[0]) * y / height
500
+ )
501
+ g = int(
502
+ start_color[1] + (end_color[1] - start_color[1]) * y / height
503
+ )
504
+ b = int(
505
+ start_color[2] + (end_color[2] - start_color[2]) * y / height
506
+ )
507
  draw.line([(0, y), (width, y)], fill=(r, g, b))
508
 
509
  elif image_type == "noise":
510
  noise_array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
511
  img = Image.fromarray(noise_array, "RGB")
512
+
513
  else:
514
  return {"error": f"Unsupported image_type {image_type}"}
515
 
 
520
  except Exception as e:
521
  return {"error": str(e)}
522
 
523
+
524
  @tool
525
  def combine_images(
526
  images_base64: List[str], operation: str, params: Optional[Dict[str, Any]] = None
527
  ) -> Dict[str, Any]:
528
+ """
529
+ Combine multiple images (collage, stack, blend).
530
+ Args:
531
+ images_base64 (List[str]): List of base64 images
532
+ operation (str): Combination type
533
+ params (Dict[str, Any], optional)
534
+ Returns:
535
+ Dictionary with combined image (base64)
536
+ """
537
  try:
538
  images = [decode_image(b64) for b64 in images_base64]
539
  params = params or {}
 
566
  except Exception as e:
567
  return {"error": str(e)}
568
 
569
+
570
+ @tool
571
def download_task_file(task_id: str, api_url: str = "https://agents-course-unit4-scoring.hf.space") -> str:
    """
    Download a file associated with a task from the evaluation API.

    Args:
        task_id (str): The task ID to download the file for
        api_url (str): The base API URL (defaults to the evaluation server)

    Returns:
        str: Message containing the local path of the downloaded file,
            or an error message on failure.
    """
    try:
        # Construct the file download URL
        file_url = f"{api_url}/files/{task_id}"

        # Download with a timeout so an unresponsive server cannot hang the agent.
        response = requests.get(file_url, stream=True, timeout=30)
        response.raise_for_status()

        # Prefer the server-provided filename (Content-Disposition header);
        # the previous version hard-coded a ".png" extension, which mislabeled
        # non-image task files (.xlsx, .mp3, .py, ...).
        filename = f"task_{task_id}"
        disposition = response.headers.get("content-disposition", "")
        if "filename=" in disposition:
            candidate = disposition.split("filename=")[-1].split(";")[0].strip().strip('"')
            if candidate:
                # basename() strips any path components a hostile header could inject
                filename = f"task_{task_id}_{os.path.basename(candidate)}"

        filepath = os.path.join(tempfile.gettempdir(), filename)

        # Stream the body to disk in chunks
        with open(filepath, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        return f"Task file downloaded to {filepath}. You can now analyze this file."
    except Exception as e:
        return f"Error downloading task file: {str(e)}"
 
 
 
 
 
 
599
 
 
 
 
 
 
 
600
 
601
# Full toolbox exposed to the LLM: arithmetic, search/retrieval (Wikipedia,
# Tavily, YouTube transcripts), image analysis/generation, file helpers,
# spreadsheet analysis, OCR, and the evaluation-server file downloader.
tools = [multiply, add, subtract, divide, wikidata_search, tavily_search_tool, load_youtube_transcript, combine_images, analyze_image, transform_image, draw_on_image, generate_simple_image, analyze_csv_file, analyze_excel_file, save_and_read_file, download_file_from_url, extract_text_from_image, download_task_file]
602
 
603
def build_graph():
    """Assemble and compile the ReAct-style LangGraph agent.

    The graph alternates between an LLM node (which may emit tool calls)
    and a tool-execution node, looping until the LLM replies without
    requesting a tool.
    """
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY"))
    llm_with_tools = llm.bind_tools(tools)

    def agent_node(state: MessagesState) -> MessagesState:
        """LLM step: ensure the ReAct system prompt leads the history, then invoke."""
        history = state["messages"]
        # Prepend the system prompt only once (i.e. when it is not already first).
        if not history or not isinstance(history[0], SystemMessage):
            history = [SystemMessage(content=REACT_SYSTEM_PROMPT), *history]
        return {"messages": [llm_with_tools.invoke(history)]}

    builder = StateGraph(MessagesState)
    builder.add_node("agent", agent_node)
    builder.add_node("tools", ToolNode(tools))

    builder.add_edge(START, "agent")
    builder.add_conditional_edges("agent", tools_condition)
    builder.add_edge("tools", "agent")

    return builder.compile()
629
 
630
class LangGraphAgent:
    """Thin wrapper that compiles the LangGraph pipeline once and answers questions."""

    def __init__(self):
        self.graph = build_graph()
        print("LangGraphAgent initialized with tools.")

    def __call__(self, question: str) -> str:
        """Invoke the compiled graph on *question* and return the final answer text."""
        try:
            outcome = self.graph.invoke({"messages": [HumanMessage(content=question)]})
            # Dump the full conversation for debugging/tracing.
            for message in outcome["messages"]:
                message.pretty_print()
            return outcome["messages"][-1].content
        except Exception as e:
            return f"Error: {str(e)}"
645
 
646
if __name__ == "__main__":
    # Smoke test: run the agent on a sample GAIA-style benchmark question.
    agent = LangGraphAgent()
    question = "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia."
    answer = agent(question)
 
650
 
651
 
652