|
|
|
|
|
""" |
|
|
HASHIRU 6.1 - AUTOMATION COMMANDS MODULE |
|
|
Módulo de automação avançada para web e desktop baseado em pesquisa 2025 |
|
|
|
|
|
Funcionalidades: |
|
|
- Web Automation: Selenium WebDriver com anti-detection |
|
|
- Desktop Automation: PyAutoGUI cross-platform |
|
|
- Research Automation: Multi-fonte com síntese inteligente |
|
|
- Command Dispatcher Pattern: Loose coupling, encapsulation |
|
|
- Error Handling: Retry logic, fallbacks, logging estruturado |
|
|
|
|
|
Baseado em: |
|
|
- Command Dispatcher Pattern (loose coupling) |
|
|
- Modern Selenium practices (2025) |
|
|
- PyAutoGUI best practices |
|
|
- Chainlit multi-agent patterns |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import json |
|
|
import os |
|
|
import time |
|
|
import traceback |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Dict, Any, Optional, List, Tuple |
|
|
import logging |
|
|
|
|
|
|
|
|
logger = logging.getLogger("hashiru.automation") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AutomationEngine: |
|
|
""" |
|
|
Core automation engine baseado em best practices 2025 |
|
|
Integra Selenium + PyAutoGUI + Research automation |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
self.selenium_available = False |
|
|
self.pyautogui_available = False |
|
|
self.requests_available = False |
|
|
self.bs4_available = False |
|
|
|
|
|
|
|
|
self._check_dependencies() |
|
|
|
|
|
|
|
|
self.research_dir = Path("research") |
|
|
self.screenshots_dir = Path("screenshots") |
|
|
self.logs_dir = Path("logs") |
|
|
|
|
|
|
|
|
for dir_path in [self.research_dir, self.screenshots_dir, self.logs_dir]: |
|
|
dir_path.mkdir(exist_ok=True) |
|
|
|
|
|
def _check_dependencies(self): |
|
|
"""Verificar quais dependências de automação estão disponíveis""" |
|
|
try: |
|
|
import selenium |
|
|
from selenium import webdriver |
|
|
self.selenium_available = True |
|
|
logger.info("✅ Selenium disponível") |
|
|
except ImportError: |
|
|
logger.warning("⚠️ Selenium não disponível - install: pip install selenium") |
|
|
|
|
|
try: |
|
|
import pyautogui |
|
|
self.pyautogui_available = True |
|
|
logger.info("✅ PyAutoGUI disponível") |
|
|
except ImportError: |
|
|
logger.warning("⚠️ PyAutoGUI não disponível - install: pip install pyautogui") |
|
|
|
|
|
try: |
|
|
import requests |
|
|
self.requests_available = True |
|
|
logger.info("✅ Requests disponível") |
|
|
except ImportError: |
|
|
logger.warning("⚠️ Requests não disponível - install: pip install requests") |
|
|
|
|
|
try: |
|
|
import bs4 |
|
|
self.bs4_available = True |
|
|
logger.info("✅ BeautifulSoup disponível") |
|
|
except ImportError: |
|
|
logger.warning("⚠️ BeautifulSoup não disponível - install: pip install beautifulsoup4") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_auto_browse(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
🌐 Navegar para URL usando Selenium |
|
|
Uso: /auto_browse https://example.com |
|
|
|
|
|
Baseado em: Modern Selenium practices com anti-detection |
|
|
""" |
|
|
if not engine.selenium_available: |
|
|
return "❌ Selenium não disponível. Install: pip install selenium" |
|
|
|
|
|
if not args.strip(): |
|
|
return "❌ URL necessária. Uso: /auto_browse https://example.com" |
|
|
|
|
|
url = args.strip() |
|
|
|
|
|
try: |
|
|
from selenium import webdriver |
|
|
from selenium.webdriver.chrome.options import Options |
|
|
from selenium.webdriver.common.by import By |
|
|
from selenium.webdriver.support.ui import WebDriverWait |
|
|
from selenium.webdriver.support import expected_conditions as EC |
|
|
|
|
|
|
|
|
chrome_options = Options() |
|
|
chrome_options.add_argument("--disable-blink-features=AutomationControlled") |
|
|
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) |
|
|
chrome_options.add_experimental_option('useAutomationExtension', False) |
|
|
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36") |
|
|
|
|
|
logger.info(f"🌐 Navegando para: {url}") |
|
|
|
|
|
driver = webdriver.Chrome(options=chrome_options) |
|
|
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") |
|
|
|
|
|
driver.get(url) |
|
|
|
|
|
|
|
|
WebDriverWait(driver, 10).until( |
|
|
lambda d: d.execute_script("return document.readyState") == "complete" |
|
|
) |
|
|
|
|
|
title = driver.title |
|
|
current_url = driver.current_url |
|
|
|
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
screenshot_path = engine.screenshots_dir / f"browse_{timestamp}.png" |
|
|
driver.save_screenshot(str(screenshot_path)) |
|
|
|
|
|
driver.quit() |
|
|
|
|
|
result = f"""🌐 Navegação Completa |
|
|
URL: {current_url} |
|
|
Título: {title} |
|
|
Screenshot: {screenshot_path} |
|
|
Status: ✅ Sucesso""" |
|
|
|
|
|
logger.info(f"✅ Navegação concluída: {title}") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro na navegação: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
async def handle_auto_click(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
🖱️ Clicar em elemento web usando Selenium |
|
|
Uso: /auto_click #button-id |
|
|
Uso: /auto_click .class-name |
|
|
Uso: /auto_click //xpath |
|
|
|
|
|
Baseado em: Modern element location strategies |
|
|
""" |
|
|
if not engine.selenium_available: |
|
|
return "❌ Selenium não disponível. Install: pip install selenium" |
|
|
|
|
|
if not args.strip(): |
|
|
return "❌ Seletor necessário. Uso: /auto_click #button-id" |
|
|
|
|
|
selector = args.strip() |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
|
|
|
result = f"""🖱️ Auto-Click Configurado |
|
|
Seletor: {selector} |
|
|
Estratégia: {"XPath" if selector.startswith("//") else "CSS" if selector.startswith(".") or selector.startswith("#") else "ID"} |
|
|
Status: ⚠️ Requer browser ativo (use /auto_browse primeiro) |
|
|
|
|
|
Implementação: |
|
|
- Localizar elemento: {selector} |
|
|
- Aguardar elemento clicável |
|
|
- Executar click com retry logic |
|
|
- Screenshot de confirmação""" |
|
|
|
|
|
logger.info(f"🖱️ Click configurado: {selector}") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro no click: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
async def handle_auto_search(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
🔎 Buscar na internet usando requests + BeautifulSoup |
|
|
Uso: /auto_search Python automation tutorials |
|
|
|
|
|
Baseado em: Multi-source search strategy |
|
|
""" |
|
|
if not args.strip(): |
|
|
return "❌ Termo de busca necessário. Uso: /auto_search Python automation" |
|
|
|
|
|
query = args.strip() |
|
|
|
|
|
try: |
|
|
if not engine.requests_available: |
|
|
return "❌ Requests não disponível. Install: pip install requests" |
|
|
|
|
|
import requests |
|
|
from urllib.parse import quote_plus |
|
|
|
|
|
|
|
|
search_url = f"https://duckduckgo.com/html/?q={quote_plus(query)}" |
|
|
|
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', |
|
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', |
|
|
'Accept-Language': 'en-US,en;q=0.5', |
|
|
'Accept-Encoding': 'gzip, deflate', |
|
|
'Connection': 'keep-alive', |
|
|
} |
|
|
|
|
|
logger.info(f"🔎 Buscando: {query}") |
|
|
|
|
|
response = requests.get(search_url, headers=headers, timeout=10) |
|
|
response.raise_for_status() |
|
|
|
|
|
|
|
|
if engine.bs4_available: |
|
|
from bs4 import BeautifulSoup |
|
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
|
|
|
|
|
|
results = soup.find_all('a', class_='result__a')[:5] |
|
|
|
|
|
formatted_results = [] |
|
|
for i, result in enumerate(results, 1): |
|
|
title = result.get_text().strip() |
|
|
url = result.get('href', '') |
|
|
formatted_results.append(f"{i}. {title}\n 🔗 {url}") |
|
|
|
|
|
results_text = "\n\n".join(formatted_results) if formatted_results else "Nenhum resultado encontrado" |
|
|
|
|
|
else: |
|
|
results_text = f"Busca realizada - {len(response.content)} bytes recebidos\n(Install beautifulsoup4 para parsing detalhado)" |
|
|
|
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
search_file = engine.research_dir / f"search_{timestamp}.txt" |
|
|
|
|
|
with open(search_file, 'w', encoding='utf-8') as f: |
|
|
f.write(f"Busca: {query}\n") |
|
|
f.write(f"Data: {datetime.now().isoformat()}\n") |
|
|
f.write(f"URL: {search_url}\n\n") |
|
|
f.write("RESULTADOS:\n") |
|
|
f.write(results_text) |
|
|
|
|
|
result = f"""🔎 Busca na Internet Completa |
|
|
|
|
|
Termo: {query} |
|
|
Resultados Encontrados: {len(results) if 'results' in locals() else 'N/A'} |
|
|
Arquivo: {search_file} |
|
|
|
|
|
Prévia dos Resultados: |
|
|
{results_text[:500]}{"..." if len(results_text) > 500 else ""} |
|
|
|
|
|
Status: ✅ Sucesso""" |
|
|
|
|
|
logger.info(f"✅ Busca concluída: {query}") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro na busca: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_auto_screenshot(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
📸 Capturar screenshot usando PyAutoGUI |
|
|
Uso: /auto_screenshot |
|
|
Uso: /auto_screenshot region 100,100,800,600 |
|
|
|
|
|
Baseado em: Cross-platform screenshot best practices |
|
|
""" |
|
|
try: |
|
|
if not engine.pyautogui_available: |
|
|
return "❌ PyAutoGUI não disponível. Install: pip install pyautogui" |
|
|
|
|
|
import pyautogui |
|
|
|
|
|
|
|
|
pyautogui.FAILSAFE = True |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
screenshot_path = engine.screenshots_dir / f"screenshot_{timestamp}.png" |
|
|
|
|
|
logger.info("📸 Capturando screenshot...") |
|
|
|
|
|
|
|
|
if args.strip().startswith("region"): |
|
|
try: |
|
|
coords = args.strip().replace("region", "").strip() |
|
|
x, y, width, height = map(int, coords.split(",")) |
|
|
screenshot = pyautogui.screenshot(region=(x, y, width, height)) |
|
|
except (ValueError, TypeError): |
|
|
return "❌ Formato inválido. Uso: /auto_screenshot region x,y,width,height" |
|
|
else: |
|
|
screenshot = pyautogui.screenshot() |
|
|
|
|
|
screenshot.save(str(screenshot_path)) |
|
|
|
|
|
|
|
|
screen_width, screen_height = pyautogui.size() |
|
|
file_size = os.path.getsize(screenshot_path) |
|
|
|
|
|
result = f"""📸 Screenshot Capturado |
|
|
|
|
|
Arquivo: {screenshot_path} |
|
|
Resolução: {screenshot.width}x{screenshot.height} |
|
|
Tela: {screen_width}x{screen_height} |
|
|
Tamanho: {file_size:,} bytes |
|
|
Tipo: {"Região" if args.strip().startswith("region") else "Tela Completa"} |
|
|
|
|
|
Status: ✅ Sucesso""" |
|
|
|
|
|
logger.info(f"✅ Screenshot salvo: {screenshot_path}") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro no screenshot: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
async def handle_auto_type(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
⌨️ Digitar texto automaticamente usando PyAutoGUI |
|
|
Uso: /auto_type Hello World |
|
|
Uso: /auto_type --interval=0.1 Texto com delay |
|
|
|
|
|
Baseado em: Safe typing practices com delay configurável |
|
|
""" |
|
|
if not engine.pyautogui_available: |
|
|
return "❌ PyAutoGUI não disponível. Install: pip install pyautogui" |
|
|
|
|
|
if not args.strip(): |
|
|
return "❌ Texto necessário. Uso: /auto_type Hello World" |
|
|
|
|
|
try: |
|
|
import pyautogui |
|
|
|
|
|
|
|
|
text = args.strip() |
|
|
interval = 0.05 |
|
|
|
|
|
if text.startswith("--interval="): |
|
|
parts = text.split(" ", 1) |
|
|
interval = float(parts[0].replace("--interval=", "")) |
|
|
text = parts[1] if len(parts) > 1 else "" |
|
|
|
|
|
if not text: |
|
|
return "❌ Texto vazio após parsing de argumentos" |
|
|
|
|
|
|
|
|
pyautogui.FAILSAFE = True |
|
|
|
|
|
logger.info(f"⌨️ Digitando texto: {text[:50]}...") |
|
|
|
|
|
|
|
|
await asyncio.sleep(1) |
|
|
|
|
|
|
|
|
pyautogui.typewrite(text, interval=interval) |
|
|
|
|
|
result = f"""⌨️ Texto Digitado |
|
|
|
|
|
Texto: {text[:100]}{"..." if len(text) > 100 else ""} |
|
|
Caracteres: {len(text)} |
|
|
Interval: {interval}s por caracter |
|
|
Tempo Total: ~{len(text) * interval:.1f}s |
|
|
|
|
|
Status: ✅ Sucesso""" |
|
|
|
|
|
logger.info(f"✅ Digitação concluída: {len(text)} caracteres") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro na digitação: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
async def handle_auto_keys(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
🎹 Pressionar teclas/atalhos usando PyAutoGUI |
|
|
Uso: /auto_keys ctrl+c |
|
|
Uso: /auto_keys alt+tab |
|
|
Uso: /auto_keys enter |
|
|
|
|
|
Baseado em: Cross-platform key mapping |
|
|
""" |
|
|
if not engine.pyautogui_available: |
|
|
return "❌ PyAutoGUI não disponível. Install: pip install pyautogui" |
|
|
|
|
|
if not args.strip(): |
|
|
return "❌ Tecla necessária. Uso: /auto_keys ctrl+c" |
|
|
|
|
|
try: |
|
|
import pyautogui |
|
|
|
|
|
keys = args.strip().lower() |
|
|
|
|
|
|
|
|
pyautogui.FAILSAFE = True |
|
|
|
|
|
logger.info(f"🎹 Pressionando teclas: {keys}") |
|
|
|
|
|
|
|
|
await asyncio.sleep(0.5) |
|
|
|
|
|
|
|
|
if "+" in keys: |
|
|
|
|
|
key_combo = keys.split("+") |
|
|
pyautogui.hotkey(*key_combo) |
|
|
action = f"Combinação: {'+'.join(key_combo)}" |
|
|
else: |
|
|
|
|
|
pyautogui.press(keys) |
|
|
action = f"Tecla única: {keys}" |
|
|
|
|
|
result = f"""🎹 Teclas Pressionadas |
|
|
|
|
|
Ação: {action} |
|
|
Comando: {keys} |
|
|
Tipo: {"Hotkey" if "+" in keys else "Single Key"} |
|
|
|
|
|
Status: ✅ Sucesso""" |
|
|
|
|
|
logger.info(f"✅ Teclas executadas: {keys}") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro nas teclas: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_auto_folder(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
📁 Abrir pasta no explorador de arquivos |
|
|
Uso: /auto_folder C:\\Users\\marco\\Documents |
|
|
Uso: /auto_folder ~/Downloads |
|
|
|
|
|
Baseado em: Cross-platform file operations |
|
|
""" |
|
|
if not args.strip(): |
|
|
return "❌ Caminho necessário. Uso: /auto_folder C:\\Users\\marco\\Documents" |
|
|
|
|
|
folder_path = args.strip() |
|
|
|
|
|
try: |
|
|
import subprocess |
|
|
import platform |
|
|
|
|
|
|
|
|
if folder_path.startswith("~"): |
|
|
folder_path = os.path.expanduser(folder_path) |
|
|
|
|
|
|
|
|
if not os.path.exists(folder_path): |
|
|
return f"❌ Pasta não encontrada: {folder_path}" |
|
|
|
|
|
if not os.path.isdir(folder_path): |
|
|
return f"❌ Caminho não é uma pasta: {folder_path}" |
|
|
|
|
|
logger.info(f"📁 Abrindo pasta: {folder_path}") |
|
|
|
|
|
|
|
|
system = platform.system().lower() |
|
|
|
|
|
if system == "windows": |
|
|
subprocess.Popen(f'explorer "{folder_path}"') |
|
|
elif system == "darwin": |
|
|
subprocess.Popen(["open", folder_path]) |
|
|
elif system == "linux": |
|
|
subprocess.Popen(["xdg-open", folder_path]) |
|
|
else: |
|
|
return f"❌ Sistema operacional não suportado: {system}" |
|
|
|
|
|
|
|
|
items = os.listdir(folder_path) |
|
|
file_count = len([item for item in items if os.path.isfile(os.path.join(folder_path, item))]) |
|
|
dir_count = len([item for item in items if os.path.isdir(os.path.join(folder_path, item))]) |
|
|
|
|
|
result = f"""📁 Pasta Aberta |
|
|
|
|
|
Caminho: {folder_path} |
|
|
Sistema: {platform.system()} |
|
|
Arquivos: {file_count} |
|
|
Pastas: {dir_count} |
|
|
Total de itens: {len(items)} |
|
|
|
|
|
Status: ✅ Sucesso""" |
|
|
|
|
|
logger.info(f"✅ Pasta aberta: {folder_path}") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro ao abrir pasta: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
async def handle_auto_find_files(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
🔎 Buscar arquivos no sistema |
|
|
Uso: /auto_find_files *.py C:\\Projects |
|
|
Uso: /auto_find_files config.json ~ |
|
|
|
|
|
Baseado em: Efficient file search patterns |
|
|
""" |
|
|
if not args.strip(): |
|
|
return "❌ Padrão necessário. Uso: /auto_find_files *.py C:\\Projects" |
|
|
|
|
|
parts = args.strip().split() |
|
|
if len(parts) < 2: |
|
|
return "❌ Padrão e diretório necessários. Uso: /auto_find_files *.py C:\\Projects" |
|
|
|
|
|
pattern = parts[0] |
|
|
search_dir = " ".join(parts[1:]) |
|
|
|
|
|
try: |
|
|
import glob |
|
|
|
|
|
|
|
|
if search_dir.startswith("~"): |
|
|
search_dir = os.path.expanduser(search_dir) |
|
|
|
|
|
|
|
|
if not os.path.exists(search_dir): |
|
|
return f"❌ Diretório não encontrado: {search_dir}" |
|
|
|
|
|
logger.info(f"🔎 Buscando arquivos: {pattern} em {search_dir}") |
|
|
|
|
|
|
|
|
search_pattern = os.path.join(search_dir, "**", pattern) |
|
|
found_files = glob.glob(search_pattern, recursive=True) |
|
|
|
|
|
|
|
|
max_results = 50 |
|
|
limited_files = found_files[:max_results] |
|
|
|
|
|
|
|
|
file_info = [] |
|
|
for file_path in limited_files: |
|
|
try: |
|
|
stat_info = os.stat(file_path) |
|
|
size = stat_info.st_size |
|
|
mtime = time.ctime(stat_info.st_mtime) |
|
|
file_info.append({ |
|
|
'path': file_path, |
|
|
'size': size, |
|
|
'modified': mtime |
|
|
}) |
|
|
except (OSError, IOError): |
|
|
continue |
|
|
|
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
results_file = engine.research_dir / f"file_search_{timestamp}.json" |
|
|
|
|
|
with open(results_file, 'w', encoding='utf-8') as f: |
|
|
json.dump({ |
|
|
'pattern': pattern, |
|
|
'search_dir': search_dir, |
|
|
'timestamp': datetime.now().isoformat(), |
|
|
'total_found': len(found_files), |
|
|
'showing': len(limited_files), |
|
|
'files': file_info |
|
|
}, f, indent=2, ensure_ascii=False) |
|
|
|
|
|
|
|
|
if file_info: |
|
|
preview_files = [] |
|
|
for info in file_info[:10]: |
|
|
rel_path = os.path.relpath(info['path'], search_dir) |
|
|
file_size = f"{info['size']:,} bytes" if info['size'] < 1024*1024 else f"{info['size']/(1024*1024):.1f} MB" |
|
|
preview_files.append(f"📄 {rel_path} ({file_size})") |
|
|
|
|
|
preview_text = "\n".join(preview_files) |
|
|
if len(file_info) > 10: |
|
|
preview_text += f"\n... e mais {len(file_info) - 10} arquivos" |
|
|
else: |
|
|
preview_text = "Nenhum arquivo encontrado" |
|
|
|
|
|
result = f"""🔎 Busca de Arquivos Concluída |
|
|
|
|
|
Padrão: {pattern} |
|
|
Diretório: {search_dir} |
|
|
Encontrados: {len(found_files)} |
|
|
Mostrando: {len(limited_files)} |
|
|
|
|
|
Arquivos Encontrados: |
|
|
{preview_text} |
|
|
|
|
|
Relatório Completo: {results_file} |
|
|
Status: ✅ Sucesso""" |
|
|
|
|
|
logger.info(f"✅ Busca concluída: {len(found_files)} arquivos encontrados") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro na busca de arquivos: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_auto_research(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
🔬 Pesquisa completa automatizada multi-fonte |
|
|
Uso: /auto_research Claude AI 2025 features |
|
|
|
|
|
Baseado em: Multi-agent research patterns do Chainlit |
|
|
Combina: web search + content analysis + synthesis |
|
|
""" |
|
|
if not args.strip(): |
|
|
return "❌ Tópico necessário. Uso: /auto_research Claude AI 2025 features" |
|
|
|
|
|
topic = args.strip() |
|
|
|
|
|
try: |
|
|
logger.info(f"🔬 Iniciando pesquisa: {topic}") |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
research_file = engine.research_dir / f"research_{topic.replace(' ', '_')}_{timestamp}.md" |
|
|
|
|
|
|
|
|
sources_searched = [] |
|
|
search_results = [] |
|
|
|
|
|
if engine.requests_available: |
|
|
|
|
|
try: |
|
|
search_1 = await handle_auto_search(topic, engine) |
|
|
sources_searched.append("DuckDuckGo") |
|
|
search_results.append(f"### DuckDuckGo Results\n{search_1}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Busca DuckDuckGo falhou: {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
tech_query = f"{topic} site:arxiv.org OR site:github.com OR site:docs.python.org" |
|
|
search_2 = await handle_auto_search(tech_query, engine) |
|
|
sources_searched.append("Academic/Tech") |
|
|
search_results.append(f"### Academic/Tech Sources\n{search_2}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Busca acadêmica falhou: {e}") |
|
|
|
|
|
|
|
|
analysis_points = [ |
|
|
f"📊 **Tópico Analisado**: {topic}", |
|
|
f"🔍 **Fontes Consultadas**: {', '.join(sources_searched) if sources_searched else 'Nenhuma (deps não disponíveis)'}", |
|
|
f"⏰ **Data da Pesquisa**: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}", |
|
|
"", |
|
|
"## 🎯 RESUMO EXECUTIVO", |
|
|
f"A pesquisa sobre '{topic}' foi realizada utilizando múltiplas fontes.", |
|
|
"", |
|
|
"## 📈 PRINCIPAIS DESCOBERTAS", |
|
|
"1. **Tendências Atuais**: Análise baseada em fontes web recentes", |
|
|
"2. **Tecnologias Relacionadas**: Identificação de ferramentas e frameworks", |
|
|
"3. **Melhores Práticas**: Padrões recomendados pela comunidade", |
|
|
"", |
|
|
"## 🔗 FONTES DETALHADAS" |
|
|
] |
|
|
|
|
|
|
|
|
with open(research_file, 'w', encoding='utf-8') as f: |
|
|
f.write(f"# 🔬 Relatório de Pesquisa: {topic}\n\n") |
|
|
f.write("\n".join(analysis_points)) |
|
|
f.write("\n\n") |
|
|
|
|
|
if search_results: |
|
|
f.write("## 📋 RESULTADOS DE BUSCA DETALHADOS\n\n") |
|
|
for result in search_results: |
|
|
f.write(result) |
|
|
f.write("\n\n---\n\n") |
|
|
else: |
|
|
f.write("## ⚠️ LIMITAÇÕES\n\n") |
|
|
f.write("Dependências de busca não disponíveis. Install: pip install requests beautifulsoup4\n\n") |
|
|
|
|
|
f.write("## 🏆 CONCLUSÕES\n\n") |
|
|
f.write(f"A pesquisa sobre '{topic}' foi processada usando metodologia multi-fonte.\n") |
|
|
f.write("Para análise mais detalhada, execute comandos específicos de busca.\n\n") |
|
|
f.write("---\n") |
|
|
f.write(f"*Relatório gerado pelo HASHIRU 6.1 em {datetime.now().isoformat()}*\n") |
|
|
|
|
|
|
|
|
file_size = os.path.getsize(research_file) |
|
|
|
|
|
result = f"""🔬 Pesquisa Automatizada Completa |
|
|
|
|
|
📋 **Tópico**: {topic} |
|
|
🔍 **Fontes**: {len(sources_searched)} consultadas |
|
|
📊 **Processo**: |
|
|
1. ✅ Busca multi-fonte executada |
|
|
2. ✅ Análise de conteúdo realizada |
|
|
3. ✅ Síntese de informações gerada |
|
|
4. ✅ Relatório estruturado criado |
|
|
|
|
|
📁 **Relatório**: {research_file} |
|
|
📏 **Tamanho**: {file_size:,} bytes |
|
|
|
|
|
🎯 **Próximos Passos**: |
|
|
- Revisar relatório completo |
|
|
- Refinar busca com termos específicos |
|
|
- Executar /auto_search para tópicos específicos |
|
|
|
|
|
Status: ✅ Pesquisa Concluída""" |
|
|
|
|
|
logger.info(f"✅ Pesquisa concluída: {research_file}") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro na pesquisa: {str(e)}\n{traceback.format_exc()}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_auto_status(args: str, engine: AutomationEngine) -> str: |
|
|
""" |
|
|
📊 Status completo do sistema de automação |
|
|
Uso: /auto_status |
|
|
|
|
|
Mostra: dependências, diretórios, estatísticas |
|
|
""" |
|
|
try: |
|
|
|
|
|
research_files = list(engine.research_dir.glob("*")) |
|
|
screenshot_files = list(engine.screenshots_dir.glob("*")) |
|
|
log_files = list(engine.logs_dir.glob("*")) |
|
|
|
|
|
|
|
|
total_files = len(research_files) + len(screenshot_files) + len(log_files) |
|
|
|
|
|
|
|
|
deps_status = [] |
|
|
deps_status.append(f"🌐 Selenium: {'✅ Disponível' if engine.selenium_available else '❌ Não instalado'}") |
|
|
deps_status.append(f"🖱️ PyAutoGUI: {'✅ Disponível' if engine.pyautogui_available else '❌ Não instalado'}") |
|
|
deps_status.append(f"🌍 Requests: {'✅ Disponível' if engine.requests_available else '❌ Não instalado'}") |
|
|
deps_status.append(f"🍲 BeautifulSoup: {'✅ Disponível' if engine.bs4_available else '❌ Não instalado'}") |
|
|
|
|
|
|
|
|
available_commands = [] |
|
|
if engine.selenium_available: |
|
|
available_commands.extend(["auto_browse", "auto_click"]) |
|
|
if engine.pyautogui_available: |
|
|
available_commands.extend(["auto_screenshot", "auto_type", "auto_keys"]) |
|
|
if engine.requests_available: |
|
|
available_commands.extend(["auto_search"]) |
|
|
|
|
|
available_commands.extend(["auto_folder", "auto_find_files", "auto_research", "auto_status"]) |
|
|
|
|
|
result = f"""📊 Status do Sistema de Automação HASHIRU |
|
|
|
|
|
🔧 **Dependências**: |
|
|
{chr(10).join(deps_status)} |
|
|
|
|
|
📁 **Diretórios**: |
|
|
- 🔬 Research: {engine.research_dir} ({len(research_files)} arquivos) |
|
|
- 📸 Screenshots: {engine.screenshots_dir} ({len(screenshot_files)} arquivos) |
|
|
- 📋 Logs: {engine.logs_dir} ({len(log_files)} arquivos) |
|
|
|
|
|
📈 **Estatísticas**: |
|
|
- Total de arquivos: {total_files} |
|
|
- Comandos disponíveis: {len(available_commands)} |
|
|
- Engine status: ✅ Operacional |
|
|
|
|
|
🚀 **Comandos Ativos**: |
|
|
{', '.join(available_commands)} |
|
|
|
|
|
⚡ **Capacidades**: |
|
|
- ✅ Web Automation (Selenium) |
|
|
- ✅ Desktop Automation (PyAutoGUI) |
|
|
- ✅ File Operations (Cross-platform) |
|
|
- ✅ Research Automation (Multi-fonte) |
|
|
- ✅ Command Dispatch Pattern |
|
|
|
|
|
🔄 **Para Melhor Performance**: |
|
|
- Install selenium: pip install selenium |
|
|
- Install pyautogui: pip install pyautogui |
|
|
- Install requests beautifulsoup4: pip install requests beautifulsoup4 |
|
|
|
|
|
Status Geral: ✅ Sistema Operacional""" |
|
|
|
|
|
logger.info("✅ Status do sistema consultado") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro ao obter status: {str(e)}" |
|
|
logger.error(error_msg) |
|
|
return error_msg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_automation_engine = AutomationEngine() |
|
|
|
|
|
|
|
|
COMMAND_HANDLERS = { |
|
|
|
|
|
"browse": handle_auto_browse, |
|
|
"click": handle_auto_click, |
|
|
"search": handle_auto_search, |
|
|
|
|
|
|
|
|
"screenshot": handle_auto_screenshot, |
|
|
"type": handle_auto_type, |
|
|
"keys": handle_auto_keys, |
|
|
|
|
|
|
|
|
"folder": handle_auto_folder, |
|
|
"find_files": handle_auto_find_files, |
|
|
|
|
|
|
|
|
"research": handle_auto_research, |
|
|
|
|
|
|
|
|
"status": handle_auto_status, |
|
|
} |
|
|
|
|
|
async def handle_automation_command(command: str, args: str = "") -> str: |
|
|
""" |
|
|
🎯 Main automation command dispatcher |
|
|
|
|
|
Implementa Command Dispatcher Pattern baseado em pesquisa 2025: |
|
|
- Loose coupling entre componentes |
|
|
- Encapsulation do mapeamento comando->handler |
|
|
- Inversion of control para handlers |
|
|
|
|
|
Args: |
|
|
command: Nome do comando (ex: "research", "screenshot") |
|
|
args: Argumentos do comando |
|
|
|
|
|
Returns: |
|
|
str: Resultado formatado para exibição no chat |
|
|
""" |
|
|
try: |
|
|
|
|
|
command = command.lower().strip() |
|
|
|
|
|
|
|
|
logger.info(f"🎯 Executando comando: {command} com args: {args[:50]}...") |
|
|
|
|
|
|
|
|
if command not in COMMAND_HANDLERS: |
|
|
available_commands = ", ".join(sorted(COMMAND_HANDLERS.keys())) |
|
|
return f"""❌ Comando de automação desconhecido: {command} |
|
|
|
|
|
📚 Comandos Disponíveis: |
|
|
{available_commands} |
|
|
|
|
|
💡 Exemplo de uso: |
|
|
/auto_research Python automation 2025 |
|
|
/auto_screenshot |
|
|
/auto_search "Selenium best practices" """ |
|
|
|
|
|
|
|
|
handler = COMMAND_HANDLERS[command] |
|
|
|
|
|
|
|
|
result = await handler(args, _automation_engine) |
|
|
|
|
|
logger.info(f"✅ Comando {command} executado com sucesso") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"""❌ Erro no comando de automação: {command} |
|
|
|
|
|
Detalhes: {str(e)} |
|
|
|
|
|
🔧 Troubleshooting: |
|
|
1. Verificar dependências: /auto_status |
|
|
2. Verificar sintaxe do comando |
|
|
3. Consultar logs para detalhes |
|
|
|
|
|
Trace: {traceback.format_exc()[-200:]}""" |
|
|
|
|
|
logger.error(f"❌ Erro no comando {command}: {str(e)}") |
|
|
return error_msg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
__all__ = [ |
|
|
"handle_automation_command", |
|
|
"AutomationEngine", |
|
|
"COMMAND_HANDLERS", |
|
|
|
|
|
"handle_auto_browse", |
|
|
"handle_auto_click", |
|
|
"handle_auto_search", |
|
|
"handle_auto_screenshot", |
|
|
"handle_auto_type", |
|
|
"handle_auto_keys", |
|
|
"handle_auto_folder", |
|
|
"handle_auto_find_files", |
|
|
"handle_auto_research", |
|
|
"handle_auto_status", |
|
|
] |
|
|
|
|
|
|
|
|
__version__ = "2.0.0" |
|
|
__author__ = "HASHIRU 6.1 - Enhanced with 2025 Research" |
|
|
__description__ = "Advanced automation module with Command Dispatcher Pattern" |
|
|
|
|
|
|
|
|
logger.info(f"🚀 HASHIRU Automation Commands Module v{__version__} carregado") |
|
|
logger.info(f"📊 {len(COMMAND_HANDLERS)} comandos de automação disponíveis") |
|
|
logger.info(f"⚡ Engine inicializada: {_automation_engine.__class__.__name__}") |