# ai-kit / recording_analysis / recording_analysis.py
# Author: Kim Adams
# Commit: adding ocr (8a5a18b)
import os, openai,string
import pandas as pd
from pydub import AudioSegment
from utilities import constants, api_keys, clean_text, prompt_constants
from textwrap import wrap
from moviepy.editor import VideoFileClip
from nltk.tokenize import word_tokenize
# Configure the shared OpenAI client at import time with the project's stored key.
openai.api_key = api_keys.APIKeys().get_key('OPENAI_API_KEY')
# Module-level keyword accumulator; not referenced anywhere in this chunk —
# presumably consumed by another module (e.g. the OCR work). TODO confirm.
key_words=set()
def CompletionEngine(sys_message, user_message, num_tokens, num_results, temperature, topic_model, top_p):
    """Send one system+user exchange to the OpenAI chat-completion API.

    Thin wrapper around ``openai.ChatCompletion.create``: builds the
    two-message conversation and forwards the sampling parameters unchanged.
    Returns the raw API response object.
    """
    conversation = [
        {"role": "system", "content": sys_message},
        {"role": "user", "content": user_message},
    ]
    return openai.ChatCompletion.create(
        model=topic_model,
        messages=conversation,
        max_tokens=num_tokens,
        n=num_results,
        temperature=temperature,
        stop=None,
        top_p=top_p,
    )
#------------- #1: strip audio from video, create text from audio using OpenAI whisper-1 ----------------
def StripAndTranslateAudio(input_file):
    """Extract audio from a video file and transcribe it with OpenAI whisper-1.

    Returns a 2-tuple ``(transcript, messages)`` where *messages* is a list of
    ``{"role": ..., "content": ...}`` dicts recording what was sent/received.
    On error (*input_file* missing or not a supported video format) the first
    element is an error string and *messages* is empty.

    Fixes over the previous version:
    - Error paths used to return a 3-tuple while the success path returned a
      2-tuple, so the caller's two-name unpack crashed on errors; every path
      now returns exactly two values.
    - The extension is validated and the cached transcript checked BEFORE
      opening the video, so no ``VideoFileClip`` is created unnecessarily,
      and the clip is explicitly closed afterwards.
    - The transcript file is written exactly once (it was written twice in
      the fresh-transcription path and rewritten even when read from cache).
    """
    if not os.path.exists(input_file):
        return f"{input_file} {constants.FILE_DOES_NOT_EXIST}", []
    if not input_file.endswith((".mp4", ".mov", ".avi", ".mkv")):
        return constants.ANALYSIS_WRONG_FORMAT, []
    file_name = clean_text.CleanFileName(input_file)
    transcript_path = constants.TRANSCRIPT_PATH + file_name + ".txt"
    messages = []
    # Cached transcript: skip the expensive audio-extraction/transcription work.
    if os.path.exists(transcript_path):
        with open(transcript_path, "r") as f:
            transcript = f.read()
        messages.append({"role": "system", "content": f"Sending audio file {file_name} to OpenAI whisper-1"})
        messages.append({"role": "transcribe", "content": transcript})
        return transcript, messages
    # Strip the audio track out to a temporary wav file.
    audio_file = constants.ORIGINALS_PATH + "audio_" + file_name + '.wav'
    video = VideoFileClip(input_file)
    try:
        video.audio.write_audiofile(audio_file)
    finally:
        video.close()
    # whisper-1 has request-size limits, so transcribe in 1-minute chunks.
    audio_segment = AudioSegment.from_file(audio_file, format="wav")
    chunk_length = 60 * 1000  # 1 minute in milliseconds
    chunks = [audio_segment[i:i + chunk_length] for i in range(0, len(audio_segment), chunk_length)]
    full_transcript = ""
    for i, chunk in enumerate(chunks):
        chunk_audio_file = f"{constants.ORIGINALS_PATH}audio_chunk_{i}_{file_name}.wav"
        chunk.export(chunk_audio_file, format="wav")
        messages.append({"role": "system", "content": f"Sending audio chunk {i} to OpenAI whisper-1"})
        try:
            with open(chunk_audio_file, 'rb') as f:
                chunk_transcript = openai.Audio.transcribe("whisper-1", f)
            full_transcript += chunk_transcript['text'] + " "
            messages.append({"role": "transcribe", "content": chunk_transcript['text']})
        finally:
            os.remove(chunk_audio_file)  # never leave chunk temp files behind
    # Persist once so subsequent runs hit the cache branch above.
    with open(transcript_path, "w") as f:
        f.write(full_transcript)
    os.remove(audio_file)
    return full_transcript, messages
#------------- 2: chunk & process transcripts using OpenAI gpt-3.5-turbo ----------------
def SummarizeLargeTranscript(transcript, messages):
    """Shrink *transcript* below the chunk limit by repeated summarization.

    While the transcript is still too long for a single completion request,
    split it into chunks, summarize each, and treat the joined summaries as
    the new transcript. Returns ``(transcript, messages)``.
    """
    while NeedsChunks(transcript):
        transcript = SummarizeChunks(CreateChunks(transcript), messages)
    return transcript, messages
def NeedsChunks(transcript):
    """Return True when *transcript* exceeds the per-request chunk limit."""
    limit = constants.CHUNK_LENGTH
    return len(transcript) > limit
def CreateChunks(transcript, segment_length=None):
    """Split *transcript* into chunks of at most *segment_length* characters,
    breaking at whitespace or punctuation so words are never cut mid-token.

    *segment_length* defaults to ``constants.CHUNK_LENGTH`` (the previous
    hard-coded value), so existing callers are unaffected; passing it
    explicitly makes the function reusable and testable.

    Note: the separator character at each break point is dropped from the
    output, and a segment with no separator at all is merged into the next
    chunk rather than split mid-word.
    """
    if segment_length is None:
        segment_length = constants.CHUNK_LENGTH
    total_length = len(transcript)
    # Hoisted out of the loop: the set of characters we may break on.
    break_chars = string.whitespace + string.punctuation
    text_chunks = []
    start_idx = 0
    # Candidate break positions at fixed intervals through the text.
    for end_idx in range(segment_length - 1, total_length, segment_length):
        # Walk backward to the nearest separator.
        while end_idx > start_idx and transcript[end_idx] not in break_chars:
            end_idx -= 1
        if end_idx > start_idx:
            text_chunks.append(transcript[start_idx:end_idx])
            start_idx = end_idx + 1  # skip the separator itself
    if start_idx < total_length:
        text_chunks.append(transcript[start_idx:])
    return text_chunks
def SummarizeChunks(text_chunks, messages):
    """Summarize every chunk and join the summaries with single spaces.

    Appends each chunk's summary to *messages* via ``SummarizeChunk``.
    """
    return " ".join(SummarizeChunk(piece, messages) for piece in text_chunks)
def SummarizeChunk(chunk, messages):
    """Clean one transcript chunk, summarize it via the completion engine,
    record the summary in *messages*, and return it.
    """
    cleaned = clean_text.CleanText(chunk)
    response = CompletionEngine(
        prompt_constants.ANALYSIS_SYSTEM_PROMPT,
        cleaned,
        constants.SUMMARY_TOKENS,
        constants.NUM_RESULTS,
        constants.TEMP,
        constants.ANALYSIS_MODEL,
        constants.TOP_P,
    )
    summary = response.choices[0]['message']['content']
    messages.append({"role": "assistant", "content": summary})
    return summary
#------------- #3: find topics using OpenAI gpt-3.5-turbo ----------------
def FindTopics(transcript, messages):
    """Ask the model for the key topics of *transcript*.

    Logs the keyword prompt and the model's answer into *messages* and
    returns ``(topics, messages)``.
    """
    messages.append({"role": "system", "content": prompt_constants.KEYWORD_SYSTEM_PROMPT})
    completion = CompletionEngine(
        prompt_constants.KEYWORD_SYSTEM_PROMPT,
        transcript,
        constants.KEYWORD_TOKENS,
        constants.NUM_RESULTS,
        constants.TEMP,
        constants.ANALYSIS_MODEL,
        constants.TOP_P,
    )
    topics = completion.choices[0]['message']['content']
    messages.append({"role": "assistant", "content": topics})
    return topics, messages
def ProcessAudio(input_file):
    """End-to-end pipeline: video -> transcript -> summary -> topics.

    Returns ``(transcript, topics, df)`` where *df* is a DataFrame of the
    role/content message log accumulated along the way.
    """
    # 1: strip audio from video, create text from audio using OpenAI whisper-1.
    # StripAndTranslateAudio returned a 3-tuple on its error paths and a
    # 2-tuple on success; index instead of unpacking so either shape works.
    result = StripAndTranslateAudio(input_file)
    transcript, messages = result[0], result[1]
    messages.append({"role": "system", "content": prompt_constants.ANALYSIS_SYSTEM_PROMPT})
    # 2: shrink the transcript below the chunk limit using gpt-3.5-turbo.
    transcript, messages = SummarizeLargeTranscript(transcript, messages)
    # 3: extract topics from the summarized transcript.
    topics, messages = FindTopics(transcript, messages)
    df = pd.DataFrame(messages)
    return transcript, topics, df