File size: 10,409 Bytes
aabd32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# This module manages the Neo4j connection and all of the Cypher queries.

from neo4j import GraphDatabase, exceptions
import logging
import os
from typing import List, Dict, Any, Optional

# Set up root logging at INFO level. Per the standard library, basicConfig
# does nothing if the root logger already has handlers configured.
logging.basicConfig(level=logging.INFO)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

class GraphDB:
    """Manage a Neo4j connection and the Cypher operations used by the RAG code.

    The instance owns one driver, created in ``__init__``. Call ``close()``
    when finished, or use the instance as a context manager
    (``with GraphDB() as db: ...``) so the driver is closed automatically.
    """

    def __init__(self, uri: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None, database: Optional[str] = None):
        """Connect to Neo4j and make sure the base indexes/constraints exist.

        Each argument falls back to an environment variable when it is omitted
        or empty: ``NEO4J_URI``, ``NEO4J_USERNAME``, ``NEO4J_PASSWORD`` and
        ``NEO4J_DATABASE``. There are no built-in defaults.

        Raises:
            ValueError: if any of the four settings is still missing.
            Exception: any error raised while creating the driver, verifying
                connectivity or creating the indexes is logged and re-raised.
        """
        # Explicit arguments win; otherwise read the environment.
        self.uri = uri or os.getenv("NEO4J_URI")
        self.user = user or os.getenv("NEO4J_USERNAME")
        self.password = password or os.getenv("NEO4J_PASSWORD")
        self.database = database or os.getenv("NEO4J_DATABASE")

        # Validation: all four settings are mandatory.
        missing_vars = [name for name, val in [
            ("NEO4J_URI", self.uri),
            ("NEO4J_USERNAME", self.user),
            ("NEO4J_PASSWORD", self.password),
            ("NEO4J_DATABASE", self.database),
        ] if not val]
        if missing_vars:
            # Fail early with a message naming exactly what is missing.
            raise ValueError(
                f"Credenziali Neo4j mancanti. Assicurati che le seguenti variabili siano definite nel file .env e caricate correttamente: {', '.join(missing_vars)}"
            )

        self.driver = None

        try:
            self.driver = GraphDatabase.driver(self.uri, auth=(self.user, self.password))
            self.driver.verify_connectivity()

            self.create_indexes_and_constraints()
            logger.info(f"Connessione a Neo4j (DB: {self.database}) stabilita con successo.")
        except Exception as e:
            logger.error(f"Errore durante la connessione a Neo4j su {self.uri}: {e}")
            # The driver may already exist when a later setup step fails;
            # close it so its connections are not leaked.
            if self.driver is not None:
                try:
                    self.driver.close()
                except Exception as close_error:
                    logger.warning(f"Errore durante la chiusura del driver Neo4j: {close_error}")
                self.driver = None
            raise

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver on leaving the ``with`` block; never suppress errors."""
        self.close()
        return False

    def close(self):
        """Close the Neo4j driver if one was created."""
        if self.driver:
            self.driver.close()
            logger.info("Connessione a Neo4j chiusa.")

    def create_indexes_and_constraints(self):
        """Create the uniqueness constraints and the chunk-id index if absent.

        "Already exists" client errors are ignored; any other error is logged
        and re-raised.
        """
        index_queries = [
            # Uniqueness constraints for the main nodes (documents, users).
            "CREATE CONSTRAINT IF NOT EXISTS FOR (d:Document) REQUIRE d.filename IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (u:User) REQUIRE u.id IS UNIQUE",
            # Index to look chunks up by id (used when citing a chunk).
            "CREATE INDEX IF NOT EXISTS FOR (c:Chunk) ON (c.chunk_id)",
        ]

        with self.driver.session(database=self.database) as session:
            for query in index_queries:
                try:
                    # Consume the result here so that an error reported while
                    # the statement completes is raised inside this try block.
                    session.run(query).consume()
                except exceptions.ClientError as e:
                    # Inspect both the status code and the message: the
                    # message may be None, and the "already exists" markers
                    # normally appear in the code.
                    details = f"{getattr(e, 'code', None) or ''} {getattr(e, 'message', None) or ''}"
                    if "IndexAlreadyExists" not in details and "ConstraintAlreadyExists" not in details:
                        logger.error(f"Errore nell'esecuzione della query indice '{query}': {e}")
                        raise
                except Exception as e:
                    logger.error(f"Errore inatteso nell'esecuzione della query indice '{query}': {e}")
                    raise

        logger.info("Indici e vincoli Neo4j verificati/creati.")

    # --- CRUD operations for the RAG ---
    def create_document_node(self, filename: str, title: Optional[str] = None) -> List[Dict[str, Any]]:
        """Create or update the ``Document`` node identified by ``filename``.

        On creation the title is set to ``title`` or, when that is ``None``,
        to ``filename``. On match only ``last_updated`` is refreshed.

        Returns:
            The records produced by the query, as returned by ``run_query``.
        """
        query = """
        MERGE (d:Document {filename: $filename})
        ON CREATE SET
            d.created_at = datetime(),
            d.title = COALESCE($title, $filename)
        ON MATCH SET d.last_updated = datetime()
        RETURN d
        """
        return self.run_query(query, {"filename": filename, "title": title})

    def create_user_node(self, user_id: str) -> List[Dict[str, Any]]:
        """Create or update the ``User`` node and refresh its ``last_activity``."""
        query = """
        MERGE (u:User {id: $user_id})
        ON CREATE SET u.created_at = datetime(), u.last_activity = datetime()
        ON MATCH SET u.last_activity = datetime()
        RETURN u
        """
        return self.run_query(query, {"user_id": user_id})

    def link_user_to_document(self, user_id: str, filename: str) -> List[Dict[str, Any]]:
        """Create or refresh the ``ACCESSED`` relationship from a user to a document.

        Both nodes are looked up with ``MATCH``, so nothing is created and an
        empty list is returned when either of them does not exist.
        """
        query = """
        MATCH (u:User {id: $user_id})
        MATCH (d:Document {filename: $filename})
        MERGE (u)-[r:ACCESSED]->(d)
        ON CREATE SET r.first_access = datetime(), r.last_access = datetime()
        ON MATCH SET r.last_access = datetime()
        RETURN u, d, r
        """
        return self.run_query(query, {"user_id": user_id, "filename": filename})

    def add_chunk_to_document(self, filename: str, chunk_id: str, content: str, embedding: List[float], metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Create or update a ``Chunk`` node and attach it to its ``Document``.

        Args:
            filename: filename of the owning document, which must already
                exist (it is found with ``MATCH``).
            chunk_id: identifier used to merge the chunk node.
            content: text stored on the chunk.
            embedding: vector stored on the chunk.
            metadata: optional extra data; only its ``"section"`` key is read
                and it defaults to ``"unspecified"``. ``None`` is accepted and
                treated as an empty mapping.
        """
        query = """
        MATCH (d:Document {filename: $filename})
        MERGE (c:Chunk {chunk_id: $chunk_id})
        SET c.content = $content,
            c.embedding = $embedding,
            c.section = $section,
            c.source = $filename,
            c.last_updated = datetime()
        MERGE (d)-[:HAS_CHUNK]->(c)
        RETURN c
        """
        parameters = {
            "filename": filename,
            "chunk_id": chunk_id,
            "content": content,
            "embedding": embedding,
            # Tolerate metadata=None instead of failing with AttributeError.
            "section": (metadata or {}).get("section", "unspecified"),
        }
        return self.run_query(query, parameters)

    @staticmethod
    def _checked_identifier(value: str, role: str) -> str:
        """Return ``value`` if it is a plain identifier, else raise ``ValueError``.

        Schema statements cannot take identifiers as query parameters, so
        they are validated before being interpolated into the Cypher text.
        """
        if not isinstance(value, str) or not value.isidentifier():
            raise ValueError(f"Identificatore Cypher non valido ({role}): {value!r}")
        return value

    def create_vector_index(self, index_name: str, node_label: str, property_name: str, vector_dimensions: int):
        """Create a cosine-similarity vector index if it does not exist yet.

        Args:
            index_name: name of the index (plain identifier).
            node_label: label of the indexed nodes (plain identifier).
            property_name: node property holding the vector (plain identifier).
            vector_dimensions: positive vector size.

        Raises:
            ValueError: if an identifier is not a plain identifier or the
                dimension is not a positive integer.
            Exception: errors from running the statement are logged and
                re-raised.
        """
        # Validate everything that is interpolated into the statement text.
        index_name = self._checked_identifier(index_name, "index_name")
        node_label = self._checked_identifier(node_label, "node_label")
        property_name = self._checked_identifier(property_name, "property_name")
        try:
            vector_dimensions = int(vector_dimensions)
        except (TypeError, ValueError) as conversion_error:
            raise ValueError(f"Dimensione del vettore non valida: {vector_dimensions!r}") from conversion_error
        if vector_dimensions <= 0:
            raise ValueError(f"Dimensione del vettore non valida: {vector_dimensions!r}")

        query = f"""
        CREATE VECTOR INDEX {index_name} IF NOT EXISTS 
        FOR (n:{node_label})
        ON (n.{property_name})
        OPTIONS {{
            indexConfig: {{
                `vector.dimensions`: {vector_dimensions},
                `vector.similarity_function`: 'cosine'
            }}
        }}
        """
        try:
            self.run_query(query)
            logger.info(f"Indice vettoriale '{index_name}' creato con successo per {node_label}.")
        except Exception as e:
            logger.error(f"Errore nella creazione dell'indice vettoriale '{index_name}': {e}")
            raise

    def query_vector_index(self, index_name: str, query_embedding: List[float], k: int = 5, filename: Optional[str] = None) -> List[Dict[str, Any]]:
        """Run a vector similarity search, optionally restricted to one document.

        Args:
            index_name: name of the vector index to query.
            query_embedding: the query vector.
            k: number of nearest nodes requested from the index.
            filename: when given, only chunks of that document are returned.

        Returns:
            A list of dicts with the keys ``node_content``, ``score``,
            ``chunk_id``, ``section`` and ``filename``. Any error is logged
            and an empty list is returned instead (best-effort search).
        """
        # The index name is passed as a query parameter, never interpolated
        # into the Cypher text, so it cannot alter the statement.
        parameters: Dict[str, Any] = {
            "index_name": index_name,
            "query_embedding": query_embedding,
            "k": k,
        }

        # db.index.vector.queryNodes is the Cypher procedure for vector search.
        if filename:
            # Search restricted to one document.
            # NOTE(review): the document filter is applied after the k nearest
            # nodes are selected, so fewer than k rows may come back.
            query = """
            CALL db.index.vector.queryNodes($index_name, $k, $query_embedding)
            YIELD node, score
            WITH node, score
            MATCH (d:Document {filename: $filename})-[:HAS_CHUNK]->(node)
            RETURN node.content AS node_content, score, node.chunk_id AS chunk_id, node.section AS section, d.filename AS filename
            """
            parameters["filename"] = filename
        else:
            # Global search across all documents (cross-document search).
            query = """
            CALL db.index.vector.queryNodes($index_name, $k, $query_embedding)
            YIELD node, score
            RETURN node.content AS node_content, score, node.chunk_id AS chunk_id, node.section AS section, node.source AS filename
            """

        try:
            records = self.run_query(query, parameters)
            results = [
                {
                    "node_content": record["node_content"],
                    "score": record["score"],
                    "chunk_id": record["chunk_id"],
                    # The defaults only apply when the key itself is absent.
                    "section": record.get("section", "N/A"),
                    "filename": record.get("filename", "Unknown"),
                }
                for record in records
            ]
            logger.debug(f"Ricerca vettoriale ha trovato {len(results)} risultati.")
            return results
        except Exception as e:
            logger.error(f"Errore durante la query dell'indice vettoriale: {e}")
            return []

    def add_entity_to_chunk(self, entity_name: str, entity_type: str, chunk_id: str) -> None:
        """Merge an ``Entity`` node and link it to the chunk that contains it.

        The entity is merged before the chunk is matched, so the entity node
        is created even when no chunk with ``chunk_id`` exists.
        """
        query = """
        MERGE (e:Entity {name: $name, type: $type})
        WITH e
        MATCH (c:Chunk {chunk_id: $chunk_id})
        MERGE (c)-[:CONTAINS_ENTITY]->(e)
        """
        params = {"name": entity_name, "type": entity_type, "chunk_id": chunk_id}
        self.run_query(query, params)

    def entity_search(self, entity_name: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Find chunks linked to an entity whose name matches case-insensitively.

        Args:
            entity_name: entity name to look for.
            limit: maximum number of chunks returned (defaults to 5).

        Returns:
            A list of dicts with the keys ``node_content``, ``chunk_id``,
            ``score`` (always 1.0), ``section`` and ``filename``. Any error
            is logged and an empty list is returned instead.
        """
        query = """
        MATCH (e:Entity)
        WHERE toLower(e.name) = toLower($name)
        MATCH (e)<-[:CONTAINS_ENTITY]-(c:Chunk)
        RETURN c.content AS node_content, c.chunk_id AS chunk_id, 1.0 AS score, c.section AS section, c.source AS filename
        LIMIT $limit
        """
        wanted_keys = ("node_content", "chunk_id", "score", "section", "filename")
        try:
            records = self.run_query(query, {"name": entity_name, "limit": limit})
            return [{key: record.get(key) for key in wanted_keys} for record in records]
        except Exception as e:
            logger.error(f"Errore nella ricerca per entità '{entity_name}': {e}")
            return []

    def run_query(self, query: str, parameters: Optional[Dict[str, Any]] = None):
        """Run a Cypher query in a new session and return its records as dicts.

        Raises:
            RuntimeError: if the driver has not been initialised.
        """
        if not self.driver:
            raise RuntimeError("Driver Neo4j non inizializzato.")

        with self.driver.session(database=self.database) as session:
            return session.run(query, parameters).data()