Spaces:
Running
Running
File size: 10,571 Bytes
6d1e595 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
import asyncio
import json
import logging
import os
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta

from openai import OpenAI

from app.services.asset_analyzer import AssetAnalyzer
logger = logging.getLogger(__name__)


class AgenticPlanner:
    """Agentic AI service for planning and generating content campaigns.

    The planner combines insights extracted from uploaded assets with an
    OpenAI chat model to produce a list of content themes, then lays those
    themes out on a posting calendar.
    """

    def __init__(self):
        """Create the OpenAI client and the asset analyzer.

        The API key is read from ``OPENAI_API_KEY`` (empty string when unset)
        and the model name from ``OPENAI_MODEL`` (``gpt-4o`` when unset).
        """
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
        self.model = os.getenv("OPENAI_MODEL", "gpt-4o")
        self.asset_analyzer = AssetAnalyzer()

    async def plan_campaign(
        self,
        date_range_start: datetime,
        date_range_end: datetime,
        products: List[str],
        post_types: List[str],
        posts_per_week: int,
        assets: Optional[List[Dict[str, Any]]] = None
    ) -> Dict[str, Any]:
        """Plan a content campaign for the given date range.

        Steps:
        1. Analyze available assets and extract insights
        2. Derive the number of posts from the range and weekly cadence
        3. Generate content themes (AI, with a template fallback)
        4. Lay the themes out on a posting schedule

        Args:
            date_range_start: First day of the campaign (inclusive).
            date_range_end: Last day of the campaign (inclusive).
            products: Product category keys to cover (e.g. ``"ocr"``).
            post_types: Post type keys the plan may use.
            posts_per_week: Requested posting cadence.
            assets: Optional asset dicts; ``None`` is treated as no assets.

        Returns:
            A dict with ``campaign_id`` (always ``None`` here),
            ``generated_posts``, ``schedule``, ``asset_insights`` and
            ``content_themes``.
        """
        # Step 1: Analyze assets and extract insights
        asset_insights = await self._analyze_assets(assets or [])

        # Step 2: Calculate campaign parameters
        total_days = (date_range_end - date_range_start).days + 1
        total_weeks = max(1, total_days / 7)
        total_posts = int(posts_per_week * total_weeks)

        # Step 3: Generate content plan using AI
        if total_days <= 0 or total_posts <= 0:
            # An inverted range or a non-positive cadence can never yield a
            # scheduled post, so skip the (paid, slow) model call entirely.
            content_plan: Dict[str, Any] = {"themes": []}
        else:
            content_plan = await self._generate_content_plan(
                products=products,
                post_types=post_types,
                total_posts=total_posts,
                date_range_start=date_range_start,
                date_range_end=date_range_end,
                asset_insights=asset_insights
            )

        # Step 4: Create detailed schedule
        schedule = self._create_schedule(
            content_plan=content_plan,
            date_range_start=date_range_start,
            date_range_end=date_range_end,
            posts_per_week=posts_per_week
        )

        return {
            "campaign_id": None,  # Will be set when saved to DB
            "generated_posts": len(schedule),
            "schedule": schedule,
            "asset_insights": asset_insights,
            "content_themes": content_plan.get("themes", [])
        }

    async def _analyze_assets(self, assets: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Group assets by product category and collect their insights.

        Assets without a ``product_category`` key are counted under ``"ocr"``.
        An asset contributes an insight only when it has truthy
        ``extracted_content`` and the analyzer returns a truthy value for it.

        Args:
            assets: Asset dicts; the keys read are ``product_category``,
                ``extracted_content``, ``id`` and ``name``.

        Returns:
            A dict with ``total_assets``, ``by_category`` (per-category
            ``count``, ``insights`` and ``assets``) and a ``summary`` string.
        """
        insights_by_category: Dict[Any, Dict[str, Any]] = {}
        total_assets = len(assets)

        for asset in assets:
            category = asset.get("product_category", "ocr")
            bucket = insights_by_category.setdefault(
                category,
                {"count": 0, "insights": [], "assets": []},
            )
            bucket["count"] += 1

            # Extract insights from analyzed content
            extracted_content = asset.get("extracted_content")
            if not extracted_content:
                continue
            insight = self.asset_analyzer.extract_key_insights(extracted_content)
            if not insight:
                continue
            bucket["insights"].append(insight)
            bucket["assets"].append({
                "id": asset.get("id"),
                "name": asset.get("name"),
                "insight": insight
            })

        return {
            "total_assets": total_assets,
            "by_category": insights_by_category,
            "summary": f"Analyzed {total_assets} assets across {len(insights_by_category)} product categories"
        }

    async def _generate_content_plan(
        self,
        products: List[str],
        post_types: List[str],
        total_posts: int,
        date_range_start: datetime,
        date_range_end: datetime,
        asset_insights: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Use AI to generate a content plan.

        The model is asked for a JSON object with a ``themes`` list. Any
        failure (request error, invalid JSON, missing or malformed ``themes``)
        is logged and answered with the template-based fallback plan, so this
        method does not raise for model problems.

        Args:
            products: Product category keys to focus on.
            post_types: Post type keys the model may recommend.
            total_posts: Number of themes to request.
            date_range_start: First day of the campaign.
            date_range_end: Last day of the campaign.
            asset_insights: Result of ``_analyze_assets``.

        Returns:
            A dict whose ``themes`` value is a non-empty list of dicts when
            the model answered usefully, otherwise the fallback plan.
        """
        product_descriptions = {
            "ocr": "Intelligent Document Parsing (OCR) - AI-powered document processing and data extraction",
            "p2p": "Purchase To Pay (P2P) - End-to-end procurement and accounts payable automation",
            "o2c": "Order to Cash (O2C) - Complete order management and accounts receivable workflow"
        }
        post_type_descriptions = {
            "carousel": "Multi-slide carousel post with visual storytelling",
            "cover_content": "Post with cover image and engaging text content",
            "content_only": "Text-only post focused on valuable insights",
            "webinar": "Webinar invitation post to promote an upcoming event"
        }

        # Build asset context (only counts are sent, not the insight bodies)
        context_parts: List[str] = []
        by_category = asset_insights.get("by_category")
        if by_category:
            context_parts.append("\n\nAvailable Asset Insights:\n")
            for category, data in by_category.items():
                context_parts.append(f"\n{product_descriptions.get(category, category)}:\n")
                context_parts.append(f"- {data['count']} assets available\n")
                if data.get("insights"):
                    context_parts.append(f"- Key insights: {len(data['insights'])} extracted\n")
        asset_context = "".join(context_parts)

        product_list = ', '.join([product_descriptions.get(p, p) for p in products])
        post_type_list = ', '.join([post_type_descriptions.get(pt, pt) for pt in post_types])
        start_label = date_range_start.strftime('%Y-%m-%d')
        end_label = date_range_end.strftime('%Y-%m-%d')

        system_prompt = """You are an expert content strategist for B2B SaaS marketing on LinkedIn.
Your task is to create a comprehensive content plan that:
- Distributes content evenly across the date range
- Varies post types to maintain engagement
- Uses available assets and insights effectively
- Creates diverse, valuable content themes
- Follows LinkedIn best practices
Return a JSON structure with themes and recommended post types for each theme."""

        user_prompt = f"""Create a content plan for a LinkedIn campaign:
Products to focus on: {product_list}
Available post types: {post_type_list}
Total posts needed: {total_posts}
Date range: {start_label} to {end_label}
{asset_context}
Generate {total_posts} content themes with:
- Theme title
- Brief description
- Recommended post type
- Product category
- Key talking points
Return as JSON with structure:
{{
"themes": [
{{
"title": "Theme title",
"description": "Brief description",
"post_type": "carousel|cover_content|content_only|webinar",
"product_category": "ocr|p2p|o2c",
"talking_points": ["point1", "point2", "point3"]
}}
]
}}"""

        def _request_plan():
            """Perform the blocking chat-completion request and return its text."""
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.8,
                max_tokens=2000,
                response_format={"type": "json_object"}
            )
            return response.choices[0].message.content

        try:
            # The client is synchronous; run it in the default executor so
            # the event loop stays responsive while the request is in flight.
            loop = asyncio.get_running_loop()
            raw_plan = await loop.run_in_executor(None, _request_plan)
            content_plan = json.loads(raw_plan)
        except Exception:
            # Deliberate best-effort boundary: any model/parse failure falls
            # back to template themes, but the cause is no longer hidden.
            logger.exception("AI content plan generation failed; using fallback themes")
            return self._generate_fallback_themes(products, post_types, total_posts)

        # The scheduler calls .get() on every theme, so keep only dict entries
        # and reject responses whose "themes" is missing or not a list.
        raw_themes = content_plan.get("themes") if isinstance(content_plan, dict) else None
        themes = (
            [theme for theme in raw_themes if isinstance(theme, dict)]
            if isinstance(raw_themes, list)
            else []
        )
        if not themes:
            logger.warning("AI content plan contained no usable themes; using fallback themes")
            return self._generate_fallback_themes(products, post_types, total_posts)

        return {**content_plan, "themes": themes}

    def _generate_fallback_themes(
        self,
        products: List[str],
        post_types: List[str],
        total_posts: int
    ) -> Dict[str, Any]:
        """Generate basic themes if AI fails.

        Posts are split across ``products`` in order; when ``total_posts`` is
        not evenly divisible, the leading products each receive one extra post
        so that exactly ``total_posts`` themes are produced.

        Args:
            products: Product category keys; unknown keys get a generic title.
            post_types: Post types cycled per product; when empty,
                ``"content_only"`` is used.
            total_posts: Number of themes to produce (negative counts as 0).

        Returns:
            ``{"themes": [...]}``; the list is empty when ``products`` is empty.
        """
        theme_templates = {
            "ocr": [
                "Document Automation Benefits",
                "OCR Technology Overview",
                "Efficiency Gains with Intelligent Parsing"
            ],
            "p2p": [
                "Streamline Procurement Process",
                "Accounts Payable Automation",
                "Purchase Request Workflow"
            ],
            "o2c": [
                "Order Management Best Practices",
                "Sales Order Processing",
                "Accounts Receivable Optimization"
            ]
        }

        if not products:
            return {"themes": []}

        base_count, remainder = divmod(max(0, total_posts), len(products))

        themes = []
        for position, product in enumerate(products):
            titles = theme_templates.get(product, ["Product Feature"])
            # The first `remainder` products absorb the leftover posts.
            quota = base_count + (1 if position < remainder else 0)
            for i in range(quota):
                themes.append({
                    "title": f"{titles[i % len(titles)]} - Post {i+1}",
                    "description": f"Content about {product}",
                    "post_type": post_types[i % len(post_types)] if post_types else "content_only",
                    "product_category": product,
                    "talking_points": ["Key benefit 1", "Key benefit 2", "Use case"]
                })

        return {"themes": themes}

    def _create_schedule(
        self,
        content_plan: Dict[str, Any],
        date_range_start: datetime,
        date_range_end: datetime,
        posts_per_week: int
    ) -> List[Dict[str, Any]]:
        """Create a detailed posting schedule.

        Themes are placed in order, starting on ``date_range_start``, at a
        cadence of ``posts_per_week`` posts per 7 days (capped at one post per
        day). Themes that would fall after ``date_range_end`` are dropped.

        Args:
            content_plan: Dict with a ``themes`` list of theme dicts.
            date_range_start: Date of the first post.
            date_range_end: Last date a post may be scheduled on (inclusive).
            posts_per_week: Requested cadence; values <= 0 yield no posts.

        Returns:
            A list of schedule entries with ``date`` (ISO format), ``time``,
            ``theme``, ``description``, ``post_type``, ``product_category``,
            ``talking_points`` and ``status``.
        """
        themes = content_plan.get("themes", [])
        if not themes or posts_per_week <= 0:
            return []

        # At most one post per day, so cadences above 7/week behave as daily.
        weekly_rate = min(posts_per_week, 7)

        schedule = []
        for index, theme in enumerate(themes):
            # Exact integer spacing: post k lands on day floor(k * 7 / rate),
            # which yields `rate` posts in every 7-day window without drift.
            post_date = date_range_start + timedelta(days=(index * 7) // weekly_rate)
            if post_date > date_range_end:
                break
            schedule.append({
                "date": post_date.isoformat(),
                "time": "10:00",  # Default time, can be optimized
                "theme": theme.get("title", ""),
                "description": theme.get("description", ""),
                "post_type": theme.get("post_type", "content_only"),
                "product_category": theme.get("product_category", "ocr"),
                "talking_points": theme.get("talking_points", []),
                "status": "planned"
            })

        return schedule
|