| """ |
| Comprehensive Task Scheduler for Crypto API Monitoring |
| Implements scheduled tasks using APScheduler with full compliance tracking |
| """ |
|
|
| import asyncio |
| import time |
| from datetime import datetime, timedelta |
| from typing import Dict, Optional, Callable, Any, List |
| from threading import Lock |
|
|
| from apscheduler.schedulers.background import BackgroundScheduler |
| from apscheduler.triggers.interval import IntervalTrigger |
| from apscheduler.triggers.cron import CronTrigger |
| from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR |
|
|
| |
| from monitoring.health_checker import HealthChecker |
| from monitoring.rate_limiter import rate_limiter |
| from database.db_manager import db_manager |
| from utils.logger import setup_logger |
| from config import config |
|
|
| |
| logger = setup_logger("scheduler", level="INFO") |
|
|
|
|
| class TaskScheduler: |
| """ |
| Comprehensive task scheduler with compliance tracking |
| Manages all scheduled tasks for the API monitoring system |
| """ |
|
|
| def __init__(self, db_path: str = "data/api_monitor.db"): |
| """ |
| Initialize task scheduler |
| |
| Args: |
| db_path: Path to SQLite database |
| """ |
| self.scheduler = BackgroundScheduler() |
| self.db_path = db_path |
| self.health_checker = HealthChecker(db_path=db_path) |
| self.lock = Lock() |
|
|
| |
| self.expected_run_times: Dict[str, datetime] = {} |
|
|
| |
| self._is_running = False |
|
|
| |
| self.scheduler.add_listener( |
| self._job_executed_listener, |
| EVENT_JOB_EXECUTED | EVENT_JOB_ERROR |
| ) |
|
|
| logger.info("TaskScheduler initialized") |
|
|
| def _job_executed_listener(self, event): |
| """ |
| Listener for job execution events |
| |
| Args: |
| event: APScheduler event object |
| """ |
| job_id = event.job_id |
|
|
| if event.exception: |
| logger.error( |
| f"Job {job_id} raised an exception: {event.exception}", |
| exc_info=True |
| ) |
| else: |
| logger.debug(f"Job {job_id} executed successfully") |
|
|
| def _record_compliance( |
| self, |
| task_name: str, |
| expected_time: datetime, |
| actual_time: datetime, |
| success: bool = True, |
| skip_reason: Optional[str] = None |
| ): |
| """ |
| Record schedule compliance metrics |
| |
| Args: |
| task_name: Name of the scheduled task |
| expected_time: Expected execution time |
| actual_time: Actual execution time |
| success: Whether task succeeded |
| skip_reason: Reason if task was skipped |
| """ |
| try: |
| |
| delay_seconds = int((actual_time - expected_time).total_seconds()) |
| on_time = abs(delay_seconds) <= 5 |
|
|
| |
| |
| provider_id = 1 |
|
|
| |
| |
| |
|
|
| logger.info( |
| f"Schedule compliance - Task: {task_name}, " |
| f"Expected: {expected_time.isoformat()}, " |
| f"Actual: {actual_time.isoformat()}, " |
| f"Delay: {delay_seconds}s, " |
| f"On-time: {on_time}, " |
| f"Skip reason: {skip_reason or 'None'}" |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Failed to record compliance for {task_name}: {e}") |
|
|
| def _wrap_task( |
| self, |
| task_name: str, |
| task_func: Callable, |
| *args, |
| **kwargs |
| ): |
| """ |
| Wrapper for scheduled tasks to add logging and compliance tracking |
| |
| Args: |
| task_name: Name of the task |
| task_func: Function to execute |
| *args: Positional arguments for task_func |
| **kwargs: Keyword arguments for task_func |
| """ |
| start_time = datetime.utcnow() |
|
|
| |
| expected_time = self.expected_run_times.get(task_name, start_time) |
|
|
| |
| |
|
|
| logger.info(f"Starting task: {task_name}") |
|
|
| try: |
| |
| result = task_func(*args, **kwargs) |
|
|
| end_time = datetime.utcnow() |
| duration_ms = (end_time - start_time).total_seconds() * 1000 |
|
|
| logger.info( |
| f"Completed task: {task_name} in {duration_ms:.2f}ms" |
| ) |
|
|
| |
| self._record_compliance( |
| task_name=task_name, |
| expected_time=expected_time, |
| actual_time=start_time, |
| success=True |
| ) |
|
|
| return result |
|
|
| except Exception as e: |
| end_time = datetime.utcnow() |
| duration_ms = (end_time - start_time).total_seconds() * 1000 |
|
|
| logger.error( |
| f"Task {task_name} failed after {duration_ms:.2f}ms: {e}", |
| exc_info=True |
| ) |
|
|
| |
| self._record_compliance( |
| task_name=task_name, |
| expected_time=expected_time, |
| actual_time=start_time, |
| success=False, |
| skip_reason=f"Error: {str(e)[:200]}" |
| ) |
|
|
| |
|
|
| |
| |
| |
|
|
| def _health_check_task(self): |
| """ |
| Health check task - runs checks on all providers with staggering |
| """ |
| logger.info("Executing health check task") |
|
|
| try: |
| |
| providers = config.get_all_providers() |
|
|
| |
| async def run_staggered_checks(): |
| results = [] |
| for i, provider in enumerate(providers): |
| |
| if i > 0: |
| await asyncio.sleep(10) |
|
|
| result = await self.health_checker.check_provider(provider.name) |
| if result: |
| results.append(result) |
| logger.info( |
| f"Health check: {provider.name} - {result.status.value} " |
| f"({result.response_time:.2f}ms)" |
| ) |
|
|
| return results |
|
|
| |
| results = asyncio.run(run_staggered_checks()) |
|
|
| logger.info(f"Health check completed: {len(results)} providers checked") |
|
|
| except Exception as e: |
| logger.error(f"Health check task failed: {e}", exc_info=True) |
|
|
| def _market_data_collection_task(self): |
| """ |
| Market data collection task - collects data from market data providers |
| """ |
| logger.info("Executing market data collection task") |
|
|
| try: |
| |
| providers = config.get_providers_by_category('market_data') |
|
|
| logger.info(f"Collecting market data from {len(providers)} providers") |
|
|
| |
| |
| for provider in providers: |
| logger.debug(f"Would collect market data from: {provider.name}") |
|
|
| except Exception as e: |
| logger.error(f"Market data collection failed: {e}", exc_info=True) |
|
|
| def _explorer_data_collection_task(self): |
| """ |
| Explorer data collection task - collects data from blockchain explorers |
| """ |
| logger.info("Executing explorer data collection task") |
|
|
| try: |
| |
| providers = config.get_providers_by_category('blockchain_explorers') |
|
|
| logger.info(f"Collecting explorer data from {len(providers)} providers") |
|
|
| |
| for provider in providers: |
| logger.debug(f"Would collect explorer data from: {provider.name}") |
|
|
| except Exception as e: |
| logger.error(f"Explorer data collection failed: {e}", exc_info=True) |
|
|
| def _news_collection_task(self): |
| """ |
| News collection task - collects news from news providers |
| """ |
| logger.info("Executing news collection task") |
|
|
| try: |
| |
| providers = config.get_providers_by_category('news') |
|
|
| logger.info(f"Collecting news from {len(providers)} providers") |
|
|
| |
| for provider in providers: |
| logger.debug(f"Would collect news from: {provider.name}") |
|
|
| except Exception as e: |
| logger.error(f"News collection failed: {e}", exc_info=True) |
|
|
| def _sentiment_collection_task(self): |
| """ |
| Sentiment collection task - collects sentiment data |
| """ |
| logger.info("Executing sentiment collection task") |
|
|
| try: |
| |
| providers = config.get_providers_by_category('sentiment') |
|
|
| logger.info(f"Collecting sentiment data from {len(providers)} providers") |
|
|
| |
| for provider in providers: |
| logger.debug(f"Would collect sentiment data from: {provider.name}") |
|
|
| except Exception as e: |
| logger.error(f"Sentiment collection failed: {e}", exc_info=True) |
|
|
| def _rate_limit_snapshot_task(self): |
| """ |
| Rate limit snapshot task - captures current rate limit usage |
| """ |
| logger.info("Executing rate limit snapshot task") |
|
|
| try: |
| |
| statuses = rate_limiter.get_all_statuses() |
|
|
| |
| for provider_name, status_data in statuses.items(): |
| if status_data: |
| |
| provider = config.get_provider(provider_name) |
| if provider: |
| |
| db_provider = db_manager.get_provider(name=provider_name) |
| if db_provider: |
| |
| db_manager.save_rate_limit_usage( |
| provider_id=db_provider.id, |
| limit_type=status_data['limit_type'], |
| limit_value=status_data['limit_value'], |
| current_usage=status_data['current_usage'], |
| reset_time=datetime.fromisoformat(status_data['reset_time']) |
| ) |
|
|
| logger.debug( |
| f"Rate limit snapshot: {provider_name} - " |
| f"{status_data['current_usage']}/{status_data['limit_value']} " |
| f"({status_data['percentage']}%)" |
| ) |
|
|
| logger.info(f"Rate limit snapshot completed: {len(statuses)} providers") |
|
|
| except Exception as e: |
| logger.error(f"Rate limit snapshot failed: {e}", exc_info=True) |
|
|
    def _metrics_aggregation_task(self):
        """
        Metrics aggregation task - aggregates system metrics

        Derives per-provider status counts and request/failure totals from
        the last hour of connection attempts, then persists one system-wide
        snapshot via db_manager.save_system_metrics.
        """
        logger.info("Executing metrics aggregation task")

        try:
            # The provider universe comes from config, not the database
            all_providers = config.get_all_providers()
            total_providers = len(all_providers)

            # Last hour of connection attempts, capped at 10k rows
            connection_attempts = db_manager.get_connection_attempts(hours=1, limit=10000)

            online_count = 0
            degraded_count = 0
            offline_count = 0
            total_response_time = 0
            response_count = 0

            # Request/failure totals count every attempt in the window
            total_requests = len(connection_attempts)
            total_failures = sum(
                1 for attempt in connection_attempts
                if attempt.status in ['failed', 'timeout']
            )

            # Classify each provider by its most recent attempt only.
            # NOTE(review): assumes get_connection_attempts returns rows
            # newest-first, so the first attempt seen per provider is the
            # latest one - TODO confirm against db_manager.
            provider_latest_status = {}
            for attempt in connection_attempts:
                if attempt.provider_id not in provider_latest_status:
                    provider_latest_status[attempt.provider_id] = attempt

                    if attempt.status == 'success':
                        online_count += 1
                        if attempt.response_time_ms:
                            total_response_time += attempt.response_time_ms
                            response_count += 1
                    elif attempt.status == 'timeout':
                        # Timeouts are treated as offline
                        offline_count += 1
                    else:
                        # Any other non-success status counts as degraded
                        degraded_count += 1

            # Mean response time over successful attempts that reported one
            avg_response_time = (
                total_response_time / response_count
                if response_count > 0
                else 0
            )

            # Health buckets: >=80% online healthy, >=50% degraded, else critical
            online_percentage = (online_count / total_providers * 100) if total_providers > 0 else 0

            if online_percentage >= 80:
                system_health = "healthy"
            elif online_percentage >= 50:
                system_health = "degraded"
            else:
                system_health = "critical"

            db_manager.save_system_metrics(
                total_providers=total_providers,
                online_count=online_count,
                degraded_count=degraded_count,
                offline_count=offline_count,
                avg_response_time_ms=avg_response_time,
                total_requests_hour=total_requests,
                total_failures_hour=total_failures,
                system_health=system_health
            )

            logger.info(
                f"Metrics aggregation completed - "
                f"Health: {system_health}, "
                f"Online: {online_count}/{total_providers}, "
                f"Avg Response: {avg_response_time:.2f}ms"
            )

        except Exception as e:
            logger.error(f"Metrics aggregation failed: {e}", exc_info=True)
|
|
| def _database_cleanup_task(self): |
| """ |
| Database cleanup task - removes old records (>30 days) |
| """ |
| logger.info("Executing database cleanup task") |
|
|
| try: |
| |
| deleted_counts = db_manager.cleanup_old_data(days=30) |
|
|
| total_deleted = sum(deleted_counts.values()) |
|
|
| logger.info( |
| f"Database cleanup completed - Deleted {total_deleted} old records" |
| ) |
|
|
| |
| for table, count in deleted_counts.items(): |
| if count > 0: |
| logger.info(f" {table}: {count} records deleted") |
|
|
| except Exception as e: |
| logger.error(f"Database cleanup failed: {e}", exc_info=True) |
|
|
| |
| |
| |
|
|
| def start(self): |
| """ |
| Start all scheduled tasks |
| """ |
| if self._is_running: |
| logger.warning("Scheduler is already running") |
| return |
|
|
| logger.info("Starting task scheduler...") |
|
|
| try: |
| |
| now = datetime.utcnow() |
|
|
| |
| self.expected_run_times['health_checks'] = now |
| self.scheduler.add_job( |
| func=lambda: self._wrap_task('health_checks', self._health_check_task), |
| trigger=IntervalTrigger(minutes=5), |
| id='health_checks', |
| name='Health Checks (Staggered)', |
| replace_existing=True, |
| max_instances=1 |
| ) |
| logger.info("Scheduled: Health checks every 5 minutes") |
|
|
| |
| self.expected_run_times['market_data'] = now |
| self.scheduler.add_job( |
| func=lambda: self._wrap_task('market_data', self._market_data_collection_task), |
| trigger=IntervalTrigger(minutes=1), |
| id='market_data', |
| name='Market Data Collection', |
| replace_existing=True, |
| max_instances=1 |
| ) |
| logger.info("Scheduled: Market data collection every 1 minute") |
|
|
| |
| self.expected_run_times['explorer_data'] = now |
| self.scheduler.add_job( |
| func=lambda: self._wrap_task('explorer_data', self._explorer_data_collection_task), |
| trigger=IntervalTrigger(minutes=5), |
| id='explorer_data', |
| name='Explorer Data Collection', |
| replace_existing=True, |
| max_instances=1 |
| ) |
| logger.info("Scheduled: Explorer data collection every 5 minutes") |
|
|
| |
| self.expected_run_times['news_collection'] = now |
| self.scheduler.add_job( |
| func=lambda: self._wrap_task('news_collection', self._news_collection_task), |
| trigger=IntervalTrigger(minutes=10), |
| id='news_collection', |
| name='News Collection', |
| replace_existing=True, |
| max_instances=1 |
| ) |
| logger.info("Scheduled: News collection every 10 minutes") |
|
|
| |
| self.expected_run_times['sentiment_collection'] = now |
| self.scheduler.add_job( |
| func=lambda: self._wrap_task('sentiment_collection', self._sentiment_collection_task), |
| trigger=IntervalTrigger(minutes=15), |
| id='sentiment_collection', |
| name='Sentiment Collection', |
| replace_existing=True, |
| max_instances=1 |
| ) |
| logger.info("Scheduled: Sentiment collection every 15 minutes") |
|
|
| |
| self.expected_run_times['rate_limit_snapshot'] = now |
| self.scheduler.add_job( |
| func=lambda: self._wrap_task('rate_limit_snapshot', self._rate_limit_snapshot_task), |
| trigger=IntervalTrigger(minutes=1), |
| id='rate_limit_snapshot', |
| name='Rate Limit Snapshot', |
| replace_existing=True, |
| max_instances=1 |
| ) |
| logger.info("Scheduled: Rate limit snapshot every 1 minute") |
|
|
| |
| self.expected_run_times['metrics_aggregation'] = now |
| self.scheduler.add_job( |
| func=lambda: self._wrap_task('metrics_aggregation', self._metrics_aggregation_task), |
| trigger=IntervalTrigger(minutes=5), |
| id='metrics_aggregation', |
| name='Metrics Aggregation', |
| replace_existing=True, |
| max_instances=1 |
| ) |
| logger.info("Scheduled: Metrics aggregation every 5 minutes") |
|
|
| |
| self.expected_run_times['database_cleanup'] = now.replace(hour=3, minute=0, second=0) |
| self.scheduler.add_job( |
| func=lambda: self._wrap_task('database_cleanup', self._database_cleanup_task), |
| trigger=CronTrigger(hour=3, minute=0), |
| id='database_cleanup', |
| name='Database Cleanup (Daily 3 AM)', |
| replace_existing=True, |
| max_instances=1 |
| ) |
| logger.info("Scheduled: Database cleanup daily at 3 AM") |
|
|
| |
| self.scheduler.start() |
| self._is_running = True |
|
|
| logger.info("Task scheduler started successfully") |
|
|
| |
| jobs = self.scheduler.get_jobs() |
| logger.info(f"Active scheduled jobs: {len(jobs)}") |
| for job in jobs: |
| logger.info(f" - {job.name} (ID: {job.id}) - Next run: {job.next_run_time}") |
|
|
| except Exception as e: |
| logger.error(f"Failed to start scheduler: {e}", exc_info=True) |
| raise |
|
|
| def stop(self): |
| """ |
| Stop scheduler gracefully |
| """ |
| if not self._is_running: |
| logger.warning("Scheduler is not running") |
| return |
|
|
| logger.info("Stopping task scheduler...") |
|
|
| try: |
| |
| self.scheduler.shutdown(wait=True) |
| self._is_running = False |
|
|
| |
| asyncio.run(self.health_checker.close()) |
|
|
| logger.info("Task scheduler stopped successfully") |
|
|
| except Exception as e: |
| logger.error(f"Error stopping scheduler: {e}", exc_info=True) |
|
|
| def add_job( |
| self, |
| job_id: str, |
| job_name: str, |
| job_func: Callable, |
| trigger_type: str = 'interval', |
| **trigger_kwargs |
| ) -> bool: |
| """ |
| Add a custom scheduled job |
| |
| Args: |
| job_id: Unique job identifier |
| job_name: Human-readable job name |
| job_func: Function to execute |
| trigger_type: Type of trigger ('interval' or 'cron') |
| **trigger_kwargs: Trigger-specific parameters |
| |
| Returns: |
| True if successful, False otherwise |
| |
| Examples: |
| # Add interval job |
| scheduler.add_job( |
| 'my_job', 'My Custom Job', my_function, |
| trigger_type='interval', minutes=30 |
| ) |
| |
| # Add cron job |
| scheduler.add_job( |
| 'daily_job', 'Daily Job', daily_function, |
| trigger_type='cron', hour=12, minute=0 |
| ) |
| """ |
| try: |
| |
| if trigger_type == 'interval': |
| trigger = IntervalTrigger(**trigger_kwargs) |
| elif trigger_type == 'cron': |
| trigger = CronTrigger(**trigger_kwargs) |
| else: |
| logger.error(f"Unknown trigger type: {trigger_type}") |
| return False |
|
|
| |
| self.scheduler.add_job( |
| func=lambda: self._wrap_task(job_id, job_func), |
| trigger=trigger, |
| id=job_id, |
| name=job_name, |
| replace_existing=True, |
| max_instances=1 |
| ) |
|
|
| |
| self.expected_run_times[job_id] = datetime.utcnow() |
|
|
| logger.info(f"Added custom job: {job_name} (ID: {job_id})") |
| return True |
|
|
| except Exception as e: |
| logger.error(f"Failed to add job {job_id}: {e}", exc_info=True) |
| return False |
|
|
| def remove_job(self, job_id: str) -> bool: |
| """ |
| Remove a scheduled job |
| |
| Args: |
| job_id: Job identifier to remove |
| |
| Returns: |
| True if successful, False otherwise |
| """ |
| try: |
| self.scheduler.remove_job(job_id) |
|
|
| |
| if job_id in self.expected_run_times: |
| del self.expected_run_times[job_id] |
|
|
| logger.info(f"Removed job: {job_id}") |
| return True |
|
|
| except Exception as e: |
| logger.error(f"Failed to remove job {job_id}: {e}", exc_info=True) |
| return False |
|
|
| def trigger_immediate(self, job_id: str) -> bool: |
| """ |
| Trigger immediate execution of a scheduled job |
| |
| Args: |
| job_id: Job identifier to trigger |
| |
| Returns: |
| True if successful, False otherwise |
| """ |
| try: |
| job = self.scheduler.get_job(job_id) |
|
|
| if not job: |
| logger.error(f"Job not found: {job_id}") |
| return False |
|
|
| |
| job.modify(next_run_time=datetime.utcnow()) |
|
|
| logger.info(f"Triggered immediate execution of job: {job_id}") |
| return True |
|
|
| except Exception as e: |
| logger.error(f"Failed to trigger job {job_id}: {e}", exc_info=True) |
| return False |
|
|
| def get_job_status(self, job_id: Optional[str] = None) -> Dict[str, Any]: |
| """ |
| Get status of scheduled jobs |
| |
| Args: |
| job_id: Specific job ID, or None for all jobs |
| |
| Returns: |
| Dictionary with job status information |
| """ |
| try: |
| if job_id: |
| job = self.scheduler.get_job(job_id) |
| if not job: |
| return {} |
|
|
| return { |
| 'id': job.id, |
| 'name': job.name, |
| 'next_run': job.next_run_time.isoformat() if job.next_run_time else None, |
| 'trigger': str(job.trigger) |
| } |
| else: |
| |
| jobs = self.scheduler.get_jobs() |
| return { |
| 'total_jobs': len(jobs), |
| 'is_running': self._is_running, |
| 'jobs': [ |
| { |
| 'id': job.id, |
| 'name': job.name, |
| 'next_run': job.next_run_time.isoformat() if job.next_run_time else None, |
| 'trigger': str(job.trigger) |
| } |
| for job in jobs |
| ] |
| } |
|
|
| except Exception as e: |
| logger.error(f"Failed to get job status: {e}", exc_info=True) |
| return {} |
|
|
| def is_running(self) -> bool: |
| """ |
| Check if scheduler is running |
| |
| Returns: |
| True if running, False otherwise |
| """ |
| return self._is_running |
|
|
|
|
| |
| |
| |
|
|
| |
# Module-level singleton used by the start_scheduler/stop_scheduler helpers
task_scheduler = TaskScheduler()
|
|
|
|
| |
| |
| |
|
|
def start_scheduler():
    """Start the global `task_scheduler` singleton (registers and runs all jobs)."""
    task_scheduler.start()
|
|
|
|
def stop_scheduler():
    """Stop the global `task_scheduler` singleton gracefully."""
    task_scheduler.stop()
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    print("Task Scheduler Module")
    print("=" * 80)

    # Standalone demo: run the scheduler until interrupted with Ctrl+C
    scheduler = TaskScheduler()

    try:
        scheduler.start()

        print("\nScheduler is running. Press Ctrl+C to stop...")
        print(f"Scheduler status: {scheduler.get_job_status()}")

        # `time` is already imported at module level; the redundant local
        # import that was here has been removed. Poll status once a minute.
        while True:
            time.sleep(60)

            status = scheduler.get_job_status()
            # get_job_status returns {} on failure, so use .get with defaults
            print(f"\n[{datetime.utcnow().isoformat()}] Active jobs: {status.get('total_jobs', 0)}")
            for job in status.get('jobs', []):
                print(f" - {job['name']}: Next run at {job['next_run']}")

    except KeyboardInterrupt:
        print("\n\nStopping scheduler...")
        scheduler.stop()
        print("Scheduler stopped. Goodbye!")
|
|