Spaces:
Paused
Paused
import base64
import glob
import logging
import mimetypes
import os
import re
from pathlib import Path

from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

from src.tools.enhanced_tools import download_file_from_url
from src.tools.safe_web_tools import SafeYouTubeTranscriptTool
from src.utils.config import config
| logger = logging.getLogger(__name__) | |
# Multimodal model is created lazily so importing this module has no API cost.
def get_multimodal_model():
    """Return the shared multimodal chat model, creating it on first use.

    The instance is cached as a function attribute so repeated calls reuse
    the same ChatOpenAI client.
    """
    model = getattr(get_multimodal_model, "_model", None)
    if model is None:
        model = ChatOpenAI(model="gpt-4o-mini", api_key=config.GPT_KEY, max_tokens=1024)
        get_multimodal_model._model = model
    return model
def encode_file_to_base64(file_path: str) -> str:
    """Return the file's contents as a base64 string.

    On any failure (e.g. missing file) an error message starting with
    "Error encoding file:" is returned instead of raising — callers check
    for the "Error" prefix.
    """
    try:
        raw = Path(file_path).read_bytes()
        return base64.b64encode(raw).decode('utf-8')
    except Exception as e:
        return f"Error encoding file: {e}"
def resolve_file_path(file_path: str) -> str:
    """Resolve a file path, handling relative paths and temp-directory fallbacks.

    Resolution order:
      1. The path exactly as given.
      2. The path made absolute against the current working directory.
      3. Common temp directories (/tmp, /var/tmp, ~/tmp): exact filename,
         then a fuzzy glob on the filename stem, then GAIA-style
         ``gaia_task_*`` subdirectories.
      4. A broad recursive fuzzy search under /tmp.

    Args:
        file_path: Absolute path, relative path, or bare filename.

    Returns:
        The first existing path found, or the original path unchanged if
        nothing matches (callers handle the not-found case themselves).
    """
    logger.debug(f"Resolving file path: {file_path}")
    # First, try the path as-is.
    if os.path.exists(file_path):
        logger.debug(f"File found at original path: {file_path}")
        return file_path
    # If it's a relative path, try from the current directory.
    if not os.path.isabs(file_path):
        abs_path = os.path.abspath(file_path)
        if os.path.exists(abs_path):
            logger.debug(f"File found at absolute path: {abs_path}")
            return abs_path
    # Try to find the file in common temporary directories.
    temp_dirs = ['/tmp', '/var/tmp', os.path.expanduser('~/tmp')]
    filename = os.path.basename(file_path)
    stem = os.path.splitext(filename)[0]
    for temp_dir in temp_dirs:
        if not os.path.exists(temp_dir):
            continue
        # Look for the exact filename.
        potential_path = os.path.join(temp_dir, filename)
        if os.path.exists(potential_path):
            logger.debug(f"File found in temp directory: {potential_path}")
            return potential_path
        # Fuzzy match on the filename stem. Bug fix: the pattern was the
        # hard-coded literal "*(unknown)*" instead of being built from the
        # requested filename; guard against an empty stem so we never glob "**".
        if stem:
            matches = glob.glob(os.path.join(temp_dir, f"*{stem}*"))
            if matches:
                logger.debug(f"File found via glob pattern: {matches[0]}")
                return matches[0]
        # Look in subdirectories for GAIA-style temp paths.
        matches = glob.glob(os.path.join(temp_dir, "gaia_task_*", filename))
        if matches:
            logger.debug(f"File found in GAIA temp directory: {matches[0]}")
            return matches[0]
    # If we still haven't found it, try a broader search in /tmp:
    # "name.ext" becomes the pattern "/tmp/**/name*ext*".
    if filename:
        pattern = f"/tmp/**/{'*'.join(filename.split('.'))}*"
        matches = glob.glob(pattern, recursive=True)
        if matches:
            logger.debug(f"File found via recursive search: {matches[0]}")
            return matches[0]
    logger.warning(f"Could not resolve file path: {file_path}")
    return file_path  # Return original path if we can't find it
def vision_analyzer(image_path: str, question: str) -> str:
    """Analyzes an image based on a specific question using a vision model.

    This tool is designed to interpret the content of image files (PNG, JPG, etc.)
    and answer questions about them. It's ideal for tasks requiring visual
    understanding, such as describing image content, identifying objects, or
    answering questions related to visual data.

    Args:
        image_path (str): The path to the image file. Can be absolute, relative, or a filename.
        question (str): The specific question to ask about the image content.

    Returns:
        A string containing the answer to the question based on the image analysis.
        Returns an error message if the image file is not found or analysis fails.
    """
    logger.debug(f"vision_analyzer called with image_path: {image_path}, question: {question}")
    # Resolve the file path.
    resolved_path = resolve_file_path(image_path)
    if not os.path.exists(resolved_path):
        logger.error(f"Image file not found at {resolved_path} (original: {image_path})")
        # Provide helpful debugging information in the returned error string.
        debug_info = [
            f"Original path: {image_path}",
            f"Resolved path: {resolved_path}",
            f"Current working directory: {os.getcwd()}",
        ]
        # List image files in the current directory (best effort).
        try:
            current_files = os.listdir('.')
            image_files = [f for f in current_files if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))]
            if image_files:
                debug_info.append(f"Image files in current directory: {image_files}")
        except OSError:  # bug fix: was a bare except that swallowed everything
            pass
        # Check /tmp for any image files (best effort).
        try:
            tmp_files = glob.glob('/tmp/**/*.png', recursive=True) + glob.glob('/tmp/**/*.jpg', recursive=True)
            if tmp_files:
                debug_info.append(f"Image files in /tmp: {tmp_files[:5]}")  # Show first 5
        except OSError:  # bug fix: was a bare except
            pass
        return f"Error: Image file not found.\n" + "\n".join(debug_info)
    base64_image = encode_file_to_base64(resolved_path)
    if base64_image.startswith("Error"):
        logger.error(f"Error encoding image: {base64_image}")
        return base64_image
    # Bug fix: the data URL previously hard-coded image/jpeg even for PNG/GIF
    # inputs; derive the MIME type from the file extension, falling back to
    # jpeg when the extension is unrecognized.
    mime_type = mimetypes.guess_type(resolved_path)[0] or "image/jpeg"
    try:
        msg = get_multimodal_model().invoke(
            [
                HumanMessage(
                    content=[
                        {"type": "text", "text": question},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:{mime_type};base64,{base64_image}"},
                        },
                    ]
                )
            ]
        )
        logger.debug(f"vision_analyzer output: {msg.content}")
        return msg.content
    except Exception as e:
        logger.error(f"Error during vision analysis: {e}")
        return f"Error during vision analysis: {str(e)}"
def audio_transcriber(audio_path: str) -> str:
    """Transcribes an audio file into text using a speech-to-text model (e.g., Whisper).

    This tool is useful for converting spoken language from audio files
    (MP3, WAV, etc.) into written text. It can be used for transcribing
    voice memos, interviews, lectures, or any audio content.

    Args:
        audio_path (str): The path to the audio file. Can be absolute, relative, or a filename.

    Returns:
        A string containing the transcribed text from the audio file.
        Returns an error message if the audio file is not found or transcription fails.
    """
    # Imported lazily: requires an up-to-date 'openai' package with Whisper support.
    from openai import OpenAI
    logger.debug(f"audio_transcriber called with audio_path: {audio_path}")
    # Resolve the file path before touching the filesystem.
    resolved_path = resolve_file_path(audio_path)
    if not os.path.exists(resolved_path):
        logger.error(f"Audio file not found at {resolved_path} (original: {audio_path})")
        return f"Error: Audio file not found at {resolved_path}"
    try:
        whisper_client = OpenAI(api_key=config.GPT_KEY)
        with open(resolved_path, "rb") as fh:
            result = whisper_client.audio.transcriptions.create(
                model="whisper-1",
                file=fh,
            )
        logger.debug(f"Audio transcription successful.")
        return f"Transcription successful:\n```\n{result.text}\n```"
    except Exception as e:
        logger.error(f"Error during audio transcription: {e}", exc_info=True)
        return f"Error during audio transcription: {str(e)}"
def pdf_analyzer(pdf_path: str, question: str) -> str:
    """Analyzes a PDF file based on a specific question using a multimodal model.

    This tool allows you to extract information, summarize content, or answer
    questions from PDF documents. It's suitable for processing reports,
    articles, manuals, or any text-based PDF content.

    Args:
        pdf_path (str): The path to the PDF file. Can be absolute, relative, or a filename.
        question (str): The specific question to ask about the PDF content.

    Returns:
        A string containing the answer to the question based on the PDF analysis.
        Returns an error message if the PDF file is not found or analysis fails.
    """
    logger.debug(f"pdf_analyzer called with pdf_path: {pdf_path}, question: {question}")
    # Locate the file, then ship it to the model as an inline base64 attachment.
    resolved_path = resolve_file_path(pdf_path)
    if not os.path.exists(resolved_path):
        logger.error(f"PDF file not found at {resolved_path} (original: {pdf_path})")
        return f"Error: PDF file not found at {resolved_path}"
    base64_pdf = encode_file_to_base64(resolved_path)
    if base64_pdf.startswith("Error"):
        logger.error(f"Error encoding PDF: {base64_pdf}")
        return base64_pdf
    question_part = {"type": "text", "text": question}
    attachment_part = {
        "type": "input_file",
        "filename": os.path.basename(resolved_path),
        "file_data": f"data:application/pdf;base64,{base64_pdf}",
    }
    try:
        msg = get_multimodal_model().invoke(
            [HumanMessage(content=[question_part, attachment_part])]
        )
        logger.debug(f"pdf_analyzer output: {msg.content}")
        return msg.content
    except Exception as e:
        logger.error(f"Error during PDF analysis: {e}")
        return f"Error during PDF analysis: {str(e)}"
def youtube_visual_analyzer(youtube_url: str, question: str) -> str:
    """Analyzes a YouTube video using Google's Gemini API for comprehensive video understanding.

    This tool uses Google's Gemini 2.0 Flash model to analyze YouTube videos
    directly, providing detailed insights about the video content, visual
    elements, actions, objects, people, and more. Unlike thumbnail analysis,
    this processes the entire video.

    Args:
        youtube_url (str): The URL of the YouTube video to analyze.
        question (str): The specific question to ask about the video's content.

    Returns:
        A string containing the answer to the question based on the complete
        video analysis. Returns an error message if the video cannot be analyzed.
    """
    logger.debug(f"youtube_visual_analyzer called with youtube_url: {youtube_url}, question: {question}")
    # Reject anything that is not a standard watch-page or short-link URL.
    url_pattern = r'https?://(www\.)?(youtube\.com/watch\?v=|youtu\.be/)'
    if re.match(url_pattern, youtube_url) is None:
        logger.error(f"Invalid YouTube URL format: {youtube_url}")
        return "Invalid YouTube URL format. Please provide a valid YouTube URL."
    # Bail out early when no Google API key is configured.
    if not config.GOOGLE_API_KEY:
        logger.error("Google API key not found in environment variables")
        return "Error: Google API key not configured. Please set GOOGLE_API_KEY environment variable."
    try:
        # The SDK is optional, so import it lazily at call time.
        import google.generativeai as genai
        genai.configure(api_key=config.GOOGLE_API_KEY)
        model = genai.GenerativeModel('gemini-2.0-flash-exp')
        logger.debug("Sending request to Gemini API for video analysis")
        # Per Google AI documentation, YouTube URLs can be passed directly
        # as a file_data part alongside the text prompt.
        video_part = {"file_data": {"file_uri": youtube_url}}
        prompt_part = {
            "text": f"Please analyze this YouTube video and answer the following question: {question}"
        }
        response = model.generate_content([video_part, prompt_part])
        if not response.text:
            logger.error("Empty response from Gemini API")
            return "Error: Received empty response from Gemini API"
        logger.debug("Successfully received response from Gemini API")
        return response.text
    except ImportError as e:
        logger.error(f"Google Generative AI SDK not installed: {e}")
        return "Error: Google Generative AI SDK not installed. Please install with: pip install google-generativeai"
    except Exception as e:
        logger.error(f"Error during Gemini video analysis: {e}")
        return f"Error during video analysis: {str(e)}"
def find_image_files(directory: str = ".") -> str:
    """Find image files in a directory and its subdirectories.

    This utility tool helps locate image files when you're not sure of the
    exact path. Matching is case-insensitive (bug fix: uppercase extensions
    such as .PNG were previously missed on case-sensitive filesystems), and
    duplicate hits are removed.

    Args:
        directory: Directory to search in (default: current directory).

    Returns:
        A human-readable listing of the found image files (capped at 20
        entries), a "No image files found" message, or an error string if
        the search fails.
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.webp')
    try:
        search_roots = [directory]
        # When searching the current directory, also sweep common temp
        # directories where downloaded task files tend to land.
        if directory == ".":
            search_roots += [d for d in ('/tmp', '/var/tmp') if os.path.exists(d)]
        found_files = []
        for root in search_roots:
            for path in glob.glob(os.path.join(root, "**", "*"), recursive=True):
                # Case-insensitive extension match; skip directories that
                # happen to be named like images.
                if path.lower().endswith(image_extensions) and os.path.isfile(path):
                    found_files.append(path)
        # De-duplicate (overlapping roots, case-insensitive filesystems) and sort.
        found_files = sorted(set(found_files))
        if not found_files:
            return f"No image files found in {directory}"
        result = f"Found {len(found_files)} image files:\n"
        for i, file_path in enumerate(found_files[:20], 1):  # Show first 20
            result += f"{i}. {file_path}\n"
        if len(found_files) > 20:
            result += f"... and {len(found_files) - 20} more files"
        return result
    except Exception as e:
        return f"Error searching for image files: {str(e)}"