from curses import KEY_LEFT import os import json import pickle as pkl import numpy as np from tqdm import tqdm # Set proxy for demo purpose # os.environ['HTTP_PROXY'] = 'socks5h://127.0.0.1:1080' # os.environ['HTTPS_PROXY'] = 'socks5h://127.0.0.1:1080' def get_rules(): '''Add your questing rules here. ''' # feature_list=["症状和体征","诊断","预后","治疗"] info_head="概述" no="的" identity="图书管理员" found="{identity}找到了" not_found="{identity}需要其他资料" query_1=''' 我想让你扮演一位虚拟的{identity}。不同于给出诊断结果的专业医生,{identity}自身没有任何的医学知识,也无法回答患者的提问。因此,请忘记你的医学知识。现在你必须从知识库中,查阅与患者的问题最可能有帮助的医学知识。我将扮演知识库,告诉你医学知识,以及可以查询的主题。你需要在我提供的选项中,选择一个查询主题,我将告诉你新的医学知识,以及新的可以查询的主题。请重复以上流程,直到你认为,你从我查询到的医学知识,对患者的提问可能有帮助,此时,请告诉我'{found}'注意,你是一个{identity},无法回答患者的提问,你必须从我扮演的知识库提供的选项中,选择一个医学知识的主题进行查询。患者的问题是:"{quest}"你需要尽量查询与这个问题有关的知识。 现在,你必须选择以下一个主题选项中,选择最可能有帮助的主题回复我: {topic_list} 注意,你不允许回答患者的提问,不允许回复其他内容,不允许提出建议,不允许回复你做出选择的原因,解释或者假设。你只允许从我扮演的知识库提供的主题选项中,选择一项查询。只回复你想查询的主题选项的名字。 ''' query_topic1=''' 如果你认为,你已经查询到了,对于"{quest}"这个问题,可能有帮助的医学知识,请回复我'{found}'如果还没有,你必须选择以下一个主题选项中,选择最可能有帮助的主题回复我: {topic_list} '{not_found}' 只回复你想查询的选项的名字就可以了,不需要回复别的内容。 ''' query_topic2=''' 如果你认为,你已经查询到了,对于"{quest}"这个问题,可能有帮助的医学知识,请回复我'{found}'如果还没有,你必须回复我'{not_found}'。只回复你想查询的选项的名字就可以了,不需要回复别的内容。 ''' query_2=''' 做得很好! 你查询到的医学知识是: \''' {knowledge} \''' {query_topic} ''' #v2.5 query_res=''' 请告诉我,刚才你查询到的哪些医学知识,对"{quest}"可能有帮助?请打印\'''内的原文,不要打印别的内容。 ''' #v3 # query_res=''' # 请打印刚才查询到的医学知识,这些知识应当对"{quest}"可能有帮助。不要打印别的内容。 # ''' return locals() global_rules=get_rules() def format_query(query,verbose=False,**kwargs): # format_pool={ # key:globals()[key] for key in keylist # } # format_pool=global_rules # print(kwargs) kwargs={**global_rules,**kwargs} if verbose: print(query) while '{' in query: query=query.format(**kwargs) if verbose: print(query) return query def list2str(word_list:list): ret=" ".join(map(lambda x:f"'{x}'",word_list)) return ret from tkinter import N from revChatGPT.V3 import Chatbot from revChatGPT.typings import ChatbotError import time class Chat_api: def __init__(self,api_key,proxy=None,verbose=False): self.api_key=api_key # if proxy: # os.environ["http_proxy"] = proxy # os.environ["all_proxy"] = proxy self.chatbot = Chatbot(api_key=api_key,proxy=proxy) # self.chatbot = Chatbot(api_key=api_key,proxy='http://127.0.0.1:7890') # for data in chatbot.ask_stream("Hello world"): # print(data, end="", flush=True) self.now_query="" self.now_res="" self.verbose=verbose def prompt(self,query,**kwargs): query=format_query(query,**kwargs) self.now_query=query if self.verbose: print("Human:\n",query,flush=True) def get_res(self,max_connection_try=5,fail_sleep_time=10): res=None for i in range(max_connection_try): try: res=self.chatbot.ask(self.now_query) break except ChatbotError: # print("Warn: openAI Connection Error! Try again!") time.sleep(fail_sleep_time) if self.verbose: print("ChatGPT:\n",res,flush=True) print() self.now_res=res return res def get_choice_res(self,possible_res,max_false_time=5): ''' Give several choice for Chatgpt ''' # res=input() possible_res=[format_query(q) for q in possible_res] def check_res(res:str,possible_res:list): commas=",,.。'‘’/、\\::\"“”??!!;;`·~@#$%^&*()_+-=<>[]{}|" for c in commas: res=res.replace(c,' ') res_tks=res.split() for p in possible_res: if p in res_tks: return p return None for i in range(max_false_time): self.now_res=self.get_res() res_choice=check_res(self.now_res,possible_res) if res_choice: if self.verbose: # print("ChatGPT:\n",self.now_res,flush=True) print("Choice of ChatGPT:",res_choice) print(flush=True) return res_choice self.chatbot.rollback(2) # print("Warn: ChatGPT didn't give a possible response!") return None import json,types def answer_quest(quest: str,api_key: str,topic_base_dict: list):#,topic): global_rules['quest']=quest feature_list,info_head,no,quest,topic,identity,found,not_found,query_1,query_topic1,query_topic2,query_2,query_res=global_rules.get('feature_list'),global_rules.get('info_head'),global_rules.get('no'),global_rules.get('quest'),global_rules.get('topic'),global_rules.get('identity'),global_rules.get('found'),global_rules.get('not_found'),global_rules.get('query_1'),global_rules.get('query_topic1'),global_rules.get('query_topic2'),global_rules.get('query_2'),global_rules.get('query_res') infobase=json.load(open(os.path.join(os.path.dirname(__file__), 'dataset', 'disease_info.json'),"r",encoding="utf-8")) # Set proxy for demo purpose chatapi=Chat_api(api_key=api_key, verbose=False, proxy='socks5h://127.0.0.1:1080') prompt=chatapi.prompt get_res=chatapi.get_res get_choice_res=chatapi.get_choice_res info_topic="" # topic_list=list(topic_base_dict.keys()) topic_list=topic_base_dict infobase={i:infobase[i] for i in topic_base_dict} prompt(query_1,topic_list=list2str(topic_list)) now_res=get_choice_res([found,not_found]+topic_list) if now_res in topic_base_dict: info_topic=now_res info_list=[infobase] while len(info_list)!=0: now_info=info_list[-1] if now_res==format_query(found): prompt(query_res) found_data=get_res() # print(found_data) # return now_info_str,found_data return info_topic,found_data # break elif now_res==format_query(not_found): info_list.pop() if len(info_list)==0: # print("not found") break now_info=info_list[-1] topic_list=list(now_info.keys()) if info_head in topic_list:topic_list.remove(info_head) prompt(query_topic1,topic_list=list2str(topic_list)) possible_res=[found,not_found]+topic_list elif now_res in topic_list: # now_info=now_info[now_res] if type(now_info[now_res])==str: now_info_str=now_info.pop(now_res) now_info={info_head:now_info_str} info_list.append(now_info) topic_list=[] prompt(query_2,knowledge=now_info_str,query_topic=query_topic2) possible_res=[found,not_found] else: now_info=now_info.pop(now_res) topic_list=list(now_info.keys()) info_list.append(now_info) if info_head in topic_list: topic_list.remove(info_head) now_info_str=now_info[info_head] if len(topic_list)==0: prompt(query_2,knowledge=now_info_str,query_topic=query_topic2) # possible_res=[found,not_found] else: prompt(query_2,knowledge=now_info_str,query_topic=query_topic1,topic_list=list2str(topic_list)) else: prompt(query_topic1,topic_list=list2str(topic_list)) possible_res=[found,not_found]+topic_list else: # print("unhandle strange result") break now_res=get_choice_res(possible_res) if now_res in topic_base_dict: info_topic=now_res # topic_list=now_info[now_res].keys() # prompt(query_2,knowledge=now_info[now_res],query_topic=query_topic2) return None def query_range(model, query: str,k:int=3,bar=0.6): msd=json.load(open(os.path.join(os.path.dirname(__file__), 'dataset', 'disease_info.json'),"r",encoding='utf-8')) emb_d = pkl.load(open(os.path.join(os.path.dirname(__file__), 'dataset', 'MSD.pkl'),'rb')) embeddings=[] for key,value in emb_d.items(): embeddings.append(value) embeddings=np.asarray(embeddings) # m = SentenceModel() q_emb = model.encode(query) # q_emb = m.encode(query) q_emb=q_emb/np.linalg.norm(q_emb, ord=2) # Calculate the cosine similarity between the query embedding and all other embeddings cos_similarities = np.dot(embeddings, q_emb) # Get the indices of the embeddings with the highest cosine similarity scores top_k_indices = cos_similarities.argsort()[-k:][::-1] # print(f"cos similarities of top k choices; only > {bar} will be selected :") # print(cos_similarities[top_k_indices]) sift_topK=top_k_indices[np.argwhere(cos_similarities[top_k_indices]>bar)] sift_topK=sift_topK.reshape(sift_topK.shape[0],) ret, raw_ret = [], [] if len(sift_topK)==0: return ret, [None,None] for indices in sift_topK: key=list(emb_d.keys())[indices] ret.append(key) for indices in top_k_indices: key=list(emb_d.keys())[indices] raw_ret.append(key) return ret, [raw_ret, cos_similarities[top_k_indices]]