# This module manages the Neo4j connection and all the Cypher queries.
import logging
import os
from typing import Any, Dict, List, Optional

from neo4j import GraphDatabase, exceptions

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


class GraphDB:
    """Thin wrapper around a Neo4j driver with the queries used by the RAG flow.

    The instance owns one driver. It can be used as a context manager so the
    driver is closed automatically::

        with GraphDB() as db:
            db.create_user_node("u1")
    """

    def __init__(
        self,
        uri: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        database: Optional[str] = None,
    ):
        """Open the driver, verify connectivity and ensure indexes exist.

        Each argument falls back to an environment variable when falsy:
        ``NEO4J_URI``, ``NEO4J_USERNAME``, ``NEO4J_PASSWORD`` and
        ``NEO4J_DATABASE``. There are no built-in defaults.

        Raises:
            ValueError: if any of the four settings is still missing.
            Exception: whatever the driver raises while connecting or while
                creating indexes/constraints (logged, then re-raised).
        """
        self.uri = uri or os.getenv("NEO4J_URI")
        self.user = user or os.getenv("NEO4J_USERNAME")
        self.password = password or os.getenv("NEO4J_PASSWORD")
        self.database = database or os.getenv("NEO4J_DATABASE")

        # Validation: every setting is mandatory; report all missing ones at once.
        settings = [
            ("NEO4J_URI", self.uri),
            ("NEO4J_USERNAME", self.user),
            ("NEO4J_PASSWORD", self.password),
            ("NEO4J_DATABASE", self.database),
        ]
        missing_vars = [name for name, val in settings if not val]
        if missing_vars:
            raise ValueError(
                "Credenziali Neo4j mancanti. Assicurati che le seguenti variabili "
                "siano definite nel file .env e caricate correttamente: "
                f"{', '.join(missing_vars)}"
            )

        self.driver = None
        try:
            self.driver = GraphDatabase.driver(self.uri, auth=(self.user, self.password))
            self.driver.verify_connectivity()
            self.create_indexes_and_constraints()
            logger.info(f"Connessione a Neo4j (DB: {self.database}) stabilita con successo.")
        except Exception as e:
            logger.error(f"Errore durante la connessione a Neo4j su {self.uri}: {e}")
            # Do not leak the driver (and its connection pool) when setup fails
            # after the driver object has already been created.
            if self.driver is not None:
                try:
                    self.driver.close()
                except Exception as close_error:
                    logger.warning(f"Errore durante la chiusura del driver Neo4j: {close_error}")
                self.driver = None
            raise

    def __enter__(self) -> "GraphDB":
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the driver when leaving the ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the Neo4j driver, if one is open. Safe to call repeatedly."""
        if self.driver:
            self.driver.close()
            # Drop the reference so later run_query calls fail with the explicit
            # "not initialised" error instead of touching a closed driver.
            self.driver = None
            logger.info("Connessione a Neo4j chiusa.")

    def create_indexes_and_constraints(self) -> None:
        """Create the uniqueness constraints and the chunk-id index if absent.

        Raises:
            neo4j.exceptions.ClientError: for client errors other than the
                known "already exists" ones.
            Exception: any other failure while running a schema statement.
        """
        index_queries = [
            # Uniqueness constraints for the main nodes (documents, users).
            "CREATE CONSTRAINT IF NOT EXISTS FOR (d:Document) REQUIRE d.filename IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (u:User) REQUIRE u.id IS UNIQUE",
            # Index for looking chunks up by id (used when citing a chunk).
            "CREATE INDEX IF NOT EXISTS FOR (c:Chunk) ON (c.chunk_id)",
        ]
        with self.driver.session(database=self.database) as session:
            for query in index_queries:
                try:
                    # consume() forces the statement to complete here, so any
                    # server-side error is raised inside this try block.
                    session.run(query).consume()
                except exceptions.ClientError as e:
                    # `message` may be None; fall back to str(e) so the
                    # substring checks below cannot raise TypeError.
                    message = getattr(e, "message", None) or str(e)
                    # Log the error, but ignore the known "already exists" ones.
                    if "IndexAlreadyExists" not in message and "ConstraintAlreadyExists" not in message:
                        logger.error(f"Errore nell'esecuzione della query indice '{query}': {e}")
                        raise
                except Exception as e:
                    logger.error(f"Errore inatteso nell'esecuzione della query indice '{query}': {e}")
                    raise
        logger.info("Indici e vincoli Neo4j verificati/creati.")

    # --- CRUD operations for the RAG ---

    def create_document_node(self, filename: str, title: Optional[str] = None) -> List[Dict[str, Any]]:
        """Create or update the ``Document`` node identified by ``filename``.

        On creation the title defaults to the filename when ``title`` is None;
        on match only ``last_updated`` is refreshed.

        Returns:
            The rows returned by the query (one row with key ``d``).
        """
        query = """
        MERGE (d:Document {filename: $filename})
        ON CREATE SET d.created_at = datetime(), d.title = COALESCE($title, $filename)
        ON MATCH SET d.last_updated = datetime()
        RETURN d
        """
        return self.run_query(query, {"filename": filename, "title": title})

    def create_user_node(self, user_id: str) -> List[Dict[str, Any]]:
        """Create or update the ``User`` node and record its latest activity.

        Returns:
            The rows returned by the query (one row with key ``u``).
        """
        query = """
        MERGE (u:User {id: $user_id})
        ON CREATE SET u.created_at = datetime(), u.last_activity = datetime()
        ON MATCH SET u.last_activity = datetime()
        RETURN u
        """
        return self.run_query(query, {"user_id": user_id})

    def link_user_to_document(self, user_id: str, filename: str) -> List[Dict[str, Any]]:
        """Create or refresh the ``ACCESSED`` relationship from user to document.

        Both nodes are looked up with ``MATCH``, so nothing is created and an
        empty list is returned when either one does not exist.
        """
        query = """
        MATCH (u:User {id: $user_id})
        MATCH (d:Document {filename: $filename})
        MERGE (u)-[r:ACCESSED]->(d)
        ON CREATE SET r.first_access = datetime(), r.last_access = datetime()
        ON MATCH SET r.last_access = datetime()
        RETURN u, d, r
        """
        return self.run_query(query, {"user_id": user_id, "filename": filename})

    def add_chunk_to_document(
        self,
        filename: str,
        chunk_id: str,
        content: str,
        embedding: List[float],
        metadata: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """Create or update a ``Chunk`` node and attach it to its document.

        Only the ``section`` key of ``metadata`` is stored; it defaults to
        ``"unspecified"`` when absent. The document is looked up with
        ``MATCH``, so no chunk is written if the document does not exist.

        Returns:
            The rows returned by the query (one row with key ``c``).
        """
        query = """
        MATCH (d:Document {filename: $filename})
        MERGE (c:Chunk {chunk_id: $chunk_id})
        SET c.content = $content,
            c.embedding = $embedding,
            c.section = $section,
            c.source = $filename,
            c.last_updated = datetime()
        MERGE (d)-[:HAS_CHUNK]->(c)
        RETURN c
        """
        parameters = {
            "filename": filename,
            "chunk_id": chunk_id,
            "content": content,
            "embedding": embedding,
            "section": metadata.get("section", "unspecified"),
        }
        return self.run_query(query, parameters)

    def create_vector_index(
        self,
        index_name: str,
        node_label: str,
        property_name: str,
        vector_dimensions: int,
    ) -> None:
        """Create a cosine-similarity vector index if it does not exist yet.

        Raises:
            Exception: whatever ``run_query`` raises (logged, then re-raised).
        """
        # NOTE(review): index name, label and property are schema identifiers
        # and are interpolated into the statement text. Only pass trusted,
        # application-defined values here, never user input.
        query = f"""
        CREATE VECTOR INDEX {index_name} IF NOT EXISTS
        FOR (n:{node_label}) ON (n.{property_name})
        OPTIONS {{
            indexConfig: {{
                `vector.dimensions`: {vector_dimensions},
                `vector.similarity_function`: 'cosine'
            }}
        }}
        """
        try:
            self.run_query(query)
            logger.info(f"Indice vettoriale '{index_name}' creato con successo per {node_label}.")
        except Exception as e:
            logger.error(f"Errore nella creazione dell'indice vettoriale '{index_name}': {e}")
            raise

    def query_vector_index(
        self,
        index_name: str,
        query_embedding: List[float],
        k: int = 5,
        filename: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Run a vector similarity search, optionally restricted to one document.

        Returns:
            A list of dicts with keys ``node_content``, ``score``, ``chunk_id``,
            ``section`` and ``filename``. A missing section becomes ``"N/A"``
            and a missing filename ``"Unknown"``. On any error the failure is
            logged and an empty list is returned.
        """
        # The index name is passed as a parameter rather than interpolated, so
        # it can never alter the query text.
        parameters: Dict[str, Any] = {
            "index_name": index_name,
            "query_embedding": query_embedding,
            "k": k,
        }
        if filename:
            # Search restricted to one document.
            # NOTE(review): the filter runs after the index returned its top-k
            # nodes, so fewer than k rows (possibly none) can come back.
            query = """
            CALL db.index.vector.queryNodes($index_name, $k, $query_embedding)
            YIELD node, score
            WITH node, score
            MATCH (d:Document {filename: $filename})-[:HAS_CHUNK]->(node)
            RETURN node.content AS node_content, score, node.chunk_id AS chunk_id,
                   node.section AS section, d.filename AS filename
            """
            parameters["filename"] = filename
        else:
            # Global search across all documents (used for cross-document search).
            query = """
            CALL db.index.vector.queryNodes($index_name, $k, $query_embedding)
            YIELD node, score
            RETURN node.content AS node_content, score, node.chunk_id AS chunk_id,
                   node.section AS section, node.source AS filename
            """

        try:
            records = self.run_query(query, parameters)
            # Every returned column is always present in the row (possibly as
            # None), so a dict.get default would never apply; use `or` instead.
            results = [
                {
                    "node_content": record["node_content"],
                    "score": record["score"],
                    "chunk_id": record["chunk_id"],
                    "section": record.get("section") or "N/A",
                    "filename": record.get("filename") or "Unknown",
                }
                for record in records
            ]
            logger.debug(f"Ricerca vettoriale ha trovato {len(results)} risultati.")
            return results
        except Exception as e:
            logger.error(f"Errore durante la query dell'indice vettoriale: {e}")
            return []

    def add_entity_to_chunk(self, entity_name: str, entity_type: str, chunk_id: str) -> None:
        """Merge an ``Entity`` node and link it to the given chunk.

        The entity is merged before the chunk is matched, so the entity node
        is created even when no chunk with ``chunk_id`` exists.
        """
        query = """
        MERGE (e:Entity {name: $name, type: $type})
        WITH e
        MATCH (c:Chunk {chunk_id: $chunk_id})
        MERGE (c)-[:CONTAINS_ENTITY]->(e)
        """
        params = {"name": entity_name, "type": entity_type, "chunk_id": chunk_id}
        self.run_query(query, params)

    def entity_search(self, entity_name: str) -> List[Dict[str, Any]]:
        """Return up to 5 chunks linked to an entity with this name.

        The name comparison is exact but case-insensitive. Every hit gets a
        fixed score of 1.0. On any error the failure is logged and an empty
        list is returned.
        """
        query = """
        MATCH (e:Entity)
        WHERE toLower(e.name) = toLower($name)
        MATCH (e)<-[:CONTAINS_ENTITY]-(c:Chunk)
        RETURN c.content AS node_content, c.chunk_id AS chunk_id, 1.0 AS score,
               c.section AS section, c.source AS filename
        LIMIT 5
        """
        keys = ("node_content", "chunk_id", "score", "section", "filename")
        try:
            records = self.run_query(query, {"name": entity_name})
            return [{key: record.get(key) for key in keys} for record in records]
        except Exception as e:
            logger.error(f"Errore nella ricerca per entità '{entity_name}': {e}")
            return []

    def run_query(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Run one Cypher statement and return all rows as a list of dicts.

        Raises:
            RuntimeError: if the driver is not initialised (or already closed).
        """
        if not self.driver:
            raise RuntimeError("Driver Neo4j non inizializzato.")
        with self.driver.session(database=self.database) as session:
            result = session.run(query, parameters)
            # Materialise the rows while the session is still open.
            return result.data()