pdfMathTranslate / server.py
Amamiyaren's picture
Upload 2 files
272deef verified
from flask import Flask, request, jsonify
import os
import base64
import subprocess
import copy
from flask import Flask, send_file, abort
from pypdf import PdfWriter, PdfReader
from pypdf.generic import RectangleObject
import sys
import shutil
import string
from flask_cors import CORS
######################################## 默认配置 ########################################
port_num = int(os.environ.get("PORT", 8888)) # 设置端口号: 默认为8888
pdf2zh = "babeldoc" # 设置pdf2zh指令: 默认为'pdf2zh'
######### 可以在Zotero偏好设置中配置以下参数, Zotero配置会覆盖本文件中的配置参数 #########
thread_num = 4 # 设置线程数: 默认为4
service = "bing" # 设置翻译服务: 默认为bing
translated_dir = "./translated/" # 设置翻译文件的输出路径(临时路径, 可以在翻译后删除)
config_path = "./config.toml" # 设置PDF2zh配置文件路径
source_languages = "en" # 设置源语言
target_languages = "zh" # 设置目标语言
global_translated_dir = translated_dir
# 从环境变量读取OpenAI配置
openai_base_url = os.environ.get("OPENAI_BASE_URL", "")
openai_model = os.environ.get("OPENAI_MODEL", "gpt-4o")
openai_api_key = os.environ.get("OPENAI_API_KEY", "")
model_type = openai_model # 用于判断模型类型
claude_api_key = os.environ.get("CLAUDE_API_KEY", "")
##########################################################################################
class Config:
def __init__(self, request):
self.thread_num = request.get_json().get("threadNum")
if self.thread_num == None or self.thread_num == "":
self.thread_num = thread_num
self.service = request.get_json().get("engine")
if self.service == None or self.service == "":
self.service = service
self.source_languages = request.get_json().get("sourceLanguages")
if self.source_languages == None or self.source_languages == "":
self.source_languages = source_languages
self.target_languages = request.get_json().get("targetLanguages")
if self.target_languages == None or self.target_languages == "":
self.target_languages = target_languages
self.translated_dir = request.get_json().get("outputPath")
if self.translated_dir == None or self.translated_dir == "":
self.translated_dir = translated_dir
self.translated_dir = get_absolute_path(self.translated_dir)
os.makedirs(self.translated_dir, exist_ok=True)
self.config_path = request.get_json().get("configPath")
if self.config_path == None or self.config_path == "":
self.config_path = config_path
self.config_path = get_absolute_path(self.config_path)
self.mono_cut = request.get_json().get("mono_cut")
self.dual_cut = request.get_json().get("dual_cut")
self.compare = request.get_json().get("compare")
print("outputPath: ", self.translated_dir)
print("configPath: ", self.config_path)
global global_translated_dir
global_translated_dir = self.translated_dir
def get_absolute_path(path):
if os.path.isabs(path):
return path
else:
return os.path.abspath(path)
def get_file_from_request(request):
config = Config(request)
data = request.get_json()
path = data.get("filePath")
print("filePath: ", path)
path = path.replace('\\', '/') # 把所有反斜杠\替换为正斜杠/ (Windows->Linux/MacOS)
file_content = data.get("fileContent")
input_path = os.path.join(config.translated_dir, os.path.basename(path))
input_path = get_absolute_path(input_path)
print("input path: ", input_path)
if file_content:
if file_content.startswith(
"data:application/pdf;base64,"
): # 移除 Base64 编码中的前缀(如果有)
file_content = file_content[len("data:application/pdf;base64,") :]
file_data = base64.b64decode(file_content) # 解码 Base64 内容
with open(input_path, "wb") as f:
f.write(file_data)
return input_path, config
def translate_pdf(input_path, config):
print("\n############# Translating #############")
print("## translate file path ## : ", input_path)
print("## translated_dir ## : ", config.translated_dir)
print("## config_path ## : ", config.config_path)
try:
# 检查是否存在本地配置文件
local_config_exists = os.path.exists(config.config_path)
print("## local_config_exists ## : ", local_config_exists)
# 判断是否在部署环境中运行(通过检查环境变量)
is_deployed_env = os.environ.get("OPENAI_BASE_URL") or os.environ.get("OPENAI_API_KEY")
print("## is_deployed_env ## : ", is_deployed_env)
# 如果在部署环境中运行且有环境变量配置,则使用环境变量生成config.toml
if is_deployed_env and not local_config_exists:
try:
# 如果环境变量中有配置,则使用环境变量中的配置生成config.toml
template_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.toml.template")
print("## template_path ## : ", template_path)
with open(template_path, "r") as template_file:
template_content = template_file.read()
# 替换模板中的环境变量
template = string.Template(template_content)
config_content = template.substitute({
"OPENAI_BASE_URL": openai_base_url,
"OPENAI_MODEL": openai_model,
"OPENAI_API_KEY": openai_api_key
})
# 写入配置文件
config_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), config_path)
print("## config_file_path ## : ", config_file_path)
with open(config_file_path, "w") as config_file:
config_file.write(config_content)
# 确保使用配置文件
config.config_path = config_file_path
print("## 配置文件已生成 ## : ", config.config_path)
except Exception as e:
print(f"## 生成配置文件时出错 ## : {str(e)}")
raise
# 确保翻译目录存在
os.makedirs(config.translated_dir, exist_ok=True)
print(f"## 确保翻译目录存在 ## : {config.translated_dir}")
# 执行pdf2zh翻译, 用户可以自定义命令内容:
if not os.path.exists(config.config_path):
command = [
pdf2zh,
input_path,
"--t",
str(config.thread_num),
"--output",
config.translated_dir,
"--service",
config.service,
"--lang-in",
config.source_languages,
"--lang-out",
config.target_languages,
]
# 如果设置了API密钥,添加到命令中
if model_type.lower().startswith("gpt") and openai_api_key:
command.extend(["--openai-api-key", openai_api_key, "--openai-model", model_type])
elif model_type.lower().startswith("claude") and claude_api_key:
command.extend(["--claude-api-key", claude_api_key, "--claude-model", model_type])
else:
command = [pdf2zh, "-c", config.config_path, "--files", input_path]
print("## 执行命令 ## : ", " ".join(command))
# 使用Popen而不是run,以便实时显示输出(包括进度条)
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True
)
# 实时读取并显示输出
stdout_lines = []
stderr_lines = []
while True:
stdout_line = process.stdout.readline()
stderr_line = process.stderr.readline()
if stdout_line:
print(stdout_line.strip())
stdout_lines.append(stdout_line)
if stderr_line:
print(stderr_line.strip())
stderr_lines.append(stderr_line)
# 检查进程是否结束
if process.poll() is not None:
# 读取剩余输出
for line in process.stdout:
print(line.strip())
stdout_lines.append(line)
for line in process.stderr:
print(line.strip())
stderr_lines.append(line)
break
# 获取返回码
returncode = process.returncode
stdout = ''.join(stdout_lines)
stderr = ''.join(stderr_lines)
print(f"## 命令执行结果 ## : 返回码={returncode}")
if returncode != 0:
error_msg = f"命令执行失败,返回码: {returncode}, 错误: {stderr}"
print(f"## 详细错误信息 ## : {error_msg}")
raise Exception(error_msg)
# 检查输出文件
expected_mono = os.path.join(
config.translated_dir,
os.path.basename(input_path).replace(".pdf", ".zh.mono.pdf"),
)
expected_dual = os.path.join(
config.translated_dir,
os.path.basename(input_path).replace(".pdf", ".zh.dual.pdf"),
)
print(f"## 检查输出文件 ## : mono={expected_mono}, dual={expected_dual}")
print(f"## 文件存在检查 ## : mono存在={os.path.exists(expected_mono)}, dual存在={os.path.exists(expected_dual)}")
# 执行 mv 命令
mono = os.path.join(
config.translated_dir, os.path.basename(input_path).replace(".pdf", "-mono.pdf")
)
dual = os.path.join(
config.translated_dir, os.path.basename(input_path).replace(".pdf", "-dual.pdf")
)
try:
if os.path.exists(expected_mono):
shutil.move(expected_mono, mono)
print(f"## 移动文件成功 ## : {expected_mono} -> {mono}")
else:
raise Exception(f"源文件不存在: {expected_mono}")
if os.path.exists(expected_dual):
shutil.move(expected_dual, dual)
print(f"## 移动文件成功 ## : {expected_dual} -> {dual}")
else:
raise Exception(f"源文件不存在: {expected_dual}")
except Exception as e:
print(f"## 移动文件时出错 ## : {str(e)}")
raise
if not os.path.exists(mono) or not os.path.exists(dual):
raise Exception("[Failed to generate translated files]: " + mono + ", " + dual)
print("[mono file generated]: ", mono)
print("[dual file generated]: ", dual)
return mono, dual
except Exception as e:
print(f"## translate_pdf函数出错 ## : {str(e)}")
# 重新抛出异常,以便上层函数可以捕获
raise
app = Flask(__name__)
CORS(app, resources={r"/*": {"origins": "*"}}) # 允许所有来源的跨域请求
@app.route("/translate", methods=["POST"])
def translate():
try:
print("\n############# 开始翻译请求处理 #############")
input_path, config = get_file_from_request(request)
print(f"## 获取到输入文件 ## : {input_path}")
mono, dual = translate_pdf(input_path, config)
print(f"## 翻译完成 ## : mono={mono}, dual={dual}")
if config.mono_cut and config.mono_cut == "true":
try:
path = mono.replace("-mono.pdf", "-mono-cut.pdf")
print(f"## 开始切割mono文件 ## : {mono} -> {path}")
split_and_merge_pdf(mono, path, compare=False)
if not os.path.exists(path):
raise Exception("[Failed to generate cutted files]: " + path)
print("[mono-cut file generated]: ", path)
except Exception as e:
print(f"## 切割mono文件出错 ## : {str(e)}")
raise
if config.dual_cut and config.dual_cut == "true":
try:
path = dual.replace("-dual.pdf", "-dual-cut.pdf")
print(f"## 开始切割dual文件 ## : {dual} -> {path}")
split_and_merge_pdf(dual, path, compare=False)
if not os.path.exists(path):
raise Exception("[Failed to generate cutted files]: " + path)
print("[dual-cut file generated]: ", path)
except Exception as e:
print(f"## 切割dual文件出错 ## : {str(e)}")
raise
if config.compare and config.compare == "true":
try:
path = dual.replace("-dual.pdf", "-compare.pdf")
print(f"## 开始生成对比文件 ## : {dual} -> {path}")
split_and_merge_pdf(dual, path, compare=True)
if not os.path.exists(path):
raise Exception("[Failed to generate compare files]: " + path)
print("[compare file generated]: ", path)
except Exception as e:
print(f"## 生成对比文件出错 ## : {str(e)}")
raise
return jsonify({"status": "success"}), 200
except Exception as e:
print(f"[Translate Error]: {e}")
# 返回更详细的错误信息
error_message = str(e)
traceback_info = sys.exc_info()
if traceback_info[2]:
import traceback
traceback_str = "".join(traceback.format_tb(traceback_info[2]))
print(f"## 错误堆栈 ## : {traceback_str}")
error_message = f"{error_message}\n{traceback_str}"
return jsonify({"status": "error", "message": error_message}), 500
@app.route("/translatedFile/<filename>")
def download(filename):
print("\n############# Downloading #############")
file_path = os.path.join(get_absolute_path(global_translated_dir), filename)
if not os.path.isfile(file_path):
print("[Download File not found]: ", file_path)
return "[Download File not found]: " + file_path, 404
print("[Download file]: ", file_path)
return send_file(file_path, as_attachment=True, download_name=filename)
# 工具函数, 用于切割双栏pdf文件
def split_and_merge_pdf(input_pdf, output_pdf, compare=False):
writer = PdfWriter()
if "dual" in input_pdf:
readers = [PdfReader(input_pdf) for _ in range(4)]
for i in range(0, len(readers[0].pages), 2):
original_media_box = readers[0].pages[i].mediabox
width = original_media_box.width
height = original_media_box.height
left_page_1 = readers[0].pages[i]
for box in ["mediabox", "cropbox", "trimbox", "bleedbox", "artbox"]:
setattr(left_page_1, box, RectangleObject((0, 0, width / 2, height)))
left_page_2 = readers[1].pages[i + 1]
for box in ["mediabox", "cropbox", "trimbox", "bleedbox", "artbox"]:
setattr(left_page_2, box, RectangleObject((0, 0, width / 2, height)))
right_page_1 = readers[2].pages[i]
for box in ["mediabox", "cropbox", "trimbox", "bleedbox", "artbox"]:
setattr(
right_page_1, box, RectangleObject((width / 2, 0, width, height))
)
right_page_2 = readers[3].pages[i + 1]
for box in ["mediabox", "cropbox", "trimbox", "bleedbox", "artbox"]:
setattr(
right_page_2, box, RectangleObject((width / 2, 0, width, height))
)
if compare == True:
blank_page_1 = writer.add_blank_page(width, height)
blank_page_1.merge_transformed_page(left_page_1, (1, 0, 0, 1, 0, 0))
blank_page_1.merge_transformed_page(
left_page_2, (1, 0, 0, 1, width / 2, 0)
)
blank_page_2 = writer.add_blank_page(width, height)
blank_page_2.merge_transformed_page(
right_page_1, (1, 0, 0, 1, -width / 2, 0)
)
blank_page_2.merge_transformed_page(right_page_2, (1, 0, 0, 1, 0, 0))
else:
writer.add_page(left_page_1)
writer.add_page(left_page_2)
writer.add_page(right_page_1)
writer.add_page(right_page_2)
else:
readers = [PdfReader(input_pdf) for _ in range(2)]
for i in range(len(readers[0].pages)):
page = readers[0].pages[i]
original_media_box = page.mediabox
width = original_media_box.width
height = original_media_box.height
left_page = readers[0].pages[i]
left_page.mediabox = RectangleObject((0, 0, width / 2, height))
right_page = readers[1].pages[i]
right_page.mediabox = RectangleObject((width / 2, 0, width, height))
writer.add_page(left_page)
writer.add_page(right_page)
with open(output_pdf, "wb") as output_file:
writer.write(output_file)
# 用于切割双栏pdf文件
@app.route("/cut", methods=["POST"])
def cut():
print("\n############# Cutting #############")
input_path, config = get_file_from_request(request)
try:
translated_path = os.path.join(
config.translated_dir,
os.path.basename(input_path).replace(".pdf", "-cut.pdf"),
)
split_and_merge_pdf(input_path, translated_path)
if not os.path.exists(translated_path):
raise Exception("[Failed to generate cut files]: ", translated_path)
print("[Cut file generated]: ", translated_path)
return jsonify({"status": "success"}), 200
except Exception as e:
print(f"[Cut File Error]: {e}")
# 返回更详细的错误信息
error_message = str(e)
traceback_info = sys.exc_info()
if traceback_info[2]:
import traceback
traceback_str = "".join(traceback.format_tb(traceback_info[2]))
print(f"## 错误堆栈 ## : {traceback_str}")
error_message = f"{error_message}\n{traceback_str}"
return jsonify({"status": "error", "message": error_message}), 500
# 用于生成中英对照文件
@app.route("/cut-compare", methods=["POST"])
def cut_compare():
print("\n############# Comparing #############")
try:
input_path, config = get_file_from_request(request)
print(f"## 获取到输入文件 ## : {input_path}")
if "dual" in input_path:
translated_path = os.path.join(
config.translated_dir,
os.path.basename(input_path).replace(".pdf", "-compare.pdf"),
)
print(f"## 直接生成对比文件 ## : {input_path} -> {translated_path}")
# 确保翻译目录存在
os.makedirs(os.path.dirname(translated_path), exist_ok=True)
split_and_merge_pdf(input_path, translated_path, compare=True)
else:
print(f"## 需要先翻译再生成对比文件 ## : {input_path}")
_, dual = translate_pdf(input_path, config)
translated_path = dual.replace("-dual.pdf", "-compare.pdf")
print(f"## 生成对比文件 ## : {dual} -> {translated_path}")
split_and_merge_pdf(dual, translated_path, compare=True)
if not os.path.exists(translated_path):
raise Exception("[Failed to generate cutted file]: " + translated_path)
print("[Compare file generated]: ", translated_path)
return jsonify({"status": "success"}), 200
except Exception as e:
print(f"[cut_compare() Error]: {e}")
# 返回更详细的错误信息
error_message = str(e)
traceback_info = sys.exc_info()
if traceback_info[2]:
import traceback
traceback_str = "".join(traceback.format_tb(traceback_info[2]))
print(f"## 错误堆栈 ## : {traceback_str}")
error_message = f"{error_message}\n{traceback_str}"
return jsonify({"status": "error", "message": error_message}), 500
if __name__ == "__main__":
if len(sys.argv) > 1:
port_num = int(sys.argv[1])
app.run(host="0.0.0.0", port=port_num)