# File: cx_ai_agent_v1/mcp/agents/autonomous_agent_hf.py
# Author: muzakkirhussain011
# Commit message: Add application files (text files only)
# Commit: 8bab08d
"""
Autonomous AI Agent with MCP Tool Calling using HuggingFace Inference Providers
This agent uses HuggingFace's Inference Providers API with native tool calling
support to autonomously decide which MCP tools to call.
Benefits:
- Uses HuggingFace unified API (single HF token for all providers)
- Native tool calling support (OpenAI-compatible API)
- Multiple providers: Nebius, Together, Sambanova, etc.
- Models like Qwen2.5-72B-Instruct with strong tool calling
- Free tier available with HuggingFace account
"""
import os
import json
import uuid
import logging
import asyncio
from typing import List, Dict, Any, AsyncGenerator
from mcp.tools.definitions import MCP_TOOLS, list_all_tools
from mcp.registry import MCPRegistry
logger = logging.getLogger(__name__)

# Models reachable through the free HuggingFace serverless API
# (provider "hf-inference"); no paid provider credits are needed.
FREE_MODELS = [
    "mistralai/Mistral-7B-Instruct-v0.3",   # fast, good quality
    "microsoft/Phi-3-mini-4k-instruct",     # small and fast
    "HuggingFaceH4/zephyr-7b-beta",         # tuned for chat
    "meta-llama/Llama-3.2-3B-Instruct",     # Meta's small model
    "Qwen/Qwen2.5-3B-Instruct",             # small Qwen
]

# Qwen3 family; only served through paid providers (require credits).
QWEN3_MODELS = [
    "Qwen/Qwen3-32B",
    "Qwen/Qwen3-8B",
    "Qwen/Qwen3-4B",
]

# provider name -> {"models": supported model ids, "default": model to use}.
# The comprehension entries all reference (not copy) the QWEN3_MODELS list.
HF_PROVIDERS = {
    provider_name: {"models": QWEN3_MODELS, "default": default_model}
    for provider_name, default_model in (
        ("nscale", "Qwen/Qwen3-32B"),
        ("nebius", "Qwen/Qwen3-32B"),
        ("together", "Qwen/Qwen3-32B"),
        ("sambanova", "Qwen/Qwen3-8B"),
        ("fireworks-ai", "Qwen/Qwen3-8B"),
    )
}
# cerebras is registered with the 32B model only.
HF_PROVIDERS["cerebras"] = {"models": ["Qwen/Qwen3-32B"], "default": "Qwen/Qwen3-32B"}

# Without an explicit provider the agent targets the free serverless API,
# which this module marks with the special value "hf-inference".
DEFAULT_PROVIDER = "hf-inference"
DEFAULT_MODEL = "mistralai/Mistral-7B-Instruct-v0.3"
class AutonomousMCPAgentHF:
    """
    AI Agent that autonomously uses MCP servers as tools via HuggingFace Inference Providers.

    The conversation and OpenAI-style tool definitions are sent to the
    HuggingFace router's chat-completions endpoint. The tool calls the model
    returns are executed against the MCP servers in the registry, and their
    results are fed back to the model.

    When a model describes tool calls as JSON in plain text instead of returning
    native ``tool_calls``, those JSON objects are mapped to tools by their
    parameter names. If the configured model fails, a fixed list of fallback
    models is tried.
    """
def __init__(
    self,
    mcp_registry: MCPRegistry,
    hf_token: str = None,
    provider: str = None,
    model: str = None
):
    """
    Initialize the autonomous agent with HuggingFace Inference Providers.

    Args:
        mcp_registry: MCP registry whose servers the tools are routed to.
        hf_token: HuggingFace token (get one at huggingface.co/settings/tokens).
            Falls back to the ``HF_TOKEN`` then ``HF_API_TOKEN`` env vars.
        provider: Inference provider (nscale, nebius, together, sambanova, ...).
            Resolution order: this argument, the ``HF_PROVIDER`` env var,
            ``"nscale"`` for Qwen3 models, otherwise ``DEFAULT_PROVIDER``
            (``"hf-inference"``, the free serverless API).
        model: Model id. Falls back to the ``HF_MODEL`` env var, then
            ``DEFAULT_MODEL`` (``mistralai/Mistral-7B-Instruct-v0.3``).

    Raises:
        ValueError: If no HuggingFace token is given or found in the environment.
        ImportError: If ``huggingface_hub`` is not installed.
    """
    self.mcp_registry = mcp_registry
    self.hf_token = hf_token or os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN")
    self.model = model or os.getenv("HF_MODEL") or DEFAULT_MODEL

    # Provider precedence: explicit argument > HF_PROVIDER env var > auto-detect.
    env_provider = os.getenv("HF_PROVIDER")
    if provider:
        self.provider = provider
    elif env_provider:
        self.provider = env_provider
    elif self.model in QWEN3_MODELS or self.model.startswith("Qwen/Qwen3"):
        # Qwen3 models are listed as paid-provider models, so they need an
        # explicit provider (nscale by default).
        self.provider = "nscale"
    else:
        self.provider = DEFAULT_PROVIDER

    if not self.hf_token:
        raise ValueError(
            "HF_TOKEN is required!\n"
            "Get a token at: https://huggingface.co/settings/tokens\n"
            "Then set: export HF_TOKEN=hf_your_token_here"
        )

    # Only the import itself is guarded, so unrelated ImportErrors raised while
    # constructing the client are not mislabelled as "package not installed".
    try:
        from huggingface_hub import InferenceClient
    except ImportError as exc:
        raise ImportError(
            "huggingface_hub package not installed or outdated!\n"
            "Install/upgrade with: pip install --upgrade huggingface_hub"
        ) from exc

    # For the serverless API (hf-inference), don't pass a provider.
    # NOTE(review): _call_inference_api posts to the router with `requests`
    # and does not use self.client; it is kept for other callers.
    if self.provider == "hf-inference":
        self.client = InferenceClient(token=self.hf_token)
    else:
        self.client = InferenceClient(
            provider=self.provider,
            token=self.hf_token
        )
    logger.info("HuggingFace InferenceClient initialized")
    logger.info(f" Provider: {self.provider}")
    logger.info(f" Model: {self.model}")

    # Tool definitions in OpenAI/HF function-calling format.
    self.tools = self._create_tool_definitions()
    logger.info(f"Autonomous MCP Agent initialized with HuggingFace ({self.provider})")
    logger.info(f"Available tools: {len(self.tools)}")
def _create_tool_definitions(self) -> List[Dict[str, Any]]:
    """
    Build the ``tools`` payload for the chat-completions request.

    Each MCP tool definition is wrapped in the OpenAI-style
    ``{"type": "function", "function": {...}}`` envelope. The MCP
    ``input_schema`` is used verbatim as the function's ``parameters``.
    """
    return [
        {
            "type": "function",
            "function": {
                "name": spec["name"],
                "description": spec["description"],
                "parameters": spec["input_schema"],
            },
        }
        for spec in MCP_TOOLS
    ]
async def run(
    self,
    task: str,
    max_iterations: int = 15
) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Run the agent autonomously on a task using native tool calling.

    Each iteration sends the conversation to the model. Tool calls in the reply
    are executed through ``_execute_mcp_tool``, and their results are appended
    as ``role="tool"`` messages. A reply without tool calls is reported as a
    ``thought``; from the second iteration on it also ends the run as the
    final answer.

    Args:
        task: The task to complete.
        max_iterations: Maximum number of model round-trips before stopping.

    Yields:
        Event dicts whose ``"type"`` is one of ``agent_start``,
        ``iteration_start``, ``tool_call``, ``tool_result``, ``tool_error``,
        ``thought``, ``agent_complete``, ``agent_error`` or
        ``agent_max_iterations``. ``agent_max_iterations`` is emitted only when
        the iteration budget runs out, never after a final answer or an error.
    """
    import re

    yield {
        "type": "agent_start",
        "message": "Autonomous AI Agent (HuggingFace) starting task",
        "task": task,
        "model": self.model,
        "provider": self.provider
    }

    # System prompt for the agent
    system_prompt = """You are an autonomous AI agent for B2B sales automation.
You have access to MCP tools including:
- search_web: Search the web for company information
- find_verified_contacts: Find REAL decision-makers (searches LinkedIn, company websites, directories)
- save_prospect: Save a prospect company to the database
- send_email: Draft outreach emails
CRITICAL RULE: Only save prospects that have verified contacts. No contacts = don't save.
REQUIRED WORKFLOW:
1. search_web to find potential prospect companies
2. find_verified_contacts FIRST to check if contacts exist
3. IF contacts found (count > 0): save_prospect, then send_email
4. IF no contacts found (count = 0): SKIP this company, try the next one
TOOL CALL FORMAT - output valid JSON:
Step 1 - Find contacts FIRST:
{"company_name": "Acme Corp", "company_domain": "acme.com", "target_titles": ["CEO", "Founder", "VP Sales", "CTO"], "max_contacts": 3}
Step 2 - ONLY if contacts found, save prospect:
{"prospect_id": "prospect_1", "company_id": "company_1", "company_name": "Acme Corp", "company_domain": "acme.com", "fit_score": 85}
The find_verified_contacts tool searches:
- Company website (team/about pages)
- LinkedIn profiles
- Crunchbase, ZoomInfo, directories
- Press releases and news
- Social media profiles
IMPORTANT:
- A prospect without contacts is USELESS - don't save it
- NEVER invent contact names or emails
- Keep searching until you find prospects WITH verified contacts
After completing, summarize:
- Prospects saved (with contacts)
- Companies skipped (no contacts)"""

    # Initialize conversation
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": task}
    ]

    loop = asyncio.get_running_loop()
    iteration = 0
    while iteration < max_iterations:
        iteration += 1
        yield {
            "type": "iteration_start",
            "iteration": iteration,
            "message": f"Iteration {iteration}: AI reasoning..."
        }

        try:
            logger.info(f"Calling HuggingFace API (iteration {iteration})...")
            logger.info(f" Provider: {self.provider}, Model: {self.model}")

            # _call_inference_api blocks on HTTP; run it in the default
            # executor so the event loop stays responsive.
            response = await loop.run_in_executor(
                None,
                self._call_inference_api,
                messages
            )

            # A missing response or one without choices is reported as empty
            # instead of letting choices[0] raise IndexError.
            if response is None or not getattr(response, "choices", None):
                yield {
                    "type": "agent_error",
                    "error": "Empty response from API",
                    "message": "API returned empty response"
                }
                break

            assistant_message = response.choices[0].message
            tool_calls = getattr(assistant_message, "tool_calls", None)

            if tool_calls:
                tool_results = []
                echoed_calls = []  # assistant tool_calls replayed into history
                for tool_call in tool_calls:
                    tool_name = tool_call.function.name
                    raw_arguments = tool_call.function.arguments

                    # OpenAI-style APIs send arguments as a JSON string, but some
                    # backends send an already-decoded object. Accept both, and
                    # always echo a JSON string back into the history.
                    if raw_arguments is None or isinstance(raw_arguments, str):
                        arguments_json = raw_arguments or "{}"
                        try:
                            tool_input = json.loads(arguments_json)
                        except json.JSONDecodeError:
                            logger.warning(f"Could not decode arguments for {tool_name}: {arguments_json[:200]}")
                            tool_input = {}
                    else:
                        tool_input = raw_arguments
                        arguments_json = json.dumps(raw_arguments, default=str)

                    echoed_calls.append({
                        "id": tool_call.id,
                        "type": "function",
                        "function": {
                            "name": tool_name,
                            "arguments": arguments_json
                        }
                    })

                    yield {
                        "type": "tool_call",
                        "tool": tool_name,
                        "input": tool_input,
                        "message": f"Action: {tool_name}"
                    }

                    # Execute the MCP tool; failures are reported back to the
                    # model as a tool message rather than aborting the run.
                    try:
                        result = await self._execute_mcp_tool(tool_name, tool_input)
                        yield {
                            "type": "tool_result",
                            "tool": tool_name,
                            "result": result,
                            "message": f"Tool {tool_name} completed"
                        }
                        tool_results.append({
                            "tool_call_id": tool_call.id,
                            "role": "tool",
                            "content": json.dumps(result, default=str)
                        })
                    except Exception as e:
                        error_msg = str(e)
                        logger.error(f"Tool execution failed: {tool_name} - {error_msg}")
                        yield {
                            "type": "tool_error",
                            "tool": tool_name,
                            "error": error_msg,
                            "message": f"Tool {tool_name} failed: {error_msg}"
                        }
                        tool_results.append({
                            "tool_call_id": tool_call.id,
                            "role": "tool",
                            "content": json.dumps({"error": error_msg})
                        })

                # Add assistant message and tool results to conversation
                messages.append({
                    "role": "assistant",
                    "content": assistant_message.content or "",
                    "tool_calls": echoed_calls
                })
                messages.extend(tool_results)

            else:
                # No tool calls - AI is done or providing response
                final_content = assistant_message.content or ""
                raw_content = getattr(assistant_message, 'raw_content', final_content)

                logger.info(f"Iteration {iteration}: No tool calls")
                logger.info(f" Raw content length: {len(raw_content)}")
                logger.info(f" Stripped content length: {len(final_content)}")
                if raw_content and not final_content:
                    logger.info(f" Raw content preview: {raw_content[:200]}...")

                # Yield a thought event whenever there is ANY content (for tracking)
                if final_content:
                    yield {
                        "type": "thought",
                        "thought": final_content,
                        "message": f"AI Response: {final_content[:100]}..." if len(final_content) > 100 else f"AI Response: {final_content}"
                    }
                elif raw_content:
                    # Content was stripped but raw exists - yield a minimal thought
                    yield {
                        "type": "thought",
                        "thought": f"[Processing: {len(raw_content)} chars of reasoning]",
                        "message": "AI is reasoning..."
                    }

                # A tool-free reply after the first iteration is the final answer.
                if iteration > 1:
                    if not final_content and raw_content:
                        # Try to extract something useful from raw thinking
                        think_match = re.search(r'<think>(.*?)</think>', raw_content, flags=re.DOTALL)
                        if think_match:
                            think_text = think_match.group(1).strip()
                            # Keep the last meaningful sentences (the model's conclusion)
                            sentences = [s.strip() for s in think_text.split('.') if len(s.strip()) > 20]
                            if sentences:
                                final_content = '. '.join(sentences[-5:]) + '.'
                                logger.info(f"Extracted final answer from thinking: {final_content[:100]}...")

                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_content,
                        "iterations": iteration
                    }
                    break

                # Keep the reply in history and continue. Parenthesised so the
                # stripped answer wins, falling back to truncated raw content.
                messages.append({
                    "role": "assistant",
                    "content": final_content or (raw_content[:500] if raw_content else "")
                })

        except Exception as e:
            error_msg = str(e)
            lowered = error_msg.lower()
            logger.error(f"HuggingFace API error: {error_msg}", exc_info=True)

            # Specific markers only: bare "rate"/"limit" substrings also match
            # words like "generate" or "unlimited".
            if "401" in error_msg or "unauthorized" in lowered:
                yield {
                    "type": "agent_error",
                    "error": "Invalid HF_TOKEN",
                    "message": "Authentication failed. Please check your HF_TOKEN."
                }
            elif "429" in error_msg or any(
                marker in lowered for marker in ("rate limit", "rate_limit", "too many requests")
            ):
                yield {
                    "type": "agent_error",
                    "error": "Rate limit reached",
                    "message": "Rate limit reached. Try again later or upgrade to HF PRO."
                }
            else:
                yield {
                    "type": "agent_error",
                    "error": error_msg,
                    "message": f"API error: {error_msg}"
                }
            break
    else:
        # while/else: reached only when the budget was exhausted without a
        # `break` (final answers and errors end the run via `break`).
        yield {
            "type": "agent_max_iterations",
            "message": f"Reached maximum iterations ({max_iterations})",
            "iterations": iteration
        }
def _call_inference_api(self, messages: List[Dict], retry_count: int = 0) -> Any:
    """
    Call the HuggingFace router's OpenAI-compatible chat-completions endpoint.

    The configured ``self.model`` / ``self.provider`` pair is tried first,
    followed by a fixed list of fallback models. A fallback identical to the
    primary request is not resent. The first HTTP 200 response is wrapped via
    ``_create_chat_response`` and returned.

    Args:
        messages: Chat history in OpenAI message format.
        retry_count: Unused; kept for backward compatibility.

    Returns:
        The response object produced by ``_create_chat_response``.

    Raises:
        Exception: Immediately on HTTP 401, since a rejected token fails the
            same way for every model. Otherwise raised with the most recent
            failure once every candidate has failed.
    """
    import requests

    api_url = "https://router.huggingface.co/v1/chat/completions"

    # "hf-inference" is this module's marker for "no explicit provider".
    primary = (
        self.model,
        self.provider if self.provider and self.provider != "hf-inference" else None,
    )
    # Fallbacks tried in order. A provider of None sends no provider header,
    # leaving provider selection to the router.
    fallback_models = [
        ("Qwen/Qwen2.5-72B-Instruct", None),
        ("meta-llama/Llama-3.1-70B-Instruct", None),
        ("mistralai/Mixtral-8x7B-Instruct-v0.1", None),
        ("Qwen/Qwen3-32B", "nebius"),
        ("Qwen/Qwen3-8B", "together"),
    ]
    candidates = [primary] + [c for c in fallback_models if c != primary]

    last_error = None
    for attempt, (model, provider) in enumerate(candidates):
        kind = "primary" if attempt == 0 else "fallback"
        route = f" via {provider}" if provider else ""

        headers = {
            "Authorization": f"Bearer {self.hf_token}",
            "Content-Type": "application/json"
        }
        if provider:
            # NOTE(review): HF documents selecting a provider with a
            # "model:provider" suffix on the model id; confirm the router
            # honours this header.
            headers["X-HF-Provider"] = provider

        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": 2048,
            "temperature": 0.7,
            "stream": False,
            "tools": self.tools,    # native tool calling
            "tool_choice": "auto"   # let the model decide when to use tools
        }

        logger.info(f"Trying {kind} model: {model}{route}")
        try:
            response = requests.post(api_url, headers=headers, json=payload, timeout=120)
            if response.status_code == 200:
                result = response.json()
                logger.info(f"Success with {kind} model: {model}{route}")
                return self._create_chat_response(result)
        except Exception as e:
            # Network errors, timeouts and undecodable 200 bodies: move on.
            last_error = str(e)
            logger.warning(f"Model {model}{route} failed: {last_error[:200]}")
            continue

        status = response.status_code
        if status == 401:
            logger.error(f"Authentication failed (HTTP 401) for {model}{route}")
            raise Exception("HTTP 401 Unauthorized - check HF_TOKEN")
        if status == 402:
            last_error = "Payment required - exceeded monthly credits"
        elif status == 404:
            last_error = f"Model {model} not found{route}"
        elif status == 503:
            last_error = f"Model {model} is loading (HTTP 503)"
        else:
            last_error = f"HTTP {status}"
        logger.warning(f"Model {model}{route} returned {status}: {response.text[:200]} - trying next...")

    logger.error(f"All models failed. Last error: {last_error}")
    raise Exception(f"All inference attempts failed: {last_error}")
def _strip_thinking_tags(self, text: str) -> str:
    """
    Remove Qwen3-style ``<think>`` reasoning and return the visible reply.

    Three shapes of reasoning are handled:

    * closed blocks ``<think>...</think>``;
    * an unterminated trailing ``<think>...``, e.g. when generation was cut
      off mid-thought. It is dropped to the end of the text;
    * a bare ``...</think>`` with no opening tag (the opening tag may have been
      part of the prompt). Everything before it is treated as reasoning.

    If nothing remains after stripping, the last three '.'-separated sentences
    of the reasoning are returned as a fallback answer.

    Args:
        text: Raw message content (may be ``None`` or empty).

    Returns:
        The visible reply, a reasoning summary, or ``""``.
    """
    import re

    if not text:
        return ""

    # An orphan closing tag means everything before it is reasoning.
    if "</think>" in text and "<think>" not in text:
        text = "<think>" + text

    # Lazy match up to the closing tag, or to the end of the text when the
    # block was never closed.
    think_block = r'<think>(.*?)(?:</think>|\Z)'
    result = re.sub(think_block, '', text, flags=re.DOTALL).strip()

    # Nothing visible left: fall back to the tail of the reasoning.
    if not result and '<think>' in text:
        think_match = re.search(think_block, text, flags=re.DOTALL)
        if think_match:
            think_content = think_match.group(1).strip()
            sentences = [s.strip() for s in think_content.split('.') if s.strip()]
            if sentences:
                result = '. '.join(sentences[-3:]) + '.'
                logger.info(f"Extracted thinking summary: {result[:100]}...")
    return result
def _create_chat_response(self, result: dict) -> Any:
    """
    Wrap a raw chat-completions JSON payload in attribute-style objects.

    The returned object exposes ``choices[i].message`` with:

    * ``content``: the text with thinking tags stripped;
    * ``raw_content``: the unmodified text;
    * ``tool_calls``: a list of objects with ``id``, ``function.name`` and
      ``function.arguments`` (always a JSON string), or ``None``.

    Tool calls come from the API's ``tool_calls`` field when present.
    Otherwise, JSON objects embedded in the message text are mapped to tool
    names by their keys (``_infer_tool_from_params``).

    Args:
        result: Decoded JSON body of a chat-completions response.

    Returns:
        An object with a ``choices`` list (empty when the payload has none).
    """
    import re

    strip_thinking = self._strip_thinking_tags

    def new_call_id():
        # uuid rather than id(obj): CPython reuses ids of freed objects, so
        # id()-based call ids could repeat across turns of one conversation.
        return f"call_{uuid.uuid4().hex[:24]}"

    def as_json_arguments(arguments):
        # The OpenAI schema sends arguments as a JSON string; some backends
        # send a decoded object. Normalise so callers can always json.loads().
        if arguments is None:
            return "{}"
        if isinstance(arguments, str):
            return arguments
        return json.dumps(arguments)

    def extract_json_objects(text):
        """Return the JSON objects embedded in ``text``, left to right.

        ``raw_decode`` parses real JSON, so braces inside string values do not
        break the scan. Objects nested in a successfully parsed object are not
        returned separately.
        """
        decoder = json.JSONDecoder()
        objects = []
        start = text.find("{")
        while start != -1:
            try:
                obj, end = decoder.raw_decode(text, start)
            except ValueError:
                start = text.find("{", start + 1)
                continue
            objects.append(obj)
            start = text.find("{", end)
        return objects

    class MockFunction:
        def __init__(self, data):
            self.name = data.get("name") or ""
            self.arguments = as_json_arguments(data.get("arguments"))

    class MockFunctionFromText:
        def __init__(self, data):
            self.name = data.get("tool", data.get("name", ""))
            self.arguments = json.dumps(data.get("parameters", data.get("arguments", {})))

    class MockToolCall:
        def __init__(self, data):
            # `or` also covers keys that are present with a null value.
            self.function = MockFunction(data.get("function") or {})
            self.id = data.get("id") or new_call_id()

    class MockToolCallFromText:
        def __init__(self, data):
            self.function = MockFunctionFromText(data)
            self.id = new_call_id()

    class MockMessage:
        def __init__(self, data):
            # Handle None content (API might return {"content": null})
            raw_content = data.get("content") or ""
            # Strip Qwen3 thinking tags to get the actual response
            self.content = strip_thinking(raw_content)
            # Keep raw content for debugging/fallback
            self.raw_content = raw_content
            self.tool_calls = self._parse_tool_calls_from_response(data, raw_content)

        def _parse_tool_calls_from_response(self, data, raw_content):
            """Prefer native tool_calls; otherwise parse them from the text."""
            if data.get("tool_calls"):
                return [MockToolCall(tc) for tc in data["tool_calls"]]
            return self._parse_tool_calls_from_text(raw_content)

        def _infer_tool_from_params(self, params):
            """Infer a tool name from parameter keys; rules are checked in priority order."""
            if not isinstance(params, dict):
                return None
            keys = set(params.keys())
            # discover_prospects_with_contacts (all-in-one tool, highest priority)
            if "client_company" in keys and "client_industry" in keys:
                return "discover_prospects_with_contacts"
            if "client_company" in keys and "target_prospects" in keys:
                return "discover_prospects_with_contacts"
            # find_verified_contacts (single company)
            if "company_name" in keys and "company_domain" in keys and "target_titles" in keys:
                return "find_verified_contacts"
            if "company_name" in keys and "company_domain" in keys and "max_contacts" in keys:
                return "find_verified_contacts"
            # save_prospect
            if "prospect_id" in keys or ("company_name" in keys and "fit_score" in keys):
                return "save_prospect"
            # save_company
            if "company_id" in keys and ("name" in keys or "domain" in keys) and "prospect_id" not in keys:
                return "save_company"
            # save_contact
            if "contact_id" in keys or ("email" in keys and ("first_name" in keys or "last_name" in keys)):
                return "save_contact"
            # send_email
            if "to" in keys and "subject" in keys and "body" in keys:
                return "send_email"
            # search_web
            if "query" in keys and len(keys) <= 2:
                return "search_web"
            # save_fact
            if "fact_type" in keys or ("content" in keys and "company_id" in keys):
                return "save_fact"
            return None

        def _parse_tool_calls_from_text(self, text):
            """Parse tool calls described as JSON in plain text (e.g. by Qwen3)."""
            # No brace means no JSON object to inspect.
            if not text or "{" not in text:
                return None

            # Search the raw text (Qwen3 may put tool calls inside <think>
            # tags) as well as the thinking-stripped text.
            text_clean = strip_thinking(text)
            candidates = extract_json_objects(text)
            if text_clean != text:
                candidates.extend(extract_json_objects(text_clean))
            logger.info(f"Found {len(candidates)} JSON objects in response")

            # Also look inside fenced code blocks of the visible reply.
            for block in re.findall(r'```(?:json)?\s*(.+?)\s*```', text_clean, re.DOTALL):
                candidates.extend(extract_json_objects(block))

            tool_calls = []
            seen_signatures = set()  # the scans above overlap; avoid duplicates
            for obj in candidates:
                tool_name = self._infer_tool_from_params(obj)
                if not tool_name:
                    continue
                sig = f"{tool_name}:{json.dumps(obj, sort_keys=True)}"
                if sig in seen_signatures:
                    continue
                seen_signatures.add(sig)
                tool_calls.append(MockToolCallFromText({"tool": tool_name, "parameters": obj}))
                logger.info(f"Parsed tool call: {tool_name} with params: {list(obj.keys())}")

            if tool_calls:
                logger.info(f"Total tool calls parsed from text: {len(tool_calls)}")
            return tool_calls or None

    class MockChoice:
        def __init__(self, message_data):
            # A null "message" is treated like an empty one.
            self.message = MockMessage(message_data or {})

    class MockResponse:
        def __init__(self, payload):
            choices_data = payload.get("choices") or []
            self.choices = [MockChoice(c.get("message")) for c in choices_data]

    return MockResponse(result or {})
async def _execute_mcp_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
"""
Execute an MCP tool by routing to the appropriate MCP server.
This is where we actually call the MCP servers!
"""
# ============ SEARCH MCP SERVER ============
if tool_name == "search_web":
query = tool_input["query"]
max_results = tool_input.get("max_results", 5)
results = await self.mcp_registry.search.query(query, max_results=max_results)
return {
"results": results[:max_results],
"count": len(results[:max_results])
}
elif tool_name == "search_news":
query = tool_input["query"]
max_results = tool_input.get("max_results", 5)
results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results)
return {
"results": results[:max_results],
"count": len(results[:max_results])
}
# ============ OPTIMIZED PROSPECT DISCOVERY WITH CONTACTS ============
elif tool_name == "discover_prospects_with_contacts":
from services.enhanced_contact_finder import EnhancedContactFinder
from urllib.parse import urlparse
client_company = tool_input["client_company"]
client_industry = tool_input["client_industry"]
target_prospects = tool_input.get("target_prospects", 3)
target_titles = tool_input.get("target_titles", ["CEO", "Founder", "VP Sales", "CTO", "Head of Sales"])
logger.info(f"Discovering {target_prospects} prospects with contacts for {client_company}")
print(f"\n[PROSPECT DISCOVERY] ========================================")
print(f"[PROSPECT DISCOVERY] Finding {target_prospects} prospects WITH verified contacts")
print(f"[PROSPECT DISCOVERY] Client: {client_company}")
print(f"[PROSPECT DISCOVERY] ========================================")
contact_finder = EnhancedContactFinder(mcp_registry=self.mcp_registry)
saved_prospects = []
all_contacts = []
skipped_companies = []
companies_checked = 0
max_companies_to_check = target_prospects * 8 # Check more companies to find enough with contacts
# Build smart search queries based on what the client company does
# The goal is to find CUSTOMERS for the client, not articles ABOUT the client
client_lower = client_company.lower()
industry_lower = client_industry.lower()
# Determine prospect type based on client business
# E-commerce platforms (Shopify, BigCommerce, etc.) -> retailers, DTC brands
# CRM software -> B2B companies, sales teams
# Marketing tools -> businesses needing marketing
# etc.
search_queries = []
# Check for e-commerce/retail platform clients
if any(kw in client_lower or kw in industry_lower for kw in ['ecommerce', 'e-commerce', 'shopify', 'online store', 'retail platform', 'shopping cart']):
search_queries = [
"DTC brands fashion apparel company",
"online boutique store founder CEO",
"independent retail brand ecommerce",
"emerging consumer brands direct to consumer",
"small business online store owner",
"handmade crafts seller business",
"subscription box company founder",
]
# Check for CRM/Sales software clients
elif any(kw in client_lower or kw in industry_lower for kw in ['crm', 'salesforce', 'sales software', 'customer relationship']):
search_queries = [
"B2B SaaS company sales team",
"growing startup sales operations",
"enterprise software company VP Sales",
"technology company Head of Sales",
]
# Check for marketing/advertising clients
elif any(kw in client_lower or kw in industry_lower for kw in ['marketing', 'advertising', 'ads', 'seo', 'content']):
search_queries = [
"growing startup marketing director",
"ecommerce brand marketing team",
"B2B company CMO marketing",
"technology startup growth marketing",
]
# Default: find growing companies that might need the client's services
else:
search_queries = [
f"growing companies {industry_lower} customers list",
f"startups using {industry_lower} solutions",
f"businesses {industry_lower} case study customer",
f"companies similar to {client_company} customers",
"fast growing startups Series A B2B",
"emerging technology companies founder CEO",
"mid-market companies digital transformation",
]
# Add generic business-finding queries
search_queries.extend([
"Inc 5000 fastest growing companies",
"emerging brands startup founders",
"venture backed startups series A",
])
seen_domains = set()
# Skip domains that are NOT actual company websites
skip_domains = [
# Social media
'linkedin.com', 'facebook.com', 'twitter.com', 'instagram.com', 'tiktok.com',
# Reference/directory sites
'wikipedia.org', 'crunchbase.com', 'zoominfo.com', 'apollo.io', 'yelp.com',
'glassdoor.com', 'g2.com', 'capterra.com', 'trustpilot.com', 'bbb.org',
# News/media sites
'forbes.com', 'businessinsider.com', 'techcrunch.com', 'bloomberg.com',
'cnbc.com', 'reuters.com', 'wsj.com', 'nytimes.com', 'theverge.com',
'wired.com', 'mashable.com', 'venturebeat.com', 'inc.com', 'entrepreneur.com',
# Blog/article/review sites
'medium.com', 'hubspot.com', 'blog.', 'wordpress.com', 'blogspot.com',
'quora.com', 'reddit.com', 'youtube.com', 'vimeo.com',
# Generic/aggregator sites
'amazon.com', 'ebay.com', 'alibaba.com', 'aliexpress.com',
'google.com', 'bing.com', 'yahoo.com', 'duckduckgo.com',
# The client company itself (don't prospect yourself!)
client_company.lower().replace(' ', '') + '.com',
]
# Also skip titles that look like articles, not company names
skip_title_patterns = [
'what is', 'how to', 'guide', 'review', 'best ', 'top ', 'vs ',
' vs ', 'comparison', 'tutorial', 'tips', 'ways to', 'complete',
'everything you need', 'beginner', 'introduction', 'explained',
'2024', '2025', '2023', '[', ']', 'list of', 'examples'
]
for query in search_queries:
if len(saved_prospects) >= target_prospects:
break
try:
print(f"\n[PROSPECT DISCOVERY] Searching: {query}")
results = await self.mcp_registry.search.query(query, max_results=10)
for result in results:
if len(saved_prospects) >= target_prospects:
break
if companies_checked >= max_companies_to_check:
break
url = result.get('url', '')
title = result.get('title', '')
# Extract domain from URL
try:
parsed = urlparse(url)
domain = parsed.netloc.replace('www.', '')
if not domain or domain in seen_domains:
continue
seen_domains.add(domain)
except:
continue
# Skip non-company domains
if any(skip in domain.lower() for skip in skip_domains):
print(f"[PROSPECT DISCOVERY] ⏭️ Skipping non-company domain: {domain}")
continue
# Skip titles that look like articles, not companies
title_lower = title.lower()
if any(pattern in title_lower for pattern in skip_title_patterns):
print(f"[PROSPECT DISCOVERY] ⏭️ Skipping article title: {title[:50]}...")
continue
# Extract company name from title - be smarter about it
# Try to get actual company name, not article title
company_name = title.split(' - ')[0].split(' | ')[0].split(':')[0].strip()
# If company name is too long (probably article title), use domain
if len(company_name) > 40 or ' ' in company_name and len(company_name.split()) > 5:
# Extract company name from domain instead
company_name = domain.split('.')[0].replace('-', ' ').title()
if not company_name or len(company_name) < 2:
continue
companies_checked += 1
print(f"\n[PROSPECT DISCOVERY] Checking ({companies_checked}/{max_companies_to_check}): {company_name} ({domain})")
# Find contacts for this company
try:
contacts = await contact_finder.find_real_contacts(
company_name=company_name,
domain=domain,
target_titles=target_titles,
max_contacts=3
)
if contacts and len(contacts) > 0:
# Save prospect
prospect_id = f"prospect_{len(saved_prospects) + 1}"
company_id = domain.replace(".", "_")
prospect_data = {
"id": prospect_id,
"company": {
"id": company_id,
"name": company_name,
"domain": domain
},
"fit_score": 75,
"status": "new",
"metadata": {"source": "automated_discovery"}
}
await self.mcp_registry.store.save_prospect(prospect_data)
# Save contacts
contact_list = []
for contact in contacts:
contact_data = {
"id": contact.id,
"name": contact.name,
"email": contact.email,
"title": contact.title,
"company": company_name,
"domain": domain,
"verified": True,
"source": "web_search_and_scraping"
}
contact_list.append(contact_data)
all_contacts.append(contact_data)
await self.mcp_registry.store.save_contact({
"id": contact.id,
"company_id": company_id,
"email": contact.email,
"first_name": contact.name.split()[0] if contact.name else "",
"last_name": contact.name.split()[-1] if len(contact.name.split()) > 1 else "",
"title": contact.title
})
saved_prospects.append({
"prospect_id": prospect_id,
"company_name": company_name,
"domain": domain,
"contacts": contact_list,
"contact_count": len(contact_list)
})
print(f"[PROSPECT DISCOVERY] ✅ SAVED: {company_name} with {len(contacts)} contacts")
else:
skipped_companies.append({"name": company_name, "domain": domain, "reason": "no_contacts"})
print(f"[PROSPECT DISCOVERY] ⏭️ SKIPPED: {company_name} (no verified contacts)")
except Exception as e:
logger.debug(f"Error checking {company_name}: {str(e)}")
skipped_companies.append({"name": company_name, "domain": domain, "reason": str(e)})
continue
except Exception as e:
logger.debug(f"Search error: {str(e)}")
continue
print(f"\n[PROSPECT DISCOVERY] ========================================")
print(f"[PROSPECT DISCOVERY] DISCOVERY COMPLETE")
print(f"[PROSPECT DISCOVERY] ========================================")
print(f"[PROSPECT DISCOVERY] Prospects saved: {len(saved_prospects)}/{target_prospects}")
print(f"[PROSPECT DISCOVERY] Total contacts: {len(all_contacts)}")
print(f"[PROSPECT DISCOVERY] Companies checked: {companies_checked}")
print(f"[PROSPECT DISCOVERY] Companies skipped: {len(skipped_companies)}")
print(f"[PROSPECT DISCOVERY] ========================================\n")
return {
"status": "success" if len(saved_prospects) > 0 else "no_prospects_found",
"prospects": saved_prospects,
"prospects_count": len(saved_prospects),
"contacts_count": len(all_contacts),
"companies_checked": companies_checked,
"companies_skipped": len(skipped_companies),
"target_met": len(saved_prospects) >= target_prospects,
"message": f"Found {len(saved_prospects)} prospects with {len(all_contacts)} verified contacts. Checked {companies_checked} companies, skipped {len(skipped_companies)} (no contacts)."
}
# ============ VERIFIED CONTACT FINDER (Single Company) ============
elif tool_name == "find_verified_contacts":
from services.enhanced_contact_finder import EnhancedContactFinder
company_name = tool_input["company_name"]
company_domain = tool_input["company_domain"]
target_titles = tool_input.get("target_titles", ["CEO", "Founder", "VP Sales", "CTO", "Head of Sales"])
max_contacts = tool_input.get("max_contacts", 3)
logger.info(f"Finding verified contacts for {company_name} ({company_domain})")
contact_finder = EnhancedContactFinder(mcp_registry=self.mcp_registry)
try:
contacts = await contact_finder.find_real_contacts(
company_name=company_name,
domain=company_domain,
target_titles=target_titles,
max_contacts=max_contacts
)
contact_list = []
for contact in contacts:
contact_data = {
"id": contact.id,
"name": contact.name,
"email": contact.email,
"title": contact.title,
"company": company_name,
"domain": company_domain,
"verified": True,
"source": "web_search_and_scraping"
}
contact_list.append(contact_data)
await self.mcp_registry.store.save_contact({
"id": contact.id,
"company_id": company_domain.replace(".", "_"),
"email": contact.email,
"first_name": contact.name.split()[0] if contact.name else "",
"last_name": contact.name.split()[-1] if contact.name and len(contact.name.split()) > 1 else "",
"title": contact.title
})
if contact_list:
return {
"status": "success",
"contacts": contact_list,
"count": len(contact_list),
"message": f"Found {len(contact_list)} verified contacts at {company_name}",
"should_save_prospect": True
}
else:
return {
"status": "no_contacts_found",
"contacts": [],
"count": 0,
"message": f"No verified contacts found for {company_name}. Skip this prospect.",
"should_save_prospect": False
}
except Exception as e:
logger.error(f"Error finding contacts for {company_name}: {str(e)}")
return {
"status": "error",
"contacts": [],
"count": 0,
"message": f"Error searching for contacts: {str(e)}",
"should_save_prospect": False
}
# ============ STORE MCP SERVER ============
elif tool_name == "save_prospect":
prospect_data = {
"id": tool_input.get("prospect_id", str(uuid.uuid4())),
"company": {
"id": tool_input.get("company_id"),
"name": tool_input.get("company_name"),
"domain": tool_input.get("company_domain")
},
"fit_score": tool_input.get("fit_score", 0),
"status": tool_input.get("status", "new"),
"metadata": tool_input.get("metadata", {})
}
result = await self.mcp_registry.store.save_prospect(prospect_data)
return {"status": result, "prospect_id": prospect_data["id"]}
elif tool_name == "get_prospect":
prospect_id = tool_input["prospect_id"]
prospect = await self.mcp_registry.store.get_prospect(prospect_id)
return prospect or {"error": "Prospect not found"}
elif tool_name == "list_prospects":
prospects = await self.mcp_registry.store.list_prospects()
status_filter = tool_input.get("status")
if status_filter:
prospects = [p for p in prospects if p.get("status") == status_filter]
return {
"prospects": prospects,
"count": len(prospects)
}
elif tool_name == "save_company":
company_data = {
"id": tool_input.get("company_id", str(uuid.uuid4())),
"name": tool_input["name"],
"domain": tool_input["domain"],
"industry": tool_input.get("industry"),
"description": tool_input.get("description"),
"employee_count": tool_input.get("employee_count")
}
result = await self.mcp_registry.store.save_company(company_data)
return {"status": result, "company_id": company_data["id"]}
elif tool_name == "get_company":
company_id = tool_input["company_id"]
company = await self.mcp_registry.store.get_company(company_id)
return company or {"error": "Company not found"}
elif tool_name == "save_fact":
fact_data = {
"id": tool_input.get("fact_id", str(uuid.uuid4())),
"company_id": tool_input["company_id"],
"fact_type": tool_input["fact_type"],
"content": tool_input["content"],
"source_url": tool_input.get("source_url"),
"confidence_score": tool_input.get("confidence_score", 0.8)
}
result = await self.mcp_registry.store.save_fact(fact_data)
return {"status": result, "fact_id": fact_data["id"]}
elif tool_name == "save_contact":
contact_data = {
"id": tool_input.get("contact_id", str(uuid.uuid4())),
"company_id": tool_input["company_id"],
"email": tool_input["email"],
"first_name": tool_input.get("first_name"),
"last_name": tool_input.get("last_name"),
"title": tool_input.get("title"),
"seniority": tool_input.get("seniority")
}
result = await self.mcp_registry.store.save_contact(contact_data)
return {"status": result, "contact_id": contact_data["id"]}
elif tool_name == "list_contacts_by_domain":
domain = tool_input["domain"]
contacts = await self.mcp_registry.store.list_contacts_by_domain(domain)
return {
"contacts": contacts,
"count": len(contacts)
}
elif tool_name == "check_suppression":
supp_type = tool_input["suppression_type"]
value = tool_input["value"]
is_suppressed = await self.mcp_registry.store.check_suppression(supp_type, value)
return {
"suppressed": is_suppressed,
"value": value,
"type": supp_type
}
# ============ EMAIL MCP SERVER ============
elif tool_name == "send_email":
to = tool_input["to"]
subject = tool_input["subject"]
body = tool_input["body"]
prospect_id = tool_input["prospect_id"]
thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
return {
"status": "sent",
"thread_id": thread_id,
"to": to
}
elif tool_name == "get_email_thread":
prospect_id = tool_input["prospect_id"]
thread = await self.mcp_registry.email.get_thread(prospect_id)
return thread or {"error": "No email thread found"}
# ============ CALENDAR MCP SERVER ============
elif tool_name == "suggest_meeting_slots":
num_slots = tool_input.get("num_slots", 3)
slots = await self.mcp_registry.calendar.suggest_slots()
return {
"slots": slots[:num_slots],
"count": len(slots[:num_slots])
}
elif tool_name == "generate_calendar_invite":
start_time = tool_input["start_time"]
end_time = tool_input["end_time"]
title = tool_input["title"]
slot = {
"start_iso": start_time,
"end_iso": end_time,
"title": title
}
ics = await self.mcp_registry.calendar.generate_ics(slot)
return {
"ics_content": ics,
"meeting": slot
}
else:
raise ValueError(f"Unknown MCP tool: {tool_name}")