| | |
| |
|
| | import os |
| | import re |
| | import logging |
| | import subprocess |
| | import tempfile |
| | from gzip import decompress |
| |
|
| | import numpy as np |
| | from lxml import etree |
| | from lxml.html import Element, HtmlElement, HTMLParser, fromstring, tostring |
| | from lxml.html.clean import Cleaner |
| | from urllib3.response import HTTPResponse |
| | from ultradata_math_parser.config import Unique_ID |
| |
|
| | try: |
| | import brotli |
| | except ImportError: |
| | brotli = None |
| |
|
| | try: |
| | from cchardet import detect as cchardet_detect |
| | except ImportError: |
| | cchardet_detect = None |
| |
|
| | from difflib import SequenceMatcher |
| |
|
| | from charset_normalizer import from_bytes |
| |
|
# Shared lxml HTML parser: UTF-8 input, comments and processing
# instructions stripped at parse time, no id-hash collection.
HTML_PARSER = HTMLParser(
    collect_ids=False,
    default_doctype=False,
    encoding="utf-8",
    remove_comments=True,
    remove_pis=True,
)
# Matches a malformed, self-closing DOCTYPE tag (stray spaces tolerated),
# case-insensitively; used by strip_faulty_doctypes().
DOCTYPE_TAG = re.compile("^< ?! ?DOCTYPE.+?/ ?>", re.I)
# Encoding names equivalent to UTF-8 — filtered out of detector guesses
# because UTF-8 is always tried first.
UNICODE_ALIASES = {"utf-8", "utf_8"}
| |
|
# Shared lxml Cleaner: only comments and processing instructions are
# removed; scripts, styles, forms, frames, etc. are deliberately kept
# (all corresponding flags are False).
HTML_CLEANER = Cleaner(
    annoying_tags=False,
    comments=True,
    embedded=False,
    forms=False,
    frames=False,
    javascript=False,
    links=False,
    meta=False,
    page_structure=False,
    processing_instructions=True,
    remove_unknown_tags=False,
    safe_attrs_only=False,
    scripts=False,
    style=False,
)
| |
|
# Matches LaTeX \textcolor[model]{color} commands so they can be stripped
# from math snippets (see wrap_math()).
color_regex = re.compile(r"\\textcolor\[.*?\]\{.*?\}")

# CSS class names associated with LaTeX-rendered images; presumably used
# by callers to detect math-bearing <img> elements — confirm against usage.
latex_image_class_names = [
    "latexcenter",
    "latex",
    "tex",
    "latexdisplay",
    "latexblock",
    "latexblockcenter",
]
| |
|
| |
|
def _translator():
    """Import and return py_asciimath's translator module.

    The import may alter the root logger's level as a side effect, so the
    level is captured beforehand and restored afterwards.
    """
    saved_level = logging.getLogger().level
    try:
        import py_asciimath.translator.translator as translator_module

        return translator_module
    finally:
        logging.getLogger().setLevel(saved_level)
| |
|
| |
|
def ASCIIMath2Tex(*args, **kwargs):
    """Build a py_asciimath ASCIIMath2Tex instance via the lazy importer."""
    translator_module = _translator()
    return translator_module.ASCIIMath2Tex(*args, **kwargs)
| |
|
| |
|
def MathML2Tex(*args, **kwargs):
    """Build a py_asciimath MathML2Tex instance via the lazy importer."""
    translator_module = _translator()
    return translator_module.MathML2Tex(*args, **kwargs)
| |
|
| |
|
| | asciimath2tex = ASCIIMath2Tex(log=False) |
| |
|
| |
|
def lcs_of_2(a, b):
    """Return the longest contiguous common substring of *a* and *b*."""
    matcher = SequenceMatcher(None, a, b)
    found = matcher.find_longest_match(0, len(a), 0, len(b))
    start, size = found.a, found.size
    return a[start:start + size]
| |
|
| |
|
def lcs_of_list(*args):
    """Return the longest common substring shared by all given sequences.

    Folds lcs_of_2 right-to-left, preserving the original recursion order.
    A single argument is returned unchanged (previously this case recursed
    into an empty call and raised IndexError).

    :raises TypeError: if called with no arguments.
    """
    if not args:
        raise TypeError("lcs_of_list() requires at least one argument")
    if len(args) == 1:
        return args[0]
    if len(args) == 2:
        return lcs_of_2(args[0], args[1])
    return lcs_of_2(args[0], lcs_of_list(*args[1:]))
| |
|
| |
|
def isutf8(data):
    """Return True when *data* (bytes) decodes cleanly as UTF-8."""
    try:
        data.decode("UTF-8")
        return True
    except UnicodeDecodeError:
        return False
| |
|
| |
|
def handle_compressed_file(filecontent):
    """Transparently decompress gzip- or brotli-compressed payloads.

    Non-bytes input, plain bytes, and undecompressable data are returned
    unchanged (best effort, no exception escapes).
    """
    if not isinstance(filecontent, bytes):
        return filecontent
    if filecontent.startswith(b"\x1f\x8b"):
        # gzip magic number
        try:
            return decompress(filecontent)
        except (EOFError, OSError):
            return filecontent
    if brotli is not None:
        try:
            return brotli.decompress(filecontent)
        except brotli.error:
            return filecontent
    return filecontent
| |
|
| |
|
def detect_encoding(bytesobject):
    """Guess the character encoding of raw bytes.

    UTF-8 is checked first and short-circuits the detectors.  Otherwise
    guesses come from cchardet (when installed) followed by
    charset_normalizer; UTF-8 aliases are filtered out since UTF-8 was
    already ruled out.
    """
    if isutf8(bytesobject):
        return ["utf-8"]
    guesses = []
    if cchardet_detect is not None:
        guessed = cchardet_detect(bytesobject)["encoding"]
        if guessed is not None:
            guesses.append(guessed.lower())
    # Fast pass on a prefix first, then fall back to the whole payload.
    results = from_bytes(bytesobject[:15000]) or from_bytes(bytesobject)
    if len(results) > 0:
        guesses.extend(result.encoding for result in results)
    return [guess for guess in guesses if guess not in UNICODE_ALIASES]
| |
|
| |
|
def decode_file(filecontent):
    """Turn raw (possibly compressed) file content into text.

    Strings pass through untouched.  Bytes are decompressed if needed and
    decoded with the first working guessed encoding; as a last resort the
    bytes are decoded as UTF-8 with replacement characters.
    """
    if isinstance(filecontent, str):
        return filecontent
    filecontent = handle_compressed_file(filecontent)
    htmltext = None
    for encoding in detect_encoding(filecontent):
        try:
            htmltext = filecontent.decode(encoding)
            break
        except (LookupError, UnicodeDecodeError):
            htmltext = None
    return htmltext or str(filecontent, encoding="utf-8", errors="replace")
| |
|
| |
|
def strip_faulty_doctypes(htmlstring: str, beginning: str) -> str:
    """Drop a malformed self-closing DOCTYPE from the first line, if any.

    *beginning* is the (lowercased) document start used for the cheap
    pre-check; the regex substitution only runs when it mentions doctype.
    """
    if "doctype" not in beginning:
        return htmlstring
    firstline, _, rest = htmlstring.partition("\n")
    return DOCTYPE_TAG.sub("", firstline, count=1) + "\n" + rest
| |
|
| |
|
def is_dubious_html(beginning: str) -> bool:
    """Return True when the document start does not mention 'html'."""
    contains_html = "html" in beginning
    return not contains_html
| |
|
| |
|
def fromstring_bytes(htmlobject):
    """Parse HTML from a string re-encoded as UTF-8 bytes.

    Fallback path for inputs lxml refuses as str.  "surrogatepass" keeps
    lone surrogates from breaking the re-encoding step.

    :return: the parsed tree, or None when parsing fails.
    """
    try:
        # Best-effort parse: any lxml failure simply yields None.  (The
        # previous version bound the exception to an unused name.)
        return fromstring(
            htmlobject.encode("utf8", "surrogatepass"), parser=HTML_PARSER
        )
    except Exception:
        return None
| |
|
| |
|
def ancestor_node_check(node: HtmlElement, tags: list):
    """Return True when *node* has an ancestor with any of the given tags."""
    return any(node.xpath(f"ancestor::{tag}[1]") for tag in tags)
| |
|
| |
|
def load_html(htmlobject):
    """Load an object into a parsed lxml tree, or None on failure.

    Accepts an already-parsed HtmlElement (returned as-is), an urllib3
    HTTPResponse or anything with a ``.data`` payload, or raw str/bytes.

    :raises TypeError: for any other input type.
    """
    if isinstance(htmlobject, HtmlElement):
        return htmlobject
    if isinstance(htmlobject, HTTPResponse) or hasattr(htmlobject, "data"):
        htmlobject = htmlobject.data
    if not isinstance(htmlobject, (bytes, str)):
        raise TypeError("incompatible input type", type(htmlobject))
    tree = None
    htmlobject = decode_file(htmlobject)
    # Sniff only the first 50 characters for "html"/"doctype" markers.
    beginning = htmlobject[:50].lower()
    check_flag = is_dubious_html(beginning)
    htmlobject = strip_faulty_doctypes(htmlobject, beginning)
    fallback_parse = False
    try:
        tree = fromstring(htmlobject, parser=HTML_PARSER)
    except ValueError:
        # lxml raises ValueError for str input it cannot handle (e.g. an
        # embedded encoding declaration); retry via the bytes path.
        tree = fromstring_bytes(htmlobject)
        fallback_parse = True
    except Exception as err:
        pass
    # Retry the bytes path when the str parse produced nothing usable and
    # the bytes fallback has not been attempted yet.
    if (tree is None or len(tree) < 1) and not fallback_parse:
        tree = fromstring_bytes(htmlobject)
    # Reject near-empty trees from content that did not look like HTML.
    if tree is not None and check_flag is True and len(tree) < 2:
        tree = None
    return tree
| |
|
| |
|
class W3MError(RuntimeError):
    """Raised when w3m rendering fails (missing executable, bad path, or a
    non-zero exit code).  Subclasses RuntimeError, matching the contract
    documented on run_w3m_dump()."""
| |
|
| |
|
def run_w3m_dump(html_content: str, w3m_path: str, *, columns: int = 200) -> str:
    """
    Render HTML content into plain text using w3m.

    :param html_content: HTML snippet to render.
    :param w3m_path: Path to the w3m executable.
    :param columns: Column width passed to w3m (-cols).
    :return: Rendered plain text.
    :raises RuntimeError: if w3m is unavailable or returns a non-zero exit code.
    """
    if not w3m_path:
        raise W3MError("w3m path must be provided")

    # delete=False so the file can be closed (flushed to disk) before w3m
    # opens it by name; the finally block below removes it explicitly.
    tmp_file = tempfile.NamedTemporaryFile(
        mode="w", suffix=".html", delete=False, encoding="utf-8"
    )
    try:
        tmp_file.write(html_content or "")
        tmp_file.flush()
        tmp_file.close()

        try:
            completed = subprocess.run(
                [
                    w3m_path,
                    "-dump",
                    "-T",
                    "text/html",
                    "-cols",
                    str(columns),
                    tmp_file.name,
                ],
                check=True,
                capture_output=True,
                text=True,
            )
        except FileNotFoundError as exc:
            raise W3MError(f"w3m executable not found at '{w3m_path}'") from exc
        except subprocess.CalledProcessError as exc:
            # check=True raised: surface w3m's stderr (when any) in the message.
            stderr = (exc.stderr or "").strip()
            message = f"w3m exited with status {exc.returncode}"
            if stderr:
                message = f"{message}: {stderr}"
            raise W3MError(message) from exc

        return completed.stdout
    finally:
        # Best-effort cleanup of the temp file.
        try:
            os.unlink(tmp_file.name)
        except OSError:
            pass
| |
|
| |
|
def is_empty_element(node: "HtmlElement"):
    """Return True when *node* has neither child elements nor text.

    Uses ``len(node)`` instead of the deprecated lxml ``getchildren()``;
    the annotation is a string so the module stays importable without the
    annotation's type being in scope at definition time.
    """
    return len(node) == 0 and not node.text
| |
|
| |
|
def iter_node(element: HtmlElement):
    """Yield *element* and every HtmlElement descendant, pre-order.

    Only HtmlElement children are descended into; other child nodes are
    neither yielded nor traversed.
    """
    stack = [element]
    while stack:
        current = stack.pop()
        yield current
        # Push reversed so children are visited left-to-right, matching
        # the order plain recursion would produce.
        stack.extend(
            child for child in reversed(current) if isinstance(child, HtmlElement)
        )
| |
|
| |
|
def img_div_check(tree):
    """Return False (i.e. keep) for a div holding exactly one image and
    fewer than four descendant nodes; True otherwise."""
    single_image = len(tree.xpath(".//img")) == 1
    few_descendants = len(tree.xpath(".//*")) < 4
    return not (single_image and few_descendants)
| |
|
| |
|
def text_len(s):
    """Approximate length of mixed-language text.

    Whitespace-separated tokens are counted once each, and CJK / kana /
    Arabic characters are additionally counted individually (so such text
    is effectively measured per character rather than per token).
    """
    collapsed = re.sub(" +", " ", s)
    collapsed = re.sub("[\n\t\r]+", "\n", collapsed)
    counts = [
        len(collapsed.split()),
        len(re.findall(r"[\u4e00-\u9fff]", collapsed)),              # CJK unified
        len(re.findall(r"[\u3040-\u309F\u30A0-\u30FF]", collapsed)),  # kana
        len(re.findall(r"[\u0600-\u06FF]", collapsed)),              # Arabic
    ]
    return sum(counts)
| |
|
| |
|
def alias(element):
    """Build a fuzzy structural signature for *element*.

    Combines the element's tag and normalized attributes with a summary of
    its direct children (tags plus attribute names); used to compare
    sibling nodes for structural similarity.  The project's Unique_ID
    attribute is excluded, whitespace is stripped from keys/values, and
    "-<digits>" runs are removed from values so numbered siblings compare
    as equal.
    """
    if element is None:
        return ""
    tag = element.tag
    # html/body are unique per document; the bare tag is signature enough.
    if tag in ["html", "body"]:
        return tag

    attribs = [tag]
    for k, v in element.attrib.items():
        if k == Unique_ID:
            continue
        k, v = re.sub(r"\s*", "", k), re.sub(r"\s*", "", v)
        v = re.sub(r"-\d+", "", v)
        attribs.append(f'[{k}="{v}"]' if v else f"[{k}]")
    result = "".join(attribs)

    # Summarize the direct children.
    nth = ""
    for child in element.getchildren():
        if child.tag in ["dt", "dd", "li"]:
            # For list items, record the grandchild count instead.
            try:
                nth += str(len(list(child.getchildren())))
            except Exception:  # defensive: keep the signature best-effort
                pass
            continue
        attribs = [child.tag]
        for k, v in child.attrib.items():
            if k == Unique_ID:
                continue
            k = re.sub(r"\s*", "", k)
            # Only the attribute *names* of children contribute here (the
            # original ternary had identical branches, so values were never
            # included); the dead value normalization has been dropped.
            attribs.append(f"[{k}]")
        nth += "".join(attribs)

    result += f":{nth}"
    return result
| |
|
| |
|
def similarity2(s1, s2):
    """Jaccard similarity between the character sets of two strings.

    Returns 0 when either input is empty/falsy.
    """
    if not s1 or not s2:
        return 0
    chars1, chars2 = set(s1), set(s2)
    overlap = len(chars1 & chars2)
    total = len(chars1 | chars2)
    return overlap / total
| |
|
| |
|
def similarity_with_element(element1, element2):
    """Structural similarity of two elements via their alias signatures."""
    return similarity2(alias(element1), alias(element2))
| |
|
| |
|
def similarity_with_siblings(element, siblings):
    """Mean alias-similarity between *element* and each of *siblings*.

    One outlier (the minimum score) is dropped before averaging.  Returns
    0 for no siblings.  With exactly one sibling the sole score is used
    directly — previously the minimum was removed unconditionally, leaving
    an empty list and making np.mean() return NaN with a RuntimeWarning.
    """
    scores = [similarity_with_element(element, sibling) for sibling in siblings]
    if not scores:
        return 0
    if len(scores) > 1:
        # Trim one outlier (the lowest score) before averaging.
        scores.remove(min(scores))
    return np.mean(scores)
| |
|
| |
|
def number_of_a_char(ele, xpath=".//a//text()"):
    """Text length contained inside <a> descendants of *ele*."""
    fragments = ele.xpath(xpath)
    return text_len("".join(fragments).strip())
| |
|
| |
|
def number_of_char(ele, xpath=".//text()"):
    """Total text length of *ele* plus one.

    The +1 keeps callers' density ratios safe from division by zero.
    """
    fragments = ele.xpath(xpath)
    return text_len("".join(fragments).strip()) + 1
| |
|
| |
|
def density_of_a_text(ele, pre=0.7):
    """True when at least *pre* of the element's text sits inside links."""
    link_chars = number_of_a_char(ele)
    total_chars = number_of_char(ele)
    return link_chars / total_chars >= pre
| |
|
| |
|
def uniquify_list(l):
    """Remove duplicates from *l* while preserving first-seen order."""
    seen = set()
    unique = []
    for item in l:
        if item not in seen:
            seen.add(item)
            unique.append(item)
    return unique
| |
|
| |
|
def trim(string):
    """Collapse internal whitespace runs in *string* to single spaces.

    Returns None for inputs that do not behave like strings.
    """
    try:
        pieces = string.split()
    except (AttributeError, TypeError):
        return None
    return " ".join(pieces).strip()
| |
|
| |
|
def collect_link_info(links_xpath, favor_precision=False):
    """Aggregate text statistics over a list of link elements.

    :return: (total text length, number of non-empty links,
              number of "short" links, list of link texts).
    """
    # Precision mode counts links under 50 chars as short; recall mode, 10.
    threshold = 50 if favor_precision else 10
    texts = []
    shortelems = 0
    for link in links_xpath:
        content = trim(link.text_content())
        if not content:
            continue
        texts.append(content)
        if len(content) < threshold:
            shortelems += 1
    total_length = sum(len(text) for text in texts)
    return total_length, len(texts), shortelems, texts
| |
|
| |
|
def link_density_test(element, text, favor_precision=False):
    """Decide whether *element* is link-dominated (likely boilerplate).

    :param element: candidate element containing zero or more <a> links.
    :param text: the element's extracted text; its length drives the limits.
    :param favor_precision: use a stricter length limit for <p> when True.
    :return: tuple (is_link_dense, list of collected link texts).
    """
    links_xpath, mylist = element.findall(".//a"), []
    if links_xpath:
        # Choose a length limit based on the tag and whether this is the
        # last element of its parent; the link-share threshold is 0.8
        # throughout.
        if element.tag == "p":
            if favor_precision is False:
                if element.getnext() is None:
                    limitlen, threshold = 60, 0.8
                else:
                    limitlen, threshold = 30, 0.8
            else:
                limitlen, threshold = 200, 0.8
        else:
            if element.getnext() is None:
                limitlen, threshold = 300, 0.8
            else:
                limitlen, threshold = 100, 0.8
        elemlen = len(text)
        # Only short elements are candidates for link-density rejection.
        if elemlen < limitlen:
            linklen, elemnum, shortelems, mylist = collect_link_info(
                links_xpath, favor_precision
            )
            # Links exist but none carry text: treat as link-dense.
            if elemnum == 0:
                return True, mylist
            if density_of_a_text(element, 0.5):
                # Reject when link text dominates, or most links are short.
                if linklen > threshold * elemlen or (
                    elemnum > 1 and shortelems / elemnum > 0.8
                ):
                    return True, mylist
    return False, mylist
| |
|
| |
|
def text_strip(text):
    """Strip surrounding whitespace; falsy inputs pass through unchanged."""
    if not text:
        return text
    return text.strip()
| |
|
| |
|
def wrap_math(s, display=False):
    """Normalize a LaTeX snippet and wrap it in $...$ or $$...$$.

    Color commands, stray dollar signs and newlines are removed first.
    Snippets containing "align" are returned unwrapped (the environment
    provides its own display context), and an empty result stays empty.
    """
    cleaned = re.sub(r"\s+", " ", s)
    cleaned = color_regex.sub("", cleaned)
    cleaned = cleaned.replace("$", "")
    cleaned = cleaned.replace("\n", " ").replace("\\n", "")
    cleaned = cleaned.strip()
    if not cleaned:
        return cleaned
    if "align" in cleaned:
        return cleaned
    delimiter = "$$" if display else "$"
    return f"{delimiter}{cleaned}{delimiter}"
| |
|
| |
|
def extract_asciimath(s):
    """Translate an AsciiMath string to LaTeX via the shared translator."""
    return asciimath2tex.translate(s)
| |
|
| |
|
# Locate the MathML -> LaTeX XSLT stylesheet shipped next to this module.
cur_file = os.path.abspath(__file__)
xsl_path = os.path.join(os.path.dirname(cur_file), "mmltex/mmltex.xsl")

# Compile the stylesheet once at import time; reused by mml_to_latex().
xslt = etree.parse(xsl_path)
transform = etree.XSLT(xslt)
| |
|
| |
|
def mml_to_latex(mml_code):
    """Convert a MathML snippet to LaTeX using the mmltex XSLT stylesheet.

    The input is patched up first: the MathML namespace is injected and a
    few common quoting/escaping artifacts are repaired so lxml can parse it.
    """
    # NOTE(review): this substitution replaces the <math ...> tag with
    # itself and is a no-op; possibly the attributes were meant to be
    # stripped so the namespace insertion below always applies — confirm.
    mml_code = re.sub(r"(<math.*?>)", r"\1", mml_code)
    mml_ns = mml_code.replace(
        "<math>", '<math xmlns="http://www.w3.org/1998/Math/MathML">'
    )

    # Undo HTML entity escaping and stray backslash-escaped quote combos.
    mml_ns = mml_ns.replace("&quot;", '"')
    mml_ns = mml_ns.replace("'\\\"", '"').replace("\\\"'", '"')

    # Repair attribute values that open with a double quote but close
    # with a single quote.
    pattern = r'"([^"]+?)\''
    mml_ns = re.sub(pattern, r'"\1"', mml_ns)

    mml_dom = etree.fromstring(mml_ns)
    mmldom = transform(mml_dom)
    latex_code = str(mmldom)
    return latex_code
| |
|