""" Scheduler Service for Automated Tasks Handles periodic tasks like hourly trace synchronization from connected AI observability platforms. """ import asyncio import logging import time import threading from datetime import datetime, timedelta from typing import Dict, Any, Optional logger = logging.getLogger("agent_monitoring_server.scheduler") class SchedulerService: """Service for managing scheduled tasks""" def __init__(self): self.running = False self.scheduler_thread = None self.last_sync_time = None def start(self): """Start the scheduler in a background thread""" if self.running: logger.warning("Scheduler already running") return self.running = True # Start the scheduler in a separate thread self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True) self.scheduler_thread.start() logger.info("Scheduler service started - hourly sync enabled") def stop(self): """Stop the scheduler""" self.running = False logger.info("Scheduler service stopped") def _run_scheduler(self): """Run the scheduler loop - disabled for now""" while self.running: try: # Automatic sync disabled - only manual sync available # current_time = datetime.now() # # # Check if we need to sync (hourly) # if (self.last_sync_time is None or # current_time - self.last_sync_time >= timedelta(hours=1)): # # logger.info("Triggering hourly sync") # self._sync_all_platforms() # self.last_sync_time = current_time # Sleep for 5 minutes between checks time.sleep(300) except Exception as e: logger.error(f"Error in scheduler loop: {str(e)}") time.sleep(300) # Wait before retrying def _sync_all_platforms(self): """Sync traces from all connected platforms""" try: # Import here to avoid circular imports from backend.routers.observability import sync_platform_traces_background from backend.database import get_db from backend.database.models import ObservabilityConnection # Get connections from database instead of global variable db = next(get_db()) try: connections = db.query(ObservabilityConnection).filter( ObservabilityConnection.status == "connected" ).all() if not connections: logger.info("No connected platforms to sync") return for connection in connections: platform = connection.platform logger.info(f"Starting hourly sync for {platform} (connection: {connection.connection_id})") try: # Run sync in background - limit to last hour's traces only asyncio.create_task(sync_platform_traces_background(platform, limit=50)) logger.info(f"Triggered sync for {platform}") except Exception as e: logger.error(f"Failed to sync {platform}: {str(e)}") finally: db.close() except Exception as e: logger.error(f"Error in scheduled sync: {str(e)}") def trigger_manual_sync(self, platform: Optional[str] = None): """Manually trigger sync for specific platform or all platforms""" if platform: logger.info(f"Manual sync triggered for {platform}") self._sync_single_platform(platform) else: logger.info("Manual sync triggered for all platforms") self._sync_all_platforms() def _sync_single_platform(self, platform: str): """Sync traces from a specific platform""" try: from backend.routers.observability import sync_platform_traces_background from backend.database import get_db from backend.database.models import ObservabilityConnection # Get connection from database instead of global variable db = next(get_db()) try: connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.platform == platform, ObservabilityConnection.status == "connected" ).first() if not connection: logger.warning(f"No connection found for {platform}") return asyncio.create_task(sync_platform_traces_background(platform, limit=50)) logger.info(f"Manual sync triggered for {platform}") finally: db.close() except Exception as e: logger.error(f"Error in manual sync for {platform}: {str(e)}") # Global scheduler instance scheduler_service = SchedulerService()