File size: 14,888 Bytes
5250f87
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""
Custom Recursive Semantic Chunker v4.0
Contourne les limitations de chonkie 1.0.10 et implemente 
un chunking récursif intelligent avec hiérarchie et parentalité.

Auteur: Assistant Claude
Compatible avec: LlamaIndex v0.12, HuggingFace embeddings
"""

import re
import hashlib
import logging
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from llama_index.core.schema import BaseEmbedding

logger = logging.getLogger(__name__)

@dataclass
class ChunkResult:
    """Résultat d'un chunk avec métadonnées hiérarchiques"""
    id: str
    text: str
    level: int
    parent_id: Optional[str] = None
    children_ids: List[str] = None
    metadata: Dict[str, Any] = None
    embedding_vector: Optional[List[float]] = None
    semantic_similarity: Optional[float] = None
    
    def __post_init__(self):
        if self.children_ids is None:
            self.children_ids = []
        if self.metadata is None:
            self.metadata = {}

class CustomRecursiveChunker:
    """
    Chunker récursif intelligent qui simule le comportement 
    souhaité sans dépendre des versions instables de chonkie
    """
    
    def __init__(self, 
                 embed_model: BaseEmbedding,
                 chunk_sizes: List[int] = [2048, 512, 128],
                 separators: List[str] = ["\n\n", "\n", ".", "!", "?", "—"],
                 overlap_ratio: float = 0.1,
                 min_chunk_size: int = 50,
                 semantic_threshold: float = 0.75):
        """
        Initialise le chunker personnalisé
        
        Args:
            embed_model: Modèle d'embedding LlamaIndex BaseEmbedding
            chunk_sizes: Tailles hiérarchiques des chunks [grand, moyen, petit]
            separators: Séparateurs pour découpage hiérarchique
            overlap_ratio: Ratio de chevauchement entre chunks
            min_chunk_size: Taille minimale d'un chunk
            semantic_threshold: Seuil de similarité sémantique
        """
        self.embed_model = embed_model
        self.chunk_sizes = sorted(chunk_sizes, reverse=True)  # [2048, 512, 128]
        self.separators = separators
        self.overlap_ratio = overlap_ratio
        self.min_chunk_size = min_chunk_size
        self.semantic_threshold = semantic_threshold
        
        logger.info(f"✅ CustomRecursiveChunker initialisé avec {len(chunk_sizes)} niveaux")
    
    def _generate_chunk_id(self, text: str, level: int, parent_id: str = None) -> str:
        """Génère un ID unique pour un chunk"""
        base_string = f"{text[:50]}-{level}-{parent_id or 'root'}"
        return hashlib.md5(base_string.encode()).hexdigest()[:12]
    
    def _split_by_separators(self, text: str, separators: List[str]) -> List[str]:
        """Découpe le texte selon une hiérarchie de séparateurs"""
        chunks = [text]
        
        for separator in separators:
            new_chunks = []
            for chunk in chunks:
                if len(chunk) > self.min_chunk_size:
                    split_parts = chunk.split(separator)
                    # Nettoie et filtre les parties vides
                    split_parts = [part.strip() for part in split_parts if part.strip()]
                    new_chunks.extend(split_parts)
                else:
                    new_chunks.append(chunk)
            chunks = new_chunks
            
        return [chunk for chunk in chunks if len(chunk.strip()) >= self.min_chunk_size]
    
    def _apply_size_constraint(self, chunks: List[str], max_size: int) -> List[str]:
        """Applique une contrainte de taille maximale aux chunks"""
        result_chunks = []
        
        for chunk in chunks:
            if len(chunk) <= max_size:
                result_chunks.append(chunk)
            else:
                # Découpe les chunks trop longs
                words = chunk.split()
                current_chunk = []
                current_size = 0
                
                for word in words:
                    word_size = len(word) + 1  # +1 pour l'espace
                    if current_size + word_size > max_size and current_chunk:
                        result_chunks.append(" ".join(current_chunk))
                        current_chunk = [word]
                        current_size = word_size
                    else:
                        current_chunk.append(word)
                        current_size += word_size
                
                if current_chunk:
                    result_chunks.append(" ".join(current_chunk))
        
        return result_chunks
    
    def _add_overlap(self, chunks: List[str]) -> List[str]:
        """Ajoute du chevauchement entre chunks adjacents"""
        if len(chunks) <= 1:
            return chunks
            
        overlapped_chunks = []
        
        for i, chunk in enumerate(chunks):
            current_chunk = chunk
            
            # Ajoute le contexte du chunk précédent
            if i > 0:
                prev_words = chunks[i-1].split()
                overlap_size = int(len(prev_words) * self.overlap_ratio)
                if overlap_size > 0:
                    prefix = " ".join(prev_words[-overlap_size:])
                    current_chunk = f"{prefix} {current_chunk}"
            
            # Ajoute le contexte du chunk suivant
            if i < len(chunks) - 1:
                next_words = chunks[i+1].split()
                overlap_size = int(len(next_words) * self.overlap_ratio)
                if overlap_size > 0:
                    suffix = " ".join(next_words[:overlap_size])
                    current_chunk = f"{current_chunk} {suffix}"
            
            overlapped_chunks.append(current_chunk)
        
        return overlapped_chunks
    
    async def _get_embedding(self, text: str) -> Optional[List[float]]:
        """Obtient l'embedding d'un texte via le modèle LlamaIndex"""
        try:
            # Utilise la méthode standard LlamaIndex BaseEmbedding
            embedding = await self.embed_model.aget_text_embedding(text)
            return embedding
        except Exception as e:
            logger.warning(f"⚠️ Erreur embedding pour chunk: {e}")
            return None
    
    def _calculate_semantic_similarity(self, embedding1: List[float], 
                                     embedding2: List[float]) -> float:
        """Calcule la similarité cosinus entre deux embeddings"""
        try:
            import numpy as np
            
            vec1 = np.array(embedding1)
            vec2 = np.array(embedding2)
            
            # Similarité cosinus
            dot_product = np.dot(vec1, vec2)
            magnitude1 = np.linalg.norm(vec1)
            magnitude2 = np.linalg.norm(vec2)
            
            if magnitude1 == 0 or magnitude2 == 0:
                return 0.0
                
            similarity = dot_product / (magnitude1 * magnitude2)
            return float(similarity)
            
        except Exception as e:
            logger.warning(f"⚠️ Erreur calcul similarité: {e}")
            return 0.0
    
    async def _chunk_recursive_level(self, text: str, level: int, 
                                   parent_id: Optional[str] = None) -> List[ChunkResult]:
        """Applique le chunking récursif pour un niveau donné"""
        if level >= len(self.chunk_sizes):
            return []
        
        max_size = self.chunk_sizes[level]
        
        # 1. Découpage initial par séparateurs
        raw_chunks = self._split_by_separators(text, self.separators)
        
        # 2. Application de la contrainte de taille
        sized_chunks = self._apply_size_constraint(raw_chunks, max_size)
        
        # 3. Ajout du chevauchement
        overlapped_chunks = self._add_overlap(sized_chunks)
        
        # 4. Création des objets ChunkResult
        chunk_results = []
        
        for i, chunk_text in enumerate(overlapped_chunks):
            chunk_id = self._generate_chunk_id(chunk_text, level, parent_id)
            
            # Obtient l'embedding
            embedding = await self._get_embedding(chunk_text)
            
            chunk_result = ChunkResult(
                id=chunk_id,
                text=chunk_text,
                level=level,
                parent_id=parent_id,
                embedding_vector=embedding,
                metadata={
                    "position": i,
                    "total_chunks": len(overlapped_chunks),
                    "size": len(chunk_text),
                    "max_size": max_size
                }
            )
            
            chunk_results.append(chunk_result)
        
        # 5. Chunking récursif pour le niveau suivant
        all_chunks = chunk_results.copy()
        
        for chunk_result in chunk_results:
            if len(chunk_result.text) > self.min_chunk_size * 2:  # Seulement si assez grand
                sub_chunks = await self._chunk_recursive_level(
                    chunk_result.text, 
                    level + 1, 
                    chunk_result.id
                )
                
                # Met à jour les relations parent-enfant
                chunk_result.children_ids = [sub_chunk.id for sub_chunk in sub_chunks]
                all_chunks.extend(sub_chunks)
        
        return all_chunks
    
    async def chunk_text(self, text: str, metadata: Dict[str, Any] = None) -> List[ChunkResult]:
        """
        Point d'entrée principal pour le chunking récursif
        
        Args:
            text: Texte à chunker
            metadata: Métadonnées à attacher aux chunks
            
        Returns:
            Liste des chunks avec hiérarchie et relations
        """
        if not text or len(text.strip()) < self.min_chunk_size:
            logger.warning("⚠️ Texte trop court pour chunking")
            return []
        
        logger.info(f"🚀 Début chunking récursif - {len(text)} caractères")
        
        try:
            # Chunking récursif à partir du niveau 0
            all_chunks = await self._chunk_recursive_level(text, level=0)
            
            # Enrichit les métadonnées
            for chunk in all_chunks:
                if metadata:
                    chunk.metadata.update(metadata)
                chunk.metadata["total_levels"] = len(self.chunk_sizes)
                chunk.metadata["algorithm"] = "CustomRecursiveChunker"
            
            # Calcule les similarités sémantiques entre chunks du même niveau
            await self._compute_semantic_similarities(all_chunks)
            
            logger.info(f"✅ Chunking terminé - {len(all_chunks)} chunks générés")
            return all_chunks
            
        except Exception as e:
            logger.error(f"❌ Erreur chunking récursif: {e}")
            raise
    
    async def _compute_semantic_similarities(self, chunks: List[ChunkResult]):
        """Calcule les similarités sémantiques entre chunks"""
        # Groupe les chunks par niveau
        chunks_by_level = {}
        for chunk in chunks:
            if chunk.level not in chunks_by_level:
                chunks_by_level[chunk.level] = []
            chunks_by_level[chunk.level].append(chunk)
        
        # Calcule les similarités pour chaque niveau
        for level, level_chunks in chunks_by_level.items():
            for i, chunk1 in enumerate(level_chunks):
                if chunk1.embedding_vector is None:
                    continue
                    
                max_similarity = 0.0
                for j, chunk2 in enumerate(level_chunks):
                    if i != j and chunk2.embedding_vector is not None:
                        similarity = self._calculate_semantic_similarity(
                            chunk1.embedding_vector, 
                            chunk2.embedding_vector
                        )
                        max_similarity = max(max_similarity, similarity)
                
                chunk1.semantic_similarity = max_similarity
    
    def to_obsidian_format(self, chunks: List[ChunkResult], 
                          source_title: str = "Document") -> str:
        """Convertit les chunks en format Obsidian avec liens hiérarchiques"""
        obsidian_content = []
        obsidian_content.append(f"# {source_title} - Chunking Hiérarchique\n")
        
        # Groupe par niveau pour affichage structuré
        chunks_by_level = {}
        for chunk in chunks:
            if chunk.level not in chunks_by_level:
                chunks_by_level[chunk.level] = []
            chunks_by_level[chunk.level].append(chunk)
        
        for level in sorted(chunks_by_level.keys()):
            level_chunks = chunks_by_level[level]
            obsidian_content.append(f"\n## Niveau {level} ({len(level_chunks)} chunks)\n")
            
            for chunk in level_chunks:
                # Titre du chunk avec ID
                obsidian_content.append(f"### [[{chunk.id}]] {chunk.id}")
                
                # Métadonnées
                obsidian_content.append("```yaml")
                obsidian_content.append(f"level: {chunk.level}")
                obsidian_content.append(f"parent: {chunk.parent_id or 'root'}")
                obsidian_content.append(f"children: {len(chunk.children_ids)}")
                obsidian_content.append(f"size: {len(chunk.text)}")
                if chunk.semantic_similarity:
                    obsidian_content.append(f"similarity: {chunk.semantic_similarity:.3f}")
                obsidian_content.append("```\n")
                
                # Liens de navigation
                if chunk.parent_id:
                    obsidian_content.append(f"**Parent:** [[{chunk.parent_id}]]")
                if chunk.children_ids:
                    children_links = ", ".join([f"[[{child_id}]]" for child_id in chunk.children_ids])
                    obsidian_content.append(f"**Enfants:** {children_links}")
                
                # Contenu du chunk
                obsidian_content.append(f"\n**Contenu:**\n{chunk.text}\n")
                obsidian_content.append("---\n")
        
        return "\n".join(obsidian_content)
    
    def to_json_format(self, chunks: List[ChunkResult]) -> List[Dict[str, Any]]:
        """Convertit les chunks en format JSON pour API"""
        return [
            {
                "id": chunk.id,
                "text": chunk.text,
                "level": chunk.level,
                "parent_id": chunk.parent_id,
                "children_ids": chunk.children_ids,
                "metadata": chunk.metadata,
                "has_embedding": chunk.embedding_vector is not None,
                "semantic_similarity": chunk.semantic_similarity
            }
            for chunk in chunks
        ]