| """APS Scheduler service for the Lin application.""" |
|
|
| import logging |
| from datetime import datetime, timedelta |
| from apscheduler.schedulers.background import BackgroundScheduler |
| from apscheduler.triggers.cron import CronTrigger |
| from apscheduler.jobstores.memory import MemoryJobStore |
| from apscheduler.executors.pool import ThreadPoolExecutor |
| from backend.services.content_service import ContentService |
| from backend.services.linkedin_service import LinkedInService |
| from backend.utils.database import init_supabase |
| from backend.utils.image_utils import ensure_bytes_format |
| from backend.config import Config |
| from backend.utils.timezone_utils import ( |
| parse_timezone_schedule, |
| get_server_timezone, |
| convert_time_to_timezone, |
| validate_timezone |
| ) |
|
|
| |
| logger = logging.getLogger(__name__) |
|
|
class APSchedulerService:
    """Service wrapper around APScheduler for the Lin application.

    Periodically reloads posting schedules from Supabase and registers two
    cron jobs per schedule: one that generates post content ahead of the
    publish time, and one that publishes the newest unpublished post.
    """

    def __init__(self, app=None):
        """Create the service; scheduler setup is deferred to init_app().

        Args:
            app: Optional Flask application. When provided, ``init_app``
                is called immediately (standard Flask extension pattern).
        """
        self.app = app
        self.scheduler = None
        self.supabase_client = None

        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Initialize the scheduler with the Flask app.

        Builds the Supabase client, starts a background scheduler with an
        in-memory job store, exposes this service as ``app.scheduler``, and
        registers a recurring job that refreshes schedules from the database
        every 5 minutes. Errors are logged rather than raised so a scheduler
        failure does not prevent the application from starting.

        Args:
            app: The Flask application whose config supplies
                ``SUPABASE_URL`` and ``SUPABASE_KEY``.
        """
        try:
            self.app = app
            logger.info("APScheduler starting...")

            self.supabase_client = init_supabase(
                app.config['SUPABASE_URL'],
                app.config['SUPABASE_KEY']
            )

            jobstores = {
                'default': MemoryJobStore()
            }
            executors = {
                'default': ThreadPoolExecutor(20),
            }
            job_defaults = {
                'coalesce': False,   # run each missed occurrence separately
                'max_instances': 3   # allow limited overlap of the same job
            }

            # NOTE(review): the scheduler runs in UTC while schedule times
            # below are converted to the *server* timezone -- confirm these
            # agree in deployment, otherwise jobs fire at shifted times.
            self.scheduler = BackgroundScheduler(
                jobstores=jobstores,
                executors=executors,
                job_defaults=job_defaults,
                timezone='UTC'
            )

            # Expose the service on the app so request handlers can reach it.
            app.scheduler = self

            self.scheduler.start()
            logger.info("APScheduler started successfully")

            # Refresh the per-schedule job list from the database every 5 min.
            self.scheduler.add_job(
                func=self.load_schedules,
                trigger=CronTrigger(minute='*/5'),
                id='load_schedules',
                name='Load schedules from database',
                replace_existing=True
            )

            # Load immediately so jobs exist before the first cron tick.
            self.load_schedules()

        except Exception as e:
            logger.error("APScheduler initialization failed: %s", e)
            import traceback
            logger.error(traceback.format_exc())

    def load_schedules(self):
        """Load schedules from the database and (re)create their jobs.

        Removes every job except the recurring ``load_schedules`` job, then
        registers a content-generation job and a publishing job for each row
        of the ``Scheduling`` table. A failure on one schedule is logged and
        does not block the remaining schedules.
        """
        try:
            with self.app.app_context():
                if not self.supabase_client:
                    logger.error("Supabase client not initialized")
                    return

                response = (
                    self.supabase_client
                    .table("Scheduling")
                    .select("*, Social_network(id_utilisateur, token, sub)")
                    .execute()
                )
                schedules = response.data if response.data else []
                logger.info("Found %d schedules in database", len(schedules))

                self._remove_schedule_jobs()

                for schedule in schedules:
                    try:
                        self._register_schedule_jobs(schedule)
                    except Exception as e:
                        logger.error(
                            "Error creating jobs for schedule %s: %s",
                            schedule.get('id'), e
                        )

        except Exception as e:
            logger.error("Error loading schedules: %s", e)

    def _remove_schedule_jobs(self):
        """Remove all per-schedule jobs, keeping the refresh job itself."""
        stale_ids = [
            job.id for job in self.scheduler.get_jobs()
            if job.id != 'load_schedules'
        ]
        for job_id in stale_ids:
            try:
                self.scheduler.remove_job(job_id)
            except Exception as e:
                logger.warning("Failed to remove job %s: %s", job_id, e)

    def _register_schedule_jobs(self, schedule):
        """Create the generation and publishing cron jobs for one schedule row.

        Args:
            schedule (dict): A ``Scheduling`` row joined with its
                ``Social_network`` record.
        """
        schedule_id = schedule.get('id')
        schedule_time = schedule.get('schedule_time')
        adjusted_time = schedule.get('adjusted_time')

        if not schedule_time or not adjusted_time:
            logger.warning("Invalid schedule format for schedule %s", schedule_id)
            return

        # Convert both stored times from their declared timezone to the
        # server's timezone before building cron triggers.
        server_timezone = get_server_timezone()
        schedule_time_part, schedule_timezone = parse_timezone_schedule(schedule_time)
        adjusted_time_part, adjusted_timezone = parse_timezone_schedule(adjusted_time)

        if schedule_timezone and validate_timezone(schedule_timezone):
            server_schedule_time = convert_time_to_timezone(
                schedule_time_part, schedule_timezone, server_timezone
            )
            server_adjusted_time = convert_time_to_timezone(
                adjusted_time_part, adjusted_timezone or schedule_timezone, server_timezone
            )
        else:
            # No (valid) timezone stored: assume times are already server-local.
            server_schedule_time = schedule_time_part
            server_adjusted_time = adjusted_time_part

        content_gen_cron = self._parse_schedule_time(server_adjusted_time)
        publish_cron = self._parse_schedule_time(server_schedule_time)

        # Content generation fires at the adjusted (earlier) time...
        self.scheduler.add_job(
            func=self.generate_content_task,
            trigger=CronTrigger(
                minute=content_gen_cron['minute'],
                hour=content_gen_cron['hour'],
                day_of_week=content_gen_cron['day_of_week']
            ),
            id=f"gen_{schedule_id}",
            name=f"Content generation for schedule {schedule_id}",
            args=[schedule.get('Social_network', {}).get('id_utilisateur'), schedule_id],
            replace_existing=True
        )

        # ...and publishing fires at the requested schedule time.
        self.scheduler.add_job(
            func=self.publish_post_task,
            trigger=CronTrigger(
                minute=publish_cron['minute'],
                hour=publish_cron['hour'],
                day_of_week=publish_cron['day_of_week']
            ),
            id=f"pub_{schedule_id}",
            name=f"Post publishing for schedule {schedule_id}",
            args=[schedule_id],
            replace_existing=True
        )

        logger.info("Created schedule jobs for %s", schedule_id)

    def _parse_schedule_time(self, schedule_time):
        """
        Parse a schedule time string into cron trigger parameters.

        Args:
            schedule_time (str): Schedule time in format "Day HH:MM",
                e.g. ``"Monday 09:30"``.

        Returns:
            dict: Keys ``minute``, ``hour``, ``day_of_week`` suitable for
                CronTrigger. Unknown day names map to ``'*'``; a completely
                unparseable string yields wildcards for all three fields.
        """
        try:
            day_name, time_str = schedule_time.split()
            hour, minute = map(int, time_str.split(':'))

            # APScheduler cron convention: Monday == 0.
            day_map = {
                'Monday': 0,
                'Tuesday': 1,
                'Wednesday': 2,
                'Thursday': 3,
                'Friday': 4,
                'Saturday': 5,
                'Sunday': 6
            }

            return {
                'minute': minute,
                'hour': hour,
                'day_of_week': day_map.get(day_name, '*')
            }
        except Exception as e:
            logger.error("Error parsing schedule time %s: %s", schedule_time, e)
            # Fall back to wildcards rather than failing job registration.
            return {
                'minute': '*',
                'hour': '*',
                'day_of_week': '*'
            }

    def generate_content_task(self, user_id: str, schedule_id: str):
        """
        APScheduler task to generate content for a scheduled post.

        Generates text (and optionally an image) via ContentService, then
        stores it as an unpublished row in ``Post_content``. All failures
        are logged; nothing is raised to the scheduler.

        Args:
            user_id (str): User ID to generate content for.
            schedule_id (str): Schedule ID the post belongs to.
        """
        try:
            logger.info("Generating content for schedule %s", schedule_id)

            with self.app.app_context():
                content_service = ContentService()
                generated_result = content_service.generate_post_content(user_id)

                text_content, image_data = self._normalize_generated_content(
                    generated_result
                )

                # Convert the image to raw bytes; a bad image is dropped
                # rather than failing the whole task.
                processed_image_data = None
                if image_data is not None:
                    try:
                        processed_image_data = ensure_bytes_format(image_data)
                        logger.info("Image data processed for schedule %s", schedule_id)
                    except Exception as e:
                        logger.error(
                            "Error processing image data for schedule %s: %s",
                            schedule_id, e
                        )
                        processed_image_data = None

                # Resolve the social account that owns this schedule.
                schedule_response = (
                    self.supabase_client
                    .table("Scheduling")
                    .select("id_social")
                    .eq("id", schedule_id)
                    .execute()
                )
                if not schedule_response.data:
                    raise Exception(f"Schedule {schedule_id} not found")
                social_account_id = schedule_response.data[0]['id_social']

                post_data = {
                    "id_social": social_account_id,
                    "Text_content": text_content,
                    "is_published": False,
                    "sched": schedule_id
                }
                if processed_image_data is not None:
                    post_data["image_content_url"] = processed_image_data

                response = (
                    self.supabase_client
                    .table("Post_content")
                    .insert(post_data)
                    .execute()
                )
                if response.data:
                    logger.info(
                        "Content generated and stored for schedule %s", schedule_id
                    )
                else:
                    logger.error(
                        "Failed to store generated content for schedule %s", schedule_id
                    )

        except Exception as e:
            logger.error(
                "Error in content generation task for schedule %s: %s", schedule_id, e
            )

    @staticmethod
    def _normalize_generated_content(generated_result):
        """Coerce ContentService output into a ``(text, image_or_None)`` pair.

        Accepts a ``(text, image)`` tuple/list, a bare value, or ``None``;
        the text part is always returned as a string, falling back to a
        placeholder when nothing usable was generated.
        """
        placeholder = "Generated content will appear here..."

        if isinstance(generated_result, (tuple, list)) and len(generated_result) >= 1:
            text_content = (
                generated_result[0] if generated_result[0] is not None else placeholder
            )
            image_data = (
                generated_result[1]
                if len(generated_result) >= 2 and generated_result[1] is not None
                else None
            )
            if not isinstance(text_content, str):
                text_content = str(text_content) if text_content is not None else placeholder
        else:
            text_content = str(generated_result) if generated_result is not None else placeholder
            image_data = None

        # Defensive: flatten any nested sequence into its string form.
        if isinstance(text_content, (list, tuple)):
            text_content = str(text_content)

        return text_content, image_data

    def publish_post_task(self, schedule_id: str):
        """
        APScheduler task to publish a scheduled post.

        Finds the newest unpublished ``Post_content`` row for the schedule,
        fetches LinkedIn credentials from the joined ``Social_network`` row,
        publishes via LinkedInService, and marks the post as published.

        Args:
            schedule_id (str): Schedule ID whose post should be published.
        """
        try:
            logger.info("Publishing post for schedule %s", schedule_id)

            with self.app.app_context():
                response = (
                    self.supabase_client
                    .table("Post_content")
                    .select("*")
                    .eq("sched", schedule_id)
                    .eq("is_published", False)
                    .order("created_at", desc=True)
                    .limit(1)
                    .execute()
                )
                if not response.data:
                    logger.info("No unpublished posts found for schedule %s", schedule_id)
                    return

                post = response.data[0]
                post_id = post.get('id')
                text_content = post.get('Text_content')
                image_url = post.get('image_content_url')

                # Guard against a null Text_content so logging cannot crash the task.
                logger.info(
                    "Post content to be published: %s...", (text_content or '')[:100]
                )
                logger.info("Image URL: %s", image_url)

                schedule_response = (
                    self.supabase_client
                    .table("Scheduling")
                    .select("Social_network(token, sub)")
                    .eq("id", schedule_id)
                    .execute()
                )
                if not schedule_response.data:
                    logger.error("Schedule %s not found in database", schedule_id)
                    return

                social_network = schedule_response.data[0].get('Social_network', {})
                access_token = social_network.get('token')
                user_sub = social_network.get('sub')
                if not access_token or not user_sub:
                    logger.error(
                        "Missing social network credentials for schedule %s", schedule_id
                    )
                    return

                logger.info("Access token exists: %s", bool(access_token))
                logger.info("User sub exists: %s", bool(user_sub))

                # LinkedInService is imported at module level; no local import needed.
                linkedin_service = LinkedInService()
                logger.info("LinkedIn service initialized successfully")

                linkedin_service.publish_post(
                    access_token, user_sub, text_content, image_url
                )
                logger.info("LinkedIn API response received for schedule %s", schedule_id)

                # Mark the post as published so it is not sent again.
                (
                    self.supabase_client
                    .table("Post_content")
                    .update({"is_published": True})
                    .eq("id", post_id)
                    .execute()
                )
                logger.info(
                    "Post status updated to published for schedule %s", schedule_id
                )

        except Exception as e:
            logger.error(
                "Error in publishing task for schedule %s: %s", schedule_id, e
            )
            logger.error("Full error traceback:", exc_info=True)

    def trigger_immediate_update(self):
        """Reload schedules now.

        Returns:
            bool: True on success, False if the reload raised.
        """
        try:
            logger.info("Triggering immediate schedule update...")
            self.load_schedules()
            return True
        except Exception as e:
            logger.error("Error triggering immediate schedule update: %s", e)
            return False

    def shutdown(self):
        """Shut down the scheduler if it was started."""
        if self.scheduler:
            self.scheduler.shutdown()
            logger.info("APS Scheduler shutdown")