import os
from collections import defaultdict

from dotenv import load_dotenv
from neo4j import GraphDatabase
|
|
| |
# Runs at import time, before any KnowledgeGraphPersister is built, so the
# NEO4J_* settings read with os.getenv() in __init__ can presumably be
# supplied through a local .env file (python-dotenv) as well as the real
# process environment.
load_dotenv()
|
|
class KnowledgeGraphPersister:
    """Persist (subject, predicate, object) triples into Neo4j.

    Every subject/object becomes a ``:Resource`` node keyed by a sanitized
    ``uri`` property; every predicate becomes a relationship type between
    the two nodes.  If the connection cannot be established, ``self.driver``
    is ``None`` and the write methods silently do nothing.
    """

    # Single-pass equivalent of the chained replaces used for URIs:
    # spaces become underscores, single and double quotes are dropped.
    _NAME_TRANSLATION = str.maketrans({" ": "_", "'": None, '"': None})

    def __init__(self):
        """Open the Neo4j driver using credentials from the environment.

        Reads ``NEO4J_URI``, ``NEO4J_USER`` (default ``"neo4j"``) and
        ``NEO4J_PASSWORD``.  Any failure is reported on stdout and leaves
        ``self.driver`` set to ``None`` instead of raising.
        """
        uri = os.getenv("NEO4J_URI", "")
        user = os.getenv("NEO4J_USER", "neo4j")
        password = os.getenv("NEO4J_PASSWORD", "")

        self.driver = None
        driver = None
        try:
            driver = GraphDatabase.driver(uri, auth=(user, password))
            driver.verify_connectivity()
            print(f"✅ Connesso a Neo4j ({uri}).")

            self.driver = driver
            self._create_constraints()

        except Exception as e:
            print(f"❌ Errore critico connessione Neo4j: {e}")
            self.driver = None
            # The driver object may already exist (e.g. connectivity check
            # failed): close it so its connection pool is not leaked.
            if driver is not None:
                try:
                    driver.close()
                except Exception as close_err:
                    print(f"⚠️ Warning chiusura driver Neo4j: {close_err}")

    def close(self):
        """Close the underlying driver, if one was successfully opened."""
        if self.driver:
            self.driver.close()

    def _create_constraints(self):
        """Ensure a uniqueness constraint exists on ``Resource.uri``.

        The constraint also provides the index that keeps the MERGE
        lookups in ``_unwind_write_tx`` from scanning every node.
        Query failures are reported as a warning and not re-raised.
        """
        if not self.driver:
            return
        query = "CREATE CONSTRAINT resource_uri_unique IF NOT EXISTS FOR (n:Resource) REQUIRE n.uri IS UNIQUE"
        with self.driver.session() as session:
            try:
                # consume() forces the server response to be read here, so
                # an error is reported below instead of being discarded
                # when the session closes.
                session.run(query).consume()
                print("⚡ Vincoli/Indici Neo4j verificati.")
            except Exception as e:
                print(f"⚠️ Warning creazione indici: {e}")

    def sanitize_name(self, name):
        """Return a basic canonical form of *name* usable as a node URI.

        Surrounding whitespace is stripped, inner spaces become underscores
        and single/double quotes are removed.  Falsy input, or input that is
        empty after cleaning, yields ``"Unknown"`` so that no node is ever
        keyed by an empty URI.  Non-string input is converted with ``str``.
        """
        if not name:
            return "Unknown"

        cleaned = str(name).strip().translate(self._NAME_TRANSLATION)
        return cleaned or "Unknown"

    def sanitize_predicate(self, pred):
        """Return *pred* reduced to a safe, upper-case relationship type.

        Relationship types cannot be passed as Cypher parameters, so the
        value is interpolated into the query text; only alphanumeric
        characters and underscores are kept to prevent Cypher injection.
        Falsy input, or input with no usable characters, yields
        ``"RELATED_TO"``.
        """
        if not pred:
            return "RELATED_TO"

        clean = "".join(ch for ch in str(pred) if ch.isalnum() or ch == "_")
        return clean.upper() if clean else "RELATED_TO"

    def save_triples(self, triples):
        """Write *triples* to Neo4j, one UNWIND batch per relationship type.

        Each triple must expose ``subject``, ``predicate``, ``object``,
        ``confidence`` and ``source`` attributes.  Triples are grouped by
        sanitized predicate because the relationship type has to be part of
        the query text.  A failing batch is reported and skipped; the other
        batches are still attempted.

        Does nothing when there is no driver or *triples* is empty.
        """
        if not self.driver or not triples:
            return

        print(f"💾 Preparazione Batch di {len(triples)} triple...")

        batched_by_pred = defaultdict(list)
        for t in triples:
            safe_pred = self.sanitize_predicate(t.predicate)
            batched_by_pred[safe_pred].append(
                {
                    "subj_uri": self.sanitize_name(t.subject),
                    "subj_label": t.subject,
                    "obj_uri": self.sanitize_name(t.object),
                    "obj_label": t.object,
                    "conf": float(t.confidence),
                    "src": t.source or "unknown",
                }
            )

        failed_batches = 0
        with self.driver.session() as session:
            for pred, data_list in batched_by_pred.items():
                try:
                    session.execute_write(self._unwind_write_tx, pred, data_list)
                    print(f" -> Inserite {len(data_list)} relazioni :{pred}")
                except Exception as e:
                    failed_batches += 1
                    print(f"⚠️ Errore batch per relazione :{pred} -> {e}")

        # Only claim success when every batch was actually written.
        if failed_batches:
            print(f"⚠️ Salvataggio completato con {failed_batches} batch falliti.")
        else:
            print("✅ Salvataggio completato.")

    @staticmethod
    def _unwind_write_tx(tx, predicate, batch_data):
        """Transaction function: MERGE all rows of one predicate via UNWIND.

        *predicate* is interpolated into the query between backticks, so the
        caller must pass an already sanitized value (see
        ``sanitize_predicate``).  *batch_data* is a list of dicts with the
        keys ``subj_uri``, ``subj_label``, ``obj_uri``, ``obj_label``,
        ``conf`` and ``src``, sent as the ``$batch`` parameter.
        """
        query = (
            f"UNWIND $batch AS row "
            f"MERGE (s:Resource {{uri: row.subj_uri}}) "
            f"ON CREATE SET s.label = row.subj_label "
            f"MERGE (o:Resource {{uri: row.obj_uri}}) "
            f"ON CREATE SET o.label = row.obj_label "
            f"MERGE (s)-[r:`{predicate}`]->(o) "
            f"SET r.confidence = row.conf, "
            f" r.source = row.src, "
            f" r.last_updated = datetime()"
        )

        tx.run(query, batch=batch_data)