import os from neo4j import GraphDatabase from dotenv import load_dotenv # Carica variabili d'ambiente load_dotenv() class KnowledgeGraphPersister: def __init__(self): """ Inizializza il driver Neo4j usando le variabili d'ambiente per sicurezza. """ uri = os.getenv("NEO4J_URI", "") user = os.getenv("NEO4J_USER", "neo4j") password = os.getenv("NEO4J_PASSWORD", "") try: self.driver = GraphDatabase.driver(uri, auth=(user, password)) self.driver.verify_connectivity() print(f"✅ Connesso a Neo4j ({uri}).") # Creazione indici all'avvio (Fondamentale per la velocità dei MERGE) self._create_constraints() except Exception as e: print(f"❌ Errore critico connessione Neo4j: {e}") self.driver = None def close(self): if self.driver: self.driver.close() def _create_constraints(self): """ Crea un vincolo di unicità sulla proprietà URI. Senza questo, MERGE diventa lentissimo (Full Table Scan). """ if not self.driver: return query = "CREATE CONSTRAINT resource_uri_unique IF NOT EXISTS FOR (n:Resource) REQUIRE n.uri IS UNIQUE" with self.driver.session() as session: try: session.run(query) print("⚡ Vincoli/Indici Neo4j verificati.") except Exception as e: print(f"⚠️ Warning creazione indici: {e}") def sanitize_name(self, name): """ Canonicalization base. """ if not name: return "Unknown" # Rimuove spazi extra e normalizza. # Nota: In produzione usare slugify o urllib.parse.quote per URI robusti return name.strip().replace(" ", "_").replace("'", "").replace('"', "") def sanitize_predicate(self, pred): """ Pulisce il predicato per evitare Cypher Injection, dato che non può essere parametrizzato. """ # Accetta solo caratteri alfanumerici e underscore. Upper case per convenzione Neo4j. clean = "".join(x for x in pred if x.isalnum() or x == "_") return clean.upper() if clean else "RELATED_TO" def save_triples(self, triples): """ Salva le triple usando VERO Batching (UNWIND). Raggruppa le triple per predicato per aggirare il limite di parametrizzazione delle relazioni. """ if not self.driver or not triples: return print(f"💾 Preparazione Batch di {len(triples)} triple...") # 1. Raggruppamento per Predicato # Struttura: { "LOCATED_IN": [ {subj:..., obj:..., ...}, ... ], "HAS_TYPE": [...] } batched_by_pred = defaultdict(list) for t in triples: safe_pred = self.sanitize_predicate(t.predicate) item = { "subj_uri": self.sanitize_name(t.subject), "subj_label": t.subject, "obj_uri": self.sanitize_name(t.object), "obj_label": t.object, "conf": float(t.confidence), # Assicura float nativo "src": t.source or "unknown" } batched_by_pred[safe_pred].append(item) # 2. Esecuzione Transazioni (Una per tipo di relazione) with self.driver.session() as session: for pred, data_list in batched_by_pred.items(): try: session.execute_write(self._unwind_write_tx, pred, data_list) print(f" -> Inserite {len(data_list)} relazioni :{pred}") except Exception as e: print(f"⚠️ Errore batch per relazione :{pred} -> {e}") print("✅ Salvataggio completato.") @staticmethod def _unwind_write_tx(tx, predicate, batch_data): """ Usa UNWIND per inserire migliaia di righe in un colpo solo. Molto più performante su rete cloud. """ # La query è dinamica SOLO sul tipo di relazione (sanitizzato prima), # tutto il resto passa come parametro lista ($batch). query = ( f"UNWIND $batch AS row " f"MERGE (s:Resource {{uri: row.subj_uri}}) " f"ON CREATE SET s.label = row.subj_label " f"MERGE (o:Resource {{uri: row.obj_uri}}) " f"ON CREATE SET o.label = row.obj_label " f"MERGE (s)-[r:`{predicate}`]->(o) " f"SET r.confidence = row.conf, " f" r.source = row.src, " f" r.last_updated = datetime()" ) tx.run(query, batch=batch_data)