import os
import tempfile
import time
from collections import defaultdict

import gradio as gr
import numpy as np
import spacy
import torch
import yt_dlp as youtube_dl
from gensim.models import Word2Vec
from sklearn.cluster import KMeans
from sklearn.metrics import davies_bouldin_score
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read

MODEL_NAME = "openai/whisper-large-v3"
BATCH_SIZE = 8
FILE_LIMIT_MB = 1000
YT_LENGTH_LIMIT_S = 3600  # limit to 1 hour YouTube files

device = 0 if torch.cuda.is_available() else "cpu"

pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    chunk_length_s=30,
    device=device,
)

summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

# Download and load the 'en_core_web_sm' spaCy model
spacy.cli.download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm")


def summarize(text, max_length=1000):
    # Cap max_length by the input length (a rough character-based proxy) so
    # BART does not warn on very short inputs
    return summarizer(text, max_length=min(max_length, len(text)), min_length=1, do_sample=False)[0]["summary_text"]


def segment_sentences(text):
    # Split the text into sentences using spaCy's sentence boundaries
    doc = nlp(text)
    return [sent.text for sent in doc.sents]


def preprocess_sentences(sentences):
    preprocessed_sentences = []
    for sentence in sentences:
        # Tokenize and lemmatize the sentence, dropping stop words and
        # non-alphabetic tokens
        doc = nlp(sentence.lower())
        tokens = [token.lemma_ for token in doc if not token.is_stop and token.is_alpha]
        preprocessed_sentences.append(tokens)
    return preprocessed_sentences


def embedding(preprocessed_sentences):
    # Train a skip-gram Word2Vec model on the preprocessed sentences
    model = Word2Vec(preprocessed_sentences, vector_size=100, window=5, min_count=1, sg=1)
    sentence_embeddings = []
    for sentence in preprocessed_sentences:
        word_embeddings = [model.wv[word] for word in sentence if word in model.wv]
        if word_embeddings:
            # A sentence embedding is the mean of its word embeddings
            sentence_embeddings.append(np.mean(word_embeddings, axis=0))
        else:
            # None of the words in the sentence exist in the Word2Vec
            # vocabulary: use a zero vector as a placeholder
            sentence_embeddings.append(np.zeros(model.vector_size))
    return sentence_embeddings


def optimal_n_clusters(sentence_embeddings):
    cosine_sim_matrix = cosine_similarity(sentence_embeddings)
    db_scores = []
    k_values = range(2, len(sentence_embeddings))
    for k in k_values:
        kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
        cluster_labels = kmeans.fit_predict(cosine_sim_matrix)
        db_scores.append(davies_bouldin_score(cosine_sim_matrix, cluster_labels))
    # Choose the number of clusters that minimizes the Davies-Bouldin index;
    # add 2 to the argmin because k_values starts at k=2
    return cosine_sim_matrix, np.argmin(db_scores) + 2


def cluster_assignments(cosine_sim_matrix, optimal_n_clusters):
    # Cluster sentence embeddings using KMeans with the optimal number of clusters
    kmeans = KMeans(n_clusters=optimal_n_clusters, n_init=10, random_state=42)
    return kmeans.fit_predict(cosine_sim_matrix)


def clusters(sentences, cluster_assignments):
    # Group sentences into clusters
    clusters = defaultdict(list)
    for i, sentence in enumerate(sentences):
        clusters[cluster_assignments[i]].append(sentence)
    # Title each cluster with a very short summary of its joined sentences
    result = defaultdict(list)
    for i in range(len(clusters)):
        cluster = " ".join(clusters[i])
        title = summarize(cluster, 10)
        result[title].extend(clusters[i])
    return result


def format_as_bullet_points(dictionary):
    bullet_points = ""
    for key, values in dictionary.items():
        bullet_points += f"- {key}:\n"
        for value in values:
            bullet_points += f"  - {value}\n"
    return bullet_points.strip()
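# Illustrative sketch, not part of the original demo: the helpers above chain
# together as follows on a toy paragraph (assuming the spaCy, Word2Vec, and
# BART models loaded). final_result() below wires up the same steps, starting
# from a BART summary of the raw transcript.
#
#     sents = segment_sentences("Cats purr. Dogs bark. Python is a language. Java is too.")
#     sim_matrix, n_clusters = optimal_n_clusters(embedding(preprocess_sentences(sents)))
#     grouped = clusters(sents, cluster_assignments(sim_matrix, n_clusters))
#     print(format_as_bullet_points(grouped))  # "- <title>:" lines with nested sentences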
def final_result(input):
    # Summarize the transcript, then cluster its sentences into titled groups
    text = summarize(input)
    sentences = segment_sentences(text)
    preprocessed_sentences = preprocess_sentences(sentences)
    sentence_embeddings = embedding(preprocessed_sentences)
    cosine_sim_matrix, optimal_number_of_clusters = optimal_n_clusters(sentence_embeddings)
    clusters_assignments = cluster_assignments(cosine_sim_matrix, optimal_number_of_clusters)
    all_clusters = clusters(sentences, clusters_assignments)
    return format_as_bullet_points(all_clusters)


def transcribe(inputs, task):
    if inputs is None:
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

    text = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)["text"]
    return final_result(text)
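# Example usage (illustrative; "sample.wav" is a placeholder path, not a file
# shipped with this demo):
#
#     bullets = transcribe("sample.wav", "transcribe")
#     print(bullets)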
def _return_yt_html_embed(yt_url):
    video_id = yt_url.split("?v=")[-1]
    HTML_str = (
        f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>'
        " </center>"
    )
    return HTML_str


def download_yt_audio(yt_url, filename):
    info_loader = youtube_dl.YoutubeDL()

    try:
        info = info_loader.extract_info(yt_url, download=False)
    except youtube_dl.utils.DownloadError as err:
        raise gr.Error(str(err))

    file_length = info["duration_string"]
    file_h_m_s = file_length.split(":")
    file_h_m_s = [int(sub_length) for sub_length in file_h_m_s]

    # Pad the duration to [hours, minutes, seconds]
    if len(file_h_m_s) == 1:
        file_h_m_s.insert(0, 0)
    if len(file_h_m_s) == 2:
        file_h_m_s.insert(0, 0)
    file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2]

    if file_length_s > YT_LENGTH_LIMIT_S:
        yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S))
        file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s))
        raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.")

    ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"}

    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([yt_url])
        except youtube_dl.utils.ExtractorError as err:
            raise gr.Error(str(err))


def yt_transcribe(yt_url, task, max_filesize=75.0):
    html_embed_str = _return_yt_html_embed(yt_url)

    with tempfile.TemporaryDirectory() as tmpdirname:
        filepath = os.path.join(tmpdirname, "video.mp4")
        download_yt_audio(yt_url, filepath)
        with open(filepath, "rb") as f:
            inputs = f.read()

    inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate)
    inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate}

    text = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)["text"]
    return html_embed_str, final_result(text)


demo = gr.Blocks()

mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources=["microphone"], type="filepath"),
        gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
    ],
    outputs="text",
    title="Whisper Large V3: Transcribe Audio",
    description=(
        "Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the OpenAI Whisper"
        f" checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files"
        " of arbitrary length."
    ),
    allow_flagging="never",
)

file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources=["upload"], type="filepath", label="Audio file"),
        gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
    ],
    outputs="text",
    title="Whisper Large V3: Transcribe Audio",
    description=(
        "Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the OpenAI Whisper"
        f" checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files"
        " of arbitrary length."
    ),
    allow_flagging="never",
)

yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
        gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
    ],
    outputs=["html", "text"],
    title="Whisper Large V3: Transcribe YouTube",
    description=(
        "Transcribe long-form YouTube videos with the click of a button! Demo uses the OpenAI Whisper checkpoint"
        f" [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe video files of"
        " arbitrary length."
    ),
    allow_flagging="never",
)
with demo:
    gr.TabbedInterface([mf_transcribe, file_transcribe, yt_transcribe], ["Microphone", "Audio file", "YouTube"])

demo.queue().launch()
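# Illustrative smoke test (an assumption, not part of the original demo): the
# text pipeline can be exercised without audio from an interactive session.
# Note that `yt_transcribe` above is rebound to a gr.Interface, shadowing the
# function of the same name, so call the text helpers directly:
#
#     print(final_result("Paste a long transcript here to see titled bullet clusters."))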