# NOTE(review): "Spaces: Sleeping" below is a Hugging Face Space status banner
# captured when this file was scraped from the web UI — it is not source code.
# Kept as a comment so the module parses.
| import requests | |
| from dotenv import load_dotenv | |
| from openai import OpenAI | |
| from utils import process_image_for_gpt | |
| import pandas as pd | |
| import tempfile | |
| import os | |
| import io | |
| import yt_dlp | |
| import re | |
| import html2text | |
| from requests.exceptions import RequestException | |
| from bs4 import BeautifulSoup | |
| from pydub import AudioSegment | |
def add_numbers(*nums: int) -> int:
    """Add a list of numbers.

    Args:
        nums: numbers to sum.

    Returns:
        The sum of all given numbers (0 when called with no arguments).
    """
    # The original definition contained only a docstring and implicitly
    # returned None, contradicting the -> int annotation.
    return sum(nums)
def transcribe_image_from_url(image_url: str) -> str:
    """Transcribe all readable text in an image using GPT-4o vision.

    Only works with full http urls.

    Args:
        image_url: publicly reachable URL of the image.

    Returns:
        The transcribed text (stripped), or the model's
        'No text found in image' reply when nothing is readable.
    """
    prompt_text = """Please transcribe all text visible in this image.
Extract the text exactly as it appears, maintaining formatting when possible.
If there's no readable text, respond with 'No text found in image'."""

    # Build the multimodal user message: instruction text + the image itself.
    user_content = [
        {"type": "text", "text": prompt_text},
        {
            "type": "image_url",
            "image_url": {"url": image_url, "detail": "high"},
        },
    ]

    client = OpenAI()
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_content}],
        max_tokens=1000,
        temperature=0,  # deterministic transcription
    )
    return completion.choices[0].message.content.strip()
def truncate_content(content: str, max_length: int = 10000) -> str:
    """Return content unchanged if it fits, otherwise its first max_length chars."""
    return content if len(content) <= max_length else content[:max_length]
class WebPageTranscription:
    """Fetches a web page and converts its main content to markdown.

    Tracks how many transcriptions have been performed so a caller
    (e.g. an agent loop) cannot request more than two per instance.
    """

    def __init__(self):
        # Count of transcriptions performed so far on this instance.
        self.counter = 0

    def transcribe_webpage(self, website_url: str) -> str:
        """Visits website url and returns markdown of contents
        Args:
            website_url:str"""
        # Allow at most two transcriptions per instance.
        if self.counter > 1:
            return "No more transcriptions, move on"
        self.counter += 1
        try:
            # Fetch the page; fail fast on slow servers or bad status codes.
            page = requests.get(website_url, timeout=20)
            page.raise_for_status()

            parsed = BeautifulSoup(page.text, "html.parser")
            # Prefer the MediaWiki article body; fall back to the first div.
            main = parsed.find("div", id="mw-content-text") or parsed.find("div")

            # Keep only paragraph and table markup before conversion.
            fragment = "".join(str(tag) for tag in main.find_all(["p", "table"]))

            markdown = html2text.HTML2Text().handle(str(fragment))
            # Collapse runs of 3+ newlines into a single blank line.
            markdown = re.sub(r"\n{3,}", "\n\n", markdown)
            return truncate_content(markdown, 20000)
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"
def parse_youtube_video(youtube_url: str) -> str:
    """Returns text transcript of a youtube video.

    Downloads the video's audio track with yt-dlp, splits it into
    5-minute mp3 chunks (to stay under the transcription API's upload
    limit), and transcribes each chunk with OpenAI's whisper-1 model.

    Args:
        youtube_url: full url linking to the video to transcribe

    Returns:
        The concatenated transcript of all chunks, stripped.

    Raises:
        Exception: if the download step produced no mp3 file.
    """
    load_dotenv()  # make OPENAI_API_KEY available to the client
    client = OpenAI()

    # Configure yt-dlp to extract a low-bitrate mp3 audio track.
    # outtmpl is set below once the temporary directory exists.
    ydl_opts = {
        "format": "bestaudio/best",
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "64",
            }
        ],
    }

    with tempfile.TemporaryDirectory() as temp_dir:
        ydl_opts["outtmpl"] = os.path.join(temp_dir, "%(title)s.%(ext)s")

        # Download and extract the audio (return value intentionally unused).
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(youtube_url, download=True)

        # Locate the mp3 produced by the FFmpeg postprocessor.
        audio_file = next(
            (
                os.path.join(temp_dir, name)
                for name in os.listdir(temp_dir)
                if name.endswith(".mp3")
            ),
            None,
        )
        if not audio_file:
            raise Exception("Audio file not found")

        # Split into 5-minute chunks so each upload stays small.
        audio = AudioSegment.from_mp3(audio_file)
        chunk_length_ms = 5 * 1000 * 60
        chunks = []
        for i in range(0, len(audio), chunk_length_ms):
            chunk = audio[i : i + chunk_length_ms]
            chunk_path = os.path.join(temp_dir, f"chunk_{i // chunk_length_ms}.mp3")
            chunk.export(chunk_path, format="mp3")
            chunks.append(chunk_path)

        # Transcribe each chunk and stitch the results together.
        full_transcript = ""
        for chunk_path in chunks:
            with open(chunk_path, "rb") as audio_chunk:
                transcript = client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_chunk,
                )
            full_transcript += transcript.text + " "

        return full_transcript.strip()
class APIProcessor:
    """Downloads a question's attachment and converts it to text.

    Dispatches on the attachment's file extension: mp3 audio is
    transcribed, xlsx spreadsheets are dumped sheet-by-sheet, png
    images are described by a vision model, and anything else is
    returned as decoded text.
    """

    def __init__(self, file_url: str, file_name: str):
        load_dotenv()  # make OPENAI_API_KEY available to the client
        self.file_url = file_url
        self.file_name = file_name
        self.client = OpenAI()

    def _transcribe_mp3(self, response: requests.Response) -> str:
        """Spill the mp3 response body to a temp file and transcribe it.

        The transcription endpoint needs a real file handle, so the
        streamed download is written to disk first.
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
            for chunk in response.iter_content(chunk_size=8192):
                temp_file.write(chunk)
            temp_file_path = temp_file.name
        try:
            with open(temp_file_path, "rb") as audio_file:
                transcription = self.client.audio.transcriptions.create(
                    model="gpt-4o-transcribe",
                    file=audio_file,
                )
            return transcription.text
        except Exception as e:
            # Previously this printed and fell through, implicitly returning
            # None despite the -> str annotation; return an error string so
            # callers always receive text.
            print(str(e))
            return f"Error transcribing audio: {e}"
        finally:
            os.unlink(temp_file_path)  # always clean up the temp file

    def _transcribe_image(self, response: requests.Response) -> str:
        """Describe/transcribe a downloaded image via GPT-4o vision."""
        image_bytes = response.content
        base64_image = process_image_for_gpt(image_bytes)
        TRANSCRIPTION_PROMPT = """Please in detail transcribe as much of the output information you can via text. Feel free to use ASCII."""
        image_message = [
            {"type": "text", "text": TRANSCRIPTION_PROMPT},
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/jpeg;base64,{base64_image}",
                },
            },
        ]
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": image_message}],
            max_tokens=1000,
        )
        return response.choices[0].message.content

    def _transcribe_spreadsheet(self, response: requests.Response) -> str:
        """Dump every sheet of a downloaded xlsx file as plain text."""
        try:
            excel_data = io.BytesIO(response.content)
            excel_file = pd.ExcelFile(excel_data)
            sheets = excel_file.sheet_names
            all_sheets_data = {}
            for sheet in sheets:
                df = excel_file.parse(sheet_name=sheet)
                all_sheets_data[sheet] = df.to_string()
            return str(all_sheets_data)
        except Exception as e:
            return f"Error processing spreadsheet: {e}"

    def get_and_process_attachment(self) -> str:
        """For current question, download and process the file associated if it exists.

        Returns:
            Parsed text output of the attachment
        """
        if not self.file_name:
            return "No attached file for this question"

        response = requests.get(self.file_url, timeout=15)

        # Dispatch on the (case-insensitive) file extension.
        file_extension = self.file_name.split(".")[-1].lower()
        if file_extension == "mp3":
            return self._transcribe_mp3(response)
        if file_extension == "xlsx":
            return self._transcribe_spreadsheet(response)
        if file_extension == "png":
            return self._transcribe_image(response)
        # Unknown type: decode the raw bytes so the -> str contract holds
        # (previously this leaked raw bytes to the caller).
        return response.content.decode("utf-8", errors="replace")
if __name__ == "__main__":
    # Ad-hoc smoke test: transcribe a short YouTube video and print the
    # result. (Earlier commented-out experiments against the scoring API
    # and Wikipedia transcription have been removed as dead code.)
    result = parse_youtube_video("https://www.youtube.com/watch?v=1htKBjuUWec")
    print(result)