""" Productivity-Enhancing MCP Services Real-world services that increase sales automation efficiency """ import asyncio from typing import Dict, List, Optional, Any from datetime import datetime, timedelta import re import logging logger = logging.getLogger(__name__) class MCPAnalyticsService: """ Analytics Service - Track metrics, conversions, and performance Real-world use case: Monitor pipeline health and ROI """ def __init__(self): self.metrics = { "pipeline_runs": 0, "prospects_discovered": 0, "contacts_found": 0, "emails_generated": 0, "emails_sent": 0, "replies_received": 0, "meetings_booked": 0, "conversion_rate": 0.0, "average_response_time": 0.0, "top_performing_sequences": [], "daily_stats": {} } self.events = [] logger.info("MCP Analytics Service initialized") async def track_event(self, event_type: str, data: Dict) -> str: """Track an event for analytics""" event = { "type": event_type, "data": data, "timestamp": datetime.utcnow().isoformat() } self.events.append(event) # Update aggregated metrics today = datetime.utcnow().strftime('%Y-%m-%d') if today not in self.metrics["daily_stats"]: self.metrics["daily_stats"][today] = { "pipeline_runs": 0, "prospects": 0, "contacts": 0, "emails": 0 } if event_type == "pipeline_run": self.metrics["pipeline_runs"] += 1 self.metrics["daily_stats"][today]["pipeline_runs"] += 1 elif event_type == "prospect_discovered": self.metrics["prospects_discovered"] += 1 self.metrics["daily_stats"][today]["prospects"] += 1 elif event_type == "contact_found": self.metrics["contacts_found"] += 1 self.metrics["daily_stats"][today]["contacts"] += 1 elif event_type == "email_generated": self.metrics["emails_generated"] += 1 self.metrics["daily_stats"][today]["emails"] += 1 elif event_type == "email_sent": self.metrics["emails_sent"] += 1 elif event_type == "reply_received": self.metrics["replies_received"] += 1 elif event_type == "meeting_booked": self.metrics["meetings_booked"] += 1 # Calculate conversion rate if self.metrics["emails_sent"] > 0: self.metrics["conversion_rate"] = ( self.metrics["meetings_booked"] / self.metrics["emails_sent"] ) * 100 logger.info(f"Analytics event tracked: {event_type}") return "tracked" async def get_metrics(self) -> Dict: """Get current metrics""" return self.metrics async def get_dashboard_data(self) -> Dict: """Get formatted dashboard data""" return { "summary": { "Total Pipeline Runs": self.metrics["pipeline_runs"], "Prospects Discovered": self.metrics["prospects_discovered"], "Contacts Found": self.metrics["contacts_found"], "Emails Generated": self.metrics["emails_generated"], "Emails Sent": self.metrics["emails_sent"], "Replies Received": self.metrics["replies_received"], "Meetings Booked": self.metrics["meetings_booked"], "Conversion Rate": f"{self.metrics['conversion_rate']:.2f}%" }, "daily_stats": self.metrics["daily_stats"], "recent_events": self.events[-10:] # Last 10 events } class MCPEnrichmentService: """ Enrichment Service - Enrich prospect and contact data Real-world use case: Add company info, social profiles, tech stack """ def __init__(self): # Mock enrichment database self.enrichment_db = { "shopify.com": { "employee_count": "10,000+", "founded_year": 2006, "funding": "$2.9B", "tech_stack": ["Ruby on Rails", "React", "MySQL", "Redis"], "social_profiles": { "linkedin": "https://linkedin.com/company/shopify", "twitter": "https://twitter.com/shopify" }, "industry_tags": ["E-commerce", "SaaS", "Retail Tech"], "revenue_range": "$1B - $5B" }, "stripe.com": { "employee_count": "8,000+", "founded_year": 2010, "funding": "$2.2B", "tech_stack": ["Ruby", "Scala", "Go", "React"], "social_profiles": { "linkedin": "https://linkedin.com/company/stripe", "twitter": "https://twitter.com/stripe" }, "industry_tags": ["Fintech", "Payments", "SaaS"], "revenue_range": "$5B+" } } logger.info("MCP Enrichment Service initialized") async def enrich_company(self, domain: str) -> Dict: """Enrich company data with additional information""" logger.info(f"Enriching company data for: {domain}") # Check if we have enrichment data enriched_data = self.enrichment_db.get(domain, {}) if not enriched_data: # Generate estimated data based on domain enriched_data = { "employee_count": "Unknown", "founded_year": None, "funding": "Unknown", "tech_stack": [], "social_profiles": { "linkedin": f"https://linkedin.com/company/{domain.split('.')[0]}", "twitter": f"https://twitter.com/{domain.split('.')[0]}" }, "industry_tags": [], "revenue_range": "Unknown", "enrichment_source": "estimated" } else: enriched_data["enrichment_source"] = "database" return enriched_data async def enrich_contact(self, email: str, name: str) -> Dict: """Enrich contact data with social profiles and background""" logger.info(f"Enriching contact data for: {email}") # Extract info from email domain = email.split('@')[1] if '@' in email else '' username = email.split('@')[0] if '@' in email else '' return { "email": email, "name": name, "linkedin_profile": f"https://linkedin.com/in/{username.replace('.', '-')}", "twitter_profile": f"https://twitter.com/{username.replace('.', '_')}", "github_profile": f"https://github.com/{username.replace('.', '')}", "estimated_seniority": self._estimate_seniority(email, name), "enrichment_timestamp": datetime.utcnow().isoformat() } def _estimate_seniority(self, email: str, name: str) -> str: """Estimate seniority based on email patterns""" email_lower = email.lower() if any(x in email_lower for x in ['ceo', 'founder', 'chief']): return "Executive" elif any(x in email_lower for x in ['vp', 'director', 'head']): return "Senior" elif any(x in email_lower for x in ['manager', 'lead']): return "Mid-Level" else: return "Individual Contributor" class MCPValidationService: """ Validation Service - Validate emails, domains, and contact information Real-world use case: Reduce bounce rates and improve deliverability """ def __init__(self): # Common disposable email domains self.disposable_domains = [ "tempmail.com", "throwaway.email", "guerrillamail.com", "10minutemail.com", "mailinator.com" ] # Known invalid patterns self.invalid_patterns = [ "noreply@", "no-reply@", "donotreply@", "info@", "admin@", "support@", "sales@" ] self.validation_cache = {} logger.info("MCP Validation Service initialized") async def validate_email(self, email: str) -> Dict: """Validate email address""" logger.info(f"Validating email: {email}") result = { "email": email, "is_valid": False, "is_disposable": False, "is_role_based": False, "is_catchall": False, "deliverability_score": 0, "validation_issues": [], "validated_at": datetime.utcnow().isoformat() } # Basic format validation email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_regex, email): result["validation_issues"].append("Invalid email format") return result # Extract domain domain = email.split('@')[1] if '@' in email else '' # Check if disposable if domain in self.disposable_domains: result["is_disposable"] = True result["validation_issues"].append("Disposable email domain") # Check if role-based for pattern in self.invalid_patterns: if email.lower().startswith(pattern): result["is_role_based"] = True result["validation_issues"].append("Role-based email (low engagement)") break # Calculate deliverability score score = 100 if result["is_disposable"]: score -= 50 if result["is_role_based"]: score -= 30 if len(result["validation_issues"]) == 0: result["is_valid"] = True result["deliverability_score"] = max(0, score) return result async def validate_domain(self, domain: str) -> Dict: """Validate domain""" logger.info(f"Validating domain: {domain}") result = { "domain": domain, "is_valid": False, "has_mx_records": False, "is_active": False, "validation_issues": [], "validated_at": datetime.utcnow().isoformat() } # Basic domain format validation domain_regex = r'^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$' if not re.match(domain_regex, domain): result["validation_issues"].append("Invalid domain format") return result # For demo purposes, assume most domains are valid # In production, would do DNS lookups result["is_valid"] = True result["has_mx_records"] = True result["is_active"] = True return result async def batch_validate_emails(self, emails: List[str]) -> List[Dict]: """Batch validate multiple emails""" logger.info(f"Batch validating {len(emails)} emails") results = [] for email in emails: validation = await self.validate_email(email) results.append(validation) return results class MCPSummaryService: """ Summary Service - Generate AI-powered summaries for companies and prospects Real-world use case: Create comprehensive, informative summaries for sales teams ENHANCED: Now uses LLM service with strict grounding to prevent hallucination """ def __init__(self): from services.llm_service import get_llm_service self.llm = get_llm_service() logger.info("MCP Summary Service initialized with LLM grounding support") async def generate_company_summary(self, company_data: Dict, enrichment_data: Dict = None) -> str: """ ENHANCED: Generate comprehensive AI summary for a prospect company Now uses LLM service with strict grounding Args: company_data: Basic company information (INCLUDING raw_facts if available) enrichment_data: Optional enriched data from enrichment service Returns: Detailed summary string grounded in facts """ logger.info(f"Generating GROUNDED summary for company: {company_data.get('name', 'Unknown')}") name = company_data.get('name', 'Unknown Company') # Merge enrichment data if available if enrichment_data: company_data = {**company_data, **enrichment_data} # Get raw facts for grounding raw_facts = company_data.get('raw_facts', []) # Use LLM service for grounded summarization summary = await self.llm.generate_grounded_summary( company_name=name, extracted_data=company_data, raw_facts=raw_facts, summary_type="prospect" ) return summary async def generate_prospect_summary( self, prospect_data: Dict, company_enrichment: Dict = None, contact_data: List[Dict] = None ) -> str: """ Generate comprehensive AI summary for a sales prospect Args: prospect_data: Basic prospect information with company data company_enrichment: Enriched company data contact_data: List of contacts found Returns: Detailed prospect summary """ logger.info("Generating prospect summary") company = prospect_data.get('company', {}) company_name = company.get('name', 'Unknown Company') domain = company.get('domain', '') industry = company.get('industry', 'Unknown') # Start with company summary company_summary = await self.generate_company_summary(company, company_enrichment) # Add prospect-specific insights fit_score = prospect_data.get('fit_score', 0.0) status = prospect_data.get('status', 'new') # Contacts analysis contact_summary = "" if contact_data: contact_count = len(contact_data) contact_summary = f" We have identified {contact_count} key contact{'s' if contact_count > 1 else ''}" # Identify decision makers decision_makers = [c for c in contact_data if any( title_word in c.get('title', '').lower() for title_word in ['ceo', 'cto', 'cfo', 'vp', 'director', 'head', 'chief'] )] if decision_makers: contact_summary += f", including {len(decision_makers)} decision-maker{'s' if len(decision_makers) > 1 else ''}" contact_summary += "." # Fit assessment fit_assessment = "" if fit_score > 0: if fit_score >= 0.8: fit_assessment = " **High Priority:** This prospect shows excellent fit based on company size, industry, and technology profile." elif fit_score >= 0.6: fit_assessment = " **Good Fit:** This prospect demonstrates strong alignment with our ideal customer profile." elif fit_score >= 0.4: fit_assessment = " **Moderate Fit:** This prospect shows potential but may require additional qualification." else: fit_assessment = " **Low Priority:** This prospect shows limited fit with our target criteria." # Combine all sections full_summary = company_summary + contact_summary + fit_assessment return full_summary async def generate_client_summary( self, client_data: Dict, enrichment_data: Dict = None ) -> str: """ ENHANCED: Generate comprehensive AI summary for CLIENT company (the company we're selling FOR) Now uses LLM service with strict grounding to prevent hallucination Args: client_data: Client profile data with offerings, value props, etc. (INCLUDING raw_facts) enrichment_data: Optional enriched data Returns: Detailed client summary grounded in extracted facts """ logger.info(f"Generating GROUNDED client summary for: {client_data.get('name', 'Unknown')}") name = client_data.get('name', 'Unknown Company') # Merge enrichment data if available if enrichment_data: client_data = {**client_data, **enrichment_data} # Get raw facts for grounding (if available) raw_facts = client_data.get('raw_facts', []) # Use LLM service for grounded summarization summary = await self.llm.generate_grounded_summary( company_name=name, extracted_data=client_data, raw_facts=raw_facts, summary_type="client" ) return summary # Singleton instances _analytics_service: Optional[MCPAnalyticsService] = None _enrichment_service: Optional[MCPEnrichmentService] = None _validation_service: Optional[MCPValidationService] = None _summary_service: Optional[MCPSummaryService] = None def get_analytics_service() -> MCPAnalyticsService: """Get or create analytics service instance""" global _analytics_service if _analytics_service is None: _analytics_service = MCPAnalyticsService() return _analytics_service def get_enrichment_service() -> MCPEnrichmentService: """Get or create enrichment service instance""" global _enrichment_service if _enrichment_service is None: _enrichment_service = MCPEnrichmentService() return _enrichment_service def get_validation_service() -> MCPValidationService: """Get or create validation service instance""" global _validation_service if _validation_service is None: _validation_service = MCPValidationService() return _validation_service def get_summary_service() -> MCPSummaryService: """Get or create summary service instance""" global _summary_service if _summary_service is None: _summary_service = MCPSummaryService() return _summary_service