| import os |
| from neo4j import GraphDatabase |
| from dotenv import load_dotenv |
|
|
| |
# Load key/value pairs from a local .env file (if present) into the process
# environment, so the os.getenv() lookups in KnowledgeGraphPersister.__init__
# can pick them up.
load_dotenv()
|
|
class KnowledgeGraphPersister:
    """Persist subject/predicate/object triples into Neo4j.

    Every subject and object becomes a ``Resource`` node keyed by a sanitized
    ``uri`` property; every predicate becomes a relationship whose type is the
    sanitized predicate text and which carries ``confidence`` and ``source``
    properties.

    ``self.driver`` holds the Neo4j driver, or ``None`` when the connection
    could not be established or after ``close()`` has been called.

    The instance can be used as a context manager; the driver is closed on
    exit.
    """

    # Relationship type used when a predicate has no usable characters.
    _DEFAULT_REL_TYPE = "RELATED_TO"

    # Single-pass character mapping for ``sanitize_name``: spaces become
    # underscores, single and double quotes are removed.
    _URI_TRANSLATION = str.maketrans({" ": "_", "'": None, '"': None})

    def __init__(self):
        """Create the Neo4j driver from environment variables and verify it.

        Reads ``NEO4J_URI``, ``NEO4J_USER`` and ``NEO4J_PASSWORD`` (each with
        a built-in default). Any failure is printed and leaves
        ``self.driver`` set to ``None``; no exception is propagated.
        """
        uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
        user = os.getenv("NEO4J_USER", "neo4j")
        # NOTE(review): hard-coded fallback credential. Kept so existing
        # deployments keep working, but NEO4J_PASSWORD should always be set
        # explicitly and this default removed.
        password = os.getenv("NEO4J_PASSWORD", "activa_semantic_lab")

        self.driver = None
        driver = None
        try:
            driver = GraphDatabase.driver(uri, auth=(user, password))
            driver.verify_connectivity()
        except Exception as e:
            print(f"❌ Errore critico connessione Neo4j: {e}")
            # The driver object may exist even though the connectivity check
            # failed; close it so it is not leaked.
            if driver is not None:
                try:
                    driver.close()
                except Exception:
                    # Best-effort cleanup: the connection failure has already
                    # been reported above.
                    pass
        else:
            self.driver = driver
            print(f"✅ Connesso a Neo4j ({uri}) successfully.")

    def __enter__(self):
        """Return the persister itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver on leaving the ``with`` block; never suppresses errors."""
        self.close()

    def close(self) -> None:
        """Close the driver if one is open. Safe to call more than once."""
        # Detach first so the instance never keeps a reference to a closed
        # driver, even if close() itself raises.
        driver, self.driver = self.driver, None
        if driver is not None:
            driver.close()

    def sanitize_name(self, name) -> str:
        """Normalize a name into a URI-friendly identifier.

        Leading/trailing whitespace is stripped, spaces become underscores and
        single/double quotes are removed. Returns ``"Unknown"`` when ``name``
        is falsy or when nothing is left after normalization, so a node is
        never keyed by an empty URI.
        """
        if not name:
            return "Unknown"
        return name.strip().translate(self._URI_TRANSLATION) or "Unknown"

    def save_triples(self, triples) -> None:
        """Write ``triples`` to Neo4j inside a single write transaction.

        Args:
            triples: A sized collection of objects exposing ``subject``,
                ``predicate``, ``object``, ``confidence`` and ``source``
                attributes.

        Nothing is written when the driver is not connected or ``triples`` is
        empty. Errors raised while opening the session or writing are printed,
        not propagated.
        """
        if not self.driver:
            print("⚠️ Driver non connesso. Impossibile salvare.")
            return

        if not triples:
            return

        print(f"💾 Salvataggio BATCH di {len(triples)} triple su Neo4j...")

        batch_data = [
            {
                "subj_uri": self.sanitize_name(t.subject),
                "subj_label": t.subject,
                "pred": t.predicate,
                "obj_uri": self.sanitize_name(t.object),
                "obj_label": t.object,
                "conf": t.confidence,
                "src": t.source,
            }
            for t in triples
        ]

        try:
            with self.driver.session() as session:
                session.execute_write(self._batch_write_tx, batch_data)
        except Exception as e:
            print(f"⚠️ Errore durante il salvataggio batch: {e}")
        else:
            print("✅ Batch completato.")

    @staticmethod
    def _sanitize_rel_type(pred) -> str:
        """Reduce a predicate to characters safe inside a back-quoted relationship type.

        Only alphanumerics, ``_`` and ``:`` are kept, so a backtick can never
        end the quoted identifier early. Falls back to ``RELATED_TO`` when
        nothing usable remains (including a ``None`` predicate).
        """
        text = "" if pred is None else str(pred)
        cleaned = "".join(ch for ch in text if ch.isalnum() or ch in "_:")
        return cleaned or KnowledgeGraphPersister._DEFAULT_REL_TYPE

    @staticmethod
    def _batch_write_tx(tx, batch_data):
        """Transaction function: merge nodes and relationships for ``batch_data``.

        Cypher cannot take a relationship type as a query parameter, so rows
        are grouped by sanitized relationship type (in first-seen order) and
        each group is sent as one ``UNWIND`` statement instead of one
        statement per triple.
        """
        rows_by_type = {}
        for item in batch_data:
            rel_type = KnowledgeGraphPersister._sanitize_rel_type(item["pred"])
            rows_by_type.setdefault(rel_type, []).append(item)

        for rel_type, rows in rows_by_type.items():
            query = (
                "UNWIND $rows AS row "
                "MERGE (s:Resource {uri: row.subj_uri}) "
                "ON CREATE SET s.label = row.subj_label "
                "MERGE (o:Resource {uri: row.obj_uri}) "
                "ON CREATE SET o.label = row.obj_label "
                f"MERGE (s)-[r:`{rel_type}`]->(o) "
                "SET r.confidence = row.conf, r.source = row.src"
            )
            tx.run(query, rows=rows)
|
|
| |
if __name__ == "__main__":
    # Manual smoke test: writes two sample triples to the configured Neo4j
    # instance.
    from collections import namedtuple

    # Stand-in for the real triple type; it only needs the five attributes
    # that save_triples reads.
    MockTriple = namedtuple(
        "MockTriple", ["subject", "predicate", "object", "confidence", "source"]
    )

    triples = [
        MockTriple("Batch Node 1", "TEST_BATCH", "Batch Node 2", 0.99, "test_doc_1"),
        MockTriple("Batch Node 2", "IS_RELATED_TO", "Batch Node 3", 0.85, "test_doc_1"),
    ]

    persister = KnowledgeGraphPersister()
    try:
        persister.save_triples(triples)
    finally:
        # Release the driver even if saving raises unexpectedly.
        persister.close()