| import whisper | |
| import gradio as gr | |
| import datetime | |
| import openai | |
| import subprocess | |
| import os | |
| import torch | |
| from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding | |
| import re | |
| from pyannote.audio import Audio | |
| from pyannote.core import Segment | |
| import wave | |
| import contextlib | |
| from sklearn.cluster import AgglomerativeClustering | |
| import numpy as np | |
# --- Module-level setup (runs once at import) ---
# OpenAI key is read from the "open_ai" environment variable; if unset this is
# None and the sentiment endpoint calls will fail at request time.
openai.api_key = os.getenv("open_ai")
# pyannote helper used to crop waveform segments out of the audio file.
audio = Audio()
# Whisper ASR model; "large-v2" is slow to load and needs substantial memory.
model = whisper.load_model("large-v2")
# Speaker-embedding model used for diarization; runs on GPU when available.
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
)
def transcribe(audio, num_speakers=2):
    """Transcribe an audio file and label each segment with a speaker.

    Args:
        audio: path to the uploaded audio file (Gradio passes a filepath;
            note this parameter shadows the module-level ``audio`` helper,
            as in the original code).
        num_speakers: expected number of distinct speakers (default 2,
            matching the previous hard-coded value).

    Returns:
        The formatted transcript string, or an error message string.
    """
    path, error = convert_to_wav(audio)
    if error is not None:
        return error
    duration = get_duration(path)
    if duration > 4 * 60 * 60:  # refuse anything longer than 4 hours
        return "Audio duration too long"
    result = model.transcribe(path)
    segments = result["segments"]
    if not segments:
        # Nothing was recognized; clustering zero embeddings would crash.
        return ""
    if len(segments) == 1:
        segments[0]['speaker'] = 'SPEAKER 1'
    else:
        embeddings = make_embeddings(path, segments, duration)
        # Clustering needs at least as many samples as clusters.
        add_speaker_labels(segments, embeddings, min(num_speakers, len(segments)))
    return get_output(segments)
def convert_to_wav(path):
    """Ensure *path* refers to a .wav file, converting with ffmpeg if needed.

    Args:
        path: input audio file path.

    Returns:
        (wav_path, None) on success, or (original_path, error_message) when
        conversion fails.
    """
    if path.endswith('.wav'):
        return path, None
    new_path = os.path.splitext(path)[0] + '.wav'
    try:
        # -y (overwrite without prompting) goes before the output file, where
        # ffmpeg expects options that apply to it.
        completed = subprocess.run(
            ['ffmpeg', '-y', '-i', path, new_path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        # ffmpeg binary missing or not executable.
        return path, 'Error: Could not convert file to .wav'
    if completed.returncode != 0:
        # The original code used subprocess.call and never checked the exit
        # status, so a failed conversion was silently reported as success.
        return path, 'Error: Could not convert file to .wav'
    return new_path, None
def get_duration(path):
    """Return the length of the .wav file at *path* in seconds."""
    with wave.open(path, 'r') as wav_file:
        frame_count = wav_file.getnframes()
        frame_rate = wav_file.getframerate()
    return frame_count / float(frame_rate)
def make_embeddings(path, segments, duration):
    """Build a (num_segments, 192)-shaped matrix of speaker embeddings.

    NaNs from the embedding model are replaced with zeros so downstream
    clustering never sees them.
    """
    matrix = np.zeros((len(segments), 192))
    for row, seg in enumerate(segments):
        matrix[row] = segment_embedding(path, seg, duration)
    return np.nan_to_num(matrix)
def segment_embedding(path, segment, duration):
    """Embed the audio of one transcript segment with the speaker model."""
    # Clip the end to the file duration: Whisper can report an end time
    # slightly past the actual audio length.
    clip_end = min(duration, segment["end"])
    clip = Segment(segment["start"], clip_end)
    waveform, _sample_rate = audio.crop(path, clip)
    return embedding_model(waveform[None])
def add_speaker_labels(segments, embeddings, num_speakers):
    """Cluster segment embeddings and tag each segment in place.

    Labels are 'SPEAKER 1' .. 'SPEAKER num_speakers'.
    """
    labels = AgglomerativeClustering(num_speakers).fit(embeddings).labels_
    for segment, label in zip(segments, labels):
        segment["speaker"] = 'SPEAKER ' + str(label + 1)
def time(secs):
    """Return *secs* (rounded to whole seconds) as an H:MM:SS timedelta."""
    whole_seconds = round(secs)
    return datetime.timedelta(seconds=whole_seconds)
def get_output(segments):
    """Format diarized segments as a readable transcript.

    Each speaker turn starts with a header line 'SPEAKER n H:MM:SS' followed
    by that speaker's text; turns are separated by a blank line.

    Args:
        segments: list of dicts with 'speaker', 'start', and 'text' keys.

    Returns:
        The formatted transcript string ('' for an empty list).
    """
    parts = []
    previous_speaker = None
    for segment in segments:
        if segment["speaker"] != previous_speaker:
            if previous_speaker is not None:
                parts.append('\n')
            timestamp = datetime.timedelta(seconds=round(segment["start"]))
            parts.append(segment["speaker"] + ' ' + str(timestamp) + '\n')
            previous_speaker = segment["speaker"]
        # Strip Whisper's leading space instead of blindly dropping the first
        # character (the original text[1:] ate a real character whenever the
        # text did not start with a space).
        parts.append(segment["text"].strip() + ' ')
    return ''.join(parts)
def calculate_sentiment_score(output):
    """Classify transcript sentiment chunk-by-chunk via the OpenAI API.

    The transcript is split into fixed-size character chunks (a rough guard
    against the model's context limit — characters, not tokens) and each
    chunk is classified independently.

    Args:
        output: full transcript text.

    Returns:
        Comma-separated per-chunk sentiment labels ('' for empty input).
    """
    max_context_length = 4096
    chunks = [output[i:i + max_context_length]
              for i in range(0, len(output), max_context_length)]
    # Fixed typo "nuetral" -> "neutral" so the model is offered the intended
    # label set and does not echo the misspelling back.
    prompt_prefix = ("Classify the sentiment of the following coaching calls "
                     "as very positive, positive, neutral, negative, or very negative.: ")
    prompt_suffix = "\nsentiment: "
    sentiment_scores = []
    for chunk in chunks:
        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=prompt_prefix + chunk + prompt_suffix,
            temperature=0,  # deterministic classification
            max_tokens=15,
        )
        # Keep only word characters from the completion (drops whitespace,
        # punctuation, and newlines around the label).
        label = re.sub(r'\W+', '', response['choices'][0]['text'])
        sentiment_scores.append(label)
    return ', '.join(sentiment_scores)
def process_transcript_and_extract_qa(output):
    """Pair each SPEAKER 1 question with the next SPEAKER 2 reply.

    The transcript alternates header lines ('SPEAKER n H:MM:SS') and text
    lines, as produced by get_output. Header and text are merged into
    'header: text' lines, then every SPEAKER 1 line ending in '?' is paired
    with the first following SPEAKER 2 line.

    Args:
        output: transcript text from get_output.

    Returns:
        'Q: ...\\nA: ...' pairs joined by newlines ('' when none found).
    """
    lines = output.split("\n")
    merged = []
    # Stop at len-1 so a trailing unpaired header line cannot raise
    # IndexError (the original lines[i+1] crashed on odd line counts,
    # including empty input).
    for i in range(0, len(lines) - 1, 2):
        speaker = lines[i].strip()
        message = lines[i + 1].strip()
        merged.append(f"{speaker}: {message}")
    qa_dict = {}
    for i, line in enumerate(merged):
        if 'SPEAKER 1' in line and line.endswith('?'):
            # maxsplit=1 keeps any ': ' inside the question/answer text
            # (the original split(': ')[1] truncated such lines).
            question = line.split(': ', 1)[1]
            for candidate in merged[i + 1:]:
                if 'SPEAKER 2' in candidate:
                    answer = candidate.split(': ', 1)[1]
                    qa_dict['Q: ' + question] = 'A: ' + answer
                    break
    return "\n".join(key + '\n' + value for key, value in qa_dict.items())
# --- Gradio UI: wires the processing functions to three buttons ---
demo = gr.Blocks()
with demo:
    gr.Markdown(
'''
# AlterCall AI File Processing
''')
    # Input audio is delivered to handlers as a file path (not raw samples).
    audio_file = gr.Audio(type="filepath")
    # Transcript box doubles as the input for the Q&A and sentiment buttons.
    output = gr.Textbox(label ='Transcript')
    output_str = gr.Textbox(label='Questions and Answers')
    sentiment_scores = gr.Textbox(label ='Sentiment Scores')
    b1 = gr.Button("Transcribe Call")
    b1.click(transcribe, inputs=audio_file, outputs=output)
    b2 = gr.Button("Identify Questions and Answers")
    b2.click(process_transcript_and_extract_qa, inputs=output, outputs=output_str)
    b3 = gr.Button("Classify Sentiment")
    b3.click(calculate_sentiment_score, inputs=output, outputs=sentiment_scores)
# Starts the web server; blocks until shut down.
demo.launch()