|
|
import json |
|
|
import logging |
|
|
import os |
|
|
import platform |
|
|
import re |
|
|
import shutil |
|
|
import sys |
|
|
import tempfile |
|
|
import urllib.parse |
|
|
|
|
|
from selenium.webdriver.chrome.webdriver import WebDriver |
|
|
import undetected_chromedriver as uc |
|
|
|
|
|
# Module-level caches. Each one starts as None and is filled in lazily by the
# function named beside it, so later calls can reuse the stored value.
FLARESOLVERR_VERSION = None  # set by get_flaresolverr_version() from package.json
PLATFORM_VERSION = None  # set by get_current_platform() to os.name
CHROME_EXE_PATH = None  # set by get_chrome_exe_path()
CHROME_MAJOR_VERSION = None  # set by get_chrome_major_version()
USER_AGENT = None  # set by get_user_agent(); read by get_webdriver()
XVFB_DISPLAY = None  # set by start_xvfb_display()
PATCHED_DRIVER_PATH = None  # set by get_webdriver() once a driver has been started
|
|
|
|
|
|
|
|
def get_config_log_html() -> bool:
    """Report whether the LOG_HTML environment variable is switched on.

    Returns:
        True only when LOG_HTML equals "true" (compared case-insensitively).
        An unset variable is treated as "false".
    """
    raw_setting = os.getenv('LOG_HTML', 'false')
    normalized = raw_setting.lower()
    return normalized == 'true'
|
|
|
|
|
|
|
|
def get_config_headless() -> bool:
    """Report whether the HEADLESS environment variable is switched on.

    Returns:
        True only when HEADLESS equals "true" (compared case-insensitively).
        An unset variable is treated as "true".
    """
    raw_setting = os.getenv('HEADLESS', 'true')
    normalized = raw_setting.lower()
    return normalized == 'true'
|
|
|
|
|
|
|
|
def get_flaresolverr_version() -> str:
    """Return the "version" field of package.json, caching it after first read.

    package.json is looked up in the parent of this module's directory first;
    if no file exists there, the module's own directory is used instead.

    Returns:
        The version string stored in FLARESOLVERR_VERSION.
    """
    global FLARESOLVERR_VERSION
    if FLARESOLVERR_VERSION is None:
        module_dir = os.path.dirname(os.path.abspath(__file__))
        manifest_path = os.path.join(module_dir, os.pardir, 'package.json')
        if not os.path.isfile(manifest_path):
            manifest_path = os.path.join(module_dir, 'package.json')
        with open(manifest_path) as manifest_file:
            FLARESOLVERR_VERSION = json.load(manifest_file)['version']
    return FLARESOLVERR_VERSION
|
|
|
|
|
def get_current_platform() -> str:
    """Return the platform identifier (os.name), caching it in PLATFORM_VERSION."""
    global PLATFORM_VERSION
    if PLATFORM_VERSION is None:
        PLATFORM_VERSION = os.name
    return PLATFORM_VERSION
|
|
|
|
|
|
|
|
def create_proxy_extension(proxy: dict) -> str:
    """Write a throw-away Chrome extension that configures an authenticated proxy.

    The extension consists of a manifest.json and a background.js. The
    background script sets a fixed proxy server through chrome.proxy and
    answers onAuthRequired challenges with the supplied credentials.

    Args:
        proxy: Mapping with the keys 'url', 'username' and 'password'.

    Returns:
        Path of the newly created temporary directory holding the extension.
        The caller is responsible for deleting it.

    Raises:
        KeyError: If one of the required keys is missing from ``proxy``.
        OSError: If the extension files cannot be written (the temporary
            directory is removed before the error propagates).
    """
    parsed_url = urllib.parse.urlparse(proxy['url'])

    # Build the JS object literals with json.dumps so quotes, backslashes and
    # non-ASCII characters in the values cannot break (or inject into) the
    # generated script. JSON object syntax is valid JavaScript.
    single_proxy = {'scheme': parsed_url.scheme, 'host': parsed_url.hostname}
    if parsed_url.port is not None:
        # A URL without an explicit port simply omits the field instead of
        # failing while formatting the template.
        single_proxy['port'] = parsed_url.port
    credentials = {'username': proxy['username'], 'password': proxy['password']}

    manifest_json = """
{
    "version": "1.0.0",
    "manifest_version": 2,
    "name": "Chrome Proxy",
    "permissions": [
        "proxy",
        "tabs",
        "unlimitedStorage",
        "storage",
        "<all_urls>",
        "webRequest",
        "webRequestBlocking"
    ],
    "background": {"scripts": ["background.js"]},
    "minimum_chrome_version": "76.0.0"
}
"""

    background_js = """
var config = {
    mode: "fixed_servers",
    rules: {
        singleProxy: %s,
        bypassList: ["localhost"]
    }
};

chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});

function callbackFn(details) {
    return {
        authCredentials: %s
    };
}

chrome.webRequest.onAuthRequired.addListener(
    callbackFn,
    { urls: ["<all_urls>"] },
    ['blocking']
);
""" % (json.dumps(single_proxy), json.dumps(credentials))

    proxy_extension_dir = tempfile.mkdtemp()
    try:
        with open(os.path.join(proxy_extension_dir, "manifest.json"), "w", encoding="utf-8") as f:
            f.write(manifest_json)
        with open(os.path.join(proxy_extension_dir, "background.js"), "w", encoding="utf-8") as f:
            f.write(background_js)
    except Exception:
        # Do not leave a half-written directory behind.
        shutil.rmtree(proxy_extension_dir, ignore_errors=True)
        raise

    return proxy_extension_dir
|
|
|
|
|
|
|
|
def get_webdriver(proxy: dict = None) -> WebDriver:
    """Start Chrome through undetected_chromedriver and return the driver.

    Args:
        proxy: Optional mapping. With 'url', 'username' and 'password' a
            temporary proxy extension is generated and loaded; with only
            'url' the value is passed via --proxy-server.

    Returns:
        The started driver instance.

    Raises:
        Exception: Whatever uc.Chrome raises when the browser cannot be
            started; the error is logged and then re-raised.
    """
    global PATCHED_DRIVER_PATH, USER_AGENT
    logging.debug('Launching web browser...')

    options = uc.ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('--window-size=1920,1080')
    options.add_argument('--disable-search-engine-choice-screen')
    options.add_argument('--disable-setuid-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--no-zygote')

    # NOTE(review): the original indentation was lost; the two GPU-related
    # flags are assumed to be the ARM-only ones - confirm against history.
    is_arm_arch = platform.machine().startswith(('arm', 'aarch'))
    if is_arm_arch:
        options.add_argument('--disable-gpu-sandbox')
        options.add_argument('--disable-software-rasterizer')
    options.add_argument('--ignore-certificate-errors')
    options.add_argument('--ignore-ssl-errors')
    options.add_argument('--use-gl=swiftshader')

    language = os.environ.get('LANG', None)
    if language is not None:
        options.add_argument('--accept-lang=%s' % language)

    # Reuse the User-Agent cached by get_user_agent(), when there is one.
    if USER_AGENT is not None:
        options.add_argument('--user-agent=%s' % USER_AGENT)

    proxy_extension_dir = None
    if proxy and all(key in proxy for key in ['url', 'username', 'password']):
        # Credentials are supplied through a generated extension.
        proxy_extension_dir = create_proxy_extension(proxy)
        options.add_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir))
    elif proxy and 'url' in proxy:
        proxy_url = proxy['url']
        logging.debug("Using webdriver proxy: %s", proxy_url)
        options.add_argument('--proxy-server=%s' % proxy_url)

    # On Windows the windows_headless flag is handed to uc.Chrome; on other
    # platforms an Xvfb display is started instead.
    windows_headless = False
    if get_config_headless():
        if os.name == 'nt':
            windows_headless = True
        else:
            start_xvfb_display()

    options.add_argument("--auto-open-devtools-for-tabs")

    driver_exe_path = None
    version_main = None
    if os.path.exists("/app/chromedriver"):
        # A driver already present at this fixed path is used as-is
        # (presumably the container image layout - confirm).
        driver_exe_path = "/app/chromedriver"
    else:
        version_main = get_chrome_major_version()
        if PATCHED_DRIVER_PATH is not None:
            driver_exe_path = PATCHED_DRIVER_PATH

    browser_executable_path = get_chrome_exe_path()

    try:
        driver = uc.Chrome(options=options, browser_executable_path=browser_executable_path,
                           driver_executable_path=driver_exe_path, version_main=version_main,
                           windows_headless=windows_headless, headless=get_config_headless())
    except Exception as e:
        logging.error("Error starting Chrome: %s", e)
        # The extension directory is useless without a browser; drop it
        # before propagating the failure.
        if proxy_extension_dir is not None:
            shutil.rmtree(proxy_extension_dir, ignore_errors=True)
        # There is no driver to work with, so surface the real error instead
        # of failing later on an unbound local variable.
        raise

    # Remember where the patched driver lives so the next launch can pass it
    # as driver_executable_path.
    if driver_exe_path is None:
        PATCHED_DRIVER_PATH = os.path.join(driver.patcher.data_path, driver.patcher.exe_name)
        if PATCHED_DRIVER_PATH != driver.patcher.executable_path:
            shutil.copy(driver.patcher.executable_path, PATCHED_DRIVER_PATH)

    # The extension directory is no longer needed once Chrome has started.
    if proxy_extension_dir is not None:
        shutil.rmtree(proxy_extension_dir)

    return driver
|
|
|
|
|
|
|
|
def get_chrome_exe_path() -> str:
    """Locate the Chrome executable and cache the result in CHROME_EXE_PATH.

    Lookup order: a "chrome" file inside the "chrome" folder next to this
    module, then "chrome.exe" in that same folder, then whatever
    uc.find_chrome_executable() reports.

    Returns:
        The path stored in CHROME_EXE_PATH.

    Raises:
        Exception: If the bundled "chrome" file exists but is not executable.
    """
    global CHROME_EXE_PATH
    if CHROME_EXE_PATH is not None:
        return CHROME_EXE_PATH

    bundle_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'chrome')

    bundled_binary = os.path.join(bundle_dir, "chrome")
    if os.path.exists(bundled_binary):
        if os.access(bundled_binary, os.X_OK):
            CHROME_EXE_PATH = bundled_binary
            return CHROME_EXE_PATH
        raise Exception(f'Chrome binary "{bundled_binary}" is not executable. '
                        f'Please, extract the archive with "tar xzf <file.tar.gz>".')

    bundled_exe = os.path.join(bundle_dir, "chrome.exe")
    if os.path.exists(bundled_exe):
        CHROME_EXE_PATH = bundled_exe
    else:
        CHROME_EXE_PATH = uc.find_chrome_executable()
    return CHROME_EXE_PATH
|
|
|
|
|
|
|
|
def get_chrome_major_version() -> str:
    """Return Chrome's major version as a string, caching it after first use.

    On Windows (os.name == 'nt') the full version is taken from the
    executable's metadata, falling back to the registry and then to the
    installation folder name. Elsewhere it is read from the output of
    ``<chrome> --version``.

    Returns:
        The text before the first '.' of the full version, reduced to its
        last space-separated token.
    """
    global CHROME_MAJOR_VERSION
    if CHROME_MAJOR_VERSION is not None:
        return CHROME_MAJOR_VERSION

    if os.name != 'nt':
        chrome_binary = get_chrome_exe_path()
        with os.popen(f'"{chrome_binary}" --version') as version_pipe:
            complete_version = version_pipe.read()
    else:
        # Each strategy is tried only when the previous one raised.
        try:
            complete_version = extract_version_nt_executable(get_chrome_exe_path())
        except Exception:
            try:
                complete_version = extract_version_nt_registry()
            except Exception:
                complete_version = extract_version_nt_folder()

    leading_part = complete_version.split('.')[0]
    CHROME_MAJOR_VERSION = leading_part.split(' ')[-1]
    return CHROME_MAJOR_VERSION
|
|
|
|
|
|
|
|
def extract_version_nt_executable(exe_path: str) -> str:
    """Read the FileVersion string from a PE executable's resource section.

    Args:
        exe_path: Path of the executable to inspect.

    Returns:
        The FileVersion entry decoded as UTF-8.
    """
    # Imported here so the dependency is only needed when this path is taken.
    import pefile

    image = pefile.PE(exe_path, fast_load=True)
    # fast_load skips the data directories; parse only the resource one.
    resource_directory = pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_RESOURCE"]
    image.parse_data_directories(directories=[resource_directory])

    string_table = image.FileInfo[0][0].StringTable[0]
    raw_version = string_table.entries[b"FileVersion"]
    return raw_version.decode('utf-8')
|
|
|
|
|
|
|
|
def extract_version_nt_registry() -> str:
    """Read Chrome's DisplayVersion from the Windows uninstall registry key.

    Runs ``reg query`` for the Google Chrome uninstall entry and parses the
    ``DisplayVersion  REG_SZ  <value>`` row from its output.

    Returns:
        The version text with surrounding whitespace removed.

    Raises:
        ValueError: If the output contains no DisplayVersion row (callers
            rely on an exception to fall back to another strategy).
    """
    command = ('reg query "HKLM\\SOFTWARE\\Wow6432Node\\Microsoft\\Windows'
               '\\CurrentVersion\\Uninstall\\Google Chrome"')
    # The context manager closes the pipe once the output has been read.
    with os.popen(command) as stream:
        output = stream.read()

    # Columns are separated by runs of spaces/tabs, so match any amount of
    # horizontal whitespace rather than a fixed-width literal and offset.
    rows = re.findall(r'DisplayVersion[ \t]+REG_SZ[ \t]*([^\r\n]*)', output)
    if not rows:
        raise ValueError('DisplayVersion not found in registry query output')
    # Use the last occurrence, as the previous rindex-based lookup did.
    return rows[-1].strip()
|
|
|
|
|
|
|
|
def extract_version_nt_folder(search_dirs=None) -> str:
    """Find a Chrome version by looking at version-named installation folders.

    Args:
        search_dirs: Optional iterable of directories to inspect, in order.
            Defaults to the Google Chrome "Application" folder under
            ``C:\\Program Files`` and then ``C:\\Program Files (x86)``.

    Returns:
        The first ``a.b.c.d`` version found in the name of a sub-directory,
        or an empty string when none of the directories contains one. Within
        one directory, entries are checked in the order os.scandir yields
        them.
    """
    if search_dirs is None:
        search_dirs = (
            'C:\\Program Files\\Google\\Chrome\\Application',
            'C:\\Program Files (x86)\\Google\\Chrome\\Application',
        )

    # Raw string: '\d' in a normal literal is an invalid escape sequence.
    version_pattern = re.compile(r'\d+\.\d+\.\d+\.\d+')

    for app_dir in search_dirs:
        if not os.path.isdir(app_dir):
            continue
        # Close the scandir iterator promptly instead of leaving it to GC.
        with os.scandir(app_dir) as entries:
            folder_names = [entry.name for entry in entries if entry.is_dir()]
        for folder_name in folder_names:
            match = version_pattern.search(folder_name)
            if match:
                return match.group(0)
    return ''
|
|
|
|
|
|
|
|
def get_user_agent(driver=None) -> str:
    """Return the browser's User-Agent with any "headless" marker removed.

    The value is cached in USER_AGENT; later calls return it without
    touching a browser.

    Args:
        driver: Optional driver to query. When omitted, a new one is created
            with get_webdriver(). Either way the driver is quit before this
            function returns.

    Returns:
        The sanitised User-Agent string.

    Raises:
        Exception: If the driver cannot be created or queried; the original
            error is attached as the cause.
    """
    global USER_AGENT
    if USER_AGENT is not None:
        return USER_AGENT

    try:
        if driver is None:
            driver = get_webdriver()
        raw_user_agent = driver.execute_script("return navigator.userAgent")
        # Only publish the value once it has been sanitised, so a failure
        # here cannot leave a half-processed value in the cache.
        USER_AGENT = re.sub('HEADLESS', '', raw_user_agent, flags=re.IGNORECASE)
        return USER_AGENT
    except Exception as e:
        raise Exception("Error getting browser User-Agent. " + str(e)) from e
    finally:
        if driver is not None:
            # Check os.name directly: the PLATFORM_VERSION cache is still
            # None unless get_current_platform() happened to run earlier.
            if os.name == "nt":
                driver.close()
            driver.quit()
|
|
|
|
|
|
|
|
def start_xvfb_display():
    """Create and start an Xvfb display once, keeping it in XVFB_DISPLAY.

    Calls made after the display object exists do nothing.
    """
    global XVFB_DISPLAY
    if XVFB_DISPLAY is not None:
        return

    # Imported lazily so the package is only required when a display is needed.
    from xvfbwrapper import Xvfb

    XVFB_DISPLAY = Xvfb()
    XVFB_DISPLAY.start()
|
|
|
|
|
|
|
|
def object_to_dict(_object):
    """Convert an object into a plain dict via a JSON round-trip.

    Values json cannot serialise natively are replaced by their ``__dict__``.
    Top-level keys that start with a double underscore are dropped from the
    result; nested keys are left untouched.
    """
    def _attributes_of(value):
        return value.__dict__

    serialized = json.dumps(_object, default=_attributes_of)
    decoded = json.loads(serialized)

    public_items = {}
    for key, value in decoded.items():
        if key.startswith('__'):
            continue
        public_items[key] = value
    return public_items
|
|
|