leezhuuu commited on
Commit
f3f259e
·
verified ·
1 Parent(s): 4286f59

Upload 7 files

Browse files
Files changed (7) hide show
  1. Dockerfile +38 -0
  2. app.py +251 -0
  3. build.py +27 -0
  4. center.py +1392 -0
  5. concurrent_test.py +32 -0
  6. config.yaml +44 -0
  7. requirements.txt +24 -0
Dockerfile ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # 第一阶段:构建阶段
2
+ FROM python:3.10-slim AS builder
3
+
4
+ # 设置工作目录
5
+ WORKDIR /app
6
+
7
+ # 复制依赖文件
8
+ COPY requirements.txt .
9
+
10
+ # 安装构建依赖
11
+ RUN apt-get update && apt-get install -y \
12
+ build-essential \
13
+ libssl-dev \
14
+ libffi-dev \
15
+ python3-dev \
16
+ && apt-get clean \
17
+ && pip install --no-cache-dir --upgrade pip \
18
+ && pip install --no-cache-dir -r requirements.txt
19
+
20
+ # 第二阶段:运行阶段
21
+ FROM python:3.10-slim
22
+
23
+ # 设置工作目录
24
+ WORKDIR /app
25
+
26
+ # 从构建阶段复制必要的文件
27
+ COPY --from=builder /usr/local/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages
28
+ COPY --from=builder /usr/local/bin /usr/local/bin
29
+ COPY . .
30
+
31
+ # 设置环境变量
32
+ ENV PYTHONUNBUFFERED=1
33
+
34
+ # 暴露 Flask 端口
35
+ EXPOSE 5000
36
+
37
+ # 运行 Flask 应用
38
+ CMD ["gunicorn", "-b", "0.0.0.0:5000", "app:app"]
app.py ADDED
@@ -0,0 +1,251 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from flask import Flask, request, jsonify, Response
2
+ import subprocess
3
+ import sys
4
+ import threading
5
+ import time
6
+ import requests
7
+ from functools import wraps
8
+ import urllib.parse
9
+ import re
10
+ import json
11
+ import base64
12
+ import os
13
+ import glob
14
+
15
+ app = Flask(__name__)
16
+
17
+ # 从环境变量中读取配置,如果未设置则使用默认值
18
+ OUTPUT_BASE64 = os.getenv('OUTPUT_BASE64', 'True').lower() == 'true'
19
+ AUTH_TOKEN = os.getenv('AUTH_TOKEN', '')
20
+ # HOST = os.getenv('HOST', '127.0.0.1')
21
+ HOST = os.getenv('HOST', '0.0.0.0')
22
+ PORT = int(os.getenv('PORT', 5000))
23
+ TIMEOUT = int(os.getenv('TIMEOUT', 30))
24
+ REMOVE_PYTHON_CODE_BLOCK = os.getenv('REMOVE_PYTHON_CODE_BLOCK', 'True').lower() == 'true'
25
+
26
+ # 配置变量,决定是否启用单元测试,默认启用
27
+ ENABLE_UNIT_TESTS = False
28
+
29
+ def auth_required(f):
30
+ @wraps(f)
31
+ def decorated(*args, **kwargs):
32
+ if AUTH_TOKEN:
33
+ auth_header = request.headers.get('Authorization')
34
+ if not auth_header or not auth_header.startswith('Bearer '):
35
+ return jsonify({'error': 'Authorization header is missing or invalid'}), 401
36
+ token = auth_header.split('Bearer ')[1]
37
+ if token != AUTH_TOKEN:
38
+ return jsonify({'error': 'Invalid token'}), 401
39
+ return f(*args, **kwargs)
40
+ return decorated
41
+
42
+ def extract_code(code):
43
+ if REMOVE_PYTHON_CODE_BLOCK:
44
+ match = re.search(r'```python\s*(.*?)\s*```', code, re.DOTALL)
45
+ if match:
46
+ return match.group(1)
47
+ return code
48
+
49
+ def run_code(language, code, variables, timeout):
50
+ try:
51
+ # 根据语言类型选择解释器
52
+ if language == 'python':
53
+ # 提取代码中的有效部分
54
+ code = extract_code(code)
55
+ # 将变量注入到代码环境中
56
+ var_definitions = '\n'.join(f"{key} = {value}" for key, value in variables.items())
57
+ code_with_vars = f"{var_definitions}\n{code}"
58
+ # 使用subprocess运行代码
59
+ result = subprocess.run([sys.executable, '-c', code_with_vars], capture_output=True, text=True, timeout=timeout)
60
+ return result.stdout, result.stderr
61
+ else:
62
+ return None, "Unsupported language"
63
+ except subprocess.TimeoutExpired:
64
+ return None, "Code execution timed out"
65
+ except Exception as e:
66
+ return None, str(e)
67
+
68
+ def convert_image_to_base64(image_path):
69
+ with open(image_path, "rb") as image_file:
70
+ encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
71
+ return encoded_string
72
+
73
+ def unescape_string(s):
74
+ return s.encode('utf-8').decode('unicode_escape')
75
+
76
+ @app.route('/runcode', methods=['POST', 'GET'])
77
+ @auth_required
78
+ def run_code_endpoint():
79
+ if request.method == 'POST':
80
+ data = request.get_json()
81
+ if not data:
82
+ return jsonify({'error': 'Invalid JSON'}), 400
83
+ language = data.get('languageType', 'python')
84
+ variables = data.get('variables', {})
85
+ code = data.get('code', '')
86
+ elif request.method == 'GET':
87
+ query_string = request.query_string.decode('utf-8')
88
+ print("Query String:",query_string)
89
+ # 检查query_string中是否有"+"
90
+ if '+' in query_string:
91
+ # query_string的+替换为空格
92
+ query_string = re.sub(r'\+', ' ', query_string)
93
+ print("Query String:",query_string)
94
+
95
+ # 如果没有+,则直接输出query_string
96
+ else:
97
+ print("Query String:",query_string)
98
+
99
+
100
+ # 使用正则表达式提取参数
101
+ language_match = re.search(r'languageType=("?)([^"&]+)\1', query_string)
102
+ variables_match = re.search(r'variables=("?)([^"&]+)\1', query_string)
103
+ code_match = re.search(r'code=("?)([^"&]+)\1', query_string)
104
+ print("code_match:",code_match)
105
+ if not language_match or not variables_match or not code_match:
106
+ return jsonify({'error': 'Invalid parameters'}), 400
107
+
108
+
109
+
110
+ language = urllib.parse.unquote(language_match.group(2))
111
+ variables = urllib.parse.unquote(variables_match.group(2))
112
+ code = urllib.parse.unquote(code_match.group(2))
113
+ print("code:",code)
114
+
115
+ # 处理转义字符
116
+ language = unescape_string(language)
117
+ variables = unescape_string(variables)
118
+ code = unescape_string(code)
119
+ print("code:",code)
120
+
121
+ # # 使用正则表达式将 + 替换为空格
122
+ # code = re.sub(r'\+', ' ', code)
123
+ # print("code:",code)
124
+
125
+
126
+ try:
127
+ variables = json.loads(variables)
128
+ except json.JSONDecodeError:
129
+ return jsonify({'error': 'Invalid variables format'}), 400
130
+
131
+ # 打印解析后的JSON参数
132
+ print("Parsed JSON Parameters:")
133
+ print(json.dumps({
134
+ 'languageType': language,
135
+ 'variables': variables,
136
+ 'code': code
137
+ }, indent=4))
138
+ else:
139
+ return jsonify({'error': 'Unsupported HTTP method'}), 405
140
+
141
+ # 运行代码
142
+ stdout, stderr = run_code(language, code, variables, TIMEOUT)
143
+
144
+ if stderr:
145
+ return jsonify({'error': stderr}), 400
146
+ else:
147
+ # 查找当前目录中的所有图片文件
148
+ image_files = glob.glob('*.png') + glob.glob('*.jpg') + glob.glob('*.jpeg') + glob.glob('*.gif')
149
+
150
+ if image_files:
151
+ image_data = {}
152
+ for image_path in image_files:
153
+ encoded_image = convert_image_to_base64(image_path)
154
+ image_data[image_path] = encoded_image
155
+ os.remove(image_path) # 删除生成的图片文件
156
+
157
+ if not OUTPUT_BASE64:
158
+ # 如果 OUTPUT_BASE64 不是 True,则输出原始图片
159
+ html_content = ''.join([f'<img src="{image_path}" alt="{image_path}" />' for image_path in image_data.keys()])
160
+ return Response(html_content, mimetype='text/html')
161
+
162
+ else:
163
+ return jsonify({'output': stdout, 'images': image_data}), 200
164
+ else:
165
+ return jsonify({'output': stdout}), 200
166
+
167
+ def run_test_case(test_case):
168
+ url = f'http://{HOST}:{PORT}/runcode'
169
+ if test_case['method'] == 'GET':
170
+ params = {
171
+ 'languageType': test_case['language'],
172
+ 'variables': test_case['variables'],
173
+ 'code': test_case['code']
174
+ }
175
+ headers = {'Authorization': f'Bearer {AUTH_TOKEN}'} if AUTH_TOKEN else {}
176
+ print("Request Params:")
177
+ print(params)
178
+
179
+ response = requests.get(url, params=params, headers=headers)
180
+ elif test_case['method'] == 'POST':
181
+ data = {
182
+ 'languageType': test_case['language'],
183
+ 'variables': test_case['variables'],
184
+ 'code': test_case['code']
185
+ }
186
+ headers = {'Authorization': f'Bearer {AUTH_TOKEN}'} if AUTH_TOKEN else {}
187
+ print("Request Data:")
188
+ print(data)
189
+
190
+ response = requests.post(url, json=data, headers=headers)
191
+ else:
192
+ raise ValueError("Unsupported HTTP method in test case")
193
+
194
+ # 打印返回JSON
195
+ print("Response JSON:")
196
+ print(response.json())
197
+
198
+ def test_run_code_endpoint():
199
+ test_cases = [
200
+ {
201
+ 'method': 'GET',
202
+ 'language': 'python',
203
+ 'variables': str({}),
204
+ 'code': "print('Hello, World!')"
205
+ },
206
+ {
207
+ 'method': 'GET',
208
+ 'language': '"python"',
209
+ 'variables': str({}),
210
+ 'code': '"print(\'Hello, World!\')"'
211
+ },
212
+ {
213
+ 'method': 'GET',
214
+ 'language': 'python',
215
+ 'variables': str({}),
216
+ 'code': "print('Hello%20World')"
217
+ },
218
+ {
219
+ 'method': 'POST',
220
+ 'language': 'python',
221
+ 'variables': {},
222
+ 'code': "print(5 ** 11)"
223
+ },
224
+ {
225
+ 'method': 'POST',
226
+ 'language': 'python',
227
+ 'variables': {'m': 7, 'n': 4},
228
+ 'code': "print(m * n)"
229
+ },
230
+ {
231
+ 'method': 'POST',
232
+ 'language': 'python',
233
+ 'variables': {},
234
+ 'code': "```python\nprint('Hello from code block!')\n```"
235
+ }
236
+ ]
237
+
238
+ for test_case in test_cases:
239
+ run_test_case(test_case)
240
+
241
+ if __name__ == '__main__':
242
+ # 启动Flask应用
243
+ threading.Thread(target=app.run, kwargs={'host': HOST, 'port': PORT, 'debug': False}).start()
244
+
245
+ # 等待Flask应用启动
246
+ time.sleep(1)
247
+
248
+ # 运行单元测试
249
+ if ENABLE_UNIT_TESTS:
250
+ test_run_code_endpoint()
251
+ print("All tests executed!")
build.py ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import yaml
2
+ import subprocess
3
+
4
+ # 读取配置文件,指定编码为 utf-8
5
+ with open('config.yaml', 'r', encoding='utf-8') as file:
6
+ config = yaml.safe_load(file)
7
+
8
+ DEPENDENCIES = config['dependencies']
9
+ INTERPRETER_IMAGE = config['interpreter_image']
10
+
11
+ # 动态生成 requirements.txt
12
+ def generate_requirements(dependencies):
13
+ with open('requirements.txt', 'w', encoding='utf-8') as f:
14
+ for dep in dependencies:
15
+ f.write(f"{dep}\n")
16
+
17
+ generate_requirements(DEPENDENCIES)
18
+ print("Generated requirements.txt with specified dependencies.")
19
+
20
+ # 构建 Docker 镜像
21
+ def build_docker_image(image_name):
22
+ build_command = ["docker", "build", "-t", image_name, "."]
23
+ subprocess.run(build_command, check=True)
24
+ print(f"Built Docker image {image_name}")
25
+
26
+ build_docker_image(INTERPRETER_IMAGE)
27
+
center.py ADDED
@@ -0,0 +1,1392 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # # # # import yaml
2
+ # # # # import subprocess
3
+ # # # # import uuid
4
+ # # # # from flask import Flask, request, jsonify
5
+ # # # # from queue import Queue
6
+ # # # # import requests
7
+ # # # # import threading
8
+ # # # # import time
9
+ # # # # import os
10
+ # # # # import atexit
11
+ # # # # import base64
12
+ # # # # import signal
13
+ # # # # from sqlalchemy import create_engine, Column, String, LargeBinary, Integer
14
+ # # # # from sqlalchemy.orm import declarative_base, sessionmaker
15
+ # # # # from sqlalchemy.exc import OperationalError, IntegrityError
16
+ # # # # import psycopg2
17
+
18
+ # # # # app = Flask(__name__)
19
+
20
+ # # # # # 读取配置文件,指定编码为 utf-8
21
+ # # # # with open('config.yaml', 'r', encoding='utf-8') as file:
22
+ # # # # config = yaml.safe_load(file)
23
+
24
+ # # # # DOMAIN = config['domain']
25
+ # # # # INTERPRETER_IMAGE = config['interpreter_image']
26
+ # # # # PORT_START = config['interpreter_port_range']['start']
27
+ # # # # PORT_END = config['interpreter_port_range']['end']
28
+ # # # # DEPENDENCIES = config['dependencies']
29
+ # # # # RESOURCE_LIMITS = config.get('resource_limits', {})
30
+ # # # # MEMORY_LIMIT = RESOURCE_LIMITS.get('memory')
31
+ # # # # CPU_LIMIT = RESOURCE_LIMITS.get('cpus')
32
+ # # # # POSTGRES = config['postgres']
33
+ # # # # TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒
34
+ # # # # SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口
35
+
36
+ # # # # # 容器管理
37
+ # # # # containers = []
38
+ # # # # ports = list(range(PORT_START, PORT_END + 1))
39
+ # # # # lock = threading.Lock()
40
+ # # # # request_queue = Queue()
41
+ # # # # result_dict = {} # 用于存储结果的字典
42
+ # # # # current_container_index = 0
43
+
44
+ # # # # # 检查 PostgreSQL 容器是否存在
45
+ # # # # POSTGRES_CONTAINER_NAME = "postgres_code_interpreter"
46
+ # # # # POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata')
47
+
48
+ # # # # def is_postgres_container_running():
49
+ # # # # result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True)
50
+ # # # # return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n')
51
+
52
+ # # # # def wait_for_postgres_ready():
53
+ # # # # retry_attempts = 10
54
+ # # # # retry_delay = 5 # seconds
55
+ # # # # for attempt in range(retry_attempts):
56
+ # # # # try:
57
+ # # # # conn = psycopg2.connect(
58
+ # # # # dbname=POSTGRES['db'],
59
+ # # # # user=POSTGRES['user'],
60
+ # # # # password=POSTGRES['password'],
61
+ # # # # host=POSTGRES['host'],
62
+ # # # # port=POSTGRES['port']
63
+ # # # # )
64
+ # # # # conn.close()
65
+ # # # # print("PostgreSQL is ready")
66
+ # # # # return True
67
+ # # # # except psycopg2.OperationalError as e:
68
+ # # # # print(f"Waiting for PostgreSQL to be ready: {e}")
69
+ # # # # time.sleep(retry_delay)
70
+ # # # # return False
71
+
72
+ # # # # if not is_postgres_container_running():
73
+ # # # # if not os.path.exists(POSTGRES_DATA_DIR):
74
+ # # # # os.makedirs(POSTGRES_DATA_DIR)
75
+
76
+ # # # # subprocess.run([
77
+ # # # # "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d",
78
+ # # # # "-e", f"POSTGRES_USER={POSTGRES['user']}",
79
+ # # # # "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}",
80
+ # # # # "-e", f"POSTGRES_DB={POSTGRES['db']}",
81
+ # # # # "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data",
82
+ # # # # "-p", f"{POSTGRES['port']}:5432",
83
+ # # # # "postgres"
84
+ # # # # ], check=True)
85
+
86
+ # # # # if not wait_for_postgres_ready():
87
+ # # # # print("Failed to connect to PostgreSQL after multiple attempts.")
88
+ # # # # exit(1)
89
+
90
+ # # # # # PostgreSQL 配置
91
+ # # # # DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}"
92
+ # # # # engine = None
93
+ # # # # Base = declarative_base()
94
+ # # # # Session = None
95
+ # # # # session = None
96
+
97
+ # # # # def init_db():
98
+ # # # # global engine, Session, session
99
+ # # # # retry_attempts = 5
100
+ # # # # retry_delay = 5 # seconds
101
+ # # # # for attempt in range(retry_attempts):
102
+ # # # # try:
103
+ # # # # engine = create_engine(DATABASE_URL)
104
+ # # # # Session = sessionmaker(bind=engine)
105
+ # # # # session = Session()
106
+ # # # # Base.metadata.create_all(engine) # 确保数据库模式已创建
107
+ # # # # print("Database connection successful")
108
+ # # # # break
109
+ # # # # except OperationalError as e:
110
+ # # # # print(f"Database connection failed: {e}")
111
+ # # # # if attempt < retry_attempts - 1:
112
+ # # # # print(f"Retrying in {retry_delay} seconds...")
113
+ # # # # time.sleep(retry_delay)
114
+ # # # # else:
115
+ # # # # print("Failed to connect to the database after multiple attempts.")
116
+ # # # # raise
117
+
118
+ # # # # class Image(Base):
119
+ # # # # __tablename__ = 'images'
120
+ # # # # id = Column(Integer, primary_key=True, autoincrement=True)
121
+ # # # # filename = Column(String, unique=True, nullable=False)
122
+ # # # # data = Column(LargeBinary, nullable=False)
123
+
124
+ # # # # init_db()
125
+
126
+ # # # # # 启动代码解释器容器
127
+ # # # # def start_container(port):
128
+ # # # # container_name = f"code_interpreter_docker_{uuid.uuid4()}"
129
+ # # # # run_command = [
130
+ # # # # "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000"
131
+ # # # # ]
132
+ # # # # if MEMORY_LIMIT:
133
+ # # # # run_command.extend(["--memory", MEMORY_LIMIT])
134
+ # # # # if CPU_LIMIT:
135
+ # # # # run_command.extend(["--cpus", CPU_LIMIT])
136
+ # # # # run_command.append(INTERPRETER_IMAGE)
137
+
138
+ # # # # subprocess.run(run_command, check=True)
139
+ # # # # print(f"Started container {container_name} on port {port}")
140
+ # # # # return container_name, port
141
+
142
+ # # # # def stop_container(container_name):
143
+ # # # # subprocess.run(["docker", "stop", container_name])
144
+ # # # # subprocess.run(["docker", "rm", container_name])
145
+ # # # # print(f"Stopped and removed container {container_name}")
146
+
147
+ # # # # def start_containers():
148
+ # # # # for port in ports:
149
+ # # # # container_name, port = start_container(port)
150
+ # # # # containers.append((container_name, port))
151
+
152
+ # # # # def stop_containers():
153
+ # # # # for container_name, _ in containers:
154
+ # # # # stop_container(container_name)
155
+
156
+ # # # # @app.route('/runcode', methods=['POST', 'GET'])
157
+ # # # # def run_code():
158
+ # # # # if request.method == 'POST':
159
+ # # # # data = request.get_json()
160
+ # # # # if not data:
161
+ # # # # return jsonify({'error': 'Invalid JSON'}), 400
162
+ # # # # elif request.method == 'GET':
163
+ # # # # query_string = request.query_string.decode('utf-8')
164
+ # # # # data = {'query_string': query_string}
165
+ # # # # else:
166
+ # # # # return jsonify({'error': 'Invalid request method'}), 405
167
+
168
+ # # # # request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID
169
+ # # # # request_queue.put((request_id, data))
170
+
171
+ # # # # start_time = time.time()
172
+ # # # # while time.time() - start_time < 10: # 最多等待10秒
173
+ # # # # if request_id in result_dict:
174
+ # # # # output = result_dict.pop(request_id)
175
+ # # # # # 处理返回结果,替换 base64 数据为链接
176
+ # # # # if 'images' in output:
177
+ # # # # try:
178
+ # # # # output = process_images(output)
179
+ # # # # except Exception as e:
180
+ # # # # return jsonify({'error': str(e)}), 500
181
+ # # # # return jsonify(output), 200
182
+
183
+ # # # # time.sleep(0.1)
184
+
185
+ # # # # return jsonify({'error': 'Request timed out'}), 504
186
+
187
+ # # # # def process_images(output):
188
+ # # # # images = output['images']
189
+ # # # # for filename, base64_data in images.items():
190
+ # # # # image_data = base64.b64decode(base64_data)
191
+ # # # # unique_filename = f"{uuid.uuid4()}_{filename}"
192
+ # # # # image_record = Image(filename=unique_filename, data=image_data)
193
+ # # # # try:
194
+ # # # # session.add(image_record)
195
+ # # # # session.commit()
196
+ # # # # except IntegrityError as e:
197
+ # # # # session.rollback()
198
+ # # # # print(f"Database error: {e}")
199
+ # # # # continue
200
+ # # # # if DOMAIN:
201
+ # # # # images[filename] = f"https://{DOMAIN}/image/{unique_filename}"
202
+ # # # # else:
203
+ # # # # images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}"
204
+ # # # # return output
205
+
206
+
207
+ # # # # @app.route('/image/<filename>', methods=['GET'])
208
+ # # # # def serve_image(filename):
209
+ # # # # image_record = session.query(Image).filter_by(filename=filename).first()
210
+ # # # # if image_record:
211
+ # # # # return image_record.data, 200, {'Content-Type': 'image/png'}
212
+ # # # # return jsonify({'error': 'Image not found'}), 404
213
+
214
+ # # # # def handle_requests():
215
+ # # # # global current_container_index
216
+ # # # # while True:
217
+ # # # # request_id, data = request_queue.get()
218
+ # # # # if data is None:
219
+ # # # # break
220
+
221
+ # # # # with lock:
222
+ # # # # container_name, port = containers[current_container_index]
223
+ # # # # current_container_index = (current_container_index + 1) % len(containers)
224
+
225
+ # # # # try:
226
+ # # # # if 'query_string' in data:
227
+ # # # # # GET 请求处理
228
+ # # # # response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT)
229
+ # # # # else:
230
+ # # # # # POST 请求处理
231
+ # # # # response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT)
232
+ # # # # output = response.json()
233
+ # # # # except requests.exceptions.Timeout:
234
+ # # # # output = {'error': 'Code execution timed out'}
235
+ # # # # reset_container(container_name, port)
236
+ # # # # except Exception as e:
237
+ # # # # output = {'error': str(e)}
238
+
239
+ # # # # result_dict[request_id] = output # 将结果放入结果字典
240
+
241
+ # # # # request_queue.task_done()
242
+ # # # # print(f"Finished processing request {request_id}: {output}")
243
+
244
+ # # # # def reset_container(container_name, port):
245
+ # # # # print(f"Resetting container {container_name}")
246
+ # # # # stop_container(container_name)
247
+ # # # # new_container_name, _ = start_container(port)
248
+ # # # # with lock:
249
+ # # # # for i, (name, p) in enumerate(containers):
250
+ # # # # if name == container_name:
251
+ # # # # containers[i] = (new_container_name, port)
252
+ # # # # break
253
+
254
+ # # # # def signal_handler(signal, frame):
255
+ # # # # print('Stopping containers and exiting program...')
256
+ # # # # stop_containers()
257
+ # # # # if session:
258
+ # # # # session.close()
259
+ # # # # engine.dispose()
260
+ # # # # os._exit(0)
261
+
262
+ # # # # signal.signal(signal.SIGINT, signal_handler)
263
+
264
+ # # # # worker_thread = threading.Thread(target=handle_requests)
265
+ # # # # worker_thread.start()
266
+
267
+ # # # # start_containers()
268
+ # # # # atexit.register(stop_containers)
269
+
270
+ # # # # if __name__ == '__main__':
271
+ # # # # app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口
272
+
273
+
274
+
275
+ # # # import yaml
276
+ # # # import subprocess
277
+ # # # import uuid
278
+ # # # from flask import Flask, request, jsonify
279
+ # # # from queue import Queue
280
+ # # # import requests
281
+ # # # import threading
282
+ # # # import time
283
+ # # # import os
284
+ # # # import atexit
285
+ # # # import base64
286
+ # # # import signal
287
+ # # # from sqlalchemy import create_engine, Column, String, LargeBinary, Integer
288
+ # # # from sqlalchemy.orm import declarative_base, sessionmaker
289
+ # # # from sqlalchemy.exc import OperationalError, IntegrityError
290
+ # # # import psycopg2
291
+
292
+ # # # app = Flask(__name__)
293
+
294
+ # # # # 读取配置文件,指定编码为 utf-8
295
+ # # # with open('config.yaml', 'r', encoding='utf-8') as file:
296
+ # # # config = yaml.safe_load(file)
297
+
298
+ # # # DOMAIN = config['domain']
299
+ # # # INTERPRETER_IMAGE = config['interpreter_image']
300
+ # # # PORT_START = config['interpreter_port_range']['start']
301
+ # # # PORT_END = config['interpreter_port_range']['end']
302
+ # # # DEPENDENCIES = config['dependencies']
303
+ # # # RESOURCE_LIMITS = config.get('resource_limits', {})
304
+ # # # MEMORY_LIMIT = RESOURCE_LIMITS.get('memory')
305
+ # # # CPU_LIMIT = RESOURCE_LIMITS.get('cpus')
306
+ # # # POSTGRES = config['postgres']
307
+ # # # TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒
308
+ # # # SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口
309
+
310
+ # # # # 容器管理
311
+ # # # containers = []
312
+ # # # ports = list(range(PORT_START, PORT_END + 1))
313
+ # # # lock = threading.Lock()
314
+ # # # request_queue = Queue()
315
+ # # # result_dict = {} # 用于存储结果的字典
316
+ # # # current_container_index = 0
317
+
318
+ # # # # 检查 PostgreSQL 容器是否存在
319
+ # # # POSTGRES_CONTAINER_NAME = "postgres_code_interpreter"
320
+ # # # POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata')
321
+
322
+ # # # def is_postgres_container_running():
323
+ # # # result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True)
324
+ # # # return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n')
325
+
326
+ # # # def wait_for_postgres_ready():
327
+ # # # retry_attempts = 10
328
+ # # # retry_delay = 5 # seconds
329
+ # # # for attempt in range(retry_attempts):
330
+ # # # try:
331
+ # # # conn = psycopg2.connect(
332
+ # # # dbname=POSTGRES['db'],
333
+ # # # user=POSTGRES['user'],
334
+ # # # password=POSTGRES['password'],
335
+ # # # host=POSTGRES['host'],
336
+ # # # port=POSTGRES['port']
337
+ # # # )
338
+ # # # conn.close()
339
+ # # # print("PostgreSQL is ready")
340
+ # # # return True
341
+ # # # except psycopg2.OperationalError as e:
342
+ # # # print(f"Waiting for PostgreSQL to be ready: {e}")
343
+ # # # time.sleep(retry_delay)
344
+ # # # return False
345
+
346
+ # # # if not is_postgres_container_running():
347
+ # # # if not os.path.exists(POSTGRES_DATA_DIR):
348
+ # # # os.makedirs(POSTGRES_DATA_DIR)
349
+
350
+ # # # subprocess.run([
351
+ # # # "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d",
352
+ # # # "-e", f"POSTGRES_USER={POSTGRES['user']}",
353
+ # # # "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}",
354
+ # # # "-e", f"POSTGRES_DB={POSTGRES['db']}",
355
+ # # # "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data",
356
+ # # # "-p", f"{POSTGRES['port']}:5432",
357
+ # # # "postgres"
358
+ # # # ], check=True)
359
+
360
+ # # # if not wait_for_postgres_ready():
361
+ # # # print("Failed to connect to PostgreSQL after multiple attempts.")
362
+ # # # exit(1)
363
+
364
+ # # # # PostgreSQL 配置
365
+ # # # DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}"
366
+ # # # engine = None
367
+ # # # Base = declarative_base()
368
+ # # # Session = None
369
+ # # # session = None
370
+
371
+ # # # def init_db():
372
+ # # # global engine, Session, session
373
+ # # # retry_attempts = 5
374
+ # # # retry_delay = 5 # seconds
375
+ # # # for attempt in range(retry_attempts):
376
+ # # # try:
377
+ # # # engine = create_engine(DATABASE_URL)
378
+ # # # Session = sessionmaker(bind=engine)
379
+ # # # session = Session()
380
+ # # # Base.metadata.create_all(engine) # 确保数据库模式已创建
381
+ # # # print("Database connection successful")
382
+ # # # break
383
+ # # # except OperationalError as e:
384
+ # # # print(f"Database connection failed: {e}")
385
+ # # # if attempt < retry_attempts - 1:
386
+ # # # print(f"Retrying in {retry_delay} seconds...")
387
+ # # # time.sleep(retry_delay)
388
+ # # # else:
389
+ # # # print("Failed to connect to the database after multiple attempts.")
390
+ # # # raise
391
+
392
+ # # # class Image(Base):
393
+ # # # __tablename__ = 'images'
394
+ # # # id = Column(Integer, primary_key=True, autoincrement=True)
395
+ # # # filename = Column(String, unique=True, nullable=False)
396
+ # # # data = Column(LargeBinary, nullable=False)
397
+
398
+ # # # init_db()
399
+
400
+ # # # # 启动代码解释器容器
401
+ # # # def start_container(port):
402
+ # # # container_name = f"code_interpreter_docker_{uuid.uuid4()}"
403
+ # # # run_command = [
404
+ # # # "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000"
405
+ # # # ]
406
+ # # # if MEMORY_LIMIT:
407
+ # # # run_command.extend(["--memory", MEMORY_LIMIT])
408
+ # # # if CPU_LIMIT:
409
+ # # # run_command.extend(["--cpus", CPU_LIMIT])
410
+ # # # run_command.append(INTERPRETER_IMAGE)
411
+
412
+ # # # subprocess.run(run_command, check=True)
413
+ # # # print(f"Started container {container_name} on port {port}")
414
+ # # # return container_name, port
415
+
416
+ # # # def stop_container(container_name):
417
+ # # # subprocess.run(["docker", "stop", container_name])
418
+ # # # subprocess.run(["docker", "rm", container_name])
419
+ # # # print(f"Stopped and removed container {container_name}")
420
+
421
+ # # # def start_containers():
422
+ # # # for port in ports:
423
+ # # # container_name, port = start_container(port)
424
+ # # # containers.append((container_name, port))
425
+
426
+ # # # def stop_containers():
427
+ # # # for container_name, _ in containers:
428
+ # # # stop_container(container_name)
429
+
430
+ # # # @app.route('/runcode', methods=['POST', 'GET'])
431
+ # # # def run_code():
432
+ # # # if request.method == 'POST':
433
+ # # # data = request.get_json()
434
+ # # # if not data:
435
+ # # # return jsonify({'error': 'Invalid JSON'}), 400
436
+ # # # elif request.method == 'GET':
437
+ # # # query_string = request.query_string.decode('utf-8')
438
+ # # # data = {'query_string': query_string}
439
+ # # # else:
440
+ # # # return jsonify({'error': 'Invalid request method'}), 405
441
+
442
+ # # # request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID
443
+ # # # request_queue.put((request_id, data))
444
+
445
+ # # # start_time = time.time()
446
+ # # # while time.time() - start_time < 10: # 最多等待10秒
447
+ # # # if request_id in result_dict:
448
+ # # # output = result_dict.pop(request_id)
449
+ # # # # 处理返回结果,替换 base64 数据为链接
450
+ # # # if 'images' in output:
451
+ # # # try:
452
+ # # # output = process_images(output)
453
+ # # # except Exception as e:
454
+ # # # return jsonify({'error': str(e)}), 500
455
+ # # # return jsonify(output), 200
456
+
457
+ # # # time.sleep(0.1)
458
+
459
+ # # # return jsonify({'error': 'Request timed out'}), 504
460
+
461
+ # # # def process_images(output):
462
+ # # # images = output['images']
463
+ # # # for filename, base64_data in images.items():
464
+ # # # image_data = base64.b64decode(base64_data)
465
+ # # # unique_filename = f"{uuid.uuid4()}_{filename}"
466
+ # # # image_record = Image(filename=unique_filename, data=image_data)
467
+ # # # try:
468
+ # # # session.add(image_record)
469
+ # # # session.commit()
470
+ # # # except IntegrityError as e:
471
+ # # # session.rollback()
472
+ # # # print(f"Database error: {e}")
473
+ # # # continue
474
+ # # # if DOMAIN:
475
+ # # # images[filename] = f"https://{DOMAIN}/image/{unique_filename}"
476
+ # # # else:
477
+ # # # images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}"
478
+ # # # return output
479
+
480
+
481
+ # # # @app.route('/image/<filename>', methods=['GET'])
482
+ # # # def serve_image(filename):
483
+ # # # image_record = session.query(Image).filter_by(filename=filename).first()
484
+ # # # if image_record:
485
+ # # # return image_record.data, 200, {'Content-Type': 'image/png'}
486
+ # # # return jsonify({'error': 'Image not found'}), 404
487
+
488
+ # # # def handle_requests():
489
+ # # # global current_container_index
490
+ # # # while True:
491
+ # # # request_id, data = request_queue.get()
492
+ # # # if data is None:
493
+ # # # break
494
+
495
+ # # # with lock:
496
+ # # # container_name, port = containers[current_container_index]
497
+ # # # current_container_index = (current_container_index + 1) % len(containers)
498
+
499
+ # # # try:
500
+ # # # if 'query_string' in data:
501
+ # # # # GET 请求处理
502
+ # # # response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT)
503
+ # # # else:
504
+ # # # # POST 请求处理
505
+ # # # response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT)
506
+ # # # output = response.json()
507
+ # # # except requests.exceptions.Timeout:
508
+ # # # output = {'error': 'Code execution timed out'}
509
+ # # # reset_container(container_name, port)
510
+ # # # except Exception as e:
511
+ # # # output = {'error': str(e)}
512
+
513
+ # # # result_dict[request_id] = output # 将结果放入结果字典
514
+
515
+ # # # request_queue.task_done()
516
+ # # # print(f"Finished processing request {request_id}: {output}")
517
+
518
+ # # # # Reset container after each request
519
+ # # # reset_container(container_name, port)
520
+
521
+ # # # def reset_container(container_name, port):
522
+ # # # print(f"Resetting container {container_name}")
523
+ # # # stop_container(container_name)
524
+ # # # new_container_name, _ = start_container(port)
525
+ # # # with lock:
526
+ # # # for i, (name, p) in enumerate(containers):
527
+ # # # if name == container_name:
528
+ # # # containers[i] = (new_container_name, port)
529
+ # # # break
530
+
531
+ # # # def signal_handler(signal, frame):
532
+ # # # print('Stopping containers and exiting program...')
533
+ # # # stop_containers()
534
+ # # # if session:
535
+ # # # session.close()
536
+ # # # engine.dispose()
537
+ # # # os._exit(0)
538
+
539
+ # # # signal.signal(signal.SIGINT, signal_handler)
540
+
541
+ # # # worker_thread = threading.Thread(target=handle_requests)
542
+ # # # worker_thread.start()
543
+
544
+ # # # start_containers()
545
+ # # # atexit.register(stop_containers)
546
+
547
+ # # # if __name__ == '__main__':
548
+ # # # app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口
549
+
550
+
551
+
552
+ # # import yaml
553
+ # # import subprocess
554
+ # # import uuid
555
+ # # from flask import Flask, request, jsonify
556
+ # # from queue import Queue
557
+ # # import requests
558
+ # # import threading
559
+ # # import time
560
+ # # import os
561
+ # # import atexit
562
+ # # import base64
563
+ # # import signal
564
+ # # from sqlalchemy import create_engine, Column, String, LargeBinary, Integer
565
+ # # from sqlalchemy.orm import declarative_base, sessionmaker
566
+ # # from sqlalchemy.exc import OperationalError, IntegrityError
567
+ # # import psycopg2
568
+
569
+ # # app = Flask(__name__)
570
+
571
+ # # # 读取配置文件,指定编码为 utf-8
572
+ # # with open('config.yaml', 'r', encoding='utf-8') as file:
573
+ # # config = yaml.safe_load(file)
574
+
575
+ # # DOMAIN = config['domain']
576
+ # # INTERPRETER_IMAGE = config['interpreter_image']
577
+ # # PORT_START = config['interpreter_port_range']['start']
578
+ # # PORT_END = config['interpreter_port_range']['end']
579
+ # # DEPENDENCIES = config['dependencies']
580
+ # # RESOURCE_LIMITS = config.get('resource_limits', {})
581
+ # # MEMORY_LIMIT = RESOURCE_LIMITS.get('memory')
582
+ # # CPU_LIMIT = RESOURCE_LIMITS.get('cpus')
583
+ # # POSTGRES = config['postgres']
584
+ # # TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒
585
+ # # TIMEOUT_SECONDS = config.get('timeout_seconds', 60) # 从配置中读取超时时间
586
+ # # SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口
587
+
588
+ # # # 容器管理
589
+ # # containers = []
590
+ # # ports = list(range(PORT_START, PORT_END + 1))
591
+ # # lock = threading.Lock()
592
+ # # request_queue = Queue()
593
+ # # result_dict = {} # 用于存储结果的字典
594
+ # # current_container_index = 0
595
+ # # semaphore = threading.Semaphore(len(ports)) # 控制并发请求的信号量
596
+
597
+ # # # 检查 PostgreSQL 容器是否存在
598
+ # # POSTGRES_CONTAINER_NAME = "postgres_code_interpreter"
599
+ # # POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata')
600
+
601
+ # # def is_postgres_container_running():
602
+ # # result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True)
603
+ # # return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n')
604
+
605
+ # # def wait_for_postgres_ready():
606
+ # # retry_attempts = 10
607
+ # # retry_delay = 5 # seconds
608
+ # # for attempt in range(retry_attempts):
609
+ # # try:
610
+ # # conn = psycopg2.connect(
611
+ # # dbname=POSTGRES['db'],
612
+ # # user=POSTGRES['user'],
613
+ # # password=POSTGRES['password'],
614
+ # # host=POSTGRES['host'],
615
+ # # port=POSTGRES['port']
616
+ # # )
617
+ # # conn.close()
618
+ # # print("PostgreSQL is ready")
619
+ # # return True
620
+ # # except psycopg2.OperationalError as e:
621
+ # # print(f"Waiting for PostgreSQL to be ready: {e}")
622
+ # # time.sleep(retry_delay)
623
+ # # return False
624
+
625
+ # # if not is_postgres_container_running():
626
+ # # if not os.path.exists(POSTGRES_DATA_DIR):
627
+ # # os.makedirs(POSTGRES_DATA_DIR)
628
+
629
+ # # subprocess.run([
630
+ # # "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d",
631
+ # # "-e", f"POSTGRES_USER={POSTGRES['user']}",
632
+ # # "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}",
633
+ # # "-e", f"POSTGRES_DB={POSTGRES['db']}",
634
+ # # "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data",
635
+ # # "-p", f"{POSTGRES['port']}:5432",
636
+ # # "postgres"
637
+ # # ], check=True)
638
+
639
+ # # if not wait_for_postgres_ready():
640
+ # # print("Failed to connect to PostgreSQL after multiple attempts.")
641
+ # # exit(1)
642
+
643
+ # # # PostgreSQL 配置
644
+ # # DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}"
645
+ # # engine = None
646
+ # # Base = declarative_base()
647
+ # # Session = None
648
+ # # session = None
649
+
650
+ # # def init_db():
651
+ # # global engine, Session, session
652
+ # # retry_attempts = 5
653
+ # # retry_delay = 5 # seconds
654
+ # # for attempt in range(retry_attempts):
655
+ # # try:
656
+ # # engine = create_engine(DATABASE_URL)
657
+ # # Session = sessionmaker(bind=engine)
658
+ # # session = Session()
659
+ # # Base.metadata.create_all(engine) # 确保数据库模式已创建
660
+ # # print("Database connection successful")
661
+ # # break
662
+ # # except OperationalError as e:
663
+ # # print(f"Database connection failed: {e}")
664
+ # # if attempt < retry_attempts - 1:
665
+ # # print(f"Retrying in {retry_delay} seconds...")
666
+ # # time.sleep(retry_delay)
667
+ # # else:
668
+ # # print("Failed to connect to the database after multiple attempts.")
669
+ # # raise
670
+
671
+ # # class Image(Base):
672
+ # # __tablename__ = 'images'
673
+ # # id = Column(Integer, primary_key=True, autoincrement=True)
674
+ # # filename = Column(String, unique=True, nullable=False)
675
+ # # data = Column(LargeBinary, nullable=False)
676
+
677
+ # # init_db()
678
+
679
+ # # # 启动代码解释器容器
680
+ # # def start_container(port):
681
+ # # container_name = f"code_interpreter_docker_{uuid.uuid4()}"
682
+ # # run_command = [
683
+ # # "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000"
684
+ # # ]
685
+ # # if MEMORY_LIMIT:
686
+ # # run_command.extend(["--memory", MEMORY_LIMIT])
687
+ # # if CPU_LIMIT:
688
+ # # run_command.extend(["--cpus", CPU_LIMIT])
689
+ # # run_command.append(INTERPRETER_IMAGE)
690
+
691
+ # # subprocess.run(run_command, check=True)
692
+ # # print(f"Started container {container_name} on port {port}")
693
+ # # return container_name, port
694
+
695
+ # # def stop_container(container_name):
696
+ # # subprocess.run(["docker", "stop", container_name])
697
+ # # subprocess.run(["docker", "rm", container_name])
698
+ # # print(f"Stopped and removed container {container_name}")
699
+
700
+ # # def start_containers():
701
+ # # for port in ports:
702
+ # # container_name, port = start_container(port)
703
+ # # containers.append((container_name, port))
704
+
705
+ # # def stop_containers():
706
+ # # for container_name, _ in containers:
707
+ # # stop_container(container_name)
708
+
709
+ # # @app.route('/runcode', methods=['POST', 'GET'])
710
+ # # def run_code():
711
+ # # if request.method == 'POST':
712
+ # # data = request.get_json()
713
+ # # if not data:
714
+ # # return jsonify({'error': 'Invalid JSON'}), 400
715
+ # # elif request.method == 'GET':
716
+ # # query_string = request.query_string.decode('utf-8')
717
+ # # data = {'query_string': query_string}
718
+ # # else:
719
+ # # return jsonify({'error': 'Invalid request method'}), 405
720
+
721
+ # # request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID
722
+ # # request_queue.put((request_id, data))
723
+
724
+ # # start_time = time.time()
725
+ # # while time.time() - start_time < 10: # 最多等待10秒
726
+ # # if request_id in result_dict:
727
+ # # output = result_dict.pop(request_id)
728
+ # # # 处理返回结果,替换 base64 数据为链接
729
+ # # if 'images' in output:
730
+ # # try:
731
+ # # output = process_images(output)
732
+ # # except Exception as e:
733
+ # # return jsonify({'error': str(e)}), 500
734
+ # # return jsonify(output), 200
735
+
736
+ # # time.sleep(0.1)
737
+
738
+ # # return jsonify({'error': 'Request timed out'}), 504
739
+
740
+ # # def process_images(output):
741
+ # # images = output['images']
742
+ # # for filename, base64_data in images.items():
743
+ # # image_data = base64.b64decode(base64_data)
744
+ # # unique_filename = f"{uuid.uuid4()}_{filename}"
745
+ # # image_record = Image(filename=unique_filename, data=image_data)
746
+ # # try:
747
+ # # session.add(image_record)
748
+ # # session.commit()
749
+ # # except IntegrityError as e:
750
+ # # session.rollback()
751
+ # # print(f"Database error: {e}")
752
+ # # continue
753
+ # # if DOMAIN:
754
+ # # images[filename] = f"https://{DOMAIN}/image/{unique_filename}"
755
+ # # else:
756
+ # # images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}"
757
+ # # return output
758
+
759
+ # # @app.route('/image/<filename>', methods=['GET'])
760
+ # # def serve_image(filename):
761
+ # # image_record = session.query(Image).filter_by(filename=filename).first()
762
+ # # if image_record:
763
+ # # return image_record.data, 200, {'Content-Type': 'image/png'}
764
+ # # return jsonify({'error': 'Image not found'}), 404
765
+
766
+ # # def handle_requests():
767
+ # # global current_container_index
768
+ # # while True:
769
+ # # request_id, data = request_queue.get()
770
+ # # if data is None:
771
+ # # break
772
+
773
+ # # semaphore.acquire() # 获取信号量,确保不超过并发限制
774
+
775
+ # # with lock:
776
+ # # container_name, port = containers[current_container_index]
777
+ # # current_container_index = (current_container_index + 1) % len(containers)
778
+
779
+ # # try:
780
+ # # if 'query_string' in data:
781
+ # # response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT_SECONDS)
782
+ # # else:
783
+ # # response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT_SECONDS)
784
+ # # output = response.json()
785
+ # # except requests.exceptions.Timeout:
786
+ # # output = {'error': 'Code execution timed out'}
787
+ # # except Exception as e:
788
+ # # output = {'error': str(e)}
789
+
790
+ # # result_dict[request_id] = output # 将结果放入结果字典
791
+
792
+ # # request_queue.task_done()
793
+ # # print(f"Finished processing request {request_id}: {output}")
794
+
795
+ # # # 异步重置容器
796
+ # # reset_container(container_name, port)
797
+
798
+ # # def reset_container(container_name, port):
799
+ # # threading.Thread(target=_reset_container, args=(container_name, port)).start()
800
+
801
+ # # def _reset_container(container_name, port):
802
+ # # print(f"Resetting container {container_name}")
803
+ # # stop_container(container_name)
804
+ # # new_container_name, _ = start_container(port)
805
+ # # with lock:
806
+ # # for i, (name, p) in enumerate(containers):
807
+ # # if name == container_name:
808
+ # # containers[i] = (new_container_name, port)
809
+ # # break
810
+ # # semaphore.release() # 释放信号量
811
+
812
+ # # def signal_handler(signal, frame):
813
+ # # print('Stopping containers and exiting program...')
814
+ # # stop_containers()
815
+ # # if session:
816
+ # # session.close()
817
+ # # engine.dispose()
818
+ # # os._exit(0)
819
+
820
+ # # signal.signal(signal.SIGINT, signal_handler)
821
+
822
+ # # worker_thread = threading.Thread(target=handle_requests)
823
+ # # worker_thread.start()
824
+
825
+ # # start_containers()
826
+ # # atexit.register(stop_containers)
827
+
828
+ # # if __name__ == '__main__':
829
+ # # app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口
830
+
831
+
832
+
833
+ # import yaml
834
+ # import subprocess
835
+ # import uuid
836
+ # from flask import Flask, request, jsonify
837
+ # from queue import Queue
838
+ # import requests
839
+ # import threading
840
+ # import time
841
+ # import os
842
+ # import atexit
843
+ # import base64
844
+ # import signal
845
+ # from sqlalchemy import create_engine, Column, String, LargeBinary, Integer
846
+ # from sqlalchemy.orm import declarative_base, sessionmaker
847
+ # from sqlalchemy.exc import OperationalError, IntegrityError
848
+ # import psycopg2
849
+
850
+ # app = Flask(__name__)
851
+
852
+ # # 读取配置文件,指定编码为 utf-8
853
+ # with open('config.yaml', 'r', encoding='utf-8') as file:
854
+ # config = yaml.safe_load(file)
855
+
856
+ # DOMAIN = config['domain']
857
+ # INTERPRETER_IMAGE = config['interpreter_image']
858
+ # PORT_START = config['interpreter_port_range']['start']
859
+ # PORT_END = config['interpreter_port_range']['end']
860
+ # DEPENDENCIES = config['dependencies']
861
+ # RESOURCE_LIMITS = config.get('resource_limits', {})
862
+ # MEMORY_LIMIT = RESOURCE_LIMITS.get('memory')
863
+ # CPU_LIMIT = RESOURCE_LIMITS.get('cpus')
864
+ # POSTGRES = config['postgres']
865
+ # TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒
866
+ # TIMEOUT_SECONDS = config.get('timeout_seconds', 60) # 从配置中读取超时时间
867
+ # SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口
868
+
869
+ # # 容器管理
870
+ # containers = []
871
+ # ports = list(range(PORT_START, PORT_END + 1))
872
+ # lock = threading.Lock()
873
+ # request_queue = Queue()
874
+ # result_dict = {} # 用于存储结果的字典
875
+ # current_container_index = 0
876
+ # semaphore = threading.Semaphore(len(ports)) # 控制并发请求的信号量
877
+
878
+ # # 检查 PostgreSQL 容器是否存在
879
+ # POSTGRES_CONTAINER_NAME = "postgres_code_interpreter"
880
+ # POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata')
881
+
882
+ # def is_postgres_container_running():
883
+ # result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True)
884
+ # return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n')
885
+
886
+ # def wait_for_postgres_ready():
887
+ # retry_attempts = 10
888
+ # retry_delay = 5 # seconds
889
+ # for attempt in range(retry_attempts):
890
+ # try:
891
+ # conn = psycopg2.connect(
892
+ # dbname=POSTGRES['db'],
893
+ # user=POSTGRES['user'],
894
+ # password=POSTGRES['password'],
895
+ # host=POSTGRES['host'],
896
+ # port=POSTGRES['port']
897
+ # )
898
+ # conn.close()
899
+ # print("PostgreSQL is ready")
900
+ # return True
901
+ # except psycopg2.OperationalError as e:
902
+ # print(f"Waiting for PostgreSQL to be ready: {e}")
903
+ # time.sleep(retry_delay)
904
+ # return False
905
+
906
+ # if not is_postgres_container_running():
907
+ # if not os.path.exists(POSTGRES_DATA_DIR):
908
+ # os.makedirs(POSTGRES_DATA_DIR)
909
+
910
+ # subprocess.run([
911
+ # "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d",
912
+ # "-e", f"POSTGRES_USER={POSTGRES['user']}",
913
+ # "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}",
914
+ # "-e", f"POSTGRES_DB={POSTGRES['db']}",
915
+ # "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data",
916
+ # "-p", f"{POSTGRES['port']}:5432",
917
+ # "postgres"
918
+ # ], check=True)
919
+
920
+ # if not wait_for_postgres_ready():
921
+ # print("Failed to connect to PostgreSQL after multiple attempts.")
922
+ # exit(1)
923
+
924
+ # # PostgreSQL 配置
925
+ # DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}"
926
+ # engine = None
927
+ # Base = declarative_base()
928
+ # Session = None
929
+ # session = None
930
+
931
+ # def init_db():
932
+ # global engine, Session, session
933
+ # retry_attempts = 5
934
+ # retry_delay = 5 # seconds
935
+ # for attempt in range(retry_attempts):
936
+ # try:
937
+ # engine = create_engine(DATABASE_URL)
938
+ # Session = sessionmaker(bind=engine)
939
+ # session = Session()
940
+ # Base.metadata.create_all(engine) # 确保数据库模式已创建
941
+ # print("Database connection successful")
942
+ # break
943
+ # except OperationalError as e:
944
+ # print(f"Database connection failed: {e}")
945
+ # if attempt < retry_attempts - 1:
946
+ # print(f"Retrying in {retry_delay} seconds...")
947
+ # time.sleep(retry_delay)
948
+ # else:
949
+ # print("Failed to connect to the database after multiple attempts.")
950
+ # raise
951
+
952
+ # class Image(Base):
953
+ # __tablename__ = 'images'
954
+ # id = Column(Integer, primary_key=True, autoincrement=True)
955
+ # filename = Column(String, unique=True, nullable=False)
956
+ # data = Column(LargeBinary, nullable=False)
957
+
958
+ # init_db()
959
+
960
+ # # 启动代码解释器容器
961
+ # def start_container(port):
962
+ # container_name = f"code_interpreter_docker_{uuid.uuid4()}"
963
+ # run_command = [
964
+ # "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000"
965
+ # ]
966
+ # if MEMORY_LIMIT:
967
+ # run_command.extend(["--memory", MEMORY_LIMIT])
968
+ # if CPU_LIMIT:
969
+ # run_command.extend(["--cpus", CPU_LIMIT])
970
+ # run_command.append(INTERPRETER_IMAGE)
971
+
972
+ # subprocess.run(run_command, check=True)
973
+ # print(f"Started container {container_name} on port {port}")
974
+ # return container_name, port
975
+
976
+ # def stop_container(container_name):
977
+ # subprocess.run(["docker", "stop", container_name])
978
+ # subprocess.run(["docker", "rm", container_name])
979
+ # print(f"Stopped and removed container {container_name}")
980
+
981
+ # def start_containers():
982
+ # for port in ports:
983
+ # container_name, port = start_container(port)
984
+ # containers.append((container_name, port))
985
+
986
+ # def stop_containers():
987
+ # for container_name, _ in containers:
988
+ # stop_container(container_name)
989
+
990
+ # @app.route('/runcode', methods=['POST', 'GET'])
991
+ # def run_code():
992
+ # if request.method == 'POST':
993
+ # data = request.get_json()
994
+ # if not data:
995
+ # return jsonify({'error': 'Invalid JSON'}), 400
996
+ # elif request.method == 'GET':
997
+ # query_string = request.query_string.decode('utf-8')
998
+ # data = {'query_string': query_string}
999
+ # else:
1000
+ # return jsonify({'error': 'Invalid request method'}), 405
1001
+
1002
+ # request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID
1003
+ # request_queue.put((request_id, data))
1004
+
1005
+ # start_time = time.time()
1006
+ # while time.time() - start_time < 10: # 最多等待10秒
1007
+ # if request_id in result_dict:
1008
+ # output = result_dict.pop(request_id)
1009
+ # # 处理返回结果,替换 base64 数据为链接
1010
+ # if 'images' in output:
1011
+ # try:
1012
+ # output = process_images(output)
1013
+ # except Exception as e:
1014
+ # return jsonify({'error': str(e)}), 500
1015
+ # return jsonify(output), 200
1016
+
1017
+ # time.sleep(0.1)
1018
+
1019
+ # return jsonify({'error': 'Request timed out'}), 504
1020
+
1021
+ # def process_images(output):
1022
+ # images = output['images']
1023
+ # for filename, base64_data in images.items():
1024
+ # image_data = base64.b64decode(base64_data)
1025
+ # unique_filename = f"{uuid.uuid4()}_{filename}"
1026
+ # image_record = Image(filename=unique_filename, data=image_data)
1027
+ # try:
1028
+ # session.add(image_record)
1029
+ # session.commit()
1030
+ # except IntegrityError as e:
1031
+ # session.rollback()
1032
+ # print(f"Database error: {e}")
1033
+ # continue
1034
+ # if DOMAIN:
1035
+ # images[filename] = f"https://{DOMAIN}/image/{unique_filename}"
1036
+ # else:
1037
+ # images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}"
1038
+ # return output
1039
+
1040
+ # @app.route('/image/<filename>', methods=['GET'])
1041
+ # def serve_image(filename):
1042
+ # image_record = session.query(Image).filter_by(filename=filename).first()
1043
+ # if image_record:
1044
+ # return image_record.data, 200, {'Content-Type': 'image/png'}
1045
+ # return jsonify({'error': 'Image not found'}), 404
1046
+
1047
+ # def handle_requests():
1048
+ # global current_container_index
1049
+ # while True:
1050
+ # request_id, data = request_queue.get()
1051
+ # if data is None:
1052
+ # break
1053
+
1054
+ # semaphore.acquire() # 获取信号量,确保不超过并发限制
1055
+
1056
+ # with lock:
1057
+ # container_name, port = containers[current_container_index]
1058
+ # current_container_index = (current_container_index + 1) % len(containers)
1059
+
1060
+ # try:
1061
+ # if 'query_string' in data:
1062
+ # response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT_SECONDS)
1063
+ # else:
1064
+ # response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT_SECONDS)
1065
+ # output = response.json()
1066
+ # except requests.exceptions.Timeout:
1067
+ # output = {'error': 'Code execution timed out'}
1068
+ # except Exception as e:
1069
+ # output = {'error': str(e)}
1070
+
1071
+ # result_dict[request_id] = output # 将结果放入结果字典
1072
+
1073
+ # request_queue.task_done()
1074
+ # print(f"Finished processing request {request_id}: {output}")
1075
+
1076
+ # # 异步重置容器
1077
+ # reset_container(container_name, port)
1078
+
1079
+ # def reset_container(container_name, port):
1080
+ # threading.Thread(target=_reset_container, args=(container_name, port)).start()
1081
+
1082
+ # def _reset_container(container_name, port):
1083
+ # print(f"Resetting container {container_name}")
1084
+ # stop_container(container_name)
1085
+ # new_container_name, _ = start_container(port)
1086
+ # with lock:
1087
+ # for i, (name, p) in enumerate(containers):
1088
+ # if name == container_name:
1089
+ # containers[i] = (new_container_name, port)
1090
+ # break
1091
+ # semaphore.release() # 释放信号量
1092
+
1093
+ # def signal_handler(signal, frame):
1094
+ # print('Stopping containers and exiting program...')
1095
+ # stop_containers()
1096
+ # if session:
1097
+ # session.close()
1098
+ # engine.dispose()
1099
+ # os._exit(0)
1100
+
1101
+ # signal.signal(signal.SIGINT, signal_handler)
1102
+
1103
+ # # 启动多个工作线程来处理请求
1104
+ # for _ in range(len(ports)):
1105
+ # threading.Thread(target=handle_requests).start()
1106
+
1107
+ # start_containers()
1108
+ # atexit.register(stop_containers)
1109
+
1110
+ # if __name__ == '__main__':
1111
+ # app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口
1112
+
1113
+
1114
+ import yaml
1115
+ import subprocess
1116
+ import uuid
1117
+ from flask import Flask, request, jsonify
1118
+ from queue import Queue
1119
+ import requests
1120
+ import threading
1121
+ import time
1122
+ import os
1123
+ import atexit
1124
+ import base64
1125
+ import signal
1126
+ from sqlalchemy import create_engine, Column, String, LargeBinary, Integer
1127
+ from sqlalchemy.orm import declarative_base, sessionmaker
1128
+ from sqlalchemy.exc import OperationalError, IntegrityError
1129
+ import psycopg2
1130
+
1131
+ app = Flask(__name__)
1132
+
1133
+ # 读取配置文件,指定编码为 utf-8
1134
+ with open('config.yaml', 'r', encoding='utf-8') as file:
1135
+ config = yaml.safe_load(file)
1136
+
1137
+ DOMAIN = config['domain']
1138
+ INTERPRETER_IMAGE = config['interpreter_image']
1139
+ PORT_START = config['interpreter_port_range']['start']
1140
+ PORT_END = config['interpreter_port_range']['end']
1141
+ DEPENDENCIES = config['dependencies']
1142
+ RESOURCE_LIMITS = config.get('resource_limits', {})
1143
+ MEMORY_LIMIT = RESOURCE_LIMITS.get('memory')
1144
+ CPU_LIMIT = RESOURCE_LIMITS.get('cpus')
1145
+ POSTGRES = config['postgres']
1146
+ TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒
1147
+ TIMEOUT_SECONDS = config.get('timeout_seconds', 60) # 从配置中读取超时时间
1148
+ SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口
1149
+
1150
+ # 容器管理
1151
+ containers = []
1152
+ ports = list(range(PORT_START, PORT_END + 1))
1153
+ lock = threading.Lock()
1154
+ request_queue = Queue()
1155
+ result_dict = {} # 用于存储结果的字典
1156
+ current_container_index = 0
1157
+ semaphore = threading.Semaphore(len(ports)) # 控制并发请求的信号量
1158
+
1159
+ # 检查 PostgreSQL 容器是否存在
1160
+ POSTGRES_CONTAINER_NAME = "postgres_code_interpreter"
1161
+ POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata')
1162
+
1163
+ def is_postgres_container_running():
1164
+ result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True)
1165
+ return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n')
1166
+
1167
+ def wait_for_postgres_ready():
1168
+ retry_attempts = 10
1169
+ retry_delay = 5 # seconds
1170
+ for attempt in range(retry_attempts):
1171
+ try:
1172
+ conn = psycopg2.connect(
1173
+ dbname=POSTGRES['db'],
1174
+ user=POSTGRES['user'],
1175
+ password=POSTGRES['password'],
1176
+ host=POSTGRES['host'],
1177
+ port=POSTGRES['port']
1178
+ )
1179
+ conn.close()
1180
+ print("PostgreSQL is ready")
1181
+ return True
1182
+ except psycopg2.OperationalError as e:
1183
+ print(f"Waiting for PostgreSQL to be ready: {e}")
1184
+ time.sleep(retry_delay)
1185
+ return False
1186
+
1187
+ if not is_postgres_container_running():
1188
+ if not os.path.exists(POSTGRES_DATA_DIR):
1189
+ os.makedirs(POSTGRES_DATA_DIR)
1190
+
1191
+ subprocess.run([
1192
+ "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d",
1193
+ "-e", f"POSTGRES_USER={POSTGRES['user']}",
1194
+ "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}",
1195
+ "-e", f"POSTGRES_DB={POSTGRES['db']}",
1196
+ "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data",
1197
+ "-p", f"{POSTGRES['port']}:5432",
1198
+ "postgres"
1199
+ ], check=True)
1200
+
1201
+ if not wait_for_postgres_ready():
1202
+ print("Failed to connect to PostgreSQL after multiple attempts.")
1203
+ exit(1)
1204
+
1205
+ # PostgreSQL 配置
1206
+ DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}"
1207
+ engine = None
1208
+ Base = declarative_base()
1209
+ Session = None
1210
+ session = None
1211
+
1212
+ def init_db():
1213
+ global engine, Session, session
1214
+ retry_attempts = 5
1215
+ retry_delay = 5 # seconds
1216
+ for attempt in range(retry_attempts):
1217
+ try:
1218
+ engine = create_engine(DATABASE_URL)
1219
+ Session = sessionmaker(bind=engine)
1220
+ session = Session()
1221
+ Base.metadata.create_all(engine) # 确保数据库模式已创建
1222
+ print("Database connection successful")
1223
+ break
1224
+ except OperationalError as e:
1225
+ print(f"Database connection failed: {e}")
1226
+ if attempt < retry_attempts - 1:
1227
+ print(f"Retrying in {retry_delay} seconds...")
1228
+ time.sleep(retry_delay)
1229
+ else:
1230
+ print("Failed to connect to the database after multiple attempts.")
1231
+ raise
1232
+
1233
+ class Image(Base):
1234
+ __tablename__ = 'images'
1235
+ id = Column(Integer, primary_key=True, autoincrement=True)
1236
+ filename = Column(String, unique=True, nullable=False)
1237
+ data = Column(LargeBinary, nullable=False)
1238
+
1239
+ init_db()
1240
+
1241
+ # 启动代码解释器容器
1242
+ def start_container(port):
1243
+ container_name = f"code_interpreter_docker_{uuid.uuid4()}"
1244
+ run_command = [
1245
+ "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000"
1246
+ ]
1247
+ if MEMORY_LIMIT:
1248
+ run_command.extend(["--memory", MEMORY_LIMIT])
1249
+ if CPU_LIMIT:
1250
+ run_command.extend(["--cpus", CPU_LIMIT])
1251
+ run_command.append(INTERPRETER_IMAGE)
1252
+
1253
+ subprocess.run(run_command, check=True)
1254
+ print(f"Started container {container_name} on port {port}")
1255
+ return container_name, port
1256
+
1257
+ def stop_container(container_name):
1258
+ subprocess.run(["docker", "stop", container_name])
1259
+ subprocess.run(["docker", "rm", container_name])
1260
+ print(f"Stopped and removed container {container_name}")
1261
+
1262
+ def start_containers():
1263
+ for port in ports:
1264
+ container_name, port = start_container(port)
1265
+ containers.append((container_name, port))
1266
+
1267
+ def stop_containers():
1268
+ for container_name, _ in containers:
1269
+ stop_container(container_name)
1270
+
1271
+ @app.route('/runcode', methods=['POST', 'GET'])
1272
+ def run_code():
1273
+ if request.method == 'POST':
1274
+ data = request.get_json()
1275
+ if not data:
1276
+ return jsonify({'error': 'Invalid JSON'}), 400
1277
+ elif request.method == 'GET':
1278
+ query_string = request.query_string.decode('utf-8')
1279
+ data = {'query_string': query_string}
1280
+ else:
1281
+ return jsonify({'error': 'Invalid request method'}), 405
1282
+
1283
+ request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID
1284
+ request_queue.put((request_id, data))
1285
+
1286
+ start_time = time.time()
1287
+ while time.time() - start_time < 10: # 最多等待10秒
1288
+ if request_id in result_dict:
1289
+ output = result_dict.pop(request_id)
1290
+ # 处理返回结果,替换 base64 数据为链接
1291
+ if 'images' in output:
1292
+ try:
1293
+ output = process_images(output)
1294
+ except Exception as e:
1295
+ return jsonify({'error': str(e)}), 500
1296
+ return jsonify(output), 200
1297
+
1298
+ time.sleep(0.1)
1299
+
1300
+ return jsonify({'error': 'Request timed out'}), 504
1301
+
1302
+ def process_images(output):
1303
+ images = output['images']
1304
+ for filename, base64_data in images.items():
1305
+ image_data = base64.b64decode(base64_data)
1306
+ unique_filename = f"{uuid.uuid4()}_{filename}"
1307
+ image_record = Image(filename=unique_filename, data=image_data)
1308
+ try:
1309
+ session.add(image_record)
1310
+ session.commit()
1311
+ except IntegrityError as e:
1312
+ session.rollback()
1313
+ print(f"Database error: {e}")
1314
+ continue
1315
+ if DOMAIN:
1316
+ images[filename] = f"https://{DOMAIN}/image/{unique_filename}"
1317
+ else:
1318
+ images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}"
1319
+ return output
1320
+
1321
+ @app.route('/image/<filename>', methods=['GET'])
1322
+ def serve_image(filename):
1323
+ image_record = session.query(Image).filter_by(filename=filename).first()
1324
+ if image_record:
1325
+ return image_record.data, 200, {'Content-Type': 'image/png'}
1326
+ return jsonify({'error': 'Image not found'}), 404
1327
+
1328
+ def handle_requests():
1329
+ global current_container_index
1330
+ while True:
1331
+ request_id, data = request_queue.get()
1332
+ if data is None:
1333
+ break
1334
+
1335
+ semaphore.acquire() # 获取信号量,确保不超过并发限制
1336
+
1337
+ with lock:
1338
+ container_name, port = containers[current_container_index]
1339
+ current_container_index = (current_container_index + 1) % len(containers)
1340
+
1341
+ try:
1342
+ if 'query_string' in data:
1343
+ response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT_SECONDS)
1344
+ else:
1345
+ response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT_SECONDS)
1346
+ output = response.json()
1347
+ except requests.exceptions.Timeout:
1348
+ output = {'error': 'Code execution timed out'}
1349
+ except Exception as e:
1350
+ output = {'error': str(e)}
1351
+
1352
+ result_dict[request_id] = output # 将结果放入结果字典
1353
+
1354
+ request_queue.task_done()
1355
+ print(f"Finished processing request {request_id}: {output}")
1356
+
1357
+ # 异步重置容器
1358
+ reset_container(container_name, port)
1359
+
1360
+ def reset_container(container_name, port):
1361
+ threading.Thread(target=_reset_container, args=(container_name, port)).start()
1362
+
1363
+ def _reset_container(container_name, port):
1364
+ print(f"Resetting container {container_name}")
1365
+ stop_container(container_name)
1366
+ new_container_name, _ = start_container(port)
1367
+ with lock:
1368
+ for i, (name, p) in enumerate(containers):
1369
+ if name == container_name:
1370
+ containers[i] = (new_container_name, port)
1371
+ break
1372
+ semaphore.release() # 释放信号量
1373
+
1374
+ def signal_handler(signal, frame):
1375
+ print('Stopping containers and exiting program...')
1376
+ stop_containers()
1377
+ if session:
1378
+ session.close()
1379
+ engine.dispose()
1380
+ os._exit(0)
1381
+
1382
+ signal.signal(signal.SIGINT, signal_handler)
1383
+
1384
+ # 启动多个工作线程来处理请求
1385
+ for _ in range(len(ports)):
1386
+ threading.Thread(target=handle_requests).start()
1387
+
1388
+ start_containers()
1389
+ atexit.register(stop_containers)
1390
+
1391
+ if __name__ == '__main__':
1392
+ app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口
concurrent_test.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import aiohttp
2
+ import asyncio
3
+
4
+ async def post_request(session, url, headers, json_data):
5
+ async with session.post(url, headers=headers, json=json_data) as response:
6
+ response_text = await response.text()
7
+ print(response_text)
8
+ return response_text
9
+
10
+ async def main(concurrent_requests):
11
+ url = 'http://127.0.0.1:8000/runcode'
12
+ headers = {
13
+ 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
14
+ 'Content-Type': 'application/json',
15
+ 'Authorization': 'Bearer 114514',
16
+ 'Accept': '*/*',
17
+ 'Host': '127.0.0.1:8000',
18
+ 'Connection': 'keep-alive'
19
+ }
20
+ json_data = {
21
+ "languageType": "python",
22
+ "variables": {},
23
+ "code": "```python\nimport time\nprint('Hello from code block!')\ntime.sleep(3)\n```"
24
+ }
25
+
26
+ async with aiohttp.ClientSession() as session:
27
+ tasks = [post_request(session, url, headers, json_data) for _ in range(concurrent_requests)]
28
+ await asyncio.gather(*tasks)
29
+
30
+ if __name__ == '__main__':
31
+ concurrent_requests = 5 # 设置并发数
32
+ asyncio.run(main(concurrent_requests))
config.yaml ADDED
@@ -0,0 +1,44 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ domain: "" #填写域名后对外提供链接将使用域名替代 ip:scheduler_port
2
+ interpreter_image: "leezhuuu/code_interpreter"
3
+ scheduler_port: 8000 # 调度中心端口
4
+ interpreter_port_range: # 设置端口范围为 8001-8003,端口范围决定容器数量
5
+ start: 8001
6
+ end: 8003
7
+ dependencies: # 定义容器的依赖项,以下为默认依赖项,可以根据需要自行更改
8
+ - numpy
9
+ - pandas
10
+ - scipy
11
+ - matplotlib
12
+ - seaborn
13
+ - scikit-learn
14
+ - opencv-python
15
+ - opencv-python-headless
16
+ - Pillow
17
+ - requests
18
+ - Flask
19
+ - pyyaml
20
+ - sympy
21
+ - plotly
22
+ - bokeh
23
+ - statsmodels
24
+ - jupyter
25
+ - ipython
26
+ - jupyterlab
27
+ - pytest
28
+ - hypothesis
29
+ - Flask-Cors
30
+ - Werkzeug
31
+ - Gunicorn
32
+ resource_limits:
33
+ memory: "" # 可以自定义内存限制,例如 "2g"、"500m" 等,不填则默认不进行限制
34
+ cpus: "" # 可以自定义 CPU 限制,例如 "1.5"、"0.5" 等,不填则默认不进行限制
35
+ timeout_seconds: 60 # 设置超时时间为 1 分钟(60 秒)
36
+ postgres:
37
+ user: "user" # 数据库用户名,请自行更改
38
+ password: "password" # 数据库密码,请自行更改
39
+ db: "code_interpreter_db"
40
+ host: "localhost"
41
+ port: "5432" # 数据库端口,请自行更改
42
+ mode: "docker" # 选择运行模式,可以是 "docker" 或 "k8s"
43
+ kubeconfig_path: "/path/to/kubeconfig.yaml" # Kubernetes 配置文件路径
44
+ k8s_interpreter_yaml: "/path/to/code-interpreter.yaml" # Kubernetes 部署配置文件路径
requirements.txt ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ numpy
2
+ pandas
3
+ scipy
4
+ matplotlib
5
+ seaborn
6
+ scikit-learn
7
+ opencv-python
8
+ opencv-python-headless
9
+ Pillow
10
+ requests
11
+ Flask
12
+ pyyaml
13
+ sympy
14
+ plotly
15
+ bokeh
16
+ statsmodels
17
+ jupyter
18
+ ipython
19
+ jupyterlab
20
+ pytest
21
+ hypothesis
22
+ Flask-Cors
23
+ Werkzeug
24
+ Gunicorn