# ml_module/core/sandbox_runner.py
"""
Phase 5: Secure Code Sandbox Runner

This module provides a secure sandbox environment for executing user-edited training code.
It implements strict security controls to prevent malicious code execution while allowing
legitimate machine learning operations.

Security Features:
- Subprocess isolation with resource limits
- Import validation (allowlist only)
- Filesystem access restrictions
- Network access blocking
- Time and memory limits
- Code validation and sanitization
"""

import ast
import json
import os
import sys
import subprocess
import tempfile
import time
import signal
import logging
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, List
from contextlib import contextmanager
import traceback

# Conditional import for resource module (Unix only)
try:
    import resource
    HAS_RESOURCE = True
except ImportError:
    HAS_RESOURCE = False

from ml_module.core.exceptions import SandboxExecutionException, SecurityViolationException

logger = logging.getLogger(__name__)

# Warn about resource module if not available
if not HAS_RESOURCE:
    logger.warning("resource module not available (Windows), resource limits will be skipped")


class CodeSandboxRunner:
    """
    Secure sandbox runner for executing user-provided training code.

    The runner statically validates the submitted code (imports, a few
    dangerous builtins, suspicious path literals), wraps it in a generated
    script, and runs that script in a child Python process with a timeout
    and, where the ``resource`` module exists, memory/CPU limits.

    NOTE(review): the static checks are a first line of defence only.
    ``os``, ``sys`` and ``pickle`` are on the allowlist, so validated code can
    still reach the filesystem and spawn processes through them; confirm
    that OS-level isolation exists around this runner before relying on it.
    """

    # Allowlisted imports for security
    ALLOWED_IMPORTS = {
        'sklearn', 'sklearn.ensemble', 'sklearn.linear_model', 'sklearn.model_selection',
        'sklearn.metrics', 'sklearn.preprocessing', 'sklearn.pipeline',
        'pandas', 'pd', 'numpy', 'np', 'joblib', 'pickle',
        'os', 'sys', 'pathlib', 'Path', 'json', 'datetime', 'time',
        'math', 'random', 'warnings', 'logging'
    }

    # Blocked imports for security
    BLOCKED_IMPORTS = {
        'subprocess', 'socket', 'urllib', 'requests', 'http', 'ftplib',
        'smtplib', 'imaplib', 'poplib', 'telnetlib', 'paramiko', 'fabric',
        'exec', 'eval', 'compile', 'importlib', '__import__',
        'multiprocessing', 'threading', 'concurrent'
    }

    # Resource limits
    MAX_EXECUTION_TIME = 300  # 5 minutes
    MAX_MEMORY_MB = 1024  # 1GB
    MAX_FILE_SIZE_MB = 100  # 100MB per file

    # Builtins that must never be called by name from user code
    _DANGEROUS_CALLS = frozenset({'exec', 'eval', 'compile', '__import__'})

    # Substrings (lower-case) that mark a string literal as a suspicious path
    _SUSPICIOUS_PATH_MARKERS = ('/etc/', '/root/', '/home/', 'c:\\')

    def __init__(self, project_path: str, user_id: str, project_id: str):
        """
        Initialize the sandbox runner.

        Args:
            project_path: Base path for project files (restricted filesystem access)
            user_id: User identifier for logging and security
            project_id: Project identifier for logging and security
        """
        self.project_path = Path(project_path)
        self.user_id = user_id
        self.project_id = project_id
        # Second-resolution timestamp: two runners created for the same
        # user/project within one second share an execution_id.
        self.execution_id = f"{user_id}_{project_id}_{int(time.time())}"

    def validate_code_security(self, code: str) -> None:
        """
        Validate code for security violations before execution.

        The code is parsed (never executed) and every AST node is checked for
        blocked/unauthorized imports, direct calls to ``exec``/``eval``/
        ``compile``/``__import__``, and string literals that look like paths
        into system or home directories.

        Args:
            code: The Python code to validate

        Raises:
            SecurityViolationException: If code contains security violations,
                does not parse, or validation itself fails unexpectedly
        """
        try:
            # Parse the code into an AST for analysis
            tree = ast.parse(code)

            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        self._check_import(alias.name)

                elif isinstance(node, ast.ImportFrom):
                    # node.module is None for purely relative imports
                    # ("from . import x"); those are not checked here.
                    if node.module:
                        self._check_import(node.module)

                elif isinstance(node, ast.Call):
                    if isinstance(node.func, ast.Name) and node.func.id in self._DANGEROUS_CALLS:
                        raise SecurityViolationException(
                            f"Dangerous function call: {node.func.id}",
                            "dangerous_function"
                        )

                # String literals are ast.Constant nodes; the legacy ast.Str
                # alias is deprecated since 3.8 and removed in Python 3.14.
                elif isinstance(node, ast.Constant) and isinstance(node.value, str):
                    lowered = node.value.lower()
                    if any(marker in lowered for marker in self._SUSPICIOUS_PATH_MARKERS):
                        raise SecurityViolationException(
                            f"Suspicious file path detected: {node.value}",
                            "file_access_violation"
                        )

            logger.info(f"Code validation passed for execution {self.execution_id}")

        except SecurityViolationException:
            raise
        except SyntaxError as e:
            raise SecurityViolationException(
                f"Code contains syntax errors: {e}", "syntax_error"
            ) from e
        except Exception as e:
            # Callers only ever see SecurityViolationException from validation
            raise SecurityViolationException(
                f"Code validation failed: {e}", "validation_error"
            ) from e

    def _check_import(self, module_name: str) -> None:
        """
        Apply the blocklist and allowlist to one imported module name.

        Used for both ``import x`` and ``from x import y`` so the two forms
        are judged by the same rules.

        Args:
            module_name: Dotted module name as written in the import statement

        Raises:
            SecurityViolationException: If the module is blocked or not allowlisted
        """
        # Block by top-level package too, so "urllib.request" is reported as
        # blocked rather than merely unauthorized.
        top_level = module_name.split('.', 1)[0]
        if module_name in self.BLOCKED_IMPORTS or top_level in self.BLOCKED_IMPORTS:
            raise SecurityViolationException(
                f"Blocked import detected: {module_name}",
                "import_violation"
            )

        if module_name not in self.ALLOWED_IMPORTS and not self._is_allowed_submodule(module_name):
            raise SecurityViolationException(
                f"Unauthorized import: {module_name}. Only sklearn, pandas, numpy, and joblib are allowed.",
                "unauthorized_import"
            )

    def _is_allowed_submodule(self, module_name: str) -> bool:
        """
        Check if a module is an allowed package or a dotted submodule of one.

        Matching is done on package boundaries: ``os`` and ``os.path`` match
        the ``os`` prefix, ``osquery`` does not.

        Args:
            module_name: Dotted module name to check

        Returns:
            True if the module is one of the allowed packages or lives under one
        """
        allowed_prefixes = ('sklearn', 'pandas', 'numpy', 'joblib', 'os', 'sys', 'json', 'pathlib')
        return any(
            module_name == prefix or module_name.startswith(prefix + '.')
            for prefix in allowed_prefixes
        )

    def execute_code(
        self,
        code: str,
        input_data_path: str,
        output_paths: Dict[str, str]
    ) -> Dict[str, Any]:
        """
        Execute user-provided training code in a secure sandbox.

        Args:
            code: The Python code to execute
            input_data_path: Path to input data file (relative to project path)
            output_paths: Dictionary of output file paths (relative to project path)

        Returns:
            Dict containing execution results, metrics, and output paths

        Raises:
            SandboxExecutionException: If execution fails
            SecurityViolationException: If security violations are detected
        """
        # Validate code security first
        self.validate_code_security(code)

        # The generated script lives in a throwaway directory that is removed
        # when the context manager exits.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            script_path = temp_path / f"execute_{self.execution_id}.py"
            execution_code = self._prepare_execution_code(code, input_data_path, output_paths)

            # Python reads source files as UTF-8 by default, so write UTF-8
            # explicitly instead of the locale encoding.
            with open(script_path, 'w', encoding='utf-8') as f:
                f.write(execution_code)

            # Execute in subprocess with resource limits
            return self._execute_in_subprocess(script_path, temp_path)

    def _resolve_within_project(self, project_root: Path, relative_path: str, label: str) -> Path:
        """
        Resolve ``relative_path`` under ``project_root`` and reject escapes.

        Args:
            project_root: Already-resolved absolute project directory
            relative_path: Caller-supplied path, expected to be relative to the project
            label: "Input" or "Output"; used as the first word of the error message

        Returns:
            The resolved absolute path

        Raises:
            SecurityViolationException: If the resolved path is not inside ``project_root``
        """
        candidate = (project_root / relative_path).resolve()
        try:
            # Component-wise containment: a plain string prefix test would
            # accept a sibling such as "/data/proj_evil" for "/data/proj".
            candidate.relative_to(project_root)
        except ValueError:
            raise SecurityViolationException(
                f"{label} path outside project directory: {relative_path}",
                "path_violation"
            ) from None
        return candidate

    def _prepare_execution_code(
        self,
        user_code: str,
        input_data_path: str,
        output_paths: Dict[str, str]
    ) -> str:
        """
        Prepare the complete execution code with imports and path setup.

        The user code is indented into the ``try`` block of a wrapper script
        that records status, logs and errors in ``execution_results`` and
        dumps that dict to ``execution_results.json`` in the project directory.

        Args:
            user_code: User-provided training code
            input_data_path: Path to input data
            output_paths: Dictionary of output paths

        Returns:
            Complete executable Python code

        Raises:
            SecurityViolationException: If the input path or any output path
                resolves outside the project directory
        """
        # Resolve absolute paths for safety
        project_abs_path = self.project_path.resolve()
        input_abs_path = self._resolve_within_project(project_abs_path, input_data_path, "Input")

        output_abs_paths = {
            key: str(self._resolve_within_project(project_abs_path, rel_path, "Output"))
            for key, rel_path in output_paths.items()
        }

        # Paths are embedded with repr() so quotes and backslashes in them
        # cannot break the generated source.
        execution_template = f'''
import os
import sys
import json
import traceback
import warnings
from pathlib import Path

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# Setup paths
PROJECT_PATH = {str(project_abs_path)!r}
INPUT_DATA_PATH = {str(input_abs_path)!r}
OUTPUT_PATHS = {output_abs_paths}

# Execution results tracking
execution_results = {{
    "status": "running",
    "start_time": None,
    "end_time": None,
    "metrics": {{}},
    "outputs": {{}},
    "errors": [],
    "logs": []
}}

def log_message(msg):
    """Log execution messages."""
    execution_results["logs"].append(str(msg))
    print(f"[SANDBOX] {{msg}}")

try:
    import time
    execution_results["start_time"] = time.time()
    log_message("Starting code execution in sandbox")

    # User code execution
    log_message("Executing user-provided training code...")

{self._indent_code(user_code, 4)}

    execution_results["end_time"] = time.time()
    execution_results["status"] = "completed"
    log_message("Code execution completed successfully")

    # Check if expected outputs were created
    for output_key, output_path in OUTPUT_PATHS.items():
        if os.path.exists(output_path):
            execution_results["outputs"][output_key] = output_path
            log_message(f"Output created: {{output_key}} -> {{output_path}}")
        else:
            log_message(f"Warning: Expected output not created: {{output_key}} -> {{output_path}}")

except Exception as e:
    execution_results["end_time"] = time.time()
    execution_results["status"] = "failed"
    execution_results["errors"].append({{
        "type": type(e).__name__,
        "message": str(e),
        "traceback": traceback.format_exc()
    }})
    log_message(f"Execution failed: {{str(e)}}")
    print(traceback.format_exc())

finally:
    # Save execution results
    results_path = os.path.join(PROJECT_PATH, "execution_results.json")
    with open(results_path, 'w') as f:
        json.dump(execution_results, f, indent=2)
    log_message(f"Execution results saved to {{results_path}}")
'''

        return execution_template

    def _indent_code(self, code: str, indent: int) -> str:
        """
        Indent code block for proper Python formatting.

        Args:
            code: Source text to indent
            indent: Number of spaces to prepend to each non-blank line

        Returns:
            The indented source; whitespace-only lines are left untouched
        """
        padding = ' ' * indent
        return '\n'.join(
            padding + line if line.strip() else line
            for line in code.split('\n')
        )

    @staticmethod
    def _kill_process_tree(process: "subprocess.Popen") -> None:
        """
        Forcefully stop the sandbox process and, on POSIX, its process group.

        Args:
            process: The child process started by ``_execute_in_subprocess``
        """
        if hasattr(os, 'killpg'):
            try:
                pgid = os.getpgid(process.pid)
                # Never signal our own group: only kill the group when the
                # child really was moved into a separate one.
                if pgid != os.getpgrp():
                    # SIGKILL cannot be ignored, so no descendant survives to
                    # keep the stdout/stderr pipes open.
                    os.killpg(pgid, signal.SIGKILL)
            except ProcessLookupError:
                pass
        process.kill()

    def _execute_in_subprocess(self, script_path: Path, temp_dir: Path) -> Dict[str, Any]:
        """
        Execute code in a secure subprocess with resource limits.

        Args:
            script_path: Path to the execution script
            temp_dir: Temporary directory for execution (used as the child's HOME)

        Returns:
            Execution results dictionary loaded from ``execution_results.json``,
            extended with a ``"subprocess"`` entry (return code, stdout, stderr)

        Raises:
            SandboxExecutionException: On timeout, when the results file is
                missing, or when launching/reading fails for any other reason
        """
        def preexec_fn():
            """Set resource limits for the subprocess (runs in the child, POSIX only)."""
            if HAS_RESOURCE:
                memory_bytes = self.MAX_MEMORY_MB * 1024 * 1024
                # Set memory limit
                resource.setrlimit(resource.RLIMIT_AS, (memory_bytes, memory_bytes))
                # Set CPU time limit
                resource.setrlimit(resource.RLIMIT_CPU, (
                    self.MAX_EXECUTION_TIME,
                    self.MAX_EXECUTION_TIME
                ))
                # Disable core dumps
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))

            # Set process group for easier cleanup
            os.setpgrp()

        try:
            logger.info(f"Executing code in sandbox for {self.execution_id}")

            if not HAS_RESOURCE:
                logger.debug("Skipping resource limits (not available on Windows)")

            # Create restricted environment
            # NOTE(review): this copies the entire parent environment, so any
            # secrets held in environment variables are visible to user code.
            env = os.environ.copy()
            env['PYTHONPATH'] = str(self.project_path)
            env['HOME'] = str(temp_dir)  # Restrict home directory access

            # A results file left behind by an earlier run must not be
            # mistaken for the output of this one.
            results_path = self.project_path / "execution_results.json"
            try:
                results_path.unlink()
            except FileNotFoundError:
                pass

            # preexec_fn (and os.setpgrp inside it) is only supported on POSIX;
            # passing it on Windows makes Popen raise ValueError.
            popen_kwargs = {}
            if os.name == 'posix':
                popen_kwargs['preexec_fn'] = preexec_fn

            process = subprocess.Popen(
                [sys.executable, str(script_path)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                cwd=str(self.project_path),
                text=True,
                **popen_kwargs
            )

            try:
                stdout, stderr = process.communicate(timeout=self.MAX_EXECUTION_TIME)
                return_code = process.returncode
            except subprocess.TimeoutExpired:
                self._kill_process_tree(process)
                # Reap the child and drain its pipes
                stdout, stderr = process.communicate()
                raise SandboxExecutionException(
                    f"Code execution timed out after {self.MAX_EXECUTION_TIME} seconds",
                    "timeout_error"
                )

            if not results_path.exists():
                # Fallback if results file wasn't created
                raise SandboxExecutionException(
                    f"Execution results not found. stdout: {stdout}, stderr: {stderr}",
                    "results_missing"
                )

            # Load execution results
            with open(results_path, 'r') as f:
                results = json.load(f)

            # Clean up results file
            results_path.unlink()

            # Add subprocess info
            results["subprocess"] = {
                "return_code": return_code,
                "stdout": stdout,
                "stderr": stderr
            }

            if return_code != 0:
                results["status"] = "failed"
                # setdefault: user code may have replaced execution_results
                results.setdefault("errors", []).append({
                    "type": "subprocess_error",
                    "message": f"Process exited with code {return_code}",
                    "stderr": stderr
                })

            logger.info(
                f"Sandbox execution completed for {self.execution_id}: {results.get('status')}"
            )
            return results

        except SandboxExecutionException:
            raise
        except Exception as e:
            logger.error(f"Sandbox execution failed for {self.execution_id}: {e}")
            raise SandboxExecutionException(
                f"Sandbox execution failed: {e}", "execution_error"
            ) from e

    @contextmanager
    def temporary_limits(self):
        """
        Context manager for temporary resource limits during testing.

        Lowers the time and memory limits on this instance to 30 seconds and
        256 MB for the duration of the ``with`` block, then puts the previous
        values back.
        """
        original_time = self.MAX_EXECUTION_TIME
        original_memory = self.MAX_MEMORY_MB

        try:
            # Reduce limits for testing
            self.MAX_EXECUTION_TIME = 30  # 30 seconds for tests
            self.MAX_MEMORY_MB = 256  # 256MB for tests
            yield
        finally:
            # Restore original limits
            self.MAX_EXECUTION_TIME = original_time
            self.MAX_MEMORY_MB = original_memory


# Convenience function for simple execution
def execute_training_code_safely(
    code: str,
    project_path: str,
    user_id: str,
    project_id: str,
    input_data_path: str,
    output_paths: Dict[str, str]
) -> Dict[str, Any]:
    """
    Convenience function to execute training code safely.

    Args:
        code: The training code to execute
        project_path: Base project path
        user_id: User identifier
        project_id: Project identifier
        input_data_path: Relative path to input data
        output_paths: Dictionary of expected output paths

    Returns:
        Execution results dictionary

    Raises:
        SandboxExecutionException: If execution fails
        SecurityViolationException: If security violations are detected
    """
    runner = CodeSandboxRunner(project_path, user_id, project_id)
    return runner.execute_code(code, input_data_path, output_paths)