orbis-backend / src /api /routes /posts.py
Deusxx1234's picture
fix(scheduler): never block API event loop — mutex + thread pool
4a69d62
"""Posts router — list, preview, approve, reject, delete."""
import asyncio
from fastapi import APIRouter, Query, HTTPException
from loguru import logger
from pydantic import BaseModel
from sqlalchemy import select, func, delete, cast, Date, or_
from sqlalchemy.orm import selectinload
from src.utils.database import AsyncSessionLocal
from src.models.database import Post, Trend, WorkflowExecution, Notification
from typing import Optional, List
from datetime import datetime, timedelta
router = APIRouter()
# In-memory registry of active generation tasks: post_id -> asyncio.Task
_generation_tasks: dict = {}
def _music_snippet(p: Post) -> Optional[dict]:
mt = getattr(p, "music_track", None)
if not mt:
return None
mins, secs = divmod(mt.duration_secs or 0, 60)
return {
"id": mt.id,
"title": mt.title,
"artist": mt.artist,
"mood": mt.mood,
"duration_label": f"{mins}:{secs:02d}",
"preview_url": mt.preview_url,
}
def _post_dict(p: Post, trend_topic: Optional[str] = None, workflow_id: Optional[str] = None) -> dict:
return {
"id": p.id,
"trend_id": p.trend_id,
"trend_topic": trend_topic,
"workflow_id": workflow_id,
"content": p.content,
"post_type": getattr(p, "post_type", "image") or "image",
"image_url": p.image_url,
"carousel_urls": getattr(p, "carousel_urls", None),
"video_url": getattr(p, "video_url", None),
"link_url": getattr(p, "link_url", None),
"link_title": getattr(p, "link_title", None),
"link_description": getattr(p, "link_description", None),
"platform": p.platform,
"status": p.status,
"display_status": "scheduled" if (p.status == "draft" and p.approval_status == "approved" and p.scheduled_at) else p.status,
"approval_status": p.approval_status,
"platform_post_id": p.platform_post_id,
"scheduled_at": p.scheduled_at,
"published_at": p.published_at,
"created_at": p.created_at,
"updated_at": p.updated_at,
"generation_cost": p.generation_cost,
"token_count": p.token_count,
"hallucination_score": getattr(p, "hallucination_score", None),
"generation_error": getattr(p, "generation_error", None),
"music_id": getattr(p, "music_id", None),
"music_track": _music_snippet(p),
"quality_score": getattr(p, "quality_score", None),
"quality_breakdown": getattr(p, "quality_breakdown", None),
"ig_likes": getattr(p, "ig_likes", None),
"ig_comments": getattr(p, "ig_comments", None),
"ig_reach": getattr(p, "ig_reach", None),
"ig_saves": getattr(p, "ig_saves", None),
"ig_shares": getattr(p, "ig_shares", None),
"engagement_fetched_at": getattr(p, "engagement_fetched_at", None),
}
async def _auto_attach_music(post_id: int, content: str, topic: str) -> None:
"""Detect mood from post content, find/import a matching track, attach to post."""
from src.api.routes.music import _detect_moods, _jamendo_to_track, JAMENDO_MOOD_TAGS
from src.models.database import MusicTrack
from src.config import get_settings
import httpx
combined = f"{content} {topic}"
moods = _detect_moods(combined)
async with AsyncSessionLocal() as session:
# 1. Try local library first — pick randomly among mood-matched tracks
import random as _rnd
track = None
for mood in moods:
result = await session.execute(
select(MusicTrack)
.where(MusicTrack.is_active == True, MusicTrack.mood == mood)
)
candidates = result.scalars().all()
if candidates:
track = _rnd.choice(candidates)
break
# 2. No local match — search Jamendo, pick random from top 10
if not track:
mood = moods[0]
client_id = get_settings().jamendo_client_id
if client_id:
try:
tags = JAMENDO_MOOD_TAGS.get(mood, "upbeat")
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
"https://api.jamendo.com/v3.0/tracks/",
params={
"client_id": client_id,
"format": "json",
"limit": 10,
"include": "musicinfo",
"fuzzytags": tags,
},
)
resp.raise_for_status()
results = resp.json().get("results", [])
if results:
t = _jamendo_to_track(_rnd.choice(results))
track = MusicTrack(
title=t["title"],
artist=t.get("artist"),
genre=t.get("genre"),
mood=mood,
bpm=t.get("bpm"),
duration_secs=t.get("duration_secs"),
preview_url=t.get("preview_url"),
cover_url=t.get("cover_url"),
tags=t.get("tags", []),
)
session.add(track)
await session.flush() # get track.id before commit
except Exception as e:
logger.warning(f"[AutoMusic] Jamendo fetch failed: {e}")
if not track:
return
# 3. Attach to post
post = await session.get(Post, post_id)
if post:
post.music_id = track.id
await session.commit()
logger.info(f"[AutoMusic] Attached track '{track.title}' (id={track.id}) to post {post_id}")
async def _get_workflow_id(session, post_id: int) -> Optional[str]:
"""Look up the most recent workflow for a given post."""
result = await session.execute(
select(WorkflowExecution.workflow_id)
.where(WorkflowExecution.post_id == post_id)
.order_by(WorkflowExecution.started_at.desc())
.limit(1)
)
return result.scalar_one_or_none()
async def _get_admin_user_id(session) -> Optional[int]:
"""Return the first admin user's DB id for notification targeting."""
from src.models.database import User
result = await session.execute(select(User).where(User.is_active == True).limit(1))
user = result.scalar_one_or_none()
return user.id if user else None
async def _create_notification(
session,
notification_type: str,
title: str,
message: str,
payload: Optional[dict] = None,
) -> None:
user_id = await _get_admin_user_id(session)
session.add(
Notification(
user_id=user_id,
type=notification_type,
title=title,
message=message,
payload=payload or {},
)
)
# ── List ───────────────────────────────────────────────────────────────────────
@router.get("/")
async def list_posts(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
status: Optional[str] = None,
platform: Optional[str] = None,
post_type: Optional[str] = None,
):
async with AsyncSessionLocal() as session:
q = select(Post)
if status:
q = q.where(Post.status == status)
if platform:
q = q.where(Post.platform == platform)
if post_type:
q = q.where(Post.post_type == post_type)
q = q.options(selectinload(Post.music_track)).order_by(Post.created_at.desc()).offset((page - 1) * page_size).limit(page_size)
count_q = select(func.count(Post.id))
if status:
count_q = count_q.where(Post.status == status)
if platform:
count_q = count_q.where(Post.platform == platform)
if post_type:
count_q = count_q.where(Post.post_type == post_type)
# SQLAlchemy async sessions don't allow concurrent operations on the same session
results = await session.execute(q)
total_row = await session.execute(count_q)
all_total_row = await session.execute(select(func.count(Post.id)))
total = total_row.scalar()
all_total = all_total_row.scalar()
posts = results.scalars().all()
# Bulk-fetch trend topics in one query
trend_ids = list({p.trend_id for p in posts if p.trend_id})
trend_map: dict = {}
if trend_ids:
tr_rows = (await session.execute(
select(Trend.id, Trend.topic).where(Trend.id.in_(trend_ids))
)).all()
trend_map = {r.id: r.topic for r in tr_rows}
# Bulk-fetch workflow IDs in one query
post_ids = [p.id for p in posts]
wf_map: dict = {}
if post_ids:
wf_rows = (await session.execute(
select(WorkflowExecution.post_id, WorkflowExecution.workflow_id)
.where(WorkflowExecution.post_id.in_(post_ids))
.distinct(WorkflowExecution.post_id)
.order_by(WorkflowExecution.post_id, WorkflowExecution.started_at.desc())
)).all()
for row in wf_rows:
if row.post_id not in wf_map:
wf_map[row.post_id] = row.workflow_id
items = [
_post_dict(p, trend_map.get(p.trend_id), wf_map.get(p.id))
for p in posts
]
return {"total": total, "all_total": all_total, "page": page, "page_size": page_size, "items": items}
@router.get("/pending-approval")
async def pending_posts(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
):
async with AsyncSessionLocal() as session:
q = select(Post).options(selectinload(Post.music_track)).where(Post.status == "draft").order_by(Post.created_at.desc()).offset((page - 1) * page_size).limit(page_size)
results = await session.execute(q)
total_row = await session.execute(select(func.count(Post.id)).where(Post.status == "draft"))
posts = results.scalars().all()
trend_ids = list({p.trend_id for p in posts if p.trend_id})
trend_map: dict = {}
if trend_ids:
tr_rows = (await session.execute(select(Trend.id, Trend.topic).where(Trend.id.in_(trend_ids)))).all()
trend_map = {r.id: r.topic for r in tr_rows}
post_ids = [p.id for p in posts]
wf_map: dict = {}
if post_ids:
wf_rows = (await session.execute(
select(WorkflowExecution.post_id, WorkflowExecution.workflow_id)
.where(WorkflowExecution.post_id.in_(post_ids))
.distinct(WorkflowExecution.post_id)
.order_by(WorkflowExecution.post_id, WorkflowExecution.started_at.desc())
)).all()
for row in wf_rows:
if row.post_id not in wf_map:
wf_map[row.post_id] = row.workflow_id
items = [_post_dict(p, trend_map.get(p.trend_id), wf_map.get(p.id)) for p in posts]
return {"total": total_row.scalar(), "page": page, "page_size": page_size, "items": items}
# ── Analytics ─────────────────────────────────────────────────────────────────
@router.get("/analytics/summary")
async def posts_summary():
"""Overall stats for the dashboard."""
async with AsyncSessionLocal() as session:
total = (await session.execute(select(func.count(Post.id)))).scalar()
published = (
await session.execute(select(func.count(Post.id)).where(Post.status == "published"))
).scalar()
drafts = (
await session.execute(select(func.count(Post.id)).where(Post.status == "draft"))
).scalar()
failed = (
await session.execute(select(func.count(Post.id)).where(Post.status == "failed"))
).scalar()
by_platform = await session.execute(
select(Post.platform, func.count(Post.id).label("count"))
.where(Post.status == "published")
.group_by(Post.platform)
)
total_cost = (
await session.execute(
select(func.sum(Post.generation_cost)).where(Post.generation_cost.isnot(None))
)
).scalar() or 0.0
return {
"total_posts": total,
"published": published,
"drafts_pending": drafts,
"failed": failed,
"total_cost_usd": round(total_cost, 4),
"by_platform": [
{"platform": row.platform, "count": row.count} for row in by_platform.all()
],
}
# ── Preview ───────────────────────────────────────────────────────────────────
@router.get("/calendar")
async def calendar_posts(
start: datetime = Query(...),
end: datetime = Query(...),
limit: int = Query(default=200, le=500),
):
"""Posts that belong in the calendar for a date range."""
start = start.replace(tzinfo=None)
end = end.replace(tzinfo=None)
async with AsyncSessionLocal() as session:
rows = await session.execute(
select(Post)
.options(selectinload(Post.music_track))
.where(
or_(
Post.scheduled_at.between(start, end),
Post.published_at.between(start, end),
Post.created_at.between(start, end),
)
)
.order_by(Post.created_at.asc())
.limit(limit)
)
posts = rows.scalars().all()
# batch-load trends to avoid N+1
trend_ids = {p.trend_id for p in posts if p.trend_id}
trend_map: dict = {}
if trend_ids:
t_rows = await session.execute(select(Trend).where(Trend.id.in_(trend_ids)))
trend_map = {t.id: t.topic for t in t_rows.scalars().all()}
items = []
for post in posts:
trend_topic = trend_map.get(post.trend_id) if post.trend_id else None
item = _post_dict(post, trend_topic, await _get_workflow_id(session, post.id))
item["calendar_at"] = post.scheduled_at or post.published_at or post.created_at
items.append(item)
return {"items": items, "start": start, "end": end, "total": len(items)}
class PostEditBody(BaseModel):
content: Optional[str] = None
platform: Optional[str] = None
post_type: Optional[str] = None
scheduled_at: Optional[datetime] = None
image_url: Optional[str] = None
link_url: Optional[str] = None
link_title: Optional[str] = None
link_description: Optional[str] = None
@router.put("/{post_id}")
async def update_post(post_id: int, body: PostEditBody):
"""Edit a draft or failed post before approval/publish."""
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.status == "published":
raise HTTPException(status_code=400, detail="Published posts cannot be edited")
fields_set = getattr(body, "model_fields_set", getattr(body, "__fields_set__", set()))
for field in (
"content",
"platform",
"post_type",
"scheduled_at",
"image_url",
"link_url",
"link_title",
"link_description",
):
if field in fields_set:
setattr(post, field, getattr(body, field))
post.updated_at = datetime.utcnow()
await session.commit()
await session.refresh(post)
trend_topic = None
if post.trend_id:
trend = await session.get(Trend, post.trend_id)
trend_topic = trend.topic if trend else None
return _post_dict(post, trend_topic, await _get_workflow_id(session, post.id))
@router.get("/{post_id}/preview")
async def preview_post(post_id: int):
"""Return post content + image URL for preview before approving."""
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
trend_topic = None
if post.trend_id:
tr = await session.get(Trend, post.trend_id)
trend_topic = tr.topic if tr else None
workflow_id = await _get_workflow_id(session, post.id)
return {
"post_id": post.id,
"trend_topic": trend_topic,
"workflow_id": workflow_id,
"caption": post.content,
"image_url": post.image_url,
"platform": post.platform,
"status": post.status,
"character_count": len(post.content) if post.content else 0,
"hashtags": [w for w in (post.content or "").split() if w.startswith("#")],
}
# ── Approve / Reject ─────────────────────────────────────────────────────────
class ApproveBody(BaseModel):
publish_mode: str = "post" # "post" | "reel"
@router.post("/{post_id}/approve")
async def approve_post(post_id: int, body: ApproveBody = ApproveBody()):
"""Approve a post — signals Temporal if workflow exists, else updates DB directly."""
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.status == "published":
return {"message": "Already published", "post_id": post_id}
workflow_id = await _get_workflow_id(session, post.id)
# Try to signal Temporal workflow
if workflow_id:
try:
from src.config import get_settings
from temporalio.client import Client
settings = get_settings()
client = await Client.connect(
f"{settings.temporal_host}:{settings.temporal_port}",
namespace=settings.temporal_namespace,
)
handle = client.get_workflow_handle(workflow_id)
await handle.signal("approved")
return {"message": "Approved via Temporal", "workflow_id": workflow_id}
except Exception:
pass # Temporal not running — fall through to direct DB update
# Direct publish — call Instagram API then update DB
async with AsyncSessionLocal() as session:
post = (await session.execute(
select(Post).where(Post.id == post_id).options(selectinload(Post.music_track))
)).scalar_one_or_none()
content = post.content
image_url = post.image_url
carousel_urls = getattr(post, "carousel_urls", None) or []
post_type = post.post_type or "text_image"
platform = post.platform or "instagram"
music_url = post.music_track.preview_url if post.music_track else None
# Auto-improve content quality before publishing
try:
from src.services.post_improver import auto_improve
content, improved_score = await auto_improve(content, post_type)
# Persist improved content + updated score back to DB
async with AsyncSessionLocal() as session:
post_upd = await session.get(Post, post_id)
if post_upd:
post_upd.content = content
post_upd.quality_score = improved_score["score"]
post_upd.quality_breakdown = improved_score["breakdown"]
await session.commit()
except Exception as e:
logger.warning(f"Auto-improve skipped: {e}")
# Normalize hashtags in caption — ensure every tag word starts with #
import re as _re
def _normalize_caption_hashtags(text: str) -> str:
def fix_tag_line(m):
words = m.group(0).split()
return " ".join(w if w.startswith("#") else f"#{w}" for w in words)
return _re.sub(r"^(?:#?[A-Za-z][A-Za-z0-9]{2,}\s*){3,}$", fix_tag_line, text, flags=_re.MULTILINE)
content = _normalize_caption_hashtags(content)
# Inject auto_hashtags from brand voice (e.g. #dailywireup #news)
try:
from src.models.database import BrandVoice as _BV
async with AsyncSessionLocal() as _s:
_bv = (await _s.execute(
select(_BV).where(_BV.is_active == True).limit(1)
)).scalar_one_or_none()
if _bv and _bv.auto_hashtags:
extra = _bv.auto_hashtags.strip()
if extra and extra not in content:
content = content.rstrip() + "\n" + extra
except Exception:
pass
platform_post_id = None
publish_error = None
try:
from src.services.social_media import SocialMediaPublisher
publisher = SocialMediaPublisher()
# For carousel posts send all slide images; otherwise single image
if post_type == "carousel" and carousel_urls:
publish_images = carousel_urls
elif image_url:
publish_images = [image_url]
else:
publish_images = []
# If user chose "reel", force reel publishing regardless of post_type
effective_type = "carousel" if body.publish_mode == "reel" else post_type
effective_music = music_url if body.publish_mode == "reel" else (music_url if post_type == "carousel" else None)
result = await publisher.publish_post(
platform=platform,
text=content,
image_urls=publish_images,
post_type=effective_type,
music_url=effective_music,
)
if result["success"]:
platform_post_id = result.get("platform_post_id")
else:
publish_error = result.get("error")
logger.warning(f"Publish failed for post {post_id}: {publish_error}")
except Exception as e:
publish_error = str(e)
logger.error(f"Publish exception for post {post_id}: {e}")
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
post.approval_status = "approved"
if platform_post_id:
post.status = "published"
post.published_at = datetime.utcnow()
post.generation_error = None
post.platform_post_id = platform_post_id
else:
post.status = "failed"
post.generation_error = publish_error or "Publish failed"
await _create_notification(
session,
"publish_failed",
f"Post #{post_id} failed to publish",
post.generation_error,
{"post_id": post_id, "platform": platform},
)
await session.commit()
if platform_post_id:
return {"message": "Approved and published to Instagram", "post_id": post_id, "platform_post_id": platform_post_id}
return {"message": "Approved but publish failed", "post_id": post_id, "error": publish_error}
@router.post("/{post_id}/reject")
async def reject_post(post_id: int):
"""Reject a post — signals Temporal if workflow exists, else updates DB directly."""
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
workflow_id = await _get_workflow_id(session, post.id)
# Try to signal Temporal workflow
if workflow_id:
try:
from src.config import get_settings
from temporalio.client import Client
settings = get_settings()
client = await Client.connect(
f"{settings.temporal_host}:{settings.temporal_port}",
namespace=settings.temporal_namespace,
)
handle = client.get_workflow_handle(workflow_id)
await handle.signal("rejected")
return {"message": "Rejected via Temporal", "workflow_id": workflow_id}
except Exception:
pass # Temporal not running — fall through to direct DB update
# Direct DB update
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
post.approval_status = "rejected"
post.status = "failed"
await session.commit()
return {"message": "Rejected directly", "post_id": post_id}
# ── Delete ────────────────────────────────────────────────────────────────────
@router.post("/{post_id}/regenerate")
async def regenerate_post(post_id: int):
"""Re-run the generation pipeline for a failed or draft post using the same trend."""
import asyncio
from src.temporal_workflows.pipeline import (
generate_post_content, generate_images, store_media_to_cdn, save_post_draft,
GeneratePostInput, GenerateImagesInput, StoreMediaInput, SavePostDraftInput, TrendData,
)
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.status == "published":
raise HTTPException(status_code=400, detail="Cannot regenerate a published post")
trend_id = post.trend_id
platform = post.platform or "instagram"
post_type = getattr(post, "post_type", "image") or "image"
trend_row = await session.get(Trend, trend_id) if trend_id else None
if not trend_row:
raise HTTPException(status_code=404, detail="Original trend not found — cannot regenerate")
trend_data = TrendData(
trend_id=trend_row.id,
topic=trend_row.topic,
source=trend_row.source,
score=trend_row.score or 0.0,
raw_data=trend_row.raw_data,
)
# Reset post to generating state immediately
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
post.status = "generating"
post.content = f"Regenerating post about: {trend_data.topic}"
post.image_url = None
post.generation_error = None
await session.commit()
async def _run():
try:
post_data = await generate_post_content(GeneratePostInput(trend=trend_data, agent_version_id=1))
effective_type = post_type or post_data.post_type
images = await generate_images(GenerateImagesInput(
topic=trend_data.topic, count=1, image_prompt=post_data.image_prompt,
post_type=effective_type,
carousel_image_prompts=post_data.carousel_image_prompts if effective_type == "carousel" else None,
))
cdn_images = await store_media_to_cdn(StoreMediaInput(images=images, post_id=post_id))
await save_post_draft(SavePostDraftInput(
post=post_data, trend_id=trend_id, agent_version_id=1,
images=cdn_images, platform=platform, post_type=post_type,
placeholder_id=post_id,
))
logger.info(f"[Regenerate] Post {post_id} regenerated successfully")
except Exception as e:
logger.error(f"[Regenerate] Failed for post {post_id}: {e}")
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if post:
post.status = "failed"
post.content = f"Regeneration failed: {e}"
post.generation_error = str(e)
await _create_notification(
session,
"generation_failed",
f"Post #{post_id} regeneration failed",
str(e),
{"post_id": post_id},
)
await session.commit()
asyncio.create_task(_run())
return {"message": "Regeneration started", "post_id": post_id}
@router.post("/{post_id}/regenerate-image")
async def regenerate_image(post_id: int):
"""Re-generate only the image for a post, keeping the caption unchanged."""
import asyncio
from src.temporal_workflows.pipeline import (
generate_images, store_media_to_cdn,
GenerateImagesInput, StoreMediaInput, TrendData,
)
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.status == "published":
raise HTTPException(status_code=400, detail="Cannot regenerate image on a published post")
trend_row = await session.get(Trend, post.trend_id) if post.trend_id else None
topic = trend_row.topic if trend_row else (post.content or "")[:80]
post_type = getattr(post, "post_type", "image") or "image"
async def _run_image():
try:
images = await generate_images(GenerateImagesInput(
topic=topic, count=1, image_prompt=None, post_type=post_type,
))
cdn = await store_media_to_cdn(StoreMediaInput(images=images, post_id=post_id))
new_url = cdn.get("image_url") or cdn.get("cover_url")
if new_url:
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if post:
post.image_url = new_url
post.updated_at = datetime.utcnow()
await session.commit()
logger.info(f"[RegenerateImage] Post {post_id} new image: {new_url}")
except Exception as e:
logger.error(f"[RegenerateImage] Failed for post {post_id}: {e}")
asyncio.create_task(_run_image())
return {"message": "Image regeneration started", "post_id": post_id}
@router.get("/{post_id}/insights")
async def post_insights(post_id: int):
"""Fetch Instagram media insights for a published post."""
import httpx, os
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
media_id = post.platform_post_id
platform = post.platform or "instagram"
if not media_id:
return {"available": False, "reason": "Post not published or missing platform ID"}
if platform.lower() != "instagram":
return {"available": False, "reason": f"Insights only available for Instagram (this is {platform})"}
from src.config import get_settings as _gs
token = _gs().instagram_access_token or os.getenv("INSTAGRAM_ACCESS_TOKEN", "")
if not token:
return {"available": False, "reason": "Instagram token not configured"}
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"https://graph.facebook.com/v22.0/{media_id}/insights",
params={
"metric": "reach,saved,likes,comments,shares,total_interactions,views",
"access_token": token,
},
)
data = resp.json()
if "error" in data:
return {"available": False, "reason": data["error"].get("message", "API error")}
metrics = {}
for item in data.get("data", []):
val = item.get("values", [{}])[0].get("value", 0) if item.get("values") else item.get("value", 0)
metrics[item["name"]] = val
return {"available": True, "media_id": media_id, "metrics": metrics}
except Exception as e:
return {"available": False, "reason": str(e)}
class SchedulePostBody(BaseModel):
scheduled_at: datetime
@router.post("/{post_id}/schedule")
@router.put("/{post_id}/schedule")
async def schedule_post(post_id: int, body: SchedulePostBody):
"""Schedule or reschedule a post (POST = new schedule, PUT = drag-and-drop reschedule)."""
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.status == "published":
raise HTTPException(status_code=400, detail="Post already published")
scheduled_at = body.scheduled_at.replace(tzinfo=None)
post.scheduled_at = scheduled_at
post.approval_status = "approved"
post.updated_at = datetime.utcnow()
await session.commit()
return {"message": "Post scheduled", "post_id": post_id, "scheduled_at": scheduled_at.isoformat()}
@router.post("/{post_id}/discard")
async def discard_post(post_id: int):
"""Discard a draft post — hard deletes it so it never gets published."""
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.status not in ("draft",):
raise HTTPException(status_code=400, detail=f"Cannot discard a post with status '{post.status}'")
await session.delete(post)
await session.commit()
return {"discarded": True, "post_id": post_id}
@router.delete("/{post_id}")
async def delete_post(post_id: int):
"""Hard delete a single post by ID."""
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
await session.delete(post)
await session.commit()
return {"deleted": 1}
class GeneratePostBody(BaseModel):
trend_id: Optional[int] = None # if None, picks the top pending trend
platform: str = "instagram"
post_type: str = "image"
@router.post("/generate")
async def generate_post(body: GeneratePostBody):
"""Run the full generation pipeline for one trend without Temporal."""
import asyncio
from src.temporal_workflows.pipeline import (
generate_post_content, generate_images,
store_media_to_cdn, save_post_draft,
GeneratePostInput, GenerateImagesInput, StoreMediaInput, SavePostDraftInput,
TrendData,
)
# Pick trend
async with AsyncSessionLocal() as session:
if body.trend_id:
trend_row = await session.get(Trend, body.trend_id)
if not trend_row:
raise HTTPException(status_code=404, detail="Trend not found")
else:
from sqlalchemy import func as sqlfunc
from datetime import datetime as dt
hours_old = sqlfunc.greatest(
sqlfunc.extract("epoch", dt.utcnow() - Trend.created_at) / 3600.0,
0.01
)
boosted = Trend.score / (1.0 + hours_old / 12.0)
result = await session.execute(
select(Trend)
.where(Trend.status == "pending_generation")
.order_by(boosted.desc())
.limit(1)
)
trend_row = result.scalar_one_or_none()
if not trend_row:
raise HTTPException(status_code=404, detail="No pending trends available")
trend_id = trend_row.id
trend_data = TrendData(
trend_id=trend_row.id,
topic=trend_row.topic,
source=trend_row.source,
score=trend_row.score or 0.0,
raw_data=trend_row.raw_data,
)
# ── Duplicate check: find semantically similar existing posts ──────────
from src.services.embeddings import get_embedding, find_similar_posts
trend_embedding = await get_embedding(trend_data.topic)
duplicate_warning = None
if trend_embedding:
similar = await find_similar_posts(trend_embedding, threshold=0.88, limit=1)
if similar:
duplicate_warning = {
"post_id": similar[0]["id"],
"similarity": round(float(similar[0]["similarity"]) * 100),
"content_preview": (similar[0]["content"] or "")[:80],
}
# ── Insert placeholder row immediately so the UI can track progress ──
async with AsyncSessionLocal() as session:
placeholder = Post(
trend_id=trend_id,
agent_version_id=1,
content=f"Generating post about: {trend_data.topic}",
post_type=body.post_type,
platform=body.platform,
status="generating",
approval_status="pending",
generation_error=None,
)
session.add(placeholder)
await session.commit()
await session.refresh(placeholder)
placeholder_id = placeholder.id
async def _run_pipeline():
from src.agents.post_generator import set_post_step
try:
set_post_step(placeholder_id, "Analysing trend...")
post_data = await generate_post_content(GeneratePostInput(trend=trend_data, agent_version_id=1, post_type=body.post_type, post_id=placeholder_id))
set_post_step(placeholder_id, "Generating image(s)...")
carousel_prompts = post_data.carousel_image_prompts if body.post_type == "carousel" else None
img_count = len(carousel_prompts) if carousel_prompts else (6 if body.post_type == "carousel" else 1)
images = await generate_images(GenerateImagesInput(
topic=trend_data.topic,
count=img_count,
image_prompt=post_data.image_prompt,
post_type=body.post_type,
carousel_image_prompts=carousel_prompts,
post_id=placeholder_id,
))
set_post_step(placeholder_id, "Storing media...")
cdn_images = await store_media_to_cdn(StoreMediaInput(images=images, post_id=placeholder_id))
saved = await save_post_draft(SavePostDraftInput(
post=post_data,
trend_id=trend_id,
agent_version_id=1,
images=cdn_images,
platform=body.platform,
post_type=body.post_type,
placeholder_id=placeholder_id,
token_count=post_data.token_count,
generation_cost=post_data.generation_cost,
hallucination_score=post_data.hallucination_score,
))
# Mark trend as used
async with AsyncSessionLocal() as session:
t = await session.get(Trend, trend_id)
if t:
t.status = "generated"
t.used_for_post = True
await session.commit()
set_post_step(placeholder_id, "Matching music to topic mood...")
# Auto-attach music based on post content + trend topic
await _auto_attach_music(placeholder_id, post_data.content, trend_data.topic)
set_post_step(placeholder_id, "Scoring quality — hook, CTA, hashtags, brand fit...")
# Compute quality score, auto-improve if weak
try:
from src.services.quality_scorer import score_post
from src.services.post_improver import auto_improve
from src.models.database import BrandVoice
async with AsyncSessionLocal() as session:
brand = await session.execute(
select(BrandVoice).where(BrandVoice.is_active == True).limit(1)
)
brand_voice = brand.scalar_one_or_none()
initial = score_post(post_data.content, body.post_type, brand_voice, body.platform)
set_post_step(placeholder_id, f"Initial score: {initial['score']}/100 ({initial['grade']})")
logger.info(f"[Quality] Post {placeholder_id} initial score {initial['score']}/100 ({initial['grade']})")
final_content = post_data.content
final_result = initial
if initial["score"] < 70:
set_post_step(placeholder_id, f"Score {initial['score']}/100 — auto-improving...")
final_content, final_result = await auto_improve(post_data.content, body.post_type, brand_voice, post_id=placeholder_id)
post = await session.get(Post, placeholder_id)
if post:
post.content = final_content
post.quality_score = final_result["score"]
post.quality_breakdown = final_result["breakdown"]
await session.commit()
logger.info(f"[Quality] Post {placeholder_id} final score {final_result['score']}/100 ({final_result['grade']})")
except Exception as qe:
logger.warning(f"[Quality] Scoring failed for post {placeholder_id}: {qe}")
logger.info(f"[Generate] Post {placeholder_id} generation complete")
# Budget alert check
try:
from src.config import get_settings as _gs
_budget = _gs().monthly_budget_usd
if _budget and _budget > 0:
from datetime import datetime as _dt
_month = _dt.utcnow().strftime("%Y-%m")
async with AsyncSessionLocal() as _bs:
_spent = (await _bs.execute(
select(func.sum(Post.generation_cost))
.where(Post.generation_cost.isnot(None))
.where(func.to_char(Post.created_at, "YYYY-MM") == _month)
)).scalar() or 0.0
if _spent >= _budget:
_exists = (await _bs.execute(
select(Notification).where(
Notification.type == "budget_exceeded",
).order_by(Notification.created_at.desc()).limit(1)
)).scalar_one_or_none()
if not _exists:
_bs.add(Notification(
type="budget_exceeded",
title="Monthly budget exceeded",
message=f"Generation cost this month: ${_spent:.4f} — over your ${_budget:.2f} limit.",
is_read=False,
))
await _bs.commit()
logger.warning(f"[Budget] Monthly spend ${_spent:.4f} exceeded limit ${_budget:.2f}")
except Exception:
pass
# Clear step tracker so the UI stops showing "Scoring quality..."
from src.agents.post_generator import _post_steps
_post_steps.pop(placeholder_id, None)
# Force WS push in case PG NOTIFY isn't reaching us (Neon pooled connections)
from src.utils.notify import mark_dirty
mark_dirty("posts_changed")
except Exception as e:
logger.error(f"[Generate] Pipeline failed for placeholder {placeholder_id}: {e}")
async with AsyncSessionLocal() as session:
post = await session.get(Post, placeholder_id)
if post:
post.status = "failed"
post.content = f"Generation failed: {e}"
post.generation_error = str(e)
await _create_notification(
session,
"generation_failed",
f"Post #{placeholder_id} generation failed",
str(e),
{"post_id": placeholder_id, "trend_id": trend_id},
)
await session.commit()
from src.agents.post_generator import _post_steps
_post_steps.pop(placeholder_id, None)
from src.utils.notify import mark_dirty
mark_dirty("posts_changed")
# Fire pipeline in background — return immediately with placeholder id
task = asyncio.create_task(_run_pipeline())
_generation_tasks[placeholder_id] = task
task.add_done_callback(lambda _: _generation_tasks.pop(placeholder_id, None))
return {
"message": "Generation started",
"post_id": placeholder_id,
"trend_id": trend_id,
"topic": trend_data.topic,
"platform": body.platform,
"duplicate_warning": duplicate_warning,
}
@router.post("/{post_id}/cancel-generation")
async def cancel_generation(post_id: int):
"""Cancel an in-progress post generation and mark the post as discarded."""
task = _generation_tasks.get(post_id)
if task and not task.done():
task.cancel()
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.status not in ("generating",):
raise HTTPException(status_code=400, detail=f"Post is not generating (status: {post.status})")
post.status = "failed"
post.generation_error = "Cancelled by user"
await session.commit()
return {"post_id": post_id, "cancelled": True}
class ManualPostBody(BaseModel):
content: str
platform: str = "instagram"
post_type: str = "text_only"
image_url: Optional[str] = None
link_url: Optional[str] = None
link_title: Optional[str] = None
link_description: Optional[str] = None
@router.post("/manual")
async def create_manual_post(body: ManualPostBody):
"""Create a post manually — no AI generation, saves directly as draft."""
async with AsyncSessionLocal() as session:
post = Post(
content=body.content,
platform=body.platform,
post_type=body.post_type,
image_url=body.image_url or None,
link_url=body.link_url or None,
link_title=body.link_title or None,
link_description=body.link_description or None,
status="draft",
approval_status="pending",
)
session.add(post)
await session.commit()
await session.refresh(post)
return _post_dict(post)
class MusicAttachBody(BaseModel):
music_id: Optional[int] = None
@router.patch("/{post_id}/music")
async def attach_music(post_id: int, body: MusicAttachBody):
"""Attach or detach a music track from a post."""
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
post.music_id = body.music_id
post.updated_at = datetime.utcnow()
await session.commit()
return {"post_id": post_id, "music_id": body.music_id}
class BulkIdsBody(BaseModel):
ids: List[int]
@router.post("/bulk-approve")
async def bulk_approve_posts(body: BulkIdsBody):
"""Queue multiple draft posts for immediate publish."""
import asyncio
async def _approve(post_id: int):
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post or post.status not in ("draft", "failed"):
return {"post_id": post_id, "ok": False, "error": "Invalid status"}
try:
from src.services.scheduler import run_auto_publish
await run_auto_publish(post_id)
return {"post_id": post_id, "ok": True}
except Exception as e:
return {"post_id": post_id, "ok": False, "error": str(e)}
results = await asyncio.gather(*[_approve(pid) for pid in body.ids])
return {"results": list(results), "approved": sum(1 for r in results if r["ok"])}
class BulkScheduleBody(BaseModel):
ids: List[int]
scheduled_at: datetime
@router.post("/bulk-schedule")
async def bulk_schedule_posts(body: BulkScheduleBody):
"""Approve + schedule multiple draft posts for the same time slot."""
if not body.ids:
return {"scheduled": 0}
scheduled_at = body.scheduled_at.replace(tzinfo=None)
async with AsyncSessionLocal() as session:
rows = await session.execute(select(Post).where(Post.id.in_(body.ids)))
posts = rows.scalars().all()
count = 0
for p in posts:
if p.status == "draft":
p.approval_status = "approved"
p.scheduled_at = scheduled_at
count += 1
await session.commit()
return {"scheduled": count, "scheduled_at": scheduled_at.isoformat()}
@router.post("/bulk-reject")
async def bulk_reject_posts(body: BulkIdsBody):
"""Reject multiple draft posts."""
if not body.ids:
return {"rejected": 0}
async with AsyncSessionLocal() as session:
rows = await session.execute(select(Post).where(Post.id.in_(body.ids)))
posts = rows.scalars().all()
for p in posts:
if p.status == "draft":
p.approval_status = "rejected"
p.status = "failed"
await session.commit()
return {"rejected": len(body.ids)}
@router.post("/bulk-delete")
async def bulk_delete_posts(body: BulkIdsBody):
"""Hard delete multiple posts by ID list."""
if not body.ids:
return {"deleted": 0}
async with AsyncSessionLocal() as session:
result = await session.execute(
delete(Post).where(Post.id.in_(body.ids))
)
await session.commit()
return {"deleted": result.rowcount}
# ── Extended Analytics ─────────────────────────────────────────────────────────
@router.get("/analytics/timeseries")
async def posts_timeseries(days: int = Query(30, ge=7, le=365)):
"""Daily post counts for the last N days — published + drafted."""
cutoff = datetime.utcnow() - timedelta(days=days)
async with AsyncSessionLocal() as session:
rows = await session.execute(
select(
cast(Post.created_at, Date).label("day"),
Post.status,
func.count(Post.id).label("count"),
)
.where(Post.created_at >= cutoff)
.group_by(cast(Post.created_at, Date), Post.status)
.order_by(cast(Post.created_at, Date))
)
data: dict = {}
for row in rows.all():
day = str(row.day)
if day not in data:
data[day] = {"date": day, "published": 0, "draft": 0, "failed": 0}
key = row.status if row.status in ("published", "draft", "failed") else "draft"
data[day][key] += row.count
return {"days": days, "series": list(data.values())}
@router.get("/analytics/by-type")
async def posts_by_type():
"""Breakdown of post counts by post_type."""
async with AsyncSessionLocal() as session:
rows = await session.execute(
select(Post.post_type, func.count(Post.id).label("count"))
.group_by(Post.post_type)
)
return [{"post_type": r.post_type or "image", "count": r.count} for r in rows.all()]
@router.get("/analytics/top-posts")
async def top_posts(limit: int = Query(5, ge=1, le=20)):
"""Most recently published posts — used for 'top content' panel."""
async with AsyncSessionLocal() as session:
rows = await session.execute(
select(Post)
.options(selectinload(Post.music_track))
.where(Post.status == "published")
.order_by(Post.published_at.desc())
.limit(limit)
)
posts = rows.scalars().all()
items = []
for p in posts:
trend_topic = None
if p.trend_id:
tr = await session.get(Trend, p.trend_id)
trend_topic = tr.topic if tr else None
items.append({**_post_dict(p, trend_topic), "char_count": len(p.content or "")})
return {"items": items}
@router.get("/analytics/posting-hours")
async def posting_hours():
"""Hour-of-day breakdown of published posts (0–23) for heatmap."""
async with AsyncSessionLocal() as session:
rows = await session.execute(
select(
func.extract("hour", Post.published_at).label("hour"),
func.count(Post.id).label("count"),
)
.where(Post.published_at.isnot(None))
.group_by(func.extract("hour", Post.published_at))
.order_by(func.extract("hour", Post.published_at))
)
hour_map = {int(r.hour): r.count for r in rows.all()}
return [{"hour": h, "count": hour_map.get(h, 0)} for h in range(24)]
@router.get("/analytics/instagram-engagement")
async def instagram_engagement(limit: int = Query(default=20, le=50)):
"""Aggregate Instagram engagement metrics across recently published posts."""
import httpx, os
from src.config import get_settings as _gs2
token = _gs2().instagram_access_token or os.getenv("INSTAGRAM_ACCESS_TOKEN", "")
if not token:
return {"available": False, "reason": "Instagram token not configured", "posts": [], "totals": {}}
async with AsyncSessionLocal() as session:
rows = await session.execute(
select(Post.id, Post.content, Post.image_url, Post.platform_post_id, Post.published_at)
.where(Post.status == "published", Post.platform_post_id.isnot(None), Post.platform == "instagram")
.order_by(Post.published_at.desc())
.limit(limit)
)
posts = rows.all()
if not posts:
return {"available": True, "posts": [], "totals": {"views": 0, "reach": 0, "likes": 0, "comments": 0, "saved": 0, "shares": 0, "total_interactions": 0}}
async def _fetch(post_row):
try:
async with httpx.AsyncClient(timeout=8) as client:
resp = await client.get(
f"https://graph.facebook.com/v22.0/{post_row.platform_post_id}/insights",
params={"metric": "reach,saved,likes,comments,shares,total_interactions,views", "access_token": token},
)
data = resp.json()
if "error" in data:
return None
metrics = {}
for item in data.get("data", []):
val = item.get("values", [{}])[0].get("value", 0) if item.get("values") else item.get("value", 0)
metrics[item["name"]] = val
return {
"post_id": post_row.id,
"media_id": post_row.platform_post_id,
"content_preview": (post_row.content or "")[:80],
"image_url": post_row.image_url,
"published_at": post_row.published_at.isoformat() if post_row.published_at else None,
**metrics,
}
except Exception:
return None
import asyncio
results = await asyncio.gather(*[_fetch(p) for p in posts])
valid = [r for r in results if r]
totals = {"views": 0, "reach": 0, "likes": 0, "comments": 0, "saved": 0, "shares": 0, "total_interactions": 0}
for r in valid:
for k in totals:
totals[k] += r.get(k, 0)
return {"available": True, "posts": valid, "totals": totals}
@router.get("/{post_id}/similar")
async def similar_posts(
post_id: int,
threshold: float = Query(0.80, ge=0.5, le=1.0),
limit: int = Query(5, ge=1, le=20),
):
"""Find posts semantically similar to a given post."""
async with AsyncSessionLocal() as session:
post = await session.get(Post, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
if post.content_embedding is None:
from src.services.embeddings import get_embedding
emb = await get_embedding(post.content or "")
if not emb:
return {"items": [], "note": "No embedding available for this post"}
else:
emb = list(post.content_embedding)
from src.services.embeddings import find_similar_posts
items = await find_similar_posts(emb, threshold=threshold, limit=limit, exclude_id=post_id)
return {"items": items}
@router.get("/{post_id}")
async def get_post(post_id: int):
"""Fetch a single post by ID — same shape as the list endpoint items."""
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Post).where(Post.id == post_id).options(selectinload(Post.music_track))
)
post = result.scalar_one_or_none()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
trend_topic = None
if post.trend_id:
t = await session.get(Trend, post.trend_id)
trend_topic = t.topic if t else None
return _post_dict(post, trend_topic, await _get_workflow_id(session, post.id))
_sched_status_cache: dict = {"ts": 0.0, "data": None}
@router.get("/analytics/scheduler-status")
async def scheduler_status():
"""Live scheduler info — cached for 30s to avoid repeated DB queries."""
import time
from src.services.scheduler import scheduler
from src.config import get_settings
now = time.monotonic()
if _sched_status_cache["data"] and now - _sched_status_cache["ts"] < 30:
return _sched_status_cache["data"]
settings = get_settings()
jobs = []
for job in scheduler.get_jobs():
next_run = job.next_run_time
jobs.append({
"id": job.id,
"name": job.name,
"next_run": next_run.isoformat() if next_run else None,
})
today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
async with AsyncSessionLocal() as session:
today_count = (await session.execute(
select(func.count(Post.id)).where(Post.created_at >= today_start)
)).scalar()
published_today = (await session.execute(
select(func.count(Post.id))
.where(Post.published_at >= today_start)
.where(Post.status == "published")
)).scalar()
# Include currently-running jobs so UI can show a "scheduler busy" indicator
try:
from src.services.scheduler import get_running_jobs
running_jobs = get_running_jobs()
except Exception:
running_jobs = {}
result = {
"scheduler_running": scheduler.running,
"posts_per_day": settings.posts_per_day,
"auto_approve": settings.auto_approve,
"jobs": jobs,
"today_generated": today_count,
"today_published": published_today,
"running_jobs": running_jobs,
"is_busy": bool(running_jobs),
}
# Don't cache when busy — UI needs frequent updates
if not running_jobs:
_sched_status_cache["data"] = result
_sched_status_cache["ts"] = now
return result
class HashtagSuggestBody(BaseModel):
content: str
topic: str = ""
@router.post("/suggest-hashtags")
async def suggest_hashtags(body: HashtagSuggestBody):
"""Use LLM to suggest 8-10 relevant hashtags for a post."""
from src.agents.post_generator import _build_attempts
import httpx, json as _json
prompt = (
f"Generate 8-10 highly relevant Instagram hashtags for this post.\n"
f"Topic: {body.topic}\n"
f"Caption: {body.content[:400]}\n\n"
f"Rules: Mix popular (#news) and niche (#dailywireup) tags. Each must start with #. "
f"No spaces in tags. Output ONLY a JSON array of strings, e.g. [\"#tag1\",\"#tag2\"]. Nothing else."
)
attempts = await _build_attempts()
for attempt in attempts[:3]: # only try first 3 models
if attempt.get("use_sdk"):
continue
try:
base = (attempt["api_base"] or "").rstrip("/")
if not base.startswith("http"):
base = "https://" + base
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{base}/chat/completions",
headers={"Authorization": f"Bearer {attempt['api_key']}", "Content-Type": "application/json"},
json={"model": attempt["model"], "messages": [{"role": "user", "content": prompt}],
"temperature": 0.7, "max_tokens": 200},
)
resp.raise_for_status()
raw = resp.json()["choices"][0]["message"]["content"].strip()
# Parse JSON array
start, end = raw.find("["), raw.rfind("]")
if start != -1 and end != -1:
tags = _json.loads(raw[start:end+1])
tags = [t if t.startswith("#") else f"#{t}" for t in tags if t.strip()]
return {"hashtags": tags[:10]}
except Exception:
continue
return {"hashtags": []}