|
|
import os |
|
|
import json |
|
|
import random |
|
|
import asyncio |
|
|
import time |
|
|
import cohere |
|
|
from typing import List, Dict, Any, Optional |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
DOMAIN_CONTEXTS = [ |
|
|
"B2B SaaS workflow automation for enterprise teams", |
|
|
"Consumer fintech budgeting assistant rolling out in LATAM", |
|
|
"Healthcare patient engagement platform coordinating compliance content", |
|
|
"Retail omnichannel loyalty program for a fashion brand", |
|
|
"EdTech company designing AI tutoring playbooks", |
|
|
"Hospitality chain redefining guest personalization across regions", |
|
|
"Developer tools startup improving product-led growth motions", |
|
|
"Sports media network negotiating sponsorship activations", |
|
|
"Gaming studio planning live-ops launches", |
|
|
"Non-profit fundraising platform balancing donor messaging", |
|
|
"Enterprise cybersecurity firm running incident response playbooks", |
|
|
"Supply-chain analytics platform optimizing vendor collaboration", |
|
|
"CPG beverage brand planning seasonal launches with agencies", |
|
|
"Real-estate marketplace coordinating broker enablement", |
|
|
"Mobility/ride-hailing service planning driver communications", |
|
|
"Streaming media company managing international content drops", |
|
|
"Insurance carrier modernizing agent training workflows", |
|
|
"Energy provider coordinating demand-response campaigns", |
|
|
"Professional services firm standardizing proposal playbooks", |
|
|
"AI infrastructure startup refining go-to-market with partners", |
|
|
"Luxury beauty brand orchestrating influencer activations", |
|
|
"Food delivery platform improving courier retention messaging", |
|
|
"Corporate learning company updating compliance curricula", |
|
|
"Outdoor gear company rolling out omnichannel retail pilots" |
|
|
] |
|
|
|
|
|
class SyntheticDataPipeline: |
|
|
def __init__(self, api_key: Optional[str] = None, max_retries: int = 5): |
|
|
self.api_key = api_key or os.getenv("COHERE_API_KEY") |
|
|
if not self.api_key: |
|
|
raise ValueError("COHERE_API_KEY not found in environment variables") |
|
|
self.client = cohere.ClientV2(api_key=self.api_key) |
|
|
|
|
|
self.model = "command-r-plus-08-2024" |
|
|
self.max_retries = max_retries |
|
|
|
|
|
def _sample_domain_context(self) -> str: |
|
|
return random.choice(DOMAIN_CONTEXTS) |
|
|
|
|
|
@staticmethod |
|
|
def _extract_text(response) -> Optional[str]: |
|
|
"""Extract the first text block from a Cohere response.""" |
|
|
if not response or not getattr(response, "message", None): |
|
|
return None |
|
|
blocks = getattr(response.message, "content", []) or [] |
|
|
for block in blocks: |
|
|
text = getattr(block, "text", None) |
|
|
if isinstance(text, str) and text.strip(): |
|
|
return text |
|
|
return None |
|
|
|
|
|
def generate_scenario_spec(self, category: str, distractor: Optional[str] = None, |
|
|
persistence: str = "long", tone: str = "neutral", |
|
|
turns: int = 6, special_reqs: str = "") -> Dict[str, Any]: |
|
|
"""Stage 1: Generate a scenario specification.""" |
|
|
domain_context = self._sample_domain_context() |
|
|
midstream_note = "Conversation should start mid-thread (no greetings) and refer back to earlier collaboration." |
|
|
diversity_note = "Keep subject matter aligned with the given domain context; avoid repeating eco/climate themes unless category demands it." |
|
|
combined_reqs = " | ".join(filter(None, [special_reqs, midstream_note, diversity_note])) |
|
|
|
|
|
if category == "none": |
|
|
prompt = f"""Generate a JSON scenario specification for a conversation that has NO long-term memory value (Category: none). |
|
|
The conversation should be strictly transactional, vague, or temporary. |
|
|
Examples: checking status, scheduling a meeting, asking a clarification, greeting, small talk, or discussing weather/lunch. |
|
|
|
|
|
CONTEXT: General professional setting. Do NOT include any strategic projects, specific brand details, or user preferences that would trigger memory storage. |
|
|
|
|
|
Requirements: |
|
|
- Primary Category: none |
|
|
- Distractor Category: {distractor if distractor else "None"} |
|
|
- Persistence Level: short |
|
|
- Turn Count: {turns} |
|
|
- Special Requirements: {combined_reqs} |
|
|
|
|
|
Return a JSON object with: |
|
|
{{ |
|
|
"scenario_description": "Brief narrative setup (2-3 sentences) - MUST BE NON-MEMORABLE", |
|
|
"user_profile": "User role", |
|
|
"key_signals_to_include": ["List of 2-4 signals that are specifically IRRELEVANT or TEMPORARY"], |
|
|
"distractor_signals": ["Optional list of signals"], |
|
|
"suggested_turn_breakdown": "Flow of conversation" |
|
|
}} |
|
|
""" |
|
|
else: |
|
|
prompt = f"""You are designing training scenarios for an AI memory system in marketing context. Generate a scenario specification tailored to this business setting: {domain_context}. |
|
|
|
|
|
Requirements: |
|
|
- Primary Category: {category} |
|
|
- Distractor Category: {distractor if distractor else "None"} |
|
|
- Persistence Level: {persistence} |
|
|
- Emotional Tone: {tone} |
|
|
- Turn Count: {turns} |
|
|
- Special Requirements: {combined_reqs} |
|
|
|
|
|
Return a JSON object with: |
|
|
{{ |
|
|
"scenario_description": "Brief narrative setup (2-3 sentences)", |
|
|
"user_profile": "User role and context", |
|
|
"key_signals_to_include": ["List of 2-4 specific memory-worthy signals"], |
|
|
"distractor_signals": ["Optional list of noise/irrelevant info"], |
|
|
"suggested_turn_breakdown": "How the conversation should flow" |
|
|
}} |
|
|
""" |
|
|
|
|
|
for attempt in range(self.max_retries + 1): |
|
|
try: |
|
|
response = self.client.chat( |
|
|
messages=[{"role": "user", "content": prompt}], |
|
|
temperature=0.7, |
|
|
model=self.model, |
|
|
response_format={"type": "json_object"} |
|
|
) |
|
|
content = self._extract_text(response) |
|
|
if not content: |
|
|
raise ValueError("No text content found in scenario response") |
|
|
|
|
|
if content.startswith("```json"): |
|
|
content = content[7:] |
|
|
if content.endswith("```"): |
|
|
content = content[:-3] |
|
|
return json.loads(content.strip()) |
|
|
except Exception as e: |
|
|
print(f"Scenario generation failed (attempt {attempt+1}/{self.max_retries+1}): {e}") |
|
|
if attempt < self.max_retries: |
|
|
sleep_time = 10 * (2 ** attempt) |
|
|
print(f"Retrying in {sleep_time}s...") |
|
|
time.sleep(sleep_time) |
|
|
return {} |
|
|
|
|
|
def generate_conversation(self, scenario_spec: Dict[str, Any], turn_count: int = 6, category: Optional[str] = None) -> Dict[str, Any]: |
|
|
"""Stage 2: Generate conversation based on scenario spec.""" |
|
|
|
|
|
domain_context = self._sample_domain_context() |
|
|
|
|
|
|
|
|
is_none = category == "none" or (category is None and "none" in str(scenario_spec).lower()) |
|
|
|
|
|
if is_none: |
|
|
prompt = f"""You are generating a realistic conversation between a user and an AI assistant. |
|
|
The conversation should be transactional, casual, or vague. IT SHOULD NOT contain any significant long-term memory value for a marketing context. |
|
|
|
|
|
CONTEXT: General professional setting. |
|
|
SCENARIO SPECIFICATION: |
|
|
{json.dumps(scenario_spec, indent=2)} |
|
|
|
|
|
GENERATION RULES: |
|
|
1. Make it natural and fluid. |
|
|
2. DO NOT include detailed strategic plans, brand values, or user preferences. |
|
|
3. Focus on immediate tasks (scheduling, clarifications, small talk). |
|
|
4. Length: {turn_count} turns. |
|
|
5. Avoid opening pleasantries like "Hi" - start mid-thread if appropriate, or just dive in. |
|
|
|
|
|
OUTPUT FORMAT: |
|
|
Return a JSON object with: |
|
|
{{ |
|
|
"scenario_id": "none_transactional_{{random_3_digit_number}}", |
|
|
"conversation": [ |
|
|
{{"role": "user", "content": "..."}}, |
|
|
{{"role": "assistant", "content": "..."}} |
|
|
], |
|
|
"labels": {{ |
|
|
"categories": ["none"], |
|
|
"persistence_horizon": "short", |
|
|
"memory_scope": "none", |
|
|
"rationale": "Explanation why this is not memory-worthy" |
|
|
}}, |
|
|
"metadata": {{ |
|
|
"scenario_type": "negative_example", |
|
|
"primary_category": "none", |
|
|
"distractor_present": false, |
|
|
"turn_count": {turn_count}, |
|
|
"signals_present": [] |
|
|
}} |
|
|
}} |
|
|
|
|
|
CRITICAL: Respond with ONLY the JSON object. |
|
|
""" |
|
|
else: |
|
|
prompt = f"""You are generating realistic marketing conversations between a user and an AI marketing assistant. Generate natural dialogue that contains specific information worth storing in long-term memory. The conversation should start mid-thread (no greetings) and reference the ongoing initiative described below. |
|
|
|
|
|
CONTEXT: |
|
|
You will create a conversation that exemplifies certain memory categories while maintaining realism and natural flow. Assume this is part of {domain_context}. |
|
|
|
|
|
SCENARIO SPECIFICATION: |
|
|
{json.dumps(scenario_spec, indent=2)} |
|
|
|
|
|
MEMORY TAXONOMY (for reference): |
|
|
COMPANY MEMORY: |
|
|
- company.brand_core: Voice, values, positioning, identity anchors (Persistence: Long >1y) |
|
|
- company.strategic_signatures: Decision frameworks, strategic heuristics (Persistence: Long >1y) |
|
|
- company.knowledge_artifacts: Docs, style guides, playbooks (Persistence: Long >1y) |
|
|
- company.business_priorities: Quarterly/seasonal goals, active campaigns (Persistence: Short <3m) |
|
|
- company.tools_config: Integrations, API keys, workflow settings (Persistence: Medium ~6m) |
|
|
- company.performance_context: Campaign metrics, retrospectives, learnings (Persistence: Rolling ~6m) |
|
|
|
|
|
USER MEMORY: |
|
|
- user.communication_style: Tone, verbosity, format expectations (Persistence: Long >1y) |
|
|
- user.strategic_approach: Personal priorities, success definitions (Persistence: Long >1y) |
|
|
- user.role_context: Title, scope, decision authority (Persistence: Medium ~1y) |
|
|
- user.workflow_patterns: Review cadence, collaboration norms (Persistence: Medium ~1y) |
|
|
- user.session_history: Immediate context, recent asks (Persistence: Short <2w) |
|
|
- user.interaction_preferences: Coaching style, feedback expectations (Persistence: Evolving) |
|
|
|
|
|
SPECIAL: |
|
|
- none: Irrelevant, vague, or transactional content |
|
|
|
|
|
GENERATION RULES: |
|
|
1. Make conversations feel natural - include some filler, transitions, acknowledgments |
|
|
2. Embed memory-worthy information organically (don't make it too obvious) |
|
|
3. Include 1-2 utterances that should map to "none" for realism |
|
|
4. If multi-label scenario, ensure signals for both categories are present |
|
|
5. Length: {turn_count} turns (alternating user/assistant) |
|
|
6. Include specific, concrete details (not generic statements) |
|
|
7. For company.* categories: use "we", "our company", "our brand" |
|
|
8. For user.* categories: use "I prefer", "my approach", "I typically" |
|
|
9. Avoid opening pleasantries like "Hi" or "Hello"—jump straight into the ongoing topic. |
|
|
10. **CRITICAL CONSTRAINT**: Limit output to 1-3 categories maximum. |
|
|
11. **EXCLUSIVE NONE**: If "none" is in the categories list, it MUST be the ONLY category. NEVER mix "none" with other categories. If valid signals exist, do NOT include "none". |
|
|
|
|
|
OUTPUT FORMAT: |
|
|
Return a JSON object with: |
|
|
{{ |
|
|
"scenario_id": "{{primary_category}}_{{scenario_type}}_{{random_3_digit_number}}", |
|
|
"conversation": [ |
|
|
{{"role": "user", "content": "..."}}, |
|
|
{{"role": "assistant", "content": "..."}}, |
|
|
... |
|
|
], |
|
|
"labels": {{ |
|
|
"categories": ["array of applicable categories"], |
|
|
"persistence_horizon": "long|medium|short", |
|
|
"memory_scope": "company|user|mixed|none", |
|
|
"rationale": "1-2 sentence explanation of category choices" |
|
|
}}, |
|
|
"metadata": {{ |
|
|
"scenario_type": "descriptive_label", |
|
|
"primary_category": "main_category", |
|
|
"distractor_present": true|false, |
|
|
"turn_count": integer, |
|
|
"signals_present": ["list of specific signals included"] |
|
|
}} |
|
|
}} |
|
|
|
|
|
CRITICAL: Respond with ONLY the JSON object. No markdown formatting, no explanation, no preamble. |
|
|
|
|
|
Generate the conversation now.""" |
|
|
|
|
|
for attempt in range(self.max_retries + 1): |
|
|
try: |
|
|
response = self.client.chat( |
|
|
messages=[{"role": "user", "content": prompt}], |
|
|
temperature=0.7, |
|
|
model=self.model, |
|
|
response_format={"type": "json_object"} |
|
|
) |
|
|
content = self._extract_text(response) |
|
|
if not content: |
|
|
raise ValueError("No text content found in conversation response") |
|
|
|
|
|
if content.startswith("```json"): |
|
|
content = content[7:] |
|
|
if content.endswith("```"): |
|
|
content = content[:-3] |
|
|
return json.loads(content.strip()) |
|
|
except Exception as e: |
|
|
print(f"Conversation generation failed (attempt {attempt+1}/{self.max_retries+1}): {e}") |
|
|
if attempt < self.max_retries: |
|
|
sleep_time = 10 * (2 ** attempt) |
|
|
print(f"Retrying in {sleep_time}s...") |
|
|
time.sleep(sleep_time) |
|
|
return {} |
|
|
|
|
|
def run_batch(self, count: int = 1, category: str = "company.brand_core") -> List[Dict[str, Any]]: |
|
|
"""Run a batch generation.""" |
|
|
results = [] |
|
|
print(f"Starting batch generation for {count} examples of {category}...") |
|
|
|
|
|
for i in range(count): |
|
|
print(f"Generating example {i+1}/{count}...") |
|
|
scenario = self.generate_scenario_spec(category=category) |
|
|
if not scenario: |
|
|
print("Skipping due to scenario generation failure") |
|
|
continue |
|
|
|
|
|
conversation = self.generate_conversation(scenario) |
|
|
if conversation: |
|
|
results.append(conversation) |
|
|
print(f"Successfully generated conversation: {conversation.get('scenario_id', 'unknown')}") |
|
|
else: |
|
|
print("Failed to generate conversation") |
|
|
|
|
|
return results |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
pipeline = SyntheticDataPipeline() |
|
|
results = pipeline.run_batch(count=1) |
|
|
print(json.dumps(results, indent=2)) |
|
|
|
|
|
|