|
|
""" |
|
|
Autonomous AI Agent with MCP Tool Calling using Groq API |
|
|
|
|
|
Groq offers FREE API access with fast inference on Llama, Mixtral models. |
|
|
No payment required - just need a free API key from console.groq.com |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import uuid |
|
|
import logging |
|
|
import asyncio |
|
|
from typing import List, Dict, Any, AsyncGenerator, Optional |
|
|
|
|
|
from mcp.tools.definitions import MCP_TOOLS |
|
|
from mcp.registry import MCPRegistry |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
GROQ_MODELS = [ |
|
|
"llama-3.1-70b-versatile", |
|
|
"llama-3.1-8b-instant", |
|
|
"mixtral-8x7b-32768", |
|
|
"gemma2-9b-it", |
|
|
] |
|
|
|
|
|
DEFAULT_MODEL = "llama-3.1-70b-versatile" |
|
|
|
|
|
|
|
|
class AutonomousMCPAgentGroq: |
|
|
""" |
|
|
AI Agent using Groq API (FREE, fast inference) |
|
|
|
|
|
Get your free API key at: https://console.groq.com |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
mcp_registry: MCPRegistry, |
|
|
api_key: str = None, |
|
|
model: str = None |
|
|
): |
|
|
self.mcp_registry = mcp_registry |
|
|
self.api_key = api_key or os.getenv("GROQ_API_KEY") |
|
|
self.model = model or os.getenv("GROQ_MODEL", DEFAULT_MODEL) |
|
|
|
|
|
if not self.api_key: |
|
|
raise ValueError("GROQ_API_KEY is required. Get free key at https://console.groq.com") |
|
|
|
|
|
|
|
|
self.tools_description = self._build_tools_description() |
|
|
|
|
|
logger.info(f"Groq Agent initialized with model: {self.model}") |
|
|
|
|
|
def _build_tools_description(self) -> str: |
|
|
"""Build tool descriptions for the system prompt""" |
|
|
tools_text = "" |
|
|
for tool in MCP_TOOLS: |
|
|
tools_text += f"\n- **{tool['name']}**: {tool['description']}" |
|
|
props = tool.get('input_schema', {}).get('properties', {}) |
|
|
required = tool.get('input_schema', {}).get('required', []) |
|
|
if props: |
|
|
tools_text += "\n Parameters:" |
|
|
for param, details in props.items(): |
|
|
req = "(required)" if param in required else "(optional)" |
|
|
tools_text += f"\n - {param} {req}: {details.get('description', '')}" |
|
|
return tools_text |
|
|
|
|
|
def _build_system_prompt(self) -> str: |
|
|
return f"""You are an AI sales agent with access to tools. Use tools to complete tasks. |
|
|
|
|
|
AVAILABLE TOOLS: |
|
|
{self.tools_description} |
|
|
|
|
|
TO USE A TOOL, respond with JSON in this exact format: |
|
|
```json |
|
|
{{"tool": "tool_name", "parameters": {{"param1": "value1"}}}} |
|
|
``` |
|
|
|
|
|
RULES: |
|
|
1. Use search_web to find information |
|
|
2. Use save_prospect, save_contact to store data |
|
|
3. Use send_email to draft emails |
|
|
4. After completing all tasks, provide a summary |
|
|
5. Say "DONE" when finished |
|
|
|
|
|
Be concise and focused.""" |
|
|
|
|
|
async def run(self, task: str, max_iterations: int = 15) -> AsyncGenerator[Dict[str, Any], None]: |
|
|
"""Run the agent on a task""" |
|
|
import requests |
|
|
|
|
|
yield { |
|
|
"type": "agent_start", |
|
|
"message": f"Starting task with {self.model}", |
|
|
"model": self.model |
|
|
} |
|
|
|
|
|
system_prompt = self._build_system_prompt() |
|
|
messages = [ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
{"role": "user", "content": task} |
|
|
] |
|
|
|
|
|
for iteration in range(1, max_iterations + 1): |
|
|
yield { |
|
|
"type": "iteration_start", |
|
|
"iteration": iteration, |
|
|
"message": f"Iteration {iteration}: AI reasoning..." |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
response = self._call_groq(messages) |
|
|
assistant_content = response.get("choices", [{}])[0].get("message", {}).get("content", "") |
|
|
|
|
|
if not assistant_content: |
|
|
continue |
|
|
|
|
|
|
|
|
if "DONE" in assistant_content.upper(): |
|
|
yield { |
|
|
"type": "thought", |
|
|
"thought": assistant_content.replace("DONE", "").strip(), |
|
|
"message": "Task complete" |
|
|
} |
|
|
yield { |
|
|
"type": "agent_complete", |
|
|
"message": "Task complete!", |
|
|
"final_answer": assistant_content.replace("DONE", "").strip(), |
|
|
"iterations": iteration |
|
|
} |
|
|
return |
|
|
|
|
|
|
|
|
tool_calls = self._parse_tool_calls(assistant_content) |
|
|
|
|
|
if tool_calls: |
|
|
messages.append({"role": "assistant", "content": assistant_content}) |
|
|
tool_results = [] |
|
|
|
|
|
for tool_call in tool_calls: |
|
|
tool_name = tool_call.get("tool", "") |
|
|
tool_params = tool_call.get("parameters", {}) |
|
|
|
|
|
yield { |
|
|
"type": "tool_call", |
|
|
"tool": tool_name, |
|
|
"input": tool_params, |
|
|
"message": f"Calling: {tool_name}" |
|
|
} |
|
|
|
|
|
try: |
|
|
result = await self._execute_tool(tool_name, tool_params) |
|
|
yield { |
|
|
"type": "tool_result", |
|
|
"tool": tool_name, |
|
|
"result": result, |
|
|
"message": f"Tool {tool_name} completed" |
|
|
} |
|
|
tool_results.append({"tool": tool_name, "result": result}) |
|
|
except Exception as e: |
|
|
yield { |
|
|
"type": "tool_error", |
|
|
"tool": tool_name, |
|
|
"error": str(e), |
|
|
"message": f"Tool error: {e}" |
|
|
} |
|
|
tool_results.append({"tool": tool_name, "error": str(e)}) |
|
|
|
|
|
|
|
|
results_text = "Tool results:\n" + json.dumps(tool_results, indent=2, default=str)[:2000] |
|
|
messages.append({"role": "user", "content": results_text}) |
|
|
else: |
|
|
|
|
|
yield { |
|
|
"type": "thought", |
|
|
"thought": assistant_content, |
|
|
"message": f"AI: {assistant_content[:100]}..." |
|
|
} |
|
|
messages.append({"role": "assistant", "content": assistant_content}) |
|
|
messages.append({"role": "user", "content": "Continue with the task. Use tools to gather data. Say DONE when finished."}) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in iteration {iteration}: {e}") |
|
|
yield { |
|
|
"type": "agent_error", |
|
|
"error": str(e), |
|
|
"message": f"Error: {e}" |
|
|
} |
|
|
return |
|
|
|
|
|
yield { |
|
|
"type": "agent_max_iterations", |
|
|
"message": f"Reached max iterations ({max_iterations})", |
|
|
"iterations": max_iterations |
|
|
} |
|
|
|
|
|
def _call_groq(self, messages: List[Dict]) -> Dict: |
|
|
"""Call Groq API""" |
|
|
import requests |
|
|
|
|
|
url = "https://api.groq.com/openai/v1/chat/completions" |
|
|
headers = { |
|
|
"Authorization": f"Bearer {self.api_key}", |
|
|
"Content-Type": "application/json" |
|
|
} |
|
|
payload = { |
|
|
"model": self.model, |
|
|
"messages": messages, |
|
|
"max_tokens": 2048, |
|
|
"temperature": 0.7 |
|
|
} |
|
|
|
|
|
response = requests.post(url, headers=headers, json=payload, timeout=60) |
|
|
response.raise_for_status() |
|
|
return response.json() |
|
|
|
|
|
def _parse_tool_calls(self, text: str) -> List[Dict]: |
|
|
"""Parse tool calls from response text""" |
|
|
import re |
|
|
|
|
|
tool_calls = [] |
|
|
|
|
|
|
|
|
patterns = [ |
|
|
r'```json\s*(\{[^`]+\})\s*```', |
|
|
r'```\s*(\{[^`]+\})\s*```', |
|
|
r'(\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\})', |
|
|
] |
|
|
|
|
|
for pattern in patterns: |
|
|
matches = re.findall(pattern, text, re.DOTALL) |
|
|
for match in matches: |
|
|
try: |
|
|
data = json.loads(match.strip()) |
|
|
if "tool" in data: |
|
|
tool_calls.append(data) |
|
|
except json.JSONDecodeError: |
|
|
continue |
|
|
|
|
|
return tool_calls |
|
|
|
|
|
async def _execute_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any: |
|
|
"""Execute an MCP tool""" |
|
|
|
|
|
if tool_name == "search_web": |
|
|
query = tool_input.get("query", "") |
|
|
max_results = tool_input.get("max_results", 5) |
|
|
results = await self.mcp_registry.search.query(query, max_results=max_results) |
|
|
return {"results": results[:max_results], "count": len(results[:max_results])} |
|
|
|
|
|
elif tool_name == "search_news": |
|
|
query = tool_input.get("query", "") |
|
|
max_results = tool_input.get("max_results", 5) |
|
|
results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results) |
|
|
return {"results": results[:max_results], "count": len(results[:max_results])} |
|
|
|
|
|
elif tool_name == "save_prospect": |
|
|
prospect_data = { |
|
|
"id": tool_input.get("prospect_id", str(uuid.uuid4())), |
|
|
"company": { |
|
|
"id": tool_input.get("company_id"), |
|
|
"name": tool_input.get("company_name"), |
|
|
"domain": tool_input.get("company_domain") |
|
|
}, |
|
|
"fit_score": tool_input.get("fit_score", 0), |
|
|
"status": tool_input.get("status", "new"), |
|
|
"metadata": tool_input.get("metadata", {}) |
|
|
} |
|
|
result = await self.mcp_registry.store.save_prospect(prospect_data) |
|
|
return {"status": result, "prospect_id": prospect_data["id"]} |
|
|
|
|
|
elif tool_name == "save_company": |
|
|
company_data = { |
|
|
"id": tool_input.get("company_id", str(uuid.uuid4())), |
|
|
"name": tool_input.get("name", ""), |
|
|
"domain": tool_input.get("domain", ""), |
|
|
"industry": tool_input.get("industry"), |
|
|
"description": tool_input.get("description"), |
|
|
"employee_count": tool_input.get("employee_count") |
|
|
} |
|
|
result = await self.mcp_registry.store.save_company(company_data) |
|
|
return {"status": result, "company_id": company_data["id"]} |
|
|
|
|
|
elif tool_name == "save_contact": |
|
|
contact_data = { |
|
|
"id": tool_input.get("contact_id", str(uuid.uuid4())), |
|
|
"company_id": tool_input.get("company_id", ""), |
|
|
"email": tool_input.get("email", ""), |
|
|
"first_name": tool_input.get("first_name"), |
|
|
"last_name": tool_input.get("last_name"), |
|
|
"title": tool_input.get("title"), |
|
|
"seniority": tool_input.get("seniority") |
|
|
} |
|
|
result = await self.mcp_registry.store.save_contact(contact_data) |
|
|
return {"status": result, "contact_id": contact_data["id"]} |
|
|
|
|
|
elif tool_name == "save_fact": |
|
|
fact_data = { |
|
|
"id": tool_input.get("fact_id", str(uuid.uuid4())), |
|
|
"company_id": tool_input.get("company_id", ""), |
|
|
"fact_type": tool_input.get("fact_type", ""), |
|
|
"content": tool_input.get("content", ""), |
|
|
"source_url": tool_input.get("source_url"), |
|
|
"confidence_score": tool_input.get("confidence_score", 0.8) |
|
|
} |
|
|
result = await self.mcp_registry.store.save_fact(fact_data) |
|
|
return {"status": result, "fact_id": fact_data["id"]} |
|
|
|
|
|
elif tool_name == "send_email": |
|
|
to = tool_input.get("to", "") |
|
|
subject = tool_input.get("subject", "") |
|
|
body = tool_input.get("body", "") |
|
|
prospect_id = tool_input.get("prospect_id", "") |
|
|
thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id) |
|
|
return {"status": "sent", "thread_id": thread_id, "to": to} |
|
|
|
|
|
elif tool_name == "list_prospects": |
|
|
prospects = await self.mcp_registry.store.list_prospects() |
|
|
return {"prospects": prospects, "count": len(prospects)} |
|
|
|
|
|
elif tool_name == "get_prospect": |
|
|
prospect_id = tool_input.get("prospect_id", "") |
|
|
prospect = await self.mcp_registry.store.get_prospect(prospect_id) |
|
|
return prospect or {"error": "Prospect not found"} |
|
|
|
|
|
elif tool_name == "suggest_meeting_slots": |
|
|
slots = await self.mcp_registry.calendar.suggest_slots() |
|
|
return {"slots": slots[:3], "count": len(slots[:3])} |
|
|
|
|
|
else: |
|
|
raise ValueError(f"Unknown tool: {tool_name}") |
|
|
|