Spaces:
Runtime error
Runtime error
| import os | |
| import sys | |
| import openai | |
| import json | |
| # sys.path.append(r"D:\python\Lib\site-packages") | |
| import sys | |
| if 'D:\\python\\Lib\\site-packages' in sys.path: | |
| sys.path.remove('D:\\python\\Lib\\site-packages') | |
| from programmer import Programmer | |
| from inspector import Inspector | |
| from function_calling import * | |
| from cache.cache import * | |
| from prompt_engineering.prompts import * | |
| import warnings | |
| import traceback | |
| # import pdfkit | |
| import zipfile | |
| from kernel import * | |
| from lambda_utils import * | |
| from cache.oss import upload_oss_file | |
| # from utilities import function_calling | |
| warnings.filterwarnings("ignore") | |
class Conversation():
    """Top-level controller that wires together a programmer agent, an
    inspector agent, a Jupyter-style code kernel, and local/cloud caches."""

    def __init__(self, config) -> None:
        self.config = config
        self.model = config['conv_model']
        # All three model clients share one API key but may use distinct base URLs.
        api_key = config['api_key']
        self.client = openai.OpenAI(api_key=api_key, base_url=config['base_url_conv_model'])
        self.programmer = Programmer(api_key=api_key, model=config['programmer_model'], base_url=config['base_url_programmer'])
        self.inspector = Inspector(api_key=api_key, model=config['inspector_model'], base_url=config['base_url_inspector'])
        self.messages = []       # message buffer for the conversation model itself
        self.chat_history = []
        self.retrieval = config['retrieval']
        self.kernel = CodeKernel(config['max_exe_time'])
        self.function_repository = {}
        self.my_data_cache = None
        self.max_attempts = config['max_attempts']  # retry budget for self-correcting
        self.error_count = 0    # executions that raised an error
        self.repair_count = 0   # successful self-repairs
        self.oss_dir = None
        self.local_cache_dir = None
        self.run_code(IMPORT)   # pre-import common packages inside the kernel
| def add_functions(self, function_lib: dict) -> None: | |
| self.function_repository = function_lib | |
| def add_data(self, data_path) -> None: | |
| self.my_data_cache = data_cache(data_path) | |
| def check_folder(self, before_files, after_files): | |
| new_files = set(after_files) - set(before_files) | |
| cloud_cache_info = [] | |
| if new_files: | |
| for file in new_files: | |
| cache_info = upload_oss_file(self.oss_dir, os.path.join(self.local_cache_dir, file)) | |
| cloud_cache_info.append(cache_info) | |
| return cloud_cache_info | |
| def save_conv(self): | |
| with open(os.path.join(self.local_cache_dir,'programmer_msg.json'), 'w') as f: | |
| json.dump(self.programmer.messages, f, indent=4) | |
| f.close() | |
| with open(os.path.join(self.local_cache_dir,'inspector_msg.json'), 'w') as f: | |
| json.dump(self.inspector.messages, f, indent=4) | |
| f.close() | |
| print(f"Conversation saved in {os.path.join(self.local_cache_dir,'programmer_msg.json')}") | |
| print(f"Conversation saved in {os.path.join(self.local_cache_dir, 'inspector_msg.json')}") | |
| def add_programmer_msg(self, message: dict): | |
| self.programmer.messages.append(message) | |
| def add_programmer_repair_msg(self, bug_code:str, error_msg:str, fix_method:str, role="user"): | |
| message = {"role": role, "content": CODE_FIX.format(bug_code=bug_code, error_message=error_msg, fix_method=fix_method)} | |
| self.programmer.messages.append(message) | |
| def add_inspector_msg(self, bug_code:str, error_msg:str, role="user"): | |
| message = {"role": role, "content": CODE_INSPECT.format(bug_code=bug_code,error_message=error_msg)} | |
| self.inspector.messages.append(message) | |
| def run_code(self,code): | |
| try: | |
| res_type, res = execute(code, self.kernel) # error in code of kernel will result in res_type:None | |
| except Exception as e: | |
| print(f'Error when executing code in kernel: {e}') | |
| res_type, res = 'error', str(e) | |
| self.add_inspector_msg(code, str(e)) | |
| return res_type, res | |
| def rendering_code(self): | |
| for i in range(len(self.programmer.messages)-1, 0, -1): | |
| if self.programmer.messages[i]["role"] == "assistant": | |
| is_python, code = extract_code(self.programmer.messages[i]["content"]) | |
| if is_python: | |
| return code | |
| return None | |
| # def human_loop(self, code): | |
| # self.programmer.messages.append({"role": "user", "content": HUMAN_LOOP.format(code=code)}) | |
| # | |
| # result = self.run_code(code) | |
| # exec_result = f"Execution result of user's code:\n{result}." | |
| # self.programmer.messages.append({"role": "system", "content": exec_result}) | |
| # return response | |
| def show_data(self) -> pd.DataFrame: | |
| return self.my_data_cache.data | |
| def document_generation(self): # use conv model to generate report. | |
| print("Report generating...") | |
| self.messages = self.programmer.messages + [{"role": "user", "content": Academic_Report}] | |
| # self.programmer.messages.append({"role": "user", "content": DOCUMENT}) | |
| report = self.call_chat_model(max_tokens=self.config["max_token_conv_model"]).choices[0].message.content | |
| self.messages.append({"role": "assistant", "content": report}) | |
| #self.local_cache_dir = 'xxx' | |
| mkd_path = os.path.join(self.local_cache_dir,'report.md') | |
| # pdf_path = os.path.join(self.local_cache_dir,'report.pdf') | |
| # zip_path = os.path.join(self.local_cache_dir,'report.zip') | |
| with open(mkd_path, "w") as f: | |
| f.write(report) | |
| f.close() | |
| # pdfkit.from_file(mkd_path, pdf_path) | |
| # with zipfile.ZipFile(zip_path, 'w') as zipf: | |
| # zipf.write('report.md') | |
| # zipf.write('report.pdf') | |
| return mkd_path | |
| def export_code(self): | |
| print("Exporting notebook...") | |
| notebook_path = os.path.join(self.local_cache_dir, 'notebook.ipynb') | |
| try: | |
| self.kernel.export(notebook_path) | |
| except Exception as e: | |
| print(f"An error occurred when exporting notebook: {e}") | |
| return notebook_path | |
| def call_chat_model(self, max_tokens, functions=None, include_functions=False, ) -> dict: | |
| params = { | |
| "model": self.model, | |
| "messages": self.messages, | |
| "max_tokens": None if max_tokens is None else max_tokens | |
| } | |
| if include_functions: | |
| params["functions"] = functions | |
| #params["function_call"] = "auto" | |
| try: | |
| return self.client.chat.completions.create(**params) | |
| except Exception as e: | |
| self.messages = [self.messages[0], self.messages[-1]] | |
| params = { | |
| "model": self.model, | |
| "messages": self.messages | |
| } | |
| print(f"Occurs error of calling chat model: {e}") | |
| return self.client.chat.completions.create(**params) | |
| def clear(self): | |
| self.messages = [] | |
| self.programmer.clear() | |
| self.inspector.clear() | |
| self.kernel.shutdown() | |
| del self.kernel | |
| self.kernel = CodeKernel() | |
| self.my_data_cache = None | |
| self.my_data_description = None | |
| self.oss_dir = None | |
| self.local_cache_dir = None | |
    def stream_workflow(self, chat_history, function_lib: dict=None, code=None) -> object:
        """Streaming generator workflow: the programmer drafts code, the kernel
        runs it, and on failure the inspector guides up to max_attempts repairs.

        Yields *chat_history* (a list of [user_msg, bot_msg] pairs; the last
        pair's bot side is rebuilt incrementally) after every streamed token
        and status update, so a UI can render progress live.

        Args:
            chat_history: mutable UI history; chat_history[-1][1] is overwritten.
            function_lib: unused here; kept for interface compatibility.
            code: optional user-supplied code (human-in-the-loop); when given,
                drafting is skipped and the code is executed directly.

        NOTE(review): several emoji string literals below are mojibake from a
        bad encoding pass; they are kept byte-identical on purpose.
        """
        try:
            chat_history[-1][1] = ""
            # No data and no user code: plain chat, no execution.
            if not self.my_data_cache and code is None: #todo: remove this restriction
                for message in self.programmer._call_chat_model_streaming():
                    chat_history[-1][1] += message
                    yield chat_history
                #chat_history += "\nNote, no data found. Please upload the data manually to start your task."
                yield chat_history
                final_response = chat_history[-1][1] #''.join(chat_history[-1][1])
                self.add_programmer_msg({"role": "assistant", "content": final_response})
            else:
                if not code:
                    #_, _ = execute(IMPORT, self.kernel) #todo install packages
                    # Stream the programmer's first draft (may consult retrieval
                    # and the current kernel state).
                    prog_response1_content = ''
                    for message in self.programmer._call_chat_model_streaming(retrieval=self.retrieval, kernel=self.kernel):
                        chat_history[-1][1] += message
                        prog_response1_content += message
                        yield chat_history
                    self.add_programmer_msg({"role": "assistant", "content": prog_response1_content})
                else:
                    # Human-in-the-loop: wrap the user's code in the HUMAN_LOOP prompt.
                    prog_response1_content = HUMAN_LOOP.format(code=code)
                    self.add_programmer_msg({"role": "user", "content": prog_response1_content})
                is_python, code = extract_code(prog_response1_content)
                print("is_python:",is_python)
                if is_python:
                    chat_history[-1][1] += '\nπ₯οΈ Execute code...\n'
                    yield chat_history
                    # Snapshot the cache dir so newly created artifacts can be
                    # detected and uploaded afterwards.
                    before_files = os.listdir(self.local_cache_dir)
                    res_type, res = self.run_code(code)
                    # Success: res_type set and not 'error', or there was no output at all.
                    if res_type and res_type != 'error' or not res:
                        after_files = os.listdir(self.local_cache_dir)
                        cloud_cache_info = self.check_folder(before_files, after_files)
                        # Markdown links for uploaded artifacts; image files get
                        # inline "![...]" syntax so the UI renders them.
                        link_info = '\n'.join([f"![{info['file_name']}]({info['download_link']})" if info['file_name'].endswith(
                            ('.jpg', '.jpeg', '.png', '.gif')) else f"[{info['file_name']}]({info['download_link']})" for info in
                            cloud_cache_info])
                        print("res:", res)
                        chat_history[-1][1] += f"\nβ Execution result:\n{res}\n\n"
                        yield chat_history
                        # Feed the result back and stream the programmer's summary.
                        self.add_programmer_msg({"role": "user", "content": RESULT_PROMPT.format(res)})
                        prog_response2 = ''
                        for message in self.programmer._call_chat_model_streaming():
                            chat_history[-1][1] += message
                            prog_response2 += message
                            yield chat_history
                        self.add_programmer_msg({"role": "assistant", "content": prog_response2})
                        chat_history[-1][1] += f"\n{link_info}"
                        yield chat_history
                    #elif not res_type and res: #todo: problems related the image output in the result.
                    # Self-correcting loop: inspector advises, programmer retries.
                    else:
                        self.error_count += 1
                        round = 0
                        while res and round < self.max_attempts:
                            chat_history[-1][1] = f'β Execution error, try to repair the code, attempts: {round+1}....\n'
                            yield chat_history
                            self.add_inspector_msg(code, res)
                            if round == 3:
                                # Late in the retry budget: push for a different approach
                                # instead of asking the inspector again.
                                insp_response1_content = "Try other packages or methods."
                            else:
                                insp_response1 = self.inspector._call_chat_model()
                                insp_response1_content = insp_response1.choices[0].message.content
                            self.inspector.messages.append({"role": "assistant", "content": insp_response1_content})
                            self.add_programmer_repair_msg(code, res, insp_response1_content)
                            prog_response1_content = ''
                            for message in self.programmer._call_chat_model_streaming():
                                chat_history[-1][1] += message
                                prog_response1_content += message
                                yield chat_history
                            chat_history[-1][1] += '\nπ₯οΈ Execute code...\n'
                            yield chat_history
                            self.add_programmer_msg({"role": "assistant", "content": prog_response1_content})
                            is_python, code = extract_code(prog_response1_content)
                            if is_python:
                                before_files = os.listdir(self.local_cache_dir)
                                res_type, res = self.run_code(code)
                                if res_type and res_type != 'error' or not res:  # repaired successfully
                                    self.repair_count += 1
                                    break
                            round += 1
                        if round == self.max_attempts:  # retry budget exhausted
                            return prog_response1_content + "\nSorry, I can't fix the code, can you help me to modified it or give some suggestions?"
                            #self.add_programmer_msg({"role": "user", "content": "You can not tackle the error, you should ask me to give suggestions or modify the code."})
                        # Repair succeeded: upload artifacts and stream the summary,
                        # mirroring the first-try success path above.
                        after_files = os.listdir(self.local_cache_dir)
                        cloud_cache_info = self.check_folder(before_files, after_files)
                        link_info = '\n'.join([f"![{info['file_name']}]({info['download_link']})" if info['file_name'].endswith(
                            ('.jpg', '.jpeg', '.png', '.gif')) else f"[{info['file_name']}]({info['download_link']})" for info in
                            cloud_cache_info])
                        print("res:", res)
                        chat_history[-1][1] += f"\nβ Execution result:\n{res}\n\n"
                        yield chat_history
                        self.add_programmer_msg({"role": "user", "content": RESULT_PROMPT.format(res)})
                        prog_response2 = ''
                        for message in self.programmer._call_chat_model_streaming():
                            chat_history[-1][1] += message
                            prog_response2 += message
                            yield chat_history
                        self.add_programmer_msg({"role": "assistant", "content": prog_response2})
                        chat_history[-1][1] += f"\n{link_info}"
                        yield chat_history
                else:
                    chat_history[-1][1] += "\nNo code detected or code is not python code." #todo : delete printing this
                    yield chat_history
                    final_response = prog_response1_content + "\nNo code detected or code is not python code."
                    # Record the no-code note in the last assistant message too.
                    if self.programmer.messages[-1]["role"] == "assistant":
                        self.programmer.messages[-1]["content"] = final_response
        except Exception as e:
            chat_history[-1][1] += "\nSorry, there is an error in the program, please try again."
            print(f"An error occurred: {e}")
            traceback.print_exc()
            # Keep user/assistant alternation intact by recording the failure
            # as an assistant turn.
            if self.programmer.messages[-1]["role"] == "user":
                self.programmer.messages.append({"role": "assistant", "content": f"An error occurred in program: {e}"})
            #return "Sorry, there is an error in the program, please try again."
        #return final_response
| # a previous code without streaming | |
| def run_workflow(self,function_lib: dict=None, code=None) -> object: | |
| try: | |
| if not self.my_data_cache and code is None: | |
| final_response = self.programmer._call_chat_model().choices[0].message.content | |
| self.add_programmer_msg({"role": "assistant", "content": final_response}) | |
| else: | |
| if not code: | |
| #_, _ = execute(IMPORT, self.kernel) #todo prompt of install packages | |
| prog_response1 = self.programmer._call_chat_model() | |
| prog_response1_content = prog_response1.choices[0].message.content | |
| self.add_programmer_msg({"role": "assistant", "content": prog_response1_content}) | |
| else: | |
| prog_response1_content = HUMAN_LOOP.format(code=code) | |
| self.add_programmer_msg({"role": "user", "content": prog_response1_content}) | |
| self.add_programmer_msg({"role": "assistant", "content": "I got the code, let me execute it."}) | |
| is_python, code = extract_code(prog_response1_content) | |
| print("is_python:",is_python) | |
| if is_python: | |
| before_files = os.listdir(self.local_cache_dir) | |
| res_type, res = self.run_code(code) | |
| if res_type and res_type != 'error' or not res: # no error in code not res = res is None | |
| after_files = os.listdir(self.local_cache_dir) | |
| cloud_cache_info = self.check_folder(before_files, after_files) | |
| link_info = '\n'.join([f"![{info['file_name']}]({info['download_link']})" if info['file_name'].endswith( | |
| ('.jpg', '.jpeg', '.png', '.gif')) else f"[{info['file_name']}]({info['download_link']})" for info in | |
| cloud_cache_info]) | |
| print("res:", res) | |
| self.add_programmer_msg({"role": "user", "content": RESULT_PROMPT.format(res)}) | |
| prog_response2 = self.programmer._call_chat_model().choices[0].message.content | |
| final_response = prog_response1_content + "\n" + f"Execution result:\n{res}\n\n{prog_response2}" | |
| final_response += f"\n{link_info}" | |
| #elif not res_type and res: #error occurs in code, only image | |
| else: | |
| self.error_count += 1 | |
| round = 0 | |
| while res and round < self.max_attempts: #max 5 round | |
| self.add_inspector_msg(code, res) | |
| if round == 3: | |
| insp_response1_content = "Try other packages or methods." | |
| else: | |
| insp_response1 = self.inspector._call_chat_model() | |
| insp_response1_content = insp_response1.choices[0].message.content | |
| self.inspector.messages.append({"role": "assistant", "content": insp_response1_content}) | |
| #self.add_programmer_msg(modify_msg) | |
| self.add_programmer_repair_msg(code, res, insp_response1_content) | |
| prog_response1 = self.programmer._call_chat_model() | |
| prog_response1_content = prog_response1.choices[0].message.content | |
| self.add_programmer_msg({"role": "assistant", "content": prog_response1_content}) | |
| is_python, code = extract_code(prog_response1_content) | |
| if is_python: | |
| before_files = os.listdir(self.local_cache_dir) | |
| res_type, res = self.run_code(code) | |
| if res_type or not res: # execute successfully | |
| self.repair_count += 1 | |
| break | |
| round += 1 | |
| if round == self.max_attempts: # max retry | |
| # self.add_programmer_msg({"role": "assistant", "content": "Sorry, I can't fix the code, can you help me to modified it or give some suggestions?"}) | |
| return prog_response1_content + "\nSorry, I can't fix the code, can you help me to modified it or give some suggestions?" | |
| # revision successful | |
| after_files = os.listdir(self.local_cache_dir) | |
| cloud_cache_info = self.check_folder(before_files, after_files) | |
| link_info = '\n'.join([f"![{info['file_name']}]({info['download_link']})" if info['file_name'].endswith( | |
| ('.jpg', '.jpeg', '.png', '.gif')) else f"[{info['file_name']}]({info['download_link']})" for info in | |
| cloud_cache_info]) | |
| print("res:", res) | |
| if 'head()' in code: | |
| self.messages.append({"role": "user", "content": MKD_PROMPT.format(res)}) | |
| mkd_res = self.call_chat_model(max_tokens=512).choices[0].message.content | |
| res = mkd_res | |
| self.messages.pop() | |
| self.add_programmer_msg({"role": "user", "content": RESULT_PROMPT.format(res)}) | |
| prog_response2 = self.programmer._call_chat_model().choices[0].message.content | |
| # todo: delete code in second response | |
| # prog_response2 = remove_code_blocks(prog_response2) | |
| self.add_programmer_msg({"role": "assistant", "content": prog_response2}) | |
| # final_response = prog_response1_content + "\n" + prog_response2 | |
| final_response = prog_response1_content + "\n" + f"Execution result:\n{res}\n\n{prog_response2}" | |
| final_response += f"\n{link_info}" | |
| # else: #save file, figure.. | |
| # after_files = os.listdir(self.local_cache_dir) | |
| # cloud_cache_info = self.check_folder(before_files, after_files) | |
| # link_info = '\n'.join( | |
| # [f"![{info['file_name']}]({info['download_link']})" if info['file_name'].endswith( | |
| # ( | |
| # '.jpg', '.jpeg', '.png', '.gif')) else f"[{info['file_name']}]({info['download_link']})" | |
| # for info in | |
| # cloud_cache_info]) | |
| # print("res:", res) | |
| # self.add_programmer_msg({"role": "system", "content": FIG_PROMPT}) | |
| # prog_response2 = self.programmer._call_chat_model().choices[0].message.content | |
| # final_response = prog_response1_content + "\n" + prog_response2 | |
| # final_response += f"\n{link_info}" | |
| else: | |
| final_response = prog_response1_content + "\n" + "No code detected or code is not python code." | |
| if self.programmer.messages[-1]["role"] == "assistant": | |
| self.programmer.messages[-1]["content"] = final_response | |
| except Exception as e: | |
| print(f"An error occurred: {e}") | |
| traceback.print_exc() | |
| if self.programmer.messages[-1]["role"] == "user": | |
| self.programmer.messages.append({"role": "assistant", "content": f"An error occurred in program: {e}"}) | |
| return "Sorry, there is an error in the program, please try again." | |
| return final_response | |
| # function_lib = { | |
| # | |
| # } | |