Spaces:
No application file
No application file
File size: 6,176 Bytes
a5e74de |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
import os
import tempfile
import asyncio
import aiohttp
from playwright.async_api import async_playwright
async def start_chrome_with_debug_port(port: int = 9222):
"""
Start Chrome with remote debugging enabled.
Returns the Chrome process.
"""
user_data_dir = tempfile.mkdtemp(prefix='chrome_cdp_')
print(f"Created temp user data dir: {user_data_dir}")
chrome_paths = [
r'C:\Program Files\Google\Chrome\Application\chrome.exe',
'chrome.exe',
'chrome',
]
chrome_exe = None
print(f"Looking for Chrome executable in these locations: {chrome_paths}")
for path in chrome_paths:
if os.path.exists(path):
print(f"Found Chrome at: {path}")
try:
print(f"Testing executable: {path}")
test_proc = await asyncio.create_subprocess_exec(
path, '--version', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await test_proc.communicate()
if test_proc.returncode == 0:
version = stdout.decode().strip() if stdout else "Unknown version"
print(f"Chrome executable works! Version: {version}")
chrome_exe = path
break
else:
error = stderr.decode().strip() if stderr else "Unknown error"
print(f"Chrome executable test failed: {error}")
except Exception as e:
print(f"Error testing Chrome executable {path}: {e}")
continue
elif path in ['chrome', 'chromium', 'chrome.exe']:
print(f"Checking PATH for {path}")
try:
test_proc = await asyncio.create_subprocess_exec(
path, '--version', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await test_proc.communicate()
if test_proc.returncode == 0:
version = stdout.decode().strip() if stdout else "Unknown version"
print(f"Chrome executable works via PATH! Version: {version}")
chrome_exe = path
break
else:
error = stderr.decode().strip() if stderr else "Unknown error"
print(f"Chrome executable test via PATH failed: {error}")
except Exception as e:
print(f"Error testing Chrome executable via PATH {path}: {e}")
continue
if not chrome_exe:
raise RuntimeError('❌ Chrome not found. Please install Chrome or Chromium.')
cmd = [
chrome_exe,
f'--remote-debugging-port={port}',
f'--user-data-dir={user_data_dir}',
'--no-first-run',
'--no-default-browser-check',
'--disable-extensions',
'--disable-background-networking',
'--disable-background-timer-throttling',
'--disable-backgrounding-occluded-windows',
'--disable-breakpad',
'--disable-component-extensions-with-background-pages',
'--disable-features=TranslateUI,BlinkGenPropertyTrees',
'--disable-ipc-flooding-protection',
'--disable-popup-blocking',
'--disable-prompt-on-repost',
'--disable-renderer-backgrounding',
'--force-color-profile=srgb',
'--metrics-recording-only',
'--mute-audio',
'about:blank',
]
print(f"Starting Chrome with command: {cmd}")
process = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
print(f"Chrome process started with PID: {process.pid}")
print(f"Waiting for Chrome CDP to be available at http://localhost:{port}/json/version...")
cdp_ready = False
for attempt in range(20):
try:
async with aiohttp.ClientSession() as session:
print(f"CDP check attempt {attempt+1}/20...")
async with session.get(
f'http://localhost:{port}/json/version', timeout=aiohttp.ClientTimeout(total=1)
) as response:
if response.status == 200:
data = await response.json()
print(f"CDP connected successfully! Chrome version: {data.get('Browser', 'Unknown')}")
cdp_ready = True
break
else:
print(f"CDP check failed with status: {response.status}")
except Exception as e:
print(f"CDP check failed with error: {type(e).__name__}: {e}")
await asyncio.sleep(1)
if not cdp_ready:
print(f"ERROR: Chrome DevTools Protocol not available after timeout on port {port}")
stdout_data, stderr_data = await process.communicate()
print(f"Chrome STDOUT: {stdout_data.decode('utf-8', errors='ignore')}")
print(f"Chrome STDERR: {stderr_data.decode('utf-8', errors='ignore')}")
process.terminate()
raise RuntimeError('❌ Chrome failed to start with CDP')
return process
async def connect_playwright_to_cdp(cdp_url: str):
"""
Connect Playwright to the same Chrome instance Browser-Use is using.
Returns the Playwright browser and page.
"""
print(f"Connecting Playwright to CDP URL: {cdp_url}")
playwright = await async_playwright().start()
playwright_browser = await playwright.chromium.connect_over_cdp(cdp_url)
print(f"Playwright connected to browser")
if playwright_browser and playwright_browser.contexts and playwright_browser.contexts[0].pages:
playwright_page = playwright_browser.contexts[0].pages[0]
print(f"Using existing page: {await playwright_page.title()}")
elif playwright_browser:
print("No existing pages found, creating a new context and page")
context = await playwright_browser.new_context()
playwright_page = await context.new_page()
else:
playwright_page = None
print(f"Playwright page setup complete")
return playwright_browser, playwright_page
|