File size: 13,322 Bytes
e47481b
 
 
 
98e87f9
aa94df3
741f470
e47481b
aa94df3
e47481b
 
 
aa94df3
e47481b
df6c855
 
 
e47481b
741f470
 
e47481b
df6c855
 
e47481b
 
aa94df3
e47481b
 
 
98e87f9
e47481b
 
 
98e87f9
e47481b
 
98e87f9
e47481b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df6c855
e47481b
 
 
 
 
 
 
 
 
 
 
 
 
 
df6c855
e47481b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
741f470
 
e47481b
741f470
e47481b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
741f470
e47481b
 
 
 
 
 
 
741f470
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e47481b
 
 
 
 
 
 
 
 
 
 
 
98e87f9
 
 
aa94df3
 
98e87f9
 
aa94df3
98e87f9
 
 
 
 
df6c855
aa94df3
 
98e87f9
aa94df3
 
98e87f9
aa94df3
 
 
 
98e87f9
 
aa94df3
 
 
98e87f9
 
df6c855
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
741f470
 
 
df6c855
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
741f470
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa94df3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fba8c96
aa94df3
 
 
 
 
 
 
 
 
 
fba8c96
 
aa94df3
 
fba8c96
 
 
aa94df3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
# pylint: disable=W0718
import ast
import json
import os
import base64
import tempfile
from io import BytesIO
from time import sleep
from typing import Generator
from uuid import uuid4

import boto3
import cv2  # type: ignore
import fitz
import requests
from bs4 import BeautifulSoup
from googlesearch import search
from pandas import read_excel
from pytubefix import YouTube
from pytubefix.cli import on_progress
from smolagents import tool, Tool
from requests.exceptions import HTTPError
from urllib3.exceptions import ReadTimeoutError

from definitions import TranscriptionJob
from utils import get_file, s3_upload_file, s3_download_file, invoke_bedrock_model, invoke_openai_model


@tool
def math_calculator(expression: str) -> str:
    """A simple calculator tool that evaluates mathematical expressions.

    Supports numbers, parentheses, unary +/- and the binary operators
    +, -, *, /, //, % and **. Names, attribute access and function calls are
    rejected, so the input is never executed as arbitrary code.

    Args:
        expression (str): A mathematical expression as a string, e.g., "2 + 2 * 3".
    """
    import operator  # local import keeps the tool self-contained

    # Whitelist of AST operator nodes mapped to their arithmetic implementation.
    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Guards against inputs such as "9 ** 9 ** 9" that would hang the process.
    max_exponent = 10_000

    def _evaluate(node):
        """Recursively evaluate a whitelisted arithmetic AST node."""
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is an int subclass, but "True + 1" is not calculator input.
            if isinstance(value, bool) or not isinstance(value, (int, float, complex)):
                raise ValueError(f"unsupported constant: {value!r}")
            return value
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = _evaluate(node.left)
            right = _evaluate(node.right)
            if isinstance(node.op, ast.Pow) and isinstance(right, int) and abs(right) > max_exponent:
                raise ValueError("exponent too large")
            return binary_ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](_evaluate(node.operand))
        raise ValueError(f"unsupported syntax: {type(node).__name__}")

    try:
        try:
            # Plain literals keep their previous result; literal_eval rejects
            # real arithmetic ("2 + 2 * 3"), which is handled by the fallback.
            result = ast.literal_eval(expression)
        except (ValueError, TypeError, SyntaxError):
            result = _evaluate(ast.parse(expression.strip(), mode="eval").body)
        return str(result)
    except Exception as e:
        return f"Error evaluating expression: {e}"


@tool
def excel_reader(task_id: str, file_name: str) -> str:
    """Reads an Excel file and returns its content as a dataframe string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the Excel file to read.
    """
    # NOTE(review): the docstring wording is kept as-is because the @tool
    # decorator appears to derive the tool description from it.
    # The workbook is looked up by task_id; file_name only labels errors.
    try:
        workbook_stream = get_file(task_id)
        return read_excel(workbook_stream, engine="openpyxl").to_string(index=False)
    except Exception as err:
        # Any failure is reported back as text instead of being raised.
        return f"Error reading Excel file {file_name}: {err}"


@tool
def txt_reader(task_id: str, file_name: str) -> str:
    """Reads a text file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the file to read.
    """
    # NOTE(review): the docstring wording is kept as-is because the @tool
    # decorator appears to derive the tool description from it.
    # The payload is looked up by task_id; file_name only labels errors.
    try:
        stream = get_file(task_id)
        raw_bytes = stream.read()
        return raw_bytes.decode("utf-8")
    except Exception as err:
        # Covers lookup failures as well as invalid UTF-8 in the payload.
        return f"Error reading file {file_name}: {err}"


@tool
def pdf_reader(task_id: str, file_name: str) -> str:
    """Reads a PDF file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the PDF file to read.
    """
    # The document is looked up by task_id; file_name only labels messages.
    try:
        file_content = get_file(task_id)
        with fitz.open(stream=file_content.read(), filetype="pdf") as doc:
            # Extract every page exactly once (text extraction is the costly
            # step) and drop pages that produced no text at all.
            page_texts = [page_text for page_text in (page.get_text() for page in doc) if page_text]
        # Strip before the emptiness check so a whitespace-only document is
        # reported as having no text instead of returning an empty string.
        text = "\n".join(page_texts).strip()
        if not text:
            return f"No text found in PDF file {file_name}."
        return text
    except Exception as e:
        return f"Error reading PDF file {file_name}: {e}"


class AudioTranscriber:
    """A class to handle audio transcription using AWS Transcribe.

    The region is read from the AWS_REGION environment variable (default
    "us-east-1"); transcripts are written to and read back from the bucket
    named by the TARGET_BUCKET environment variable.
    """

    def __init__(self):
        region = os.getenv("AWS_REGION", "us-east-1")
        self.client = boto3.client("transcribe", region_name=region)

    def _transcribe_audio(self, job_name: str, media_uri: str) -> None:
        """Start a transcription job with automatic language identification.

        Args:
            job_name: Name of the transcription job; also used as the stem of
                the output object key (``<job_name>.json``).
            media_uri: URI of the media file to transcribe.
        """
        self.client.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={"MediaFileUri": media_uri},
            IdentifyLanguage=True,
            OutputBucketName=os.getenv("TARGET_BUCKET"),
            OutputKey=f"{job_name}.json",
        )

    def _get_transcription(self, job_name: str, poll_interval: float = 5) -> str:
        """Wait for a transcription job to finish and return its transcript.

        Args:
            job_name: Name of a job previously started with _transcribe_audio.
            poll_interval: Seconds to sleep between status checks.

        Returns:
            The text of the first transcript in the job's result file.

        Raises:
            RuntimeError: If the job finished with status FAILED.
            Exception: Download or parsing errors are logged and re-raised.
        """
        while True:
            response: TranscriptionJob = self.client.get_transcription_job(TranscriptionJobName=job_name)
            job = response["TranscriptionJob"]
            status = job["TranscriptionJobStatus"]
            if status in ("COMPLETED", "FAILED"):
                break
            sleep(poll_interval)

        if status == "FAILED":
            # A failed job carries no "Transcript" entry; surface the reason
            # instead of failing later with an opaque KeyError.
            reason = job.get("FailureReason", "no failure reason reported")
            raise RuntimeError(f"Transcription job {job_name} failed: {reason}")

        transcript_url = job["Transcript"]["TranscriptFileUri"]
        try:
            # The object key is the last path segment of the transcript URI.
            bytes_result = s3_download_file(os.getenv("TARGET_BUCKET"), transcript_url.split("/")[-1])
            transcription_data = json.loads(bytes_result.read().decode("utf-8"))
            return transcription_data["results"]["transcripts"][0]["transcript"]
        except json.JSONDecodeError as e:
            print(f"Error decoding transcription JSON: {e}")
            raise
        except Exception as e:
            print(f"Error downloading or processing transcription file: {e}")
            raise


class AudioTranscriberTool(Tool, AudioTranscriber):  # pylint: disable=C0115
    """Agent tool that transcribes a task's audio file.

    The file is fetched by task id, uploaded to the bucket named by the
    SOURCE_BUCKET environment variable and transcribed through the
    AudioTranscriber helpers.
    """

    name = "AudioTranscriber"
    description = "Extract text from audio files, such as MP3, MP4, WAV, etc."
    inputs = {
        "task_id": {
            "type": "string",
            "description": "The ID of the task associated with the audio file.",
        },
        "file_name": {
            "type": "string",
            "description": "The name of the audio file to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        Tool.__init__(self, *args, **kwargs)
        # AudioTranscriber.__init__ accepts no parameters; forwarding
        # *args/**kwargs to it raised TypeError whenever any were supplied.
        AudioTranscriber.__init__(self)

    def forward(self, task_id: str, file_name: str) -> str:  # pylint: disable=W0221
        """Upload the task's audio file and return its transcription.

        Args:
            task_id: The ID of the task associated with the audio file.
            file_name: Name of the audio file; used as the S3 object key.

        Returns:
            The transcript text, or an error message if any step failed.
        """
        import re  # local import: only needed to sanitise the job name

        try:
            file_content = get_file(task_id)
            s3_upload_file(file_content, os.getenv("SOURCE_BUCKET"), file_name)

            media_uri = f"s3://{os.getenv('SOURCE_BUCKET')}/{file_name}"
            # Transcribe job names only allow [0-9a-zA-Z._-] and at most 200
            # characters, so file names with spaces etc. must be sanitised.
            stem = re.sub(r"[^0-9a-zA-Z._-]", "_", file_name.split(".")[0])
            job_name = f"{uuid4()}-{stem}"[:200]
            self._transcribe_audio(job_name, media_uri)
            return self._get_transcription(job_name)
        except Exception as e:
            return f"Error starting transcription job for {file_name}: {e}"


@tool
def image_analyzer(task: str, task_id: str, file_name: str) -> str:
    """Analyzes an image file and returns a response based on the task provided.

    Args:
        task (str): The description of the information to extract from the image.
        task_id (str): The ID of the task associated with the image file.
        file_name (str): The name of the image file to transcribe.
    """
    # NOTE(review): the docstring wording is kept as-is because the @tool
    # decorator appears to derive the tool description from it.
    try:
        image_bytes = get_file(task_id).getvalue()
        encoded_image = base64.b64encode(image_bytes).decode("utf-8")
        # The image subtype of the data URL is taken from the file extension.
        extension = file_name.split(".")[-1]
        image_part = {
            "type": "input_image",
            "image_url": f"data:image/{extension};base64,{encoded_image}",
        }
        text_part = {"type": "input_text", "text": task}
        # Single user turn: the image first, then the instruction text.
        return invoke_openai_model([{"role": "user", "content": [image_part, text_part]}])
    except Exception as err:
        return f"Error processing image file {file_name}: {err}"


def _get_content(url: str, timeout: int = 5) -> bytes:
    """Fetch ``url`` with an HTTP GET and return the raw response body.

    ``timeout`` is passed straight to ``requests.get``. An HTTP error status
    raises via ``raise_for_status`` instead of returning the error page.
    """
    response = requests.get(url=url, timeout=timeout)
    response.raise_for_status()
    body = response.content
    return body


def _js_disable_message(text: str) -> bool:
    return "JavaScript is disabled in this browser" in text


@tool
def search_engine(search_term: str) -> str:
    """Search for the provided search term in Google Search

    Args:
        search_term (str): The term to search for on the web.
    """
    # search() may hand back a lazy iterator, which has no len(); materialise
    # it so we can tell "no results" apart from "every result failed".
    urls = list(search(search_term, num_results=5))

    for url in urls:
        try:
            html_content = BeautifulSoup(_get_content(url), "html.parser")
        except (ReadTimeoutError, HTTPError, requests.exceptions.RequestException) as ex:
            # RequestException also covers connection errors and timeouts
            # raised by requests, not only bad HTTP status codes.
            print(f"Got HTTP error when requesting {url}. Error {ex}")
            continue

        # Remove page chrome and non-content elements before extracting text.
        for tag in html_content.find_all(
            ["header", "footer", "nav", "aside", "script", "style", "noscript", "form"]
        ):
            tag.decompose()

        html_text = html_content.text
        if _js_disable_message(html_text):
            # The page only served a "enable JavaScript" placeholder; try the next URL.
            continue

        return html_text.replace("\n", "")

    if urls:
        # Every candidate URL failed; answer as if this were the final response.
        return "Sorry, got an HTTP error when requesting the internet"
    return "Could not retrieve any content from the search results."


class YoutubeTranscriberTool(Tool, AudioTranscriber):  # pylint: disable=C0115
    """Agent tool that transcribes the audio track of a YouTube video.

    The video is downloaded into memory, uploaded to the bucket named by the
    SOURCE_BUCKET environment variable and transcribed through the
    AudioTranscriber helpers.
    """

    name = "YoutubeTranscriber"
    description = "Extract text from YouTube videos, do not work for video understanding."
    inputs = {
        "youtube_url": {
            "type": "string",
            "description": "The URL of the YouTube video to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        Tool.__init__(self, *args, **kwargs)
        # AudioTranscriber.__init__ accepts no parameters; forwarding
        # *args/**kwargs to it raised TypeError whenever any were supplied.
        AudioTranscriber.__init__(self)

    def forward(self, youtube_url: str) -> str:  # pylint: disable=W0221
        """Download the video and return the transcription of its audio.

        Args:
            youtube_url: The URL of the YouTube video to transcribe.

        Returns:
            The transcript text, or an error message if any step failed.
        """
        file_name = f"{uuid4()}.mp4"
        buffer = BytesIO()
        try:
            youtube_obj = YouTube(youtube_url, on_progress_callback=on_progress)
            stream = youtube_obj.streams.filter(progressive=True).first()
            if stream is None:
                return f"Error fetching YouTube video {youtube_url}: no progressive stream available"
            stream.stream_to_buffer(buffer)
        except Exception as e:
            return f"Error fetching YouTube video {youtube_url}: {e}"
        try:
            # Rewind after the download wrote into the buffer.
            # NOTE(review): harmless if s3_upload_file uses getvalue(), required
            # if it reads from the current position - confirm against utils.
            buffer.seek(0)
            s3_upload_file(buffer, os.getenv("SOURCE_BUCKET"), file_name)
            media_uri = f"s3://{os.getenv('SOURCE_BUCKET')}/{file_name}"
            job_name = f"{uuid4()}-{file_name.split('.', maxsplit=1)[0]}"
            self._transcribe_audio(job_name, media_uri)
            return self._get_transcription(job_name)
        except Exception as e:
            return f"Error starting transcription job for {file_name}: {e}"


class YoutubeVideoDescriptorTool(Tool):  # pylint: disable=C0115
    """Agent tool that answers a question about the visual content of a YouTube video.

    The video is downloaded into memory, sampled into JPEG frames, each chunk
    of frames is sent to the OpenAI model together with the task, and the
    per-chunk answers are summarised by the Bedrock model.
    """

    name = "YoutubeVideoDescriptor"
    description = (
        "Describe a youtube video based on the video. Use this tool for tasks like video understanding, "
        "not for audio transcription. Example: 'What is in the video?'"
    )
    inputs = {
        "youtube_url": {
            "type": "string",
            "description": "The URL of the YouTube video to get the description from.",
        },
        "task": {
            "type": "string",
            "description": "The task to perform on the video, e.g., 'Describe the video content'.",
        },
    }
    output_type = "string"

    # pylint: disable=E1101
    def _base64_frames(
        self, video_buffer: BytesIO, target_fps: int = 10, chunk_size: int = 20
    ) -> Generator[list[str], None, None]:
        """Yield chunks of base64-encoded JPEG frames sampled from the video.

        Args:
            video_buffer: In-memory video file contents.
            target_fps: Approximate number of frames to keep per second of video.
            chunk_size: Maximum number of frames per yielded chunk.

        Yields:
            Non-empty lists of at most ``chunk_size`` base64 strings.

        Raises:
            ValueError: If ``target_fps`` or ``chunk_size`` is not positive.
        """
        if target_fps <= 0 or chunk_size <= 0:
            raise ValueError("target_fps and chunk_size must be positive")

        # OpenCV is given a file path, so spill the buffer to a temp file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as input_temp:
            input_temp.write(video_buffer.getvalue())
            input_temp_path = input_temp.name

        cap = cv2.VideoCapture(input_temp_path)
        try:
            orig_fps = cap.get(cv2.CAP_PROP_FPS)
            # Never let the interval reach 0 (unknown or low source fps),
            # which would raise ZeroDivisionError in the modulo below.
            frame_interval = max(1, int(round(orig_fps / target_fps)))

            chunk = []
            index = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if index % frame_interval == 0:
                    # imencode expects BGR, which is what read() returns, so
                    # no colour conversion is applied before encoding.
                    encoded_ok, encoded = cv2.imencode(".jpg", frame)
                    if encoded_ok:
                        chunk.append(base64.b64encode(encoded).decode("utf-8"))
                    if len(chunk) == chunk_size:
                        yield chunk
                        chunk = []

                index += 1

            # Yield the trailing partial chunk, whatever the total frame count.
            if chunk:
                yield chunk
        finally:
            cap.release()
            try:
                os.remove(input_temp_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not mask the result.
                pass

    def forward(self, task: str, youtube_url: str) -> str:  # pylint: disable=W0221
        """Describe the video at ``youtube_url`` according to ``task``.

        Args:
            task: The task to perform on the video.
            youtube_url: The URL of the YouTube video.

        Returns:
            A short summary of the model's per-chunk answers, or an error
            message if downloading or analysing the video failed.
        """
        buffer = BytesIO()
        try:
            youtube_obj = YouTube(youtube_url, on_progress_callback=on_progress)
            stream = youtube_obj.streams.filter(progressive=True).first()
            if stream is None:
                return f"Error fetching YouTube video {youtube_url}: no progressive stream available"
            stream.stream_to_buffer(buffer)
        except Exception as e:
            return f"Error fetching YouTube video {youtube_url}: {e}"
        try:
            responses = []
            for base64_frame_chunk in self._base64_frames(buffer, target_fps=1):
                vision_messages = [
                    {"type": "input_image", "image_url": f"data:image/jpeg;base64,{base64_frame}"}
                    for base64_frame in base64_frame_chunk
                ]
                response = invoke_openai_model(
                    [{"role": "user", "content": [*vision_messages, {"type": "input_text", "text": task}]}]
                )
                responses.append(response)

            if not responses:
                # Nothing to summarise; avoid sending an empty prompt.
                return f"Error describing YouTube video {youtube_url}: no frames could be extracted"

            final_response = invoke_bedrock_model(
                [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": "\n".join(responses)},
                            {"type": "text", "text": "Please summarize the above text shortly."},
                        ],
                    }
                ]
            )
            return final_response
        except Exception as e:
            return f"Error describing YouTube video {youtube_url}: {e}"