File size: 4,018 Bytes
65cbdfa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7f3da1e
65cbdfa
 
 
 
 
 
 
 
 
 
 
7f3da1e
 
 
 
 
 
65cbdfa
 
 
7f3da1e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import asyncio
import logging
import subprocess
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Module-level logger named after this module; CustomBrowser reports browser
# start-up progress and failures through it.
logger = logging.getLogger(__name__)

class CustomBrowser:
    """Create and own a Selenium Chrome ``WebDriver`` built from a config dict.

    Recognised ``config`` keys (all optional):

    * ``headless`` (default ``True``): add ``--headless=new``.
    * ``disable_security`` (default ``False``): add the flags that turn off
      web security and site isolation.
    * ``proxy``: value passed to ``--proxy-server``.
    * ``wss_url``: debugger address of an already running Chrome to attach
      to. Takes precedence over ``chrome_instance_path``.
    * ``chrome_instance_path``: Chrome executable that is reused if it is
      already listening on the remote-debugging port, otherwise launched.

    NOTE: ``requests.get`` and ``webdriver.Chrome`` are synchronous calls, so
    the coroutines of this class block the event loop while they run.
    """

    # Remote-debugging endpoint used by the ``chrome_instance_path`` mode.
    _DEBUGGER_ADDRESS = "localhost:9222"
    _DEBUGGER_VERSION_URL = "http://localhost:9222/json/version"
    _DEBUGGER_PORT_FLAG = "--remote-debugging-port=9222"
    _PROBE_TIMEOUT_SECONDS = 2
    # How many times, and how far apart, a freshly launched Chrome is probed.
    _STARTUP_ATTEMPTS = 10
    _STARTUP_POLL_SECONDS = 1

    def __init__(self, config=None):
        """Store the configuration; no browser is started yet.

        Args:
            config: Mapping of the option keys listed on the class. ``None``
                (or any falsy value) is treated as an empty configuration.
        """
        self.config = config or {}
        self.driver = None

    async def new_context(self):
        """Start (or attach to) Chrome and return the resulting driver."""
        return await self._setup_browser()

    async def _setup_browser(self):
        """Build the Chrome options and create the driver for the active mode.

        The mode is chosen in this order: attach via ``wss_url``, reuse or
        launch ``chrome_instance_path``, otherwise start a fresh Chrome.

        Returns:
            The created ``webdriver.Chrome`` instance, also stored on
            ``self.driver``.

        Raises:
            RuntimeError: In ``chrome_instance_path`` mode, when attaching to
                the newly launched Chrome fails.
            Exception: Any error raised while creating the driver in the
                other modes is propagated unchanged.
        """
        chrome_options = self._build_options()

        if self.config.get("wss_url"):
            # NOTE(review): Selenium's ``debugger_address`` is documented as a
            # ``host:port`` value; confirm callers do not pass a full ws:// URL.
            chrome_options.debugger_address = self.config["wss_url"]
            self.driver = self._create_driver(chrome_options)
            return self.driver

        if self.config.get("chrome_instance_path"):
            return await self._attach_to_local_chrome(chrome_options)

        try:
            self.driver = self._create_driver(chrome_options)
        except Exception as e:
            logger.error('Failed to initialize Selenium browser: %s', e)
            raise
        return self.driver

    def _build_options(self):
        """Translate ``self.config`` into a Chrome ``Options`` object."""
        chrome_options = Options()

        if self.config.get("headless", True):
            chrome_options.add_argument("--headless=new")

        if self.config.get("disable_security", False):
            chrome_options.add_argument("--disable-web-security")
            chrome_options.add_argument("--disable-site-isolation-trials")
            chrome_options.add_argument("--disable-features=IsolateOrigins,site-per-process")

        # Flags applied unconditionally, in the original order.
        for flag in (
            "--no-sandbox",
            "--disable-blink-features=AutomationControlled",
            "--disable-infobars",
            "--disable-background-timer-throttling",
            "--disable-popup-blocking",
            "--disable-backgrounding-occluded-windows",
            "--disable-renderer-backgrounding",
            "--window-position=0,0",
        ):
            chrome_options.add_argument(flag)

        if self.config.get("proxy"):
            chrome_options.add_argument(f"--proxy-server={self.config['proxy']}")

        return chrome_options

    @staticmethod
    def _create_driver(chrome_options):
        """Create a Chrome driver using the driver binary from webdriver-manager."""
        service = Service(ChromeDriverManager().install())
        return webdriver.Chrome(service=service, options=chrome_options)

    def _debugger_is_up(self):
        """Return True when the local remote-debugging endpoint answers 200.

        Any ``requests`` failure (refused connection, timeout, ...) counts as
        "not up" instead of escaping to the caller.
        """
        try:
            response = requests.get(
                self._DEBUGGER_VERSION_URL, timeout=self._PROBE_TIMEOUT_SECONDS
            )
        except requests.RequestException:
            return False
        return response.status_code == 200

    async def _attach_to_local_chrome(self, chrome_options):
        """Attach to Chrome on the debugging port, launching it if necessary."""
        chrome_options.debugger_address = self._DEBUGGER_ADDRESS

        if self._debugger_is_up():
            logger.info('Reusing existing Chrome instance')
            self.driver = self._create_driver(chrome_options)
            return self.driver

        logger.debug('No existing Chrome instance found, starting a new one')
        subprocess.Popen(
            [
                self.config["chrome_instance_path"],
                self._DEBUGGER_PORT_FLAG,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        # Wait between every unsuccessful probe, whatever the reason it failed,
        # so the launched Chrome gets the full start-up window.
        for _ in range(self._STARTUP_ATTEMPTS):
            if self._debugger_is_up():
                break
            await asyncio.sleep(self._STARTUP_POLL_SECONDS)

        # Attach even if the endpoint never answered; a failure here is
        # reported as a RuntimeError that keeps the original cause.
        try:
            self.driver = self._create_driver(chrome_options)
        except Exception as e:
            logger.error('Failed to start a new Chrome instance: %s', e)
            raise RuntimeError(
                'To start Chrome in Debug mode, close all existing Chrome instances and try again.'
            ) from e
        return self.driver

    async def close(self):
        """Quit the driver if one exists; calling this again is a no-op."""
        # Drop the reference first so a failing or repeated close never calls
        # quit() on the same driver twice.
        driver, self.driver = self.driver, None
        if driver:
            driver.quit()