| # # # # import yaml | |
| # # # # import subprocess | |
| # # # # import uuid | |
| # # # # from flask import Flask, request, jsonify | |
| # # # # from queue import Queue | |
| # # # # import requests | |
| # # # # import threading | |
| # # # # import time | |
| # # # # import os | |
| # # # # import atexit | |
| # # # # import base64 | |
| # # # # import signal | |
| # # # # from sqlalchemy import create_engine, Column, String, LargeBinary, Integer | |
| # # # # from sqlalchemy.orm import declarative_base, sessionmaker | |
| # # # # from sqlalchemy.exc import OperationalError, IntegrityError | |
| # # # # import psycopg2 | |
| # # # # app = Flask(__name__) | |
| # # # # # 读取配置文件,指定编码为 utf-8 | |
| # # # # with open('config.yaml', 'r', encoding='utf-8') as file: | |
| # # # # config = yaml.safe_load(file) | |
| # # # # DOMAIN = config['domain'] | |
| # # # # INTERPRETER_IMAGE = config['interpreter_image'] | |
| # # # # PORT_START = config['interpreter_port_range']['start'] | |
| # # # # PORT_END = config['interpreter_port_range']['end'] | |
| # # # # DEPENDENCIES = config['dependencies'] | |
| # # # # RESOURCE_LIMITS = config.get('resource_limits', {}) | |
| # # # # MEMORY_LIMIT = RESOURCE_LIMITS.get('memory') | |
| # # # # CPU_LIMIT = RESOURCE_LIMITS.get('cpus') | |
| # # # # POSTGRES = config['postgres'] | |
| # # # # TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒 | |
| # # # # SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口 | |
| # # # # # 容器管理 | |
| # # # # containers = [] | |
| # # # # ports = list(range(PORT_START, PORT_END + 1)) | |
| # # # # lock = threading.Lock() | |
| # # # # request_queue = Queue() | |
| # # # # result_dict = {} # 用于存储结果的字典 | |
| # # # # current_container_index = 0 | |
| # # # # # 检查 PostgreSQL 容器是否存在 | |
| # # # # POSTGRES_CONTAINER_NAME = "postgres_code_interpreter" | |
| # # # # POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata') | |
| # # # # def is_postgres_container_running(): | |
| # # # # result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True) | |
| # # # # return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n') | |
| # # # # def wait_for_postgres_ready(): | |
| # # # # retry_attempts = 10 | |
| # # # # retry_delay = 5 # seconds | |
| # # # # for attempt in range(retry_attempts): | |
| # # # # try: | |
| # # # # conn = psycopg2.connect( | |
| # # # # dbname=POSTGRES['db'], | |
| # # # # user=POSTGRES['user'], | |
| # # # # password=POSTGRES['password'], | |
| # # # # host=POSTGRES['host'], | |
| # # # # port=POSTGRES['port'] | |
| # # # # ) | |
| # # # # conn.close() | |
| # # # # print("PostgreSQL is ready") | |
| # # # # return True | |
| # # # # except psycopg2.OperationalError as e: | |
| # # # # print(f"Waiting for PostgreSQL to be ready: {e}") | |
| # # # # time.sleep(retry_delay) | |
| # # # # return False | |
| # # # # if not is_postgres_container_running(): | |
| # # # # if not os.path.exists(POSTGRES_DATA_DIR): | |
| # # # # os.makedirs(POSTGRES_DATA_DIR) | |
| # # # # subprocess.run([ | |
| # # # # "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d", | |
| # # # # "-e", f"POSTGRES_USER={POSTGRES['user']}", | |
| # # # # "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}", | |
| # # # # "-e", f"POSTGRES_DB={POSTGRES['db']}", | |
| # # # # "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data", | |
| # # # # "-p", f"{POSTGRES['port']}:5432", | |
| # # # # "postgres" | |
| # # # # ], check=True) | |
| # # # # if not wait_for_postgres_ready(): | |
| # # # # print("Failed to connect to PostgreSQL after multiple attempts.") | |
| # # # # exit(1) | |
| # # # # # PostgreSQL 配置 | |
| # # # # DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}" | |
| # # # # engine = None | |
| # # # # Base = declarative_base() | |
| # # # # Session = None | |
| # # # # session = None | |
| # # # # def init_db(): | |
| # # # # global engine, Session, session | |
| # # # # retry_attempts = 5 | |
| # # # # retry_delay = 5 # seconds | |
| # # # # for attempt in range(retry_attempts): | |
| # # # # try: | |
| # # # # engine = create_engine(DATABASE_URL) | |
| # # # # Session = sessionmaker(bind=engine) | |
| # # # # session = Session() | |
| # # # # Base.metadata.create_all(engine) # 确保数据库模式已创建 | |
| # # # # print("Database connection successful") | |
| # # # # break | |
| # # # # except OperationalError as e: | |
| # # # # print(f"Database connection failed: {e}") | |
| # # # # if attempt < retry_attempts - 1: | |
| # # # # print(f"Retrying in {retry_delay} seconds...") | |
| # # # # time.sleep(retry_delay) | |
| # # # # else: | |
| # # # # print("Failed to connect to the database after multiple attempts.") | |
| # # # # raise | |
| # # # # class Image(Base): | |
| # # # # __tablename__ = 'images' | |
| # # # # id = Column(Integer, primary_key=True, autoincrement=True) | |
| # # # # filename = Column(String, unique=True, nullable=False) | |
| # # # # data = Column(LargeBinary, nullable=False) | |
| # # # # init_db() | |
| # # # # # 启动代码解释器容器 | |
| # # # # def start_container(port): | |
| # # # # container_name = f"code_interpreter_docker_{uuid.uuid4()}" | |
| # # # # run_command = [ | |
| # # # # "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000" | |
| # # # # ] | |
| # # # # if MEMORY_LIMIT: | |
| # # # # run_command.extend(["--memory", MEMORY_LIMIT]) | |
| # # # # if CPU_LIMIT: | |
| # # # # run_command.extend(["--cpus", CPU_LIMIT]) | |
| # # # # run_command.append(INTERPRETER_IMAGE) | |
| # # # # subprocess.run(run_command, check=True) | |
| # # # # print(f"Started container {container_name} on port {port}") | |
| # # # # return container_name, port | |
| # # # # def stop_container(container_name): | |
| # # # # subprocess.run(["docker", "stop", container_name]) | |
| # # # # subprocess.run(["docker", "rm", container_name]) | |
| # # # # print(f"Stopped and removed container {container_name}") | |
| # # # # def start_containers(): | |
| # # # # for port in ports: | |
| # # # # container_name, port = start_container(port) | |
| # # # # containers.append((container_name, port)) | |
| # # # # def stop_containers(): | |
| # # # # for container_name, _ in containers: | |
| # # # # stop_container(container_name) | |
| # # # # @app.route('/runcode', methods=['POST', 'GET']) | |
| # # # # def run_code(): | |
| # # # # if request.method == 'POST': | |
| # # # # data = request.get_json() | |
| # # # # if not data: | |
| # # # # return jsonify({'error': 'Invalid JSON'}), 400 | |
| # # # # elif request.method == 'GET': | |
| # # # # query_string = request.query_string.decode('utf-8') | |
| # # # # data = {'query_string': query_string} | |
| # # # # else: | |
| # # # # return jsonify({'error': 'Invalid request method'}), 405 | |
| # # # # request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID | |
| # # # # request_queue.put((request_id, data)) | |
| # # # # start_time = time.time() | |
| # # # # while time.time() - start_time < 10: # 最多等待10秒 | |
| # # # # if request_id in result_dict: | |
| # # # # output = result_dict.pop(request_id) | |
| # # # # # 处理返回结果,替换 base64 数据为链接 | |
| # # # # if 'images' in output: | |
| # # # # try: | |
| # # # # output = process_images(output) | |
| # # # # except Exception as e: | |
| # # # # return jsonify({'error': str(e)}), 500 | |
| # # # # return jsonify(output), 200 | |
| # # # # time.sleep(0.1) | |
| # # # # return jsonify({'error': 'Request timed out'}), 504 | |
| # # # # def process_images(output): | |
| # # # # images = output['images'] | |
| # # # # for filename, base64_data in images.items(): | |
| # # # # image_data = base64.b64decode(base64_data) | |
| # # # # unique_filename = f"{uuid.uuid4()}_{filename}" | |
| # # # # image_record = Image(filename=unique_filename, data=image_data) | |
| # # # # try: | |
| # # # # session.add(image_record) | |
| # # # # session.commit() | |
| # # # # except IntegrityError as e: | |
| # # # # session.rollback() | |
| # # # # print(f"Database error: {e}") | |
| # # # # continue | |
| # # # # if DOMAIN: | |
| # # # # images[filename] = f"https://{DOMAIN}/image/{unique_filename}" | |
| # # # # else: | |
| # # # # images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}" | |
| # # # # return output | |
| # # # # @app.route('/image/<filename>', methods=['GET']) | |
| # # # # def serve_image(filename): | |
| # # # # image_record = session.query(Image).filter_by(filename=filename).first() | |
| # # # # if image_record: | |
| # # # # return image_record.data, 200, {'Content-Type': 'image/png'} | |
| # # # # return jsonify({'error': 'Image not found'}), 404 | |
| # # # # def handle_requests(): | |
| # # # # global current_container_index | |
| # # # # while True: | |
| # # # # request_id, data = request_queue.get() | |
| # # # # if data is None: | |
| # # # # break | |
| # # # # with lock: | |
| # # # # container_name, port = containers[current_container_index] | |
| # # # # current_container_index = (current_container_index + 1) % len(containers) | |
| # # # # try: | |
| # # # # if 'query_string' in data: | |
| # # # # # GET 请求处理 | |
| # # # # response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT) | |
| # # # # else: | |
| # # # # # POST 请求处理 | |
| # # # # response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT) | |
| # # # # output = response.json() | |
| # # # # except requests.exceptions.Timeout: | |
| # # # # output = {'error': 'Code execution timed out'} | |
| # # # # reset_container(container_name, port) | |
| # # # # except Exception as e: | |
| # # # # output = {'error': str(e)} | |
| # # # # result_dict[request_id] = output # 将结果放入结果字典 | |
| # # # # request_queue.task_done() | |
| # # # # print(f"Finished processing request {request_id}: {output}") | |
| # # # # def reset_container(container_name, port): | |
| # # # # print(f"Resetting container {container_name}") | |
| # # # # stop_container(container_name) | |
| # # # # new_container_name, _ = start_container(port) | |
| # # # # with lock: | |
| # # # # for i, (name, p) in enumerate(containers): | |
| # # # # if name == container_name: | |
| # # # # containers[i] = (new_container_name, port) | |
| # # # # break | |
| # # # # def signal_handler(signal, frame): | |
| # # # # print('Stopping containers and exiting program...') | |
| # # # # stop_containers() | |
| # # # # if session: | |
| # # # # session.close() | |
| # # # # engine.dispose() | |
| # # # # os._exit(0) | |
| # # # # signal.signal(signal.SIGINT, signal_handler) | |
| # # # # worker_thread = threading.Thread(target=handle_requests) | |
| # # # # worker_thread.start() | |
| # # # # start_containers() | |
| # # # # atexit.register(stop_containers) | |
| # # # # if __name__ == '__main__': | |
| # # # # app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口 | |
| # # # import yaml | |
| # # # import subprocess | |
| # # # import uuid | |
| # # # from flask import Flask, request, jsonify | |
| # # # from queue import Queue | |
| # # # import requests | |
| # # # import threading | |
| # # # import time | |
| # # # import os | |
| # # # import atexit | |
| # # # import base64 | |
| # # # import signal | |
| # # # from sqlalchemy import create_engine, Column, String, LargeBinary, Integer | |
| # # # from sqlalchemy.orm import declarative_base, sessionmaker | |
| # # # from sqlalchemy.exc import OperationalError, IntegrityError | |
| # # # import psycopg2 | |
| # # # app = Flask(__name__) | |
| # # # # 读取配置文件,指定编码为 utf-8 | |
| # # # with open('config.yaml', 'r', encoding='utf-8') as file: | |
| # # # config = yaml.safe_load(file) | |
| # # # DOMAIN = config['domain'] | |
| # # # INTERPRETER_IMAGE = config['interpreter_image'] | |
| # # # PORT_START = config['interpreter_port_range']['start'] | |
| # # # PORT_END = config['interpreter_port_range']['end'] | |
| # # # DEPENDENCIES = config['dependencies'] | |
| # # # RESOURCE_LIMITS = config.get('resource_limits', {}) | |
| # # # MEMORY_LIMIT = RESOURCE_LIMITS.get('memory') | |
| # # # CPU_LIMIT = RESOURCE_LIMITS.get('cpus') | |
| # # # POSTGRES = config['postgres'] | |
| # # # TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒 | |
| # # # SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口 | |
| # # # # 容器管理 | |
| # # # containers = [] | |
| # # # ports = list(range(PORT_START, PORT_END + 1)) | |
| # # # lock = threading.Lock() | |
| # # # request_queue = Queue() | |
| # # # result_dict = {} # 用于存储结果的字典 | |
| # # # current_container_index = 0 | |
| # # # # 检查 PostgreSQL 容器是否存在 | |
| # # # POSTGRES_CONTAINER_NAME = "postgres_code_interpreter" | |
| # # # POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata') | |
| # # # def is_postgres_container_running(): | |
| # # # result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True) | |
| # # # return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n') | |
| # # # def wait_for_postgres_ready(): | |
| # # # retry_attempts = 10 | |
| # # # retry_delay = 5 # seconds | |
| # # # for attempt in range(retry_attempts): | |
| # # # try: | |
| # # # conn = psycopg2.connect( | |
| # # # dbname=POSTGRES['db'], | |
| # # # user=POSTGRES['user'], | |
| # # # password=POSTGRES['password'], | |
| # # # host=POSTGRES['host'], | |
| # # # port=POSTGRES['port'] | |
| # # # ) | |
| # # # conn.close() | |
| # # # print("PostgreSQL is ready") | |
| # # # return True | |
| # # # except psycopg2.OperationalError as e: | |
| # # # print(f"Waiting for PostgreSQL to be ready: {e}") | |
| # # # time.sleep(retry_delay) | |
| # # # return False | |
| # # # if not is_postgres_container_running(): | |
| # # # if not os.path.exists(POSTGRES_DATA_DIR): | |
| # # # os.makedirs(POSTGRES_DATA_DIR) | |
| # # # subprocess.run([ | |
| # # # "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d", | |
| # # # "-e", f"POSTGRES_USER={POSTGRES['user']}", | |
| # # # "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}", | |
| # # # "-e", f"POSTGRES_DB={POSTGRES['db']}", | |
| # # # "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data", | |
| # # # "-p", f"{POSTGRES['port']}:5432", | |
| # # # "postgres" | |
| # # # ], check=True) | |
| # # # if not wait_for_postgres_ready(): | |
| # # # print("Failed to connect to PostgreSQL after multiple attempts.") | |
| # # # exit(1) | |
| # # # # PostgreSQL 配置 | |
| # # # DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}" | |
| # # # engine = None | |
| # # # Base = declarative_base() | |
| # # # Session = None | |
| # # # session = None | |
| # # # def init_db(): | |
| # # # global engine, Session, session | |
| # # # retry_attempts = 5 | |
| # # # retry_delay = 5 # seconds | |
| # # # for attempt in range(retry_attempts): | |
| # # # try: | |
| # # # engine = create_engine(DATABASE_URL) | |
| # # # Session = sessionmaker(bind=engine) | |
| # # # session = Session() | |
| # # # Base.metadata.create_all(engine) # 确保数据库模式已创建 | |
| # # # print("Database connection successful") | |
| # # # break | |
| # # # except OperationalError as e: | |
| # # # print(f"Database connection failed: {e}") | |
| # # # if attempt < retry_attempts - 1: | |
| # # # print(f"Retrying in {retry_delay} seconds...") | |
| # # # time.sleep(retry_delay) | |
| # # # else: | |
| # # # print("Failed to connect to the database after multiple attempts.") | |
| # # # raise | |
| # # # class Image(Base): | |
| # # # __tablename__ = 'images' | |
| # # # id = Column(Integer, primary_key=True, autoincrement=True) | |
| # # # filename = Column(String, unique=True, nullable=False) | |
| # # # data = Column(LargeBinary, nullable=False) | |
| # # # init_db() | |
| # # # # 启动代码解释器容器 | |
| # # # def start_container(port): | |
| # # # container_name = f"code_interpreter_docker_{uuid.uuid4()}" | |
| # # # run_command = [ | |
| # # # "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000" | |
| # # # ] | |
| # # # if MEMORY_LIMIT: | |
| # # # run_command.extend(["--memory", MEMORY_LIMIT]) | |
| # # # if CPU_LIMIT: | |
| # # # run_command.extend(["--cpus", CPU_LIMIT]) | |
| # # # run_command.append(INTERPRETER_IMAGE) | |
| # # # subprocess.run(run_command, check=True) | |
| # # # print(f"Started container {container_name} on port {port}") | |
| # # # return container_name, port | |
| # # # def stop_container(container_name): | |
| # # # subprocess.run(["docker", "stop", container_name]) | |
| # # # subprocess.run(["docker", "rm", container_name]) | |
| # # # print(f"Stopped and removed container {container_name}") | |
| # # # def start_containers(): | |
| # # # for port in ports: | |
| # # # container_name, port = start_container(port) | |
| # # # containers.append((container_name, port)) | |
| # # # def stop_containers(): | |
| # # # for container_name, _ in containers: | |
| # # # stop_container(container_name) | |
| # # # @app.route('/runcode', methods=['POST', 'GET']) | |
| # # # def run_code(): | |
| # # # if request.method == 'POST': | |
| # # # data = request.get_json() | |
| # # # if not data: | |
| # # # return jsonify({'error': 'Invalid JSON'}), 400 | |
| # # # elif request.method == 'GET': | |
| # # # query_string = request.query_string.decode('utf-8') | |
| # # # data = {'query_string': query_string} | |
| # # # else: | |
| # # # return jsonify({'error': 'Invalid request method'}), 405 | |
| # # # request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID | |
| # # # request_queue.put((request_id, data)) | |
| # # # start_time = time.time() | |
| # # # while time.time() - start_time < 10: # 最多等待10秒 | |
| # # # if request_id in result_dict: | |
| # # # output = result_dict.pop(request_id) | |
| # # # # 处理返回结果,替换 base64 数据为链接 | |
| # # # if 'images' in output: | |
| # # # try: | |
| # # # output = process_images(output) | |
| # # # except Exception as e: | |
| # # # return jsonify({'error': str(e)}), 500 | |
| # # # return jsonify(output), 200 | |
| # # # time.sleep(0.1) | |
| # # # return jsonify({'error': 'Request timed out'}), 504 | |
| # # # def process_images(output): | |
| # # # images = output['images'] | |
| # # # for filename, base64_data in images.items(): | |
| # # # image_data = base64.b64decode(base64_data) | |
| # # # unique_filename = f"{uuid.uuid4()}_{filename}" | |
| # # # image_record = Image(filename=unique_filename, data=image_data) | |
| # # # try: | |
| # # # session.add(image_record) | |
| # # # session.commit() | |
| # # # except IntegrityError as e: | |
| # # # session.rollback() | |
| # # # print(f"Database error: {e}") | |
| # # # continue | |
| # # # if DOMAIN: | |
| # # # images[filename] = f"https://{DOMAIN}/image/{unique_filename}" | |
| # # # else: | |
| # # # images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}" | |
| # # # return output | |
| # # # @app.route('/image/<filename>', methods=['GET']) | |
| # # # def serve_image(filename): | |
| # # # image_record = session.query(Image).filter_by(filename=filename).first() | |
| # # # if image_record: | |
| # # # return image_record.data, 200, {'Content-Type': 'image/png'} | |
| # # # return jsonify({'error': 'Image not found'}), 404 | |
| # # # def handle_requests(): | |
| # # # global current_container_index | |
| # # # while True: | |
| # # # request_id, data = request_queue.get() | |
| # # # if data is None: | |
| # # # break | |
| # # # with lock: | |
| # # # container_name, port = containers[current_container_index] | |
| # # # current_container_index = (current_container_index + 1) % len(containers) | |
| # # # try: | |
| # # # if 'query_string' in data: | |
| # # # # GET 请求处理 | |
| # # # response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT) | |
| # # # else: | |
| # # # # POST 请求处理 | |
| # # # response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT) | |
| # # # output = response.json() | |
| # # # except requests.exceptions.Timeout: | |
| # # # output = {'error': 'Code execution timed out'} | |
| # # # reset_container(container_name, port) | |
| # # # except Exception as e: | |
| # # # output = {'error': str(e)} | |
| # # # result_dict[request_id] = output # 将结果放入结果字典 | |
| # # # request_queue.task_done() | |
| # # # print(f"Finished processing request {request_id}: {output}") | |
| # # # # Reset container after each request | |
| # # # reset_container(container_name, port) | |
| # # # def reset_container(container_name, port): | |
| # # # print(f"Resetting container {container_name}") | |
| # # # stop_container(container_name) | |
| # # # new_container_name, _ = start_container(port) | |
| # # # with lock: | |
| # # # for i, (name, p) in enumerate(containers): | |
| # # # if name == container_name: | |
| # # # containers[i] = (new_container_name, port) | |
| # # # break | |
| # # # def signal_handler(signal, frame): | |
| # # # print('Stopping containers and exiting program...') | |
| # # # stop_containers() | |
| # # # if session: | |
| # # # session.close() | |
| # # # engine.dispose() | |
| # # # os._exit(0) | |
| # # # signal.signal(signal.SIGINT, signal_handler) | |
| # # # worker_thread = threading.Thread(target=handle_requests) | |
| # # # worker_thread.start() | |
| # # # start_containers() | |
| # # # atexit.register(stop_containers) | |
| # # # if __name__ == '__main__': | |
| # # # app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口 | |
| # # import yaml | |
| # # import subprocess | |
| # # import uuid | |
| # # from flask import Flask, request, jsonify | |
| # # from queue import Queue | |
| # # import requests | |
| # # import threading | |
| # # import time | |
| # # import os | |
| # # import atexit | |
| # # import base64 | |
| # # import signal | |
| # # from sqlalchemy import create_engine, Column, String, LargeBinary, Integer | |
| # # from sqlalchemy.orm import declarative_base, sessionmaker | |
| # # from sqlalchemy.exc import OperationalError, IntegrityError | |
| # # import psycopg2 | |
| # # app = Flask(__name__) | |
| # # # 读取配置文件,指定编码为 utf-8 | |
| # # with open('config.yaml', 'r', encoding='utf-8') as file: | |
| # # config = yaml.safe_load(file) | |
| # # DOMAIN = config['domain'] | |
| # # INTERPRETER_IMAGE = config['interpreter_image'] | |
| # # PORT_START = config['interpreter_port_range']['start'] | |
| # # PORT_END = config['interpreter_port_range']['end'] | |
| # # DEPENDENCIES = config['dependencies'] | |
| # # RESOURCE_LIMITS = config.get('resource_limits', {}) | |
| # # MEMORY_LIMIT = RESOURCE_LIMITS.get('memory') | |
| # # CPU_LIMIT = RESOURCE_LIMITS.get('cpus') | |
| # # POSTGRES = config['postgres'] | |
| # # TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒 | |
| # # TIMEOUT_SECONDS = config.get('timeout_seconds', 60) # 从配置中读取超时时间 | |
| # # SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口 | |
| # # # 容器管理 | |
| # # containers = [] | |
| # # ports = list(range(PORT_START, PORT_END + 1)) | |
| # # lock = threading.Lock() | |
| # # request_queue = Queue() | |
| # # result_dict = {} # 用于存储结果的字典 | |
| # # current_container_index = 0 | |
| # # semaphore = threading.Semaphore(len(ports)) # 控制并发请求的信号量 | |
| # # # 检查 PostgreSQL 容器是否存在 | |
| # # POSTGRES_CONTAINER_NAME = "postgres_code_interpreter" | |
| # # POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata') | |
| # # def is_postgres_container_running(): | |
| # # result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True) | |
| # # return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n') | |
| # # def wait_for_postgres_ready(): | |
| # # retry_attempts = 10 | |
| # # retry_delay = 5 # seconds | |
| # # for attempt in range(retry_attempts): | |
| # # try: | |
| # # conn = psycopg2.connect( | |
| # # dbname=POSTGRES['db'], | |
| # # user=POSTGRES['user'], | |
| # # password=POSTGRES['password'], | |
| # # host=POSTGRES['host'], | |
| # # port=POSTGRES['port'] | |
| # # ) | |
| # # conn.close() | |
| # # print("PostgreSQL is ready") | |
| # # return True | |
| # # except psycopg2.OperationalError as e: | |
| # # print(f"Waiting for PostgreSQL to be ready: {e}") | |
| # # time.sleep(retry_delay) | |
| # # return False | |
| # # if not is_postgres_container_running(): | |
| # # if not os.path.exists(POSTGRES_DATA_DIR): | |
| # # os.makedirs(POSTGRES_DATA_DIR) | |
| # # subprocess.run([ | |
| # # "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d", | |
| # # "-e", f"POSTGRES_USER={POSTGRES['user']}", | |
| # # "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}", | |
| # # "-e", f"POSTGRES_DB={POSTGRES['db']}", | |
| # # "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data", | |
| # # "-p", f"{POSTGRES['port']}:5432", | |
| # # "postgres" | |
| # # ], check=True) | |
| # # if not wait_for_postgres_ready(): | |
| # # print("Failed to connect to PostgreSQL after multiple attempts.") | |
| # # exit(1) | |
| # # # PostgreSQL 配置 | |
| # # DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}" | |
| # # engine = None | |
| # # Base = declarative_base() | |
| # # Session = None | |
| # # session = None | |
| # # def init_db(): | |
| # # global engine, Session, session | |
| # # retry_attempts = 5 | |
| # # retry_delay = 5 # seconds | |
| # # for attempt in range(retry_attempts): | |
| # # try: | |
| # # engine = create_engine(DATABASE_URL) | |
| # # Session = sessionmaker(bind=engine) | |
| # # session = Session() | |
| # # Base.metadata.create_all(engine) # 确保数据库模式已创建 | |
| # # print("Database connection successful") | |
| # # break | |
| # # except OperationalError as e: | |
| # # print(f"Database connection failed: {e}") | |
| # # if attempt < retry_attempts - 1: | |
| # # print(f"Retrying in {retry_delay} seconds...") | |
| # # time.sleep(retry_delay) | |
| # # else: | |
| # # print("Failed to connect to the database after multiple attempts.") | |
| # # raise | |
| # # class Image(Base): | |
| # # __tablename__ = 'images' | |
| # # id = Column(Integer, primary_key=True, autoincrement=True) | |
| # # filename = Column(String, unique=True, nullable=False) | |
| # # data = Column(LargeBinary, nullable=False) | |
| # # init_db() | |
| # # # 启动代码解释器容器 | |
| # # def start_container(port): | |
| # # container_name = f"code_interpreter_docker_{uuid.uuid4()}" | |
| # # run_command = [ | |
| # # "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000" | |
| # # ] | |
| # # if MEMORY_LIMIT: | |
| # # run_command.extend(["--memory", MEMORY_LIMIT]) | |
| # # if CPU_LIMIT: | |
| # # run_command.extend(["--cpus", CPU_LIMIT]) | |
| # # run_command.append(INTERPRETER_IMAGE) | |
| # # subprocess.run(run_command, check=True) | |
| # # print(f"Started container {container_name} on port {port}") | |
| # # return container_name, port | |
| # # def stop_container(container_name): | |
| # # subprocess.run(["docker", "stop", container_name]) | |
| # # subprocess.run(["docker", "rm", container_name]) | |
| # # print(f"Stopped and removed container {container_name}") | |
| # # def start_containers(): | |
| # # for port in ports: | |
| # # container_name, port = start_container(port) | |
| # # containers.append((container_name, port)) | |
| # # def stop_containers(): | |
| # # for container_name, _ in containers: | |
| # # stop_container(container_name) | |
| # # @app.route('/runcode', methods=['POST', 'GET']) | |
| # # def run_code(): | |
| # # if request.method == 'POST': | |
| # # data = request.get_json() | |
| # # if not data: | |
| # # return jsonify({'error': 'Invalid JSON'}), 400 | |
| # # elif request.method == 'GET': | |
| # # query_string = request.query_string.decode('utf-8') | |
| # # data = {'query_string': query_string} | |
| # # else: | |
| # # return jsonify({'error': 'Invalid request method'}), 405 | |
| # # request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID | |
| # # request_queue.put((request_id, data)) | |
| # # start_time = time.time() | |
| # # while time.time() - start_time < 10: # 最多等待10秒 | |
| # # if request_id in result_dict: | |
| # # output = result_dict.pop(request_id) | |
| # # # 处理返回结果,替换 base64 数据为链接 | |
| # # if 'images' in output: | |
| # # try: | |
| # # output = process_images(output) | |
| # # except Exception as e: | |
| # # return jsonify({'error': str(e)}), 500 | |
| # # return jsonify(output), 200 | |
| # # time.sleep(0.1) | |
| # # return jsonify({'error': 'Request timed out'}), 504 | |
| # # def process_images(output): | |
| # # images = output['images'] | |
| # # for filename, base64_data in images.items(): | |
| # # image_data = base64.b64decode(base64_data) | |
| # # unique_filename = f"{uuid.uuid4()}_{filename}" | |
| # # image_record = Image(filename=unique_filename, data=image_data) | |
| # # try: | |
| # # session.add(image_record) | |
| # # session.commit() | |
| # # except IntegrityError as e: | |
| # # session.rollback() | |
| # # print(f"Database error: {e}") | |
| # # continue | |
| # # if DOMAIN: | |
| # # images[filename] = f"https://{DOMAIN}/image/{unique_filename}" | |
| # # else: | |
| # # images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}" | |
| # # return output | |
| # # @app.route('/image/<filename>', methods=['GET']) | |
| # # def serve_image(filename): | |
| # # image_record = session.query(Image).filter_by(filename=filename).first() | |
| # # if image_record: | |
| # # return image_record.data, 200, {'Content-Type': 'image/png'} | |
| # # return jsonify({'error': 'Image not found'}), 404 | |
| # # def handle_requests(): | |
| # # global current_container_index | |
| # # while True: | |
| # # request_id, data = request_queue.get() | |
| # # if data is None: | |
| # # break | |
| # # semaphore.acquire() # 获取信号量,确保不超过并发限制 | |
| # # with lock: | |
| # # container_name, port = containers[current_container_index] | |
| # # current_container_index = (current_container_index + 1) % len(containers) | |
| # # try: | |
| # # if 'query_string' in data: | |
| # # response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT_SECONDS) | |
| # # else: | |
| # # response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT_SECONDS) | |
| # # output = response.json() | |
| # # except requests.exceptions.Timeout: | |
| # # output = {'error': 'Code execution timed out'} | |
| # # except Exception as e: | |
| # # output = {'error': str(e)} | |
| # # result_dict[request_id] = output # 将结果放入结果字典 | |
| # # request_queue.task_done() | |
| # # print(f"Finished processing request {request_id}: {output}") | |
| # # # 异步重置容器 | |
| # # reset_container(container_name, port) | |
| # # def reset_container(container_name, port): | |
| # # threading.Thread(target=_reset_container, args=(container_name, port)).start() | |
| # # def _reset_container(container_name, port): | |
| # # print(f"Resetting container {container_name}") | |
| # # stop_container(container_name) | |
| # # new_container_name, _ = start_container(port) | |
| # # with lock: | |
| # # for i, (name, p) in enumerate(containers): | |
| # # if name == container_name: | |
| # # containers[i] = (new_container_name, port) | |
| # # break | |
| # # semaphore.release() # 释放信号量 | |
| # # def signal_handler(signal, frame): | |
| # # print('Stopping containers and exiting program...') | |
| # # stop_containers() | |
| # # if session: | |
| # # session.close() | |
| # # engine.dispose() | |
| # # os._exit(0) | |
| # # signal.signal(signal.SIGINT, signal_handler) | |
| # # worker_thread = threading.Thread(target=handle_requests) | |
| # # worker_thread.start() | |
| # # start_containers() | |
| # # atexit.register(stop_containers) | |
| # # if __name__ == '__main__': | |
| # # app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口 | |
| # import yaml | |
| # import subprocess | |
| # import uuid | |
| # from flask import Flask, request, jsonify | |
| # from queue import Queue | |
| # import requests | |
| # import threading | |
| # import time | |
| # import os | |
| # import atexit | |
| # import base64 | |
| # import signal | |
| # from sqlalchemy import create_engine, Column, String, LargeBinary, Integer | |
| # from sqlalchemy.orm import declarative_base, sessionmaker | |
| # from sqlalchemy.exc import OperationalError, IntegrityError | |
| # import psycopg2 | |
| # app = Flask(__name__) | |
| # # 读取配置文件,指定编码为 utf-8 | |
| # with open('config.yaml', 'r', encoding='utf-8') as file: | |
| # config = yaml.safe_load(file) | |
| # DOMAIN = config['domain'] | |
| # INTERPRETER_IMAGE = config['interpreter_image'] | |
| # PORT_START = config['interpreter_port_range']['start'] | |
| # PORT_END = config['interpreter_port_range']['end'] | |
| # DEPENDENCIES = config['dependencies'] | |
| # RESOURCE_LIMITS = config.get('resource_limits', {}) | |
| # MEMORY_LIMIT = RESOURCE_LIMITS.get('memory') | |
| # CPU_LIMIT = RESOURCE_LIMITS.get('cpus') | |
| # POSTGRES = config['postgres'] | |
| # TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒 | |
| # TIMEOUT_SECONDS = config.get('timeout_seconds', 60) # 从配置中读取超时时间 | |
| # SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口 | |
| # # 容器管理 | |
| # containers = [] | |
| # ports = list(range(PORT_START, PORT_END + 1)) | |
| # lock = threading.Lock() | |
| # request_queue = Queue() | |
| # result_dict = {} # 用于存储结果的字典 | |
| # current_container_index = 0 | |
| # semaphore = threading.Semaphore(len(ports)) # 控制并发请求的信号量 | |
| # # 检查 PostgreSQL 容器是否存在 | |
| # POSTGRES_CONTAINER_NAME = "postgres_code_interpreter" | |
| # POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata') | |
| # def is_postgres_container_running(): | |
| # result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True) | |
| # return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n') | |
| # def wait_for_postgres_ready(): | |
| # retry_attempts = 10 | |
| # retry_delay = 5 # seconds | |
| # for attempt in range(retry_attempts): | |
| # try: | |
| # conn = psycopg2.connect( | |
| # dbname=POSTGRES['db'], | |
| # user=POSTGRES['user'], | |
| # password=POSTGRES['password'], | |
| # host=POSTGRES['host'], | |
| # port=POSTGRES['port'] | |
| # ) | |
| # conn.close() | |
| # print("PostgreSQL is ready") | |
| # return True | |
| # except psycopg2.OperationalError as e: | |
| # print(f"Waiting for PostgreSQL to be ready: {e}") | |
| # time.sleep(retry_delay) | |
| # return False | |
| # if not is_postgres_container_running(): | |
| # if not os.path.exists(POSTGRES_DATA_DIR): | |
| # os.makedirs(POSTGRES_DATA_DIR) | |
| # subprocess.run([ | |
| # "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d", | |
| # "-e", f"POSTGRES_USER={POSTGRES['user']}", | |
| # "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}", | |
| # "-e", f"POSTGRES_DB={POSTGRES['db']}", | |
| # "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data", | |
| # "-p", f"{POSTGRES['port']}:5432", | |
| # "postgres" | |
| # ], check=True) | |
| # if not wait_for_postgres_ready(): | |
| # print("Failed to connect to PostgreSQL after multiple attempts.") | |
| # exit(1) | |
| # # PostgreSQL 配置 | |
| # DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}" | |
| # engine = None | |
| # Base = declarative_base() | |
| # Session = None | |
| # session = None | |
| # def init_db(): | |
| # global engine, Session, session | |
| # retry_attempts = 5 | |
| # retry_delay = 5 # seconds | |
| # for attempt in range(retry_attempts): | |
| # try: | |
| # engine = create_engine(DATABASE_URL) | |
| # Session = sessionmaker(bind=engine) | |
| # session = Session() | |
| # Base.metadata.create_all(engine) # 确保数据库模式已创建 | |
| # print("Database connection successful") | |
| # break | |
| # except OperationalError as e: | |
| # print(f"Database connection failed: {e}") | |
| # if attempt < retry_attempts - 1: | |
| # print(f"Retrying in {retry_delay} seconds...") | |
| # time.sleep(retry_delay) | |
| # else: | |
| # print("Failed to connect to the database after multiple attempts.") | |
| # raise | |
| # class Image(Base): | |
| # __tablename__ = 'images' | |
| # id = Column(Integer, primary_key=True, autoincrement=True) | |
| # filename = Column(String, unique=True, nullable=False) | |
| # data = Column(LargeBinary, nullable=False) | |
| # init_db() | |
| # # 启动代码解释器容器 | |
| # def start_container(port): | |
| # container_name = f"code_interpreter_docker_{uuid.uuid4()}" | |
| # run_command = [ | |
| # "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000" | |
| # ] | |
| # if MEMORY_LIMIT: | |
| # run_command.extend(["--memory", MEMORY_LIMIT]) | |
| # if CPU_LIMIT: | |
| # run_command.extend(["--cpus", CPU_LIMIT]) | |
| # run_command.append(INTERPRETER_IMAGE) | |
| # subprocess.run(run_command, check=True) | |
| # print(f"Started container {container_name} on port {port}") | |
| # return container_name, port | |
| # def stop_container(container_name): | |
| # subprocess.run(["docker", "stop", container_name]) | |
| # subprocess.run(["docker", "rm", container_name]) | |
| # print(f"Stopped and removed container {container_name}") | |
| # def start_containers(): | |
| # for port in ports: | |
| # container_name, port = start_container(port) | |
| # containers.append((container_name, port)) | |
| # def stop_containers(): | |
| # for container_name, _ in containers: | |
| # stop_container(container_name) | |
| # @app.route('/runcode', methods=['POST', 'GET']) | |
| # def run_code(): | |
| # if request.method == 'POST': | |
| # data = request.get_json() | |
| # if not data: | |
| # return jsonify({'error': 'Invalid JSON'}), 400 | |
| # elif request.method == 'GET': | |
| # query_string = request.query_string.decode('utf-8') | |
| # data = {'query_string': query_string} | |
| # else: | |
| # return jsonify({'error': 'Invalid request method'}), 405 | |
| # request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID | |
| # request_queue.put((request_id, data)) | |
| # start_time = time.time() | |
| # while time.time() - start_time < 10: # 最多等待10秒 | |
| # if request_id in result_dict: | |
| # output = result_dict.pop(request_id) | |
| # # 处理返回结果,替换 base64 数据为链接 | |
| # if 'images' in output: | |
| # try: | |
| # output = process_images(output) | |
| # except Exception as e: | |
| # return jsonify({'error': str(e)}), 500 | |
| # return jsonify(output), 200 | |
| # time.sleep(0.1) | |
| # return jsonify({'error': 'Request timed out'}), 504 | |
| # def process_images(output): | |
| # images = output['images'] | |
| # for filename, base64_data in images.items(): | |
| # image_data = base64.b64decode(base64_data) | |
| # unique_filename = f"{uuid.uuid4()}_{filename}" | |
| # image_record = Image(filename=unique_filename, data=image_data) | |
| # try: | |
| # session.add(image_record) | |
| # session.commit() | |
| # except IntegrityError as e: | |
| # session.rollback() | |
| # print(f"Database error: {e}") | |
| # continue | |
| # if DOMAIN: | |
| # images[filename] = f"https://{DOMAIN}/image/{unique_filename}" | |
| # else: | |
| # images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}" | |
| # return output | |
| # @app.route('/image/<filename>', methods=['GET']) | |
| # def serve_image(filename): | |
| # image_record = session.query(Image).filter_by(filename=filename).first() | |
| # if image_record: | |
| # return image_record.data, 200, {'Content-Type': 'image/png'} | |
| # return jsonify({'error': 'Image not found'}), 404 | |
| # def handle_requests(): | |
| # global current_container_index | |
| # while True: | |
| # request_id, data = request_queue.get() | |
| # if data is None: | |
| # break | |
| # semaphore.acquire() # 获取信号量,确保不超过并发限制 | |
| # with lock: | |
| # container_name, port = containers[current_container_index] | |
| # current_container_index = (current_container_index + 1) % len(containers) | |
| # try: | |
| # if 'query_string' in data: | |
| # response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT_SECONDS) | |
| # else: | |
| # response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT_SECONDS) | |
| # output = response.json() | |
| # except requests.exceptions.Timeout: | |
| # output = {'error': 'Code execution timed out'} | |
| # except Exception as e: | |
| # output = {'error': str(e)} | |
| # result_dict[request_id] = output # 将结果放入结果字典 | |
| # request_queue.task_done() | |
| # print(f"Finished processing request {request_id}: {output}") | |
| # # 异步重置容器 | |
| # reset_container(container_name, port) | |
| # def reset_container(container_name, port): | |
| # threading.Thread(target=_reset_container, args=(container_name, port)).start() | |
| # def _reset_container(container_name, port): | |
| # print(f"Resetting container {container_name}") | |
| # stop_container(container_name) | |
| # new_container_name, _ = start_container(port) | |
| # with lock: | |
| # for i, (name, p) in enumerate(containers): | |
| # if name == container_name: | |
| # containers[i] = (new_container_name, port) | |
| # break | |
| # semaphore.release() # 释放信号量 | |
| # def signal_handler(signal, frame): | |
| # print('Stopping containers and exiting program...') | |
| # stop_containers() | |
| # if session: | |
| # session.close() | |
| # engine.dispose() | |
| # os._exit(0) | |
| # signal.signal(signal.SIGINT, signal_handler) | |
| # # 启动多个工作线程来处理请求 | |
| # for _ in range(len(ports)): | |
| # threading.Thread(target=handle_requests).start() | |
| # start_containers() | |
| # atexit.register(stop_containers) | |
| # if __name__ == '__main__': | |
| # app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口 | |
| import yaml | |
| import subprocess | |
| import uuid | |
| from flask import Flask, request, jsonify | |
| from queue import Queue | |
| import requests | |
| import threading | |
| import time | |
| import os | |
| import atexit | |
| import base64 | |
| import signal | |
| from sqlalchemy import create_engine, Column, String, LargeBinary, Integer | |
| from sqlalchemy.orm import declarative_base, sessionmaker | |
| from sqlalchemy.exc import OperationalError, IntegrityError | |
| import psycopg2 | |
| app = Flask(__name__) | |
| # 读取配置文件,指定编码为 utf-8 | |
| with open('config.yaml', 'r', encoding='utf-8') as file: | |
| config = yaml.safe_load(file) | |
| DOMAIN = config['domain'] | |
| INTERPRETER_IMAGE = config['interpreter_image'] | |
| PORT_START = config['interpreter_port_range']['start'] | |
| PORT_END = config['interpreter_port_range']['end'] | |
| DEPENDENCIES = config['dependencies'] | |
| RESOURCE_LIMITS = config.get('resource_limits', {}) | |
| MEMORY_LIMIT = RESOURCE_LIMITS.get('memory') | |
| CPU_LIMIT = RESOURCE_LIMITS.get('cpus') | |
| POSTGRES = config['postgres'] | |
| TIMEOUT = config.get('timeout', 30) # 默认超时时间为30秒 | |
| TIMEOUT_SECONDS = config.get('timeout_seconds', 60) # 从配置中读取超时时间 | |
| SCHEDULER_PORT = config['scheduler_port'] # 新增调度中心端口 | |
| # 容器管理 | |
| containers = [] | |
| ports = list(range(PORT_START, PORT_END + 1)) | |
| lock = threading.Lock() | |
| request_queue = Queue() | |
| result_dict = {} # 用于存储结果的字典 | |
| current_container_index = 0 | |
| semaphore = threading.Semaphore(len(ports)) # 控制并发请求的信号量 | |
| # 检查 PostgreSQL 容器是否存在 | |
| POSTGRES_CONTAINER_NAME = "postgres_code_interpreter" | |
| POSTGRES_DATA_DIR = os.path.join(os.getcwd(), 'pgdata') | |
| def is_postgres_container_running(): | |
| result = subprocess.run(["docker", "ps", "-a", "--filter", f"name={POSTGRES_CONTAINER_NAME}", "--format", "{{.Names}}"], capture_output=True, text=True) | |
| return POSTGRES_CONTAINER_NAME in result.stdout.strip().split('\n') | |
| def wait_for_postgres_ready(): | |
| retry_attempts = 10 | |
| retry_delay = 5 # seconds | |
| for attempt in range(retry_attempts): | |
| try: | |
| conn = psycopg2.connect( | |
| dbname=POSTGRES['db'], | |
| user=POSTGRES['user'], | |
| password=POSTGRES['password'], | |
| host=POSTGRES['host'], | |
| port=POSTGRES['port'] | |
| ) | |
| conn.close() | |
| print("PostgreSQL is ready") | |
| return True | |
| except psycopg2.OperationalError as e: | |
| print(f"Waiting for PostgreSQL to be ready: {e}") | |
| time.sleep(retry_delay) | |
| return False | |
| if not is_postgres_container_running(): | |
| if not os.path.exists(POSTGRES_DATA_DIR): | |
| os.makedirs(POSTGRES_DATA_DIR) | |
| subprocess.run([ | |
| "docker", "run", "--name", POSTGRES_CONTAINER_NAME, "-d", | |
| "-e", f"POSTGRES_USER={POSTGRES['user']}", | |
| "-e", f"POSTGRES_PASSWORD={POSTGRES['password']}", | |
| "-e", f"POSTGRES_DB={POSTGRES['db']}", | |
| "-v", f"{POSTGRES_DATA_DIR}:/var/lib/postgresql/data", | |
| "-p", f"{POSTGRES['port']}:5432", | |
| "postgres" | |
| ], check=True) | |
| if not wait_for_postgres_ready(): | |
| print("Failed to connect to PostgreSQL after multiple attempts.") | |
| exit(1) | |
| # PostgreSQL 配置 | |
| DATABASE_URL = f"postgresql://{POSTGRES['user']}:{POSTGRES['password']}@{POSTGRES['host']}:{POSTGRES['port']}/{POSTGRES['db']}" | |
| engine = None | |
| Base = declarative_base() | |
| Session = None | |
| session = None | |
| def init_db(): | |
| global engine, Session, session | |
| retry_attempts = 5 | |
| retry_delay = 5 # seconds | |
| for attempt in range(retry_attempts): | |
| try: | |
| engine = create_engine(DATABASE_URL) | |
| Session = sessionmaker(bind=engine) | |
| session = Session() | |
| Base.metadata.create_all(engine) # 确保数据库模式已创建 | |
| print("Database connection successful") | |
| break | |
| except OperationalError as e: | |
| print(f"Database connection failed: {e}") | |
| if attempt < retry_attempts - 1: | |
| print(f"Retrying in {retry_delay} seconds...") | |
| time.sleep(retry_delay) | |
| else: | |
| print("Failed to connect to the database after multiple attempts.") | |
| raise | |
| class Image(Base): | |
| __tablename__ = 'images' | |
| id = Column(Integer, primary_key=True, autoincrement=True) | |
| filename = Column(String, unique=True, nullable=False) | |
| data = Column(LargeBinary, nullable=False) | |
| init_db() | |
| # 启动代码解释器容器 | |
| def start_container(port): | |
| container_name = f"code_interpreter_docker_{uuid.uuid4()}" | |
| run_command = [ | |
| "docker", "run", "--name", container_name, "-d", "-p", f"{port}:5000" | |
| ] | |
| if MEMORY_LIMIT: | |
| run_command.extend(["--memory", MEMORY_LIMIT]) | |
| if CPU_LIMIT: | |
| run_command.extend(["--cpus", CPU_LIMIT]) | |
| run_command.append(INTERPRETER_IMAGE) | |
| subprocess.run(run_command, check=True) | |
| print(f"Started container {container_name} on port {port}") | |
| return container_name, port | |
| def stop_container(container_name): | |
| subprocess.run(["docker", "stop", container_name]) | |
| subprocess.run(["docker", "rm", container_name]) | |
| print(f"Stopped and removed container {container_name}") | |
| def start_containers(): | |
| for port in ports: | |
| container_name, port = start_container(port) | |
| containers.append((container_name, port)) | |
| def stop_containers(): | |
| for container_name, _ in containers: | |
| stop_container(container_name) | |
| def run_code(): | |
| if request.method == 'POST': | |
| data = request.get_json() | |
| if not data: | |
| return jsonify({'error': 'Invalid JSON'}), 400 | |
| elif request.method == 'GET': | |
| query_string = request.query_string.decode('utf-8') | |
| data = {'query_string': query_string} | |
| else: | |
| return jsonify({'error': 'Invalid request method'}), 405 | |
| request_id = str(uuid.uuid4()) # 生成一个唯一的请求ID | |
| request_queue.put((request_id, data)) | |
| start_time = time.time() | |
| while time.time() - start_time < 10: # 最多等待10秒 | |
| if request_id in result_dict: | |
| output = result_dict.pop(request_id) | |
| # 处理返回结果,替换 base64 数据为链接 | |
| if 'images' in output: | |
| try: | |
| output = process_images(output) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| return jsonify(output), 200 | |
| time.sleep(0.1) | |
| return jsonify({'error': 'Request timed out'}), 504 | |
| def process_images(output): | |
| images = output['images'] | |
| for filename, base64_data in images.items(): | |
| image_data = base64.b64decode(base64_data) | |
| unique_filename = f"{uuid.uuid4()}_{filename}" | |
| image_record = Image(filename=unique_filename, data=image_data) | |
| try: | |
| session.add(image_record) | |
| session.commit() | |
| except IntegrityError as e: | |
| session.rollback() | |
| print(f"Database error: {e}") | |
| continue | |
| if DOMAIN: | |
| images[filename] = f"https://{DOMAIN}/image/{unique_filename}" | |
| else: | |
| images[filename] = f"http://127.0.0.1:{SCHEDULER_PORT}/image/{unique_filename}" | |
| return output | |
| def serve_image(filename): | |
| image_record = session.query(Image).filter_by(filename=filename).first() | |
| if image_record: | |
| return image_record.data, 200, {'Content-Type': 'image/png'} | |
| return jsonify({'error': 'Image not found'}), 404 | |
| def handle_requests(): | |
| global current_container_index | |
| while True: | |
| request_id, data = request_queue.get() | |
| if data is None: | |
| break | |
| semaphore.acquire() # 获取信号量,确保不超过并发限制 | |
| with lock: | |
| container_name, port = containers[current_container_index] | |
| current_container_index = (current_container_index + 1) % len(containers) | |
| try: | |
| if 'query_string' in data: | |
| response = requests.get(f"http://localhost:{port}/runcode?{data['query_string']}", timeout=TIMEOUT_SECONDS) | |
| else: | |
| response = requests.post(f"http://localhost:{port}/runcode", json=data, timeout=TIMEOUT_SECONDS) | |
| output = response.json() | |
| except requests.exceptions.Timeout: | |
| output = {'error': 'Code execution timed out'} | |
| except Exception as e: | |
| output = {'error': str(e)} | |
| result_dict[request_id] = output # 将结果放入结果字典 | |
| request_queue.task_done() | |
| print(f"Finished processing request {request_id}: {output}") | |
| # 异步重置容器 | |
| reset_container(container_name, port) | |
| def reset_container(container_name, port): | |
| threading.Thread(target=_reset_container, args=(container_name, port)).start() | |
| def _reset_container(container_name, port): | |
| print(f"Resetting container {container_name}") | |
| stop_container(container_name) | |
| new_container_name, _ = start_container(port) | |
| with lock: | |
| for i, (name, p) in enumerate(containers): | |
| if name == container_name: | |
| containers[i] = (new_container_name, port) | |
| break | |
| semaphore.release() # 释放信号量 | |
| def signal_handler(signal, frame): | |
| print('Stopping containers and exiting program...') | |
| stop_containers() | |
| if session: | |
| session.close() | |
| engine.dispose() | |
| os._exit(0) | |
| signal.signal(signal.SIGINT, signal_handler) | |
| # 启动多个工作线程来处理请求 | |
| for _ in range(len(ports)): | |
| threading.Thread(target=handle_requests).start() | |
| start_containers() | |
| atexit.register(stop_containers) | |
| if __name__ == '__main__': | |
| app.run(port=SCHEDULER_PORT, threaded=True) # 使用调度中心端口 |