| import asyncio |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler |
| from apscheduler.triggers.cron import CronTrigger |
| from database.supabase_manager import SupabaseManager |
| from services.challenge_agent import ChallengeAgentService |
| from config.settings import settings |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class DailyChallengeScheduler: |
| def __init__(self): |
| self.scheduler = AsyncIOScheduler() |
| self.supabase_manager = SupabaseManager() |
| self.challenge_agent = ChallengeAgentService() |
| |
| async def run_daily_notifications(self): |
| """Run daily challenge notifications for all active users""" |
| try: |
| logger.info("Starting daily challenge notifications...") |
| |
| |
| active_users = await self.supabase_manager.get_active_users() |
| logger.info(f"Found {len(active_users)} active users") |
| |
| for user in active_users: |
| try: |
| user_id = user["id"] |
| email = user["email"] |
| preferences = user["preferences_text"] |
| |
| logger.info(f"Processing user: {email}") |
| |
| |
| result = await self.challenge_agent.run_for_user( |
| user_id=user_id, |
| email=email, |
| preferences=preferences |
| ) |
| |
| |
| await self.supabase_manager.log_agent_run( |
| user_id=user_id, |
| challenges_found=result["challenges_found"], |
| notifications_sent=result["notifications_sent"], |
| status=result["status"], |
| error_message=result.get("error_message") |
| ) |
| |
| logger.info(f"User {email}: processing completed") |
| |
| |
| await asyncio.sleep(2) |
| |
| except Exception as user_error: |
| logger.error(f"Error processing user {user.get('email', 'unknown')}: {user_error}") |
| |
| |
| await self.supabase_manager.log_agent_run( |
| user_id=user.get("id", ""), |
| challenges_found=0, |
| notifications_sent=0, |
| status="failed", |
| error_message=str(user_error) |
| ) |
| |
| logger.info("Daily notifications completed") |
| |
| except Exception as e: |
| logger.error(f"Error in daily notifications job: {e}") |
| |
| def start_scheduler(self): |
| """Start the daily scheduler""" |
| try: |
| |
| self.scheduler.start() |
| |
| |
| cron_parts = settings.DAILY_NOTIFICATION_CRON.split() |
| if len(cron_parts) != 5: |
| logger.error(f"Invalid cron expression: {settings.DAILY_NOTIFICATION_CRON}. Using default.") |
| cron_parts = ["0", "6", "*", "*", "*"] |
| |
| minute, hour, day, month, day_of_week = cron_parts |
| |
| self.scheduler.add_job( |
| self.run_daily_notifications, |
| CronTrigger( |
| minute=minute, |
| hour=hour, |
| day=day, |
| month=month, |
| day_of_week=day_of_week, |
| timezone="UTC" |
| ), |
| id='daily_challenge_notifications', |
| name='Daily Challenge Notifications', |
| replace_existing=True |
| ) |
| |
| logger.info(f"Daily notification job added - cron schedule: {settings.DAILY_NOTIFICATION_CRON} UTC") |
| |
| except Exception as e: |
| logger.error(f"Error starting scheduler: {e}") |
| |
| def stop_scheduler(self): |
| """Stop the scheduler""" |
| if self.scheduler.running: |
| self.scheduler.shutdown() |
| logger.info("Scheduler stopped") |
| |
| async def run_now_for_testing(self): |
| """Run notifications immediately for testing purposes""" |
| logger.info("Running notifications now for testing...") |
| await self.run_daily_notifications() |
|
|