# NOTE: "Spaces: Sleeping" — Hugging Face Spaces page residue, not part of the program.
| import streamlit as st | |
| import os, uuid, json | |
| import requests | |
| import os | |
| import openai | |
| import time | |
| from tempfile import NamedTemporaryFile | |
| from st_audiorec import st_audiorec | |
| from azure.identity import DefaultAzureCredential | |
| from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient | |
| from datetime import datetime | |
| from pydub import AudioSegment | |
# --- Service configuration (module-level constants) ---

# Azure OpenAI resource hosting the Whisper deployment (used by transcribe()).
AOAI_ENDPOINT = "https://whisper-aoai-sean.openai.azure.com"
WHISPER_DEPLOYMENT_NAME = "whisper"
AOAI_KEY = os.environ.get("AOAI_KEY")  # may be None if unset — Whisper calls would then 401
# Prompt that biases Whisper toward medical-dialogue vocabulary.
WHISPER_PROMPT = "The following is a conversation between a doctor and a patient."
# System prompts for GPT summarization; only the doctor prompt is used by summerize_with_gpt().
AOAI_PROMPT_DOCTOR = "I am a doctor. create a summary of this patient encounter for me. respond in the same language as the text was given in."
AOAI_PROMPT_STANDARD = "Summerize this text. Call out key points. Return in markdown format."
AZURE_BLOB_CONNECTION_STRING = os.environ.get("AZURE_BLOB_CONNECTION_STRING")
TRANSCRIPTION_API_KEY = os.environ.get("TRANSCRIPTION_API_KEY")  # Azure Speech batch-transcription key
wav_audio_data = None  # populated later by the recorder/uploader widgets
# Second Azure OpenAI resource, used for GPT-4 chat completions (distinct from the Whisper one).
openai.api_type = "azure"
openai.api_base = aoai_endpoint = "https://eastus-openai-sean.openai.azure.com/"  # NOTE(review): `aoai_endpoint` alias looks unused in this file — confirm before removing
openai.api_key = aoai_key = os.environ.get("AOAI_API_KEY")  # NOTE: env var "AOAI_API_KEY" differs from "AOAI_KEY" above — intentional? verify
openai.api_version = "2023-07-01-preview"
# Seed st.session_state with every key the rest of the script reads,
# without clobbering values that survived a Streamlit rerun.
_SESSION_DEFAULTS = {
    "hebrew_mode": '',
    "summary": '',
    "request_status": "Pending",
    "transcription": '',
    "recording": 'na',
    "clicked": False,
    "raw_transcription": '',
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
def click_button():
    """on_click callback for the Upload button: flag a pending submission."""
    # Attribute access is equivalent to st.session_state["clicked"] and matches
    # the style used later in the script (st.session_state.clicked = False).
    st.session_state.clicked = True
def create_transcription_request(blob_url):
    """Start an Azure Speech batch-transcription job for the audio at *blob_url*.

    Locale follows the Hebrew-mode toggle. Returns the job's self-URL on
    HTTP 201; otherwise shows a Streamlit error and returns its result.
    """
    endpoint = "https://eastus.api.cognitive.microsoft.com/speechtotext/v3.2-preview.1/transcriptions"
    locale = "he-il" if st.session_state.hebrew_mode else "en-us"
    body = {
        "displayName": "20231106_182337",
        "description": "Speech Studio Batch speech to text",
        "locale": locale,
        "contentUrls": [blob_url],
        "model": {
            "self": "https://eastus.api.cognitive.microsoft.com/speechtotext/v3.2-preview.1/models/base/e830341e-8f47-4e0a-b64c-3f66167b751c"
        },
        "properties": {
            "wordLevelTimestampsEnabled": False,
            "displayFormWordLevelTimestampsEnabled": False,
            "diarizationEnabled": True,
            # Doctor/patient dialogue: expect at most two speakers.
            "diarization": {"speakers": {"minCount": 1, "maxCount": 2}},
            "punctuationMode": "DictatedAndAutomatic",
            "profanityFilterMode": "Masked",
        },
        "customProperties": {},
    }
    headers = {
        'Ocp-Apim-Subscription-Key': TRANSCRIPTION_API_KEY,
        'Content-Type': 'application/json',
    }
    response = requests.post(endpoint, headers=headers, data=json.dumps(body))
    if response.status_code != 201:
        return st.error("Error creating transcription request")
    return response.json()["self"]
def attempt_to_get_transcription(transcription_url):
    """Poll the batch-transcription job and return its current status string
    (e.g. "Running", "Succeeded", "Failed")."""
    auth_headers = {
        'Ocp-Apim-Subscription-Key': TRANSCRIPTION_API_KEY,
        'Content-Type': 'application/json',
    }
    job = requests.get(transcription_url, headers=auth_headers).json()
    return job["status"]
def extract_conversation(json_data):
    """Turn an Azure batch-transcription result JSON string into a dialogue.

    Phrases are ordered by their audio offset and each is rendered as
    "Person <speaker>: <text>". Fix over the original: a phrase with a
    missing or empty "nBest" list no longer raises KeyError/IndexError
    (the rest of the function already used defensive `.get` access);
    such phrases render with empty text instead.

    Returns the joined conversation string ("" when there are no phrases).
    """
    data = json.loads(json_data)
    recognized_phrases = data.get("recognizedPhrases", [])
    # Order by audio position; results are not guaranteed to arrive sorted.
    recognized_phrases.sort(key=lambda x: x.get("offsetInTicks", 0))
    conversation = []
    for phrase in recognized_phrases:
        speaker = f"Person {phrase.get('speaker')}"
        # The first nBest entry is the highest-confidence alternative;
        # fall back to an empty candidate when nBest is absent/empty.
        n_best = phrase.get('nBest') or [{}]
        text = n_best[0].get('display', '')
        conversation.append(f"{speaker}: {text} \n")
    return '\n'.join(conversation)
def get_final_transcription(transcription_url):
    """Download the finished job's result file and format it as a dialogue.

    Lists the job's output files, picks the entry of kind "Transcription",
    fetches its content URL, and runs the body through extract_conversation().
    """
    auth_headers = {
        'Ocp-Apim-Subscription-Key': TRANSCRIPTION_API_KEY,
        'Content-Type': 'application/json',
    }
    files_listing = requests.get(f"{transcription_url}/files", headers=auth_headers).json()["values"]
    content_url = next(
        (entry["links"]["contentUrl"] for entry in files_listing if entry["kind"] == "Transcription"),
        files_listing,  # mirrors the original fallthrough when no transcription file exists
    )
    result = requests.get(content_url, headers=auth_headers)
    return extract_conversation(result.text)
def upload_audio(audio_bytes):
    """Normalize audio to mono WAV and upload it to the "audiofiles" container.

    *audio_bytes* is either raw ``bytes`` (recorder) or a file-like object
    with ``getbuffer()`` (Streamlit uploader). The blob is named with the
    current timestamp. Returns the blob URL, or shows a Streamlit error
    (returning its None result) if the upload fails.

    Fixes over the original: the bare ``except:`` is narrowed to
    ``Exception``, and both temporary files are removed afterwards
    (previously leaked on every call).
    """
    filename = datetime.now().strftime("%Y%m%d_%H%M%S") + ".wav"
    # Persist incoming audio to a temp file pydub can read.
    with NamedTemporaryFile(delete=False) as f:
        if isinstance(audio_bytes, bytes):
            f.write(audio_bytes)
        else:
            f.write(audio_bytes.getbuffer())
        temp_filename = f.name
    mono_path = f"{temp_filename}.wav"
    try:
        # Batch transcription diarization works on a single channel — downmix.
        sound = AudioSegment.from_wav(temp_filename)
        sound = sound.set_channels(1)
        sound.export(mono_path, format="wav")
        blob_service_client = BlobServiceClient.from_connection_string(AZURE_BLOB_CONNECTION_STRING)
        blob_client = blob_service_client.get_blob_client(container="audiofiles", blob=filename)
        try:
            with open(mono_path, "rb") as data:
                blob_client.upload_blob(data)
            return blob_client.url
        except Exception:
            return st.error("Error uploading to Azure Blob Storage")
    finally:
        # Clean up both temp files (original leaked them on every call).
        for path in (temp_filename, mono_path):
            try:
                os.remove(path)
            except OSError:
                pass
def summerize_with_gpt(text, additional="Standard"):
    """Summarize *text* with GPT-4 (32k) using the doctor system prompt.

    *additional* is appended to the system prompt (e.g. a summary-style
    hint typed by the user). Returns the assistant message content.
    """
    system_prompt = f"{AOAI_PROMPT_DOCTOR} \n {additional}"
    chat = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": text},
    ]
    completion = openai.ChatCompletion.create(
        engine="gpt-4-32k",
        messages=chat,
        temperature=0.2,      # low temperature: keep clinical summaries factual
        max_tokens=1200,
        top_p=0.95,
        frequency_penalty=0,
        presence_penalty=0,
        stop=None,
    )
    return completion.choices[0].message.content
def transcribe(audio_bytes):
    """Transcribe *audio_bytes* synchronously with the Azure OpenAI Whisper deployment.

    Returns the parsed JSON response from the service.

    Fixes over the original: the API key is sent in the ``api-key`` header
    instead of the query string (keys embedded in URLs end up in logs and
    proxies), and the prompt / api-version travel via ``params=`` so
    requests URL-encodes them properly (the prompt contains spaces).
    """
    url = f"{AOAI_ENDPOINT}/openai/deployments/{WHISPER_DEPLOYMENT_NAME}/audio/transcriptions"
    params = {
        "prompt": WHISPER_PROMPT,
        "api-version": "2023-09-01-preview",
    }
    files = [
        ('file', ('Recording.wav', audio_bytes, 'application/octet-stream'))
    ]
    response = requests.post(url, headers={"api-key": AOAI_KEY}, params=params, files=files)
    return response.json()
# ---------------------------------------------------------------------------
# Main Streamlit page flow
# ---------------------------------------------------------------------------
# NOTE: emoji below restored from mojibake in the original paste
# (ISO-8859-7 view of UTF-8: "π§¬" -> 🧬, "β" -> ✅).
st.title("Summerizer 🧬")
st.session_state.hebrew_mode = st.toggle("Hebrew", False)  # fixed label typo ("Hebew")
select_container = st.empty()
text_box = st.empty()
request_completed = False
tmp = ""  # last transcription status shown, so we only print changes

# Initialize both input holders so the branches below can test them safely:
# previously `text_data` was undefined (NameError) unless "Text" was selected,
# and `summary_types` likewise.
wav_audio_data = None
text_data = None
summary_types = ""


def _render_rtl_aware(text):
    """Render markdown; right-align it when Hebrew (RTL) mode is on."""
    if st.session_state.hebrew_mode:
        st.markdown(f"<div style='text-align: right;'> {text} </div>", unsafe_allow_html=True)
    else:
        st.markdown(f"{text}", unsafe_allow_html=True)


with select_container.container():
    select = st.selectbox("Upload or Record", ("Upload", "Record", "Text"))
    if select == "Record":
        wav_audio_data = st_audiorec()
    elif select == "Upload":
        wav_audio_data = st.file_uploader("Upload Audio", type=["wav"])
    elif select == "Text":
        text_data = st.text_area("Enter Text")
        summary_types = st.text_input("Enter Summary Type etc. (Standard, Bullet, or Paragraph)")
    done_speech_button = st.button("Upload", on_click=click_button)

if st.session_state.clicked:
    if wav_audio_data is not None:
        st.session_state.clicked = False
        with st.spinner("Uploading to Azure Blob storage..."):
            blob_url = upload_audio(wav_audio_data)
        st.toast("Successfully Uploaded!", icon="✅")
        with st.status("Using Azure Speech with OpenAI's Whisper to transcribe..."):
            transcription_request = create_transcription_request(blob_url)
            time.sleep(1)
            st.write("Transcription Request Created!")
            st.toast("Successfully Created Transcription Request!", icon="✅")
            while not request_completed:
                request_status = attempt_to_get_transcription(transcription_request)
                if tmp != request_status:
                    st.write(f"Transcription Status: {request_status}")
                    tmp = request_status
                if request_status == "Succeeded":
                    st.write("Transcription Complete!")
                    st.toast("Successfully Transcribed!", icon="✅")
                    request_completed = True
                    st.write("Grabbing Transcription...")
                    time.sleep(1)
                    raw_transcription = get_final_transcription(transcription_url=transcription_request)
                    st.write("Successfully Grabbed Transcription!")
                elif request_status == "Failed":
                    # Previously a failed job spun forever; surface it and stop.
                    st.error("Transcription failed")
                    st.stop()
                else:
                    # Always pause between polls — the original slept only when
                    # the status text changed, hammering the API otherwise.
                    time.sleep(1)
        # Store the text under the key seeded at startup (the original wrote
        # st.session_state.raw_transcript — a mismatched key — and stored the
        # st.markdown return value rather than the transcription itself).
        st.session_state.raw_transcription = raw_transcription
        with st.expander("Transcription", False):
            if st.session_state.hebrew_mode:
                st.markdown(f"<div style='text-align: right;'> {raw_transcription} </div>", unsafe_allow_html=True)
            else:
                st.markdown(f"{raw_transcription}")
        with st.status("Using GPT-4 to summerize..."):
            st.write("Starting up the GPUs!")
            st.session_state.summary = summerize_with_gpt(raw_transcription)
            st.write("Successfully Summerized!")
        st.toast("Successfully Summerized!", icon="✅")
        with st.expander("Summary", False):
            _render_rtl_aware(st.session_state.summary)
    elif text_data is not None:
        st.session_state.clicked = False
        with st.status("Using GPT-4 to summerize..."):
            st.write("Starting up the GPUs!")
            st.session_state.summary = summerize_with_gpt(text_data, summary_types)
            st.write("Successfully Summerized!")
        st.toast("Successfully Summerized!", icon="✅")
        with st.expander("Summary", False):
            _render_rtl_aware(st.session_state.summary)
    else:
        st.error("Please upload or record audio")
        st.session_state.clicked = False