cx_ai_agent_v1 / mcp /agents /autonomous_agent_ollama.py
muzakkirhussain011's picture
Add application files (text files only)
8bab08d
"""
Autonomous AI Agent with MCP Tool Calling using Ollama Python Client
Uses the ollama Python package for LLM inference.
Based on: https://github.com/ollama/ollama-python
Example usage (from the guide):
from ollama import chat
response = chat(
model='granite4:1b',
messages=[
{'role': 'system', 'content': 'You are a helpful assistant.'},
{'role': 'user', 'content': user_input}
],
options={'temperature': 0.0, 'top_p': 1.0}
)
output = response.message.content
"""
import os
import json
import uuid
import logging
import asyncio
from typing import List, Dict, Any, AsyncGenerator
from mcp.tools.definitions import MCP_TOOLS
from mcp.registry import MCPRegistry
logger = logging.getLogger(__name__)
# Default model - IBM Granite 4 1B
DEFAULT_MODEL = "granite4:1b"
class AutonomousMCPAgentOllama:
"""
AI Agent using Ollama Python client (FREE local LLM)
Uses ollama.chat() directly as per the official documentation.
Temperature=0.0 and top_p=1.0 recommended for Granite family models.
"""
def __init__(
self,
mcp_registry: MCPRegistry,
model: str = None
):
self.mcp_registry = mcp_registry
self.model = model or os.getenv("OLLAMA_MODEL", DEFAULT_MODEL)
self.tools_description = self._build_tools_description()
logger.info(f"Ollama Agent initialized with model: {self.model}")
def _build_tools_description(self) -> str:
"""Build tool descriptions for the system prompt"""
tools_text = ""
for tool in MCP_TOOLS:
tools_text += f"\n- **{tool['name']}**: {tool['description']}"
props = tool.get('input_schema', {}).get('properties', {})
required = tool.get('input_schema', {}).get('required', [])
if props:
tools_text += "\n Parameters:"
for param, details in props.items():
req = "(required)" if param in required else "(optional)"
tools_text += f"\n - {param} {req}: {details.get('description', '')}"
return tools_text
def _build_system_prompt(self) -> str:
return f"""You are an AI sales agent with access to tools.
AVAILABLE TOOLS:
{self.tools_description}
TO USE A TOOL, respond with JSON:
```json
{{"tool": "tool_name", "parameters": {{"param1": "value1"}}}}
```
RULES:
1. Use search_web to find information
2. Use save_prospect, save_contact to store data
3. Use send_email to draft emails
4. Say "DONE" when finished with a summary
Be concise."""
async def run(self, task: str, max_iterations: int = 15) -> AsyncGenerator[Dict[str, Any], None]:
"""Run the agent on a task"""
yield {
"type": "agent_start",
"message": f"Starting with Ollama ({self.model})",
"model": self.model
}
system_prompt = self._build_system_prompt()
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": task}
]
for iteration in range(1, max_iterations + 1):
yield {
"type": "iteration_start",
"iteration": iteration,
"message": f"Iteration {iteration}: Thinking..."
}
try:
# Call Ollama using the Python client
response = await self._call_ollama(messages)
assistant_content = response.get("content", "")
if not assistant_content:
continue
# Check for completion
if "DONE" in assistant_content.upper():
final_text = assistant_content.replace("DONE", "").replace("done", "").strip()
yield {
"type": "thought",
"thought": final_text,
"message": "Task complete"
}
yield {
"type": "agent_complete",
"message": "Task complete!",
"final_answer": final_text,
"iterations": iteration
}
return
# Parse tool calls
tool_calls = self._parse_tool_calls(assistant_content)
if tool_calls:
messages.append({"role": "assistant", "content": assistant_content})
tool_results = []
for tool_call in tool_calls:
tool_name = tool_call.get("tool", "")
tool_params = tool_call.get("parameters", {})
yield {
"type": "tool_call",
"tool": tool_name,
"input": tool_params,
"message": f"Calling: {tool_name}"
}
try:
result = await self._execute_tool(tool_name, tool_params)
yield {
"type": "tool_result",
"tool": tool_name,
"result": result,
"message": f"{tool_name} completed"
}
tool_results.append({"tool": tool_name, "result": result})
except Exception as e:
yield {
"type": "tool_error",
"tool": tool_name,
"error": str(e)
}
tool_results.append({"tool": tool_name, "error": str(e)})
# Add results to conversation
results_text = "Tool results:\n" + json.dumps(tool_results, indent=2, default=str)[:2000]
messages.append({"role": "user", "content": results_text})
else:
# No tool calls
yield {
"type": "thought",
"thought": assistant_content,
"message": f"AI: {assistant_content[:100]}..."
}
messages.append({"role": "assistant", "content": assistant_content})
messages.append({"role": "user", "content": "Continue. Use tools to complete the task. Say DONE when finished."})
except Exception as e:
logger.error(f"Error: {e}")
yield {
"type": "agent_error",
"error": str(e),
"message": f"Error: {e}"
}
return
yield {
"type": "agent_max_iterations",
"message": f"Reached max iterations ({max_iterations})",
"iterations": max_iterations
}
async def _call_ollama(self, messages: List[Dict]) -> Dict:
"""
Call Ollama using the official Python client.
Uses ollama.chat() directly as per the guide:
https://github.com/ollama/ollama-python
Temperature=0.0 and top_p=1.0 recommended for Granite models.
"""
try:
from ollama import chat, ResponseError
except ImportError:
raise ImportError("ollama package not installed. Run: pip install ollama")
try:
# Use ollama.chat() directly as shown in the guide
# Run in executor to not block the async event loop
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: chat(
model=self.model,
messages=messages,
options={
"temperature": 0.0, # Deterministic output for tool calling
"top_p": 1.0 # Full probability mass (Granite recommended)
}
)
)
# Extract response content: response.message.content
content = ""
if hasattr(response, 'message') and hasattr(response.message, 'content'):
content = response.message.content
elif isinstance(response, dict):
content = response.get("message", {}).get("content", "")
return {"content": content}
except ResponseError as e:
# Handle Ollama-specific errors (model not available, etc.)
logger.error(f"Ollama ResponseError: {e}")
raise Exception(f"Ollama error: {e}. Make sure Ollama is running and the model '{self.model}' is pulled.")
except Exception as e:
logger.error(f"Ollama call failed: {e}")
raise Exception(f"Ollama error: {e}")
def _parse_tool_calls(self, text: str) -> List[Dict]:
"""Parse tool calls from response"""
import re
tool_calls = []
patterns = [
r'```json\s*(\{[^`]+\})\s*```',
r'```\s*(\{[^`]+\})\s*```',
r'(\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\})',
]
for pattern in patterns:
matches = re.findall(pattern, text, re.DOTALL)
for match in matches:
try:
data = json.loads(match.strip())
if "tool" in data:
tool_calls.append(data)
except:
continue
return tool_calls
async def _execute_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
"""Execute an MCP tool"""
if tool_name == "search_web":
query = tool_input.get("query", "")
max_results = tool_input.get("max_results", 5)
results = await self.mcp_registry.search.query(query, max_results=max_results)
return {"results": results[:max_results], "count": len(results[:max_results])}
elif tool_name == "search_news":
query = tool_input.get("query", "")
max_results = tool_input.get("max_results", 5)
results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results)
return {"results": results[:max_results], "count": len(results[:max_results])}
elif tool_name == "save_prospect":
prospect_data = {
"id": tool_input.get("prospect_id", str(uuid.uuid4())),
"company": {
"id": tool_input.get("company_id"),
"name": tool_input.get("company_name"),
"domain": tool_input.get("company_domain")
},
"fit_score": tool_input.get("fit_score", 0),
"status": tool_input.get("status", "new"),
"metadata": tool_input.get("metadata", {})
}
result = await self.mcp_registry.store.save_prospect(prospect_data)
return {"status": result, "prospect_id": prospect_data["id"]}
elif tool_name == "save_company":
company_data = {
"id": tool_input.get("company_id", str(uuid.uuid4())),
"name": tool_input.get("name", ""),
"domain": tool_input.get("domain", ""),
"industry": tool_input.get("industry"),
"description": tool_input.get("description"),
"employee_count": tool_input.get("employee_count")
}
result = await self.mcp_registry.store.save_company(company_data)
return {"status": result, "company_id": company_data["id"]}
elif tool_name == "save_contact":
contact_data = {
"id": tool_input.get("contact_id", str(uuid.uuid4())),
"company_id": tool_input.get("company_id", ""),
"email": tool_input.get("email", ""),
"first_name": tool_input.get("first_name"),
"last_name": tool_input.get("last_name"),
"title": tool_input.get("title"),
"seniority": tool_input.get("seniority")
}
result = await self.mcp_registry.store.save_contact(contact_data)
return {"status": result, "contact_id": contact_data["id"]}
elif tool_name == "save_fact":
fact_data = {
"id": tool_input.get("fact_id", str(uuid.uuid4())),
"company_id": tool_input.get("company_id", ""),
"fact_type": tool_input.get("fact_type", ""),
"content": tool_input.get("content", ""),
"source_url": tool_input.get("source_url"),
"confidence_score": tool_input.get("confidence_score", 0.8)
}
result = await self.mcp_registry.store.save_fact(fact_data)
return {"status": result, "fact_id": fact_data["id"]}
elif tool_name == "send_email":
to = tool_input.get("to", "")
subject = tool_input.get("subject", "")
body = tool_input.get("body", "")
prospect_id = tool_input.get("prospect_id", "")
thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
return {"status": "drafted", "thread_id": thread_id, "to": to}
elif tool_name == "list_prospects":
prospects = await self.mcp_registry.store.list_prospects()
return {"prospects": prospects, "count": len(prospects)}
elif tool_name == "get_prospect":
prospect_id = tool_input.get("prospect_id", "")
prospect = await self.mcp_registry.store.get_prospect(prospect_id)
return prospect or {"error": "Not found"}
elif tool_name == "suggest_meeting_slots":
slots = await self.mcp_registry.calendar.suggest_slots()
return {"slots": slots[:3], "count": len(slots[:3])}
else:
raise ValueError(f"Unknown tool: {tool_name}")