|
|
import asyncio |
|
|
from collections.abc import Mapping |
|
|
from collections.abc import Sequence |
|
|
from functools import wraps |
|
|
import os |
|
|
import logging |
|
|
import threading |
|
|
import time |
|
|
import traceback |
|
|
from typing import Any |
|
|
from typing import Awaitable |
|
|
from typing import Callable |
|
|
from typing import List |
|
|
from typing import Optional |
|
|
|
|
|
|
|
|
class Structure(dict): |
|
|
""" |
|
|
This is a dict-like object structure, which you should subclass |
|
|
Only properties defined in the class context are used on initialization. |
|
|
|
|
|
See example |
|
|
""" |
|
|
|
|
|
_store = {} |
|
|
|
|
|
def __init__(self, *a, **kw): |
|
|
""" |
|
|
Instantiate a new instance. |
|
|
|
|
|
:param a: |
|
|
:param kw: |
|
|
""" |
|
|
|
|
|
super().__init__() |
|
|
|
|
|
|
|
|
d = dict(*a, **kw) |
|
|
for k, v in d.items(): |
|
|
if isinstance(v, Mapping): |
|
|
self[k] = self.__class__(v) |
|
|
elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)): |
|
|
self[k] = [self.__class__(i) for i in v] |
|
|
else: |
|
|
self[k] = v |
|
|
super().__setattr__("__dict__", self) |
|
|
|
|
|
def __getattr__(self, item): |
|
|
return getattr(super(), item) |
|
|
|
|
|
def __getitem__(self, item): |
|
|
return super().__getitem__(item) |
|
|
|
|
|
def __setattr__(self, key, value): |
|
|
self.__setitem__(key, value) |
|
|
|
|
|
def __setitem__(self, key, value): |
|
|
super().__setitem__(key, value) |
|
|
|
|
|
def update(self, *a, **kw): |
|
|
super().update(*a, **kw) |
|
|
|
|
|
def __eq__(self, other): |
|
|
return frozenset(other.items()) == frozenset(self.items()) |
|
|
|
|
|
def __hash__(self): |
|
|
return hash(frozenset(self.items())) |
|
|
|
|
|
@classmethod |
|
|
def __init_subclass__(cls, **kwargs): |
|
|
cls._store = {} |
|
|
|
|
|
def _normalize_strings(self): |
|
|
for k, v in self.copy().items(): |
|
|
if isinstance(v, (str)): |
|
|
self[k] = v.strip() |
|
|
|
|
|
|
|
|
def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None): |
|
|
def wrapper(func): |
|
|
@wraps(func) |
|
|
def wrapped(*args, **kwargs): |
|
|
def function_reached_timeout(): |
|
|
if on_timeout: |
|
|
on_timeout(func) |
|
|
else: |
|
|
raise TimeoutError("function call timed out") |
|
|
|
|
|
t = threading.Timer(interval=seconds, function=function_reached_timeout) |
|
|
t.start() |
|
|
try: |
|
|
return func(*args, **kwargs) |
|
|
except: |
|
|
t.cancel() |
|
|
raise |
|
|
finally: |
|
|
t.cancel() |
|
|
|
|
|
return wrapped |
|
|
|
|
|
return wrapper |
|
|
|
|
|
|
|
|
def test(): |
|
|
import sys, os |
|
|
|
|
|
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) |
|
|
import undetected_chromedriver as uc |
|
|
import threading |
|
|
|
|
|
def collector( |
|
|
driver: uc.Chrome, |
|
|
stop_event: threading.Event, |
|
|
on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None, |
|
|
listen_events: Sequence = ("browser", "network", "performance"), |
|
|
): |
|
|
def threaded(driver, stop_event, on_event_coro): |
|
|
async def _ensure_service_started(): |
|
|
while ( |
|
|
getattr(driver, "service", False) |
|
|
and getattr(driver.service, "process", False) |
|
|
and driver.service.process.poll() |
|
|
): |
|
|
print("waiting for driver service to come back on") |
|
|
await asyncio.sleep(0.05) |
|
|
|
|
|
|
|
|
async def get_log_lines(typ): |
|
|
await _ensure_service_started() |
|
|
return driver.get_log(typ) |
|
|
|
|
|
async def looper(): |
|
|
while not stop_event.is_set(): |
|
|
log_lines = [] |
|
|
try: |
|
|
for _ in listen_events: |
|
|
try: |
|
|
log_lines += await get_log_lines(_) |
|
|
except: |
|
|
if logging.getLogger().getEffectiveLevel() <= 10: |
|
|
traceback.print_exc() |
|
|
continue |
|
|
if log_lines and on_event_coro: |
|
|
await on_event_coro(log_lines) |
|
|
except Exception as e: |
|
|
if logging.getLogger().getEffectiveLevel() <= 10: |
|
|
traceback.print_exc() |
|
|
|
|
|
loop = asyncio.new_event_loop() |
|
|
asyncio.set_event_loop(loop) |
|
|
loop.run_until_complete(looper()) |
|
|
|
|
|
t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro)) |
|
|
t.start() |
|
|
|
|
|
async def on_event(data): |
|
|
print("on_event") |
|
|
print("data:", data) |
|
|
|
|
|
def func_called(fn): |
|
|
def wrapped(*args, **kwargs): |
|
|
print( |
|
|
"func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs) |
|
|
) |
|
|
while driver.service.process and driver.service.process.poll() is not None: |
|
|
time.sleep(0.1) |
|
|
res = fn(*args, **kwargs) |
|
|
print("func completed! (result: %s)" % res) |
|
|
return res |
|
|
|
|
|
return wrapped |
|
|
|
|
|
logging.basicConfig(level=10) |
|
|
|
|
|
options = uc.ChromeOptions() |
|
|
options.set_capability( |
|
|
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"} |
|
|
) |
|
|
|
|
|
driver = uc.Chrome(version_main=96, options=options) |
|
|
|
|
|
|
|
|
driver.command_executor._request = func_called(driver.command_executor._request) |
|
|
collector_stop = threading.Event() |
|
|
collector(driver, collector_stop, on_event) |
|
|
|
|
|
driver.get("https://nowsecure.nl") |
|
|
|
|
|
time.sleep(10) |
|
|
|
|
|
if os.name == "nt": |
|
|
driver.close() |
|
|
driver.quit() |
|
|
|