"""Posts router — list, preview, approve, reject, delete.""" import asyncio from fastapi import APIRouter, Query, HTTPException from loguru import logger from pydantic import BaseModel from sqlalchemy import select, func, delete, cast, Date, or_ from sqlalchemy.orm import selectinload from src.utils.database import AsyncSessionLocal from src.models.database import Post, Trend, WorkflowExecution, Notification from typing import Optional, List from datetime import datetime, timedelta router = APIRouter() # In-memory registry of active generation tasks: post_id -> asyncio.Task _generation_tasks: dict = {} def _music_snippet(p: Post) -> Optional[dict]: mt = getattr(p, "music_track", None) if not mt: return None mins, secs = divmod(mt.duration_secs or 0, 60) return { "id": mt.id, "title": mt.title, "artist": mt.artist, "mood": mt.mood, "duration_label": f"{mins}:{secs:02d}", "preview_url": mt.preview_url, } def _post_dict(p: Post, trend_topic: Optional[str] = None, workflow_id: Optional[str] = None) -> dict: return { "id": p.id, "trend_id": p.trend_id, "trend_topic": trend_topic, "workflow_id": workflow_id, "content": p.content, "post_type": getattr(p, "post_type", "image") or "image", "image_url": p.image_url, "carousel_urls": getattr(p, "carousel_urls", None), "video_url": getattr(p, "video_url", None), "link_url": getattr(p, "link_url", None), "link_title": getattr(p, "link_title", None), "link_description": getattr(p, "link_description", None), "platform": p.platform, "status": p.status, "display_status": "scheduled" if (p.status == "draft" and p.approval_status == "approved" and p.scheduled_at) else p.status, "approval_status": p.approval_status, "platform_post_id": p.platform_post_id, "scheduled_at": p.scheduled_at, "published_at": p.published_at, "created_at": p.created_at, "updated_at": p.updated_at, "generation_cost": p.generation_cost, "token_count": p.token_count, "hallucination_score": getattr(p, "hallucination_score", None), "generation_error": getattr(p, "generation_error", None), "music_id": getattr(p, "music_id", None), "music_track": _music_snippet(p), "quality_score": getattr(p, "quality_score", None), "quality_breakdown": getattr(p, "quality_breakdown", None), "ig_likes": getattr(p, "ig_likes", None), "ig_comments": getattr(p, "ig_comments", None), "ig_reach": getattr(p, "ig_reach", None), "ig_saves": getattr(p, "ig_saves", None), "ig_shares": getattr(p, "ig_shares", None), "engagement_fetched_at": getattr(p, "engagement_fetched_at", None), } async def _auto_attach_music(post_id: int, content: str, topic: str) -> None: """Detect mood from post content, find/import a matching track, attach to post.""" from src.api.routes.music import _detect_moods, _jamendo_to_track, JAMENDO_MOOD_TAGS from src.models.database import MusicTrack from src.config import get_settings import httpx combined = f"{content} {topic}" moods = _detect_moods(combined) async with AsyncSessionLocal() as session: # 1. Try local library first — pick randomly among mood-matched tracks import random as _rnd track = None for mood in moods: result = await session.execute( select(MusicTrack) .where(MusicTrack.is_active == True, MusicTrack.mood == mood) ) candidates = result.scalars().all() if candidates: track = _rnd.choice(candidates) break # 2. No local match — search Jamendo, pick random from top 10 if not track: mood = moods[0] client_id = get_settings().jamendo_client_id if client_id: try: tags = JAMENDO_MOOD_TAGS.get(mood, "upbeat") async with httpx.AsyncClient(timeout=10) as client: resp = await client.get( "https://api.jamendo.com/v3.0/tracks/", params={ "client_id": client_id, "format": "json", "limit": 10, "include": "musicinfo", "fuzzytags": tags, }, ) resp.raise_for_status() results = resp.json().get("results", []) if results: t = _jamendo_to_track(_rnd.choice(results)) track = MusicTrack( title=t["title"], artist=t.get("artist"), genre=t.get("genre"), mood=mood, bpm=t.get("bpm"), duration_secs=t.get("duration_secs"), preview_url=t.get("preview_url"), cover_url=t.get("cover_url"), tags=t.get("tags", []), ) session.add(track) await session.flush() # get track.id before commit except Exception as e: logger.warning(f"[AutoMusic] Jamendo fetch failed: {e}") if not track: return # 3. Attach to post post = await session.get(Post, post_id) if post: post.music_id = track.id await session.commit() logger.info(f"[AutoMusic] Attached track '{track.title}' (id={track.id}) to post {post_id}") async def _get_workflow_id(session, post_id: int) -> Optional[str]: """Look up the most recent workflow for a given post.""" result = await session.execute( select(WorkflowExecution.workflow_id) .where(WorkflowExecution.post_id == post_id) .order_by(WorkflowExecution.started_at.desc()) .limit(1) ) return result.scalar_one_or_none() async def _get_admin_user_id(session) -> Optional[int]: """Return the first admin user's DB id for notification targeting.""" from src.models.database import User result = await session.execute(select(User).where(User.is_active == True).limit(1)) user = result.scalar_one_or_none() return user.id if user else None async def _create_notification( session, notification_type: str, title: str, message: str, payload: Optional[dict] = None, ) -> None: user_id = await _get_admin_user_id(session) session.add( Notification( user_id=user_id, type=notification_type, title=title, message=message, payload=payload or {}, ) ) # ── List ─────────────────────────────────────────────────────────────────────── @router.get("/") async def list_posts( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), status: Optional[str] = None, platform: Optional[str] = None, post_type: Optional[str] = None, ): async with AsyncSessionLocal() as session: q = select(Post) if status: q = q.where(Post.status == status) if platform: q = q.where(Post.platform == platform) if post_type: q = q.where(Post.post_type == post_type) q = q.options(selectinload(Post.music_track)).order_by(Post.created_at.desc()).offset((page - 1) * page_size).limit(page_size) count_q = select(func.count(Post.id)) if status: count_q = count_q.where(Post.status == status) if platform: count_q = count_q.where(Post.platform == platform) if post_type: count_q = count_q.where(Post.post_type == post_type) # SQLAlchemy async sessions don't allow concurrent operations on the same session results = await session.execute(q) total_row = await session.execute(count_q) all_total_row = await session.execute(select(func.count(Post.id))) total = total_row.scalar() all_total = all_total_row.scalar() posts = results.scalars().all() # Bulk-fetch trend topics in one query trend_ids = list({p.trend_id for p in posts if p.trend_id}) trend_map: dict = {} if trend_ids: tr_rows = (await session.execute( select(Trend.id, Trend.topic).where(Trend.id.in_(trend_ids)) )).all() trend_map = {r.id: r.topic for r in tr_rows} # Bulk-fetch workflow IDs in one query post_ids = [p.id for p in posts] wf_map: dict = {} if post_ids: wf_rows = (await session.execute( select(WorkflowExecution.post_id, WorkflowExecution.workflow_id) .where(WorkflowExecution.post_id.in_(post_ids)) .distinct(WorkflowExecution.post_id) .order_by(WorkflowExecution.post_id, WorkflowExecution.started_at.desc()) )).all() for row in wf_rows: if row.post_id not in wf_map: wf_map[row.post_id] = row.workflow_id items = [ _post_dict(p, trend_map.get(p.trend_id), wf_map.get(p.id)) for p in posts ] return {"total": total, "all_total": all_total, "page": page, "page_size": page_size, "items": items} @router.get("/pending-approval") async def pending_posts( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), ): async with AsyncSessionLocal() as session: q = select(Post).options(selectinload(Post.music_track)).where(Post.status == "draft").order_by(Post.created_at.desc()).offset((page - 1) * page_size).limit(page_size) results = await session.execute(q) total_row = await session.execute(select(func.count(Post.id)).where(Post.status == "draft")) posts = results.scalars().all() trend_ids = list({p.trend_id for p in posts if p.trend_id}) trend_map: dict = {} if trend_ids: tr_rows = (await session.execute(select(Trend.id, Trend.topic).where(Trend.id.in_(trend_ids)))).all() trend_map = {r.id: r.topic for r in tr_rows} post_ids = [p.id for p in posts] wf_map: dict = {} if post_ids: wf_rows = (await session.execute( select(WorkflowExecution.post_id, WorkflowExecution.workflow_id) .where(WorkflowExecution.post_id.in_(post_ids)) .distinct(WorkflowExecution.post_id) .order_by(WorkflowExecution.post_id, WorkflowExecution.started_at.desc()) )).all() for row in wf_rows: if row.post_id not in wf_map: wf_map[row.post_id] = row.workflow_id items = [_post_dict(p, trend_map.get(p.trend_id), wf_map.get(p.id)) for p in posts] return {"total": total_row.scalar(), "page": page, "page_size": page_size, "items": items} # ── Analytics ───────────────────────────────────────────────────────────────── @router.get("/analytics/summary") async def posts_summary(): """Overall stats for the dashboard.""" async with AsyncSessionLocal() as session: total = (await session.execute(select(func.count(Post.id)))).scalar() published = ( await session.execute(select(func.count(Post.id)).where(Post.status == "published")) ).scalar() drafts = ( await session.execute(select(func.count(Post.id)).where(Post.status == "draft")) ).scalar() failed = ( await session.execute(select(func.count(Post.id)).where(Post.status == "failed")) ).scalar() by_platform = await session.execute( select(Post.platform, func.count(Post.id).label("count")) .where(Post.status == "published") .group_by(Post.platform) ) total_cost = ( await session.execute( select(func.sum(Post.generation_cost)).where(Post.generation_cost.isnot(None)) ) ).scalar() or 0.0 return { "total_posts": total, "published": published, "drafts_pending": drafts, "failed": failed, "total_cost_usd": round(total_cost, 4), "by_platform": [ {"platform": row.platform, "count": row.count} for row in by_platform.all() ], } # ── Preview ─────────────────────────────────────────────────────────────────── @router.get("/calendar") async def calendar_posts( start: datetime = Query(...), end: datetime = Query(...), limit: int = Query(default=200, le=500), ): """Posts that belong in the calendar for a date range.""" start = start.replace(tzinfo=None) end = end.replace(tzinfo=None) async with AsyncSessionLocal() as session: rows = await session.execute( select(Post) .options(selectinload(Post.music_track)) .where( or_( Post.scheduled_at.between(start, end), Post.published_at.between(start, end), Post.created_at.between(start, end), ) ) .order_by(Post.created_at.asc()) .limit(limit) ) posts = rows.scalars().all() # batch-load trends to avoid N+1 trend_ids = {p.trend_id for p in posts if p.trend_id} trend_map: dict = {} if trend_ids: t_rows = await session.execute(select(Trend).where(Trend.id.in_(trend_ids))) trend_map = {t.id: t.topic for t in t_rows.scalars().all()} items = [] for post in posts: trend_topic = trend_map.get(post.trend_id) if post.trend_id else None item = _post_dict(post, trend_topic, await _get_workflow_id(session, post.id)) item["calendar_at"] = post.scheduled_at or post.published_at or post.created_at items.append(item) return {"items": items, "start": start, "end": end, "total": len(items)} class PostEditBody(BaseModel): content: Optional[str] = None platform: Optional[str] = None post_type: Optional[str] = None scheduled_at: Optional[datetime] = None image_url: Optional[str] = None link_url: Optional[str] = None link_title: Optional[str] = None link_description: Optional[str] = None @router.put("/{post_id}") async def update_post(post_id: int, body: PostEditBody): """Edit a draft or failed post before approval/publish.""" async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") if post.status == "published": raise HTTPException(status_code=400, detail="Published posts cannot be edited") fields_set = getattr(body, "model_fields_set", getattr(body, "__fields_set__", set())) for field in ( "content", "platform", "post_type", "scheduled_at", "image_url", "link_url", "link_title", "link_description", ): if field in fields_set: setattr(post, field, getattr(body, field)) post.updated_at = datetime.utcnow() await session.commit() await session.refresh(post) trend_topic = None if post.trend_id: trend = await session.get(Trend, post.trend_id) trend_topic = trend.topic if trend else None return _post_dict(post, trend_topic, await _get_workflow_id(session, post.id)) @router.get("/{post_id}/preview") async def preview_post(post_id: int): """Return post content + image URL for preview before approving.""" async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") trend_topic = None if post.trend_id: tr = await session.get(Trend, post.trend_id) trend_topic = tr.topic if tr else None workflow_id = await _get_workflow_id(session, post.id) return { "post_id": post.id, "trend_topic": trend_topic, "workflow_id": workflow_id, "caption": post.content, "image_url": post.image_url, "platform": post.platform, "status": post.status, "character_count": len(post.content) if post.content else 0, "hashtags": [w for w in (post.content or "").split() if w.startswith("#")], } # ── Approve / Reject ───────────────────────────────────────────────────────── class ApproveBody(BaseModel): publish_mode: str = "post" # "post" | "reel" @router.post("/{post_id}/approve") async def approve_post(post_id: int, body: ApproveBody = ApproveBody()): """Approve a post — signals Temporal if workflow exists, else updates DB directly.""" async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") if post.status == "published": return {"message": "Already published", "post_id": post_id} workflow_id = await _get_workflow_id(session, post.id) # Try to signal Temporal workflow if workflow_id: try: from src.config import get_settings from temporalio.client import Client settings = get_settings() client = await Client.connect( f"{settings.temporal_host}:{settings.temporal_port}", namespace=settings.temporal_namespace, ) handle = client.get_workflow_handle(workflow_id) await handle.signal("approved") return {"message": "Approved via Temporal", "workflow_id": workflow_id} except Exception: pass # Temporal not running — fall through to direct DB update # Direct publish — call Instagram API then update DB async with AsyncSessionLocal() as session: post = (await session.execute( select(Post).where(Post.id == post_id).options(selectinload(Post.music_track)) )).scalar_one_or_none() content = post.content image_url = post.image_url carousel_urls = getattr(post, "carousel_urls", None) or [] post_type = post.post_type or "text_image" platform = post.platform or "instagram" music_url = post.music_track.preview_url if post.music_track else None # Auto-improve content quality before publishing try: from src.services.post_improver import auto_improve content, improved_score = await auto_improve(content, post_type) # Persist improved content + updated score back to DB async with AsyncSessionLocal() as session: post_upd = await session.get(Post, post_id) if post_upd: post_upd.content = content post_upd.quality_score = improved_score["score"] post_upd.quality_breakdown = improved_score["breakdown"] await session.commit() except Exception as e: logger.warning(f"Auto-improve skipped: {e}") # Normalize hashtags in caption — ensure every tag word starts with # import re as _re def _normalize_caption_hashtags(text: str) -> str: def fix_tag_line(m): words = m.group(0).split() return " ".join(w if w.startswith("#") else f"#{w}" for w in words) return _re.sub(r"^(?:#?[A-Za-z][A-Za-z0-9]{2,}\s*){3,}$", fix_tag_line, text, flags=_re.MULTILINE) content = _normalize_caption_hashtags(content) # Inject auto_hashtags from brand voice (e.g. #dailywireup #news) try: from src.models.database import BrandVoice as _BV async with AsyncSessionLocal() as _s: _bv = (await _s.execute( select(_BV).where(_BV.is_active == True).limit(1) )).scalar_one_or_none() if _bv and _bv.auto_hashtags: extra = _bv.auto_hashtags.strip() if extra and extra not in content: content = content.rstrip() + "\n" + extra except Exception: pass platform_post_id = None publish_error = None try: from src.services.social_media import SocialMediaPublisher publisher = SocialMediaPublisher() # For carousel posts send all slide images; otherwise single image if post_type == "carousel" and carousel_urls: publish_images = carousel_urls elif image_url: publish_images = [image_url] else: publish_images = [] # If user chose "reel", force reel publishing regardless of post_type effective_type = "carousel" if body.publish_mode == "reel" else post_type effective_music = music_url if body.publish_mode == "reel" else (music_url if post_type == "carousel" else None) result = await publisher.publish_post( platform=platform, text=content, image_urls=publish_images, post_type=effective_type, music_url=effective_music, ) if result["success"]: platform_post_id = result.get("platform_post_id") else: publish_error = result.get("error") logger.warning(f"Publish failed for post {post_id}: {publish_error}") except Exception as e: publish_error = str(e) logger.error(f"Publish exception for post {post_id}: {e}") async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) post.approval_status = "approved" if platform_post_id: post.status = "published" post.published_at = datetime.utcnow() post.generation_error = None post.platform_post_id = platform_post_id else: post.status = "failed" post.generation_error = publish_error or "Publish failed" await _create_notification( session, "publish_failed", f"Post #{post_id} failed to publish", post.generation_error, {"post_id": post_id, "platform": platform}, ) await session.commit() if platform_post_id: return {"message": "Approved and published to Instagram", "post_id": post_id, "platform_post_id": platform_post_id} return {"message": "Approved but publish failed", "post_id": post_id, "error": publish_error} @router.post("/{post_id}/reject") async def reject_post(post_id: int): """Reject a post — signals Temporal if workflow exists, else updates DB directly.""" async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") workflow_id = await _get_workflow_id(session, post.id) # Try to signal Temporal workflow if workflow_id: try: from src.config import get_settings from temporalio.client import Client settings = get_settings() client = await Client.connect( f"{settings.temporal_host}:{settings.temporal_port}", namespace=settings.temporal_namespace, ) handle = client.get_workflow_handle(workflow_id) await handle.signal("rejected") return {"message": "Rejected via Temporal", "workflow_id": workflow_id} except Exception: pass # Temporal not running — fall through to direct DB update # Direct DB update async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) post.approval_status = "rejected" post.status = "failed" await session.commit() return {"message": "Rejected directly", "post_id": post_id} # ── Delete ──────────────────────────────────────────────────────────────────── @router.post("/{post_id}/regenerate") async def regenerate_post(post_id: int): """Re-run the generation pipeline for a failed or draft post using the same trend.""" import asyncio from src.temporal_workflows.pipeline import ( generate_post_content, generate_images, store_media_to_cdn, save_post_draft, GeneratePostInput, GenerateImagesInput, StoreMediaInput, SavePostDraftInput, TrendData, ) async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") if post.status == "published": raise HTTPException(status_code=400, detail="Cannot regenerate a published post") trend_id = post.trend_id platform = post.platform or "instagram" post_type = getattr(post, "post_type", "image") or "image" trend_row = await session.get(Trend, trend_id) if trend_id else None if not trend_row: raise HTTPException(status_code=404, detail="Original trend not found — cannot regenerate") trend_data = TrendData( trend_id=trend_row.id, topic=trend_row.topic, source=trend_row.source, score=trend_row.score or 0.0, raw_data=trend_row.raw_data, ) # Reset post to generating state immediately async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) post.status = "generating" post.content = f"Regenerating post about: {trend_data.topic}" post.image_url = None post.generation_error = None await session.commit() async def _run(): try: post_data = await generate_post_content(GeneratePostInput(trend=trend_data, agent_version_id=1)) effective_type = post_type or post_data.post_type images = await generate_images(GenerateImagesInput( topic=trend_data.topic, count=1, image_prompt=post_data.image_prompt, post_type=effective_type, carousel_image_prompts=post_data.carousel_image_prompts if effective_type == "carousel" else None, )) cdn_images = await store_media_to_cdn(StoreMediaInput(images=images, post_id=post_id)) await save_post_draft(SavePostDraftInput( post=post_data, trend_id=trend_id, agent_version_id=1, images=cdn_images, platform=platform, post_type=post_type, placeholder_id=post_id, )) logger.info(f"[Regenerate] Post {post_id} regenerated successfully") except Exception as e: logger.error(f"[Regenerate] Failed for post {post_id}: {e}") async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if post: post.status = "failed" post.content = f"Regeneration failed: {e}" post.generation_error = str(e) await _create_notification( session, "generation_failed", f"Post #{post_id} regeneration failed", str(e), {"post_id": post_id}, ) await session.commit() asyncio.create_task(_run()) return {"message": "Regeneration started", "post_id": post_id} @router.post("/{post_id}/regenerate-image") async def regenerate_image(post_id: int): """Re-generate only the image for a post, keeping the caption unchanged.""" import asyncio from src.temporal_workflows.pipeline import ( generate_images, store_media_to_cdn, GenerateImagesInput, StoreMediaInput, TrendData, ) async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") if post.status == "published": raise HTTPException(status_code=400, detail="Cannot regenerate image on a published post") trend_row = await session.get(Trend, post.trend_id) if post.trend_id else None topic = trend_row.topic if trend_row else (post.content or "")[:80] post_type = getattr(post, "post_type", "image") or "image" async def _run_image(): try: images = await generate_images(GenerateImagesInput( topic=topic, count=1, image_prompt=None, post_type=post_type, )) cdn = await store_media_to_cdn(StoreMediaInput(images=images, post_id=post_id)) new_url = cdn.get("image_url") or cdn.get("cover_url") if new_url: async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if post: post.image_url = new_url post.updated_at = datetime.utcnow() await session.commit() logger.info(f"[RegenerateImage] Post {post_id} new image: {new_url}") except Exception as e: logger.error(f"[RegenerateImage] Failed for post {post_id}: {e}") asyncio.create_task(_run_image()) return {"message": "Image regeneration started", "post_id": post_id} @router.get("/{post_id}/insights") async def post_insights(post_id: int): """Fetch Instagram media insights for a published post.""" import httpx, os async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") media_id = post.platform_post_id platform = post.platform or "instagram" if not media_id: return {"available": False, "reason": "Post not published or missing platform ID"} if platform.lower() != "instagram": return {"available": False, "reason": f"Insights only available for Instagram (this is {platform})"} from src.config import get_settings as _gs token = _gs().instagram_access_token or os.getenv("INSTAGRAM_ACCESS_TOKEN", "") if not token: return {"available": False, "reason": "Instagram token not configured"} try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get( f"https://graph.facebook.com/v22.0/{media_id}/insights", params={ "metric": "reach,saved,likes,comments,shares,total_interactions,views", "access_token": token, }, ) data = resp.json() if "error" in data: return {"available": False, "reason": data["error"].get("message", "API error")} metrics = {} for item in data.get("data", []): val = item.get("values", [{}])[0].get("value", 0) if item.get("values") else item.get("value", 0) metrics[item["name"]] = val return {"available": True, "media_id": media_id, "metrics": metrics} except Exception as e: return {"available": False, "reason": str(e)} class SchedulePostBody(BaseModel): scheduled_at: datetime @router.post("/{post_id}/schedule") @router.put("/{post_id}/schedule") async def schedule_post(post_id: int, body: SchedulePostBody): """Schedule or reschedule a post (POST = new schedule, PUT = drag-and-drop reschedule).""" async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") if post.status == "published": raise HTTPException(status_code=400, detail="Post already published") scheduled_at = body.scheduled_at.replace(tzinfo=None) post.scheduled_at = scheduled_at post.approval_status = "approved" post.updated_at = datetime.utcnow() await session.commit() return {"message": "Post scheduled", "post_id": post_id, "scheduled_at": scheduled_at.isoformat()} @router.post("/{post_id}/discard") async def discard_post(post_id: int): """Discard a draft post — hard deletes it so it never gets published.""" async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") if post.status not in ("draft",): raise HTTPException(status_code=400, detail=f"Cannot discard a post with status '{post.status}'") await session.delete(post) await session.commit() return {"discarded": True, "post_id": post_id} @router.delete("/{post_id}") async def delete_post(post_id: int): """Hard delete a single post by ID.""" async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") await session.delete(post) await session.commit() return {"deleted": 1} class GeneratePostBody(BaseModel): trend_id: Optional[int] = None # if None, picks the top pending trend platform: str = "instagram" post_type: str = "image" @router.post("/generate") async def generate_post(body: GeneratePostBody): """Run the full generation pipeline for one trend without Temporal.""" import asyncio from src.temporal_workflows.pipeline import ( generate_post_content, generate_images, store_media_to_cdn, save_post_draft, GeneratePostInput, GenerateImagesInput, StoreMediaInput, SavePostDraftInput, TrendData, ) # Pick trend async with AsyncSessionLocal() as session: if body.trend_id: trend_row = await session.get(Trend, body.trend_id) if not trend_row: raise HTTPException(status_code=404, detail="Trend not found") else: from sqlalchemy import func as sqlfunc from datetime import datetime as dt hours_old = sqlfunc.greatest( sqlfunc.extract("epoch", dt.utcnow() - Trend.created_at) / 3600.0, 0.01 ) boosted = Trend.score / (1.0 + hours_old / 12.0) result = await session.execute( select(Trend) .where(Trend.status == "pending_generation") .order_by(boosted.desc()) .limit(1) ) trend_row = result.scalar_one_or_none() if not trend_row: raise HTTPException(status_code=404, detail="No pending trends available") trend_id = trend_row.id trend_data = TrendData( trend_id=trend_row.id, topic=trend_row.topic, source=trend_row.source, score=trend_row.score or 0.0, raw_data=trend_row.raw_data, ) # ── Duplicate check: find semantically similar existing posts ────────── from src.services.embeddings import get_embedding, find_similar_posts trend_embedding = await get_embedding(trend_data.topic) duplicate_warning = None if trend_embedding: similar = await find_similar_posts(trend_embedding, threshold=0.88, limit=1) if similar: duplicate_warning = { "post_id": similar[0]["id"], "similarity": round(float(similar[0]["similarity"]) * 100), "content_preview": (similar[0]["content"] or "")[:80], } # ── Insert placeholder row immediately so the UI can track progress ── async with AsyncSessionLocal() as session: placeholder = Post( trend_id=trend_id, agent_version_id=1, content=f"Generating post about: {trend_data.topic}", post_type=body.post_type, platform=body.platform, status="generating", approval_status="pending", generation_error=None, ) session.add(placeholder) await session.commit() await session.refresh(placeholder) placeholder_id = placeholder.id async def _run_pipeline(): from src.agents.post_generator import set_post_step try: set_post_step(placeholder_id, "Analysing trend...") post_data = await generate_post_content(GeneratePostInput(trend=trend_data, agent_version_id=1, post_type=body.post_type, post_id=placeholder_id)) set_post_step(placeholder_id, "Generating image(s)...") carousel_prompts = post_data.carousel_image_prompts if body.post_type == "carousel" else None img_count = len(carousel_prompts) if carousel_prompts else (6 if body.post_type == "carousel" else 1) images = await generate_images(GenerateImagesInput( topic=trend_data.topic, count=img_count, image_prompt=post_data.image_prompt, post_type=body.post_type, carousel_image_prompts=carousel_prompts, post_id=placeholder_id, )) set_post_step(placeholder_id, "Storing media...") cdn_images = await store_media_to_cdn(StoreMediaInput(images=images, post_id=placeholder_id)) saved = await save_post_draft(SavePostDraftInput( post=post_data, trend_id=trend_id, agent_version_id=1, images=cdn_images, platform=body.platform, post_type=body.post_type, placeholder_id=placeholder_id, token_count=post_data.token_count, generation_cost=post_data.generation_cost, hallucination_score=post_data.hallucination_score, )) # Mark trend as used async with AsyncSessionLocal() as session: t = await session.get(Trend, trend_id) if t: t.status = "generated" t.used_for_post = True await session.commit() set_post_step(placeholder_id, "Matching music to topic mood...") # Auto-attach music based on post content + trend topic await _auto_attach_music(placeholder_id, post_data.content, trend_data.topic) set_post_step(placeholder_id, "Scoring quality — hook, CTA, hashtags, brand fit...") # Compute quality score, auto-improve if weak try: from src.services.quality_scorer import score_post from src.services.post_improver import auto_improve from src.models.database import BrandVoice async with AsyncSessionLocal() as session: brand = await session.execute( select(BrandVoice).where(BrandVoice.is_active == True).limit(1) ) brand_voice = brand.scalar_one_or_none() initial = score_post(post_data.content, body.post_type, brand_voice, body.platform) set_post_step(placeholder_id, f"Initial score: {initial['score']}/100 ({initial['grade']})") logger.info(f"[Quality] Post {placeholder_id} initial score {initial['score']}/100 ({initial['grade']})") final_content = post_data.content final_result = initial if initial["score"] < 70: set_post_step(placeholder_id, f"Score {initial['score']}/100 — auto-improving...") final_content, final_result = await auto_improve(post_data.content, body.post_type, brand_voice, post_id=placeholder_id) post = await session.get(Post, placeholder_id) if post: post.content = final_content post.quality_score = final_result["score"] post.quality_breakdown = final_result["breakdown"] await session.commit() logger.info(f"[Quality] Post {placeholder_id} final score {final_result['score']}/100 ({final_result['grade']})") except Exception as qe: logger.warning(f"[Quality] Scoring failed for post {placeholder_id}: {qe}") logger.info(f"[Generate] Post {placeholder_id} generation complete") # Budget alert check try: from src.config import get_settings as _gs _budget = _gs().monthly_budget_usd if _budget and _budget > 0: from datetime import datetime as _dt _month = _dt.utcnow().strftime("%Y-%m") async with AsyncSessionLocal() as _bs: _spent = (await _bs.execute( select(func.sum(Post.generation_cost)) .where(Post.generation_cost.isnot(None)) .where(func.to_char(Post.created_at, "YYYY-MM") == _month) )).scalar() or 0.0 if _spent >= _budget: _exists = (await _bs.execute( select(Notification).where( Notification.type == "budget_exceeded", ).order_by(Notification.created_at.desc()).limit(1) )).scalar_one_or_none() if not _exists: _bs.add(Notification( type="budget_exceeded", title="Monthly budget exceeded", message=f"Generation cost this month: ${_spent:.4f} — over your ${_budget:.2f} limit.", is_read=False, )) await _bs.commit() logger.warning(f"[Budget] Monthly spend ${_spent:.4f} exceeded limit ${_budget:.2f}") except Exception: pass # Clear step tracker so the UI stops showing "Scoring quality..." from src.agents.post_generator import _post_steps _post_steps.pop(placeholder_id, None) # Force WS push in case PG NOTIFY isn't reaching us (Neon pooled connections) from src.utils.notify import mark_dirty mark_dirty("posts_changed") except Exception as e: logger.error(f"[Generate] Pipeline failed for placeholder {placeholder_id}: {e}") async with AsyncSessionLocal() as session: post = await session.get(Post, placeholder_id) if post: post.status = "failed" post.content = f"Generation failed: {e}" post.generation_error = str(e) await _create_notification( session, "generation_failed", f"Post #{placeholder_id} generation failed", str(e), {"post_id": placeholder_id, "trend_id": trend_id}, ) await session.commit() from src.agents.post_generator import _post_steps _post_steps.pop(placeholder_id, None) from src.utils.notify import mark_dirty mark_dirty("posts_changed") # Fire pipeline in background — return immediately with placeholder id task = asyncio.create_task(_run_pipeline()) _generation_tasks[placeholder_id] = task task.add_done_callback(lambda _: _generation_tasks.pop(placeholder_id, None)) return { "message": "Generation started", "post_id": placeholder_id, "trend_id": trend_id, "topic": trend_data.topic, "platform": body.platform, "duplicate_warning": duplicate_warning, } @router.post("/{post_id}/cancel-generation") async def cancel_generation(post_id: int): """Cancel an in-progress post generation and mark the post as discarded.""" task = _generation_tasks.get(post_id) if task and not task.done(): task.cancel() async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") if post.status not in ("generating",): raise HTTPException(status_code=400, detail=f"Post is not generating (status: {post.status})") post.status = "failed" post.generation_error = "Cancelled by user" await session.commit() return {"post_id": post_id, "cancelled": True} class ManualPostBody(BaseModel): content: str platform: str = "instagram" post_type: str = "text_only" image_url: Optional[str] = None link_url: Optional[str] = None link_title: Optional[str] = None link_description: Optional[str] = None @router.post("/manual") async def create_manual_post(body: ManualPostBody): """Create a post manually — no AI generation, saves directly as draft.""" async with AsyncSessionLocal() as session: post = Post( content=body.content, platform=body.platform, post_type=body.post_type, image_url=body.image_url or None, link_url=body.link_url or None, link_title=body.link_title or None, link_description=body.link_description or None, status="draft", approval_status="pending", ) session.add(post) await session.commit() await session.refresh(post) return _post_dict(post) class MusicAttachBody(BaseModel): music_id: Optional[int] = None @router.patch("/{post_id}/music") async def attach_music(post_id: int, body: MusicAttachBody): """Attach or detach a music track from a post.""" async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") post.music_id = body.music_id post.updated_at = datetime.utcnow() await session.commit() return {"post_id": post_id, "music_id": body.music_id} class BulkIdsBody(BaseModel): ids: List[int] @router.post("/bulk-approve") async def bulk_approve_posts(body: BulkIdsBody): """Queue multiple draft posts for immediate publish.""" import asyncio async def _approve(post_id: int): async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post or post.status not in ("draft", "failed"): return {"post_id": post_id, "ok": False, "error": "Invalid status"} try: from src.services.scheduler import run_auto_publish await run_auto_publish(post_id) return {"post_id": post_id, "ok": True} except Exception as e: return {"post_id": post_id, "ok": False, "error": str(e)} results = await asyncio.gather(*[_approve(pid) for pid in body.ids]) return {"results": list(results), "approved": sum(1 for r in results if r["ok"])} class BulkScheduleBody(BaseModel): ids: List[int] scheduled_at: datetime @router.post("/bulk-schedule") async def bulk_schedule_posts(body: BulkScheduleBody): """Approve + schedule multiple draft posts for the same time slot.""" if not body.ids: return {"scheduled": 0} scheduled_at = body.scheduled_at.replace(tzinfo=None) async with AsyncSessionLocal() as session: rows = await session.execute(select(Post).where(Post.id.in_(body.ids))) posts = rows.scalars().all() count = 0 for p in posts: if p.status == "draft": p.approval_status = "approved" p.scheduled_at = scheduled_at count += 1 await session.commit() return {"scheduled": count, "scheduled_at": scheduled_at.isoformat()} @router.post("/bulk-reject") async def bulk_reject_posts(body: BulkIdsBody): """Reject multiple draft posts.""" if not body.ids: return {"rejected": 0} async with AsyncSessionLocal() as session: rows = await session.execute(select(Post).where(Post.id.in_(body.ids))) posts = rows.scalars().all() for p in posts: if p.status == "draft": p.approval_status = "rejected" p.status = "failed" await session.commit() return {"rejected": len(body.ids)} @router.post("/bulk-delete") async def bulk_delete_posts(body: BulkIdsBody): """Hard delete multiple posts by ID list.""" if not body.ids: return {"deleted": 0} async with AsyncSessionLocal() as session: result = await session.execute( delete(Post).where(Post.id.in_(body.ids)) ) await session.commit() return {"deleted": result.rowcount} # ── Extended Analytics ───────────────────────────────────────────────────────── @router.get("/analytics/timeseries") async def posts_timeseries(days: int = Query(30, ge=7, le=365)): """Daily post counts for the last N days — published + drafted.""" cutoff = datetime.utcnow() - timedelta(days=days) async with AsyncSessionLocal() as session: rows = await session.execute( select( cast(Post.created_at, Date).label("day"), Post.status, func.count(Post.id).label("count"), ) .where(Post.created_at >= cutoff) .group_by(cast(Post.created_at, Date), Post.status) .order_by(cast(Post.created_at, Date)) ) data: dict = {} for row in rows.all(): day = str(row.day) if day not in data: data[day] = {"date": day, "published": 0, "draft": 0, "failed": 0} key = row.status if row.status in ("published", "draft", "failed") else "draft" data[day][key] += row.count return {"days": days, "series": list(data.values())} @router.get("/analytics/by-type") async def posts_by_type(): """Breakdown of post counts by post_type.""" async with AsyncSessionLocal() as session: rows = await session.execute( select(Post.post_type, func.count(Post.id).label("count")) .group_by(Post.post_type) ) return [{"post_type": r.post_type or "image", "count": r.count} for r in rows.all()] @router.get("/analytics/top-posts") async def top_posts(limit: int = Query(5, ge=1, le=20)): """Most recently published posts — used for 'top content' panel.""" async with AsyncSessionLocal() as session: rows = await session.execute( select(Post) .options(selectinload(Post.music_track)) .where(Post.status == "published") .order_by(Post.published_at.desc()) .limit(limit) ) posts = rows.scalars().all() items = [] for p in posts: trend_topic = None if p.trend_id: tr = await session.get(Trend, p.trend_id) trend_topic = tr.topic if tr else None items.append({**_post_dict(p, trend_topic), "char_count": len(p.content or "")}) return {"items": items} @router.get("/analytics/posting-hours") async def posting_hours(): """Hour-of-day breakdown of published posts (0–23) for heatmap.""" async with AsyncSessionLocal() as session: rows = await session.execute( select( func.extract("hour", Post.published_at).label("hour"), func.count(Post.id).label("count"), ) .where(Post.published_at.isnot(None)) .group_by(func.extract("hour", Post.published_at)) .order_by(func.extract("hour", Post.published_at)) ) hour_map = {int(r.hour): r.count for r in rows.all()} return [{"hour": h, "count": hour_map.get(h, 0)} for h in range(24)] @router.get("/analytics/instagram-engagement") async def instagram_engagement(limit: int = Query(default=20, le=50)): """Aggregate Instagram engagement metrics across recently published posts.""" import httpx, os from src.config import get_settings as _gs2 token = _gs2().instagram_access_token or os.getenv("INSTAGRAM_ACCESS_TOKEN", "") if not token: return {"available": False, "reason": "Instagram token not configured", "posts": [], "totals": {}} async with AsyncSessionLocal() as session: rows = await session.execute( select(Post.id, Post.content, Post.image_url, Post.platform_post_id, Post.published_at) .where(Post.status == "published", Post.platform_post_id.isnot(None), Post.platform == "instagram") .order_by(Post.published_at.desc()) .limit(limit) ) posts = rows.all() if not posts: return {"available": True, "posts": [], "totals": {"views": 0, "reach": 0, "likes": 0, "comments": 0, "saved": 0, "shares": 0, "total_interactions": 0}} async def _fetch(post_row): try: async with httpx.AsyncClient(timeout=8) as client: resp = await client.get( f"https://graph.facebook.com/v22.0/{post_row.platform_post_id}/insights", params={"metric": "reach,saved,likes,comments,shares,total_interactions,views", "access_token": token}, ) data = resp.json() if "error" in data: return None metrics = {} for item in data.get("data", []): val = item.get("values", [{}])[0].get("value", 0) if item.get("values") else item.get("value", 0) metrics[item["name"]] = val return { "post_id": post_row.id, "media_id": post_row.platform_post_id, "content_preview": (post_row.content or "")[:80], "image_url": post_row.image_url, "published_at": post_row.published_at.isoformat() if post_row.published_at else None, **metrics, } except Exception: return None import asyncio results = await asyncio.gather(*[_fetch(p) for p in posts]) valid = [r for r in results if r] totals = {"views": 0, "reach": 0, "likes": 0, "comments": 0, "saved": 0, "shares": 0, "total_interactions": 0} for r in valid: for k in totals: totals[k] += r.get(k, 0) return {"available": True, "posts": valid, "totals": totals} @router.get("/{post_id}/similar") async def similar_posts( post_id: int, threshold: float = Query(0.80, ge=0.5, le=1.0), limit: int = Query(5, ge=1, le=20), ): """Find posts semantically similar to a given post.""" async with AsyncSessionLocal() as session: post = await session.get(Post, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") if post.content_embedding is None: from src.services.embeddings import get_embedding emb = await get_embedding(post.content or "") if not emb: return {"items": [], "note": "No embedding available for this post"} else: emb = list(post.content_embedding) from src.services.embeddings import find_similar_posts items = await find_similar_posts(emb, threshold=threshold, limit=limit, exclude_id=post_id) return {"items": items} @router.get("/{post_id}") async def get_post(post_id: int): """Fetch a single post by ID — same shape as the list endpoint items.""" async with AsyncSessionLocal() as session: result = await session.execute( select(Post).where(Post.id == post_id).options(selectinload(Post.music_track)) ) post = result.scalar_one_or_none() if not post: raise HTTPException(status_code=404, detail="Post not found") trend_topic = None if post.trend_id: t = await session.get(Trend, post.trend_id) trend_topic = t.topic if t else None return _post_dict(post, trend_topic, await _get_workflow_id(session, post.id)) _sched_status_cache: dict = {"ts": 0.0, "data": None} @router.get("/analytics/scheduler-status") async def scheduler_status(): """Live scheduler info — cached for 30s to avoid repeated DB queries.""" import time from src.services.scheduler import scheduler from src.config import get_settings now = time.monotonic() if _sched_status_cache["data"] and now - _sched_status_cache["ts"] < 30: return _sched_status_cache["data"] settings = get_settings() jobs = [] for job in scheduler.get_jobs(): next_run = job.next_run_time jobs.append({ "id": job.id, "name": job.name, "next_run": next_run.isoformat() if next_run else None, }) today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) async with AsyncSessionLocal() as session: today_count = (await session.execute( select(func.count(Post.id)).where(Post.created_at >= today_start) )).scalar() published_today = (await session.execute( select(func.count(Post.id)) .where(Post.published_at >= today_start) .where(Post.status == "published") )).scalar() # Include currently-running jobs so UI can show a "scheduler busy" indicator try: from src.services.scheduler import get_running_jobs running_jobs = get_running_jobs() except Exception: running_jobs = {} result = { "scheduler_running": scheduler.running, "posts_per_day": settings.posts_per_day, "auto_approve": settings.auto_approve, "jobs": jobs, "today_generated": today_count, "today_published": published_today, "running_jobs": running_jobs, "is_busy": bool(running_jobs), } # Don't cache when busy — UI needs frequent updates if not running_jobs: _sched_status_cache["data"] = result _sched_status_cache["ts"] = now return result class HashtagSuggestBody(BaseModel): content: str topic: str = "" @router.post("/suggest-hashtags") async def suggest_hashtags(body: HashtagSuggestBody): """Use LLM to suggest 8-10 relevant hashtags for a post.""" from src.agents.post_generator import _build_attempts import httpx, json as _json prompt = ( f"Generate 8-10 highly relevant Instagram hashtags for this post.\n" f"Topic: {body.topic}\n" f"Caption: {body.content[:400]}\n\n" f"Rules: Mix popular (#news) and niche (#dailywireup) tags. Each must start with #. " f"No spaces in tags. Output ONLY a JSON array of strings, e.g. [\"#tag1\",\"#tag2\"]. Nothing else." ) attempts = await _build_attempts() for attempt in attempts[:3]: # only try first 3 models if attempt.get("use_sdk"): continue try: base = (attempt["api_base"] or "").rstrip("/") if not base.startswith("http"): base = "https://" + base async with httpx.AsyncClient(timeout=10) as client: resp = await client.post( f"{base}/chat/completions", headers={"Authorization": f"Bearer {attempt['api_key']}", "Content-Type": "application/json"}, json={"model": attempt["model"], "messages": [{"role": "user", "content": prompt}], "temperature": 0.7, "max_tokens": 200}, ) resp.raise_for_status() raw = resp.json()["choices"][0]["message"]["content"].strip() # Parse JSON array start, end = raw.find("["), raw.rfind("]") if start != -1 and end != -1: tags = _json.loads(raw[start:end+1]) tags = [t if t.startswith("#") else f"#{t}" for t in tags if t.strip()] return {"hashtags": tags[:10]} except Exception: continue return {"hashtags": []}