| from math import sqrt |
| import operator |
| import json |
| import os |
| from pathlib import Path |
| from .astore import AStore |
|
|
| class Document: |
| ''' |
| Un document est : |
| une chaîne de caractère, le chunk |
| une source, livre ou page ou chapitre |
| un vecteur issu d'un modèle d'embedding |
| un id, calculé automatiquement par la collection |
| ''' |
| def __init__(self, chunk:str, source:str, vec:list[float], idd:int): |
| self.chunk = chunk |
| self.source = source |
| self.vec = vec |
| self.idd = idd |
|
|
| def get_json(self): |
| json = { |
| 'c':self.chunk, |
| 's':self.source, |
| 'v':self.vec |
| } |
|
|
| return json |
|
|
| class Collection: |
| ''' |
| Une Collection est : |
| un nom |
| une liste de documents |
| un id, calculé automatiquement par le Store |
| Une collection est sauvée dans un fichier idc.col |
| le nom de la collection |
| la liste des documents: |
| chunk |
| source |
| vector |
| ''' |
| def __init__(self, |
| name:str, |
| docs:list[Document], |
| idc:int): |
| self.name = name |
| self.docs = docs |
| self.idc = idc |
| |
| def add_document(self, chunk:str, source:str, vec:list[float])->Document: |
| ''' |
| Ajoute un document à la collection |
| Args: |
| chunk: le texte du document |
| source: la source du document (livre, chap...) |
| vec: la représentation vectorielle du document |
| Returns: |
| un Document ou None si problème rencontré |
| Raise: |
| si un des paramètres n'est pas défini |
| ''' |
| if chunk == None or source == None or vec == None: |
| raise Exception("Document error: chunk, source or vec is None !") |
| idd:int = len(self.docs) + 1 |
| doc:Document = Document(chunk, source, vec, idd) |
| self.docs.append(doc) |
| return doc |
| |
| def get_length_octets(self)->int: |
| ''' |
| Return la taille en octets de la collection |
| ''' |
| if len(self.docs) == 0: |
| return 0 |
| vector_size = len(self.docs[0]) |
| return len(self.docs) * vector_size * 4 |
| |
| @classmethod |
| def from_disk(self, file_path:str): |
| ''' |
| Méthode de classe qui renvoie une Collection à partir d'un fichier de la base |
| Args: |
| file_path: le chemin vers le fichier |
| Return: |
| la Collection |
| Exception: |
| si le fichier n'existe pas ou qu'on ne peut pas le lire |
| ''' |
| if not os.path.exists(file_path): |
| raise Exception("File {file} doesn't exist !".format(file=file_path)) |
| idc:int = int(Path(file_path).stem) |
| |
| try: |
| with open(file_path, "r") as f: |
| datas = json.load(f) |
| name:str = datas['name'] |
| docs = [] |
| idd: int = 1 |
| for d in datas['docs']: |
| doc:Document = Document(d['c'], d['s'], d['v'], idd) |
| docs.append(doc) |
| idd += 1 |
| return Collection(name, docs, idc) |
| except: |
| raise Exception("Unable to read {file_path} !".format(file_path=file_path)) |
| |
| def save(self, persist_dir:str): |
| ''' |
| La collection est enregistrée avec le nom idc.col dans le persist_dir |
| Args: |
| persist_dir: le chemin du repertoire de la bdd |
| Exception: |
| Si on ne peut pas sauver sur le disque |
| ''' |
|
|
| file_path:str = os.path.join(persist_dir, str(self.idc)) + ".col" |
| |
| json_object = { |
| 'name':self.name, |
| 'docs':[] |
| } |
| for doc in self.docs: |
| json_object['docs'].append(doc.get_json()) |
| json_object = json.dumps(json_object) |
| try: |
| with open(file_path, "w+") as f: |
| f.write(json_object) |
| except: |
| raise Exception("Unable to save the collection {name}, id={id} !".format(name=self.name, id=self.idc)) |
|
|
| def delete(self, persist_dir:str)->None: |
| ''' |
| Supprime la collection de la bdd |
| Args: |
| persist_dir: le chemin du repertoire de la bdd |
| Exception: |
| Si on ne peut pas supprimer du disque |
| ''' |
| self.docs.clear() |
| file_path:str = os.path.join(persist_dir, str(self.idc)) + ".col" |
| try: |
| os.remove(file_path) |
| except: |
| raise Exception("Unable to delete the collection {name}, id={id} !".format(name=self.name, id=self.idc)) |
| |
| class Store(AStore): |
| ''' |
| Un store est une liste de collections. |
| A chaque création, ajout ou suppression d'un élément, la base est sauvée si elle est persistante |
| Sur le disque, dans store_dir: |
| Un sous-repertoire par collection, portant le nom de la collection |
| Dans chaque sous-repertoire d'une collection : la liste des vecteurs |
| |
| ''' |
| def __init__(self, persist_dir:str): |
| ''' Constructeur de Store |
| Args: |
| dir_name: le répertoire persistant de la base de données ou None |
| Exception: |
| Dans le cas d'une base persistante: |
| Impossible de créer le répertoire persistant |
| Impossible de lire les collections |
| ''' |
| self.persist_dir = persist_dir |
| self.collections = [] |
| if persist_dir == None: |
| pass |
| else: |
| |
| try: |
| self._create_persist_dir() |
| files = [os.path.join(persist_dir, f) for f in os.listdir(persist_dir) if os.path.isfile(os.path.join(persist_dir, f))] |
| for f in files: |
| col: Collection = Collection.from_disk(f) |
| self.collections.append(col) |
| except Exception as e: |
| raise |
| |
| def reset(self)->None: |
| ''' |
| Vide la base et l'efface du disque si elle est persistante |
| Exception: |
| Dans le cas d'une base persistante: |
| Impossible de créer le répertoire persistant |
| Impossible de lire les collections |
| ''' |
| self.collections = [] |
| if self.persist_dir == None: |
| pass |
| else: |
| try: |
| |
| if os.path.exists(self.persist_dir): |
| files = [os.path.join(self.persist_dir, f) for f in os.listdir(self.persist_dir) if os.path.isfile(os.path.join(self.persist_dir, f))] |
| |
| for f in files: |
| os.remove(f) |
| os.rmdir(self.persist_dir) |
| except Exception as e: |
| raise |
| |
| def get_collection_names(self)->list[str]: |
| return [col.name for col in self.collections] |
|
|
| def print_infos(self)->None: |
| ''' Affiche le nombre de collections et pour chaque collection, affiche son nom et son nombre de documents ''' |
| print("-------- STORE INFOS ---------------") |
| for col in self.collections: |
| print(col.name) |
| |
| |
| print("\tdocuments:", len(col.docs)) |
| print("-------- /STORE INFOS ---------------") |
|
|
| def get_collection(self, collection_name:str)->Collection: |
| ''' |
| Renvoie la collection dont le nom est 'collection_name' ou None si elle n'existe pas |
| ''' |
| for col in self.collections: |
| if col.name == collection_name: |
| return col |
| return None |
| |
| def _create_persist_dir(self): |
| ''' |
| Recrée le répertoir persistant s'il a disparu après un reset par exemple |
| Exception: |
| Si on ne peut pas créer le 'persist_dir' |
| ''' |
| |
| |
| try: |
| if not os.path.exists(self.persist_dir): |
| print("Trying to recreate persist_dir", self.persist_dir) |
| os.mkdir(self.persist_dir) |
| except: |
| raise Exception("Unable to create the persit directory: {dir}".format(dir=self.persist_dir)) |
|
|
| def create_collection(self, name:str)->Collection: |
| ''' |
| Crée et renvoie une nouvelle collection vide de documents |
| Args: |
| name: le nom de la création à créer |
| Exception: |
| Dans le cas d'une base persistante: |
| Impossible de créer le répertoire persistant |
| Impossible de sauver la collection |
| ''' |
| idc:int = len(self.collections) + 1 |
| col:Collection = Collection(name, [], idc) |
| if self.persist_dir != None: |
| try: |
| self._create_persist_dir() |
| col.save(self.persist_dir) |
| except: |
| raise |
| return col |
| |
| |
| def add_to_collection(self, collection_name:str, source:str, vectors:list[list[float]], chunks:list[str])->None: |
| ''' |
| Ajoute une liste de vecteurs à la collection 'collection_name' |
| Args: |
| collection_name: le nom de la collection |
| source: la source unique des chunks, par exemple un nom de fichier, une url ... |
| vectors: la liste des vecteurs obtenus à l'aide d'un modèle d'embeddings |
| chunks: la liste des chunks (documents) correspondant aux vecteurs |
| Exception: |
| Dans le cas d'une base persistante: |
| Impossible de créer le répertoire persistant |
| Impossible de sauver la collection |
| ''' |
| col:Collection = self.get_collection(collection_name) |
| if col == None: |
| col = self.create_collection(collection_name) |
| self.collections.append(col) |
| for i in range(len(chunks)): |
| col.add_document(chunks[i], source, vectors[i]) |
| if self.persist_dir != None: |
| try: |
| self._create_persist_dir() |
| col.save(self.persist_dir) |
| except: |
| raise |
| |
| def delete_collection(self, name:str)->None: |
| ''' Vide et supprime la collection dont le nom est 'name', et la supprime du disque si elle est persistante ''' |
| col = self.get_collection(name) |
| if col != None: |
| self.collections.remove(col) |
| if self.persist_dir != None: |
| try: |
| self._create_persist_dir() |
| col.delete(self.persist_dir) |
| except: |
| raise |
| |
| def normalize(self, v:list[float])->list[float]: |
| ''' |
| Normalement les LLMs renvoient des vecteurs normalisés mais: |
| c'est pas sûr pour ceux que je n'ai pas testés |
| c'est pratique d'avoir cette méthode pour 'test_store.py' |
| Args: |
| v: le vecteur à normaliser |
| Returns: |
| le vecteur normalisé |
| ''' |
| norm = 0.0 |
| for i in range(len(v)): |
| norm += v[i] * v[i] |
| norm = sqrt(norm) |
| if norm == 0.0: |
| return v.copy() |
| result = [None] * len(v) |
| for i in range(len(v)): |
| result[i] = v[i] / norm |
| return result |
| |
| def dot_product(self, v1:list[float], v2:list[float])->float: |
| ''' |
| Le produit scalaire est utilisé pour une similarité en cosinus: |
| cos(a) = (vecA dot vecB) / (A.B) |
| si les vecteurs A et B sont normalisés, le cos est simplement le produit scalaire |
| Args: |
| v1, v2: les deux vecteurs à multiplier |
| Returns: |
| Un float égal à v1 dot v2 |
| ''' |
| result = 0.0 |
| for i in range(len(v1)): |
| result += v1[i] * v2[i] |
| return result |
| |
| def get_similar_vector(self, vector:list[float], collection_name:str)->list[float]: |
| ''' |
| Renvoie le vecteur de 'collection' le pus similaire à 'vector'. |
| Args: |
| vector: un vecteur obtenu avec le même modèle d'embeddings que les vecteurs de la 'collection' |
| collection_name: le nom de la collection de la base dans laquelle on cherche une similarité |
| Return: |
| Le vecteur le plus similaire 'vector' |
| ''' |
| col:Collection = self.get_collection(collection_name) |
| best_doc:Document = None |
| best_dp: float = -20.0 |
| if col != None: |
| for doc in col.docs: |
| dp:float = self.dot_product(vector, doc.vec) |
| if dp > best_dp: |
| best_dp = dp |
| best_doc = doc |
| return best_doc.vec |
| else: |
| return None |
| |
| def get_similar_chunk(self, query_vector:list[float], collection_name:str)->tuple[str, str]: |
| ''' |
| Renvoie le document de la 'collection' le plus similaire à 'query_vector'. |
| Args: |
| query_vector: un vecteur obtenu avec le même modèle d'embeddings que les vecteurs de la 'collection' |
| collection: la collection de la base dans laquelle on cherche une similarité |
| Returns: |
| Un tuple contenant: |
| le document |
| la source du document |
| ''' |
| col:Collection = self.get_collection(collection_name) |
| best_doc:Document = None |
| best_dp: float = -20.0 |
| if col != None: |
| for doc in col.docs: |
| dp:float = self.dot_product(query_vector, doc.vec) |
| print(dp) |
| if dp > best_dp: |
| best_dp = dp |
| best_doc = doc |
| return best_doc.chunk, best_doc.source |
| else: |
| return None, None |
| |
| def get_similar_chunks(self, query_vector:list[float], count:int, collection_name:str): |
| ''' |
| Returns: |
| Un tuple contenant: |
| les documents |
| la source des documents |
| les ids des documents |
| a[0:count-1] |
| ''' |
| |
| col:Collection = self.get_collection(collection_name) |
| if col == None: |
| return None, None, None |
| bests:list[dict] = [] |
| |
| for doc in col.docs: |
| dp:float = self.dot_product(query_vector, doc.vec) |
| bests.append({'doc':doc, 'dp':dp}) |
| |
| bests.sort(key=operator.itemgetter('dp'), reverse=True) |
| |
| n:int = count if len(bests) >= count else len(bests) |
| |
| |
| docs = [b['doc'].chunk for b in bests[0:n]] |
| source = bests[0]['doc'].source if n > 0 else None |
| ids = [b['doc'].idd for b in bests[0:n]] |
| |
| return docs, source, ids |
| |