File size: 4,488 Bytes
a5c1fa0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# server/sandbox.py
"""
Secure subprocess execution for running agent-submitted code.
NEVER run agent code as root. ALWAYS use timeouts.
"""
import subprocess
import os
import sys
import tempfile
from typing import Tuple
from pathlib import Path
import re


EXECUTION_TIMEOUT = 10       # seconds — hard wall-clock limit per pytest run (passed as the subprocess timeout)
MAX_OUTPUT_BYTES = 50_000    # cap on returned output; applied as a str slice, so it counts characters, not bytes
MAX_MEMORY_MB = 256          # address-space limit (RLIMIT_AS) for the subprocess, in MiB; not applied on Windows


def run_pytest_sandboxed(repo_path: str, test_file: str = None) -> Tuple[float, str, bool]:
    """
    Run pytest in a sandboxed subprocess.

    The child process runs with ``cwd`` and ``PYTHONPATH`` set to the repo,
    a wall-clock timeout of ``EXECUTION_TIMEOUT`` seconds and (on non-Windows
    platforms, best effort) an address-space limit of ``MAX_MEMORY_MB``.

    Args:
        repo_path: Directory of the repository under test. May be relative;
            it is resolved to an absolute path before use.
        test_file: Optional path of a single test file, relative to the repo.
            When omitted, ``<repo>/tests`` is run if it exists, otherwise the
            whole repo directory.

    Returns:
        (pass_rate: float, output: str, timed_out: bool)

        ``output`` is stdout followed by stderr, truncated to
        ``MAX_OUTPUT_BYTES`` characters. On timeout or on failure to launch
        the subprocess, ``pass_rate`` is 0.0 and ``output`` is a short
        ``TIMEOUT: ...`` / ``EXECUTION_ERROR: ...`` message.
    """
    # Resolve once up front: the child runs with cwd=repo, so a relative
    # repo_path would make the pytest target (and PYTHONPATH) resolve against
    # the wrong directory inside the child.
    repo_root = os.path.abspath(repo_path)

    cmd = [sys.executable, "-m", "pytest", "--tb=short", "-q", "--no-header"]

    if test_file:
        # NOTE(review): test_file is not checked for containment in the repo
        # here; callers handling untrusted names should validate them first.
        test_path = os.path.join(repo_root, test_file)
        if not os.path.exists(test_path):
            return 0.0, f"Test file not found: {test_file}", False
        cmd.append(test_path)
    else:
        tests_dir = os.path.join(repo_root, "tests")
        cmd.append(tests_dir if os.path.exists(tests_dir) else repo_root)

    def set_limits():
        """Set resource limits for subprocess — runs in child process."""
        try:
            import resource
            mem_bytes = MAX_MEMORY_MB * 1024 * 1024
            resource.setrlimit(resource.RLIMIT_AS, (mem_bytes, mem_bytes))
        except Exception:
            pass  # Best effort — don't fail if setrlimit unavailable

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=EXECUTION_TIMEOUT,
            cwd=repo_root,
            env={
                **os.environ,
                "PYTHONPATH": repo_root,
                "PYTHONDONTWRITEBYTECODE": "1",
            },
            preexec_fn=set_limits if sys.platform != "win32" else None,
        )

        full_output = result.stdout + result.stderr
        # Parse BEFORE truncating: pytest prints its summary line last, so a
        # truncated output would lose it and mis-score noisy test runs.
        pass_rate = _parse_pass_rate(full_output, result.returncode)
        return pass_rate, full_output[:MAX_OUTPUT_BYTES], False

    except subprocess.TimeoutExpired:
        return 0.0, f"TIMEOUT: Tests exceeded {EXECUTION_TIMEOUT}s limit", True
    except Exception as e:
        # Boundary handler: launching agent code must never crash the server.
        return 0.0, f"EXECUTION_ERROR: {str(e)}", False


def _parse_pass_rate(output: str, returncode: int) -> float:
    """Parse pytest output to get pass rate 0.0–1.0."""
    # Look for "X passed, Y failed" or "X passed" or "X failed"
    passed_match = re.search(r'(\d+) passed', output)
    failed_match = re.search(r'(\d+) failed', output)
    error_match = re.search(r'(\d+) error', output)

    passed = int(passed_match.group(1)) if passed_match else 0
    failed = int(failed_match.group(1)) if failed_match else 0
    errors = int(error_match.group(1)) if error_match else 0

    total = passed + failed + errors
    if total == 0:
        # If returncode is 0, all passed; otherwise failure
        return 1.0 if returncode == 0 else 0.0

    return round(passed / total, 3)


def validate_file_path(path: str, repo_root: str) -> bool:
    """
    Ensure agent cannot read/write files outside the repo.
    Prevents path traversal attacks.

    Both paths are fully resolved (``..`` segments and symlinks) before the
    containment check, so a symlink placed inside the repo that points
    outside of it is rejected as well.

    Args:
        path: Path supplied by the agent, interpreted relative to
            ``repo_root`` (an absolute ``path`` replaces the root when joined
            and is therefore only accepted if it lies inside the repo).
        repo_root: Directory the agent is confined to.

    Returns:
        True if the resolved path is ``repo_root`` itself or lies beneath it;
        False otherwise, including when the inputs cannot be processed.
    """
    try:
        repo_real = os.path.realpath(repo_root)
        file_real = os.path.realpath(os.path.join(repo_root, path))
        # commonpath compares whole path components, so "/repo-evil" is not
        # treated as being inside "/repo", and a root of "/" works too.
        common = os.path.commonpath([repo_real, file_real])
        return os.path.normcase(common) == os.path.normcase(repo_real)
    except (TypeError, ValueError, OSError):
        # Fail closed: wrong types, embedded NULs, or paths on different
        # drives are all treated as "outside the repo".
        return False


def search_in_repo(query: str, repo_path: str, max_results: int = 50) -> str:
    """Grep-style, case-insensitive substring search across the repo.

    Searches ``.py``, ``.md`` and ``.json`` files, skipping ``__pycache__``
    and any directory whose name starts with a dot. Scanning stops as soon
    as ``max_results`` matches have been collected.

    Args:
        query: Text to look for; compared case-insensitively per line.
        repo_path: Root directory to walk.
        max_results: Maximum number of matching lines to return (default 50).

    Returns:
        Matching lines joined by newlines, each formatted as
        ``<relative path>:<line number>: <line text>``, or a
        ``No matches found for: <query>`` message when nothing matches.

    Raises:
        ValueError: If ``max_results`` is less than 1.
    """
    if max_results < 1:
        raise ValueError("max_results must be at least 1")

    needle = query.lower()  # hoisted: invariant across every line scanned
    searchable_suffixes = ('.py', '.md', '.json')
    results = []

    for root, dirs, files in os.walk(repo_path):
        # Skip __pycache__ and hidden dirs (in-place edit prunes the walk)
        dirs[:] = [d for d in dirs if not d.startswith('.') and d != '__pycache__']
        for fname in files:
            if not fname.endswith(searchable_suffixes):
                continue
            fpath = os.path.join(root, fname)
            rel_path = os.path.relpath(fpath, repo_path)
            try:
                with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
                    for lineno, line in enumerate(f, 1):
                        if needle in line.lower():
                            results.append(f"{rel_path}:{lineno}: {line.rstrip()}")
                            if len(results) >= max_results:
                                # Limit reached — no need to read further.
                                return '\n'.join(results)
            except OSError:
                # Unreadable file: keep whatever matched so far and move on.
                continue

    if not results:
        return f"No matches found for: {query}"
    return '\n'.join(results)