smgp / src /utils /chrome_playwright.py
muhammadmaazuddin's picture
worked
a5e74de
import os
import tempfile
import asyncio
import aiohttp
from playwright.async_api import async_playwright
async def start_chrome_with_debug_port(port: int = 9222) -> asyncio.subprocess.Process:
    """
    Start Chrome with remote debugging enabled.

    Probes a fixed list of candidate executables with ``--version``, launches
    the first one that works against a fresh temporary profile directory, and
    then polls ``http://localhost:{port}/json/version`` until the DevTools
    endpoint answers with HTTP 200.

    Args:
        port: TCP port passed to Chrome's ``--remote-debugging-port`` flag.

    Returns:
        The Chrome process (an asyncio subprocess with stdout/stderr piped).

    Raises:
        RuntimeError: If no working Chrome executable is found, or if the
            DevTools endpoint does not answer within the polling window.
    """
    import shutil  # local import: only needed to clean up the temp profile on failure

    chrome_paths = [
        r'C:\Program Files\Google\Chrome\Application\chrome.exe',
        'chrome.exe',
        'chrome',
    ]
    chrome_exe = None
    print(f"Looking for Chrome executable in these locations: {chrome_paths}")
    for path in chrome_paths:
        if os.path.exists(path):
            print(f"Found Chrome at: {path}")
            print(f"Testing executable: {path}")
            via_path = False
        elif path in ('chrome', 'chromium', 'chrome.exe'):
            # Bare command names are resolved through PATH by the OS.
            print(f"Checking PATH for {path}")
            via_path = True
        else:
            continue
        if await _probe_chrome_executable(path, via_path=via_path):
            chrome_exe = path
            break

    if not chrome_exe:
        raise RuntimeError('❌ Chrome not found. Please install Chrome or Chromium.')

    # Create the throwaway profile only once we know there is a browser to use
    # it, so a failed search does not leave an orphaned directory behind.
    user_data_dir = tempfile.mkdtemp(prefix='chrome_cdp_')
    print(f"Created temp user data dir: {user_data_dir}")

    cmd = [
        chrome_exe,
        f'--remote-debugging-port={port}',
        f'--user-data-dir={user_data_dir}',
        '--no-first-run',
        '--no-default-browser-check',
        '--disable-extensions',
        '--disable-background-networking',
        '--disable-background-timer-throttling',
        '--disable-backgrounding-occluded-windows',
        '--disable-breakpad',
        '--disable-component-extensions-with-background-pages',
        '--disable-features=TranslateUI,BlinkGenPropertyTrees',
        '--disable-ipc-flooding-protection',
        '--disable-popup-blocking',
        '--disable-prompt-on-repost',
        '--disable-renderer-backgrounding',
        '--force-color-profile=srgb',
        '--metrics-recording-only',
        '--mute-audio',
        'about:blank',
    ]
    print(f"Starting Chrome with command: {cmd}")
    try:
        # NOTE(review): stdout/stderr are piped so they can be reported on
        # failure, but nothing drains them on success; a very chatty Chrome
        # could eventually fill the pipe. Confirm whether callers read them.
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
    except Exception:
        shutil.rmtree(user_data_dir, ignore_errors=True)
        raise
    print(f"Chrome process started with PID: {process.pid}")

    print(f"Waiting for Chrome CDP to be available at http://localhost:{port}/json/version...")
    if await _wait_for_cdp(port):
        return process

    print(f"ERROR: Chrome DevTools Protocol not available after timeout on port {port}")
    # Stop Chrome BEFORE collecting its output: communicate() only returns once
    # the process has exited, so awaiting it on a live browser would hang.
    stdout_data, stderr_data = await _stop_and_collect_output(process)
    print(f"Chrome STDOUT: {stdout_data.decode('utf-8', errors='ignore')}")
    print(f"Chrome STDERR: {stderr_data.decode('utf-8', errors='ignore')}")
    shutil.rmtree(user_data_dir, ignore_errors=True)
    raise RuntimeError('❌ Chrome failed to start with CDP')


async def _probe_chrome_executable(path: str, via_path: bool) -> bool:
    """Run ``path --version`` and report whether it exited with status 0."""
    label = ' via PATH' if via_path else ''
    try:
        test_proc = await asyncio.create_subprocess_exec(
            path, '--version', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await test_proc.communicate()
        if test_proc.returncode == 0:
            version = stdout.decode().strip() if stdout else "Unknown version"
            print(f"Chrome executable works{label}! Version: {version}")
            return True
        error = stderr.decode().strip() if stderr else "Unknown error"
        print(f"Chrome executable test{label} failed: {error}")
    except Exception as e:
        # Deliberately broad: any failure to run a candidate just means
        # "try the next one"; the error is reported, not swallowed.
        print(f"Error testing Chrome executable{label} {path}: {e}")
    return False


async def _wait_for_cdp(port: int, attempts: int = 20) -> bool:
    """Poll the DevTools ``/json/version`` endpoint about once per second until it returns 200."""
    url = f'http://localhost:{port}/json/version'
    # One session for the whole polling loop instead of one per attempt.
    async with aiohttp.ClientSession() as session:
        for attempt in range(attempts):
            print(f"CDP check attempt {attempt+1}/{attempts}...")
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=1)) as response:
                    if response.status == 200:
                        data = await response.json()
                        print(f"CDP connected successfully! Chrome version: {data.get('Browser', 'Unknown')}")
                        return True
                    print(f"CDP check failed with status: {response.status}")
            except Exception as e:
                # Connection refused / timeouts are expected while Chrome boots.
                print(f"CDP check failed with error: {type(e).__name__}: {e}")
            if attempt < attempts - 1:
                await asyncio.sleep(1)
    return False


async def _stop_and_collect_output(process, grace: float = 5.0):
    """Terminate (then kill, if needed) ``process`` and return its ``(stdout, stderr)`` bytes, best effort."""
    for stop in (process.terminate, process.kill):
        if process.returncode is None:
            try:
                stop()
            except ProcessLookupError:
                pass  # the process exited on its own in the meantime
        try:
            out, err = await asyncio.wait_for(process.communicate(), timeout=grace)
            return out or b'', err or b''
        except asyncio.TimeoutError:
            continue  # did not exit / close its pipes in time; escalate
    return b'', b''
async def connect_playwright_to_cdp(cdp_url: str):
    """
    Attach Playwright to an already-running Chrome through its CDP endpoint.

    Reuses the first page of the first existing browser context when there is
    one; otherwise creates a new context and opens a page in it.

    Args:
        cdp_url: CDP endpoint URL handed to ``chromium.connect_over_cdp``.

    Returns:
        A ``(playwright_browser, playwright_page)`` tuple. The page is ``None``
        only if the connected browser object is falsy.

    Raises:
        Exception: Whatever Playwright raises while connecting or setting up
            the page is propagated after the Playwright driver is stopped.

    Note:
        The Playwright driver handle started here is not returned, so on
        success it stays running for as long as the returned browser is used.
    """
    print(f"Connecting Playwright to CDP URL: {cdp_url}")
    playwright = await async_playwright().start()
    try:
        playwright_browser = await playwright.chromium.connect_over_cdp(cdp_url)
        print("Playwright connected to browser")
        if not playwright_browser:
            playwright_page = None
        elif playwright_browser.contexts and playwright_browser.contexts[0].pages:
            playwright_page = playwright_browser.contexts[0].pages[0]
            print(f"Using existing page: {await playwright_page.title()}")
        else:
            print("No existing pages found, creating a new context and page")
            context = await playwright_browser.new_context()
            playwright_page = await context.new_page()
    except Exception:
        # The caller never receives the driver handle, so if setup fails this
        # is the only place that can shut the driver down.
        await playwright.stop()
        raise
    print("Playwright page setup complete")
    return playwright_browser, playwright_page