import os
import subprocess
import time
from typing import Dict, Any

from flow_modules.Tachi67.InterpreterFlowModule import InterpreterAtomicFlow


class ExecuteCodeAtomicFlow(InterpreterAtomicFlow):
    """Open a temporary code file in an editor, then execute its code section.

    The flow waits for the editor process to exit, extracts the code found
    between the ``# Code:`` and ``############`` marker lines of the file,
    hands it to the inherited interpreter machinery and finally removes the
    temporary file.

    NOTE(review): ``_process_input_data`` and ``_call`` are inherited from
    ``InterpreterAtomicFlow`` and are not visible here; presumably they read
    ``input_data["code"]`` / ``input_data["language"]`` and run the code --
    confirm against the parent class.
    """

    def _prepare_code(self, input_data: Dict[str, Any]):
        """Extract the code section of the temp file into ``input_data["code"]``.

        Lines are collected starting after the first line whose stripped
        content equals ``# Code:`` and stopping at the first line whose
        stripped content equals ``############``. Marker lines themselves are
        never included. If no start marker is found the code is ``""``.

        Args:
            input_data: Must contain ``"temp_code_file_location"``; it is
                mutated in place to receive the ``"code"`` entry.
        """
        file_location = input_data["temp_code_file_location"]
        # Strip the markers once instead of on every line of the file.
        start_marker = "# Code:\n".strip()
        end_marker = "############".strip()
        code_started = False
        code_lines = []
        with open(file_location, 'r') as file:
            for line in file:
                stripped = line.strip()
                if stripped == start_marker:
                    code_started = True
                    continue
                # The end marker stops the scan even if no start marker
                # has been seen yet.
                if stripped == end_marker:
                    break
                if code_started:
                    code_lines.append(line)
        # Join once rather than growing a string line by line.
        input_data["code"] = "".join(code_lines)

    def _check_input(self, input_data: Dict[str, Any]):
        """Assert that the keys required by :meth:`run` are present.

        Raises:
            AssertionError: If ``"temp_code_file_location"`` or ``"language"``
                is missing from ``input_data``.
        """
        assert "temp_code_file_location" in input_data, "temp_code_file_location not passed to ExecuteCodeAtomicFlow"
        assert "language" in input_data, "language not passed to ExecuteCodeAtomicFlow"

    def _delete_file(self, file_location):
        """Remove ``file_location`` if it exists; do nothing otherwise."""
        if os.path.exists(file_location):
            os.remove(file_location)

    def _open_file_and_wait_for_upd(self, file_location):
        """Open the file with ``code --wait`` and block until that process exits.

        ``code`` is presumably the VS Code command-line launcher; it must be
        resolvable on ``PATH`` or ``subprocess.Popen`` raises.
        """
        process = subprocess.Popen(["code", "--wait", file_location])
        # Block on the child directly instead of polling once per second.
        process.wait()

    def run(
            self,
            input_data: Dict[str, Any]):
        """Let the user edit the temp file, run its code and return the result.

        Args:
            input_data: Must contain ``"temp_code_file_location"`` and
                ``"language"``. The extracted code is stored back into it
                under ``"code"``.

        Returns:
            A dict with ``"interpreter_output"`` (the value returned by the
            inherited ``_call()``) and ``"code_ran"`` (the extracted code).

        Raises:
            AssertionError: If a required key is missing from ``input_data``.
        """
        self._check_input(input_data)
        file_loc = input_data["temp_code_file_location"]
        self._open_file_and_wait_for_upd(file_loc)
        try:
            self._prepare_code(input_data)
            self._process_input_data(input_data)
            execution_output = self._call()
        finally:
            # Clean up the temporary file even when extraction or execution
            # raises, so failed runs do not leave stale files behind.
            self._delete_file(file_loc)
        response = {"interpreter_output": execution_output, "code_ran": input_data['code']}
        return response