|
|
|
|
|
import ast |
|
|
import json |
|
|
import os |
|
|
import base64 |
|
|
import tempfile |
|
|
from io import BytesIO |
|
|
from time import sleep |
|
|
from typing import Generator |
|
|
from uuid import uuid4 |
|
|
|
|
|
import boto3 |
|
|
import cv2 |
|
|
import fitz |
|
|
import requests |
|
|
from bs4 import BeautifulSoup |
|
|
from googlesearch import search |
|
|
from pandas import read_excel |
|
|
from pytubefix import YouTube |
|
|
from pytubefix.cli import on_progress |
|
|
from smolagents import tool, Tool |
|
|
from requests.exceptions import HTTPError |
|
|
from urllib3.exceptions import ReadTimeoutError |
|
|
|
|
|
from definitions import TranscriptionJob |
|
|
from utils import get_file, s3_upload_file, s3_download_file, invoke_bedrock_model, invoke_openai_model |
|
|
|
|
|
|
|
|
@tool
def math_calculator(expression: str) -> str:
    """A simple calculator tool that evaluates mathematical expressions.

    Supports numeric literals, parentheses, unary +/- and the binary operators
    +, -, *, /, //, % and **. Names, function calls and attribute access are
    rejected, so the expression can never execute arbitrary code.

    Args:
        expression (str): A mathematical expression as a string, e.g., "2 + 2 * 3".

    Returns:
        str: The computed value as text, or a message starting with "Error evaluating expression:" when the input cannot be evaluated.
    """
    # Imported locally so the tool stays self-contained.
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Guard against expressions such as "9 ** 9 ** 9" that would hang the process.
    max_exponent = 10_000

    def _evaluate(node):
        """Recursively evaluate a whitelisted arithmetic AST node."""
        if isinstance(node, ast.Expression):
            return _evaluate(node.body)
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is a subclass of int; reject it so "True + 1" is not silently accepted.
            if isinstance(value, (int, float, complex)) and not isinstance(value, bool):
                return value
            raise ValueError(f"unsupported constant: {value!r}")
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = _evaluate(node.left)
            right = _evaluate(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError(f"exponent too large (limit is {max_exponent})")
            return binary_ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](_evaluate(node.operand))
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    try:
        try:
            # Keep the previous behaviour for plain literals ("42", "[1, 2]", "1+2j").
            result = ast.literal_eval(expression)
        except (ValueError, SyntaxError, TypeError):
            # literal_eval rejects operators, so real arithmetic is evaluated here.
            result = _evaluate(ast.parse(expression.strip(), mode="eval"))
        return str(result)
    except Exception as e:
        return f"Error evaluating expression: {e}"
|
|
|
|
|
|
|
|
@tool
def excel_reader(task_id: str, file_name: str) -> str:
    """Reads an Excel file and returns its content as a dataframe string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the Excel file to read.
    """
    # NOTE: the docstring above doubles as the tool description, so its wording is kept as-is.
    # `file_name` is only used to label the error message; the data is looked up by `task_id`.
    try:
        workbook_stream = get_file(task_id)
        # Parse with openpyxl and render the frame without the index column.
        return read_excel(workbook_stream, engine="openpyxl").to_string(index=False)
    except Exception as error:
        return f"Error reading Excel file {file_name}: {error}"
|
|
|
|
|
|
|
|
@tool
def txt_reader(task_id: str, file_name: str) -> str:
    """Reads a text file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the file to read.
    """
    # NOTE: the docstring above doubles as the tool description, so its wording is kept as-is.
    # `file_name` is only used to label the error message; the data is looked up by `task_id`.
    try:
        raw_bytes = get_file(task_id).read()
        # The payload is decoded as UTF-8; a decode failure is reported like any other error.
        return raw_bytes.decode("utf-8")
    except Exception as error:
        return f"Error reading file {file_name}: {error}"
|
|
|
|
|
|
|
|
@tool
def pdf_reader(task_id: str, file_name: str) -> str:
    """Reads a PDF file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the PDF file to read.

    Returns:
        str: The text of all non-empty pages joined by newlines, a "No text found" message when the document has no extractable text, or an error message when reading fails.
    """
    try:
        file_content = get_file(task_id)
        with fitz.open(stream=file_content.read(), filetype="pdf") as doc:
            # Extract each page exactly once; extraction is the expensive step.
            page_texts = [page.get_text() for page in doc]

        # Strip before the emptiness check so whitespace-only documents are
        # reported as "no text" instead of returning an empty string.
        text = "\n".join(page_text for page_text in page_texts if page_text).strip()
        if not text:
            return f"No text found in PDF file {file_name}."
        return text
    except Exception as e:
        return f"Error reading PDF file {file_name}: {e}"
|
|
|
|
|
|
|
|
class AudioTranscriber:
    """A class to handle audio transcription using AWS Transcribe.

    The AWS region is read from the ``AWS_REGION`` environment variable
    (default ``us-east-1``). Transcripts are written to, and read back from,
    the bucket named by the ``TARGET_BUCKET`` environment variable.
    """

    def __init__(self):
        region = os.getenv("AWS_REGION", "us-east-1")
        self.client = boto3.client("transcribe", region_name=region)

    def _transcribe_audio(self, job_name: str, media_uri: str) -> dict:
        """Start a transcription job and return the service response.

        Args:
            job_name: Name of the transcription job; also used for the output key
                ``"<job_name>.json"``.
            media_uri: URI of the media file to transcribe.

        Returns:
            The response of ``start_transcription_job``.
        """
        return self.client.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={"MediaFileUri": media_uri},
            IdentifyLanguage=True,
            OutputBucketName=os.getenv("TARGET_BUCKET"),
            OutputKey=f"{job_name}.json",
        )

    def _get_transcription(self, job_name: str, poll_interval: float = 5) -> str:
        """Wait for a transcription job to finish and return its transcript text.

        Args:
            job_name: Name of the job started with ``_transcribe_audio``.
            poll_interval: Seconds to sleep between status checks.

        Returns:
            The first transcript stored in the job's result file.

        Raises:
            RuntimeError: If the job ends with status ``FAILED``.
            json.JSONDecodeError: If the result file is not valid JSON.
            Exception: Any error raised while downloading or reading the result.
        """
        # NOTE(review): polling has no upper bound; a job that never reaches a
        # terminal status blocks forever.
        while True:
            response: TranscriptionJob = self.client.get_transcription_job(TranscriptionJobName=job_name)
            job = response["TranscriptionJob"]
            status = job["TranscriptionJobStatus"]
            if status in ("COMPLETED", "FAILED"):
                break
            sleep(poll_interval)

        if status == "FAILED":
            # A failed job has no transcript to read; report the service's reason
            # instead of failing later with a KeyError.
            reason = job.get("FailureReason", "no failure reason reported")
            raise RuntimeError(f"Transcription job {job_name} failed: {reason}")

        transcript_url = job["Transcript"]["TranscriptFileUri"]
        try:
            # The last path segment of the transcript URI is used as the object key.
            bytes_result = s3_download_file(os.getenv("TARGET_BUCKET"), transcript_url.split("/")[-1])
            transcription_data = json.loads(bytes_result.read().decode("utf-8"))
            return transcription_data["results"]["transcripts"][0]["transcript"]
        except json.JSONDecodeError as e:
            print(f"Error decoding transcription JSON: {e}")
            raise
        except Exception as e:
            print(f"Error downloading or processing transcription file: {e}")
            raise
|
|
|
|
|
|
|
|
class AudioTranscriberTool(Tool, AudioTranscriber):
    """Tool that uploads a task's audio file to S3 and transcribes it.

    The file is stored in the bucket named by the ``SOURCE_BUCKET`` environment
    variable and transcribed through the inherited ``AudioTranscriber`` helpers.
    """

    name = "AudioTranscriber"
    description = "Extract text from audio files, such as MP3, MP4, WAV, etc."
    inputs = {
        "task_id": {
            "type": "string",
            "description": "The ID of the task associated with the audio file.",
        },
        "file_name": {
            "type": "string",
            "description": "The name of the audio file to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        Tool.__init__(self, *args, **kwargs)
        # AudioTranscriber.__init__ takes no parameters; forwarding the tool's
        # arguments to it would raise TypeError whenever any were supplied.
        AudioTranscriber.__init__(self)

    def forward(self, task_id: str, file_name: str) -> str:
        """Transcribe the audio file attached to a task.

        Args:
            task_id: The ID of the task associated with the audio file.
            file_name: The name of the audio file; used as the S3 object key.

        Returns:
            The transcript text, or an error message if any step fails.
        """
        import re

        try:
            file_content = get_file(task_id)
            s3_upload_file(file_content, os.getenv("SOURCE_BUCKET"), file_name)

            media_uri = f"s3://{os.getenv('SOURCE_BUCKET')}/{file_name}"
            # Transcribe job names only allow [0-9a-zA-Z._-] and at most 200
            # characters, so spaces or other symbols in the file name are replaced.
            safe_stem = re.sub(r"[^0-9a-zA-Z._-]", "_", file_name.split(".")[0])
            job_name = f"{uuid4()}-{safe_stem}"[:200]
            self._transcribe_audio(job_name, media_uri)
            return self._get_transcription(job_name)
        except Exception as e:
            return f"Error starting transcription job for {file_name}: {e}"
|
|
|
|
|
|
|
|
@tool
def image_analyzer(task: str, task_id: str, file_name: str) -> str:
    """Analyzes an image file and returns a response based on the task provided.

    Args:
        task (str): The description of the information to extract from the image.
        task_id (str): The ID of the task associated with the image file.
        file_name (str): The name of the image file to analyze.

    Returns:
        str: The model's answer, or an error message if the image cannot be processed.
    """
    # Imported locally so the tool stays self-contained.
    import mimetypes

    try:
        file_content = get_file(task_id)
        base64_image = base64.b64encode(file_content.getvalue()).decode("utf-8")

        # Derive a standard MIME type ("photo.jpg" -> "image/jpeg") instead of
        # pasting the raw extension into the data URL.
        mime_type, _ = mimetypes.guess_type(file_name)
        if not mime_type or not mime_type.startswith("image/"):
            # Unknown extension: fall back to the previous extension-based guess.
            mime_type = f"image/{file_name.split('.')[-1]}"

        response = invoke_openai_model(
            [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_image",
                            "image_url": f"data:{mime_type};base64,{base64_image}",
                        },
                        {"type": "input_text", "text": task},
                    ],
                }
            ]
        )
        return response
    except Exception as e:
        return f"Error processing image file {file_name}: {e}"
|
|
|
|
|
|
|
|
def _get_content(url: str, timeout: int = 5) -> bytes:
    """Download *url* with a GET request and return the raw response body.

    Args:
        url: Address to fetch.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        The response body as bytes.

    Raises:
        requests.exceptions.HTTPError: If ``raise_for_status`` rejects the response.
    """
    response = requests.get(url=url, timeout=timeout)
    # Fail before returning the body of an error response.
    response.raise_for_status()
    return response.content
|
|
|
|
|
|
|
|
def _js_disable_message(text: str) -> bool: |
|
|
return "JavaScript is disabled in this browser" in text |
|
|
|
|
|
|
|
|
@tool
def search_engine(search_term: str) -> str:
    """Search for the provided search term in Google Search

    The result pages are tried in order and the text of the first page that can
    be fetched and read is returned.

    Args:
        search_term (str): The term to search for on the web.

    Returns:
        str: The page text with newlines removed, or a message explaining that no page could be retrieved.
    """
    attempted = False
    # Iterate the results directly: they may be a generator, which has no len().
    for url in search(search_term, num_results=5):
        attempted = True
        try:
            html_content = BeautifulSoup(_get_content(url), "html.parser")
        except (ReadTimeoutError, requests.exceptions.RequestException) as ex:
            # RequestException also covers timeouts, connection errors and
            # malformed URLs, not only HTTP status errors.
            print(f"Got HTTP error when requesting {url}. Error {ex}")
            # Skip this result; there is no page content to inspect.
            continue

        # Drop page chrome and non-content elements before extracting the text.
        for tag in html_content.find_all(
            ["header", "footer", "nav", "aside", "script", "style", "noscript", "form"]
        ):
            tag.decompose()

        html_text = html_content.text
        if _js_disable_message(html_text):
            # The page only rendered a "JavaScript is disabled" notice; try the next one.
            continue

        return html_text.replace("\n", "")

    if attempted:
        # Every result was tried and none produced usable content.
        return "Sorry, got an HTTP error when requesting the internet"
    return "Could not retrieve any content from the search results."
|
|
|
|
|
|
|
|
class YoutubeTranscriberTool(Tool, AudioTranscriber):
    """Tool that downloads a YouTube video and transcribes its audio.

    The video is uploaded to the bucket named by the ``SOURCE_BUCKET``
    environment variable and transcribed through the inherited
    ``AudioTranscriber`` helpers.
    """

    name = "YoutubeTranscriber"
    description = "Extract text from YouTube videos, do not work for video understanding."
    inputs = {
        "youtube_url": {
            "type": "string",
            "description": "The URL of the YouTube video to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        Tool.__init__(self, *args, **kwargs)
        # AudioTranscriber.__init__ takes no parameters; forwarding the tool's
        # arguments to it would raise TypeError whenever any were supplied.
        AudioTranscriber.__init__(self)

    def forward(self, youtube_url: str) -> str:
        """Download the video at *youtube_url* and return its transcript.

        Args:
            youtube_url: The URL of the YouTube video to transcribe.

        Returns:
            The transcript text, or an error message if downloading or
            transcription fails.
        """
        file_name = f"{uuid4()}.mp4"
        buffer = BytesIO()
        try:
            youtube_obj = YouTube(youtube_url, on_progress_callback=on_progress)
            stream = youtube_obj.streams.filter(progressive=True).first()
            if stream is None:
                # Give a clear reason instead of an AttributeError on None.
                return f"Error fetching YouTube video {youtube_url}: no progressive stream available"
            stream.stream_to_buffer(buffer)
        except Exception as e:
            return f"Error fetching YouTube video {youtube_url}: {e}"

        try:
            # Rewind after the download: an uploader that reads from the current
            # position would otherwise see no data.
            # NOTE(review): confirm whether s3_upload_file rewinds the stream itself.
            buffer.seek(0)
            s3_upload_file(buffer, os.getenv("SOURCE_BUCKET"), file_name)
            media_uri = f"s3://{os.getenv('SOURCE_BUCKET')}/{file_name}"
            job_name = f"{uuid4()}-{file_name.split('.', maxsplit=1)[0]}"
            self._transcribe_audio(job_name, media_uri)
            return self._get_transcription(job_name)
        except Exception as e:
            return f"Error starting transcription job for {file_name}: {e}"
|
|
|
|
|
|
|
|
class YoutubeVideoDescriptorTool(Tool):
    """Tool that answers a question about the visual content of a YouTube video.

    The video is downloaded, sampled into JPEG frames, sent to the vision model
    in chunks, and the per-chunk answers are summarized into one response.
    """

    name = "YoutubeVideoDescriptor"
    description = (
        "Describe a youtube video based on the video. Use this tool for tasks like video understanding, "
        "not for audio transcription. Example: 'What is in the video?'"
    )
    inputs = {
        "youtube_url": {
            "type": "string",
            "description": "The URL of the YouTube video to get the description from.",
        },
        "task": {
            "type": "string",
            "description": "The task to perform on the video, e.g., 'Describe the video content'.",
        },
    }
    output_type = "string"

    def _base64_frames(
        self, video_buffer: BytesIO, target_fps: int = 10, chunk_size: int = 20
    ) -> Generator[list[str], None, None]:
        """Yield chunks of base64-encoded JPEG frames sampled from a video.

        Args:
            video_buffer: In-memory video data.
            target_fps: Approximate number of frames to keep per second of video.
            chunk_size: Maximum number of frames per yielded chunk.

        Yields:
            Lists of at most ``chunk_size`` base64 strings, including a final
            shorter list when frames remain. Nothing is yielded for a video
            with no readable frames.

        Raises:
            ValueError: If ``target_fps`` or ``chunk_size`` is not positive.
        """
        if target_fps <= 0:
            raise ValueError("target_fps must be a positive number")
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive number")

        # OpenCV reads from a path, so the buffer is spilled to a temporary file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as input_temp:
            input_temp.write(video_buffer.getvalue())
            input_temp_path = input_temp.name

        cap = cv2.VideoCapture(input_temp_path)
        try:
            orig_fps = cap.get(cv2.CAP_PROP_FPS)
            # Never let the interval reach 0 (unknown or low source FPS); that
            # would make the modulo below raise ZeroDivisionError.
            frame_interval = max(1, int(round(orig_fps / target_fps)))

            chunk = []
            frame_index = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_index % frame_interval == 0:
                    # imencode expects OpenCV's native BGR layout, so the frame is
                    # encoded as-is; converting to RGB first swaps red and blue.
                    encoded_ok, jpeg_buffer = cv2.imencode(".jpg", frame)
                    if encoded_ok:
                        chunk.append(base64.b64encode(jpeg_buffer).decode("utf-8"))
                    if len(chunk) == chunk_size:
                        yield chunk
                        chunk = []

                frame_index += 1

            # Emit the trailing partial chunk so no sampled frame is dropped.
            if chunk:
                yield chunk
        finally:
            cap.release()
            # delete=False above means the file must be removed explicitly.
            try:
                os.remove(input_temp_path)
            except OSError:
                pass

    def forward(self, task: str, youtube_url: str) -> str:
        """Answer *task* about the video at *youtube_url*.

        Args:
            task: The question or instruction to apply to the video frames.
            youtube_url: The URL of the YouTube video to analyze.

        Returns:
            A summary of the per-chunk model answers, or an error message if
            downloading or analysis fails.
        """
        buffer = BytesIO()
        try:
            youtube_obj = YouTube(youtube_url, on_progress_callback=on_progress)
            stream = youtube_obj.streams.filter(progressive=True).first()
            if stream is None:
                # Give a clear reason instead of an AttributeError on None.
                return f"Error fetching YouTube video {youtube_url}: no progressive stream available"
            stream.stream_to_buffer(buffer)
        except Exception as e:
            return f"Error fetching YouTube video {youtube_url}: {e}"

        try:
            responses = []
            # One sampled frame per second keeps each request small.
            for base64_frame_chunk in self._base64_frames(buffer, target_fps=1):
                vision_messages = [
                    {"type": "input_image", "image_url": f"data:image/jpeg;base64,{base64_frame}"}
                    for base64_frame in base64_frame_chunk
                ]
                response = invoke_openai_model(
                    [{"role": "user", "content": [*vision_messages, {"type": "input_text", "text": task}]}]
                )
                responses.append(response)

            if not responses:
                # Nothing to summarize; avoid asking the model about empty text.
                return f"Could not extract any frames from YouTube video {youtube_url}."

            combined_response = "\n".join(responses)
            final_response = invoke_bedrock_model(
                [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": combined_response},
                            {"type": "text", "text": "Please summarize the above text shortly."},
                        ],
                    }
                ]
            )
            return final_response
        except Exception as e:
            return f"Error describing YouTube video {youtube_url}: {e}"
|
|
|