File size: 4,924 Bytes
cf78e0e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
import subprocess
import textwrap
import ast
import resource
import os

app = FastAPI(title="Python Exec Service (PES) on HuggingFace")

# ----------------- 配置 -----------------
MAX_CODE_LENGTH = 20_000           # 代码最大长度
MAX_TIMEOUT = 20                   # 固定最长 20 秒(由 PES 控制)
MAX_MEMORY_MB = 256                # 子进程最大内存
MAX_OUTPUT_SIZE = 4 * 1024 * 1024  # 4MB 输出限制

# ----------------- 请求 / 响应模型 -----------------
class ExecRequest(BaseModel):
    code: str = Field(..., description="要执行的 Python 代码")
    stdin: Optional[str] = Field("", description="传给程序的标准输入")

class ExecResponse(BaseModel):
    stdout: str
    stderr: str
    return_code: Optional[int]
    timeout: bool

# ----------------- 安全检查:AST 审计 -----------------
DANGEROUS_NAMES = {
    "__import__", "eval", "exec", "open", "compile",
    "input", "globals", "locals", "vars",
    "os", "sys", "subprocess", "socket",
    "shutil", "pathlib"
}

DANGEROUS_MODULES = {
    "os", "sys", "subprocess", "socket", "shutil",
    "pathlib", "threading", "multiprocessing"
}

class SafeVisitor(ast.NodeVisitor):
    def visit_Import(self, node):
        for alias in node.names:
            if alias.name.split('.')[0] in DANGEROUS_MODULES:
                raise ValueError(f"禁止导入模块: {alias.name}")
        self.generic_visit(node)

    def visit_ImportFrom(self, node):
        if node.module and node.module.split('.')[0] in DANGEROUS_MODULES:
            raise ValueError(f"禁止 from 导入模块: {node.module}")
        self.generic_visit(node)

    def visit_Attribute(self, node):
        # 禁止访问 __dict__ / __class__ / __globals__ 等双下划线属性
        if isinstance(node.attr, str) and node.attr.startswith("__"):
            raise ValueError(f"禁止访问特殊属性: {node.attr}")
        self.generic_visit(node)

    def visit_Name(self, node):
        if node.id in DANGEROUS_NAMES:
            raise ValueError(f"禁止使用名称: {node.id}")
        self.generic_visit(node)

def static_security_check(code: str):
    try:
        tree = ast.parse(code, mode="exec")
    except SyntaxError as e:
        raise ValueError(f"语法错误: {e}")
    SafeVisitor().visit(tree)

# ----------------- 资源限制(子进程 preexec_fn) -----------------
def set_limits():
    # CPU 时间限制(秒)—— hard limit MAX_TIMEOUT
    resource.setrlimit(resource.RLIMIT_CPU, (MAX_TIMEOUT, MAX_TIMEOUT))
    # 地址空间限制(内存)
    max_bytes = MAX_MEMORY_MB * 1024 * 1024
    resource.setrlimit(resource.RLIMIT_AS, (max_bytes, max_bytes))
    # 文件大小限制(防止写超大文件)
    resource.setrlimit(resource.RLIMIT_FSIZE, (MAX_OUTPUT_SIZE, MAX_OUTPUT_SIZE))

# ----------------- 核心执行 API -----------------
@app.post("/execute", response_model=ExecResponse)
def execute(req: ExecRequest):
    # 1. 限制代码长度
    if len(req.code) > MAX_CODE_LENGTH:
        raise HTTPException(status_code=400, detail="代码过长,超过限制")

    # 2. 去掉多余缩进
    code = textwrap.dedent(req.code)

    # 3. 静态安全检查(AST)
    try:
        static_security_check(code)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"安全检查不通过: {e}")

    # 4. 执行:PES 仅提供命令参数,不允许外部控制 timeout
    try:
        proc = subprocess.run(
            ["python", "-c", code],
            input=req.stdin.encode("utf-8") if req.stdin else None,
            capture_output=True,
            timeout=MAX_TIMEOUT,   # 固定 20 秒
            preexec_fn=set_limits  # 资源限制
        )
        stdout = proc.stdout[:MAX_OUTPUT_SIZE].decode("utf-8", errors="replace")
        stderr = proc.stderr[:MAX_OUTPUT_SIZE].decode("utf-8", errors="replace")

        return ExecResponse(
            stdout=stdout,
            stderr=stderr,
            return_code=proc.returncode,
            timeout=False
        )
    except subprocess.TimeoutExpired as e:
        # 子进程超时
        stdout = (e.stdout or b"")[:MAX_OUTPUT_SIZE].decode("utf-8", errors="replace")
        stderr = (e.stderr or b"")[:MAX_OUTPUT_SIZE].decode("utf-8", errors="replace")
        if not stderr:
            stderr = "Process timed out"
        return ExecResponse(
            stdout=stdout,
            stderr=stderr,
            return_code=None,
            timeout=True
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"执行失败: {e}")

# 本地调试用,HF Space 也可以用这个启动命令
if __name__ == "__main__":
    import uvicorn
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run("app:app", host="0.0.0.0", port=port)