FD900's picture
Update tools.py
9b4ce5f verified
raw
history blame
4.11 kB
import tempfile
import requests
import os
from urllib.parse import urlparse
from typing import Optional, List
import yt_dlp
import imageio
from PIL import Image
import whisper
from dotenv import load_dotenv
# Fallback tool decorator if gaia_benchmark.tools is not available
try:
from gaia_benchmark.tools import tool
except ImportError:
def tool(func):
return func
load_dotenv()
@tool
def use_vision_model(question: str, images: List[Image.Image]) -> str:
return "Vision model is not available for Mistral. Please integrate a separate endpoint for image analysis."
@tool
def review_youtube_video(url: str, question: str) -> str:
return "This tool is currently unsupported with Mistral. Please remove or replace."
@tool
def youtube_frames_to_images(url: str, sample_interval_seconds: int = 5) -> List[Image.Image]:
with tempfile.TemporaryDirectory() as tmpdir:
ydl_opts = {
'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]/best',
'outtmpl': os.path.join(tmpdir, 'video.%(ext)s'),
'quiet': True,
'noplaylist': True,
'merge_output_format': 'mp4',
'force_ipv4': True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
video_path = next((os.path.join(tmpdir, f) for f in os.listdir(tmpdir) if f.endswith('.mp4')), None)
reader = imageio.get_reader(video_path)
fps = reader.get_meta_data().get('fps', 25)
frame_interval = int(fps * sample_interval_seconds)
images = [Image.fromarray(frame) for idx, frame in enumerate(reader) if idx % frame_interval == 0]
reader.close()
return images
@tool
def read_file(filepath: str) -> str:
try:
with open(filepath, 'r', encoding='utf-8') as file:
return file.read()
except Exception as e:
return f"Error reading file: {str(e)}"
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
try:
if not filename:
filename = os.path.basename(urlparse(url).path) or f"download_{os.urandom(4).hex()}"
filepath = os.path.join(tempfile.gettempdir(), filename)
response = requests.get(url)
with open(filepath, 'wb') as f:
f.write(response.content)
return filepath
except Exception as e:
return f"Error downloading file: {str(e)}"
@tool
def extract_text_from_image(image_path: str) -> str:
try:
import pytesseract
return pytesseract.image_to_string(Image.open(image_path))
except Exception as e:
return f"Error extracting text: {str(e)}"
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
try:
import pandas as pd
df = pd.read_csv(file_path)
return f"Loaded CSV with shape {df.shape} and columns: {df.columns.tolist()}"
except Exception as e:
return f"CSV error: {str(e)}"
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
try:
import pandas as pd
df = pd.read_excel(file_path)
return f"Loaded Excel with shape {df.shape} and columns: {df.columns.tolist()}"
except Exception as e:
return f"Excel error: {str(e)}"
@tool
def youtube_transcribe(url: str) -> str:
model = whisper.load_model("small")
with tempfile.TemporaryDirectory() as tmpdir:
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': os.path.join(tmpdir, 'audio.%(ext)s'),
'quiet': True,
'noplaylist': True,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'wav',
}],
'force_ipv4': True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
audio_path = next((os.path.join(tmpdir, f) for f in os.listdir(tmpdir) if f.endswith('.wav')), None)
return model.transcribe(audio_path)['text']
@tool
def transcribe_audio(audio_file_path: str) -> str:
return whisper.load_model("small").transcribe(audio_file_path)['text']