| | import concurrent.futures |
| | import logging |
| | import re |
| | import unicodedata |
| | from enum import Enum |
| | from string import Template |
| | from typing import Dict |
| |
|
| | import numpy as np |
| | from pdfminer.converter import PDFConverter |
| | from pdfminer.layout import LTChar, LTFigure, LTLine, LTPage |
| | from pdfminer.pdffont import PDFCIDFont, PDFUnicodeNotDefined |
| | from pdfminer.pdfinterp import PDFGraphicState, PDFResourceManager |
| | from pdfminer.utils import apply_matrix_pt, mult_matrix |
| | from pymupdf import Font |
| | from tenacity import retry, wait_fixed |
| |
|
| | from pdf2zh.translator import ( |
| | AnythingLLMTranslator, |
| | ArgosTranslator, |
| | AzureOpenAITranslator, |
| | AzureTranslator, |
| | BaseTranslator, |
| | BingTranslator, |
| | DeepLTranslator, |
| | DeepLXTranslator, |
| | DeepseekTranslator, |
| | DifyTranslator, |
| | GeminiTranslator, |
| | GoogleTranslator, |
| | GrokTranslator, |
| | GroqTranslator, |
| | ModelScopeTranslator, |
| | OllamaTranslator, |
| | OpenAIlikedTranslator, |
| | OpenAITranslator, |
| | QwenMtTranslator, |
| | SiliconTranslator, |
| | TencentTranslator, |
| | XinferenceTranslator, |
| | ZhipuTranslator, |
| | ) |
| |
|
| | log = logging.getLogger(__name__) |
| |
|
| |
|
class PDFConverterEx(PDFConverter):
    """PDFConverter that forwards finished pages and figures to ``receive_layout``.

    Characters are collected as ``LTChar`` items that additionally carry the
    original ``cid`` and ``font`` object, so subclasses can re-emit them.
    """

    def __init__(
        self,
        rsrcmgr: PDFResourceManager,
    ) -> None:
        # NOTE(review): the positional arguments presumably map to
        # outfp=None, codec="utf-8", pageno=1, laparams=None -- confirm
        # against the installed pdfminer version.
        super().__init__(rsrcmgr, None, "utf-8", 1, None)

    def begin_page(self, page, ctm) -> None:
        """Open a new ``LTPage`` whose box is the CTM-transformed crop box."""
        left, bottom, right, top = page.cropbox
        ax, ay = apply_matrix_pt(ctm, (left, bottom))
        bx, by = apply_matrix_pt(ctm, (right, top))
        # The page box is normalised to start at the origin.
        page_box = (0, 0, abs(ax - bx), abs(ay - by))
        self.cur_item = LTPage(page.pageno, page_box)

    def end_page(self, page):
        """Hand the completed page to ``receive_layout`` and return its result."""
        finished = self.cur_item
        return self.receive_layout(finished)

    def begin_figure(self, name, bbox, matrix) -> None:
        """Push the current container and start collecting into a new figure."""
        parent = self.cur_item
        self._stack.append(parent)
        figure = LTFigure(name, bbox, mult_matrix(matrix, self.ctm))
        # The figure inherits the page id of the container it is nested in.
        figure.pageid = parent.pageid
        self.cur_item = figure

    def end_figure(self, _: str) -> None:
        """Close the current figure, attach it to its parent and process it."""
        fig = self.cur_item
        assert isinstance(fig, LTFigure), str(type(fig))
        parent = self._stack.pop()
        self.cur_item = parent
        parent.add(fig)
        return self.receive_layout(fig)

    def render_char(
        self,
        matrix,
        font,
        fontsize: float,
        scaling: float,
        rise: float,
        cid: int,
        ncs,
        graphicstate: PDFGraphicState,
    ) -> float:
        """Add one character to the current container and return its advance.

        The created ``LTChar`` is tagged with the raw ``cid`` and the ``font``
        object after it has been added to the container.
        """
        try:
            text = font.to_unichr(cid)
            assert isinstance(text, str), str(type(text))
        except PDFUnicodeNotDefined:
            # No Unicode mapping for this cid: use the converter's fallback text.
            text = self.handle_undefined_char(font, cid)
        item = LTChar(
            matrix,
            font,
            fontsize,
            scaling,
            rise,
            text,
            font.char_width(cid),
            font.char_disp(cid),
            ncs,
            graphicstate,
        )
        self.cur_item.add(item)
        item.cid = cid
        item.font = font
        return item.adv
| |
|
| |
|
class Paragraph:
    """Mutable record of one paragraph's start position, bounds and style.

    Arguments are stored unchanged on attributes of the same name.
    """

    def __init__(self, y, x, x0, x1, y0, y1, size, brk):
        # Start position of the paragraph.
        self.x: float = x
        self.y: float = y
        # Horizontal extent.
        self.x0: float = x0
        self.x1: float = x1
        # Vertical extent.
        self.y0: float = y0
        self.y1: float = y1
        # Font size and line-break flag.
        self.size: float = size
        self.brk: bool = brk
| |
|
| |
|
| | |
class TranslateConverter(PDFConverterEx):
    """Converter that translates page text and re-typesets it as PDF operators.

    For every page (or figure) handed to ``receive_layout`` the converter
    splits characters into paragraphs and "formula" groups, sends the
    paragraph strings to the configured translator, and returns a PDF
    content-stream fragment that draws the translated text, the untouched
    formula glyphs and the collected lines.

    NOTE(review): ``self.fontmap`` and ``self.fontid`` are read by
    ``receive_layout`` but are not assigned anywhere in this class; they are
    presumably injected by the caller before pages are processed -- confirm.
    """

    def __init__(
        self,
        rsrcmgr,
        vfont: str = None,
        vchar: str = None,
        thread: int = 0,
        layout=None,
        lang_in: str = "",
        lang_out: str = "",
        service: str = "",
        noto_name: str = "",
        noto: Font = None,
        envs: Dict = None,
        prompt: Template = None,
        ignore_cache: bool = False,
    ) -> None:
        """Configure the converter and instantiate the translator backend.

        Args:
            rsrcmgr: pdfminer resource manager passed to the base converter.
            vfont: Optional regex; fonts whose name matches are treated as
                formula fonts. When unset a built-in pattern is used.
            vchar: Optional regex; characters matching it are treated as
                formula characters. When unset a Unicode-category rule is used.
            thread: Number of translation worker threads. Values below 1 are
                treated as 1 when the thread pool is created.
            layout: Mapping indexed by page id whose values are 2-D arrays of
                region classes (indexed ``[y, x]``). Defaults to a new empty
                dict per instance.
            lang_in: Source language passed to the translator.
            lang_out: Target language passed to the translator.
            service: ``"name"`` or ``"name:model"``; only the first ``:``
                splits, so the model part may itself contain colons.
            noto_name: Font id used for characters not covered by ``"tiro"``.
            noto: Font object providing ``has_glyph`` and ``char_lengths``
                for ``noto_name``.
            envs: Extra settings forwarded to the translator.
            prompt: Optional prompt template forwarded to the translator.
            ignore_cache: Forwarded to the translator.

        Raises:
            ValueError: If no known translator has the requested service name.
        """
        super().__init__(rsrcmgr)
        self.vfont = vfont
        self.vchar = vchar
        self.thread = thread
        # A fresh dict per instance; a ``{}`` default would be shared by all.
        self.layout = {} if layout is None else layout
        self.noto_name = noto_name
        self.noto = noto
        self.translator: BaseTranslator = None

        param = service.split(":", 1)
        service_name = param[0]
        service_model = param[1] if len(param) > 1 else None
        if not envs:
            envs = {}
        known_translators = (
            GoogleTranslator,
            BingTranslator,
            DeepLTranslator,
            DeepLXTranslator,
            OllamaTranslator,
            XinferenceTranslator,
            AzureOpenAITranslator,
            OpenAITranslator,
            ZhipuTranslator,
            ModelScopeTranslator,
            SiliconTranslator,
            GeminiTranslator,
            AzureTranslator,
            TencentTranslator,
            DifyTranslator,
            AnythingLLMTranslator,
            ArgosTranslator,
            GrokTranslator,
            GroqTranslator,
            DeepseekTranslator,
            OpenAIlikedTranslator,
            QwenMtTranslator,
        )
        for translator in known_translators:
            if service_name == translator.name:
                self.translator = translator(
                    lang_in,
                    lang_out,
                    service_model,
                    envs=envs,
                    prompt=prompt,
                    ignore_cache=ignore_cache,
                )
                break
        if not self.translator:
            raise ValueError("Unsupported translation service")

    def receive_layout(self, ltpage: LTPage):
        """Translate the text of ``ltpage`` and return PDF drawing operators.

        The work happens in three phases:

        A. Walk the layout items, grouping characters into paragraph strings
           (``sstk``/``pstk``) and formula groups that are replaced in the
           string by ``{vN}`` placeholders.
        B. Translate every paragraph string on a thread pool.
        C. Lay the translated strings out again inside each paragraph's
           bounds, re-inserting the formula glyphs and lines.

        Returns:
            A string of the form ``"BT ... ET "`` containing the operators.
        """
        # Paragraph state.
        sstk: list[str] = []            # paragraph texts
        pstk: list[Paragraph] = []      # paragraph geometry, parallel to sstk
        vbkt: int = 0                   # open "(" count inside formulas
        # Formula group currently being collected.
        vstk: list[LTChar] = []         # glyphs of the current formula
        vlstk: list[LTLine] = []        # lines belonging to the current formula
        vfix: float = 0                 # vertical offset of the current formula
        # Finished formula groups, indexed by the N in "{vN}".
        var: list[list[LTChar]] = []
        varl: list[list[LTLine]] = []
        varf: list[float] = []
        vlen: list[float] = []          # formula widths
        # Page-wide state.
        lstk: list[LTLine] = []         # lines not attached to any formula
        xt: LTChar = None               # previous character
        xt_cls: int = -1                # region class of the previous character
        vmax: float = ltpage.width / 4  # x jump that ends an inline formula

        def vflag(font: str, char: str):
            """Return True when the font name or character marks a formula."""
            if isinstance(font, bytes):
                try:
                    font = font.decode('utf-8')
                except UnicodeDecodeError:
                    font = ""
            # Keep only the part after the last "+" of the font name.
            font = font.split("+")[-1]
            if re.match(r"\(cid:", char):
                return True
            # Font-name rule: user pattern if given, otherwise the built-in one.
            if self.vfont:
                if re.match(self.vfont, font):
                    return True
            else:
                if re.match(
                    r"(CM[^R]|MS.M|XY|MT|BL|RM|EU|LA|RS|LINE|LCIRCLE|TeX-|rsfs|txsy|wasy|stmary|.*Mono|.*Code|.*Ital|.*Sym|.*Math)",
                    font,
                ):
                    return True
            # Character rule: user pattern if given, otherwise by category.
            if self.vchar:
                if re.match(self.vchar, char):
                    return True
            else:
                if (
                    char
                    and char != " "
                    and (
                        # Modifier letters/marks/symbols, math symbols, separators.
                        unicodedata.category(char[0])
                        in ["Lm", "Mn", "Sk", "Sm", "Zl", "Zp", "Zs"]
                        # Code points 0x370-0x3FF (Greek and Coptic block).
                        or ord(char[0]) in range(0x370, 0x400)
                    )
                ):
                    return True
            return False

        ############################################################
        # A. Parse the original layout.
        for child in ltpage:
            if isinstance(child, LTChar):
                cur_v = False
                layout = self.layout[ltpage.pageid]
                h, w = layout.shape
                # Region class at the character's (clamped) lower-left corner.
                cx, cy = np.clip(int(child.x0), 0, w - 1), np.clip(int(child.y0), 0, h - 1)
                cls = layout[cy, cx]
                # Bullets are forced into class 0.
                if child.get_text() == "•":
                    cls = 0
                # Decide whether this character is kept verbatim as formula.
                if (
                    cls == 0  # class 0 is always kept verbatim
                    # Same region, already some text, and clearly smaller than
                    # the paragraph font (likely a sub/superscript).
                    or (cls == xt_cls and len(sstk[-1].strip()) > 1 and child.size < pstk[-1].size * 0.79)
                    or vflag(child.fontname, child.get_text())
                    # Both diagonal matrix entries are zero (rotated glyph).
                    or (child.matrix[0] == 0 and child.matrix[3] == 0)
                ):
                    cur_v = True
                # Parentheses directly following a formula stay part of it.
                if not cur_v:
                    if vstk and child.get_text() == "(":
                        cur_v = True
                        vbkt += 1
                    if vbkt and child.get_text() == ")":
                        cur_v = True
                        vbkt -= 1
                # Does the formula being collected end here?
                if (
                    not cur_v
                    or cls != xt_cls
                    # Large x jump inside a paragraph that already has text.
                    or (sstk[-1] != "" and abs(child.x0 - xt.x0) > vmax)
                ):
                    if vstk:
                        # Text to the right of the formula in the same region:
                        # align the formula vertically to that text.
                        if (
                            not cur_v
                            and cls == xt_cls
                            and child.x0 > max([vch.x0 for vch in vstk])
                        ):
                            vfix = vstk[0].y0 - child.y0
                        if sstk[-1] == "":
                            # Formula-only paragraph: reset the previous class
                            # so the next character starts a new paragraph.
                            xt_cls = -1
                        sstk[-1] += f"{{v{len(var)}}}"
                        var.append(vstk)
                        varl.append(vlstk)
                        varf.append(vfix)
                        vstk = []
                        vlstk = []
                        vfix = 0
                # Plain text, or the first glyph of a new formula.
                if not vstk:
                    if cls == xt_cls:
                        if child.x0 > xt.x1 + 1:
                            # Horizontal gap: inline space.
                            sstk[-1] += " "
                        elif child.x1 < xt.x0:
                            # Jumped back to the left: the source wrapped.
                            sstk[-1] += " "
                            pstk[-1].brk = True
                    else:
                        # Different region: open a new paragraph here.
                        sstk.append("")
                        pstk.append(Paragraph(child.y0, child.x0, child.x0, child.x0, child.y0, child.y1, child.size, False))
                if not cur_v:
                    # Adopt a larger font size, or the size of the second
                    # character (first-letter enlargement), except for spaces.
                    if (
                        child.size > pstk[-1].size
                        or len(sstk[-1].strip()) == 1
                    ) and child.get_text() != " ":
                        pstk[-1].y -= child.size - pstk[-1].size
                        pstk[-1].size = child.size
                    sstk[-1] += child.get_text()
                else:
                    # First formula glyph right of the previous text in the
                    # same region: record the vertical offset to that text.
                    if (
                        not vstk
                        and cls == xt_cls
                        and child.x0 > xt.x0
                    ):
                        vfix = child.y0 - xt.y0
                    vstk.append(child)
                # Grow the paragraph bounds to include this character.
                pstk[-1].x0 = min(pstk[-1].x0, child.x0)
                pstk[-1].x1 = max(pstk[-1].x1, child.x1)
                pstk[-1].y0 = min(pstk[-1].y0, child.y0)
                pstk[-1].y1 = max(pstk[-1].y1, child.y1)
                xt = child
                xt_cls = cls
            elif isinstance(child, LTFigure):
                pass
            elif isinstance(child, LTLine):
                layout = self.layout[ltpage.pageid]
                h, w = layout.shape
                cx, cy = np.clip(int(child.x0), 0, w - 1), np.clip(int(child.y0), 0, h - 1)
                cls = layout[cy, cx]
                if vstk and cls == xt_cls:
                    # Line inside the region of the formula being collected.
                    vlstk.append(child)
                else:
                    lstk.append(child)
            else:
                pass
        # Flush a formula that is still open at the end of the page.
        if vstk:
            sstk[-1] += f"{{v{len(var)}}}"
            var.append(vstk)
            varl.append(vlstk)
            varf.append(vfix)
        log.debug("\n==========[VSTACK]==========\n")
        for vidx, v in enumerate(var):
            # Width of the formula: rightmost edge minus first glyph's left edge.
            width = max([vch.x1 for vch in v]) - v[0].x0
            log.debug(f'< {width:.1f} {v[0].x0:.1f} {v[0].y0:.1f} {v[0].cid} {v[0].fontname} {len(varl[vidx])} > v{vidx} = {"".join([ch.get_text() for ch in v])}')
            vlen.append(width)

        ############################################################
        # B. Translate the paragraphs.
        log.debug("\n==========[SSTACK]==========\n")

        # tenacity re-invokes the worker after a fixed 1 s wait when it
        # raises; no stop condition is configured.
        @retry(wait=wait_fixed(1))
        def worker(s: str):
            """Translate one paragraph; blanks and lone placeholders pass through."""
            if not s.strip() or re.match(r"^\{v\d+\}$", s):
                return s
            try:
                return self.translator.translate(s)
            except BaseException as e:
                if log.isEnabledFor(logging.DEBUG):
                    log.exception(e)
                else:
                    log.exception(e, exc_info=False)
                # Bare raise keeps the original traceback.
                raise

        # ThreadPoolExecutor rejects max_workers <= 0 and the constructor
        # default for ``thread`` is 0, so clamp to at least one worker.
        workers = self.thread
        if workers is not None and workers < 1:
            workers = 1
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=workers
        ) as executor:
            news = list(executor.map(worker, sstk))

        ############################################################
        # C. Typeset the translated text.
        def raw_string(fcur: str, cstk: str):
            """Hex-encode ``cstk`` for the font id ``fcur``."""
            if fcur == self.noto_name:
                # Four hex digits of whatever ``has_glyph`` returns
                # (presumably the glyph id -- confirm against pymupdf).
                return "".join(["%04x" % self.noto.has_glyph(ord(c)) for c in cstk])
            elif isinstance(self.fontmap[fcur], PDFCIDFont):
                # CID fonts: four hex digits per character.
                return "".join(["%04x" % ord(c) for c in cstk])
            else:
                return "".join(["%02x" % ord(c) for c in cstk])

        # Default line-height factor per target language (1.1 if unlisted).
        LANG_LINEHEIGHT_MAP = {
            "zh-cn": 1.4, "zh-tw": 1.4, "zh-hans": 1.4, "zh-hant": 1.4, "zh": 1.4,
            "ja": 1.1, "ko": 1.2, "en": 1.2, "ar": 1.0, "ru": 0.8, "uk": 0.8, "ta": 0.8
        }
        default_line_height = LANG_LINEHEIGHT_MAP.get(self.translator.lang_out.lower(), 1.1)
        _x, _y = 0, 0  # previous pen position, used only for debug traces
        ops_list = []

        def gen_op_txt(font, size, x, y, rtxt):
            """Operator string that draws hex text ``rtxt`` at ``(x, y)``."""
            return f"/{font} {size:f} Tf 1 0 0 1 {x:f} {y:f} Tm [<{rtxt}>] TJ "

        def gen_op_line(x, y, xlen, ylen, linewidth):
            """Operator string that strokes a line from ``(x, y)``."""
            return f"ET q 1 0 0 1 {x:f} {y:f} cm [] 0 d 0 J {linewidth:f} w 0 0 m {xlen:f} {ylen:f} l S Q BT "

        for pidx, new in enumerate(news):
            para = pstk[pidx]
            x: float = para.x               # current pen x
            y: float = para.y               # y of the first line
            x0: float = para.x0             # left bound
            x1: float = para.x1             # right bound
            height: float = para.y1 - para.y0
            size: float = para.size
            brk: bool = para.brk            # source paragraph wrapped
            cstk: str = ""                  # pending characters of one run
            fcur: str = None                # font id of the pending run
            lidx = 0                        # index of the current output line
            tx = x                          # x at which the pending run starts
            fcur_ = fcur
            ptr = 0
            log.debug(f"< {y} {x} {x0} {x1} {size} {brk} > {sstk[pidx]} | {new}")

            # Operations are buffered because the final line height is only
            # known once the number of lines has been determined.
            ops_vals: list[dict] = []

            while ptr < len(new):
                vy_regex = re.match(
                    r"\{\s*v([\d\s]+)\}", new[ptr:], re.IGNORECASE
                )
                mod = 0  # width of a trailing modifier glyph, if any
                if vy_regex:
                    ptr += len(vy_regex.group(0))
                    try:
                        vid = int(vy_regex.group(1).replace(" ", ""))
                        adv = vlen[vid]
                    except (ValueError, IndexError):
                        # Placeholder without a usable / known index: skip it.
                        continue
                    if var[vid][-1].get_text() and unicodedata.category(var[vid][-1].get_text()[0]) in ["Lm", "Mn", "Sk"]:
                        mod = var[vid][-1].width
                else:
                    ch = new[ptr]
                    fcur_ = None
                    try:
                        # Use "tiro" when it maps this code point to itself.
                        if self.fontmap["tiro"].to_unichr(ord(ch)) == ch:
                            fcur_ = "tiro"
                    except Exception:
                        # Best effort: any failure falls back to the noto font.
                        pass
                    if fcur_ is None:
                        fcur_ = self.noto_name
                    if fcur_ == self.noto_name:
                        adv = self.noto.char_lengths(ch, size)[0]
                    else:
                        adv = self.fontmap[fcur_].char_width(ord(ch)) * size
                    ptr += 1
                # Flush the pending run on a font change, before a formula,
                # or when the right bound (plus a small tolerance) is passed.
                if (
                    fcur_ != fcur
                    or vy_regex
                    or x + adv > x1 + 0.1 * size
                ):
                    if cstk:
                        ops_vals.append({
                            "type": OpType.TEXT,
                            "font": fcur,
                            "size": size,
                            "x": tx,
                            "dy": 0,
                            "rtxt": raw_string(fcur, cstk),
                            "lidx": lidx
                        })
                        cstk = ""
                # Wrap only when the source paragraph itself wrapped.
                if brk and x + adv > x1 + 0.1 * size:
                    x = x0
                    lidx += 1
                if vy_regex:
                    fix = 0
                    if fcur is not None:
                        # Apply the recorded vertical offset only when text
                        # has already been placed in this paragraph.
                        fix = varf[vid]
                    first = var[vid][0]
                    for vch in var[vid]:
                        vc = chr(vch.cid)
                        vx = x + vch.x0 - first.x0
                        ops_vals.append({
                            "type": OpType.TEXT,
                            "font": self.fontid[vch.font],
                            "size": vch.size,
                            "x": vx,
                            "dy": fix + vch.y0 - first.y0,
                            "rtxt": raw_string(self.fontid[vch.font], vc),
                            "lidx": lidx
                        })
                        if log.isEnabledFor(logging.DEBUG):
                            lstk.append(LTLine(0.1, (_x, _y), (vx, fix + y + vch.y0 - first.y0)))
                            _x, _y = vx, fix + y + vch.y0 - first.y0
                    for vline in varl[vid]:
                        # Lines of width 5 or more are not redrawn.
                        if vline.linewidth < 5:
                            ops_vals.append({
                                "type": OpType.LINE,
                                "x": vline.pts[0][0] + x - first.x0,
                                "dy": vline.pts[0][1] + fix - first.y0,
                                "linewidth": vline.linewidth,
                                "xlen": vline.pts[1][0] - vline.pts[0][0],
                                "ylen": vline.pts[1][1] - vline.pts[0][1],
                                "lidx": lidx
                            })
                else:
                    if not cstk:
                        tx = x
                        if x == x0 and ch == " ":
                            # Drop a space that would start a line.
                            adv = 0
                        else:
                            cstk += ch
                    else:
                        cstk += ch
                adv -= mod
                fcur = fcur_
                x += adv
                if log.isEnabledFor(logging.DEBUG):
                    lstk.append(LTLine(0.1, (_x, _y), (x, y)))
                    _x, _y = x, y
            # Flush whatever is left of the last run.
            if cstk:
                ops_vals.append({
                    "type": OpType.TEXT,
                    "font": fcur,
                    "size": size,
                    "x": tx,
                    "dy": 0,
                    "rtxt": raw_string(fcur, cstk),
                    "lidx": lidx
                })

            line_height = default_line_height

            # Tighten the line spacing in 0.05 steps while the text is taller
            # than the original paragraph and the factor is still >= 1.
            while (lidx + 1) * size * line_height > height and line_height >= 1:
                line_height -= 0.05

            for vals in ops_vals:
                line_y = vals["dy"] + y - vals["lidx"] * size * line_height
                if vals["type"] == OpType.TEXT:
                    ops_list.append(gen_op_txt(vals["font"], vals["size"], vals["x"], line_y, vals["rtxt"]))
                elif vals["type"] == OpType.LINE:
                    ops_list.append(gen_op_line(vals["x"], line_y, vals["xlen"], vals["ylen"], vals["linewidth"]))

        # Page-wide lines (and, in debug mode, the pen traces added above).
        for gline in lstk:
            if gline.linewidth < 5:
                ops_list.append(gen_op_line(gline.pts[0][0], gline.pts[0][1], gline.pts[1][0] - gline.pts[0][0], gline.pts[1][1] - gline.pts[0][1], gline.linewidth))

        return f"BT {''.join(ops_list)}ET "
| |
|
| |
|
class OpType(Enum):
    """Kind of a buffered drawing operation."""

    # A run of text drawn with a font.
    TEXT = "text"
    # A stroked straight line.
    LINE = "line"
| |
|