|
|
|
|
|
import ast |
|
|
import json |
|
|
import os |
|
|
import base64 |
|
|
from time import sleep |
|
|
from uuid import uuid4 |
|
|
|
|
|
import boto3 |
|
|
import fitz |
|
|
import requests |
|
|
from bs4 import BeautifulSoup |
|
|
from googlesearch import search |
|
|
from pandas import read_excel |
|
|
from smolagents import tool, Tool |
|
|
from requests.exceptions import HTTPError |
|
|
from urllib3.exceptions import ReadTimeoutError |
|
|
|
|
|
from definitions import TranscriptionJob |
|
|
from utils import get_file, s3_upload_file, s3_download_file, bedrock_runtime, BEDROCK_MODEL_ID |
|
|
|
|
|
|
|
|
@tool
def math_calculator(expression: str) -> str:
    """A simple calculator tool that evaluates mathematical expressions.

    Supports numbers, parentheses, unary plus/minus and the binary operators
    +, -, *, /, //, % and **. The expression is parsed into a syntax tree and
    only these arithmetic nodes are evaluated, so arbitrary code is never
    executed. The result is returned as a string; if evaluation fails, a string
    starting with "Error evaluating expression" is returned instead.

    Args:
        expression (str): A mathematical expression as a string, e.g., "2 + 2 * 3".
    """
    binary_ops = {
        ast.Add: lambda a, b: a + b,
        ast.Sub: lambda a, b: a - b,
        ast.Mult: lambda a, b: a * b,
        ast.Div: lambda a, b: a / b,
        ast.FloorDiv: lambda a, b: a // b,
        ast.Mod: lambda a, b: a % b,
        ast.Pow: lambda a, b: a**b,
    }
    unary_ops = {
        ast.UAdd: lambda a: +a,
        ast.USub: lambda a: -a,
    }
    numeric = (int, float, complex)
    # Guard against inputs such as "9 ** 9 ** 9" that would exhaust CPU/memory.
    max_exponent = 10_000

    def evaluate(node):
        """Recursively evaluate an arithmetic AST node."""
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left, right = evaluate(node.left), evaluate(node.right)
            # Only numbers may be combined; this rules out e.g. "'a' * 10**9".
            if not (isinstance(left, numeric) and isinstance(right, numeric)):
                raise ValueError("arithmetic operands must be numbers")
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError(f"exponent is too large (limit {max_exponent})")
            return binary_ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            operand = evaluate(node.operand)
            if not isinstance(operand, numeric):
                raise ValueError("arithmetic operands must be numbers")
            return unary_ops[type(node.op)](operand)
        # Anything else must be a plain literal; literal_eval rejects names,
        # calls, attribute access and every other non-literal node.
        return ast.literal_eval(node)

    try:
        tree = ast.parse(expression.strip(), mode="eval")
        return str(evaluate(tree.body))
    except Exception as e:
        return f"Error evaluating expression: {e}"
|
|
|
|
|
|
|
|
@tool
def excel_reader(task_id: str, file_name: str) -> str:
    """Loads the Excel file attached to a task and returns its content as a dataframe string.

    Args:
        task_id (str): The ID of the task whose attached file is fetched.
        file_name (str): The name of the Excel file; it is only used in error messages.
    """
    try:
        workbook = get_file(task_id)
        frame = read_excel(workbook, engine="openpyxl")
        # Render without the positional index column.
        rendered = frame.to_string(index=False)
    except Exception as err:
        return f"Error reading Excel file {file_name}: {err}"
    return rendered
|
|
|
|
|
|
|
|
@tool
def txt_reader(task_id: str, file_name: str) -> str:
    """Fetches the text file attached to a task and returns its content decoded as UTF-8.

    Args:
        task_id (str): The ID of the task whose attached file is fetched.
        file_name (str): The name of the text file; it is only used in error messages.
    """
    try:
        raw_bytes = get_file(task_id).read()
        decoded = raw_bytes.decode("utf-8")
    except Exception as err:
        return f"Error reading file {file_name}: {err}"
    return decoded
|
|
|
|
|
|
|
|
@tool
def pdf_reader(task_id: str, file_name: str) -> str:
    """Reads a PDF file and returns its content as a string.

    The text of every page is extracted, pages without text are skipped and the
    remaining page texts are joined with newlines. If the document contains no
    text (or only whitespace), a message saying so is returned instead.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the PDF file to read.
    """
    try:
        pdf_bytes = get_file(task_id).read()
        with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
            # Extract each page exactly once; the original extracted every
            # page twice (once for the filter, once for the value).
            page_texts = [page.get_text() for page in doc]

        # Strip before the emptiness check so whitespace-only documents are
        # reported as having no text instead of returning an empty string.
        text = "\n".join(chunk for chunk in page_texts if chunk).strip()
        if not text:
            return f"No text found in PDF file {file_name}."
        return text
    except Exception as e:
        return f"Error reading PDF file {file_name}: {e}"
|
|
|
|
|
|
|
|
class AudioTranscriber(Tool):
    """Tool that transcribes a task's audio file with Amazon Transcribe.

    The audio file is uploaded to the bucket named by the ``SOURCE_BUCKET``
    environment variable, a transcription job is started for it, and the
    resulting transcript JSON is read back from the bucket named by
    ``TARGET_BUCKET``.
    """

    name = "AudioTranscriber"
    description = "Extract text from audio files, such as MP3, MP4, WAV, etc."
    inputs = {
        "task_id": {
            "type": "string",
            "description": "The ID of the task associated with the audio file.",
        },
        "file_name": {
            "type": "string",
            "description": "The name of the audio file to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        """Create the Transcribe client for ``AWS_REGION`` (default ``us-east-1``)."""
        super().__init__(*args, **kwargs)
        region = os.getenv("AWS_REGION", "us-east-1")
        self.client = boto3.client("transcribe", region_name=region)

    def _transcribe_audio(self, job_name: str, media_uri: str) -> None:
        """Start a transcription job for ``media_uri``.

        The job is started with ``IdentifyLanguage=True`` and writes its output
        to ``TARGET_BUCKET`` under the key ``<job_name>.json``.

        Args:
            job_name: Name given to the transcription job.
            media_uri: S3 URI of the audio file to transcribe.
        """
        self.client.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={"MediaFileUri": media_uri},
            IdentifyLanguage=True,
            OutputBucketName=os.getenv("TARGET_BUCKET"),
            OutputKey=f"{job_name}.json",
        )

    def _get_transcription(self, job_name: str) -> str:
        """Wait for the job to finish and return the transcript text.

        Polls the job status every 5 seconds until it is ``COMPLETED`` or
        ``FAILED``, then downloads the transcript JSON from ``TARGET_BUCKET``.

        Args:
            job_name: Name of a previously started transcription job.

        Returns:
            The text of the first transcript in the job output.

        Raises:
            RuntimeError: If the job finished with status ``FAILED``.
            Exception: Any error raised while downloading or parsing the
                transcript file is printed and re-raised.
        """
        while True:
            response: TranscriptionJob = self.client.get_transcription_job(TranscriptionJobName=job_name)
            job = response["TranscriptionJob"]
            status = job["TranscriptionJobStatus"]
            if status in ("COMPLETED", "FAILED"):
                break
            sleep(5)

        # A failed job has no usable transcript; report the service's reason
        # instead of failing later with an opaque KeyError on "Transcript".
        if status == "FAILED":
            reason = job.get("FailureReason", "unknown reason")
            raise RuntimeError(f"Transcription job {job_name} failed: {reason}")

        transcript_url = job["Transcript"]["TranscriptFileUri"]
        try:
            # The object key is the last path segment of the transcript URI.
            bytes_result = s3_download_file(os.getenv("TARGET_BUCKET"), transcript_url.split("/")[-1])
            transcription_data = json.loads(bytes_result.read().decode("utf-8"))
            return transcription_data["transcripts"][0]["transcript"]
        except json.JSONDecodeError as e:
            print(f"Error decoding transcription JSON: {e}")
            raise
        except Exception as e:
            print(f"Error downloading or processing transcription file: {e}")
            raise

    def forward(self, task_id: str, file_name: str) -> str:
        """Transcribe the audio file attached to ``task_id``.

        Args:
            task_id: The ID of the task associated with the audio file.
            file_name: The name of the audio file; used as the S3 object key
                and as part of the transcription job name.

        Returns:
            The transcript text, or an error message string if any step fails.
        """
        try:
            file_content = get_file(task_id)
            s3_upload_file(file_content, os.getenv("SOURCE_BUCKET"), file_name)

            media_uri = f"s3://{os.getenv('SOURCE_BUCKET')}/{file_name}"

            # Transcribe job names only allow [0-9a-zA-Z._-] (max 200 chars), so
            # replace anything else (spaces, accents, ...) and cap the length.
            stem = file_name.split(".")[0]
            safe_stem = "".join(
                ch if (ch.isascii() and ch.isalnum()) or ch in "._-" else "_" for ch in stem
            )[:150]
            job_name = f"{uuid4()}-{safe_stem}"

            self._transcribe_audio(job_name, media_uri)
            return self._get_transcription(job_name)
        except Exception as e:
            return f"Error starting transcription job for {file_name}: {e}"
|
|
|
|
|
|
|
|
@tool
def image_transcriber(text_prompt: str, task_id: str, file_name: str) -> str:
    """Transcribes text from an image file

    The image is sent base64-encoded, together with the text prompt, to the
    configured Bedrock model and the text of the first content item of the
    model response is returned. On failure an error message string is returned.

    Args:
        text_prompt (str): The text prompt to guide the transcription.
        task_id (str): The ID of the task associated with the image file.
        file_name (str): The name of the image file to transcribe.
    """
    try:
        file_content = get_file(task_id)
        base64_image = base64.b64encode(file_content.getvalue()).decode("utf-8")

        # Build the media type from the file extension. The API expects
        # lower-case subtypes and "image/jpeg" rather than "image/jpg".
        extension = file_name.split(".")[-1].lower()
        subtype = {"jpg": "jpeg", "jpe": "jpeg"}.get(extension, extension)

        payload = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": f"image/{subtype}",
                                "data": base64_image,
                            },
                        },
                        {"type": "text", "text": text_prompt},
                    ],
                }
            ],
        }
        response = bedrock_runtime.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=json.dumps(payload),
        )["body"].read()
        return json.loads(response)["content"][0]["text"]
    except Exception as e:
        return f"Error processing image file {file_name}: {e}"
|
|
|
|
|
|
|
|
def _get_content(url: str, timeout: int = 5) -> bytes:
    """Download ``url`` with an HTTP GET and return the raw response body.

    Args:
        url: Address to request.
        timeout: Timeout passed to ``requests.get``.

    Returns:
        The response content as bytes.

    Raises:
        Whatever ``requests.get`` or ``Response.raise_for_status`` raises.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on error status codes instead of returning an error page body.
    response.raise_for_status()
    return response.content
|
|
|
|
|
|
|
|
def _js_disable_message(text: str) -> bool: |
|
|
return "JavaScript is disabled in this browser" in text |
|
|
|
|
|
|
|
|
@tool
def search_engine(search_term: str) -> str:
    """Search for the provided search term in Google Search

    The result pages are tried in order. The text of the first page that can be
    downloaded and does not show a "JavaScript is disabled" notice is returned,
    with header, footer, nav and aside elements and newlines removed. If every
    result fails, or there are no results, a short message is returned instead.

    Args:
        search_term (str): The term to search for on the web.
    """
    attempted = False
    # Iterate the results directly: they may be a generator, which has no len().
    for url in search(search_term, num_results=5):
        attempted = True
        try:
            html_content = BeautifulSoup(_get_content(url), "html.parser")
        except (requests.RequestException, ReadTimeoutError, HTTPError) as ex:
            # RequestException also covers timeouts and connection errors,
            # which are not HTTPError subclasses.
            print(f"Got HTTP error when requesting {url}. Error {ex}")
            # Skip right away so a stale or unbound page is never read.
            continue

        # Drop page chrome so only the main content text remains.
        for tag in html_content.find_all(["header", "footer", "nav", "aside"]):
            tag.decompose()

        html_text = html_content.text
        if _js_disable_message(html_text):
            # The page needs JavaScript to render; try the next result.
            continue

        return html_text.replace("\n", "")

    if attempted:
        # Every result was tried and none yielded usable content.
        return "Sorry, got an HTTP error when requesting the internet"
    return "Could not retrieve any content from the search results."
|
|
|