"""
Autonomous AI Agent with MCP Tool Calling

This agent uses Claude 3.5 Sonnet (or compatible LLM) to autonomously
decide which MCP tools to call based on the user's task.

This is TRUE AI-driven MCP usage - no hardcoded workflow!
"""

import json
import logging
import os
import uuid
from typing import Any, AsyncGenerator, Dict, List, Optional

from anthropic import AsyncAnthropic

from mcp.registry import MCPRegistry
from mcp.tools.definitions import MCP_TOOLS

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)


class AutonomousMCPAgent:
    """
    AI agent that autonomously uses MCP servers as tools.

    Key features:
    - Uses an Anthropic model (Claude 3.5 Sonnet by default) for tool calling
    - The model decides which MCP tools to use; there is no hardcoded workflow
    - Tool calls are routed to the search / store / email / calendar
      services exposed by the MCP registry
    """

    # Model used when the caller does not pass ``model``.
    DEFAULT_MODEL = "claude-3-5-sonnet-20241022"

    # Upper bound on tokens generated per model turn.
    MAX_TOKENS = 4096

    # Built from single-line literals so the prompt text does not depend on
    # the indentation of this source file.
    SYSTEM_PROMPT = (
        "You are an autonomous AI agent for B2B sales automation.\n"
        "\n"
        "You have access to MCP (Model Context Protocol) servers that provide tools for:\n"
        "- Web search (find company information, news, insights)\n"
        "- Data storage (save prospects, companies, contacts, facts)\n"
        "- Email management (send emails, track threads)\n"
        "- Calendar (schedule meetings)\n"
        "\n"
        "Your goal is to help with B2B sales tasks like:\n"
        "- Finding and researching potential customers\n"
        "- Enriching company data with facts and insights\n"
        "- Finding decision-maker contacts\n"
        "- Drafting personalized outreach emails\n"
        "- Managing prospect pipeline\n"
        "\n"
        "IMPORTANT:\n"
        "1. Think step-by-step about what information you need\n"
        "2. Use tools autonomously to gather information\n"
        "3. Save important data to the store for persistence\n"
        "4. Be thorough in research before making recommendations\n"
        "5. Always check suppression list before suggesting email sends\n"
        "\n"
        "You should:\n"
        "- Search for company information when needed\n"
        "- Save prospects and companies to the database\n"
        "- Find and save contacts\n"
        "- Generate personalized outreach based on research\n"
        "- Track your progress and findings\n"
        "\n"
        "Work autonomously - decide which tools to use and when!"
    )

    def __init__(
        self,
        mcp_registry: "MCPRegistry",
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
    ):
        """
        Initialize the autonomous agent.

        Args:
            mcp_registry: MCP registry exposing ``search``, ``store``,
                ``email`` and ``calendar`` services.
            api_key: Anthropic API key; falls back to the
                ``ANTHROPIC_API_KEY`` environment variable.
            model: Model identifier; defaults to ``DEFAULT_MODEL``.
            tools: Tool definitions offered to the model; defaults to
                ``MCP_TOOLS``.

        Raises:
            ValueError: If no API key is passed and none is set in the
                environment.
        """
        self.mcp_registry = mcp_registry
        self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")

        if not self.api_key:
            raise ValueError(
                "Anthropic API key required for autonomous agent. "
                "Set ANTHROPIC_API_KEY environment variable or pass api_key parameter."
            )

        self.client = AsyncAnthropic(api_key=self.api_key)
        self.model = model or self.DEFAULT_MODEL
        self.tools = MCP_TOOLS if tools is None else tools
        self.system_prompt = self.SYSTEM_PROMPT

        logger.info("Autonomous MCP Agent initialized with model: %s", self.model)

    async def run(
        self,
        task: str,
        max_iterations: int = 15
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run the agent autonomously on a task.

        Each iteration sends the conversation to the model, executes every
        tool call in the reply, and feeds the results back as the next user
        turn. The loop ends when the model replies without tool calls, when
        an iteration raises, or when ``max_iterations`` is exhausted.

        Args:
            task: The task to complete
                (e.g., "Research and create outreach for Shopify").
            max_iterations: Maximum number of model turns, to prevent
                infinite loops.

        Yields:
            Event dicts whose ``type`` is one of ``agent_start``,
            ``iteration_start``, ``tool_call``, ``tool_result``,
            ``tool_error``, ``agent_complete``, ``agent_error`` or
            ``agent_max_iterations``.
        """
        yield {
            "type": "agent_start",
            "message": f"🤖 Autonomous AI Agent starting task: {task}",
            "model": self.model,
        }

        messages: List[Dict[str, Any]] = [{"role": "user", "content": task}]
        iteration = 0

        while iteration < max_iterations:
            iteration += 1

            yield {
                "type": "iteration_start",
                "iteration": iteration,
                "message": f"🔄 Iteration {iteration}: AI deciding next action...",
            }

            try:
                response = await self.client.messages.create(
                    model=self.model,
                    max_tokens=self.MAX_TOKENS,
                    system=self.system_prompt,
                    messages=messages,
                    tools=self.tools,
                )

                # Keep the assistant turn in the history so the tool results
                # appended below refer to tool_use blocks the model has seen.
                messages.append({"role": "assistant", "content": response.content})

                tool_calls = [
                    block for block in response.content if block.type == "tool_use"
                ]

                if not tool_calls:
                    # No tool requests: treat the first text block as the answer.
                    final_text = next(
                        (block.text for block in response.content if hasattr(block, "text")),
                        "Task completed!",
                    )
                    yield {
                        "type": "agent_complete",
                        "message": "✅ Task complete!",
                        "final_response": final_text,
                        "iterations": iteration,
                    }
                    break

                tool_results = []

                for tool_call in tool_calls:
                    tool_name = tool_call.name
                    tool_input = tool_call.input

                    yield {
                        "type": "tool_call",
                        "tool": tool_name,
                        "input": tool_input,
                        "message": f"🔧 AI calling tool: {tool_name}",
                    }

                    try:
                        result = await self._execute_mcp_tool(tool_name, tool_input)

                        yield {
                            "type": "tool_result",
                            "tool": tool_name,
                            "result": result,
                            "message": f"✓ Tool {tool_name} completed",
                        }

                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": tool_call.id,
                            # default=str keeps non-JSON values from aborting the turn.
                            "content": json.dumps(result, default=str),
                        })

                    except Exception as e:
                        # A failing tool is reported back to the model rather
                        # than ending the run, so it can react to the error.
                        error_msg = str(e)
                        logger.exception(
                            "Tool execution failed: %s - %s", tool_name, error_msg
                        )

                        yield {
                            "type": "tool_error",
                            "tool": tool_name,
                            "error": error_msg,
                            "message": f"❌ Tool {tool_name} failed: {error_msg}",
                        }

                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": tool_call.id,
                            "content": json.dumps({"error": error_msg}),
                            "is_error": True,
                        })

                # Tool results go back to the model as the next user turn.
                messages.append({"role": "user", "content": tool_results})

            except Exception as e:
                logger.exception("Agent iteration failed: %s", e)
                yield {
                    "type": "agent_error",
                    "error": str(e),
                    "message": f"❌ Agent error: {e}",
                }
                break
        else:
            # Reached only when the loop condition fails, i.e. the iteration
            # budget ran out. A run that completes or errors on its final
            # iteration leaves via ``break`` and skips this event.
            yield {
                "type": "agent_max_iterations",
                "message": f"⚠️ Reached maximum iterations ({max_iterations})",
                "iterations": iteration,
            }

    async def _execute_mcp_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        """
        Execute an MCP tool by routing to the appropriate MCP server.

        Args:
            tool_name: Name of the tool requested by the model.
            tool_input: Arguments supplied by the model for that tool.

        Returns:
            The tool's result, as returned by the matching handler.

        Raises:
            ValueError: If ``tool_name`` is not a known tool.
            KeyError: If a required key is missing from ``tool_input``.
        """
        handlers = {
            "search_web": self._tool_search_web,
            "search_news": self._tool_search_news,
            "save_prospect": self._tool_save_prospect,
            "get_prospect": self._tool_get_prospect,
            "list_prospects": self._tool_list_prospects,
            "save_company": self._tool_save_company,
            "get_company": self._tool_get_company,
            "save_fact": self._tool_save_fact,
            "save_contact": self._tool_save_contact,
            "list_contacts_by_domain": self._tool_list_contacts_by_domain,
            "check_suppression": self._tool_check_suppression,
            "send_email": self._tool_send_email,
            "get_email_thread": self._tool_get_email_thread,
            "suggest_meeting_slots": self._tool_suggest_meeting_slots,
            "generate_calendar_invite": self._tool_generate_calendar_invite,
        }

        handler = handlers.get(tool_name)
        if handler is None:
            raise ValueError(f"Unknown MCP tool: {tool_name}")
        return await handler(tool_input)

    @staticmethod
    def _id_or_new(tool_input: Dict[str, Any], key: str) -> str:
        """Return the caller-supplied id under ``key``, or a fresh UUID4 string."""
        # ``or`` (not a ``.get`` default) so an explicit null or empty id from
        # the model also gets a generated id.
        return tool_input.get(key) or str(uuid.uuid4())

    async def _search(self, query: str, max_results: int) -> Dict[str, Any]:
        """Run a query against the search service and wrap the results with a count."""
        results = await self.mcp_registry.search.query(query, max_results=max_results)
        return {"results": results, "count": len(results)}

    async def _tool_search_web(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Search the web for ``query`` (default 5 results)."""
        return await self._search(tool_input["query"], tool_input.get("max_results", 5))

    async def _tool_search_news(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Search for news by appending " news" to ``query`` (default 5 results)."""
        return await self._search(
            f"{tool_input['query']} news", tool_input.get("max_results", 5)
        )

    async def _tool_save_prospect(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Build a prospect record from the tool input and save it to the store."""
        prospect_data = {
            "id": self._id_or_new(tool_input, "prospect_id"),
            "company": {
                "id": tool_input.get("company_id"),
                "name": tool_input.get("company_name"),
                "domain": tool_input.get("company_domain"),
            },
            "fit_score": tool_input.get("fit_score", 0),
            "status": tool_input.get("status", "new"),
            "metadata": tool_input.get("metadata", {}),
        }
        result = await self.mcp_registry.store.save_prospect(prospect_data)
        return {"status": result, "prospect_id": prospect_data["id"]}

    async def _tool_get_prospect(self, tool_input: Dict[str, Any]) -> Any:
        """Fetch a prospect by id, or return an error dict if the store has none."""
        prospect = await self.mcp_registry.store.get_prospect(tool_input["prospect_id"])
        return prospect or {"error": "Prospect not found"}

    async def _tool_list_prospects(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """List prospects, optionally keeping only those whose status matches."""
        prospects = await self.mcp_registry.store.list_prospects()
        status_filter = tool_input.get("status")
        if status_filter:
            prospects = [p for p in prospects if p.get("status") == status_filter]
        return {"prospects": prospects, "count": len(prospects)}

    async def _tool_save_company(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Build a company record (``name`` and ``domain`` required) and save it."""
        company_data = {
            "id": self._id_or_new(tool_input, "company_id"),
            "name": tool_input["name"],
            "domain": tool_input["domain"],
            "industry": tool_input.get("industry"),
            "description": tool_input.get("description"),
            "employee_count": tool_input.get("employee_count"),
        }
        result = await self.mcp_registry.store.save_company(company_data)
        return {"status": result, "company_id": company_data["id"]}

    async def _tool_get_company(self, tool_input: Dict[str, Any]) -> Any:
        """Fetch a company by id, or return an error dict if the store has none."""
        company = await self.mcp_registry.store.get_company(tool_input["company_id"])
        return company or {"error": "Company not found"}

    async def _tool_save_fact(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Build a fact record for a company and save it (confidence defaults to 0.8)."""
        fact_data = {
            "id": self._id_or_new(tool_input, "fact_id"),
            "company_id": tool_input["company_id"],
            "fact_type": tool_input["fact_type"],
            "content": tool_input["content"],
            "source_url": tool_input.get("source_url"),
            "confidence_score": tool_input.get("confidence_score", 0.8),
        }
        result = await self.mcp_registry.store.save_fact(fact_data)
        return {"status": result, "fact_id": fact_data["id"]}

    async def _tool_save_contact(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Build a contact record (``company_id`` and ``email`` required) and save it."""
        contact_data = {
            "id": self._id_or_new(tool_input, "contact_id"),
            "company_id": tool_input["company_id"],
            "email": tool_input["email"],
            "first_name": tool_input.get("first_name"),
            "last_name": tool_input.get("last_name"),
            "title": tool_input.get("title"),
            "seniority": tool_input.get("seniority"),
        }
        result = await self.mcp_registry.store.save_contact(contact_data)
        return {"status": result, "contact_id": contact_data["id"]}

    async def _tool_list_contacts_by_domain(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """List the contacts the store returns for ``domain``."""
        contacts = await self.mcp_registry.store.list_contacts_by_domain(tool_input["domain"])
        return {"contacts": contacts, "count": len(contacts)}

    async def _tool_check_suppression(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Ask the store whether ``value`` is suppressed for ``suppression_type``."""
        supp_type = tool_input["suppression_type"]
        value = tool_input["value"]
        is_suppressed = await self.mcp_registry.store.check_suppression(supp_type, value)
        return {"suppressed": is_suppressed, "value": value, "type": supp_type}

    async def _tool_send_email(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Send an email through the email service and report the thread id."""
        to = tool_input["to"]
        thread_id = await self.mcp_registry.email.send(
            to,
            tool_input["subject"],
            tool_input["body"],
            tool_input["prospect_id"],
        )
        return {"status": "sent", "thread_id": thread_id, "to": to}

    async def _tool_get_email_thread(self, tool_input: Dict[str, Any]) -> Any:
        """Fetch the email thread for a prospect, or return an error dict if none."""
        thread = await self.mcp_registry.email.get_thread(tool_input["prospect_id"])
        return thread or {"error": "No email thread found"}

    async def _tool_suggest_meeting_slots(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Return the first ``num_slots`` (default 3) slots the calendar suggests."""
        slots = await self.mcp_registry.calendar.suggest_slots()
        selected = slots[:tool_input.get("num_slots", 3)]
        return {"slots": selected, "count": len(selected)}

    async def _tool_generate_calendar_invite(self, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Generate ICS content for a meeting described by start, end and title."""
        slot = {
            "start_iso": tool_input["start_time"],
            "end_iso": tool_input["end_time"],
            "title": tool_input["title"],
        }
        ics = await self.mcp_registry.calendar.generate_ics(slot)
        return {"ics_content": ics, "meeting": slot}