# AgentGraph/backend/services/scheduler_service.py
# wu981526092's picture
# 🚀 Deploy AgentGraph: Complete agent monitoring and knowledge graph system
# c2ea5ed
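"""
Scheduler Service for Automated Tasks

Manages a background scheduler thread and trace synchronization from connected
AI observability platforms. The automatic hourly sync inside the scheduler loop
is currently disabled; only manually triggered syncs are performed.
"""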
import asyncio
import logging
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
logger = logging.getLogger("agent_monitoring_server.scheduler")
class SchedulerService:
    """Service for managing scheduled tasks.

    Owns a daemon thread that wakes up periodically (automatic hourly sync is
    currently disabled inside the loop) and exposes manual triggers that
    dispatch trace synchronization for connected observability platforms.
    """

    # Seconds the background loop waits between wake-ups.
    CHECK_INTERVAL_SECONDS = 300
    # Maximum number of traces requested per platform sync.
    SYNC_TRACE_LIMIT = 50

    def __init__(self):
        self.running = False
        self.scheduler_thread = None
        self.last_sync_time = None
        # Signals the loop thread to exit; replaced with a fresh event on
        # every start() so a stale thread can never be revived by a restart.
        self._stop_event = threading.Event()
        # Strong references to in-flight asyncio tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks = set()

    def start(self):
        """Start the scheduler in a background daemon thread.

        Does nothing (besides logging a warning) when already running.
        """
        if self.running:
            logger.warning("Scheduler already running")
            return
        self.running = True
        self._stop_event = threading.Event()
        # The thread receives its own stop event so that a later stop()/start()
        # cycle cannot leave two loops alive at once.
        self.scheduler_thread = threading.Thread(
            target=self._run_scheduler,
            args=(self._stop_event,),
            daemon=True,
        )
        self.scheduler_thread.start()
        logger.info("Scheduler service started - hourly sync enabled")

    def stop(self):
        """Stop the scheduler and wake the loop thread so it exits promptly."""
        self.running = False
        self._stop_event.set()
        logger.info("Scheduler service stopped")

    def _run_scheduler(self, stop_event=None):
        """Run the scheduler loop - automatic sync is disabled for now.

        Args:
            stop_event: ``threading.Event`` that ends the loop when set.
                Defaults to the service's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while self.running and not stop_event.is_set():
            try:
                # Automatic sync disabled - only manual sync available
                # current_time = datetime.now()
                #
                # # Check if we need to sync (hourly)
                # if (self.last_sync_time is None or
                #         current_time - self.last_sync_time >= timedelta(hours=1)):
                #
                #     logger.info("Triggering hourly sync")
                #     self._sync_all_platforms()
                #     self.last_sync_time = current_time

                # Interruptible wait between checks: returns as soon as
                # stop() sets the event instead of sleeping the full interval.
                stop_event.wait(self.CHECK_INTERVAL_SECONDS)
            except Exception as e:
                logger.error(f"Error in scheduler loop: {str(e)}")
                stop_event.wait(self.CHECK_INTERVAL_SECONDS)  # Wait before retrying

    def _dispatch_sync(self, coro, platform):
        """Run a sync coroutine in the background from any calling context.

        If the calling thread has a running event loop, the coroutine is
        scheduled on it as a task and a reference is kept until it finishes.
        Otherwise (for example when called from the scheduler thread) it is
        executed with ``asyncio.run`` in a short-lived daemon thread.

        Args:
            coro: The coroutine object to execute.
            platform: Platform name, used for log messages and thread naming.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is not None:
            task = loop.create_task(coro)
            self._background_tasks.add(task)
            task.add_done_callback(lambda t: self._finalize_task(t, platform))
            return

        # NOTE(review): this runs the coroutine on its own event loop; assumes
        # the sync coroutine does not rely on loop-bound objects created on
        # the application's main loop - confirm against its implementation.
        def _runner():
            try:
                asyncio.run(coro)
            except Exception as e:
                logger.error(f"Failed to sync {platform}: {str(e)}")

        threading.Thread(
            target=_runner, name=f"trace-sync-{platform}", daemon=True
        ).start()

    def _finalize_task(self, task, platform):
        """Drop the reference to a finished task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error(f"Failed to sync {platform}: {str(error)}")

    def _sync_all_platforms(self):
        """Sync traces from all connected platforms.

        Errors are logged, never raised; a failure for one platform does not
        prevent the remaining platforms from being dispatched.
        """
        try:
            # Import here to avoid circular imports
            from backend.routers.observability import sync_platform_traces_background
            from backend.database import get_db
            from backend.database.models import ObservabilityConnection

            # Get connections from database instead of global variable
            db = next(get_db())
            try:
                connections = db.query(ObservabilityConnection).filter(
                    ObservabilityConnection.status == "connected"
                ).all()

                if not connections:
                    logger.info("No connected platforms to sync")
                    return

                for connection in connections:
                    platform = connection.platform
                    logger.info(f"Starting hourly sync for {platform} (connection: {connection.connection_id})")
                    try:
                        # Run sync in background with a bounded trace count
                        self._dispatch_sync(
                            sync_platform_traces_background(
                                platform, limit=self.SYNC_TRACE_LIMIT
                            ),
                            platform,
                        )
                        logger.info(f"Triggered sync for {platform}")
                    except Exception as e:
                        logger.error(f"Failed to sync {platform}: {str(e)}")
            finally:
                db.close()
        except Exception as e:
            logger.error(f"Error in scheduled sync: {str(e)}")

    def trigger_manual_sync(self, platform: Optional[str] = None):
        """Manually trigger sync for a specific platform or all platforms.

        Args:
            platform: Platform name to sync; when falsy, every connected
                platform is synced.
        """
        if platform:
            logger.info(f"Manual sync triggered for {platform}")
            self._sync_single_platform(platform)
        else:
            logger.info("Manual sync triggered for all platforms")
            self._sync_all_platforms()

    def _sync_single_platform(self, platform: str):
        """Sync traces from a specific platform.

        Logs a warning and returns when no connection with status
        "connected" exists for the platform. Errors are logged, never raised.
        """
        try:
            from backend.routers.observability import sync_platform_traces_background
            from backend.database import get_db
            from backend.database.models import ObservabilityConnection

            # Get connection from database instead of global variable
            db = next(get_db())
            try:
                connection = db.query(ObservabilityConnection).filter(
                    ObservabilityConnection.platform == platform,
                    ObservabilityConnection.status == "connected"
                ).first()

                if not connection:
                    logger.warning(f"No connection found for {platform}")
                    return

                self._dispatch_sync(
                    sync_platform_traces_background(
                        platform, limit=self.SYNC_TRACE_LIMIT
                    ),
                    platform,
                )
                logger.info(f"Manual sync triggered for {platform}")
            finally:
                db.close()
        except Exception as e:
            logger.error(f"Error in manual sync for {platform}: {str(e)}")
# Global scheduler instance, created at import time. Constructing it only
# initializes state; the background thread is not started until start() is called.
scheduler_service = SchedulerService()