import re
import logging
from typing import List, Optional

import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer

logger = logging.getLogger(__name__)

class DocumentProcessor:
    """
    Service for document processing: cleaning, chunking, and indexing into ChromaDB.
    """

    def __init__(self):
        # ChromaDB configuration
        self.chroma_client = chromadb.PersistentClient(
            path="./storage/chroma",
            settings=Settings(anonymized_telemetry=False)
        )

        # Embedding model initialization
        try:
            self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
            logger.info("Embedding model all-MiniLM-L6-v2 loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load embedding model: {str(e)}")
            raise

        # Chunking parameters
        self.chunk_size = 1000
        self.chunk_overlap = 200

    async def process_and_index(self, markdown_content: str, site_id: str) -> int:
        """
        Process Markdown content and index it into ChromaDB.

        Args:
            markdown_content: Raw Markdown content
            site_id: Unique identifier for the collection

        Returns:
            Number of chunks indexed
        """
        try:
            logger.info(f"Starting processing for site: {site_id}")

            # Step 1: clean the content
            cleaned_content = self._clean_markdown(markdown_content)
            logger.info(f"Cleaning finished - {len(cleaned_content)} characters")

            # Step 2: chunking
            chunks = self._chunk_text(cleaned_content)
            logger.info(f"Chunking finished - {len(chunks)} chunks created")

            if not chunks:
                logger.warning("No chunks generated after cleaning")
                return 0

            # Step 3: index into ChromaDB
            chunks_indexed = await self._index_chunks(chunks, site_id)
            logger.info(f"Indexing finished - {chunks_indexed} chunks indexed")

            return chunks_indexed
        except Exception as e:
            logger.error(f"Error while processing document: {str(e)}")
            raise
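
    # Hedged usage sketch for the single-document path (names are illustrative;
    # must be awaited from async code):
    #   processor = DocumentProcessor()
    #   n = await processor.process_and_index(raw_markdown, site_id="my-site")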

    def _clean_markdown(self, content: str) -> str:
        """
        Clean Markdown content by removing boilerplate without destroying
        useful e-commerce data (prices, links, tables, images).
        """
        try:
            # Remove HTML boilerplate blocks entirely (nav/header/footer/script/style)
            content = re.sub(r'<nav[^>]*>.*?</nav>', '', content, flags=re.IGNORECASE | re.DOTALL)
            content = re.sub(r'<header[^>]*>.*?</header>', '', content, flags=re.IGNORECASE | re.DOTALL)
            content = re.sub(r'<footer[^>]*>.*?</footer>', '', content, flags=re.IGNORECASE | re.DOTALL)
            content = re.sub(r'<script[^>]*>.*?</script>', '', content, flags=re.IGNORECASE | re.DOTALL)
            content = re.sub(r'<style[^>]*>.*?</style>', '', content, flags=re.IGNORECASE | re.DOTALL)

            # Strip any remaining HTML tags but keep their text content
            content = re.sub(r'<[^>]+>', ' ', content)
            # Remove YAML front matter
            content = re.sub(r'^---\n.*?\n---\n', '', content, flags=re.DOTALL)

            # Collapse 3+ blank lines into 2
            content = re.sub(r'\n{3,}', '\n\n', content)

            # Normalize spaces within each line (preserve newlines for tables/structure)
            lines = content.split('\n')
            cleaned_lines = []
            for line in lines:
                line = re.sub(r'[ \t]+', ' ', line).strip()
                # Keep lines with meaningful content (> 3 chars)
                if len(line) > 3:
                    cleaned_lines.append(line)
            cleaned_content = '\n'.join(cleaned_lines)

            if len(cleaned_content.strip()) < 100:
                logger.warning("Cleaned content is very short — site may be JS-heavy or poorly crawled")

            return cleaned_content.strip()
        except Exception as e:
            logger.error(f"Error cleaning markdown: {str(e)}")
            return content
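
    # A rough before/after sketch on hypothetical crawler output:
    #   raw = "---\ntitle: x\n---\n<nav>Home</nav>\n# Product\nPrice: $19.99"
    #   _clean_markdown(raw) -> "# Product\nPrice: $19.99"
    #   (nav block and YAML front matter dropped, product data kept; a warning
    #   is logged because the cleaned text is under 100 chars)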

    def _extract_page_title(self, content: str) -> str:
        """
        Extract the first H1 or H2 heading from markdown content as the page title.
        """
        match = re.search(r'^#{1,2}\s+(.+)$', content, flags=re.MULTILINE)
        if match:
            return match.group(1).strip()

        # Fallback: first line longer than 5 characters, capped at 80
        for line in content.split('\n'):
            line = line.strip()
            if len(line) > 5:
                return line[:80]
        return ""

    def _chunk_text(self, text: str) -> List[str]:
        """
        Split text into chunks along semantic boundaries first (H1/H2 headings),
        falling back to character-based splitting for oversized sections.
        Sections that fit within chunk_size are kept intact, which keeps table
        rows and product card blocks together.
        """
        try:
            if not text or len(text.strip()) == 0:
                return []

            # Step 1: split on H1/H2 heading boundaries
            raw_sections = re.split(r'\n(?=#{1,2} )', text)
            all_chunks = []
            for section in raw_sections:
                section = section.strip()
                if not section:
                    continue
                if len(section) <= self.chunk_size:
                    all_chunks.append(section)
                else:
                    # Section is too large — split by characters with overlap
                    all_chunks.extend(self._split_by_chars(section))

            # Filter out chunks that are too short to be meaningful
            result = []
            for chunk in all_chunks:
                chunk = chunk.strip()
                if len(chunk) >= 50:
                    result.append(chunk)

            logger.info(
                f"Chunking: {len(result)} chunks "
                f"(avg {sum(len(c) for c in result) // len(result) if result else 0} chars)"
            )
            return result
        except Exception as e:
            logger.error(f"Error chunking text: {str(e)}")
            return []
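
    # Sketch of the two-level strategy on hypothetical input:
    #   text = "# A\n" + "x" * 300 + "\n## B\n" + "y" * 2500
    #   -> the ~300-char "# A" section is kept whole; the ~2500-char "## B"
    #      section is re-split by _split_by_chars into three overlapping
    #      ~1000-char chunks.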

    def _split_by_chars(self, text: str) -> List[str]:
        """
        Character-based splitting with overlap — used as fallback for large sections.
        """
        chunks = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = min(start + self.chunk_size, text_length)
            # Prefer breaking at a word boundary in the second half of the window
            if end < text_length:
                last_space = text.rfind(' ', start, end)
                if last_space > start + (self.chunk_size // 2):
                    end = last_space
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            if end >= text_length:
                break
            # Step back by chunk_overlap, but always advance at least one char
            start = max(start + 1, end - self.chunk_overlap)
        return chunks
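
    # Worked example with the defaults (chunk_size=1000, chunk_overlap=200)
    # on a 2500-char section with no usable spaces: the windows are
    # [0:1000], [800:1800], [1600:2500], so each chunk re-reads the last
    # 200 chars of its predecessor for context across boundaries.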

    async def process_and_index_records(self, records: List[dict], site_id: str) -> int:
        """
        Process structured page records [{url, markdown}, ...] and index into ChromaDB.
        Stores source_url and page_title per chunk for richer retrieval.

        Args:
            records: List of page records with url and markdown fields
            site_id: Unique identifier for the ChromaDB collection

        Returns:
            Number of chunks indexed
        """
        try:
            logger.info(f"Processing {len(records)} page(s) for site: {site_id}")
            all_chunks: List[str] = []
            all_extra_meta: List[dict] = []

            for record in records:
                source_url = record.get("url", "")
                markdown = record.get("markdown", "")
                if not markdown:
                    continue
                cleaned = self._clean_markdown(markdown)
                page_title = self._extract_page_title(cleaned)
                chunks = self._chunk_text(cleaned)
                logger.info(f"  {source_url or '(no url)'} → {len(chunks)} chunks")
                for chunk in chunks:
                    all_chunks.append(chunk)
                    all_extra_meta.append({
                        "source_url": source_url,
                        "page_title": page_title,
                    })

            if not all_chunks:
                logger.warning("No chunks generated from records")
                return 0

            return await self._index_chunks(all_chunks, site_id, extra_metadatas=all_extra_meta)
        except Exception as e:
            logger.error(f"Error in process_and_index_records: {str(e)}")
            raise

    async def _index_chunks(
        self,
        chunks: List[str],
        site_id: str,
        extra_metadatas: Optional[List[dict]] = None
    ) -> int:
        """
        Index chunks into ChromaDB with embeddings.

        Args:
            chunks: List of text chunks to index
            site_id: Collection identifier
            extra_metadatas: Optional list of extra metadata dicts (one per chunk)

        Returns:
            Number of chunks indexed successfully
        """
        try:
            # Drop existing collection if it exists
            try:
                self.chroma_client.delete_collection(name=site_id)
                logger.info(f"Existing collection '{site_id}' dropped")
            except Exception:
                pass
            # Create new collection with cosine distance.
            # all-MiniLM-L6-v2 produces L2-normalized embeddings, so cosine
            # distance = 1 - cosine similarity, which lies in [0, 2] in general
            # and typically well below 1 for natural-language chunks.
            collection = self.chroma_client.create_collection(
                name=site_id,
                metadata={"hnsw:space": "cosine"}
            )
            logger.info(f"Collection '{site_id}' created (cosine distance)")
            if not chunks:
                logger.warning("No chunks to index")
                return 0

            # Generate embeddings for all chunks at once
            logger.info("Generating embeddings...")
            embeddings = self.embedding_model.encode(chunks, convert_to_tensor=False)

            metadatas = []
            ids = []
            for i, chunk in enumerate(chunks):
                meta = {
                    "site_id": site_id,
                    "chunk_index": i,
                    "chunk_length": len(chunk),
                    "source_url": "",
                    "page_title": "",
                    "preview": chunk[:100] + "..." if len(chunk) > 100 else chunk,
                }
                if extra_metadatas and i < len(extra_metadatas):
                    meta.update(extra_metadatas[i])
                metadatas.append(meta)
                ids.append(f"{site_id}_chunk_{i}")

            collection.add(
                documents=chunks,
                embeddings=embeddings.tolist(),
                metadatas=metadatas,
                ids=ids,
            )

            count = collection.count()
            logger.info(f"Indexed {count} chunks into collection '{site_id}'")
            return count
        except Exception as e:
            logger.error(f"Error indexing chunks into ChromaDB: {str(e)}")
            raise
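
    # Shape of what gets written, for a hypothetical site_id "demo-shop":
    #   ids       = ["demo-shop_chunk_0", "demo-shop_chunk_1", ...]
    #   metadatas = [{"site_id": "demo-shop", "chunk_index": 0, "chunk_length": ...,
    #                 "source_url": "", "page_title": "", "preview": "..."}, ...]
    #   (source_url/page_title are overwritten when extra_metadatas is supplied)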

    def get_collection(self, site_id: str):
        """
        Retrieve an existing ChromaDB collection.

        Args:
            site_id: Collection identifier

        Returns:
            The ChromaDB collection, or None if it cannot be retrieved
        """
        try:
            collection = self.chroma_client.get_collection(name=site_id)
            return collection
        except Exception as e:
            logger.error(f"Error retrieving collection '{site_id}': {str(e)}")
            return None

    def collection_exists(self, site_id: str) -> bool:
        """
        Check whether a collection exists.

        Args:
            site_id: Collection identifier

        Returns:
            True if the collection exists
        """
        try:
            self.chroma_client.get_collection(name=site_id)
            return True
        except Exception:
            return False

    def list_collections(self) -> List[str]:
        """
        List all existing collections.

        Returns:
            List of collection names
        """
        try:
            collections = self.chroma_client.list_collections()
            return [collection.name for collection in collections]
        except Exception as e:
            logger.error(f"Error listing collections: {str(e)}")
            return []

    def delete_collection(self, site_id: str) -> bool:
        """
        Delete a collection.

        Args:
            site_id: Identifier of the collection to delete

        Returns:
            True if the deletion succeeded
        """
        try:
            self.chroma_client.delete_collection(name=site_id)
            logger.info(f"Collection '{site_id}' deleted successfully")
            return True
        except Exception as e:
            logger.error(f"Error deleting collection '{site_id}': {str(e)}")
            return False

    async def search_similar_chunks(self, site_id: str, query: str, n_results: int = 3) -> List[dict]:
        """
        Find the chunks most similar to a query.

        Args:
            site_id: Collection identifier
            query: Search query
            n_results: Number of results to return

        Returns:
            List of similar chunks with their metadata
        """
        try:
            collection = self.get_collection(site_id)
            if not collection:
                logger.error(f"Collection '{site_id}' not found")
                return []

            # Generate the embedding for the query
            query_embedding = self.embedding_model.encode([query], convert_to_tensor=False)

            # Search for similar documents
            results = collection.query(
                query_embeddings=query_embedding.tolist(),
                n_results=n_results
            )

            # Format the results
            formatted_results = []
            if results['documents'] and results['documents'][0]:
                for i, doc in enumerate(results['documents'][0]):
                    metadata = results['metadatas'][0][i] if results['metadatas'] and results['metadatas'][0] else {}
                    distance = results['distances'][0][i] if results['distances'] and results['distances'][0] else 0
                    # Cosine collections: distance is 1 - cosine similarity, so
                    # score = 1 - distance; clamp to [0, 1] to also tolerate
                    # legacy L2 collections, whose distances can exceed 1
                    score = max(0.0, min(1.0, 1.0 - distance))
                    formatted_results.append({
                        "content": doc,
                        "metadata": metadata,
                        "similarity_score": score,
                        "chunk_index": metadata.get("chunk_index", i)
                    })

            logger.info(f"Search finished: {len(formatted_results)} results found for '{query[:50]}...'")
            return formatted_results
        except Exception as e:
            logger.error(f"Error searching for similar chunks: {str(e)}")
            return []
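
# A minimal end-to-end usage sketch. Assumes "./storage/chroma" is writable and
# the all-MiniLM-L6-v2 download succeeds; the sample record is hypothetical and
# long enough to survive the 50-char minimum chunk filter.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        processor = DocumentProcessor()
        records = [
            {
                "url": "https://example.com/shop",
                "markdown": "# Shop\n\nWidget Deluxe - only $9.99, in stock and shipping worldwide within two business days.",
            },
        ]
        indexed = await processor.process_and_index_records(records, site_id="example-shop")
        hits = await processor.search_similar_chunks("example-shop", "widget price", n_results=3)
        print(f"indexed={indexed}, top score={hits[0]['similarity_score'] if hits else None}")

    asyncio.run(_demo())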