# NOTE: removed hosting-UI page chrome ("Spaces: Running") captured with this file.
import json
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from openai import OpenAI

from app.services.asset_analyzer import AssetAnalyzer
class AgenticPlanner:
    """Agentic AI service for planning and generating content campaigns"""

    def __init__(self):
        # OpenAI client; the empty-string API-key default means construction
        # never fails here — a missing key only surfaces when a call is made.
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
        # Chat model is configurable via env var; defaults to "gpt-4o".
        self.model = os.getenv("OPENAI_MODEL", "gpt-4o")
        # Helper used to pull key insights out of pre-extracted asset content.
        self.asset_analyzer = AssetAnalyzer()
| async def plan_campaign( | |
| self, | |
| date_range_start: datetime, | |
| date_range_end: datetime, | |
| products: List[str], | |
| post_types: List[str], | |
| posts_per_week: int, | |
| assets: Optional[List[Dict[str, Any]]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Agentic planning: Multi-step process to create a content campaign | |
| Steps: | |
| 1. Analyze available assets and extract insights | |
| 2. Plan content distribution across date range | |
| 3. Select appropriate post types for each content piece | |
| 4. Generate content themes and topics | |
| 5. Optimize posting schedule | |
| """ | |
| # Step 1: Analyze assets and extract insights | |
| asset_insights = await self._analyze_assets(assets or []) | |
| # Step 2: Calculate campaign parameters | |
| total_days = (date_range_end - date_range_start).days + 1 | |
| total_weeks = max(1, total_days / 7) | |
| total_posts = int(posts_per_week * total_weeks) | |
| # Step 3: Generate content plan using AI | |
| content_plan = await self._generate_content_plan( | |
| products=products, | |
| post_types=post_types, | |
| total_posts=total_posts, | |
| date_range_start=date_range_start, | |
| date_range_end=date_range_end, | |
| asset_insights=asset_insights | |
| ) | |
| # Step 4: Create detailed schedule | |
| schedule = self._create_schedule( | |
| content_plan=content_plan, | |
| date_range_start=date_range_start, | |
| date_range_end=date_range_end, | |
| posts_per_week=posts_per_week | |
| ) | |
| return { | |
| "campaign_id": None, # Will be set when saved to DB | |
| "generated_posts": len(schedule), | |
| "schedule": schedule, | |
| "asset_insights": asset_insights, | |
| "content_themes": content_plan.get("themes", []) | |
| } | |
| async def _analyze_assets(self, assets: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Analyze all assets and extract key insights""" | |
| insights_by_category = {} | |
| total_assets = len(assets) | |
| for asset in assets: | |
| category = asset.get("product_category", "ocr") | |
| if category not in insights_by_category: | |
| insights_by_category[category] = { | |
| "count": 0, | |
| "insights": [], | |
| "assets": [] | |
| } | |
| insights_by_category[category]["count"] += 1 | |
| # Extract insights from analyzed content | |
| extracted_content = asset.get("extracted_content") | |
| if extracted_content: | |
| insight = self.asset_analyzer.extract_key_insights(extracted_content) | |
| if insight: | |
| insights_by_category[category]["insights"].append(insight) | |
| insights_by_category[category]["assets"].append({ | |
| "id": asset.get("id"), | |
| "name": asset.get("name"), | |
| "insight": insight | |
| }) | |
| return { | |
| "total_assets": total_assets, | |
| "by_category": insights_by_category, | |
| "summary": f"Analyzed {total_assets} assets across {len(insights_by_category)} product categories" | |
| } | |
| async def _generate_content_plan( | |
| self, | |
| products: List[str], | |
| post_types: List[str], | |
| total_posts: int, | |
| date_range_start: datetime, | |
| date_range_end: datetime, | |
| asset_insights: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Use AI to generate a content plan""" | |
| product_descriptions = { | |
| "ocr": "Intelligent Document Parsing (OCR) - AI-powered document processing and data extraction", | |
| "p2p": "Purchase To Pay (P2P) - End-to-end procurement and accounts payable automation", | |
| "o2c": "Order to Cash (O2C) - Complete order management and accounts receivable workflow" | |
| } | |
| post_type_descriptions = { | |
| "carousel": "Multi-slide carousel post with visual storytelling", | |
| "cover_content": "Post with cover image and engaging text content", | |
| "content_only": "Text-only post focused on valuable insights", | |
| "webinar": "Webinar invitation post to promote an upcoming event" | |
| } | |
| # Build asset context | |
| asset_context = "" | |
| if asset_insights.get("by_category"): | |
| asset_context = "\n\nAvailable Asset Insights:\n" | |
| for category, data in asset_insights["by_category"].items(): | |
| asset_context += f"\n{product_descriptions.get(category, category)}:\n" | |
| asset_context += f"- {data['count']} assets available\n" | |
| if data.get("insights"): | |
| asset_context += f"- Key insights: {len(data['insights'])} extracted\n" | |
| system_prompt = """You are an expert content strategist for B2B SaaS marketing on LinkedIn. | |
| Your task is to create a comprehensive content plan that: | |
| - Distributes content evenly across the date range | |
| - Varies post types to maintain engagement | |
| - Uses available assets and insights effectively | |
| - Creates diverse, valuable content themes | |
| - Follows LinkedIn best practices | |
| Return a JSON structure with themes and recommended post types for each theme.""" | |
| user_prompt = f"""Create a content plan for a LinkedIn campaign: | |
| Products to focus on: {', '.join([product_descriptions.get(p, p) for p in products])} | |
| Available post types: {', '.join([post_type_descriptions.get(pt, pt) for pt in post_types])} | |
| Total posts needed: {total_posts} | |
| Date range: {date_range_start.strftime('%Y-%m-%d')} to {date_range_end.strftime('%Y-%m-%d')} | |
| {asset_context} | |
| Generate {total_posts} content themes with: | |
| - Theme title | |
| - Brief description | |
| - Recommended post type | |
| - Product category | |
| - Key talking points | |
| Return as JSON with structure: | |
| {{ | |
| "themes": [ | |
| {{ | |
| "title": "Theme title", | |
| "description": "Brief description", | |
| "post_type": "carousel|cover_content|content_only|webinar", | |
| "product_category": "ocr|p2p|o2c", | |
| "talking_points": ["point1", "point2", "point3"] | |
| }} | |
| ] | |
| }}""" | |
| try: | |
| response = self.client.chat.completions.create( | |
| model=self.model, | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_prompt} | |
| ], | |
| temperature=0.8, | |
| max_tokens=2000, | |
| response_format={"type": "json_object"} | |
| ) | |
| import json | |
| content_plan = json.loads(response.choices[0].message.content) | |
| return content_plan | |
| except Exception as e: | |
| # Fallback: Generate basic themes | |
| return self._generate_fallback_themes(products, post_types, total_posts) | |
| def _generate_fallback_themes( | |
| self, | |
| products: List[str], | |
| post_types: List[str], | |
| total_posts: int | |
| ) -> Dict[str, Any]: | |
| """Generate basic themes if AI fails""" | |
| themes = [] | |
| theme_templates = { | |
| "ocr": [ | |
| "Document Automation Benefits", | |
| "OCR Technology Overview", | |
| "Efficiency Gains with Intelligent Parsing" | |
| ], | |
| "p2p": [ | |
| "Streamline Procurement Process", | |
| "Accounts Payable Automation", | |
| "Purchase Request Workflow" | |
| ], | |
| "o2c": [ | |
| "Order Management Best Practices", | |
| "Sales Order Processing", | |
| "Accounts Receivable Optimization" | |
| ] | |
| } | |
| posts_per_product = total_posts // len(products) if products else total_posts | |
| for product in products: | |
| for i in range(posts_per_product): | |
| theme_name = theme_templates.get(product, ["Product Feature"])[i % len(theme_templates.get(product, ["Feature"]))] | |
| themes.append({ | |
| "title": f"{theme_name} - Post {i+1}", | |
| "description": f"Content about {product}", | |
| "post_type": post_types[i % len(post_types)] if post_types else "content_only", | |
| "product_category": product, | |
| "talking_points": ["Key benefit 1", "Key benefit 2", "Use case"] | |
| }) | |
| return {"themes": themes[:total_posts]} | |
| def _create_schedule( | |
| self, | |
| content_plan: Dict[str, Any], | |
| date_range_start: datetime, | |
| date_range_end: datetime, | |
| posts_per_week: int | |
| ) -> List[Dict[str, Any]]: | |
| """Create a detailed posting schedule""" | |
| themes = content_plan.get("themes", []) | |
| if not themes: | |
| return [] | |
| schedule = [] | |
| total_days = (date_range_end - date_range_start).days + 1 | |
| days_between_posts = max(1, int(7 / posts_per_week)) # Distribute across week | |
| current_date = date_range_start | |
| theme_index = 0 | |
| while current_date <= date_range_end and theme_index < len(themes): | |
| theme = themes[theme_index] | |
| # Schedule post for this date | |
| schedule.append({ | |
| "date": current_date.isoformat(), | |
| "time": "10:00", # Default time, can be optimized | |
| "theme": theme.get("title", ""), | |
| "description": theme.get("description", ""), | |
| "post_type": theme.get("post_type", "content_only"), | |
| "product_category": theme.get("product_category", "ocr"), | |
| "talking_points": theme.get("talking_points", []), | |
| "status": "planned" | |
| }) | |
| # Move to next date | |
| current_date += timedelta(days=days_between_posts) | |
| theme_index += 1 | |
| return schedule | |