KevinHuSh
Remove unused code, separate layout detection out as a new API. Add new RAG method 'table' (#55)
407b252
| # -*- coding: utf-8 -*- | |
| import os | |
| import random | |
| from functools import partial | |
| import fitz | |
| import requests | |
| import xgboost as xgb | |
| from io import BytesIO | |
| import torch | |
| import re | |
| import pdfplumber | |
| import logging | |
| from PIL import Image | |
| import numpy as np | |
| from api.db import ParserType | |
| from rag.nlp import huqie | |
| from collections import Counter | |
| from copy import deepcopy | |
| from huggingface_hub import hf_hub_download | |
| logging.getLogger("pdfminer").setLevel(logging.WARNING) | |
| class HuParser: | |
def __init__(self):
    """Create the OCR engine, the remote detectors and the concat model.

    Sets ``self.ocr`` (PaddleOCR), ``self.layouter`` and ``self.tbl_det``
    (partials over ``__remote_call``) and ``self.updown_cnt_mdl`` (an XGBoost
    booster loaded from the HuggingFace hub).
    """
    # Function-scope import: PaddleOCR is only loaded when a parser is built.
    from paddleocr import PaddleOCR
    logging.getLogger("ppocr").setLevel(logging.ERROR)
    self.ocr = PaddleOCR(use_angle_cls=False, lang="ch")
    # Keep a pre-existing `model_speciess` (presumably set by a subclass
    # before this runs -- confirm); otherwise fall back to the general model.
    if not hasattr(self, "model_speciess"):
        self.model_speciess = ParserType.GENERAL.value
    # Both detectors go through the same remote endpoint; only the species
    # path segment differs.
    self.layouter = partial(self.__remote_call, self.model_speciess)
    self.tbl_det = partial(self.__remote_call, "table_component")
    self.updown_cnt_mdl = xgb.Booster()
    if torch.cuda.is_available():
        self.updown_cnt_mdl.set_param({"device": "cuda"})
    self.updown_cnt_mdl.load_model(hf_hub_download(repo_id="InfiniFlow/text_concat_xgb_v1.0",
                                                   filename="updown_concat_xgb.model"))
    # The bare string below is a no-op expression kept as an in-code note.
    """
    If you have trouble downloading HuggingFace models, -_^ this might help!!
    For Linux:
    export HF_ENDPOINT=https://hf-mirror.com
    For Windows:
    Good luck
    ^_-
    """
def __remote_call(self, species, images, thr=0.7):
    """POST page images to the remote detection service and return its data.

    Args:
        species: path segment appended to ``/v1/layout/detect/``.
        images: iterable of PIL images; each is serialised as PNG bytes.
        thr: threshold forwarded in the form field ``threashold``.

    Returns:
        The ``data`` member of the JSON response.

    Raises:
        EnvironmentError: ``INFINIFLOW_SERVER`` or ``INFINIFLOW_TOKEN`` unset.
        RuntimeError: the service answered with a non-zero ``retcode``.
    """
    url = os.environ.get("INFINIFLOW_SERVER")
    if not url:
        raise EnvironmentError("Please set environment variable: 'INFINIFLOW_SERVER'")
    token = os.environ.get("INFINIFLOW_TOKEN")
    if not token:
        raise EnvironmentError("Please set environment variable: 'INFINIFLOW_TOKEN'")

    def to_png_bytes(pil_image):
        buf = BytesIO()
        pil_image.save(buf, format='png')
        buf.seek(0)
        return buf.getvalue()

    images = [to_png_bytes(img) for img in images]

    def post_once():
        resp = requests.post(
            url + "/v1/layout/detect/" + species,
            files=[("image", img) for img in images],
            data={"threashold": thr},
            headers={"Authorization": token},
            timeout=len(images) * 10,
        )
        payload = resp.json()
        if payload["retcode"] != 0:
            raise RuntimeError(payload["retmsg"])
        return payload["data"]

    # Three logged attempts; service-side errors (RuntimeError) are not
    # retried. A final unguarded attempt lets the last failure propagate.
    attempt = 0
    while attempt < 3:
        attempt += 1
        try:
            return post_once()
        except RuntimeError:
            raise
        except Exception as e:
            logging.error("layout_predict:" + str(e))
    return post_once()
| def __char_width(self, c): | |
| return (c["x1"] - c["x0"]) // len(c["text"]) | |
| def __height(self, c): | |
| return c["bottom"] - c["top"] | |
| def _x_dis(self, a, b): | |
| return min(abs(a["x1"] - b["x0"]), abs(a["x0"] - b["x1"]), | |
| abs(a["x0"] + a["x1"] - b["x0"] - b["x1"]) / 2) | |
| def _y_dis( | |
| self, a, b): | |
| return ( | |
| b["top"] + b["bottom"] - a["top"] - a["bottom"]) / 2 | |
| def _match_proj(self, b): | |
| proj_patt = [ | |
| r"第[零一二三四五六七八九十百]+章", | |
| r"第[零一二三四五六七八九十百]+[条节]", | |
| r"[零一二三四五六七八九十百]+[、是 ]", | |
| r"[\((][零一二三四五六七八九十百]+[)\)]", | |
| r"[\((][0-9]+[)\)]", | |
| r"[0-9]+(、|\.[ ]|)|\.[^0-9./a-zA-Z_%><-]{4,})", | |
| r"[0-9]+\.[0-9.]+(、|\.[ ])", | |
| r"[⚫•➢①② ]", | |
| ] | |
| return any([re.match(p, b["text"]) for p in proj_patt]) | |
def _updown_concat_features(self, up, down):
    """Build the feature list describing whether box ``down`` continues box
    ``up``; ``_concat_downward`` feeds it to ``self.updown_cnt_mdl``.

    Args:
        up: the upper box; reads "text", "x0", "x1", "top", "bottom",
            "page_number", "layout_type", "in_row" and optionally "R".
        down: the lower box, same keys.

    Returns:
        A flat list of booleans and numbers. The order and number of entries
        must not change without retraining the model that consumes it.
    """
    w = max(self.__char_width(up), self.__char_width(down))
    h = max(self.__height(up), self.__height(down))
    y_dis = self._y_dis(up, down)
    # Only the last LEN chars of `up` and the first LEN chars of `down` are
    # tokenised.
    LEN = 6
    tks_down = huqie.qie(down["text"][:LEN]).split(" ")
    tks_up = huqie.qie(up["text"][-LEN:]).split(" ")
    # Join the two fragments; a space is inserted when the joint starts with
    # a Latin letter or digit.
    tks_all = up["text"][-LEN:].strip() \
        + (" " if re.match(r"[a-zA-Z0-9]+",
                           up["text"][-1] + down["text"][0]) else "") \
        + down["text"][:LEN].strip()
    tks_all = huqie.qie(tks_all).split(" ")
    fea = [
        # same table row tag (both default to -1 when untagged)
        up.get("R", -1) == down.get("R", -1),
        # vertical centre distance in units of the taller box height
        y_dis / h,
        down["page_number"] - up["page_number"],
        up["layout_type"] == down["layout_type"],
        up["layout_type"] == "text",
        down["layout_type"] == "text",
        up["layout_type"] == "table",
        down["layout_type"] == "table",
        # `up` ends with sentence-final punctuation
        True if re.search(
            r"([。?!;!?;+))]|[a-z]\.)$",
            up["text"]) else False,
        # `up` ends with punctuation/digit that suggests continuation
        True if re.search(r"[,:‘“、0-9(+-]$", up["text"]) else False,
        # `down` begins (after at most one char) with closing punctuation
        True if re.search(
            r"(^.?[/,?;:\],。;:’”?!》】)-])",
            down["text"]) else False,
        # `up` is entirely one parenthesised group
        True if re.match(r"[\((][^\(\)()]+[)\)]$", up["text"]) else False,
        # `up` has a comma with no full stop after it
        True if re.search(r"[,,][^。.]+$", up["text"]) else False,
        # NOTE(review): identical to the previous entry; presumably kept so
        # the vector length matches the trained model -- confirm before
        # changing.
        True if re.search(r"[,,][^。.]+$", up["text"]) else False,
        # `up` leaves a parenthesis open that `down` closes
        True if re.search(r"[\((][^\))]+$", up["text"])
        and re.search(r"[\))]", down["text"]) else False,
        self._match_proj(down),
        True if re.match(r"[A-Z]", down["text"]) else False,
        True if re.match(r"[A-Z]", up["text"][-1]) else False,
        True if re.match(r"[a-z0-9]", up["text"][-1]) else False,
        True if re.match(r"[0-9.%,-]+$", down["text"]) else False,
        # both boxes end with the same two characters
        up["text"].strip()[-2:] == down["text"].strip()[-2:] if len(up["text"].strip()
                                                                    ) > 1 and len(
            down["text"].strip()) > 1 else False,
        up["x0"] > down["x1"],
        # relative height difference
        abs(self.__height(up) - self.__height(down)) / min(self.__height(up),
                                                           self.__height(down)),
        # horizontal distance in character widths
        self._x_dis(up, down) / max(w, 0.000001),
        (len(up["text"]) - len(down["text"])) /
        max(len(up["text"]), len(down["text"])),
        # token-count change caused by joining the two fragments
        len(tks_all) - len(tks_up) - len(tks_down),
        len(tks_down) - len(tks_up),
        tks_down[-1] == tks_up[-1],
        max(down["in_row"], up["in_row"]),
        abs(down["in_row"] - up["in_row"]),
        # the fragment is a single token whose tag contains "n"
        len(tks_down) == 1 and huqie.tag(tks_down[0]).find("n") >= 0,
        len(tks_up) == 1 and huqie.tag(tks_up[0]).find("n") >= 0
    ]
    return fea
@staticmethod
def sort_Y_firstly(arr, threashold):
    """Sort boxes top-to-bottom, then left-to-right inside a row.

    Declared static because the function takes no ``self`` yet is invoked
    both as ``self.sort_Y_firstly(...)`` and ``HuParser.sort_Y_firstly(...)``;
    without the decorator the instance-bound call raises ``TypeError``.

    Args:
        arr: iterable of dicts with "top" and "x0".
        threashold: neighbours whose "top" values differ by less than this
            are treated as one row and ordered by "x0".

    Returns:
        A new list. Elements that were swapped are deep copies of the
        originals; untouched elements are the original objects.
    """
    arr = sorted(arr, key=lambda r: (r["top"], r["x0"]))
    for i in range(len(arr) - 1):
        for j in range(i, -1, -1):
            # Undo the strict "top" ordering for boxes on (almost) the same
            # line so the leftmost one comes first.
            if abs(arr[j + 1]["top"] - arr[j]["top"]) < threashold \
                    and arr[j + 1]["x0"] < arr[j]["x0"]:
                arr[j], arr[j + 1] = deepcopy(arr[j + 1]), deepcopy(arr[j])
    return arr
@staticmethod
def sort_X_by_page(arr, threashold):
    """Sort boxes by page, then left-to-right, then top-to-bottom inside a
    column.

    Declared static because the function takes no ``self``; its siblings are
    called through instances, which would otherwise raise ``TypeError``.

    Args:
        arr: iterable of dicts with "page_number", "x0" and "top".
        threashold: same-page neighbours whose "x0" values differ by less
            than this are treated as one column and ordered by "top".

    Returns:
        A new list holding the same dict objects.
    """
    arr = sorted(arr, key=lambda r: (r["page_number"], r["x0"], r["top"]))
    for i in range(len(arr) - 1):
        for j in range(i, -1, -1):
            # Within one column of one page the upper box goes first.
            if abs(arr[j + 1]["x0"] - arr[j]["x0"]) < threashold \
                    and arr[j + 1]["top"] < arr[j]["top"] \
                    and arr[j + 1]["page_number"] == arr[j]["page_number"]:
                arr[j], arr[j + 1] = arr[j + 1], arr[j]
    return arr
@staticmethod
def sort_R_firstly(arr, thr=0):
    """Sort boxes by table row index "R", then by "x0" inside a row.

    Declared static because the function takes no ``self`` yet is invoked as
    ``self.sort_R_firstly(...)``; without the decorator that call raises
    ``TypeError``.

    Args:
        arr: iterable of box dicts; "R" is optional.
        thr: row tolerance forwarded to ``sort_Y_firstly`` for the initial
            positional ordering.

    Returns:
        The list produced by ``sort_Y_firstly`` with adjacent pairs that both
        carry "R" reordered. Pairs where either box lacks "R" keep their
        positional order.
    """
    arr = HuParser.sort_Y_firstly(arr, thr)
    for i in range(len(arr) - 1):
        for j in range(i, -1, -1):
            if "R" not in arr[j] or "R" not in arr[j + 1]:
                continue
            if arr[j + 1]["R"] < arr[j]["R"] \
                    or (arr[j + 1]["R"] == arr[j]["R"]
                        and arr[j + 1]["x0"] < arr[j]["x0"]):
                arr[j], arr[j + 1] = arr[j + 1], arr[j]
    return arr
@staticmethod
def sort_X_firstly(arr, threashold, copy=True):
    """Sort boxes left-to-right, then top-to-bottom inside a column.

    Declared static because the function takes no ``self`` yet is invoked as
    ``self.sort_X_firstly(...)`` and ``HuParser.sort_X_firstly(...)``;
    without the decorator the instance-bound call raises ``TypeError``.

    Args:
        arr: iterable of dicts with "x0" and "top".
        threashold: neighbours whose "x0" values differ by less than this
            are treated as one column and ordered by "top".
        copy: when true, swapped elements are replaced by deep copies;
            when false the original objects are kept.

    Returns:
        A new list.
    """
    arr = sorted(arr, key=lambda r: (r["x0"], r["top"]))
    for i in range(len(arr) - 1):
        for j in range(i, -1, -1):
            # Within one column the upper box goes first.
            if abs(arr[j + 1]["x0"] - arr[j]["x0"]) < threashold \
                    and arr[j + 1]["top"] < arr[j]["top"]:
                if copy:
                    arr[j], arr[j + 1] = deepcopy(arr[j + 1]), deepcopy(arr[j])
                else:
                    arr[j], arr[j + 1] = arr[j + 1], arr[j]
    return arr
@staticmethod
def sort_C_firstly(arr, thr=0):
    """Sort boxes by table column index "C", then by "top" inside a column.

    Declared static because the function takes no ``self`` yet is invoked as
    ``self.sort_C_firstly(...)``; without the decorator that call raises
    ``TypeError``. The unreachable second ``return`` that used to follow the
    first one has been dropped.

    Args:
        arr: iterable of box dicts; "C" is optional.
        thr: column tolerance forwarded to ``sort_X_firstly`` for the initial
            positional ordering.

    Returns:
        The list produced by ``sort_X_firstly`` with adjacent pairs that both
        carry "C" reordered. Pairs where either box lacks "C" keep their
        positional order.
    """
    arr = HuParser.sort_X_firstly(arr, thr)
    for i in range(len(arr) - 1):
        for j in range(i, -1, -1):
            if "C" not in arr[j] or "C" not in arr[j + 1]:
                continue
            if arr[j + 1]["C"] < arr[j]["C"] \
                    or (arr[j + 1]["C"] == arr[j]["C"]
                        and arr[j + 1]["top"] < arr[j]["top"]):
                arr[j], arr[j + 1] = arr[j + 1], arr[j]
    return arr
| def _has_color(self, o): | |
| if o.get("ncs", "") == "DeviceGray": | |
| if o["stroking_color"] and o["stroking_color"][0] == 1 and o["non_stroking_color"] and \ | |
| o["non_stroking_color"][0] == 1: | |
| if re.match(r"[a-zT_\[\]\(\)-]+", o.get("text", "")): | |
| return False | |
| return True | |
| def __overlapped_area(self, a, b, ratio=True): | |
| tp, btm, x0, x1 = a["top"], a["bottom"], a["x0"], a["x1"] | |
| if b["x0"] > x1 or b["x1"] < x0: | |
| return 0 | |
| if b["bottom"] < tp or b["top"] > btm: | |
| return 0 | |
| x0_ = max(b["x0"], x0) | |
| x1_ = min(b["x1"], x1) | |
| assert x0_ <= x1_, "Fuckedup! T:{},B:{},X0:{},X1:{} ==> {}".format( | |
| tp, btm, x0, x1, b) | |
| tp_ = max(b["top"], tp) | |
| btm_ = min(b["bottom"], btm) | |
| assert tp_ <= btm_, "Fuckedup! T:{},B:{},X0:{},X1:{} => {}".format( | |
| tp, btm, x0, x1, b) | |
| ov = (btm_ - tp_) * (x1_ - x0_) if x1 - \ | |
| x0 != 0 and btm - tp != 0 else 0 | |
| if ov > 0 and ratio: | |
| ov /= (x1 - x0) * (btm - tp) | |
| return ov | |
| def __find_overlapped_with_threashold(self, box, boxes, thr=0.3): | |
| if not boxes: | |
| return | |
| max_overlaped_i, max_overlaped, _max_overlaped = None, thr, 0 | |
| s, e = 0, len(boxes) | |
| for i in range(s, e): | |
| ov = self.__overlapped_area(box, boxes[i]) | |
| _ov = self.__overlapped_area(boxes[i], box) | |
| if (ov, _ov) < (max_overlaped, _max_overlaped): | |
| continue | |
| max_overlaped_i = i | |
| max_overlaped = ov | |
| _max_overlaped = _ov | |
| return max_overlaped_i | |
| def __find_overlapped(self, box, boxes_sorted_by_y, naive=False): | |
| if not boxes_sorted_by_y: | |
| return | |
| bxs = boxes_sorted_by_y | |
| s, e, ii = 0, len(bxs), 0 | |
| while s < e and not naive: | |
| ii = (e + s) // 2 | |
| pv = bxs[ii] | |
| if box["bottom"] < pv["top"]: | |
| e = ii | |
| continue | |
| if box["top"] > pv["bottom"]: | |
| s = ii + 1 | |
| continue | |
| break | |
| while s < ii: | |
| if box["top"] > bxs[s]["bottom"]: | |
| s += 1 | |
| break | |
| while e - 1 > ii: | |
| if box["bottom"] < bxs[e - 1]["top"]: | |
| e -= 1 | |
| break | |
| max_overlaped_i, max_overlaped = None, 0 | |
| for i in range(s, e): | |
| ov = self.__overlapped_area(bxs[i], box) | |
| if ov <= max_overlaped: | |
| continue | |
| max_overlaped_i = i | |
| max_overlaped = ov | |
| return max_overlaped_i | |
| def _is_garbage(self, b): | |
| patt = [r"^•+$", r"(版权归©|免责条款|地址[::])", r"\.{3,}", "^[0-9]{1,2} / ?[0-9]{1,2}$", | |
| r"^[0-9]{1,2} of [0-9]{1,2}$", "^http://[^ ]{12,}", | |
| "(资料|数据)来源[::]", "[0-9a-z._-]+@[a-z0-9-]+\\.[a-z]{2,3}", | |
| "\\(cid *: *[0-9]+ *\\)" | |
| ] | |
| return any([re.search(p, b["text"]) for p in patt]) | |
def __layouts_cleanup(self, boxes, layouts, far=2, thr=0.7):
    """Remove near-duplicate layout regions from ``layouts`` in place.

    For each region ``i`` the next ``far - 1`` regions are searched for the
    first one of the same "type" that intersects it. When either region
    covers the other by at least ``thr``, one of the two is dropped: the one
    with the lower "score" when both have one, otherwise the one
    intersecting less text-box area.

    Args:
        boxes: text boxes used as the tie-breaker when scores are missing.
        layouts: list of region dicts; modified and also returned.
        far: size of the look-ahead window (region ``i`` included).
        thr: overlap ratio from which two regions count as duplicates.
    """
    def notOverlapped(a, b):
        # True when the rectangles are separated on at least one axis.
        return any([a["x1"] < b["x0"],
                    a["x0"] > b["x1"],
                    a["bottom"] < b["top"],
                    a["top"] > b["bottom"]])

    i = 0
    while i + 1 < len(layouts):
        j = i + 1
        # Skip neighbours of another type or that do not touch region i.
        while j < min(i + far, len(layouts)) \
                and (layouts[i].get("type", "") != layouts[j].get("type", "")
                     or notOverlapped(layouts[i], layouts[j])):
            j += 1
        if j >= min(i + far, len(layouts)):
            i += 1
            continue
        # Both coverage ratios are below the threshold: keep both regions.
        if self.__overlapped_area(layouts[i], layouts[j]) < thr \
                and self.__overlapped_area(layouts[j], layouts[i]) < thr:
            i += 1
            continue

        # `i` is not advanced after a pop so the surviving region is compared
        # against its new neighbours.
        if layouts[i].get("score") and layouts[j].get("score"):
            if layouts[i]["score"] > layouts[j]["score"]:
                layouts.pop(j)
            else:
                layouts.pop(i)
            continue

        # No usable scores: keep the region that intersects more box area.
        area_i, area_i_1 = 0, 0
        for b in boxes:
            if not notOverlapped(b, layouts[i]):
                area_i += self.__overlapped_area(b, layouts[i], False)
            if not notOverlapped(b, layouts[j]):
                area_i_1 += self.__overlapped_area(b, layouts[j], False)

        if area_i > area_i_1:
            layouts.pop(j)
        else:
            layouts.pop(i)

    return layouts
| def __table_paddle(self, images): | |
| tbls = self.tbl_det(images, thr=0.5) | |
| res = [] | |
| # align left&right for rows, align top&bottom for columns | |
| for tbl in tbls: | |
| lts = [{"label": b["type"], | |
| "score": b["score"], | |
| "x0": b["bbox"][0], "x1": b["bbox"][2], | |
| "top": b["bbox"][1], "bottom": b["bbox"][-1] | |
| } for b in tbl] | |
| if not lts: | |
| continue | |
| left = [b["x0"] for b in lts if b["label"].find( | |
| "row") > 0 or b["label"].find("header") > 0] | |
| right = [b["x1"] for b in lts if b["label"].find( | |
| "row") > 0 or b["label"].find("header") > 0] | |
| if not left: | |
| continue | |
| left = np.median(left) if len(left) > 4 else np.min(left) | |
| right = np.median(right) if len(right) > 4 else np.max(right) | |
| for b in lts: | |
| if b["label"].find("row") > 0 or b["label"].find("header") > 0: | |
| if b["x0"] > left: | |
| b["x0"] = left | |
| if b["x1"] < right: | |
| b["x1"] = right | |
| top = [b["top"] for b in lts if b["label"] == "table column"] | |
| bottom = [b["bottom"] for b in lts if b["label"] == "table column"] | |
| if not top: | |
| res.append(lts) | |
| continue | |
| top = np.median(top) if len(top) > 4 else np.min(top) | |
| bottom = np.median(bottom) if len(bottom) > 4 else np.max(bottom) | |
| for b in lts: | |
| if b["label"] == "table column": | |
| if b["top"] > top: | |
| b["top"] = top | |
| if b["bottom"] < bottom: | |
| b["bottom"] = bottom | |
| res.append(lts) | |
| return res | |
def _table_transformer_job(self, ZM):
    """Detect table structure and tag every box inside a table layout.

    Crops each "table" layout from the page images, runs the component
    detector on the crops, maps the detected components back to document
    coordinates in ``self.tb_cpns``, then annotates boxes whose
    "layout_type" is "table" with row ("R"), header ("H"), column ("C") and
    spanning-cell ("SP") indices and extents.

    Args:
        ZM: zoom factor between layout coordinates and page-image pixels.
    """
    logging.info("Table processing...")
    imgs, pos = [], []
    # tbcnt[k + 1] is the number of tables on page k; the leading 0 makes the
    # cumulative sum below usable as slice boundaries.
    tbcnt = [0]
    MARGIN = 10
    self.tb_cpns = []
    assert len(self.page_layout) == len(self.page_images)
    for p, tbls in enumerate(self.page_layout):  # for page
        tbls = [f for f in tbls if f["type"] == "table"]
        tbcnt.append(len(tbls))
        if not tbls:
            continue
        for tb in tbls:  # for table
            # Pad the crop, then convert to image pixels.
            left, top, right, bott = tb["x0"] - MARGIN, tb["top"] - MARGIN, \
                tb["x1"] + MARGIN, tb["bottom"] + MARGIN
            left *= ZM
            top *= ZM
            right *= ZM
            bott *= ZM
            pos.append((left, top))
            imgs.append(self.page_images[p].crop((left, top, right, bott)))

    assert len(self.page_images) == len(tbcnt) - 1
    if not imgs:
        return
    recos = self.__table_paddle(imgs)
    tbcnt = np.cumsum(tbcnt)
    for i in range(len(tbcnt) - 1):  # for page
        pg = []
        for j, tb_items in enumerate(
                recos[tbcnt[i]: tbcnt[i + 1]]):  # for table
            poss = pos[tbcnt[i]: tbcnt[i + 1]]
            for it in tb_items:  # for table components
                # crop-local pixels -> page pixels
                it["x0"] = (it["x0"] + poss[j][0])
                it["x1"] = (it["x1"] + poss[j][0])
                it["top"] = (it["top"] + poss[j][1])
                it["bottom"] = (it["bottom"] + poss[j][1])
                # page pixels -> layout units
                for n in ["x0", "x1", "top", "bottom"]:
                    it[n] /= ZM
                # page-local -> cumulative document Y
                it["top"] += self.page_cum_height[i]
                it["bottom"] += self.page_cum_height[i]
                it["pn"] = i
                it["layoutno"] = j
                pg.append(it)
        self.tb_cpns.extend(pg)

    def gather(kwd, fzy=10, ption=0.6):
        # Components whose label matches `kwd`, de-duplicated and Y-sorted.
        eles = self.sort_Y_firstly(
            [r for r in self.tb_cpns if re.match(kwd, r["label"])], fzy)
        eles = self.__layouts_cleanup(self.boxes, eles, 5, ption)
        return self.sort_Y_firstly(eles, 0)

    # add R,H,C,SP tag to boxes within table layout
    headers = gather(r".*header$")
    rows = gather(r".* (row|header)")
    spans = gather(r".*spanning")
    clmns = sorted([r for r in self.tb_cpns if re.match(
        r"table column$", r["label"])], key=lambda x: (x["pn"], x["layoutno"], x["x0"]))
    clmns = self.__layouts_cleanup(self.boxes, clmns, 5, 0.5)
    for b in self.boxes:
        if b.get("layout_type", "") != "table":
            continue
        ii = self.__find_overlapped_with_threashold(b, rows, thr=0.3)
        if ii is not None:
            b["R"] = ii
            b["R_top"] = rows[ii]["top"]
            b["R_bott"] = rows[ii]["bottom"]

        ii = self.__find_overlapped_with_threashold(b, headers, thr=0.3)
        if ii is not None:
            b["H_top"] = headers[ii]["top"]
            b["H_bott"] = headers[ii]["bottom"]
            b["H_left"] = headers[ii]["x0"]
            b["H_right"] = headers[ii]["x1"]
            b["H"] = ii

        ii = self.__find_overlapped_with_threashold(b, clmns, thr=0.3)
        if ii is not None:
            b["C"] = ii
            b["C_left"] = clmns[ii]["x0"]
            b["C_right"] = clmns[ii]["x1"]

        # A spanning cell overwrites the H_* extents written for a header.
        ii = self.__find_overlapped_with_threashold(b, spans, thr=0.3)
        if ii is not None:
            b["H_top"] = spans[ii]["top"]
            b["H_bott"] = spans[ii]["bottom"]
            b["H_left"] = spans[ii]["x0"]
            b["H_right"] = spans[ii]["x1"]
            b["SP"] = ii
def __ocr_paddle(self, pagenum, img, chars, ZM=3):
    """OCR one page image and fill the detected text rectangles with the
    characters extracted from the PDF; appends the result to ``self.boxes``.

    Args:
        pagenum: page number stored on each box; ``pagenum - 1`` indexes
            ``self.mean_width``.
        img: page image handed to the OCR engine.
        chars: character dicts with "x0", "top", "bottom", "text".
        ZM: zoom factor of ``img``; OCR coordinates are divided by it.
    """
    bxs = self.ocr.ocr(np.array(img), cls=True)[0]
    if not bxs:
        # Keep self.boxes aligned with pages even when nothing was detected.
        self.boxes.append([])
        return
    # (quadrilateral, recognised text) pairs
    bxs = [(line[0], line[1][0]) for line in bxs]
    # Keep only quadrilaterals whose corner order is sane; the OCR text is
    # parked in "txt" and "text" starts empty.
    bxs = self.sort_Y_firstly(
        [{"x0": b[0][0] / ZM, "x1": b[1][0] / ZM,
          "top": b[0][1] / ZM, "text": "", "txt": t,
          "bottom": b[-1][1] / ZM,
          "page_number": pagenum} for b, t in bxs if b[0][0] <= b[1][0] and b[0][1] <= b[-1][1]],
        self.mean_height[-1] / 3
    )

    # merge chars in the same rect
    for c in self.sort_X_firstly(chars, self.mean_width[pagenum - 1] // 4):
        ii = self.__find_overlapped(c, bxs)
        if ii is None:
            self.lefted_chars.append(c)
            continue
        ch = c["bottom"] - c["top"]
        bh = bxs[ii]["bottom"] - bxs[ii]["top"]
        # A character much taller or shorter than the rectangle is set aside.
        if abs(ch - bh) / max(ch, bh) >= 0.7 and c["text"] != ' ':
            self.lefted_chars.append(c)
            continue
        # A space is only appended after a Latin letter, digit or punctuation.
        if c["text"] == " " and bxs[ii]["text"]:
            if re.match(r"[0-9a-zA-Z,.?;:!%%]", bxs[ii]["text"][-1]): bxs[ii]["text"] += " "
        else:
            bxs[ii]["text"] += c["text"]

    # Rectangles that received no PDF characters fall back to the OCR text.
    for b in bxs:
        if not b["text"]:
            b["text"] = b["txt"]
        del b["txt"]
    if self.mean_height[-1] == 0:
        self.mean_height[-1] = np.median([b["bottom"] - b["top"]
                                          for b in bxs])

    self.boxes.append(bxs)
def _layouts_paddle(self, ZM):
    """Run layout detection on every page and tag each text box with the
    layout region it belongs to.

    Replaces ``self.boxes`` (one list per page) with a single flat list in
    which each box has "layout_type" and, when matched, "layoutno". Boxes
    matched to footer/header/reference regions are removed, and their texts
    are recorded so texts seen more than once are filtered everywhere.
    Finally "top"/"bottom" are shifted into cumulative document coordinates.

    Args:
        ZM: zoom factor of the page images; detector coordinates are divided
            by it.
    """
    assert len(self.page_images) == len(self.boxes)
    # Tag layout type
    boxes = []
    layouts = self.layouter(self.page_images)
    assert len(self.page_images) == len(layouts)
    for pn, lts in enumerate(layouts):
        bxs = self.boxes[pn]
        # NOTE(review): "page_number" here is the 0-based enumerate index,
        # while the cumulative-height lookup at the end of this method uses
        # page_number - 1 -- confirm the figure boxes appended below use the
        # intended base.
        lts = [{"type": b["type"],
                "score": float(b["score"]),
                "x0": b["bbox"][0] / ZM, "x1": b["bbox"][2] / ZM,
                "top": b["bbox"][1] / ZM, "bottom": b["bbox"][-1] / ZM,
                "page_number": pn,
                } for b in lts]
        lts = self.sort_Y_firstly(lts, self.mean_height[pn] / 2)
        lts = self.__layouts_cleanup(bxs, lts)
        self.page_layout.append(lts)

        # Tag layout type, layouts are ready
        def findLayout(ty):
            nonlocal bxs, lts
            lts_ = [lt for lt in lts if lt["type"] == ty]
            i = 0
            while i < len(bxs):
                # Already assigned by an earlier (higher-priority) type.
                if bxs[i].get("layout_type"):
                    i += 1
                    continue
                if self._is_garbage(bxs[i]):
                    logging.debug("GARBAGE: " + bxs[i]["text"])
                    bxs.pop(i)
                    continue

                ii = self.__find_overlapped_with_threashold(bxs[i], lts_,
                                                            thr=0.4)
                if ii is None:  # belong to nothing
                    bxs[i]["layout_type"] = ""
                    i += 1
                    continue
                lts_[ii]["visited"] = True
                if lts_[ii]["type"] in ["footer", "header", "reference"]:
                    if lts_[ii]["type"] not in self.garbages:
                        self.garbages[lts_[ii]["type"]] = []
                    self.garbages[lts_[ii]["type"]].append(bxs[i]["text"])
                    logging.debug("GARBAGE: " + bxs[i]["text"])
                    bxs.pop(i)
                    continue

                bxs[i]["layoutno"] = f"{ty}-{ii}"
                bxs[i]["layout_type"] = lts_[ii]["type"]
                i += 1

        # Order matters: a box keeps the first type that claims it.
        for lt in ["footer", "header", "reference", "figure caption",
                   "table caption", "title", "text", "table", "figure"]:
            findLayout(lt)

        # add box to figure layouts which has not text box
        for i, lt in enumerate(
                [lt for lt in lts if lt["type"] == "figure"]):
            if lt.get("visited"):
                continue
            lt = deepcopy(lt)
            del lt["type"]
            lt["text"] = ""
            lt["layout_type"] = "figure"
            lt["layoutno"] = f"figure-{i}"
            bxs.append(lt)

        boxes.extend(bxs)

    self.boxes = boxes

    # Texts collected from footer/header/reference regions that occur more
    # than once are removed from every remaining box.
    garbage = set()
    for k in self.garbages.keys():
        self.garbages[k] = Counter(self.garbages[k])
        for g, c in self.garbages[k].items():
            if c > 1:
                garbage.add(g)

    logging.debug("GARBAGE:" + ",".join(garbage))
    self.boxes = [b for b in self.boxes if b["text"].strip() not in garbage]

    # cumulative Y
    for i in range(len(self.boxes)):
        self.boxes[i]["top"] += \
            self.page_cum_height[self.boxes[i]["page_number"] - 1]
        self.boxes[i]["bottom"] += \
            self.page_cum_height[self.boxes[i]["page_number"] - 1]
| def _text_merge(self): | |
| # merge adjusted boxes | |
| bxs = self.boxes | |
| def end_with(b, txt): | |
| txt = txt.strip() | |
| tt = b.get("text", "").strip() | |
| return tt and tt.find(txt) == len(tt) - len(txt) | |
| def start_with(b, txts): | |
| tt = b.get("text", "").strip() | |
| return tt and any([tt.find(t.strip()) == 0 for t in txts]) | |
| # horizontally merge adjacent box with the same layout | |
| i = 0 | |
| while i < len(bxs) - 1: | |
| b = bxs[i] | |
| b_ = bxs[i + 1] | |
| if b.get("layoutno", "0") != b_.get("layoutno", "1"): | |
| i += 1 | |
| continue | |
| dis_thr = 1 | |
| dis = b["x1"] - b_["x0"] | |
| if b.get("layout_type", "") != "text" or b_.get( | |
| "layout_type", "") != "text": | |
| if end_with(b, ",") or start_with(b_, "(,"): | |
| dis_thr = -8 | |
| else: | |
| i += 1 | |
| continue | |
| if abs(self._y_dis(b, b_)) < self.mean_height[bxs[i]["page_number"] - 1] / 5 \ | |
| and dis >= dis_thr and b["x1"] < b_["x1"]: | |
| # merge | |
| bxs[i]["x1"] = b_["x1"] | |
| bxs[i]["top"] = (b["top"] + b_["top"]) / 2 | |
| bxs[i]["bottom"] = (b["bottom"] + b_["bottom"]) / 2 | |
| bxs[i]["text"] += b_["text"] | |
| bxs.pop(i + 1) | |
| continue | |
| i += 1 | |
| self.boxes = bxs | |
| def _naive_vertical_merge(self): | |
| bxs = self.sort_Y_firstly(self.boxes, np.median(self.mean_height) / 3) | |
| i = 0 | |
| while i + 1 < len(bxs): | |
| b = bxs[i] | |
| b_ = bxs[i + 1] | |
| if b["page_number"] < b_["page_number"] and re.match(r"[0-9 •一—-]+$", b["text"]): | |
| bxs.pop(i) | |
| continue | |
| concatting_feats = [ | |
| b["text"].strip()[-1] in ",;:'\",、‘“;:-", | |
| len(b["text"].strip()) > 1 and b["text"].strip()[-2] in ",;:'\",‘“、;:", | |
| b["text"].strip()[0] in "。;?!?”)),,、:", | |
| ] | |
| # features for not concating | |
| feats = [ | |
| b.get("layoutno", 0) != b.get("layoutno", 0), | |
| b["text"].strip()[-1] in "。?!?", | |
| self.is_english and b["text"].strip()[-1] in ".!?", | |
| b["page_number"] == b_["page_number"] and b_["top"] - \ | |
| b["bottom"] > self.mean_height[b["page_number"] - 1] * 1.5, | |
| b["page_number"] < b_["page_number"] and abs( | |
| b["x0"] - b_["x0"]) > self.mean_width[b["page_number"] - 1] * 4 | |
| ] | |
| if any(feats) and not any(concatting_feats): | |
| i += 1 | |
| continue | |
| # merge up and down | |
| b["bottom"] = b_["bottom"] | |
| b["text"] += b_["text"] | |
| b["x0"] = min(b["x0"], b_["x0"]) | |
| b["x1"] = max(b["x1"], b_["x1"]) | |
| bxs.pop(i + 1) | |
| self.boxes = bxs | |
def _concat_downward(self, concat_between_pages=True):
    """Chain boxes that continue each other vertically and merge each chain
    into one box.

    First stores on every box, as "in_row", how many neighbouring boxes sit
    on the same visual row. Then, starting from each remaining box, follows
    continuation candidates among the next boxes (same layout region, or
    accepted by ``self.updown_cnt_mdl``) and finally concatenates the texts
    of each chain.

    Args:
        concat_between_pages: when false, a chain never crosses to a box on
            a later page.

    Replaces ``self.boxes`` with the merged, Y-sorted boxes.
    """
    # count boxes in the same row as a feature
    for i in range(len(self.boxes)):
        mh = self.mean_height[self.boxes[i]["page_number"] - 1]
        self.boxes[i]["in_row"] = 0
        # Only a window of 12 boxes on each side is inspected.
        j = max(0, i - 12)
        while j < min(i + 12, len(self.boxes)):
            if j == i:
                j += 1
                continue
            ydis = self._y_dis(self.boxes[i], self.boxes[j]) / mh
            if abs(ydis) < 1:
                self.boxes[i]["in_row"] += 1
            elif ydis > 0:
                # Boxes from here on are at least one line lower.
                break
            j += 1

    # concat between rows
    boxes = deepcopy(self.boxes)
    blocks = []
    while boxes:
        chunks = []

        def dfs(up, dp):
            # Append `up` to the current chain, then look for its
            # continuation among boxes[dp : dp + 12].
            chunks.append(up)
            i = dp
            while i < min(dp + 12, len(boxes)):
                ydis = self._y_dis(up, boxes[i])
                smpg = up["page_number"] == boxes[i]["page_number"]
                mh = self.mean_height[up["page_number"] - 1]
                mw = self.mean_width[up["page_number"] - 1]
                # Too far below: stop searching for this chain.
                if smpg and ydis > mh * 4:
                    break
                if not smpg and ydis > mh * 16:
                    break
                down = boxes[i]
                if not concat_between_pages and down["page_number"] > up["page_number"]:
                    break

                # Different table rows are only joined after a comma.
                if up.get("R", "") != down.get(
                        "R", "") and up["text"][-1] != ",":
                    i += 1
                    continue

                # Skip "NN/NNN"-style counters.
                if re.match(r"[0-9]{2,3}/[0-9]{3}$", up["text"]) \
                        or re.match(r"[0-9]{2,3}/[0-9]{3}$", down["text"]):
                    i += 1
                    continue

                if not down["text"].strip():
                    i += 1
                    continue

                # Horizontally too far apart (more than 10 char widths).
                if up["x1"] < down["x0"] - 10 * \
                        mw or up["x0"] > down["x1"] + 10 * mw:
                    i += 1
                    continue

                # Near candidates of a text block are decided by layout
                # region alone, without consulting the model.
                if i - dp < 5 and up.get("layout_type") == "text":
                    if up.get("layoutno", "1") == down.get(
                            "layoutno", "2"):
                        dfs(down, i + 1)
                        # Remove `down` only after the recursion so indices
                        # used inside it stay valid.
                        boxes.pop(i)
                        return
                    i += 1
                    continue

                fea = self._updown_concat_features(up, down)
                if self.updown_cnt_mdl.predict(
                        xgb.DMatrix([fea]))[0] <= 0.5:
                    i += 1
                    continue
                dfs(down, i + 1)
                boxes.pop(i)
                return

        dfs(boxes[0], 1)
        boxes.pop(0)
        if chunks:
            blocks.append(chunks)

    # concat within each block
    boxes = []
    for b in blocks:
        if len(b) == 1:
            boxes.append(b[0])
            continue
        t = b[0]
        for c in b[1:]:
            t["text"] = t["text"].strip()
            c["text"] = c["text"].strip()
            if not c["text"]:
                continue
            # Insert a space when the two compared characters are both Latin
            # letters, digits or dots.
            if t["text"] and re.match(
                    r"[0-9\.a-zA-Z]+$", t["text"][-1] + c["text"][-1]):
                t["text"] += " "
            t["text"] += c["text"]
            t["x0"] = min(t["x0"], c["x0"])
            t["x1"] = max(t["x1"], c["x1"])
            t["page_number"] = min(t["page_number"], c["page_number"])
            t["bottom"] = c["bottom"]
            if not t["layout_type"] \
                    and c["layout_type"]:
                t["layout_type"] = c["layout_type"]
        boxes.append(t)

    self.boxes = self.sort_Y_firstly(boxes, 0)
| def _filter_forpages(self): | |
| if not self.boxes: | |
| return | |
| findit = False | |
| i = 0 | |
| while i < len(self.boxes): | |
| if not re.match(r"(contents|目录|目次|table of contents|致谢|acknowledge)$", re.sub(r"( | |\u3000)+", "", self.boxes[i]["text"].lower())): | |
| i += 1 | |
| continue | |
| findit = True | |
| eng = re.match(r"[0-9a-zA-Z :'.-]{5,}", self.boxes[i]["text"].strip()) | |
| self.boxes.pop(i) | |
| if i >= len(self.boxes): break | |
| prefix = self.boxes[i]["text"].strip()[:3] if not eng else " ".join(self.boxes[i]["text"].strip().split(" ")[:2]) | |
| while not prefix: | |
| self.boxes.pop(i) | |
| if i >= len(self.boxes): break | |
| prefix = self.boxes[i]["text"].strip()[:3] if not eng else " ".join(self.boxes[i]["text"].strip().split(" ")[:2]) | |
| self.boxes.pop(i) | |
| if i >= len(self.boxes) or not prefix: break | |
| for j in range(i, min(i + 128, len(self.boxes))): | |
| if not re.match(prefix, self.boxes[j]["text"]): | |
| continue | |
| for k in range(i, j): self.boxes.pop(i) | |
| break | |
| if findit:return | |
| page_dirty = [0] * len(self.page_images) | |
| for b in self.boxes: | |
| if re.search(r"(··|··|··)", b["text"]): | |
| page_dirty[b["page_number"]-1] += 1 | |
| page_dirty = set([i+1 for i, t in enumerate(page_dirty) if t > 3]) | |
| if not page_dirty: return | |
| i = 0 | |
| while i < len(self.boxes): | |
| if self.boxes[i]["page_number"] in page_dirty: | |
| self.boxes.pop(i) | |
| continue | |
| i += 1 | |
| def _merge_with_same_bullet(self): | |
| i = 0 | |
| while i + 1 < len(self.boxes): | |
| b = self.boxes[i] | |
| b_ = self.boxes[i + 1] | |
| if not b["text"].strip(): | |
| self.boxes.pop(i) | |
| continue | |
| if not b_["text"].strip(): | |
| self.boxes.pop(i+1) | |
| continue | |
| if b["text"].strip()[0] != b_["text"].strip()[0] \ | |
| or b["text"].strip()[0].lower() in set("qwertyuopasdfghjklzxcvbnm") \ | |
| or huqie.is_chinese(b["text"].strip()[0]) \ | |
| or b["top"] > b_["bottom"]: | |
| i += 1 | |
| continue | |
| b_["text"] = b["text"] + "\n" + b_["text"] | |
| b_["x0"] = min(b["x0"], b_["x0"]) | |
| b_["x1"] = max(b["x1"], b_["x1"]) | |
| b_["top"] = b["top"] | |
| self.boxes.pop(i) | |
| def _blockType(self, b): | |
| patt = [ | |
| ("^(20|19)[0-9]{2}[年/-][0-9]{1,2}[月/-][0-9]{1,2}日*$", "Dt"), | |
| (r"^(20|19)[0-9]{2}年$", "Dt"), | |
| (r"^(20|19)[0-9]{2}[年-][0-9]{1,2}月*$", "Dt"), | |
| ("^[0-9]{1,2}[月-][0-9]{1,2}日*$", "Dt"), | |
| (r"^第*[一二三四1-4]季度$", "Dt"), | |
| (r"^(20|19)[0-9]{2}年*[一二三四1-4]季度$", "Dt"), | |
| (r"^(20|19)[0-9]{2}[ABCDE]$", "Dt"), | |
| ("^[0-9.,+%/ -]+$", "Nu"), | |
| (r"^[0-9A-Z/\._~-]+$", "Ca"), | |
| (r"^[A-Z]*[a-z' -]+$", "En"), | |
| (r"^[0-9.,+-]+[0-9A-Za-z/$¥%<>()()' -]+$", "NE"), | |
| (r"^.{1}$", "Sg") | |
| ] | |
| for p, n in patt: | |
| if re.search(p, b["text"].strip()): | |
| return n | |
| tks = [t for t in huqie.qie(b["text"]).split(" ") if len(t) > 1] | |
| if len(tks) > 3: | |
| if len(tks) < 12: | |
| return "Tx" | |
| else: | |
| return "Lx" | |
| if len(tks) == 1 and huqie.tag(tks[0]) == "nr": | |
| return "Nr" | |
| return "Ot" | |
| def __cal_spans(self, boxes, rows, cols, tbl, html=True): | |
| # caculate span | |
| clft = [np.mean([c.get("C_left", c["x0"]) for c in cln]) | |
| for cln in cols] | |
| crgt = [np.mean([c.get("C_right", c["x1"]) for c in cln]) | |
| for cln in cols] | |
| rtop = [np.mean([c.get("R_top", c["top"]) for c in row]) | |
| for row in rows] | |
| rbtm = [np.mean([c.get("R_btm", c["bottom"]) | |
| for c in row]) for row in rows] | |
| for b in boxes: | |
| if "SP" not in b: | |
| continue | |
| b["colspan"] = [b["cn"]] | |
| b["rowspan"] = [b["rn"]] | |
| # col span | |
| for j in range(0, len(clft)): | |
| if j == b["cn"]: | |
| continue | |
| if clft[j] + (crgt[j] - clft[j]) / 2 < b["H_left"]: | |
| continue | |
| if crgt[j] - (crgt[j] - clft[j]) / 2 > b["H_right"]: | |
| continue | |
| b["colspan"].append(j) | |
| # row span | |
| for j in range(0, len(rtop)): | |
| if j == b["rn"]: | |
| continue | |
| if rtop[j] + (rbtm[j] - rtop[j]) / 2 < b["H_top"]: | |
| continue | |
| if rbtm[j] - (rbtm[j] - rtop[j]) / 2 > b["H_bott"]: | |
| continue | |
| b["rowspan"].append(j) | |
| def join(arr): | |
| if not arr: | |
| return "" | |
| return "".join([t["text"] for t in arr]) | |
| # rm the spaning cells | |
| for i in range(len(tbl)): | |
| for j, arr in enumerate(tbl[i]): | |
| if not arr: | |
| continue | |
| if all(["rowspan" not in a and "colspan" not in a for a in arr]): | |
| continue | |
| rowspan, colspan = [], [] | |
| for a in arr: | |
| if isinstance(a.get("rowspan", 0), list): | |
| rowspan.extend(a["rowspan"]) | |
| if isinstance(a.get("colspan", 0), list): | |
| colspan.extend(a["colspan"]) | |
| rowspan, colspan = set(rowspan), set(colspan) | |
| if len(rowspan) < 2 and len(colspan) < 2: | |
| for a in arr: | |
| if "rowspan" in a: | |
| del a["rowspan"] | |
| if "colspan" in a: | |
| del a["colspan"] | |
| continue | |
| rowspan, colspan = sorted(rowspan), sorted(colspan) | |
| rowspan = list(range(rowspan[0], rowspan[-1] + 1)) | |
| colspan = list(range(colspan[0], colspan[-1] + 1)) | |
| assert i in rowspan, rowspan | |
| assert j in colspan, colspan | |
| arr = [] | |
| for r in rowspan: | |
| for c in colspan: | |
| arr_txt = join(arr) | |
| if tbl[r][c] and join(tbl[r][c]) != arr_txt: | |
| arr.extend(tbl[r][c]) | |
| tbl[r][c] = None if html else arr | |
| for a in arr: | |
| if len(rowspan) > 1: | |
| a["rowspan"] = len(rowspan) | |
| elif "rowspan" in a: | |
| del a["rowspan"] | |
| if len(colspan) > 1: | |
| a["colspan"] = len(colspan) | |
| elif "colspan" in a: | |
| del a["colspan"] | |
| tbl[rowspan[0]][colspan[0]] = arr | |
| return tbl | |
def __construct_table(self, boxes, html=False):
    """Arrange the text boxes of one table into a row/column grid and render it.

    Steps, in order:
      1. Caption boxes (per ``self.is_caption``) are popped from ``boxes`` (the
         caller's list is mutated) and their text is concatenated into the caption.
      2. Every remaining box gets a ``"btype"`` from ``self._blockType`` and the most
         frequent type is remembered.
      3. Boxes are sorted and grouped into rows (each box gets ``"rn"``), then sorted
         again and grouped into columns (each box gets ``"cn"``).
      4. A grid ``tbl[row][col] -> list of boxes`` is filled.
      5. Columns / rows that hold a single populated cell are merged into a
         neighbouring column / row when the table is large enough.
      6. Header rows are detected, spans are computed by ``__cal_spans`` and the grid
         is rendered by ``__html_table`` or ``__desc_table``.

    Args:
        boxes: list of box dicts. This method reads "text", "top", "bottom", "x0",
            "x1" and "page_number", plus the optional keys "R", "R_top", "R_bott",
            "C", "C_left", "C_right" and "H".
            NOTE(review): the optional keys are presumably row / column / header
            annotations attached by the table-structure step -- confirm.
        html: when true the table is rendered as HTML, otherwise as text lines.

    Returns:
        ``[]`` if no box is left after removing captions; a one-element list holding
        the HTML string when ``html`` is true; otherwise the result of
        ``__desc_table``.
    """
    cap = ""
    i = 0
    # Strip caption boxes out of the list, accumulating their text.
    while i < len(boxes):
        if self.is_caption(boxes[i]):
            cap += boxes[i]["text"]
            boxes.pop(i)
            i -= 1  # compensate for the removal so the next element is not skipped
        i += 1
    if not boxes:
        return []
    for b in boxes:
        b["btype"] = self._blockType(b)
    # Most common block type among the cells; "" cannot actually occur here because
    # boxes is non-empty, but the guard keeps max() safe.
    max_type = Counter([b["btype"] for b in boxes]).items()
    max_type = max(max_type, key=lambda x: x[1])[0] if max_type else ""
    logging.debug("MAXTYPE: " + max_type)
    # Smallest annotated row height; half of it is the sorting tolerance.
    rowh = [b["R_bott"] - b["R_top"] for b in boxes if "R" in b]
    rowh = np.min(rowh) if rowh else 0
    # boxes = self.sort_Y_firstly(boxes, rowh/5)
    boxes = self.sort_R_firstly(boxes, rowh / 2)
    boxes[0]["rn"] = 0
    rows = [[boxes[0]]]
    btm = boxes[0]["bottom"]  # running bottom edge of the current row
    for b in boxes[1:]:
        b["rn"] = len(rows) - 1
        lst_r = rows[-1]
        # A new row starts when the "R" annotation differs from the previous box.
        # The second clause uses different defaults ("-1" / "-2") so that two
        # un-annotated boxes also count as different once b starts at or below the
        # running bottom (3 units of slack).
        if lst_r[-1].get("R", "") != b.get("R", "") \
                or (b["top"] >= btm - 3 and lst_r[-1].get("R", "-1") != b.get("R", "-2")
                    ):  # new row
            btm = b["bottom"]
            b["rn"] += 1
            rows.append([b])
            continue
        # Same row: blend the running bottom with this box's bottom.
        btm = (btm + b["bottom"]) / 2.
        rows[-1].append(b)
    # Smallest annotated column width; half of it is the sorting tolerance.
    colwm = [b["C_right"] - b["C_left"] for b in boxes if "C" in b]
    colwm = np.min(colwm) if colwm else 0
    crosspage = len(set([b["page_number"] for b in boxes])) > 1
    # Tables spanning several pages are ordered with sort_X_firstly, single-page
    # tables with sort_C_firstly.
    if crosspage:
        boxes = self.sort_X_firstly(boxes, colwm / 2, False)
    else:
        boxes = self.sort_C_firstly(boxes, colwm / 2)
    boxes[0]["cn"] = 0
    cols = [[boxes[0]]]
    right = boxes[0]["x1"]  # running right edge of the current column
    for b in boxes[1:]:
        b["cn"] = len(cols) - 1
        lst_c = cols[-1]
        # A new column starts when the "C" annotation increases by exactly one on the
        # same page, or when the box starts right of the running edge and the "C"
        # annotations differ (un-annotated boxes always differ: "-1" vs "-2").
        if (int(b.get("C", "1")) - int(lst_c[-1].get("C", "1")) == 1 and b["page_number"] == lst_c[-1][
            "page_number"]) \
                or (b["x0"] >= right and lst_c[-1].get("C", "-1") != b.get("C", "-2")):  # new col
            right = b["x1"]
            b["cn"] += 1
            cols.append([b])
            continue
        right = (right + b["x1"]) / 2.
        cols[-1].append(b)
    # Grid of cells; each cell is the list of boxes with that (rn, cn).
    tbl = [[[] for _ in range(len(cols))] for _ in range(len(rows))]
    for b in boxes:
        tbl[b["rn"]][b["cn"]].append(b)
    if len(rows) >= 4:
        # remove single in column
        j = 0
        while j < len(tbl[0]):
            # e counts populated cells in column j (stops counting at 2); ii is the
            # row of the last populated cell seen.
            e, ii = 0, 0
            for i in range(len(tbl)):
                if tbl[i][j]:
                    e += 1
                    ii = i
                if e > 1:
                    break
            if e > 1:
                j += 1
                continue
            # f / ff: truthy when the left / right neighbour in row ii is populated
            # with non-empty text, or when there is no such neighbour (table edge).
            f = (j > 0 and tbl[ii][j - 1] and tbl[ii]
                 [j - 1][0].get("text")) or j == 0
            ff = (j + 1 < len(tbl[ii]) and tbl[ii][j + 1] and tbl[ii]
                  [j + 1][0].get("text")) or j + 1 >= len(tbl[ii])
            if f and ff:
                j += 1
                continue
            bx = tbl[ii][j][0]
            logging.debug("Relocate column single: " + bx["text"])
            # j column only has one value
            # Horizontal gap from bx to the closest box in the left / right column;
            # 100000 is the "no candidate" sentinel.
            left, right = 100000, 100000
            if j > 0 and not f:
                for i in range(len(tbl)):
                    if tbl[i][j - 1]:
                        left = min(left, np.min(
                            [bx["x0"] - a["x1"] for a in tbl[i][j - 1]]))
            if j + 1 < len(tbl[0]) and not ff:
                for i in range(len(tbl)):
                    if tbl[i][j + 1]:
                        right = min(right, np.min(
                            [a["x0"] - bx["x1"] for a in tbl[i][j + 1]]))
            assert left < 100000 or right < 100000
            if left < right:
                # Merge into the left column: every box from column j onwards shifts
                # one column to the left.
                for jj in range(j, len(tbl[0])):
                    for i in range(len(tbl)):
                        for a in tbl[i][jj]:
                            a["cn"] -= 1
                if tbl[ii][j - 1]:
                    tbl[ii][j - 1].extend(tbl[ii][j])
                else:
                    tbl[ii][j - 1] = tbl[ii][j]
                for i in range(len(tbl)):
                    tbl[i].pop(j)
            else:
                # Merge into the right column: only the columns after j are renumbered.
                for jj in range(j + 1, len(tbl[0])):
                    for i in range(len(tbl)):
                        for a in tbl[i][jj]:
                            a["cn"] -= 1
                if tbl[ii][j + 1]:
                    tbl[ii][j + 1].extend(tbl[ii][j])
                else:
                    tbl[ii][j + 1] = tbl[ii][j]
                for i in range(len(tbl)):
                    tbl[i].pop(j)
            # Column j is gone; j is not advanced because the next column now sits at j.
            cols.pop(j)
    assert len(cols) == len(tbl[0]), "Column NO. miss matched: %d vs %d" % (
        len(cols), len(tbl[0]))
    if len(cols) >= 4:
        # remove single in row
        i = 0
        while i < len(tbl):
            # e counts populated cells in row i (stops counting at 2); jj is the
            # column of the last populated cell seen.
            e, jj = 0, 0
            for j in range(len(tbl[i])):
                if tbl[i][j]:
                    e += 1
                    jj = j
                if e > 1:
                    break
            if e > 1:
                i += 1
                continue
            # f / ff: truthy when the cell above / below in column jj is populated
            # with non-empty text, or when there is no such neighbour (table edge).
            f = (i > 0 and tbl[i - 1][jj] and tbl[i - 1]
                 [jj][0].get("text")) or i == 0
            ff = (i + 1 < len(tbl) and tbl[i + 1][jj] and tbl[i + 1]
                  [jj][0].get("text")) or i + 1 >= len(tbl)
            if f and ff:
                i += 1
                continue
            bx = tbl[i][jj][0]
            logging.debug("Relocate row single: " + bx["text"])
            # i row only has one value
            # Vertical gap from bx to the closest box in the row above / below;
            # 100000 is the "no candidate" sentinel.
            up, down = 100000, 100000
            if i > 0 and not f:
                for j in range(len(tbl[i - 1])):
                    if tbl[i - 1][j]:
                        up = min(up, np.min(
                            [bx["top"] - a["bottom"] for a in tbl[i - 1][j]]))
            if i + 1 < len(tbl) and not ff:
                for j in range(len(tbl[i + 1])):
                    if tbl[i + 1][j]:
                        down = min(down, np.min(
                            [a["top"] - bx["bottom"] for a in tbl[i + 1][j]]))
            assert up < 100000 or down < 100000
            if up < down:
                # Merge into the row above: every box from row i onwards moves up one row.
                for ii in range(i, len(tbl)):
                    for j in range(len(tbl[ii])):
                        for a in tbl[ii][j]:
                            a["rn"] -= 1
                if tbl[i - 1][jj]:
                    tbl[i - 1][jj].extend(tbl[i][jj])
                else:
                    tbl[i - 1][jj] = tbl[i][jj]
                tbl.pop(i)
            else:
                # Merge into the row below: only the rows after i are renumbered.
                for ii in range(i + 1, len(tbl)):
                    for j in range(len(tbl[ii])):
                        for a in tbl[ii][j]:
                            a["rn"] -= 1
                if tbl[i + 1][jj]:
                    tbl[i + 1][jj].extend(tbl[i][jj])
                else:
                    tbl[i + 1][jj] = tbl[i][jj]
                tbl.pop(i)
            # Row i is gone; i is not advanced because the next row now sits at i.
            rows.pop(i)
    # which rows are headers
    hdset = set([])
    for i in range(len(tbl)):
        # cnt: populated cells in the row; h: cells that look like header cells.
        cnt, h = 0, 0
        for j, arr in enumerate(tbl[i]):
            if not arr:
                continue
            cnt += 1
            # In a table whose dominant type is "Nu", a "Nu" cell never counts as header.
            if max_type == "Nu" and arr[0]["btype"] == "Nu":
                continue
            # A cell counts as header when any of its boxes carries a truthy "H", or
            # when it is a non-"Nu" cell inside a mostly-"Nu" table.
            if any([a.get("H") for a in arr]) \
                    or (max_type == "Nu" and arr[0]["btype"] != "Nu"):
                h += 1
        # NOTE(review): divides by cnt; assumes every remaining row has at least one
        # populated cell -- confirm.
        if h / cnt > 0.5:
            hdset.add(i)
    if html:
        return [self.__html_table(cap, hdset,
                                  self.__cal_spans(boxes, rows,
                                                   cols, tbl, True)
                                  )]
    return self.__desc_table(cap, hdset,
                             self.__cal_spans(boxes, rows, cols, tbl, False))
def __html_table(self, cap, hdset, tbl):
    """Render a cell grid as an HTML ``<table>`` string.

    Args:
        cap: caption text; emitted as ``<caption>`` when non-empty.
        hdset: set of header row indices. It is mutated: the cell texts of every
            emitted header row are added, so a later header row whose texts were all
            seen already is dropped from the output.
        tbl: grid ``tbl[row][col]``; a cell is ``None`` (covered by a span, emits
            nothing), an empty list (empty cell) or a list of box dicts.

    Returns:
        The HTML markup, one ``<tr>`` per line.
    """
    pieces = ["<table>"]
    if cap:
        pieces.append(f"<caption>{cap}</caption>")
    for row_idx, cells in enumerate(tbl):
        is_header = row_idx in hdset
        tag = "th" if is_header else "td"
        rendered = []
        cell_texts = []
        for cell in cells:
            if cell is None:
                continue
            if not cell:
                rendered.append(f"<{tag}></{tag}>")
                continue
            # Vertical tolerance for ordering the boxes inside the cell: half of the
            # smallest box height, capped at half of the page's mean height.
            half_height = min(
                np.min([c["bottom"] - c["top"] for c in cell]) / 2,
                self.mean_height[cell[0]["page_number"] - 1] / 2)
            text = "".join(
                [c["text"] for c in self.sort_Y_firstly(cell, half_height)])
            cell_texts.append(text)
            first = cell[0]
            span = ""
            if first.get("colspan"):
                span = "colspan={}".format(first["colspan"])
            if first.get("rowspan"):
                span += " rowspan={}".format(first["rowspan"])
            rendered.append(f"<{tag} {span} >{text}</{tag}>")
        if is_header:
            # Skip a header row that only repeats texts already emitted as headers.
            if all(t in hdset for t in cell_texts):
                continue
            hdset.update(cell_texts)
        row_html = "<tr>" + "".join(rendered) + "</tr>" if rendered else ""
        pieces.append("\n" + row_html)
    pieces.append("\n</table>")
    return "".join(pieces)
def __desc_table(self, cap, hdr_rowno, tbl):
    """Flatten a table grid into ``header:value`` text lines.

    Args:
        cap: caption text; when non-empty a provenance suffix naming the caption is
            appended to every returned line.
        hdr_rowno: set of header row indices. Header rows without any text are
            removed from this set (the caller's set is mutated).
        tbl: grid ``tbl[row][col]``; a cell is falsy (empty / ``None``) or a list of
            box dicts with a ``"text"`` key.

    Returns:
        A list of strings. Rows governed by a header produce one
        ``"hdr:val; hdr:val"`` line each; rows of header-less tables with at most
        two columns are joined with ``":"`` and packed together (newline-separated)
        while the packed line stays under 64 characters.
    """
    n_cols = len(tbl[0])
    n_rows = len(tbl)
    joiner = "的" if not self.is_english else " for "

    def cell_text(cell):
        return "".join([a["text"].strip() for a in cell])

    # Column titles of every header row; blanks are filled from the previous
    # header row that had any text.
    headers = {}
    inherited = []
    for r in sorted(list(hdr_rowno)):
        titles = ["" for _ in range(n_cols)]
        headers[r] = titles
        for c in range(n_cols):
            if tbl[r][c]:
                titles[c] = cell_text(tbl[r][c])
        if not any(titles):
            del headers[r]
            hdr_rowno.remove(r)
            continue
        for c in range(n_cols):
            if titles[c]:
                continue
            if c >= len(inherited):
                break
            titles[c] = inherited[c]
        inherited = titles

    # Consecutive header rows: fold the upper title into the lower one.
    for top in range(n_rows):
        if top not in hdr_rowno:
            continue
        for below in range(top + 1, n_rows):
            if below not in hdr_rowno:
                break
            upper, lower = headers[below - 1], headers[below]
            for c in range(n_cols):
                parent = upper[c]
                if not parent or parent in lower[c]:
                    continue
                if len(lower[c]) > len(parent):
                    lower[c] += joiner + parent
                else:
                    lower[c] = parent + joiner + lower[c]

    logging.debug(
        f">>>>>>>>>>>>>>>>>{cap}:SIZE:{n_rows}X{n_cols} Header: {hdr_rowno}")

    row_txt = []
    for i in range(n_rows):
        if i in hdr_rowno:
            continue
        # Closest header row above this row; falls back to row 0.
        preceding = [h for h in headers if h < i]
        r = max(preceding) if preceding else 0
        values = [(j, cell_text(tbl[i][j]))
                  for j in range(n_cols) if tbl[i][j]]
        values = [(j, t) for j, t in values if t]
        if r not in headers and n_cols <= 2:
            # Header-less narrow table: treat the row as "key:value".
            if values:
                merged = ":".join(t for _, t in values)
                if row_txt and len(row_txt[-1]) + len(merged) < 64:
                    row_txt[-1] += "\n" + merged
                else:
                    row_txt.append(merged)
            continue
        parts = []
        for j, t in values:
            label = headers[r][j] if r in headers else ""
            parts.append(label + ":" + t if label else t)
        if parts:
            row_txt.append("; ".join(parts))

    if cap:
        from_ = " in " if self.is_english else "来自"
        row_txt = [t + f"\t——{from_}“{cap}”" for t in row_txt]
    return row_txt
@staticmethod
def is_caption(bx):
    """Tell whether a text box is a figure/table caption.

    A box is a caption when its stripped text starts with "图"/"表" followed by at
    least two characters out of space, digit or colon (ASCII or full-width), or when
    its ``"layout_type"`` contains the word "caption".

    Declared as a static method because it takes no ``self`` but is invoked as
    ``self.is_caption(box)`` throughout the class.

    Args:
        bx: box dict with a ``"text"`` key; ``"layout_type"`` is optional and a
            missing value is treated as an empty layout type.

    Returns:
        bool
    """
    patt = [
        r"[图表]+[ 0-9:：]{2,}"
    ]
    text = bx["text"].strip()
    if any(re.match(p, text) for p in patt):
        return True
    # .get(): boxes that never received a layout must not raise KeyError here.
    return "caption" in (bx.get("layout_type") or "")
def _extract_table_figure(self, need_image, ZM, return_html):
    """Pull table and figure boxes out of ``self.boxes`` and render them.

    ``self.boxes`` is mutated: table boxes, figure boxes (only when ``need_image``
    is true), "source: ..." footnotes inside them and the captions attached to them
    are removed from the list.

    Args:
        need_image: when false, figure boxes are left in ``self.boxes``.
        ZM: zoom factor between box coordinates and pixels of ``self.page_images``.
        return_html: tables are rendered as HTML and figure text is wrapped in
            ``<p>`` when true; otherwise tables become text lines.

    Returns:
        A list of ``(image, content)`` tuples: the cropped picture of the figure or
        table and, respectively, a one-element list with the figure text or the
        result of ``__construct_table``.
    """
    # "data source:" style footnotes inside a table/figure; ASCII or full-width colon.
    source_note = r"(数据|资料|图表)*来源[:： ]"

    tables = {}
    figures = {}
    # extract figure and table boxes
    i = 0
    lst_lout_no = ""
    nomerge_lout_no = []
    while i < len(self.boxes):
        box = self.boxes[i]
        if "layoutno" not in box:
            i += 1
            continue
        lout_no = str(box["page_number"]) + "-" + str(box["layoutno"])
        # A caption/title/reference after a table or figure means that layout ends
        # here and must not be merged with a table on the following page.
        if self.is_caption(box) or box["layout_type"] in ["table caption", "title",
                                                          "figure caption", "reference"]:
            nomerge_lout_no.append(lst_lout_no)
        if box["layout_type"] == "table":
            if re.match(source_note, box["text"]):
                self.boxes.pop(i)
                continue
            tables.setdefault(lout_no, []).append(box)
            self.boxes.pop(i)
            lst_lout_no = lout_no
            continue
        if need_image and box["layout_type"] == "figure":
            if re.match(source_note, box["text"]):
                self.boxes.pop(i)
                continue
            figures.setdefault(lout_no, []).append(box)
            self.boxes.pop(i)
            lst_lout_no = lout_no
            continue
        i += 1

    # merge table on different pages
    nomerge_lout_no = set(nomerge_lout_no)
    tbls = sorted([(k, bxs) for k, bxs in tables.items()],
                  key=lambda x: (x[1][0]["top"], x[1][0]["x0"]))
    # Walk backwards so a chain of continued tables collapses into the first one.
    i = len(tbls) - 1
    while i - 1 >= 0:
        k0, bxs0 = tbls[i - 1]
        k, bxs = tbls[i]
        i -= 1
        if k0 in nomerge_lout_no:
            continue
        if bxs[0]["page_number"] == bxs0[0]["page_number"]:
            continue
        if bxs[0]["page_number"] - bxs0[0]["page_number"] > 1:
            continue
        mh = self.mean_height[bxs[0]["page_number"] - 1]
        if self._y_dis(bxs0[-1], bxs[0]) > mh * 23:
            continue
        tables[k0].extend(tables[k])
        del tables[k]

    def x_overlapped(a, b):
        return not any([a["x1"] < b["x0"], a["x0"] > b["x1"]])

    def nearest(caption, layouts):
        """Return (key, squared distance) of the layout closest to ``caption``."""
        mink = ""
        minv = 1000000000
        for k, bxs in layouts.items():
            # Only the first ten boxes of a layout are compared.
            for b in bxs[:10]:
                if b.get("layout_type", "").find("caption") >= 0:
                    continue
                y_dis = self._y_dis(caption, b)
                x_dis = self._x_dis(caption, b) \
                    if not x_overlapped(caption, b) else 0
                dis = y_dis * y_dis + x_dis * x_dis
                if dis < minv:
                    mink = k
                    minv = dis
        return mink, minv

    # find captions and pop out
    i = 0
    while i < len(self.boxes):
        c = self.boxes[i]
        if not self.is_caption(c):
            i += 1
            continue
        # find the nearest layouts
        tk, tv = nearest(c, tables)
        fk, fv = nearest(c, figures)
        if min(tv, fv) > 2000:
            # Nothing close enough: the caption stays in the running text.
            i += 1
            continue
        if tv < fv:
            tables[tk].insert(0, c)
            logging.debug("TABLE:" + c["text"] + "; Cap: " + tk)
        else:
            figures[fk].insert(0, c)
            # Log the figure key (the original logged the table key here).
            logging.debug("FIGURE:" + c["text"] + "; Cap: " + fk)
        self.boxes.pop(i)

    res = []

    def cropout(bxs, ltype):
        """Crop the picture covering ``bxs``; multi-page groups are stacked vertically."""
        pn = set([b["page_number"] - 1 for b in bxs])
        if len(pn) < 2:
            pn = list(pn)[0]
            ht = self.page_cum_height[pn]
            b = {
                "x0": np.min([b["x0"] for b in bxs]),
                "top": np.min([b["top"] for b in bxs]) - ht,
                "x1": np.max([b["x1"] for b in bxs]),
                "bottom": np.max([b["bottom"] for b in bxs]) - ht
            }
            # Prefer the detected layout region over the union of the boxes.
            louts = [l for l in self.page_layout[pn] if l["type"] == ltype]
            ii = self.__find_overlapped(b, louts, naive=True)
            if ii is not None:
                b = louts[ii]
            else:
                logging.warning("Missing layout match: %s,%s",
                                pn + 1, bxs[0].get("layoutno", ""))
            left, top, right, bott = b["x0"], b["top"], b["x1"], b["bottom"]
            return self.page_images[pn] \
                .crop((left * ZM, top * ZM,
                       right * ZM, bott * ZM))
        # Several pages: crop each page separately and paste the parts top to bottom.
        pn = {}
        for b in bxs:
            p = b["page_number"] - 1
            if p not in pn:
                pn[p] = []
            pn[p].append(b)
        pn = sorted(pn.items(), key=lambda x: x[0])
        imgs = [cropout(arr, ltype) for p, arr in pn]
        pic = Image.new("RGB",
                        (int(np.max([i.size[0] for i in imgs])),
                         int(np.sum([m.size[1] for m in imgs]))),
                        (245, 245, 245))
        height = 0
        for img in imgs:
            pic.paste(img, (0, int(height)))
            height += img.size[1]
        return pic

    # crop figure out and add caption
    for k, bxs in figures.items():
        # Keep only text of at least 4 characters that does not start with a
        # digit / lowercase letter / one of ".+%-".
        txt = "\n".join(
            [b["text"] for b in bxs
             if not re.match(r"[0-9a-z.\+%-]", b["text"].strip())
             and len(b["text"].strip()) >= 4
             ]
        )
        if not txt:
            continue
        res.append(
            (cropout(bxs, "figure"),
             [txt] if not return_html else [f"<p>{txt}</p>"]))
    for k, bxs in tables.items():
        if not bxs:
            continue
        res.append((cropout(bxs, "table"),
                    self.__construct_table(bxs, html=return_html)))
    return res
def proj_match(self, line):
    """Classify a line by the heading / list-marker pattern it starts with.

    Args:
        line: the text of one line.

    Returns:
        ``None`` when the line has at most two characters or matches no pattern,
        ``False`` when the whole line consists of digits and numeric punctuation,
        otherwise the integer level of the first pattern that matches at the start
        of the line (lower numbers are listed first).
    """
    if len(line) <= 2:
        return None
    if re.match(r"[0-9 ().,%%+/-]+$", line):
        return False
    # Brackets, colons and question marks are accepted in ASCII and full-width form.
    # Level 10 used to be r"[0-9]+)" -- an unbalanced parenthesis that made
    # re.match raise re.error for every line reaching it.
    for p, j in [
        (r"第[零一二三四五六七八九十百]+章", 1),
        (r"第[零一二三四五六七八九十百]+[条节]", 2),
        (r"[零一二三四五六七八九十百]+[、 ]", 3),
        (r"[\(（][零一二三四五六七八九十百]+[）\)]", 4),
        (r"[0-9]+(、|\.[ ]|\.[^0-9])", 5),
        (r"[0-9]+\.[0-9]+(、|[. ]|[^0-9])", 6),
        (r"[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 7),
        (r"[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 8),
        (r".{,48}[:：?？]$", 9),
        (r"[0-9]+[)）]", 10),
        (r"[\(（][0-9]+[）\)]", 11),
        (r"[零一二三四五六七八九十百]+是", 12),
        (r"[⚫•➢✓]", 12)
    ]:
        if re.match(p, line):
            return j
    return None
def _line_tag(self, bx, ZM):
    """Build the ``@@pages<TAB>x0<TAB>x1<TAB>top<TAB>bottom##`` position tag of a box.

    ``top`` and ``bottom`` are made page-relative by subtracting the cumulative
    height of the box's first page. While the bottom edge lies below the current
    page image, one page height is subtracted from it and the next page number is
    appended, so the tag lists every page the box touches (joined by "-").

    Args:
        bx: box dict with "page_number", "top", "bottom", "x0" and "x1".
        ZM: zoom factor between box coordinates and page image pixels.

    Returns:
        The tag string, coordinates formatted with one decimal.
    """
    pages = [bx["page_number"]]
    offset_page = pages[0] - 1
    top = bx["top"] - self.page_cum_height[offset_page]
    bott = bx["bottom"] - self.page_cum_height[offset_page]
    while True:
        page_px = self.page_images[pages[-1] - 1].size[1]
        if not bott * ZM > page_px:
            break
        bott -= page_px / ZM
        pages.append(pages[-1] + 1)
    page_part = "-".join([str(p) for p in pages])
    return "@@{}\t{:.1f}\t{:.1f}\t{:.1f}\t{:.1f}##" \
        .format(page_part, bx["x0"], bx["x1"], top, bott)
def __filterout_scraps(self, boxes, ZM):
    """Chain boxes into text blocks and drop blocks that look like scraps.

    Starting from the first remaining box, following boxes are chained to it
    depth-first; the chain is kept when its first box is a heading (``proj_match``
    or layout type "title") or when its mean width is at least 35% of the page
    width or more than 200 units. Kept chains are emitted as lines suffixed with
    their position tag.

    Args:
        boxes: list of box dicts; it is consumed (emptied) by this method, so pass a
            copy when the original is still needed.
        ZM: zoom factor between box coordinates and page image pixels.

    Returns:
        The kept blocks joined by blank lines; lines inside a block are joined by
        a single newline.
    """

    def width(b):
        return b["x1"] - b["x0"]

    def height(b):
        return b["bottom"] - b["top"]

    def usefull(b):
        # A box is worth keeping when it has a layout type, is wider than a third
        # of its page, or is taller than the page's mean height.
        if b.get("layout_type"):
            return True
        if width(b) > self.page_images[b["page_number"] - 1].size[0] / ZM / 3:
            return True
        if height(b) > self.mean_height[b["page_number"] - 1]:
            return True
        return False

    res = []
    while boxes:
        lines = []
        widths = []
        pw = self.page_images[boxes[0]["page_number"] - 1].size[0] / ZM
        mh = self.mean_height[boxes[0]["page_number"] - 1]
        mj = self.proj_match(boxes[0]["text"]) \
            or boxes[0].get("layout_type", "") == "title"

        def dfs(line, st):
            """Append ``line`` to the chain, then follow the first box that continues it."""
            lines.append(line)
            widths.append(width(line))
            mmj = self.proj_match(line["text"]) \
                or line.get("layout_type", "") == "title"
            # Look at most 19 boxes ahead.
            for i in range(st + 1, min(st + 20, len(boxes))):
                if (boxes[i]["page_number"] - line["page_number"]) > 0:
                    break
                if not mmj and self._y_dis(line, boxes[i]) >= 3 * mh \
                        and height(line) < 1.5 * mh:
                    break
                if not usefull(boxes[i]):
                    continue
                if mmj or self._x_dis(boxes[i], line) < pw / 10:
                    # concat following
                    dfs(boxes[i], i)
                    # Deeper calls only pop indices above i, so i is still this box.
                    boxes.pop(i)
                    break

        try:
            if usefull(boxes[0]):
                dfs(boxes[0], 0)
            else:
                logging.debug("WASTE: " + boxes[0]["text"])
        except Exception:
            # Best effort: keep whatever was chained so far, but leave a trace
            # instead of hiding the failure.
            logging.exception("Failed to chain the boxes following: %s",
                              boxes[0].get("text", ""))
        boxes.pop(0)
        # 0 for an empty chain gives the same decision as NaN from np.mean([])
        # without the "Mean of empty slice" RuntimeWarning.
        mw = np.mean(widths) if widths else 0
        if mj or mw / pw >= 0.35 or mw > 200:
            res.append("\n".join([c["text"] + self._line_tag(c, ZM) for c in lines]))
        else:
            logging.debug("REMOVED: " +
                          "<<".join([c["text"] for c in lines]))
    return "\n\n".join(res)
@staticmethod
def total_page_number(fnm, binary=None):
    """Return the number of pages of a PDF.

    pdfplumber is tried first; if it raises, PyMuPDF (``fitz``) is used instead.
    Declared as a static method because it takes no ``self``.

    Args:
        fnm: path of the PDF file; used when ``binary`` is empty.
        binary: optional PDF content as bytes; takes precedence over ``fnm``.

    Returns:
        int: the page count.
    """
    try:
        with (pdfplumber.open(BytesIO(binary)) if binary
              else pdfplumber.open(fnm)) as pdf:
            return len(pdf.pages)
    except Exception:
        # The in-memory document must be opened from ``binary``; the original passed
        # the file name as the stream.
        pdf = fitz.open(stream=binary, filetype="pdf") if binary else fitz.open(fnm)
        try:
            return len(pdf)
        finally:
            pdf.close()
def __images__(self, fnm, zoomin=3, page_from=0, page_to=299):
    """Render the requested pages, collect their characters and run OCR on them.

    Resets the per-document state (``boxes``, ``mean_height``, ``mean_width``,
    ``page_cum_height``, ``page_layout``, ...), fills ``page_images`` /
    ``page_chars`` / ``total_page``, decides ``is_english`` and calls
    ``__ocr_paddle`` for every page.

    Args:
        fnm: path of the PDF (``str``) or its content as bytes.
        zoomin: rendering zoom; images are rendered at ``72 * zoomin`` dpi.
        page_from: index of the first page to process (0-based, inclusive).
        page_to: index after the last page to process (exclusive).
    """
    self.lefted_chars = []
    self.mean_height = []
    self.mean_width = []
    self.boxes = []
    self.garbages = {}
    self.page_cum_height = [0]
    self.page_layout = []
    try:
        self.pdf = pdfplumber.open(fnm) if isinstance(fnm, str) \
            else pdfplumber.open(BytesIO(fnm))
        pages = self.pdf.pages[page_from:page_to]
        self.page_images = [p.to_image(resolution=72 * zoomin).annotated
                            for p in pages]
        self.page_chars = [[c for c in page.chars if self._has_color(c)]
                           for page in pages]
        self.total_page = len(self.pdf.pages)
    except Exception as e:
        # Fall back to PyMuPDF: pages are only rendered, no characters are
        # extracted, so the text has to come from OCR alone.
        logging.warning(
            "pdfplumber failed (%s); rendering pages with PyMuPDF instead.", e)
        self.pdf = fitz.open(fnm) if isinstance(fnm, str) \
            else fitz.open(stream=fnm, filetype="pdf")
        self.page_images = []
        self.page_chars = []
        mat = fitz.Matrix(zoomin, zoomin)
        self.total_page = len(self.pdf)
        for i, page in enumerate(self.pdf):
            if i < page_from:
                continue
            if i >= page_to:
                break
            pix = page.get_pixmap(matrix=mat)
            img = Image.frombytes("RGB", [pix.width, pix.height],
                                  pix.samples)
            self.page_images.append(img)
            self.page_chars.append([])
    logging.info("Images converted.")

    # A page votes "English" when a random sample of up to 100 of its characters
    # contains a run of 30+ ASCII letters/digits/punctuation.
    english_run = r"[a-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}"
    votes = []
    for chars in self.page_chars:
        sample = "".join(random.choices([c["text"] for c in chars],
                                        k=min(100, len(chars))))
        votes.append(re.search(english_run, sample))
    self.is_english = sum([1 if v else 0 for v in votes]) > len(self.page_images) / 2

    for i, img in enumerate(self.page_images):
        # For English documents the extracted characters are not passed on.
        chars = self.page_chars[i] if not self.is_english else []
        self.mean_height.append(
            np.median(sorted([c["height"] for c in chars])) if chars else 0
        )
        self.mean_width.append(
            np.median(sorted([c["width"] for c in chars])) if chars else 8
        )
        self.page_cum_height.append(img.size[1] / zoomin)
        # Insert a space after a latin/digit character when the gap to the next
        # one is at least half of the narrower character's width.
        j = 0
        while j + 1 < len(chars):
            if chars[j]["text"] and chars[j + 1]["text"] \
                    and re.match(r"[0-9a-zA-Z,.:;!%]+", chars[j]["text"] + chars[j + 1]["text"]) \
                    and chars[j + 1]["x0"] - chars[j]["x1"] >= min(chars[j + 1]["width"],
                                                                   chars[j]["width"]) / 2:
                chars[j]["text"] += " "
            j += 1
        self.__ocr_paddle(i + 1, img, chars, zoomin)

    # No embedded characters at all: decide the language from the OCR output.
    if not self.is_english and not any([c for c in self.page_chars]) and self.boxes:
        # NOTE(review): assumes self.boxes holds one list of boxes per page at this
        # point -- confirm against __ocr_paddle.
        bxes = [b for bxs in self.boxes for b in bxs]
        self.is_english = bool(re.search(
            r"[\na-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}",
            "".join([b["text"] for b in random.choices(bxes, k=min(30, len(bxes)))])))
    # The value needs a %s placeholder; without it logging reports a formatting
    # error instead of the message.
    logging.info("Is it English: %s", self.is_english)
    # Turn per-page heights into offsets: entry p is the height above page p.
    self.page_cum_height = np.cumsum(self.page_cum_height)
    assert len(self.page_cum_height) == len(self.page_images) + 1
def __call__(self, fnm, need_image=True, zoomin=3, return_html=False):
    """Run the whole parsing pipeline on one PDF.

    Args:
        fnm: the PDF, forwarded unchanged to ``__images__``.
        need_image: forwarded to ``_extract_table_figure``.
        zoomin: zoom factor shared by rendering, layout and cropping steps.
        return_html: forwarded to ``_extract_table_figure``.

    Returns:
        A ``(text, tables_and_figures)`` tuple: the output of
        ``__filterout_scraps`` run on a deep copy of ``self.boxes`` and the output
        of ``_extract_table_figure``.
    """
    # Page rendering / OCR, then layout and table-structure analysis.
    self.__images__(fnm, zoomin)
    self._layouts_paddle(zoomin)
    self._table_transformer_job(zoomin)
    # Box merging and page filtering.
    self._text_merge()
    self._concat_downward()
    self._filter_forpages()
    # Tables and figures are extracted first because that step removes their boxes
    # from self.boxes.
    tables_and_figures = self._extract_table_figure(
        need_image, zoomin, return_html)
    remaining_boxes = deepcopy(self.boxes)
    text = self.__filterout_scraps(remaining_boxes, zoomin)
    return text, tables_and_figures
def remove_tag(self, txt):
    """Return ``txt`` with every ``@@...##`` position tag removed."""
    position_tag = re.compile(r"@@[\t0-9.-]+?##")
    return position_tag.sub("", txt)
def crop(self, text, ZM=3):
    """Cut the regions named by the position tags in ``text`` out of the page images.

    Every ``@@pages<TAB>left<TAB>right<TAB>top<TAB>bottom##`` tag yields one crop per
    listed page; all crops are stacked vertically, 2 pixels apart, on a light grey
    canvas.

    Args:
        text: text containing zero or more position tags.
        ZM: zoom factor between tag coordinates and page image pixels.

    Returns:
        The stitched image, or ``None`` when ``text`` contains no tag.
    """
    pieces = []
    for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", text):
        page_part, *coords = tag.strip("#").strip("@").split("\t")
        left, right, top, bottom = (float(v) for v in coords)
        page_ids = [int(p) - 1 for p in page_part.split("-")]
        # The tag's bottom is relative to the last page; add the pixel heights of
        # the preceding pages to make it relative to the first one.
        bottom *= ZM
        for follower in page_ids[1:]:
            bottom += self.page_images[follower - 1].size[1]
        for order, page_id in enumerate(page_ids):
            image = self.page_images[page_id]
            upper = top * ZM if order == 0 else 0
            pieces.append(image.crop((left * ZM, upper,
                                      right * ZM, min(bottom, image.size[1]))))
            bottom -= image.size[1]
    if not pieces:
        return None
    GAP = 2
    canvas_height = int(sum(piece.size[1] + GAP for piece in pieces))
    canvas_width = int(np.max([piece.size[0] for piece in pieces]))
    pic = Image.new("RGB", (canvas_width, canvas_height), (245, 245, 245))
    offset = 0
    for piece in pieces:
        pic.paste(piece, (0, int(offset)))
        offset += piece.size[1] + GAP
    return pic
# Script entry point: intentionally a no-op, this module only provides the parser class.
if __name__ == "__main__":
    pass