AutomatedSemanticDiscovery / src /graph /graph_loader.py
GaetanoParente's picture
Update src/graph/graph_loader.py
8c4201b verified
raw
history blame
4.7 kB
import os
from neo4j import GraphDatabase
from dotenv import load_dotenv
# Carica variabili d'ambiente
load_dotenv()
class KnowledgeGraphPersister:
def __init__(self):
"""
Inizializza il driver Neo4j usando le variabili d'ambiente per sicurezza.
"""
uri = os.getenv("NEO4J_URI", "")
user = os.getenv("NEO4J_USER", "neo4j")
password = os.getenv("NEO4J_PASSWORD", "")
try:
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.driver.verify_connectivity()
print(f"✅ Connesso a Neo4j ({uri}).")
# Creazione indici all'avvio (Fondamentale per la velocità dei MERGE)
self._create_constraints()
except Exception as e:
print(f"❌ Errore critico connessione Neo4j: {e}")
self.driver = None
def close(self):
if self.driver:
self.driver.close()
def _create_constraints(self):
"""
Crea un vincolo di unicità sulla proprietà URI.
Senza questo, MERGE diventa lentissimo (Full Table Scan).
"""
if not self.driver: return
query = "CREATE CONSTRAINT resource_uri_unique IF NOT EXISTS FOR (n:Resource) REQUIRE n.uri IS UNIQUE"
with self.driver.session() as session:
try:
session.run(query)
print("⚡ Vincoli/Indici Neo4j verificati.")
except Exception as e:
print(f"⚠️ Warning creazione indici: {e}")
def sanitize_name(self, name):
"""
Canonicalization base.
"""
if not name: return "Unknown"
# Rimuove spazi extra e normalizza.
# Nota: In produzione usare slugify o urllib.parse.quote per URI robusti
return name.strip().replace(" ", "_").replace("'", "").replace('"', "")
def sanitize_predicate(self, pred):
"""
Pulisce il predicato per evitare Cypher Injection, dato che non può essere parametrizzato.
"""
# Accetta solo caratteri alfanumerici e underscore. Upper case per convenzione Neo4j.
clean = "".join(x for x in pred if x.isalnum() or x == "_")
return clean.upper() if clean else "RELATED_TO"
def save_triples(self, triples):
"""
Salva le triple usando VERO Batching (UNWIND).
Raggruppa le triple per predicato per aggirare il limite di parametrizzazione delle relazioni.
"""
if not self.driver or not triples:
return
print(f"💾 Preparazione Batch di {len(triples)} triple...")
# 1. Raggruppamento per Predicato
# Struttura: { "LOCATED_IN": [ {subj:..., obj:..., ...}, ... ], "HAS_TYPE": [...] }
batched_by_pred = defaultdict(list)
for t in triples:
safe_pred = self.sanitize_predicate(t.predicate)
item = {
"subj_uri": self.sanitize_name(t.subject),
"subj_label": t.subject,
"obj_uri": self.sanitize_name(t.object),
"obj_label": t.object,
"conf": float(t.confidence), # Assicura float nativo
"src": t.source or "unknown"
}
batched_by_pred[safe_pred].append(item)
# 2. Esecuzione Transazioni (Una per tipo di relazione)
with self.driver.session() as session:
for pred, data_list in batched_by_pred.items():
try:
session.execute_write(self._unwind_write_tx, pred, data_list)
print(f" -> Inserite {len(data_list)} relazioni :{pred}")
except Exception as e:
print(f"⚠️ Errore batch per relazione :{pred} -> {e}")
print("✅ Salvataggio completato.")
@staticmethod
def _unwind_write_tx(tx, predicate, batch_data):
"""
Usa UNWIND per inserire migliaia di righe in un colpo solo.
Molto più performante su rete cloud.
"""
# La query è dinamica SOLO sul tipo di relazione (sanitizzato prima),
# tutto il resto passa come parametro lista ($batch).
query = (
f"UNWIND $batch AS row "
f"MERGE (s:Resource {{uri: row.subj_uri}}) "
f"ON CREATE SET s.label = row.subj_label "
f"MERGE (o:Resource {{uri: row.obj_uri}}) "
f"ON CREATE SET o.label = row.obj_label "
f"MERGE (s)-[r:`{predicate}`]->(o) "
f"SET r.confidence = row.conf, "
f" r.source = row.src, "
f" r.last_updated = datetime()"
)
tx.run(query, batch=batch_data)