# Spaces:
# Build error
# Build error
import base64
import copy
import os
import shutil
import string
import subprocess
import sys
import threading

from flask import Flask, abort, jsonify, request, send_file
from flask_cors import CORS
from pypdf import PdfReader, PdfWriter
from pypdf.generic import RectangleObject
| ######################################## 默认配置 ######################################## | |
| port_num = int(os.environ.get("PORT", 8888)) # 设置端口号: 默认为8888 | |
| pdf2zh = "babeldoc" # 设置pdf2zh指令: 默认为'pdf2zh' | |
| ######### 可以在Zotero偏好设置中配置以下参数, Zotero配置会覆盖本文件中的配置参数 ######### | |
| thread_num = 4 # 设置线程数: 默认为4 | |
| service = "bing" # 设置翻译服务: 默认为bing | |
| translated_dir = "./translated/" # 设置翻译文件的输出路径(临时路径, 可以在翻译后删除) | |
| config_path = "./config.toml" # 设置PDF2zh配置文件路径 | |
| source_languages = "en" # 设置源语言 | |
| target_languages = "zh" # 设置目标语言 | |
| global_translated_dir = translated_dir | |
| # 从环境变量读取OpenAI配置 | |
| openai_base_url = os.environ.get("OPENAI_BASE_URL", "") | |
| openai_model = os.environ.get("OPENAI_MODEL", "gpt-4o") | |
| openai_api_key = os.environ.get("OPENAI_API_KEY", "") | |
| model_type = openai_model # 用于判断模型类型 | |
| claude_api_key = os.environ.get("CLAUDE_API_KEY", "") | |
| ########################################################################################## | |
| class Config: | |
| def __init__(self, request): | |
| self.thread_num = request.get_json().get("threadNum") | |
| if self.thread_num == None or self.thread_num == "": | |
| self.thread_num = thread_num | |
| self.service = request.get_json().get("engine") | |
| if self.service == None or self.service == "": | |
| self.service = service | |
| self.source_languages = request.get_json().get("sourceLanguages") | |
| if self.source_languages == None or self.source_languages == "": | |
| self.source_languages = source_languages | |
| self.target_languages = request.get_json().get("targetLanguages") | |
| if self.target_languages == None or self.target_languages == "": | |
| self.target_languages = target_languages | |
| self.translated_dir = request.get_json().get("outputPath") | |
| if self.translated_dir == None or self.translated_dir == "": | |
| self.translated_dir = translated_dir | |
| self.translated_dir = get_absolute_path(self.translated_dir) | |
| os.makedirs(self.translated_dir, exist_ok=True) | |
| self.config_path = request.get_json().get("configPath") | |
| if self.config_path == None or self.config_path == "": | |
| self.config_path = config_path | |
| self.config_path = get_absolute_path(self.config_path) | |
| self.mono_cut = request.get_json().get("mono_cut") | |
| self.dual_cut = request.get_json().get("dual_cut") | |
| self.compare = request.get_json().get("compare") | |
| print("outputPath: ", self.translated_dir) | |
| print("configPath: ", self.config_path) | |
| global global_translated_dir | |
| global_translated_dir = self.translated_dir | |
def get_absolute_path(path):
    """Return ``path`` unchanged when it is absolute, else resolve it against the CWD.

    Args:
        path: a filesystem path string.

    Returns:
        The input itself if ``os.path.isabs(path)``; otherwise ``os.path.abspath(path)``.
    """
    return path if os.path.isabs(path) else os.path.abspath(path)
def get_file_from_request(request):
    """Build the request Config and store the uploaded PDF in the output directory.

    The JSON body is expected to carry ``filePath`` (only its base name is used)
    and optionally ``fileContent`` (Base64, with or without a
    ``data:application/pdf;base64,`` prefix). When ``fileContent`` is present it
    is decoded and written to ``<translated_dir>/<basename of filePath>``.

    Args:
        request: object exposing ``get_json()`` (the Flask request).

    Returns:
        ``(input_path, config)``: the absolute path of the stored file and the
        Config built from the request.

    Raises:
        ValueError: if ``filePath`` is missing or has no usable file name.
    """
    config = Config(request)
    data = request.get_json() or {}

    path = data.get("filePath")
    print("filePath: ", path)
    if not path:
        raise ValueError("Missing 'filePath' in request body")

    # Normalise Windows separators so basename() works for paths sent from any OS.
    path = path.replace("\\", "/")
    file_name = os.path.basename(path)
    if file_name in ("", ".", ".."):
        # Such names would resolve to a directory rather than a file.
        raise ValueError("Invalid 'filePath' in request body: " + path)

    input_path = get_absolute_path(os.path.join(config.translated_dir, file_name))
    print("input path: ", input_path)

    file_content = data.get("fileContent")
    if file_content:
        data_url_prefix = "data:application/pdf;base64,"
        if file_content.startswith(data_url_prefix):
            # Strip the data-URL prefix, if any, before decoding.
            file_content = file_content[len(data_url_prefix):]
        with open(input_path, "wb") as f:
            f.write(base64.b64decode(file_content))

    return input_path, config
# CLI flags whose following argument is a credential and must never be logged.
_SECRET_FLAGS = ("--openai-api-key", "--claude-api-key")


def _write_config_from_env(config):
    """Render config.toml.template with the OpenAI settings and point ``config`` at it."""
    script_dir = os.path.dirname(os.path.abspath(__file__))
    template_path = os.path.join(script_dir, "config.toml.template")
    print("## template_path ## : ", template_path)
    with open(template_path, "r", encoding="utf-8") as template_file:
        template = string.Template(template_file.read())
    config_content = template.substitute(
        {
            "OPENAI_BASE_URL": openai_base_url,
            "OPENAI_MODEL": openai_model,
            "OPENAI_API_KEY": openai_api_key,
        }
    )
    # The generated file is named after the module-level default ``config_path``
    # (not the per-request value) and is placed next to this script.
    config_file_path = os.path.join(script_dir, config_path)
    print("## config_file_path ## : ", config_file_path)
    with open(config_file_path, "w", encoding="utf-8") as config_file:
        config_file.write(config_content)
    config.config_path = config_file_path
    print("## 配置文件已生成 ## : ", config.config_path)


def _build_translate_command(input_path, config):
    """Return the argv list: config-file mode when the file exists, else explicit flags."""
    if os.path.exists(config.config_path):
        return [pdf2zh, "-c", config.config_path, "--files", input_path]

    command = [
        pdf2zh,
        input_path,
        "--t",
        str(config.thread_num),
        "--output",
        config.translated_dir,
        "--service",
        config.service,
        "--lang-in",
        config.source_languages,
        "--lang-out",
        config.target_languages,
    ]
    # Add the API key that matches the configured model family, if one is set.
    model = model_type.lower()
    if model.startswith("gpt") and openai_api_key:
        command.extend(["--openai-api-key", openai_api_key, "--openai-model", model_type])
    elif model.startswith("claude") and claude_api_key:
        command.extend(["--claude-api-key", claude_api_key, "--claude-model", model_type])
    return command


def _redact_command(command):
    """Return a printable copy of ``command`` with credential values replaced by '***'."""
    shown = []
    hide_next = False
    for arg in command:
        shown.append("***" if hide_next else str(arg))
        hide_next = arg in _SECRET_FLAGS
    return shown


def _run_and_stream(command):
    """Run ``command``, echo its output live, and return (returncode, stdout, stderr)."""
    stdout_lines = []
    stderr_lines = []

    def drain(stream, sink):
        for line in stream:
            print(line.strip())
            sink.append(line)

    # Each pipe gets its own reader thread: reading the two pipes alternately with
    # blocking readline() stalls (or deadlocks once a pipe buffer fills) whenever
    # the child writes to only one of them.
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,
    ) as process:
        workers = [
            threading.Thread(target=drain, args=(process.stdout, stdout_lines)),
            threading.Thread(target=drain, args=(process.stderr, stderr_lines)),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
    # Leaving the ``with`` block has waited for the process, so returncode is set.
    return process.returncode, "".join(stdout_lines), "".join(stderr_lines)


def _pdf_stem(path):
    """Return the base name of ``path`` without a trailing '.pdf' extension."""
    name = os.path.basename(path)
    stem, ext = os.path.splitext(name)
    return stem if ext.lower() == ".pdf" else name


def _rename_output(produced, final):
    """Move a translator output file to its final name; fail if it was not produced."""
    if not os.path.exists(produced):
        raise FileNotFoundError(f"源文件不存在: {produced}")
    shutil.move(produced, final)
    print(f"## 移动文件成功 ## : {produced} -> {final}")


def translate_pdf(input_path, config):
    """Translate ``input_path`` with the external CLI and return the result paths.

    Steps: optionally generate a config file from environment variables (only
    when OPENAI_BASE_URL or OPENAI_API_KEY is set and ``config.config_path`` does
    not exist), run the translator while streaming its output to the console,
    then rename ``<stem>.zh.mono.pdf`` / ``<stem>.zh.dual.pdf`` in
    ``config.translated_dir`` to ``<stem>-mono.pdf`` / ``<stem>-dual.pdf``.

    Args:
        input_path: path of the PDF to translate.
        config: Config-like object providing ``config_path``, ``translated_dir``,
            ``thread_num``, ``service``, ``source_languages``, ``target_languages``.
            ``config_path`` may be reassigned when a config file is generated.

    Returns:
        ``(mono, dual)``: paths of the renamed output files.

    Raises:
        RuntimeError: the command exited with a non-zero return code.
        FileNotFoundError: an expected output file is missing.
        Other exceptions from config generation or process start are re-raised.
    """
    print("\n############# Translating #############")
    print("## translate file path ## : ", input_path)
    print("## translated_dir ## : ", config.translated_dir)
    print("## config_path ## : ", config.config_path)
    try:
        local_config_exists = os.path.exists(config.config_path)
        print("## local_config_exists ## : ", local_config_exists)

        # Reduced to a bool so the log never contains the URL or key itself.
        is_deployed_env = bool(
            os.environ.get("OPENAI_BASE_URL") or os.environ.get("OPENAI_API_KEY")
        )
        print("## is_deployed_env ## : ", is_deployed_env)

        if is_deployed_env and not local_config_exists:
            try:
                _write_config_from_env(config)
            except Exception as e:
                print(f"## 生成配置文件时出错 ## : {str(e)}")
                raise

        os.makedirs(config.translated_dir, exist_ok=True)
        print(f"## 确保翻译目录存在 ## : {config.translated_dir}")

        command = _build_translate_command(input_path, config)
        # NOTE(review): keys are still passed on the command line, where other local
        # users may see them in the process list; only the log output is redacted.
        print("## 执行命令 ## : ", " ".join(_redact_command(command)))

        returncode, _stdout, stderr = _run_and_stream(command)
        print(f"## 命令执行结果 ## : 返回码={returncode}")
        if returncode != 0:
            error_msg = f"命令执行失败,返回码: {returncode}, 错误: {stderr}"
            print(f"## 详细错误信息 ## : {error_msg}")
            raise RuntimeError(error_msg)

        # Strip only the trailing extension; str.replace(".pdf", ...) would also
        # rewrite any ".pdf" occurring earlier in the file name.
        stem = _pdf_stem(input_path)
        # NOTE(review): the ".zh." infix is hard-coded; presumably the tool names its
        # outputs after the target language. Confirm for targets other than "zh".
        expected_mono = os.path.join(config.translated_dir, stem + ".zh.mono.pdf")
        expected_dual = os.path.join(config.translated_dir, stem + ".zh.dual.pdf")
        print(f"## 检查输出文件 ## : mono={expected_mono}, dual={expected_dual}")
        print(
            f"## 文件存在检查 ## : mono存在={os.path.exists(expected_mono)}, "
            f"dual存在={os.path.exists(expected_dual)}"
        )

        mono = os.path.join(config.translated_dir, stem + "-mono.pdf")
        dual = os.path.join(config.translated_dir, stem + "-dual.pdf")
        try:
            _rename_output(expected_mono, mono)
            _rename_output(expected_dual, dual)
        except Exception as e:
            print(f"## 移动文件时出错 ## : {str(e)}")
            raise

        if not (os.path.exists(mono) and os.path.exists(dual)):
            raise FileNotFoundError(
                "[Failed to generate translated files]: " + mono + ", " + dual
            )
        print("[mono file generated]: ", mono)
        print("[dual file generated]: ", dual)
        return mono, dual
    except Exception as e:
        print(f"## translate_pdf函数出错 ## : {str(e)}")
        # Re-raise so the calling request handler can report the failure.
        raise
app = Flask(__name__)
# Allow cross-origin requests from any origin on every route.
CORS(app, resources={r"/*": {"origins": "*"}})
def _derive_pdf(source, old_suffix, new_suffix, compare, start_label, fail_prefix, done_prefix, error_label):
    """Run split_and_merge_pdf on ``source`` and verify that the derived file exists."""
    try:
        target = source.replace(old_suffix, new_suffix)
        print(f"## {start_label} ## : {source} -> {target}")
        split_and_merge_pdf(source, target, compare=compare)
        if not os.path.exists(target):
            raise Exception(fail_prefix + target)
        print(done_prefix, target)
    except Exception as e:
        print(f"## {error_label} ## : {str(e)}")
        raise


# NOTE(review): no @app.route decorator is visible above this handler in this view;
# confirm that the route registration exists elsewhere or was lost in extraction.
def translate():
    """Handle a translation request and return a JSON status response.

    Stores the uploaded file, runs translate_pdf, then produces the optional
    derived files requested through the string flags ``mono_cut``, ``dual_cut``
    and ``compare`` (each is active only when equal to the string "true").

    Returns:
        ``({"status": "success"}, 200)`` on success, otherwise
        ``({"status": "error", "message": ...}, 500)`` where the message contains
        the exception text followed by the formatted traceback.
    """
    try:
        print("\n############# 开始翻译请求处理 #############")
        input_path, config = get_file_from_request(request)
        print(f"## 获取到输入文件 ## : {input_path}")
        mono, dual = translate_pdf(input_path, config)
        print(f"## 翻译完成 ## : mono={mono}, dual={dual}")

        if config.mono_cut == "true":
            _derive_pdf(
                mono, "-mono.pdf", "-mono-cut.pdf", False,
                "开始切割mono文件",
                "[Failed to generate cutted files]: ",
                "[mono-cut file generated]: ",
                "切割mono文件出错",
            )
        if config.dual_cut == "true":
            _derive_pdf(
                dual, "-dual.pdf", "-dual-cut.pdf", False,
                "开始切割dual文件",
                "[Failed to generate cutted files]: ",
                "[dual-cut file generated]: ",
                "切割dual文件出错",
            )
        if config.compare == "true":
            _derive_pdf(
                dual, "-dual.pdf", "-compare.pdf", True,
                "开始生成对比文件",
                "[Failed to generate compare files]: ",
                "[compare file generated]: ",
                "生成对比文件出错",
            )
        return jsonify({"status": "success"}), 200
    except Exception as e:
        print(f"[Translate Error]: {e}")
        # The response deliberately carries the traceback for easier debugging.
        # NOTE(review): this exposes server paths to the client; confirm acceptable.
        import traceback

        error_message = str(e)
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        if traceback_str:
            print(f"## 错误堆栈 ## : {traceback_str}")
            error_message = f"{error_message}\n{traceback_str}"
        return jsonify({"status": "error", "message": error_message}), 500
# NOTE(review): no @app.route decorator is visible above this handler in this view;
# confirm that the route registration exists elsewhere or was lost in extraction.
def download(filename):
    """Send ``filename`` from the most recently used output directory as an attachment.

    Args:
        filename: name of the file to send, as supplied by the client.

    Returns:
        The ``send_file`` response, or a ``(message, 404)`` tuple when the file
        does not exist or would resolve outside the output directory.
    """
    print("\n############# Downloading #############")
    base_dir = get_absolute_path(global_translated_dir)
    file_path = os.path.join(base_dir, filename)

    # ``filename`` is client input: refuse anything that resolves outside the
    # output directory (e.g. via ".." components or an absolute path).
    real_base = os.path.realpath(base_dir)
    try:
        inside = os.path.commonpath([real_base, os.path.realpath(file_path)]) == real_base
    except ValueError:
        # Raised e.g. for paths on different drives: treat as outside.
        inside = False

    if not inside or not os.path.isfile(file_path):
        print("[Download File not found]: ", file_path)
        return "[Download File not found]: " + file_path, 404
    print("[Download file]: ", file_path)
    return send_file(file_path, as_attachment=True, download_name=filename)
# Utility for cutting two-column PDF pages into halves.
_PAGE_BOXES = ("mediabox", "cropbox", "trimbox", "bleedbox", "artbox")


def _crop_all_boxes(page, rect):
    """Set every page box of ``page`` to ``rect`` and return the page."""
    for box in _PAGE_BOXES:
        setattr(page, box, RectangleObject(rect))
    return page


def split_and_merge_pdf(input_pdf, output_pdf, compare=False):
    """Split each page of ``input_pdf`` into left/right halves and write ``output_pdf``.

    Two modes, selected by whether the string "dual" occurs in ``input_pdf``:

    * "dual" present: pages are consumed in pairs ``(i, i + 1)``. Without
      ``compare`` the output order per pair is left(i), left(i+1), right(i),
      right(i+1). With ``compare`` two full-width pages are produced per pair:
      the left halves of both pages side by side, then the right halves side
      by side.
    * otherwise: every page becomes two pages (left half, right half);
      ``compare`` is ignored in this mode.

    Args:
        input_pdf: path of the PDF to read.
        output_pdf: path of the PDF to write.
        compare: see above; only used in "dual" mode.

    Raises:
        ValueError: in "dual" mode when the page count is odd, so the last page
            has no partner.
    """
    writer = PdfWriter()
    # NOTE(review): the check looks at the whole path, so a directory name
    # containing "dual" also selects this mode; confirm that is intended.
    if "dual" in input_pdf:
        # One reader per crop: page objects are modified in place, and each source
        # page is needed twice with different boxes.
        readers = [PdfReader(input_pdf) for _ in range(4)]
        page_count = len(readers[0].pages)
        if page_count % 2:
            raise ValueError(
                f"Expected an even number of pages in dual PDF, got {page_count}: {input_pdf}"
            )
        for i in range(0, page_count, 2):
            media_box = readers[0].pages[i].mediabox
            width = media_box.width
            height = media_box.height
            # NOTE(review): the rectangles assume the media box starts at (0, 0).
            left_rect = (0, 0, width / 2, height)
            right_rect = (width / 2, 0, width, height)

            left_page_1 = _crop_all_boxes(readers[0].pages[i], left_rect)
            left_page_2 = _crop_all_boxes(readers[1].pages[i + 1], left_rect)
            right_page_1 = _crop_all_boxes(readers[2].pages[i], right_rect)
            right_page_2 = _crop_all_boxes(readers[3].pages[i + 1], right_rect)

            if compare:
                # Left halves side by side: page i stays put, page i+1 moves right.
                blank_page_1 = writer.add_blank_page(width, height)
                blank_page_1.merge_transformed_page(left_page_1, (1, 0, 0, 1, 0, 0))
                blank_page_1.merge_transformed_page(
                    left_page_2, (1, 0, 0, 1, width / 2, 0)
                )
                # Right halves side by side: page i moves left, page i+1 stays put.
                blank_page_2 = writer.add_blank_page(width, height)
                blank_page_2.merge_transformed_page(
                    right_page_1, (1, 0, 0, 1, -width / 2, 0)
                )
                blank_page_2.merge_transformed_page(right_page_2, (1, 0, 0, 1, 0, 0))
            else:
                for half in (left_page_1, left_page_2, right_page_1, right_page_2):
                    writer.add_page(half)
    else:
        # Two readers so that the left and right crops use distinct page objects.
        left_reader, right_reader = (PdfReader(input_pdf) for _ in range(2))
        for left_page, right_page in zip(left_reader.pages, right_reader.pages):
            media_box = left_page.mediabox
            width = media_box.width
            height = media_box.height
            # Only the media box is changed in this mode.
            left_page.mediabox = RectangleObject((0, 0, width / 2, height))
            right_page.mediabox = RectangleObject((width / 2, 0, width, height))
            writer.add_page(left_page)
            writer.add_page(right_page)

    with open(output_pdf, "wb") as output_file:
        writer.write(output_file)
# Cuts an uploaded two-column PDF into halves.
# NOTE(review): no @app.route decorator is visible above this handler in this view;
# confirm that the route registration exists elsewhere or was lost in extraction.
def cut():
    """Store the uploaded PDF, write ``<stem>-cut.pdf`` next to it, return JSON status.

    Returns:
        ``({"status": "success"}, 200)`` on success, otherwise
        ``({"status": "error", "message": ...}, 500)`` where the message contains
        the exception text followed by the formatted traceback.
    """
    print("\n############# Cutting #############")
    try:
        # Parsing happens inside the try block so that malformed requests also get
        # the JSON error response below.
        input_path, config = get_file_from_request(request)

        # Strip only the trailing extension; str.replace(".pdf", ...) would also
        # rewrite any ".pdf" occurring earlier in the file name.
        name = os.path.basename(input_path)
        stem, ext = os.path.splitext(name)
        if ext.lower() != ".pdf":
            stem = name
        translated_path = os.path.join(config.translated_dir, stem + "-cut.pdf")

        split_and_merge_pdf(input_path, translated_path)
        if not os.path.exists(translated_path):
            # Single concatenated message: two arguments would render as a tuple.
            raise Exception("[Failed to generate cut files]: " + translated_path)
        print("[Cut file generated]: ", translated_path)
        return jsonify({"status": "success"}), 200
    except Exception as e:
        print(f"[Cut File Error]: {e}")
        # The response deliberately carries the traceback for easier debugging.
        import traceback

        error_message = str(e)
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        if traceback_str:
            print(f"## 错误堆栈 ## : {traceback_str}")
            error_message = f"{error_message}\n{traceback_str}"
        return jsonify({"status": "error", "message": error_message}), 500
# Produces the side-by-side (source / translation) comparison file.
# NOTE(review): no @app.route decorator is visible above this handler in this view;
# confirm that the route registration exists elsewhere or was lost in extraction.
def cut_compare():
    """Create a comparison PDF from the uploaded file and return a JSON status.

    If the stored path contains "dual" the file is used directly and the result
    is ``<stem>-compare.pdf``. Otherwise the file is translated first and the
    comparison is built from the resulting dual file (``-dual.pdf`` becomes
    ``-compare.pdf``).

    Returns:
        ``({"status": "success"}, 200)`` on success, otherwise
        ``({"status": "error", "message": ...}, 500)`` where the message contains
        the exception text followed by the formatted traceback.
    """
    print("\n############# Comparing #############")
    try:
        input_path, config = get_file_from_request(request)
        print(f"## 获取到输入文件 ## : {input_path}")
        # NOTE(review): the check looks at the whole path, not just the file name.
        if "dual" in input_path:
            # Strip only the trailing extension; str.replace(".pdf", ...) would also
            # rewrite any ".pdf" occurring earlier in the file name.
            name = os.path.basename(input_path)
            stem, ext = os.path.splitext(name)
            if ext.lower() != ".pdf":
                stem = name
            translated_path = os.path.join(config.translated_dir, stem + "-compare.pdf")
            print(f"## 直接生成对比文件 ## : {input_path} -> {translated_path}")
            os.makedirs(os.path.dirname(translated_path), exist_ok=True)
            split_and_merge_pdf(input_path, translated_path, compare=True)
        else:
            print(f"## 需要先翻译再生成对比文件 ## : {input_path}")
            _, dual = translate_pdf(input_path, config)
            translated_path = dual.replace("-dual.pdf", "-compare.pdf")
            print(f"## 生成对比文件 ## : {dual} -> {translated_path}")
            split_and_merge_pdf(dual, translated_path, compare=True)

        if not os.path.exists(translated_path):
            raise Exception("[Failed to generate cutted file]: " + translated_path)
        print("[Compare file generated]: ", translated_path)
        return jsonify({"status": "success"}), 200
    except Exception as e:
        print(f"[cut_compare() Error]: {e}")
        # The response deliberately carries the traceback for easier debugging.
        import traceback

        error_message = str(e)
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        if traceback_str:
            print(f"## 错误堆栈 ## : {traceback_str}")
            error_message = f"{error_message}\n{traceback_str}"
        return jsonify({"status": "error", "message": error_message}), 500
if __name__ == "__main__":
    # An optional first command-line argument overrides the configured port.
    if len(sys.argv) > 1:
        port_num = int(sys.argv[1])
    # host="0.0.0.0" makes the server listen on all network interfaces.
    app.run(host="0.0.0.0", port=port_num)