# water3/agent/agent_service.py
"""
Factor Agent - Full Agent Service with LLM Integration
======================================================
This module provides the complete agent loop with:
- LLM-powered responses
- Tool calling and execution
- Streaming responses
- Session memory management
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Any, AsyncGenerator, Dict, List, Optional
import httpx
from dotenv import load_dotenv
load_dotenv()
logger = logging.getLogger("factor_agent")
# OpenRouter configuration
OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY')
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
DEFAULT_MODEL = "meta-llama/llama-3.3-70b-instruct"
# Available tools for the agent
# JSON-schema tool definitions advertised to the model using the OpenAI-style
# "tools" field of the chat-completions request. Each "name" here must match
# a dispatch branch in AgentService.execute_tool, and each "parameters"
# object follows JSON Schema (type/properties/required).
TOOLS = [
    # Run arbitrary Python/bash via a local subprocess (execute_tool).
    {
        "type": "function",
        "function": {
            "name": "execute_code",
            "description": "Execute Python or bash code and return the output",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "The command to execute"
                    },
                    "timeout": {
                        "type": "integer",
                        "description": "Timeout in seconds",
                        "default": 60
                    }
                },
                "required": ["command"]
            }
        }
    },
    # DuckDuckGo text search (execute_tool uses duckduckgo_search.DDGS).
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "Search the web for current information",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The search query"
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of results",
                        "default": 5
                    }
                },
                "required": ["query"]
            }
        }
    },
    # Image generation via the pollinations.ai HTTP endpoint (execute_tool).
    {
        "type": "function",
        "function": {
            "name": "generate_image",
            "description": "Generate an image using AI based on a text prompt",
            "parameters": {
                "type": "object",
                "properties": {
                    "prompt": {
                        "type": "string",
                        "description": "Description of the image to generate"
                    },
                    "size": {
                        "type": "string",
                        "enum": ["256x256", "512x512", "1024x1024"],
                        "default": "512x512"
                    }
                },
                "required": ["prompt"]
            }
        }
    },
    # NOTE(review): execute_tool currently stubs this out (returns success
    # without creating a file) — confirm the real slides backend is wired up.
    {
        "type": "function",
        "function": {
            "name": "create_slides",
            "description": "Create a PowerPoint presentation",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Presentation title"
                    },
                    "slides": {
                        "type": "array",
                        "description": "Array of slide objects"
                    },
                    "filename": {
                        "type": "string",
                        "description": "Output filename"
                    }
                },
                "required": ["title", "slides", "filename"]
            }
        }
    },
    # NOTE(review): also stubbed in execute_tool — see create_slides above.
    {
        "type": "function",
        "function": {
            "name": "create_document",
            "description": "Create a Word document",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Document title"
                    },
                    "sections": {
                        "type": "array",
                        "description": "Array of section objects"
                    },
                    "filename": {
                        "type": "string",
                        "description": "Output filename"
                    }
                },
                "required": ["title", "sections", "filename"]
            }
        }
    }
]
class AgentService:
    """Main agent service that handles LLM interactions and tool execution.

    Wraps the OpenRouter chat-completions API (streaming and non-streaming)
    and locally executes any tools the model requests. Errors are reported
    as yielded event/text payloads rather than raised, so callers can stream
    them straight to the user.
    """

    def __init__(self):
        # Connection settings come from module-level, env-driven constants.
        self.api_key = OPENROUTER_API_KEY
        self.base_url = OPENROUTER_BASE_URL
        self.model = DEFAULT_MODEL
        # One long-lived client reused across requests; generous timeout
        # because LLM completions can take a while.
        self.client = httpx.AsyncClient(timeout=120.0)

    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        tools: Optional[List[Dict]] = None,
        stream: bool = False
    ) -> AsyncGenerator[str, None]:
        """
        Send a chat completion request to OpenRouter.

        Args:
            messages: List of message objects with role and content.
            tools: Optional list of tools for function calling.
            stream: Whether to stream the response.

        Yields:
            When streaming: incremental text chunks. Otherwise: either the
            full response text, or a JSON-encoded
            ``{"type": "tool_calls", ...}`` payload when the model asked to
            call tools. On failure a single "Error: ..." string is yielded
            instead of raising.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://factor.ai",
            "X-Title": "Factor Agent"
        }
        payload = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.7,
            "max_tokens": 4096,
            "stream": stream
        }
        if tools:
            payload["tools"] = tools
        try:
            if stream:
                # Server-sent events: each "data: ..." line carries one
                # JSON chunk; "[DONE]" terminates the stream.
                async with self.client.stream(
                    "POST",
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload
                ) as response:
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            data = line[6:]
                            if data == "[DONE]":
                                break
                            try:
                                chunk = json.loads(data)
                                delta = chunk.get("choices", [{}])[0].get("delta", {})
                                if "content" in delta and delta["content"]:
                                    yield delta["content"]
                            except json.JSONDecodeError:
                                # Skip keep-alive / malformed SSE lines.
                                continue
            else:
                response = await self.client.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload
                )
                response.raise_for_status()
                data = response.json()
                # Check for tool calls before falling back to plain text.
                message = data.get("choices", [{}])[0].get("message", {})
                if "tool_calls" in message:
                    yield json.dumps({
                        "type": "tool_calls",
                        "tool_calls": message["tool_calls"]
                    })
                else:
                    content = message.get("content", "")
                    if content:  # Only yield non-empty content
                        yield content
        except httpx.HTTPError as e:
            # Covers raise_for_status() (HTTPStatusError) and transport
            # failures (timeouts, connection errors).
            logger.error(f"HTTP error in chat completion: {e}")
            yield f"Error: Failed to get response from LLM - {str(e)}"
        except Exception as e:
            logger.error(f"Error in chat completion: {e}")
            yield f"Error: {str(e)}"

    async def process_message(
        self,
        session_id: str,
        user_message: str,
        message_history: List[Dict[str, str]]
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Process a user message through the agent loop.

        Args:
            session_id: Caller-supplied session identifier (currently unused
                inside this method; kept for interface stability).
            user_message: The new user message.
            message_history: Prior conversation turns to prepend.

        Yields events like:
        - {"type": "thinking", "content": "..."}
        - {"type": "tool_call", "tool": "...", "params": {...}}
        - {"type": "tool_result", "tool": "...", "result": "..."}
        - {"type": "response", "content": "..."}
        - {"type": "complete"}
        """
        # Build system prompt
        system_prompt = """You are Factor Agent, an advanced AI assistant with access to powerful tools.
Your capabilities include:
- execute_code: Run Python or bash commands
- web_search: Search the web for current information
- generate_image: Create images from text descriptions
- create_slides: Generate PowerPoint presentations
- create_document: Create Word documents
YOLO mode is ENABLED - you can execute tools without asking for confirmation. Be helpful and proactive!
When the user asks you to:
- Write code: Use execute_code to run and verify it
- Search for info: Use web_search to get current data
- Create images: Use generate_image
- Make presentations: Use create_slides
- Write documents: Use create_document
Always provide clear, helpful responses and use tools when appropriate."""
        # Build messages: system prompt, then history, then the new turn.
        messages = [{"role": "system", "content": system_prompt}]
        messages.extend(message_history)
        messages.append({"role": "user", "content": user_message})
        # Yield thinking event
        yield {"type": "thinking", "content": "Processing your request..."}
        # Get response from LLM with tools
        response_chunks = []
        tool_calls_detected = False
        async for chunk in self.chat_completion(messages, tools=TOOLS, stream=False):
            try:
                # Tool-call requests arrive as a JSON envelope (see
                # chat_completion); plain text fails json.loads below.
                data = json.loads(chunk)
                if data.get("type") == "tool_calls":
                    tool_calls_detected = True
                    for tool_call in data.get("tool_calls", []):
                        function = tool_call.get("function", {})
                        tool_name = function.get("name")
                        tool_params = json.loads(function.get("arguments", "{}"))
                        yield {
                            "type": "tool_call",
                            "tool": tool_name,
                            "params": tool_params
                        }
                        # Execute the tool
                        result = await self.execute_tool(tool_name, tool_params)
                        yield {
                            "type": "tool_result",
                            "tool": tool_name,
                            "result": result
                        }
            except json.JSONDecodeError:
                # Regular text response
                response_chunks.append(chunk)
        # If no tool calls, yield the regular response
        if not tool_calls_detected:
            # Filter out empty strings and join
            full_response = "".join([c for c in response_chunks if c and c.strip()])
            if full_response:
                yield {"type": "response", "content": full_response}
            else:
                yield {"type": "error", "content": "No response received from the model"}
        # Exactly one completion event, regardless of the path taken above.
        yield {"type": "complete"}

    async def execute_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a tool by name and return its result dict.

        Args:
            tool_name: One of the names advertised in TOOLS.
            params: Parsed tool arguments from the model.

        Returns:
            A dict with at least a "success" bool; on failure an "error"
            string. Never raises — all exceptions are caught and reported.
        """
        logger.info(f"Executing tool: {tool_name} with params: {params}")
        try:
            if tool_name == "execute_code":
                import subprocess
                # SECURITY: shell=True runs model-supplied text directly in
                # a shell — acceptable only in a sandboxed environment.
                result = subprocess.run(
                    params["command"],
                    shell=True,
                    capture_output=True,
                    text=True,
                    timeout=params.get("timeout", 60)
                )
                return {
                    "success": result.returncode == 0,
                    "output": result.stdout,
                    "error": result.stderr if result.stderr else None,
                    "exit_code": result.returncode
                }
            elif tool_name == "web_search":
                from duckduckgo_search import DDGS
                with DDGS() as ddgs:
                    results = list(ddgs.text(params["query"], max_results=params.get("max_results", 5)))
                return {
                    "success": True,
                    "query": params["query"],
                    "results": results,
                    "count": len(results)
                }
            elif tool_name == "generate_image":
                import base64
                from urllib.parse import quote
                width, height = params.get("size", "512x512").split("x")
                # Percent-encode the whole prompt so characters beyond
                # spaces (?, &, #, unicode, ...) cannot break the URL.
                encoded_prompt = quote(params["prompt"])
                image_url = f"https://image.pollinations.ai/prompt/{encoded_prompt}?width={width}&height={height}&nologo=true"
                async with httpx.AsyncClient(timeout=60.0) as client:
                    response = await client.get(image_url)
                    response.raise_for_status()
                    image_data = base64.b64encode(response.content).decode()
                return {
                    "success": True,
                    "image_data": f"data:image/png;base64,{image_data}",
                    "prompt": params["prompt"]
                }
            elif tool_name == "create_slides":
                # This would call the slides tool (currently a stub).
                return {
                    "success": True,
                    "message": "Slides created successfully",
                    "filename": params.get("filename", "presentation.pptx")
                }
            elif tool_name == "create_document":
                # This would call the document tool (currently a stub).
                return {
                    "success": True,
                    "message": "Document created successfully",
                    "filename": params.get("filename", "document.docx")
                }
            else:
                return {"success": False, "error": f"Unknown tool: {tool_name}"}
        except Exception as e:
            logger.error(f"Tool execution error: {e}")
            return {"success": False, "error": str(e)}
# Global agent service instance, shared by the rest of the application.
# NOTE(review): constructed at import time, which eagerly creates an
# httpx.AsyncClient — confirm this is intended in all import contexts.
agent_service = AgentService()