PostGen / backend /app /services /agentic_planner.py
Seth
update
6d1e595
import os
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from openai import OpenAI
from app.services.asset_analyzer import AssetAnalyzer
class AgenticPlanner:
"""Agentic AI service for planning and generating content campaigns"""
def __init__(self):
self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
self.model = os.getenv("OPENAI_MODEL", "gpt-4o")
self.asset_analyzer = AssetAnalyzer()
async def plan_campaign(
self,
date_range_start: datetime,
date_range_end: datetime,
products: List[str],
post_types: List[str],
posts_per_week: int,
assets: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
"""
Agentic planning: Multi-step process to create a content campaign
Steps:
1. Analyze available assets and extract insights
2. Plan content distribution across date range
3. Select appropriate post types for each content piece
4. Generate content themes and topics
5. Optimize posting schedule
"""
# Step 1: Analyze assets and extract insights
asset_insights = await self._analyze_assets(assets or [])
# Step 2: Calculate campaign parameters
total_days = (date_range_end - date_range_start).days + 1
total_weeks = max(1, total_days / 7)
total_posts = int(posts_per_week * total_weeks)
# Step 3: Generate content plan using AI
content_plan = await self._generate_content_plan(
products=products,
post_types=post_types,
total_posts=total_posts,
date_range_start=date_range_start,
date_range_end=date_range_end,
asset_insights=asset_insights
)
# Step 4: Create detailed schedule
schedule = self._create_schedule(
content_plan=content_plan,
date_range_start=date_range_start,
date_range_end=date_range_end,
posts_per_week=posts_per_week
)
return {
"campaign_id": None, # Will be set when saved to DB
"generated_posts": len(schedule),
"schedule": schedule,
"asset_insights": asset_insights,
"content_themes": content_plan.get("themes", [])
}
async def _analyze_assets(self, assets: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze all assets and extract key insights"""
insights_by_category = {}
total_assets = len(assets)
for asset in assets:
category = asset.get("product_category", "ocr")
if category not in insights_by_category:
insights_by_category[category] = {
"count": 0,
"insights": [],
"assets": []
}
insights_by_category[category]["count"] += 1
# Extract insights from analyzed content
extracted_content = asset.get("extracted_content")
if extracted_content:
insight = self.asset_analyzer.extract_key_insights(extracted_content)
if insight:
insights_by_category[category]["insights"].append(insight)
insights_by_category[category]["assets"].append({
"id": asset.get("id"),
"name": asset.get("name"),
"insight": insight
})
return {
"total_assets": total_assets,
"by_category": insights_by_category,
"summary": f"Analyzed {total_assets} assets across {len(insights_by_category)} product categories"
}
async def _generate_content_plan(
self,
products: List[str],
post_types: List[str],
total_posts: int,
date_range_start: datetime,
date_range_end: datetime,
asset_insights: Dict[str, Any]
) -> Dict[str, Any]:
"""Use AI to generate a content plan"""
product_descriptions = {
"ocr": "Intelligent Document Parsing (OCR) - AI-powered document processing and data extraction",
"p2p": "Purchase To Pay (P2P) - End-to-end procurement and accounts payable automation",
"o2c": "Order to Cash (O2C) - Complete order management and accounts receivable workflow"
}
post_type_descriptions = {
"carousel": "Multi-slide carousel post with visual storytelling",
"cover_content": "Post with cover image and engaging text content",
"content_only": "Text-only post focused on valuable insights",
"webinar": "Webinar invitation post to promote an upcoming event"
}
# Build asset context
asset_context = ""
if asset_insights.get("by_category"):
asset_context = "\n\nAvailable Asset Insights:\n"
for category, data in asset_insights["by_category"].items():
asset_context += f"\n{product_descriptions.get(category, category)}:\n"
asset_context += f"- {data['count']} assets available\n"
if data.get("insights"):
asset_context += f"- Key insights: {len(data['insights'])} extracted\n"
system_prompt = """You are an expert content strategist for B2B SaaS marketing on LinkedIn.
Your task is to create a comprehensive content plan that:
- Distributes content evenly across the date range
- Varies post types to maintain engagement
- Uses available assets and insights effectively
- Creates diverse, valuable content themes
- Follows LinkedIn best practices
Return a JSON structure with themes and recommended post types for each theme."""
user_prompt = f"""Create a content plan for a LinkedIn campaign:
Products to focus on: {', '.join([product_descriptions.get(p, p) for p in products])}
Available post types: {', '.join([post_type_descriptions.get(pt, pt) for pt in post_types])}
Total posts needed: {total_posts}
Date range: {date_range_start.strftime('%Y-%m-%d')} to {date_range_end.strftime('%Y-%m-%d')}
{asset_context}
Generate {total_posts} content themes with:
- Theme title
- Brief description
- Recommended post type
- Product category
- Key talking points
Return as JSON with structure:
{{
"themes": [
{{
"title": "Theme title",
"description": "Brief description",
"post_type": "carousel|cover_content|content_only|webinar",
"product_category": "ocr|p2p|o2c",
"talking_points": ["point1", "point2", "point3"]
}}
]
}}"""
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.8,
max_tokens=2000,
response_format={"type": "json_object"}
)
import json
content_plan = json.loads(response.choices[0].message.content)
return content_plan
except Exception as e:
# Fallback: Generate basic themes
return self._generate_fallback_themes(products, post_types, total_posts)
def _generate_fallback_themes(
self,
products: List[str],
post_types: List[str],
total_posts: int
) -> Dict[str, Any]:
"""Generate basic themes if AI fails"""
themes = []
theme_templates = {
"ocr": [
"Document Automation Benefits",
"OCR Technology Overview",
"Efficiency Gains with Intelligent Parsing"
],
"p2p": [
"Streamline Procurement Process",
"Accounts Payable Automation",
"Purchase Request Workflow"
],
"o2c": [
"Order Management Best Practices",
"Sales Order Processing",
"Accounts Receivable Optimization"
]
}
posts_per_product = total_posts // len(products) if products else total_posts
for product in products:
for i in range(posts_per_product):
theme_name = theme_templates.get(product, ["Product Feature"])[i % len(theme_templates.get(product, ["Feature"]))]
themes.append({
"title": f"{theme_name} - Post {i+1}",
"description": f"Content about {product}",
"post_type": post_types[i % len(post_types)] if post_types else "content_only",
"product_category": product,
"talking_points": ["Key benefit 1", "Key benefit 2", "Use case"]
})
return {"themes": themes[:total_posts]}
def _create_schedule(
self,
content_plan: Dict[str, Any],
date_range_start: datetime,
date_range_end: datetime,
posts_per_week: int
) -> List[Dict[str, Any]]:
"""Create a detailed posting schedule"""
themes = content_plan.get("themes", [])
if not themes:
return []
schedule = []
total_days = (date_range_end - date_range_start).days + 1
days_between_posts = max(1, int(7 / posts_per_week)) # Distribute across week
current_date = date_range_start
theme_index = 0
while current_date <= date_range_end and theme_index < len(themes):
theme = themes[theme_index]
# Schedule post for this date
schedule.append({
"date": current_date.isoformat(),
"time": "10:00", # Default time, can be optimized
"theme": theme.get("title", ""),
"description": theme.get("description", ""),
"post_type": theme.get("post_type", "content_only"),
"product_category": theme.get("product_category", "ocr"),
"talking_points": theme.get("talking_points", []),
"status": "planned"
})
# Move to next date
current_date += timedelta(days=days_between_posts)
theme_index += 1
return schedule