Spaces:
Sleeping
Sleeping
| # server/sandbox.py | |
| """ | |
| Secure subprocess execution for running agent-submitted code. | |
| NEVER run agent code as root. ALWAYS use timeouts. | |
| """ | |
| import subprocess | |
| import os | |
| import sys | |
| import tempfile | |
| from typing import Tuple | |
| from pathlib import Path | |
| import re | |
| EXECUTION_TIMEOUT = 10 # seconds β hard limit per test run | |
| MAX_OUTPUT_BYTES = 50_000 # truncate large outputs | |
| MAX_MEMORY_MB = 256 # memory limit for subprocess | |
| def run_pytest_sandboxed(repo_path: str, test_file: str = None) -> Tuple[float, str, bool]: | |
| """ | |
| Run pytest in a sandboxed subprocess. | |
| Returns: | |
| (pass_rate: float, output: str, timed_out: bool) | |
| """ | |
| cmd = [sys.executable, "-m", "pytest", "--tb=short", "-q", "--no-header"] | |
| if test_file: | |
| test_path = os.path.join(repo_path, test_file) | |
| if not os.path.exists(test_path): | |
| return 0.0, f"Test file not found: {test_file}", False | |
| cmd.append(test_path) | |
| else: | |
| tests_dir = os.path.join(repo_path, "tests") | |
| if os.path.exists(tests_dir): | |
| cmd.append(tests_dir) | |
| else: | |
| cmd.append(repo_path) | |
| def set_limits(): | |
| """Set resource limits for subprocess β runs in child process.""" | |
| try: | |
| import resource | |
| mem_bytes = MAX_MEMORY_MB * 1024 * 1024 | |
| resource.setrlimit(resource.RLIMIT_AS, (mem_bytes, mem_bytes)) | |
| except Exception: | |
| pass # Best effort β don't fail if setrlimit unavailable | |
| try: | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| timeout=EXECUTION_TIMEOUT, | |
| cwd=repo_path, | |
| env={ | |
| **os.environ, | |
| "PYTHONPATH": repo_path, | |
| "PYTHONDONTWRITEBYTECODE": "1", | |
| }, | |
| preexec_fn=set_limits if sys.platform != "win32" else None, | |
| ) | |
| output = (result.stdout + result.stderr)[:MAX_OUTPUT_BYTES] | |
| pass_rate = _parse_pass_rate(output, result.returncode) | |
| return pass_rate, output, False | |
| except subprocess.TimeoutExpired: | |
| return 0.0, f"TIMEOUT: Tests exceeded {EXECUTION_TIMEOUT}s limit", True | |
| except Exception as e: | |
| return 0.0, f"EXECUTION_ERROR: {str(e)}", False | |
| def _parse_pass_rate(output: str, returncode: int) -> float: | |
| """Parse pytest output to get pass rate 0.0β1.0.""" | |
| # Look for "X passed, Y failed" or "X passed" or "X failed" | |
| passed_match = re.search(r'(\d+) passed', output) | |
| failed_match = re.search(r'(\d+) failed', output) | |
| error_match = re.search(r'(\d+) error', output) | |
| passed = int(passed_match.group(1)) if passed_match else 0 | |
| failed = int(failed_match.group(1)) if failed_match else 0 | |
| errors = int(error_match.group(1)) if error_match else 0 | |
| total = passed + failed + errors | |
| if total == 0: | |
| # If returncode is 0, all passed; otherwise failure | |
| return 1.0 if returncode == 0 else 0.0 | |
| return round(passed / total, 3) | |
| def validate_file_path(path: str, repo_root: str) -> bool: | |
| """ | |
| Ensure agent cannot read/write files outside the repo. | |
| Prevents path traversal attacks. | |
| """ | |
| try: | |
| repo_abs = os.path.abspath(repo_root) | |
| file_abs = os.path.abspath(os.path.join(repo_root, path)) | |
| return file_abs.startswith(repo_abs + os.sep) or file_abs == repo_abs | |
| except Exception: | |
| return False | |
| def search_in_repo(query: str, repo_path: str) -> str: | |
| """Grep-style search across all Python files in the repo.""" | |
| results = [] | |
| for root, dirs, files in os.walk(repo_path): | |
| # Skip __pycache__ and hidden dirs | |
| dirs[:] = [d for d in dirs if not d.startswith('.') and d != '__pycache__'] | |
| for fname in files: | |
| if fname.endswith('.py') or fname.endswith('.md') or fname.endswith('.json'): | |
| fpath = os.path.join(root, fname) | |
| rel_path = os.path.relpath(fpath, repo_path) | |
| try: | |
| with open(fpath, 'r', encoding='utf-8', errors='ignore') as f: | |
| for lineno, line in enumerate(f, 1): | |
| if query.lower() in line.lower(): | |
| results.append(f"{rel_path}:{lineno}: {line.rstrip()}") | |
| except Exception: | |
| continue | |
| if not results: | |
| return f"No matches found for: {query}" | |
| return '\n'.join(results[:50]) # Limit to 50 matches | |