leezhuuu commited on
Commit
3726922
·
verified ·
1 Parent(s): f3f259e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +250 -251
app.py CHANGED
@@ -1,251 +1,250 @@
1
- from flask import Flask, request, jsonify, Response
2
- import subprocess
3
- import sys
4
- import threading
5
- import time
6
- import requests
7
- from functools import wraps
8
- import urllib.parse
9
- import re
10
- import json
11
- import base64
12
- import os
13
- import glob
14
-
15
- app = Flask(__name__)
16
-
17
- # 从环境变量中读取配置,如果未设置则使用默认值
18
- OUTPUT_BASE64 = os.getenv('OUTPUT_BASE64', 'True').lower() == 'true'
19
- AUTH_TOKEN = os.getenv('AUTH_TOKEN', '')
20
- # HOST = os.getenv('HOST', '127.0.0.1')
21
- HOST = os.getenv('HOST', '0.0.0.0')
22
- PORT = int(os.getenv('PORT', 5000))
23
- TIMEOUT = int(os.getenv('TIMEOUT', 30))
24
- REMOVE_PYTHON_CODE_BLOCK = os.getenv('REMOVE_PYTHON_CODE_BLOCK', 'True').lower() == 'true'
25
-
26
- # 配置变量,决定是否启用单元测试,默认启用
27
- ENABLE_UNIT_TESTS = False
28
-
29
- def auth_required(f):
30
- @wraps(f)
31
- def decorated(*args, **kwargs):
32
- if AUTH_TOKEN:
33
- auth_header = request.headers.get('Authorization')
34
- if not auth_header or not auth_header.startswith('Bearer '):
35
- return jsonify({'error': 'Authorization header is missing or invalid'}), 401
36
- token = auth_header.split('Bearer ')[1]
37
- if token != AUTH_TOKEN:
38
- return jsonify({'error': 'Invalid token'}), 401
39
- return f(*args, **kwargs)
40
- return decorated
41
-
42
- def extract_code(code):
43
- if REMOVE_PYTHON_CODE_BLOCK:
44
- match = re.search(r'```python\s*(.*?)\s*```', code, re.DOTALL)
45
- if match:
46
- return match.group(1)
47
- return code
48
-
49
- def run_code(language, code, variables, timeout):
50
- try:
51
- # 根据语言类型选择解释器
52
- if language == 'python':
53
- # 提取代码中的有效部分
54
- code = extract_code(code)
55
- # 将变量注入到代码环境中
56
- var_definitions = '\n'.join(f"{key} = {value}" for key, value in variables.items())
57
- code_with_vars = f"{var_definitions}\n{code}"
58
- # 使用subprocess运行代码
59
- result = subprocess.run([sys.executable, '-c', code_with_vars], capture_output=True, text=True, timeout=timeout)
60
- return result.stdout, result.stderr
61
- else:
62
- return None, "Unsupported language"
63
- except subprocess.TimeoutExpired:
64
- return None, "Code execution timed out"
65
- except Exception as e:
66
- return None, str(e)
67
-
68
- def convert_image_to_base64(image_path):
69
- with open(image_path, "rb") as image_file:
70
- encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
71
- return encoded_string
72
-
73
- def unescape_string(s):
74
- return s.encode('utf-8').decode('unicode_escape')
75
-
76
- @app.route('/runcode', methods=['POST', 'GET'])
77
- @auth_required
78
- def run_code_endpoint():
79
- if request.method == 'POST':
80
- data = request.get_json()
81
- if not data:
82
- return jsonify({'error': 'Invalid JSON'}), 400
83
- language = data.get('languageType', 'python')
84
- variables = data.get('variables', {})
85
- code = data.get('code', '')
86
- elif request.method == 'GET':
87
- query_string = request.query_string.decode('utf-8')
88
- print("Query String:",query_string)
89
- # 检查query_string中是否有"+"
90
- if '+' in query_string:
91
- # query_string的+替换为空格
92
- query_string = re.sub(r'\+', ' ', query_string)
93
- print("Query String:",query_string)
94
-
95
- # 如果没有+,则直接输出query_string
96
- else:
97
- print("Query String:",query_string)
98
-
99
-
100
- # 使用正则表达式提取参数
101
- language_match = re.search(r'languageType=("?)([^"&]+)\1', query_string)
102
- variables_match = re.search(r'variables=("?)([^"&]+)\1', query_string)
103
- code_match = re.search(r'code=("?)([^"&]+)\1', query_string)
104
- print("code_match:",code_match)
105
- if not language_match or not variables_match or not code_match:
106
- return jsonify({'error': 'Invalid parameters'}), 400
107
-
108
-
109
-
110
- language = urllib.parse.unquote(language_match.group(2))
111
- variables = urllib.parse.unquote(variables_match.group(2))
112
- code = urllib.parse.unquote(code_match.group(2))
113
- print("code:",code)
114
-
115
- # 处理转义字符
116
- language = unescape_string(language)
117
- variables = unescape_string(variables)
118
- code = unescape_string(code)
119
- print("code:",code)
120
-
121
- # # 使用正则表达式将 + 替换为空格
122
- # code = re.sub(r'\+', ' ', code)
123
- # print("code:",code)
124
-
125
-
126
- try:
127
- variables = json.loads(variables)
128
- except json.JSONDecodeError:
129
- return jsonify({'error': 'Invalid variables format'}), 400
130
-
131
- # 打印解析后的JSON参数
132
- print("Parsed JSON Parameters:")
133
- print(json.dumps({
134
- 'languageType': language,
135
- 'variables': variables,
136
- 'code': code
137
- }, indent=4))
138
- else:
139
- return jsonify({'error': 'Unsupported HTTP method'}), 405
140
-
141
- # 运行代码
142
- stdout, stderr = run_code(language, code, variables, TIMEOUT)
143
-
144
- if stderr:
145
- return jsonify({'error': stderr}), 400
146
- else:
147
- # 查找当前目录中的所有图片文件
148
- image_files = glob.glob('*.png') + glob.glob('*.jpg') + glob.glob('*.jpeg') + glob.glob('*.gif')
149
-
150
- if image_files:
151
- image_data = {}
152
- for image_path in image_files:
153
- encoded_image = convert_image_to_base64(image_path)
154
- image_data[image_path] = encoded_image
155
- os.remove(image_path) # 删除生成的图片文件
156
-
157
- if not OUTPUT_BASE64:
158
- # 如果 OUTPUT_BASE64 不是 True,则输出原始图片
159
- html_content = ''.join([f'<img src="{image_path}" alt="{image_path}" />' for image_path in image_data.keys()])
160
- return Response(html_content, mimetype='text/html')
161
-
162
- else:
163
- return jsonify({'output': stdout, 'images': image_data}), 200
164
- else:
165
- return jsonify({'output': stdout}), 200
166
-
167
- def run_test_case(test_case):
168
- url = f'http://{HOST}:{PORT}/runcode'
169
- if test_case['method'] == 'GET':
170
- params = {
171
- 'languageType': test_case['language'],
172
- 'variables': test_case['variables'],
173
- 'code': test_case['code']
174
- }
175
- headers = {'Authorization': f'Bearer {AUTH_TOKEN}'} if AUTH_TOKEN else {}
176
- print("Request Params:")
177
- print(params)
178
-
179
- response = requests.get(url, params=params, headers=headers)
180
- elif test_case['method'] == 'POST':
181
- data = {
182
- 'languageType': test_case['language'],
183
- 'variables': test_case['variables'],
184
- 'code': test_case['code']
185
- }
186
- headers = {'Authorization': f'Bearer {AUTH_TOKEN}'} if AUTH_TOKEN else {}
187
- print("Request Data:")
188
- print(data)
189
-
190
- response = requests.post(url, json=data, headers=headers)
191
- else:
192
- raise ValueError("Unsupported HTTP method in test case")
193
-
194
- # 打印返回JSON
195
- print("Response JSON:")
196
- print(response.json())
197
-
198
- def test_run_code_endpoint():
199
- test_cases = [
200
- {
201
- 'method': 'GET',
202
- 'language': 'python',
203
- 'variables': str({}),
204
- 'code': "print('Hello, World!')"
205
- },
206
- {
207
- 'method': 'GET',
208
- 'language': '"python"',
209
- 'variables': str({}),
210
- 'code': '"print(\'Hello, World!\')"'
211
- },
212
- {
213
- 'method': 'GET',
214
- 'language': 'python',
215
- 'variables': str({}),
216
- 'code': "print('Hello%20World')"
217
- },
218
- {
219
- 'method': 'POST',
220
- 'language': 'python',
221
- 'variables': {},
222
- 'code': "print(5 ** 11)"
223
- },
224
- {
225
- 'method': 'POST',
226
- 'language': 'python',
227
- 'variables': {'m': 7, 'n': 4},
228
- 'code': "print(m * n)"
229
- },
230
- {
231
- 'method': 'POST',
232
- 'language': 'python',
233
- 'variables': {},
234
- 'code': "```python\nprint('Hello from code block!')\n```"
235
- }
236
- ]
237
-
238
- for test_case in test_cases:
239
- run_test_case(test_case)
240
-
241
- if __name__ == '__main__':
242
- # 启动Flask应用
243
- threading.Thread(target=app.run, kwargs={'host': HOST, 'port': PORT, 'debug': False}).start()
244
-
245
- # 等待Flask应用启动
246
- time.sleep(1)
247
-
248
- # 运行单元测试
249
- if ENABLE_UNIT_TESTS:
250
- test_run_code_endpoint()
251
- print("All tests executed!")
 
1
+ from flask import Flask, request, jsonify, Response, render_template_string
2
+ import subprocess
3
+ import sys
4
+ import threading
5
+ import time
6
+ import requests
7
+ from functools import wraps
8
+ import urllib.parse
9
+ import re
10
+ import json
11
+ import base64
12
+ import os
13
+ import glob
14
+
15
+ app = Flask(__name__)
16
+
17
+ # 从环境变量中读取配置,如果未设置则使用默认值
18
+ OUTPUT_BASE64 = os.getenv('OUTPUT_BASE64', 'True').lower() == 'true'
19
+ AUTH_TOKEN = os.getenv('AUTH_TOKEN', '')
20
+ # HOST = os.getenv('HOST', '127.0.0.1')
21
+ HOST = os.getenv('HOST', '0.0.0.0')
22
+ PORT = int(os.getenv('PORT', 5000))
23
+ TIMEOUT = int(os.getenv('TIMEOUT', 30))
24
+ REMOVE_PYTHON_CODE_BLOCK = os.getenv('REMOVE_PYTHON_CODE_BLOCK', 'True').lower() == 'true'
25
+
26
+ # 配置变量,决定是否启用单元测试,默认启用
27
+ ENABLE_UNIT_TESTS = False
28
+
29
+ def auth_required(f):
30
+ @wraps(f)
31
+ def decorated(*args, **kwargs):
32
+ if AUTH_TOKEN:
33
+ auth_header = request.headers.get('Authorization')
34
+ if not auth_header or not auth_header.startswith('Bearer '):
35
+ return jsonify({'error': 'Authorization header is missing or invalid'}), 401
36
+ token = auth_header.split('Bearer ')[1]
37
+ if token != AUTH_TOKEN:
38
+ return jsonify({'error': 'Invalid token'}), 401
39
+ return f(*args, **kwargs)
40
+ return decorated
41
+
42
+ def extract_code(code):
43
+ if REMOVE_PYTHON_CODE_BLOCK:
44
+ match = re.search(r'```python\s*(.*?)\s*```', code, re.DOTALL)
45
+ if match:
46
+ return match.group(1)
47
+ return code
48
+
49
+ def run_code(language, code, variables, timeout):
50
+ try:
51
+ # 根据语言类型选择解释器
52
+ if language == 'python':
53
+ # 提取代码中的有效部分
54
+ code = extract_code(code)
55
+ # 将变量注入到代码环境中
56
+ var_definitions = '\n'.join(f"{key} = {value}" for key, value in variables.items())
57
+ code_with_vars = f"{var_definitions}\n{code}"
58
+ # 使用subprocess运行代码
59
+ result = subprocess.run([sys.executable, '-c', code_with_vars], capture_output=True, text=True, timeout=timeout)
60
+ return result.stdout, result.stderr
61
+ else:
62
+ return None, "Unsupported language"
63
+ except subprocess.TimeoutExpired:
64
+ return None, "Code execution timed out"
65
+ except Exception as e:
66
+ return None, str(e)
67
+
68
+ def convert_image_to_base64(image_path):
69
+ with open(image_path, "rb") as image_file:
70
+ encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
71
+ return encoded_string
72
+
73
+ def unescape_string(s):
74
+ return s.encode('utf-8').decode('unicode_escape')
75
+
76
+ @app.route('/runcode', methods=['POST', 'GET'])
77
+ @auth_required
78
+ def run_code_endpoint():
79
+ if request.method == 'POST':
80
+ data = request.get_json()
81
+ if not data:
82
+ return jsonify({'error': 'Invalid JSON'}), 400
83
+ language = data.get('languageType', 'python')
84
+ variables = data.get('variables', {})
85
+ code = data.get('code', '')
86
+ elif request.method == 'GET':
87
+ query_string = request.query_string.decode('utf-8')
88
+ print("Query String:",query_string)
89
+ # 检查query_string中是否有"+"
90
+ if '+' in query_string:
91
+ # query_string的+替换为空格
92
+ query_string = re.sub(r'\+', ' ', query_string)
93
+ print("Query String:",query_string)
94
+
95
+ # 如果没有+,则直接输出query_string
96
+ else:
97
+ print("Query String:",query_string)
98
+
99
+
100
+ # 使用正则表达式提取参数
101
+ language_match = re.search(r'languageType=("?)([^"&]+)\1', query_string)
102
+ variables_match = re.search(r'variables=("?)([^"&]+)\1', query_string)
103
+ code_match = re.search(r'code=("?)([^"&]+)\1', query_string)
104
+ print("code_match:",code_match)
105
+ if not language_match or not variables_match or not code_match:
106
+ return jsonify({'error': 'Invalid parameters'}), 400
107
+
108
+
109
+
110
+ language = urllib.parse.unquote(language_match.group(2))
111
+ variables = urllib.parse.unquote(variables_match.group(2))
112
+ code = urllib.parse.unquote(code_match.group(2))
113
+ print("code:",code)
114
+
115
+ # 处理转义字符
116
+ language = unescape_string(language)
117
+ variables = unescape_string(variables)
118
+ code = unescape_string(code)
119
+ print("code:",code)
120
+
121
+ try:
122
+ variables = json.loads(variables)
123
+ except json.JSONDecodeError:
124
+ return jsonify({'error': 'Invalid variables format'}), 400
125
+
126
+ # 打印解析后的JSON参数
127
+ print("Parsed JSON Parameters:")
128
+ print(json.dumps({
129
+ 'languageType': language,
130
+ 'variables': variables,
131
+ 'code': code
132
+ }, indent=4))
133
+ else:
134
+ return jsonify({'error': 'Unsupported HTTP method'}), 405
135
+
136
+ # 运行代码
137
+ stdout, stderr = run_code(language, code, variables, TIMEOUT)
138
+
139
+ if stderr:
140
+ return jsonify({'error': stderr}), 400
141
+ else:
142
+ # 查找当前目录中的所有图片文件
143
+ image_files = glob.glob('*.png') + glob.glob('*.jpg') + glob.glob('*.jpeg') + glob.glob('*.gif')
144
+
145
+ if image_files:
146
+ image_data = {}
147
+ for image_path in image_files:
148
+ encoded_image = convert_image_to_base64(image_path)
149
+ image_data[image_path] = encoded_image
150
+ os.remove(image_path) # 删除生成的图片文件
151
+
152
+ if not OUTPUT_BASE64:
153
+ # 如果 OUTPUT_BASE64 不是 True,则输出原始图片
154
+ html_content = ''.join([f'<img src="{image_path}" alt="{image_path}" />' for image_path in image_data.keys()])
155
+ return Response(html_content, mimetype='text/html')
156
+
157
+ else:
158
+ return jsonify({'output': stdout, 'images': image_data}), 200
159
+ else:
160
+ return jsonify({'output': stdout}), 200
161
+
162
+ @app.route('/', methods=['GET'])
163
+ def welcome_page():
164
+ return render_template_string('<h1>欢迎使用lee\'s code-interpreter</h1>')
165
+
166
+ def run_test_case(test_case):
167
+ url = f'http://{HOST}:{PORT}/runcode'
168
+ if test_case['method'] == 'GET':
169
+ params = {
170
+ 'languageType': test_case['language'],
171
+ 'variables': test_case['variables'],
172
+ 'code': test_case['code']
173
+ }
174
+ headers = {'Authorization': f'Bearer {AUTH_TOKEN}'} if AUTH_TOKEN else {}
175
+ print("Request Params:")
176
+ print(params)
177
+
178
+ response = requests.get(url, params=params, headers=headers)
179
+ elif test_case['method'] == 'POST':
180
+ data = {
181
+ 'languageType': test_case['language'],
182
+ 'variables': test_case['variables'],
183
+ 'code': test_case['code']
184
+ }
185
+ headers = {'Authorization': f'Bearer {AUTH_TOKEN}'} if AUTH_TOKEN else {}
186
+ print("Request Data:")
187
+ print(data)
188
+
189
+ response = requests.post(url, json=data, headers=headers)
190
+ else:
191
+ raise ValueError("Unsupported HTTP method in test case")
192
+
193
+ # 打印返回JSON
194
+ print("Response JSON:")
195
+ print(response.json())
196
+
197
+ def test_run_code_endpoint():
198
+ test_cases = [
199
+ {
200
+ 'method': 'GET',
201
+ 'language': 'python',
202
+ 'variables': str({}),
203
+ 'code': "print('Hello, World!')"
204
+ },
205
+ {
206
+ 'method': 'GET',
207
+ 'language': '"python"',
208
+ 'variables': str({}),
209
+ 'code': '"print(\'Hello, World!\')"'
210
+ },
211
+ {
212
+ 'method': 'GET',
213
+ 'language': 'python',
214
+ 'variables': str({}),
215
+ 'code': "print('Hello%20World')"
216
+ },
217
+ {
218
+ 'method': 'POST',
219
+ 'language': 'python',
220
+ 'variables': {},
221
+ 'code': "print(5 ** 11)"
222
+ },
223
+ {
224
+ 'method': 'POST',
225
+ 'language': 'python',
226
+ 'variables': {'m': 7, 'n': 4},
227
+ 'code': "print(m * n)"
228
+ },
229
+ {
230
+ 'method': 'POST',
231
+ 'language': 'python',
232
+ 'variables': {},
233
+ 'code': "```python\nprint('Hello from code block!')\n```"
234
+ }
235
+ ]
236
+
237
+ for test_case in test_cases:
238
+ run_test_case(test_case)
239
+
240
+ if __name__ == '__main__':
241
+ # 启动Flask应用
242
+ threading.Thread(target=app.run, kwargs={'host': HOST, 'port': PORT, 'debug': False}).start()
243
+
244
+ # 等待Flask应用启动
245
+ time.sleep(1)
246
+
247
+ # 运行单元测试
248
+ if ENABLE_UNIT_TESTS:
249
+ test_run_code_endpoint()
250
+ print("All tests executed!")