import contextlib
import datetime
import os
import re
import subprocess
import wave

import gradio as gr
import numpy as np
import openai
import torch
import whisper
from pyannote.audio import Audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.core import Segment
from sklearn.cluster import AgglomerativeClustering

openai.api_key = os.getenv("open_ai")

# Pyannote audio helper, Whisper ASR model, and speaker-embedding model (GPU if available).
audio = Audio()
model = whisper.load_model("large-v2")
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
)


def transcribe(audio_path):
    """Transcribe an uploaded call and label each segment with a speaker."""
    path, error = convert_to_wav(audio_path)
    if error is not None:
        return error
    duration = get_duration(path)
    if duration > 4 * 60 * 60:  # reject anything longer than 4 hours
        return "Audio duration too long"
    result = model.transcribe(path)
    segments = result["segments"]
    num_speakers = 2
    if len(segments) == 1:
        segments[0]['speaker'] = 'SPEAKER 1'
    else:
        embeddings = make_embeddings(path, segments, duration)
        add_speaker_labels(segments, embeddings, num_speakers)
    return get_output(segments)


def convert_to_wav(path):
    """Convert the input file to .wav with ffmpeg if it is not one already."""
    if not path.endswith('.wav'):
        new_path = '.'.join(path.split('.')[:-1]) + '.wav'
        try:
            returncode = subprocess.call(['ffmpeg', '-i', path, new_path, '-y'])
        except OSError:
            return path, 'Error: Could not convert file to .wav'
        if returncode != 0:
            return path, 'Error: Could not convert file to .wav'
        path = new_path
    return path, None


def get_duration(path):
    """Return the duration of a .wav file in seconds."""
    with contextlib.closing(wave.open(path, 'r')) as f:
        frames = f.getnframes()
        rate = f.getframerate()
        return frames / float(rate)


def make_embeddings(path, segments, duration):
    """Compute one 192-dimensional speaker embedding per transcript segment."""
    embeddings = np.zeros(shape=(len(segments), 192))
    for i, segment in enumerate(segments):
        embeddings[i] = segment_embedding(path, segment, duration)
    return np.nan_to_num(embeddings)


def segment_embedding(path, segment, duration):
    """Crop the audio to a single segment and embed it."""
    start = segment["start"]
    end = min(duration, segment["end"])  # Whisper can overshoot the end of the file
    clip = Segment(start, end)
    waveform, sample_rate = audio.crop(path, clip)
    return embedding_model(waveform[None])


def add_speaker_labels(segments, embeddings, num_speakers):
    """Cluster the segment embeddings and attach a speaker label to each segment."""
    clustering = AgglomerativeClustering(num_speakers).fit(embeddings)
    labels = clustering.labels_
    for i in range(len(segments)):
        segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)


def time(secs):
    """Format seconds as H:MM:SS."""
    return datetime.timedelta(seconds=round(secs))


def get_output(segments):
    """Build the transcript, starting a new block whenever the speaker changes."""
    output = ''
    for i, segment in enumerate(segments):
        if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
            if i != 0:
                output += '\n'
            output += segment["speaker"] + ' ' + str(time(segment["start"])) + '\n'
        output += segment["text"][1:] + ' '
    return output


def calculate_sentiment_score(output):
    """Classify the sentiment of the transcript in 4096-character chunks."""
    max_context_length = 4096
    chunks = [output[i:i + max_context_length] for i in range(0, len(output), max_context_length)]
    prompt_prefix = ("Classify the sentiment of the following coaching call as "
                     "very positive, positive, neutral, negative, or very negative: ")
    prompt_suffix = "\nsentiment: "
    sentiment_scores = []
    for chunk in chunks:
        prompt_text = prompt_prefix + chunk + prompt_suffix
        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=prompt_text,
            temperature=0,
            max_tokens=15,
        )
        sentiment_score = re.sub(r'\W+', '', response['choices'][0]['text'])
        sentiment_scores.append(sentiment_score)
    return ', '.join(sentiment_scores)


def process_transcript_and_extract_qa(output):
    """Pair each question asked by SPEAKER 1 with the next answer from SPEAKER 2."""
    # Collapse "SPEAKER n <timestamp>" / text line pairs into "SPEAKER n <timestamp>: text".
    lines = output.split("\n")
    new_lines = []
    for i in range(0, len(lines) - 1, 2):
        speaker = lines[i].strip()
        message = lines[i + 1].strip()
        new_lines.append(f"{speaker}: {message}")
    processed_transcript = "\n".join(new_lines)

    lines = processed_transcript.split('\n')
    qa_dict = {}
    for i in range(len(lines)):
        if 'SPEAKER 1' in lines[i] and lines[i].endswith('?'):
            q = lines[i].split(': ', 1)[1]
            for j in range(i + 1, len(lines)):
                if 'SPEAKER 2' in lines[j]:
                    a = lines[j].split(': ', 1)[1]
                    qa_dict['Q: ' + q] = 'A: ' + a
                    break
    q_a_list = [key + '\n' + value for key, value in qa_dict.items()]
    return "\n".join(q_a_list)


demo = gr.Blocks()

with demo:
    gr.Markdown(
        '''
        # AlterCall AI File Processing
        ''')
    audio_file = gr.Audio(type="filepath")
    output = gr.Textbox(label='Transcript')
    output_str = gr.Textbox(label='Questions and Answers')
    sentiment_scores = gr.Textbox(label='Sentiment Scores')
    b1 = gr.Button("Transcribe Call")
    b1.click(transcribe, inputs=audio_file, outputs=output)
    b2 = gr.Button("Identify Questions and Answers")
    b2.click(process_transcript_and_extract_qa, inputs=output, outputs=output_str)
    b3 = gr.Button("Classify Sentiment")
    b3.click(calculate_sentiment_score, inputs=output, outputs=sentiment_scores)

demo.launch()