Spaces:
Running
Running
File size: 4,833 Bytes
ae303c7 6029e40 fd1c27c ae303c7 29b0b66 ae303c7 29b0b66 ae303c7 29b0b66 ae303c7 29b0b66 ae303c7 29b0b66 ae303c7 29b0b66 ae303c7 29b0b66 ae303c7 6029e40 29b0b66 ae303c7 9760e1f 29b0b66 ae303c7 9760e1f ae303c7 fd1c27c ae303c7 9760e1f fd1c27c ae303c7 fd1c27c bf87c6c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
from sentence_transformers import SentenceTransformer
from config import dense_index as indexA
from config import *
import zlib
import base64
def split_text_into_chunks(text, max_chunk_size=1024):
"""Divise le texte en morceaux de taille maximale spécifiée."""
return [text[i:i+max_chunk_size] for i in range(0, len(text), max_chunk_size)]
def decompress_text(compressed_text):
"""Décompresse un texte compressé en base64."""
decoded = base64.b64decode(compressed_text.encode('ascii'))
return zlib.decompress(decoded).decode('utf-8')
def compress_text(text):
"""Compresse le texte et le encode en base64."""
compressed = zlib.compress(text.encode('utf-8'))
return base64.b64encode(compressed).decode('ascii')
def get_metadata_size(metadata):
"""Calcule la taille des métadonnées en bytes."""
return len(str(metadata).encode('utf-8'))
def get_existing_vectors(index):
"""Récupère les textes compressés déjà indexés dans Pinecone."""
existing_texts = set()
try:
# Récupérer les vecteurs existants (par exemple, les 10 000 premiers)
results = index.query(vector=[0] * 1024, top_k=10000, include_metadata=True)
for match in results.get("matches", []):
if "metadata" in match and "compressed_text" in match["metadata"]:
existing_texts.add(match["metadata"]["compressed_text"])
except Exception as e:
print(f"Erreur lors de la récupération des vecteurs existants : {e}")
return existing_texts
def index_pdf(texts):
"""Indexe les textes dans l'index dense en évitant les doublons."""
vectors = model.encode(texts)
# Récupérer les textes déjà indexés
existing_texts = get_existing_vectors(indexA)
for i, (vector, chunk) in enumerate(zip(vectors, texts)):
# Diviser le texte en morceaux de 1024 caractères
chunks = split_text_into_chunks(chunk, max_chunk_size=1024)
for j, small_chunk in enumerate(chunks):
# Compresser le morceau
compressed_chunk = compress_text(small_chunk)
# Vérifier si ce texte est déjà indexé
if compressed_chunk in existing_texts:
print(f"Le texte '{small_chunk[:2000]}...' est déjà indexé. Ignorer.")
continue
metadata = {"compressed_text": compressed_chunk}
metadata_size = get_metadata_size(metadata)
if metadata_size > 40960: # 40 KB
print(f"Attention : la taille des métadonnées ({metadata_size} bytes) dépasse la limite de 40960 bytes.")
small_chunk = small_chunk[:512] # Réduire à 512 caractères
compressed_chunk = compress_text(small_chunk)
metadata = {"compressed_text": compressed_chunk}
metadata_size = get_metadata_size(metadata)
if metadata_size > 40960:
print("Impossible de réduire suffisamment la taille des métadonnées. Ignorer ce morceau.")
continue
# Insérer dans Pinecone
indexA.upsert([(f"vec_{i}_{j}", vector.tolist(), metadata)])
print(f"Indexation réussie pour le morceau '{small_chunk[:2000]}...'")
def retrieve_documents(query, k, similarity_threshold):
"""Récupère les documents pertinents en fonction de la requête."""
query_vector = model.encode([query]).tolist()[0]
results = indexA.query(
vector=query_vector,
top_k=k,
include_metadata=True
)
relevant_docs = []
total_words = 0
total_tokens = 0
for match in results.get("matches", []):
if "metadata" in match and "compressed_text" in match["metadata"]:
score = match.get("score", 0) # Score de similarité
if score >= similarity_threshold: # Filtrer par seuil
compressed_text = match["metadata"]["compressed_text"]
text = decompress_text(compressed_text)
relevant_docs.append(text)
# Calcul du nombre de mots et de tokens
total_words += len(text.split()) # Nombre de mots (séparés par des espaces)
total_tokens += len(model.tokenizer.encode(text)) # Nombre de tokens
else:
print(f"Skipping match due to missing metadata or compressed_text: {match}")
num_docs = len(relevant_docs)
avg_words_per_doc = total_words / num_docs if num_docs > 0 else 0
avg_tokens_per_doc = total_tokens / num_docs if num_docs > 0 else 0
print(f"Nombre de documents récupérés : {num_docs}")
print(f"Moyenne de mots par document : {avg_words_per_doc:.2f}")
print(f"Moyenne de tokens par document : {avg_tokens_per_doc:.2f}")
return relevant_docs |