Spaces:
Paused
Paused
| from dataclasses import dataclass | |
| import os, json, contextlib, subprocess, ast, shlex | |
| from io import StringIO | |
| import time | |
| from typing import Literal | |
| from python.helpers import files, messages | |
| from agent import Agent | |
| from python.helpers.tool import Tool, Response | |
| from python.helpers import files | |
| from python.helpers.print_style import PrintStyle | |
| from python.helpers.shell_local import LocalInteractiveSession | |
| from python.helpers.shell_ssh import SSHInteractiveSession | |
| from python.helpers.docker import DockerContainerManager | |
| class State: | |
| shell: LocalInteractiveSession | SSHInteractiveSession | |
| docker: DockerContainerManager | None | |
| class CodeExecution(Tool): | |
| def execute(self,**kwargs): | |
| self.prepare_state() | |
| # os.chdir(files.get_abs_path("./work_dir")) #change CWD to work_dir | |
| runtime = self.args["runtime"].lower().strip() | |
| if runtime == "python": | |
| response = self.execute_python_code(self.args["code"]) | |
| elif runtime == "nodejs": | |
| response = self.execute_nodejs_code(self.args["code"]) | |
| elif runtime == "terminal": | |
| response = self.execute_terminal_command(self.args["code"]) | |
| else: | |
| response = files.read_file("./prompts/fw.code_runtime_wrong.md", runtime=runtime) | |
| if not response: response = files.read_file("./prompts/fw.code_no_output.md") | |
| return Response(message=response, break_loop=False) | |
| def after_execution(self, response, **kwargs): | |
| msg_response = files.read_file("./prompts/fw.tool_response.md", tool_name=self.name, tool_response=response.message) | |
| self.agent.append_message(msg_response, human=True) | |
| def prepare_state(self): | |
| self.state = self.agent.get_data("cot_state") | |
| if not self.state: | |
| #initialize docker container if execution in docker is configured | |
| if self.agent.config.code_exec_docker_enabled: | |
| docker = DockerContainerManager(name=self.agent.config.code_exec_docker_name, image=self.agent.config.code_exec_docker_image, ports=self.agent.config.code_exec_docker_ports, volumes=self.agent.config.code_exec_docker_volumes) | |
| docker.start_container() | |
| else: docker = None | |
| #initialize local or remote interactive shell insterface | |
| if self.agent.config.code_exec_ssh_enabled: | |
| shell = SSHInteractiveSession(self.agent.config.code_exec_ssh_addr,self.agent.config.code_exec_ssh_port,self.agent.config.code_exec_ssh_user,self.agent.config.code_exec_ssh_pass) | |
| else: shell = LocalInteractiveSession() | |
| self.state = State(shell=shell,docker=docker) | |
| shell.connect() | |
| self.agent.set_data("cot_state", self.state) | |
| def execute_python_code(self, code): | |
| escaped_code = shlex.quote(code) | |
| command = f'python3 -c {escaped_code}' | |
| return self.terminal_session(command) | |
| def execute_nodejs_code(self, code): | |
| escaped_code = shlex.quote(code) | |
| command = f'node -e {escaped_code}' | |
| return self.terminal_session(command) | |
| def execute_terminal_command(self, command): | |
| return self.terminal_session(command) | |
| def terminal_session(self, command): | |
| self.state.shell.send_command(command) | |
| PrintStyle(background_color="white",font_color="#85C1E9",bold=True).print(f"{self.agent.agent_name} code execution output:") | |
| idle=0 | |
| while True: | |
| time.sleep(0.1) # Wait for some output to be generated | |
| full_output, partial_output = self.state.shell.read_output() | |
| if partial_output: | |
| PrintStyle(font_color="#85C1E9").stream(partial_output) | |
| idle=0 | |
| else: | |
| idle+=1 | |
| if ( full_output and idle > 30 ) or ( not full_output and idle > 100 ): return full_output | |