| """
|
| Factor Agent - Full Agent Service with LLM Integration
|
| ======================================================
|
| This module provides the complete agent loop with:
|
| - LLM-powered responses
|
| - Tool calling and execution
|
| - Streaming responses
|
| - Session memory management
|
| """
|
|
|
| import asyncio
|
| import json
|
| import logging
|
| import os
|
| from datetime import datetime
|
| from typing import Any, AsyncGenerator, Dict, List, Optional
|
|
|
| import httpx
|
| from dotenv import load_dotenv
|
|
|
# Load environment variables from a local .env file (no-op if absent).
load_dotenv()

logger = logging.getLogger("factor_agent")

# OpenRouter configuration. The API key is read from the environment
# (populated by load_dotenv above); it may be None if unset, in which case
# requests will fail authentication at call time.
OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY')
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
DEFAULT_MODEL = "meta-llama/llama-3.3-70b-instruct"
|
|
|
|
|
# OpenAI-style function-calling tool schemas sent with each chat completion.
# Each entry follows the {"type": "function", "function": {...}} shape the
# OpenRouter chat-completions API expects; names here must match the
# dispatch branches in AgentService.execute_tool.
TOOLS = [
    # Run arbitrary Python/bash on the host (executed via subprocess in
    # execute_tool — only safe in a sandboxed environment).
    {
        "type": "function",
        "function": {
            "name": "execute_code",
            "description": "Execute Python or bash code and return the output",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "The command to execute"
                    },
                    "timeout": {
                        "type": "integer",
                        "description": "Timeout in seconds",
                        "default": 60
                    }
                },
                "required": ["command"]
            }
        }
    },
    # Web search (backed by duckduckgo_search in execute_tool).
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "Search the web for current information",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The search query"
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of results",
                        "default": 5
                    }
                },
                "required": ["query"]
            }
        }
    },
    # Text-to-image generation (backed by the pollinations.ai HTTP API).
    {
        "type": "function",
        "function": {
            "name": "generate_image",
            "description": "Generate an image using AI based on a text prompt",
            "parameters": {
                "type": "object",
                "properties": {
                    "prompt": {
                        "type": "string",
                        "description": "Description of the image to generate"
                    },
                    "size": {
                        "type": "string",
                        "enum": ["256x256", "512x512", "1024x1024"],
                        "default": "512x512"
                    }
                },
                "required": ["prompt"]
            }
        }
    },
    # PowerPoint generation (currently a stub in execute_tool).
    {
        "type": "function",
        "function": {
            "name": "create_slides",
            "description": "Create a PowerPoint presentation",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Presentation title"
                    },
                    "slides": {
                        "type": "array",
                        "description": "Array of slide objects"
                    },
                    "filename": {
                        "type": "string",
                        "description": "Output filename"
                    }
                },
                "required": ["title", "slides", "filename"]
            }
        }
    },
    # Word-document generation (currently a stub in execute_tool).
    {
        "type": "function",
        "function": {
            "name": "create_document",
            "description": "Create a Word document",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Document title"
                    },
                    "sections": {
                        "type": "array",
                        "description": "Array of section objects"
                    },
                    "filename": {
                        "type": "string",
                        "description": "Output filename"
                    }
                },
                "required": ["title", "sections", "filename"]
            }
        }
    }
]
|
|
|
|
|
class AgentService:
    """Main agent service that handles LLM interactions and tool execution.

    Wraps the OpenRouter chat-completions API (both non-streaming and SSE
    streaming), detects tool calls requested by the model, and executes the
    tools declared in TOOLS.
    """

    def __init__(self):
        # Configuration is snapshotted from module-level constants at
        # construction time.
        self.api_key = OPENROUTER_API_KEY
        self.base_url = OPENROUTER_BASE_URL
        self.model = DEFAULT_MODEL
        # Generous timeout: LLM completions can legitimately take minutes.
        self.client = httpx.AsyncClient(timeout=120.0)

    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        tools: Optional[List[Dict]] = None,
        stream: bool = False
    ) -> AsyncGenerator[str, None]:
        """
        Send a chat completion request to OpenRouter.

        Args:
            messages: List of message objects with role and content
            tools: Optional list of tools for function calling
            stream: Whether to stream the response

        Yields:
            Streaming mode: content deltas as they arrive.
            Non-streaming mode: either the full content string, or a
            JSON-encoded {"type": "tool_calls", ...} envelope when the model
            requested tool calls (decoded by process_message).
            On failure a single "Error: ..." string is yielded; this
            generator never raises to the caller.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://factor.ai",
            "X-Title": "Factor Agent"
        }

        payload = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.7,
            "max_tokens": 4096,
            "stream": stream
        }

        if tools:
            payload["tools"] = tools

        try:
            if stream:
                async with self.client.stream(
                    "POST",
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload
                ) as response:
                    # Fail fast on HTTP errors instead of silently parsing
                    # an error body as an SSE stream.
                    response.raise_for_status()
                    # OpenRouter streams Server-Sent Events: "data: {...}" lines.
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            data = line[6:]
                            if data == "[DONE]":
                                break
                            try:
                                chunk = json.loads(data)
                                delta = chunk.get("choices", [{}])[0].get("delta", {})
                                if "content" in delta and delta["content"]:
                                    yield delta["content"]
                            except json.JSONDecodeError:
                                # Skip keep-alive comments / partial lines.
                                continue
            else:
                response = await self.client.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload
                )
                response.raise_for_status()
                data = response.json()

                message = data.get("choices", [{}])[0].get("message", {})

                if "tool_calls" in message:
                    # Tool calls are JSON-encoded so this (str) generator
                    # interface can carry them alongside plain text.
                    yield json.dumps({
                        "type": "tool_calls",
                        "tool_calls": message["tool_calls"]
                    })
                else:
                    content = message.get("content", "")
                    if content:
                        yield content
        except httpx.HTTPError as e:
            # Covers connection/timeout failures and raise_for_status errors.
            logger.error(f"HTTP error in chat completion: {e}")
            yield f"Error: Failed to get response from LLM - {str(e)}"
        except Exception as e:
            logger.error(f"Error in chat completion: {e}")
            yield f"Error: {str(e)}"

    async def process_message(
        self,
        session_id: str,
        user_message: str,
        message_history: List[Dict[str, str]]
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Process a user message through the agent loop.

        Args:
            session_id: Chat-session identifier (currently unused in this
                method; kept for interface stability with callers).
            user_message: The new user message.
            message_history: Prior turns as {"role", "content"} dicts.

        Yields events like:
        - {"type": "thinking", "content": "..."}
        - {"type": "tool_call", "tool": "...", "params": {...}}
        - {"type": "tool_result", "tool": "...", "result": "..."}
        - {"type": "response", "content": "..."}
        - {"type": "complete"}
        """
        system_prompt = """You are Factor Agent, an advanced AI assistant with access to powerful tools.

Your capabilities include:
- execute_code: Run Python or bash commands
- web_search: Search the web for current information
- generate_image: Create images from text descriptions
- create_slides: Generate PowerPoint presentations
- create_document: Create Word documents

YOLO mode is ENABLED - you can execute tools without asking for confirmation. Be helpful and proactive!

When the user asks you to:
- Write code: Use execute_code to run and verify it
- Search for info: Use web_search to get current data
- Create images: Use generate_image
- Make presentations: Use create_slides
- Write documents: Use create_document

Always provide clear, helpful responses and use tools when appropriate."""

        messages = [{"role": "system", "content": system_prompt}]
        messages.extend(message_history)
        messages.append({"role": "user", "content": user_message})

        yield {"type": "thinking", "content": "Processing your request..."}

        response_chunks = []
        tool_calls_detected = False

        async for chunk in self.chat_completion(messages, tools=TOOLS, stream=False):
            # Only intercept the JSON tool-calls envelope emitted by
            # chat_completion; any other chunk (including a text answer that
            # merely happens to be valid JSON) is treated as response text.
            try:
                parsed = json.loads(chunk)
            except json.JSONDecodeError:
                parsed = None

            if isinstance(parsed, dict) and parsed.get("type") == "tool_calls":
                tool_calls_detected = True
                for tool_call in parsed.get("tool_calls", []):
                    function = tool_call.get("function", {})
                    tool_name = function.get("name")
                    tool_params = json.loads(function.get("arguments", "{}"))

                    yield {
                        "type": "tool_call",
                        "tool": tool_name,
                        "params": tool_params
                    }

                    result = await self.execute_tool(tool_name, tool_params)

                    yield {
                        "type": "tool_result",
                        "tool": tool_name,
                        "result": result
                    }
                    # NOTE(review): tool results are not fed back to the LLM
                    # for a follow-up completion; callers only receive the
                    # raw tool_result events. Confirm this is intentional.
            else:
                response_chunks.append(chunk)

        if not tool_calls_detected:
            full_response = "".join([c for c in response_chunks if c and c.strip()])
            if full_response:
                yield {"type": "response", "content": full_response}
            else:
                yield {"type": "error", "content": "No response received from the model"}

        # Terminal event, emitted exactly once (was previously duplicated).
        yield {"type": "complete"}

    async def execute_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a tool and return a result dict.

        Every branch returns {"success": bool, ...}; failures are reported in
        the dict rather than raised, so the agent loop never crashes on a
        bad tool call.
        """
        logger.info(f"Executing tool: {tool_name} with params: {params}")

        try:
            if tool_name == "execute_code":
                import subprocess
                # SECURITY: shell=True runs arbitrary model-supplied input.
                # This is deliberate under "YOLO mode" but is only acceptable
                # inside a sandboxed execution environment.
                result = subprocess.run(
                    params["command"],
                    shell=True,
                    capture_output=True,
                    text=True,
                    timeout=params.get("timeout", 60)
                )
                return {
                    "success": result.returncode == 0,
                    "output": result.stdout,
                    "error": result.stderr if result.stderr else None,
                    "exit_code": result.returncode
                }

            elif tool_name == "web_search":
                from duckduckgo_search import DDGS
                with DDGS() as ddgs:
                    results = list(ddgs.text(params["query"], max_results=params.get("max_results", 5)))
                return {
                    "success": True,
                    "query": params["query"],
                    "results": results,
                    "count": len(results)
                }

            elif tool_name == "generate_image":
                import base64
                from urllib.parse import quote
                width, height = params.get("size", "512x512").split("x")
                # Percent-encode the whole prompt: the previous
                # `.replace(" ", "%20")` left '&', '?', '#', '/' etc.
                # unescaped, which corrupted the URL and its query string.
                encoded_prompt = quote(params["prompt"], safe="")
                image_url = f"https://image.pollinations.ai/prompt/{encoded_prompt}?width={width}&height={height}&nologo=true"

                async with httpx.AsyncClient(timeout=60.0) as client:
                    response = await client.get(image_url)
                    response.raise_for_status()
                    image_data = base64.b64encode(response.content).decode()

                return {
                    "success": True,
                    "image_data": f"data:image/png;base64,{image_data}",
                    "prompt": params["prompt"]
                }

            elif tool_name == "create_slides":
                # Stub: no presentation is actually generated yet.
                return {
                    "success": True,
                    "message": "Slides created successfully",
                    "filename": params.get("filename", "presentation.pptx")
                }

            elif tool_name == "create_document":
                # Stub: no document is actually generated yet.
                return {
                    "success": True,
                    "message": "Document created successfully",
                    "filename": params.get("filename", "document.docx")
                }

            else:
                return {"success": False, "error": f"Unknown tool: {tool_name}"}

        except Exception as e:
            logger.error(f"Tool execution error: {e}")
            return {"success": False, "error": str(e)}
|
|
|
|
|
|
|
# Module-level singleton shared by importers of this module.
agent_service = AgentService()
|
|
|