Spaces:
Runtime error
Runtime error
| import queue | |
| # import streamlit as st | |
| import base64 | |
| from io import BytesIO | |
| import re | |
| import os | |
| import jupyter_client | |
| from PIL import Image | |
| from subprocess import PIPE | |
| from pprint import pprint | |
| import nbformat as nbf | |
| IPYKERNEL = os.environ.get('IPYKERNEL', 'dsagent') | |
| # client = get_client() | |
| class CodeKernel(object): | |
| def __init__(self, | |
| kernel_name='kernel', | |
| kernel_id=None, | |
| kernel_config_path="", | |
| python_path=None, | |
| ipython_path=None, | |
| init_file_path="./startup.py", | |
| verbose=1, | |
| max_exe_time=6000): | |
| self.kernel_name = kernel_name | |
| self.kernel_id = kernel_id | |
| self.kernel_config_path = kernel_config_path | |
| self.python_path = python_path | |
| self.ipython_path = ipython_path | |
| self.init_file_path = init_file_path | |
| self.executed_cells = [] | |
| self.verbose = verbose | |
| self.interrupt_signal = False | |
| self.max_exe_time = max_exe_time | |
| if python_path is None and ipython_path is None: | |
| env = None | |
| else: | |
| env = {"PATH": self.python_path + ":$PATH", "PYTHONPATH": self.python_path} | |
| # Initialize the backend kernel | |
| self.kernel_manager = jupyter_client.KernelManager(kernel_name=IPYKERNEL, | |
| connection_file=self.kernel_config_path, | |
| exec_files=[self.init_file_path], | |
| env=env) | |
| if self.kernel_config_path: | |
| self.kernel_manager.load_connection_file() | |
| self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE) | |
| print("Backend kernel started with the configuration: {}".format( | |
| self.kernel_config_path)) | |
| else: | |
| self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE) | |
| print("Backend kernel started with the configuration: {}".format( | |
| self.kernel_manager.connection_file)) | |
| if verbose: | |
| pprint(self.kernel_manager.get_connection_info()) | |
| # Initialize the code kernel | |
| self.kernel = self.kernel_manager.blocking_client() | |
| # self.kernel.load_connection_file() | |
| self.kernel.start_channels() | |
| print("Code kernel started.") | |
| def execute(self, code): | |
| self.kernel.execute(code) | |
| try: | |
| shell_msg = self.kernel.get_shell_msg(timeout=self.max_exe_time) | |
| io_msg_content = self.kernel.get_iopub_msg(timeout=self.max_exe_time)['content'] | |
| msg_out_list = [] | |
| while True: | |
| msg_out = io_msg_content | |
| if 'name' in msg_out and msg_out['name'] != 'stderr': | |
| msg_out_list.append(msg_out) | |
| elif 'data' in msg_out and 'image/png' in msg_out['data']: | |
| msg_out_list.append(msg_out) | |
| ### Poll the message | |
| try: | |
| io_msg_content = self.kernel.get_iopub_msg(timeout=self.max_exe_time)['content'] | |
| if 'execution_state' in io_msg_content and io_msg_content['execution_state'] == 'idle': | |
| msg_out_list.append(msg_out) | |
| break | |
| except queue.Empty: | |
| break | |
| # msg_out = '\n'.join(msg_out_list) | |
| return shell_msg, msg_out_list | |
| except Exception as e: | |
| print(e) | |
| return None | |
| def execute_code_(self, code): | |
| msg_id = self.kernel.execute(code) | |
| # Get the output of the code | |
| msg_list = [] | |
| while True: | |
| try: | |
| iopub_msg = self.kernel.get_iopub_msg(timeout=self.max_exe_time) | |
| msg_list.append(iopub_msg) | |
| if iopub_msg['msg_type'] == 'status' and iopub_msg['content'].get('execution_state') == 'idle': | |
| break | |
| except: | |
| if self.interrupt_signal: | |
| self.kernel_manager.interrupt_kernel() | |
| self.interrupt_signal = False | |
| continue | |
| all_output = [] | |
| for iopub_msg in msg_list: | |
| if iopub_msg['msg_type'] == 'stream': | |
| if iopub_msg['content'].get('name') == 'stdout': | |
| output = iopub_msg['content']['text'] | |
| all_output.append(('stdout', output)) | |
| elif iopub_msg['msg_type'] == 'execute_result': | |
| if 'data' in iopub_msg['content']: | |
| if 'text/plain' in iopub_msg['content']['data']: | |
| output = iopub_msg['content']['data']['text/plain'] | |
| all_output.append(('execute_result_text', output)) | |
| if 'text/html' in iopub_msg['content']['data']: | |
| output = iopub_msg['content']['data']['text/html'] | |
| all_output.append(('execute_result_html', output)) | |
| if 'image/png' in iopub_msg['content']['data']: | |
| output = iopub_msg['content']['data']['image/png'] | |
| all_output.append(('execute_result_png', output)) | |
| if 'image/jpeg' in iopub_msg['content']['data']: | |
| output = iopub_msg['content']['data']['image/jpeg'] | |
| all_output.append(('execute_result_jpeg', output)) | |
| elif iopub_msg['msg_type'] == 'display_data': | |
| if 'data' in iopub_msg['content']: | |
| if 'text/plain' in iopub_msg['content']['data']: | |
| output = iopub_msg['content']['data']['text/plain'] | |
| all_output.append(('display_text', output)) | |
| if 'text/html' in iopub_msg['content']['data']: | |
| output = iopub_msg['content']['data']['text/html'] | |
| all_output.append(('display_html', output)) | |
| if 'image/png' in iopub_msg['content']['data']: | |
| output = iopub_msg['content']['data']['image/png'] | |
| all_output.append(('display_png', output)) | |
| if 'image/jpeg' in iopub_msg['content']['data']: | |
| output = iopub_msg['content']['data']['image/jpeg'] | |
| all_output.append(('display_jpeg', output)) | |
| elif iopub_msg['msg_type'] == 'error': | |
| if 'traceback' in iopub_msg['content']: | |
| output = '\n'.join(iopub_msg['content']['traceback']) | |
| all_output.append(('error', output)) | |
| return all_output | |
| def export(self, file_path): | |
| nb = nbf.v4.new_notebook() | |
| nb.cells = self.executed_cells | |
| print("cell: ",nb.cells) | |
| with open(file_path, 'w', encoding='utf-8') as f: | |
| nbf.write(nb, f) | |
| print(f"Notebook exported to {file_path}") | |
| def execute_interactive(self, code, verbose=False): | |
| shell_msg = self.kernel.execute_interactive(code) | |
| if shell_msg is queue.Empty: | |
| if verbose: | |
| print("Timeout waiting for shell message.") | |
| self.check_msg(shell_msg, verbose=verbose) | |
| return shell_msg | |
| def inspect(self, code, verbose=False): | |
| msg_id = self.kernel.inspect(code) | |
| shell_msg = self.kernel.get_shell_msg(timeout=30) | |
| if shell_msg is queue.Empty: | |
| if verbose: | |
| print("Timeout waiting for shell message.") | |
| self.check_msg(shell_msg, verbose=verbose) | |
| return shell_msg | |
| def get_error_msg(self, msg, verbose=False) -> str | None: | |
| if msg['content']['status'] == 'error': | |
| try: | |
| error_msg = msg['content']['traceback'] | |
| except: | |
| try: | |
| error_msg = msg['content']['traceback'][-1].strip() | |
| except: | |
| error_msg = "Traceback Error" | |
| if verbose: | |
| print("Error: ", error_msg) | |
| return error_msg | |
| return None | |
| def check_msg(self, msg, verbose=False): | |
| status = msg['content']['status'] | |
| if status == 'ok': | |
| if verbose: | |
| print("Execution succeeded.") | |
| elif status == 'error': | |
| for line in msg['content']['traceback']: | |
| if verbose: | |
| print(line) | |
| def shutdown(self): | |
| # Shutdown the backend kernel | |
| self.kernel_manager.shutdown_kernel(now=True) | |
| print("Backend kernel shutdown.") | |
| # Shutdown the code kernel | |
| self.kernel.shutdown() | |
| print("Code kernel shutdown.") | |
| def restart(self): | |
| # Restart the backend kernel | |
| self.kernel_manager.restart_kernel() | |
| print("Backend kernel restarted.") | |
| def start(self): | |
| # Initialize the code kernel | |
| self.kernel = self.kernel_manager.blocking_client() | |
| # self.kernel.load_connection_file() | |
| self.kernel.start_channels() | |
| print("Code kernel started.") | |
| def interrupt(self): | |
| # Interrupt the backend kernel | |
| self.kernel_manager.interrupt_kernel() | |
| print("Backend kernel interrupted.") | |
| def is_alive(self): | |
| return self.kernel.is_alive() | |
| def b64_2_img(data): | |
| buff = BytesIO(base64.b64decode(data)) | |
| return Image.open(buff) | |
| def clean_ansi_codes(input_string): | |
| ansi_escape = re.compile(r'(\x9B|\x1B\[|\u001b\[)[0-?]*[ -/]*[@-~]') | |
| return ansi_escape.sub('', input_string) | |
| def execute(code, kernel: CodeKernel) -> tuple[str, str | Image.Image]: | |
| res = "" | |
| res_type = None | |
| # code = code.replace("<|observation|>", "") | |
| # code = code.replace("<|assistant|>interpreter", "") | |
| # code = code.replace("<|assistant|>", "") | |
| # code = code.replace("<|user|>", "") | |
| # code = code.replace("<|system|>", "") | |
| msg, output_list = kernel.execute(code) | |
| if msg['metadata']['status'] == "timeout": | |
| return res_type, 'Timed out' | |
| elif msg['metadata']['status'] == 'error': | |
| return res_type, clean_ansi_codes('\n'.join(kernel.get_error_msg(msg, verbose=True))) #todo: change the res_type to error | |
| if len(output_list) > 1: | |
| res_list = [] | |
| for output in output_list: | |
| if 'text' in output: | |
| res_type = "text" | |
| res = output['text'] | |
| elif 'data' in output: | |
| for key in output['data']: | |
| if 'text/plain' in key: | |
| res_type = "text" | |
| res = output['data'][key] | |
| elif 'image/png' in key: | |
| res_type = "image" | |
| res = output['data'][key] | |
| break | |
| if res_type == "image": | |
| #return res_type, b64_2_img(res) | |
| res_list.append(b64_2_img(res)) | |
| elif res_type == "text" or res_type == "traceback": #traceback is error? | |
| #res = res | |
| res_list.append(res) | |
| if len(res_list) > 1 and res_list[-1] == res_list[-2]: | |
| res_list.pop() | |
| res = '\n'.join(res_list) | |
| return res_type, res | |
| else: | |
| output = output_list[0] | |
| if 'text' in output: #output is a dict, what situation is 'text' in output? | |
| res_type = "text" | |
| res = output['text'] | |
| elif 'data' in output: | |
| for key in output['data']: | |
| if 'text/plain' in key: #key = 'text/plain' | |
| res_type = "text" | |
| res = output['data'][key] | |
| elif 'image/png' in key: # image will be present as html file? | |
| res_type = "image" | |
| res = output['data'][key] | |
| break | |
| if res_type == "image": | |
| return res_type, b64_2_img(res) | |
| elif res_type == "text" or res_type == "traceback": # traceback is error? | |
| res = res | |
| return res_type, res | |
| # @st.cache_resource | |
| def get_kernel(): | |
| kernel = CodeKernel() | |
| return kernel | |
| if __name__ == '__main__': | |
| kernel = CodeKernel() | |
| table_code = """ | |
| import pandas as pd | |
| data = pd.read_csv('/Users/stephensun/Desktop/pypro/LAMBDA/cache/conv_cache/c1829f26-f6b3-4cbc-96a7-88aef2c33df2-2024-08-13/wine.csv') | |
| data.head() | |
| """ | |
| img_code = """ | |
| import matplotlib.pyplot as plt | |
| x = [1, 2, 3, 4, 5] | |
| y = [2, 3, 5, 7, 6] | |
| plt.plot(x, y) | |
| plt.title('Simple Line Plot') | |
| plt.xlabel('X-axis') | |
| plt.ylabel('Y-axis') | |
| plt.show() | |
| """ | |
| res_type, res = execute(img_code, kernel) | |
| print(res_type,res) | |
| # kernel.export("my_notebook.ipynb") | |