| from apscheduler.schedulers.background import BackgroundScheduler |
| from apscheduler.triggers.cron import CronTrigger |
| from datetime import datetime, timedelta |
| import logging |
| from services.content_service import ContentService |
| from services.linkedin_service import LinkedInService |
|
|
| |
# Module-level logging setup: configure the root logger at INFO level and
# create this module's named logger, which the scheduler functions in this
# file use for all progress and error reporting.
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
def init_scheduler(scheduler: BackgroundScheduler, supabase_client):
    """
    Initialize the task scheduler with jobs.

    Registers a recurring job that re-reads the schedules from the database
    every five minutes, then runs one load immediately so the per-schedule
    jobs exist without waiting for the first tick.

    Args:
        scheduler (BackgroundScheduler): The scheduler instance
        supabase_client: Supabase client instance
    """
    # The job uses a fixed id, so replace_existing keeps initialization
    # idempotent: a second call (or a persistent job store that already
    # holds the job) replaces it instead of raising ConflictingIdError.
    scheduler.add_job(
        func=load_schedules,
        trigger=CronTrigger(minute='*/5'),
        id='load_schedules',
        name='Load schedules from database',
        args=[scheduler, supabase_client],
        replace_existing=True,
    )

    # Initial load so jobs are in place right away.
    load_schedules(scheduler, supabase_client)
|
|
def load_schedules(scheduler: BackgroundScheduler, supabase_client):
    """
    Load schedules from the database and create jobs.

    Fetches every row of the "Scheduling" table together with the joined
    Social_network fields, removes every job currently on the scheduler
    except the recurring 'load_schedules' job, and recreates the
    per-schedule jobs from the fresh data.

    Errors are logged and never propagated. If the database query fails,
    the existing jobs are left untouched.

    Args:
        scheduler (BackgroundScheduler): The scheduler instance
        supabase_client: Supabase client instance
    """
    try:
        logger.info("Loading schedules from database...")

        # Query first: a failure here must not wipe the current jobs.
        response = (
            supabase_client
            .table("Scheduling")
            .select("*, Social_network(id_utilisateur, token, sub)")
            .execute()
        )

        schedules = response.data or []
        logger.info(f"Found {len(schedules)} schedules")

        # Drop all jobs except the loader itself. Each removal is isolated
        # so that one failing removal (for example a job that disappeared
        # in the meantime) cannot abort the refresh and leave the
        # scheduler without its per-schedule jobs.
        for job in scheduler.get_jobs():
            job_id = job.id
            if job_id == 'load_schedules':
                continue
            try:
                scheduler.remove_job(job_id)
            except Exception as e:
                logger.exception(f"Error removing job {job_id}: {e}")
                continue
            logger.info(f"Removed job: {job_id}")

        # Recreate jobs; one bad schedule must not block the others.
        for schedule in schedules:
            try:
                create_scheduling_jobs(scheduler, schedule, supabase_client)
            except Exception as e:
                logger.exception(
                    f"Error creating jobs for schedule {schedule.get('id')}: {e}"
                )

    except Exception as e:
        # Top-level boundary of a scheduled job: log with traceback and
        # swallow so the next five-minute tick can retry.
        logger.exception(f"Error loading schedules: {e}")
|
|
# Lower-cased English weekday name -> cron day_of_week abbreviation.
_CRON_DAY_BY_NAME = {
    'monday': 'mon',
    'tuesday': 'tue',
    'wednesday': 'wed',
    'thursday': 'thu',
    'friday': 'fri',
    'saturday': 'sat',
    'sunday': 'sun',
}


def _parse_day_time(value: str):
    """
    Parse a "<Day> <hour>:<minute>" string into (cron_day, hour, minute).

    cron_day is None when the day name is not a known English weekday
    (matching is case-insensitive).

    Raises:
        ValueError: If the string does not consist of exactly two
            whitespace-separated parts, or the time part is not exactly
            two ':'-separated integers.
    """
    day_name, time_str = value.split()
    hour, minute = map(int, time_str.split(':'))
    return _CRON_DAY_BY_NAME.get(day_name.lower()), hour, minute


def create_scheduling_jobs(scheduler: BackgroundScheduler, schedule: dict, supabase_client):
    """
    Create jobs for a specific schedule.

    Two weekly cron jobs are registered per schedule:
    "gen_<id>" runs generate_content_job at the schedule's 'adjusted_time',
    and "pub_<id>" runs publish_post_job at its 'schedule_time'. Both
    fields are read as "<Day> <hour>:<minute>" strings.

    Invalid or missing data is logged and the schedule is skipped; no
    exception is propagated to the caller.

    Args:
        scheduler (BackgroundScheduler): The scheduler instance
        schedule (dict): Schedule data
        supabase_client: Supabase client instance
    """
    schedule_id = schedule.get('id')
    schedule_time = schedule.get('schedule_time')
    adjusted_time = schedule.get('adjusted_time')

    if not schedule_time or not adjusted_time:
        logger.warning(f"Invalid schedule format for schedule {schedule_id}")
        return

    try:
        day_cron, hour, minute = _parse_day_time(schedule_time)
        adj_day_cron, adj_hour, adj_minute = _parse_day_time(adjusted_time)

        if day_cron is None or adj_day_cron is None:
            logger.warning(f"Invalid day name in schedule {schedule_id}")
            return

        # Build both triggers before registering anything: if either time
        # is out of range the trigger constructor raises, and we must not
        # be left with a generation job that has no matching publish job.
        gen_trigger = CronTrigger(
            day_of_week=adj_day_cron,
            hour=adj_hour,
            minute=adj_minute,
        )
        pub_trigger = CronTrigger(
            day_of_week=day_cron,
            hour=hour,
            minute=minute,
        )

        # replace_existing keeps this idempotent if a job with the same id
        # is still registered.
        gen_job_id = f"gen_{schedule_id}"
        scheduler.add_job(
            func=generate_content_job,
            trigger=gen_trigger,
            id=gen_job_id,
            name=f"Generate content for schedule {schedule_id}",
            args=[schedule, supabase_client],
            replace_existing=True,
        )
        logger.info(f"Created content generation job: {gen_job_id}")

        pub_job_id = f"pub_{schedule_id}"
        scheduler.add_job(
            func=publish_post_job,
            trigger=pub_trigger,
            id=pub_job_id,
            name=f"Publish post for schedule {schedule_id}",
            args=[schedule, supabase_client],
            replace_existing=True,
        )
        logger.info(f"Created publishing job: {pub_job_id}")

    except Exception as e:
        # Malformed time strings and scheduler errors end up here; log
        # with traceback and skip this schedule.
        logger.exception(f"Error creating jobs for schedule {schedule_id}: {e}")
|
|
def generate_content_job(schedule: dict, supabase_client):
    """
    Job to generate content for a scheduled post.

    Generates post text for the schedule's user through ContentService and
    inserts it into the "Post_content" table as an unpublished row linked
    to the schedule. All errors are logged, never raised.

    Args:
        schedule (dict): Schedule data
        supabase_client: Supabase client instance
    """
    try:
        schedule_id = schedule.get('id')

        # `or {}` also covers the key being present with a null value,
        # which a plain .get(key, {}) default does not.
        social_network = schedule.get('Social_network') or {}
        user_id = social_network.get('id_utilisateur')

        if not user_id:
            logger.warning(f"No user ID found for schedule {schedule_id}")
            return

        logger.info(f"Generating content for schedule {schedule_id}")

        content_service = ContentService()
        generated_content = content_service.generate_post_content(user_id)

        # Do not store an empty post: it would later be picked up and
        # published as-is by the publishing job.
        if not generated_content:
            logger.error(f"No content generated for schedule {schedule_id}")
            return

        response = (
            supabase_client
            .table("Post_content")
            .insert({
                "social_account_id": schedule.get('id_social'),
                "Text_content": generated_content,
                "is_published": False,
                "sched": schedule_id,
            })
            .execute()
        )

        if response.data:
            logger.info(f"Content generated and stored for schedule {schedule_id}")
        else:
            logger.error(f"Failed to store generated content for schedule {schedule_id}")

    except Exception as e:
        # Top-level boundary of a scheduled job: log with traceback.
        logger.exception(
            f"Error in content generation job for schedule {schedule.get('id')}: {e}"
        )
|
|
def publish_post_job(schedule: dict, supabase_client):
    """
    Job to publish a scheduled post.

    Looks up the most recently created unpublished "Post_content" row for
    the schedule, publishes it through LinkedInService using the
    credentials attached to the schedule, and then flags the row as
    published. All errors are logged, never raised.

    Args:
        schedule (dict): Schedule data
        supabase_client: Supabase client instance
    """
    try:
        schedule_id = schedule.get('id')
        logger.info(f"Publishing post for schedule {schedule_id}")

        # Newest unpublished post for this schedule.
        response = (
            supabase_client
            .table("Post_content")
            .select("*")
            .eq("sched", schedule_id)
            .eq("is_published", False)
            .order("created_at", desc=True)
            .limit(1)
            .execute()
        )

        if not response.data:
            logger.info(f"No unpublished posts found for schedule {schedule_id}")
            return

        post = response.data[0]
        post_id = post.get('id')
        text_content = post.get('Text_content')
        image_url = post.get('image_content_url')

        # `or {}` also covers the key being present with a null value,
        # which a plain .get(key, {}) default does not.
        social_network = schedule.get('Social_network') or {}
        access_token = social_network.get('token')
        user_sub = social_network.get('sub')

        if not access_token or not user_sub:
            logger.error(f"Missing social network credentials for schedule {schedule_id}")
            return

        # NOTE(review): the return value of publish_post is not inspected;
        # presumably it raises on failure - confirm against LinkedInService.
        linkedin_service = LinkedInService()
        linkedin_service.publish_post(
            access_token, user_sub, text_content, image_url
        )

        update_response = (
            supabase_client
            .table("Post_content")
            .update({"is_published": True})
            .eq("id", post_id)
            .execute()
        )

        # If the flag was not persisted, the same post will be selected
        # again on the next run, so make that failure visible.
        if update_response.data:
            logger.info(f"Post published successfully for schedule {schedule_id}")
        else:
            logger.error(
                f"Post published for schedule {schedule_id} but post {post_id} "
                f"could not be marked as published"
            )

    except Exception as e:
        # Top-level boundary of a scheduled job: log with traceback.
        logger.exception(
            f"Error in publishing job for schedule {schedule.get('id')}: {e}"
        )