AURA-chatbot / src /ragpipeline.py
fcasadei's picture
Upload 4 files
31b7256 verified
raw
history blame
8.6 kB
from textutils import ParagraphDocumentProcessor, DocumentProcessor
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import ollama
import faiss
import os
import csv
import re
import pandas as pd
from datetime import datetime
class RAGPipeline:
def __init__(self,
model_name: str = "flaollama",
model_orig: str = "mistral",
docprocessor = ParagraphDocumentProcessor(),
sentence_transformer_name: str = 'paraphrase-multilingual-MiniLM-L12-v2',
numero_frammenti = 10
) :
self.model_name = model_name
self.model_orig = model_orig
self.docprocessor = docprocessor
self.sentence_transformer_name = sentence_transformer_name
self.sentence_transformer_model = SentenceTransformer( self.sentence_transformer_name )
self.numero_frammenti = numero_frammenti
self.documenti = []
self.files_pdf =[]
self.indice =False
self.timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
##attrributi_frammenti contiene una lista di frammenti con attribui es:
##
##
##
self.attributi_frammenti = [] #elenco di dizionari di tutti i frammenti
##LISTA DI FRAMMENTI INDICIZZATI (lista di testi)
self. frammenti_indicizzati = [] #sonoi testi dei vari frammenti
#print(f"NUMERODI FRAMMENTIIIII {self.numero_frammenti} param {numero_frammenti}")
@staticmethod
def dump_excel(dizionario, filename ):
"""Salva un dizionario in un file Excel accodando i dati se il file esiste."""
file_esiste = os.path.isfile(filename)
# Converti il dizionario in un DataFrame con una sola riga
df_nuova_riga = pd.DataFrame([dizionario])
if file_esiste:
# Carica il file Excel esistente
df_esistente = pd.read_excel(filename, engine='openpyxl')
# Concatenazione dei DataFrame
df_finale = pd.concat([df_esistente, df_nuova_riga], ignore_index=True)
else:
# Se il file non esiste, il DataFrame finale è la nuova riga
df_finale = df_nuova_riga
# Salva il DataFrame finale nel file Excel
df_finale.to_excel(filename, index=False, engine='openpyxl')
def crea_indice(self):
# Converte i documenti in vettori
docId = 0
fraId = 0
for documento in self.documenti:
frammenti = self.docprocessor.scomponi_in_frammenti(documento, self.numero_frammenti )
for frammento in frammenti:
dizionario_frammenti = {
'timestamp': self.timestamp,
'id': f"{docId}-{fraId}",
"documento": docId,
"frammento": fraId,
"nomefile": os.path.basename(self.files_pdf[docId]),
'testo_frammento':frammento
}
self.attributi_frammenti.append( dizionario_frammenti )
self.frammenti_indicizzati.append(frammento)
fraId = fraId +1
docId = docId +1
self.doc_embeddings = self.sentence_transformer_model.encode(self.frammenti_indicizzati)
# Creazione dell'indice Faiss
dimension = self.doc_embeddings.shape[1]
self.indice = faiss.IndexFlatL2(dimension) # Indice L2 (distanza euclidea)
self.indice.add(self.doc_embeddings)
def aggiungi_file_pdf(self, filename: str) :
text= self.docprocessor.estrai_da_pdf(filename)
self.documenti.append(text)
self.files_pdf.append(filename)
class Retriever:
def __init__(self,
indice ,
sentence_transformer_model ,
query : str,
documenti =[],
frammenti_indicizzati = [], #tutti i frammenti ??
attributi_frammenti = [] ##elenco attrivuti frammenti
):
self.indice = indice
self.sentence_transformer = sentence_transformer_model
self.query = query
self.documenti = documenti
self.frammenti_indicizzati = frammenti_indicizzati
self.attributi_frammenti = attributi_frammenti
self.passaggi_rilevanti = [] ## documenti rilevanti per la query (recuperati)
self.attributi_rilevanti = [] ## atteributi dei frammenti rilevanti
def esegui_query(self, top_k = 5):
# Embedding della query
query_embedding = self.sentence_transformer.encode([self.query])
# Recupero dei documenti più simili
distances, indices = self.indice.search(query_embedding, top_k)
# documenti rilevanti e passaggi rilevanti
self.passaggi_rilevanti = [self.frammenti_indicizzati[j] for j in indices[0]] #frammenti rilevanti
self.attributi_rilevanti = [self.attributi_frammenti[j] for j in indices[0]] #passaggi rilevanti
class ChatBot:
def __init__(self,
model_name: str = "flaollama",
model_orig: str = "mistral" ,
model_system=(
"Sei un esperto di diritto amministrativo che deve eseguire il "
"controllo di regolarità amministrativa su un atto amministrativo di un comune italiano. "
"Ti verranno forniti un atto amministrativo (determinazione dirigenziale) ed eventuali allegati, questi sono forniti come frammenti rilevanti. "
"Utilizza solamente i frammenti che ti verranno inviati."
"Rispondi in Italiano usando al massimo 50 parole. "
"Basati esclusivamente sul seguente testo: "
),
dump_filename="dump.csv"
):
self.model_name = model_name
self.model_orig = model_orig
self.model_system = model_system
self.dump_filename = dump_filename
ollama.create(
model=model_name,
from_=model_orig,
system = model_system
)
def dump_excel(self, dizionario, filename ):
RAGPipeline.dump_excel(dizionario=dizionario, filename=filename)
def dump_csv(self,dizionario):
"""Salva un dizionario in un file CSV con separatore '|' accodando i dati se il file esiste."""
file_esiste = os.path.isfile(self.dump_filename)
with open(self.dump_filename, mode="a", newline="", encoding="utf-8") as file:
writer = csv.writer(file, delimiter="|")
# Scrive l'intestazione solo se il file viene creato ex novo
if not file_esiste:
writer.writerow(dizionario.keys())
# Scrive i valori come una nuova riga
writer.writerow([str(val).replace("\n", "").replace("\r", "").replace("\t", "") for val in dizionario.values()])
def pulisci_risposta(self,
response: str):
retval=re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL).strip()
retval = retval.replace("\n", " ").replace("\t", " ").replace("|", " ")
return retval
def chat(self, domanda: str, istruzioni: str = None, frammenti =[]) -> str:
prompt = f"ISTRUZIONI: {istruzioni}\n\nCONTESTO:\n" + "\n".join(frammenti) + f"\n\nDOMANDA: {domanda}"
response = ollama.chat(model=self.model_orig, messages=[
{"role": "user", "content": prompt}
], options= {'num_ctx' : 4096 })
return response["message"]["content"]
def generate(self,
relevant_docs = [],
attributi_frammenti_rilevanti = [],
query="",
istruzioni :str = None ##togliere
):
i = 0
#print (f"DIMESIONE FILE {len(relevant_files)}")
#print (f"DIMESIONE TESTI {len(relevant_docs)}")
prompt=""
for documento in relevant_docs:
prompt += f"{relevant_docs[i]} "
i = i+1
#"{context}\n\nDomanda: {query}"
if istruzioni is not None:
query = query + " Istruzioni: " + istruzioni
prompt +=f"\n\nDomanda:{query} \n\n"
#print(prompt)
rersponse = ollama.generate(
model=self.model_name,
prompt=prompt
, options= {'num_ctx' : 4096 }
)
return rersponse['response']