Spaces:
Sleeping
Sleeping
| """ | |
| Service de chunking pour la gestion des textes longs. | |
| Divise intelligemment les documents en chunks pour le traitement par LLM. | |
| """ | |
| import re | |
| from typing import List, Dict, Tuple, Optional | |
| from dataclasses import dataclass | |
| from src.core.logging import setup_logger | |
| class TextChunk: | |
| """Représente un chunk de texte avec métadonnées.""" | |
| content: str | |
| start_index: int | |
| end_index: int | |
| chunk_id: int | |
| total_chunks: int | |
| word_count: int | |
| has_heading: bool = False | |
| heading_text: Optional[str] = None | |
| class TextChunker: | |
| """ | |
| Service de découpage intelligent de texte pour le traitement par LLM. | |
| Fonctionnalités: | |
| - Découpage respectant les phrases et paragraphes | |
| - Préservation des titres et structure | |
| - Gestion du chevauchement entre chunks | |
| - Optimisation pour les limites de tokens LLM | |
| """ | |
| def __init__( | |
| self, | |
| max_chunk_size: int = 4000, # En caractères | |
| overlap_size: int = 200, # Chevauchement entre chunks | |
| min_chunk_size: int = 500 # Taille minimale d'un chunk | |
| ): | |
| self.max_chunk_size = max_chunk_size | |
| self.overlap_size = overlap_size | |
| self.min_chunk_size = min_chunk_size | |
| self.logger = setup_logger("text_chunker") | |
| # Patterns pour identifier la structure | |
| self.heading_patterns = [ | |
| r'^#{1,6}\s+.+$', # Markdown headings | |
| r'^\d+\.\s+.+$', # Numérotations | |
| r'^[A-Z\s]{5,}$', # Titres en majuscules | |
| r'^\w+:$', # Labels avec deux-points | |
| ] | |
| self.sentence_endings = r'[.!?]+(?:\s|$)' | |
| self.paragraph_breaks = r'\n\s*\n' | |
| def chunk_text(self, text: str, preserve_structure: bool = True) -> List[TextChunk]: | |
| """ | |
| Découpe un texte en chunks intelligents. | |
| Args: | |
| text: Texte à découper | |
| preserve_structure: Préserver la structure (titres, paragraphes) | |
| Returns: | |
| Liste des chunks créés | |
| """ | |
| if not text or len(text.strip()) == 0: | |
| return [] | |
| # Nettoyage préliminaire | |
| text = self._clean_text(text) | |
| # Si le texte est assez court, retourner un seul chunk | |
| if len(text) <= self.max_chunk_size: | |
| return [TextChunk( | |
| content=text, | |
| start_index=0, | |
| end_index=len(text), | |
| chunk_id=1, | |
| total_chunks=1, | |
| word_count=len(text.split()) | |
| )] | |
| # Découpage intelligent | |
| if preserve_structure: | |
| chunks = self._chunk_with_structure(text) | |
| else: | |
| chunks = self._chunk_simple(text) | |
| # Post-traitement des chunks | |
| chunks = self._post_process_chunks(chunks) | |
| self.logger.info(f"Texte découpé en {len(chunks)} chunks (taille moyenne: {sum(len(c.content) for c in chunks) // len(chunks)} caractères)") | |
| return chunks | |
| def _clean_text(self, text: str) -> str: | |
| """Nettoie le texte avant découpage.""" | |
| # Normaliser les espaces | |
| text = re.sub(r'\s+', ' ', text) | |
| # Normaliser les sauts de ligne | |
| text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text) | |
| # Supprimer les espaces en début et fin | |
| text = text.strip() | |
| return text | |
| def _chunk_with_structure(self, text: str) -> List[TextChunk]: | |
| """Découpage en préservant la structure du document.""" | |
| chunks = [] | |
| current_chunk = "" | |
| current_start = 0 | |
| # Diviser en paragraphes | |
| paragraphs = re.split(self.paragraph_breaks, text) | |
| text_position = 0 | |
| for paragraph in paragraphs: | |
| if not paragraph.strip(): | |
| continue | |
| # Vérifier si le paragraphe contient un titre | |
| is_heading, heading_text = self._detect_heading(paragraph) | |
| # Si ajouter ce paragraphe dépasse la limite | |
| if len(current_chunk) + len(paragraph) > self.max_chunk_size and current_chunk: | |
| # Sauvegarder le chunk actuel | |
| chunk = self._create_chunk( | |
| current_chunk.strip(), | |
| current_start, | |
| text_position, | |
| len(chunks) + 1 | |
| ) | |
| chunks.append(chunk) | |
| # Commencer un nouveau chunk avec chevauchement | |
| overlap_text = self._get_overlap_text(current_chunk) | |
| current_chunk = overlap_text + paragraph | |
| current_start = text_position - len(overlap_text) | |
| else: | |
| # Ajouter le paragraphe au chunk actuel | |
| if current_chunk: | |
| current_chunk += "\n\n" + paragraph | |
| else: | |
| current_chunk = paragraph | |
| current_start = text_position | |
| text_position += len(paragraph) + 2 # +2 pour \n\n | |
| # Ajouter le dernier chunk | |
| if current_chunk.strip(): | |
| chunk = self._create_chunk( | |
| current_chunk.strip(), | |
| current_start, | |
| len(text), | |
| len(chunks) + 1 | |
| ) | |
| chunks.append(chunk) | |
| return chunks | |
| def _chunk_simple(self, text: str) -> List[TextChunk]: | |
| """Découpage simple par phrases.""" | |
| chunks = [] | |
| sentences = re.split(self.sentence_endings, text) | |
| current_chunk = "" | |
| current_start = 0 | |
| text_position = 0 | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if not sentence: | |
| continue | |
| # Estimer la position dans le texte original | |
| sentence_in_text = sentence + "." # Approximation | |
| if len(current_chunk) + len(sentence_in_text) > self.max_chunk_size and current_chunk: | |
| # Sauvegarder le chunk actuel | |
| chunk = self._create_chunk( | |
| current_chunk.strip(), | |
| current_start, | |
| text_position, | |
| len(chunks) + 1 | |
| ) | |
| chunks.append(chunk) | |
| # Nouveau chunk avec chevauchement | |
| overlap_text = self._get_overlap_text(current_chunk) | |
| current_chunk = overlap_text + sentence_in_text | |
| current_start = text_position - len(overlap_text) | |
| else: | |
| if current_chunk: | |
| current_chunk += " " + sentence_in_text | |
| else: | |
| current_chunk = sentence_in_text | |
| current_start = text_position | |
| text_position += len(sentence_in_text) | |
| # Dernier chunk | |
| if current_chunk.strip(): | |
| chunk = self._create_chunk( | |
| current_chunk.strip(), | |
| current_start, | |
| len(text), | |
| len(chunks) + 1 | |
| ) | |
| chunks.append(chunk) | |
| return chunks | |
| def _detect_heading(self, paragraph: str) -> Tuple[bool, Optional[str]]: | |
| """Détecte si un paragraphe est un titre.""" | |
| lines = paragraph.strip().split('\n') | |
| first_line = lines[0].strip() | |
| for pattern in self.heading_patterns: | |
| if re.match(pattern, first_line, re.MULTILINE): | |
| return True, first_line | |
| # Détection heuristique | |
| if (len(first_line) < 100 and | |
| len(first_line.split()) < 10 and | |
| first_line[0].isupper()): | |
| return True, first_line | |
| return False, None | |
| def _get_overlap_text(self, chunk: str) -> str: | |
| """Extrait le texte de chevauchement à la fin d'un chunk.""" | |
| if len(chunk) <= self.overlap_size: | |
| return "" | |
| # Prendre les dernières phrases jusqu'à overlap_size | |
| sentences = re.split(self.sentence_endings, chunk[-self.overlap_size:]) | |
| if len(sentences) > 1: | |
| # Garder les phrases complètes | |
| return ". ".join(sentences[1:]) + ". " | |
| else: | |
| # Fallback: prendre les derniers mots | |
| words = chunk.split() | |
| overlap_words = [] | |
| char_count = 0 | |
| for word in reversed(words): | |
| if char_count + len(word) > self.overlap_size: | |
| break | |
| overlap_words.insert(0, word) | |
| char_count += len(word) + 1 | |
| return " ".join(overlap_words) + " " if overlap_words else "" | |
| def _create_chunk(self, content: str, start: int, end: int, chunk_id: int) -> TextChunk: | |
| """Crée un objet TextChunk avec métadonnées.""" | |
| is_heading, heading_text = self._detect_heading(content) | |
| return TextChunk( | |
| content=content, | |
| start_index=start, | |
| end_index=end, | |
| chunk_id=chunk_id, | |
| total_chunks=0, # Sera mis à jour dans post_process | |
| word_count=len(content.split()), | |
| has_heading=is_heading, | |
| heading_text=heading_text | |
| ) | |
| def _post_process_chunks(self, chunks: List[TextChunk]) -> List[TextChunk]: | |
| """Post-traitement des chunks.""" | |
| total_chunks = len(chunks) | |
| # Mettre à jour le nombre total de chunks | |
| for chunk in chunks: | |
| chunk.total_chunks = total_chunks | |
| # Fusionner les chunks trop petits | |
| merged_chunks = [] | |
| i = 0 | |
| while i < len(chunks): | |
| current_chunk = chunks[i] | |
| # Si le chunk est trop petit et qu'il y a un chunk suivant | |
| if (len(current_chunk.content) < self.min_chunk_size and | |
| i + 1 < len(chunks) and | |
| len(current_chunk.content) + len(chunks[i + 1].content) <= self.max_chunk_size): | |
| # Fusionner avec le chunk suivant | |
| next_chunk = chunks[i + 1] | |
| merged_content = current_chunk.content + "\n\n" + next_chunk.content | |
| merged_chunk = TextChunk( | |
| content=merged_content, | |
| start_index=current_chunk.start_index, | |
| end_index=next_chunk.end_index, | |
| chunk_id=len(merged_chunks) + 1, | |
| total_chunks=0, # Sera mis à jour à la fin | |
| word_count=len(merged_content.split()), | |
| has_heading=current_chunk.has_heading or next_chunk.has_heading, | |
| heading_text=current_chunk.heading_text or next_chunk.heading_text | |
| ) | |
| merged_chunks.append(merged_chunk) | |
| i += 2 # Passer les deux chunks fusionnés | |
| else: | |
| # Garder le chunk tel quel | |
| current_chunk.chunk_id = len(merged_chunks) + 1 | |
| merged_chunks.append(current_chunk) | |
| i += 1 | |
| # Mettre à jour le nombre total final | |
| for chunk in merged_chunks: | |
| chunk.total_chunks = len(merged_chunks) | |
| return merged_chunks | |
| def get_chunking_stats(self, chunks: List[TextChunk]) -> Dict[str, any]: | |
| """Calcule les statistiques de découpage.""" | |
| if not chunks: | |
| return {} | |
| chunk_sizes = [len(chunk.content) for chunk in chunks] | |
| word_counts = [chunk.word_count for chunk in chunks] | |
| return { | |
| "total_chunks": len(chunks), | |
| "total_characters": sum(chunk_sizes), | |
| "total_words": sum(word_counts), | |
| "average_chunk_size": sum(chunk_sizes) // len(chunks), | |
| "average_words_per_chunk": sum(word_counts) // len(chunks), | |
| "min_chunk_size": min(chunk_sizes), | |
| "max_chunk_size": max(chunk_sizes), | |
| "chunks_with_headings": sum(1 for chunk in chunks if chunk.has_heading) | |
| } | |
| class ChunkingManager: | |
| """ | |
| Gestionnaire de chunking avec différentes stratégies. | |
| """ | |
| def __init__(self): | |
| self.logger = setup_logger("chunking_manager") | |
| # Chunkers spécialisés | |
| self.chunkers = { | |
| "default": TextChunker(max_chunk_size=4000, overlap_size=200), | |
| "small": TextChunker(max_chunk_size=2000, overlap_size=100), | |
| "large": TextChunker(max_chunk_size=20000, overlap_size=300), | |
| "precise": TextChunker(max_chunk_size=3000, overlap_size=150, min_chunk_size=800) | |
| } | |
| def chunk_document( | |
| self, | |
| content: str, | |
| strategy: str = "default", | |
| preserve_structure: bool = True | |
| ) -> List[TextChunk]: | |
| """ | |
| Découpe un document selon la stratégie spécifiée. | |
| Args: | |
| content: Contenu à découper | |
| strategy: Stratégie de découpage (default, small, large, precise) | |
| preserve_structure: Préserver la structure du document | |
| Returns: | |
| Liste des chunks créés | |
| """ | |
| if strategy not in self.chunkers: | |
| self.logger.warning(f"Stratégie inconnue '{strategy}', utilisation de 'default'") | |
| strategy = "default" | |
| chunker = self.chunkers[strategy] | |
| chunks = chunker.chunk_text(content, preserve_structure) | |
| # Statistiques | |
| stats = chunker.get_chunking_stats(chunks) | |
| self.logger.info(f"Chunking '{strategy}': {stats['total_chunks']} chunks créés") | |
| return chunks | |
| def auto_select_strategy(self, content: str) -> str: | |
| """Sélectionne automatiquement la meilleure stratégie de chunking.""" | |
| content_length = len(content) | |
| word_count = len(content.split()) | |
| # Heuristiques pour sélectionner la stratégie | |
| if content_length < 5000: | |
| return "small" | |
| elif content_length > 20000: | |
| return "large" | |
| elif word_count > 3000: # Texte dense | |
| return "precise" | |
| else: | |
| return "default" |