github-actions[bot] committed on
Commit
0134a08
·
1 Parent(s): b3226d1

Auto-sync from demo at Mon Dec 22 08:12:07 UTC 2025

Browse files
graphgen/common/init_llm.py CHANGED
@@ -4,7 +4,6 @@ from typing import Any, Dict, Optional
4
  import ray
5
 
6
  from graphgen.bases import BaseLLMWrapper
7
- from graphgen.common.init_storage import get_actor_handle
8
  from graphgen.models import Tokenizer
9
 
10
 
@@ -74,9 +73,9 @@ class LLMServiceProxy(BaseLLMWrapper):
74
  A proxy class to interact with the LLMServiceActor for distributed LLM operations.
75
  """
76
 
77
- def __init__(self, actor_name: str):
78
  super().__init__()
79
- self.actor_handle = get_actor_handle(actor_name)
80
  self._create_local_tokenizer()
81
 
82
  async def generate_answer(
@@ -128,25 +127,25 @@ class LLMFactory:
128
 
129
  actor_name = f"Actor_LLM_{model_type}"
130
  try:
131
- ray.get_actor(actor_name)
 
132
  except ValueError:
133
  print(f"Creating Ray actor for LLM {model_type} with backend {backend}.")
134
  num_gpus = float(config.pop("num_gpus", 0))
135
- actor = (
136
  ray.remote(LLMServiceActor)
137
  .options(
138
  name=actor_name,
139
  num_gpus=num_gpus,
140
- lifetime="detached",
141
  get_if_exists=True,
142
  )
143
  .remote(backend, config)
144
  )
145
 
146
  # wait for actor to be ready
147
- ray.get(actor.ready.remote())
148
 
149
- return LLMServiceProxy(actor_name)
150
 
151
 
152
  def _load_env_group(prefix: str) -> Dict[str, Any]:
 
4
  import ray
5
 
6
  from graphgen.bases import BaseLLMWrapper
 
7
  from graphgen.models import Tokenizer
8
 
9
 
 
73
  A proxy class to interact with the LLMServiceActor for distributed LLM operations.
74
  """
75
 
76
+ def __init__(self, actor_handle: ray.actor.ActorHandle):
77
  super().__init__()
78
+ self.actor_handle = actor_handle
79
  self._create_local_tokenizer()
80
 
81
  async def generate_answer(
 
127
 
128
  actor_name = f"Actor_LLM_{model_type}"
129
  try:
130
+ actor_handle = ray.get_actor(actor_name)
131
+ print(f"Using existing Ray actor: {actor_name}")
132
  except ValueError:
133
  print(f"Creating Ray actor for LLM {model_type} with backend {backend}.")
134
  num_gpus = float(config.pop("num_gpus", 0))
135
+ actor_handle = (
136
  ray.remote(LLMServiceActor)
137
  .options(
138
  name=actor_name,
139
  num_gpus=num_gpus,
 
140
  get_if_exists=True,
141
  )
142
  .remote(backend, config)
143
  )
144
 
145
  # wait for actor to be ready
146
+ ray.get(actor_handle.ready.remote())
147
 
148
+ return LLMServiceProxy(actor_handle)
149
 
150
 
151
  def _load_env_group(prefix: str) -> Dict[str, Any]:
graphgen/common/init_storage.py CHANGED
@@ -48,6 +48,9 @@ class KVStorageActor:
48
  def reload(self):
49
  return self.kv.reload()
50
 
 
 
 
51
 
52
  class GraphStorageActor:
53
  def __init__(self, backend: str, working_dir: str, namespace: str):
@@ -114,22 +117,14 @@ class GraphStorageActor:
114
  def reload(self):
115
  return self.graph.reload()
116
 
117
-
118
- def get_actor_handle(name: str):
119
- try:
120
- return ray.get_actor(name)
121
- except ValueError as exc:
122
- raise RuntimeError(
123
- f"Actor {name} not found. Make sure it is created before accessing."
124
- ) from exc
125
 
126
 
127
  class RemoteKVStorageProxy(BaseKVStorage):
128
- def __init__(self, namespace: str):
129
  super().__init__()
130
- self.namespace = namespace
131
- self.actor_name = f"Actor_KV_{namespace}"
132
- self.actor = get_actor_handle(self.actor_name)
133
 
134
  def data(self) -> Dict[str, Any]:
135
  return ray.get(self.actor.data.remote())
@@ -163,11 +158,9 @@ class RemoteKVStorageProxy(BaseKVStorage):
163
 
164
 
165
  class RemoteGraphStorageProxy(BaseGraphStorage):
166
- def __init__(self, namespace: str):
167
  super().__init__()
168
- self.namespace = namespace
169
- self.actor_name = f"Actor_Graph_{namespace}"
170
- self.actor = get_actor_handle(self.actor_name)
171
 
172
  def index_done_callback(self):
173
  return ray.get(self.actor.index_done_callback.remote())
@@ -235,27 +228,23 @@ class StorageFactory:
235
  def create_storage(backend: str, working_dir: str, namespace: str):
236
  if backend in ["json_kv", "rocksdb"]:
237
  actor_name = f"Actor_KV_{namespace}"
238
- try:
239
- ray.get_actor(actor_name)
240
- except ValueError:
241
- ray.remote(KVStorageActor).options(
242
- name=actor_name,
243
- lifetime="detached",
244
- get_if_exists=True,
245
- ).remote(backend, working_dir, namespace)
246
- return RemoteKVStorageProxy(namespace)
247
- if backend in ["networkx", "kuzu"]:
248
  actor_name = f"Actor_Graph_{namespace}"
249
- try:
250
- ray.get_actor(actor_name)
251
- except ValueError:
252
- ray.remote(GraphStorageActor).options(
253
- name=actor_name,
254
- lifetime="detached",
255
- get_if_exists=True,
256
- ).remote(backend, working_dir, namespace)
257
- return RemoteGraphStorageProxy(namespace)
258
- raise ValueError(f"Unknown storage backend: {backend}")
 
 
 
259
 
260
 
261
  def init_storage(backend: str, working_dir: str, namespace: str):
 
48
  def reload(self):
49
  return self.kv.reload()
50
 
51
+ def ready(self) -> bool:
52
+ return True
53
+
54
 
55
  class GraphStorageActor:
56
  def __init__(self, backend: str, working_dir: str, namespace: str):
 
117
  def reload(self):
118
  return self.graph.reload()
119
 
120
+ def ready(self) -> bool:
121
+ return True
 
 
 
 
 
 
122
 
123
 
124
  class RemoteKVStorageProxy(BaseKVStorage):
125
+ def __init__(self, actor_handle: ray.actor.ActorHandle):
126
  super().__init__()
127
+ self.actor = actor_handle
 
 
128
 
129
  def data(self) -> Dict[str, Any]:
130
  return ray.get(self.actor.data.remote())
 
158
 
159
 
160
  class RemoteGraphStorageProxy(BaseGraphStorage):
161
+ def __init__(self, actor_handle: ray.actor.ActorHandle):
162
  super().__init__()
163
+ self.actor = actor_handle
 
 
164
 
165
  def index_done_callback(self):
166
  return ray.get(self.actor.index_done_callback.remote())
 
228
  def create_storage(backend: str, working_dir: str, namespace: str):
229
  if backend in ["json_kv", "rocksdb"]:
230
  actor_name = f"Actor_KV_{namespace}"
231
+ actor_class = KVStorageActor
232
+ proxy_class = RemoteKVStorageProxy
233
+ elif backend in ["networkx", "kuzu"]:
 
 
 
 
 
 
 
234
  actor_name = f"Actor_Graph_{namespace}"
235
+ actor_class = GraphStorageActor
236
+ proxy_class = RemoteGraphStorageProxy
237
+ else:
238
+ raise ValueError(f"Unknown storage backend: {backend}")
239
+ try:
240
+ actor_handle = ray.get_actor(actor_name)
241
+ except ValueError:
242
+ actor_handle = ray.remote(actor_class).options(
243
+ name=actor_name,
244
+ get_if_exists=True,
245
+ ).remote(backend, working_dir, namespace)
246
+ ray.get(actor_handle.ready.remote())
247
+ return proxy_class(actor_handle)
248
 
249
 
250
  def init_storage(backend: str, working_dir: str, namespace: str):
graphgen/engine.py CHANGED
@@ -1,8 +1,10 @@
 
1
  import inspect
2
  import logging
3
  from collections import defaultdict, deque
4
  from functools import wraps
5
  from typing import Any, Callable, Dict, List, Set
 
6
 
7
  import ray
8
  import ray.data
@@ -10,7 +12,9 @@ from ray.data import DataContext
10
 
11
  from graphgen.bases import Config, Node
12
  from graphgen.utils import logger
 
13
 
 
14
 
15
  class Engine:
16
  def __init__(
@@ -20,6 +24,8 @@ class Engine:
20
  self.global_params = self.config.global_params
21
  self.functions = functions
22
  self.datasets: Dict[str, ray.data.Dataset] = {}
 
 
23
 
24
  ctx = DataContext.get_current()
25
  ctx.enable_rich_progress_bars = False
@@ -29,6 +35,16 @@ class Engine:
29
  ctx.enable_tensor_extension_casting = False
30
  ctx._metrics_export_port = 0 # Disable metrics exporter to avoid RpcError
31
 
 
 
 
 
 
 
 
 
 
 
32
  if not ray.is_initialized():
33
  context = ray.init(
34
  ignore_reinit_error=True,
@@ -38,6 +54,59 @@ class Engine:
38
  )
39
  logger.info("Ray Dashboard URL: %s", context.dashboard_url)
40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  @staticmethod
42
  def _topo_sort(nodes: List[Node]) -> List[Node]:
43
  id_to_node: Dict[str, Node] = {}
 
1
+ import os
2
  import inspect
3
  import logging
4
  from collections import defaultdict, deque
5
  from functools import wraps
6
  from typing import Any, Callable, Dict, List, Set
7
+ from dotenv import load_dotenv
8
 
9
  import ray
10
  import ray.data
 
12
 
13
  from graphgen.bases import Config, Node
14
  from graphgen.utils import logger
15
+ from graphgen.common import init_llm, init_storage
16
 
17
+ load_dotenv()
18
 
19
  class Engine:
20
  def __init__(
 
24
  self.global_params = self.config.global_params
25
  self.functions = functions
26
  self.datasets: Dict[str, ray.data.Dataset] = {}
27
+ self.llm_actors = {}
28
+ self.storage_actors = {}
29
 
30
  ctx = DataContext.get_current()
31
  ctx.enable_rich_progress_bars = False
 
35
  ctx.enable_tensor_extension_casting = False
36
  ctx._metrics_export_port = 0 # Disable metrics exporter to avoid RpcError
37
 
38
+ all_env_vars = os.environ.copy()
39
+ if "runtime_env" not in ray_init_kwargs:
40
+ ray_init_kwargs["runtime_env"] = {}
41
+
42
+ existing_env_vars = ray_init_kwargs["runtime_env"].get("env_vars", {})
43
+ ray_init_kwargs["runtime_env"]["env_vars"] = {
44
+ **all_env_vars,
45
+ **existing_env_vars
46
+ }
47
+
48
  if not ray.is_initialized():
49
  context = ray.init(
50
  ignore_reinit_error=True,
 
54
  )
55
  logger.info("Ray Dashboard URL: %s", context.dashboard_url)
56
 
57
+ self._init_llms()
58
+ self._init_storage()
59
+
60
+ def _init_llms(self):
61
+ self.llm_actors["synthesizer"] = init_llm("synthesizer")
62
+ self.llm_actors["trainee"] = init_llm("trainee")
63
+
64
+ def _init_storage(self):
65
+ kv_namespaces, graph_namespaces = self._scan_storage_requirements()
66
+ working_dir = self.global_params["working_dir"]
67
+
68
+ for node_id in kv_namespaces:
69
+ proxy = init_storage(self.global_params["kv_backend"], working_dir, node_id)
70
+ self.storage_actors[f"kv_{node_id}"] = proxy
71
+ logger.info("Create KV Storage Actor: namespace=%s", node_id)
72
+
73
+ for ns in graph_namespaces:
74
+ proxy = init_storage(self.global_params["graph_backend"], working_dir, ns)
75
+ self.storage_actors[f"graph_{ns}"] = proxy
76
+ logger.info("Create Graph Storage Actor: namespace=%s", ns)
77
+
78
+ def _scan_storage_requirements(self) -> tuple[set[str], set[str]]:
79
+ kv_namespaces = set()
80
+ graph_namespaces = set()
81
+
82
+ # TODO: Temporarily hard-coded; node storage will be centrally managed later.
83
+ for node in self.config.nodes:
84
+ op_name = node.op_name
85
+ if self._function_needs_param(op_name, "kv_backend"):
86
+ kv_namespaces.add(op_name)
87
+ if self._function_needs_param(op_name, "graph_backend"):
88
+ graph_namespaces.add("graph")
89
+ return kv_namespaces, graph_namespaces
90
+
91
+ def _function_needs_param(self, op_name: str, param_name: str) -> bool:
92
+ if op_name not in self.functions:
93
+ return False
94
+
95
+ func = self.functions[op_name]
96
+
97
+ if inspect.isclass(func):
98
+ try:
99
+ sig = inspect.signature(func.__init__)
100
+ return param_name in sig.parameters
101
+ except (ValueError, TypeError):
102
+ return False
103
+
104
+ try:
105
+ sig = inspect.signature(func)
106
+ return param_name in sig.parameters
107
+ except (ValueError, TypeError):
108
+ return False
109
+
110
  @staticmethod
111
  def _topo_sort(nodes: List[Node]) -> List[Node]:
112
  id_to_node: Dict[str, Node] = {}
graphgen/run.py CHANGED
@@ -6,7 +6,6 @@ from typing import Any, Dict
6
 
7
  import ray
8
  import yaml
9
- from dotenv import load_dotenv
10
  from ray.data.block import Block
11
  from ray.data.datasource.filename_provider import FilenameProvider
12
 
@@ -16,8 +15,6 @@ from graphgen.utils import CURRENT_LOGGER_VAR, logger, set_logger
16
 
17
  sys_path = os.path.abspath(os.path.dirname(__file__))
18
 
19
- load_dotenv()
20
-
21
 
22
  def set_working_dir(folder):
23
  os.makedirs(folder, exist_ok=True)
 
6
 
7
  import ray
8
  import yaml
 
9
  from ray.data.block import Block
10
  from ray.data.datasource.filename_provider import FilenameProvider
11
 
 
15
 
16
  sys_path = os.path.abspath(os.path.dirname(__file__))
17
 
 
 
18
 
19
  def set_working_dir(folder):
20
  os.makedirs(folder, exist_ok=True)