File size: 4,114 Bytes
ec60378
 
 
 
 
 
 
 
 
9b4ce5f
 
 
 
 
 
 
 
ec60378
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b4ce5f
ec60378
 
 
9b4ce5f
ec60378
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b4ce5f
 
ec60378
254b458
ec60378
 
 
9b4ce5f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import tempfile
import requests
import os
from urllib.parse import urlparse
from typing import Optional, List
import yt_dlp
import imageio
from PIL import Image
import whisper
from dotenv import load_dotenv

# Fallback tool decorator if gaia_benchmark.tools is not available.
# When the import fails, `tool` becomes a pass-through so the `@tool`
# decorations below still work and the functions stay plain callables.
try:
    from gaia_benchmark.tools import tool
except ImportError:
    def tool(func):
        """Return ``func`` unchanged (no-op stand-in for the real decorator)."""
        # Only the bare ``@tool`` form is supported; there is no handling for
        # a parameterised ``@tool(...)`` call.
        return func

# Runs at import time. python-dotenv's load_dotenv() populates os.environ from
# a .env file when one is found; nothing in the code visible here reads those
# variables directly.
load_dotenv()

@tool
def use_vision_model(question: str, images: List[Image.Image]) -> str:
    """Placeholder for image question answering.

    No vision backend is wired in, so both arguments are ignored and a fixed
    notice is returned.

    Args:
        question: The question about the images (unused).
        images: The images to inspect (unused).

    Returns:
        A constant message stating that the vision model is unavailable.
    """
    unavailable_notice = (
        "Vision model is not available for Mistral. "
        "Please integrate a separate endpoint for image analysis."
    )
    return unavailable_notice

@tool
def review_youtube_video(url: str, question: str) -> str:
    """Placeholder for answering a question about a YouTube video.

    The tool is not implemented; both arguments are ignored.

    Args:
        url: Address of the video (unused).
        question: The question about the video (unused).

    Returns:
        A constant message stating that the tool is unsupported.
    """
    unsupported_notice = (
        "This tool is currently unsupported with Mistral. "
        "Please remove or replace."
    )
    return unsupported_notice

@tool
def youtube_frames_to_images(url: str, sample_interval_seconds: int = 5) -> List[Image.Image]:
    """Download a video with yt-dlp and sample frames from it as PIL images.

    The video is downloaded into a temporary directory (removed when the
    function returns), opened with imageio, and every Nth frame is converted
    to a ``PIL.Image``, where N is ``fps * sample_interval_seconds``.

    Args:
        url: Address of the video to download.
        sample_interval_seconds: Seconds between sampled frames. Values that
            would yield a frame step of zero fall back to sampling every
            frame; the sign is ignored.

    Returns:
        The sampled frames, in playback order, starting with frame 0.

    Raises:
        FileNotFoundError: If the download did not leave an ``.mp4`` file in
            the temporary directory.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        # NOTE(review): confirm that yt-dlp honours 'force_ipv4' as a Python
        # API option; the CLI flag of that name may map to 'source_address'.
        ydl_opts = {
            'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]/best',
            'outtmpl': os.path.join(tmpdir, 'video.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'merge_output_format': 'mp4',
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        video_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.mp4')),
            None,
        )
        if video_path is None:
            # Fail with a clear message instead of handing None to imageio.
            raise FileNotFoundError(f"No .mp4 file was produced when downloading {url!r}")

        reader = imageio.get_reader(video_path)
        try:
            fps = reader.get_meta_data().get('fps', 25)
            # Clamp to at least one frame: a step of 0 would make the modulo
            # below raise ZeroDivisionError. abs() keeps the sampling that a
            # negative interval already produced.
            frame_interval = max(1, abs(int(fps * sample_interval_seconds)))
            return [
                Image.fromarray(frame)
                for idx, frame in enumerate(reader)
                if idx % frame_interval == 0
            ]
        finally:
            # Release the reader even if decoding a frame fails.
            reader.close()

@tool
def read_file(filepath: str) -> str:
    """Return the full contents of a UTF-8 text file.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file's text, or a string beginning with ``"Error reading file: "``
        followed by the exception message if anything goes wrong.
    """
    try:
        with open(filepath, mode='r', encoding='utf-8') as source:
            contents = source.read()
    except Exception as err:
        # Failures are reported in-band as text rather than raised.
        return f"Error reading file: {err!s}"
    return contents

@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """Download ``url`` into the system temporary directory.

    Args:
        url: Address of the resource to fetch.
        filename: Name to save the file under. When omitted or empty, the last
            component of the URL path is used, or a random ``download_<hex>``
            name if the URL path has no final component.

    Returns:
        The path of the saved file, or a string beginning with
        ``"Error downloading file: "`` followed by the exception message if
        the request or the write fails (including HTTP error statuses).
    """
    timeout_seconds = 30  # bound connect/read waits so a dead server cannot hang the tool
    chunk_bytes = 64 * 1024
    try:
        if not filename:
            filename = os.path.basename(urlparse(url).path) or f"download_{os.urandom(4).hex()}"
        filepath = os.path.join(tempfile.gettempdir(), filename)
        # Stream the body so large files are not held in memory all at once.
        with requests.get(url, stream=True, timeout=timeout_seconds) as response:
            # Treat 4xx/5xx as failures instead of saving the error page.
            response.raise_for_status()
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_bytes):
                    f.write(chunk)
        return filepath
    except Exception as e:
        return f"Error downloading file: {str(e)}"

@tool
def extract_text_from_image(image_path: str) -> str:
    """Run OCR on an image file and return the recognised text.

    ``pytesseract`` is imported lazily, so a missing installation is reported
    through the error string rather than at module import.

    Args:
        image_path: Path of the image to read.

    Returns:
        The text produced by ``pytesseract.image_to_string``, or a string
        beginning with ``"Error extracting text: "`` followed by the exception
        message if anything goes wrong.
    """
    try:
        import pytesseract
        # Open via a context manager so the underlying file handle is closed
        # once OCR has finished, instead of waiting for garbage collection.
        with Image.open(image_path) as image:
            return pytesseract.image_to_string(image)
    except Exception as e:
        return f"Error extracting text: {str(e)}"

@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """Load a CSV file with pandas and summarise its shape and columns.

    Args:
        file_path: Path of the CSV file.
        query: Accepted for interface compatibility; not used.

    Returns:
        A one-line summary with the frame's shape and column names, or a
        string beginning with ``"CSV error: "`` followed by the exception
        message if loading fails.
    """
    try:
        import pandas as pd
        frame = pd.read_csv(file_path)
        dimensions = frame.shape
        column_names = frame.columns.tolist()
        return f"Loaded CSV with shape {dimensions} and columns: {column_names}"
    except Exception as err:
        return f"CSV error: {err!s}"

@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """Load an Excel file with pandas and summarise its shape and columns.

    Args:
        file_path: Path of the Excel workbook.
        query: Accepted for interface compatibility; not used.

    Returns:
        A one-line summary with the frame's shape and column names, or a
        string beginning with ``"Excel error: "`` followed by the exception
        message if loading fails.
    """
    try:
        import pandas as pd
        frame = pd.read_excel(file_path)
        dimensions = frame.shape
        column_names = frame.columns.tolist()
        return f"Loaded Excel with shape {dimensions} and columns: {column_names}"
    except Exception as err:
        return f"Excel error: {err!s}"

@tool
def youtube_transcribe(url: str) -> str:
    """Download a video's audio track and transcribe it with Whisper.

    The audio is fetched with yt-dlp into a temporary directory (removed when
    the function returns), converted to WAV by the ``FFmpegExtractAudio``
    post-processor, and transcribed with the Whisper ``"small"`` model.

    Args:
        url: Address of the video whose audio should be transcribed.

    Returns:
        The ``'text'`` field of the Whisper transcription result.

    Raises:
        FileNotFoundError: If the download did not leave a ``.wav`` file in
            the temporary directory.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(tmpdir, 'audio.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        audio_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.wav')),
            None,
        )
        if audio_path is None:
            # Fail with a clear message instead of handing None to Whisper.
            raise FileNotFoundError(f"No .wav file was produced when downloading {url!r}")

        # Load the model only once there is audio to transcribe, so a failed
        # download does not pay the model-loading cost.
        model = whisper.load_model("small")
        return model.transcribe(audio_path)['text']

@tool
def transcribe_audio(audio_file_path: str) -> str:
    """Transcribe an audio file with the Whisper ``"small"`` model.

    Args:
        audio_file_path: Path of the audio file to transcribe.

    Returns:
        The ``'text'`` field of the Whisper transcription result.
    """
    speech_model = whisper.load_model("small")
    transcription = speech_model.transcribe(audio_file_path)
    return transcription['text']