"""Temporal worker — runs all workflows and activities.""" import asyncio from temporalio.client import Client, Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec from temporalio.worker import Worker, UnsandboxedWorkflowRunner from datetime import timedelta from src.config import get_settings from src.temporal_workflows.workflows import TrendToPostPublishWorkflow from src.temporal_workflows.scheduler_workflows import ( TrendFetchCronWorkflow, AutoPostSchedulerWorkflow, fetch_trends_activity, pick_best_unposted_trend, get_default_agent_version_id, trigger_post_workflow, ) from src.temporal_workflows import activities from loguru import logger settings = get_settings() async def _ensure_schedules(client: Client): """Create Temporal schedules if they don't already exist.""" # 1. Trend fetch — every 2 hours try: await client.create_schedule( "trend-fetch-every-2h", Schedule( action=ScheduleActionStartWorkflow( TrendFetchCronWorkflow.run, id="trend-fetch-cron", task_queue=settings.temporal_task_queue, ), spec=ScheduleSpec(intervals=[ScheduleIntervalSpec(every=timedelta(hours=2))]), ), ) logger.info("Created schedule: trend-fetch-every-2h") except Exception as e: if "already exists" in str(e).lower(): logger.info("Schedule trend-fetch-every-2h already exists") else: logger.warning(f"Could not create trend-fetch schedule: {e}") # 2. Auto post — every 4 hours (6 posts/day ceiling) try: await client.create_schedule( "auto-post-every-4h", Schedule( action=ScheduleActionStartWorkflow( AutoPostSchedulerWorkflow.run, args=[settings.post_platform], id="auto-post-cron", task_queue=settings.temporal_task_queue, ), spec=ScheduleSpec(intervals=[ScheduleIntervalSpec(every=timedelta(hours=4))]), ), ) logger.info("Created schedule: auto-post-every-4h") except Exception as e: if "already exists" in str(e).lower(): logger.info("Schedule auto-post-every-4h already exists") else: logger.warning(f"Could not create auto-post schedule: {e}") async def main(): """Start the Temporal worker and register cron schedules.""" client = await Client.connect( f"{settings.temporal_host}:{settings.temporal_port}", namespace=settings.temporal_namespace, ) logger.info(f"Connected to Temporal at {settings.temporal_host}:{settings.temporal_port}") # Register cron schedules on startup await _ensure_schedules(client) worker = Worker( client, task_queue=settings.temporal_task_queue, workflows=[ TrendToPostPublishWorkflow, TrendFetchCronWorkflow, AutoPostSchedulerWorkflow, ], activities=[ # Core pipeline activities.fetch_trend_details, activities.generate_post_content, activities.generate_images, activities.moderate_content, activities.store_media_to_cdn, activities.save_post_draft, activities.publish_to_platform, activities.record_execution_metrics, # Scheduler fetch_trends_activity, pick_best_unposted_trend, get_default_agent_version_id, trigger_post_workflow, ], workflow_runner=UnsandboxedWorkflowRunner(), max_concurrent_activities=10, max_concurrent_workflow_tasks=10, ) logger.info(f"Worker running on task queue: {settings.temporal_task_queue}") logger.info(f"Auto-approve: {settings.auto_approve} | Platform: {settings.post_platform}") try: await worker.run() except KeyboardInterrupt: logger.info("Worker interrupted") finally: await client.close() if __name__ == "__main__": asyncio.run(main())