HMM / browser-use-main /browser_use /browser /python_highlights.py
Speedofmastery's picture
Merge Landrun + Browser-Use + Chromium with AI agent support (without binary files)
d7b3d84
"""Python-based highlighting system for drawing bounding boxes on screenshots.
This module replaces JavaScript-based highlighting with fast Python image processing
to draw bounding boxes around interactive elements directly on screenshots.
"""
import asyncio
import base64
import io
import logging
import os
from PIL import Image, ImageDraw, ImageFont
from browser_use.dom.views import DOMSelectorMap, EnhancedDOMTreeNode
from browser_use.observability import observe_debug
from browser_use.utils import time_execution_async
logger = logging.getLogger(__name__)
# Font cache to prevent repeated font loading and reduce memory usage
_FONT_CACHE: dict[tuple[str, int], ImageFont.FreeTypeFont | None] = {}
# Cross-platform font paths
_FONT_PATHS = [
'/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf', # Linux (Debian/Ubuntu)
'/usr/share/fonts/TTF/DejaVuSans-Bold.ttf', # Linux (Arch/Fedora)
'/System/Library/Fonts/Arial.ttf', # macOS
'C:\\Windows\\Fonts\\arial.ttf', # Windows
'arial.ttf', # Windows (system path)
'Arial Bold.ttf', # macOS alternative
'/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf', # Linux alternative
]
def get_cross_platform_font(font_size: int) -> ImageFont.FreeTypeFont | None:
"""Get a cross-platform compatible font with caching to prevent memory leaks.
Args:
font_size: Size of the font to load
Returns:
ImageFont object or None if no system fonts are available
"""
# Use cache key based on font size
cache_key = ('system_font', font_size)
# Return cached font if available
if cache_key in _FONT_CACHE:
return _FONT_CACHE[cache_key]
# Try to load a system font
font = None
for font_path in _FONT_PATHS:
try:
font = ImageFont.truetype(font_path, font_size)
break
except OSError:
continue
# Cache the result (even if None) to avoid repeated attempts
_FONT_CACHE[cache_key] = font
return font
def cleanup_font_cache() -> None:
"""Clean up the font cache to prevent memory leaks in long-running applications."""
global _FONT_CACHE
_FONT_CACHE.clear()
# Color scheme for different element types
ELEMENT_COLORS = {
'button': '#FF6B6B', # Red for buttons
'input': '#4ECDC4', # Teal for inputs
'select': '#45B7D1', # Blue for dropdowns
'a': '#96CEB4', # Green for links
'textarea': '#FF8C42', # Orange for text areas (was yellow, now more visible)
'default': '#DDA0DD', # Light purple for other interactive elements
}
# Element type mappings
ELEMENT_TYPE_MAP = {
'button': 'button',
'input': 'input',
'select': 'select',
'a': 'a',
'textarea': 'textarea',
}
def get_element_color(tag_name: str, element_type: str | None = None) -> str:
"""Get color for element based on tag name and type."""
# Check input type first
if tag_name == 'input' and element_type:
if element_type in ['button', 'submit']:
return ELEMENT_COLORS['button']
# Use tag-based color
return ELEMENT_COLORS.get(tag_name.lower(), ELEMENT_COLORS['default'])
def should_show_index_overlay(backend_node_id: int | None) -> bool:
"""Determine if index overlay should be shown."""
return backend_node_id is not None
def draw_enhanced_bounding_box_with_text(
draw, # ImageDraw.Draw - avoiding type annotation due to PIL typing issues
bbox: tuple[int, int, int, int],
color: str,
text: str | None = None,
font: ImageFont.FreeTypeFont | None = None,
element_type: str = 'div',
image_size: tuple[int, int] = (2000, 1500),
device_pixel_ratio: float = 1.0,
) -> None:
"""Draw an enhanced bounding box with much bigger index containers and dashed borders."""
x1, y1, x2, y2 = bbox
# Draw dashed bounding box with pattern: 1 line, 2 spaces, 1 line, 2 spaces...
dash_length = 4
gap_length = 8
line_width = 2
# Helper function to draw dashed line
def draw_dashed_line(start_x, start_y, end_x, end_y):
if start_x == end_x: # Vertical line
y = start_y
while y < end_y:
dash_end = min(y + dash_length, end_y)
draw.line([(start_x, y), (start_x, dash_end)], fill=color, width=line_width)
y += dash_length + gap_length
else: # Horizontal line
x = start_x
while x < end_x:
dash_end = min(x + dash_length, end_x)
draw.line([(x, start_y), (dash_end, start_y)], fill=color, width=line_width)
x += dash_length + gap_length
# Draw dashed rectangle
draw_dashed_line(x1, y1, x2, y1) # Top
draw_dashed_line(x2, y1, x2, y2) # Right
draw_dashed_line(x2, y2, x1, y2) # Bottom
draw_dashed_line(x1, y2, x1, y1) # Left
# Draw much bigger index overlay if we have index text
if text:
try:
# Scale font size for appropriate sizing across different resolutions
img_width, img_height = image_size
css_width = img_width # / device_pixel_ratio
# Much smaller scaling - 1% of CSS viewport width, max 16px to prevent huge highlights
base_font_size = max(10, min(20, int(css_width * 0.01)))
# Use shared font loading function with caching
big_font = get_cross_platform_font(base_font_size)
if big_font is None:
big_font = font # Fallback to original font if no system fonts found
# Get text size with bigger font
if big_font:
bbox_text = draw.textbbox((0, 0), text, font=big_font)
text_width = bbox_text[2] - bbox_text[0]
text_height = bbox_text[3] - bbox_text[1]
else:
# Fallback for default font
bbox_text = draw.textbbox((0, 0), text)
text_width = bbox_text[2] - bbox_text[0]
text_height = bbox_text[3] - bbox_text[1]
# Scale padding appropriately for different resolutions
padding = max(4, min(10, int(css_width * 0.005))) # 0.3% of CSS width, max 4px
element_width = x2 - x1
element_height = y2 - y1
# Container dimensions
container_width = text_width + padding * 2
container_height = text_height + padding * 2
# Position in top center - for small elements, place further up to avoid blocking content
# Center horizontally within the element
bg_x1 = x1 + (element_width - container_width) // 2
# Simple rule: if element is small, place index further up to avoid blocking icons
if element_width < 60 or element_height < 30:
# Small element: place well above to avoid blocking content
bg_y1 = max(0, y1 - container_height - 5)
else:
# Regular element: place inside with small offset
bg_y1 = y1 + 2
bg_x2 = bg_x1 + container_width
bg_y2 = bg_y1 + container_height
# Center the number within the index box with proper baseline handling
text_x = bg_x1 + (container_width - text_width) // 2
# Add extra vertical space to prevent clipping
text_y = bg_y1 + (container_height - text_height) // 2 - bbox_text[1] # Subtract top offset
# Ensure container stays within image bounds
img_width, img_height = image_size
if bg_x1 < 0:
offset = -bg_x1
bg_x1 += offset
bg_x2 += offset
text_x += offset
if bg_y1 < 0:
offset = -bg_y1
bg_y1 += offset
bg_y2 += offset
text_y += offset
if bg_x2 > img_width:
offset = bg_x2 - img_width
bg_x1 -= offset
bg_x2 -= offset
text_x -= offset
if bg_y2 > img_height:
offset = bg_y2 - img_height
bg_y1 -= offset
bg_y2 -= offset
text_y -= offset
# Draw bigger background rectangle with thicker border
draw.rectangle([bg_x1, bg_y1, bg_x2, bg_y2], fill=color, outline='white', width=2)
# Draw white text centered in the index box
draw.text((text_x, text_y), text, fill='white', font=big_font or font)
except Exception as e:
logger.debug(f'Failed to draw enhanced text overlay: {e}')
def draw_bounding_box_with_text(
draw, # ImageDraw.Draw - avoiding type annotation due to PIL typing issues
bbox: tuple[int, int, int, int],
color: str,
text: str | None = None,
font: ImageFont.FreeTypeFont | None = None,
) -> None:
"""Draw a bounding box with optional text overlay."""
x1, y1, x2, y2 = bbox
# Draw dashed bounding box
dash_length = 2
gap_length = 6
# Top edge
x = x1
while x < x2:
end_x = min(x + dash_length, x2)
draw.line([(x, y1), (end_x, y1)], fill=color, width=2)
draw.line([(x, y1 + 1), (end_x, y1 + 1)], fill=color, width=2)
x += dash_length + gap_length
# Bottom edge
x = x1
while x < x2:
end_x = min(x + dash_length, x2)
draw.line([(x, y2), (end_x, y2)], fill=color, width=2)
draw.line([(x, y2 - 1), (end_x, y2 - 1)], fill=color, width=2)
x += dash_length + gap_length
# Left edge
y = y1
while y < y2:
end_y = min(y + dash_length, y2)
draw.line([(x1, y), (x1, end_y)], fill=color, width=2)
draw.line([(x1 + 1, y), (x1 + 1, end_y)], fill=color, width=2)
y += dash_length + gap_length
# Right edge
y = y1
while y < y2:
end_y = min(y + dash_length, y2)
draw.line([(x2, y), (x2, end_y)], fill=color, width=2)
draw.line([(x2 - 1, y), (x2 - 1, end_y)], fill=color, width=2)
y += dash_length + gap_length
# Draw index overlay if we have index text
if text:
try:
# Get text size
if font:
bbox_text = draw.textbbox((0, 0), text, font=font)
text_width = bbox_text[2] - bbox_text[0]
text_height = bbox_text[3] - bbox_text[1]
else:
# Fallback for default font
bbox_text = draw.textbbox((0, 0), text)
text_width = bbox_text[2] - bbox_text[0]
text_height = bbox_text[3] - bbox_text[1]
# Smart positioning based on element size
padding = 5
element_width = x2 - x1
element_height = y2 - y1
element_area = element_width * element_height
index_box_area = (text_width + padding * 2) * (text_height + padding * 2)
# Calculate size ratio to determine positioning strategy
size_ratio = element_area / max(index_box_area, 1)
if size_ratio < 4:
# Very small elements: place outside in bottom-right corner
text_x = x2 + padding
text_y = y2 - text_height
# Ensure it doesn't go off screen
text_x = min(text_x, 1200 - text_width - padding)
text_y = max(text_y, 0)
elif size_ratio < 16:
# Medium elements: place in bottom-right corner inside
text_x = x2 - text_width - padding
text_y = y2 - text_height - padding
else:
# Large elements: place in center
text_x = x1 + (element_width - text_width) // 2
text_y = y1 + (element_height - text_height) // 2
# Ensure text stays within bounds
text_x = max(0, min(text_x, 1200 - text_width))
text_y = max(0, min(text_y, 800 - text_height))
# Draw background rectangle for maximum contrast
bg_x1 = text_x - padding
bg_y1 = text_y - padding
bg_x2 = text_x + text_width + padding
bg_y2 = text_y + text_height + padding
# Use white background with thick black border for maximum visibility
draw.rectangle([bg_x1, bg_y1, bg_x2, bg_y2], fill='white', outline='black', width=2)
# Draw bold dark text on light background for best contrast
draw.text((text_x, text_y), text, fill='black', font=font)
except Exception as e:
logger.debug(f'Failed to draw text overlay: {e}')
def process_element_highlight(
element_id: int,
element: EnhancedDOMTreeNode,
draw,
device_pixel_ratio: float,
font,
filter_highlight_ids: bool,
image_size: tuple[int, int],
) -> None:
"""Process a single element for highlighting."""
try:
# Use absolute_position coordinates directly
if not element.absolute_position:
return
bounds = element.absolute_position
# Scale coordinates from CSS pixels to device pixels for screenshot
# The screenshot is captured at device pixel resolution, but coordinates are in CSS pixels
x1 = int(bounds.x * device_pixel_ratio)
y1 = int(bounds.y * device_pixel_ratio)
x2 = int((bounds.x + bounds.width) * device_pixel_ratio)
y2 = int((bounds.y + bounds.height) * device_pixel_ratio)
# Ensure coordinates are within image bounds
img_width, img_height = image_size
x1 = max(0, min(x1, img_width))
y1 = max(0, min(y1, img_height))
x2 = max(x1, min(x2, img_width))
y2 = max(y1, min(y2, img_height))
# Skip if bounding box is too small or invalid
if x2 - x1 < 2 or y2 - y1 < 2:
return
# Get element color based on type
tag_name = element.tag_name if hasattr(element, 'tag_name') else 'div'
element_type = None
if hasattr(element, 'attributes') and element.attributes:
element_type = element.attributes.get('type')
color = get_element_color(tag_name, element_type)
# Get element index for overlay and apply filtering
backend_node_id = getattr(element, 'backend_node_id', None)
index_text = None
if backend_node_id is not None:
if filter_highlight_ids:
# Use the meaningful text that matches what the LLM sees
meaningful_text = element.get_meaningful_text_for_llm()
# Show ID only if meaningful text is less than 5 characters
if len(meaningful_text) < 3:
index_text = str(backend_node_id)
else:
# Always show ID when filter is disabled
index_text = str(backend_node_id)
# Draw enhanced bounding box with bigger index
draw_enhanced_bounding_box_with_text(
draw, (x1, y1, x2, y2), color, index_text, font, tag_name, image_size, device_pixel_ratio
)
except Exception as e:
logger.debug(f'Failed to draw highlight for element {element_id}: {e}')
@observe_debug(ignore_input=True, ignore_output=True, name='create_highlighted_screenshot')
@time_execution_async('create_highlighted_screenshot')
async def create_highlighted_screenshot(
screenshot_b64: str,
selector_map: DOMSelectorMap,
device_pixel_ratio: float = 1.0,
viewport_offset_x: int = 0,
viewport_offset_y: int = 0,
filter_highlight_ids: bool = True,
) -> str:
"""Create a highlighted screenshot with bounding boxes around interactive elements.
Args:
screenshot_b64: Base64 encoded screenshot
selector_map: Map of interactive elements with their positions
device_pixel_ratio: Device pixel ratio for scaling coordinates
viewport_offset_x: X offset for viewport positioning
viewport_offset_y: Y offset for viewport positioning
Returns:
Base64 encoded highlighted screenshot
"""
try:
# Decode screenshot
screenshot_data = base64.b64decode(screenshot_b64)
image = Image.open(io.BytesIO(screenshot_data)).convert('RGBA')
# Create drawing context
draw = ImageDraw.Draw(image)
# Load font using shared function with caching
font = get_cross_platform_font(12)
# If no system fonts found, font remains None and will use default font
# Process elements sequentially to avoid ImageDraw thread safety issues
# PIL ImageDraw is not thread-safe, so we process elements one by one
for element_id, element in selector_map.items():
process_element_highlight(element_id, element, draw, device_pixel_ratio, font, filter_highlight_ids, image.size)
# Convert back to base64
output_buffer = io.BytesIO()
try:
image.save(output_buffer, format='PNG')
output_buffer.seek(0)
highlighted_b64 = base64.b64encode(output_buffer.getvalue()).decode('utf-8')
logger.debug(f'Successfully created highlighted screenshot with {len(selector_map)} elements')
return highlighted_b64
finally:
# Explicit cleanup to prevent memory leaks
output_buffer.close()
if 'image' in locals():
image.close()
except Exception as e:
logger.error(f'Failed to create highlighted screenshot: {e}')
# Clean up on error as well
if 'image' in locals():
image.close()
# Return original screenshot on error
return screenshot_b64
async def get_viewport_info_from_cdp(cdp_session) -> tuple[float, int, int]:
"""Get viewport information from CDP session.
Returns:
Tuple of (device_pixel_ratio, scroll_x, scroll_y)
"""
try:
# Get layout metrics which includes viewport info and device pixel ratio
metrics = await cdp_session.cdp_client.send.Page.getLayoutMetrics(session_id=cdp_session.session_id)
# Extract viewport information
visual_viewport = metrics.get('visualViewport', {})
css_visual_viewport = metrics.get('cssVisualViewport', {})
css_layout_viewport = metrics.get('cssLayoutViewport', {})
# Calculate device pixel ratio
css_width = css_visual_viewport.get('clientWidth', css_layout_viewport.get('clientWidth', 1280.0))
device_width = visual_viewport.get('clientWidth', css_width)
device_pixel_ratio = device_width / css_width if css_width > 0 else 1.0
# Get scroll position in CSS pixels
scroll_x = int(css_visual_viewport.get('pageX', 0))
scroll_y = int(css_visual_viewport.get('pageY', 0))
return float(device_pixel_ratio), scroll_x, scroll_y
except Exception as e:
logger.debug(f'Failed to get viewport info from CDP: {e}')
return 1.0, 0, 0
@time_execution_async('create_highlighted_screenshot_async')
async def create_highlighted_screenshot_async(
screenshot_b64: str, selector_map: DOMSelectorMap, cdp_session=None, filter_highlight_ids: bool = True
) -> str:
"""Async wrapper for creating highlighted screenshots.
Args:
screenshot_b64: Base64 encoded screenshot
selector_map: Map of interactive elements
cdp_session: CDP session for getting viewport info
filter_highlight_ids: Whether to filter element IDs based on meaningful text
Returns:
Base64 encoded highlighted screenshot
"""
# Get viewport information if CDP session is available
device_pixel_ratio = 1.0
viewport_offset_x = 0
viewport_offset_y = 0
if cdp_session:
try:
device_pixel_ratio, viewport_offset_x, viewport_offset_y = await get_viewport_info_from_cdp(cdp_session)
except Exception as e:
logger.debug(f'Failed to get viewport info from CDP: {e}')
# Create highlighted screenshot with async processing
final_screenshot = await create_highlighted_screenshot(
screenshot_b64, selector_map, device_pixel_ratio, viewport_offset_x, viewport_offset_y, filter_highlight_ids
)
filename = os.getenv('BROWSER_USE_SCREENSHOT_FILE')
if filename:
def _write_screenshot():
try:
with open(filename, 'wb') as f:
f.write(base64.b64decode(final_screenshot))
logger.debug('Saved screenshot to ' + str(filename))
except Exception as e:
logger.warning(f'Failed to save screenshot to {filename}: {e}')
await asyncio.to_thread(_write_screenshot)
return final_screenshot
# Export the cleanup function for external use in long-running applications
__all__ = ['create_highlighted_screenshot', 'create_highlighted_screenshot_async', 'cleanup_font_cache']