File size: 55,110 Bytes
8bab08d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 |
"""
Autonomous AI Agent with MCP Tool Calling using HuggingFace Inference Providers

This agent uses HuggingFace's Inference Providers API with native tool-calling
support to autonomously decide which MCP tools to call.

Benefits:
- Uses the HuggingFace unified API (a single HF token for all providers)
- Native tool-calling support (OpenAI-compatible API)
- Multiple providers: nscale, Nebius, Together, SambaNova, Fireworks AI, Cerebras, etc.
- Tool-capable models such as Qwen3 (via providers) and Qwen2.5-72B-Instruct (fallback)
- Free tier available with a HuggingFace account
"""
import os
import json
import uuid
import logging
import asyncio
from typing import List, Dict, Any, AsyncGenerator
from mcp.tools.definitions import MCP_TOOLS, list_all_tools
from mcp.registry import MCPRegistry
logger = logging.getLogger(__name__)

# Models reachable through the free HuggingFace Serverless Inference API;
# none of them needs paid provider credits.
FREE_MODELS = [
    "mistralai/Mistral-7B-Instruct-v0.3",  # fast, good quality
    "microsoft/Phi-3-mini-4k-instruct",  # small and fast
    "HuggingFaceH4/zephyr-7b-beta",  # tuned for chat
    "meta-llama/Llama-3.2-3B-Instruct",  # Meta's small model
    "Qwen/Qwen2.5-3B-Instruct",  # small Qwen
]

# Qwen3 sizes offered by paid inference providers (credits required).
QWEN3_MODELS = [f"Qwen/Qwen3-{size}" for size in ("32B", "8B", "4B")]

# Inference providers reachable through HuggingFace:
#   provider name -> {"models": Qwen3 models it serves, "default": its default model}
# Providers serving the full Qwen3 range share the QWEN3_MODELS list object.
HF_PROVIDERS = {
    name: {"models": served, "default": preferred}
    for name, served, preferred in (
        ("nscale", QWEN3_MODELS, "Qwen/Qwen3-32B"),
        ("nebius", QWEN3_MODELS, "Qwen/Qwen3-32B"),
        ("together", QWEN3_MODELS, "Qwen/Qwen3-32B"),
        ("sambanova", QWEN3_MODELS, "Qwen/Qwen3-8B"),
        ("fireworks-ai", QWEN3_MODELS, "Qwen/Qwen3-8B"),
        ("cerebras", ["Qwen/Qwen3-32B"], "Qwen/Qwen3-32B"),
    )
}

# Out of the box the agent targets the free serverless API; the
# "hf-inference" sentinel means "do not select a specific provider".
DEFAULT_PROVIDER = "hf-inference"
DEFAULT_MODEL = "mistralai/Mistral-7B-Instruct-v0.3"
class AutonomousMCPAgentHF:
"""
AI Agent that autonomously uses MCP servers as tools using HuggingFace Inference Providers.
Uses native tool calling (OpenAI-compatible) for reliable tool execution.
HuggingFace routes requests to inference providers like Nebius, Together, etc.
"""
def __init__(
    self,
    mcp_registry: MCPRegistry,
    hf_token: str = None,
    provider: str = None,
    model: str = None
):
    """
    Initialize the autonomous agent with HuggingFace Inference Providers.

    Settings are resolved in this order:
        token:    ``hf_token`` argument, then ``HF_TOKEN``, then ``HF_API_TOKEN``
        model:    ``model`` argument, then ``HF_MODEL``, then ``DEFAULT_MODEL``
        provider: ``provider`` argument, then ``HF_PROVIDER``, then "nscale"
                  for Qwen3 models, otherwise ``DEFAULT_PROVIDER``
                  ("hf-inference", the free serverless API)

    Args:
        mcp_registry: MCP registry with all servers
        hf_token: HuggingFace token (get at huggingface.co/settings/tokens)
        provider: Inference provider (nscale, nebius, together, sambanova, etc.)
        model: Model to use (default: DEFAULT_MODEL,
            i.e. mistralai/Mistral-7B-Instruct-v0.3)

    Raises:
        ValueError: if no token was passed or found in the environment.
        ImportError: if ``huggingface_hub`` cannot be imported.
    """
    self.mcp_registry = mcp_registry
    self.hf_token = hf_token or os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN")
    self.model = model or os.getenv("HF_MODEL") or DEFAULT_MODEL

    # Provider precedence: explicit argument > HF_PROVIDER env var > auto-detect.
    env_provider = os.getenv("HF_PROVIDER")
    if provider:
        self.provider = provider
    elif env_provider:
        self.provider = env_provider
    elif self.model in QWEN3_MODELS or self.model.startswith("Qwen/Qwen3"):
        # Qwen3 models need a provider (use nscale by default)
        self.provider = "nscale"
    else:
        self.provider = DEFAULT_PROVIDER

    if not self.hf_token:
        raise ValueError(
            "HF_TOKEN is required!\n"
            "Get a token at: https://huggingface.co/settings/tokens\n"
            "Then set: export HF_TOKEN=hf_your_token_here"
        )

    # Initialize HuggingFace InferenceClient
    try:
        from huggingface_hub import InferenceClient

        # For the serverless API ("hf-inference") no provider is passed.
        if self.provider == "hf-inference":
            self.client = InferenceClient(token=self.hf_token)
        else:
            self.client = InferenceClient(
                provider=self.provider,
                token=self.hf_token
            )
        logger.info("HuggingFace InferenceClient initialized")
        logger.info(f" Provider: {self.provider}")
        logger.info(f" Model: {self.model}")
    except ImportError as err:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "huggingface_hub package not installed or outdated!\n"
            "Install/upgrade with: pip install --upgrade huggingface_hub"
        ) from err

    # Create tool definitions in OpenAI/HF format
    self.tools = self._create_tool_definitions()
    logger.info(f"Autonomous MCP Agent initialized with HuggingFace ({self.provider})")
    logger.info(f"Available tools: {len(self.tools)}")
def _create_tool_definitions(self) -> List[Dict[str, Any]]:
    """
    Build the ``tools`` list sent with each chat-completion request.

    Every entry of ``MCP_TOOLS`` is reshaped into the OpenAI/HuggingFace
    function-calling schema, with the tool's ``input_schema`` used as the
    function ``parameters``. The order of ``MCP_TOOLS`` is preserved.
    """
    return [
        {
            "type": "function",
            "function": {
                "name": spec["name"],
                "description": spec["description"],
                "parameters": spec["input_schema"],
            },
        }
        for spec in MCP_TOOLS
    ]
async def run(
    self,
    task: str,
    max_iterations: int = 15
) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Run the agent autonomously on a task using native tool calling.

    Each iteration sends the whole conversation to the model; the blocking
    HTTP call (``_call_inference_api``) runs in the default executor so the
    event loop is not blocked. Tool calls in the reply are executed with
    ``_execute_mcp_tool`` and their results are appended as ``tool`` messages.
    A reply without tool calls on any iteration after the first ends the run.

    Args:
        task: The task to complete
        max_iterations: Maximum number of model round-trips (not tool calls)
            before the run stops with ``agent_max_iterations``

    Yields:
        Event dicts keyed by ``"type"``: ``agent_start``, ``iteration_start``,
        ``tool_call``, ``tool_result``, ``tool_error``, ``thought``,
        ``agent_complete``, ``agent_error`` and ``agent_max_iterations``.
        ``agent_max_iterations`` is emitted only when the iteration budget is
        exhausted, never after ``agent_complete`` or ``agent_error``.
    """
    yield {
        "type": "agent_start",
        "message": "Autonomous AI Agent (HuggingFace) starting task",
        "task": task,
        "model": self.model,
        "provider": self.provider
    }

    # System prompt for the agent
    system_prompt = """You are an autonomous AI agent for B2B sales automation.
You have access to MCP tools including:
- search_web: Search the web for company information
- find_verified_contacts: Find REAL decision-makers (searches LinkedIn, company websites, directories)
- save_prospect: Save a prospect company to the database
- send_email: Draft outreach emails
CRITICAL RULE: Only save prospects that have verified contacts. No contacts = don't save.
REQUIRED WORKFLOW:
1. search_web to find potential prospect companies
2. find_verified_contacts FIRST to check if contacts exist
3. IF contacts found (count > 0): save_prospect, then send_email
4. IF no contacts found (count = 0): SKIP this company, try the next one
TOOL CALL FORMAT - output valid JSON:
Step 1 - Find contacts FIRST:
{"company_name": "Acme Corp", "company_domain": "acme.com", "target_titles": ["CEO", "Founder", "VP Sales", "CTO"], "max_contacts": 3}
Step 2 - ONLY if contacts found, save prospect:
{"prospect_id": "prospect_1", "company_id": "company_1", "company_name": "Acme Corp", "company_domain": "acme.com", "fit_score": 85}
The find_verified_contacts tool searches:
- Company website (team/about pages)
- LinkedIn profiles
- Crunchbase, ZoomInfo, directories
- Press releases and news
- Social media profiles
IMPORTANT:
- A prospect without contacts is USELESS - don't save it
- NEVER invent contact names or emails
- Keep searching until you find prospects WITH verified contacts
After completing, summarize:
- Prospects saved (with contacts)
- Companies skipped (no contacts)"""

    # Initialize conversation
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": task}
    ]

    iteration = 0
    while iteration < max_iterations:
        iteration += 1
        yield {
            "type": "iteration_start",
            "iteration": iteration,
            "message": f"Iteration {iteration}: AI reasoning..."
        }

        try:
            logger.info(f"Calling HuggingFace API (iteration {iteration})...")
            logger.info(f" Provider: {self.provider}, Model: {self.model}")

            # _call_inference_api is synchronous (blocking HTTP); run it in the
            # default executor so other tasks on this loop keep running.
            response = await asyncio.get_running_loop().run_in_executor(
                None,
                self._call_inference_api,
                messages
            )

            # A missing response, or one without choices, cannot be acted on.
            choices = getattr(response, "choices", None)
            if not choices:
                yield {
                    "type": "agent_error",
                    "error": "Empty response from API",
                    "message": "API returned empty response"
                }
                break

            assistant_message = choices[0].message
            tool_calls = getattr(assistant_message, "tool_calls", None)

            if tool_calls:
                tool_results = []
                for tool_call in tool_calls:
                    tool_name = tool_call.function.name
                    raw_arguments = tool_call.function.arguments
                    # Arguments are normally a JSON string; also accept an
                    # already-decoded dict, and treat missing/garbled JSON as {}.
                    if isinstance(raw_arguments, dict):
                        tool_input = raw_arguments
                    else:
                        try:
                            tool_input = json.loads(raw_arguments or "{}")
                        except (json.JSONDecodeError, TypeError):
                            tool_input = {}

                    yield {
                        "type": "tool_call",
                        "tool": tool_name,
                        "input": tool_input,
                        "message": f"Action: {tool_name}"
                    }

                    # Execute the MCP tool; a failing tool is reported back to
                    # the model instead of aborting the run.
                    try:
                        result = await self._execute_mcp_tool(tool_name, tool_input)
                        yield {
                            "type": "tool_result",
                            "tool": tool_name,
                            "result": result,
                            "message": f"Tool {tool_name} completed"
                        }
                        tool_results.append({
                            "tool_call_id": tool_call.id,
                            "role": "tool",
                            "content": json.dumps(result, default=str)
                        })
                    except Exception as e:
                        error_msg = str(e)
                        logger.error(f"Tool execution failed: {tool_name} - {error_msg}")
                        yield {
                            "type": "tool_error",
                            "tool": tool_name,
                            "error": error_msg,
                            "message": f"Tool {tool_name} failed: {error_msg}"
                        }
                        tool_results.append({
                            "tool_call_id": tool_call.id,
                            "role": "tool",
                            "content": json.dumps({"error": error_msg})
                        })

                # Echo the assistant turn (with its tool calls) before the tool
                # results, as the OpenAI-compatible message format requires.
                messages.append({
                    "role": "assistant",
                    "content": assistant_message.content or "",
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.function.name,
                                # The API expects arguments as a JSON string.
                                "arguments": (
                                    tc.function.arguments
                                    if isinstance(tc.function.arguments, str)
                                    else json.dumps(tc.function.arguments or {})
                                )
                            }
                        }
                        for tc in tool_calls
                    ]
                })
                messages.extend(tool_results)
            else:
                # No tool calls - AI is done or providing a response
                final_content = assistant_message.content or ""
                raw_content = getattr(assistant_message, "raw_content", None) or final_content

                # Log for debugging
                logger.info(f"Iteration {iteration}: No tool calls")
                logger.info(f" Raw content length: {len(raw_content)}")
                logger.info(f" Stripped content length: {len(final_content)}")
                if raw_content and not final_content:
                    logger.info(f" Raw content preview: {raw_content[:200]}...")

                # Always yield a thought event if there is ANY content (for tracking)
                if final_content:
                    yield {
                        "type": "thought",
                        "thought": final_content,
                        "message": f"AI Response: {final_content[:100]}..." if len(final_content) > 100 else f"AI Response: {final_content}"
                    }
                elif raw_content:
                    # Content was stripped but raw exists - yield a minimal thought
                    yield {
                        "type": "thought",
                        "thought": f"[Processing: {len(raw_content)} chars of reasoning]",
                        "message": "AI is reasoning..."
                    }

                # A tool-free reply after the first iteration is the final answer.
                if iteration > 1:
                    if not final_content and raw_content:
                        # Try to salvage an answer from the model's <think> block.
                        import re
                        think_match = re.search(r'<think>(.*?)</think>', raw_content, flags=re.DOTALL)
                        if think_match:
                            think_text = think_match.group(1).strip()
                            # Keep the last few substantial sentences.
                            sentences = [s.strip() for s in think_text.split('.') if len(s.strip()) > 20]
                            if sentences:
                                final_content = '. '.join(sentences[-5:]) + '.'
                                logger.info(f"Extracted final answer from thinking: {final_content[:100]}...")

                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_content,
                        "iterations": iteration
                    }
                    break

                # Add the response to the conversation and continue.
                messages.append({
                    "role": "assistant",
                    "content": final_content or raw_content[:500]
                })

        except Exception as e:
            error_msg = str(e)
            lowered = error_msg.lower()
            logger.error(f"HuggingFace API error: {error_msg}", exc_info=True)

            # Classify common failures. Matching "rate" alone would misfire on
            # words such as "generate" or "iterate", so look for HTTP 429 or
            # explicit limit wording instead.
            if "401" in error_msg or "unauthorized" in lowered:
                yield {
                    "type": "agent_error",
                    "error": "Invalid HF_TOKEN",
                    "message": "Authentication failed. Please check your HF_TOKEN."
                }
            elif "429" in error_msg or "too many requests" in lowered or "limit" in lowered:
                yield {
                    "type": "agent_error",
                    "error": "Rate limit reached",
                    "message": "Rate limit reached. Try again later or upgrade to HF PRO."
                }
            else:
                yield {
                    "type": "agent_error",
                    "error": error_msg,
                    "message": f"API error: {error_msg}"
                }
            break
    else:
        # Only reached when the loop ran out of iterations; a `break` after
        # completion or an error skips this branch.
        yield {
            "type": "agent_max_iterations",
            "message": f"Reached maximum iterations ({max_iterations})",
            "iterations": iteration
        }
def _call_inference_api(self, messages: List[Dict], retry_count: int = 0) -> Any:
    """
    Send one chat-completion request through the HuggingFace router, with fallbacks.

    The configured ``self.model`` / ``self.provider`` is tried first. If that
    attempt fails (non-200 status or exception), a fixed list of fallback
    models is tried in order. A fallback identical to the primary attempt is
    skipped. Each request uses a 120 s timeout.

    Args:
        messages: Chat messages in OpenAI format (system/user/assistant/tool).
        retry_count: Unused; kept for backward compatibility.

    Returns:
        The object built by ``_create_chat_response`` from the first HTTP 200
        response.

    Raises:
        Exception: if every attempt fails. The message ends with the most
            recent failure (e.g. ``HTTP 401``), which ``run`` inspects to
            classify the error.
    """
    import requests

    # Use the router endpoint for chat completions
    api_url = "https://router.huggingface.co/v1/chat/completions"

    def post(model, provider):
        """POST one chat-completion request; provider None means router default."""
        headers = {
            "Authorization": f"Bearer {self.hf_token}",
            "Content-Type": "application/json"
        }
        if provider:
            # NOTE(review): the router's documented way to pin a provider is a
            # "model:provider" suffix; confirm this header is honored.
            headers["X-HF-Provider"] = provider
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": 2048,
            "temperature": 0.7,
            "stream": False,
            "tools": self.tools,  # Include tool definitions
            "tool_choice": "auto"  # Let the model decide when to use tools
        }
        return requests.post(api_url, headers=headers, json=payload, timeout=120)

    # "hf-inference" is the serverless sentinel: it sends no provider header.
    primary_provider = (
        self.provider if self.provider and self.provider != "hf-inference" else None
    )
    last_error = None

    # Try the configured model first
    try:
        logger.info(f"Trying primary model: {self.model} via {self.provider}")
        response = post(self.model, primary_provider)
        status = response.status_code
        if status == 200:
            result = response.json()
            logger.info(f"Success with {self.model} via {self.provider}")
            return self._create_chat_response(result)
        if status == 402:
            logger.warning(f"Payment required for {self.model} via {self.provider}. Falling back...")
            last_error = "Payment required - exceeded monthly credits"
        elif status == 404:
            logger.warning(f"Model {self.model} not found via {self.provider}. Falling back...")
            last_error = f"Model not found via {self.provider}"
        else:
            logger.warning(f"Model {self.model} returned {status}: {response.text[:200]}")
            last_error = f"HTTP {status}"
    except Exception as e:
        last_error = str(e)
        logger.warning(f"Primary model failed: {last_error}")

    # Fallback models with their providers (None = router default / serverless)
    fallback_models = [
        ("Qwen/Qwen2.5-72B-Instruct", None),
        ("meta-llama/Llama-3.1-70B-Instruct", None),
        ("mistralai/Mixtral-8x7B-Instruct-v0.1", None),
        ("Qwen/Qwen3-32B", "nebius"),
        ("Qwen/Qwen3-8B", "together"),
    ]

    for model, provider in fallback_models:
        if (model, provider) == (self.model, primary_provider):
            # Same request that just failed; don't spend another timeout on it.
            logger.info(f"Skipping fallback model {model}: same as primary attempt")
            continue
        try:
            logger.info(f"Trying fallback model: {model}" + (f" via {provider}" if provider else ""))
            response = post(model, provider)
            status = response.status_code
            if status == 200:
                result = response.json()
                logger.info(f"Success with fallback model: {model}")
                return self._create_chat_response(result)
            # Record every failure so the final error reflects the latest attempt.
            if status == 402:
                logger.warning(f"Model {model} returned {status}, trying next...")
                last_error = "Payment required - exceeded monthly credits"
            elif status == 404:
                logger.warning(f"Model {model} returned {status}, trying next...")
                last_error = f"Model {model} not found"
            elif status == 503:
                logger.info(f"Model {model} is loading, trying next...")
                last_error = f"Model {model} is loading (HTTP 503)"
            else:
                logger.warning(f"Model {model} returned {status}")
                last_error = f"HTTP {status}"
        except Exception as e:
            last_error = str(e)
            logger.warning(f"Model {model} failed: {str(e)[:100]}")

    logger.error(f"All models failed. Last error: {last_error}")
    raise Exception(f"All inference attempts failed: {last_error}")
def _strip_thinking_tags(self, text: str) -> str:
    """
    Remove Qwen3-style ``<think>`` reasoning and return the visible reply.

    Three shapes of reasoning markup are handled:
      * complete ``<think>...</think>`` blocks are removed;
      * a closing ``</think>`` with no opening tag (this can happen when the
        opening tag was part of the prompt) drops everything up to and
        including the last ``</think>``;
      * an unterminated ``<think>`` (e.g. output cut off by the token limit)
        drops everything from the tag onward.

    If nothing but reasoning remains, the last three sentences of the
    reasoning are returned as a fallback summary.

    Args:
        text: Raw assistant message content (may be empty or None).

    Returns:
        The reply with reasoning removed and surrounding whitespace stripped;
        "" for empty input.
    """
    import re

    if not text:
        return ""

    # Remove complete <think>...</think> blocks (Qwen3 chain-of-thought).
    cleaned = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
    # Orphan closing tag: the reply is whatever follows the last one.
    if '</think>' in cleaned:
        cleaned = cleaned.rsplit('</think>', 1)[1]
    # Unterminated opening tag: the reasoning ran to the end of the text.
    if '<think>' in cleaned:
        cleaned = cleaned.split('<think>', 1)[0]
    result = cleaned.strip()

    # Only reasoning was produced: summarize its conclusion instead.
    if not result and ('<think>' in text or '</think>' in text):
        closed = re.search(r'<think>(.*?)</think>', text, flags=re.DOTALL)
        if closed:
            think_content = closed.group(1)
        elif '<think>' in text:
            think_content = text.split('<think>', 1)[1]
        else:
            think_content = text.split('</think>', 1)[0]
        sentences = [s.strip() for s in think_content.strip().split('.') if s.strip()]
        if sentences:
            # The last sentences usually hold the model's conclusion.
            result = '. '.join(sentences[-3:]) + '.'
            logger.info(f"Extracted thinking summary: {result[:100]}...")
    return result
def _create_chat_response(self, result: dict) -> Any:
    """
    Wrap a raw chat-completion JSON dict in attribute-style objects.

    The returned object mimics the OpenAI SDK shape that ``run`` consumes:
    ``response.choices[i].message`` exposes ``content`` (text passed through
    ``_strip_thinking_tags``), ``raw_content`` (the unmodified text) and
    ``tool_calls`` (objects with ``id``, ``function.name`` and
    ``function.arguments`` as a JSON string, or None).

    If the API returned no structured ``tool_calls``, tool calls are recovered
    from JSON objects embedded in the message text. The tool name is inferred
    from the object's keys.

    Args:
        result: Parsed JSON body of a chat-completion response.

    Returns:
        An object with a ``choices`` list (empty when the response has none).
    """
    import re

    strip_thinking = self._strip_thinking_tags
    decoder = json.JSONDecoder()

    def new_call_id():
        # uuid keeps ids unique across turns; id(obj) values can be reused
        # once earlier tool-call objects have been garbage-collected.
        return f"call_{uuid.uuid4().hex}"

    def extract_json_objects(text):
        """Return every JSON object literal embedded in text, in order."""
        objects = []
        pos = text.find('{')
        while pos != -1:
            try:
                obj, end = decoder.raw_decode(text, pos)
            except (ValueError, RecursionError):
                # Not valid JSON at this brace. Retry at the next one so a
                # stray "{" in prose cannot hide later objects.
                pos = text.find('{', pos + 1)
                continue
            objects.append(obj)
            pos = text.find('{', end)
        return objects

    class MockFunction:
        """Function part of a structured tool call returned by the API."""
        def __init__(self, data):
            self.name = data.get("name") or ""
            arguments = data.get("arguments")
            # Providers normally send a JSON string, but some send an object;
            # normalize so callers can always json.loads() it.
            if arguments is None:
                self.arguments = "{}"
            elif isinstance(arguments, str):
                self.arguments = arguments
            else:
                self.arguments = json.dumps(arguments)

    class MockFunctionFromText:
        """Function part of a tool call recovered from message text."""
        def __init__(self, data):
            self.name = data.get("tool", data.get("name", ""))
            self.arguments = json.dumps(data.get("parameters", data.get("arguments", {})))

    class MockToolCall:
        def __init__(self, data):
            self.function = MockFunction(data.get("function") or {})
            self.id = data.get("id") or new_call_id()

    class MockToolCallFromText:
        def __init__(self, data):
            self.function = MockFunctionFromText(data)
            self.id = new_call_id()

    class MockMessage:
        def __init__(self, data):
            # Handle None content (API might return {"content": null})
            raw_content = data.get("content") or ""
            # Strip Qwen3 thinking tags to get the actual response
            self.content = strip_thinking(raw_content)
            # Keep the raw content for debugging/fallback
            self.raw_content = raw_content
            self.tool_calls = self._parse_tool_calls_from_response(data, raw_content)

        def _parse_tool_calls_from_response(self, data, raw_content):
            """Use structured tool_calls if present, else parse them from text."""
            if data.get("tool_calls"):
                return [MockToolCall(tc) for tc in data["tool_calls"]]
            # Use the raw content: tool JSON may sit inside <think> tags.
            return self._parse_tool_calls_from_text(raw_content)

        def _infer_tool_from_params(self, params):
            """Infer the tool name from a parameter dict's keys (None if unknown)."""
            if not isinstance(params, dict):
                return None
            keys = set(params.keys())
            # discover_prospects_with_contacts (highest priority - all-in-one tool)
            if "client_company" in keys and "client_industry" in keys:
                return "discover_prospects_with_contacts"
            if "client_company" in keys and "target_prospects" in keys:
                return "discover_prospects_with_contacts"
            # find_verified_contacts (single company)
            if "company_name" in keys and "company_domain" in keys and "target_titles" in keys:
                return "find_verified_contacts"
            if "company_name" in keys and "company_domain" in keys and "max_contacts" in keys:
                return "find_verified_contacts"
            # save_prospect
            if "prospect_id" in keys or ("company_name" in keys and "fit_score" in keys):
                return "save_prospect"
            # save_company
            if "company_id" in keys and ("name" in keys or "domain" in keys) and "prospect_id" not in keys:
                return "save_company"
            # save_contact (only for contacts returned by find_verified_contacts)
            if "contact_id" in keys or ("email" in keys and ("first_name" in keys or "last_name" in keys)):
                return "save_contact"
            # send_email
            if "to" in keys and "subject" in keys and "body" in keys:
                return "send_email"
            # search_web
            if "query" in keys and len(keys) <= 2:
                return "search_web"
            # save_fact
            if "fact_type" in keys or ("content" in keys and "company_id" in keys):
                return "save_fact"
            return None

        def _parse_tool_calls_from_text(self, text):
            """Recover tool calls from JSON objects written in the message text."""
            tool_calls = []
            seen_signatures = set()  # avoid duplicate calls

            def register(obj):
                """Record a tool call for obj; return its tool name, or None if unknown/duplicate."""
                tool_name = self._infer_tool_from_params(obj)
                if not tool_name:
                    return None
                sig = f"{tool_name}:{json.dumps(obj, sort_keys=True)}"
                if sig in seen_signatures:
                    return None
                seen_signatures.add(sig)
                tool_calls.append(MockToolCallFromText({"tool": tool_name, "parameters": obj}))
                return tool_name

            # Search BOTH the raw text (Qwen3 may put tool calls inside <think>
            # tags) and the stripped text.
            all_json_objects = extract_json_objects(text)
            text_clean = strip_thinking(text)
            if text_clean != text:
                all_json_objects.extend(extract_json_objects(text_clean))
            logger.info(f"Found {len(all_json_objects)} JSON objects in response")

            for obj in all_json_objects:
                tool_name = register(obj)
                if tool_name:
                    logger.info(f"Parsed tool call: {tool_name} with params: {list(obj.keys())}")

            # Also check code fence blocks (sometimes JSON is formatted there)
            code_blocks = re.findall(r'```(?:json)?\s*(.+?)\s*```', text_clean, re.DOTALL)
            for block in code_blocks:
                for obj in extract_json_objects(block):
                    tool_name = register(obj)
                    if tool_name:
                        logger.info(f"Parsed tool from code block: {tool_name}")

            if tool_calls:
                logger.info(f"Total tool calls parsed from text: {len(tool_calls)}")
            return tool_calls if tool_calls else None

    class MockChoice:
        def __init__(self, message_data):
            self.message = MockMessage(message_data)

    class MockResponse:
        def __init__(self, payload):
            choices_data = payload.get("choices") or []
            # A choice with "message": null is treated as an empty message.
            self.choices = [MockChoice(c.get("message") or {}) for c in choices_data]

    return MockResponse(result)
async def _execute_mcp_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
    """
    Execute an MCP tool by routing to the appropriate MCP server.
    This is where we actually call the MCP servers!

    Args:
        tool_name: Name of the tool to run (e.g. "search_web", "save_prospect",
            "send_email", "discover_prospects_with_contacts").
        tool_input: Arguments for the tool. Keys read with ``[...]`` below are
            required; the others fall back to the defaults shown.

    Returns:
        A tool-specific result, usually a dict. Lookup tools (``get_prospect``,
        ``get_company``, ``get_email_thread``) return whatever the server
        returns, or an ``{"error": ...}`` dict when it returns nothing.

    Raises:
        KeyError: If a required argument is missing from ``tool_input``.
        ValueError: If ``tool_name`` is not a known MCP tool.
        Errors raised by the MCP servers propagate, except that the two
        contact-discovery tools catch most of them and report them in their
        result instead.
    """
    # ============ SEARCH MCP SERVER ============
    if tool_name == "search_web":
        query = tool_input["query"]
        max_results = tool_input.get("max_results", 5)
        return await self._mcp_search_results(query, max_results)

    elif tool_name == "search_news":
        query = tool_input["query"]
        max_results = tool_input.get("max_results", 5)
        return await self._mcp_search_results(f"{query} news", max_results)

    # ============ OPTIMIZED PROSPECT DISCOVERY WITH CONTACTS ============
    elif tool_name == "discover_prospects_with_contacts":
        return await self._tool_discover_prospects_with_contacts(tool_input)

    # ============ VERIFIED CONTACT FINDER (Single Company) ============
    elif tool_name == "find_verified_contacts":
        return await self._tool_find_verified_contacts(tool_input)

    # ============ STORE MCP SERVER ============
    elif tool_name == "save_prospect":
        prospect_data = {
            "id": tool_input.get("prospect_id", str(uuid.uuid4())),
            "company": {
                "id": tool_input.get("company_id"),
                "name": tool_input.get("company_name"),
                "domain": tool_input.get("company_domain")
            },
            "fit_score": tool_input.get("fit_score", 0),
            "status": tool_input.get("status", "new"),
            "metadata": tool_input.get("metadata", {})
        }
        result = await self.mcp_registry.store.save_prospect(prospect_data)
        return {"status": result, "prospect_id": prospect_data["id"]}

    elif tool_name == "get_prospect":
        prospect_id = tool_input["prospect_id"]
        prospect = await self.mcp_registry.store.get_prospect(prospect_id)
        return prospect or {"error": "Prospect not found"}

    elif tool_name == "list_prospects":
        prospects = await self.mcp_registry.store.list_prospects()
        status_filter = tool_input.get("status")
        if status_filter:
            prospects = [p for p in prospects if p.get("status") == status_filter]
        return {
            "prospects": prospects,
            "count": len(prospects)
        }

    elif tool_name == "save_company":
        company_data = {
            "id": tool_input.get("company_id", str(uuid.uuid4())),
            "name": tool_input["name"],
            "domain": tool_input["domain"],
            "industry": tool_input.get("industry"),
            "description": tool_input.get("description"),
            "employee_count": tool_input.get("employee_count")
        }
        result = await self.mcp_registry.store.save_company(company_data)
        return {"status": result, "company_id": company_data["id"]}

    elif tool_name == "get_company":
        company_id = tool_input["company_id"]
        company = await self.mcp_registry.store.get_company(company_id)
        return company or {"error": "Company not found"}

    elif tool_name == "save_fact":
        fact_data = {
            "id": tool_input.get("fact_id", str(uuid.uuid4())),
            "company_id": tool_input["company_id"],
            "fact_type": tool_input["fact_type"],
            "content": tool_input["content"],
            "source_url": tool_input.get("source_url"),
            "confidence_score": tool_input.get("confidence_score", 0.8)
        }
        result = await self.mcp_registry.store.save_fact(fact_data)
        return {"status": result, "fact_id": fact_data["id"]}

    elif tool_name == "save_contact":
        contact_data = {
            "id": tool_input.get("contact_id", str(uuid.uuid4())),
            "company_id": tool_input["company_id"],
            "email": tool_input["email"],
            "first_name": tool_input.get("first_name"),
            "last_name": tool_input.get("last_name"),
            "title": tool_input.get("title"),
            "seniority": tool_input.get("seniority")
        }
        result = await self.mcp_registry.store.save_contact(contact_data)
        return {"status": result, "contact_id": contact_data["id"]}

    elif tool_name == "list_contacts_by_domain":
        domain = tool_input["domain"]
        contacts = await self.mcp_registry.store.list_contacts_by_domain(domain)
        return {
            "contacts": contacts,
            "count": len(contacts)
        }

    elif tool_name == "check_suppression":
        supp_type = tool_input["suppression_type"]
        value = tool_input["value"]
        is_suppressed = await self.mcp_registry.store.check_suppression(supp_type, value)
        return {
            "suppressed": is_suppressed,
            "value": value,
            "type": supp_type
        }

    # ============ EMAIL MCP SERVER ============
    elif tool_name == "send_email":
        to = tool_input["to"]
        subject = tool_input["subject"]
        body = tool_input["body"]
        prospect_id = tool_input["prospect_id"]
        thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
        return {
            "status": "sent",
            "thread_id": thread_id,
            "to": to
        }

    elif tool_name == "get_email_thread":
        prospect_id = tool_input["prospect_id"]
        thread = await self.mcp_registry.email.get_thread(prospect_id)
        return thread or {"error": "No email thread found"}

    # ============ CALENDAR MCP SERVER ============
    elif tool_name == "suggest_meeting_slots":
        num_slots = tool_input.get("num_slots", 3)
        slots = await self.mcp_registry.calendar.suggest_slots()
        return {
            "slots": slots[:num_slots],
            "count": len(slots[:num_slots])
        }

    elif tool_name == "generate_calendar_invite":
        start_time = tool_input["start_time"]
        end_time = tool_input["end_time"]
        title = tool_input["title"]
        slot = {
            "start_iso": start_time,
            "end_iso": end_time,
            "title": title
        }
        ics = await self.mcp_registry.calendar.generate_ics(slot)
        return {
            "ics_content": ics,
            "meeting": slot
        }

    else:
        raise ValueError(f"Unknown MCP tool: {tool_name}")

async def _mcp_search_results(self, query: str, max_results: int) -> Dict[str, Any]:
    """Run a search-server query and return at most ``max_results`` hits.

    Returns ``{"results": [...], "count": n}``. A ``None`` response from the
    server is treated as "no results".
    """
    results = await self.mcp_registry.search.query(query, max_results=max_results)
    # Trim defensively in case the server returns more hits than requested.
    trimmed = (results or [])[:max_results]
    return {"results": trimmed, "count": len(trimmed)}

async def _tool_discover_prospects_with_contacts(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
    """Find candidate customer companies for a client and keep those with contacts.

    Runs the web searches from ``_build_prospect_search_queries``. Each result
    is filtered: non-company domains, article-like titles and domains already
    seen are dropped. ``EnhancedContactFinder`` is then asked for up to 3
    contacts per remaining company.

    Each company with at least one contact is saved as a prospect
    (fit_score 75, status "new"), and its contacts are saved to the store.
    The run stops once ``target_prospects`` companies are saved or
    ``target_prospects * 8`` companies have been checked. Per-query and
    per-company errors are logged and the run moves on.

    Args:
        tool_input: Requires "client_company" and "client_industry". Optional
            keys are "target_prospects" (default 3) and "target_titles".

    Returns:
        A summary dict with the saved prospects (including their contacts),
        counts, whether the target was met, and a human-readable message.
    """
    from services.enhanced_contact_finder import EnhancedContactFinder
    from urllib.parse import urlparse

    client_company = tool_input["client_company"]
    client_industry = tool_input["client_industry"]
    target_prospects = tool_input.get("target_prospects", 3)
    target_titles = tool_input.get("target_titles", ["CEO", "Founder", "VP Sales", "CTO", "Head of Sales"])

    logger.info(f"Discovering {target_prospects} prospects with contacts for {client_company}")
    print("\n[PROSPECT DISCOVERY] ========================================")
    print(f"[PROSPECT DISCOVERY] Finding {target_prospects} prospects WITH verified contacts")
    print(f"[PROSPECT DISCOVERY] Client: {client_company}")
    print("[PROSPECT DISCOVERY] ========================================")

    contact_finder = EnhancedContactFinder(mcp_registry=self.mcp_registry)
    saved_prospects = []
    all_contacts = []
    skipped_companies = []
    companies_checked = 0
    # Check more companies than needed: many candidates have no findable contacts.
    max_companies_to_check = target_prospects * 8

    # The goal is to find CUSTOMERS for the client, not articles ABOUT the client.
    search_queries = self._build_prospect_search_queries(client_company, client_industry)
    skip_domains = self._non_company_domains(client_company)
    # Titles that look like articles rather than company names.
    skip_title_patterns = (
        'what is', 'how to', 'guide', 'review', 'best ', 'top ', 'vs ',
        ' vs ', 'comparison', 'tutorial', 'tips', 'ways to', 'complete',
        'everything you need', 'beginner', 'introduction', 'explained',
        '2024', '2025', '2023', '[', ']', 'list of', 'examples',
    )
    seen_domains = set()

    for query in search_queries:
        # Stop issuing searches once either limit is reached, not just the
        # prospect target; otherwise every leftover query still hits the server.
        if len(saved_prospects) >= target_prospects or companies_checked >= max_companies_to_check:
            break
        try:
            print(f"\n[PROSPECT DISCOVERY] Searching: {query}")
            results = await self.mcp_registry.search.query(query, max_results=10)
            for result in results or []:
                if len(saved_prospects) >= target_prospects or companies_checked >= max_companies_to_check:
                    break

                # `or ''` guards against explicit None values in the result.
                url = result.get('url') or ''
                title = result.get('title') or ''

                # Extract the host: lowercased, without port/userinfo.
                try:
                    host = urlparse(url).hostname or ''
                except (AttributeError, TypeError, ValueError):
                    continue
                # Drop only a *leading* "www." (str.replace would strip it anywhere).
                domain = host[4:] if host.startswith('www.') else host
                if not domain or domain in seen_domains:
                    continue
                seen_domains.add(domain)

                if self._is_non_company_domain(domain, skip_domains):
                    print(f"[PROSPECT DISCOVERY] ⏭️ Skipping non-company domain: {domain}")
                    continue

                title_lower = title.lower()
                if any(pattern in title_lower for pattern in skip_title_patterns):
                    print(f"[PROSPECT DISCOVERY] ⏭️ Skipping article title: {title[:50]}...")
                    continue

                # Company name = first segment of the page title ("Acme - Home" -> "Acme").
                company_name = title.split(' - ')[0].split(' | ')[0].split(':')[0].strip()
                # Long or wordy names are probably headlines; fall back to the domain label.
                if len(company_name) > 40 or (' ' in company_name and len(company_name.split()) > 5):
                    company_name = domain.split('.')[0].replace('-', ' ').title()
                if not company_name or len(company_name) < 2:
                    continue

                companies_checked += 1
                print(f"\n[PROSPECT DISCOVERY] Checking ({companies_checked}/{max_companies_to_check}): {company_name} ({domain})")

                try:
                    contacts = await contact_finder.find_real_contacts(
                        company_name=company_name,
                        domain=domain,
                        target_titles=target_titles,
                        max_contacts=3
                    )
                    if not contacts:
                        skipped_companies.append({"name": company_name, "domain": domain, "reason": "no_contacts"})
                        print(f"[PROSPECT DISCOVERY] ⏭️ SKIPPED: {company_name} (no verified contacts)")
                        continue

                    # NOTE(review): this ID is only unique within one discovery run;
                    # a later run reuses prospect_1, prospect_2, ... — confirm whether
                    # the store overwrites existing prospects by ID.
                    prospect_id = f"prospect_{len(saved_prospects) + 1}"
                    company_id = domain.replace(".", "_")
                    await self.mcp_registry.store.save_prospect({
                        "id": prospect_id,
                        "company": {
                            "id": company_id,
                            "name": company_name,
                            "domain": domain
                        },
                        "fit_score": 75,
                        "status": "new",
                        "metadata": {"source": "automated_discovery"}
                    })

                    contact_list = []
                    for contact in contacts:
                        contact_list.append(self._verified_contact_record(contact, company_name, domain))
                        await self._save_found_contact(contact, company_id)

                    saved_prospects.append({
                        "prospect_id": prospect_id,
                        "company_name": company_name,
                        "domain": domain,
                        "contacts": contact_list,
                        "contact_count": len(contact_list)
                    })
                    # Count contacts only once the whole company succeeded, so
                    # contacts_count matches the returned prospects.
                    all_contacts.extend(contact_list)
                    print(f"[PROSPECT DISCOVERY] ✅ SAVED: {company_name} with {len(contacts)} contacts")
                except Exception as e:
                    # Best effort: one failing company must not abort the run.
                    logger.warning(f"Error checking {company_name}: {str(e)}")
                    skipped_companies.append({"name": company_name, "domain": domain, "reason": str(e)})
        except Exception as e:
            # Best effort: a failing search query moves on to the next query.
            logger.warning(f"Search error: {str(e)}")

    print("\n[PROSPECT DISCOVERY] ========================================")
    print("[PROSPECT DISCOVERY] DISCOVERY COMPLETE")
    print("[PROSPECT DISCOVERY] ========================================")
    print(f"[PROSPECT DISCOVERY] Prospects saved: {len(saved_prospects)}/{target_prospects}")
    print(f"[PROSPECT DISCOVERY] Total contacts: {len(all_contacts)}")
    print(f"[PROSPECT DISCOVERY] Companies checked: {companies_checked}")
    print(f"[PROSPECT DISCOVERY] Companies skipped: {len(skipped_companies)}")
    print("[PROSPECT DISCOVERY] ========================================\n")

    return {
        "status": "success" if len(saved_prospects) > 0 else "no_prospects_found",
        "prospects": saved_prospects,
        "prospects_count": len(saved_prospects),
        "contacts_count": len(all_contacts),
        "companies_checked": companies_checked,
        "companies_skipped": len(skipped_companies),
        "target_met": len(saved_prospects) >= target_prospects,
        "message": f"Found {len(saved_prospects)} prospects with {len(all_contacts)} verified contacts. Checked {companies_checked} companies, skipped {len(skipped_companies)} (no contacts)."
    }

async def _tool_find_verified_contacts(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
    """Find contacts for a single company and save each one to the store.

    Args:
        tool_input: Requires "company_name" and "company_domain". Optional
            keys are "target_titles" and "max_contacts" (default 3).

    Returns:
        A dict with "status" ("success", "no_contacts_found" or "error"),
        "contacts", "count", "message" and "should_save_prospect". Errors from
        the contact finder or the store are logged and reported as "error";
        they are not raised.
    """
    from services.enhanced_contact_finder import EnhancedContactFinder

    company_name = tool_input["company_name"]
    company_domain = tool_input["company_domain"]
    target_titles = tool_input.get("target_titles", ["CEO", "Founder", "VP Sales", "CTO", "Head of Sales"])
    max_contacts = tool_input.get("max_contacts", 3)

    logger.info(f"Finding verified contacts for {company_name} ({company_domain})")
    contact_finder = EnhancedContactFinder(mcp_registry=self.mcp_registry)

    try:
        contacts = await contact_finder.find_real_contacts(
            company_name=company_name,
            domain=company_domain,
            target_titles=target_titles,
            max_contacts=max_contacts
        )
        company_id = company_domain.replace(".", "_")
        contact_list = []
        for contact in contacts or []:
            contact_list.append(self._verified_contact_record(contact, company_name, company_domain))
            await self._save_found_contact(contact, company_id)
    except Exception as e:
        logger.error(f"Error finding contacts for {company_name}: {str(e)}")
        return {
            "status": "error",
            "contacts": [],
            "count": 0,
            "message": f"Error searching for contacts: {str(e)}",
            "should_save_prospect": False
        }

    if contact_list:
        return {
            "status": "success",
            "contacts": contact_list,
            "count": len(contact_list),
            "message": f"Found {len(contact_list)} verified contacts at {company_name}",
            "should_save_prospect": True
        }
    return {
        "status": "no_contacts_found",
        "contacts": [],
        "count": 0,
        "message": f"No verified contacts found for {company_name}. Skip this prospect.",
        "should_save_prospect": False
    }

@staticmethod
def _build_prospect_search_queries(client_company: str, client_industry: str) -> list:
    """Return web-search queries aimed at likely customers of ``client_company``.

    The query set is chosen by keyword matching on the lowercased client name
    and industry (e-commerce, CRM/sales, or marketing, otherwise a generic
    set). A few generic business-finding queries are always appended.
    """
    client_lower = client_company.lower()
    industry_lower = client_industry.lower()

    def mentions(keywords):
        return any(kw in client_lower or kw in industry_lower for kw in keywords)

    # E-commerce/retail platform clients -> retailers, DTC brands
    if mentions(['ecommerce', 'e-commerce', 'shopify', 'online store', 'retail platform', 'shopping cart']):
        search_queries = [
            "DTC brands fashion apparel company",
            "online boutique store founder CEO",
            "independent retail brand ecommerce",
            "emerging consumer brands direct to consumer",
            "small business online store owner",
            "handmade crafts seller business",
            "subscription box company founder",
        ]
    # CRM/Sales software clients -> B2B companies, sales teams
    elif mentions(['crm', 'salesforce', 'sales software', 'customer relationship']):
        search_queries = [
            "B2B SaaS company sales team",
            "growing startup sales operations",
            "enterprise software company VP Sales",
            "technology company Head of Sales",
        ]
    # Marketing/advertising clients -> businesses needing marketing
    elif mentions(['marketing', 'advertising', 'ads', 'seo', 'content']):
        search_queries = [
            "growing startup marketing director",
            "ecommerce brand marketing team",
            "B2B company CMO marketing",
            "technology startup growth marketing",
        ]
    # Default: growing companies that might need the client's services
    else:
        search_queries = [
            f"growing companies {industry_lower} customers list",
            f"startups using {industry_lower} solutions",
            f"businesses {industry_lower} case study customer",
            f"companies similar to {client_company} customers",
            "fast growing startups Series A B2B",
            "emerging technology companies founder CEO",
            "mid-market companies digital transformation",
        ]

    # Generic business-finding queries, always tried last.
    search_queries.extend([
        "Inc 5000 fastest growing companies",
        "emerging brands startup founders",
        "venture backed startups series A",
    ])
    return search_queries

@staticmethod
def _non_company_domains(client_company: str) -> list:
    """Return domains (and domain patterns) that are not prospect company websites.

    This covers social media, directories, news, blogs and marketplaces, plus
    a guess at the client's own domain so the client never prospects itself.
    """
    return [
        # Social media
        'linkedin.com', 'facebook.com', 'twitter.com', 'instagram.com', 'tiktok.com',
        # Reference/directory sites
        'wikipedia.org', 'crunchbase.com', 'zoominfo.com', 'apollo.io', 'yelp.com',
        'glassdoor.com', 'g2.com', 'capterra.com', 'trustpilot.com', 'bbb.org',
        # News/media sites
        'forbes.com', 'businessinsider.com', 'techcrunch.com', 'bloomberg.com',
        'cnbc.com', 'reuters.com', 'wsj.com', 'nytimes.com', 'theverge.com',
        'wired.com', 'mashable.com', 'venturebeat.com', 'inc.com', 'entrepreneur.com',
        # Blog/article/review sites ('blog.' is a pattern, see _is_non_company_domain)
        'medium.com', 'hubspot.com', 'blog.', 'wordpress.com', 'blogspot.com',
        'quora.com', 'reddit.com', 'youtube.com', 'vimeo.com',
        # Generic/aggregator sites
        'amazon.com', 'ebay.com', 'alibaba.com', 'aliexpress.com',
        'google.com', 'bing.com', 'yahoo.com', 'duckduckgo.com',
        # The client company itself (don't prospect yourself!)
        client_company.lower().replace(' ', '') + '.com',
    ]

@staticmethod
def _is_non_company_domain(domain: str, skip_domains) -> bool:
    """Return True if ``domain`` matches an entry of ``skip_domains``.

    Entries ending in "." (e.g. "blog.") are patterns matched anywhere in the
    domain. Every other entry must match whole dot-separated labels. For
    example, "inc.com" matches "inc.com" and "news.inc.com" but not
    "acmeinc.com", and "amazon.com" also matches "amazon.com.au".
    """
    lowered = domain.lower()
    # Pad with dots so ".skip." can only match on label boundaries.
    padded = f".{lowered}."
    for skip in skip_domains:
        if skip.endswith('.'):
            if skip in lowered:
                return True
        elif f".{skip}." in padded:
            return True
    return False

@staticmethod
def _verified_contact_record(contact, company_name: str, domain: str) -> Dict[str, Any]:
    """Build the contact dict returned to the caller for a contact-finder result."""
    return {
        "id": contact.id,
        "name": contact.name,
        "email": contact.email,
        "title": contact.title,
        "company": company_name,
        "domain": domain,
        "verified": True,
        "source": "web_search_and_scraping"
    }

async def _save_found_contact(self, contact, company_id: str) -> None:
    """Persist a contact-finder result via the store server.

    The name is split on whitespace into first and last name. A missing,
    empty or whitespace-only name yields empty strings instead of raising.
    """
    name_parts = (contact.name or "").split()
    await self.mcp_registry.store.save_contact({
        "id": contact.id,
        "company_id": company_id,
        "email": contact.email,
        "first_name": name_parts[0] if name_parts else "",
        "last_name": name_parts[-1] if len(name_parts) > 1 else "",
        "title": contact.title
    })
|