File size: 9,019 Bytes
3869fd1
 
 
 
 
 
 
 
 
3cb2af5
e6af243
3cb2af5
e6af243
59b66b3
3869fd1
 
59b66b3
 
3869fd1
59b66b3
 
 
 
 
 
3869fd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59b66b3
3869fd1
 
 
 
 
 
 
 
 
 
 
 
 
 
e6af243
 
 
 
 
 
 
5620621
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3cb2af5
 
3869fd1
 
 
59b66b3
3869fd1
 
 
 
 
 
 
 
 
 
 
59b66b3
3869fd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59b66b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3869fd1
59b66b3
3869fd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf4800b
 
 
3869fd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e6af243
 
3869fd1
 
 
 
59b66b3
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
import requests
from dotenv import load_dotenv
from openai import OpenAI
from utils import process_image_for_gpt
import pandas as pd
import tempfile
import os
import io
import yt_dlp
import re
import html2text
from requests.exceptions import RequestException
from bs4 import BeautifulSoup
from pydub import AudioSegment


def add_numbers(*nums: list[int]) -> int:
    """Add a list of numbers

    The numbers may be given as separate positional arguments
    (``add_numbers(1, 2, 3)``) or grouped in lists/tuples
    (``add_numbers([1, 2], [3])``); both spellings yield the same total.

    Args:
        nums: list of numbers

    Returns:
        The sum of every number supplied, or 0 when called with no arguments.
    """
    total = 0
    for item in nums:
        # The annotation advertises lists while the variadic signature
        # invites bare numbers, so both forms are accepted.
        total += sum(item) if isinstance(item, (list, tuple)) else item
    return total


def transcribe_image_from_url(image_url: str) -> str:
    """Transcribe the text visible in an image referenced by URL.

    Only works with full http urls: the URL is forwarded unchanged as an
    ``image_url`` message part to the ``gpt-4o`` chat-completions call; this
    function does not download the image itself.

    Args:
        image_url: full http(s) URL of the image to transcribe.

    Returns:
        The model's reply with surrounding whitespace removed, or an empty
        string when the reply carries no text content.
    """
    client = OpenAI()

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": """Please transcribe all text visible in this image. 
                        Extract the text exactly as it appears, maintaining formatting when possible.
                        If there's no readable text, respond with 'No text found in image'.""",
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": image_url,
                            "detail": "high",
                        },
                    },
                ],
            }
        ],
        max_tokens=1000,
        # temperature 0 keeps the transcription as repeatable as possible
        temperature=0,
    )

    # `message.content` is optional in the SDK response; calling `.strip()`
    # on None would raise AttributeError, so fall back to an empty string.
    transcribed_text = response.choices[0].message.content or ""
    return transcribed_text.strip()


def truncate_content(content: str, max_length: int = 10000) -> str:
    """Cut ``content`` down to at most ``max_length`` leading characters.

    Args:
        content: text to shorten.
        max_length: length threshold; text no longer than this is returned
            unchanged, otherwise the slice ``content[:max_length]`` is returned.

    Returns:
        The original text or its leading slice.
    """
    is_too_long = len(content) > max_length
    return content[:max_length] if is_too_long else content


class WebPageTranscription:
    """Fetch web pages and return their paragraph/table content as Markdown.

    Each instance keeps a running count of fetch attempts and stops fetching
    once two attempts have been made; further calls return a fixed
    "move on" message instead of touching the network.
    """

    def __init__(self):
        # Number of fetch attempts so far; incremented before the request is
        # sent, so failed fetches count against the budget too.
        self.counter = 0

    def transcribe_webpage(self, website_url: str) -> str:
        """Visits website url and returns markdown of contents

        Only ``<p>`` and ``<table>`` elements are kept. They are taken from
        the ``div#mw-content-text`` element when present, otherwise from the
        first ``<div>``, otherwise from the whole document.

        Args:
            website_url: address of the page to fetch.

        Returns:
            Markdown text truncated to 20000 characters, or a human-readable
            message when the call budget is exhausted or the fetch/parse
            fails. Errors are reported through the return value rather than
            raised.
        """
        if self.counter > 1:
            return "No more transcriptions, move on"
        self.counter += 1
        try:
            # Send a GET request to the URL with a 20-second timeout
            response = requests.get(website_url, timeout=20)
            response.raise_for_status()  # Raise an exception for bad status codes

            soup = BeautifulSoup(response.text, "html.parser")

            # Falling back to the soup itself keeps div-less pages working;
            # previously that case hit `None.find_all` and was reported only
            # as a generic unexpected error.
            content_root = (
                soup.find("div", id="mw-content-text") or soup.find("div") or soup
            )

            # Only extract <p> and <table> tags
            elements = content_root.find_all(["p", "table"])

            # Join selected HTML chunks
            html_subset = "".join(str(el) for el in elements)

            # Convert the HTML content to Markdown
            markdown_content = html2text.HTML2Text().handle(html_subset)

            # Collapse runs of three or more newlines into a single blank line
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)

            return truncate_content(markdown_content, 20000)

        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            # Best-effort tool: report anything else as text to the caller.
            return f"An unexpected error occurred: {str(e)}"


def parse_youtube_video(youtube_url: str) -> str:
    """Returns text transcript of a youtube video

    The audio track is downloaded with yt-dlp into a temporary directory and
    converted to mp3 by the ``FFmpegExtractAudio`` post-processor. It is then
    split into five-minute chunks, each chunk is transcribed with the
    ``whisper-1`` model, and the pieces are joined with single spaces.

    Args:
        youtube_url: full url linking to the video to transcribe

    Returns:
        The concatenated transcript with surrounding whitespace stripped.

    Raises:
        FileNotFoundError: if no ``.mp3`` file exists in the temporary
            directory after the download step.
    """
    load_dotenv()
    client = OpenAI()

    chunk_length_ms = 5 * 60 * 1000  # five minutes, in pydub's millisecond units

    with tempfile.TemporaryDirectory() as temp_dir:
        # Configure yt-dlp to extract audio straight into the temp directory
        ydl_opts = {
            "format": "bestaudio/best",
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "64",
                }
            ],
            "outtmpl": os.path.join(temp_dir, "%(title)s.%(ext)s"),
        }

        # Download audio (the returned metadata is not needed here)
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(youtube_url, download=True)

        # Find the downloaded audio file; the name depends on the video
        # title, so look it up by extension instead of guessing it.
        audio_file = next(
            (
                os.path.join(temp_dir, name)
                for name in os.listdir(temp_dir)
                if name.endswith(".mp3")
            ),
            None,
        )
        if audio_file is None:
            raise FileNotFoundError("Audio file not found")

        audio = AudioSegment.from_mp3(audio_file)

        # Split into fixed-length pieces — presumably to keep each upload
        # under the transcription endpoint's size limit (TODO confirm).
        chunk_paths = []
        for index, start in enumerate(range(0, len(audio), chunk_length_ms)):
            chunk_path = os.path.join(temp_dir, f"chunk_{index}.mp3")
            audio[start : start + chunk_length_ms].export(chunk_path, format="mp3")
            chunk_paths.append(chunk_path)

        # Transcribe each chunk in order
        pieces = []
        for chunk_path in chunk_paths:
            with open(chunk_path, "rb") as audio_chunk:
                transcript = client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_chunk,
                )
            pieces.append(transcript.text)

        return " ".join(pieces).strip()


class APIProcessor:
    """Download a question's attached file and turn it into text.

    The handler is chosen from the extension of ``file_name``: ``mp3`` is
    transcribed as audio, ``xlsx`` is dumped sheet by sheet, ``png`` is
    described by a vision model, and anything else is returned as the
    decoded response body.
    """

    def __init__(self, file_url: str, file_name: str):
        """Store the attachment location and create the OpenAI client.

        Args:
            file_url: URL the attachment is downloaded from.
            file_name: name of the attachment; its extension selects the
                handler. An empty name means there is no attachment.
        """
        load_dotenv()
        self.file_url = file_url
        self.file_name = file_name
        self.client = OpenAI()

    def _transcribe_mp3(self, response: "requests.Response") -> str:
        """Transcribe a downloaded mp3 with the ``gpt-4o-transcribe`` model.

        Args:
            response: HTTP response whose body is the audio file.

        Returns:
            The transcript text, or an ``Error processing audio: ...``
            message when transcription fails.
        """
        # The body is spooled to a named file on disk and reopened for the
        # upload; delete=False keeps it alive past this `with` block.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
            for chunk in response.iter_content(chunk_size=8192):
                temp_file.write(chunk)
            temp_file_path = temp_file.name

        try:
            with open(temp_file_path, "rb") as audio_file:
                transcription = self.client.audio.transcriptions.create(
                    model="gpt-4o-transcribe",
                    file=audio_file,
                )
            return transcription.text
        except Exception as e:
            # Report the failure as text (same style as the spreadsheet
            # handler) instead of printing it and implicitly returning None.
            return f"Error processing audio: {e}"
        finally:
            os.unlink(temp_file_path)

    def _transcribe_image(self, response: "requests.Response") -> str:
        """Ask ``gpt-4o`` to describe/transcribe a downloaded image.

        Args:
            response: HTTP response whose body is the image bytes.

        Returns:
            The model's reply, or an empty string when the reply carries no
            text content.
        """
        # NOTE(review): process_image_for_gpt lives in utils; presumably it
        # returns base64-encoded JPEG data, since the result is embedded in
        # a data:image/jpeg URL below — confirm against utils.
        base64_image = process_image_for_gpt(response.content)
        transcription_prompt = """Please in detail transcribe as much of the output information you can via text. Feel free to use ASCII."""
        image_message = [
            {"type": "text", "text": transcription_prompt},
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/jpeg;base64,{base64_image}",
                },
            },
        ]
        completion = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": image_message}],
            max_tokens=1000,
        )
        # `content` is optional in the SDK response; never hand None back
        # from a method declared to return str.
        return completion.choices[0].message.content or ""

    def _transcribe_spreadsheet(self, response: "requests.Response") -> str:
        """Render every sheet of a downloaded Excel workbook as text.

        Args:
            response: HTTP response whose body is the workbook.

        Returns:
            ``str()`` of a dict mapping sheet name to ``DataFrame.to_string()``
            output, or an ``Error processing spreadsheet: ...`` message.
        """
        try:
            excel_file = pd.ExcelFile(io.BytesIO(response.content))
            all_sheets_data = {
                sheet: excel_file.parse(sheet_name=sheet).to_string()
                for sheet in excel_file.sheet_names
            }
            return str(all_sheets_data)
        except Exception as e:
            return f"Error processing spreadsheet: {e}"

    def get_and_process_attachment(self) -> str:
        """For current question, download and process the file associated if it exists.

        Returns:
            Parsed text output of the attachment, or a fixed message when
            ``file_name`` is empty.

        Raises:
            requests.RequestException: if the download fails or the server
                answers with an HTTP error status.
        """
        if not self.file_name:
            return "No attached file for this question"

        response = requests.get(self.file_url, timeout=15)
        # Do not feed an HTTP error page to a transcriber as if it were the file.
        response.raise_for_status()

        # Lower-case so "AUDIO.MP3" and "audio.mp3" are handled alike.
        file_extension = self.file_name.rsplit(".", 1)[-1].lower()

        handlers = {
            "mp3": self._transcribe_mp3,
            "xlsx": self._transcribe_spreadsheet,
            "png": self._transcribe_image,
        }
        handler = handlers.get(file_extension)
        if handler is None:
            # Unknown type: return decoded text so the declared `str`
            # return type holds (raw `response.content` is bytes).
            return response.text

        return handler(response)


if __name__ == "__main__":
    # Manual smoke test: transcribe one sample video and print the result.
    #
    # Other manual checks, left disabled for reference:
    #
    #   attachment download from the scoring API
    #     def get_file_api_url(task_id: str) -> str:
    #         return "https://agents-course-unit4-scoring.hf.space" + "/files/" + task_id
    #
    #     audio_task_processor = APIProcessor(
    #         file_name="",
    #         file_url=get_file_api_url("8e867cd7-cff9-4e6c-867a-ff5ddc2550be"),
    #     )
    #     print(audio_task_processor.get_and_process_attachment())
    #
    #   web page transcription
    #     text = WebPageTranscription().transcribe_webpage(
    #         "https://en.wikipedia.org/wiki/Mercedes_Sosa#Studio_albums"
    #     )
    #     print(text)
    sample_video_url = "https://www.youtube.com/watch?v=1htKBjuUWec"
    print(parse_youtube_video(sample_video_url))