Spaces:
Running
Running
| #Questo file contiene la logica per i due modelli | |
| import os | |
| import json | |
| import re | |
| import logging | |
| from groq import Groq | |
| from mistralai import Mistral | |
| from agentLogic.state import AgentState | |
| from db.graph_db import GraphDB | |
| from processingPdf.reranker import Reranker | |
| from processingPdf.indexer import Indexer | |
| logger = logging.getLogger(__name__) | |
| groq_client = Groq(api_key=os.getenv("GROQ_API_KEY")) | |
| mistral_client = Mistral(api_key=os.getenv("MISTRAL_API_KEY")) | |
| # Inizializziamo i modelli pesanti fuori dai nodi per caricarli una sola volta all'avvio | |
| # Fondamentale per le performance di FastAPI | |
| indexer_instance = Indexer() | |
| reranker_model = Reranker() | |
| #Estrae e pulisce il JSON dall'output dell'LLM | |
| def extract_json(text): | |
| try: | |
| #Cerca il blocco tra parentesi graffe | |
| match = re.search(r'\{.*\}', text, re.DOTALL) | |
| if match: | |
| return json.loads(match.group(0)) | |
| return json.loads(text) | |
| except Exception as e: | |
| logger.error(f"Errore nel parsare il JSON: {e}") | |
| return {"route": "vector", "entities": [], "keywords": []} | |
| #Nodo rewriter: Pulisce la query, corregge errori e agisce da Guardrail. precedentemente aveva anche una funzione di | |
| # ampliamento contestuale ma ho deciso di eliminare l'espansione semantica forzata per evitare di compromettere il contesto del RAG come successo in fase di testing | |
| def node_rewriter(state: AgentState): | |
| user_query = state["query"] | |
| prompt = f""" | |
| ### ROLE | |
| Sei un correttore ortografico e/o grammaticale e sintattico. Il tuo unico output deve essere la query corretta. | |
| ### TASK | |
| 1. **Correzione**: Correggi eventuali errori di ortografia, sintassi, grammaticali o di battitura. | |
| 2. **Minimalismo**: NON AGGIUNGERE contesto, sinonimi o interpretazioni. Se la domanda è chiara (es. "cos'è la persona"), lasciala IDENTICA. | |
| ### ESEMPI | |
| Input: "cos'è la 'persona'?" -> Output: cos'è la 'persona'? | |
| Input: "spiegami il prompt pstern" -> Output: spiegami il prompt pattern | |
| Input: "Quali sono i sui sinotmi?" -> Output: Quali sono i suoi sintomi? | |
| USER QUERY: "{user_query}" | |
| OUTPUT: | |
| """ | |
| completion = groq_client.chat.completions.create( | |
| model="llama-3.1-8b-instant", | |
| messages=[ | |
| {"role": "system", "content": "Sei un correttore di testo puro. Non salutare. Non spiegare. Restituisci SOLO il risultato."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| temperature=0.0 | |
| ) | |
| rewritten_query = completion.choices[0].message.content.strip() | |
| #pulizia ulteriore | |
| rewritten_query = rewritten_query.replace('Output:', '').replace('"', '').strip() | |
| print(f"DEBUG - Query Rewriting: '{user_query}' -> '{rewritten_query}'") | |
| return {"query": rewritten_query} | |
| #Nodo 1: utilizzo mistral per decidere la strategia di ricerca" | |
| def node_router(state: AgentState): | |
| """ | |
| Nodo 1: Utilizza Mistral per decidere la strategia di ricerca. | |
| Implementa Role Prompting, Few-Shot, Constraint Enforcement e Output Structuring. | |
| """ | |
| # Nota: Usiamo le doppie parentesi graffe {{ }} per includere JSON letterali nelle f-strings. | |
| prompt = f""" | |
| ### ROLE | |
| Sei l'Analizzatore Logico di un sistema RAG (Retrieval-Augmented Generation) avanzato. | |
| Il tuo unico compito è decostruire la domanda dell'utente per determinare la migliore strategia di recupero dati da un database a grafo Neo4j. | |
| ### DOMINIO CONTESTUALE | |
| I documenti analizzati possono appartenere a domini eterogenei: medico-sanitario, tecnico, giuridico, ecc. | |
| Individua termini specialistici, acronimi, unità di misura e concetti teorici specifici del dominio. | |
| ### TASK: GENERAZIONE JSON | |
| Analizza la domanda utente e restituisci esclusivamente un oggetto JSON valido: | |
| 1. `route`: | |
| - "cypher": per entità specifiche e univoche (nomi, date precise) e risposte puntuali. | |
| - "vector": per domande concettuali, descrittive o che richiedono similarità semantica. | |
| - "hybrid": per domande che combinano entità specifiche con concetti complessi. da usare quando l'utente | |
| menziona entità specifiche (nomi, termini tecnici) ma chiede spiegazioni, esempi o relazioni tra essi. | |
| 2. `entities`: Lista delle entità nominate (es. ["Mario Rossi", "BMI"]). | |
| 3. `keywords`: Lista di 3-5 keyword per la ricerca vettoriale (sostantivi normalizzati). | |
| ### ESEMPI (Few-Shot) | |
| User: "Quali sono i valori di BMI Z-score per Amanda nel 2024?" | |
| Output: {{ "route": "cypher", "entities": ["Amanda", "BMI Z-score", "2024"], "keywords": ["valori bmi z-score", "paziente amanda", "dati 2024"] }} | |
| User: "Spiegami come la dieta influisce sulla crescita dei bambini con CF." | |
| Output: {{ "route": "vector", "entities": ["CF"], "keywords": ["dieta fibrosi cistica", "crescita bambini", "nutrizione"] }} | |
| ### VINCOLI RIGIDI (PENALITÀ DI OUTPUT) | |
| - L’output deve essere SOLO JSON valido. | |
| - NON aggiungere introduzioni, commenti o spiegazioni. | |
| - Ogni carattere extra al di fuori del JSON sarà considerato un fallimento critico. | |
| ### ESEMPIO DI STRUTTURA ATTESA | |
| User: "Qual è il BMI di Mario Rossi nel 2023?" | |
| Output: {{ | |
| "route": "hybrid", | |
| "entities": ["Mario Rossi", "BMI", "2023"], | |
| "keywords": ["valutazione BMI", "Mario Rossi", "cartella clinica 2023"] | |
| }} | |
| ### DOMANDA DA ANALIZZARE | |
| "{state['query']}" | |
| """ | |
| response = mistral_client.chat.complete( | |
| model="labs-devstral-small-2512", | |
| messages=[{"role": "user", "content": prompt}] | |
| ) | |
| # Estrazione e parsing del JSON dalla risposta del modello | |
| content = response.choices[0].message.content | |
| intent_json = extract_json(content) | |
| # Fondamentale: restituiamo il dizionario parsato per i nodi successivi | |
| return {"intent_data": intent_json} | |
| # Esegue la ricerca ibrida su neo4j basandosi sull'intent | |
| def node_retriever(state: AgentState): | |
| intent = state["intent_data"] | |
| target_file = state["filename"] | |
| db = GraphDB() | |
| collected_chunks = [] | |
| seen_ids = set() | |
| # Stampo l'intent per monitorare le decisioni del Router in tempo reale | |
| print(f"DEBUG - Intent ricevuto: {intent}") | |
| # 1. RICERCA PER ENTITÀ (STRATEGIA CYPHER) | |
| # Se il router ha scelto 'cypher' o 'hybrid', interrogo il grafo tramite le entità estratte | |
| if intent.get("route") in ["cypher", "hybrid"]: | |
| entities = intent.get("entities", []) | |
| for entity in entities: | |
| entity_name = entity["value"] if isinstance(entity, dict) else entity | |
| # Nota: Al momento entity_search esegue una ricerca globale. | |
| results = db.entity_search(entity_name) | |
| for res in results: | |
| # Aggiungo un controllo di sicurezza per assicurarmi di prendere solo i chunk del documento corrente | |
| if res["chunk_id"] not in seen_ids and res.get("filename") == target_file: | |
| content_text = res.get("node_content", "") | |
| collected_chunks.append(f"[Entity Match: {entity_name}] {content_text}") | |
| seen_ids.add(res["chunk_id"]) | |
| # 2. RICERCA VETTORIALE (STRATEGIA SEMANTICA) | |
| # Se il router ha scelto 'vector' o 'hybrid', utilizzo gli embeddings per la similarità | |
| if intent.get("route") in ["vector", "hybrid"]: | |
| keywords = intent.get("keywords", []) | |
| search_query = " ".join(keywords) if keywords else state["query"] | |
| # Uso l'istanza caricata all'avvio del server | |
| embedding = indexer_instance.generate_embeddings(search_query) | |
| # Ho deciso di passare 'target_file' come parametro 'filename' per attivare il filtro Cypher | |
| # interno alla query vettoriale e isolare il documento | |
| vector_results = db.query_vector_index( | |
| "chunk_embeddings_index", | |
| embedding, | |
| k=15, | |
| filename=target_file | |
| ) | |
| # Estraggo lo score del miglior risultato locale per decidere se attivare la ricerca globale | |
| max_local_score = vector_results[0]["score"] if vector_results else 0 | |
| print(f"DEBUG - Risultati vettoriali trovati per {target_file}: {len(vector_results)} (Max Score: {max_local_score})") | |
| for res in vector_results: | |
| if res["chunk_id"] not in seen_ids: | |
| #includo metadati nel testo del chunk per permettere al generatore di citare la fonte | |
| #mi interessa sapere, nella risposta finale, da che file è stata tratta l'informazione | |
| source_info = f"[Fonte: {res.get('filename')} | Sezione: {res.get('section', 'N/A')}]" | |
| content_text = res.get("node_content", "") | |
| collected_chunks.append(f"{source_info} [Vector Match] {content_text}") | |
| seen_ids.add(res["chunk_id"]) | |
| #attivo la GLOBAL VECTOR SEARCH se la pertinenza locale è bassa (< 0.7) | |
| if max_local_score < 0.7: | |
| print(f"DEBUG - Score locale basso ({max_local_score}), attivo Global Vector Search...") | |
| #eseguo semplicemente la query senza passare il filename per cercare in tutto il database | |
| global_results = db.query_vector_index( | |
| "chunk_embeddings_index", | |
| embedding, | |
| k=5, | |
| filename=None | |
| ) | |
| for res in global_results: | |
| #evito duplicati se per caso la ricerca globale ripesca chunk già visti nel locale | |
| if res["chunk_id"] not in seen_ids: | |
| source_info = f"[Fonte: {res.get('filename')} | Sezione: {res.get('section', 'N/A')}]" | |
| content_text = res.get("node_content", "") | |
| collected_chunks.append(f"{source_info} [Global Vector Match] {content_text}") | |
| seen_ids.add(res["chunk_id"]) | |
| # se i metodi precedenti non producono risultati, | |
| # forzo una ricerca vettoriale sull'intera query originale filtrata per il file corrente | |
| if not collected_chunks: | |
| print(f"DEBUG - Fallback: nessuna informazione con keyword in {target_file}, procedo con query completa.") | |
| embedding_fallback = indexer_instance.generate_embeddings(state["query"]) | |
| # Anche nel fallback, forzo il filtro sul filename per evitare contaminazioni | |
| fallback_results = db.query_vector_index( | |
| "chunk_embeddings_index", | |
| embedding_fallback, | |
| k=3, | |
| filename=target_file | |
| ) | |
| for res in fallback_results: | |
| if res["chunk_id"] not in seen_ids: | |
| source_info = f"[Fonte: {res.get('filename')} | Sezione: {res.get('section', 'N/A')}]" | |
| content_text = res.get("node_content", "") | |
| collected_chunks.append(f"{source_info} [Fallback Match] {content_text}") | |
| seen_ids.add(res["chunk_id"]) | |
| # gestisco esplicitamente il caso di assenza totale di dati per evitare errori nel Generator | |
| if not collected_chunks: | |
| collected_chunks = [f"Nessuna informazione specifica trovata nel database per il file {target_file}."] | |
| db.close() | |
| # stampo quanti chunk sto effettivamente restituendo allo stato | |
| print(f"DEBUG - RETRIEVER sta inviando allo stato {len(collected_chunks)} chunk") | |
| return {"context_chunks": collected_chunks} | |
| #Nodo reranker, ottiene i 15 chunks più pertinenti dal retriever e si occupa di prendere i 5 veramente più pertinenti rispetto alla domanda dell'utente | |
| def node_reranker(state: AgentState): | |
| query = state["query"] | |
| chunks = state.get("context_chunks", []) | |
| intent = state.get("intent_data", {}) | |
| #decido di eseguire il reranking sempre se abbiamo più di 5 chunk, | |
| #a prescindere dalla rotta, per garantire la qualità. | |
| if len(chunks) <= 5: | |
| return {"context_chunks": chunks} | |
| print(f"DEBUG Reranker: Analizzo {len(chunks)} chunk...") | |
| #eseguo il reranking tramite il modello BGE-Reranker-v2-m3 | |
| refined_chunks = reranker_model.rerank(query, chunks, top_n=5) | |
| print(f"DEBUG Reranker: Ho selezionato i {len(refined_chunks)} migliori.") | |
| return {"context_chunks": refined_chunks} | |
| #Nodo finale: uso Llama per la risposta | |
| def node_generator(state: AgentState): | |
| chunks = state.get('context_chunks', []) | |
| print(f"DEBUG - Numero di chunk passati al generatore: {len(chunks)}") | |
| context = "\n\n".join(chunks) | |
| if not context.strip(): | |
| print("DEBUG - ATTENZIONE: Il contesto finale per l'LLM è vuoto!!!") | |
| #ho deciso di determinare l'approccio in base ai tag presenti nei chunk reali | |
| has_vector = any("[Vector Match]" in c or "[Fallback Match]" in c for c in chunks) | |
| has_entity = any("[Entity Match]" in c for c in chunks) | |
| if has_vector and has_entity: | |
| approach = "Hybrid" | |
| elif has_vector: | |
| approach = "Vector Match" | |
| elif has_entity: | |
| approach = "Entity Match" | |
| else: | |
| approach = "Non applicabile" | |
| prompt = f""" | |
| ### ROLE | |
| Sei un assistente virtuale altamente specializzato nell’analisi di documenti medici e tecnico-scientifici. | |
| Il tuo compito è generare risposte **accurate, verificabili ed evidence-based**, evitando qualsiasi forma di inferenza non supportata dalle fonti fornite. | |
| --- | |
| ### CONTESTO FORNITO (FONTI VERIFICATE) | |
| Di seguito sono riportati uno o più frammenti di documenti (chunk) estratti dal database. | |
| Ogni frammento è preceduto dall’etichetta del metodo di recupero: | |
| - [Vector Match]: recupero per similarità semantica | |
| - [Entity Match]: recupero basato su entità esplicite | |
| Usa **solo ed esclusivamente** le informazioni contenute in questi frammenti. | |
| --------------------- | |
| {context} | |
| --------------------- | |
| ### ISTRUZIONI DI CITAZIONE OBBLIGATORIE | |
| 1. Se utilizzi informazioni provenienti da un file diverso da '{state['filename']}', | |
| devi indicare esplicitamente a fine risposta da quale file e sezione hai tratto l'integrazione. | |
| 2. Usa il formato: "Fonti esterne utilizzate: [Nome File] (Sezione)". | |
| --- | |
| ### ISTRUZIONI DI GENERAZIONE (OBBLIGATORIE) | |
| 1. **Aderenza totale alle fonti** | |
| - Rispondi alla domanda utilizzando **unicamente** le informazioni presenti nel contesto fornito. | |
| - Non introdurre conoscenze esterne, linee guida generali o interpretazioni personali. | |
| 2. **Gestione dell’informazione insufficiente** | |
| - Se il contesto non contiene dati sufficienti, completi o direttamente pertinenti per rispondere alla domanda, | |
| devi rispondere **esattamente** con la seguente frase (senza aggiunte): | |
| > "Mi dispiace, ma il documento fornito non contiene informazioni sufficienti per rispondere a questa domanda." | |
| 3. **Divieto assoluto di allucinazioni** | |
| - Non dedurre, stimare, generalizzare o “completare” informazioni mancanti. | |
| - Se un dato non è esplicitamente presente nei chunk, consideralo **inesistente**. | |
| 4. **Stile e chiarezza** | |
| - Usa un linguaggio: | |
| - tecnico ma chiaro | |
| - neutro e professionale | |
| - privo di opinioni o giudizi | |
| - Struttura la risposta in modo leggibile: | |
| - testo discorsivo breve | |
| - elenchi puntati solo se migliorano la chiarezza | |
| - Evita ridondanze e frasi speculative. | |
| --- | |
| ### STRUTTURA OBBLIGATORIA DELLA RISPOSTA | |
| - Inizia DIRETTAMENTE con la risposta. | |
| - Inserisci **esattamente** un separatore orizzontale: | |
| --- | |
| - Dopo il separatore, scrivi **su una nuova riga**: | |
| **Approccio di recupero:** {{approccio_utilizzato}} | |
| Dove `approccio_utilizzato` deve essere uno tra: | |
| - Vector Match | |
| - Entity Match | |
| - Hybrid (se entrambi sono stati utilizzati nel contesto) | |
| --- | |
| ### DOMANDA DELL’UTENTE | |
| "{state['query']}" | |
| """ | |
| completion = groq_client.chat.completions.create( | |
| model="llama-3.1-8b-instant", | |
| messages=[ | |
| {"role": "system", "content": "Sei un sintetizzatore di documenti PDF. Rispondi in lingua italiana. Se ti viene posta qualsiasi altra domanda o " | |
| "istruzione fuori dal tuo scopo di sintetizzatore di documenti PDF, rispondi che non puoi rispondere in quanto la domanda non è pertinente"}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| temperature=0.3 | |
| ) | |
| # Pulizia e aggiunta dinamica del footer se non generato correttamente | |
| answer = completion.choices[0].message.content | |
| if "Approccio di recupero:" not in answer: | |
| answer += f"\n\n---\n**Approccio di recupero:** {approach}" | |
| return {"final_answer": answer} |