orbis-backend / src /temporal_workflows /scheduler_workflows.py
Deusxx1234's picture
Initial deployment to HF Spaces
c84fdae
"""Cron scheduler workflows β€” auto trend fetch + auto post pipeline."""
import uuid
from datetime import timedelta, datetime
from typing import Optional
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from loguru import logger
from sqlalchemy import select
from src.utils.database import AsyncSessionLocal
from src.models.database import Trend, Post as PostModel
# ── Activities ──────────────────────────────────────────────────────────────────
@activity.defn
async def fetch_trends_activity() -> int:
    """Run the trend fetcher service and pass its result through.

    Delegates entirely to ``fetch_and_store_trends``; the integer it returns
    is handed back unchanged.
    """
    # Imported at call time rather than at module import time.
    from src.services.trend_fetcher import fetch_and_store_trends

    inserted_count = await fetch_and_store_trends()
    return inserted_count
@activity.defn
async def pick_best_unposted_trend() -> Optional[int]:
    """Pick the highest-scoring pending trend that has no post created today.

    "Today" starts at midnight UTC. Only trends whose status is
    ``"pending_generation"`` are considered, and any trend referenced by a
    post created since midnight is excluded.

    Returns:
        The id of the selected trend, or ``None`` if no trend qualifies.
    """
    # Naive UTC midnight.
    # NOTE(review): assumes PostModel.created_at is stored as naive UTC — confirm.
    today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)

    async with AsyncSessionLocal() as session:
        # Subquery: trend_ids already posted today.
        # NULL trend_ids are filtered out on purpose: a single NULL inside
        # ``NOT IN (...)`` makes the predicate UNKNOWN for every row, which
        # would hide all trends whenever a post without a trend exists.
        posted_today = (
            select(PostModel.trend_id)
            .where(
                PostModel.created_at >= today,
                PostModel.trend_id.is_not(None),
            )
            .scalar_subquery()
        )

        result = await session.execute(
            select(Trend)
            .where(
                Trend.status == "pending_generation",
                Trend.id.not_in(posted_today),
            )
            .order_by(Trend.score.desc())
            .limit(1)
        )
        trend = result.scalar_one_or_none()

        if trend is None:
            logger.info("No unposted trends available right now.")
            return None

        # Topic is truncated to keep the log line short.
        logger.info(f"Picked trend #{trend.id}: {trend.topic[:60]}")
        return trend.id
@activity.defn
async def get_default_agent_version_id() -> int:
    """Return an agent version id, bootstrapping a default agent if needed.

    Looks up the newest ``AgentVersion`` (highest id) that belongs to an
    active ``Agent``. If there is none, a default Instagram agent and its
    first version are inserted and committed, and the new version's id is
    returned.
    """
    from src.models.database import Agent, AgentVersion

    latest_active_version = (
        select(AgentVersion)
        .join(Agent, Agent.id == AgentVersion.agent_id)
        .where(Agent.is_active.is_(True))
        .order_by(AgentVersion.id.desc())
        .limit(1)
    )

    async with AsyncSessionLocal() as session:
        lookup = await session.execute(latest_active_version)
        existing = lookup.scalar_one_or_none()
        if existing:
            return existing.id

        # Nothing usable yet: create the default agent first so that its
        # primary key is available for the version row.
        default_agent = Agent(
            name="default_instagram_agent",
            description="Auto-created default agent for Instagram posts",
            is_active=True,
        )
        session.add(default_agent)
        await session.flush()

        first_version = AgentVersion(
            agent_id=default_agent.id,
            version=1,
            config={
                "model": "openai/gpt-oss-20b:free",
                "temperature": 0.8,
                "platform": "instagram",
            },
            created_by="system",
        )
        session.add(first_version)
        await session.flush()

        # Point the agent at the version that was just flushed, then persist.
        default_agent.current_version_id = first_version.id
        await session.commit()

        # Reload the row after the commit before reading its id again.
        await session.refresh(first_version)
        logger.info(f"Created default agent version id={first_version.id}")
        return first_version.id
@activity.defn
async def trigger_post_workflow(trend_id: int, agent_version_id: int, platform: str) -> str:
    """Start a ``TrendToPostPublishWorkflow`` run for one trend.

    Connects a Temporal client using the configured host and port and starts
    the workflow on the configured task queue.

    Args:
        trend_id: Id of the trend to publish; also embedded in the workflow id.
        agent_version_id: Agent version forwarded to the publish workflow.
        platform: Target platform name forwarded to the publish workflow.

    Returns:
        The id of the started workflow, shaped like
        ``auto-post-<trend_id>-<8 hex chars>``.
    """
    from temporalio.client import Client
    from src.config import get_settings
    from src.temporal_workflows.workflows import TrendToPostPublishWorkflow

    settings = get_settings()
    target_address = f"{settings.temporal_host}:{settings.temporal_port}"
    client = await Client.connect(target_address)

    # A random suffix keeps ids unique across repeated runs for the same trend.
    unique_suffix = uuid.uuid4().hex[:8]
    workflow_id = f"auto-post-{trend_id}-{unique_suffix}"

    workflow_args = [trend_id, agent_version_id, platform]
    await client.start_workflow(
        TrendToPostPublishWorkflow.run,
        args=workflow_args,
        id=workflow_id,
        task_queue=settings.temporal_task_queue,
    )

    logger.info(f"Triggered workflow {workflow_id} for trend {trend_id}")
    return workflow_id
# ── Cron Workflows ──────────────────────────────────────────────────────────────
@workflow.defn
class TrendFetchCronWorkflow:
    """Workflow that runs the trend-fetch activity once per execution.

    Intended to run on a 2-hour cron schedule (the schedule itself is set
    where the workflow is started, not in this class). Newly stored trends
    are later picked up by the post scheduler.
    """

    @workflow.run
    async def run(self) -> dict:
        """Execute ``fetch_trends_activity`` and report the outcome.

        Returns:
            A dict with ``"inserted"`` (the activity's integer result) and
            ``"timestamp"`` (naive-UTC ISO-8601 string of the workflow time
            at completion).
        """
        # Workflow code is re-executed during history replay; skip logging
        # then so each message is emitted only once.
        if not workflow.unsafe.is_replaying():
            logger.info("TrendFetchCronWorkflow: starting trend fetch")

        inserted = await workflow.execute_activity(
            fetch_trends_activity,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=10)),
        )

        if not workflow.unsafe.is_replaying():
            logger.info(f"TrendFetchCronWorkflow: inserted {inserted} new trends")

        # Wall-clock reads such as datetime.utcnow() are non-deterministic in
        # workflow code; workflow.now() returns the deterministic workflow
        # time instead. The tzinfo is dropped so the ISO string keeps the
        # same naive-UTC format that utcnow().isoformat() produced.
        finished_at = workflow.now().replace(tzinfo=None)
        return {"inserted": inserted, "timestamp": finished_at.isoformat()}
@workflow.defn
class AutoPostSchedulerWorkflow:
    """Workflow that picks one trend and starts the publish pipeline for it.

    Intended to run every 4 hours (6 posts/day max); the schedule itself is
    set where the workflow is started, not in this class.
    Per the original note, setting AUTO_APPROVE=true in .env skips human
    review — that setting is handled outside this file (TODO confirm).
    """

    @workflow.run
    async def run(self, platform: str = "instagram") -> dict:
        """Select the best unposted trend and trigger its publish workflow.

        Args:
            platform: Target platform name forwarded to the publish workflow.

        Returns:
            ``{"status": "skipped", "reason": "no_trends"}`` when no trend is
            available; otherwise a dict with ``"status": "triggered"``, the
            chosen ``"trend_id"`` and the started ``"workflow_id"``.
        """
        # Workflow code is re-executed during history replay; skip logging
        # then so each message is emitted only once.
        if not workflow.unsafe.is_replaying():
            logger.info("AutoPostSchedulerWorkflow: picking best trend to post")

        # Pick a trend
        trend_id = await workflow.execute_activity(
            pick_best_unposted_trend,
            start_to_close_timeout=timedelta(seconds=30),
        )
        # The activity signals "nothing to post" with None; check for that
        # explicitly rather than relying on truthiness.
        if trend_id is None:
            if not workflow.unsafe.is_replaying():
                logger.info("AutoPostSchedulerWorkflow: no trends available, skipping")
            return {"status": "skipped", "reason": "no_trends"}

        # Get agent version
        agent_version_id = await workflow.execute_activity(
            get_default_agent_version_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Fire the main workflow
        workflow_id = await workflow.execute_activity(
            trigger_post_workflow,
            args=[trend_id, agent_version_id, platform],
            start_to_close_timeout=timedelta(seconds=30),
        )

        if not workflow.unsafe.is_replaying():
            logger.info(f"AutoPostSchedulerWorkflow: fired {workflow_id}")
        return {
            "status": "triggered",
            "trend_id": trend_id,
            "workflow_id": workflow_id,
        }