File size: 13,962 Bytes
685d968
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
"""
World-Class Diverse Dataset Generation - 20 Concurrent API Calls per Batch

Key features:
- 20 API calls simultaneously per batch
- Wait for batch to complete, then next batch
- Temperature 0.95 for maximum diversity
- No templates, maximum creative freedom
"""

import json
import random
import os
import asyncio
from typing import List, Dict, Optional
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import cohere
from dotenv import load_dotenv

load_dotenv()

# Number of examples to generate per memory-routing category: 13 x 77 = 1001 total.
CATEGORY_TARGETS = {
    "company.brand_core": 77,
    "company.strategic_signatures": 77,
    "company.knowledge_artifacts": 77,
    "company.business_priorities": 77,
    "company.tools_config": 77,
    "company.performance_context": 77,
    "user.communication_style": 77,
    "user.strategic_approach": 77,
    "user.role_context": 77,
    "user.workflow_patterns": 77,
    "user.session_history": 77,
    "user.interaction_preferences": 77,
    "none": 77,
}

# Fictional company contexts; one is sampled per example to diversify settings.
INDUSTRIES = [
    "Series A fintech building a neobank", "hospital network digitizing patient intake",
    "DTC sneaker brand scaling to retail", "industrial valve manufacturer going digital",
    "K-12 tutoring platform expanding to Asia", "commercial real estate analytics startup",
    "ghost kitchen aggregator in NYC", "enterprise zero-trust security vendor",
    "luxury cruise line post-pandemic", "connected fitness hardware company",
    "immigration law firm automating visas", "recruiting platform for nurses",
    "pet insurance disruptor", "last-mile drone delivery startup",
    "indie game studio with a viral hit", "podcast network monetizing premium content",
    "EV charging network operator", "solar panel installer franchise",
    "modular home construction startup", "veterinary telehealth platform",
    "wine subscription service", "corporate wellness SaaS",
    "NFT marketplace pivoting to digital art", "AI code review tool for enterprises",
    "climate risk analytics for insurers", "restaurant POS system provider",
    "online therapy platform", "B2B payments infrastructure",
    "influencer marketing agency", "smart home security company"
]

# Speaker personas; one is sampled per example to vary voice and seniority.
PERSONAS = [
    "a stressed CMO preparing for board review",
    "a junior marketing coordinator on their first campaign",
    "a VP who just joined from a competitor",
    "a founder wearing multiple hats",
    "a seasoned brand director with 20 years experience",
    "a growth lead obsessed with metrics",
    "a creative director frustrated with process",
    "a demand gen manager under pressure to hit pipeline",
    "a content strategist building a new team",
    "a marketing ops person drowning in tools",
    "a product marketer launching next week",
    "an email specialist optimizing deliverability",
    "a social media manager handling a PR crisis",
    "a field marketer planning regional events",
    "a partner marketing lead negotiating co-marketing",
    "an analyst presenting attribution findings"
]

# Situational framing; one is sampled per example to vary conversational context.
SITUATIONS = [
    "in the middle of a heated planning session",
    "wrapping up a long day before vacation",
    "preparing for a last-minute executive ask",
    "debugging why a campaign tanked",
    "celebrating a successful launch",
    "onboarding after joining last week",
    "dealing with budget cuts",
    "scaling something that unexpectedly worked",
    "cleaning up a predecessor's mess",
    "trying to align with a difficult stakeholder"
]

# Conversational tone; one is sampled per example.
TONES = ["urgent", "casual", "frustrated", "excited", "methodical", "skeptical", "collaborative", "directive"]

# Per-category guidance injected into the generation prompt so the model
# produces a conversation that actually demonstrates the requested category.
CATEGORY_HINTS = {
    "company.brand_core": "The conversation should naturally surface brand identity elements - could be voice, visuals, values, positioning, or personality.",
    "company.strategic_signatures": "The conversation should reveal how this company makes decisions - their frameworks, principles, or recurring patterns.",
    "company.knowledge_artifacts": "The conversation should reference internal documentation - guides, playbooks, templates, or SOPs.",
    "company.business_priorities": "The conversation should touch on current goals, quarterly targets, or active initiatives.",
    "company.tools_config": "The conversation should involve tool setup, integrations, APIs, or workflow automation.",
    "company.performance_context": "The conversation should discuss metrics, campaign results, or performance learnings.",
    "user.communication_style": "The user should express how they prefer to receive information - format, length, tone, or style.",
    "user.strategic_approach": "The user should reveal their personal philosophy, priorities, or decision-making style.",
    "user.role_context": "The user should mention their role, responsibilities, authority, or team structure.",
    "user.workflow_patterns": "The user should describe their schedule, review process, or collaboration preferences.",
    "user.session_history": "The conversation should reference recent context, ongoing work, or previous discussions.",
    "user.interaction_preferences": "The user should express how they want the AI to behave - proactivity, feedback style, or coaching level.",
    "none": "The conversation should be purely transactional with nothing worth remembering long-term."
}


class ConcurrentGenerator:
    """Fans out dataset-generation requests to the Cohere chat API.

    Blocking SDK calls run on a thread pool; the async batch driver
    awaits up to 20 of them at once.
    """

    def __init__(self):
        # Fail fast on a missing key instead of erroring on the first request.
        self.api_key = os.getenv("COHERE_API_KEY")
        if not self.api_key:
            raise ValueError("COHERE_API_KEY not found")
        self.client = cohere.ClientV2(api_key=self.api_key)
        self.model = "command-r-plus-08-2024"
        # One worker per concurrent request in a batch.
        self.executor = ThreadPoolExecutor(max_workers=20)
    
    def _extract_text(self, response) -> Optional[str]:
        """Return the first non-empty text block of a chat response, or None."""
        if not response or not getattr(response, "message", None):
            return None
        blocks = getattr(response.message, "content", []) or []
        for block in blocks:
            text = getattr(block, "text", None)
            if isinstance(text, str) and text.strip():
                return text
        return None
    
    def _generate_one(self, category: str) -> Optional[Dict]:
        """Generate and validate a single example for `category`.

        Returns the parsed example dict, or None on any failure: API
        error, unparseable output, or the model not labeling the
        requested category.
        """
        
        # Randomized scenario ingredients to maximize dataset diversity.
        industry = random.choice(INDUSTRIES)
        persona = random.choice(PERSONAS)
        situation = random.choice(SITUATIONS)
        tone = random.choice(TONES)
        turns = random.randint(3, 10)
        hint = CATEGORY_HINTS.get(category, "")
        
        # The "none" category needs a deliberately forgettable conversation,
        # so it gets its own prompt.
        if category == "none":
            prompt = f"""You are a creative writer generating training data for an AI memory system.

Create a completely realistic conversation between {persona} at a {industry} and their AI marketing assistant.

Context: They are {situation}. The tone is {tone}.

THIS CONVERSATION MUST BE FORGETTABLE - nothing worth storing in long-term memory:
- Quick status checks, scheduling, or confirmations
- Vague questions without actionable details
- Chitchat or temporary context that expires immediately

Be creative. Make it feel real. No templates. Surprise me.

Output as JSON with this structure:
{{"scenario_id": "unique_id", "conversation": [{{"role": "user", "content": "..."}}, {{"role": "assistant", "content": "..."}}], "labels": {{"categories": ["none"], "persistence_horizon": "short", "memory_scope": "none", "rationale": "why this is unmemorable"}}, "metadata": {{"primary_category": "none", "turn_count": {turns}, "industry": "{industry}"}}}}"""

        else:
            prompt = f"""You are a world-class creative writer generating training data for an AI memory routing system.

Create a completely unique, realistic conversation between {persona} at a {industry} and their AI marketing assistant.

Context: They are {situation}. The tone is {tone}.

CATEGORY TO DEMONSTRATE: {category}
{hint}

CREATIVE FREEDOM:
- Invent specific, realistic details (names, numbers, dates, products)
- The conversation can start anywhere - mid-thought, mid-project, mid-crisis
- Vary structure dramatically - could be rapid-fire, could be detailed
- Include natural speech patterns, interruptions, tangents
- Make it feel like eavesdropping on a real conversation
- {turns} turns, but quality over quantity

The ONLY hard requirement: the conversation must clearly demonstrate {category}.

Output as JSON:
{{"scenario_id": "unique_id", "conversation": [{{"role": "user", "content": "..."}}, {{"role": "assistant", "content": "..."}}], "labels": {{"categories": ["{category}"], "persistence_horizon": "long/medium/short", "memory_scope": "{category.split('.')[0]}", "rationale": "why this fits {category}"}}, "metadata": {{"primary_category": "{category}", "turn_count": {turns}, "industry": "{industry}"}}}}"""

        try:
            response = self.client.chat(
                messages=[{"role": "user", "content": prompt}],
                temperature=0.95,  # high temperature for maximum diversity
                model=self.model,
                response_format={"type": "json_object"}
            )
            
            content = self._extract_text(response)
            if not content:
                return None
            
            # Strip a leading/trailing markdown code fence if the model added one.
            if content.startswith("```"):
                content = content.split("\n", 1)[1] if "\n" in content else content[3:]
            if content.endswith("```"):
                content = content[:-3]
            
            data = json.loads(content.strip())
            
            # Reject examples the model did not label with the requested category.
            categories = data.get("labels", {}).get("categories", [])
            if category.lower() not in [c.lower() for c in categories]:
                return None
            
            # "none" alongside real categories is contradictory; drop it.
            if len(categories) > 1 and "none" in [c.lower() for c in categories]:
                data["labels"]["categories"] = [c for c in categories if c.lower() != "none"]
            
            return data
            
        except Exception:
            # Best-effort: any failure (network, JSON, schema) discards this example.
            return None
    
    async def generate_batch_concurrent(self, categories: List[str]) -> List[Dict]:
        """Run one blocking generation per category concurrently; return successes."""
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() here has been deprecated since Python 3.10.
        loop = asyncio.get_running_loop()
        tasks = [
            loop.run_in_executor(self.executor, self._generate_one, cat)
            for cat in categories
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Drop failures: None results and exceptions captured by gather.
        return [r for r in results if isinstance(r, dict)]


async def run_generation():
    """Drive batched generation until every category hits its target.

    Appends each accepted example to a timestamped JSONL file as it
    arrives (so partial runs are preserved) and returns the output path.
    """
    generator = ConcurrentGenerator()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = f"synthetic_data/diverse_dataset_{timestamp}.jsonl"
    # Create the output directory up front; appending below would
    # otherwise fail with FileNotFoundError on a fresh checkout.
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    
    category_counts = {cat: 0 for cat in CATEGORY_TARGETS}
    all_data = []
    
    print("=" * 70, flush=True)
    print("WORLD-CLASS DATASET GENERATION (20 Concurrent)", flush=True)
    print("=" * 70, flush=True)
    print(f"Batch size: 20 concurrent API calls", flush=True)
    print(f"Temperature: 0.95", flush=True)
    print(f"Target: 77 per category x 13 = 1001 total", flush=True)
    print(f"Output: {output_file}", flush=True)
    print("=" * 70, flush=True)
    
    batch_num = 0
    start_time = datetime.now()
    
    while True:
        # Build the list of categories still below target, capped at 3
        # per category per batch so one category can't dominate a batch.
        needed = []
        for cat, target in CATEGORY_TARGETS.items():
            remaining = target - category_counts[cat]
            if remaining > 0:
                needed.extend([cat] * min(remaining, 3))  # Up to 3 per category per batch
        
        if not needed:
            break
        
        random.shuffle(needed)
        batch_categories = needed[:20]  # 20 concurrent
        batch_num += 1
        
        print(f"\n[Batch {batch_num}] Launching 20 concurrent requests...", flush=True)
        batch_start = datetime.now()
        
        results = await generator.generate_batch_concurrent(batch_categories)
        
        # total_seconds() gives the full duration; timedelta.seconds would
        # silently drop whole days on a long-running job.
        batch_time = int((datetime.now() - batch_start).total_seconds())
        
        for result in results:
            if result:
                # Prefer the explicit primary category; fall back to the first label.
                primary = result.get("metadata", {}).get("primary_category") or \
                         result.get("labels", {}).get("categories", ["unknown"])[0]
                
                if primary in category_counts:
                    category_counts[primary] += 1
                    all_data.append(result)
                    
                    # Append immediately so progress survives a crash.
                    with open(output_file, "a", encoding="utf-8") as f:
                        f.write(json.dumps(result) + "\n")
                    
                    conv = result.get("conversation", [])
                    if conv and len(conv) > 0:
                        first_msg = conv[0].get("content", "") if isinstance(conv[0], dict) else str(conv[0])
                        print(f"  [{primary}] {first_msg[:60]}...", flush=True)
        
        total_done = sum(category_counts.values())
        total_target = sum(CATEGORY_TARGETS.values())
        elapsed = int((datetime.now() - start_time).total_seconds())
        rate = total_done / max(elapsed, 1) * 60  # examples per minute
        eta = (total_target - total_done) / max(rate, 0.1)  # minutes remaining
        
        print(f"  Batch: {len(results)}/20 success in {batch_time}s | Total: {total_done}/{total_target} | Rate: {rate:.1f}/min | ETA: {eta:.0f}min", flush=True)
        
        # Print a per-category progress bar every 10 batches.
        if batch_num % 10 == 0:
            print("\n  === Category Breakdown ===", flush=True)
            for cat in sorted(category_counts.keys()):
                count = category_counts[cat]
                target = CATEGORY_TARGETS[cat]
                bar = "█" * (count * 20 // target) + "░" * (20 - count * 20 // target)
                print(f"    {cat:<35} [{bar}] {count:>3}/{target}", flush=True)
            print()
        
        # Pause between batches to stay under API rate limits.
        await asyncio.sleep(3)
    
    print("\n" + "=" * 70, flush=True)
    print("GENERATION COMPLETE", flush=True)
    print("=" * 70, flush=True)
    elapsed_total = (datetime.now() - start_time).total_seconds() / 60
    print(f"Total: {len(all_data)} examples in {elapsed_total:.1f} minutes", flush=True)
    print(f"Output: {output_file}", flush=True)
    
    return output_file


if __name__ == "__main__":
    # Script entry point: drive the async generation loop to completion.
    asyncio.run(run_generation())