File size: 4,550 Bytes
a968971
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import os
from neo4j import GraphDatabase
from dotenv import load_dotenv

# Carica variabili d'ambiente
load_dotenv()

class KnowledgeGraphPersister:
    def __init__(self):
        """
        Inizializza il driver Neo4j usando le variabili d'ambiente per sicurezza.
        """
        uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
        user = os.getenv("NEO4J_USER", "neo4j")
        password = os.getenv("NEO4J_PASSWORD", "activa_semantic_lab")
        
        try:
            self.driver = GraphDatabase.driver(uri, auth=(user, password))
            self.driver.verify_connectivity()
            print(f"✅ Connesso a Neo4j ({uri}) successfully.")
        except Exception as e:
            print(f"❌ Errore critico connessione Neo4j: {e}")
            self.driver = None

    def close(self):
        if self.driver:
            self.driver.close()

    def sanitize_name(self, name):
        """
        Normalizza i nomi per creare URI coerenti (Canonicalization base).
        """
        if not name: return "Unknown"
        # Rimuove caratteri speciali e spazi extra, mantiene coerenza maiuscole/minuscole
        return name.strip().replace(" ", "_").replace("'", "").replace('"', "")

    def save_triples(self, triples):
        """
        Salva le triple in BATCH (ottimizzazione performance).
        Usa UNWIND per processare liste di dati in un'unica transazione.
        """
        if not self.driver:
            print("⚠️ Driver non connesso. Impossibile salvare.")
            return

        if not triples:
            return

        print(f"💾 Salvataggio BATCH di {len(triples)} triple su Neo4j...")
        
        # 1. Prepariamo i dati come lista di dizionari (Payload leggero)
        batch_data = []
        for t in triples:
            batch_data.append({
                "subj_uri": self.sanitize_name(t.subject),
                "subj_label": t.subject,
                "pred": t.predicate, # Nota: Il predicato dinamico richiede attenzione in Cypher
                "obj_uri": self.sanitize_name(t.object),
                "obj_label": t.object,
                "conf": t.confidence,
                "src": t.source
            })

        # 2. Query Batch Ottimizzata
        # Nota: In Cypher non si può parametrizzare il TIPO di relazione (es. :RELAZIONE).
        # Per performance pura con relazioni dinamiche, usiamo APOC o un approccio ibrido.
        # Qui usiamo un approccio sicuro iterando nel driver ma con transazione unica,
        # oppure raggruppiamo per tipo di relazione. 
        
        # Approccio Migliore per MVP: Transazione singola
        with self.driver.session() as session:
            try:
                session.execute_write(self._batch_write_tx, batch_data)
                print("✅ Batch completato.")
            except Exception as e:
                print(f"⚠️ Errore durante il salvataggio batch: {e}")

    @staticmethod
    def _batch_write_tx(tx, batch_data):
        """Funzione transazionale interna."""
        for item in batch_data:
            # Usiamo MERGE per evitare duplicati
            # Usiamo apoc.create.relationship se disponibile per predicati dinamici,
            # altrimenti usiamo string formatting controllata (safe perché interna).
            
            # Sanitizzazione predicato per evitare injection (solo caratteri sicuri)
            safe_pred = "".join(x for x in item['pred'] if x.isalnum() or x in "_:")
            if not safe_pred: safe_pred = "RELATED_TO"

            query = (
                f"MERGE (s:Resource {{uri: $subj_uri}}) "
                f"ON CREATE SET s.label = $subj_label "
                f"MERGE (o:Resource {{uri: $obj_uri}}) "
                f"ON CREATE SET o.label = $obj_label "
                f"MERGE (s)-[r:`{safe_pred}`]->(o) "
                f"SET r.confidence = $conf, r.source = $src"
            )
            
            tx.run(query, item)

# --- TEST ISOLATO ---
if __name__ == "__main__":
    # Creiamo un mock per testare senza dipendenze esterne
    from collections import namedtuple
    MockTriple = namedtuple("MockTriple", ["subject", "predicate", "object", "confidence", "source"])

    triples = [
        MockTriple("Batch Node 1", "TEST_BATCH", "Batch Node 2", 0.99, "test_doc_1"),
        MockTriple("Batch Node 2", "IS_RELATED_TO", "Batch Node 3", 0.85, "test_doc_1")
    ]

    # Assicurati di avere le variabili d'ambiente o fallback attivi
    persister = KnowledgeGraphPersister()
    persister.save_triples(triples)
    persister.close()