ToM / app.py
caarleexx's picture
Update app.py
3346c34 verified
raw
history blame
25.6 kB
from fastapi import FastAPI, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse
from typing import List
import uvicorn
import os
import json
# Dependências para PDF
try:
import PyPDF2
PDF_SUPPORT = True
except ImportError:
PDF_SUPPORT = False
print("⚠️ PyPDF2 não instalado. Instale com: pip install PyPDF2")
# ==================== 1. CONFIGURAÇÃO ====================
api_key = os.getenv("GOOGLE_API_KEY", "SUA_API_KEY_AQUI")
if api_key and api_key != "SUA_API_KEY_AQUI":
genai.configure(api_key=api_key)
# Modelos do Gemini
model_flash = genai.GenerativeModel("gemini-flash-latest")
model_pro = genai.GenerativeModel("gemini-pro-latest")
ARQUIVO_CONFIG = "protocolo.json"
PASTA_TRANSCRICOES = "transcricoes"
PAGES_PER_CHUNK = 10
MAX_WORKERS = 5 # Limite de chamadas paralelas
os.makedirs(PASTA_TRANSCRICOES, exist_ok=True)
# ==================== 2. UTILIDADES ====================
def carregar_protocolo():
""" Carrega o protocolo. Se não existir, cria um com exemplo de STOP. """
try:
with open(ARQUIVO_CONFIG, "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
# Protocolo padrão que inclui um agente com a lógica de pergunta ao usuário
protocolo_padrao = [
{"nome": "Leitor Inicial", "modelo": "flash", "missao": "Leia o contexto e resuma os fatos principais em 3 a 5 pontos."},
{
"nome": "Advogado de Acusação",
"modelo": "pro",
"missao": "Com base nos fatos, formule uma pergunta crucial para o usuário para fortalecer um caso. Sua resposta DEVE ser APENAS um JSON no formato: {\"tipo\": \"pergunta_usuario\", \"pergunta\": \"Sua pergunta aqui\"}"
},
{"nome": "Analista Final", "modelo": "pro", "missao": "Considere a resposta do usuário e os fatos iniciais para dar um parecer final sobre o caso."}
]
return json.dumps(protocolo_padrao, indent=2)
def salvar_protocolo(conteudo):
try:
json.loads(conteudo)
with open(ARQUIVO_CONFIG, "w", encoding="utf-8") as f:
f.write(conteudo)
return "✅ Protocolo salvo com sucesso!"
except Exception as e:
return f"❌ Erro ao salvar: {str(e)}"
def limpar_nome_arquivo(nome):
nome_base = os.path.basename(nome)
nome_limpo = "".join([c for c in nome_base if c.isalnum() or c in (' ', '.', '_', '-')]).strip()
return nome_limpo + ".json"
def extrair_texto_pdf(caminho_pdf):
# (Implementação existente, sem alterações)
try:
with open(caminho_pdf, 'rb') as f:
reader = PyPDF2.PdfReader(f)
paginas = []
for i, page in enumerate(reader.pages):
texto = page.extract_text()
paginas.append({
"numero": i + 1,
"texto": texto,
"metadata": str(page)[:200]
})
return paginas, None
except Exception as e:
return None, str(e)
def fragmentar_pdf(paginas, tamanho_chunk=PAGES_PER_CHUNK):
# (Implementação existente, sem alterações)
chunks = []
for i in range(0, len(paginas), tamanho_chunk):
chunk = paginas[i:i + tamanho_chunk]
num_inicio = chunk[0]["numero"]
num_fim = chunk[-1]["numero"]
texto_consolidado = "\n---QUEBRA DE PÁGINA---\n".join(
[f"[PÁGINA {p['numero']}]\n{p['texto']}" for p in chunk]
)
chunks.append({
"id": f"chunk_{num_inicio}_{num_fim}",
"paginas": f"{num_inicio}-{num_fim}",
"num_paginas": len(chunk),
"texto": texto_consolidado,
"metadata": [p["metadata"] for p in chunk]
})
return chunks
def processar_pdf_completo(arquivo_pdf):
# (Implementação existente, sem alterações)
if not PDF_SUPPORT:
return None, "❌ PyPDF2 não disponível"
try:
paginas, erro = extrair_texto_pdf(arquivo_pdf.name if hasattr(arquivo_pdf, 'name') else arquivo_pdf)
if erro:
return None, f"❌ Erro ao ler PDF: {erro}"
chunks = fragmentar_pdf(paginas)
nome_arquivo = os.path.basename(arquivo_pdf.name if hasattr(arquivo_pdf, 'name') else arquivo_pdf)
return {
"arquivo": nome_arquivo,
"total_paginas": len(paginas),
"total_chunks": len(chunks),
"chunks": chunks,
"tipo": "pdf"
}, None
except Exception as e:
return None, f"❌ Erro no processamento: {str(e)}"
def ler_arquivo_texto(arquivo):
# (Implementação existente, sem alterações)
if arquivo is None: return None
try:
with open(arquivo.name, "r", encoding="utf-8") as f:
conteudo = f.read()
return {
"arquivo": os.path.basename(arquivo.name),
"conteudo": conteudo,
"tipo": "texto"
}
except: return None
# ==================== 3. PIPELINE DE IA ====================
def transcrever_chunk(chunk_data, config_agentes):
# (Implementação existente, sem alterações)
modelo = model_flash
try:
if config_agentes and isinstance(config_agentes, list):
if config_agentes[0].get("modelo") == "pro":
modelo = model_pro
except:
pass
prompt = f"""
ANÁLISE DE DOCUMENTO (OCR/LEITURA):
Transcreva e estruture o conteúdo das páginas {chunk_data['paginas']}.
Texto extraído:
{chunk_data['texto']}
Retorne JSON: {{ "transcricao": "...", "objetos": ["..."], "resumo": "..." }}
"""
try:
for tentativa in range(3):
try:
resposta = modelo.generate_content(prompt)
texto_resp = resposta.text.replace("```json", "").replace("```", "")
return json.loads(texto_resp.strip()), None
except Exception as inner_e:
if "429" in str(inner_e):
time.sleep(2 * (tentativa + 1))
continue
raise inner_e
except Exception as e:
return None, str(e)
# ==================== 4. GERENCIADOR DE ARQUIVOS ====================
class GerenciadorArquivos:
# (Implementação existente, sem alterações)
def __init__(self):
self.arquivos = {}
def adicionar(self, arquivo, arquivo_id):
self.arquivos[arquivo_id] = {
"arquivo": arquivo,
"nome": os.path.basename(arquivo.name),
"status": "adicionado",
"processado": None,
"transcricao": None
}
def gerar_prompt_com_transcricoes(self, texto_usuario):
prompt = texto_usuario + "\n\n--- CONTEXTO DOS ARQUIVOS ---\n"
count = 0
for _, item in self.arquivos.items():
if item["status"] == "processado" and item["transcricao"]:
count += 1
trans = item["transcricao"]
nome = item["nome"]
prompt += f"\n[ARQUIVO: {nome}]\n"
if isinstance(trans, dict) and "chunks_processados" in trans:
for chunk in trans["chunks_processados"]:
if chunk.get("status") == "OK":
resumo = chunk.get('resumo', '')
resumo = str(resumo) if resumo else ""
prompt += f"Páginas {chunk['paginas']}: {resumo}\n"
texto_full = chunk.get('transcricao', '')
if texto_full:
texto_seguro = str(texto_full)
prompt += f"Trecho: {texto_seguro[:400]}...\n"
else:
prompt += "Trecho: (vazio)\n"
elif isinstance(trans, dict) and "conteudo" in trans:
conteudo = str(trans['conteudo'])
prompt += f"Conteúdo: {conteudo[:1000]}...\n"
if count == 0:
prompt += "(Nenhum arquivo processado ainda)"
return prompt
# Instância Global
gerenciador = GerenciadorArquivos()
# ==================== 5. FUNÇÕES DE ORQUESTRAÇÃO ====================
def automacao_upload_processamento(files, history, config_json):
# (Implementação existente, sem alterações)
if not files:
return history
try:
config_agentes = json.loads(config_json)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Configuração JSON inválida.")
if history is None:
history = []
history.append([None, f"📂 **SISTEMA:** Recebi {len(files)} arquivo(s). Verificando cache e processando..."])
yield history
ids_para_processar = []
for f in files:
arquivo_id = f"arq_{int(time.time()*1000)}_{f.name}"
gerenciador.adicionar(f, arquivo_id)
ids_para_processar.append(arquivo_id)
for arq_id in ids_para_processar:
item = gerenciador.arquivos[arq_id]
nome = item["nome"]
nome_cache = limpar_nome_arquivo(nome)
caminho_cache = os.path.join(PASTA_TRANSCRICOES, nome_cache)
if os.path.exists(caminho_cache):
try:
with open(caminho_cache, "r", encoding="utf-8") as cache_file:
dados_cache = json.load(cache_file)
item["transcricao"] = dados_cache
item["status"] = "processado"
if nome.lower().endswith('.pdf') and "chunks_processados" in dados_cache:
item["processado"] = {"tipo": "pdf", "chunks": []}
history.append([None, f"♻️ **Cache Encontrado:** `{nome}` já foi processado. Carregando..."])
yield history
continue
except Exception as e:
history.append([None, f"⚠️ Erro cache `{nome}`: {e}. Reprocessando..."])
history.append([None, f"⚙️ **Processando:** `{nome}`..."])
yield history
if nome.lower().endswith('.pdf'):
if not PDF_SUPPORT:
history.append([None, f"❌ Erro em `{nome}`: Biblioteca PDF ausente."])
yield history
continue
pdf_proc, erro = processar_pdf_completo(item["arquivo"])
if erro:
history.append([None, f"❌ Erro em `{nome}`: {erro}"])
yield history
continue
item["processado"] = pdf_proc
chunks = pdf_proc["chunks"]
total_chunks = len(chunks)
chunks_ordenados = [None] * total_chunks
history.append([None, f"📄 `{nome}` fragmentado em {total_chunks} partes. Iniciando IA (Paralelo: {MAX_WORKERS} threads)..."])
yield history
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures_map = {}
for i, chunk in enumerate(chunks):
future = executor.submit(transcrever_chunk, chunk, config_agentes)
futures_map[future] = i
concluidos = 0
for future in as_completed(futures_map):
index_original = futures_map[future]
res, err = future.result()
if err:
chunks_ordenados[index_original] = {"status": "ERRO", "paginas": chunks[index_original]["paginas"]}
else:
chunks_ordenados[index_original] = {
"status": "OK",
"paginas": chunks[index_original]["paginas"],
"transcricao": res.get("transcricao"),
"resumo": res.get("resumo")
}
concluidos += 1
if concluidos % 2 == 0 or concluidos == total_chunks:
msg_base = f"📄 `{nome}`: Processando partes... ({concluidos}/{total_chunks})"
history[-1][1] = msg_base
yield history
dados_finais = {
"arquivo": nome,
"data_processamento": str(datetime.now()),
"chunks_processados": chunks_ordenados
}
item["transcricao"] = dados_finais
item["status"] = "processado"
try:
with open(caminho_cache, "w", encoding="utf-8") as f_out:
json.dump(dados_finais, f_out, indent=2, ensure_ascii=False)
history.append([None, f"💾 `{nome}` processado e salvo no cache."])
except Exception as e:
history.append([None, f"⚠️ Erro ao salvar cache: {e}"])
yield history
else:
res = ler_arquivo_texto(item["arquivo"])
if res:
item["processado"] = res
dados_finais = {"conteudo": res["conteudo"], "data_processamento": str(datetime.now())}
item["transcricao"] = dados_finais
item["status"] = "processado"
with open(caminho_cache, "w", encoding="utf-8") as f_out:
json.dump(dados_finais, f_out, indent=2, ensure_ascii=False)
history.append([None, f"✅ `{nome}` (Texto) lido e salvo."])
else:
history.append([None, f"❌ Falha ao ler `{nome}`."])
yield history
history.append([None, "🏁 **Processamento de lote finalizado.** Os arquivos estão prontos para análise."])
yield history
def chat_orquestrador(message, history, config_json, pipeline_state):
"""
Orquestra a conversa. Pode iniciar uma nova pipeline ou continuar uma que foi pausada.
"""
# --- LÓGICA DE CONTINUAÇÃO ---
if pipeline_state.get("is_paused"):
history.append([message, None])
# Recupera o estado
timeline_execucao = pipeline_state["timeline"]
agentes_restantes = pipeline_state["remaining_agents"]
# Adiciona a resposta do usuário à trilha de auditoria
timeline_execucao.append({
"passo": len(timeline_execucao) + 1,
"tipo": "resposta_usuario",
"conteudo": message
})
# Reseta o estado para evitar loops
pipeline_state["is_paused"] = False
# Continua a execução do ponto onde parou
yield from executar_pipeline(history, timeline_execucao, agentes_restantes, pipeline_state)
return
# --- LÓGICA DE INÍCIO DE UMA NOVA CONVERSA ---
try:
prompt_contexto = gerenciador.gerar_prompt_com_transcricoes(message)
except Exception as e:
history.append([message, f"❌ Erro ao gerar contexto: {str(e)}"])
yield history, [], pipeline_state
return
try:
protocolo = json.loads(config_json)
except:
history.append([message, "❌ Erro no JSON de Configuração do Protocolo."])
yield history, [], pipeline_state
return
history.append([message, None])
# Inicia uma nova trilha de auditoria
timeline_execucao = [{"passo": 1, "tipo": "prompt_usuario", "conteudo": prompt_contexto}]
yield history, timeline_execucao, pipeline_state
# Inicia a execução com todos os agentes do protocolo
yield from executar_pipeline(history, timeline_execucao, protocolo, pipeline_state)
def executar_pipeline(history, timeline_execucao, agentes_a_executar, pipeline_state):
"""
Função core que executa a lista de agentes em sequência.
Pode ser pausada se um agente pedir input do usuário.
"""
passo_atual = len(timeline_execucao) + 1
for i, cfg in enumerate(agentes_a_executar):
nome_agente = cfg.get("nome", "Agente")
modelo_agente = model_pro if cfg.get("modelo") == "pro" else model_flash
msg_atual = history[-1][1] or ""
history[-1][1] = msg_atual + f"⏳ **{nome_agente}** está analisando...\n"
yield history, timeline_execucao, pipeline_state
prompt_agente = f"""
--- HISTÓRICO DA CONVERSA ATÉ AGORA ---
{json.dumps(timeline_execucao, ensure_ascii=False, indent=2)}
-----------------
Sua Identidade: {nome_agente}
Sua Missão Específica Agora: {cfg['missao']}
Responda de forma concisa e direta, focando apenas na sua missão.
"""
try:
inicio = time.time()
resp = modelo_agente.generate_content(prompt_agente)
texto_resp = resp.text
duracao = time.time() - inicio
# --- LÓGICA DE PAUSA (STOP) ---
try:
# Tenta interpretar a resposta como JSON para verificar se é uma pergunta
resposta_json = json.loads(texto_resp)
if resposta_json.get("tipo") == "pergunta_usuario":
pergunta = resposta_json.get("pergunta", "Não foi possível extrair a pergunta.")
# Salva o estado atual da pipeline
pipeline_state["is_paused"] = True
pipeline_state["timeline"] = timeline_execucao
# Salva os agentes que AINDA NÃO rodaram
pipeline_state["remaining_agents"] = agentes_a_executar[i+1:]
# Adiciona a pergunta à auditoria e ao chat
timeline_execucao.append({"passo": passo_atual, "tipo": "pergunta_agente", "agente": nome_agente, "pergunta": pergunta})
msg_atual = history[-1][1].replace(f"⏳ **{nome_agente}** está analisando...\n", "")
history[-1][1] = msg_atual + f"**{nome_agente}** precisa de mais informações:\n\n> *{pergunta}*\n\nAguardando sua resposta na caixa de texto abaixo..."
# Encerra a execução atual e aguarda o usuário
yield history, timeline_execucao, pipeline_state
return # Sai da função
except (json.JSONDecodeError, TypeError):
# Se não for um JSON de pergunta, é uma resposta normal
pass
# ---------------------------
timeline_execucao.append({"passo": passo_atual, "tipo": "resposta_agente", "agente": nome_agente, "resposta": texto_resp})
msg_atual = history[-1][1].replace(f"⏳ **{nome_agente}** está analisando...\n", "")
# Não exibe o conteúdo da resposta do modelo no chat, apenas a confirmação
novo_trecho = f"✅ **[{nome_agente}]** concluiu sua análise em ({duracao:.1f}s).\n"
history[-1][1] = msg_atual + novo_trecho
yield history, timeline_execucao, pipeline_state
except Exception as e:
timeline_execucao.append({"passo": passo_atual, "tipo": "erro_agente", "agente": nome_agente, "erro": str(e)})
msg_atual = history[-1][1]
history[-1][1] = msg_atual.replace(f"⏳ **{nome_agente}** está analisando...\n", "") + f"\n❌ Erro em {nome_agente}: {str(e)}\n"
yield history, timeline_execucao, pipeline_state
passo_atual += 1
# ==================== 6. UI (Gradio) ====================
def ui_v29_stop_logic():
css = """
footer {display: none !important;}
.contain {border: none !important;}
"""
config_inicial = carregar_protocolo()
with gr.Blocks(title="AI Forensics Auto", css=css, theme=gr.themes.Soft()) as app:
# Estado da configuração dos agentes
state_config = gr.State(config_inicial)
# NOVO: Estado para controlar a pausa/continuação da pipeline
pipeline_state = gr.State({"is_paused": False, "timeline": [], "remaining_agents": []})
with gr.Tabs():
with gr.Tab("💬 Investigação"):
chatbot = gr.Chatbot(
height=400,
show_label=False,
show_copy_button=True,
render_markdown=True,
label="Chat de Investigação"
)
with gr.Row():
txt_input = gr.Textbox(
scale=8,
show_label=False,
placeholder="Digite sua instrução ou responda à pergunta do agente...",
lines=1
)
btn_enviar = gr.Button("Enviar 📨", variant="primary", scale=1)
with gr.Accordion("📂 Adicionar Arquivos para Análise", open=False):
gr.Markdown("Selecione arquivos (PDF, TXT). A transcrição iniciará **automaticamente**.")
file_uploader = gr.File(
file_count="multiple",
file_types=[".pdf", ".txt", ".json", ".md"],
label="Arraste arquivos aqui ou clique para selecionar"
)
with gr.Tab("🕵️ Auditoria"):
gr.Markdown("### Trilha de Auditoria\nExibe o histórico completo de prompts e respostas de cada agente na última execução.")
json_audit = gr.JSON(label="Timeline da Execução da Última Mensagem")
with gr.Tab("⚙️ Contexto & Config"):
gr.Markdown("### Protocolo dos Agentes\nDefina a sequência e as missões dos agentes de IA. Para pausar e pedir input, use a missão de exemplo para o agente retornar um JSON específico.")
with gr.Row():
btn_save_cfg = gr.Button("💾 Salvar Alterações")
lbl_cfg_status = gr.Label(show_label=False)
code_config = gr.Code(value=config_inicial, language="json", label="protocolo.json")
btn_save_cfg.click(salvar_protocolo, inputs=[code_config], outputs=[lbl_cfg_status])
# Atualiza o state_config em memória após salvar
btn_save_cfg.click(lambda x: x, inputs=[code_config], outputs=[state_config])
# Ação de clique agora passa o pipeline_state para o orquestrador
btn_enviar.click(
chat_orquestrador,
inputs=[txt_input, chatbot, state_config, pipeline_state],
outputs=[chatbot, json_audit, pipeline_state] # Atualiza o estado da pipeline
).then(
lambda: "", outputs=[txt_input]
)
file_uploader.upload(
automacao_upload_processamento,
inputs=[file_uploader, chatbot, state_config],
outputs=[chatbot]
)
return app
# if __name__ == "__main__":
# ui_v29_stop_logic().launch()
from fastapi import FastAPI, UploadFile, Form
from fastapi.responses import JSONResponse
from typing import List
import uvicorn
# Criação da aplicação FastAPI
app = FastAPI(title="AI Forensics API", description="API para análise de casos de indenizações por morte ou invalidez.")
# Endpoint para upload e processamento de arquivos
@app.post("/upload/")
async def upload_files(files: List[UploadFile], config_json: str = Form(...)):
history = []
try:
config_agentes = json.loads(config_json)
except json.JSONDecodeError:
return JSONResponse(content={"error": "Configuração JSON inválida."}, status_code=400)
for file in files:
nome = file.filename
if nome.lower().endswith('.pdf'):
pdf_proc, erro = processar_pdf_completo(file.file)
if erro:
history.append({"file": nome, "status": "error", "message": erro})
else:
history.append({"file": nome, "status": "processed", "data": pdf_proc})
else:
res = ler_arquivo_texto(file.file)
if res:
history.append({"file": nome, "status": "processed", "data": res})
else:
history.append({"file": nome, "status": "error", "message": "Falha ao ler o arquivo."})
return history
# Endpoint para iniciar uma análise com base em uma mensagem
@app.post("/analyze/")
async def analyze_message(message: str, config_json: str):
try:
protocolo = json.loads(config_json)
except json.JSONDecodeError:
return JSONResponse(content={"error": "Configuração JSON inválida."}, status_code=400)
try:
prompt_contexto = gerenciador.gerar_prompt_com_transcricoes(message)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erro ao gerar contexto: {str(e)}")
timeline_execucao = [{"passo": 1, "tipo": "prompt_usuario", "conteudo": prompt_contexto}]
pipeline_state = {"is_paused": False, "timeline": timeline_execucao, "remaining_agents": protocolo}
history = []
for response in executar_pipeline(history, timeline_execucao, protocolo, pipeline_state):
pass # Processa os passos da pipeline
return {"history": history, "timeline": timeline_execucao}
# Endpoint para consultar o estado dos arquivos processados
@app.get("/files/")
async def list_files():
return gerenciador.arquivos
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)