"""Tool definitions (code execution, audio, YouTube, documents, images, math,
web search) exposed to a LlamaIndex agent as ``FunctionTool`` objects."""

import base64
import contextlib
import io
import os
import re
import tempfile

import numexpr as ne
import openai
import requests
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from llama_index.core import SimpleDirectoryReader
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec
from tavily import AsyncTavilyClient
from youtube_transcript_api import YouTubeTranscriptApi

load_dotenv()

# Fall back to "" so a missing key is reported by the tools that need it
# instead of crashing the whole module at import time.
OPEN_AI = (os.getenv("OPENAI_API_KEY") or "").strip()
HF_TOKEN = os.environ.get("HF_TOKEN")
TAVILY = os.getenv("TAVILY_KEY")

client = InferenceClient(HF_TOKEN)


def python_interpreter(code: str) -> str:
    """
    Execute Python code and return the output (stdout).

    Use this when you need to perform complex calculations, string
    manipulations, or when asked to determine the output of a python script.

    WARNING: The code will be executed in the current environment.

    Args:
        code: Python source to execute.

    Returns:
        Whatever the code printed; otherwise a summary of the variables it
        defined; otherwise a "no output" notice. Any exception raised by the
        code is reported as an error string instead of propagating.
    """
    # SECURITY NOTE(review): exec() runs arbitrary, LLM-supplied code with the
    # privileges of this process. Only use in a sandboxed environment.
    buffer = io.StringIO()
    # A single namespace serves as both globals and locals so that functions,
    # classes and imports defined by the snippet can see each other.
    namespace = {}
    try:
        with contextlib.redirect_stdout(buffer):
            exec(code, namespace)
    except Exception as e:
        return f"Error executing code: {str(e)}"

    output = buffer.getvalue()
    if output:
        return output

    # exec() injects __builtins__ into the globals dict; hide it from the report.
    variables = {
        name: value for name, value in namespace.items() if name != "__builtins__"
    }
    if variables:
        # Nothing was printed, so report the final variables instead.
        return f"Code executed successfully. Variables: {variables}"
    return "Code executed. No output produced."


def transcribe_audio_openai(audio_path: str) -> str:
    """Transcribe audio using OpenAI Whisper API - compatible with Spaces.

    Args:
        audio_path: Path to a local audio file.

    Returns:
        The transcribed text, or an error message string on failure.
    """
    try:
        if not os.path.exists(audio_path):
            return "Error: Audio file not found."

        # Make sure the API key is available before calling the service.
        if not OPEN_AI:
            return "Error: OpenAI API key not configured"

        openai.api_key = OPEN_AI

        with open(audio_path, "rb") as audio_file:
            transcript = openai.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="text",
            )

        # With response_format="text" only the transcribed text is returned.
        return transcript
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"


def get_youtube_transcript(video_url: str) -> str:
    """Extract transcript from a YouTube video URL.

    Args:
        video_url: A ``youtu.be/<id>`` or ``youtube.com/watch?v=<id>`` URL.

    Returns:
        The transcript text (first 3000 characters), "Invalid YouTube URL" for
        unrecognised URLs, or an "Unavailable" message on any error.
    """
    try:
        # Extract the video ID from the URL.
        if "youtu.be/" in video_url:
            video_id = video_url.split("youtu.be/")[1].split("?")[0]
        elif "youtube.com/watch" in video_url:
            video_id = video_url.split("v=")[1].split("&")[0]
        else:
            return "Invalid YouTube URL"

        fetched_transcript = YouTubeTranscriptApi().fetch(video_id)
        text = "\n".join(snippet.text for snippet in fetched_transcript)

        # Cap the size so the transcript fits comfortably in the LLM context.
        return text[:3000]
    except Exception as e:
        return f"Unavailable. Error: {str(e)}"


def read_document(file_name: str) -> str:
    """
    Downloads a file from the GAIA source if not present.

    - If it's a text file (.txt, .py, .csv, .json, .md), returns the content.
    - If it's a binary file (.xlsx, .pdf, .png, .mp3), returns the file path
      and instructions to use other tools (like python_interpreter or
      analyze_image).

    Args:
        file_name: Name of the file; any directory part is discarded.

    Returns:
        The text content (truncated to 10000 characters), a guidance message
        for binary files, or an error message string.
    """
    # Keep only the base name (the LLM sometimes hallucinates paths).
    file_name = os.path.basename(file_name)

    # GAIA validation split URL, where the real files live.
    base_url = "https://huggingface.co/datasets/gaia-benchmark/GAIA/resolve/main/2023/validation"
    file_url = f"{base_url}/{file_name}"

    # 1. Download if the file does not exist locally.
    if not os.path.exists(file_name):
        try:
            print(f"📥 Downloading {file_name} from {file_url}...")
            # A timeout keeps a stalled connection from blocking the agent forever.
            response = requests.get(file_url, timeout=30)
            if response.status_code == 200:
                with open(file_name, "wb") as f:
                    f.write(response.content)
            else:
                return f"Error: File {file_name} not found in GAIA source (Status {response.status_code})."
        except Exception as e:
            return f"Error downloading file: {str(e)}"

    # 2. Decide how to read it based on the extension.
    _, ext = os.path.splitext(file_name)
    ext = ext.lower()

    # Files that must NOT be read as plain text.
    binary_extensions = ['.xlsx', '.xls', '.png', '.jpg', '.jpeg', '.mp3', '.wav', '.pdf', '.zip']

    if ext in binary_extensions:
        return (f"File '{file_name}' has been downloaded and saved locally. "
                f"It is a binary file ({ext}). DO NOT read it as text. "
                f"Use 'python_interpreter' (with pandas for excel) or 'analyze_image' to process it.")

    # 3. Text file: read it and return the content.
    try:
        with open(file_name, "r", encoding='utf-8', errors='ignore') as file:
            content = file.read()
        # Truncate very long content to avoid context-length errors.
        if len(content) > 10000:
            return content[:10000] + "\n...[Content Truncated]..."
        return content
    except Exception as e:
        return f"Error reading text file: {str(e)}"


def analyze_image(image_path: str, question: str = """ Describe what you see in this image""") -> str:
    """Analyze and extract information from images.

    Args:
        image_path: Path to a local image file.
        question: Prompt sent to the model together with the image.

    Returns:
        The model's answer, or an error message string on failure.
    """
    try:
        if not os.path.exists(image_path):
            return "Error: Image file not found at the specified path."

        with open(image_path, "rb") as image_file:
            image_bytes = image_file.read()
        image_base64 = base64.b64encode(image_bytes).decode("utf-8")

        # Determine the MIME type from the extension (defaults to JPEG).
        ext = os.path.splitext(image_path)[1].lower()
        mime_type = {
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.webp': 'image/webp',
        }.get(ext, 'image/jpeg')

        from llama_index.core.base.llms.types import ChatMessage, MessageRole

        # NOTE(review): OpenAI-style content dicts are passed through as-is;
        # confirm the installed llama_index version accepts this shape.
        message = ChatMessage(
            role=MessageRole.USER,
            content=[
                # The key must be the string "type", not the builtin `type`.
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{mime_type};base64,{image_base64}"
                    },
                },
            ],
        )

        llm = OpenAI(api_key=OPEN_AI, model="gpt-4o-mini", temperature=0.7)
        response = llm.chat([message])
        return response.message.content
    except Exception as e:
        return f"Error analyzing image: {str(e)}"


def calculator_numexpr(expression: str) -> str:
    """Evaluate a mathematical expression with numexpr.

    Args:
        expression: The expression to evaluate, e.g. ``"2 * (3 + 4)"``.

    Returns:
        ``"Result: <value>"`` on success, or an error message string.
    """
    try:
        expression = expression.strip()
        result = ne.evaluate(expression)

        # Unwrap array-like scalar results into plain Python values.
        if hasattr(result, 'item'):
            result = result.item()

        return f"Result: {result}"
    except Exception as e:
        return f"Error calculating '{expression}': {str(e)}"


# Internet search
tool_spec = DuckDuckGoSearchToolSpec()


async def search_web(query: str) -> str:
    """Useful for using the web to answer questions."""
    tavily_client = AsyncTavilyClient(api_key=TAVILY)
    return str(await tavily_client.search(query))


# Tool definitions
python_tool = FunctionTool.from_defaults(
    fn=python_interpreter,
    name="python_interpreter",
    description="Executes Python code. Use this to run code found in files or to perform complex logic."
)

image_analyzer_tool = FunctionTool.from_defaults(
    fn=analyze_image,
    name="analyze_image",
    description="Analyze image to extract information, identify objects and read text"
)

calculator_tool = FunctionTool.from_defaults(
    fn=calculator_numexpr,
    name="calculator",
    description="Evaluate mathematical expressions including basic operations (+, -, *, /, **) and functions (sqrt, log, sin, cos, etc.)"
)

# NOTE(review): this description advertises PDF/DOCX text extraction, but
# read_document only returns a path plus guidance for binary files — confirm
# whether the description or the function should change.
read_document_tool = FunctionTool.from_defaults(
    fn=read_document,
    name="read_document",
    description="Read and extract text content from documents including PDF, DOCX, TXT, MD, CSV, and JSON files"
)

youtube_transcript_tool = FunctionTool.from_defaults(
    fn=get_youtube_transcript,
    name="youtube_transcript",
    description="Get transcript/subtitles from YouTube videos. Use this when you need to know what is said in a YouTube video."
)

search_tool = FunctionTool.from_defaults(
    fn=search_web,
    name="web_search",  # explicit tool name
    description="Search the web for current information, facts, and answers to questions"
)