| |
| import ast |
| import json |
| import os |
| import base64 |
| import tempfile |
| from io import BytesIO |
| from time import sleep |
| from typing import Generator |
| from uuid import uuid4 |
|
|
| import boto3 |
| import cv2 |
| import fitz |
| import requests |
| from bs4 import BeautifulSoup |
| from googlesearch import search |
| from pandas import read_excel |
| from pytubefix import YouTube |
| from pytubefix.cli import on_progress |
| from smolagents import tool, Tool |
| from requests.exceptions import HTTPError |
| from urllib3.exceptions import ReadTimeoutError |
|
|
| from definitions import TranscriptionJob |
| from utils import get_file, s3_upload_file, s3_download_file, invoke_bedrock_model, invoke_openai_model |
|
|
|
|
@tool
def math_calculator(expression: str) -> str:
    """A simple calculator tool that evaluates mathematical expressions.

    Supports numbers, parentheses, unary + and -, and the binary operators
    +, -, *, /, //, % and **. Plain Python literals (such as lists) are still
    accepted. Names, function calls and any other syntax are rejected, so no
    arbitrary code is ever executed.

    Args:
        expression (str): A mathematical expression as a string, e.g., "2 + 2 * 3".
    """
    # Imports are local so the tool body stays self-contained for smolagents.
    import ast
    import operator

    class _UnsupportedSyntax(ValueError):
        """Raised for AST nodes that the arithmetic evaluator does not handle."""

    binary_operators = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_operators = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Cap exponents so inputs such as "9 ** 9 ** 9" cannot stall the agent.
    max_exponent = 10_000

    def evaluate(node):
        """Recursively evaluate a whitelisted arithmetic AST node."""
        if isinstance(node, ast.Constant) and type(node.value) in (int, float, complex):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in binary_operators:
            left, right = evaluate(node.left), evaluate(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError(f"exponent {right} is too large")
            return binary_operators[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_operators:
            return unary_operators[type(node.op)](evaluate(node.operand))
        raise _UnsupportedSyntax(type(node).__name__)

    try:
        tree = ast.parse(expression.strip(), mode="eval")
        try:
            result = evaluate(tree.body)
        except _UnsupportedSyntax:
            # Not pure arithmetic: fall back to the literal parser this tool
            # always used, so inputs like "[1, 2]" keep working.
            result = ast.literal_eval(expression)
        return str(result)
    except Exception as e:
        return f"Error evaluating expression: {e}"
|
|
|
|
@tool
def excel_reader(task_id: str, file_name: str) -> str:
    """Reads an Excel file and returns its content as a dataframe string.

    Every sheet of the workbook is read. A single-sheet workbook is rendered as
    a plain table; with several sheets, each table is preceded by a line naming
    its sheet so that no sheet is silently dropped.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the Excel file to read.
    """
    try:
        file_content = get_file(task_id)
        # sheet_name=None loads every sheet into a {sheet name: DataFrame} dict.
        sheets = read_excel(file_content, engine="openpyxl", sheet_name=None)
        if len(sheets) == 1:
            (df,) = sheets.values()
            return df.to_string(index=False)
        return "\n\n".join(
            f"Sheet: {sheet_name}\n{df.to_string(index=False)}"
            for sheet_name, df in sheets.items()
        )
    except Exception as e:
        return f"Error reading Excel file {file_name}: {e}"
|
|
|
|
@tool
def txt_reader(task_id: str, file_name: str) -> str:
    """Reads a text file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the file to read.
    """
    # Fetch the task attachment and decode it strictly as UTF-8; any failure
    # (download or decoding) is reported back to the agent as a string.
    try:
        raw_bytes = get_file(task_id).read()
        decoded_text = raw_bytes.decode("utf-8")
    except Exception as e:
        return f"Error reading file {file_name}: {e}"
    return decoded_text
|
|
|
|
@tool
def pdf_reader(task_id: str, file_name: str) -> str:
    """Reads a PDF file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the PDF file to read.
    """
    try:
        file_content = get_file(task_id)
        with fitz.open(stream=file_content.read(), filetype="pdf") as doc:
            # Extract each page's text exactly once; pages with no text are skipped.
            page_texts = (page.get_text() for page in doc)
            text = "\n".join(page_text for page_text in page_texts if page_text).strip()
        # Checked after stripping so whitespace-only PDFs also report "no text"
        # instead of returning an empty string.
        if not text:
            return f"No text found in PDF file {file_name}."
        return text
    except Exception as e:
        return f"Error reading PDF file {file_name}: {e}"
|
|
|
|
class AudioTranscriber:
    """A class to handle audio transcription using AWS Transcribe.

    Transcripts are written by AWS Transcribe to the bucket named by the
    TARGET_BUCKET environment variable and read back from that bucket.
    """

    def __init__(self):
        # Falls back to us-east-1 when AWS_REGION is not set.
        region = os.getenv("AWS_REGION", "us-east-1")
        self.client = boto3.client("transcribe", region_name=region)

    def _transcribe_audio(self, job_name: str, media_uri: str) -> dict:
        """Start an asynchronous transcription job and return the API response.

        Args:
            job_name: Transcription job name; also used as the output object
                key ``{job_name}.json`` in the TARGET_BUCKET bucket.
            media_uri: URI of the media to transcribe (callers in this module
                pass ``s3://`` URIs).
        """
        return self.client.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={"MediaFileUri": media_uri},
            IdentifyLanguage=True,
            OutputBucketName=os.getenv("TARGET_BUCKET"),
            OutputKey=f"{job_name}.json",
        )

    def _get_transcription(self, job_name: str, poll_interval: float = 5.0, timeout: float = 1800.0) -> str:
        """Wait for a transcription job to finish and return its transcript text.

        Args:
            job_name: Name of a job previously started with ``_transcribe_audio``.
            poll_interval: Seconds to sleep between status checks.
            timeout: Maximum number of seconds to wait for the job to finish.

        Returns:
            The first transcript string from the job's JSON output.

        Raises:
            RuntimeError: The job finished with status FAILED.
            TimeoutError: The job did not finish within ``timeout`` seconds.
            json.JSONDecodeError / other exceptions: Downloading or parsing the
                transcript file failed (logged, then re-raised).
        """
        from time import monotonic

        deadline = monotonic() + timeout
        while True:
            response: TranscriptionJob = self.client.get_transcription_job(TranscriptionJobName=job_name)
            job = response["TranscriptionJob"]
            status = job["TranscriptionJobStatus"]
            if status in ("COMPLETED", "FAILED"):
                break
            if monotonic() >= deadline:
                raise TimeoutError(f"Transcription job {job_name} did not finish within {timeout} seconds")
            sleep(poll_interval)

        # A failed job has no usable transcript; surface AWS's reason instead of
        # failing later with a KeyError on the missing transcript location.
        if status == "FAILED":
            reason = job.get("FailureReason", "unknown reason")
            raise RuntimeError(f"Transcription job {job_name} failed: {reason}")

        transcript_url = job["Transcript"]["TranscriptFileUri"]
        try:
            # The last URL segment is the output object key ({job_name}.json).
            bytes_result = s3_download_file(os.getenv("TARGET_BUCKET"), transcript_url.split("/")[-1])
            transcription_data = json.loads(bytes_result.read().decode("utf-8"))
            return transcription_data["results"]["transcripts"][0]["transcript"]
        except json.JSONDecodeError as e:
            print(f"Error decoding transcription JSON: {e}")
            raise
        except Exception as e:
            print(f"Error downloading or processing transcription file: {e}")
            raise
|
|
|
|
class AudioTranscriberTool(Tool, AudioTranscriber):
    """smolagents tool that transcribes a task's attached audio file with AWS Transcribe."""

    name = "AudioTranscriber"
    description = "Extract text from audio files, such as MP3, MP4, WAV, etc."
    inputs = {
        "task_id": {
            "type": "string",
            "description": "The ID of the task associated with the audio file.",
        },
        "file_name": {
            "type": "string",
            "description": "The name of the audio file to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        Tool.__init__(self, *args, **kwargs)
        # AudioTranscriber.__init__ takes no arguments; forwarding Tool's
        # arguments to it would raise TypeError.
        AudioTranscriber.__init__(self)

    def forward(self, task_id: str, file_name: str) -> str:
        """Upload the task file to SOURCE_BUCKET, transcribe it and return the text.

        Any failure is returned to the agent as an error string.
        """
        import re

        try:
            file_content = get_file(task_id)
            source_bucket = os.getenv("SOURCE_BUCKET")
            s3_upload_file(file_content, source_bucket, file_name)

            media_uri = f"s3://{source_bucket}/{file_name}"
            # Transcribe job names may only contain [0-9a-zA-Z._-] and be at most
            # 200 characters long, so sanitize the file stem and truncate.
            stem = re.sub(r"[^0-9A-Za-z._-]", "_", file_name.split(".")[0])
            job_name = f"{uuid4()}-{stem}"[:200]
            self._transcribe_audio(job_name, media_uri)
            return self._get_transcription(job_name)
        except Exception as e:
            return f"Error starting transcription job for {file_name}: {e}"
|
|
|
|
@tool
def image_analyzer(task: str, task_id: str, file_name: str) -> str:
    """Analyzes an image file and returns a response based on the task provided.

    Args:
        task (str): The description of the information to extract from the image.
        task_id (str): The ID of the task associated with the image file.
        file_name (str): The name of the image file to analyze.
    """
    import mimetypes

    try:
        file_content = get_file(task_id)
        base64_image = base64.b64encode(file_content.getvalue()).decode("utf-8")
        # Data URLs need a registered MIME type (".jpg" -> "image/jpeg", not
        # "image/jpg"). Fall back to the lower-cased extension for unknown types.
        mime_type, _ = mimetypes.guess_type(file_name)
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = f"image/{file_name.split('.')[-1].lower()}"
        response = invoke_openai_model(
            [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_image",
                            "image_url": f"data:{mime_type};base64,{base64_image}",
                        },
                        {"type": "input_text", "text": task},
                    ],
                }
            ]
        )
        return response
    except Exception as e:
        return f"Error processing image file {file_name}: {e}"
|
|
|
|
def _get_content(url: str, timeout: int = 5) -> bytes:
    """Download ``url`` and return the raw response body.

    ``raise_for_status`` turns 4xx/5xx responses into ``requests.HTTPError``;
    connection and timeout errors from ``requests.get`` propagate unchanged.
    """
    http_response = requests.get(url=url, timeout=timeout)
    http_response.raise_for_status()
    body = http_response.content
    return body
|
|
|
|
| def _js_disable_message(text: str) -> bool: |
| return "JavaScript is disabled in this browser" in text |
|
|
|
|
@tool
def search_engine(search_term: str) -> str:
    """Search Google for the provided search term and return the text of the first result page that could be fetched.

    Args:
        search_term (str): The term to search for on the web.
    """
    try:
        # search() is lazy (a generator); materialize the few URLs up front so a
        # failing Google request is handled here instead of crashing the tool.
        urls = list(search(search_term, num_results=5))
    except requests.RequestException as ex:
        print(f"Search request for {search_term!r} failed. Error {ex}")
        return "Sorry, got an HTTP error when requesting the internet"

    for url in urls:
        try:
            html_content = BeautifulSoup(_get_content(url), "html.parser")
        except (ReadTimeoutError, requests.RequestException) as ex:
            # requests wraps timeouts/connection failures in RequestException
            # subclasses (HTTPError included); try the next result.
            print(f"Got HTTP error when requesting {url}. Error {ex}")
            continue

        # Drop page chrome and non-content elements before extracting text.
        for tag in html_content.find_all(
            ["header", "footer", "nav", "aside", "script", "style", "noscript", "form"]
        ):
            tag.decompose()

        html_text = html_content.text
        if _js_disable_message(html_text) or not html_text.strip():
            # JavaScript-only or empty pages carry no usable content.
            continue

        # Collapse all whitespace (including newlines) to single spaces so words
        # on adjacent lines are not glued together.
        return " ".join(html_text.split())

    if urls:
        return "Sorry, got an HTTP error when requesting the internet"
    return "Could not retrieve any content from the search results."
|
|
|
|
class YoutubeTranscriberTool(Tool, AudioTranscriber):
    """smolagents tool that transcribes the audio of a YouTube video with AWS Transcribe."""

    name = "YoutubeTranscriber"
    description = "Extract text from YouTube videos, do not work for video understanding."
    inputs = {
        "youtube_url": {
            "type": "string",
            "description": "The URL of the YouTube video to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        Tool.__init__(self, *args, **kwargs)
        # AudioTranscriber.__init__ takes no arguments; forwarding Tool's
        # arguments to it would raise TypeError.
        AudioTranscriber.__init__(self)

    def forward(self, youtube_url: str) -> str:
        """Download the video, upload it to SOURCE_BUCKET and return its transcript.

        Any failure is returned to the agent as an error string.
        """
        file_name = f"{uuid4()}.mp4"
        buffer = BytesIO()
        try:
            youtube_obj = YouTube(youtube_url, on_progress_callback=on_progress)
            stream = youtube_obj.streams.filter(progressive=True).first()
            if stream is None:
                raise ValueError("no progressive stream available")
            stream.stream_to_buffer(buffer)
        except Exception as e:
            return f"Error fetching YouTube video {youtube_url}: {e}"
        # Writing into the BytesIO leaves its position at the end; rewind in
        # case s3_upload_file reads from the current position.
        buffer.seek(0)
        try:
            source_bucket = os.getenv("SOURCE_BUCKET")
            s3_upload_file(buffer, source_bucket, file_name)
            media_uri = f"s3://{source_bucket}/{file_name}"
            job_name = f"{uuid4()}-{file_name.split('.', maxsplit=1)[0]}"
            self._transcribe_audio(job_name, media_uri)
            return self._get_transcription(job_name)
        except Exception as e:
            return f"Error starting transcription job for {file_name}: {e}"
|
|
|
|
class YoutubeVideoDescriptorTool(Tool):
    """smolagents tool that answers a task about a YouTube video from sampled frames.

    Frames are sent in chunks to the OpenAI vision model, and the per-chunk answers
    are condensed by a Bedrock model into the final response.
    """

    name = "YoutubeVideoDescriptor"
    description = (
        "Describe a youtube video based on the video. Use this tool for tasks like video understanding, "
        "not for audio transcription. Example: 'What is in the video?'"
    )
    inputs = {
        "youtube_url": {
            "type": "string",
            "description": "The URL of the YouTube video to get the description from.",
        },
        "task": {
            "type": "string",
            "description": "The task to perform on the video, e.g., 'Describe the video content'.",
        },
    }
    output_type = "string"

    def _base64_frames(
        self, video_buffer: BytesIO, target_fps: int = 10, chunk_size: int = 20
    ) -> Generator[list[str], None, None]:
        """Yield sampled video frames as base64-encoded JPEG strings, in chunks.

        Args:
            video_buffer: In-memory video file. It is written to a temporary
                file because cv2.VideoCapture is given a path.
            target_fps: Approximate number of frames to keep per second of video.
            chunk_size: Maximum number of frames per yielded list.

        Yields:
            Lists of at most ``chunk_size`` frames; the last list may be shorter.
            Nothing is yielded when no frame can be decoded.
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as input_temp:
            input_temp.write(video_buffer.getvalue())
            input_temp_path = input_temp.name

        cap = cv2.VideoCapture(input_temp_path)
        try:
            orig_fps = cap.get(cv2.CAP_PROP_FPS)
            # Keep every frame_interval-th frame. Clamp to 1 so an unknown (0) FPS
            # or a source slower than target_fps cannot cause a modulo by zero.
            frame_interval = max(1, int(round(orig_fps / target_fps)))

            chunk = []
            frame_index = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_index % frame_interval == 0:
                    # cv2.imencode expects BGR data, which is what cap.read()
                    # returns; converting to RGB first would swap red and blue.
                    ok, encoded = cv2.imencode(".jpg", frame)
                    if ok:
                        chunk.append(base64.b64encode(encoded).decode("utf-8"))
                        if len(chunk) == chunk_size:
                            yield chunk
                            chunk = []
                frame_index += 1

            # Emit the trailing partial chunk (previously dropped whenever the
            # video produced at least one full chunk).
            if chunk:
                yield chunk
        finally:
            cap.release()
            # delete=False above means the temporary file must be removed here;
            # removal is best-effort so cleanup cannot mask a successful result.
            try:
                os.remove(input_temp_path)
            except OSError:
                pass

    def forward(self, task: str, youtube_url: str) -> str:
        """Answer ``task`` about the video at ``youtube_url`` using sampled frames.

        Any failure is returned to the agent as an error string.
        """
        buffer = BytesIO()
        try:
            youtube_obj = YouTube(youtube_url, on_progress_callback=on_progress)
            stream = youtube_obj.streams.filter(progressive=True).first()
            if stream is None:
                raise ValueError("no progressive stream available")
            stream.stream_to_buffer(buffer)
        except Exception as e:
            return f"Error fetching YouTube video {youtube_url}: {e}"
        try:
            responses = []
            # Sample one frame per second and query the vision model per chunk.
            for frame_chunk in self._base64_frames(buffer, target_fps=1):
                vision_messages = [
                    {"type": "input_image", "image_url": f"data:image/jpeg;base64,{frame}"}
                    for frame in frame_chunk
                ]
                response = invoke_openai_model(
                    [{"role": "user", "content": [*vision_messages, {"type": "input_text", "text": task}]}]
                )
                responses.append(response)
            if not responses:
                return f"Error describing YouTube video {youtube_url}: no frames could be extracted"

            combined = "\n".join(responses)
            final_response = invoke_bedrock_model(
                [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": combined},
                            # Keep the task in view so the summary retains what was asked.
                            {
                                "type": "text",
                                "text": (
                                    "The text above answers the following task, one part per "
                                    f"consecutive segment of the same video: {task}"
                                ),
                            },
                            {"type": "text", "text": "Please summarize the above text shortly."},
                        ],
                    }
                ]
            )
            return final_response
        except Exception as e:
            return f"Error describing YouTube video {youtube_url}: {e}"
|
|