# ToM / app.4py
# Uploaded by caarleexx; commit 85d4632 (verified): "Rename app.py to app.4py"
# ╔════════════════════════════════════════════════════════════════════════════╗
# ║ PIPELINE v43: FRAG + VISÃO PAGINADA + CONFEXT_UPLOAD + PARSE ROBUSTO ║
# β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
import os
import json
import time
from datetime import datetime
import gradio as gr
import google.generativeai as genai
import pypdf # pip install pypdf
# ==================== 1. CONFIGURAÇÃO ====================
# Gemini API key, read from the environment. The placeholder default is kept so
# `api_key` is always a string, but the placeholder itself must never be sent
# to the SDK: it is truthy, so a plain `if api_key:` check would always pass.
api_key = os.getenv("GOOGLE_API_KEY", "SUA_API_KEY_AQUI")
if api_key and api_key != "SUA_API_KEY_AQUI":
    genai.configure(api_key=api_key)
else:
    print("[AVISO] GOOGLE_API_KEY não definida; as chamadas ao modelo vão falhar.")

# Model handles selected per agent in executar_no(): "flash" is the default,
# "pro" is used when the agent config sets "modelo": "pro".
model_flash = genai.GenerativeModel("gemini-flash-latest")
model_pro = genai.GenerativeModel("gemini-pro-latest")

# Path of the JSON protocol file read by carregar_protocolo() and written by
# salvar_protocolo().
ARQUIVO_CONFIG = "protocolo_fragmentacao_visao-3.json"
# ==================== 2. UTILIDADES ====================
def log_point(msg, logs):
    """Return ``logs`` with one extra line appended: ``[HH:MM:SS] msg``.

    The log is an immutable string that is threaded through the pipeline, so
    the caller must rebind its own variable to the returned value.
    """
    carimbo = datetime.now().strftime("%H:%M:%S")
    nova_linha = "[{}] {}\n".format(carimbo, msg)
    return logs + nova_linha
def carregar_protocolo():
    """Return the agent protocol as JSON text.

    The content of ARQUIVO_CONFIG is returned as-is (it is not validated
    here). If the file cannot be read or decoded, a minimal built-in protocol
    with a single PAGINADOR_VISUAL agent is returned instead.

    Returns:
        str: JSON text describing a list of agent configurations.
    """
    try:
        with open(ARQUIVO_CONFIG, "r", encoding="utf-8") as f:
            return f.read()
    except (OSError, UnicodeDecodeError):
        # Only "file missing / unreadable / not UTF-8" should trigger the
        # fallback; anything else is a programming error and must surface.
        pass

    # Minimal valid fallback protocol.
    proto = [
        {
            "nome": "PAGINADOR_VISUAL",
            "missao": (
                "Você recebe o texto bruto de um conjunto de páginas de um PDF. "
                "Separe por página e devolva uma lista JSON com objetos "
                "{'pagina','transcricao_fiel','descricao_visual'}. "
                "Retorne APENAS essa lista JSON, sem texto extra."
            ),
            "tipo_saida": "json",
            "modelo": "flash",
        }
    ]
    return json.dumps(proto, ensure_ascii=False, indent=2)
def salvar_protocolo(conteudo):
    """Validate ``conteudo`` as JSON and write it to ARQUIVO_CONFIG.

    Args:
        conteudo: JSON text taken from the config editor.

    Returns:
        str: a short status message for the UI. "✅ Salvo" on success,
        "❌ Erro JSON" when the text is not valid JSON (nothing is written),
        or "❌ Erro ao salvar: ..." when the file cannot be written.
    """
    # Validate first so an invalid document never overwrites a good file.
    try:
        json.loads(conteudo)
    except (TypeError, ValueError):
        return "❌ Erro JSON"

    # Disk problems are reported separately instead of being mislabelled as
    # a JSON error.
    try:
        with open(ARQUIVO_CONFIG, "w", encoding="utf-8") as f:
            f.write(conteudo)
    except OSError as e:
        return f"❌ Erro ao salvar: {e}"
    return "✅ Salvo"
# --------- DIVISÃO BURRA COM TEXTO REAL + LOGS ---------
def ler_anexo_e_fragmentar(arquivo, paginas_por_fragmento=5, logs=""):
    """Split an uploaded file into text fragments of a few PDF pages each.

    Args:
        arquivo: the upload; either an object exposing the path as ``.name``
            or the path itself. ``None`` means "no attachment".
        paginas_por_fragmento: number of PDF pages per fragment. Values below
            1 are clamped to 1 so the chunking loop always advances.
        logs: accumulated log text; every step appends to it via log_point().

    Returns:
        tuple: ``(fragments, anexo_info, logs)``.
        ``fragments`` is a list of strings: one per page group for a PDF, a
        single ``[ARQUIVO_TEXTO: name]`` marker for any other file (its
        content is not read here), a single ``[ERRO PDF: ...]`` marker when
        the PDF cannot be processed, or an empty list when there is no
        readable file. ``anexo_info`` is a short label describing the
        attachment, and ``logs`` is the updated log text.
    """
    logs = log_point("ler_anexo_e_fragmentar() chamado", logs)
    if arquivo is None:
        logs = log_point("Nenhum arquivo recebido", logs)
        return [], "", logs

    filename = getattr(arquivo, "name", arquivo)
    logs = log_point(f"Arquivo recebido: {filename}", logs)
    if not os.path.exists(filename):
        msg = f"Arquivo não encontrado: {filename}"
        logs = log_point(msg, logs)
        return [], f"[ERRO: {msg}]", logs

    base = os.path.basename(filename)
    if not filename.lower().endswith(".pdf"):
        logs = log_point("Arquivo não é PDF; tratado como texto simples", logs)
        # The label used to say "[PDF: ...]" even for non-PDF files.
        marcador = f"[ARQUIVO_TEXTO: {base}]"
        return [marcador], marcador, logs

    anexo_info = f"[PDF: {base}]"
    try:
        # range() rejects a step of 0 and yields nothing for a negative one.
        passo = max(1, int(paginas_por_fragmento))
        reader = pypdf.PdfReader(filename)
        total_pages = len(reader.pages)
        logs = log_point(f"PDF com {total_pages} páginas", logs)

        fragments = []
        for num_frag, i in enumerate(range(0, total_pages, passo), start=1):
            start = i + 1
            end = min(i + passo, total_pages)
            partes = []
            for p in range(i, end):
                # One bad page must not lose the whole fragment: the error is
                # embedded in the text in place of that page's content.
                try:
                    t = reader.pages[p].extract_text() or ""
                except Exception as e:
                    t = f"\n[ERRO_EXTRACT_PAG_{p+1}: {e}]\n"
                partes.append(f"\n=== PAGINA {p+1}/{total_pages} ===\n{t}\n")
            bloco_texto = "".join(partes)
            fragments.append(
                f"=== FRAG {num_frag} "
                f"(PÁGS {start}-{end}/{total_pages}) ===\n"
                f"{bloco_texto.strip()}"
            )
            logs = log_point(
                f"Fragmento {num_frag} criado (pags {start}-{end})",
                logs,
            )
        logs = log_point(f"Total de fragmentos: {len(fragments)}", logs)
        return fragments, anexo_info, logs
    except Exception as e:
        # Boundary handler: any failure while opening or parsing the PDF is
        # reported as a single error fragment rather than raised.
        logs = log_point(f"ERRO PDF: {e}", logs)
        return [f"[ERRO PDF: {str(e)}]"], anexo_info, logs
# ==================== 3. ENGINE DE EXECUÇÃO ====================
def _extrair_json_possivel(out_raw: str) -> str:
"""
Tenta isolar sΓ³ o bloco JSON de uma resposta que pode ter texto extra.
Procura o primeiro 'json.
"""
cleaned = out_raw.strip()
idx_abre_col = cleaned.find("
# menor Γ­ndice vΓ‘lido
candidatos = [i for i in [idx_abre_col, idx_abre_obj] if i != -1]
if candidatos:
start = min(candidatos)
cleaned = cleaned[start:]
cleaned = cleaned.replace("```json", "").replace("```")
return cleaned
def executar_no(timeline, config, fragmento_input=None, logs=""):
    """Run one protocol agent against the model and build its timeline record.

    Args:
        timeline: list of records produced so far; serialised to JSON and
            used as the agent input when ``fragmento_input`` is None.
        config: agent configuration dict. Reads "nome", "missao", "modelo"
            ("pro" selects model_pro, anything else model_flash) and
            "tipo_saida" ("json" parses the reply, anything else keeps text).
        fragmento_input: optional text used as the agent input instead of
            the timeline.
        logs: accumulated log text.

    Returns:
        tuple: ``(record, logs, raw)``. On success ``record`` is
        ``{"role": "assistant", "agent": nome, "content": ...}`` where
        content is the parsed JSON (``[]`` when parsing fails) or the raw
        text. On failure ``record`` is
        ``{"role": "system", "agent": nome, "error": message}`` and ``raw``
        is the error message.
    """
    nome = config.get("nome")
    logs = log_point(f"executar_no({nome}) chamado", logs)
    modo = "input_fragmento" if fragmento_input is not None else "timeline"
    logs = log_point(f"Modo de entrada: {modo}", logs)
    modelo = model_pro if config.get("modelo") == "pro" else model_flash

    try:
        # Prompt construction is inside the try so that a missing "missao" or
        # a non-serialisable timeline yields an error record, not a crash.
        if fragmento_input is not None:
            input_para_prompt = fragmento_input
        else:
            input_para_prompt = json.dumps(timeline, ensure_ascii=False, indent=2)
        prompt = (
            "--- INPUT PARA O AGENTE ---\n"
            f"{input_para_prompt}\n"
            "----------------\n"
            f"AGENTE: {nome}\n"
            f"MISSÃO: {config['missao']}"
        )

        inicio = time.time()
        logs = log_point("Chamando modelo.generate_content()", logs)
        resp = modelo.generate_content(prompt)
        out = resp.text or ""
        tempo = time.time() - inicio
        logs = log_point(f"Tempo de geração: {tempo:.2f}s", logs)
        logs = log_point(f"Saída bruta (120 chars): {out[:120]!r}", logs)

        # .get(): a config without "tipo_saida" is treated as plain text
        # rather than discarding a reply that was already generated.
        if config.get("tipo_saida") == "json":
            cleaned = _extrair_json_possivel(out)
            logs = log_point(f"Trecho candidato a JSON (120): {cleaned[:120]!r}", logs)
            try:
                content = json.loads(cleaned)
            except Exception as e:
                # Best effort: an unparsable reply becomes an empty result.
                content = []
                logs = log_point(f"ERRO JSON parse: {e}", logs)
        else:
            content = out

        logs = log_point("executar_no() concluído com sucesso", logs)
        return {"role": "assistant", "agent": nome, "content": content}, logs, out
    except Exception as e:
        # Boundary handler: model/SDK failures are reported to the caller as
        # an error record (tagged with the agent name) instead of raised.
        logs = log_point(f"ERRO em executar_no: {e}", logs)
        return {"role": "system", "agent": nome, "error": str(e)}, logs, str(e)
# ==================== 4. ORQUESTRADOR ====================
def orquestrador(texto, arquivo, history, json_config, confext_state):
    """Drive the whole pipeline as a generator that streams UI updates.

    Steps: fragment the attachment, run the first protocol agent on every
    fragment to build ``confext_upload["paginas"]``, inject that context into
    the timeline, then run the remaining agents on the timeline.

    Args:
        texto: user message (may be empty or None).
        arquivo: uploaded file (object with ``.name`` or a path) or None.
        history: chat history as a list of ``[user, bot]`` pairs, or None.
        json_config: protocol as JSON text; must be a list of agent objects,
            each with at least "nome" and "missao".
        confext_state: context built by a previous call (dict with a
            "paginas" list) or None.

    Yields:
        tuple: ``(history, timeline, logs, confext)``. On the two early
        exits (nothing to do, invalid protocol) the timeline slot is ``{}``
        and ``confext`` is the unchanged ``confext_state``.
    """
    # None can arrive from an empty textbox / chatbot; normalise once.
    texto = texto or ""
    history = list(history or [])

    logs = f"🚀 START: {datetime.now().strftime('%H:%M:%S')}\n"
    logs = log_point("orquestrador() iniciado", logs)
    logs = log_point(f"Texto len={len(texto)}", logs)

    fragmentos, anexo_info, logs = ler_anexo_e_fragmentar(
        arquivo, paginas_por_fragmento=5, logs=logs
    )
    logs = log_point(f"Qtd fragmentos após leitura: {len(fragmentos)}", logs)

    if not texto and not fragmentos:
        logs = log_point("Sem texto e sem fragmentos; encerrando", logs)
        yield history, {}, logs, confext_state
        return

    # New [user, bot] pair; the bot slot (index 1) is updated as we progress.
    history = history + [[texto + (" 📎" if arquivo else ""), None]]

    try:
        protocolo = json.loads(json_config)
        formato_ok = isinstance(protocolo, list) and all(
            isinstance(c, dict) and "nome" in c and "missao" in c
            for c in protocolo
        )
        if not formato_ok:
            raise ValueError(
                "o protocolo deve ser uma lista de objetos com 'nome' e 'missao'"
            )
        logs = log_point("Protocolo JSON carregado", logs)
    except (TypeError, ValueError) as e:
        history[-1][1] = "❌ Erro no JSON de Configuração."
        logs = log_point(f"ERRO carregando protocolo: {e}", logs)
        yield history, {}, logs, confext_state
        return

    timeline = [{"role": "user", "content": texto}]

    # Reuse the pages extracted by a previous call when this message has no
    # new attachment, so follow-up questions still see the document.
    # NOTE(review): inferred from the unused `confext_state` parameter and
    # the "Pronto para perguntas usando confext_upload" message; confirm.
    reaproveitar = (
        not arquivo
        and isinstance(confext_state, dict)
        and bool(confext_state.get("paginas"))
    )
    if reaproveitar:
        confext_upload = confext_state
        logs = log_point(
            f"confext_upload reaproveitado; paginas={len(confext_upload['paginas'])}",
            logs,
        )
    else:
        confext_upload = {
            # Same name resolution as ler_anexo_e_fragmentar(): accept both
            # an object with .name and a plain path.
            "arquivo": os.path.basename(getattr(arquivo, "name", arquivo))
            if arquivo else None,
            "meta": anexo_info,
            "paginas": [],
        }
        logs = log_point(
            f"confext_upload inicializado para arquivo={confext_upload['arquivo']}",
            logs,
        )

    if fragmentos:
        history[-1][1] = "⏳ Fragmentando + visão paginada..."
        logs = log_point("Fragmentos disponíveis; iniciando visão paginada", logs)
        yield history, timeline, logs, confext_upload

    # PAGINADOR_VISUAL step: the first agent runs once per fragment.
    if protocolo and fragmentos:
        cfg_visao = protocolo[0]
        logs = log_point(f"Agente de visão selecionado: {cfg_visao['nome']}", logs)
        total_frags = len(fragmentos)
        for num, fragmento in enumerate(fragmentos, start=1):
            history[-1][1] = f"👁️ {cfg_visao['nome']} frag {num}/{total_frags}..."
            logs = log_point(f"Enviando frag {num}", logs)
            yield history, timeline, logs, confext_upload

            res, logs, _raw = executar_no(
                timeline, cfg_visao, fragmento_input=fragmento, logs=logs
            )
            if "error" in res:
                logs = log_point(f"Erro no frag {num}: {res['error']}", logs)
                continue

            paginas_res = res.get("content")
            if isinstance(paginas_res, dict):
                paginas_res = [paginas_res]
            if not isinstance(paginas_res, list):
                # A plain-text reply would otherwise be appended one
                # character at a time.
                logs = log_point(
                    f"Falha ao anexar páginas do frag {num}: "
                    f"conteúdo inesperado ({type(paginas_res).__name__})",
                    logs,
                )
                continue

            antes = len(confext_upload["paginas"])
            confext_upload["paginas"].extend(paginas_res)
            depois = len(confext_upload["paginas"])
            logs = log_point(
                f"Frag {num} adicionou {depois-antes} páginas; total={depois}",
                logs,
            )
        logs = log_point(
            f"Visão paginada concluída; paginas={len(confext_upload['paginas'])}",
            logs,
        )

    timeline.append({
        "role": "system",
        "agent": "CONFEXT_UPLOAD",
        "content": confext_upload,
    })
    logs = log_point("CONFEXT_UPLOAD injetado na timeline", logs)

    # Remaining agents run once each, on the whole timeline.
    restante = protocolo[1:] if protocolo else []
    for cfg in restante:
        history[-1][1] = f"⚙️ {cfg['nome']}..."
        logs = log_point(f"Iniciando passo adicional: {cfg['nome']}", logs)
        yield history, timeline, logs, confext_upload

        res, logs, _raw = executar_no(timeline, cfg, fragmento_input=None, logs=logs)
        timeline.append(res)
        if "error" in res:
            # Error records carry no "content"; show the failure instead of
            # raising KeyError below.
            history[-1][1] = f"❌ Erro no passo {cfg['nome']}: {res['error']}"
            logs = log_point(f"Erro no passo {cfg['nome']}: {res['error']}", logs)
            continue
        if cfg.get("tipo_saida") == "texto":
            history[-1][1] = res["content"]
            logs = log_point(f"Passo {cfg['nome']} produziu texto final", logs)
            yield history, timeline, logs, confext_upload

    if not restante and not texto:
        history[-1][1] = "✅ PDF processado. Pronto para perguntas usando confext_upload."
        logs = log_point("Nenhum passo adicional; apenas pré-processamento", logs)

    logs = log_point("FIM orquestrador()", logs)
    yield history, timeline, logs, confext_upload
# ==================== 5. UI ====================
def ui_clean():
    """Build and return the Gradio Blocks app (three tabs: chat, debug, config).

    The app is only assembled here; the caller is responsible for launching
    it.

    NOTE(review): the original indentation of this function was lost, so the
    widget nesting below (which components sit inside which Row/Tab) is a
    reconstruction. Confirm the layout against the running app.

    Returns:
        gr.Blocks: the assembled application.
    """
    css = """
    footer {display: none !important;}
    .contain {border: none !important;}
    """
    config_init = carregar_protocolo()

    with gr.Blocks(title="AI Forensics – Visão Paginada", css=css, theme=gr.themes.Soft()) as app:
        # Context extracted from the last upload, carried between events.
        confext_state = gr.State(value=None)

        with gr.Tabs():
            with gr.Tab("💬 Investigador"):
                chatbot = gr.Chatbot(
                    label="",
                    show_label=False,
                    height=600,
                    show_copy_button=True,
                    render_markdown=True,
                )
                with gr.Row():
                    with gr.Column(scale=10):
                        txt_in = gr.Textbox(
                            show_label=False,
                            placeholder="Descreva o caso ou faça perguntas (opcional após upload)...",
                            lines=1,
                            max_lines=5,
                            container=False,
                        )
                    with gr.Column(scale=1, min_width=50):
                        file_in = gr.UploadButton(
                            "📎",
                            file_types=[".txt", ".md", ".csv", ".json", ".pdf"],
                            size="sm",
                        )
                    with gr.Column(scale=1, min_width=80):
                        btn_send = gr.Button("Enviar", variant="primary", size="sm")
                file_status = gr.Markdown("", visible=True)

                def _on_upload(x):
                    # Show which file was attached (base name only).
                    nome = os.path.basename(getattr(x, "name", x))
                    print("[DEBUG] upload arquivo:", nome)
                    return f"📎 Anexo recebido: {nome}"

                file_in.upload(
                    _on_upload,
                    inputs=file_in,
                    outputs=file_status,
                )

            with gr.Tab("🕵️ Depuração"):
                with gr.Row():
                    out_dna = gr.JSON(label="DNA (Timeline)")
                    out_logs = gr.Textbox(label="Logs do Sistema", lines=20)
                confext_view = gr.JSON(label="confext_upload")

            with gr.Tab("⚙️ Config"):
                with gr.Row():
                    btn_save = gr.Button("Salvar Config")
                    lbl_save = gr.Label(show_label=False)
                code_json = gr.Code(value=config_init, language="json", label=ARQUIVO_CONFIG)
                btn_save.click(salvar_protocolo, code_json, lbl_save)

        def _orq_wrapper(texto, arquivo, history, json_cfg, confext_old):
            # Thin generator wrapper that adds a debug print before
            # streaming the orchestrator's updates.
            print(
                "[DEBUG] _orq_wrapper disparado",
                "len_texto=", len(texto or ""),
                "arquivo=", getattr(arquivo, "name", None),
            )
            yield from orquestrador(texto, arquivo, history, json_cfg, confext_old)

        def _pos_envio(c):
            # After a send: clear the textbox, the pending upload and the
            # status line, and pass the context state through unchanged.
            print(
                "[DEBUG] pós-envio; paginas_confext=",
                0 if not c else len(c.get("paginas", [])),
            )
            return None, None, "", c

        # The same chain is wired to both the button and Enter in the textbox.
        for trig in (btn_send.click, txt_in.submit):
            trig(
                _orq_wrapper,
                inputs=[txt_in, file_in, chatbot, code_json, confext_state],
                outputs=[chatbot, out_dna, out_logs, confext_state],
            ).then(
                _pos_envio,
                inputs=confext_state,
                outputs=[txt_in, file_in, file_status, confext_state],
            ).then(
                # Mirror the state into the debug tab.
                lambda c: c,
                inputs=confext_state,
                outputs=confext_view,
            )

    return app
if __name__ == "__main__":
    # Script entry point: build the Gradio app, then start serving it.
    interface = ui_clean()
    interface.launch()