666ghj committed on
Commit
01f5cb9
·
1 Parent(s): 13b9600

feat(graph): implement pagination for fetching nodes and edges; add utility functions for streamlined data retrieval

Browse files
backend/app/services/graph_builder.py CHANGED
@@ -15,6 +15,7 @@ from zep_cloud import EpisodeData, EntityEdgeSourceTarget
15
 
16
  from ..config import Config
17
  from ..models.task import TaskManager, TaskStatus
 
18
  from .text_processor import TextProcessor
19
 
20
 
@@ -395,12 +396,12 @@ class GraphBuilderService:
395
 
396
  def _get_graph_info(self, graph_id: str) -> GraphInfo:
397
  """获取图谱信息"""
398
- # 获取节点
399
- nodes = self.client.graph.node.get_by_graph_id(graph_id=graph_id)
400
-
401
- # 获取边
402
- edges = self.client.graph.edge.get_by_graph_id(graph_id=graph_id)
403
-
404
  # 统计实体类型
405
  entity_types = set()
406
  for node in nodes:
@@ -408,7 +409,7 @@ class GraphBuilderService:
408
  for label in node.labels:
409
  if label not in ["Entity", "Node"]:
410
  entity_types.add(label)
411
-
412
  return GraphInfo(
413
  graph_id=graph_id,
414
  node_count=len(nodes),
@@ -426,9 +427,9 @@ class GraphBuilderService:
426
  Returns:
427
  包含nodes和edges的字典,包括时间信息、属性等详细数据
428
  """
429
- nodes = self.client.graph.node.get_by_graph_id(graph_id=graph_id)
430
- edges = self.client.graph.edge.get_by_graph_id(graph_id=graph_id)
431
-
432
  # 创建节点映射用于获取节点名称
433
  node_map = {}
434
  for node in nodes:
 
15
 
16
  from ..config import Config
17
  from ..models.task import TaskManager, TaskStatus
18
+ from ..utils.zep_paging import fetch_all_nodes, fetch_all_edges
19
  from .text_processor import TextProcessor
20
 
21
 
 
396
 
397
  def _get_graph_info(self, graph_id: str) -> GraphInfo:
398
  """获取图谱信息"""
399
+ # 获取节点(分页)
400
+ nodes = fetch_all_nodes(self.client, graph_id)
401
+
402
+ # 获取边(分页)
403
+ edges = fetch_all_edges(self.client, graph_id)
404
+
405
  # 统计实体类型
406
  entity_types = set()
407
  for node in nodes:
 
409
  for label in node.labels:
410
  if label not in ["Entity", "Node"]:
411
  entity_types.add(label)
412
+
413
  return GraphInfo(
414
  graph_id=graph_id,
415
  node_count=len(nodes),
 
427
  Returns:
428
  包含nodes和edges的字典,包括时间信息、属性等详细数据
429
  """
430
+ nodes = fetch_all_nodes(self.client, graph_id)
431
+ edges = fetch_all_edges(self.client, graph_id)
432
+
433
  # 创建节点映射用于获取节点名称
434
  node_map = {}
435
  for node in nodes:
backend/app/services/zep_entity_reader.py CHANGED
@@ -11,6 +11,7 @@ from zep_cloud.client import Zep
11
 
12
  from ..config import Config
13
  from ..utils.logger import get_logger
 
14
 
15
  logger = get_logger('mirofish.zep_entity_reader')
16
 
@@ -125,22 +126,18 @@ class ZepEntityReader:
125
 
126
  def get_all_nodes(self, graph_id: str) -> List[Dict[str, Any]]:
127
  """
128
- 获取图谱的所有节点(带重试机制
129
-
130
  Args:
131
  graph_id: 图谱ID
132
-
133
  Returns:
134
  节点列表
135
  """
136
  logger.info(f"获取图谱 {graph_id} 的所有节点...")
137
-
138
- # 使用重试机制调用Zep API
139
- nodes = self._call_with_retry(
140
- func=lambda: self.client.graph.node.get_by_graph_id(graph_id=graph_id),
141
- operation_name=f"获取节点(graph={graph_id})"
142
- )
143
-
144
  nodes_data = []
145
  for node in nodes:
146
  nodes_data.append({
@@ -150,28 +147,24 @@ class ZepEntityReader:
150
  "summary": node.summary or "",
151
  "attributes": node.attributes or {},
152
  })
153
-
154
  logger.info(f"共获取 {len(nodes_data)} 个节点")
155
  return nodes_data
156
-
157
  def get_all_edges(self, graph_id: str) -> List[Dict[str, Any]]:
158
  """
159
- 获取图谱的所有边(带重试机制
160
-
161
  Args:
162
  graph_id: 图谱ID
163
-
164
  Returns:
165
  边列表
166
  """
167
  logger.info(f"获取图谱 {graph_id} 的所有边...")
168
-
169
- # 使用重试机制调用Zep API
170
- edges = self._call_with_retry(
171
- func=lambda: self.client.graph.edge.get_by_graph_id(graph_id=graph_id),
172
- operation_name=f"获取边(graph={graph_id})"
173
- )
174
-
175
  edges_data = []
176
  for edge in edges:
177
  edges_data.append({
@@ -182,7 +175,7 @@ class ZepEntityReader:
182
  "target_node_uuid": edge.target_node_uuid,
183
  "attributes": edge.attributes or {},
184
  })
185
-
186
  logger.info(f"共获取 {len(edges_data)} 条边")
187
  return edges_data
188
 
 
11
 
12
  from ..config import Config
13
  from ..utils.logger import get_logger
14
+ from ..utils.zep_paging import fetch_all_nodes, fetch_all_edges
15
 
16
  logger = get_logger('mirofish.zep_entity_reader')
17
 
 
126
 
127
  def get_all_nodes(self, graph_id: str) -> List[Dict[str, Any]]:
128
  """
129
+ 获取图谱的所有节点(分页获取
130
+
131
  Args:
132
  graph_id: 图谱ID
133
+
134
  Returns:
135
  节点列表
136
  """
137
  logger.info(f"获取图谱 {graph_id} 的所有节点...")
138
+
139
+ nodes = fetch_all_nodes(self.client, graph_id)
140
+
 
 
 
 
141
  nodes_data = []
142
  for node in nodes:
143
  nodes_data.append({
 
147
  "summary": node.summary or "",
148
  "attributes": node.attributes or {},
149
  })
150
+
151
  logger.info(f"共获取 {len(nodes_data)} 个节点")
152
  return nodes_data
153
+
154
  def get_all_edges(self, graph_id: str) -> List[Dict[str, Any]]:
155
  """
156
+ 获取图谱的所有边(分页获取
157
+
158
  Args:
159
  graph_id: 图谱ID
160
+
161
  Returns:
162
  边列表
163
  """
164
  logger.info(f"获取图谱 {graph_id} 的所有边...")
165
+
166
+ edges = fetch_all_edges(self.client, graph_id)
167
+
 
 
 
 
168
  edges_data = []
169
  for edge in edges:
170
  edges_data.append({
 
175
  "target_node_uuid": edge.target_node_uuid,
176
  "attributes": edge.attributes or {},
177
  })
178
+
179
  logger.info(f"共获取 {len(edges_data)} 条边")
180
  return edges_data
181
 
backend/app/services/zep_tools.py CHANGED
@@ -18,6 +18,7 @@ from zep_cloud.client import Zep
18
  from ..config import Config
19
  from ..utils.logger import get_logger
20
  from ..utils.llm_client import LLMClient
 
21
 
22
  logger = get_logger('mirofish.zep_tools')
23
 
@@ -648,71 +649,67 @@ class ZepToolsService:
648
 
649
  def get_all_nodes(self, graph_id: str) -> List[NodeInfo]:
650
  """
651
- 获取图谱的所有节点
652
-
653
  Args:
654
  graph_id: 图谱ID
655
-
656
  Returns:
657
  节点列表
658
  """
659
  logger.info(f"获取图谱 {graph_id} 的所有节点...")
660
-
661
- nodes = self._call_with_retry(
662
- func=lambda: self.client.graph.node.get_by_graph_id(graph_id=graph_id),
663
- operation_name=f"获取节点(graph={graph_id})"
664
- )
665
-
666
  result = []
667
  for node in nodes:
 
668
  result.append(NodeInfo(
669
- uuid=getattr(node, 'uuid_', None) or getattr(node, 'uuid', ''),
670
  name=node.name or "",
671
  labels=node.labels or [],
672
  summary=node.summary or "",
673
  attributes=node.attributes or {}
674
  ))
675
-
676
  logger.info(f"获取到 {len(result)} 个节点")
677
  return result
678
-
679
  def get_all_edges(self, graph_id: str, include_temporal: bool = True) -> List[EdgeInfo]:
680
  """
681
- 获取图谱的所有边(包含时间信息)
682
-
683
  Args:
684
  graph_id: 图谱ID
685
  include_temporal: 是否包含时间信息(默认True)
686
-
687
  Returns:
688
  边列表(包含created_at, valid_at, invalid_at, expired_at)
689
  """
690
  logger.info(f"获取图谱 {graph_id} 的所有边...")
691
-
692
- edges = self._call_with_retry(
693
- func=lambda: self.client.graph.edge.get_by_graph_id(graph_id=graph_id),
694
- operation_name=f"获取边(graph={graph_id})"
695
- )
696
-
697
  result = []
698
  for edge in edges:
 
699
  edge_info = EdgeInfo(
700
- uuid=getattr(edge, 'uuid_', None) or getattr(edge, 'uuid', ''),
701
  name=edge.name or "",
702
  fact=edge.fact or "",
703
  source_node_uuid=edge.source_node_uuid or "",
704
  target_node_uuid=edge.target_node_uuid or ""
705
  )
706
-
707
  # 添加时间信息
708
  if include_temporal:
709
  edge_info.created_at = getattr(edge, 'created_at', None)
710
  edge_info.valid_at = getattr(edge, 'valid_at', None)
711
  edge_info.invalid_at = getattr(edge, 'invalid_at', None)
712
  edge_info.expired_at = getattr(edge, 'expired_at', None)
713
-
714
  result.append(edge_info)
715
-
716
  logger.info(f"获取到 {len(result)} 条边")
717
  return result
718
 
 
18
  from ..config import Config
19
  from ..utils.logger import get_logger
20
  from ..utils.llm_client import LLMClient
21
+ from ..utils.zep_paging import fetch_all_nodes, fetch_all_edges
22
 
23
  logger = get_logger('mirofish.zep_tools')
24
 
 
649
 
650
  def get_all_nodes(self, graph_id: str) -> List[NodeInfo]:
651
  """
652
+ 获取图谱的所有节点(分页获取)
653
+
654
  Args:
655
  graph_id: 图谱ID
656
+
657
  Returns:
658
  节点列表
659
  """
660
  logger.info(f"获取图谱 {graph_id} 的所有节点...")
661
+
662
+ nodes = fetch_all_nodes(self.client, graph_id)
663
+
 
 
 
664
  result = []
665
  for node in nodes:
666
+ node_uuid = getattr(node, 'uuid_', None) or getattr(node, 'uuid', None) or ""
667
  result.append(NodeInfo(
668
+ uuid=str(node_uuid) if node_uuid else "",
669
  name=node.name or "",
670
  labels=node.labels or [],
671
  summary=node.summary or "",
672
  attributes=node.attributes or {}
673
  ))
674
+
675
  logger.info(f"获取到 {len(result)} 个节点")
676
  return result
677
+
678
  def get_all_edges(self, graph_id: str, include_temporal: bool = True) -> List[EdgeInfo]:
679
  """
680
+ 获取图谱的所有边(分页获取,包含时间信息)
681
+
682
  Args:
683
  graph_id: 图谱ID
684
  include_temporal: 是否包含时间信息(默认True)
685
+
686
  Returns:
687
  边列表(包含created_at, valid_at, invalid_at, expired_at)
688
  """
689
  logger.info(f"获取图谱 {graph_id} 的所有边...")
690
+
691
+ edges = fetch_all_edges(self.client, graph_id)
692
+
 
 
 
693
  result = []
694
  for edge in edges:
695
+ edge_uuid = getattr(edge, 'uuid_', None) or getattr(edge, 'uuid', None) or ""
696
  edge_info = EdgeInfo(
697
+ uuid=str(edge_uuid) if edge_uuid else "",
698
  name=edge.name or "",
699
  fact=edge.fact or "",
700
  source_node_uuid=edge.source_node_uuid or "",
701
  target_node_uuid=edge.target_node_uuid or ""
702
  )
703
+
704
  # 添加时间信息
705
  if include_temporal:
706
  edge_info.created_at = getattr(edge, 'created_at', None)
707
  edge_info.valid_at = getattr(edge, 'valid_at', None)
708
  edge_info.invalid_at = getattr(edge, 'invalid_at', None)
709
  edge_info.expired_at = getattr(edge, 'expired_at', None)
710
+
711
  result.append(edge_info)
712
+
713
  logger.info(f"获取到 {len(result)} 条边")
714
  return result
715
 
backend/app/utils/zep_paging.py ADDED
@@ -0,0 +1,143 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Zep Graph 分页读取工具。
2
+
3
+ Zep 的 node/edge 列表接口使用 UUID cursor 分页,
4
+ 本模块封装自动翻页逻辑(含单页重试),对调用方透明地返回完整列表。
5
+ """
6
+
7
+ from __future__ import annotations
8
+
9
+ import time
10
+ from collections.abc import Callable
11
+ from typing import Any
12
+
13
+ from zep_cloud import InternalServerError
14
+ from zep_cloud.client import Zep
15
+
16
+ from .logger import get_logger
17
+
18
+ logger = get_logger('mirofish.zep_paging')
19
+
20
# Default number of items requested per page (sent as ``limit``).
_DEFAULT_PAGE_SIZE = 100
# Default upper bound on the number of nodes fetch_all_nodes returns.
_MAX_NODES = 2000
# Default total number of attempts made for a single page request.
_DEFAULT_MAX_RETRIES = 3
# Initial wait between attempts, in seconds; doubled after every failed attempt.
_DEFAULT_RETRY_DELAY = 2.0
24
+
25
+
26
def _fetch_page_with_retry(
    api_call: Callable[..., list[Any]],
    *args: Any,
    max_retries: int = _DEFAULT_MAX_RETRIES,
    retry_delay: float = _DEFAULT_RETRY_DELAY,
    page_description: str = "page",
    **kwargs: Any,
) -> list[Any]:
    """Request a single page, retrying transient failures with backoff.

    Only connection, timeout, OS-level and Zep ``InternalServerError``
    failures are retried; any other exception propagates immediately.
    The wait between attempts starts at ``retry_delay`` seconds and is
    doubled after every failed attempt.

    Args:
        api_call: Callable invoked as ``api_call(*args, **kwargs)``.
        *args: Positional arguments forwarded to ``api_call``.
        max_retries: Total number of attempts; must be at least 1.
        retry_delay: Seconds to sleep before the second attempt.
        page_description: Label identifying the request in log messages.
        **kwargs: Keyword arguments forwarded to ``api_call``.

    Returns:
        The value returned by the first successful ``api_call``.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        Exception: The last retryable error once every attempt has failed.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")

    failure: Exception | None = None
    wait_seconds = retry_delay
    attempts_made = 0

    while attempts_made < max_retries:
        attempts_made += 1
        try:
            return api_call(*args, **kwargs)
        except (ConnectionError, TimeoutError, OSError, InternalServerError) as exc:
            failure = exc

        if attempts_made == max_retries:
            # Out of attempts: report once and fall through to re-raise.
            logger.error(f"Zep {page_description} failed after {max_retries} attempts: {str(failure)}")
            break

        logger.warning(
            f"Zep {page_description} attempt {attempts_made} failed: {str(failure)[:100]}, retrying in {wait_seconds:.1f}s..."
        )
        time.sleep(wait_seconds)
        wait_seconds *= 2

    # max_retries >= 1 guarantees at least one failure was recorded here.
    assert failure is not None
    raise failure
57
+
58
+
59
def fetch_all_nodes(
    client: Zep,
    graph_id: str,
    page_size: int = _DEFAULT_PAGE_SIZE,
    max_items: int = _MAX_NODES,
    max_retries: int = _DEFAULT_MAX_RETRIES,
    retry_delay: float = _DEFAULT_RETRY_DELAY,
) -> list[Any]:
    """Fetch the nodes of a graph page by page, up to ``max_items``.

    Pages are requested through ``client.graph.node.get_by_graph_id`` with a
    ``limit`` and, from the second page on, a ``uuid_cursor`` set to the UUID
    of the last node already received. Every page request is retried by
    ``_fetch_page_with_retry``.

    Args:
        client: Zep client used to issue the requests.
        graph_id: Identifier of the graph to read.
        page_size: Number of nodes requested per page; must be at least 1.
        max_items: Maximum number of nodes to return; must not be negative.
        max_retries: Total attempts allowed for each page request.
        retry_delay: Initial wait in seconds between attempts for a page.

    Returns:
        The collected nodes, truncated to at most ``max_items`` entries.

    Raises:
        ValueError: If ``page_size`` < 1 or ``max_items`` < 0.
    """
    # A non-positive page size would make the short-page check below never
    # fire, and a negative cap would turn the truncating slice into
    # "drop items from the end".
    if page_size < 1:
        raise ValueError("page_size must be >= 1")
    if max_items < 0:
        raise ValueError("max_items must be >= 0")

    all_nodes: list[Any] = []
    cursor: str | None = None
    page_num = 0

    while True:
        kwargs: dict[str, Any] = {"limit": page_size}
        if cursor is not None:
            kwargs["uuid_cursor"] = cursor

        page_num += 1
        batch = _fetch_page_with_retry(
            client.graph.node.get_by_graph_id,
            graph_id,
            max_retries=max_retries,
            retry_delay=retry_delay,
            page_description=f"fetch nodes page {page_num} (graph={graph_id})",
            **kwargs,
        )
        if not batch:
            break

        all_nodes.extend(batch)
        if len(all_nodes) >= max_items:
            all_nodes = all_nodes[:max_items]
            logger.warning(f"Node count reached limit ({max_items}), stopping pagination for graph {graph_id}")
            break
        # A short page means the server has nothing more to return.
        if len(batch) < page_size:
            break

        last_node = batch[-1]
        next_cursor = getattr(last_node, "uuid_", None) or getattr(last_node, "uuid", None)
        if next_cursor is None:
            logger.warning(f"Node missing uuid field, stopping pagination at {len(all_nodes)} nodes")
            break
        # Re-sending the same cursor would fetch the same page again and
        # only accumulate duplicates, so stop instead.
        if next_cursor == cursor:
            logger.warning(f"Node cursor did not advance, stopping pagination at {len(all_nodes)} nodes")
            break
        cursor = next_cursor

    return all_nodes
103
+
104
+
105
def fetch_all_edges(
    client: Zep,
    graph_id: str,
    page_size: int = _DEFAULT_PAGE_SIZE,
    max_retries: int = _DEFAULT_MAX_RETRIES,
    retry_delay: float = _DEFAULT_RETRY_DELAY,
) -> list[Any]:
    """Fetch every edge of a graph page by page and return the full list.

    Pages are requested through ``client.graph.edge.get_by_graph_id`` with a
    ``limit`` and, from the second page on, a ``uuid_cursor`` set to the UUID
    of the last edge already received. Every page request is retried by
    ``_fetch_page_with_retry``.

    Args:
        client: Zep client used to issue the requests.
        graph_id: Identifier of the graph to read.
        page_size: Number of edges requested per page; must be at least 1.
        max_retries: Total attempts allowed for each page request.
        retry_delay: Initial wait in seconds between attempts for a page.

    Returns:
        All edges collected across the fetched pages.

    Raises:
        ValueError: If ``page_size`` < 1.
    """
    # A non-positive page size would make the short-page check below never
    # fire, leaving an empty page as the only way out of the loop.
    if page_size < 1:
        raise ValueError("page_size must be >= 1")

    all_edges: list[Any] = []
    cursor: str | None = None
    page_num = 0

    while True:
        kwargs: dict[str, Any] = {"limit": page_size}
        if cursor is not None:
            kwargs["uuid_cursor"] = cursor

        page_num += 1
        batch = _fetch_page_with_retry(
            client.graph.edge.get_by_graph_id,
            graph_id,
            max_retries=max_retries,
            retry_delay=retry_delay,
            page_description=f"fetch edges page {page_num} (graph={graph_id})",
            **kwargs,
        )
        if not batch:
            break

        all_edges.extend(batch)
        # A short page means the server has nothing more to return.
        if len(batch) < page_size:
            break

        last_edge = batch[-1]
        next_cursor = getattr(last_edge, "uuid_", None) or getattr(last_edge, "uuid", None)
        if next_cursor is None:
            logger.warning(f"Edge missing uuid field, stopping pagination at {len(all_edges)} edges")
            break
        # There is no item cap here, so a cursor that does not advance would
        # otherwise request the same page forever.
        if next_cursor == cursor:
            logger.warning(f"Edge cursor did not advance, stopping pagination at {len(all_edges)} edges")
            break
        cursor = next_cursor

    return all_edges