import os from collections import defaultdict from neo4j import GraphDatabase from dotenv import load_dotenv # Carica variabili d'ambiente load_dotenv() class KnowledgeGraphPersister: def __init__(self): """ Inizializza il driver Neo4j e crea i vincoli necessari per le performance. """ uri = os.getenv("NEO4J_URI", "neo4j+s://748d6c94.databases.neo4j.io") user = os.getenv("NEO4J_USER", "neo4j") password = os.getenv("NEO4J_PASSWORD", "t1bT1DiXwDOGMYfX89qR20loSN8FXurB3Dfg8bPQcTI") try: self.driver = GraphDatabase.driver(uri, auth=(user, password)) self.driver.verify_connectivity() print(f"✅ Connesso a Neo4j ({uri}).") # Creazione indici all'avvio (Fondamentale per la velocità dei MERGE) self._create_constraints() except Exception as e: print(f"❌ Errore critico connessione Neo4j: {e}") self.driver = None def close(self): if self.driver: self.driver.close() def _create_constraints(self): """ Crea un vincolo di unicità sulla proprietà URI. Senza questo, MERGE diventa lentissimo (Full Table Scan). """ if not self.driver: return query = "CREATE CONSTRAINT resource_uri_unique IF NOT EXISTS FOR (n:Resource) REQUIRE n.uri IS UNIQUE" with self.driver.session() as session: try: session.run(query) print("⚡ Vincoli/Indici Neo4j verificati.") except Exception as e: # Spesso fallisce se l'utente non ha permessi admin o se esiste già con nome diverso print(f"⚠️ Warning creazione indici: {e}") def sanitize_name(self, name): """ Canonicalization base. """ if not name: return "Unknown" # Rimuove spazi extra e normalizza. return name.strip().replace(" ", "_").replace("'", "").replace('"', "") def sanitize_predicate(self, pred): """ Pulisce il predicato per evitare Cypher Injection. FIX: Gestisce meglio i separatori (:, -, spazio) sostituendoli con underscore per evitare predicati illeggibili come XCHEHASOBJECT. Es. xche:has_object -> XCHE_HAS_OBJECT """ if not pred: return "RELATED_TO" # 1. Normalizzazione preliminare dei separatori comuni # Sostituisce i due punti dei namespace e trattini con underscore pred = pred.replace(":", "_").replace("-", "_").replace(" ", "_") # 2. Rimozione caratteri non sicuri (mantiene solo alfanumerici e underscore) clean = "".join(x for x in pred if x.isalnum() or x == "_") # 3. Conversione in uppercase (convenzione Neo4j per Relationships) return clean.upper() if clean else "RELATED_TO" def save_triples(self, triples): """ Salva le triple usando VERO Batching (UNWIND). Raggruppa le triple per predicato per aggirare il limite di parametrizzazione delle relazioni. """ if not self.driver or not triples: return print(f"💾 Preparazione Batch di {len(triples)} triple...") # 1. Raggruppamento per Predicato batched_by_pred = defaultdict(list) for t in triples: safe_pred = self.sanitize_predicate(t.predicate) item = { "subj_uri": self.sanitize_name(t.subject), "subj_label": t.subject, "obj_uri": self.sanitize_name(t.object), "obj_label": t.object, "conf": float(t.confidence), "src": t.source or "unknown" } batched_by_pred[safe_pred].append(item) # 2. Esecuzione Transazioni (Una per tipo di relazione) with self.driver.session() as session: for pred, data_list in batched_by_pred.items(): try: session.execute_write(self._unwind_write_tx, pred, data_list) print(f" -> Inserite {len(data_list)} relazioni :{pred}") except Exception as e: print(f"⚠️ Errore batch per relazione :{pred} -> {e}") print("✅ Salvataggio completato.") @staticmethod def _unwind_write_tx(tx, predicate, batch_data): """ Usa UNWIND per inserire migliaia di righe in un colpo solo. """ query = ( f"UNWIND $batch AS row " f"MERGE (s:Resource {{uri: row.subj_uri}}) " f"ON CREATE SET s.label = row.subj_label " f"MERGE (o:Resource {{uri: row.obj_uri}}) " f"ON CREATE SET o.label = row.obj_label " f"MERGE (s)-[r:`{predicate}`]->(o) " f"SET r.confidence = row.conf, " f" r.source = row.src, " f" r.last_updated = datetime()" ) tx.run(query, batch=batch_data)