Spaces:
Runtime error
Runtime error
| import os | |
| import boto3 | |
| import openai | |
| import whisper | |
| import logging | |
| import base64 | |
| import gradio as gr | |
| from io import BytesIO | |
| from langchain import OpenAI | |
| from langchain.chains import RetrievalQA | |
| from langchain.vectorstores import Chroma | |
| from langchain.document_loaders import DirectoryLoader | |
| from langchain.embeddings.openai import OpenAIEmbeddings | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from assets.char_poses_base64 import idle_html_base_64, thinking_html_base_64, talking_html_base64 | |
# Conversation transcripts are appended to a local log file.
logging.basicConfig(
    level="INFO",
    filename='conversations.log',
    filemode='a',
    format='%(asctime)s %(message)s',
    datefmt='%H:%M:%S',
)
logger = logging.getLogger('voice_agent')

# Call counter that update_img() increments to alternate the character image.
FUNC_CALL = 0

# Credentials are read from the environment; only the AWS region is fixed.
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION_NAME = 'ap-south-1'

# Substrings of a document-based answer that make get_response() fall back to
# the general chat model.
GENERAL_RSPONSE_TRIGGERS = [
    "I don't understand the question.",
    "I don't know",
    "Hello, my name is",
    "mentioned in the context provided",
]

# Running message list sent to the chat model; get_response() appends to it.
MESSAGES = [{"role": "system", "content": "You are a helpful assistant.."}]

# Character images, embedded as <img> tags from the imported base64 assets.
CHAR_IDLE = f'<img src="{idle_html_base_64}"></img>'
CHAR_TALKING = f'<img src="{talking_html_base64}"></img>'
CHAR_THINKING = f'<img src="{thinking_html_base_64}"></img>'

# Initial (empty) content of the audio HTML component.
AUDIO_HTML = ''

# The NLTK tagger data is requested on every import of this module.
# NOTE(review): presumably needed by the document loader - confirm before
# removing or commenting out.
import nltk
nltk.download('averaged_perceptron_tagger')
def initialize_knowledge_base():
    """Build the retrieval QA chain and load the Whisper model.

    Loads every ``.txt`` file under ``profiles``, splits the documents into
    chunks, embeds them into a Chroma vector store and wraps that store in a
    ``RetrievalQA`` chain.

    Returns:
        A ``(conv_model, voice_model)`` tuple: the ``RetrievalQA`` chain and
        the Whisper model loaded with the name ``"tiny"``.
    """
    profile_docs = DirectoryLoader('profiles', glob='**/*.txt').load()

    splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    chunks = splitter.split_documents(profile_docs)

    embeddings = OpenAIEmbeddings()
    vector_store = Chroma.from_documents(chunks, embeddings)

    llm = OpenAI()
    # The retriever is configured with k=1 (a single retrieved chunk).
    retriever = vector_store.as_retriever(search_kwargs={"k": 1})
    conv_model = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
    )

    voice_model = whisper.load_model("tiny")
    return conv_model, voice_model
def text_to_speech_gen(answer):
    """Synthesize ``answer`` with Amazon Polly and return it as an audio tag.

    Args:
        answer: The text to speak.

    Returns:
        The HTML string produced by ``audio_to_html`` for the synthesized
        MP3 audio.
    """
    from contextlib import closing

    polly = boto3.client(
        'polly',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION_NAME,
    )
    response = polly.synthesize_speech(
        Text=answer,
        VoiceId='Matthew',
        OutputFormat='mp3',
        Engine="neural",
    )
    # AudioStream is a streaming response body: close it explicitly once it
    # has been drained instead of leaving it open until garbage collection.
    with closing(response['AudioStream']) as stream:
        audio_bytes = stream.read()
    return audio_to_html(audio_bytes)
def audio_to_html(audio_bytes):
    """Return an autoplaying ``<audio>`` tag embedding ``audio_bytes``.

    The bytes are base64-encoded into a ``data:audio/mpeg`` URI so the tag
    needs no separate file.
    """
    raw = BytesIO(audio_bytes).getvalue()
    encoded = base64.b64encode(raw).decode("utf-8")
    return f'<audio src="data:audio/mpeg;base64,{encoded}" controls autoplay></audio>'
def update_img():
    """Increment the global call counter and return the matching image tag.

    Returns:
        ``CHAR_TALKING`` when the updated counter is even, otherwise
        ``CHAR_THINKING``.
    """
    global FUNC_CALL
    FUNC_CALL += 1
    is_even_call = FUNC_CALL % 2 == 0
    return CHAR_TALKING if is_even_call else CHAR_THINKING
def user(user_message, history):
    """Append the user's message to the chat history as an unanswered turn.

    Returns:
        A tuple of an empty string and a new history list whose last entry
        is ``[user_message, None]``. The ``history`` argument is not mutated.
    """
    pending_turn = [user_message, None]
    updated_history = history + [pending_turn]
    return "", updated_history
# Built once when the module is imported. get_response() queries conv_model;
# voice_model is not referenced anywhere else in the visible part of this file.
conv_model, voice_model = initialize_knowledge_base()
def _transcribe_audio(audio_path):
    """Return the text transcribed from the recording at ``audio_path``."""
    wav_path = audio_path + '.wav'
    # The recording is renamed to carry a .wav extension before it is sent.
    os.rename(audio_path, wav_path)
    # Context manager so the file handle is closed even if the request fails.
    with open(wav_path, "rb") as audio_file:
        transcript = openai.Audio.transcribe("whisper-1", audio_file)
    return transcript['text']


def _general_chat_answer(question):
    """Ask the general chat model and record the exchange in ``MESSAGES``."""
    MESSAGES.append({"role": "user", "content": question})
    # `n` is left at its default of one completion: only choices[0] is read,
    # so requesting 128 completions per call was pure overhead.
    chat = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=MESSAGES,
        temperature=0.7,
        stop="\n",
    )
    answer = chat.choices[0].message.content
    MESSAGES.append({"role": "assistant", "content": answer})
    return answer


def get_response(history, audio_input):
    """Answer the latest chat turn and synthesize the answer as speech.

    The question is the text of the last history entry; when that is empty
    the recording at ``audio_input`` is transcribed instead. The question is
    first put to the document-based ``conv_model``. If that answer contains
    any of ``GENERAL_RSPONSE_TRIGGERS``, the general chat model is asked
    once and its answer is used instead.

    Args:
        history: Chat history as a list of ``[user_text, bot_text]`` pairs;
            the last pair is completed in place.
        audio_input: Path of a recorded audio file, or a falsy value.

    Returns:
        ``(history, audio_html)``, or ``(None, None)`` when there is neither
        text nor audio to answer.
    """
    query_type = 'text'
    question = history[-1][0]
    if not question:
        if not audio_input:
            return None, None
        query_type = 'audio'
        question = _transcribe_audio(audio_input)

    logger.info("\nquery_type: %s", query_type)
    logger.info("query_text: %s", question)
    print('\nquery_type:', query_type)
    print('\nquery_text:', question)

    if question.lower().strip() == 'hi':
        question = 'hello'

    answer = conv_model.run(question)
    logger.info("\ndocument_response: %s", answer)
    print('\ndocument_response:', answer)

    # Fall back at most once. Scanning the remaining triggers against the
    # replacement answer could fire further chat calls for the same question
    # and append duplicate entries to MESSAGES.
    if any(trigger in answer for trigger in GENERAL_RSPONSE_TRIGGERS):
        answer = _general_chat_answer(question)
        logger.info("general_response: %s", answer)
        print('\ngeneral_response:', answer)

    audio_html = text_to_speech_gen(answer)
    history[-1][1] = answer
    return history, audio_html
# Gradio UI: a character image, a chatbot pane, a text box, a microphone input
# and a Send button.
# NOTE(review): the source this was recovered from had lost its indentation,
# so the nesting of the Row/Column contexts below is a reconstruction -
# confirm against the intended layout.
with gr.Blocks(title="Your Assistance Pal!") as demo:
    with gr.Row():
        # Receives the <audio> tag returned by get_response; marked not visible.
        output_html = gr.HTML(label="Felix's Voice", value=AUDIO_HTML)
        output_html.visible = False
        assistant_character = gr.HTML(label=None, value=CHAR_IDLE, show_label=False)
        with gr.Column(scale=0.1):
            chatbot = gr.Chatbot(label='Send a text or a voice input').style(height=285)
            with gr.Row():
                msg = gr.Textbox(placeholder='Write a chat & press Enter.', show_label=False).style(container=False)
                with gr.Column(scale=0.5):
                    audio_input = gr.Audio(source="microphone", type='filepath', show_label=False).style(container=False)
                    button = gr.Button(value="Send")
    # Pressing Enter in the text box and clicking Send run the same chain:
    # user -> update_img -> get_response -> update_img.
    msg.submit(user, [msg, chatbot], [msg, chatbot]
    ).then(update_img, outputs=[assistant_character]
    ).then(get_response, [chatbot, audio_input], [chatbot, output_html]
    ).then(update_img, outputs=[assistant_character])
    button.click(user, [msg, chatbot], [msg, chatbot]
    ).then(update_img, outputs=[assistant_character]
    ).then(get_response, [chatbot, audio_input], [chatbot, output_html]
    ).then(update_img, outputs=[assistant_character])
# Starts the app as soon as the module is executed.
demo.launch(debug=False, favicon_path='assets/favicon.png', show_api=False, share=False)