AutomatedSemanticDiscovery / src /graph /graph_loader.py
GaetanoParente's picture
Upload 16 files
a968971 verified
raw
history blame
4.55 kB
import os
from neo4j import GraphDatabase
from dotenv import load_dotenv
# Carica variabili d'ambiente
load_dotenv()
class KnowledgeGraphPersister:
def __init__(self):
"""
Inizializza il driver Neo4j usando le variabili d'ambiente per sicurezza.
"""
uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
user = os.getenv("NEO4J_USER", "neo4j")
password = os.getenv("NEO4J_PASSWORD", "activa_semantic_lab")
try:
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.driver.verify_connectivity()
print(f"✅ Connesso a Neo4j ({uri}) successfully.")
except Exception as e:
print(f"❌ Errore critico connessione Neo4j: {e}")
self.driver = None
def close(self):
if self.driver:
self.driver.close()
def sanitize_name(self, name):
"""
Normalizza i nomi per creare URI coerenti (Canonicalization base).
"""
if not name: return "Unknown"
# Rimuove caratteri speciali e spazi extra, mantiene coerenza maiuscole/minuscole
return name.strip().replace(" ", "_").replace("'", "").replace('"', "")
def save_triples(self, triples):
"""
Salva le triple in BATCH (ottimizzazione performance).
Usa UNWIND per processare liste di dati in un'unica transazione.
"""
if not self.driver:
print("⚠️ Driver non connesso. Impossibile salvare.")
return
if not triples:
return
print(f"💾 Salvataggio BATCH di {len(triples)} triple su Neo4j...")
# 1. Prepariamo i dati come lista di dizionari (Payload leggero)
batch_data = []
for t in triples:
batch_data.append({
"subj_uri": self.sanitize_name(t.subject),
"subj_label": t.subject,
"pred": t.predicate, # Nota: Il predicato dinamico richiede attenzione in Cypher
"obj_uri": self.sanitize_name(t.object),
"obj_label": t.object,
"conf": t.confidence,
"src": t.source
})
# 2. Query Batch Ottimizzata
# Nota: In Cypher non si può parametrizzare il TIPO di relazione (es. :RELAZIONE).
# Per performance pura con relazioni dinamiche, usiamo APOC o un approccio ibrido.
# Qui usiamo un approccio sicuro iterando nel driver ma con transazione unica,
# oppure raggruppiamo per tipo di relazione.
# Approccio Migliore per MVP: Transazione singola
with self.driver.session() as session:
try:
session.execute_write(self._batch_write_tx, batch_data)
print("✅ Batch completato.")
except Exception as e:
print(f"⚠️ Errore durante il salvataggio batch: {e}")
@staticmethod
def _batch_write_tx(tx, batch_data):
"""Funzione transazionale interna."""
for item in batch_data:
# Usiamo MERGE per evitare duplicati
# Usiamo apoc.create.relationship se disponibile per predicati dinamici,
# altrimenti usiamo string formatting controllata (safe perché interna).
# Sanitizzazione predicato per evitare injection (solo caratteri sicuri)
safe_pred = "".join(x for x in item['pred'] if x.isalnum() or x in "_:")
if not safe_pred: safe_pred = "RELATED_TO"
query = (
f"MERGE (s:Resource {{uri: $subj_uri}}) "
f"ON CREATE SET s.label = $subj_label "
f"MERGE (o:Resource {{uri: $obj_uri}}) "
f"ON CREATE SET o.label = $obj_label "
f"MERGE (s)-[r:`{safe_pred}`]->(o) "
f"SET r.confidence = $conf, r.source = $src"
)
tx.run(query, item)
# --- TEST ISOLATO ---
if __name__ == "__main__":
# Creiamo un mock per testare senza dipendenze esterne
from collections import namedtuple
MockTriple = namedtuple("MockTriple", ["subject", "predicate", "object", "confidence", "source"])
triples = [
MockTriple("Batch Node 1", "TEST_BATCH", "Batch Node 2", 0.99, "test_doc_1"),
MockTriple("Batch Node 2", "IS_RELATED_TO", "Batch Node 3", 0.85, "test_doc_1")
]
# Assicurati di avere le variabili d'ambiente o fallback attivi
persister = KnowledgeGraphPersister()
persister.save_triples(triples)
persister.close()