Spaces:
Running
Running
| from sentence_transformers import SentenceTransformer | |
| from config import dense_index as indexA | |
| from config import * | |
| import zlib | |
| import base64 | |
| def split_text_into_chunks(text, max_chunk_size=1024): | |
| """Divise le texte en morceaux de taille maximale spécifiée.""" | |
| return [text[i:i+max_chunk_size] for i in range(0, len(text), max_chunk_size)] | |
| def decompress_text(compressed_text): | |
| """Décompresse un texte compressé en base64.""" | |
| decoded = base64.b64decode(compressed_text.encode('ascii')) | |
| return zlib.decompress(decoded).decode('utf-8') | |
| def compress_text(text): | |
| """Compresse le texte et le encode en base64.""" | |
| compressed = zlib.compress(text.encode('utf-8')) | |
| return base64.b64encode(compressed).decode('ascii') | |
| def get_metadata_size(metadata): | |
| """Calcule la taille des métadonnées en bytes.""" | |
| return len(str(metadata).encode('utf-8')) | |
| def get_existing_vectors(index): | |
| """Récupère les textes compressés déjà indexés dans Pinecone.""" | |
| existing_texts = set() | |
| try: | |
| # Récupérer les vecteurs existants (par exemple, les 10 000 premiers) | |
| results = index.query(vector=[0] * 1024, top_k=10000, include_metadata=True) | |
| for match in results.get("matches", []): | |
| if "metadata" in match and "compressed_text" in match["metadata"]: | |
| existing_texts.add(match["metadata"]["compressed_text"]) | |
| except Exception as e: | |
| print(f"Erreur lors de la récupération des vecteurs existants : {e}") | |
| return existing_texts | |
| def index_pdf(texts): | |
| """Indexe les textes dans l'index dense en évitant les doublons.""" | |
| vectors = model.encode(texts) | |
| # Récupérer les textes déjà indexés | |
| existing_texts = get_existing_vectors(indexA) | |
| for i, (vector, chunk) in enumerate(zip(vectors, texts)): | |
| # Diviser le texte en morceaux de 1024 caractères | |
| chunks = split_text_into_chunks(chunk, max_chunk_size=1024) | |
| for j, small_chunk in enumerate(chunks): | |
| # Compresser le morceau | |
| compressed_chunk = compress_text(small_chunk) | |
| # Vérifier si ce texte est déjà indexé | |
| if compressed_chunk in existing_texts: | |
| print(f"Le texte '{small_chunk[:2000]}...' est déjà indexé. Ignorer.") | |
| continue | |
| metadata = {"compressed_text": compressed_chunk} | |
| metadata_size = get_metadata_size(metadata) | |
| if metadata_size > 40960: # 40 KB | |
| print(f"Attention : la taille des métadonnées ({metadata_size} bytes) dépasse la limite de 40960 bytes.") | |
| small_chunk = small_chunk[:512] # Réduire à 512 caractères | |
| compressed_chunk = compress_text(small_chunk) | |
| metadata = {"compressed_text": compressed_chunk} | |
| metadata_size = get_metadata_size(metadata) | |
| if metadata_size > 40960: | |
| print("Impossible de réduire suffisamment la taille des métadonnées. Ignorer ce morceau.") | |
| continue | |
| # Insérer dans Pinecone | |
| indexA.upsert([(f"vec_{i}_{j}", vector.tolist(), metadata)]) | |
| print(f"Indexation réussie pour le morceau '{small_chunk[:2000]}...'") | |
| def retrieve_documents(query, k, similarity_threshold): | |
| """Récupère les documents pertinents en fonction de la requête.""" | |
| query_vector = model.encode([query]).tolist()[0] | |
| results = indexA.query( | |
| vector=query_vector, | |
| top_k=k, | |
| include_metadata=True | |
| ) | |
| relevant_docs = [] | |
| total_words = 0 | |
| total_tokens = 0 | |
| for match in results.get("matches", []): | |
| if "metadata" in match and "compressed_text" in match["metadata"]: | |
| score = match.get("score", 0) # Score de similarité | |
| if score >= similarity_threshold: # Filtrer par seuil | |
| compressed_text = match["metadata"]["compressed_text"] | |
| text = decompress_text(compressed_text) | |
| relevant_docs.append(text) | |
| # Calcul du nombre de mots et de tokens | |
| total_words += len(text.split()) # Nombre de mots (séparés par des espaces) | |
| total_tokens += len(model.tokenizer.encode(text)) # Nombre de tokens | |
| else: | |
| print(f"Skipping match due to missing metadata or compressed_text: {match}") | |
| num_docs = len(relevant_docs) | |
| avg_words_per_doc = total_words / num_docs if num_docs > 0 else 0 | |
| avg_tokens_per_doc = total_tokens / num_docs if num_docs > 0 else 0 | |
| print(f"Nombre de documents récupérés : {num_docs}") | |
| print(f"Moyenne de mots par document : {avg_words_per_doc:.2f}") | |
| print(f"Moyenne de tokens par document : {avg_tokens_per_doc:.2f}") | |
| return relevant_docs |