| """ | |
| Chunker Pipeline v4.0 - PARTIE 1 - CORRIGÉ ET COMPLET | |
| Chunking Sémantique Intelligent Récursif avec Parentalité | |
| CORRECTIONS MAJEURES v4.0: | |
| ✅ Chonkie au lieu de SemanticSplitterNodeParser (LlamaIndex buggy) | |
| ✅ Modèle LLM local gratuit au lieu de GPT-4o-mini | |
| ✅ Format Obsidian correct avec [[Titre]], id | |
| ✅ Variables d'environnement sécuriséesf | |
| ✅ Optimisations HF Space gratuit (2GB RAM) | |
| ✅ Relations parent/enfant bidirectionnelles complètes | |
| """ | |
import numpy as np
import tempfile
import os
import re
import time
import hashlib
import logging
import yaml
import asyncio
import gc
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.llama_cpp import LlamaCPP  # ✅ CORRECT

# LlamaIndex v0.12 - fixed imports (without the buggy SemanticSplitterNodeParser)
from llama_index.core import Document, Settings
from llama_index.core.node_parser import (
    SentenceSplitter,    # ✅ Stable and functional
    TokenTextSplitter,   # ✅ For hierarchy
    MarkdownNodeParser   # ✅ For structure
)

# ✅ Chonkie for semantic chunking (replacement for SemanticSplitterNodeParser)
try:
    from chonkie import SemanticChunker, RecursiveChunker
    CHONKIE_AVAILABLE = True
except ImportError:
    CHONKIE_AVAILABLE = False
    logging.warning("⚠️ Chonkie not available - falling back to LlamaIndex only")

# Local imports
from schemas import (
    ChunkRequest, ChunkResponse, SemanticChunk,
    ChunkMetadata, ChunkLevel, ContentType
)

# Optimized logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ✅ Wrapper that exposes a LlamaIndex embedding through the encode()-style
# interface expected by Chonkie / SentenceTransformers-like callers
class EmbeddingWrapper:
    def __init__(self, embedding):
        self.embedding = embedding

    def encode(self, texts):
        # Return the list of vectors through the LlamaIndex interface
        return self.embedding._get_text_embeddings(texts)

    def get_text_embeddings(self, texts):
        # Alias for encode()
        return self.encode(texts)

    def get_text_embedding(self, text):
        # Single-text case
        return self.encode([text])[0]
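# Usage sketch (hypothetical; note _get_text_embeddings() is a private
# LlamaIndex method and may change between releases):
#   base = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")
#   wrapped = EmbeddingWrapper(base)
#   vec = wrapped.get_text_embedding("hello")   # single vector
#   mat = wrapped.encode(["a", "b"])            # batch of vectors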
@dataclass
class ChunkNode:
    """Internal representation of a chunk with full hierarchical relations"""
    id: str
    content: str
    level: int
    title: Optional[str] = None
    parent_id: Optional[str] = None
    parent_title: Optional[str] = None  # ✅ NEW: for the Obsidian format
    children_ids: List[str] = None
    prev_id: Optional[str] = None
    next_id: Optional[str] = None
    metadata: Dict[str, Any] = None
    confidence: float = 1.0

    def __post_init__(self):
        if self.children_ids is None:
            self.children_ids = []
        if self.metadata is None:
            self.metadata = {}
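# Example (illustrative): wiring a parent and a child node together.
#   parent = ChunkNode(id="chk_p", content="...", level=0, title="Intro")
#   child = ChunkNode(id="chk_c", content="...", level=1,
#                     parent_id=parent.id, parent_title=parent.title)
#   parent.children_ids.append(child.id)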
class SmartChunkerPipeline:
    """
    Main pipeline for recursive intelligent semantic chunking, v4.0

    NEW in v4.0:
    ✅ Chonkie SemanticChunker (reliable) instead of LlamaIndex SemanticSplitterNodeParser (buggy)
    ✅ Free local HuggingFace LLM instead of OpenAI GPT-4o-mini (paid)
    ✅ Correct Obsidian format: [[Parent Title]], parent_id
    ✅ Secure environment variables for HF Space
    ✅ Memory optimizations for the free Space tier (2 GB)
    ✅ Complete bidirectional relationships
    """

    def __init__(self, config_path: str = "config.yaml"):
        """Initialization from a YAML configuration, with hardened defaults"""
        self.config_path = config_path
        self.config = self._load_config()

        # AI models - new types in v4.0
        self.llm = None                # ✅ Local HuggingFace LLM instead of OpenAI
        self.embed_model = None        # ✅ Local HuggingFace embedding
        self.chonkie_semantic = None   # ✅ Chonkie SemanticChunker
        self.chonkie_recursive = None  # ✅ Chonkie RecursiveChunker
        self.sentence_splitter = None  # ✅ LlamaIndex SentenceSplitter (stable)
        self.markdown_parser = None    # ✅ MarkdownNodeParser

        # Caches and optimizations for the free HF Space tier
        self._embedding_cache = {}
        self._concept_cache = {}
        self._text_cache = {}
        self._chunk_registry = {}      # ✅ NEW: registry for bidirectional relations
        self._is_initialized = False

        # Threading tuned for the free Space tier (1 worker max)
        self.executor = ThreadPoolExecutor(max_workers=1)

        # Secure environment variables
        self._setup_environment()

        logger.info(f"🚀 SmartChunkerPipeline v4.0 initialized with config: {config_path}")
    def _setup_environment(self):
        """✅ Secure setup for a Hugging Face Space (free tier; only /tmp is writable)"""
        tmp_dir = tempfile.gettempdir()
        os.environ["HF_HOME"] = os.path.join(tmp_dir, "huggingface")
        os.environ["TRANSFORMERS_CACHE"] = os.path.join(tmp_dir, "transformers")
        os.environ["HF_HUB_CACHE"] = os.path.join(tmp_dir, "hub")
        os.environ["TOKENIZERS_PARALLELISM"] = "false"
        os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"

        cache_dirs = [
            os.environ["HF_HOME"],
            os.environ["TRANSFORMERS_CACHE"],
            os.environ["HF_HUB_CACHE"],
            os.path.join(tmp_dir, "llm"),
            os.path.join(tmp_dir, "embeddings"),
            os.path.join(tmp_dir, "logs")
        ]

        logger.debug("HF_HOME: %s", os.environ.get("HF_HOME"))
        logger.debug("TRANSFORMERS_CACHE: %s", os.environ.get("TRANSFORMERS_CACHE"))

        for cache_dir in cache_dirs:
            try:
                os.makedirs(cache_dir, exist_ok=True)
            except Exception as e:
                logger.warning(f"⚠️ Could not create {cache_dir}: {e}")
    def _load_config(self) -> Dict[str, Any]:
        """Load the YAML configuration with a safe fallback"""
        try:
            if os.path.exists(self.config_path):
                with open(self.config_path, 'r', encoding='utf-8') as f:
                    config = yaml.safe_load(f)
                logger.info(f"✅ Configuration loaded from {self.config_path}")
                return config
            else:
                logger.warning(f"⚠️ Config {self.config_path} not found, using default config")
                return self._get_default_config()
        except Exception as e:
            logger.error(f"❌ Config loading error: {e}")
            return self._get_default_config()
    def _get_default_config(self) -> Dict[str, Any]:
        """Optimized default configuration, v4.0"""
        return {
            "models": {
                "llm": {
                    # ✅ FIX: free local HuggingFace model
                    "provider": "huggingface",
                    "model_name": "llama-2-7b-chat",  # Free, local, fast
                    "temperature": 0.1,
                    "max_tokens": 512,
                    "device": "cpu",  # Free HF Space tier = CPU only
                    "cache_dir": os.path.join(tempfile.gettempdir(), "llm")
                },
                "embedding": {
                    "provider": "huggingface",
                    "model_name": "all-mpnet-base-v2",  # Light and performant
                    "cache_dir": os.path.join(tempfile.gettempdir(), "embeddings"),
                    "max_length": 512,
                    "normalize": True,
                    "device": "cpu"
                }
            },
            "chunking": {
                "chonkie": {
                    # ✅ Chonkie SemanticChunker configuration
                    "semantic": {
                        "enabled": True,
                        "threshold": 0.75,  # Semantic similarity threshold
                        "chunk_size": 512,
                        "min_sentences": 1,
                        "max_sentences": 8
                    },
                    "recursive": {
                        "enabled": True,
                        "chunk_sizes": [2048, 512, 128],  # 3-level hierarchy
                        "overlap": 20,
                        "separators": ["\n\n", "\n", ".", "!", "?"]
                    }
                },
                "structure_detection": {
                    "markdown": {"enabled": True},
                    "chapters": {
                        "enabled": True,
                        "patterns": [
                            r'CHAPITRE\s+\d+',
                            r'SECTION\s+\d+',
                            r'PARTIE\s+\d+',
                            r'Chapter\s+\d+',
                            r'^#{1,3}\s+.+$'
                        ]
                    }
                }
            },
            "obsidian": {
                # ✅ FIX: correct format with double brackets
                "parent_format": "[[{title}]], {id}",  # [[Parent Title]], parent_id
                "use_bidirectional_links": True,
                "frontmatter_enabled": True,
                "backmatter_enabled": True
            },
            "performance": {
                "memory": {
                    "max_memory_mb": 1800,  # Free HF Space limit
                    "enable_garbage_collection": True,
                    "cleanup_interval": 50
                },
                "concurrency": {
                    "max_workers": 1,  # Free HF Space limitation
                    "timeout_seconds": 30
                },
                "caching": {
                    "enabled": True,
                    "max_cache_size_mb": 100
                }
            }
        }
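    # Minimal config.yaml sketch (illustrative); any key omitted here falls back
    # to the defaults returned above:
    #
    #   models:
    #     embedding:
    #       model_name: sentence-transformers/all-MiniLM-L6-v2
    #   chunking:
    #     chonkie:
    #       semantic:
    #         threshold: 0.8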
    async def initialize(self):
        """Full initialization tuned for the free HF Space tier, v4.0"""
        if self._is_initialized:
            return

        try:
            logger.info("🚀 Initializing intelligent chunker v4.0...")

            # 1. Model configuration from config
            llm_config = self.config.get("models", {}).get("llm", {})
            embed_config = self.config.get("models", {}).get("embedding", {})

            # 2. ✅ FIX: local HuggingFace LLM instead of paid OpenAI
            cache_dir_llm = llm_config.get("cache_dir", os.path.join(tempfile.gettempdir(), "llm"))
            os.makedirs(cache_dir_llm, exist_ok=True)

            self.llm = LlamaCPP(
                model_url="https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGUF/resolve/main/llama-2-7b-chat.Q4_K_M.gguf",
                temperature=0.1,
                max_new_tokens=512,
                context_window=2048,
                generate_kwargs={
                    "top_p": 0.95,
                    "top_k": 50,
                },
                model_kwargs={
                    "n_gpu_layers": 0,  # CPU only on the free Space tier
                },
                verbose=True,
            )
            logger.info("✅ Llama-2 7B Q4_K_M model loaded successfully!")

            # 3. ✅ Optimized local HuggingFace embedding
            cache_dir_embed = embed_config.get("cache_dir", os.path.join(tempfile.gettempdir(), "embeddings"))
            os.makedirs(cache_dir_embed, exist_ok=True)

            # ✅ Embedding for LlamaIndex (HuggingFaceEmbedding already implements BaseEmbedding)
            sentence_model_name = embed_config.get("model_name", "sentence-transformers/all-MiniLM-L6-v2")
            self.embed_model = HuggingFaceEmbedding(
                model_name=sentence_model_name,
                cache_folder=cache_dir_embed,
            )

            # ✅ Sanity check - LlamaIndex v0.12
            if not isinstance(self.embed_model, BaseEmbedding):
                raise ValueError("❌ The embedding model is not a BaseEmbedding instance")
            logger.info("✅ The embedding model conforms to BaseEmbedding (check passed)")

            # 4. LlamaIndex v0.12 Settings
            Settings.llm = self.llm
            Settings.embed_model = self.embed_model
            Settings.chunk_size = 512
            Settings.chunk_overlap = 20

            # 5. ✅ Chonkie SemanticChunker (replacement for SemanticSplitterNodeParser)
            if CHONKIE_AVAILABLE:
                await self._init_chonkie_chunkers()
            else:
                logger.warning("⚠️ Chonkie not available - using LlamaIndex only")

            # 6. Stable LlamaIndex parsers
            await self._init_llamaindex_parsers()

            self._is_initialized = True
            logger.info("✅ SmartChunkerPipeline v4.0 initialized successfully")

        except Exception as e:
            logger.error(f"❌ Chunker v4.0 initialization error: {e}")
            raise
    async def _init_chonkie_chunkers(self):
        """
        Initialize the Chonkie chunkers: SemanticChunker and RecursiveChunker.
        Errors are handled with a clean fallback to None when initialization fails.
        """
        semantic_config = self.config.get("chunking", {}).get("chonkie", {}).get("semantic", {})
        recursive_config = self.config.get("chunking", {}).get("chonkie", {}).get("recursive", {})

        # 🔹 SemanticChunker initialization (dense embeddings)
        try:
            # Chonkie resolves its own embeddings from a model name string, so we
            # pass the SentenceTransformers model name rather than the LlamaIndex
            # BaseEmbedding object loaded in initialize().
            embed_config = self.config.get("models", {}).get("embedding", {})
            sentence_model_name = embed_config.get("model_name", "sentence-transformers/all-MiniLM-L6-v2")

            self.chonkie_semantic = SemanticChunker(
                embedding_model=sentence_model_name,
                threshold=semantic_config.get("threshold", 0.75),
                chunk_size=semantic_config.get("chunk_size", 512),
                min_sentences=semantic_config.get("min_sentences", 1)
            )
            logger.info("✅ SemanticChunker (Chonkie) initialized successfully")
        except Exception as e:
            logger.warning(f"⚠️ Chonkie SemanticChunker initialization error: {e}")
            self.chonkie_semantic = None

        # 🔹 RecursiveChunker initialization (hierarchy)
        try:
            # Keyword arguments vary across Chonkie versions; we only pass the
            # top-level chunk size and rely on the library defaults for the rest.
            self.chonkie_recursive = RecursiveChunker(
                chunk_size=recursive_config.get("chunk_sizes", [2048, 512, 128])[0]
            )
            logger.info("✅ RecursiveChunker (Chonkie) initialized successfully")
        except Exception as e:
            logger.warning(f"⚠️ Chonkie RecursiveChunker initialization error: {e}")
            self.chonkie_recursive = None
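    # Usage sketch (assuming Chonkie's chunk() API, where each returned chunk
    # exposes at least a .text attribute):
    #   chunks = self.chonkie_recursive.chunk("Some long document text ...")
    #   sizes = [len(c.text) for c in chunks]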
    async def _init_llamaindex_parsers(self):
        """Initialize the STABLE LlamaIndex v0.12 parsers"""
        # SentenceSplitter - stable and reliable
        self.sentence_splitter = SentenceSplitter(
            chunk_size=512,
            chunk_overlap=20,
            include_metadata=True,
            include_prev_next_rel=True
        )

        # MarkdownNodeParser - for structure detection
        self.markdown_parser = MarkdownNodeParser()

        logger.info("✅ LlamaIndex v0.12 parsers initialized")
    async def process_text(self, request: ChunkRequest) -> ChunkResponse:
        """
        Main entry point - complete v4.0 workflow
        Recursive intelligent chunking with Chonkie + optimized LlamaIndex
        """
        start_time = time.time()
        try:
            if not self._is_initialized:
                await self.initialize()

            logger.info(f"📝 Starting intelligent chunking v4.0: {request.titre or 'Untitled'}")

            # 1. Improved preprocessing and cleanup
            cleaned_text = self._preprocess_text_v4(request.text)

            # 2. Advanced automatic structure detection
            documents = await self._detect_structure_v4(cleaned_text, request)

            # 3. ✅ Hierarchical chunking with Chonkie (when available)
            if CHONKIE_AVAILABLE and self.chonkie_recursive:
                hierarchical_chunks = await self._apply_chonkie_hierarchical_chunking(documents, request)
            else:
                hierarchical_chunks = await self._apply_llamaindex_hierarchical_chunking(documents, request)

            # 4. ✅ Semantic chunking with the Chonkie SemanticChunker
            if CHONKIE_AVAILABLE and self.chonkie_semantic:
                semantic_chunks = await self._apply_chonkie_semantic_chunking(hierarchical_chunks, request)
            else:
                semantic_chunks = await self._apply_fallback_semantic_chunking(hierarchical_chunks, request)

            # 5. ✅ Build complete bidirectional relationships
            enriched_chunks = await self._build_bidirectional_relationships_v4(semantic_chunks)

            # 6. Extract concepts and intelligent metadata
            final_chunks = await self._enrich_with_intelligence_v4(enriched_chunks, request)

            # 7. ✅ Generate exports with the corrected Obsidian format
            exports = await self._generate_exports_v4(final_chunks, request)

            processing_time = time.time() - start_time

            # 8. Automatic memory cleanup for HF Space
            if self.config.get("performance", {}).get("memory", {}).get("enable_garbage_collection", True):
                await self._cleanup_memory_v4()

            # Build the final response
            response = ChunkResponse(
                chunks=final_chunks,
                hierarchy=self._build_hierarchy_levels_v4(final_chunks),
                total_chunks=len(final_chunks),
                total_tokens=sum(c.metadata.tokens_count for c in final_chunks),
                processing_time=processing_time,
                source_metadata=self._build_source_metadata_v4(request),
                concept_graph=exports.get("concept_graph", {}),
                obsidian_export=exports.get("obsidian"),
                agent_knowledge=exports.get("agents")
            )

            logger.info(f"✅ Chunking v4.0 done: {len(final_chunks)} chunks in {processing_time:.2f}s")
            return response

        except Exception as e:
            logger.error(f"❌ Chunking v4.0 error: {e}")
            raise
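    # Usage sketch (hypothetical caller; the field names text, titre and
    # source_id are the ones this method reads from ChunkRequest):
    #   pipeline = SmartChunkerPipeline("config.yaml")
    #   response = asyncio.run(pipeline.process_text(
    #       ChunkRequest(text=raw_text, titre="My Doc", source_id="doc-1")
    #   ))
    #   print(response.total_chunks, response.processing_time)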
    def _preprocess_text_v4(self, text: str) -> str:
        """Improved v4.0 preprocessing with advanced pattern detection"""
        # Basic normalization
        text = re.sub(r'\r\n|\r', '\n', text)
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)
        text = re.sub(r'[ \t]+', ' ', text)

        # ✅ NEW v4.0: cleanup of specific patterns
        # Drop bracketed numeric references like [1], [2] (plain digits are kept)
        text = re.sub(r'\[\d+\]', '', text)
        text = re.sub(r'http[s]?://\S+', '[URL]', text)  # Anonymized URLs

        # Normalize special characters (curly quotes and dashes to ASCII)
        text = text.replace('\u201c', '"').replace('\u201d', '"')
        text = text.replace('\u2018', "'").replace('\u2019', "'")
        text = text.replace('–', '-').replace('—', '-')

        # ✅ Preserve important structure
        # Protect significant structural markers
        text = re.sub(r'^(CHAPITRE|SECTION|PARTIE)\s+', r'\n\n\1 ', text, flags=re.MULTILINE | re.IGNORECASE)

        return text.strip()
    async def _detect_structure_v4(self, text: str, request: ChunkRequest) -> List[Document]:
        """Improved automatic structure detection, v4.0"""
        documents = []
        structure_config = self.config.get("chunking", {}).get("structure_detection", {})

        # 1. ✅ Advanced Markdown detection
        if structure_config.get("markdown", {}).get("enabled", True) and self._has_markdown_structure_v4(text):
            logger.info("📄 Markdown structure detected (v4.0)")
            documents = await self._split_markdown_structure_v4(text, request)

        # 2. ✅ Improved chapter/section detection
        elif structure_config.get("chapters", {}).get("enabled", True) and self._has_chapter_structure_v4(text):
            logger.info("📚 Chapter structure detected (v4.0)")
            documents = await self._split_by_chapters_v4(text, request)

        # 3. ✅ Intelligent paragraph fallback
        else:
            logger.info("📝 Raw text - intelligent paragraph splitting (v4.0)")
            documents = await self._split_by_paragraphs_v4(text, request)

        return documents
    def _has_markdown_structure_v4(self, text: str) -> bool:
        """Improved Markdown detection with weighted scoring, v4.0"""
        patterns_weighted = [
            (r'^#{1,6}\s+.+$', 5),    # Headings (strong weight)
            (r'^\*\*.*\*\*$', 2),     # Bold
            (r'^\*\s+.+$', 2),        # Bullet lists
            (r'^\d+\.\s+.+$', 2),     # Numbered lists
            (r'```[\s\S]*?```', 3),   # Code blocks
            (r'^\|.*\|$', 2),         # Tables
            (r'\[.*\]\(.*\)', 1),     # Links
            (r'^>\s+.+$', 1),         # Quotes
        ]

        total_score = 0
        total_lines = len(text.split('\n'))

        for pattern, weight in patterns_weighted:
            matches = len(re.findall(pattern, text, re.MULTILINE))
            total_score += matches * weight

        # Normalized score
        score_threshold = self.config.get("chunking", {}).get("structure_detection", {}).get("markdown", {}).get("minimum_score", 0.15)
        normalized_score = (total_score / total_lines) if total_lines > 0 else 0

        return normalized_score > score_threshold
    def _has_chapter_structure_v4(self, text: str) -> bool:
        """Improved chapter detection, v4.0"""
        patterns = self.config.get("chunking", {}).get("structure_detection", {}).get("chapters", {}).get("patterns", [])

        chapter_count = 0
        for pattern in patterns:
            matches = len(re.findall(pattern, text, re.IGNORECASE | re.MULTILINE))
            chapter_count += matches

        # Adaptive threshold based on text length
        text_length = len(text.split())
        if text_length < 1000:
            min_chapters = 2
        elif text_length < 5000:
            min_chapters = 3
        else:
            min_chapters = 4

        return chapter_count >= min_chapters
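    # e.g. (illustrative) a 3,000-word text is considered chapter-structured once
    # at least 3 pattern matches (CHAPITRE 1, CHAPITRE 2, ...) are found.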
    async def _split_by_paragraphs_v4(self, text: str, request: ChunkRequest) -> List[Document]:
        """✅ NEW: intelligent paragraph fallback, v4.0"""
        documents = []

        # Paragraph splitting with intelligent grouping
        paragraphs = re.split(r'\n\s*\n', text)
        current_section = ""
        section_index = 0

        for para_idx, paragraph in enumerate(paragraphs):
            paragraph = paragraph.strip()
            if not paragraph:
                continue

            # Grouping logic: a short paragraph (< 100 words) is merged with the next one
            word_count = len(paragraph.split())
            if word_count < 100 and para_idx < len(paragraphs) - 1:
                current_section += paragraph + "\n\n"
            else:
                current_section += paragraph

                # Emit a document once there is enough content
                if len(current_section.split()) >= 50:  # At least 50 words
                    documents.append(Document(
                        text=current_section.strip(),
                        metadata={
                            "structure_type": "paragraph_group",
                            "section_index": section_index,
                            "word_count": len(current_section.split()),
                            "source_id": request.source_id,
                            "titre": request.titre,
                            "level": 0
                        }
                    ))
                    section_index += 1
                    current_section = ""

        # Last section, if non-empty
        if current_section.strip() and len(current_section.split()) >= 20:
            documents.append(Document(
                text=current_section.strip(),
                metadata={
                    "structure_type": "paragraph_group",
                    "section_index": section_index,
                    "word_count": len(current_section.split()),
                    "source_id": request.source_id,
                    "titre": request.titre,
                    "level": 0
                }
            ))

        return documents
| """ | |
| Chunker Pipeline v4.0 - PARTIE 2 - CORRIGÉ ET COMPLET | |
| Suite et fin de chunker_pipeline_v4_part1.py | |
| CONTINUATION DES MÉTHODES: | |
| ✅ Chunking Chonkie hiérarchique et sémantique | |
| ✅ Relations bidirectionnelles complètes | |
| ✅ Export Obsidian avec format [[Titre]], id | |
| ✅ Génération agents spécialisés | |
| ✅ Nettoyage mémoire optimisé HF Space | |
| ✅ Health check et monitoring | |
| """ | |
    async def _split_markdown_structure_v4(self, text: str, request: ChunkRequest) -> List[Document]:
        """Improved hierarchical Markdown splitting, v4.0"""
        documents = []

        # Use LlamaIndex's MarkdownNodeParser
        try:
            markdown_docs = [Document(text=text)]
            nodes = self.markdown_parser.get_nodes_from_documents(markdown_docs)

            for node_idx, node in enumerate(nodes):
                # Detect the hierarchy level from the content
                title_match = re.search(r'^(#{1,6})\s+(.+)$', node.text, re.MULTILINE)
                level = len(title_match.group(1)) if title_match else 0
                detected_title = title_match.group(2).strip() if title_match else None

                documents.append(Document(
                    text=node.text,
                    metadata={
                        "structure_type": "markdown",
                        "detected_title": detected_title,
                        "level": level,
                        "node_index": node_idx,
                        "source_id": request.source_id,
                        "titre": request.titre,
                        **node.metadata
                    }
                ))
        except Exception as e:
            logger.warning(f"⚠️ Markdown parsing error: {e}")
            # Manual fallback
            return await self._split_by_paragraphs_v4(text, request)

        return documents
    async def _split_by_chapters_v4(self, text: str, request: ChunkRequest) -> List[Document]:
        """Chapter splitting with configurable patterns, v4.0"""
        documents = []
        patterns = self.config.get("chunking", {}).get("structure_detection", {}).get("chapters", {}).get("patterns", [])

        # Combined pattern for splitting: a single capture group so re.split
        # keeps the matched headings without inserting None entries
        combined_pattern = '(' + '|'.join(f'(?:{p})' for p in patterns) + ')'

        try:
            parts = re.split(combined_pattern, text, flags=re.IGNORECASE | re.MULTILINE)
            current_title = None
            current_content = ""
            chapter_index = 0

            for part in parts:
                if not part:
                    continue
                part = part.strip()
                if not part:
                    continue

                # Is this a chapter heading?
                is_title = any(re.match(pattern, part, re.IGNORECASE) for pattern in patterns)

                if is_title:
                    # Save the previous chapter
                    if current_content.strip() and len(current_content.split()) >= 30:
                        documents.append(Document(
                            text=current_content.strip(),
                            metadata={
                                "structure_type": "chapter",
                                "detected_title": current_title,
                                "level": 1,
                                "chapter_index": chapter_index,
                                "word_count": len(current_content.split()),
                                "source_id": request.source_id,
                                "titre": request.titre
                            }
                        ))
                        chapter_index += 1

                    current_title = part
                    current_content = part + "\n\n"
                else:
                    current_content += part + "\n"

            # Last chapter
            if current_content.strip() and len(current_content.split()) >= 20:
                documents.append(Document(
                    text=current_content.strip(),
                    metadata={
                        "structure_type": "chapter",
                        "detected_title": current_title,
                        "level": 1,
                        "chapter_index": chapter_index,
                        "word_count": len(current_content.split()),
                        "source_id": request.source_id,
                        "titre": request.titre
                    }
                ))

        except Exception as e:
            logger.warning(f"⚠️ Chapter parsing error: {e}")
            return await self._split_by_paragraphs_v4(text, request)

        return documents
    async def _apply_chonkie_hierarchical_chunking(self, documents: List[Document], request: ChunkRequest) -> List[ChunkNode]:
        """✅ Hierarchical chunking with the Chonkie RecursiveChunker, v4.0"""
        all_chunks = []

        for doc_idx, document in enumerate(documents):
            try:
                # Run the Chonkie RecursiveChunker
                chunks = self.chonkie_recursive.chunk(document.text)

                for chunk_idx, chunk in enumerate(chunks):
                    chunk_node = ChunkNode(
                        id=self._generate_chunk_id_v4(chunk.text, doc_idx, 0, chunk_idx),
                        content=chunk.text,
                        level=0,  # Base level for Chonkie
                        title=document.metadata.get("detected_title"),
                        metadata={
                            **document.metadata,
                            "chunker": "chonkie_recursive",
                            "doc_index": doc_idx,
                            "chunk_index": chunk_idx,
                            "token_count": len(chunk.text.split()),
                            "original_chunk_size": getattr(chunk, 'token_count', len(chunk.text.split()))
                        }
                    )
                    all_chunks.append(chunk_node)

                    # Register for relationship building
                    self._chunk_registry[chunk_node.id] = chunk_node

            except Exception as e:
                logger.warning(f"⚠️ Chonkie hierarchical chunking error on doc {doc_idx}: {e}")
                # Fall back to LlamaIndex
                fallback_chunks = await self._apply_llamaindex_hierarchical_chunking([document], request)
                all_chunks.extend(fallback_chunks)

        return all_chunks
    async def _apply_llamaindex_hierarchical_chunking(self, documents: List[Document], request: ChunkRequest) -> List[ChunkNode]:
        """Hierarchical chunking fallback with the LlamaIndex SentenceSplitter, v4.0"""
        all_chunks = []
        chunk_sizes = self.config.get("chunking", {}).get("chonkie", {}).get("recursive", {}).get("chunk_sizes", [2048, 512, 128])

        for doc_idx, document in enumerate(documents):
            try:
                # Multi-level chunking
                for level, chunk_size in enumerate(chunk_sizes):
                    # Splitter configured for this level
                    splitter = SentenceSplitter(
                        chunk_size=chunk_size,
                        chunk_overlap=20,
                        include_metadata=True,
                        include_prev_next_rel=True
                    )

                    # Apply the splitter
                    if level == 0:
                        nodes = splitter.get_nodes_from_documents([document])
                    else:
                        # Subdivide the chunks of the previous level
                        prev_level_chunks = [c for c in all_chunks if c.level == level - 1 and c.metadata.get("doc_index") == doc_idx]
                        nodes = []
                        for parent_chunk in prev_level_chunks:
                            sub_doc = Document(text=parent_chunk.content, metadata=parent_chunk.metadata)
                            sub_nodes = splitter.get_nodes_from_documents([sub_doc])
                            for sub_node in sub_nodes:
                                sub_node.metadata["parent_chunk_id"] = parent_chunk.id
                                nodes.append(sub_node)

                    # Convert to ChunkNode
                    for node_idx, node in enumerate(nodes):
                        chunk_node = ChunkNode(
                            id=self._generate_chunk_id_v4(node.text, doc_idx, level, node_idx),
                            content=node.text,
                            level=level,
                            title=document.metadata.get("detected_title"),
                            parent_id=node.metadata.get("parent_chunk_id"),
                            metadata={
                                **node.metadata,
                                "chunker": "llamaindex_sentence",
                                "doc_index": doc_idx,
                                "level": level,
                                "node_index": node_idx,
                                "chunk_size_used": chunk_size
                            }
                        )
                        all_chunks.append(chunk_node)

                        # Register the chunk
                        self._chunk_registry[chunk_node.id] = chunk_node

            except Exception as e:
                logger.warning(f"⚠️ LlamaIndex hierarchical chunking error on doc {doc_idx}: {e}")
                # Simple fallback
                fallback_chunk = ChunkNode(
                    id=f"fallback_{doc_idx}_{int(time.time())}",
                    content=document.text,
                    level=0,
                    title=document.metadata.get("detected_title"),
                    metadata={"fallback": True, "doc_index": doc_idx}
                )
                all_chunks.append(fallback_chunk)
                self._chunk_registry[fallback_chunk.id] = fallback_chunk

        return all_chunks
    async def _apply_chonkie_semantic_chunking(self, chunk_nodes: List[ChunkNode], request: ChunkRequest) -> List[ChunkNode]:
        """✅ Semantic chunking with the Chonkie SemanticChunker, v4.0"""
        semantic_chunks = []

        for chunk_node in chunk_nodes:
            try:
                # Skip semantic chunking when the chunk is too small
                if len(chunk_node.content.split()) < 20:
                    semantic_chunks.append(chunk_node)
                    continue

                # Run the Chonkie SemanticChunker
                chunks = self.chonkie_semantic.chunk(chunk_node.content)

                # If a single chunk comes back, keep the original
                if len(chunks) <= 1:
                    semantic_chunks.append(chunk_node)
                    continue

                # Convert the semantic chunks
                for sem_idx, chunk in enumerate(chunks):
                    semantic_chunk = ChunkNode(
                        id=f"{chunk_node.id}_sem_{sem_idx}",
                        content=chunk.text,
                        level=chunk_node.level + 1,
                        title=chunk_node.title,
                        parent_id=chunk_node.id,
                        parent_title=chunk_node.title,  # ✅ For the Obsidian format
                        metadata={
                            **chunk_node.metadata,
                            "chunker": "chonkie_semantic",
                            "semantic_index": sem_idx,
                            "parent_chunk_id": chunk_node.id,
                            "semantic_similarity": getattr(chunk, 'similarity_score', 0.75)
                        }
                    )
                    semantic_chunks.append(semantic_chunk)

                    # Update the parent's relations
                    if semantic_chunk.id not in chunk_node.children_ids:
                        chunk_node.children_ids.append(semantic_chunk.id)

                    # Register the chunk
                    self._chunk_registry[semantic_chunk.id] = semantic_chunk

            except Exception as e:
                logger.warning(f"⚠️ Chonkie semantic chunking error on {chunk_node.id}: {e}")
                semantic_chunks.append(chunk_node)

        return semantic_chunks
    async def _apply_fallback_semantic_chunking(self, chunk_nodes: List[ChunkNode], request: ChunkRequest) -> List[ChunkNode]:
        """Semantic chunking fallback without Chonkie, v4.0"""
        semantic_chunks = []

        for chunk_node in chunk_nodes:
            try:
                # Simple sentence-based chunking for sufficiently long text
                if len(chunk_node.content.split()) >= 50:
                    sentences = self._split_into_sentences_v4(chunk_node.content)

                    # Group sentences in batches of 3-5
                    buffer_size = min(5, max(2, len(sentences) // 3))
                    sentence_groups = [sentences[i:i+buffer_size] for i in range(0, len(sentences), buffer_size)]

                    for group_idx, group in enumerate(sentence_groups):
                        if len(group) == 0:
                            continue

                        group_text = ' '.join(group)
                        if len(group_text.split()) < 10:  # Too small
                            continue

                        semantic_chunk = ChunkNode(
                            id=f"{chunk_node.id}_fallback_sem_{group_idx}",
                            content=group_text,
                            level=chunk_node.level + 1,
                            title=chunk_node.title,
                            parent_id=chunk_node.id,
                            parent_title=chunk_node.title,
                            metadata={
                                **chunk_node.metadata,
                                "chunker": "fallback_semantic",
                                "semantic_index": group_idx,
                                "sentences_count": len(group)
                            }
                        )
                        semantic_chunks.append(semantic_chunk)

                        # Update relations
                        if semantic_chunk.id not in chunk_node.children_ids:
                            chunk_node.children_ids.append(semantic_chunk.id)

                        self._chunk_registry[semantic_chunk.id] = semantic_chunk
                else:
                    # Keep the original chunk if it is too small
                    semantic_chunks.append(chunk_node)

            except Exception as e:
                logger.warning(f"⚠️ Fallback semantic chunking error on {chunk_node.id}: {e}")
                semantic_chunks.append(chunk_node)

        return semantic_chunks
    def _split_into_sentences_v4(self, text: str) -> List[str]:
        """Improved sentence splitting, v4.0"""
        # Sentence-ending pattern (punctuation followed by whitespace, end of text, or a quote)
        sentence_endings = r'[.!?]+(?:\s|$|")'

        sentences = re.split(sentence_endings, text)

        # Cleanup and filtering
        clean_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if len(sentence) > 15 and not re.match(r'^[A-Z]{2,}\.?$', sentence):  # Skip bare acronyms
                clean_sentences.append(sentence)

        return clean_sentences
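    # e.g. _split_into_sentences_v4("This is the first sentence. And here is the second!")
    #   -> ["This is the first sentence", "And here is the second"]
    # (the trailing punctuation is consumed by the split pattern)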
    async def _build_bidirectional_relationships_v4(self, chunks: List[ChunkNode]) -> List[ChunkNode]:
        """✅ Build complete bidirectional relationships, v4.0"""
        # 1. Sort by level and index for sequential relations
        sorted_chunks = sorted(chunks, key=lambda x: (x.level, x.metadata.get("node_index", 0)))

        # 2. Build prev/next relations per level
        level_groups = {}
        for chunk in sorted_chunks:
            level = chunk.level
            if level not in level_groups:
                level_groups[level] = []
            level_groups[level].append(chunk)

        for level, level_chunks in level_groups.items():
            for i, chunk in enumerate(level_chunks):
                # Previous/next relations
                if i > 0:
                    chunk.prev_id = level_chunks[i-1].id
                if i < len(level_chunks) - 1:
                    chunk.next_id = level_chunks[i+1].id

        # 3. Validate and repair parent/child relations
        for chunk in chunks:
            # Check the parent exists
            if chunk.parent_id and chunk.parent_id in self._chunk_registry:
                parent = self._chunk_registry[chunk.parent_id]

                # Refresh the parent title for Obsidian
                chunk.parent_title = parent.title or parent.metadata.get("detected_title") or f"Chunk {parent.id[:8]}"

                # Record the child on the parent
                if chunk.id not in parent.children_ids:
                    parent.children_ids.append(chunk.id)

            # Check the children exist
            valid_children = []
            for child_id in chunk.children_ids:
                if child_id in self._chunk_registry:
                    child = self._chunk_registry[child_id]
                    child.parent_id = chunk.id
                    child.parent_title = chunk.title or chunk.metadata.get("detected_title") or f"Chunk {chunk.id[:8]}"
                    valid_children.append(child_id)
            chunk.children_ids = valid_children

        return chunks
    async def _enrich_with_intelligence_v4(self, chunks: List[ChunkNode], request: ChunkRequest) -> List[SemanticChunk]:
        """Intelligent enrichment with concepts and metadata, v4.0"""
        semantic_chunks = []

        for chunk_idx, chunk_node in enumerate(chunks):
            # Build the enriched metadata
            metadata = ChunkMetadata(
                chunk_id=chunk_node.id,
                level=chunk_node.level,
                level_name=self._determine_chunk_level_v4(chunk_node.level),
                parent_id=chunk_node.parent_id,
                children_ids=chunk_node.children_ids,
                prev_id=chunk_node.prev_id,
                next_id=chunk_node.next_id,
                global_index=chunk_idx,
                local_index=chunk_node.metadata.get("node_index", 0),
                source_id=request.source_id,
                source_title=request.titre,
                source_url=request.source,
                content_type=request.type or ContentType.TEXT,
                tokens_count=len(chunk_node.content.split()),
                sentences_count=len(chunk_node.content.split('.')),
                detected_title=chunk_node.title or chunk_node.metadata.get("detected_title"),
                main_concepts=[],
                keywords=[],
                chunk_type=None,
                confidence_score=chunk_node.confidence,
                contextual_summary=None,
                related_chunks=[]
            )

            # Create the SemanticChunk
            semantic_chunk = SemanticChunk(
                content=chunk_node.content,
                metadata=metadata,
                embedding=None,  # Optional, to save bandwidth
                similarity_scores={}
            )

            # Intelligent LLM enrichment (when available and enabled)
            if (request.include_metadata and
                    self.llm and
                    len(chunk_node.content.split()) >= 20):  # Only for long-enough chunks
                try:
                    await self._extract_semantic_intelligence_v4(semantic_chunk)
                except Exception as e:
                    logger.warning(f"⚠️ Intelligent extraction failed for {chunk_node.id}: {e}")
                    # Fall back to simple extraction
                    await self._extract_simple_keywords_v4(semantic_chunk)
            else:
                # Simple extraction by default
                await self._extract_simple_keywords_v4(semantic_chunk)

            semantic_chunks.append(semantic_chunk)

        return semantic_chunks
    async def _extract_semantic_intelligence_v4(self, chunk: SemanticChunk):
        """Advanced semantic extraction with the local LLM, v4.0"""
        try:
            # Prompt tuned for a local model (kept in French: the parsing below
            # matches the French field labels, and the pipeline targets French text)
            prompt = f"""Analyser ce texte et extraire:
1. 3 concepts principaux (séparés par virgules)
2. 5 mots-clés (séparés par virgules)
3. Type: concept/principe/méthode/exemple/définition

Texte: {chunk.content[:400]}

Format:
Concepts: concept1, concept2, concept3
Mots-clés: mot1, mot2, mot3, mot4, mot5
Type: type_détecté"""

            response = await self.llm.acomplete(prompt)
            result_text = response.text

            # Robust parsing
            concepts = []
            keywords = []
            chunk_type = "concept"

            # Concepts
            if "Concepts:" in result_text:
                concepts_line = result_text.split("Concepts:")[1].split("\n")[0]
                concepts = [c.strip() for c in concepts_line.split(",") if c.strip()][:3]

            # Keywords
            if "Mots-clés:" in result_text:
                keywords_line = result_text.split("Mots-clés:")[1].split("\n")[0]
                keywords = [k.strip() for k in keywords_line.split(",") if k.strip()][:5]

            # Type
            if "Type:" in result_text:
                type_line = result_text.split("Type:")[1].split("\n")[0]
                extracted_type = type_line.strip().lower()
                valid_types = ["concept", "principe", "méthode", "exemple", "définition", "framework"]
                if extracted_type in valid_types:
                    chunk_type = extracted_type

            # Update the chunk
            chunk.metadata.main_concepts = concepts
            chunk.metadata.keywords = keywords
            chunk.metadata.chunk_type = chunk_type

        except Exception as e:
            logger.warning(f"⚠️ LLM extraction failed: {e}")
            await self._extract_simple_keywords_v4(chunk)
    async def _extract_simple_keywords_v4(self, chunk: SemanticChunk):
        """Simple frequency-based keyword extraction, v4.0"""
        import collections

        # French and English stop words
        stop_words = {
            "le", "la", "les", "un", "une", "des", "de", "du", "et", "ou", "mais", "donc", "car",
            "pour", "par", "avec", "sans", "dans", "sur", "sous", "ce", "cette", "ces", "il",
            "elle", "ils", "elles", "que", "qui", "quoi", "dont", "où", "the", "a", "an", "and",
            "or", "but", "in", "on", "at", "to", "for", "of", "with", "by", "this", "that", "is", "are"
        }

        # Extract significant words
        words = re.findall(r'\b[a-zA-ZÀ-ÿ]{3,}\b', chunk.content.lower())
        words = [w for w in words if w not in stop_words and len(w) > 2]

        # Frequency counting
        word_counts = collections.Counter(words)
        top_words = [word for word, count in word_counts.most_common(5)]

        # Simple concept extraction (capitalized or repeated words)
        concept_candidates = re.findall(r'\b[A-ZÀ-Ÿ][a-zA-ZÀ-ÿ]{4,}\b', chunk.content)
        concepts = list(set(concept_candidates))[:3]

        # Update
        chunk.metadata.keywords = top_words
        chunk.metadata.main_concepts = concepts if concepts else top_words[:3]
        chunk.metadata.chunk_type = "concept"
    def _determine_chunk_level_v4(self, level: int) -> ChunkLevel:
        """Map a numeric level to the ChunkLevel enum, v4.0"""
        mapping = {
            0: ChunkLevel.DOCUMENT,
            1: ChunkLevel.CHAPTER,
            2: ChunkLevel.SECTION,
            3: ChunkLevel.SUBSECTION,
            4: ChunkLevel.CONCEPT,
            5: ChunkLevel.DETAIL
        }
        return mapping.get(level, ChunkLevel.DETAIL)
    def _generate_chunk_id_v4(self, text: str, doc_idx: int, level: int, node_idx: int) -> str:
        """Generate a unique, traceable ID, v4.0"""
        content_hash = hashlib.md5(text.encode()).hexdigest()[:8]
        timestamp = int(time.time()) % 10000
        return f"chk_{doc_idx:02d}_{level}_{node_idx:03d}_{content_hash}_{timestamp}"
    async def _generate_exports_v4(self, chunks: List[SemanticChunk], request: ChunkRequest) -> Dict[str, Any]:
        """Generate the Second Brain and Agent exports, v4.0"""
        exports = {}

        # Obsidian export with the corrected format
        exports["obsidian"] = await self._generate_obsidian_export_v4(chunks, request)

        # Specialized agent export
        exports["agents"] = await self._generate_agent_knowledge_v4(chunks, request)

        # Concept graph export
        exports["concept_graph"] = self._extract_concept_graph_v4(chunks)

        return exports
    async def _generate_obsidian_export_v4(self, chunks: List[SemanticChunk], request: ChunkRequest) -> Dict[str, Any]:
        """✅ Obsidian export with the corrected [[Title]], id format, v4.0"""
        obsidian_config = self.config.get("obsidian", {})
        parent_format = obsidian_config.get("parent_format", "[[{title}]], {id}")

        notes = []
        for chunk in chunks:
            # ✅ Corrected parent format (ChunkMetadata is accessed by attribute;
            # the registry holds ChunkNode objects whose metadata is a dict)
            parent_link = None
            if chunk.metadata.parent_id and chunk.metadata.parent_id in self._chunk_registry:
                parent_chunk = self._chunk_registry[chunk.metadata.parent_id]
                parent_title = parent_chunk.title or parent_chunk.metadata.get("detected_title") or f"Chunk {parent_chunk.id[:8]}"
                parent_link = parent_format.format(
                    title=parent_title,
                    id=chunk.metadata.parent_id
                )

            # Build the full note
            note = {
                "filename": f"{chunk.metadata.chunk_id}.md",
                "title": chunk.metadata.detected_title or f"Note {chunk.metadata.chunk_id[:8]}",
                "content": chunk.content,

                # Front matter
                "frontmatter": {
                    "id": chunk.metadata.chunk_id,
                    "title": chunk.metadata.detected_title,
                    "level": chunk.metadata.level_name.value,
                    "concepts": chunk.metadata.main_concepts,
                    "tags": chunk.metadata.keywords,
                    "source": chunk.metadata.source_title,
                    "source_url": chunk.metadata.source_url,
                    "created": time.strftime("%Y-%m-%d"),
                    "type": chunk.metadata.chunk_type,
                    "confidence": chunk.metadata.confidence_score,
                    "tokens": chunk.metadata.tokens_count
                },

                # ✅ Back matter with the corrected format
                "backmatter": {
                    "basé_sur": parent_link,  # Format: [[Parent Title]], parent_id
                    "parent_id": chunk.metadata.parent_id,
                    "enfants": [
                        parent_format.format(
                            title=self._get_chunk_title_by_id(child_id),
                            id=child_id
                        ) for child_id in chunk.metadata.children_ids
                    ],
                    "précédent": chunk.metadata.prev_id,
                    "suivant": chunk.metadata.next_id,
                    "niveau": chunk.metadata.level,
                    "confiance": chunk.metadata.confidence_score
                }
            }
            notes.append(note)

        return {
            "format": "obsidian_vault_v4",
            "version": "4.0.0",
            "notes": notes,
            "vault_config": {
                "name": f"Vault_{request.source_id or 'default'}",
                "bidirectional_links": obsidian_config.get("use_bidirectional_links", True),
                "parent_format": parent_format
            },
            "statistics": {
                "total_notes": len(notes),
                "total_concepts": len(set(c for chunk in chunks for c in chunk.metadata.main_concepts)),
                "hierarchy_levels": len(set(chunk.metadata.level for chunk in chunks))
            }
        }
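    # Example of the parent link format (illustrative values):
    #   "[[{title}]], {id}".format(title="Introduction", id="chk_00_0_001_ab12cd34_0042")
    #   -> "[[Introduction]], chk_00_0_001_ab12cd34_0042"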
    def _get_chunk_title_by_id(self, chunk_id: str) -> str:
        """Look up a chunk title by ID for Obsidian links"""
        if chunk_id in self._chunk_registry:
            chunk = self._chunk_registry[chunk_id]
            return chunk.title or chunk.metadata.get("detected_title") or f"Chunk {chunk_id[:8]}"
        return f"Chunk {chunk_id[:8]}"
    async def _generate_agent_knowledge_v4(self, chunks: List[SemanticChunk], request: ChunkRequest) -> Dict[str, Any]:
        """Generate the specialized-agent knowledge base, v4.0"""
        # Classification by type, for agents
        knowledge_base = {
            "principles": [],
            "methods": [],
            "examples": [],
            "concepts": [],
            "frameworks": [],
            "definitions": []
        }

        for chunk in chunks:
            chunk_type = chunk.metadata.chunk_type or "concept"

            knowledge_item = {
                "id": chunk.metadata.chunk_id,
                "content": chunk.content,
                "concepts": chunk.metadata.main_concepts,
                "keywords": chunk.metadata.keywords,
                "confidence": chunk.metadata.confidence_score,
                "level": chunk.metadata.level,
                "source": chunk.metadata.source_title,
                "detected_title": chunk.metadata.detected_title,
                "relations": {
                    "parent": chunk.metadata.parent_id,
                    "children": chunk.metadata.children_ids,
                    "siblings": [chunk.metadata.prev_id, chunk.metadata.next_id]
                }
            }

            # Dispatch by type, with a fallback (the French type names come from
            # the LLM prompt above)
            type_mapping = {
                "principe": "principles",
                "méthode": "methods",
                "exemple": "examples",
                "framework": "frameworks",
                "définition": "definitions"
            }

            target_category = type_mapping.get(chunk_type, "concepts")
            knowledge_base[target_category].append(knowledge_item)

        return {
            "format": "agent_specialist_knowledge_v4",
            "version": "4.0.0",
            "source_id": request.source_id,
            "source_title": request.titre,
            "knowledge_base": knowledge_base,
            "metadata": {
                "total_items": sum(len(items) for items in knowledge_base.values()),
                "extraction_quality": self._calculate_extraction_quality_v4(chunks),
                "specialization_domains": self._extract_domains_v4(chunks)
            }
        }
    def _extract_concept_graph_v4(self, chunks: List[SemanticChunk]) -> Dict[str, Any]:
        """Extract the enriched concept graph, v4.0"""
        concept_graph = {}
        concept_weights = {}

        for chunk in chunks:
            concepts = chunk.metadata.main_concepts
            for concept in concepts:
                if concept not in concept_graph:
                    concept_graph[concept] = []
                    concept_weights[concept] = 0
                concept_weights[concept] += 1

                # Relations with other concepts of the same chunk
                for other_concept in concepts:
                    if (other_concept != concept and
                            other_concept not in concept_graph[concept]):
                        concept_graph[concept].append(other_concept)

        return {
            "format": "concept_graph_v4",
            "version": "4.0.0",
            "nodes": list(concept_graph.keys()),
            "edges": concept_graph,
            "weights": concept_weights,
            "statistics": {
                "total_concepts": len(concept_graph),
                "total_edges": sum(len(edges) for edges in concept_graph.values()),
                "avg_connections": round(
                    sum(len(edges) for edges in concept_graph.values()) / len(concept_graph), 2
                ) if concept_graph else 0
            }
        }
| def _calculate_extraction_quality_v4(self, chunks: List[SemanticChunk]) -> float: | |
| """Calcul qualité extraction v4.0""" | |
| if not chunks: | |
| return 0.0 | |
| total_confidence = sum(chunk.metadata.get("confidence_score") or 0.5 for chunk in chunks) | |
| avg_confidence = total_confidence / len(chunks) | |
| concept_coverage = sum(1 for chunk in chunks if chunk.metadata.get("main_concepts")) / len(chunks) | |
| keyword_coverage = sum(1 for chunk in chunks if chunk.metadata.get("keywords")) / len(chunks) | |
| quality_score = (avg_confidence * 0.5 + concept_coverage * 0.3 + keyword_coverage * 0.2) | |
| return round(quality_score, 3) | |
| def _extract_domains_v4(self, chunks: List[SemanticChunk]) -> List[str]: | |
| """Extraction domaines spécialisation v4.0""" | |
| import collections | |
| all_concepts = [] | |
| for chunk in chunks: | |
| all_concepts.extend(chunk.metadata.get("main_concepts")) | |
| if not all_concepts: | |
| return [] | |
| concept_counts = collections.Counter(all_concepts) | |
| min_frequency = max(2, len(chunks) // 10) | |
| domains = [ | |
| concept for concept, count in concept_counts.most_common(10) | |
| if count >= min_frequency | |
| ] | |
| return domains | |
| def _build_hierarchy_levels_v4(self, chunks: List[SemanticChunk]): | |
| """Construction structure hiérarchique v4.0""" | |
| from schemas import HierarchyLevel | |
| hierarchy = {} | |
| # Groupement par niveau | |
| for chunk in chunks: | |
| level = chunk.metadata.get("level") | |
| if level not in hierarchy: | |
| hierarchy[level] = [] | |
| hierarchy[level].append(chunk) | |
| # Construction niveaux hiérarchiques | |
| hierarchy_levels = [] | |
| for level, level_chunks in sorted(hierarchy.items()): | |
| total_tokens = sum(c.metadata.tokens_count for c in level_chunks) | |
| avg_chunk_size = total_tokens / len(level_chunks) if level_chunks else 0 | |
| hierarchy_level = HierarchyLevel( | |
| level=level, | |
| level_name=self._determine_chunk_level_v4(level), | |
| chunks=level_chunks, | |
| total_tokens=total_tokens, | |
| avg_chunk_size=round(avg_chunk_size, 2) | |
| ) | |
| hierarchy_levels.append(hierarchy_level) | |
| return hierarchy_levels | |
| def _build_source_metadata_v4(self, request: ChunkRequest) -> Dict[str, Any]: | |
| """Construction métadonnées source v4.0""" | |
| return { | |
| "source_id": request.source_id, | |
| "titre": request.titre, | |
| "source": request.source, | |
| "type": request.type.value if request.type else "text", | |
| "processing_timestamp": time.time(), | |
| "chunker_version": "4.0.0", | |
| "total_input_length": len(request.text), | |
| "preprocessing_applied": True, | |
| "chonkie_enabled": CHONKIE_AVAILABLE | |
| } | |
| async def _cleanup_memory_v4(self): | |
| """Nettoyage mémoire optimisé HF Space v4.0""" | |
| memory_config = self.config.get("performance", {}).get("memory", {}) | |
| if memory_config.get("enable_garbage_collection", True): | |
| # Nettoyage caches si trop volumineux | |
| max_cache_mb = memory_config.get("max_cache_size_mb", 100) | |
| # Estimation taille cache | |
| cache_size_estimate = ( | |
| len(self._embedding_cache) * 0.1 + | |
| len(self._concept_cache) * 0.01 + | |
| len(self._text_cache) * 0.05 | |
| ) | |
| if cache_size_estimate > max_cache_mb: | |
| # Nettoyage partiel LRU | |
| cache_limit = max_cache_mb // 3 | |
| self._embedding_cache = dict(list(self._embedding_cache.items())[-cache_limit:]) | |
| self._concept_cache = dict(list(self._concept_cache.items())[-cache_limit:]) | |
| self._text_cache = dict(list(self._text_cache.items())[-cache_limit:]) | |
| logger.info(f"🧹 Cache nettoyé - taille réduite à ~{max_cache_mb//3}MB") | |
| # Garbage collection Python | |
| gc.collect() | |
| async def health_check_v4(self) -> Dict[str, Any]: | |
| """Vérification santé complète v4.0""" | |
| health_status = { | |
| "status": "unknown", | |
| "checks": {}, | |
| "timestamp": time.time(), | |
| "version": "4.0.0" | |
| } | |
| try: | |
| # Test initialisation | |
| health_status["checks"]["initialization"] = self._is_initialized | |
| if not self._is_initialized: | |
| health_status["status"] = "not_initialized" | |
| return health_status | |
| # Test LLM local | |
| if self.llm: | |
| try: | |
| test_response = await asyncio.wait_for( | |
| self.llm.acomplete("Test santé"), | |
| timeout=10 | |
| ) | |
| health_status["checks"]["llm"] = bool(test_response and test_response.text) | |
| except Exception as e: | |
| health_status["checks"]["llm"] = False | |
| health_status["checks"]["llm_error"] = str(e) | |
| else: | |
| health_status["checks"]["llm"] = False | |
| # Test Embedding | |
| if self.embed_model: | |
| try: | |
| # test_embedding = self.embed_model.get_text_embedding("test santé") | |
| test_embedding = await self.embed_model._aget_text_embedding("test santé") | |
| health_status["checks"]["embedding"] = bool(test_embedding and len(test_embedding) > 0) | |
| except Exception as e: | |
| health_status["checks"]["embedding"] = False | |
| health_status["checks"]["embedding_error"] = str(e) | |
| else: | |
| health_status["checks"]["embedding"] = False | |
| # Test Chonkie | |
| health_status["checks"]["chonkie_available"] = CHONKIE_AVAILABLE | |
| health_status["checks"]["chonkie_semantic"] = self.chonkie_semantic is not None | |
| health_status["checks"]["chonkie_recursive"] = self.chonkie_recursive is not None | |
| # Test Parsers LlamaIndex | |
| health_status["checks"]["sentence_splitter"] = self.sentence_splitter is not None | |
| health_status["checks"]["markdown_parser"] = self.markdown_parser is not None | |
| # Test Cache | |
| health_status["checks"]["cache_functional"] = True | |
| # Test Mémoire | |
| memory_info = self.get_memory_usage_v4() | |
| max_memory = self.config.get("performance", {}).get("memory", {}).get("max_memory_mb", 1800) | |
| if "memory_usage_mb" in memory_info: | |
| memory_ok = memory_info["memory_usage_mb"] < max_memory * 0.9 | |
| health_status["checks"]["memory"] = memory_ok | |
| health_status["memory_usage"] = memory_info["memory_usage_mb"] | |
| else: | |
| health_status["checks"]["memory"] = True | |
| # Status global | |
| critical_checks = ["initialization", "embedding", "sentence_splitter", "markdown_parser"] | |
| critical_passed = all(health_status["checks"].get(check, False) for check in critical_checks) | |
| optional_checks = ["llm", "memory", "chonkie_available"] | |
| optional_passed = sum(health_status["checks"].get(check, False) for check in optional_checks) | |
| if critical_passed and optional_passed >= 2: | |
| health_status["status"] = "healthy" | |
| elif critical_passed: | |
| health_status["status"] = "degraded" | |
| else: | |
| health_status["status"] = "unhealthy" | |
| return health_status | |
| except Exception as e: | |
| health_status["status"] = "error" | |
| health_status["error"] = str(e) | |
| logger.error(f"❌ Health check v4.0 failed: {e}") | |
| return health_status | |
| def get_memory_usage_v4(self) -> Dict[str, Any]: | |
| """Monitoring mémoire détaillé v4.0""" | |
| try: | |
| import psutil | |
| process = psutil.Process() | |
| memory_info = process.memory_info() | |
| return { | |
| "memory_usage_mb": round(memory_info.rss / 1024 / 1024, 2), | |
| "memory_percent": round(process.memory_percent(), 2), | |
| "cpu_percent": round(process.cpu_percent(), 2), | |
| "cache_sizes": { | |
| "embedding_cache": len(self._embedding_cache), | |
| "concept_cache": len(self._concept_cache), | |
| "text_cache": len(self._text_cache), | |
| "chunk_registry": len(self._chunk_registry) | |
| }, | |
| "system_info": { | |
| "is_initialized": self._is_initialized, | |
| "config_loaded": bool(self.config), | |
| "llm_available": self.llm is not None, | |
| "embed_model_available": self.embed_model is not None, | |
| "chonkie_available": CHONKIE_AVAILABLE | |
| }, | |
| "thresholds": { | |
| "max_memory_mb": self.config.get("performance", {}).get("memory", {}).get("max_memory_mb", 1800), | |
| "cache_limit_mb": self.config.get("performance", {}).get("caching", {}).get("max_cache_size_mb", 100) | |
| } | |
| } | |
| except Exception as e: | |
| return { | |
| "error": f"Unable to get memory info: {e}", | |
| "fallback_info": { | |
| "is_initialized": self._is_initialized, | |
| "cache_sizes": { | |
| "embedding_cache": len(self._embedding_cache), | |
| "concept_cache": len(self._concept_cache), | |
| "text_cache": len(self._text_cache), | |
| "chunk_registry": len(self._chunk_registry) | |
| } | |
| } | |
| } | |
| async def get_config_info_v4(self) -> Dict[str, Any]: | |
| """Informations détaillées configuration v4.0""" | |
| return { | |
| "config_source": self.config_path, | |
| "config_loaded": bool(self.config), | |
| "version": "4.0.0", | |
| "models": { | |
| "llm_model": self.config.get("models", {}).get("llm", {}).get("model_name", "unknown"), | |
| "embedding_model": self.config.get("models", {}).get("embedding", {}).get("model_name", "unknown"), | |
| "chonkie_available": CHONKIE_AVAILABLE | |
| }, | |
| "chunking_config": { | |
| "chonkie_semantic_enabled": self.config.get("chunking", {}).get("chonkie", {}).get("semantic", {}).get("enabled", False), | |
| "chonkie_recursive_enabled": self.config.get("chunking", {}).get("chonkie", {}).get("recursive", {}).get("enabled", False), | |
| "hierarchical_levels": len(self.config.get("chunking", {}).get("chonkie", {}).get("recursive", {}).get("chunk_sizes", [])), | |
| "structure_detection": { | |
| "markdown": self.config.get("chunking", {}).get("structure_detection", {}).get("markdown", {}).get("enabled", False), | |
| "chapters": self.config.get("chunking", {}).get("structure_detection", {}).get("chapters", {}).get("enabled", False) | |
| } | |
| }, | |
| "obsidian_config": { | |
| "parent_format": self.config.get("obsidian", {}).get("parent_format", "[[{title}]], {id}"), | |
| "bidirectional_links": self.config.get("obsidian", {}).get("use_bidirectional_links", True) | |
| }, | |
| "performance_config": { | |
| "max_workers": self.config.get("performance", {}).get("concurrency", {}).get("max_workers", 1), | |
| "memory_limit_mb": self.config.get("performance", {}).get("memory", {}).get("max_memory_mb", 1800), | |
| "caching_enabled": self.config.get("performance", {}).get("caching", {}).get("enabled", True) | |
| } | |
| } | |
| async def cleanup(self): | |
| """Nettoyage complet des ressources v4.0""" | |
| try: | |
| # Nettoyage caches | |
| self._embedding_cache.clear() | |
| self._concept_cache.clear() | |
| self._text_cache.clear() | |
| self._chunk_registry.clear() | |
| # Nettoyage modèles | |
| if hasattr(self.embed_model, 'cleanup'): | |
| self.embed_model.cleanup() | |
| if hasattr(self.llm, 'cleanup'): | |
| self.llm.cleanup() | |
| # Nettoyage threading | |
| if self.executor: | |
| self.executor.shutdown(wait=False) | |
| # Garbage collection final | |
| gc.collect() | |
| logger.info("🧹 SmartChunkerPipeline v4.0 nettoyé complètement") | |
| except Exception as e: | |
| logger.warning(f"⚠️ Erreur lors du nettoyage: {e}") | |
| def __str__(self) -> str: | |
| """Représentation string du pipeline v4.0""" | |
| return f"SmartChunkerPipeline v4.0 (initialized: {self._is_initialized}, chonkie: {CHONKIE_AVAILABLE})" | |
| def __repr__(self) -> str: | |
| """Représentation détaillée du pipeline v4.0""" | |
| return (f"SmartChunkerPipeline(" | |
| f"config_path='{self.config_path}', " | |
| f"initialized={self._is_initialized}, " | |
| f"llm_available={self.llm is not None}, " | |
| f"embed_model_available={self.embed_model is not None}, " | |
| f"chonkie_available={CHONKIE_AVAILABLE})") | |
| # ===== ENTRY POINT FOR TESTS AND DIRECT USE ===== | |
| if __name__ == "__main__": | |
| import asyncio | |
| async def test_pipeline_v4(): | |
| """Test rapide du pipeline v4.0""" | |
| pipeline = SmartChunkerPipeline() | |
| try: | |
| await pipeline.initialize() | |
| print("✅ Pipeline v4.0 initialisé avec succès") | |
| # Test health check | |
| health = await pipeline.health_check_v4() | |
| print(f"🏥 Status santé: {health['status']}") | |
| # Test mémoire | |
| memory = pipeline.get_memory_usage_v4() | |
| print(f"💾 Mémoire: {memory.get('memory_usage_mb', 'N/A')} MB") | |
| # Test configuration | |
| config_info = await pipeline.get_config_info_v4() | |
| print(f"⚙️ Modèles: LLM={config_info['models']['llm_model']}, Embed={config_info['models']['embedding_model']}") | |
| print(f"🔧 Chonkie: {config_info['models']['chonkie_available']}") | |
| # Test chunking simple | |
| from schemas import ChunkRequest | |
| test_request = ChunkRequest( | |
| text="Ceci est un test de chunking sémantique intelligent. Il contient plusieurs phrases pour tester la fonctionnalité. Le système doit créer des chunks cohérents et maintenir les relations hiérarchiques.", | |
| titre="Test Chunking v4.0", | |
| source_id="test_001" | |
| ) | |
| result = await pipeline.process_text(test_request) | |
| print(f"📝 Test chunking: {result.total_chunks} chunks générés en {result.processing_time:.2f}s") | |
| except Exception as e: | |
| print(f"❌ Erreur test v4.0: {e}") | |
| finally: | |
| await pipeline.cleanup() | |
| # Exécution du test | |
| print("🚀 Test SmartChunkerPipeline v4.0...") | |
| asyncio.run(test_pipeline_v4()) | |