| | |
| | import json |
| | import logging |
| | from typing import Optional, Type |
| | from urllib.parse import urlparse |
| | import tldextract |
| |
|
| | from ultradata_math_parser.parsers.article_parser import ArticleParser |
| | from ultradata_math_parser.parsers.forum_parser import ForumParser |
| | from ultradata_math_parser.parsers.custom_parser import CustomParser |
| | from ultradata_math_parser.parsers.unified_parser import UnifiedParser |
| | from ultradata_math_parser.utils import text_len, run_w3m_dump, W3MError |
| | from ultradata_math_parser.config import URL_PATTERNS_TO_HTML_TYPE, BUILTIN_SITE_RULES |
| |
|
| |
|
class GeneralParser:
    """Route HTML extraction to the most specific parser available.

    Selection order in :meth:`extract`:
      1. Builtin per-domain rules (``BUILTIN_SITE_RULES``, keyed by registered
         domain as computed by ``tldextract``).
      2. URL-pattern hints (``URL_PATTERNS_TO_HTML_TYPE``) to infer ``html_type``.
      3. User-supplied per-netloc rules loaded from ``config_path``.
      4. A type-specific parser (forum/article/unified), falling back to
         ``UnifiedParser``.

    Every successful extraction result is post-processed through a w3m text
    dump (see :meth:`_apply_w3m`).
    """

    def __init__(self, config_path: str = "", w3m_path: str = "w3m"):
        """
        Args:
            config_path: Optional path to a JSON file mapping netloc -> parser
                rule. Load failures are logged and fall back to empty rules.
            w3m_path: Path to the w3m binary used for text dumps.
        """
        self.logger = logging.getLogger(__name__)
        # Always initialize: the previous bare `except: pass` could leave
        # `self.rule` unassigned after a failed config load, crashing
        # `extract()` later with AttributeError.
        self.rule: dict = {}
        if config_path:
            try:
                with open(config_path, "r", encoding="utf-8") as f:
                    self.rule = json.load(f)
            except (OSError, json.JSONDecodeError) as exc:
                # Best-effort: keep empty rules but leave a trace for operators.
                self.logger.warning(
                    "Failed to load rule config %s: %s", config_path, exc
                )
        self.w3m_path = w3m_path or "w3m"
        self.tld_extractor = tldextract.TLDExtract()

    def extract(self, html: str = "", w3m_path: Optional[str] = None, **kwargs) -> dict:
        """Extract structured content from *html*, picking the best parser.

        Args:
            html: Raw HTML document to parse.
            w3m_path: Per-call override of the w3m binary path.
            **kwargs: Forwarded to the chosen parser. Recognized keys:
                ``base_url`` (used for rule/domain matching) and ``html_type``
                (``"forum"`` / ``"article"`` / ``"unified"``; popped here).

        Returns:
            The parser's result dict, enriched with w3m text fields.
        """
        base_url = kwargs.get("base_url", "")
        netloc = urlparse(base_url).netloc if base_url else ""
        html_type = kwargs.pop("html_type", None)
        current_w3m_path = w3m_path or self.w3m_path

        # 1. Builtin per-domain rules — cheap substring pre-check before the
        #    (comparatively expensive) tldextract call.
        if base_url and self._quick_check_builtin_rules(base_url):
            try:
                extracted = self.tld_extractor(base_url)
                domain = f"{extracted.domain}.{extracted.suffix}"
                self.logger.debug("TLD Extract result for %s: domain=%s, suffix=%s -> key=%s", base_url, extracted.domain, extracted.suffix, domain)
                if domain in BUILTIN_SITE_RULES:
                    try:
                        # Caller-supplied "rule" in kwargs deliberately wins
                        # over the builtin one (same precedence as before).
                        new_kwargs = {"rule": BUILTIN_SITE_RULES[domain], **kwargs}
                        self.logger.debug("Using builtin rule for domain: %s", domain)
                        return self._run_extractor(CustomParser, html, new_kwargs, w3m_path=current_w3m_path)
                    except Exception as exc:
                        # Builtin rule failed: fall through to the generic path.
                        self.logger.debug("Builtin rule extractor failed for %s: %s", domain, exc)
            except Exception as e:
                self.logger.debug("Error extracting domain or checking builtin rules: %s", e)

        # 2. Infer html_type from URL substring patterns when not explicit.
        if not html_type and base_url:
            for pattern, mapped_type in URL_PATTERNS_TO_HTML_TYPE.items():
                if pattern in base_url:
                    html_type = mapped_type
                    break

        # 3. User-supplied per-netloc rules from the config file.
        if netloc in self.rule:
            try:
                new_kwargs = {"rule": self.rule[netloc], **kwargs}
                return self._run_extractor(CustomParser, html, new_kwargs, w3m_path=current_w3m_path)
            except Exception as exc:
                self.logger.debug("Custom extractor failed for %s: %s", netloc, exc)

        # 4. Type-specific parser; anything unrecognized (including None)
        #    falls back to the unified parser — same behavior as the old
        #    if-chain plus default return.
        parser_by_type = {
            "forum": ForumParser,
            "article": ArticleParser,
            "unified": UnifiedParser,
        }
        parser_cls = parser_by_type.get(html_type, UnifiedParser)
        return self._run_extractor(parser_cls, html, kwargs, w3m_path=current_w3m_path)

    def _quick_check_builtin_rules(self, url: str) -> bool:
        """Cheap pre-filter: does any builtin rule domain appear in *url*?

        Substring match on the lowercased URL; may yield false positives,
        which the exact tldextract-based check in :meth:`extract` filters out.
        """
        if not url:
            return False
        url_lower = url.lower()
        return any(domain in url_lower for domain in BUILTIN_SITE_RULES)

    def _run_extractor(self, extractor_cls: Type, html: str, kwargs: dict, w3m_path: str):
        """Instantiate *extractor_cls*, run extraction, then enrich via w3m."""
        # Copy kwargs defensively so the extractor cannot mutate the caller's dict.
        result = extractor_cls().extract(html=html, **dict(kwargs))
        return self._apply_w3m(result, w3m_path=w3m_path)

    def _apply_w3m(self, result: Optional[dict], w3m_path: str) -> Optional[dict]:
        """Dump ``result['html']`` through w3m and attach text fields.

        Returns *result* unchanged when it is falsy; raises ``RuntimeError``
        when the extraction result lacks an ``'html'`` fragment to dump.
        """
        if not result:
            return result
        html_fragment = result.get("html")
        if not html_fragment:
            raise RuntimeError("Extraction result does not contain 'html' for w3m")
        text = run_w3m_dump(html_fragment, w3m_path)
        # Work on a copy so the upstream parser's result dict is not mutated.
        enriched = dict(result)
        enriched["text"] = text
        enriched["w3m_text"] = text
        enriched["text_length"] = text_len(text)
        return enriched
| |
|