"""Cron scheduler workflows — auto trend fetch + auto post pipeline."""
import uuid
from datetime import timedelta, datetime
from typing import Optional

from temporalio import workflow, activity
from temporalio.common import RetryPolicy

# Workflow modules are re-imported inside Temporal's workflow sandbox. Third-party
# and project modules that are only used by activities (or are safe to share) are
# passed through so they are not re-imported for every workflow run.
with workflow.unsafe.imports_passed_through():
    from loguru import logger
    from sqlalchemy import select

    from src.utils.database import AsyncSessionLocal
    from src.models.database import Trend, Post as PostModel


# ── Activities ──────────────────────────────────────────────────────────────────

@activity.defn
async def fetch_trends_activity() -> int:
    """Fetch trends from all free sources and store in DB.

    Delegates to ``src.services.trend_fetcher.fetch_and_store_trends`` and
    returns whatever that coroutine returns (the workflow below reports it as
    the number of inserted trends).
    """
    # Imported lazily so the heavy fetcher module is only loaded on the worker
    # that actually executes this activity.
    from src.services.trend_fetcher import fetch_and_store_trends

    return await fetch_and_store_trends()


@activity.defn
async def pick_best_unposted_trend() -> Optional[int]:
    """
    Pick the highest-scoring trend that hasn't been posted yet today.

    A trend is eligible when its status is ``"pending_generation"`` and no post
    created since 00:00 UTC today references it.

    Returns trend_id or None if nothing available.
    """
    # Start of the current UTC day (naive datetime, as in the rest of this query).
    today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)

    async with AsyncSessionLocal() as session:
        # Subquery: trend_ids already posted today.
        # NULL trend_ids are filtered out explicitly: `x NOT IN (…, NULL)` is never
        # true in SQL, so a single post without a trend would hide every trend.
        posted_today = (
            select(PostModel.trend_id)
            .where(
                PostModel.created_at >= today,
                PostModel.trend_id.is_not(None),
            )
            .scalar_subquery()
        )

        result = await session.execute(
            select(Trend)
            .where(
                Trend.status == "pending_generation",
                Trend.id.not_in(posted_today),
            )
            .order_by(Trend.score.desc())
            .limit(1)
        )
        trend = result.scalar_one_or_none()

        if trend is None:
            logger.info("No unposted trends available right now.")
            return None

        logger.info(f"Picked trend #{trend.id}: {trend.topic[:60]}")
        return trend.id


@activity.defn
async def get_default_agent_version_id() -> int:
    """Get the active agent version ID (or create a default one if none exists).

    Looks up the ``AgentVersion`` with the highest id among versions belonging
    to an active ``Agent``. When there is none, a default Instagram agent and
    its version 1 are inserted and the new version's id is returned.
    """
    from src.models.database import Agent, AgentVersion

    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(AgentVersion)
            .join(Agent, Agent.id == AgentVersion.agent_id)
            .where(Agent.is_active.is_(True))
            .order_by(AgentVersion.id.desc())
            .limit(1)
        )
        version = result.scalar_one_or_none()
        if version:
            return version.id

        # Create a default agent + version if none exist
        agent = Agent(
            name="default_instagram_agent",
            description="Auto-created default agent for Instagram posts",
            is_active=True,
        )
        session.add(agent)
        # Flush so the database assigns agent.id before the version references it.
        await session.flush()

        av = AgentVersion(
            agent_id=agent.id,
            version=1,
            config={
                "model": "openai/gpt-oss-20b:free",
                "temperature": 0.8,
                "platform": "instagram",
            },
            created_by="system",
        )
        session.add(av)
        # Flush again so av.id exists before it is stored on the agent.
        await session.flush()

        agent.current_version_id = av.id
        await session.commit()
        await session.refresh(av)

        logger.info(f"Created default agent version id={av.id}")
        return av.id


@activity.defn
async def trigger_post_workflow(trend_id: int, agent_version_id: int, platform: str) -> str:
    """Kick off a TrendToPostPublishWorkflow for the given trend.

    Args:
        trend_id: ID of the trend to generate a post for.
        agent_version_id: ID of the agent version passed to the workflow.
        platform: Target platform name passed to the workflow.

    Returns:
        The ID of the workflow that was started.
    """
    from temporalio.client import Client
    from src.config import get_settings
    from src.temporal_workflows.workflows import TrendToPostPublishWorkflow

    settings = get_settings()
    client = await Client.connect(f"{settings.temporal_host}:{settings.temporal_port}")

    # NOTE(review): the random suffix makes every attempt use a fresh workflow ID, so
    # a retry of this activity after a successful start would launch a second
    # workflow for the same trend — confirm whether that is acceptable.
    workflow_id = f"auto-post-{trend_id}-{uuid.uuid4().hex[:8]}"
    await client.start_workflow(
        TrendToPostPublishWorkflow.run,
        args=[trend_id, agent_version_id, platform],
        id=workflow_id,
        task_queue=settings.temporal_task_queue,
    )
    logger.info(f"Triggered workflow {workflow_id} for trend {trend_id}")
    return workflow_id


# ── Cron Workflows ──────────────────────────────────────────────────────────────

@workflow.defn
class TrendFetchCronWorkflow:
    """
    Runs every 2 hours.
    Fetches trending topics from HackerNews, RSS feeds, and Google Trends.
    Stores new trends in the database for the post scheduler to pick up.
    """

    @workflow.run
    async def run(self) -> dict:
        """Run one trend fetch and report how many trends were inserted.

        Returns:
            ``{"inserted": <activity result>, "timestamp": <ISO-8601 UTC string>}``.
        """
        logger.info("TrendFetchCronWorkflow: starting trend fetch")

        inserted = await workflow.execute_activity(
            fetch_trends_activity,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=10)),
        )

        logger.info(f"TrendFetchCronWorkflow: inserted {inserted} new trends")

        # Workflow code must be deterministic, so the wall clock cannot be read
        # directly; workflow.now() yields the replay-safe workflow time. The tzinfo
        # is dropped to keep the same naive-UTC ISO format callers received before.
        timestamp = workflow.now().replace(tzinfo=None).isoformat()
        return {"inserted": inserted, "timestamp": timestamp}


@workflow.defn
class AutoPostSchedulerWorkflow:
    """
    Runs every 4 hours (6 posts/day max).
    Picks the best unposted trend and fires the full publish pipeline.
    Set AUTO_APPROVE=true in .env to skip human review.
    """

    @workflow.run
    async def run(self, platform: str = "instagram") -> dict:
        """Pick a trend and start the publish workflow for it.

        Args:
            platform: Platform name forwarded to the publish workflow.

        Returns:
            ``{"status": "skipped", "reason": "no_trends"}`` when no trend is
            available, otherwise ``{"status": "triggered", "trend_id": ...,
            "workflow_id": ...}``.
        """
        logger.info("AutoPostSchedulerWorkflow: picking best trend to post")

        # Pick a trend
        trend_id = await workflow.execute_activity(
            pick_best_unposted_trend,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # The activity signals "nothing to post" with None; compare explicitly so
        # a falsy-but-valid id is not mistaken for "no trend".
        if trend_id is None:
            logger.info("AutoPostSchedulerWorkflow: no trends available, skipping")
            return {"status": "skipped", "reason": "no_trends"}

        # Get agent version
        agent_version_id = await workflow.execute_activity(
            get_default_agent_version_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Fire the main workflow
        workflow_id = await workflow.execute_activity(
            trigger_post_workflow,
            args=[trend_id, agent_version_id, platform],
            start_to_close_timeout=timedelta(seconds=30),
        )

        logger.info(f"AutoPostSchedulerWorkflow: fired {workflow_id}")
        return {
            "status": "triggered",
            "trend_id": trend_id,
            "workflow_id": workflow_id,
        }