Spaces:
Paused
Paused
| from textutils import ParagraphDocumentProcessor, DocumentProcessor | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import AutoTokenizer, AutoModel | |
| import ollama | |
| import faiss | |
| import os | |
| import csv | |
| import re | |
| import pandas as pd | |
| from datetime import datetime | |
| class RAGPipeline: | |
| def __init__(self, | |
| model_name: str = "flaollama", | |
| model_orig: str = "mistral", | |
| docprocessor = ParagraphDocumentProcessor(), | |
| sentence_transformer_name: str = 'paraphrase-multilingual-MiniLM-L12-v2', | |
| numero_frammenti = 10 | |
| ) : | |
| self.model_name = model_name | |
| self.model_orig = model_orig | |
| self.docprocessor = docprocessor | |
| self.sentence_transformer_name = sentence_transformer_name | |
| self.sentence_transformer_model = SentenceTransformer( self.sentence_transformer_name ) | |
| self.numero_frammenti = numero_frammenti | |
| self.documenti = [] | |
| self.files_pdf =[] | |
| self.indice =False | |
| self.timestamp = datetime.now().strftime("%Y%m%d%H%M%S") | |
| ##attrributi_frammenti contiene una lista di frammenti con attribui es: | |
| ## | |
| ## | |
| ## | |
| self.attributi_frammenti = [] #elenco di dizionari di tutti i frammenti | |
| ##LISTA DI FRAMMENTI INDICIZZATI (lista di testi) | |
| self. frammenti_indicizzati = [] #sonoi testi dei vari frammenti | |
| #print(f"NUMERODI FRAMMENTIIIII {self.numero_frammenti} param {numero_frammenti}") | |
| def dump_excel(dizionario, filename ): | |
| """Salva un dizionario in un file Excel accodando i dati se il file esiste.""" | |
| file_esiste = os.path.isfile(filename) | |
| # Converti il dizionario in un DataFrame con una sola riga | |
| df_nuova_riga = pd.DataFrame([dizionario]) | |
| if file_esiste: | |
| # Carica il file Excel esistente | |
| df_esistente = pd.read_excel(filename, engine='openpyxl') | |
| # Concatenazione dei DataFrame | |
| df_finale = pd.concat([df_esistente, df_nuova_riga], ignore_index=True) | |
| else: | |
| # Se il file non esiste, il DataFrame finale è la nuova riga | |
| df_finale = df_nuova_riga | |
| # Salva il DataFrame finale nel file Excel | |
| df_finale.to_excel(filename, index=False, engine='openpyxl') | |
| def crea_indice(self): | |
| # Converte i documenti in vettori | |
| docId = 0 | |
| fraId = 0 | |
| for documento in self.documenti: | |
| frammenti = self.docprocessor.scomponi_in_frammenti(documento, self.numero_frammenti ) | |
| for frammento in frammenti: | |
| dizionario_frammenti = { | |
| 'timestamp': self.timestamp, | |
| 'id': f"{docId}-{fraId}", | |
| "documento": docId, | |
| "frammento": fraId, | |
| "nomefile": os.path.basename(self.files_pdf[docId]), | |
| 'testo_frammento':frammento | |
| } | |
| self.attributi_frammenti.append( dizionario_frammenti ) | |
| self.frammenti_indicizzati.append(frammento) | |
| fraId = fraId +1 | |
| docId = docId +1 | |
| self.doc_embeddings = self.sentence_transformer_model.encode(self.frammenti_indicizzati) | |
| # Creazione dell'indice Faiss | |
| dimension = self.doc_embeddings.shape[1] | |
| self.indice = faiss.IndexFlatL2(dimension) # Indice L2 (distanza euclidea) | |
| self.indice.add(self.doc_embeddings) | |
| def aggiungi_file_pdf(self, filename: str) : | |
| text= self.docprocessor.estrai_da_pdf(filename) | |
| self.documenti.append(text) | |
| self.files_pdf.append(filename) | |
| class Retriever: | |
| def __init__(self, | |
| indice , | |
| sentence_transformer_model , | |
| query : str, | |
| documenti =[], | |
| frammenti_indicizzati = [], #tutti i frammenti ?? | |
| attributi_frammenti = [] ##elenco attrivuti frammenti | |
| ): | |
| self.indice = indice | |
| self.sentence_transformer = sentence_transformer_model | |
| self.query = query | |
| self.documenti = documenti | |
| self.frammenti_indicizzati = frammenti_indicizzati | |
| self.attributi_frammenti = attributi_frammenti | |
| self.passaggi_rilevanti = [] ## documenti rilevanti per la query (recuperati) | |
| self.attributi_rilevanti = [] ## atteributi dei frammenti rilevanti | |
| def esegui_query(self, top_k = 5): | |
| # Embedding della query | |
| query_embedding = self.sentence_transformer.encode([self.query]) | |
| # Recupero dei documenti più simili | |
| distances, indices = self.indice.search(query_embedding, top_k) | |
| # documenti rilevanti e passaggi rilevanti | |
| self.passaggi_rilevanti = [self.frammenti_indicizzati[j] for j in indices[0]] #frammenti rilevanti | |
| self.attributi_rilevanti = [self.attributi_frammenti[j] for j in indices[0]] #passaggi rilevanti | |
| class ChatBot: | |
| def __init__(self, | |
| model_name: str = "flaollama", | |
| model_orig: str = "mistral" , | |
| model_system=( | |
| "Sei un esperto di diritto amministrativo che deve eseguire il " | |
| "controllo di regolarità amministrativa su un atto amministrativo di un comune italiano. " | |
| "Ti verranno forniti un atto amministrativo (determinazione dirigenziale) ed eventuali allegati, questi sono forniti come frammenti rilevanti. " | |
| "Utilizza solamente i frammenti che ti verranno inviati." | |
| "Rispondi in Italiano usando al massimo 50 parole. " | |
| "Basati esclusivamente sul seguente testo: " | |
| ), | |
| dump_filename="dump.csv" | |
| ): | |
| self.model_name = model_name | |
| self.model_orig = model_orig | |
| self.model_system = model_system | |
| self.dump_filename = dump_filename | |
| ollama.create( | |
| model=model_name, | |
| from_=model_orig, | |
| system = model_system | |
| ) | |
| def dump_excel(self, dizionario, filename ): | |
| RAGPipeline.dump_excel(dizionario=dizionario, filename=filename) | |
| def dump_csv(self,dizionario): | |
| """Salva un dizionario in un file CSV con separatore '|' accodando i dati se il file esiste.""" | |
| file_esiste = os.path.isfile(self.dump_filename) | |
| with open(self.dump_filename, mode="a", newline="", encoding="utf-8") as file: | |
| writer = csv.writer(file, delimiter="|") | |
| # Scrive l'intestazione solo se il file viene creato ex novo | |
| if not file_esiste: | |
| writer.writerow(dizionario.keys()) | |
| # Scrive i valori come una nuova riga | |
| writer.writerow([str(val).replace("\n", "").replace("\r", "").replace("\t", "") for val in dizionario.values()]) | |
| def pulisci_risposta(self, | |
| response: str): | |
| retval=re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL).strip() | |
| retval = retval.replace("\n", " ").replace("\t", " ").replace("|", " ") | |
| return retval | |
| def chat(self, domanda: str, istruzioni: str = None, frammenti =[]) -> str: | |
| prompt = f"ISTRUZIONI: {istruzioni}\n\nCONTESTO:\n" + "\n".join(frammenti) + f"\n\nDOMANDA: {domanda}" | |
| response = ollama.chat(model=self.model_orig, messages=[ | |
| {"role": "user", "content": prompt} | |
| ], options= {'num_ctx' : 4096 }) | |
| return response["message"]["content"] | |
| def generate(self, | |
| relevant_docs = [], | |
| attributi_frammenti_rilevanti = [], | |
| query="", | |
| istruzioni :str = None ##togliere | |
| ): | |
| i = 0 | |
| #print (f"DIMESIONE FILE {len(relevant_files)}") | |
| #print (f"DIMESIONE TESTI {len(relevant_docs)}") | |
| prompt="" | |
| for documento in relevant_docs: | |
| prompt += f"{relevant_docs[i]} " | |
| i = i+1 | |
| #"{context}\n\nDomanda: {query}" | |
| if istruzioni is not None: | |
| query = query + " Istruzioni: " + istruzioni | |
| prompt +=f"\n\nDomanda:{query} \n\n" | |
| #print(prompt) | |
| rersponse = ollama.generate( | |
| model=self.model_name, | |
| prompt=prompt | |
| , options= {'num_ctx' : 4096 } | |
| ) | |
| return rersponse['response'] | |