| from pypdf import PdfReader |
| from .chunker import Chunker |
| from .amodel import AModel |
| from .store import Store |
|
|
| CHUNK_CHAR_COUNT = 1000 |
| CHUNK_OVERLAP = 200 |
|
|
| class Rag: |
| ''' |
| RAG naïf |
| Classe qui s'occupe de toute la chaine du RAG. |
| Elle permet : |
| d'interroger un llm directement (sans RAG) avec ask_llm() |
| d'interroger le RAG lui même avec ask_rag() |
| d'ajouter des documents à la base de données du RAG |
| de remettre la base à zéro |
| de créer des vecteurs |
| de charger des pdf |
| ''' |
|
|
| |
| prompt_template = """ |
| En vous basant **uniquement** sur les informations fournies dans le contexte |
| ci-dessous, répondez à la question posée. |
| Les équations seront écrites en latex. |
| Si vous ne trouvez pas la réponse dans le contexte, répondez "Je ne sais pas". |
| Contexte : {context} |
| Question : {question} |
| """ |
|
|
| def __init__(self, llm:AModel, emb:AModel, store_dir:str)->None: |
| ''' |
| Constructeur du Rag |
| Args: |
| llm: le model de langage |
| emb: le model d'embeddings |
| store_dir: le répertoire de persistance de la base de données ou None pour éphémère |
| Exception: |
| Si le store ne peut pas se créer (répertoire inaccessible par ex.) |
| ''' |
| self.llm:AModel = llm |
| self.emb:AModel = emb |
| self.store_dir:str = store_dir |
| try: |
| self.emb_store = Store(store_dir) |
| |
| except: |
| raise |
|
|
| def get_llm_name(self): |
| return self.llm.get_llm_name() |
| |
| def get_feature_name(self): |
| return self.emb.get_feature_name() |
| |
| def get_temperature(self): |
| return self.llm.get_temperature() |
| |
| def set_temperature(self, temperature:float): |
| self.llm.set_temperature(temperature) |
|
|
| def reset_store(self): |
| self.emb_store.reset() |
|
|
| def delete_collection(self, name:str)->None: |
| self.emb_store.delete_collection(name) |
|
|
| def create_vectors(self, chunks:list[str])->list[list[float]]: |
| ''' |
| Renvoie les vecteurs correspondant à 'chunks', calculés par 'emb' |
| Args: |
| chunks: les extraits de texte à calculer |
| Return: |
| la liste des vecteurs calculés |
| ''' |
| vectors:list = [] |
| tokens:int = 0 |
| try: |
| vectors:list[list[float]] = self.emb.create_vectors(chunks) |
| return vectors |
| except: |
| raise |
|
|
| def load_pdf(self, file_name:str)->str: |
| ''' Charge le fichier 'file_name' et renvoie son contenu sous forme de texte. ''' |
| reader = PdfReader(file_name) |
| content = "" |
| for page in reader.pages: |
| content += page.extract_text() + "\n" |
| return content |
| |
| def get_chunks(self, text:str)->list: |
| ''' |
| Découpe le 'text' en chunks de taille chunk_size avec un recouvrement |
| Args: |
| text: Le texte à découper |
| Return: |
| La liste des chunks |
| ''' |
| chunker = Chunker() |
| chunks = chunker.split_basic(text=text, char_count=CHUNK_CHAR_COUNT, overlap=CHUNK_OVERLAP) |
| return chunks |
|
|
| def add_pdf_to_store(self, file_name:str, collection_name:str)->None: |
| ''' |
| Ajoute un pdf à la base de données du RAG. |
| Args: |
| file_name: le chemin vers le fichier à ajouter |
| collection_name: Le nom de la collection dans laquelle il faut ajouter les chunks |
| La collection est créée si elle n'existe pas. |
| ''' |
| text:str = self.load_pdf(file_name) |
| chunks:list[str] = self.get_chunks(text) |
| self.add_chunks_to_store(chunks=chunks, collection_name=collection_name, source=file_name) |
|
|
| def add_pdf_stream_to_store(self, stream, collection_name:str)->None: |
| ''' |
| Ajoute un stream provenant de file_uploader de streamlit par exemple |
| ''' |
| text:str = self.load_pdf(stream) |
| chunks:list[str] = self.get_chunks(text) |
| self.add_chunks_to_store(chunks=chunks, collection_name=collection_name, source="stream") |
|
|
| def add_chunks_to_store(self, chunks:list[str], collection_name:str, source:str)->None: |
| ''' |
| Ajoute des chunks à la base de données du RAG. |
| Args: |
| chunks: les chunks à ajouter |
| collection_name: Le nom de la collection dans laquelle il faut ajouter les chunks |
| La collection est créée si elle n'existe pas. |
| source: la source des chunks (nom du fichier, url ...) |
| ''' |
| try: |
| vectors = self.create_vectors(chunks=chunks) |
| except: |
| raise |
| self.emb_store.add_to_collection( |
| collection_name=collection_name, |
| source=source, |
| vectors=vectors, |
| chunks=chunks |
| ) |
|
|
|
|
| def ask_llm(self, question:str)->str: |
| ''' |
| Pose une question au llm, attend sa réponse et la renvoie. |
| Args: |
| question: La question qu'on veut lui poser |
| Returns: |
| La réponse du llm |
| ''' |
| try: |
| return self.llm.ask_llm(question=question) |
| except: |
| return "Error while comminicating with model !" |
|
|
| def ask_rag(self, question:str, collection_name:str)->tuple[str, str, list[str], list[str]]: |
| ''' |
| Pose une question au RAG, attend sa réponse et la renvoie. |
| Args: |
| question: La question qu'on veut lui poser |
| collection_name: le nom de la collection que l'on veut interroger |
| Returns: |
| Le prompt effectivement donné au llm |
| La réponse du llm |
| Les sources du RAG utilisées |
| Les ids des documents du RAG |
| ''' |
| if not question: |
| return "", "Error: No question !", [], [] |
| if not collection_name: |
| return "", "Error: No collection specified !", [], [] |
| if not collection_name in self.emb_store.get_collection_names(): |
| return "", "Error: {name} is no more in the database !".format(name=collection_name), [], [] |
| try: |
| |
| query_vector:list[float] = self.emb.create_vector(question) |
| |
| chunks, sources, ids = self.emb_store.get_similar_chunks( |
| query_vector=query_vector, |
| count=2, |
| collection_name=collection_name |
| ) |
| |
| prompt:str = self.prompt_template.format( |
| context="\n\n\n".join(chunks), |
| question=question |
| ) |
| |
| resp:str = self.ask_llm(question=prompt) |
|
|
| return prompt, resp, sources, ids |
| except: |
| return "", "Error with communicating with model !", [], [] |
| |
|
|
|
|
|
|
|
|
|
|