import os import json import time import hashlib from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed import gradio as gr import google.generativeai as genai # Dependências para PDF try: import PyPDF2 PDF_SUPPORT = True except ImportError: PDF_SUPPORT = False print("⚠️ PyPDF2 não instalado. Instale com: pip install PyPDF2") # ==================== 1. CONFIGURAÇÃO ==================== api_key = os.getenv("GOOGLE_API_KEY", "SUA_API_KEY_AQUI") if api_key and api_key != "SUA_API_KEY_AQUI": genai.configure(api_key=api_key) # Modelos do Gemini model_flash = genai.GenerativeModel("gemini-flash-latest") model_pro = genai.GenerativeModel("gemini-flash-latest") ARQUIVO_CONFIG = "protocolo.json" PASTA_TRANSCRICOES = "transcricoes" PAGES_PER_CHUNK = 10 MAX_WORKERS = 5 # Limite de chamadas paralelas os.makedirs(PASTA_TRANSCRICOES, exist_ok=True) # ==================== 2. UTILIDADES ==================== def carregar_protocolo(): """ Carrega o protocolo. Se não existir, cria um com exemplo de STOP. """ try: with open(ARQUIVO_CONFIG, "r", encoding="utf-8") as f: return f.read() except FileNotFoundError: # Protocolo padrão que inclui um agente com a lógica de pergunta ao usuário protocolo_padrao = [ {"nome": "Leitor Inicial", "modelo": "flash", "missao": "Leia o contexto e resuma os fatos principais em 3 a 5 pontos."}, { "nome": "Advogado de Acusação", "modelo": "pro", "missao": "Com base nos fatos, formule uma pergunta crucial para o usuário para fortalecer um caso. Sua resposta DEVE ser APENAS um JSON no formato: {\"tipo\": \"pergunta_usuario\", \"pergunta\": \"Sua pergunta aqui\"}" }, {"nome": "Analista Final", "modelo": "pro", "missao": "Considere a resposta do usuário e os fatos iniciais para dar um parecer final sobre o caso."} ] return json.dumps(protocolo_padrao, indent=2) def salvar_protocolo(conteudo): try: json.loads(conteudo) with open(ARQUIVO_CONFIG, "w", encoding="utf-8") as f: f.write(conteudo) return "✅ Protocolo salvo com sucesso!" except Exception as e: return f"❌ Erro ao salvar: {str(e)}" def limpar_nome_arquivo(nome): nome_base = os.path.basename(nome) nome_limpo = "".join([c for c in nome_base if c.isalnum() or c in (' ', '.', '_', '-')]).strip() return nome_limpo + ".json" def extrair_texto_pdf(caminho_pdf): # (Implementação existente, sem alterações) try: with open(caminho_pdf, 'rb') as f: reader = PyPDF2.PdfReader(f) paginas = [] for i, page in enumerate(reader.pages): texto = page.extract_text() paginas.append({ "numero": i + 1, "texto": texto, "metadata": str(page)[:200] }) return paginas, None except Exception as e: return None, str(e) def fragmentar_pdf(paginas, tamanho_chunk=PAGES_PER_CHUNK): # (Implementação existente, sem alterações) chunks = [] for i in range(0, len(paginas), tamanho_chunk): chunk = paginas[i:i + tamanho_chunk] num_inicio = chunk[0]["numero"] num_fim = chunk[-1]["numero"] texto_consolidado = "\n---QUEBRA DE PÁGINA---\n".join( [f"[PÁGINA {p['numero']}]\n{p['texto']}" for p in chunk] ) chunks.append({ "id": f"chunk_{num_inicio}_{num_fim}", "paginas": f"{num_inicio}-{num_fim}", "num_paginas": len(chunk), "texto": texto_consolidado, "metadata": [p["metadata"] for p in chunk] }) return chunks def processar_pdf_completo(arquivo_pdf): # (Implementação existente, sem alterações) if not PDF_SUPPORT: return None, "❌ PyPDF2 não disponível" try: paginas, erro = extrair_texto_pdf(arquivo_pdf.name if hasattr(arquivo_pdf, 'name') else arquivo_pdf) if erro: return None, f"❌ Erro ao ler PDF: {erro}" chunks = fragmentar_pdf(paginas) nome_arquivo = os.path.basename(arquivo_pdf.name if hasattr(arquivo_pdf, 'name') else arquivo_pdf) return { "arquivo": nome_arquivo, "total_paginas": len(paginas), "total_chunks": len(chunks), "chunks": chunks, "tipo": "pdf" }, None except Exception as e: return None, f"❌ Erro no processamento: {str(e)}" def ler_arquivo_texto(arquivo): # (Implementação existente, sem alterações) if arquivo is None: return None try: with open(arquivo.name, "r", encoding="utf-8") as f: conteudo = f.read() return { "arquivo": os.path.basename(arquivo.name), "conteudo": conteudo, "tipo": "texto" } except: return None # ==================== 3. PIPELINE DE IA ==================== def transcrever_chunk(chunk_data, config_agentes): # (Implementação existente, sem alterações) modelo = model_flash try: if config_agentes and isinstance(config_agentes, list): if config_agentes[0].get("modelo") == "pro": modelo = model_pro except: pass prompt = f""" ANÁLISE DE DOCUMENTO (OCR/LEITURA): Transcreva e estruture o conteúdo das páginas {chunk_data['paginas']}. Texto extraído: {chunk_data['texto']} Retorne JSON: {{ "transcricao": "...", "objetos": ["..."], "resumo": "..." }} """ try: for tentativa in range(3): try: resposta = modelo.generate_content(prompt) texto_resp = resposta.text.replace("```json", "").replace("```", "") return json.loads(texto_resp.strip()), None except Exception as inner_e: if "429" in str(inner_e): time.sleep(2 * (tentativa + 1)) continue raise inner_e except Exception as e: return None, str(e) # ==================== 4. GERENCIADOR DE ARQUIVOS ==================== class GerenciadorArquivos: # (Implementação existente, sem alterações) def __init__(self): self.arquivos = {} def adicionar(self, arquivo, arquivo_id): self.arquivos[arquivo_id] = { "arquivo": arquivo, "nome": os.path.basename(arquivo.name), "status": "adicionado", "processado": None, "transcricao": None } def gerar_prompt_com_transcricoes(self, texto_usuario): prompt = texto_usuario + "\n\n--- CONTEXTO DOS ARQUIVOS ---\n" count = 0 for _, item in self.arquivos.items(): if item["status"] == "processado" and item["transcricao"]: count += 1 trans = item["transcricao"] nome = item["nome"] prompt += f"\n[ARQUIVO: {nome}]\n" if isinstance(trans, dict) and "chunks_processados" in trans: for chunk in trans["chunks_processados"]: if chunk.get("status") == "OK": resumo = chunk.get('resumo', '') resumo = str(resumo) if resumo else "" prompt += f"Páginas {chunk['paginas']}: {resumo}\n" texto_full = chunk.get('transcricao', '') if texto_full: texto_seguro = str(texto_full) prompt += f"Trecho: {texto_seguro[:400]}...\n" else: prompt += "Trecho: (vazio)\n" elif isinstance(trans, dict) and "conteudo" in trans: conteudo = str(trans['conteudo']) prompt += f"Conteúdo: {conteudo[:1000]}...\n" if count == 0: prompt += "(Nenhum arquivo processado ainda)" return prompt # Instância Global gerenciador = GerenciadorArquivos() # ==================== 5. FUNÇÕES DE ORQUESTRAÇÃO ==================== def automacao_upload_processamento(files, history, config_json): # (Implementação existente, sem alterações) if not files: return history try: config_agentes = json.loads(config_json) except: config_agentes = [] if history is None: history = [] history.append([None, f"📂 **SISTEMA:** Recebi {len(files)} arquivo(s). Verificando cache e processando..."]) yield history ids_para_processar = [] for f in files: arquivo_id = f"arq_{int(time.time()*1000)}_{f.name}" gerenciador.adicionar(f, arquivo_id) ids_para_processar.append(arquivo_id) for arq_id in ids_para_processar: item = gerenciador.arquivos[arq_id] nome = item["nome"] nome_cache = limpar_nome_arquivo(nome) caminho_cache = os.path.join(PASTA_TRANSCRICOES, nome_cache) if os.path.exists(caminho_cache): try: with open(caminho_cache, "r", encoding="utf-8") as cache_file: dados_cache = json.load(cache_file) item["transcricao"] = dados_cache item["status"] = "processado" if nome.lower().endswith('.pdf') and "chunks_processados" in dados_cache: item["processado"] = {"tipo": "pdf", "chunks": []} history.append([None, f"♻️ **Cache Encontrado:** `{nome}` já foi processado. Carregando..."]) yield history continue except Exception as e: history.append([None, f"⚠️ Erro cache `{nome}`: {e}. Reprocessando..."]) history.append([None, f"⚙️ **Processando:** `{nome}`..."]) yield history if nome.lower().endswith('.pdf'): if not PDF_SUPPORT: history.append([None, f"❌ Erro em `{nome}`: Biblioteca PDF ausente."]) yield history continue pdf_proc, erro = processar_pdf_completo(item["arquivo"]) if erro: history.append([None, f"❌ Erro em `{nome}`: {erro}"]) yield history continue item["processado"] = pdf_proc chunks = pdf_proc["chunks"] total_chunks = len(chunks) chunks_ordenados = [None] * total_chunks history.append([None, f"📄 `{nome}` fragmentado em {total_chunks} partes. Iniciando IA (Paralelo: {MAX_WORKERS} threads)..."]) yield history with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures_map = {} for i, chunk in enumerate(chunks): future = executor.submit(transcrever_chunk, chunk, config_agentes) futures_map[future] = i concluidos = 0 for future in as_completed(futures_map): index_original = futures_map[future] res, err = future.result() if err: chunks_ordenados[index_original] = {"status": "ERRO", "paginas": chunks[index_original]["paginas"]} else: chunks_ordenados[index_original] = { "status": "OK", "paginas": chunks[index_original]["paginas"], "transcricao": res.get("transcricao"), "resumo": res.get("resumo") } concluidos += 1 if concluidos % 2 == 0 or concluidos == total_chunks: msg_base = f"📄 `{nome}`: Processando partes... ({concluidos}/{total_chunks})" history[-1][1] = msg_base yield history dados_finais = { "arquivo": nome, "data_processamento": str(datetime.now()), "chunks_processados": chunks_ordenados } item["transcricao"] = dados_finais item["status"] = "processado" try: with open(caminho_cache, "w", encoding="utf-8") as f_out: json.dump(dados_finais, f_out, indent=2, ensure_ascii=False) history.append([None, f"💾 `{nome}` processado e salvo no cache."]) except Exception as e: history.append([None, f"⚠️ Erro ao salvar cache: {e}"]) yield history else: res = ler_arquivo_texto(item["arquivo"]) if res: item["processado"] = res dados_finais = {"conteudo": res["conteudo"], "data_processamento": str(datetime.now())} item["transcricao"] = dados_finais item["status"] = "processado" with open(caminho_cache, "w", encoding="utf-8") as f_out: json.dump(dados_finais, f_out, indent=2, ensure_ascii=False) history.append([None, f"✅ `{nome}` (Texto) lido e salvo."]) else: history.append([None, f"❌ Falha ao ler `{nome}`."]) yield history history.append([None, "🏁 **Processamento de lote finalizado.** Os arquivos estão prontos para análise."]) yield history def chat_orquestrador(message, history, config_json, pipeline_state): """ Orquestra a conversa. Pode iniciar uma nova pipeline ou continuar uma que foi pausada. """ # --- LÓGICA DE CONTINUAÇÃO --- if pipeline_state.get("is_paused"): history.append([message, None]) # Recupera o estado timeline_execucao = pipeline_state["timeline"] agentes_restantes = pipeline_state["remaining_agents"] # Adiciona a resposta do usuário à trilha de auditoria timeline_execucao.append({ "passo": len(timeline_execucao) + 1, "tipo": "resposta_usuario", "conteudo": message }) # Reseta o estado para evitar loops pipeline_state["is_paused"] = False # Continua a execução do ponto onde parou yield from executar_pipeline(history, timeline_execucao, agentes_restantes, pipeline_state) return # --- LÓGICA DE INÍCIO DE UMA NOVA CONVERSA --- try: prompt_contexto = gerenciador.gerar_prompt_com_transcricoes(message) except Exception as e: history.append([message, f"❌ Erro ao gerar contexto: {str(e)}"]) yield history, [], pipeline_state return try: protocolo = json.loads(config_json) except: history.append([message, "❌ Erro no JSON de Configuração do Protocolo."]) yield history, [], pipeline_state return history.append([message, None]) # Inicia uma nova trilha de auditoria timeline_execucao = [{"passo": 1, "tipo": "prompt_usuario", "conteudo": prompt_contexto}] yield history, timeline_execucao, pipeline_state # Inicia a execução com todos os agentes do protocolo yield from executar_pipeline(history, timeline_execucao, protocolo, pipeline_state) def executar_pipeline(history, timeline_execucao, agentes_a_executar, pipeline_state): """ Função core que executa a lista de agentes em sequência. Pode ser pausada se um agente pedir input do usuário. """ passo_atual = len(timeline_execucao) + 1 for i, cfg in enumerate(agentes_a_executar): time.sleep(3) nome_agente = cfg.get("nome", "Agente") modelo_agente = model_pro if cfg.get("modelo") == "pro" else model_flash msg_atual = history[-1][1] or "" history[-1][1] = msg_atual + f"⏳ **{nome_agente}** está analisando...\n" yield history, timeline_execucao, pipeline_state prompt_agente = f""" --- HISTÓRICO DA CONVERSA ATÉ AGORA --- {json.dumps(timeline_execucao, ensure_ascii=False, indent=2)} ----------------- Sua Identidade: {nome_agente} Sua Missão Específica Agora: {cfg['missao']} Responda de forma concisa e direta, focando apenas na sua missão. """ try: inicio = time.time() resp = modelo_agente.generate_content(prompt_agente) texto_resp = resp.text duracao = time.time() - inicio # --- LÓGICA DE PAUSA (STOP) --- try: # Tenta interpretar a resposta como JSON para verificar se é uma pergunta resposta_json = json.loads(texto_resp) if resposta_json.get("tipo") == "pergunta_usuario": pergunta = resposta_json.get("pergunta", "Não foi possível extrair a pergunta.") # Salva o estado atual da pipeline pipeline_state["is_paused"] = True pipeline_state["timeline"] = timeline_execucao # Salva os agentes que AINDA NÃO rodaram pipeline_state["remaining_agents"] = agentes_a_executar[i+1:] # Adiciona a pergunta à auditoria e ao chat timeline_execucao.append({"passo": passo_atual, "tipo": "pergunta_agente", "agente": nome_agente, "pergunta": pergunta}) msg_atual = history[-1][1].replace(f"⏳ **{nome_agente}** está analisando...\n", "") history[-1][1] = msg_atual + f"**{nome_agente}** precisa de mais informações:\n\n> *{pergunta}*\n\nAguardando sua resposta na caixa de texto abaixo..." # Encerra a execução atual e aguarda o usuário yield history, timeline_execucao, pipeline_state return # Sai da função except (json.JSONDecodeError, TypeError): # Se não for um JSON de pergunta, é uma resposta normal pass # --------------------------- timeline_execucao.append({"passo": passo_atual, "tipo": "resposta_agente", "agente": nome_agente, "resposta": texto_resp}) msg_atual = history[-1][1].replace(f"⏳ **{nome_agente}** está analisando...\n", "") # Não exibe o conteúdo da resposta do modelo no chat, apenas a confirmação novo_trecho = f"✅ **[{nome_agente}]** concluiu sua análise em ({duracao:.1f}s).\n" history[-1][1] = msg_atual + novo_trecho yield history, timeline_execucao, pipeline_state except Exception as e: timeline_execucao.append({"passo": passo_atual, "tipo": "erro_agente", "agente": nome_agente, "erro": str(e)}) msg_atual = history[-1][1] history[-1][1] = msg_atual.replace(f"⏳ **{nome_agente}** está analisando...\n", "") + f"\n❌ Erro em {nome_agente}: {str(e)}\n" yield history, timeline_execucao, pipeline_state passo_atual += 1 # ==================== 6. UI (Gradio) ==================== def ui_v29_stop_logic(): css = """ footer {display: none !important;} .contain {border: none !important;} """ config_inicial = carregar_protocolo() with gr.Blocks(title="AI Forensics Auto", css=css, theme=gr.themes.Soft()) as app: # Estado da configuração dos agentes state_config = gr.State(config_inicial) # NOVO: Estado para controlar a pausa/continuação da pipeline pipeline_state = gr.State({"is_paused": False, "timeline": [], "remaining_agents": []}) with gr.Tabs(): with gr.Tab("💬 Investigação"): chatbot = gr.Chatbot( height=400, show_label=False, show_copy_button=True, render_markdown=True, label="Chat de Investigação" ) with gr.Row(): txt_input = gr.Textbox( scale=8, show_label=False, placeholder="Digite sua instrução ou responda à pergunta do agente...", lines=1 ) btn_enviar = gr.Button("Enviar 📨", variant="primary", scale=1) with gr.Accordion("📂 Adicionar Arquivos para Análise", open=False): gr.Markdown("Selecione arquivos (PDF, TXT). A transcrição iniciará **automaticamente**.") file_uploader = gr.File( file_count="multiple", file_types=[".pdf", ".txt", ".json", ".md"], label="Arraste arquivos aqui ou clique para selecionar" ) with gr.Tab("🕵️ Auditoria"): gr.Markdown("### Trilha de Auditoria\nExibe o histórico completo de prompts e respostas de cada agente na última execução.") json_audit = gr.JSON(label="Timeline da Execução da Última Mensagem") with gr.Tab("⚙️ Contexto & Config"): gr.Markdown("### Protocolo dos Agentes\nDefina a sequência e as missões dos agentes de IA. Para pausar e pedir input, use a missão de exemplo para o agente retornar um JSON específico.") with gr.Row(): btn_save_cfg = gr.Button("💾 Salvar Alterações") lbl_cfg_status = gr.Label(show_label=False) code_config = gr.Code(value=config_inicial, language="json", label="protocolo.json") btn_save_cfg.click(salvar_protocolo, inputs=[code_config], outputs=[lbl_cfg_status]) # Atualiza o state_config em memória após salvar btn_save_cfg.click(lambda x: x, inputs=[code_config], outputs=[state_config]) # Ação de clique agora passa o pipeline_state para o orquestrador btn_enviar.click( chat_orquestrador, inputs=[txt_input, chatbot, state_config, pipeline_state], outputs=[chatbot, json_audit, pipeline_state] # Atualiza o estado da pipeline ).then( lambda: "", outputs=[txt_input] ) file_uploader.upload( automacao_upload_processamento, inputs=[file_uploader, chatbot, state_config], outputs=[chatbot] ) return app if __name__ == "__main__": ui_v29_stop_logic().launch()