File size: 11,811 Bytes
514151b
 
 
 
 
 
 
 
 
 
53488e2
514151b
 
53488e2
514151b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53488e2
 
514151b
c242d6e
 
88583d9
c242d6e
 
 
514151b
 
 
 
 
 
 
 
 
 
 
c242d6e
514151b
 
 
 
 
 
 
 
 
4134e11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
514151b
 
53488e2
514151b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53488e2
514151b
 
 
 
 
 
 
 
 
 
 
 
 
53488e2
514151b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53488e2
514151b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53488e2
514151b
53488e2
514151b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53488e2
514151b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53488e2
 
514151b
 
 
53488e2
514151b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53488e2
 
514151b
 
 
53488e2
514151b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4134e11
53488e2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
import tempfile
import requests
import os
from time import sleep
from urllib.parse import urlparse
from typing import Optional, List
import yt_dlp
import imageio
from google.genai import types
from PIL import Image
from smolagents import tool, LiteLLMModel
from google import genai
from dotenv import load_dotenv
import whisper

# Presumably loads key/value pairs from a local .env file into the process
# environment so the os.getenv("GEMINI_KEY") lookups in the tools below can
# find the API key.
load_dotenv()

@tool
def use_vision_model(question: str, images: List[Image.Image]) -> str:
    """
    Ask a vision (image-to-text) model a question about one or more images.
    Always use this tool to ask questions about a set of images you have been provided.
    The question and all of the images are sent to the model together in a single request,
    so if you have multiple images that you want to ask the same question of, pass the entire list of images.
    Ensure your prompt is specific enough to retrieve the exact information you are looking for.

    Args:
        question: The question to ask about the images.  Type: str
        images: The list of images to ask the question about.  Type: List[PIL.Image.Image]

    Returns:
        The model's answer as a single string.
    """
    image_model_name = "gemini/gemini-1.5-flash"

    print(f'Leveraging model {image_model_name}')

    image_model = LiteLLMModel(
        model_id=image_model_name,
        api_key=os.getenv("GEMINI_KEY"),
        temperature=0.2
    )

    print(f"Asking model a question about {len(images)} images")

    # One user message: the question text first, followed by every image.
    content = [{"type": "text", "text": question}]
    content.extend({"type": "image", "image": image} for image in images)

    response = image_model([{"role": "user", "content": content}])

    # The response may expose `.content` as a string, a list of parts, or
    # something else entirely; normalise every shape down to one string.
    if not hasattr(response, 'content'):
        output = str(response)
    else:
        output = response.content
        if isinstance(output, list):
            # Keep plain strings and the 'text' field of dict parts; skip the rest.
            fragments = []
            for piece in output:
                if isinstance(piece, str):
                    fragments.append(piece)
                elif isinstance(piece, dict) and 'text' in piece:
                    fragments.append(piece['text'])
            # Fall back to the raw list's repr when no text could be extracted.
            output = ' '.join(fragments) if fragments else str(output)
        elif not isinstance(output, str):
            output = str(output)

    print(f'Model returned: {output}')
    return output

@tool
def review_youtube_video(url: str, question: str) -> str:
    """
    Reviews a YouTube video and answers a specific question about that video.

    Args:
        url (str): the URL to the YouTube video.  Should be like this format: https://www.youtube.com/watch?v=9hE5-98ZeCg
        question (str): The question you are asking about the video

    Returns:
        The model's text answer, or a message starting with "Error asking" if the request failed.
    """
    # Bound before the try block so the error message below can always name
    # the model, even when creating the client itself is what raises.
    model = 'gemini-2.0-flash-lite'
    try:
        client = genai.Client(api_key=os.getenv('GEMINI_KEY'))
        # The video is passed by URI as file data, followed by the question text.
        response = client.models.generate_content(
            model=model,
            contents=types.Content(
                parts=[
                    types.Part(
                        file_data=types.FileData(file_uri=url)
                    ),
                    types.Part(text=question)
                ]
            )
        )
        return response.text
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error asking {model} about video: {str(e)}"

@tool
def youtube_frames_to_images(url: str, sample_interval_seconds: int = 5) -> List[Image.Image]:
    """
    Downloads a YouTube video and returns a List of PIL Images (List[PIL.Image.Image]) sampled from it, which can then be reviewed by a vision model.
    Only use this tool if you have been given a YouTube video that you need to analyze.
    This will generate a list of images, and you can use the use_vision_model tool to analyze those images

    Args:
        url: The Youtube URL
        sample_interval_seconds: The sampling interval (default is 5 seconds)

    Returns:
        One image per sampled frame, in the order the video reader yields them, starting with the first frame.

    Raises:
        RuntimeError: If the download produced no .mp4 file, or the video metadata has no FPS value.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]/best',
            'outtmpl': os.path.join(tmpdir, 'video.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'merge_output_format': 'mp4',
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        # The final extension depends on the merge step, so look for the
        # resulting .mp4 in the temp dir rather than assuming a file name.
        video_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.mp4')),
            None,
        )
        if not video_path:
            raise RuntimeError("Failed to download video as mp4")

        reader = imageio.get_reader(video_path)
        try:
            fps = reader.get_meta_data().get('fps')
            if fps is None:
                raise RuntimeError("Unable to determine FPS from video metadata")

            frame_interval = int(fps * sample_interval_seconds)
            if frame_interval == 0:
                # An interval shorter than one frame would make the modulo
                # below divide by zero; sample every frame instead.
                frame_interval = 1

            return [
                Image.fromarray(frame)
                for idx, frame in enumerate(reader)
                if idx % frame_interval == 0
            ]
        finally:
            # Always release the reader, including when decoding fails midway,
            # and before the temporary directory is removed.
            reader.close()

@tool
def read_file(filepath: str) -> str:
    """
    Used to read the content of a file.  Returns the content as a string.
    Will only work for text-based files, such as .txt files or code files.
    Do not use for audio or visual files.

    Args:
        filepath (str): The path to the file to be read.

    Returns:
        str: Content of the file as a string, or an error message if the file is missing, unreadable, or not valid UTF-8 text.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            content = file.read()
    except FileNotFoundError:
        return f"File not found: {filepath}"
    except (IOError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an IOError, so it must be
        # named explicitly; it is what a binary (non-UTF-8) file produces.
        return f"Error reading file: {str(e)}"

    # Echo the content to stdout as well as returning it.
    print(content)
    return content

@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.
    Use this tool when you are asked a question and told that there is a file or image provided.

    Args:
        url: The URL to download from
        filename: Optional filename, will generate one based on URL if not provided

    Returns:
        A message containing the path to the downloaded file, or an error message if the download failed
    """
    try:
        print(f"Downloading file from {url}")
        if not filename:
            # Derive a name from the last path segment of the URL.
            filename = os.path.basename(urlparse(url).path)
            if not filename:
                # URL has no usable path segment (e.g. ends in "/"): make one up.
                import uuid
                filename = f"downloaded_{uuid.uuid4().hex[:8]}"

        # NOTE(review): a caller-supplied filename is joined as-is, so an
        # absolute path or "../" segments can land outside the temp directory.
        filepath = os.path.join(tempfile.gettempdir(), filename)

        # Without a timeout requests can block forever on an unresponsive
        # server.  The with-block releases the streamed connection on every
        # path, including when raise_for_status() or the file write fails.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()

            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error downloading file: {str(e)}"

@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using pytesseract (if available).

    Args:
        image_path: Path to the image file

    Returns:
        Extracted text or error message
    """
    try:
        # Imported here so a missing OCR dependency becomes an error message
        # from this tool rather than an import failure of the whole module.
        import pytesseract
        from PIL import Image

        # The with-block closes the image's underlying file once OCR is done
        # instead of leaving the handle open.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)

        return f"Extracted text from image:\n\n{text}"
    except ImportError:
        return "Error: pytesseract is not installed."
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"

@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Load a CSV file with pandas and return an overview of its contents.
    The overview contains the row and column counts, the column names, summary statistics and the first few rows.
    To use this tool the file must already be saved somewhere, and you pass that location to the function.
    The download_file_from_url tool will save it by name to tempfile.gettempdir()

    Args:
        file_path: Path to the CSV file
        query: Question about the data (currently not used when building the overview)

    Returns:
        Analysis result or error message
    """
    try:
        import pandas as pd

        frame = pd.read_csv(file_path)
        row_count, column_count = frame.shape

        # Assemble the report piece by piece, in output order.
        sections = [
            f"CSV file loaded with {row_count} rows and {column_count} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
            "\n\nFirst few rows:\n",
            str(frame.head()),
        ]
        return "".join(sections)
    except ImportError:
        return "Error: pandas is not installed."
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"

@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and return an overview of its contents.
    The overview contains the row and column counts, the column names, summary statistics and the first few rows.
    To use this file you need to have saved it in a location and pass that location to the function.
    The download_file_from_url tool will save it by name to tempfile.gettempdir()

    Args:
        file_path: Path to the Excel file
        query: Question about the data (currently not used when building the overview)

    Returns:
        Analysis result or error message
    """
    try:
        import pandas as pd

        df = pd.read_excel(file_path)

        # Header cells in a workbook may be numbers (e.g. years), which come
        # back as non-string column labels; str.join needs strings.
        column_names = ', '.join(str(column) for column in df.columns)

        result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(df.describe())
        result += "\n\nFirst few rows:\n"
        result += str(df.head())

        return result
    except ImportError:
        # Raised by the pandas import itself or by read_excel when the Excel
        # engine package is missing.
        return "Error: pandas and openpyxl are not installed."
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"

@tool
def youtube_transcribe(url: str) -> str:
    """
    Transcribes a YouTube video.  Use when you need to process the audio from a YouTube video into Text.

    Args:
        url: Url of the YouTube video

    Returns:
        The transcribed text of the video's audio track.

    Raises:
        RuntimeError: If no .wav file is found after the download.
    """
    model_size: str = "small"
    model = whisper.load_model(model_size)
    with tempfile.TemporaryDirectory() as tmpdir:
        # Post-processing step that converts the downloaded audio to wav.
        wav_extraction = {
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        }
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(tmpdir, 'audio.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'postprocessors': [wav_extraction],
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        # Pick the first .wav file present in the temp dir.
        audio_path = None
        for entry in os.listdir(tmpdir):
            if entry.endswith('.wav'):
                audio_path = os.path.join(tmpdir, entry)
                break

        if not audio_path:
            raise RuntimeError("Failed to find audio")

        # Transcribe while the temp dir (and the wav inside it) still exists.
        transcription = model.transcribe(audio_path)
        return transcription['text']

@tool
def transcribe_audio(audio_file_path: str) -> str:
    """
    Transcribes an audio file.  Use when you need to process audio data.
    DO NOT use this tool for YouTube video; use the youtube_transcribe tool to process audio data from YouTube.
    Use this tool when you have an audio file in .mp3, .wav, .aac, .ogg, .flac, .m4a, .alac or .wma

    Args:
        audio_file_path: Filepath to the audio file (str)

    Returns:
        The transcribed text of the audio file.
    """
    model_size: str = "small"
    model = whisper.load_model(model_size)
    # Transcribe the file named by the parameter; the previous code referenced
    # an undefined name (`audio_path`) and raised NameError on every call.
    result = model.transcribe(audio_file_path)
    return result['text']