Spaces:
Running
Running
| """Cron scheduler workflows β auto trend fetch + auto post pipeline.""" | |
| import uuid | |
| from datetime import timedelta, datetime | |
| from typing import Optional | |
| from temporalio import workflow, activity | |
| from temporalio.common import RetryPolicy | |
| from loguru import logger | |
| from sqlalchemy import select | |
| from src.utils.database import AsyncSessionLocal | |
| from src.models.database import Trend, Post as PostModel | |
| # ββ Activities ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def fetch_trends_activity() -> int: | |
| """Fetch trends from all free sources and store in DB.""" | |
| from src.services.trend_fetcher import fetch_and_store_trends | |
| return await fetch_and_store_trends() | |
| async def pick_best_unposted_trend() -> Optional[int]: | |
| """ | |
| Pick the highest-scoring trend that hasn't been posted yet today. | |
| Returns trend_id or None if nothing available. | |
| """ | |
| today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) | |
| async with AsyncSessionLocal() as session: | |
| # Subquery: trend_ids already posted today | |
| posted_today = select(PostModel.trend_id).where(PostModel.created_at >= today).scalar_subquery() | |
| result = await session.execute( | |
| select(Trend) | |
| .where( | |
| Trend.status == "pending_generation", | |
| Trend.id.not_in(posted_today), | |
| ) | |
| .order_by(Trend.score.desc()) | |
| .limit(1) | |
| ) | |
| trend = result.scalar_one_or_none() | |
| if not trend: | |
| logger.info("No unposted trends available right now.") | |
| return None | |
| logger.info(f"Picked trend #{trend.id}: {trend.topic[:60]}") | |
| return trend.id | |
| async def get_default_agent_version_id() -> int: | |
| """Get the active agent version ID (or create a default one if none exists).""" | |
| from src.models.database import Agent, AgentVersion | |
| async with AsyncSessionLocal() as session: | |
| result = await session.execute( | |
| select(AgentVersion) | |
| .join(Agent, Agent.id == AgentVersion.agent_id) | |
| .where(Agent.is_active.is_(True)) | |
| .order_by(AgentVersion.id.desc()) | |
| .limit(1) | |
| ) | |
| version = result.scalar_one_or_none() | |
| if version: | |
| return version.id | |
| # Create a default agent + version if none exist | |
| agent = Agent( | |
| name="default_instagram_agent", | |
| description="Auto-created default agent for Instagram posts", | |
| is_active=True, | |
| ) | |
| session.add(agent) | |
| await session.flush() | |
| av = AgentVersion( | |
| agent_id=agent.id, | |
| version=1, | |
| config={ | |
| "model": "openai/gpt-oss-20b:free", | |
| "temperature": 0.8, | |
| "platform": "instagram", | |
| }, | |
| created_by="system", | |
| ) | |
| session.add(av) | |
| await session.flush() | |
| agent.current_version_id = av.id | |
| await session.commit() | |
| await session.refresh(av) | |
| logger.info(f"Created default agent version id={av.id}") | |
| return av.id | |
| async def trigger_post_workflow(trend_id: int, agent_version_id: int, platform: str) -> str: | |
| """Kick off a TrendToPostPublishWorkflow for the given trend.""" | |
| from temporalio.client import Client | |
| from src.config import get_settings | |
| from src.temporal_workflows.workflows import TrendToPostPublishWorkflow | |
| settings = get_settings() | |
| client = await Client.connect(f"{settings.temporal_host}:{settings.temporal_port}") | |
| workflow_id = f"auto-post-{trend_id}-{uuid.uuid4().hex[:8]}" | |
| await client.start_workflow( | |
| TrendToPostPublishWorkflow.run, | |
| args=[trend_id, agent_version_id, platform], | |
| id=workflow_id, | |
| task_queue=settings.temporal_task_queue, | |
| ) | |
| logger.info(f"Triggered workflow {workflow_id} for trend {trend_id}") | |
| return workflow_id | |
| # ββ Cron Workflows ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TrendFetchCronWorkflow: | |
| """ | |
| Runs every 2 hours. | |
| Fetches trending topics from HackerNews, RSS feeds, and Google Trends. | |
| Stores new trends in the database for the post scheduler to pick up. | |
| """ | |
| async def run(self) -> dict: | |
| logger.info("TrendFetchCronWorkflow: starting trend fetch") | |
| inserted = await workflow.execute_activity( | |
| fetch_trends_activity, | |
| start_to_close_timeout=timedelta(minutes=5), | |
| retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=10)), | |
| ) | |
| logger.info(f"TrendFetchCronWorkflow: inserted {inserted} new trends") | |
| return {"inserted": inserted, "timestamp": datetime.utcnow().isoformat()} | |
| class AutoPostSchedulerWorkflow: | |
| """ | |
| Runs every 4 hours (6 posts/day max). | |
| Picks the best unposted trend and fires the full publish pipeline. | |
| Set AUTO_APPROVE=true in .env to skip human review. | |
| """ | |
| async def run(self, platform: str = "instagram") -> dict: | |
| logger.info("AutoPostSchedulerWorkflow: picking best trend to post") | |
| # Pick a trend | |
| trend_id = await workflow.execute_activity( | |
| pick_best_unposted_trend, | |
| start_to_close_timeout=timedelta(seconds=30), | |
| ) | |
| if not trend_id: | |
| logger.info("AutoPostSchedulerWorkflow: no trends available, skipping") | |
| return {"status": "skipped", "reason": "no_trends"} | |
| # Get agent version | |
| agent_version_id = await workflow.execute_activity( | |
| get_default_agent_version_id, | |
| start_to_close_timeout=timedelta(seconds=30), | |
| ) | |
| # Fire the main workflow | |
| workflow_id = await workflow.execute_activity( | |
| trigger_post_workflow, | |
| args=[trend_id, agent_version_id, platform], | |
| start_to_close_timeout=timedelta(seconds=30), | |
| ) | |
| logger.info(f"AutoPostSchedulerWorkflow: fired {workflow_id}") | |
| return { | |
| "status": "triggered", | |
| "trend_id": trend_id, | |
| "workflow_id": workflow_id, | |
| } | |