"""
Custom Recursive Semantic Chunker v4.0

Works around the limitations of chonkie 1.0.10 and implements a recursive
chunker that records a level hierarchy and parent/child relations.

Author: Assistant Claude
Compatible with: LlamaIndex v0.12, HuggingFace embeddings
"""

import re
import hashlib
import itertools
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

if TYPE_CHECKING:
    # BaseEmbedding is only used as an annotation, so it is imported for type
    # checkers only. It is exported by ``llama_index.core.embeddings``; the
    # previous ``llama_index.core.schema`` path does not provide it.
    from llama_index.core.embeddings import BaseEmbedding

logger = logging.getLogger(__name__)

# Immutable defaults; each chunker instance works on its own copy.
_DEFAULT_CHUNK_SIZES = (2048, 512, 128)
_DEFAULT_SEPARATORS = ("\n\n", "\n", ".", "!", "?", "—")


@dataclass
class ChunkResult:
    """One chunk together with its hierarchical metadata.

    ``children_ids`` and ``metadata`` default to fresh empty containers; an
    explicit ``None`` is still accepted and normalised in ``__post_init__``.
    """

    id: str
    text: str
    level: int
    parent_id: Optional[str] = None
    children_ids: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding_vector: Optional[List[float]] = None
    semantic_similarity: Optional[float] = None

    def __post_init__(self):
        # Callers written against the old ``= None`` defaults may still pass
        # None explicitly.
        if self.children_ids is None:
            self.children_ids = []
        if self.metadata is None:
            self.metadata = {}


class CustomRecursiveChunker:
    """Recursive chunker that does not depend on a specific chonkie version.

    Text is split on a list of separators, capped to a per-level maximum
    size, optionally overlapped with its neighbours, embedded, and then every
    sufficiently large chunk is chunked again at the next level.
    """

    def __init__(self,
                 embed_model: "BaseEmbedding",
                 chunk_sizes: Optional[List[int]] = None,
                 separators: Optional[List[str]] = None,
                 overlap_ratio: float = 0.1,
                 min_chunk_size: int = 50,
                 semantic_threshold: float = 0.75):
        """Configure the chunker.

        Args:
            embed_model: Object exposing ``aget_text_embedding(text)``.
            chunk_sizes: Maximum chunk size (characters) per level; sorted
                largest first. Defaults to ``[2048, 512, 128]``.
            separators: Separators applied in order. Defaults to
                ``["\\n\\n", "\\n", ".", "!", "?", "—"]``.
            overlap_ratio: Fraction of a neighbour's words copied into a
                chunk as overlap.
            min_chunk_size: Minimum chunk length in characters.
            semantic_threshold: Stored on the instance; not read by any
                method of this class.
        """
        self.embed_model = embed_model
        sizes = _DEFAULT_CHUNK_SIZES if chunk_sizes is None else chunk_sizes
        self.chunk_sizes = sorted(sizes, reverse=True)  # [2048, 512, 128]
        # Copy so the instance never aliases a caller-owned (or default) list.
        self.separators = list(
            _DEFAULT_SEPARATORS if separators is None else separators
        )
        self.overlap_ratio = overlap_ratio
        self.min_chunk_size = min_chunk_size
        self.semantic_threshold = semantic_threshold

        logger.info(
            "✅ CustomRecursiveChunker initialisé avec %d niveaux",
            len(self.chunk_sizes),
        )

    def _generate_chunk_id(self, text: str, level: int,
                           parent_id: Optional[str] = None,
                           position: Optional[int] = None) -> str:
        """Return a 12-hex-digit identifier for a chunk.

        The ID hashes the first 50 characters of ``text``, the level and the
        parent ID. When ``position`` is given it is hashed as well, so that
        siblings sharing the same 50-character prefix get distinct IDs.
        Without ``position`` the result is identical to the historical one.
        """
        base_string = f"{text[:50]}-{level}-{parent_id or 'root'}"
        if position is not None:
            base_string = f"{base_string}-{position}"
        return hashlib.md5(base_string.encode()).hexdigest()[:12]

    @staticmethod
    def _split_keeping_marker(piece: str, separator: str) -> List[str]:
        """Split ``piece`` on ``separator``, dropping empty parts.

        A non-whitespace separator (e.g. ``"."``) is re-attached to the part
        it terminated so the punctuation is not lost.
        """
        marker = separator.strip()
        parts = piece.split(separator)
        last_index = len(parts) - 1
        kept = []
        for index, part in enumerate(parts):
            cleaned = part.strip()
            if not cleaned:
                continue
            if marker and index < last_index:
                cleaned += marker
            kept.append(cleaned)
        return kept

    def _merge_short_pieces(self, pieces: List[str]) -> List[str]:
        """Glue pieces shorter than ``min_chunk_size`` onto a neighbour.

        Short pieces are accumulated with the following ones until the
        minimum size is reached; a short trailing remainder is appended to
        the last emitted chunk. If the whole input stays below the minimum,
        an empty list is returned.
        """
        merged = []
        pending = ""
        for piece in pieces:
            piece = piece.strip()
            if not piece:
                continue
            pending = f"{pending} {piece}" if pending else piece
            if len(pending) >= self.min_chunk_size:
                merged.append(pending)
                pending = ""
        if pending and merged:
            merged[-1] = f"{merged[-1]} {pending}"
        return merged

    def _split_by_separators(self, text: str, separators: List[str]) -> List[str]:
        """Split ``text`` by applying each separator in turn.

        Only pieces longer than ``min_chunk_size`` are split further. Every
        returned chunk is at least ``min_chunk_size`` characters long: short
        fragments are merged into a neighbour rather than discarded, so no
        text is lost as long as the input itself reaches the minimum size.
        """
        pieces = [text]
        for separator in separators:
            if not separator:
                # str.split("") raises ValueError; an empty separator is a no-op.
                continue
            next_pieces = []
            for piece in pieces:
                if len(piece) > self.min_chunk_size:
                    next_pieces.extend(self._split_keeping_marker(piece, separator))
                else:
                    next_pieces.append(piece)
            pieces = next_pieces
        return self._merge_short_pieces(pieces)

    def _apply_size_constraint(self, chunks: List[str], max_size: int) -> List[str]:
        """Split chunks longer than ``max_size`` on word boundaries.

        Words are never cut, so a single word longer than ``max_size`` is
        emitted as its own oversized chunk.
        """
        result_chunks = []
        for chunk in chunks:
            if len(chunk) <= max_size:
                result_chunks.append(chunk)
                continue

            current_words = []
            current_size = 0
            for word in chunk.split():
                word_size = len(word) + 1  # +1 for the joining space
                if current_words and current_size + word_size > max_size:
                    result_chunks.append(" ".join(current_words))
                    current_words = [word]
                    current_size = word_size
                else:
                    current_words.append(word)
                    current_size += word_size
            if current_words:
                result_chunks.append(" ".join(current_words))
        return result_chunks

    def _add_overlap(self, chunks: List[str]) -> List[str]:
        """Prefix/suffix each chunk with words from its neighbours.

        ``overlap_ratio`` of the previous chunk's trailing words and of the
        next chunk's leading words are added. The result can therefore be
        longer than the size limit applied before this step.
        """
        if len(chunks) <= 1:
            return chunks

        last_index = len(chunks) - 1
        overlapped_chunks = []
        for index, chunk in enumerate(chunks):
            current_chunk = chunk

            if index > 0:
                prev_words = chunks[index - 1].split()
                overlap_size = int(len(prev_words) * self.overlap_ratio)
                if overlap_size > 0:
                    prefix = " ".join(prev_words[-overlap_size:])
                    current_chunk = f"{prefix} {current_chunk}"

            if index < last_index:
                next_words = chunks[index + 1].split()
                overlap_size = int(len(next_words) * self.overlap_ratio)
                if overlap_size > 0:
                    suffix = " ".join(next_words[:overlap_size])
                    current_chunk = f"{current_chunk} {suffix}"

            overlapped_chunks.append(current_chunk)
        return overlapped_chunks

    async def _get_embedding(self, text: str) -> Optional[List[float]]:
        """Return the embedding of ``text``, or ``None`` if the model fails.

        Failures are logged and swallowed on purpose so that one bad chunk
        does not abort the whole run.
        """
        try:
            return await self.embed_model.aget_text_embedding(text)
        except Exception as e:
            logger.warning("⚠️ Erreur embedding pour chunk: %s", e)
            return None

    def _calculate_semantic_similarity(self, embedding1: List[float],
                                       embedding2: List[float]) -> float:
        """Return the cosine similarity of two vectors.

        Returns 0.0 when either vector has zero norm or when the computation
        fails (the failure is logged).
        """
        try:
            import numpy as np

            vec1 = np.array(embedding1)
            vec2 = np.array(embedding2)

            magnitude1 = np.linalg.norm(vec1)
            magnitude2 = np.linalg.norm(vec2)
            if magnitude1 == 0 or magnitude2 == 0:
                return 0.0

            return float(np.dot(vec1, vec2) / (magnitude1 * magnitude2))
        except Exception as e:
            logger.warning("⚠️ Erreur calcul similarité: %s", e)
            return 0.0

    async def _chunk_recursive_level(self, text: str, level: int,
                                     parent_id: Optional[str] = None) -> List[ChunkResult]:
        """Chunk ``text`` at ``level`` and recurse into the next levels.

        Returns:
            The chunks of this level followed by all of their descendants.
            Returns an empty list once ``level`` exceeds the configured sizes.
        """
        if level >= len(self.chunk_sizes):
            return []

        max_size = self.chunk_sizes[level]

        # 1. split on separators, 2. cap the size, 3. add neighbour overlap
        raw_chunks = self._split_by_separators(text, self.separators)
        sized_chunks = self._apply_size_constraint(raw_chunks, max_size)
        overlapped_chunks = self._add_overlap(sized_chunks)

        # 4. build the ChunkResult objects of this level
        total = len(overlapped_chunks)
        level_chunks = []
        for position, chunk_text in enumerate(overlapped_chunks):
            # The position makes sibling IDs unique even when their first
            # 50 characters are identical.
            chunk_id = self._generate_chunk_id(
                chunk_text, level, parent_id, position=position
            )
            embedding = await self._get_embedding(chunk_text)
            level_chunks.append(ChunkResult(
                id=chunk_id,
                text=chunk_text,
                level=level,
                parent_id=parent_id,
                embedding_vector=embedding,
                metadata={
                    "position": position,
                    "total_chunks": total,
                    "size": len(chunk_text),
                    "max_size": max_size
                }
            ))

        # 5. recurse into chunks that are large enough
        all_chunks = list(level_chunks)
        for chunk_result in level_chunks:
            if len(chunk_result.text) <= self.min_chunk_size * 2:
                continue
            descendants = await self._chunk_recursive_level(
                chunk_result.text, level + 1, chunk_result.id
            )
            # The recursive call returns the whole subtree; only the next
            # level down are direct children of this chunk.
            chunk_result.children_ids = [
                sub_chunk.id for sub_chunk in descendants
                if sub_chunk.level == level + 1
            ]
            all_chunks.extend(descendants)

        return all_chunks

    async def chunk_text(self, text: str,
                         metadata: Optional[Dict[str, Any]] = None) -> List[ChunkResult]:
        """Main entry point: chunk ``text`` recursively.

        Args:
            text: Text to chunk.
            metadata: Extra metadata merged into every chunk's metadata.

        Returns:
            All chunks of all levels, with parent/child relations and
            same-level similarity filled in. Empty when ``text`` is empty or
            shorter than ``min_chunk_size`` once stripped.

        Raises:
            Exception: Any error raised during chunking is logged and
                re-raised unchanged.
        """
        if not text or len(text.strip()) < self.min_chunk_size:
            logger.warning("⚠️ Texte trop court pour chunking")
            return []

        logger.info("🚀 Début chunking récursif - %d caractères", len(text))

        try:
            all_chunks = await self._chunk_recursive_level(text, level=0)

            for chunk in all_chunks:
                if metadata:
                    chunk.metadata.update(metadata)
                chunk.metadata["total_levels"] = len(self.chunk_sizes)
                chunk.metadata["algorithm"] = "CustomRecursiveChunker"

            await self._compute_semantic_similarities(all_chunks)

            logger.info("✅ Chunking terminé - %d chunks générés", len(all_chunks))
            return all_chunks
        except Exception as e:
            logger.error("❌ Erreur chunking récursif: %s", e)
            raise

    async def _compute_semantic_similarities(self, chunks: List[ChunkResult]):
        """Set ``semantic_similarity`` on every chunk that has an embedding.

        The value is the highest cosine similarity to any other embedded
        chunk of the same level, floored at 0.0 (also 0.0 when there is no
        other chunk). Chunks without an embedding are left untouched.
        """
        chunks_by_level = defaultdict(list)
        for chunk in chunks:
            chunks_by_level[chunk.level].append(chunk)

        for level_chunks in chunks_by_level.values():
            embedded = [c for c in level_chunks if c.embedding_vector is not None]
            best = [0.0] * len(embedded)
            # Cosine similarity is symmetric: evaluate each pair only once.
            for (i, first), (j, second) in itertools.combinations(enumerate(embedded), 2):
                similarity = self._calculate_semantic_similarity(
                    first.embedding_vector, second.embedding_vector
                )
                best[i] = max(best[i], similarity)
                best[j] = max(best[j], similarity)
            for chunk, similarity in zip(embedded, best):
                chunk.semantic_similarity = similarity

    def to_obsidian_format(self, chunks: List[ChunkResult],
                           source_title: str = "Document") -> str:
        """Render the chunks as Obsidian-flavoured Markdown.

        Chunks are grouped by level; each one gets a heading, a YAML block
        with its metadata, wiki-links to its parent and children, and its
        text.
        """
        chunks_by_level = defaultdict(list)
        for chunk in chunks:
            chunks_by_level[chunk.level].append(chunk)

        lines = [f"# {source_title} - Chunking Hiérarchique\n"]
        for level in sorted(chunks_by_level):
            level_chunks = chunks_by_level[level]
            lines.append(f"\n## Niveau {level} ({len(level_chunks)} chunks)\n")

            for chunk in level_chunks:
                lines.append(f"### [[{chunk.id}]] {chunk.id}")

                lines.append("```yaml")
                lines.append(f"level: {chunk.level}")
                lines.append(f"parent: {chunk.parent_id or 'root'}")
                lines.append(f"children: {len(chunk.children_ids)}")
                lines.append(f"size: {len(chunk.text)}")
                # ``is not None`` so that a legitimate 0.0 is still reported.
                if chunk.semantic_similarity is not None:
                    lines.append(f"similarity: {chunk.semantic_similarity:.3f}")
                lines.append("```\n")

                if chunk.parent_id:
                    lines.append(f"**Parent:** [[{chunk.parent_id}]]")
                if chunk.children_ids:
                    children_links = ", ".join(
                        f"[[{child_id}]]" for child_id in chunk.children_ids
                    )
                    lines.append(f"**Enfants:** {children_links}")

                lines.append(f"\n**Contenu:**\n{chunk.text}\n")
                lines.append("---\n")

        return "\n".join(lines)

    def to_json_format(self, chunks: List[ChunkResult]) -> List[Dict[str, Any]]:
        """Return the chunks as JSON-serialisable dicts.

        The embedding vector itself is not included, only a
        ``has_embedding`` flag.
        """
        return [
            {
                "id": chunk.id,
                "text": chunk.text,
                "level": chunk.level,
                "parent_id": chunk.parent_id,
                "children_ids": chunk.children_ids,
                "metadata": chunk.metadata,
                "has_embedding": chunk.embedding_vector is not None,
                "semantic_similarity": chunk.semantic_similarity
            }
            for chunk in chunks
        ]