# cx_ai_agent_v1 / app/orchestrator.py
# Author: muzakkirhussain011
# Commit 8bab08d: "Add application files (text files only)"
# file: app/orchestrator.py
import asyncio
from typing import List, AsyncGenerator, Optional
from app.schema import Prospect, PipelineEvent, Company
from app.logging_utils import log_event, logger
from agents import (
Hunter, Enricher, Contactor, Scorer,
Writer, Compliance, Sequencer, Curator
)
from mcp.registry import MCPRegistry
class Orchestrator:
    """Drive prospects through the agent pipeline, streaming progress events.

    Phases run in this order for each prospect: Hunter (once, up front),
    then Enricher -> Contactor -> Scorer -> Writer -> Compliance ->
    Sequencer -> Curator. Every agent is built around the same
    ``MCPRegistry`` instance, which the orchestrator also uses directly for
    store / calendar / email lookups.
    """

    def __init__(self):
        # A single registry is shared by all agents and by the orchestrator.
        self.mcp = MCPRegistry()
        self.hunter = Hunter(self.mcp)
        self.enricher = Enricher(self.mcp)
        self.contactor = Contactor(self.mcp)
        self.scorer = Scorer(self.mcp)
        self.writer = Writer(self.mcp)
        self.compliance = Compliance(self.mcp)
        self.sequencer = Sequencer(self.mcp)
        self.curator = Curator(self.mcp)

    @staticmethod
    def _prospect_identity(prospect) -> tuple:
        """Return ``(company_name, prospect_id)`` for error reporting; never raises.

        Missing attributes (or a ``None`` prospect) yield ``"<unknown>"`` for
        the name and ``None`` for the id, so the error handler cannot fail on
        the very prospect whose malformation caused the error.
        """
        company = getattr(prospect, "company", None)
        name = getattr(company, "name", None)
        if name is None:
            name = "<unknown>"
        return name, getattr(prospect, "id", None)

    async def run_pipeline(
        self,
        company_ids: Optional[List[str]] = None,
        company_names: Optional[List[str]] = None,
        use_seed_file: bool = False
    ) -> AsyncGenerator[dict, None]:
        """
        Run the full pipeline with streaming events and detailed MCP tracking

        Args:
            company_ids: Legacy mode - company IDs from seed file
            company_names: Dynamic mode - company names to discover
            use_seed_file: Force legacy mode with seed file

        Yields:
            Events returned by ``log_event`` for every phase, interleaved with
            the ``llm_token`` / ``llm_done`` events forwarded verbatim from the
            Writer's stream.

        Dynamic discovery is used only when ``company_names`` is non-empty and
        ``use_seed_file`` is false; otherwise the seed file is loaded with
        ``company_ids``. An exception while processing one prospect is logged
        with its traceback and reported as an ``orchestrator`` event, and the
        loop moves on to the next prospect. Exceptions raised during the
        Hunter phase are not caught here.
        """
        # ---- Hunter phase -------------------------------------------------
        if company_names and not use_seed_file:
            yield log_event("hunter", "Starting dynamic company discovery", "agent_start")
            yield log_event("hunter", f"Discovering {len(company_names)} companies via web search", "mcp_call",
                            {"mcp_server": "web_search", "method": "discover_companies", "count": len(company_names)})
            prospects = await self.hunter.run(company_names=company_names, use_seed_file=False)
            yield log_event("hunter", f"Discovered {len(prospects)} companies from web search", "mcp_response",
                            {"mcp_server": "web_search", "companies_discovered": len(prospects)})
        else:
            yield log_event("hunter", "Starting prospect discovery (legacy mode)", "agent_start")
            yield log_event("hunter", "Calling MCP Store to load seed companies", "mcp_call",
                            {"mcp_server": "store", "method": "load_companies"})
            prospects = await self.hunter.run(company_ids=company_ids, use_seed_file=True)
            yield log_event("hunter", f"MCP Store returned {len(prospects)} companies", "mcp_response",
                            {"mcp_server": "store", "companies_count": len(prospects)})
        yield log_event("hunter", f"Found {len(prospects)} prospects", "agent_end",
                        {"count": len(prospects)})

        for prospect in prospects:
            # Agents below rebind ``prospect`` to their return value; keep the
            # Hunter's object so the error handler always has something to report.
            hunted = prospect
            try:
                company_name = prospect.company.name

                # ---- Enricher phase ---------------------------------------
                yield log_event("enricher", f"Enriching {company_name}", "agent_start")
                yield log_event("enricher", "Calling MCP Search for company facts", "mcp_call",
                                {"mcp_server": "search", "company": company_name})
                prospect = await self.enricher.run(prospect)
                yield log_event("enricher", "MCP Search returned facts", "mcp_response",
                                {"mcp_server": "search", "facts_found": len(prospect.facts)})
                yield log_event("enricher", f"Calling MCP Store to save {len(prospect.facts)} facts", "mcp_call",
                                {"mcp_server": "store", "method": "save_facts"})
                yield log_event("enricher", f"Added {len(prospect.facts)} facts", "agent_end",
                                {"facts_count": len(prospect.facts)})

                # ---- Contactor phase --------------------------------------
                yield log_event("contactor", f"Finding contacts for {company_name}", "agent_start")
                yield log_event("contactor", "Calling MCP Store to check suppressions", "mcp_call",
                                {"mcp_server": "store", "method": "check_suppression", "domain": prospect.company.domain})
                # The domain suppression result is only reported here; the
                # Contactor still runs either way.
                store = self.mcp.get_store_client()
                suppressed = await store.check_suppression("domain", prospect.company.domain)
                if suppressed:
                    yield log_event("contactor", f"Domain {prospect.company.domain} is suppressed", "mcp_response",
                                    {"mcp_server": "store", "suppressed": True})
                else:
                    yield log_event("contactor", f"Domain {prospect.company.domain} is not suppressed", "mcp_response",
                                    {"mcp_server": "store", "suppressed": False})
                prospect = await self.contactor.run(prospect)
                if prospect.contacts:
                    yield log_event("contactor", f"Calling MCP Store to save {len(prospect.contacts)} contacts", "mcp_call",
                                    {"mcp_server": "store", "method": "save_contacts"})
                yield log_event("contactor", f"Found {len(prospect.contacts)} contacts", "agent_end",
                                {"contacts_count": len(prospect.contacts)})

                # ---- Scorer phase -----------------------------------------
                yield log_event("scorer", f"Scoring {company_name}", "agent_start")
                yield log_event("scorer", "Calculating fit score based on industry, size, and pain points", "agent_log")
                prospect = await self.scorer.run(prospect)
                yield log_event("scorer", "Calling MCP Store to save prospect with score", "mcp_call",
                                {"mcp_server": "store", "method": "save_prospect", "fit_score": prospect.fit_score})
                yield log_event("scorer", f"Fit score: {prospect.fit_score:.2f}", "agent_end",
                                {"fit_score": prospect.fit_score, "status": prospect.status})
                if prospect.status == "dropped":
                    yield log_event("scorer", f"Dropped: {prospect.dropped_reason}", "agent_log",
                                    {"reason": prospect.dropped_reason})
                    continue

                # ---- Writer phase (streams LLM tokens) --------------------
                yield log_event("writer", f"Drafting outreach for {company_name}", "agent_start")
                yield log_event("writer", "Calling Vector Store for relevant facts", "mcp_call",
                                {"mcp_server": "vector", "method": "retrieve", "company_id": prospect.company.id})
                yield log_event("writer", "Calling HuggingFace Inference API for content generation", "mcp_call",
                                {"mcp_server": "hf_inference", "model": "Qwen/Qwen2.5-7B-Instruct"})
                # Only token and completion events are forwarded; any other
                # event types from the Writer stream are dropped.
                async for event in self.writer.run_streaming(prospect):
                    if event["type"] == "llm_token":
                        yield event
                    elif event["type"] == "llm_done":
                        yield event
                        # The completion event carries the updated prospect.
                        prospect = event["payload"]["prospect"]
                yield log_event("writer", "HuggingFace Inference completed generation", "mcp_response",
                                {"mcp_server": "hf_inference", "has_summary": bool(prospect.summary),
                                 "has_email": bool(prospect.email_draft)})
                yield log_event("writer", "Calling MCP Store to save draft", "mcp_call",
                                {"mcp_server": "store", "method": "save_prospect"})
                yield log_event("writer", "Draft complete", "agent_end",
                                {"has_summary": bool(prospect.summary),
                                 "has_email": bool(prospect.email_draft)})

                # ---- Compliance phase -------------------------------------
                yield log_event("compliance", f"Checking compliance for {company_name}", "agent_start")
                yield log_event("compliance", "Calling MCP Store to check email/domain suppressions", "mcp_call",
                                {"mcp_server": "store", "method": "check_suppression"})
                # Report every suppressed contact email (reporting only).
                for contact in prospect.contacts:
                    email_suppressed = await store.check_suppression("email", contact.email)
                    if email_suppressed:
                        yield log_event("compliance", f"Email {contact.email} is suppressed", "mcp_response",
                                        {"mcp_server": "store", "suppressed": True})
                yield log_event("compliance", "Checking CAN-SPAM, PECR, CASL requirements", "agent_log")
                prospect = await self.compliance.run(prospect)
                if prospect.status == "blocked":
                    yield log_event("compliance", f"Blocked: {prospect.dropped_reason}", "policy_block",
                                    {"reason": prospect.dropped_reason})
                    continue
                yield log_event("compliance", "All compliance checks passed", "policy_pass")
                yield log_event("compliance", "Footer appended to email", "agent_log")

                # ---- Sequencer phase --------------------------------------
                yield log_event("sequencer", f"Sequencing outreach for {company_name}", "agent_start")
                if not prospect.contacts or not prospect.email_draft:
                    yield log_event("sequencer", "Missing contacts or email draft", "agent_log",
                                    {"has_contacts": bool(prospect.contacts),
                                     "has_email": bool(prospect.email_draft)})
                    # Persist the blocked state before moving to the next prospect.
                    prospect.status = "blocked"
                    prospect.dropped_reason = "No contacts or email draft available"
                    await store.save_prospect(prospect)
                    yield log_event("sequencer", f"Blocked: {prospect.dropped_reason}", "agent_end")
                    continue
                yield log_event("sequencer", "Calling MCP Calendar for available slots", "mcp_call",
                                {"mcp_server": "calendar", "method": "suggest_slots"})
                calendar = self.mcp.get_calendar_client()
                slots = await calendar.suggest_slots()
                yield log_event("sequencer", f"MCP Calendar returned {len(slots)} slots", "mcp_response",
                                {"mcp_server": "calendar", "slots_count": len(slots)})
                if slots:
                    yield log_event("sequencer", "Calling MCP Calendar to generate ICS", "mcp_call",
                                    {"mcp_server": "calendar", "method": "generate_ics"})
                # Safe to index: the guard above guarantees at least one contact.
                yield log_event("sequencer", f"Calling MCP Email to send to {prospect.contacts[0].email}", "mcp_call",
                                {"mcp_server": "email", "method": "send", "recipient": prospect.contacts[0].email})
                prospect = await self.sequencer.run(prospect)
                yield log_event("sequencer", "MCP Email created thread", "mcp_response",
                                {"mcp_server": "email", "thread_id": prospect.thread_id})
                yield log_event("sequencer", f"Thread created: {prospect.thread_id}", "agent_end",
                                {"thread_id": prospect.thread_id})

                # ---- Curator phase ----------------------------------------
                yield log_event("curator", f"Creating handoff for {company_name}", "agent_start")
                yield log_event("curator", "Calling MCP Email to retrieve thread", "mcp_call",
                                {"mcp_server": "email", "method": "get_thread", "prospect_id": prospect.id})
                email_client = self.mcp.get_email_client()
                # The thread is looked up by prospect id, and only if the
                # Sequencer recorded a thread id.
                thread = await email_client.get_thread(prospect.id) if prospect.thread_id else None
                if thread:
                    yield log_event("curator", "MCP Email returned thread with messages", "mcp_response",
                                    {"mcp_server": "email", "has_thread": True})
                yield log_event("curator", "Calling MCP Calendar for meeting slots", "mcp_call",
                                {"mcp_server": "calendar", "method": "suggest_slots"})
                prospect = await self.curator.run(prospect)
                yield log_event("curator", "Calling MCP Store to save handoff packet", "mcp_call",
                                {"mcp_server": "store", "method": "save_handoff"})
                yield log_event("curator", "Handoff packet created and saved", "mcp_response",
                                {"mcp_server": "store", "saved": True})
                yield log_event("curator", "Handoff ready", "agent_end",
                                {"prospect_id": prospect.id, "status": "ready_for_handoff"})
            except Exception as e:
                # Per-prospect isolation. Identity lookup is defensive so the
                # handler cannot itself raise (e.g. when an agent returned None
                # or the prospect has no company), which would otherwise abort
                # the stream for all remaining prospects.
                name, prospect_id = self._prospect_identity(
                    prospect if prospect is not None else hunted
                )
                # logger.exception records the traceback alongside the message.
                logger.exception(f"Pipeline error for {name}: {e}")
                yield log_event("orchestrator", f"Error: {str(e)}", "agent_log",
                                {"error": str(e), "prospect_id": prospect_id})