"""
Autonomous AI Agent with MCP Tool Calling using Ollama Python Client

Uses the ollama Python package for LLM inference.
Based on: https://github.com/ollama/ollama-python

Example usage (from the guide):

    from ollama import chat

    response = chat(
        model='granite4:1b',
        messages=[
            {'role': 'system', 'content': 'You are a helpful assistant.'},
            {'role': 'user', 'content': user_input}
        ],
        options={'temperature': 0.0, 'top_p': 1.0}
    )
    output = response.message.content
"""

import os
import re
import json
import uuid
import logging
import asyncio
from typing import List, Dict, Any, AsyncGenerator, Optional

from mcp.tools.definitions import MCP_TOOLS
from mcp.registry import MCPRegistry

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Fallback model name, used when no model is passed to the agent and the
# OLLAMA_MODEL environment variable is not set.
DEFAULT_MODEL = "granite4:1b"


class AutonomousMCPAgentOllama:
    """
    AI Agent using Ollama Python client (FREE local LLM)

    Uses ollama.chat() directly as per the official documentation.
    Temperature=0.0 and top_p=1.0 recommended for Granite family models.

    The agent runs a plain text protocol: the system prompt lists the MCP
    tools, the model answers either with a JSON tool call or with the word
    DONE, and tool results are fed back to the model as user messages.
    """

    # Tool-call patterns, tried in this order. A later pattern can match text
    # that an earlier one already captured (an inline call inside a fenced
    # block), so _parse_tool_calls skips overlapping matches.
    _TOOL_CALL_PATTERNS = (
        re.compile(r'```json\s*(\{[^`]+\})\s*```', re.DOTALL),
        re.compile(r'```\s*(\{[^`]+\})\s*```', re.DOTALL),
        re.compile(r'(\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\})', re.DOTALL),
    )

    # Completion marker: the standalone word DONE in any letter case. The word
    # boundaries keep words such as "abandoned" from ending the run.
    _DONE_PATTERN = re.compile(r"\bDONE\b", re.IGNORECASE)

    def __init__(
        self,
        mcp_registry: "MCPRegistry",
        model: Optional[str] = None,
    ):
        """
        Create an agent bound to an MCP registry.

        Args:
            mcp_registry: Registry whose ``search``, ``store``, ``email`` and
                ``calendar`` members are used to execute tools.
            model: Ollama model name. When falsy, the OLLAMA_MODEL environment
                variable is used, then DEFAULT_MODEL.
        """
        self.mcp_registry = mcp_registry
        self.model = model or os.getenv("OLLAMA_MODEL", DEFAULT_MODEL)
        self.tools_description = self._build_tools_description()

        logger.info("Ollama Agent initialized with model: %s", self.model)

    def _build_tools_description(self) -> str:
        """Build tool descriptions for the system prompt.

        Returns:
            One bullet per entry of MCP_TOOLS, followed by a line per
            parameter marked "(required)" or "(optional)".
        """
        parts: List[str] = []
        for tool in MCP_TOOLS:
            parts.append(f"\n- **{tool['name']}**: {tool['description']}")
            # A missing or null schema is treated as "no parameters".
            schema = tool.get('input_schema') or {}
            props = schema.get('properties') or {}
            required = schema.get('required') or []
            if props:
                parts.append("\n Parameters:")
                for param, details in props.items():
                    req = "(required)" if param in required else "(optional)"
                    parts.append(f"\n - {param} {req}: {details.get('description', '')}")
        return "".join(parts)

    def _build_system_prompt(self) -> str:
        """Return the system prompt: tool list, call format and rules."""
        # Built from explicit lines so the prompt text does not depend on how
        # this method is indented in the source file.
        lines = [
            "You are an AI sales agent with access to tools.",
            "",
            "AVAILABLE TOOLS:",
            f"{self.tools_description}",
            "",
            "TO USE A TOOL, respond with JSON:",
            "```json",
            '{"tool": "tool_name", "parameters": {"param1": "value1"}}',
            "```",
            "",
            "RULES:",
            "1. Use search_web to find information",
            "2. Use save_prospect, save_contact to store data",
            "3. Use send_email to draft emails",
            '4. Say "DONE" when finished with a summary',
            "",
            "Be concise.",
        ]
        return "\n".join(lines)

    async def run(self, task: str, max_iterations: int = 15) -> AsyncGenerator[Dict[str, Any], None]:
        """Run the agent on a task.

        Args:
            task: The user request, sent as the first user message.
            max_iterations: Maximum number of model calls before giving up.

        Yields:
            Event dicts whose "type" is one of: agent_start, iteration_start,
            thought, tool_call, tool_result, tool_error, agent_complete,
            agent_error, agent_max_iterations.
        """
        yield {
            "type": "agent_start",
            "message": f"Starting with Ollama ({self.model})",
            "model": self.model
        }

        system_prompt = self._build_system_prompt()
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": task}
        ]

        for iteration in range(1, max_iterations + 1):
            yield {
                "type": "iteration_start",
                "iteration": iteration,
                "message": f"Iteration {iteration}: Thinking..."
            }

            try:
                response = await self._call_ollama(messages)
                assistant_content = response.get("content") or ""

                # An empty reply uses up the iteration and is retried as-is.
                if not assistant_content:
                    continue

                # Completion is checked before tool parsing, so a reply that
                # contains both DONE and a tool call ends the run without
                # executing the tool.
                if self._DONE_PATTERN.search(assistant_content):
                    final_text = self._DONE_PATTERN.sub("", assistant_content).strip()
                    yield {
                        "type": "thought",
                        "thought": final_text,
                        "message": "Task complete"
                    }
                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_text,
                        "iterations": iteration
                    }
                    return

                tool_calls = self._parse_tool_calls(assistant_content)

                if tool_calls:
                    messages.append({"role": "assistant", "content": assistant_content})
                    tool_results = []

                    for tool_call in tool_calls:
                        tool_name = tool_call.get("tool", "")
                        # The model may send "parameters": null; use {} then.
                        tool_params = tool_call.get("parameters") or {}

                        yield {
                            "type": "tool_call",
                            "tool": tool_name,
                            "input": tool_params,
                            "message": f"Calling: {tool_name}"
                        }

                        # A failing tool is reported to the caller and to the
                        # model; it does not abort the run.
                        try:
                            result = await self._execute_tool(tool_name, tool_params)
                            yield {
                                "type": "tool_result",
                                "tool": tool_name,
                                "result": result,
                                "message": f"{tool_name} completed"
                            }
                            tool_results.append({"tool": tool_name, "result": result})
                        except Exception as e:
                            yield {
                                "type": "tool_error",
                                "tool": tool_name,
                                "error": str(e)
                            }
                            tool_results.append({"tool": tool_name, "error": str(e)})

                    # Results go back as a user message, capped at 2000
                    # characters of serialized JSON.
                    results_text = "Tool results:\n" + json.dumps(tool_results, indent=2, default=str)[:2000]
                    messages.append({"role": "user", "content": results_text})
                else:
                    # Neither a tool call nor DONE: surface the text and nudge
                    # the model to keep going.
                    yield {
                        "type": "thought",
                        "thought": assistant_content,
                        "message": f"AI: {assistant_content[:100]}..."
                    }
                    messages.append({"role": "assistant", "content": assistant_content})
                    messages.append({"role": "user", "content": "Continue. Use tools to complete the task. Say DONE when finished."})

            except Exception as e:
                logger.exception("Error: %s", e)
                yield {
                    "type": "agent_error",
                    "error": str(e),
                    "message": f"Error: {e}"
                }
                return

        yield {
            "type": "agent_max_iterations",
            "message": f"Reached max iterations ({max_iterations})",
            "iterations": max_iterations
        }

    async def _call_ollama(self, messages: List[Dict]) -> Dict:
        """
        Call Ollama using the official Python client.

        Uses ollama.chat() directly as per the guide:
        https://github.com/ollama/ollama-python

        Temperature=0.0 and top_p=1.0 recommended for Granite models.

        Args:
            messages: Chat history as a list of role/content dicts.

        Returns:
            ``{"content": <assistant text>}``; the text is "" when the
            response carries no content.

        Raises:
            ImportError: If the ollama package is not installed.
            Exception: If the chat call fails; the original error is chained.
        """
        # Imported here so the module loads without the ollama package.
        try:
            from ollama import chat, ResponseError
        except ImportError as e:
            raise ImportError("ollama package not installed. Run: pip install ollama") from e

        def _blocking_chat():
            return chat(
                model=self.model,
                messages=messages,
                options={
                    "temperature": 0.0,
                    "top_p": 1.0
                }
            )

        try:
            # chat() blocks, so it runs in the default executor to keep the
            # event loop responsive.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, _blocking_chat)

            # Accept both attribute-style and dict-style responses.
            content = ""
            if hasattr(response, 'message') and hasattr(response.message, 'content'):
                content = response.message.content
            elif isinstance(response, dict):
                content = response.get("message", {}).get("content", "")

            return {"content": content or ""}

        except ResponseError as e:
            logger.error("Ollama ResponseError: %s", e)
            raise Exception(
                f"Ollama error: {e}. Make sure Ollama is running and the model '{self.model}' is pulled."
            ) from e
        except Exception as e:
            logger.error("Ollama call failed: %s", e)
            raise Exception(f"Ollama error: {e}") from e

    def _parse_tool_calls(self, text: str) -> List[Dict]:
        """Parse tool calls from response.

        Args:
            text: Raw assistant text.

        Returns:
            Each JSON object in ``text`` that parses to a dict with a "tool"
            key, ordered by pattern and then by position. A payload matched
            by more than one pattern is returned once.
        """
        tool_calls: List[Dict] = []
        accepted_spans = []  # (start, end) of payloads already returned

        for pattern in self._TOOL_CALL_PATTERNS:
            for match in pattern.finditer(text):
                start, end = match.span(1)
                # Skip text already returned by an earlier pattern; otherwise
                # a fenced call in the prompt's format is returned twice.
                if any(start < seen_end and seen_start < end
                       for seen_start, seen_end in accepted_spans):
                    continue
                try:
                    data = json.loads(match.group(1).strip())
                except json.JSONDecodeError:
                    continue
                if isinstance(data, dict) and "tool" in data:
                    accepted_spans.append((start, end))
                    tool_calls.append(data)

        return tool_calls

    @staticmethod
    def _coerce_max_results(value: Any, default: int = 5) -> int:
        """Return ``value`` as a positive int, or ``default`` if it is not one."""
        try:
            limit = int(value)
        except (TypeError, ValueError, OverflowError):
            return default
        return limit if limit > 0 else default

    async def _execute_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        """Execute an MCP tool.

        Args:
            tool_name: Name of the tool requested by the model.
            tool_input: Parameters parsed from the model's tool call.

        Returns:
            A dict describing the outcome; its keys depend on the tool.

        Raises:
            ValueError: If ``tool_name`` is not a known tool.
        """
        if tool_name in ("search_web", "search_news"):
            query = tool_input.get("query", "")
            if tool_name == "search_news":
                query = f"{query} news"
            # The model may send the limit as a string or null.
            max_results = self._coerce_max_results(tool_input.get("max_results", 5))
            results = await self.mcp_registry.search.query(query, max_results=max_results)
            top_results = results[:max_results]
            return {"results": top_results, "count": len(top_results)}

        if tool_name == "save_prospect":
            prospect_data = {
                # "or" also replaces an explicit null/empty id with a new one.
                "id": tool_input.get("prospect_id") or str(uuid.uuid4()),
                "company": {
                    "id": tool_input.get("company_id"),
                    "name": tool_input.get("company_name"),
                    "domain": tool_input.get("company_domain")
                },
                "fit_score": tool_input.get("fit_score", 0),
                "status": tool_input.get("status", "new"),
                "metadata": tool_input.get("metadata", {})
            }
            result = await self.mcp_registry.store.save_prospect(prospect_data)
            return {"status": result, "prospect_id": prospect_data["id"]}

        if tool_name == "save_company":
            company_data = {
                "id": tool_input.get("company_id") or str(uuid.uuid4()),
                "name": tool_input.get("name", ""),
                "domain": tool_input.get("domain", ""),
                "industry": tool_input.get("industry"),
                "description": tool_input.get("description"),
                "employee_count": tool_input.get("employee_count")
            }
            result = await self.mcp_registry.store.save_company(company_data)
            return {"status": result, "company_id": company_data["id"]}

        if tool_name == "save_contact":
            contact_data = {
                "id": tool_input.get("contact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "email": tool_input.get("email", ""),
                "first_name": tool_input.get("first_name"),
                "last_name": tool_input.get("last_name"),
                "title": tool_input.get("title"),
                "seniority": tool_input.get("seniority")
            }
            result = await self.mcp_registry.store.save_contact(contact_data)
            return {"status": result, "contact_id": contact_data["id"]}

        if tool_name == "save_fact":
            fact_data = {
                "id": tool_input.get("fact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "fact_type": tool_input.get("fact_type", ""),
                "content": tool_input.get("content", ""),
                "source_url": tool_input.get("source_url"),
                "confidence_score": tool_input.get("confidence_score", 0.8)
            }
            result = await self.mcp_registry.store.save_fact(fact_data)
            return {"status": result, "fact_id": fact_data["id"]}

        if tool_name == "send_email":
            to = tool_input.get("to", "")
            subject = tool_input.get("subject", "")
            body = tool_input.get("body", "")
            prospect_id = tool_input.get("prospect_id", "")
            thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
            return {"status": "drafted", "thread_id": thread_id, "to": to}

        if tool_name == "list_prospects":
            prospects = await self.mcp_registry.store.list_prospects()
            return {"prospects": prospects, "count": len(prospects)}

        if tool_name == "get_prospect":
            prospect_id = tool_input.get("prospect_id", "")
            prospect = await self.mcp_registry.store.get_prospect(prospect_id)
            return prospect or {"error": "Not found"}

        if tool_name == "suggest_meeting_slots":
            slots = await self.mcp_registry.calendar.suggest_slots()
            top_slots = slots[:3]
            return {"slots": top_slots, "count": len(top_slots)}

        raise ValueError(f"Unknown tool: {tool_name}")