HASHIRU / agent_models.py
mulambo's picture
Initial commit
fea1bd1
# main_agent.py
# -*- coding: utf-8 -*-
"""
HASHIRU 6.8 - ARQUITETURA FINAL ESTÁVEL
Esta versão finaliza a separação de responsabilidades, movendo os
modelos de dados para seu próprio arquivo e estabilizando a arquitetura.
"""
from __future__ import annotations
import traceback
import sys
import pathlib
import logging
import json
import chainlit as cl
import httpx
# ----------------------------------------------------------------------
# 1. CONFIGURAÇÃO INICIAL E DEPENDÊNCIAS
# ----------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s - [%(levelname)s] - %(message)s")
logger = logging.getLogger("HASHIRU_AGENT")
ROOT_PATH = pathlib.Path(__file__).resolve().parent
if str(ROOT_PATH) not in sys.path:
sys.path.insert(0, str(ROOT_PATH))
# --- Importações do Projeto ---
from autonomous_config import config, Config
from agent_tools import TOOL_REGISTRY, get_tools_description_for_llm
# >>> NOVO: Importa as estruturas de dados do seu próprio arquivo <<<
from agent_models import Task, ToolCall
# ----------------------------------------------------------------------
# 2. CLASSES DO AGENTE (Sem as definições de dataclass)
# ----------------------------------------------------------------------
class Planner:
"""Cria um plano de ação (uma lista de ToolCalls) para resolver a solicitação."""
def __init__(self, config: Config, http_client: httpx.AsyncClient):
self.config = config
self.http_client = http_client
self.system_prompt = (
"Você é um planejador para um agente de IA. Sua tarefa é decompor a solicitação do usuário em uma sequência de passos, onde cada passo é uma chamada a uma ferramenta. "
"As ferramentas disponíveis são:\n{tools}\n\n"
"Responda APENAS com um objeto JSON contendo uma chave 'plan', que é uma lista de ações. "
"Cada ação na lista deve ter 'tool_name' e 'tool_input'. Exemplo de resposta: "
'{"plan": [{"tool_name": "read_file", "tool_input": {"file_path": "main.py"}}]}'
)
async def create_task_with_plan(self, user_input: str) -> Task:
"""Usa o LLM para criar uma Task completa com um plano de ferramentas."""
model = self.config.get_model("reasoning")
prompt = self.system_prompt.format(tools=get_tools_description_for_llm())
try:
response_json = await self._call_ollama(model, f"Crie um plano para: '{user_input}'", prompt)
data = json.loads(response_json)
plan_data = data.get("plan", [])
plan = [ToolCall(**call) for call in plan_data]
return Task("tool_based_plan", "Executar plano de ferramentas", user_input, plan)
except (json.JSONDecodeError, httpx.RequestError, TypeError) as e:
logger.error(f"Falha ao criar plano, tratando como conversa. Erro: {e}", exc_info=True)
return Task("general_conversation", "Conversa geral", user_input, [])
async def _call_ollama(self, model: str, prompt: str, system: str) -> str:
payload = {"model": model, "prompt": prompt, "system": system, "stream": False, "format": "json"}
r = await self.http_client.post(f"{self.config.ollama.base_url}/api/generate", json=payload)
r.raise_for_status()
return r.json().get("response", "{}")
class Executor:
"""Executa um plano de ToolCalls em sequência."""
def __init__(self, config: Config):
self.config = config
async def execute_plan(self, plan: list[ToolCall]) -> str:
"""Itera sobre um plano e executa cada chamada de ferramenta."""
if not plan:
return "## ✅ Plano Concluído\nNenhuma ação foi planejada, pois a tarefa não exigia ferramentas."
results = []
for i, tool_call in enumerate(plan):
async with cl.Step(name=f"Passo {i+1}: `{tool_call.tool_name}`") as step:
step.input = str(tool_call.tool_input)
tool = TOOL_REGISTRY.get(tool_call.tool_name)
if not tool:
output = f"Erro: Ferramenta '{tool_call.tool_name}' não encontrada."
else:
try:
output = tool.execute(tool_call.tool_input)
except Exception as e:
output = f"Erro ao executar a ferramenta '{tool.name}': {e}"
step.output = output
results.append(f"#### ✅ Passo {i+1}: `{tool.name}`\n**Resultado:**\n```\n{output}\n```")
return "## 📝 Relatório Final de Execução\n\n" + "\n\n---\n\n".join(results)
class AgentSession:
"""Gerencia o ciclo de vida completo de uma interação."""
def __init__(self, config: Config):
self.config = config
self.http_client = httpx.AsyncClient(timeout=config.ollama.timeout)
self.planner = Planner(config, self.http_client)
self.executor = Executor(config)
async def handle_message(self, user_input: str):
"""Orquestra o fluxo de Planejar -> Executar."""
await cl.Message(content=self.config.processing_message).send()
task = await self.planner.create_task_with_plan(user_input)
if not task.plan:
plan_summary = "Não foi criado um plano de ferramentas. Tentando responder diretamente..."
await cl.Message(content=plan_summary).send()
# Fallback para uma conversa simples se o planejamento falhar
# (Em uma versão futura, isso também poderia ser uma chamada ao LLM)
response = "Desculpe, não consegui transformar sua solicitação em um plano de ação. Pode tentar ser mais específico ou pedir algo que use as ferramentas disponíveis?"
else:
plan_summary = "**Plano de Ação:**\n" + "\n".join([f"- Usar `{tc.tool_name}`" for tc in task.plan])
await cl.Message(content=plan_summary).send()
await cl.Message(content=self.config.executing_message).send()
response = await self.executor.execute_plan(task.plan)
await cl.Message(content=response).send()
async def cleanup(self):
await self.http_client.aclose()
logger.info("Sessão do agente finalizada.")
# ----------------------------------------------------------------------
# 3. HOOKS DO CHAINLIT (A Camada de Interface)
# ----------------------------------------------------------------------
@cl.on_chat_start
async def on_chat_start():
agent_session = AgentSession(config)
cl.user_session.set("agent_session", agent_session)
await cl.Message(content=config.startup_banner).send()
@cl.on_message
async def on_message(message: cl.Message):
agent = cl.user_session.get("agent_session")
user_input = (message.content or "").strip()
if not user_input or not agent: return
try:
await agent.handle_message(user_input)
except Exception:
tb = traceback.format_exc()
logger.error(f"Erro irrecuperável no fluxo principal:\n{tb}")
await cl.Message(content=f"💥 **Erro Inesperado no Sistema:**\n```\n{tb}\n```").send()
@cl.on_chat_end
async def on_chat_end():
if agent := cl.user_session.get("agent_session"):
await agent.cleanup()