Spaces:
Paused
Paused
| from .processors.exceptions import ProcessorException | |
| import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions | |
| from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse | |
| from changedetectionio import html_tools | |
| from changedetectionio.flask_app import watch_check_update | |
| import asyncio | |
| import importlib | |
| import os | |
| import time | |
| from loguru import logger | |
# Async version of update_worker
# Processes jobs from AsyncSignalPriorityQueue instead of threaded queue
async def async_update_worker(worker_id, q, notification_q, app, datastore):
    """
    Async worker loop that processes watch check jobs from the queue.

    Each iteration takes one job from ``q``, loads the processor module named
    by the watch, awaits the fetch (``call_browser``), runs change detection,
    and records the outcome on the watch through ``datastore.update_watch``.
    Known fetch/processing failures are turned into a ``last_error`` message on
    the watch; anything unexpected is logged with its traceback and followed by
    a short back-off pause. The loop runs until ``app.config.exit`` is set.

    Args:
        worker_id: Unique identifier for this worker
        q: AsyncSignalPriorityQueue containing jobs to process
        notification_q: Standard queue for notifications
        app: Flask application instance
        datastore: Application datastore
    """
    # Set a descriptive name for this task
    task = asyncio.current_task()
    if task:
        task.set_name(f"async-worker-{worker_id}")

    logger.info(f"Starting async worker {worker_id}")

    while not app.config.exit.is_set():
        update_handler = None
        watch = None
        # Set by the outer "unexpected error" handler only. An explicit flag is
        # needed because Python unbinds the `except ... as e` name when the
        # handler finishes, so testing for `e` afterwards can never succeed.
        had_unexpected_error = False

        try:
            # Use asyncio wait_for to make queue.get() cancellable
            queued_item_data = await asyncio.wait_for(q.get(), timeout=1.0)
        except asyncio.TimeoutError:
            # No jobs available, continue loop
            continue
        except Exception as e:
            logger.error(f"Worker {worker_id} error getting queue item: {e}")
            await asyncio.sleep(0.1)
            continue

        uuid = queued_item_data.item.get('uuid')
        # Whole seconds: this value is also used as the history snapshot timestamp.
        # NOTE(review): because it is rounded, the 'fetch_time' computed from it
        # further down is only approximate and can come out slightly negative
        # for checks that finish in under half a second.
        fetch_start_time = round(time.time())

        # Mark this UUID as being processed
        from changedetectionio import worker_handler
        worker_handler.set_uuid_processing(uuid, processing=True)

        try:
            if uuid in datastore.data['watching'] and datastore.data['watching'][uuid].get('url'):
                changed_detected = False
                contents = b''
                process_changedetection_results = True
                update_obj = {}

                # Clear last errors
                datastore.data['watching'][uuid]['browser_steps_last_error_step'] = None
                datastore.data['watching'][uuid]['last_checked'] = fetch_start_time

                watch = datastore.data['watching'].get(uuid)

                logger.info(f"Worker {worker_id} processing watch UUID {uuid} Priority {queued_item_data.priority} URL {watch['url']}")

                try:
                    watch_check_update.send(watch_uuid=uuid)

                    # Processor is what we are using for detecting the "Change"
                    processor = watch.get('processor', 'text_json_diff')

                    # Init a new 'difference_detection_processor'
                    processor_module_name = f"changedetectionio.processors.{processor}.processor"
                    try:
                        processor_module = importlib.import_module(processor_module_name)
                    except ModuleNotFoundError:
                        # Goes through the logger (not print) so it lands with the rest of the worker output
                        logger.error(f"Processor module '{processor}' not found.")
                        raise

                    update_handler = processor_module.perform_site_check(datastore=datastore,
                                                                         watch_uuid=uuid)

                    # All fetchers are now async, so call directly
                    await update_handler.call_browser()

                    # Run change detection (this is synchronous)
                    changed_detected, update_obj, contents = update_handler.run_changedetection(watch=watch)

                except PermissionError as e:
                    logger.critical(f"File permission error updating file, watch: {uuid}")
                    logger.critical(str(e))
                    process_changedetection_results = False

                except ProcessorException as e:
                    if e.screenshot:
                        watch.save_screenshot(screenshot=e.screenshot)
                    if e.xpath_data:
                        watch.save_xpath_data(data=e.xpath_data)
                    datastore.update_watch(uuid=uuid, update_obj={'last_error': e.message})
                    process_changedetection_results = False

                except content_fetchers_exceptions.ReplyWithContentButNoText as e:
                    extra_help = ""
                    if e.has_filters:
                        # Probe the returned HTML for <img> to pick the more helpful hint
                        has_img = html_tools.include_filters(include_filters='img',
                                                             html_content=e.html_content)
                        if has_img:
                            extra_help = ", it's possible that the filters you have give an empty result or contain only an image."
                        else:
                            extra_help = ", it's possible that the filters were found, but contained no usable text."

                    datastore.update_watch(uuid=uuid, update_obj={
                        'last_error': f"Got HTML content but no text found (With {e.status_code} reply code){extra_help}"
                    })

                    if e.screenshot:
                        watch.save_screenshot(screenshot=e.screenshot, as_error=True)

                    if e.xpath_data:
                        watch.save_xpath_data(data=e.xpath_data)

                    process_changedetection_results = False

                except content_fetchers_exceptions.Non200ErrorCodeReceived as e:
                    if e.status_code == 403:
                        err_text = "Error - 403 (Access denied) received"
                    elif e.status_code == 404:
                        err_text = "Error - 404 (Page not found) received"
                    elif e.status_code == 407:
                        err_text = "Error - 407 (Proxy authentication required) received, did you need a username and password for the proxy?"
                    elif e.status_code == 500:
                        err_text = "Error - 500 (Internal server error) received from the web site"
                    else:
                        extra = ' (Access denied or blocked)' if str(e.status_code).startswith('4') else ''
                        err_text = f"Error - Request returned a HTTP error code {e.status_code}{extra}"

                    if e.screenshot:
                        watch.save_screenshot(screenshot=e.screenshot, as_error=True)
                    if e.xpath_data:
                        watch.save_xpath_data(data=e.xpath_data, as_error=True)
                    if e.page_text:
                        watch.save_error_text(contents=e.page_text)

                    datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
                    process_changedetection_results = False

                except FilterNotFoundInResponse as e:
                    # The watch may have been removed while the check was running
                    if not datastore.data['watching'].get(uuid):
                        continue

                    err_text = "Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary."
                    datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})

                    # Filter wasnt found, but we should still update the visual selector so that they can have a chance to set it up again
                    if e.screenshot:
                        watch.save_screenshot(screenshot=e.screenshot)

                    if e.xpath_data:
                        watch.save_xpath_data(data=e.xpath_data)

                    # Only when enabled, send the notification
                    if watch.get('filter_failure_notification_send', False):
                        c = watch.get('consecutive_filter_failures', 0)
                        c += 1
                        # Send notification if we reached the threshold?
                        threshold = datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
                        logger.debug(f"Filter for {uuid} not found, consecutive_filter_failures: {c} of threshold {threshold}")
                        # NOTE(review): unlike the browser-step handler further down, this
                        # does not require threshold > 0, so a threshold of 0 notifies on
                        # every failure - confirm that is intended.
                        if c >= threshold:
                            if not watch.get('notification_muted'):
                                logger.debug(f"Sending filter failed notification for {uuid}")
                                await send_filter_failure_notification(uuid, notification_q, datastore)
                            c = 0
                            logger.debug("Reset filter failure count back to zero")

                        datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
                    else:
                        logger.trace(f"{uuid} - filter_failure_notification_send not enabled, skipping")

                    process_changedetection_results = False

                except content_fetchers_exceptions.checksumFromPreviousCheckWasTheSame:
                    # Yes fine, so nothing todo, don't continue to process.
                    process_changedetection_results = False
                    changed_detected = False

                except content_fetchers_exceptions.BrowserConnectError as e:
                    datastore.update_watch(uuid=uuid,
                                           update_obj={'last_error': e.msg})
                    process_changedetection_results = False

                except content_fetchers_exceptions.BrowserFetchTimedOut as e:
                    datastore.update_watch(uuid=uuid,
                                           update_obj={'last_error': e.msg})
                    process_changedetection_results = False

                except content_fetchers_exceptions.BrowserStepsStepException as e:
                    # The watch may have been removed while the check was running
                    if not datastore.data['watching'].get(uuid):
                        continue

                    # step_n is zero-based; report a one-based position
                    error_step = e.step_n + 1

                    # Generally enough info for TimeoutError (couldnt locate the element after default seconds)
                    err_text = f"Browser step at position {error_step} could not run, check the watch, add a delay if necessary, view Browser Steps to see screenshot at that step."

                    if e.original_e.name == "TimeoutError":
                        # Just the first line is enough, the rest is the stack trace
                        err_text += " Could not find the target."
                    else:
                        # Other Error, more info is good.
                        err_text += " " + str(e.original_e).splitlines()[0]

                    logger.debug(f"BrowserSteps exception at step {error_step} {str(e.original_e)}")

                    datastore.update_watch(uuid=uuid,
                                           update_obj={'last_error': err_text,
                                                       'browser_steps_last_error_step': error_step})

                    if watch.get('filter_failure_notification_send', False):
                        c = watch.get('consecutive_filter_failures', 0)
                        c += 1
                        # Send notification if we reached the threshold?
                        threshold = datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
                        logger.error(f"Step for {uuid} not found, consecutive_filter_failures: {c}")
                        if threshold > 0 and c >= threshold:
                            if not watch.get('notification_muted'):
                                await send_step_failure_notification(watch_uuid=uuid, step_n=e.step_n, notification_q=notification_q, datastore=datastore)
                            c = 0

                        datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})

                    process_changedetection_results = False

                except content_fetchers_exceptions.EmptyReply as e:
                    # Some kind of custom to-str handler in the exception handler that does this?
                    err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
                    datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                                  'last_check_status': e.status_code})
                    process_changedetection_results = False

                except content_fetchers_exceptions.ScreenshotUnavailable as e:
                    err_text = "Screenshot unavailable, page did not render fully in the expected time or page was too long - try increasing 'Wait seconds before extracting text'"
                    datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                                  'last_check_status': e.status_code})
                    process_changedetection_results = False

                except content_fetchers_exceptions.JSActionExceptions as e:
                    err_text = "Error running JS Actions - Page request - "+e.message
                    if e.screenshot:
                        watch.save_screenshot(screenshot=e.screenshot, as_error=True)
                    datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                                  'last_check_status': e.status_code})
                    process_changedetection_results = False

                except content_fetchers_exceptions.PageUnloadable as e:
                    err_text = "Page request from server didnt respond correctly"
                    if e.message:
                        err_text = "{} - {}".format(err_text, e.message)

                    if e.screenshot:
                        watch.save_screenshot(screenshot=e.screenshot, as_error=True)

                    datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                                  'last_check_status': e.status_code,
                                                                  'has_ldjson_price_data': None})
                    process_changedetection_results = False

                except content_fetchers_exceptions.BrowserStepsInUnsupportedFetcher:
                    err_text = "This watch has Browser Steps configured and so it cannot run with the 'Basic fast Plaintext/HTTP Client', either remove the Browser Steps or select a Chrome fetcher."
                    datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
                    process_changedetection_results = False
                    logger.error(f"Exception (BrowserStepsInUnsupportedFetcher) reached processing watch UUID: {uuid}")

                except Exception as e:
                    logger.error(f"Worker {worker_id} exception processing watch UUID: {uuid}")
                    logger.error(str(e))
                    datastore.update_watch(uuid=uuid, update_obj={'last_error': "Exception: " + str(e)})
                    process_changedetection_results = False

                else:
                    # Fetch and change detection raised nothing
                    if not datastore.data['watching'].get(uuid):
                        continue

                    update_obj['content-type'] = update_handler.fetcher.get_all_headers().get('content-type', '').lower()

                    if not watch.get('ignore_status_codes'):
                        update_obj['consecutive_filter_failures'] = 0

                    update_obj['last_error'] = False
                    cleanup_error_artifacts(uuid, datastore)

                # The watch may have been removed while the check was running
                if not datastore.data['watching'].get(uuid):
                    continue

                if process_changedetection_results:
                    # Extract title if needed
                    if datastore.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']:
                        if not watch['title']:
                            try:
                                update_obj['title'] = html_tools.extract_element(find='title', html_content=update_handler.fetcher.content)
                                logger.info(f"UUID: {uuid} Extract <title> updated title to '{update_obj['title']}")
                            except Exception:
                                logger.warning(f"UUID: {uuid} Extract <title> as watch title was enabled, but couldn't find a <title>.")

                    try:
                        datastore.update_watch(uuid=uuid, update_obj=update_obj)

                        if changed_detected or not watch.history_n:
                            if update_handler.screenshot:
                                watch.save_screenshot(screenshot=update_handler.screenshot)

                            if update_handler.xpath_data:
                                watch.save_xpath_data(data=update_handler.xpath_data)

                            # Ensure unique timestamp for history
                            if watch.newest_history_key and int(fetch_start_time) == int(watch.newest_history_key):
                                logger.warning(f"Timestamp {fetch_start_time} already exists, waiting 1 seconds")
                                fetch_start_time += 1
                                await asyncio.sleep(1)

                            watch.save_history_text(contents=contents,
                                                    timestamp=int(fetch_start_time),
                                                    snapshot_id=update_obj.get('previous_md5', 'none'))

                            empty_pages_are_a_change = datastore.data['settings']['application'].get('empty_pages_are_a_change', False)
                            if update_handler.fetcher.content or empty_pages_are_a_change:
                                watch.save_last_fetched_html(contents=update_handler.fetcher.content, timestamp=int(fetch_start_time))

                            # Send notifications on second+ check
                            if watch.history_n >= 2:
                                logger.info(f"Change detected in UUID {uuid} - {watch['url']}")
                                if not watch.get('notification_muted'):
                                    await send_content_changed_notification(uuid, notification_q, datastore)

                    except Exception as e:
                        logger.critical(f"Worker {worker_id} exception in process_changedetection_results")
                        logger.critical(str(e))
                        datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})

                # Always record attempt count
                count = watch.get('check_count', 0) + 1

                # Record server header (best effort: update_handler is still None
                # when the processor could not be created)
                try:
                    server_header = update_handler.fetcher.headers.get('server', '').strip().lower()[:255]
                    datastore.update_watch(uuid=uuid, update_obj={'remote_server_reply': server_header})
                except Exception as e:
                    logger.trace(f"Worker {worker_id} could not record server header for {uuid}: {e}")

                datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - fetch_start_time, 3),
                                                              'check_count': count})

        except Exception as e:
            had_unexpected_error = True
            logger.error(f"Worker {worker_id} unexpected error processing {uuid}: {e}")
            # loguru does not treat the stdlib-style exc_info=True as a request
            # for a traceback; logger.exception() attaches the active exception.
            logger.exception(f"Worker {worker_id} traceback:")

            # Also update the watch with error information
            if datastore and uuid in datastore.data['watching']:
                datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Worker error: {str(e)}"})

        finally:
            # Always cleanup - this runs whether there was an exception or not
            if uuid:
                try:
                    # Mark UUID as no longer being processed
                    worker_handler.set_uuid_processing(uuid, processing=False)

                    # Send completion signal
                    if watch:
                        watch_check_update.send(watch_uuid=watch['uuid'])

                    update_handler = None
                    logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
                except Exception as cleanup_error:
                    logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")

            # Brief pause before continuing to avoid tight error loops (only on error)
            if had_unexpected_error:
                await asyncio.sleep(1.0)
            else:
                # Small yield for normal completion
                await asyncio.sleep(0.01)

            # Check if we should exit
            # NOTE(review): a `break` inside `finally` discards any exception still
            # in flight (including task cancellation) - confirm this is wanted.
            if app.config.exit.is_set():
                break

    # Check if we're in pytest environment - if so, be more gentle with logging
    import sys
    in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ

    if not in_pytest:
        logger.info(f"Worker {worker_id} shutting down")
def cleanup_error_artifacts(uuid, datastore):
    """Delete the stored error artifacts of a watch, if any are present.

    Looks in ``<datastore.datastore_path>/<uuid>/`` for the last-error
    screenshot and the last-error text file and removes each one that
    exists as a regular file. Missing files are skipped.
    """
    watch_dir = os.path.join(datastore.datastore_path, uuid)
    for artifact_name in ("last-error-screenshot.png", "last-error.txt"):
        artifact_path = os.path.join(watch_dir, artifact_name)
        if not os.path.isfile(artifact_path):
            continue
        os.unlink(artifact_path)
async def send_content_changed_notification(watch_uuid, notification_q, datastore):
    """Hand a content-changed notification for ``watch_uuid`` to the notification service.

    Builds a notification service from ``datastore`` and ``notification_q`` and
    calls its ``send_content_changed_notification``. Any exception raised along
    the way is logged and not propagated.
    """
    try:
        # Imported at call time, inside the function
        from changedetectionio.notification_service import create_notification_service

        create_notification_service(datastore, notification_q).send_content_changed_notification(watch_uuid)
    except Exception as err:
        logger.error(f"Error sending notification for {watch_uuid}: {err}")
async def send_filter_failure_notification(watch_uuid, notification_q, datastore):
    """Hand a filter-failure notification for ``watch_uuid`` to the notification service.

    Builds a notification service from ``datastore`` and ``notification_q`` and
    calls its ``send_filter_failure_notification``. Any exception raised along
    the way is logged and not propagated.
    """
    try:
        # Imported at call time, inside the function
        from changedetectionio.notification_service import create_notification_service

        create_notification_service(datastore, notification_q).send_filter_failure_notification(watch_uuid)
    except Exception as err:
        logger.error(f"Error sending filter failure notification for {watch_uuid}: {err}")
async def send_step_failure_notification(watch_uuid, step_n, notification_q, datastore):
    """Hand a browser-step-failure notification for ``watch_uuid`` to the notification service.

    Builds a notification service from ``datastore`` and ``notification_q`` and
    calls its ``send_step_failure_notification`` with the watch UUID and
    ``step_n``. Any exception raised along the way is logged and not propagated.
    """
    try:
        # Imported at call time, inside the function
        from changedetectionio.notification_service import create_notification_service

        create_notification_service(datastore, notification_q).send_step_failure_notification(watch_uuid, step_n)
    except Exception as err:
        logger.error(f"Error sending step failure notification for {watch_uuid}: {err}")