Spaces:
Running
Running
Guilherme Favaron
Major update: Add hybrid search, reranking, multiple LLMs, and UI improvements
1b447de
| """ | |
| Estratégias de chunking de documentos | |
| """ | |
| from typing import List, Dict, Any, Optional | |
| import re | |
| from .config import DEFAULT_CHUNK_SIZE, CHUNK_OVERLAP | |
| def chunk_text_fixed( | |
| text: str, | |
| chunk_size: int = DEFAULT_CHUNK_SIZE, | |
| overlap: int = CHUNK_OVERLAP | |
| ) -> List[str]: | |
| """ | |
| Divide texto em chunks de tamanho fixo com overlap | |
| Args: | |
| text: Texto para dividir | |
| chunk_size: Tamanho máximo de cada chunk | |
| overlap: Quantidade de caracteres que se sobrepõem entre chunks | |
| Returns: | |
| Lista de chunks | |
| """ | |
| if not text: | |
| return [] | |
| chunks = [] | |
| start = 0 | |
| while start < len(text): | |
| end = min(start + chunk_size, len(text)) | |
| chunk = text[start:end].strip() | |
| if chunk: | |
| chunks.append(chunk) | |
| # Move para próximo chunk com overlap | |
| start = start + chunk_size - overlap | |
| # Evita loop infinito se overlap >= chunk_size | |
| if overlap >= chunk_size: | |
| start = end | |
| return chunks | |
| def chunk_text_sentences( | |
| text: str, | |
| max_chunk_size: int = DEFAULT_CHUNK_SIZE | |
| ) -> List[str]: | |
| """ | |
| Divide texto respeitando limites de sentença | |
| Args: | |
| text: Texto para dividir | |
| max_chunk_size: Tamanho máximo de cada chunk | |
| Returns: | |
| Lista de chunks | |
| """ | |
| if not text: | |
| return [] | |
| # Separadores de sentença | |
| sentence_endings = ['. ', '! ', '? ', '.\n', '!\n', '?\n'] | |
| chunks = [] | |
| current_chunk = "" | |
| sentences = [] | |
| current_sentence = "" | |
| # Divide em sentenças | |
| for char in text: | |
| current_sentence += char | |
| # Verifica se terminou uma sentença | |
| for ending in sentence_endings: | |
| if current_sentence.endswith(ending): | |
| sentences.append(current_sentence.strip()) | |
| current_sentence = "" | |
| break | |
| # Adiciona última sentença se houver | |
| if current_sentence.strip(): | |
| sentences.append(current_sentence.strip()) | |
| # Agrupa sentenças em chunks | |
| for sentence in sentences: | |
| if len(current_chunk) + len(sentence) <= max_chunk_size: | |
| current_chunk += " " + sentence if current_chunk else sentence | |
| else: | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| current_chunk = sentence | |
| # Adiciona último chunk | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| return chunks | |
| def chunk_text_semantic( | |
| text: str, | |
| max_chunk_size: int = DEFAULT_CHUNK_SIZE, | |
| min_similarity: float = 0.5 | |
| ) -> List[str]: | |
| """ | |
| Divide texto em chunks semanticamente coerentes usando embeddings | |
| Args: | |
| text: Texto para dividir | |
| max_chunk_size: Tamanho máximo de cada chunk | |
| min_similarity: Similaridade mínima para manter sentenças juntas (0-1) | |
| Returns: | |
| Lista de chunks | |
| """ | |
| # Nota: Implementação simplificada - para produção, usar embeddings reais | |
| # Por ora, usa heurísticas de pontuação e parágrafos | |
| if not text: | |
| return [] | |
| # Divide por parágrafos primeiro | |
| paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()] | |
| chunks = [] | |
| current_chunk = "" | |
| for para in paragraphs: | |
| # Se parágrafo cabe no chunk atual | |
| if len(current_chunk) + len(para) + 2 <= max_chunk_size: | |
| current_chunk += "\n\n" + para if current_chunk else para | |
| else: | |
| # Salva chunk atual se houver | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| # Se parágrafo maior que max_chunk_size, divide em sentenças | |
| if len(para) > max_chunk_size: | |
| para_chunks = chunk_text_sentences(para, max_chunk_size) | |
| chunks.extend(para_chunks) | |
| current_chunk = "" | |
| else: | |
| current_chunk = para | |
| # Adiciona último chunk | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| return chunks | |
| def chunk_text_recursive( | |
| text: str, | |
| chunk_size: int = DEFAULT_CHUNK_SIZE, | |
| separators: Optional[List[str]] = None | |
| ) -> List[str]: | |
| """ | |
| Divide texto recursivamente usando hierarquia de separadores | |
| Args: | |
| text: Texto para dividir | |
| chunk_size: Tamanho máximo de cada chunk | |
| separators: Lista de separadores em ordem de prioridade | |
| Returns: | |
| Lista de chunks | |
| """ | |
| if separators is None: | |
| separators = [ | |
| "\n\n", # Parágrafos | |
| "\n", # Linhas | |
| ". ", # Sentenças | |
| "! ", | |
| "? ", | |
| "; ", # Cláusulas | |
| ", ", # Listas | |
| " ", # Palavras | |
| "" # Caracteres | |
| ] | |
| if not text: | |
| return [] | |
| chunks = [] | |
| def _split_recursive(text_part: str, sep_index: int = 0) -> None: | |
| """Função recursiva interna para dividir texto""" | |
| if len(text_part) <= chunk_size: | |
| if text_part.strip(): | |
| chunks.append(text_part.strip()) | |
| return | |
| if sep_index >= len(separators): | |
| # Último recurso: divide por caracteres | |
| chunks.append(text_part[:chunk_size].strip()) | |
| if len(text_part) > chunk_size: | |
| _split_recursive(text_part[chunk_size:], 0) | |
| return | |
| separator = separators[sep_index] | |
| if separator not in text_part: | |
| # Tenta próximo separador | |
| _split_recursive(text_part, sep_index + 1) | |
| return | |
| # Divide pelo separador atual | |
| parts = text_part.split(separator) | |
| current_chunk = "" | |
| for i, part in enumerate(parts): | |
| # Reconstrói separador (exceto para string vazia) | |
| if separator and i < len(parts) - 1: | |
| part_with_sep = part + separator | |
| else: | |
| part_with_sep = part | |
| if len(current_chunk) + len(part_with_sep) <= chunk_size: | |
| current_chunk += part_with_sep | |
| else: | |
| if current_chunk.strip(): | |
| chunks.append(current_chunk.strip()) | |
| # Se parte individual é muito grande, usa próximo separador | |
| if len(part_with_sep) > chunk_size: | |
| _split_recursive(part_with_sep, sep_index + 1) | |
| current_chunk = "" | |
| else: | |
| current_chunk = part_with_sep | |
| if current_chunk.strip(): | |
| chunks.append(current_chunk.strip()) | |
| _split_recursive(text) | |
| return chunks | |
| def chunk_with_metadata( | |
| text: str, | |
| chunk_size: int = DEFAULT_CHUNK_SIZE, | |
| metadata: Optional[Dict[str, Any]] = None, | |
| strategy: str = "fixed" | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Divide texto em chunks com metadata adicional | |
| Args: | |
| text: Texto para dividir | |
| chunk_size: Tamanho máximo de cada chunk | |
| metadata: Metadata adicional (título, autor, data, etc) | |
| strategy: Estratégia de chunking (fixed, sentences, semantic, recursive) | |
| Returns: | |
| Lista de dicionários com chunks e metadata | |
| """ | |
| if metadata is None: | |
| metadata = {} | |
| # Seleciona estratégia | |
| if strategy == "sentences": | |
| chunks = chunk_text_sentences(text, chunk_size) | |
| elif strategy == "semantic": | |
| chunks = chunk_text_semantic(text, chunk_size) | |
| elif strategy == "recursive": | |
| chunks = chunk_text_recursive(text, chunk_size) | |
| else: # fixed | |
| chunks = chunk_text_fixed(text, chunk_size) | |
| # Adiciona metadata a cada chunk | |
| chunks_with_metadata = [] | |
| for i, chunk in enumerate(chunks): | |
| chunk_data = { | |
| "content": chunk, | |
| "chunk_index": i, | |
| "chunk_total": len(chunks), | |
| "char_count": len(chunk), | |
| **metadata | |
| } | |
| chunks_with_metadata.append(chunk_data) | |
| return chunks_with_metadata | |
| def get_chunk_stats(chunks: List[str]) -> dict: | |
| """ | |
| Calcula estatísticas sobre os chunks | |
| Args: | |
| chunks: Lista de chunks | |
| Returns: | |
| Dicionário com estatísticas | |
| """ | |
| if not chunks: | |
| return { | |
| "total_chunks": 0, | |
| "avg_size": 0, | |
| "min_size": 0, | |
| "max_size": 0, | |
| "total_chars": 0 | |
| } | |
| sizes = [len(chunk) for chunk in chunks] | |
| return { | |
| "total_chunks": len(chunks), | |
| "avg_size": sum(sizes) / len(sizes), | |
| "min_size": min(sizes), | |
| "max_size": max(sizes), | |
| "total_chars": sum(sizes) | |
| } | |
| def compare_chunking_strategies( | |
| text: str, | |
| chunk_size: int = DEFAULT_CHUNK_SIZE | |
| ) -> Dict[str, Any]: | |
| """ | |
| Compara diferentes estratégias de chunking no mesmo texto | |
| Args: | |
| text: Texto para analisar | |
| chunk_size: Tamanho máximo dos chunks | |
| Returns: | |
| Dicionário com resultados de cada estratégia | |
| """ | |
| results = {} | |
| strategies = { | |
| "fixed": lambda: chunk_text_fixed(text, chunk_size), | |
| "sentences": lambda: chunk_text_sentences(text, chunk_size), | |
| "semantic": lambda: chunk_text_semantic(text, chunk_size), | |
| "recursive": lambda: chunk_text_recursive(text, chunk_size) | |
| } | |
| for name, func in strategies.items(): | |
| try: | |
| chunks = func() | |
| stats = get_chunk_stats(chunks) | |
| results[name] = { | |
| "chunks": chunks, | |
| "stats": stats, | |
| "success": True | |
| } | |
| except Exception as e: | |
| results[name] = { | |
| "chunks": [], | |
| "stats": {}, | |
| "success": False, | |
| "error": str(e) | |
| } | |
| return results | |