File size: 13,611 Bytes
8bab08d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# file: app/orchestrator.py
import asyncio
from typing import List, AsyncGenerator, Optional
from app.schema import Prospect, PipelineEvent, Company
from app.logging_utils import log_event, logger
from agents import (
    Hunter, Enricher, Contactor, Scorer, 
    Writer, Compliance, Sequencer, Curator
)
from mcp.registry import MCPRegistry

class Orchestrator:
    """Drive prospects through the agent pipeline and stream progress events.

    Owns a single ``MCPRegistry`` and one instance of each agent (Hunter,
    Enricher, Contactor, Scorer, Writer, Compliance, Sequencer, Curator),
    every one of them constructed around that shared registry.
    """

    def __init__(self):
        """Create the MCP registry and build each agent on top of it."""
        self.mcp = MCPRegistry()
        self.hunter = Hunter(self.mcp)
        self.enricher = Enricher(self.mcp)
        self.contactor = Contactor(self.mcp)
        self.scorer = Scorer(self.mcp)
        self.writer = Writer(self.mcp)
        self.compliance = Compliance(self.mcp)
        self.sequencer = Sequencer(self.mcp)
        self.curator = Curator(self.mcp)

    async def run_pipeline(
        self,
        company_ids: Optional[List[str]] = None,
        company_names: Optional[List[str]] = None,
        use_seed_file: bool = False
    ) -> AsyncGenerator[dict, None]:
        """
        Run the full pipeline with streaming events and detailed MCP tracking

        Dynamic mode is used when ``company_names`` is non-empty and
        ``use_seed_file`` is false; every other combination runs legacy mode.

        Each prospect returned by the Hunter is then taken through the
        Enricher, Contactor, Scorer, Writer, Compliance, Sequencer and Curator
        agents in that order. A prospect leaves the pipeline early when the
        Scorer marks it "dropped", when Compliance marks it "blocked", or when
        it reaches the Sequencer without contacts or an email draft.

        An exception raised while processing one prospect is logged and
        reported as an "orchestrator" event; the loop then moves on to the
        next prospect instead of aborting the whole run.

        Args:
            company_ids: Legacy mode - company IDs from seed file
            company_names: Dynamic mode - company names to discover
            use_seed_file: Force legacy mode with seed file

        Yields:
            The values returned by ``log_event`` for each step, plus the
            Writer's ``llm_token`` / ``llm_done`` events forwarded unchanged.
        """

        # Hunter phase
        if company_names and not use_seed_file:
            yield log_event("hunter", "Starting dynamic company discovery", "agent_start")
            yield log_event("hunter", f"Discovering {len(company_names)} companies via web search", "mcp_call",
                           {"mcp_server": "web_search", "method": "discover_companies", "count": len(company_names)})

            prospects = await self.hunter.run(company_names=company_names, use_seed_file=False)

            yield log_event("hunter", f"Discovered {len(prospects)} companies from web search", "mcp_response",
                           {"mcp_server": "web_search", "companies_discovered": len(prospects)})
        else:
            yield log_event("hunter", "Starting prospect discovery (legacy mode)", "agent_start")
            yield log_event("hunter", "Calling MCP Store to load seed companies", "mcp_call",
                           {"mcp_server": "store", "method": "load_companies"})

            prospects = await self.hunter.run(company_ids=company_ids, use_seed_file=True)

            # This response pairs with the store call announced just above, so
            # it is only emitted in legacy mode; dynamic mode already reported
            # its own web_search response.
            yield log_event("hunter", f"MCP Store returned {len(prospects)} companies", "mcp_response",
                           {"mcp_server": "store", "companies_count": len(prospects)})

        yield log_event("hunter", f"Found {len(prospects)} prospects", "agent_end",
                       {"count": len(prospects)})

        for prospect in prospects:
            # Identifiers for the error handler are captured defensively so
            # that reporting a failure can never raise a second exception
            # (e.g. when the prospect itself is the malformed object).
            company_name = "<unknown>"
            prospect_id = getattr(prospect, "id", None)

            try:
                company_name = prospect.company.name

                # Enricher phase
                yield log_event("enricher", f"Enriching {company_name}", "agent_start")
                yield log_event("enricher", "Calling MCP Search for company facts", "mcp_call",
                               {"mcp_server": "search", "company": company_name})

                prospect = await self.enricher.run(prospect)

                yield log_event("enricher", "MCP Search returned facts", "mcp_response",
                               {"mcp_server": "search", "facts_found": len(prospect.facts)})
                yield log_event("enricher", f"Calling MCP Store to save {len(prospect.facts)} facts", "mcp_call",
                               {"mcp_server": "store", "method": "save_facts"})
                yield log_event("enricher", f"Added {len(prospect.facts)} facts", "agent_end",
                               {"facts_count": len(prospect.facts)})

                # Contactor phase
                yield log_event("contactor", f"Finding contacts for {company_name}", "agent_start")
                yield log_event("contactor", "Calling MCP Store to check suppressions", "mcp_call",
                               {"mcp_server": "store", "method": "check_suppression", "domain": prospect.company.domain})

                # Check suppression (reported for visibility only; the
                # Contactor agent still runs regardless of the result)
                store = self.mcp.get_store_client()
                suppressed = await store.check_suppression("domain", prospect.company.domain)

                if suppressed:
                    yield log_event("contactor", f"Domain {prospect.company.domain} is suppressed", "mcp_response",
                                   {"mcp_server": "store", "suppressed": True})
                else:
                    yield log_event("contactor", f"Domain {prospect.company.domain} is not suppressed", "mcp_response",
                                   {"mcp_server": "store", "suppressed": False})

                prospect = await self.contactor.run(prospect)

                if prospect.contacts:
                    yield log_event("contactor", f"Calling MCP Store to save {len(prospect.contacts)} contacts", "mcp_call",
                                   {"mcp_server": "store", "method": "save_contacts"})

                yield log_event("contactor", f"Found {len(prospect.contacts)} contacts", "agent_end",
                               {"contacts_count": len(prospect.contacts)})

                # Scorer phase
                yield log_event("scorer", f"Scoring {company_name}", "agent_start")
                yield log_event("scorer", "Calculating fit score based on industry, size, and pain points", "agent_log")

                prospect = await self.scorer.run(prospect)

                yield log_event("scorer", "Calling MCP Store to save prospect with score", "mcp_call",
                               {"mcp_server": "store", "method": "save_prospect", "fit_score": prospect.fit_score})
                yield log_event("scorer", f"Fit score: {prospect.fit_score:.2f}", "agent_end",
                               {"fit_score": prospect.fit_score, "status": prospect.status})

                if prospect.status == "dropped":
                    yield log_event("scorer", f"Dropped: {prospect.dropped_reason}", "agent_log",
                                   {"reason": prospect.dropped_reason})
                    continue

                # Writer phase with streaming
                yield log_event("writer", f"Drafting outreach for {company_name}", "agent_start")
                yield log_event("writer", "Calling Vector Store for relevant facts", "mcp_call",
                               {"mcp_server": "vector", "method": "retrieve", "company_id": prospect.company.id})
                yield log_event("writer", "Calling HuggingFace Inference API for content generation", "mcp_call",
                               {"mcp_server": "hf_inference", "model": "Qwen/Qwen2.5-7B-Instruct"})

                async for event in self.writer.run_streaming(prospect):
                    event_type = event["type"]
                    if event_type == "llm_token":
                        yield event
                    elif event_type == "llm_done":
                        yield event
                        # The final event carries the updated prospect.
                        prospect = event["payload"]["prospect"]
                        yield log_event("writer", "HuggingFace Inference completed generation", "mcp_response",
                                       {"mcp_server": "hf_inference", "has_summary": bool(prospect.summary),
                                        "has_email": bool(prospect.email_draft)})

                yield log_event("writer", "Calling MCP Store to save draft", "mcp_call",
                               {"mcp_server": "store", "method": "save_prospect"})
                yield log_event("writer", "Draft complete", "agent_end",
                               {"has_summary": bool(prospect.summary),
                                "has_email": bool(prospect.email_draft)})

                # Compliance phase
                yield log_event("compliance", f"Checking compliance for {company_name}", "agent_start")
                yield log_event("compliance", "Calling MCP Store to check email/domain suppressions", "mcp_call",
                               {"mcp_server": "store", "method": "check_suppression"})

                # Check each contact for suppression; only hits are reported
                for contact in prospect.contacts:
                    email_suppressed = await store.check_suppression("email", contact.email)
                    if email_suppressed:
                        yield log_event("compliance", f"Email {contact.email} is suppressed", "mcp_response",
                                       {"mcp_server": "store", "suppressed": True})

                yield log_event("compliance", "Checking CAN-SPAM, PECR, CASL requirements", "agent_log")

                prospect = await self.compliance.run(prospect)

                if prospect.status == "blocked":
                    yield log_event("compliance", f"Blocked: {prospect.dropped_reason}", "policy_block",
                                   {"reason": prospect.dropped_reason})
                    continue

                yield log_event("compliance", "All compliance checks passed", "policy_pass")
                yield log_event("compliance", "Footer appended to email", "agent_log")

                # Sequencer phase
                yield log_event("sequencer", f"Sequencing outreach for {company_name}", "agent_start")

                if not prospect.contacts or not prospect.email_draft:
                    yield log_event("sequencer", "Missing contacts or email draft", "agent_log",
                                   {"has_contacts": bool(prospect.contacts),
                                    "has_email": bool(prospect.email_draft)})
                    prospect.status = "blocked"
                    prospect.dropped_reason = "No contacts or email draft available"
                    await store.save_prospect(prospect)
                    yield log_event("sequencer", f"Blocked: {prospect.dropped_reason}", "agent_end")
                    continue

                yield log_event("sequencer", "Calling MCP Calendar for available slots", "mcp_call",
                               {"mcp_server": "calendar", "method": "suggest_slots"})

                calendar = self.mcp.get_calendar_client()
                slots = await calendar.suggest_slots()

                yield log_event("sequencer", f"MCP Calendar returned {len(slots)} slots", "mcp_response",
                               {"mcp_server": "calendar", "slots_count": len(slots)})

                if slots:
                    yield log_event("sequencer", "Calling MCP Calendar to generate ICS", "mcp_call",
                                   {"mcp_server": "calendar", "method": "generate_ics"})

                # contacts is guaranteed non-empty by the guard above
                yield log_event("sequencer", f"Calling MCP Email to send to {prospect.contacts[0].email}", "mcp_call",
                               {"mcp_server": "email", "method": "send", "recipient": prospect.contacts[0].email})

                prospect = await self.sequencer.run(prospect)

                yield log_event("sequencer", "MCP Email created thread", "mcp_response",
                               {"mcp_server": "email", "thread_id": prospect.thread_id})
                yield log_event("sequencer", f"Thread created: {prospect.thread_id}", "agent_end",
                               {"thread_id": prospect.thread_id})

                # Curator phase
                yield log_event("curator", f"Creating handoff for {company_name}", "agent_start")
                yield log_event("curator", "Calling MCP Email to retrieve thread", "mcp_call",
                               {"mcp_server": "email", "method": "get_thread", "prospect_id": prospect.id})

                email_client = self.mcp.get_email_client()
                # Only query the email server when a thread was actually created
                thread = await email_client.get_thread(prospect.id) if prospect.thread_id else None

                if thread:
                    yield log_event("curator", "MCP Email returned thread with messages", "mcp_response",
                                   {"mcp_server": "email", "has_thread": True})

                yield log_event("curator", "Calling MCP Calendar for meeting slots", "mcp_call",
                               {"mcp_server": "calendar", "method": "suggest_slots"})

                prospect = await self.curator.run(prospect)

                yield log_event("curator", "Calling MCP Store to save handoff packet", "mcp_call",
                               {"mcp_server": "store", "method": "save_handoff"})
                yield log_event("curator", "Handoff packet created and saved", "mcp_response",
                               {"mcp_server": "store", "saved": True})
                yield log_event("curator", "Handoff ready", "agent_end",
                               {"prospect_id": prospect.id, "status": "ready_for_handoff"})

            except Exception as e:
                # Per-prospect boundary: report the failure and keep going.
                # Uses the identifiers captured earlier instead of walking
                # prospect.company.name, which may be what failed.
                logger.error(f"Pipeline error for {company_name}: {e}")
                yield log_event("orchestrator", f"Error: {e}", "agent_log",
                               {"error": str(e), "prospect_id": getattr(prospect, "id", prospect_id)})