Webui / src /browser /custom_browser.py
Josedcape's picture
Update src/browser/custom_browser.py
7f3da1e verified
import asyncio
import logging
import subprocess
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
logger = logging.getLogger(__name__)
class CustomBrowser:
def __init__(self, config=None):
self.config = config or {}
self.driver = None
async def new_context(self):
return await self._setup_browser()
async def _setup_browser(self):
chrome_options = Options()
if self.config.get("headless", True):
chrome_options.add_argument("--headless=new")
if self.config.get("disable_security", False):
chrome_options.add_argument("--disable-web-security")
chrome_options.add_argument("--disable-site-isolation-trials")
chrome_options.add_argument("--disable-features=IsolateOrigins,site-per-process")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--disable-infobars")
chrome_options.add_argument("--disable-background-timer-throttling")
chrome_options.add_argument("--disable-popup-blocking")
chrome_options.add_argument("--disable-backgrounding-occluded-windows")
chrome_options.add_argument("--disable-renderer-backgrounding")
chrome_options.add_argument("--window-position=0,0")
if self.config.get("proxy"):
chrome_options.add_argument(f"--proxy-server={self.config['proxy']}")
if self.config.get("wss_url"):
chrome_options.debugger_address = self.config["wss_url"]
self.driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
return self.driver
elif self.config.get("chrome_instance_path"):
try:
response = requests.get('http://localhost:9222/json/version', timeout=2)
if response.status_code == 200:
logger.info('Reusing existing Chrome instance')
chrome_options.debugger_address = "localhost:9222"
self.driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
return self.driver
except requests.ConnectionError:
logger.debug('No existing Chrome instance found, starting a new one')
subprocess.Popen(
[
self.config["chrome_instance_path"],
'--remote-debugging-port=9222',
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
for _ in range(10):
try:
response = requests.get('http://localhost:9222/json/version', timeout=2)
if response.status_code == 200:
break
except requests.ConnectionError:
await asyncio.sleep(1)
try:
chrome_options.debugger_address = "localhost:9222"
self.driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
return self.driver
except Exception as e:
logger.error(f'Failed to start a new Chrome instance: {str(e)}')
raise RuntimeError(
'To start Chrome in Debug mode, close all existing Chrome instances and try again.'
)
try:
self.driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
return self.driver
except Exception as e:
logger.error(f'Failed to initialize Selenium browser: {str(e)}')
raise
async def close(self):
if self.driver:
self.driver.quit()