import contextlib
import inspect
import json
import os
import re
import ssl
import sys
import time

if sys.version_info >= (3, 10):
    import importlib.resources as ilr
else:
    # Backport package providing the same files()/joinpath() API on < 3.10.
    import importlib_resources as ilr

from urllib import request
from urllib.error import URLError
from urllib.parse import quote_plus

from fake_useragent import settings
from fake_useragent.errors import FakeUserAgentError
from fake_useragent.log import logger

try:
    from pkg_resources import resource_filename
except ImportError:
    # pkg_resources is optional; it is only needed as a fallback loader in load().
    resource_filename = None

# Python 2 compatibility aliases, kept because other modules may import them.
str_types = (str,)
text = str

# urlopen() accepts an SSL context only as a keyword-only argument, so detect
# its availability once at import time.
urlopen_args = inspect.getfullargspec(request.urlopen).kwonlyargs
urlopen_has_ssl_context = "context" in urlopen_args
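# Illustrative check (assumption: CPython 3.x): `context` is declared as a
# keyword-only parameter of urllib.request.urlopen, so
#     inspect.getfullargspec(request.urlopen).kwonlyargs
# contains "context" and urlopen_has_ssl_context evaluates to True on any
# modern interpreter; the else-branch in get() below is a legacy safeguard.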


def get(url, verify_ssl=True):
    """Fetch *url* and return the raw response body as bytes.

    Retries on URLError/OSError up to settings.HTTP_RETRIES times, sleeping
    settings.HTTP_DELAY seconds between attempts, then raises
    FakeUserAgentError.
    """
    attempt = 0

    while True:
        request_obj = request.Request(url)
        attempt += 1

        try:
            if urlopen_has_ssl_context:
                # Build an unverified context only when the caller explicitly
                # opts out of certificate verification.
                if not verify_ssl:
                    context = ssl._create_unverified_context()
                else:
                    context = None

                with contextlib.closing(
                    request.urlopen(
                        request_obj,
                        timeout=settings.HTTP_TIMEOUT,
                        context=context,
                    )
                ) as response:
                    return response.read()
            else:
                with contextlib.closing(
                    request.urlopen(
                        request_obj,
                        timeout=settings.HTTP_TIMEOUT,
                    )
                ) as response:
                    return response.read()
        except (URLError, OSError) as exc:
            logger.debug(
                "Error occurred during fetching %s",
                url,
                exc_info=exc,
            )

            if attempt == settings.HTTP_RETRIES:
                raise FakeUserAgentError("Maximum number of retries reached") from exc
            else:
                logger.debug(
                    "Sleeping for %s seconds",
                    settings.HTTP_DELAY,
                )
                time.sleep(settings.HTTP_DELAY)
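

# Usage sketch (the URL is hypothetical, shown only for illustration):
#
#     raw = get("https://example.org/agents", verify_ssl=True)  # -> bytes
#
# Once all retries are exhausted get() raises FakeUserAgentError, so callers
# never receive None.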


def get_browser_user_agents_online(browser, verify_ssl=True):
    """Retrieve user-agent strings for *browser* from the remote listing page
    configured by settings.BROWSER_BASE_PAGE."""
    html = get(
        settings.BROWSER_BASE_PAGE.format(browser=quote_plus(browser)),
        verify_ssl=verify_ssl,
    )
    try:
        html = html.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        pass

    # Keep only the listing block that contains the user-agent links.
    html = html.split("<div id='liste'>")[1]
    html = html.split("</div>")[0]

    pattern = r"<a href=\'/.*?>(.+?)</a>"
    browsers_iter = re.finditer(pattern, html, re.UNICODE)

    browsers = []

    # Use a name distinct from the `browser` parameter so the error message
    # below still reports the browser that was requested.
    for match in browsers_iter:
        if "more" in match.group(1).lower():
            continue

        browsers.append(match.group(1))

        if len(browsers) == settings.BROWSERS_COUNT_LIMIT:
            break

    if not browsers:
        raise FakeUserAgentError(
            "No browser user-agent strings found for browser: {browser}".format(
                browser=browser
            )
        )

    return browsers
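

# Usage sketch (browser name illustrative; result shape follows the parsing
# above):
#
#     agents = get_browser_user_agents_online("chrome")
#     # -> list of up to settings.BROWSERS_COUNT_LIMIT user-agent strings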


def load(browsers, use_local_file=True, verify_ssl=True):
    """Load the browser-name -> user-agent-strings mapping.

    Prefers the bundled data file (importlib-resources, then the
    pkg_resources fallback) and only fetches the data online when both local
    loaders fail or when use_local_file is False.
    """
    data = {}
    fetch_online = True
    if use_local_file:
        try:
            json_lines = (
                ilr.files("fake_useragent.data").joinpath("browsers.json").read_text()
            )
            # The data file is in JSON Lines format: one JSON object per line.
            for line in json_lines.splitlines():
                data.update(json.loads(line))
            fetch_online = False
            ret = data
        except Exception as exc:
            data = {}
            logger.warning(
                "Could not find or parse the local data/json file using importlib-resources. Trying pkg_resources next.",
                exc_info=exc,
            )
            try:
                with open(
                    resource_filename("fake_useragent", "data/browsers.json")
                ) as file:
                    json_lines = file.read()
                for line in json_lines.splitlines():
                    data.update(json.loads(line))
                fetch_online = False
                ret = data
            except Exception as exc2:
                data = {}
                logger.warning(
                    "Could not find or parse the local data/json file using pkg_resources. Falling back to the external resource.",
                    exc_info=exc2,
                )

    if fetch_online:
        try:
            for browser_name in browsers:
                browser_name = browser_name.lower().strip()
                data[browser_name] = get_browser_user_agents_online(
                    browser_name,
                    verify_ssl=verify_ssl,
                )
        except Exception as exc:
            raise FakeUserAgentError(
                "Could not load data from external website"
            ) from exc
        else:
            ret = data

    if not ret:
        raise FakeUserAgentError("Data dictionary is empty", ret)

    if not isinstance(ret, dict):
        raise FakeUserAgentError("Data is not a dictionary", ret)

    return ret
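

# Usage sketch (browser names illustrative):
#
#     data = load(["chrome", "firefox"])
#     # -> {"chrome": [...user-agent strings...], "firefox": [...]}
#
# With use_local_file=True the bundled browsers.json is preferred; the network
# is touched only when both local loaders fail.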


def write(path, data):
    """Serialize *data* as JSON to *path*."""
    with open(path, encoding="utf-8", mode="w") as fp:
        # json.dumps always returns str on Python 3, so the old bytes-decoding
        # fallback is unnecessary and the result can be written directly.
        fp.write(json.dumps(data))


def read(path):
    """Deserialize and return the JSON document stored at *path*."""
    with open(path, encoding="utf-8") as fp:
        return json.loads(fp.read())
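

# write()/read() form a JSON round trip; a sketch using tempfile (used here
# only for the example):
#
#     import tempfile
#     fd, path = tempfile.mkstemp(suffix=".json")
#     os.close(fd)
#     write(path, {"chrome": ["Mozilla/5.0 ..."]})
#     assert read(path) == {"chrome": ["Mozilla/5.0 ..."]}
#     rm(path)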


def exist(path):
    return os.path.isfile(path)


def rm(path):
    if exist(path):
        os.remove(path)


def update(cache_path, browsers, verify_ssl=True):
    """Refresh the cache file at *cache_path* with freshly fetched data."""
    rm(cache_path)
    write(cache_path, load(browsers, use_local_file=False, verify_ssl=verify_ssl))


def load_cached(cache_path, browsers, verify_ssl=True):
    """Return the cached data, creating the cache file first if missing."""
    if not exist(cache_path):
        update(cache_path, browsers, verify_ssl=verify_ssl)

    return read(cache_path)
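

# Minimal manual check, guarded so importing this module stays side-effect
# free. The cache path and browser list are illustrative values, not package
# defaults; the first run performs a network fetch via update().
if __name__ == "__main__":
    demo_cache = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "demo_cache.json"
    )
    try:
        cached = load_cached(demo_cache, ["chrome"], verify_ssl=True)
        print(sorted(cached))  # browser names present in the cache
    finally:
        rm(demo_cache)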