| """ |
| Comprehensive Scheduler for All Data Sources |
| Schedules and runs data collection from all available sources with configurable intervals |
| """ |
|
|
| import asyncio |
| import json |
| from datetime import datetime, timezone, timedelta |
| from typing import Dict, List, Optional, Any |
| from pathlib import Path |
| from utils.logger import setup_logger |
| from collectors.master_collector import DataSourceCollector |
|
|
| logger = setup_logger("comprehensive_scheduler") |
|
|
|
|
| class ComprehensiveScheduler: |
| """ |
| Comprehensive scheduler that manages data collection from all sources |
| """ |
|
|
| def __init__(self, config_file: Optional[str] = None): |
| """ |
| Initialize the comprehensive scheduler |
| |
| Args: |
| config_file: Path to scheduler configuration file |
| """ |
| self.collector = DataSourceCollector() |
| self.config_file = config_file or "scheduler_config.json" |
| self.config = self._load_config() |
| self.last_run_times: Dict[str, datetime] = {} |
| self.running = False |
| logger.info("Comprehensive Scheduler initialized") |
|
|
| def _load_config(self) -> Dict[str, Any]: |
| """ |
| Load scheduler configuration |
| |
| Returns: |
| Configuration dict |
| """ |
| default_config = { |
| "schedules": { |
| "market_data": { |
| "interval_seconds": 60, |
| "enabled": True |
| }, |
| "blockchain": { |
| "interval_seconds": 300, |
| "enabled": True |
| }, |
| "news": { |
| "interval_seconds": 600, |
| "enabled": True |
| }, |
| "sentiment": { |
| "interval_seconds": 1800, |
| "enabled": True |
| }, |
| "whale_tracking": { |
| "interval_seconds": 300, |
| "enabled": True |
| }, |
| "full_collection": { |
| "interval_seconds": 3600, |
| "enabled": True |
| } |
| }, |
| "max_retries": 3, |
| "retry_delay_seconds": 5, |
| "persist_results": True, |
| "results_directory": "data/collections" |
| } |
|
|
| config_path = Path(self.config_file) |
| if config_path.exists(): |
| try: |
| with open(config_path, 'r') as f: |
| loaded_config = json.load(f) |
| |
| default_config.update(loaded_config) |
| logger.info(f"Loaded scheduler config from {config_path}") |
| except Exception as e: |
| logger.error(f"Error loading config file: {e}, using defaults") |
|
|
| return default_config |
|
|
| def save_config(self): |
| """Save current configuration to file""" |
| try: |
| config_path = Path(self.config_file) |
| config_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
| with open(config_path, 'w') as f: |
| json.dump(self.config, f, indent=2) |
|
|
| logger.info(f"Saved scheduler config to {config_path}") |
| except Exception as e: |
| logger.error(f"Error saving config: {e}") |
|
|
| async def _save_results(self, category: str, results: Any): |
| """ |
| Save collection results to file |
| |
| Args: |
| category: Category name |
| results: Results to save |
| """ |
| if not self.config.get("persist_results", True): |
| return |
|
|
| try: |
| results_dir = Path(self.config.get("results_directory", "data/collections")) |
| results_dir.mkdir(parents=True, exist_ok=True) |
|
|
| timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") |
| filename = results_dir / f"{category}_{timestamp}.json" |
|
|
| with open(filename, 'w') as f: |
| json.dump(results, f, indent=2, default=str) |
|
|
| logger.info(f"Saved {category} results to {filename}") |
| except Exception as e: |
| logger.error(f"Error saving results: {e}") |
|
|
| def should_run(self, category: str) -> bool: |
| """ |
| Check if a category should run based on its schedule |
| |
| Args: |
| category: Category name |
| |
| Returns: |
| True if should run, False otherwise |
| """ |
| schedule = self.config.get("schedules", {}).get(category, {}) |
|
|
| if not schedule.get("enabled", True): |
| return False |
|
|
| interval = schedule.get("interval_seconds", 3600) |
| last_run = self.last_run_times.get(category) |
|
|
| if not last_run: |
| return True |
|
|
| elapsed = (datetime.now(timezone.utc) - last_run).total_seconds() |
| return elapsed >= interval |
|
|
| async def run_category_with_retry(self, category: str) -> Optional[Any]: |
| """ |
| Run a category collection with retry logic |
| |
| Args: |
| category: Category name |
| |
| Returns: |
| Collection results or None if failed |
| """ |
| max_retries = self.config.get("max_retries", 3) |
| retry_delay = self.config.get("retry_delay_seconds", 5) |
|
|
| for attempt in range(max_retries): |
| try: |
| logger.info(f"Running {category} collection (attempt {attempt + 1}/{max_retries})") |
|
|
| if category == "full_collection": |
| results = await self.collector.collect_all_data() |
| else: |
| results = await self.collector.collect_category(category) |
|
|
| self.last_run_times[category] = datetime.now(timezone.utc) |
|
|
| |
| await self._save_results(category, results) |
|
|
| return results |
|
|
| except Exception as e: |
| logger.error(f"Error in {category} collection (attempt {attempt + 1}): {e}") |
|
|
| if attempt < max_retries - 1: |
| logger.info(f"Retrying in {retry_delay} seconds...") |
| await asyncio.sleep(retry_delay) |
| else: |
| logger.error(f"Failed {category} collection after {max_retries} attempts") |
| return None |
|
|
| async def run_cycle(self): |
| """Run one scheduler cycle - check and run due categories""" |
| logger.info("Running scheduler cycle...") |
|
|
| categories = self.config.get("schedules", {}).keys() |
| tasks = [] |
|
|
| for category in categories: |
| if self.should_run(category): |
| logger.info(f"Scheduling {category} collection") |
| task = self.run_category_with_retry(category) |
| tasks.append((category, task)) |
|
|
| if tasks: |
| |
| results = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True) |
|
|
| for (category, _), result in zip(tasks, results): |
| if isinstance(result, Exception): |
| logger.error(f"{category} collection failed: {str(result)}") |
| else: |
| if result: |
| stats = result.get("statistics", {}) if isinstance(result, dict) else None |
| if stats: |
| logger.info( |
| f"{category} collection complete: " |
| f"{stats.get('successful_sources', 'N/A')}/{stats.get('total_sources', 'N/A')} successful" |
| ) |
| else: |
| logger.info("No collections due in this cycle") |
|
|
| async def run_forever(self, cycle_interval: int = 30): |
| """ |
| Run the scheduler forever with specified cycle interval |
| |
| Args: |
| cycle_interval: Seconds between scheduler cycles |
| """ |
| self.running = True |
| logger.info(f"Starting comprehensive scheduler (cycle interval: {cycle_interval}s)") |
|
|
| try: |
| while self.running: |
| await self.run_cycle() |
|
|
| |
| logger.info(f"Waiting {cycle_interval} seconds until next cycle...") |
| await asyncio.sleep(cycle_interval) |
|
|
| except KeyboardInterrupt: |
| logger.info("Scheduler interrupted by user") |
| except Exception as e: |
| logger.error(f"Scheduler error: {e}") |
| finally: |
| self.running = False |
| logger.info("Scheduler stopped") |
|
|
| def stop(self): |
| """Stop the scheduler""" |
| logger.info("Stopping scheduler...") |
| self.running = False |
|
|
| async def run_once(self, category: Optional[str] = None): |
| """ |
| Run a single collection immediately |
| |
| Args: |
| category: Category to run, or None for full collection |
| """ |
| if category: |
| logger.info(f"Running single {category} collection...") |
| results = await self.run_category_with_retry(category) |
| else: |
| logger.info("Running single full collection...") |
| results = await self.run_category_with_retry("full_collection") |
|
|
| return results |
|
|
| def get_status(self) -> Dict[str, Any]: |
| """ |
| Get scheduler status |
| |
| Returns: |
| Dict with scheduler status information |
| """ |
| now = datetime.now(timezone.utc) |
| status = { |
| "running": self.running, |
| "current_time": now.isoformat(), |
| "schedules": {} |
| } |
|
|
| for category, schedule in self.config.get("schedules", {}).items(): |
| last_run = self.last_run_times.get(category) |
| interval = schedule.get("interval_seconds", 0) |
|
|
| next_run = None |
| if last_run: |
| next_run = last_run + timedelta(seconds=interval) |
|
|
| time_until_next = None |
| if next_run: |
| time_until_next = (next_run - now).total_seconds() |
|
|
| status["schedules"][category] = { |
| "enabled": schedule.get("enabled", True), |
| "interval_seconds": interval, |
| "last_run": last_run.isoformat() if last_run else None, |
| "next_run": next_run.isoformat() if next_run else None, |
| "seconds_until_next": round(time_until_next, 2) if time_until_next else None, |
| "should_run_now": self.should_run(category) |
| } |
|
|
| return status |
|
|
| def update_schedule(self, category: str, interval_seconds: Optional[int] = None, enabled: Optional[bool] = None): |
| """ |
| Update schedule for a category |
| |
| Args: |
| category: Category name |
| interval_seconds: New interval in seconds |
| enabled: Enable/disable the schedule |
| """ |
| if category not in self.config.get("schedules", {}): |
| logger.error(f"Unknown category: {category}") |
| return |
|
|
| if interval_seconds is not None: |
| self.config["schedules"][category]["interval_seconds"] = interval_seconds |
| logger.info(f"Updated {category} interval to {interval_seconds}s") |
|
|
| if enabled is not None: |
| self.config["schedules"][category]["enabled"] = enabled |
| logger.info(f"{'Enabled' if enabled else 'Disabled'} {category} schedule") |
|
|
| self.save_config() |
|
|
|
|
| |
| if __name__ == "__main__": |
| async def main(): |
| scheduler = ComprehensiveScheduler() |
|
|
| |
| print("\n" + "=" * 80) |
| print("COMPREHENSIVE SCHEDULER STATUS") |
| print("=" * 80) |
|
|
| status = scheduler.get_status() |
| print(f"Running: {status['running']}") |
| print(f"Current Time: {status['current_time']}") |
| print("\nSchedules:") |
| print("-" * 80) |
|
|
| for category, sched in status['schedules'].items(): |
| enabled = "✓" if sched['enabled'] else "✗" |
| interval = sched['interval_seconds'] |
| next_run = sched.get('seconds_until_next', 'N/A') |
|
|
| print(f"{enabled} {category:20} | Interval: {interval:6}s | Next in: {next_run}") |
|
|
| print("=" * 80) |
|
|
| |
| print("\nRunning market_data collection once as example...") |
| results = await scheduler.run_once("market_data") |
|
|
| if results: |
| print(f"\nCollected {len(results)} market data sources") |
| successful = sum(1 for r in results if r.get('success', False)) |
| print(f"Successful: {successful}/{len(results)}") |
|
|
| print("\n" + "=" * 80) |
| print("To run scheduler forever, use: scheduler.run_forever()") |
| print("=" * 80) |
|
|
| asyncio.run(main()) |
|
|