HMM / browser-use-main /browser_use /dom /markdown_extractor.py
Speedofmastery's picture
Merge Landrun + Browser-Use + Chromium with AI agent support (without binary files)
d7b3d84
"""
Shared markdown extraction utilities for browser content processing.
This module provides a unified interface for extracting clean markdown from browser content,
used by both the tools service and page actor.
"""
import re
from typing import TYPE_CHECKING, Any
from browser_use.dom.serializer.html_serializer import HTMLSerializer
from browser_use.dom.service import DomService
if TYPE_CHECKING:
from browser_use.browser.session import BrowserSession
from browser_use.browser.watchdogs.dom_watchdog import DOMWatchdog
async def extract_clean_markdown(
browser_session: 'BrowserSession | None' = None,
dom_service: DomService | None = None,
target_id: str | None = None,
extract_links: bool = False,
) -> tuple[str, dict[str, Any]]:
"""Extract clean markdown from browser content using enhanced DOM tree.
This unified function can extract markdown using either a browser session (for tools service)
or a DOM service with target ID (for page actor).
Args:
browser_session: Browser session to extract content from (tools service path)
dom_service: DOM service instance (page actor path)
target_id: Target ID for the page (required when using dom_service)
extract_links: Whether to preserve links in markdown
Returns:
tuple: (clean_markdown_content, content_statistics)
Raises:
ValueError: If neither browser_session nor (dom_service + target_id) are provided
"""
# Validate input parameters
if browser_session is not None:
if dom_service is not None or target_id is not None:
raise ValueError('Cannot specify both browser_session and dom_service/target_id')
# Browser session path (tools service)
enhanced_dom_tree = await _get_enhanced_dom_tree_from_browser_session(browser_session)
current_url = await browser_session.get_current_page_url()
method = 'enhanced_dom_tree'
elif dom_service is not None and target_id is not None:
# DOM service path (page actor)
enhanced_dom_tree = await dom_service.get_dom_tree(target_id=target_id)
current_url = None # Not available via DOM service
method = 'dom_service'
else:
raise ValueError('Must provide either browser_session or both dom_service and target_id')
# Use the HTML serializer with the enhanced DOM tree
html_serializer = HTMLSerializer(extract_links=extract_links)
page_html = html_serializer.serialize(enhanced_dom_tree)
original_html_length = len(page_html)
# Use markdownify for clean markdown conversion
from markdownify import markdownify as md
content = md(
page_html,
heading_style='ATX', # Use # style headings
strip=['script', 'style'], # Remove these tags
bullets='-', # Use - for unordered lists
code_language='', # Don't add language to code blocks
escape_asterisks=False, # Don't escape asterisks (cleaner output)
escape_underscores=False, # Don't escape underscores (cleaner output)
escape_misc=False, # Don't escape other characters (cleaner output)
autolinks=False, # Don't convert URLs to <> format
default_title=False, # Don't add default title attributes
keep_inline_images_in=[], # Don't keep inline images in any tags (we already filter base64 in HTML)
)
initial_markdown_length = len(content)
# Minimal cleanup - markdownify already does most of the work
content = re.sub(r'%[0-9A-Fa-f]{2}', '', content) # Remove any remaining URL encoding
# Apply light preprocessing to clean up excessive whitespace
content, chars_filtered = _preprocess_markdown_content(content)
final_filtered_length = len(content)
# Content statistics
stats = {
'method': method,
'original_html_chars': original_html_length,
'initial_markdown_chars': initial_markdown_length,
'filtered_chars_removed': chars_filtered,
'final_filtered_chars': final_filtered_length,
}
# Add URL to stats if available
if current_url:
stats['url'] = current_url
return content, stats
async def _get_enhanced_dom_tree_from_browser_session(browser_session: 'BrowserSession'):
"""Get enhanced DOM tree from browser session via DOMWatchdog."""
# Get the enhanced DOM tree from DOMWatchdog
# This captures the current state of the page including dynamic content, shadow roots, etc.
dom_watchdog: DOMWatchdog | None = browser_session._dom_watchdog
assert dom_watchdog is not None, 'DOMWatchdog not available'
# Use cached enhanced DOM tree if available, otherwise build it
if dom_watchdog.enhanced_dom_tree is not None:
return dom_watchdog.enhanced_dom_tree
# Build the enhanced DOM tree if not cached
await dom_watchdog._build_dom_tree_without_highlights()
enhanced_dom_tree = dom_watchdog.enhanced_dom_tree
assert enhanced_dom_tree is not None, 'Enhanced DOM tree not available'
return enhanced_dom_tree
# Legacy aliases removed - all code now uses the unified extract_clean_markdown function
def _preprocess_markdown_content(content: str, max_newlines: int = 3) -> tuple[str, int]:
"""
Light preprocessing of markdown output - minimal cleanup with JSON blob removal.
Args:
content: Markdown content to lightly filter
max_newlines: Maximum consecutive newlines to allow
Returns:
tuple: (filtered_content, chars_filtered)
"""
original_length = len(content)
# Remove JSON blobs (common in SPAs like LinkedIn, Facebook, etc.)
# These are often embedded as `{"key":"value",...}` and can be massive
# Match JSON objects/arrays that are at least 100 chars long
# This catches SPA state/config data without removing small inline JSON
content = re.sub(r'`\{["\w].*?\}`', '', content, flags=re.DOTALL) # Remove JSON in code blocks
content = re.sub(r'\{"\$type":[^}]{100,}\}', '', content) # Remove JSON with $type fields (common pattern)
content = re.sub(r'\{"[^"]{5,}":\{[^}]{100,}\}', '', content) # Remove nested JSON objects
# Compress consecutive newlines (4+ newlines become max_newlines)
content = re.sub(r'\n{4,}', '\n' * max_newlines, content)
# Remove lines that are only whitespace or very short (likely artifacts)
lines = content.split('\n')
filtered_lines = []
for line in lines:
stripped = line.strip()
# Keep lines with substantial content
if len(stripped) > 2:
# Skip lines that look like JSON (start with { or [ and are very long)
if (stripped.startswith('{') or stripped.startswith('[')) and len(stripped) > 100:
continue
filtered_lines.append(line)
content = '\n'.join(filtered_lines)
content = content.strip()
chars_filtered = original_length - len(content)
return content, chars_filtered