|
|
""" |
|
|
Autonomous AI Agent with MCP Tool Calling using Local Transformers |
|
|
|
|
|
This agent uses the Hugging Face Transformers library to run models locally,
|
|
avoiding inference API delays and availability issues. |
|
|
|
|
|
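Uses Qwen3-0.6B by default (override with the TRANSFORMERS_MODEL environment variable) for fast, local inference; tool calls are parsed from JSON blocks in the model's text output.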
|
|
""" |
|
|
|
|
|
import asyncio
import json
import logging
import os
import re
import threading
import uuid
from typing import List, Dict, Any, AsyncGenerator, Optional

from mcp.tools.definitions import MCP_TOOLS, list_all_tools
from mcp.registry import MCPRegistry
|
|
|
|
|
# Module-level logger used by the agent class below.
logger = logging.getLogger(__name__)

# Default Hugging Face model id; used when neither the `model_name` constructor
# argument nor the TRANSFORMERS_MODEL environment variable is set.
DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
|
|
|
|
|
|
|
|
class AutonomousMCPAgentTransformers:
    """
    AI Agent that autonomously uses MCP servers as tools using local Transformers.

    Runs models locally for fast, reliable inference without API dependencies.
    Tool calls are requested by the model as JSON blocks in its text output,
    parsed by ``_parse_tool_calls`` and dispatched by ``_execute_mcp_tool``.
    """

    # Fenced tool-call blocks, tried in this order: ```tool (the format the
    # system prompt asks for), then ```json (also accepted).
    _FENCED_TOOL_PATTERNS = (
        re.compile(r'```tool\s*\n?(.*?)\n?```', re.DOTALL | re.IGNORECASE),
        re.compile(r'```json\s*\n?(.*?)\n?```', re.DOTALL | re.IGNORECASE),
    )
    # Fallback for bare inline JSON; only matches flat (non-nested) "parameters".
    _INLINE_TOOL_PATTERN = re.compile(r'\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\}')

    def __init__(
        self,
        mcp_registry: "MCPRegistry",
        model_name: Optional[str] = None,
        device: Optional[str] = None
    ):
        """
        Initialize the autonomous agent with local Transformers.

        The model itself is not loaded here; it is loaded lazily on first use.

        Args:
            mcp_registry: MCP registry with all servers
            model_name: Model to use (default: TRANSFORMERS_MODEL env var, else Qwen/Qwen3-0.6B)
            device: Device to run on ('cuda', 'cpu', or 'auto'; default: TRANSFORMERS_DEVICE
                env var, else 'auto')
        """
        self.mcp_registry = mcp_registry
        self.model_name = model_name or os.getenv("TRANSFORMERS_MODEL", DEFAULT_MODEL)
        self.device = device or os.getenv("TRANSFORMERS_DEVICE", "auto")

        # Populated lazily by _initialize_model().
        self.pipeline = None
        self.tokenizer = None
        self.model = None
        self._initialized = False
        # Model loading runs in executor threads (see run()); this lock makes sure
        # concurrent callers load the model only once.
        self._init_lock = threading.Lock()

        self.tools_description = self._create_tools_description()

        logger.info("Autonomous MCP Agent (Transformers) initialized")
        logger.info(f" Model: {self.model_name}")
        logger.info(f" Device: {self.device}")
        logger.info(f" Available tools: {len(MCP_TOOLS)}")

    def _initialize_model(self):
        """Lazily load the tokenizer, model and text-generation pipeline.

        Safe to call repeatedly and from several threads: the load happens at most
        once per successful attempt. A failed attempt leaves the agent
        uninitialized, so a later call retries.

        Raises:
            ImportError: If transformers/torch (or one of their dependencies) is missing.
            Exception: Any other error raised while downloading or loading the model.
        """
        if self._initialized:
            return

        with self._init_lock:
            # Re-check: another thread may have finished loading while we waited.
            if self._initialized:
                return

            logger.info(f"Loading model {self.model_name}...")

            try:
                from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
                import torch

                if self.device == "auto":
                    device = "cuda" if torch.cuda.is_available() else "cpu"
                else:
                    device = self.device

                logger.info(f"Using device: {device}")

                # NOTE(review): trust_remote_code=True allows the model repository to
                # execute arbitrary Python at load time, and the model id may come from
                # the TRANSFORMERS_MODEL environment variable. Confirm this is intended.
                self.tokenizer = AutoTokenizer.from_pretrained(
                    self.model_name,
                    trust_remote_code=True
                )

                model_kwargs = {
                    "trust_remote_code": True,
                }
                if device == "cuda":
                    # Half precision on GPU, with device placement delegated to device_map.
                    model_kwargs["torch_dtype"] = torch.float16
                    model_kwargs["device_map"] = "auto"
                else:
                    model_kwargs["torch_dtype"] = torch.float32

                self.model = AutoModelForCausalLM.from_pretrained(
                    self.model_name,
                    **model_kwargs
                )

                if device == "cpu":
                    self.model = self.model.to(device)

                # This pipeline is not used by _generate_response (which calls
                # model.generate directly); it is kept in case external code uses
                # self.pipeline.
                self.pipeline = pipeline(
                    "text-generation",
                    model=self.model,
                    tokenizer=self.tokenizer,
                    # On CUDA, device_map already placed the model, so presumably no
                    # explicit device should be passed here.
                    device=None if device == "cuda" else device,
                )

                self._initialized = True
                logger.info(f"Model {self.model_name} loaded successfully")

            except ImportError as e:
                raise ImportError(
                    f"transformers package not installed or missing dependencies!\n"
                    f"Install with: pip install transformers torch\n"
                    f"Error: {e}"
                ) from e
            except Exception as e:
                logger.error(f"Failed to load model: {e}")
                raise

    def _create_tools_description(self) -> str:
        """Render MCP_TOOLS as a bullet list for the system prompt.

        Each tool contributes its name and description, followed by the
        properties of its ``input_schema`` marked (required)/(optional).
        """
        parts = ["Available tools:\n\n"]

        for tool in MCP_TOOLS:
            parts.append(f"- **{tool['name']}**: {tool['description']}\n")
            schema = tool.get('input_schema', {})
            properties = schema.get('properties')
            if properties:
                required_params = schema.get('required', [])
                parts.append(" Parameters:\n")
                for param, details in properties.items():
                    req_str = " (required)" if param in required_params else " (optional)"
                    parts.append(f" - {param}{req_str}: {details.get('description', '')}\n")
            parts.append("\n")

        return "".join(parts)

    def _build_system_prompt(self) -> str:
        """Build the system prompt with tool instructions"""
        return f"""You are an autonomous AI agent for B2B sales automation.

You have access to MCP (Model Context Protocol) tools that let you:
- Search the web for company information and news
- Save prospects, companies, contacts, and facts to a database
- Send emails and manage email threads
- Schedule meetings and generate calendar invites

{self.tools_description}

To use a tool, respond with a JSON block in this exact format:
```tool
{{"tool": "tool_name", "parameters": {{"param1": "value1", "param2": "value2"}}}}
```

You can call multiple tools by including multiple tool blocks.

After using tools and gathering information, provide your final response.
When the task is complete, end with "TASK_COMPLETE" on a new line.

Be concise and efficient. Focus on completing the task."""

    @staticmethod
    def _load_tool_json(text: str) -> Optional[Dict[str, Any]]:
        """Decode ``text`` as a tool call; return None unless it is a JSON object with a "tool" key."""
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            return None
        # Fenced blocks may contain any JSON value (numbers, strings, lists);
        # only objects can be tool calls.
        if isinstance(data, dict) and "tool" in data:
            return data
        return None

    def _parse_tool_calls(self, response: str) -> List[Dict[str, Any]]:
        """Parse tool calls from the model's response.

        Fenced ```tool blocks are collected first, then ```json blocks, in the
        order they appear. Bare inline JSON calls are then added when they are
        not already present (the inline pattern also matches calls inside fences).

        Returns:
            A list of dicts, each containing at least a "tool" key.
        """
        tool_calls: List[Dict[str, Any]] = []

        for pattern in self._FENCED_TOOL_PATTERNS:
            for match in pattern.findall(response):
                tool_data = self._load_tool_json(match.strip())
                if tool_data is not None:
                    tool_calls.append(tool_data)

        for match in self._INLINE_TOOL_PATTERN.findall(response):
            tool_data = self._load_tool_json(match)
            if tool_data is not None and tool_data not in tool_calls:
                tool_calls.append(tool_data)

        return tool_calls

    def _generate_response(self, messages: List[Dict[str, str]], max_new_tokens: int = 512) -> str:
        """Generate one assistant reply for ``messages`` (blocking).

        Applies the tokenizer's chat template, samples with temperature 0.7 and
        top_p 0.9, and decodes only the newly generated tokens.

        Args:
            messages: Chat history as role/content dicts.
            max_new_tokens: Upper bound on generated tokens.

        Returns:
            The decoded reply with surrounding whitespace stripped.
        """
        self._initialize_model()

        try:
            inputs = self.tokenizer.apply_chat_template(
                messages,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                return_tensors="pt",
            )

            # Put the input tensors on the same device as the model.
            if hasattr(self.model, 'device'):
                inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                pad_token_id=self.tokenizer.eos_token_id,
            )

            # generate() returns prompt + continuation; keep only the continuation.
            input_length = inputs["input_ids"].shape[-1]
            response = self.tokenizer.decode(
                outputs[0][input_length:],
                skip_special_tokens=True
            )

            return response.strip()

        except Exception as e:
            logger.error(f"Generation error: {e}")
            raise

    @staticmethod
    def _format_tool_results(tool_results: List[Dict[str, Any]]) -> str:
        """Summarize tool outcomes as the next user message; each JSON result is cut to 500 chars."""
        parts = ["Tool results:\n"]
        for tr in tool_results:
            if "error" in tr:
                parts.append(f"- {tr['tool']}: Error - {tr['error']}\n")
            else:
                result_str = json.dumps(tr['result'], default=str)[:500]
                parts.append(f"- {tr['tool']}: {result_str}\n")
        return "".join(parts)

    async def _run_tool_calls(
        self,
        tool_calls: List[Dict[str, Any]],
        tool_results: List[Dict[str, Any]],
        accumulated_results: List[Dict[str, Any]],
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Execute tool calls in order, yielding tool_call / tool_result / tool_error events.

        Appends one entry per call to ``tool_results``. Each successful call also
        appends one entry to ``accumulated_results``. A failing tool is reported and
        the remaining calls still run.
        """
        for tool_call in tool_calls:
            tool_name = tool_call.get("tool", "")
            tool_params = tool_call.get("parameters")
            # Treat an explicit null like a missing "parameters" key.
            if tool_params is None:
                tool_params = {}

            yield {
                "type": "tool_call",
                "tool": tool_name,
                "input": tool_params,
                "message": f"Action: {tool_name}"
            }

            try:
                result = await self._execute_mcp_tool(tool_name, tool_params)
            except Exception as e:
                error_msg = str(e)
                logger.error(f"Tool execution failed: {tool_name} - {error_msg}")
                yield {
                    "type": "tool_error",
                    "tool": tool_name,
                    "error": error_msg,
                    "message": f"Tool {tool_name} failed: {error_msg}"
                }
                tool_results.append({"tool": tool_name, "error": error_msg})
                continue

            yield {
                "type": "tool_result",
                "tool": tool_name,
                "result": result,
                "message": f"Tool {tool_name} completed"
            }
            tool_results.append({"tool": tool_name, "result": result})
            accumulated_results.append({
                "tool": tool_name,
                "params": tool_params,
                "result": result
            })

    async def run(
        self,
        task: str,
        max_iterations: int = 10
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run the agent autonomously on a task.

        Each iteration generates one model response, executes any tool calls in
        it, and feeds the results back to the model. The run ends when:
        - a response contains "TASK_COMPLETE" (any tool calls in that same
          response are executed first);
        - an error occurs;
        - ``max_iterations`` iterations have run.

        Args:
            task: The task to complete
            max_iterations: Maximum number of model generations (iterations), to
                prevent infinite loops

        Yields:
            Event dicts with a "type" key: agent_start, model_loaded,
            iteration_start, tool_call, tool_result, tool_error, thought,
            agent_complete, agent_error, agent_max_iterations.
        """
        yield {
            "type": "agent_start",
            "message": "Autonomous AI Agent (Transformers) starting task",
            "task": task,
            "model": self.model_name
        }

        loop = asyncio.get_running_loop()

        # Loading the model (possibly downloading it) is slow and blocking; run it
        # in a worker thread so the event loop stays responsive.
        try:
            await loop.run_in_executor(None, self._initialize_model)
        except Exception as e:
            yield {
                "type": "agent_error",
                "error": str(e),
                "message": f"Failed to load model: {e}"
            }
            return

        yield {
            "type": "model_loaded",
            "message": f"Model {self.model_name} ready"
        }

        messages = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": task}
        ]

        iteration = 0
        accumulated_results: List[Dict[str, Any]] = []

        while iteration < max_iterations:
            iteration += 1

            yield {
                "type": "iteration_start",
                "iteration": iteration,
                "message": f"Iteration {iteration}: Thinking..."
            }

            try:
                # Generation is CPU/GPU-bound; keep it off the event loop.
                response = await loop.run_in_executor(
                    None,
                    self._generate_response,
                    messages,
                    512
                )

                logger.info(f"Model response (iteration {iteration}): {response[:200]}...")

                # Parse tool calls BEFORE checking for TASK_COMPLETE. A single
                # response may request tools and declare completion, and those
                # calls must not be silently dropped.
                tool_calls = self._parse_tool_calls(response)

                if tool_calls:
                    tool_results: List[Dict[str, Any]] = []
                    async for event in self._run_tool_calls(tool_calls, tool_results, accumulated_results):
                        yield event

                    messages.append({"role": "assistant", "content": response})
                    messages.append({"role": "user", "content": self._format_tool_results(tool_results)})

                if "TASK_COMPLETE" in response:
                    final_answer = response.replace("TASK_COMPLETE", "").strip()

                    yield {
                        "type": "thought",
                        "thought": final_answer,
                        "message": f"AI Response: {final_answer[:100]}..."
                    }
                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_answer,
                        "iterations": iteration
                    }
                    return

                if not tool_calls:
                    # Plain text with no tools and no completion marker: nudge the model on.
                    yield {
                        "type": "thought",
                        "thought": response,
                        "message": f"AI Response: {response[:100]}..."
                    }

                    messages.append({"role": "assistant", "content": response})
                    messages.append({
                        "role": "user",
                        "content": "Continue with the task. Use the available tools to gather information and complete the task. When done, say TASK_COMPLETE."
                    })

            except Exception as e:
                error_msg = str(e)
                logger.error(f"Error in iteration {iteration}: {error_msg}", exc_info=True)

                yield {
                    "type": "agent_error",
                    "error": error_msg,
                    "message": f"Error: {error_msg}"
                }

                # If some tools already succeeded, fall through to the final event
                # below so those partial results are still delivered.
                if accumulated_results:
                    break
                return

        # Reached when the iteration budget is exhausted, and also after an error
        # once some tools have succeeded (see the break above).
        yield {
            "type": "agent_max_iterations",
            "message": f"Reached maximum iterations ({max_iterations})",
            "iterations": iteration,
            "accumulated_results": accumulated_results
        }

    @staticmethod
    def _as_count(value: Any, default: int) -> int:
        """Coerce a model-supplied count (e.g. "5", 5, or None) to a non-negative int.

        Falls back to ``default`` when the value is missing or not numeric.
        """
        if value is None:
            return default
        try:
            return max(0, int(value))
        except (TypeError, ValueError):
            return default

    async def _execute_mcp_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        """
        Execute an MCP tool by routing to the appropriate MCP server.

        ``tool_input`` comes straight from model output. Counts are therefore
        coerced to ints, and missing or empty record IDs are replaced with
        fresh UUIDs.

        Args:
            tool_name: Name of the tool requested by the model.
            tool_input: Parameters supplied by the model.

        Returns:
            A result dict for the tool (or the stored record for get_* lookups).

        Raises:
            ValueError: If ``tool_name`` is not a known tool.
        """
        registry = self.mcp_registry

        # ---- Search tools ----
        if tool_name in ("search_web", "search_news"):
            query = tool_input.get("query", "")
            max_results = self._as_count(tool_input.get("max_results"), 5)
            if tool_name == "search_news":
                query = f"{query} news"

            results = await registry.search.query(query, max_results=max_results)
            results = results[:max_results]
            return {
                "results": results,
                "count": len(results)
            }

        # ---- Store tools ----
        elif tool_name == "save_prospect":
            prospect_data = {
                "id": tool_input.get("prospect_id") or str(uuid.uuid4()),
                "company": {
                    "id": tool_input.get("company_id"),
                    "name": tool_input.get("company_name"),
                    "domain": tool_input.get("company_domain")
                },
                "fit_score": tool_input.get("fit_score", 0),
                "status": tool_input.get("status", "new"),
                "metadata": tool_input.get("metadata", {})
            }

            result = await registry.store.save_prospect(prospect_data)
            return {"status": result, "prospect_id": prospect_data["id"]}

        elif tool_name == "get_prospect":
            prospect_id = tool_input.get("prospect_id", "")
            prospect = await registry.store.get_prospect(prospect_id)
            return prospect or {"error": "Prospect not found"}

        elif tool_name == "list_prospects":
            prospects = await registry.store.list_prospects()
            status_filter = tool_input.get("status")

            if status_filter:
                prospects = [p for p in prospects if p.get("status") == status_filter]

            return {
                "prospects": prospects,
                "count": len(prospects)
            }

        elif tool_name == "save_company":
            company_data = {
                "id": tool_input.get("company_id") or str(uuid.uuid4()),
                "name": tool_input.get("name", ""),
                "domain": tool_input.get("domain", ""),
                "industry": tool_input.get("industry"),
                "description": tool_input.get("description"),
                "employee_count": tool_input.get("employee_count")
            }

            result = await registry.store.save_company(company_data)
            return {"status": result, "company_id": company_data["id"]}

        elif tool_name == "get_company":
            company_id = tool_input.get("company_id", "")
            company = await registry.store.get_company(company_id)
            return company or {"error": "Company not found"}

        elif tool_name == "save_fact":
            fact_data = {
                "id": tool_input.get("fact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "fact_type": tool_input.get("fact_type", ""),
                "content": tool_input.get("content", ""),
                "source_url": tool_input.get("source_url"),
                "confidence_score": tool_input.get("confidence_score", 0.8)
            }

            result = await registry.store.save_fact(fact_data)
            return {"status": result, "fact_id": fact_data["id"]}

        elif tool_name == "save_contact":
            contact_data = {
                "id": tool_input.get("contact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "email": tool_input.get("email", ""),
                "first_name": tool_input.get("first_name"),
                "last_name": tool_input.get("last_name"),
                "title": tool_input.get("title"),
                "seniority": tool_input.get("seniority")
            }

            result = await registry.store.save_contact(contact_data)
            return {"status": result, "contact_id": contact_data["id"]}

        elif tool_name == "list_contacts_by_domain":
            domain = tool_input.get("domain", "")
            contacts = await registry.store.list_contacts_by_domain(domain)
            return {
                "contacts": contacts,
                "count": len(contacts)
            }

        elif tool_name == "check_suppression":
            supp_type = tool_input.get("suppression_type", "email")
            value = tool_input.get("value", "")

            is_suppressed = await registry.store.check_suppression(supp_type, value)
            return {
                "suppressed": is_suppressed,
                "value": value,
                "type": supp_type
            }

        # ---- Email tools ----
        elif tool_name == "send_email":
            # NOTE(review): this branch does not call check_suppression itself;
            # suppression is only honored if the model checks first or the email
            # server enforces it. Confirm that is acceptable.
            to = tool_input.get("to", "")
            subject = tool_input.get("subject", "")
            body = tool_input.get("body", "")
            prospect_id = tool_input.get("prospect_id", "")

            thread_id = await registry.email.send(to, subject, body, prospect_id)
            return {
                "status": "sent",
                "thread_id": thread_id,
                "to": to
            }

        elif tool_name == "get_email_thread":
            prospect_id = tool_input.get("prospect_id", "")
            thread = await registry.email.get_thread(prospect_id)
            return thread or {"error": "No email thread found"}

        # ---- Calendar tools ----
        elif tool_name == "suggest_meeting_slots":
            num_slots = self._as_count(tool_input.get("num_slots"), 3)
            slots = await registry.calendar.suggest_slots()
            slots = slots[:num_slots]
            return {
                "slots": slots,
                "count": len(slots)
            }

        elif tool_name == "generate_calendar_invite":
            start_time = tool_input.get("start_time", "")
            end_time = tool_input.get("end_time", "")
            title = tool_input.get("title", "Meeting")

            slot = {
                "start_iso": start_time,
                "end_iso": end_time,
                "title": title
            }

            ics = await registry.calendar.generate_ics(slot)
            return {
                "ics_content": ics,
                "meeting": slot
            }

        else:
            raise ValueError(f"Unknown MCP tool: {tool_name}")
|
|
|