Spaces:
Paused
Paused
| """ | |
| Worker management module for changedetection.io | |
| Handles asynchronous workers for dynamic worker scaling. | |
| Sync worker support has been removed in favor of async-only architecture. | |
| """ | |
| import asyncio | |
| import os | |
| import threading | |
| import time | |
| from loguru import logger | |
# --- Global worker state ---------------------------------------------------
# Futures returned by asyncio.run_coroutine_threadsafe(), one per scheduled worker.
running_async_tasks: list = []
# Event loop the workers run on (created inside the loop thread) and that thread.
async_loop = None
async_loop_thread = None

# Watch UUIDs that async workers are processing right now.
currently_processing_uuids: set = set()

# Configuration flag: only async workers are supported.
USE_ASYNC_WORKERS: bool = True
def start_async_event_loop():
    """Create an event loop for the async workers and run it until it is stopped.

    Intended as the ``target`` of the dedicated loop thread started by
    ``start_async_workers``. The new loop is published via the module-level
    ``async_loop`` so other threads can schedule coroutines on it. The loop
    then blocks in ``run_forever()`` until ``stop()`` is called on it.

    On exit this loop is closed, and ``async_loop`` is reset to ``None`` only
    if the global still refers to *this* loop. ``shutdown_workers`` clears the
    global before this thread finishes, and a later ``start_async_workers``
    may already have published a new loop. Reading the global here would
    either skip closing our loop (leaking it) or close/clobber the new one.
    """
    global async_loop
    logger.info("Starting async event loop for workers")
    # Thread-local reference; the global may be reassigned by other threads.
    loop = None
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Publish the loop: start_async_workers() polls for this to become non-None.
        async_loop = loop
        logger.debug(f"Event loop created and set: {loop}")
        # Blocks until loop.stop() is requested (e.g. by shutdown_workers()).
        loop.run_forever()
    except Exception as e:
        logger.error(f"Async event loop error: {e}")
    finally:
        # Always close our own loop, even if the global was already cleared.
        if loop is not None and not loop.is_closed():
            loop.close()
        # Only clear the global if nobody replaced it with a newer loop.
        if async_loop is loop:
            async_loop = None
        logger.info("Async event loop stopped")
def _make_named_worker(worker_id, update_q, notification_q, app, datastore):
    """Return a coroutine that names its task ``async-worker-<id>`` and runs one worker."""
    async def named_worker():
        task = asyncio.current_task()
        if task:
            task.set_name(f"async-worker-{worker_id}")
        return await start_single_async_worker(worker_id, update_q, notification_q, app, datastore)

    return named_worker()


def start_async_workers(n_workers, update_q, notification_q, app, datastore):
    """Start the dedicated event-loop thread and schedule ``n_workers`` async workers on it.

    Args:
        n_workers: Number of worker coroutines to schedule (ids ``0 .. n_workers-1``).
        update_q, notification_q, app, datastore: Passed through to every worker.

    Returns:
        None. If the loop thread has not published its loop within 5 seconds,
        an error is logged and no workers are scheduled. Each successfully
        scheduled worker's future is appended to ``running_async_tasks``.
    """
    global async_loop_thread, async_loop, running_async_tasks, currently_processing_uuids

    # Clear any stale UUID tracking state
    currently_processing_uuids.clear()

    # Run the event loop in a separate daemon thread.
    async_loop_thread = threading.Thread(target=start_async_event_loop, daemon=True)
    async_loop_thread.start()

    # Wait for the loop thread to publish the loop (with timeout for safety).
    # monotonic() is used so wall-clock adjustments cannot distort the timeout.
    max_wait_time = 5.0
    deadline = time.monotonic() + max_wait_time
    while async_loop is None and time.monotonic() < deadline:
        time.sleep(0.1)

    # Snapshot: the loop thread may reset the global (e.g. if the loop errors out).
    loop = async_loop
    if loop is None:
        logger.error("Failed to start async event loop within timeout")
        return

    # Additional brief wait to ensure loop is running
    time.sleep(0.2)

    logger.info(f"Starting {n_workers} async workers")
    for i in range(n_workers):
        coro = _make_named_worker(i, update_q, notification_q, app, datastore)
        try:
            task_future = asyncio.run_coroutine_threadsafe(coro, loop)
        except RuntimeError as e:
            # Scheduling failed (e.g. the loop was closed). Close the coroutine
            # so Python does not emit a "coroutine was never awaited" warning.
            coro.close()
            logger.error(f"Failed to start async worker {i}: {e}")
            continue
        running_async_tasks.append(task_future)
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore):
    """Run one async update worker, restarting it after crashes until shutdown.

    ``async_update_worker`` is re-run after a 5 second pause whenever it
    raises an ``Exception``, for as long as ``app.config.exit`` is not set.
    A clean return or task cancellation ends the loop.

    Args:
        worker_id: Identifier used in log messages.
        update_q, notification_q, app, datastore: Passed through to the worker.
    """
    import sys

    from changedetectionio.async_update_worker import async_update_worker

    # Under pytest, skip routine lifecycle logging (crashes are still logged).
    # sys.modules is the documented API; the original reached it via the
    # undocumented os.sys attribute.
    in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ

    while not app.config.exit.is_set():
        try:
            if not in_pytest:
                logger.info(f"Starting async worker {worker_id}")
            await async_update_worker(worker_id, update_q, notification_q, app, datastore)
            # If we reach here, worker exited cleanly
            if not in_pytest:
                logger.info(f"Async worker {worker_id} exited cleanly")
            break
        except asyncio.CancelledError:
            # Task was cancelled (normal shutdown)
            if not in_pytest:
                logger.info(f"Async worker {worker_id} cancelled")
            break
        except Exception as e:
            logger.error(f"Async worker {worker_id} crashed: {e}")
            if not in_pytest:
                logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
            await asyncio.sleep(5)

    if not in_pytest:
        logger.info(f"Async worker {worker_id} shutdown complete")
def start_workers(n_workers, update_q, notification_q, app, datastore):
    """Public entry point for starting workers.

    Sync workers no longer exist, so every argument is forwarded unchanged to
    ``start_async_workers``.
    """
    return start_async_workers(n_workers, update_q, notification_q, app, datastore)
def add_worker(update_q, notification_q, app, datastore):
    """Schedule one additional async worker on the running event loop (dynamic scaling).

    Returns:
        bool: True if the worker was scheduled. False if the event loop is
        missing or closed, or if scheduling failed; an error is logged in
        each case.
    """
    global running_async_tasks

    # Snapshot: shutdown_workers() may clear the global from another thread.
    loop = async_loop
    if not loop or loop.is_closed():
        logger.error("Async loop not running, cannot add worker")
        return False

    # Ids are assigned from the number of currently tracked workers.
    worker_id = len(running_async_tasks)
    logger.info(f"Adding async worker {worker_id}")

    coro = start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
    try:
        task_future = asyncio.run_coroutine_threadsafe(coro, loop)
    except RuntimeError as e:
        # Loop closed between the check and the call; don't leak the coroutine.
        coro.close()
        logger.error(f"Failed to add async worker {worker_id}: {e}")
        return False

    running_async_tasks.append(task_future)
    return True
def remove_worker():
    """Cancel the most recently added async worker (dynamic scaling).

    Returns:
        bool: False when no workers are tracked, otherwise True. The worker's
        future is cancelled but not waited on.
    """
    global running_async_tasks

    if running_async_tasks:
        newest = running_async_tasks.pop()
        newest.cancel()
        logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining")
        return True
    return False
def get_worker_count():
    """Return how many async worker futures are currently tracked.

    Every entry in ``running_async_tasks`` is counted, including futures that
    have already finished.
    """
    tracked = running_async_tasks
    return len(tracked)
def get_running_uuids():
    """Return a snapshot list of the watch UUIDs that async workers are processing."""
    # Unpacking copies the set in a single C-level pass, like list(...).
    return [*currently_processing_uuids]
def set_uuid_processing(uuid, processing=True):
    """Record that a watch UUID started (``processing=True``) or finished processing.

    Marking an unknown UUID as finished is a no-op (``set.discard``).
    """
    global currently_processing_uuids

    if not processing:
        currently_processing_uuids.discard(uuid)
        logger.debug(f"Finished processing UUID: {uuid}")
        return

    currently_processing_uuids.add(uuid)
    logger.debug(f"Started processing UUID: {uuid}")
def is_watch_running(watch_uuid):
    """Return True if ``watch_uuid`` is currently being processed by an async worker.

    Checks set membership directly (O(1)). The previous version copied the
    whole set into a list via ``get_running_uuids()`` and scanned it (O(n))
    on every call.
    """
    return watch_uuid in currently_processing_uuids
def queue_item_async_safe(update_q, item):
    """Schedule ``update_q.put(item)`` on the worker event loop from any thread.

    Fire-and-forget: returns None and does not wait for the put to complete.
    Errors (no loop, closed loop, scheduling failure) are logged, not raised.
    NOTE(review): assumes ``update_q.put`` returns a coroutine (e.g. an
    ``asyncio.Queue``). Confirm against the queue type used by callers.
    """
    # Snapshot the global: shutdown_workers() may set it to None concurrently,
    # which previously caused an uncaught AttributeError between check and use.
    loop = async_loop
    if not loop or loop.is_closed():
        logger.error("Async loop not available or closed for queueing item")
        return

    coro = None
    try:
        # For async queue, schedule the put operation
        coro = update_q.put(item)
        asyncio.run_coroutine_threadsafe(coro, loop)
    except RuntimeError as e:
        # The put coroutine was never scheduled; close it to avoid a
        # "coroutine was never awaited" warning.
        if asyncio.iscoroutine(coro):
            coro.close()
        logger.error(f"Failed to queue item: {e}")
def shutdown_workers():
    """Shut down all async workers quickly, without waiting for them to finish.

    Steps:
    - Cancel every tracked worker future.
    - Ask the event loop to stop.
    - Clear the module-level state.
    - Wait at most 1 second for the loop thread to exit.

    If the thread has exited and the loop is still open, the loop is closed
    here. The global is cleared before the thread finishes, so without this
    the loop could be left unclosed.
    """
    global async_loop, async_loop_thread, running_async_tasks
    import sys

    # Under pytest, skip the informational log lines. sys.modules is the
    # documented API; the original used the undocumented os.sys attribute.
    in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ

    if not in_pytest:
        logger.info("Fast shutdown of async workers initiated...")

    # Cancel all async tasks immediately
    for task_future in running_async_tasks:
        if not task_future.done():
            task_future.cancel()

    # Keep a local reference: the global is cleared below, but we may need to close the loop.
    loop = async_loop
    if loop and not loop.is_closed():
        try:
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # Loop might already be stopped/closed
            pass

    running_async_tasks.clear()
    async_loop = None

    # Give the async thread minimal time to finish, then continue
    thread = async_loop_thread
    if thread and thread.is_alive():
        thread.join(timeout=1.0)  # Only 1 second timeout
        if thread.is_alive() and not in_pytest:
            logger.info("Async thread still running after timeout - continuing with shutdown")

    # Close the loop only once its thread is gone and it is no longer running;
    # closing a running loop raises RuntimeError.
    thread_alive = bool(thread and thread.is_alive())
    if loop is not None and not thread_alive and not loop.is_closed() and not loop.is_running():
        try:
            loop.close()
        except RuntimeError:
            # Best effort: shutdown must not fail because the loop could not be closed.
            pass

    async_loop_thread = None

    if not in_pytest:
        logger.info("Async workers fast shutdown complete")
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
    """
    Dynamically adjust the number of async workers.

    Args:
        new_count: Target number of workers
        update_q, notification_q, app, datastore: Required (not None) for adding new workers

    Returns:
        dict: Status of the adjustment operation.
        - Every dict has 'status' ('no_change' | 'success' | 'error'),
          'message' and 'current_count'.
        - Add/remove results also carry 'previous_count'.
        - 'current_count' reflects the workers actually scheduled or removed.
    """
    global running_async_tasks

    current_count = get_worker_count()

    if new_count == current_count:
        return {
            'status': 'no_change',
            'message': f'Worker count already at {current_count}',
            'current_count': current_count
        }

    if new_count > current_count:
        # Add workers
        workers_to_add = new_count - current_count
        logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")

        # Use `is None` rather than truthiness: a queue or store object that
        # defines __len__/__bool__ would otherwise be treated as "missing"
        # whenever it happens to be empty.
        if any(arg is None for arg in (update_q, notification_q, app, datastore)):
            return {
                'status': 'error',
                'message': 'Missing required parameters to add workers',
                'current_count': current_count
            }

        # Snapshot the loop; scheduling onto None/closed loop previously crashed the caller.
        loop = async_loop
        if not loop or loop.is_closed():
            logger.error("Async loop not running, cannot add workers")
            return {
                'status': 'error',
                'message': 'Async loop not running, cannot add workers',
                'current_count': current_count
            }

        added_count = 0
        for _ in range(workers_to_add):
            worker_id = len(running_async_tasks)
            coro = start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
            try:
                task_future = asyncio.run_coroutine_threadsafe(coro, loop)
            except RuntimeError as e:
                # Loop closed mid-way; further attempts would fail the same way.
                coro.close()
                logger.error(f"Failed to add async worker {worker_id}: {e}")
                break
            running_async_tasks.append(task_future)
            added_count += 1

        return {
            'status': 'success' if added_count == workers_to_add else 'error',
            'message': f'Added {added_count} workers',
            'previous_count': current_count,
            'current_count': current_count + added_count
        }

    # Remove workers
    workers_to_remove = current_count - new_count
    logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")

    removed_count = 0
    for _ in range(workers_to_remove):
        if not running_async_tasks:
            break
        task_future = running_async_tasks.pop()
        task_future.cancel()
        # NOTE: the future from run_coroutine_threadsafe() is marked cancelled
        # immediately by cancel(), so result() raises CancelledError right away
        # rather than waiting. Cancellation reaches the asyncio task on the
        # loop asynchronously. Any error here is tolerated: the worker is
        # being removed regardless.
        try:
            task_future.result(timeout=5)
        except Exception:
            pass
        removed_count += 1

    return {
        'status': 'success',
        'message': f'Removed {removed_count} workers',
        'previous_count': current_count,
        'current_count': current_count - removed_count
    }
def get_worker_status():
    """Summarise the async worker subsystem as a plain dict.

    ``async_loop_running`` only reports whether a loop object is currently
    stored in ``async_loop``. It does not call ``is_running()`` on it.
    """
    status = {}
    status['worker_type'] = 'async'
    status['worker_count'] = get_worker_count()
    status['running_uuids'] = get_running_uuids()
    status['async_loop_running'] = async_loop is not None
    return status
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
    """
    Check if the expected number of async workers are running and restart any missing ones.

    Args:
        expected_count: Expected number of workers
        update_q, notification_q, app, datastore: Required (not None) for restarting workers

    Returns:
        dict: Health check results. 'status' is 'healthy', 'repaired' or 'degraded'.
    """
    global running_async_tasks

    # Classify tracked futures first. The previous version returned "healthy"
    # whenever the number of *tracked* futures matched expected_count, even
    # if some of those futures had already finished or crashed.
    dead_futures = []
    alive_count = 0
    for i, task_future in enumerate(list(running_async_tasks)):
        if not task_future.done():
            alive_count += 1
            continue
        dead_futures.append(task_future)
        if task_future.cancelled():
            logger.warning(f"Async worker {i} was cancelled")
        else:
            exc = task_future.exception()
            if exc is None:
                logger.warning(f"Async worker {i} completed unexpectedly")
            else:
                logger.error(f"Async worker {i} crashed: {exc}")

    if not dead_futures and alive_count == expected_count:
        return {
            'status': 'healthy',
            'expected_count': expected_count,
            'actual_count': alive_count,
            'message': f'All {expected_count} async workers running'
        }

    # Remove dead workers from tracking by identity, so that concurrent list
    # changes cannot make us drop the wrong entry (index-based pop could).
    if dead_futures:
        dead_ids = {id(f) for f in dead_futures}
        running_async_tasks[:] = [f for f in running_async_tasks if id(f) not in dead_ids]

    missing_workers = expected_count - alive_count
    restarted_count = 0

    # Use `is None` rather than truthiness (an empty queue object may be falsy).
    have_params = all(arg is not None for arg in (update_q, notification_q, app, datastore))
    if missing_workers > 0 and have_params:
        loop = async_loop  # snapshot; may be cleared concurrently by shutdown_workers()
        if not loop or loop.is_closed():
            logger.error("Async loop not running, cannot restart workers")
        else:
            logger.info(f"Restarting {missing_workers} crashed async workers")
            for i in range(missing_workers):
                worker_id = alive_count + i
                coro = start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
                try:
                    task_future = asyncio.run_coroutine_threadsafe(coro, loop)
                except Exception as e:
                    # Never scheduled: close it to avoid a "never awaited" warning.
                    coro.close()
                    logger.error(f"Failed to restart worker {worker_id}: {e}")
                    continue
                running_async_tasks.append(task_future)
                restarted_count += 1

    return {
        'status': 'repaired' if restarted_count > 0 else 'degraded',
        'expected_count': expected_count,
        'actual_count': alive_count,
        'dead_workers': len(dead_futures),
        'restarted_workers': restarted_count,
        'message': f'Found {len(dead_futures)} dead workers, restarted {restarted_count}'
    }