| from celery import current_task |
| from backend.services.content_service import ContentService |
| from backend.services.linkedin_service import LinkedInService |
| from backend.utils.database import init_supabase |
| from backend.celery_config import celery_app |
|
|
| |
| import logging |
| logger = logging.getLogger(__name__) |
|
|
| @celery_app.task(bind=True) |
| def generate_content_task(self, user_id: str, schedule_id: str, supabase_client_config: dict): |
| """ |
| Celery task to generate content for a scheduled post. |
| |
| Args: |
| user_id (str): User ID |
| schedule_id (str): Schedule ID |
| supabase_client_config (dict): Supabase client configuration |
| |
| Returns: |
| dict: Result of content generation |
| """ |
| try: |
| print(f"[CONTENT TASK] Starting content generation for schedule {schedule_id}") |
| logger.info(f"Starting content generation for schedule {schedule_id}") |
| |
| |
| self.update_state(state='PROGRESS', meta={'status': 'Generating content...'}) |
| |
| |
| content_service = ContentService() |
| |
| |
| generated_content = content_service.generate_post_content(user_id) |
| |
| |
| from backend.utils.database import init_supabase |
| supabase_client = init_supabase( |
| supabase_client_config['SUPABASE_URL'], |
| supabase_client_config['SUPABASE_KEY'] |
| ) |
| |
| |
| |
| schedule_response = ( |
| supabase_client |
| .table("Scheduling") |
| .select("id_social") |
| .eq("id", schedule_id) |
| .execute() |
| ) |
| |
| if not schedule_response.data: |
| raise Exception(f"Schedule {schedule_id} not found") |
| |
| social_account_id = schedule_response.data[0]['id_social'] |
| |
| |
| response = ( |
| supabase_client |
| .table("Post_content") |
| .insert({ |
| "social_account_id": social_account_id, |
| "Text_content": generated_content, |
| "is_published": False, |
| "sched": schedule_id |
| }) |
| .execute() |
| ) |
| |
| if response.data: |
| logger.info(f"Content generated and stored for schedule {schedule_id}") |
| return { |
| 'status': 'success', |
| 'message': f'Content generated for schedule {schedule_id}', |
| 'post_id': response.data[0]['id'] |
| } |
| else: |
| logger.error(f"Failed to store generated content for schedule {schedule_id}") |
| return { |
| 'status': 'failure', |
| 'message': f'Failed to store generated content for schedule {schedule_id}' |
| } |
| |
| except Exception as e: |
| logger.error(f"Error in content generation task for schedule {schedule_id}: {str(e)}") |
| return { |
| 'status': 'failure', |
| 'message': f'Error in content generation: {str(e)}' |
| } |
|
|
| @celery_app.task(bind=True) |
| def publish_post_task(self, schedule_id: str, supabase_client_config: dict): |
| """ |
| Celery task to publish a scheduled post. |
| |
| Args: |
| schedule_id (str): Schedule ID |
| supabase_client_config (dict): Supabase client configuration |
| |
| Returns: |
| dict: Result of post publishing |
| """ |
| try: |
| print(f"[PUBLISH TASK] Starting post publishing for schedule {schedule_id}") |
| logger.info(f"Starting post publishing for schedule {schedule_id}") |
| |
| |
| self.update_state(state='PROGRESS', meta={'status': 'Publishing post...'}) |
| |
| |
| from backend.utils.database import init_supabase |
| supabase_client = init_supabase( |
| supabase_client_config['SUPABASE_URL'], |
| supabase_client_config['SUPABASE_KEY'] |
| ) |
| |
| |
| response = ( |
| supabase_client |
| .table("Post_content") |
| .select("*") |
| .eq("sched", schedule_id) |
| .eq("is_published", False) |
| .order("created_at", desc=True) |
| .limit(1) |
| .execute() |
| ) |
| |
| if not response.data: |
| logger.info(f"No unpublished posts found for schedule {schedule_id}") |
| return { |
| 'status': 'info', |
| 'message': f'No unpublished posts found for schedule {schedule_id}' |
| } |
| |
| post = response.data[0] |
| post_id = post.get('id') |
| text_content = post.get('Text_content') |
| image_url = post.get('image_content_url') |
| |
| |
| schedule_response = ( |
| supabase_client |
| .table("Scheduling") |
| .select("Social_network(token, sub)") |
| .eq("id", schedule_id) |
| .execute() |
| ) |
| |
| if not schedule_response.data: |
| raise Exception(f"Schedule {schedule_id} not found") |
| |
| social_network = schedule_response.data[0].get('Social_network', {}) |
| access_token = social_network.get('token') |
| user_sub = social_network.get('sub') |
| |
| if not access_token or not user_sub: |
| logger.error(f"Missing social network credentials for schedule {schedule_id}") |
| return { |
| 'status': 'failure', |
| 'message': f'Missing social network credentials for schedule {schedule_id}' |
| } |
| |
| |
| linkedin_service = LinkedInService() |
| publish_response = linkedin_service.publish_post( |
| access_token, user_sub, text_content, image_url |
| ) |
| |
| |
| update_response = ( |
| supabase_client |
| .table("Post_content") |
| .update({"is_published": True}) |
| .eq("id", post_id) |
| .execute() |
| ) |
| |
| logger.info(f"Post published successfully for schedule {schedule_id}") |
| return { |
| 'status': 'success', |
| 'message': f'Post published successfully for schedule {schedule_id}', |
| 'linkedin_response': publish_response |
| } |
| |
| except Exception as e: |
| logger.error(f"Error in publishing task for schedule {schedule_id}: {str(e)}") |
| return { |
| 'status': 'failure', |
| 'message': f'Error in publishing post: {str(e)}' |
| } |
|
|