# NOTE(review): the original export was wrapped in a web-page scrape — a
# "Spaces:" heading, two "Runtime error" status lines, and Markdown table
# pipes around every code line. That residue is not part of the program and
# has been reduced to this comment.
| import os | |
| import nltk | |
| import openai | |
| import time | |
| import gradio as gr | |
| import tiktoken | |
| from threading import Thread #线程 用于定时器 | |
| from assets.char_poses_base64 import ( #角色动作 | |
| CHAR_IDLE_HTML, CHAR_THINKING_HTML, CHAR_TALKING_HTML) | |
| from app_utils import ( | |
| get_chat_history, initialize_knowledge_base, | |
| text_to_speech_gen, logging, buzz_user) | |
# --- Module-level configuration and state ---------------------------------
# FIX: the original declared ``global max_response_tokens`` etc. at module
# scope; ``global`` is a no-op outside a function, so those statements are
# removed and the constants are simply assigned.

# Token budget for the chat model: cap on a single reply and on the total
# conversation history kept in MESSAGES (see get_response).
max_response_tokens = 500
token_limit = 15000

# Counter used to alternate the on-screen character pose (see update_img).
FUNC_CALL = 0

# Idle-timer interval in seconds (see idle_timer; get_response bumps it to
# 80 right after a query).
BUZZ_TIMEOUT = 60

# Phrases in a knowledge-base answer that mark it as a non-answer and
# trigger a fallback to the general chat model in get_response.
# NOTE(review): name keeps the original "RSPONSE" typo — it is referenced
# elsewhere in this file.
GENERAL_RSPONSE_TRIGGERS = ["I don't understand the question.", "I don't know", "Hello, my name is", "mentioned in the context provided", "I don't know the answer"]

# Conversation history seeded with the system prompt (Chinese: "You are an
# excellent exhibition-hall guide ... relationships between old Shanghai
# buildings and historical figures."). The literal itself must stay intact.
MESSAGES = [{"role": "system", "content": "你现在是一个优秀的展览馆讲解员,你可以通过文字或语音与客户交流,你可以讲述上海老建筑和历史人物之间的关系。"}]

LOGGER = logging.getLogger('voice_agent')  # application logger
AUDIO_HTML = ''  # last generated speech audio embedded as HTML

# POS-tagger corpus needed by nltk; only required on the first run.
# (The original comment said "Uncomment If this is your first Run" but the
# call itself was never commented out — kept active to preserve behavior.)
nltk.download('averaged_perceptron_tagger')

# Build the retrieval chain over the document knowledge base and the TTS voice.
conv_model, voice_model = initialize_knowledge_base()
def num_tokens_from_messages(messages, model="gpt-3.5-turbo-16k"):
    """Estimate the token count of a chat-message list (OpenAI cookbook heuristic).

    Each message carries ~4 framing tokens (<im_start>{role}\\n{content}<im_end>\\n),
    a "name" field replaces the role (saving one token), and the reply is
    primed with 2 extra tokens (<im_start>assistant).
    """
    enc = tiktoken.encoding_for_model(model)
    total = 2  # priming tokens for the assistant reply
    for msg in messages:
        total += 4  # per-message framing
        for field, text in msg.items():
            total += len(enc.encode(text))
            if field == "name":
                total -= 1  # role is omitted when a name is present
    return total
def idle_timer():
    """Background loop: nudge the user (buzz_user) after each idle interval.

    get_response raises BUZZ_TIMEOUT to 80s after a query; in that case we
    wait one extra interval and then restore the default 60s cadence.
    """
    global BUZZ_TIMEOUT
    while True:
        time.sleep(BUZZ_TIMEOUT)
        buzz_user()
        if BUZZ_TIMEOUT != 80:
            continue
        time.sleep(BUZZ_TIMEOUT)
        BUZZ_TIMEOUT = 60
def update_img():
    """Advance the pose counter and return the next character-pose HTML.

    Alternates between the "talking" pose (even counts) and the "thinking"
    pose (odd counts).
    """
    global FUNC_CALL
    FUNC_CALL += 1
    poses = (CHAR_TALKING_HTML, CHAR_THINKING_HTML)
    return poses[FUNC_CALL % 2]
def get_response(history, audio_input):
    """Answer the latest user turn (text or voice) and synthesize speech.

    Parameters
    ----------
    history : list[list]
        Gradio chatbot history; ``history[-1][0]`` holds the pending
        question and ``history[-1][1]`` receives the answer.
    audio_input : str | None
        Path to a recorded audio file; used only when no text question
        is present.

    Returns
    -------
    tuple
        ``(history, audio_html)``, or ``(None, None)`` when there is
        neither a text question nor an audio recording.
    """
    query_type = 'text'
    question = history[-1][0]
    conv_history_tokens = 0

    global BUZZ_TIMEOUT
    BUZZ_TIMEOUT = 80  # give the user a longer idle window right after a query

    if not question:
        if not audio_input:
            return None, None
        query_type = 'audio'
        # Whisper needs a recognised extension; rename the temp file to .wav.
        os.rename(audio_input, audio_input + '.wav')
        # FIX: close the audio file after transcription (handle was leaked).
        with open(audio_input + '.wav', "rb") as audio_file:
            transcript = openai.Audio.transcribe("whisper-1", audio_file)
        question = transcript['text']

    LOGGER.info("\nquery_type: %s", query_type)
    LOGGER.info("query_text: %s", question)
    print('\nquery_type:', query_type)
    print('\nquery_text:', question)

    if question.lower().strip() == 'hi':
        question = 'hello'

    # First ask the document knowledge base.
    answer = conv_model.run(question)
    LOGGER.info("\ndocument_response: %s", answer)
    print('\ndocument_response:', answer)

    conv_history_tokens = num_tokens_from_messages(MESSAGES)
    print("conv_history_tokens: ", conv_history_tokens)
    print("MESSAGES", MESSAGES)
    # Trim the oldest non-system messages until the history fits the budget.
    # FIX: guard len(MESSAGES) > 1 so an over-long single turn cannot make
    # ``del MESSAGES[1]`` raise IndexError; MESSAGES[0] (system prompt) is kept.
    while (conv_history_tokens + max_response_tokens >= token_limit
           and len(MESSAGES) > 1):
        del MESSAGES[1]
        conv_history_tokens = num_tokens_from_messages(MESSAGES)
        print("conv_history_tokens_ajust: ", conv_history_tokens)

    MESSAGES.append({"role": "user", "content": question})
    MESSAGES.append({"role": "assistant", "content": answer})

    # If the KB answer looks like a non-answer, fall back to the chat model.
    # FIX: run the fallback at most once (the original looped and re-invoked
    # the API for every matching trigger phrase, appending duplicate
    # messages), request a single completion (``n=128`` generated 128
    # completions while only ``choices[0]`` was ever read), and use the
    # ``max_response_tokens`` constant instead of a hard-coded 500.
    if any(trigger in answer for trigger in GENERAL_RSPONSE_TRIGGERS):
        MESSAGES.append({"role": "user", "content": question})
        chat = openai.ChatCompletion.create(
            model="gpt-3.5-turbo-16k",
            messages=MESSAGES,
            max_tokens=max_response_tokens,
            temperature=0.7,
            n=1,
            stop="\n"
        )
        answer = chat.choices[0].message.content
        MESSAGES.append({"role": "assistant", "content": answer})
        LOGGER.info("general_response: %s", answer)
        print('\ngeneral_response:', answer)

    AUDIO_HTML = text_to_speech_gen(answer)  # speech rendered as embeddable HTML
    history[-1][1] = answer
    return history, AUDIO_HTML
| # buzz_usr_proc = Thread(target=idle_timer) | |
# Build the Gradio UI: character image on the left, chatbot + text box +
# microphone on the right, with a hidden HTML element that plays the
# synthesized voice.
# NOTE(review): the original file's indentation was lost in transit; the
# container nesting below is reconstructed from the widget order — verify
# the intended layout against a running instance.
with gr.Blocks(css="""#col_image{width:800px; height:800px; margin-left: auto; margin-right: auto;}""") as demo:
    with gr.Row(scale=0.7):
        # Hidden sink for the TTS audio HTML produced by get_response.
        output_html = gr.HTML(label="Felix's Voice", value=AUDIO_HTML)
        output_html.visible = False
        image1 = gr.Image("assets/NPCtest1.png").style(height=700)  # elem_id = "col_image"
        # assistant_character = gr.HTML(label=None, value=CHAR_IDLE_HTML, show_label=False)
        with gr.Column(scale=0.3):
            chatbot = gr.Chatbot(label='Send a text or a voice input').style(height=285)
            with gr.Column():
                msg = gr.Textbox(placeholder='Write a chat & press Enter.', show_label=False).style(container=False)
                with gr.Column(scale=0.5):
                    audio_input = gr.Audio(source="microphone", type='filepath', show_label=False).style(container=False)
                    button = gr.Button(value="Send")

    # Enter key and Send button do the same thing: record the user turn,
    # then produce the answer and the voice audio.
    msg.submit(
        get_chat_history, [msg, chatbot], [msg, chatbot]
    ).then(
        get_response, [chatbot, audio_input], [chatbot, output_html]
    )
    button.click(
        get_chat_history, [msg, chatbot], [msg, chatbot]
    ).then(
        get_response, [chatbot, audio_input], [chatbot, output_html]
    )

# buzz_usr_proc.start()
demo.launch(debug=False, favicon_path='assets/favicon.png', show_api=False, share=False)