cx_ai_agent_v1 / mcp /agents /autonomous_agent_transformers.py
muzakkirhussain011's picture
Add application files (text files only)
8bab08d
"""
Autonomous AI Agent with MCP Tool Calling using Local Transformers
This agent uses Hugging Face Transformers library to run models locally,
avoiding inference API delays and availability issues.
Uses Qwen3-0.6B for fast, local inference with tool calling support.
"""
import os
import json
import uuid
import logging
import asyncio
import re
from typing import List, Dict, Any, AsyncGenerator, Optional
from mcp.tools.definitions import MCP_TOOLS, list_all_tools
from mcp.registry import MCPRegistry
logger = logging.getLogger(__name__)
# Default model - small but capable
DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
class AutonomousMCPAgentTransformers:
"""
AI Agent that autonomously uses MCP servers as tools using local Transformers.
Runs models locally for fast, reliable inference without API dependencies.
"""
def __init__(
self,
mcp_registry: MCPRegistry,
model_name: str = None,
device: str = None
):
"""
Initialize the autonomous agent with local Transformers
Args:
mcp_registry: MCP registry with all servers
model_name: Model to use (default: Qwen/Qwen3-0.6B)
device: Device to run on ('cuda', 'cpu', or 'auto')
"""
self.mcp_registry = mcp_registry
self.model_name = model_name or os.getenv("TRANSFORMERS_MODEL", DEFAULT_MODEL)
self.device = device or os.getenv("TRANSFORMERS_DEVICE", "auto")
# Lazy load model and tokenizer
self.pipeline = None
self.tokenizer = None
self.model = None
self._initialized = False
# Create tool definitions for the prompt
self.tools_description = self._create_tools_description()
logger.info(f"Autonomous MCP Agent (Transformers) initialized")
logger.info(f" Model: {self.model_name}")
logger.info(f" Device: {self.device}")
logger.info(f" Available tools: {len(MCP_TOOLS)}")
def _initialize_model(self):
"""Lazy initialization of the model"""
if self._initialized:
return
logger.info(f"Loading model {self.model_name}...")
try:
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import torch
# Determine device
if self.device == "auto":
device = "cuda" if torch.cuda.is_available() else "cpu"
else:
device = self.device
logger.info(f"Using device: {device}")
# Load tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(
self.model_name,
trust_remote_code=True
)
# Load model with appropriate settings
model_kwargs = {
"trust_remote_code": True,
}
if device == "cuda":
model_kwargs["torch_dtype"] = torch.float16
model_kwargs["device_map"] = "auto"
else:
model_kwargs["torch_dtype"] = torch.float32
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
**model_kwargs
)
if device == "cpu":
self.model = self.model.to(device)
# Create pipeline for easier generation
self.pipeline = pipeline(
"text-generation",
model=self.model,
tokenizer=self.tokenizer,
device=None if device == "cuda" else device, # device_map handles cuda
)
self._initialized = True
logger.info(f"Model {self.model_name} loaded successfully")
except ImportError as e:
raise ImportError(
f"transformers package not installed or missing dependencies!\n"
f"Install with: pip install transformers torch\n"
f"Error: {e}"
)
except Exception as e:
logger.error(f"Failed to load model: {e}")
raise
def _create_tools_description(self) -> str:
"""Create a description of available tools for the prompt"""
tools_text = "Available tools:\n\n"
for tool in MCP_TOOLS:
tools_text += f"- **{tool['name']}**: {tool['description']}\n"
if tool.get('input_schema', {}).get('properties'):
tools_text += " Parameters:\n"
for param, details in tool['input_schema']['properties'].items():
required = param in tool['input_schema'].get('required', [])
req_str = " (required)" if required else " (optional)"
tools_text += f" - {param}{req_str}: {details.get('description', '')}\n"
tools_text += "\n"
return tools_text
def _build_system_prompt(self) -> str:
"""Build the system prompt with tool instructions"""
return f"""You are an autonomous AI agent for B2B sales automation.
You have access to MCP (Model Context Protocol) tools that let you:
- Search the web for company information and news
- Save prospects, companies, contacts, and facts to a database
- Send emails and manage email threads
- Schedule meetings and generate calendar invites
{self.tools_description}
To use a tool, respond with a JSON block in this exact format:
```tool
{{"tool": "tool_name", "parameters": {{"param1": "value1", "param2": "value2"}}}}
```
You can call multiple tools by including multiple tool blocks.
After using tools and gathering information, provide your final response.
When the task is complete, end with "TASK_COMPLETE" on a new line.
Be concise and efficient. Focus on completing the task."""
def _parse_tool_calls(self, response: str) -> List[Dict[str, Any]]:
"""Parse tool calls from the model's response"""
tool_calls = []
# Pattern to match tool JSON blocks
# Match ```tool ... ``` or ```json ... ``` or just JSON objects with "tool" key
patterns = [
r'```tool\s*\n?(.*?)\n?```',
r'```json\s*\n?(.*?)\n?```',
r'\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\}',
]
for pattern in patterns[:2]: # First two patterns use groups
matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE)
for match in matches:
try:
tool_data = json.loads(match.strip())
if "tool" in tool_data:
tool_calls.append(tool_data)
except json.JSONDecodeError:
continue
# Try direct JSON pattern
direct_matches = re.findall(patterns[2], response)
for match in direct_matches:
try:
tool_data = json.loads(match)
if tool_data not in tool_calls: # Avoid duplicates
tool_calls.append(tool_data)
except json.JSONDecodeError:
continue
return tool_calls
def _generate_response(self, messages: List[Dict[str, str]], max_new_tokens: int = 512) -> str:
"""Generate a response from the model"""
self._initialize_model()
try:
# Apply chat template
inputs = self.tokenizer.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
)
# Move to model device
if hasattr(self.model, 'device'):
inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
# Generate
outputs = self.model.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=True,
temperature=0.7,
top_p=0.9,
pad_token_id=self.tokenizer.eos_token_id,
)
# Decode only the new tokens
input_length = inputs["input_ids"].shape[-1]
response = self.tokenizer.decode(
outputs[0][input_length:],
skip_special_tokens=True
)
return response.strip()
except Exception as e:
logger.error(f"Generation error: {e}")
raise
async def run(
self,
task: str,
max_iterations: int = 10
) -> AsyncGenerator[Dict[str, Any], None]:
"""
Run the agent autonomously on a task.
Args:
task: The task to complete
max_iterations: Maximum tool calls to prevent infinite loops
Yields:
Events showing agent's progress and tool calls
"""
yield {
"type": "agent_start",
"message": f"Autonomous AI Agent (Transformers) starting task",
"task": task,
"model": self.model_name
}
# Initialize model (lazy load)
try:
self._initialize_model()
yield {
"type": "model_loaded",
"message": f"Model {self.model_name} ready"
}
except Exception as e:
yield {
"type": "agent_error",
"error": str(e),
"message": f"Failed to load model: {e}"
}
return
# Build conversation
system_prompt = self._build_system_prompt()
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": task}
]
iteration = 0
accumulated_results = []
while iteration < max_iterations:
iteration += 1
yield {
"type": "iteration_start",
"iteration": iteration,
"message": f"Iteration {iteration}: Thinking..."
}
try:
# Generate response
response = await asyncio.get_event_loop().run_in_executor(
None,
self._generate_response,
messages,
512
)
logger.info(f"Model response (iteration {iteration}): {response[:200]}...")
# Check for task completion
if "TASK_COMPLETE" in response:
# Extract final answer (everything before TASK_COMPLETE)
final_answer = response.replace("TASK_COMPLETE", "").strip()
yield {
"type": "thought",
"thought": final_answer,
"message": f"AI Response: {final_answer[:100]}..."
}
yield {
"type": "agent_complete",
"message": "Task complete!",
"final_answer": final_answer,
"iterations": iteration
}
return
# Parse tool calls
tool_calls = self._parse_tool_calls(response)
if tool_calls:
# Execute each tool call
tool_results = []
for tool_call in tool_calls:
tool_name = tool_call.get("tool", "")
tool_params = tool_call.get("parameters", {})
yield {
"type": "tool_call",
"tool": tool_name,
"input": tool_params,
"message": f"Action: {tool_name}"
}
try:
result = await self._execute_mcp_tool(tool_name, tool_params)
yield {
"type": "tool_result",
"tool": tool_name,
"result": result,
"message": f"Tool {tool_name} completed"
}
tool_results.append({
"tool": tool_name,
"result": result
})
accumulated_results.append({
"tool": tool_name,
"params": tool_params,
"result": result
})
except Exception as e:
error_msg = str(e)
logger.error(f"Tool execution failed: {tool_name} - {error_msg}")
yield {
"type": "tool_error",
"tool": tool_name,
"error": error_msg,
"message": f"Tool {tool_name} failed: {error_msg}"
}
tool_results.append({
"tool": tool_name,
"error": error_msg
})
# Add assistant response and tool results to conversation
messages.append({"role": "assistant", "content": response})
# Format tool results for the model
results_text = "Tool results:\n"
for tr in tool_results:
if "error" in tr:
results_text += f"- {tr['tool']}: Error - {tr['error']}\n"
else:
result_str = json.dumps(tr['result'], default=str)[:500]
results_text += f"- {tr['tool']}: {result_str}\n"
messages.append({"role": "user", "content": results_text})
else:
# No tool calls - this might be a thought or partial response
yield {
"type": "thought",
"thought": response,
"message": f"AI Response: {response[:100]}..."
}
# Add to conversation and prompt for continuation
messages.append({"role": "assistant", "content": response})
messages.append({
"role": "user",
"content": "Continue with the task. Use the available tools to gather information and complete the task. When done, say TASK_COMPLETE."
})
except Exception as e:
error_msg = str(e)
logger.error(f"Error in iteration {iteration}: {error_msg}", exc_info=True)
yield {
"type": "agent_error",
"error": error_msg,
"message": f"Error: {error_msg}"
}
# Try to continue if we have results
if accumulated_results:
break
return
# Max iterations reached
yield {
"type": "agent_max_iterations",
"message": f"Reached maximum iterations ({max_iterations})",
"iterations": iteration,
"accumulated_results": accumulated_results
}
async def _execute_mcp_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
"""
Execute an MCP tool by routing to the appropriate MCP server.
"""
# ============ SEARCH MCP SERVER ============
if tool_name == "search_web":
query = tool_input.get("query", "")
max_results = tool_input.get("max_results", 5)
results = await self.mcp_registry.search.query(query, max_results=max_results)
return {
"results": results[:max_results],
"count": len(results[:max_results])
}
elif tool_name == "search_news":
query = tool_input.get("query", "")
max_results = tool_input.get("max_results", 5)
results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results)
return {
"results": results[:max_results],
"count": len(results[:max_results])
}
# ============ STORE MCP SERVER ============
elif tool_name == "save_prospect":
prospect_data = {
"id": tool_input.get("prospect_id", str(uuid.uuid4())),
"company": {
"id": tool_input.get("company_id"),
"name": tool_input.get("company_name"),
"domain": tool_input.get("company_domain")
},
"fit_score": tool_input.get("fit_score", 0),
"status": tool_input.get("status", "new"),
"metadata": tool_input.get("metadata", {})
}
result = await self.mcp_registry.store.save_prospect(prospect_data)
return {"status": result, "prospect_id": prospect_data["id"]}
elif tool_name == "get_prospect":
prospect_id = tool_input.get("prospect_id", "")
prospect = await self.mcp_registry.store.get_prospect(prospect_id)
return prospect or {"error": "Prospect not found"}
elif tool_name == "list_prospects":
prospects = await self.mcp_registry.store.list_prospects()
status_filter = tool_input.get("status")
if status_filter:
prospects = [p for p in prospects if p.get("status") == status_filter]
return {
"prospects": prospects,
"count": len(prospects)
}
elif tool_name == "save_company":
company_data = {
"id": tool_input.get("company_id", str(uuid.uuid4())),
"name": tool_input.get("name", ""),
"domain": tool_input.get("domain", ""),
"industry": tool_input.get("industry"),
"description": tool_input.get("description"),
"employee_count": tool_input.get("employee_count")
}
result = await self.mcp_registry.store.save_company(company_data)
return {"status": result, "company_id": company_data["id"]}
elif tool_name == "get_company":
company_id = tool_input.get("company_id", "")
company = await self.mcp_registry.store.get_company(company_id)
return company or {"error": "Company not found"}
elif tool_name == "save_fact":
fact_data = {
"id": tool_input.get("fact_id", str(uuid.uuid4())),
"company_id": tool_input.get("company_id", ""),
"fact_type": tool_input.get("fact_type", ""),
"content": tool_input.get("content", ""),
"source_url": tool_input.get("source_url"),
"confidence_score": tool_input.get("confidence_score", 0.8)
}
result = await self.mcp_registry.store.save_fact(fact_data)
return {"status": result, "fact_id": fact_data["id"]}
elif tool_name == "save_contact":
contact_data = {
"id": tool_input.get("contact_id", str(uuid.uuid4())),
"company_id": tool_input.get("company_id", ""),
"email": tool_input.get("email", ""),
"first_name": tool_input.get("first_name"),
"last_name": tool_input.get("last_name"),
"title": tool_input.get("title"),
"seniority": tool_input.get("seniority")
}
result = await self.mcp_registry.store.save_contact(contact_data)
return {"status": result, "contact_id": contact_data["id"]}
elif tool_name == "list_contacts_by_domain":
domain = tool_input.get("domain", "")
contacts = await self.mcp_registry.store.list_contacts_by_domain(domain)
return {
"contacts": contacts,
"count": len(contacts)
}
elif tool_name == "check_suppression":
supp_type = tool_input.get("suppression_type", "email")
value = tool_input.get("value", "")
is_suppressed = await self.mcp_registry.store.check_suppression(supp_type, value)
return {
"suppressed": is_suppressed,
"value": value,
"type": supp_type
}
# ============ EMAIL MCP SERVER ============
elif tool_name == "send_email":
to = tool_input.get("to", "")
subject = tool_input.get("subject", "")
body = tool_input.get("body", "")
prospect_id = tool_input.get("prospect_id", "")
thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
return {
"status": "sent",
"thread_id": thread_id,
"to": to
}
elif tool_name == "get_email_thread":
prospect_id = tool_input.get("prospect_id", "")
thread = await self.mcp_registry.email.get_thread(prospect_id)
return thread or {"error": "No email thread found"}
# ============ CALENDAR MCP SERVER ============
elif tool_name == "suggest_meeting_slots":
num_slots = tool_input.get("num_slots", 3)
slots = await self.mcp_registry.calendar.suggest_slots()
return {
"slots": slots[:num_slots],
"count": len(slots[:num_slots])
}
elif tool_name == "generate_calendar_invite":
start_time = tool_input.get("start_time", "")
end_time = tool_input.get("end_time", "")
title = tool_input.get("title", "Meeting")
slot = {
"start_iso": start_time,
"end_iso": end_time,
"title": title
}
ics = await self.mcp_registry.calendar.generate_ics(slot)
return {
"ics_content": ics,
"meeting": slot
}
else:
raise ValueError(f"Unknown MCP tool: {tool_name}")