""" Estratégias de chunking de documentos """ from typing import List, Dict, Any, Optional import re from .config import DEFAULT_CHUNK_SIZE, CHUNK_OVERLAP def chunk_text_fixed( text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = CHUNK_OVERLAP ) -> List[str]: """ Divide texto em chunks de tamanho fixo com overlap Args: text: Texto para dividir chunk_size: Tamanho máximo de cada chunk overlap: Quantidade de caracteres que se sobrepõem entre chunks Returns: Lista de chunks """ if not text: return [] chunks = [] start = 0 while start < len(text): end = min(start + chunk_size, len(text)) chunk = text[start:end].strip() if chunk: chunks.append(chunk) # Move para próximo chunk com overlap start = start + chunk_size - overlap # Evita loop infinito se overlap >= chunk_size if overlap >= chunk_size: start = end return chunks def chunk_text_sentences( text: str, max_chunk_size: int = DEFAULT_CHUNK_SIZE ) -> List[str]: """ Divide texto respeitando limites de sentença Args: text: Texto para dividir max_chunk_size: Tamanho máximo de cada chunk Returns: Lista de chunks """ if not text: return [] # Separadores de sentença sentence_endings = ['. ', '! ', '? ', '.\n', '!\n', '?\n'] chunks = [] current_chunk = "" sentences = [] current_sentence = "" # Divide em sentenças for char in text: current_sentence += char # Verifica se terminou uma sentença for ending in sentence_endings: if current_sentence.endswith(ending): sentences.append(current_sentence.strip()) current_sentence = "" break # Adiciona última sentença se houver if current_sentence.strip(): sentences.append(current_sentence.strip()) # Agrupa sentenças em chunks for sentence in sentences: if len(current_chunk) + len(sentence) <= max_chunk_size: current_chunk += " " + sentence if current_chunk else sentence else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sentence # Adiciona último chunk if current_chunk: chunks.append(current_chunk.strip()) return chunks def chunk_text_semantic( text: str, max_chunk_size: int = DEFAULT_CHUNK_SIZE, min_similarity: float = 0.5 ) -> List[str]: """ Divide texto em chunks semanticamente coerentes usando embeddings Args: text: Texto para dividir max_chunk_size: Tamanho máximo de cada chunk min_similarity: Similaridade mínima para manter sentenças juntas (0-1) Returns: Lista de chunks """ # Nota: Implementação simplificada - para produção, usar embeddings reais # Por ora, usa heurísticas de pontuação e parágrafos if not text: return [] # Divide por parágrafos primeiro paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()] chunks = [] current_chunk = "" for para in paragraphs: # Se parágrafo cabe no chunk atual if len(current_chunk) + len(para) + 2 <= max_chunk_size: current_chunk += "\n\n" + para if current_chunk else para else: # Salva chunk atual se houver if current_chunk: chunks.append(current_chunk.strip()) # Se parágrafo maior que max_chunk_size, divide em sentenças if len(para) > max_chunk_size: para_chunks = chunk_text_sentences(para, max_chunk_size) chunks.extend(para_chunks) current_chunk = "" else: current_chunk = para # Adiciona último chunk if current_chunk: chunks.append(current_chunk.strip()) return chunks def chunk_text_recursive( text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, separators: Optional[List[str]] = None ) -> List[str]: """ Divide texto recursivamente usando hierarquia de separadores Args: text: Texto para dividir chunk_size: Tamanho máximo de cada chunk separators: Lista de separadores em ordem de prioridade Returns: Lista de chunks """ if separators is None: separators = [ "\n\n", # Parágrafos "\n", # Linhas ". ", # Sentenças "! ", "? ", "; ", # Cláusulas ", ", # Listas " ", # Palavras "" # Caracteres ] if not text: return [] chunks = [] def _split_recursive(text_part: str, sep_index: int = 0) -> None: """Função recursiva interna para dividir texto""" if len(text_part) <= chunk_size: if text_part.strip(): chunks.append(text_part.strip()) return if sep_index >= len(separators): # Último recurso: divide por caracteres chunks.append(text_part[:chunk_size].strip()) if len(text_part) > chunk_size: _split_recursive(text_part[chunk_size:], 0) return separator = separators[sep_index] if separator not in text_part: # Tenta próximo separador _split_recursive(text_part, sep_index + 1) return # Divide pelo separador atual parts = text_part.split(separator) current_chunk = "" for i, part in enumerate(parts): # Reconstrói separador (exceto para string vazia) if separator and i < len(parts) - 1: part_with_sep = part + separator else: part_with_sep = part if len(current_chunk) + len(part_with_sep) <= chunk_size: current_chunk += part_with_sep else: if current_chunk.strip(): chunks.append(current_chunk.strip()) # Se parte individual é muito grande, usa próximo separador if len(part_with_sep) > chunk_size: _split_recursive(part_with_sep, sep_index + 1) current_chunk = "" else: current_chunk = part_with_sep if current_chunk.strip(): chunks.append(current_chunk.strip()) _split_recursive(text) return chunks def chunk_with_metadata( text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, metadata: Optional[Dict[str, Any]] = None, strategy: str = "fixed" ) -> List[Dict[str, Any]]: """ Divide texto em chunks com metadata adicional Args: text: Texto para dividir chunk_size: Tamanho máximo de cada chunk metadata: Metadata adicional (título, autor, data, etc) strategy: Estratégia de chunking (fixed, sentences, semantic, recursive) Returns: Lista de dicionários com chunks e metadata """ if metadata is None: metadata = {} # Seleciona estratégia if strategy == "sentences": chunks = chunk_text_sentences(text, chunk_size) elif strategy == "semantic": chunks = chunk_text_semantic(text, chunk_size) elif strategy == "recursive": chunks = chunk_text_recursive(text, chunk_size) else: # fixed chunks = chunk_text_fixed(text, chunk_size) # Adiciona metadata a cada chunk chunks_with_metadata = [] for i, chunk in enumerate(chunks): chunk_data = { "content": chunk, "chunk_index": i, "chunk_total": len(chunks), "char_count": len(chunk), **metadata } chunks_with_metadata.append(chunk_data) return chunks_with_metadata def get_chunk_stats(chunks: List[str]) -> dict: """ Calcula estatísticas sobre os chunks Args: chunks: Lista de chunks Returns: Dicionário com estatísticas """ if not chunks: return { "total_chunks": 0, "avg_size": 0, "min_size": 0, "max_size": 0, "total_chars": 0 } sizes = [len(chunk) for chunk in chunks] return { "total_chunks": len(chunks), "avg_size": sum(sizes) / len(sizes), "min_size": min(sizes), "max_size": max(sizes), "total_chars": sum(sizes) } def compare_chunking_strategies( text: str, chunk_size: int = DEFAULT_CHUNK_SIZE ) -> Dict[str, Any]: """ Compara diferentes estratégias de chunking no mesmo texto Args: text: Texto para analisar chunk_size: Tamanho máximo dos chunks Returns: Dicionário com resultados de cada estratégia """ results = {} strategies = { "fixed": lambda: chunk_text_fixed(text, chunk_size), "sentences": lambda: chunk_text_sentences(text, chunk_size), "semantic": lambda: chunk_text_semantic(text, chunk_size), "recursive": lambda: chunk_text_recursive(text, chunk_size) } for name, func in strategies.items(): try: chunks = func() stats = get_chunk_stats(chunks) results[name] = { "chunks": chunks, "stats": stats, "success": True } except Exception as e: results[name] = { "chunks": [], "stats": {}, "success": False, "error": str(e) } return results