| import os |
| from typing import List, Dict, Any, Optional |
| from datetime import datetime, timedelta |
| from openai import OpenAI |
| from app.services.asset_analyzer import AssetAnalyzer |
|
|
| class AgenticPlanner: |
| """Agentic AI service for planning and generating content campaigns""" |
| |
| def __init__(self): |
| self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", "")) |
| self.model = os.getenv("OPENAI_MODEL", "gpt-4o") |
| self.asset_analyzer = AssetAnalyzer() |
| |
| async def plan_campaign( |
| self, |
| date_range_start: datetime, |
| date_range_end: datetime, |
| products: List[str], |
| post_types: List[str], |
| posts_per_week: int, |
| assets: Optional[List[Dict[str, Any]]] = None |
| ) -> Dict[str, Any]: |
| """ |
| Agentic planning: Multi-step process to create a content campaign |
| |
| Steps: |
| 1. Analyze available assets and extract insights |
| 2. Plan content distribution across date range |
| 3. Select appropriate post types for each content piece |
| 4. Generate content themes and topics |
| 5. Optimize posting schedule |
| """ |
| |
| |
| asset_insights = await self._analyze_assets(assets or []) |
| |
| |
| total_days = (date_range_end - date_range_start).days + 1 |
| total_weeks = max(1, total_days / 7) |
| total_posts = int(posts_per_week * total_weeks) |
| |
| |
| content_plan = await self._generate_content_plan( |
| products=products, |
| post_types=post_types, |
| total_posts=total_posts, |
| date_range_start=date_range_start, |
| date_range_end=date_range_end, |
| asset_insights=asset_insights |
| ) |
| |
| |
| schedule = self._create_schedule( |
| content_plan=content_plan, |
| date_range_start=date_range_start, |
| date_range_end=date_range_end, |
| posts_per_week=posts_per_week |
| ) |
| |
| return { |
| "campaign_id": None, |
| "generated_posts": len(schedule), |
| "schedule": schedule, |
| "asset_insights": asset_insights, |
| "content_themes": content_plan.get("themes", []) |
| } |
| |
| async def _analyze_assets(self, assets: List[Dict[str, Any]]) -> Dict[str, Any]: |
| """Analyze all assets and extract key insights""" |
| insights_by_category = {} |
| total_assets = len(assets) |
| |
| for asset in assets: |
| category = asset.get("product_category", "ocr") |
| if category not in insights_by_category: |
| insights_by_category[category] = { |
| "count": 0, |
| "insights": [], |
| "assets": [] |
| } |
| |
| insights_by_category[category]["count"] += 1 |
| |
| |
| extracted_content = asset.get("extracted_content") |
| if extracted_content: |
| insight = self.asset_analyzer.extract_key_insights(extracted_content) |
| if insight: |
| insights_by_category[category]["insights"].append(insight) |
| insights_by_category[category]["assets"].append({ |
| "id": asset.get("id"), |
| "name": asset.get("name"), |
| "insight": insight |
| }) |
| |
| return { |
| "total_assets": total_assets, |
| "by_category": insights_by_category, |
| "summary": f"Analyzed {total_assets} assets across {len(insights_by_category)} product categories" |
| } |
| |
| async def _generate_content_plan( |
| self, |
| products: List[str], |
| post_types: List[str], |
| total_posts: int, |
| date_range_start: datetime, |
| date_range_end: datetime, |
| asset_insights: Dict[str, Any] |
| ) -> Dict[str, Any]: |
| """Use AI to generate a content plan""" |
| |
| product_descriptions = { |
| "ocr": "Intelligent Document Parsing (OCR) - AI-powered document processing and data extraction", |
| "p2p": "Purchase To Pay (P2P) - End-to-end procurement and accounts payable automation", |
| "o2c": "Order to Cash (O2C) - Complete order management and accounts receivable workflow" |
| } |
| |
| post_type_descriptions = { |
| "carousel": "Multi-slide carousel post with visual storytelling", |
| "cover_content": "Post with cover image and engaging text content", |
| "content_only": "Text-only post focused on valuable insights", |
| "webinar": "Webinar invitation post to promote an upcoming event" |
| } |
| |
| |
| asset_context = "" |
| if asset_insights.get("by_category"): |
| asset_context = "\n\nAvailable Asset Insights:\n" |
| for category, data in asset_insights["by_category"].items(): |
| asset_context += f"\n{product_descriptions.get(category, category)}:\n" |
| asset_context += f"- {data['count']} assets available\n" |
| if data.get("insights"): |
| asset_context += f"- Key insights: {len(data['insights'])} extracted\n" |
| |
| system_prompt = """You are an expert content strategist for B2B SaaS marketing on LinkedIn. |
| Your task is to create a comprehensive content plan that: |
| - Distributes content evenly across the date range |
| - Varies post types to maintain engagement |
| - Uses available assets and insights effectively |
| - Creates diverse, valuable content themes |
| - Follows LinkedIn best practices |
| |
| Return a JSON structure with themes and recommended post types for each theme.""" |
| |
| user_prompt = f"""Create a content plan for a LinkedIn campaign: |
| |
| Products to focus on: {', '.join([product_descriptions.get(p, p) for p in products])} |
| Available post types: {', '.join([post_type_descriptions.get(pt, pt) for pt in post_types])} |
| Total posts needed: {total_posts} |
| Date range: {date_range_start.strftime('%Y-%m-%d')} to {date_range_end.strftime('%Y-%m-%d')} |
| {asset_context} |
| |
| Generate {total_posts} content themes with: |
| - Theme title |
| - Brief description |
| - Recommended post type |
| - Product category |
| - Key talking points |
| |
| Return as JSON with structure: |
| {{ |
| "themes": [ |
| {{ |
| "title": "Theme title", |
| "description": "Brief description", |
| "post_type": "carousel|cover_content|content_only|webinar", |
| "product_category": "ocr|p2p|o2c", |
| "talking_points": ["point1", "point2", "point3"] |
| }} |
| ] |
| }}""" |
| |
| try: |
| response = self.client.chat.completions.create( |
| model=self.model, |
| messages=[ |
| {"role": "system", "content": system_prompt}, |
| {"role": "user", "content": user_prompt} |
| ], |
| temperature=0.8, |
| max_tokens=2000, |
| response_format={"type": "json_object"} |
| ) |
| |
| import json |
| content_plan = json.loads(response.choices[0].message.content) |
| return content_plan |
| except Exception as e: |
| |
| return self._generate_fallback_themes(products, post_types, total_posts) |
| |
| def _generate_fallback_themes( |
| self, |
| products: List[str], |
| post_types: List[str], |
| total_posts: int |
| ) -> Dict[str, Any]: |
| """Generate basic themes if AI fails""" |
| themes = [] |
| theme_templates = { |
| "ocr": [ |
| "Document Automation Benefits", |
| "OCR Technology Overview", |
| "Efficiency Gains with Intelligent Parsing" |
| ], |
| "p2p": [ |
| "Streamline Procurement Process", |
| "Accounts Payable Automation", |
| "Purchase Request Workflow" |
| ], |
| "o2c": [ |
| "Order Management Best Practices", |
| "Sales Order Processing", |
| "Accounts Receivable Optimization" |
| ] |
| } |
| |
| posts_per_product = total_posts // len(products) if products else total_posts |
| for product in products: |
| for i in range(posts_per_product): |
| theme_name = theme_templates.get(product, ["Product Feature"])[i % len(theme_templates.get(product, ["Feature"]))] |
| themes.append({ |
| "title": f"{theme_name} - Post {i+1}", |
| "description": f"Content about {product}", |
| "post_type": post_types[i % len(post_types)] if post_types else "content_only", |
| "product_category": product, |
| "talking_points": ["Key benefit 1", "Key benefit 2", "Use case"] |
| }) |
| |
| return {"themes": themes[:total_posts]} |
| |
| def _create_schedule( |
| self, |
| content_plan: Dict[str, Any], |
| date_range_start: datetime, |
| date_range_end: datetime, |
| posts_per_week: int |
| ) -> List[Dict[str, Any]]: |
| """Create a detailed posting schedule""" |
| themes = content_plan.get("themes", []) |
| if not themes: |
| return [] |
| |
| schedule = [] |
| total_days = (date_range_end - date_range_start).days + 1 |
| days_between_posts = max(1, int(7 / posts_per_week)) |
| |
| current_date = date_range_start |
| theme_index = 0 |
| |
| while current_date <= date_range_end and theme_index < len(themes): |
| theme = themes[theme_index] |
| |
| |
| schedule.append({ |
| "date": current_date.isoformat(), |
| "time": "10:00", |
| "theme": theme.get("title", ""), |
| "description": theme.get("description", ""), |
| "post_type": theme.get("post_type", "content_only"), |
| "product_category": theme.get("product_category", "ocr"), |
| "talking_points": theme.get("talking_points", []), |
| "status": "planned" |
| }) |
| |
| |
| current_date += timedelta(days=days_between_posts) |
| theme_index += 1 |
| |
| return schedule |
|
|
|
|