GraphGen / graphgen /bases /base_operator.py
github-actions[bot]
Auto-sync from demo at Thu Dec 25 14:16:44 UTC 2025
31df32c
import collections.abc
import contextvars
import inspect
import os
from abc import ABC, abstractmethod
from typing import Iterable, Iterator, Optional, Union

import pandas as pd
import ray
class BaseOperator(ABC):
    """Abstract base for batch-processing operators.

    Subclasses implement :meth:`process`. Each instance creates its own logger
    writing to ``<working_dir>/logs/<op_name>_<worker>.log``; calling the
    instance runs :meth:`process` with that logger installed in
    ``graphgen.utils.CURRENT_LOGGER_VAR`` and yields the resulting batch(es).
    """

    def __init__(self, working_dir: str = "cache", op_name: Optional[str] = None):
        """Create the per-worker logger for this operator.

        Args:
            working_dir: Base directory; logs go under ``<working_dir>/logs``.
            op_name: Name used for the log file and logger; defaults to the
                class name.
        """
        # lazy import to avoid circular import
        from graphgen.utils import set_logger

        log_dir = os.path.join(working_dir, "logs")
        self.op_name = op_name or self.__class__.__name__

        try:
            ctx = ray.get_runtime_context()
            # Prefer the actor id, fall back to the worker id; only the last six
            # characters are kept to keep log file names short.
            worker_id = ctx.get_actor_id() or ctx.get_worker_id()
            worker_id_short = worker_id[-6:] if worker_id else "driver"
        except Exception as e:
            # Best effort: failing to query Ray must not prevent construction.
            print(
                "Warning: Could not get Ray worker ID, defaulting to 'local'. Exception:",
                e,
            )
            worker_id_short = "local"

        # e.g. cache/logs/ChunkService_a1b2c3.log
        log_file = os.path.join(log_dir, f"{self.op_name}_{worker_id_short}.log")
        self.logger = set_logger(
            log_file=log_file, name=f"{self.op_name}.{worker_id_short}", force=True
        )
        self.logger.info(
            "[%s] Operator initialized on Worker %s", self.op_name, worker_id_short
        )

    def __call__(self, batch: pd.DataFrame) -> Iterator[pd.DataFrame]:
        """Process ``batch`` and yield the resulting batch(es).

        ``self.logger`` is installed in ``CURRENT_LOGGER_VAR`` inside a private
        copy of the current context, so it is visible to :meth:`process` (and to
        the body of any generator it returns) but never leaks into the caller's
        context while this generator is suspended.
        """
        # lazy import to avoid circular import
        from graphgen.utils import CURRENT_LOGGER_VAR

        ctx = contextvars.copy_context()
        # The copied context is discarded when we are done, so no reset() is
        # needed -- this also avoids reset() raising ValueError when the
        # generator is finalized from a different Context (e.g. another thread).
        ctx.run(CURRENT_LOGGER_VAR.set, self.logger)
        yield from self._run_in_context(ctx, batch)

    def _run_in_context(self, ctx: contextvars.Context, batch) -> Iterator:
        """Run ``process(batch)`` inside ``ctx`` and yield its output batch(es).

        An iterator result (including a generator) is flattened, with every
        step executed inside ``ctx``; any other result is yielded once.
        """
        result = ctx.run(self.process, batch)
        # DataFrames/dicts/lists are iterable but not iterators, so they are
        # treated as a single output batch.
        if not isinstance(result, collections.abc.Iterator):
            yield result
            return

        try:
            while True:
                try:
                    item = ctx.run(next, result)
                except StopIteration:
                    return
                yield item
        finally:
            # Mirror ``yield from``: close the inner iterator if we stop early,
            # running its cleanup code with the operator's context active.
            closer = getattr(result, "close", None)
            if closer is not None:
                ctx.run(closer)

    @abstractmethod
    def process(self, batch):
        """Transform one batch.

        May return a single output batch or an iterator (e.g. a generator) of
        output batches.
        """
        raise NotImplementedError("Subclasses must implement the process method.")

    def get_logger(self):
        """Return the logger created for this operator instance."""
        return self.logger