import os from neo4j import GraphDatabase from dotenv import load_dotenv # Carica variabili d'ambiente load_dotenv() class KnowledgeGraphPersister: def __init__(self): """ Inizializza il driver Neo4j usando le variabili d'ambiente per sicurezza. """ uri = os.getenv("NEO4J_URI", "bolt://localhost:7687") user = os.getenv("NEO4J_USER", "neo4j") password = os.getenv("NEO4J_PASSWORD", "activa_semantic_lab") try: self.driver = GraphDatabase.driver(uri, auth=(user, password)) self.driver.verify_connectivity() print(f"✅ Connesso a Neo4j ({uri}) successfully.") except Exception as e: print(f"❌ Errore critico connessione Neo4j: {e}") self.driver = None def close(self): if self.driver: self.driver.close() def sanitize_name(self, name): """ Normalizza i nomi per creare URI coerenti (Canonicalization base). """ if not name: return "Unknown" # Rimuove caratteri speciali e spazi extra, mantiene coerenza maiuscole/minuscole return name.strip().replace(" ", "_").replace("'", "").replace('"', "") def save_triples(self, triples): """ Salva le triple in BATCH (ottimizzazione performance). Usa UNWIND per processare liste di dati in un'unica transazione. """ if not self.driver: print("⚠️ Driver non connesso. Impossibile salvare.") return if not triples: return print(f"💾 Salvataggio BATCH di {len(triples)} triple su Neo4j...") # 1. Prepariamo i dati come lista di dizionari (Payload leggero) batch_data = [] for t in triples: batch_data.append({ "subj_uri": self.sanitize_name(t.subject), "subj_label": t.subject, "pred": t.predicate, # Nota: Il predicato dinamico richiede attenzione in Cypher "obj_uri": self.sanitize_name(t.object), "obj_label": t.object, "conf": t.confidence, "src": t.source }) # 2. Query Batch Ottimizzata # Nota: In Cypher non si può parametrizzare il TIPO di relazione (es. :RELAZIONE). # Per performance pura con relazioni dinamiche, usiamo APOC o un approccio ibrido. # Qui usiamo un approccio sicuro iterando nel driver ma con transazione unica, # oppure raggruppiamo per tipo di relazione. # Approccio Migliore per MVP: Transazione singola with self.driver.session() as session: try: session.execute_write(self._batch_write_tx, batch_data) print("✅ Batch completato.") except Exception as e: print(f"⚠️ Errore durante il salvataggio batch: {e}") @staticmethod def _batch_write_tx(tx, batch_data): """Funzione transazionale interna.""" for item in batch_data: # Usiamo MERGE per evitare duplicati # Usiamo apoc.create.relationship se disponibile per predicati dinamici, # altrimenti usiamo string formatting controllata (safe perché interna). # Sanitizzazione predicato per evitare injection (solo caratteri sicuri) safe_pred = "".join(x for x in item['pred'] if x.isalnum() or x in "_:") if not safe_pred: safe_pred = "RELATED_TO" query = ( f"MERGE (s:Resource {{uri: $subj_uri}}) " f"ON CREATE SET s.label = $subj_label " f"MERGE (o:Resource {{uri: $obj_uri}}) " f"ON CREATE SET o.label = $obj_label " f"MERGE (s)-[r:`{safe_pred}`]->(o) " f"SET r.confidence = $conf, r.source = $src" ) tx.run(query, item) # --- TEST ISOLATO --- if __name__ == "__main__": # Creiamo un mock per testare senza dipendenze esterne from collections import namedtuple MockTriple = namedtuple("MockTriple", ["subject", "predicate", "object", "confidence", "source"]) triples = [ MockTriple("Batch Node 1", "TEST_BATCH", "Batch Node 2", 0.99, "test_doc_1"), MockTriple("Batch Node 2", "IS_RELATED_TO", "Batch Node 3", 0.85, "test_doc_1") ] # Assicurati di avere le variabili d'ambiente o fallback attivi persister = KnowledgeGraphPersister() persister.save_triples(triples) persister.close()