from sentence_transformers import SentenceTransformer from config import dense_index as indexA from config import * import zlib import base64 def split_text_into_chunks(text, max_chunk_size=1024): """Divise le texte en morceaux de taille maximale spécifiée.""" return [text[i:i+max_chunk_size] for i in range(0, len(text), max_chunk_size)] def decompress_text(compressed_text): """Décompresse un texte compressé en base64.""" decoded = base64.b64decode(compressed_text.encode('ascii')) return zlib.decompress(decoded).decode('utf-8') def compress_text(text): """Compresse le texte et le encode en base64.""" compressed = zlib.compress(text.encode('utf-8')) return base64.b64encode(compressed).decode('ascii') def get_metadata_size(metadata): """Calcule la taille des métadonnées en bytes.""" return len(str(metadata).encode('utf-8')) def get_existing_vectors(index): """Récupère les textes compressés déjà indexés dans Pinecone.""" existing_texts = set() try: # Récupérer les vecteurs existants (par exemple, les 10 000 premiers) results = index.query(vector=[0] * 1024, top_k=10000, include_metadata=True) for match in results.get("matches", []): if "metadata" in match and "compressed_text" in match["metadata"]: existing_texts.add(match["metadata"]["compressed_text"]) except Exception as e: print(f"Erreur lors de la récupération des vecteurs existants : {e}") return existing_texts def index_pdf(texts): """Indexe les textes dans l'index dense en évitant les doublons.""" vectors = model.encode(texts) # Récupérer les textes déjà indexés existing_texts = get_existing_vectors(indexA) for i, (vector, chunk) in enumerate(zip(vectors, texts)): # Diviser le texte en morceaux de 1024 caractères chunks = split_text_into_chunks(chunk, max_chunk_size=1024) for j, small_chunk in enumerate(chunks): # Compresser le morceau compressed_chunk = compress_text(small_chunk) # Vérifier si ce texte est déjà indexé if compressed_chunk in existing_texts: print(f"Le texte '{small_chunk[:2000]}...' est déjà indexé. Ignorer.") continue metadata = {"compressed_text": compressed_chunk} metadata_size = get_metadata_size(metadata) if metadata_size > 40960: # 40 KB print(f"Attention : la taille des métadonnées ({metadata_size} bytes) dépasse la limite de 40960 bytes.") small_chunk = small_chunk[:512] # Réduire à 512 caractères compressed_chunk = compress_text(small_chunk) metadata = {"compressed_text": compressed_chunk} metadata_size = get_metadata_size(metadata) if metadata_size > 40960: print("Impossible de réduire suffisamment la taille des métadonnées. Ignorer ce morceau.") continue # Insérer dans Pinecone indexA.upsert([(f"vec_{i}_{j}", vector.tolist(), metadata)]) print(f"Indexation réussie pour le morceau '{small_chunk[:2000]}...'") def retrieve_documents(query, k, similarity_threshold): """Récupère les documents pertinents en fonction de la requête.""" query_vector = model.encode([query]).tolist()[0] results = indexA.query( vector=query_vector, top_k=k, include_metadata=True ) relevant_docs = [] total_words = 0 total_tokens = 0 for match in results.get("matches", []): if "metadata" in match and "compressed_text" in match["metadata"]: score = match.get("score", 0) # Score de similarité if score >= similarity_threshold: # Filtrer par seuil compressed_text = match["metadata"]["compressed_text"] text = decompress_text(compressed_text) relevant_docs.append(text) # Calcul du nombre de mots et de tokens total_words += len(text.split()) # Nombre de mots (séparés par des espaces) total_tokens += len(model.tokenizer.encode(text)) # Nombre de tokens else: print(f"Skipping match due to missing metadata or compressed_text: {match}") num_docs = len(relevant_docs) avg_words_per_doc = total_words / num_docs if num_docs > 0 else 0 avg_tokens_per_doc = total_tokens / num_docs if num_docs > 0 else 0 print(f"Nombre de documents récupérés : {num_docs}") print(f"Moyenne de mots par document : {avg_words_per_doc:.2f}") print(f"Moyenne de tokens par document : {avg_tokens_per_doc:.2f}") return relevant_docs