Spaces:
Running
Running
| """Posts router — list, preview, approve, reject, delete.""" | |
| import asyncio | |
| from fastapi import APIRouter, Query, HTTPException | |
| from loguru import logger | |
| from pydantic import BaseModel | |
| from sqlalchemy import select, func, delete, cast, Date, or_ | |
| from sqlalchemy.orm import selectinload | |
| from src.utils.database import AsyncSessionLocal | |
| from src.models.database import Post, Trend, WorkflowExecution, Notification | |
| from typing import Optional, List | |
| from datetime import datetime, timedelta | |
# APIRouter for the posts endpoints described in the module docstring.
router = APIRouter()
# In-memory registry of active generation tasks: post_id -> asyncio.Task
# NOTE(review): this is a plain module-level dict, so it is per-process and is
# lost on restart — confirm nothing relies on it across workers.
_generation_tasks: dict = {}
| def _music_snippet(p: Post) -> Optional[dict]: | |
| mt = getattr(p, "music_track", None) | |
| if not mt: | |
| return None | |
| mins, secs = divmod(mt.duration_secs or 0, 60) | |
| return { | |
| "id": mt.id, | |
| "title": mt.title, | |
| "artist": mt.artist, | |
| "mood": mt.mood, | |
| "duration_label": f"{mins}:{secs:02d}", | |
| "preview_url": mt.preview_url, | |
| } | |
def _post_dict(p: Post, trend_topic: Optional[str] = None, workflow_id: Optional[str] = None) -> dict:
    """Serialise a Post into the flat dict returned by the posts endpoints.

    ``trend_topic`` and ``workflow_id`` are passed through unchanged; the
    caller looks them up separately. Attributes read via ``getattr`` fall back
    to ``None`` when absent (``post_type`` falls back to ``"image"``).
    """

    def optional(attr: str):
        # Missing attribute is reported as None rather than raising.
        return getattr(p, attr, None)

    # A draft that is approved and has a schedule time is displayed as "scheduled".
    awaiting_schedule = p.status == "draft" and p.approval_status == "approved" and p.scheduled_at
    display_status = "scheduled" if awaiting_schedule else p.status

    return {
        "id": p.id,
        "trend_id": p.trend_id,
        "trend_topic": trend_topic,
        "workflow_id": workflow_id,
        "content": p.content,
        "post_type": getattr(p, "post_type", "image") or "image",
        "image_url": p.image_url,
        "carousel_urls": optional("carousel_urls"),
        "video_url": optional("video_url"),
        "link_url": optional("link_url"),
        "link_title": optional("link_title"),
        "link_description": optional("link_description"),
        "platform": p.platform,
        "status": p.status,
        "display_status": display_status,
        "approval_status": p.approval_status,
        "platform_post_id": p.platform_post_id,
        "scheduled_at": p.scheduled_at,
        "published_at": p.published_at,
        "created_at": p.created_at,
        "updated_at": p.updated_at,
        "generation_cost": p.generation_cost,
        "token_count": p.token_count,
        "hallucination_score": optional("hallucination_score"),
        "generation_error": optional("generation_error"),
        "music_id": optional("music_id"),
        "music_track": _music_snippet(p),
        "quality_score": optional("quality_score"),
        "quality_breakdown": optional("quality_breakdown"),
        "ig_likes": optional("ig_likes"),
        "ig_comments": optional("ig_comments"),
        "ig_reach": optional("ig_reach"),
        "ig_saves": optional("ig_saves"),
        "ig_shares": optional("ig_shares"),
        "engagement_fetched_at": optional("engagement_fetched_at"),
    }
async def _auto_attach_music(post_id: int, content: str, topic: str) -> None:
    """Detect mood from post content, find/import a matching track, attach to post.

    Steps:
      1. Run ``_detect_moods`` on ``content`` + ``topic``.
      2. For each mood in order, pick a random active local ``MusicTrack``.
      3. If none matches and a Jamendo client id is configured, query Jamendo
         for the first mood and import one random result as a new track.
      4. Set ``post.music_id`` and commit.

    Returns silently when no mood is detected, no track can be found, or the
    post does not exist. Jamendo/import failures are logged, not raised.
    """
    from src.api.routes.music import _detect_moods, _jamendo_to_track, JAMENDO_MOOD_TAGS
    from src.models.database import MusicTrack
    from src.config import get_settings
    import httpx
    import random as _rnd

    combined = f"{content} {topic}"
    moods = _detect_moods(combined)
    if not moods:
        # Without this guard the Jamendo fallback would raise IndexError on moods[0].
        return

    async with AsyncSessionLocal() as session:
        # 1. Try local library first — pick randomly among mood-matched tracks
        track = None
        for mood in moods:
            result = await session.execute(
                select(MusicTrack)
                .where(MusicTrack.is_active == True, MusicTrack.mood == mood)  # noqa: E712
            )
            candidates = result.scalars().all()
            if candidates:
                track = _rnd.choice(candidates)
                break

        # 2. No local match — search Jamendo, pick random from top 10
        if not track:
            mood = moods[0]
            client_id = get_settings().jamendo_client_id
            if client_id:
                try:
                    tags = JAMENDO_MOOD_TAGS.get(mood, "upbeat")
                    async with httpx.AsyncClient(timeout=10) as client:
                        resp = await client.get(
                            "https://api.jamendo.com/v3.0/tracks/",
                            params={
                                "client_id": client_id,
                                "format": "json",
                                "limit": 10,
                                "include": "musicinfo",
                                "fuzzytags": tags,
                            },
                        )
                        resp.raise_for_status()
                        results = resp.json().get("results", [])
                    if results:
                        t = _jamendo_to_track(_rnd.choice(results))
                        imported = MusicTrack(
                            title=t["title"],
                            artist=t.get("artist"),
                            genre=t.get("genre"),
                            mood=mood,
                            bpm=t.get("bpm"),
                            duration_secs=t.get("duration_secs"),
                            preview_url=t.get("preview_url"),
                            cover_url=t.get("cover_url"),
                            tags=t.get("tags", []),
                        )
                        session.add(imported)
                        await session.flush()  # get track.id before commit
                        # Only expose the track once the flush succeeded, so a
                        # failed insert can never be attached with id=None.
                        track = imported
                except Exception as e:
                    logger.warning(f"[AutoMusic] Jamendo fetch failed: {e}")
                    # A failed flush leaves the session unusable until rolled back.
                    await session.rollback()

        if not track:
            return

        # 3. Attach to post
        post = await session.get(Post, post_id)
        if post:
            post.music_id = track.id
            await session.commit()
            logger.info(f"[AutoMusic] Attached track '{track.title}' (id={track.id}) to post {post_id}")
async def _get_workflow_id(session, post_id: int) -> Optional[str]:
    """Return the workflow_id of the latest-started WorkflowExecution for a post.

    Returns ``None`` when the post has no workflow execution rows.
    """
    newest_first = WorkflowExecution.started_at.desc()
    stmt = select(WorkflowExecution.workflow_id).where(WorkflowExecution.post_id == post_id)
    stmt = stmt.order_by(newest_first).limit(1)
    rows = await session.execute(stmt)
    return rows.scalar_one_or_none()
async def _get_admin_user_id(session) -> Optional[int]:
    """Return the DB id of the user that notifications are addressed to.

    Picks the active user with the lowest id, or ``None`` when there is no
    active user. The explicit ordering makes the choice stable between calls;
    ``LIMIT 1`` without ``ORDER BY`` may return a different row each time.

    NOTE(review): despite the function name, no admin/role filter is applied —
    only ``is_active`` is checked. Confirm whether a role column should be used.
    """
    from src.models.database import User

    result = await session.execute(
        select(User.id)
        .where(User.is_active == True)  # noqa: E712
        .order_by(User.id)
        .limit(1)
    )
    return result.scalar_one_or_none()
async def _create_notification(
    session,
    notification_type: str,
    title: str,
    message: str,
    payload: Optional[dict] = None,
) -> None:
    """Add a Notification row to ``session`` without committing.

    The recipient is whatever ``_get_admin_user_id`` returns (possibly None).
    A missing or empty ``payload`` is stored as ``{}``.
    """
    recipient_id = await _get_admin_user_id(session)
    notification = Notification(
        user_id=recipient_id,
        type=notification_type,
        title=title,
        message=message,
        payload=payload if payload else {},
    )
    session.add(notification)
| # ── List ─────────────────────────────────────────────────────────────────────── | |
async def list_posts(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    status: Optional[str] = None,
    platform: Optional[str] = None,
    post_type: Optional[str] = None,
):
    """Return one page of posts, newest first, with optional equality filters.

    Args:
        page: 1-based page number.
        page_size: Rows per page (1-100).
        status / platform / post_type: When given, only posts whose column
            equals the value are returned.

    Returns:
        ``total`` (rows matching the filters), ``all_total`` (all posts),
        ``page``, ``page_size`` and ``items`` (``_post_dict`` per post).
    """
    # Build the filter list once so the page query and the count query cannot drift apart.
    filters = []
    if status:
        filters.append(Post.status == status)
    if platform:
        filters.append(Post.platform == platform)
    if post_type:
        filters.append(Post.post_type == post_type)

    async with AsyncSessionLocal() as session:
        q = (
            select(Post)
            .where(*filters)
            .options(selectinload(Post.music_track))
            # id breaks ties between posts created at the same instant; without
            # it OFFSET/LIMIT paging can repeat or skip rows.
            .order_by(Post.created_at.desc(), Post.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
        )
        count_q = select(func.count(Post.id)).where(*filters)

        # SQLAlchemy async sessions don't allow concurrent operations on the same session
        results = await session.execute(q)
        total_row = await session.execute(count_q)
        all_total_row = await session.execute(select(func.count(Post.id)))
        total = total_row.scalar()
        all_total = all_total_row.scalar()
        posts = results.scalars().all()

        # Bulk-fetch trend topics in one query
        trend_ids = list({p.trend_id for p in posts if p.trend_id})
        trend_map: dict = {}
        if trend_ids:
            tr_rows = (await session.execute(
                select(Trend.id, Trend.topic).where(Trend.id.in_(trend_ids))
            )).all()
            trend_map = {r.id: r.topic for r in tr_rows}

        # Bulk-fetch workflow IDs in one query (latest started_at per post)
        post_ids = [p.id for p in posts]
        wf_map: dict = {}
        if post_ids:
            wf_rows = (await session.execute(
                select(WorkflowExecution.post_id, WorkflowExecution.workflow_id)
                .where(WorkflowExecution.post_id.in_(post_ids))
                .distinct(WorkflowExecution.post_id)
                .order_by(WorkflowExecution.post_id, WorkflowExecution.started_at.desc())
            )).all()
            for row in wf_rows:
                # Keep the first (latest) row per post.
                wf_map.setdefault(row.post_id, row.workflow_id)

        items = [
            _post_dict(p, trend_map.get(p.trend_id), wf_map.get(p.id))
            for p in posts
        ]
    return {"total": total, "all_total": all_total, "page": page, "page_size": page_size, "items": items}
async def pending_posts(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
):
    """Return one page of posts whose status is ``"draft"``, newest first.

    Returns:
        ``total`` (number of draft posts), ``page``, ``page_size`` and
        ``items`` (``_post_dict`` per post).
    """
    is_draft = Post.status == "draft"
    async with AsyncSessionLocal() as session:
        q = (
            select(Post)
            .options(selectinload(Post.music_track))
            .where(is_draft)
            # id breaks ties between posts created at the same instant; without
            # it OFFSET/LIMIT paging can repeat or skip rows.
            .order_by(Post.created_at.desc(), Post.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
        )
        results = await session.execute(q)
        total_row = await session.execute(select(func.count(Post.id)).where(is_draft))
        posts = results.scalars().all()

        # Bulk-fetch trend topics in one query
        trend_ids = list({p.trend_id for p in posts if p.trend_id})
        trend_map: dict = {}
        if trend_ids:
            tr_rows = (await session.execute(
                select(Trend.id, Trend.topic).where(Trend.id.in_(trend_ids))
            )).all()
            trend_map = {r.id: r.topic for r in tr_rows}

        # Bulk-fetch workflow IDs in one query (latest started_at per post)
        post_ids = [p.id for p in posts]
        wf_map: dict = {}
        if post_ids:
            wf_rows = (await session.execute(
                select(WorkflowExecution.post_id, WorkflowExecution.workflow_id)
                .where(WorkflowExecution.post_id.in_(post_ids))
                .distinct(WorkflowExecution.post_id)
                .order_by(WorkflowExecution.post_id, WorkflowExecution.started_at.desc())
            )).all()
            for row in wf_rows:
                # Keep the first (latest) row per post.
                wf_map.setdefault(row.post_id, row.workflow_id)

        items = [_post_dict(p, trend_map.get(p.trend_id), wf_map.get(p.id)) for p in posts]
    return {"total": total_row.scalar(), "page": page, "page_size": page_size, "items": items}
| # ── Analytics ───────────────────────────────────────────────────────────────── | |
async def posts_summary():
    """Overall stats for the dashboard.

    Returns post counts (all, published, draft, failed), the summed
    ``generation_cost`` rounded to 4 places, and published counts per platform.
    """
    async with AsyncSessionLocal() as session:

        async def count_posts(*conditions):
            # COUNT(Post.id), optionally restricted by the given conditions.
            stmt = select(func.count(Post.id))
            if conditions:
                stmt = stmt.where(*conditions)
            return (await session.execute(stmt)).scalar()

        total = await count_posts()
        published = await count_posts(Post.status == "published")
        drafts = await count_posts(Post.status == "draft")
        failed = await count_posts(Post.status == "failed")

        platform_stmt = (
            select(Post.platform, func.count(Post.id).label("count"))
            .where(Post.status == "published")
            .group_by(Post.platform)
        )
        platform_rows = await session.execute(platform_stmt)

        cost_stmt = select(func.sum(Post.generation_cost)).where(Post.generation_cost.isnot(None))
        cost_sum = (await session.execute(cost_stmt)).scalar()
        # SUM over zero rows is NULL; report that as 0.0.
        total_cost = cost_sum or 0.0

        by_platform = [
            {"platform": row.platform, "count": row.count} for row in platform_rows.all()
        ]

    return {
        "total_posts": total,
        "published": published,
        "drafts_pending": drafts,
        "failed": failed,
        "total_cost_usd": round(total_cost, 4),
        "by_platform": by_platform,
    }
# ── Calendar / Edit / Preview ─────────────────────────────────────────────────
async def calendar_posts(
    start: datetime = Query(...),
    end: datetime = Query(...),
    limit: int = Query(default=200, ge=0, le=500),
):
    """Posts that belong in the calendar for a date range.

    A post is included when its ``scheduled_at``, ``published_at`` or
    ``created_at`` falls between ``start`` and ``end`` (inclusive).

    Args:
        start / end: Range bounds. Timezone-aware values are converted to UTC
            before the offset is dropped; naive values are used as given.
        limit: Maximum number of posts (0-500).

    Returns:
        ``items`` (``_post_dict`` plus ``calendar_at``), the normalised
        ``start`` / ``end``, and ``total`` (number of items returned).
    """
    from datetime import timezone

    # Timestamps in this module are written with datetime.utcnow() (naive UTC),
    # so an aware bound must be shifted to UTC rather than just stripped.
    if start.tzinfo is not None:
        start = start.astimezone(timezone.utc)
    start = start.replace(tzinfo=None)
    if end.tzinfo is not None:
        end = end.astimezone(timezone.utc)
    end = end.replace(tzinfo=None)

    async with AsyncSessionLocal() as session:
        rows = await session.execute(
            select(Post)
            .options(selectinload(Post.music_track))
            .where(
                or_(
                    Post.scheduled_at.between(start, end),
                    Post.published_at.between(start, end),
                    Post.created_at.between(start, end),
                )
            )
            .order_by(Post.created_at.asc())
            .limit(limit)
        )
        posts = rows.scalars().all()

        # batch-load trends to avoid N+1
        trend_ids = {p.trend_id for p in posts if p.trend_id}
        trend_map: dict = {}
        if trend_ids:
            t_rows = await session.execute(select(Trend).where(Trend.id.in_(trend_ids)))
            trend_map = {t.id: t.topic for t in t_rows.scalars().all()}

        # batch-load the latest workflow id per post (same query shape as
        # list_posts) instead of one query per post
        post_ids = [p.id for p in posts]
        wf_map: dict = {}
        if post_ids:
            wf_rows = (await session.execute(
                select(WorkflowExecution.post_id, WorkflowExecution.workflow_id)
                .where(WorkflowExecution.post_id.in_(post_ids))
                .distinct(WorkflowExecution.post_id)
                .order_by(WorkflowExecution.post_id, WorkflowExecution.started_at.desc())
            )).all()
            for row in wf_rows:
                wf_map.setdefault(row.post_id, row.workflow_id)

        items = []
        for post in posts:
            item = _post_dict(post, trend_map.get(post.trend_id), wf_map.get(post.id))
            # Calendar slot: schedule time wins, then publish time, then creation time.
            item["calendar_at"] = post.scheduled_at or post.published_at or post.created_at
            items.append(item)
    return {"items": items, "start": start, "end": end, "total": len(items)}
class PostEditBody(BaseModel):
    """Request body for ``update_post``.

    Every field is optional; ``update_post`` applies only the fields that were
    explicitly set on the model, so an explicit ``null`` overwrites the stored
    value while an omitted field leaves it untouched.
    """

    content: Optional[str] = None
    platform: Optional[str] = None
    post_type: Optional[str] = None
    scheduled_at: Optional[datetime] = None
    image_url: Optional[str] = None
    link_url: Optional[str] = None
    link_title: Optional[str] = None
    link_description: Optional[str] = None
async def update_post(post_id: int, body: PostEditBody):
    """Edit a post that has not been published yet.

    Only fields explicitly present in the request body are written.

    Raises:
        HTTPException 404: the post does not exist.
        HTTPException 400: the post is already published.

    Returns:
        The updated post as produced by ``_post_dict``.
    """
    from datetime import timezone

    editable_fields = (
        "content",
        "platform",
        "post_type",
        "scheduled_at",
        "image_url",
        "link_url",
        "link_title",
        "link_description",
    )

    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        if post.status == "published":
            raise HTTPException(status_code=400, detail="Published posts cannot be edited")

        # pydantic v2 exposes model_fields_set; fall back to the v1 name only
        # when it is missing so v2 never touches the deprecated attribute.
        fields_set = getattr(body, "model_fields_set", None)
        if fields_set is None:
            fields_set = getattr(body, "__fields_set__", set())

        for field in editable_fields:
            if field not in fields_set:
                continue
            value = getattr(body, field)
            if field == "scheduled_at" and value is not None:
                # Store naive UTC like schedule_post and the utcnow() timestamps
                # in this module; an aware value is shifted to UTC first.
                if value.tzinfo is not None:
                    value = value.astimezone(timezone.utc)
                value = value.replace(tzinfo=None)
            setattr(post, field, value)

        post.updated_at = datetime.utcnow()
        await session.commit()

        # Reload with music_track eager-loaded, as the list endpoints do, so
        # _post_dict does not trigger a lazy load on the async session.
        post = (await session.execute(
            select(Post)
            .where(Post.id == post_id)
            .options(selectinload(Post.music_track))
            .execution_options(populate_existing=True)
        )).scalar_one()

        trend_topic = None
        if post.trend_id:
            trend = await session.get(Trend, post.trend_id)
            trend_topic = trend.topic if trend else None
        return _post_dict(post, trend_topic, await _get_workflow_id(session, post.id))
async def preview_post(post_id: int):
    """Return the caption, image URL and derived stats shown before approving.

    Raises HTTPException 404 when the post does not exist. ``hashtags`` are
    the whitespace-separated words of the caption that start with ``#``.
    """
    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")

        if post.trend_id:
            trend = await session.get(Trend, post.trend_id)
            topic = trend.topic if trend else None
        else:
            topic = None

        wf_id = await _get_workflow_id(session, post.id)

        caption = post.content
        words = (caption or "").split()
        return {
            "post_id": post.id,
            "trend_topic": topic,
            "workflow_id": wf_id,
            "caption": caption,
            "image_url": post.image_url,
            "platform": post.platform,
            "status": post.status,
            "character_count": len(caption) if caption else 0,
            "hashtags": [word for word in words if word.startswith("#")],
        }
| # ── Approve / Reject ───────────────────────────────────────────────────────── | |
class ApproveBody(BaseModel):
    """Optional request body for ``approve_post``."""

    # approve_post only special-cases the value "reel"; any other value takes
    # the default publishing path.
    publish_mode: str = "post"  # "post" | "reel"
def _normalize_caption_hashtags(text: str) -> str:
    """Prefix every word of a hashtag-only line with ``#``.

    A line counts as a tag line when it consists of at least three
    whitespace-separated tokens, each an optional ``#`` followed by a letter
    and two or more ASCII alphanumerics. All other lines are returned as-is.

    Working line by line on split tokens avoids the previous single regex,
    whose zero-width separators allowed exponential backtracking, hashtagged a
    lone 9+ character word, and could merge consecutive lines.
    """
    import re

    tag_token = re.compile(r"#?[A-Za-z][A-Za-z0-9]{2,}")
    fixed_lines = []
    for line in text.split("\n"):
        words = line.split()
        if len(words) >= 3 and all(tag_token.fullmatch(w) for w in words):
            line = " ".join(w if w.startswith("#") else f"#{w}" for w in words)
        fixed_lines.append(line)
    return "\n".join(fixed_lines)


async def approve_post(post_id: int, body: ApproveBody = ApproveBody()):
    """Approve a post — signals Temporal if workflow exists, else publishes directly.

    Flow:
      1. 404 if the post is missing; short-circuit if it is already published.
      2. If a workflow exists, send it the ``approved`` signal and return.
         A signalling failure is logged and the direct path is used instead.
      3. Direct path: auto-improve the caption (best effort), normalise
         hashtag lines, append the active brand voice's auto hashtags,
         publish via ``SocialMediaPublisher``, then record the outcome.

    Returns:
        A message dict; on the direct path it carries ``platform_post_id`` on
        success or ``error`` on failure.

    Raises:
        HTTPException 404: the post does not exist (or vanished mid-request).
    """
    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        if post.status == "published":
            return {"message": "Already published", "post_id": post_id}
        workflow_id = await _get_workflow_id(session, post.id)

    # Try to signal Temporal workflow
    if workflow_id:
        try:
            from src.config import get_settings
            from temporalio.client import Client
            settings = get_settings()
            client = await Client.connect(
                f"{settings.temporal_host}:{settings.temporal_port}",
                namespace=settings.temporal_namespace,
            )
            handle = client.get_workflow_handle(workflow_id)
            await handle.signal("approved")
            return {"message": "Approved via Temporal", "workflow_id": workflow_id}
        except Exception as e:
            # Temporal not running — fall through to direct publish, but leave a trace.
            logger.warning(
                f"Temporal approve signal failed for post {post_id} (workflow {workflow_id}): {e}"
            )

    # Direct publish — call Instagram API then update DB
    async with AsyncSessionLocal() as session:
        post = (await session.execute(
            select(Post).where(Post.id == post_id).options(selectinload(Post.music_track))
        )).scalar_one_or_none()
        if post is None:
            # Deleted between the two sessions.
            raise HTTPException(status_code=404, detail="Post not found")
        # A NULL caption would otherwise crash hashtag normalisation below.
        content = post.content or ""
        image_url = post.image_url
        carousel_urls = getattr(post, "carousel_urls", None) or []
        post_type = post.post_type or "text_image"
        platform = post.platform or "instagram"
        music_url = post.music_track.preview_url if post.music_track else None

    # Auto-improve content quality before publishing
    try:
        from src.services.post_improver import auto_improve
        content, improved_score = await auto_improve(content, post_type)
        # Persist improved content + updated score back to DB
        async with AsyncSessionLocal() as session:
            post_upd = await session.get(Post, post_id)
            if post_upd:
                post_upd.content = content
                post_upd.quality_score = improved_score["score"]
                post_upd.quality_breakdown = improved_score["breakdown"]
                await session.commit()
    except Exception as e:
        logger.warning(f"Auto-improve skipped: {e}")

    # Normalize hashtags in caption — ensure every tag word starts with #
    content = _normalize_caption_hashtags(content or "")

    # Inject auto_hashtags from brand voice (e.g. #dailywireup #news)
    try:
        from src.models.database import BrandVoice as _BV
        async with AsyncSessionLocal() as _s:
            _bv = (await _s.execute(
                select(_BV).where(_BV.is_active == True).limit(1)  # noqa: E712
            )).scalar_one_or_none()
            if _bv and _bv.auto_hashtags:
                extra = _bv.auto_hashtags.strip()
                if extra and extra not in content:
                    content = content.rstrip() + "\n" + extra
    except Exception as e:
        # Best effort: publish without the brand hashtags rather than fail.
        logger.warning(f"Brand hashtags skipped for post {post_id}: {e}")

    platform_post_id = None
    publish_error = None
    try:
        from src.services.social_media import SocialMediaPublisher
        publisher = SocialMediaPublisher()
        # For carousel posts send all slide images; otherwise single image
        if post_type == "carousel" and carousel_urls:
            publish_images = carousel_urls
        elif image_url:
            publish_images = [image_url]
        else:
            publish_images = []
        wants_reel = body.publish_mode == "reel"
        # NOTE(review): "reel" mode is sent to the publisher as post_type
        # "carousel" — confirm this is how the publisher selects reels.
        effective_type = "carousel" if wants_reel else post_type
        # Music is only forwarded for reel mode or carousel posts.
        effective_music = music_url if (wants_reel or post_type == "carousel") else None
        result = await publisher.publish_post(
            platform=platform,
            text=content,
            image_urls=publish_images,
            post_type=effective_type,
            music_url=effective_music,
        )
        if result["success"]:
            platform_post_id = result.get("platform_post_id")
        else:
            publish_error = result.get("error")
            logger.warning(f"Publish failed for post {post_id}: {publish_error}")
    except Exception as e:
        publish_error = str(e)
        logger.error(f"Publish exception for post {post_id}: {e}")

    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if post is None:
            # Row vanished while publishing; keep the platform id in the logs.
            logger.error(
                f"Post {post_id} disappeared during publish (platform_post_id={platform_post_id})"
            )
            raise HTTPException(status_code=404, detail="Post not found")
        post.approval_status = "approved"
        if platform_post_id:
            post.status = "published"
            post.published_at = datetime.utcnow()
            post.generation_error = None
            post.platform_post_id = platform_post_id
        else:
            post.status = "failed"
            post.generation_error = publish_error or "Publish failed"
            await _create_notification(
                session,
                "publish_failed",
                f"Post #{post_id} failed to publish",
                post.generation_error,
                {"post_id": post_id, "platform": platform},
            )
        await session.commit()

    if platform_post_id:
        # NOTE(review): message names Instagram regardless of `platform`.
        return {"message": "Approved and published to Instagram", "post_id": post_id, "platform_post_id": platform_post_id}
    return {"message": "Approved but publish failed", "post_id": post_id, "error": publish_error}
async def reject_post(post_id: int):
    """Reject a post — signals Temporal if workflow exists, else updates DB directly.

    On the direct path the post gets ``approval_status="rejected"`` and
    ``status="failed"``. A Temporal signalling failure is logged and the
    direct path is used instead.

    Raises:
        HTTPException 404: the post does not exist (or vanished mid-request).
    """
    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        workflow_id = await _get_workflow_id(session, post.id)

    # Try to signal Temporal workflow
    if workflow_id:
        try:
            from src.config import get_settings
            from temporalio.client import Client
            settings = get_settings()
            client = await Client.connect(
                f"{settings.temporal_host}:{settings.temporal_port}",
                namespace=settings.temporal_namespace,
            )
            handle = client.get_workflow_handle(workflow_id)
            await handle.signal("rejected")
            return {"message": "Rejected via Temporal", "workflow_id": workflow_id}
        except Exception as e:
            # Temporal not running — fall through to direct DB update, but leave a trace.
            logger.warning(
                f"Temporal reject signal failed for post {post_id} (workflow {workflow_id}): {e}"
            )

    # Direct DB update
    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if post is None:
            # Deleted between the two sessions.
            raise HTTPException(status_code=404, detail="Post not found")
        post.approval_status = "rejected"
        post.status = "failed"
        await session.commit()
    return {"message": "Rejected directly", "post_id": post_id}
# ── Regenerate / Insights / Schedule / Delete / Generate ──────────────────────
async def regenerate_post(post_id: int):
    """Re-run the generation pipeline for an unpublished post using the same trend.

    The post is switched to ``status="generating"`` immediately and the
    pipeline runs as a background task; the response returns before it
    finishes. On pipeline failure the post is marked ``failed`` and a
    ``generation_failed`` notification is created.

    Raises:
        HTTPException 404: the post or its original trend does not exist.
        HTTPException 400: the post is already published.
    """
    from src.temporal_workflows.pipeline import (
        generate_post_content, generate_images, store_media_to_cdn, save_post_draft,
        GeneratePostInput, GenerateImagesInput, StoreMediaInput, SavePostDraftInput, TrendData,
    )

    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        if post.status == "published":
            raise HTTPException(status_code=400, detail="Cannot regenerate a published post")
        trend_id = post.trend_id
        platform = post.platform or "instagram"
        post_type = getattr(post, "post_type", "image") or "image"
        trend_row = await session.get(Trend, trend_id) if trend_id else None
        if not trend_row:
            raise HTTPException(status_code=404, detail="Original trend not found — cannot regenerate")
        trend_data = TrendData(
            trend_id=trend_row.id,
            topic=trend_row.topic,
            source=trend_row.source,
            score=trend_row.score or 0.0,
            raw_data=trend_row.raw_data,
        )

    # Reset post to generating state immediately
    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if post is None:
            # Deleted between the two sessions.
            raise HTTPException(status_code=404, detail="Post not found")
        post.status = "generating"
        post.content = f"Regenerating post about: {trend_data.topic}"
        post.image_url = None
        post.generation_error = None
        await session.commit()

    async def _run():
        try:
            post_data = await generate_post_content(GeneratePostInput(trend=trend_data, agent_version_id=1))
            effective_type = post_type or post_data.post_type
            images = await generate_images(GenerateImagesInput(
                topic=trend_data.topic, count=1, image_prompt=post_data.image_prompt,
                post_type=effective_type,
                carousel_image_prompts=post_data.carousel_image_prompts if effective_type == "carousel" else None,
            ))
            cdn_images = await store_media_to_cdn(StoreMediaInput(images=images, post_id=post_id))
            await save_post_draft(SavePostDraftInput(
                post=post_data, trend_id=trend_id, agent_version_id=1,
                images=cdn_images, platform=platform, post_type=post_type,
                placeholder_id=post_id,
            ))
            logger.info(f"[Regenerate] Post {post_id} regenerated successfully")
        except Exception as e:
            logger.error(f"[Regenerate] Failed for post {post_id}: {e}")
            async with AsyncSessionLocal() as session:
                post = await session.get(Post, post_id)
                if post:
                    post.status = "failed"
                    post.content = f"Regeneration failed: {e}"
                    post.generation_error = str(e)
                    await _create_notification(
                        session,
                        "generation_failed",
                        f"Post #{post_id} regeneration failed",
                        str(e),
                        {"post_id": post_id},
                    )
                    await session.commit()

    # The event loop keeps only a weak reference to tasks, so hold a strong one
    # in the module registry until the task finishes.
    task = asyncio.create_task(_run())
    _generation_tasks[post_id] = task

    def _forget(finished_task) -> None:
        # Only drop the entry if it still points at this task (a newer run for
        # the same post may have replaced it).
        if _generation_tasks.get(post_id) is finished_task:
            _generation_tasks.pop(post_id, None)

    task.add_done_callback(_forget)
    return {"message": "Regeneration started", "post_id": post_id}
# Strong references to in-flight image regeneration tasks; the event loop only
# keeps weak references, so an unreferenced task may be collected mid-run.
_image_regen_tasks: set = set()


async def regenerate_image(post_id: int):
    """Re-generate only the image for a post, keeping the caption unchanged.

    The image prompt topic is the trend's topic when the post has a trend,
    otherwise the first 80 characters of the caption. Generation runs as a
    background task; failures are logged and leave the post unchanged.

    Raises:
        HTTPException 404: the post does not exist.
        HTTPException 400: the post is already published.
    """
    from src.temporal_workflows.pipeline import (
        generate_images, store_media_to_cdn,
        GenerateImagesInput, StoreMediaInput,
    )

    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        if post.status == "published":
            raise HTTPException(status_code=400, detail="Cannot regenerate image on a published post")
        trend_row = await session.get(Trend, post.trend_id) if post.trend_id else None
        topic = trend_row.topic if trend_row else (post.content or "")[:80]
        post_type = getattr(post, "post_type", "image") or "image"

    async def _run_image():
        try:
            images = await generate_images(GenerateImagesInput(
                topic=topic, count=1, image_prompt=None, post_type=post_type,
            ))
            cdn = await store_media_to_cdn(StoreMediaInput(images=images, post_id=post_id))
            new_url = cdn.get("image_url") or cdn.get("cover_url")
            if new_url:
                async with AsyncSessionLocal() as session:
                    post = await session.get(Post, post_id)
                    if post:
                        post.image_url = new_url
                        post.updated_at = datetime.utcnow()
                        await session.commit()
                logger.info(f"[RegenerateImage] Post {post_id} new image: {new_url}")
        except Exception as e:
            logger.error(f"[RegenerateImage] Failed for post {post_id}: {e}")

    task = asyncio.create_task(_run_image())
    _image_regen_tasks.add(task)
    task.add_done_callback(_image_regen_tasks.discard)
    return {"message": "Image regeneration started", "post_id": post_id}
async def post_insights(post_id: int):
    """Fetch Instagram media insights for a published post.

    Returns ``{"available": False, "reason": ...}`` when the post has no
    platform id, is not an Instagram post, no token is configured, the API
    reports an error, or the request raises. Otherwise returns the media id
    and a ``metrics`` dict keyed by metric name.

    Raises HTTPException 404 when the post does not exist.
    """
    import httpx
    import os

    def unavailable(reason):
        return {"available": False, "reason": reason}

    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        media_id = post.platform_post_id
        platform = post.platform or "instagram"

    if not media_id:
        return unavailable("Post not published or missing platform ID")
    if platform.lower() != "instagram":
        return unavailable(f"Insights only available for Instagram (this is {platform})")

    from src.config import get_settings as _gs

    # Settings value wins; the environment variable is the fallback.
    token = _gs().instagram_access_token or os.environ.get("INSTAGRAM_ACCESS_TOKEN", "")
    if not token:
        return unavailable("Instagram token not configured")

    insights_url = f"https://graph.facebook.com/v22.0/{media_id}/insights"
    query = {
        "metric": "reach,saved,likes,comments,shares,total_interactions,views",
        "access_token": token,
    }
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(insights_url, params=query)
        data = resp.json()
        if "error" in data:
            return unavailable(data["error"].get("message", "API error"))

        metrics = {}
        for item in data.get("data", []):
            values = item.get("values")
            if values:
                # Time-series shape: take the first entry's value.
                metric_value = values[0].get("value", 0)
            else:
                metric_value = item.get("value", 0)
            metrics[item["name"]] = metric_value
        return {"available": True, "media_id": media_id, "metrics": metrics}
    except Exception as e:
        return unavailable(str(e))
class SchedulePostBody(BaseModel):
    """Request body for ``schedule_post``."""

    # Required; schedule_post drops any tzinfo before storing the value.
    scheduled_at: datetime
async def schedule_post(post_id: int, body: SchedulePostBody):
    """Schedule or reschedule a post (POST = new schedule, PUT = drag-and-drop reschedule).

    Stores ``scheduled_at`` as a naive datetime and marks the post approved.
    A timezone-aware value is converted to UTC before its offset is dropped;
    a naive value is stored as given.

    Raises:
        HTTPException 404: the post does not exist.
        HTTPException 400: the post is already published.
    """
    from datetime import timezone

    scheduled_at = body.scheduled_at
    if scheduled_at.tzinfo is not None:
        # Other timestamps in this module are written with datetime.utcnow(),
        # so shift to UTC instead of silently discarding the offset.
        scheduled_at = scheduled_at.astimezone(timezone.utc)
    scheduled_at = scheduled_at.replace(tzinfo=None)

    async with AsyncSessionLocal() as session:
        post = await session.get(Post, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        if post.status == "published":
            raise HTTPException(status_code=400, detail="Post already published")
        post.scheduled_at = scheduled_at
        post.approval_status = "approved"
        post.updated_at = datetime.utcnow()
        await session.commit()
    return {"message": "Post scheduled", "post_id": post_id, "scheduled_at": scheduled_at.isoformat()}
async def discard_post(post_id: int):
    """Hard-delete a post, but only while its status is ``"draft"``.

    Raises:
        HTTPException 404: the post does not exist.
        HTTPException 400: the post has any status other than ``"draft"``.
    """
    async with AsyncSessionLocal() as session:
        draft = await session.get(Post, post_id)
        if draft is None:
            raise HTTPException(status_code=404, detail="Post not found")

        current_status = draft.status
        if current_status != "draft":
            raise HTTPException(
                status_code=400,
                detail=f"Cannot discard a post with status '{current_status}'",
            )

        await session.delete(draft)
        await session.commit()
    return {"discarded": True, "post_id": post_id}
async def delete_post(post_id: int):
    """Hard delete a single post by ID, whatever its status.

    Raises HTTPException 404 when the post does not exist.
    """
    async with AsyncSessionLocal() as session:
        target = await session.get(Post, post_id)
        if target is None:
            raise HTTPException(status_code=404, detail="Post not found")
        await session.delete(target)
        await session.commit()
    return {"deleted": 1}
class GeneratePostBody(BaseModel):
    """Request body for ``generate_post``."""

    # Trend to generate from.
    trend_id: Optional[int] = None  # if None, picks the top pending trend
    # Stored on the placeholder post that generate_post creates.
    platform: str = "instagram"
    # generate_post treats "carousel" specially when choosing image prompts/count.
    post_type: str = "image"
async def generate_post(body: GeneratePostBody):
    """Run the full generation pipeline for one trend without Temporal.

    The request itself only (1) selects a trend, (2) looks for a semantically
    similar existing post and (3) inserts a placeholder ``Post`` row with
    status ``"generating"``. The pipeline proper (content, images, CDN upload,
    draft save, music, quality scoring, budget alert) runs in a background
    ``asyncio`` task registered in ``_generation_tasks`` under the
    placeholder's ID, so the caller gets an immediate response.

    Args:
        body: Generation options. When ``body.trend_id`` is falsy, the pending
            trend with the highest age-decayed score is used.

    Returns:
        A dict with ``message``, ``post_id`` (the placeholder ID),
        ``trend_id``, ``topic``, ``platform`` and ``duplicate_warning``
        (``None`` when no similar post was found).

    Raises:
        HTTPException: 404 when the requested trend does not exist, or when
            no trend is pending generation.
    """
    import asyncio
    from src.temporal_workflows.pipeline import (
        generate_post_content, generate_images,
        store_media_to_cdn, save_post_draft,
        GeneratePostInput, GenerateImagesInput, StoreMediaInput, SavePostDraftInput,
        TrendData,
    )

    # Pick trend
    async with AsyncSessionLocal() as session:
        if body.trend_id:
            trend_row = await session.get(Trend, body.trend_id)
            if not trend_row:
                raise HTTPException(status_code=404, detail="Trend not found")
        else:
            from sqlalchemy import func as sqlfunc
            from datetime import datetime as dt
            # Age in hours, floored at 0.01, used to decay the raw score so
            # that fresher trends outrank older ones of similar score.
            hours_old = sqlfunc.greatest(
                sqlfunc.extract("epoch", dt.utcnow() - Trend.created_at) / 3600.0,
                0.01
            )
            boosted = Trend.score / (1.0 + hours_old / 12.0)
            result = await session.execute(
                select(Trend)
                .where(Trend.status == "pending_generation")
                .order_by(boosted.desc())
                .limit(1)
            )
            trend_row = result.scalar_one_or_none()
            if not trend_row:
                raise HTTPException(status_code=404, detail="No pending trends available")
        trend_id = trend_row.id
        trend_data = TrendData(
            trend_id=trend_row.id,
            topic=trend_row.topic,
            source=trend_row.source,
            score=trend_row.score or 0.0,
            raw_data=trend_row.raw_data,
        )

    # ── Duplicate check: find semantically similar existing posts ──────────
    from src.services.embeddings import get_embedding, find_similar_posts
    trend_embedding = await get_embedding(trend_data.topic)
    duplicate_warning = None
    if trend_embedding:
        similar = await find_similar_posts(trend_embedding, threshold=0.88, limit=1)
        if similar:
            duplicate_warning = {
                "post_id": similar[0]["id"],
                "similarity": round(float(similar[0]["similarity"]) * 100),
                "content_preview": (similar[0]["content"] or "")[:80],
            }

    # ── Insert placeholder row immediately so the UI can track progress ──
    async with AsyncSessionLocal() as session:
        placeholder = Post(
            trend_id=trend_id,
            agent_version_id=1,
            content=f"Generating post about: {trend_data.topic}",
            post_type=body.post_type,
            platform=body.platform,
            status="generating",
            approval_status="pending",
            generation_error=None,
        )
        session.add(placeholder)
        await session.commit()
        await session.refresh(placeholder)
        placeholder_id = placeholder.id

    async def _run_pipeline():
        from src.agents.post_generator import set_post_step, _post_steps
        try:
            set_post_step(placeholder_id, "Analysing trend...")
            post_data = await generate_post_content(GeneratePostInput(trend=trend_data, agent_version_id=1, post_type=body.post_type, post_id=placeholder_id))
            set_post_step(placeholder_id, "Generating image(s)...")
            carousel_prompts = post_data.carousel_image_prompts if body.post_type == "carousel" else None
            # Carousels without explicit prompts fall back to 6 images.
            img_count = len(carousel_prompts) if carousel_prompts else (6 if body.post_type == "carousel" else 1)
            images = await generate_images(GenerateImagesInput(
                topic=trend_data.topic,
                count=img_count,
                image_prompt=post_data.image_prompt,
                post_type=body.post_type,
                carousel_image_prompts=carousel_prompts,
                post_id=placeholder_id,
            ))
            set_post_step(placeholder_id, "Storing media...")
            cdn_images = await store_media_to_cdn(StoreMediaInput(images=images, post_id=placeholder_id))
            saved = await save_post_draft(SavePostDraftInput(
                post=post_data,
                trend_id=trend_id,
                agent_version_id=1,
                images=cdn_images,
                platform=body.platform,
                post_type=body.post_type,
                placeholder_id=placeholder_id,
                token_count=post_data.token_count,
                generation_cost=post_data.generation_cost,
                hallucination_score=post_data.hallucination_score,
            ))

            # Mark trend as used
            async with AsyncSessionLocal() as session:
                t = await session.get(Trend, trend_id)
                if t:
                    t.status = "generated"
                    t.used_for_post = True
                    await session.commit()

            set_post_step(placeholder_id, "Matching music to topic mood...")
            # Auto-attach music based on post content + trend topic
            await _auto_attach_music(placeholder_id, post_data.content, trend_data.topic)

            set_post_step(placeholder_id, "Scoring quality — hook, CTA, hashtags, brand fit...")
            # Compute quality score, auto-improve if weak. Scoring is
            # best-effort: a failure here is logged and must not fail the post.
            try:
                from src.services.quality_scorer import score_post
                from src.services.post_improver import auto_improve
                from src.models.database import BrandVoice
                async with AsyncSessionLocal() as session:
                    brand = await session.execute(
                        select(BrandVoice).where(BrandVoice.is_active == True).limit(1)
                    )
                    brand_voice = brand.scalar_one_or_none()
                    initial = score_post(post_data.content, body.post_type, brand_voice, body.platform)
                    set_post_step(placeholder_id, f"Initial score: {initial['score']}/100 ({initial['grade']})")
                    logger.info(f"[Quality] Post {placeholder_id} initial score {initial['score']}/100 ({initial['grade']})")
                    final_content = post_data.content
                    final_result = initial
                    if initial["score"] < 70:
                        set_post_step(placeholder_id, f"Score {initial['score']}/100 — auto-improving...")
                        final_content, final_result = await auto_improve(post_data.content, body.post_type, brand_voice, post_id=placeholder_id)
                    post = await session.get(Post, placeholder_id)
                    if post:
                        post.content = final_content
                        post.quality_score = final_result["score"]
                        post.quality_breakdown = final_result["breakdown"]
                        await session.commit()
                logger.info(f"[Quality] Post {placeholder_id} final score {final_result['score']}/100 ({final_result['grade']})")
            except Exception as qe:
                logger.warning(f"[Quality] Scoring failed for post {placeholder_id}: {qe}")

            logger.info(f"[Generate] Post {placeholder_id} generation complete")

            # Budget alert check (best-effort; never fails the generation)
            try:
                from src.config import get_settings as _gs
                _budget = _gs().monthly_budget_usd
                if _budget and _budget > 0:
                    from datetime import datetime as _dt
                    _month = _dt.utcnow().strftime("%Y-%m")
                    async with AsyncSessionLocal() as _bs:
                        _spent = (await _bs.execute(
                            select(func.sum(Post.generation_cost))
                            .where(Post.generation_cost.isnot(None))
                            .where(func.to_char(Post.created_at, "YYYY-MM") == _month)
                        )).scalar() or 0.0
                        if _spent >= _budget:
                            _exists = (await _bs.execute(
                                select(Notification).where(
                                    Notification.type == "budget_exceeded",
                                ).order_by(Notification.created_at.desc()).limit(1)
                            )).scalar_one_or_none()
                            if not _exists:
                                _bs.add(Notification(
                                    type="budget_exceeded",
                                    title="Monthly budget exceeded",
                                    message=f"Generation cost this month: ${_spent:.4f} — over your ${_budget:.2f} limit.",
                                    is_read=False,
                                ))
                                await _bs.commit()
                                logger.warning(f"[Budget] Monthly spend ${_spent:.4f} exceeded limit ${_budget:.2f}")
            except Exception as budget_err:
                # Still non-fatal, but leave a trace instead of swallowing silently.
                logger.warning(f"[Budget] Budget alert check failed: {budget_err}")
        except Exception as e:
            logger.error(f"[Generate] Pipeline failed for placeholder {placeholder_id}: {e}")
            async with AsyncSessionLocal() as session:
                post = await session.get(Post, placeholder_id)
                if post:
                    post.status = "failed"
                    post.content = f"Generation failed: {e}"
                    post.generation_error = str(e)
                    await _create_notification(
                        session,
                        "generation_failed",
                        f"Post #{placeholder_id} generation failed",
                        str(e),
                        {"post_id": placeholder_id, "trend_id": trend_id},
                    )
                    await session.commit()
        finally:
            # Clear step tracker so the UI stops showing a stale step. This is
            # in ``finally`` because asyncio.CancelledError derives from
            # BaseException: a cancelled task skips ``except Exception`` and
            # would otherwise leave its entry behind forever.
            _post_steps.pop(placeholder_id, None)

        # Force WS push in case PG NOTIFY isn't reaching us (Neon pooled connections).
        # Reached on success and on handled failure, not on cancellation.
        from src.utils.notify import mark_dirty
        mark_dirty("posts_changed")

    # Fire pipeline in background — return immediately with placeholder id
    task = asyncio.create_task(_run_pipeline())
    _generation_tasks[placeholder_id] = task
    # Drop the registry entry once the task finishes, however it ends.
    task.add_done_callback(lambda _: _generation_tasks.pop(placeholder_id, None))

    return {
        "message": "Generation started",
        "post_id": placeholder_id,
        "trend_id": trend_id,
        "topic": trend_data.topic,
        "platform": body.platform,
        "duplicate_warning": duplicate_warning,
    }
async def cancel_generation(post_id: int):
    """Cancel an in-progress post generation and mark the post as failed.

    Any still-running background task registered for ``post_id`` is cancelled
    first; the post row is then validated and updated.

    Args:
        post_id: ID of the placeholder post whose generation should stop.

    Returns:
        ``{"post_id": post_id, "cancelled": True}``.

    Raises:
        HTTPException: 404 when the post does not exist; 400 when its status
            is not ``"generating"``.
    """
    running = _generation_tasks.get(post_id)
    if running and not running.done():
        running.cancel()

    async with AsyncSessionLocal() as db:
        target = await db.get(Post, post_id)
        if not target:
            raise HTTPException(status_code=404, detail="Post not found")
        if target.status != "generating":
            raise HTTPException(status_code=400, detail=f"Post is not generating (status: {target.status})")
        target.status = "failed"
        target.generation_error = "Cancelled by user"
        await db.commit()
        return {"post_id": post_id, "cancelled": True}
class ManualPostBody(BaseModel):
    """Request payload for a hand-written post (see ``create_manual_post``)."""

    # Post text; the only required field.
    content: str
    platform: str = "instagram"
    post_type: str = "text_only"
    # Optional media / link-preview fields; empty values are stored as NULL.
    image_url: Optional[str] = None
    link_url: Optional[str] = None
    link_title: Optional[str] = None
    link_description: Optional[str] = None
async def create_manual_post(body: ManualPostBody):
    """Create a post manually — no AI generation, saves directly as draft.

    Args:
        body: Content and optional media/link fields for the new post.

    Returns:
        The new post serialised with ``_post_dict``.
    """
    async with AsyncSessionLocal() as db:
        draft = Post(
            content=body.content,
            platform=body.platform,
            post_type=body.post_type,
            # Falsy optionals (e.g. "") are normalised to None.
            image_url=body.image_url or None,
            link_url=body.link_url or None,
            link_title=body.link_title or None,
            link_description=body.link_description or None,
            status="draft",
            approval_status="pending",
        )
        db.add(draft)
        await db.commit()
        # Reload so DB-generated values (such as the ID) are populated.
        await db.refresh(draft)
        return _post_dict(draft)
class MusicAttachBody(BaseModel):
    """Request payload for ``attach_music``."""

    # Track to attach to the post; None detaches the current track.
    music_id: Optional[int] = None
async def attach_music(post_id: int, body: MusicAttachBody):
    """Attach or detach a music track from a post.

    Args:
        post_id: ID of the post to update.
        body: Carries the ``music_id`` to store (``None`` detaches).

    Returns:
        ``{"post_id": ..., "music_id": ...}`` echoing the stored values.

    Raises:
        HTTPException: 404 when the post does not exist.
    """
    async with AsyncSessionLocal() as db:
        target = await db.get(Post, post_id)
        if not target:
            raise HTTPException(status_code=404, detail="Post not found")
        target.music_id = body.music_id
        target.updated_at = datetime.utcnow()
        await db.commit()
        return {"post_id": post_id, "music_id": body.music_id}
class BulkIdsBody(BaseModel):
    """Request payload carrying the post IDs a bulk operation applies to."""

    ids: List[int]
async def bulk_approve_posts(body: BulkIdsBody):
    """Queue multiple draft posts for immediate publish.

    Each ID is handled concurrently and independently: one failing post does
    not affect the others.

    Args:
        body: IDs of the posts to publish.

    Returns:
        ``{"results": [...], "approved": n}`` where each result holds
        ``post_id``, ``ok`` and, on failure, ``error``.
    """
    import asyncio

    async def _publish_one(post_id: int):
        # Only existing posts in "draft" or "failed" state may be published.
        async with AsyncSessionLocal() as db:
            candidate = await db.get(Post, post_id)
            if not candidate or candidate.status not in ("draft", "failed"):
                return {"post_id": post_id, "ok": False, "error": "Invalid status"}
        try:
            from src.services.scheduler import run_auto_publish
            await run_auto_publish(post_id)
        except Exception as exc:
            return {"post_id": post_id, "ok": False, "error": str(exc)}
        return {"post_id": post_id, "ok": True}

    outcomes = await asyncio.gather(*(_publish_one(pid) for pid in body.ids))
    approved = sum(1 for outcome in outcomes if outcome["ok"])
    return {"results": list(outcomes), "approved": approved}
class BulkScheduleBody(BaseModel):
    """Request payload for ``bulk_schedule_posts``."""

    # Posts to approve and schedule.
    ids: List[int]
    # Single time slot applied to every post in ``ids``.
    scheduled_at: datetime
async def bulk_schedule_posts(body: BulkScheduleBody):
    """Approve + schedule multiple draft posts for the same time slot.

    Only posts whose status is ``"draft"`` are touched; other IDs are ignored.

    Args:
        body: Post IDs plus the target ``scheduled_at``.

    Returns:
        ``{"scheduled": 0}`` when no IDs were given, otherwise
        ``{"scheduled": n, "scheduled_at": iso}`` with the number of posts
        updated and the stored (naive) timestamp.
    """
    if not body.ids:
        return {"scheduled": 0}

    from datetime import timezone

    # The value is stored as a naive datetime. An offset-aware input must be
    # converted to UTC *before* the tzinfo is dropped, otherwise the offset is
    # discarded and the post fires at the wrong instant.
    # NOTE(review): assumes the DB holds naive UTC, matching the utcnow()
    # usage elsewhere in this module — confirm against the scheduler.
    scheduled_at = body.scheduled_at
    if scheduled_at.tzinfo is not None:
        scheduled_at = scheduled_at.astimezone(timezone.utc)
    scheduled_at = scheduled_at.replace(tzinfo=None)

    async with AsyncSessionLocal() as session:
        rows = await session.execute(select(Post).where(Post.id.in_(body.ids)))
        count = 0
        for p in rows.scalars().all():
            if p.status == "draft":
                p.approval_status = "approved"
                p.scheduled_at = scheduled_at
                count += 1
        await session.commit()
    return {"scheduled": count, "scheduled_at": scheduled_at.isoformat()}
async def bulk_reject_posts(body: BulkIdsBody):
    """Reject multiple draft posts.

    Only posts whose status is ``"draft"`` are rejected; unknown IDs and
    posts in any other state are left untouched.

    Args:
        body: IDs of the posts to reject.

    Returns:
        ``{"rejected": n}`` where ``n`` is the number of posts actually
        rejected (not the number of IDs submitted).
    """
    if not body.ids:
        return {"rejected": 0}
    async with AsyncSessionLocal() as session:
        rows = await session.execute(select(Post).where(Post.id.in_(body.ids)))
        rejected = 0
        for p in rows.scalars().all():
            if p.status == "draft":
                p.approval_status = "rejected"
                p.status = "failed"
                rejected += 1
        await session.commit()
    return {"rejected": rejected}
async def bulk_delete_posts(body: BulkIdsBody):
    """Hard delete multiple posts by ID list.

    Args:
        body: IDs of the posts to delete.

    Returns:
        ``{"deleted": n}`` with the row count reported by the DELETE
        statement (0 when no IDs were supplied).
    """
    if not body.ids:
        return {"deleted": 0}
    purge = delete(Post).where(Post.id.in_(body.ids))
    async with AsyncSessionLocal() as db:
        outcome = await db.execute(purge)
        await db.commit()
        return {"deleted": outcome.rowcount}
| # ── Extended Analytics ───────────────────────────────────────────────────────── | |
async def posts_timeseries(days: int = Query(30, ge=7, le=365)):
    """Daily post counts for the last N days — published + drafted.

    Args:
        days: Size of the look-back window in days (7–365).

    Returns:
        ``{"days": days, "series": [...]}`` where each series entry is
        ``{"date", "published", "draft", "failed"}``. Posts in any other
        status are counted under ``"draft"``. Days without posts are omitted.
    """
    cutoff = datetime.utcnow() - timedelta(days=days)
    day_col = cast(Post.created_at, Date)
    async with AsyncSessionLocal() as session:
        rows = await session.execute(
            select(
                day_col.label("day"),
                Post.status,
                func.count(Post.id).label("count"),
            )
            .where(Post.created_at >= cutoff)
            .group_by(day_col, Post.status)
            .order_by(day_col)
        )
        data: dict = {}
        # Unpack positionally: attribute access ``row.count`` would resolve to
        # the Row's sequence method ``count()`` rather than the labelled column.
        for day_value, status, total in rows.all():
            day = str(day_value)
            bucket = data.setdefault(
                day, {"date": day, "published": 0, "draft": 0, "failed": 0}
            )
            key = status if status in ("published", "draft", "failed") else "draft"
            bucket[key] += total
    return {"days": days, "series": list(data.values())}
async def posts_by_type():
    """Breakdown of post counts by post_type.

    Returns:
        A list of ``{"post_type", "count"}`` dicts; rows with a NULL/empty
        ``post_type`` are reported as ``"image"``.
    """
    async with AsyncSessionLocal() as session:
        rows = await session.execute(
            select(Post.post_type, func.count(Post.id).label("count"))
            .group_by(Post.post_type)
        )
        # Unpack positionally: ``row.count`` would resolve to the Row's
        # sequence method ``count()`` instead of the labelled column value.
        return [
            {"post_type": post_type or "image", "count": total}
            for post_type, total in rows.all()
        ]
async def top_posts(limit: int = Query(5, ge=1, le=20)):
    """Most recently published posts — used for 'top content' panel.

    Args:
        limit: Maximum number of posts to return (1–20).

    Returns:
        ``{"items": [...]}`` where each item is the ``_post_dict`` payload
        (including the trend topic when the post has a trend) plus a
        ``char_count`` of the post content.
    """
    latest_published = (
        select(Post)
        .options(selectinload(Post.music_track))
        .where(Post.status == "published")
        .order_by(Post.published_at.desc())
        .limit(limit)
    )
    async with AsyncSessionLocal() as db:
        published = (await db.execute(latest_published)).scalars().all()
        items = []
        for entry in published:
            topic = None
            if entry.trend_id:
                trend = await db.get(Trend, entry.trend_id)
                topic = trend.topic if trend else None
            payload = _post_dict(entry, topic)
            items.append({**payload, "char_count": len(entry.content or "")})
        return {"items": items}
async def posting_hours():
    """Hour-of-day breakdown of published posts (0–23) for heatmap.

    Returns:
        A list of 24 ``{"hour", "count"}`` dicts, one per hour in order;
        hours with no published posts have ``count`` 0.
    """
    hour_col = func.extract("hour", Post.published_at)
    async with AsyncSessionLocal() as session:
        rows = await session.execute(
            select(
                hour_col.label("hour"),
                func.count(Post.id).label("count"),
            )
            .where(Post.published_at.isnot(None))
            .group_by(hour_col)
            .order_by(hour_col)
        )
        # Unpack positionally: ``row.count`` would resolve to the Row's
        # sequence method ``count()`` instead of the labelled column value.
        hour_map = {int(hour): total for hour, total in rows.all()}
    return [{"hour": h, "count": hour_map.get(h, 0)} for h in range(24)]
async def instagram_engagement(limit: int = Query(default=20, le=50)):
    """Aggregate Instagram engagement metrics across recently published posts.

    Insights are fetched concurrently from the Graph API for the most
    recently published Instagram posts that have a ``platform_post_id``.
    Posts whose insights request fails or returns an error are left out.

    Args:
        limit: Maximum number of posts to inspect (at most 50).

    Returns:
        ``{"available": False, "reason", "posts": [], "totals": {}}`` when no
        access token is configured; otherwise ``{"available": True, "posts",
        "totals"}`` where ``totals`` sums each metric over the returned posts.
    """
    import asyncio
    import os
    import httpx
    from src.config import get_settings as _gs2

    token = _gs2().instagram_access_token or os.getenv("INSTAGRAM_ACCESS_TOKEN", "")
    if not token:
        return {"available": False, "reason": "Instagram token not configured", "posts": [], "totals": {}}

    async with AsyncSessionLocal() as session:
        rows = await session.execute(
            select(Post.id, Post.content, Post.image_url, Post.platform_post_id, Post.published_at)
            .where(Post.status == "published", Post.platform_post_id.isnot(None), Post.platform == "instagram")
            .order_by(Post.published_at.desc())
            .limit(limit)
        )
        posts = rows.all()

    if not posts:
        return {"available": True, "posts": [], "totals": {"views": 0, "reach": 0, "likes": 0, "comments": 0, "saved": 0, "shares": 0, "total_interactions": 0}}

    async def _fetch(client, post_row):
        # Best-effort per post: any failure drops this post from the report.
        try:
            resp = await client.get(
                f"https://graph.facebook.com/v22.0/{post_row.platform_post_id}/insights",
                params={"metric": "reach,saved,likes,comments,shares,total_interactions,views", "access_token": token},
            )
            data = resp.json()
            if "error" in data:
                return None
            metrics = {}
            for item in data.get("data", []):
                # Metrics arrive either as a "values" list or a bare "value".
                val = item.get("values", [{}])[0].get("value", 0) if item.get("values") else item.get("value", 0)
                metrics[item["name"]] = val
            return {
                "post_id": post_row.id,
                "media_id": post_row.platform_post_id,
                "content_preview": (post_row.content or "")[:80],
                "image_url": post_row.image_url,
                "published_at": post_row.published_at.isoformat() if post_row.published_at else None,
                **metrics,
            }
        except Exception:
            return None

    # One shared client so all requests reuse the same connection pool
    # instead of opening a fresh client per post.
    async with httpx.AsyncClient(timeout=8) as client:
        results = await asyncio.gather(*[_fetch(client, p) for p in posts])

    valid = [r for r in results if r]
    totals = {"views": 0, "reach": 0, "likes": 0, "comments": 0, "saved": 0, "shares": 0, "total_interactions": 0}
    for r in valid:
        for k in totals:
            val = r.get(k, 0)
            # A null or non-numeric metric must not abort the whole response.
            if isinstance(val, (int, float)):
                totals[k] += val
    return {"available": True, "posts": valid, "totals": totals}
async def similar_posts(
    post_id: int,
    threshold: float = Query(0.80, ge=0.5, le=1.0),
    limit: int = Query(5, ge=1, le=20),
):
    """Find posts semantically similar to a given post.

    The post's stored ``content_embedding`` is used when present; otherwise
    an embedding is computed from its content on the fly.

    Args:
        post_id: Post to compare against (excluded from the results).
        threshold: Minimum similarity passed to the search (0.5–1.0).
        limit: Maximum number of matches (1–20).

    Returns:
        ``{"items": [...]}``; when no embedding can be obtained, an empty
        list together with an explanatory ``note``.

    Raises:
        HTTPException: 404 when the post does not exist.
    """
    async with AsyncSessionLocal() as db:
        source = await db.get(Post, post_id)
        if not source:
            raise HTTPException(status_code=404, detail="Post not found")
        if post_has_vector := source.content_embedding is not None:
            vector = list(source.content_embedding)
        if not post_has_vector:
            from src.services.embeddings import get_embedding
            vector = await get_embedding(source.content or "")
            if not vector:
                return {"items": [], "note": "No embedding available for this post"}

    from src.services.embeddings import find_similar_posts
    matches = await find_similar_posts(vector, threshold=threshold, limit=limit, exclude_id=post_id)
    return {"items": matches}
async def get_post(post_id: int):
    """Fetch a single post by ID — same shape as the list endpoint items.

    Args:
        post_id: Primary key of the post to fetch.

    Returns:
        The ``_post_dict`` payload, including the trend topic (when the post
        has a trend) and the workflow ID from ``_get_workflow_id``.

    Raises:
        HTTPException: 404 when the post does not exist.
    """
    lookup = (
        select(Post)
        .where(Post.id == post_id)
        .options(selectinload(Post.music_track))
    )
    async with AsyncSessionLocal() as db:
        found = (await db.execute(lookup)).scalar_one_or_none()
        if not found:
            raise HTTPException(status_code=404, detail="Post not found")

        topic = None
        if found.trend_id:
            trend = await db.get(Trend, found.trend_id)
            topic = trend.topic if trend else None

        workflow_id = await _get_workflow_id(db, found.id)
        return _post_dict(found, topic, workflow_id)
# Last scheduler_status() payload plus the monotonic time it was stored at.
_sched_status_cache: dict = {"ts": 0.0, "data": None}


async def scheduler_status():
    """Live scheduler info — cached for 30s to avoid repeated DB queries.

    Returns:
        A dict with ``scheduler_running``, ``posts_per_day``,
        ``auto_approve``, ``jobs`` (id, name, next_run), ``today_generated``,
        ``today_published``, ``running_jobs`` and ``is_busy``. A payload
        produced while jobs are running is never cached.
    """
    import time
    from src.services.scheduler import scheduler
    from src.config import get_settings

    now = time.monotonic()
    cached = _sched_status_cache["data"]
    if cached and now - _sched_status_cache["ts"] < 30:
        return cached

    settings = get_settings()
    jobs = [
        {
            "id": job.id,
            "name": job.name,
            "next_run": job.next_run_time.isoformat() if job.next_run_time else None,
        }
        for job in scheduler.get_jobs()
    ]

    # Midnight of the current UTC day.
    day_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    generated_q = select(func.count(Post.id)).where(Post.created_at >= day_start)
    published_q = (
        select(func.count(Post.id))
        .where(Post.published_at >= day_start)
        .where(Post.status == "published")
    )
    async with AsyncSessionLocal() as db:
        generated_today = (await db.execute(generated_q)).scalar()
        published_today = (await db.execute(published_q)).scalar()

    # Include currently-running jobs so UI can show a "scheduler busy" indicator
    try:
        from src.services.scheduler import get_running_jobs
        active_jobs = get_running_jobs()
    except Exception:
        active_jobs = {}

    payload = {
        "scheduler_running": scheduler.running,
        "posts_per_day": settings.posts_per_day,
        "auto_approve": settings.auto_approve,
        "jobs": jobs,
        "today_generated": generated_today,
        "today_published": published_today,
        "running_jobs": active_jobs,
        "is_busy": bool(active_jobs),
    }
    # Don't cache when busy — UI needs frequent updates
    if not active_jobs:
        _sched_status_cache["data"] = payload
        _sched_status_cache["ts"] = now
    return payload
class HashtagSuggestBody(BaseModel):
    """Request payload for ``suggest_hashtags``."""

    # Post caption the hashtags should fit.
    content: str
    # Optional topic hint included in the prompt.
    topic: str = ""
async def suggest_hashtags(body: HashtagSuggestBody):
    """Use LLM to suggest 8-10 relevant hashtags for a post.

    Up to the first three configured model attempts are tried in order
    (SDK-based attempts are skipped). The first one that yields at least one
    usable tag wins; any error moves on to the next attempt.

    Args:
        body: Post caption plus an optional topic hint.

    Returns:
        ``{"hashtags": [...]}`` with at most 10 ``#``-prefixed tags, or an
        empty list when no attempt produced a usable answer.
    """
    from src.agents.post_generator import _build_attempts
    import httpx, json as _json

    prompt = (
        f"Generate 8-10 highly relevant Instagram hashtags for this post.\n"
        f"Topic: {body.topic}\n"
        f"Caption: {body.content[:400]}\n\n"
        f"Rules: Mix popular (#news) and niche (#dailywireup) tags. Each must start with #. "
        f"No spaces in tags. Output ONLY a JSON array of strings, e.g. [\"#tag1\",\"#tag2\"]. Nothing else."
    )
    attempts = await _build_attempts()
    for attempt in attempts[:3]:  # only try first 3 models
        if attempt.get("use_sdk"):
            continue
        try:
            base = (attempt["api_base"] or "").rstrip("/")
            if not base.startswith("http"):
                base = "https://" + base
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.post(
                    f"{base}/chat/completions",
                    headers={"Authorization": f"Bearer {attempt['api_key']}", "Content-Type": "application/json"},
                    json={"model": attempt["model"], "messages": [{"role": "user", "content": prompt}],
                          "temperature": 0.7, "max_tokens": 200},
                )
                resp.raise_for_status()
            raw = resp.json()["choices"][0]["message"]["content"].strip()
            # Parse JSON array — the model may wrap it in extra text, so take
            # the span from the first "[" to the last "]".
            start, end = raw.find("["), raw.rfind("]")
            if start != -1 and end != -1:
                tags = []
                for candidate in _json.loads(raw[start:end + 1]):
                    # Skip non-string entries instead of discarding the whole
                    # answer, and strip whitespace so " news" can't become "# news".
                    if not isinstance(candidate, str):
                        continue
                    candidate = candidate.strip()
                    if candidate:
                        tags.append(candidate if candidate.startswith("#") else f"#{candidate}")
                if tags:
                    return {"hashtags": tags[:10]}
        except Exception:
            # Best-effort: fall through to the next model on any failure.
            continue
    return {"hashtags": []}