Spaces:
Running
Running
| import re | |
| from ..subprocess_code_interpreter import SubprocessCodeInterpreter | |
class R(SubprocessCodeInterpreter):
    """Interpreter front-end that runs code through an ``R`` subprocess.

    The class instruments submitted code with marker lines (active line,
    error, end of execution), and filters the subprocess output so that only
    meaningful result lines are passed on.
    """

    file_extension = 'r'
    proper_name = 'R'

    # Markers written to R's output by the wrapper built in preprocess_code().
    # They live in one place so the emitting and detecting sides cannot drift
    # apart (previously the end marker was emitted with spaces but searched
    # for without them, so a successful run was never detected as finished).
    _END_MARKER = '## end_of_execution ##'
    _ERROR_MARKER = '## execution_error ##'
    # Compact spelling that detect_end_of_execution() used to look for; still
    # accepted so existing behaviour is a strict subset of the new one.
    _LEGACY_END_MARKER = '##end_of_execution##'

    # Lines consisting only of a prompt / continuation prompt / whitespace.
    _PROMPT_ONLY_RE = re.compile(
        r'^(\s*>>>\s*|\s*\.\.\.\s*|\s*>\s*|\s*\+\s*|\s*)$')
    # Active-line marker; only digits are captured so int() cannot fail.
    _ACTIVE_LINE_RE = re.compile(r'##active_line(\d+)')

    # Number of upcoming output lines to discard as echoed input. The class
    # level default means line_postprocessor() works before any code was run.
    code_line_count = 0

    def __init__(self):
        super().__init__()
        self.start_cmd = 'R -q --vanilla'  # Start R in quiet and vanilla mode

    def preprocess_code(self, code):
        """Instrument ``code`` before it is sent to the R process.

        * Prefix every source line with a ``cat`` call that prints an
          ``##active_line<N>##`` marker (N is 1-based).
        * Wrap everything in ``tryCatch`` so an error prints the error marker
          followed by the condition message.
        * Append a ``cat`` call that prints the end-of-execution marker.

        Side effect: sets ``self.code_line_count`` to the number of lines in
        the generated code (minus one) so that line_postprocessor() can skip
        the echo of that code.

        Returns:
            The instrumented R code as a single string.
        """
        body = '\n'.join(
            f'cat("##active_line{number}##\\n");{source_line}'
            for number, source_line in enumerate(code.split('\n'), 1)
        )

        # Wrap in a tryCatch for error handling and add end of execution marker
        processed_code = f"""
tryCatch({{
{body}
}}, error=function(e){{
    cat("{self._ERROR_MARKER}\\n", conditionMessage(e), "\\n");
}})
cat("{self._END_MARKER}\\n");
"""

        # R echoes the submitted code back, so remember how many lines to
        # drop from the output stream.
        self.code_line_count = len(processed_code.split('\n')) - 1
        return processed_code

    def line_postprocessor(self, line):
        """Filter or clean one line of subprocess output.

        Returns:
            ``None`` when the line should be discarded (echoed input, bare
            prompts, the startup banner); otherwise the line, with R's
            ``[1]`` prefix and surrounding string quotes removed when present.
        """
        # Still inside the echo of the code we just submitted: skip it.
        if self.code_line_count > 0:
            self.code_line_count -= 1
            return None

        if self._PROMPT_ONLY_RE.match(line):
            return None

        if 'R version' in line:  # Startup message
            return None

        # Work on the stripped text so a trailing newline or leading
        # whitespace cannot defeat the checks or shift the slices below.
        stripped = line.strip()

        # For strings, trim the prefix and the quotation marks. The length
        # check ensures the opening and closing quote are distinct characters.
        if (len(stripped) > 5 and stripped.startswith('[1] "')
                and stripped.endswith('"')):
            return stripped[5:-1].strip()

        # Normal R output prefix for non-string outputs
        if stripped.startswith('[1]'):
            return stripped[3:].strip()

        return line

    def detect_active_line(self, line):
        """Return the line number from an active-line marker, else ``None``.

        A marker that is not followed by digits is ignored instead of raising
        ``ValueError``.
        """
        found = self._ACTIVE_LINE_RE.search(line)
        if found is None:
            return None
        return int(found.group(1))

    def detect_end_of_execution(self, line):
        """Return True if ``line`` carries the end or error marker."""
        finished_markers = (
            self._END_MARKER,
            self._LEGACY_END_MARKER,
            self._ERROR_MARKER,
        )
        return any(marker in line for marker in finished_markers)