| import os, openai,string |
| import pandas as pd |
| from pydub import AudioSegment |
| from utilities import constants, api_keys, clean_text, prompt_constants |
| from textwrap import wrap |
| from moviepy.editor import VideoFileClip |
| from nltk.tokenize import word_tokenize |
|
|
# Configure the OpenAI client with the key held by the project's key manager.
openai.api_key = api_keys.APIKeys().get_key('OPENAI_API_KEY')
# Module-level keyword accumulator.
# NOTE(review): never read or written in this file — presumably used by another module; verify.
key_words=set()
|
|
def CompletionEngine(sys_message, user_message, num_tokens, num_results, temperature, topic_model, top_p):
    """Run a single system+user exchange through the OpenAI chat API.

    Args:
        sys_message: System prompt establishing the model's role.
        user_message: User content to process.
        num_tokens: Maximum tokens in the completion (``max_tokens``).
        num_results: Number of completions to request (``n``).
        temperature: Sampling temperature.
        topic_model: Model identifier to use.
        top_p: Nucleus-sampling parameter.

    Returns:
        The raw ChatCompletion response object from the OpenAI client.
    """
    conversation = [
        {"role": "system", "content": sys_message},
        {"role": "user", "content": user_message},
    ]
    return openai.ChatCompletion.create(
        model=topic_model,
        messages=conversation,
        max_tokens=num_tokens,
        n=num_results,
        temperature=temperature,
        stop=None,
        top_p=top_p,
    )
|
|
| |
def StripAndTranslateAudio(input_file):
    """Extract the audio track from a video file and transcribe it with whisper-1.

    Reuses a cached transcript under ``constants.TRANSCRIPT_PATH`` when one
    exists; otherwise extracts the audio to a temporary WAV, transcribes it
    in 60-second chunks (to stay under the upload size limit), caches the
    transcript, and removes the temporary files.

    Args:
        input_file: Path to a video file (.mp4, .mov, .avi or .mkv).

    Returns:
        ``(transcript, messages)`` where ``transcript`` is the transcript
        text (or an error string on failure) and ``messages`` is a list of
        role/content dicts logging each step.

    Bug fixes vs. previous revision:
      - Error paths returned a 3-tuple while the success path returned a
        2-tuple; the caller unpacks two values, so every error path crashed.
        All paths now return ``(text, messages)``.
      - ``VideoFileClip`` was constructed before the extension check, so a
        wrong-format file crashed before the guard ran; checks now come first.
      - The transcript file was written twice back-to-back; it is written once.
      - Chunk temp files leaked if transcription raised; now removed in
        a ``finally`` block.
    """
    # Validate before touching moviepy: constructing VideoFileClip on a
    # missing or non-video file raises before the guards would run.
    if not os.path.exists(input_file):
        return f"{input_file} {constants.FILE_DOES_NOT_EXIST}", []
    if not input_file.endswith((".mp4", ".mov", ".avi", ".mkv")):
        return constants.ANALYSIS_WRONG_FORMAT, []

    file_name = clean_text.CleanFileName(input_file)
    transcript_path = constants.TRANSCRIPT_PATH + file_name + ".txt"
    messages = []

    # Reuse the cached transcript when present to avoid re-billing whisper.
    if os.path.exists(transcript_path):
        with open(transcript_path, "r") as f:
            transcript = f.read()
        messages.append({"role": "system", "content": f"Sending audio file {file_name} to OpenAI whisper-1"})
        messages.append({"role": "transcribe", "content": transcript})
        return transcript, messages

    video = VideoFileClip(input_file)
    audio = video.audio
    audio_file = constants.ORIGINALS_PATH + "audio_" + file_name + '.wav'
    print(audio_file)
    audio.write_audiofile(audio_file)

    # Whisper has an upload size limit, so transcribe in 60-second pieces.
    audio_segment = AudioSegment.from_file(audio_file, format="wav")
    chunk_length = 60 * 1000  # pydub slices in milliseconds
    chunks = [audio_segment[i:i + chunk_length] for i in range(0, len(audio_segment), chunk_length)]
    transcript = ""
    for i, chunk in enumerate(chunks):
        chunk_audio_file = f"{constants.ORIGINALS_PATH}audio_chunk_{i}_{file_name}.wav"
        chunk.export(chunk_audio_file, format="wav")
        messages.append({"role": "system", "content": f"Sending audio chunk {i} to OpenAI whisper-1"})
        try:
            with open(chunk_audio_file, 'rb') as f:
                chunk_transcript = openai.Audio.transcribe("whisper-1", f)
        finally:
            # Remove the temp chunk even if transcription fails.
            os.remove(chunk_audio_file)
        transcript += chunk_transcript['text'] + " "
        messages.append({"role": "transcribe", "content": chunk_transcript['text']})

    # Cache the transcript once (previously written twice in a row).
    with open(transcript_path, "w") as f:
        f.write(transcript)
    os.remove(audio_file)
    return transcript, messages
|
|
|
|
| |
def SummarizeLargeTranscript(transcript, messages):
    """Shrink a transcript by repeated chunked summarization.

    While the text exceeds the chunk limit, split it at word boundaries and
    replace it with the concatenation of per-chunk model summaries. Appends
    each summary to ``messages`` as an assistant entry.

    Returns:
        ``(transcript, messages)`` — the (possibly summarized) text and the
        same, mutated, message log.
    """
    while NeedsChunks(transcript):
        transcript = SummarizeChunks(CreateChunks(transcript), messages)
    return transcript, messages
|
|
def NeedsChunks(transcript):
    """Return True when ``transcript`` is longer than the configured chunk limit."""
    limit = constants.CHUNK_LENGTH
    return len(transcript) > limit
|
|
def CreateChunks(transcript, chunk_length=None):
    """Split ``transcript`` into chunks of at most ``chunk_length`` characters.

    Chunk boundaries are moved backwards from the nominal position to the
    nearest whitespace or punctuation character so words are never cut in
    half. If no break character is found above the previous boundary, that
    segment is merged into the following chunk (which may then exceed the
    nominal length — same behavior as before).

    Args:
        transcript: Text to split.
        chunk_length: Maximum chunk size in characters. Defaults to
            ``constants.CHUNK_LENGTH`` (parameterized for reuse and testing;
            backward compatible).

    Returns:
        List of chunk strings; empty list for an empty transcript.
    """
    if chunk_length is None:
        chunk_length = constants.CHUNK_LENGTH
    total_length = len(transcript)
    break_chars = string.whitespace + string.punctuation
    text_chunks = []
    start_idx = 0
    for end_idx in range(chunk_length - 1, total_length, chunk_length):
        # Walk back from the nominal boundary to the nearest break character.
        while end_idx > start_idx and transcript[end_idx] not in break_chars:
            end_idx -= 1
        if end_idx > start_idx:
            text_chunks.append(transcript[start_idx:end_idx])
            start_idx = end_idx + 1  # skip the break character itself
    if start_idx < total_length:
        text_chunks.append(transcript[start_idx:])
    return text_chunks
|
|
def SummarizeChunks(text_chunks, messages):
    """Summarize each chunk in order and join the summaries with spaces.

    ``messages`` is mutated by ``SummarizeChunk`` (one assistant entry per chunk).
    """
    return " ".join(SummarizeChunk(piece, messages) for piece in text_chunks)
|
|
def SummarizeChunk(chunk, messages):
    """Clean one chunk, summarize it with the analysis model, and log the result.

    Appends the summary to ``messages`` as an assistant entry and returns it.
    """
    cleaned = clean_text.CleanText(chunk)
    completion = CompletionEngine(
        prompt_constants.ANALYSIS_SYSTEM_PROMPT,
        cleaned,
        constants.SUMMARY_TOKENS,
        constants.NUM_RESULTS,
        constants.TEMP,
        constants.ANALYSIS_MODEL,
        constants.TOP_P,
    )
    summary = completion.choices[0]['message']['content']
    messages.append({"role": "assistant", "content": summary})
    return summary
|
|
|
|
| |
def FindTopics(transcript, messages):
    """Extract keywords/topics from ``transcript`` with the analysis model.

    Logs the keyword system prompt and the model's answer to ``messages``.

    Returns:
        ``(topics, messages)`` — the model's topic text and the mutated log.
    """
    messages.append({"role": "system", "content": prompt_constants.KEYWORD_SYSTEM_PROMPT})
    completion = CompletionEngine(
        prompt_constants.KEYWORD_SYSTEM_PROMPT,
        transcript,
        constants.KEYWORD_TOKENS,
        constants.NUM_RESULTS,
        constants.TEMP,
        constants.ANALYSIS_MODEL,
        constants.TOP_P,
    )
    topics = completion.choices[0]['message']['content']
    messages.append({"role": "assistant", "content": topics})
    return topics, messages
|
|
def ProcessAudio(input_file):
    """End-to-end pipeline: transcribe a video, condense it, extract topics.

    Args:
        input_file: Path to the video file to process.

    Returns:
        ``(transcript, topics, df)`` — the (possibly summarized) transcript,
        the topic text, and a DataFrame of the accumulated role/content log.
    """
    transcript, messages = StripAndTranslateAudio(input_file)
    messages.append({"role": "system", "content": prompt_constants.ANALYSIS_SYSTEM_PROMPT})
    transcript, messages = SummarizeLargeTranscript(transcript, messages)
    topics, messages = FindTopics(transcript, messages)
    return transcript, topics, pd.DataFrame(messages)