# File size: 2,990 Bytes
# e34b94f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import re
import time
from multiprocessing.connection import Connection
from typing import Any, List, Tuple

# Result triple: (all tests passed, feedback text, per-test pass flags).
# The flags tuple holds one bool per test, so it is variadic rather than
# a fixed one-element tuple.
ExecuteResult = Tuple[bool, str, Tuple[bool, ...]]


def extract_python_code(text_string: str) -> List[str]:
    """Return the function definitions found in *text_string*.

    If the text contains one or more ```` ```python ```` fenced blocks, only
    the contents of those fences are searched; otherwise the whole text is
    searched. Within each searched chunk, every match starts at a ``def``
    and runs up to the next line that begins with ``def`` (or the end of the
    chunk). Anything before the first ``def`` is discarded, and each match is
    whitespace-stripped.
    """
    fenced = re.findall(r"```python(.*?)```", text_string, re.DOTALL)
    chunks = fenced or [text_string]

    func_pattern = re.compile(
        r"(def\s+\w+\(.*?:[\s\S]*?)(?=^def\s|\Z)", re.MULTILINE
    )
    return [
        definition.strip()
        for chunk in chunks
        for definition in func_pattern.findall(chunk.strip())
    ]

def _exec_code_and_capture(code: str, conn: Connection):

    try:
        local_ns = {}
        exec(code, local_ns)

        for name, func in local_ns.items():
            if callable(func) and name.startswith("test_"):
                func()
        conn.send(True)  
    except Exception as e:
        conn.send(e)
    finally:
        conn.close()

class PyExecutor:
    """Run candidate code together with test snippets in child processes."""

    @staticmethod
    def _strip_solution_import(test_code: str) -> str:
        """Neutralise ``from solution import ...`` statements in *test_code*.

        The candidate function is concatenated in front of the test, so the
        import is unnecessary. Each import statement (single-line or
        parenthesised) is replaced by ``pass`` at the same indentation so the
        surrounding block structure and the following lines stay intact.
        """
        return re.sub(
            r"^([ \t]*)from[ \t]+solution[ \t]+import\b[ \t]*(?:\([^)]*\)|[^\n]*)",
            r"\g<1>pass",
            test_code,
            flags=re.MULTILINE,
        )

    def _run_with_timeout(self, code: str, timeout: int) -> Any:
        """Run *code* in a child process and return the result it reports.

        Args:
            code: Source handed to ``_exec_code_and_capture`` in the child.
            timeout: Seconds the child may take; ``None`` waits indefinitely.

        Returns:
            The non-exception object sent by the child.

        Raises:
            TimeoutError: The child did not report and exit within *timeout*.
            RuntimeError: The child exited without sending a result.
            Exception: Any exception object sent by the child is re-raised.
        """
        parent_conn, child_conn = multiprocessing.Pipe()
        p = multiprocessing.Process(target=_exec_code_and_capture, args=(code, child_conn))
        deadline = None if timeout is None else time.monotonic() + timeout

        p.start()
        # Release the parent's handle on the child's end so that a child which
        # dies without sending shows up as EOF on our end.
        child_conn.close()
        try:
            # Read before joining: a child blocked in send() on a large
            # payload cannot exit until the parent drains the pipe.
            if not parent_conn.poll(timeout):
                raise TimeoutError("Test execution timed out")
            try:
                result = parent_conn.recv()
            except EOFError:
                raise RuntimeError(
                    "Child process terminated unexpectedly without sending a result."
                ) from None

            # The child must also exit within the original time budget.
            remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
            p.join(remaining)
            if p.is_alive():
                raise TimeoutError("Test execution timed out")
        finally:
            if p.is_alive():
                p.kill()
                p.join()
            parent_conn.close()

        if isinstance(result, Exception):
            raise result
        return result

    def execute(self, func: str, tests: List[str], timeout: int = 5, verbose: bool = True) -> ExecuteResult:
        """Run *func* against each test and summarise the outcome.

        Args:
            func: Source of the candidate implementation.
            tests: Test snippets; each is run separately, appended to *func*.
            timeout: Per-test time limit in seconds.
            verbose: Accepted for interface compatibility; currently unused.

        Returns:
            ``(is_passing, feedback, state)`` where ``is_passing`` is True
            when no test failed, ``feedback`` lists passed and failed tests,
            and ``state`` holds one pass/fail flag per entry of *tests*.
        """
        success_tests = []
        failed_tests = []
        state = []

        for test_code in tests:
            code_to_run = func + "\n" + self._strip_solution_import(test_code)
            try:
                self._run_with_timeout(code_to_run, timeout)
            except Exception as e:
                failed_tests.append(f"{test_code}  # output: {e}")
                state.append(False)
            else:
                success_tests.append(test_code)
                state.append(True)

        feedback = (
            "Tests passed:\n" + "\n".join(success_tests)
            + "\n\nTests failed:\n" + "\n".join(failed_tests)
        )
        return not failed_tests, feedback, tuple(state)

    def evaluate(self, name: str, func: str, test: str, timeout: int = 5) -> bool:
        """Return True when *func* followed by *test* runs without error.

        Args:
            name: Accepted for interface compatibility; currently unused.
            func: Source of the candidate implementation.
            test: Test snippet appended to *func*.
            timeout: Time limit in seconds.
        """
        code_to_run = func + "\n" + self._strip_solution_import(test)
        try:
            self._run_with_timeout(code_to_run, timeout)
        except Exception:
            return False
        return True