"""
Chunker Pipeline v4.0 - PARTIE 1 - CORRIGÉ ET COMPLET
Chunking Sémantique Intelligent Récursif avec Parentalité
CORRECTIONS MAJEURES v4.0:
✅ Chonkie au lieu de SemanticSplitterNodeParser (LlamaIndex buggy)
✅ Modèle LLM local gratuit au lieu de GPT-4o-mini
✅ Format Obsidian correct avec [[Titre]], id
✅ Variables d'environnement sécuriséesf
✅ Optimisations HF Space gratuit (2GB RAM)
✅ Relations parent/enfant bidirectionnelles complètes
"""
import numpy as np
import tempfile
import os
import re
import time
import hashlib
import logging
import yaml
import asyncio
import gc
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.llama_cpp import LlamaCPP
# LlamaIndex v0.12 - corrected imports (without the buggy SemanticSplitterNodeParser)
from llama_index.core import Document, Settings
from llama_index.core.node_parser import (
    SentenceSplitter,    # ✅ stable and reliable
    TokenTextSplitter,   # ✅ for hierarchy
    MarkdownNodeParser   # ✅ for structure detection
)
# ✅ FIX: free local LLM (LlamaCPP, imported above) instead of the paid GPT-4o-mini
# ✅ Chonkie for semantic chunking (replaces SemanticSplitterNodeParser)
try:
    from chonkie import SemanticChunker, RecursiveChunker
    CHONKIE_AVAILABLE = True
except ImportError:
    CHONKIE_AVAILABLE = False
    logging.warning("⚠️ Chonkie not available - falling back to LlamaIndex only")
# Local imports
from schemas import (
ChunkRequest, ChunkResponse, SemanticChunk,
ChunkMetadata, ChunkLevel, ContentType
)
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ✅ Wrapper that makes a LlamaIndex embedding model compatible with Chonkie
class EmbeddingWrapper:
    def __init__(self, embedding):
        self.embedding = embedding
    def encode(self, texts):
        # Return the list of vectors via the LlamaIndex interface
        return self.embedding._get_text_embeddings(texts)
    def get_text_embeddings(self, texts):
        # Alias for encode()
        return self.encode(texts)
    def get_text_embedding(self, text):
        # Single-text case
        return self.encode([text])[0]
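# A minimal usage sketch (assumption: illustrative only, the wrapper is not
# exercised elsewhere in this module). Chonkie's SemanticChunker expects an
# object exposing encode(texts), so a LlamaIndex BaseEmbedding is adapted as:
#
#     embed = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")
#     wrapped = EmbeddingWrapper(embed)
#     vectors = wrapped.encode(["first sentence", "second sentence"])
#     assert len(vectors) == 2 and len(vectors[0]) > 0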
@dataclass
class ChunkNode:
"""Représentation interne d'un chunk avec relations hiérarchiques complètes"""
id: str
content: str
level: int
title: Optional[str] = None
parent_id: Optional[str] = None
parent_title: Optional[str] = None # ✅ NOUVEAU: pour format Obsidian
children_ids: List[str] = None
prev_id: Optional[str] = None
next_id: Optional[str] = None
metadata: Dict[str, Any] = None
confidence: float = 1.0
def __post_init__(self):
if self.children_ids is None:
self.children_ids = []
if self.metadata is None:
self.metadata = {}
class SmartChunkerPipeline:
"""
Pipeline principal pour le chunking sémantique intelligent récursif v4.0
NOUVEAUTÉS v4.0:
✅ Chonkie SemanticChunker (fiable) au lieu de LlamaIndex SemanticSplitterNodeParser (buggy)
✅ HuggingFace LLM local gratuit au lieu de OpenAI GPT-4o-mini (payant)
✅ Format Obsidian correct: [[Titre Parent]], parent_id
✅ Variables d'environnement sécurisées pour HF Space
✅ Optimisations mémoire pour Space gratuit (2GB)
✅ Relations bidirectionnelles complètes
"""
def __init__(self, config_path: str = "config.yaml"):
"""Initialisation avec configuration YAML et sécurité renforcée"""
self.config_path = config_path
self.config = self._load_config()
# Modèles IA - NOUVEAUX TYPES v4.0
self.llm = None # ✅ HuggingFace LLM local au lieu d'OpenAI
self.embed_model = None # ✅ HuggingFace embedding local
self.chonkie_semantic = None # ✅ Chonkie SemanticChunker
self.chonkie_recursive = None # ✅ Chonkie RecursiveChunker
self.sentence_splitter = None # ✅ LlamaIndex SentenceSplitter (stable)
self.markdown_parser = None # ✅ MarkdownNodeParser
# Cache et optimisations pour HF Space gratuit
self._embedding_cache = {}
self._concept_cache = {}
self._text_cache = {}
self._chunk_registry = {} # ✅ NOUVEAU: registry pour relations bidirectionnelles
self._is_initialized = False
# Threading optimisé pour Space gratuit (1 worker max)
self.executor = ThreadPoolExecutor(max_workers=1)
# Variables d'environnement sécurisées
self._setup_environment()
logger.info(f"🚀 SmartChunkerPipeline v4.0 initialisé avec config: {config_path}")
def _setup_environment(self):
"""✅ Configuration sécurisée pour Hugging Face Space (mode gratuit, écriture uniquement dans /tmp)"""
import tempfile
import os
tmp_dir = tempfile.gettempdir()
os.environ["HF_HOME"] = os.path.join(tmp_dir, "huggingface")
os.environ["TRANSFORMERS_CACHE"] = os.path.join(tmp_dir, "transformers")
os.environ["HF_HUB_CACHE"] = os.path.join(tmp_dir, "hub")
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
cache_dirs = [
os.environ["HF_HOME"],
os.environ["TRANSFORMERS_CACHE"],
os.environ["HF_HUB_CACHE"],
os.path.join(tmp_dir, "llm"),
os.path.join(tmp_dir, "embeddings"),
os.path.join(tmp_dir, "logs")
]
print("HF_HOME:", os.environ.get("HF_HOME"))
print("TRANSFORMERS_CACHE:", os.environ.get("TRANSFORMERS_CACHE"))
for cache_dir in cache_dirs:
try:
os.makedirs(cache_dir, exist_ok=True)
except Exception as e:
logger.warning(f"⚠️ Impossible de créer {cache_dir}: {e}")
def _load_config(self) -> Dict[str, Any]:
"""Chargement configuration YAML avec fallback sécurisé"""
try:
if os.path.exists(self.config_path):
with open(self.config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
logger.info(f"✅ Configuration chargée depuis {self.config_path}")
return config
else:
logger.warning(f"⚠️ Config {self.config_path} non trouvée, utilisation config par défaut")
return self._get_default_config()
except Exception as e:
logger.error(f"❌ Erreur chargement config: {e}")
return self._get_default_config()
def _get_default_config(self) -> Dict[str, Any]:
"""Configuration par défaut optimisée v4.0"""
return {
"models": {
"llm": {
# ✅ CORRECTION: Modèle HuggingFace local gratuit
"provider": "huggingface",
"model_name": "llama-2-7b-chat", # Gratuit, local, rapide
"temperature": 0.1,
"max_tokens": 512,
"device": "cpu", # HF Space gratuit = CPU only
"cache_dir": os.path.join(tempfile.gettempdir(), "llm")
},
"embedding": {
"provider": "huggingface",
"model_name": "all-mpnet-base-v2", # Léger et performant
"cache_dir": os.path.join(tempfile.gettempdir(), "embeddings"),
"max_length": 512,
"normalize": True,
"device": "cpu"
}
},
"chunking": {
"chonkie": {
# ✅ Configuration Chonkie SemanticChunker
"semantic": {
"enabled": True,
"threshold": 0.75, # Seuil similarité sémantique
"chunk_size": 512,
"min_sentences": 1,
"max_sentences": 8
},
"recursive": {
"enabled": True,
"chunk_sizes": [2048, 512, 128], # Hiérarchie 3 niveaux
"overlap": 20,
"separators": ["\n\n", "\n", ".", "!", "?"]
}
},
"structure_detection": {
"markdown": {"enabled": True},
"chapters": {
"enabled": True,
"patterns": [
r'CHAPITRE\s+\d+',
r'SECTION\s+\d+',
r'PARTIE\s+\d+',
r'Chapter\s+\d+',
r'^#{1,3}\s+.+$'
]
}
}
},
"obsidian": {
# ✅ CORRECTION: Format correct avec double crochets
"parent_format": "[[{title}]], {id}", # [[Titre Parent]], parent_id
"use_bidirectional_links": True,
"frontmatter_enabled": True,
"backmatter_enabled": True
},
"performance": {
"memory": {
"max_memory_mb": 1800, # Limite HF Space gratuit
"enable_garbage_collection": True,
"cleanup_interval": 50
},
"concurrency": {
"max_workers": 1, # HF Space gratuit limitation
"timeout_seconds": 30
},
"caching": {
"enabled": True,
"max_cache_size_mb": 100
}
}
}
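    # A minimal config.yaml sketch mirroring the defaults above (assumption:
    # illustrative only; missing keys are tolerated because the accessors below
    # all use .get() with inline defaults):
    #
    #   models:
    #     embedding:
    #       model_name: sentence-transformers/all-MiniLM-L6-v2
    #   chunking:
    #     chonkie:
    #       semantic: {enabled: true, threshold: 0.75, chunk_size: 512}
    #       recursive: {enabled: true, chunk_sizes: [2048, 512, 128]}
    #   obsidian:
    #     parent_format: "[[{title}]], {id}"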
async def initialize(self):
"""Initialisation complète optimisée pour HF Space gratuit v4.0"""
if self._is_initialized:
return
try:
logger.info("🚀 Initialisation chunker intelligent v4.0...")
# 1. Configuration modèles depuis config
llm_config = self.config.get("models", {}).get("llm", {})
embed_config = self.config.get("models", {}).get("embedding", {})
# 2. ✅ CORRECTION: HuggingFace LLM local au lieu d'OpenAI payant
cache_dir_llm = llm_config.get("cache_dir", os.path.join(tempfile.gettempdir(), "llm"))
os.makedirs(cache_dir_llm, exist_ok=True)
self.llm = LlamaCPP(
model_url="https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGUF/resolve/main/llama-2-7b-chat.Q4_K_M.gguf",
temperature=0.1,
max_new_tokens=512,
context_window=2048,
generate_kwargs={
"top_p": 0.95,
"top_k": 50,
},
model_kwargs={
"n_gpu_layers": 0,
"cache_dir": cache_dir_llm,
"torch_dtype": "float32",
},
#tokenizer_path=None,
verbose=True,
)
logger.info("✅ Modèle Llama-2 7B Q4_K_M chargé avec succès !")
            # 3. ✅ Local HuggingFace embedding, optimized
            cache_dir_embed = embed_config.get("cache_dir", os.path.join(tempfile.gettempdir(), "embeddings"))
            os.makedirs(cache_dir_embed, exist_ok=True)
            # ✅ Embedding for LlamaIndex (HuggingFaceEmbedding already subclasses BaseEmbedding)
            sentence_model_name = embed_config.get("model_name", "sentence-transformers/all-MiniLM-L6-v2")
            self.embed_model = HuggingFaceEmbedding(
                model_name=sentence_model_name,
                cache_folder=cache_dir_embed,  # use the cache dir prepared above
            )
            # ✅ Required for LlamaIndex to work
            Settings.embed_model = self.embed_model
            # ✅ Full safety check - LlamaIndex v0.12 version
            try:
                if not isinstance(self.embed_model, BaseEmbedding):
                    raise ValueError("❌ The embedding model is not a BaseEmbedding instance")
                logger.info("✅ The embedding model conforms to BaseEmbedding (check passed)")
            except Exception as e:
                raise ValueError(f"❌ The embedding model failed the safety check: {e}")
            # 4. LlamaIndex v0.12 Settings
            Settings.llm = self.llm
            Settings.embed_model = self.embed_model
            Settings.chunk_size = 512
            Settings.chunk_overlap = 20
            # 5. ✅ Chonkie SemanticChunker (replaces SemanticSplitterNodeParser)
            if CHONKIE_AVAILABLE:
                await self._init_chonkie_chunkers()
            else:
                logger.warning("⚠️ Chonkie not available - using LlamaIndex only")
            # 6. Stable LlamaIndex parsers
            await self._init_llamaindex_parsers()
            self._is_initialized = True
            logger.info("✅ SmartChunkerPipeline v4.0 initialized successfully")
        except Exception as e:
            logger.error(f"❌ Error initializing chunker v4.0: {e}")
            raise
async def _init_chonkie_chunkers(self):
"""
Initialise les chunkers Chonkie : SemanticChunker et RecursiveChunker.
Gestion des erreurs avec fallback propre à None si l'initialisation échoue.
"""
semantic_config = self.config.get("chunking", {}).get("chonkie", {}).get("semantic", {})
recursive_config = self.config.get("chunking", {}).get("chonkie", {}).get("recursive", {})
# 🔹 Initialisation du SemanticChunker (embeddings denses via SentenceTransformers)
try:
embedding_model = self.embed_model # Doit être déjà chargé dans initialize()
# 🔐 Test de sécurité : encode() fonctionne et renvoie un vecteur correct
# try:
# test_vec = embedding_model.encode(["test phrase"])
# if not hasattr(test_vec, "__len__") or len(test_vec) == 0:
# raise ValueError("❌ encode() a retourné un vecteur vide ou invalide")
# except Exception as e:
# raise ValueError(f"❌ Le modèle d'embedding a échoué au test de sécurité : {e}")
self.chonkie_semantic = SemanticChunker(
# embedding_model=embedding_model,
# embed_model=self.embed_model, # ✅ Passe bien un objet BaseEmbedding
embedding_model=self.embed_model,
threshold=semantic_config.get("threshold", 0.75),
chunk_size=semantic_config.get("chunk_size", 512),
min_sentences=semantic_config.get("min_sentences", 1)
)
logger.info("✅ SemanticChunker (Chonkie) initialisé avec succès")
except Exception as e:
logger.warning(f"⚠️ Erreur initialisation Chonkie SemanticChunker: {e}")
self.chonkie_semantic = None
# 🔹 Initialisation du RecursiveChunker (avec hiérarchie)
try:
self.chonkie_recursive = RecursiveChunker(
# self.chonkie_recursive = ChonkieRecursiveChunker(
# chunk_sizes=recursive_config.get("chunk_sizes", [2048, 512, 128]),
# separators=recursive_config.get("separators", ["\n\n", "\n", ".", "!", "?", "—"]),
# shrink_size=recursive_config.get("shrink_size", None),
# preserve_separators=recursive_config.get("preserve_separators", False),
include_raw_chunks=recursive_config.get("include_raw_chunks", False)
)
logger.info("✅ RecursiveChunker (Chonkie) initialisé avec succès")
except Exception as e:
logger.warning(f"⚠️ Erreur initialisation Chonkie RecursiveChunker: {e}")
self.chonkie_recursive = None
async def _init_llamaindex_parsers(self):
"""Initialisation parsers LlamaIndex v0.12 STABLES"""
# SentenceSplitter - stable et fiable
self.sentence_splitter = SentenceSplitter(
chunk_size=512,
chunk_overlap=20,
include_metadata=True,
include_prev_next_rel=True
)
# MarkdownNodeParser - pour détection structure
self.markdown_parser = MarkdownNodeParser()
logger.info("✅ Parsers LlamaIndex v0.12 initialisés")
async def process_text(self, request: ChunkRequest) -> ChunkResponse:
"""
Point d'entrée principal - Workflow complet v4.0
Chunking récursif intelligent avec Chonkie + LlamaIndex optimisé
"""
start_time = time.time()
try:
if not self._is_initialized:
await self.initialize()
logger.info(f"📝 Début chunking intelligent v4.0: {request.titre or 'Sans titre'}")
# 1. Preprocessing et nettoyage amélioré
cleaned_text = self._preprocess_text_v4(request.text)
# 2. Détection structure automatique avancée
documents = await self._detect_structure_v4(cleaned_text, request)
# 3. ✅ Chunking hiérarchique avec Chonkie (si disponible)
if CHONKIE_AVAILABLE and self.chonkie_recursive:
hierarchical_chunks = await self._apply_chonkie_hierarchical_chunking(documents, request)
else:
hierarchical_chunks = await self._apply_llamaindex_hierarchical_chunking(documents, request)
# 4. ✅ Chunking sémantique avec Chonkie SemanticChunker
if CHONKIE_AVAILABLE and self.chonkie_semantic:
semantic_chunks = await self._apply_chonkie_semantic_chunking(hierarchical_chunks, request)
else:
semantic_chunks = await self._apply_fallback_semantic_chunking(hierarchical_chunks, request)
# 5. ✅ Construction relations bidirectionnelles complètes
enriched_chunks = await self._build_bidirectional_relationships_v4(semantic_chunks)
# 6. Extraction concepts et métadonnées intelligentes
final_chunks = await self._enrich_with_intelligence_v4(enriched_chunks, request)
# 7. ✅ Génération exports avec format Obsidian corrigé
exports = await self._generate_exports_v4(final_chunks, request)
processing_time = time.time() - start_time
# 8. Nettoyage mémoire automatique HF Space
if self.config.get("performance", {}).get("memory", {}).get("enable_garbage_collection", True):
await self._cleanup_memory_v4()
# Construction réponse finale
response = ChunkResponse(
chunks=final_chunks,
hierarchy=self._build_hierarchy_levels_v4(final_chunks),
total_chunks=len(final_chunks),
total_tokens=sum(c.metadata.tokens_count for c in final_chunks),
processing_time=processing_time,
source_metadata=self._build_source_metadata_v4(request),
concept_graph=exports.get("concept_graph", {}),
obsidian_export=exports.get("obsidian"),
agent_knowledge=exports.get("agents")
)
logger.info(f"✅ Chunking v4.0 terminé: {len(final_chunks)} chunks en {processing_time:.2f}s")
return response
except Exception as e:
logger.error(f"❌ Erreur chunking v4.0: {e}")
raise
def _preprocess_text_v4(self, text: str) -> str:
"""Preprocessing amélioré v4.0 avec détection patterns avancés"""
# Normalisation base
text = re.sub(r'\r\n|\r', '\n', text)
text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)
text = re.sub(r'[ \t]+', ' ', text)
# ✅ NOUVEAU v4.0: Nettoyage patterns spécifiques
# Suppression références inutiles
text = re.sub(r'\[?\d+\]?', '', text) # Références numériques [1], [2]
text = re.sub(r'http[s]?://\S+', '[URL]', text) # URLs anonymisées
# Normalisation caractères spéciaux
text = text.replace('"', '"').replace('"', '"')
text = text.replace(''', "'").replace(''', "'")
text = text.replace('–', '-').replace('—', '-')
# ✅ Préservation structure importante
# Protéger patterns structurels importants
text = re.sub(r'^(CHAPITRE|SECTION|PARTIE)\s+', r'\n\n\1 ', text, flags=re.MULTILINE | re.IGNORECASE)
return text.strip()
async def _detect_structure_v4(self, text: str, request: ChunkRequest) -> List[Document]:
"""Détection structure automatique améliorée v4.0"""
documents = []
structure_config = self.config.get("chunking", {}).get("structure_detection", {})
# 1. ✅ Détection Markdown avancée
if structure_config.get("markdown", {}).get("enabled", True) and self._has_markdown_structure_v4(text):
logger.info("📄 Structure Markdown détectée (v4.0)")
documents = await self._split_markdown_structure_v4(text, request)
# 2. ✅ Détection chapitres/sections améliorée
elif structure_config.get("chapters", {}).get("enabled", True) and self._has_chapter_structure_v4(text):
logger.info("📚 Structure chapitres détectée (v4.0)")
documents = await self._split_by_chapters_v4(text, request)
# 3. ✅ Fallback intelligent par paragraphes
else:
logger.info("📝 Texte brut - découpage intelligent par paragraphes (v4.0)")
documents = await self._split_by_paragraphs_v4(text, request)
return documents
def _has_markdown_structure_v4(self, text: str) -> bool:
"""Détection Markdown améliorée avec scoring pondéré v4.0"""
patterns_weighted = [
(r'^#{1,6}\s+.+$', 5), # Titres (poids fort)
(r'^\*\*.*\*\*$', 2), # Gras
(r'^\*\s+.+$', 2), # Listes puces
(r'^\d+\.\s+.+$', 2), # Listes numérotées
(r'```[\s\S]*?```', 3), # Code blocks
(r'^\|.*\|$', 2), # Tableaux
(r'\[.*\]\(.*\)', 1), # Liens
(r'^>\s+.+$', 1), # Citations
]
total_score = 0
total_lines = len(text.split('\n'))
for pattern, weight in patterns_weighted:
matches = len(re.findall(pattern, text, re.MULTILINE))
total_score += matches * weight
# Score normalisé
score_threshold = self.config.get("chunking", {}).get("structure_detection", {}).get("markdown", {}).get("minimum_score", 0.15)
normalized_score = (total_score / total_lines) if total_lines > 0 else 0
return normalized_score > score_threshold
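    # Worked example (assumption: illustrative numbers): a 20-line text with two
    # "#" headings (weight 5) and four bullet items (weight 2) scores
    # (2*5 + 4*2) / 20 = 0.9, well above the default 0.15 threshold.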
def _has_chapter_structure_v4(self, text: str) -> bool:
"""Détection chapitres améliorée v4.0"""
patterns = self.config.get("chunking", {}).get("structure_detection", {}).get("chapters", {}).get("patterns", [])
chapter_count = 0
for pattern in patterns:
matches = len(re.findall(pattern, text, re.IGNORECASE | re.MULTILINE))
chapter_count += matches
# Seuil adaptatif selon longueur texte
text_length = len(text.split())
if text_length < 1000:
min_chapters = 2
elif text_length < 5000:
min_chapters = 3
else:
min_chapters = 4
return chapter_count >= min_chapters
async def _split_by_paragraphs_v4(self, text: str, request: ChunkRequest) -> List[Document]:
"""✅ NOUVEAU: Fallback intelligent par paragraphes v4.0"""
documents = []
# Division par paragraphes avec logique intelligente
paragraphs = re.split(r'\n\s*\n', text)
current_section = ""
section_index = 0
for para_idx, paragraph in enumerate(paragraphs):
paragraph = paragraph.strip()
if not paragraph:
continue
# Logique de regroupement intelligent
# Si paragraphe court (< 100 mots), regrouper avec suivant
word_count = len(paragraph.split())
if word_count < 100 and para_idx < len(paragraphs) - 1:
current_section += paragraph + "\n\n"
else:
current_section += paragraph
# Créer document si assez de contenu
if len(current_section.split()) >= 50: # Minimum 50 mots
documents.append(Document(
text=current_section.strip(),
metadata={
"structure_type": "paragraph_group",
"section_index": section_index,
"word_count": len(current_section.split()),
"source_id": request.source_id,
"titre": request.titre,
"level": 0
}
))
section_index += 1
current_section = ""
# Dernier section si non vide
if current_section.strip() and len(current_section.split()) >= 20:
documents.append(Document(
text=current_section.strip(),
metadata={
"structure_type": "paragraph_group",
"section_index": section_index,
"word_count": len(current_section.split()),
"source_id": request.source_id,
"titre": request.titre,
"level": 0
}
))
return documents
"""
Chunker Pipeline v4.0 - PARTIE 2 - CORRIGÉ ET COMPLET
Suite et fin de chunker_pipeline_v4_part1.py
CONTINUATION DES MÉTHODES:
✅ Chunking Chonkie hiérarchique et sémantique
✅ Relations bidirectionnelles complètes
✅ Export Obsidian avec format [[Titre]], id
✅ Génération agents spécialisés
✅ Nettoyage mémoire optimisé HF Space
✅ Health check et monitoring
"""
async def _split_markdown_structure_v4(self, text: str, request: ChunkRequest) -> List[Document]:
"""Division Markdown hiérarchique améliorée v4.0"""
documents = []
# Utilisation MarkdownNodeParser de LlamaIndex
try:
markdown_docs = [Document(text=text)]
nodes = self.markdown_parser.get_nodes_from_documents(markdown_docs)
for node_idx, node in enumerate(nodes):
# Détection niveau hiérarchique depuis le contenu
title_match = re.search(r'^(#{1,6})\s+(.+)$', node.text, re.MULTILINE)
level = len(title_match.group(1)) if title_match else 0
detected_title = title_match.group(2).strip() if title_match else None
documents.append(Document(
text=node.text,
metadata={
"structure_type": "markdown",
"detected_title": detected_title,
"level": level,
"node_index": node_idx,
"source_id": request.source_id,
"titre": request.titre,
**node.metadata
}
))
except Exception as e:
logger.warning(f"⚠️ Erreur parsing Markdown: {e}")
# Fallback manuel
return await self._split_by_paragraphs_v4(text, request)
return documents
async def _split_by_chapters_v4(self, text: str, request: ChunkRequest) -> List[Document]:
"""Division par chapitres avec patterns configurables v4.0"""
documents = []
patterns = self.config.get("chunking", {}).get("structure_detection", {}).get("chapters", {}).get("patterns", [])
# Pattern combiné pour division
combined_pattern = '|'.join(f'({p})' for p in patterns)
try:
parts = re.split(f'({combined_pattern})', text, flags=re.IGNORECASE | re.MULTILINE)
current_title = None
current_content = ""
chapter_index = 0
for part in parts:
part = part.strip()
if not part:
continue
# Vérifier si c'est un titre de chapitre
is_title = any(re.match(pattern, part, re.IGNORECASE) for pattern in patterns)
if is_title:
# Sauvegarder chapitre précédent
if current_content.strip() and len(current_content.split()) >= 30:
documents.append(Document(
text=current_content.strip(),
metadata={
"structure_type": "chapter",
"detected_title": current_title,
"level": 1,
"chapter_index": chapter_index,
"word_count": len(current_content.split()),
"source_id": request.source_id,
"titre": request.titre
}
))
chapter_index += 1
current_title = part
current_content = part + "\n\n"
else:
current_content += part + "\n"
# Dernier chapitre
if current_content.strip() and len(current_content.split()) >= 20:
documents.append(Document(
text=current_content.strip(),
metadata={
"structure_type": "chapter",
"detected_title": current_title,
"level": 1,
"chapter_index": chapter_index,
"word_count": len(current_content.split()),
"source_id": request.source_id,
"titre": request.titre
}
))
except Exception as e:
logger.warning(f"⚠️ Erreur parsing chapitres: {e}")
return await self._split_by_paragraphs_v4(text, request)
return documents
async def _apply_chonkie_hierarchical_chunking(self, documents: List[Document], request: ChunkRequest) -> List[ChunkNode]:
"""✅ Chunking hiérarchique avec Chonkie RecursiveChunker v4.0"""
all_chunks = []
for doc_idx, document in enumerate(documents):
try:
# Utilisation Chonkie RecursiveChunker
chunks = self.chonkie_recursive.chunk(document.text)
for chunk_idx, chunk in enumerate(chunks):
chunk_node = ChunkNode(
id=self._generate_chunk_id_v4(chunk.text, doc_idx, 0, chunk_idx),
content=chunk.text,
level=0, # Niveau base pour Chonkie
title=document.metadata.get("detected_title"),
metadata={
**document.metadata,
"chunker": "chonkie_recursive",
"doc_index": doc_idx,
"chunk_index": chunk_idx,
"token_count": len(chunk.text.split()),
"original_chunk_size": getattr(chunk, 'token_count', len(chunk.text.split()))
}
)
all_chunks.append(chunk_node)
# Enregistrement dans registry pour relations
self._chunk_registry[chunk_node.id] = chunk_node
except Exception as e:
logger.warning(f"⚠️ Erreur Chonkie hierarchical chunking doc {doc_idx}: {e}")
# Fallback vers LlamaIndex
fallback_chunks = await self._apply_llamaindex_hierarchical_chunking([document], request)
all_chunks.extend(fallback_chunks)
return all_chunks
async def _apply_llamaindex_hierarchical_chunking(self, documents: List[Document], request: ChunkRequest) -> List[ChunkNode]:
"""Fallback chunking hiérarchique avec LlamaIndex SentenceSplitter v4.0"""
all_chunks = []
chunk_sizes = self.config.get("chunking", {}).get("chonkie", {}).get("recursive", {}).get("chunk_sizes", [2048, 512, 128])
for doc_idx, document in enumerate(documents):
try:
# Application chunking multi-niveaux
for level, chunk_size in enumerate(chunk_sizes):
# Configuration splitter pour ce niveau
splitter = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=20,
include_metadata=True,
include_prev_next_rel=True
)
# Application du splitter
if level == 0:
nodes = splitter.get_nodes_from_documents([document])
else:
# Subdiviser chunks du niveau précédent
prev_level_chunks = [c for c in all_chunks if c.level == level - 1 and c.metadata.get("doc_index") == doc_idx]
nodes = []
for parent_chunk in prev_level_chunks:
sub_doc = Document(text=parent_chunk.content, metadata=parent_chunk.metadata)
sub_nodes = splitter.get_nodes_from_documents([sub_doc])
for sub_node in sub_nodes:
sub_node.metadata["parent_chunk_id"] = parent_chunk.id
nodes.append(sub_node)
# Conversion en ChunkNode
for node_idx, node in enumerate(nodes):
chunk_node = ChunkNode(
id=self._generate_chunk_id_v4(node.text, doc_idx, level, node_idx),
content=node.text,
level=level,
title=document.metadata.get("detected_title"),
parent_id=node.metadata.get("parent_chunk_id"),
metadata={
**node.metadata,
"chunker": "llamaindex_sentence",
"doc_index": doc_idx,
"level": level,
"node_index": node_idx,
"chunk_size_used": chunk_size
}
)
all_chunks.append(chunk_node)
# Enregistrement dans registry
self._chunk_registry[chunk_node.id] = chunk_node
except Exception as e:
logger.warning(f"⚠️ Erreur LlamaIndex hierarchical chunking doc {doc_idx}: {e}")
# Fallback simple
fallback_chunk = ChunkNode(
id=f"fallback_{doc_idx}_{int(time.time())}",
content=document.text,
level=0,
title=document.metadata.get("detected_title"),
metadata={"fallback": True, "doc_index": doc_idx}
)
all_chunks.append(fallback_chunk)
self._chunk_registry[fallback_chunk.id] = fallback_chunk
return all_chunks
async def _apply_chonkie_semantic_chunking(self, chunk_nodes: List[ChunkNode], request: ChunkRequest) -> List[ChunkNode]:
"""✅ Chunking sémantique avec Chonkie SemanticChunker v4.0"""
semantic_chunks = []
for chunk_node in chunk_nodes:
try:
# Vérifier si chunking sémantique nécessaire
if len(chunk_node.content.split()) < 20: # Trop petit
semantic_chunks.append(chunk_node)
continue
# Application Chonkie SemanticChunker
chunks = self.chonkie_semantic.chunk(chunk_node.content)
# Si un seul chunk retourné, garder l'original
if len(chunks) <= 1:
semantic_chunks.append(chunk_node)
continue
# Conversion des chunks sémantiques
for sem_idx, chunk in enumerate(chunks):
semantic_chunk = ChunkNode(
id=f"{chunk_node.id}_sem_{sem_idx}",
content=chunk.text,
level=chunk_node.level + 1,
title=chunk_node.title,
parent_id=chunk_node.id,
parent_title=chunk_node.title, # ✅ Pour format Obsidian
metadata={
**chunk_node.metadata,
"chunker": "chonkie_semantic",
"semantic_index": sem_idx,
"parent_chunk_id": chunk_node.id,
"semantic_similarity": getattr(chunk, 'similarity_score', 0.75)
}
)
semantic_chunks.append(semantic_chunk)
# Mise à jour relations parent
if semantic_chunk.id not in chunk_node.children_ids:
chunk_node.children_ids.append(semantic_chunk.id)
# Enregistrement registry
self._chunk_registry[semantic_chunk.id] = semantic_chunk
except Exception as e:
logger.warning(f"⚠️ Erreur Chonkie semantic chunking {chunk_node.id}: {e}")
semantic_chunks.append(chunk_node)
return semantic_chunks
async def _apply_fallback_semantic_chunking(self, chunk_nodes: List[ChunkNode], request: ChunkRequest) -> List[ChunkNode]:
"""Fallback chunking sémantique sans Chonkie v4.0"""
semantic_chunks = []
for chunk_node in chunk_nodes:
try:
# Chunking simple par phrases si texte assez long
if len(chunk_node.content.split()) >= 50:
sentences = self._split_into_sentences_v4(chunk_node.content)
# Regroupement par groupes de 3-5 phrases
buffer_size = min(5, max(2, len(sentences) // 3))
sentence_groups = [sentences[i:i+buffer_size] for i in range(0, len(sentences), buffer_size)]
for group_idx, group in enumerate(sentence_groups):
if len(group) == 0:
continue
group_text = ' '.join(group)
if len(group_text.split()) < 10: # Trop petit
continue
semantic_chunk = ChunkNode(
id=f"{chunk_node.id}_fallback_sem_{group_idx}",
content=group_text,
level=chunk_node.level + 1,
title=chunk_node.title,
parent_id=chunk_node.id,
parent_title=chunk_node.title,
metadata={
**chunk_node.metadata,
"chunker": "fallback_semantic",
"semantic_index": group_idx,
"sentences_count": len(group)
}
)
semantic_chunks.append(semantic_chunk)
# Mise à jour relations
if semantic_chunk.id not in chunk_node.children_ids:
chunk_node.children_ids.append(semantic_chunk.id)
self._chunk_registry[semantic_chunk.id] = semantic_chunk
else:
# Garder chunk original si trop petit
semantic_chunks.append(chunk_node)
except Exception as e:
logger.warning(f"⚠️ Erreur fallback semantic chunking {chunk_node.id}: {e}")
semantic_chunks.append(chunk_node)
return semantic_chunks
def _split_into_sentences_v4(self, text: str) -> List[str]:
"""Division en phrases améliorée v4.0"""
# Patterns pour fins de phrases
sentence_endings = r'[.!?]+(?:\s|$|")'
# Split avec préservation des points dans acronymes
sentences = re.split(sentence_endings, text)
# Nettoyage et filtrage
clean_sentences = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) > 15 and not re.match(r'^[A-Z]{2,}\.?$', sentence): # Éviter acronymes
clean_sentences.append(sentence)
return clean_sentences
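    # A quick behavior sketch (assumption: illustrative values, not a test that
    # ships with this module):
    #
    #     _split_into_sentences_v4("First full sentence here. OK. Another long sentence follows!")
    #     # -> ["First full sentence here", "Another long sentence follows"]
    #     # "OK" is dropped: shorter than 15 characters, and it matches the acronym filter.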
async def _build_bidirectional_relationships_v4(self, chunks: List[ChunkNode]) -> List[ChunkNode]:
"""✅ Construction relations bidirectionnelles complètes v4.0"""
# 1. Tri par niveau et index pour relations séquentielles
sorted_chunks = sorted(chunks, key=lambda x: (x.level, x.metadata.get("node_index", 0)))
# 2. Construction relations prev/next par niveau
level_groups = {}
for chunk in sorted_chunks:
level = chunk.level
if level not in level_groups:
level_groups[level] = []
level_groups[level].append(chunk)
for level, level_chunks in level_groups.items():
for i, chunk in enumerate(level_chunks):
# Relations précédent/suivant
if i > 0:
chunk.prev_id = level_chunks[i-1].id
if i < len(level_chunks) - 1:
chunk.next_id = level_chunks[i+1].id
# 3. Validation et correction relations parent/enfant
for chunk in chunks:
# Validation parent existe
if chunk.parent_id and chunk.parent_id in self._chunk_registry:
parent = self._chunk_registry[chunk.parent_id]
# Mise à jour titre parent pour Obsidian
chunk.parent_title = parent.title or parent.metadata.get("detected_title") or f"Chunk {parent.id[:8]}"
# Ajout enfant dans parent
if chunk.id not in parent.children_ids:
parent.children_ids.append(chunk.id)
# Validation enfants existent
valid_children = []
for child_id in chunk.children_ids:
if child_id in self._chunk_registry:
child = self._chunk_registry[child_id]
child.parent_id = chunk.id
child.parent_title = chunk.title or chunk.metadata.get("detected_title") or f"Chunk {chunk.id[:8]}"
valid_children.append(child_id)
chunk.children_ids = valid_children
return chunks
async def _enrich_with_intelligence_v4(self, chunks: List[ChunkNode], request: ChunkRequest) -> List[SemanticChunk]:
"""Enrichissement intelligent avec concepts et métadonnées v4.0"""
semantic_chunks = []
for chunk_idx, chunk_node in enumerate(chunks):
# Génération métadonnées enrichies
metadata = ChunkMetadata(
chunk_id=chunk_node.id,
level=chunk_node.level,
level_name=self._determine_chunk_level_v4(chunk_node.level),
parent_id=chunk_node.parent_id,
children_ids=chunk_node.children_ids,
prev_id=chunk_node.prev_id,
next_id=chunk_node.next_id,
global_index=chunk_idx,
local_index=chunk_node.metadata.get("node_index", 0),
source_id=request.source_id,
source_title=request.titre,
source_url=request.source,
content_type=request.type or ContentType.TEXT,
tokens_count=len(chunk_node.content.split()),
sentences_count=len(chunk_node.content.split('.')),
detected_title=chunk_node.title or chunk_node.metadata.get("detected_title"),
main_concepts=[],
keywords=[],
chunk_type=None,
confidence_score=chunk_node.confidence,
contextual_summary=None,
related_chunks=[]
)
# Création SemanticChunk
semantic_chunk = SemanticChunk(
content=chunk_node.content,
metadata=metadata,
embedding=None, # Optionnel pour économie bande passante
similarity_scores={}
)
# Enrichissement intelligent avec LLM (si disponible et activé)
if (request.include_metadata and
self.llm and
len(chunk_node.content.split()) >= 20): # Seulement si chunk assez long
try:
await self._extract_semantic_intelligence_v4(semantic_chunk)
except Exception as e:
logger.warning(f"⚠️ Extraction intelligente échouée {chunk_node.id}: {e}")
# Fallback extraction simple
await self._extract_simple_keywords_v4(semantic_chunk)
else:
# Extraction simple par défaut
await self._extract_simple_keywords_v4(semantic_chunk)
semantic_chunks.append(semantic_chunk)
return semantic_chunks
async def _extract_semantic_intelligence_v4(self, chunk: SemanticChunk):
"""Extraction sémantique avancée avec LLM local v4.0"""
try:
# Prompt optimisé pour modèle local
prompt = f"""Analyser ce texte et extraire:
1. 3 concepts principaux (séparés par virgules)
2. 5 mots-clés (séparés par virgules)
3. Type: concept/principe/méthode/exemple/définition
Texte: {chunk.content[:400]}
Format:
Concepts: concept1, concept2, concept3
Mots-clés: mot1, mot2, mot3, mot4, mot5
Type: type_détecté"""
response = await self.llm.acomplete(prompt)
result_text = response.text
# Parsing robuste
concepts = []
keywords = []
chunk_type = "concept"
# Extraction concepts
if "Concepts:" in result_text:
concepts_line = result_text.split("Concepts:")[1].split("\n")[0]
concepts = [c.strip() for c in concepts_line.split(",") if c.strip()][:3]
# Extraction mots-clés
if "Mots-clés:" in result_text:
keywords_line = result_text.split("Mots-clés:")[1].split("\n")[0]
keywords = [k.strip() for k in keywords_line.split(",") if k.strip()][:5]
# Extraction type
if "Type:" in result_text:
type_line = result_text.split("Type:")[1].split("\n")[0]
extracted_type = type_line.strip().lower()
valid_types = ["concept", "principe", "méthode", "exemple", "définition", "framework"]
if extracted_type in valid_types:
chunk_type = extracted_type
# Mise à jour chunk
chunk.metadata.main_concepts = concepts
chunk.metadata.keywords = keywords
chunk.metadata.chunk_type = chunk_type
except Exception as e:
logger.warning(f"⚠️ Extraction LLM échouée: {e}")
await self._extract_simple_keywords_v4(chunk)
async def _extract_simple_keywords_v4(self, chunk: SemanticChunk):
"""Extraction simple mots-clés par fréquence v4.0"""
import collections
# Stop words français et anglais
stop_words = {
"le", "la", "les", "un", "une", "des", "de", "du", "et", "ou", "mais", "donc", "car",
"pour", "par", "avec", "sans", "dans", "sur", "sous", "ce", "cette", "ces", "il",
"elle", "ils", "elles", "que", "qui", "quoi", "dont", "où", "the", "a", "an", "and",
"or", "but", "in", "on", "at", "to", "for", "of", "with", "by", "this", "that", "is", "are"
}
# Extraction mots significatifs
words = re.findall(r'\b[a-zA-ZÀ-ÿ]{3,}\b', chunk.content.lower())
words = [w for w in words if w not in stop_words and len(w) > 2]
# Comptage fréquences
word_counts = collections.Counter(words)
top_words = [word for word, count in word_counts.most_common(5)]
# Extraction concepts simples (mots capitalisés ou répétés)
concept_candidates = re.findall(r'\b[A-ZÀ-Ÿ][a-zA-ZÀ-ÿ]{4,}\b', chunk.content)
concepts = list(set(concept_candidates))[:3]
# Mise à jour
chunk.metadata.keywords = top_words
chunk.metadata.main_concepts = concepts if concepts else top_words[:3]
chunk.metadata.chunk_type = "concept"
def _determine_chunk_level_v4(self, level: int) -> ChunkLevel:
"""Mapping niveau vers ChunkLevel enum v4.0"""
mapping = {
0: ChunkLevel.DOCUMENT,
1: ChunkLevel.CHAPTER,
2: ChunkLevel.SECTION,
3: ChunkLevel.SUBSECTION,
4: ChunkLevel.CONCEPT,
5: ChunkLevel.DETAIL
}
return mapping.get(level, ChunkLevel.DETAIL)
def _generate_chunk_id_v4(self, text: str, doc_idx: int, level: int, node_idx: int) -> str:
"""Génération ID unique traçable v4.0"""
content_hash = hashlib.md5(text.encode()).hexdigest()[:8]
timestamp = int(time.time()) % 10000
return f"chk_{doc_idx:02d}_{level}_{node_idx:03d}_{content_hash}_{timestamp}"
async def _generate_exports_v4(self, chunks: List[SemanticChunk], request: ChunkRequest) -> Dict[str, Any]:
"""Génération exports Second Cerveau et Agents v4.0"""
exports = {}
# Export Obsidian avec format corrigé
exports["obsidian"] = await self._generate_obsidian_export_v4(chunks, request)
# Export Agents spécialisés
exports["agents"] = await self._generate_agent_knowledge_v4(chunks, request)
# Export graphe concepts
exports["concept_graph"] = self._extract_concept_graph_v4(chunks)
return exports
async def _generate_obsidian_export_v4(self, chunks: List[SemanticChunk], request: ChunkRequest) -> Dict[str, Any]:
"""✅ Export Obsidian avec format [[Titre]], id corrigé v4.0"""
obsidian_config = self.config.get("obsidian", {})
parent_format = obsidian_config.get("parent_format", "[[{title}]], {id}")
notes = []
for chunk in chunks:
# ✅ Format parent corrigé selon tes spécifications
parent_link = None
if chunk.metadata.parent_id and chunk.metadata.get("parent_id") in self._chunk_registry:
parent_chunk = self._chunk_registry[chunk.metadata.get("parent_id")]
parent_title = parent_chunk.title or parent_chunk.metadata.get("detected_title") or f"Chunk {parent_chunk.id[:8]}"
# parent_title = parent_chunk.title or parent_chunk.metadata.detected_title or f"Chunk {parent_chunk.id[:8]}"
parent_link = parent_format.format(
title=parent_title,
id=chunk.metadata.get("parent_id")
)
# Construction note complète
note = {
"filename": f"{chunk.metadata.chunk_id}.md",
# "title": chunk.metadata.detected_title or f"Note {chunk.metadata.chunk_id[:8]}",
"title": chunk.metadata.get("detected_title") or f"Note {chunk.metadata.chunk_id[:8]}",
"content": chunk.content,
# Front matter
"frontmatter": {
"id": chunk.metadata.get("chunk_id"),
"title": chunk.metadata.get("detected_title"),
"level": chunk.metadata.level_name.value,
"concepts": chunk.metadata.get("main_concepts"),
"tags": chunk.metadata.get("keywords"),
"source": chunk.metadata.get("source_title"),
"source_url": chunk.metadata.get("source_url"),
"created": time.strftime("%Y-%m-%d"),
"type": chunk.metadata.get("chunk_type"),
"confidence": chunk.metadata.get("confidence_score"),
"tokens": chunk.metadata.get("tokens_count")
},
# ✅ Back matter avec format corrigé
"backmatter": {
"basé_sur": parent_link, # Format: [[Titre Parent]], parent_id
"parent_id": chunk.metadata.get("parent_id"),
"enfants": [
parent_format.format(
title=self._get_chunk_title_by_id(child_id),
id=child_id
) for child_id in chunk.metadata.get("children_ids")
],
"précédent": chunk.metadata.get("prev_id"),
"suivant": chunk.metadata.get("next_id"),
"niveau": chunk.metadata.get("level"),
"confiance": chunk.metadata.get("confidence_score")
}
}
notes.append(note)
return {
"format": "obsidian_vault_v4",
"version": "4.0.0",
"notes": notes,
"vault_config": {
"name": f"Vault_{request.source_id or 'default'}",
"bidirectional_links": obsidian_config.get("use_bidirectional_links", True),
"parent_format": parent_format
},
"statistics": {
"total_notes": len(notes),
"total_concepts": len(set(c for chunk in chunks for c in chunk.metadata.get("main_concepts"))),
"hierarchy_levels": len(set(chunk.metadata.get("level") for chunk in chunks))
}
}
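    # A sketch of one generated note dict (assumption: illustrative values only):
    #
    #   {
    #     "filename": "chk_00_1_003_ab12cd34_4821.md",
    #     "title": "Introduction",
    #     "frontmatter": {"id": "chk_00_1_003_ab12cd34_4821", "level": "chapter", ...},
    #     "backmatter": {"basé_sur": "[[Les Fondations]], chk_00_0_001_9f8e7d6c_4821", ...}
    #   }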
def _get_chunk_title_by_id(self, chunk_id: str) -> str:
"""Récupération titre chunk par ID pour liens Obsidian"""
if chunk_id in self._chunk_registry:
chunk = self._chunk_registry[chunk_id]
return chunk.title or chunk.metadata.get("detected_title") or f"Chunk {chunk_id[:8]}"
return f"Chunk {chunk_id[:8]}"
async def _generate_agent_knowledge_v4(self, chunks: List[SemanticChunk], request: ChunkRequest) -> Dict[str, Any]:
"""Génération base connaissance agents spécialisés v4.0"""
# Classification par type pour agents
knowledge_base = {
"principles": [],
"methods": [],
"examples": [],
"concepts": [],
"frameworks": [],
"definitions": []
}
for chunk in chunks:
chunk_type = chunk.metadata.chunk_type or "concept"
knowledge_item = {
"id": chunk.metadata.get("chunk_id"),
"content": chunk.content,
"concepts": chunk.metadata.get("main_concepts"),
"keywords": chunk.metadata.get("keywords"),
"confidence": chunk.metadata.get("confidence_score"),
"level": chunk.metadata.get("level"),
"source": chunk.metadata.get("source_title"),
"detected_title": chunk.metadata.get("detected_title"),
"relations": {
"parent": chunk.metadata.get("parent_id"),
"children": chunk.metadata.get("children_ids"),
"siblings": [chunk.metadata.get("prev_id"), chunk.metadata.get("next_id")]
}
}
# Dispatch selon type avec fallback
type_mapping = {
"principe": "principles",
"méthode": "methods",
"exemple": "examples",
"framework": "frameworks",
"définition": "definitions"
}
target_category = type_mapping.get(chunk_type, "concepts")
knowledge_base[target_category].append(knowledge_item)
return {
"format": "agent_specialist_knowledge_v4",
"version": "4.0.0",
"source_id": request.source_id,
"source_title": request.titre,
"knowledge_base": knowledge_base,
"metadata": {
"total_items": sum(len(items) for items in knowledge_base.values()),
"extraction_quality": self._calculate_extraction_quality_v4(chunks),
"specialization_domains": self._extract_domains_v4(chunks)
}
}
def _extract_concept_graph_v4(self, chunks: List[SemanticChunk]) -> Dict[str, Any]:
"""Extraction graphe concepts enrichi v4.0"""
concept_graph = {}
concept_weights = {}
for chunk in chunks:
concepts = chunk.metadata.main_concepts
for concept in concepts:
if concept not in concept_graph:
concept_graph[concept] = []
concept_weights[concept] = 0
concept_weights[concept] += 1
# Relations avec autres concepts du même chunk
for other_concept in concepts:
if (other_concept != concept and
other_concept not in concept_graph[concept]):
concept_graph[concept].append(other_concept)
return {
"format": "concept_graph_v4",
"version": "4.0.0",
"nodes": list(concept_graph.keys()),
"edges": concept_graph,
"weights": concept_weights,
"statistics": {
"total_concepts": len(concept_graph),
"total_edges": sum(len(edges) for edges in concept_graph.values()),
"avg_connections": round(
sum(len(edges) for edges in concept_graph.values()) / len(concept_graph), 2
) if concept_graph else 0
}
}
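    # E.g. (assumption: illustrative values): two chunks that both list the
    # concepts {"Chunking", "Embeddings"} yield nodes ["Chunking", "Embeddings"],
    # edges Chunking -> [Embeddings] (and vice versa), and weights
    # {"Chunking": 2, "Embeddings": 2}, since each weight counts chunk occurrences.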
def _calculate_extraction_quality_v4(self, chunks: List[SemanticChunk]) -> float:
"""Calcul qualité extraction v4.0"""
if not chunks:
return 0.0
total_confidence = sum(chunk.metadata.get("confidence_score") or 0.5 for chunk in chunks)
avg_confidence = total_confidence / len(chunks)
concept_coverage = sum(1 for chunk in chunks if chunk.metadata.get("main_concepts")) / len(chunks)
keyword_coverage = sum(1 for chunk in chunks if chunk.metadata.get("keywords")) / len(chunks)
quality_score = (avg_confidence * 0.5 + concept_coverage * 0.3 + keyword_coverage * 0.2)
return round(quality_score, 3)
def _extract_domains_v4(self, chunks: List[SemanticChunk]) -> List[str]:
"""Extraction domaines spécialisation v4.0"""
import collections
all_concepts = []
for chunk in chunks:
all_concepts.extend(chunk.metadata.get("main_concepts"))
if not all_concepts:
return []
concept_counts = collections.Counter(all_concepts)
min_frequency = max(2, len(chunks) // 10)
domains = [
concept for concept, count in concept_counts.most_common(10)
if count >= min_frequency
]
return domains
def _build_hierarchy_levels_v4(self, chunks: List[SemanticChunk]):
"""Construction structure hiérarchique v4.0"""
from schemas import HierarchyLevel
hierarchy = {}
# Groupement par niveau
for chunk in chunks:
level = chunk.metadata.get("level")
if level not in hierarchy:
hierarchy[level] = []
hierarchy[level].append(chunk)
# Construction niveaux hiérarchiques
hierarchy_levels = []
for level, level_chunks in sorted(hierarchy.items()):
total_tokens = sum(c.metadata.tokens_count for c in level_chunks)
avg_chunk_size = total_tokens / len(level_chunks) if level_chunks else 0
hierarchy_level = HierarchyLevel(
level=level,
level_name=self._determine_chunk_level_v4(level),
chunks=level_chunks,
total_tokens=total_tokens,
avg_chunk_size=round(avg_chunk_size, 2)
)
hierarchy_levels.append(hierarchy_level)
return hierarchy_levels
def _build_source_metadata_v4(self, request: ChunkRequest) -> Dict[str, Any]:
"""Construction métadonnées source v4.0"""
return {
"source_id": request.source_id,
"titre": request.titre,
"source": request.source,
"type": request.type.value if request.type else "text",
"processing_timestamp": time.time(),
"chunker_version": "4.0.0",
"total_input_length": len(request.text),
"preprocessing_applied": True,
"chonkie_enabled": CHONKIE_AVAILABLE
}
async def _cleanup_memory_v4(self):
"""Nettoyage mémoire optimisé HF Space v4.0"""
memory_config = self.config.get("performance", {}).get("memory", {})
if memory_config.get("enable_garbage_collection", True):
# Nettoyage caches si trop volumineux
max_cache_mb = memory_config.get("max_cache_size_mb", 100)
# Estimation taille cache
cache_size_estimate = (
len(self._embedding_cache) * 0.1 +
len(self._concept_cache) * 0.01 +
len(self._text_cache) * 0.05
)
if cache_size_estimate > max_cache_mb:
# Nettoyage partiel LRU
cache_limit = max_cache_mb // 3
self._embedding_cache = dict(list(self._embedding_cache.items())[-cache_limit:])
self._concept_cache = dict(list(self._concept_cache.items())[-cache_limit:])
self._text_cache = dict(list(self._text_cache.items())[-cache_limit:])
logger.info(f"🧹 Cache nettoyé - taille réduite à ~{max_cache_mb//3}MB")
# Garbage collection Python
gc.collect()
async def health_check_v4(self) -> Dict[str, Any]:
"""Vérification santé complète v4.0"""
health_status = {
"status": "unknown",
"checks": {},
"timestamp": time.time(),
"version": "4.0.0"
}
        try:
            # Initialization check
            health_status["checks"]["initialization"] = self._is_initialized
            if not self._is_initialized:
                health_status["status"] = "not_initialized"
                return health_status
            # Local LLM check
            if self.llm:
                try:
                    test_response = await asyncio.wait_for(
                        self.llm.acomplete("Health test"),
                        timeout=10
                    )
                    health_status["checks"]["llm"] = bool(test_response and test_response.text)
                except Exception as e:
                    health_status["checks"]["llm"] = False
                    health_status["checks"]["llm_error"] = str(e)
            else:
                health_status["checks"]["llm"] = False
            # Embedding check (via the public BaseEmbedding async API)
            if self.embed_model:
                try:
                    test_embedding = await self.embed_model.aget_text_embedding("health test")
                    health_status["checks"]["embedding"] = bool(test_embedding and len(test_embedding) > 0)
                except Exception as e:
                    health_status["checks"]["embedding"] = False
                    health_status["checks"]["embedding_error"] = str(e)
            else:
                health_status["checks"]["embedding"] = False
            # Chonkie checks
            health_status["checks"]["chonkie_available"] = CHONKIE_AVAILABLE
            health_status["checks"]["chonkie_semantic"] = self.chonkie_semantic is not None
            health_status["checks"]["chonkie_recursive"] = self.chonkie_recursive is not None
            # LlamaIndex parser checks
            health_status["checks"]["sentence_splitter"] = self.sentence_splitter is not None
            health_status["checks"]["markdown_parser"] = self.markdown_parser is not None
            # Cache check
            health_status["checks"]["cache_functional"] = True
            # Memory check
            memory_info = self.get_memory_usage_v4()
            max_memory = self.config.get("performance", {}).get("memory", {}).get("max_memory_mb", 1800)
            if "memory_usage_mb" in memory_info:
                memory_ok = memory_info["memory_usage_mb"] < max_memory * 0.9
                health_status["checks"]["memory"] = memory_ok
                health_status["memory_usage"] = memory_info["memory_usage_mb"]
            else:
                health_status["checks"]["memory"] = True
            # Global status
            critical_checks = ["initialization", "embedding", "sentence_splitter", "markdown_parser"]
            critical_passed = all(health_status["checks"].get(check, False) for check in critical_checks)
            optional_checks = ["llm", "memory", "chonkie_available"]
            optional_passed = sum(health_status["checks"].get(check, False) for check in optional_checks)
            if critical_passed and optional_passed >= 2:
                health_status["status"] = "healthy"
            elif critical_passed:
                health_status["status"] = "degraded"
            else:
                health_status["status"] = "unhealthy"
return health_status
except Exception as e:
health_status["status"] = "error"
health_status["error"] = str(e)
logger.error(f"❌ Health check v4.0 failed: {e}")
return health_status
def get_memory_usage_v4(self) -> Dict[str, Any]:
"""Monitoring mémoire détaillé v4.0"""
try:
import psutil
process = psutil.Process()
memory_info = process.memory_info()
return {
"memory_usage_mb": round(memory_info.rss / 1024 / 1024, 2),
"memory_percent": round(process.memory_percent(), 2),
"cpu_percent": round(process.cpu_percent(), 2),
"cache_sizes": {
"embedding_cache": len(self._embedding_cache),
"concept_cache": len(self._concept_cache),
"text_cache": len(self._text_cache),
"chunk_registry": len(self._chunk_registry)
},
"system_info": {
"is_initialized": self._is_initialized,
"config_loaded": bool(self.config),
"llm_available": self.llm is not None,
"embed_model_available": self.embed_model is not None,
"chonkie_available": CHONKIE_AVAILABLE
},
"thresholds": {
"max_memory_mb": self.config.get("performance", {}).get("memory", {}).get("max_memory_mb", 1800),
"cache_limit_mb": self.config.get("performance", {}).get("caching", {}).get("max_cache_size_mb", 100)
}
}
except Exception as e:
return {
"error": f"Unable to get memory info: {e}",
"fallback_info": {
"is_initialized": self._is_initialized,
"cache_sizes": {
"embedding_cache": len(self._embedding_cache),
"concept_cache": len(self._concept_cache),
"text_cache": len(self._text_cache),
"chunk_registry": len(self._chunk_registry)
}
}
}
async def get_config_info_v4(self) -> Dict[str, Any]:
"""Informations détaillées configuration v4.0"""
return {
"config_source": self.config_path,
"config_loaded": bool(self.config),
"version": "4.0.0",
"models": {
"llm_model": self.config.get("models", {}).get("llm", {}).get("model_name", "unknown"),
"embedding_model": self.config.get("models", {}).get("embedding", {}).get("model_name", "unknown"),
"chonkie_available": CHONKIE_AVAILABLE
},
"chunking_config": {
"chonkie_semantic_enabled": self.config.get("chunking", {}).get("chonkie", {}).get("semantic", {}).get("enabled", False),
"chonkie_recursive_enabled": self.config.get("chunking", {}).get("chonkie", {}).get("recursive", {}).get("enabled", False),
"hierarchical_levels": len(self.config.get("chunking", {}).get("chonkie", {}).get("recursive", {}).get("chunk_sizes", [])),
"structure_detection": {
"markdown": self.config.get("chunking", {}).get("structure_detection", {}).get("markdown", {}).get("enabled", False),
"chapters": self.config.get("chunking", {}).get("structure_detection", {}).get("chapters", {}).get("enabled", False)
}
},
"obsidian_config": {
"parent_format": self.config.get("obsidian", {}).get("parent_format", "[[{title}]], {id}"),
"bidirectional_links": self.config.get("obsidian", {}).get("use_bidirectional_links", True)
},
"performance_config": {
"max_workers": self.config.get("performance", {}).get("concurrency", {}).get("max_workers", 1),
"memory_limit_mb": self.config.get("performance", {}).get("memory", {}).get("max_memory_mb", 1800),
"caching_enabled": self.config.get("performance", {}).get("caching", {}).get("enabled", True)
}
}
async def cleanup(self):
"""Nettoyage complet des ressources v4.0"""
try:
# Nettoyage caches
self._embedding_cache.clear()
self._concept_cache.clear()
self._text_cache.clear()
self._chunk_registry.clear()
# Nettoyage modèles
if hasattr(self.embed_model, 'cleanup'):
self.embed_model.cleanup()
if hasattr(self.llm, 'cleanup'):
self.llm.cleanup()
# Nettoyage threading
if self.executor:
self.executor.shutdown(wait=False)
# Garbage collection final
gc.collect()
logger.info("🧹 SmartChunkerPipeline v4.0 nettoyé complètement")
except Exception as e:
logger.warning(f"⚠️ Erreur lors du nettoyage: {e}")
def __str__(self) -> str:
"""Représentation string du pipeline v4.0"""
return f"SmartChunkerPipeline v4.0 (initialized: {self._is_initialized}, chonkie: {CHONKIE_AVAILABLE})"
def __repr__(self) -> str:
"""Représentation détaillée du pipeline v4.0"""
return (f"SmartChunkerPipeline("
f"config_path='{self.config_path}', "
f"initialized={self._is_initialized}, "
f"llm_available={self.llm is not None}, "
f"embed_model_available={self.embed_model is not None}, "
f"chonkie_available={CHONKIE_AVAILABLE})")
# ===== ENTRY POINT FOR TESTS AND DIRECT USE =====
if __name__ == "__main__":
    async def test_pipeline_v4():
        """Quick test of the v4.0 pipeline"""
        pipeline = SmartChunkerPipeline()
        try:
            await pipeline.initialize()
            print("✅ Pipeline v4.0 initialized successfully")
            # Health check test
            health = await pipeline.health_check_v4()
            print(f"🏥 Health status: {health['status']}")
            # Memory test
            memory = pipeline.get_memory_usage_v4()
            print(f"💾 Memory: {memory.get('memory_usage_mb', 'N/A')} MB")
            # Configuration test
            config_info = await pipeline.get_config_info_v4()
            print(f"⚙️ Models: LLM={config_info['models']['llm_model']}, Embed={config_info['models']['embedding_model']}")
            print(f"🔧 Chonkie: {config_info['models']['chonkie_available']}")
            # Simple chunking test (French sample text: the pipeline targets French content)
            from schemas import ChunkRequest
            test_request = ChunkRequest(
                text="Ceci est un test de chunking sémantique intelligent. Il contient plusieurs phrases pour tester la fonctionnalité. Le système doit créer des chunks cohérents et maintenir les relations hiérarchiques.",
                titre="Test Chunking v4.0",
                source_id="test_001"
            )
            result = await pipeline.process_text(test_request)
            print(f"📝 Chunking test: {result.total_chunks} chunks generated in {result.processing_time:.2f}s")
        except Exception as e:
            print(f"❌ v4.0 test error: {e}")
        finally:
            await pipeline.cleanup()
    # Run the test
    print("🚀 Testing SmartChunkerPipeline v4.0...")
    asyncio.run(test_pipeline_v4())