"""Streamlit app: record/upload audio, batch-transcribe it with Azure AI Speech
(with speaker diarization), and summarize the conversation with Azure OpenAI
GPT-4.  Aimed at doctor/patient encounters; supports a Hebrew mode that
switches the transcription locale and (later in the file) RTL rendering.
"""
import streamlit as st
import os
import uuid
import json
import time
import requests
import openai
from tempfile import NamedTemporaryFile
from st_audiorec import st_audiorec
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from datetime import datetime
from pydub import AudioSegment

# --- Azure OpenAI Whisper (speech-to-text) deployment ---
AOAI_ENDPOINT = "https://whisper-aoai-sean.openai.azure.com"
WHISPER_DEPLOYMENT_NAME = "whisper"
AOAI_KEY = os.environ.get("AOAI_KEY")
WHISPER_PROMPT = "The following is a conversation between a doctor and a patient."

# --- Summarization system prompts ---
# NOTE(review): only AOAI_PROMPT_DOCTOR is used below; AOAI_PROMPT_STANDARD is
# defined but never referenced in the visible code — confirm whether the
# "Standard" summary mode was meant to use it.
AOAI_PROMPT_DOCTOR = "I am a doctor. create a summary of this patient encounter for me. respond in the same language as the text was given in."
AOAI_PROMPT_STANDARD = "Summerize this text. Call out key points. Return in markdown format."

AZURE_BLOB_CONNECTION_STRING = os.environ.get("AZURE_BLOB_CONNECTION_STRING")
TRANSCRIPTION_API_KEY = os.environ.get("TRANSCRIPTION_API_KEY")

wav_audio_data = None

# --- Azure OpenAI (GPT-4 summarization) client configuration ---
# NOTE(review): a second AOAI resource/key pair (AOAI_API_KEY) distinct from the
# Whisper one above — presumably intentional; verify both env vars are set.
openai.api_type = "azure"
openai.api_base = aoai_endpoint = "https://eastus-openai-sean.openai.azure.com/"
openai.api_key = aoai_key = os.environ.get("AOAI_API_KEY")
openai.api_version = "2023-07-01-preview"

# Seed st.session_state once per session; Streamlit re-runs the script on every
# interaction, so each key must be guarded.
_SESSION_DEFAULTS = {
    "hebrew_mode": '',
    "summary": '',
    "request_status": "Pending",
    "transcription": '',
    "recording": 'na',
    "clicked": False,
    "raw_transcription": '',
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default


def click_button():
    """Button callback: remember that the user requested processing."""
    st.session_state["clicked"] = True


def create_transcription_request(blob_url):
    """Submit an Azure AI Speech batch-transcription job for *blob_url*.

    Diarization is enabled for 1-2 speakers.  Locale follows the Hebrew-mode
    toggle.  Returns the job's polling URL (the "self" link) on HTTP 201;
    otherwise renders a Streamlit error and returns its (None) result.
    """
    url = "https://eastus.api.cognitive.microsoft.com/speechtotext/v3.2-preview.1/transcriptions"
    locale = "he-il" if st.session_state.hebrew_mode else "en-us"
    payload = json.dumps({
        "displayName": "20231106_182337",
        "description": "Speech Studio Batch speech to text",
        "locale": locale,
        "contentUrls": [blob_url],
        "model": {
            # Pinned base model for the eastus region.
            "self": "https://eastus.api.cognitive.microsoft.com/speechtotext/v3.2-preview.1/models/base/e830341e-8f47-4e0a-b64c-3f66167b751c"
        },
        "properties": {
            "wordLevelTimestampsEnabled": False,
            "displayFormWordLevelTimestampsEnabled": False,
            "diarizationEnabled": True,
            "diarization": {"speakers": {"minCount": 1, "maxCount": 2}},
            "punctuationMode": "DictatedAndAutomatic",
            "profanityFilterMode": "Masked",
        },
        "customProperties": {},
    })
    headers = {
        'Ocp-Apim-Subscription-Key': TRANSCRIPTION_API_KEY,
        'Content-Type': 'application/json',
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    if response.status_code != 201:
        return st.error("Error creating transcription request")
    return response.json()["self"]


def attempt_to_get_transcription(transcription_url):
    """Poll a batch-transcription job and return its "status" field
    (e.g. "Running" / "Succeeded" — values per the Speech batch API)."""
    headers = {
        'Ocp-Apim-Subscription-Key': TRANSCRIPTION_API_KEY,
        'Content-Type': 'application/json',
    }
    return requests.get(transcription_url, headers=headers).json()["status"]


def extract_conversation(json_data):
    """Format a batch-transcription result JSON string as readable dialogue.

    Phrases are ordered by offsetInTicks and rendered as
    "Person <speaker>: <text>", taking the first nBest alternative (the most
    confident one).  Returns one newline-joined string.
    """
    data = json.loads(json_data)
    recognized_phrases = data.get("recognizedPhrases", [])
    # Defensive sort in case the service did not emit phrases in time order.
    recognized_phrases.sort(key=lambda x: x.get("offsetInTicks", 0))
    conversation = [
        f"Person {phrase.get('speaker')}: {phrase['nBest'][0].get('display', '')} \n"
        for phrase in recognized_phrases
    ]
    return '\n'.join(conversation)


def get_final_transcription(transcription_url):
    """Download the finished job's result file and return the formatted text.

    Lists the job's output files, picks the one of kind "Transcription",
    fetches its content URL, and delegates formatting to extract_conversation.
    Raises StopIteration if no transcription file is present (the original
    code would have failed less clearly in that case).
    """
    headers = {
        'Ocp-Apim-Subscription-Key': TRANSCRIPTION_API_KEY,
        'Content-Type': 'application/json',
    }
    files_url = f"{transcription_url}/files"
    values = requests.get(files_url, headers=headers).json()["values"]
    content_url = next(
        item["links"]["contentUrl"]
        for item in values
        if item["kind"] == "Transcription"
    )
    result = requests.get(content_url, headers=headers)
    return extract_conversation(result.text)


def upload_audio(audio_bytes):
    """Normalize audio to mono WAV and upload it to the "audiofiles" container.

    *audio_bytes* may be raw bytes or a file-like object exposing getbuffer()
    (e.g. a Streamlit UploadedFile).  Returns the blob URL on success; on
    upload failure renders a Streamlit error and returns its (None) result.
    Temp files are now cleaned up (the original leaked both of them).
    """
    filename = datetime.now().strftime("%Y%m%d_%H%M%S") + ".wav"
    # delete=False so pydub can reopen the file by name after the handle closes.
    with NamedTemporaryFile(delete=False) as f:
        if isinstance(audio_bytes, bytes):
            f.write(audio_bytes)
        else:
            f.write(audio_bytes.getbuffer())
        temp_filename = f.name
    mono_filename = f"{temp_filename}.wav"
    try:
        # Down-mix to one channel; presumably required by the transcription
        # pipeline — TODO confirm.
        sound = AudioSegment.from_wav(temp_filename).set_channels(1)
        sound.export(mono_filename, format="wav")
        blob_service_client = BlobServiceClient.from_connection_string(
            AZURE_BLOB_CONNECTION_STRING
        )
        blob_client = blob_service_client.get_blob_client(
            container="audiofiles", blob=filename
        )
        try:
            with open(mono_filename, "rb") as data:
                blob_client.upload_blob(data)
            return blob_client.url
        except Exception:  # was a bare except: — kept as best-effort UI error
            return st.error("Error uploading to Azure Blob Storage")
    finally:
        # fix: remove the NamedTemporaryFile(delete=False) artifacts.
        for path in (temp_filename, mono_filename):
            try:
                os.remove(path)
            except OSError:
                pass


def summerize_with_gpt(text, additional="Standard"):
    """Summarize *text* with the gpt-4-32k Azure OpenAI deployment.

    *additional* is appended to the doctor system prompt.  Returns the
    assistant message content.
    """
    response = openai.ChatCompletion.create(
        engine="gpt-4-32k",
        messages=[
            {"role": "system", "content": f"{AOAI_PROMPT_DOCTOR} \n {additional}"},
            {"role": "user", "content": text},
        ],
        temperature=0.2,
        max_tokens=1200,
        top_p=0.95,
        frequency_penalty=0,
        presence_penalty=0,
        stop=None,
    )
    return response.choices[0].message.content


def transcribe(audio_bytes):
    """Synchronously transcribe *audio_bytes* via the Azure OpenAI Whisper
    deployment and return the parsed JSON response."""
    url = (
        f"{AOAI_ENDPOINT}/openai/deployments/{WHISPER_DEPLOYMENT_NAME}"
        f"/audio/transcriptions?prompt={WHISPER_PROMPT}&api-key={AOAI_KEY}"
        f"&api-version=2023-09-01-preview"
    )
    files = [('file', ('Recording.wav', audio_bytes, 'application/octet-stream'))]
    response = requests.post(url, files=files)
    return response.json()


st.title("Summerizer 🧬")
# fix: label typo "Hebew" (the intended spelling was left in a comment).
st.session_state.hebrew_mode = st.toggle("Hebrew", False)
# NOTE(review): everything below appears garbled by whatever collapsed this
# file's newlines: the HTML markup inside the f-strings passed to st.markdown
# (presumably right-to-left <div dir="rtl"> wrappers for Hebrew mode, given
# unsafe_allow_html=True — TODO confirm) has been stripped, leaving
# unterminated string literals (e.g. `html_right = "` below) and an `else:`
# with no visible matching `if`.  `raw_transcription`, `text_data`, and
# `summary_types` are referenced but their definitions are not visible here.
# Recover this region from version control before editing.
# NOTE(review): on the long line below, `st.session_state.raw_transcript` is
# assigned the return value of st.markdown(...), while the key initialized at
# the top of the file is "raw_transcription" — looks like a typo'd key and an
# unintended assignment of a widget return value; verify against the original.
select_container = st.empty() text_box = st.empty() request_completed = False tmp = "" html_right = "
{raw_transcription}
",unsafe_allow_html=True) else: st.session_state.raw_transcript = st.markdown(f"{raw_transcription}") with st.status("Using GPT-4 to summerize..."): st.write("Starting up the GPUs!") st.session_state.summary = summerize_with_gpt(raw_transcription) st.write("Successfully Summerized!") st.toast("Successfully Summerized!",icon="✅") with st.expander("Summary", False): if st.session_state.hebrew_mode: st.markdown(f"
{st.session_state.summary}
",unsafe_allow_html=True) else: st.markdown(f"{st.session_state.summary}",unsafe_allow_html=True) elif text_data is not None: st.session_state.clicked = False with st.status("Using GPT-4 to summerize..."): st.write("Starting up the GPUs!") st.session_state.summary = summerize_with_gpt(text_data, summary_types) st.write("Successfully Summerized!") st.toast("Successfully Summerized!",icon="✅") with st.expander("Summary", False): if st.session_state.hebrew_mode: st.markdown(f"
{st.session_state.summary}
",unsafe_allow_html=True) else: st.markdown(f"{st.session_state.summary}",unsafe_allow_html=True) else: st.error("Please upload or record audio") st.session_state.clicked = False