from langchain_core.tools import tool
import os
from dotenv import load_dotenv
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_experimental.tools import PythonREPLTool
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
import requests
import base64
import tempfile
import pypdf
import pandas
import zipfile
from pathlib import Path
import mimetypes
from typing import Optional
import whisper
import torch
import yt_dlp
import google.generativeai as genai
import time
load_dotenv()
vision_llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
# Create the underlying REPL tool
#_python_repl = PythonREPLTool()
@tool
def google_grounding_search(query: str) -> str:
    """Run a Google-grounded web search via Gemini and return current information.

    Use this for questions about recent events, live data (news, prices,
    scores), or anything likely to postdate the model's training data.

    Args:
        query: A focused, specific search query string.

    Returns:
        Search-backed text prefixed with a citation banner, or an error message.

    Example usage:
        - google_grounding_search("latest AI news January 2025")
        - google_grounding_search("current Tesla stock price")
        - google_grounding_search("Manchester United new signings 2025")
    """
    try:
        # Grounded search lives in the newer `google-genai` package.
        from google import genai
        from google.genai import types
        import os

        key = os.getenv("GEMINI_API_KEY")
        if not key:
            return "Error: GEMINI_API_KEY not found in environment variables"

        search_client = genai.Client(api_key=key)
        # Attach the built-in Google Search tool so the answer is grounded.
        grounded_config = types.GenerateContentConfig(
            tools=[types.Tool(google_search=types.GoogleSearch())]
        )

        reply = search_client.models.generate_content(
            model="gemini-2.0-flash",
            contents=f"Search for and provide current information about: {query}",
            config=grounded_config,
        )

        answer = reply.text.strip()
        if not answer:
            return "No results found from grounded search"
        return f"Current Information (via Google Search):\n{answer}"
    except ImportError as e:
        return f"Error: google-genai library not available. Import error: {str(e)}"
    except Exception as e:
        return f"Error performing grounded search: {str(e)}"
@tool
def execute_python(code: str) -> str:
    """Execute Python code for mathematical calculations, data analysis, and general computation.

    Args:
        code: Valid Python code to execute (a single expression or a statement block).

    Returns:
        The value of the expression, the captured stdout, the last assigned
        variable, or an error message.
    """
    import io
    from contextlib import redirect_stdout

    try:
        # If the code parses as a single expression, evaluate it so its value
        # is returned even without print() — fixes inputs like "abs(-5)" that
        # the old digit-only heuristic sent to exec(), which discards values.
        try:
            expr = compile(code, "<tool>", "eval")
        except SyntaxError:
            expr = None

        captured_output = io.StringIO()
        local_vars = {}
        value = None
        with redirect_stdout(captured_output):
            if expr is not None:
                value = eval(expr, {"__builtins__": __builtins__}, local_vars)
            else:
                exec(code, {"__builtins__": __builtins__}, local_vars)

        # Prefer the expression's own value (e.g. "2+2" -> "4").
        if expr is not None and value is not None:
            return str(value)

        output = captured_output.getvalue().strip()
        if output:
            return output

        # No printed output: fall back to the last variable the code defined.
        if local_vars:
            last_var = list(local_vars.values())[-1]
            if last_var is not None:
                return str(last_var)
        return "Code executed successfully (no output)"
    except Exception as e:
        return f"Error executing code: {str(e)}"
@tool
def download_files_from_api(task_id: str, file_extension: str = None) -> str:
    """Downloads a file (image, PDF, CSV, code, audio, Excel, etc.) for a task ID from the API.

    The file is saved to a temporary location and its local path is returned.

    Args:
        task_id: The task ID for which to download the file.
        file_extension: Optional expected extension (e.g. ".py", ".csv", ".pdf").
            When omitted, the extension is inferred from the Content-Type header.

    Returns:
        The absolute path to the downloaded file, or an error message.
    """
    # Ordered (content-type marker, extension) pairs. Substring matching keeps
    # charset-qualified headers like "text/csv; charset=utf-8" working.
    type_table = [
        ('image/jpeg', '.jpg'),
        ('image/png', '.png'),
        ('application/pdf', '.pdf'),
        ('text/csv', '.csv'),
        ('text/x-python', '.py'),
        ('application/x-python-code', '.py'),
        ('audio/mpeg', '.mp3'),
        ('audio/wav', '.wav'),
        ('application/vnd.ms-excel', '.xls'),
        ('application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', '.xlsx'),
        ('application/zip', '.zip'),
        ('text/plain', '.txt'),
    ]
    try:
        api_url = "https://agents-course-unit4-scoring.hf.space"
        response = requests.get(f"{api_url}/files/{task_id}", timeout=30)
        response.raise_for_status()

        ext = file_extension
        if not ext:
            content_type = response.headers.get('Content-Type', '')
            ext = next(
                (suffix for marker, suffix in type_table if marker in content_type),
                '.bin',  # default for unknown types
            )

        # Persist the payload to a temp file and hand back its path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
            temp_file.write(response.content)
            file_path = temp_file.name
        print(f"Downloaded file for task {task_id} to: {file_path}")
        return file_path
    except requests.exceptions.RequestException as e:
        return f"Error downloading file from API: {str(e)}"
    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"
@tool
def process_image(image_path: str) -> str:
    """Analyze a local image file: extract any text and describe the visual content.

    Handles common formats (PNG, JPEG, GIF, ...) by inlining the image as a
    base64 data URL for the vision model.

    Args:
        image_path: The absolute path to the local image file.

    Returns:
        Extracted text (if any) plus a visual description, or an error message.
    """
    try:
        import mimetypes
        mime_type, _ = mimetypes.guess_type(image_path)
        if mime_type is None:
            mime_type = "application/octet-stream"  # fallback when detection fails

        with open(image_path, "rb") as image_file:
            encoded = base64.b64encode(image_file.read()).decode("utf-8")
        data_url = f"data:{mime_type};base64,{encoded}"

        def _ask_vision(prompt: str) -> str:
            # One vision call: the prompt text plus the inline base64 image.
            message = HumanMessage(
                content=[
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": data_url}},
                ]
            )
            return vision_llm.invoke([message]).content.strip()

        extracted_text = _ask_vision(
            "Extract all the text from this image. "
            "Return only the extracted text, no explanations. "
            "If no text is found, respond with 'No text found'."
        )
        description = _ask_vision(
            "Describe what you see in this image in detail. "
            "Be specific about objects, positions, colors, text, numbers, "
            "and any other relevant visual information."
        )

        return f"TEXT EXTRACTED:\n{extracted_text}\n\nVISUAL DESCRIPTION:\n{description}"
    except FileNotFoundError:
        return f"Error: Image file not found at {image_path}"
    except Exception as e:
        return f"Error processing image: {str(e)}"
@tool
def process_pdf(pdf_path: str) -> str:
    """Extracts all text content from a PDF file.

    Args:
        pdf_path: The absolute path to the local PDF file.

    Returns:
        A string containing all extracted text from the PDF, or an error message.
    """
    try:
        reader = pypdf.PdfReader(pdf_path)
        # extract_text() can yield None/"" for image-only pages; guard with
        # `or ""` and join once instead of quadratic `+=` concatenation.
        text = "".join((page.extract_text() or "") + "\n" for page in reader.pages)
        # strip() so a PDF of blank pages reports "no text" instead of "\n\n…".
        return text if text.strip() else "No text found in PDF."
    except FileNotFoundError:
        return f"Error: PDF file not found at {pdf_path}"
    except Exception as e:
        return f"Error processing PDF: {str(e)}"
@tool
def process_csv(csv_path: str, operation: str = "summary", params: dict = None) -> str:
    """Processes a CSV file based on the specified operation.

    Args:
        csv_path: The absolute path to the local CSV file.
        operation: One of:
            "summary": head, columns, dtypes and shape of the CSV.
            "get_column": content of one column; needs 'column_name' in params.
            "filter": rows matching a condition; needs 'column', 'operator',
                'value' in params. Operators: "==", "!=", ">", "<", ">=", "<=".
            "aggregate": aggregation over a column; needs 'agg_column',
                'agg_function' in params, optional 'group_by_column'.
                Functions: "sum", "mean", "count", "min", "max".
            "describe": descriptive statistics for numerical columns.
        params: Parameters for the chosen operation.

    Returns:
        The result of the operation as a string, or an error message.
    """
    params = params or {}
    # Row-filter predicates keyed by the operator string accepted in params.
    comparators = {
        "==": lambda series, v: series == v,
        "!=": lambda series, v: series != v,
        ">": lambda series, v: series > v,
        "<": lambda series, v: series < v,
        ">=": lambda series, v: series >= v,
        "<=": lambda series, v: series <= v,
    }
    try:
        df = pandas.read_csv(csv_path)

        if operation == "summary":
            return (
                f"Shape: {df.shape}\n"
                f"Columns:\n{df.columns.tolist()}\n"
                f"Data Types:\n{df.dtypes}\n"
                f"First 5 rows:\n{df.head().to_string()}"
            )

        if operation == "get_column":
            column_name = params.get("column_name")
            if column_name not in df.columns:
                return f"Error: Column '{column_name}' not found."
            return df[column_name].to_string()

        if operation == "filter":
            column = params.get("column")
            op = params.get("operator")
            value = params.get("value")
            if not (column and op and value is not None):
                return "Error: 'column', 'operator', and 'value' are required for filter operation."
            if column not in df.columns:
                return f"Error: Column '{column}' not found."
            predicate = comparators.get(op)
            if predicate is None:
                return f"Error: Unsupported operator '{op}'."
            return df[predicate(df[column], value)].to_string()

        if operation == "aggregate":
            agg_column = params.get("agg_column")
            agg_function = params.get("agg_function")
            group_by_column = params.get("group_by_column")
            if not (agg_column and agg_function):
                return "Error: 'agg_column' and 'agg_function' are required for aggregate operation."
            if agg_column not in df.columns:
                return f"Error: Column '{agg_column}' not found."
            if group_by_column and group_by_column not in df.columns:
                return f"Error: Group by column '{group_by_column}' not found."
            if agg_function not in ("sum", "mean", "count", "min", "max"):
                return f"Error: Unsupported aggregation function '{agg_function}'."
            if group_by_column:
                return str(df.groupby(group_by_column)[agg_column].agg(agg_function))
            return str(df[agg_column].agg(agg_function))

        if operation == "describe":
            return df.describe().to_string()

        return f"Error: Unsupported operation '{operation}'."
    except FileNotFoundError:
        return f"Error: CSV file not found at {csv_path}"
    except Exception as e:
        return f"Error processing CSV: {str(e)}"
@tool
def process_code_file(code_file_path: str) -> str:
    """Reads and executes a code file, returning its output along with the full code.

    Args:
        code_file_path: The absolute path to the local code file.

    Returns:
        A string containing the full code and the output of the executed code,
        or an error message. Only Python (.py) files are executed.
    """
    try:
        with open(code_file_path, "r") as f:
            code_content = f.read()
        if code_file_path.endswith(".py"):
            # `execute_python` is a LangChain StructuredTool after the @tool
            # decorator, so it must be called via .invoke() — direct calls to
            # a tool object (BaseTool.__call__) are deprecated/removed.
            execution_output = execute_python.invoke(code_content)
            return f"--- FULL CODE ---\n{code_content}--- EXECUTION OUTPUT ---\n{execution_output}"
        else:
            return f"Error: Only Python (.py) files are supported for execution. Found: {code_file_path}"
    except FileNotFoundError:
        return f"Error: Code file not found at {code_file_path}"
    except Exception as e:
        return f"Error processing code file: {str(e)}"
@tool
def process_excel(excel_path: str, operation: str = "summary", params: dict = None) -> str:
    """Processes an Excel file based on the specified operation.

    Args:
        excel_path: The absolute path to the local Excel file.
        operation: One of:
            "summary": sheet names plus shape/columns/head of every sheet.
            "get_sheet": full content of one sheet; needs 'sheet_name' in params.
        params: Parameters for the chosen operation.

    Returns:
        The result of the operation as a string, or an error message.
    """
    params = params or {}
    try:
        workbook = pandas.ExcelFile(excel_path)

        if operation == "summary":
            names = workbook.sheet_names
            # Build the report in pieces and join once at the end.
            pieces = [f"Sheets: {names}\n"]
            for name in names:
                frame = pandas.read_excel(workbook, sheet_name=name)
                pieces.append(
                    f"\n--- Sheet: {name} ---\n"
                    f"Shape: {frame.shape}\n"
                    f"Columns: {frame.columns.tolist()}\n"
                    f"First 5 rows:\n{frame.head().to_string()}\n"
                )
            return "".join(pieces)

        if operation == "get_sheet":
            sheet_name = params.get("sheet_name")
            if sheet_name not in workbook.sheet_names:
                return f"Error: Sheet '{sheet_name}' not found."
            return pandas.read_excel(workbook, sheet_name=sheet_name).to_string()

        return f"Error: Unsupported operation '{operation}'."
    except FileNotFoundError:
        return f"Error: Excel file not found at {excel_path}"
    except Exception as e:
        return f"Error processing Excel file: {str(e)}"
@tool
def process_archive(archive_path: str, operation: str = "list", extract_to: str = None) -> str:
    """Processes a .zip archive file.

    Args:
        archive_path: The absolute path to the local .zip file.
        operation: One of:
            "list": lists the contents of the archive.
            "extract": extracts the whole archive.
        extract_to: Optional target directory for "extract". Defaults to a
            directory named after the archive (extension stripped).

    Returns:
        The result of the operation as a string, or an error message.
    """
    try:
        if not zipfile.is_zipfile(archive_path):
            return f"Error: File at {archive_path} is not a valid .zip file."

        with zipfile.ZipFile(archive_path, 'r') as archive:
            if operation == "list":
                return f"Files in archive: {archive.namelist()}"
            if operation == "extract":
                target = extract_to
                if target is None:
                    # Default: sibling directory named after the zip file.
                    target, _ = os.path.splitext(archive_path)
                os.makedirs(target, exist_ok=True)
                archive.extractall(target)
                return f"Archive extracted successfully to: {target}"
            return f"Error: Unsupported operation '{operation}'."
    except FileNotFoundError:
        return f"Error: Archive file not found at {archive_path}"
    except Exception as e:
        return f"Error processing archive: {str(e)}"
@tool
def read_text_file(file_path: str) -> str:
    """Reads the entire content of a text file.

    Args:
        file_path: The absolute path to the local text file (.txt, .md, .json, etc.).

    Returns:
        The full content of the file as a string, or an error message.
    """
    try:
        # Path.read_text opens, reads, and closes the file in one call.
        return Path(file_path).read_text(encoding='utf-8')
    except FileNotFoundError:
        return f"Error: File not found at {file_path}"
    except Exception as e:
        return f"Error reading text file: {str(e)}"
# Global model cache to avoid reloading
_whisper_model = None
@tool
def process_audio(audio_path: str) -> str:
    """Analyzes an audio file using a local Whisper model for transcription.

    The Whisper "base" model is loaded once and cached in the module-level
    `_whisper_model` so repeated calls avoid the reload cost.

    Args:
        audio_path: The absolute path to the local audio file.

    Returns:
        A transcription report with basic file info, or an error message.
    """
    global _whisper_model
    try:
        if not os.path.exists(audio_path):
            return f"Error: Audio file not found at {audio_path}"

        size_bytes = os.path.getsize(audio_path)
        size_mb = size_bytes / (1024 * 1024)
        if size_bytes > 100 * 1024 * 1024:  # refuse files above 100MB
            return f"Error: Audio file too large ({size_mb:.1f}MB)"

        # Lazy one-time model load; subsequent calls reuse the cache.
        if _whisper_model is None:
            try:
                _whisper_model = whisper.load_model("base")
                print("Whisper model loaded")
            except Exception as e:
                return f"Error loading Whisper model: {str(e)}\nTry: pip install openai-whisper"

        outcome = _whisper_model.transcribe(audio_path)
        transcript = outcome["text"].strip()
        language = outcome.get("language", "unknown")

        return f"""AUDIO TRANSCRIPTION:
File: {Path(audio_path).name}
Size: {size_mb:.1f}MB
Language: {language}
Words: {len(transcript.split())}
TRANSCRIPT:
{transcript}
"""
    except Exception as e:
        return f"Error processing audio: {str(e)}"
@tool
def process_youtube_video(url: str, question: str) -> str:
    """
    REQUIRED for YouTube video analysis. Downloads and analyzes YouTube videos
    to answer questions about visual content, count objects, identify details.

    Use this tool WHENEVER you see a YouTube URL in the question.
    This is the ONLY way to analyze YouTube video content accurately.

    Args:
        url: YouTube video URL (any youtube.com or youtu.be link)
        question: The specific question about the video content

    Returns:
        Detailed analysis of the actual video content, or an error message.
    """
    try:
        # Use the direct Google AI SDK (upload_file API) rather than LangChain.
        import google.generativeai as genai
        genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))

        # Temp dir (and the downloaded video) is cleaned up automatically.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            ydl_opts = {
                'format': 'best[height<=720]',  # limit quality to save quota
                'outtmpl': str(temp_path / '%(title)s.%(ext)s'),
                'quiet': True,
                'no_warnings': True,
            }

            print(f"Downloading video from: {url}")
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                video_title = info.get('title', 'Unknown')
                duration = info.get('duration', 0)

            # yt-dlp writes exactly one file into the fresh temp dir.
            video_files = list(temp_path.glob('*'))
            if not video_files:
                return "Error: Failed to download video file"
            video_file = video_files[0]
            file_size = video_file.stat().st_size / (1024 * 1024)  # MB
            print(f"Video downloaded: {video_title} ({duration}s, {file_size:.1f}MB)")

            if file_size > 100:  # Gemini upload limit
                return f"Error: Video too large ({file_size:.1f}MB). Maximum size is 100MB."

            try:
                print("Uploading video to Gemini...")
                video_file_obj = genai.upload_file(str(video_file))

                # Poll until Gemini finishes server-side processing.
                while video_file_obj.state.name == "PROCESSING":
                    print("Processing video...")
                    time.sleep(2)
                    video_file_obj = genai.get_file(video_file_obj.name)

                if video_file_obj.state.name == "FAILED":
                    return "Error: Video processing failed"

                analysis_prompt = f"""Analyze this video carefully to answer the following question: {question}
Please examine the video content thoroughly and provide a detailed, accurate answer. Pay attention to visual details, timing, and any relevant information that helps answer the question.
Video title: {video_title}
Duration: {duration} seconds
Question: {question}"""

                model = genai.GenerativeModel('gemini-2.0-flash')
                response = model.generate_content([analysis_prompt, video_file_obj])

                # Best-effort cleanup of the uploaded file. Was a bare
                # `except:` which also swallowed KeyboardInterrupt/SystemExit.
                try:
                    genai.delete_file(video_file_obj.name)
                except Exception:
                    pass

                return f"""VIDEO ANALYSIS:
Title: {video_title}
URL: {url}
Duration: {duration} seconds
Size: {file_size:.1f}MB
QUESTION: {question}
ANSWER: {response.text}"""
            except Exception as processing_error:
                return f"Error processing video with Gemini: {str(processing_error)}"
    except ImportError:
        return "Error: google-generativeai library not installed. Run: pip install google-generativeai"
    except Exception as e:
        return f"Error downloading or processing video: {str(e)}"