# --- Scraped page header, not Python code; kept as comments so the file parses ---
# Author: Cmuroc27
# Commit message: "funcion read mejorada" (improved read function)
# Commit hash: fc7df56
# Image analyzer tool
import tempfile
# Building search web agent
from llama_index.core import SimpleDirectoryReader
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec
from tavily import AsyncTavilyClient
from llama_index.core.tools import FunctionTool
from llama_index.core.agent.workflow import AgentWorkflow
import numexpr as ne
from llama_index.llms.openai import OpenAI
import base64
import openai
import os
from huggingface_hub import InferenceClient
from dotenv import load_dotenv
from youtube_transcript_api import YouTubeTranscriptApi
import re
import requests
# Load environment configuration and shared API credentials/clients.
load_dotenv()
# `or ""` prevents an AttributeError at import time when the variable is
# missing; downstream falsy checks (`if not OPEN_AI`) still behave the same.
OPEN_AI = (os.getenv("OPENAI_API_KEY") or "").strip()
HF_TOKEN = os.environ.get("HF_TOKEN")
TAVILY = os.getenv("TAVILY_KEY")
client = InferenceClient(HF_TOKEN)
def python_interpreter(code: str) -> str:
    """
    Execute Python code and return its captured stdout.

    Use this for complex calculations, string manipulations, or when asked
    to determine the output of a Python script.

    Args:
        code: Python source code to execute.

    Returns:
        The captured stdout; if nothing was printed, a summary of the final
        local variables; otherwise a "no output" notice or an error message.

    WARNING: The code is executed with exec() in the current environment —
    never feed it untrusted input.
    """
    # BUG FIX: `io` and `contextlib` were used but never imported anywhere
    # in this file, so every call raised NameError. Import them locally so
    # the function is self-contained.
    import contextlib
    import io

    # Buffer that captures everything the executed code print()s.
    buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(buffer):
            # Dedicated local scope for the executed code's variables.
            local_scope = {}
            exec(code, {}, local_scope)
        output = buffer.getvalue()
        if not output and local_scope:
            # No prints happened: report the resulting variables instead.
            return f"Code executed successfully. Variables: {local_scope}"
        return output if output else "Code executed. No output produced."
    except Exception as e:
        return f"Error executing code: {str(e)}"
def transcribe_audio_openai(audio_path: str) -> str:
    """Transcribe an audio file with the OpenAI Whisper API (Spaces-compatible).

    Args:
        audio_path: Path to the local audio file.

    Returns:
        The transcribed text, or an error message string.
    """
    try:
        # Guard clauses: missing file, then missing credentials.
        if not os.path.exists(audio_path):
            return "Error: Audio file not found."
        if not OPEN_AI:
            return "Error: OpenAI API key not configured"

        openai.api_key = OPEN_AI  # configure the OpenAI module-level client
        with open(audio_path, "rb") as audio_file:
            result = openai.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="text",
            )
        # response_format="text" yields the plain transcript string.
        return result
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"
def get_youtube_transcript(video_url: str) -> str:
"""Extract transcript from a YouTube video URL."""
try:
# Extraer el ID del video
if "youtu.be/" in video_url:
video_id = video_url.split("youtu.be/")[1].split("?")[0]
elif "youtube.com/watch" in video_url:
video_id = video_url.split("v=")[1].split("&")[0]
else:
return "Invalid YouTube URL"
transcript_yt = YouTubeTranscriptApi()
fetched_transcript = transcript_yt.fetch(video_id)
transcript = []
for snippet in fetched_transcript:
transcript.append(snippet.text)
text = "\n".join(transcript)
return text[:3000] # límite razonable
except Exception as e:
return f"Unavailable. Error: {str(e)}"
def read_document(file_name: str) -> str:
    """
    Download a file from the GAIA validation set if not already present.

    - Text files (.txt, .py, .csv, .json, .md, ...): returns the content,
      truncated to 10,000 characters.
    - Binary files (.xlsx, .pdf, .png, .mp3, ...): returns the local path
      plus instructions to use other tools (python_interpreter / analyze_image).

    Args:
        file_name: Name of the file; any directory components are stripped.

    Returns:
        File content, a handling hint for binary files, or an error message.
    """
    import os

    # Sanitize the name (the LLM sometimes hallucinates paths).
    file_name = os.path.basename(file_name)

    # Official GAIA validation URL (where the real files live).
    base_url = "https://huggingface.co/datasets/gaia-benchmark/GAIA/resolve/main/2023/validation"
    file_url = f"{base_url}/{file_name}"

    # 1. Download only when the file is not already cached locally.
    if not os.path.exists(file_name):
        import requests  # lazy import: only needed when downloading
        try:
            print(f"📥 Downloading {file_name} from {file_url}...")
            # FIX: a timeout prevents the agent from hanging forever on a
            # stalled connection (requests has no default timeout).
            response = requests.get(file_url, timeout=30)
            if response.status_code == 200:
                with open(file_name, "wb") as f:
                    f.write(response.content)
            else:
                return f"Error: File {file_name} not found in GAIA source (Status {response.status_code})."
        except Exception as e:
            return f"Error downloading file: {str(e)}"

    # 2. Decide how to read it based on the extension.
    _, ext = os.path.splitext(file_name)
    ext = ext.lower()

    # Extensions that must NOT be read as plain text.
    binary_extensions = ['.xlsx', '.xls', '.png', '.jpg', '.jpeg', '.mp3', '.wav', '.pdf', '.zip']
    if ext in binary_extensions:
        return (f"File '{file_name}' has been downloaded and saved locally. "
                f"It is a binary file ({ext}). DO NOT read it as text. "
                f"Use 'python_interpreter' (with pandas for excel) or 'analyze_image' to process it.")

    # 3. Plain text: read and return the content.
    try:
        with open(file_name, "r", encoding='utf-8', errors='ignore') as file:
            content = file.read()
        # Truncate overly long content to avoid blowing the context window.
        if len(content) > 10000:
            return content[:10000] + "\n...[Content Truncated]..."
        return content
    except Exception as e:
        return f"Error reading text file: {str(e)}"
def analyze_image(image_path: str, question: str = " Describe what you see in this image") -> str:
    """Analyze an image and extract information from it.

    Encodes the image as a base64 data URL and sends it with *question*
    to a multimodal OpenAI model.

    Args:
        image_path: Path to a local image file.
        question: Prompt describing what to extract from the image.

    Returns:
        The model's textual answer, or an error message.
    """
    try:
        if not os.path.exists(image_path):
            return "Error: Image file not found at the specified path."

        with open(image_path, "rb") as image_file:
            image_bytes = image_file.read()
        image_base64 = base64.b64encode(image_bytes).decode("utf-8")

        # Pick the MIME type from the file extension (default: JPEG).
        ext = os.path.splitext(image_path)[1].lower()
        mime_type = {
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.webp': 'image/webp'
        }.get(ext, 'image/jpeg')

        from llama_index.core.base.llms.types import ChatMessage, MessageRole
        message = ChatMessage(
            role=MessageRole.USER,
            content=[
                # BUG FIX: the key must be the string "type", not the
                # builtin `type` — the original sent a malformed text part.
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{mime_type};base64,{image_base64}"
                    }
                }
            ]
        )
        llm = OpenAI(api_key=OPEN_AI, model="gpt-4o-mini", temperature=0.7)
        response = llm.chat([message])
        return response.message.content
    except Exception as e:
        return f"Error analyzing image: {str(e)}"
# Funciòn para calcular expresiones matemáticas
def calculator_numexpr(expression: str) -> str:
"""
Evaluate expresiones matem'aticas
"""
try:
expression = expression.strip()
# Evaluar la expression
result = ne.evaluate(expression)
if hasattr(result, 'item'):
result = result.item()
return f"Result: {result}"
except Exception as e:
return f"Error calculating '{expression}': {str(e)}"
# Web access tooling.
# NOTE(review): this DuckDuckGo tool spec is instantiated but never referenced
# in the visible code — `search_web` below uses Tavily instead. Confirm whether
# it is still needed before removing.
tool_spec = DuckDuckGoSearchToolSpec()
async def search_web(query: str) -> str:
    """Search the web with Tavily and return the raw results as a string."""
    # Local name chosen to avoid shadowing the module-level `client`.
    tavily = AsyncTavilyClient(api_key=TAVILY)
    results = await tavily.search(query)
    return str(results)
# Register the functions above as agent tools.
python_tool = FunctionTool.from_defaults(
    fn=python_interpreter,
    name="python_interpreter",
    description="Executes Python code. Use this to run code found in files or to perform complex logic.",
)

image_analyzer_tool = FunctionTool.from_defaults(
    fn=analyze_image,
    name="analyze_image",
    description="Analyze image to extract information, identify objects and read text",
)

calculator_tool = FunctionTool.from_defaults(
    fn=calculator_numexpr,
    name="calculator",
    description="Evaluate mathematical expressions including basic operations (+, -, *, /, **) and functions (sqrt, log, sin, cos, etc.)",
)

read_document_tool = FunctionTool.from_defaults(
    fn=read_document,
    name="read_document",
    description="Read and extract text content from documents including PDF, DOCX, TXT, MD, CSV, and JSON files",
)

youtube_transcript_tool = FunctionTool.from_defaults(
    fn=get_youtube_transcript,
    name="youtube_transcript",
    description="Get transcript/subtitles from YouTube videos. Use this when you need to know what is said in a YouTube video.",
)

search_tool = FunctionTool.from_defaults(
    fn=search_web,
    name="web_search",  # explicit tool name exposed to the agent
    description="Search the web for current information, facts, and answers to questions",
)