File size: 8,409 Bytes
e47481b
 
 
 
98e87f9
e47481b
 
 
 
 
df6c855
 
 
e47481b
 
df6c855
 
e47481b
 
98e87f9
e47481b
 
 
98e87f9
e47481b
 
 
98e87f9
e47481b
 
98e87f9
e47481b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df6c855
e47481b
 
 
 
 
 
 
 
 
 
 
 
 
 
df6c855
e47481b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df6c855
e47481b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98e87f9
 
 
 
 
 
 
 
 
 
 
 
 
df6c855
98e87f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df6c855
 
98e87f9
 
df6c855
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# pylint: disable=W0718
import ast
import json
import os
import base64
from time import sleep
from uuid import uuid4

import boto3
import fitz
import requests
from bs4 import BeautifulSoup
from googlesearch import search
from pandas import read_excel
from smolagents import tool, Tool
from requests.exceptions import HTTPError
from urllib3.exceptions import ReadTimeoutError

from definitions import TranscriptionJob
from utils import get_file, s3_upload_file, s3_download_file, bedrock_runtime, BEDROCK_MODEL_ID


@tool
def math_calculator(expression: str) -> str:
    """A simple calculator tool that evaluates mathematical expressions.

    Args:
        expression (str): A mathematical expression as a string, e.g., "2 + 2 * 3".
    """
    try:
        result = ast.literal_eval(expression)
        return str(result)
    except Exception as e:
        return f"Error evaluating expression: {e}"


@tool
def excel_reader(task_id: str, file_name: str) -> str:
    """Reads an Excel file and returns its content as a dataframe string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the Excel file to read.
    """
    try:
        file_content = get_file(task_id)
        df = read_excel(file_content, engine="openpyxl")
        return df.to_string(index=False)
    except Exception as e:
        return f"Error reading Excel file {file_name}: {e}"


@tool
def txt_reader(task_id: str, file_name: str) -> str:
    """Reads a text file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the file to read.
    """
    try:
        file_content = get_file(task_id)
        return file_content.read().decode("utf-8")
    except Exception as e:
        return f"Error reading file {file_name}: {e}"


@tool
def pdf_reader(task_id: str, file_name: str) -> str:
    """Reads a PDF file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the PDF file to read.
    """
    try:
        file_content = get_file(task_id)
        with fitz.open(stream=file_content.read(), filetype="pdf") as doc:
            content = [page.get_text() for page in doc if page.get_text()]
            text = "\n".join(content)
            if not text:
                return f"No text found in PDF file {file_name}."
            return text.strip()
    except Exception as e:
        return f"Error reading PDF file {file_name}: {e}"


class AudioTranscriber(Tool):  # pylint: disable=C0115
    name = "AudioTranscriber"
    description = "Extract text from audio files, such as MP3, MP4, WAV, etc."
    inputs = {
        "task_id": {
            "type": "string",
            "description": "The ID of the task associated with the audio file.",
        },
        "file_name": {
            "type": "string",
            "description": "The name of the audio file to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        region = os.getenv("AWS_REGION", "us-east-1")
        self.client = boto3.client("transcribe", region_name=region)

    def _transcribe_audio(self, job_name: str, media_uri: str) -> dict:
        self.client.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={"MediaFileUri": media_uri},
            IdentifyLanguage=True,
            OutputBucketName=os.getenv("TARGET_BUCKET"),
            OutputKey=f"{job_name}.json",
        )

    def _get_transcription(self, job_name: str) -> str:
        while True:
            response: TranscriptionJob = self.client.get_transcription_job(TranscriptionJobName=job_name)
            status = response["TranscriptionJob"]["TranscriptionJobStatus"]
            if status in ["COMPLETED", "FAILED"]:
                break
            sleep(5)

        transcript_url = response["TranscriptionJob"]["Transcript"]["TranscriptFileUri"]
        try:
            bytes_result = s3_download_file(os.getenv("TARGET_BUCKET"), transcript_url.split("/")[-1])
            transcription_data = json.loads(bytes_result.read().decode("utf-8"))
            return transcription_data["transcripts"][0]["transcript"]
        except json.JSONDecodeError as e:
            print(f"Error decoding transcription JSON: {e}")
            raise
        except Exception as e:
            print(f"Error downloading or processing transcription file: {e}")
            raise

    def forward(self, task_id: str, file_name: str) -> str:  # pylint: disable=W0221
        try:
            file_content = get_file(task_id)
            s3_upload_file(file_content, os.getenv("SOURCE_BUCKET"), file_name)

            media_uri = f"s3://{os.getenv('SOURCE_BUCKET')}/{file_name}"
            job_name = f"{uuid4()}-{file_name.split('.')[0]}"
            self._transcribe_audio(job_name, media_uri)
            transcription = self._get_transcription(job_name)
            return transcription
        except Exception as e:
            return f"Error starting transcription job for {file_name}: {e}"


@tool
def image_transcriber(text_prompt: str, task_id: str, file_name: str) -> str:
    """Transcribes text from an image file

    Args:
        text_prompt (str): The text prompt to guide the transcription.
        task_id (str): The ID of the task associated with the image file.
        file_name (str): The name of the image file to transcribe.
    """
    try:
        file_content = get_file(task_id)
        base64_image = base64.b64encode(file_content.getvalue()).decode("utf-8")
        response = bedrock_runtime.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=json.dumps(
                {
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 4096,
                    "messages": [
                        {
                            "role": "user",
                            "content": [
                                {
                                    "type": "image",
                                    "source": {
                                        "type": "base64",
                                        "media_type": f"image/{file_name.split('.')[-1]}",
                                        "data": base64_image,
                                    },
                                },
                                {"type": "text", "text": text_prompt},
                            ],
                        }
                    ],
                }
            ),
        )["body"].read()
        return json.loads(response)["content"][0]["text"]
    except Exception as e:
        return f"Error processing image file {file_name}: {e}"


def _get_content(url: str, timeout: int = 5) -> bytes:
    resp = requests.get(url=url, timeout=timeout)
    resp.raise_for_status()
    return resp.content


def _js_disable_message(text: str) -> bool:
    return "JavaScript is disabled in this browser" in text


@tool
def search_engine(search_term: str) -> str:
    """Search for the provided search term in Google Search

    Args:
        search_term (str): The term to search for on the web.
    """
    results = search(search_term, num_results=5)
    for idx, url in enumerate(results, 1):
        error_ocurred = False
        try:
            html_content = BeautifulSoup(_get_content(url), "html.parser")
            # Remove headers and footers
            for tag in html_content.find_all(["header", "footer", "nav", "aside"]):
                tag.decompose()
        except (ReadTimeoutError, HTTPError) as ex:
            print("Got HTTP error when requesting %s. Error %s", url, ex)
            error_ocurred = True

        html_text = html_content.text
        if _js_disable_message(html_text):
            error_ocurred = True

        if error_ocurred:
            # if the last URL is not successfully requested, return a direct response as if it was the final answer
            if len(results) == idx:
                return "Sorry, got an HTTP error when requesting the internet"
            # if there are more URLs to request, continue
            continue

        return html_text.replace("\n", "")

    return "Could not retrieve any content from the search results."