Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import whisper | |
| import re | |
| from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip | |
| from moviepy.editor import * | |
| import math | |
| from stable_whisper import modify_model,results_to_word_srt | |
| import asyncio | |
| from deepgram import Deepgram | |
| from typing import Dict | |
| import os | |
| import moviepy.editor as mp | |
| from pytube import YouTube | |
| from time import sleep | |
| import pandas as pd | |
| import calendar | |
| import time | |
# Unix timestamp captured once at app start.
# NOTE(review): current_GMT / time_stamp are never read anywhere else in this
# file — candidates for removal.
current_GMT = time.gmtime()
time_stamp = calendar.timegm(current_GMT)
st.title('AI Editor for Content Creators!')
def load_model(model_selected):
    """Load a Whisper model by name and patch it for word-level timestamps."""
    loaded = whisper.load_model(model_selected)
    # stable_whisper's modify_model mutates the model in place so that
    # transcription results include per-word timestamp data.
    modify_model(loaded)
    return loaded
def transcribe_video(vid, model_selected):
    """Transcribe video file `vid` with Whisper and attach an SRT rendering.

    Returns the Whisper result dict with an extra 'srt' key.
    """
    whisper_model = load_model(model_selected)
    decode_opts = whisper.DecodingOptions(fp16=False, language="English")
    transcription = whisper_model.transcribe(vid, **decode_opts.__dict__)
    transcription['srt'] = whisper_result_to_srt(transcription)
    return transcription
def _srt_timestamp(offset_seconds):
    """Format a time offset in seconds as an SRT timestamp (HH:MM:SS,mmm)."""
    hours = int(offset_seconds / 3600)
    minutes = (offset_seconds / 60) % 60
    seconds = offset_seconds % 60
    # SRT uses a comma as the decimal separator.
    return ("%02d:%02d:%06.3f" % (hours, minutes, seconds)).replace('.', ',')

#srt generation
def whisper_result_to_srt(result):
    """Render a Whisper transcription result as SRT subtitle text.

    result: dict with a 'segments' list; each segment carries 'start' and
    'end' offsets in seconds plus the spoken 'text'.
    Returns the complete SRT document as a single string (empty string for
    an empty segment list).
    """
    text = []
    for i, s in enumerate(result['segments']):
        text.append(str(i + 1))
        # Start/end formatting was previously duplicated inline; it now
        # shares the _srt_timestamp helper.
        text.append(_srt_timestamp(s['start']) + " --> " + _srt_timestamp(s['end']))
        text.append(s['text'].strip() + "\n")
    return "\n".join(text)
#compute speaking_time
async def compute_speaking_time(transcript_data: Dict, data: str):
    """Summarize per-speaker words and speaking time from a Deepgram response.

    transcript_data: Deepgram pre-recorded transcription JSON (dict) with
    diarization enabled, so each word entry carries 'speaker', 'word',
    'start' and 'end'.
    data: accumulator string; the printed statistics lines are also appended
    to it.
    Returns (words, data): the raw per-word list and the stats string.
    (The previous '-> None' annotation was wrong — a tuple is returned.)
    """
    if 'results' in transcript_data:
        transcript = transcript_data['results']['channels'][0]['alternatives'][0]['words']
        total_speaker_time = {}   # speaker -> [total seconds, phrase count]
        speaker_words = []        # [speaker, [words...], seconds] per run of turns
        current_speaker = -1
        for word_info in transcript:
            speaker_number = word_info["speaker"]
            # Fixed: '!=' value comparison instead of 'is not'; speaker ids
            # are ints and identity comparison is unreliable outside the
            # small interned range.
            if speaker_number != current_speaker:
                current_speaker = speaker_number
                speaker_words.append([speaker_number, [], 0])
            try:
                total_speaker_time[speaker_number][1] += 1
            except KeyError:
                total_speaker_time[speaker_number] = [0, 1]
            speaker_words[-1][1].append(word_info["word"])
            duration = word_info["end"] - word_info["start"]
            total_speaker_time[speaker_number][0] += duration
            speaker_words[-1][2] += duration
        for speaker, words, time_amount in speaker_words:
            print(f"Speaker {speaker}: {' '.join(words)}")
            data += f"\nSpeaker {speaker}: {' '.join(words)}"
            print(f"Speaker {speaker}: {time_amount}")
            data += f"\nSpeaker {speaker}: {time_amount}"
        for speaker, (total_time, amount) in total_speaker_time.items():
            print(f"Speaker {speaker} avg time per phrase: {total_time/amount} ")
            data += f"\nSpeaker {speaker} avg time per phrase: {total_time/amount} "
        # NOTE(review): total_time is the last speaker's total, not the whole
        # conversation — message kept as-is to preserve behavior.
        print(f"Total time of conversation: {total_time}")
        data += f"\nTotal time of conversation: {total_time}"
        return transcript, data
def extract_write_audio(vd):
    """Extract the audio track from video file `vd` and write it to audio.wav."""
    source_clip = mp.VideoFileClip(f'{vd}')
    source_clip.audio.write_audiofile("audio.wav")
#speaker diarization workflow
async def speaker_diarization_flow(PATH_TO_FILE):
    """Extract the video's audio and run Deepgram diarization on it.

    PATH_TO_FILE: path to the local video file.
    Returns the per-speaker statistics string from compute_speaking_time.
    """
    # Side effect: writes the audio track to audio.wav.
    extract_write_audio(PATH_TO_FILE)
    data = ''
    # SECURITY(review): the literal fallback below was previously hard-coded
    # (and is now public) — set DEEPGRAM_API_KEY in the environment and
    # rotate the exposed key.
    DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY",
                                      "3dc39bf904babb858390455b1a1399e221bf87f8")
    deepgram = Deepgram(DEEPGRAM_API_KEY)
    # Bug fix: upload the extracted WAV, not the original video file, which
    # was previously sent with an 'audio/wav' mimetype.
    with open("audio.wav", 'rb') as audio:
        source = {'buffer': audio, 'mimetype': 'audio/wav'}
        transcription = await deepgram.transcription.prerecorded(
            source, {'punctuate': True, 'diarize': True})
        transcript, final_data = await compute_speaking_time(transcription, data)
    return final_data
# speaker diarization main funciton
async def speaker_diarization(PATH_TO_FILE):
    """Run the diarization flow for a video and return its stats string."""
    stats = await speaker_diarization_flow(PATH_TO_FILE)
    print("data is", stats)
    return stats
#find filler words
def filler_words_finder(result_data):
    """Locate filler words in a word-timestamped transcription result.

    result_data: stable-whisper result dict whose 'segments' entries carry a
    'whole_word_timestamps' list of {'word', 'timestamp'} dicts.
    Returns (keep_intervals, filler_timestamps):
      keep_intervals — set of (start, end) timestamp pairs spanning the
      speech to keep (i.e. between filler words);
      filler_timestamps — set of timestamps at which fillers occur.
    """
    keep_intervals = set()
    filler_timestamps = set()
    # Single-token fillers, compared after lower-casing and stripping
    # non-word characters. The old set contained duplicates and entries that
    # could never match: multi-word phrases ('you know', 'uh huh') cannot
    # equal a single token, and capitalised 'Hmm' never matched a
    # lower-cased token (it is still caught by the 'hm' prefix check).
    filler_words = {'um', 'ah', 'mmm', 'er', 'uh', 'hmm', 'actually',
                    'basically', 'seriously', 'mhm', 'huh', 'ooh', 'aah'}
    if 'segments' in result_data:
        prev = 0
        for segment in result_data['segments']:
            for word in segment['whole_word_timestamps']:
                token = re.sub(r'\W', '', word['word'].lower())
                if token in filler_words or token.startswith(('hm', 'aa', 'mm', 'oo')):
                    st.write(word['word'].lower(), word['timestamp'])
                    print(word['word'].lower(), word['timestamp'])
                    filler_timestamps.add(word['timestamp'])
                    prev = word['timestamp']
                    continue
                # Non-filler word: keep the span from the previous boundary.
                keep_intervals.add((prev, word['timestamp']))
                prev = word['timestamp']
    return keep_intervals, filler_timestamps
def merge_overlapping_time_intervals(intervals):
    """Merge overlapping or touching [start, end] intervals.

    intervals: list of two-element sequences, sorted by start time.
    Returns a new list where each overlapping run is collapsed into one
    [min_start, max_end] interval. An empty input now returns [] instead of
    raising IndexError. (Also removed an unused `stack` local and the
    redundant comparison of the first interval with itself.)
    """
    if not intervals:
        return []
    merged = [intervals[0]]
    for current in intervals[1:]:
        last = merged[-1]
        # Inlined overlap test; a gap of exactly zero counts as overlapping.
        if min(current[1], last[1]) >= max(current[0], last[0]):
            merged[-1] = [min(current[0], last[0]), max(current[1], last[1])]
        else:
            merged.append(current)
    return merged
def overlap(interval1, interval2):
    """Return True when the two [start, end] intervals intersect or touch."""
    earliest_end = min(interval1[1], interval2[1])
    latest_start = max(interval1[0], interval2[0])
    return earliest_end - latest_start >= 0
#assembly ai endpoints
import requests
# REST endpoints used by the AssemblyAI upload / analysis / PII flows below.
transcript_endpoint = "https://api.assemblyai.com/v2/transcript"
upload_endpoint = "https://api.assemblyai.com/v2/upload"
# SECURITY(review): API key is hard-coded (and therefore public) — move it
# to an environment variable and rotate the key.
headers = {
    "authorization": "05e515bf6b474966bc48bbdd1448b3cf",
    "content-type": "application/json"
}
def upload_to_AssemblyAI(save_location):
    """Stream the file at save_location to AssemblyAI; return its upload URL."""
    CHUNK_SIZE = 5242880  # 5 MB per chunk

    def read_file(filename):
        # Lazily yield the file in fixed-size chunks for a streamed POST.
        with open(filename, 'rb') as fh:
            while True:
                print("chunk uploaded")
                chunk = fh.read(CHUNK_SIZE)
                if not chunk:
                    return
                yield chunk

    upload_response = requests.post(
        upload_endpoint, headers=headers, data=read_file(save_location))
    body = upload_response.json()
    print(body)
    audio_url = body['upload_url']
    print('Uploaded to', audio_url)
    return audio_url
def start_analysis(audio_url, type):
    """Submit an AssemblyAI job with summarization, topics and moderation.

    type selects the summary model ('informative' / 'conversational' /
    'catchy'); conversational jobs also request speaker labels.
    Returns the polling URL for the created transcript.
    (The parameter name `type` shadows the builtin but is kept for
    caller compatibility.)
    """
    ## Start transcription job of audio file
    job_request = {
        'audio_url': audio_url,
        'iab_categories': True,
        'content_safety': True,
        "summarization": True,
        "summary_type": "bullets",
        "summary_model": type,
    }
    # Conversational summaries require diarized speaker labels.
    if type == 'conversational':
        job_request["speaker_labels"] = True
    transcript_response = requests.post(transcript_endpoint, json=job_request, headers=headers)
    print(transcript_response.json())
    polling_endpoint = transcript_endpoint + "/" + transcript_response.json()['id']
    print("Transcribing at", polling_endpoint)
    return polling_endpoint
def get_analysis_results(polling_endpoint):
    """Poll an AssemblyAI transcript until it finishes.

    Returns the final requests.Response on completion, or False on error.
    Blocks the caller, sleeping 10 seconds between polls.
    (Removed the unreachable `break` statements that followed each
    `return`, and replaced the or-chain with a membership test.)
    """
    status = 'submitted'
    while True:
        print(status)
        polling_response = requests.get(polling_endpoint, headers=headers)
        status = polling_response.json()['status']
        # st.write(polling_response.json())
        # st.write(status)
        if status in ('submitted', 'processing', 'queued'):
            print('not ready yet')
            sleep(10)
        elif status == 'completed':
            print('creating transcript')
            return polling_response
        else:
            print('error')
            return False
def pii_redact(audiourl, options):
    """Submit an AssemblyAI job that redacts the given PII policies.

    audiourl: AssemblyAI upload URL; options: list of policy names.
    Returns the polling URL for the created transcript.
    """
    print(options, audiourl)
    endpoint = "https://api.assemblyai.com/v2/transcript"
    # Renamed from `json` to avoid shadowing the conventional module name.
    payload = {
        "audio_url": audiourl,
        "redact_pii": True,
        "redact_pii_audio": True,
        "redact_pii_policies": options,
    }
    request_headers = {
        "authorization": "05e515bf6b474966bc48bbdd1448b3cf",
        "content-type": "application/json",
    }
    response = requests.post(endpoint, json=payload, headers=request_headers)
    print(response.json())
    return endpoint + "/" + response.json()['id']
def pii_redact_audio(polling_endpoint):
    """Poll a PII-redaction transcript until it finishes.

    Returns the final requests.Response on completion, or False on error.
    Blocks the caller, sleeping 10 seconds between polls.
    (Removed the unreachable `break` statements that followed each
    `return`, and replaced the or-chain with a membership test.)
    """
    status = 'submitted'
    request_headers = {
        "authorization": "05e515bf6b474966bc48bbdd1448b3cf",
        "content-type": "application/json",
    }
    while True:
        print(status)
        polling_response = requests.get(polling_endpoint, headers=request_headers)
        status = polling_response.json()['status']
        if status in ('submitted', 'processing', 'queued'):
            print('not ready yet')
            sleep(10)
        elif status == 'completed':
            print('creating transcript')
            return polling_response
        else:
            print('error')
            return False
def download_redact_audio(pooling_enpoint):
    """Fetch redacted audio for a finished job; save as redacted_audio.mp3.

    (Parameter name keeps its historical typo for caller compatibility.)
    """
    request_headers = {
        "authorization": "05e515bf6b474966bc48bbdd1448b3cf",
        "content-type": "application/json",
    }
    meta_response = requests.get(pooling_enpoint + "/redacted-audio", headers=request_headers)
    print(meta_response.json())
    audio_response = requests.get(meta_response.json()['redacted_audio_url'])
    with open('redacted_audio.mp3', 'wb') as f:
        f.write(audio_response.content)
def redact_audio_video_display(vd, audio):
    """Mux redacted audio back onto the video and preview it in Streamlit."""
    redacted_track = AudioFileClip(audio)
    source_clip = VideoFileClip(vd)
    combined = source_clip.set_audio(redacted_track)
    combined.write_videofile("Redacted_video.mp4")
    st.video("Redacted_video.mp4")
async def main(uploaded_video,model_selected):
    """Drive the whole Streamlit UI for one video.

    uploaded_video: either a Streamlit UploadedFile (has .name / .read) or a
    YouTube URL string. model_selected: whisper model name to load.
    Saves/downloads the video, transcribes it, then exposes six tabs:
    filler-word removal, edit placeholder, SRT download, diarization,
    content analysis, and PII redaction.
    """
    try:
        # Assume a Streamlit upload first: persist it to disk under its name.
        vid = uploaded_video.name
        with open(vid, mode='wb') as f:
            f.write(uploaded_video.read()) # save video to disk
    except:
        # Bare except: any failure above (e.g. a plain URL string with no
        # .name attribute) falls through to the YouTube download path.
        # NOTE(review): if the download also fails, `vid` is unbound and the
        # finally block below raises NameError.
        with st.spinner('Downloading Yotube Video'):
            yt = YouTube(uploaded_video)
            title=yt.title
            vid = f"{title}.mp4"
            yt.streams.filter(file_extension="mp4").get_by_resolution("360p").download(filename=vid)
    finally:
        # Filename without extension, used for the SRT download name.
        name = vid.split('.')[0]
        preview = st.video(vid)
        #extracting the transcription result
        with st.spinner('Transcribing Video, Wait for it...'):
            result = transcribe_video(vid,model_selected)
        st.text_area("Edit Transcript",result["text"])
        # NOTE(review): col1..col6 are created but never used.
        col1, col2, col3, col4, col5, col6 = st.columns([1,1,1,1,1,1])
        tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs(["Remove Filler Words","Edit Video" ,"Download SRT", "Perform Speaker Diarization","Content Analyzer","PII redactation"])
        with tab1:
            # Cut filler-word spans out of the video and preview the result.
            filler_word = st.button('Edit/Remove Filler Words with a click of a button')
            if filler_word:
                with st.spinner(text="In progress..."):
                    word_map_after_edit, filler_words_timestamp = filler_words_finder(result)
                    final_intervals = merge_overlapping_time_intervals(sorted(list(word_map_after_edit)))
                    subclips=[]
                    for start,end in final_intervals:
                        clip = VideoFileClip(vid)
                        # Trims 10% off each segment end — presumably to avoid
                        # bleeding into the next filler word; TODO confirm.
                        tmp = clip.subclip(start,(end - end*0.1))
                        subclips.append(tmp)
                    #concatenate subclips without filler words
                    final_clip = concatenate_videoclips(subclips)
                    final_clip.write_videofile(f"remove_{vid}")
                    preview = st.video(f"remove_{vid}")
        with tab2:
            # Placeholder tab: the button's state is currently unused.
            save = st.button('Edit')
        with tab3:
            # Offer the SRT text produced during transcription for download.
            download = st.download_button('Download SRT', result['srt'],f'{name}.srt')
            if download:
                st.write('Thanks for downloading!')
        with tab4:
            identify_download_speaker = st.button('Perform Speaker Diarization')
            if identify_download_speaker:
                with st.spinner(text="In progress..."):
                    # Deepgram-based diarization; returns a stats string.
                    results = await speaker_diarization(vid)
                    download_speaker = st.download_button("download speaker_diarization",results,'diarization_stats.txt')
                    if download_speaker:
                        st.write('Thanks for downloading!')
        with tab5:
            # AssemblyAI content analysis: summary, moderation, topics.
            type = st.selectbox('Summary Type?',('informative', 'conversational', 'catchy'))
            Analyze_content = st.button("Start Content Analysis")
            if Analyze_content:
                with st.spinner(text="In progress..."):
                    audio = extract_write_audio(vid)
                    audio_url = upload_to_AssemblyAI("audio.wav")
                    # start analysis of the file
                    polling_endpoint = start_analysis(audio_url,type)
                    # receive the results
                    results = get_analysis_results(polling_endpoint)
                    # separate analysis results
                    # NOTE(review): get_analysis_results returns False on
                    # error; .json() would then raise AttributeError.
                    summary = results.json()['summary']
                    content_moderation = results.json()["content_safety_labels"]
                    topic_labels = results.json()["iab_categories_result"]
                    my_expander1 = st.expander(label='Summary')
                    my_expander2 = st.expander(label='Content Moderation')
                    my_expander3 = st.expander(label='Topic Discussed')
                    with my_expander1:
                        st.header("Video summary")
                        st.write(summary)
                    with my_expander2:
                        st.header("Sensitive content")
                        if content_moderation['summary'] != {}:
                            st.subheader('🚨 Mention of the following sensitive topics detected.')
                            moderation_df = pd.DataFrame(content_moderation['summary'].items())
                            moderation_df.columns = ['topic','confidence']
                            st.dataframe(moderation_df, use_container_width=True)
                        else:
                            st.subheader('✅ All clear! No sensitive content detected.')
                    with my_expander3:
                        st.header("Topics discussed")
                        topics_df = pd.DataFrame(topic_labels['summary'].items())
                        topics_df.columns = ['topic','confidence']
                        # "a>b>c" topic paths become one column per level.
                        topics_df["topic"] = topics_df["topic"].str.split(">")
                        expanded_topics = topics_df.topic.apply(pd.Series).add_prefix('topic_level_')
                        topics_df = topics_df.join(expanded_topics).drop('topic', axis=1).sort_values(['confidence'], ascending=False).fillna('')
                        st.dataframe(topics_df, use_container_width=True)
        with tab6:
            # AssemblyAI PII redaction: redact the audio, remux onto video.
            options = st.multiselect('Select Policies to redact from video',["medical_process","medical_condition","blood_type","drug","injury","number_sequence","email_address","date_of_birth","phone_number","us_social_security_number","credit_card_number","credit_card_expiration","credit_card_cvv","date","nationality","event","language","location","money_amount","person_name","person_age","organization","political_affiliation","occupation","religion","drivers_license","banking_information"],["person_name", 'credit_card_number'])
            Perform_redact = st.button("Start PII Redaction")
            if Perform_redact:
                with st.spinner(text="In progress..."):
                    audio = extract_write_audio(vid)
                    audio_url = upload_to_AssemblyAI("audio.wav")
                    print(audio_url)
                    print([ x for x in options ])
                    polling_endpoint = pii_redact(audio_url,options)
                    # NOTE(review): results is a Response (or False) and is
                    # otherwise unused; this call just waits for completion.
                    results = pii_redact_audio(polling_endpoint)
                    download_redact_audio(polling_endpoint)
                    redact_audio_video_display(vid,"redacted_audio.mp3")
model_choice = st.sidebar.selectbox("Choose Model",('Tiny - Best for Srt generation', 'Base - Best suited for various AI services', 'Medium - Use this model for filler word removal'),0)
uploaded_file = st.sidebar.file_uploader("Upload mp4 file",type=["mp4","mpeg"])
video_url = st.sidebar.text_input("Enter a youtube video url")
# submit_button = st.sidebar.button("Extract Youtube Video")
# Map the selectbox label prefix to a whisper model name. ("Small" is kept
# from the original mapping even though no current option starts with it.)
for label_prefix, model_name in (("Tiny", 'tiny.en'),
                                 ("Base", 'base.en'),
                                 ("Small", 'small.en'),
                                 ("Medium", 'medium.en')):
    if model_choice.startswith(label_prefix):
        model_selected = model_name
# A YouTube URL and an uploaded file each trigger a full run.
if video_url:
    asyncio.run(main(video_url,model_selected))
if uploaded_file:
    asyncio.run(main(uploaded_file,model_selected))
st.sidebar.write("Kindly upload or provide youtube link with less a minute of video for faster performance and avoid excess usage of the free tier.")