KJ24 commited on
Commit
5250f87
·
verified ·
1 Parent(s): e9e9fcf

Upload custom_recursive_chunker.py

Browse files
Files changed (1) hide show
  1. custom_recursive_chunker.py +366 -0
custom_recursive_chunker.py ADDED
@@ -0,0 +1,366 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Custom Recursive Semantic Chunker v4.0
3
+ Contourne les limitations de chonkie 1.0.10 et implemente
4
+ un chunking récursif intelligent avec hiérarchie et parentalité.
5
+
6
+ Auteur: Assistant Claude
7
+ Compatible avec: LlamaIndex v0.12, HuggingFace embeddings
8
+ """
9
+
10
+ import re
11
+ import hashlib
12
+ import logging
13
+ from typing import List, Dict, Any, Optional, Tuple
14
+ from dataclasses import dataclass
15
+ from llama_index.core.schema import BaseEmbedding
16
+
17
+ logger = logging.getLogger(__name__)
18
+
19
+ @dataclass
20
+ class ChunkResult:
21
+ """Résultat d'un chunk avec métadonnées hiérarchiques"""
22
+ id: str
23
+ text: str
24
+ level: int
25
+ parent_id: Optional[str] = None
26
+ children_ids: List[str] = None
27
+ metadata: Dict[str, Any] = None
28
+ embedding_vector: Optional[List[float]] = None
29
+ semantic_similarity: Optional[float] = None
30
+
31
+ def __post_init__(self):
32
+ if self.children_ids is None:
33
+ self.children_ids = []
34
+ if self.metadata is None:
35
+ self.metadata = {}
36
+
37
+ class CustomRecursiveChunker:
38
+ """
39
+ Chunker récursif intelligent qui simule le comportement
40
+ souhaité sans dépendre des versions instables de chonkie
41
+ """
42
+
43
+ def __init__(self,
44
+ embed_model: BaseEmbedding,
45
+ chunk_sizes: List[int] = [2048, 512, 128],
46
+ separators: List[str] = ["\n\n", "\n", ".", "!", "?", "—"],
47
+ overlap_ratio: float = 0.1,
48
+ min_chunk_size: int = 50,
49
+ semantic_threshold: float = 0.75):
50
+ """
51
+ Initialise le chunker personnalisé
52
+
53
+ Args:
54
+ embed_model: Modèle d'embedding LlamaIndex BaseEmbedding
55
+ chunk_sizes: Tailles hiérarchiques des chunks [grand, moyen, petit]
56
+ separators: Séparateurs pour découpage hiérarchique
57
+ overlap_ratio: Ratio de chevauchement entre chunks
58
+ min_chunk_size: Taille minimale d'un chunk
59
+ semantic_threshold: Seuil de similarité sémantique
60
+ """
61
+ self.embed_model = embed_model
62
+ self.chunk_sizes = sorted(chunk_sizes, reverse=True) # [2048, 512, 128]
63
+ self.separators = separators
64
+ self.overlap_ratio = overlap_ratio
65
+ self.min_chunk_size = min_chunk_size
66
+ self.semantic_threshold = semantic_threshold
67
+
68
+ logger.info(f"✅ CustomRecursiveChunker initialisé avec {len(chunk_sizes)} niveaux")
69
+
70
+ def _generate_chunk_id(self, text: str, level: int, parent_id: str = None) -> str:
71
+ """Génère un ID unique pour un chunk"""
72
+ base_string = f"{text[:50]}-{level}-{parent_id or 'root'}"
73
+ return hashlib.md5(base_string.encode()).hexdigest()[:12]
74
+
75
+ def _split_by_separators(self, text: str, separators: List[str]) -> List[str]:
76
+ """Découpe le texte selon une hiérarchie de séparateurs"""
77
+ chunks = [text]
78
+
79
+ for separator in separators:
80
+ new_chunks = []
81
+ for chunk in chunks:
82
+ if len(chunk) > self.min_chunk_size:
83
+ split_parts = chunk.split(separator)
84
+ # Nettoie et filtre les parties vides
85
+ split_parts = [part.strip() for part in split_parts if part.strip()]
86
+ new_chunks.extend(split_parts)
87
+ else:
88
+ new_chunks.append(chunk)
89
+ chunks = new_chunks
90
+
91
+ return [chunk for chunk in chunks if len(chunk.strip()) >= self.min_chunk_size]
92
+
93
+ def _apply_size_constraint(self, chunks: List[str], max_size: int) -> List[str]:
94
+ """Applique une contrainte de taille maximale aux chunks"""
95
+ result_chunks = []
96
+
97
+ for chunk in chunks:
98
+ if len(chunk) <= max_size:
99
+ result_chunks.append(chunk)
100
+ else:
101
+ # Découpe les chunks trop longs
102
+ words = chunk.split()
103
+ current_chunk = []
104
+ current_size = 0
105
+
106
+ for word in words:
107
+ word_size = len(word) + 1 # +1 pour l'espace
108
+ if current_size + word_size > max_size and current_chunk:
109
+ result_chunks.append(" ".join(current_chunk))
110
+ current_chunk = [word]
111
+ current_size = word_size
112
+ else:
113
+ current_chunk.append(word)
114
+ current_size += word_size
115
+
116
+ if current_chunk:
117
+ result_chunks.append(" ".join(current_chunk))
118
+
119
+ return result_chunks
120
+
121
+ def _add_overlap(self, chunks: List[str]) -> List[str]:
122
+ """Ajoute du chevauchement entre chunks adjacents"""
123
+ if len(chunks) <= 1:
124
+ return chunks
125
+
126
+ overlapped_chunks = []
127
+
128
+ for i, chunk in enumerate(chunks):
129
+ current_chunk = chunk
130
+
131
+ # Ajoute le contexte du chunk pr��cédent
132
+ if i > 0:
133
+ prev_words = chunks[i-1].split()
134
+ overlap_size = int(len(prev_words) * self.overlap_ratio)
135
+ if overlap_size > 0:
136
+ prefix = " ".join(prev_words[-overlap_size:])
137
+ current_chunk = f"{prefix} {current_chunk}"
138
+
139
+ # Ajoute le contexte du chunk suivant
140
+ if i < len(chunks) - 1:
141
+ next_words = chunks[i+1].split()
142
+ overlap_size = int(len(next_words) * self.overlap_ratio)
143
+ if overlap_size > 0:
144
+ suffix = " ".join(next_words[:overlap_size])
145
+ current_chunk = f"{current_chunk} {suffix}"
146
+
147
+ overlapped_chunks.append(current_chunk)
148
+
149
+ return overlapped_chunks
150
+
151
+ async def _get_embedding(self, text: str) -> Optional[List[float]]:
152
+ """Obtient l'embedding d'un texte via le modèle LlamaIndex"""
153
+ try:
154
+ # Utilise la méthode standard LlamaIndex BaseEmbedding
155
+ embedding = await self.embed_model.aget_text_embedding(text)
156
+ return embedding
157
+ except Exception as e:
158
+ logger.warning(f"⚠️ Erreur embedding pour chunk: {e}")
159
+ return None
160
+
161
+ def _calculate_semantic_similarity(self, embedding1: List[float],
162
+ embedding2: List[float]) -> float:
163
+ """Calcule la similarité cosinus entre deux embeddings"""
164
+ try:
165
+ import numpy as np
166
+
167
+ vec1 = np.array(embedding1)
168
+ vec2 = np.array(embedding2)
169
+
170
+ # Similarité cosinus
171
+ dot_product = np.dot(vec1, vec2)
172
+ magnitude1 = np.linalg.norm(vec1)
173
+ magnitude2 = np.linalg.norm(vec2)
174
+
175
+ if magnitude1 == 0 or magnitude2 == 0:
176
+ return 0.0
177
+
178
+ similarity = dot_product / (magnitude1 * magnitude2)
179
+ return float(similarity)
180
+
181
+ except Exception as e:
182
+ logger.warning(f"⚠️ Erreur calcul similarité: {e}")
183
+ return 0.0
184
+
185
+ async def _chunk_recursive_level(self, text: str, level: int,
186
+ parent_id: Optional[str] = None) -> List[ChunkResult]:
187
+ """Applique le chunking récursif pour un niveau donné"""
188
+ if level >= len(self.chunk_sizes):
189
+ return []
190
+
191
+ max_size = self.chunk_sizes[level]
192
+
193
+ # 1. Découpage initial par séparateurs
194
+ raw_chunks = self._split_by_separators(text, self.separators)
195
+
196
+ # 2. Application de la contrainte de taille
197
+ sized_chunks = self._apply_size_constraint(raw_chunks, max_size)
198
+
199
+ # 3. Ajout du chevauchement
200
+ overlapped_chunks = self._add_overlap(sized_chunks)
201
+
202
+ # 4. Création des objets ChunkResult
203
+ chunk_results = []
204
+
205
+ for i, chunk_text in enumerate(overlapped_chunks):
206
+ chunk_id = self._generate_chunk_id(chunk_text, level, parent_id)
207
+
208
+ # Obtient l'embedding
209
+ embedding = await self._get_embedding(chunk_text)
210
+
211
+ chunk_result = ChunkResult(
212
+ id=chunk_id,
213
+ text=chunk_text,
214
+ level=level,
215
+ parent_id=parent_id,
216
+ embedding_vector=embedding,
217
+ metadata={
218
+ "position": i,
219
+ "total_chunks": len(overlapped_chunks),
220
+ "size": len(chunk_text),
221
+ "max_size": max_size
222
+ }
223
+ )
224
+
225
+ chunk_results.append(chunk_result)
226
+
227
+ # 5. Chunking récursif pour le niveau suivant
228
+ all_chunks = chunk_results.copy()
229
+
230
+ for chunk_result in chunk_results:
231
+ if len(chunk_result.text) > self.min_chunk_size * 2: # Seulement si assez grand
232
+ sub_chunks = await self._chunk_recursive_level(
233
+ chunk_result.text,
234
+ level + 1,
235
+ chunk_result.id
236
+ )
237
+
238
+ # Met à jour les relations parent-enfant
239
+ chunk_result.children_ids = [sub_chunk.id for sub_chunk in sub_chunks]
240
+ all_chunks.extend(sub_chunks)
241
+
242
+ return all_chunks
243
+
244
+ async def chunk_text(self, text: str, metadata: Dict[str, Any] = None) -> List[ChunkResult]:
245
+ """
246
+ Point d'entrée principal pour le chunking récursif
247
+
248
+ Args:
249
+ text: Texte à chunker
250
+ metadata: Métadonnées à attacher aux chunks
251
+
252
+ Returns:
253
+ Liste des chunks avec hiérarchie et relations
254
+ """
255
+ if not text or len(text.strip()) < self.min_chunk_size:
256
+ logger.warning("⚠️ Texte trop court pour chunking")
257
+ return []
258
+
259
+ logger.info(f"�� Début chunking récursif - {len(text)} caractères")
260
+
261
+ try:
262
+ # Chunking récursif à partir du niveau 0
263
+ all_chunks = await self._chunk_recursive_level(text, level=0)
264
+
265
+ # Enrichit les métadonnées
266
+ for chunk in all_chunks:
267
+ if metadata:
268
+ chunk.metadata.update(metadata)
269
+ chunk.metadata["total_levels"] = len(self.chunk_sizes)
270
+ chunk.metadata["algorithm"] = "CustomRecursiveChunker"
271
+
272
+ # Calcule les similarités sémantiques entre chunks du même niveau
273
+ await self._compute_semantic_similarities(all_chunks)
274
+
275
+ logger.info(f"✅ Chunking terminé - {len(all_chunks)} chunks générés")
276
+ return all_chunks
277
+
278
+ except Exception as e:
279
+ logger.error(f"❌ Erreur chunking récursif: {e}")
280
+ raise
281
+
282
+ async def _compute_semantic_similarities(self, chunks: List[ChunkResult]):
283
+ """Calcule les similarités sémantiques entre chunks"""
284
+ # Groupe les chunks par niveau
285
+ chunks_by_level = {}
286
+ for chunk in chunks:
287
+ if chunk.level not in chunks_by_level:
288
+ chunks_by_level[chunk.level] = []
289
+ chunks_by_level[chunk.level].append(chunk)
290
+
291
+ # Calcule les similarités pour chaque niveau
292
+ for level, level_chunks in chunks_by_level.items():
293
+ for i, chunk1 in enumerate(level_chunks):
294
+ if chunk1.embedding_vector is None:
295
+ continue
296
+
297
+ max_similarity = 0.0
298
+ for j, chunk2 in enumerate(level_chunks):
299
+ if i != j and chunk2.embedding_vector is not None:
300
+ similarity = self._calculate_semantic_similarity(
301
+ chunk1.embedding_vector,
302
+ chunk2.embedding_vector
303
+ )
304
+ max_similarity = max(max_similarity, similarity)
305
+
306
+ chunk1.semantic_similarity = max_similarity
307
+
308
+ def to_obsidian_format(self, chunks: List[ChunkResult],
309
+ source_title: str = "Document") -> str:
310
+ """Convertit les chunks en format Obsidian avec liens hiérarchiques"""
311
+ obsidian_content = []
312
+ obsidian_content.append(f"# {source_title} - Chunking Hiérarchique\n")
313
+
314
+ # Groupe par niveau pour affichage structuré
315
+ chunks_by_level = {}
316
+ for chunk in chunks:
317
+ if chunk.level not in chunks_by_level:
318
+ chunks_by_level[chunk.level] = []
319
+ chunks_by_level[chunk.level].append(chunk)
320
+
321
+ for level in sorted(chunks_by_level.keys()):
322
+ level_chunks = chunks_by_level[level]
323
+ obsidian_content.append(f"\n## Niveau {level} ({len(level_chunks)} chunks)\n")
324
+
325
+ for chunk in level_chunks:
326
+ # Titre du chunk avec ID
327
+ obsidian_content.append(f"### [[{chunk.id}]] {chunk.id}")
328
+
329
+ # Métadonnées
330
+ obsidian_content.append("```yaml")
331
+ obsidian_content.append(f"level: {chunk.level}")
332
+ obsidian_content.append(f"parent: {chunk.parent_id or 'root'}")
333
+ obsidian_content.append(f"children: {len(chunk.children_ids)}")
334
+ obsidian_content.append(f"size: {len(chunk.text)}")
335
+ if chunk.semantic_similarity:
336
+ obsidian_content.append(f"similarity: {chunk.semantic_similarity:.3f}")
337
+ obsidian_content.append("```\n")
338
+
339
+ # Liens de navigation
340
+ if chunk.parent_id:
341
+ obsidian_content.append(f"**Parent:** [[{chunk.parent_id}]]")
342
+ if chunk.children_ids:
343
+ children_links = ", ".join([f"[[{child_id}]]" for child_id in chunk.children_ids])
344
+ obsidian_content.append(f"**Enfants:** {children_links}")
345
+
346
+ # Contenu du chunk
347
+ obsidian_content.append(f"\n**Contenu:**\n{chunk.text}\n")
348
+ obsidian_content.append("---\n")
349
+
350
+ return "\n".join(obsidian_content)
351
+
352
+ def to_json_format(self, chunks: List[ChunkResult]) -> List[Dict[str, Any]]:
353
+ """Convertit les chunks en format JSON pour API"""
354
+ return [
355
+ {
356
+ "id": chunk.id,
357
+ "text": chunk.text,
358
+ "level": chunk.level,
359
+ "parent_id": chunk.parent_id,
360
+ "children_ids": chunk.children_ids,
361
+ "metadata": chunk.metadata,
362
+ "has_embedding": chunk.embedding_vector is not None,
363
+ "semantic_similarity": chunk.semantic_similarity
364
+ }
365
+ for chunk in chunks
366
+ ]