# src/tools/multimodal.py
# Multimodal analysis tools (vision, audio transcription, PDF, YouTube video)
# exposed as LangChain tools for the local agent.
import base64
import os
import logging
import re
import glob
from pathlib import Path
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from src.utils.config import config
from src.tools.enhanced_tools import download_file_from_url
from src.tools.safe_web_tools import SafeYouTubeTranscriptTool
# Module-level logger, named after this module per stdlib convention.
logger = logging.getLogger(__name__)
# Initialize multimodal model on demand
def get_multimodal_model():
    """Return the shared multimodal chat model, creating it lazily.

    The ChatOpenAI instance is built on the first call and cached as a
    function attribute so every later call reuses the same object.
    """
    cached = getattr(get_multimodal_model, '_model', None)
    if cached is None:
        cached = ChatOpenAI(model="gpt-4o-mini", api_key=config.GPT_KEY, max_tokens=1024)
        get_multimodal_model._model = cached
    return cached
def encode_file_to_base64(file_path: str) -> str:
    """Read a file and return its contents as a base64-encoded string.

    On any failure (missing file, permissions, ...) the problem is reported
    as a string starting with "Error encoding file:" — callers in this
    module check for that prefix rather than catching exceptions.
    """
    try:
        raw = Path(file_path).read_bytes()
    except Exception as e:
        return f"Error encoding file: {e}"
    return base64.b64encode(raw).decode('utf-8')
def resolve_file_path(file_path: str) -> str:
    """Resolve a file path, falling back to common temp locations.

    Resolution order:
      1. The path exactly as given.
      2. The absolute form of a relative path (against the CWD).
      3. The bare filename inside /tmp, /var/tmp and ~/tmp — exact match,
         then a fuzzy ``*<filename>*`` glob, then GAIA-style
         ``gaia_task_*`` subdirectories.
      4. A recursive glob anywhere under /tmp.

    Args:
        file_path: Absolute path, relative path, or bare filename.

    Returns:
        The first existing path found, or the original ``file_path``
        unchanged if nothing could be located.
    """
    logger.debug(f"Resolving file path: {file_path}")
    # 1. The path as-is.
    if os.path.exists(file_path):
        logger.debug(f"File found at original path: {file_path}")
        return file_path
    # 2. Relative path resolved against the current working directory.
    if not os.path.isabs(file_path):
        abs_path = os.path.abspath(file_path)
        if os.path.exists(abs_path):
            logger.debug(f"File found at absolute path: {abs_path}")
            return abs_path
    # 3. Search common temporary directories for the bare filename.
    temp_dirs = ['/tmp', '/var/tmp', os.path.expanduser('~/tmp')]
    filename = os.path.basename(file_path)
    for temp_dir in temp_dirs:
        if not os.path.exists(temp_dir):
            continue
        # Exact filename match.
        potential_path = os.path.join(temp_dir, filename)
        if os.path.exists(potential_path):
            logger.debug(f"File found in temp directory: {potential_path}")
            return potential_path
        # Fuzzy match: any entry whose name contains the target filename.
        # BUG FIX: this previously globbed the literal string "*(unknown)*"
        # instead of interpolating the filename, so the branch never matched.
        # glob.escape guards against glob metacharacters in the filename.
        pattern = os.path.join(temp_dir, f"*{glob.escape(filename)}*")
        matches = glob.glob(pattern)
        if matches:
            logger.debug(f"File found via glob pattern: {matches[0]}")
            return matches[0]
        # GAIA-style task subdirectories (e.g. /tmp/gaia_task_123/<file>).
        pattern = os.path.join(temp_dir, "gaia_task_*", filename)
        matches = glob.glob(pattern)
        if matches:
            logger.debug(f"File found in GAIA temp directory: {matches[0]}")
            return matches[0]
    # 4. Last resort: recursive search under /tmp, tolerating extra text
    # between the stem and the extension (e.g. "name_copy.png").
    if filename:
        pattern = f"/tmp/**/{'*'.join(filename.split('.'))}*"
        matches = glob.glob(pattern, recursive=True)
        if matches:
            logger.debug(f"File found via recursive search: {matches[0]}")
            return matches[0]
    logger.warning(f"Could not resolve file path: {file_path}")
    return file_path  # Return original path if we can't find it
@tool
def vision_analyzer(image_path: str, question: str) -> str:
    """Analyzes an image based on a specific question using a vision model.

    This tool is designed to interpret the content of image files (PNG, JPG, etc.)
    and answer questions about them. It's ideal for tasks requiring visual understanding,
    such as describing image content, identifying objects, or answering questions
    related to visual data.

    Args:
        image_path (str): The path to the image file. Can be absolute, relative, or a filename.
        question (str): The specific question to ask about the image content.

    Returns:
        A string containing the answer to the question based on the image analysis.
        Returns an error message (with debugging hints) if the image file is not
        found, encoding fails, or the model call raises.
    """
    logger.debug(f"vision_analyzer called with image_path: {image_path}, question: {question}")
    # Resolve the file path
    resolved_path = resolve_file_path(image_path)
    if not os.path.exists(resolved_path):
        logger.error(f"Image file not found at {resolved_path} (original: {image_path})")
        # Build a diagnostic report to help the agent locate the file.
        debug_info = [
            f"Original path: {image_path}",
            f"Resolved path: {resolved_path}",
            f"Current working directory: {os.getcwd()}",
        ]
        # Best-effort directory listing; narrowed from a bare `except:` so
        # programming errors and KeyboardInterrupt are no longer swallowed.
        try:
            current_files = os.listdir('.')
            image_files = [f for f in current_files if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))]
            if image_files:
                debug_info.append(f"Image files in current directory: {image_files}")
        except OSError:
            pass
        # Check /tmp for any image files (best-effort, same narrowing).
        try:
            tmp_files = glob.glob('/tmp/**/*.png', recursive=True) + glob.glob('/tmp/**/*.jpg', recursive=True)
            if tmp_files:
                debug_info.append(f"Image files in /tmp: {tmp_files[:5]}")  # Show first 5
        except OSError:
            pass
        return "Error: Image file not found.\n" + "\n".join(debug_info)
    base64_image = encode_file_to_base64(resolved_path)
    if base64_image.startswith("Error"):
        logger.error(f"Error encoding image: {base64_image}")
        return base64_image
    try:
        # NOTE(review): the data URL always declares image/jpeg regardless of
        # the real file type; the model tolerates this, but deriving the MIME
        # type (mimetypes.guess_type) would be more correct — TODO confirm.
        msg = get_multimodal_model().invoke(
            [
                HumanMessage(
                    content=[
                        {"type": "text", "text": question},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"},
                        },
                    ]
                )
            ]
        )
        logger.debug(f"vision_analyzer output: {msg.content}")
        return msg.content
    except Exception as e:
        logger.error(f"Error during vision analysis: {e}")
        return f"Error during vision analysis: {str(e)}"
@tool
def audio_transcriber(audio_path: str) -> str:
    """Transcribes an audio file into text using a speech-to-text model (e.g., Whisper).

    Converts spoken language from audio files (MP3, WAV, etc.) into written
    text — voice memos, interviews, lectures, or any other audio content.

    Args:
        audio_path (str): The path to the audio file. Can be absolute, relative, or a filename.

    Returns:
        A string containing the transcribed text from the audio file.
        Returns an error message if the audio file is not found or transcription fails.
    """
    # Imported lazily so the module loads even without the 'openai' package;
    # keep it up to date ('pip install openai').
    from openai import OpenAI

    logger.debug(f"audio_transcriber called with audio_path: {audio_path}")
    resolved_path = resolve_file_path(audio_path)
    if not os.path.exists(resolved_path):
        logger.error(f"Audio file not found at {resolved_path} (original: {audio_path})")
        return f"Error: Audio file not found at {resolved_path}"
    try:
        whisper_client = OpenAI(api_key=config.GPT_KEY)
        with open(resolved_path, "rb") as audio_file:
            transcript = whisper_client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
            )
        logger.debug(f"Audio transcription successful.")
        return f"Transcription successful:\n```\n{transcript.text}\n```"
    except Exception as e:
        logger.error(f"Error during audio transcription: {e}", exc_info=True)
        return f"Error during audio transcription: {str(e)}"
@tool
def pdf_analyzer(pdf_path: str, question: str) -> str:
    """Analyzes a PDF file based on a specific question using a multimodal model.

    Extracts information, summarizes content, or answers questions from PDF
    documents — reports, articles, manuals, or any text-based PDF content.

    Args:
        pdf_path (str): The path to the PDF file. Can be absolute, relative, or a filename.
        question (str): The specific question to ask about the PDF content.

    Returns:
        A string containing the answer to the question based on the PDF analysis.
        Returns an error message if the PDF file is not found or analysis fails.
    """
    logger.debug(f"pdf_analyzer called with pdf_path: {pdf_path}, question: {question}")
    resolved_path = resolve_file_path(pdf_path)
    if not os.path.exists(resolved_path):
        logger.error(f"PDF file not found at {resolved_path} (original: {pdf_path})")
        return f"Error: PDF file not found at {resolved_path}"
    base64_pdf = encode_file_to_base64(resolved_path)
    if base64_pdf.startswith("Error"):
        logger.error(f"Error encoding PDF: {base64_pdf}")
        return base64_pdf
    # One message carrying the question plus the PDF as a base64 data URL.
    question_part = {"type": "text", "text": question}
    pdf_part = {
        "type": "input_file",
        "filename": os.path.basename(resolved_path),
        "file_data": f"data:application/pdf;base64,{base64_pdf}",
    }
    try:
        msg = get_multimodal_model().invoke(
            [HumanMessage(content=[question_part, pdf_part])]
        )
        logger.debug(f"pdf_analyzer output: {msg.content}")
        return msg.content
    except Exception as e:
        logger.error(f"Error during PDF analysis: {e}")
        return f"Error during PDF analysis: {str(e)}"
@tool
def youtube_visual_analyzer(youtube_url: str, question: str) -> str:
    """Analyzes a YouTube video using Google's Gemini API for comprehensive video understanding.

    Uses Google's Gemini 2.0 Flash model to analyze YouTube videos directly,
    giving detailed insight into content, visual elements, actions, objects,
    people, and more. Unlike thumbnail analysis, this processes the whole video.

    Args:
        youtube_url (str): The URL of the YouTube video to analyze.
        question (str): The specific question to ask about the video's content.

    Returns:
        A string containing the answer to the question based on the complete video analysis.
        Returns an error message if the video cannot be analyzed.
    """
    logger.debug(f"youtube_visual_analyzer called with youtube_url: {youtube_url}, question: {question}")
    # Reject anything that is not a youtube.com/watch or youtu.be link.
    url_pattern = r'https?://(www\.)?(youtube\.com/watch\?v=|youtu\.be/)'
    if re.match(url_pattern, youtube_url) is None:
        logger.error(f"Invalid YouTube URL format: {youtube_url}")
        return "Invalid YouTube URL format. Please provide a valid YouTube URL."
    # A Google API key is required for any Gemini call.
    if not config.GOOGLE_API_KEY:
        logger.error("Google API key not found in environment variables")
        return "Error: Google API key not configured. Please set GOOGLE_API_KEY environment variable."
    try:
        # Imported lazily; failure is reported as an actionable error string.
        import google.generativeai as genai

        genai.configure(api_key=config.GOOGLE_API_KEY)
        model = genai.GenerativeModel('gemini-2.0-flash-exp')
        logger.debug("Sending request to Gemini API for video analysis")
        # Per Google AI docs, a YouTube URL can be passed directly as file_data.
        video_part = {"file_data": {"file_uri": youtube_url}}
        prompt_part = {"text": f"Please analyze this YouTube video and answer the following question: {question}"}
        response = model.generate_content([video_part, prompt_part])
        if not response.text:
            logger.error("Empty response from Gemini API")
            return "Error: Received empty response from Gemini API"
        logger.debug("Successfully received response from Gemini API")
        return response.text
    except ImportError as e:
        logger.error(f"Google Generative AI SDK not installed: {e}")
        return "Error: Google Generative AI SDK not installed. Please install with: pip install google-generativeai"
    except Exception as e:
        logger.error(f"Error during Gemini video analysis: {e}")
        return f"Error during video analysis: {str(e)}"
@tool
def find_image_files(directory: str = ".") -> str:
    """Find image files in a directory and its subdirectories.

    Utility for locating image files when the exact path is unknown. When
    searching the current directory, /tmp and /var/tmp are scanned too.

    Args:
        directory: Directory to search in (default: current directory)

    Returns:
        List of found image files with their full paths
    """
    try:
        extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.webp')
        # Roots to scan: the requested directory, plus the usual temp
        # directories when defaulting to the current directory.
        roots = [directory]
        if directory == ".":
            roots.extend(d for d in ('/tmp', '/var/tmp') if os.path.exists(d))
        hits = []
        for root in roots:
            for ext in extensions:
                hits.extend(glob.glob(os.path.join(root, f"**/*{ext}"), recursive=True))
        if not hits:
            return f"No image files found in {directory}"
        hits.sort()
        # Report at most the first 20 matches, then a summary line.
        report = [f"Found {len(hits)} image files:\n"]
        report.extend(f"{idx}. {path}\n" for idx, path in enumerate(hits[:20], 1))
        if len(hits) > 20:
            report.append(f"... and {len(hits) - 20} more files")
        return "".join(report)
    except Exception as e:
        return f"Error searching for image files: {str(e)}"