# quick/services/processor.py
import logging
import re
from typing import List, Optional

import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
logger = logging.getLogger(__name__)
class DocumentProcessor:
"""
Service for document processing: cleaning, chunking, and indexing into ChromaDB.
"""
def __init__(self):
# ChromaDB configuration
self.chroma_client = chromadb.PersistentClient(
path="./storage/chroma",
settings=Settings(anonymized_telemetry=False)
)
# Initialize the embedding model
try:
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
logger.info("Modèle d'embeddings all-MiniLM-L6-v2 chargé avec succès")
except Exception as e:
logger.error(f"Erreur lors du chargement du modèle d'embeddings: {str(e)}")
raise
# Chunking parameters
self.chunk_size = 1000
self.chunk_overlap = 200
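# With chunk_size=1000 and chunk_overlap=200, consecutive chunks share roughly
# 200 characters, so a sentence cut at a chunk boundary usually also appears
# intact in the following chunk.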
async def process_and_index(self, markdown_content: str, site_id: str) -> int:
"""
Process Markdown content and index it into ChromaDB.
Args:
markdown_content: Raw Markdown content
site_id: Unique identifier for the collection
Returns:
Number of chunks indexed
"""
try:
logger.info(f"Début du processing pour le site: {site_id}")
# Étape 1: Nettoyage du contenu
cleaned_content = self._clean_markdown(markdown_content)
logger.info(f"Nettoyage terminé - {len(cleaned_content)} caractères")
# Étape 2: Chunking
chunks = self._chunk_text(cleaned_content)
logger.info(f"Chunking terminé - {len(chunks)} chunks créés")
if not chunks:
logger.warning("Aucun chunk généré après nettoyage")
return 0
# Step 3: Index into ChromaDB
chunks_indexed = await self._index_chunks(chunks, site_id)
logger.info(f"Indexation terminée - {chunks_indexed} chunks indexés")
return chunks_indexed
except Exception as e:
logger.error(f"Erreur lors du processing du document: {str(e)}")
raise
def _clean_markdown(self, content: str) -> str:
"""
Clean Markdown content by removing boilerplate without destroying
useful e-commerce data (prices, links, tables, images).
"""
try:
# Remove boilerplate blocks (nav, header, footer, script, style) along with their content
content = re.sub(r'<nav[^>]*>.*?</nav>', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'<header[^>]*>.*?</header>', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'<footer[^>]*>.*?</footer>', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'<script[^>]*>.*?</script>', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'<style[^>]*>.*?</style>', '', content, flags=re.IGNORECASE | re.DOTALL)
# Remove remaining HTML tags but keep their text content
content = re.sub(r'<[^>]+>', ' ', content)
# Remove YAML front matter
content = re.sub(r'^---\n.*?\n---\n', '', content, flags=re.DOTALL)
# Collapse 3+ blank lines into 2
content = re.sub(r'\n{3,}', '\n\n', content)
# Normalize spaces within each line (preserve newlines for tables/structure)
lines = content.split('\n')
cleaned_lines = []
for line in lines:
line = re.sub(r'[ \t]+', ' ', line).strip()
# Keep lines with meaningful content (>3 chars)
if len(line) > 3:
cleaned_lines.append(line)
cleaned_content = '\n'.join(cleaned_lines)
if len(cleaned_content.strip()) < 100:
logger.warning("Cleaned content is very short — site may be JS-heavy or poorly crawled")
return cleaned_content.strip()
except Exception as e:
logger.error(f"Error cleaning markdown: {str(e)}")
return content
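# Illustrative example (hypothetical input) of what _clean_markdown keeps:
#   "<nav>Menu</nav>\n# Ceramic Mug\n<script>track()</script>\nPrice: $12.99"
#   -> "# Ceramic Mug\nPrice: $12.99"
# Boilerplate blocks are dropped entirely; short or empty lines are filtered out.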
def _extract_page_title(self, content: str) -> str:
"""
Extract the first H1 or H2 heading from markdown content as the page title.
"""
match = re.search(r'^#{1,2}\s+(.+)$', content, flags=re.MULTILINE)
if match:
return match.group(1).strip()
# Fallback: first line longer than 5 characters, truncated to 80 characters
for line in content.split('\n'):
line = line.strip()
if len(line) > 5:
return line[:80]
return ""
def _chunk_text(self, text: str) -> List[str]:
"""
Split text into chunks along semantic boundaries first (H1/H2 headings),
then fall back to character-based splitting with overlap for oversized sections.
Sections that fit within chunk_size are kept whole, which helps keep tables
and product blocks together.
"""
try:
if not text or len(text.strip()) == 0:
return []
# Step 1: split on H1/H2 heading boundaries
raw_sections = re.split(r'\n(?=#{1,2} )', text)
all_chunks = []
for section in raw_sections:
section = section.strip()
if not section:
continue
if len(section) <= self.chunk_size:
all_chunks.append(section)
else:
# Section is too large — split by characters with overlap
all_chunks.extend(self._split_by_chars(section))
# Filter out chunks that are too short to be meaningful
result = []
for chunk in all_chunks:
chunk = chunk.strip()
if len(chunk) >= 50:
result.append(chunk)
logger.info(
f"Chunking: {len(result)} chunks "
f"(avg {sum(len(c) for c in result) // len(result) if result else 0} chars)"
)
return result
except Exception as e:
logger.error(f"Error chunking text: {str(e)}")
return []
def _split_by_chars(self, text: str) -> List[str]:
"""
Character-based splitting with overlap — used as fallback for large sections.
"""
chunks = []
start = 0
text_length = len(text)
while start < text_length:
end = min(start + self.chunk_size, text_length)
if end < text_length:
last_space = text.rfind(' ', start, end)
if last_space > start + (self.chunk_size // 2):
end = last_space
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
if end >= text_length:
break
start = max(start + 1, end - self.chunk_overlap)
return chunks
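# Worked example (illustrative): with chunk_size=1000 and chunk_overlap=200, a
# 2,400-character section yields three chunks covering roughly [0, 1000),
# [800, 1800) and [1600, 2400); boundaries shift slightly to the nearest space.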
async def process_and_index_records(self, records: List[dict], site_id: str) -> int:
"""
Process structured page records [{url, markdown}, ...] and index into ChromaDB.
Stores source_url and page_title per chunk for richer retrieval.
Args:
records: List of page records with url and markdown fields
site_id: Unique identifier for the ChromaDB collection
Returns:
Number of chunks indexed
"""
try:
logger.info(f"Processing {len(records)} page(s) for site: {site_id}")
all_chunks: List[str] = []
all_extra_meta: List[dict] = []
for record in records:
source_url = record.get("url", "")
markdown = record.get("markdown", "")
if not markdown:
continue
cleaned = self._clean_markdown(markdown)
page_title = self._extract_page_title(cleaned)
chunks = self._chunk_text(cleaned)
logger.info(f" {source_url or '(no url)'}{len(chunks)} chunks")
for chunk in chunks:
all_chunks.append(chunk)
all_extra_meta.append({
"source_url": source_url,
"page_title": page_title,
})
if not all_chunks:
logger.warning("No chunks generated from records")
return 0
return await self._index_chunks(all_chunks, site_id, extra_metadatas=all_extra_meta)
except Exception as e:
logger.error(f"Error in process_and_index_records: {str(e)}")
raise
async def _index_chunks(
self,
chunks: List[str],
site_id: str,
extra_metadatas: Optional[List[dict]] = None
) -> int:
"""
Index chunks into ChromaDB with embeddings.
Args:
chunks: List of text chunks to index
site_id: Collection identifier
extra_metadatas: Optional list of extra metadata dicts (one per chunk)
Returns:
Number of chunks indexed successfully
"""
try:
# Drop existing collection if it exists
try:
self.chroma_client.delete_collection(name=site_id)
logger.info(f"Existing collection '{site_id}' dropped")
except Exception:
pass
# Create new collection with cosine distance
# (all-MiniLM-L6-v2 embeddings are L2-normalized, so cosine distance maps directly to a similarity score)
collection = self.chroma_client.create_collection(
name=site_id,
metadata={"hnsw:space": "cosine"}
)
logger.info(f"Collection '{site_id}' created (cosine distance)")
if not chunks:
logger.warning("No chunks to index")
return 0
# Generate embeddings for all chunks at once
logger.info("Generating embeddings...")
embeddings = self.embedding_model.encode(chunks, convert_to_tensor=False)
metadatas = []
ids = []
for i, chunk in enumerate(chunks):
meta = {
"site_id": site_id,
"chunk_index": i,
"chunk_length": len(chunk),
"source_url": "",
"page_title": "",
"preview": chunk[:100] + "..." if len(chunk) > 100 else chunk,
}
if extra_metadatas and i < len(extra_metadatas):
meta.update(extra_metadatas[i])
metadatas.append(meta)
ids.append(f"{site_id}_chunk_{i}")
collection.add(
documents=chunks,
embeddings=embeddings.tolist(),
metadatas=metadatas,
ids=ids,
)
count = collection.count()
logger.info(f"Indexed {count} chunks into collection '{site_id}'")
return count
except Exception as e:
logger.error(f"Error indexing chunks into ChromaDB: {str(e)}")
raise
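# Illustrative shape of one stored metadata entry (example values, not real data):
#   {"site_id": "example-shop", "chunk_index": 0, "chunk_length": 534,
#    "source_url": "https://example.com/products/mug", "page_title": "Ceramic Mug",
#    "preview": "Ceramic Mug Price: $12.99 ..."}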
def get_collection(self, site_id: str):
"""
Retrieve an existing ChromaDB collection.
Args:
site_id: Collection identifier
Returns:
ChromaDB collection, or None
"""
try:
collection = self.chroma_client.get_collection(name=site_id)
return collection
except Exception as e:
logger.error(f"Erreur lors de la récupération de la collection '{site_id}': {str(e)}")
return None
def collection_exists(self, site_id: str) -> bool:
"""
Check whether a collection exists.
Args:
site_id: Collection identifier
Returns:
True if the collection exists
"""
try:
self.chroma_client.get_collection(name=site_id)
return True
except Exception:
return False
def list_collections(self) -> List[str]:
"""
List all existing collections.
Returns:
List of collection names
"""
try:
collections = self.chroma_client.list_collections()
return [collection.name for collection in collections]
except Exception as e:
logger.error(f"Erreur lors de la liste des collections: {str(e)}")
return []
def delete_collection(self, site_id: str) -> bool:
"""
Delete a collection.
Args:
site_id: Identifier of the collection to delete
Returns:
True if the deletion succeeded
"""
try:
self.chroma_client.delete_collection(name=site_id)
logger.info(f"Collection '{site_id}' supprimée avec succès")
return True
except Exception as e:
logger.error(f"Erreur lors de la suppression de la collection '{site_id}': {str(e)}")
return False
async def search_similar_chunks(self, site_id: str, query: str, n_results: int = 3) -> List[dict]:
"""
Find the chunks most similar to a query.
Args:
site_id: Collection identifier
query: Search query
n_results: Number of results to return
Returns:
List of the most similar chunks with their metadata
"""
try:
collection = self.get_collection(site_id)
if not collection:
logger.error(f"Collection '{site_id}' non trouvée")
return []
# Generate the embedding for the query
query_embedding = self.embedding_model.encode([query], convert_to_tensor=False)
# Query for similar documents
results = collection.query(
query_embeddings=query_embedding.tolist(),
n_results=n_results
)
# Format the results
formatted_results = []
if results['documents'] and results['documents'][0]:
for i, doc in enumerate(results['documents'][0]):
metadata = results['metadatas'][0][i] if results['metadatas'] and results['metadatas'][0] else {}
distance = results['distances'][0][i] if results['distances'] and results['distances'][0] else 0
# Cosine distance is 1 - cosine similarity; clamp so the score stays within
# [0, 1] even for legacy L2-indexed collections or strongly dissimilar chunks
score = max(0.0, min(1.0, 1.0 - distance))
formatted_results.append({
"content": doc,
"metadata": metadata,
"similarity_score": score,
"chunk_index": metadata.get("chunk_index", i)
})
logger.info(f"Recherche terminée: {len(formatted_results)} résultats trouvés pour '{query[:50]}...'")
return formatted_results
except Exception as e:
logger.error(f"Erreur lors de la recherche de chunks similaires: {str(e)}")
return []
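# --- Illustrative usage sketch (not part of the service) ---------------------
# A minimal end-to-end run, assuming the all-MiniLM-L6-v2 model can be downloaded
# and ./storage/chroma is writable; the URL and record content below are made up.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        processor = DocumentProcessor()
        records = [
            {
                "url": "https://example.com/products/mug",
                "markdown": "# Ceramic Mug\nPrice: $12.99\nDishwasher safe, holds 350 ml of coffee or tea.",
            },
        ]
        # Index the pages, then query the freshly built collection
        indexed = await processor.process_and_index_records(records, site_id="example-shop")
        print(f"{indexed} chunk(s) indexed")
        results = await processor.search_similar_chunks("example-shop", "how much does the mug cost?", n_results=2)
        for r in results:
            print(round(r["similarity_score"], 3), r["metadata"].get("source_url"), r["content"][:60])

    asyncio.run(_demo())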