File size: 6,682 Bytes
046723b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import os
from abc import abstractmethod
from loguru import logger

from changedetectionio.content_fetchers import BrowserStepsStepException


def manage_user_agent(headers, current_ua=''):
    """
    Basic setting of user-agent

    NOTE!!!!!! The service that does the actual Chrome fetching should handle any anti-robot techniques
    THERE ARE MANY WAYS THAT IT CAN BE DETECTED AS A ROBOT!!
    This does not take care of
    - Scraping of 'navigator' (platform, productSub, vendor, oscpu etc etc) browser object (navigator.appVersion) etc
    - TCP/IP fingerprint JA3 etc
    - Graphic rendering fingerprinting
    - Your IP being obviously in a pool of bad actors
    - Too many requests
    - Scraping of SCH-UA browser replies (thanks google!!)
    - Scraping of ServiceWorker, new window calls etc

    See https://filipvitas.medium.com/how-to-set-user-agent-header-with-puppeteer-js-and-not-fail-28c7a02165da
    Puppeteer requests https://github.com/dgtlmoon/pyppeteerstealth

    :param page:
    :param headers:
    :return:
    """
    # Ask it what the user agent is, if its obviously ChromeHeadless, switch it to the default
    ua_in_custom_headers = headers.get('User-Agent')
    if ua_in_custom_headers:
        return ua_in_custom_headers

    if not ua_in_custom_headers and current_ua:
        current_ua = current_ua.replace('HeadlessChrome', 'Chrome')
        return current_ua

    return None


class Fetcher():
    browser_connection_is_custom = None
    browser_connection_url = None
    browser_steps = None
    browser_steps_screenshot_path = None
    content = None
    error = None
    fetcher_description = "No description"
    headers = {}
    instock_data = None
    instock_data_js = ""
    status_code = None
    webdriver_js_execute_code = None
    xpath_data = None
    xpath_element_js = ""

    # Will be needed in the future by the VisualSelector, always get this where possible.
    screenshot = False
    system_http_proxy = os.getenv('HTTP_PROXY')
    system_https_proxy = os.getenv('HTTPS_PROXY')

    # Time ONTOP of the system defined env minimum time
    render_extract_delay = 0

    @abstractmethod
    def get_error(self):
        return self.error

    @abstractmethod
    async def run(self,
            url,
            timeout,
            request_headers,
            request_body,
            request_method,
            ignore_status_codes=False,
            current_include_filters=None,
            is_binary=False,
            empty_pages_are_a_change=False):
        # Should set self.error, self.status_code and self.content
        pass

    @abstractmethod
    def quit(self, watch=None):
        return

    @abstractmethod
    def get_last_status_code(self):
        return self.status_code

    @abstractmethod
    def screenshot_step(self, step_n):
        if self.browser_steps_screenshot_path and not os.path.isdir(self.browser_steps_screenshot_path):
            logger.debug(f"> Creating data dir {self.browser_steps_screenshot_path}")
            os.mkdir(self.browser_steps_screenshot_path)
        return None

    @abstractmethod
    # Return true/false if this checker is ready to run, in the case it needs todo some special config check etc
    def is_ready(self):
        return True

    def get_all_headers(self):
        """
        Get all headers but ensure all keys are lowercase
        :return:
        """
        return {k.lower(): v for k, v in self.headers.items()}

    def browser_steps_get_valid_steps(self):
        if self.browser_steps is not None and len(self.browser_steps):
            valid_steps = list(filter(
                lambda s: (s['operation'] and len(s['operation']) and s['operation'] != 'Choose one'),
                self.browser_steps))

            # Just incase they selected Goto site by accident with older JS
            if valid_steps and valid_steps[0]['operation'] == 'Goto site':
                del(valid_steps[0])

            return valid_steps

        return None

    async def iterate_browser_steps(self, start_url=None):
        from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
        from playwright._impl._errors import TimeoutError, Error
        from changedetectionio.safe_jinja import render as jinja_render
        step_n = 0

        if self.browser_steps is not None and len(self.browser_steps):
            interface = steppable_browser_interface(start_url=start_url)
            interface.page = self.page
            valid_steps = self.browser_steps_get_valid_steps()

            for step in valid_steps:
                step_n += 1
                logger.debug(f">> Iterating check - browser Step n {step_n} - {step['operation']}...")
                await self.screenshot_step("before-" + str(step_n))
                await self.save_step_html("before-" + str(step_n))

                try:
                    optional_value = step['optional_value']
                    selector = step['selector']
                    # Support for jinja2 template in step values, with date module added
                    if '{%' in step['optional_value'] or '{{' in step['optional_value']:
                        optional_value = jinja_render(template_str=step['optional_value'])
                    if '{%' in step['selector'] or '{{' in step['selector']:
                        selector = jinja_render(template_str=step['selector'])

                    await getattr(interface, "call_action")(action_name=step['operation'],
                                                      selector=selector,
                                                      optional_value=optional_value)
                    await self.screenshot_step(step_n)
                    await self.save_step_html(step_n)
                except (Error, TimeoutError) as e:
                    logger.debug(str(e))
                    # Stop processing here
                    raise BrowserStepsStepException(step_n=step_n, original_e=e)

    # It's always good to reset these
    def delete_browser_steps_screenshots(self):
        import glob
        if self.browser_steps_screenshot_path is not None:
            dest = os.path.join(self.browser_steps_screenshot_path, 'step_*.jpeg')
            files = glob.glob(dest)
            for f in files:
                if os.path.isfile(f):
                    os.unlink(f)

    def save_step_html(self, step_n):
        if self.browser_steps_screenshot_path and not os.path.isdir(self.browser_steps_screenshot_path):
            logger.debug(f"> Creating data dir {self.browser_steps_screenshot_path}")
            os.mkdir(self.browser_steps_screenshot_path)
        pass