import re import logging from typing import List, Optional import chromadb from chromadb.config import Settings from sentence_transformers import SentenceTransformer import os logger = logging.getLogger(__name__) class DocumentProcessor: """ Service pour le traitement des documents: nettoyage, chunking et indexation dans ChromaDB. """ def __init__(self): # Configuration de ChromaDB self.chroma_client = chromadb.PersistentClient( path="./storage/chroma", settings=Settings(anonymized_telemetry=False) ) # Initialisation du modèle d'embeddings try: self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2') logger.info("Modèle d'embeddings all-MiniLM-L6-v2 chargé avec succès") except Exception as e: logger.error(f"Erreur lors du chargement du modèle d'embeddings: {str(e)}") raise # Paramètres de chunking self.chunk_size = 1000 self.chunk_overlap = 200 async def process_and_index(self, markdown_content: str, site_id: str) -> int: """ Traite le contenu Markdown et l'indexe dans ChromaDB. Args: markdown_content: Contenu Markdown brut site_id: Identifiant unique pour la collection Returns: Nombre de chunks indexés """ try: logger.info(f"Début du processing pour le site: {site_id}") # Étape 1: Nettoyage du contenu cleaned_content = self._clean_markdown(markdown_content) logger.info(f"Nettoyage terminé - {len(cleaned_content)} caractères") # Étape 2: Chunking chunks = self._chunk_text(cleaned_content) logger.info(f"Chunking terminé - {len(chunks)} chunks créés") if not chunks: logger.warning("Aucun chunk généré après nettoyage") return 0 # Étape 3: Indexation dans ChromaDB chunks_indexed = await self._index_chunks(chunks, site_id) logger.info(f"Indexation terminée - {chunks_indexed} chunks indexés") return chunks_indexed except Exception as e: logger.error(f"Erreur lors du processing du document: {str(e)}") raise def _clean_markdown(self, content: str) -> str: """ Clean Markdown content by removing boilerplate without destroying useful e-commerce data (prices, links, tables, images). """ try: # Remove HTML boilerplate tags (keep their inner text where possible) content = re.sub(r']*>.*?', '', content, flags=re.IGNORECASE | re.DOTALL) content = re.sub(r']*>.*?', '', content, flags=re.IGNORECASE | re.DOTALL) content = re.sub(r']*>.*?', '', content, flags=re.IGNORECASE | re.DOTALL) content = re.sub(r']*>.*?', '', content, flags=re.IGNORECASE | re.DOTALL) content = re.sub(r']*>.*?', '', content, flags=re.IGNORECASE | re.DOTALL) # Remove remaining HTML tags but keep their text content content = re.sub(r'<[^>]+>', ' ', content) # Remove YAML front matter content = re.sub(r'^---\n.*?\n---\n', '', content, flags=re.DOTALL) # Collapse 3+ blank lines into 2 content = re.sub(r'\n{3,}', '\n\n', content) # Normalize spaces within each line (preserve newlines for tables/structure) lines = content.split('\n') cleaned_lines = [] for line in lines: line = re.sub(r'[ \t]+', ' ', line).strip() # Keep lines with meaningful content (>3 chars) if len(line) > 3: cleaned_lines.append(line) cleaned_content = '\n'.join(cleaned_lines) if len(cleaned_content.strip()) < 100: logger.warning("Cleaned content is very short — site may be JS-heavy or poorly crawled") return cleaned_content.strip() except Exception as e: logger.error(f"Error cleaning markdown: {str(e)}") return content def _extract_page_title(self, content: str) -> str: """ Extract the first H1 or H2 heading from markdown content as the page title. """ match = re.search(r'^#{1,2}\s+(.+)$', content, flags=re.MULTILINE) if match: return match.group(1).strip() # Fallback: first non-empty line for line in content.split('\n'): line = line.strip() if len(line) > 5: return line[:80] return "" def _chunk_text(self, text: str) -> List[str]: """ Split text into chunks using semantic boundaries first (headings, separators), then fall back to character-based splitting for large sections. Preserves table rows and product card blocks together. """ try: if not text or len(text.strip()) == 0: return [] # Step 1: split on heading boundaries (H1, H2) or horizontal rules raw_sections = re.split(r'\n(?=#{1,2} )', text) all_chunks = [] for section in raw_sections: section = section.strip() if not section: continue if len(section) <= self.chunk_size: all_chunks.append(section) else: # Section is too large — split by characters with overlap all_chunks.extend(self._split_by_chars(section)) # Filter out chunks that are too short to be meaningful result = [] for chunk in all_chunks: chunk = chunk.strip() if len(chunk) >= 50: result.append(chunk) logger.info( f"Chunking: {len(result)} chunks " f"(avg {sum(len(c) for c in result) // len(result) if result else 0} chars)" ) return result except Exception as e: logger.error(f"Error chunking text: {str(e)}") return [] def _split_by_chars(self, text: str) -> List[str]: """ Character-based splitting with overlap — used as fallback for large sections. """ chunks = [] start = 0 text_length = len(text) while start < text_length: end = min(start + self.chunk_size, text_length) if end < text_length: last_space = text.rfind(' ', start, end) if last_space > start + (self.chunk_size // 2): end = last_space chunk = text[start:end].strip() if chunk: chunks.append(chunk) if end >= text_length: break start = max(start + 1, end - self.chunk_overlap) return chunks async def process_and_index_records(self, records: List[dict], site_id: str) -> int: """ Process structured page records [{url, markdown}, ...] and index into ChromaDB. Stores source_url and page_title per chunk for richer retrieval. Args: records: List of page records with url and markdown fields site_id: Unique identifier for the ChromaDB collection Returns: Number of chunks indexed """ try: logger.info(f"Processing {len(records)} page(s) for site: {site_id}") all_chunks: List[str] = [] all_extra_meta: List[dict] = [] for record in records: source_url = record.get("url", "") markdown = record.get("markdown", "") if not markdown: continue cleaned = self._clean_markdown(markdown) page_title = self._extract_page_title(cleaned) chunks = self._chunk_text(cleaned) logger.info(f" {source_url or '(no url)'} → {len(chunks)} chunks") for chunk in chunks: all_chunks.append(chunk) all_extra_meta.append({ "source_url": source_url, "page_title": page_title, }) if not all_chunks: logger.warning("No chunks generated from records") return 0 return await self._index_chunks(all_chunks, site_id, extra_metadatas=all_extra_meta) except Exception as e: logger.error(f"Error in process_and_index_records: {str(e)}") raise async def _index_chunks( self, chunks: List[str], site_id: str, extra_metadatas: Optional[List[dict]] = None ) -> int: """ Index chunks into ChromaDB with embeddings. Args: chunks: List of text chunks to index site_id: Collection identifier extra_metadatas: Optional list of extra metadata dicts (one per chunk) Returns: Number of chunks indexed successfully """ try: # Drop existing collection if it exists try: self.chroma_client.delete_collection(name=site_id) logger.info(f"Existing collection '{site_id}' dropped") except Exception: pass # Create new collection with cosine distance # (SentenceTransformers produces L2-normalized embeddings → cosine scores in [0,1]) collection = self.chroma_client.create_collection( name=site_id, metadata={"hnsw:space": "cosine"} ) logger.info(f"Collection '{site_id}' created (cosine distance)") if not chunks: logger.warning("No chunks to index") return 0 # Generate embeddings for all chunks at once logger.info("Generating embeddings...") embeddings = self.embedding_model.encode(chunks, convert_to_tensor=False) metadatas = [] ids = [] for i, chunk in enumerate(chunks): meta = { "site_id": site_id, "chunk_index": i, "chunk_length": len(chunk), "source_url": "", "page_title": "", "preview": chunk[:100] + "..." if len(chunk) > 100 else chunk, } if extra_metadatas and i < len(extra_metadatas): meta.update(extra_metadatas[i]) metadatas.append(meta) ids.append(f"{site_id}_chunk_{i}") collection.add( documents=chunks, embeddings=embeddings.tolist(), metadatas=metadatas, ids=ids, ) count = collection.count() logger.info(f"Indexed {count} chunks into collection '{site_id}'") return count except Exception as e: logger.error(f"Error indexing chunks into ChromaDB: {str(e)}") raise def get_collection(self, site_id: str): """ Récupère une collection ChromaDB existante. Args: site_id: Identifiant de la collection Returns: Collection ChromaDB ou None """ try: collection = self.chroma_client.get_collection(name=site_id) return collection except Exception as e: logger.error(f"Erreur lors de la récupération de la collection '{site_id}': {str(e)}") return None def collection_exists(self, site_id: str) -> bool: """ Vérifie si une collection existe. Args: site_id: Identifiant de la collection Returns: True si la collection existe """ try: self.chroma_client.get_collection(name=site_id) return True except Exception: return False def list_collections(self) -> List[str]: """ Liste toutes les collections existantes. Returns: Liste des noms de collections """ try: collections = self.chroma_client.list_collections() return [collection.name for collection in collections] except Exception as e: logger.error(f"Erreur lors de la liste des collections: {str(e)}") return [] def delete_collection(self, site_id: str) -> bool: """ Supprime une collection. Args: site_id: Identifiant de la collection à supprimer Returns: True si la suppression a réussi """ try: self.chroma_client.delete_collection(name=site_id) logger.info(f"Collection '{site_id}' supprimée avec succès") return True except Exception as e: logger.error(f"Erreur lors de la suppression de la collection '{site_id}': {str(e)}") return False async def search_similar_chunks(self, site_id: str, query: str, n_results: int = 3) -> List[dict]: """ Recherche les chunks les plus similaires à une requête. Args: site_id: Identifiant de la collection query: Requête de recherche n_results: Nombre de résultats à retourner Returns: Liste des chunks similaires avec leurs métadonnées """ try: collection = self.get_collection(site_id) if not collection: logger.error(f"Collection '{site_id}' non trouvée") return [] # Générer l'embedding pour la requête query_embedding = self.embedding_model.encode([query], convert_to_tensor=False) # Rechercher les documents similaires results = collection.query( query_embeddings=query_embedding.tolist(), n_results=n_results ) # Formater les résultats formatted_results = [] if results['documents'] and results['documents'][0]: for i, doc in enumerate(results['documents'][0]): metadata = results['metadatas'][0][i] if results['metadatas'] and results['metadatas'][0] else {} distance = results['distances'][0][i] if results['distances'] and results['distances'][0] else 0 # Cosine collections: distance in [0,1], score = 1-distance in [0,1] # Legacy L2 collections: clamp to avoid negative scores score = max(0.0, min(1.0, 1.0 - distance)) formatted_results.append({ "content": doc, "metadata": metadata, "similarity_score": score, "chunk_index": metadata.get("chunk_index", i) }) logger.info(f"Recherche terminée: {len(formatted_results)} résultats trouvés pour '{query[:50]}...'") return formatted_results except Exception as e: logger.error(f"Erreur lors de la recherche de chunks similaires: {str(e)}") return []