# Source snapshot metadata: file size 13,611 bytes, revision 8bab08d.
# file: app/orchestrator.py
import asyncio
from typing import List, AsyncGenerator, Optional
from app.schema import Prospect, PipelineEvent, Company
from app.logging_utils import log_event, logger
from agents import (
Hunter, Enricher, Contactor, Scorer,
Writer, Compliance, Sequencer, Curator
)
from mcp.registry import MCPRegistry
class Orchestrator:
    """Drive prospects through the multi-agent outreach pipeline.

    Every agent is constructed with the same ``MCPRegistry`` instance, and
    ``run_pipeline`` streams its progress as event dicts so a caller can
    render each phase as it happens.
    """

    def __init__(self):
        # A single registry is shared by all agents; run_pipeline also uses it
        # directly to obtain the store, calendar and email clients.
        self.mcp = MCPRegistry()
        self.hunter = Hunter(self.mcp)
        self.enricher = Enricher(self.mcp)
        self.contactor = Contactor(self.mcp)
        self.scorer = Scorer(self.mcp)
        self.writer = Writer(self.mcp)
        self.compliance = Compliance(self.mcp)
        self.sequencer = Sequencer(self.mcp)
        self.curator = Curator(self.mcp)

    async def run_pipeline(
        self,
        company_ids: Optional[List[str]] = None,
        company_names: Optional[List[str]] = None,
        use_seed_file: bool = False
    ) -> AsyncGenerator[dict, None]:
        """
        Run the full pipeline with streaming events and detailed MCP tracking.

        A single Hunter phase produces the prospect list; each prospect then
        goes through Enricher -> Contactor -> Scorer -> Writer -> Compliance
        -> Sequencer -> Curator.

        Args:
            company_ids: Legacy mode - company IDs from seed file
            company_names: Dynamic mode - company names to discover
            use_seed_file: Force legacy mode with seed file

        Yields:
            Event dicts built by ``log_event`` for every phase, plus the
            Writer's ``llm_token`` / ``llm_done`` events forwarded unchanged.

        A prospect leaves the pipeline early when the Scorer sets its status
        to ``"dropped"``, when Compliance sets it to ``"blocked"``, or when it
        reaches the Sequencer without contacts or an email draft (it is then
        marked ``"blocked"`` and saved to the store). An exception raised
        while processing one prospect is logged and reported as an
        ``"orchestrator"`` event, and processing continues with the next
        prospect. Exceptions from the Hunter phase are not caught here and
        propagate to the caller.
        """
        # Hunter phase: dynamic discovery by name unless legacy mode is forced
        # or no names were given.
        if company_names and not use_seed_file:
            yield log_event("hunter", "Starting dynamic company discovery", "agent_start")
            yield log_event("hunter", f"Discovering {len(company_names)} companies via web search", "mcp_call",
                            {"mcp_server": "web_search", "method": "discover_companies", "count": len(company_names)})
            prospects = await self.hunter.run(company_names=company_names, use_seed_file=False)
            yield log_event("hunter", f"Discovered {len(prospects)} companies from web search", "mcp_response",
                            {"mcp_server": "web_search", "companies_discovered": len(prospects)})
        else:
            yield log_event("hunter", "Starting prospect discovery (legacy mode)", "agent_start")
            yield log_event("hunter", "Calling MCP Store to load seed companies", "mcp_call",
                            {"mcp_server": "store", "method": "load_companies"})
            prospects = await self.hunter.run(company_ids=company_ids, use_seed_file=True)
            yield log_event("hunter", f"MCP Store returned {len(prospects)} companies", "mcp_response",
                            {"mcp_server": "store", "companies_count": len(prospects)})
        yield log_event("hunter", f"Found {len(prospects)} prospects", "agent_end",
                        {"count": len(prospects)})

        for prospect in prospects:
            try:
                company_name = prospect.company.name

                # Enricher phase
                yield log_event("enricher", f"Enriching {company_name}", "agent_start")
                yield log_event("enricher", "Calling MCP Search for company facts", "mcp_call",
                                {"mcp_server": "search", "company": company_name})
                prospect = await self.enricher.run(prospect)
                yield log_event("enricher", "MCP Search returned facts", "mcp_response",
                                {"mcp_server": "search", "facts_found": len(prospect.facts)})
                yield log_event("enricher", f"Calling MCP Store to save {len(prospect.facts)} facts", "mcp_call",
                                {"mcp_server": "store", "method": "save_facts"})
                yield log_event("enricher", f"Added {len(prospect.facts)} facts", "agent_end",
                                {"facts_count": len(prospect.facts)})

                # Contactor phase
                yield log_event("contactor", f"Finding contacts for {company_name}", "agent_start")
                yield log_event("contactor", "Calling MCP Store to check suppressions", "mcp_call",
                                {"mcp_server": "store", "method": "check_suppression",
                                 "domain": prospect.company.domain})
                # Check suppression. `store` is also reused by the Compliance
                # and Sequencer phases below.
                store = self.mcp.get_store_client()
                suppressed = await store.check_suppression("domain", prospect.company.domain)
                # NOTE(review): the domain-suppression result is only logged;
                # the Contactor still runs either way. Presumably the agents
                # enforce suppression themselves - confirm.
                if suppressed:
                    yield log_event("contactor", f"Domain {prospect.company.domain} is suppressed", "mcp_response",
                                    {"mcp_server": "store", "suppressed": True})
                else:
                    yield log_event("contactor", f"Domain {prospect.company.domain} is not suppressed",
                                    "mcp_response", {"mcp_server": "store", "suppressed": False})
                prospect = await self.contactor.run(prospect)
                if prospect.contacts:
                    yield log_event("contactor", f"Calling MCP Store to save {len(prospect.contacts)} contacts",
                                    "mcp_call", {"mcp_server": "store", "method": "save_contacts"})
                yield log_event("contactor", f"Found {len(prospect.contacts)} contacts", "agent_end",
                                {"contacts_count": len(prospect.contacts)})

                # Scorer phase
                yield log_event("scorer", f"Scoring {company_name}", "agent_start")
                yield log_event("scorer", "Calculating fit score based on industry, size, and pain points",
                                "agent_log")
                prospect = await self.scorer.run(prospect)
                yield log_event("scorer", "Calling MCP Store to save prospect with score", "mcp_call",
                                {"mcp_server": "store", "method": "save_prospect", "fit_score": prospect.fit_score})
                yield log_event("scorer", f"Fit score: {prospect.fit_score:.2f}", "agent_end",
                                {"fit_score": prospect.fit_score, "status": prospect.status})
                if prospect.status == "dropped":
                    yield log_event("scorer", f"Dropped: {prospect.dropped_reason}", "agent_log",
                                    {"reason": prospect.dropped_reason})
                    continue

                # Writer phase with streaming
                yield log_event("writer", f"Drafting outreach for {company_name}", "agent_start")
                yield log_event("writer", "Calling Vector Store for relevant facts", "mcp_call",
                                {"mcp_server": "vector", "method": "retrieve", "company_id": prospect.company.id})
                yield log_event("writer", "Calling HuggingFace Inference API for content generation", "mcp_call",
                                {"mcp_server": "hf_inference", "model": "Qwen/Qwen2.5-7B-Instruct"})
                # Only token and completion events are forwarded; any other
                # event type produced by the writer stream is discarded.
                async for event in self.writer.run_streaming(prospect):
                    if event["type"] == "llm_token":
                        yield event
                    elif event["type"] == "llm_done":
                        yield event
                        # The completion event carries the updated prospect
                        # (with summary / email draft).
                        prospect = event["payload"]["prospect"]
                        yield log_event("writer", "HuggingFace Inference completed generation", "mcp_response",
                                        {"mcp_server": "hf_inference", "has_summary": bool(prospect.summary),
                                         "has_email": bool(prospect.email_draft)})
                yield log_event("writer", "Calling MCP Store to save draft", "mcp_call",
                                {"mcp_server": "store", "method": "save_prospect"})
                yield log_event("writer", "Draft complete", "agent_end",
                                {"has_summary": bool(prospect.summary),
                                 "has_email": bool(prospect.email_draft)})

                # Compliance phase
                yield log_event("compliance", f"Checking compliance for {company_name}", "agent_start")
                yield log_event("compliance", "Calling MCP Store to check email/domain suppressions", "mcp_call",
                                {"mcp_server": "store", "method": "check_suppression"})
                # Check each contact for suppression (reported only; the
                # blocking decision is made by self.compliance.run below).
                for contact in prospect.contacts:
                    email_suppressed = await store.check_suppression("email", contact.email)
                    if email_suppressed:
                        yield log_event("compliance", f"Email {contact.email} is suppressed", "mcp_response",
                                        {"mcp_server": "store", "suppressed": True})
                yield log_event("compliance", "Checking CAN-SPAM, PECR, CASL requirements", "agent_log")
                prospect = await self.compliance.run(prospect)
                if prospect.status == "blocked":
                    yield log_event("compliance", f"Blocked: {prospect.dropped_reason}", "policy_block",
                                    {"reason": prospect.dropped_reason})
                    continue
                yield log_event("compliance", "All compliance checks passed", "policy_pass")
                yield log_event("compliance", "Footer appended to email", "agent_log")

                # Sequencer phase
                yield log_event("sequencer", f"Sequencing outreach for {company_name}", "agent_start")
                if not prospect.contacts or not prospect.email_draft:
                    yield log_event("sequencer", "Missing contacts or email draft", "agent_log",
                                    {"has_contacts": bool(prospect.contacts),
                                     "has_email": bool(prospect.email_draft)})
                    prospect.status = "blocked"
                    prospect.dropped_reason = "No contacts or email draft available"
                    await store.save_prospect(prospect)
                    yield log_event("sequencer", f"Blocked: {prospect.dropped_reason}", "agent_end")
                    continue
                yield log_event("sequencer", "Calling MCP Calendar for available slots", "mcp_call",
                                {"mcp_server": "calendar", "method": "suggest_slots"})
                calendar = self.mcp.get_calendar_client()
                slots = await calendar.suggest_slots()
                yield log_event("sequencer", f"MCP Calendar returned {len(slots)} slots", "mcp_response",
                                {"mcp_server": "calendar", "slots_count": len(slots)})
                if slots:
                    yield log_event("sequencer", "Calling MCP Calendar to generate ICS", "mcp_call",
                                    {"mcp_server": "calendar", "method": "generate_ics"})
                # contacts is guaranteed non-empty by the guard above.
                yield log_event("sequencer", f"Calling MCP Email to send to {prospect.contacts[0].email}", "mcp_call",
                                {"mcp_server": "email", "method": "send", "recipient": prospect.contacts[0].email})
                prospect = await self.sequencer.run(prospect)
                yield log_event("sequencer", "MCP Email created thread", "mcp_response",
                                {"mcp_server": "email", "thread_id": prospect.thread_id})
                yield log_event("sequencer", f"Thread created: {prospect.thread_id}", "agent_end",
                                {"thread_id": prospect.thread_id})

                # Curator phase
                yield log_event("curator", f"Creating handoff for {company_name}", "agent_start")
                yield log_event("curator", "Calling MCP Email to retrieve thread", "mcp_call",
                                {"mcp_server": "email", "method": "get_thread", "prospect_id": prospect.id})
                email_client = self.mcp.get_email_client()
                # The thread is looked up by prospect.id, and only when the
                # Sequencer assigned a thread_id.
                thread = await email_client.get_thread(prospect.id) if prospect.thread_id else None
                if thread:
                    yield log_event("curator", "MCP Email returned thread with messages", "mcp_response",
                                    {"mcp_server": "email", "has_thread": True})
                yield log_event("curator", "Calling MCP Calendar for meeting slots", "mcp_call",
                                {"mcp_server": "calendar", "method": "suggest_slots"})
                prospect = await self.curator.run(prospect)
                yield log_event("curator", "Calling MCP Store to save handoff packet", "mcp_call",
                                {"mcp_server": "store", "method": "save_handoff"})
                yield log_event("curator", "Handoff packet created and saved", "mcp_response",
                                {"mcp_server": "store", "saved": True})
                yield log_event("curator", "Handoff ready", "agent_end",
                                {"prospect_id": prospect.id, "status": "ready_for_handoff"})
            except Exception as e:
                # Resolve identifiers defensively: the failure may itself stem
                # from a malformed prospect (e.g. company is None, or an agent
                # returned None). Dereferencing it again here would raise out
                # of the generator and abort all remaining prospects.
                failed_company = getattr(prospect, "company", None)
                failed_name = getattr(failed_company, "name", "<unknown company>")
                failed_id = getattr(prospect, "id", None)
                logger.exception("Pipeline error for %s: %s", failed_name, e)
                yield log_event("orchestrator", f"Error: {str(e)}", "agent_log",
                                {"error": str(e), "prospect_id": failed_id})