# -*- coding:utf-8 -*-
import re
from copy import deepcopy

from lxml.html import Element, tostring, fromstring

from ultradata_math_parser.config import Forum_XPATH, Unique_ID
from ultradata_math_parser.utils import load_html, text_len
from ultradata_math_parser.parsers.base_parser import BaseParser
from ultradata_math_parser.parsers.title_parser import TitleParser


class UnifiedParser(BaseParser):
    """Main-content extractor with optional forum-thread reassembly.

    Extends ``BaseParser`` with a post-extraction step that, for forum-like
    pages, pulls post nodes matched by ``Forum_XPATH`` back into the extracted
    body (the primary XPath extraction tends to keep only the first post).
    """

    def __init__(self):
        super().__init__()
        self.need_comment = True
        # Forum reassembly is on by default; the assembled body is only kept
        # when it is at least `forum_assembly_min_gain` times longer than the
        # primary extraction (guards against pulling in pure boilerplate).
        self.enable_forum_assembly = True
        self.forum_assembly_min_gain = 1.1

    def extract(self, html="", **kwargs) -> dict:
        """Extract the main content of *html* and return a result dict.

        Keyword args override the corresponding instance attributes for this
        call: ``base_url``, ``process_math``, ``preserve_math_containers``,
        ``include_tables``, ``include_images``, ``enable_forum_assembly``,
        ``fallback_min_length``.

        Returns a dict with keys: ``xp_num``, ``drop_list``, ``html``,
        ``title``, ``base_url``, ``fallback_strategy``, ``text_length``,
        ``forum_assembled``.

        Raises:
            ValueError: if *html* cannot be parsed into a tree.
        """
        base_url = kwargs.get("base_url", "")
        self.process_math = kwargs.get("process_math", self.process_math)
        self.preserve_math_containers = kwargs.get("preserve_math_containers", self.preserve_math_containers)
        self.include_tables = kwargs.get("include_tables", self.include_tables)
        self.include_images = kwargs.get("include_images", self.include_images)
        self.enable_forum_assembly = kwargs.get("enable_forum_assembly", self.enable_forum_assembly)
        self.fallback_min_length = kwargs.get("fallback_min_length", self.fallback_min_length)
        # NOTE(review): these replacements look like NBSP / zero-width space
        # normalization whose literal characters may have been mangled in
        # transit — confirm the first argument of each call is the intended
        # non-ASCII whitespace character, not a plain space.
        html = html.replace(" ", " ").replace(" ", " ")
        tree = load_html(html)
        if tree is None:
            raise ValueError("load_html() could not parse the given HTML")
        title = TitleParser().process(tree)
        # Keep a pristine copy for the fallback path before any mutation.
        raw_tree = deepcopy(tree)
        # Prefer an explicit <base href> over the caller-supplied base_url.
        base_href = tree.xpath("//base/@href")
        if base_href and "http" in base_href[0]:
            base_url = base_href[0]
        self.generate_unique_id(tree)
        # Tag conversion / normalization.
        format_tree = self.convert_tags(tree, base_url=base_url)
        format_tree = self._remove_tables_from_tree(format_tree)
        format_tree = self._remove_images_from_tree(format_tree)
        normal_tree = self.clean_tags(format_tree)
        normal_tree = self._remove_tables_from_tree(normal_tree)
        normal_tree = self._remove_images_from_tree(normal_tree)
        fallback_tree = deepcopy(normal_tree)
        # Main-content extraction.
        subtree, xp_num, drop_list = self.xp_1_5(normal_tree)
        if xp_num == "others":
            subtree, drop_list = self.prune_unwanted_sections(normal_tree)
        body_html = self.get_content_html(subtree, xp_num, base_url)
        # Forum post reassembly: only adopted when the assembled body is
        # sufficiently longer than the primary extraction.
        forum_assembled = False
        if self.enable_forum_assembly:
            if xp_num != "others":
                # "others" path already pruned normal_tree above.
                normal_tree, _ = self.prune_unwanted_sections(normal_tree)
            original_length = self._text_length_from_html(body_html)
            assembled_html = self._try_forum_assembly(normal_tree, body_html)
            assembled_length = self._text_length_from_html(assembled_html)
            if assembled_length >= original_length * self.forum_assembly_min_gain:
                body_html = assembled_html
                forum_assembled = True
        # Conditional fallback when the extracted text is too short.
        current_length = self._text_length_from_html(body_html)
        fallback_strategy = "primary"
        if current_length < self.fallback_min_length:
            body_html, fallback_strategy = self.apply_fallbacks(
                primary_html=body_html,
                base_url=base_url,
                normal_tree=fallback_tree,
                raw_tree=raw_tree,
            )
        body_html = self._strip_tables_from_html(body_html)
        body_html = self._strip_images_from_html(body_html)
        text_length = self._text_length_from_html(body_html)
        return {
            "xp_num": xp_num,
            "drop_list": drop_list,
            "html": body_html,
            "title": title,
            "base_url": base_url,
            "fallback_strategy": fallback_strategy,
            "text_length": text_length,
            "forum_assembled": forum_assembled,
        }

    def _try_forum_assembly(self, normal_tree, body_html):
        """Append forum-post nodes from *normal_tree* onto *body_html*.

        Nodes matched by ``Forum_XPATH`` that are not already in the body are
        attached in document order relative to the body's last ``Unique_ID``:
        later nodes are appended, earlier content is prepended inside a
        wrapper <div>. Returns the assembled HTML string with the temporary
        ``Unique_ID`` attributes stripped; returns *body_html* unchanged when
        it is empty or unparseable.
        """
        if not body_html:
            return body_html
        try:
            body_html_tree = fromstring(body_html)
        except Exception:
            return body_html
        try:
            body_tree = body_html_tree.body
        except Exception:
            # Fragment with no <body>: wrap its children in a synthetic one.
            body_tree = Element("body")
            body_tree.extend(body_html_tree)
        # Drop the body's own nodes from normal_tree so they are not matched
        # (and duplicated) by the forum XPaths below.
        main_ids = body_tree.xpath(f".//@{Unique_ID}")
        for main_id in main_ids:
            # NOTE(review): main_id is interpolated unquoted, so XPath does a
            # numeric comparison — relies on Unique_ID values being numeric.
            main_tree = normal_tree.xpath(f".//*[@{Unique_ID}={main_id}]")
            if main_tree:
                try:
                    self.remove_node(main_tree[0])
                except Exception:
                    pass
        if not main_ids:
            main_ids = [-1]
        for c_xpath in Forum_XPATH:
            # Consume every match of this XPath; each matched node is removed
            # from normal_tree regardless of whether it is kept below.
            while True:
                matches = normal_tree.xpath(c_xpath)
                if not matches:
                    break
                x = matches[0]
                self.remove_node(x)
                if "'post-'" in c_xpath or "'post_'" in c_xpath:
                    # Loose 'post' XPaths: require a real post-NNN / post_NNN id.
                    elem_id = x.attrib.get("id", "").lower()
                    if not (re.search(r'post-\d+', elem_id) or re.search(r'post_\d+', elem_id)):
                        continue
                if "header" in x.attrib.get("class", "").lower() or "header" in x.attrib.get("id", "").lower():
                    continue
                try:
                    node_id = int(x.attrib.get(Unique_ID, "0"))
                    last_main_id = int(main_ids[-1]) if main_ids else -1
                    if node_id > last_main_id:
                        # Node comes after the current body content: append whole.
                        body_tree.append(x)
                    else:
                        # Node straddles the body: split its descendants into
                        # content before / after the body's last Unique_ID.
                        prefix_div = Element("div")
                        suffix_div = Element("div")
                        need_prefix = False
                        need_suffix = False
                        while x.xpath(f".//*[number(@{Unique_ID}) > {last_main_id}]"):
                            tmp_x = x.xpath(f".//*[number(@{Unique_ID}) > {last_main_id}]")[0]
                            self.remove_node(tmp_x)
                            suffix_div.append(tmp_x)
                            need_suffix = True
                        while x.xpath(f".//*[number(@{Unique_ID}) < {last_main_id}]"):
                            tmp_x = x.xpath(f".//*[number(@{Unique_ID}) < {last_main_id}]")[0]
                            self.remove_node(tmp_x)
                            prefix_div.append(tmp_x)
                            need_prefix = True
                        if need_prefix:
                            body_tree.insert(0, prefix_div)
                        if need_suffix:
                            body_tree.append(suffix_div)
                except Exception:
                    # Best-effort assembly: a malformed node must not abort
                    # the whole pass.
                    pass
        # Strip the temporary Unique_ID attributes from the serialized result.
        # (raw f-string: '\d' was an invalid escape in the original.)
        result_html = re.sub(
            rf' {Unique_ID}="\d+"',
            "",
            tostring(body_tree, encoding=str),
        )
        return result_html