# Source: cx_ai_agent_v1/mcp/agents/autonomous_agent_groq.py
# Last commit 8bab08d by muzakkirhussain011: "Add application files (text files only)"
"""
Autonomous AI Agent with MCP Tool Calling using Groq API
Groq offers FREE API access with fast inference on Llama, Mixtral models.
No payment required - just need a free API key from console.groq.com
"""
import os
import json
import uuid
import logging
import asyncio
from typing import List, Dict, Any, AsyncGenerator, Optional
from mcp.tools.definitions import MCP_TOOLS
from mcp.registry import MCPRegistry
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Model identifiers this module offers for the Groq API. The first entry is
# used as the default when neither the caller nor GROQ_MODEL picks one.
# NOTE(review): hosted model IDs are retired over time - confirm these against
# Groq's current model list before relying on them.
GROQ_MODELS = [
    "llama-3.1-70b-versatile",  # Best quality, free
    "llama-3.1-8b-instant",  # Fast, free
    "mixtral-8x7b-32768",  # Good for complex tasks
    "gemma2-9b-it",  # Google's model
]
DEFAULT_MODEL = GROQ_MODELS[0]
class AutonomousMCPAgentGroq:
    """
    AI Agent using Groq API (FREE, fast inference)
    Get your free API key at: https://console.groq.com

    The agent runs a text-based tool protocol: the system prompt lists the
    tools from ``MCP_TOOLS``, the model answers with JSON tool requests, the
    requests are executed against the ``mcp_registry`` services, and the
    results are fed back as the next user message until the model replies
    with a standalone ``DONE`` or the iteration budget runs out.
    """

    def __init__(
        self,
        mcp_registry: "MCPRegistry",
        api_key: Optional[str] = None,
        model: Optional[str] = None,
    ):
        """Store the registry and resolve credentials and model.

        Args:
            mcp_registry: Registry whose ``search``, ``store``, ``email`` and
                ``calendar`` attributes are used by ``_execute_tool``.
            api_key: Groq API key; falls back to the ``GROQ_API_KEY``
                environment variable.
            model: Model identifier; falls back to the ``GROQ_MODEL``
                environment variable, then to ``DEFAULT_MODEL``.

        Raises:
            ValueError: If no API key is supplied or found in the environment.
        """
        self.mcp_registry = mcp_registry
        self.api_key = api_key or os.getenv("GROQ_API_KEY")
        self.model = model or os.getenv("GROQ_MODEL", DEFAULT_MODEL)
        if not self.api_key:
            raise ValueError("GROQ_API_KEY is required. Get free key at https://console.groq.com")
        # Build tools for the prompt
        self.tools_description = self._build_tools_description()
        logger.info("Groq Agent initialized with model: %s", self.model)

    def _build_tools_description(self) -> str:
        """Build tool descriptions for the system prompt.

        Returns:
            One bullet per entry of ``MCP_TOOLS`` followed by its parameters,
            each marked "(required)" or "(optional)" according to the tool's
            ``input_schema``.
        """
        parts = []
        for tool in MCP_TOOLS:
            parts.append(f"\n- **{tool['name']}**: {tool['description']}")
            schema = tool.get('input_schema', {})
            props = schema.get('properties', {})
            required = schema.get('required', [])
            if not props:
                continue
            parts.append("\n Parameters:")
            for param, details in props.items():
                req = "(required)" if param in required else "(optional)"
                parts.append(f"\n - {param} {req}: {details.get('description', '')}")
        return "".join(parts)

    def _build_system_prompt(self) -> str:
        """Return the system prompt: tool list, tool-call JSON format, rules."""
        # The literal is kept flush-left on purpose: any indentation inside the
        # triple-quoted string would be sent to the model as part of the prompt.
        return f"""You are an AI sales agent with access to tools. Use tools to complete tasks.
AVAILABLE TOOLS:
{self.tools_description}
TO USE A TOOL, respond with JSON in this exact format:
```json
{{"tool": "tool_name", "parameters": {{"param1": "value1"}}}}
```
RULES:
1. Use search_web to find information
2. Use save_prospect, save_contact to store data
3. Use send_email to draft emails
4. After completing all tasks, provide a summary
5. Say "DONE" when finished
Be concise and focused."""

    async def run(self, task: str, max_iterations: int = 15) -> AsyncGenerator[Dict[str, Any], None]:
        """Run the agent on a task, yielding progress events.

        Each event is a dict with a ``type`` key: ``agent_start``,
        ``iteration_start``, ``tool_call``, ``tool_result``, ``tool_error``,
        ``thought``, ``agent_complete``, ``agent_error`` or
        ``agent_max_iterations``.

        Args:
            task: The user task, sent as the first user message.
            max_iterations: Maximum number of model turns before giving up.
        """
        import re

        # Completion requires the standalone upper-case token the prompt asks
        # for. A plain substring test would also fire on "done"/"abandoned" in
        # ordinary prose and end the run before any tool was executed.
        done_marker = re.compile(r"\bDONE\b")

        yield {
            "type": "agent_start",
            "message": f"Starting task with {self.model}",
            "model": self.model
        }
        messages = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": task}
        ]
        loop = asyncio.get_running_loop()

        for iteration in range(1, max_iterations + 1):
            yield {
                "type": "iteration_start",
                "iteration": iteration,
                "message": f"Iteration {iteration}: AI reasoning..."
            }
            try:
                # _call_groq performs a blocking HTTP request; run it in the
                # default executor so the event loop stays responsive.
                response = await loop.run_in_executor(None, self._call_groq, messages)

                # Tolerate an empty "choices" list and null message/content.
                choices = response.get("choices") or [{}]
                message = choices[0].get("message") or {}
                assistant_content = message.get("content") or ""
                if not assistant_content:
                    continue

                # Tool calls take priority over the completion marker so a
                # reply that both requests a tool and mentions DONE still gets
                # its tools executed.
                tool_calls = self._parse_tool_calls(assistant_content)
                if tool_calls:
                    messages.append({"role": "assistant", "content": assistant_content})
                    tool_results = []
                    for tool_call in tool_calls:
                        tool_name = tool_call.get("tool", "")
                        tool_params = tool_call.get("parameters") or {}
                        yield {
                            "type": "tool_call",
                            "tool": tool_name,
                            "input": tool_params,
                            "message": f"Calling: {tool_name}"
                        }
                        try:
                            result = await self._execute_tool(tool_name, tool_params)
                        except Exception as e:
                            # A failing tool is reported to the caller and to
                            # the model; it does not abort the run.
                            yield {
                                "type": "tool_error",
                                "tool": tool_name,
                                "error": str(e),
                                "message": f"Tool error: {e}"
                            }
                            tool_results.append({"tool": tool_name, "error": str(e)})
                        else:
                            yield {
                                "type": "tool_result",
                                "tool": tool_name,
                                "result": result,
                                "message": f"Tool {tool_name} completed"
                            }
                            tool_results.append({"tool": tool_name, "result": result})
                    # Add tool results to conversation (JSON dump capped at 2000 chars)
                    results_text = "Tool results:\n" + json.dumps(tool_results, indent=2, default=str)[:2000]
                    messages.append({"role": "user", "content": results_text})
                    continue

                # Check for completion
                if done_marker.search(assistant_content):
                    final_answer = done_marker.sub("", assistant_content).strip()
                    yield {
                        "type": "thought",
                        "thought": final_answer,
                        "message": "Task complete"
                    }
                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_answer,
                        "iterations": iteration
                    }
                    return

                # No tool calls - just a response; nudge the model onwards
                yield {
                    "type": "thought",
                    "thought": assistant_content,
                    "message": f"AI: {assistant_content[:100]}..."
                }
                messages.append({"role": "assistant", "content": assistant_content})
                messages.append({"role": "user", "content": "Continue with the task. Use tools to gather data. Say DONE when finished."})
            except Exception as e:
                # Top-level boundary of the agent loop: report and stop.
                logger.error("Error in iteration %s: %s", iteration, e)
                yield {
                    "type": "agent_error",
                    "error": str(e),
                    "message": f"Error: {e}"
                }
                return

        yield {
            "type": "agent_max_iterations",
            "message": f"Reached max iterations ({max_iterations})",
            "iterations": max_iterations
        }

    def _call_groq(self, messages: List[Dict]) -> Dict:
        """Call Groq API (blocking) and return the decoded JSON response.

        Args:
            messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.

        Raises:
            requests.HTTPError: If the response status is an error
                (via ``raise_for_status``).
        """
        import requests

        url = "https://api.groq.com/openai/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "messages": messages,
            "max_tokens": 2048,
            "temperature": 0.7
        }
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        return response.json()

    def _parse_tool_calls(self, text: str) -> List[Dict]:
        """Parse tool calls from response text.

        Recognises JSON objects inside ```json fences, inside bare ``` fences,
        and inline ``{"tool": ..., "parameters": {...}}`` objects. Each region
        of the text yields at most one call, and calls are returned in the
        order they appear in ``text``.

        Returns:
            The decoded objects that contain a ``"tool"`` key.
        """
        import re

        patterns = (
            r'```json\s*(\{[^`]+\})\s*```',
            r'```\s*(\{[^`]+\})\s*```',
            r'(\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\})',
        )
        accepted = []  # (start, end, call) for every tool call taken so far
        for pattern in patterns:
            for match in re.finditer(pattern, text, re.DOTALL):
                start, end = match.span(1)
                # A fenced call in the prompt's exact format also matches the
                # inline pattern; skip any overlap with an accepted call so the
                # tool is not executed twice.
                if any(start < seen_end and seen_start < end
                       for seen_start, seen_end, _ in accepted):
                    continue
                try:
                    data = json.loads(match.group(1).strip())
                except json.JSONDecodeError:
                    continue
                if isinstance(data, dict) and "tool" in data:
                    accepted.append((start, end, data))
        accepted.sort(key=lambda item: item[0])
        return [call for _, _, call in accepted]

    @staticmethod
    def _coerce_max_results(value: Any, default: int = 5) -> int:
        """Turn a model-supplied ``max_results`` into a positive int, else ``default``."""
        try:
            count = int(value)
        except (TypeError, ValueError):
            return default
        return count if count > 0 else default

    async def _execute_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        """Execute an MCP tool.

        Args:
            tool_name: Name of the tool requested by the model.
            tool_input: Parameters decoded from the model's JSON request.

        Returns:
            A JSON-serialisable summary of what the tool did.

        Raises:
            ValueError: If ``tool_name`` is not one of the supported tools.
        """
        registry = self.mcp_registry

        if tool_name in ("search_web", "search_news"):
            query = tool_input.get("query", "")
            if tool_name == "search_news":
                query = f"{query} news"
            # The value comes from model output and may be a string such as
            # "5"; it must be an int before it is used as a slice bound.
            max_results = self._coerce_max_results(tool_input.get("max_results", 5))
            results = await registry.search.query(query, max_results=max_results)
            limited = results[:max_results]
            return {"results": limited, "count": len(limited)}

        if tool_name == "save_prospect":
            prospect_data = {
                # "or" also covers an explicit null/empty id from the model.
                "id": tool_input.get("prospect_id") or str(uuid.uuid4()),
                "company": {
                    "id": tool_input.get("company_id"),
                    "name": tool_input.get("company_name"),
                    "domain": tool_input.get("company_domain")
                },
                "fit_score": tool_input.get("fit_score", 0),
                "status": tool_input.get("status", "new"),
                "metadata": tool_input.get("metadata", {})
            }
            result = await registry.store.save_prospect(prospect_data)
            return {"status": result, "prospect_id": prospect_data["id"]}

        if tool_name == "save_company":
            company_data = {
                "id": tool_input.get("company_id") or str(uuid.uuid4()),
                "name": tool_input.get("name", ""),
                "domain": tool_input.get("domain", ""),
                "industry": tool_input.get("industry"),
                "description": tool_input.get("description"),
                "employee_count": tool_input.get("employee_count")
            }
            result = await registry.store.save_company(company_data)
            return {"status": result, "company_id": company_data["id"]}

        if tool_name == "save_contact":
            contact_data = {
                "id": tool_input.get("contact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "email": tool_input.get("email", ""),
                "first_name": tool_input.get("first_name"),
                "last_name": tool_input.get("last_name"),
                "title": tool_input.get("title"),
                "seniority": tool_input.get("seniority")
            }
            result = await registry.store.save_contact(contact_data)
            return {"status": result, "contact_id": contact_data["id"]}

        if tool_name == "save_fact":
            fact_data = {
                "id": tool_input.get("fact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "fact_type": tool_input.get("fact_type", ""),
                "content": tool_input.get("content", ""),
                "source_url": tool_input.get("source_url"),
                "confidence_score": tool_input.get("confidence_score", 0.8)
            }
            result = await registry.store.save_fact(fact_data)
            return {"status": result, "fact_id": fact_data["id"]}

        if tool_name == "send_email":
            to = tool_input.get("to", "")
            subject = tool_input.get("subject", "")
            body = tool_input.get("body", "")
            prospect_id = tool_input.get("prospect_id", "")
            thread_id = await registry.email.send(to, subject, body, prospect_id)
            return {"status": "sent", "thread_id": thread_id, "to": to}

        if tool_name == "list_prospects":
            prospects = await registry.store.list_prospects()
            return {"prospects": prospects, "count": len(prospects)}

        if tool_name == "get_prospect":
            prospect_id = tool_input.get("prospect_id", "")
            prospect = await registry.store.get_prospect(prospect_id)
            return prospect or {"error": "Prospect not found"}

        if tool_name == "suggest_meeting_slots":
            slots = await registry.calendar.suggest_slots()
            top_slots = slots[:3]
            return {"slots": top_slots, "count": len(top_slots)}

        raise ValueError(f"Unknown tool: {tool_name}")