| """ | |
| Custom Recursive Semantic Chunker v4.0 | |
| Contourne les limitations de chonkie 1.0.10 et implemente | |
| un chunking récursif intelligent avec hiérarchie et parentalité. | |
| Auteur: Assistant Claude | |
| Compatible avec: LlamaIndex v0.12, HuggingFace embeddings | |
| """ | |
| import re | |
| import hashlib | |
| import logging | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from dataclasses import dataclass | |
| from llama_index.core.schema import BaseEmbedding | |
| logger = logging.getLogger(__name__) | |

@dataclass
class ChunkResult:
    """Result chunk with hierarchical metadata."""
    id: str
    text: str
    level: int
    parent_id: Optional[str] = None
    children_ids: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding_vector: Optional[List[float]] = None
    semantic_similarity: Optional[float] = None
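
# Illustration (hypothetical ids): a level-0 parent and its level-1 child
# reference each other through their ids:
#   parent = ChunkResult(id="a1b2c3d4e5f6", text="Long passage", level=0,
#                        children_ids=["f6e5d4c3b2a1"])
#   child = ChunkResult(id="f6e5d4c3b2a1", text="Long", level=1,
#                       parent_id="a1b2c3d4e5f6")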

class CustomRecursiveChunker:
    """
    Intelligent recursive chunker that emulates the desired behavior
    without depending on unstable versions of chonkie.
    """

    def __init__(self,
                 embed_model: BaseEmbedding,
                 chunk_sizes: Optional[List[int]] = None,
                 separators: Optional[List[str]] = None,
                 overlap_ratio: float = 0.1,
                 min_chunk_size: int = 50,
                 semantic_threshold: float = 0.75):
        """
        Initialize the custom chunker.

        Args:
            embed_model: LlamaIndex BaseEmbedding model
            chunk_sizes: Hierarchical chunk sizes [large, medium, small];
                defaults to [2048, 512, 128]
            separators: Separators for hierarchical splitting
            overlap_ratio: Overlap ratio between adjacent chunks
            min_chunk_size: Minimum chunk size in characters
            semantic_threshold: Semantic similarity threshold
        """
        self.embed_model = embed_model
        # None defaults avoid mutable default arguments; sizes are sorted
        # from largest to smallest, e.g. [2048, 512, 128].
        self.chunk_sizes = sorted(chunk_sizes or [2048, 512, 128], reverse=True)
        self.separators = separators or ["\n\n", "\n", ".", "!", "?", "—"]
        self.overlap_ratio = overlap_ratio
        self.min_chunk_size = min_chunk_size
        self.semantic_threshold = semantic_threshold
        logger.info(f"✅ CustomRecursiveChunker initialized with {len(self.chunk_sizes)} levels")

    def _generate_chunk_id(self, text: str, level: int, parent_id: Optional[str] = None) -> str:
        """Generate a unique ID for a chunk."""
        base_string = f"{text[:50]}-{level}-{parent_id or 'root'}"
        return hashlib.md5(base_string.encode()).hexdigest()[:12]

    def _split_by_separators(self, text: str, separators: List[str]) -> List[str]:
        """Split the text along a hierarchy of separators."""
        chunks = [text]
        for separator in separators:
            new_chunks = []
            for chunk in chunks:
                if len(chunk) > self.min_chunk_size:
                    split_parts = chunk.split(separator)
                    # Strip whitespace and drop empty parts
                    split_parts = [part.strip() for part in split_parts if part.strip()]
                    new_chunks.extend(split_parts)
                else:
                    new_chunks.append(chunk)
            chunks = new_chunks
        return [chunk for chunk in chunks if len(chunk.strip()) >= self.min_chunk_size]

    def _apply_size_constraint(self, chunks: List[str], max_size: int) -> List[str]:
        """Enforce a maximum size on each chunk."""
        result_chunks = []
        for chunk in chunks:
            if len(chunk) <= max_size:
                result_chunks.append(chunk)
            else:
                # Re-wrap over-long chunks word by word
                words = chunk.split()
                current_chunk = []
                current_size = 0
                for word in words:
                    word_size = len(word) + 1  # +1 for the space
                    if current_size + word_size > max_size and current_chunk:
                        result_chunks.append(" ".join(current_chunk))
                        current_chunk = [word]
                        current_size = word_size
                    else:
                        current_chunk.append(word)
                        current_size += word_size
                if current_chunk:
                    result_chunks.append(" ".join(current_chunk))
        return result_chunks
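
    # Worked example (hypothetical input): with max_size=20, the chunk
    # "alpha beta gamma delta epsilon" becomes ["alpha beta gamma",
    # "delta epsilon"]: the running size reaches 17 after "gamma" and the
    # next word would push it to 23 > 20, so the buffer is flushed there.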

    def _add_overlap(self, chunks: List[str]) -> List[str]:
        """Add overlap between adjacent chunks."""
        if len(chunks) <= 1:
            return chunks
        overlapped_chunks = []
        for i, chunk in enumerate(chunks):
            current_chunk = chunk
            # Prepend context from the previous chunk
            if i > 0:
                prev_words = chunks[i-1].split()
                overlap_size = int(len(prev_words) * self.overlap_ratio)
                if overlap_size > 0:
                    prefix = " ".join(prev_words[-overlap_size:])
                    current_chunk = f"{prefix} {current_chunk}"
            # Append context from the next chunk
            if i < len(chunks) - 1:
                next_words = chunks[i+1].split()
                overlap_size = int(len(next_words) * self.overlap_ratio)
                if overlap_size > 0:
                    suffix = " ".join(next_words[:overlap_size])
                    current_chunk = f"{current_chunk} {suffix}"
            overlapped_chunks.append(current_chunk)
        return overlapped_chunks
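
    # Example: with overlap_ratio=0.1 and 20-word neighbors, each chunk
    # borrows int(20 * 0.1) = 2 words from the end of the previous chunk
    # and 2 words from the start of the next one.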

    async def _get_embedding(self, text: str) -> Optional[List[float]]:
        """Get the embedding of a text via the LlamaIndex model."""
        try:
            # Use the standard LlamaIndex BaseEmbedding method
            embedding = await self.embed_model.aget_text_embedding(text)
            return embedding
        except Exception as e:
            logger.warning(f"⚠️ Embedding error for chunk: {e}")
            return None

    def _calculate_semantic_similarity(self, embedding1: List[float],
                                       embedding2: List[float]) -> float:
        """Compute the cosine similarity between two embeddings."""
        try:
            vec1 = np.array(embedding1)
            vec2 = np.array(embedding2)
            # Cosine similarity: dot product over the product of magnitudes
            dot_product = np.dot(vec1, vec2)
            magnitude1 = np.linalg.norm(vec1)
            magnitude2 = np.linalg.norm(vec2)
            if magnitude1 == 0 or magnitude2 == 0:
                return 0.0
            return float(dot_product / (magnitude1 * magnitude2))
        except Exception as e:
            logger.warning(f"⚠️ Similarity computation error: {e}")
            return 0.0
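
    # Worked example: for [1, 0] and [1, 1] the dot product is 1 and the
    # magnitudes are 1 and sqrt(2), so the similarity is 1/sqrt(2) ≈ 0.707.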

    async def _chunk_recursive_level(self, text: str, level: int,
                                     parent_id: Optional[str] = None) -> List[ChunkResult]:
        """Apply recursive chunking at a given level."""
        if level >= len(self.chunk_sizes):
            return []
        max_size = self.chunk_sizes[level]
        # 1. Initial split along the separators
        raw_chunks = self._split_by_separators(text, self.separators)
        # 2. Enforce the size constraint
        sized_chunks = self._apply_size_constraint(raw_chunks, max_size)
        # 3. Add overlap
        overlapped_chunks = self._add_overlap(sized_chunks)
        # 4. Build the ChunkResult objects
        chunk_results = []
        for i, chunk_text in enumerate(overlapped_chunks):
            chunk_id = self._generate_chunk_id(chunk_text, level, parent_id)
            # Fetch the embedding
            embedding = await self._get_embedding(chunk_text)
            chunk_result = ChunkResult(
                id=chunk_id,
                text=chunk_text,
                level=level,
                parent_id=parent_id,
                embedding_vector=embedding,
                metadata={
                    "position": i,
                    "total_chunks": len(overlapped_chunks),
                    "size": len(chunk_text),
                    "max_size": max_size
                }
            )
            chunk_results.append(chunk_result)
        # 5. Recurse into the next level
        all_chunks = chunk_results.copy()
        for chunk_result in chunk_results:
            if len(chunk_result.text) > self.min_chunk_size * 2:  # Only if large enough
                sub_chunks = await self._chunk_recursive_level(
                    chunk_result.text,
                    level + 1,
                    chunk_result.id
                )
                # Record direct children only (sub_chunks also contains
                # deeper descendants returned by the recursion)
                chunk_result.children_ids = [
                    sub_chunk.id for sub_chunk in sub_chunks
                    if sub_chunk.parent_id == chunk_result.id
                ]
                all_chunks.extend(sub_chunks)
        return all_chunks

    async def chunk_text(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[ChunkResult]:
        """
        Main entry point for recursive chunking.

        Args:
            text: Text to chunk
            metadata: Metadata to attach to every chunk

        Returns:
            List of chunks with hierarchy and relations
        """
        if not text or len(text.strip()) < self.min_chunk_size:
            logger.warning("⚠️ Text too short to chunk")
            return []
        logger.info(f"🚀 Starting recursive chunking - {len(text)} characters")
        try:
            # Recursive chunking starting at level 0
            all_chunks = await self._chunk_recursive_level(text, level=0)
            # Enrich the metadata
            for chunk in all_chunks:
                if metadata:
                    chunk.metadata.update(metadata)
                chunk.metadata["total_levels"] = len(self.chunk_sizes)
                chunk.metadata["algorithm"] = "CustomRecursiveChunker"
            # Compute semantic similarities between chunks of the same level
            await self._compute_semantic_similarities(all_chunks)
            logger.info(f"✅ Chunking finished - {len(all_chunks)} chunks generated")
            return all_chunks
        except Exception as e:
            logger.error(f"❌ Recursive chunking error: {e}")
            raise

    async def _compute_semantic_similarities(self, chunks: List[ChunkResult]):
        """Compute semantic similarities between chunks."""
        # Group the chunks by level
        chunks_by_level: Dict[int, List[ChunkResult]] = {}
        for chunk in chunks:
            chunks_by_level.setdefault(chunk.level, []).append(chunk)
        # For each level, store each chunk's highest similarity to a sibling
        for level_chunks in chunks_by_level.values():
            for i, chunk1 in enumerate(level_chunks):
                if chunk1.embedding_vector is None:
                    continue
                max_similarity = 0.0
                for j, chunk2 in enumerate(level_chunks):
                    if i != j and chunk2.embedding_vector is not None:
                        similarity = self._calculate_semantic_similarity(
                            chunk1.embedding_vector,
                            chunk2.embedding_vector
                        )
                        max_similarity = max(max_similarity, similarity)
                chunk1.semantic_similarity = max_similarity

    def to_obsidian_format(self, chunks: List[ChunkResult],
                           source_title: str = "Document") -> str:
        """Render the chunks as Obsidian markdown with hierarchical links."""
        obsidian_content = []
        obsidian_content.append(f"# {source_title} - Hierarchical Chunking\n")
        # Group by level for a structured layout
        chunks_by_level: Dict[int, List[ChunkResult]] = {}
        for chunk in chunks:
            chunks_by_level.setdefault(chunk.level, []).append(chunk)
        for level in sorted(chunks_by_level.keys()):
            level_chunks = chunks_by_level[level]
            obsidian_content.append(f"\n## Level {level} ({len(level_chunks)} chunks)\n")
            for chunk in level_chunks:
                # Chunk heading with its ID as an Obsidian link
                obsidian_content.append(f"### Chunk [[{chunk.id}]]")
                # Metadata block
                obsidian_content.append("```yaml")
                obsidian_content.append(f"level: {chunk.level}")
                obsidian_content.append(f"parent: {chunk.parent_id or 'root'}")
                obsidian_content.append(f"children: {len(chunk.children_ids)}")
                obsidian_content.append(f"size: {len(chunk.text)}")
                if chunk.semantic_similarity is not None:
                    obsidian_content.append(f"similarity: {chunk.semantic_similarity:.3f}")
                obsidian_content.append("```\n")
                # Navigation links
                if chunk.parent_id:
                    obsidian_content.append(f"**Parent:** [[{chunk.parent_id}]]")
                if chunk.children_ids:
                    children_links = ", ".join([f"[[{child_id}]]" for child_id in chunk.children_ids])
                    obsidian_content.append(f"**Children:** {children_links}")
                # Chunk body
                obsidian_content.append(f"\n**Content:**\n{chunk.text}\n")
                obsidian_content.append("---\n")
        return "\n".join(obsidian_content)

    def to_json_format(self, chunks: List[ChunkResult]) -> List[Dict[str, Any]]:
        """Render the chunks as JSON-serializable dicts for an API."""
        return [
            {
                "id": chunk.id,
                "text": chunk.text,
                "level": chunk.level,
                "parent_id": chunk.parent_id,
                "children_ids": chunk.children_ids,
                "metadata": chunk.metadata,
                "has_embedding": chunk.embedding_vector is not None,
                "semantic_similarity": chunk.semantic_similarity
            }
            for chunk in chunks
        ]
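

# Minimal usage sketch, assuming the optional llama-index-embeddings-huggingface
# package and the "BAAI/bge-small-en-v1.5" model; both are illustrative choices,
# not requirements of the chunker itself.
if __name__ == "__main__":
    import asyncio

    from llama_index.embeddings.huggingface import HuggingFaceEmbedding

    async def main():
        embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
        chunker = CustomRecursiveChunker(embed_model=embed_model)
        sample_text = "Recursive chunking splits a document into nested pieces. " * 40
        chunks = await chunker.chunk_text(sample_text, metadata={"source": "demo"})
        print(f"{len(chunks)} chunks produced")
        print(chunker.to_obsidian_format(chunks, source_title="Demo"))

    asyncio.run(main())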