Deusxx1234's picture
Initial deployment to HF Spaces
c84fdae
"""Temporal worker β€” runs all workflows and activities."""
import asyncio
from temporalio.client import Client, Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from temporalio.worker import Worker, UnsandboxedWorkflowRunner
from datetime import timedelta
from src.config import get_settings
from src.temporal_workflows.workflows import TrendToPostPublishWorkflow
from src.temporal_workflows.scheduler_workflows import (
TrendFetchCronWorkflow,
AutoPostSchedulerWorkflow,
fetch_trends_activity,
pick_best_unposted_trend,
get_default_agent_version_id,
trigger_post_workflow,
)
from src.temporal_workflows import activities
from loguru import logger
settings = get_settings()
async def _ensure_schedules(client: Client):
"""Create Temporal schedules if they don't already exist."""
# 1. Trend fetch β€” every 2 hours
try:
await client.create_schedule(
"trend-fetch-every-2h",
Schedule(
action=ScheduleActionStartWorkflow(
TrendFetchCronWorkflow.run,
id="trend-fetch-cron",
task_queue=settings.temporal_task_queue,
),
spec=ScheduleSpec(intervals=[ScheduleIntervalSpec(every=timedelta(hours=2))]),
),
)
logger.info("Created schedule: trend-fetch-every-2h")
except Exception as e:
if "already exists" in str(e).lower():
logger.info("Schedule trend-fetch-every-2h already exists")
else:
logger.warning(f"Could not create trend-fetch schedule: {e}")
# 2. Auto post β€” every 4 hours (6 posts/day ceiling)
try:
await client.create_schedule(
"auto-post-every-4h",
Schedule(
action=ScheduleActionStartWorkflow(
AutoPostSchedulerWorkflow.run,
args=[settings.post_platform],
id="auto-post-cron",
task_queue=settings.temporal_task_queue,
),
spec=ScheduleSpec(intervals=[ScheduleIntervalSpec(every=timedelta(hours=4))]),
),
)
logger.info("Created schedule: auto-post-every-4h")
except Exception as e:
if "already exists" in str(e).lower():
logger.info("Schedule auto-post-every-4h already exists")
else:
logger.warning(f"Could not create auto-post schedule: {e}")
async def main():
"""Start the Temporal worker and register cron schedules."""
client = await Client.connect(
f"{settings.temporal_host}:{settings.temporal_port}",
namespace=settings.temporal_namespace,
)
logger.info(f"Connected to Temporal at {settings.temporal_host}:{settings.temporal_port}")
# Register cron schedules on startup
await _ensure_schedules(client)
worker = Worker(
client,
task_queue=settings.temporal_task_queue,
workflows=[
TrendToPostPublishWorkflow,
TrendFetchCronWorkflow,
AutoPostSchedulerWorkflow,
],
activities=[
# Core pipeline
activities.fetch_trend_details,
activities.generate_post_content,
activities.generate_images,
activities.moderate_content,
activities.store_media_to_cdn,
activities.save_post_draft,
activities.publish_to_platform,
activities.record_execution_metrics,
# Scheduler
fetch_trends_activity,
pick_best_unposted_trend,
get_default_agent_version_id,
trigger_post_workflow,
],
workflow_runner=UnsandboxedWorkflowRunner(),
max_concurrent_activities=10,
max_concurrent_workflow_tasks=10,
)
logger.info(f"Worker running on task queue: {settings.temporal_task_queue}")
logger.info(f"Auto-approve: {settings.auto_approve} | Platform: {settings.post_platform}")
try:
await worker.run()
except KeyboardInterrupt:
logger.info("Worker interrupted")
finally:
await client.close()
if __name__ == "__main__":
asyncio.run(main())