# rag_template/ui/ingestion_tab.py
# Author: Guilherme Favaron
# Major update: Add hybrid search, reranking, multiple LLMs, and UI improvements (commit 1b447de)
"""
Aba de Ingestão de Documentos
Interface minimalista mostrando cada passo do processo
"""
import time
import gradio as gr
from typing import List
from src.database import DatabaseManager
from src.embeddings import EmbeddingManager
from src.chunking import (
chunk_text_fixed,
chunk_text_sentences,
chunk_text_semantic,
chunk_text_recursive,
chunk_with_metadata,
get_chunk_stats
)
from src.document_processing import process_uploaded_file, get_document_preview, get_document_stats
def create_ingestion_tab(db_manager: DatabaseManager, embedding_manager: EmbeddingManager, session_id: str):
    """Build the "Ingestão de Documentos" (document ingestion) Gradio tab.

    Wires the full RAG ingestion pipeline UI: file upload, chunking
    configuration, and live status/preview panels for every pipeline stage
    (extraction -> chunking -> embeddings -> database insertion).

    Args:
        db_manager: Persists each chunk + embedding into PostgreSQL/pgvector.
        embedding_manager: Encodes text chunks into dense vectors.
        session_id: Stored with every inserted document so rows can be
            scoped to the current user session.

    Returns:
        dict: Components other parts of the app may reference
        ("file_upload" and "ingest_btn").
    """
    with gr.Tab("Ingestão de Documentos"):
        gr.Markdown("""
        ## Processo de Ingestão de Documentos
        Acompanhe cada etapa do processo RAG:
        1. Upload: Envie arquivos PDF ou TXT
        2. Extração: Texto é extraído dos documentos
        3. Chunking: Texto é dividido em partes menores
        4. Embeddings: Cada chunk é transformado em vetor
        5. Armazenamento: Vetores são salvos no PostgreSQL com pgvector
        """)

        with gr.Row():
            with gr.Column(scale=1):
                file_upload = gr.File(
                    label="Selecione Arquivos (PDF ou TXT)",
                    file_count="multiple",
                    file_types=[".pdf", ".txt"]
                )
                with gr.Row():
                    chunk_strategy = gr.Radio(
                        choices=["Tamanho Fixo", "Por Sentenças", "Semântico", "Recursivo"],
                        value="Tamanho Fixo",
                        label="Estratégia de Chunking"
                    )
                chunk_size = gr.Slider(
                    minimum=200,
                    maximum=2000,
                    value=1000,
                    step=100,
                    label="Tamanho do Chunk (caracteres)"
                )
                chunk_overlap = gr.Slider(
                    minimum=0,
                    maximum=500,
                    value=200,
                    step=50,
                    label="Overlap entre Chunks (caracteres)"
                )
                ingest_btn = gr.Button("Iniciar Ingestão", variant="primary", size="lg", elem_classes=["primary-button"])

            with gr.Column(scale=2):
                gr.Markdown("### Status do Processo")
                status_display = gr.Markdown("Aguardando arquivos...")

                with gr.Accordion("Texto Extraído", open=False):
                    extracted_text = gr.Textbox(
                        label="Preview do Texto",
                        lines=10,
                        max_lines=20,
                        interactive=False
                    )
                with gr.Accordion("Chunks Gerados", open=False):
                    chunks_display = gr.Textbox(
                        label="Chunks",
                        lines=10,
                        max_lines=20,
                        interactive=False
                    )
                with gr.Accordion("Estatísticas", open=True):
                    stats_display = gr.JSON(label="Métricas do Processo")
                with gr.Accordion("Preview de Embeddings", open=False):
                    embeddings_preview = gr.Textbox(
                        label="Primeiros valores do embedding (dimensão do vetor)",
                        lines=5,
                        interactive=False
                    )

        def ingest_documents(files, strategy, chunk_size_val, chunk_overlap_val):
            """Run the ingestion pipeline over the uploaded files.

            Returns a 5-tuple matching the wired outputs:
            (status markdown, extracted-text preview, chunk preview,
            stats dict, embedding preview). On failure the first element
            carries the error message.
            """
            if not files:
                return (
                    "Nenhum arquivo selecionado",
                    "",
                    "",
                    {},
                    ""
                )

            total_start = time.time()
            all_stats = {
                "arquivos_processados": 0,
                "total_chunks": 0,
                "total_caracteres": 0,
                "tempo_total_ms": 0,
                "tempo_extracao_ms": 0,
                "tempo_chunking_ms": 0,
                "tempo_embedding_ms": 0,
                "tempo_insercao_ms": 0,
                "documentos_inseridos": 0
            }
            status_steps = []
            all_chunks_preview = []
            extracted_preview = ""
            embedding_preview_text = ""

            try:
                for file_idx, file_obj in enumerate(files):
                    # Step 1: text extraction
                    status_steps.append(f"\n### Arquivo {file_idx + 1}")
                    extract_start = time.time()
                    filename, text = process_uploaded_file(file_obj)
                    extract_time = (time.time() - extract_start) * 1000
                    all_stats["tempo_extracao_ms"] += extract_time
                    # FIX: report the extracted filename (was a garbled placeholder).
                    status_steps.append(f"**Extração concluída**: {filename}")
                    status_steps.append(f"- Caracteres: {len(text):,}")
                    status_steps.append(f"- Tempo: {extract_time:.0f}ms")

                    # FIX: accumulate character totals across ALL files
                    # (previously only the first file was counted).
                    doc_stats = get_document_stats(text)
                    all_stats["total_caracteres"] += doc_stats["total_chars"]
                    if file_idx == 0:
                        extracted_preview = get_document_preview(text, 1000)

                    # Step 2: chunking
                    chunk_start = time.time()
                    if strategy == "Por Sentenças":
                        chunks = chunk_text_sentences(text, int(chunk_size_val))
                    elif strategy == "Semântico":
                        chunks = chunk_text_semantic(text, int(chunk_size_val))
                    elif strategy == "Recursivo":
                        chunks = chunk_text_recursive(text, int(chunk_size_val))
                    else:  # "Tamanho Fixo" — the only strategy that uses the overlap setting
                        chunks = chunk_text_fixed(text, int(chunk_size_val), int(chunk_overlap_val))
                    chunk_time = (time.time() - chunk_start) * 1000
                    all_stats["tempo_chunking_ms"] += chunk_time

                    chunk_stats = get_chunk_stats(chunks)
                    all_stats["total_chunks"] += chunk_stats["total_chunks"]
                    status_steps.append(f"**Chunking concluído**: {chunk_stats['total_chunks']} chunks")
                    status_steps.append(f"- Tamanho médio: {chunk_stats['avg_size']:.0f} caracteres")
                    status_steps.append(f"- Tempo: {chunk_time:.0f}ms")

                    # Preview only the first file's first chunks to keep the UI light.
                    if file_idx == 0:
                        for i, chunk in enumerate(chunks[:3]):
                            all_chunks_preview.append(f"--- Chunk {i+1} ({len(chunk)} chars) ---\n{chunk}\n")

                    # Step 3: embeddings
                    embed_start = time.time()
                    embeddings = embedding_manager.encode(chunks, normalize=True, show_progress=False)
                    embed_time = (time.time() - embed_start) * 1000
                    all_stats["tempo_embedding_ms"] += embed_time
                    status_steps.append(f"**Embeddings gerados**: {len(embeddings)} vetores")
                    status_steps.append(f"- Dimensão: {embeddings.shape[1]}")
                    status_steps.append(f"- Tempo: {embed_time:.0f}ms")

                    if file_idx == 0 and len(embeddings) > 0:
                        # FIX: compute the L2 norm over the FULL vector, not just its
                        # first 10 components; with normalize=True it should read ~1.0.
                        full_vector = embeddings[0]
                        embedding_preview_text = f"Dimensão total: {embeddings.shape[1]}\n"
                        embedding_preview_text += f"Primeiros 10 valores: {full_vector[:10].tolist()}\n"
                        embedding_preview_text += f"Norma L2: {(full_vector ** 2).sum() ** 0.5:.4f}"

                    # Step 4: database insertion (one row per chunk)
                    insert_start = time.time()
                    inserted_count = 0
                    for chunk_content, embedding_vec in zip(chunks, embeddings):
                        doc_id = db_manager.insert_document(filename, chunk_content, embedding_vec.tolist(), session_id)
                        if doc_id:
                            inserted_count += 1
                    insert_time = (time.time() - insert_start) * 1000
                    all_stats["tempo_insercao_ms"] += insert_time
                    status_steps.append(f"**Inserção no banco**: {inserted_count} documentos salvos")
                    status_steps.append(f"- Tempo: {insert_time:.0f}ms")
                    all_stats["documentos_inseridos"] += inserted_count
                    all_stats["arquivos_processados"] += 1

                # Wrap-up: total wall-clock time for the whole batch
                total_time = (time.time() - total_start) * 1000
                all_stats["tempo_total_ms"] = total_time
                status_steps.append("\n## Processo Concluído")
                status_steps.append(f"**Tempo total**: {total_time:.0f}ms ({total_time/1000:.2f}s)")

                status_md = "\n".join(status_steps)
                chunks_text = "\n".join(all_chunks_preview) if all_chunks_preview else "Nenhum chunk para preview"
                return (
                    status_md,
                    extracted_preview,
                    chunks_text,
                    all_stats,
                    embedding_preview_text
                )
            except Exception as e:
                # Surface the failure in the UI instead of crashing the app.
                return (
                    f"Erro durante processamento: {str(e)}",
                    "",
                    "",
                    {"erro": str(e)},
                    ""
                )

        # Wire the button to the pipeline
        ingest_btn.click(
            fn=ingest_documents,
            inputs=[file_upload, chunk_strategy, chunk_size, chunk_overlap],
            outputs=[status_display, extracted_text, chunks_display, stats_display, embeddings_preview]
        )

    return {
        "file_upload": file_upload,
        "ingest_btn": ingest_btn
    }