| import json |
| import logging |
| import os |
| import platform |
| import re |
| import shutil |
| import sys |
| import tempfile |
| import urllib.parse |
|
|
| from selenium.webdriver.chrome.webdriver import WebDriver |
| import undetected_chromedriver as uc |
|
|
| FLARESOLVERR_VERSION = None |
| PLATFORM_VERSION = None |
| CHROME_EXE_PATH = None |
| CHROME_MAJOR_VERSION = None |
| USER_AGENT = None |
| XVFB_DISPLAY = None |
| PATCHED_DRIVER_PATH = None |
|
|
|
|
| def get_config_log_html() -> bool: |
| return os.environ.get('LOG_HTML', 'false').lower() == 'true' |
|
|
|
|
| def get_config_headless() -> bool: |
| return os.environ.get('HEADLESS', 'true').lower() == 'true' |
|
|
|
|
| def get_config_disable_media() -> bool: |
| return os.environ.get('DISABLE_MEDIA', 'false').lower() == 'true' |
|
|
|
|
| def get_flaresolverr_version() -> str: |
| global FLARESOLVERR_VERSION |
| if FLARESOLVERR_VERSION is not None: |
| return FLARESOLVERR_VERSION |
|
|
| package_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, 'package.json') |
| if not os.path.isfile(package_path): |
| package_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'package.json') |
| with open(package_path) as f: |
| FLARESOLVERR_VERSION = json.loads(f.read())['version'] |
| return FLARESOLVERR_VERSION |
|
|
| def get_current_platform() -> str: |
| global PLATFORM_VERSION |
| if PLATFORM_VERSION is not None: |
| return PLATFORM_VERSION |
| PLATFORM_VERSION = os.name |
| return PLATFORM_VERSION |
|
|
|
|
| def create_proxy_extension(proxy: dict) -> str: |
| parsed_url = urllib.parse.urlparse(proxy['url']) |
| scheme = parsed_url.scheme |
| host = parsed_url.hostname |
| port = parsed_url.port |
| username = proxy['username'] |
| password = proxy['password'] |
| manifest_json = """ |
| { |
| "version": "1.0.0", |
| "manifest_version": 3, |
| "name": "Chrome Proxy", |
| "permissions": [ |
| "proxy", |
| "tabs", |
| "storage", |
| "webRequest", |
| "webRequestAuthProvider" |
| ], |
| "host_permissions": [ |
| "<all_urls>" |
| ], |
| "background": { |
| "service_worker": "background.js" |
| }, |
| "minimum_chrome_version": "76.0.0" |
| } |
| """ |
|
|
| background_js = """ |
| var config = { |
| mode: "fixed_servers", |
| rules: { |
| singleProxy: { |
| scheme: "%s", |
| host: "%s", |
| port: %d |
| }, |
| bypassList: ["localhost"] |
| } |
| }; |
| |
| chrome.proxy.settings.set({value: config, scope: "regular"}, function() {}); |
| |
| function callbackFn(details) { |
| return { |
| authCredentials: { |
| username: "%s", |
| password: "%s" |
| } |
| }; |
| } |
| |
| chrome.webRequest.onAuthRequired.addListener( |
| callbackFn, |
| { urls: ["<all_urls>"] }, |
| ['blocking'] |
| ); |
| """ % ( |
| scheme, |
| host, |
| port, |
| username, |
| password |
| ) |
|
|
| proxy_extension_dir = tempfile.mkdtemp() |
|
|
| with open(os.path.join(proxy_extension_dir, "manifest.json"), "w") as f: |
| f.write(manifest_json) |
|
|
| with open(os.path.join(proxy_extension_dir, "background.js"), "w") as f: |
| f.write(background_js) |
|
|
| return proxy_extension_dir |
|
|
|
|
| def get_webdriver(proxy: dict = None) -> WebDriver: |
| global PATCHED_DRIVER_PATH, USER_AGENT |
| logging.debug('Launching web browser...') |
|
|
| |
| options = uc.ChromeOptions() |
| options.add_argument('--no-sandbox') |
| options.add_argument('--window-size=1920,1080') |
| options.add_argument('--disable-search-engine-choice-screen') |
| |
| options.add_argument('--disable-setuid-sandbox') |
| options.add_argument('--disable-dev-shm-usage') |
| |
| options.add_argument('--no-zygote') |
| |
| IS_ARMARCH = platform.machine().startswith(('arm', 'aarch')) |
| if IS_ARMARCH: |
| options.add_argument('--disable-gpu-sandbox') |
| options.add_argument('--ignore-certificate-errors') |
| options.add_argument('--ignore-ssl-errors') |
|
|
| language = os.environ.get('LANG', None) |
| if language is not None: |
| options.add_argument('--accept-lang=%s' % language) |
|
|
| |
| if USER_AGENT is not None: |
| options.add_argument('--user-agent=%s' % USER_AGENT) |
|
|
| proxy_extension_dir = None |
| if proxy and all(key in proxy for key in ['url', 'username', 'password']): |
| proxy_extension_dir = create_proxy_extension(proxy) |
| options.add_argument("--disable-features=DisableLoadExtensionCommandLineSwitch") |
| options.add_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir)) |
| elif proxy and 'url' in proxy: |
| proxy_url = proxy['url'] |
| logging.debug("Using webdriver proxy: %s", proxy_url) |
| options.add_argument('--proxy-server=%s' % proxy_url) |
|
|
| |
| |
| windows_headless = False |
| if get_config_headless(): |
| if os.name == 'nt': |
| windows_headless = True |
| else: |
| start_xvfb_display() |
| |
| |
|
|
| |
| driver_exe_path = None |
| version_main = None |
| if os.path.exists("/app/chromedriver"): |
| |
| driver_exe_path = "/app/chromedriver" |
| else: |
| version_main = get_chrome_major_version() |
| if PATCHED_DRIVER_PATH is not None: |
| driver_exe_path = PATCHED_DRIVER_PATH |
|
|
| |
| browser_executable_path = get_chrome_exe_path() |
|
|
| |
| |
| try: |
| driver = uc.Chrome(options=options, browser_executable_path=browser_executable_path, |
| driver_executable_path=driver_exe_path, version_main=version_main, |
| windows_headless=windows_headless, headless=get_config_headless()) |
| except Exception as e: |
| logging.error("Error starting Chrome: %s" % e) |
| |
| raise e |
|
|
| |
| if driver_exe_path is None: |
| PATCHED_DRIVER_PATH = os.path.join(driver.patcher.data_path, driver.patcher.exe_name) |
| if PATCHED_DRIVER_PATH != driver.patcher.executable_path: |
| shutil.copy(driver.patcher.executable_path, PATCHED_DRIVER_PATH) |
|
|
| |
| if proxy_extension_dir is not None: |
| shutil.rmtree(proxy_extension_dir) |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| return driver |
|
|
|
|
| def get_chrome_exe_path() -> str: |
| global CHROME_EXE_PATH |
| if CHROME_EXE_PATH is not None: |
| return CHROME_EXE_PATH |
| |
| chrome_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'chrome', "chrome") |
| if os.path.exists(chrome_path): |
| if not os.access(chrome_path, os.X_OK): |
| raise Exception(f'Chrome binary "{chrome_path}" is not executable. ' |
| f'Please, extract the archive with "tar xzf <file.tar.gz>".') |
| CHROME_EXE_PATH = chrome_path |
| return CHROME_EXE_PATH |
| |
| chrome_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'chrome', "chrome.exe") |
| if os.path.exists(chrome_path): |
| CHROME_EXE_PATH = chrome_path |
| return CHROME_EXE_PATH |
| |
| CHROME_EXE_PATH = uc.find_chrome_executable() |
| return CHROME_EXE_PATH |
|
|
|
|
| def get_chrome_major_version() -> str: |
| global CHROME_MAJOR_VERSION |
| if CHROME_MAJOR_VERSION is not None: |
| return CHROME_MAJOR_VERSION |
|
|
| if os.name == 'nt': |
| |
| try: |
| complete_version = extract_version_nt_executable(get_chrome_exe_path()) |
| except Exception: |
| try: |
| complete_version = extract_version_nt_registry() |
| except Exception: |
| |
| complete_version = extract_version_nt_folder() |
| else: |
| chrome_path = get_chrome_exe_path() |
| process = os.popen(f'"{chrome_path}" --version') |
| |
| |
| complete_version = process.read() |
| process.close() |
|
|
| CHROME_MAJOR_VERSION = complete_version.split('.')[0].split(' ')[-1] |
| return CHROME_MAJOR_VERSION |
|
|
|
|
| def extract_version_nt_executable(exe_path: str) -> str: |
| import pefile |
| pe = pefile.PE(exe_path, fast_load=True) |
| pe.parse_data_directories( |
| directories=[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_RESOURCE"]] |
| ) |
| return pe.FileInfo[0][0].StringTable[0].entries[b"FileVersion"].decode('utf-8') |
|
|
|
|
| def extract_version_nt_registry() -> str: |
| stream = os.popen( |
| 'reg query "HKLM\\SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\Google Chrome"') |
| output = stream.read() |
| google_version = '' |
| for letter in output[output.rindex('DisplayVersion REG_SZ') + 24:]: |
| if letter != '\n': |
| google_version += letter |
| else: |
| break |
| return google_version.strip() |
|
|
|
|
| def extract_version_nt_folder() -> str: |
| |
| for i in range(2): |
| path = 'C:\\Program Files' + (' (x86)' if i else '') + '\\Google\\Chrome\\Application' |
| if os.path.isdir(path): |
| paths = [f.path for f in os.scandir(path) if f.is_dir()] |
| for path in paths: |
| filename = os.path.basename(path) |
| pattern = r'\d+\.\d+\.\d+\.\d+' |
| match = re.search(pattern, filename) |
| if match and match.group(): |
| |
| return match.group(0) |
| return '' |
|
|
|
|
| def get_user_agent(driver=None) -> str: |
| global USER_AGENT |
| if USER_AGENT is not None: |
| return USER_AGENT |
|
|
| try: |
| if driver is None: |
| driver = get_webdriver() |
| USER_AGENT = driver.execute_script("return navigator.userAgent") |
| |
| USER_AGENT = re.sub('HEADLESS', '', USER_AGENT, flags=re.IGNORECASE) |
| return USER_AGENT |
| except Exception as e: |
| raise Exception("Error getting browser User-Agent. " + str(e)) |
| finally: |
| if driver is not None: |
| if PLATFORM_VERSION == "nt": |
| driver.close() |
| driver.quit() |
|
|
|
|
| def start_xvfb_display(): |
| global XVFB_DISPLAY |
| if XVFB_DISPLAY is None: |
| from xvfbwrapper import Xvfb |
| XVFB_DISPLAY = Xvfb() |
| XVFB_DISPLAY.start() |
|
|
|
|
| def object_to_dict(_object): |
| json_dict = json.loads(json.dumps(_object, default=lambda o: o.__dict__)) |
| |
| return {k: v for k, v in json_dict.items() if not k.startswith('__')} |
|
|