|
|
""" |
|
|
Productivity-Enhancing MCP Services |
|
|
Real-world services that increase sales automation efficiency |
|
|
""" |
|
|
import asyncio |
|
|
from typing import Dict, List, Optional, Any |
|
|
from datetime import datetime, timedelta |
|
|
import re |
|
|
import logging |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class MCPAnalyticsService: |
|
|
""" |
|
|
Analytics Service - Track metrics, conversions, and performance |
|
|
Real-world use case: Monitor pipeline health and ROI |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
self.metrics = { |
|
|
"pipeline_runs": 0, |
|
|
"prospects_discovered": 0, |
|
|
"contacts_found": 0, |
|
|
"emails_generated": 0, |
|
|
"emails_sent": 0, |
|
|
"replies_received": 0, |
|
|
"meetings_booked": 0, |
|
|
"conversion_rate": 0.0, |
|
|
"average_response_time": 0.0, |
|
|
"top_performing_sequences": [], |
|
|
"daily_stats": {} |
|
|
} |
|
|
self.events = [] |
|
|
logger.info("MCP Analytics Service initialized") |
|
|
|
|
|
async def track_event(self, event_type: str, data: Dict) -> str: |
|
|
"""Track an event for analytics""" |
|
|
event = { |
|
|
"type": event_type, |
|
|
"data": data, |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
self.events.append(event) |
|
|
|
|
|
|
|
|
today = datetime.utcnow().strftime('%Y-%m-%d') |
|
|
|
|
|
if today not in self.metrics["daily_stats"]: |
|
|
self.metrics["daily_stats"][today] = { |
|
|
"pipeline_runs": 0, |
|
|
"prospects": 0, |
|
|
"contacts": 0, |
|
|
"emails": 0 |
|
|
} |
|
|
|
|
|
if event_type == "pipeline_run": |
|
|
self.metrics["pipeline_runs"] += 1 |
|
|
self.metrics["daily_stats"][today]["pipeline_runs"] += 1 |
|
|
|
|
|
elif event_type == "prospect_discovered": |
|
|
self.metrics["prospects_discovered"] += 1 |
|
|
self.metrics["daily_stats"][today]["prospects"] += 1 |
|
|
|
|
|
elif event_type == "contact_found": |
|
|
self.metrics["contacts_found"] += 1 |
|
|
self.metrics["daily_stats"][today]["contacts"] += 1 |
|
|
|
|
|
elif event_type == "email_generated": |
|
|
self.metrics["emails_generated"] += 1 |
|
|
self.metrics["daily_stats"][today]["emails"] += 1 |
|
|
|
|
|
elif event_type == "email_sent": |
|
|
self.metrics["emails_sent"] += 1 |
|
|
|
|
|
elif event_type == "reply_received": |
|
|
self.metrics["replies_received"] += 1 |
|
|
|
|
|
elif event_type == "meeting_booked": |
|
|
self.metrics["meetings_booked"] += 1 |
|
|
|
|
|
|
|
|
if self.metrics["emails_sent"] > 0: |
|
|
self.metrics["conversion_rate"] = ( |
|
|
self.metrics["meetings_booked"] / self.metrics["emails_sent"] |
|
|
) * 100 |
|
|
|
|
|
logger.info(f"Analytics event tracked: {event_type}") |
|
|
return "tracked" |
|
|
|
|
|
async def get_metrics(self) -> Dict: |
|
|
"""Get current metrics""" |
|
|
return self.metrics |
|
|
|
|
|
async def get_dashboard_data(self) -> Dict: |
|
|
"""Get formatted dashboard data""" |
|
|
return { |
|
|
"summary": { |
|
|
"Total Pipeline Runs": self.metrics["pipeline_runs"], |
|
|
"Prospects Discovered": self.metrics["prospects_discovered"], |
|
|
"Contacts Found": self.metrics["contacts_found"], |
|
|
"Emails Generated": self.metrics["emails_generated"], |
|
|
"Emails Sent": self.metrics["emails_sent"], |
|
|
"Replies Received": self.metrics["replies_received"], |
|
|
"Meetings Booked": self.metrics["meetings_booked"], |
|
|
"Conversion Rate": f"{self.metrics['conversion_rate']:.2f}%" |
|
|
}, |
|
|
"daily_stats": self.metrics["daily_stats"], |
|
|
"recent_events": self.events[-10:] |
|
|
} |
|
|
|
|
|
|
|
|
class MCPEnrichmentService: |
|
|
""" |
|
|
Enrichment Service - Enrich prospect and contact data |
|
|
Real-world use case: Add company info, social profiles, tech stack |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
|
|
|
self.enrichment_db = { |
|
|
"shopify.com": { |
|
|
"employee_count": "10,000+", |
|
|
"founded_year": 2006, |
|
|
"funding": "$2.9B", |
|
|
"tech_stack": ["Ruby on Rails", "React", "MySQL", "Redis"], |
|
|
"social_profiles": { |
|
|
"linkedin": "https://linkedin.com/company/shopify", |
|
|
"twitter": "https://twitter.com/shopify" |
|
|
}, |
|
|
"industry_tags": ["E-commerce", "SaaS", "Retail Tech"], |
|
|
"revenue_range": "$1B - $5B" |
|
|
}, |
|
|
"stripe.com": { |
|
|
"employee_count": "8,000+", |
|
|
"founded_year": 2010, |
|
|
"funding": "$2.2B", |
|
|
"tech_stack": ["Ruby", "Scala", "Go", "React"], |
|
|
"social_profiles": { |
|
|
"linkedin": "https://linkedin.com/company/stripe", |
|
|
"twitter": "https://twitter.com/stripe" |
|
|
}, |
|
|
"industry_tags": ["Fintech", "Payments", "SaaS"], |
|
|
"revenue_range": "$5B+" |
|
|
} |
|
|
} |
|
|
logger.info("MCP Enrichment Service initialized") |
|
|
|
|
|
async def enrich_company(self, domain: str) -> Dict: |
|
|
"""Enrich company data with additional information""" |
|
|
logger.info(f"Enriching company data for: {domain}") |
|
|
|
|
|
|
|
|
enriched_data = self.enrichment_db.get(domain, {}) |
|
|
|
|
|
if not enriched_data: |
|
|
|
|
|
enriched_data = { |
|
|
"employee_count": "Unknown", |
|
|
"founded_year": None, |
|
|
"funding": "Unknown", |
|
|
"tech_stack": [], |
|
|
"social_profiles": { |
|
|
"linkedin": f"https://linkedin.com/company/{domain.split('.')[0]}", |
|
|
"twitter": f"https://twitter.com/{domain.split('.')[0]}" |
|
|
}, |
|
|
"industry_tags": [], |
|
|
"revenue_range": "Unknown", |
|
|
"enrichment_source": "estimated" |
|
|
} |
|
|
else: |
|
|
enriched_data["enrichment_source"] = "database" |
|
|
|
|
|
return enriched_data |
|
|
|
|
|
async def enrich_contact(self, email: str, name: str) -> Dict: |
|
|
"""Enrich contact data with social profiles and background""" |
|
|
logger.info(f"Enriching contact data for: {email}") |
|
|
|
|
|
|
|
|
domain = email.split('@')[1] if '@' in email else '' |
|
|
username = email.split('@')[0] if '@' in email else '' |
|
|
|
|
|
return { |
|
|
"email": email, |
|
|
"name": name, |
|
|
"linkedin_profile": f"https://linkedin.com/in/{username.replace('.', '-')}", |
|
|
"twitter_profile": f"https://twitter.com/{username.replace('.', '_')}", |
|
|
"github_profile": f"https://github.com/{username.replace('.', '')}", |
|
|
"estimated_seniority": self._estimate_seniority(email, name), |
|
|
"enrichment_timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
|
|
|
def _estimate_seniority(self, email: str, name: str) -> str: |
|
|
"""Estimate seniority based on email patterns""" |
|
|
email_lower = email.lower() |
|
|
if any(x in email_lower for x in ['ceo', 'founder', 'chief']): |
|
|
return "Executive" |
|
|
elif any(x in email_lower for x in ['vp', 'director', 'head']): |
|
|
return "Senior" |
|
|
elif any(x in email_lower for x in ['manager', 'lead']): |
|
|
return "Mid-Level" |
|
|
else: |
|
|
return "Individual Contributor" |
|
|
|
|
|
|
|
|
class MCPValidationService: |
|
|
""" |
|
|
Validation Service - Validate emails, domains, and contact information |
|
|
Real-world use case: Reduce bounce rates and improve deliverability |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
|
|
|
self.disposable_domains = [ |
|
|
"tempmail.com", "throwaway.email", "guerrillamail.com", |
|
|
"10minutemail.com", "mailinator.com" |
|
|
] |
|
|
|
|
|
|
|
|
self.invalid_patterns = [ |
|
|
"noreply@", "no-reply@", "donotreply@", |
|
|
"info@", "admin@", "support@", "sales@" |
|
|
] |
|
|
|
|
|
self.validation_cache = {} |
|
|
logger.info("MCP Validation Service initialized") |
|
|
|
|
|
async def validate_email(self, email: str) -> Dict: |
|
|
"""Validate email address""" |
|
|
logger.info(f"Validating email: {email}") |
|
|
|
|
|
result = { |
|
|
"email": email, |
|
|
"is_valid": False, |
|
|
"is_disposable": False, |
|
|
"is_role_based": False, |
|
|
"is_catchall": False, |
|
|
"deliverability_score": 0, |
|
|
"validation_issues": [], |
|
|
"validated_at": datetime.utcnow().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' |
|
|
if not re.match(email_regex, email): |
|
|
result["validation_issues"].append("Invalid email format") |
|
|
return result |
|
|
|
|
|
|
|
|
domain = email.split('@')[1] if '@' in email else '' |
|
|
|
|
|
|
|
|
if domain in self.disposable_domains: |
|
|
result["is_disposable"] = True |
|
|
result["validation_issues"].append("Disposable email domain") |
|
|
|
|
|
|
|
|
for pattern in self.invalid_patterns: |
|
|
if email.lower().startswith(pattern): |
|
|
result["is_role_based"] = True |
|
|
result["validation_issues"].append("Role-based email (low engagement)") |
|
|
break |
|
|
|
|
|
|
|
|
score = 100 |
|
|
if result["is_disposable"]: |
|
|
score -= 50 |
|
|
if result["is_role_based"]: |
|
|
score -= 30 |
|
|
if len(result["validation_issues"]) == 0: |
|
|
result["is_valid"] = True |
|
|
|
|
|
result["deliverability_score"] = max(0, score) |
|
|
|
|
|
return result |
|
|
|
|
|
async def validate_domain(self, domain: str) -> Dict: |
|
|
"""Validate domain""" |
|
|
logger.info(f"Validating domain: {domain}") |
|
|
|
|
|
result = { |
|
|
"domain": domain, |
|
|
"is_valid": False, |
|
|
"has_mx_records": False, |
|
|
"is_active": False, |
|
|
"validation_issues": [], |
|
|
"validated_at": datetime.utcnow().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
domain_regex = r'^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$' |
|
|
if not re.match(domain_regex, domain): |
|
|
result["validation_issues"].append("Invalid domain format") |
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
result["is_valid"] = True |
|
|
result["has_mx_records"] = True |
|
|
result["is_active"] = True |
|
|
|
|
|
return result |
|
|
|
|
|
async def batch_validate_emails(self, emails: List[str]) -> List[Dict]: |
|
|
"""Batch validate multiple emails""" |
|
|
logger.info(f"Batch validating {len(emails)} emails") |
|
|
|
|
|
results = [] |
|
|
for email in emails: |
|
|
validation = await self.validate_email(email) |
|
|
results.append(validation) |
|
|
|
|
|
return results |
|
|
|
|
|
|
|
|
class MCPSummaryService: |
|
|
""" |
|
|
Summary Service - Generate AI-powered summaries for companies and prospects |
|
|
Real-world use case: Create comprehensive, informative summaries for sales teams |
|
|
|
|
|
ENHANCED: Now uses LLM service with strict grounding to prevent hallucination |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
from services.llm_service import get_llm_service |
|
|
self.llm = get_llm_service() |
|
|
logger.info("MCP Summary Service initialized with LLM grounding support") |
|
|
|
|
|
async def generate_company_summary(self, company_data: Dict, enrichment_data: Dict = None) -> str: |
|
|
""" |
|
|
ENHANCED: Generate comprehensive AI summary for a prospect company |
|
|
Now uses LLM service with strict grounding |
|
|
|
|
|
Args: |
|
|
company_data: Basic company information (INCLUDING raw_facts if available) |
|
|
enrichment_data: Optional enriched data from enrichment service |
|
|
|
|
|
Returns: |
|
|
Detailed summary string grounded in facts |
|
|
""" |
|
|
logger.info(f"Generating GROUNDED summary for company: {company_data.get('name', 'Unknown')}") |
|
|
|
|
|
name = company_data.get('name', 'Unknown Company') |
|
|
|
|
|
|
|
|
if enrichment_data: |
|
|
company_data = {**company_data, **enrichment_data} |
|
|
|
|
|
|
|
|
raw_facts = company_data.get('raw_facts', []) |
|
|
|
|
|
|
|
|
summary = await self.llm.generate_grounded_summary( |
|
|
company_name=name, |
|
|
extracted_data=company_data, |
|
|
raw_facts=raw_facts, |
|
|
summary_type="prospect" |
|
|
) |
|
|
|
|
|
return summary |
|
|
|
|
|
async def generate_prospect_summary( |
|
|
self, |
|
|
prospect_data: Dict, |
|
|
company_enrichment: Dict = None, |
|
|
contact_data: List[Dict] = None |
|
|
) -> str: |
|
|
""" |
|
|
Generate comprehensive AI summary for a sales prospect |
|
|
|
|
|
Args: |
|
|
prospect_data: Basic prospect information with company data |
|
|
company_enrichment: Enriched company data |
|
|
contact_data: List of contacts found |
|
|
|
|
|
Returns: |
|
|
Detailed prospect summary |
|
|
""" |
|
|
logger.info("Generating prospect summary") |
|
|
|
|
|
company = prospect_data.get('company', {}) |
|
|
company_name = company.get('name', 'Unknown Company') |
|
|
domain = company.get('domain', '') |
|
|
industry = company.get('industry', 'Unknown') |
|
|
|
|
|
|
|
|
company_summary = await self.generate_company_summary(company, company_enrichment) |
|
|
|
|
|
|
|
|
fit_score = prospect_data.get('fit_score', 0.0) |
|
|
status = prospect_data.get('status', 'new') |
|
|
|
|
|
|
|
|
contact_summary = "" |
|
|
if contact_data: |
|
|
contact_count = len(contact_data) |
|
|
contact_summary = f" We have identified {contact_count} key contact{'s' if contact_count > 1 else ''}" |
|
|
|
|
|
|
|
|
decision_makers = [c for c in contact_data if any( |
|
|
title_word in c.get('title', '').lower() |
|
|
for title_word in ['ceo', 'cto', 'cfo', 'vp', 'director', 'head', 'chief'] |
|
|
)] |
|
|
|
|
|
if decision_makers: |
|
|
contact_summary += f", including {len(decision_makers)} decision-maker{'s' if len(decision_makers) > 1 else ''}" |
|
|
|
|
|
contact_summary += "." |
|
|
|
|
|
|
|
|
fit_assessment = "" |
|
|
if fit_score > 0: |
|
|
if fit_score >= 0.8: |
|
|
fit_assessment = " **High Priority:** This prospect shows excellent fit based on company size, industry, and technology profile." |
|
|
elif fit_score >= 0.6: |
|
|
fit_assessment = " **Good Fit:** This prospect demonstrates strong alignment with our ideal customer profile." |
|
|
elif fit_score >= 0.4: |
|
|
fit_assessment = " **Moderate Fit:** This prospect shows potential but may require additional qualification." |
|
|
else: |
|
|
fit_assessment = " **Low Priority:** This prospect shows limited fit with our target criteria." |
|
|
|
|
|
|
|
|
full_summary = company_summary + contact_summary + fit_assessment |
|
|
|
|
|
return full_summary |
|
|
|
|
|
async def generate_client_summary( |
|
|
self, |
|
|
client_data: Dict, |
|
|
enrichment_data: Dict = None |
|
|
) -> str: |
|
|
""" |
|
|
ENHANCED: Generate comprehensive AI summary for CLIENT company (the company we're selling FOR) |
|
|
Now uses LLM service with strict grounding to prevent hallucination |
|
|
|
|
|
Args: |
|
|
client_data: Client profile data with offerings, value props, etc. (INCLUDING raw_facts) |
|
|
enrichment_data: Optional enriched data |
|
|
|
|
|
Returns: |
|
|
Detailed client summary grounded in extracted facts |
|
|
""" |
|
|
logger.info(f"Generating GROUNDED client summary for: {client_data.get('name', 'Unknown')}") |
|
|
|
|
|
name = client_data.get('name', 'Unknown Company') |
|
|
|
|
|
|
|
|
if enrichment_data: |
|
|
client_data = {**client_data, **enrichment_data} |
|
|
|
|
|
|
|
|
raw_facts = client_data.get('raw_facts', []) |
|
|
|
|
|
|
|
|
summary = await self.llm.generate_grounded_summary( |
|
|
company_name=name, |
|
|
extracted_data=client_data, |
|
|
raw_facts=raw_facts, |
|
|
summary_type="client" |
|
|
) |
|
|
|
|
|
return summary |
|
|
|
|
|
|
|
|
|
|
|
_analytics_service: Optional[MCPAnalyticsService] = None |
|
|
_enrichment_service: Optional[MCPEnrichmentService] = None |
|
|
_validation_service: Optional[MCPValidationService] = None |
|
|
_summary_service: Optional[MCPSummaryService] = None |
|
|
|
|
|
|
|
|
def get_analytics_service() -> MCPAnalyticsService: |
|
|
"""Get or create analytics service instance""" |
|
|
global _analytics_service |
|
|
if _analytics_service is None: |
|
|
_analytics_service = MCPAnalyticsService() |
|
|
return _analytics_service |
|
|
|
|
|
|
|
|
def get_enrichment_service() -> MCPEnrichmentService: |
|
|
"""Get or create enrichment service instance""" |
|
|
global _enrichment_service |
|
|
if _enrichment_service is None: |
|
|
_enrichment_service = MCPEnrichmentService() |
|
|
return _enrichment_service |
|
|
|
|
|
|
|
|
def get_validation_service() -> MCPValidationService: |
|
|
"""Get or create validation service instance""" |
|
|
global _validation_service |
|
|
if _validation_service is None: |
|
|
_validation_service = MCPValidationService() |
|
|
return _validation_service |
|
|
|
|
|
|
|
|
def get_summary_service() -> MCPSummaryService: |
|
|
"""Get or create summary service instance""" |
|
|
global _summary_service |
|
|
if _summary_service is None: |
|
|
_summary_service = MCPSummaryService() |
|
|
return _summary_service |
|
|
|