import whisper
import gradio as gr
import datetime
import openai
import subprocess
import os
import torch
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
import re
from pyannote.audio import Audio
from pyannote.core import Segment
import wave
import contextlib
from sklearn.cluster import AgglomerativeClustering
import numpy as np
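
# Coaching-call processing app: Whisper transcribes the uploaded audio, speechbrain/pyannote
# speaker embeddings clustered with AgglomerativeClustering label the two speakers, and
# OpenAI completions classify the sentiment of the resulting transcript.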
openai.api_key = os.getenv("open_ai")

audio = Audio()  # pyannote helper used to crop waveform segments
model = whisper.load_model("large-v2")
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
)

def transcribe(audio):
    path, error = convert_to_wav(audio)
    if error is not None:
        return error
    duration = get_duration(path)
    if duration > 4 * 60 * 60:  # reject files longer than four hours
        return "Audio duration too long"
    result = model.transcribe(path)
    segments = result["segments"]
    num_speakers = 2  # the app assumes a two-person coaching call
    if len(segments) == 1:
        segments[0]['speaker'] = 'SPEAKER 1'
    else:
        embeddings = make_embeddings(path, segments, duration)
        add_speaker_labels(segments, embeddings, num_speakers)
    output = get_output(segments)
    return output

def convert_to_wav(path):
    if path[-3:] != 'wav':
        new_path = '.'.join(path.split('.')[:-1]) + '.wav'
        try:
            subprocess.call(['ffmpeg', '-i', path, new_path, '-y'])
        except Exception:
            return path, 'Error: Could not convert file to .wav'
        path = new_path
    return path, None

def get_duration(path):
    with contextlib.closing(wave.open(path, 'r')) as f:
        frames = f.getnframes()
        rate = f.getframerate()
        return frames / float(rate)

def make_embeddings(path, segments, duration):
    # 192 is the embedding dimension of the speechbrain ECAPA-TDNN speaker model
    embeddings = np.zeros(shape=(len(segments), 192))
    for i, segment in enumerate(segments):
        embeddings[i] = segment_embedding(path, segment, duration)
    return np.nan_to_num(embeddings)

def segment_embedding(path, segment, duration):
    start = segment["start"]
    # Whisper can report an end time past the end of the file, so clamp it
    end = min(duration, segment["end"])
    clip = Segment(start, end)
    waveform, sample_rate = audio.crop(path, clip)
    return embedding_model(waveform[None])  # add a batch dimension for the embedding model

def add_speaker_labels(segments, embeddings, num_speakers):
    clustering = AgglomerativeClustering(num_speakers).fit(embeddings)
    labels = clustering.labels_
    for i in range(len(segments)):
        segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)

def time(secs):
    return datetime.timedelta(seconds=round(secs))

def get_output(segments):
    output = ''
    for (i, segment) in enumerate(segments):
        # start a new block with a speaker header whenever the speaker changes
        if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
            if i != 0:
                output += '\n'
            output += segment["speaker"] + ' ' + str(time(segment["start"])) + '\n'
        output += segment["text"][1:] + ' '
    return output

def calculate_sentiment_score(output):
    # rough character-based chunking; the model's context limit is in tokens, so this is approximate
    max_context_length = 4096
    chunks = [output[i:i + max_context_length] for i in range(0, len(output), max_context_length)]
    prompt_prefix = "Classify the sentiment of the following coaching calls as very positive, positive, neutral, negative, or very negative: "
    prompt_suffix = "\nsentiment: "
    sentiment_scores = []
    for chunk in chunks:
        prompt_text = prompt_prefix + chunk + prompt_suffix
        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=prompt_text,
            temperature=0,
            max_tokens=15
        )
        sentiment_score = re.sub(r'\W+', '', response['choices'][0]['text'])
        sentiment_scores.append(sentiment_score)
    return ', '.join(sentiment_scores)

def process_transcript_and_extract_qa(output):
    # collapse the transcript into "SPEAKER N: message" lines
    lines = output.split("\n")
    new_lines = []
    for i in range(len(lines)):
        if i % 2 == 0 and i + 1 < len(lines):
            speaker = lines[i].strip()
            message = lines[i + 1].strip()
            new_line = f"{speaker}: {message}"
            new_lines.append(new_line)
    processed_transcript = "\n".join(new_lines)
    # pair each SPEAKER 1 question with the next SPEAKER 2 reply
    lines = processed_transcript.split('\n')
    qa_dict = {}
    for i in range(len(lines)):
        if 'SPEAKER 1' in lines[i] and lines[i].endswith('?'):
            q = lines[i].split(': ')[1]
            for j in range(i + 1, len(lines)):
                if 'SPEAKER 2' in lines[j]:
                    a = lines[j].split(': ')[1]
                    qa_dict['Q: ' + q] = 'A: ' + a
                    break
    q_a_list = []
    for key, value in qa_dict.items():
        q_a_list.append(key + '\n' + value)
    output_str = "\n".join(q_a_list)
    return output_str
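
# Gradio UI: upload a call, transcribe it, then extract questions/answers and sentiment
# from the transcript textbox.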
demo = gr.Blocks()

with demo:
    gr.Markdown(
        '''
        # AlterCall AI File Processing
        ''')
    audio_file = gr.Audio(type="filepath")
    output = gr.Textbox(label='Transcript')
    output_str = gr.Textbox(label='Questions and Answers')
    sentiment_scores = gr.Textbox(label='Sentiment Scores')

    b1 = gr.Button("Transcribe Call")
    b1.click(transcribe, inputs=audio_file, outputs=output)
    b2 = gr.Button("Identify Questions and Answers")
    b2.click(process_transcript_and_extract_qa, inputs=output, outputs=output_str)
    b3 = gr.Button("Classify Sentiment")
    b3.click(calculate_sentiment_score, inputs=output, outputs=sentiment_scores)

demo.launch()