"""
Enhanced Scheduler Service
Manages periodic and real-time data updates with persistence
"""
import asyncio
import logging
from typing import Dict, Any, List, Optional, Callable
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
import json
from collections import defaultdict

import httpx  # third-party async HTTP client used for API fetches

# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class ScheduleTask:
    """Mutable record describing one scheduled API update task.

    Holds both the static schedule (interval, update type, enabled flag)
    and the rolling execution state (timestamps, last result, counters).
    """
    api_id: str          # unique identifier of the API (config key)
    name: str            # human-readable API name
    category: str        # API category label
    interval: int        # seconds between periodic updates
    update_type: str     # 'periodic' or 'realtime'
    enabled: bool        # False excludes the task from scheduling
    last_update: Optional[datetime] = None   # wall-clock time of the last run
    next_update: Optional[datetime] = None   # when the task is next due
    last_status: Optional[str] = None        # 'success' or 'failed'
    last_data: Optional[Dict[str, Any]] = None  # payload from the last run
    error_count: int = 0     # consecutive failures (reset to 0 on success)
    success_count: int = 0   # total successful runs
|
|
|
|
class SchedulerService:
    """Advanced scheduler for managing API data updates.

    Runs a single periodic polling loop (scanning for due tasks every few
    seconds) plus one long-lived asyncio task per real-time API.  Fetched
    payloads are cached in memory, optionally persisted through
    ``db_manager`` and fanned out to registered callbacks.
    """

    def __init__(self, config_loader, db_manager=None):
        """Build the scheduler and derive one ScheduleTask per configured API.

        Args:
            config_loader: provides ``get_all_apis()``, ``schedules``,
                ``apis``, ``get_realtime_apis()``, ``mark_updated()`` and
                ``update_schedule()``.
            db_manager: optional persistence layer with an async
                ``save_collection_data()`` method.
        """
        self.config_loader = config_loader
        self.db_manager = db_manager
        self.tasks: Dict[str, "ScheduleTask"] = {}
        self.running = False
        self.periodic_task: Optional[asyncio.Task] = None
        self.realtime_tasks: Dict[str, asyncio.Task] = {}
        self.data_cache: Dict[str, Any] = {}
        self.callbacks: Dict[str, List[Callable]] = defaultdict(list)

        self._initialize_tasks()

    def _initialize_tasks(self):
        """Create a ScheduleTask for every API known to the config loader."""
        apis = self.config_loader.get_all_apis()
        schedules = self.config_loader.schedules

        for api_id, api in apis.items():
            schedule = schedules.get(api_id, {})

            self.tasks[api_id] = ScheduleTask(
                api_id=api_id,
                name=api.get('name', api_id),
                category=api.get('category', 'unknown'),
                interval=schedule.get('interval', 300),  # default: 5 minutes
                update_type=api.get('update_type', 'periodic'),
                enabled=schedule.get('enabled', True),
                next_update=datetime.now(),  # due immediately on first scan
            )

        logger.info(f"Initialized {len(self.tasks)} schedule tasks")

    async def start(self):
        """Start the periodic loop and all real-time tasks (idempotent)."""
        if self.running:
            logger.warning("Scheduler already running")
            return

        self.running = True
        logger.info("Starting scheduler...")

        self.periodic_task = asyncio.create_task(self._periodic_update_loop())
        await self._start_realtime_tasks()

        logger.info("Scheduler started successfully")

    async def stop(self):
        """Stop the scheduler and wait for all of its tasks to finish."""
        if not self.running:
            return

        self.running = False
        logger.info("Stopping scheduler...")

        if self.periodic_task:
            self.periodic_task.cancel()
            try:
                await self.periodic_task
            except asyncio.CancelledError:
                pass

        # Cancel real-time tasks AND await them so none linger past shutdown
        # (the previous revision fired cancel() without ever awaiting).
        if self.realtime_tasks:
            for rt_task in self.realtime_tasks.values():
                rt_task.cancel()
            await asyncio.gather(*self.realtime_tasks.values(),
                                 return_exceptions=True)
            self.realtime_tasks.clear()

        logger.info("Scheduler stopped")

    async def _periodic_update_loop(self):
        """Main loop: every 5 seconds, run all due periodic tasks concurrently."""
        while self.running:
            try:
                due_tasks = self._get_due_tasks()

                if due_tasks:
                    logger.info(f"Processing {len(due_tasks)} due tasks")
                    # return_exceptions keeps one failing task from aborting
                    # the whole batch.
                    await asyncio.gather(
                        *[self._execute_task(task) for task in due_tasks],
                        return_exceptions=True
                    )

                await asyncio.sleep(5)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in periodic update loop: {e}")
                await asyncio.sleep(10)  # brief pause before resuming the loop

    def _get_due_tasks(self) -> List["ScheduleTask"]:
        """Return enabled periodic tasks whose next_update time has passed."""
        now = datetime.now()
        due_tasks = []

        for task in self.tasks.values():
            if not task.enabled:
                continue
            # Real-time tasks are driven by their own long-lived coroutine,
            # not the periodic loop.
            if task.update_type == 'realtime':
                continue
            if task.next_update is None or now >= task.next_update:
                due_tasks.append(task)

        return due_tasks

    async def _execute_task(self, task: "ScheduleTask"):
        """Fetch one API, record the outcome and distribute the data.

        On success: updates timestamps/counters, caches the payload,
        persists it (when a db_manager is set) and triggers callbacks.
        On failure: records the error, backs off after repeated failures,
        and schedules the retry.
        """
        try:
            api = self.config_loader.apis.get(task.api_id)
            if not api:
                logger.error(f"API not found: {task.api_id}")
                return

            data = await self._fetch_api_data(api)

            task.last_update = datetime.now()
            task.next_update = task.last_update + timedelta(seconds=task.interval)
            task.last_status = 'success'
            task.last_data = data
            task.success_count += 1
            task.error_count = 0  # reset the consecutive-failure counter

            self.data_cache[task.api_id] = {
                'data': data,
                'timestamp': datetime.now(),
                'task': task.name
            }

            if self.db_manager:
                await self._save_to_database(task, data)

            await self._trigger_callbacks(task.api_id, data)

            self.config_loader.mark_updated(task.api_id)

            logger.info(f"✓ Updated {task.name} ({task.category})")

        except Exception as e:
            logger.error(f"✗ Failed to update {task.name}: {e}")
            task.last_status = 'failed'
            task.error_count += 1

            # Back off after 3 consecutive failures (capped at 1 hour).
            if task.error_count >= 3:
                task.interval = min(task.interval * 2, 3600)
                logger.warning(f"Increased interval for {task.name} to {task.interval}s")

            # Schedule the retry.  Without this, a failed task stayed
            # permanently "due" and was retried on every 5-second scan.
            task.next_update = datetime.now() + timedelta(seconds=task.interval)

    async def _fetch_api_data(self, api: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch JSON data from an API described by its config entry.

        Resolves the request URL (an explicit endpoint overrides base_url),
        applies the configured auth scheme, and returns the decoded JSON.

        Raises:
            httpx.HTTPStatusError: on non-2xx responses.
            httpx.HTTPError: on connection/timeout failures.
        """
        # Resolve the URL first: an explicit endpoint overrides base_url.
        url = api.get('base_url', '')
        endpoints = api.get('endpoints')
        if isinstance(endpoints, dict) and 'health' in endpoints:
            url = endpoints['health']
        elif isinstance(endpoints, str):
            url = endpoints

        headers: Dict[str, str] = {}
        params: Dict[str, str] = {}

        auth = api.get('auth', {})
        auth_type = auth.get('type', 'none')
        key = auth.get('key')

        if auth_type in ('apiKey', 'apiKeyHeader'):
            if key:
                headers[auth.get('header_name', 'X-API-Key')] = key
        elif auth_type == 'apiKeyQuery':
            if key:
                params[auth.get('param_name', 'apikey')] = key
        elif auth_type == 'apiKeyPath':
            # Substitute the key into a {PLACEHOLDER} in the URL.  This runs
            # after endpoint resolution, so the substitution is no longer
            # lost when an endpoint overrides base_url (bug in the earlier
            # revision, which substituted before the override).
            if key:
                placeholder = auth.get('param_name', 'API_KEY')
                url = url.replace(f'{{{placeholder}}}', key)

        timeout = httpx.Timeout(10.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            # Let httpx encode the query string instead of concatenating it
            # by hand, so parameter values are properly URL-encoded.
            response = await client.get(url, headers=headers,
                                        params=params or None)
            response.raise_for_status()
            return response.json()

    async def _save_to_database(self, task: "ScheduleTask", data: Dict[str, Any]):
        """Persist a task's payload via the db_manager (best-effort)."""
        if not self.db_manager:
            return

        try:
            await self.db_manager.save_collection_data(
                api_id=task.api_id,
                category=task.category,
                data=data,
                timestamp=datetime.now()
            )
        except Exception as e:
            # Persistence failure must not fail the update itself.
            logger.error(f"Error saving to database: {e}")

    async def _trigger_callbacks(self, api_id: str, data: Dict[str, Any]):
        """Invoke every callback registered for *api_id* with the new data.

        Both sync and async callbacks are supported; a failing callback is
        logged and does not block the others.
        """
        if api_id in self.callbacks:
            for callback in self.callbacks[api_id]:
                try:
                    if asyncio.iscoroutinefunction(callback):
                        await callback(api_id, data)
                    else:
                        callback(api_id, data)
                except Exception as e:
                    logger.error(f"Error in callback for {api_id}: {e}")

    async def _start_realtime_tasks(self):
        """Spawn one long-lived asyncio task per enabled real-time API."""
        realtime_apis = self.config_loader.get_realtime_apis()

        for api_id, api in realtime_apis.items():
            task = self.tasks.get(api_id)

            if task and task.enabled:
                ws_task = asyncio.create_task(self._realtime_task(task, api))
                self.realtime_tasks[api_id] = ws_task

        logger.info(f"Started {len(self.realtime_tasks)} real-time tasks")

    async def _realtime_task(self, task: "ScheduleTask", api: Dict[str, Any]):
        """Placeholder loop for a real-time (WebSocket) connection.

        A real implementation would connect, stream messages and feed each
        one through _handle_realtime_data(); for now it only logs and sleeps.
        """
        while self.running:
            try:
                logger.info(f"Real-time task for {task.name} (placeholder)")
                await asyncio.sleep(60)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in real-time task {task.name}: {e}")
                await asyncio.sleep(30)  # back off before reconnecting

    async def _handle_realtime_data(self, task: "ScheduleTask", data: Dict[str, Any]):
        """Record, cache, persist and fan out one real-time payload."""
        task.last_update = datetime.now()
        task.last_status = 'success'
        task.last_data = data
        task.success_count += 1

        self.data_cache[task.api_id] = {
            'data': data,
            'timestamp': datetime.now(),
            'task': task.name
        }

        if self.db_manager:
            await self._save_to_database(task, data)

        await self._trigger_callbacks(task.api_id, data)

    def register_callback(self, api_id: str, callback: Callable):
        """Register a (sync or async) callback for updates of *api_id*."""
        self.callbacks[api_id].append(callback)

    def unregister_callback(self, api_id: str, callback: Callable):
        """Remove every registration of *callback* for *api_id* (no-op if absent)."""
        if api_id in self.callbacks:
            self.callbacks[api_id] = [cb for cb in self.callbacks[api_id]
                                      if cb != callback]

    def update_task_schedule(self, api_id: str, interval: int = None, enabled: bool = None):
        """Change interval and/or enabled flag of a task and persist it.

        Unknown api_ids are ignored silently.
        """
        if api_id in self.tasks:
            task = self.tasks[api_id]

            if interval is not None:
                task.interval = interval
                # Re-anchor the next run so the new cadence applies promptly
                # instead of waiting out the previously scheduled time.
                if task.last_update is not None:
                    task.next_update = task.last_update + timedelta(seconds=interval)
                self.config_loader.update_schedule(api_id, interval=interval)

            if enabled is not None:
                task.enabled = enabled
                self.config_loader.update_schedule(api_id, enabled=enabled)

            logger.info(f"Updated schedule for {task.name}")

    def get_task_status(self, api_id: str) -> Optional[Dict[str, Any]]:
        """Return a JSON-friendly status snapshot of one task, or None."""
        task = self.tasks.get(api_id)

        if not task:
            return None

        return {
            'api_id': task.api_id,
            'name': task.name,
            'category': task.category,
            'interval': task.interval,
            'update_type': task.update_type,
            'enabled': task.enabled,
            'last_update': task.last_update.isoformat() if task.last_update else None,
            'next_update': task.next_update.isoformat() if task.next_update else None,
            'last_status': task.last_status,
            'success_count': task.success_count,
            'error_count': task.error_count
        }

    def get_all_task_statuses(self) -> Dict[str, Any]:
        """Return {api_id: status snapshot} for every task."""
        return {
            api_id: self.get_task_status(api_id)
            for api_id in self.tasks.keys()
        }

    def get_cached_data(self, api_id: str) -> Optional[Dict[str, Any]]:
        """Return the most recent cached entry for an API, or None."""
        return self.data_cache.get(api_id)

    def get_all_cached_data(self) -> Dict[str, Any]:
        """Return a shallow copy of the whole cache.

        Copying prevents callers from mutating the scheduler's internal
        cache dict by accident.
        """
        return dict(self.data_cache)

    async def force_update(self, api_id: str) -> bool:
        """Run a task immediately; return True if the fetch succeeded."""
        task = self.tasks.get(api_id)

        if not task:
            logger.error(f"Task not found: {api_id}")
            return False

        logger.info(f"Forcing update for {task.name}")
        await self._execute_task(task)

        return task.last_status == 'success'

    def export_schedules(self, filepath: str):
        """Export all task schedules (and counters) to a JSON file."""
        schedules_data = {
            api_id: {
                'name': task.name,
                'category': task.category,
                'interval': task.interval,
                'update_type': task.update_type,
                'enabled': task.enabled,
                'last_update': task.last_update.isoformat() if task.last_update else None,
                'success_count': task.success_count,
                'error_count': task.error_count
            }
            for api_id, task in self.tasks.items()
        }

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(schedules_data, f, indent=2)

        logger.info(f"Exported schedules to {filepath}")

    def import_schedules(self, filepath: str):
        """Load interval/enabled settings from a JSON file into known tasks.

        Entries for api_ids that are not currently configured are skipped.
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            schedules_data = json.load(f)

        for api_id, schedule_data in schedules_data.items():
            if api_id in self.tasks:
                task = self.tasks[api_id]
                task.interval = schedule_data.get('interval', task.interval)
                task.enabled = schedule_data.get('enabled', task.enabled)

        logger.info(f"Imported schedules from {filepath}")
|