|
|
|
|
|
import json |
|
|
import re |
|
|
import logging |
|
|
from typing import AsyncGenerator |
|
|
from app.schema import Prospect |
|
|
from app.config import MODEL_NAME, HF_API_TOKEN, MODEL_NAME_FALLBACK |
|
|
from app.logging_utils import log_event |
|
|
from vector.retriever import Retriever |
|
|
from huggingface_hub import AsyncInferenceClient |
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
class Writer:
    """Generates outreach content with HuggingFace Inference API streaming.

    For a prospect, a company summary and then an outreach email are generated
    through ``AsyncInferenceClient.text_generation``. Tokens are re-emitted as
    ``log_event`` results while they stream in. When a generation call fails,
    a templated fallback text is used instead, so a draft is still produced
    and saved through the store client.
    """

    # Bracketed template placeholders (e.g. "[Name]") left in generated text.
    _PLACEHOLDER_RE = re.compile(r"\[[^\]]+\]")

    def __init__(self, mcp_registry):
        """Set up the store client, the fact retriever and the HF client.

        Args:
            mcp_registry: Object providing ``get_store_client()``; the client
                it returns is used to persist the drafted prospect.
        """
        self.mcp = mcp_registry
        self.store = mcp_registry.get_store_client()
        self.retriever = Retriever()
        # An empty token is normalised to None before being handed over.
        self.hf_client = AsyncInferenceClient(token=HF_API_TOKEN or None)

    @staticmethod
    def _first_name(name):
        """Return the first whitespace-separated word of *name*, or ""."""
        words = (name or "").split()
        return words[0] if words else ""

    @classmethod
    def _greeting(cls, name):
        """Return "Hi <first name>," or "Hi there," when no name is usable."""
        first_name = cls._first_name(name)
        return f"Hi {first_name}," if first_name else "Hi there,"

    @staticmethod
    def _split_email(email_text):
        """Split "Subject: ... Body: ..." text into a subject/body dict.

        Returns None when either marker is missing. Only the first "Body:"
        separates the parts, so later occurrences stay inside the body.
        """
        if "Subject:" not in email_text or "Body:" not in email_text:
            return None
        head, _, body = email_text.partition("Body:")
        return {
            "subject": head.replace("Subject:", "").strip(),
            "body": body.strip(),
        }

    @classmethod
    def _fill_placeholders(cls, text, contact_name):
        """Replace every bracketed placeholder in *text* with *contact_name*.

        The name is inserted literally: a callable replacement keeps
        backslashes or group references in the name from being interpreted
        as a regex template. Text is returned unchanged when either argument
        is empty.
        """
        if not text or not contact_name:
            return text
        return cls._PLACEHOLDER_RE.sub(lambda _match: contact_name, text)

    @staticmethod
    def _build_context(company, relevant_facts):
        """Render the company profile section shared by both prompts."""
        pains = "\n".join(f"• {pain}" for pain in (company.pains or []))
        if company.notes:
            notes = "\n".join(f"• {note}" for note in company.notes)
        else:
            notes = "• No additional notes"
        if relevant_facts:
            # Only the first three retrieved facts are quoted in the prompt.
            insights = "\n".join(
                f'• {fact["text"]} (confidence: {fact.get("score", 0.7):.2f})'
                for fact in relevant_facts[:3]
            )
        else:
            insights = (
                "• Industry best practices suggest focusing on "
                "customer experience improvements"
            )
        return (
            "\n"
            "COMPANY PROFILE:\n"
            f"Name: {company.name}\n"
            f"Industry: {company.industry}\n"
            f"Size: {company.size} employees\n"
            f"Domain: {company.domain}\n"
            "\n"
            "KEY CHALLENGES:\n"
            f"{pains}\n"
            "\n"
            "BUSINESS CONTEXT:\n"
            f"{notes}\n"
            "\n"
            "RELEVANT INSIGHTS:\n"
            f"{insights}\n"
        )

    @staticmethod
    def _token_event(kind, token, prospect):
        """Build the "llm_token" event for one streamed token."""
        return log_event(
            "writer",
            token,
            "llm_token",
            {
                "type": kind,
                "token": token,
                "prospect_id": prospect.id,
                "company_id": prospect.company.id,
                "company_name": prospect.company.name,
            },
        )

    async def run_streaming(self, prospect: "Prospect") -> AsyncGenerator[dict, None]:
        """Generate content with streaming tokens.

        Emits, in order: a "company_start" event, one "llm_token" event per
        summary token, an "email_start" event, one "llm_token" event per email
        token, and a final "llm_done" event whose payload carries the updated
        prospect. An "llm_error" event is emitted whenever a generation step
        fails and its fallback text is used.

        Side effects: sets ``prospect.summary``, ``prospect.email_draft`` and
        ``prospect.status`` ("drafted"), then saves the prospect through the
        store client.

        Args:
            prospect: The prospect to write a summary and email draft for.

        Yields:
            The values returned by ``log_event`` for each step.
        """
        company = prospect.company
        contact = prospect.contacts[0] if prospect.contacts else None

        if prospect.contacts:
            for entry in prospect.contacts:
                detail = f"Using contact: {entry.name} ({entry.title}) - {entry.email}"
                log_event("writer", detail, "agent_log")
                logger.info("Writer: %s", detail)
        else:
            log_event("writer", "WARNING: No contacts found for this prospect!", "agent_log")
            logger.warning("Writer: No contacts found for prospect %s", company.name)

        # Retrieval is best-effort: on failure the prompt falls back to a
        # generic insight line, but the failure is logged rather than hidden.
        try:
            relevant_facts = self.retriever.retrieve(company.id, k=5)
        except Exception:
            logger.warning(
                "Writer: fact retrieval failed for company %s; continuing without facts",
                company.id,
                exc_info=True,
            )
            relevant_facts = []

        context = self._build_context(company, relevant_facts)

        summary_prompt = (
            f"{context}\n"
            "\n"
            f"Generate a comprehensive bullet-point summary for {company.name} that includes:\n"
            "1. Company overview (industry, size)\n"
            "2. Main challenges they face\n"
            "3. Specific opportunities for improvement\n"
            "4. Recommended actions\n"
            "\n"
            'Format: Use 5-7 bullets, each starting with "•". Be specific and actionable.\n'
            "Include the industry and size context in your summary."
        )

        yield log_event(
            "writer",
            f"Generating content for {company.name}",
            "company_start",
            {
                "company": company.name,
                "industry": company.industry,
                "size": company.size,
            },
        )

        summary_chunks = []
        try:
            stream = await self.hf_client.text_generation(
                summary_prompt,
                model=MODEL_NAME,
                max_new_tokens=500,
                temperature=0.7,
                stream=True,
            )
            async for token in stream:
                summary_chunks.append(token)
                yield self._token_event("summary", token, prospect)
            summary_text = "".join(summary_chunks)
        except Exception as e:
            # Any partially streamed summary is discarded for the template.
            main_challenge = (
                company.pains[0] if company.pains else "Customer experience improvement"
            )
            summary_text = (
                f"• {company.name} is a {company.industry} company "
                f"with {company.size} employees\n"
                f"• Main challenge: {main_challenge}\n"
                "• Opportunity: Implement modern CX solutions to improve customer satisfaction\n"
                "• Recommended action: Schedule a consultation to discuss specific needs"
            )
            yield log_event("writer", f"Summary generation failed, using default: {e}", "llm_error")

        # Personalisation is derived from the first contact only. A missing
        # or blank name degrades to the generic wording instead of raising.
        first_name = self._first_name(contact.name) if contact else ""
        greeting = self._greeting(contact.name if contact else None)
        greeting_hint = ""
        contact_context = ""
        if first_name:
            greeting_hint = (
                f"IMPORTANT: Start the email EXACTLY with this greeting: 'Hi {first_name},'\n"
            )
            contact_context = (
                f"\nTARGET RECIPIENT:\nName: {contact.name}\n"
                f"Title: {contact.title}\nEmail: {contact.email}\n"
            )
        recipient = contact.name if first_name else "leaders"
        example_name = first_name or "there"
        role = contact.title if contact else "a leader"

        email_prompt = (
            f"{context}\n"
            f"{contact_context}\n"
            "Company Summary:\n"
            f"{summary_text}\n"
            "\n"
            "Write a highly personalized outreach email from a CX AI platform provider "
            f"to {recipient} at {company.name}.\n"
            f"{greeting_hint}\n"
            "Requirements:\n"
            "- Subject line that mentions their company name and industry\n"
            "- Body: 150-180 words, professional and friendly\n"
            f"- Reference their specific industry ({company.industry}) "
            f"and size ({company.size} employees)\n"
            "- Address them by their first name in the greeting "
            f'(e.g., "Hi {example_name},")\n'
            f"- Acknowledge their role as {role} in the organization\n"
            "- Clearly connect their challenges to AI-powered customer experience solutions\n"
            "- One clear call-to-action to schedule a short conversation or demo next week\n"
            "- Do not write as if the email is from the company to us\n"
            "- No exaggerated claims\n"
            '- Sign off as: "The CX Team"\n'
            "\n"
            "Format response exactly as:\n"
            "Subject: [subject line]\n"
            "Body: [email body]\n"
        )

        yield log_event(
            "writer",
            f"Generating email for {company.name}",
            "email_start",
            {"company": company.name},
        )

        email_chunks = []
        try:
            stream = await self.hf_client.text_generation(
                email_prompt,
                model=MODEL_NAME,
                max_new_tokens=400,
                temperature=0.7,
                stream=True,
            )
            async for token in stream:
                email_chunks.append(token)
                yield self._token_event("email", token, prospect)
            email_text = "".join(email_chunks)
        except Exception as e:
            priority = company.pains[0] if company.pains else "improving customer satisfaction"
            email_text = (
                f"Subject: Improve {company.name}'s Customer Experience\n"
                "\n"
                f"Body: {greeting}\n"
                "\n"
                f"As a {company.industry} company with {company.size} employees, "
                "you face unique customer experience challenges. "
                f"We understand that {priority} is a priority for your organization.\n"
                "\n"
                "Our AI-powered platform has helped similar companies in the "
                f"{company.industry} industry improve their customer experience metrics "
                "significantly. We'd love to discuss how we can help "
                f"{company.name} achieve similar results.\n"
                "\n"
                "Would you be available for a brief call next week to explore "
                "how we can address your specific needs?\n"
                "\n"
                "Best regards,\n"
                "The CX Team"
            )
            yield log_event("writer", f"Email generation failed, using default: {e}", "llm_error")

        email_parts = self._split_email(email_text)
        if email_parts is None:
            # Output without the expected markers: keep the raw text as the
            # body (or a template when it is empty) under a default subject.
            default_body = (
                f"{greeting}\n"
                "\n"
                f"As a leading {company.industry} company with {company.size} employees, "
                "we know you're focused on delivering exceptional customer experiences.\n"
                "\n"
                "We'd like to discuss how our AI-powered platform can help address your "
                "specific challenges and improve your customer satisfaction metrics.\n"
                "\n"
                "Best regards,\n"
                "The CX Team"
            )
            email_parts = {
                "subject": f"Transform {company.name}'s Customer Experience",
                "body": email_text or default_body,
            }

        if contact:
            # Any bracketed placeholder left in the text is filled with the
            # contact's full name.
            email_parts["subject"] = self._fill_placeholders(email_parts["subject"], contact.name)
            email_parts["body"] = self._fill_placeholders(email_parts["body"], contact.name)

        prospect.summary = (
            f"**{company.name} ({company.industry}, {company.size} employees)**"
            f"\n\n{summary_text}"
        )
        prospect.email_draft = email_parts
        prospect.status = "drafted"
        await self.store.save_prospect(prospect)

        yield log_event(
            "writer",
            f"Generation complete for {company.name}",
            "llm_done",
            {
                "prospect": prospect,
                "summary": prospect.summary,
                "email": email_parts,
                "company_name": company.name,
                "prospect_id": prospect.id,
                "company_id": company.id,
            },
        )

    async def run(self, prospect: "Prospect") -> "Prospect":
        """Non-streaming version for compatibility.

        Drains ``run_streaming`` and returns the prospect carried by the
        "llm_done" event, or the input prospect if no such event is seen.
        """
        events = self.run_streaming(prospect)
        try:
            async for event in events:
                if event["type"] == "llm_done":
                    return event["payload"]["prospect"]
            return prospect
        finally:
            # Returning mid-iteration leaves the generator suspended; close
            # it here rather than leaving cleanup to garbage collection.
            await events.aclose()
|
|
|