|
|
|
|
|
import asyncio |
|
|
from typing import List, AsyncGenerator, Optional |
|
|
from app.schema import Prospect, PipelineEvent, Company |
|
|
from app.logging_utils import log_event, logger |
|
|
from agents import ( |
|
|
Hunter, Enricher, Contactor, Scorer, |
|
|
Writer, Compliance, Sequencer, Curator |
|
|
) |
|
|
from mcp.registry import MCPRegistry |
|
|
|
|
|
class Orchestrator:
    """Drive prospects through the agent pipeline while streaming progress events.

    Stages, in order: Hunter (discovery) -> Enricher -> Contactor -> Scorer ->
    Writer -> Compliance -> Sequencer -> Curator. Every agent is constructed
    with the same ``MCPRegistry`` instance, which the orchestrator also uses
    directly to obtain store / calendar / email clients.
    """

    def __init__(self):
        # Single registry shared by all agents and by run_pipeline itself.
        self.mcp = MCPRegistry()

        self.hunter = Hunter(self.mcp)
        self.enricher = Enricher(self.mcp)
        self.contactor = Contactor(self.mcp)
        self.scorer = Scorer(self.mcp)
        self.writer = Writer(self.mcp)
        self.compliance = Compliance(self.mcp)
        self.sequencer = Sequencer(self.mcp)
        self.curator = Curator(self.mcp)

    async def run_pipeline(
        self,
        company_ids: Optional[List[str]] = None,
        company_names: Optional[List[str]] = None,
        use_seed_file: bool = False
    ) -> AsyncGenerator[dict, None]:
        """
        Run the full pipeline with streaming events and detailed MCP tracking.

        Args:
            company_ids: Legacy mode - company IDs from seed file
            company_names: Dynamic mode - company names to discover
            use_seed_file: Force legacy mode with seed file

        Yields:
            Event dicts built by ``log_event``, interleaved with the writer's
            ``llm_token`` / ``llm_done`` events, which are forwarded unchanged.

        Notes:
            Dynamic mode is used only when ``company_names`` is non-empty and
            ``use_seed_file`` is False; otherwise the seed file is used and
            ``company_names`` is ignored.
            Exceptions raised during discovery (Hunter) propagate to the
            caller. An exception in any later stage is logged with its
            traceback, reported as an ``agent_log`` event, and the pipeline
            continues with the next prospect.
        """

        # ---- Stage 1: Hunter (prospect discovery) ----
        if company_names and not use_seed_file:
            yield log_event("hunter", "Starting dynamic company discovery", "agent_start")
            yield log_event("hunter", f"Discovering {len(company_names)} companies via web search", "mcp_call",
                            {"mcp_server": "web_search", "method": "discover_companies", "count": len(company_names)})

            prospects = await self.hunter.run(company_names=company_names, use_seed_file=False)

            yield log_event("hunter", f"Discovered {len(prospects)} companies from web search", "mcp_response",
                            {"mcp_server": "web_search", "companies_discovered": len(prospects)})
        else:
            yield log_event("hunter", "Starting prospect discovery (legacy mode)", "agent_start")
            yield log_event("hunter", "Calling MCP Store to load seed companies", "mcp_call",
                            {"mcp_server": "store", "method": "load_companies"})

            prospects = await self.hunter.run(company_ids=company_ids, use_seed_file=True)

            yield log_event("hunter", f"MCP Store returned {len(prospects)} companies", "mcp_response",
                            {"mcp_server": "store", "companies_count": len(prospects)})

        # Emitted after either branch so both discovery modes close the hunter stage.
        yield log_event("hunter", f"Found {len(prospects)} prospects", "agent_end",
                        {"count": len(prospects)})

        for prospect in prospects:
            try:
                company_name = prospect.company.name

                # ---- Stage 2: Enricher ----
                yield log_event("enricher", f"Enriching {company_name}", "agent_start")
                yield log_event("enricher", "Calling MCP Search for company facts", "mcp_call",
                                {"mcp_server": "search", "company": company_name})

                prospect = await self.enricher.run(prospect)

                yield log_event("enricher", "MCP Search returned facts", "mcp_response",
                                {"mcp_server": "search", "facts_found": len(prospect.facts)})
                yield log_event("enricher", f"Calling MCP Store to save {len(prospect.facts)} facts", "mcp_call",
                                {"mcp_server": "store", "method": "save_facts"})
                yield log_event("enricher", f"Added {len(prospect.facts)} facts", "agent_end",
                                {"facts_count": len(prospect.facts)})

                # ---- Stage 3: Contactor ----
                yield log_event("contactor", f"Finding contacts for {company_name}", "agent_start")
                yield log_event("contactor", "Calling MCP Store to check suppressions", "mcp_call",
                                {"mcp_server": "store", "method": "check_suppression", "domain": prospect.company.domain})

                # `store` is reused below by the compliance and sequencer stages.
                store = self.mcp.get_store_client()
                suppressed = await store.check_suppression("domain", prospect.company.domain)

                # NOTE(review): the suppression result is only reported here; it
                # does not stop the pipeline. Presumably Contactor/Compliance
                # enforce it - confirm.
                if suppressed:
                    yield log_event("contactor", f"Domain {prospect.company.domain} is suppressed", "mcp_response",
                                    {"mcp_server": "store", "suppressed": True})
                else:
                    yield log_event("contactor", f"Domain {prospect.company.domain} is not suppressed", "mcp_response",
                                    {"mcp_server": "store", "suppressed": False})

                prospect = await self.contactor.run(prospect)

                if prospect.contacts:
                    yield log_event("contactor", f"Calling MCP Store to save {len(prospect.contacts)} contacts", "mcp_call",
                                    {"mcp_server": "store", "method": "save_contacts"})

                yield log_event("contactor", f"Found {len(prospect.contacts)} contacts", "agent_end",
                                {"contacts_count": len(prospect.contacts)})

                # ---- Stage 4: Scorer ----
                yield log_event("scorer", f"Scoring {company_name}", "agent_start")
                yield log_event("scorer", "Calculating fit score based on industry, size, and pain points", "agent_log")

                prospect = await self.scorer.run(prospect)

                yield log_event("scorer", "Calling MCP Store to save prospect with score", "mcp_call",
                                {"mcp_server": "store", "method": "save_prospect", "fit_score": prospect.fit_score})
                yield log_event("scorer", f"Fit score: {prospect.fit_score:.2f}", "agent_end",
                                {"fit_score": prospect.fit_score, "status": prospect.status})

                # A prospect dropped by the scorer skips all remaining stages.
                if prospect.status == "dropped":
                    yield log_event("scorer", f"Dropped: {prospect.dropped_reason}", "agent_log",
                                    {"reason": prospect.dropped_reason})
                    continue

                # ---- Stage 5: Writer ----
                yield log_event("writer", f"Drafting outreach for {company_name}", "agent_start")
                yield log_event("writer", "Calling Vector Store for relevant facts", "mcp_call",
                                {"mcp_server": "vector", "method": "retrieve", "company_id": prospect.company.id})
                yield log_event("writer", "Calling HuggingFace Inference API for content generation", "mcp_call",
                                {"mcp_server": "hf_inference", "model": "Qwen/Qwen2.5-7B-Instruct"})

                # Forward streamed tokens as they arrive. The `llm_done` event
                # carries the updated prospect, which replaces the local one.
                # Any other event types from the writer are not forwarded.
                async for event in self.writer.run_streaming(prospect):
                    if event["type"] == "llm_token":
                        yield event
                    elif event["type"] == "llm_done":
                        yield event
                        prospect = event["payload"]["prospect"]
                        yield log_event("writer", "HuggingFace Inference completed generation", "mcp_response",
                                        {"mcp_server": "hf_inference", "has_summary": bool(prospect.summary),
                                         "has_email": bool(prospect.email_draft)})

                yield log_event("writer", "Calling MCP Store to save draft", "mcp_call",
                                {"mcp_server": "store", "method": "save_prospect"})
                yield log_event("writer", "Draft complete", "agent_end",
                                {"has_summary": bool(prospect.summary),
                                 "has_email": bool(prospect.email_draft)})

                # ---- Stage 6: Compliance ----
                yield log_event("compliance", f"Checking compliance for {company_name}", "agent_start")
                yield log_event("compliance", "Calling MCP Store to check email/domain suppressions", "mcp_call",
                                {"mcp_server": "store", "method": "check_suppression"})

                # Only suppressed addresses produce an event; the blocking
                # decision itself is made by self.compliance.run below.
                for contact in prospect.contacts:
                    email_suppressed = await store.check_suppression("email", contact.email)
                    if email_suppressed:
                        yield log_event("compliance", f"Email {contact.email} is suppressed", "mcp_response",
                                        {"mcp_server": "store", "suppressed": True})

                yield log_event("compliance", "Checking CAN-SPAM, PECR, CASL requirements", "agent_log")

                prospect = await self.compliance.run(prospect)

                if prospect.status == "blocked":
                    yield log_event("compliance", f"Blocked: {prospect.dropped_reason}", "policy_block",
                                    {"reason": prospect.dropped_reason})
                    continue

                yield log_event("compliance", "All compliance checks passed", "policy_pass")
                yield log_event("compliance", "Footer appended to email", "agent_log")

                # ---- Stage 7: Sequencer ----
                yield log_event("sequencer", f"Sequencing outreach for {company_name}", "agent_start")

                # Without a recipient or a draft there is nothing to send:
                # mark the prospect blocked, persist it, and move on.
                if not prospect.contacts or not prospect.email_draft:
                    yield log_event("sequencer", "Missing contacts or email draft", "agent_log",
                                    {"has_contacts": bool(prospect.contacts),
                                     "has_email": bool(prospect.email_draft)})
                    prospect.status = "blocked"
                    prospect.dropped_reason = "No contacts or email draft available"
                    await store.save_prospect(prospect)
                    yield log_event("sequencer", f"Blocked: {prospect.dropped_reason}", "agent_end")
                    continue

                yield log_event("sequencer", "Calling MCP Calendar for available slots", "mcp_call",
                                {"mcp_server": "calendar", "method": "suggest_slots"})

                calendar = self.mcp.get_calendar_client()
                slots = await calendar.suggest_slots()

                yield log_event("sequencer", f"MCP Calendar returned {len(slots)} slots", "mcp_response",
                                {"mcp_server": "calendar", "slots_count": len(slots)})

                if slots:
                    yield log_event("sequencer", "Calling MCP Calendar to generate ICS", "mcp_call",
                                    {"mcp_server": "calendar", "method": "generate_ics"})

                # contacts is non-empty here (guarded above), so [0] is safe.
                yield log_event("sequencer", f"Calling MCP Email to send to {prospect.contacts[0].email}", "mcp_call",
                                {"mcp_server": "email", "method": "send", "recipient": prospect.contacts[0].email})

                prospect = await self.sequencer.run(prospect)

                yield log_event("sequencer", "MCP Email created thread", "mcp_response",
                                {"mcp_server": "email", "thread_id": prospect.thread_id})
                yield log_event("sequencer", f"Thread created: {prospect.thread_id}", "agent_end",
                                {"thread_id": prospect.thread_id})

                # ---- Stage 8: Curator (handoff) ----
                yield log_event("curator", f"Creating handoff for {company_name}", "agent_start")
                yield log_event("curator", "Calling MCP Email to retrieve thread", "mcp_call",
                                {"mcp_server": "email", "method": "get_thread", "prospect_id": prospect.id})

                email_client = self.mcp.get_email_client()
                # The thread is looked up by prospect id, and only when the
                # sequencer recorded a thread id.
                thread = await email_client.get_thread(prospect.id) if prospect.thread_id else None

                if thread:
                    yield log_event("curator", "MCP Email returned thread with messages", "mcp_response",
                                    {"mcp_server": "email", "has_thread": True})

                yield log_event("curator", "Calling MCP Calendar for meeting slots", "mcp_call",
                                {"mcp_server": "calendar", "method": "suggest_slots"})

                prospect = await self.curator.run(prospect)

                yield log_event("curator", "Calling MCP Store to save handoff packet", "mcp_call",
                                {"mcp_server": "store", "method": "save_handoff"})
                yield log_event("curator", "Handoff packet created and saved", "mcp_response",
                                {"mcp_server": "store", "saved": True})
                yield log_event("curator", "Handoff ready", "agent_end",
                                {"prospect_id": prospect.id, "status": "ready_for_handoff"})

            except Exception as e:
                # Per-prospect failure boundary: report it and continue with the
                # next prospect. The name is derived defensively because the
                # error may have been raised while reading prospect.company
                # itself; re-raising here would abort the whole stream.
                failed_name = getattr(getattr(prospect, "company", None), "name", "<unknown>")
                # logger.exception (unlike logger.error) keeps the traceback.
                logger.exception(f"Pipeline error for {failed_name}: {e}")
                yield log_event("orchestrator", f"Error: {str(e)}", "agent_log",
                                {"error": str(e), "prospect_id": prospect.id})