# Source: sirus/backend/ml_module/core/sandbox_runner.py
# Commit 783a952 by ranilmukesh: "Deploy SiRUS SQL Agent backend"
# ml_module/core/sandbox_runner.py
"""
Phase 5: Secure Code Sandbox Runner
This module provides a secure sandbox environment for executing user-edited training code.
It implements strict security controls to prevent malicious code execution while allowing
legitimate machine learning operations.
Security Features:
- Subprocess isolation with resource limits
- Import validation (allowlist only)
- Filesystem access restrictions
- Network access blocking
- Time and memory limits
- Code validation and sanitization
"""
import ast
import json
import os
import sys
import subprocess
import tempfile
import time
import signal
import logging
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, List
from contextlib import contextmanager
import traceback
# Conditional import for resource module (Unix only)
try:
import resource
HAS_RESOURCE = True
except ImportError:
HAS_RESOURCE = False
from ml_module.core.exceptions import SandboxExecutionException, SecurityViolationException
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Emitted once at import time when the guarded `import resource` above failed
# (HAS_RESOURCE is False), so operators can see that rlimit-based limits are
# not being applied to sandboxed processes.
if not HAS_RESOURCE:
    logger.warning("resource module not available (Windows), resource limits will be skipped")
class CodeSandboxRunner:
    """
    Secure sandbox runner for executing user-provided training code.

    The runner statically validates the code (AST inspection of imports, a few
    builtin calls and string literals), wraps it in a bookkeeping script, and
    runs that script in a child Python process. On POSIX the child is given
    address-space, CPU-time and core-dump limits and its own process group.

    NOTE(review): the static checks are best-effort, not a security boundary.
    ``os`` and ``sys`` are allowlisted and are imported by the generated
    wrapper itself, so e.g. ``os.system`` is reachable from user code. Treat
    OS-level isolation (containers, seccomp, a dedicated user) as the real
    control.
    """

    # Allowlisted imports for security
    ALLOWED_IMPORTS = {
        'sklearn', 'sklearn.ensemble', 'sklearn.linear_model', 'sklearn.model_selection',
        'sklearn.metrics', 'sklearn.preprocessing', 'sklearn.pipeline',
        'pandas', 'pd', 'numpy', 'np', 'joblib', 'pickle',
        'os', 'sys', 'pathlib', 'Path', 'json', 'datetime', 'time',
        'math', 'random', 'warnings', 'logging'
    }

    # Blocked imports for security
    BLOCKED_IMPORTS = {
        'subprocess', 'socket', 'urllib', 'requests', 'http', 'ftplib',
        'smtplib', 'imaplib', 'poplib', 'telnetlib', 'paramiko', 'fabric',
        'exec', 'eval', 'compile', 'importlib', '__import__',
        'multiprocessing', 'threading', 'concurrent'
    }

    # Resource limits
    MAX_EXECUTION_TIME = 300  # 5 minutes
    MAX_MEMORY_MB = 1024  # 1GB
    # NOTE(review): declared but not referenced by any method of this class.
    MAX_FILE_SIZE_MB = 100  # 100MB per file

    # Top-level packages whose dotted submodules may be imported.
    _ALLOWED_PACKAGE_ROOTS = ('sklearn', 'pandas', 'numpy', 'joblib', 'os', 'sys', 'json', 'pathlib')
    # Builtins that must not be called by bare name.
    _DANGEROUS_BUILTINS = ('exec', 'eval', 'compile', '__import__')
    # Lower-cased substrings that mark a string literal as a suspicious path.
    _SUSPICIOUS_PATH_MARKERS = ('/etc/', '/root/', '/home/', 'c:\\')

    def __init__(self, project_path: str, user_id: str, project_id: str):
        """
        Initialize the sandbox runner.

        Args:
            project_path: Base path for project files (restricted filesystem access)
            user_id: User identifier for logging and security
            project_id: Project identifier for logging and security
        """
        self.project_path = Path(project_path)
        self.user_id = user_id
        self.project_id = project_id
        # Second-resolution timestamp: two runners created for the same
        # user/project within one second share an execution id.
        self.execution_id = f"{user_id}_{project_id}_{int(time.time())}"

    def validate_code_security(self, code: str) -> None:
        """
        Validate code for security violations before execution.

        Walks the AST and rejects blocked or non-allowlisted imports, bare
        calls to ``exec``/``eval``/``compile``/``__import__``, and string
        literals that contain a suspicious path fragment.

        Args:
            code: The Python code to validate

        Raises:
            SecurityViolationException: If code contains security violations,
                does not parse, or validation itself fails
        """
        try:
            # Parse the code into an AST for analysis
            tree = ast.parse(code)

            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        self._check_import(alias.name)
                elif isinstance(node, ast.ImportFrom):
                    # node.module is None for ``from . import x``; such
                    # imports are not checked (unchanged behaviour).
                    if node.module:
                        self._check_import(node.module)
                elif isinstance(node, ast.Call):
                    func = node.func
                    if isinstance(func, ast.Name) and func.id in self._DANGEROUS_BUILTINS:
                        raise SecurityViolationException(
                            f"Dangerous function call: {func.id}",
                            "dangerous_function"
                        )
                # String literals that might be file paths. ast.Str is
                # deprecated and removed in Python 3.14; string literals are
                # ast.Constant nodes holding a str.
                elif isinstance(node, ast.Constant) and isinstance(node.value, str):
                    lowered = node.value.lower()
                    if any(marker in lowered for marker in self._SUSPICIOUS_PATH_MARKERS):
                        raise SecurityViolationException(
                            f"Suspicious file path detected: {node.value}",
                            "file_access_violation"
                        )

            logger.info(f"Code validation passed for execution {self.execution_id}")
        except SecurityViolationException:
            raise
        except SyntaxError as e:
            raise SecurityViolationException(f"Code contains syntax errors: {e}", "syntax_error") from e
        except Exception as e:
            raise SecurityViolationException(f"Code validation failed: {e}", "validation_error") from e

    def _check_import(self, module_name: str) -> None:
        """Raise SecurityViolationException if *module_name* is blocked or not allowlisted."""
        if module_name in self.BLOCKED_IMPORTS:
            raise SecurityViolationException(
                f"Blocked import detected: {module_name}",
                "import_violation"
            )
        if module_name not in self.ALLOWED_IMPORTS and not self._is_allowed_submodule(module_name):
            raise SecurityViolationException(
                f"Unauthorized import: {module_name}. Only sklearn, pandas, numpy, and joblib are allowed.",
                "unauthorized_import"
            )

    def _is_allowed_submodule(self, module_name: str) -> bool:
        """
        Check if a module is an allowed package or one of its dotted submodules.

        Only the top-level package name is compared, so ``sklearn.tree`` is
        accepted while look-alikes such as ``sysconfig`` or ``osquery`` are not.
        """
        return module_name.split('.', 1)[0] in CodeSandboxRunner._ALLOWED_PACKAGE_ROOTS

    def execute_code(
        self,
        code: str,
        input_data_path: str,
        output_paths: Dict[str, str]
    ) -> Dict[str, Any]:
        """
        Execute user-provided training code in a secure sandbox.

        Args:
            code: The Python code to execute
            input_data_path: Path to input data file (relative to project path)
            output_paths: Dictionary of output file paths (relative to project path)

        Returns:
            Dict containing execution results, metrics, and output paths

        Raises:
            SandboxExecutionException: If execution fails
            SecurityViolationException: If security violations are detected
        """
        # Validate code security first
        self.validate_code_security(code)

        # The script lives in a throw-away directory that is removed when the
        # run finishes.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # user_id / project_id are caller-supplied; keep path separators
            # and other odd characters out of the script file name.
            safe_id = "".join(
                ch if ch.isalnum() or ch in "-_" else "_" for ch in self.execution_id
            )
            script_path = temp_path / f"execute_{safe_id}.py"
            execution_code = self._prepare_execution_code(code, input_data_path, output_paths)

            # Python reads source files as UTF-8 by default, so write UTF-8
            # regardless of the locale encoding.
            with open(script_path, 'w', encoding='utf-8') as f:
                f.write(execution_code)

            # Execute in subprocess with resource limits
            return self._execute_in_subprocess(script_path, temp_path)

    @staticmethod
    def _is_within_directory(candidate: Path, root: Path) -> bool:
        """Return True when *candidate* is *root* itself or located beneath it."""
        try:
            candidate.relative_to(root)
        except ValueError:
            return False
        return True

    def _prepare_execution_code(
        self,
        user_code: str,
        input_data_path: str,
        output_paths: Dict[str, str]
    ) -> str:
        """
        Prepare the complete execution code with imports and path setup.

        Args:
            user_code: User-provided training code
            input_data_path: Path to input data
            output_paths: Dictionary of output paths

        Returns:
            Complete executable Python code

        Raises:
            SecurityViolationException: If the input path or any output path
                resolves outside the project directory
        """
        # Resolve absolute paths for safety
        project_abs_path = self.project_path.resolve()
        input_abs_path = (project_abs_path / input_data_path).resolve()

        # Compare path components, not string prefixes: "/proj/abcdef" must
        # not count as being inside "/proj/abc".
        if not self._is_within_directory(input_abs_path, project_abs_path):
            raise SecurityViolationException(
                f"Input path outside project directory: {input_data_path}",
                "path_violation"
            )

        output_abs_paths = {}
        for key, rel_path in output_paths.items():
            abs_path = (project_abs_path / rel_path).resolve()
            if not self._is_within_directory(abs_path, project_abs_path):
                raise SecurityViolationException(
                    f"Output path outside project directory: {rel_path}",
                    "path_violation"
                )
            output_abs_paths[key] = str(abs_path)

        # repr() yields a correctly escaped literal even when a path contains
        # quotes or backslashes.
        project_literal = repr(str(project_abs_path))
        input_literal = repr(str(input_abs_path))
        indented_user_code = self._indent_code(user_code, 4)

        # Create execution wrapper. The template is flush-left on purpose:
        # its text becomes the top level of the generated script.
        execution_template = f'''
import os
import sys
import json
import traceback
import warnings
from pathlib import Path
# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')
# Setup paths
PROJECT_PATH = {project_literal}
INPUT_DATA_PATH = {input_literal}
OUTPUT_PATHS = {output_abs_paths!r}
# Execution results tracking
execution_results = {{
    "status": "running",
    "start_time": None,
    "end_time": None,
    "metrics": {{}},
    "outputs": {{}},
    "errors": [],
    "logs": []
}}
def log_message(msg):
    """Log execution messages."""
    execution_results["logs"].append(str(msg))
    print(f"[SANDBOX] {{msg}}")
try:
    import time
    execution_results["start_time"] = time.time()
    log_message("Starting code execution in sandbox")
    # User code execution
    log_message("Executing user-provided training code...")
{indented_user_code}
    execution_results["end_time"] = time.time()
    execution_results["status"] = "completed"
    log_message("Code execution completed successfully")
    # Check if expected outputs were created
    for output_key, output_path in OUTPUT_PATHS.items():
        if os.path.exists(output_path):
            execution_results["outputs"][output_key] = output_path
            log_message(f"Output created: {{output_key}} -> {{output_path}}")
        else:
            log_message(f"Warning: Expected output not created: {{output_key}} -> {{output_path}}")
except Exception as e:
    execution_results["end_time"] = time.time()
    execution_results["status"] = "failed"
    execution_results["errors"].append({{
        "type": type(e).__name__,
        "message": str(e),
        "traceback": traceback.format_exc()
    }})
    log_message(f"Execution failed: {{str(e)}}")
    print(traceback.format_exc())
finally:
    # Save execution results
    results_path = os.path.join(PROJECT_PATH, "execution_results.json")
    with open(results_path, 'w') as f:
        json.dump(execution_results, f, indent=2)
    log_message(f"Execution results saved to {{results_path}}")
'''
        return execution_template

    def _indent_code(self, code: str, indent: int) -> str:
        """Indent every non-blank line of *code* by *indent* spaces."""
        lines = code.split('\n')
        indented_lines = [' ' * indent + line if line.strip() else line for line in lines]
        return '\n'.join(indented_lines)

    def _execute_in_subprocess(self, script_path: Path, temp_dir: Path) -> Dict[str, Any]:
        """
        Execute code in a secure subprocess with resource limits.

        Args:
            script_path: Path to the execution script
            temp_dir: Temporary directory for execution

        Returns:
            Execution results dictionary

        Raises:
            SandboxExecutionException: On timeout, when the results file is
                missing, or when launching/collecting the process fails
        """
        # preexec_fn, process groups and rlimits exist only on POSIX; Popen
        # raises ValueError if preexec_fn is supplied on Windows.
        is_posix = os.name == 'posix'

        def preexec_fn():
            """Set resource limits for the subprocess (runs in the child before exec)."""
            if HAS_RESOURCE:
                memory_bytes = self.MAX_MEMORY_MB * 1024 * 1024
                # Set memory limit
                resource.setrlimit(resource.RLIMIT_AS, (memory_bytes, memory_bytes))
                # Set CPU time limit
                resource.setrlimit(
                    resource.RLIMIT_CPU,
                    (self.MAX_EXECUTION_TIME, self.MAX_EXECUTION_TIME)
                )
                # Disable core dumps
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            # Set process group for easier cleanup
            os.setpgrp()

        try:
            logger.info(f"Executing code in sandbox for {self.execution_id}")
            if not (is_posix and HAS_RESOURCE):
                logger.debug("Skipping resource limits (not available on Windows)")

            # Create restricted environment
            # NOTE(review): this starts from a copy of the parent's
            # environment, so any secrets held in environment variables are
            # visible to the sandboxed code.
            env = os.environ.copy()
            env['PYTHONPATH'] = str(self.project_path)
            env['HOME'] = str(temp_dir)  # Restrict home directory access

            # Execute with timeout
            process = subprocess.Popen(
                [sys.executable, str(script_path)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                preexec_fn=preexec_fn if is_posix else None,
                cwd=str(self.project_path),
                text=True
            )

            try:
                stdout, stderr = process.communicate(timeout=self.MAX_EXECUTION_TIME)
                return_code = process.returncode
            except subprocess.TimeoutExpired:
                # Kill the entire process group
                if is_posix:
                    try:
                        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
                    except ProcessLookupError:
                        pass
                process.kill()
                # Reap the child and drain its pipes.
                process.communicate()
                raise SandboxExecutionException(
                    f"Code execution timed out after {self.MAX_EXECUTION_TIME} seconds",
                    "timeout_error"
                )

            # Load execution results
            results_path = self.project_path / "execution_results.json"
            if not results_path.exists():
                # Fallback if results file wasn't created
                raise SandboxExecutionException(
                    f"Execution results not found. stdout: {stdout}, stderr: {stderr}",
                    "results_missing"
                )

            try:
                with open(results_path, 'r') as f:
                    results = json.load(f)
            finally:
                # Clean up results file even if it could not be parsed, so a
                # later run never picks up a stale file.
                results_path.unlink()

            # Add subprocess info
            results["subprocess"] = {
                "return_code": return_code,
                "stdout": stdout,
                "stderr": stderr
            }
            if return_code != 0:
                results["status"] = "failed"
                results.setdefault("errors", []).append({
                    "type": "subprocess_error",
                    "message": f"Process exited with code {return_code}",
                    "stderr": stderr
                })

            logger.info(f"Sandbox execution completed for {self.execution_id}: {results['status']}")
            return results
        except SandboxExecutionException:
            raise
        except Exception as e:
            logger.error(f"Sandbox execution failed for {self.execution_id}: {e}")
            raise SandboxExecutionException(f"Sandbox execution failed: {e}", "execution_error") from e

    @contextmanager
    def temporary_limits(self):
        """
        Context manager for temporary resource limits during testing.

        Lowers MAX_EXECUTION_TIME to 30 seconds and MAX_MEMORY_MB to 256 on
        this instance for the duration of the ``with`` block, then restores
        the previous values.
        """
        original_time = self.MAX_EXECUTION_TIME
        original_memory = self.MAX_MEMORY_MB
        try:
            # Reduce limits for testing
            self.MAX_EXECUTION_TIME = 30  # 30 seconds for tests
            self.MAX_MEMORY_MB = 256  # 256MB for tests
            yield
        finally:
            # Restore original limits
            self.MAX_EXECUTION_TIME = original_time
            self.MAX_MEMORY_MB = original_memory
# Convenience function for simple execution
def execute_training_code_safely(
    code: str,
    project_path: str,
    user_id: str,
    project_id: str,
    input_data_path: str,
    output_paths: Dict[str, str]
) -> Dict[str, Any]:
    """
    Run training code through a freshly created CodeSandboxRunner.

    One-shot helper: builds a runner for the given project and user, then
    delegates to its ``execute_code`` method.

    Args:
        code: The training code to execute
        project_path: Base project path
        user_id: User identifier
        project_id: Project identifier
        input_data_path: Relative path to input data
        output_paths: Dictionary of expected output paths

    Returns:
        Execution results dictionary
    """
    sandbox = CodeSandboxRunner(
        project_path=project_path,
        user_id=user_id,
        project_id=project_id,
    )
    execution_results = sandbox.execute_code(
        code=code,
        input_data_path=input_data_path,
        output_paths=output_paths,
    )
    return execution_results