""" 🤖 N8n Assistant - Versão Otimizada Chatbot inteligente para dúvidas sobre n8n com arquitetura robusta Melhorias implementadas: - Tratamento completo de erros - Sistema de logging - Arquitetura orientada a objetos - Interface melhorada com feedback visual - Sistema de cache para performance - Validações robustas """ import os import yaml import json import logging import pickle import hashlib import time from pathlib import Path from typing import Optional, Tuple import gradio as gr # Configurar logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Importações com tratamento de erro try: from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings from llama_index.llms.openai import OpenAI from llama_index.embeddings.openai import OpenAIEmbedding from huggingface_hub import snapshot_download logger.info("Todas as bibliotecas importadas com sucesso") except ImportError as e: logger.error(f"Erro ao importar bibliotecas: {e}") raise class N8nChatbotPro: """ Chatbot profissional para n8n com sistema de cache e tratamento robusto de erros """ def __init__(self): self.index = None self.query_engine = None self.docs_dir = None self.cache_dir = Path("cache") self.cache_dir.mkdir(exist_ok=True) self.metricas = { 'total_perguntas': 0, 'tempo_inicializacao': 0, 'cache_hits': 0 } logger.info("Inicializando N8nChatbotPro...") def setup_openai(self) -> bool: """Configurar OpenAI com validação robusta""" try: api_key = os.getenv("OPENAI_API_KEY") if not api_key: logger.error("❌ OPENAI_API_KEY não encontrada nas variáveis de ambiente") return False if len(api_key) < 20: # Validação básica do formato logger.error("❌ OPENAI_API_KEY parece inválida (muito curta)") return False os.environ["OPENAI_API_KEY"] = api_key logger.info("✅ OpenAI API configurada com sucesso") return True except Exception as e: logger.error(f"❌ Erro ao configurar OpenAI: {e}") return False def calcular_hash_dataset(self, pasta: str) -> str: """Calcular hash MD5 do dataset para verificar mudanças""" try: hash_md5 = hashlib.md5() arquivos_processados = 0 for root, dirs, files in os.walk(pasta): for file in sorted(files): # Ordenar para hash consistente caminho_arquivo = os.path.join(root, file) try: with open(caminho_arquivo, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) arquivos_processados += 1 except Exception as e: logger.warning(f"Erro ao processar {file} para hash: {e}") continue hash_resultado = hash_md5.hexdigest() logger.info(f"Hash calculado para {arquivos_processados} arquivos: {hash_resultado[:8]}...") return hash_resultado except Exception as e: logger.error(f"Erro ao calcular hash: {e}") return "" def salvar_cache(self, index: VectorStoreIndex, hash_dataset: str) -> bool: """Salvar índice e hash em cache""" try: cache_index_path = self.cache_dir / "index.pkl" cache_hash_path = self.cache_dir / "dataset_hash.txt" cache_metadata_path = self.cache_dir / "metadata.json" # Salvar índice with open(cache_index_path, 'wb') as f: pickle.dump(index, f) # Salvar hash with open(cache_hash_path, 'w') as f: f.write(hash_dataset) # Salvar metadata metadata = { 'timestamp': time.time(), 'hash': hash_dataset, 'version': '1.0' } with open(cache_metadata_path, 'w') as f: json.dump(metadata, f) logger.info("✅ Cache salvo com sucesso") return True except Exception as e: logger.warning(f"⚠️ Erro ao salvar cache: {e}") return False def carregar_cache(self, hash_atual: str) -> Optional[VectorStoreIndex]: """Carregar índice do cache se hash for igual""" try: cache_index_path = self.cache_dir / "index.pkl" cache_hash_path = self.cache_dir / "dataset_hash.txt" if not (cache_index_path.exists() and cache_hash_path.exists()): logger.info("📁 Cache não encontrado") return None # Verificar hash with open(cache_hash_path, 'r') as f: hash_cache = f.read().strip() if hash_cache != hash_atual: logger.info("🔄 Dataset foi atualizado, cache inválido") return None # Carregar índice with open(cache_index_path, 'rb') as f: index = pickle.load(f) self.metricas['cache_hits'] += 1 logger.info("⚡ Cache carregado com sucesso") return index except Exception as e: logger.warning(f"⚠️ Erro ao carregar cache: {e}") return None def extrair_conteudo_dos_arquivos(self, pasta: str) -> Tuple[str, dict]: """Extrair conteúdo de arquivos com estatísticas detalhadas""" texto_final = "" estatisticas = { 'yaml': 0, 'json': 0, 'markdown': 0, 'txt': 0, 'erros': 0, 'total_caracteres': 0 } if not os.path.exists(pasta): logger.error(f"❌ Pasta não encontrada: {pasta}") return "", estatisticas logger.info(f"📂 Processando arquivos em: {pasta}") for root, dirs, files in os.walk(pasta): for file in files: caminho_arquivo = os.path.join(root, file) try: conteudo_arquivo = "" if file.endswith(('.yml', '.yaml')): with open(caminho_arquivo, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) conteudo_arquivo = yaml.dump(data, allow_unicode=True) estatisticas['yaml'] += 1 elif file.endswith('.json'): with open(caminho_arquivo, 'r', encoding='utf-8') as f: data = json.load(f) conteudo_arquivo = json.dumps(data, indent=2, ensure_ascii=False) estatisticas['json'] += 1 elif file.endswith('.md'): with open(caminho_arquivo, 'r', encoding='utf-8') as f: conteudo_arquivo = f.read() estatisticas['markdown'] += 1 elif file.endswith('.txt'): with open(caminho_arquivo, 'r', encoding='utf-8') as f: conteudo_arquivo = f.read() estatisticas['txt'] += 1 else: continue if conteudo_arquivo: texto_final += f"\n\n### Arquivo: {file}\n{conteudo_arquivo}" estatisticas['total_caracteres'] += len(conteudo_arquivo) except Exception as e: logger.warning(f"⚠️ Erro ao ler {file}: {e}") estatisticas['erros'] += 1 continue total_arquivos = sum([estatisticas[k] for k in ['yaml', 'json', 'markdown', 'txt']]) logger.info(f"✅ Processados {total_arquivos} arquivos ({estatisticas['total_caracteres']:,} caracteres)") return texto_final, estatisticas def gerar_arquivo_documentacao(self, pasta_origem: str, arquivo_destino: str = "documentacao.txt") -> Tuple[bool, dict]: """Gerar arquivo de documentação com estatísticas""" try: logger.info("📝 Gerando arquivo de documentação...") texto, estatisticas = self.extrair_conteudo_dos_arquivos(pasta_origem) if not texto.strip(): logger.warning("⚠️ Nenhum conteúdo encontrado nos arquivos!") return False, estatisticas with open(arquivo_destino, 'w', encoding='utf-8') as f: f.write(texto) # Verificar tamanho do arquivo gerado tamanho_arquivo = os.path.getsize(arquivo_destino) logger.info(f"✅ Arquivo {arquivo_destino} gerado ({tamanho_arquivo:,} bytes)") return True, estatisticas except Exception as e: logger.error(f"❌ Erro ao gerar documentação: {e}") return False, {} def baixar_documentacao(self) -> bool: """Baixar documentação do Hugging Face com retry""" max_tentativas = 3 for tentativa in range(max_tentativas): try: logger.info(f"📥 Baixando documentação do n8n (tentativa {tentativa + 1}/{max_tentativas})...") self.docs_dir = snapshot_download( repo_id="Jeice/n8n-docs-v2", repo_type="dataset", cache_dir="./hf_cache" ) # Verificar se o download foi bem-sucedido if os.path.exists(self.docs_dir) and os.listdir(self.docs_dir): logger.info(f"✅ Documentação baixada em: {self.docs_dir}") return True else: logger.warning("⚠️ Diretório vazio após download") except Exception as e: logger.warning(f"⚠️ Tentativa {tentativa + 1} falhou: {e}") if tentativa == max_tentativas - 1: logger.error("❌ Todas as tentativas de download falharam") return False time.sleep(2) # Aguardar antes da próxima tentativa return False def configurar_llm(self) -> bool: """Configurar LLM com parâmetros otimizados""" try: Settings.llm = OpenAI( model="gpt-3.5-turbo", temperature=0.1, # Baixa temperatura para respostas mais precisas max_tokens=1000, # Limite de tokens para controlar custos system_prompt=( "Você é um assistente especialista na plataforma n8n com vasta experiência. " "Responda sempre em português do Brasil, de forma clara, direta e objetiva. " "Base suas respostas exclusivamente na documentação fornecida. " "Se não houver informações suficientes na documentação, seja honesto e " "diga que não há informações suficientes para responder adequadamente. " "Forneça exemplos práticos quando possível e estruture suas respostas " "de forma didática e fácil de entender." ) ) Settings.embed_model = OpenAIEmbedding( model="text-embedding-ada-002" ) logger.info("✅ LLM configurado com sucesso") return True except Exception as e: logger.error(f"❌ Erro ao configurar LLM: {e}") return False def inicializar_index_com_cache(self) -> Tuple[bool, str]: """Inicializar índice com sistema de cache inteligente""" inicio_tempo = time.time() try: # Calcular hash do dataset hash_dataset = self.calcular_hash_dataset(self.docs_dir) if not hash_dataset: return False, "Erro ao calcular hash do dataset" # Tentar carregar do cache cached_index = self.carregar_cache(hash_dataset) if cached_index: self.index = cached_index # Configurar LLM (necessário mesmo com cache) if not self.configurar_llm(): return False, "Erro ao configurar LLM" self.query_engine = self.index.as_query_engine( similarity_top_k=5, # Retornar top 5 documentos mais similares response_mode="compact" # Modo compacto para respostas mais diretas ) tempo_cache = time.time() - inicio_tempo self.metricas['tempo_inicializacao'] = tempo_cache return True, f"Índice carregado do cache em {tempo_cache:.2f}s" # Se não há cache válido, processar normalmente logger.info("🔄 Processando documentação (sem cache válido)...") # Gerar arquivo de documentação sucesso, estatisticas = self.gerar_arquivo_documentacao(self.docs_dir) if not sucesso: return False, "Erro ao processar documentação" # Verificar se arquivo foi criado if not os.path.exists("documentacao.txt"): return False, "Arquivo documentacao.txt não foi criado" # Carregar documentos logger.info("📚 Carregando documentos...") documents = SimpleDirectoryReader(input_files=["documentacao.txt"]).load_data() if not documents: return False, "Nenhum documento foi carregado" logger.info(f"✅ {len(documents)} documentos carregados") # Configurar LLM if not self.configurar_llm(): return False, "Erro ao configurar LLM" # Criar índice vetorial logger.info("🧠 Criando índice vetorial...") self.index = VectorStoreIndex.from_documents( documents, show_progress=True ) # Configurar query engine self.query_engine = self.index.as_query_engine( similarity_top_k=5, response_mode="compact" ) # Salvar no cache self.salvar_cache(self.index, hash_dataset) tempo_total = time.time() - inicio_tempo self.metricas['tempo_inicializacao'] = tempo_total return True, f"Índice criado em {tempo_total:.2f}s ({estatisticas})" except Exception as e: logger.error(f"❌ Erro ao inicializar índice: {e}") return False, f"Erro na inicialização: {str(e)}" def setup(self) -> Tuple[bool, str]: """Configurar todo o sistema com validações completas""" try: logger.info("🚀 Iniciando configuração do sistema...") # 1. Configurar OpenAI if not self.setup_openai(): return False, "❌ Falha na configuração da OpenAI API" # 2. Baixar documentação if not self.baixar_documentacao(): return False, "❌ Falha no download da documentação" # 3. Inicializar índice com cache sucesso, mensagem = self.inicializar_index_com_cache() if not sucesso: return False, f"❌ {mensagem}" logger.info("✅ Sistema configurado com sucesso!") return True, f"✅ {mensagem}" except Exception as e: logger.error(f"❌ Erro crítico na configuração: {e}") return False, f"❌ Erro crítico: {str(e)}" def chatbot(self, input_text: str) -> str: """Função principal do chatbot com validações e métricas""" # Validações de entrada if not input_text or not input_text.strip(): return "⚠️ Por favor, digite uma pergunta válida." if len(input_text.strip()) < 3: return "⚠️ Pergunta muito curta. Por favor, seja mais específico." if not self.query_engine: return "❌ Sistema não inicializado. Tente recarregar a página." try: inicio = time.time() self.metricas['total_perguntas'] += 1 # Log da pergunta (primeiros 100 caracteres) pergunta_log = input_text[:100] + "..." if len(input_text) > 100 else input_text logger.info(f"🤔 Pergunta #{self.metricas['total_perguntas']}: {pergunta_log}") # Processar pergunta response = self.query_engine.query(input_text) # Calcular tempo de resposta tempo_resposta = time.time() - inicio logger.info(f"⚡ Resposta gerada em {tempo_resposta:.2f}s") # Formatar resposta resposta_formatada = str(response).strip() # Adicionar informações de debug se necessário if len(resposta_formatada) < 50: resposta_formatada += f"\n\n_Tempo de resposta: {tempo_resposta:.2f}s_" return resposta_formatada except Exception as e: logger.error(f"❌ Erro ao processar pergunta: {e}") return f"❌ Erro ao processar sua pergunta: {str(e)}\n\nTente reformular sua pergunta ou recarregar a página." def obter_estatisticas(self) -> str: """Obter estatísticas do sistema""" return f""" 📊 **Estatísticas do Sistema:** - Total de perguntas: {self.metricas['total_perguntas']} - Tempo de inicialização: {self.metricas['tempo_inicializacao']:.2f}s - Cache hits: {self.metricas['cache_hits']} - Status do índice: {'✅ Ativo' if self.index else '❌ Inativo'} """ # Inicialização global logger.info("🤖 Inicializando N8n Assistant...") bot = N8nChatbotPro() # Configurar sistema logger.info("⚙️ Configurando sistema...") success, message = bot.setup() if success: logger.info(f"✅ Sistema pronto: {message}") else: logger.error(f"❌ Falha na inicialização: {message}") def responder_pergunta(input_text: str) -> str: """Wrapper para interface Gradio""" if not success: return f"❌ Sistema não inicializado: {message}" return bot.chatbot(input_text) def obter_info_sistema() -> str: """Obter informações do sistema""" if not success: return f"❌ Sistema não inicializado: {message}" return bot.obter_estatisticas() # Interface Gradio Premium with gr.Blocks( theme=gr.themes.Soft(), title="N8n Assistant Pro", css=""" .status-success { color: #28a745; font-weight: bold; } .status-error { color: #dc3545; font-weight: bold; } .metrics { background-color: #f8f9fa; padding: 10px; border-radius: 5px; } """ ) as demo: # Cabeçalho gr.Markdown( f""" # 🤖 N8n Assistant Pro Assistente inteligente especializado em **n8n** com documentação oficial atualizada.
**Status do Sistema:** {message}
""", elem_classes=["status-success" if success else "status-error"] ) # Layout principal with gr.Row(): # Coluna da logo with gr.Column(scale=1): gr.Image( value="https://n8n.io/images/n8n-logo.png", width=120, show_label=False, interactive=False, ) # Botão de estatísticas if success: stats_btn = gr.Button("📊 Ver Estatísticas", size="sm") stats_output = gr.Textbox( label="Estatísticas do Sistema", visible=False, lines=6 ) # Coluna principal with gr.Column(scale=4): gr.Markdown( """ ## Como posso ajudar você com o n8n hoje? Digite sua pergunta abaixo e receba respostas baseadas na documentação oficial. """ ) # Área de chat with gr.Row(): with gr.Column(scale=3): input_text = gr.Textbox( label="Sua pergunta sobre n8n", placeholder="Ex: Como criar um workflow no n8n?", lines=3, max_lines=8 ) with gr.Row(): submit_btn = gr.Button("🚀 Perguntar", variant="primary", scale=2) clear_btn = gr.Button("🧹 Limpar", scale=1) with gr.Column(scale=4): output_text = gr.Textbox( label="Resposta do Assistant", placeholder="Sua resposta aparecerá aqui...", lines=15, max_lines=25 ) # Seção de exemplos with gr.Accordion("💡 Exemplos de Perguntas", open=False): gr.Markdown( """ **Workflows e Automação:** - Como criar meu primeiro workflow no n8n? - Quais são as melhores práticas para workflows? - Como usar condições e loops nos workflows? **Nodes e Integrações:** - Para que serve o node HTTP Request? - Como integrar n8n com Google Sheets? - Quais nodes usar para automação de email? **Configuração e Deploy:** - Como configurar webhooks no n8n? - Como fazer deploy do n8n em produção? - Como configurar autenticação OAuth? **Troubleshooting:** - Por que meu workflow não está executando? - Como debugar erros nos nodes? - Como otimizar performance dos workflows? """ ) # Seção de informações técnicas with gr.Accordion("ℹ️ Informações Técnicas", open=False): gr.Markdown( f""" **Sobre este Assistant:** - 🤖 **Modelo**: GPT-3.5-turbo otimizado para n8n - 📚 **Base de conhecimento**: Documentação oficial n8n v2 - ⚡ **Sistema de cache**: Ativo para respostas mais rápidas - 🔄 **Última atualização**: Documentação sincronizada automaticamente - 🛡️ **Tratamento de erros**: Sistema robusto com fallbacks **Status da Inicialização:** - ✅ Configuração OpenAI: {'Sucesso' if success else 'Falha'} - ✅ Download documentação: {'Sucesso' if success else 'Falha'} - ✅ Índice vetorial: {'Ativo' if success else 'Inativo'} - ✅ Cache system: {'Ativo' if success else 'Inativo'} **Métricas de Performance:** {bot.obter_estatisticas() if success else 'Sistema não inicializado'} """ ) # Eventos da interface submit_btn.click( fn=responder_pergunta, inputs=input_text, outputs=output_text ) clear_btn.click( lambda: ("", ""), None, [input_text, output_text] ) # Enter para enviar input_text.submit( fn=responder_pergunta, inputs=input_text, outputs=output_text ) # Botão de estatísticas if success: stats_btn.click( fn=lambda: gr.update(visible=True, value=bot.obter_estatisticas()), outputs=stats_output ) # Configuração de lançamento if __name__ == "__main__": demo.launch( server_name="0.0.0.0", server_port=7860, show_error=True, show_api=False, quiet=False )