Izzak / app.py
caarleexx's picture
Update app.py
5aa1fdc verified
# ╔════════════════════════════════════════════════════════════════════════════╗
# ║ PIPELINE v40: SÍNTESE FINAL COM CHAMADA DIRETA ║
# ╚════════════════════════════════════════════════════════════════════════════╝
#
# ==================== RESUMO TÉCNICO DA PIPELINE ====================
#
# OBJETIVO: Versão final que integra todas as melhorias:
#
# 1. FLUXO DE INICIAÇÃO (v39): UI começa com uma tela de "setup" e só
# exibe o chat após a primeira análise.
# 2. CHAMADA DE INICIALIZAÇÃO DIRETA: O botão "Iniciar" faz uma chamada
# única e direta ao modelo, sem usar o pipeline, para uma configuração
# rápida e eficiente.
# 3. ORQUESTRADOR DE PIPELINE: O chat interativo subsequente utiliza o
# `orquestrador` e o `protocolo.json` para análises multiagente.
# 4. EXTRAÇÃO DE RESPOSTA LIMPA: A lógica do `STOP_PIPELINE` foi aprimorada
# para extrair a mensagem da chave "proximo_passo" do JSON e exibi-la
# de forma limpa ao usuário.
# 5. COMPATIBILIDADE GRADIO: O código é compatível com versões mais antigas
# da biblioteca Gradio, evitando erros comuns.
#
# =====================================================================
import os
import json
import re
import time
from datetime import datetime
import gradio as gr
from groq import Groq
# ==================== 1. CONFIGURAÇÃO ====================
groq_key = os.getenv("GROQ_API_KEY", "SUA_GROQ_KEY_AQUI")
groq_client = Groq(api_key=groq_key)
# Arquivos de persistência e configuração
ARQUIVO_CONFIG = "protocolo.json"
ARQUIVO_HELP = "help.md"
ARQUIVO_CONTEXTO = "contexto_persistente.json"
ARQUIVO_DATA_MD = "data.md"
DELAY_ENTRE_AGENTES = 1
STOP_KEYWORD = "STOP_PIPELINE"
print("🚀 App inicializada - GROQ v40 (Síntese Final)")
print(f" ✅ Groq: {'OK' if groq_key != 'SUA_GROQ_KEY_AQUI' else '⚠️ placeholder'}")
# ==================== 2. UTILIDADES ====================
def estimar_tokens(texto):
return len(str(texto)) // 4
def carregar_protocolo():
try:
with open(ARQUIVO_CONFIG, "r", encoding="utf-8") as f:
return f.read()
except Exception as e:
print(f"❌ Erro carregar_protocolo: {e}")
return "[]"
def salvar_protocolo(conteudo):
try:
json.loads(conteudo)
with open(ARQUIVO_CONFIG, "w", encoding="utf-8") as f:
f.write(conteudo)
print(f"💾 Protocolo salvo: {len(json.loads(conteudo))} agentes")
return "✅ Protocolo salvo com sucesso"
except Exception as e:
print(f"❌ Erro salvar_protocolo: {e}")
return f"❌ Erro JSON: {str(e)}"
def carregar_help():
try:
with open(ARQUIVO_HELP, "r", encoding="utf-8") as f:
return f.read()
except:
return "# Help não encontrado."
def carregar_contexto_persistente():
try:
with open(ARQUIVO_CONTEXTO, "r", encoding="utf-8") as f:
return json.load(f)
except:
print("📝 Contexto persistente vazio, iniciando novo")
return []
def salvar_contexto_persistente(contexto):
try:
with open(ARQUIVO_CONTEXTO, "w", encoding="utf-8") as f:
json.dump(contexto, f, ensure_ascii=False, indent=2)
print(f"💾 Contexto persistente salvo: {len(contexto)} mensagens")
except Exception as e:
print(f"❌ Erro salvar contexto: {e}")
def limpar_contexto_persistente():
try:
with open(ARQUIVO_CONTEXTO, "w", encoding="utf-8") as f:
json.dump([], f)
print("🗑️ Contexto persistente limpo")
return "✅ Contexto limpo com sucesso"
except Exception as e:
return f"❌ Erro: {str(e)}"
def limitar_timeline(timeline, max_chars=12000, max_msgs=12):
acumulado = 0
selecionadas = []
for msg in reversed(timeline):
texto = str(msg.get("content", ""))
acumulado += len(texto)
if acumulado > max_chars or len(selecionadas) >= max_msgs:
break
selecionadas.append(msg)
selecionadas.reverse()
tokens_est = estimar_tokens(acumulado)
print(f"✂️ Timeline limitada: {len(selecionadas)} msgs, {acumulado} chars (~{tokens_est} tokens)")
return selecionadas, acumulado, tokens_est
def ler_anexo(arquivo):
if arquivo is None:
return ""
try:
with open(arquivo.name, "r", encoding="utf-8") as f:
conteudo = f.read()
print(f"📎 Anexo lido: {os.path.basename(arquivo.name)} ({len(conteudo)} chars)")
return f"\n\n[ANEXO: {os.path.basename(arquivo.name)}]\n{conteudo}\n[FIM ANEXO]\n"
except Exception as e:
print(f"❌ Erro ler_anexo {arquivo.name}: {e}")
return ""
def verificar_stop(texto):
if not texto:
return False
stop_detectado = bool(re.search(r'\b' + re.escape(STOP_KEYWORD) + r'\b', str(texto), re.IGNORECASE))
print(f"🛑 STOP detectado? {stop_detectado} em '{str(texto)[:100]}...'")
return stop_detectado
# ==================== 3. ENGINE DE EXECUÇÃO (GROQ) ====================
def executar_no(timeline, config):
print(f"\n🔥 === EXECUTANDO {config['nome']} ===")
modelo = config.get('modelo', 'groq/compound')
print(f" Modelo Groq: {modelo}")
try:
inicio = time.time()
timeline_limited, _, tokens_est = limitar_timeline(timeline)
messages = []
system_msg = f"AGENTE: {config['nome']}\nMISSÃO: {config['missao']}"
messages.append({"role": "system", "content": system_msg})
tokens_system = estimar_tokens(system_msg)
for msg in timeline_limited:
role = msg.get('role')
if role in ['user', 'assistant']:
content = msg.get('content', '')
if isinstance(content, (dict, list)):
content = json.dumps(content, ensure_ascii=False)
messages.append({"role": role, "content": str(content)})
user_prompt_final = f"Com base em TODO o contexto acima e na missão do agente '{config['nome']}', execute a missão agora e produza APENAS a saída esperada para este agente."
messages.append({"role": "user", "content": user_prompt_final})
tokens_user_final = estimar_tokens(user_prompt_final)
tokens_total = tokens_est + tokens_system + tokens_user_final
print(f"📤 Groq messages: {len(messages)} (última=user, OK), Tokens: ~{tokens_total}")
completion = groq_client.chat.completions.create(
model=modelo, messages=messages, temperature=1,
max_tokens=8192, top_p=1, stream=True, stop=None
)
out_raw = "".join(chunk.choices[0].delta.content or "" for chunk in completion)
tempo_exec = time.time() - inicio
tokens_resposta = estimar_tokens(out_raw)
print(f"📥 OUTPUT GROQ ({len(out_raw)} chars, ~{tokens_resposta} tokens, {tempo_exec:.2f}s):")
print(out_raw[:500] + ("..." if len(out_raw) > 500 else ""))
content = out_raw
if config.get('tipo_saida', 'texto').lower() in ['json', 'jshon']:
try:
cleaned = out_raw.strip()
if cleaned.startswith("```"):
cleaned = re.sub(r'^```json\s*|\s*```$', '', cleaned)
content = json.loads(cleaned)
print("✅ JSON parseado com sucesso")
except Exception as parse_e:
print(f"⚠️ Erro parse JSON: {parse_e}")
content = out_raw
return {"role": "assistant", "agent": config['nome'], "content": content, "raw": out_raw, "tempo": tempo_exec, "tokens_input": tokens_total, "tokens_output": tokens_resposta}, True
except Exception as e:
msg = str(e)
print(f"💥 ERRO GROQ: {msg}")
if "context_length_exceeded" in msg:
erro_amistoso = "STOP: o contexto ficou grande demais. Tente limpar a memória."
else:
erro_amistoso = msg
return {"role": "system", "error": erro_amistoso, "agent": config['nome']}, False
# ==================== 4. ORQUESTRADOR COM CONTEXTO PERSISTENTE (ADUC-SDR) ====================
def orquestrador(texto, anexos_list, history, json_config, contexto_objetivo):
print("\n" + "="*80 + "\n🎬 INICIANDO ORQUESTRADOR (PIPELINE)")
if not texto.strip():
yield history, [], carregar_contexto_persistente()
return
history.append({"role": "user", "content": texto})
try:
protocolo = json.loads(json_config)
except Exception as e:
history.append({"role": "assistant", "content": f"❌ Erro no JSON de Configuração: {str(e)}"})
yield history, [], carregar_contexto_persistente()
return
history.append({"role": "assistant", "content": "Processando..."})
contexto_persistente = carregar_contexto_persistente()
contexto_persistente.append({"role": "user", "content": f"[USUÁRIO] {texto}", "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
contexto_inicial = ""
if contexto_objetivo and contexto_objetivo.strip():
contexto_inicial += f"[OBJETIVO DO MODELO]\n{contexto_objetivo.strip()}\n[FIM OBJETIVO]\n\n"
timeline = [{"role": msg["role"], "content": msg["content"]} for msg in contexto_persistente[:-1]]
timeline.append({"role": "user", "content": f"{contexto_inicial}{texto}".strip()})
audit_data = []
for idx, cfg in enumerate(protocolo):
print(f"\n{'='*50}\n🚀 FASE {idx+1}/{len(protocolo)}: {cfg['nome']}")
history[-1]["content"] = f"⏳ **Agente {idx+1}/{len(protocolo)}: {cfg['nome']}**..."
yield history, audit_data, contexto_persistente
time.sleep(DELAY_ENTRE_AGENTES)
res, sucesso = executar_no(timeline, cfg)
resposta_content = res.get('content', '')
if verificar_stop(str(resposta_content)):
print("🛑 STOP_PIPELINE detectado - extraindo resposta final.")
texto_final = "Análise concluída."
json_data = None
if isinstance(resposta_content, dict):
json_data = resposta_content
elif isinstance(resposta_content, str):
try:
json_match = re.search(r'\{.*\}', resposta_content, re.DOTALL)
if json_match: json_data = json.loads(json_match.group(0))
except json.JSONDecodeError:
texto_final = str(resposta_content).replace(STOP_KEYWORD, "").strip()
if json_data and 'proximo_passo' in json_data and isinstance(json_data['proximo_passo'], str):
texto_final = json_data['proximo_passo']
texto_final = texto_final.replace(STOP_KEYWORD, "").strip()
history[-1]["content"] = texto_final
yield history, audit_data, contexto_persistente
return
if sucesso and resposta_content:
content_to_persist = f"[{cfg['nome']}] {json.dumps(resposta_content, ensure_ascii=False) if not isinstance(resposta_content, str) else resposta_content}"
contexto_persistente.append({"role": "assistant", "agent": cfg['nome'], "content": content_to_persist, "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
timeline.append({"role": "assistant", "content": resposta_content})
salvar_contexto_persistente(contexto_persistente)
else:
erro_msg = res.get("error", "Erro desconhecido.")
history[-1]["content"] = f"❌ **Erro no agente {cfg['nome']}:**\n\n{erro_msg}"
yield history, audit_data, contexto_persistente
return
audit_entry = {k:v for k,v in res.items() if k not in ['raw', 'content']}
audit_entry.update({"step": idx + 1, "agent": cfg['nome'], "model": cfg.get('modelo', 'default'), "response_preview": str(resposta_content)[:100] + "...", "sucesso": sucesso})
audit_data.append(audit_entry)
if idx == len(protocolo) - 1:
texto_final = json.dumps(resposta_content, ensure_ascii=False, indent=2) if not isinstance(resposta_content, str) else str(resposta_content)
history[-1]["content"] = texto_final
yield history, audit_data, contexto_persistente
print("🏁 Pipeline concluída com sucesso\n" + "="*80)
# ==================== 5. UI (GRADIO) ====================
def iniciar_analise(usar_data_md, anexo_inicial, objetivo):
print("\n" + "="*80 + "\n🚀 ANÁLISE INICIAL ACIONADA (CHAMADA DIRETA)")
prompt_inicial_docs = ""
if usar_data_md:
if os.path.exists(ARQUIVO_DATA_MD):
class MockFile:
def __init__(self, name): self.name = name
prompt_inicial_docs += ler_anexo(MockFile(ARQUIVO_DATA_MD))
print(f"✅ Anexado '{ARQUIVO_DATA_MD}'")
else:
print(f"⚠️ Checkbox 'data.md' marcado, mas o arquivo não foi encontrado.")
if anexo_inicial is not None:
prompt_inicial_docs += ler_anexo(anexo_inicial)
if not prompt_inicial_docs:
prompt_inicial_docs = "Nenhum documento foi fornecido para análise."
config_inicializacao = {
"nome": "Agente_Inicializador",
"missao": f"Sua tarefa é analisar o objetivo do usuário e os documentos anexados, fornecer um resumo conciso e descrever os próximos passos sugeridos.\n\nOBJETIVO DO USUÁRIO:\n{objetivo}",
"modelo": "groq/compound"
}
timeline_inicial = [{"role": "user", "content": prompt_inicial_docs}]
res, sucesso = executar_no(timeline_inicial, config_inicializacao)
if sucesso:
resposta_content = res.get('content', 'Não foi possível gerar um resumo inicial.')
history = [
{"role": "user", "content": "Análise iniciada com base nos documentos fornecidos."},
{"role": "assistant", "content": resposta_content}
]
contexto_persistente = carregar_contexto_persistente()
contexto_persistente.append({"role": "user", "content": f"[USUÁRIO] {prompt_inicial_docs}", "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
contexto_persistente.append({"role": "assistant", "agent": config_inicializacao['nome'], "content": f"[{config_inicializacao['nome']}] {resposta_content}", "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
salvar_contexto_persistente(contexto_persistente)
audit_data = [{"step": 1, "agent": config_inicializacao['nome'], "model": config_inicializacao.get('modelo'), "response_preview": str(resposta_content)[:100] + "...", "sucesso": True, "tempo": res.get('tempo'), "tokens_input": res.get('tokens_input'), "tokens_output": res.get('tokens_output')}]
print("✅ Análise inicial (chamada direta) concluída. Exibindo chatbot.")
return history, audit_data, contexto_persistente, gr.update(visible=False), gr.update(visible=True)
else:
erro_msg = res.get("error", "Ocorreu um erro desconhecido durante a inicialização.")
history = [{"role": "user", "content": "Tentativa de iniciar análise."}, {"role": "assistant", "content": f"❌ **Falha na Inicialização:**\n\n{erro_msg}"}]
return history, [], [], gr.update(visible=True), gr.update(visible=False)
def ui_clean():
config_init = carregar_protocolo()
help_init = carregar_help()
with gr.Blocks(title="AI Forensics - Groq (v40)") as app:
with gr.Tabs():
with gr.Tab("💬 Investigação"):
with gr.Column(visible=True) as start_view:
gr.Markdown("## Iniciar Nova Análise\nConfigure os dados iniciais. Após iniciar, esta tela será substituída pelo chat interativo.")
cb_data_md = gr.Checkbox(label=f"Anexar arquivo de dados principal ({ARQUIVO_DATA_MD})", value=True)
upload_inicial = gr.File(label="Anexar arquivo de texto adicional (Opcional)", file_types=[".txt", ".md", ".json"])
btn_iniciar = gr.Button("🚀 Iniciar Análise", variant="primary")
with gr.Column(visible=False) as chat_view:
gr.Markdown("## Investigador AI (Orquestração ADUC-SDR)")
chatbot = gr.Chatbot(label="Histórico Conversacional", height=500, value=[])
with gr.Row():
txt_in = gr.Textbox(show_label=False, placeholder="Digite sua mensagem...", lines=2, scale=9)
btn_send = gr.Button("📤 Enviar", variant="primary", scale=1)
with gr.Tab("🎯 Objetivo & Configs"):
objetivo_text = gr.Textbox(
label="Objetivo do Modelo (System Prompt Global)",
value="Voce é um agente chamado IndenizaAI...",
placeholder="Ex: Você é um analista forense imparcial...",
lines=8
)
with gr.Tab("🧠 Contexto Persistente"):
contexto_display = gr.JSON(label="Contexto Persistente", value=carregar_contexto_persistente())
with gr.Row():
btn_reload_ctx = gr.Button("🔄 Recarregar", size="sm")
btn_limpar_ctx = gr.Button("🗑️ Limpar Contexto", size="sm", variant="stop")
status_ctx = gr.Markdown("")
btn_reload_ctx.click(lambda: carregar_contexto_persistente(), outputs=contexto_display)
def limpar_e_recarregar():
msg = limpar_contexto_persistente()
return carregar_contexto_persistente(), msg
btn_limpar_ctx.click(limpar_e_recarregar, outputs=[contexto_display, status_ctx])
with gr.Tab("⚙️ Protocolo"):
with gr.Row():
btn_save_proto = gr.Button("💾 Salvar", variant="primary", size="sm")
btn_reload_proto = gr.Button("🔄 Recarregar", size="sm")
proto_status = gr.Markdown("")
code_json = gr.Code(value=config_init, language="json", lines=30)
btn_save_proto.click(salvar_protocolo, code_json, proto_status)
btn_reload_proto.click(lambda: carregar_protocolo(), outputs=code_json)
with gr.Tab("🔍 Auditoria"):
audit_display = gr.JSON(label="Dados de Auditoria", value=[])
with gr.Tab("❓ Ajuda"):
help_content = gr.Markdown(help_init)
btn_reload_help = gr.Button("🔄 Recarregar Help")
btn_reload_help.click(lambda: carregar_help(), outputs=help_content)
btn_iniciar.click(
fn=iniciar_analise,
inputs=[cb_data_md, upload_inicial, objetivo_text],
outputs=[chatbot, audit_display, contexto_display, start_view, chat_view]
)
btn_send.click(
orquestrador,
[txt_in, gr.State([]), chatbot, code_json, objetivo_text],
[chatbot, audit_display, contexto_display]
).then(lambda: "", outputs=txt_in)
txt_in.submit(
orquestrador,
[txt_in, gr.State([]), chatbot, code_json, objetivo_text],
[chatbot, audit_display, contexto_display]
).then(lambda: "", outputs=txt_in)
return app
if __name__ == "__main__":
print("🎉 Lançando app Groq v40 (Síntese Final)...")
ui_clean().launch(css="footer{display:none!important;}")