github-actions[bot] commited on
Commit
f19b7a9
·
1 Parent(s): 0134a08

Auto-sync from demo at Mon Dec 22 09:30:22 UTC 2025

Browse files
graphgen/models/storage/graph/kuzu_storage.py CHANGED
@@ -1,6 +1,5 @@
1
  import json
2
  import os
3
- import shutil
4
  from dataclasses import dataclass
5
  from typing import Any
6
 
@@ -69,6 +68,16 @@ class KuzuStorage(BaseGraphStorage):
69
  def index_done_callback(self):
70
  """KuzuDB is ACID, changes are immediate, but we can verify generic persistence here."""
71
 
 
 
 
 
 
 
 
 
 
 
72
  def has_node(self, node_id: str) -> bool:
73
  result = self._conn.execute(
74
  "MATCH (a:Entity {id: $id}) RETURN count(a)", {"id": node_id}
@@ -111,10 +120,11 @@ class KuzuStorage(BaseGraphStorage):
111
  result = self._conn.execute(
112
  "MATCH (a:Entity {id: $id}) RETURN a.data", {"id": node_id}
113
  )
114
- if result.has_next():
115
- data_str = result.get_next()[0]
116
- return json.loads(data_str) if data_str else {}
117
- return None
 
118
 
119
  def update_node(self, node_id: str, node_data: dict[str, str]):
120
  current_data = self.get_node(node_id)
@@ -124,7 +134,11 @@ class KuzuStorage(BaseGraphStorage):
124
 
125
  # Merge existing data with new data
126
  current_data.update(node_data)
127
- json_data = json.dumps(current_data, ensure_ascii=False)
 
 
 
 
128
 
129
  self._conn.execute(
130
  "MATCH (a:Entity {id: $id}) SET a.data = $data",
@@ -137,7 +151,11 @@ class KuzuStorage(BaseGraphStorage):
137
  nodes = []
138
  while result.has_next():
139
  row = result.get_next()
140
- nodes.append((row[0], json.loads(row[1])))
 
 
 
 
141
  return nodes
142
 
143
  def get_edge(self, source_node_id: str, target_node_id: str):
@@ -149,10 +167,11 @@ class KuzuStorage(BaseGraphStorage):
149
  result = self._conn.execute(
150
  query, {"src": source_node_id, "dst": target_node_id}
151
  )
152
- if result.has_next():
153
- data_str = result.get_next()[0]
154
- return json.loads(data_str) if data_str else {}
155
- return None
 
156
 
157
  def update_edge(
158
  self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
@@ -163,14 +182,20 @@ class KuzuStorage(BaseGraphStorage):
163
  return
164
 
165
  current_data.update(edge_data)
166
- json_data = json.dumps(current_data, ensure_ascii=False)
 
 
 
 
 
 
167
 
168
- query = """
 
169
  MATCH (a:Entity {id: $src})-[e:Relation]->(b:Entity {id: $dst})
170
  SET e.data = $data
171
- """
172
- self._conn.execute(
173
- query, {"src": source_node_id, "dst": target_node_id, "data": json_data}
174
  )
175
 
176
  def get_all_edges(self) -> Any:
@@ -180,7 +205,11 @@ class KuzuStorage(BaseGraphStorage):
180
  edges = []
181
  while result.has_next():
182
  row = result.get_next()
183
- edges.append((row[0], row[1], json.loads(row[2])))
 
 
 
 
184
  return edges
185
 
186
  def get_node_edges(self, source_node_id: str) -> Any:
@@ -193,7 +222,11 @@ class KuzuStorage(BaseGraphStorage):
193
  edges = []
194
  while result.has_next():
195
  row = result.get_next()
196
- edges.append((row[0], row[1], json.loads(row[2])))
 
 
 
 
197
  return edges
198
 
199
  def upsert_node(self, node_id: str, node_data: dict[str, str]):
@@ -201,7 +234,11 @@ class KuzuStorage(BaseGraphStorage):
201
  Insert or Update node.
202
  Kuzu supports MERGE clause (similar to Neo4j) to handle upserts.
203
  """
204
- json_data = json.dumps(node_data, ensure_ascii=False)
 
 
 
 
205
  query = """
206
  MERGE (a:Entity {id: $id})
207
  ON MATCH SET a.data = $data
@@ -224,7 +261,13 @@ class KuzuStorage(BaseGraphStorage):
224
  if not self.has_node(target_node_id):
225
  self.upsert_node(target_node_id, {})
226
 
227
- json_data = json.dumps(edge_data, ensure_ascii=False)
 
 
 
 
 
 
228
  query = """
229
  MATCH (a:Entity {id: $src}), (b:Entity {id: $dst})
230
  MERGE (a)-[e:Relation]->(b)
@@ -248,9 +291,3 @@ class KuzuStorage(BaseGraphStorage):
248
 
249
  def reload(self):
250
  """For databases that need reloading, KuzuDB auto-manages this."""
251
-
252
- def drop(self):
253
- """Completely remove the database folder."""
254
- if self.db_path and os.path.exists(self.db_path):
255
- shutil.rmtree(self.db_path)
256
- print(f"Dropped KuzuDB at {self.db_path}")
 
1
  import json
2
  import os
 
3
  from dataclasses import dataclass
4
  from typing import Any
5
 
 
68
  def index_done_callback(self):
69
  """KuzuDB is ACID, changes are immediate, but we can verify generic persistence here."""
70
 
71
+ @staticmethod
72
+ def _safe_json_loads(data_str: str) -> dict:
73
+ if not isinstance(data_str, str) or not data_str.strip():
74
+ return {}
75
+ try:
76
+ return json.loads(data_str)
77
+ except json.JSONDecodeError as e:
78
+ print(f"Error decoding JSON: {e}")
79
+ return {}
80
+
81
  def has_node(self, node_id: str) -> bool:
82
  result = self._conn.execute(
83
  "MATCH (a:Entity {id: $id}) RETURN count(a)", {"id": node_id}
 
120
  result = self._conn.execute(
121
  "MATCH (a:Entity {id: $id}) RETURN a.data", {"id": node_id}
122
  )
123
+ if not result.has_next():
124
+ return None
125
+
126
+ data_str = result.get_next()[0]
127
+ return self._safe_json_loads(data_str)
128
 
129
  def update_node(self, node_id: str, node_data: dict[str, str]):
130
  current_data = self.get_node(node_id)
 
134
 
135
  # Merge existing data with new data
136
  current_data.update(node_data)
137
+ try:
138
+ json_data = json.dumps(current_data, ensure_ascii=False)
139
+ except (TypeError, ValueError) as e:
140
+ print(f"Error serializing JSON for node {node_id}: {e}")
141
+ return
142
 
143
  self._conn.execute(
144
  "MATCH (a:Entity {id: $id}) SET a.data = $data",
 
151
  nodes = []
152
  while result.has_next():
153
  row = result.get_next()
154
+ if row is None or len(row) < 2:
155
+ continue
156
+ node_id, data_str = row[0], row[1]
157
+ data = self._safe_json_loads(data_str)
158
+ nodes.append((node_id, data))
159
  return nodes
160
 
161
  def get_edge(self, source_node_id: str, target_node_id: str):
 
167
  result = self._conn.execute(
168
  query, {"src": source_node_id, "dst": target_node_id}
169
  )
170
+ if not result.has_next():
171
+ return None
172
+
173
+ data_str = result.get_next()[0]
174
+ return self._safe_json_loads(data_str)
175
 
176
  def update_edge(
177
  self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
 
182
  return
183
 
184
  current_data.update(edge_data)
185
+ try:
186
+ json_data = json.dumps(current_data, ensure_ascii=False)
187
+ except (TypeError, ValueError) as e:
188
+ print(
189
+ f"Error serializing JSON for edge {source_node_id}->{target_node_id}: {e}"
190
+ )
191
+ return
192
 
193
+ self._conn.execute(
194
+ """
195
  MATCH (a:Entity {id: $src})-[e:Relation]->(b:Entity {id: $dst})
196
  SET e.data = $data
197
+ """,
198
+ {"src": source_node_id, "dst": target_node_id, "data": json_data},
 
199
  )
200
 
201
  def get_all_edges(self) -> Any:
 
205
  edges = []
206
  while result.has_next():
207
  row = result.get_next()
208
+ if row is None or len(row) < 3:
209
+ continue
210
+ src, dst, data_str = row[0], row[1], row[2]
211
+ data = self._safe_json_loads(data_str)
212
+ edges.append((src, dst, data))
213
  return edges
214
 
215
  def get_node_edges(self, source_node_id: str) -> Any:
 
222
  edges = []
223
  while result.has_next():
224
  row = result.get_next()
225
+ if row is None or len(row) < 3:
226
+ continue
227
+ src, dst, data_str = row[0], row[1], row[2]
228
+ data = self._safe_json_loads(data_str)
229
+ edges.append((src, dst, data))
230
  return edges
231
 
232
  def upsert_node(self, node_id: str, node_data: dict[str, str]):
 
234
  Insert or Update node.
235
  Kuzu supports MERGE clause (similar to Neo4j) to handle upserts.
236
  """
237
+ try:
238
+ json_data = json.dumps(node_data, ensure_ascii=False)
239
+ except (TypeError, ValueError) as e:
240
+ print(f"Error serializing JSON for node {node_id}: {e}")
241
+ return
242
  query = """
243
  MERGE (a:Entity {id: $id})
244
  ON MATCH SET a.data = $data
 
261
  if not self.has_node(target_node_id):
262
  self.upsert_node(target_node_id, {})
263
 
264
+ try:
265
+ json_data = json.dumps(edge_data, ensure_ascii=False)
266
+ except (TypeError, ValueError) as e:
267
+ print(
268
+ f"Error serializing JSON for edge {source_node_id}->{target_node_id}: {e}"
269
+ )
270
+ return
271
  query = """
272
  MATCH (a:Entity {id: $src}), (b:Entity {id: $dst})
273
  MERGE (a)-[e:Relation]->(b)
 
291
 
292
  def reload(self):
293
  """For databases that need reloading, KuzuDB auto-manages this."""