"""HTTP server that wraps a PDF translation command line tool.

Endpoints:
    POST /translate              translate a PDF, optionally cut/compare it
    POST /cut                    split every page of a PDF into two halves
    POST /cut-compare            build a side-by-side comparison PDF
    GET  /translatedFile/<name>  download a generated file
"""
import base64
import copy
import os
import shutil
import string
import subprocess
import sys
import threading
import traceback

from flask import Flask, abort, jsonify, request, send_file
from flask_cors import CORS
from pypdf import PdfReader, PdfWriter
from pypdf.generic import RectangleObject

######################################## Default configuration ########################################
port_num = int(os.environ.get("PORT", 8888))  # Port to listen on: defaults to 8888
pdf2zh = "babeldoc"  # Command used to launch the translator
######### The values below can be set in the Zotero preferences; #########
######### values sent by Zotero override the ones in this file.   #########
thread_num = 4  # Number of translation threads: defaults to 4
service = "bing"  # Translation service: defaults to bing
translated_dir = "./translated/"  # Output directory (temporary, may be deleted after translating)
config_path = "./config.toml"  # Path of the translator configuration file
source_languages = "en"  # Source language
target_languages = "zh"  # Target language
global_translated_dir = translated_dir

# OpenAI settings read from the environment
openai_base_url = os.environ.get("OPENAI_BASE_URL", "")
openai_model = os.environ.get("OPENAI_MODEL", "gpt-4o")
openai_api_key = os.environ.get("OPENAI_API_KEY", "")
model_type = openai_model  # Used to decide which family of API flags to pass
claude_api_key = os.environ.get("CLAUDE_API_KEY", "")
##########################################################################################

# Page boundary boxes that are rewritten when a page is cropped to one half.
_PAGE_BOXES = ("mediabox", "cropbox", "trimbox", "bleedbox", "artbox")

# Command line flags whose following argument must never be written to the log.
_SECRET_FLAGS = frozenset({"--openai-api-key", "--claude-api-key"})


def _is_enabled(flag):
    """Return True when *flag* is the string "true" (any case) or boolean True."""
    if isinstance(flag, str):
        return flag.strip().lower() == "true"
    return flag is True


def _pdf_stem(path):
    """Return the base name of *path* without a trailing ``.pdf`` extension.

    Only the final extension is removed (case-insensitively); a name that
    does not end in ``.pdf`` is returned unchanged.
    """
    name = os.path.basename(path)
    stem, ext = os.path.splitext(name)
    return stem if ext.lower() == ".pdf" else name


def _replace_tail(path, old_tail, new_tail):
    """Swap *old_tail* at the end of *path* for *new_tail*.

    If *path* does not end with *old_tail*, *new_tail* is simply appended so
    the result never equals the input path.
    """
    if path.endswith(old_tail):
        return path[: -len(old_tail)] + new_tail
    return path + new_tail


def _redact_command(command):
    """Return a copy of *command* with secret flag values replaced by ``***``."""
    redacted = []
    hide_next = False
    for arg in command:
        redacted.append("***" if hide_next else arg)
        hide_next = arg in _SECRET_FLAGS
    return redacted


def _error_response(label, exc):
    """Log *exc* and build the JSON 500 response shared by the POST endpoints.

    The response message contains the exception text followed by the
    formatted traceback, when one is available.
    """
    print(f"{label}: {exc}")
    error_message = str(exc)
    if exc.__traceback__:
        traceback_str = "".join(traceback.format_tb(exc.__traceback__))
        print(f"## 错误堆栈 ## : {traceback_str}")
        error_message = f"{error_message}\n{traceback_str}"
    return jsonify({"status": "error", "message": error_message}), 500


class Config:
    """Per-request settings: JSON body values with module-level fallbacks.

    Creating an instance also creates the output directory and records it in
    the module-level ``global_translated_dir`` used by the download endpoint.
    """

    def __init__(self, request):
        """Read the settings from the JSON body of *request*.

        Raises:
            ValueError: if the request body is not a JSON object.
        """
        data = request.get_json()
        if not isinstance(data, dict):
            raise ValueError("Request body must be a JSON object")

        def pick(key, default):
            # A missing key and an empty string both mean "use the default".
            value = data.get(key)
            return default if value is None or value == "" else value

        self.thread_num = pick("threadNum", thread_num)
        self.service = pick("engine", service)
        self.source_languages = pick("sourceLanguages", source_languages)
        self.target_languages = pick("targetLanguages", target_languages)

        self.translated_dir = get_absolute_path(pick("outputPath", translated_dir))
        os.makedirs(self.translated_dir, exist_ok=True)

        self.config_path = get_absolute_path(pick("configPath", config_path))

        # Optional post-processing switches; compared against "true" by callers.
        self.mono_cut = data.get("mono_cut")
        self.dual_cut = data.get("dual_cut")
        self.compare = data.get("compare")

        print("outputPath: ", self.translated_dir)
        print("configPath: ", self.config_path)

        # Remember the most recent output directory for the download endpoint.
        global global_translated_dir
        global_translated_dir = self.translated_dir


def get_absolute_path(path):
    """Return *path* unchanged if it is absolute, else resolve it against the cwd."""
    if os.path.isabs(path):
        return path
    return os.path.abspath(path)


def get_file_from_request(request):
    """Build the request Config and materialise the uploaded PDF on disk.

    The file is stored in the configured output directory under the base name
    of ``filePath``. When ``fileContent`` (Base64, optionally prefixed with a
    ``data:application/pdf;base64,`` header) is present it is decoded and
    written there; otherwise nothing is written.

    Returns:
        (input_path, config): absolute path of the input file and the Config.

    Raises:
        ValueError: if ``filePath`` is missing or contains no file name.
    """
    config = Config(request)
    data = request.get_json()
    path = data.get("filePath")
    print("filePath: ", path)
    if not path:
        raise ValueError("filePath is required")
    # Normalise Windows separators so basename works on Linux/macOS too.
    path = path.replace("\\", "/")
    file_name = os.path.basename(path)
    if not file_name:
        raise ValueError("filePath does not contain a file name: " + path)

    input_path = get_absolute_path(os.path.join(config.translated_dir, file_name))
    print("input path: ", input_path)

    file_content = data.get("fileContent")
    if file_content:
        prefix = "data:application/pdf;base64,"
        if file_content.startswith(prefix):
            # Strip the data-URL header, if any, before decoding.
            file_content = file_content[len(prefix):]
        file_data = base64.b64decode(file_content)
        with open(input_path, "wb") as f:
            f.write(file_data)
    return input_path, config


def _ensure_config_file(config):
    """Generate a config file from environment variables when none exists.

    Runs only when OPENAI_BASE_URL or OPENAI_API_KEY is set and
    ``config.config_path`` does not exist. The file is rendered from
    ``config.toml.template`` next to this script, written next to this script
    under the default ``config_path`` name, and ``config.config_path`` is
    updated to point at it.
    """
    local_config_exists = os.path.exists(config.config_path)
    print("## local_config_exists ## : ", local_config_exists)

    # Only the boolean is logged: the raw values may contain the API key.
    is_deployed_env = bool(
        os.environ.get("OPENAI_BASE_URL") or os.environ.get("OPENAI_API_KEY")
    )
    print("## is_deployed_env ## : ", is_deployed_env)

    if not is_deployed_env or local_config_exists:
        return

    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        template_path = os.path.join(script_dir, "config.toml.template")
        print("## template_path ## : ", template_path)
        with open(template_path, "r", encoding="utf-8") as template_file:
            template = string.Template(template_file.read())

        # Fill the placeholders of the template with the environment values.
        config_content = template.substitute(
            {
                "OPENAI_BASE_URL": openai_base_url,
                "OPENAI_MODEL": openai_model,
                "OPENAI_API_KEY": openai_api_key,
            }
        )

        config_file_path = os.path.join(script_dir, config_path)
        print("## config_file_path ## : ", config_file_path)
        with open(config_file_path, "w", encoding="utf-8") as config_file:
            config_file.write(config_content)

        # Make the rest of the request use the generated file.
        config.config_path = config_file_path
        print("## 配置文件已生成 ## : ", config.config_path)
    except Exception as e:
        print(f"## 生成配置文件时出错 ## : {str(e)}")
        raise


def _build_command(input_path, config):
    """Return the translator argv for *input_path*.

    With an existing config file the tool is driven by ``-c``; otherwise all
    options are passed explicitly, plus API credentials matching the model.
    """
    if os.path.exists(config.config_path):
        return [pdf2zh, "-c", config.config_path, "--files", input_path]

    command = [
        pdf2zh,
        input_path,
        "--t",
        str(config.thread_num),
        "--output",
        config.translated_dir,
        "--service",
        config.service,
        "--lang-in",
        config.source_languages,
        "--lang-out",
        config.target_languages,
    ]
    model = model_type.lower()
    if model.startswith("gpt") and openai_api_key:
        command.extend(
            ["--openai-api-key", openai_api_key, "--openai-model", model_type]
        )
    elif model.startswith("claude") and claude_api_key:
        command.extend(
            ["--claude-api-key", claude_api_key, "--claude-model", model_type]
        )
    return command


def _run_command(command):
    """Run *command*, echoing its output live.

    stdout and stderr are drained by separate threads: reading them
    alternately with blocking ``readline`` calls can deadlock as soon as the
    child fills one pipe while the parent is waiting on the other.

    Returns:
        (returncode, stdout_text, stderr_text)
    """
    stdout_lines = []
    stderr_lines = []

    def drain(stream, sink):
        for line in iter(stream.readline, ""):
            print(line.strip())
            sink.append(line)

    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,
    ) as process:
        pumps = [
            threading.Thread(
                target=drain, args=(process.stdout, stdout_lines), daemon=True
            ),
            threading.Thread(
                target=drain, args=(process.stderr, stderr_lines), daemon=True
            ),
        ]
        for pump in pumps:
            pump.start()
        for pump in pumps:
            pump.join()
        returncode = process.wait()

    return returncode, "".join(stdout_lines), "".join(stderr_lines)


def _collect_outputs(input_path, config):
    """Rename the translator outputs to ``-mono.pdf`` / ``-dual.pdf``.

    Returns:
        (mono, dual): paths of the renamed files.

    Raises:
        FileNotFoundError: if an expected translator output is missing.
        RuntimeError: if a renamed file does not exist afterwards.
    """
    out_dir = config.translated_dir
    stem = _pdf_stem(input_path)

    # NOTE(review): the expected names hard-code the "zh" language code;
    # confirm the translator's naming when another target language is used.
    expected_mono = os.path.join(out_dir, stem + ".zh.mono.pdf")
    expected_dual = os.path.join(out_dir, stem + ".zh.dual.pdf")
    print(f"## 检查输出文件 ## : mono={expected_mono}, dual={expected_dual}")
    print(
        f"## 文件存在检查 ## : mono存在={os.path.exists(expected_mono)}, "
        f"dual存在={os.path.exists(expected_dual)}"
    )

    mono = os.path.join(out_dir, stem + "-mono.pdf")
    dual = os.path.join(out_dir, stem + "-dual.pdf")
    try:
        for src, dst in ((expected_mono, mono), (expected_dual, dual)):
            if not os.path.exists(src):
                raise FileNotFoundError(f"源文件不存在: {src}")
            shutil.move(src, dst)
            print(f"## 移动文件成功 ## : {src} -> {dst}")
    except Exception as e:
        print(f"## 移动文件时出错 ## : {str(e)}")
        raise

    if not os.path.exists(mono) or not os.path.exists(dual):
        raise RuntimeError(
            "[Failed to generate translated files]: " + mono + ", " + dual
        )
    print("[mono file generated]: ", mono)
    print("[dual file generated]: ", dual)
    return mono, dual


def translate_pdf(input_path, config):
    """Translate *input_path* with the external tool.

    Args:
        input_path: path of the PDF to translate.
        config: Config describing output directory, config file and options.

    Returns:
        (mono, dual): paths of the translated-only and bilingual PDFs.

    Raises:
        Exception: on config generation failure, a non-zero exit code of the
            translator, or missing output files. The error is logged and
            re-raised so the calling endpoint can report it.
    """
    print("\n############# Translating #############")
    print("## translate file path ## : ", input_path)
    print("## translated_dir ## : ", config.translated_dir)
    print("## config_path ## : ", config.config_path)
    try:
        _ensure_config_file(config)

        os.makedirs(config.translated_dir, exist_ok=True)
        print(f"## 确保翻译目录存在 ## : {config.translated_dir}")

        command = _build_command(input_path, config)
        # API keys are masked so they never reach the log.
        print("## 执行命令 ## : ", " ".join(_redact_command(command)))

        returncode, _, stderr = _run_command(command)
        print(f"## 命令执行结果 ## : 返回码={returncode}")
        if returncode != 0:
            error_msg = f"命令执行失败,返回码: {returncode}, 错误: {stderr}"
            print(f"## 详细错误信息 ## : {error_msg}")
            raise RuntimeError(error_msg)

        return _collect_outputs(input_path, config)
    except Exception as e:
        print(f"## translate_pdf函数出错 ## : {str(e)}")
        # Re-raise so the calling endpoint can build the error response.
        raise


app = Flask(__name__)
CORS(app, resources={r"/*": {"origins": "*"}})  # Allow cross-origin requests from any origin


@app.route("/translate", methods=["POST"])
def translate():
    """Translate the uploaded PDF and build the requested derived files.

    Responds with ``{"status": "success"}`` (200) or
    ``{"status": "error", "message": ...}`` (500).
    """
    try:
        print("\n############# 开始翻译请求处理 #############")
        input_path, config = get_file_from_request(request)
        print(f"## 获取到输入文件 ## : {input_path}")

        mono, dual = translate_pdf(input_path, config)
        print(f"## 翻译完成 ## : mono={mono}, dual={dual}")

        if _is_enabled(config.mono_cut):
            try:
                path = _replace_tail(mono, "-mono.pdf", "-mono-cut.pdf")
                print(f"## 开始切割mono文件 ## : {mono} -> {path}")
                split_and_merge_pdf(mono, path, compare=False)
                if not os.path.exists(path):
                    raise RuntimeError("[Failed to generate cutted files]: " + path)
                print("[mono-cut file generated]: ", path)
            except Exception as e:
                print(f"## 切割mono文件出错 ## : {str(e)}")
                raise

        if _is_enabled(config.dual_cut):
            try:
                path = _replace_tail(dual, "-dual.pdf", "-dual-cut.pdf")
                print(f"## 开始切割dual文件 ## : {dual} -> {path}")
                split_and_merge_pdf(dual, path, compare=False)
                if not os.path.exists(path):
                    raise RuntimeError("[Failed to generate cutted files]: " + path)
                print("[dual-cut file generated]: ", path)
            except Exception as e:
                print(f"## 切割dual文件出错 ## : {str(e)}")
                raise

        if _is_enabled(config.compare):
            try:
                path = _replace_tail(dual, "-dual.pdf", "-compare.pdf")
                print(f"## 开始生成对比文件 ## : {dual} -> {path}")
                split_and_merge_pdf(dual, path, compare=True)
                if not os.path.exists(path):
                    raise RuntimeError("[Failed to generate compare files]: " + path)
                print("[compare file generated]: ", path)
            except Exception as e:
                print(f"## 生成对比文件出错 ## : {str(e)}")
                raise

        return jsonify({"status": "success"}), 200
    except Exception as e:
        return _error_response("[Translate Error]", e)


@app.route("/translatedFile/<filename>")
def download(filename):
    """Send *filename* from the most recently used output directory.

    Responds with 404 and a plain-text message when the file does not exist.
    """
    print("\n############# Downloading #############")
    file_path = os.path.join(get_absolute_path(global_translated_dir), filename)
    if not os.path.isfile(file_path):
        print("[Download File not found]: ", file_path)
        return "[Download File not found]: " + file_path, 404
    print("[Download file]: ", file_path)
    return send_file(file_path, as_attachment=True, download_name=filename)


def _crop_to(page, rect):
    """Set every boundary box of *page* to *rect* and return the page."""
    for box in _PAGE_BOXES:
        setattr(page, box, RectangleObject(rect))
    return page


# Utility used to cut two-column PDF files
def split_and_merge_pdf(input_pdf, output_pdf, compare=False):
    """Split each page of *input_pdf* into left/right halves.

    When the file name contains "dual", pages are processed in consecutive
    pairs (page i, page i + 1). Without *compare* the output order per pair is
    left(i), left(i+1), right(i), right(i+1). With *compare* two full-size
    pages are produced per pair: the two left halves side by side, then the
    two right halves side by side.

    For any other file every page is split into its left and right half;
    *compare* has no effect in that case.

    Args:
        input_pdf: path of the PDF to read.
        output_pdf: path of the PDF to write.
        compare: build side-by-side pages (dual files only).

    Raises:
        ValueError: if a dual file has an odd number of pages.
    """
    writer = PdfWriter()
    # Only the file name decides the mode; a directory named "...dual..."
    # must not switch a mono file into pair processing.
    if "dual" in os.path.basename(input_pdf):
        # Cropping mutates the page object, so every half that is emitted
        # comes from its own reader instance.
        readers = [PdfReader(input_pdf) for _ in range(4)]
        page_count = len(readers[0].pages)
        if page_count % 2:
            raise ValueError(
                "Dual PDF must contain an even number of pages, got "
                f"{page_count}: {input_pdf}"
            )
        for i in range(0, page_count, 2):
            # Dimensions are taken from the first page of the pair, before
            # any box is modified, and applied to both pages.
            media_box = readers[0].pages[i].mediabox
            width = media_box.width
            height = media_box.height
            left_half = (0, 0, width / 2, height)
            right_half = (width / 2, 0, width, height)

            left_page_1 = _crop_to(readers[0].pages[i], left_half)
            left_page_2 = _crop_to(readers[1].pages[i + 1], left_half)
            right_page_1 = _crop_to(readers[2].pages[i], right_half)
            right_page_2 = _crop_to(readers[3].pages[i + 1], right_half)

            if compare:
                # Left halves: first stays in place, second moves to the right.
                blank_page_1 = writer.add_blank_page(width, height)
                blank_page_1.merge_transformed_page(left_page_1, (1, 0, 0, 1, 0, 0))
                blank_page_1.merge_transformed_page(
                    left_page_2, (1, 0, 0, 1, width / 2, 0)
                )
                # Right halves: first moves to the left, second stays in place.
                blank_page_2 = writer.add_blank_page(width, height)
                blank_page_2.merge_transformed_page(
                    right_page_1, (1, 0, 0, 1, -width / 2, 0)
                )
                blank_page_2.merge_transformed_page(right_page_2, (1, 0, 0, 1, 0, 0))
            else:
                writer.add_page(left_page_1)
                writer.add_page(left_page_2)
                writer.add_page(right_page_1)
                writer.add_page(right_page_2)
    else:
        # Two readers: one supplies the left halves, the other the right.
        readers = [PdfReader(input_pdf) for _ in range(2)]
        for i, left_page in enumerate(readers[0].pages):
            media_box = left_page.mediabox
            width = media_box.width
            height = media_box.height

            left_page.mediabox = RectangleObject((0, 0, width / 2, height))
            right_page = readers[1].pages[i]
            right_page.mediabox = RectangleObject((width / 2, 0, width, height))

            writer.add_page(left_page)
            writer.add_page(right_page)

    with open(output_pdf, "wb") as output_file:
        writer.write(output_file)


# Cut a two-column PDF file
@app.route("/cut", methods=["POST"])
def cut():
    """Split the uploaded PDF into half pages, written as ``<name>-cut.pdf``.

    Responds with ``{"status": "success"}`` (200) or
    ``{"status": "error", "message": ...}`` (500).
    """
    print("\n############# Cutting #############")
    try:
        # Parsed inside the try block so bad requests get the JSON error body.
        input_path, config = get_file_from_request(request)
        translated_path = os.path.join(
            config.translated_dir, _pdf_stem(input_path) + "-cut.pdf"
        )
        split_and_merge_pdf(input_path, translated_path)
        if not os.path.exists(translated_path):
            raise RuntimeError("[Failed to generate cut files]: " + translated_path)
        print("[Cut file generated]: ", translated_path)
        return jsonify({"status": "success"}), 200
    except Exception as e:
        return _error_response("[Cut File Error]", e)


# Generate the Chinese/English comparison file
@app.route("/cut-compare", methods=["POST"])
def cut_compare():
    """Build a side-by-side comparison PDF for the uploaded file.

    A file whose name contains "dual" is converted directly; any other file
    is translated first and the resulting dual file is converted.

    Responds with ``{"status": "success"}`` (200) or
    ``{"status": "error", "message": ...}`` (500).
    """
    print("\n############# Comparing #############")
    try:
        input_path, config = get_file_from_request(request)
        print(f"## 获取到输入文件 ## : {input_path}")

        if "dual" in os.path.basename(input_path):
            translated_path = os.path.join(
                config.translated_dir, _pdf_stem(input_path) + "-compare.pdf"
            )
            print(f"## 直接生成对比文件 ## : {input_path} -> {translated_path}")
            # Make sure the output directory exists.
            os.makedirs(os.path.dirname(translated_path), exist_ok=True)
            split_and_merge_pdf(input_path, translated_path, compare=True)
        else:
            print(f"## 需要先翻译再生成对比文件 ## : {input_path}")
            _, dual = translate_pdf(input_path, config)
            translated_path = _replace_tail(dual, "-dual.pdf", "-compare.pdf")
            print(f"## 生成对比文件 ## : {dual} -> {translated_path}")
            split_and_merge_pdf(dual, translated_path, compare=True)

        if not os.path.exists(translated_path):
            raise RuntimeError("[Failed to generate cutted file]: " + translated_path)
        print("[Compare file generated]: ", translated_path)
        return jsonify({"status": "success"}), 200
    except Exception as e:
        return _error_response("[cut_compare() Error]", e)


if __name__ == "__main__":
    # An optional first command line argument overrides the port.
    if len(sys.argv) > 1:
        port_num = int(sys.argv[1])
    app.run(host="0.0.0.0", port=port_num)