import gradio as gr
import os
import openai
import logging

# Remember to put your OpenAI API key into the Repository Secrets under Settings.
openai.api_key = os.getenv("openai_key")

logging.basicConfig(filename="test.log",
                    format='%(asctime)s - %(name)s - %(levelname)s -%(module)s: %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S ',
                    level=logging.INFO)
logger = logging.getLogger()
KZT = logging.StreamHandler()
KZT.setLevel(logging.DEBUG)
logger.addHandler(KZT)

logger.info('Start logging...')
# Security fix: never write the raw API key into the log file; record only
# whether it has been configured.
logger.info(f"openai.api_key configured: {openai.api_key is not None}")

# System prompt sent to the model with every request (see callapi). If you only
# want to customize the bot's behaviour through prompting, editing this string
# is enough.
prompt = '请假设自己是一名旅行计划师,给我设计一条去{日本}旅行的{5天}行程,包含{2位成人和1位儿童}。我希望整个行程{偏轻松、度假},包含 {去奈良看小鹿} 和 {去春日大社赏樱花} 。请给我推荐一个详细的每日行程,以表格的方式呈现,包含有趣的景点、活动和推荐理由。'

# Kept for backward compatibility with earlier revisions; the per-user chat
# history now lives in count_i_dict[uid]["history"].
history = {}

# The five intake questions, asked one per turn before the itinerary request
# is assembled. API contract of chat(p, qid, uid):
#   p   - the user's message text
#   qid - unique id of this message (platform-generated; used for logging)
#   uid - unique id of this user (platform-generated; keys per-user state)
#   returns [type, content], see
#   https://huggingface.co/spaces/baixing/hackathon_test/blob/main/bot-api.md
prompt_0 = "您好!我是您的智能旅行计划师!请告诉我你想去哪里旅行?您可以直接回复我目的地,例如日本,香港,或者西双版纳。"
prompt_1 = "好的。请问您计划什么时候出行?您可以回答我具体的日期,例如4月1日到4月5日;或者回答我模糊的时间,例如未来三个月,5天。"
prompt_2 = "收到。请问您计划多少人出行?请回答我具体的人数和类型,例如2位成人和1位儿童。"
prompt_3 = "您的出行目的是什么?您可以回答我例如亲子游、情侣/蜜月、休闲度假、文化体验、地标打卡、探险。"
prompt_4 = "您有没有什么需要打卡的地点或者特殊需求?您可以告诉我具体的景点,例如金阁寺,或者特定的活动,例如赏樱花。"

predefined_prompts = [prompt_0, prompt_1, prompt_2, prompt_3, prompt_4]

# Template for a user's answer slots (one per question, plus one spare).
_user_response = ["pred def" for _ in range(6)]

# Per-user state: {uid: {"count_i": int, "user_response": list, "history": list}}
count_i_dict = dict()


def init_user_info(uid):
    """Create a fresh state record for *uid*.

    Bug fix: the answer list is copied with list(); the original stored the
    shared module-level _user_response list itself, so every user wrote into
    (and read from) the same answer slots.
    """
    count_i_dict[uid] = {
        "count_i": 1,
        "user_response": list(_user_response),
        "history": [],
    }


def get_count_i(uid):
    """Return which question *uid* is on (1-based); 1 for unknown users."""
    if uid not in count_i_dict:
        return 1
    return count_i_dict[uid]["count_i"]


def set_count_i(uid, delta=1):
    """Advance *uid*'s question counter by *delta*.

    Initialises the user record first if it is missing (resolves the
    original's "TODO: call user prof init" — it silently dropped the update).
    """
    if uid not in count_i_dict:
        init_user_info(uid)
    count_i_dict[uid]["count_i"] += delta


def get_user_response(uid, i=0):
    """Return *uid*'s stored answer to question *i*, or "" for unknown users."""
    if uid not in count_i_dict:
        return ""
    return count_i_dict[uid]["user_response"][i]


def set_user_response(uid, i, current_p):
    """Store *current_p* as *uid*'s answer to question *i*.

    Initialises the user record first if it is missing (resolves the
    original's "TODO: call user prof init" — it silently dropped the answer).
    """
    if uid not in count_i_dict:
        init_user_info(uid)
    count_i_dict[uid]["user_response"][i] = current_p


def get_info(p, qid, uid):
    """Record the user's answer to the current question and ask the next one.

    p: the user's message (their answer to the previous question)
    qid: platform message id, used only for log correlation
    uid: platform user id, keys the per-user state
    Returns ["text", next_question].
    """
    if uid not in count_i_dict:
        init_user_info(uid)
        logger.info(f"Init info for user {uid} finished. qid={qid}")
    current_count_i = get_count_i(uid)
    set_user_response(uid, current_count_i - 1, p)
    # count_i is 1-based, so predefined_prompts[current_count_i] is the
    # question AFTER the one just answered.
    next_prompt = predefined_prompts[current_count_i]
    set_count_i(uid, 1)
    logger.info(f"In get info loop: get from user={uid}: {p}, qid={qid}")
    logger.info(f"In get info loop: Our next prompt: {next_prompt}, uid={uid}, count_i={get_count_i(uid)}, qid={qid}")
    return ["text", next_prompt]


def chat(p, qid, uid):
    """Main bot entry point (wired to the Gradio interface).

    Turns 1-4 collect answers to the intake questions; turn 5 combines all
    five answers into the final itinerary request; later turns forward the
    user's message to the model directly, with history.
    Returns ["text", response].
    """
    count_i = get_count_i(uid)
    if count_i <= 4:
        # Still collecting answers to the five intake questions.
        return get_info(p, qid, uid)
    if count_i == 5:
        # Fifth (last) answer received: assemble the itinerary request from
        # all five stored answers and send that instead of the raw message.
        set_user_response(uid, count_i - 1, p)
        prompt_0_response = get_user_response(uid, 0)
        prompt_1_response = get_user_response(uid, 1)
        prompt_2_response = get_user_response(uid, 2)
        prompt_3_response = get_user_response(uid, 3)
        prompt_4_response = get_user_response(uid, 4)
        final_prompt = f"请假设自己是一名旅行计划师,给我设计一条去{prompt_0_response}旅行的{prompt_1_response}行程,包含{prompt_2_response}。我的出行目的是{prompt_3_response},希望包含 {prompt_4_response} 。请给我推荐一个详细的每日行程,以表格的方式呈现,包含有趣的景点、活动和推荐理由。请尽量考虑可执行性。"
        p = final_prompt
        logger.info(f"Our final prompt for user={uid}: {final_prompt}, count_i={count_i}, qid={qid}")
    # Look up this user's conversation history (defensive init: normally the
    # intake turns have already created the record).
    if uid in count_i_dict:
        msgs = count_i_dict[uid]["history"]
    else:
        init_user_info(uid)
        msgs = []
    response = callapi(p, msgs)
    count_i_dict[uid]["history"] = msgs + [[p, response]]
    logger.info(f"history for user={uid}: {msgs}, uid={uid}, count_i={get_count_i(uid)}, qid={qid}")
    logger.info(f"current p for user={uid}: {p}, count_i={get_count_i(uid)}, qid={qid}")
    logger.info(f"response for user={uid}: {response}, count_i={get_count_i(uid)}, qid={qid}")
    set_count_i(uid, 1)
    return ["text", response]


def callapi(p, msgs):
    """Send *p* plus recent history *msgs* to OpenAI and return the reply text.

    Hard-codes an 8-round window; a token-count-based cut would be more
    precise if needed.
    """
    if len(msgs) > 8:
        msgs = msgs[-8:]
    data = [{"role": "system", "content": prompt}]
    for m in msgs:
        data = data + [
            {"role": "user", "content": m[0]},
            {"role": "assistant", "content": m[1]},
        ]
    data = data + [{"role": "user", "content": p}]
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=data
    )
    print(response)
    response = response["choices"][0]["message"]["content"]
    # Trim the leading blank lines the model sometimes emits
    # (replaces the original startswith("\n") while-loop).
    response = response.lstrip("\n")
    return response


iface = gr.Interface(fn=chat,
                     inputs=["text", "text", "text"],
                     outputs=["text", "text"],
                     description="""您好!我是您的智能旅行计划师。请告诉我你想去哪里旅行?您可以直接回复我目的地,例如日本,香港,或者西双版纳。""")

comments = """
已添加多轮对话的极简示范,能将该 uid 的最近八条消息一起发给openai。本实现是内存中的,一旦重启即被清空。如需可持久的多轮对话,需要改用数据库等方式。

注意:duplicate 本项目后,需要将你自己的 openai apikey 设置到 settings 的 Repository Secrets 里,否则运行会报错。[了解详情](https://huggingface.co/spaces/baixing/hackathon_chatbot_openai_api/blob/main/%E6%B7%BB%E5%8A%A0%20secret%20%E7%9A%84%E6%96%B9%E6%B3%95.jpg)

[对话测试](https://huggingface.co/spaces/BaixingAI/hackathon_test) [参考文档](https://huggingface.co/spaces/baixing/hackathon_test/blob/main/bot-api.md) [Q & A](https://huggingface.co/spaces/baixing/hackathon_test/blob/main/qna.md)
"""

iface.launch()