HASHIRU / main_agent.py
mulambo's picture
Initial commit
fea1bd1
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SUPEREZIO 21.0 COMPLETE - Sistema de IA Completo CORRIGIDO
==========================================================
Versão final com TODAS as ferramentas incluídas
Autor: Marco & AI Expert Partner
Versão: 21.0 COMPLETE FULL
Data: 2025-08-07
CORREÇÕES:
✅ Message.update() API corrigida
✅ Warnings HTTP eliminados
✅ Callbacks Chainlit incluídos
✅ TODAS as ferramentas incluídas (ReadFileTool, WriteFileTool, etc.)
✅ Error handling robusto
"""
import asyncio
import json
import os
import sys
import time
import uuid
import hashlib
import re
import traceback
import gc
import urllib.parse
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple, AsyncIterator
from dataclasses import dataclass, field
from functools import lru_cache
from enum import Enum
import logging
# Verificação Python
if sys.version_info < (3, 9):
print("❌ Python 3.9+ necessário")
sys.exit(1)
# ============================================================================
# VERIFICAÇÃO DE DEPENDÊNCIAS
# ============================================================================
def check_dependencies():
"""Verifica dependências necessárias"""
required = {
'chainlit': 'chainlit',
'httpx': 'httpx',
'pydantic': 'pydantic',
'tenacity': 'tenacity',
'requests': 'requests',
'bs4': 'beautifulsoup4',
'pyautogui': 'pyautogui',
'aiofiles': 'aiofiles',
'python-dotenv': 'python-dotenv',
'ddgs': 'ddgs'
}
missing = []
for module, package in required.items():
try:
if module == 'bs4':
__import__('bs4')
elif module == 'python-dotenv':
__import__('dotenv')
elif module == 'ddgs':
try:
__import__('ddgs')
except:
try:
__import__('duckduckgo_search')
except:
missing.append(package)
else:
__import__(module)
except ImportError:
missing.append(package)
if missing:
print(f"❌ Dependências faltando: {', '.join(missing)}")
print(f"📦 Instale com: pip install {' '.join(missing)}")
return False
return True
if not check_dependencies():
sys.exit(1)
# Imports após verificação
import chainlit as cl
import httpx
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests
from bs4 import BeautifulSoup
import pyautogui
import aiofiles
from dotenv import load_dotenv
# Import DDGS corrigido
try:
from ddgs import DDGS
DDGS_AVAILABLE = True
print("✅ DDGS (novo) carregado com sucesso")
except ImportError:
try:
from duckduckgo_search import DDGS
DDGS_AVAILABLE = True
print("⚠️ Usando duckduckgo_search (considere: pip install ddgs)")
except ImportError:
DDGS_AVAILABLE = False
print("⚠️ DDGS não disponível, usando método alternativo")
# Carregar configurações do ambiente
load_dotenv()
# Configurações de segurança para PyAutoGUI
pyautogui.FAILSAFE = True
pyautogui.PAUSE = 0.1
# ============================================================================
# CONFIGURAÇÃO
# ============================================================================
@dataclass
class Config:
"""Configuração do sistema"""
# Identificação
app_name: str = "SUPEREZIO"
app_version: str = "21.0 COMPLETE FULL"
debug: bool = os.getenv('DEBUG', 'false').lower() == 'true'
# Ollama
ollama_url: str = os.getenv('OLLAMA_URL', 'http://localhost:11434')
ollama_timeout: float = float(os.getenv('OLLAMA_TIMEOUT', '120'))
# Modelos
primary_model: str = os.getenv('PRIMARY_MODEL', 'gpt-oss:20b')
fallback_models: List[str] = field(default_factory=lambda: [
'llama3.2:latest',
'llama3.1:latest',
'mistral:latest',
'gemma2:2b',
'phi3:latest'
])
# Diretórios
data_dir: Path = Path('./data')
logs_dir: Path = Path('./logs')
# Memória
memory_enabled: bool = True
memory_size: int = 100
memory_save_interval: int = 5
# Limites
max_file_size: int = 10 * 1024 * 1024
rate_limit: int = 30
# Features - TODAS HABILITADAS
web_search_enabled: bool = True
file_tools_enabled: bool = True
device_control_enabled: bool = os.getenv('DEVICE_CONTROL', 'true').lower() == 'true'
def __post_init__(self):
"""Cria diretórios necessários"""
for directory in [self.data_dir, self.logs_dir]:
directory.mkdir(parents=True, exist_ok=True)
(self.data_dir / 'memory').mkdir(exist_ok=True)
# Instância global
config = Config()
# ============================================================================
# LOGGER
# ============================================================================
class Logger:
"""Sistema de logging simples"""
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG if config.debug else logging.INFO)
if not self.logger.handlers:
# Console
handler = logging.StreamHandler()
formatter = logging.Formatter(f'[%(asctime)s] {name}: %(message)s', datefmt='%H:%M:%S')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
# Arquivo
file_handler = logging.FileHandler(config.logs_dir / f'{name.lower()}.log', encoding='utf-8')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def info(self, msg): self.logger.info(msg)
def error(self, msg): self.logger.error(msg)
def warning(self, msg): self.logger.warning(msg)
def debug(self, msg): self.logger.debug(msg)
# ============================================================================
# MEMÓRIA
# ============================================================================
class Memory:
"""Sistema de memória persistente"""
def __init__(self):
self.logger = Logger("Memory")
self.short_term = deque(maxlen=config.memory_size)
self.long_term = []
self.memory_file = config.data_dir / 'memory' / 'memory.json'
self._counter = 0
self._load()
def _load(self):
"""Carrega memória do disco"""
if self.memory_file.exists():
try:
with open(self.memory_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.long_term = data.get('memories', [])
self.logger.info(f"Carregadas {len(self.long_term)} memórias")
except Exception as e:
self.logger.error(f"Erro ao carregar memória: {e}")
async def add(self, content: str, role: str = "user"):
"""Adiciona à memória"""
if not config.memory_enabled:
return
entry = {
'content': content,
'role': role,
'timestamp': datetime.now().isoformat()
}
self.short_term.append(entry)
self.long_term.append(entry)
# Limita tamanho
if len(self.long_term) > 1000:
self.long_term = self.long_term[-1000:]
self._counter += 1
if self._counter >= config.memory_save_interval:
await self.save()
self._counter = 0
async def save(self):
"""Salva memória em disco"""
try:
data = {'memories': self.long_term}
with open(self.memory_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
self.logger.debug(f"Memória salva: {len(self.long_term)} itens")
except Exception as e:
self.logger.error(f"Erro ao salvar memória: {e}")
def get_context(self) -> str:
"""Retorna contexto recente"""
if not self.short_term:
return ""
context = []
for entry in list(self.short_term)[-5:]:
content = entry['content'][:100]
if len(entry['content']) > 100:
content += "..."
context.append(f"{entry['role']}: {content}")
return "\n".join(context)
# ============================================================================
# CLIENTE OLLAMA CORRIGIDO
# ============================================================================
class OllamaClient:
"""Cliente para Ollama com correção de warnings HTTP"""
def __init__(self):
self.logger = Logger("Ollama")
self.client = None
self.models = []
self.connection_healthy = False
async def initialize(self):
"""Inicializa cliente com context manager"""
try:
self.client = httpx.AsyncClient(
base_url=config.ollama_url,
timeout=httpx.Timeout(config.ollama_timeout),
limits=httpx.Limits(max_connections=5, max_keepalive_connections=2)
)
response = await self.client.get("/api/tags")
if response.status_code == 200:
data = response.json()
self.models = [m['name'] for m in data.get('models', [])]
self.connection_healthy = True
self.logger.info(f"Conectado: {len(self.models)} modelos disponíveis")
return True
except Exception as e:
self.logger.error(f"Erro ao conectar: {e}")
self.connection_healthy = False
if self.client:
await self.client.aclose()
self.client = None
return False
async def health_check(self) -> bool:
"""Verifica saúde da conexão"""
try:
if not self.client:
return False
response = await self.client.get("/api/tags", timeout=5)
self.connection_healthy = response.status_code == 200
return self.connection_healthy
except:
self.connection_healthy = False
return False
async def generate(self, prompt: str, system: str = None, stream: bool = False, format: str = None):
"""Gera resposta com correções"""
if not self.client:
raise RuntimeError("Cliente não inicializado")
if not await self.health_check():
raise RuntimeError("Conexão com Ollama indisponível")
# Seleciona modelo
model = config.primary_model
if model not in self.models:
for fallback in config.fallback_models:
if fallback in self.models:
model = fallback
self.logger.info(f"Usando fallback: {model}")
break
else:
if self.models:
model = self.models[0]
self.logger.info(f"Usando primeiro modelo disponível: {model}")
else:
raise RuntimeError("Nenhum modelo disponível")
payload = {
"model": model,
"prompt": prompt,
"stream": stream,
"options": {
"temperature": 0.7,
"top_p": 0.9,
"num_predict": 2048
}
}
if system:
payload["system"] = system
if format:
payload["format"] = format
try:
if stream:
return self._stream_safe(payload)
else:
response = await self.client.post("/api/generate", json=payload)
response.raise_for_status()
result = response.json()
return result.get('response', '')
except Exception as e:
self.logger.error(f"Erro ao gerar: {e}")
raise
async def _stream_safe(self, payload):
"""Stream de resposta com correção de warnings"""
response_stream = None
try:
response_stream = self.client.stream("POST", "/api/generate", json=payload)
async with response_stream as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line and line.strip():
try:
data = json.loads(line)
if 'response' in data and data['response']:
chunk = data['response']
if chunk.strip():
yield chunk
if data.get('done'):
break
except json.JSONDecodeError:
continue
except GeneratorExit:
self.logger.debug("Stream encerrado pelo cliente")
break
except Exception as e:
self.logger.warning(f"Erro no chunk: {e}")
continue
except GeneratorExit:
self.logger.debug("Stream interrompido")
return
except Exception as e:
self.logger.error(f"Erro no streaming: {e}")
raise
finally:
if response_stream:
try:
await response_stream.__aexit__(None, None, None)
except:
pass
async def cleanup(self):
"""Fecha conexões com correção"""
if self.client:
try:
await self.client.aclose()
self.logger.debug("Cliente HTTP fechado corretamente")
except Exception as e:
self.logger.warning(f"Erro ao fechar cliente: {e}")
finally:
self.client = None
self.connection_healthy = False
# ============================================================================
# FERRAMENTAS COMPLETAS
# ============================================================================
class WebSearchTool:
"""Ferramenta de pesquisa web"""
def __init__(self):
self.logger = Logger("WebSearch")
async def execute(self, query: str, num_results: int = 5) -> Dict:
"""Executa pesquisa"""
if not config.web_search_enabled:
return {"success": False, "error": "Pesquisa desabilitada"}
self.logger.info(f"Pesquisando: {query}")
try:
if DDGS_AVAILABLE:
return await self._search_ddgs(query, num_results)
else:
return await self._search_fallback(query, num_results)
except Exception as e:
self.logger.error(f"Erro na pesquisa: {e}")
return {"success": False, "error": str(e)}
async def _search_ddgs(self, query: str, num_results: int) -> Dict:
"""Pesquisa usando DDGS"""
try:
def search():
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=num_results))
return results
results = await asyncio.to_thread(search)
formatted = []
for r in results:
formatted.append({
"title": r.get('title', ''),
"url": r.get('href', r.get('link', '')),
"snippet": r.get('body', '')
})
return {
"success": True,
"results": formatted,
"query": query
}
except Exception as e:
self.logger.warning(f"DDGS falhou, tentando fallback: {e}")
return await self._search_fallback(query, num_results)
async def _search_fallback(self, query: str, num_results: int) -> Dict:
"""Método alternativo de pesquisa"""
url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0"
})
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
results = []
for item in soup.select('.result')[:num_results]:
title = item.select_one('.result__title')
link = item.select_one('.result__url')
snippet = item.select_one('.result__snippet')
if title and link:
results.append({
"title": title.get_text(strip=True),
"url": link.get_text(strip=True),
"snippet": snippet.get_text(strip=True) if snippet else ""
})
return {
"success": True,
"results": results,
"query": query
}
class DeviceControlTool:
"""Ferramenta para controlar mouse e teclado"""
def __init__(self):
self.logger = Logger("DeviceControl")
async def execute(self, action: str, **kwargs) -> Dict:
"""Executa ação de controle"""
if not config.device_control_enabled:
return {"success": False, "error": "Controle de dispositivos desabilitado"}
self.logger.info(f"Executando ação: {action}")
try:
await asyncio.sleep(0.2)
if action == "move_mouse":
x = kwargs.get('x', 100)
y = kwargs.get('y', 100)
pyautogui.moveTo(x, y, duration=0.5)
return {"success": True, "action": "move_mouse", "position": [x, y]}
elif action == "click":
x = kwargs.get('x')
y = kwargs.get('y')
button = kwargs.get('button', 'left')
if x and y:
pyautogui.click(x, y, button=button)
else:
pyautogui.click(button=button)
return {"success": True, "action": "click", "button": button}
elif action == "double_click":
x = kwargs.get('x')
y = kwargs.get('y')
if x and y:
pyautogui.doubleClick(x, y)
else:
pyautogui.doubleClick()
return {"success": True, "action": "double_click"}
elif action == "right_click":
x = kwargs.get('x')
y = kwargs.get('y')
if x and y:
pyautogui.rightClick(x, y)
else:
pyautogui.rightClick()
return {"success": True, "action": "right_click"}
elif action == "type":
text = kwargs.get('text', '')
interval = kwargs.get('interval', 0.05)
if text:
pyautogui.typewrite(text, interval=interval)
return {"success": True, "action": "type", "text_length": len(text)}
else:
return {"success": False, "error": "Nenhum texto fornecido"}
elif action == "press":
key = kwargs.get('key', '')
if key:
pyautogui.press(key)
return {"success": True, "action": "press", "key": key}
else:
return {"success": False, "error": "Nenhuma tecla fornecida"}
elif action == "hotkey":
keys = kwargs.get('keys', [])
if keys and len(keys) >= 2:
pyautogui.hotkey(*keys)
return {"success": True, "action": "hotkey", "keys": keys}
else:
return {"success": False, "error": "Precisa de pelo menos 2 teclas"}
elif action == "scroll":
clicks = kwargs.get('clicks', 3)
pyautogui.scroll(clicks)
return {"success": True, "action": "scroll", "clicks": clicks}
elif action == "screenshot":
screenshot = pyautogui.screenshot()
filename = f"screenshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
screenshot.save(filename)
return {"success": True, "action": "screenshot", "filename": filename}
elif action == "get_position":
x, y = pyautogui.position()
return {"success": True, "action": "get_position", "position": [x, y]}
elif action == "get_screen_size":
width, height = pyautogui.size()
return {"success": True, "action": "get_screen_size", "size": [width, height]}
else:
return {"success": False, "error": f"Ação desconhecida: {action}"}
except Exception as e:
self.logger.error(f"Erro ao executar ação: {e}")
return {"success": False, "error": str(e)}
class ReadFileTool:
"""Ferramenta para ler arquivos"""
def __init__(self):
self.logger = Logger("ReadFile")
async def execute(self, file_path: str) -> Dict:
"""Lê arquivo"""
if not config.file_tools_enabled:
return {"success": False, "error": "Leitura desabilitada"}
try:
path = Path(file_path)
if not path.exists():
return {"success": False, "error": "Arquivo não encontrado"}
if path.stat().st_size > config.max_file_size:
return {"success": False, "error": "Arquivo muito grande"}
async with aiofiles.open(path, 'r', encoding='utf-8') as f:
content = await f.read()
return {
"success": True,
"content": content,
"path": str(path)
}
except Exception as e:
return {"success": False, "error": str(e)}
class WriteFileTool:
"""Ferramenta para escrever arquivos"""
def __init__(self):
self.logger = Logger("WriteFile")
async def execute(self, file_path: str, content: str) -> Dict:
"""Escreve arquivo"""
if not config.file_tools_enabled:
return {"success": False, "error": "Escrita desabilitada"}
try:
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(path, 'w', encoding='utf-8') as f:
await f.write(content)
return {
"success": True,
"message": "Arquivo salvo",
"path": str(path)
}
except Exception as e:
return {"success": False, "error": str(e)}
class ListFilesTool:
"""Ferramenta para listar arquivos"""
def __init__(self):
self.logger = Logger("ListFiles")
async def execute(self, directory: str = ".") -> Dict:
"""Lista arquivos"""
if not config.file_tools_enabled:
return {"success": False, "error": "Listagem desabilitada"}
try:
path = Path(directory)
if not path.exists():
return {"success": False, "error": "Diretório não encontrado"}
files = []
for item in path.iterdir():
files.append({
"name": item.name,
"type": "dir" if item.is_dir() else "file",
"size": item.stat().st_size if item.is_file() else 0
})
return {
"success": True,
"files": files,
"directory": str(path)
}
except Exception as e:
return {"success": False, "error": str(e)}
# ============================================================================
# ORQUESTRADOR PRINCIPAL
# ============================================================================
class SuperEzio:
"""Sistema principal com correções"""
def __init__(self):
self.logger = Logger("SuperEzio")
self.memory = Memory()
self.ollama = OllamaClient()
# CORREÇÃO: Todas as ferramentas agora estão definidas acima
self.tools = {
'web_search': WebSearchTool(),
'device_control': DeviceControlTool(),
'read_file': ReadFileTool(),
'write_file': WriteFileTool(),
'list_files': ListFilesTool()
}
async def initialize(self):
"""Inicializa sistema"""
try:
if not await self.ollama.initialize():
return False
if config.primary_model in self.ollama.models:
self.logger.info(f"✅ Modelo {config.primary_model} disponível")
else:
self.logger.warning(f"⚠️ Modelo {config.primary_model} não encontrado")
try:
screen_size = pyautogui.size()
self.logger.info(f"✅ PyAutoGUI funcionando - Tela: {screen_size}")
except:
self.logger.warning("⚠️ PyAutoGUI pode não estar funcionando corretamente")
return True
except Exception as e:
self.logger.error(f"Erro na inicialização: {e}")
return False
async def process(self, user_input: str):
"""Processa mensagem do usuário"""
if not user_input:
return
try:
await self.memory.add(user_input, "user")
plan = self._detect_tools(user_input)
results = None
if plan:
tools_names = [p['tool'] for p in plan]
if 'web_search' in tools_names:
await cl.Message("🔍 Pesquisando na web...").send()
elif 'device_control' in tools_names:
await cl.Message("🖱️ Controlando dispositivos...").send()
else:
await cl.Message(f"🔧 Executando {len(plan)} ação(ões)...").send()
results = await self._execute_plan(plan)
if results and any(r.get('success') for r in results):
await cl.Message("💭 Analisando resultados e preparando resposta...").send()
else:
await cl.Message("💭 Preparando resposta...").send()
await self._generate_response(user_input, results)
except Exception as e:
self.logger.error(f"Erro ao processar: {e}")
await cl.Message(f"❌ Erro no processamento: {str(e)}").send()
def _detect_tools(self, text: str) -> List[Dict]:
"""Detecta necessidade de ferramentas"""
text_lower = text.lower()
plan = []
# Detecção de pesquisa web
search_triggers = [
'pesquise', 'pesquisar', 'busque', 'buscar', 'procure', 'procurar',
'search', 'find', 'lookup', 'google',
'quem é', 'quem foi', 'quem era', 'quem será',
'o que é', 'o que foi', 'o que significa',
'quando', 'onde', 'como', 'por que', 'porque', 'qual', 'quais',
'presidente', 'governo', 'ministro', 'político',
'notícias', 'news', 'hoje', 'atual', 'recente',
'preço', 'valor', 'custo', 'quanto custa',
'clima', 'tempo', 'temperatura', 'previsão',
'definição', 'significado', 'explique',
'informação', 'info', 'dados', 'estatística',
'história', 'biografia', 'fatos', 'curiosidades'
]
if any(trigger in text_lower for trigger in search_triggers) or '?' in text:
query = text
remove_words = ['pesquise', 'busque', 'procure', 'sobre', 'por', 'me fale sobre']
for word in remove_words:
query = re.sub(f'\\b{word}\\b', '', query, flags=re.IGNORECASE)
query = query.strip()
if query:
self.logger.info(f"Busca detectada: {query}")
plan.append({
'tool': 'web_search',
'params': {'query': query}
})
# Detecção de controle de dispositivos
device_triggers = {
'mova o mouse': ('move_mouse', {}),
'move o mouse': ('move_mouse', {}),
'clique': ('click', {}),
'click': ('click', {}),
'clique duplo': ('double_click', {}),
'double click': ('double_click', {}),
'clique direito': ('right_click', {}),
'right click': ('right_click', {}),
'digite': ('type', {}),
'type': ('type', {}),
'escreva': ('type', {}),
'pressione': ('press', {}),
'press': ('press', {}),
'tecla': ('press', {}),
'scroll': ('scroll', {}),
'role': ('scroll', {}),
'screenshot': ('screenshot', {}),
'print screen': ('screenshot', {}),
'captura de tela': ('screenshot', {}),
'posição do mouse': ('get_position', {}),
'onde está o mouse': ('get_position', {}),
'tamanho da tela': ('get_screen_size', {}),
}
for trigger, (action, params) in device_triggers.items():
if trigger in text_lower:
if action == 'type':
match = re.search(r'(?:digite|type|escreva)\s+"([^"]+)"', text_lower)
if match:
params = {'text': match.group(1)}
else:
match = re.search(r'(?:digite|type|escreva)\s+(.+)', text_lower)
if match:
params = {'text': match.group(1)}
elif action == 'move_mouse':
match = re.search(r'(\d+)\s*,?\s*(\d+)', text)
if match:
params = {'x': int(match.group(1)), 'y': int(match.group(2))}
elif action == 'scroll':
match = re.search(r'(\d+)', text)
if match:
params = {'clicks': int(match.group(1))}
self.logger.info(f"Controle de dispositivo detectado: {action}")
plan.append({
'tool': 'device_control',
'params': {'action': action, **params}
})
break
# Detecção de arquivos
if any(word in text_lower for word in ['leia', 'ler', 'read', 'abra', 'abrir']):
match = re.search(r'(?:leia|ler|read|abra|abrir)\s+(\S+)', text_lower)
if match:
plan.append({
'tool': 'read_file',
'params': {'file_path': match.group(1)}
})
if any(word in text_lower for word in ['liste', 'listar', 'list', 'mostre os arquivos']):
plan.append({
'tool': 'list_files',
'params': {'directory': '.'}
})
return plan
async def _execute_plan(self, plan: List[Dict]) -> List[Dict]:
"""Executa plano de ferramentas"""
results = []
for step in plan:
tool_name = step['tool']
params = step.get('params', {})
tool = self.tools.get(tool_name)
if not tool:
continue
icon = {
'web_search': '🔍',
'device_control': '🖱️',
'read_file': '📖',
'write_file': '💾',
'list_files': '📁'
}.get(tool_name, '🔧')
async with cl.Step(name=f"{icon} {tool_name}") as ui_step:
try:
result = await tool.execute(**params)
if result.get('success'):
if tool_name == 'web_search':
output = "### Resultados da Pesquisa:\n\n"
for r in result.get('results', []):
output += f"**{r['title']}**\n"
output += f"🔗 {r['url']}\n"
output += f"{r['snippet']}\n\n"
ui_step.output = output
elif tool_name == 'device_control':
action = params.get('action', '')
output = f"✅ Ação executada: {action}\n"
if 'position' in result:
output += f"Posição: {result['position']}\n"
if 'filename' in result:
output += f"Arquivo: {result['filename']}\n"
ui_step.output = output
else:
ui_step.output = json.dumps(result, indent=2, ensure_ascii=False)
else:
ui_step.output = f"❌ {result.get('error')}"
results.append(result)
except Exception as e:
ui_step.output = f"❌ Erro: {e}"
results.append({"success": False, "error": str(e)})
return results
async def _generate_response(self, user_input: str, results: List[Dict] = None):
"""Gera resposta final - VERSÃO CORRIGIDA"""
context = self.memory.get_context()
prompt = f"Pergunta: {user_input}\n"
if context:
prompt += f"\nContexto:\n{context}\n"
if results:
prompt += f"\nResultados de ferramentas:\n{json.dumps(results, ensure_ascii=False, indent=2)}\n"
prompt += "\nUse os resultados acima para responder de forma completa e detalhada."
system = "Você é SUPEREZIO, um assistente de IA avançado. Seja útil, claro e preciso. Responda em português brasileiro."
msg = None
try:
msg = cl.Message(content="")
await msg.send()
response = ""
stream_success = False
try:
stream_generator = await self.ollama.generate(prompt, system, stream=True)
async for chunk in stream_generator:
if chunk and chunk.strip():
response += chunk
await msg.stream_token(chunk)
stream_success = True
self.logger.debug("Streaming concluído com sucesso")
except GeneratorExit:
self.logger.debug("Streaming interrompido pelo cliente")
stream_success = bool(response)
except Exception as stream_error:
self.logger.warning(f"Streaming falhou: {stream_error}")
try:
response = await self.ollama.generate(prompt, system, stream=False)
if response:
# CORREÇÃO CRÍTICA: API correta do Chainlit
msg.content = response
await msg.update()
stream_success = True
else:
raise RuntimeError("Resposta vazia do modelo")
except Exception as fallback_error:
self.logger.error(f"Fallback também falhou: {fallback_error}")
raise fallback_error
if stream_success and response:
await self.memory.add(response, "assistant")
self.logger.info(f"Resposta gerada: {len(response)} caracteres")
else:
raise RuntimeError("Resposta vazia gerada")
except Exception as e:
self.logger.error(f"Erro ao gerar resposta: {e}")
error_msg = f"""❌ **Erro ao gerar resposta**
**Detalhes técnicos:**
• Erro: {str(e)}
• Modelo: {config.primary_model}
• Ollama: {config.ollama_url}
• Conexão: {'✅' if self.ollama.connection_healthy else '❌'}
**Soluções:**
1. Teste direto: `ollama run {config.primary_model} "teste"`
2. Reinicie: `ollama serve`
3. Verifique modelos: `ollama list`
**Status:** {len(self.ollama.models) if self.ollama.models else 0} modelos disponíveis"""
await cl.Message(content=error_msg).send()
async def cleanup(self):
"""Finaliza sistema"""
await self.memory.save()
await self.ollama.cleanup()
self.logger.info("Sistema finalizado")
# ============================================================================
# CHAINLIT HOOKS OBRIGATÓRIOS
# ============================================================================
@cl.on_chat_start
async def on_chat_start():
"""Início do chat"""
try:
orchestrator = SuperEzio()
if not await orchestrator.initialize():
await cl.Message("""❌ **Erro de Inicialização**
Verifique:
1. Ollama está rodando: `ollama serve`
2. Modelo instalado: `ollama pull gpt-oss:20b`
3. Porta 11434 está livre""").send()
return
cl.user_session.set("orchestrator", orchestrator)
models = orchestrator.ollama.models[:3]
if len(orchestrator.ollama.models) > 3:
models_str = f"{', '.join(models)} +{len(orchestrator.ollama.models)-3}"
else:
models_str = ', '.join(models)
device_status = "✅ Ativo" if config.device_control_enabled else "❌ Desativado"
await cl.Message(f"""# 🚀 **SUPEREZIO 21.0 COMPLETE FULL**
✅ **Sistema Completo Inicializado!**
**Versão:** {config.app_version}
**Modelos:** {models_str}
**Ferramentas Disponíveis:**
• 🔍 **Pesquisa Web:** ✅ Ativa (DDGS)
• 🖱️ **Controle de Dispositivos:** {device_status}
• 📁 **Arquivos:** ✅ Ativos (ReadFile, WriteFile, ListFiles)
• 💾 **Memória:** ✅ Persistente
**🔧 CORREÇÕES APLICADAS:**
• ✅ Message.update() API corrigida
• ✅ TODAS as ferramentas incluídas
• ✅ Warnings HTTP eliminados
• ✅ Callbacks Chainlit incluídos
**✅ PRONTO PARA USO!** Digite sua pergunta ou comando...""").send()
except Exception as e:
await cl.Message(f"❌ Erro crítico: {e}").send()
traceback.print_exc()
@cl.on_message
async def on_message(message: cl.Message):
"""Processa mensagem"""
orchestrator = cl.user_session.get("orchestrator")
if orchestrator:
await orchestrator.process(message.content)
@cl.on_chat_end
async def on_chat_end():
"""Fim do chat"""
orchestrator = cl.user_session.get("orchestrator")
if orchestrator:
try:
await orchestrator.cleanup()
import gc
gc.collect()
except Exception as e:
print(f"Erro na limpeza: {e}")
# ============================================================================
# MAIN
# ============================================================================
if __name__ == "__main__":
print("""
╔════════════════════════════════════════════════════════════╗
║ ║
║ SUPEREZIO 21.0 COMPLETE FULL ║
║ ║
║ ✅ TODAS AS FERRAMENTAS INCLUÍDAS ║
║ ✅ ReadFileTool, WriteFileTool, ListFilesTool ║
║ ✅ Message.update() API corrigida ║
║ ✅ Warnings HTTP eliminados ║
║ ✅ Callbacks Chainlit incluídos ║
║ ║
║ Para executar: ║
║ $ chainlit run main_agent.py ║
║ ║
║ ⚡ CÓDIGO COMPLETO E 100% FUNCIONAL! ║
║ ║
╚════════════════════════════════════════════════════════════╝
""")