| import logging |
| import re |
| import subprocess |
| from functools import lru_cache |
| from urllib.parse import urljoin |
|
|
| from markdownify import markdownify as md |
| from readabilipy import simple_json_from_html_string |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@lru_cache(maxsize=1)
def _is_readability_js_available() -> bool:
    """Check whether Node + @mozilla/readability is available in runtime.

    Runs ``node -e "require.resolve('@mozilla/readability')"`` with a
    5-second timeout. The function is wrapped in ``lru_cache(maxsize=1)``,
    so later calls reuse the first result instead of re-running the probe.

    Returns:
        True if the probe exits with status 0. False, after logging a
        warning, if Node.js cannot be launched, the probe times out, or the
        probe exits with a non-zero status.
    """
    probe_cmd = ["node", "-e", "require.resolve('@mozilla/readability')"]
    try:
        subprocess.run(
            probe_cmd,
            capture_output=True,
            text=True,
            check=True,
            timeout=5,
        )
    except FileNotFoundError:
        logger.warning("Node.js is not available; readability extraction will use pure-Python mode")
        return False
    except OSError as exc:
        # Any other launch failure (e.g. PermissionError for a non-executable
        # ``node`` on PATH) also means the JS path is unusable; report it and
        # fall back rather than letting the error abort the extraction.
        logger.warning(
            "Node.js could not be started (%s); readability extraction will use pure-Python mode",
            exc,
        )
        return False
    except subprocess.TimeoutExpired:
        logger.warning("Node.js readability check timed out; readability extraction will use pure-Python mode")
        return False
    except subprocess.CalledProcessError as exc:
        # A non-zero exit means require.resolve failed; surface node's stderr
        # when there is any, to make the missing-module case diagnosable.
        stderr_info = (exc.stderr or "").strip()
        if stderr_info:
            logger.warning(
                "Node readability module is unavailable (%s); readability extraction will use pure-Python mode",
                stderr_info,
            )
        else:
            logger.warning("Node readability module is unavailable; readability extraction will use pure-Python mode")
        return False
    return True
|
|
|
|
class Article:
    """An extracted article: a title plus its body as an HTML string.

    Attributes:
        title: Article title; rendered as a level-1 heading in Markdown.
        html_content: Article body as HTML (may be empty or None).
        url: Base URL used by ``to_message`` to resolve image links.
            Defaults to ``""``, in which case image links are left as
            they appear in the Markdown.
    """

    url: str

    # Markdown image syntax ``![alt](target)``; the single capture group is
    # the link target, so ``split`` alternates text / target / text / ...
    _IMAGE_PATTERN = re.compile(r"!\[.*?\]\((.*?)\)")

    def __init__(self, title: str, html_content: str, url: str = ""):
        """Store the article fields.

        Args:
            title: Article title.
            html_content: Article body as HTML.
            url: Optional base URL of the page. Callers may also assign
                ``article.url`` after construction.
        """
        self.title = title
        self.html_content = html_content
        # Always set the attribute so to_message() never hits AttributeError
        # when the caller did not assign a URL.
        self.url = url

    def to_markdown(self, including_title: bool = True) -> str:
        """Render the article as Markdown.

        Args:
            including_title: When True, prefix the output with ``# <title>``.

        Returns:
            The Markdown text. If ``html_content`` is None or blank, the body
            is the placeholder line ``*No content available*``.
        """
        markdown = ""
        if including_title:
            markdown += f"# {self.title}\n\n"

        if self.html_content is None or not str(self.html_content).strip():
            markdown += "*No content available*\n"
        else:
            markdown += md(self.html_content)

        return markdown

    def to_message(self) -> list[dict]:
        """Split the Markdown rendering into text and image message parts.

        Returns:
            A list of dicts in document order: ``{"type": "text", "text": ...}``
            for non-blank text runs and
            ``{"type": "image_url", "image_url": {"url": ...}}`` for each
            Markdown image, with the link joined against ``self.url``.
            If nothing remains, a single "No content available" text part.
        """
        content: list[dict] = []

        # Odd indices of the split result are the captured image targets.
        for index, part in enumerate(self._IMAGE_PATTERN.split(self.to_markdown())):
            stripped = part.strip()
            if index % 2 == 1:
                image_url = urljoin(self.url, stripped)
                content.append({"type": "image_url", "image_url": {"url": image_url}})
            elif stripped:
                content.append({"type": "text", "text": stripped})

        # Covers blank markdown as well as markdown with no usable parts.
        if not content:
            content = [{"type": "text", "text": "No content available"}]

        return content
|
|
|
|
class ReadabilityExtractor:
    """Builds ``Article`` objects from raw HTML via ``simple_json_from_html_string``."""

    @staticmethod
    def _non_blank_or(value, fallback):
        """Return ``value`` unless it is falsy or blank once stringified."""
        if value and str(value).strip():
            return value
        return fallback

    def extract_article(self, html: str) -> Article:
        """Extract the title and main content from ``html``.

        The Readability.js path is requested when
        ``_is_readability_js_available()`` reports it usable. If that call
        raises ``CalledProcessError`` or ``FileNotFoundError``, a warning is
        logged and extraction is retried with ``use_readability=False``.

        Args:
            html: Raw HTML of the page.

        Returns:
            An ``Article`` whose title falls back to ``"Untitled"`` and whose
            content falls back to a fixed placeholder sentence when the
            extracted value is missing or blank.
        """
        js_enabled = _is_readability_js_available()
        try:
            parsed = simple_json_from_html_string(html, use_readability=js_enabled)
        except (subprocess.CalledProcessError, FileNotFoundError) as exc:
            # Normalise whatever stderr the exception carries into text.
            raw_stderr = getattr(exc, "stderr", None)
            if isinstance(raw_stderr, bytes):
                raw_stderr = raw_stderr.decode(errors="replace")

            stderr_info = ""
            if isinstance(raw_stderr, str):
                trimmed = raw_stderr.strip()
                if trimmed:
                    stderr_info = f"; stderr={trimmed}"

            logger.warning(
                "Readability.js extraction failed with %s%s; falling back to pure-Python extraction",
                type(exc).__name__,
                stderr_info,
                exc_info=True,
            )
            parsed = simple_json_from_html_string(html, use_readability=False)

        body = self._non_blank_or(
            parsed.get("content"),
            "No content could be extracted from this page",
        )
        heading = self._non_blank_or(parsed.get("title"), "Untitled")

        return Article(title=heading, html_content=body)
|
|