Spaces:
Sleeping
Sleeping
| """ | |
| Scheduler Service for Automated Tasks | |
| Handles periodic tasks like hourly trace synchronization from connected AI observability platforms. | |
| """ | |
| import asyncio | |
| import logging | |
| import time | |
| import threading | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, Optional | |
| logger = logging.getLogger("agent_monitoring_server.scheduler") | |
| class SchedulerService: | |
| """Service for managing scheduled tasks""" | |
| def __init__(self): | |
| self.running = False | |
| self.scheduler_thread = None | |
| self.last_sync_time = None | |
| def start(self): | |
| """Start the scheduler in a background thread""" | |
| if self.running: | |
| logger.warning("Scheduler already running") | |
| return | |
| self.running = True | |
| # Start the scheduler in a separate thread | |
| self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True) | |
| self.scheduler_thread.start() | |
| logger.info("Scheduler service started - hourly sync enabled") | |
| def stop(self): | |
| """Stop the scheduler""" | |
| self.running = False | |
| logger.info("Scheduler service stopped") | |
| def _run_scheduler(self): | |
| """Run the scheduler loop - disabled for now""" | |
| while self.running: | |
| try: | |
| # Automatic sync disabled - only manual sync available | |
| # current_time = datetime.now() | |
| # | |
| # # Check if we need to sync (hourly) | |
| # if (self.last_sync_time is None or | |
| # current_time - self.last_sync_time >= timedelta(hours=1)): | |
| # | |
| # logger.info("Triggering hourly sync") | |
| # self._sync_all_platforms() | |
| # self.last_sync_time = current_time | |
| # Sleep for 5 minutes between checks | |
| time.sleep(300) | |
| except Exception as e: | |
| logger.error(f"Error in scheduler loop: {str(e)}") | |
| time.sleep(300) # Wait before retrying | |
| def _sync_all_platforms(self): | |
| """Sync traces from all connected platforms""" | |
| try: | |
| # Import here to avoid circular imports | |
| from backend.routers.observability import sync_platform_traces_background | |
| from backend.database import get_db | |
| from backend.database.models import ObservabilityConnection | |
| # Get connections from database instead of global variable | |
| db = next(get_db()) | |
| try: | |
| connections = db.query(ObservabilityConnection).filter( | |
| ObservabilityConnection.status == "connected" | |
| ).all() | |
| if not connections: | |
| logger.info("No connected platforms to sync") | |
| return | |
| for connection in connections: | |
| platform = connection.platform | |
| logger.info(f"Starting hourly sync for {platform} (connection: {connection.connection_id})") | |
| try: | |
| # Run sync in background - limit to last hour's traces only | |
| asyncio.create_task(sync_platform_traces_background(platform, limit=50)) | |
| logger.info(f"Triggered sync for {platform}") | |
| except Exception as e: | |
| logger.error(f"Failed to sync {platform}: {str(e)}") | |
| finally: | |
| db.close() | |
| except Exception as e: | |
| logger.error(f"Error in scheduled sync: {str(e)}") | |
| def trigger_manual_sync(self, platform: Optional[str] = None): | |
| """Manually trigger sync for specific platform or all platforms""" | |
| if platform: | |
| logger.info(f"Manual sync triggered for {platform}") | |
| self._sync_single_platform(platform) | |
| else: | |
| logger.info("Manual sync triggered for all platforms") | |
| self._sync_all_platforms() | |
| def _sync_single_platform(self, platform: str): | |
| """Sync traces from a specific platform""" | |
| try: | |
| from backend.routers.observability import sync_platform_traces_background | |
| from backend.database import get_db | |
| from backend.database.models import ObservabilityConnection | |
| # Get connection from database instead of global variable | |
| db = next(get_db()) | |
| try: | |
| connection = db.query(ObservabilityConnection).filter( | |
| ObservabilityConnection.platform == platform, | |
| ObservabilityConnection.status == "connected" | |
| ).first() | |
| if not connection: | |
| logger.warning(f"No connection found for {platform}") | |
| return | |
| asyncio.create_task(sync_platform_traces_background(platform, limit=50)) | |
| logger.info(f"Manual sync triggered for {platform}") | |
| finally: | |
| db.close() | |
| except Exception as e: | |
| logger.error(f"Error in manual sync for {platform}: {str(e)}") | |
| # Global scheduler instance | |
| scheduler_service = SchedulerService() |