Spaces:
Running
Running
| import ast | |
| import os | |
| import re | |
| import shlex | |
| import sys | |
| from ..subprocess_code_interpreter import SubprocessCodeInterpreter | |
| class Python(SubprocessCodeInterpreter): | |
| file_extension = 'py' | |
| proper_name = 'Python' | |
| def __init__(self): | |
| super().__init__() | |
| executable = sys.executable | |
| if os.name != 'nt': # not Windows | |
| executable = shlex.quote(executable) | |
| self.start_cmd = executable + ' -i -q -u' | |
| def preprocess_code(self, code): | |
| return preprocess_python(code) | |
| def line_postprocessor(self, line): | |
| if re.match(r'^(\s*>>>\s*|\s*\.\.\.\s*)', line): | |
| return None | |
| return line | |
| def detect_active_line(self, line): | |
| if '##active_line' in line: | |
| return int(line.split('##active_line')[1].split('##')[0]) | |
| return None | |
| def detect_end_of_execution(self, line): | |
| return '##end_of_execution##' in line | |
| def preprocess_python(code): | |
| """ | |
| Add active line markers | |
| Wrap in a try except | |
| Add end of execution marker | |
| """ | |
| # Add print commands that tell us what the active line is | |
| code = add_active_line_prints(code) | |
| # Wrap in a try except | |
| code = wrap_in_try_except(code) | |
| # Remove any whitespace lines, as this will break indented blocks | |
| # (are we sure about this? test this) | |
| code_lines = code.split('\n') | |
| code_lines = [c for c in code_lines if c.strip() != ''] | |
| code = '\n'.join(code_lines) | |
| # Add end command (we'll be listening for this so we know when it ends) | |
| code += '\n\nprint("##end_of_execution##")' | |
| return code | |
| def add_active_line_prints(code): | |
| """ | |
| Add print statements indicating line numbers to a python string. | |
| """ | |
| tree = ast.parse(code) | |
| transformer = AddLinePrints() | |
| new_tree = transformer.visit(tree) | |
| return ast.unparse(new_tree) | |
| class AddLinePrints(ast.NodeTransformer): | |
| """ | |
| Transformer to insert print statements indicating the line number | |
| before every executable line in the AST. | |
| """ | |
| def insert_print_statement(self, line_number): | |
| """Inserts a print statement for a given line number.""" | |
| return ast.Expr( | |
| value=ast.Call( | |
| func=ast.Name(id='print', ctx=ast.Load()), | |
| args=[ast.Constant(value=f'##active_line{line_number}##')], | |
| keywords=[], | |
| )) | |
| def process_body(self, body): | |
| """Processes a block of statements, adding print calls.""" | |
| new_body = [] | |
| # In case it's not iterable: | |
| if not isinstance(body, list): | |
| body = [body] | |
| for sub_node in body: | |
| if hasattr(sub_node, 'lineno'): | |
| new_body.append(self.insert_print_statement(sub_node.lineno)) | |
| new_body.append(sub_node) | |
| return new_body | |
| def visit(self, node): | |
| """Overridden visit to transform nodes.""" | |
| new_node = super().visit(node) | |
| # If node has a body, process it | |
| if hasattr(new_node, 'body'): | |
| new_node.body = self.process_body(new_node.body) | |
| # If node has an orelse block (like in for, while, if), process it | |
| if hasattr(new_node, 'orelse') and new_node.orelse: | |
| new_node.orelse = self.process_body(new_node.orelse) | |
| # Special case for Try nodes as they have multiple blocks | |
| if isinstance(new_node, ast.Try): | |
| for handler in new_node.handlers: | |
| handler.body = self.process_body(handler.body) | |
| if new_node.finalbody: | |
| new_node.finalbody = self.process_body(new_node.finalbody) | |
| return new_node | |
| def wrap_in_try_except(code): | |
| # Add import traceback | |
| code = 'import traceback\n' + code | |
| # Parse the input code into an AST | |
| parsed_code = ast.parse(code) | |
| # Wrap the entire code's AST in a single try-except block | |
| try_except = ast.Try( | |
| body=parsed_code.body, | |
| handlers=[ | |
| ast.ExceptHandler( | |
| type=ast.Name(id='Exception', ctx=ast.Load()), | |
| name=None, | |
| body=[ | |
| ast.Expr( | |
| value=ast.Call( | |
| func=ast.Attribute( | |
| value=ast.Name(id='traceback', ctx=ast.Load()), | |
| attr='print_exc', | |
| ctx=ast.Load(), | |
| ), | |
| args=[], | |
| keywords=[], | |
| )), | |
| ], | |
| ) | |
| ], | |
| orelse=[], | |
| finalbody=[], | |
| ) | |
| # Assign the try-except block as the new body | |
| parsed_code.body = [try_except] | |
| # Convert the modified AST back to source code | |
| return ast.unparse(parsed_code) | |