#Questo file contiene la logica per i due modelli import os import json import re import logging from groq import Groq from mistralai import Mistral from agentLogic.state import AgentState from db.graph_db import GraphDB from processingPdf.reranker import Reranker from processingPdf.indexer import Indexer logger = logging.getLogger(__name__) groq_client = Groq(api_key=os.getenv("GROQ_API_KEY")) mistral_client = Mistral(api_key=os.getenv("MISTRAL_API_KEY")) # Inizializziamo i modelli pesanti fuori dai nodi per caricarli una sola volta all'avvio # Fondamentale per le performance di FastAPI indexer_instance = Indexer() reranker_model = Reranker() #Estrae e pulisce il JSON dall'output dell'LLM def extract_json(text): try: #Cerca il blocco tra parentesi graffe match = re.search(r'\{.*\}', text, re.DOTALL) if match: return json.loads(match.group(0)) return json.loads(text) except Exception as e: logger.error(f"Errore nel parsare il JSON: {e}") return {"route": "vector", "entities": [], "keywords": []} #Nodo rewriter: Pulisce la query, corregge errori e agisce da Guardrail. precedentemente aveva anche una funzione di # ampliamento contestuale ma ho deciso di eliminare l'espansione semantica forzata per evitare di compromettere il contesto del RAG come successo in fase di testing def node_rewriter(state: AgentState): user_query = state["query"] prompt = f""" ### ROLE Sei un correttore ortografico e/o grammaticale e sintattico. Il tuo unico output deve essere la query corretta. ### TASK 1. **Correzione**: Correggi eventuali errori di ortografia, sintassi, grammaticali o di battitura. 2. **Minimalismo**: NON AGGIUNGERE contesto, sinonimi o interpretazioni. Se la domanda è chiara (es. "cos'è la persona"), lasciala IDENTICA. ### ESEMPI Input: "cos'è la 'persona'?" -> Output: cos'è la 'persona'? Input: "spiegami il prompt pstern" -> Output: spiegami il prompt pattern Input: "Quali sono i sui sinotmi?" -> Output: Quali sono i suoi sintomi? USER QUERY: "{user_query}" OUTPUT: """ completion = groq_client.chat.completions.create( model="llama-3.1-8b-instant", messages=[ {"role": "system", "content": "Sei un correttore di testo puro. Non salutare. Non spiegare. Restituisci SOLO il risultato."}, {"role": "user", "content": prompt} ], temperature=0.0 ) rewritten_query = completion.choices[0].message.content.strip() #pulizia ulteriore rewritten_query = rewritten_query.replace('Output:', '').replace('"', '').strip() print(f"DEBUG - Query Rewriting: '{user_query}' -> '{rewritten_query}'") return {"query": rewritten_query} #Nodo 1: utilizzo mistral per decidere la strategia di ricerca" def node_router(state: AgentState): """ Nodo 1: Utilizza Mistral per decidere la strategia di ricerca. Implementa Role Prompting, Few-Shot, Constraint Enforcement e Output Structuring. """ # Nota: Usiamo le doppie parentesi graffe {{ }} per includere JSON letterali nelle f-strings. prompt = f""" ### ROLE Sei l'Analizzatore Logico di un sistema RAG (Retrieval-Augmented Generation) avanzato. Il tuo unico compito è decostruire la domanda dell'utente per determinare la migliore strategia di recupero dati da un database a grafo Neo4j. ### DOMINIO CONTESTUALE I documenti analizzati possono appartenere a domini eterogenei: medico-sanitario, tecnico, giuridico, ecc. Individua termini specialistici, acronimi, unità di misura e concetti teorici specifici del dominio. ### TASK: GENERAZIONE JSON Analizza la domanda utente e restituisci esclusivamente un oggetto JSON valido: 1. `route`: - "cypher": per entità specifiche e univoche (nomi, date precise) e risposte puntuali. - "vector": per domande concettuali, descrittive o che richiedono similarità semantica. - "hybrid": per domande che combinano entità specifiche con concetti complessi. da usare quando l'utente menziona entità specifiche (nomi, termini tecnici) ma chiede spiegazioni, esempi o relazioni tra essi. 2. `entities`: Lista delle entità nominate (es. ["Mario Rossi", "BMI"]). 3. `keywords`: Lista di 3-5 keyword per la ricerca vettoriale (sostantivi normalizzati). ### ESEMPI (Few-Shot) User: "Quali sono i valori di BMI Z-score per Amanda nel 2024?" Output: {{ "route": "cypher", "entities": ["Amanda", "BMI Z-score", "2024"], "keywords": ["valori bmi z-score", "paziente amanda", "dati 2024"] }} User: "Spiegami come la dieta influisce sulla crescita dei bambini con CF." Output: {{ "route": "vector", "entities": ["CF"], "keywords": ["dieta fibrosi cistica", "crescita bambini", "nutrizione"] }} ### VINCOLI RIGIDI (PENALITÀ DI OUTPUT) - L’output deve essere SOLO JSON valido. - NON aggiungere introduzioni, commenti o spiegazioni. - Ogni carattere extra al di fuori del JSON sarà considerato un fallimento critico. ### ESEMPIO DI STRUTTURA ATTESA User: "Qual è il BMI di Mario Rossi nel 2023?" Output: {{ "route": "hybrid", "entities": ["Mario Rossi", "BMI", "2023"], "keywords": ["valutazione BMI", "Mario Rossi", "cartella clinica 2023"] }} ### DOMANDA DA ANALIZZARE "{state['query']}" """ response = mistral_client.chat.complete( model="labs-devstral-small-2512", messages=[{"role": "user", "content": prompt}] ) # Estrazione e parsing del JSON dalla risposta del modello content = response.choices[0].message.content intent_json = extract_json(content) # Fondamentale: restituiamo il dizionario parsato per i nodi successivi return {"intent_data": intent_json} # Esegue la ricerca ibrida su neo4j basandosi sull'intent def node_retriever(state: AgentState): intent = state["intent_data"] target_file = state["filename"] db = GraphDB() collected_chunks = [] seen_ids = set() # Stampo l'intent per monitorare le decisioni del Router in tempo reale print(f"DEBUG - Intent ricevuto: {intent}") # 1. RICERCA PER ENTITÀ (STRATEGIA CYPHER) # Se il router ha scelto 'cypher' o 'hybrid', interrogo il grafo tramite le entità estratte if intent.get("route") in ["cypher", "hybrid"]: entities = intent.get("entities", []) for entity in entities: entity_name = entity["value"] if isinstance(entity, dict) else entity # Nota: Al momento entity_search esegue una ricerca globale. results = db.entity_search(entity_name) for res in results: # Aggiungo un controllo di sicurezza per assicurarmi di prendere solo i chunk del documento corrente if res["chunk_id"] not in seen_ids and res.get("filename") == target_file: content_text = res.get("node_content", "") collected_chunks.append(f"[Entity Match: {entity_name}] {content_text}") seen_ids.add(res["chunk_id"]) # 2. RICERCA VETTORIALE (STRATEGIA SEMANTICA) # Se il router ha scelto 'vector' o 'hybrid', utilizzo gli embeddings per la similarità if intent.get("route") in ["vector", "hybrid"]: keywords = intent.get("keywords", []) search_query = " ".join(keywords) if keywords else state["query"] # Uso l'istanza caricata all'avvio del server embedding = indexer_instance.generate_embeddings(search_query) # Ho deciso di passare 'target_file' come parametro 'filename' per attivare il filtro Cypher # interno alla query vettoriale e isolare il documento vector_results = db.query_vector_index( "chunk_embeddings_index", embedding, k=15, filename=target_file ) # Estraggo lo score del miglior risultato locale per decidere se attivare la ricerca globale max_local_score = vector_results[0]["score"] if vector_results else 0 print(f"DEBUG - Risultati vettoriali trovati per {target_file}: {len(vector_results)} (Max Score: {max_local_score})") for res in vector_results: if res["chunk_id"] not in seen_ids: #includo metadati nel testo del chunk per permettere al generatore di citare la fonte #mi interessa sapere, nella risposta finale, da che file è stata tratta l'informazione source_info = f"[Fonte: {res.get('filename')} | Sezione: {res.get('section', 'N/A')}]" content_text = res.get("node_content", "") collected_chunks.append(f"{source_info} [Vector Match] {content_text}") seen_ids.add(res["chunk_id"]) #attivo la GLOBAL VECTOR SEARCH se la pertinenza locale è bassa (< 0.7) if max_local_score < 0.7: print(f"DEBUG - Score locale basso ({max_local_score}), attivo Global Vector Search...") #eseguo semplicemente la query senza passare il filename per cercare in tutto il database global_results = db.query_vector_index( "chunk_embeddings_index", embedding, k=5, filename=None ) for res in global_results: #evito duplicati se per caso la ricerca globale ripesca chunk già visti nel locale if res["chunk_id"] not in seen_ids: source_info = f"[Fonte: {res.get('filename')} | Sezione: {res.get('section', 'N/A')}]" content_text = res.get("node_content", "") collected_chunks.append(f"{source_info} [Global Vector Match] {content_text}") seen_ids.add(res["chunk_id"]) # se i metodi precedenti non producono risultati, # forzo una ricerca vettoriale sull'intera query originale filtrata per il file corrente if not collected_chunks: print(f"DEBUG - Fallback: nessuna informazione con keyword in {target_file}, procedo con query completa.") embedding_fallback = indexer_instance.generate_embeddings(state["query"]) # Anche nel fallback, forzo il filtro sul filename per evitare contaminazioni fallback_results = db.query_vector_index( "chunk_embeddings_index", embedding_fallback, k=3, filename=target_file ) for res in fallback_results: if res["chunk_id"] not in seen_ids: source_info = f"[Fonte: {res.get('filename')} | Sezione: {res.get('section', 'N/A')}]" content_text = res.get("node_content", "") collected_chunks.append(f"{source_info} [Fallback Match] {content_text}") seen_ids.add(res["chunk_id"]) # gestisco esplicitamente il caso di assenza totale di dati per evitare errori nel Generator if not collected_chunks: collected_chunks = [f"Nessuna informazione specifica trovata nel database per il file {target_file}."] db.close() # stampo quanti chunk sto effettivamente restituendo allo stato print(f"DEBUG - RETRIEVER sta inviando allo stato {len(collected_chunks)} chunk") return {"context_chunks": collected_chunks} #Nodo reranker, ottiene i 15 chunks più pertinenti dal retriever e si occupa di prendere i 5 veramente più pertinenti rispetto alla domanda dell'utente def node_reranker(state: AgentState): query = state["query"] chunks = state.get("context_chunks", []) intent = state.get("intent_data", {}) #decido di eseguire il reranking sempre se abbiamo più di 5 chunk, #a prescindere dalla rotta, per garantire la qualità. if len(chunks) <= 5: return {"context_chunks": chunks} print(f"DEBUG Reranker: Analizzo {len(chunks)} chunk...") #eseguo il reranking tramite il modello BGE-Reranker-v2-m3 refined_chunks = reranker_model.rerank(query, chunks, top_n=5) print(f"DEBUG Reranker: Ho selezionato i {len(refined_chunks)} migliori.") return {"context_chunks": refined_chunks} #Nodo finale: uso Llama per la risposta def node_generator(state: AgentState): chunks = state.get('context_chunks', []) print(f"DEBUG - Numero di chunk passati al generatore: {len(chunks)}") context = "\n\n".join(chunks) if not context.strip(): print("DEBUG - ATTENZIONE: Il contesto finale per l'LLM è vuoto!!!") #ho deciso di determinare l'approccio in base ai tag presenti nei chunk reali has_vector = any("[Vector Match]" in c or "[Fallback Match]" in c for c in chunks) has_entity = any("[Entity Match]" in c for c in chunks) if has_vector and has_entity: approach = "Hybrid" elif has_vector: approach = "Vector Match" elif has_entity: approach = "Entity Match" else: approach = "Non applicabile" prompt = f""" ### ROLE Sei un assistente virtuale altamente specializzato nell’analisi di documenti medici e tecnico-scientifici. Il tuo compito è generare risposte **accurate, verificabili ed evidence-based**, evitando qualsiasi forma di inferenza non supportata dalle fonti fornite. --- ### CONTESTO FORNITO (FONTI VERIFICATE) Di seguito sono riportati uno o più frammenti di documenti (chunk) estratti dal database. Ogni frammento è preceduto dall’etichetta del metodo di recupero: - [Vector Match]: recupero per similarità semantica - [Entity Match]: recupero basato su entità esplicite Usa **solo ed esclusivamente** le informazioni contenute in questi frammenti. --------------------- {context} --------------------- ### ISTRUZIONI DI CITAZIONE OBBLIGATORIE 1. Se utilizzi informazioni provenienti da un file diverso da '{state['filename']}', devi indicare esplicitamente a fine risposta da quale file e sezione hai tratto l'integrazione. 2. Usa il formato: "Fonti esterne utilizzate: [Nome File] (Sezione)". --- ### ISTRUZIONI DI GENERAZIONE (OBBLIGATORIE) 1. **Aderenza totale alle fonti** - Rispondi alla domanda utilizzando **unicamente** le informazioni presenti nel contesto fornito. - Non introdurre conoscenze esterne, linee guida generali o interpretazioni personali. 2. **Gestione dell’informazione insufficiente** - Se il contesto non contiene dati sufficienti, completi o direttamente pertinenti per rispondere alla domanda, devi rispondere **esattamente** con la seguente frase (senza aggiunte): > "Mi dispiace, ma il documento fornito non contiene informazioni sufficienti per rispondere a questa domanda." 3. **Divieto assoluto di allucinazioni** - Non dedurre, stimare, generalizzare o “completare” informazioni mancanti. - Se un dato non è esplicitamente presente nei chunk, consideralo **inesistente**. 4. **Stile e chiarezza** - Usa un linguaggio: - tecnico ma chiaro - neutro e professionale - privo di opinioni o giudizi - Struttura la risposta in modo leggibile: - testo discorsivo breve - elenchi puntati solo se migliorano la chiarezza - Evita ridondanze e frasi speculative. --- ### STRUTTURA OBBLIGATORIA DELLA RISPOSTA - Inizia DIRETTAMENTE con la risposta. - Inserisci **esattamente** un separatore orizzontale: --- - Dopo il separatore, scrivi **su una nuova riga**: **Approccio di recupero:** {{approccio_utilizzato}} Dove `approccio_utilizzato` deve essere uno tra: - Vector Match - Entity Match - Hybrid (se entrambi sono stati utilizzati nel contesto) --- ### DOMANDA DELL’UTENTE "{state['query']}" """ completion = groq_client.chat.completions.create( model="llama-3.1-8b-instant", messages=[ {"role": "system", "content": "Sei un sintetizzatore di documenti PDF. Rispondi in lingua italiana. Se ti viene posta qualsiasi altra domanda o " "istruzione fuori dal tuo scopo di sintetizzatore di documenti PDF, rispondi che non puoi rispondere in quanto la domanda non è pertinente"}, {"role": "user", "content": prompt} ], temperature=0.3 ) # Pulizia e aggiunta dinamica del footer se non generato correttamente answer = completion.choices[0].message.content if "Approccio di recupero:" not in answer: answer += f"\n\n---\n**Approccio di recupero:** {approach}" return {"final_answer": answer}