# tc-agent / services /scheduler.py
# togitoon's picture
# Initial
# bf5f290
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from database.supabase_manager import SupabaseManager
from services.challenge_agent import ChallengeAgentService
from config.settings import settings
import logging
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class DailyChallengeScheduler:
    """Schedule and run the daily challenge-notification job.

    Owns an ``AsyncIOScheduler`` that triggers
    :meth:`run_daily_notifications` on the five-field cron expression read
    from ``settings.DAILY_NOTIFICATION_CRON`` (evaluated in UTC).
    """

    # Fallback schedule used when the configured expression is unusable.
    DEFAULT_CRON = "0 6 * * *"  # 6:00 AM daily
    # Pause (seconds) after each successfully processed user, to avoid
    # rate limiting. Override on a subclass or instance to tune it.
    USER_DELAY_SECONDS = 2

    def __init__(self):
        """Create the scheduler and the collaborating services."""
        self.scheduler = AsyncIOScheduler()
        self.supabase_manager = SupabaseManager()
        self.challenge_agent = ChallengeAgentService()

    async def run_daily_notifications(self) -> None:
        """Run daily challenge notifications for all active users.

        Users are processed one at a time. A failure for one user (including
        a failure while recording that failure) never prevents the remaining
        users from being processed. Unexpected job-level errors are logged
        and swallowed so the scheduler keeps running.
        """
        try:
            logger.info("Starting daily challenge notifications...")

            active_users = await self.supabase_manager.get_active_users()
            logger.info("Found %s active users", len(active_users))

            for user in active_users:
                await self._process_user(user)

            logger.info("Daily notifications completed")
        except Exception as e:
            # Top-level boundary of the scheduled job: keep the traceback.
            logger.exception("Error in daily notifications job: %s", e)

    async def _process_user(self, user) -> None:
        """Run the challenge agent for one user and record the outcome.

        Reads ``"id"``, ``"email"`` and ``"preferences_text"`` from ``user``
        and forwards the agent result (``"challenges_found"``,
        ``"notifications_sent"``, ``"status"`` and optional
        ``"error_message"``) to ``supabase_manager.log_agent_run``.
        Never raises ``Exception``; failures are handed to
        :meth:`_record_failure`.
        """
        try:
            user_id = user["id"]
            email = user["email"]
            preferences = user["preferences_text"]

            logger.info("Processing user: %s", email)

            result = await self.challenge_agent.run_for_user(
                user_id=user_id,
                email=email,
                preferences=preferences,
            )

            await self.supabase_manager.log_agent_run(
                user_id=user_id,
                challenges_found=result["challenges_found"],
                notifications_sent=result["notifications_sent"],
                status=result["status"],
                error_message=result.get("error_message"),
            )

            logger.info("User %s: processing completed", email)

            # Small delay between users to avoid rate limiting.
            await asyncio.sleep(self.USER_DELAY_SECONDS)
        except Exception as user_error:
            await self._record_failure(user, user_error)

    async def _record_failure(self, user, error) -> None:
        """Best-effort logging of a failed run for ``user``; never raises.

        Without this guard, an error while writing the "failed" record would
        escape the per-user handler and abort the whole batch.
        """
        try:
            logger.error(
                "Error processing user %s: %s",
                user.get("email", "unknown"),
                error,
            )
            await self.supabase_manager.log_agent_run(
                user_id=user.get("id", ""),
                challenges_found=0,
                notifications_sent=0,
                status="failed",
                error_message=str(error),
            )
        except Exception as log_error:
            logger.error(
                "Could not record failed agent run (original error: %s): %s",
                error,
                log_error,
            )

    @classmethod
    def _resolve_cron_fields(cls, expression) -> list:
        """Split ``expression`` into its five cron fields.

        Returns ``[minute, hour, day, month, day_of_week]``. Falls back to
        ``DEFAULT_CRON`` (and logs an error) when ``expression`` is not a
        string or does not contain exactly five whitespace-separated fields.
        """
        fields = expression.split() if isinstance(expression, str) else []
        if len(fields) != 5:
            logger.error(
                "Invalid cron expression: %s. Using default.", expression
            )
            fields = cls.DEFAULT_CRON.split()
        return fields

    def start_scheduler(self) -> None:
        """Start the scheduler and register the daily notification job.

        Safe to call more than once: the scheduler is only started when it
        is not already running, and the job is re-registered under the same
        id with ``replace_existing=True``. Errors are logged, not raised.
        """
        try:
            # Start the scheduler first (skip if a previous call did so).
            if not self.scheduler.running:
                self.scheduler.start()

            cron_fields = self._resolve_cron_fields(
                settings.DAILY_NOTIFICATION_CRON
            )
            minute, hour, day, month, day_of_week = cron_fields

            self.scheduler.add_job(
                self.run_daily_notifications,
                CronTrigger(
                    minute=minute,
                    hour=hour,
                    day=day,
                    month=month,
                    day_of_week=day_of_week,
                    timezone="UTC",
                ),
                id='daily_challenge_notifications',
                name='Daily Challenge Notifications',
                replace_existing=True,
            )

            # Report the schedule actually in effect, which is the default
            # when the configured expression was rejected.
            logger.info(
                "Daily notification job added - cron schedule: %s UTC",
                " ".join(cron_fields),
            )
        except Exception as e:
            logger.exception("Error starting scheduler: %s", e)

    def stop_scheduler(self) -> None:
        """Stop the scheduler if it is currently running."""
        if self.scheduler.running:
            self.scheduler.shutdown()
            logger.info("Scheduler stopped")

    async def run_now_for_testing(self) -> None:
        """Run notifications immediately for testing purposes."""
        logger.info("Running notifications now for testing...")
        await self.run_daily_notifications()