"""
Autonomous AI Agent with MCP Tool Calling using Groq API

Groq offers FREE API access with fast inference on Llama, Mixtral models.
No payment required - just need a free API key from console.groq.com
"""

import os
import json
import uuid
import logging
import asyncio
import re
from typing import List, Dict, Any, AsyncGenerator, Optional

from mcp.tools.definitions import MCP_TOOLS
from mcp.registry import MCPRegistry

logger = logging.getLogger(__name__)

# Groq FREE models
# NOTE(review): hosted model IDs get retired over time - confirm these are
# still served by Groq before relying on the default.
GROQ_MODELS = [
    "llama-3.1-70b-versatile",  # Best quality, free
    "llama-3.1-8b-instant",  # Fast, free
    "mixtral-8x7b-32768",  # Good for complex tasks
    "gemma2-9b-it",  # Google's model
]

DEFAULT_MODEL = "llama-3.1-70b-versatile"

# Maximum number of characters of serialized tool output fed back to the model.
_MAX_TOOL_RESULT_CHARS = 2000

# Completion marker. Matches the literal whole word ``DONE`` (the form the
# system prompt asks for), or a case-insensitive "done" that sits alone on a
# line. A plain substring test would also fire on ordinary prose such as
# "I have done the search" or "abandoned" and end the run prematurely.
_DONE_RE = re.compile(r"\bDONE\b|^[ \t]*(?i:done)[.!]*[ \t]*$", re.MULTILINE)

# Tool-call shapes, most specific first: a ```json fence, a bare ``` fence,
# then an inline JSON object.
_TOOL_CALL_PATTERNS = [
    re.compile(r'```json\s*(\{[^`]+\})\s*```'),
    re.compile(r'```\s*(\{[^`]+\})\s*```'),
    re.compile(r'(\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\})'),
]


class AutonomousMCPAgentGroq:
    """
    AI Agent using Groq API (FREE, fast inference)

    Get your free API key at: https://console.groq.com
    """

    def __init__(
        self,
        mcp_registry: MCPRegistry,
        api_key: Optional[str] = None,
        model: Optional[str] = None
    ):
        """Configure the agent.

        Args:
            mcp_registry: Registry whose ``search``, ``store``, ``email`` and
                ``calendar`` members back the tools.
            api_key: Groq API key; falls back to the ``GROQ_API_KEY``
                environment variable.
            model: Model id; falls back to ``GROQ_MODEL`` and then to
                ``DEFAULT_MODEL``.

        Raises:
            ValueError: If no API key is supplied or found in the environment.
        """
        self.mcp_registry = mcp_registry
        self.api_key = api_key or os.getenv("GROQ_API_KEY")
        self.model = model or os.getenv("GROQ_MODEL", DEFAULT_MODEL)

        if not self.api_key:
            raise ValueError("GROQ_API_KEY is required. Get free key at https://console.groq.com")

        # Build tools for the prompt
        self.tools_description = self._build_tools_description()

        logger.info("Groq Agent initialized with model: %s", self.model)

    def _build_tools_description(self) -> str:
        """Build tool descriptions for the system prompt from ``MCP_TOOLS``."""
        parts = []
        for tool in MCP_TOOLS:
            parts.append(f"\n- **{tool['name']}**: {tool['description']}")
            schema = tool.get('input_schema', {})
            props = schema.get('properties', {})
            required = schema.get('required', [])
            if props:
                parts.append("\n Parameters:")
                for param, details in props.items():
                    req = "(required)" if param in required else "(optional)"
                    parts.append(f"\n - {param} {req}: {details.get('description', '')}")
        return "".join(parts)

    def _build_system_prompt(self) -> str:
        """Return the system prompt: role, tool list, call format and rules."""
        return f"""You are an AI sales agent with access to tools. Use tools to complete tasks.

AVAILABLE TOOLS:
{self.tools_description}

TO USE A TOOL, respond with JSON in this exact format:
```json
{{"tool": "tool_name", "parameters": {{"param1": "value1"}}}}
```

RULES:
1. Use search_web to find information
2. Use save_prospect, save_contact to store data
3. Use send_email to draft emails
4. After completing all tasks, provide a summary
5. Say "DONE" when finished

Be concise and focused."""

    async def run(self, task: str, max_iterations: int = 15) -> AsyncGenerator[Dict[str, Any], None]:
        """Run the agent on a task.

        Args:
            task: The user request, sent as the first user message.
            max_iterations: Upper bound on model round-trips.

        Yields:
            Event dicts whose ``"type"`` is one of ``agent_start``,
            ``iteration_start``, ``thought``, ``tool_call``, ``tool_result``,
            ``tool_error``, ``agent_complete``, ``agent_error`` or
            ``agent_max_iterations``.
        """
        yield {
            "type": "agent_start",
            "message": f"Starting task with {self.model}",
            "model": self.model
        }

        messages = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": task}
        ]

        for iteration in range(1, max_iterations + 1):
            yield {
                "type": "iteration_start",
                "iteration": iteration,
                "message": f"Iteration {iteration}: AI reasoning..."
            }

            try:
                # _call_groq does blocking HTTP; run it in the default
                # executor so the event loop stays responsive meanwhile.
                loop = asyncio.get_running_loop()
                response = await loop.run_in_executor(None, self._call_groq, messages)
                assistant_content = self._extract_content(response)

                if not assistant_content:
                    logger.warning("Empty model response in iteration %s", iteration)
                    continue

                # Check for completion
                if _DONE_RE.search(assistant_content):
                    final_answer = _DONE_RE.sub("", assistant_content).strip()
                    yield {
                        "type": "thought",
                        "thought": final_answer,
                        "message": "Task complete"
                    }
                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_answer,
                        "iterations": iteration
                    }
                    return

                # Try to parse tool calls
                tool_calls = self._parse_tool_calls(assistant_content)

                if tool_calls:
                    messages.append({"role": "assistant", "content": assistant_content})
                    tool_results = []

                    for tool_call in tool_calls:
                        tool_name = tool_call.get("tool", "")
                        # The model may send "parameters": null.
                        tool_params = tool_call.get("parameters") or {}

                        yield {
                            "type": "tool_call",
                            "tool": tool_name,
                            "input": tool_params,
                            "message": f"Calling: {tool_name}"
                        }

                        try:
                            result = await self._execute_tool(tool_name, tool_params)
                        except Exception as e:
                            # A failing tool is reported back to the model
                            # rather than aborting the whole run.
                            yield {
                                "type": "tool_error",
                                "tool": tool_name,
                                "error": str(e),
                                "message": f"Tool error: {e}"
                            }
                            tool_results.append({"tool": tool_name, "error": str(e)})
                        else:
                            yield {
                                "type": "tool_result",
                                "tool": tool_name,
                                "result": result,
                                "message": f"Tool {tool_name} completed"
                            }
                            tool_results.append({"tool": tool_name, "result": result})

                    # Add tool results to conversation (truncated to bound prompt size)
                    serialized = json.dumps(tool_results, indent=2, default=str)
                    results_text = "Tool results:\n" + serialized[:_MAX_TOOL_RESULT_CHARS]
                    messages.append({"role": "user", "content": results_text})
                else:
                    # No tool calls - just a response
                    yield {
                        "type": "thought",
                        "thought": assistant_content,
                        "message": f"AI: {assistant_content[:100]}..."
                    }
                    messages.append({"role": "assistant", "content": assistant_content})
                    messages.append({"role": "user", "content": "Continue with the task. Use tools to gather data. Say DONE when finished."})

            except Exception as e:
                logger.exception("Error in iteration %s: %s", iteration, e)
                yield {
                    "type": "agent_error",
                    "error": str(e),
                    "message": f"Error: {e}"
                }
                return

        yield {
            "type": "agent_max_iterations",
            "message": f"Reached max iterations ({max_iterations})",
            "iterations": max_iterations
        }

    @staticmethod
    def _extract_content(response: Dict) -> str:
        """Return the first choice's message content, or "" if absent or null."""
        choices = response.get("choices") or [{}]
        message = choices[0].get("message") or {}
        return message.get("content") or ""

    def _call_groq(self, messages: List[Dict]) -> Dict:
        """Call Groq API (blocking) and return the decoded JSON response.

        Raises:
            requests.HTTPError: On a non-2xx response (``raise_for_status``).
        """
        import requests

        url = "https://api.groq.com/openai/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "messages": messages,
            "max_tokens": 2048,
            "temperature": 0.7
        }

        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        return response.json()

    def _parse_tool_calls(self, text: str) -> List[Dict]:
        """Parse tool calls from response text.

        Returns each distinct call once, in the order it appears in ``text``.
        A fenced call also matches the inline pattern, so any match that
        overlaps an already accepted call is skipped; otherwise the same tool
        would be executed twice.
        """
        accepted = []  # (start, end, parsed call)

        for pattern in _TOOL_CALL_PATTERNS:
            for match in pattern.finditer(text):
                start, end = match.span(1)
                if any(start < prev_end and prev_start < end
                       for prev_start, prev_end, _ in accepted):
                    continue
                try:
                    data = json.loads(match.group(1).strip())
                except json.JSONDecodeError:
                    continue
                if isinstance(data, dict) and "tool" in data:
                    accepted.append((start, end, data))

        accepted.sort(key=lambda item: item[0])
        return [data for _, _, data in accepted]

    async def _search(self, query: str, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        """Run a registry search and cap the results at ``max_results``."""
        try:
            # Models sometimes send numbers as strings; fall back to 5.
            max_results = int(tool_input.get("max_results", 5))
        except (TypeError, ValueError):
            max_results = 5
        results = await self.mcp_registry.search.query(query, max_results=max_results)
        top = results[:max_results]
        return {"results": top, "count": len(top)}

    async def _execute_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        """Execute an MCP tool.

        Args:
            tool_name: Name of the tool requested by the model.
            tool_input: Parameters parsed from the model's tool call.

        Returns:
            A JSON-serializable summary of the tool outcome.

        Raises:
            ValueError: If ``tool_name`` is not a known tool.
        """
        # For the save_* tools an explicit null/empty id from the model is
        # treated like a missing one, so a fresh UUID is generated.
        if tool_name == "search_web":
            return await self._search(tool_input.get("query", ""), tool_input)

        if tool_name == "search_news":
            query = tool_input.get("query", "")
            return await self._search(f"{query} news", tool_input)

        if tool_name == "save_prospect":
            prospect_data = {
                "id": tool_input.get("prospect_id") or str(uuid.uuid4()),
                "company": {
                    "id": tool_input.get("company_id"),
                    "name": tool_input.get("company_name"),
                    "domain": tool_input.get("company_domain")
                },
                "fit_score": tool_input.get("fit_score", 0),
                "status": tool_input.get("status", "new"),
                "metadata": tool_input.get("metadata", {})
            }
            result = await self.mcp_registry.store.save_prospect(prospect_data)
            return {"status": result, "prospect_id": prospect_data["id"]}

        if tool_name == "save_company":
            company_data = {
                "id": tool_input.get("company_id") or str(uuid.uuid4()),
                "name": tool_input.get("name", ""),
                "domain": tool_input.get("domain", ""),
                "industry": tool_input.get("industry"),
                "description": tool_input.get("description"),
                "employee_count": tool_input.get("employee_count")
            }
            result = await self.mcp_registry.store.save_company(company_data)
            return {"status": result, "company_id": company_data["id"]}

        if tool_name == "save_contact":
            contact_data = {
                "id": tool_input.get("contact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "email": tool_input.get("email", ""),
                "first_name": tool_input.get("first_name"),
                "last_name": tool_input.get("last_name"),
                "title": tool_input.get("title"),
                "seniority": tool_input.get("seniority")
            }
            result = await self.mcp_registry.store.save_contact(contact_data)
            return {"status": result, "contact_id": contact_data["id"]}

        if tool_name == "save_fact":
            fact_data = {
                "id": tool_input.get("fact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "fact_type": tool_input.get("fact_type", ""),
                "content": tool_input.get("content", ""),
                "source_url": tool_input.get("source_url"),
                "confidence_score": tool_input.get("confidence_score", 0.8)
            }
            result = await self.mcp_registry.store.save_fact(fact_data)
            return {"status": result, "fact_id": fact_data["id"]}

        if tool_name == "send_email":
            to = tool_input.get("to", "")
            subject = tool_input.get("subject", "")
            body = tool_input.get("body", "")
            prospect_id = tool_input.get("prospect_id", "")
            thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
            return {"status": "sent", "thread_id": thread_id, "to": to}

        if tool_name == "list_prospects":
            prospects = await self.mcp_registry.store.list_prospects()
            return {"prospects": prospects, "count": len(prospects)}

        if tool_name == "get_prospect":
            prospect_id = tool_input.get("prospect_id", "")
            prospect = await self.mcp_registry.store.get_prospect(prospect_id)
            return prospect or {"error": "Prospect not found"}

        if tool_name == "suggest_meeting_slots":
            slots = await self.mcp_registry.calendar.suggest_slots()
            top_slots = slots[:3]
            return {"slots": top_slots, "count": len(top_slots)}

        raise ValueError(f"Unknown tool: {tool_name}")