# chunking-intelligent-api / custom_recursive_chunker.py
# Uploaded by KJ24 ("Upload custom_recursive_chunker.py", commit 5250f87, verified)
"""
Custom Recursive Semantic Chunker v4.0
Contourne les limitations de chonkie 1.0.10 et implémente
un chunking récursif intelligent avec hiérarchie et parentalité.
Auteur: Assistant Claude
Compatible avec: LlamaIndex v0.12, HuggingFace embeddings
"""
import re
import hashlib
import logging
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from llama_index.core.schema import BaseEmbedding
logger = logging.getLogger(__name__)
@dataclass
class ChunkResult:
    """A single chunk of text together with its hierarchy metadata.

    ``children_ids`` and ``metadata`` default to ``None`` in the signature
    and are replaced by a fresh empty list / dict after construction, so
    instances never share a container.  Passing ``None`` explicitly is
    treated the same way.
    """

    # Identifier of the chunk.
    id: str
    # Text content of the chunk.
    text: str
    # Hierarchy level the chunk was produced at.
    level: int
    # Identifier of the parent chunk, or None when there is no parent.
    parent_id: Optional[str] = None
    # Identifiers of child chunks; normalised to [] in __post_init__.
    children_ids: Optional[List[str]] = None
    # Free-form metadata; normalised to {} in __post_init__.
    metadata: Optional[Dict[str, Any]] = None
    # Embedding of ``text``, or None when not available.
    embedding_vector: Optional[List[float]] = None
    # Similarity score attached to the chunk, or None when not computed.
    semantic_similarity: Optional[float] = None

    def __post_init__(self):
        """Replace ``None`` containers with new, per-instance empty ones."""
        for attribute, make_empty in (("children_ids", list), ("metadata", dict)):
            if getattr(self, attribute) is None:
                setattr(self, attribute, make_empty())
class CustomRecursiveChunker:
    """Recursive, hierarchy-aware text chunker with parent/child links.

    For every level the chunker splits the text on the configured
    separators, caps each piece at that level's maximum size, adds word-level
    overlap between neighbours, embeds every chunk through ``embed_model``
    and then repeats the process on each chunk for the next (smaller) level.
    It is meant to provide this behaviour without depending on chonkie.

    NOTE: ``semantic_threshold`` is stored on the instance but is not read by
    any method of this class.
    """

    def __init__(self,
                 embed_model: "BaseEmbedding",
                 chunk_sizes: Optional[List[int]] = None,
                 separators: Optional[List[str]] = None,
                 overlap_ratio: float = 0.1,
                 min_chunk_size: int = 50,
                 semantic_threshold: float = 0.75):
        """Configure the chunker.

        Args:
            embed_model: Embedding model; must provide an awaitable
                ``aget_text_embedding(text)`` method.
            chunk_sizes: Maximum chunk size (in characters) per level.
                Sorted in descending order; defaults to ``[2048, 512, 128]``.
            separators: Separators applied in order when splitting; defaults
                to ``["\\n\\n", "\\n", ".", "!", "?", "—"]``.
            overlap_ratio: Fraction of a neighbour's words copied into a
                chunk as overlap.
            min_chunk_size: Minimum chunk length in characters.
            semantic_threshold: Similarity threshold (stored only).
        """
        # None sentinels instead of list literals: default lists in the
        # signature are shared between every call and every instance.
        if chunk_sizes is None:
            chunk_sizes = [2048, 512, 128]
        if separators is None:
            separators = ["\n\n", "\n", ".", "!", "?", "—"]

        self.embed_model = embed_model
        self.chunk_sizes = sorted(chunk_sizes, reverse=True)  # largest first
        self.separators = list(separators)  # private copy, safe to mutate
        self.overlap_ratio = overlap_ratio
        self.min_chunk_size = min_chunk_size
        self.semantic_threshold = semantic_threshold
        logger.info(f"✅ CustomRecursiveChunker initialisé avec {len(chunk_sizes)} niveaux")

    def _generate_chunk_id(self, text: str, level: int,
                           parent_id: Optional[str] = None,
                           position: Optional[int] = None) -> str:
        """Return a deterministic 12-hex-character ID for a chunk.

        The ID is derived from the full chunk text, its level, its parent ID
        and, when given, its position among its siblings.  Hashing the whole
        text and the position keeps siblings that share a long common prefix
        (or are identical) from receiving the same ID.

        Args:
            text: Chunk text.
            level: Hierarchy level of the chunk.
            parent_id: ID of the parent chunk; ``None`` is hashed as "root".
            position: Optional index of the chunk among its siblings.
        """
        base_string = f"{text}-{level}-{parent_id or 'root'}"
        if position is not None:
            base_string = f"{base_string}-{position}"
        # md5 is used only as a short fingerprint, not for security.
        return hashlib.md5(base_string.encode()).hexdigest()[:12]

    @staticmethod
    def _split_keeping_separator(text: str, separator: str) -> List[str]:
        """Split *text* on *separator*, keeping visible separator characters.

        Whitespace-only separators are dropped; any other separator (such as
        "." or "?") is re-attached to the end of the piece that preceded it.
        Pieces are stripped and empty pieces are omitted.
        """
        parts = text.split(separator)
        visible = separator.strip()  # "" for separators such as "\n\n"
        last_index = len(parts) - 1
        pieces: List[str] = []
        for index, part in enumerate(parts):
            piece = part.strip()
            # Every part except the last one was followed by the separator.
            if visible and index < last_index:
                if piece:
                    piece += visible
                elif pieces:
                    # Consecutive separators ("..."): extend the previous piece.
                    pieces[-1] += visible
                    continue
                else:
                    piece = visible
            if piece:
                pieces.append(piece)
        return pieces

    def _merge_short_pieces(self, pieces: List[str]) -> List[str]:
        """Fold pieces shorter than ``min_chunk_size`` into their neighbours.

        Pieces are accumulated (joined by a single space) until the buffer
        reaches ``min_chunk_size``.  A trailing remainder is appended to the
        last emitted chunk.  If no chunk ever reaches the minimum size, an
        empty list is returned.
        """
        merged: List[str] = []
        buffer = ""
        for piece in pieces:
            piece = piece.strip()
            if not piece:
                continue
            buffer = f"{buffer} {piece}" if buffer else piece
            if len(buffer) >= self.min_chunk_size:
                merged.append(buffer)
                buffer = ""
        if buffer and merged:
            merged[-1] = f"{merged[-1]} {buffer}"
        return merged

    def _split_by_separators(self, text: str, separators: List[str]) -> List[str]:
        """Split *text* by applying each separator in turn.

        A piece is split further only while it is longer than
        ``min_chunk_size``.  Fragments that end up shorter than
        ``min_chunk_size`` are merged with neighbouring fragments instead of
        being discarded, and punctuation separators are kept in the output,
        so no text is lost.

        Args:
            text: Text to split.
            separators: Separators, applied in the given order.  Empty
                strings are ignored.

        Returns:
            Chunks of at least ``min_chunk_size`` characters; an empty list
            when the text is too short to form one.
        """
        pieces = [text]
        for separator in separators:
            if not separator:
                continue  # str.split("") raises ValueError
            next_pieces: List[str] = []
            for piece in pieces:
                if len(piece) > self.min_chunk_size:
                    next_pieces.extend(self._split_keeping_separator(piece, separator))
                else:
                    next_pieces.append(piece)
            pieces = next_pieces
        return self._merge_short_pieces(pieces)

    def _apply_size_constraint(self, chunks: List[str], max_size: int) -> List[str]:
        """Split chunks longer than *max_size* on word boundaries.

        Chunks that already fit are passed through unchanged.  Longer chunks
        are re-packed word by word (whitespace is collapsed to single
        spaces).  A single word longer than *max_size* is cut into
        *max_size*-character fragments so that the limit always holds.

        Args:
            chunks: Chunks to constrain.
            max_size: Maximum chunk length in characters.
        """
        result_chunks: List[str] = []
        for chunk in chunks:
            if len(chunk) <= max_size:
                result_chunks.append(chunk)
                continue

            current_words: List[str] = []
            current_size = 0
            for word in chunk.split():
                if max_size > 0 and len(word) > max_size:
                    fragments = [word[start:start + max_size]
                                 for start in range(0, len(word), max_size)]
                else:
                    fragments = [word]
                for fragment in fragments:
                    fragment_size = len(fragment) + 1  # +1 for the joining space
                    if current_words and current_size + fragment_size > max_size:
                        result_chunks.append(" ".join(current_words))
                        current_words = []
                        current_size = 0
                    current_words.append(fragment)
                    current_size += fragment_size
            if current_words:
                result_chunks.append(" ".join(current_words))
        return result_chunks

    def _add_overlap(self, chunks: List[str]) -> List[str]:
        """Add context from the previous and next chunk to every chunk.

        For each neighbour, ``int(word_count * overlap_ratio)`` words are
        copied: the last words of the previous chunk are prepended and the
        first words of the next chunk are appended.  Overlap is always taken
        from the original (un-overlapped) neighbours.  The resulting text is
        longer than the input chunk.
        """
        if len(chunks) <= 1:
            return chunks

        last_index = len(chunks) - 1
        overlapped_chunks: List[str] = []
        for index, chunk in enumerate(chunks):
            extended = chunk
            if index > 0:
                previous_words = chunks[index - 1].split()
                count = int(len(previous_words) * self.overlap_ratio)
                if count > 0:
                    extended = f"{' '.join(previous_words[-count:])} {extended}"
            if index < last_index:
                following_words = chunks[index + 1].split()
                count = int(len(following_words) * self.overlap_ratio)
                if count > 0:
                    extended = f"{extended} {' '.join(following_words[:count])}"
            overlapped_chunks.append(extended)
        return overlapped_chunks

    async def _get_embedding(self, text: str) -> Optional[List[float]]:
        """Return the embedding of *text*, or ``None`` if embedding fails.

        Embedding is best-effort: any exception raised by
        ``embed_model.aget_text_embedding`` is logged as a warning and
        converted into ``None`` so that chunking can continue.
        """
        try:
            return await self.embed_model.aget_text_embedding(text)
        except Exception as e:
            logger.warning(f"⚠️ Erreur embedding pour chunk: {e}")
            return None

    def _calculate_semantic_similarity(self, embedding1: List[float],
                                       embedding2: List[float]) -> float:
        """Return the cosine similarity of two embeddings.

        Returns ``0.0`` when either vector has zero norm or when the
        computation fails (the failure is logged as a warning).
        """
        try:
            import numpy as np

            vec1 = np.array(embedding1)
            vec2 = np.array(embedding2)
            magnitude1 = np.linalg.norm(vec1)
            magnitude2 = np.linalg.norm(vec2)
            if magnitude1 == 0 or magnitude2 == 0:
                return 0.0
            return float(np.dot(vec1, vec2) / (magnitude1 * magnitude2))
        except Exception as e:
            logger.warning(f"⚠️ Erreur calcul similarité: {e}")
            return 0.0

    async def _chunk_recursive_level(self, text: str, level: int,
                                     parent_id: Optional[str] = None) -> List["ChunkResult"]:
        """Chunk *text* at *level* and recurse into each produced chunk.

        Args:
            text: Text to chunk.
            level: Index into ``chunk_sizes``; recursion stops once it is out
                of range.
            parent_id: ID of the chunk *text* comes from, if any.

        Returns:
            The chunks of this level followed by all of their descendants.
            ``children_ids`` of every chunk lists its direct children only.
        """
        if level >= len(self.chunk_sizes):
            return []
        max_size = self.chunk_sizes[level]

        # Split, cap the size, then add overlap.
        raw_chunks = self._split_by_separators(text, self.separators)
        sized_chunks = self._apply_size_constraint(raw_chunks, max_size)
        overlapped_chunks = self._add_overlap(sized_chunks)

        total_chunks = len(overlapped_chunks)
        chunk_results = []
        for position, chunk_text in enumerate(overlapped_chunks):
            embedding = await self._get_embedding(chunk_text)
            chunk_results.append(ChunkResult(
                id=self._generate_chunk_id(chunk_text, level, parent_id, position),
                text=chunk_text,
                level=level,
                parent_id=parent_id,
                embedding_vector=embedding,
                metadata={
                    "position": position,
                    "total_chunks": total_chunks,
                    "size": len(chunk_text),
                    "max_size": max_size,
                },
            ))

        all_chunks = list(chunk_results)
        for chunk_result in chunk_results:
            # Only chunks that are comfortably above the minimum are split further.
            if len(chunk_result.text) > self.min_chunk_size * 2:
                descendants = await self._chunk_recursive_level(
                    chunk_result.text, level + 1, chunk_result.id
                )
                # The recursive call returns the whole subtree; only the
                # chunks of the next level are direct children.
                chunk_result.children_ids = [
                    descendant.id for descendant in descendants
                    if descendant.level == level + 1
                ]
                all_chunks.extend(descendants)
        return all_chunks

    async def chunk_text(self, text: str,
                         metadata: Optional[Dict[str, Any]] = None) -> List["ChunkResult"]:
        """Chunk *text* recursively; main entry point.

        Args:
            text: Text to chunk.
            metadata: Optional metadata merged into every chunk's metadata.

        Returns:
            All chunks of all levels with their hierarchy links; an empty
            list when the stripped text is shorter than ``min_chunk_size``.

        Raises:
            Exception: Any error raised while chunking is logged and re-raised.
        """
        if not text or len(text.strip()) < self.min_chunk_size:
            logger.warning("⚠️ Texte trop court pour chunking")
            return []

        logger.info(f"🚀 Début chunking récursif - {len(text)} caractères")
        try:
            all_chunks = await self._chunk_recursive_level(text, level=0)

            for chunk in all_chunks:
                if metadata:
                    chunk.metadata.update(metadata)
                chunk.metadata["total_levels"] = len(self.chunk_sizes)
                chunk.metadata["algorithm"] = "CustomRecursiveChunker"

            await self._compute_semantic_similarities(all_chunks)
            logger.info(f"✅ Chunking terminé - {len(all_chunks)} chunks générés")
            return all_chunks
        except Exception as e:
            logger.error(f"❌ Erreur chunking récursif: {e}")
            raise

    async def _compute_semantic_similarities(self, chunks: List["ChunkResult"]):
        """Set ``semantic_similarity`` on every chunk that has an embedding.

        The value is the highest cosine similarity between the chunk and any
        other embedded chunk of the same level, with a floor of ``0.0``.
        Chunks without an embedding are left untouched.
        """
        chunks_by_level: Dict[int, list] = {}
        for chunk in chunks:
            chunks_by_level.setdefault(chunk.level, []).append(chunk)

        for level_chunks in chunks_by_level.values():
            embedded = [chunk for chunk in level_chunks
                        if chunk.embedding_vector is not None]
            best = [0.0] * len(embedded)
            # Cosine similarity is symmetric, so each pair is evaluated once.
            for i, first in enumerate(embedded):
                for j in range(i + 1, len(embedded)):
                    similarity = self._calculate_semantic_similarity(
                        first.embedding_vector,
                        embedded[j].embedding_vector,
                    )
                    if similarity > best[i]:
                        best[i] = similarity
                    if similarity > best[j]:
                        best[j] = similarity
            for chunk, score in zip(embedded, best):
                chunk.semantic_similarity = score

    def to_obsidian_format(self, chunks: List["ChunkResult"],
                           source_title: str = "Document") -> str:
        """Render chunks as an Obsidian note with hierarchical wiki links.

        Chunks are grouped by level (ascending).  Each chunk gets a heading,
        a YAML block with its metadata, links to its parent and children,
        and its text.

        Args:
            chunks: Chunks to render.
            source_title: Title used in the top-level heading.
        """
        chunks_by_level: Dict[int, list] = {}
        for chunk in chunks:
            chunks_by_level.setdefault(chunk.level, []).append(chunk)

        obsidian_content = [f"# {source_title} - Chunking Hiérarchique\n"]
        for level in sorted(chunks_by_level):
            level_chunks = chunks_by_level[level]
            obsidian_content.append(f"\n## Niveau {level} ({len(level_chunks)} chunks)\n")
            for chunk in level_chunks:
                obsidian_content.append(f"### [[{chunk.id}]] {chunk.id}")

                obsidian_content.append("```yaml")
                obsidian_content.append(f"level: {chunk.level}")
                obsidian_content.append(f"parent: {chunk.parent_id or 'root'}")
                obsidian_content.append(f"children: {len(chunk.children_ids)}")
                obsidian_content.append(f"size: {len(chunk.text)}")
                # "is not None" so that a computed similarity of 0.0 is shown.
                if chunk.semantic_similarity is not None:
                    obsidian_content.append(f"similarity: {chunk.semantic_similarity:.3f}")
                obsidian_content.append("```\n")

                if chunk.parent_id:
                    obsidian_content.append(f"**Parent:** [[{chunk.parent_id}]]")
                if chunk.children_ids:
                    children_links = ", ".join(
                        f"[[{child_id}]]" for child_id in chunk.children_ids
                    )
                    obsidian_content.append(f"**Enfants:** {children_links}")

                obsidian_content.append(f"\n**Contenu:**\n{chunk.text}\n")
                obsidian_content.append("---\n")
        return "\n".join(obsidian_content)

    def to_json_format(self, chunks: List["ChunkResult"]) -> List[Dict[str, Any]]:
        """Convert chunks into JSON-serialisable dictionaries for an API.

        The embedding itself is not included; ``has_embedding`` reports
        whether one is present.
        """
        return [
            {
                "id": chunk.id,
                "text": chunk.text,
                "level": chunk.level,
                "parent_id": chunk.parent_id,
                "children_ids": chunk.children_ids,
                "metadata": chunk.metadata,
                "has_embedding": chunk.embedding_vector is not None,
                "semantic_similarity": chunk.semantic_similarity,
            }
            for chunk in chunks
        ]