Spaces:
Running
Running
"""Temporal worker — runs all workflows and activities."""
| import asyncio | |
| from temporalio.client import Client, Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec | |
| from temporalio.worker import Worker, UnsandboxedWorkflowRunner | |
| from datetime import timedelta | |
| from src.config import get_settings | |
| from src.temporal_workflows.workflows import TrendToPostPublishWorkflow | |
| from src.temporal_workflows.scheduler_workflows import ( | |
| TrendFetchCronWorkflow, | |
| AutoPostSchedulerWorkflow, | |
| fetch_trends_activity, | |
| pick_best_unposted_trend, | |
| get_default_agent_version_id, | |
| trigger_post_workflow, | |
| ) | |
| from src.temporal_workflows import activities | |
| from loguru import logger | |
# Module-level settings object, loaded once at import time and shared by
# the schedule registration and worker start-up code below.
settings = get_settings()
async def _create_interval_schedule(
    client: Client,
    schedule_id: str,
    label: str,
    workflow,
    workflow_id: str,
    every: timedelta,
    args=(),
) -> None:
    """Create one fixed-interval schedule, treating "already exists" as success.

    Failures are logged rather than raised, so a schedule that cannot be
    created never prevents the worker from starting.

    Args:
        client: Connected Temporal client used to issue the create call.
        schedule_id: Identifier of the schedule; also used in log messages.
        label: Short name used in the warning logged on unexpected failure.
        workflow: Workflow run method the schedule starts.
        workflow_id: Workflow ID passed to the schedule action.
        every: Interval between scheduled runs.
        args: Positional arguments passed to the workflow (default: none).
    """
    # Imported here so the block does not rely on the module's import list.
    from temporalio.client import ScheduleAlreadyRunningError

    try:
        await client.create_schedule(
            schedule_id,
            Schedule(
                action=ScheduleActionStartWorkflow(
                    workflow,
                    args=list(args),
                    id=workflow_id,
                    task_queue=settings.temporal_task_queue,
                ),
                spec=ScheduleSpec(intervals=[ScheduleIntervalSpec(every=every)]),
            ),
        )
    except ScheduleAlreadyRunningError:
        # Matching on the SDK's typed error is more reliable than matching
        # on message text.
        logger.info(f"Schedule {schedule_id} already exists")
    except Exception as e:
        # Best effort: keep the text match as a fallback for errors that
        # surface the condition only in their message.
        if "already exists" in str(e).lower():
            logger.info(f"Schedule {schedule_id} already exists")
        else:
            logger.warning(f"Could not create {label} schedule: {e}")
    else:
        logger.info(f"Created schedule: {schedule_id}")


async def _ensure_schedules(client: Client):
    """Create Temporal schedules if they don't already exist.

    Registers the trend-fetch and auto-post schedules. Each one is attempted
    independently; errors are logged and never propagated to the caller.

    Args:
        client: Connected Temporal client.
    """
    # 1. Trend fetch — every 2 hours
    await _create_interval_schedule(
        client,
        "trend-fetch-every-2h",
        "trend-fetch",
        TrendFetchCronWorkflow.run,
        "trend-fetch-cron",
        timedelta(hours=2),
    )

    # 2. Auto post — every 4 hours (6 posts/day ceiling)
    await _create_interval_schedule(
        client,
        "auto-post-every-4h",
        "auto-post",
        AutoPostSchedulerWorkflow.run,
        "auto-post-cron",
        timedelta(hours=4),
        args=[settings.post_platform],
    )
async def main():
    """Start the Temporal worker and register cron schedules.

    Connects to the Temporal server named in ``settings``, ensures the
    schedules exist, then runs a worker on the configured task queue until
    it stops. The client is closed in a ``finally`` clause.
    """
    target = f"{settings.temporal_host}:{settings.temporal_port}"
    temporal_client = await Client.connect(
        target,
        namespace=settings.temporal_namespace,
    )
    logger.info(f"Connected to Temporal at {settings.temporal_host}:{settings.temporal_port}")

    # Register cron schedules before the worker starts polling.
    await _ensure_schedules(temporal_client)

    registered_workflows = [
        TrendToPostPublishWorkflow,
        TrendFetchCronWorkflow,
        AutoPostSchedulerWorkflow,
    ]
    pipeline_activities = [
        activities.fetch_trend_details,
        activities.generate_post_content,
        activities.generate_images,
        activities.moderate_content,
        activities.store_media_to_cdn,
        activities.save_post_draft,
        activities.publish_to_platform,
        activities.record_execution_metrics,
    ]
    scheduler_activities = [
        fetch_trends_activity,
        pick_best_unposted_trend,
        get_default_agent_version_id,
        trigger_post_workflow,
    ]

    worker = Worker(
        temporal_client,
        task_queue=settings.temporal_task_queue,
        workflows=registered_workflows,
        activities=pipeline_activities + scheduler_activities,
        workflow_runner=UnsandboxedWorkflowRunner(),
        max_concurrent_activities=10,
        max_concurrent_workflow_tasks=10,
    )
    logger.info(f"Worker running on task queue: {settings.temporal_task_queue}")
    logger.info(f"Auto-approve: {settings.auto_approve} | Platform: {settings.post_platform}")

    try:
        await worker.run()
    except KeyboardInterrupt:
        logger.info("Worker interrupted")
    finally:
        # NOTE(review): confirm the pinned temporalio version exposes
        # Client.close(); if it does not, this line raises on shutdown.
        await temporal_client.close()
if __name__ == "__main__":
    # Script entry point: run the worker coroutine on a fresh event loop.
    asyncio.run(main())