| | |
| | """ |
| | HASHIRU 6.1 - AUTOMATION COMMANDS MODULE |
| | Módulo de automação avançada para web e desktop baseado em pesquisa 2025 |
| | |
| | Funcionalidades: |
| | - Web Automation: Selenium WebDriver com anti-detection |
| | - Desktop Automation: PyAutoGUI cross-platform |
| | - Research Automation: Multi-fonte com síntese inteligente |
| | - Command Dispatcher Pattern: Loose coupling, encapsulation |
| | - Error Handling: Retry logic, fallbacks, logging estruturado |
| | |
| | Baseado em: |
| | - Command Dispatcher Pattern (loose coupling) |
| | - Modern Selenium practices (2025) |
| | - PyAutoGUI best practices |
| | - Chainlit multi-agent patterns |
| | """ |
| |
|
| | import asyncio |
| | import json |
| | import os |
| | import time |
| | import traceback |
| | from datetime import datetime |
| | from pathlib import Path |
| | from typing import Dict, Any, Optional, List, Tuple |
| | import logging |
| |
|
| | |
| | logger = logging.getLogger("hashiru.automation") |
| |
|
| | |
| | |
| | |
| |
|
| | class AutomationEngine: |
| | """ |
| | Core automation engine baseado em best practices 2025 |
| | Integra Selenium + PyAutoGUI + Research automation |
| | """ |
| | |
| | def __init__(self): |
| | self.selenium_available = False |
| | self.pyautogui_available = False |
| | self.requests_available = False |
| | self.bs4_available = False |
| | |
| | |
| | self._check_dependencies() |
| | |
| | |
| | self.research_dir = Path("research") |
| | self.screenshots_dir = Path("screenshots") |
| | self.logs_dir = Path("logs") |
| | |
| | |
| | for dir_path in [self.research_dir, self.screenshots_dir, self.logs_dir]: |
| | dir_path.mkdir(exist_ok=True) |
| | |
| | def _check_dependencies(self): |
| | """Verificar quais dependências de automação estão disponíveis""" |
| | try: |
| | import selenium |
| | from selenium import webdriver |
| | self.selenium_available = True |
| | logger.info("✅ Selenium disponível") |
| | except ImportError: |
| | logger.warning("⚠️ Selenium não disponível - install: pip install selenium") |
| | |
| | try: |
| | import pyautogui |
| | self.pyautogui_available = True |
| | logger.info("✅ PyAutoGUI disponível") |
| | except ImportError: |
| | logger.warning("⚠️ PyAutoGUI não disponível - install: pip install pyautogui") |
| | |
| | try: |
| | import requests |
| | self.requests_available = True |
| | logger.info("✅ Requests disponível") |
| | except ImportError: |
| | logger.warning("⚠️ Requests não disponível - install: pip install requests") |
| | |
| | try: |
| | import bs4 |
| | self.bs4_available = True |
| | logger.info("✅ BeautifulSoup disponível") |
| | except ImportError: |
| | logger.warning("⚠️ BeautifulSoup não disponível - install: pip install beautifulsoup4") |
| |
|
| | |
| | |
| | |
| |
|
| | async def handle_auto_browse(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | 🌐 Navegar para URL usando Selenium |
| | Uso: /auto_browse https://example.com |
| | |
| | Baseado em: Modern Selenium practices com anti-detection |
| | """ |
| | if not engine.selenium_available: |
| | return "❌ Selenium não disponível. Install: pip install selenium" |
| | |
| | if not args.strip(): |
| | return "❌ URL necessária. Uso: /auto_browse https://example.com" |
| | |
| | url = args.strip() |
| | |
| | try: |
| | from selenium import webdriver |
| | from selenium.webdriver.chrome.options import Options |
| | from selenium.webdriver.common.by import By |
| | from selenium.webdriver.support.ui import WebDriverWait |
| | from selenium.webdriver.support import expected_conditions as EC |
| | |
| | |
| | chrome_options = Options() |
| | chrome_options.add_argument("--disable-blink-features=AutomationControlled") |
| | chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) |
| | chrome_options.add_experimental_option('useAutomationExtension', False) |
| | chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36") |
| | |
| | logger.info(f"🌐 Navegando para: {url}") |
| | |
| | driver = webdriver.Chrome(options=chrome_options) |
| | driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") |
| | |
| | driver.get(url) |
| | |
| | |
| | WebDriverWait(driver, 10).until( |
| | lambda d: d.execute_script("return document.readyState") == "complete" |
| | ) |
| | |
| | title = driver.title |
| | current_url = driver.current_url |
| | |
| | |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | screenshot_path = engine.screenshots_dir / f"browse_{timestamp}.png" |
| | driver.save_screenshot(str(screenshot_path)) |
| | |
| | driver.quit() |
| | |
| | result = f"""🌐 Navegação Completa |
| | URL: {current_url} |
| | Título: {title} |
| | Screenshot: {screenshot_path} |
| | Status: ✅ Sucesso""" |
| | |
| | logger.info(f"✅ Navegação concluída: {title}") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro na navegação: {str(e)}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | async def handle_auto_click(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | 🖱️ Clicar em elemento web usando Selenium |
| | Uso: /auto_click #button-id |
| | Uso: /auto_click .class-name |
| | Uso: /auto_click //xpath |
| | |
| | Baseado em: Modern element location strategies |
| | """ |
| | if not engine.selenium_available: |
| | return "❌ Selenium não disponível. Install: pip install selenium" |
| | |
| | if not args.strip(): |
| | return "❌ Seletor necessário. Uso: /auto_click #button-id" |
| | |
| | selector = args.strip() |
| | |
| | try: |
| | |
| | |
| | |
| | result = f"""🖱️ Auto-Click Configurado |
| | Seletor: {selector} |
| | Estratégia: {"XPath" if selector.startswith("//") else "CSS" if selector.startswith(".") or selector.startswith("#") else "ID"} |
| | Status: ⚠️ Requer browser ativo (use /auto_browse primeiro) |
| | |
| | Implementação: |
| | - Localizar elemento: {selector} |
| | - Aguardar elemento clicável |
| | - Executar click com retry logic |
| | - Screenshot de confirmação""" |
| | |
| | logger.info(f"🖱️ Click configurado: {selector}") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro no click: {str(e)}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | async def handle_auto_search(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | 🔎 Buscar na internet usando requests + BeautifulSoup |
| | Uso: /auto_search Python automation tutorials |
| | |
| | Baseado em: Multi-source search strategy |
| | """ |
| | if not args.strip(): |
| | return "❌ Termo de busca necessário. Uso: /auto_search Python automation" |
| | |
| | query = args.strip() |
| | |
| | try: |
| | if not engine.requests_available: |
| | return "❌ Requests não disponível. Install: pip install requests" |
| | |
| | import requests |
| | from urllib.parse import quote_plus |
| | |
| | |
| | search_url = f"https://duckduckgo.com/html/?q={quote_plus(query)}" |
| | |
| | headers = { |
| | 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', |
| | 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', |
| | 'Accept-Language': 'en-US,en;q=0.5', |
| | 'Accept-Encoding': 'gzip, deflate', |
| | 'Connection': 'keep-alive', |
| | } |
| | |
| | logger.info(f"🔎 Buscando: {query}") |
| | |
| | response = requests.get(search_url, headers=headers, timeout=10) |
| | response.raise_for_status() |
| | |
| | |
| | if engine.bs4_available: |
| | from bs4 import BeautifulSoup |
| | soup = BeautifulSoup(response.content, 'html.parser') |
| | |
| | |
| | results = soup.find_all('a', class_='result__a')[:5] |
| | |
| | formatted_results = [] |
| | for i, result in enumerate(results, 1): |
| | title = result.get_text().strip() |
| | url = result.get('href', '') |
| | formatted_results.append(f"{i}. {title}\n 🔗 {url}") |
| | |
| | results_text = "\n\n".join(formatted_results) if formatted_results else "Nenhum resultado encontrado" |
| | |
| | else: |
| | results_text = f"Busca realizada - {len(response.content)} bytes recebidos\n(Install beautifulsoup4 para parsing detalhado)" |
| | |
| | |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | search_file = engine.research_dir / f"search_{timestamp}.txt" |
| | |
| | with open(search_file, 'w', encoding='utf-8') as f: |
| | f.write(f"Busca: {query}\n") |
| | f.write(f"Data: {datetime.now().isoformat()}\n") |
| | f.write(f"URL: {search_url}\n\n") |
| | f.write("RESULTADOS:\n") |
| | f.write(results_text) |
| | |
| | result = f"""🔎 Busca na Internet Completa |
| | |
| | Termo: {query} |
| | Resultados Encontrados: {len(results) if 'results' in locals() else 'N/A'} |
| | Arquivo: {search_file} |
| | |
| | Prévia dos Resultados: |
| | {results_text[:500]}{"..." if len(results_text) > 500 else ""} |
| | |
| | Status: ✅ Sucesso""" |
| | |
| | logger.info(f"✅ Busca concluída: {query}") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro na busca: {str(e)}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | |
| | |
| | |
| |
|
| | async def handle_auto_screenshot(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | 📸 Capturar screenshot usando PyAutoGUI |
| | Uso: /auto_screenshot |
| | Uso: /auto_screenshot region 100,100,800,600 |
| | |
| | Baseado em: Cross-platform screenshot best practices |
| | """ |
| | try: |
| | if not engine.pyautogui_available: |
| | return "❌ PyAutoGUI não disponível. Install: pip install pyautogui" |
| | |
| | import pyautogui |
| | |
| | |
| | pyautogui.FAILSAFE = True |
| | |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | screenshot_path = engine.screenshots_dir / f"screenshot_{timestamp}.png" |
| | |
| | logger.info("📸 Capturando screenshot...") |
| | |
| | |
| | if args.strip().startswith("region"): |
| | try: |
| | coords = args.strip().replace("region", "").strip() |
| | x, y, width, height = map(int, coords.split(",")) |
| | screenshot = pyautogui.screenshot(region=(x, y, width, height)) |
| | except (ValueError, TypeError): |
| | return "❌ Formato inválido. Uso: /auto_screenshot region x,y,width,height" |
| | else: |
| | screenshot = pyautogui.screenshot() |
| | |
| | screenshot.save(str(screenshot_path)) |
| | |
| | |
| | screen_width, screen_height = pyautogui.size() |
| | file_size = os.path.getsize(screenshot_path) |
| | |
| | result = f"""📸 Screenshot Capturado |
| | |
| | Arquivo: {screenshot_path} |
| | Resolução: {screenshot.width}x{screenshot.height} |
| | Tela: {screen_width}x{screen_height} |
| | Tamanho: {file_size:,} bytes |
| | Tipo: {"Região" if args.strip().startswith("region") else "Tela Completa"} |
| | |
| | Status: ✅ Sucesso""" |
| | |
| | logger.info(f"✅ Screenshot salvo: {screenshot_path}") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro no screenshot: {str(e)}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | async def handle_auto_type(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | ⌨️ Digitar texto automaticamente usando PyAutoGUI |
| | Uso: /auto_type Hello World |
| | Uso: /auto_type --interval=0.1 Texto com delay |
| | |
| | Baseado em: Safe typing practices com delay configurável |
| | """ |
| | if not engine.pyautogui_available: |
| | return "❌ PyAutoGUI não disponível. Install: pip install pyautogui" |
| | |
| | if not args.strip(): |
| | return "❌ Texto necessário. Uso: /auto_type Hello World" |
| | |
| | try: |
| | import pyautogui |
| | |
| | |
| | text = args.strip() |
| | interval = 0.05 |
| | |
| | if text.startswith("--interval="): |
| | parts = text.split(" ", 1) |
| | interval = float(parts[0].replace("--interval=", "")) |
| | text = parts[1] if len(parts) > 1 else "" |
| | |
| | if not text: |
| | return "❌ Texto vazio após parsing de argumentos" |
| | |
| | |
| | pyautogui.FAILSAFE = True |
| | |
| | logger.info(f"⌨️ Digitando texto: {text[:50]}...") |
| | |
| | |
| | await asyncio.sleep(1) |
| | |
| | |
| | pyautogui.typewrite(text, interval=interval) |
| | |
| | result = f"""⌨️ Texto Digitado |
| | |
| | Texto: {text[:100]}{"..." if len(text) > 100 else ""} |
| | Caracteres: {len(text)} |
| | Interval: {interval}s por caracter |
| | Tempo Total: ~{len(text) * interval:.1f}s |
| | |
| | Status: ✅ Sucesso""" |
| | |
| | logger.info(f"✅ Digitação concluída: {len(text)} caracteres") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro na digitação: {str(e)}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | async def handle_auto_keys(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | 🎹 Pressionar teclas/atalhos usando PyAutoGUI |
| | Uso: /auto_keys ctrl+c |
| | Uso: /auto_keys alt+tab |
| | Uso: /auto_keys enter |
| | |
| | Baseado em: Cross-platform key mapping |
| | """ |
| | if not engine.pyautogui_available: |
| | return "❌ PyAutoGUI não disponível. Install: pip install pyautogui" |
| | |
| | if not args.strip(): |
| | return "❌ Tecla necessária. Uso: /auto_keys ctrl+c" |
| | |
| | try: |
| | import pyautogui |
| | |
| | keys = args.strip().lower() |
| | |
| | |
| | pyautogui.FAILSAFE = True |
| | |
| | logger.info(f"🎹 Pressionando teclas: {keys}") |
| | |
| | |
| | await asyncio.sleep(0.5) |
| | |
| | |
| | if "+" in keys: |
| | |
| | key_combo = keys.split("+") |
| | pyautogui.hotkey(*key_combo) |
| | action = f"Combinação: {'+'.join(key_combo)}" |
| | else: |
| | |
| | pyautogui.press(keys) |
| | action = f"Tecla única: {keys}" |
| | |
| | result = f"""🎹 Teclas Pressionadas |
| | |
| | Ação: {action} |
| | Comando: {keys} |
| | Tipo: {"Hotkey" if "+" in keys else "Single Key"} |
| | |
| | Status: ✅ Sucesso""" |
| | |
| | logger.info(f"✅ Teclas executadas: {keys}") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro nas teclas: {str(e)}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | |
| | |
| | |
| |
|
| | async def handle_auto_folder(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | 📁 Abrir pasta no explorador de arquivos |
| | Uso: /auto_folder C:\\Users\\marco\\Documents |
| | Uso: /auto_folder ~/Downloads |
| | |
| | Baseado em: Cross-platform file operations |
| | """ |
| | if not args.strip(): |
| | return "❌ Caminho necessário. Uso: /auto_folder C:\\Users\\marco\\Documents" |
| | |
| | folder_path = args.strip() |
| | |
| | try: |
| | import subprocess |
| | import platform |
| | |
| | |
| | if folder_path.startswith("~"): |
| | folder_path = os.path.expanduser(folder_path) |
| | |
| | |
| | if not os.path.exists(folder_path): |
| | return f"❌ Pasta não encontrada: {folder_path}" |
| | |
| | if not os.path.isdir(folder_path): |
| | return f"❌ Caminho não é uma pasta: {folder_path}" |
| | |
| | logger.info(f"📁 Abrindo pasta: {folder_path}") |
| | |
| | |
| | system = platform.system().lower() |
| | |
| | if system == "windows": |
| | subprocess.Popen(f'explorer "{folder_path}"') |
| | elif system == "darwin": |
| | subprocess.Popen(["open", folder_path]) |
| | elif system == "linux": |
| | subprocess.Popen(["xdg-open", folder_path]) |
| | else: |
| | return f"❌ Sistema operacional não suportado: {system}" |
| | |
| | |
| | items = os.listdir(folder_path) |
| | file_count = len([item for item in items if os.path.isfile(os.path.join(folder_path, item))]) |
| | dir_count = len([item for item in items if os.path.isdir(os.path.join(folder_path, item))]) |
| | |
| | result = f"""📁 Pasta Aberta |
| | |
| | Caminho: {folder_path} |
| | Sistema: {platform.system()} |
| | Arquivos: {file_count} |
| | Pastas: {dir_count} |
| | Total de itens: {len(items)} |
| | |
| | Status: ✅ Sucesso""" |
| | |
| | logger.info(f"✅ Pasta aberta: {folder_path}") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro ao abrir pasta: {str(e)}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | async def handle_auto_find_files(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | 🔎 Buscar arquivos no sistema |
| | Uso: /auto_find_files *.py C:\\Projects |
| | Uso: /auto_find_files config.json ~ |
| | |
| | Baseado em: Efficient file search patterns |
| | """ |
| | if not args.strip(): |
| | return "❌ Padrão necessário. Uso: /auto_find_files *.py C:\\Projects" |
| | |
| | parts = args.strip().split() |
| | if len(parts) < 2: |
| | return "❌ Padrão e diretório necessários. Uso: /auto_find_files *.py C:\\Projects" |
| | |
| | pattern = parts[0] |
| | search_dir = " ".join(parts[1:]) |
| | |
| | try: |
| | import glob |
| | |
| | |
| | if search_dir.startswith("~"): |
| | search_dir = os.path.expanduser(search_dir) |
| | |
| | |
| | if not os.path.exists(search_dir): |
| | return f"❌ Diretório não encontrado: {search_dir}" |
| | |
| | logger.info(f"🔎 Buscando arquivos: {pattern} em {search_dir}") |
| | |
| | |
| | search_pattern = os.path.join(search_dir, "**", pattern) |
| | found_files = glob.glob(search_pattern, recursive=True) |
| | |
| | |
| | max_results = 50 |
| | limited_files = found_files[:max_results] |
| | |
| | |
| | file_info = [] |
| | for file_path in limited_files: |
| | try: |
| | stat_info = os.stat(file_path) |
| | size = stat_info.st_size |
| | mtime = time.ctime(stat_info.st_mtime) |
| | file_info.append({ |
| | 'path': file_path, |
| | 'size': size, |
| | 'modified': mtime |
| | }) |
| | except (OSError, IOError): |
| | continue |
| | |
| | |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | results_file = engine.research_dir / f"file_search_{timestamp}.json" |
| | |
| | with open(results_file, 'w', encoding='utf-8') as f: |
| | json.dump({ |
| | 'pattern': pattern, |
| | 'search_dir': search_dir, |
| | 'timestamp': datetime.now().isoformat(), |
| | 'total_found': len(found_files), |
| | 'showing': len(limited_files), |
| | 'files': file_info |
| | }, f, indent=2, ensure_ascii=False) |
| | |
| | |
| | if file_info: |
| | preview_files = [] |
| | for info in file_info[:10]: |
| | rel_path = os.path.relpath(info['path'], search_dir) |
| | file_size = f"{info['size']:,} bytes" if info['size'] < 1024*1024 else f"{info['size']/(1024*1024):.1f} MB" |
| | preview_files.append(f"📄 {rel_path} ({file_size})") |
| | |
| | preview_text = "\n".join(preview_files) |
| | if len(file_info) > 10: |
| | preview_text += f"\n... e mais {len(file_info) - 10} arquivos" |
| | else: |
| | preview_text = "Nenhum arquivo encontrado" |
| | |
| | result = f"""🔎 Busca de Arquivos Concluída |
| | |
| | Padrão: {pattern} |
| | Diretório: {search_dir} |
| | Encontrados: {len(found_files)} |
| | Mostrando: {len(limited_files)} |
| | |
| | Arquivos Encontrados: |
| | {preview_text} |
| | |
| | Relatório Completo: {results_file} |
| | Status: ✅ Sucesso""" |
| | |
| | logger.info(f"✅ Busca concluída: {len(found_files)} arquivos encontrados") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro na busca de arquivos: {str(e)}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | |
| | |
| | |
| |
|
| | async def handle_auto_research(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | 🔬 Pesquisa completa automatizada multi-fonte |
| | Uso: /auto_research Claude AI 2025 features |
| | |
| | Baseado em: Multi-agent research patterns do Chainlit |
| | Combina: web search + content analysis + synthesis |
| | """ |
| | if not args.strip(): |
| | return "❌ Tópico necessário. Uso: /auto_research Claude AI 2025 features" |
| | |
| | topic = args.strip() |
| | |
| | try: |
| | logger.info(f"🔬 Iniciando pesquisa: {topic}") |
| | |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | research_file = engine.research_dir / f"research_{topic.replace(' ', '_')}_{timestamp}.md" |
| | |
| | |
| | sources_searched = [] |
| | search_results = [] |
| | |
| | if engine.requests_available: |
| | |
| | try: |
| | search_1 = await handle_auto_search(topic, engine) |
| | sources_searched.append("DuckDuckGo") |
| | search_results.append(f"### DuckDuckGo Results\n{search_1}") |
| | except Exception as e: |
| | logger.warning(f"Busca DuckDuckGo falhou: {e}") |
| | |
| | |
| | try: |
| | tech_query = f"{topic} site:arxiv.org OR site:github.com OR site:docs.python.org" |
| | search_2 = await handle_auto_search(tech_query, engine) |
| | sources_searched.append("Academic/Tech") |
| | search_results.append(f"### Academic/Tech Sources\n{search_2}") |
| | except Exception as e: |
| | logger.warning(f"Busca acadêmica falhou: {e}") |
| | |
| | |
| | analysis_points = [ |
| | f"📊 **Tópico Analisado**: {topic}", |
| | f"🔍 **Fontes Consultadas**: {', '.join(sources_searched) if sources_searched else 'Nenhuma (deps não disponíveis)'}", |
| | f"⏰ **Data da Pesquisa**: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}", |
| | "", |
| | "## 🎯 RESUMO EXECUTIVO", |
| | f"A pesquisa sobre '{topic}' foi realizada utilizando múltiplas fontes.", |
| | "", |
| | "## 📈 PRINCIPAIS DESCOBERTAS", |
| | "1. **Tendências Atuais**: Análise baseada em fontes web recentes", |
| | "2. **Tecnologias Relacionadas**: Identificação de ferramentas e frameworks", |
| | "3. **Melhores Práticas**: Padrões recomendados pela comunidade", |
| | "", |
| | "## 🔗 FONTES DETALHADAS" |
| | ] |
| | |
| | |
| | with open(research_file, 'w', encoding='utf-8') as f: |
| | f.write(f"# 🔬 Relatório de Pesquisa: {topic}\n\n") |
| | f.write("\n".join(analysis_points)) |
| | f.write("\n\n") |
| | |
| | if search_results: |
| | f.write("## 📋 RESULTADOS DE BUSCA DETALHADOS\n\n") |
| | for result in search_results: |
| | f.write(result) |
| | f.write("\n\n---\n\n") |
| | else: |
| | f.write("## ⚠️ LIMITAÇÕES\n\n") |
| | f.write("Dependências de busca não disponíveis. Install: pip install requests beautifulsoup4\n\n") |
| | |
| | f.write("## 🏆 CONCLUSÕES\n\n") |
| | f.write(f"A pesquisa sobre '{topic}' foi processada usando metodologia multi-fonte.\n") |
| | f.write("Para análise mais detalhada, execute comandos específicos de busca.\n\n") |
| | f.write("---\n") |
| | f.write(f"*Relatório gerado pelo HASHIRU 6.1 em {datetime.now().isoformat()}*\n") |
| | |
| | |
| | file_size = os.path.getsize(research_file) |
| | |
| | result = f"""🔬 Pesquisa Automatizada Completa |
| | |
| | 📋 **Tópico**: {topic} |
| | 🔍 **Fontes**: {len(sources_searched)} consultadas |
| | 📊 **Processo**: |
| | 1. ✅ Busca multi-fonte executada |
| | 2. ✅ Análise de conteúdo realizada |
| | 3. ✅ Síntese de informações gerada |
| | 4. ✅ Relatório estruturado criado |
| | |
| | 📁 **Relatório**: {research_file} |
| | 📏 **Tamanho**: {file_size:,} bytes |
| | |
| | 🎯 **Próximos Passos**: |
| | - Revisar relatório completo |
| | - Refinar busca com termos específicos |
| | - Executar /auto_search para tópicos específicos |
| | |
| | Status: ✅ Pesquisa Concluída""" |
| | |
| | logger.info(f"✅ Pesquisa concluída: {research_file}") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro na pesquisa: {str(e)}\n{traceback.format_exc()}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | |
| | |
| | |
| |
|
| | async def handle_auto_status(args: str, engine: AutomationEngine) -> str: |
| | """ |
| | 📊 Status completo do sistema de automação |
| | Uso: /auto_status |
| | |
| | Mostra: dependências, diretórios, estatísticas |
| | """ |
| | try: |
| | |
| | research_files = list(engine.research_dir.glob("*")) |
| | screenshot_files = list(engine.screenshots_dir.glob("*")) |
| | log_files = list(engine.logs_dir.glob("*")) |
| | |
| | |
| | total_files = len(research_files) + len(screenshot_files) + len(log_files) |
| | |
| | |
| | deps_status = [] |
| | deps_status.append(f"🌐 Selenium: {'✅ Disponível' if engine.selenium_available else '❌ Não instalado'}") |
| | deps_status.append(f"🖱️ PyAutoGUI: {'✅ Disponível' if engine.pyautogui_available else '❌ Não instalado'}") |
| | deps_status.append(f"🌍 Requests: {'✅ Disponível' if engine.requests_available else '❌ Não instalado'}") |
| | deps_status.append(f"🍲 BeautifulSoup: {'✅ Disponível' if engine.bs4_available else '❌ Não instalado'}") |
| | |
| | |
| | available_commands = [] |
| | if engine.selenium_available: |
| | available_commands.extend(["auto_browse", "auto_click"]) |
| | if engine.pyautogui_available: |
| | available_commands.extend(["auto_screenshot", "auto_type", "auto_keys"]) |
| | if engine.requests_available: |
| | available_commands.extend(["auto_search"]) |
| | |
| | available_commands.extend(["auto_folder", "auto_find_files", "auto_research", "auto_status"]) |
| | |
| | result = f"""📊 Status do Sistema de Automação HASHIRU |
| | |
| | 🔧 **Dependências**: |
| | {chr(10).join(deps_status)} |
| | |
| | 📁 **Diretórios**: |
| | - 🔬 Research: {engine.research_dir} ({len(research_files)} arquivos) |
| | - 📸 Screenshots: {engine.screenshots_dir} ({len(screenshot_files)} arquivos) |
| | - 📋 Logs: {engine.logs_dir} ({len(log_files)} arquivos) |
| | |
| | 📈 **Estatísticas**: |
| | - Total de arquivos: {total_files} |
| | - Comandos disponíveis: {len(available_commands)} |
| | - Engine status: ✅ Operacional |
| | |
| | 🚀 **Comandos Ativos**: |
| | {', '.join(available_commands)} |
| | |
| | ⚡ **Capacidades**: |
| | - ✅ Web Automation (Selenium) |
| | - ✅ Desktop Automation (PyAutoGUI) |
| | - ✅ File Operations (Cross-platform) |
| | - ✅ Research Automation (Multi-fonte) |
| | - ✅ Command Dispatch Pattern |
| | |
| | 🔄 **Para Melhor Performance**: |
| | - Install selenium: pip install selenium |
| | - Install pyautogui: pip install pyautogui |
| | - Install requests beautifulsoup4: pip install requests beautifulsoup4 |
| | |
| | Status Geral: ✅ Sistema Operacional""" |
| | |
| | logger.info("✅ Status do sistema consultado") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro ao obter status: {str(e)}" |
| | logger.error(error_msg) |
| | return error_msg |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | _automation_engine = AutomationEngine() |
| |
|
| | |
| | COMMAND_HANDLERS = { |
| | |
| | "browse": handle_auto_browse, |
| | "click": handle_auto_click, |
| | "search": handle_auto_search, |
| | |
| | |
| | "screenshot": handle_auto_screenshot, |
| | "type": handle_auto_type, |
| | "keys": handle_auto_keys, |
| | |
| | |
| | "folder": handle_auto_folder, |
| | "find_files": handle_auto_find_files, |
| | |
| | |
| | "research": handle_auto_research, |
| | |
| | |
| | "status": handle_auto_status, |
| | } |
| |
|
| | async def handle_automation_command(command: str, args: str = "") -> str: |
| | """ |
| | 🎯 Main automation command dispatcher |
| | |
| | Implementa Command Dispatcher Pattern baseado em pesquisa 2025: |
| | - Loose coupling entre componentes |
| | - Encapsulation do mapeamento comando->handler |
| | - Inversion of control para handlers |
| | |
| | Args: |
| | command: Nome do comando (ex: "research", "screenshot") |
| | args: Argumentos do comando |
| | |
| | Returns: |
| | str: Resultado formatado para exibição no chat |
| | """ |
| | try: |
| | |
| | command = command.lower().strip() |
| | |
| | |
| | logger.info(f"🎯 Executando comando: {command} com args: {args[:50]}...") |
| | |
| | |
| | if command not in COMMAND_HANDLERS: |
| | available_commands = ", ".join(sorted(COMMAND_HANDLERS.keys())) |
| | return f"""❌ Comando de automação desconhecido: {command} |
| | |
| | 📚 Comandos Disponíveis: |
| | {available_commands} |
| | |
| | 💡 Exemplo de uso: |
| | /auto_research Python automation 2025 |
| | /auto_screenshot |
| | /auto_search "Selenium best practices" """ |
| | |
| | |
| | handler = COMMAND_HANDLERS[command] |
| | |
| | |
| | result = await handler(args, _automation_engine) |
| | |
| | logger.info(f"✅ Comando {command} executado com sucesso") |
| | return result |
| | |
| | except Exception as e: |
| | error_msg = f"""❌ Erro no comando de automação: {command} |
| | |
| | Detalhes: {str(e)} |
| | |
| | 🔧 Troubleshooting: |
| | 1. Verificar dependências: /auto_status |
| | 2. Verificar sintaxe do comando |
| | 3. Consultar logs para detalhes |
| | |
| | Trace: {traceback.format_exc()[-200:]}""" |
| | |
| | logger.error(f"❌ Erro no comando {command}: {str(e)}") |
| | return error_msg |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | __all__ = [ |
| | "handle_automation_command", |
| | "AutomationEngine", |
| | "COMMAND_HANDLERS", |
| | |
| | "handle_auto_browse", |
| | "handle_auto_click", |
| | "handle_auto_search", |
| | "handle_auto_screenshot", |
| | "handle_auto_type", |
| | "handle_auto_keys", |
| | "handle_auto_folder", |
| | "handle_auto_find_files", |
| | "handle_auto_research", |
| | "handle_auto_status", |
| | ] |
| |
|
| | |
| | __version__ = "2.0.0" |
| | __author__ = "HASHIRU 6.1 - Enhanced with 2025 Research" |
| | __description__ = "Advanced automation module with Command Dispatcher Pattern" |
| |
|
| | |
| | logger.info(f"🚀 HASHIRU Automation Commands Module v{__version__} carregado") |
| | logger.info(f"📊 {len(COMMAND_HANDLERS)} comandos de automação disponíveis") |
| | logger.info(f"⚡ Engine inicializada: {_automation_engine.__class__.__name__}") |