File size: 10,571 Bytes
6d1e595
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import os
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from openai import OpenAI
from app.services.asset_analyzer import AssetAnalyzer

class AgenticPlanner:
    """Agentic AI service for planning and generating content campaigns"""
    
    def __init__(self):
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
        self.model = os.getenv("OPENAI_MODEL", "gpt-4o")
        self.asset_analyzer = AssetAnalyzer()
    
    async def plan_campaign(
        self,
        date_range_start: datetime,
        date_range_end: datetime,
        products: List[str],
        post_types: List[str],
        posts_per_week: int,
        assets: Optional[List[Dict[str, Any]]] = None
    ) -> Dict[str, Any]:
        """
        Agentic planning: Multi-step process to create a content campaign
        
        Steps:
        1. Analyze available assets and extract insights
        2. Plan content distribution across date range
        3. Select appropriate post types for each content piece
        4. Generate content themes and topics
        5. Optimize posting schedule
        """
        
        # Step 1: Analyze assets and extract insights
        asset_insights = await self._analyze_assets(assets or [])
        
        # Step 2: Calculate campaign parameters
        total_days = (date_range_end - date_range_start).days + 1
        total_weeks = max(1, total_days / 7)
        total_posts = int(posts_per_week * total_weeks)
        
        # Step 3: Generate content plan using AI
        content_plan = await self._generate_content_plan(
            products=products,
            post_types=post_types,
            total_posts=total_posts,
            date_range_start=date_range_start,
            date_range_end=date_range_end,
            asset_insights=asset_insights
        )
        
        # Step 4: Create detailed schedule
        schedule = self._create_schedule(
            content_plan=content_plan,
            date_range_start=date_range_start,
            date_range_end=date_range_end,
            posts_per_week=posts_per_week
        )
        
        return {
            "campaign_id": None,  # Will be set when saved to DB
            "generated_posts": len(schedule),
            "schedule": schedule,
            "asset_insights": asset_insights,
            "content_themes": content_plan.get("themes", [])
        }
    
    async def _analyze_assets(self, assets: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze all assets and extract key insights"""
        insights_by_category = {}
        total_assets = len(assets)
        
        for asset in assets:
            category = asset.get("product_category", "ocr")
            if category not in insights_by_category:
                insights_by_category[category] = {
                    "count": 0,
                    "insights": [],
                    "assets": []
                }
            
            insights_by_category[category]["count"] += 1
            
            # Extract insights from analyzed content
            extracted_content = asset.get("extracted_content")
            if extracted_content:
                insight = self.asset_analyzer.extract_key_insights(extracted_content)
                if insight:
                    insights_by_category[category]["insights"].append(insight)
                    insights_by_category[category]["assets"].append({
                        "id": asset.get("id"),
                        "name": asset.get("name"),
                        "insight": insight
                    })
        
        return {
            "total_assets": total_assets,
            "by_category": insights_by_category,
            "summary": f"Analyzed {total_assets} assets across {len(insights_by_category)} product categories"
        }
    
    async def _generate_content_plan(
        self,
        products: List[str],
        post_types: List[str],
        total_posts: int,
        date_range_start: datetime,
        date_range_end: datetime,
        asset_insights: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Use AI to generate a content plan"""
        
        product_descriptions = {
            "ocr": "Intelligent Document Parsing (OCR) - AI-powered document processing and data extraction",
            "p2p": "Purchase To Pay (P2P) - End-to-end procurement and accounts payable automation",
            "o2c": "Order to Cash (O2C) - Complete order management and accounts receivable workflow"
        }
        
        post_type_descriptions = {
            "carousel": "Multi-slide carousel post with visual storytelling",
            "cover_content": "Post with cover image and engaging text content",
            "content_only": "Text-only post focused on valuable insights",
            "webinar": "Webinar invitation post to promote an upcoming event"
        }
        
        # Build asset context
        asset_context = ""
        if asset_insights.get("by_category"):
            asset_context = "\n\nAvailable Asset Insights:\n"
            for category, data in asset_insights["by_category"].items():
                asset_context += f"\n{product_descriptions.get(category, category)}:\n"
                asset_context += f"- {data['count']} assets available\n"
                if data.get("insights"):
                    asset_context += f"- Key insights: {len(data['insights'])} extracted\n"
        
        system_prompt = """You are an expert content strategist for B2B SaaS marketing on LinkedIn.
Your task is to create a comprehensive content plan that:
- Distributes content evenly across the date range
- Varies post types to maintain engagement
- Uses available assets and insights effectively
- Creates diverse, valuable content themes
- Follows LinkedIn best practices

Return a JSON structure with themes and recommended post types for each theme."""
        
        user_prompt = f"""Create a content plan for a LinkedIn campaign:

Products to focus on: {', '.join([product_descriptions.get(p, p) for p in products])}
Available post types: {', '.join([post_type_descriptions.get(pt, pt) for pt in post_types])}
Total posts needed: {total_posts}
Date range: {date_range_start.strftime('%Y-%m-%d')} to {date_range_end.strftime('%Y-%m-%d')}
{asset_context}

Generate {total_posts} content themes with:
- Theme title
- Brief description
- Recommended post type
- Product category
- Key talking points

Return as JSON with structure:
{{
  "themes": [
    {{
      "title": "Theme title",
      "description": "Brief description",
      "post_type": "carousel|cover_content|content_only|webinar",
      "product_category": "ocr|p2p|o2c",
      "talking_points": ["point1", "point2", "point3"]
    }}
  ]
}}"""
        
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.8,
                max_tokens=2000,
                response_format={"type": "json_object"}
            )
            
            import json
            content_plan = json.loads(response.choices[0].message.content)
            return content_plan
        except Exception as e:
            # Fallback: Generate basic themes
            return self._generate_fallback_themes(products, post_types, total_posts)
    
    def _generate_fallback_themes(
        self,
        products: List[str],
        post_types: List[str],
        total_posts: int
    ) -> Dict[str, Any]:
        """Generate basic themes if AI fails"""
        themes = []
        theme_templates = {
            "ocr": [
                "Document Automation Benefits",
                "OCR Technology Overview",
                "Efficiency Gains with Intelligent Parsing"
            ],
            "p2p": [
                "Streamline Procurement Process",
                "Accounts Payable Automation",
                "Purchase Request Workflow"
            ],
            "o2c": [
                "Order Management Best Practices",
                "Sales Order Processing",
                "Accounts Receivable Optimization"
            ]
        }
        
        posts_per_product = total_posts // len(products) if products else total_posts
        for product in products:
            for i in range(posts_per_product):
                theme_name = theme_templates.get(product, ["Product Feature"])[i % len(theme_templates.get(product, ["Feature"]))]
                themes.append({
                    "title": f"{theme_name} - Post {i+1}",
                    "description": f"Content about {product}",
                    "post_type": post_types[i % len(post_types)] if post_types else "content_only",
                    "product_category": product,
                    "talking_points": ["Key benefit 1", "Key benefit 2", "Use case"]
                })
        
        return {"themes": themes[:total_posts]}
    
    def _create_schedule(
        self,
        content_plan: Dict[str, Any],
        date_range_start: datetime,
        date_range_end: datetime,
        posts_per_week: int
    ) -> List[Dict[str, Any]]:
        """Create a detailed posting schedule"""
        themes = content_plan.get("themes", [])
        if not themes:
            return []
        
        schedule = []
        total_days = (date_range_end - date_range_start).days + 1
        days_between_posts = max(1, int(7 / posts_per_week))  # Distribute across week
        
        current_date = date_range_start
        theme_index = 0
        
        while current_date <= date_range_end and theme_index < len(themes):
            theme = themes[theme_index]
            
            # Schedule post for this date
            schedule.append({
                "date": current_date.isoformat(),
                "time": "10:00",  # Default time, can be optimized
                "theme": theme.get("title", ""),
                "description": theme.get("description", ""),
                "post_type": theme.get("post_type", "content_only"),
                "product_category": theme.get("product_category", "ocr"),
                "talking_points": theme.get("talking_points", []),
                "status": "planned"
            })
            
            # Move to next date
            current_date += timedelta(days=days_between_posts)
            theme_index += 1
        
        return schedule