| from celery import current_app |
| from celery.schedules import crontab |
| from datetime import datetime |
| import logging |
| from backend.utils.database import init_supabase |
| from backend.config import Config |
| from backend.celery_tasks.scheduler import schedule_content_generation, schedule_post_publishing |
| from backend.celery_config import celery_app |
|
|
| |
| logger = logging.getLogger(__name__) |
|
|
def get_supabase_config():
    """Return the Supabase connection settings stored on ``Config``.

    Returns:
        dict: ``'SUPABASE_URL'`` and ``'SUPABASE_KEY'`` mapped to the
        attributes of the same name on ``Config``.
    """
    url = Config.SUPABASE_URL
    key = Config.SUPABASE_KEY
    return dict(SUPABASE_URL=url, SUPABASE_KEY=key)
|
|
def parse_schedule_time(schedule_time):
    """
    Parse a "Day HH:MM" schedule string into crontab parameters.

    The day name is matched case-insensitively ("monday", "Monday" and
    "MONDAY" all map to 1). An unrecognised day name maps to '*'
    (every day) and is reported with a warning.

    Args:
        schedule_time (str): Schedule time in format "Day HH:MM",
            e.g. "Monday 09:30".

    Returns:
        dict: Crontab parameters with keys 'minute', 'hour' and
        'day_of_week'. If the string cannot be parsed at all, every
        value is '*'.
    """
    # Numbering used for day_of_week: Sunday is 0, Monday..Saturday are 1..6.
    day_map = {
        'Monday': 1,
        'Tuesday': 2,
        'Wednesday': 3,
        'Thursday': 4,
        'Friday': 5,
        'Saturday': 6,
        'Sunday': 0
    }

    try:
        print(f"[CELERY BEAT] Parsing schedule time: {schedule_time}")
        day_name, time_str = schedule_time.split()
        hour, minute = map(int, time_str.split(':'))

        # Normalise case before the lookup; otherwise "monday" would miss
        # the table and silently become '*' (every day).
        day_of_week = day_map.get(day_name.capitalize(), '*')
        if day_of_week == '*':
            logger.warning(
                f"Unknown day name {day_name!r} in schedule time "
                f"{schedule_time}; falling back to every day"
            )

        result = {
            'minute': minute,
            'hour': hour,
            'day_of_week': day_of_week
        }
        print(f"[CELERY BEAT] Parsed schedule time result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error parsing schedule time {schedule_time}: {str(e)}")

        # NOTE(review): this all-wildcard fallback is kept for backward
        # compatibility, but a caller that feeds it to crontab() gets a
        # schedule that matches every minute - confirm that is intended.
        return {
            'minute': '*',
            'hour': '*',
            'day_of_week': '*'
        }
|
|
def _make_beat_entry(task_name, when, args):
    """Return a beat_schedule entry that runs *task_name* at the parsed time *when*."""
    return {
        'task': task_name,
        'schedule': crontab(
            minute=when['minute'],
            hour=when['hour'],
            day_of_week=when['day_of_week']
        ),
        'args': args
    }


@celery_app.task(bind=True)
def load_schedules_task(self):
    """
    Rebuild the Celery beat schedule from the rows of the "Scheduling" table.

    For every row that has both ``schedule_time`` and ``adjusted_time`` two
    entries are created: ``gen_<id>`` (content generation, at
    ``adjusted_time``) and ``pub_<id>`` (publishing, at ``schedule_time``).
    An existing ``load-schedules`` entry is carried over. Rows that fail are
    logged and skipped; they do not abort the whole run.

    The original docstring states this task runs every 5 minutes; the
    interval itself is configured outside this function.

    Returns:
        dict: ``{'status': 'success', 'message': ..., 'schedules_count': ...}``
        on success, or ``{'status': 'error', 'message': ...}`` if loading
        failed as a whole.
    """
    try:
        print(f"[CELERY BEAT] Loading schedules from database at {datetime.now()}...")
        logger.info("Loading schedules from database...")

        supabase_config = get_supabase_config()
        print(f"[CELERY BEAT] Supabase config: URL={supabase_config['SUPABASE_URL'][:50]}...")

        supabase_client = init_supabase(
            supabase_config['SUPABASE_URL'],
            supabase_config['SUPABASE_KEY']
        )

        print("[CELERY BEAT] Executing database query...")
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )

        # The rows include the Social_network token column, so the raw
        # response and rows are deliberately not printed.
        print(f"[CELERY BEAT] Database query response: {type(response)}")

        schedules = response.data if response.data else []
        print(f"[CELERY BEAT] Found {len(schedules)} schedules in database")
        logger.info(f"Found {len(schedules)} schedules")

        # Carry the loader's own entry over, but only if it exists: an
        # empty placeholder dict is not a usable beat entry.
        loader_job = celery_app.conf.beat_schedule.get('load-schedules')
        new_schedule = {'load-schedules': loader_job} if loader_job else {}

        for schedule in schedules:
            schedule_id = schedule.get('id')
            try:
                schedule_time = schedule.get('schedule_time')
                adjusted_time = schedule.get('adjusted_time')

                print(f"[CELERY BEAT] Processing schedule {schedule_id}: schedule_time={schedule_time}, adjusted_time={adjusted_time}")

                if not schedule_time or not adjusted_time:
                    logger.warning(f"Invalid schedule format for schedule {schedule_id}")
                    print(f"[CELERY BEAT] WARNING: Invalid schedule format for schedule {schedule_id}")
                    continue

                content_gen_time = parse_schedule_time(adjusted_time)
                publish_time = parse_schedule_time(schedule_time)

                print(f"[CELERY BEAT] Parsed times - Content gen: {content_gen_time}, Publish: {publish_time}")

                # Content generation job, scheduled at the adjusted time.
                gen_job_id = f"gen_{schedule_id}"
                user_id = schedule.get('Social_network', {}).get('id_utilisateur')
                print(f"[CELERY BEAT] Creating content task - ID: {gen_job_id}")
                print(f"[CELERY BEAT] Content task schedule: minute={content_gen_time['minute']}, hour={content_gen_time['hour']}, day_of_week={content_gen_time['day_of_week']}")
                # supabase_config holds the API key, so it is not echoed.
                print(f"[CELERY BEAT] Content task args: user_id={user_id}, schedule_id={schedule_id}, supabase_config=<redacted>")
                new_schedule[gen_job_id] = _make_beat_entry(
                    'backend.celery_tasks.content_tasks.generate_content_task',
                    content_gen_time,
                    (user_id, schedule_id, supabase_config)
                )
                logger.info(f"Created content generation job: {gen_job_id}")
                print(f"[CELERY BEAT] Created content generation job: {gen_job_id}")

                # Publishing job, scheduled at the requested time.
                pub_job_id = f"pub_{schedule_id}"
                print(f"[CELERY BEAT] Creating publish task - ID: {pub_job_id}")
                print(f"[CELERY BEAT] Publish task schedule: minute={publish_time['minute']}, hour={publish_time['hour']}, day_of_week={publish_time['day_of_week']}")
                print(f"[CELERY BEAT] Publish task args: schedule_id={schedule_id}, supabase_config=<redacted>")
                new_schedule[pub_job_id] = _make_beat_entry(
                    'backend.celery_tasks.content_tasks.publish_post_task',
                    publish_time,
                    (schedule_id, supabase_config)
                )
                logger.info(f"Created publishing job: {pub_job_id}")
                print(f"[CELERY BEAT] Created publishing job: {pub_job_id}")

            except Exception as e:
                # One bad row must not prevent the other schedules from loading.
                logger.exception(f"Error creating jobs for schedule {schedule_id}: {str(e)}")

        # NOTE(review): the loader entry is read from celery_app but the new
        # schedule is written through current_app, as in the original code -
        # confirm both always refer to the same app. Also confirm that the
        # running beat scheduler actually picks up this in-process
        # assignment in your deployment.
        print(f"[CELERY BEAT] Current schedule keys before update: {list(current_app.conf.beat_schedule.keys())}")
        print(f"[CELERY BEAT] New schedule keys: {list(new_schedule.keys())}")
        current_app.conf.beat_schedule = new_schedule
        print(f"[CELERY BEAT] Successfully updated Celery Beat schedule with {len(new_schedule)} jobs")
        logger.info("Updated Celery Beat schedule")

        return {
            'status': 'success',
            'message': f'Loaded {len(schedules)} schedules',
            'schedules_count': len(schedules)
        }

    except Exception as e:
        logger.exception(f"Error loading schedules: {str(e)}")
        return {
            'status': 'error',
            'message': f'Error loading schedules: {str(e)}'
        }