rag_template / src /chunking.py
Guilherme Favaron
Major update: Add hybrid search, reranking, multiple LLMs, and UI improvements
1b447de
"""
Estratégias de chunking de documentos
"""
from typing import List, Dict, Any, Optional
import re
from .config import DEFAULT_CHUNK_SIZE, CHUNK_OVERLAP
def chunk_text_fixed(
text: str,
chunk_size: int = DEFAULT_CHUNK_SIZE,
overlap: int = CHUNK_OVERLAP
) -> List[str]:
"""
Divide texto em chunks de tamanho fixo com overlap
Args:
text: Texto para dividir
chunk_size: Tamanho máximo de cada chunk
overlap: Quantidade de caracteres que se sobrepõem entre chunks
Returns:
Lista de chunks
"""
if not text:
return []
chunks = []
start = 0
while start < len(text):
end = min(start + chunk_size, len(text))
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
# Move para próximo chunk com overlap
start = start + chunk_size - overlap
# Evita loop infinito se overlap >= chunk_size
if overlap >= chunk_size:
start = end
return chunks
def chunk_text_sentences(
text: str,
max_chunk_size: int = DEFAULT_CHUNK_SIZE
) -> List[str]:
"""
Divide texto respeitando limites de sentença
Args:
text: Texto para dividir
max_chunk_size: Tamanho máximo de cada chunk
Returns:
Lista de chunks
"""
if not text:
return []
# Separadores de sentença
sentence_endings = ['. ', '! ', '? ', '.\n', '!\n', '?\n']
chunks = []
current_chunk = ""
sentences = []
current_sentence = ""
# Divide em sentenças
for char in text:
current_sentence += char
# Verifica se terminou uma sentença
for ending in sentence_endings:
if current_sentence.endswith(ending):
sentences.append(current_sentence.strip())
current_sentence = ""
break
# Adiciona última sentença se houver
if current_sentence.strip():
sentences.append(current_sentence.strip())
# Agrupa sentenças em chunks
for sentence in sentences:
if len(current_chunk) + len(sentence) <= max_chunk_size:
current_chunk += " " + sentence if current_chunk else sentence
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence
# Adiciona último chunk
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
def chunk_text_semantic(
text: str,
max_chunk_size: int = DEFAULT_CHUNK_SIZE,
min_similarity: float = 0.5
) -> List[str]:
"""
Divide texto em chunks semanticamente coerentes usando embeddings
Args:
text: Texto para dividir
max_chunk_size: Tamanho máximo de cada chunk
min_similarity: Similaridade mínima para manter sentenças juntas (0-1)
Returns:
Lista de chunks
"""
# Nota: Implementação simplificada - para produção, usar embeddings reais
# Por ora, usa heurísticas de pontuação e parágrafos
if not text:
return []
# Divide por parágrafos primeiro
paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
chunks = []
current_chunk = ""
for para in paragraphs:
# Se parágrafo cabe no chunk atual
if len(current_chunk) + len(para) + 2 <= max_chunk_size:
current_chunk += "\n\n" + para if current_chunk else para
else:
# Salva chunk atual se houver
if current_chunk:
chunks.append(current_chunk.strip())
# Se parágrafo maior que max_chunk_size, divide em sentenças
if len(para) > max_chunk_size:
para_chunks = chunk_text_sentences(para, max_chunk_size)
chunks.extend(para_chunks)
current_chunk = ""
else:
current_chunk = para
# Adiciona último chunk
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
def chunk_text_recursive(
text: str,
chunk_size: int = DEFAULT_CHUNK_SIZE,
separators: Optional[List[str]] = None
) -> List[str]:
"""
Divide texto recursivamente usando hierarquia de separadores
Args:
text: Texto para dividir
chunk_size: Tamanho máximo de cada chunk
separators: Lista de separadores em ordem de prioridade
Returns:
Lista de chunks
"""
if separators is None:
separators = [
"\n\n", # Parágrafos
"\n", # Linhas
". ", # Sentenças
"! ",
"? ",
"; ", # Cláusulas
", ", # Listas
" ", # Palavras
"" # Caracteres
]
if not text:
return []
chunks = []
def _split_recursive(text_part: str, sep_index: int = 0) -> None:
"""Função recursiva interna para dividir texto"""
if len(text_part) <= chunk_size:
if text_part.strip():
chunks.append(text_part.strip())
return
if sep_index >= len(separators):
# Último recurso: divide por caracteres
chunks.append(text_part[:chunk_size].strip())
if len(text_part) > chunk_size:
_split_recursive(text_part[chunk_size:], 0)
return
separator = separators[sep_index]
if separator not in text_part:
# Tenta próximo separador
_split_recursive(text_part, sep_index + 1)
return
# Divide pelo separador atual
parts = text_part.split(separator)
current_chunk = ""
for i, part in enumerate(parts):
# Reconstrói separador (exceto para string vazia)
if separator and i < len(parts) - 1:
part_with_sep = part + separator
else:
part_with_sep = part
if len(current_chunk) + len(part_with_sep) <= chunk_size:
current_chunk += part_with_sep
else:
if current_chunk.strip():
chunks.append(current_chunk.strip())
# Se parte individual é muito grande, usa próximo separador
if len(part_with_sep) > chunk_size:
_split_recursive(part_with_sep, sep_index + 1)
current_chunk = ""
else:
current_chunk = part_with_sep
if current_chunk.strip():
chunks.append(current_chunk.strip())
_split_recursive(text)
return chunks
def chunk_with_metadata(
text: str,
chunk_size: int = DEFAULT_CHUNK_SIZE,
metadata: Optional[Dict[str, Any]] = None,
strategy: str = "fixed"
) -> List[Dict[str, Any]]:
"""
Divide texto em chunks com metadata adicional
Args:
text: Texto para dividir
chunk_size: Tamanho máximo de cada chunk
metadata: Metadata adicional (título, autor, data, etc)
strategy: Estratégia de chunking (fixed, sentences, semantic, recursive)
Returns:
Lista de dicionários com chunks e metadata
"""
if metadata is None:
metadata = {}
# Seleciona estratégia
if strategy == "sentences":
chunks = chunk_text_sentences(text, chunk_size)
elif strategy == "semantic":
chunks = chunk_text_semantic(text, chunk_size)
elif strategy == "recursive":
chunks = chunk_text_recursive(text, chunk_size)
else: # fixed
chunks = chunk_text_fixed(text, chunk_size)
# Adiciona metadata a cada chunk
chunks_with_metadata = []
for i, chunk in enumerate(chunks):
chunk_data = {
"content": chunk,
"chunk_index": i,
"chunk_total": len(chunks),
"char_count": len(chunk),
**metadata
}
chunks_with_metadata.append(chunk_data)
return chunks_with_metadata
def get_chunk_stats(chunks: List[str]) -> dict:
"""
Calcula estatísticas sobre os chunks
Args:
chunks: Lista de chunks
Returns:
Dicionário com estatísticas
"""
if not chunks:
return {
"total_chunks": 0,
"avg_size": 0,
"min_size": 0,
"max_size": 0,
"total_chars": 0
}
sizes = [len(chunk) for chunk in chunks]
return {
"total_chunks": len(chunks),
"avg_size": sum(sizes) / len(sizes),
"min_size": min(sizes),
"max_size": max(sizes),
"total_chars": sum(sizes)
}
def compare_chunking_strategies(
text: str,
chunk_size: int = DEFAULT_CHUNK_SIZE
) -> Dict[str, Any]:
"""
Compara diferentes estratégias de chunking no mesmo texto
Args:
text: Texto para analisar
chunk_size: Tamanho máximo dos chunks
Returns:
Dicionário com resultados de cada estratégia
"""
results = {}
strategies = {
"fixed": lambda: chunk_text_fixed(text, chunk_size),
"sentences": lambda: chunk_text_sentences(text, chunk_size),
"semantic": lambda: chunk_text_semantic(text, chunk_size),
"recursive": lambda: chunk_text_recursive(text, chunk_size)
}
for name, func in strategies.items():
try:
chunks = func()
stats = get_chunk_stats(chunks)
results[name] = {
"chunks": chunks,
"stats": stats,
"success": True
}
except Exception as e:
results[name] = {
"chunks": [],
"stats": {},
"success": False,
"error": str(e)
}
return results