AutomatedSemanticDiscovery / src /graph /graph_loader.py
GaetanoParente's picture
Update src/graph/graph_loader.py
d2bdec9 verified
raw
history blame
5.03 kB
import os
from collections import defaultdict
from neo4j import GraphDatabase
from dotenv import load_dotenv
# Carica variabili d'ambiente
load_dotenv()
class KnowledgeGraphPersister:
def __init__(self):
"""
Inizializza il driver Neo4j e crea i vincoli necessari per le performance.
"""
uri = os.getenv("NEO4J_URI", "neo4j+s://748d6c94.databases.neo4j.io")
user = os.getenv("NEO4J_USER", "neo4j")
password = os.getenv("NEO4J_PASSWORD", "t1bT1DiXwDOGMYfX89qR20loSN8FXurB3Dfg8bPQcTI")
try:
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.driver.verify_connectivity()
print(f"✅ Connesso a Neo4j ({uri}).")
# Creazione indici all'avvio (Fondamentale per la velocità dei MERGE)
self._create_constraints()
except Exception as e:
print(f"❌ Errore critico connessione Neo4j: {e}")
self.driver = None
def close(self):
if self.driver:
self.driver.close()
def _create_constraints(self):
"""
Crea un vincolo di unicità sulla proprietà URI.
Senza questo, MERGE diventa lentissimo (Full Table Scan).
"""
if not self.driver: return
query = "CREATE CONSTRAINT resource_uri_unique IF NOT EXISTS FOR (n:Resource) REQUIRE n.uri IS UNIQUE"
with self.driver.session() as session:
try:
session.run(query)
print("⚡ Vincoli/Indici Neo4j verificati.")
except Exception as e:
# Spesso fallisce se l'utente non ha permessi admin o se esiste già con nome diverso
print(f"⚠️ Warning creazione indici: {e}")
def sanitize_name(self, name):
"""
Canonicalization base.
"""
if not name: return "Unknown"
# Rimuove spazi extra e normalizza.
return name.strip().replace(" ", "_").replace("'", "").replace('"', "")
def sanitize_predicate(self, pred):
"""
Pulisce il predicato per evitare Cypher Injection.
FIX: Gestisce meglio i separatori (:, -, spazio) sostituendoli con underscore
per evitare predicati illeggibili come XCHEHASOBJECT.
Es. xche:has_object -> XCHE_HAS_OBJECT
"""
if not pred: return "RELATED_TO"
# 1. Normalizzazione preliminare dei separatori comuni
# Sostituisce i due punti dei namespace e trattini con underscore
pred = pred.replace(":", "_").replace("-", "_").replace(" ", "_")
# 2. Rimozione caratteri non sicuri (mantiene solo alfanumerici e underscore)
clean = "".join(x for x in pred if x.isalnum() or x == "_")
# 3. Conversione in uppercase (convenzione Neo4j per Relationships)
return clean.upper() if clean else "RELATED_TO"
def save_triples(self, triples):
"""
Salva le triple usando VERO Batching (UNWIND).
Raggruppa le triple per predicato per aggirare il limite di parametrizzazione delle relazioni.
"""
if not self.driver or not triples:
return
print(f"💾 Preparazione Batch di {len(triples)} triple...")
# 1. Raggruppamento per Predicato
batched_by_pred = defaultdict(list)
for t in triples:
safe_pred = self.sanitize_predicate(t.predicate)
item = {
"subj_uri": self.sanitize_name(t.subject),
"subj_label": t.subject,
"obj_uri": self.sanitize_name(t.object),
"obj_label": t.object,
"conf": float(t.confidence),
"src": t.source or "unknown"
}
batched_by_pred[safe_pred].append(item)
# 2. Esecuzione Transazioni (Una per tipo di relazione)
with self.driver.session() as session:
for pred, data_list in batched_by_pred.items():
try:
session.execute_write(self._unwind_write_tx, pred, data_list)
print(f" -> Inserite {len(data_list)} relazioni :{pred}")
except Exception as e:
print(f"⚠️ Errore batch per relazione :{pred} -> {e}")
print("✅ Salvataggio completato.")
@staticmethod
def _unwind_write_tx(tx, predicate, batch_data):
"""
Usa UNWIND per inserire migliaia di righe in un colpo solo.
"""
query = (
f"UNWIND $batch AS row "
f"MERGE (s:Resource {{uri: row.subj_uri}}) "
f"ON CREATE SET s.label = row.subj_label "
f"MERGE (o:Resource {{uri: row.obj_uri}}) "
f"ON CREATE SET o.label = row.obj_label "
f"MERGE (s)-[r:`{predicate}`]->(o) "
f"SET r.confidence = row.conf, "
f" r.source = row.src, "
f" r.last_updated = datetime()"
)
tx.run(query, batch=batch_data)