github-actions[bot] committed on
Commit
44c2466
·
1 Parent(s): a84e551

Auto-sync from demo at Mon Mar 30 08:27:48 UTC 2026

Browse files
graphgen/bases/base_operator.py CHANGED
@@ -92,8 +92,11 @@ class BaseOperator(ABC):
92
  is_first = True
93
  for res in result:
94
  yield pd.DataFrame([res])
95
- self.store([res], meta_update if is_first else {})
 
 
96
  is_first = False
 
97
  else:
98
  yield pd.DataFrame(result)
99
  self.store(result, meta_update)
@@ -141,7 +144,7 @@ class BaseOperator(ABC):
141
  recovered_chunks = [c for c in recovered_chunks if c is not None]
142
  return to_process, pd.DataFrame(recovered_chunks)
143
 
144
- def store(self, results: list, meta_update: dict):
145
  results = convert_to_serializable(results)
146
  meta_update = convert_to_serializable(meta_update)
147
 
@@ -159,7 +162,8 @@ class BaseOperator(ABC):
159
  for v in v_list:
160
  inverse_meta[v] = k
161
  self.kv_storage.update({"_meta_inverse": inverse_meta})
162
- self.kv_storage.index_done_callback()
 
163
 
164
  @abstractmethod
165
  def process(self, batch: list) -> Tuple[Union[list, Iterable[dict]], dict]:
 
92
  is_first = True
93
  for res in result:
94
  yield pd.DataFrame([res])
95
+ self.store(
96
+ [res], meta_update if is_first else {}, flush=False
97
+ )
98
  is_first = False
99
+ self.kv_storage.index_done_callback()
100
  else:
101
  yield pd.DataFrame(result)
102
  self.store(result, meta_update)
 
144
  recovered_chunks = [c for c in recovered_chunks if c is not None]
145
  return to_process, pd.DataFrame(recovered_chunks)
146
 
147
+ def store(self, results: list, meta_update: dict, flush: bool = True):
148
  results = convert_to_serializable(results)
149
  meta_update = convert_to_serializable(meta_update)
150
 
 
162
  for v in v_list:
163
  inverse_meta[v] = k
164
  self.kv_storage.update({"_meta_inverse": inverse_meta})
165
+ if flush:
166
+ self.kv_storage.index_done_callback()
167
 
168
  @abstractmethod
169
  def process(self, batch: list) -> Tuple[Union[list, Iterable[dict]], dict]:
graphgen/storage/kv/rocksdb_storage.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import os
2
  from dataclasses import dataclass
3
  from typing import Any, Dict, List, Set
@@ -8,6 +9,8 @@ from rocksdict import Rdict
8
 
9
  from graphgen.bases.base_storage import BaseKVStorage
10
 
 
 
11
 
12
  @dataclass
13
  class RocksDBKVStorage(BaseKVStorage):
@@ -17,8 +20,10 @@ class RocksDBKVStorage(BaseKVStorage):
17
  def __post_init__(self):
18
  self._db_path = os.path.join(self.working_dir, f"{self.namespace}.db")
19
  self._db = Rdict(self._db_path)
20
- print(
21
- f"RocksDBKVStorage initialized for namespace '{self.namespace}' at '{self._db_path}'"
 
 
22
  )
23
 
24
  @property
@@ -30,7 +35,7 @@ class RocksDBKVStorage(BaseKVStorage):
30
 
31
  def index_done_callback(self):
32
  self._db.flush()
33
- print(f"RocksDB flushed for {self.namespace}")
34
 
35
  def get_by_id(self, id: str) -> Any:
36
  return self._db.get(id, None)
 
1
+ import logging
2
  import os
3
  from dataclasses import dataclass
4
  from typing import Any, Dict, List, Set
 
9
 
10
  from graphgen.bases.base_storage import BaseKVStorage
11
 
12
+ logger = logging.getLogger(__name__)
13
+
14
 
15
  @dataclass
16
  class RocksDBKVStorage(BaseKVStorage):
 
20
  def __post_init__(self):
21
  self._db_path = os.path.join(self.working_dir, f"{self.namespace}.db")
22
  self._db = Rdict(self._db_path)
23
+ logger.debug(
24
+ "RocksDBKVStorage initialized for namespace '%s' at '%s'",
25
+ self.namespace,
26
+ self._db_path,
27
  )
28
 
29
  @property
 
35
 
36
  def index_done_callback(self):
37
  self._db.flush()
38
+ logger.debug("RocksDB flushed for %s", self.namespace)
39
 
40
  def get_by_id(self, id: str) -> Any:
41
  return self._db.get(id, None)