Spaces:
Running
Running
| # ml_module/core/sandbox_runner.py | |
| """ | |
| Phase 5: Secure Code Sandbox Runner | |
| This module provides a secure sandbox environment for executing user-edited training code. | |
| It implements strict security controls to prevent malicious code execution while allowing | |
| legitimate machine learning operations. | |
| Security Features: | |
| - Subprocess isolation with resource limits | |
| - Import validation (allowlist only) | |
| - Filesystem access restrictions | |
| - Network access blocking | |
| - Time and memory limits | |
| - Code validation and sanitization | |
| """ | |
| import ast | |
| import json | |
| import os | |
| import sys | |
| import subprocess | |
| import tempfile | |
| import time | |
| import signal | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional, Tuple, List | |
| from contextlib import contextmanager | |
| import traceback | |
| # Conditional import for resource module (Unix only) | |
| try: | |
| import resource | |
| HAS_RESOURCE = True | |
| except ImportError: | |
| HAS_RESOURCE = False | |
| from ml_module.core.exceptions import SandboxExecutionException, SecurityViolationException | |
| logger = logging.getLogger(__name__) | |
| # Warn about resource module if not available | |
| if not HAS_RESOURCE: | |
| logger.warning("resource module not available (Windows), resource limits will be skipped") | |
class CodeSandboxRunner:
    """
    Sandbox runner for executing user-provided training code.

    The runner statically validates the code (AST-based import, call and
    string-literal checks), wraps it in a generated script and runs that script
    in a child Python process with time/memory limits where the platform
    supports them.

    NOTE(review): the static checks are best-effort, not a security boundary.
    The allowlist below admits ``os``, ``sys`` and ``pickle``, and only direct
    calls by bare name (``eval(...)``) are detected. Confirm that the
    deployment adds OS-level isolation if the code is truly untrusted.
    """

    # Allowlisted imports for security
    ALLOWED_IMPORTS = {
        'sklearn', 'sklearn.ensemble', 'sklearn.linear_model', 'sklearn.model_selection',
        'sklearn.metrics', 'sklearn.preprocessing', 'sklearn.pipeline',
        'pandas', 'pd', 'numpy', 'np', 'joblib', 'pickle',
        'os', 'sys', 'pathlib', 'Path', 'json', 'datetime', 'time',
        'math', 'random', 'warnings', 'logging'
    }

    # Blocked imports for security
    BLOCKED_IMPORTS = {
        'subprocess', 'socket', 'urllib', 'requests', 'http', 'ftplib',
        'smtplib', 'imaplib', 'poplib', 'telnetlib', 'paramiko', 'fabric',
        'exec', 'eval', 'compile', 'importlib', '__import__',
        'multiprocessing', 'threading', 'concurrent'
    }

    # Resource limits
    MAX_EXECUTION_TIME = 300  # 5 minutes
    MAX_MEMORY_MB = 1024  # 1GB
    # NOTE(review): declared but not enforced anywhere in this class.
    MAX_FILE_SIZE_MB = 100  # 100MB per file

    # Packages whose dotted submodules are accepted as well.
    _ALLOWED_PACKAGE_ROOTS = (
        'sklearn', 'pandas', 'numpy', 'joblib', 'os', 'sys', 'json', 'pathlib'
    )
    # Builtins that must never be called directly by user code.
    _DANGEROUS_CALLS = frozenset({'exec', 'eval', 'compile', '__import__'})
    # Lower-cased fragments that mark a string literal as a suspicious path.
    _SUSPICIOUS_PATH_FRAGMENTS = ('/etc/', '/root/', '/home/', 'c:\\')
    # Name of the results file the generated script writes into the project dir.
    _RESULTS_FILENAME = "execution_results.json"

    def __init__(self, project_path: str, user_id: str, project_id: str):
        """
        Initialize the sandbox runner.

        Args:
            project_path: Base path for project files (restricted filesystem access)
            user_id: User identifier for logging and security
            project_id: Project identifier for logging and security
        """
        self.project_path = Path(project_path)
        self.user_id = user_id
        self.project_id = project_id
        # Second-resolution timestamp; used for log messages and the script name.
        self.execution_id = f"{user_id}_{project_id}_{int(time.time())}"

    def validate_code_security(self, code: str) -> None:
        """
        Validate code for security violations before execution.

        Args:
            code: The Python code to validate

        Raises:
            SecurityViolationException: If the code does not parse, imports a
                blocked or non-allowlisted module, calls a dangerous builtin by
                name, or contains a string literal that looks like a system path.
        """
        try:
            # Parse the code into an AST for analysis
            tree = ast.parse(code)

            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        self._check_import(alias.name)
                elif isinstance(node, ast.ImportFrom):
                    # node.module is None for "from . import x"; as before,
                    # such imports are not checked here.
                    if node.module:
                        self._check_import(node.module)
                elif isinstance(node, ast.Call):
                    callee = node.func
                    if isinstance(callee, ast.Name) and callee.id in self._DANGEROUS_CALLS:
                        raise SecurityViolationException(
                            f"Dangerous function call: {callee.id}",
                            "dangerous_function"
                        )
                # ast.Constant replaces the deprecated ast.Str (removed in 3.14).
                elif isinstance(node, ast.Constant) and isinstance(node.value, str):
                    lowered = node.value.lower()
                    if any(fragment in lowered for fragment in self._SUSPICIOUS_PATH_FRAGMENTS):
                        raise SecurityViolationException(
                            f"Suspicious file path detected: {node.value}",
                            "file_access_violation"
                        )

            logger.info(f"Code validation passed for execution {self.execution_id}")
        except SecurityViolationException:
            raise
        except SyntaxError as e:
            raise SecurityViolationException(
                f"Code contains syntax errors: {e}", "syntax_error"
            ) from e
        except Exception as e:
            raise SecurityViolationException(
                f"Code validation failed: {e}", "validation_error"
            ) from e

    def _check_import(self, module_name: str) -> None:
        """Raise SecurityViolationException unless *module_name* may be imported."""
        # Block by top-level package too, so "urllib.request" is reported as blocked.
        top_level = module_name.partition('.')[0]
        if module_name in self.BLOCKED_IMPORTS or top_level in self.BLOCKED_IMPORTS:
            raise SecurityViolationException(
                f"Blocked import detected: {module_name}",
                "import_violation"
            )
        if module_name not in self.ALLOWED_IMPORTS and not self._is_allowed_submodule(module_name):
            raise SecurityViolationException(
                f"Unauthorized import: {module_name}. Only sklearn, pandas, numpy, and joblib are allowed.",
                "unauthorized_import"
            )

    def _is_allowed_submodule(self, module_name: str) -> bool:
        """Return True if *module_name* is an allowed package or a dotted submodule of one."""
        # Compare on a package boundary: "os.path" matches "os", "osquery" does not.
        return any(
            module_name == root or module_name.startswith(root + '.')
            for root in self._ALLOWED_PACKAGE_ROOTS
        )

    def execute_code(
        self,
        code: str,
        input_data_path: str,
        output_paths: Dict[str, str]
    ) -> Dict[str, Any]:
        """
        Execute user-provided training code in the sandbox.

        Args:
            code: The Python code to execute
            input_data_path: Path to input data file (relative to project path)
            output_paths: Dictionary of output file paths (relative to project path)

        Returns:
            Dict containing execution results, metrics, and output paths

        Raises:
            SandboxExecutionException: If execution fails
            SecurityViolationException: If security violations are detected
        """
        # Validate code security first
        self.validate_code_security(code)

        # The generated script lives in a throw-away directory.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            script_path = temp_path / f"execute_{self.execution_id}.py"
            execution_code = self._prepare_execution_code(code, input_data_path, output_paths)

            # Python source files are UTF-8 by default; do not depend on the locale.
            with open(script_path, 'w', encoding='utf-8') as f:
                f.write(execution_code)

            # Execute in subprocess with resource limits
            return self._execute_in_subprocess(script_path, temp_path)

    @staticmethod
    def _is_within(base: Path, candidate: Path) -> bool:
        """Return True if *candidate* is *base* itself or located underneath it."""
        try:
            candidate.relative_to(base)
        except ValueError:
            return False
        return True

    def _prepare_execution_code(
        self,
        user_code: str,
        input_data_path: str,
        output_paths: Dict[str, str]
    ) -> str:
        """
        Build the complete script: path constants, result tracking and user code.

        Args:
            user_code: User-provided training code
            input_data_path: Path to input data, relative to the project path
            output_paths: Dictionary of output paths, relative to the project path

        Returns:
            Complete executable Python code

        Raises:
            SecurityViolationException: If a resolved path leaves the project directory
        """
        # Resolve absolute paths for safety
        project_abs_path = self.project_path.resolve()
        input_abs_path = (project_abs_path / input_data_path).resolve()

        # Component-wise containment check; a plain string prefix test would
        # accept sibling directories such as "<project>_other".
        if not self._is_within(project_abs_path, input_abs_path):
            raise SecurityViolationException(
                f"Input path outside project directory: {input_data_path}",
                "path_violation"
            )

        output_abs_paths = {}
        for key, rel_path in output_paths.items():
            abs_path = (project_abs_path / rel_path).resolve()
            if not self._is_within(project_abs_path, abs_path):
                raise SecurityViolationException(
                    f"Output path outside project directory: {rel_path}",
                    "path_violation"
                )
            output_abs_paths[key] = str(abs_path)

        # Paths are embedded with repr() so quotes and backslashes stay valid
        # Python literals. The user code is indented into the try-body, which
        # also shifts the continuation lines of multi-line string literals.
        execution_template = f'''
import os
import sys
import json
import traceback
import warnings
from pathlib import Path

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# Setup paths
PROJECT_PATH = {str(project_abs_path)!r}
INPUT_DATA_PATH = {str(input_abs_path)!r}
OUTPUT_PATHS = {output_abs_paths!r}

# Execution results tracking
execution_results = {{
    "status": "running",
    "start_time": None,
    "end_time": None,
    "metrics": {{}},
    "outputs": {{}},
    "errors": [],
    "logs": []
}}

def log_message(msg):
    """Log execution messages."""
    execution_results["logs"].append(str(msg))
    print(f"[SANDBOX] {{msg}}")

try:
    import time
    execution_results["start_time"] = time.time()
    log_message("Starting code execution in sandbox")

    # User code execution
    log_message("Executing user-provided training code...")
{self._indent_code(user_code, 4)}

    execution_results["end_time"] = time.time()
    execution_results["status"] = "completed"
    log_message("Code execution completed successfully")

    # Check if expected outputs were created
    for output_key, output_path in OUTPUT_PATHS.items():
        if os.path.exists(output_path):
            execution_results["outputs"][output_key] = output_path
            log_message(f"Output created: {{output_key}} -> {{output_path}}")
        else:
            log_message(f"Warning: Expected output not created: {{output_key}} -> {{output_path}}")

except Exception as e:
    execution_results["end_time"] = time.time()
    execution_results["status"] = "failed"
    execution_results["errors"].append({{
        "type": type(e).__name__,
        "message": str(e),
        "traceback": traceback.format_exc()
    }})
    log_message(f"Execution failed: {{str(e)}}")
    print(traceback.format_exc())

finally:
    # Save execution results
    results_path = os.path.join(PROJECT_PATH, "{self._RESULTS_FILENAME}")
    with open(results_path, 'w', encoding='utf-8') as f:
        json.dump(execution_results, f, indent=2)
    log_message(f"Execution results saved to {{results_path}}")
'''
        return execution_template

    def _indent_code(self, code: str, indent: int) -> str:
        """Indent every non-blank line of *code* by *indent* spaces."""
        lines = code.split('\n')
        indented_lines = [' ' * indent + line if line.strip() else line for line in lines]
        return '\n'.join(indented_lines)

    def _execute_in_subprocess(self, script_path: Path, temp_dir: Path) -> Dict[str, Any]:
        """
        Run the generated script in a child process and collect its results.

        Args:
            script_path: Path to the execution script
            temp_dir: Temporary directory for execution (used as the child's HOME)

        Returns:
            Execution results dictionary, extended with a "subprocess" entry
            holding the return code, stdout and stderr.

        Raises:
            SandboxExecutionException: On timeout, when the results file is
                missing, or on any other failure while running the child.
        """
        results_path = self.project_path / self._RESULTS_FILENAME
        # preexec_fn, process groups and killpg exist only on POSIX.
        is_posix = os.name == 'posix'

        def preexec_fn():
            """Set resource limits for the subprocess (runs in the child before exec)."""
            if HAS_RESOURCE:
                memory_bytes = self.MAX_MEMORY_MB * 1024 * 1024
                # Set memory limit
                resource.setrlimit(resource.RLIMIT_AS, (memory_bytes, memory_bytes))
                # Set CPU time limit
                resource.setrlimit(
                    resource.RLIMIT_CPU,
                    (self.MAX_EXECUTION_TIME, self.MAX_EXECUTION_TIME)
                )
                # Disable core dumps
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            # Set process group for easier cleanup
            os.setpgrp()

        try:
            logger.info(f"Executing code in sandbox for {self.execution_id}")
            if not HAS_RESOURCE:
                # Logged in the parent: logging between fork and exec is unsafe.
                logger.debug("Skipping resource limits (not available on Windows)")

            # Remove leftovers of an earlier run so stale results are never
            # mistaken for the outcome of this execution.
            try:
                results_path.unlink()
            except FileNotFoundError:
                pass

            # NOTE(review): this copies the parent's entire environment, so any
            # secrets stored there are visible to the user code.
            env = os.environ.copy()
            env['PYTHONPATH'] = str(self.project_path)
            env['HOME'] = str(temp_dir)  # Restrict home directory access

            popen_kwargs = {
                'stdout': subprocess.PIPE,
                'stderr': subprocess.PIPE,
                'env': env,
                'cwd': str(self.project_path),
                'text': True,
            }
            if is_posix:
                popen_kwargs['preexec_fn'] = preexec_fn

            process = subprocess.Popen([sys.executable, str(script_path)], **popen_kwargs)

            try:
                stdout, stderr = process.communicate(timeout=self.MAX_EXECUTION_TIME)
                return_code = process.returncode
            except subprocess.TimeoutExpired:
                # Kill the entire process group
                if is_posix:
                    try:
                        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
                    except ProcessLookupError:
                        pass
                process.kill()
                # Reap the child and drain its pipes.
                process.communicate()
                raise SandboxExecutionException(
                    f"Code execution timed out after {self.MAX_EXECUTION_TIME} seconds",
                    "timeout_error"
                )

            if not results_path.exists():
                # Fallback if results file wasn't created
                raise SandboxExecutionException(
                    f"Execution results not found. stdout: {stdout}, stderr: {stderr}",
                    "results_missing"
                )

            # Load execution results
            with open(results_path, 'r', encoding='utf-8') as f:
                results = json.load(f)
            # Clean up results file
            results_path.unlink()

            # Add subprocess info
            results["subprocess"] = {
                "return_code": return_code,
                "stdout": stdout,
                "stderr": stderr
            }
            if return_code != 0:
                results["status"] = "failed"
                results["errors"].append({
                    "type": "subprocess_error",
                    "message": f"Process exited with code {return_code}",
                    "stderr": stderr
                })

            logger.info(f"Sandbox execution completed for {self.execution_id}: {results['status']}")
            return results
        except SandboxExecutionException:
            raise
        except Exception as e:
            logger.error(f"Sandbox execution failed for {self.execution_id}: {e}")
            raise SandboxExecutionException(
                f"Sandbox execution failed: {e}", "execution_error"
            ) from e

    @contextmanager
    def temporary_limits(self):
        """
        Context manager that lowers the time and memory limits for testing.

        Inside the ``with`` block this instance uses 30 seconds and 256 MB;
        the previous values are restored on exit, even if the block raises.
        """
        original_time = self.MAX_EXECUTION_TIME
        original_memory = self.MAX_MEMORY_MB
        try:
            # Reduce limits for testing
            self.MAX_EXECUTION_TIME = 30  # 30 seconds for tests
            self.MAX_MEMORY_MB = 256  # 256MB for tests
            yield
        finally:
            # Restore original limits
            self.MAX_EXECUTION_TIME = original_time
            self.MAX_MEMORY_MB = original_memory
| # Convenience function for simple execution | |
def execute_training_code_safely(
    code: str,
    project_path: str,
    user_id: str,
    project_id: str,
    input_data_path: str,
    output_paths: Dict[str, str]
) -> Dict[str, Any]:
    """
    Run training code through a freshly created CodeSandboxRunner.

    Args:
        code: The training code to execute
        project_path: Base project path
        user_id: User identifier
        project_id: Project identifier
        input_data_path: Path to input data, relative to the project path
        output_paths: Dictionary of expected output paths

    Returns:
        Execution results dictionary, as returned by
        CodeSandboxRunner.execute_code
    """
    # One runner per call: build it, run the code once, hand back the results.
    return CodeSandboxRunner(
        project_path=project_path,
        user_id=user_id,
        project_id=project_id,
    ).execute_code(
        code=code,
        input_data_path=input_data_path,
        output_paths=output_paths,
    )